This commit is contained in:
Magnus Edenhill 2013-08-26 22:34:23 +02:00
Родитель c12ceab8ea
Коммит 67391491b5
2 изменённых файлов: 21 добавлений и 5 удалений

Просмотреть файл

@ -595,6 +595,7 @@ void rd_kafka_dump (FILE *fp, rd_kafka_t *rk) {
rd_kafka_topic_t *rkt;
rd_kafka_toppar_t *rktp;
rd_kafka_lock(rk);
fprintf(fp, "rd_kafka_t %p: %s\n", rk, rk->rk_name);
fprintf(fp, " rk_op request queue: %i ops\n", rk->rk_op.rkq_qlen);
@ -602,6 +603,7 @@ void rd_kafka_dump (FILE *fp, rd_kafka_t *rk) {
fprintf(fp, " brokers:\n");
TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
rd_kafka_broker_lock(rkb);
fprintf(fp, " rd_kafka_broker_t %p: %s NodeId %"PRId32
" in state %s\n",
rkb, rkb->rkb_name, rkb->rkb_nodeid,
@ -622,14 +624,13 @@ void rd_kafka_dump (FILE *fp, rd_kafka_t *rk) {
fprintf(fp, " %i toppars:\n", rkb->rkb_toppar_cnt);
rd_kafka_broker_toppars_rdlock(rkb);
TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink)
rd_kafka_toppar_dump(fp, " ", rktp);
rd_kafka_broker_toppars_unlock(rkb);
}
rd_kafka_broker_unlock(rkb);
}
fprintf(fp, " topics:\n");
rd_kafka_lock(rk);
TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
fprintf(fp, " %.*s with %"PRId32" partitions\n",
RD_KAFKAP_STR_PR(rkt->rkt_topic),

Просмотреть файл

@ -92,6 +92,10 @@ static size_t rd_kafka_msghdr_size (const struct msghdr *msg) {
return tot;
}
/**
* Locks: rd_kafka_broker_lock() MUST be held.
* Locality: broker thread
*/
static void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb,
int state) {
if (rkb->rkb_state == state)
@ -829,15 +833,22 @@ static void rd_kafka_broker_metadata_req (rd_kafka_broker_t *rkb,
/**
* Locks: rd_kafka_lock(rk) MUST be held.
* Locality: any thread
*/
static rd_kafka_broker_t *rd_kafka_broker_any (rd_kafka_t *rk, int state) {
rd_kafka_broker_t *rkb;
/* FIXME: lock */
TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link)
TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
rd_kafka_broker_lock(rkb);
if (rkb->rkb_state == state) {
rd_kafka_broker_keep(rkb);
rd_kafka_broker_unlock(rkb);
return rkb;
}
rd_kafka_broker_unlock(rkb);
}
return NULL;
}
@ -1133,7 +1144,9 @@ static int rd_kafka_broker_connect (rd_kafka_broker_t *rkb) {
return -1;
}
rd_kafka_broker_lock(rkb);
rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_CONNECTING);
rd_kafka_broker_unlock(rkb);
if (connect(rkb->rkb_s, (struct sockaddr *)sinx,
RD_SOCKADDR_INX_LEN(sinx)) == -1) {
@ -1161,8 +1174,10 @@ static int rd_kafka_broker_connect (rd_kafka_broker_t *rkb) {
rd_rkb_dbg(rkb, BROKER, "CONNECTED", "connected to %s",
rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_NICE));
rd_kafka_broker_lock(rkb);
rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP);
rkb->rkb_err.err = 0;
rd_kafka_broker_unlock(rkb);
rkb->rkb_pfd.fd = rkb->rkb_s;
rkb->rkb_pfd.events = EPOLLIN;