From 747ae669f51938239b1ce0bab4cbb6d9d39cea45 Mon Sep 17 00:00:00 2001 From: Ben Burry Date: Mon, 15 Sep 2014 21:33:32 +0000 Subject: [PATCH 1/7] Convert listen server to loadable module, defaulting to udp --- servers/udp.js | 8 ++++++++ stats.js | 28 +++++++++++++++++++++------- 2 files changed, 29 insertions(+), 7 deletions(-) create mode 100644 servers/udp.js diff --git a/servers/udp.js b/servers/udp.js new file mode 100644 index 0000000..b03bc32 --- /dev/null +++ b/servers/udp.js @@ -0,0 +1,8 @@ +var dgram = require('dgram'); + +exports.init = function(config, callback){ + var udp_version = config.address_ipv6 ? 'udp6' : 'udp4'; + var server = dgram.createSocket(udp_version, callback); + server.bind(config.port || 8125, config.address || undefined); + return true; +}; diff --git a/stats.js b/stats.js index bea7a07..3a615f7 100644 --- a/stats.js +++ b/stats.js @@ -1,7 +1,6 @@ /*jshint node:true, laxcomma:true */ -var dgram = require('dgram') - , util = require('util') +var util = require('util') , net = require('net') , config = require('./lib/config') , helpers = require('./lib/helpers') @@ -24,7 +23,7 @@ var sets = {}; var counter_rates = {}; var timer_data = {}; var pctThreshold = null; -var flushInterval, keyFlushInt, server, mgmtServer; +var flushInterval, keyFlushInt, serverLoaded, mgmtServer; var startup_time = Math.round(new Date().getTime() / 1000); var backendEvents = new events.EventEmitter(); var healthStatus = config.healthStatus || 'up'; @@ -46,6 +45,21 @@ function loadBackend(config, name) { } } +// Load and init the server from the servers/ directory. +function startServer(config, name, callback) { + var servermod = require(name); + + if (config.debug) { + l.log("Loading server: " + name, 'DEBUG'); + } + + var ret = servermod.init(config, callback); + if (!ret) { + l.log("Failed to load server: " + name); + process.exit(1); + } +} + // global for conf var conf; @@ -162,13 +176,14 @@ config.configFile(process.argv[2], function (config) { counters[bad_lines_seen] = 0; counters[packets_received] = 0; - if (server === undefined) { + if (!serverLoaded) { // key counting var keyFlushInterval = Number((config.keyFlush && config.keyFlush.interval) || 0); - var udp_version = config.address_ipv6 ? 'udp6' : 'udp4'; - server = dgram.createSocket(udp_version, function (msg, rinfo) { + // The default server is UDP + var server = config.server || './servers/udp' + serverLoaded = startServer(config, server, function (msg, rinfo) { backendEvents.emit('packet', msg, rinfo); counters[packets_received]++; var packet_data = msg.toString(); @@ -355,7 +370,6 @@ config.configFile(process.argv[2], function (config) { }); }); - server.bind(config.port || 8125, config.address || undefined); mgmtServer.listen(config.mgmt_port || 8126, config.mgmt_address || undefined); util.log("server is up"); From a6bc4d69250218dd6184058484bcaf128e57c748 Mon Sep 17 00:00:00 2001 From: Ben Burry Date: Mon, 15 Sep 2014 21:33:54 +0000 Subject: [PATCH 2/7] Add tcp loadable server module --- servers/tcp.js | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 servers/tcp.js diff --git a/servers/tcp.js b/servers/tcp.js new file mode 100644 index 0000000..e094a32 --- /dev/null +++ b/servers/tcp.js @@ -0,0 +1,16 @@ +var net = require('net'); + +exports.init = function(config, callback){ + var server = net.createServer(function(stream) { + stream.setEncoding('ascii'); + + stream.on('data', function(data) { + var rinfo = stream.address(); + rinfo.size = data.length; + callback(data, rinfo); + }); + }); + + server.listen(config.port || 8125, config.address || undefined); + return true; +}; From 930011b52225c495d38e0546a0e510b675805642 Mon Sep 17 00:00:00 2001 From: Ben Burry Date: Mon, 15 Sep 2014 23:28:40 +0000 Subject: [PATCH 3/7] Represent remote address in tcp server rinfo, not local --- servers/tcp.js | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/servers/tcp.js b/servers/tcp.js index e094a32..c1ee3af 100644 --- a/servers/tcp.js +++ b/servers/tcp.js @@ -1,13 +1,18 @@ var net = require('net'); +function rinfo(tcpstream, data) { + this.address = tcpstream.remoteAddress; + this.port = tcpstream.remotePort; + this.family = tcpstream.address().family; + this.size = data.length; +} + exports.init = function(config, callback){ var server = net.createServer(function(stream) { stream.setEncoding('ascii'); stream.on('data', function(data) { - var rinfo = stream.address(); - rinfo.size = data.length; - callback(data, rinfo); + callback(data, new rinfo(stream, data)); }); }); From 42d31f504a33e1378a20c5ca27273bb130cd6f3a Mon Sep 17 00:00:00 2001 From: Ben Burry Date: Tue, 16 Sep 2014 00:26:47 +0000 Subject: [PATCH 4/7] Rename server init to 'start' to reflect reality --- servers/tcp.js | 2 +- servers/udp.js | 2 +- stats.js | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/servers/tcp.js b/servers/tcp.js index c1ee3af..3c2e651 100644 --- a/servers/tcp.js +++ b/servers/tcp.js @@ -7,7 +7,7 @@ function rinfo(tcpstream, data) { this.size = data.length; } -exports.init = function(config, callback){ +exports.start = function(config, callback){ var server = net.createServer(function(stream) { stream.setEncoding('ascii'); diff --git a/servers/udp.js b/servers/udp.js index b03bc32..a0a3072 100644 --- a/servers/udp.js +++ b/servers/udp.js @@ -1,6 +1,6 @@ var dgram = require('dgram'); -exports.init = function(config, callback){ +exports.start = function(config, callback){ var udp_version = config.address_ipv6 ? 'udp6' : 'udp4'; var server = dgram.createSocket(udp_version, callback); server.bind(config.port || 8125, config.address || undefined); diff --git a/stats.js b/stats.js index 3a615f7..280f3eb 100644 --- a/stats.js +++ b/stats.js @@ -53,7 +53,7 @@ function startServer(config, name, callback) { l.log("Loading server: " + name, 'DEBUG'); } - var ret = servermod.init(config, callback); + var ret = servermod.start(config, callback); if (!ret) { l.log("Failed to load server: " + name); process.exit(1); From 8e85af7b873149f17b6d2ed2fde02b8eccd5348a Mon Sep 17 00:00:00 2001 From: Ben Burry Date: Tue, 16 Sep 2014 00:27:54 +0000 Subject: [PATCH 5/7] Add basic test for each of udp and tcp server module --- test/server_tests.js | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 test/server_tests.js diff --git a/test/server_tests.js b/test/server_tests.js new file mode 100644 index 0000000..b426ba9 --- /dev/null +++ b/test/server_tests.js @@ -0,0 +1,42 @@ +var dgram = require('dgram'), + net = require('net'); + +var config = { + address: '127.0.0.1', + port: 8125 +}; +var msg = "This is a test\r\n"; + +module.exports = { + udp_data_received: function(test) { + test.expect(3); + var server = require('../servers/udp'); + var started = server.start(config, function(data, rinfo) { + test.equal(msg, data.toString()); + test.equal(msg.length, rinfo.size); + test.done(); + }); + test.ok(started); + + var buf = new Buffer(msg); + var sock = dgram.createSocket('udp4'); + sock.send(buf, 0, buf.length, config.port, config.address, function(err, bytes) { + sock.close(); + }); + }, + tcp_data_received: function(test) { + test.expect(3); + var server = require('../servers/tcp'); + var started = server.start(config, function(data, rinfo) { + test.equal(msg, data.toString()); + test.equal(msg.length, rinfo.size); + test.done(); + }); + test.ok(started); + + var client = net.connect(config.port, config.address, function() { + client.write(msg); + client.end(); + }); + } +} From 987116d22cb87b322ad20b168343a0f3ec1d7758 Mon Sep 17 00:00:00 2001 From: Ben Burry Date: Tue, 16 Sep 2014 00:35:26 +0000 Subject: [PATCH 6/7] Update docs and example config to reflect new tcp server option --- README.md | 9 +++++---- exampleConfig.js | 8 ++++++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 9f14b96..74b129c 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,8 @@ StatsD [![Build Status][travis-ci_status_img]][travis-ci_statsd] ====== A network daemon that runs on the [Node.js][node] platform and -listens for statistics, like counters and timers, sent over [UDP][udp] -and sends aggregates to one or more pluggable backend services (e.g., +listens for statistics, like counters and timers, sent over [UDP][udp] or +[TCP][tcp] and sends aggregates to one or more pluggable backend services (e.g., [Graphite][graphite]). We ([Etsy][etsy]) [blogged][blog post] about how it works and why we created it. @@ -46,12 +46,12 @@ Installation and Configuration Usage ------- -The basic line protocol expects metrics to be sent via UDP in the format: +The basic line protocol expects metrics to be sent in the format: :| So the simplest way to send in metrics from your command line if you have -StatsD running on localhost would be: +StatsD running with the default UDP server on localhost would be: echo "foo:1|c" | nc -u -w0 127.0.0.1 8125 @@ -104,6 +104,7 @@ Meta [counting-timing]: http://code.flickr.com/blog/2008/10/27/counting-timing/ [Flicker-StatsD]: https://github.com/iamcal/Flickr-StatsD [udp]: http://en.wikipedia.org/wiki/User_Datagram_Protocol +[tcp]: http://en.wikipedia.org/wiki/Transmission_Control_Protocol [docs_metric_types]: https://github.com/etsy/statsd/blob/master/docs/metric_types.md [docs_graphite]: https://github.com/etsy/statsd/blob/master/docs/graphite.md [docs_backend]: https://github.com/etsy/statsd/blob/master/docs/backend.md diff --git a/exampleConfig.js b/exampleConfig.js index 174dd09..b0053a5 100644 --- a/exampleConfig.js +++ b/exampleConfig.js @@ -15,10 +15,14 @@ Optional Variables: the default graphite backend will be loaded. * example for console and graphite: [ "./backends/console", "./backends/graphite" ] + server: the server to load. The server must exist by name in the directory + servers/. If not specified, the default udp server will be loaded. + * example for tcp server: + "./servers/tcp" debug: debug flag [default: false] - address: address to listen on over UDP [default: 0.0.0.0] + address: address to listen on [default: 0.0.0.0] address_ipv6: defines if the address is an IPv4 or IPv6 address [true or false, default: false] - port: port to listen for messages on over UDP [default: 8125] + port: port to listen for messages on [default: 8125] mgmt_address: address to run the management TCP interface on [default: 0.0.0.0] mgmt_port: port to run the management TCP interface on [default: 8126] From 7aabdcfc7a5d235d77ed5ae4c8694be3947fe070 Mon Sep 17 00:00:00 2001 From: Ben Burry Date: Tue, 16 Sep 2014 21:16:11 +0000 Subject: [PATCH 7/7] Add brief docs around server callback function sig --- stats.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/stats.js b/stats.js index 280f3eb..eb57671 100644 --- a/stats.js +++ b/stats.js @@ -46,6 +46,10 @@ function loadBackend(config, name) { } // Load and init the server from the servers/ directory. +// The callback mimics the dgram 'message' event parameters (msg, rinfo) +// msg: the message received by the server. may contain more than one metric +// rinfo: contains remote address information and message length +// (attributes are .address, .port, .family, .size - you're welcome) function startServer(config, name, callback) { var servermod = require(name);