Fix crash in roundrobin assignor for asymmetrical subscriptions (#2121)

This also adds declarative unit tests of the assignors.
This commit is contained in:
Magnus Edenhill 2020-05-04 16:12:10 +02:00
Родитель efcfef372e
Коммит 8ba5df7ab7
5 изменённых файлов: 424 добавлений и 5 удалений

Просмотреть файл

@ -41,6 +41,9 @@ librdkafka.
### Consumer fixes
* The roundrobin partition assignor could crash if subscriptions
were asymmetrical (different sets from different members of the group).
Thanks to @ankon and @wilmai for identifying the root cause (#2121).
* Initial consumer group joins should now be a couple of seconds quicker
thanks to expedited query intervals (@benesch).
* Don't propagate temporary offset lookup errors to application

Просмотреть файл

@ -27,6 +27,7 @@
*/
#include "rdkafka_int.h"
#include "rdkafka_assignor.h"
#include "rdunittest.h"
#include <ctype.h>
@ -558,3 +559,403 @@ int rd_kafka_assignors_init (rd_kafka_t *rk, char *errstr, size_t errstr_size) {
/**
 * @brief Destroys the \c rk_conf.partition_assignors list of \p rk.
 *
 * NOTE(review): presumably the counterpart of rd_kafka_assignors_init(),
 * which is not fully visible here - confirm.
 */
void rd_kafka_assignors_term (rd_kafka_t *rk) {
rd_list_destroy(&rk->rk_conf.partition_assignors);
}
/**
 * @brief Unittest for assignors
 *
 * Each entry in the \c tests table declares a set of topics (with
 * partition counts), a set of group members (with their subscribed
 * topics) and, per assignor protocol name, the assignment each member is
 * expected to receive as "topic:partition" strings.
 *
 * For every test case and every listed assignor, rd_kafka_assignor_run()
 * is invoked and each member's resulting assignment, after
 * rd_kafka_topic_partition_list_sort_by_topic(), is compared
 * entry-by-entry with the expectation. Mismatches are logged with
 * RD_UT_WARN() and counted.
 *
 * @returns 0 if no mismatches were counted, else 1.
 */
int unittest_assignors (void) {
        const struct {
                const char *name;  /* Test case name, NULL ends the table */
                int topic_cnt;     /* Number of used entries in topics[] */
                struct {
                        const char *name;
                        int partition_cnt;
                } topics[12];
                int member_cnt;    /* Number of used entries in members[] */
                struct {
                        const char *name;
                        int topic_cnt;
                        const char *topics[12];
                } members[3];
                int expect_cnt;    /* Number of used entries in expect[] */
                struct {
                        const char *protocol_name;
                        struct {
                                int partition_cnt;
                                const char *partitions[12]; /* "topic:part" */
                        } members[3];  /* Same order as members[] above */
                } expect[2];
        } tests[] = {
                /*
                 * Test cases
                 */
                {
                        .name = "Symmetrical subscription",
                        .topic_cnt = 4,
                        .topics = {
                                { "a", 3 }, /* a:0 a:1 a:2 */
                                { "b", 4, }, /* b:0 b:1 b:2 b:3 */
                                { "c", 2 }, /* c:0 c:1 */
                                { "d", 1 }, /* d:0 */
                        },
                        .member_cnt = 2,
                        .members = {
                                { .name = "consumer1",
                                  .topic_cnt = 4,
                                  .topics = { "d", "b", "a", "c" } },
                                { .name = "consumer2",
                                  .topic_cnt = 4,
                                  .topics = { "a", "b", "c", "d" } },
                        },
                        .expect_cnt = 2,
                        .expect = {
                                { .protocol_name = "range",
                                  .members = {
                                                /* Consumer1 */
                                                { 6,
                                                  { "a:0", "a:1",
                                                    "b:0", "b:1",
                                                    "c:0",
                                                    "d:0" } },
                                                /* Consumer2 */
                                                { 4,
                                                  { "a:2",
                                                    "b:2" ,"b:3",
                                                    "c:1" } },
                                        },
                                },
                                { .protocol_name = "roundrobin",
                                  .members = {
                                                /* Consumer1 */
                                                { 5,
                                                  { "a:0", "a:2",
                                                    "b:1", "b:3",
                                                    "c:1" } },
                                                /* Consumer2 */
                                                { 5,
                                                  { "a:1",
                                                    "b:0" ,"b:2",
                                                    "c:0",
                                                    "d:0" } },
                                        },
                                },
                        },
                },
                {
                        .name = "1*3 partitions (asymmetrical)",
                        .topic_cnt = 1,
                        .topics = {
                                { "a", 3 },
                        },
                        .member_cnt = 2,
                        .members = {
                                { .name = "consumer1",
                                  .topic_cnt = 3,
                                  .topics = { "a", "b", "c" } },
                                { .name = "consumer2",
                                  .topic_cnt = 1,
                                  .topics = { "a" } },
                        },
                        .expect_cnt = 2,
                        .expect = {
                                { .protocol_name = "range",
                                  .members = {
                                                /* Consumer1.
                                                 * range assignor applies
                                                 * per topic. */
                                                { 2,
                                                  { "a:0", "a:1" } },
                                                /* Consumer2 */
                                                { 1,
                                                  { "a:2" } },
                                        },
                                },
                                { .protocol_name = "roundrobin",
                                  .members = {
                                                /* Consumer1 */
                                                { 2,
                                                  { "a:0", "a:2" } },
                                                /* Consumer2 */
                                                { 1,
                                                  { "a:1" } },
                                        },
                                },
                        },
                },
                {
                        .name = "#2121 (asymmetrical)",
                        .topic_cnt = 12,
                        .topics = {
                                { "a", 1 },
                                { "b", 1 },
                                { "c", 1 },
                                { "d", 1 },
                                { "e", 1 },
                                { "f", 1 },
                                { "g", 1 },
                                { "h", 1 },
                                { "i", 1 },
                                { "j", 1 },
                                { "k", 1 },
                                { "l", 1 },
                        },
                        .member_cnt = 2,
                        .members = {
                                { .name = "consumer1",
                                  .topic_cnt = 12,
                                  .topics = {
                                                "a",
                                                "b",
                                                "c",
                                                "d",
                                                "e",
                                                "f",
                                                "g",
                                                "h",
                                                "i",
                                                "j",
                                                "k",
                                                "l",
                                        },
                                },
                                { .name = "consumer2", /* must be second */
                                  .topic_cnt = 5,
                                  .topics = {
                                                "b",
                                                "d",
                                                "f",
                                                "h",
                                                "l",
                                        },
                                },
                        },
                        .expect_cnt = 2,
                        .expect = {
                                { .protocol_name = "range",
                                  .members = {
                                                /* Consumer1.
                                                 * All partitions. */
                                                { 12,
                                                  {
                                                          "a:0",
                                                          "b:0",
                                                          "c:0",
                                                          "d:0",
                                                          "e:0",
                                                          "f:0",
                                                          "g:0",
                                                          "h:0",
                                                          "i:0",
                                                          "j:0",
                                                          "k:0",
                                                          "l:0",
                                                  }
                                                },
                                                /* Consumer2 */
                                                { 0 },
                                        },
                                },
                                { .protocol_name = "roundrobin",
                                  .members = {
                                                /* Consumer1 */
                                                { 7,
                                                  {
                                                          "a:0",
                                                          "c:0",
                                                          "e:0",
                                                          "g:0",
                                                          "i:0",
                                                          "j:0",
                                                          "k:0",
                                                  },
                                                },
                                                /* Consumer2 */
                                                { 5,
                                                  {
                                                          "b:0",
                                                          "d:0",
                                                          "f:0",
                                                          "h:0",
                                                          "l:0"
                                                  }
                                                },
                                        },
                                },
                        },
                },
                { NULL },
        };
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        int fails = 0;
        int i;

        conf = rd_kafka_conf_new();
        rd_kafka_conf_set(conf, "group.id", "group", NULL, 0);
        rd_kafka_conf_set(conf, "debug", rd_getenv("TEST_DEBUG", NULL),
                          NULL, 0);
        rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
        RD_UT_ASSERT(rk != NULL, "Failed to create consumer");

        /* Run through test cases */
        for (i = 0 ; tests[i].name ; i++) {
                int ie, it, im;
                rd_kafka_metadata_t metadata;
                rd_kafka_group_member_t *members;

                /* Create topic metadata.
                 * Zero the whole struct first so that the fields not
                 * explicitly set below never hold indeterminate values
                 * when the struct is handed to the assignor. */
                memset(&metadata, 0, sizeof(metadata));
                metadata.topic_cnt = tests[i].topic_cnt;
                metadata.topics = rd_alloca(sizeof(*metadata.topics) *
                                            metadata.topic_cnt);
                memset(metadata.topics, 0,
                       sizeof(*metadata.topics) * metadata.topic_cnt);
                for (it = 0 ; it < metadata.topic_cnt ; it++) {
                        metadata.topics[it].topic =
                                (char *)tests[i].topics[it].name;
                        metadata.topics[it].partition_cnt =
                                tests[i].topics[it].partition_cnt;
                        metadata.topics[it].partitions = NULL; /* Not used */
                }

                /* Create members */
                members = rd_alloca(sizeof(*members) * tests[i].member_cnt);
                memset(members, 0, sizeof(*members) * tests[i].member_cnt);

                for (im = 0 ; im < tests[i].member_cnt ; im++) {
                        rd_kafka_group_member_t *rkgm = &members[im];

                        /* The member name doubles as both member id and
                         * group instance id. */
                        rkgm->rkgm_member_id =
                                rd_kafkap_str_new(tests[i].members[im].name,
                                                  -1);
                        rkgm->rkgm_group_instance_id =
                                rd_kafkap_str_new(tests[i].members[im].name,
                                                  -1);
                        rd_list_init(&rkgm->rkgm_eligible,
                                     tests[i].members[im].topic_cnt, NULL);

                        rkgm->rkgm_subscription =
                                rd_kafka_topic_partition_list_new(
                                        tests[i].members[im].topic_cnt);
                        for (it = 0; it < tests[i].members[im].topic_cnt; it++)
                                rd_kafka_topic_partition_list_add(
                                        rkgm->rkgm_subscription,
                                        tests[i].members[im].topics[it],
                                        RD_KAFKA_PARTITION_UA);

                        rkgm->rkgm_userdata = NULL;

                        rkgm->rkgm_assignment =
                                rd_kafka_topic_partition_list_new(
                                        rkgm->rkgm_subscription->size);
                }

                /* For each assignor verify that the assignment
                 * matches the expectation set out in the test case. */
                for (ie = 0 ; ie < tests[i].expect_cnt ; ie++) {
                        rd_kafka_resp_err_t err;
                        char errstr[256];

                        RD_UT_SAY("Test case %s: %s assignor",
                                  tests[i].name,
                                  tests[i].expect[ie].protocol_name);

                        /* Run assignor */
                        err = rd_kafka_assignor_run(
                                rk->rk_cgrp,
                                tests[i].expect[ie].protocol_name,
                                &metadata,
                                members, tests[i].member_cnt,
                                errstr, sizeof(errstr));

                        RD_UT_ASSERT(!err, "Assignor case %s for %s failed: %s",
                                     tests[i].name,
                                     tests[i].expect[ie].protocol_name,
                                     errstr);

                        /* Verify assignments */
                        for (im = 0 ; im < tests[i].member_cnt ; im++) {
                                rd_kafka_group_member_t *rkgm = &members[im];
                                int ia;

                                if (rkgm->rkgm_assignment->cnt !=
                                    tests[i].expect[ie].members[im].
                                    partition_cnt) {
                                        RD_UT_WARN(
                                                " Member %.*s assignment count "
                                                "mismatch: %d != %d",
                                                RD_KAFKAP_STR_PR(
                                                        rkgm->rkgm_member_id),
                                                rkgm->rkgm_assignment->cnt,
                                                tests[i].expect[ie].members[im].
                                                partition_cnt);
                                        fails++;
                                }

                                /* Sort so that the actual assignment can be
                                 * compared positionally with the expected
                                 * list. */
                                if (rkgm->rkgm_assignment->cnt > 0)
                                        rd_kafka_topic_partition_list_sort_by_topic(
                                                rkgm->rkgm_assignment);

                                for (ia = 0 ;
                                     ia < rkgm->rkgm_assignment->cnt ; ia++) {
                                        rd_kafka_topic_partition_t *p =
                                                &rkgm->rkgm_assignment->
                                                elems[ia];
                                        char part[64];
                                        /* Surplus assigned partitions have
                                         * no expected counterpart. */
                                        const char *exp =
                                                ia < tests[i].expect[ie].
                                                members[im].partition_cnt ?
                                                tests[i].expect[ie].
                                                members[im].partitions[ia] :
                                                "(none)";

                                        rd_snprintf(part, sizeof(part), "%s:%d",
                                                    p->topic,
                                                    (int)p->partition);

#if 0 /* Enable to print actual assignment */
                                        RD_UT_SAY(" Member %.*s assignment "
                                                  "%d/%d %s =? %s",
                                                  RD_KAFKAP_STR_PR(
                                                          rkgm->rkgm_member_id),
                                                  ia,
                                                  rkgm->rkgm_assignment->cnt-1,
                                                  part, exp);
#endif

                                        if (strcmp(part, exp)) {
                                                RD_UT_WARN(
                                                        " Member %.*s "
                                                        "assignment %d/%d "
                                                        "mismatch: %s != %s",
                                                        RD_KAFKAP_STR_PR(
                                                                rkgm->
                                                                rkgm_member_id),
                                                        ia,
                                                        rkgm->rkgm_assignment->
                                                        cnt-1,
                                                        part, exp);
                                                fails++;
                                        }
                                }

                                /* Reset assignment for next loop */
                                rd_kafka_topic_partition_list_destroy(
                                        rkgm->rkgm_assignment);
                                rkgm->rkgm_assignment =
                                        rd_kafka_topic_partition_list_new(
                                                rkgm->rkgm_subscription->size);
                        }
                }

                for (im = 0 ; im < tests[i].member_cnt ; im++) {
                        rd_kafka_group_member_t *rkgm = &members[im];
                        rd_kafka_group_member_clear(rkgm);
                }
        }

        rd_kafka_destroy(rk);

        return fails ? 1 : 0;
}

Просмотреть файл

@ -61,7 +61,7 @@ rd_kafka_roundrobin_assignor_assign_cb (rd_kafka_t *rk,
char *errstr, size_t errstr_size,
void *opaque) {
unsigned int ti;
int next = 0; /* Next member id */
int next = -1; /* Next member id */
/* Sort topics by name */
qsort(eligible_topics, eligible_topic_cnt, sizeof(*eligible_topics),
@ -82,12 +82,20 @@ rd_kafka_roundrobin_assignor_assign_cb (rd_kafka_t *rk,
partition++) {
rd_kafka_group_member_t *rkgm;
next = (next+1) % rd_list_cnt(&eligible_topic->members);
/* Scan through members until we find one with a
* subscription to this topic. */
while (!rd_kafka_group_member_find_subscription(
rk, &members[next],
eligible_topic->metadata->topic))
next++;
eligible_topic->metadata->topic)) {
next++; /* The next-increment modulo check above
* ensures this increment does not
* run out of range. */
rd_assert(next <
rd_list_cnt(&eligible_topic->
members));
}
rkgm = &members[next];
@ -102,7 +110,6 @@ rd_kafka_roundrobin_assignor_assign_cb (rd_kafka_t *rk,
rkgm->rkgm_assignment,
eligible_topic->metadata->topic, partition);
next = (next+1) % rd_list_cnt(&eligible_topic->members);
}
}

Просмотреть файл

@ -441,6 +441,7 @@ extern int unittest_cgrp (void);
#if WITH_SASL_SCRAM
extern int unittest_scram (void);
#endif
extern int unittest_assignors (void);
int rd_unittest (void) {
int fails = 0;
@ -472,10 +473,12 @@ int rd_unittest (void) {
#if WITH_SASL_SCRAM
{ "scram", unittest_scram },
#endif
{ "assignors", unittest_assignors },
{ NULL }
};
int i;
const char *match = rd_getenv("RD_UT_TEST", NULL);
int cnt = 0;
if (rd_getenv("RD_UT_ASSERT", NULL))
rd_unittest_assert_on_failure = rd_true;
@ -505,6 +508,7 @@ int rd_unittest (void) {
unittests[i].name,
f ? "\033[31mFAIL" : "\033[32mPASS");
fails += f;
cnt++;
}
#if ENABLE_CODECOV
@ -525,5 +529,8 @@ int rd_unittest (void) {
#endif
#endif
if (!cnt && match)
RD_UT_WARN("No unittests matching \"%s\"", match);
return fails;
}

Просмотреть файл

@ -77,7 +77,8 @@ extern rd_bool_t rd_unittest_slow;
#define RD_UT_ASSERT(expr,...) do { \
if (!(expr)) { \
fprintf(stderr, \
"\033[31mRDUT: FAIL: %s:%d: %s: assert failed: " # expr ": ", \
"\033[31mRDUT: FAIL: %s:%d: %s: " \
"assert failed: " # expr ": ", \
__FILE__, __LINE__, __FUNCTION__); \
fprintf(stderr, __VA_ARGS__); \
fprintf(stderr, "\033[0m\n"); \