Trigger RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN error when all brokers are down (issue #64)

This commit is contained in:
Magnus Edenhill 2014-01-26 20:04:27 +07:00
Родитель d3486e654a
Коммит 045c559d9d
4 изменённых файлов: 27 добавлений и 10 удалений

Просмотреть файл

@ -487,6 +487,8 @@ const char *rd_kafka_err2str (rd_kafka_resp_err_t err) {
return "Local: File or filesystem error"; return "Local: File or filesystem error";
case RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC: case RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:
return "Local: Unknown topic"; return "Local: Unknown topic";
case RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN:
return "Local: All broker connections are down";
case RD_KAFKA_RESP_ERR_UNKNOWN: case RD_KAFKA_RESP_ERR_UNKNOWN:
return "Unknown error"; return "Unknown error";

Просмотреть файл

@ -114,6 +114,9 @@ typedef enum {
RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC = -188, /* Permanent: RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC = -188, /* Permanent:
* Topic does not exist * Topic does not exist
* in cluster. */ * in cluster. */
RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN = -187, /* All broker connections
* are down. */
RD_KAFKA_RESP_ERR__END = -100, /* end internal error codes */ RD_KAFKA_RESP_ERR__END = -100, /* end internal error codes */
/* Standard Kafka errors: */ /* Standard Kafka errors: */

Просмотреть файл

@ -52,8 +52,8 @@
#include "endian_compat.h" #include "endian_compat.h"
const char *rd_kafka_broker_state_names[] = { const char *rd_kafka_broker_state_names[] = {
"INIT",
"DOWN", "DOWN",
"CONNECTING",
"UP" "UP"
}; };
@ -106,11 +106,22 @@ static void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb,
return; return;
rd_kafka_dbg(rkb->rkb_rk, BROKER, "STATE", rd_kafka_dbg(rkb->rkb_rk, BROKER, "STATE",
"%s: Broker changed state %s -> %s", "%s: Broker changed state %s -> %s",
rkb->rkb_name, rkb->rkb_name,
rd_kafka_broker_state_names[rkb->rkb_state], rd_kafka_broker_state_names[rkb->rkb_state],
rd_kafka_broker_state_names[state]); rd_kafka_broker_state_names[state]);
if (state == RD_KAFKA_BROKER_STATE_DOWN) {
if (rd_atomic_add(&rkb->rkb_rk->rk_broker_down_cnt, 1) ==
rkb->rkb_rk->rk_broker_cnt)
rd_kafka_op_err(rkb->rkb_rk,
RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
"%i/%i brokers are down",
rkb->rkb_rk->rk_broker_down_cnt,
rkb->rkb_rk->rk_broker_cnt);
} else if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_DOWN)
rd_atomic_sub(&rkb->rkb_rk->rk_broker_down_cnt, 1);
rkb->rkb_state = state; rkb->rkb_state = state;
} }
@ -1301,10 +1312,6 @@ static int rd_kafka_broker_connect (rd_kafka_broker_t *rkb) {
strerror(errno)); strerror(errno));
#endif #endif
rd_kafka_broker_lock(rkb);
rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_CONNECTING);
rd_kafka_broker_unlock(rkb);
if (connect(rkb->rkb_s, (struct sockaddr *)sinx, if (connect(rkb->rkb_s, (struct sockaddr *)sinx,
RD_SOCKADDR_INX_LEN(sinx)) == -1) { RD_SOCKADDR_INX_LEN(sinx)) == -1) {
rd_rkb_dbg(rkb, BROKER, "CONNECT", rd_rkb_dbg(rkb, BROKER, "CONNECT",
@ -3219,6 +3226,10 @@ static void *rd_kafka_broker_thread_main (void *arg) {
while (!rkb->rkb_rk->rk_terminate) { while (!rkb->rkb_rk->rk_terminate) {
switch (rkb->rkb_state) switch (rkb->rkb_state)
{ {
case RD_KAFKA_BROKER_STATE_INIT:
/* The INIT state exists so that an initial connection
* failure triggers a state transition which might
* trigger a ALL_BROKERS_DOWN error. */
case RD_KAFKA_BROKER_STATE_DOWN: case RD_KAFKA_BROKER_STATE_DOWN:
/* ..connect() will block until done (or failure) */ /* ..connect() will block until done (or failure) */
if (rd_kafka_broker_connect(rkb) == -1) { if (rd_kafka_broker_connect(rkb) == -1) {
@ -3233,9 +3244,6 @@ static void *rd_kafka_broker_thread_main (void *arg) {
} }
break; break;
case RD_KAFKA_BROKER_STATE_CONNECTING:
break;
case RD_KAFKA_BROKER_STATE_UP: case RD_KAFKA_BROKER_STATE_UP:
if (rkb->rkb_nodeid == RD_KAFKA_NODEID_UA) if (rkb->rkb_nodeid == RD_KAFKA_NODEID_UA)
rd_kafka_broker_ua_idle(rkb); rd_kafka_broker_ua_idle(rkb);
@ -3250,6 +3258,7 @@ static void *rd_kafka_broker_thread_main (void *arg) {
rd_kafka_lock(rkb->rkb_rk); rd_kafka_lock(rkb->rkb_rk);
TAILQ_REMOVE(&rkb->rkb_rk->rk_brokers, rkb, rkb_link); TAILQ_REMOVE(&rkb->rkb_rk->rk_brokers, rkb, rkb_link);
rd_atomic_sub(&rkb->rkb_rk->rk_broker_cnt, 1);
rd_kafka_unlock(rkb->rkb_rk); rd_kafka_unlock(rkb->rkb_rk);
rd_kafka_broker_fail(rkb, RD_KAFKA_RESP_ERR__DESTROY, NULL); rd_kafka_broker_fail(rkb, RD_KAFKA_RESP_ERR__DESTROY, NULL);
rd_kafka_broker_destroy(rkb); rd_kafka_broker_destroy(rkb);
@ -3354,6 +3363,7 @@ static rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
} }
TAILQ_INSERT_TAIL(&rkb->rkb_rk->rk_brokers, rkb, rkb_link); TAILQ_INSERT_TAIL(&rkb->rkb_rk->rk_brokers, rkb, rkb_link);
rd_atomic_add(&rkb->rkb_rk->rk_broker_cnt, 1);
rd_rkb_dbg(rkb, BROKER, "BROKER", rd_rkb_dbg(rkb, BROKER, "BROKER",
"Added new broker with NodeId %"PRId32, "Added new broker with NodeId %"PRId32,

Просмотреть файл

@ -415,8 +415,8 @@ typedef struct rd_kafka_broker_s {
pthread_rwlock_unlock(&(rkb)->rkb_toppar_lock) pthread_rwlock_unlock(&(rkb)->rkb_toppar_lock)
enum { enum {
RD_KAFKA_BROKER_STATE_INIT,
RD_KAFKA_BROKER_STATE_DOWN, RD_KAFKA_BROKER_STATE_DOWN,
RD_KAFKA_BROKER_STATE_CONNECTING,
RD_KAFKA_BROKER_STATE_UP, RD_KAFKA_BROKER_STATE_UP,
} rkb_state; } rkb_state;
@ -586,6 +586,8 @@ struct rd_kafka_s {
rd_kafka_q_t rk_rep; /* kafka -> application reply queue */ rd_kafka_q_t rk_rep; /* kafka -> application reply queue */
TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers; TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers;
int rk_broker_cnt; /* atomic */
int rk_broker_down_cnt; /* atomic */
TAILQ_HEAD(, rd_kafka_topic_s) rk_topics; TAILQ_HEAD(, rd_kafka_topic_s) rk_topics;
int rk_topic_cnt; int rk_topic_cnt;