EndTxnRequests (sent on commit/abort) are only retried in allowed states (#3041)

Previously the transaction could hang on commit_transaction() if an abortable
error was hit and the EndTxnRequest was to be retried.
This commit is contained in:
Magnus Edenhill 2020-09-23 16:07:58 +02:00
Parent 3a6258c2f7
Commit 98312ec1a3
6 changed files with 197 additions and 17 deletions

View file

@@ -76,6 +76,10 @@ librdkafka v1.5.2 is a maintenance release.
If the application did not take action on failed messages in its delivery
report callback and went on to commit the transaction, the transaction would
be successfully committed, simply omitting the failed messages.
+ * EndTxnRequests (sent on commit/abort) are only retried in allowed
+   states (#3041).
+   Previously the transaction could hang on commit_transaction() if an abortable
+   error was hit and the EndTxnRequest was to be retried.
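
For reference, the application-side recovery path this fix unblocks looks roughly like the sketch below. It is not part of the commit: it assumes `rk` is an already-configured transactional producer, that the transaction's messages are re-produced before retrying, and it abbreviates error handling.

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: commit the current transaction; on an abortable error abort it so
 * the application can retry from rd_kafka_begin_transaction() instead of
 * hanging inside commit_transaction(). */
static void commit_or_abort (rd_kafka_t *rk) {
        rd_kafka_error_t *error = rd_kafka_commit_transaction(rk, -1);

        if (!error)
                return;  /* Transaction committed */

        if (rd_kafka_error_txn_requires_abort(error)) {
                fprintf(stderr, "Commit failed, aborting: %s\n",
                        rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);

                error = rd_kafka_abort_transaction(rk, 60 * 1000);
                if (error) {
                        fprintf(stderr, "Abort failed: %s\n",
                                rd_kafka_error_string(error));
                        rd_kafka_error_destroy(error);
                }
                /* Caller may now begin_transaction() and retry. */
        } else if (rd_kafka_error_is_retriable(error)) {
                /* Caller may simply call commit_transaction() again. */
                rd_kafka_error_destroy(error);
        } else {
                /* Fatal or unexpected: the producer must be re-created. */
                fprintf(stderr, "Commit failed fatally: %s\n",
                        rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);
        }
}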

View file

@@ -229,7 +229,7 @@ rd_kafka_mock_partition_log_append (rd_kafka_mock_partition_t *mpart,
/**
- * @brief Set the partition leader
+ * @brief Set the partition leader, or NULL for leader-less.
*/
static void
rd_kafka_mock_partition_set_leader0 (rd_kafka_mock_partition_t *mpart,
@@ -1818,10 +1818,14 @@ rd_kafka_mock_cluster_cmd (rd_kafka_mock_cluster_t *mcluster,
if (!mpart)
return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
- mrkb = rd_kafka_mock_broker_find(mcluster,
- rko->rko_u.mock.broker_id);
- if (!mrkb)
- return RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE;
+ if (rko->rko_u.mock.broker_id != -1) {
+ mrkb = rd_kafka_mock_broker_find(
+ mcluster, rko->rko_u.mock.broker_id);
+ if (!mrkb)
+ return RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE;
+ } else {
+ mrkb = NULL;
+ }
rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
"Set %s [%"PRId32"] leader to %"PRId32,

View file

@@ -185,7 +185,8 @@ rd_kafka_mock_topic_create (rd_kafka_mock_cluster_t *mcluster,
*
* The topic will be created if it does not exist.
*
- * \p broker_id needs to be an existing broker.
+ * \p broker_id needs to be an existing broker, or -1 to make the
+ * partition leader-less.
*/
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_mock_partition_set_leader (rd_kafka_mock_cluster_t *mcluster,
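
As a usage illustration (not from the commit), a minimal sketch of driving the new -1 value follows. It assumes an `mcluster` handle obtained from rd_kafka_mock_cluster_new() and the internal rdkafka_mock.h header, and it ignores the returned error codes for brevity.

#include "rdkafka_mock.h"  /* librdkafka's internal mock cluster API */

/* Sketch: make partition 0 leader-less so produce requests cannot complete,
 * then restore a real leader so a retried transaction can succeed. */
static void toggle_partition_leader (rd_kafka_mock_cluster_t *mcluster) {
        rd_kafka_mock_topic_create(mcluster, "myTopic",
                                   2 /*partitions*/, 3 /*replication*/);

        /* broker_id -1: no leader, deliveries to partition 0 will time out. */
        rd_kafka_mock_partition_set_leader(mcluster, "myTopic", 0, -1);

        /* ...exercise the error path under test... */

        /* Re-assign broker 1 as leader so delivery can proceed again. */
        rd_kafka_mock_partition_set_leader(mcluster, "myTopic", 0, 1);
}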

View file

@@ -906,13 +906,15 @@ rd_kafka_txn_curr_api_abort_timeout_cb (rd_kafka_timers_t *rkts, void *arg) {
rd_kafka_txn_set_abortable_error(
rkts->rkts_rk,
RD_KAFKA_RESP_ERR__TIMED_OUT,
- "Transactional operation timed out");
+ "Transactional API operation (%s) timed out",
+ rkq->rkq_rk->rk_eos.txn_curr_api.name);
rd_kafka_txn_curr_api_reply_error(
rkq,
rd_kafka_error_new_txn_requires_abort(
RD_KAFKA_RESP_ERR__TIMED_OUT,
- "Transactional operation timed out"));
+ "Transactional API operation (%s) timed out",
+ rkq->rkq_rk->rk_eos.txn_curr_api.name));
}
/**
@@ -1917,7 +1919,7 @@ static void rd_kafka_txn_handle_EndTxn (rd_kafka_t *rk,
rd_kafka_q_t *rkq = opaque;
int16_t ErrorCode;
int actions = 0;
- rd_bool_t is_commit = rd_false;
+ rd_bool_t is_commit = rd_false, may_retry = rd_false;
if (err == RD_KAFKA_RESP_ERR__DESTROY) {
rd_kafka_q_destroy(rkq);
@@ -1937,12 +1939,14 @@ static void rd_kafka_txn_handle_EndTxn (rd_kafka_t *rk,
err = rkbuf->rkbuf_err;
err:
rd_kafka_wrlock(rk);
- if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION)
+ if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION) {
is_commit = rd_true;
- else if (rk->rk_eos.txn_state ==
- RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION)
+ may_retry = rd_true;
+ } else if (rk->rk_eos.txn_state ==
+ RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION) {
is_commit = rd_false;
- else
+ may_retry = rd_true;
+ } else if (!err)
err = RD_KAFKA_RESP_ERR__OUTDATED;
if (!err) {
@@ -1950,6 +1954,12 @@ static void rd_kafka_txn_handle_EndTxn (rd_kafka_t *rk,
rd_kafka_txn_complete(rk);
}
+ rd_kafka_dbg(rk, EOS, "ENDTXN",
+ "EndTxn failed due to %s in state %s (may_retry=%s)",
+ rd_kafka_err2name(err),
+ rd_kafka_txn_state2str(rk->rk_eos.txn_state),
+ RD_STR_ToF(may_retry));
rd_kafka_wrunlock(rk);
switch (err)
@@ -1959,10 +1969,12 @@ static void rd_kafka_txn_handle_EndTxn (rd_kafka_t *rk,
case RD_KAFKA_RESP_ERR__DESTROY:
/* Producer is being terminated, ignore the response. */
case RD_KAFKA_RESP_ERR__TIMED_OUT:
/* Transaction API timeout has been hit
* (this is our internal timer) */
case RD_KAFKA_RESP_ERR__OUTDATED:
/* Set a non-actionable actions flag so that curr_api_reply()
* is called below, without other side-effects. */
actions = RD_KAFKA_ERR_ACTION_SPECIAL;
/* Transactional state no longer relevant for this
* outdated response. */
break;
case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
@@ -1994,7 +2006,7 @@ static void rd_kafka_txn_handle_EndTxn (rd_kafka_t *rk,
"Failed to end transaction: %s",
rd_kafka_err2str(err));
- } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
+ } else if (may_retry && actions & RD_KAFKA_ERR_ACTION_RETRY) {
if (rd_kafka_buf_retry(rkb, request))
return;
actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
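
Read together, the retry handling introduced in this file condenses to the guard sketched below. This is an illustration distilled from the hunks above rather than a drop-in replacement; `actions`, `rkb` and `request` refer to the handler's locals shown in the diff.

/* Only retry the EndTxnRequest while the transaction is still being
 * committed or aborted; in any other state a retry could hang the
 * commit_transaction()/abort_transaction() call. */
may_retry = (rk->rk_eos.txn_state ==
             RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION ||
             rk->rk_eos.txn_state ==
             RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION);

if (may_retry && (actions & RD_KAFKA_ERR_ACTION_RETRY)) {
        if (rd_kafka_buf_retry(rkb, request))
                return;  /* Request re-enqueued for retry */
        /* Retries exhausted: treat as a permanent error instead. */
        actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
}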

View file

@@ -75,4 +75,8 @@ void *rd_strtup_list_copy (const void *elem, void *opaque);
char *rd_flags2str (char *dst, size_t size,
const char **desc, int flags);
+ /** @returns "true" if EXPR is true, else "false" */
+ #define RD_STR_ToF(EXPR) ((EXPR) ? "true" : "false")
#endif /* _RDSTRING_H_ */

View file

@@ -811,6 +811,159 @@ static void do_test_txn_auth_failure (int16_t ApiKey,
}
/**
* @brief Issue #3041: Commit fails due to message flush() taking too long,
* eventually resulting in an unabortable error and failure to
* re-init the transactional producer.
*/
static void do_test_txn_flush_timeout (void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_consumer_group_metadata_t *cgmetadata;
rd_kafka_error_t *error;
const char *txnid = "myTxnId";
const char *topic = "myTopic";
const int32_t coord_id = 2;
int msgcounter = 0;
rd_bool_t is_retry = rd_false;
TEST_SAY(_C_MAG "[ %s ]\n", __FUNCTION__);
rk = create_txn_producer(&mcluster, txnid, 3,
"message.timeout.ms", "10000",
"transaction.timeout.ms", "10000",
/* Speed up coordinator reconnect */
"reconnect.backoff.max.ms", "1000",
NULL);
/* Broker down is not a test-failing error */
test_curr->is_fatal_cb = error_is_fatal_cb;
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
/* Set coordinator so we can disconnect it later */
rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, coord_id);
/*
* Init transactions
*/
TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
retry:
if (!is_retry) {
/* First attempt should fail. */
test_curr->ignore_dr_err = rd_true;
test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
/* Assign invalid partition leaders for some partitions so
* that messages will not be delivered. */
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, -1);
rd_kafka_mock_partition_set_leader(mcluster, topic, 1, -1);
} else {
/* The retry should succeed */
test_curr->ignore_dr_err = rd_false;
test_curr->exp_dr_err = is_retry ? RD_KAFKA_RESP_ERR_NO_ERROR :
RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
}
/*
* Start a transaction
*/
TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
/*
* Produce some messages to specific partitions and random.
*/
test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 100, NULL, 10,
&msgcounter);
test_produce_msgs2_nowait(rk, topic, 1, 0, 0, 100, NULL, 10,
&msgcounter);
test_produce_msgs2_nowait(rk, topic, RD_KAFKA_PARTITION_UA,
0, 0, 100, NULL, 10, &msgcounter);
/*
* Send some arbitrary offsets.
*/
offsets = rd_kafka_topic_partition_list_new(4);
rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
rd_kafka_topic_partition_list_add(offsets, "srctop2", 99)->offset =
999999111;
rd_kafka_topic_partition_list_add(offsets, "srctopic", 0)->offset = 999;
rd_kafka_topic_partition_list_add(offsets, "srctop2", 3499)->offset =
123456789;
cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
rk, offsets,
cgmetadata, -1));
rd_kafka_consumer_group_metadata_destroy(cgmetadata);
rd_kafka_topic_partition_list_destroy(offsets);
rd_sleep(2);
if (!is_retry) {
/* Now disconnect the coordinator. */
TEST_SAY("Disconnecting transaction coordinator %"PRId32"\n",
coord_id);
rd_kafka_mock_broker_set_down(mcluster, coord_id);
}
/*
* Start committing.
*/
error = rd_kafka_commit_transaction(rk, -1);
if (!is_retry) {
TEST_ASSERT(error != NULL,
"Expected commit to fail");
TEST_SAY("commit_transaction() failed (expectedly): %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
} else {
TEST_ASSERT(!error,
"Expected commit to succeed, not: %s",
rd_kafka_error_string(error));
}
if (!is_retry) {
/*
* Bring the coordinator back up.
*/
rd_kafka_mock_broker_set_up(mcluster, coord_id);
rd_sleep(2);
/*
* Abort, and try again, this time without error.
*/
TEST_SAY("Aborting and retrying\n");
is_retry = rd_true;
TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 60000));
goto retry;
}
/* All done */
rd_kafka_destroy(rk);
TEST_SAY(_C_GRN "[ %s PASS ]\n", __FUNCTION__);
}
int main_0105_transactions_mock (int argc, char **argv) {
if (test_needs_auth()) {
TEST_SKIP("Mock cluster does not support SSL/SASL\n");
@@ -841,6 +994,8 @@ int main_0105_transactions_mock (int argc, char **argv) {
RD_KAFKAP_FindCoordinator,
RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);
+ do_test_txn_flush_timeout();
if (!test_quick)
do_test_txn_switch_coordinator();