io_uring: add support for link with drain
To support link with drain, two things need to be done.

Consider the following sqes:

     0     1     2     3     4     5     6
  +-----+-----+-----+-----+-----+-----+-----+
  |  N  |  L  |  L  | L+D |  N  |  N  |  N  |
  +-----+-----+-----+-----+-----+-----+-----+

First, we need to ensure that the I/O before the link has completed. An
easy way to do this is to set the drain flag on the head of the link list,
so that all subsequent I/O will be inserted into the defer_list.

         +-----+
     (0) |  N  |
         +-----+
            |          (2)         (3)         (4)
         +-----+     +-----+     +-----+     +-----+
     (1) | L+D | --> |  L  | --> | L+D | --> |  N  |
         +-----+     +-----+     +-----+     +-----+
            |
         +-----+
     (5) |  N  |
         +-----+
            |
         +-----+
     (6) |  N  |
         +-----+

Second, we need to ensure that the I/O following the link is not completed
first. An easy way to do this is to create a mirror (shadow) of the drain
I/O and insert it into the defer_list. That way, as long as the drain I/O
has not been processed, the following I/O in the defer_list will not be
processed either.

         +-----+
     (0) |  N  |
         +-----+
            |          (2)         (3)         (4)
         +-----+     +-----+     +-----+     +-----+
     (1) | L+D | --> |  L  | --> | L+D | --> |  N  |
         +-----+     +-----+     +-----+     +-----+
            |
         +-----+
    ('3) |  D  |   <== This is a shadow of (3)
         +-----+
            |
         +-----+
     (5) |  N  |
         +-----+
            |
         +-----+
     (6) |  N  |
         +-----+

Signed-off-by: Jackie Liu <liuyun01@kylinos.cn>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
This commit is contained in:
Родитель
8776f3fa15
Коммит
4fe2c96315
112
fs/io_uring.c
112
fs/io_uring.c
|
@ -312,6 +312,7 @@ struct io_kiocb {
|
||||||
#define REQ_F_LINK 64 /* linked sqes */
|
#define REQ_F_LINK 64 /* linked sqes */
|
||||||
#define REQ_F_LINK_DONE 128 /* linked sqes done */
|
#define REQ_F_LINK_DONE 128 /* linked sqes done */
|
||||||
#define REQ_F_FAIL_LINK 256 /* fail rest of links */
|
#define REQ_F_FAIL_LINK 256 /* fail rest of links */
|
||||||
|
#define REQ_F_SHADOW_DRAIN 512 /* link-drain shadow req */
|
||||||
u64 user_data;
|
u64 user_data;
|
||||||
u32 result;
|
u32 result;
|
||||||
u32 sequence;
|
u32 sequence;
|
||||||
|
@ -343,6 +344,7 @@ struct io_submit_state {
|
||||||
};
|
};
|
||||||
|
|
||||||
static void io_sq_wq_submit_work(struct work_struct *work);
|
static void io_sq_wq_submit_work(struct work_struct *work);
|
||||||
|
static void __io_free_req(struct io_kiocb *req);
|
||||||
|
|
||||||
static struct kmem_cache *req_cachep;
|
static struct kmem_cache *req_cachep;
|
||||||
|
|
||||||
|
@ -448,6 +450,11 @@ static void io_commit_cqring(struct io_ring_ctx *ctx)
|
||||||
__io_commit_cqring(ctx);
|
__io_commit_cqring(ctx);
|
||||||
|
|
||||||
while ((req = io_get_deferred_req(ctx)) != NULL) {
|
while ((req = io_get_deferred_req(ctx)) != NULL) {
|
||||||
|
if (req->flags & REQ_F_SHADOW_DRAIN) {
|
||||||
|
/* Just for drain, free it. */
|
||||||
|
__io_free_req(req);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
req->flags |= REQ_F_IO_DRAINED;
|
req->flags |= REQ_F_IO_DRAINED;
|
||||||
queue_work(ctx->sqo_wq, &req->work);
|
queue_work(ctx->sqo_wq, &req->work);
|
||||||
}
|
}
|
||||||
|
@ -2015,10 +2022,14 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
|
||||||
flags = READ_ONCE(s->sqe->flags);
|
flags = READ_ONCE(s->sqe->flags);
|
||||||
fd = READ_ONCE(s->sqe->fd);
|
fd = READ_ONCE(s->sqe->fd);
|
||||||
|
|
||||||
if (flags & IOSQE_IO_DRAIN) {
|
if (flags & IOSQE_IO_DRAIN)
|
||||||
req->flags |= REQ_F_IO_DRAIN;
|
req->flags |= REQ_F_IO_DRAIN;
|
||||||
|
/*
|
||||||
|
* All io need to record the previous position; if LINK vs DRAIN,
|
||||||
|
* it can be used to mark the position of the first IO in the
|
||||||
|
* link list.
|
||||||
|
*/
|
||||||
req->sequence = s->sequence;
|
req->sequence = s->sequence;
|
||||||
}
|
|
||||||
|
|
||||||
if (!io_op_needs_file(s->sqe))
|
if (!io_op_needs_file(s->sqe))
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -2040,20 +2051,11 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
|
static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
|
||||||
struct sqe_submit *s)
|
struct sqe_submit *s)
|
||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
ret = io_req_defer(ctx, req, s->sqe);
|
|
||||||
if (ret) {
|
|
||||||
if (ret != -EIOCBQUEUED) {
|
|
||||||
io_free_req(req);
|
|
||||||
io_cqring_add_event(ctx, s->sqe->user_data, ret);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = __io_submit_sqe(ctx, req, s, true);
|
ret = __io_submit_sqe(ctx, req, s, true);
|
||||||
if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
|
if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
|
||||||
struct io_uring_sqe *sqe_copy;
|
struct io_uring_sqe *sqe_copy;
|
||||||
|
@ -2096,6 +2098,64 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
|
||||||
|
struct sqe_submit *s)
|
||||||
|
{
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
ret = io_req_defer(ctx, req, s->sqe);
|
||||||
|
if (ret) {
|
||||||
|
if (ret != -EIOCBQUEUED) {
|
||||||
|
io_free_req(req);
|
||||||
|
io_cqring_add_event(ctx, s->sqe->user_data, ret);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return __io_queue_sqe(ctx, req, s);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
|
||||||
|
struct sqe_submit *s, struct io_kiocb *shadow)
|
||||||
|
{
|
||||||
|
int ret;
|
||||||
|
int need_submit = false;
|
||||||
|
|
||||||
|
if (!shadow)
|
||||||
|
return io_queue_sqe(ctx, req, s);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Mark the first IO in link list as DRAIN, let all the following
|
||||||
|
* IOs enter the defer list. all IO needs to be completed before link
|
||||||
|
* list.
|
||||||
|
*/
|
||||||
|
req->flags |= REQ_F_IO_DRAIN;
|
||||||
|
ret = io_req_defer(ctx, req, s->sqe);
|
||||||
|
if (ret) {
|
||||||
|
if (ret != -EIOCBQUEUED) {
|
||||||
|
io_free_req(req);
|
||||||
|
io_cqring_add_event(ctx, s->sqe->user_data, ret);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
/*
|
||||||
|
* If ret == 0 means that all IOs in front of link io are
|
||||||
|
* running done. let's queue link head.
|
||||||
|
*/
|
||||||
|
need_submit = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Insert shadow req to defer_list, blocking next IOs */
|
||||||
|
spin_lock_irq(&ctx->completion_lock);
|
||||||
|
list_add_tail(&shadow->list, &ctx->defer_list);
|
||||||
|
spin_unlock_irq(&ctx->completion_lock);
|
||||||
|
|
||||||
|
if (need_submit)
|
||||||
|
return __io_queue_sqe(ctx, req, s);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
#define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
|
#define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
|
||||||
|
|
||||||
static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
|
static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
|
||||||
|
@ -2241,6 +2301,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
|
||||||
{
|
{
|
||||||
struct io_submit_state state, *statep = NULL;
|
struct io_submit_state state, *statep = NULL;
|
||||||
struct io_kiocb *link = NULL;
|
struct io_kiocb *link = NULL;
|
||||||
|
struct io_kiocb *shadow_req = NULL;
|
||||||
bool prev_was_link = false;
|
bool prev_was_link = false;
|
||||||
int i, submitted = 0;
|
int i, submitted = 0;
|
||||||
|
|
||||||
|
@ -2255,11 +2316,20 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
|
||||||
* that's the end of the chain. Submit the previous link.
|
* that's the end of the chain. Submit the previous link.
|
||||||
*/
|
*/
|
||||||
if (!prev_was_link && link) {
|
if (!prev_was_link && link) {
|
||||||
io_queue_sqe(ctx, link, &link->submit);
|
io_queue_link_head(ctx, link, &link->submit, shadow_req);
|
||||||
link = NULL;
|
link = NULL;
|
||||||
}
|
}
|
||||||
prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
|
prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
|
||||||
|
|
||||||
|
if (link && (sqes[i].sqe->flags & IOSQE_IO_DRAIN)) {
|
||||||
|
if (!shadow_req) {
|
||||||
|
shadow_req = io_get_req(ctx, NULL);
|
||||||
|
shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
|
||||||
|
refcount_dec(&shadow_req->refs);
|
||||||
|
}
|
||||||
|
shadow_req->sequence = sqes[i].sequence;
|
||||||
|
}
|
||||||
|
|
||||||
if (unlikely(mm_fault)) {
|
if (unlikely(mm_fault)) {
|
||||||
io_cqring_add_event(ctx, sqes[i].sqe->user_data,
|
io_cqring_add_event(ctx, sqes[i].sqe->user_data,
|
||||||
-EFAULT);
|
-EFAULT);
|
||||||
|
@ -2273,7 +2343,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (link)
|
if (link)
|
||||||
io_queue_sqe(ctx, link, &link->submit);
|
io_queue_link_head(ctx, link, &link->submit, shadow_req);
|
||||||
if (statep)
|
if (statep)
|
||||||
io_submit_state_end(&state);
|
io_submit_state_end(&state);
|
||||||
|
|
||||||
|
@ -2409,6 +2479,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
|
||||||
{
|
{
|
||||||
struct io_submit_state state, *statep = NULL;
|
struct io_submit_state state, *statep = NULL;
|
||||||
struct io_kiocb *link = NULL;
|
struct io_kiocb *link = NULL;
|
||||||
|
struct io_kiocb *shadow_req = NULL;
|
||||||
bool prev_was_link = false;
|
bool prev_was_link = false;
|
||||||
int i, submit = 0;
|
int i, submit = 0;
|
||||||
|
|
||||||
|
@ -2428,11 +2499,20 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
|
||||||
* that's the end of the chain. Submit the previous link.
|
* that's the end of the chain. Submit the previous link.
|
||||||
*/
|
*/
|
||||||
if (!prev_was_link && link) {
|
if (!prev_was_link && link) {
|
||||||
io_queue_sqe(ctx, link, &link->submit);
|
io_queue_link_head(ctx, link, &link->submit, shadow_req);
|
||||||
link = NULL;
|
link = NULL;
|
||||||
}
|
}
|
||||||
prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
|
prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
|
||||||
|
|
||||||
|
if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) {
|
||||||
|
if (!shadow_req) {
|
||||||
|
shadow_req = io_get_req(ctx, NULL);
|
||||||
|
shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
|
||||||
|
refcount_dec(&shadow_req->refs);
|
||||||
|
}
|
||||||
|
shadow_req->sequence = s.sequence;
|
||||||
|
}
|
||||||
|
|
||||||
s.has_user = true;
|
s.has_user = true;
|
||||||
s.needs_lock = false;
|
s.needs_lock = false;
|
||||||
s.needs_fixed_file = false;
|
s.needs_fixed_file = false;
|
||||||
|
@ -2442,7 +2522,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
|
||||||
io_commit_sqring(ctx);
|
io_commit_sqring(ctx);
|
||||||
|
|
||||||
if (link)
|
if (link)
|
||||||
io_queue_sqe(ctx, link, &link->submit);
|
io_queue_link_head(ctx, link, &link->submit, shadow_req);
|
||||||
if (statep)
|
if (statep)
|
||||||
io_submit_state_end(statep);
|
io_submit_state_end(statep);
|
||||||
|
|
||||||
|
|
Загрузка…
Ссылка в новой задаче