Since data can be held in connection filter buffers when sending gives
EAGAIN, add methods to query this and perform flushing of those buffers.

The transfer loop will continue sending until all upload data is
processed and the connection is flushed.

- add `CF_QUERY_SEND_PENDING` to query filters
- add `CF_CTRL_DATA_SEND_FLUSH` to flush filters
- change `Curl_req_want_send()` to query the connection
  if it needs flushing
- use `Curl_req_want_send()` to determine the POLLOUT
  in the PERFORMING multi state
- implement flush handling in the HTTP/2 connection filter

Closes #14271
This commit is contained in:
Stefan Eissing 2024-07-25 13:10:01 +02:00 коммит произвёл Daniel Stenberg
Родитель 911c3166b6
Коммит 709a6a3965
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 5CC908FDB71E12C2
10 изменённых файлов: 278 добавлений и 100 удалений

Просмотреть файл

@ -73,7 +73,6 @@ struct tunnel_stream {
char *authority;
int32_t stream_id;
uint32_t error;
size_t upload_blocked_len;
h2_tunnel_state state;
BIT(has_final_response);
BIT(closed);
@ -217,11 +216,13 @@ static void drain_tunnel(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct tunnel_stream *tunnel)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
unsigned char bits;
(void)cf;
bits = CURL_CSELECT_IN;
if(!tunnel->closed && !tunnel->reset && tunnel->upload_blocked_len)
if(!tunnel->closed && !tunnel->reset &&
!Curl_bufq_is_empty(&ctx->tunnel.sendbuf))
bits |= CURL_CSELECT_OUT;
if(data->state.select_bits != bits) {
CURL_TRC_CF(data, cf, "[%d] DRAIN select_bits=%x",
@ -1231,7 +1232,9 @@ static void cf_h2_proxy_adjust_pollset(struct Curl_cfilter *cf,
bool want_recv, want_send;
if(!cf->connected && ctx->h2) {
want_send = nghttp2_session_want_write(ctx->h2);
want_send = nghttp2_session_want_write(ctx->h2) ||
!Curl_bufq_is_empty(&ctx->outbufq) ||
!Curl_bufq_is_empty(&ctx->tunnel.sendbuf);
want_recv = nghttp2_session_want_read(ctx->h2);
}
else
@ -1247,17 +1250,25 @@ static void cf_h2_proxy_adjust_pollset(struct Curl_cfilter *cf,
ctx->h2, ctx->tunnel.stream_id);
want_recv = (want_recv || c_exhaust || s_exhaust);
want_send = (!s_exhaust && want_send) ||
(!c_exhaust && nghttp2_session_want_write(ctx->h2));
(!c_exhaust && nghttp2_session_want_write(ctx->h2)) ||
!Curl_bufq_is_empty(&ctx->outbufq) ||
!Curl_bufq_is_empty(&ctx->tunnel.sendbuf);
Curl_pollset_set(data, ps, sock, want_recv, want_send);
CURL_TRC_CF(data, cf, "adjust_pollset, want_recv=%d want_send=%d",
want_recv, want_send);
CF_DATA_RESTORE(cf, save);
}
else if(ctx->sent_goaway && !cf->shutdown) {
/* shutdown in progress */
CF_DATA_SAVE(save, cf, data);
want_send = nghttp2_session_want_write(ctx->h2);
want_send = nghttp2_session_want_write(ctx->h2) ||
!Curl_bufq_is_empty(&ctx->outbufq) ||
!Curl_bufq_is_empty(&ctx->tunnel.sendbuf);
want_recv = nghttp2_session_want_read(ctx->h2);
Curl_pollset_set(data, ps, sock, want_recv, want_send);
CURL_TRC_CF(data, cf, "adjust_pollset, want_recv=%d want_send=%d",
want_recv, want_send);
CF_DATA_RESTORE(cf, save);
}
}
@ -1364,16 +1375,7 @@ static ssize_t cf_h2_proxy_recv(struct Curl_cfilter *cf,
}
result = proxy_h2_progress_egress(cf, data);
if(result == CURLE_AGAIN) {
/* pending data to send, need to be called again. Ideally, we would
* monitor the socket for POLLOUT, but we might not be in SENDING
* transfer state any longer and are unable to make this happen.
*/
CURL_TRC_CF(data, cf, "[%d] egress blocked, DRAIN",
ctx->tunnel.stream_id);
drain_tunnel(cf, data, &ctx->tunnel);
}
else if(result) {
if(result && (result != CURLE_AGAIN)) {
*err = result;
nread = -1;
}
@ -1401,7 +1403,6 @@ static ssize_t cf_h2_proxy_send(struct Curl_cfilter *cf,
int rv;
ssize_t nwritten;
CURLcode result;
int blocked = 0;
(void)eos; /* TODO, maybe useful for blocks? */
if(ctx->tunnel.state != H2_TUNNEL_ESTABLISHED) {
@ -1415,29 +1416,10 @@ static ssize_t cf_h2_proxy_send(struct Curl_cfilter *cf,
*err = CURLE_SEND_ERROR;
goto out;
}
else if(ctx->tunnel.upload_blocked_len) {
/* the data in `buf` has already been submitted or added to the
* buffers, but have been EAGAINed on the last invocation. */
DEBUGASSERT(len >= ctx->tunnel.upload_blocked_len);
if(len < ctx->tunnel.upload_blocked_len) {
/* Did we get called again with a smaller `len`? This should not
* happen. We are not prepared to handle that. */
failf(data, "HTTP/2 proxy, send again with decreased length");
*err = CURLE_HTTP2;
nwritten = -1;
goto out;
}
nwritten = (ssize_t)ctx->tunnel.upload_blocked_len;
ctx->tunnel.upload_blocked_len = 0;
*err = CURLE_OK;
}
else {
nwritten = Curl_bufq_write(&ctx->tunnel.sendbuf, buf, len, err);
if(nwritten < 0) {
if(*err != CURLE_AGAIN)
goto out;
nwritten = 0;
}
if(nwritten < 0 && (*err != CURLE_AGAIN))
goto out;
}
if(!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) {
@ -1460,52 +1442,13 @@ static ssize_t cf_h2_proxy_send(struct Curl_cfilter *cf,
/* Call the nghttp2 send loop and flush to write ALL buffered data,
* headers and/or request body completely out to the network */
result = proxy_h2_progress_egress(cf, data);
if(result == CURLE_AGAIN) {
blocked = 1;
}
else if(result) {
if(result && (result != CURLE_AGAIN)) {
*err = result;
nwritten = -1;
goto out;
}
else if(!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) {
/* although we wrote everything that nghttp2 wants to send now,
* there is data left in our stream send buffer unwritten. This may
* be due to the stream's HTTP/2 flow window being exhausted. */
blocked = 1;
}
if(blocked) {
/* Unable to send all data, due to connection blocked or H2 window
* exhaustion. Data is left in our stream buffer, or nghttp2's internal
* frame buffer or our network out buffer. */
size_t rwin = (size_t)nghttp2_session_get_stream_remote_window_size(
ctx->h2, ctx->tunnel.stream_id);
if(rwin == 0) {
/* H2 flow window exhaustion.
* FIXME: there is no way to HOLD all transfers that use this
* proxy connection AND to UNHOLD all of them again when the
* window increases.
* We *could* iterate over all data on this conn maybe? */
CURL_TRC_CF(data, cf, "[%d] remote flow "
"window is exhausted", ctx->tunnel.stream_id);
}
/* Whatever the cause, we need to return CURL_EAGAIN for this call.
* We have unwritten state that needs us being invoked again and EAGAIN
* is the only way to ensure that. */
ctx->tunnel.upload_blocked_len = nwritten;
CURL_TRC_CF(data, cf, "[%d] cf_send(len=%zu) BLOCK: win %u/%zu "
"blocked_len=%zu",
ctx->tunnel.stream_id, len,
nghttp2_session_get_remote_window_size(ctx->h2), rwin,
nwritten);
drain_tunnel(cf, data, &ctx->tunnel);
*err = CURLE_AGAIN;
nwritten = -1;
goto out;
}
else if(proxy_h2_should_close_session(ctx)) {
if(proxy_h2_should_close_session(ctx)) {
/* nghttp2 thinks this session is done. If the stream has not been
* closed, this is an error state for out transfer */
if(ctx->tunnel.closed) {
@ -1538,6 +1481,38 @@ out:
return nwritten;
}
static CURLcode cf_h2_proxy_flush(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
struct cf_call_data save;
CURLcode result = CURLE_OK;
CF_DATA_SAVE(save, cf, data);
if(!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) {
/* resume the potentially suspended tunnel */
int rv = nghttp2_session_resume_data(ctx->h2, ctx->tunnel.stream_id);
if(nghttp2_is_fatal(rv)) {
result = CURLE_SEND_ERROR;
goto out;
}
}
result = proxy_h2_progress_egress(cf, data);
out:
CURL_TRC_CF(data, cf, "[%d] flush -> %d, "
"h2 windows %d-%d (stream-conn), buffers %zu-%zu (stream-conn)",
ctx->tunnel.stream_id, result,
nghttp2_session_get_stream_remote_window_size(
ctx->h2, ctx->tunnel.stream_id),
nghttp2_session_get_remote_window_size(ctx->h2),
Curl_bufq_len(&ctx->tunnel.sendbuf),
Curl_bufq_len(&ctx->outbufq));
CF_DATA_RESTORE(cf, save);
return result;
}
static bool proxy_h2_connisalive(struct Curl_cfilter *cf,
struct Curl_easy *data,
bool *input_pending)
@ -1591,6 +1566,51 @@ static bool cf_h2_proxy_is_alive(struct Curl_cfilter *cf,
return result;
}
static CURLcode cf_h2_proxy_query(struct Curl_cfilter *cf,
struct Curl_easy *data,
int query, int *pres1, void *pres2)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
switch(query) {
case CF_QUERY_NEED_FLUSH: {
if(!Curl_bufq_is_empty(&ctx->outbufq) ||
!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) {
*pres1 = TRUE;
return CURLE_OK;
}
break;
}
default:
break;
}
return cf->next?
cf->next->cft->query(cf->next, data, query, pres1, pres2) :
CURLE_UNKNOWN_OPTION;
}
static CURLcode cf_h2_proxy_cntrl(struct Curl_cfilter *cf,
struct Curl_easy *data,
int event, int arg1, void *arg2)
{
CURLcode result = CURLE_OK;
struct cf_call_data save;
(void)arg1;
(void)arg2;
switch(event) {
case CF_CTRL_FLUSH:
CF_DATA_SAVE(save, cf, data);
result = cf_h2_proxy_flush(cf, data);
CF_DATA_RESTORE(cf, save);
break;
default:
break;
}
return result;
}
struct Curl_cftype Curl_cft_h2_proxy = {
"H2-PROXY",
CF_TYPE_IP_CONNECT|CF_TYPE_PROXY,
@ -1604,10 +1624,10 @@ struct Curl_cftype Curl_cft_h2_proxy = {
cf_h2_proxy_data_pending,
cf_h2_proxy_send,
cf_h2_proxy_recv,
Curl_cf_def_cntrl,
cf_h2_proxy_cntrl,
cf_h2_proxy_is_alive,
Curl_cf_def_conn_keep_alive,
Curl_cf_def_query,
cf_h2_proxy_query,
};
CURLcode Curl_cf_h2_proxy_insert_after(struct Curl_cfilter *cf,

Просмотреть файл

@ -504,6 +504,17 @@ bool Curl_conn_data_pending(struct Curl_easy *data, int sockindex)
return FALSE;
}
bool Curl_conn_needs_flush(struct Curl_easy *data, int sockindex)
{
CURLcode result;
int pending = FALSE;
struct Curl_cfilter *cf = data->conn->cfilter[sockindex];
result = cf? cf->cft->query(cf, data, CF_QUERY_NEED_FLUSH,
&pending, NULL) : CURLE_UNKNOWN_OPTION;
return (result || pending == FALSE)? FALSE : TRUE;
}
void Curl_conn_cf_adjust_pollset(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct easy_pollset *ps)
@ -696,6 +707,13 @@ CURLcode Curl_conn_ev_data_idle(struct Curl_easy *data)
CF_CTRL_DATA_IDLE, 0, NULL);
}
CURLcode Curl_conn_flush(struct Curl_easy *data, int sockindex)
{
return Curl_conn_cf_cntrl(data->conn->cfilter[sockindex], data, FALSE,
CF_CTRL_FLUSH, 0, NULL);
}
/**
* Notify connection filters that the transfer represented by `data`
* is done with sending data (e.g. has uploaded everything).

Просмотреть файл

@ -141,6 +141,7 @@ typedef CURLcode Curl_cft_conn_keep_alive(struct Curl_cfilter *cf,
/* update conn info at connection and data */
#define CF_CTRL_CONN_INFO_UPDATE (256+0) /* 0 NULL ignored */
#define CF_CTRL_FORGET_SOCKET (256+1) /* 0 NULL ignored */
#define CF_CTRL_FLUSH (256+2) /* 0 NULL first fail */
/**
* Handle event/control for the filter.
@ -163,6 +164,7 @@ typedef CURLcode Curl_cft_cntrl(struct Curl_cfilter *cf,
* were received.
* -1 if not determined yet.
* - CF_QUERY_SOCKET: the socket used by the filter chain
* - CF_QUERY_NEED_FLUSH: TRUE iff any of the filters have unsent data
*/
/* query res1 res2 */
#define CF_QUERY_MAX_CONCURRENT 1 /* number - */
@ -171,6 +173,7 @@ typedef CURLcode Curl_cft_cntrl(struct Curl_cfilter *cf,
#define CF_QUERY_TIMER_CONNECT 4 /* - struct curltime */
#define CF_QUERY_TIMER_APPCONNECT 5 /* - struct curltime */
#define CF_QUERY_STREAM_ERROR 6 /* error code - */
#define CF_QUERY_NEED_FLUSH 7 /* TRUE/FALSE - */
/**
* Query the cfilter for properties. Filters ignorant of a query will
@ -401,6 +404,17 @@ CURLcode Curl_conn_shutdown(struct Curl_easy *data, int sockindex, bool *done);
bool Curl_conn_data_pending(struct Curl_easy *data,
int sockindex);
/**
* Return TRUE if any of the connection filters at chain `sockindex`
* have data still to send.
*/
bool Curl_conn_needs_flush(struct Curl_easy *data, int sockindex);
/**
* Flush any pending data on the connection filters at chain `sockindex`.
*/
CURLcode Curl_conn_flush(struct Curl_easy *data, int sockindex);
/**
* Return the socket used on data's connection for the index.
* Returns CURL_SOCKET_BAD if not available.

Просмотреть файл

@ -4431,8 +4431,16 @@ static CURLcode cr_exp100_read(struct Curl_easy *data,
switch(ctx->state) {
case EXP100_SENDING_REQUEST:
if(!Curl_req_sendbuf_empty(data)) {
/* The initial request data has not been fully sent yet. Do
* not start the timer yet. */
DEBUGF(infof(data, "cr_exp100_read, request not full sent yet"));
*nread = 0;
*eos = FALSE;
return CURLE_OK;
}
/* We are now waiting for a reply from the server or
* a timeout on our side */
* a timeout on our side IFF the request has been fully sent. */
DEBUGF(infof(data, "cr_exp100_read, start AWAITING_CONTINUE"));
ctx->state = EXP100_AWAITING_CONTINUE;
ctx->start = Curl_now();

Просмотреть файл

@ -2413,6 +2413,48 @@ out:
return nwritten;
}
static CURLcode cf_h2_flush(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_h2_ctx *ctx = cf->ctx;
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
struct cf_call_data save;
CURLcode result = CURLE_OK;
CF_DATA_SAVE(save, cf, data);
if(stream && !Curl_bufq_is_empty(&stream->sendbuf)) {
/* resume the potentially suspended stream */
int rv = nghttp2_session_resume_data(ctx->h2, stream->id);
if(nghttp2_is_fatal(rv)) {
result = CURLE_SEND_ERROR;
goto out;
}
}
result = h2_progress_egress(cf, data);
out:
if(stream) {
CURL_TRC_CF(data, cf, "[%d] flush -> %d, "
"h2 windows %d-%d (stream-conn), "
"buffers %zu-%zu (stream-conn)",
stream->id, result,
nghttp2_session_get_stream_remote_window_size(
ctx->h2, stream->id),
nghttp2_session_get_remote_window_size(ctx->h2),
Curl_bufq_len(&stream->sendbuf),
Curl_bufq_len(&ctx->outbufq));
}
else {
CURL_TRC_CF(data, cf, "flush -> %d, "
"connection-window=%d, nw_send_buffer(%zu)",
result, nghttp2_session_get_remote_window_size(ctx->h2),
Curl_bufq_len(&ctx->outbufq));
}
CF_DATA_RESTORE(cf, save);
return result;
}
static void cf_h2_adjust_pollset(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct easy_pollset *ps)
@ -2622,6 +2664,9 @@ static CURLcode cf_h2_cntrl(struct Curl_cfilter *cf,
case CF_CTRL_DATA_PAUSE:
result = http2_data_pause(cf, data, (arg1 != 0));
break;
case CF_CTRL_FLUSH:
result = cf_h2_flush(cf, data);
break;
case CF_CTRL_DATA_DONE_SEND:
result = http2_data_done_send(cf, data);
break;
@ -2706,6 +2751,15 @@ static CURLcode cf_h2_query(struct Curl_cfilter *cf,
*pres1 = stream? (int)stream->error : 0;
return CURLE_OK;
}
case CF_QUERY_NEED_FLUSH: {
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
if(!Curl_bufq_is_empty(&ctx->outbufq) ||
(stream && !Curl_bufq_is_empty(&stream->sendbuf))) {
*pres1 = TRUE;
return CURLE_OK;
}
break;
}
default:
break;
}

Просмотреть файл

@ -1101,7 +1101,7 @@ static int perform_getsock(struct Curl_easy *data, curl_socket_t *sock)
sock[sockindex] = conn->sockfd;
}
if(CURL_WANT_SEND(data)) {
if(Curl_req_want_send(data)) {
if((conn->sockfd != conn->writesockfd) ||
bitmap == GETSOCK_BLANK) {
/* only if they are not the same socket and we have a readable

Просмотреть файл

@ -257,6 +257,7 @@ static CURLcode req_set_upload_done(struct Curl_easy *data)
Curl_creader_done(data, data->req.upload_aborted);
if(data->req.upload_aborted) {
Curl_bufq_reset(&data->req.sendbuf);
if(data->req.writebytecount)
infof(data, "abort upload after having sent %" CURL_FORMAT_CURL_OFF_T
" bytes", data->req.writebytecount);
@ -286,9 +287,15 @@ static CURLcode req_flush(struct Curl_easy *data)
if(result)
return result;
if(!Curl_bufq_is_empty(&data->req.sendbuf)) {
DEBUGF(infof(data, "Curl_req_flush(len=%zu) -> EAGAIN",
Curl_bufq_len(&data->req.sendbuf)));
return CURLE_AGAIN;
}
}
else if(Curl_xfer_needs_flush(data)) {
DEBUGF(infof(data, "Curl_req_flush(), xfer send_pending"));
return Curl_xfer_flush(data);
}
if(!data->req.upload_done && data->req.eos_read &&
Curl_bufq_is_empty(&data->req.sendbuf)) {
@ -374,18 +381,26 @@ CURLcode Curl_req_send(struct Curl_easy *data, struct dynbuf *req)
}
#endif /* !USE_HYPER */
bool Curl_req_sendbuf_empty(struct Curl_easy *data)
{
return !data->req.sendbuf_init || Curl_bufq_is_empty(&data->req.sendbuf);
}
bool Curl_req_want_send(struct Curl_easy *data)
{
return data->req.sendbuf_init && !Curl_bufq_is_empty(&data->req.sendbuf);
/* Not done and
* - KEEP_SEND and not PAUSEd.
* - or request has buffered data to send
* - or transfer connection has pending data to send */
return !data->req.done &&
(((data->req.keepon & KEEP_SENDBITS) == KEEP_SEND) ||
!Curl_req_sendbuf_empty(data) ||
Curl_xfer_needs_flush(data));
}
bool Curl_req_done_sending(struct Curl_easy *data)
{
if(data->req.upload_done) {
DEBUGASSERT(Curl_bufq_is_empty(&data->req.sendbuf));
return TRUE;
}
return FALSE;
return data->req.upload_done && !Curl_req_want_send(data);
}
CURLcode Curl_req_send_more(struct Curl_easy *data)
@ -393,7 +408,10 @@ CURLcode Curl_req_send_more(struct Curl_easy *data)
CURLcode result;
/* Fill our send buffer if more from client can be read. */
if(!data->req.eos_read && !Curl_bufq_is_full(&data->req.sendbuf)) {
if(!data->req.upload_aborted &&
!data->req.eos_read &&
!(data->req.keepon & KEEP_SEND_PAUSE) &&
!Curl_bufq_is_full(&data->req.sendbuf)) {
ssize_t nread = Curl_bufq_sipn(&data->req.sendbuf, 0,
add_from_client, data, &result);
if(nread < 0 && result != CURLE_AGAIN)
@ -412,7 +430,18 @@ CURLcode Curl_req_abort_sending(struct Curl_easy *data)
if(!data->req.upload_done) {
Curl_bufq_reset(&data->req.sendbuf);
data->req.upload_aborted = TRUE;
/* no longer KEEP_SEND and KEEP_SEND_PAUSE */
data->req.keepon &= ~KEEP_SENDBITS;
return req_set_upload_done(data);
}
return CURLE_OK;
}
CURLcode Curl_req_stop_send_recv(struct Curl_easy *data)
{
/* stop receiving and ALL sending as well, including PAUSE and HOLD.
* We might still be paused on receive client writes though, so
* keep those bits around. */
data->req.keepon &= ~(KEEP_RECV|KEEP_SENDBITS);
return Curl_req_abort_sending(data);
}

Просмотреть файл

@ -220,10 +220,21 @@ CURLcode Curl_req_send_more(struct Curl_easy *data);
*/
bool Curl_req_want_send(struct Curl_easy *data);
/**
* TRUE iff the request has no buffered bytes yet to send.
*/
bool Curl_req_sendbuf_empty(struct Curl_easy *data);
/**
* Stop sending any more request data to the server.
* Will clear the send buffer and mark request sending as done.
*/
CURLcode Curl_req_abort_sending(struct Curl_easy *data);
/**
* Stop sending and receiving any more request data.
* Will abort sending if not done.
*/
CURLcode Curl_req_stop_send_recv(struct Curl_easy *data);
#endif /* HEADER_CURL_REQUEST_H */

Просмотреть файл

@ -313,10 +313,9 @@ static CURLcode readwrite_data(struct Curl_easy *data,
DEBUGF(infof(data, "nread == 0, stream closed, bailing"));
else
DEBUGF(infof(data, "nread <= 0, server closed connection, bailing"));
/* stop receiving and ALL sending as well, including PAUSE and HOLD.
* We might still be paused on receive client writes though, so
* keep those bits around. */
k->keepon &= ~(KEEP_RECV|KEEP_SENDBITS);
result = Curl_req_stop_send_recv(data);
if(result)
goto out;
if(k->eos_written) /* already did write this to client, leave */
break;
}
@ -352,8 +351,7 @@ static CURLcode readwrite_data(struct Curl_easy *data,
may now close the connection. If there is now any kind of sending going
on from our side, we need to stop that immediately. */
infof(data, "we are done reading and this is set to close, stop send");
k->keepon &= ~KEEP_SEND; /* no writing anymore either */
k->keepon &= ~KEEP_SEND_PAUSE; /* no pausing anymore either */
Curl_req_abort_sending(data);
}
out:
@ -368,9 +366,6 @@ out:
*/
static CURLcode readwrite_upload(struct Curl_easy *data, int *didwhat)
{
if((data->req.keepon & KEEP_SEND_PAUSE))
return CURLE_OK;
/* We should not get here when the sending is already done. It
* probably means that someone set `data-req.keepon |= KEEP_SEND`
* when it should not. */
@ -435,7 +430,7 @@ CURLcode Curl_readwrite(struct Curl_easy *data)
else
fd_read = CURL_SOCKET_BAD;
if((k->keepon & KEEP_SENDBITS) == KEEP_SEND)
if(Curl_req_want_send(data))
fd_write = conn->writesockfd;
else
fd_write = CURL_SOCKET_BAD;
@ -467,7 +462,7 @@ CURLcode Curl_readwrite(struct Curl_easy *data)
}
/* If we still have writing to do, we check if we have a writable socket. */
if(((k->keepon & KEEP_SEND) && (select_bits & CURL_CSELECT_OUT)) ||
if((Curl_req_want_send(data) && (select_bits & CURL_CSELECT_OUT)) ||
(k->keepon & KEEP_SEND_TIMED)) {
/* write */
@ -1233,6 +1228,22 @@ CURLcode Curl_xfer_write_done(struct Curl_easy *data, bool premature)
return Curl_cw_out_done(data);
}
bool Curl_xfer_needs_flush(struct Curl_easy *data)
{
int sockindex;
sockindex = ((data->conn->writesockfd != CURL_SOCKET_BAD) &&
(data->conn->writesockfd == data->conn->sock[SECONDARYSOCKET]));
return Curl_conn_needs_flush(data, sockindex);
}
CURLcode Curl_xfer_flush(struct Curl_easy *data)
{
int sockindex;
sockindex = ((data->conn->writesockfd != CURL_SOCKET_BAD) &&
(data->conn->writesockfd == data->conn->sock[SECONDARYSOCKET]));
return Curl_conn_flush(data, sockindex);
}
CURLcode Curl_xfer_send(struct Curl_easy *data,
const void *buf, size_t blen, bool eos,
size_t *pnwritten)
@ -1259,6 +1270,8 @@ CURLcode Curl_xfer_send(struct Curl_easy *data,
else if(!result && *pnwritten)
data->info.request_size += *pnwritten;
DEBUGF(infof(data, "Curl_xfer_send(len=%zu) -> %d, %zu",
blen, result, *pnwritten));
return result;
}

Просмотреть файл

@ -113,6 +113,17 @@ void Curl_xfer_setup2(struct Curl_easy *data,
*/
CURLcode Curl_xfer_write_done(struct Curl_easy *data, bool premature);
/**
* Return TRUE iff transfer has pending data to send. Checks involved
* connection filters.
*/
bool Curl_xfer_needs_flush(struct Curl_easy *data);
/**
* Flush any pending send data on the transfer connection.
*/
CURLcode Curl_xfer_flush(struct Curl_easy *data);
/**
* Send data on the socket/connection filter designated
* for transfer's outgoing data.