Merge pull request #448 from benburry/tcp_server

Add optional TCP interface
This commit is contained in:
Ben Burry 2014-09-16 14:28:23 -07:00
Родитель 303451b14a 7aabdcfc7a
Коммит 07a85e0ed3
6 изменённых файлов: 107 добавлений и 13 удалений

Просмотреть файл

@ -2,8 +2,8 @@ StatsD [![Build Status][travis-ci_status_img]][travis-ci_statsd]
====== ======
A network daemon that runs on the [Node.js][node] platform and A network daemon that runs on the [Node.js][node] platform and
listens for statistics, like counters and timers, sent over [UDP][udp] listens for statistics, like counters and timers, sent over [UDP][udp] or
and sends aggregates to one or more pluggable backend services (e.g., [TCP][tcp] and sends aggregates to one or more pluggable backend services (e.g.,
[Graphite][graphite]). [Graphite][graphite]).
We ([Etsy][etsy]) [blogged][blog post] about how it works and why we created it. We ([Etsy][etsy]) [blogged][blog post] about how it works and why we created it.
@ -46,12 +46,12 @@ Installation and Configuration
Usage Usage
------- -------
The basic line protocol expects metrics to be sent via UDP in the format: The basic line protocol expects metrics to be sent in the format:
<metricname>:<value>|<type> <metricname>:<value>|<type>
So the simplest way to send in metrics from your command line if you have So the simplest way to send in metrics from your command line if you have
StatsD running on localhost would be: StatsD running with the default UDP server on localhost would be:
echo "foo:1|c" | nc -u -w0 127.0.0.1 8125 echo "foo:1|c" | nc -u -w0 127.0.0.1 8125
@ -104,6 +104,7 @@ Meta
[counting-timing]: http://code.flickr.com/blog/2008/10/27/counting-timing/ [counting-timing]: http://code.flickr.com/blog/2008/10/27/counting-timing/
[Flicker-StatsD]: https://github.com/iamcal/Flickr-StatsD [Flicker-StatsD]: https://github.com/iamcal/Flickr-StatsD
[udp]: http://en.wikipedia.org/wiki/User_Datagram_Protocol [udp]: http://en.wikipedia.org/wiki/User_Datagram_Protocol
[tcp]: http://en.wikipedia.org/wiki/Transmission_Control_Protocol
[docs_metric_types]: https://github.com/etsy/statsd/blob/master/docs/metric_types.md [docs_metric_types]: https://github.com/etsy/statsd/blob/master/docs/metric_types.md
[docs_graphite]: https://github.com/etsy/statsd/blob/master/docs/graphite.md [docs_graphite]: https://github.com/etsy/statsd/blob/master/docs/graphite.md
[docs_backend]: https://github.com/etsy/statsd/blob/master/docs/backend.md [docs_backend]: https://github.com/etsy/statsd/blob/master/docs/backend.md

Просмотреть файл

@ -15,10 +15,14 @@ Optional Variables:
the default graphite backend will be loaded. the default graphite backend will be loaded.
* example for console and graphite: * example for console and graphite:
[ "./backends/console", "./backends/graphite" ] [ "./backends/console", "./backends/graphite" ]
server: the server to load. The server must exist by name in the directory
servers/. If not specified, the default udp server will be loaded.
* example for tcp server:
"./servers/tcp"
debug: debug flag [default: false] debug: debug flag [default: false]
address: address to listen on over UDP [default: 0.0.0.0] address: address to listen on [default: 0.0.0.0]
address_ipv6: defines if the address is an IPv4 or IPv6 address [true or false, default: false] address_ipv6: defines if the address is an IPv4 or IPv6 address [true or false, default: false]
port: port to listen for messages on over UDP [default: 8125] port: port to listen for messages on [default: 8125]
mgmt_address: address to run the management TCP interface on mgmt_address: address to run the management TCP interface on
[default: 0.0.0.0] [default: 0.0.0.0]
mgmt_port: port to run the management TCP interface on [default: 8126] mgmt_port: port to run the management TCP interface on [default: 8126]

21
servers/tcp.js Normal file
Просмотреть файл

@ -0,0 +1,21 @@
var net = require('net');
function rinfo(tcpstream, data) {
this.address = tcpstream.remoteAddress;
this.port = tcpstream.remotePort;
this.family = tcpstream.address().family;
this.size = data.length;
}
exports.start = function(config, callback){
var server = net.createServer(function(stream) {
stream.setEncoding('ascii');
stream.on('data', function(data) {
callback(data, new rinfo(stream, data));
});
});
server.listen(config.port || 8125, config.address || undefined);
return true;
};

