Line data Source code
1 : /* SPDX-License-Identifier: GPL-2.0-or-later */
2 : /*
3 : * Definitions for the 'struct ptr_ring' datastructure.
4 : *
5 : * Author:
6 : * Michael S. Tsirkin <mst@redhat.com>
7 : *
8 : * Copyright (C) 2016 Red Hat, Inc.
9 : *
10 : * This is a limited-size FIFO maintaining pointers in FIFO order, with
11 : * one CPU producing entries and another CPU consuming them.
12 : *
13 : * This implementation tries to minimize cache-contention when there is a
14 : * single producer and a single consumer CPU.
15 : */
16 :
17 : #ifndef _LINUX_PTR_RING_H
18 : #define _LINUX_PTR_RING_H 1
19 :
20 : #ifdef __KERNEL__
21 : #include <linux/spinlock.h>
22 : #include <linux/cache.h>
23 : #include <linux/types.h>
24 : #include <linux/compiler.h>
25 : #include <linux/slab.h>
26 : #include <linux/mm.h>
27 : #include <asm/errno.h>
28 : #endif
29 :
30 : struct ptr_ring {
31 : int producer ____cacheline_aligned_in_smp;
32 : spinlock_t producer_lock;
33 : int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
34 : int consumer_tail; /* next entry to invalidate */
35 : spinlock_t consumer_lock;
36 : /* Shared consumer/producer data */
37 : /* Read-only by both the producer and the consumer */
38 : int size ____cacheline_aligned_in_smp; /* max entries in queue */
39 : int batch; /* number of entries to consume in a batch */
40 : void **queue;
41 : };
42 :
43 : /* Note: callers invoking this in a loop must use a compiler barrier,
44 : * for example cpu_relax().
45 : *
46 : * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
47 : * see e.g. ptr_ring_full.
48 : */
49 : static inline bool __ptr_ring_full(struct ptr_ring *r)
50 : {
51 : return r->queue[r->producer];
52 : }
53 :
54 : static inline bool ptr_ring_full(struct ptr_ring *r)
55 : {
56 : bool ret;
57 :
58 : spin_lock(&r->producer_lock);
59 : ret = __ptr_ring_full(r);
60 : spin_unlock(&r->producer_lock);
61 :
62 : return ret;
63 : }
64 :
65 : static inline bool ptr_ring_full_irq(struct ptr_ring *r)
66 : {
67 : bool ret;
68 :
69 : spin_lock_irq(&r->producer_lock);
70 : ret = __ptr_ring_full(r);
71 : spin_unlock_irq(&r->producer_lock);
72 :
73 : return ret;
74 : }
75 :
76 : static inline bool ptr_ring_full_any(struct ptr_ring *r)
77 : {
78 : unsigned long flags;
79 : bool ret;
80 :
81 : spin_lock_irqsave(&r->producer_lock, flags);
82 : ret = __ptr_ring_full(r);
83 : spin_unlock_irqrestore(&r->producer_lock, flags);
84 :
85 : return ret;
86 : }
87 :
88 : static inline bool ptr_ring_full_bh(struct ptr_ring *r)
89 : {
90 : bool ret;
91 :
92 : spin_lock_bh(&r->producer_lock);
93 : ret = __ptr_ring_full(r);
94 : spin_unlock_bh(&r->producer_lock);
95 :
96 : return ret;
97 : }
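/*
 * Editorial example, not part of the kernel header: a minimal sketch of
 * how a single producer can poll for free space, per the note above that
 * loops around the full test need a compiler barrier such as cpu_relax().
 * The locked ptr_ring_full() variant is used so producer_lock does not
 * have to be held across the loop; the function name is hypothetical.
 */
static inline void example_wait_not_full(struct ptr_ring *r)
{
	/* With a single producer, space freed by the consumer stays free. */
	while (ptr_ring_full(r))
		cpu_relax();
}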
98 :
99 : /* Note: callers invoking this in a loop must use a compiler barrier,
100 : * for example cpu_relax(). Callers must hold producer_lock.
101 : * Callers are responsible for making sure the pointer being queued
102 : * points to valid data.
103 : */
104 448 : static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
105 : {
106 448 : if (unlikely(!r->size) || r->queue[r->producer])
107 : return -ENOSPC;
108 :
109 : /* Make sure the pointer we are storing points to valid data. */
110 : /* Pairs with the dependency ordering in __ptr_ring_consume. */
111 448 : smp_wmb();
112 :
113 448 : WRITE_ONCE(r->queue[r->producer++], ptr);
114 448 : if (unlikely(r->producer >= r->size))
115 0 : r->producer = 0;
116 : return 0;
117 : }
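/*
 * Editorial example: queueing a caller-provided array of pointers while
 * taking producer_lock only once, using the unlocked __ptr_ring_produce().
 * A minimal sketch assuming process context; the function name is
 * hypothetical. Returns the number of pointers actually queued.
 */
static inline int example_produce_array(struct ptr_ring *r, void **array, int n)
{
	int i;

	spin_lock(&r->producer_lock);
	for (i = 0; i < n; i++)
		if (__ptr_ring_produce(r, array[i]))
			break;	/* -ENOSPC: the ring is full, stop early */
	spin_unlock(&r->producer_lock);

	return i;
}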
118 :
119 : /*
120 : * Note: resize (below) nests producer lock within consumer lock, so if you
121 : * consume in interrupt or BH context, you must disable interrupts/BH when
122 : * calling this.
123 : */
124 448 : static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
125 : {
126 448 : int ret;
127 :
128 448 : spin_lock(&r->producer_lock);
129 448 : ret = __ptr_ring_produce(r, ptr);
130 448 : spin_unlock(&r->producer_lock);
131 :
132 448 : return ret;
133 : }
134 :
135 : static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
136 : {
137 : int ret;
138 :
139 : spin_lock_irq(&r->producer_lock);
140 : ret = __ptr_ring_produce(r, ptr);
141 : spin_unlock_irq(&r->producer_lock);
142 :
143 : return ret;
144 : }
145 :
146 : static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
147 : {
148 : unsigned long flags;
149 : int ret;
150 :
151 : spin_lock_irqsave(&r->producer_lock, flags);
152 : ret = __ptr_ring_produce(r, ptr);
153 : spin_unlock_irqrestore(&r->producer_lock, flags);
154 :
155 : return ret;
156 : }
157 :
158 : static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
159 : {
160 : int ret;
161 :
162 : spin_lock_bh(&r->producer_lock);
163 : ret = __ptr_ring_produce(r, ptr);
164 : spin_unlock_bh(&r->producer_lock);
165 :
166 : return ret;
167 : }
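/*
 * Editorial example: per the lock-nesting note above, when entries are
 * consumed from BH context (e.g. a NAPI poll routine), a process-context
 * producer must disable BH while producing so that a concurrent resize -
 * which nests producer_lock inside consumer_lock - cannot deadlock.
 * A minimal sketch; the function name is hypothetical.
 */
static inline int example_enqueue_for_bh_consumer(struct ptr_ring *r, void *ptr)
{
	return ptr_ring_produce_bh(r, ptr);	/* producer_lock taken with BH disabled */
}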
168 :
169 448 : static inline void *__ptr_ring_peek(struct ptr_ring *r)
170 : {
171 0 : if (likely(r->size))
172 0 : return READ_ONCE(r->queue[r->consumer_head]);
173 : return NULL;
174 : }
175 :
176 : /*
177 : * Test ring empty status without taking any locks.
178 : *
179 : * NB: This is only safe to call if the ring is never resized.
180 : *
181 : * However, if some other CPU consumes ring entries at the same time, the value
182 : * returned is not guaranteed to be correct.
183 : *
184 : * In this case - to avoid incorrectly detecting the ring
185 : * as empty - the CPU consuming the ring entries is responsible
186 : * for either consuming all ring entries until the ring is empty,
187 : * or synchronizing with some other CPU and causing it to
188 : * re-test __ptr_ring_empty and/or consume the ring entries
189 : * after the synchronization point.
190 : *
191 : * Note: callers invoking this in a loop must use a compiler barrier,
192 : * for example cpu_relax().
193 : */
194 2591 : static inline bool __ptr_ring_empty(struct ptr_ring *r)
195 : {
196 2591 : if (likely(r->size))
197 2591 : return !r->queue[READ_ONCE(r->consumer_head)];
198 : return true;
199 : }
200 :
201 : static inline bool ptr_ring_empty(struct ptr_ring *r)
202 : {
203 : bool ret;
204 :
205 : spin_lock(&r->consumer_lock);
206 : ret = __ptr_ring_empty(r);
207 : spin_unlock(&r->consumer_lock);
208 :
209 : return ret;
210 : }
211 :
212 : static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
213 : {
214 : bool ret;
215 :
216 : spin_lock_irq(&r->consumer_lock);
217 : ret = __ptr_ring_empty(r);
218 : spin_unlock_irq(&r->consumer_lock);
219 :
220 : return ret;
221 : }
222 :
223 : static inline bool ptr_ring_empty_any(struct ptr_ring *r)
224 : {
225 : unsigned long flags;
226 : bool ret;
227 :
228 : spin_lock_irqsave(&r->consumer_lock, flags);
229 : ret = __ptr_ring_empty(r);
230 : spin_unlock_irqrestore(&r->consumer_lock, flags);
231 :
232 : return ret;
233 : }
234 :
235 : static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
236 : {
237 : bool ret;
238 :
239 : spin_lock_bh(&r->consumer_lock);
240 : ret = __ptr_ring_empty(r);
241 : spin_unlock_bh(&r->consumer_lock);
242 :
243 : return ret;
244 : }
245 :
246 : /* Must only be called after __ptr_ring_peek returned !NULL */
247 448 : static inline void __ptr_ring_discard_one(struct ptr_ring *r)
248 : {
249 : /* Fundamentally, what we want to do is update consumer
250 : * index and zero out the entry so producer can reuse it.
251 : * Doing it naively at each consume would be as simple as:
252 : * consumer = r->consumer;
253 : * r->queue[consumer++] = NULL;
254 : * if (unlikely(consumer >= r->size))
255 : * consumer = 0;
256 : * r->consumer = consumer;
257 : * but that is suboptimal when the ring is full as producer is writing
258 : * out new entries in the same cache line. Defer these updates until a
259 : * batch of entries has been consumed.
260 : */
261 : /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
262 : * to work correctly.
263 : */
264 448 : int consumer_head = r->consumer_head;
265 448 : int head = consumer_head++;
266 :
267 : /* Once we have processed enough entries, invalidate them in
268 : * the ring all at once so the producer can reuse their space.
269 : * We also do this when we reach the end of the ring - not mandatory,
270 : * but it helps keep the implementation simple.
271 : */
272 448 : if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
273 : consumer_head >= r->size)) {
274 : /* Zero out entries in reverse order: this way the cache line
275 : * that the producer might currently be reading is touched last;
276 : * the producer won't make progress and touch other cache lines
277 : * besides the first one until we have written out all entries.
278 : */
279 459 : while (likely(head >= r->consumer_tail))
280 432 : r->queue[head--] = NULL;
281 27 : r->consumer_tail = consumer_head;
282 : }
283 448 : if (unlikely(consumer_head >= r->size)) {
284 0 : consumer_head = 0;
285 0 : r->consumer_tail = 0;
286 : }
287 : /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
288 448 : WRITE_ONCE(r->consumer_head, consumer_head);
289 448 : }
290 :
291 448 : static inline void *__ptr_ring_consume(struct ptr_ring *r)
292 : {
293 448 : void *ptr;
294 :
295 : /* The READ_ONCE in __ptr_ring_peek guarantees that anyone
296 : * accessing data through the pointer is up to date. Pairs
297 : * with smp_wmb in __ptr_ring_produce.
298 : */
299 448 : ptr = __ptr_ring_peek(r);
300 448 : if (ptr)
301 448 : __ptr_ring_discard_one(r);
302 :
303 448 : return ptr;
304 : }
305 :
306 : static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
307 : void **array, int n)
308 : {
309 : void *ptr;
310 : int i;
311 :
312 : for (i = 0; i < n; i++) {
313 : ptr = __ptr_ring_consume(r);
314 : if (!ptr)
315 : break;
316 : array[i] = ptr;
317 : }
318 :
319 : return i;
320 : }
321 :
322 : /*
323 : * Note: resize (below) nests producer lock within consumer lock, so if you
324 : * call this in interrupt or BH context, you must disable interrupts/BH when
325 : * producing.
326 : */
327 : static inline void *ptr_ring_consume(struct ptr_ring *r)
328 : {
329 : void *ptr;
330 :
331 : spin_lock(&r->consumer_lock);
332 : ptr = __ptr_ring_consume(r);
333 : spin_unlock(&r->consumer_lock);
334 :
335 : return ptr;
336 : }
337 :
338 : static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
339 : {
340 : void *ptr;
341 :
342 : spin_lock_irq(&r->consumer_lock);
343 : ptr = __ptr_ring_consume(r);
344 : spin_unlock_irq(&r->consumer_lock);
345 :
346 : return ptr;
347 : }
348 :
349 : static inline void *ptr_ring_consume_any(struct ptr_ring *r)
350 : {
351 : unsigned long flags;
352 : void *ptr;
353 :
354 : spin_lock_irqsave(&r->consumer_lock, flags);
355 : ptr = __ptr_ring_consume(r);
356 : spin_unlock_irqrestore(&r->consumer_lock, flags);
357 :
358 : return ptr;
359 : }
360 :
361 : static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
362 : {
363 : void *ptr;
364 :
365 : spin_lock_bh(&r->consumer_lock);
366 : ptr = __ptr_ring_consume(r);
367 : spin_unlock_bh(&r->consumer_lock);
368 :
369 : return ptr;
370 : }
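/*
 * Editorial example: a single-consumer wait loop that uses the lockless
 * __ptr_ring_empty() as a cheap "anything there?" test, assuming the
 * ring is never resized (see the note before __ptr_ring_empty) and that
 * this CPU is the only consumer. cpu_relax() provides the compiler
 * barrier required between retries; the function name is hypothetical.
 */
static inline void *example_wait_and_consume(struct ptr_ring *r)
{
	while (__ptr_ring_empty(r))
		cpu_relax();

	/* Sole consumer: the entry seen above cannot disappear under us. */
	return ptr_ring_consume(r);
}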
371 :
372 : static inline int ptr_ring_consume_batched(struct ptr_ring *r,
373 : void **array, int n)
374 : {
375 : int ret;
376 :
377 : spin_lock(&r->consumer_lock);
378 : ret = __ptr_ring_consume_batched(r, array, n);
379 : spin_unlock(&r->consumer_lock);
380 :
381 : return ret;
382 : }
383 :
384 : static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
385 : void **array, int n)
386 : {
387 : int ret;
388 :
389 : spin_lock_irq(&r->consumer_lock);
390 : ret = __ptr_ring_consume_batched(r, array, n);
391 : spin_unlock_irq(&r->consumer_lock);
392 :
393 : return ret;
394 : }
395 :
396 : static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
397 : void **array, int n)
398 : {
399 : unsigned long flags;
400 : int ret;
401 :
402 : spin_lock_irqsave(&r->consumer_lock, flags);
403 : ret = __ptr_ring_consume_batched(r, array, n);
404 : spin_unlock_irqrestore(&r->consumer_lock, flags);
405 :
406 : return ret;
407 : }
408 :
409 : static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
410 : void **array, int n)
411 : {
412 : int ret;
413 :
414 : spin_lock_bh(&r->consumer_lock);
415 : ret = __ptr_ring_consume_batched(r, array, n);
416 : spin_unlock_bh(&r->consumer_lock);
417 :
418 : return ret;
419 : }
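/*
 * Editorial example: draining up to a fixed budget of entries with a
 * single consumer_lock acquisition and processing them outside the lock.
 * A minimal sketch assuming process context; EXAMPLE_BUDGET and the
 * function/callback names are hypothetical.
 */
#define EXAMPLE_BUDGET 64

static inline int example_drain_budget(struct ptr_ring *r,
				       void (*process)(void *))
{
	void *batch[EXAMPLE_BUDGET];
	int n, i;

	n = ptr_ring_consume_batched(r, batch, EXAMPLE_BUDGET);
	for (i = 0; i < n; i++)
		process(batch[i]);

	return n;
}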
420 :
421 : /* Cast to structure type and call a function without discarding from FIFO.
422 : * Function must return a value.
423 : * Callers must take consumer_lock.
424 : */
425 : #define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))
426 :
427 : #define PTR_RING_PEEK_CALL(r, f) ({ \
428 : typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
429 : \
430 : spin_lock(&(r)->consumer_lock); \
431 : __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
432 : spin_unlock(&(r)->consumer_lock); \
433 : __PTR_RING_PEEK_CALL_v; \
434 : })
435 :
436 : #define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
437 : typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
438 : \
439 : spin_lock_irq(&(r)->consumer_lock); \
440 : __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
441 : spin_unlock_irq(&(r)->consumer_lock); \
442 : __PTR_RING_PEEK_CALL_v; \
443 : })
444 :
445 : #define PTR_RING_PEEK_CALL_BH(r, f) ({ \
446 : typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
447 : \
448 : spin_lock_bh(&(r)->consumer_lock); \
449 : __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
450 : spin_unlock_bh(&(r)->consumer_lock); \
451 : __PTR_RING_PEEK_CALL_v; \
452 : })
453 :
454 : #define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
455 : typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
456 : unsigned long __PTR_RING_PEEK_CALL_f;\
457 : \
458 : spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
459 : __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
460 : spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
461 : __PTR_RING_PEEK_CALL_v; \
462 : })
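/*
 * Editorial example: peeking at the size of the head entry without
 * consuming it, similar to how the skb_array wrapper peeks at queued skb
 * lengths. The element type and function names are hypothetical. Note
 * that the callback must tolerate a NULL argument, because
 * __ptr_ring_peek() returns NULL when the ring is empty.
 */
struct example_elem {
	size_t len;
};

static inline size_t example_elem_len(void *ptr)
{
	struct example_elem *e = ptr;

	return e ? e->len : 0;
}

static inline size_t example_peek_len(struct ptr_ring *r)
{
	return PTR_RING_PEEK_CALL(r, example_elem_len);
}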
463 :
464 : /* Not all gfp_t flags (besides GFP_KERNEL) are allowed; see the
465 : * vmalloc documentation for which of them are legal.
466 : */
467 3 : static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
468 : {
469 3 : if (size > KMALLOC_MAX_SIZE / sizeof(void *))
470 : return NULL;
471 3 : return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
472 : }
473 :
474 3 : static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
475 : {
476 3 : r->size = size;
477 3 : r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
478 : /* We need to set batch to at least 1 for the logic
479 : * in __ptr_ring_discard_one to work correctly.
480 : * Batching too much (because the ring is small) would cause a lot of
481 : * burstiness. Needs tuning; for now, disable batching for small rings.
482 : */
483 3 : if (r->batch > r->size / 2 || !r->batch)
484 0 : r->batch = 1;
485 : }
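/*
 * Editorial note: with the common values SMP_CACHE_BYTES == 64 and
 * sizeof(void *) == 8, batch == 64 * 2 / 8 == 16, so the consumer NULLs
 * out consumed slots sixteen at a time; a ring smaller than 32 entries
 * falls back to batch == 1, i.e. no batching.
 */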
486 :
487 3 : static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
488 : {
489 3 : r->queue = __ptr_ring_init_queue_alloc(size, gfp);
490 3 : if (!r->queue)
491 : return -ENOMEM;
492 :
493 3 : __ptr_ring_set_size(r, size);
494 3 : r->producer = r->consumer_head = r->consumer_tail = 0;
495 3 : spin_lock_init(&r->producer_lock);
496 3 : spin_lock_init(&r->consumer_lock);
497 :
498 3 : return 0;
499 : }
500 :
501 : /*
502 : * Return entries into ring. Destroy entries that don't fit.
503 : *
504 : * Note: this is expected to be a rare slow path operation.
505 : *
506 : * Note: producer lock is nested within consumer lock, so if you
507 : * resize you must make sure all uses nest correctly.
508 : * In particular if you consume ring in interrupt or BH context, you must
509 : * disable interrupts/BH when doing so.
510 : */
511 : static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
512 : void (*destroy)(void *))
513 : {
514 : unsigned long flags;
515 : int head;
516 :
517 : spin_lock_irqsave(&r->consumer_lock, flags);
518 : spin_lock(&r->producer_lock);
519 :
520 : if (!r->size)
521 : goto done;
522 :
523 : /*
524 : * Clean out buffered entries (for simplicity). This way the following code
525 : * can test entries for NULL and, if non-NULL, assume they are valid.
526 : */
527 : head = r->consumer_head - 1;
528 : while (likely(head >= r->consumer_tail))
529 : r->queue[head--] = NULL;
530 : r->consumer_tail = r->consumer_head;
531 :
532 : /*
533 : * Go over entries in batch, start moving head back and copy entries.
534 : * Stop when we run into previously unconsumed entries.
535 : */
536 : while (n) {
537 : head = r->consumer_head - 1;
538 : if (head < 0)
539 : head = r->size - 1;
540 : if (r->queue[head]) {
541 : /* This batch entry will have to be destroyed. */
542 : goto done;
543 : }
544 : r->queue[head] = batch[--n];
545 : r->consumer_tail = head;
546 : /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
547 : WRITE_ONCE(r->consumer_head, head);
548 : }
549 :
550 : done:
551 : /* Destroy all entries left in the batch. */
552 : while (n)
553 : destroy(batch[--n]);
554 : spin_unlock(&r->producer_lock);
555 : spin_unlock_irqrestore(&r->consumer_lock, flags);
556 : }
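/*
 * Editorial example: pulling a batch, handing it to a downstream stage
 * and returning whatever was rejected - the kind of rare slow path
 * ptr_ring_unconsume() is intended for. The function and callback names
 * are hypothetical and the entries are assumed to have been kmalloc'ed,
 * so kfree() is a valid destructor for anything that no longer fits.
 */
static inline void example_destroy(void *ptr)
{
	kfree(ptr);
}

static inline void example_forward_or_requeue(struct ptr_ring *r,
					      int (*forward)(void **, int))
{
	void *batch[16];
	int n, used;

	n = ptr_ring_consume_batched_any(r, batch, 16);
	used = forward(batch, n);	/* returns how many entries were accepted */
	if (used < n)
		ptr_ring_unconsume(r, batch + used, n - used, example_destroy);
}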
557 :
558 0 : static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
559 : int size, gfp_t gfp,
560 : void (*destroy)(void *))
561 : {
562 0 : int producer = 0;
563 0 : void **old;
564 0 : void *ptr;
565 :
566 0 : while ((ptr = __ptr_ring_consume(r)))
567 0 : if (producer < size)
568 0 : queue[producer++] = ptr;
569 0 : else if (destroy)
570 0 : destroy(ptr);
571 :
572 0 : if (producer >= size)
573 0 : producer = 0;
574 0 : __ptr_ring_set_size(r, size);
575 0 : r->producer = producer;
576 0 : r->consumer_head = 0;
577 0 : r->consumer_tail = 0;
578 0 : old = r->queue;
579 0 : r->queue = queue;
580 :
581 0 : return old;
582 : }
583 :
584 : /*
585 : * Note: producer lock is nested within consumer lock, so if you
586 : * resize you must make sure all uses nest correctly.
587 : * In particular if you consume ring in interrupt or BH context, you must
588 : * disable interrupts/BH when doing so.
589 : */
590 : static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
591 : void (*destroy)(void *))
592 : {
593 : unsigned long flags;
594 : void **queue = __ptr_ring_init_queue_alloc(size, gfp);
595 : void **old;
596 :
597 : if (!queue)
598 : return -ENOMEM;
599 :
600 : spin_lock_irqsave(&(r)->consumer_lock, flags);
601 : spin_lock(&(r)->producer_lock);
602 :
603 : old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);
604 :
605 : spin_unlock(&(r)->producer_lock);
606 : spin_unlock_irqrestore(&(r)->consumer_lock, flags);
607 :
608 : kvfree(old);
609 :
610 : return 0;
611 : }
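/*
 * Editorial example: resizing a live ring. When growing, nothing is
 * dropped and the destroy callback may be NULL; when shrinking, entries
 * that no longer fit are handed to the callback. A minimal sketch
 * assuming process context and kmalloc'ed entries; the function names
 * are hypothetical.
 */
static inline void example_free_entry(void *ptr)
{
	kfree(ptr);
}

static inline int example_shrink_ring(struct ptr_ring *r, int new_size)
{
	return ptr_ring_resize(r, new_size, GFP_KERNEL, example_free_entry);
}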
612 :
613 : /*
614 : * Note: producer lock is nested within consumer lock, so if you
615 : * resize you must make sure all uses nest correctly.
616 : * In particular if you consume ring in interrupt or BH context, you must
617 : * disable interrupts/BH when doing so.
618 : */
619 0 : static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
620 : unsigned int nrings,
621 : int size,
622 : gfp_t gfp, void (*destroy)(void *))
623 : {
624 0 : unsigned long flags;
625 0 : void ***queues;
626 0 : int i;
627 :
628 0 : queues = kmalloc_array(nrings, sizeof(*queues), gfp);
629 0 : if (!queues)
630 0 : goto noqueues;
631 :
632 0 : for (i = 0; i < nrings; ++i) {
633 0 : queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
634 0 : if (!queues[i])
635 0 : goto nomem;
636 : }
637 :
638 0 : for (i = 0; i < nrings; ++i) {
639 0 : spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
640 0 : spin_lock(&(rings[i])->producer_lock);
641 0 : queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
642 : size, gfp, destroy);
643 0 : spin_unlock(&(rings[i])->producer_lock);
644 0 : spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
645 : }
646 :
647 0 : for (i = 0; i < nrings; ++i)
648 0 : kvfree(queues[i]);
649 :
650 0 : kfree(queues);
651 :
652 0 : return 0;
653 :
654 0 : nomem:
655 0 : while (--i >= 0)
656 0 : kvfree(queues[i]);
657 :
658 0 : kfree(queues);
659 :
660 : noqueues:
661 : return -ENOMEM;
662 : }
663 :
664 0 : static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
665 : {
666 0 : void *ptr;
667 :
668 0 : if (destroy)
669 : while ((ptr = ptr_ring_consume(r)))
670 : destroy(ptr);
671 0 : kvfree(r->queue);
672 0 : }
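/*
 * Editorial example: a minimal end-to-end lifecycle - init, produce,
 * consume, cleanup - assuming a single process-context producer and
 * consumer. All example_* names are hypothetical; the queued pointer
 * just stands in for a real object.
 */
static inline int example_ring_lifecycle(void)
{
	struct ptr_ring ring;
	int item = 42;
	void *out;
	int err;

	err = ptr_ring_init(&ring, 16, GFP_KERNEL);
	if (err)
		return err;

	err = ptr_ring_produce(&ring, &item);	/* -ENOSPC if the ring were full */
	out = err ? NULL : ptr_ring_consume(&ring);

	ptr_ring_cleanup(&ring, NULL);	/* no destroy callback needed here */
	return out ? 0 : -ENOENT;
}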
673 :
674 : #endif /* _LINUX_PTR_RING_H */