http3: improvements across backends

- ngtcp2: using bufq for recv stream data
- internal stream_ctx instead of `struct HTTP` members
  for quiche, ngtcp2 and msh3
- no more QUIC related members in `struct HTTP`
- experimental use of recvmmsg(), disabled by default
  - testing on my old debian box shows no throughput improvements.
  - leaving it in, but disabled, for future revisit
- vquic: common UDP receive code for ngtcp2 and quiche
- vquic: common UDP send code for ngtcp2 and quiche
- added pytest skips for known msh3 failures
- fix unit2601 to survive torture testing
- quiche: using latest `master` from quiche and enabling large download
  tests, now that key change is supported
- fixing test_07_21 where retry handling of starting a stream
  was faulty
- msh3: use bufq for recv buffering headers and data
- msh3: replace fprintf debug logging with LOG_CF where possible
- msh3: force QUIC expire timers on recv/send to have more than
  1 request per second served

Closes #10772
This commit is contained in:
Stefan Eissing 2023-03-30 13:00:51 +02:00 коммит произвёл Daniel Stenberg
Родитель a094ec1a85
Коммит 544abeea83
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 5CC908FDB71E12C2
14 изменённых файлов: 2008 добавлений и 1449 удалений

Просмотреть файл

@ -84,12 +84,12 @@ static size_t chunk_read(struct buf_chunk *chunk,
return n;
}
static ssize_t chunk_slurp(struct buf_chunk *chunk,
Curl_bufq_reader *reader,
void *reader_ctx, CURLcode *err)
static ssize_t chunk_slurpn(struct buf_chunk *chunk, size_t max_len,
Curl_bufq_reader *reader,
void *reader_ctx, CURLcode *err)
{
unsigned char *p = &chunk->x.data[chunk->w_offset];
size_t n = chunk->dlen - chunk->w_offset;
size_t n = chunk->dlen - chunk->w_offset; /* free amount */
ssize_t nread;
DEBUGASSERT(chunk->dlen >= chunk->w_offset);
@ -97,6 +97,8 @@ static ssize_t chunk_slurp(struct buf_chunk *chunk,
*err = CURLE_AGAIN;
return -1;
}
if(max_len && n > max_len)
n = max_len;
nread = reader(reader_ctx, p, n, err);
if(nread > 0) {
DEBUGASSERT((size_t)nread <= n);
@ -374,6 +376,7 @@ ssize_t Curl_bufq_write(struct bufq *q,
ssize_t nwritten = 0;
size_t n;
DEBUGASSERT(q->max_chunks > 0);
while(len) {
tail = get_non_full_tail(q);
if(!tail) {
@ -536,48 +539,75 @@ out:
return nwritten;
}
ssize_t Curl_bufq_slurp(struct bufq *q, Curl_bufq_reader *reader,
void *reader_ctx, CURLcode *err)
ssize_t Curl_bufq_sipn(struct bufq *q, size_t max_len,
Curl_bufq_reader *reader, void *reader_ctx,
CURLcode *err)
{
struct buf_chunk *tail = NULL;
ssize_t nread = 0, chunk_nread;
ssize_t nread;
*err = CURLE_AGAIN;
tail = get_non_full_tail(q);
if(!tail) {
if(q->chunk_count < q->max_chunks) {
*err = CURLE_OUT_OF_MEMORY;
return -1;
}
/* full, blocked */
*err = CURLE_AGAIN;
return -1;
}
nread = chunk_slurpn(tail, max_len, reader, reader_ctx, err);
if(nread < 0) {
return -1;
}
else if(nread == 0) {
/* eof */
*err = CURLE_OK;
}
return nread;
}
ssize_t Curl_bufq_slurpn(struct bufq *q, size_t max_len,
Curl_bufq_reader *reader, void *reader_ctx,
CURLcode *err)
{
ssize_t nread = 0, n;
*err = CURLE_AGAIN;
while(1) {
tail = get_non_full_tail(q);
if(!tail) {
if(q->chunk_count < q->max_chunks) {
*err = CURLE_OUT_OF_MEMORY;
return -1;
}
else if(nread) {
/* full, return what we read */
return nread;
}
else {
/* full, blocked */
*err = CURLE_AGAIN;
return -1;
}
}
chunk_nread = chunk_slurp(tail, reader, reader_ctx, err);
if(chunk_nread < 0) {
n = Curl_bufq_sipn(q, max_len, reader, reader_ctx, err);
if(n < 0) {
if(!nread || *err != CURLE_AGAIN) {
/* blocked on first read or real error, fail */
nread = -1;
}
break;
}
else if(chunk_nread == 0) {
else if(n == 0) {
/* eof */
*err = CURLE_OK;
break;
}
nread += chunk_nread;
nread += (size_t)n;
if(max_len) {
DEBUGASSERT((size_t)n <= max_len);
max_len -= (size_t)n;
if(!max_len)
break;
}
/* give up slurping when we get less bytes than we asked for */
if(!chunk_is_full(tail))
if(q->tail && !chunk_is_full(q->tail))
break;
}
return nread;
}
ssize_t Curl_bufq_slurp(struct bufq *q, Curl_bufq_reader *reader,
void *reader_ctx, CURLcode *err)
{
return Curl_bufq_slurpn(q, 0, reader, reader_ctx, err);
}

Просмотреть файл

@ -245,6 +245,28 @@ typedef ssize_t Curl_bufq_reader(void *reader_ctx,
ssize_t Curl_bufq_slurp(struct bufq *q, Curl_bufq_reader *reader,
void *reader_ctx, CURLcode *err);
/**
* Read up to `max_len` bytes and append it to the end of the buffer queue.
* if `max_len` is 0, no limit is imposed and the call behaves exactly
* the same as `Curl_bufq_slurp()`.
* Returns the total amount of buf read (may be 0) or -1 on other
* reader errors.
* Note that even in case of a -1 chunks may have been read and
* the buffer queue will have different length than before.
*/
ssize_t Curl_bufq_slurpn(struct bufq *q, size_t max_len,
Curl_bufq_reader *reader, void *reader_ctx,
CURLcode *err);
/**
* Read *once* up to `max_len` bytes and append it to the buffer.
* if `max_len` is 0, no limit is imposed besides the chunk space.
* Returns the total amount of buf read (may be 0) or -1 on other
* reader errors.
*/
ssize_t Curl_bufq_sipn(struct bufq *q, size_t max_len,
Curl_bufq_reader *reader, void *reader_ctx,
CURLcode *err);
/**
* Write buf to the end of the buffer queue.

Просмотреть файл

@ -185,10 +185,6 @@ CURLcode Curl_http_auth_act(struct Curl_easy *data);
#endif /* CURL_DISABLE_HTTP */
#ifdef USE_NGHTTP3
struct h3out; /* see ngtcp2 */
#endif
/****************************************************************************
* HTTP unique setup
***************************************************************************/
@ -216,6 +212,8 @@ struct HTTP {
HTTPSEND_BODY /* sending body */
} sending;
void *impl_ctx; /* context for actual HTTP implementation */
#ifdef USE_WEBSOCKETS
struct websocket ws;
#endif
@ -240,15 +238,11 @@ struct HTTP {
size_t push_headers_used; /* number of entries filled in */
size_t push_headers_alloc; /* number of entries allocated */
uint32_t error; /* HTTP/2 stream error code */
#endif
#if defined(USE_NGHTTP2) || defined(USE_NGHTTP3)
bool bodystarted;
int status_code; /* HTTP status code */
char *mem; /* points to a buffer in memory to store received data */
size_t len; /* size of the buffer 'mem' points to */
size_t memlen; /* size of data copied to mem */
#endif
#if defined(USE_NGHTTP2) || defined(ENABLE_QUIC)
/* fields used by both HTTP/2 and HTTP/3 */
const uint8_t *upload_mem; /* points to a buffer to read from */
size_t upload_len; /* size of the buffer 'upload_mem' points to */
@ -256,49 +250,6 @@ struct HTTP {
bool closed; /* TRUE on stream close */
bool reset; /* TRUE on stream reset */
#endif
#ifdef ENABLE_QUIC
#ifndef USE_MSH3
/*********** for HTTP/3 we store stream-local data here *************/
int64_t stream3_id; /* stream we are interested in */
uint64_t error3; /* HTTP/3 stream error code */
bool firstheader; /* FALSE until headers arrive */
bool firstbody; /* FALSE until body arrives */
bool h3req; /* FALSE until request is issued */
#endif /* !USE_MSH3 */
bool upload_done;
#endif /* ENABLE_QUIC */
#ifdef USE_NGHTTP3
size_t recv_buf_nonflow; /* buffered bytes, not counting for flow control */
struct h3out *h3out; /* per-stream buffers for upload */
struct dynbuf overflow; /* excess data received during a single Curl_read */
#endif /* USE_NGHTTP3 */
#ifdef USE_MSH3
struct MSH3_REQUEST *req;
#ifdef _WIN32
CRITICAL_SECTION recv_lock;
#else /* !_WIN32 */
pthread_mutex_t recv_lock;
#endif /* _WIN32 */
/* Receive Buffer (Headers and Data) */
uint8_t* recv_buf;
size_t recv_buf_alloc;
size_t recv_buf_max;
/* Receive Headers */
size_t recv_header_len;
bool recv_header_complete;
/* Receive Data */
size_t recv_data_len;
bool recv_data_complete;
/* General Receive Error */
CURLcode recv_error;
#endif /* USE_MSH3 */
#ifdef USE_QUICHE
bool h3_got_header; /* TRUE when h3 stream has recvd some HEADER */
bool h3_recving_data; /* TRUE when h3 stream is reading DATA */
bool h3_body_pending; /* TRUE when h3 stream may have more body DATA */
struct h3_event_node *pending;
#endif /* USE_QUICHE */
};
CURLcode Curl_http_size(struct Curl_easy *data);

Просмотреть файл

@ -45,16 +45,10 @@
#include "curl_memory.h"
#include "memdebug.h"
#define DEBUG_CF 1
#if DEBUG_CF && defined(DEBUGBUILD)
#define CF_DEBUGF(x) x
#else
#define CF_DEBUGF(x) do { } while(0)
#endif
#define MSH3_REQ_INIT_BUF_LEN 16384
#define MSH3_REQ_MAX_BUF_LEN 0x100000
#define H3_STREAM_WINDOW_SIZE (128 * 1024)
#define H3_STREAM_CHUNK_SIZE (16 * 1024)
#define H3_STREAM_RECV_CHUNKS \
(H3_STREAM_WINDOW_SIZE / H3_STREAM_CHUNK_SIZE)
#ifdef _WIN32
#define msh3_lock CRITICAL_SECTION
@ -116,6 +110,7 @@ struct cf_msh3_ctx {
curl_socket_t sock[2]; /* fake socket pair until we get support in msh3 */
char l_ip[MAX_IPADR_LEN]; /* local IP as string */
int l_port; /* local port number */
struct cf_call_data call_data;
struct curltime connect_started; /* time the current attempt started */
struct curltime handshake_at; /* time connect handshake finished */
/* Flags written by msh3/msquic thread */
@ -127,6 +122,83 @@ struct cf_msh3_ctx {
BIT(active);
};
/* How to access `call_data` from a cf_msh3 filter */
#define CF_CTX_CALL_DATA(cf) \
((struct cf_msh3_ctx *)(cf)->ctx)->call_data
/**
* All about the H3 internals of a stream
*/
struct stream_ctx {
struct MSH3_REQUEST *req;
struct bufq recvbuf; /* h3 response */
#ifdef _WIN32
CRITICAL_SECTION recv_lock;
#else /* !_WIN32 */
pthread_mutex_t recv_lock;
#endif /* _WIN32 */
uint64_t error3; /* HTTP/3 stream error code */
int status_code; /* HTTP status code */
CURLcode recv_error;
bool closed;
bool reset;
bool upload_done;
bool firstheader; /* FALSE until headers arrive */
bool recv_header_complete;
};
#define H3_STREAM_CTX(d) ((struct stream_ctx *)(((d) && (d)->req.p.http)? \
((struct HTTP *)(d)->req.p.http)->impl_ctx \
: NULL))
#define H3_STREAM_LCTX(d) ((struct HTTP *)(d)->req.p.http)->impl_ctx
#define H3_STREAM_ID(d) (H3_STREAM_CTX(d)? \
H3_STREAM_CTX(d)->id : -2)
static CURLcode h3_data_setup(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
if(stream)
return CURLE_OK;
stream = calloc(1, sizeof(*stream));
if(!stream)
return CURLE_OUT_OF_MEMORY;
H3_STREAM_LCTX(data) = stream;
stream->req = ZERO_NULL;
msh3_lock_initialize(&stream->recv_lock);
Curl_bufq_init2(&stream->recvbuf, H3_STREAM_CHUNK_SIZE,
H3_STREAM_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT);
DEBUGF(LOG_CF(data, cf, "data setup (easy %p)", (void *)data));
return CURLE_OK;
}
static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
(void)cf;
if(stream) {
DEBUGF(LOG_CF(data, cf, "easy handle is done"));
Curl_bufq_free(&stream->recvbuf);
free(stream);
H3_STREAM_LCTX(data) = NULL;
}
}
static void notify_drain(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
(void)cf;
if(!data->state.drain) {
data->state.drain = 1;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
}
static const MSH3_CONNECTION_IF msh3_conn_if = {
msh3_conn_connected,
msh3_conn_shutdown_complete,
@ -136,10 +208,12 @@ static const MSH3_CONNECTION_IF msh3_conn_if = {
static void MSH3_CALL msh3_conn_connected(MSH3_CONNECTION *Connection,
void *IfContext)
{
struct cf_msh3_ctx *ctx = IfContext;
struct Curl_cfilter *cf = IfContext;
struct cf_msh3_ctx *ctx = cf->ctx;
struct Curl_easy *data = CF_DATA_CURRENT(cf);
(void)Connection;
if(ctx->verbose)
CF_DEBUGF(fprintf(stderr, "* [MSH3] evt: connected\n"));
DEBUGF(LOG_CF(data, cf, "[MSH3] connected"));
ctx->handshake_succeeded = true;
ctx->connected = true;
ctx->handshake_complete = true;
@ -148,10 +222,12 @@ static void MSH3_CALL msh3_conn_connected(MSH3_CONNECTION *Connection,
static void MSH3_CALL msh3_conn_shutdown_complete(MSH3_CONNECTION *Connection,
void *IfContext)
{
struct cf_msh3_ctx *ctx = IfContext;
struct Curl_cfilter *cf = IfContext;
struct cf_msh3_ctx *ctx = cf->ctx;
struct Curl_easy *data = CF_DATA_CURRENT(cf);
(void)Connection;
if(ctx->verbose)
CF_DEBUGF(fprintf(stderr, "* [MSH3] evt: shutdown complete\n"));
DEBUGF(LOG_CF(data, cf, "[MSH3] shutdown complete"));
ctx->connected = false;
ctx->handshake_complete = true;
}
@ -173,173 +249,159 @@ static const MSH3_REQUEST_IF msh3_request_if = {
msh3_data_sent
};
static CURLcode msh3_data_setup(struct Curl_cfilter *cf,
struct Curl_easy *data)
/* Decode HTTP status code. Returns -1 if no valid status code was
decoded. (duplicate from http2.c) */
static int decode_status_code(const char *value, size_t len)
{
struct HTTP *stream = data->req.p.http;
(void)cf;
int i;
int res;
DEBUGASSERT(stream);
if(!stream->recv_buf) {
DEBUGF(LOG_CF(data, cf, "req: setup"));
stream->recv_buf = malloc(MSH3_REQ_INIT_BUF_LEN);
if(!stream->recv_buf) {
return CURLE_OUT_OF_MEMORY;
}
stream->req = ZERO_NULL;
msh3_lock_initialize(&stream->recv_lock);
stream->recv_buf_alloc = MSH3_REQ_INIT_BUF_LEN;
stream->recv_buf_max = MSH3_REQ_MAX_BUF_LEN;
stream->recv_header_len = 0;
stream->recv_header_complete = false;
stream->recv_data_len = 0;
stream->recv_data_complete = false;
stream->recv_error = CURLE_OK;
if(len != 3) {
return -1;
}
return CURLE_OK;
res = 0;
for(i = 0; i < 3; ++i) {
char c = value[i];
if(c < '0' || c > '9') {
return -1;
}
res *= 10;
res += c - '0';
}
return res;
}
/* Requires stream->recv_lock to be held */
static bool msh3request_ensure_room(struct HTTP *stream, size_t len)
/*
* write_resp_raw() copies response data in raw format to the `data`'s
* receive buffer. If not enough space is available, it appends to the
* `data`'s overflow buffer.
*/
static CURLcode write_resp_raw(struct Curl_easy *data,
const void *mem, size_t memlen)
{
uint8_t *new_recv_buf;
const size_t cur_recv_len = stream->recv_header_len + stream->recv_data_len;
struct stream_ctx *stream = H3_STREAM_CTX(data);
CURLcode result = CURLE_OK;
ssize_t nwritten;
if(cur_recv_len + len > stream->recv_buf_alloc) {
size_t new_recv_buf_alloc_len = stream->recv_buf_alloc;
do {
new_recv_buf_alloc_len <<= 1; /* TODO - handle overflow */
} while(cur_recv_len + len > new_recv_buf_alloc_len);
CF_DEBUGF(fprintf(stderr, "* enlarging buffer to %zu\n",
new_recv_buf_alloc_len));
new_recv_buf = malloc(new_recv_buf_alloc_len);
if(!new_recv_buf) {
CF_DEBUGF(fprintf(stderr, "* FAILED: enlarging buffer to %zu\n",
new_recv_buf_alloc_len));
return false;
}
if(cur_recv_len) {
memcpy(new_recv_buf, stream->recv_buf, cur_recv_len);
}
stream->recv_buf_alloc = new_recv_buf_alloc_len;
free(stream->recv_buf);
stream->recv_buf = new_recv_buf;
nwritten = Curl_bufq_write(&stream->recvbuf, mem, memlen, &result);
if(nwritten < 0) {
return result;
}
return true;
if((size_t)nwritten < memlen) {
/* This MUST not happen. Our recbuf is dimensioned to hold the
* full max_stream_window and then some for this very reason. */
DEBUGASSERT(0);
return CURLE_RECV_ERROR;
}
return result;
}
static void MSH3_CALL msh3_header_received(MSH3_REQUEST *Request,
void *IfContext,
const MSH3_HEADER *Header)
void *userp,
const MSH3_HEADER *hd)
{
struct Curl_easy *data = IfContext;
struct HTTP *stream = data->req.p.http;
size_t total_len;
struct Curl_easy *data = userp;
struct stream_ctx *stream = H3_STREAM_CTX(data);
CURLcode result;
(void)Request;
if(stream->recv_header_complete) {
CF_DEBUGF(fprintf(stderr, "* ignoring header after data\n"));
return;
}
msh3_lock_acquire(&stream->recv_lock);
if((Header->NameLength == 7) &&
!strncmp(H2H3_PSEUDO_STATUS, (char *)Header->Name, 7)) {
total_len = 10 + Header->ValueLength;
if(!msh3request_ensure_room(stream, total_len)) {
CF_DEBUGF(fprintf(stderr, "* ERROR: unable to buffer: %.*s\n",
(int)Header->NameLength, Header->Name));
stream->recv_error = CURLE_OUT_OF_MEMORY;
goto release_lock;
}
msnprintf((char *)stream->recv_buf + stream->recv_header_len,
stream->recv_buf_alloc - stream->recv_header_len,
"HTTP/3 %.*s \r\n", (int)Header->ValueLength, Header->Value);
if((hd->NameLength == 7) &&
!strncmp(H2H3_PSEUDO_STATUS, (char *)hd->Name, 7)) {
char line[14]; /* status line is always 13 characters long */
size_t ncopy;
DEBUGASSERT(!stream->firstheader);
stream->status_code = decode_status_code(hd->Value, hd->ValueLength);
DEBUGASSERT(stream->status_code != -1);
ncopy = msnprintf(line, sizeof(line), "HTTP/3 %03d \r\n",
stream->status_code);
result = write_resp_raw(data, line, ncopy);
if(result)
stream->recv_error = result;
stream->firstheader = TRUE;
}
else {
total_len = 4 + Header->NameLength + Header->ValueLength;
if(!msh3request_ensure_room(stream, total_len)) {
CF_DEBUGF(fprintf(stderr, "* ERROR: unable to buffer: %.*s\n",
(int)Header->NameLength, Header->Name));
stream->recv_error = CURLE_OUT_OF_MEMORY;
goto release_lock;
/* store as an HTTP1-style header */
DEBUGASSERT(stream->firstheader);
result = write_resp_raw(data, hd->Name, hd->NameLength);
if(!result)
result = write_resp_raw(data, ": ", 2);
if(!result)
result = write_resp_raw(data, hd->Value, hd->ValueLength);
if(!result)
result = write_resp_raw(data, "\r\n", 2);
if(result) {
stream->recv_error = result;
}
msnprintf((char *)stream->recv_buf + stream->recv_header_len,
stream->recv_buf_alloc - stream->recv_header_len,
"%.*s: %.*s\r\n",
(int)Header->NameLength, Header->Name,
(int)Header->ValueLength, Header->Value);
}
stream->recv_header_len += total_len;
data->state.drain = 1;
release_lock:
msh3_lock_release(&stream->recv_lock);
}
static bool MSH3_CALL msh3_data_received(MSH3_REQUEST *Request,
void *IfContext, uint32_t *Length,
const uint8_t *Data)
void *IfContext, uint32_t *buflen,
const uint8_t *buf)
{
struct Curl_easy *data = IfContext;
struct HTTP *stream = data->req.p.http;
size_t cur_recv_len = stream->recv_header_len + stream->recv_data_len;
struct stream_ctx *stream = H3_STREAM_CTX(data);
CURLcode result;
bool rv = FALSE;
/* TODO: we would like to limit the amount of data we are buffer here.
* There seems to be no mechanism in msh3 to adjust flow control and
* it is undocumented what happens if we return FALSE here or less
* length (buflen is an inout parameter).
*/
(void)Request;
if(data && data->set.verbose)
CF_DEBUGF(fprintf(stderr, "* [MSH3] req: evt: received %u. %zu buffered, "
"%zu allocated\n",
*Length, cur_recv_len, stream->recv_buf_alloc));
/* TODO - Update this code to limit data bufferring by `stream->recv_buf_max`
and return `false` when we reach that limit. Then, when curl drains some
of the buffer, making room, call MsH3RequestSetReceiveEnabled to enable
receive callbacks again. */
msh3_lock_acquire(&stream->recv_lock);
if(!stream->recv_header_complete) {
if(data && data->set.verbose)
CF_DEBUGF(fprintf(stderr, "* [MSH3] req: Headers complete!\n"));
if(!msh3request_ensure_room(stream, 2)) {
stream->recv_error = CURLE_OUT_OF_MEMORY;
goto release_lock;
result = write_resp_raw(data, "\r\n", 2);
if(result) {
stream->recv_error = result;
goto out;
}
stream->recv_buf[stream->recv_header_len++] = '\r';
stream->recv_buf[stream->recv_header_len++] = '\n';
stream->recv_header_complete = true;
cur_recv_len += 2;
}
if(!msh3request_ensure_room(stream, *Length)) {
stream->recv_error = CURLE_OUT_OF_MEMORY;
goto release_lock;
}
memcpy(stream->recv_buf + cur_recv_len, Data, *Length);
stream->recv_data_len += (size_t)*Length;
data->state.drain = 1;
release_lock:
result = write_resp_raw(data, buf, *buflen);
if(result) {
stream->recv_error = result;
}
rv = TRUE;
out:
msh3_lock_release(&stream->recv_lock);
return true;
return rv;
}
static void MSH3_CALL msh3_complete(MSH3_REQUEST *Request, void *IfContext,
bool Aborted, uint64_t AbortError)
bool aborted, uint64_t error)
{
struct Curl_easy *data = IfContext;
struct HTTP *stream = data->req.p.http;
struct stream_ctx *stream = H3_STREAM_CTX(data);
(void)Request;
(void)AbortError;
if(data && data->set.verbose)
CF_DEBUGF(fprintf(stderr, "* [MSH3] req: evt: complete, aborted=%s\n",
Aborted ? "true" : "false"));
msh3_lock_acquire(&stream->recv_lock);
if(Aborted) {
stream->recv_error = CURLE_HTTP3; /* TODO - how do we pass AbortError? */
}
stream->closed = TRUE;
stream->recv_header_complete = true;
stream->recv_data_complete = true;
if(error)
stream->error3 = error;
if(aborted)
stream->reset = TRUE;
msh3_lock_release(&stream->recv_lock);
}
@ -347,7 +409,7 @@ static void MSH3_CALL msh3_shutdown_complete(MSH3_REQUEST *Request,
void *IfContext)
{
struct Curl_easy *data = IfContext;
struct HTTP *stream = data->req.p.http;
struct stream_ctx *stream = H3_STREAM_CTX(data);
(void)Request;
(void)stream;
}
@ -356,82 +418,121 @@ static void MSH3_CALL msh3_data_sent(MSH3_REQUEST *Request,
void *IfContext, void *SendContext)
{
struct Curl_easy *data = IfContext;
struct HTTP *stream = data->req.p.http;
struct stream_ctx *stream = H3_STREAM_CTX(data);
(void)Request;
(void)stream;
(void)SendContext;
}
static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
struct Curl_easy *data,
CURLcode *err)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
ssize_t nread = -1;
(void)cf;
if(stream->reset) {
failf(data, "HTTP/3 stream reset by server");
*err = CURLE_PARTIAL_FILE;
DEBUGF(LOG_CF(data, cf, "cf_recv, was reset -> %d", *err));
goto out;
}
else if(stream->error3) {
failf(data, "HTTP/3 stream was not closed cleanly: (error %zd)",
(ssize_t)stream->error3);
*err = CURLE_HTTP3;
DEBUGF(LOG_CF(data, cf, "cf_recv, closed uncleanly -> %d", *err));
goto out;
}
else {
DEBUGF(LOG_CF(data, cf, "cf_recv, closed ok -> %d", *err));
}
*err = CURLE_OK;
nread = 0;
out:
data->state.drain = 0;
return nread;
}
static void set_quic_expire(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
/* we have no indication from msh3 when it would be a good time
* to juggle the connection again. So, we compromise by calling
* us again every some milliseconds. */
(void)cf;
if(stream && stream->req && !stream->closed) {
Curl_expire(data, 10, EXPIRE_QUIC);
}
else {
Curl_expire(data, 50, EXPIRE_QUIC);
}
}
static ssize_t cf_msh3_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
char *buf, size_t len, CURLcode *err)
{
struct HTTP *stream = data->req.p.http;
size_t outsize = 0;
struct stream_ctx *stream = H3_STREAM_CTX(data);
ssize_t nread = -1;
struct cf_call_data save;
(void)cf;
CF_DATA_SAVE(save, cf, data);
DEBUGF(LOG_CF(data, cf, "req: recv with %zu byte buffer", len));
msh3_lock_acquire(&stream->recv_lock);
if(stream->recv_error) {
failf(data, "request aborted");
data->state.drain = 0;
*err = stream->recv_error;
return -1;
goto out;
}
*err = CURLE_OK;
msh3_lock_acquire(&stream->recv_lock);
if(stream->recv_header_len) {
outsize = len;
if(stream->recv_header_len < outsize) {
outsize = stream->recv_header_len;
if(!Curl_bufq_is_empty(&stream->recvbuf)) {
nread = Curl_bufq_read(&stream->recvbuf,
(unsigned char *)buf, len, err);
DEBUGF(LOG_CF(data, cf, "read recvbuf(len=%zu) -> %zd, %d",
len, nread, *err));
if(nread < 0)
goto out;
if(!Curl_bufq_is_empty(&stream->recvbuf) ||
stream->closed) {
notify_drain(cf, data);
}
memcpy(buf, stream->recv_buf, outsize);
if(outsize < stream->recv_header_len + stream->recv_data_len) {
memmove(stream->recv_buf, stream->recv_buf + outsize,
stream->recv_header_len + stream->recv_data_len - outsize);
}
stream->recv_header_len -= outsize;
DEBUGF(LOG_CF(data, cf, "req: returned %zu bytes of header", outsize));
}
else if(stream->recv_data_len) {
outsize = len;
if(stream->recv_data_len < outsize) {
outsize = stream->recv_data_len;
}
memcpy(buf, stream->recv_buf, outsize);
if(outsize < stream->recv_data_len) {
memmove(stream->recv_buf, stream->recv_buf + outsize,
stream->recv_data_len - outsize);
}
stream->recv_data_len -= outsize;
DEBUGF(LOG_CF(data, cf, "req: returned %zu bytes of data", outsize));
if(stream->recv_data_len == 0 && stream->recv_data_complete)
data->state.drain = 1;
}
else if(stream->recv_data_complete) {
DEBUGF(LOG_CF(data, cf, "req: receive complete"));
data->state.drain = 0;
else if(stream->closed) {
nread = recv_closed_stream(cf, data, err);
goto out;
}
else {
DEBUGF(LOG_CF(data, cf, "req: nothing here, call again"));
*err = CURLE_AGAIN;
outsize = -1;
}
out:
msh3_lock_release(&stream->recv_lock);
return (ssize_t)outsize;
set_quic_expire(cf, data);
CF_DATA_RESTORE(cf, save);
return nread;
}
static ssize_t cf_msh3_send(struct Curl_cfilter *cf, struct Curl_easy *data,
const void *buf, size_t len, CURLcode *err)
{
struct cf_msh3_ctx *ctx = cf->ctx;
struct HTTP *stream = data->req.p.http;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct h2h3req *hreq;
size_t hdrlen = 0;
size_t sentlen = 0;
ssize_t nwritten = -1;
struct cf_call_data save;
CF_DATA_SAVE(save, cf, data);
/* Sizes must match for cast below to work" */
DEBUGASSERT(sizeof(MSH3_HEADER) == sizeof(struct h2h3pseudo));
@ -442,16 +543,11 @@ static ssize_t cf_msh3_send(struct Curl_cfilter *cf, struct Curl_easy *data,
data. Parse out the headers and create the request, then if there is
any data left over go ahead and send it too. */
*err = msh3_data_setup(cf, data);
if(*err) {
failf(data, "could not setup data");
return -1;
}
*err = Curl_pseudo_headers(data, buf, len, &hdrlen, &hreq);
if(*err) {
failf(data, "Curl_pseudo_headers failed");
return -1;
*err = CURLE_SEND_ERROR;
goto out;
}
DEBUGF(LOG_CF(data, cf, "req: send %zu headers", hreq->entries));
@ -463,31 +559,35 @@ static ssize_t cf_msh3_send(struct Curl_cfilter *cf, struct Curl_easy *data,
if(!stream->req) {
failf(data, "request open failed");
*err = CURLE_SEND_ERROR;
return -1;
goto out;
}
*err = CURLE_OK;
return len;
nwritten = len;
goto out;
}
else {
/* request is open */
DEBUGF(LOG_CF(data, cf, "req: send %zd body bytes", len));
if(len > 0xFFFFFFFF) {
len = 0xFFFFFFFF;
}
if(!MsH3RequestSend(stream->req, MSH3_REQUEST_FLAG_NONE, buf,
(uint32_t)len, stream)) {
*err = CURLE_SEND_ERROR;
goto out;
}
/* TODO - msh3/msquic will hold onto this memory until the send complete
event. How do we make sure curl doesn't free it until then? */
*err = CURLE_OK;
nwritten = len;
}
DEBUGF(LOG_CF(data, cf, "req: send %zd body bytes", len));
if(len > 0xFFFFFFFF) {
/* msh3 doesn't support size_t sends currently. */
*err = CURLE_SEND_ERROR;
return -1;
}
/* TODO - Need an explicit signal to know when to FIN. */
if(!MsH3RequestSend(stream->req, MSH3_REQUEST_FLAG_FIN, buf, (uint32_t)len,
stream)) {
*err = CURLE_SEND_ERROR;
return -1;
}
/* TODO - msh3/msquic will hold onto this memory until the send complete
event. How do we make sure curl doesn't free it until then? */
sentlen += len;
*err = CURLE_OK;
return sentlen;
out:
set_quic_expire(cf, data);
CF_DATA_RESTORE(cf, save);
return nwritten;
}
static int cf_msh3_get_select_socks(struct Curl_cfilter *cf,
@ -495,36 +595,49 @@ static int cf_msh3_get_select_socks(struct Curl_cfilter *cf,
curl_socket_t *socks)
{
struct cf_msh3_ctx *ctx = cf->ctx;
struct HTTP *stream = data->req.p.http;
struct stream_ctx *stream = H3_STREAM_CTX(data);
int bitmap = GETSOCK_BLANK;
struct cf_call_data save;
CF_DATA_SAVE(save, cf, data);
if(stream && ctx->sock[SP_LOCAL] != CURL_SOCKET_BAD) {
socks[0] = ctx->sock[SP_LOCAL];
if(stream->recv_error) {
bitmap |= GETSOCK_READSOCK(0);
data->state.drain = 1;
notify_drain(cf, data);
}
else if(stream->recv_header_len || stream->recv_data_len) {
else if(stream->req) {
bitmap |= GETSOCK_READSOCK(0);
data->state.drain = 1;
notify_drain(cf, data);
}
}
DEBUGF(LOG_CF(data, cf, "select_sock %u -> %d",
(uint32_t)data->state.drain, bitmap));
CF_DATA_RESTORE(cf, save);
return bitmap;
}
static bool cf_msh3_data_pending(struct Curl_cfilter *cf,
const struct Curl_easy *data)
{
struct HTTP *stream = data->req.p.http;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_call_data save;
bool pending = FALSE;
CF_DATA_SAVE(save, cf, data);
(void)cf;
DEBUGF(LOG_CF((struct Curl_easy *)data, cf, "data pending = %hhu",
(bool)(stream->recv_header_len || stream->recv_data_len)));
return stream->recv_header_len || stream->recv_data_len;
if(stream->req) {
msh3_lock_acquire(&stream->recv_lock);
DEBUGF(LOG_CF((struct Curl_easy *)data, cf, "data pending = %zu",
Curl_bufq_len(&stream->recvbuf)));
pending = !Curl_bufq_is_empty(&stream->recvbuf);
msh3_lock_release(&stream->recv_lock);
}
CF_DATA_RESTORE(cf, save);
return pending;
}
static void cf_msh3_active(struct Curl_cfilter *cf, struct Curl_easy *data)
@ -548,31 +661,30 @@ static CURLcode cf_msh3_data_event(struct Curl_cfilter *cf,
struct Curl_easy *data,
int event, int arg1, void *arg2)
{
struct HTTP *stream = data->req.p.http;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_call_data save;
CURLcode result = CURLE_OK;
CF_DATA_SAVE(save, cf, data);
(void)arg1;
(void)arg2;
switch(event) {
case CF_CTRL_DATA_SETUP:
result = msh3_data_setup(cf, data);
result = h3_data_setup(cf, data);
break;
case CF_CTRL_DATA_DONE:
DEBUGF(LOG_CF(data, cf, "req: done"));
if(stream) {
if(stream->recv_buf) {
Curl_safefree(stream->recv_buf);
msh3_lock_uninitialize(&stream->recv_lock);
}
if(stream->req) {
MsH3RequestClose(stream->req);
stream->req = ZERO_NULL;
}
}
h3_data_done(cf, data);
break;
case CF_CTRL_DATA_DONE_SEND:
DEBUGF(LOG_CF(data, cf, "req: send done"));
stream->upload_done = TRUE;
if(stream && stream->req) {
char buf[1];
if(!MsH3RequestSend(stream->req, MSH3_REQUEST_FLAG_FIN, buf, 0, data)) {
result = CURLE_SEND_ERROR;
}
}
break;
case CF_CTRL_CONN_INFO_UPDATE:
DEBUGF(LOG_CF(data, cf, "req: update info"));
@ -581,6 +693,8 @@ static CURLcode cf_msh3_data_event(struct Curl_cfilter *cf,
default:
break;
}
CF_DATA_RESTORE(cf, save);
return result;
}
@ -590,9 +704,10 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
struct cf_msh3_ctx *ctx = cf->ctx;
bool verify = !!cf->conn->ssl_config.verifypeer;
MSH3_ADDR addr = {0};
CURLcode result;
memcpy(&addr, &ctx->addr.sa_addr, ctx->addr.addrlen);
MSH3_SET_PORT(&addr, (uint16_t)cf->conn->remote_port);
ctx->verbose = (data && data->set.verbose);
if(verify && (cf->conn->ssl_config.CAfile || cf->conn->ssl_config.CApath)) {
/* TODO: need a way to provide trust anchors to MSH3 */
@ -618,7 +733,7 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
ctx->qconn = MsH3ConnectionOpen(ctx->api,
&msh3_conn_if,
ctx,
cf,
cf->conn->host.name,
&addr,
!verify);
@ -631,6 +746,10 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
return CURLE_FAILED_INIT;
}
result = h3_data_setup(cf, data);
if(result)
return result;
return CURLE_OK;
}
@ -639,6 +758,7 @@ static CURLcode cf_msh3_connect(struct Curl_cfilter *cf,
bool blocking, bool *done)
{
struct cf_msh3_ctx *ctx = cf->ctx;
struct cf_call_data save;
CURLcode result = CURLE_OK;
(void)blocking;
@ -647,6 +767,8 @@ static CURLcode cf_msh3_connect(struct Curl_cfilter *cf,
return CURLE_OK;
}
CF_DATA_SAVE(save, cf, data);
if(ctx->sock[SP_LOCAL] == CURL_SOCKET_BAD) {
if(Curl_socketpair(AF_UNIX, SOCK_STREAM, 0, &ctx->sock[0]) < 0) {
ctx->sock[SP_LOCAL] = CURL_SOCKET_BAD;
@ -666,6 +788,7 @@ static CURLcode cf_msh3_connect(struct Curl_cfilter *cf,
if(ctx->handshake_complete) {
ctx->handshake_at = Curl_now();
if(ctx->handshake_succeeded) {
DEBUGF(LOG_CF(data, cf, "handshake succeeded"));
cf->conn->bits.multiplex = TRUE; /* at least potentially multiplexed */
cf->conn->httpversion = 30;
cf->conn->bundle->multiuse = BUNDLE_MULTIPLEX;
@ -682,26 +805,35 @@ static CURLcode cf_msh3_connect(struct Curl_cfilter *cf,
}
out:
CF_DATA_RESTORE(cf, save);
return result;
}
static void cf_msh3_close(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct cf_msh3_ctx *ctx = cf->ctx;
struct cf_call_data save;
(void)data;
CF_DATA_SAVE(save, cf, data);
if(ctx) {
DEBUGF(LOG_CF(data, cf, "destroying"));
if(ctx->qconn)
if(ctx->qconn) {
MsH3ConnectionClose(ctx->qconn);
if(ctx->api)
ctx->qconn = NULL;
}
if(ctx->api) {
MsH3ApiClose(ctx->api);
ctx->api = NULL;
}
if(ctx->active) {
/* We share our socket at cf->conn->sock[cf->sockindex] when active.
* If it is no longer there, someone has stolen (and hopefully
* closed it) and we just forget about it.
*/
ctx->active = FALSE;
if(ctx->sock[SP_LOCAL] == cf->conn->sock[cf->sockindex]) {
DEBUGF(LOG_CF(data, cf, "cf_msh3_close(%d) active",
(int)ctx->sock[SP_LOCAL]));
@ -721,17 +853,22 @@ static void cf_msh3_close(struct Curl_cfilter *cf, struct Curl_easy *data)
if(ctx->sock[SP_REMOTE] != CURL_SOCKET_BAD) {
sclose(ctx->sock[SP_REMOTE]);
}
memset(ctx, 0, sizeof(*ctx));
ctx->sock[SP_LOCAL] = CURL_SOCKET_BAD;
ctx->sock[SP_REMOTE] = CURL_SOCKET_BAD;
}
CF_DATA_RESTORE(cf, save);
}
static void cf_msh3_destroy(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct cf_call_data save;
CF_DATA_SAVE(save, cf, data);
cf_msh3_close(cf, data);
free(cf->ctx);
cf->ctx = NULL;
/* no CF_DATA_RESTORE(cf, save); its gone */
}
static CURLcode cf_msh3_query(struct Curl_cfilter *cf,

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -22,12 +22,25 @@
*
***************************************************************************/
/* WIP, experimental: use recvmmsg() on linux
* we have no configure check, yet
* and also it is only available for _GNU_SOURCE, which
* we do not use otherwise.
#define HAVE_SENDMMSG
*/
#if defined(HAVE_SENDMMSG)
#define _GNU_SOURCE
#include <sys/socket.h>
#undef _GNU_SOURCE
#endif
#include "curl_setup.h"
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#include "urldata.h"
#include "bufq.h"
#include "dynbuf.h"
#include "cfilters.h"
#include "curl_log.h"
@ -51,6 +64,10 @@
#define QLOGMODE O_WRONLY|O_CREAT
#endif
#define NW_CHUNK_SIZE (64 * 1024)
#define NW_SEND_CHUNKS 2
void Curl_quic_ver(char *p, size_t len)
{
#if defined(USE_NGTCP2) && defined(USE_NGHTTP3)
@ -62,17 +79,10 @@ void Curl_quic_ver(char *p, size_t len)
#endif
}
CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx, size_t pktbuflen)
CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx)
{
qctx->num_blocked_pkt = 0;
qctx->num_blocked_pkt_sent = 0;
memset(&qctx->blocked_pkt, 0, sizeof(qctx->blocked_pkt));
qctx->pktbuflen = pktbuflen;
qctx->pktbuf = malloc(qctx->pktbuflen);
if(!qctx->pktbuf)
return CURLE_OUT_OF_MEMORY;
Curl_bufq_init2(&qctx->sendbuf, NW_CHUNK_SIZE, NW_SEND_CHUNKS,
BUFQ_OPT_SOFT_LIMIT);
#if defined(__linux__) && defined(UDP_SEGMENT) && defined(HAVE_SENDMSG)
qctx->no_gso = FALSE;
#else
@ -84,8 +94,7 @@ CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx, size_t pktbuflen)
void vquic_ctx_free(struct cf_quic_ctx *qctx)
{
free(qctx->pktbuf);
qctx->pktbuf = NULL;
Curl_bufq_free(&qctx->sendbuf);
}
static CURLcode send_packet_no_gso(struct Curl_cfilter *cf,
@ -215,11 +224,11 @@ static CURLcode send_packet_no_gso(struct Curl_cfilter *cf,
return CURLE_OK;
}
CURLcode vquic_send_packet(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx,
const uint8_t *pkt, size_t pktlen, size_t gsolen,
size_t *psent)
CURLcode vquic_send_packets(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx,
const uint8_t *pkt, size_t pktlen, size_t gsolen,
size_t *psent)
{
if(qctx->no_gso && pktlen > gsolen) {
return send_packet_no_gso(cf, data, qctx, pkt, pktlen, gsolen, psent);
@ -228,53 +237,271 @@ CURLcode vquic_send_packet(struct Curl_cfilter *cf,
return do_sendmsg(cf, data, qctx, pkt, pktlen, gsolen, psent);
}
void vquic_push_blocked_pkt(struct Curl_cfilter *cf,
struct cf_quic_ctx *qctx,
const uint8_t *pkt, size_t pktlen, size_t gsolen)
CURLcode vquic_flush(struct Curl_cfilter *cf, struct Curl_easy *data,
struct cf_quic_ctx *qctx)
{
struct vquic_blocked_pkt *blkpkt;
const unsigned char *buf;
size_t blen, sent;
CURLcode result;
size_t gsolen;
(void)cf;
assert(qctx->num_blocked_pkt <
sizeof(qctx->blocked_pkt) / sizeof(qctx->blocked_pkt[0]));
while(Curl_bufq_peek(&qctx->sendbuf, &buf, &blen)) {
gsolen = qctx->gsolen;
if(qctx->split_len) {
gsolen = qctx->split_gsolen;
if(blen > qctx->split_len)
blen = qctx->split_len;
}
blkpkt = &qctx->blocked_pkt[qctx->num_blocked_pkt++];
blkpkt->pkt = pkt;
blkpkt->pktlen = pktlen;
blkpkt->gsolen = gsolen;
DEBUGF(LOG_CF(data, cf, "vquic_send(len=%zu, gso=%zu)",
blen, gsolen));
result = vquic_send_packets(cf, data, qctx, buf, blen, gsolen, &sent);
DEBUGF(LOG_CF(data, cf, "vquic_send(len=%zu, gso=%zu) -> %d, sent=%zu",
blen, gsolen, result, sent));
if(result) {
if(result == CURLE_AGAIN) {
Curl_bufq_skip(&qctx->sendbuf, sent);
if(qctx->split_len)
qctx->split_len -= sent;
}
return result;
}
Curl_bufq_skip(&qctx->sendbuf, sent);
if(qctx->split_len)
qctx->split_len -= sent;
}
return CURLE_OK;
}
CURLcode vquic_send_blocked_pkt(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx)
CURLcode vquic_send(struct Curl_cfilter *cf, struct Curl_easy *data,
struct cf_quic_ctx *qctx, size_t gsolen)
{
size_t sent;
CURLcode curlcode;
struct vquic_blocked_pkt *blkpkt;
qctx->gsolen = gsolen;
return vquic_flush(cf, data, qctx);
}
(void)cf;
for(; qctx->num_blocked_pkt_sent < qctx->num_blocked_pkt;
++qctx->num_blocked_pkt_sent) {
blkpkt = &qctx->blocked_pkt[qctx->num_blocked_pkt_sent];
curlcode = vquic_send_packet(cf, data, qctx, blkpkt->pkt,
blkpkt->pktlen, blkpkt->gsolen, &sent);
CURLcode vquic_send_tail_split(struct Curl_cfilter *cf, struct Curl_easy *data,
struct cf_quic_ctx *qctx, size_t gsolen,
size_t tail_len, size_t tail_gsolen)
{
DEBUGASSERT(Curl_bufq_len(&qctx->sendbuf) > tail_len);
qctx->split_len = Curl_bufq_len(&qctx->sendbuf) - tail_len;
qctx->split_gsolen = gsolen;
qctx->gsolen = tail_gsolen;
DEBUGF(LOG_CF(data, cf, "vquic_send_tail_split: [%zu gso=%zu][%zu gso=%zu]",
qctx->split_len, qctx->split_gsolen,
tail_len, qctx->gsolen));
return vquic_flush(cf, data, qctx);
}
if(curlcode) {
if(curlcode == CURLE_AGAIN) {
blkpkt->pkt += sent;
blkpkt->pktlen -= sent;
#ifdef HAVE_SENDMMSG
static CURLcode recvmmsg_packets(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx,
size_t max_pkts,
vquic_recv_pkt_cb *recv_cb, void *userp)
{
#define MMSG_NUM 64
struct iovec msg_iov[MMSG_NUM];
struct mmsghdr mmsg[MMSG_NUM];
uint8_t bufs[MMSG_NUM][2*1024];
struct sockaddr_storage remote_addr[MMSG_NUM];
size_t total_nread, pkts;
int mcount, i, n;
CURLcode result = CURLE_OK;
DEBUGASSERT(max_pkts > 0);
pkts = 0;
total_nread = 0;
while(pkts < max_pkts) {
n = (int)CURLMIN(MMSG_NUM, max_pkts);
memset(&mmsg, 0, sizeof(mmsg));
for(i = 0; i < n; ++i) {
msg_iov[i].iov_base = bufs[i];
msg_iov[i].iov_len = (int)sizeof(bufs[i]);
mmsg[i].msg_hdr.msg_iov = &msg_iov[i];
mmsg[i].msg_hdr.msg_iovlen = 1;
mmsg[i].msg_hdr.msg_name = &remote_addr[i];
mmsg[i].msg_hdr.msg_namelen = sizeof(remote_addr[i]);
}
while((mcount = recvmmsg(qctx->sockfd, mmsg, n, 0, NULL)) == -1 &&
SOCKERRNO == EINTR)
;
if(mcount == -1) {
if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) {
DEBUGF(LOG_CF(data, cf, "ingress, recvmmsg -> EAGAIN"));
goto out;
}
return curlcode;
if(!cf->connected && SOCKERRNO == ECONNREFUSED) {
const char *r_ip;
int r_port;
Curl_cf_socket_peek(cf->next, data, NULL, NULL,
&r_ip, &r_port, NULL, NULL);
failf(data, "QUIC: connection to %s port %u refused",
r_ip, r_port);
result = CURLE_COULDNT_CONNECT;
goto out;
}
failf(data, "QUIC: recvmsg() unexpectedly returned %d (errno=%d)",
mcount, SOCKERRNO);
result = CURLE_RECV_ERROR;
goto out;
}
DEBUGF(LOG_CF(data, cf, "recvmmsg() -> %d packets", mcount));
pkts += mcount;
for(i = 0; i < mcount; ++i) {
total_nread += mmsg[i].msg_len;
result = recv_cb(bufs[i], mmsg[i].msg_len,
mmsg[i].msg_hdr.msg_name, mmsg[i].msg_hdr.msg_namelen,
0, userp);
if(result)
goto out;
}
}
qctx->num_blocked_pkt = 0;
qctx->num_blocked_pkt_sent = 0;
out:
DEBUGF(LOG_CF(data, cf, "recvd %zu packets with %zd bytes -> %d",
pkts, total_nread, result));
return result;
}
return CURLE_OK;
#elif defined(HAVE_SENDMSG)
static CURLcode recvmsg_packets(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx,
size_t max_pkts,
vquic_recv_pkt_cb *recv_cb, void *userp)
{
struct iovec msg_iov;
struct msghdr msg;
uint8_t buf[64*1024];
struct sockaddr_storage remote_addr;
size_t total_nread, pkts;
ssize_t nread;
CURLcode result = CURLE_OK;
msg_iov.iov_base = buf;
msg_iov.iov_len = (int)sizeof(buf);
memset(&msg, 0, sizeof(msg));
msg.msg_iov = &msg_iov;
msg.msg_iovlen = 1;
DEBUGASSERT(max_pkts > 0);
for(pkts = 0, total_nread = 0; pkts < max_pkts;) {
msg.msg_name = &remote_addr;
msg.msg_namelen = sizeof(remote_addr);
while((nread = recvmsg(qctx->sockfd, &msg, 0)) == -1 &&
SOCKERRNO == EINTR)
;
if(nread == -1) {
if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) {
DEBUGF(LOG_CF(data, cf, "ingress, recvmsg -> EAGAIN"));
goto out;
}
if(!cf->connected && SOCKERRNO == ECONNREFUSED) {
const char *r_ip;
int r_port;
Curl_cf_socket_peek(cf->next, data, NULL, NULL,
&r_ip, &r_port, NULL, NULL);
failf(data, "QUIC: connection to %s port %u refused",
r_ip, r_port);
result = CURLE_COULDNT_CONNECT;
goto out;
}
failf(data, "QUIC: recvmsg() unexpectedly returned %zd (errno=%d)",
nread, SOCKERRNO);
result = CURLE_RECV_ERROR;
goto out;
}
++pkts;
total_nread += (size_t)nread;
result = recv_cb(buf, (size_t)nread, msg.msg_name, msg.msg_namelen,
0, userp);
if(result)
goto out;
}
out:
DEBUGF(LOG_CF(data, cf, "recvd %zu packets with %zd bytes -> %d",
pkts, total_nread, result));
return result;
}
#else /* HAVE_SENDMMSG || HAVE_SENDMSG */
CURLcode recvfrom_packets(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx,
size_t max_pkts,
vquic_recv_pkt_cb *recv_cb, void *userp)
{
uint8_t buf[64*1024];
int bufsize = (int)sizeof(buf);
struct sockaddr_storage remote_addr;
socklen_t remote_addrlen = sizeof(remote_addr);
size_t total_nread, pkts;
ssize_t nread;
CURLcode result = CURLE_OK;
DEBUGASSERT(max_pkts > 0);
for(pkts = 0, total_nread = 0; pkts < max_pkts;) {
while((nread = recvfrom(qctx->sockfd, (char *)buf, bufsize, 0,
(struct sockaddr *)&remote_addr,
&remote_addrlen)) == -1 &&
SOCKERRNO == EINTR)
;
if(nread == -1) {
if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) {
DEBUGF(LOG_CF(data, cf, "ingress, recvfrom -> EAGAIN"));
goto out;
}
if(!cf->connected && SOCKERRNO == ECONNREFUSED) {
const char *r_ip;
int r_port;
Curl_cf_socket_peek(cf->next, data, NULL, NULL,
&r_ip, &r_port, NULL, NULL);
failf(data, "QUIC: connection to %s port %u refused",
r_ip, r_port);
result = CURLE_COULDNT_CONNECT;
goto out;
}
failf(data, "QUIC: recvfrom() unexpectedly returned %zd (errno=%d)",
nread, SOCKERRNO);
result = CURLE_RECV_ERROR;
goto out;
}
++pkts;
total_nread += (size_t)nread;
result = recv_cb(buf, (size_t)nread, &remote_addr, remote_addrlen,
0, userp);
if(result)
goto out;
}
out:
DEBUGF(LOG_CF(data, cf, "recvd %zu packets with %zd bytes -> %d",
pkts, total_nread, result));
return result;
}
#endif /* !HAVE_SENDMMSG && !HAVE_SENDMSG */
CURLcode vquic_recv_packets(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx,
size_t max_pkts,
vquic_recv_pkt_cb *recv_cb, void *userp)
{
#if defined(HAVE_SENDMMSG)
return recvmmsg_packets(cf, data, qctx, max_pkts, recv_cb, userp);
#elif defined(HAVE_SENDMSG)
return recvmsg_packets(cf, data, qctx, max_pkts, recv_cb, userp);
#else
return recvfrom_packets(cf, data, qctx, max_pkts, recv_cb, userp);
#endif
}
/*

Просмотреть файл

@ -25,47 +25,63 @@
***************************************************************************/
#include "curl_setup.h"
#include "bufq.h"
#ifdef ENABLE_QUIC
struct vquic_blocked_pkt {
const uint8_t *pkt;
size_t pktlen;
size_t gsolen;
};
#define MAX_PKT_BURST 10
#define MAX_UDP_PAYLOAD_SIZE 1452
struct cf_quic_ctx {
curl_socket_t sockfd;
struct sockaddr_storage local_addr;
socklen_t local_addrlen;
struct vquic_blocked_pkt blocked_pkt[2];
uint8_t *pktbuf;
/* the number of entries in blocked_pkt */
size_t num_blocked_pkt;
size_t num_blocked_pkt_sent;
/* the packets blocked by sendmsg (EAGAIN or EWOULDBLOCK) */
size_t pktbuflen;
/* the number of processed entries in blocked_pkt */
bool no_gso;
curl_socket_t sockfd; /* connected UDP socket */
struct sockaddr_storage local_addr; /* address socket is bound to */
socklen_t local_addrlen; /* length of local address */
struct bufq sendbuf; /* buffer for sending one or more packets */
size_t gsolen; /* length of individual packets in send buf */
size_t split_len; /* if != 0, buffer length after which GSO differs */
size_t split_gsolen; /* length of individual packets after split_len */
bool no_gso; /* do not use gso on sending */
};
CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx, size_t pktbuflen);
CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx);
void vquic_ctx_free(struct cf_quic_ctx *qctx);
CURLcode vquic_send_packet(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx,
const uint8_t *pkt, size_t pktlen, size_t gsolen,
size_t *psent);
CURLcode vquic_send_packets(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx,
const uint8_t *pkt, size_t pktlen, size_t gsolen,
size_t *psent);
void vquic_push_blocked_pkt(struct Curl_cfilter *cf,
struct cf_quic_ctx *qctx,
const uint8_t *pkt, size_t pktlen, size_t gsolen);
CURLcode vquic_send_blocked_pkt(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx);
CURLcode vquic_send_blocked_pkts(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx);
CURLcode vquic_send(struct Curl_cfilter *cf, struct Curl_easy *data,
struct cf_quic_ctx *qctx, size_t gsolen);
CURLcode vquic_send_tail_split(struct Curl_cfilter *cf, struct Curl_easy *data,
struct cf_quic_ctx *qctx, size_t gsolen,
size_t tail_len, size_t tail_gsolen);
CURLcode vquic_flush(struct Curl_cfilter *cf, struct Curl_easy *data,
struct cf_quic_ctx *qctx);
typedef CURLcode vquic_recv_pkt_cb(const unsigned char *pkt, size_t pktlen,
struct sockaddr_storage *remote_addr,
socklen_t remote_addrlen, int ecn,
void *userp);
CURLcode vquic_recv_packets(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx,
size_t max_pkts,
vquic_recv_pkt_cb *recv_cb, void *userp);
#endif /* !ENABLE_QUIC */

Просмотреть файл

@ -114,6 +114,8 @@ class TestDownload:
httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 shaky here")
curl = CurlClient(env=env)
urln = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-499]'
r = curl.http_download(urls=[urln], alpn_proto=proto)
@ -223,6 +225,8 @@ class TestDownload:
httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
count = 20
urln = f'https://{env.authority_for(env.domain1, proto)}/data-10m?[0-{count-1}]'
curl = CurlClient(env=env)

Просмотреть файл

@ -81,6 +81,8 @@ class TestGoAway:
@pytest.mark.skipif(condition=not Env.have_h3(), reason="h3 not supported")
def test_03_02_h3_goaway(self, env: Env, httpd, nghttpx, repeat):
proto = 'h3'
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
count = 3
self.r = None
def long_run():

Просмотреть файл

@ -52,6 +52,8 @@ class TestErrors:
proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
count = 1
curl = CurlClient(env=env)
urln = f'https://{env.authority_for(env.domain1, proto)}' \
@ -73,8 +75,8 @@ class TestErrors:
proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('quiche'):
pytest.skip("quiche not reliable, sometimes reports success")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
count = 20
curl = CurlClient(env=env)
urln = f'https://{env.authority_for(env.domain1, proto)}' \

Просмотреть файл

@ -52,6 +52,8 @@ class TestUpload:
def test_07_01_upload_1_small(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 fails here")
data = '0123456789'
curl = CurlClient(env=env)
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-0]'
@ -66,6 +68,8 @@ class TestUpload:
def test_07_02_upload_1_large(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 fails here")
fdata = os.path.join(env.gen_dir, 'data-100k')
curl = CurlClient(env=env)
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-0]'
@ -81,6 +85,8 @@ class TestUpload:
def test_07_10_upload_sequential(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
count = 50
data = '0123456789'
curl = CurlClient(env=env)
@ -97,6 +103,8 @@ class TestUpload:
def test_07_11_upload_parallel(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
# limit since we use a separate connection in h1
count = 50
data = '0123456789'
@ -115,6 +123,8 @@ class TestUpload:
def test_07_20_upload_seq_large(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
fdata = os.path.join(env.gen_dir, 'data-100k')
count = 50
curl = CurlClient(env=env)
@ -133,6 +143,8 @@ class TestUpload:
def test_07_12_upload_seq_large(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
fdata = os.path.join(env.gen_dir, 'data-10m')
count = 2
curl = CurlClient(env=env)
@ -151,6 +163,8 @@ class TestUpload:
def test_07_20_upload_parallel(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
# limit since we use a separate connection in h1
count = 50
data = '0123456789'
@ -169,8 +183,8 @@ class TestUpload:
def test_07_21_upload_parallel_large(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('quiche'):
pytest.skip("quiche stalls on parallel, large uploads, unless --trace is used???")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
fdata = os.path.join(env.gen_dir, 'data-100k')
# limit since we use a separate connection in h1
count = 50
@ -187,6 +201,8 @@ class TestUpload:
def test_07_30_put_100k(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 fails here")
fdata = os.path.join(env.gen_dir, 'data-100k')
count = 1
curl = CurlClient(env=env)
@ -206,6 +222,8 @@ class TestUpload:
def test_07_31_put_10m(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 fails here")
fdata = os.path.join(env.gen_dir, 'data-10m')
count = 1
curl = CurlClient(env=env)

Просмотреть файл

@ -57,6 +57,7 @@ class TestCaddy:
@pytest.fixture(autouse=True, scope='class')
def _class_scope(self, env, caddy):
self._make_docs_file(docs_dir=caddy.docs_dir, fname='data1.data', fsize=1024*1024)
self._make_docs_file(docs_dir=caddy.docs_dir, fname='data5.data', fsize=5*1024*1024)
self._make_docs_file(docs_dir=caddy.docs_dir, fname='data10.data', fsize=10*1024*1024)
self._make_docs_file(docs_dir=caddy.docs_dir, fname='data100.data', fsize=100*1024*1024)
@ -65,6 +66,8 @@ class TestCaddy:
def test_08_01_download_1(self, env: Env, caddy: Caddy, repeat, proto):
if proto == 'h3' and not env.have_h3_curl():
pytest.skip("h3 not supported in curl")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 itself crashes")
curl = CurlClient(env=env)
url = f'https://{env.domain1}:{caddy.port}/data.json'
r = curl.http_download(urls=[url], alpn_proto=proto)
@ -77,6 +80,8 @@ class TestCaddy:
repeat, proto):
if proto == 'h3' and not env.have_h3_curl():
pytest.skip("h3 not supported in curl")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 itself crashes")
count = 50
curl = CurlClient(env=env)
urln = f'https://{env.domain1}:{caddy.port}/data1.data?[0-{count-1}]'
@ -92,7 +97,9 @@ class TestCaddy:
repeat, proto):
if proto == 'h3' and not env.have_h3_curl():
pytest.skip("h3 not supported in curl")
count = 50
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 itself crashes")
count = 20
curl = CurlClient(env=env)
urln = f'https://{env.domain1}:{caddy.port}/data1.data?[0-{count-1}]'
r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[
@ -106,14 +113,31 @@ class TestCaddy:
else:
assert r.total_connects == 1
# download 10MB files sequentially
# download 5MB files sequentially
@pytest.mark.parametrize("proto", ['h2', 'h3'])
def test_08_04_download_10mb_sequential(self, env: Env, caddy: Caddy,
def test_08_04a_download_10mb_sequential(self, env: Env, caddy: Caddy,
repeat, proto):
if proto == 'h3' and not env.have_h3_curl():
pytest.skip("h3 not supported in curl")
if proto == 'h3' and env.curl_uses_lib('quiche'):
pytest.skip("quiche stalls after a certain amount of data")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 itself crashes")
count = 40
curl = CurlClient(env=env)
urln = f'https://{env.domain1}:{caddy.port}/data5.data?[0-{count-1}]'
r = curl.http_download(urls=[urln], alpn_proto=proto)
assert r.exit_code == 0
r.check_stats(count=count, exp_status=200)
# sequential transfers will open 1 connection
assert r.total_connects == 1
# download 10MB files sequentially
@pytest.mark.parametrize("proto", ['h2', 'h3'])
def test_08_04b_download_10mb_sequential(self, env: Env, caddy: Caddy,
repeat, proto):
if proto == 'h3' and not env.have_h3_curl():
pytest.skip("h3 not supported in curl")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 itself crashes")
count = 20
curl = CurlClient(env=env)
urln = f'https://{env.domain1}:{caddy.port}/data10.data?[0-{count-1}]'
@ -129,8 +153,8 @@ class TestCaddy:
repeat, proto):
if proto == 'h3' and not env.have_h3_curl():
pytest.skip("h3 not supported in curl")
if proto == 'h3' and env.curl_uses_lib('quiche'):
pytest.skip("quiche stalls after a certain amount of data")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 itself crashes")
count = 50
curl = CurlClient(env=env)
urln = f'https://{env.domain1}:{caddy.port}/data10.data?[0-{count-1}]'

Просмотреть файл

@ -243,4 +243,5 @@ UNITTEST_START
check_bufq(8, 8000, 10, 1234, 1234, BUFQ_OPT_NONE);
check_bufq(8, 1024, 4, 129, 127, BUFQ_OPT_NO_SPARES);
return 0;
UNITTEST_STOP