From a28689cff31f25a1a4b95101a37577f33bbbd666 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Fri, 6 Sep 2013 00:00:57 +0200 Subject: [PATCH] Added snappy and gzip compression support to the producer --- README.md | 2 +- examples/rdkafka_example.c | 13 +- examples/rdkafka_performance.c | 27 +++- rdkafka_broker.c | 224 +++++++++++++++++++++++++++++++-- rdkafka_int.h | 5 +- 5 files changed, 248 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index df372ac4..43af4fbf 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ reliability in mind. * Branch: master * Producer: supported * Consumer: not yet implemented - * Compression: work in progress + * Compression: snappy and gzip * Debian package: work in progress (separate "debian" branch) * ZooKeeper: not supported * API: not backwards compatible diff --git a/examples/rdkafka_example.c b/examples/rdkafka_example.c index eae33e27..cf723124 100644 --- a/examples/rdkafka_example.c +++ b/examples/rdkafka_example.c @@ -136,7 +136,7 @@ int main (int argc, char **argv) { rd_kafka_topic_defaultconf_set(&topic_conf); - while ((opt = getopt(argc, argv, "PCt:p:b:d:")) != -1) { + while ((opt = getopt(argc, argv, "PCt:p:b:z:d:")) != -1) { switch (opt) { case 'P': case 'C': @@ -151,6 +151,15 @@ int main (int argc, char **argv) { case 'b': brokers = optarg; break; + case 'z': + if (rd_kafka_conf_set(&conf, "compression.codec", + optarg, + errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) { + fprintf(stderr, "%% %s\n", errstr); + exit(1); + } + break; case 'd': debug = optarg; break; @@ -170,6 +179,8 @@ int main (int argc, char **argv) { " -t Topic to fetch / produce\n" " -p Partition (random partitioner)\n" " -b Broker address (localhost:9092)\n" + " -z Enable compression:\n" + " none|gzip|snappy\n" " -d [facs..] 
Enable debugging contexts:\n" " %s\n" "\n" diff --git a/examples/rdkafka_performance.c b/examples/rdkafka_performance.c index ddade272..53dc6fe2 100644 --- a/examples/rdkafka_performance.c +++ b/examples/rdkafka_performance.c @@ -149,7 +149,12 @@ int main (int argc, char **argv) { int seed = time(NULL); rd_kafka_conf_t conf; rd_kafka_topic_conf_t topic_conf; - + static const char *compression_names[] = { + "no", + "gzip", + "snappy" + }; + /* Kafka configuration * Base configuration on the default config. */ rd_kafka_defaultconf_set(&conf); @@ -166,7 +171,7 @@ int main (int argc, char **argv) { topic_conf.message_timeout_ms = 5000; while ((opt = getopt(argc, argv, - "PCt:p:b:s:k:c:fi:Dd:m:S:x:R:a:")) != -1) { + "PCt:p:b:s:k:c:fi:Dd:m:S:x:R:a:z:")) != -1) { switch (opt) { case 'P': case 'C': @@ -212,6 +217,15 @@ int main (int argc, char **argv) { case 'a': topic_conf.required_acks = atoi(optarg); break; + case 'z': + if (rd_kafka_conf_set(&conf, "compression.codec", + optarg, + errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) { + fprintf(stderr, "%% %s\n", errstr); + exit(1); + } + break; case 'd': debug = optarg; break; @@ -242,6 +256,8 @@ int main (int argc, char **argv) { " -R Random seed value (defaults to time)\n" " -a Required acks (producer): " "-1, 0, 1, >1\n" + " -z Enable compression:\n" + " none|gzip|snappy\n" " -d [facs..] 
Enable debugging contexts:\n" " %s\n" "\n" @@ -537,14 +553,15 @@ int main (int argc, char **argv) { printf("Result:\n"); if (cnt.t_total > 0) { printf("%% %"PRIu64" messages and %"PRIu64" bytes " - "%s in %"PRIu64"ms: %"PRIu64" msgs/s and %.02f Mb, " - "%i messages failed\n", + "%s in %"PRIu64"ms: %"PRIu64" msgs/s and %.02f Mb/s, " + "%i messages failed, %s compression\n", cnt.msgs, cnt.bytes, dirstr, cnt.t_total / 1000, ((cnt.msgs * 1000000) / cnt.t_total), (float)((cnt.bytes) / (float)cnt.t_total), - msgs_failed); + msgs_failed, + compression_names[conf.producer.compression_codec]); } if (cnt.t_latency) diff --git a/rdkafka_broker.c b/rdkafka_broker.c index 57da427f..0111a7b5 100644 --- a/rdkafka_broker.c +++ b/rdkafka_broker.c @@ -36,6 +36,8 @@ #include #include +#include + #include "rd.h" #include "rdkafka_int.h" #include "rdkafka_msg.h" @@ -45,7 +47,7 @@ #include "rdthread.h" #include "rdcrc32.h" #include "rdrand.h" - +#include "snappy.h" const char *rd_kafka_broker_state_names[] = { "DOWN", @@ -120,21 +122,26 @@ static void rd_kafka_buf_destroy (rd_kafka_buf_t *rkbuf) { free(rkbuf); } -static struct iovec *rd_kafka_buf_iov_next (rd_kafka_buf_t *rkbuf) { - - assert(rkbuf->rkbuf_msg.msg_iovlen + 1 <= rkbuf->rkbuf_iovcnt); - if (rkbuf->rkbuf_msg.msg_iovlen > 0) - rkbuf->rkbuf_iovof += - rkbuf->rkbuf_iov[rkbuf->rkbuf_msg.msg_iovlen].iov_len; - return &rkbuf->rkbuf_iov[rkbuf->rkbuf_msg.msg_iovlen++]; +static void rd_kafka_buf_auxbuf_add (rd_kafka_buf_t *rkbuf, void *auxbuf) { + assert(rkbuf->rkbuf_buf2 == NULL); + rkbuf->rkbuf_buf2 = auxbuf; +} + +static void rd_kafka_buf_rewind (rd_kafka_buf_t *rkbuf, int iovindex) { + rkbuf->rkbuf_msg.msg_iovlen = iovindex; } +static struct iovec *rd_kafka_buf_iov_next (rd_kafka_buf_t *rkbuf) { + + assert(rkbuf->rkbuf_msg.msg_iovlen + 1 <= rkbuf->rkbuf_iovcnt); + return &rkbuf->rkbuf_iov[rkbuf->rkbuf_msg.msg_iovlen++]; +} + /** * Pushes 'buf' & 'len' onto the previously allocated iov stack for 'rkbuf'. 
- * Returns the offset for the beginning of the pushed iovec. */ -static int rd_kafka_buf_push (rd_kafka_buf_t *rkbuf, int flags, +static void rd_kafka_buf_push (rd_kafka_buf_t *rkbuf, int flags, void *buf, size_t len) { struct iovec *iov; @@ -143,8 +150,6 @@ static int rd_kafka_buf_push (rd_kafka_buf_t *rkbuf, int flags, /* FIXME: store and adhere to flags */ iov->iov_base = buf; iov->iov_len = len; - - return rkbuf->rkbuf_iovof; } @@ -1427,7 +1432,7 @@ static int rd_kafka_broker_send_toppar (rd_kafka_broker_t *rkb, } __attribute__((packed)) part4; } *msghdr; int iovcnt; - + int iov_firstmsg; /* iovs: * 1 * RequiredAcks + Timeout (part1) @@ -1494,6 +1499,8 @@ static int rd_kafka_broker_send_toppar (rd_kafka_broker_t *rkb, rd_kafka_buf_push(rkbuf, RD_KAFKA_OP_F_NOCOPY, &prodhdr->part2, sizeof(prodhdr->part2)); + iov_firstmsg = rkbuf->rkbuf_msg.msg_iovlen; + while (msgcnt > 0 && (rkm = TAILQ_FIRST(&rktp->rktp_xmit_msgq.rkmq_msgs))) { @@ -1590,6 +1597,197 @@ static int rd_kafka_broker_send_toppar (rd_kafka_broker_t *rkb, msghdr++; } + /* Compress the messages */ + if (rkb->rkb_rk->rk_conf.producer.compression_codec) { + int siovlen = 1; + size_t coutlen; + int r; + struct { + int64_t Offset; + int32_t MessageSize; + int32_t Crc; + int8_t MagicByte; + int8_t Attributes; + int32_t Key_len; /* -1 */ + int32_t Value_len; + } RD_PACKED *msghdr2 = NULL; + int32_t ctotlen; + struct snappy_env senv; + struct iovec siov; + z_stream strm; + int i; + + switch (rkb->rkb_rk->rk_conf.producer.compression_codec) { + case RD_KAFKA_COMPRESSION_NONE: + assert(!*"unreachable"); + break; + case RD_KAFKA_COMPRESSION_GZIP: + memset(&strm, 0, sizeof(strm)); + /* Initialize gzip compression */ + r = deflateInit2(&strm, Z_DEFAULT_COMPRESSION, + Z_DEFLATED, 15+16, + 8, Z_DEFAULT_STRATEGY); + if (r != Z_OK) { + rd_rkb_log(rkb, LOG_ERR, "SNAPPY", + "Failed to initialize gzip for " + "compressing %"PRId32" bytes in " + "topic %.*s [%"PRId32"]: %s (%i): " + "sending uncompressed", + 
prodhdr->part2.MessageSetSize,
+					   RD_KAFKAP_STR_PR(rktp->rktp_rkt->
+							    rkt_topic),
+					   rktp->rktp_partition,
+					   strm.msg ? : "", r);
+				goto do_send;
+			}
+
+			/* Calculate maximum compressed size and
+			 * allocate an output buffer accordingly, being
+			 * prefixed with the Message header. */
+			siov.iov_len = deflateBound(&strm,
+						    prodhdr->part2.
+						    MessageSetSize);
+			msghdr2 = malloc(sizeof(*msghdr2) + siov.iov_len);
+			siov.iov_base = (void *)(msghdr2+1);
+
+			strm.next_out = siov.iov_base;
+			strm.avail_out = siov.iov_len;
+
+			/* Iterate through each message and compress it. */
+			for (i = iov_firstmsg ;
+			     i < rkbuf->rkbuf_msg.msg_iovlen ; i++) {
+				strm.next_in = rkbuf->rkbuf_msg.
+					msg_iov[i].iov_base;
+				strm.avail_in = rkbuf->rkbuf_msg.
+					msg_iov[i].iov_len;
+
+				/* Compress message */
+				if ((r = deflate(&strm, Z_NO_FLUSH)) != Z_OK) {
+					rd_rkb_log(rkb, LOG_ERR, "GZIP",
+						   "Failed to gzip-compress "
+						   "%zu bytes for "
+						   "topic %.*s [%"PRId32"]: "
+						   "%s (%i): "
+						   "sending uncompressed",
+						   rkbuf->rkbuf_msg.msg_iov[i].
+						   iov_len,
+						   RD_KAFKAP_STR_PR(rktp->
+								    rktp_rkt->
+								    rkt_topic),
+						   rktp->rktp_partition,
+						   strm.msg ? : "", r);
+					deflateEnd(&strm);
+					free(msghdr2);
+					goto do_send;
+				}
+
+				assert(strm.avail_in == 0);
+			}
+
+			/* Finish the compression */
+			if ((r = deflate(&strm, Z_FINISH)) != Z_STREAM_END) {
+				rd_rkb_log(rkb, LOG_ERR, "GZIP",
+					   "Failed to finish gzip compression "
+					   "of %"PRId32" bytes for "
+					   "topic %.*s [%"PRId32"]: "
+					   "%s (%i): "
+					   "sending uncompressed",
+					   prodhdr->part2.MessageSetSize,
+					   RD_KAFKAP_STR_PR(rktp->rktp_rkt->
+							    rkt_topic),
+					   rktp->rktp_partition,
+					   strm.msg ?
: "", r);
+				deflateEnd(&strm);
+				free(msghdr2);
+				goto do_send;
+			}
+
+			coutlen = strm.total_out;
+
+			/* Deinitialize compression */
+			deflateEnd(&strm);
+			break;
+
+		case RD_KAFKA_COMPRESSION_SNAPPY:
+			/* Initialize snappy compression environment */
+			snappy_init_env_sg(&senv, 1/*iov enable*/);
+
+			/* Calculate maximum compressed size and
+			 * allocate an output buffer accordingly, being
+			 * prefixed with the Message header. */
+			siov.iov_len =
+				snappy_max_compressed_length(prodhdr->part2.
+							     MessageSetSize);
+			msghdr2 = malloc(sizeof(*msghdr2) + siov.iov_len);
+			siov.iov_base = (void *)(msghdr2+1);
+
+			/* Compress each message */
+			if ((r = snappy_compress_iov(&senv,
+						     &rkbuf->
+						     rkbuf_iov[iov_firstmsg],
+						     rkbuf->rkbuf_msg.
+						     msg_iovlen -
+						     iov_firstmsg,
+						     prodhdr->part2.
+						     MessageSetSize,
+						     &siov, &siovlen,
+						     &coutlen)) != 0) {
+				rd_rkb_log(rkb, LOG_ERR, "SNAPPY",
+					   "Failed to snappy-compress "
+					   "%"PRId32" bytes for "
+					   "topic %.*s [%"PRId32"]: %s: "
+					   "sending uncompressed",
+					   prodhdr->part2.MessageSetSize,
+					   RD_KAFKAP_STR_PR(rktp->rktp_rkt->
+							    rkt_topic),
+					   rktp->rktp_partition,
+					   strerror(-r));
+				snappy_free_env(&senv);
+				free(msghdr2);
+				goto do_send;
+			}
+
+			/* Free snappy environment */
+			snappy_free_env(&senv);
+
+		}
+
+		/* Create a new Message whose Value is the compressed data */
+		ctotlen = sizeof(*msghdr2) + coutlen;
+		msghdr2->Offset = 0;
+		msghdr2->MessageSize = htonl(4+1+1+4+4 + coutlen);
+		msghdr2->MagicByte = 0;
+		msghdr2->Attributes = rkb->rkb_rk->rk_conf.
+			producer.compression_codec & 0x7;
+		msghdr2->Key_len = htonl(-1);
+		msghdr2->Value_len = htonl(coutlen);
+		msghdr2->Crc = rd_crc32_init();
+		msghdr2->Crc = rd_crc32_update(msghdr2->Crc,
+					       (void *)&msghdr2->MagicByte,
+					       1+1+4+4);
+		msghdr2->Crc = rd_crc32_update(msghdr2->Crc,
+					       siov.iov_base, coutlen);
+		msghdr2->Crc = htonl(rd_crc32_finalize(msghdr2->Crc));
+
+		/* Update enveloping MessageSet's length. */
+		prodhdr->part2.MessageSetSize = ctotlen;
+
+		/* Rewind rkbuf to the pre-message checkpoint.
+ * This replaces all the original Messages with just the + * Message containing the compressed payload. */ + rd_kafka_buf_rewind(rkbuf, iov_firstmsg); + + /* Add allocated buffer as auxbuf to rkbuf so that + * it will get freed with the rkbuf */ + rd_kafka_buf_auxbuf_add(rkbuf, msghdr2); + + /* Push the new Message onto the buffer stack. */ + rd_kafka_buf_push(rkbuf, RD_KAFKA_OP_F_NOCOPY, + msghdr2, ctotlen); + } + +do_send: + rktp->rktp_c.tx_msgs += rkbuf->rkbuf_msgq.rkmq_msg_cnt; rktp->rktp_c.tx_bytes += prodhdr->part2.MessageSetSize; diff --git a/rdkafka_int.h b/rdkafka_int.h index 7a5434d0..9fb2b1ce 100644 --- a/rdkafka_int.h +++ b/rdkafka_int.h @@ -150,13 +150,12 @@ typedef struct rd_kafka_buf_s { struct msghdr rkbuf_msg; struct iovec *rkbuf_iov; int rkbuf_iovcnt; - size_t rkbuf_iovof; /* byte offset for beginning of current iov*/ size_t rkbuf_of; /* recv/send: byte offset */ size_t rkbuf_len; /* send: total length */ size_t rkbuf_size; /* allocated size */ - char *rkbuf_buf; - char *rkbuf_buf2; + char *rkbuf_buf; /* Main buffer */ + char *rkbuf_buf2; /* Aux buffer */ struct rd_kafkap_reqhdr rkbuf_reqhdr; struct rd_kafkap_reshdr rkbuf_reshdr;