Broker thread: don't block on IO if there are ops available

This commit is contained in:
Magnus Edenhill 2022-04-05 12:38:15 +02:00
Родитель 73d9a63037
Коммит be4e096579
3 изменённых файлов: 36 добавлений и 0 удалений

Просмотреть файл

@ -52,6 +52,10 @@ librdkafka v1.9.0 is a feature release:
### General fixes ### General fixes
* Fix various 1 second delays due to internal broker threads blocking on IO
even though there are events to handle.
These delays could be seen randomly in any of the non produce/consume
request APIs, such as `commit_transaction()`, `list_groups()`, etc.
* Windows: some applications would crash with an error message like * Windows: some applications would crash with an error message like
`no OPENSSL_Applink()` written to the console if `ssl.keystore.location` `no OPENSSL_Applink()` written to the console if `ssl.keystore.location`
was configured. was configured.

Просмотреть файл

@ -3417,7 +3417,13 @@ rd_kafka_broker_ops_io_serve(rd_kafka_broker_t *rkb, rd_ts_t abs_timeout) {
* *
* The return value indicates if ops_serve() below should * The return value indicates if ops_serve() below should
* use a timeout or not. * use a timeout or not.
*
* If there are ops enqueued cut the timeout short so
* that they're processed as soon as possible.
*/ */
if (abs_timeout > 0 && rd_kafka_q_len(rkb->rkb_ops) > 0)
abs_timeout = RD_POLL_NOWAIT;
if (rd_kafka_transport_io_serve( if (rd_kafka_transport_io_serve(
rkb->rkb_transport, rkb->rkb_ops, rkb->rkb_transport, rkb->rkb_ops,
rd_timeout_remains(abs_timeout))) rd_timeout_remains(abs_timeout)))

Просмотреть файл

@ -180,6 +180,8 @@ static void test_producer_latency(const char *topic, struct latconf *latconf) {
rd_kafka_resp_err_t err; rd_kafka_resp_err_t err;
int i; int i;
size_t sz; size_t sz;
rd_bool_t with_transactions = rd_false;
SUB_TEST("%s (linger.ms=%d)", latconf->name); SUB_TEST("%s (linger.ms=%d)", latconf->name);
test_conf_init(&conf, NULL, 60); test_conf_init(&conf, NULL, 60);
@ -194,6 +196,8 @@ static void test_producer_latency(const char *topic, struct latconf *latconf) {
TEST_SAY("%s: set conf %s = %s\n", latconf->name, TEST_SAY("%s: set conf %s = %s\n", latconf->name,
latconf->conf[i], latconf->conf[i + 1]); latconf->conf[i], latconf->conf[i + 1]);
test_conf_set(conf, latconf->conf[i], latconf->conf[i + 1]); test_conf_set(conf, latconf->conf[i], latconf->conf[i + 1]);
if (!strcmp(latconf->conf[i], "transactional.id"))
with_transactions = rd_true;
} }
sz = sizeof(latconf->linger_ms_conf); sz = sizeof(latconf->linger_ms_conf);
@ -201,6 +205,11 @@ static void test_producer_latency(const char *topic, struct latconf *latconf) {
rk = test_create_handle(RD_KAFKA_PRODUCER, conf); rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
if (with_transactions) {
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 10 * 1000));
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
}
TEST_SAY("%s: priming producer\n", latconf->name); TEST_SAY("%s: priming producer\n", latconf->name);
/* Send a priming message to make sure everything is up /* Send a priming message to make sure everything is up
* and functional before starting measurements */ * and functional before starting measurements */
@ -227,6 +236,8 @@ static void test_producer_latency(const char *topic, struct latconf *latconf) {
int64_t *ts_send; int64_t *ts_send;
int pre_cnt = latconf->cnt; int pre_cnt = latconf->cnt;
if (with_transactions)
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
ts_send = malloc(sizeof(*ts_send)); ts_send = malloc(sizeof(*ts_send));
*ts_send = test_clock(); *ts_send = test_clock();
@ -244,6 +255,13 @@ static void test_producer_latency(const char *topic, struct latconf *latconf) {
while (latconf->cnt == pre_cnt) while (latconf->cnt == pre_cnt)
rd_kafka_poll(rk, 5000); rd_kafka_poll(rk, 5000);
if (with_transactions) {
test_timing_t timing;
TIMING_START(&timing, "commit_transaction");
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
TIMING_ASSERT_LATER(&timing, 0,
(int)(latconf->rtt + 50.0));
}
} }
while (tot_wakeups == 0) while (tot_wakeups == 0)
@ -306,6 +324,14 @@ int main_0055_producer_latency(int argc, char **argv) {
NULL}, NULL},
0, 0,
0}, 0},
{"idempotence (10ms)",
{"linger.ms", "10", "enable.idempotence", "true", NULL},
10,
10},
{"transactions (35ms)",
{"linger.ms", "35", "transactional.id", topic, NULL},
35,
50 + 35 /* extra time for AddPartitions..*/},
{NULL}}; {NULL}};
struct latconf *latconf; struct latconf *latconf;