Fix race-condition for rktp without assigned broker in purge()

This commit is contained in:
Magnus Edenhill 2021-03-22 15:30:43 +01:00
Родитель 66bed02625
Коммит fc097194d9
13 изменённых файлов: 129 добавлений и 47 удалений

Просмотреть файл

@ -52,6 +52,10 @@ librdkafka v1.7.0 is feature release:
* The timeout value of `flush()` was not respected when delivery reports
were scheduled as events (such as for confluent-kafka-go) rather than
callbacks.
* There was a race condition in `purge()` which could cause newly
created partition objects, or partitions that were changing leaders, to
not have their message queues purged. This could cause
`abort_transaction()` to time out. This issue is now fixed.
### Transactional Producer fixes

Просмотреть файл

@ -4235,6 +4235,47 @@ rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms) {
}
}
/**
 * @brief Purge the message queues (as selected by \p purge_flags) of
 *        every toppar in the instance.
 *
 * Needed to close the race window where a purge() is performed after an
 * rktp has been created but before it has been joined to a broker
 * handler thread.
 *
 * The rktp_xmit_msgq is not touched here; it is handled by the
 * broker-thread purge.
 *
 * @returns the total number of messages purged.
 *
 * @locks_required rd_kafka_*lock()
 * @locks_acquired rd_kafka_topic_rdlock()
 */
