- write response data directly to the transfer via
 `Curl_xfer_write_resp()` like we do in HTTP/2.

Closes #13073
This commit is contained in:
Stefan Eissing 2024-03-07 09:23:11 +01:00 коммит произвёл Daniel Stenberg
Родитель a89be3cdff
Коммит 8a9fbd6291
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 5CC908FDB71E12C2
1 изменённых файлов: 38 добавлений и 128 удалений

Просмотреть файл

@ -58,6 +58,7 @@
#include "http1.h"
#include "select.h"
#include "inet_pton.h"
#include "transfer.h"
#include "vquic.h"
#include "vquic_int.h"
#include "vquic-tls.h"
@ -145,11 +146,9 @@ struct cf_ngtcp2_ctx {
struct h3_stream_ctx {
int64_t id; /* HTTP/3 protocol identifier */
struct bufq sendbuf; /* h3 request body */
struct bufq recvbuf; /* h3 response body */
struct h1_req_parser h1; /* h1 request parsing */
size_t sendbuf_len_in_flight; /* sendbuf amount "in flight" */
size_t upload_blocked_len; /* the amount written last and EGAINed */
size_t recv_buf_nonflow; /* buffered bytes, not counting for flow control */
uint64_t error3; /* HTTP/3 stream error code */
curl_off_t upload_left; /* number of request bytes left to upload */
int status_code; /* HTTP status code */
@ -190,11 +189,6 @@ static CURLcode h3_data_setup(struct Curl_cfilter *cf,
Curl_bufq_initp(&stream->sendbuf, &ctx->stream_bufcp,
H3_STREAM_SEND_CHUNKS, BUFQ_OPT_NONE);
stream->sendbuf_len_in_flight = 0;
/* on recv, we need a flexible buffer limit since we also write
* headers to it that are not counted against the nghttp3 flow limits. */
Curl_bufq_initp(&stream->recvbuf, &ctx->stream_bufcp,
H3_STREAM_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT);
stream->recv_buf_nonflow = 0;
Curl_h1_req_parse_init(&stream->h1, H1_PARSE_DEFAULT_MAX_LINE_LEN);
H3_STREAM_LCTX(data) = stream;
@ -219,7 +213,6 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
}
Curl_bufq_free(&stream->sendbuf);
Curl_bufq_free(&stream->recvbuf);
Curl_h1_req_parse_free(&stream->h1);
free(stream);
H3_STREAM_LCTX(data) = NULL;
@ -387,36 +380,6 @@ static int cb_handshake_completed(ngtcp2_conn *tconn, void *user_data)
return 0;
}
static void report_consumed_data(struct Curl_cfilter *cf,
struct Curl_easy *data,
size_t consumed)
{
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_ngtcp2_ctx *ctx = cf->ctx;
if(!stream)
return;
/* the HTTP/1.1 response headers are written to the buffer, but
* consuming those does not count against flow control. */
if(stream->recv_buf_nonflow) {
if(consumed >= stream->recv_buf_nonflow) {
consumed -= stream->recv_buf_nonflow;
stream->recv_buf_nonflow = 0;
}
else {
stream->recv_buf_nonflow -= consumed;
consumed = 0;
}
}
if(consumed > 0) {
CURL_TRC_CF(data, cf, "[%" PRId64 "] ACK %zu bytes of DATA",
stream->id, consumed);
ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream->id,
consumed);
ngtcp2_conn_extend_max_offset(ctx->qconn, consumed);
}
}
static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags,
int64_t stream_id, uint64_t offset,
const uint8_t *buf, size_t buflen,
@ -796,49 +759,23 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id,
return 0;
}
/*
* write_resp_raw() copies response data in raw format to the `data`'s
* receive buffer. If not enough space is available, it appends to the
* `data`'s overflow buffer.
*/
static CURLcode write_resp_raw(struct Curl_cfilter *cf,
struct Curl_easy *data,
const void *mem, size_t memlen,
bool flow)
static CURLcode write_resp_hds(struct Curl_easy *data,
const char *buf, size_t blen)
{
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
CURLcode result = CURLE_OK;
ssize_t nwritten;
(void)cf;
if(!stream) {
return CURLE_RECV_ERROR;
}
nwritten = Curl_bufq_write(&stream->recvbuf, mem, memlen, &result);
if(nwritten < 0) {
return result;
}
if(!flow)
stream->recv_buf_nonflow += (size_t)nwritten;
if((size_t)nwritten < memlen) {
/* This MUST not happen. Our recbuf is dimensioned to hold the
* full max_stream_window and then some for this very reason. */
DEBUGASSERT(0);
return CURLE_RECV_ERROR;
}
return result;
bool done;
return Curl_xfer_write_resp(data, (char *)buf, blen, FALSE, &done);
}
static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
const uint8_t *buf, size_t buflen,
const uint8_t *buf, size_t blen,
void *user_data, void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
CURLcode result;
bool done;
(void)conn;
(void)stream3_id;
@ -846,14 +783,19 @@ static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
if(!stream)
return NGHTTP3_ERR_CALLBACK_FAILURE;
result = write_resp_raw(cf, data, buf, buflen, TRUE);
result = Curl_xfer_write_resp(data, (char *)buf, blen, FALSE, &done);
if(result) {
CURL_TRC_CF(data, cf, "[%" PRId64 "] DATA len=%zu, ERROR receiving %d",
stream->id, buflen, result);
stream->id, blen, result);
return NGHTTP3_ERR_CALLBACK_FAILURE;
}
CURL_TRC_CF(data, cf, "[%" PRId64 "] DATA len=%zu", stream->id, buflen);
h3_drain_stream(cf, data);
if(blen) {
CURL_TRC_CF(data, cf, "[%" PRId64 "] ACK %zu bytes of DATA",
stream->id, blen);
ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream->id, blen);
ngtcp2_conn_extend_max_offset(ctx->qconn, blen);
}
CURL_TRC_CF(data, cf, "[%" PRId64 "] DATA len=%zu", stream->id, blen);
return 0;
}
@ -888,7 +830,7 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t stream_id,
if(!stream)
return 0;
/* add a CRLF only if we've received some headers */
result = write_resp_raw(cf, data, "\r\n", 2, FALSE);
result = write_resp_hds(data, "\r\n", 2);
if(result) {
return -1;
}
@ -934,7 +876,7 @@ static int cb_h3_recv_header(nghttp3_conn *conn, int64_t stream_id,
ncopy = msnprintf(line, sizeof(line), "HTTP/3 %03d \r\n",
stream->status_code);
CURL_TRC_CF(data, cf, "[%" PRId64 "] status: %s", stream_id, line);
result = write_resp_raw(cf, data, line, ncopy, FALSE);
result = write_resp_hds(data, line, ncopy);
if(result) {
return -1;
}
@ -944,19 +886,19 @@ static int cb_h3_recv_header(nghttp3_conn *conn, int64_t stream_id,
CURL_TRC_CF(data, cf, "[%" PRId64 "] header: %.*s: %.*s",
stream_id, (int)h3name.len, h3name.base,
(int)h3val.len, h3val.base);
result = write_resp_raw(cf, data, h3name.base, h3name.len, FALSE);
result = write_resp_hds(data, (const char *)h3name.base, h3name.len);
if(result) {
return -1;
}
result = write_resp_raw(cf, data, ": ", 2, FALSE);
result = write_resp_hds(data, ": ", 2);
if(result) {
return -1;
}
result = write_resp_raw(cf, data, h3val.base, h3val.len, FALSE);
result = write_resp_hds(data, (const char *)h3val.base, h3val.len);
if(result) {
return -1;
}
result = write_resp_raw(cf, data, "\r\n", 2, FALSE);
result = write_resp_hds(data, "\r\n", 2);
if(result) {
return -1;
}
@ -1112,7 +1054,7 @@ out:
/* incoming data frames on the h3 stream */
static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
char *buf, size_t len, CURLcode *err)
char *buf, size_t blen, CURLcode *err)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
@ -1121,6 +1063,7 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
struct pkt_io_ctx pktx;
(void)ctx;
(void)buf;
CF_DATA_SAVE(save, cf, data);
DEBUGASSERT(cf->connected);
@ -1136,46 +1079,18 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
goto out;
}
if(!Curl_bufq_is_empty(&stream->recvbuf)) {
nread = Curl_bufq_read(&stream->recvbuf,
(unsigned char *)buf, len, err);
if(nread < 0) {
CURL_TRC_CF(data, cf, "[%" PRId64 "] read recvbuf(len=%zu) "
"-> %zd, %d", stream->id, len, nread, *err);
goto out;
}
report_consumed_data(cf, data, nread);
}
if(cf_progress_ingress(cf, data, &pktx)) {
*err = CURLE_RECV_ERROR;
nread = -1;
goto out;
}
/* recvbuf had nothing before, maybe after progressing ingress? */
if(nread < 0 && !Curl_bufq_is_empty(&stream->recvbuf)) {
nread = Curl_bufq_read(&stream->recvbuf,
(unsigned char *)buf, len, err);
if(nread < 0) {
CURL_TRC_CF(data, cf, "[%" PRId64 "] read recvbuf(len=%zu) "
"-> %zd, %d", stream->id, len, nread, *err);
goto out;
}
report_consumed_data(cf, data, nread);
}
if(nread > 0) {
h3_drain_stream(cf, data);
}
else {
if(stream->closed) {
nread = recv_closed_stream(cf, data, stream, err);
goto out;
}
*err = CURLE_AGAIN;
nread = -1;
if(stream->closed) {
nread = recv_closed_stream(cf, data, stream, err);
goto out;
}
*err = CURLE_AGAIN;
nread = -1;
out:
if(cf_progress_egress(cf, data, &pktx)) {
@ -1189,8 +1104,8 @@ out:
nread = -1;
}
}
CURL_TRC_CF(data, cf, "[%" PRId64 "] cf_recv(len=%zu) -> %zd, %d",
stream? stream->id : -1, len, nread, *err);
CURL_TRC_CF(data, cf, "[%" PRId64 "] cf_recv(blen=%zu) -> %zd, %d",
stream? stream->id : -1, blen, nread, *err);
CF_DATA_RESTORE(cf, save);
return nread;
}
@ -1593,7 +1508,6 @@ static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct pkt_io_ctx local_pktx;
size_t pkts_chunk = 128, i;
size_t pkts_max = 10 * pkts_chunk;
CURLcode result = CURLE_OK;
if(!pktx) {
@ -1608,17 +1522,13 @@ static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
if(result)
return result;
for(i = 0; i < pkts_max; i += pkts_chunk) {
for(i = 0; i < 4; ++i) {
if(i)
pktx_update_time(pktx, cf);
pktx->pkt_count = 0;
result = vquic_recv_packets(cf, data, &ctx->q, pkts_chunk,
recv_pkt, pktx);
if(result) /* error */
break;
if(pktx->pkt_count < pkts_chunk) /* got less than we could */
break;
/* give egress a chance before we receive more */
result = cf_progress_egress(cf, data, pktx);
if(result) /* error */
if(result || !pktx->pkt_count) /* error or got nothing */
break;
}
return result;
@ -1857,9 +1767,9 @@ out:
static bool cf_ngtcp2_data_pending(struct Curl_cfilter *cf,
const struct Curl_easy *data)
{
const struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
(void)cf;
return stream && !Curl_bufq_is_empty(&stream->recvbuf);
(void)data;
return FALSE;
}
static CURLcode h3_data_pause(struct Curl_cfilter *cf,