Transactional producer: Fix possible message loss on OUT_OF_ORDER_SEQ error (#3575)

This commit is contained in:
Magnus Edenhill 2021-10-08 14:55:11 +02:00
Родитель 09916a752e
Коммит 2d78e928d8
6 изменённых файлов: 171 добавлений и 11 удалений

Просмотреть файл

@ -24,15 +24,30 @@ librdkafka v1.8.2 is a maintenance release.
* It was not possible to configure `ssl.ca.location` on OSX, the property
would automatically revert back to `probe` (default value).
This regression was introduced in v1.8.0. (#3566)
* librdkafka's internal timers would not start if the timeout was set to 0,
which would result in some timeout operations not being enforced correctly,
e.g., the transactional producer API timeouts.
These timers are now started with a timeout of 1 microsecond.
### Transactional producer fixes
* Upon quick repeated leader changes the transactional producer could receive
an `OUT_OF_ORDER_SEQUENCE` error from the broker, which triggered an
Epoch bump on the producer resulting in an InitProducerIdRequest being sent
to the transaction coordinator in the middle of a transaction.
This request would start a new transaction on the coordinator, but the
producer would still think (erroneously) it was in current transaction.
Any messages produced in the current transaction prior to this event would
be silently lost when the application committed the transaction, leading
to message loss.
This has been fixed by setting the Abortable transaction error state
in the producer. #3575.
* The transactional producer could stall during a transaction if the transaction
coordinator changed while adding offsets to the transaction (send_offsets_to_transaction()).
This stall lasted until the coordinator connection went down, the
transaction timed out, transaction was aborted, or messages were produced
to a new partition, whichever came first. #3571.
* librdkafka's internal timers would not start if the timeout was set to 0,
which would result in some timeout operations not being enforced correctly,
e.g., the transactional producer API timeouts.
These timers are now started with a timeout of 1 microsecond.
*Note: there was no v1.8.1 librdkafka release*

Просмотреть файл

@ -3759,6 +3759,7 @@ static int rd_kafka_toppar_producer_serve (rd_kafka_broker_t *rkb,
rd_kafka_idemp_drain_epoch_bump(
rkb->rkb_rk,
RD_KAFKA_RESP_ERR__TIMED_OUT,
"%d message(s) timed out "
"on %s [%"PRId32"]",
timeoutcnt,

Просмотреть файл

@ -613,7 +613,8 @@ void rd_kafka_idemp_drain_reset (rd_kafka_t *rk, const char *reason) {
* @locality any
* @locks none
*/
void rd_kafka_idemp_drain_epoch_bump (rd_kafka_t *rk, const char *fmt, ...) {
void rd_kafka_idemp_drain_epoch_bump (rd_kafka_t *rk, rd_kafka_resp_err_t err,
const char *fmt, ...) {
va_list ap;
char buf[256];
@ -630,6 +631,11 @@ void rd_kafka_idemp_drain_epoch_bump (rd_kafka_t *rk, const char *fmt, ...) {
rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_DRAIN_BUMP);
rd_kafka_wrunlock(rk);
/* Transactions: bumping the epoch requires the current transaction
* to be aborted. */
if (rd_kafka_is_transactional(rk))
rd_kafka_txn_set_abortable_error_with_bump(rk, err, "%s", buf);
/* Check right away if the drain could be done. */
rd_kafka_idemp_check_drain_done(rk);
}

Просмотреть файл

@ -74,8 +74,9 @@ void rd_kafka_idemp_pid_update (rd_kafka_broker_t *rkb,
const rd_kafka_pid_t pid);
void rd_kafka_idemp_pid_fsm (rd_kafka_t *rk);
void rd_kafka_idemp_drain_reset (rd_kafka_t *rk, const char *reason);
void rd_kafka_idemp_drain_epoch_bump (rd_kafka_t *rk, const char *fmt, ...)
RD_FORMAT(printf, 2, 3);
void rd_kafka_idemp_drain_epoch_bump (rd_kafka_t *rk, rd_kafka_resp_err_t err,
const char *fmt, ...)
RD_FORMAT(printf, 3, 4);
void rd_kafka_idemp_drain_toppar (rd_kafka_toppar_t *rktp, const char *reason);
void rd_kafka_idemp_inflight_toppar_sub (rd_kafka_t *rk,
rd_kafka_toppar_t *rktp);

Просмотреть файл

@ -2660,7 +2660,7 @@ rd_kafka_handle_idempotent_Produce_error (rd_kafka_broker_t *rkb,
perr->update_next_err = rd_true;
rd_kafka_idemp_drain_epoch_bump(
rk, "skipped sequence numbers");
rk, perr->err, "skipped sequence numbers");
} else {
/* Request's sequence is less than next ack,
@ -2763,7 +2763,7 @@ rd_kafka_handle_idempotent_Produce_error (rd_kafka_broker_t *rkb,
firstmsg->rkm_u.producer.retries);
/* Drain outstanding requests and bump epoch. */
rd_kafka_idemp_drain_epoch_bump(rk,
rd_kafka_idemp_drain_epoch_bump(rk, perr->err,
"unknown producer id");
rd_kafka_txn_set_abortable_error_with_bump(
@ -2800,7 +2800,7 @@ rd_kafka_handle_idempotent_Produce_error (rd_kafka_broker_t *rkb,
firstmsg->rkm_u.producer.retries);
/* Drain outstanding requests and bump epoch. */
rd_kafka_idemp_drain_epoch_bump(rk,
rd_kafka_idemp_drain_epoch_bump(rk, perr->err,
"unknown producer id");
perr->incr_retry = 0;
@ -3169,7 +3169,7 @@ static int rd_kafka_handle_Produce_error (rd_kafka_broker_t *rkb,
/* Drain outstanding requests and bump the epoch .*/
rd_kafka_idemp_drain_epoch_bump(
rk, "message sequence gap");
rk, perr->err, "message sequence gap");
}
perr->update_next_ack = rd_false;

Просмотреть файл

@ -2618,6 +2618,141 @@ static void do_test_commit_after_msg_timeout (void) {
SUB_TEST_PASS();
}
/**
* @brief #3575: Verify that OUT_OF_ORDER_SEQ does not trigger an epoch bump
* during an ongoing transaction.
* The transaction should instead enter the abortable state.
*/
static void do_test_out_of_order_seq (void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
int32_t txn_coord = 1, leader = 2;
const char *txnid = "myTxnId";
test_timing_t timing;
rd_kafka_resp_err_t err;
SUB_TEST_QUICK();
rk = create_txn_producer(&mcluster, txnid, 3,
"batch.num.messages", "1",
NULL);
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
txn_coord);
rd_kafka_mock_partition_set_leader(mcluster, "mytopic", 0, leader);
test_curr->ignore_dr_err = rd_true;
test_curr->is_fatal_cb = NULL;
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
/*
* Start a transaction
*/
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/* Produce one seeding message first to get the leader up and running */
TEST_CALL_ERR__(rd_kafka_producev(rk,
RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END));
test_flush(rk, -1);
/* Let partition leader have a latency of 2 seconds
* so that we can have multiple messages in-flight. */
rd_kafka_mock_broker_set_rtt(mcluster, leader, 2*1000);
/* Produce a message, let it fail with with different errors,
* ending with OUT_OF_ORDER which previously triggered an
* Epoch bump. */
rd_kafka_mock_push_request_errors(
mcluster,
RD_KAFKAP_Produce,
3,
RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER);
/* Produce three messages that will be delayed
* and have errors injected.*/
TEST_CALL_ERR__(rd_kafka_producev(rk,
RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END));
TEST_CALL_ERR__(rd_kafka_producev(rk,
RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END));
TEST_CALL_ERR__(rd_kafka_producev(rk,
RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END));
/* Now sleep a short while so that the messages are processed
* by the broker and errors are returned. */
TEST_SAY("Sleeping..\n");
rd_sleep(5);
rd_kafka_mock_broker_set_rtt(mcluster, leader, 0);
/* Produce a fifth message, should fail with ERR__STATE since
* the transaction should have entered the abortable state. */
err = rd_kafka_producev(rk,
RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END);
TEST_ASSERT(err == RD_KAFKA_RESP_ERR__STATE,
"Expected produce() to fail with ERR__STATE, not %s",
rd_kafka_err2name(err));
TEST_SAY("produce() failed as expected: %s\n",
rd_kafka_err2str(err));
/* Commit the transaction, should fail with abortable error. */
TIMING_START(&timing, "commit_transaction(-1)");
error = rd_kafka_commit_transaction(rk, -1);
TIMING_STOP(&timing);
TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");
TEST_SAY("commit_transaction() failed (expectedly): %s\n",
rd_kafka_error_string(error));
TEST_ASSERT(!rd_kafka_error_is_fatal(error),
"Did not expect fatal error");
TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
"Expected abortable error");
rd_kafka_error_destroy(error);
/* Abort the transaction */
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
/* Run a new transaction without errors to verify that the
* producer can recover. */
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
TEST_CALL_ERR__(rd_kafka_producev(rk,
RD_KAFKA_V_TOPIC("mytopic"),
RD_KAFKA_V_PARTITION(0),
RD_KAFKA_V_VALUE("hi", 2),
RD_KAFKA_V_END));
TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
rd_kafka_destroy(rk);
SUB_TEST_PASS();
}
int main_0105_transactions_mock (int argc, char **argv) {
if (test_needs_auth()) {
TEST_SKIP("Mock cluster does not support SSL/SASL\n");
@ -2686,5 +2821,7 @@ int main_0105_transactions_mock (int argc, char **argv) {
do_test_txn_switch_coordinator_refresh();
do_test_out_of_order_seq();
return 0;
}