This commit is contained in:
Magnus Edenhill 2014-01-25 23:36:38 +07:00
Родитель 5e9fcd9df5
Коммит 644d8683dc
5 изменённых файлов: 17 добавлений и 21 удалений

Просмотреть файл

@ -849,7 +849,7 @@ rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf,
/* Construct a client id if none is given. */ /* Construct a client id if none is given. */
if (!rk->rk_conf.clientid) if (!rk->rk_conf.clientid)
rk->rk_conf.clientid = strdup("rdkafka"); rk->rk_conf.clientid = strdup("rdkafka");
snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i", snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
rk->rk_conf.clientid, rd_kafka_type2str(rk->rk_type), rkid++); rk->rk_conf.clientid, rd_kafka_type2str(rk->rk_type), rkid++);
@ -1247,7 +1247,6 @@ static void rd_kafka_poll_cb (rd_kafka_op_t *rko, void *opaque) {
if (!(dcnt % 1000)) if (!(dcnt % 1000))
rd_kafka_dbg(rk, MSG, "POLL", rd_kafka_dbg(rk, MSG, "POLL",
"Now %i messages delivered to app", dcnt); "Now %i messages delivered to app", dcnt);
break; break;
case RD_KAFKA_OP_STATS: case RD_KAFKA_OP_STATS:
@ -1343,7 +1342,6 @@ void rd_kafka_dump (FILE *fp, rd_kafka_t *rk) {
rd_kafka_toppar_dump(fp, " ", rkt->rkt_ua); rd_kafka_toppar_dump(fp, " ", rkt->rkt_ua);
} }
rd_kafka_unlock(rk); rd_kafka_unlock(rk);
} }

Просмотреть файл

