Merge branch 'etsymaster' into p/debugupdates

This commit is contained in:
david raistrick 2013-01-17 12:31:52 -05:00
Родитель 9fef3161f4 68fdd4132b
Коммит 4eb4b319f7
6 изменённых файлов: 537 добавлений и 31 удалений

Просмотреть файл

@ -1,4 +1,4 @@
StatsD [![Build Status](https://secure.travis-ci.org/etsy/statsd.png)](http://travis-ci.org/etsy/statsd)
StatsD [![Build Status](https://travis-ci.org/etsy/statsd.png?branch=backends-as-packages)](https://travis-ci.org/etsy/statsd)
======
A network daemon that runs on the [Node.js][node] platform and

Просмотреть файл

@ -13,7 +13,10 @@
*/
var net = require('net'),
util = require('util');
logger = require('../lib/logger');
// this will be instantiated to the logger
var l;
var debug;
var flushInterval;
@ -46,12 +49,12 @@ var post_stats = function graphite_post_stats(statString) {
var graphite = net.createConnection(graphitePort, graphiteHost);
graphite.addListener('error', function(connectionException){
if (debug) {
util.log(connectionException);
l.log(connectionException);
}
});
graphite.on('connect', function() {
var ts = Math.round(new Date().getTime() / 1000);
var namespace = globalNamespace.concat('statsd');
var namespace = globalNamespace.concat(prefixStats);
statString += namespace.join(".") + '.graphiteStats.last_exception ' + last_exception + ' ' + ts + "\n";
statString += namespace.join(".") + '.graphiteStats.last_flush ' + last_flush + ' ' + ts + "\n";
this.write(statString);
@ -60,7 +63,7 @@ var post_stats = function graphite_post_stats(statString) {
});
} catch(e){
if (debug) {
util.log(e);
l.log(e);
}
graphiteStats.last_exception = Math.round(new Date().getTime() / 1000);
}
@ -122,12 +125,12 @@ var flush_stats = function graphite_flush(ts, metrics) {
numStats += 1;
}
var namespace = globalNamespace.concat('statsd');
var namespace = globalNamespace.concat(prefixStats);
if (legacyNamespace === true) {
statString += 'statsd.numStats ' + numStats + ts_suffix;
statString += 'stats.statsd.graphiteStats.calculationtime ' + (Date.now() - starttime) + ts_suffix;
statString += prefixStats + '.numStats ' + numStats + ts_suffix;
statString += 'stats.' + prefixStats + '.graphiteStats.calculationtime ' + (Date.now() - starttime) + ts_suffix;
for (key in statsd_metrics) {
statString += 'stats.statsd.' + key + ' ' + statsd_metrics[key] + ts_suffix;
statString += 'stats.' + prefixStats + '.' + key + ' ' + statsd_metrics[key] + ts_suffix;
}
} else {
statString += namespace.join(".") + '.numStats ' + numStats + ts_suffix;
@ -145,12 +148,13 @@ var flush_stats = function graphite_flush(ts, metrics) {
};
var backend_status = function graphite_status(writeCb) {
for (stat in graphiteStats) {
for (var stat in graphiteStats) {
writeCb(null, 'graphite', stat, graphiteStats[stat]);
}
};
exports.init = function graphite_init(startup_time, config, events) {
l = new logger.Logger(config.log || {});
debug = config.debug;
graphiteHost = config.graphiteHost;
graphitePort = config.graphitePort;

Просмотреть файл

@ -35,6 +35,8 @@ Optional Variables:
percent: percentage of frequent keys to log [%, default: 100]
log: location of log file for frequent keys [default: STDOUT]
deleteCounters: don't send values to graphite for inactive counters, as opposed to sending 0 [default: false]
prefixStats: prefix to use for the statsd statistics data for this running instance of statsd [default: statsd]
applies to both legacy and new namespacing
console:
prettyprint: whether to prettyprint the console backend
@ -59,14 +61,12 @@ Optional Variables:
e.g. [ { host: '10.10.10.10', port: 8125 },
{ host: 'observer', port: 88125 } ]
repeaterProtocol: whether to use udp4 or udp4 for repeaters.
repeaterProtocol: whether to use udp4 or udp6 for repeaters.
["udp4" or "udp6", default: "udp4"]
*/
{
graphitePort: 2003
, graphiteHost: "graphite.host.com"
, graphiteHost: "graphite.example.com"
, port: 8125
, backends: [ "./backends/graphite" ]
, repeater: [ { host: "10.8.3.214", port: 8125 } ]
, repeaterProtocol: "udp4"
}

Просмотреть файл

@ -11,10 +11,7 @@ var dgram = require('dgram')
// initialize data structures with defaults for statsd stats
var keyCounter = {};
var counters = {
"statsd.packets_received": 0,
"statsd.bad_lines_seen": 0
};
var counters = {};
var timers = {};
var gauges = {};
var sets = {};
@ -61,7 +58,7 @@ function flushMetrics() {
backendEvents.once('flush', function clear_metrics(ts, metrics) {
// Clear the counters
conf.deleteCounters = conf.deleteCounters || false;
for (key in metrics.counters) {
for (var key in metrics.counters) {
if (conf.deleteCounters) {
delete(metrics.counters[key]);
} else {
@ -70,12 +67,12 @@ function flushMetrics() {
}
// Clear the timers
for (key in metrics.timers) {
for (var key in metrics.timers) {
metrics.timers[key] = [];
}
// Clear the sets
for (key in metrics.sets) {
for (var key in metrics.sets) {
metrics.sets[key] = new set.Set();
}
});
@ -117,6 +114,17 @@ config.configFile(process.argv[2], function (config, oldConfig) {
}, config.debugInterval || 10000);
}
// setup config for stats prefix
prefixStats = config.prefixStats;
prefixStats = prefixStats !== undefined ? prefixStats : "statsd";
//setup the names for the stats stored in counters{}
bad_lines_seen = prefixStats + ".bad_lines_seen";
packets_received = prefixStats + ".packets_received";
//now set to zero so we can increment them
counters[bad_lines_seen] = 0;
counters[packets_received] = 0;
if (server === undefined) {
// key counting
@ -124,10 +132,10 @@ config.configFile(process.argv[2], function (config, oldConfig) {
server = dgram.createSocket('udp4', function (msg, rinfo) {
backendEvents.emit('packet', msg, rinfo);
counters["statsd.packets_received"]++;
counters[packets_received]++;
var metrics = msg.toString().split("\n");
for (midx in metrics) {
for (var midx in metrics) {
if (config.dumpMessages) {
l.log(metrics[midx].toString());
}
@ -153,7 +161,7 @@ config.configFile(process.argv[2], function (config, oldConfig) {
var fields = bits[i].split("|");
if (fields[1] === undefined) {
l.log('Bad line: ' + fields + ' in msg "' + metrics[midx] +'"');
counters["statsd.bad_lines_seen"]++;
counters[bad_lines_seen]++;
stats['messages']['bad_lines_seen']++;
continue;
}
@ -175,7 +183,7 @@ config.configFile(process.argv[2], function (config, oldConfig) {
sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]);
} else {
l.log('Bad line: ' + fields + ' in msg "' + metrics[midx] +'"; has invalid sample rate');
counters["statsd.bad_lines_seen"]++;
counters[bad_lines_seen]++;
stats['messages']['bad_lines_seen']++;
continue;
}
@ -227,8 +235,8 @@ config.configFile(process.argv[2], function (config, oldConfig) {
};
// Loop through the base stats
for (group in stats) {
for (metric in stats[group]) {
for (var group in stats) {
for (var metric in stats[group]) {
stat_writer(group, metric, stats[group][metric]);
}
}
@ -265,7 +273,7 @@ config.configFile(process.argv[2], function (config, oldConfig) {
break;
case "delcounters":
for (index in cmdline) {
for (var index in cmdline) {
delete counters[cmdline[index]];
stream.write("deleted: " + cmdline[index] + "\n");
}
@ -273,7 +281,7 @@ config.configFile(process.argv[2], function (config, oldConfig) {
break;
case "deltimers":
for (index in cmdline) {
for (var index in cmdline) {
delete timers[cmdline[index]];
stream.write("deleted: " + cmdline[index] + "\n");
}
@ -281,7 +289,7 @@ config.configFile(process.argv[2], function (config, oldConfig) {
break;
case "delgauges":
for (index in cmdline) {
for (var index in cmdline) {
delete gauges[cmdline[index]];
stream.write("deleted: " + cmdline[index] + "\n");
}
@ -333,7 +341,7 @@ config.configFile(process.argv[2], function (config, oldConfig) {
var key;
var sortedKeys = [];
for (key in keyCounter) {
for (var key in keyCounter) {
sortedKeys.push([key, keyCounter[key]]);
}

Просмотреть файл

@ -0,0 +1,230 @@
var fs = require('fs'),
net = require('net'),
temp = require('temp'),
spawn = require('child_process').spawn,
util = require('util'),
urlparse = require('url').parse,
_ = require('underscore'),
dgram = require('dgram'),
qsparse = require('querystring').parse,
http = require('http');
var writeconfig = function(text,worker,cb,obj){
temp.open({suffix: '-statsdconf.js'}, function(err, info) {
if (err) throw err;
fs.writeSync(info.fd, text);
fs.close(info.fd, function(err) {
if (err) throw err;
worker(info.path,cb,obj);
});
});
}
var array_contents_are_equal = function(first,second){
var intlen = _.intersection(first,second).length;
var unlen = _.union(first,second).length;
return (intlen == unlen) && (intlen == first.length);
}
var statsd_send = function(data,sock,host,port,cb){
send_data = new Buffer(data);
sock.send(send_data,0,send_data.length,port,host,function(err,bytes){
if (err) {
throw err;
}
cb();
});
}
// keep collecting data until a specified timeout period has elapsed
// this will let us capture all data chunks so we don't miss one
var collect_for = function(server,timeout,cb){
var received = [];
var in_flight = 0;
var timed_out = false;
var collector = function(req,res){
in_flight += 1;
var body = '';
req.on('data',function(data){ body += data; });
req.on('end',function(){
received = received.concat(body.split("\n"));
in_flight -= 1;
if((in_flight < 1) && timed_out){
server.removeListener('request',collector);
cb(received);
}
});
}
setTimeout(function (){
timed_out = true;
if((in_flight < 1)) {
server.removeListener('connection',collector);
cb(received);
}
},timeout);
server.on('connection',collector);
}
module.exports = {
setUp: function (callback) {
this.testport = 31337;
this.myflush = 200;
var configfile = "{graphService: \"graphite\"\n\
, batch: 200 \n\
, flushInterval: " + this.myflush + " \n\
, percentThreshold: 90\n\
, port: 8125\n\
, dumpMessages: false \n\
, debug: false\n\
, prefixStats: \"statsprefix\"\n\
, graphitePort: " + this.testport + "\n\
, graphiteHost: \"127.0.0.1\"}";
this.acceptor = net.createServer();
this.acceptor.listen(this.testport);
this.sock = dgram.createSocket('udp4');
this.server_up = true;
this.ok_to_die = false;
this.exit_callback_callback = process.exit;
writeconfig(configfile,function(path,cb,obj){
obj.path = path;
obj.server = spawn('node',['stats.js', path]);
obj.exit_callback = function (code) {
obj.server_up = false;
if(!obj.ok_to_die){
console.log('node server unexpectedly quit with code: ' + code);
process.exit(1);
}
else {
obj.exit_callback_callback();
}
};
obj.server.on('exit', obj.exit_callback);
obj.server.stderr.on('data', function (data) {
console.log('stderr: ' + data.toString().replace(/\n$/,''));
});
/*
obj.server.stdout.on('data', function (data) {
console.log('stdout: ' + data.toString().replace(/\n$/,''));
});
*/
obj.server.stdout.on('data', function (data) {
// wait until server is up before we finish setUp
if (data.toString().match(/server is up/)) {
cb();
}
});
},callback,this);
},
tearDown: function (callback) {
this.sock.close();
this.acceptor.close();
this.ok_to_die = true;
if(this.server_up){
this.exit_callback_callback = callback;
this.server.kill();
} else {
callback();
}
},
send_well_formed_posts: function (test) {
test.expect(2);
// we should integrate a timeout into this
this.acceptor.once('connection',function(c){
var body = '';
c.on('data',function(d){ body += d; });
c.on('end',function(){
var rows = body.split("\n");
var entries = _.map(rows, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
test.ok(_.include(_.map(entries,function(x) { return _.keys(x)[0] }),'statsprefix.numStats'),'graphite output includes numStats');
test.equal(_.find(entries, function(x) { return _.keys(x)[0] == 'statsprefix.numStats' })['statsprefix.numStats'],2);
test.done();
});
});
},
timers_are_valid: function (test) {
test.expect(3);
var testvalue = 100;
var me = this;
this.acceptor.once('connection',function(c){
statsd_send('a_test_value:' + testvalue + '|ms',me.sock,'127.0.0.1',8125,function(){
collect_for(me.acceptor,me.myflush*2,function(strings){
test.ok(strings.length > 0,'should receive some data');
var hashes = _.map(strings, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
var numstat_test = function(post){
var mykey = 'statsprefix.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 3);
};
test.ok(_.any(hashes,numstat_test), 'statsprefix.numStats should be 1');
var testtimervalue_test = function(post){
var mykey = 'stats.timers.a_test_value.mean_90';
return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
};
test.ok(_.any(hashes,testtimervalue_test), 'stats.timers.a_test_value.mean should be ' + testvalue);
test.done();
});
});
});
},
counts_are_valid: function (test) {
test.expect(4);
var testvalue = 100;
var me = this;
this.acceptor.once('connection',function(c){
statsd_send('a_test_value:' + testvalue + '|c',me.sock,'127.0.0.1',8125,function(){
collect_for(me.acceptor,me.myflush*2,function(strings){
test.ok(strings.length > 0,'should receive some data');
var hashes = _.map(strings, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
var numstat_test = function(post){
var mykey = 'statsprefix.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 3);
};
test.ok(_.any(hashes,numstat_test), 'statsprefix.numStats should be 1');
var testavgvalue_test = function(post){
var mykey = 'stats.a_test_value';
return _.include(_.keys(post),mykey) && (post[mykey] == (testvalue/(me.myflush / 1000)));
};
test.ok(_.any(hashes,testavgvalue_test), 'stats.a_test_value should be ' + (testvalue/(me.myflush / 1000)));
var testcountvalue_test = function(post){
var mykey = 'stats_counts.a_test_value';
return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
};
test.ok(_.any(hashes,testcountvalue_test), 'stats_counts.a_test_value should be ' + testvalue);
test.done();
});
});
});
}
}

Просмотреть файл

@ -0,0 +1,264 @@
var fs = require('fs'),
net = require('net'),
temp = require('temp'),
spawn = require('child_process').spawn,
util = require('util'),
urlparse = require('url').parse,
_ = require('underscore'),
dgram = require('dgram'),
qsparse = require('querystring').parse,
http = require('http');
var writeconfig = function(text,worker,cb,obj){
temp.open({suffix: '-statsdconf.js'}, function(err, info) {
if (err) throw err;
fs.writeSync(info.fd, text);
fs.close(info.fd, function(err) {
if (err) throw err;
worker(info.path,cb,obj);
});
});
}
var array_contents_are_equal = function(first,second){
var intlen = _.intersection(first,second).length;
var unlen = _.union(first,second).length;
return (intlen == unlen) && (intlen == first.length);
}
var statsd_send = function(data,sock,host,port,cb){
send_data = new Buffer(data);
sock.send(send_data,0,send_data.length,port,host,function(err,bytes){
if (err) {
throw err;
}
cb();
});
}
// keep collecting data until a specified timeout period has elapsed
// this will let us capture all data chunks so we don't miss one
var collect_for = function(server,timeout,cb){
var received = [];
var in_flight = 0;
var timed_out = false;
var collector = function(req,res){
in_flight += 1;
var body = '';
req.on('data',function(data){ body += data; });
req.on('end',function(){
received = received.concat(body.split("\n"));
in_flight -= 1;
if((in_flight < 1) && timed_out){
server.removeListener('request',collector);
cb(received);
}
});
}
setTimeout(function (){
timed_out = true;
if((in_flight < 1)) {
server.removeListener('connection',collector);
cb(received);
}
},timeout);
server.on('connection',collector);
}
module.exports = {
setUp: function (callback) {
this.testport = 31337;
this.myflush = 200;
var configfile = "{graphService: \"graphite\"\n\
, batch: 200 \n\
, flushInterval: " + this.myflush + " \n\
, percentThreshold: 90\n\
, port: 8125\n\
, dumpMessages: false \n\
, debug: false\n\
, prefixStats: \"statsprefix\"\n\
, graphite: { legacyNamespace: false }\n\
, graphitePort: " + this.testport + "\n\
, graphiteHost: \"127.0.0.1\"}";
this.acceptor = net.createServer();
this.acceptor.listen(this.testport);
this.sock = dgram.createSocket('udp4');
this.server_up = true;
this.ok_to_die = false;
this.exit_callback_callback = process.exit;
writeconfig(configfile,function(path,cb,obj){
obj.path = path;
obj.server = spawn('node',['stats.js', path]);
obj.exit_callback = function (code) {
obj.server_up = false;
if(!obj.ok_to_die){
console.log('node server unexpectedly quit with code: ' + code);
process.exit(1);
}
else {
obj.exit_callback_callback();
}
};
obj.server.on('exit', obj.exit_callback);
obj.server.stderr.on('data', function (data) {
console.log('stderr: ' + data.toString().replace(/\n$/,''));
});
/*
obj.server.stdout.on('data', function (data) {
console.log('stdout: ' + data.toString().replace(/\n$/,''));
});
*/
obj.server.stdout.on('data', function (data) {
// wait until server is up before we finish setUp
if (data.toString().match(/server is up/)) {
cb();
}
});
},callback,this);
},
tearDown: function (callback) {
this.sock.close();
this.acceptor.close();
this.ok_to_die = true;
if(this.server_up){
this.exit_callback_callback = callback;
this.server.kill();
} else {
callback();
}
},
send_well_formed_posts: function (test) {
test.expect(2);
// we should integrate a timeout into this
this.acceptor.once('connection',function(c){
var body = '';
c.on('data',function(d){ body += d; });
c.on('end',function(){
var rows = body.split("\n");
var entries = _.map(rows, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
test.ok(_.include(_.map(entries,function(x) { return _.keys(x)[0] }),'stats.statsprefix.numStats'),'graphite output includes numStats');
test.equal(_.find(entries, function(x) { return _.keys(x)[0] == 'stats.statsprefix.numStats' })['stats.statsprefix.numStats'],2);
test.done();
});
});
},
send_malformed_post: function (test) {
test.expect(3);
var testvalue = 1;
var me = this;
this.acceptor.once('connection',function(c){
statsd_send('a_bad_test_value|z',me.sock,'127.0.0.1',8125,function(){
collect_for(me.acceptor,me.myflush*2,function(strings){
test.ok(strings.length > 0,'should receive some data');
var hashes = _.map(strings, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
var numstat_test = function(post){
var mykey = 'stats.statsprefix.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 2);
};
test.ok(_.any(hashes,numstat_test), 'statsprefix.numStats should be 0');
var bad_lines_seen_value_test = function(post){
var mykey = 'stats.counters.statsprefix.bad_lines_seen.count';
return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
};
test.ok(_.any(hashes,bad_lines_seen_value_test), 'stats.counters.statsprefix.bad_lines_seen.count should be ' + testvalue);
test.done();
});
});
});
},
timers_are_valid: function (test) {
test.expect(3);
var testvalue = 100;
var me = this;
this.acceptor.once('connection',function(c){
statsd_send('a_test_value:' + testvalue + '|ms',me.sock,'127.0.0.1',8125,function(){
collect_for(me.acceptor,me.myflush*2,function(strings){
test.ok(strings.length > 0,'should receive some data');
var hashes = _.map(strings, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
var numstat_test = function(post){
var mykey = 'stats.statsprefix.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 3);
};
test.ok(_.any(hashes,numstat_test), 'stats.statsprefix.numStats should be 1');
var testtimervalue_test = function(post){
var mykey = 'stats.timers.a_test_value.mean_90';
return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
};
test.ok(_.any(hashes,testtimervalue_test), 'stats.timers.a_test_value.mean should be ' + testvalue);
test.done();
});
});
});
},
counts_are_valid: function (test) {
test.expect(4);
var testvalue = 100;
var me = this;
this.acceptor.once('connection',function(c){
statsd_send('a_test_value:' + testvalue + '|c',me.sock,'127.0.0.1',8125,function(){
collect_for(me.acceptor,me.myflush*2,function(strings){
test.ok(strings.length > 0,'should receive some data');
var hashes = _.map(strings, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
var numstat_test = function(post){
var mykey = 'stats.statsprefix.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 3);
};
test.ok(_.any(hashes,numstat_test), 'statsprefix.numStats should be 3');
var testavgvalue_test = function(post){
var mykey = 'stats.counters.a_test_value.rate';
return _.include(_.keys(post),mykey) && (post[mykey] == (testvalue/(me.myflush / 1000)));
};
test.ok(_.any(hashes,testavgvalue_test), 'a_test_value.rate should be ' + (testvalue/(me.myflush / 1000)));
var testcountvalue_test = function(post){
var mykey = 'stats.counters.a_test_value.count';
return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
};
test.ok(_.any(hashes,testcountvalue_test), 'a_test_value.count should be ' + testvalue);
test.done();
});
});
});
}
}