From 47f5b1a37f3e8055dfea446236c05478087858dc Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Mon, 4 Sep 2023 12:06:07 +0200 Subject: [PATCH] lib: introduce struct easy_poll_set for poll information Connection filter had a `get_select_socks()` method, inspired by the various `getsocks` functions involved during the lifetime of a transfer. These, depending on transfer state (CONNECT/DO/DONE/ etc.), return sockets to monitor and flag if this shall be done for POLLIN and/or POLLOUT. Due to this design, sockets and flags could only be added, not removed. This led to problems in filters like HTTP/2 where flow control prohibits the sending of data until the peer increases the flow window. The general transfer loop wants to write, adds POLLOUT, the socket is writeable but no data can be written. This leads to cpu busy loops. To prevent that, HTTP/2 did set the `SEND_HOLD` flag of such a blocked transfer, so the transfer loop cedes further attempts. This works if only one such filter is involved. If a HTTP/2 transfer goes through a HTTP/2 proxy, two filters are setting/clearing this flag and may step on each other's toes. Connection filters `get_select_socks()` is replaced by `adjust_pollset()`. They get passed a `struct easy_pollset` that keeps up to `MAX_SOCKSPEREASYHANDLE` sockets and their `POLLIN|POLLOUT` flags. This struct is initialized in `multi_getsock()` by calling the various `getsocks()` implementations based on transfer state, as before. After protocol handlers/transfer loop have set the sockets and flags they want, the `easy_pollset` is *always* passed to the filters. Filters "higher" in the chain are called first, starting at the first not-yet-connection one. Each filter may add sockets and/or change flags. When all flags are removed, the socket itself is removed from the pollset. Example: * transfer wants to send, adds POLLOUT * http/2 filter has a flow control block, removes POLLOUT and adds POLLIN (it is waiting on a WINDOW_UPDATE from the server) * TLS filter is connected and changes nothing * h2-proxy filter also has a flow control block on its tunnel stream, removes POLLOUT and adds POLLIN also. * socket filter is connected and changes nothing * The resulting pollset is then mixed together with all other transfers and their pollsets, just as before. Use of `SEND_HOLD` is no longer necessary in the filters. All filters are adapted for the changed method. The handling in `multi.c` has been adjusted, but its state handling the the protocol handlers' `getsocks` method are untouched. The most affected filters are http/2, ngtcp2, quiche and h2-proxy. TLS filters needed to be adjusted for the connecting handshake read/write handling. No noticeable difference in performance was detected in local scorecard runs. Closes #11833 --- lib/cf-h1-proxy.c | 24 ++-- lib/cf-h2-proxy.c | 50 ++++---- lib/cf-haproxy.c | 18 +-- lib/cf-https-connect.c | 47 +++---- lib/cf-socket.c | 25 ++-- lib/cfilters.c | 190 ++++++++++++++++++++++++---- lib/cfilters.h | 107 +++++++++++++--- lib/connect.c | 58 ++++----- lib/ftp.c | 2 +- lib/http2.c | 73 ++++------- lib/http_proxy.c | 2 +- lib/multi.c | 249 ++++++++++++++++--------------------- lib/socks.c | 17 ++- lib/urldata.h | 21 +++- lib/vquic/curl_msh3.c | 18 +-- lib/vquic/curl_ngtcp2.c | 122 +++++++++++------- lib/vquic/curl_quiche.c | 107 +++++----------- lib/vtls/bearssl.c | 34 ++--- lib/vtls/gtls.c | 2 +- lib/vtls/mbedtls.c | 2 +- lib/vtls/openssl.c | 2 +- lib/vtls/rustls.c | 42 +++---- lib/vtls/schannel.c | 2 +- lib/vtls/sectransp.c | 2 +- lib/vtls/vtls.c | 54 ++++---- lib/vtls/vtls_int.h | 17 ++- lib/vtls/wolfssl.c | 2 +- tests/http/testenv/curl.py | 4 +- tests/unit/unit2600.c | 2 +- 29 files changed, 692 insertions(+), 603 deletions(-) diff --git a/lib/cf-h1-proxy.c b/lib/cf-h1-proxy.c index 674802114..c46fb1e31 100644 --- a/lib/cf-h1-proxy.c +++ b/lib/cf-h1-proxy.c @@ -1038,31 +1038,29 @@ out: return result; } -static int cf_h1_proxy_get_select_socks(struct Curl_cfilter *cf, +static void cf_h1_proxy_adjust_pollset(struct Curl_cfilter *cf, struct Curl_easy *data, - curl_socket_t *socks) + struct easy_pollset *ps) { struct h1_tunnel_state *ts = cf->ctx; - int fds; - fds = cf->next->cft->get_select_socks(cf->next, data, socks); - if(!fds && cf->next->connected && !cf->connected) { + if(!cf->connected) { /* If we are not connected, but the filter "below" is * and not waiting on something, we are tunneling. */ - socks[0] = Curl_conn_cf_get_socket(cf, data); + curl_socket_t sock = Curl_conn_cf_get_socket(cf, data); if(ts) { /* when we've sent a CONNECT to a proxy, we should rather either wait for the socket to become readable to be able to get the response headers or if we're still sending the request, wait for write. */ - if(ts->CONNECT.sending == HTTPSEND_REQUEST) { - return GETSOCK_WRITESOCK(0); - } - return GETSOCK_READSOCK(0); + if(ts->CONNECT.sending == HTTPSEND_REQUEST) + Curl_pollset_set_out_only(data, ps, sock); + else + Curl_pollset_set_in_only(data, ps, sock); } - return GETSOCK_WRITESOCK(0); + else + Curl_pollset_set_out_only(data, ps, sock); } - return fds; } static void cf_h1_proxy_destroy(struct Curl_cfilter *cf, @@ -1093,7 +1091,7 @@ struct Curl_cftype Curl_cft_h1_proxy = { cf_h1_proxy_connect, cf_h1_proxy_close, Curl_cf_http_proxy_get_host, - cf_h1_proxy_get_select_socks, + cf_h1_proxy_adjust_pollset, Curl_cf_def_data_pending, Curl_cf_def_send, Curl_cf_def_recv, diff --git a/lib/cf-h2-proxy.c b/lib/cf-h2-proxy.c index 9f852c1a5..e3b9c1dbc 100644 --- a/lib/cf-h2-proxy.c +++ b/lib/cf-h2-proxy.c @@ -688,12 +688,8 @@ static int proxy_h2_on_frame_recv(nghttp2_session *session, * window and *assume* that we treat this like a WINDOW_UPDATE. Some * servers send an explicit WINDOW_UPDATE, but not all seem to do that. * To be safe, we UNHOLD a stream in order not to stall. */ - if((data->req.keepon & KEEP_SEND_HOLD) && - (data->req.keepon & KEEP_SEND)) { - data->req.keepon &= ~KEEP_SEND_HOLD; + if(CURL_WANT_SEND(data)) { drain_tunnel(cf, data, &ctx->tunnel); - CURL_TRC_CF(data, cf, "[%d] un-holding after SETTINGS", - stream_id); } break; case NGHTTP2_GOAWAY: @@ -727,12 +723,8 @@ static int proxy_h2_on_frame_recv(nghttp2_session *session, } break; case NGHTTP2_WINDOW_UPDATE: - if((data->req.keepon & KEEP_SEND_HOLD) && - (data->req.keepon & KEEP_SEND)) { - data->req.keepon &= ~KEEP_SEND_HOLD; - Curl_expire(data, 0, EXPIRE_RUN_NOW); - CURL_TRC_CF(data, cf, "[%d] unpausing after win update", - stream_id); + if(CURL_WANT_SEND(data)) { + drain_tunnel(cf, data, &ctx->tunnel); } break; default: @@ -1176,25 +1168,31 @@ static bool cf_h2_proxy_data_pending(struct Curl_cfilter *cf, return cf->next? cf->next->cft->has_data_pending(cf->next, data) : FALSE; } -static int cf_h2_proxy_get_select_socks(struct Curl_cfilter *cf, - struct Curl_easy *data, - curl_socket_t *sock) +static void cf_h2_proxy_adjust_pollset(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct easy_pollset *ps) { struct cf_h2_proxy_ctx *ctx = cf->ctx; - int bitmap = GETSOCK_BLANK; - struct cf_call_data save; + curl_socket_t sock = Curl_conn_cf_get_socket(cf, data); + bool want_recv, want_send; - CF_DATA_SAVE(save, cf, data); - sock[0] = Curl_conn_cf_get_socket(cf, data); - bitmap |= GETSOCK_READSOCK(0); + Curl_pollset_check(data, ps, sock, &want_recv, &want_send); + if(ctx->h2 && (want_recv || want_send)) { + struct cf_call_data save; + bool c_exhaust, s_exhaust; - /* HTTP/2 layer wants to send data) AND there's a window to send data in */ - if(nghttp2_session_want_write(ctx->h2) && - nghttp2_session_get_remote_window_size(ctx->h2)) - bitmap |= GETSOCK_WRITESOCK(0); + CF_DATA_SAVE(save, cf, data); + c_exhaust = !nghttp2_session_get_remote_window_size(ctx->h2); + s_exhaust = ctx->tunnel.stream_id >= 0 && + !nghttp2_session_get_stream_remote_window_size( + ctx->h2, ctx->tunnel.stream_id); + want_recv = (want_recv || c_exhaust || s_exhaust); + want_send = (!s_exhaust && want_send) || + (!c_exhaust && nghttp2_session_want_write(ctx->h2)); - CF_DATA_RESTORE(cf, save); - return bitmap; + Curl_pollset_set(data, ps, sock, want_recv, want_send); + CF_DATA_RESTORE(cf, save); + } } static ssize_t h2_handle_tunnel_close(struct Curl_cfilter *cf, @@ -1531,7 +1529,7 @@ struct Curl_cftype Curl_cft_h2_proxy = { cf_h2_proxy_connect, cf_h2_proxy_close, Curl_cf_http_proxy_get_host, - cf_h2_proxy_get_select_socks, + cf_h2_proxy_adjust_pollset, cf_h2_proxy_data_pending, cf_h2_proxy_send, cf_h2_proxy_recv, diff --git a/lib/cf-haproxy.c b/lib/cf-haproxy.c index 39ac41571..c29b2be20 100644 --- a/lib/cf-haproxy.c +++ b/lib/cf-haproxy.c @@ -171,23 +171,17 @@ static void cf_haproxy_close(struct Curl_cfilter *cf, cf->next->cft->do_close(cf->next, data); } -static int cf_haproxy_get_select_socks(struct Curl_cfilter *cf, - struct Curl_easy *data, - curl_socket_t *socks) +static void cf_haproxy_adjust_pollset(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct easy_pollset *ps) { - int fds; - - fds = cf->next->cft->get_select_socks(cf->next, data, socks); - if(!fds && cf->next->connected && !cf->connected) { + if(cf->next->connected && !cf->connected) { /* If we are not connected, but the filter "below" is * and not waiting on something, we are sending. */ - socks[0] = Curl_conn_cf_get_socket(cf, data); - return GETSOCK_WRITESOCK(0); + Curl_pollset_set_out_only(data, ps, Curl_conn_cf_get_socket(cf, data)); } - return fds; } - struct Curl_cftype Curl_cft_haproxy = { "HAPROXY", 0, @@ -196,7 +190,7 @@ struct Curl_cftype Curl_cft_haproxy = { cf_haproxy_connect, cf_haproxy_close, Curl_cf_def_get_host, - cf_haproxy_get_select_socks, + cf_haproxy_adjust_pollset, Curl_cf_def_data_pending, Curl_cf_def_send, Curl_cf_def_recv, diff --git a/lib/cf-https-connect.c b/lib/cf-https-connect.c index be54aec74..99a16a01e 100644 --- a/lib/cf-https-connect.c +++ b/lib/cf-https-connect.c @@ -325,42 +325,25 @@ out: return result; } -static int cf_hc_get_select_socks(struct Curl_cfilter *cf, +static void cf_hc_adjust_pollset(struct Curl_cfilter *cf, struct Curl_easy *data, - curl_socket_t *socks) + struct easy_pollset *ps) { - struct cf_hc_ctx *ctx = cf->ctx; - size_t i, j, s; - int brc, rc = GETSOCK_BLANK; - curl_socket_t bsocks[MAX_SOCKSPEREASYHANDLE]; - struct cf_hc_baller *ballers[2]; + if(!cf->connected) { + struct cf_hc_ctx *ctx = cf->ctx; + struct cf_hc_baller *ballers[2]; + size_t i; - if(cf->connected) - return cf->next->cft->get_select_socks(cf->next, data, socks); - - ballers[0] = &ctx->h3_baller; - ballers[1] = &ctx->h21_baller; - for(i = s = 0; i < sizeof(ballers)/sizeof(ballers[0]); i++) { - struct cf_hc_baller *b = ballers[i]; - if(!cf_hc_baller_is_active(b)) - continue; - brc = Curl_conn_cf_get_select_socks(b->cf, data, bsocks); - CURL_TRC_CF(data, cf, "get_selected_socks(%s) -> %x", b->name, brc); - if(!brc) - continue; - for(j = 0; j < MAX_SOCKSPEREASYHANDLE && s < MAX_SOCKSPEREASYHANDLE; ++j) { - if((brc & GETSOCK_WRITESOCK(j)) || (brc & GETSOCK_READSOCK(j))) { - socks[s] = bsocks[j]; - if(brc & GETSOCK_WRITESOCK(j)) - rc |= GETSOCK_WRITESOCK(s); - if(brc & GETSOCK_READSOCK(j)) - rc |= GETSOCK_READSOCK(s); - s++; - } + ballers[0] = &ctx->h3_baller; + ballers[1] = &ctx->h21_baller; + for(i = 0; i < sizeof(ballers)/sizeof(ballers[0]); i++) { + struct cf_hc_baller *b = ballers[i]; + if(!cf_hc_baller_is_active(b)) + continue; + Curl_conn_cf_adjust_pollset(b->cf, data, ps); } + CURL_TRC_CF(data, cf, "adjust_pollset -> %d socks", ps->num); } - CURL_TRC_CF(data, cf, "get_selected_socks -> %x", rc); - return rc; } static bool cf_hc_data_pending(struct Curl_cfilter *cf, @@ -455,7 +438,7 @@ struct Curl_cftype Curl_cft_http_connect = { cf_hc_connect, cf_hc_close, Curl_cf_def_get_host, - cf_hc_get_select_socks, + cf_hc_adjust_pollset, cf_hc_data_pending, Curl_cf_def_send, Curl_cf_def_recv, diff --git a/lib/cf-socket.c b/lib/cf-socket.c index ce3f9e943..2d4088db9 100644 --- a/lib/cf-socket.c +++ b/lib/cf-socket.c @@ -1252,20 +1252,19 @@ static void cf_socket_get_host(struct Curl_cfilter *cf, *pport = cf->conn->port; } -static int cf_socket_get_select_socks(struct Curl_cfilter *cf, +static void cf_socket_adjust_pollset(struct Curl_cfilter *cf, struct Curl_easy *data, - curl_socket_t *socks) + struct easy_pollset *ps) { struct cf_socket_ctx *ctx = cf->ctx; - int rc = GETSOCK_BLANK; - (void)data; - if(!cf->connected && ctx->sock != CURL_SOCKET_BAD) { - socks[0] = ctx->sock; - rc |= GETSOCK_WRITESOCK(0); + if(ctx->sock != CURL_SOCKET_BAD) { + if(!cf->connected) + Curl_pollset_set_out_only(data, ps, ctx->sock); + else + Curl_pollset_add_in(data, ps, ctx->sock); + CURL_TRC_CF(data, cf, "adjust_pollset -> %d socks", ps->num); } - - return rc; } static bool cf_socket_data_pending(struct Curl_cfilter *cf, @@ -1612,7 +1611,7 @@ struct Curl_cftype Curl_cft_tcp = { cf_tcp_connect, cf_socket_close, cf_socket_get_host, - cf_socket_get_select_socks, + cf_socket_adjust_pollset, cf_socket_data_pending, cf_socket_send, cf_socket_recv, @@ -1742,7 +1741,7 @@ struct Curl_cftype Curl_cft_udp = { cf_udp_connect, cf_socket_close, cf_socket_get_host, - cf_socket_get_select_socks, + cf_socket_adjust_pollset, cf_socket_data_pending, cf_socket_send, cf_socket_recv, @@ -1793,7 +1792,7 @@ struct Curl_cftype Curl_cft_unix = { cf_tcp_connect, cf_socket_close, cf_socket_get_host, - cf_socket_get_select_socks, + cf_socket_adjust_pollset, cf_socket_data_pending, cf_socket_send, cf_socket_recv, @@ -1857,7 +1856,7 @@ struct Curl_cftype Curl_cft_tcp_accept = { cf_tcp_accept_connect, cf_socket_close, cf_socket_get_host, /* TODO: not accurate */ - cf_socket_get_select_socks, + cf_socket_adjust_pollset, cf_socket_data_pending, cf_socket_send, cf_socket_recv, diff --git a/lib/cfilters.c b/lib/cfilters.c index f74eb4003..59d6c84b0 100644 --- a/lib/cfilters.c +++ b/lib/cfilters.c @@ -33,6 +33,7 @@ #include "sockaddr.h" /* required for Curl_sockaddr_storage */ #include "multiif.h" #include "progress.h" +#include "select.h" #include "warnless.h" /* The last 3 #include files should be in this order */ @@ -70,12 +71,14 @@ void Curl_cf_def_get_host(struct Curl_cfilter *cf, struct Curl_easy *data, } } -int Curl_cf_def_get_select_socks(struct Curl_cfilter *cf, +void Curl_cf_def_adjust_pollset(struct Curl_cfilter *cf, struct Curl_easy *data, - curl_socket_t *socks) + struct easy_pollset *ps) { - return cf->next? - cf->next->cft->get_select_socks(cf->next, data, socks) : 0; + /* NOP */ + (void)cf; + (void)data; + (void)ps; } bool Curl_cf_def_data_pending(struct Curl_cfilter *cf, @@ -303,15 +306,6 @@ void Curl_conn_cf_close(struct Curl_cfilter *cf, struct Curl_easy *data) cf->cft->do_close(cf, data); } -int Curl_conn_cf_get_select_socks(struct Curl_cfilter *cf, - struct Curl_easy *data, - curl_socket_t *socks) -{ - if(cf) - return cf->cft->get_select_socks(cf, data, socks); - return 0; -} - ssize_t Curl_conn_cf_send(struct Curl_cfilter *cf, struct Curl_easy *data, const void *buf, size_t len, CURLcode *err) { @@ -433,22 +427,31 @@ bool Curl_conn_data_pending(struct Curl_easy *data, int sockindex) return FALSE; } -int Curl_conn_get_select_socks(struct Curl_easy *data, int sockindex, - curl_socket_t *socks) +void Curl_conn_cf_adjust_pollset(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct easy_pollset *ps) { - struct Curl_cfilter *cf; + /* Get the lowest not-connected filter, if there are any */ + while(cf && !cf->connected && cf->next && !cf->next->connected) + cf = cf->next; + /* From there on, give all filters a chance to adjust the pollset. + * Lower filters are called later, so they may override */ + while(cf) { + cf->cft->adjust_pollset(cf, data, ps); + cf = cf->next; + } +} + +void Curl_conn_adjust_pollset(struct Curl_easy *data, + struct easy_pollset *ps) +{ + int i; DEBUGASSERT(data); DEBUGASSERT(data->conn); - cf = data->conn->cfilter[sockindex]; - - /* if the next one is not yet connected, that's the one we want */ - while(cf && cf->next && !cf->next->connected) - cf = cf->next; - if(cf) { - return cf->cft->get_select_socks(cf, data, socks); + for(i = 0; i < 2; ++i) { + Curl_conn_cf_adjust_pollset(data->conn->cfilter[i], data, ps); } - return GETSOCK_BLANK; } void Curl_conn_get_host(struct Curl_easy *data, int sockindex, @@ -646,3 +649,142 @@ size_t Curl_conn_get_max_concurrent(struct Curl_easy *data, &n, NULL) : CURLE_UNKNOWN_OPTION; return (result || n <= 0)? 1 : (size_t)n; } + + +void Curl_pollset_reset(struct Curl_easy *data, + struct easy_pollset *ps) +{ + size_t i; + (void)data; + memset(ps, 0, sizeof(*ps)); + for(i = 0; i< MAX_SOCKSPEREASYHANDLE; i++) + ps->sockets[i] = CURL_SOCKET_BAD; +} + +/** + * + */ +void Curl_pollset_change(struct Curl_easy *data, + struct easy_pollset *ps, curl_socket_t sock, + int add_flags, int remove_flags) +{ + unsigned int i; + + (void)data; + DEBUGASSERT(VALID_SOCK(sock)); + if(!VALID_SOCK(sock)) + return; + + DEBUGASSERT(add_flags <= (CURL_POLL_IN|CURL_POLL_OUT)); + DEBUGASSERT(remove_flags <= (CURL_POLL_IN|CURL_POLL_OUT)); + DEBUGASSERT((add_flags&remove_flags) == 0); /* no overlap */ + for(i = 0; i < ps->num; ++i) { + if(ps->sockets[i] == sock) { + ps->actions[i] &= (unsigned char)(~remove_flags); + ps->actions[i] |= (unsigned char)add_flags; + /* all gone? remove socket */ + if(!ps->actions[i]) { + if((i + 1) < ps->num) { + memmove(&ps->sockets[i], &ps->sockets[i + 1], + (ps->num - (i + 1)) * sizeof(ps->sockets[0])); + memmove(&ps->actions[i], &ps->actions[i + 1], + (ps->num - (i + 1)) * sizeof(ps->actions[0])); + } + --ps->num; + } + return; + } + } + /* not present */ + if(add_flags) { + /* Having more SOCKETS per easy handle than what is defined + * is a programming error. This indicates that we need + * to raise this limit, making easy_pollset larger. + * Since we use this in tight loops, we do not want to make + * the pollset dynamic unnecessarily. + * The current maximum in practise is HTTP/3 eyeballing where + * we have up to 4 sockets involved in connection setup. + */ + DEBUGASSERT(i < MAX_SOCKSPEREASYHANDLE); + if(i < MAX_SOCKSPEREASYHANDLE) { + ps->sockets[i] = sock; + ps->actions[i] = (unsigned char)add_flags; + ps->num = i + 1; + } + } +} + +void Curl_pollset_set(struct Curl_easy *data, + struct easy_pollset *ps, curl_socket_t sock, + bool do_in, bool do_out) +{ + Curl_pollset_change(data, ps, sock, + (do_in?CURL_POLL_IN:0)|(do_out?CURL_POLL_OUT:0), + (!do_in?CURL_POLL_IN:0)|(!do_out?CURL_POLL_OUT:0)); +} + +static void ps_add(struct Curl_easy *data, struct easy_pollset *ps, + int bitmap, curl_socket_t *socks) +{ + if(bitmap) { + int i; + for(i = 0; i < MAX_SOCKSPEREASYHANDLE; ++i) { + if(!(bitmap & GETSOCK_MASK_RW(i)) || !VALID_SOCK((socks[i]))) { + break; + } + if(bitmap & GETSOCK_READSOCK(i)) { + if(bitmap & GETSOCK_WRITESOCK(i)) + Curl_pollset_add_inout(data, ps, socks[i]); + else + /* is READ, since we checked MASK_RW above */ + Curl_pollset_add_in(data, ps, socks[i]); + } + else + Curl_pollset_add_out(data, ps, socks[i]); + } + } +} + +void Curl_pollset_add_socks(struct Curl_easy *data, + struct easy_pollset *ps, + int (*get_socks_cb)(struct Curl_easy *data, + struct connectdata *conn, + curl_socket_t *socks)) +{ + curl_socket_t socks[MAX_SOCKSPEREASYHANDLE]; + int bitmap; + + DEBUGASSERT(data->conn); + bitmap = get_socks_cb(data, data->conn, socks); + ps_add(data, ps, bitmap, socks); +} + +void Curl_pollset_add_socks2(struct Curl_easy *data, + struct easy_pollset *ps, + int (*get_socks_cb)(struct Curl_easy *data, + curl_socket_t *socks)) +{ + curl_socket_t socks[MAX_SOCKSPEREASYHANDLE]; + int bitmap; + + bitmap = get_socks_cb(data, socks); + ps_add(data, ps, bitmap, socks); +} + +void Curl_pollset_check(struct Curl_easy *data, + struct easy_pollset *ps, curl_socket_t sock, + bool *pwant_read, bool *pwant_write) +{ + unsigned int i; + + (void)data; + DEBUGASSERT(VALID_SOCK(sock)); + for(i = 0; i < ps->num; ++i) { + if(ps->sockets[i] == sock) { + *pwant_read = !!(ps->actions[i] & CURL_POLL_IN); + *pwant_write = !!(ps->actions[i] & CURL_POLL_OUT); + return; + } + } + *pwant_read = *pwant_write = FALSE; +} diff --git a/lib/cfilters.h b/lib/cfilters.h index 2c65264d9..4d8520df1 100644 --- a/lib/cfilters.h +++ b/lib/cfilters.h @@ -60,14 +60,34 @@ typedef void Curl_cft_get_host(struct Curl_cfilter *cf, const char **pdisplay_host, int *pport); -/* Filters may return sockets and fdset flags they are waiting for. - * The passes array has room for up to MAX_SOCKSPEREASYHANDLE sockets. - * @return read/write fdset for index in socks - * or GETSOCK_BLANK when nothing to wait on +struct easy_pollset; + +/* Passing in an easy_pollset for monitoring of sockets, let + * filters add or remove sockets actions (CURL_POLL_OUT, CURL_POLL_IN). + * This may add a socket or, in case no actions remain, remove + * a socket from the set. + * + * Filter implementations need to call filters "below" *after* they have + * made their adjustments. This allows lower filters to override "upper" + * actions. If a "lower" filter is unable to write, it needs to be able + * to disallow POLL_OUT. + * + * A filter without own restrictions/preferences should not modify + * the pollset. Filters, whose filter "below" is not connected, should + * also do no adjustments. + * + * Examples: a TLS handshake, while ongoing, might remove POLL_IN + * when it needs to write, or vice versa. A HTTP/2 filter might remove + * POLL_OUT when a stream window is exhausted and a WINDOW_UPDATE needs + * to be received first and add instead POLL_IN. + * + * @param cf the filter to ask + * @param data the easy handle the pollset is about + * @param ps the pollset (inout) for the easy handle */ -typedef int Curl_cft_get_select_socks(struct Curl_cfilter *cf, - struct Curl_easy *data, - curl_socket_t *socks); +typedef void Curl_cft_adjust_pollset(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct easy_pollset *ps); typedef bool Curl_cft_data_pending(struct Curl_cfilter *cf, const struct Curl_easy *data); @@ -171,7 +191,7 @@ struct Curl_cftype { Curl_cft_connect *do_connect; /* establish connection */ Curl_cft_close *do_close; /* close conn */ Curl_cft_get_host *get_host; /* host filter talks to */ - Curl_cft_get_select_socks *get_select_socks;/* sockets to select on */ + Curl_cft_adjust_pollset *adjust_pollset; /* adjust transfer poll set */ Curl_cft_data_pending *has_data_pending;/* conn has data pending */ Curl_cft_send *do_send; /* send data */ Curl_cft_recv *do_recv; /* receive data */ @@ -200,9 +220,9 @@ void Curl_cf_def_destroy_this(struct Curl_cfilter *cf, void Curl_cf_def_get_host(struct Curl_cfilter *cf, struct Curl_easy *data, const char **phost, const char **pdisplay_host, int *pport); -int Curl_cf_def_get_select_socks(struct Curl_cfilter *cf, - struct Curl_easy *data, - curl_socket_t *socks); +void Curl_cf_def_adjust_pollset(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct easy_pollset *ps); bool Curl_cf_def_data_pending(struct Curl_cfilter *cf, const struct Curl_easy *data); ssize_t Curl_cf_def_send(struct Curl_cfilter *cf, struct Curl_easy *data, @@ -279,9 +299,6 @@ CURLcode Curl_conn_cf_connect(struct Curl_cfilter *cf, struct Curl_easy *data, bool blocking, bool *done); void Curl_conn_cf_close(struct Curl_cfilter *cf, struct Curl_easy *data); -int Curl_conn_cf_get_select_socks(struct Curl_cfilter *cf, - struct Curl_easy *data, - curl_socket_t *socks); ssize_t Curl_conn_cf_send(struct Curl_cfilter *cf, struct Curl_easy *data, const void *buf, size_t len, CURLcode *err); ssize_t Curl_conn_cf_recv(struct Curl_cfilter *cf, struct Curl_easy *data, @@ -364,11 +381,17 @@ bool Curl_conn_data_pending(struct Curl_easy *data, curl_socket_t Curl_conn_get_socket(struct Curl_easy *data, int sockindex); /** - * Get any select fd flags and the socket filters at chain `sockindex` - * at connection `conn` might be waiting for. + * Adjust the pollset for the filter chain startgin at `cf`. */ -int Curl_conn_get_select_socks(struct Curl_easy *data, int sockindex, - curl_socket_t *socks); +void Curl_conn_cf_adjust_pollset(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct easy_pollset *ps); + +/** + * Adjust pollset from filters installed at transfer's connection. + */ +void Curl_conn_adjust_pollset(struct Curl_easy *data, + struct easy_pollset *ps); /** * Receive data through the filter chain at `sockindex` for connection @@ -468,6 +491,54 @@ size_t Curl_conn_get_max_concurrent(struct Curl_easy *data, int sockindex); +void Curl_pollset_reset(struct Curl_easy *data, + struct easy_pollset *ps); + +/* Change the poll flags (CURL_POLL_IN/CURL_POLL_OUT) to the poll set for + * socket `sock`. If the socket is not already part of the poll set, it + * will be added. + * If the socket is present and all poll flags are cleared, it will be removed. + */ +void Curl_pollset_change(struct Curl_easy *data, + struct easy_pollset *ps, curl_socket_t sock, + int add_flags, int remove_flags); + +void Curl_pollset_set(struct Curl_easy *data, + struct easy_pollset *ps, curl_socket_t sock, + bool do_in, bool do_out); + +#define Curl_pollset_add_in(data, ps, sock) \ + Curl_pollset_change((data), (ps), (sock), CURL_POLL_IN, 0) +#define Curl_pollset_add_out(data, ps, sock) \ + Curl_pollset_change((data), (ps), (sock), CURL_POLL_OUT, 0) +#define Curl_pollset_add_inout(data, ps, sock) \ + Curl_pollset_change((data), (ps), (sock), \ + CURL_POLL_IN|CURL_POLL_OUT, 0) +#define Curl_pollset_set_in_only(data, ps, sock) \ + Curl_pollset_change((data), (ps), (sock), \ + CURL_POLL_IN, CURL_POLL_OUT) +#define Curl_pollset_set_out_only(data, ps, sock) \ + Curl_pollset_change((data), (ps), (sock), \ + CURL_POLL_OUT, CURL_POLL_IN) + +void Curl_pollset_add_socks(struct Curl_easy *data, + struct easy_pollset *ps, + int (*get_socks_cb)(struct Curl_easy *data, + struct connectdata *conn, + curl_socket_t *socks)); +void Curl_pollset_add_socks2(struct Curl_easy *data, + struct easy_pollset *ps, + int (*get_socks_cb)(struct Curl_easy *data, + curl_socket_t *socks)); + +/** + * Check if the pollset, as is, wants to read and/or write regarding + * the given socket. + */ +void Curl_pollset_check(struct Curl_easy *data, + struct easy_pollset *ps, curl_socket_t sock, + bool *pwant_read, bool *pwant_write); + /** * Types and macros used to keep the current easy handle in filter calls, * allowing for nested invocations. See #10336. diff --git a/lib/connect.c b/lib/connect.c index 92eebf0d2..5e5cad93d 100644 --- a/lib/connect.c +++ b/lib/connect.c @@ -84,6 +84,9 @@ #include "curl_memory.h" #include "memdebug.h" +#ifndef ARRAYSIZE +#define ARRAYSIZE(A) (sizeof(A)/sizeof((A)[0])) +#endif /* * Curl_timeleft() returns the amount of milliseconds left allowed for the @@ -595,7 +598,7 @@ evaluate: *connected = FALSE; /* a very negative world view is best */ now = Curl_now(); ongoing = not_started = 0; - for(i = 0; i < sizeof(ctx->baller)/sizeof(ctx->baller[0]); i++) { + for(i = 0; i < ARRAYSIZE(ctx->baller); i++) { struct eyeballer *baller = ctx->baller[i]; if(!baller || baller->is_done) @@ -656,7 +659,7 @@ evaluate: if(not_started > 0) { int added = 0; - for(i = 0; i < sizeof(ctx->baller)/sizeof(ctx->baller[0]); i++) { + for(i = 0; i < ARRAYSIZE(ctx->baller); i++) { struct eyeballer *baller = ctx->baller[i]; if(!baller || baller->has_started) @@ -691,7 +694,7 @@ evaluate: /* all ballers have failed to connect. */ CURL_TRC_CF(data, cf, "all eyeballers failed"); result = CURLE_COULDNT_CONNECT; - for(i = 0; i < sizeof(ctx->baller)/sizeof(ctx->baller[0]); i++) { + for(i = 0; i < ARRAYSIZE(ctx->baller); i++) { struct eyeballer *baller = ctx->baller[i]; if(!baller) continue; @@ -838,7 +841,7 @@ static void cf_he_ctx_clear(struct Curl_cfilter *cf, struct Curl_easy *data) DEBUGASSERT(ctx); DEBUGASSERT(data); - for(i = 0; i < sizeof(ctx->baller)/sizeof(ctx->baller[0]); i++) { + for(i = 0; i < ARRAYSIZE(ctx->baller); i++) { baller_free(ctx->baller[i], data); ctx->baller[i] = NULL; } @@ -846,35 +849,22 @@ static void cf_he_ctx_clear(struct Curl_cfilter *cf, struct Curl_easy *data) ctx->winner = NULL; } -static int cf_he_get_select_socks(struct Curl_cfilter *cf, +static void cf_he_adjust_pollset(struct Curl_cfilter *cf, struct Curl_easy *data, - curl_socket_t *socks) + struct easy_pollset *ps) { struct cf_he_ctx *ctx = cf->ctx; - size_t i, s; - int wrc, rc = GETSOCK_BLANK; - curl_socket_t wsocks[MAX_SOCKSPEREASYHANDLE]; + size_t i; - if(cf->connected) - return cf->next->cft->get_select_socks(cf->next, data, socks); - - for(i = s = 0; i < sizeof(ctx->baller)/sizeof(ctx->baller[0]); i++) { - struct eyeballer *baller = ctx->baller[i]; - if(!baller || !baller->cf) - continue; - - wrc = Curl_conn_cf_get_select_socks(baller->cf, data, wsocks); - if(wrc) { - /* TODO: we assume we get at most one socket back */ - socks[s] = wsocks[0]; - if(wrc & GETSOCK_WRITESOCK(0)) - rc |= GETSOCK_WRITESOCK(s); - if(wrc & GETSOCK_READSOCK(0)) - rc |= GETSOCK_READSOCK(s); - s++; + if(!cf->connected) { + for(i = 0; i < ARRAYSIZE(ctx->baller); i++) { + struct eyeballer *baller = ctx->baller[i]; + if(!baller || !baller->cf) + continue; + Curl_conn_cf_adjust_pollset(baller->cf, data, ps); } + CURL_TRC_CF(data, cf, "adjust_pollset -> %d socks", ps->num); } - return rc; } static CURLcode cf_he_connect(struct Curl_cfilter *cf, @@ -956,7 +946,7 @@ static bool cf_he_data_pending(struct Curl_cfilter *cf, if(cf->connected) return cf->next->cft->has_data_pending(cf->next, data); - for(i = 0; i < sizeof(ctx->baller)/sizeof(ctx->baller[0]); i++) { + for(i = 0; i < ARRAYSIZE(ctx->baller); i++) { struct eyeballer *baller = ctx->baller[i]; if(!baller || !baller->cf) continue; @@ -975,7 +965,7 @@ static struct curltime get_max_baller_time(struct Curl_cfilter *cf, size_t i; memset(&tmax, 0, sizeof(tmax)); - for(i = 0; i < sizeof(ctx->baller)/sizeof(ctx->baller[0]); i++) { + for(i = 0; i < ARRAYSIZE(ctx->baller); i++) { struct eyeballer *baller = ctx->baller[i]; memset(&t, 0, sizeof(t)); @@ -1000,7 +990,7 @@ static CURLcode cf_he_query(struct Curl_cfilter *cf, int reply_ms = -1; size_t i; - for(i = 0; i < sizeof(ctx->baller)/sizeof(ctx->baller[0]); i++) { + for(i = 0; i < ARRAYSIZE(ctx->baller); i++) { struct eyeballer *baller = ctx->baller[i]; int breply_ms; @@ -1055,7 +1045,7 @@ struct Curl_cftype Curl_cft_happy_eyeballs = { cf_he_connect, cf_he_close, Curl_cf_def_get_host, - cf_he_get_select_socks, + cf_he_adjust_pollset, cf_he_data_pending, Curl_cf_def_send, Curl_cf_def_recv, @@ -1126,10 +1116,6 @@ struct transport_provider transport_providers[] = { { TRNSPRT_UNIX, Curl_cf_unix_create }, }; -#ifndef ARRAYSIZE -#define ARRAYSIZE(A) (sizeof(A)/sizeof((A)[0])) -#endif - static cf_ip_connect_create *get_cf_create(int transport) { size_t i; @@ -1319,7 +1305,7 @@ struct Curl_cftype Curl_cft_setup = { cf_setup_connect, cf_setup_close, Curl_cf_def_get_host, - Curl_cf_def_get_select_socks, + Curl_cf_def_adjust_pollset, Curl_cf_def_data_pending, Curl_cf_def_send, Curl_cf_def_recv, diff --git a/lib/ftp.c b/lib/ftp.c index 518c92332..40af40449 100644 --- a/lib/ftp.c +++ b/lib/ftp.c @@ -819,7 +819,7 @@ static int ftp_domore_getsock(struct Curl_easy *data, DEBUGF(infof(data, "ftp_domore_getsock()")); if(conn->cfilter[SECONDARYSOCKET] && !Curl_conn_is_connected(conn, SECONDARYSOCKET)) - return Curl_conn_get_select_socks(data, SECONDARYSOCKET, socks); + return 0; if(FTP_STOP == ftpc->state) { int bits = GETSOCK_READSOCK(0); diff --git a/lib/http2.c b/lib/http2.c index a989280f3..d497e33c7 100644 --- a/lib/http2.c +++ b/lib/http2.c @@ -1081,16 +1081,11 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf, stream->reset = TRUE; } stream->send_closed = TRUE; - data->req.keepon &= ~KEEP_SEND_HOLD; drain_stream(cf, data, stream); break; case NGHTTP2_WINDOW_UPDATE: - if((data->req.keepon & KEEP_SEND_HOLD) && - (data->req.keepon & KEEP_SEND)) { - data->req.keepon &= ~KEEP_SEND_HOLD; + if(CURL_WANT_SEND(data)) { drain_stream(cf, data, stream); - CURL_TRC_CF(data, cf, "[%d] un-holding after win update", - stream_id); } break; default: @@ -1235,15 +1230,10 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, * window and *assume* that we treat this like a WINDOW_UPDATE. Some * servers send an explicit WINDOW_UPDATE, but not all seem to do that. * To be safe, we UNHOLD a stream in order not to stall. */ - if((data->req.keepon & KEEP_SEND_HOLD) && - (data->req.keepon & KEEP_SEND)) { + if(CURL_WANT_SEND(data)) { struct stream_ctx *stream = H2_STREAM_CTX(data); - data->req.keepon &= ~KEEP_SEND_HOLD; - if(stream) { + if(stream) drain_stream(cf, data, stream); - CURL_TRC_CF(data, cf, "[%d] un-holding after SETTINGS", - stream_id); - } } } break; @@ -1343,7 +1333,6 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id, stream->error = error_code; if(stream->error) stream->reset = TRUE; - data_s->req.keepon &= ~KEEP_SEND_HOLD; if(stream->error) CURL_TRC_CF(data_s, cf, "[%d] RESET: %s (err %d)", @@ -2267,14 +2256,6 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, * frame buffer or our network out buffer. */ size_t rwin = nghttp2_session_get_stream_remote_window_size(ctx->h2, stream->id); - if(rwin == 0) { - /* H2 flow window exhaustion. We need to HOLD upload until we get - * a WINDOW_UPDATE from the server. */ - data->req.keepon |= KEEP_SEND_HOLD; - CURL_TRC_CF(data, cf, "[%d] holding send as remote flow " - "window is exhausted", stream->id); - } - /* Whatever the cause, we need to return CURL_EAGAIN for this call. * We have unwritten state that needs us being invoked again and EAGAIN * is the only way to ensure that. */ @@ -2326,38 +2307,34 @@ out: return nwritten; } -static int cf_h2_get_select_socks(struct Curl_cfilter *cf, - struct Curl_easy *data, - curl_socket_t *sock) +static void cf_h2_adjust_pollset(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct easy_pollset *ps) { struct cf_h2_ctx *ctx = cf->ctx; - struct SingleRequest *k = &data->req; - struct stream_ctx *stream = H2_STREAM_CTX(data); - int bitmap = GETSOCK_BLANK; - struct cf_call_data save; + bool want_recv = CURL_WANT_RECV(data); + bool want_send = CURL_WANT_SEND(data); - CF_DATA_SAVE(save, cf, data); - sock[0] = Curl_conn_cf_get_socket(cf, data); + if(ctx->h2 && (want_recv || want_send)) { + struct stream_ctx *stream = H2_STREAM_CTX(data); + curl_socket_t sock = Curl_conn_cf_get_socket(cf, data); + struct cf_call_data save; + bool c_exhaust, s_exhaust; - if(!(k->keepon & (KEEP_RECV_PAUSE|KEEP_RECV_HOLD))) - /* Unless paused - in an HTTP/2 connection we can basically always get a - frame so we should always be ready for one */ - bitmap |= GETSOCK_READSOCK(0); + CF_DATA_SAVE(save, cf, data); + c_exhaust = !nghttp2_session_get_remote_window_size(ctx->h2); + s_exhaust = stream && stream->id >= 0 && + !nghttp2_session_get_stream_remote_window_size(ctx->h2, + stream->id); + want_recv = (want_recv || c_exhaust || s_exhaust); + want_send = (!s_exhaust && want_send) || + (!c_exhaust && nghttp2_session_want_write(ctx->h2)); - /* we're (still uploading OR the HTTP/2 layer wants to send data) AND - there's a window to send data in */ - if((((k->keepon & KEEP_SENDBITS) == KEEP_SEND) || - nghttp2_session_want_write(ctx->h2)) && - (nghttp2_session_get_remote_window_size(ctx->h2) && - nghttp2_session_get_stream_remote_window_size(ctx->h2, - stream->id))) - bitmap |= GETSOCK_WRITESOCK(0); - - CF_DATA_RESTORE(cf, save); - return bitmap; + Curl_pollset_set(data, ps, sock, want_recv, want_send); + CF_DATA_RESTORE(cf, save); + } } - static CURLcode cf_h2_connect(struct Curl_cfilter *cf, struct Curl_easy *data, bool blocking, bool *done) @@ -2601,7 +2578,7 @@ struct Curl_cftype Curl_cft_nghttp2 = { cf_h2_connect, cf_h2_close, Curl_cf_def_get_host, - cf_h2_get_select_socks, + cf_h2_adjust_pollset, cf_h2_data_pending, cf_h2_send, cf_h2_recv, diff --git a/lib/http_proxy.c b/lib/http_proxy.c index a1d6da954..8e1832581 100644 --- a/lib/http_proxy.c +++ b/lib/http_proxy.c @@ -299,7 +299,7 @@ struct Curl_cftype Curl_cft_http_proxy = { http_proxy_cf_connect, http_proxy_cf_close, Curl_cf_http_proxy_get_host, - Curl_cf_def_get_select_socks, + Curl_cf_def_adjust_pollset, Curl_cf_def_data_pending, Curl_cf_def_send, Curl_cf_def_recv, diff --git a/lib/multi.c b/lib/multi.c index b9d836390..296f459ee 100644 --- a/lib/multi.c +++ b/lib/multi.c @@ -1024,49 +1024,57 @@ static int protocol_getsock(struct Curl_easy *data, { if(conn->handler->proto_getsock) return conn->handler->proto_getsock(data, conn, socks); - return Curl_conn_get_select_socks(data, FIRSTSOCKET, socks); + return GETSOCK_BLANK; } -/* returns bitmapped flags for this handle and its sockets. The 'socks[]' - array contains MAX_SOCKSPEREASYHANDLE entries. */ -static int multi_getsock(struct Curl_easy *data, - curl_socket_t *socks) +/* Initializes `poll_set` with the current socket poll actions needed + * for transfer `data`. */ +static void multi_getsock(struct Curl_easy *data, + struct easy_pollset *ps) { - struct connectdata *conn = data->conn; /* The no connection case can happen when this is called from curl_multi_remove_handle() => singlesocket() => multi_getsock(). */ - if(!conn) - return 0; + Curl_pollset_reset(data, ps); + if(!data->conn) + return; switch(data->mstate) { default: - return 0; + break; case MSTATE_RESOLVING: - return Curl_resolv_getsock(data, socks); + Curl_pollset_add_socks2(data, ps, Curl_resolv_getsock); + /* connection filters are not involved in this phase */ + return; case MSTATE_PROTOCONNECTING: case MSTATE_PROTOCONNECT: - return protocol_getsock(data, conn, socks); + Curl_pollset_add_socks(data, ps, protocol_getsock); + break; case MSTATE_DO: case MSTATE_DOING: - return doing_getsock(data, conn, socks); + Curl_pollset_add_socks(data, ps, doing_getsock); + break; case MSTATE_TUNNELING: case MSTATE_CONNECTING: - return Curl_conn_get_select_socks(data, FIRSTSOCKET, socks); + break; case MSTATE_DOING_MORE: - return domore_getsock(data, conn, socks); + Curl_pollset_add_socks(data, ps, domore_getsock); + break; case MSTATE_DID: /* since is set after DO is completed, we switch to waiting for the same as the PERFORMING state */ case MSTATE_PERFORMING: - return Curl_single_getsock(data, conn, socks); + Curl_pollset_add_socks(data, ps, Curl_single_getsock); + break; } + /* Let connection filters add/remove as needed */ + Curl_conn_adjust_pollset(data, ps); } CURLMcode curl_multi_fdset(struct Curl_multi *multi, @@ -1078,8 +1086,8 @@ CURLMcode curl_multi_fdset(struct Curl_multi *multi, and then we must make sure that is done. */ struct Curl_easy *data; int this_max_fd = -1; - curl_socket_t sockbunch[MAX_SOCKSPEREASYHANDLE]; - int i; + struct easy_pollset ps; + unsigned int i; (void)exc_fd_set; /* not used */ if(!GOOD_MULTI_HANDLE(multi)) @@ -1088,29 +1096,20 @@ CURLMcode curl_multi_fdset(struct Curl_multi *multi, if(multi->in_callback) return CURLM_RECURSIVE_API_CALL; + memset(&ps, 0, sizeof(ps)); for(data = multi->easyp; data; data = data->next) { - int bitmap; -#ifdef __clang_analyzer_ - /* to prevent "The left operand of '>=' is a garbage value" warnings */ - memset(sockbunch, 0, sizeof(sockbunch)); -#endif - bitmap = multi_getsock(data, sockbunch); + multi_getsock(data, &ps); - for(i = 0; i< MAX_SOCKSPEREASYHANDLE; i++) { - if((bitmap & GETSOCK_MASK_RW(i)) && VALID_SOCK((sockbunch[i]))) { - if(!FDSET_SOCK(sockbunch[i])) - /* pretend it doesn't exist */ - continue; - if(bitmap & GETSOCK_READSOCK(i)) - FD_SET(sockbunch[i], read_fd_set); - if(bitmap & GETSOCK_WRITESOCK(i)) - FD_SET(sockbunch[i], write_fd_set); - if((int)sockbunch[i] > this_max_fd) - this_max_fd = (int)sockbunch[i]; - } - else { - break; - } + for(i = 0; i < ps.num; i++) { + if(!FDSET_SOCK(ps.sockets[i])) + /* pretend it doesn't exist */ + continue; + if(ps.actions[i] & CURL_POLL_IN) + FD_SET(ps.sockets[i], read_fd_set); + if(ps.actions[i] & CURL_POLL_OUT) + FD_SET(ps.sockets[i], write_fd_set); + if((int)ps.sockets[i] > this_max_fd) + this_max_fd = (int)ps.sockets[i]; } } @@ -1146,9 +1145,8 @@ static CURLMcode multi_wait(struct Curl_multi *multi, bool use_wakeup) { struct Curl_easy *data; - curl_socket_t sockbunch[MAX_SOCKSPEREASYHANDLE]; - int bitmap; - unsigned int i; + struct easy_pollset ps; + size_t i; unsigned int nfds = 0; unsigned int curlfds; long timeout_internal; @@ -1174,17 +1172,10 @@ static CURLMcode multi_wait(struct Curl_multi *multi, return CURLM_BAD_FUNCTION_ARGUMENT; /* Count up how many fds we have from the multi handle */ + memset(&ps, 0, sizeof(ps)); for(data = multi->easyp; data; data = data->next) { - bitmap = multi_getsock(data, sockbunch); - - for(i = 0; i < MAX_SOCKSPEREASYHANDLE; i++) { - if((bitmap & GETSOCK_MASK_RW(i)) && VALID_SOCK((sockbunch[i]))) { - ++nfds; - } - else { - break; - } - } + multi_getsock(data, &ps); + nfds += ps.num; } /* If the internally desired timeout is actually shorter than requested from @@ -1225,40 +1216,35 @@ static CURLMcode multi_wait(struct Curl_multi *multi, if(curlfds) { /* Add the curl handles to our pollfds first */ for(data = multi->easyp; data; data = data->next) { - bitmap = multi_getsock(data, sockbunch); + multi_getsock(data, &ps); - for(i = 0; i < MAX_SOCKSPEREASYHANDLE; i++) { - if((bitmap & GETSOCK_MASK_RW(i)) && VALID_SOCK((sockbunch[i]))) { - struct pollfd *ufd = &ufds[nfds++]; + for(i = 0; i < ps.num; i++) { + struct pollfd *ufd = &ufds[nfds++]; #ifdef USE_WINSOCK - long mask = 0; + long mask = 0; #endif - ufd->fd = sockbunch[i]; - ufd->events = 0; - if(bitmap & GETSOCK_READSOCK(i)) { + ufd->fd = ps.sockets[i]; + ufd->events = 0; + if(ps.actions[i] & CURL_POLL_IN) { #ifdef USE_WINSOCK - mask |= FD_READ|FD_ACCEPT|FD_CLOSE; -#endif - ufd->events |= POLLIN; - } - if(bitmap & GETSOCK_WRITESOCK(i)) { -#ifdef USE_WINSOCK - mask |= FD_WRITE|FD_CONNECT|FD_CLOSE; - reset_socket_fdwrite(sockbunch[i]); -#endif - ufd->events |= POLLOUT; - } -#ifdef USE_WINSOCK - if(WSAEventSelect(sockbunch[i], multi->wsa_event, mask) != 0) { - if(ufds_malloc) - free(ufds); - return CURLM_INTERNAL_ERROR; - } + mask |= FD_READ|FD_ACCEPT|FD_CLOSE; #endif + ufd->events |= POLLIN; } - else { - break; + if(ps.actions[i] & CURL_POLL_OUT) { +#ifdef USE_WINSOCK + mask |= FD_WRITE|FD_CONNECT|FD_CLOSE; + reset_socket_fdwrite(ps.sockets[i]); +#endif + ufd->events |= POLLOUT; } +#ifdef USE_WINSOCK + if(WSAEventSelect(ps.sockets[i], multi->wsa_event, mask) != 0) { + if(ufds_malloc) + free(ufds); + return CURLM_INTERNAL_ERROR; + } +#endif } } } @@ -1370,21 +1356,16 @@ static CURLMcode multi_wait(struct Curl_multi *multi, if(curlfds) { for(data = multi->easyp; data; data = data->next) { - bitmap = multi_getsock(data, sockbunch); + multi_getsock(data, &ps); - for(i = 0; i < MAX_SOCKSPEREASYHANDLE; i++) { - if(bitmap & (GETSOCK_READSOCK(i) | GETSOCK_WRITESOCK(i))) { - wsa_events.lNetworkEvents = 0; - if(WSAEnumNetworkEvents(sockbunch[i], NULL, &wsa_events) == 0) { - if(ret && !pollrc && wsa_events.lNetworkEvents) - retcode++; - } - WSAEventSelect(sockbunch[i], multi->wsa_event, 0); - } - else { - /* break on entry not checked for being readable or writable */ - break; + for(i = 0; i < ps.num; i++) { + wsa_events.lNetworkEvents = 0; + if(WSAEnumNetworkEvents(ps.sockets[i], NULL, + &wsa_events) == 0) { + if(ret && !pollrc && wsa_events.lNetworkEvents) + retcode++; } + WSAEventSelect(ps.sockets[i], multi->wsa_event, 0); } } } @@ -2879,53 +2860,36 @@ CURLMsg *curl_multi_info_read(struct Curl_multi *multi, int *msgs_in_queue) static CURLMcode singlesocket(struct Curl_multi *multi, struct Curl_easy *data) { - curl_socket_t socks[MAX_SOCKSPEREASYHANDLE]; - int i; + struct easy_pollset cur_poll; + unsigned int i; struct Curl_sh_entry *entry; curl_socket_t s; - int num; - unsigned int curraction; - unsigned char actions[MAX_SOCKSPEREASYHANDLE]; int rc; - for(i = 0; i< MAX_SOCKSPEREASYHANDLE; i++) - socks[i] = CURL_SOCKET_BAD; - /* Fill in the 'current' struct with the state as it is now: what sockets to supervise and for what actions */ - curraction = multi_getsock(data, socks); + multi_getsock(data, &cur_poll); /* We have 0 .. N sockets already and we get to know about the 0 .. M sockets we should have from now on. Detect the differences, remove no longer supervised ones and add new ones */ /* walk over the sockets we got right now */ - for(i = 0; (i< MAX_SOCKSPEREASYHANDLE) && - (curraction & GETSOCK_MASK_RW(i)); - i++) { - unsigned char action = CURL_POLL_NONE; - unsigned char prevaction = 0; + for(i = 0; i < cur_poll.num; i++) { + unsigned char cur_action = cur_poll.actions[i]; + unsigned char last_action = 0; int comboaction; - bool sincebefore = FALSE; - s = socks[i]; + s = cur_poll.sockets[i]; /* get it from the hash */ entry = sh_getentry(&multi->sockhash, s); - - if(curraction & GETSOCK_READSOCK(i)) - action |= CURL_POLL_IN; - if(curraction & GETSOCK_WRITESOCK(i)) - action |= CURL_POLL_OUT; - - actions[i] = action; if(entry) { /* check if new for this transfer */ - int j; - for(j = 0; j< data->numsocks; j++) { - if(s == data->sockets[j]) { - prevaction = data->actions[j]; - sincebefore = TRUE; + unsigned int j; + for(j = 0; j< data->last_poll.num; j++) { + if(s == data->last_poll.sockets[j]) { + last_action = data->last_poll.actions[j]; break; } } @@ -2937,23 +2901,23 @@ static CURLMcode singlesocket(struct Curl_multi *multi, /* fatal */ return CURLM_OUT_OF_MEMORY; } - if(sincebefore && (prevaction != action)) { + if(last_action && (last_action != cur_action)) { /* Socket was used already, but different action now */ - if(prevaction & CURL_POLL_IN) + if(last_action & CURL_POLL_IN) entry->readers--; - if(prevaction & CURL_POLL_OUT) + if(last_action & CURL_POLL_OUT) entry->writers--; - if(action & CURL_POLL_IN) + if(cur_action & CURL_POLL_IN) entry->readers++; - if(action & CURL_POLL_OUT) + if(cur_action & CURL_POLL_OUT) entry->writers++; } - else if(!sincebefore) { - /* a new user */ + else if(!last_action) { + /* a new transfer using this socket */ entry->users++; - if(action & CURL_POLL_IN) + if(cur_action & CURL_POLL_IN) entry->readers++; - if(action & CURL_POLL_OUT) + if(cur_action & CURL_POLL_OUT) entry->writers++; /* add 'data' to the transfer hash on this socket! */ @@ -2968,7 +2932,7 @@ static CURLMcode singlesocket(struct Curl_multi *multi, (entry->readers ? CURL_POLL_IN : 0); /* socket existed before and has the same action set as before */ - if(sincebefore && ((int)entry->action == comboaction)) + if(last_action && ((int)entry->action == comboaction)) /* same, continue */ continue; @@ -2976,6 +2940,7 @@ static CURLMcode singlesocket(struct Curl_multi *multi, set_in_callback(multi, TRUE); rc = multi->socket_cb(data, s, comboaction, multi->socket_userp, entry->socketp); + set_in_callback(multi, FALSE); if(rc == -1) { multi->dead = TRUE; @@ -2986,16 +2951,15 @@ static CURLMcode singlesocket(struct Curl_multi *multi, entry->action = comboaction; /* store the current action state */ } - num = i; /* number of sockets */ - - /* when we've walked over all the sockets we should have right now, we must - make sure to detect sockets that are removed */ - for(i = 0; i< data->numsocks; i++) { - int j; + /* Check for last_poll.sockets that no longer appear in cur_poll.sockets. + * Need to remove the easy handle from the multi->sockhash->transfers and + * remove multi->sockhash entry when this was the last transfer */ + for(i = 0; i< data->last_poll.num; i++) { + unsigned int j; bool stillused = FALSE; - s = data->sockets[i]; - for(j = 0; j < num; j++) { - if(s == socks[j]) { + s = data->last_poll.sockets[i]; + for(j = 0; j < cur_poll.num; j++) { + if(s == cur_poll.sockets[j]) { /* this is still supervised */ stillused = TRUE; break; @@ -3008,7 +2972,7 @@ static CURLMcode singlesocket(struct Curl_multi *multi, /* if this is NULL here, the socket has been closed and notified so already by Curl_multi_closed() */ if(entry) { - unsigned char oldactions = data->actions[i]; + unsigned char oldactions = data->last_poll.actions[i]; /* this socket has been removed. Decrease user count */ entry->users--; if(oldactions & CURL_POLL_OUT) @@ -3036,11 +3000,10 @@ static CURLMcode singlesocket(struct Curl_multi *multi, } } } - } /* for loop over numsocks */ + } /* for loop over num */ - memcpy(data->sockets, socks, num*sizeof(curl_socket_t)); - memcpy(data->actions, actions, num*sizeof(char)); - data->numsocks = num; + /* Remember for next time */ + memcpy(&data->last_poll, &cur_poll, sizeof(data->last_poll)); return CURLM_OK; } diff --git a/lib/socks.c b/lib/socks.c index 88365ce28..9bb9436c7 100644 --- a/lib/socks.c +++ b/lib/socks.c @@ -1170,32 +1170,29 @@ static CURLcode socks_proxy_cf_connect(struct Curl_cfilter *cf, return result; } -static int socks_cf_get_select_socks(struct Curl_cfilter *cf, +static void socks_cf_adjust_pollset(struct Curl_cfilter *cf, struct Curl_easy *data, - curl_socket_t *socks) + struct easy_pollset *ps) { struct socks_state *sx = cf->ctx; - int fds; - fds = cf->next->cft->get_select_socks(cf->next, data, socks); - if(!fds && cf->next->connected && !cf->connected && sx) { + if(!cf->connected && sx) { /* If we are not connected, the filter below is and has nothing * to wait on, we determine what to wait for. */ - socks[0] = Curl_conn_cf_get_socket(cf, data); + curl_socket_t sock = Curl_conn_cf_get_socket(cf, data); switch(sx->state) { case CONNECT_RESOLVING: case CONNECT_SOCKS_READ: case CONNECT_AUTH_READ: case CONNECT_REQ_READ: case CONNECT_REQ_READ_MORE: - fds = GETSOCK_READSOCK(0); + Curl_pollset_set_in_only(data, ps, sock); break; default: - fds = GETSOCK_WRITESOCK(0); + Curl_pollset_set_out_only(data, ps, sock); break; } } - return fds; } static void socks_proxy_cf_close(struct Curl_cfilter *cf, @@ -1240,7 +1237,7 @@ struct Curl_cftype Curl_cft_socks_proxy = { socks_proxy_cf_connect, socks_proxy_cf_close, socks_cf_get_host, - socks_cf_get_select_socks, + socks_cf_adjust_pollset, Curl_cf_def_data_pending, Curl_cf_def_send, Curl_cf_def_recv, diff --git a/lib/urldata.h b/lib/urldata.h index 03c265f16..e81dab1bb 100644 --- a/lib/urldata.h +++ b/lib/urldata.h @@ -571,6 +571,13 @@ struct hostname { #define KEEP_RECVBITS (KEEP_RECV | KEEP_RECV_HOLD | KEEP_RECV_PAUSE) #define KEEP_SENDBITS (KEEP_SEND | KEEP_SEND_HOLD | KEEP_SEND_PAUSE) +/* transfer wants to send is not PAUSE or HOLD */ +#define CURL_WANT_SEND(data) \ + (((data)->req.keepon & KEEP_SENDBITS) == KEEP_SEND) +/* transfer receive is not on PAUSE or HOLD */ +#define CURL_WANT_RECV(data) \ + (!((data)->req.keepon & (KEEP_RECV_PAUSE|KEEP_RECV_HOLD))) + #if defined(CURLRES_ASYNCH) || !defined(CURL_DISABLE_DOH) #define USE_CURL_ASYNC struct Curl_async { @@ -589,6 +596,15 @@ struct Curl_async { #define FIRSTSOCKET 0 #define SECONDARYSOCKET 1 +/* Polling requested by an easy handle. + * `action` is CURL_POLL_IN, CURL_POLL_OUT or CURL_POLL_INOUT. + */ +struct easy_pollset { + curl_socket_t sockets[MAX_SOCKSPEREASYHANDLE]; + unsigned int num; + unsigned char actions[MAX_SOCKSPEREASYHANDLE]; +}; + enum expect100 { EXP100_SEND_DATA, /* enough waiting, just send the body now */ EXP100_AWAITING_CONTINUE, /* waiting for the 100 Continue header */ @@ -1974,10 +1990,7 @@ struct Curl_easy { particular order. Note that all sockets are added to the sockhash, where the state etc are also kept. This array is mostly used to detect when a socket is to be removed from the hash. See singlesocket(). */ - curl_socket_t sockets[MAX_SOCKSPEREASYHANDLE]; - unsigned char actions[MAX_SOCKSPEREASYHANDLE]; /* action for each socket in - sockets[] */ - int numsocks; + struct easy_pollset last_poll; struct Names dns; struct Curl_multi *multi; /* if non-NULL, points to the multi handle diff --git a/lib/vquic/curl_msh3.c b/lib/vquic/curl_msh3.c index 6bd0d2331..9c94a8ec1 100644 --- a/lib/vquic/curl_msh3.c +++ b/lib/vquic/curl_msh3.c @@ -672,31 +672,25 @@ out: return nwritten; } -static int cf_msh3_get_select_socks(struct Curl_cfilter *cf, - struct Curl_easy *data, - curl_socket_t *socks) +static void cf_msh3_adjust_pollset(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct easy_pollset *ps) { struct cf_msh3_ctx *ctx = cf->ctx; struct stream_ctx *stream = H3_STREAM_CTX(data); - int bitmap = GETSOCK_BLANK; struct cf_call_data save; CF_DATA_SAVE(save, cf, data); if(stream && ctx->sock[SP_LOCAL] != CURL_SOCKET_BAD) { - socks[0] = ctx->sock[SP_LOCAL]; - if(stream->recv_error) { - bitmap |= GETSOCK_READSOCK(0); + Curl_pollset_add_in(data, ps, ctx->sock[SP_LOCAL]); drain_stream(cf, data); } else if(stream->req) { - bitmap |= GETSOCK_READSOCK(0); + Curl_pollset_add_out(data, ps, ctx->sock[SP_LOCAL]); drain_stream(cf, data); } } - CURL_TRC_CF(data, cf, "select_sock -> %d", bitmap); - CF_DATA_RESTORE(cf, save); - return bitmap; } static bool cf_msh3_data_pending(struct Curl_cfilter *cf, @@ -1025,7 +1019,7 @@ struct Curl_cftype Curl_cft_http3 = { cf_msh3_connect, cf_msh3_close, Curl_cf_def_get_host, - cf_msh3_get_select_socks, + cf_msh3_adjust_pollset, cf_msh3_data_pending, cf_msh3_send, cf_msh3_recv, diff --git a/lib/vquic/curl_ngtcp2.c b/lib/vquic/curl_ngtcp2.c index 7b765ac61..a967f90b2 100644 --- a/lib/vquic/curl_ngtcp2.c +++ b/lib/vquic/curl_ngtcp2.c @@ -191,6 +191,7 @@ struct h3_stream_ctx { bool closed; /* TRUE on stream close */ bool reset; /* TRUE on stream reset */ bool send_closed; /* stream is local closed */ + BIT(quic_flow_blocked); /* stream is blocked by QUIC flow control */ }; #define H3_STREAM_CTX(d) ((struct h3_stream_ctx *)(((d) && (d)->req.p.http)? \ @@ -249,6 +250,43 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data) } } +static struct Curl_easy *get_stream_easy(struct Curl_cfilter *cf, + struct Curl_easy *data, + int64_t stream_id) +{ + struct Curl_easy *sdata; + + (void)cf; + if(H3_STREAM_ID(data) == stream_id) { + return data; + } + else { + DEBUGASSERT(data->multi); + for(sdata = data->multi->easyp; sdata; sdata = sdata->next) { + if((sdata->conn == data->conn) && H3_STREAM_ID(sdata) == stream_id) { + return sdata; + } + } + } + return NULL; +} + +static void h3_drain_stream(struct Curl_cfilter *cf, + struct Curl_easy *data) +{ + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); + unsigned char bits; + + (void)cf; + bits = CURL_CSELECT_IN; + if(stream && stream->upload_left && !stream->send_closed) + bits |= CURL_CSELECT_OUT; + if(data->state.dselect_bits != bits) { + data->state.dselect_bits = bits; + Curl_expire(data, 0, EXPIRE_RUN_NOW); + } +} + /* ngtcp2 default congestion controller does not perform pacing. Limit the maximum packet burst to MAX_PKT_BURST packets. */ #define MAX_PKT_BURST 10 @@ -915,6 +953,9 @@ static int cb_extend_max_stream_data(ngtcp2_conn *tconn, int64_t stream_id, { struct Curl_cfilter *cf = user_data; struct cf_ngtcp2_ctx *ctx = cf->ctx; + struct Curl_easy *data = CF_DATA_CURRENT(cf); + struct Curl_easy *s_data; + struct h3_stream_ctx *stream; int rv; (void)tconn; (void)max_data; @@ -924,7 +965,13 @@ static int cb_extend_max_stream_data(ngtcp2_conn *tconn, int64_t stream_id, if(rv) { return NGTCP2_ERR_CALLBACK_FAILURE; } - + s_data = get_stream_easy(cf, data, stream_id); + stream = H3_STREAM_CTX(s_data); + if(stream && stream->quic_flow_blocked) { + CURL_TRC_CF(data, cf, "[%" PRId64 "] unblock quic flow", stream_id); + stream->quic_flow_blocked = FALSE; + h3_drain_stream(cf, data); + } return 0; } @@ -1077,46 +1124,29 @@ static CURLcode check_and_set_expiry(struct Curl_cfilter *cf, return CURLE_OK; } -static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf, +static void cf_ngtcp2_adjust_pollset(struct Curl_cfilter *cf, struct Curl_easy *data, - curl_socket_t *socks) + struct easy_pollset *ps) { struct cf_ngtcp2_ctx *ctx = cf->ctx; - struct SingleRequest *k = &data->req; - int rv = GETSOCK_BLANK; - struct h3_stream_ctx *stream = H3_STREAM_CTX(data); - struct cf_call_data save; + bool want_recv = CURL_WANT_RECV(data); + bool want_send = CURL_WANT_SEND(data); - CF_DATA_SAVE(save, cf, data); - socks[0] = ctx->q.sockfd; + if(ctx->qconn && (want_recv || want_send)) { + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); + struct cf_call_data save; + bool c_exhaust, s_exhaust; - /* in HTTP/3 we can always get a frame, so check read */ - rv |= GETSOCK_READSOCK(0); + CF_DATA_SAVE(save, cf, data); + c_exhaust = !ngtcp2_conn_get_cwnd_left(ctx->qconn) || + !ngtcp2_conn_get_max_data_left(ctx->qconn); + s_exhaust = stream && stream->id >= 0 && stream->quic_flow_blocked; + want_recv = (want_recv || c_exhaust || s_exhaust); + want_send = (!s_exhaust && want_send) || + !Curl_bufq_is_empty(&ctx->q.sendbuf); - /* we're still uploading or the HTTP/2 layer wants to send data */ - if((k->keepon & KEEP_SENDBITS) == KEEP_SEND && - ngtcp2_conn_get_cwnd_left(ctx->qconn) && - ngtcp2_conn_get_max_data_left(ctx->qconn) && - stream && nghttp3_conn_is_stream_writable(ctx->h3conn, stream->id)) - rv |= GETSOCK_WRITESOCK(0); - - CF_DATA_RESTORE(cf, save); - return rv; -} - -static void h3_drain_stream(struct Curl_cfilter *cf, - struct Curl_easy *data) -{ - struct h3_stream_ctx *stream = H3_STREAM_CTX(data); - unsigned char bits; - - (void)cf; - bits = CURL_CSELECT_IN; - if(stream && stream->upload_left && !stream->send_closed) - bits |= CURL_CSELECT_OUT; - if(data->state.dselect_bits != bits) { - data->state.dselect_bits = bits; - Curl_expire(data, 0, EXPIRE_RUN_NOW); + Curl_pollset_set(data, ps, ctx->q.sockfd, want_recv, want_send); + CF_DATA_RESTORE(cf, save); } } @@ -1145,7 +1175,6 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id, else { CURL_TRC_CF(data, cf, "[%" PRId64 "] CLOSED", stream->id); } - data->req.keepon &= ~KEEP_SEND_HOLD; h3_drain_stream(cf, data); return 0; } @@ -1577,12 +1606,6 @@ static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id, if(rv) { return NGTCP2_ERR_CALLBACK_FAILURE; } - if((data->req.keepon & KEEP_SEND_HOLD) && - (data->req.keepon & KEEP_SEND)) { - data->req.keepon &= ~KEEP_SEND_HOLD; - h3_drain_stream(cf, data); - CURL_TRC_CF(data, cf, "[%" PRId64 "] unpausing acks", stream_id); - } } return 0; } @@ -1864,15 +1887,13 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data, if(stream && sent > 0 && stream->sendbuf_len_in_flight) { /* We have unacknowledged DATA and cannot report success to our * caller. Instead we EAGAIN and remember how much we have already - * "written" into our various internal connection buffers. - * We put the stream upload on HOLD, until this gets ACKed. */ + * "written" into our various internal connection buffers. */ stream->upload_blocked_len = sent; CURL_TRC_CF(data, cf, "[%" PRId64 "] cf_send(len=%zu), " "%zu bytes in flight -> EGAIN", stream->id, len, stream->sendbuf_len_in_flight); *err = CURLE_AGAIN; sent = -1; - data->req.keepon |= KEEP_SEND_HOLD; } out: @@ -2085,11 +2106,18 @@ static ssize_t read_pkt_to_send(void *userp, } else if(n < 0) { switch(n) { - case NGTCP2_ERR_STREAM_DATA_BLOCKED: + case NGTCP2_ERR_STREAM_DATA_BLOCKED: { + struct h3_stream_ctx *stream = H3_STREAM_CTX(x->data); DEBUGASSERT(ndatalen == -1); nghttp3_conn_block_stream(ctx->h3conn, stream_id); + CURL_TRC_CF(x->data, x->cf, "[%" PRId64 "] block quic flow", + stream_id); + DEBUGASSERT(stream); + if(stream) + stream->quic_flow_blocked = TRUE; n = 0; break; + } case NGTCP2_ERR_STREAM_SHUT_WR: DEBUGASSERT(ndatalen == -1); nghttp3_conn_shutdown_stream_write(ctx->h3conn, stream_id); @@ -2714,7 +2742,7 @@ struct Curl_cftype Curl_cft_http3 = { cf_ngtcp2_connect, cf_ngtcp2_close, Curl_cf_def_get_host, - cf_ngtcp2_get_select_socks, + cf_ngtcp2_adjust_pollset, cf_ngtcp2_data_pending, cf_ngtcp2_send, cf_ngtcp2_recv, diff --git a/lib/vquic/curl_quiche.c b/lib/vquic/curl_quiche.c index 6caa33ed3..f8fa370c8 100644 --- a/lib/vquic/curl_quiche.c +++ b/lib/vquic/curl_quiche.c @@ -105,7 +105,6 @@ struct cf_quiche_ctx { struct bufc_pool stream_bufcp; /* chunk pool for streams */ curl_off_t data_recvd; uint64_t max_idle_ms; /* max idle time for QUIC conn */ - size_t sends_on_hold; /* # of streams with SEND_HOLD set */ BIT(goaway); /* got GOAWAY from server */ BIT(got_first_byte); /* if first byte was received */ BIT(x509_store_setup); /* if x509 store has been set up */ @@ -240,6 +239,7 @@ struct stream_ctx { bool send_closed; /* stream is locally closed */ bool resp_hds_complete; /* complete, final response has been received */ bool resp_got_header; /* TRUE when h3 stream has recvd some HEADER */ + BIT(quic_flow_blocked); /* stream is blocked by QUIC flow control */ }; #define H3_STREAM_CTX(d) ((struct stream_ctx *)(((d) && (d)->req.p.http)? \ @@ -249,56 +249,20 @@ struct stream_ctx { #define H3_STREAM_ID(d) (H3_STREAM_CTX(d)? \ H3_STREAM_CTX(d)->id : -2) -static bool stream_send_is_suspended(struct Curl_easy *data) -{ - return (data->req.keepon & KEEP_SEND_HOLD); -} - -static void stream_send_suspend(struct Curl_cfilter *cf, - struct Curl_easy *data) -{ - struct cf_quiche_ctx *ctx = cf->ctx; - - if((data->req.keepon & KEEP_SENDBITS) == KEEP_SEND) { - data->req.keepon |= KEEP_SEND_HOLD; - ++ctx->sends_on_hold; - if(H3_STREAM_ID(data) >= 0) - CURL_TRC_CF(data, cf, "[%"PRId64"] suspend sending", - H3_STREAM_ID(data)); - else - CURL_TRC_CF(data, cf, "[%s] suspend sending", data->state.url); - } -} - -static void stream_send_resume(struct Curl_cfilter *cf, - struct Curl_easy *data) -{ - struct cf_quiche_ctx *ctx = cf->ctx; - - if(stream_send_is_suspended(data)) { - data->req.keepon &= ~KEEP_SEND_HOLD; - --ctx->sends_on_hold; - if(H3_STREAM_ID(data) >= 0) - CURL_TRC_CF(data, cf, "[%"PRId64"] resume sending", - H3_STREAM_ID(data)); - else - CURL_TRC_CF(data, cf, "[%s] resume sending", data->state.url); - Curl_expire(data, 0, EXPIRE_RUN_NOW); - } -} - static void check_resumes(struct Curl_cfilter *cf, struct Curl_easy *data) { - struct cf_quiche_ctx *ctx = cf->ctx; struct Curl_easy *sdata; + struct stream_ctx *stream; - if(ctx->sends_on_hold) { - DEBUGASSERT(data->multi); - for(sdata = data->multi->easyp; - sdata && ctx->sends_on_hold; sdata = sdata->next) { - if(stream_send_is_suspended(sdata)) { - stream_send_resume(cf, sdata); + DEBUGASSERT(data->multi); + for(sdata = data->multi->easyp; sdata; sdata = sdata->next) { + if(sdata->conn == data->conn) { + stream = H3_STREAM_CTX(sdata); + if(stream && stream->quic_flow_blocked) { + stream->quic_flow_blocked = FALSE; + Curl_expire(data, 0, EXPIRE_RUN_NOW); + CURL_TRC_CF(data, cf, "[%"PRId64"] unblock", stream->id); } } } @@ -327,16 +291,11 @@ static CURLcode h3_data_setup(struct Curl_cfilter *cf, static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data) { - struct cf_quiche_ctx *ctx = cf->ctx; struct stream_ctx *stream = H3_STREAM_CTX(data); (void)cf; if(stream) { CURL_TRC_CF(data, cf, "[%"PRId64"] easy handle is done", stream->id); - if(stream_send_is_suspended(data)) { - data->req.keepon &= ~KEEP_SEND_HOLD; - --ctx->sends_on_hold; - } Curl_bufq_free(&stream->recvbuf); Curl_h1_req_parse_free(&stream->h1); free(stream); @@ -590,7 +549,6 @@ static CURLcode h3_process_event(struct Curl_cfilter *cf, } stream->closed = TRUE; streamclose(cf->conn, "End of stream"); - data->req.keepon &= ~KEEP_SEND_HOLD; break; case QUICHE_H3_EVENT_GOAWAY: @@ -1037,9 +995,8 @@ static ssize_t h3_open_stream(struct Curl_cfilter *cf, if(QUICHE_H3_ERR_STREAM_BLOCKED == stream3_id) { /* quiche seems to report this error if the connection window is * exhausted. Which happens frequently and intermittent. */ - CURL_TRC_CF(data, cf, "send_request(%s) rejected with BLOCKED", - data->state.url); - stream_send_suspend(cf, data); + CURL_TRC_CF(data, cf, "[%"PRId64"] blocked", stream->id); + stream->quic_flow_blocked = TRUE; *err = CURLE_AGAIN; nwritten = -1; goto out; @@ -1108,7 +1065,7 @@ static ssize_t cf_quiche_send(struct Curl_cfilter *cf, struct Curl_easy *data, if(!quiche_conn_stream_writable(ctx->qconn, stream->id, len)) { CURL_TRC_CF(data, cf, "[%" PRId64 "] send_body(len=%zu) " "-> window exhausted", stream->id, len); - stream_send_suspend(cf, data); + stream->quic_flow_blocked = TRUE; } *err = CURLE_AGAIN; nwritten = -1; @@ -1177,30 +1134,32 @@ static bool stream_is_writeable(struct Curl_cfilter *cf, struct cf_quiche_ctx *ctx = cf->ctx; struct stream_ctx *stream = H3_STREAM_CTX(data); - return stream && - quiche_conn_stream_writable(ctx->qconn, (uint64_t)stream->id, 1); + return stream && (quiche_conn_stream_writable(ctx->qconn, + (uint64_t)stream->id, 1) > 0); } -static int cf_quiche_get_select_socks(struct Curl_cfilter *cf, - struct Curl_easy *data, - curl_socket_t *socks) +static void cf_quiche_adjust_pollset(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct easy_pollset *ps) { struct cf_quiche_ctx *ctx = cf->ctx; - struct SingleRequest *k = &data->req; - int rv = GETSOCK_BLANK; + bool want_recv = CURL_WANT_RECV(data); + bool want_send = CURL_WANT_SEND(data); - socks[0] = ctx->q.sockfd; + if(ctx->qconn && (want_recv || want_send)) { + struct stream_ctx *stream = H3_STREAM_CTX(data); + bool c_exhaust, s_exhaust; - /* in an HTTP/3 connection we can basically always get a frame so we should - always be ready for one */ - rv |= GETSOCK_READSOCK(0); + c_exhaust = FALSE; /* Have not found any call in quiche that tells + us if the connection itself is blocked */ + s_exhaust = stream && stream->id >= 0 && + (stream->quic_flow_blocked || !stream_is_writeable(cf, data)); + want_recv = (want_recv || c_exhaust || s_exhaust); + want_send = (!s_exhaust && want_send) || + !Curl_bufq_is_empty(&ctx->q.sendbuf); - /* we're still uploading or the HTTP/3 layer wants to send data */ - if(((k->keepon & KEEP_SENDBITS) == KEEP_SEND) - && stream_is_writeable(cf, data)) - rv |= GETSOCK_WRITESOCK(0); - - return rv; + Curl_pollset_set(data, ps, ctx->q.sockfd, want_recv, want_send); + } } /* @@ -1674,7 +1633,7 @@ struct Curl_cftype Curl_cft_http3 = { cf_quiche_connect, cf_quiche_close, Curl_cf_def_get_host, - cf_quiche_get_select_socks, + cf_quiche_adjust_pollset, cf_quiche_data_pending, cf_quiche_send, cf_quiche_recv, diff --git a/lib/vtls/bearssl.c b/lib/vtls/bearssl.c index 934149c1b..c7581c831 100644 --- a/lib/vtls/bearssl.c +++ b/lib/vtls/bearssl.c @@ -749,26 +749,26 @@ static CURLcode bearssl_connect_step1(struct Curl_cfilter *cf, return CURLE_OK; } -static int bearssl_get_select_socks(struct Curl_cfilter *cf, - struct Curl_easy *data, - curl_socket_t *socks) +static void bearssl_adjust_pollset(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct easy_pollset *ps) { - struct ssl_connect_data *connssl = cf->ctx; - curl_socket_t sock = Curl_conn_cf_get_socket(cf->next, data); + if(!cf->connected) { + curl_socket_t sock = Curl_conn_cf_get_socket(cf->next, data); + if(sock != CURL_SOCKET_BAD) { + struct ssl_connect_data *connssl = cf->ctx; + struct bearssl_ssl_backend_data *backend = + (struct bearssl_ssl_backend_data *)connssl->backend; + unsigned state = br_ssl_engine_current_state(&backend->ctx.eng); - if(sock == CURL_SOCKET_BAD) - return GETSOCK_BLANK; - else { - struct bearssl_ssl_backend_data *backend = - (struct bearssl_ssl_backend_data *)connssl->backend; - unsigned state = br_ssl_engine_current_state(&backend->ctx.eng); - if(state & BR_SSL_SENDREC) { - socks[0] = sock; - return GETSOCK_WRITESOCK(0); + if(state & BR_SSL_SENDREC) { + Curl_pollset_set_out_only(data, ps, sock); + } + else { + Curl_pollset_set_in_only(data, ps, sock); + } } } - socks[0] = sock; - return GETSOCK_READSOCK(0); } static CURLcode bearssl_run_until(struct Curl_cfilter *cf, @@ -1210,7 +1210,7 @@ const struct Curl_ssl Curl_ssl_bearssl = { Curl_none_cert_status_request, /* cert_status_request */ bearssl_connect, /* connect */ bearssl_connect_nonblocking, /* connect_nonblocking */ - bearssl_get_select_socks, /* getsock */ + bearssl_adjust_pollset, /* adjust_pollset */ bearssl_get_internals, /* get_internals */ bearssl_close, /* close_one */ Curl_none_close_all, /* close_all */ diff --git a/lib/vtls/gtls.c b/lib/vtls/gtls.c index a7ad454cf..1dd1a7998 100644 --- a/lib/vtls/gtls.c +++ b/lib/vtls/gtls.c @@ -1672,7 +1672,7 @@ const struct Curl_ssl Curl_ssl_gnutls = { gtls_cert_status_request, /* cert_status_request */ gtls_connect, /* connect */ gtls_connect_nonblocking, /* connect_nonblocking */ - Curl_ssl_get_select_socks, /* getsock */ + Curl_ssl_adjust_pollset, /* adjust_pollset */ gtls_get_internals, /* get_internals */ gtls_close, /* close_one */ Curl_none_close_all, /* close_all */ diff --git a/lib/vtls/mbedtls.c b/lib/vtls/mbedtls.c index 2f994d741..bc08fba34 100644 --- a/lib/vtls/mbedtls.c +++ b/lib/vtls/mbedtls.c @@ -1274,7 +1274,7 @@ const struct Curl_ssl Curl_ssl_mbedtls = { Curl_none_cert_status_request, /* cert_status_request */ mbedtls_connect, /* connect */ mbedtls_connect_nonblocking, /* connect_nonblocking */ - Curl_ssl_get_select_socks, /* getsock */ + Curl_ssl_adjust_pollset, /* adjust_pollset */ mbedtls_get_internals, /* get_internals */ mbedtls_close, /* close_one */ mbedtls_close_all, /* close_all */ diff --git a/lib/vtls/openssl.c b/lib/vtls/openssl.c index 88f4d4629..9b36f3376 100644 --- a/lib/vtls/openssl.c +++ b/lib/vtls/openssl.c @@ -4915,7 +4915,7 @@ const struct Curl_ssl Curl_ssl_openssl = { ossl_cert_status_request, /* cert_status_request */ ossl_connect, /* connect */ ossl_connect_nonblocking, /* connect_nonblocking */ - Curl_ssl_get_select_socks,/* getsock */ + Curl_ssl_adjust_pollset, /* adjust_pollset */ ossl_get_internals, /* get_internals */ ossl_close, /* close_one */ ossl_close_all, /* close_all */ diff --git a/lib/vtls/rustls.c b/lib/vtls/rustls.c index a3e9d964c..691179d25 100644 --- a/lib/vtls/rustls.c +++ b/lib/vtls/rustls.c @@ -589,32 +589,28 @@ cr_connect_nonblocking(struct Curl_cfilter *cf, DEBUGASSERT(false); } -/* returns a bitmap of flags for this connection's first socket indicating - whether we want to read or write */ -static int -cr_get_select_socks(struct Curl_cfilter *cf, struct Curl_easy *data, - curl_socket_t *socks) +static void cr_adjust_pollset(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct easy_pollset *ps) { - struct ssl_connect_data *const connssl = cf->ctx; - curl_socket_t sockfd = Curl_conn_cf_get_socket(cf, data); - struct rustls_ssl_backend_data *const backend = - (struct rustls_ssl_backend_data *)connssl->backend; - struct rustls_connection *rconn = NULL; + if(!cf->connected) { + curl_socket_t sock = Curl_conn_cf_get_socket(cf->next, data); + struct ssl_connect_data *const connssl = cf->ctx; + struct rustls_ssl_backend_data *const backend = + (struct rustls_ssl_backend_data *)connssl->backend; + struct rustls_connection *rconn = NULL; - (void)data; - DEBUGASSERT(backend); - rconn = backend->conn; + (void)data; + DEBUGASSERT(backend); + rconn = backend->conn; - if(rustls_connection_wants_write(rconn)) { - socks[0] = sockfd; - return GETSOCK_WRITESOCK(0); + if(rustls_connection_wants_write(rconn)) { + Curl_pollset_add_out(data, ps, sock); + } + if(rustls_connection_wants_read(rconn)) { + Curl_pollset_add_in(data, ps, sock); + } } - if(rustls_connection_wants_read(rconn)) { - socks[0] = sockfd; - return GETSOCK_READSOCK(0); - } - - return GETSOCK_BLANK; } static void * @@ -677,7 +673,7 @@ const struct Curl_ssl Curl_ssl_rustls = { Curl_none_cert_status_request, /* cert_status_request */ cr_connect, /* connect */ cr_connect_nonblocking, /* connect_nonblocking */ - cr_get_select_socks, /* get_select_socks */ + cr_adjust_pollset, /* adjust_pollset */ cr_get_internals, /* get_internals */ cr_close, /* close_one */ Curl_none_close_all, /* close_all */ diff --git a/lib/vtls/schannel.c b/lib/vtls/schannel.c index 410a5c4ec..e93aa2389 100644 --- a/lib/vtls/schannel.c +++ b/lib/vtls/schannel.c @@ -2777,7 +2777,7 @@ const struct Curl_ssl Curl_ssl_schannel = { Curl_none_cert_status_request, /* cert_status_request */ schannel_connect, /* connect */ schannel_connect_nonblocking, /* connect_nonblocking */ - Curl_ssl_get_select_socks, /* getsock */ + Curl_ssl_adjust_pollset, /* adjust_pollset */ schannel_get_internals, /* get_internals */ schannel_close, /* close_one */ Curl_none_close_all, /* close_all */ diff --git a/lib/vtls/sectransp.c b/lib/vtls/sectransp.c index 3378f7619..61375cbf2 100644 --- a/lib/vtls/sectransp.c +++ b/lib/vtls/sectransp.c @@ -3483,7 +3483,7 @@ const struct Curl_ssl Curl_ssl_sectransp = { Curl_none_cert_status_request, /* cert_status_request */ sectransp_connect, /* connect */ sectransp_connect_nonblocking, /* connect_nonblocking */ - Curl_ssl_get_select_socks, /* getsock */ + Curl_ssl_adjust_pollset, /* adjust_pollset */ sectransp_get_internals, /* get_internals */ sectransp_close, /* close_one */ Curl_none_close_all, /* close_all */ diff --git a/lib/vtls/vtls.c b/lib/vtls/vtls.c index 494b660a9..eea71f2b4 100644 --- a/lib/vtls/vtls.c +++ b/lib/vtls/vtls.c @@ -629,22 +629,21 @@ void Curl_ssl_close_all(struct Curl_easy *data) Curl_ssl->close_all(data); } -int Curl_ssl_get_select_socks(struct Curl_cfilter *cf, struct Curl_easy *data, - curl_socket_t *socks) +void Curl_ssl_adjust_pollset(struct Curl_cfilter *cf, struct Curl_easy *data, + struct easy_pollset *ps) { - struct ssl_connect_data *connssl = cf->ctx; - curl_socket_t sock = Curl_conn_cf_get_socket(cf->next, data); - - if(sock == CURL_SOCKET_BAD) - return GETSOCK_BLANK; - - if(connssl->connecting_state == ssl_connect_2_writing) { - /* we are only interested in writing */ - socks[0] = sock; - return GETSOCK_WRITESOCK(0); + if(!cf->connected) { + struct ssl_connect_data *connssl = cf->ctx; + curl_socket_t sock = Curl_conn_cf_get_socket(cf->next, data); + if(sock != CURL_SOCKET_BAD) { + if(connssl->connecting_state == ssl_connect_2_writing) { + Curl_pollset_set_out_only(data, ps, sock); + } + else { + Curl_pollset_set_in_only(data, ps, sock); + } + } } - socks[0] = sock; - return GETSOCK_READSOCK(0); } /* Selects an SSL crypto engine @@ -1156,13 +1155,13 @@ static CURLcode multissl_connect_nonblocking(struct Curl_cfilter *cf, return Curl_ssl->connect_nonblocking(cf, data, done); } -static int multissl_get_select_socks(struct Curl_cfilter *cf, +static void multissl_adjust_pollset(struct Curl_cfilter *cf, struct Curl_easy *data, - curl_socket_t *socks) + struct easy_pollset *ps) { if(multissl_setup(NULL)) - return 0; - return Curl_ssl->get_select_socks(cf, data, socks); + return; + Curl_ssl->adjust_pollset(cf, data, ps); } static void *multissl_get_internals(struct ssl_connect_data *connssl, @@ -1214,7 +1213,7 @@ static const struct Curl_ssl Curl_ssl_multi = { Curl_none_cert_status_request, /* cert_status_request */ multissl_connect, /* connect */ multissl_connect_nonblocking, /* connect_nonblocking */ - multissl_get_select_socks, /* getsock */ + multissl_adjust_pollset, /* adjust_pollset */ multissl_get_internals, /* get_internals */ multissl_close, /* close_one */ Curl_none_close_all, /* close_all */ @@ -1599,22 +1598,17 @@ static ssize_t ssl_cf_recv(struct Curl_cfilter *cf, return nread; } -static int ssl_cf_get_select_socks(struct Curl_cfilter *cf, +static void ssl_cf_adjust_pollset(struct Curl_cfilter *cf, struct Curl_easy *data, - curl_socket_t *socks) + struct easy_pollset *ps) { struct cf_call_data save; - int fds = GETSOCK_BLANK; - if(!cf->next->connected) { - fds = cf->next->cft->get_select_socks(cf->next, data, socks); - } - else if(!cf->connected) { + if(!cf->connected) { CF_DATA_SAVE(save, cf, data); - fds = Curl_ssl->get_select_socks(cf, data, socks); + Curl_ssl->adjust_pollset(cf, data, ps); CF_DATA_RESTORE(cf, save); } - return fds; } static CURLcode ssl_cf_cntrl(struct Curl_cfilter *cf, @@ -1705,7 +1699,7 @@ struct Curl_cftype Curl_cft_ssl = { ssl_cf_connect, ssl_cf_close, Curl_cf_def_get_host, - ssl_cf_get_select_socks, + ssl_cf_adjust_pollset, ssl_cf_data_pending, ssl_cf_send, ssl_cf_recv, @@ -1723,7 +1717,7 @@ struct Curl_cftype Curl_cft_ssl_proxy = { ssl_cf_connect, ssl_cf_close, Curl_cf_def_get_host, - ssl_cf_get_select_socks, + ssl_cf_adjust_pollset, ssl_cf_data_pending, ssl_cf_send, ssl_cf_recv, diff --git a/lib/vtls/vtls_int.h b/lib/vtls/vtls_int.h index a6e4544a8..3581087e9 100644 --- a/lib/vtls/vtls_int.h +++ b/lib/vtls/vtls_int.h @@ -118,14 +118,11 @@ struct Curl_ssl { struct Curl_easy *data, bool *done); - /* If the SSL backend wants to read or write on this connection during a - handshake, set socks[0] to the connection's FIRSTSOCKET, and return - a bitmap indicating read or write with GETSOCK_WRITESOCK(0) or - GETSOCK_READSOCK(0). Otherwise return GETSOCK_BLANK. - Mandatory. */ - int (*get_select_socks)(struct Curl_cfilter *cf, struct Curl_easy *data, - curl_socket_t *socks); - + /* During handshake, adjust the pollset to include the socket + * for POLLOUT or POLLIN as needed. + * Mandatory. */ + void (*adjust_pollset)(struct Curl_cfilter *cf, struct Curl_easy *data, + struct easy_pollset *ps); void *(*get_internals)(struct ssl_connect_data *connssl, CURLINFO info); void (*close)(struct Curl_cfilter *cf, struct Curl_easy *data); void (*close_all)(struct Curl_easy *data); @@ -169,8 +166,8 @@ CURLcode Curl_none_set_engine(struct Curl_easy *data, const char *engine); CURLcode Curl_none_set_engine_default(struct Curl_easy *data); struct curl_slist *Curl_none_engines_list(struct Curl_easy *data); bool Curl_none_false_start(void); -int Curl_ssl_get_select_socks(struct Curl_cfilter *cf, struct Curl_easy *data, - curl_socket_t *socks); +void Curl_ssl_adjust_pollset(struct Curl_cfilter *cf, struct Curl_easy *data, + struct easy_pollset *ps); /** * Get the ssl_config_data in `data` that is relevant for cfilter `cf`. diff --git a/lib/vtls/wolfssl.c b/lib/vtls/wolfssl.c index 135ba697e..4a9bf3388 100644 --- a/lib/vtls/wolfssl.c +++ b/lib/vtls/wolfssl.c @@ -1398,7 +1398,7 @@ const struct Curl_ssl Curl_ssl_wolfssl = { Curl_none_cert_status_request, /* cert_status_request */ wolfssl_connect, /* connect */ wolfssl_connect_nonblocking, /* connect_nonblocking */ - Curl_ssl_get_select_socks, /* getsock */ + Curl_ssl_adjust_pollset, /* adjust_pollset */ wolfssl_get_internals, /* get_internals */ wolfssl_close, /* close_one */ Curl_none_close_all, /* close_all */ diff --git a/tests/http/testenv/curl.py b/tests/http/testenv/curl.py index 9f92f629c..ed05b79db 100644 --- a/tests/http/testenv/curl.py +++ b/tests/http/testenv/curl.py @@ -504,8 +504,8 @@ class CurlClient: args = [self._curl, "-s", "--path-as-is"] if with_headers: args.extend(["-D", self._headerfile]) - if def_tracing is not False: - args.extend(['-v', '--trace-config', 'ids,time']) + if def_tracing is not False and not self._silent: + args.extend(['-v', '--trace-ids', '--trace-time']) if self.env.verbose > 1: args.extend(['--trace-config', 'http/2,http/3,h2-proxy,h1-proxy']) pass diff --git a/tests/unit/unit2600.c b/tests/unit/unit2600.c index d7b1efdd7..3e9ecb7b0 100644 --- a/tests/unit/unit2600.c +++ b/tests/unit/unit2600.c @@ -162,7 +162,7 @@ static struct Curl_cftype cft_test = { cf_test_connect, Curl_cf_def_close, Curl_cf_def_get_host, - Curl_cf_def_get_select_socks, + Curl_cf_def_adjust_pollset, Curl_cf_def_data_pending, Curl_cf_def_send, Curl_cf_def_recv,