static int
rd_kafka_purge_toppars (rd_kafka_t *rk, int purge_flags) {
        rd_kafka_topic_t *rkt;
        int msg_cnt = 0;

        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                rd_kafka_toppar_t *rktp;
                int i;

                rd_kafka_topic_rdlock(rkt);

                /* Partitions in the topic's partition array. */
                for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
                        msg_cnt += rd_kafka_toppar_purge_queues(
                                rkt->rkt_p[i], purge_flags,
                                rd_false/*!xmit*/);

                /* Partitions on the rkt_desp list. */
                RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i)
                        msg_cnt += rd_kafka_toppar_purge_queues(
                                rktp, purge_flags, rd_false/*!xmit*/);

                /* The unassigned (UA) partition, if any. */
                if (rkt->rkt_ua)
                        msg_cnt += rd_kafka_toppar_purge_queues(
                                rkt->rkt_ua, purge_flags,
                                rd_false/*!xmit*/);

                rd_kafka_topic_rdunlock(rkt);
        }

        return msg_cnt;
}
rd_kafka_resp_err_t rd_kafka_purge (rd_kafka_t *rk, int purge_flags) {
@ -4258,21 +4299,19 @@ rd_kafka_resp_err_t rd_kafka_purge (rd_kafka_t *rk, int purge_flags) {
if (!(purge_flags & RD_KAFKA_PURGE_F_NON_BLOCKING))
tmpq = rd_kafka_q_new(rk);
/* Send purge request to all broker threads */
rd_kafka_rdlock(rk);
/* Purge msgq for all toppars. */
rd_kafka_purge_toppars(rk, purge_flags);
/* Send purge request to all broker threads */
TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
rd_kafka_broker_purge_queues(rkb, purge_flags,
RD_KAFKA_REPLYQ(tmpq, 0));
waitcnt++;
}
rd_kafka_rdunlock(rk);
/* The internal broker handler may hold unassigned partitions */
mtx_lock(&rk->rk_internal_rkb_lock);
rd_kafka_broker_purge_queues(rk->rk_internal_rkb, purge_flags,
RD_KAFKA_REPLYQ(tmpq, 0));
mtx_unlock(&rk->rk_internal_rkb_lock);
waitcnt++;
rd_kafka_rdunlock(rk);
if (tmpq) {

Просмотреть файл

@ -3643,6 +3643,7 @@ static int rd_kafka_toppar_producer_serve (rd_kafka_broker_t *rkb,
int max_requests;
int reqcnt;
int inflight = 0;
uint64_t epoch_base_msgid = 0;
/* By limiting the number of not-yet-sent buffers (rkb_outbufs) we
* provide a backpressure mechanism to the producer loop
@ -3834,6 +3835,13 @@ static int rd_kafka_toppar_producer_serve (rd_kafka_broker_t *rkb,
rktp, pid, rkm->rkm_u.producer.msgid))
return 0;
}
rd_kafka_toppar_lock(rktp);
/* Idempotent producer epoch base msgid, this is passed to the
* ProduceRequest and msgset writer to adjust the protocol-level
* per-message sequence number. */
epoch_base_msgid = rktp->rktp_eos.epoch_base_msgid;
rd_kafka_toppar_unlock(rktp);
}
if (unlikely(rkb->rkb_state != RD_KAFKA_BROKER_STATE_UP)) {
@ -3880,7 +3888,7 @@ static int rd_kafka_toppar_producer_serve (rd_kafka_broker_t *rkb,
/* Send Produce requests for this toppar, honouring the
* queue backpressure threshold. */
for (reqcnt = 0 ; reqcnt < max_requests ; reqcnt++) {
r = rd_kafka_ProduceRequest(rkb, rktp, pid);
r = rd_kafka_ProduceRequest(rkb, rktp, pid, epoch_base_msgid);
if (likely(r > 0))
cnt += r;
else
@ -6344,8 +6352,9 @@ static void rd_kafka_broker_handle_purge_queues (rd_kafka_broker_t *rkb,
TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
int r;
r = rd_kafka_toppar_handle_purge_queues(rktp, rkb,
purge_flags);
r = rd_kafka_toppar_purge_queues(
rktp, purge_flags,
rd_true/*include xmit msgq*/);
if (r > 0) {
msg_cnt += r;
part_cnt++;

Просмотреть файл

@ -1634,12 +1634,11 @@ void rd_kafka_msgbatch_destroy (rd_kafka_msgbatch_t *rkmb) {
/**
* @brief Initialize a message batch for the Idempotent Producer.
*
* @param rkm is the first message in the batch.
*/
void rd_kafka_msgbatch_init (rd_kafka_msgbatch_t *rkmb,
rd_kafka_toppar_t *rktp,
rd_kafka_pid_t pid) {
rd_kafka_pid_t pid,
uint64_t epoch_base_msgid) {
memset(rkmb, 0, sizeof(*rkmb));
rkmb->rktp = rd_kafka_toppar_keep(rktp);
@ -1648,12 +1647,15 @@ void rd_kafka_msgbatch_init (rd_kafka_msgbatch_t *rkmb,
rkmb->pid = pid;
rkmb->first_seq = -1;
rkmb->epoch_base_msgid = epoch_base_msgid;
}
/**
* @brief Set the first message in the batch. which is used to set
* the BaseSequence and keep track of batch reconstruction range.
*
* @param rkm is the first message in the batch.
*/
void rd_kafka_msgbatch_set_first_msg (rd_kafka_msgbatch_t *rkmb,
rd_kafka_msg_t *rkm) {
@ -1667,9 +1669,8 @@ void rd_kafka_msgbatch_set_first_msg (rd_kafka_msgbatch_t *rkmb,
/* Our msgid counter is 64-bits, but the
* Kafka protocol's sequence is only 31 (signed), so we'll
* need to handle wrapping. */
rkmb->first_seq =
rd_kafka_seq_wrap(rkm->rkm_u.producer.msgid -
rkmb->rktp->rktp_eos.epoch_base_msgid);
rkmb->first_seq = rd_kafka_seq_wrap(rkm->rkm_u.producer.msgid -
rkmb->epoch_base_msgid);
/* Check if there is a stored last message
* on the first msg, which means an entire

Просмотреть файл

@ -36,6 +36,8 @@ typedef struct rd_kafka_msgbatch_s {
rd_kafka_pid_t pid; /**< Producer Id and Epoch */
int32_t first_seq; /**< Base sequence */
int64_t first_msgid; /**< Base msgid */
uint64_t epoch_base_msgid; /**< The partition epoch's
* base msgid. */
uint64_t last_msgid; /**< Last message to add to batch.
* This is used when reconstructing
* batches for resends with
@ -51,7 +53,8 @@ typedef struct rd_kafka_msgbatch_s {
void rd_kafka_msgbatch_destroy (rd_kafka_msgbatch_t *rkmb);
void rd_kafka_msgbatch_init (rd_kafka_msgbatch_t *rkmb,
rd_kafka_toppar_t *rktp,
rd_kafka_pid_t pid);
rd_kafka_pid_t pid,
uint64_t epoch_base_msgid);
void rd_kafka_msgbatch_set_first_msg (rd_kafka_msgbatch_t *rkmb,
rd_kafka_msg_t *rkm);
void rd_kafka_msgbatch_ready_produce (rd_kafka_msgbatch_t *rkmb);

Просмотреть файл

@ -68,6 +68,7 @@ rd_kafka_msgset_create_ProduceRequest (rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp,
rd_kafka_msgq_t *rkmq,
const rd_kafka_pid_t pid,
uint64_t epoch_base_msgid,
size_t *MessageSetSizep);
/**

Просмотреть файл

@ -497,7 +497,8 @@ static int rd_kafka_msgset_writer_init (rd_kafka_msgset_writer_t *msetw,
rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp,
rd_kafka_msgq_t *rkmq,
rd_kafka_pid_t pid) {
rd_kafka_pid_t pid,
uint64_t epoch_base_msgid) {
int msgcnt = rd_kafka_msgq_len(rkmq);
if (msgcnt == 0)
@ -536,7 +537,7 @@ static int rd_kafka_msgset_writer_init (rd_kafka_msgset_writer_t *msetw,
rkbuf_buf);
rd_kafka_msgbatch_init(&msetw->msetw_rkbuf->rkbuf_u.Produce.batch,
rktp, pid);
rktp, pid, epoch_base_msgid);
msetw->msetw_batch = &msetw->msetw_rkbuf->rkbuf_u.Produce.batch;
return msetw->msetw_msgcntmax;
@ -1456,11 +1457,13 @@ rd_kafka_msgset_create_ProduceRequest (rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp,
rd_kafka_msgq_t *rkmq,
const rd_kafka_pid_t pid,
uint64_t epoch_base_msgid,
size_t *MessageSetSizep) {
rd_kafka_msgset_writer_t msetw;
if (rd_kafka_msgset_writer_init(&msetw, rkb, rktp, rkmq, pid) <= 0)
if (rd_kafka_msgset_writer_init(&msetw, rkb, rktp, rkmq,
pid, epoch_base_msgid) <= 0)
return NULL;
if (!rd_kafka_msgset_writer_write_msgq(&msetw, msetw.msetw_msgq)) {

Просмотреть файл

@ -958,7 +958,7 @@ void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp,
* Helper method for purging queues when removing a toppar.
* Locks: rd_kafka_toppar_lock() MUST be held
*/
void rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp) {
void rd_kafka_toppar_purge_and_disable_queues (rd_kafka_toppar_t *rktp) {
rd_kafka_q_disable(rktp->rktp_fetchq);
rd_kafka_q_purge(rktp->rktp_fetchq);
rd_kafka_q_disable(rktp->rktp_ops);
@ -4240,28 +4240,44 @@ int rd_kafka_toppar_pid_change (rd_kafka_toppar_t *rktp, rd_kafka_pid_t pid,
* Delivery reports will be enqueued for all purged messages, the error
* code is set to RD_KAFKA_RESP_ERR__PURGE_QUEUE.
*
* @warning Only to be used with the producer
* @param include_xmit_msgq If executing from the rktp's current broker handler
* thread, also include the xmit message queue.
*
* @warning Only to be used with the producer.
*
* @returns the number of messages purged
*
* @locality toppar handler thread
* @locks none
* @locality any thread.
* @locks_acquired rd_kafka_toppar_lock()
* @locks_required none
*/
int rd_kafka_toppar_handle_purge_queues (rd_kafka_toppar_t *rktp,
rd_kafka_broker_t *rkb,
int purge_flags) {
int rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp,
int purge_flags,
rd_bool_t include_xmit_msgq) {
rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq);
int cnt;
rd_assert(rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER);
rd_assert(thrd_is_current(rkb->rkb_thread));
rd_assert(rk->rk_type == RD_KAFKA_PRODUCER);
rd_kafka_dbg(rk, TOPIC, "PURGE",
"%s [%"PRId32"]: purging queues "
"(purge_flags 0x%x, %s xmit_msgq)",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
purge_flags,
include_xmit_msgq ? "include" : "exclude");
if (!(purge_flags & RD_KAFKA_PURGE_F_QUEUE))
return 0;
/* xmit_msgq is owned by the toppar handler thread (broker thread)
* and requires no locking. */
rd_kafka_msgq_concat(&rkmq, &rktp->rktp_xmit_msgq);
if (include_xmit_msgq) {
/* xmit_msgq is owned by the toppar handler thread
* (broker thread) and requires no locking. */
rd_assert(rktp->rktp_broker);
rd_assert(thrd_is_current(rktp->rktp_broker->rkb_thread));
rd_kafka_msgq_concat(&rkmq, &rktp->rktp_xmit_msgq);
}
rd_kafka_toppar_lock(rktp);
rd_kafka_msgq_concat(&rkmq, &rktp->rktp_msgq);
@ -4273,7 +4289,7 @@ int rd_kafka_toppar_handle_purge_queues (rd_kafka_toppar_t *rktp,
* will not be produced (retried) we need to adjust the
* idempotence epoch's base msgid to skip the messages. */
rktp->rktp_eos.epoch_base_msgid += cnt;
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
rd_kafka_dbg(rk,
TOPIC|RD_KAFKA_DBG_EOS, "ADVBASE",
"%.*s [%"PRId32"] "
"advancing epoch base msgid to %"PRIu64

Просмотреть файл

@ -156,8 +156,7 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
* requests will have
* a sequence number series
* starting at 0.
* Only accessed from
* toppar handler thread. */
* Protected by toppar_lock */
int32_t next_ack_seq; /**< Next expected ack sequence.
* Protected by toppar lock. */
int32_t next_err_seq; /**< Next expected error sequence.
@ -440,7 +439,7 @@ rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_topic_t *rkt,
const char *func, int line);
#define rd_kafka_toppar_new(rkt,partition) \
rd_kafka_toppar_new0(rkt, partition, __FUNCTION__, __LINE__)
void rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp);
void rd_kafka_toppar_purge_and_disable_queues (rd_kafka_toppar_t *rktp);
void rd_kafka_toppar_set_fetch_state (rd_kafka_toppar_t *rktp,
int fetch_state);
void rd_kafka_toppar_insert_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
@ -530,10 +529,9 @@ void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp,
int64_t query_offset, int backoff_ms);
rd_kafka_assignor_t *
rd_kafka_assignor_find (rd_kafka_t *rk, const char *protocol);
int rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp,
int purge_flags,
rd_bool_t include_xmit_msgq);
rd_kafka_broker_t *rd_kafka_toppar_broker (rd_kafka_toppar_t *rktp,
int proper_broker);

Просмотреть файл

@ -3437,7 +3437,8 @@ static void rd_kafka_handle_Produce (rd_kafka_t *rk,
* @locality broker thread
*/
int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp,
const rd_kafka_pid_t pid) {
const rd_kafka_pid_t pid,
uint64_t epoch_base_msgid) {
rd_kafka_buf_t *rkbuf;
rd_kafka_topic_t *rkt = rktp->rktp_rkt;
size_t MessageSetSize = 0;
@ -3452,7 +3453,8 @@ int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp,
*/
rkbuf = rd_kafka_msgset_create_ProduceRequest(rkb, rktp,
&rktp->rktp_xmit_msgq,
pid, &MessageSetSize);
pid, epoch_base_msgid,
&MessageSetSize);
if (unlikely(!rkbuf))
return 0;
@ -4613,7 +4615,8 @@ static int unittest_idempotent_producer (void) {
for (rcnt = 0 ; rcnt < remaining_batches ; rcnt++) {
size_t msize;
request[rcnt] = rd_kafka_msgset_create_ProduceRequest(
rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), &msize);
rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), 0,
&msize);
RD_UT_ASSERT(request[rcnt], "request #%d failed", rcnt);
}
@ -4705,7 +4708,8 @@ static int unittest_idempotent_producer (void) {
for (rcnt = 0 ; rcnt < remaining_batches ; rcnt++) {
size_t msize;
request[rcnt] = rd_kafka_msgset_create_ProduceRequest(
rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), &msize);
rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), 0,
&msize);
RD_UT_ASSERT(request[rcnt],
"Failed to create retry #%d (%d msgs in queue)",
rcnt, rd_kafka_msgq_len(&rkmq));

Просмотреть файл

@ -248,7 +248,8 @@ void rd_kafka_SaslAuthenticateRequest (rd_kafka_broker_t *rkb,
void *opaque);
int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp,
const rd_kafka_pid_t pid);
const rd_kafka_pid_t pid,
uint64_t epoch_base_msgid);
rd_kafka_resp_err_t
rd_kafka_CreateTopicsRequest (rd_kafka_broker_t *rkb,

Просмотреть файл

@ -1325,7 +1325,7 @@ void rd_kafka_topic_partitions_remove (rd_kafka_topic_t *rkt) {
RD_LIST_FOREACH(rktp, partitions, i) {
rd_kafka_toppar_lock(rktp);
rd_kafka_msgq_purge(rkt->rkt_rk, &rktp->rktp_msgq);
rd_kafka_toppar_purge_queues(rktp);
rd_kafka_toppar_purge_and_disable_queues(rktp);
rd_kafka_toppar_unlock(rktp);
rd_kafka_toppar_destroy(rktp);

Просмотреть файл

@ -735,6 +735,7 @@ static void do_test_txn_endtxn_errors (void) {
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
rd_kafka_error_t *error;
test_timing_t t_call;
TEST_SAY("Testing scenario #%d %s with %"PRIusz
" injected erorrs, expecting %s\n",
@ -801,12 +802,14 @@ static void do_test_txn_endtxn_errors (void) {
scenario[i].error_cnt,
scenario[i].errors);
TIMING_START(&t_call, "%s", commit_str);
if (commit)
error = rd_kafka_commit_transaction(
rk, tmout_multip(5000));
else
error = rd_kafka_abort_transaction(
rk, tmout_multip(5000));
TIMING_STOP(&t_call);
if (error)
TEST_SAY("Scenario #%d %s failed: %s: %s "