diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index 3e5efb2b236e..5903d2c0ab4d 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -24,6 +24,12 @@ #define RPC_MAX_SLOT_TABLE_LIMIT (65536U) #define RPC_MAX_SLOT_TABLE RPC_MAX_SLOT_TABLE_LIMIT +#define RPC_CWNDSHIFT (8U) +#define RPC_CWNDSCALE (1U << RPC_CWNDSHIFT) +#define RPC_INITCWND RPC_CWNDSCALE +#define RPC_MAXCWND(xprt) ((xprt)->max_reqs << RPC_CWNDSHIFT) +#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd) + /* * This describes a timeout strategy */ diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index d173f79947c6..2d1d5a643b95 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -71,24 +71,6 @@ static void xprt_destroy(struct rpc_xprt *xprt); static DEFINE_SPINLOCK(xprt_list_lock); static LIST_HEAD(xprt_list); -/* - * The transport code maintains an estimate on the maximum number of out- - * standing RPC requests, using a smoothed version of the congestion - * avoidance implemented in 44BSD. This is basically the Van Jacobson - * congestion algorithm: If a retransmit occurs, the congestion window is - * halved; otherwise, it is incremented by 1/cwnd when - * - * - a reply is received and - * - a full number of requests are outstanding and - * - the congestion window hasn't been updated recently. - */ -#define RPC_CWNDSHIFT (8U) -#define RPC_CWNDSCALE (1U << RPC_CWNDSHIFT) -#define RPC_INITCWND RPC_CWNDSCALE -#define RPC_MAXCWND(xprt) ((xprt)->max_reqs << RPC_CWNDSHIFT) - -#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd) - /** * xprt_register_transport - register a transport implementation * @transport: transport to register @@ -446,7 +428,15 @@ EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); * @task: recently completed RPC request used to adjust window * @result: result code of completed RPC request * - * We use a time-smoothed congestion estimator to avoid heavy oscillation. + * The transport code maintains an estimate on the maximum number of out- + * standing RPC requests, using a smoothed version of the congestion + * avoidance implemented in 44BSD. This is basically the Van Jacobson + * congestion algorithm: If a retransmit occurs, the congestion window is + * halved; otherwise, it is incremented by 1/cwnd when + * + * - a reply is received and + * - a full number of requests are outstanding and + * - the congestion window hasn't been updated recently. */ void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result) { diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 96ead526b125..693966d3f33b 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -78,8 +78,7 @@ static const char transfertypes[][12] = { * elements. Segments are then coalesced when registered, if possible * within the selected memreg mode. * - * Note, this routine is never called if the connection's memory - * registration strategy is 0 (bounce buffers). + * Returns positive number of segments converted, or a negative errno. */ static int @@ -102,10 +101,17 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, page_base = xdrbuf->page_base & ~PAGE_MASK; p = 0; while (len && n < nsegs) { + if (!ppages[p]) { + /* alloc the pagelist for receiving buffer */ + ppages[p] = alloc_page(GFP_ATOMIC); + if (!ppages[p]) + return -ENOMEM; + } seg[n].mr_page = ppages[p]; seg[n].mr_offset = (void *)(unsigned long) page_base; seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len); - BUG_ON(seg[n].mr_len > PAGE_SIZE); + if (seg[n].mr_len > PAGE_SIZE) + return -EIO; len -= seg[n].mr_len; ++n; ++p; @@ -114,7 +120,7 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, /* Message overflows the seg array */ if (len && n == nsegs) - return 0; + return -EIO; if (xdrbuf->tail[0].iov_len) { /* the rpcrdma protocol allows us to omit any trailing @@ -123,7 +129,7 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, return n; if (n == nsegs) /* Tail remains, but we're out of segments */ - return 0; + return -EIO; seg[n].mr_page = NULL; seg[n].mr_offset = xdrbuf->tail[0].iov_base; seg[n].mr_len = xdrbuf->tail[0].iov_len; @@ -164,15 +170,17 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, * Reply chunk (a counted array): * N elements: * 1 - N - HLOO - HLOO - ... - HLOO + * + * Returns positive RPC/RDMA header size, or negative errno. */ -static unsigned int +static ssize_t rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type) { struct rpcrdma_req *req = rpcr_to_rdmar(rqst); struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); - int nsegs, nchunks = 0; + int n, nsegs, nchunks = 0; unsigned int pos; struct rpcrdma_mr_seg *seg = req->rl_segments; struct rpcrdma_read_chunk *cur_rchunk = NULL; @@ -198,12 +206,11 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, pos = target->head[0].iov_len; nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS); - if (nsegs == 0) - return 0; + if (nsegs < 0) + return nsegs; do { - /* bind/register the memory, then build chunk from result. */ - int n = rpcrdma_register_external(seg, nsegs, + n = rpcrdma_register_external(seg, nsegs, cur_wchunk != NULL, r_xprt); if (n <= 0) goto out; @@ -248,10 +255,6 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, /* success. all failures return above */ req->rl_nchunks = nchunks; - BUG_ON(nchunks == 0); - BUG_ON((r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR) - && (nchunks > 3)); - /* * finish off header. If write, marshal discrim and nchunks. */ @@ -278,8 +281,8 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, out: for (pos = 0; nchunks--;) pos += rpcrdma_deregister_external( - &req->rl_segments[pos], r_xprt, NULL); - return 0; + &req->rl_segments[pos], r_xprt); + return n; } /* @@ -361,6 +364,8 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad) * [1] -- the RPC header/data, marshaled by RPC and the NFS protocol. * [2] -- optional padding. * [3] -- if padded, header only in [1] and data here. + * + * Returns zero on success, otherwise a negative errno. */ int @@ -370,7 +375,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); char *base; - size_t hdrlen, rpclen, padlen; + size_t rpclen, padlen; + ssize_t hdrlen; enum rpcrdma_chunktype rtype, wtype; struct rpcrdma_msg *headerp; @@ -441,14 +447,10 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) /* The following simplification is not true forever */ if (rtype != rpcrdma_noch && wtype == rpcrdma_replych) wtype = rpcrdma_noch; - BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch); - - if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS && - (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) { - /* forced to "pure inline"? */ - dprintk("RPC: %s: too much data (%d/%d) for inline\n", - __func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len); - return -1; + if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) { + dprintk("RPC: %s: cannot marshal multiple chunk lists\n", + __func__); + return -EIO; } hdrlen = 28; /*sizeof *headerp;*/ @@ -474,8 +476,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero; headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero; hdrlen += 2 * sizeof(u32); /* extra words in padhdr */ - BUG_ON(wtype != rpcrdma_noch); - + if (wtype != rpcrdma_noch) { + dprintk("RPC: %s: invalid chunk list\n", + __func__); + return -EIO; + } } else { headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero; headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero; @@ -492,8 +497,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) * on receive. Therefore, we request a reply chunk * for non-writes wherever feasible and efficient. */ - if (wtype == rpcrdma_noch && - r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER) + if (wtype == rpcrdma_noch) wtype = rpcrdma_replych; } } @@ -511,9 +515,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf, headerp, wtype); } - - if (hdrlen == 0) - return -1; + if (hdrlen < 0) + return hdrlen; dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd" " headerp 0x%p base 0x%p lkey 0x%x\n", @@ -680,15 +683,11 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) rqst->rq_private_buf = rqst->rq_rcv_buf; } -/* - * This function is called when an async event is posted to - * the connection which changes the connection state. All it - * does at this point is mark the connection up/down, the rpc - * timers do the rest. - */ void -rpcrdma_conn_func(struct rpcrdma_ep *ep) +rpcrdma_connect_worker(struct work_struct *work) { + struct rpcrdma_ep *ep = + container_of(work, struct rpcrdma_ep, rep_connect_worker.work); struct rpc_xprt *xprt = ep->rep_xprt; spin_lock_bh(&xprt->transport_lock); @@ -705,13 +704,15 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep) } /* - * This function is called when memory window unbind which we are waiting - * for completes. Just use rr_func (zeroed by upcall) to signal completion. + * This function is called when an async event is posted to + * the connection which changes the connection state. All it + * does at this point is mark the connection up/down, the rpc + * timers do the rest. */ -static void -rpcrdma_unbind_func(struct rpcrdma_rep *rep) +void +rpcrdma_conn_func(struct rpcrdma_ep *ep) { - wake_up(&rep->rr_unbind); + schedule_delayed_work(&ep->rep_connect_worker, 0); } /* @@ -728,7 +729,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) struct rpc_xprt *xprt = rep->rr_xprt; struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); __be32 *iptr; - int i, rdmalen, status; + int rdmalen, status; + unsigned long cwnd; /* Check status. If bad, signal disconnect and return rep to pool */ if (rep->rr_len == ~0U) { @@ -783,6 +785,7 @@ repost: /* from here on, the reply is no longer an orphan */ req->rl_reply = rep; + xprt->reestablish_timeout = 0; /* check for expected message types */ /* The order of some of these tests is important. */ @@ -857,26 +860,10 @@ badheader: break; } - /* If using mw bind, start the deregister process now. */ - /* (Note: if mr_free(), cannot perform it here, in tasklet context) */ - if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) { - case RPCRDMA_MEMWINDOWS: - for (i = 0; req->rl_nchunks-- > 1;) - i += rpcrdma_deregister_external( - &req->rl_segments[i], r_xprt, NULL); - /* Optionally wait (not here) for unbinds to complete */ - rep->rr_func = rpcrdma_unbind_func; - (void) rpcrdma_deregister_external(&req->rl_segments[i], - r_xprt, rep); - break; - case RPCRDMA_MEMWINDOWS_ASYNC: - for (i = 0; req->rl_nchunks--;) - i += rpcrdma_deregister_external(&req->rl_segments[i], - r_xprt, NULL); - break; - default: - break; - } + cwnd = xprt->cwnd; + xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT; + if (xprt->cwnd > cwnd) + xprt_release_rqst_cong(rqst->rq_task); dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", __func__, xprt, rqst, status); diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 1eb9c468d0c9..66f91f0d071a 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -149,6 +149,11 @@ static struct ctl_table sunrpc_table[] = { #endif +#define RPCRDMA_BIND_TO (60U * HZ) +#define RPCRDMA_INIT_REEST_TO (5U * HZ) +#define RPCRDMA_MAX_REEST_TO (30U * HZ) +#define RPCRDMA_IDLE_DISC_TO (5U * 60 * HZ) + static struct rpc_xprt_ops xprt_rdma_procs; /* forward reference */ static void @@ -229,7 +234,6 @@ static void xprt_rdma_destroy(struct rpc_xprt *xprt) { struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); - int rc; dprintk("RPC: %s: called\n", __func__); @@ -238,10 +242,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt) xprt_clear_connected(xprt); rpcrdma_buffer_destroy(&r_xprt->rx_buf); - rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia); - if (rc) - dprintk("RPC: %s: rpcrdma_ep_destroy returned %i\n", - __func__, rc); + rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia); rpcrdma_ia_close(&r_xprt->rx_ia); xprt_rdma_free_addresses(xprt); @@ -289,9 +290,9 @@ xprt_setup_rdma(struct xprt_create *args) /* 60 second timeout, no retries */ xprt->timeout = &xprt_rdma_default_timeout; - xprt->bind_timeout = (60U * HZ); - xprt->reestablish_timeout = (5U * HZ); - xprt->idle_timeout = (5U * 60 * HZ); + xprt->bind_timeout = RPCRDMA_BIND_TO; + xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; + xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO; xprt->resvport = 0; /* privileged port not needed */ xprt->tsh_size = 0; /* RPC-RDMA handles framing */ @@ -391,7 +392,7 @@ out4: xprt_rdma_free_addresses(xprt); rc = -EINVAL; out3: - (void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia); + rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia); out2: rpcrdma_ia_close(&new_xprt->rx_ia); out1: @@ -436,10 +437,10 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task) schedule_delayed_work(&r_xprt->rdma_connect, xprt->reestablish_timeout); xprt->reestablish_timeout <<= 1; - if (xprt->reestablish_timeout > (30 * HZ)) - xprt->reestablish_timeout = (30 * HZ); - else if (xprt->reestablish_timeout < (5 * HZ)) - xprt->reestablish_timeout = (5 * HZ); + if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO) + xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO; + else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO) + xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; } else { schedule_delayed_work(&r_xprt->rdma_connect, 0); if (!RPC_IS_ASYNC(task)) @@ -447,23 +448,6 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task) } } -static int -xprt_rdma_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) -{ - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); - int credits = atomic_read(&r_xprt->rx_buf.rb_credits); - - /* == RPC_CWNDSCALE @ init, but *after* setup */ - if (r_xprt->rx_buf.rb_cwndscale == 0UL) { - r_xprt->rx_buf.rb_cwndscale = xprt->cwnd; - dprintk("RPC: %s: cwndscale %lu\n", __func__, - r_xprt->rx_buf.rb_cwndscale); - BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0); - } - xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale; - return xprt_reserve_xprt_cong(xprt, task); -} - /* * The RDMA allocate/free functions need the task structure as a place * to hide the struct rpcrdma_req, which is necessary for the actual send/recv @@ -479,7 +463,8 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size) struct rpcrdma_req *req, *nreq; req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf); - BUG_ON(NULL == req); + if (req == NULL) + return NULL; if (size > req->rl_size) { dprintk("RPC: %s: size %zd too large for buffer[%zd]: " @@ -503,18 +488,6 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size) * If the allocation or registration fails, the RPC framework * will (doggedly) retry. */ - if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy == - RPCRDMA_BOUNCEBUFFERS) { - /* forced to "pure inline" */ - dprintk("RPC: %s: too much data (%zd) for inline " - "(r/w max %d/%d)\n", __func__, size, - rpcx_to_rdmad(xprt).inline_rsize, - rpcx_to_rdmad(xprt).inline_wsize); - size = req->rl_size; - rpc_exit(task, -EIO); /* fail the operation */ - rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++; - goto out; - } if (task->tk_flags & RPC_TASK_SWAPPER) nreq = kmalloc(sizeof *req + size, GFP_ATOMIC); else @@ -543,7 +516,6 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size) req = nreq; } dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req); -out: req->rl_connect_cookie = 0; /* our reserved value */ return req->rl_xdr_buf; @@ -579,9 +551,7 @@ xprt_rdma_free(void *buffer) __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : ""); /* - * Finish the deregistration. When using mw bind, this was - * begun in rpcrdma_reply_handler(). In all other modes, we - * do it here, in thread context. The process is considered + * Finish the deregistration. The process is considered * complete when the rr_func vector becomes NULL - this * was put in place during rpcrdma_reply_handler() - the wait * call below will not block if the dereg is "done". If @@ -590,12 +560,7 @@ xprt_rdma_free(void *buffer) for (i = 0; req->rl_nchunks;) { --req->rl_nchunks; i += rpcrdma_deregister_external( - &req->rl_segments[i], r_xprt, NULL); - } - - if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) { - rep->rr_func = NULL; /* abandon the callback */ - req->rl_reply = NULL; + &req->rl_segments[i], r_xprt); } if (req->rl_iov.length == 0) { /* see allocate above */ @@ -630,13 +595,12 @@ xprt_rdma_send_request(struct rpc_task *task) struct rpc_xprt *xprt = rqst->rq_xprt; struct rpcrdma_req *req = rpcr_to_rdmar(rqst); struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + int rc; - /* marshal the send itself */ - if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) { - r_xprt->rx_stats.failed_marshal_count++; - dprintk("RPC: %s: rpcrdma_marshal_req failed\n", - __func__); - return -EIO; + if (req->rl_niovs == 0) { + rc = rpcrdma_marshal_req(rqst); + if (rc < 0) + goto failed_marshal; } if (req->rl_reply == NULL) /* e.g. reconnection */ @@ -660,6 +624,12 @@ xprt_rdma_send_request(struct rpc_task *task) rqst->rq_bytes_sent = 0; return 0; +failed_marshal: + r_xprt->rx_stats.failed_marshal_count++; + dprintk("RPC: %s: rpcrdma_marshal_req failed, status %i\n", + __func__, rc); + if (rc == -EIO) + return -EIO; drop_connection: xprt_disconnect_done(xprt); return -ENOTCONN; /* implies disconnect */ @@ -705,7 +675,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) */ static struct rpc_xprt_ops xprt_rdma_procs = { - .reserve_xprt = xprt_rdma_reserve_xprt, + .reserve_xprt = xprt_reserve_xprt_cong, .release_xprt = xprt_release_xprt_cong, /* sunrpc/xprt.c */ .alloc_slot = xprt_alloc_slot, .release_request = xprt_release_rqst_cong, /* ditto */ diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 93726560eaa8..13dbd1c389ff 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -48,8 +48,8 @@ */ #include -#include /* for Tavor hack below */ #include +#include #include "xprt_rdma.h" @@ -142,98 +142,139 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context) } } -static inline -void rpcrdma_event_process(struct ib_wc *wc) +static void +rpcrdma_sendcq_process_wc(struct ib_wc *wc) { - struct rpcrdma_mw *frmr; - struct rpcrdma_rep *rep = - (struct rpcrdma_rep *)(unsigned long) wc->wr_id; + struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; - dprintk("RPC: %s: event rep %p status %X opcode %X length %u\n", - __func__, rep, wc->status, wc->opcode, wc->byte_len); + dprintk("RPC: %s: frmr %p status %X opcode %d\n", + __func__, frmr, wc->status, wc->opcode); - if (!rep) /* send or bind completion that we don't care about */ + if (wc->wr_id == 0ULL) + return; + if (wc->status != IB_WC_SUCCESS) return; - if (IB_WC_SUCCESS != wc->status) { - dprintk("RPC: %s: WC opcode %d status %X, connection lost\n", - __func__, wc->opcode, wc->status); - rep->rr_len = ~0U; - if (wc->opcode != IB_WC_FAST_REG_MR && wc->opcode != IB_WC_LOCAL_INV) - rpcrdma_schedule_tasklet(rep); - return; - } - - switch (wc->opcode) { - case IB_WC_FAST_REG_MR: - frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; + if (wc->opcode == IB_WC_FAST_REG_MR) frmr->r.frmr.state = FRMR_IS_VALID; - break; - case IB_WC_LOCAL_INV: - frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; + else if (wc->opcode == IB_WC_LOCAL_INV) frmr->r.frmr.state = FRMR_IS_INVALID; - break; - case IB_WC_RECV: - rep->rr_len = wc->byte_len; - ib_dma_sync_single_for_cpu( - rdmab_to_ia(rep->rr_buffer)->ri_id->device, - rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE); - /* Keep (only) the most recent credits, after check validity */ - if (rep->rr_len >= 16) { - struct rpcrdma_msg *p = - (struct rpcrdma_msg *) rep->rr_base; - unsigned int credits = ntohl(p->rm_credit); - if (credits == 0) { - dprintk("RPC: %s: server" - " dropped credits to 0!\n", __func__); - /* don't deadlock */ - credits = 1; - } else if (credits > rep->rr_buffer->rb_max_requests) { - dprintk("RPC: %s: server" - " over-crediting: %d (%d)\n", - __func__, credits, - rep->rr_buffer->rb_max_requests); - credits = rep->rr_buffer->rb_max_requests; - } - atomic_set(&rep->rr_buffer->rb_credits, credits); - } - /* fall through */ - case IB_WC_BIND_MW: - rpcrdma_schedule_tasklet(rep); - break; - default: - dprintk("RPC: %s: unexpected WC event %X\n", - __func__, wc->opcode); - break; - } } -static inline int -rpcrdma_cq_poll(struct ib_cq *cq) +static int +rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) { - struct ib_wc wc; - int rc; + struct ib_wc *wcs; + int budget, count, rc; - for (;;) { - rc = ib_poll_cq(cq, 1, &wc); - if (rc < 0) { - dprintk("RPC: %s: ib_poll_cq failed %i\n", - __func__, rc); + budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE; + do { + wcs = ep->rep_send_wcs; + + rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); + if (rc <= 0) return rc; - } - if (rc == 0) - break; - - rpcrdma_event_process(&wc); - } + count = rc; + while (count-- > 0) + rpcrdma_sendcq_process_wc(wcs++); + } while (rc == RPCRDMA_POLLSIZE && --budget); return 0; } /* - * rpcrdma_cq_event_upcall + * Handle send, fast_reg_mr, and local_inv completions. + * + * Send events are typically suppressed and thus do not result + * in an upcall. Occasionally one is signaled, however. This + * prevents the provider's completion queue from wrapping and + * losing a completion. + */ +static void +rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) +{ + struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; + int rc; + + rc = rpcrdma_sendcq_poll(cq, ep); + if (rc) { + dprintk("RPC: %s: ib_poll_cq failed: %i\n", + __func__, rc); + return; + } + + rc = ib_req_notify_cq(cq, + IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); + if (rc == 0) + return; + if (rc < 0) { + dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", + __func__, rc); + return; + } + + rpcrdma_sendcq_poll(cq, ep); +} + +static void +rpcrdma_recvcq_process_wc(struct ib_wc *wc) +{ + struct rpcrdma_rep *rep = + (struct rpcrdma_rep *)(unsigned long)wc->wr_id; + + dprintk("RPC: %s: rep %p status %X opcode %X length %u\n", + __func__, rep, wc->status, wc->opcode, wc->byte_len); + + if (wc->status != IB_WC_SUCCESS) { + rep->rr_len = ~0U; + goto out_schedule; + } + if (wc->opcode != IB_WC_RECV) + return; + + rep->rr_len = wc->byte_len; + ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device, + rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE); + + if (rep->rr_len >= 16) { + struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base; + unsigned int credits = ntohl(p->rm_credit); + + if (credits == 0) + credits = 1; /* don't deadlock */ + else if (credits > rep->rr_buffer->rb_max_requests) + credits = rep->rr_buffer->rb_max_requests; + atomic_set(&rep->rr_buffer->rb_credits, credits); + } + +out_schedule: + rpcrdma_schedule_tasklet(rep); +} + +static int +rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) +{ + struct ib_wc *wcs; + int budget, count, rc; + + budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE; + do { + wcs = ep->rep_recv_wcs; + + rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); + if (rc <= 0) + return rc; + + count = rc; + while (count-- > 0) + rpcrdma_recvcq_process_wc(wcs++); + } while (rc == RPCRDMA_POLLSIZE && --budget); + return 0; +} + +/* + * Handle receive completions. * - * This upcall handles recv, send, bind and unbind events. * It is reentrant but processes single events in order to maintain * ordering of receives to keep server credits. * @@ -242,26 +283,31 @@ rpcrdma_cq_poll(struct ib_cq *cq) * connection shutdown. That is, the structures required for * the completion of the reply handler must remain intact until * all memory has been reclaimed. - * - * Note that send events are suppressed and do not result in an upcall. */ static void -rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context) +rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context) { + struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; int rc; - rc = rpcrdma_cq_poll(cq); - if (rc) - return; - - rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP); + rc = rpcrdma_recvcq_poll(cq, ep); if (rc) { - dprintk("RPC: %s: ib_req_notify_cq failed %i\n", + dprintk("RPC: %s: ib_poll_cq failed: %i\n", __func__, rc); return; } - rpcrdma_cq_poll(cq); + rc = ib_req_notify_cq(cq, + IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); + if (rc == 0) + return; + if (rc < 0) { + dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", + __func__, rc); + return; + } + + rpcrdma_recvcq_poll(cq, ep); } #ifdef RPC_DEBUG @@ -493,54 +539,32 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey; } - switch (memreg) { - case RPCRDMA_MEMWINDOWS: - case RPCRDMA_MEMWINDOWS_ASYNC: - if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) { - dprintk("RPC: %s: MEMWINDOWS registration " - "specified but not supported by adapter, " - "using slower RPCRDMA_REGISTER\n", - __func__); - memreg = RPCRDMA_REGISTER; - } - break; - case RPCRDMA_MTHCAFMR: - if (!ia->ri_id->device->alloc_fmr) { -#if RPCRDMA_PERSISTENT_REGISTRATION - dprintk("RPC: %s: MTHCAFMR registration " - "specified but not supported by adapter, " - "using riskier RPCRDMA_ALLPHYSICAL\n", - __func__); - memreg = RPCRDMA_ALLPHYSICAL; -#else - dprintk("RPC: %s: MTHCAFMR registration " - "specified but not supported by adapter, " - "using slower RPCRDMA_REGISTER\n", - __func__); - memreg = RPCRDMA_REGISTER; -#endif - } - break; - case RPCRDMA_FRMR: + if (memreg == RPCRDMA_FRMR) { /* Requires both frmr reg and local dma lkey */ if ((devattr.device_cap_flags & (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) != (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) { -#if RPCRDMA_PERSISTENT_REGISTRATION dprintk("RPC: %s: FRMR registration " - "specified but not supported by adapter, " - "using riskier RPCRDMA_ALLPHYSICAL\n", - __func__); + "not supported by HCA\n", __func__); + memreg = RPCRDMA_MTHCAFMR; + } else { + /* Mind the ia limit on FRMR page list depth */ + ia->ri_max_frmr_depth = min_t(unsigned int, + RPCRDMA_MAX_DATA_SEGS, + devattr.max_fast_reg_page_list_len); + } + } + if (memreg == RPCRDMA_MTHCAFMR) { + if (!ia->ri_id->device->alloc_fmr) { + dprintk("RPC: %s: MTHCAFMR registration " + "not supported by HCA\n", __func__); +#if RPCRDMA_PERSISTENT_REGISTRATION memreg = RPCRDMA_ALLPHYSICAL; #else - dprintk("RPC: %s: FRMR registration " - "specified but not supported by adapter, " - "using slower RPCRDMA_REGISTER\n", - __func__); - memreg = RPCRDMA_REGISTER; + rc = -ENOMEM; + goto out2; #endif } - break; } /* @@ -552,8 +576,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) * adapter. */ switch (memreg) { - case RPCRDMA_BOUNCEBUFFERS: - case RPCRDMA_REGISTER: case RPCRDMA_FRMR: break; #if RPCRDMA_PERSISTENT_REGISTRATION @@ -563,30 +585,26 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) IB_ACCESS_REMOTE_READ; goto register_setup; #endif - case RPCRDMA_MEMWINDOWS_ASYNC: - case RPCRDMA_MEMWINDOWS: - mem_priv = IB_ACCESS_LOCAL_WRITE | - IB_ACCESS_MW_BIND; - goto register_setup; case RPCRDMA_MTHCAFMR: if (ia->ri_have_dma_lkey) break; mem_priv = IB_ACCESS_LOCAL_WRITE; +#if RPCRDMA_PERSISTENT_REGISTRATION register_setup: +#endif ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv); if (IS_ERR(ia->ri_bind_mem)) { printk(KERN_ALERT "%s: ib_get_dma_mr for " - "phys register failed with %lX\n\t" - "Will continue with degraded performance\n", + "phys register failed with %lX\n", __func__, PTR_ERR(ia->ri_bind_mem)); - memreg = RPCRDMA_REGISTER; - ia->ri_bind_mem = NULL; + rc = -ENOMEM; + goto out2; } break; default: - printk(KERN_ERR "%s: invalid memory registration mode %d\n", - __func__, memreg); - rc = -EINVAL; + printk(KERN_ERR "RPC: Unsupported memory " + "registration mode: %d\n", memreg); + rc = -ENOMEM; goto out2; } dprintk("RPC: %s: memory registration strategy is %d\n", @@ -640,6 +658,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata) { struct ib_device_attr devattr; + struct ib_cq *sendcq, *recvcq; int rc, err; rc = ib_query_device(ia->ri_id->device, &devattr); @@ -659,32 +678,42 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_attr.srq = NULL; ep->rep_attr.cap.max_send_wr = cdata->max_requests; switch (ia->ri_memreg_strategy) { - case RPCRDMA_FRMR: + case RPCRDMA_FRMR: { + int depth = 7; + /* Add room for frmr register and invalidate WRs. * 1. FRMR reg WR for head * 2. FRMR invalidate WR for head - * 3. FRMR reg WR for pagelist - * 4. FRMR invalidate WR for pagelist + * 3. N FRMR reg WRs for pagelist + * 4. N FRMR invalidate WRs for pagelist * 5. FRMR reg WR for tail * 6. FRMR invalidate WR for tail * 7. The RDMA_SEND WR */ - ep->rep_attr.cap.max_send_wr *= 7; + + /* Calculate N if the device max FRMR depth is smaller than + * RPCRDMA_MAX_DATA_SEGS. + */ + if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) { + int delta = RPCRDMA_MAX_DATA_SEGS - + ia->ri_max_frmr_depth; + + do { + depth += 2; /* FRMR reg + invalidate */ + delta -= ia->ri_max_frmr_depth; + } while (delta > 0); + + } + ep->rep_attr.cap.max_send_wr *= depth; if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) { - cdata->max_requests = devattr.max_qp_wr / 7; + cdata->max_requests = devattr.max_qp_wr / depth; if (!cdata->max_requests) return -EINVAL; - ep->rep_attr.cap.max_send_wr = cdata->max_requests * 7; + ep->rep_attr.cap.max_send_wr = cdata->max_requests * + depth; } break; - case RPCRDMA_MEMWINDOWS_ASYNC: - case RPCRDMA_MEMWINDOWS: - /* Add room for mw_binds+unbinds - overkill! */ - ep->rep_attr.cap.max_send_wr++; - ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS); - if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) - return -EINVAL; - break; + } default: break; } @@ -705,46 +734,51 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_attr.cap.max_recv_sge); /* set trigger for requesting send completion */ - ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /* - 1*/; - switch (ia->ri_memreg_strategy) { - case RPCRDMA_MEMWINDOWS_ASYNC: - case RPCRDMA_MEMWINDOWS: - ep->rep_cqinit -= RPCRDMA_MAX_SEGS; - break; - default: - break; - } + ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1; if (ep->rep_cqinit <= 2) ep->rep_cqinit = 0; INIT_CQCOUNT(ep); ep->rep_ia = ia; init_waitqueue_head(&ep->rep_connect_wait); + INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker); - /* - * Create a single cq for receive dto and mw_bind (only ever - * care about unbind, really). Send completions are suppressed. - * Use single threaded tasklet upcalls to maintain ordering. - */ - ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall, - rpcrdma_cq_async_error_upcall, NULL, - ep->rep_attr.cap.max_recv_wr + + sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall, + rpcrdma_cq_async_error_upcall, ep, ep->rep_attr.cap.max_send_wr + 1, 0); - if (IS_ERR(ep->rep_cq)) { - rc = PTR_ERR(ep->rep_cq); - dprintk("RPC: %s: ib_create_cq failed: %i\n", + if (IS_ERR(sendcq)) { + rc = PTR_ERR(sendcq); + dprintk("RPC: %s: failed to create send CQ: %i\n", __func__, rc); goto out1; } - rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP); + rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP); if (rc) { dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", __func__, rc); goto out2; } - ep->rep_attr.send_cq = ep->rep_cq; - ep->rep_attr.recv_cq = ep->rep_cq; + recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall, + rpcrdma_cq_async_error_upcall, ep, + ep->rep_attr.cap.max_recv_wr + 1, 0); + if (IS_ERR(recvcq)) { + rc = PTR_ERR(recvcq); + dprintk("RPC: %s: failed to create recv CQ: %i\n", + __func__, rc); + goto out2; + } + + rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP); + if (rc) { + dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", + __func__, rc); + ib_destroy_cq(recvcq); + goto out2; + } + + ep->rep_attr.send_cq = sendcq; + ep->rep_attr.recv_cq = recvcq; /* Initialize cma parameters */ @@ -754,9 +788,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, /* Client offers RDMA Read but does not initiate */ ep->rep_remote_cma.initiator_depth = 0; - if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS) - ep->rep_remote_cma.responder_resources = 0; - else if (devattr.max_qp_rd_atom > 32) /* arbitrary but <= 255 */ + if (devattr.max_qp_rd_atom > 32) /* arbitrary but <= 255 */ ep->rep_remote_cma.responder_resources = 32; else ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom; @@ -768,7 +800,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, return 0; out2: - err = ib_destroy_cq(ep->rep_cq); + err = ib_destroy_cq(sendcq); if (err) dprintk("RPC: %s: ib_destroy_cq returned %i\n", __func__, err); @@ -782,11 +814,8 @@ out1: * Disconnect and destroy endpoint. After this, the only * valid operations on the ep are to free it (if dynamically * allocated) or re-create it. - * - * The caller's error handling must be sure to not leak the endpoint - * if this function fails. */ -int +void rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) { int rc; @@ -794,6 +823,8 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) dprintk("RPC: %s: entering, connected is %d\n", __func__, ep->rep_connected); + cancel_delayed_work_sync(&ep->rep_connect_worker); + if (ia->ri_id->qp) { rc = rpcrdma_ep_disconnect(ep, ia); if (rc) @@ -809,13 +840,17 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) ep->rep_pad_mr = NULL; } - rpcrdma_clean_cq(ep->rep_cq); - rc = ib_destroy_cq(ep->rep_cq); + rpcrdma_clean_cq(ep->rep_attr.recv_cq); + rc = ib_destroy_cq(ep->rep_attr.recv_cq); if (rc) dprintk("RPC: %s: ib_destroy_cq returned %i\n", __func__, rc); - return rc; + rpcrdma_clean_cq(ep->rep_attr.send_cq); + rc = ib_destroy_cq(ep->rep_attr.send_cq); + if (rc) + dprintk("RPC: %s: ib_destroy_cq returned %i\n", + __func__, rc); } /* @@ -831,17 +866,20 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) if (ep->rep_connected != 0) { struct rpcrdma_xprt *xprt; retry: + dprintk("RPC: %s: reconnecting...\n", __func__); rc = rpcrdma_ep_disconnect(ep, ia); if (rc && rc != -ENOTCONN) dprintk("RPC: %s: rpcrdma_ep_disconnect" " status %i\n", __func__, rc); - rpcrdma_clean_cq(ep->rep_cq); + + rpcrdma_clean_cq(ep->rep_attr.recv_cq); + rpcrdma_clean_cq(ep->rep_attr.send_cq); xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); id = rpcrdma_create_id(xprt, ia, (struct sockaddr *)&xprt->rx_data.addr); if (IS_ERR(id)) { - rc = PTR_ERR(id); + rc = -EHOSTUNREACH; goto out; } /* TEMP TEMP TEMP - fail if new device: @@ -855,35 +893,32 @@ retry: printk("RPC: %s: can't reconnect on " "different device!\n", __func__); rdma_destroy_id(id); - rc = -ENETDOWN; + rc = -ENETUNREACH; goto out; } /* END TEMP */ + rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr); + if (rc) { + dprintk("RPC: %s: rdma_create_qp failed %i\n", + __func__, rc); + rdma_destroy_id(id); + rc = -ENETUNREACH; + goto out; + } rdma_destroy_qp(ia->ri_id); rdma_destroy_id(ia->ri_id); ia->ri_id = id; + } else { + dprintk("RPC: %s: connecting...\n", __func__); + rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); + if (rc) { + dprintk("RPC: %s: rdma_create_qp failed %i\n", + __func__, rc); + /* do not update ep->rep_connected */ + return -ENETUNREACH; + } } - rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); - if (rc) { - dprintk("RPC: %s: rdma_create_qp failed %i\n", - __func__, rc); - goto out; - } - -/* XXX Tavor device performs badly with 2K MTU! */ -if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) { - struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device); - if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR && - (pcid->vendor == PCI_VENDOR_ID_MELLANOX || - pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) { - struct ib_qp_attr attr = { - .path_mtu = IB_MTU_1024 - }; - rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU); - } -} - ep->rep_connected = 0; rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma); @@ -944,7 +979,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) { int rc; - rpcrdma_clean_cq(ep->rep_cq); + rpcrdma_clean_cq(ep->rep_attr.recv_cq); + rpcrdma_clean_cq(ep->rep_attr.send_cq); rc = rdma_disconnect(ia->ri_id); if (!rc) { /* returns without wait if not connected */ @@ -967,7 +1003,7 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata) { char *p; - size_t len; + size_t len, rlen, wlen; int i, rc; struct rpcrdma_mw *r; @@ -997,11 +1033,6 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep, len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS * sizeof(struct rpcrdma_mw); break; - case RPCRDMA_MEMWINDOWS_ASYNC: - case RPCRDMA_MEMWINDOWS: - len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS * - sizeof(struct rpcrdma_mw); - break; default: break; } @@ -1032,32 +1063,29 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep, } p += cdata->padding; - /* - * Allocate the fmr's, or mw's for mw_bind chunk registration. - * We "cycle" the mw's in order to minimize rkey reuse, - * and also reduce unbind-to-bind collision. - */ INIT_LIST_HEAD(&buf->rb_mws); r = (struct rpcrdma_mw *)p; switch (ia->ri_memreg_strategy) { case RPCRDMA_FRMR: for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) { r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd, - RPCRDMA_MAX_SEGS); + ia->ri_max_frmr_depth); if (IS_ERR(r->r.frmr.fr_mr)) { rc = PTR_ERR(r->r.frmr.fr_mr); dprintk("RPC: %s: ib_alloc_fast_reg_mr" " failed %i\n", __func__, rc); goto out; } - r->r.frmr.fr_pgl = - ib_alloc_fast_reg_page_list(ia->ri_id->device, - RPCRDMA_MAX_SEGS); + r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list( + ia->ri_id->device, + ia->ri_max_frmr_depth); if (IS_ERR(r->r.frmr.fr_pgl)) { rc = PTR_ERR(r->r.frmr.fr_pgl); dprintk("RPC: %s: " "ib_alloc_fast_reg_page_list " "failed %i\n", __func__, rc); + + ib_dereg_mr(r->r.frmr.fr_mr); goto out; } list_add(&r->mw_list, &buf->rb_mws); @@ -1082,21 +1110,6 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep, ++r; } break; - case RPCRDMA_MEMWINDOWS_ASYNC: - case RPCRDMA_MEMWINDOWS: - /* Allocate one extra request's worth, for full cycling */ - for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) { - r->r.mw = ib_alloc_mw(ia->ri_pd, IB_MW_TYPE_1); - if (IS_ERR(r->r.mw)) { - rc = PTR_ERR(r->r.mw); - dprintk("RPC: %s: ib_alloc_mw" - " failed %i\n", __func__, rc); - goto out; - } - list_add(&r->mw_list, &buf->rb_mws); - ++r; - } - break; default: break; } @@ -1105,16 +1118,16 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep, * Allocate/init the request/reply buffers. Doing this * using kmalloc for now -- one for each buf. */ + wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req)); + rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep)); + dprintk("RPC: %s: wlen = %zu, rlen = %zu\n", + __func__, wlen, rlen); + for (i = 0; i < buf->rb_max_requests; i++) { struct rpcrdma_req *req; struct rpcrdma_rep *rep; - len = cdata->inline_wsize + sizeof(struct rpcrdma_req); - /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */ - /* Typical ~2400b, so rounding up saves work later */ - if (len < 4096) - len = 4096; - req = kmalloc(len, GFP_KERNEL); + req = kmalloc(wlen, GFP_KERNEL); if (req == NULL) { dprintk("RPC: %s: request buffer %d alloc" " failed\n", __func__, i); @@ -1126,16 +1139,16 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep, buf->rb_send_bufs[i]->rl_buffer = buf; rc = rpcrdma_register_internal(ia, req->rl_base, - len - offsetof(struct rpcrdma_req, rl_base), + wlen - offsetof(struct rpcrdma_req, rl_base), &buf->rb_send_bufs[i]->rl_handle, &buf->rb_send_bufs[i]->rl_iov); if (rc) goto out; - buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req); + buf->rb_send_bufs[i]->rl_size = wlen - + sizeof(struct rpcrdma_req); - len = cdata->inline_rsize + sizeof(struct rpcrdma_rep); - rep = kmalloc(len, GFP_KERNEL); + rep = kmalloc(rlen, GFP_KERNEL); if (rep == NULL) { dprintk("RPC: %s: reply buffer %d alloc failed\n", __func__, i); @@ -1145,10 +1158,9 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep, memset(rep, 0, sizeof(struct rpcrdma_rep)); buf->rb_recv_bufs[i] = rep; buf->rb_recv_bufs[i]->rr_buffer = buf; - init_waitqueue_head(&rep->rr_unbind); rc = rpcrdma_register_internal(ia, rep->rr_base, - len - offsetof(struct rpcrdma_rep, rr_base), + rlen - offsetof(struct rpcrdma_rep, rr_base), &buf->rb_recv_bufs[i]->rr_handle, &buf->rb_recv_bufs[i]->rr_iov); if (rc) @@ -1179,7 +1191,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) /* clean up in reverse order from create * 1. recv mr memory (mr free, then kfree) - * 1a. bind mw memory * 2. send mr memory (mr free, then kfree) * 3. padding (if any) [moved to rpcrdma_ep_destroy] * 4. arrays @@ -1194,41 +1205,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) kfree(buf->rb_recv_bufs[i]); } if (buf->rb_send_bufs && buf->rb_send_bufs[i]) { - while (!list_empty(&buf->rb_mws)) { - r = list_entry(buf->rb_mws.next, - struct rpcrdma_mw, mw_list); - list_del(&r->mw_list); - switch (ia->ri_memreg_strategy) { - case RPCRDMA_FRMR: - rc = ib_dereg_mr(r->r.frmr.fr_mr); - if (rc) - dprintk("RPC: %s:" - " ib_dereg_mr" - " failed %i\n", - __func__, rc); - ib_free_fast_reg_page_list(r->r.frmr.fr_pgl); - break; - case RPCRDMA_MTHCAFMR: - rc = ib_dealloc_fmr(r->r.fmr); - if (rc) - dprintk("RPC: %s:" - " ib_dealloc_fmr" - " failed %i\n", - __func__, rc); - break; - case RPCRDMA_MEMWINDOWS_ASYNC: - case RPCRDMA_MEMWINDOWS: - rc = ib_dealloc_mw(r->r.mw); - if (rc) - dprintk("RPC: %s:" - " ib_dealloc_mw" - " failed %i\n", - __func__, rc); - break; - default: - break; - } - } rpcrdma_deregister_internal(ia, buf->rb_send_bufs[i]->rl_handle, &buf->rb_send_bufs[i]->rl_iov); @@ -1236,6 +1212,33 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) } } + while (!list_empty(&buf->rb_mws)) { + r = list_entry(buf->rb_mws.next, + struct rpcrdma_mw, mw_list); + list_del(&r->mw_list); + switch (ia->ri_memreg_strategy) { + case RPCRDMA_FRMR: + rc = ib_dereg_mr(r->r.frmr.fr_mr); + if (rc) + dprintk("RPC: %s:" + " ib_dereg_mr" + " failed %i\n", + __func__, rc); + ib_free_fast_reg_page_list(r->r.frmr.fr_pgl); + break; + case RPCRDMA_MTHCAFMR: + rc = ib_dealloc_fmr(r->r.fmr); + if (rc) + dprintk("RPC: %s:" + " ib_dealloc_fmr" + " failed %i\n", + __func__, rc); + break; + default: + break; + } + } + kfree(buf->rb_pool); } @@ -1299,21 +1302,17 @@ rpcrdma_buffer_put(struct rpcrdma_req *req) int i; unsigned long flags; - BUG_ON(req->rl_nchunks != 0); spin_lock_irqsave(&buffers->rb_lock, flags); buffers->rb_send_bufs[--buffers->rb_send_index] = req; req->rl_niovs = 0; if (req->rl_reply) { buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply; - init_waitqueue_head(&req->rl_reply->rr_unbind); req->rl_reply->rr_func = NULL; req->rl_reply = NULL; } switch (ia->ri_memreg_strategy) { case RPCRDMA_FRMR: case RPCRDMA_MTHCAFMR: - case RPCRDMA_MEMWINDOWS_ASYNC: - case RPCRDMA_MEMWINDOWS: /* * Cycle mw's back in reverse order, and "spin" them. * This delays and scrambles reuse as much as possible. @@ -1358,8 +1357,7 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req) /* * Put reply buffers back into pool when not attached to - * request. This happens in error conditions, and when - * aborting unbinds. Pre-decrement counter/array index. + * request. This happens in error conditions. */ void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) @@ -1498,8 +1496,8 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg, seg1->mr_offset -= pageoff; /* start of page */ seg1->mr_len += pageoff; len = -pageoff; - if (*nsegs > RPCRDMA_MAX_DATA_SEGS) - *nsegs = RPCRDMA_MAX_DATA_SEGS; + if (*nsegs > ia->ri_max_frmr_depth) + *nsegs = ia->ri_max_frmr_depth; for (page_no = i = 0; i < *nsegs;) { rpcrdma_map_one(ia, seg, writing); pa = seg->mr_dma; @@ -1536,10 +1534,6 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg, } else post_wr = &frmr_wr; - /* Bump the key */ - key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF); - ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key); - /* Prepare FRMR WR */ memset(&frmr_wr, 0, sizeof frmr_wr); frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw; @@ -1550,7 +1544,16 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg, frmr_wr.wr.fast_reg.page_list_len = page_no; frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT; frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT; - BUG_ON(frmr_wr.wr.fast_reg.length < len); + if (frmr_wr.wr.fast_reg.length < len) { + while (seg1->mr_nsegs--) + rpcrdma_unmap_one(ia, seg++); + return -EIO; + } + + /* Bump the key */ + key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF); + ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key); + frmr_wr.wr.fast_reg.access_flags = (writing ? IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE : IB_ACCESS_REMOTE_READ); @@ -1661,135 +1664,6 @@ rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg, return rc; } -static int -rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg, - int *nsegs, int writing, struct rpcrdma_ia *ia, - struct rpcrdma_xprt *r_xprt) -{ - int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE : - IB_ACCESS_REMOTE_READ); - struct ib_mw_bind param; - int rc; - - *nsegs = 1; - rpcrdma_map_one(ia, seg, writing); - param.bind_info.mr = ia->ri_bind_mem; - param.wr_id = 0ULL; /* no send cookie */ - param.bind_info.addr = seg->mr_dma; - param.bind_info.length = seg->mr_len; - param.send_flags = 0; - param.bind_info.mw_access_flags = mem_priv; - - DECR_CQCOUNT(&r_xprt->rx_ep); - rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, ¶m); - if (rc) { - dprintk("RPC: %s: failed ib_bind_mw " - "%u@0x%llx status %i\n", - __func__, seg->mr_len, - (unsigned long long)seg->mr_dma, rc); - rpcrdma_unmap_one(ia, seg); - } else { - seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey; - seg->mr_base = param.bind_info.addr; - seg->mr_nsegs = 1; - } - return rc; -} - -static int -rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg, - struct rpcrdma_ia *ia, - struct rpcrdma_xprt *r_xprt, void **r) -{ - struct ib_mw_bind param; - LIST_HEAD(l); - int rc; - - BUG_ON(seg->mr_nsegs != 1); - param.bind_info.mr = ia->ri_bind_mem; - param.bind_info.addr = 0ULL; /* unbind */ - param.bind_info.length = 0; - param.bind_info.mw_access_flags = 0; - if (*r) { - param.wr_id = (u64) (unsigned long) *r; - param.send_flags = IB_SEND_SIGNALED; - INIT_CQCOUNT(&r_xprt->rx_ep); - } else { - param.wr_id = 0ULL; - param.send_flags = 0; - DECR_CQCOUNT(&r_xprt->rx_ep); - } - rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, ¶m); - rpcrdma_unmap_one(ia, seg); - if (rc) - dprintk("RPC: %s: failed ib_(un)bind_mw," - " status %i\n", __func__, rc); - else - *r = NULL; /* will upcall on completion */ - return rc; -} - -static int -rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg, - int *nsegs, int writing, struct rpcrdma_ia *ia) -{ - int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE : - IB_ACCESS_REMOTE_READ); - struct rpcrdma_mr_seg *seg1 = seg; - struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS]; - int len, i, rc = 0; - - if (*nsegs > RPCRDMA_MAX_DATA_SEGS) - *nsegs = RPCRDMA_MAX_DATA_SEGS; - for (len = 0, i = 0; i < *nsegs;) { - rpcrdma_map_one(ia, seg, writing); - ipb[i].addr = seg->mr_dma; - ipb[i].size = seg->mr_len; - len += seg->mr_len; - ++seg; - ++i; - /* Check for holes */ - if ((i < *nsegs && offset_in_page(seg->mr_offset)) || - offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len)) - break; - } - seg1->mr_base = seg1->mr_dma; - seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd, - ipb, i, mem_priv, &seg1->mr_base); - if (IS_ERR(seg1->mr_chunk.rl_mr)) { - rc = PTR_ERR(seg1->mr_chunk.rl_mr); - dprintk("RPC: %s: failed ib_reg_phys_mr " - "%u@0x%llx (%d)... status %i\n", - __func__, len, - (unsigned long long)seg1->mr_dma, i, rc); - while (i--) - rpcrdma_unmap_one(ia, --seg); - } else { - seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey; - seg1->mr_nsegs = i; - seg1->mr_len = len; - } - *nsegs = i; - return rc; -} - -static int -rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg, - struct rpcrdma_ia *ia) -{ - struct rpcrdma_mr_seg *seg1 = seg; - int rc; - - rc = ib_dereg_mr(seg1->mr_chunk.rl_mr); - seg1->mr_chunk.rl_mr = NULL; - while (seg1->mr_nsegs--) - rpcrdma_unmap_one(ia, seg++); - if (rc) - dprintk("RPC: %s: failed ib_dereg_mr," - " status %i\n", __func__, rc); - return rc; -} - int rpcrdma_register_external(struct rpcrdma_mr_seg *seg, int nsegs, int writing, struct rpcrdma_xprt *r_xprt) @@ -1819,16 +1693,8 @@ rpcrdma_register_external(struct rpcrdma_mr_seg *seg, rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia); break; - /* Registration using memory windows */ - case RPCRDMA_MEMWINDOWS_ASYNC: - case RPCRDMA_MEMWINDOWS: - rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt); - break; - - /* Default registration each time */ default: - rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia); - break; + return -1; } if (rc) return -1; @@ -1838,7 +1704,7 @@ rpcrdma_register_external(struct rpcrdma_mr_seg *seg, int rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg, - struct rpcrdma_xprt *r_xprt, void *r) + struct rpcrdma_xprt *r_xprt) { struct rpcrdma_ia *ia = &r_xprt->rx_ia; int nsegs = seg->mr_nsegs, rc; @@ -1847,9 +1713,7 @@ rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg, #if RPCRDMA_PERSISTENT_REGISTRATION case RPCRDMA_ALLPHYSICAL: - BUG_ON(nsegs != 1); rpcrdma_unmap_one(ia, seg); - rc = 0; break; #endif @@ -1861,21 +1725,9 @@ rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg, rc = rpcrdma_deregister_fmr_external(seg, ia); break; - case RPCRDMA_MEMWINDOWS_ASYNC: - case RPCRDMA_MEMWINDOWS: - rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r); - break; - default: - rc = rpcrdma_deregister_default_external(seg, ia); break; } - if (r) { - struct rpcrdma_rep *rep = r; - void (*func)(struct rpcrdma_rep *) = rep->rr_func; - rep->rr_func = NULL; - func(rep); /* dereg done, callback now */ - } return nsegs; } @@ -1950,7 +1802,6 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, ib_dma_sync_single_for_cpu(ia->ri_id->device, rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL); - DECR_CQCOUNT(ep); rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail); if (rc) diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index cc1445dc1d1a..89e7cd479705 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -43,6 +43,7 @@ #include /* wait_queue_head_t, etc */ #include /* spinlock_t, etc */ #include /* atomic_t, etc */ +#include /* struct work_struct */ #include /* RDMA connection api */ #include /* RDMA verbs api */ @@ -66,18 +67,21 @@ struct rpcrdma_ia { struct completion ri_done; int ri_async_rc; enum rpcrdma_memreg ri_memreg_strategy; + unsigned int ri_max_frmr_depth; }; /* * RDMA Endpoint -- one per transport instance */ +#define RPCRDMA_WC_BUDGET (128) +#define RPCRDMA_POLLSIZE (16) + struct rpcrdma_ep { atomic_t rep_cqcount; int rep_cqinit; int rep_connected; struct rpcrdma_ia *rep_ia; - struct ib_cq *rep_cq; struct ib_qp_init_attr rep_attr; wait_queue_head_t rep_connect_wait; struct ib_sge rep_pad; /* holds zeroed pad */ @@ -86,6 +90,9 @@ struct rpcrdma_ep { struct rpc_xprt *rep_xprt; /* for rep_func */ struct rdma_conn_param rep_remote_cma; struct sockaddr_storage rep_remote_addr; + struct delayed_work rep_connect_worker; + struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE]; + struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE]; }; #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit) @@ -124,7 +131,6 @@ struct rpcrdma_rep { struct rpc_xprt *rr_xprt; /* needed for request/reply matching */ void (*rr_func)(struct rpcrdma_rep *);/* called by tasklet in softint */ struct list_head rr_list; /* tasklet list */ - wait_queue_head_t rr_unbind; /* optional unbind wait */ struct ib_sge rr_iov; /* for posting */ struct ib_mr *rr_handle; /* handle for mem in rr_iov */ char rr_base[MAX_RPCRDMAHDR]; /* minimal inline receive buffer */ @@ -159,7 +165,6 @@ struct rpcrdma_mr_seg { /* chunk descriptors */ struct ib_mr *rl_mr; /* if registered directly */ struct rpcrdma_mw { /* if registered from region */ union { - struct ib_mw *mw; struct ib_fmr *fmr; struct { struct ib_fast_reg_page_list *fr_pgl; @@ -207,7 +212,6 @@ struct rpcrdma_req { struct rpcrdma_buffer { spinlock_t rb_lock; /* protects indexes */ atomic_t rb_credits; /* most recent server credits */ - unsigned long rb_cwndscale; /* cached framework rpc_cwndscale */ int rb_max_requests;/* client max requests */ struct list_head rb_mws; /* optional memory windows/fmrs/frmrs */ int rb_send_index; @@ -300,7 +304,7 @@ void rpcrdma_ia_close(struct rpcrdma_ia *); */ int rpcrdma_ep_create(struct rpcrdma_ep *, struct rpcrdma_ia *, struct rpcrdma_create_data_internal *); -int rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *); +void rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *); int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *); int rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *); @@ -330,11 +334,12 @@ int rpcrdma_deregister_internal(struct rpcrdma_ia *, int rpcrdma_register_external(struct rpcrdma_mr_seg *, int, int, struct rpcrdma_xprt *); int rpcrdma_deregister_external(struct rpcrdma_mr_seg *, - struct rpcrdma_xprt *, void *); + struct rpcrdma_xprt *); /* * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c */ +void rpcrdma_connect_worker(struct work_struct *); void rpcrdma_conn_func(struct rpcrdma_ep *); void rpcrdma_reply_handler(struct rpcrdma_rep *);