ptr_ring: add ptr_ring_unconsume

Applications that consume a batch of entries in one go
can benefit from ability to return some of them back
into the ring.

Add an API for that - assuming there's space. If there's no space
naturally can't do this and have to drop entries, but this implies ring
is full so we'd likely drop some anyway.

Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
Signed-off-by: Jason Wang <jasowang@redhat.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
Michael S. Tsirkin 2017-05-17 12:14:37 +08:00 коммит произвёл David S. Miller
Родитель 1fc4d180b3
Коммит 197a5212c3
1 изменённых файлов: 55 добавлений и 0 удалений

Просмотреть файл

@ -403,6 +403,61 @@ static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
return 0;
}
/*
* Return entries into ring. Destroy entries that don't fit.
*
* Note: this is expected to be a rare slow path operation.
*
* Note: producer lock is nested within consumer lock, so if you
* resize you must make sure all uses nest correctly.
* In particular if you consume ring in interrupt or BH context, you must
* disable interrupts/BH when doing so.
*/
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
void (*destroy)(void *))
{
unsigned long flags;
int head;
spin_lock_irqsave(&r->consumer_lock, flags);
spin_lock(&r->producer_lock);
if (!r->size)
goto done;
/*
* Clean out buffered entries (for simplicity). This way following code
* can test entries for NULL and if not assume they are valid.
*/
head = r->consumer_head - 1;
while (likely(head >= r->consumer_tail))
r->queue[head--] = NULL;
r->consumer_tail = r->consumer_head;
/*
* Go over entries in batch, start moving head back and copy entries.
* Stop when we run into previously unconsumed entries.
*/
while (n) {
head = r->consumer_head - 1;
if (head < 0)
head = r->size - 1;
if (r->queue[head]) {
/* This batch entry will have to be destroyed. */
goto done;
}
r->queue[head] = batch[--n];
r->consumer_tail = r->consumer_head = head;
}
done:
/* Destroy all entries left in the batch. */
while (n)
destroy(batch[--n]);
spin_unlock(&r->producer_lock);
spin_unlock_irqrestore(&r->consumer_lock, flags);
}
static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
int size, gfp_t gfp,
void (*destroy)(void *))