Attempt to fill the batch queue before sending messagesets
This increases throughput under heavy load. Controlled through properties: queue.buffering.max.ms batch.num.messages
This commit is contained in:
Родитель
a28689cff3
Коммит
12701b630a
|
@ -1941,6 +1941,24 @@ static void rd_kafka_broker_serve (rd_kafka_broker_t *rkb) {
|
|||
&timedout,
|
||||
now);
|
||||
|
||||
if (rktp->rktp_xmit_msgq.rkmq_msg_cnt == 0)
|
||||
continue;
|
||||
|
||||
/* Attempt to fill the batch size, but limit
|
||||
* our waiting to queue.buffering.max.ms
|
||||
* and batch.num.messages. */
|
||||
if (rktp->rktp_ts_last_xmit +
|
||||
(rkb->rkb_rk->rk_conf.producer.
|
||||
buffering_max_ms * 1000) > now &&
|
||||
rktp->rktp_xmit_msgq.rkmq_msg_cnt <
|
||||
rkb->rkb_rk->rk_conf.producer.
|
||||
batch_num_messages) {
|
||||
/* Wait for more messages */
|
||||
continue;
|
||||
}
|
||||
|
||||
rktp->rktp_ts_last_xmit = now;
|
||||
|
||||
while (rktp->rktp_xmit_msgq.rkmq_msg_cnt > 0) {
|
||||
int r;
|
||||
r = rd_kafka_broker_send_toppar(rkb,
|
||||
|
|
|
@ -306,6 +306,8 @@ typedef struct rd_kafka_toppar_s {
|
|||
rd_kafka_msgq_t rktp_msgq; /* application->rdkafka queue.
|
||||
* protected by rktp_lock */
|
||||
rd_kafka_msgq_t rktp_xmit_msgq; /* internal broker xmit queue */
|
||||
|
||||
rd_ts_t rktp_ts_last_xmit;
|
||||
|
||||
struct {
|
||||
uint64_t tx_msgs;
|
||||
|
|
Загрузка…
Ссылка в новой задаче