@ -567,6 +567,7 @@ static void rd_kafka_broker_buf_enq (rd_kafka_broker_t *rkb,
} }
/** /**
* Memory reading helper macros to be used when parsing network responses. * Memory reading helper macros to be used when parsing network responses.
* *
@ -710,7 +711,7 @@ static void rd_kafka_metadata_handle (rd_kafka_broker_t *rkb,
TopicMetadata = malloc(sizeof(*TopicMetadata) * TopicMetadata_cnt); TopicMetadata = malloc(sizeof(*TopicMetadata) * TopicMetadata_cnt);
for (i = 0 ; i < TopicMetadata_cnt ; i++) { for (i = 0 ; i < TopicMetadata_cnt ; i++) {
_READ_I16(&TopicMetadata[i].ErrorCode); _READ_I16(&TopicMetadata[i].ErrorCode);
_READ_STR(TopicMetadata[i].Name); _READ_STR(TopicMetadata[i].Name);
@ -726,7 +727,7 @@ static void rd_kafka_metadata_handle (rd_kafka_broker_t *rkb,
TopicMetadata[i].PartitionMetadata = TopicMetadata[i].PartitionMetadata =
alloca(sizeof(*TopicMetadata[i].PartitionMetadata) * alloca(sizeof(*TopicMetadata[i].PartitionMetadata) *
TopicMetadata[i].PartitionMetadata_cnt); TopicMetadata[i].PartitionMetadata_cnt);
for (j = 0 ; j < TopicMetadata[i].PartitionMetadata_cnt ; j++) { for (j = 0 ; j < TopicMetadata[i].PartitionMetadata_cnt ; j++) {
_READ_I16(&TopicMetadata[i].PartitionMetadata[j]. _READ_I16(&TopicMetadata[i].PartitionMetadata[j].
ErrorCode); ErrorCode);
@ -753,7 +754,7 @@ static void rd_kafka_metadata_handle (rd_kafka_broker_t *rkb,
PartitionMetadata[j].Replicas) PartitionMetadata[j].Replicas)
* TopicMetadata[i].PartitionMetadata[j]. * TopicMetadata[i].PartitionMetadata[j].
Replicas_cnt); Replicas_cnt);
for (k = 0 ; k < TopicMetadata[i].PartitionMetadata[j]. for (k = 0 ; k < TopicMetadata[i].PartitionMetadata[j].
Replicas_cnt ; k++) Replicas_cnt ; k++)
_READ_I32(&TopicMetadata[i]. _READ_I32(&TopicMetadata[i].
@ -783,11 +784,8 @@ static void rd_kafka_metadata_handle (rd_kafka_broker_t *rkb,
Isr_cnt ; k++) Isr_cnt ; k++)
_READ_I32(&TopicMetadata[i]. _READ_I32(&TopicMetadata[i].
PartitionMetadata[j].Isr[k]); PartitionMetadata[j].Isr[k]);
} }
} }
/* Update our list of brokers. */ /* Update our list of brokers. */
for (i = 0 ; i < Broker_cnt ; i++) { for (i = 0 ; i < Broker_cnt ; i++) {
@ -2118,7 +2116,7 @@ static void rd_kafka_broker_io_serve (rd_kafka_broker_t *rkb) {
if (unlikely(now >= rkb->rkb_ts_metadata_poll)) if (unlikely(now >= rkb->rkb_ts_metadata_poll))
rd_kafka_broker_metadata_req(rkb, 1 /* all topics */, NULL, rd_kafka_broker_metadata_req(rkb, 1 /* all topics */, NULL,
"periodic refresh"); "periodic refresh");
if (rkb->rkb_outbufs.rkbq_cnt > 0) if (rkb->rkb_outbufs.rkbq_cnt > 0)
rkb->rkb_pfd.events |= POLLOUT; rkb->rkb_pfd.events |= POLLOUT;
else else
@ -2127,7 +2125,7 @@ static void rd_kafka_broker_io_serve (rd_kafka_broker_t *rkb) {
if (poll(&rkb->rkb_pfd, 1, if (poll(&rkb->rkb_pfd, 1,
rkb->rkb_rk->rk_conf.buffering_max_ms) <= 0) rkb->rkb_rk->rk_conf.buffering_max_ms) <= 0)
return; return;
if (rkb->rkb_pfd.revents & POLLIN) if (rkb->rkb_pfd.revents & POLLIN)
while (rd_kafka_recv(rkb) > 0) while (rd_kafka_recv(rkb) > 0)
; ;
@ -2135,7 +2133,7 @@ static void rd_kafka_broker_io_serve (rd_kafka_broker_t *rkb) {
if (rkb->rkb_pfd.revents & POLLHUP) if (rkb->rkb_pfd.revents & POLLHUP)
return rd_kafka_broker_fail(rkb, RD_KAFKA_RESP_ERR__TRANSPORT, return rd_kafka_broker_fail(rkb, RD_KAFKA_RESP_ERR__TRANSPORT,
"Connection closed"); "Connection closed");
if (rkb->rkb_pfd.revents & POLLOUT) if (rkb->rkb_pfd.revents & POLLOUT)
while (rd_kafka_send(rkb) > 0) while (rd_kafka_send(rkb) > 0)
; ;
@ -2180,7 +2178,7 @@ static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb) {
do { do {
cnt = 0; cnt = 0;
TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) { TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
rd_rkb_dbg(rkb, TOPIC, "TOPPAR", rd_rkb_dbg(rkb, TOPIC, "TOPPAR",
@ -3209,7 +3207,7 @@ static void rd_kafka_broker_consumer_serve (rd_kafka_broker_t *rkb) {
* Note: 'now' may be a bit outdated by now. */ * Note: 'now' may be a bit outdated by now. */
if (do_timeout_scan) if (do_timeout_scan)
rd_kafka_broker_waitresp_timeout_scan(rkb, now); rd_kafka_broker_waitresp_timeout_scan(rkb, now);
rd_kafka_broker_lock(rkb); rd_kafka_broker_lock(rkb);
} }
@ -3332,7 +3330,7 @@ static rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
rd_kafka_bufq_init(&rkb->rkb_retrybufs); rd_kafka_bufq_init(&rkb->rkb_retrybufs);
rd_kafka_q_init(&rkb->rkb_ops); rd_kafka_q_init(&rkb->rkb_ops);
rd_kafka_broker_keep(rkb); rd_kafka_broker_keep(rkb);
/* Set next intervalled metadata refresh, offset by a random /* Set next intervalled metadata refresh, offset by a random
* value to avoid all brokers to be queried simultaneously. */ * value to avoid all brokers to be queried simultaneously. */
if (rkb->rkb_rk->rk_conf.metadata_refresh_interval_ms >= 0) if (rkb->rkb_rk->rk_conf.metadata_refresh_interval_ms >= 0)

Просмотреть файл

@ -134,7 +134,7 @@ int rd_kafka_msgq_age_scan (rd_kafka_msgq_t *rkmq,
rd_ts_t now) { rd_ts_t now) {
rd_kafka_msg_t *rkm, *tmp; rd_kafka_msg_t *rkm, *tmp;
int cnt = timedout->rkmq_msg_cnt; int cnt = timedout->rkmq_msg_cnt;
/* Assume messages are added in time sequencial order */ /* Assume messages are added in time sequencial order */
TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) { TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) {
if (likely(rkm->rkm_ts_timeout > now)) if (likely(rkm->rkm_ts_timeout > now))

Просмотреть файл

@ -48,7 +48,7 @@ static rd_kafka_toppar_t *rd_kafka_toppar_new (rd_kafka_topic_t *rkt,
rd_kafka_toppar_t *rktp; rd_kafka_toppar_t *rktp;
rktp = calloc(1, sizeof(*rktp)); rktp = calloc(1, sizeof(*rktp));
rktp->rktp_partition = partition; rktp->rktp_partition = partition;
rktp->rktp_rkt = rkt; rktp->rktp_rkt = rkt;
rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE; rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE;
@ -407,7 +407,7 @@ rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic,
if ((rkt = rd_kafka_topic_find(rk, topic))) if ((rkt = rd_kafka_topic_find(rk, topic)))
return rkt; return rkt;
rkt = calloc(1, sizeof(*rkt)); rkt = calloc(1, sizeof(*rkt));
rkt->rkt_topic = rd_kafkap_str_new(topic); rkt->rkt_topic = rd_kafkap_str_new(topic);
@ -512,7 +512,6 @@ void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
rd_kafka_broker_keep(rkb); rd_kafka_broker_keep(rkb);
rd_kafka_broker_toppars_unlock(rkb); rd_kafka_broker_toppars_unlock(rkb);
} else { } else {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT", rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
"No broker is leader for topic %.*s [%"PRId32"]", "No broker is leader for topic %.*s [%"PRId32"]",
@ -911,3 +910,4 @@ int rd_kafka_topic_partition_available (const rd_kafka_topic_t *rkt,
rd_kafka_toppar_destroy(rktp); rd_kafka_toppar_destroy(rktp);
return avail; return avail;
} }

Просмотреть файл

@ -96,11 +96,11 @@ void test_conf_init (rd_kafka_conf_t **conf, rd_kafka_topic_conf_t **topic_conf,
if (!(t = strchr(b, '='))) if (!(t = strchr(b, '=')))
TEST_FAIL("%s:%i: expected name=value format\n", TEST_FAIL("%s:%i: expected name=value format\n",
test_conf, line); test_conf, line);
name = b; name = b;
*t = '\0'; *t = '\0';
val = t+1; val = t+1;
if (!strncmp(name, "topic.", strlen("topic."))) { if (!strncmp(name, "topic.", strlen("topic."))) {
name += strlen("topic."); name += strlen("topic.");
res = rd_kafka_topic_conf_set(*topic_conf, res = rd_kafka_topic_conf_set(*topic_conf,