Log OP_ERR right away, without need for poll, if no error_cb is set.

This commit is contained in:
Magnus Edenhill 2014-01-15 21:34:07 +07:00
Родитель 43fa3adf18
Коммит 1de5ed3c20
3 изменённых файлов: 41 добавлений и 10 удалений

Просмотреть файл

@ -99,6 +99,15 @@ static int pthread_cond_timedwait_ms (pthread_cond_t *cond,
}
void rd_kafka_log_buf (const rd_kafka_t *rk, int level,
const char *fac, const char *buf) {
if (!rk->rk_log_cb || level > rk->rk_log_level)
return;
rk->rk_log_cb(rk, level, fac, buf);
}
void rd_kafka_log0 (const rd_kafka_t *rk, const char *extra, int level,
const char *fac, const char *fmt, ...) {
char buf[2048];
@ -419,6 +428,27 @@ void rd_kafka_op_reply2 (rd_kafka_t *rk, rd_kafka_op_t *rko) {
}
/**
* Propogate an error event to the application.
* If no error_cb has been set by the application the error will
* be logged instead.
*/
void rd_kafka_op_err (rd_kafka_t *rk, rd_kafka_resp_err_t err,
const char *fmt, ...) {
va_list ap;
char buf[2048];
va_start(ap, fmt);
vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);
if (rk->rk_conf.error_cb)
rd_kafka_op_reply(rk, RD_KAFKA_OP_ERR, err,
strdup(buf), strlen(buf));
else
rd_kafka_log_buf(rk, LOG_ERR, "ERROR", buf);
}
static const char *rd_kafka_type2str (rd_kafka_type_t type) {
static const char *types[] = {

Просмотреть файл

@ -340,10 +340,8 @@ static void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
rd_rkb_log(rkb, LOG_ERR, "FAIL", "%s", rkb->rkb_err.msg);
/* Send ERR op back to application for processing. */
rd_kafka_op_reply(rkb->rkb_rk, RD_KAFKA_OP_ERR,
RD_KAFKA_RESP_ERR__FAIL,
strdup(rkb->rkb_err.msg),
strlen(rkb->rkb_err.msg));
rd_kafka_op_err(rkb->rkb_rk, RD_KAFKA_RESP_ERR__FAIL,
"%s", rkb->rkb_err.msg);
}
/*
@ -444,9 +442,8 @@ static int rd_kafka_broker_resolve (rd_kafka_broker_t *rkb) {
rkb->rkb_nodename, errstr);
/* Send ERR op back to application for processing. */
rd_kafka_op_reply(rkb->rkb_rk, RD_KAFKA_OP_ERR,
RD_KAFKA_RESP_ERR__RESOLVE,
strdup(tmp), strlen(tmp));
rd_kafka_op_err(rkb->rkb_rk,RD_KAFKA_RESP_ERR__RESOLVE,
"%s", tmp);
rd_rkb_log(rkb, LOG_ERR, "GETADDR", "%s", tmp);
return -1;
@ -3337,9 +3334,9 @@ static rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
rd_kafka_log(rk, LOG_CRIT, "THREAD", "%s", tmp);
/* Send ERR op back to application for processing. */
rd_kafka_op_reply(rk, RD_KAFKA_OP_ERR,
RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
strdup(tmp), strlen(tmp));
rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
"%s", tmp);
free(rkb);
rd_kafka_destroy(rk);
return NULL;

Просмотреть файл

@ -634,6 +634,8 @@ struct rd_kafka_s {
#define RD_KAFKA_DBG_ALL 0xff
void rd_kafka_log_buf (const rd_kafka_t *rk, int level,
const char *fac, const char *buf);
void rd_kafka_log0 (const rd_kafka_t *rk, const char *extra, int level,
const char *fac, const char *fmt, ...)
__attribute__((format (printf, 5, 6)));
@ -707,6 +709,8 @@ void rd_kafka_op_reply (rd_kafka_t *rk,
rd_kafka_op_type_t type,
rd_kafka_resp_err_t err,
void *payload, int len);
void rd_kafka_op_err (rd_kafka_t *rk, rd_kafka_resp_err_t err,
const char *fmt, ...);
#define rd_kafka_keep(rk) (void)rd_atomic_add(&(rk)->rk_refcnt, 1)
void rd_kafka_destroy0 (rd_kafka_t *rk);