diff --git a/CHANGELOG.md b/CHANGELOG.md index 759d49b1..c9662399 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,10 @@ librdkafka v1.9.0 is a feature release: ### General fixes + * Fix various 1 second delays due to internal broker threads blocking on IO + even though there are events to handle. + These delays could be seen randomly in any of the non produce/consume + request APIs, such as `commit_transaction()`, `list_groups()`, etc. * Windows: some applications would crash with an error message like `no OPENSSL_Applink()` written to the console if `ssl.keystore.location` was configured. diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index c8335e8a..f24f6d95 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -3417,7 +3417,13 @@ rd_kafka_broker_ops_io_serve(rd_kafka_broker_t *rkb, rd_ts_t abs_timeout) { * * The return value indicates if ops_serve() below should * use a timeout or not. + * + * If there are ops enqueued cut the timeout short so + * that they're processed as soon as possible. */ + if (abs_timeout > 0 && rd_kafka_q_len(rkb->rkb_ops) > 0) + abs_timeout = RD_POLL_NOWAIT; + if (rd_kafka_transport_io_serve( rkb->rkb_transport, rkb->rkb_ops, rd_timeout_remains(abs_timeout))) diff --git a/tests/0055-producer_latency.c b/tests/0055-producer_latency.c index 5a850ff2..e0244cec 100644 --- a/tests/0055-producer_latency.c +++ b/tests/0055-producer_latency.c @@ -180,6 +180,8 @@ static void test_producer_latency(const char *topic, struct latconf *latconf) { rd_kafka_resp_err_t err; int i; size_t sz; + rd_bool_t with_transactions = rd_false; + SUB_TEST("%s (linger.ms=%d)", latconf->name); test_conf_init(&conf, NULL, 60); @@ -194,6 +196,8 @@ static void test_producer_latency(const char *topic, struct latconf *latconf) { TEST_SAY("%s: set conf %s = %s\n", latconf->name, latconf->conf[i], latconf->conf[i + 1]); test_conf_set(conf, latconf->conf[i], latconf->conf[i + 1]); + if (!strcmp(latconf->conf[i], "transactional.id")) + with_transactions = rd_true; } sz = sizeof(latconf->linger_ms_conf); @@ -201,6 +205,11 @@ static void test_producer_latency(const char *topic, struct latconf *latconf) { rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + if (with_transactions) { + TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 10 * 1000)); + TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk)); + } + TEST_SAY("%s: priming producer\n", latconf->name); /* Send a priming message to make sure everything is up * and functional before starting measurements */ @@ -227,6 +236,8 @@ static void test_producer_latency(const char *topic, struct latconf *latconf) { int64_t *ts_send; int pre_cnt = latconf->cnt; + if (with_transactions) + TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk)); ts_send = malloc(sizeof(*ts_send)); *ts_send = test_clock(); @@ -244,6 +255,13 @@ static void test_producer_latency(const char *topic, struct latconf *latconf) { while (latconf->cnt == pre_cnt) rd_kafka_poll(rk, 5000); + if (with_transactions) { + test_timing_t timing; + TIMING_START(&timing, "commit_transaction"); + TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1)); + TIMING_ASSERT_LATER(&timing, 0, + (int)(latconf->rtt + 50.0)); + } } while (tot_wakeups == 0) @@ -306,6 +324,14 @@ int main_0055_producer_latency(int argc, char **argv) { NULL}, 0, 0}, + {"idempotence (10ms)", + {"linger.ms", "10", "enable.idempotence", "true", NULL}, + 10, + 10}, + {"transactions (35ms)", + {"linger.ms", "35", "transactional.id", topic, NULL}, + 35, + 50 + 35 /* extra time for AddPartitions..*/}, {NULL}}; struct latconf *latconf;