Magnus Edenhill 2013-06-03 09:00:14 +02:00
Parents 2103defc23 c4e2479e79
Commit 52440eaeee
8 changed files: 328 additions and 14 deletions

Makefile

@@ -23,7 +23,7 @@ ifndef WITH_LIBRD
 SRCS+=rdcrc32.c rdgz.c rdaddr.c rdrand.c rdfile.c
 endif
-HDRS= rdkafka.h
+HDRS= rdkafka.h rdkafkacpp.h rdtypes.h rd.h rdaddr.h
 OBJS= $(SRCS:.c=.o)
 DEPS= ${OBJS:%.o=%.d}

examples/.gitignore vendored

@@ -1,3 +1,4 @@
 rdkafka_example
 rdkafka_performance
+rdkafka_example_cpp

examples/Makefile

@@ -1,6 +1,8 @@
 CC ?= cc
+CXX ?= g++
 CFLAGS += -g
 CFLAGS += -Wall -Werror -Wfloat-equal -Wpointer-arith -O2 -I../
+CXXFLAGS += $(CFLAGS)
 LDFLAGS += ../librdkafka.a
 LDFLAGS += -lpthread -lrt -lz
@@ -11,6 +13,7 @@ LDFLAGS += -lpthread -lrt -lz
 all:
 	@echo "# Examples are built individually, i.e.:"
 	@echo "  make rdkafka_example"
+	@echo "  make rdkafka_example_cpp"
 	@echo "  make rdkafka_performance"
@@ -29,6 +32,18 @@ rdkafka_example: rdkafka_example.c
 	@echo "# More usage options:"
 	@echo "./rdkafka_example --help"

+rdkafka_example_cpp: rdkafka_example.cpp
+	@(test $@ -nt $< || \
+	  $(CXX) $(CXXFLAGS) $< -o $@ $(LDFLAGS))
+	@echo "# $@ is ready"
+	@echo "#"
+	@echo "# Run producer (write messages on stdin)"
+	@echo "./$@ -P -t <topic> -p <partition>"
+	@echo ""
+	@echo "#"
+	@echo "# More usage options:"
+	@echo "./$@ --help"
+
 rdkafka_performance: rdkafka_performance.c
 	@(test $@ -nt $< || \

examples/rdkafka_example.cpp Normal file

@@ -0,0 +1,208 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Apache Kafka C++ consumer & producer example programs
* using the C++ Kafka driver wrapper from librdkafka
* (https://github.com/edenhill/librdkafka)
*/
#include <ctype.h>
#include <signal.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "../c++/rdkafka.h" /* for Kafka driver */
using rdkafka::Kafka;

static int run = 1;

static void stop (int sig) {
	run = 0;
}
int main (int argc, char **argv) {
	Kafka kafka;
	char *broker = NULL;
	char mode = 'C';
	char *topic = NULL;
	int partition = 0;
	int opt;

	while ((opt = getopt(argc, argv, "Pt:p:b:")) != -1) {
		switch (opt) {
		case 'P':
			mode = opt;
			break;
		case 't':
			topic = optarg;
			break;
		case 'p':
			partition = atoi(optarg);
			break;
		case 'b':
			broker = optarg;
			break;
		default:
			goto usage;
		}
	}

	if (!topic || optind != argc) {
	usage:
		fprintf(stderr,
			"Usage: %s "/*[-C|*/"-P] -t <topic> "
			"[-p <partition>] [-b <broker>]\n"
			"\n"
			" Options:\n"
			//"  -C | -P      Consumer or Producer mode\n"
			"  -P           Producer mode\n"
			"  -t <topic>   Topic to fetch / produce\n"
			"  -p <num>     Partition (defaults to 0)\n"
			"  -b <broker>  Broker address (localhost:9092)\n"
			"\n"
			//" In Consumer mode:\n"
			//"  writes fetched messages to stdout\n"
			" In Producer mode:\n"
			"  reads messages from stdin and sends to broker\n"
			"\n",
			argv[0]);
		exit(1);
	}

	signal(SIGINT, stop);

	/* Socket hangups are gracefully handled in librdkafka on socket error
	 * without the use of signals, so SIGPIPE should be ignored by the
	 * calling program. */
	signal(SIGPIPE, SIG_IGN);

	if (mode == 'P') {
		/*
		 * Producer
		 */
		char buf[2048];
		int sendcnt = 0;

		/* Create Kafka handle */
		if (!(kafka.setHandle(RD_KAFKA_PRODUCER, broker, NULL))) {
			perror("kafka_new producer");
			exit(1);
		}

		fprintf(stderr, "%% Type stuff and hit enter to send\n");
		while (run && (fgets(buf, sizeof(buf), stdin))) {
			int len = strlen(buf);
			char *opbuf = (char *)malloc(len + 1);
			strncpy(opbuf, buf, len + 1);

			/* Send/Produce message. */
			kafka.produce(topic, partition, RD_KAFKA_OP_F_FREE, opbuf, len);
			fprintf(stderr, "%% Sent %i bytes to topic "
				"%s partition %i\n", len, topic, partition);
			sendcnt++;
		}
	}
#if 0 // Still not implemented
	else if (mode == 'C') {
		/*
		 * Consumer
		 */
		rd_kafka_op_t *rko;
		/* Base our configuration on the default config. */
		rd_kafka_conf_t conf = rd_kafka_defaultconf;

		/* The offset storage file is optional but its presence
		 * avoids starting all over from offset 0 again when
		 * the program restarts.
		 * ZooKeeper functionality will be implemented in future
		 * versions and then the offset will be stored there instead. */
		conf.consumer.offset_file = "."; /* current directory */

		/* Indicate to rdkafka that the application is responsible
		 * for storing the offset. This allows the application to
		 * successfully handle a message before storing the offset.
		 * If this flag is not set rdkafka will store the offset
		 * just prior to returning the message from rd_kafka_consume().
		 */
		conf.flags |= RD_KAFKA_CONF_F_APP_OFFSET_STORE;

		/* Use the consumer convenience function
		 * to create a Kafka handle. */
		if (!(rk = rd_kafka_new_consumer(broker, topic,
						 (uint32_t)partition,
						 0, &conf))) {
			perror("kafka_new_consumer");
			exit(1);
		}

		while (run) {
			/* Fetch an "op" which is one of:
			 *  - a kafka message (if rko_len>0 && rko_err==0)
			 *  - an error (if rko_err)
			 */
			if (!(rko = rd_kafka_consume(rk, 1000/*timeout ms*/)))
				continue;

			if (rko->rko_err)
				fprintf(stderr, "%% Error: %.*s\n",
					rko->rko_len, rko->rko_payload);
			else if (rko->rko_len) {
				fprintf(stderr, "%% Message with "
					"next-offset %"PRIu64" is %i bytes\n",
					rko->rko_offset, rko->rko_len);
				hexdump(stdout, "Message",
					rko->rko_payload, rko->rko_len);
			}

			/* rko_offset contains the offset of the _next_
			 * message. We store it when we're done processing
			 * the current message. */
			if (rko->rko_offset)
				rd_kafka_offset_store(rk, rko->rko_offset);

			/* Destroy the op */
			rd_kafka_op_destroy(rk, rko);
		}

		/* Destroy the handle */
		rd_kafka_destroy(rk);
	}
#endif

	return 0;
}

rdaddr.h

@@ -163,14 +163,14 @@ void rd_sockaddr_list_destroy (rd_sockaddr_list_t *rsal);
  */
 static const char *rd_family2str (int af) RD_UNUSED;
 static const char *rd_family2str (int af) {
-	static const char *names[] = {
-		[AF_LOCAL] = "local",
-		[AF_INET] = "inet",
-		[AF_INET6] = "inet6",
+	switch(af){
+	case AF_LOCAL:
+		return "local";
+	case AF_INET:
+		return "inet";
+	case AF_INET6:
+		return "inet6";
+	default:
+		return "af?";
 	};
-
-	if (unlikely(af < 0 || af >= RD_ARRAYSIZE(names) || !names[af]))
-		return "af?";
-
-	return names[af];
 }

rdkafka.c

@@ -1228,11 +1228,17 @@ rd_kafka_op_t *rd_kafka_consume (rd_kafka_t *rk, int timeout_ms) {
  *
  * Locality: application thread
  */
-void rd_kafka_produce (rd_kafka_t *rk, char *topic, uint32_t partition,
+int rd_kafka_produce (rd_kafka_t *rk, char *topic, uint32_t partition,
 		       int msgflags,
 		       char *payload, size_t len) {
 	rd_kafka_op_t *rko;

+	if (rk->rk_conf.producer.max_outq_msg_cnt &&
+	    rk->rk_op.rkq_qlen >= rk->rk_conf.producer.max_outq_msg_cnt) {
+		errno = ENOBUFS;
+		return -1;
+	}
+
 	rko = calloc(1, sizeof(*rko));

 	rko->rko_type = RD_KAFKA_OP_PRODUCE;
@@ -1243,6 +1249,8 @@ void rd_kafka_produce (rd_kafka_t *rk, char *topic, uint32_t partition,
 	rko->rko_len = len;

 	rd_kafka_q_enq(&rk->rk_op, rko);
+
+	return 0;
 }

rdkafka.h

@@ -140,6 +140,16 @@ typedef struct rd_kafka_conf_s {
 	} consumer;

+	struct {
+		int max_outq_msg_cnt;  /* Maximum number of messages allowed
+					* in the output queue.
+					* If this number is exceeded the
+					* rd_kafka_produce() call will
+					* return with -1 and errno
+					* set to ENOBUFS. */
+	} producer;
+
 } rd_kafka_conf_t;
@@ -337,9 +347,15 @@ int rd_kafka_offset_store (rd_kafka_t *rk, uint64_t offset);
  * done with the payload, so the control of freeing the payload must be left
  * to librdkafka as described in alternative 2) above.
  *
+ *
+ * Returns 0 on success or -1 on error (see errno for details)
+ *
+ * errno:
+ *   ENOBUFS - The conf.producer.max_outq_msg_cnt would be exceeded.
+ *
  * Locality: application thread
  */
-void rd_kafka_produce (rd_kafka_t *rk, char *topic, uint32_t partition,
+int rd_kafka_produce (rd_kafka_t *rk, char *topic, uint32_t partition,
 		       int msgflags, char *payload, size_t len);

 /**
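
Not part of the diff above, but to illustrate the new contract: a minimal sketch of a producer that sets conf.producer.max_outq_msg_cnt and retries on the -1/ENOBUFS result. The broker address, topic and queue cap ("localhost:9092", "test", 100000) are placeholders, the include path assumes rdkafka.h is on the include path, and the rd_kafka_defaultconf copy-then-modify pattern is borrowed from the consumer example above.

/* Sketch only: bounded-queue producing with the new
 * rd_kafka_produce() return value. */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "rdkafka.h"

int main (void) {
	/* Copy the default config and cap the output queue. */
	rd_kafka_conf_t conf = rd_kafka_defaultconf;
	conf.producer.max_outq_msg_cnt = 100000;

	rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, "localhost:9092", &conf);
	if (!rk) {
		perror("rd_kafka_new");
		return 1;
	}

	char buf[2048];
	while (fgets(buf, sizeof(buf), stdin)) {
		size_t len = strlen(buf);
		/* librdkafka frees the payload (RD_KAFKA_OP_F_FREE),
		 * so hand it a private copy. */
		char *payload = strdup(buf);

		/* Retry while the output queue is full. */
		while (rd_kafka_produce(rk, (char *)"test", 0,
					RD_KAFKA_OP_F_FREE, payload, len) == -1) {
			if (errno != ENOBUFS) {
				perror("rd_kafka_produce");
				free(payload);
				return 1;
			}
			usleep(10000); /* back off until the queue drains */
		}
	}

	/* Wait for the queue to drain before destroying the handle. */
	while (rd_kafka_outq_len(rk) > 0)
		usleep(50000);
	rd_kafka_destroy(rk);
	return 0;
}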

rdkafkacpp.h Normal file

@@ -0,0 +1,66 @@
// rdkafkacpp.h
#ifndef RD_KAFKACPP_H
#define RD_KAFKACPP_H

#include <signal.h>
#include <unistd.h>

extern "C" {
#include "../rdkafka.h"
}

namespace rdkafka {

class Kafka {
public:
	/// @warning Make sure SIGPIPE is either ignored or handled by the
	///          calling application beforehand.
	///          Make sure to test whether the handle was created
	///          successfully using hasHandle().
	inline Kafka (): rk(NULL) {};
	inline ~Kafka ();

	/** Socket hangups are gracefully handled in librdkafka on socket error
	 * without the use of signals, so SIGPIPE should be ignored by the
	 * calling program. */
	static void ignore_sigpipe () { signal(SIGPIPE, SIG_IGN); }

	bool setHandle (rd_kafka_type_t type, const char *broker,
			const rd_kafka_conf_t *conf);
	bool hasHandle () { return rk != NULL; }

	/*
	void setTopic (const char *newtopic);
	const char *getTopic () const { return topic; }
	*/

	void produce (char *topic, uint32_t partition, int msgflags,
		      char *payload, size_t len);

private:
	rd_kafka_t *rk;
};


Kafka::~Kafka () {
	if (rk) {
		/* Wait for messaging to finish. */
		while (rd_kafka_outq_len(rk) > 0)
			usleep(50000);

		/* Since there is no ack for produce messages in 0.7
		 * we wait some more for any packets to be sent.
		 * This is fixed in protocol version 0.8 */
		usleep(500000);

		/* Destroy the handle */
		rd_kafka_destroy(rk);
	}
}

bool Kafka::setHandle (rd_kafka_type_t type, const char *broker,
		       const rd_kafka_conf_t *conf) {
	return (rk = rd_kafka_new(type, broker, conf)) != NULL;
}

void Kafka::produce (char *topic, uint32_t partition, int msgflags,
		     char *payload, size_t len) {
	rd_kafka_produce(rk, topic, partition, msgflags, payload, len);
}

}

#endif // RD_KAFKACPP_H
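
A minimal usage sketch of the wrapper (not part of the commit): it assumes the header is reachable as "rdkafkacpp.h" on the include path, and the broker address and topic ("localhost:9092", "test") are placeholders. The destructor does the shutdown work, so the producer only needs to create the handle and hand off payloads.

// Sketch only: minimal producer using the rdkafka::Kafka wrapper above.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "rdkafkacpp.h"

int main () {
	rdkafka::Kafka kafka;

	/* The wrapper does not install signal handlers itself, so ignore
	 * SIGPIPE before creating the handle (see the @warning above). */
	rdkafka::Kafka::ignore_sigpipe();

	/* NULL conf means librdkafka's built-in defaults. */
	if (!kafka.setHandle(RD_KAFKA_PRODUCER, "localhost:9092", NULL) ||
	    !kafka.hasHandle()) {
		perror("setHandle");
		return 1;
	}

	/* The payload is heap-allocated and handed off with
	 * RD_KAFKA_OP_F_FREE, so librdkafka frees it after sending. */
	const char *msg = "hello from the C++ wrapper";
	size_t len = strlen(msg);
	char *payload = strdup(msg);
	kafka.produce((char *)"test", 0, RD_KAFKA_OP_F_FREE, payload, len);

	/* ~Kafka() waits for the output queue to drain and then
	 * destroys the handle. */
	return 0;
}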