Added rd_kafka_errno2err() for converting errno to proper rdkafka errors (issue #39)
This also changes the behaviour of rd_kafka_consume*() to set errno to ESRCH for unknown partitions (was ENOENT) to be in line with rd_kafka_produce() behaviour.
This commit is contained in:
Родитель
15f810ebc2
Коммит
7e0f492b82
37
rdkafka.c
37
rdkafka.c
|
@ -489,6 +489,12 @@ const char *rd_kafka_err2str (rd_kafka_resp_err_t err) {
|
|||
return "Local: Unknown topic";
|
||||
case RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN:
|
||||
return "Local: All broker connections are down";
|
||||
case RD_KAFKA_RESP_ERR__INVALID_ARG:
|
||||
return "Local: Invalid argument or configuration";
|
||||
case RD_KAFKA_RESP_ERR__TIMED_OUT:
|
||||
return "Local: Timed out";
|
||||
case RD_KAFKA_RESP_ERR__QUEUE_FULL:
|
||||
return "Local: Queue full";
|
||||
|
||||
case RD_KAFKA_RESP_ERR_UNKNOWN:
|
||||
return "Unknown error";
|
||||
|
@ -525,8 +531,31 @@ const char *rd_kafka_err2str (rd_kafka_resp_err_t err) {
|
|||
}
|
||||
|
||||
|
||||
rd_kafka_resp_err_t rd_kafka_errno2err (int errnox) {
|
||||
switch (errnox)
|
||||
{
|
||||
case EINVAL:
|
||||
return RD_KAFKA_RESP_ERR__INVALID_ARG;
|
||||
|
||||
case ENOENT:
|
||||
return RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
|
||||
|
||||
case ESRCH:
|
||||
return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
|
||||
|
||||
case ETIMEDOUT:
|
||||
return RD_KAFKA_RESP_ERR__TIMED_OUT;
|
||||
|
||||
case EMSGSIZE:
|
||||
return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
|
||||
|
||||
case ENOBUFS:
|
||||
return RD_KAFKA_RESP_ERR__QUEUE_FULL;
|
||||
|
||||
default:
|
||||
return RD_KAFKA_RESP_ERR__FAIL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void rd_kafka_destroy0 (rd_kafka_t *rk) {
|
||||
|
@ -974,7 +1003,7 @@ int rd_kafka_consume_stop (rd_kafka_topic_t *rkt, int32_t partition) {
|
|||
if (!(rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
|
||||
!(rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
|
||||
rd_kafka_topic_unlock(rkt);
|
||||
errno = ENOENT;
|
||||
errno = ESRCH;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1057,7 +1086,7 @@ ssize_t rd_kafka_consume_batch (rd_kafka_topic_t *rkt, int32_t partition,
|
|||
|
||||
if (unlikely(!rktp)) {
|
||||
/* No such toppar known */
|
||||
errno = ENOENT;
|
||||
errno = ESRCH;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1141,7 +1170,7 @@ int rd_kafka_consume_callback (rd_kafka_topic_t *rkt, int32_t partition,
|
|||
|
||||
if (unlikely(!rktp)) {
|
||||
/* No such toppar known */
|
||||
errno = ENOENT;
|
||||
errno = ESRCH;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1171,7 +1200,7 @@ rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *rkt, int32_t partition,
|
|||
|
||||
if (unlikely(!rktp)) {
|
||||
/* No such toppar known */
|
||||
errno = ENOENT;
|
||||
errno = ESRCH;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
25
rdkafka.h
25
rdkafka.h
|
@ -116,7 +116,10 @@ typedef enum {
|
|||
* in cluster. */
|
||||
RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN = -187, /* All broker connections
|
||||
* are down. */
|
||||
|
||||
RD_KAFKA_RESP_ERR__INVALID_ARG = -186, /* Invalid argument, or
|
||||
* invalid configuration */
|
||||
RD_KAFKA_RESP_ERR__TIMED_OUT = -185, /* Operation timed out */
|
||||
RD_KAFKA_RESP_ERR__QUEUE_FULL = -184, /* Queue is full */
|
||||
RD_KAFKA_RESP_ERR__END = -100, /* end internal error codes */
|
||||
|
||||
/* Standard Kafka errors: */
|
||||
|
@ -143,7 +146,18 @@ typedef enum {
|
|||
const char *rd_kafka_err2str (rd_kafka_resp_err_t err);
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Converts `errno` to a `rd_kafka_resp_err_t` error code
|
||||
* upon failure from the following functions:
|
||||
* - rd_kafka_topic_new()
|
||||
* - rd_kafka_consume_start()
|
||||
* - rd_kafka_consume_stop()
|
||||
* - rd_kafka_consume()
|
||||
* - rd_kafka_consume_batch()
|
||||
* - rd_kafka_consume_callback()
|
||||
* - rd_kafka_produce()
|
||||
*/
|
||||
rd_kafka_resp_err_t rd_kafka_errno2err (int errnox);
|
||||
|
||||
|
||||
|
||||
|
@ -738,11 +752,16 @@ rd_kafka_resp_err_t rd_kafka_offset_store (rd_kafka_topic_t *rkt,
|
|||
* Returns 0 on success or -1 on error in which case errno is set accordingly:
|
||||
* ENOBUFS - maximum number of outstanding messages has been reached:
|
||||
* "queue.buffering.max.message"
|
||||
* (RD_KAFKA_RESP_ERR__QUEUE_FULL)
|
||||
* EMSGSIZE - message is larger than configured max size:
|
||||
* "messages.max.bytes".
|
||||
* ENOENT - topic is unknown in the Kafka cluster.
|
||||
* (RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE)
|
||||
* ESRCH - requested 'partition' is unknown in the Kafka cluster.
|
||||
* (RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
|
||||
* ENOENT - topic is unknown in the Kafka cluster.
|
||||
* (RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
|
||||
*
|
||||
* NOTE: Use `rd_kafka_errno2err()` to convert `errno` to rdkafka error code.
|
||||
*/
|
||||
|
||||
#define RD_KAFKA_MSG_F_FREE 0x1 /* Delegate freeing of payload to rdkafka. */
|
||||
|
|
Загрузка…
Ссылка в новой задаче