Line data Source code
1 : /*
2 : * Copyright (C) 2002 Sistina Software (UK) Limited.
3 : * Copyright (C) 2006 Red Hat GmbH
4 : *
5 : * This file is released under the GPL.
6 : *
7 : * Kcopyd provides a simple interface for copying an area of one
8 : * block-device to one or more other block-devices, with an asynchronous
9 : * completion notification.
10 : */
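To make the interface described above concrete, here is a minimal usage sketch. It is not part of dm-kcopyd.c or of any in-tree target: the names my_copy_done() and my_mirror_region(), the region values, and the NULL throttle (which simply disables throttling) are invented for illustration; only dm_kcopyd_client_create(), dm_kcopyd_copy(), dm_kcopyd_client_destroy() and struct dm_io_region are the real public API from <linux/dm-kcopyd.h> and <linux/dm-io.h>.

#include <linux/blkdev.h>
#include <linux/dm-io.h>
#include <linux/dm-kcopyd.h>
#include <linux/err.h>

/* Invented completion callback; kcopyd calls it asynchronously from its
 * workqueue once the read and all writes have finished. */
static void my_copy_done(int read_err, unsigned long write_err, void *context)
{
	if (read_err || write_err)
		pr_err("kcopyd copy failed: read_err=%d write_err=0x%lx\n",
		       read_err, write_err);
}

/* Invented helper: copy the first nr_sectors of src to dst. */
static int my_mirror_region(struct block_device *src, struct block_device *dst,
			    sector_t nr_sectors)
{
	struct dm_io_region from = { .bdev = src, .sector = 0, .count = nr_sectors };
	struct dm_io_region to   = { .bdev = dst, .sector = 0, .count = nr_sectors };
	struct dm_kcopyd_client *kc;

	kc = dm_kcopyd_client_create(NULL);	/* NULL throttle: no rate limiting */
	if (IS_ERR(kc))
		return PTR_ERR(kc);

	/* One source, one destination, no flags; completion is asynchronous. */
	dm_kcopyd_copy(kc, &from, 1, &to, 0, my_copy_done, NULL);

	/* The destructor waits for every job submitted by this client. */
	dm_kcopyd_client_destroy(kc);
	return 0;
}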
11 :
12 : #include <linux/types.h>
13 : #include <linux/atomic.h>
14 : #include <linux/blkdev.h>
15 : #include <linux/fs.h>
16 : #include <linux/init.h>
17 : #include <linux/list.h>
18 : #include <linux/mempool.h>
19 : #include <linux/module.h>
20 : #include <linux/pagemap.h>
21 : #include <linux/slab.h>
22 : #include <linux/vmalloc.h>
23 : #include <linux/workqueue.h>
24 : #include <linux/mutex.h>
25 : #include <linux/delay.h>
26 : #include <linux/device-mapper.h>
27 : #include <linux/dm-kcopyd.h>
28 :
29 : #include "dm-core.h"
30 :
31 : #define SPLIT_COUNT 8
32 : #define MIN_JOBS 8
33 :
34 : #define DEFAULT_SUB_JOB_SIZE_KB 512
35 : #define MAX_SUB_JOB_SIZE_KB 1024
36 :
37 : static unsigned kcopyd_subjob_size_kb = DEFAULT_SUB_JOB_SIZE_KB;
38 :
39 : module_param(kcopyd_subjob_size_kb, uint, S_IRUGO | S_IWUSR);
40 : MODULE_PARM_DESC(kcopyd_subjob_size_kb, "Sub-job size for dm-kcopyd clients");
41 :
42 0 : static unsigned dm_get_kcopyd_subjob_size(void)
43 : {
44 0 : unsigned sub_job_size_kb;
45 :
46 0 : sub_job_size_kb = __dm_get_module_param(&kcopyd_subjob_size_kb,
47 : DEFAULT_SUB_JOB_SIZE_KB,
48 : MAX_SUB_JOB_SIZE_KB);
49 :
50 0 : return sub_job_size_kb << 1;
51 : }
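Worked arithmetic for the conversion above (nothing beyond what the code already does): the module parameter is given in kilobytes but kcopyd works internally in 512-byte sectors, so the shift left by one doubles the value.

	DEFAULT_SUB_JOB_SIZE_KB = 512   ->  512 << 1 = 1024 sectors (512 KiB)
	MAX_SUB_JOB_SIZE_KB     = 1024  -> 1024 << 1 = 2048 sectors (  1 MiB)

With 4 KiB pages (an assumption; PAGE_SIZE varies by architecture) the default therefore makes dm_kcopyd_client_create() reserve DIV_ROUND_UP(1024 << 9, 4096) = 128 pages per client.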
52 :
53 : /*-----------------------------------------------------------------
54 : * Each kcopyd client has its own little pool of preallocated
55 : * pages for kcopyd io.
56 : *---------------------------------------------------------------*/
57 : struct dm_kcopyd_client {
58 : struct page_list *pages;
59 : unsigned nr_reserved_pages;
60 : unsigned nr_free_pages;
61 : unsigned sub_job_size;
62 :
63 : struct dm_io_client *io_client;
64 :
65 : wait_queue_head_t destroyq;
66 :
67 : mempool_t job_pool;
68 :
69 : struct workqueue_struct *kcopyd_wq;
70 : struct work_struct kcopyd_work;
71 :
72 : struct dm_kcopyd_throttle *throttle;
73 :
74 : atomic_t nr_jobs;
75 :
76 : /*
77 : * We maintain four lists of jobs:
78 : *
79 : * i) jobs waiting for pages
80 : * ii) jobs that have pages, and are waiting for the io to be issued.
81 : * iii) jobs that don't need to do any IO and just run a callback
82 : * iv) jobs that have completed.
83 : *
84 : * All four of these are protected by job_lock.
85 : */
86 : spinlock_t job_lock;
87 : struct list_head callback_jobs;
88 : struct list_head complete_jobs;
89 : struct list_head io_jobs;
90 : struct list_head pages_jobs;
91 : };
92 :
93 : static struct page_list zero_page_list;
94 :
95 : static DEFINE_SPINLOCK(throttle_spinlock);
96 :
97 : /*
98 : * IO/IDLE accounting slowly decays after each (1 << ACCOUNT_INTERVAL_SHIFT)
99 : * period: when total_period >= (1 << ACCOUNT_INTERVAL_SHIFT), the counters
100 : * are divided by 2.
101 : */
102 : #define ACCOUNT_INTERVAL_SHIFT SHIFT_HZ
103 :
104 : /*
105 : * Sleep this number of milliseconds.
106 : *
107 : * The value was decided experimentally.
108 : * Smaller values seem to cause an increased copy rate above the limit.
109 : * The reason for this is unknown but possibly due to jiffies rounding errors
110 : * or read/write cache inside the disk.
111 : */
112 : #define SLEEP_MSEC 100
113 :
114 : /*
115 : * Maximum number of sleep events. Without this limit there is a
116 : * theoretical livelock when multiple kcopyd clients do work simultaneously.
117 : */
118 : #define MAX_SLEEPS 10
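A worked example of the throttling decision made in io_job_start() below, with illustrative numbers (throttle is the permitted percentage of time spent doing kcopyd io; periods are in jiffies):

	throttle     = 25
	total_period = 800
	io_period    = 300

	skew = io_period - throttle * total_period / 100
	     = 300       - 25       * 800          / 100
	     = 100                    (> 0: over budget)

Because skew is positive the caller sleeps SLEEP_MSEC and re-evaluates; while it sleeps total_period keeps growing (and io_period grows only if io is actually in flight), so the ratio falls back toward the 25% budget. After MAX_SLEEPS attempts the job is started anyway, which is what avoids the livelock mentioned above.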
119 :
120 0 : static void io_job_start(struct dm_kcopyd_throttle *t)
121 : {
122 0 : unsigned throttle, now, difference;
123 0 : int slept = 0, skew;
124 :
125 0 : if (unlikely(!t))
126 : return;
127 :
128 0 : try_again:
129 0 : spin_lock_irq(&throttle_spinlock);
130 :
131 0 : throttle = READ_ONCE(t->throttle);
132 :
133 0 : if (likely(throttle >= 100))
134 0 : goto skip_limit;
135 :
136 0 : now = jiffies;
137 0 : difference = now - t->last_jiffies;
138 0 : t->last_jiffies = now;
139 0 : if (t->num_io_jobs)
140 0 : t->io_period += difference;
141 0 : t->total_period += difference;
142 :
143 : /*
144 : * Maintain sane values if we got a temporary overflow.
145 : */
146 0 : if (unlikely(t->io_period > t->total_period))
147 0 : t->io_period = t->total_period;
148 :
149 0 : if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
150 0 : int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
151 0 : t->total_period >>= shift;
152 0 : t->io_period >>= shift;
153 : }
154 :
155 0 : skew = t->io_period - throttle * t->total_period / 100;
156 :
157 0 : if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
158 0 : slept++;
159 0 : spin_unlock_irq(&throttle_spinlock);
160 0 : msleep(SLEEP_MSEC);
161 0 : goto try_again;
162 : }
163 :
164 0 : skip_limit:
165 0 : t->num_io_jobs++;
166 :
167 0 : spin_unlock_irq(&throttle_spinlock);
168 : }
169 :
170 0 : static void io_job_finish(struct dm_kcopyd_throttle *t)
171 : {
172 0 : unsigned long flags;
173 :
174 0 : if (unlikely(!t))
175 : return;
176 :
177 0 : spin_lock_irqsave(&throttle_spinlock, flags);
178 :
179 0 : t->num_io_jobs--;
180 :
181 0 : if (likely(READ_ONCE(t->throttle) >= 100))
182 0 : goto skip_limit;
183 :
184 0 : if (!t->num_io_jobs) {
185 0 : unsigned now, difference;
186 :
187 0 : now = jiffies;
188 0 : difference = now - t->last_jiffies;
189 0 : t->last_jiffies = now;
190 :
191 0 : t->io_period += difference;
192 0 : t->total_period += difference;
193 :
194 : /*
195 : * Maintain sane values if we got a temporary overflow.
196 : */
197 0 : if (unlikely(t->io_period > t->total_period))
198 0 : t->io_period = t->total_period;
199 : }
200 :
201 0 : skip_limit:
202 0 : spin_unlock_irqrestore(&throttle_spinlock, flags);
203 : }
204 :
205 :
206 0 : static void wake(struct dm_kcopyd_client *kc)
207 : {
208 0 : queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
209 0 : }
210 :
211 : /*
212 : * Obtain one page for the use of kcopyd.
213 : */
214 0 : static struct page_list *alloc_pl(gfp_t gfp)
215 : {
216 0 : struct page_list *pl;
217 :
218 0 : pl = kmalloc(sizeof(*pl), gfp);
219 0 : if (!pl)
220 : return NULL;
221 :
222 0 : pl->page = alloc_page(gfp);
223 0 : if (!pl->page) {
224 0 : kfree(pl);
225 0 : return NULL;
226 : }
227 :
228 : return pl;
229 : }
230 :
231 0 : static void free_pl(struct page_list *pl)
232 : {
233 0 : __free_page(pl->page);
234 0 : kfree(pl);
235 0 : }
236 :
237 : /*
238 : * Add the provided pages to a client's free page list, releasing
239 : * back to the system any beyond the reserved_pages limit.
240 : */
241 0 : static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
242 : {
243 0 : struct page_list *next;
244 :
245 0 : do {
246 0 : next = pl->next;
247 :
248 0 : if (kc->nr_free_pages >= kc->nr_reserved_pages)
249 0 : free_pl(pl);
250 : else {
251 0 : pl->next = kc->pages;
252 0 : kc->pages = pl;
253 0 : kc->nr_free_pages++;
254 : }
255 :
256 0 : pl = next;
257 0 : } while (pl);
258 0 : }
259 :
260 0 : static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
261 : unsigned int nr, struct page_list **pages)
262 : {
263 0 : struct page_list *pl;
264 :
265 0 : *pages = NULL;
266 :
267 0 : do {
268 0 : pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
269 0 : if (unlikely(!pl)) {
270 : /* Use reserved pages */
271 0 : pl = kc->pages;
272 0 : if (unlikely(!pl))
273 0 : goto out_of_memory;
274 0 : kc->pages = pl->next;
275 0 : kc->nr_free_pages--;
276 : }
277 0 : pl->next = *pages;
278 0 : *pages = pl;
279 0 : } while (--nr);
280 :
281 : return 0;
282 :
283 0 : out_of_memory:
284 0 : if (*pages)
285 0 : kcopyd_put_pages(kc, *pages);
286 : return -ENOMEM;
287 : }
288 :
289 : /*
290 : * These three functions resize the page pool.
291 : */
292 0 : static void drop_pages(struct page_list *pl)
293 : {
294 0 : struct page_list *next;
295 :
296 0 : while (pl) {
297 0 : next = pl->next;
298 0 : free_pl(pl);
299 0 : pl = next;
300 : }
301 : }
302 :
303 : /*
304 : * Allocate and reserve nr_pages for the use of a specific client.
305 : */
306 0 : static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
307 : {
308 0 : unsigned i;
309 0 : struct page_list *pl = NULL, *next;
310 :
311 0 : for (i = 0; i < nr_pages; i++) {
312 0 : next = alloc_pl(GFP_KERNEL);
313 0 : if (!next) {
314 0 : if (pl)
315 0 : drop_pages(pl);
316 0 : return -ENOMEM;
317 : }
318 0 : next->next = pl;
319 0 : pl = next;
320 : }
321 :
322 0 : kc->nr_reserved_pages += nr_pages;
323 0 : kcopyd_put_pages(kc, pl);
324 :
325 0 : return 0;
326 : }
327 :
328 0 : static void client_free_pages(struct dm_kcopyd_client *kc)
329 : {
330 0 : BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
331 0 : drop_pages(kc->pages);
332 0 : kc->pages = NULL;
333 0 : kc->nr_free_pages = kc->nr_reserved_pages = 0;
334 0 : }
335 :
336 : /*-----------------------------------------------------------------
337 : * kcopyd_jobs need to be allocated by the *clients* of kcopyd;
338 : * for this reason we use a mempool to prevent the client from
339 : * ever having to do io (which could cause a deadlock).
340 : *---------------------------------------------------------------*/
341 : struct kcopyd_job {
342 : struct dm_kcopyd_client *kc;
343 : struct list_head list;
344 : unsigned long flags;
345 :
346 : /*
347 : * Error state of the job.
348 : */
349 : int read_err;
350 : unsigned long write_err;
351 :
352 : /*
353 : * Either READ or WRITE
354 : */
355 : int rw;
356 : struct dm_io_region source;
357 :
358 : /*
359 : * The destinations for the transfer.
360 : */
361 : unsigned int num_dests;
362 : struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];
363 :
364 : struct page_list *pages;
365 :
366 : /*
367 : * Set this to ensure you are notified when the job has
368 : * completed. 'context' is for callback to use.
369 : */
370 : dm_kcopyd_notify_fn fn;
371 : void *context;
372 :
373 : /*
374 : * These fields are only used if the job has been split
375 : * into more manageable parts.
376 : */
377 : struct mutex lock;
378 : atomic_t sub_jobs;
379 : sector_t progress;
380 : sector_t write_offset;
381 :
382 : struct kcopyd_job *master_job;
383 : };
384 :
385 : static struct kmem_cache *_job_cache;
386 :
387 1 : int __init dm_kcopyd_init(void)
388 : {
389 1 : _job_cache = kmem_cache_create("kcopyd_job",
390 : sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
391 : __alignof__(struct kcopyd_job), 0, NULL);
392 1 : if (!_job_cache)
393 : return -ENOMEM;
394 :
395 1 : zero_page_list.next = &zero_page_list;
396 2 : zero_page_list.page = ZERO_PAGE(0);
397 :
398 1 : return 0;
399 : }
400 :
401 0 : void dm_kcopyd_exit(void)
402 : {
403 0 : kmem_cache_destroy(_job_cache);
404 0 : _job_cache = NULL;
405 0 : }
406 :
407 : /*
408 : * Functions to push a job onto, and pop a job off, the head of a
409 : * given job list.
410 : */
411 0 : static struct kcopyd_job *pop_io_job(struct list_head *jobs,
412 : struct dm_kcopyd_client *kc)
413 : {
414 0 : struct kcopyd_job *job;
415 :
416 : /*
417 : * For I/O jobs, pop any read, any write without a sequential write
418 : * constraint, and any sequential write that is at the right position.
419 : */
420 0 : list_for_each_entry(job, jobs, list) {
421 0 : if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
422 0 : list_del(&job->list);
423 0 : return job;
424 : }
425 :
426 0 : if (job->write_offset == job->master_job->write_offset) {
427 0 : job->master_job->write_offset += job->source.count;
428 0 : list_del(&job->list);
429 0 : return job;
430 : }
431 : }
432 :
433 : return NULL;
434 : }
435 :
436 0 : static struct kcopyd_job *pop(struct list_head *jobs,
437 : struct dm_kcopyd_client *kc)
438 : {
439 0 : struct kcopyd_job *job = NULL;
440 0 : unsigned long flags;
441 :
442 0 : spin_lock_irqsave(&kc->job_lock, flags);
443 :
444 0 : if (!list_empty(jobs)) {
445 0 : if (jobs == &kc->io_jobs)
446 0 : job = pop_io_job(jobs, kc);
447 : else {
448 0 : job = list_entry(jobs->next, struct kcopyd_job, list);
449 0 : list_del(&job->list);
450 : }
451 : }
452 0 : spin_unlock_irqrestore(&kc->job_lock, flags);
453 :
454 0 : return job;
455 : }
456 :
457 0 : static void push(struct list_head *jobs, struct kcopyd_job *job)
458 : {
459 0 : unsigned long flags;
460 0 : struct dm_kcopyd_client *kc = job->kc;
461 :
462 0 : spin_lock_irqsave(&kc->job_lock, flags);
463 0 : list_add_tail(&job->list, jobs);
464 0 : spin_unlock_irqrestore(&kc->job_lock, flags);
465 0 : }
466 :
467 :
468 0 : static void push_head(struct list_head *jobs, struct kcopyd_job *job)
469 : {
470 0 : unsigned long flags;
471 0 : struct dm_kcopyd_client *kc = job->kc;
472 :
473 0 : spin_lock_irqsave(&kc->job_lock, flags);
474 0 : list_add(&job->list, jobs);
475 0 : spin_unlock_irqrestore(&kc->job_lock, flags);
476 0 : }
477 :
478 : /*
479 : * These three functions process 1 item from the corresponding
480 : * job list.
481 : *
482 : * They return:
483 : * < 0: error
484 : * 0: success
485 : * > 0: can't process yet.
486 : */
487 0 : static int run_complete_job(struct kcopyd_job *job)
488 : {
489 0 : void *context = job->context;
490 0 : int read_err = job->read_err;
491 0 : unsigned long write_err = job->write_err;
492 0 : dm_kcopyd_notify_fn fn = job->fn;
493 0 : struct dm_kcopyd_client *kc = job->kc;
494 :
495 0 : if (job->pages && job->pages != &zero_page_list)
496 0 : kcopyd_put_pages(kc, job->pages);
497 : /*
498 : * If this is the master job, the sub jobs have already
499 : * completed so we can free everything.
500 : */
501 0 : if (job->master_job == job) {
502 0 : mutex_destroy(&job->lock);
503 0 : mempool_free(job, &kc->job_pool);
504 : }
505 0 : fn(read_err, write_err, context);
506 :
507 0 : if (atomic_dec_and_test(&kc->nr_jobs))
508 0 : wake_up(&kc->destroyq);
509 :
510 0 : cond_resched();
511 :
512 0 : return 0;
513 : }
514 :
515 0 : static void complete_io(unsigned long error, void *context)
516 : {
517 0 : struct kcopyd_job *job = (struct kcopyd_job *) context;
518 0 : struct dm_kcopyd_client *kc = job->kc;
519 :
520 0 : io_job_finish(kc->throttle);
521 :
522 0 : if (error) {
523 0 : if (op_is_write(job->rw))
524 0 : job->write_err |= error;
525 : else
526 0 : job->read_err = 1;
527 :
528 0 : if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
529 0 : push(&kc->complete_jobs, job);
530 0 : wake(kc);
531 0 : return;
532 : }
533 : }
534 :
535 0 : if (op_is_write(job->rw))
536 0 : push(&kc->complete_jobs, job);
537 :
538 : else {
539 0 : job->rw = WRITE;
540 0 : push(&kc->io_jobs, job);
541 : }
542 :
543 0 : wake(kc);
544 : }
545 :
546 : /*
547 : * Issue the io for a particular job: a read of the source, or
548 : * writes to all of its destinations.
549 : */
550 0 : static int run_io_job(struct kcopyd_job *job)
551 : {
552 0 : int r;
553 0 : struct dm_io_request io_req = {
554 0 : .bi_op = job->rw,
555 : .bi_op_flags = 0,
556 : .mem.type = DM_IO_PAGE_LIST,
557 0 : .mem.ptr.pl = job->pages,
558 : .mem.offset = 0,
559 : .notify.fn = complete_io,
560 : .notify.context = job,
561 0 : .client = job->kc->io_client,
562 : };
563 :
564 : /*
565 : * If we need to write sequentially and some reads or writes failed,
566 : * no point in continuing.
567 : */
568 0 : if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
569 0 : job->master_job->write_err) {
570 0 : job->write_err = job->master_job->write_err;
571 0 : return -EIO;
572 : }
573 :
574 0 : io_job_start(job->kc->throttle);
575 :
576 0 : if (job->rw == READ)
577 0 : r = dm_io(&io_req, 1, &job->source, NULL);
578 : else
579 0 : r = dm_io(&io_req, job->num_dests, job->dests, NULL);
580 :
581 : return r;
582 : }
583 :
584 0 : static int run_pages_job(struct kcopyd_job *job)
585 : {
586 0 : int r;
587 0 : unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);
588 :
589 0 : r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
590 0 : if (!r) {
591 : /* this job is ready for io */
592 0 : push(&job->kc->io_jobs, job);
593 0 : return 0;
594 : }
595 :
596 0 : if (r == -ENOMEM)
597 : /* can't complete now */
598 0 : return 1;
599 :
600 : return r;
601 : }
602 :
603 : /*
604 : * Run through a list for as long as possible. Returns the count
605 : * of successful jobs.
606 : */
607 0 : static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
608 : int (*fn) (struct kcopyd_job *))
609 : {
610 0 : struct kcopyd_job *job;
611 0 : int r, count = 0;
612 :
613 0 : while ((job = pop(jobs, kc))) {
614 :
615 0 : r = fn(job);
616 :
617 0 : if (r < 0) {
618 : /* error this rogue job */
619 0 : if (op_is_write(job->rw))
620 0 : job->write_err = (unsigned long) -1L;
621 : else
622 0 : job->read_err = 1;
623 0 : push(&kc->complete_jobs, job);
624 0 : wake(kc);
625 : break;
626 : }
627 :
628 0 : if (r > 0) {
629 : /*
630 : * We couldn't service this job ATM, so
631 : * push this job back onto the list.
632 : */
633 0 : push_head(jobs, job);
634 0 : break;
635 : }
636 :
637 0 : count++;
638 : }
639 :
640 0 : return count;
641 : }
642 :
643 : /*
644 : * kcopyd does this every time it's woken up.
645 : */
646 0 : static void do_work(struct work_struct *work)
647 : {
648 0 : struct dm_kcopyd_client *kc = container_of(work,
649 : struct dm_kcopyd_client, kcopyd_work);
650 0 : struct blk_plug plug;
651 0 : unsigned long flags;
652 :
653 : /*
654 : * The order that these are called is *very* important.
655 : * Complete jobs can free some pages for pages jobs.
656 : * Pages jobs, when successful, will jump onto the io jobs
657 : * list. IO jobs call wake when they complete and it all
658 : * starts again.
659 : */
660 0 : spin_lock_irqsave(&kc->job_lock, flags);
661 0 : list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
662 0 : spin_unlock_irqrestore(&kc->job_lock, flags);
663 :
664 0 : blk_start_plug(&plug);
665 0 : process_jobs(&kc->complete_jobs, kc, run_complete_job);
666 0 : process_jobs(&kc->pages_jobs, kc, run_pages_job);
667 0 : process_jobs(&kc->io_jobs, kc, run_io_job);
668 0 : blk_finish_plug(&plug);
669 0 : }
670 :
671 : /*
672 : * If we are copying a small region, we just dispatch a single job
673 : * to do the copy; otherwise the io has to be split up into many
674 : * jobs.
675 : */
676 0 : static void dispatch_job(struct kcopyd_job *job)
677 : {
678 0 : struct dm_kcopyd_client *kc = job->kc;
679 0 : atomic_inc(&kc->nr_jobs);
680 0 : if (unlikely(!job->source.count))
681 0 : push(&kc->callback_jobs, job);
682 0 : else if (job->pages == &zero_page_list)
683 0 : push(&kc->io_jobs, job);
684 : else
685 0 : push(&kc->pages_jobs, job);
686 0 : wake(kc);
687 0 : }
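Worked numbers for the small/large decision above and for split_job() further below, assuming the default module parameter (kcopyd_subjob_size_kb = 512, i.e. kc->sub_job_size = 1024 sectors):

	kc->sub_job_size   = 512 << 1 = 1024 sectors
	source.count       = 32768 sectors (16 MiB)  ->  split_job()
	chunks dispatched  = 32768 / 1024 = 32
	sub-jobs in flight = SPLIT_COUNT = 8

Each of the eight sub-jobs repeatedly claims the next 1024-sector chunk in segment_complete() until job->progress reaches job->source.count; the master job's callback is queued only after the last sub-job finishes. A copy of 1024 sectors or fewer skips all of this and is dispatched as a single job.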
688 :
689 0 : static void segment_complete(int read_err, unsigned long write_err,
690 : void *context)
691 : {
692 : /* FIXME: tidy this function */
693 0 : sector_t progress = 0;
694 0 : sector_t count = 0;
695 0 : struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
696 0 : struct kcopyd_job *job = sub_job->master_job;
697 0 : struct dm_kcopyd_client *kc = job->kc;
698 :
699 0 : mutex_lock(&job->lock);
700 :
701 : /* update the error */
702 0 : if (read_err)
703 0 : job->read_err = 1;
704 :
705 0 : if (write_err)
706 0 : job->write_err |= write_err;
707 :
708 : /*
709 : * Only dispatch more work if there hasn't been an error.
710 : */
711 0 : if ((!job->read_err && !job->write_err) ||
712 0 : test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
713 : /* get the next chunk of work */
714 0 : progress = job->progress;
715 0 : count = job->source.count - progress;
716 0 : if (count) {
717 0 : if (count > kc->sub_job_size)
718 : count = kc->sub_job_size;
719 :
720 0 : job->progress += count;
721 : }
722 : }
723 0 : mutex_unlock(&job->lock);
724 :
725 0 : if (count) {
726 0 : int i;
727 :
728 0 : *sub_job = *job;
729 0 : sub_job->write_offset = progress;
730 0 : sub_job->source.sector += progress;
731 0 : sub_job->source.count = count;
732 :
733 0 : for (i = 0; i < job->num_dests; i++) {
734 0 : sub_job->dests[i].sector += progress;
735 0 : sub_job->dests[i].count = count;
736 : }
737 :
738 0 : sub_job->fn = segment_complete;
739 0 : sub_job->context = sub_job;
740 0 : dispatch_job(sub_job);
741 :
742 0 : } else if (atomic_dec_and_test(&job->sub_jobs)) {
743 :
744 : /*
745 : * Queue the completion callback to the kcopyd thread.
746 : *
747 : * Some callers assume that all the completions are called
748 : * from a single thread and don't race with each other.
749 : *
750 : * We must not call the callback directly here because this
751 : * code may not be executing in the thread.
752 : */
753 0 : push(&kc->complete_jobs, job);
754 0 : wake(kc);
755 : }
756 0 : }
757 :
758 : /*
759 : * Create some sub jobs to share the work between them.
760 : */
761 0 : static void split_job(struct kcopyd_job *master_job)
762 : {
763 0 : int i;
764 :
765 0 : atomic_inc(&master_job->kc->nr_jobs);
766 :
767 0 : atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
768 0 : for (i = 0; i < SPLIT_COUNT; i++) {
769 0 : master_job[i + 1].master_job = master_job;
770 0 : segment_complete(0, 0u, &master_job[i + 1]);
771 : }
772 0 : }
773 :
774 0 : void dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
775 : unsigned int num_dests, struct dm_io_region *dests,
776 : unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
777 : {
778 0 : struct kcopyd_job *job;
779 0 : int i;
780 :
781 : /*
782 : * Allocate an array of jobs consisting of one master job
783 : * followed by SPLIT_COUNT sub jobs.
784 : */
785 0 : job = mempool_alloc(&kc->job_pool, GFP_NOIO);
786 0 : mutex_init(&job->lock);
787 :
788 : /*
789 : * set up for the read.
790 : */
791 0 : job->kc = kc;
792 0 : job->flags = flags;
793 0 : job->read_err = 0;
794 0 : job->write_err = 0;
795 :
796 0 : job->num_dests = num_dests;
797 0 : memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
798 :
799 : /*
800 : * If one of the destinations is a host-managed zoned block device,
801 : * we need to write sequentially. If one of the destinations is a
802 : * host-aware device, then leave it to the caller to choose what to do.
803 : */
804 0 : if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
805 0 : for (i = 0; i < job->num_dests; i++) {
806 0 : if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
807 : set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags);
808 : break;
809 : }
810 : }
811 : }
812 :
813 : /*
814 : * If we need to write sequentially, errors cannot be ignored.
815 : */
816 0 : if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
817 0 : test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags))
818 0 : clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags);
819 :
820 0 : if (from) {
821 0 : job->source = *from;
822 0 : job->pages = NULL;
823 0 : job->rw = READ;
824 : } else {
825 0 : memset(&job->source, 0, sizeof job->source);
826 0 : job->source.count = job->dests[0].count;
827 0 : job->pages = &zero_page_list;
828 :
829 : /*
830 : * Use WRITE ZEROES to optimize zeroing if all dests support it.
831 : */
832 0 : job->rw = REQ_OP_WRITE_ZEROES;
833 0 : for (i = 0; i < job->num_dests; i++)
834 0 : if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
835 0 : job->rw = WRITE;
836 0 : break;
837 : }
838 : }
839 :
840 0 : job->fn = fn;
841 0 : job->context = context;
842 0 : job->master_job = job;
843 0 : job->write_offset = 0;
844 :
845 0 : if (job->source.count <= kc->sub_job_size)
846 0 : dispatch_job(job);
847 : else {
848 0 : job->progress = 0;
849 0 : split_job(job);
850 : }
851 0 : }
852 : EXPORT_SYMBOL(dm_kcopyd_copy);
853 :
854 0 : void dm_kcopyd_zero(struct dm_kcopyd_client *kc,
855 : unsigned num_dests, struct dm_io_region *dests,
856 : unsigned flags, dm_kcopyd_notify_fn fn, void *context)
857 : {
858 0 : dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
859 0 : }
860 : EXPORT_SYMBOL(dm_kcopyd_zero);
861 :
862 0 : void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
863 : dm_kcopyd_notify_fn fn, void *context)
864 : {
865 0 : struct kcopyd_job *job;
866 :
867 0 : job = mempool_alloc(&kc->job_pool, GFP_NOIO);
868 :
869 0 : memset(job, 0, sizeof(struct kcopyd_job));
870 0 : job->kc = kc;
871 0 : job->fn = fn;
872 0 : job->context = context;
873 0 : job->master_job = job;
874 :
875 0 : atomic_inc(&kc->nr_jobs);
876 :
877 0 : return job;
878 : }
879 : EXPORT_SYMBOL(dm_kcopyd_prepare_callback);
880 :
881 0 : void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
882 : {
883 0 : struct kcopyd_job *job = j;
884 0 : struct dm_kcopyd_client *kc = job->kc;
885 :
886 0 : job->read_err = read_err;
887 0 : job->write_err = write_err;
888 :
889 0 : push(&kc->callback_jobs, job);
890 0 : wake(kc);
891 0 : }
892 : EXPORT_SYMBOL(dm_kcopyd_do_callback);
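A minimal sketch of how the two exports above pair up, again not taken from any in-tree target (my_notify(), my_prepare() and my_trigger() are invented names). dm_kcopyd_prepare_callback() allocates from the job mempool with GFP_NOIO and so may sleep, while dm_kcopyd_do_callback() only takes an irq-safe spinlock and queues work, which is why the two halves can run in different contexts.

/* Invented callback, run later from the client's kcopyd workqueue. */
static void my_notify(int read_err, unsigned long write_err, void *context)
{
	/* act on the result here */
}

/* Process context: reserve the callback while sleeping is allowed. */
static void *my_prepare(struct dm_kcopyd_client *kc)
{
	return dm_kcopyd_prepare_callback(kc, my_notify, NULL);
}

/* Completion path: hand the result back; kcopyd queues my_notify(). */
static void my_trigger(void *cb, int read_err, unsigned long write_err)
{
	dm_kcopyd_do_callback(cb, read_err, write_err);
}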
893 :
894 : /*
895 : * Cancels a kcopyd job, e.g. someone might be deactivating a
896 : * mirror.
897 : */
898 : #if 0
899 : int kcopyd_cancel(struct kcopyd_job *job, int block)
900 : {
901 : /* FIXME: finish */
902 : return -1;
903 : }
904 : #endif /* 0 */
905 :
906 : /*-----------------------------------------------------------------
907 : * Client setup
908 : *---------------------------------------------------------------*/
909 0 : struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
910 : {
911 0 : int r;
912 0 : unsigned reserve_pages;
913 0 : struct dm_kcopyd_client *kc;
914 :
915 0 : kc = kzalloc(sizeof(*kc), GFP_KERNEL);
916 0 : if (!kc)
917 0 : return ERR_PTR(-ENOMEM);
918 :
919 0 : spin_lock_init(&kc->job_lock);
920 0 : INIT_LIST_HEAD(&kc->callback_jobs);
921 0 : INIT_LIST_HEAD(&kc->complete_jobs);
922 0 : INIT_LIST_HEAD(&kc->io_jobs);
923 0 : INIT_LIST_HEAD(&kc->pages_jobs);
924 0 : kc->throttle = throttle;
925 :
926 0 : r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache);
927 0 : if (r)
928 0 : goto bad_slab;
929 :
930 0 : INIT_WORK(&kc->kcopyd_work, do_work);
931 0 : kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
932 0 : if (!kc->kcopyd_wq) {
933 0 : r = -ENOMEM;
934 0 : goto bad_workqueue;
935 : }
936 :
937 0 : kc->sub_job_size = dm_get_kcopyd_subjob_size();
938 0 : reserve_pages = DIV_ROUND_UP(kc->sub_job_size << SECTOR_SHIFT, PAGE_SIZE);
939 :
940 0 : kc->pages = NULL;
941 0 : kc->nr_reserved_pages = kc->nr_free_pages = 0;
942 0 : r = client_reserve_pages(kc, reserve_pages);
943 0 : if (r)
944 0 : goto bad_client_pages;
945 :
946 0 : kc->io_client = dm_io_client_create();
947 0 : if (IS_ERR(kc->io_client)) {
948 0 : r = PTR_ERR(kc->io_client);
949 0 : goto bad_io_client;
950 : }
951 :
952 0 : init_waitqueue_head(&kc->destroyq);
953 0 : atomic_set(&kc->nr_jobs, 0);
954 :
955 0 : return kc;
956 :
957 0 : bad_io_client:
958 0 : client_free_pages(kc);
959 0 : bad_client_pages:
960 0 : destroy_workqueue(kc->kcopyd_wq);
961 0 : bad_workqueue:
962 0 : mempool_exit(&kc->job_pool);
963 0 : bad_slab:
964 0 : kfree(kc);
965 :
966 0 : return ERR_PTR(r);
967 : }
968 : EXPORT_SYMBOL(dm_kcopyd_client_create);
969 :
970 0 : void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
971 : {
972 : /* Wait for completion of all jobs submitted by this client. */
973 0 : wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
974 :
975 0 : BUG_ON(!list_empty(&kc->callback_jobs));
976 0 : BUG_ON(!list_empty(&kc->complete_jobs));
977 0 : BUG_ON(!list_empty(&kc->io_jobs));
978 0 : BUG_ON(!list_empty(&kc->pages_jobs));
979 0 : destroy_workqueue(kc->kcopyd_wq);
980 0 : dm_io_client_destroy(kc->io_client);
981 0 : client_free_pages(kc);
982 0 : mempool_exit(&kc->job_pool);
983 0 : kfree(kc);
984 0 : }
985 : EXPORT_SYMBOL(dm_kcopyd_client_destroy);