Improved producer queue wakeups

Magnus Edenhill 2022-04-05 12:32:47 +02:00
Parent 940527af36
Commit 73d9a63037
7 changed files with 395 additions and 79 deletions

View file

@@ -28,6 +28,8 @@ librdkafka v1.9.0 is a feature release:
## Enhancements
* Improved producer queue scheduling. Fixes the performance regression
introduced in v1.7.0 for some produce patterns. (#3538, #2912)
* Windows: Added native Win32 IO/Queue scheduling. This removes the
internal TCP loopback connections that were previously used for timely
queue wakeups.
@@ -121,7 +123,10 @@ librdkafka v1.9.0 is a feature release:
broker (added in Apache Kafka 2.8), which could cause the producer to
seemingly hang.
This error code is now correctly handled by raising a fatal error.
* The logic for enforcing that `message.timeout.ms` is greater than
* Improved producer queue wakeup scheduling. This should significantly
decrease the number of wakeups and thus syscalls for high message rate
producers. (#3538, #2912)
* The logic for enforcing that `message.timeout.ms` is greater than
an explicitly configured `linger.ms` was incorrect and instead of
erroring out early the lingering time was automatically adjusted to the
message timeout, ignoring the configured `linger.ms`.
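To make the `linger.ms` fix concrete, here is a minimal sketch (not part of this commit; the exact point at which the error is raised is assumed) of a configuration that the fixed logic should now reject early instead of adjusting silently:

    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    /* An explicitly configured linger.ms larger than message.timeout.ms
     * is invalid for a producer. */
    rd_kafka_conf_set(conf, "linger.ms", "1000", errstr, sizeof(errstr));
    rd_kafka_conf_set(conf, "message.timeout.ms", "100", errstr, sizeof(errstr));
    /* Before the fix: linger.ms was silently clamped down to the 100 ms
     * message timeout. After the fix: an error is raised early instead. */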

View file

@@ -3629,11 +3629,29 @@ rd_kafka_broker_outbufs_space(rd_kafka_broker_t *rkb) {
}
/**
* @brief Update \p *next_wakeup_ptr to \p maybe_next_wakeup if it is sooner.
*
* Both parameters are absolute timestamps.
* \p maybe_next_wakeup must not be 0.
*/
#define rd_kafka_set_next_wakeup(next_wakeup_ptr, maybe_next_wakeup) \
do { \
rd_ts_t *__n = (next_wakeup_ptr); \
rd_ts_t __m = (maybe_next_wakeup); \
rd_dassert(__m != 0); \
if (__m < *__n) \
*__n = __m; \
} while (0)
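For illustration (not part of this diff), a hypothetical caller that folds several candidate deadlines into the soonest one, mirroring the serve-loop usage below; scan_next is a made-up variable:

    rd_ts_t next_wakeup = now + rkb->rkb_rk->rk_conf.buffering_max_us;
    if (scan_next) /* hypothetical: next timeout found by a msgq scan */
            rd_kafka_set_next_wakeup(&next_wakeup, scan_next);
    if (rkm->rkm_u.producer.ts_backoff > now) /* pending retry backoff */
            rd_kafka_set_next_wakeup(&next_wakeup,
                                     rkm->rkm_u.producer.ts_backoff);
    /* next_wakeup now holds the earliest applicable deadline. */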
/**
* @brief Serve a toppar for producing.
*
* @param next_wakeup will be updated to when the next wake-up/attempt is
* desired, only lower (sooner) values will be set.
* desired. Does not take the current value into
* consideration, even if it is lower.
* @param do_timeout_scan perform msg timeout scan
* @param may_send if set to false there is something on the global level
* that prohibits sending messages, such as a transactional
@@ -3661,6 +3679,7 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
int reqcnt;
int inflight = 0;
uint64_t epoch_base_msgid = 0;
rd_bool_t batch_ready = rd_false;
/* By limiting the number of not-yet-sent buffers (rkb_outbufs) we
* provide a backpressure mechanism to the producer loop
@@ -3687,8 +3706,8 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
timeoutcnt =
rd_kafka_broker_toppar_msgq_scan(rkb, rktp, now, &next);
if (next && next < *next_wakeup)
*next_wakeup = next;
if (next)
rd_kafka_set_next_wakeup(next_wakeup, next);
if (rd_kafka_is_idempotent(rkb->rkb_rk)) {
if (!rd_kafka_pid_valid(pid)) {
@@ -3734,10 +3753,32 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
} else if (max_requests > 0) {
/* Move messages from locked partition produce queue
* to broker-local xmit queue. */
if ((move_cnt = rktp->rktp_msgq.rkmq_msg_cnt) > 0)
if ((move_cnt = rktp->rktp_msgq.rkmq_msg_cnt) > 0) {
rd_kafka_msgq_insert_msgq(
&rktp->rktp_xmit_msgq, &rktp->rktp_msgq,
rktp->rktp_rkt->rkt_conf.msg_order_cmp);
}
/* Calculate maximum wait-time to honour
* queue.buffering.max.ms contract.
* Unless flushing, in which case immediate
* wakeups are allowed. */
batch_ready = rd_kafka_msgq_allow_wakeup_at(
&rktp->rktp_msgq, &rktp->rktp_xmit_msgq,
/* Only update the broker thread wakeup time
* if connection is up and messages can actually be
* sent, otherwise the wakeup can't do much. */
rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP ? next_wakeup
: NULL,
now, flushing ? 1 : rkb->rkb_rk->rk_conf.buffering_max_us,
/* Batch message count threshold */
rkb->rkb_rk->rk_conf.batch_num_messages,
/* Batch size threshold.
* When compression is enabled the
* threshold is increased by x8. */
(rktp->rktp_rkt->rkt_conf.compression_codec ? 1 : 8) *
(int64_t)rkb->rkb_rk->rk_conf.batch_size);
}
rd_kafka_toppar_unlock(rktp);
@@ -3872,30 +3913,9 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
/* Attempt to fill the batch size, but limit our waiting
* to queue.buffering.max.ms, batch.num.messages, and batch.size. */
if (!flushing && r < rkb->rkb_rk->rk_conf.batch_num_messages &&
rktp->rktp_xmit_msgq.rkmq_msg_bytes <
(int64_t)rkb->rkb_rk->rk_conf.batch_size) {
rd_ts_t wait_max;
/* Calculate maximum wait-time to honour
* queue.buffering.max.ms contract. */
wait_max = rd_kafka_msg_enq_time(rkm) +
rkb->rkb_rk->rk_conf.buffering_max_us;
if (wait_max > now) {
/* Wait for more messages or queue.buffering.max.ms
* to expire. */
if (wait_max < *next_wakeup)
*next_wakeup = wait_max;
return 0;
}
}
/* Honour retry.backoff.ms. */
if (unlikely(rkm->rkm_u.producer.ts_backoff > now)) {
if (rkm->rkm_u.producer.ts_backoff < *next_wakeup)
*next_wakeup = rkm->rkm_u.producer.ts_backoff;
/* Wait for backoff to expire */
if (!batch_ready) {
/* Wait for more messages or queue.buffering.max.ms
* to expire. */
return 0;
}
@@ -3909,10 +3929,22 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
break;
}
/* If there are messages still in the queue, make the next
* wakeup immediate. */
if (rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) > 0)
*next_wakeup = now;
/* Update the allowed wake-up time based on remaining messages
* in the queue. */
if (cnt > 0) {
rd_kafka_toppar_lock(rktp);
batch_ready = rd_kafka_msgq_allow_wakeup_at(
&rktp->rktp_msgq, &rktp->rktp_xmit_msgq, next_wakeup, now,
flushing ? 1 : rkb->rkb_rk->rk_conf.buffering_max_us,
/* Batch message count threshold */
rkb->rkb_rk->rk_conf.batch_num_messages,
/* Batch size threshold.
* When compression is enabled the
* threshold is increased by x8. */
(rktp->rktp_rkt->rkt_conf.compression_codec ? 1 : 8) *
(int64_t)rkb->rkb_rk->rk_conf.batch_size);
rd_kafka_toppar_unlock(rktp);
}
return cnt;
}
@@ -3923,7 +3955,7 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
* @brief Produce from all toppars assigned to this broker.
*
* @param next_wakeup is updated if the next IO/ops timeout should be
* less than the input value.
* less than the input value (i.e., sooner).
*
* @returns the total number of messages produced.
*/
@@ -3972,8 +4004,7 @@ static int rd_kafka_broker_produce_toppars(rd_kafka_broker_t *rkb,
rkb, rktp, pid, now, &this_next_wakeup, do_timeout_scan,
may_send, flushing);
if (this_next_wakeup < ret_next_wakeup)
ret_next_wakeup = this_next_wakeup;
rd_kafka_set_next_wakeup(&ret_next_wakeup, this_next_wakeup);
} while ((rktp = CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars, rktp,
rktp_activelink)) !=

View file

@@ -776,7 +776,7 @@ int rd_kafka_produce_batch(rd_kafka_topic_t *app_rkt,
continue;
}
}
rd_kafka_toppar_enq_msg(rktp, rkm);
rd_kafka_toppar_enq_msg(rktp, rkm, now);
if (rd_kafka_is_transactional(rkt->rkt_rk)) {
/* Add partition to transaction */
@@ -796,7 +796,7 @@ int rd_kafka_produce_batch(rd_kafka_topic_t *app_rkt,
} else {
/* Single destination partition. */
rd_kafka_toppar_enq_msg(rktp, rkm);
rd_kafka_toppar_enq_msg(rktp, rkm, now);
}
rkmessages[i].err = RD_KAFKA_RESP_ERR_NO_ERROR;
@@ -1244,7 +1244,7 @@ int rd_kafka_msg_partitioner(rd_kafka_topic_t *rkt,
rkm->rkm_partition = partition;
/* Partition is available: enqueue msg on partition's queue */
rd_kafka_toppar_enq_msg(rktp_new, rkm);
rd_kafka_toppar_enq_msg(rktp_new, rkm, rd_clock());
if (do_lock)
rd_kafka_topic_rdunlock(rkt);
@@ -1667,6 +1667,155 @@ void rd_kafka_msgbatch_ready_produce(rd_kafka_msgbatch_t *rkmb) {
}
/**
* @brief Allow queue wakeups after \p abstime, or when the
* given \p batch_msg_cnt or \p batch_msg_bytes have been reached.
*
* @param rkmq Queue to monitor and set wakeup parameters on.
* @param dest_rkmq Destination queue used to meter current queue depths
* and oldest message. May be the same as \p rkmq but is
* typically the rktp_xmit_msgq.
* @param next_wakeup If non-NULL: update the caller's next scheduler wakeup
* according to the wakeup time calculated by this function.
* @param now The current time.
* @param linger_us The configured queue linger / batching time.
* @param batch_msg_cnt Queue threshold before signalling.
* @param batch_msg_bytes Queue threshold before signalling.
*
* @returns true if the wakeup conditions are already met and messages are ready
* to be sent, else false.
*
* @locks_required rd_kafka_toppar_lock()
*
*
* Producer queue and broker thread wake-up behaviour.
*
* There are contradicting requirements at play here:
* - Latency: queued messages must be batched and sent according to
* batch size and linger.ms configuration.
* - Wakeups: keep the number of thread wake-ups to a minimum to avoid
* high CPU utilization and context switching.
*
* The message queue (rd_kafka_msgq_t) has functionality for the writer (app)
* to wake up the reader (broker thread) when there's a new message added.
* This wakeup is done through a combination of cndvar signalling and IO
writes to make sure a thread wakeup is triggered regardless of whether
the broker thread is blocking on cnd_timedwait() or on IO poll.
* When the broker thread is woken up it will scan all the partitions it is
* the leader for to check if there are messages to be sent - all according
* to the configured batch size and linger.ms - and then decide its next
* wait time depending on the lowest remaining linger.ms setting of any
* partition with messages enqueued.
*
* This wait time must also be set as a threshold on the message queue, telling
* the writer (app) that it must not trigger a wakeup until the wait time
* has expired, or the batch sizes have been exceeded.
*
* The message queue wakeup time is per partition, while the broker thread
* wakeup time is the lowest of all its partitions' wakeup times.
*
* The per-partition wakeup constraints are calculated and set by
* rd_kafka_msgq_allow_wakeup_at() which is called from the broker thread's
* per-partition handler.
* This function is called each time there are changes to the broker-local
* partition transmit queue (rktp_xmit_msgq), such as:
* - messages are moved from the partition queue (rktp_msgq) to rktp_xmit_msgq
* - messages are moved to a ProduceRequest
* - messages are timed out from the rktp_xmit_msgq
* - the flushing state changed (rd_kafka_flush() is called or returned).
*
* If none of these things happen, the broker thread will simply read the
* last stored wakeup time for each partition and use that for calculating its
* minimum wait time.
*
*
* On the writer side, namely the application calling rd_kafka_produce(), the
following checks are performed to see if it may trigger a wakeup when
* it adds a new message to the partition queue:
* - the current time has reached the wakeup time (e.g., remaining linger.ms
* has expired), or
* - with the new message(s) being added, either the batch.size or
* batch.num.messages thresholds have been exceeded, or
* - the application is calling rd_kafka_flush(),
* - and no wakeup has been signalled yet. This is critical: since it may take
some time for the broker thread to do its work, we want to avoid
* flooding it with wakeups. So a wakeup is only sent once per
* wakeup period.
*/
rd_bool_t rd_kafka_msgq_allow_wakeup_at(rd_kafka_msgq_t *rkmq,
const rd_kafka_msgq_t *dest_rkmq,
rd_ts_t *next_wakeup,
rd_ts_t now,
rd_ts_t linger_us,
int32_t batch_msg_cnt,
int64_t batch_msg_bytes) {
int32_t msg_cnt = rd_kafka_msgq_len(dest_rkmq);
int64_t msg_bytes = rd_kafka_msgq_size(dest_rkmq);
if (RD_KAFKA_MSGQ_EMPTY(dest_rkmq)) {
rkmq->rkmq_wakeup.on_first = rd_true;
rkmq->rkmq_wakeup.abstime = now + linger_us;
/* Leave next_wakeup untouched since the queue is empty */
msg_cnt = 0;
msg_bytes = 0;
} else {
const rd_kafka_msg_t *rkm = rd_kafka_msgq_first(dest_rkmq);
rkmq->rkmq_wakeup.on_first = rd_false;
if (unlikely(rkm->rkm_u.producer.ts_backoff > now)) {
/* Honour retry.backoff.ms:
* wait for backoff to expire */
rkmq->rkmq_wakeup.abstime =
rkm->rkm_u.producer.ts_backoff;
} else {
/* Use message's produce() time + linger.ms */
rkmq->rkmq_wakeup.abstime =
rd_kafka_msg_enq_time(rkm) + linger_us;
if (rkmq->rkmq_wakeup.abstime <= now)
rkmq->rkmq_wakeup.abstime = now;
}
/* Update the caller's scheduler wakeup time */
if (next_wakeup && rkmq->rkmq_wakeup.abstime < *next_wakeup)
*next_wakeup = rkmq->rkmq_wakeup.abstime;
msg_cnt = rd_kafka_msgq_len(dest_rkmq);
msg_bytes = rd_kafka_msgq_size(dest_rkmq);
}
/*
* If there are more messages or bytes in queue than the batch limits,
* or the linger time has been exceeded,
* then there is no need for wakeup since the broker thread will
* produce those messages as quickly as it can.
*/
if (msg_cnt >= batch_msg_cnt || msg_bytes >= batch_msg_bytes ||
(msg_cnt > 0 && now >= rkmq->rkmq_wakeup.abstime)) {
/* Prevent further signalling */
rkmq->rkmq_wakeup.signalled = rd_true;
/* Batch is ready */
return rd_true;
}
/* If the current msg or byte count is less than the batch limits,
* store the remaining count and size needed to reach them as the
* queue's wakeup thresholds.
* This is for the case where the producer is waiting for more
* messages to accumulate into a batch. The wakeup should only
* occur once a threshold is reached or the abstime has expired.
*/
rkmq->rkmq_wakeup.signalled = rd_false;
rkmq->rkmq_wakeup.msg_cnt = batch_msg_cnt - msg_cnt;
rkmq->rkmq_wakeup.msg_bytes = batch_msg_bytes - msg_bytes;
return rd_false;
}
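Condensed into one view, a sketch of how the two threads cooperate through the stored constraints (simplified from the call sites elsewhere in this diff, not verbatim; the compression factor and the refcounting of the wakeup queue are omitted):

    /* Broker thread (reader), once per serve cycle: recompute this
     * partition's wakeup constraints and fold them into its own next
     * sleep deadline. rk is the rd_kafka_t instance (rkb->rkb_rk at
     * the real call site). */
    rd_ts_t now         = rd_clock();
    rd_ts_t next_wakeup = now + rk->rk_conf.buffering_max_us;
    rd_kafka_toppar_lock(rktp);
    batch_ready = rd_kafka_msgq_allow_wakeup_at(
            &rktp->rktp_msgq, &rktp->rktp_xmit_msgq, &next_wakeup, now,
            rk->rk_conf.buffering_max_us, rk->rk_conf.batch_num_messages,
            (int64_t)rk->rk_conf.batch_size);
    rd_kafka_toppar_unlock(rktp);

    /* Application thread (writer), on each produce(): only signal the
     * broker thread when the stored constraints allow it (see
     * rd_kafka_msgq_may_wakeup() and rd_kafka_toppar_enq_msg() below). */
    if (rd_kafka_msgq_may_wakeup(&rktp->rktp_msgq, rd_clock()))
            rd_kafka_q_yield(rktp->rktp_msgq_wakeup_q);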
/**
* @brief Verify order (by msgid) in message queue.
* For development use only.

View file

@@ -194,6 +194,16 @@ typedef struct rd_kafka_msgq_s {
struct rd_kafka_msgs_head_s rkmq_msgs; /* TAILQ_HEAD */
int32_t rkmq_msg_cnt;
int64_t rkmq_msg_bytes;
struct {
rd_ts_t abstime; /**< Allow wake-ups after this point in time.*/
int32_t msg_cnt; /**< Signal wake-up when this message count
* is reached. */
int64_t msg_bytes; /**< .. or when this byte count is
* reached. */
rd_bool_t on_first; /**< Wake-up on first message enqueued
* regardless of .abstime. */
rd_bool_t signalled; /**< Wake-up (already) signalled. */
} rkmq_wakeup;
} rd_kafka_msgq_t;
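A worked example with illustrative numbers: with batch.num.messages=10000, batch.size=1000000 and a transmit queue already holding 400 messages / 250000 bytes, rd_kafka_msgq_allow_wakeup_at() stores msg_cnt = 10000 - 400 = 9600 and msg_bytes = 1000000 - 250000 = 750000 in rkmq_wakeup, so the producing application only signals the broker thread once the partition queue itself reaches one of those remainders, or once abstime expires.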
#define RD_KAFKA_MSGQ_INITIALIZER(rkmq) \
@@ -383,6 +393,43 @@ rd_kafka_msgq_first_msgid(const rd_kafka_msgq_t *rkmq) {
}
rd_bool_t rd_kafka_msgq_allow_wakeup_at(rd_kafka_msgq_t *rkmq,
const rd_kafka_msgq_t *dest_rkmq,
rd_ts_t *next_wakeup,
rd_ts_t now,
rd_ts_t linger_us,
int32_t batch_msg_cnt,
int64_t batch_msg_bytes);
/**
* @returns true if msgq may be awoken.
*/
static RD_INLINE RD_UNUSED rd_bool_t
rd_kafka_msgq_may_wakeup(const rd_kafka_msgq_t *rkmq, rd_ts_t now) {
/* No: Wakeup already signalled */
if (rkmq->rkmq_wakeup.signalled)
return rd_false;
/* Yes: Wakeup linger time has expired */
if (now >= rkmq->rkmq_wakeup.abstime)
return rd_true;
/* Yes: First message enqueued may trigger wakeup */
if (rkmq->rkmq_msg_cnt == 1 && rkmq->rkmq_wakeup.on_first)
return rd_true;
/* Yes: batch.size or batch.num.messages exceeded */
if (rkmq->rkmq_msg_cnt >= rkmq->rkmq_wakeup.msg_cnt ||
rkmq->rkmq_msg_bytes > rkmq->rkmq_wakeup.msg_bytes)
return rd_true;
/* No */
return rd_false;
}
/**
* @brief Message ordering comparator using the message id
* number to order messages in ascending order (FIFO).

View file

@@ -670,8 +670,9 @@ void rd_kafka_toppar_desired_del(rd_kafka_toppar_t *rktp) {
/**
* Append message at tail of 'rktp' message queue.
*/
void rd_kafka_toppar_enq_msg(rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
int queue_len;
void rd_kafka_toppar_enq_msg(rd_kafka_toppar_t *rktp,
rd_kafka_msg_t *rkm,
rd_ts_t now) {
rd_kafka_q_t *wakeup_q = NULL;
rd_kafka_toppar_lock(rktp);
@@ -683,18 +684,22 @@ void rd_kafka_toppar_enq_msg(rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
if (rktp->rktp_partition == RD_KAFKA_PARTITION_UA ||
rktp->rktp_rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO) {
/* No need for enq_sorted(), this is the oldest message. */
queue_len = rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
} else {
queue_len = rd_kafka_msgq_enq_sorted(rktp->rktp_rkt,
&rktp->rktp_msgq, rkm);
rd_kafka_msgq_enq_sorted(rktp->rktp_rkt, &rktp->rktp_msgq, rkm);
}
if (unlikely(queue_len == 1 && (wakeup_q = rktp->rktp_msgq_wakeup_q)))
if (unlikely(rktp->rktp_partition != RD_KAFKA_PARTITION_UA &&
rd_kafka_msgq_may_wakeup(&rktp->rktp_msgq, now) &&
(wakeup_q = rktp->rktp_msgq_wakeup_q))) {
/* Wake-up broker thread */
rktp->rktp_msgq.rkmq_wakeup.signalled = rd_true;
rd_kafka_q_keep(wakeup_q);
}
rd_kafka_toppar_unlock(rktp);
if (wakeup_q) {
if (unlikely(wakeup_q != NULL)) {
rd_kafka_q_yield(wakeup_q);
rd_kafka_q_destroy(wakeup_q);
}

View file

@@ -449,7 +449,9 @@ rd_kafka_toppar_t *rd_kafka_toppar_new0(rd_kafka_topic_t *rkt,
void rd_kafka_toppar_purge_and_disable_queues(rd_kafka_toppar_t *rktp);
void rd_kafka_toppar_set_fetch_state(rd_kafka_toppar_t *rktp, int fetch_state);
void rd_kafka_toppar_insert_msg(rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
void rd_kafka_toppar_enq_msg(rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
void rd_kafka_toppar_enq_msg(rd_kafka_toppar_t *rktp,
rd_kafka_msg_t *rkm,
rd_ts_t now);
int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq,
rd_kafka_msgq_t *srcq,
int incr_retry,

View file

@@ -43,11 +43,14 @@ struct latconf {
char linger_ms_conf[32]; /**< Read back to show actual value */
/* Result vector */
rd_bool_t passed;
float latency[_MSG_COUNT];
float sum;
int cnt;
int wakeups;
};
static int tot_wakeups = 0;
static void
dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
@@ -76,6 +79,46 @@ dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
}
/**
* @brief A stats callback to get the per-broker wakeup counts.
*
* The JSON "parsing" here is crude...
*/
static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
const char *t = json;
int cnt = 0;
int total = 0;
/* Since we're only producing to one partition there will only be
* one broker, the leader, whose wakeup counts we're interested in, but
* we also want to know that other broker threads aren't spinning
* like crazy. So just summarize all the wakeups from all brokers. */
while ((t = strstr(t, "\"wakeups\":"))) {
int wakeups;
const char *next;
t += strlen("\"wakeups\":");
while (isspace((int)*t))
t++;
wakeups = strtol(t, (char **)&next, 0);
TEST_ASSERT(t != next, "No wakeup number found at \"%.*s...\"",
16, t);
total += wakeups;
cnt++;
t = next;
}
TEST_ASSERT(cnt > 0, "No brokers found in stats");
tot_wakeups = total;
return 0;
}
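For reference, the per-broker statistics payload the callback scans looks roughly like this (abbreviated; field placement per librdkafka's STATISTICS.md, values illustrative):

    /* "brokers": {
     *   "localhost:9092/1": { ..., "wakeups": 27, ... },
     *   "localhost:9093/2": { ..., "wakeups": 3,  ... }
     * }
     * stats_cb() sums every "wakeups" field into tot_wakeups. */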
static int verify_latency(struct latconf *latconf) {
float avg;
int fails = 0;
@@ -86,8 +129,11 @@ static int verify_latency(struct latconf *latconf) {
avg = latconf->sum / (float)latconf->cnt;
TEST_SAY("%s: average latency %.3fms, allowed range %d..%d +%.0fms\n",
latconf->name, avg, latconf->min, latconf->max, ext_overhead);
TEST_SAY(
"%s: average latency %.3fms, allowed range %d..%d +%.0fms, "
"%d wakeups\n",
latconf->name, avg, latconf->min, latconf->max, ext_overhead,
tot_wakeups);
if (avg < (float)latconf->min ||
avg > (float)latconf->max + ext_overhead) {
@@ -99,6 +145,16 @@ static int verify_latency(struct latconf *latconf) {
fails++;
}
latconf->wakeups = tot_wakeups;
if (latconf->wakeups < 10 || latconf->wakeups > 1000) {
TEST_FAIL_LATER(
"%s: broker wakeups out of range: %d, "
"expected 10..1000",
latconf->name, latconf->wakeups);
fails++;
}
return fails;
}
@@ -116,19 +172,24 @@ static void measure_rtt(struct latconf *latconf, rd_kafka_t *rk) {
rd_kafka_metadata_destroy(md);
}
static int test_producer_latency(const char *topic, struct latconf *latconf) {
static void test_producer_latency(const char *topic, struct latconf *latconf) {
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
rd_kafka_resp_err_t err;
int i;
size_t sz;
SUB_TEST("%s", latconf->name);
test_conf_init(&conf, NULL, 60);
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
rd_kafka_conf_set_opaque(conf, latconf);
rd_kafka_conf_set_stats_cb(conf, stats_cb);
test_conf_set(conf, "statistics.interval.ms", "100");
tot_wakeups = 0;
TEST_SAY(_C_BLU "[%s: begin]\n" _C_CLR, latconf->name);
for (i = 0; latconf->conf[i]; i += 2) {
TEST_SAY("%s: set conf %s = %s\n", latconf->name,
latconf->conf[i], latconf->conf[i + 1]);
@@ -151,8 +212,12 @@ static int test_producer_latency(const char *topic, struct latconf *latconf) {
TEST_FAIL("%s: priming producev failed: %s", latconf->name,
rd_kafka_err2str(err));
/* Await delivery */
rd_kafka_flush(rk, tmout_multip(5000));
if (with_transactions) {
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
} else {
/* Await delivery */
rd_kafka_flush(rk, tmout_multip(5000));
}
/* Get a network+broker round-trip-time base time. */
measure_rtt(latconf, rk);
@@ -160,6 +225,8 @@ static int test_producer_latency(const char *topic, struct latconf *latconf) {
TEST_SAY("%s: producing %d messages\n", latconf->name, _MSG_COUNT);
for (i = 0; i < _MSG_COUNT; i++) {
int64_t *ts_send;
int pre_cnt = latconf->cnt;
ts_send = malloc(sizeof(*ts_send));
*ts_send = test_clock();
@@ -174,12 +241,24 @@ static int test_producer_latency(const char *topic, struct latconf *latconf) {
i, rd_kafka_err2str(err));
/* Await delivery */
rd_kafka_poll(rk, 5000);
while (latconf->cnt == pre_cnt)
rd_kafka_poll(rk, 5000);
}
while (tot_wakeups == 0)
rd_kafka_poll(rk, 100); /* Get final stats_cb */
rd_kafka_destroy(rk);
return verify_latency(latconf);
if (verify_latency(latconf))
return; /* verify_latency() has already
* called TEST_FAIL_LATER() */
latconf->passed = rd_true;
SUB_TEST_PASS();
}
@@ -206,33 +285,29 @@ static float find_max(const struct latconf *latconf) {
}
int main_0055_producer_latency(int argc, char **argv) {
const char *topic = test_mk_topic_name("0055_producer_latency", 1);
struct latconf latconfs[] = {
{"standard settings", {NULL}, 5, 5}, /* default is now 5ms */
{"low queue.buffering.max.ms",
{"queue.buffering.max.ms", "0", NULL},
0,
0},
{"microsecond queue.buffering.max.ms",
{"queue.buffering.max.ms", "0.001", NULL},
{"low linger.ms (0ms)", {"linger.ms", "0", NULL}, 0, 0},
{"microsecond linger.ms (0.001ms)",
{"linger.ms", "0.001", NULL},
0,
1},
{"high queue.buffering.max.ms",
{"queue.buffering.max.ms", "3000", NULL},
{"high linger.ms (3000ms)",
{"linger.ms", "3000", NULL},
3000,
3100},
{"queue.buffering.max.ms < 1000", /* internal block_max_ms */
{"queue.buffering.max.ms", "500", NULL},
{"linger.ms < 1000 (500ms)", /* internal block_max_ms */
{"linger.ms", "500", NULL},
500,
600},
{"no acks",
{"queue.buffering.max.ms", "0", "acks", "0", "enable.idempotence",
"false", NULL},
{"no acks (0ms)",
{"linger.ms", "0", "acks", "0", "enable.idempotence", "false",
NULL},
0,
0},
{NULL}};
struct latconf *latconf;
const char *topic = test_mk_topic_name("0055_producer_latency", 0);
int fails = 0;
if (test_on_ci) {
TEST_SKIP("Latency measurements not reliable on CI\n");
@@ -240,24 +315,26 @@ int main_0055_producer_latency(int argc, char **argv) {
}
/* Create topic without replicas to keep broker-side latency down */
test_create_topic(NULL, topic, 4, 1);
test_create_topic(NULL, topic, 1, 1);
for (latconf = latconfs; latconf->name; latconf++)
fails += test_producer_latency(topic, latconf);
if (fails)
TEST_FAIL("See %d previous failure(s)", fails);
test_producer_latency(topic, latconf);
TEST_SAY(_C_YEL "Latency tests summary:\n" _C_CLR);
TEST_SAY("%-40s %9s %6s..%-6s %7s %9s %9s %9s\n", "Name",
TEST_SAY("%-40s %9s %6s..%-6s %7s %9s %9s %9s %8s\n", "Name",
"linger.ms", "MinExp", "MaxExp", "RTT", "Min", "Average",
"Max");
"Max", "Wakeups");
for (latconf = latconfs; latconf->name; latconf++)
TEST_SAY("%-40s %9s %6d..%-6d %7g %9g %9g %9g\n",
TEST_SAY("%-40s %9s %6d..%-6d %7g %9g %9g %9g %8d%s\n",
latconf->name, latconf->linger_ms_conf, latconf->min,
latconf->max, latconf->rtt, find_min(latconf),
latconf->sum / latconf->cnt, find_max(latconf));
latconf->sum / latconf->cnt, find_max(latconf),
latconf->wakeups,
latconf->passed ? "" : _C_RED " FAILED");
TEST_LATER_CHECK("");
return 0;
}