Txn: properly handle PRODUCER_FENCED in InitPid reply

This commit is contained in:
Magnus Edenhill 2022-04-06 13:21:40 +02:00
Родитель 52949110d6
Коммит c5e9466f9b
3 изменённых файлов: 26 добавлений и 6 удалений

Просмотреть файл

@ -115,7 +115,12 @@ librdkafka v1.9.0 is a feature release:
The fix included in this release is to save the per-partition idempotency
state when a partition is removed, and then recover and use that saved
state if the partition comes back at a later time.
* The logic for enforcing that `message.timeout.ms` is greater than
* The transactional producer would retry (re)initializing its PID if the
broker returned a `PRODUCER_FENCED` error (an error code added in
Apache Kafka 2.8), which could cause the producer to seemingly hang.
This error code is now correctly handled by raising a fatal error.
* The logic for enforcing that `message.timeout.ms` is greater than
an explicitly configured `linger.ms` was incorrect and instead of
erroring out early the lingering time was automatically adjusted to the
message timeout, ignoring the configured `linger.ms`.

Просмотреть файл

@ -165,6 +165,7 @@ rd_bool_t rd_kafka_idemp_check_error(rd_kafka_t *rk,
break;
case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH:
case RD_KAFKA_RESP_ERR_PRODUCER_FENCED:
is_fatal = rd_true;
/* Normalize error */
err = RD_KAFKA_RESP_ERR__FENCED;

Просмотреть файл

@ -494,8 +494,12 @@ static void do_test_txn_slow_reinit(rd_bool_t with_sleep) {
* transaction errors, but let the broker-side bumping of the
* producer PID fail with a fencing error.
* Should raise a fatal error.
*
* @param error_code Which error code InitProducerIdRequest should fail with.
* Either RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH (older)
* or RD_KAFKA_RESP_ERR_PRODUCER_FENCED (newer).
*/
static void do_test_txn_fenced_reinit(void) {
static void do_test_txn_fenced_reinit(rd_kafka_resp_err_t error_code) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;
@ -504,7 +508,7 @@ static void do_test_txn_fenced_reinit(void) {
char errstr[512];
rd_kafka_resp_err_t fatal_err;
SUB_TEST_QUICK();
SUB_TEST_QUICK("With error %s", rd_kafka_err2name(error_code));
rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
NULL);
@ -533,8 +537,7 @@ static void do_test_txn_fenced_reinit(void) {
/* Fail the PID reinit */
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, txn_coord, RD_KAFKAP_InitProducerId, 1,
RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH, 0);
mcluster, txn_coord, RD_KAFKAP_InitProducerId, 1, error_code, 0);
/* Produce a message, let it fail with a fatal idempo error. */
rd_kafka_mock_push_request_errors(
@ -685,6 +688,16 @@ static void do_test_txn_endtxn_errors(void) {
rd_true /* abortable */,
rd_false /* !fatal */,
},
{
/* #11 */
1,
{RD_KAFKA_RESP_ERR_PRODUCER_FENCED},
/* This error is normalized */
RD_KAFKA_RESP_ERR__FENCED,
rd_false /* !retriable */,
rd_false /* !abortable */,
rd_true /* fatal */
},
{0},
};
int i;
@ -2739,7 +2752,8 @@ int main_0105_transactions_mock(int argc, char **argv) {
do_test_txn_fatal_idempo_errors();
do_test_txn_fenced_reinit();
do_test_txn_fenced_reinit(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH);
do_test_txn_fenced_reinit(RD_KAFKA_RESP_ERR_PRODUCER_FENCED);
do_test_txn_req_cnt();