8
servers/udp.js Normal file
Просмотреть файл

@ -0,0 +1,8 @@
var dgram = require('dgram');
exports.start = function(config, callback){
var udp_version = config.address_ipv6 ? 'udp6' : 'udp4';
var server = dgram.createSocket(udp_version, callback);
server.bind(config.port || 8125, config.address || undefined);
return true;
};

Просмотреть файл

@ -1,7 +1,6 @@
/*jshint node:true, laxcomma:true */ /*jshint node:true, laxcomma:true */
var dgram = require('dgram') var util = require('util')
, util = require('util')
, net = require('net') , net = require('net')
, config = require('./lib/config') , config = require('./lib/config')
, helpers = require('./lib/helpers') , helpers = require('./lib/helpers')
@ -24,7 +23,7 @@ var sets = {};
var counter_rates = {}; var counter_rates = {};
var timer_data = {}; var timer_data = {};
var pctThreshold = null; var pctThreshold = null;
var flushInterval, keyFlushInt, server, mgmtServer; var flushInterval, keyFlushInt, serverLoaded, mgmtServer;
var startup_time = Math.round(new Date().getTime() / 1000); var startup_time = Math.round(new Date().getTime() / 1000);
var backendEvents = new events.EventEmitter(); var backendEvents = new events.EventEmitter();
var healthStatus = config.healthStatus || 'up'; var healthStatus = config.healthStatus || 'up';
@ -46,6 +45,25 @@ function loadBackend(config, name) {
} }
} }
// Load and init the server from the servers/ directory.
// The callback mimics the dgram 'message' event parameters (msg, rinfo)
// msg: the message received by the server. may contain more than one metric
// rinfo: contains remote address information and message length
// (attributes are .address, .port, .family, .size - you're welcome)
function startServer(config, name, callback) {
var servermod = require(name);
if (config.debug) {
l.log("Loading server: " + name, 'DEBUG');
}
var ret = servermod.start(config, callback);
if (!ret) {
l.log("Failed to load server: " + name);
process.exit(1);
}
}
// global for conf // global for conf
var conf; var conf;
@ -162,13 +180,14 @@ config.configFile(process.argv[2], function (config) {
counters[bad_lines_seen] = 0; counters[bad_lines_seen] = 0;
counters[packets_received] = 0; counters[packets_received] = 0;
if (server === undefined) { if (!serverLoaded) {
// key counting // key counting
var keyFlushInterval = Number((config.keyFlush && config.keyFlush.interval) || 0); var keyFlushInterval = Number((config.keyFlush && config.keyFlush.interval) || 0);
var udp_version = config.address_ipv6 ? 'udp6' : 'udp4'; // The default server is UDP
server = dgram.createSocket(udp_version, function (msg, rinfo) { var server = config.server || './servers/udp'
serverLoaded = startServer(config, server, function (msg, rinfo) {
backendEvents.emit('packet', msg, rinfo); backendEvents.emit('packet', msg, rinfo);
counters[packets_received]++; counters[packets_received]++;
var packet_data = msg.toString(); var packet_data = msg.toString();
@ -355,7 +374,6 @@ config.configFile(process.argv[2], function (config) {
}); });
}); });
server.bind(config.port || 8125, config.address || undefined);
mgmtServer.listen(config.mgmt_port || 8126, config.mgmt_address || undefined); mgmtServer.listen(config.mgmt_port || 8126, config.mgmt_address || undefined);
util.log("server is up"); util.log("server is up");

42
test/server_tests.js Normal file
Просмотреть файл

@ -0,0 +1,42 @@
var dgram = require('dgram'),
net = require('net');
var config = {
address: '127.0.0.1',
port: 8125
};
var msg = "This is a test\r\n";
module.exports = {
udp_data_received: function(test) {
test.expect(3);
var server = require('../servers/udp');
var started = server.start(config, function(data, rinfo) {
test.equal(msg, data.toString());
test.equal(msg.length, rinfo.size);
test.done();
});
test.ok(started);
var buf = new Buffer(msg);
var sock = dgram.createSocket('udp4');
sock.send(buf, 0, buf.length, config.port, config.address, function(err, bytes) {
sock.close();
});
},
tcp_data_received: function(test) {
test.expect(3);
var server = require('../servers/tcp');
var started = server.start(config, function(data, rinfo) {
test.equal(msg, data.toString());
test.equal(msg.length, rinfo.size);
test.done();
});
test.ok(started);
var client = net.connect(config.port, config.address, function() {
client.write(msg);
client.end();
});
}
}