2018-12-03 00:17:45 +03:00
|
|
|
var kafka = require('kafka-node');
|
|
|
|
const kclient = new kafka.Client("localhost:2181", "my-client-id", {
|
|
|
|
sessionTimeout: 300,
|
|
|
|
spinDelay: 100,
|
|
|
|
retries: 2
|
|
|
|
});
|
|
|
|
|
|
|
|
const producer = new kafka.HighLevelProducer(kclient);
|
|
|
|
producer.on("ready", function() {
|
|
|
|
console.log("Kafka Producer is connected and ready.");
|
|
|
|
});
|
|
|
|
|
|
|
|
// For this demo we just log producer errors to the console.
|
|
|
|
producer.on("error", function(error) {
|
|
|
|
console.error(error);
|
|
|
|
});
|
|
|
|
|
|
|
|
const KafkaService = {
|
|
|
|
sendRecord: ({ data}, callback = () => {}) => {
|
|
|
|
const event = {
|
|
|
|
timestamp: new Date(),
|
|
|
|
data: data
|
|
|
|
};
|
|
|
|
|
|
|
|
const buffer = new Buffer.from(JSON.stringify(event));
|
2019-02-03 03:13:42 +03:00
|
|
|
//console.log(event);
|
2018-12-03 00:17:45 +03:00
|
|
|
// Create a new payload
|
|
|
|
const record = [
|
|
|
|
{
|
|
|
|
topic: "streaming-data",
|
|
|
|
messages: buffer,
|
|
|
|
attributes: 1 /* Use GZip compression for the payload */
|
|
|
|
}
|
|
|
|
];
|
|
|
|
|
|
|
|
//Send record to Kafka and log result/error
|
|
|
|
producer.send(record, callback);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
var count = 0
|
|
|
|
setInterval(() => {
|
2018-12-04 03:27:41 +03:00
|
|
|
KafkaService.sendRecord({data: Math.random()});
|
2018-12-03 00:17:45 +03:00
|
|
|
count = count + 1;
|
|
|
|
|
2019-02-03 03:13:42 +03:00
|
|
|
}, 1000);
|