This commit is contained in:
Dieter Plaetinck 2012-12-09 13:02:26 -05:00
Parents 6582ea204f 1a8bb9b144
Commit 93086913c0
20 changed files: 1100 additions and 186 deletions

README.md

@ -22,34 +22,44 @@ etc)
general values should be integer.
* *flush*
After the flush interval timeout (default 10 seconds), stats are
aggregated and sent to an upstream backend service.
After the flush interval timeout (defined by `config.flushInterval`,
default 10 seconds), stats are aggregated and sent to an upstream backend service.
Counting
--------
gorets:1|c
This is a simple counter. Add 1 to the "gorets" bucket. It stays in memory
until the flush interval `config.flushInterval`.
This is a simple counter. Add 1 to the "gorets" bucket.
At each flush the current count is sent and reset to 0.
If the count at flush is 0 then you can opt to send no metric at all for
this counter, by setting `config.deleteCounters` (applies only to graphite
backend). Statsd will send both the rate as well as the count at each flush.
### Sampling
gorets:1|c|@0.1
Tells StatsD that this counter is being sent sampled every 1/10th of the time.
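A sketch of the compensation the daemon applies (mirroring the counter handling in stats.js; the helper name is hypothetical): each sampled packet is scaled by 1/rate so the flushed count approximates the true total.

```javascript
// Scale a sampled counter increment by the inverse of its sample rate.
function applySample(counters, key, delta, sampleRate) {
  if (!counters[key]) { counters[key] = 0; }
  counters[key] += delta * (1 / sampleRate);
  return counters[key];
}

var counters = {};
applySample(counters, 'gorets', 1, 0.1); // one packet at @0.1 counts as ~10 events
```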
Timing
------
glork:320|ms
The glork took 320ms to complete this time. StatsD figures out 90th percentile,
average (mean), lower and upper bounds for the flush interval. The percentile
threshold can be tweaked with `config.percentThreshold`.
The glork took 320ms to complete this time. StatsD figures out percentiles,
average (mean), standard deviation, sum, lower and upper bounds for the flush interval.
The percentile threshold can be tweaked with `config.percentThreshold`.
The percentile threshold can be a single value, or a list of values, and will
generate the following list of stats for each threshold:
stats.timers.$KEY.mean_$PCT stats.timers.$KEY.upper_$PCT
stats.timers.$KEY.mean_$PCT
stats.timers.$KEY.upper_$PCT
stats.timers.$KEY.sum_$PCT
Where `$KEY` is the key you stats key you specify when sending to statsd, and
`$PCT` is the percentile threshold.
Where `$KEY` is the stats key you specify when sending to statsd, and `$PCT` is
the percentile threshold.
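To make the naming concrete, a small sketch (hypothetical helper, mirroring the `replace('.', '_')` cleanup done in lib/process_metrics.js) that derives the per-threshold names under the legacy `stats.timers` prefix:

```javascript
// Names statsd derives for each percentile threshold of a timer key;
// a dot in the threshold becomes an underscore (99.9 -> 99_9).
function percentileStatNames(key, thresholds) {
  var names = [];
  thresholds.forEach(function (pct) {
    var clean = ('' + pct).replace('.', '_');
    ['mean_', 'upper_', 'sum_'].forEach(function (prefix) {
      names.push('stats.timers.' + key + '.' + prefix + clean);
    });
  });
  return names;
}

percentileStatNames('glork', [90, 99.9]);
// includes stats.timers.glork.mean_90 ... stats.timers.glork.sum_99_9
```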
Use the `config.histogram` setting to instruct statsd to maintain histograms
over time. Specify which metrics to match and a corresponding list of
@ -79,13 +89,6 @@ Note:
histograms, as you can make each bin arbitrarily wide,
i.e. class intervals of different sizes.
Sampling
--------
gorets:1|c|@0.1
Tells StatsD that this counter is being sent sampled every 1/10th of the time.
Gauges
------
StatsD now also supports gauges, arbitrary values, which can be recorded.
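Gauges use the same line protocol, tagged `|g`; a minimal sketch (the helper name and value are illustrative):

```javascript
// Gauge line format: "<bucket>:<value>|g" (the value replaces the stored gauge)
function formatGauge(key, value) {
  return key + ':' + value + '|g';
}

formatGauge('gaugor', 333); // "gaugor:333|g"
```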
@ -116,27 +119,29 @@ For more information, check the `exampleConfig.js`.
Supported Backends
------------------
StatsD supports multiple, pluggable, backend modules that can publish
StatsD supports pluggable backend modules that can publish
statistics from the local StatsD daemon to a backend service or data
store. Backend services can retain statistics for
longer durations in a time series data store, visualize statistics in
graphs or tables, or generate alerts based on defined thresholds. A
backend can also correlate statistics sent from StatsD daemons running
across multiple hosts in an infrastructure.
store. Backend services can retain statistics in a time series data store,
visualize statistics in graphs or tables, or generate alerts based on
defined thresholds. A backend can also correlate statistics sent from StatsD
daemons running across multiple hosts in an infrastructure.
StatsD includes the following backends:
StatsD includes the following built-in backends:
* [Graphite][graphite] (`graphite`): Graphite is an open-source
time-series data store that provides visualization through a
web-browser interface.
* Console (`console`): The console backend outputs the received
metrics to stdout (e.g. for seeing what's going on during development).
* Repeater (`repeater`): The repeater backend utilizes the `packet` emit API to
* [Graphite][graphite] (`graphite`): An open-source
time-series data store that provides visualization through a web-browser.
* Console (`console`): Outputs the received
metrics to stdout (see what's going on during development).
* Repeater (`repeater`): Utilizes the `packet` emit API to
forward raw packets retrieved by StatsD to multiple backend StatsD instances.
By default, the `graphite` backend will be loaded automatically. To
select which backends are loaded, set the `backends` configuration
variable to the list of backend modules to load.
A robust set of [other backends](https://github.com/etsy/statsd/wiki/Backends)
are also available as plugins to allow easy reporting into databases, queues
and third-party services.
By default, the `graphite` backend will be loaded automatically. Multiple
backends can be run at once. To select which backends are loaded, set
the `backends` configuration variable to the list of backend modules to load.
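A sketch of a config selecting two backends at once (the host is a placeholder; statsd reads an object of this shape from the config file passed on the command line):

```javascript
// Hypothetical config: run the graphite and console backends together.
var config = {
  graphitePort: 2003,
  graphiteHost: "graphite.example.com", // placeholder host
  port: 8125,
  backends: [ "./backends/graphite", "./backends/console" ]
};
```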
Backends are just npm modules which implement the interface described in
section *Backend Interface*. In order to be able to load the backend, add the
@ -153,7 +158,7 @@ Graphite uses "schemas" to define the different round robin datasets it houses
In conf/storage-schemas.conf:
[stats]
pattern = ^stats\..*
pattern = ^stats.*
retentions = 10:2160,60:10080,600:262974
In conf/storage-aggregation.conf:
@ -299,12 +304,18 @@ metrics: {
counters: counters,
gauges: gauges,
timers: timers,
sets: sets,
counter_rates: counter_rates,
timer_data: timer_data,
statsd_metrics: statsd_metrics,
pctThreshold: pctThreshold
}
```
Each backend module is passed the same set of statistics, so a
backend module should treat the metrics as immutable
The counter_rates and timer_data are precalculated statistics that simplify
the creation of backends; the statsd_metrics hash contains metrics generated
by statsd itself. Each backend module is passed the same set of
statistics, so a backend module should treat the metrics as immutable
structures. StatsD will reset timers and counters after each
listener has handled the event.
@ -331,6 +342,35 @@ metrics: {
the raw received message string and the `rinfo` parameter contains remote
address information from the UDP socket.
Metric namespacing
-------------------
The prefixes used for metric namespacing in the Graphite backend are
configurable. By default, all stats are put under `stats` in Graphite, which
makes it easier to consolidate them all under one schema. However, these
namespaces can be changed in the backend configuration options.
The available configuration options (living under the `graphite` key) are:
```
legacyNamespace: use the legacy namespace [default: true]
globalPrefix: global prefix to use for sending stats to graphite [default: "stats"]
prefixCounter: graphite prefix for counter metrics [default: "counters"]
prefixTimer: graphite prefix for timer metrics [default: "timers"]
prefixGauge: graphite prefix for gauge metrics [default: "gauges"]
prefixSet: graphite prefix for set metrics [default: "sets"]
```
If you decide not to use the legacy namespacing, then in addition to the
obvious prefix changes there is a breaking change in the way counters are
submitted. Previously, counters did not live under any namespace and were
somewhat confusing in how they recorded rate and absolute counts: in the
legacy setting, rates are recorded directly under `stats.counter_name`,
whereas the absolute count lives under `stats_counts.counter_name`. With
legacy namespacing disabled, those values are found (with default prefixing)
under `stats.counters.counter_name.rate` and
`stats.counters.counter_name.count`.
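A sketch of that path change for a single counter (the helper is hypothetical; names follow the defaults listed above):

```javascript
// Where a counter's rate and absolute count land, legacy vs. new namespacing.
function counterPaths(key, legacyNamespace) {
  if (legacyNamespace) {
    return { rate: 'stats.' + key, count: 'stats_counts.' + key };
  }
  return {
    rate: 'stats.counters.' + key + '.rate',
    count: 'stats.counters.' + key + '.count'
  };
}

counterPaths('gorets', true);  // { rate: 'stats.gorets', count: 'stats_counts.gorets' }
counterPaths('gorets', false); // { rate: 'stats.counters.gorets.rate', ... }
```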
Inspiration
-----------


@ -31,9 +31,11 @@ ConsoleBackend.prototype.flush = function(timestamp, metrics) {
});
var out = {
counter: this.statsCache.counters,
counters: this.statsCache.counters,
timers: this.statsCache.timers,
gauges: metrics.gauges,
timer_data: metrics.timer_data,
counter_rates: metrics.counter_rates,
sets: function (vals) {
var ret = {};
for (val in vals) {


@ -15,12 +15,27 @@
var net = require('net'),
util = require('util');
var config;
var debug;
var flushInterval;
var graphiteHost;
var graphitePort;
// prefix configuration
var globalPrefix;
var prefixPersecond;
var prefixCounter;
var prefixTimer;
var prefixGauge;
var prefixSet;
// set up namespaces
var legacyNamespace = true;
var globalNamespace = [];
var counterNamespace = [];
var timerNamespace = [];
var gaugesNamespace = [];
var setsNamespace = [];
var graphiteStats = {};
var post_stats = function graphite_post_stats(statString) {
@ -36,8 +51,9 @@ var post_stats = function graphite_post_stats(statString) {
});
graphite.on('connect', function() {
var ts = Math.round(new Date().getTime() / 1000);
statString += 'stats.statsd.graphiteStats.last_exception ' + last_exception + ' ' + ts + "\n";
statString += 'stats.statsd.graphiteStats.last_flush ' + last_flush + ' ' + ts + "\n";
var namespace = globalNamespace.concat('statsd');
statString += namespace.join(".") + '.graphiteStats.last_exception ' + last_exception + ' ' + ts + "\n";
statString += namespace.join(".") + '.graphiteStats.last_flush ' + last_flush + ' ' + ts + "\n";
this.write(statString);
this.end();
graphiteStats.last_flush = Math.round(new Date().getTime() / 1000);
@ -56,117 +72,71 @@ var flush_stats = function graphite_flush(ts, metrics) {
var statString = '';
var numStats = 0;
var key;
var timer_data_key;
var counters = metrics.counters;
var gauges = metrics.gauges;
var timers = metrics.timers;
var sets = metrics.sets;
var pctThreshold = metrics.pctThreshold;
var counter_rates = metrics.counter_rates;
var timer_data = metrics.timer_data;
var statsd_metrics = metrics.statsd_metrics;
for (key in counters) {
var namespace = counterNamespace.concat(key);
var value = counters[key];
var valuePerSecond = value / (flushInterval / 1000); // calculate "per second" rate
statString += 'stats.' + key + ' ' + valuePerSecond + ' ' + ts + "\n";
statString += 'stats_counts.' + key + ' ' + value + ' ' + ts + "\n";
if (legacyNamespace === true) {
statString += namespace.join(".") + ' ' + valuePerSecond + ' ' + ts + "\n";
statString += 'stats_counts.' + key + ' ' + value + ' ' + ts + "\n";
} else {
statString += namespace.concat('rate').join(".") + ' ' + valuePerSecond + ' ' + ts + "\n";
statString += namespace.concat('count').join(".") + ' ' + value + ' ' + ts + "\n";
}
numStats += 1;
}
for (key in timers) {
if (timers[key].length > 0) {
var values = timers[key].sort(function (a,b) { return a-b; });
var count = values.length;
var min = values[0];
var max = values[count - 1];
var cumulativeValues = [min];
for (var i = 1; i < count; i++) {
cumulativeValues.push(values[i] + cumulativeValues[i-1]);
for (key in timer_data) {
if (Object.keys(timer_data).length > 0) {
for (timer_data_key in timer_data[key]) {
var namespace = timerNamespace.concat(key);
var the_key = namespace.join(".");
statString += the_key + '.' + timer_data_key + ' ' + timer_data[key][timer_data_key] + ' ' + ts + "\n";
}
var sum = min;
var mean = min;
var maxAtThreshold = max;
var message = "";
var key2;
for (key2 in pctThreshold) {
var pct = pctThreshold[key2];
if (count > 1) {
var thresholdIndex = Math.round(((100 - pct) / 100) * count);
var numInThreshold = count - thresholdIndex;
maxAtThreshold = values[numInThreshold - 1];
sum = cumulativeValues[numInThreshold - 1];
mean = sum / numInThreshold;
}
var clean_pct = '' + pct;
clean_pct.replace('.', '_');
message += 'stats.timers.' + key + '.mean_' + clean_pct + ' ' + mean + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.upper_' + clean_pct + ' ' + maxAtThreshold + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.sum_' + clean_pct + ' ' + sum + ' ' + ts + "\n";
}
sum = cumulativeValues[count-1];
mean = sum / count;
var sumOfDiffs = 0;
for (var i = 0; i < count; i++) {
sumOfDiffs += (values[i] - mean) * (values[i] - mean);
}
var stddev = Math.sqrt(sumOfDiffs / count);
message += 'stats.timers.' + key + '.std ' + stddev + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.sum ' + sum + ' ' + ts + "\n";
message += 'stats.timers.' + key + '.mean ' + mean + ' ' + ts + "\n";
// note: values bigger than the upper limit of the last bin are ignored, by design
conf = config.histogram || [];
bins = [];
for (var i = 0; i < conf.length; i++) {
if (key.indexOf(conf[i].metric) > -1) {
bins = conf[i].bins;
break;
}
}
// the outer loop iterates bins, the inner loop iterates timer values;
// within each run of the inner loop we should only consider the timer value range that's within the scope of the current bin
// so we leverage the fact that the values are already sorted to end up with only full 1 iteration of the entire values range
var i = 0;
for (var bin_i = 0; bin_i < bins.length; bin_i++) {
var freq = 0;
for (; i < count && (bins[bin_i] == 'inf' || values[i] < bins[bin_i]); i++) {
freq += 1;
}
bin_name = ('bin_' + bins[bin_i]).replace('.','_');
message += 'stats.timers.' + key + '.' + bin_name + ' ' + freq + ' ' + ts + "\n";
}
statString += message;
numStats += 1;
}
}
for (key in gauges) {
statString += 'stats.gauges.' + key + ' ' + gauges[key] + ' ' + ts + "\n";
var namespace = gaugesNamespace.concat(key);
statString += namespace.join(".") + ' ' + gauges[key] + ' ' + ts + "\n";
numStats += 1;
}
for (key in sets) {
statString += 'stats.sets.' + key + '.count ' + sets[key].values().length + ' ' + ts + "\n";
var namespace = setsNamespace.concat(key);
statString += namespace.join(".") + '.count ' + sets[key].values().length + ' ' + ts + "\n";
numStats += 1;
}
statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";
statString += 'stats.statsd.graphiteStats.calculationtime ' + (Date.now() - starttime) + ' ' + ts + "\n";
var namespace = globalNamespace.concat('statsd');
if (legacyNamespace === true) {
statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";
statString += 'stats.statsd.graphiteStats.calculationtime ' + (Date.now() - starttime) + ' ' + ts + "\n";
for (key in statsd_metrics) {
statString += 'stats.statsd.' + key + ' ' + statsd_metrics[key] + ' ' + ts + "\n";
}
} else {
statString += namespace.join(".") + '.numStats ' + numStats + ' ' + ts + "\n";
statString += namespace.join(".") + '.graphiteStats.calculationtime ' + (Date.now() - starttime) + ' ' + ts + "\n";
for (key in statsd_metrics) {
var the_key = namespace.concat(key);
statString += the_key.join(".") + ' ' + statsd_metrics[key] + ' ' + ts + "\n";
}
}
post_stats(statString);
};
@ -176,11 +146,55 @@ var backend_status = function graphite_status(writeCb) {
}
};
exports.init = function graphite_init(startup_time, conf, events) {
config = conf
exports.init = function graphite_init(startup_time, config, events) {
debug = config.debug;
graphiteHost = config.graphiteHost;
graphitePort = config.graphitePort;
config.graphite = config.graphite || {};
globalPrefix = config.graphite.globalPrefix;
prefixCounter = config.graphite.prefixCounter;
prefixTimer = config.graphite.prefixTimer;
prefixGauge = config.graphite.prefixGauge;
prefixSet = config.graphite.prefixSet;
legacyNamespace = config.graphite.legacyNamespace;
// set defaults for prefixes
globalPrefix = globalPrefix !== undefined ? globalPrefix : "stats";
prefixCounter = prefixCounter !== undefined ? prefixCounter : "counters";
prefixTimer = prefixTimer !== undefined ? prefixTimer : "timers";
prefixGauge = prefixGauge !== undefined ? prefixGauge : "gauges";
prefixSet = prefixSet !== undefined ? prefixSet : "sets";
legacyNamespace = legacyNamespace !== undefined ? legacyNamespace : true;
if (legacyNamespace === false) {
if (globalPrefix !== "") {
globalNamespace.push(globalPrefix);
counterNamespace.push(globalPrefix);
timerNamespace.push(globalPrefix);
gaugesNamespace.push(globalPrefix);
setsNamespace.push(globalPrefix);
}
if (prefixCounter !== "") {
counterNamespace.push(prefixCounter);
}
if (prefixTimer !== "") {
timerNamespace.push(prefixTimer);
}
if (prefixGauge !== "") {
gaugesNamespace.push(prefixGauge);
}
if (prefixSet !== "") {
setsNamespace.push(prefixSet);
}
} else {
globalNamespace = ['stats'];
counterNamespace = ['stats'];
timerNamespace = ['stats', 'timers'];
gaugesNamespace = ['stats', 'gauges'];
setsNamespace = ['stats', 'sets'];
}
graphiteStats.last_flush = startup_time;
graphiteStats.last_exception = startup_time;


@ -4,7 +4,9 @@ var util = require('util'),
function RepeaterBackend(startupTime, config, emitter){
var self = this;
this.config = config.repeater || [];
this.sock = dgram.createSocket('udp4');
this.sock = (config.repeaterProtocol == 'udp6') ?
dgram.createSocket('udp6') :
dgram.createSocket('udp4');
// attach
emitter.on('packet', function(packet, rinfo) { self.process(packet, rinfo); });

debian/statsd.init (vendored)

@ -25,6 +25,7 @@ DAEMON=$NODE_BIN
DAEMON_ARGS="/usr/share/statsd/stats.js /etc/statsd/localConfig.js 2>&1 >> /var/log/statsd/statsd.log "
PIDFILE=/var/run/$NAME.pid
SCRIPTNAME=/etc/init.d/$NAME
CHDIR="/usr/share/statsd"
# Exit if the package is not installed
# [ -x "$DAEMON" ] || exit 0
@ -50,7 +51,7 @@ do_start()
# 2 if daemon could not be started
start-stop-daemon --start --quiet -m --pidfile $PIDFILE --startas $DAEMON --background --test > /dev/null \
|| return 1
start-stop-daemon --start --quiet -m --pidfile $PIDFILE --startas $DAEMON --background -- \
start-stop-daemon --start --quiet -m --pidfile $PIDFILE --startas $DAEMON --background --chdir $CHDIR -- \
$DAEMON_ARGS > /dev/null 2> /var/log/$NAME-stderr.log \
|| return 2
# Add code here, if necessary, that waits for the process to be ready

debian/statsd.install (vendored)

@ -1,6 +1,6 @@
stats.js /usr/share/statsd
config.js /usr/share/statsd
lib/*.js /usr/share/statsd/lib
backends/*.js /usr/share/statsd/backends
lib/*.js /usr/share/statsd/lib
debian/localConfig.js /etc/statsd
debian/scripts/start /usr/share/statsd/scripts

debian/statsd.upstart (vendored)

@ -5,8 +5,7 @@ start on startup
stop on shutdown
script
# We found $HOME is needed. Without it, we ran into problems
export HOME="/root"
chdir /usr/share/statsd
exec sudo -u nobody /usr/share/statsd/scripts/start
end script


@ -31,6 +31,7 @@ Optional Variables:
interval: how often to log frequent keys [ms, default: 0]
percent: percentage of frequent keys to log [%, default: 100]
log: location of log file for frequent keys [default: STDOUT]
deleteCounters: don't send values to graphite for inactive counters, as opposed to sending 0 [default: false]
console:
prettyprint: whether to prettyprint the console backend
@ -41,11 +42,20 @@ Optional Variables:
application: name of the application for syslog [string, default: statsd]
level: log level for [node-]syslog [string, default: LOG_INFO]
graphite:
legacyNamespace: use the legacy namespace [default: true]
globalPrefix: global prefix to use for sending stats to graphite [default: "stats"]
prefixCounter: graphite prefix for counter metrics [default: "counters"]
prefixTimer: graphite prefix for timer metrics [default: "timers"]
prefixGauge: graphite prefix for gauge metrics [default: "gauges"]
prefixSet: graphite prefix for set metrics [default: "sets"]
repeater: an array of hashes of the for host: and port:
that details other statsd servers to which the received
packets should be "repeated" (duplicated to).
e.g. [ { host: '10.10.10.10', port: 8125 },
{ host: 'observer', port: 8125 } ]
timer:
percentThreshold: calculate the Nth percentile(s)
(can be a single value or list of floating-point values)
@ -64,11 +74,15 @@ Optional Variables:
equal class interval and catchall for outliers:
[ { metric: 'foo', bins: [] },
{ metric: '', bins: [ 50, 100, 150, 200, 'inf'] } ]
repeaterProtocol: whether to use udp4 or udp6 for repeaters.
["udp4" or "udp6", default: "udp4"]
*/
{
graphitePort: 2003
, graphiteHost: "graphite.host.com"
, port: 8125
, backends: [ "./backends/repeater" ]
, backends: [ "./backends/graphite" ]
, repeater: [ { host: "10.8.3.214", port: 8125 } ]
, repeaterProtocol: "udp4"
}


@ -8,14 +8,41 @@
class StatsD {
/**
* Log timing information
* Sets one or more timing values
*
* @param string $stats The metric to in log timing info for.
* @param float $time The ellapsed time (ms) to log
* @param float|1 $sampleRate the rate (0-1) for sampling.
* @param string|array $stats The metric(s) to set.
* @param float $time The elapsed time (ms) to log
**/
public static function timing($stat, $time, $sampleRate=1) {
StatsD::send(array($stat => "$time|ms"), $sampleRate);
public static function timing($stats, $time) {
StatsD::updateStats($stats, $time, 1, 'ms');
}
/**
* Sets one or more gauges to a value
*
* @param string|array $stats The metric(s) to set.
* @param float $value The value for the stats.
**/
public static function gauge($stats, $value) {
StatsD::updateStats($stats, $value, 1, 'g');
}
/**
* A "Set" is a count of unique events.
* This data type acts like a counter, but supports counting
of unique occurrences of values between flushes. The backend
* receives the number of unique events that happened since
* the last flush.
*
* The reference use case involved tracking the number of active
* and logged in users by sending the current userId of a user
* with each request with a key of "uniques" (or similar).
*
* @param string|array $stats The metric(s) to set.
* @param float $value The value for the stats.
**/
public static function set($stats, $value) {
StatsD::updateStats($stats, $value, 1, 's');
}
/**
@ -26,7 +53,7 @@ class StatsD {
* @return boolean
**/
public static function increment($stats, $sampleRate=1) {
StatsD::updateStats($stats, 1, $sampleRate);
StatsD::updateStats($stats, 1, $sampleRate, 'c');
}
/**
@ -37,22 +64,23 @@ class StatsD {
* @return boolean
**/
public static function decrement($stats, $sampleRate=1) {
StatsD::updateStats($stats, -1, $sampleRate);
StatsD::updateStats($stats, -1, $sampleRate, 'c');
}
/**
* Updates one or more stats counters by arbitrary amounts.
* Updates one or more stats.
*
* @param string|array $stats The metric(s) to update. Should be either a string or array of metrics.
* @param int|1 $delta The amount to increment/decrement each metric by.
* @param float|1 $sampleRate the rate (0-1) for sampling.
* @param string|c $metric The metric type ("c" for count, "ms" for timing, "g" for gauge, "s" for set)
* @return boolean
**/
public static function updateStats($stats, $delta=1, $sampleRate=1) {
public static function updateStats($stats, $delta=1, $sampleRate=1, $metric='c') {
if (!is_array($stats)) { $stats = array($stats); }
$data = array();
foreach($stats as $stat) {
$data[$stat] = "$delta|c";
$data[$stat] = "$delta|$metric";
}
StatsD::send($data, $sampleRate);
@ -137,4 +165,4 @@ class Config
host = yourhost
port = 8125
*/
*/


@ -9,7 +9,17 @@
# statsd_port = 8125
# Sends statistics to the stats daemon over UDP
class Statsd(object):
class StatsdClient(object):
def __init__(self, host='localhost', port=8125):
self.host = host
self.port = port
try:
import local_settings as settings
self.host = settings.statsd_host
self.port = settings.statsd_port
except:
pass
self.addr=(host, port)
@staticmethod
def timing(stat, time, sample_rate=1):
@ -83,9 +93,14 @@ class Statsd(object):
for stat in sampled_data.keys():
value = sampled_data[stat]
send_data = "%s:%s" % (stat, value)
udp_sock.sendto(send_data, addr)
udp_sock.sendto(send_data, self.addr)
except:
import sys
from pprint import pprint
print "Unexpected error:", pprint(sys.exc_info())
pass # we don't care
if __name__=="__main__":
c = StatsdClient()
c.increment('example.python')


@ -5,8 +5,8 @@ require 'yaml'
# Ian Sefferman <iseff@iseff.com>
# http://www.iseff.com
# If this is running in a Rails environment, will pick up config/statsd.yml.
# If this is running in a Rails environment, will pick up config/statsd.yml.
# config/statsd.yml should look like:
# production:
# host: statsd.domain.com
@ -24,7 +24,7 @@ require 'yaml'
# Sends statistics to the stats daemon over UDP
class Statsd
def self.timing(stats, time, sample_rate=1)
Statsd.update_stats(stats, time, sample_rate, 'ms')
end
@ -41,6 +41,10 @@ class Statsd
Statsd.update_stats(stats, value, sample_rate, 'g')
end
def self.sets(stats, value, sample_rate=1)
Statsd.update_stats(stats, value, sample_rate, 's')
end
def self.update_stats(stats, delta=1, sample_rate=1, metric='c')
stats = [stats].flatten
@ -56,7 +60,7 @@ class Statsd
begin
host = config["host"] || "localhost"
port = config["port"] || "8125"
sampled_data = {}
if sample_rate < 1
if rand <= sample_rate
@ -67,7 +71,7 @@ class Statsd
else
sampled_data = data
end
udp = UDPSocket.new
sampled_data.each_pair do |stat, val|
send_data = "%s:%s" % [stat, val]
@ -80,7 +84,7 @@ class Statsd
def self.config
return @@config if self.class_variable_defined?(:@@config)
begin
begin
config_path = File.join(File.dirname(__FILE__), "statsd.yml")
# for Rails environments, check Rails.root/config/statsd.yml
if defined? Rails


@ -21,8 +21,8 @@ var Configurator = function (file) {
this.updateConfig();
fs.watchFile(file, function (curr, prev) {
if (curr.ino != prev.ino) { self.updateConfig(); }
fs.watch(file, function (event, filename) {
if (event == 'change') { self.updateConfig(); }
});
};


@ -18,9 +18,9 @@ Logger.prototype = {
log: function (msg, type) {
if (this.backend == 'stdout') {
if (!type) {
type = 'DEBUG: ';
type = 'DEBUG';
}
this.util.log(type + msg);
this.util.log(type + ": " + msg);
} else {
if (!type) {
type = this.level

lib/process_metrics.js (new file)

@ -0,0 +1,112 @@
var process_metrics = function (metrics, flushInterval, ts, flushCallback) {
var starttime = Date.now();
var key;
var counter_rates = {};
var timer_data = {};
var statsd_metrics = {};
var counters = metrics.counters;
var timers = metrics.timers;
var pctThreshold = metrics.pctThreshold;
var histogram = metrics.histogram;
for (key in counters) {
var value = counters[key];
// calculate "per second" rate
var valuePerSecond = value / (flushInterval / 1000);
counter_rates[key] = valuePerSecond;
}
for (key in timers) {
if (timers[key].length > 0) {
timer_data[key] = {};
var current_timer_data = {};
var values = timers[key].sort(function (a,b) { return a-b; });
var count = values.length;
var min = values[0];
var max = values[count - 1];
var cumulativeValues = [min];
for (var i = 1; i < count; i++) {
cumulativeValues.push(values[i] + cumulativeValues[i-1]);
}
var sum = min;
var mean = min;
var maxAtThreshold = max;
var message = "";
var key2;
for (key2 in pctThreshold) {
var pct = pctThreshold[key2];
if (count > 1) {
var numInThreshold = Math.round(pct / 100 * count);
maxAtThreshold = values[numInThreshold - 1];
sum = cumulativeValues[numInThreshold - 1];
mean = sum / numInThreshold;
}
var clean_pct = '' + pct;
clean_pct = clean_pct.replace('.', '_');
current_timer_data["mean_" + clean_pct] = mean;
current_timer_data["upper_" + clean_pct] = maxAtThreshold;
current_timer_data["sum_" + clean_pct] = sum;
}
sum = cumulativeValues[count-1];
mean = sum / count;
var sumOfDiffs = 0;
for (var i = 0; i < count; i++) {
sumOfDiffs += (values[i] - mean) * (values[i] - mean);
}
var stddev = Math.sqrt(sumOfDiffs / count);
current_timer_data["std"] = stddev;
current_timer_data["upper"] = max;
current_timer_data["lower"] = min;
current_timer_data["count"] = count;
current_timer_data["sum"] = sum;
current_timer_data["mean"] = mean;
// note: values bigger than the upper limit of the last bin are ignored, by design
conf = histogram || [];
bins = [];
for (var i = 0; i < conf.length; i++) {
if (key.indexOf(conf[i].metric) > -1) {
bins = conf[i].bins;
break;
}
}
// the outer loop iterates bins, the inner loop iterates timer values;
// within each run of the inner loop we should only consider the timer value range that's within the scope of the current bin
// so we leverage the fact that the values are already sorted to end up with only one full iteration over the entire values range
var i = 0;
for (var bin_i = 0; bin_i < bins.length; bin_i++) {
var freq = 0;
for (; i < count && (bins[bin_i] == 'inf' || values[i] < bins[bin_i]); i++) {
freq += 1;
}
bin_name = ('bin_' + bins[bin_i]).replace('.','_');
current_timer_data[bin_name] = freq;
}
timer_data[key] = current_timer_data;
}
}
statsd_metrics["processing_time"] = (Date.now() - starttime);
//add processed metrics to the metrics_hash
metrics.counter_rates = counter_rates;
metrics.timer_data = timer_data;
metrics.statsd_metrics = statsd_metrics;
flushCallback(metrics);
}
exports.process_metrics = process_metrics


@ -3,7 +3,10 @@
"description": "A simple, lightweight network daemon to collect metrics over UDP",
"author": "Etsy",
"scripts": {
"test": "./run_tests.sh"
"test": "./run_tests.sh",
"start": "node stats.js config.js",
"install-windows-service": "node_modules\\.bin\\winser -i",
"uninstall-windows-service": "node_modules\\.bin\\winser -r"
},
"repository": {
"type": "git",
@ -19,9 +22,10 @@
"temp": "0.4.x"
},
"optionalDependencies": {
"node-syslog":"1.1.3"
"node-syslog":"1.1.3",
"winser": "=0.0.11"
},
"engine": {
"engines": {
"node" : ">=0.4"
},
"bin": { "statsd": "./bin/statsd" }


@ -1,11 +1,13 @@
var dgram = require('dgram')
, util = require('util')
, net = require('net')
, config = require('./config')
, config = require('./lib/config')
, fs = require('fs')
, events = require('events')
, logger = require('./lib/logger')
, set = require('./lib/set')
, pm = require('./lib/process_metrics')
// initialize data structures with defaults for statsd stats
var keyCounter = {};
@ -13,12 +15,11 @@ var counters = {
"statsd.packets_received": 0,
"statsd.bad_lines_seen": 0
};
var timers = {
"statsd.packet_process_time": []
};
var timers = {};
var gauges = {};
var sets = {
};
var sets = {};
var counter_rates = {};
var timer_data = {};
var pctThreshold = null;
var debugInt, flushInterval, keyFlushInt, server, mgmtServer;
var startup_time = Math.round(new Date().getTime() / 1000);
@ -39,6 +40,9 @@ function loadBackend(config, name) {
}
};
// global for conf
var conf;
// Flush metrics to each backend.
function flushMetrics() {
var time_stamp = Math.round(new Date().getTime() / 1000);
@ -48,14 +52,22 @@ function flushMetrics() {
gauges: gauges,
timers: timers,
sets: sets,
pctThreshold: pctThreshold
counter_rates: counter_rates,
timer_data: timer_data,
pctThreshold: pctThreshold,
histogram: config.histogram
}
// After all listeners, reset the stats
backendEvents.once('flush', function clear_metrics(ts, metrics) {
// Clear the counters
conf.deleteCounters = conf.deleteCounters || false;
for (key in metrics.counters) {
metrics.counters[key] = 0;
if (conf.deleteCounters) {
delete(metrics.counters[key]);
} else {
metrics.counters[key] = 0;
}
}
// Clear the timers
@ -69,14 +81,16 @@ function flushMetrics() {
}
});
// Flush metrics to each backend.
backendEvents.emit('flush', time_stamp, metrics_hash);
pm.process_metrics(metrics_hash, flushInterval, time_stamp, function emitFlush(metrics) {
backendEvents.emit('flush', time_stamp, metrics);
});
};
var stats = {
messages: {
last_msg_seen: startup_time,
bad_lines_seen: 0,
bad_lines_seen: 0
}
};
@ -84,6 +98,7 @@ var stats = {
var l;
config.configFile(process.argv[2], function (config, oldConfig) {
conf = config;
if (! config.debug && debugInt) {
clearInterval(debugInt);
debugInt = false;
@ -155,8 +170,15 @@ config.configFile(process.argv[2], function (config, oldConfig) {
}
sets[key].insert(fields[0] || '0');
} else {
if (fields[2] && fields[2].match(/^@([\d\.]+)/)) {
sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]);
if (fields[2]) {
if (fields[2].match(/^@([\d\.]+)/)) {
sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]);
} else {
l.log('Bad line: ' + fields + ' in msg "' + metrics[midx] +'"; has invalid sample rate');
counters["statsd.bad_lines_seen"]++;
stats['messages']['bad_lines_seen']++;
continue;
}
}
if (! counters[key]) {
counters[key] = 0;
@@ -301,7 +323,7 @@ config.configFile(process.argv[2], function (config, oldConfig) {
if (keyFlushInterval > 0) {
var keyFlushPercent = Number((config.keyFlush && config.keyFlush.percent) || 100);
var keyFlushLog = (config.keyFlush && config.keyFlush.log) || "stdout";
var keyFlushLog = config.keyFlush && config.keyFlush.log;
keyFlushInt = setInterval(function () {
var key;
@@ -321,9 +343,13 @@ config.configFile(process.argv[2], function (config, oldConfig) {
logMessage += timeString + " count=" + sortedKeys[i][1] + " key=" + sortedKeys[i][0] + "\n";
}
var logFile = fs.createWriteStream(keyFlushLog, {flags: 'a+'});
logFile.write(logMessage);
logFile.end();
if (keyFlushLog) {
var logFile = fs.createWriteStream(keyFlushLog, {flags: 'a+'});
logFile.write(logMessage);
logFile.end();
} else {
process.stdout.write(logMessage);
}
// clear the counter
keyCounter = {};
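The counter changes above (per-flush reset or deletion, plus the new `counter_rates` hash) boil down to two small calculations: scale each increment by the inverse of its sample rate, and divide the accumulated count by the flush interval in seconds at flush time. A minimal sketch under assumed names (`applyIncrement` and `rateAtFlush` are illustrative helpers, not statsd's actual functions):

```javascript
// Hypothetical sketch of sampled counter accumulation and the
// per-second rate reported at flush (names are illustrative).
function applyIncrement(counters, key, value, sampleRate) {
  if (!counters[key]) { counters[key] = 0; }
  // A sample rate of 0.1 means each received increment stands for 10.
  counters[key] += value * (1 / sampleRate);
}

function rateAtFlush(count, flushIntervalMs) {
  // counter_rates holds the per-second rate over the flush interval.
  return count / (flushIntervalMs / 1000);
}

var counters = {};
applyIncrement(counters, 'gorets', 1, 0.1);
console.log(counters.gorets);      // 10
console.log(rateAtFlush(2, 100));  // 20, as in the counters_has_correct_rate test below
```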


@@ -0,0 +1,263 @@
var fs = require('fs'),
net = require('net'),
temp = require('temp'),
spawn = require('child_process').spawn,
util = require('util'),
urlparse = require('url').parse,
_ = require('underscore'),
dgram = require('dgram'),
qsparse = require('querystring').parse,
http = require('http');
var writeconfig = function(text,worker,cb,obj){
temp.open({suffix: '-statsdconf.js'}, function(err, info) {
if (err) throw err;
fs.writeSync(info.fd, text);
fs.close(info.fd, function(err) {
if (err) throw err;
worker(info.path,cb,obj);
});
});
}
var array_contents_are_equal = function(first,second){
var intlen = _.intersection(first,second).length;
var unlen = _.union(first,second).length;
return (intlen == unlen) && (intlen == first.length);
}
var statsd_send = function(data,sock,host,port,cb){
var send_data = new Buffer(data);
sock.send(send_data,0,send_data.length,port,host,function(err,bytes){
if (err) {
throw err;
}
cb();
});
}
// keep collecting data until a specified timeout period has elapsed
// this will let us capture all data chunks so we don't miss one
var collect_for = function(server,timeout,cb){
var received = [];
var in_flight = 0;
var timed_out = false;
var collector = function(req,res){
in_flight += 1;
var body = '';
req.on('data',function(data){ body += data; });
req.on('end',function(){
received = received.concat(body.split("\n"));
in_flight -= 1;
if((in_flight < 1) && timed_out){
server.removeListener('connection',collector);
cb(received);
}
});
}
setTimeout(function (){
timed_out = true;
if((in_flight < 1)) {
server.removeListener('connection',collector);
cb(received);
}
},timeout);
server.on('connection',collector);
}
module.exports = {
setUp: function (callback) {
this.testport = 31337;
this.myflush = 200;
var configfile = "{graphService: \"graphite\"\n\
, batch: 200 \n\
, flushInterval: " + this.myflush + " \n\
, percentThreshold: 90\n\
, port: 8125\n\
, dumpMessages: false \n\
, debug: false\n\
, deleteCounters: true\n\
, graphitePort: " + this.testport + "\n\
, graphiteHost: \"127.0.0.1\"}";
this.acceptor = net.createServer();
this.acceptor.listen(this.testport);
this.sock = dgram.createSocket('udp4');
this.server_up = true;
this.ok_to_die = false;
this.exit_callback_callback = process.exit;
writeconfig(configfile,function(path,cb,obj){
obj.path = path;
obj.server = spawn('node',['stats.js', path]);
obj.exit_callback = function (code) {
obj.server_up = false;
if(!obj.ok_to_die){
console.log('node server unexpectedly quit with code: ' + code);
process.exit(1);
}
else {
obj.exit_callback_callback();
}
};
obj.server.on('exit', obj.exit_callback);
obj.server.stderr.on('data', function (data) {
console.log('stderr: ' + data.toString().replace(/\n$/,''));
});
/*
obj.server.stdout.on('data', function (data) {
console.log('stdout: ' + data.toString().replace(/\n$/,''));
});
*/
obj.server.stdout.on('data', function (data) {
// wait until server is up before we finish setUp
if (data.toString().match(/server is up/)) {
cb();
}
});
},callback,this);
},
tearDown: function (callback) {
this.sock.close();
this.acceptor.close();
this.ok_to_die = true;
if(this.server_up){
this.exit_callback_callback = callback;
this.server.kill();
} else {
callback();
}
},
send_well_formed_posts: function (test) {
test.expect(2);
// we should integrate a timeout into this
this.acceptor.once('connection',function(c){
var body = '';
c.on('data',function(d){ body += d; });
c.on('end',function(){
var rows = body.split("\n");
var entries = _.map(rows, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
test.ok(_.include(_.map(entries,function(x) { return _.keys(x)[0] }),'statsd.numStats'),'graphite output includes numStats');
test.equal(_.find(entries, function(x) { return _.keys(x)[0] == 'statsd.numStats' })['statsd.numStats'],2);
test.done();
});
});
},
send_malformed_post: function (test) {
test.expect(3);
var testvalue = 1;
var me = this;
this.acceptor.once('connection',function(c){
statsd_send('a_bad_test_value|z',me.sock,'127.0.0.1',8125,function(){
collect_for(me.acceptor,me.myflush*2,function(strings){
test.ok(strings.length > 0,'should receive some data');
var hashes = _.map(strings, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
var numstat_test = function(post){
var mykey = 'statsd.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 2);
};
test.ok(_.any(hashes,numstat_test), 'statsd.numStats should be 2');
var bad_lines_seen_value_test = function(post){
var mykey = 'stats_counts.statsd.bad_lines_seen';
return _.include(_.keys(post),mykey) && isNaN(post[mykey]);
};
test.ok(_.any(hashes,bad_lines_seen_value_test), 'stats_counts.statsd.bad_lines_seen should have no numeric value when deleteCounters is set');
test.done();
});
});
});
},
timers_are_valid: function (test) {
test.expect(3);
var testvalue = 100;
var me = this;
this.acceptor.once('connection',function(c){
statsd_send('a_test_value:' + testvalue + '|ms',me.sock,'127.0.0.1',8125,function(){
collect_for(me.acceptor,me.myflush*2,function(strings){
test.ok(strings.length > 0,'should receive some data');
var hashes = _.map(strings, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
var numstat_test = function(post){
var mykey = 'statsd.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 2);
};
test.ok(_.any(hashes,numstat_test), 'statsd.numStats should be 2');
var testtimervalue_test = function(post){
var mykey = 'stats.timers.a_test_value.mean_90';
return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
};
test.ok(_.any(hashes,testtimervalue_test), 'stats.timers.a_test_value.mean_90 should be ' + testvalue);
test.done();
});
});
});
},
counts_are_valid: function (test) {
test.expect(4);
var testvalue = 100;
var me = this;
this.acceptor.once('connection',function(c){
statsd_send('a_test_value:' + testvalue + '|c',me.sock,'127.0.0.1',8125,function(){
collect_for(me.acceptor,me.myflush*2,function(strings){
test.ok(strings.length > 0,'should receive some data');
var hashes = _.map(strings, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
var numstat_test = function(post){
var mykey = 'statsd.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 2);
};
test.ok(_.any(hashes,numstat_test), 'statsd.numStats should be 2');
var testavgvalue_test = function(post){
var mykey = 'stats.a_test_value';
return _.include(_.keys(post),mykey) && (post[mykey] == (testvalue/(me.myflush / 1000)));
};
test.ok(_.any(hashes,testavgvalue_test), 'stats.a_test_value should be ' + (testvalue/(me.myflush / 1000)));
var testcountvalue_test = function(post){
var mykey = 'stats_counts.a_test_value';
return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
};
test.ok(_.any(hashes,testcountvalue_test), 'stats_counts.a_test_value should be ' + testvalue);
test.done();
});
});
});
}
}


@@ -0,0 +1,229 @@
var fs = require('fs'),
net = require('net'),
temp = require('temp'),
spawn = require('child_process').spawn,
util = require('util'),
urlparse = require('url').parse,
_ = require('underscore'),
dgram = require('dgram'),
qsparse = require('querystring').parse,
http = require('http');
var writeconfig = function(text,worker,cb,obj){
temp.open({suffix: '-statsdconf.js'}, function(err, info) {
if (err) throw err;
fs.writeSync(info.fd, text);
fs.close(info.fd, function(err) {
if (err) throw err;
worker(info.path,cb,obj);
});
});
}
var array_contents_are_equal = function(first,second){
var intlen = _.intersection(first,second).length;
var unlen = _.union(first,second).length;
return (intlen == unlen) && (intlen == first.length);
}
var statsd_send = function(data,sock,host,port,cb){
var send_data = new Buffer(data);
sock.send(send_data,0,send_data.length,port,host,function(err,bytes){
if (err) {
throw err;
}
cb();
});
}
// keep collecting data until a specified timeout period has elapsed
// this will let us capture all data chunks so we don't miss one
var collect_for = function(server,timeout,cb){
var received = [];
var in_flight = 0;
var timed_out = false;
var collector = function(req,res){
in_flight += 1;
var body = '';
req.on('data',function(data){ body += data; });
req.on('end',function(){
received = received.concat(body.split("\n"));
in_flight -= 1;
if((in_flight < 1) && timed_out){
server.removeListener('connection',collector);
cb(received);
}
});
}
setTimeout(function (){
timed_out = true;
if((in_flight < 1)) {
server.removeListener('connection',collector);
cb(received);
}
},timeout);
server.on('connection',collector);
}
module.exports = {
setUp: function (callback) {
this.testport = 31337;
this.myflush = 200;
var configfile = "{graphService: \"graphite\"\n\
, batch: 200 \n\
, flushInterval: " + this.myflush + " \n\
, percentThreshold: 90\n\
, port: 8125\n\
, dumpMessages: false \n\
, debug: false\n\
, graphitePort: " + this.testport + "\n\
, graphiteHost: \"127.0.0.1\"}";
this.acceptor = net.createServer();
this.acceptor.listen(this.testport);
this.sock = dgram.createSocket('udp4');
this.server_up = true;
this.ok_to_die = false;
this.exit_callback_callback = process.exit;
writeconfig(configfile,function(path,cb,obj){
obj.path = path;
obj.server = spawn('node',['stats.js', path]);
obj.exit_callback = function (code) {
obj.server_up = false;
if(!obj.ok_to_die){
console.log('node server unexpectedly quit with code: ' + code);
process.exit(1);
}
else {
obj.exit_callback_callback();
}
};
obj.server.on('exit', obj.exit_callback);
obj.server.stderr.on('data', function (data) {
console.log('stderr: ' + data.toString().replace(/\n$/,''));
});
/*
obj.server.stdout.on('data', function (data) {
console.log('stdout: ' + data.toString().replace(/\n$/,''));
});
*/
obj.server.stdout.on('data', function (data) {
// wait until server is up before we finish setUp
if (data.toString().match(/server is up/)) {
cb();
}
});
},callback,this);
},
tearDown: function (callback) {
this.sock.close();
this.acceptor.close();
this.ok_to_die = true;
if(this.server_up){
this.exit_callback_callback = callback;
this.server.kill();
} else {
callback();
}
},
send_well_formed_posts: function (test) {
test.expect(2);
// we should integrate a timeout into this
this.acceptor.once('connection',function(c){
var body = '';
c.on('data',function(d){ body += d; });
c.on('end',function(){
var rows = body.split("\n");
var entries = _.map(rows, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
test.ok(_.include(_.map(entries,function(x) { return _.keys(x)[0] }),'statsd.numStats'),'graphite output includes numStats');
test.equal(_.find(entries, function(x) { return _.keys(x)[0] == 'statsd.numStats' })['statsd.numStats'],2);
test.done();
});
});
},
timers_are_valid: function (test) {
test.expect(3);
var testvalue = 100;
var me = this;
this.acceptor.once('connection',function(c){
statsd_send('a_test_value:' + testvalue + '|ms',me.sock,'127.0.0.1',8125,function(){
collect_for(me.acceptor,me.myflush*2,function(strings){
test.ok(strings.length > 0,'should receive some data');
var hashes = _.map(strings, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
var numstat_test = function(post){
var mykey = 'statsd.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 3);
};
test.ok(_.any(hashes,numstat_test), 'statsd.numStats should be 3');
var testtimervalue_test = function(post){
var mykey = 'stats.timers.a_test_value.mean_90';
return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
};
test.ok(_.any(hashes,testtimervalue_test), 'stats.timers.a_test_value.mean_90 should be ' + testvalue);
test.done();
});
});
});
},
counts_are_valid: function (test) {
test.expect(4);
var testvalue = 100;
var me = this;
this.acceptor.once('connection',function(c){
statsd_send('a_test_value:' + testvalue + '|c',me.sock,'127.0.0.1',8125,function(){
collect_for(me.acceptor,me.myflush*2,function(strings){
test.ok(strings.length > 0,'should receive some data');
var hashes = _.map(strings, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
var numstat_test = function(post){
var mykey = 'statsd.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 3);
};
test.ok(_.any(hashes,numstat_test), 'statsd.numStats should be 3');
var testavgvalue_test = function(post){
var mykey = 'stats.a_test_value';
return _.include(_.keys(post),mykey) && (post[mykey] == (testvalue/(me.myflush / 1000)));
};
test.ok(_.any(hashes,testavgvalue_test), 'stats.a_test_value should be ' + (testvalue/(me.myflush / 1000)));
var testcountvalue_test = function(post){
var mykey = 'stats_counts.a_test_value';
return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
};
test.ok(_.any(hashes,testcountvalue_test), 'stats_counts.a_test_value should be ' + testvalue);
test.done();
});
});
});
}
}


@@ -79,6 +79,7 @@ module.exports = {
, port: 8125\n\
, dumpMessages: false \n\
, debug: false\n\
, graphite: { legacyNamespace: false }\n\
, graphitePort: " + this.testport + "\n\
, graphiteHost: \"127.0.0.1\"}";
@@ -148,13 +149,46 @@ module.exports = {
data[chunks[0]] = chunks[1];
return data;
});
test.ok(_.include(_.map(entries,function(x) { return _.keys(x)[0] }),'statsd.numStats'),'graphite output includes numStats');
test.equal(_.find(entries, function(x) { return _.keys(x)[0] == 'statsd.numStats' })['statsd.numStats'],2);
test.ok(_.include(_.map(entries,function(x) { return _.keys(x)[0] }),'stats.statsd.numStats'),'graphite output includes numStats');
test.equal(_.find(entries, function(x) { return _.keys(x)[0] == 'stats.statsd.numStats' })['stats.statsd.numStats'],2);
test.done();
});
});
},
send_malformed_post: function (test) {
test.expect(3);
var testvalue = 1;
var me = this;
this.acceptor.once('connection',function(c){
statsd_send('a_bad_test_value|z',me.sock,'127.0.0.1',8125,function(){
collect_for(me.acceptor,me.myflush*2,function(strings){
test.ok(strings.length > 0,'should receive some data');
var hashes = _.map(strings, function(x) {
var chunks = x.split(' ');
var data = {};
data[chunks[0]] = chunks[1];
return data;
});
var numstat_test = function(post){
var mykey = 'stats.statsd.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 2);
};
test.ok(_.any(hashes,numstat_test), 'stats.statsd.numStats should be 2');
var bad_lines_seen_value_test = function(post){
var mykey = 'stats.counters.statsd.bad_lines_seen.count';
return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
};
test.ok(_.any(hashes,bad_lines_seen_value_test), 'stats.counters.statsd.bad_lines_seen.count should be ' + testvalue);
test.done();
});
});
});
},
timers_are_valid: function (test) {
test.expect(3);
@@ -171,10 +205,10 @@ module.exports = {
return data;
});
var numstat_test = function(post){
var mykey = 'statsd.numStats';
var mykey = 'stats.statsd.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 3);
};
test.ok(_.any(hashes,numstat_test), 'statsd.numStats should be 1');
test.ok(_.any(hashes,numstat_test), 'stats.statsd.numStats should be 3');
var testtimervalue_test = function(post){
var mykey = 'stats.timers.a_test_value.mean_90';
@@ -204,22 +238,22 @@ module.exports = {
return data;
});
var numstat_test = function(post){
var mykey = 'statsd.numStats';
var mykey = 'stats.statsd.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 3);
};
test.ok(_.any(hashes,numstat_test), 'statsd.numStats should be 1');
test.ok(_.any(hashes,numstat_test), 'stats.statsd.numStats should be 3');
var testavgvalue_test = function(post){
var mykey = 'stats.a_test_value';
var mykey = 'stats.counters.a_test_value.rate';
return _.include(_.keys(post),mykey) && (post[mykey] == (testvalue/(me.myflush / 1000)));
};
test.ok(_.any(hashes,testavgvalue_test), 'stats.a_test_value should be ' + (testvalue/(me.myflush / 1000)));
test.ok(_.any(hashes,testavgvalue_test), 'a_test_value.rate should be ' + (testvalue/(me.myflush / 1000)));
var testcountvalue_test = function(post){
var mykey = 'stats_counts.a_test_value';
var mykey = 'stats.counters.a_test_value.count';
return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
};
test.ok(_.any(hashes,testcountvalue_test), 'stats_counts.a_test_value should be ' + testvalue);
test.ok(_.any(hashes,testcountvalue_test), 'a_test_value.count should be ' + testvalue);
test.done();
});
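The renamed keys this test expects come from running the graphite backend with `legacyNamespace: false` (set in the config fragment above). A hypothetical helper showing the mapping the assertions rely on (`counterKeys` is illustrative, not part of statsd):

```javascript
// Hypothetical illustration of how a counter's metrics map to graphite
// key names with and without the legacy namespace, matching the keys
// the tests above assert against.
function counterKeys(key, legacyNamespace) {
  if (legacyNamespace) {
    // Historic layout: rate under stats.*, raw count under stats_counts.*
    return { rate: 'stats.' + key, count: 'stats_counts.' + key };
  }
  // New layout: everything under stats.counters.<key>.*
  return {
    rate: 'stats.counters.' + key + '.rate',
    count: 'stats.counters.' + key + '.count'
  };
}

console.log(counterKeys('a_test_value', true));
// { rate: 'stats.a_test_value', count: 'stats_counts.a_test_value' }
console.log(counterKeys('a_test_value', false));
// { rate: 'stats.counters.a_test_value.rate',
//   count: 'stats.counters.a_test_value.count' }
```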


@@ -0,0 +1,127 @@
var pm = require('../lib/process_metrics')
module.exports = {
setUp: function (callback) {
this.time_stamp = Math.round(new Date().getTime() / 1000);
var counters = {};
var gauges = {};
var timers = {};
var sets = {};
var pctThreshold = null;
this.metrics = {
counters: counters,
gauges: gauges,
timers: timers,
sets: sets,
pctThreshold: pctThreshold
}
callback();
},
counters_has_stats_count: function(test) {
test.expect(1);
this.metrics.counters['a'] = 2;
pm.process_metrics(this.metrics, 1000, this.time_stamp, function(){});
test.equal(2, this.metrics.counters['a']);
test.done();
},
counters_has_correct_rate: function(test) {
test.expect(1);
this.metrics.counters['a'] = 2;
pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
test.equal(20, this.metrics.counter_rates['a']);
test.done();
},
timers_handle_empty: function(test) {
test.expect(1);
this.metrics.timers['a'] = [];
pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
//potentially a cleaner way to check this
test.equal(undefined, this.metrics.counter_rates['a']);
test.done();
},
timers_single_time: function(test) {
test.expect(6);
this.metrics.timers['a'] = [100];
pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
var timer_data = this.metrics.timer_data['a'];
test.equal(0, timer_data.std);
test.equal(100, timer_data.upper);
test.equal(100, timer_data.lower);
test.equal(1, timer_data.count);
test.equal(100, timer_data.sum);
test.equal(100, timer_data.mean);
test.done();
},
timers_multiple_times: function(test) {
test.expect(6);
this.metrics.timers['a'] = [100, 200, 300];
pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
var timer_data = this.metrics.timer_data['a'];
test.equal(81.64965809277261, timer_data.std);
test.equal(300, timer_data.upper);
test.equal(100, timer_data.lower);
test.equal(3, timer_data.count);
test.equal(600, timer_data.sum);
test.equal(200, timer_data.mean);
test.done();
},
timers_single_time_single_percentile: function(test) {
test.expect(3);
this.metrics.timers['a'] = [100];
this.metrics.pctThreshold = [90];
pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
var timer_data = this.metrics.timer_data['a'];
test.equal(100, timer_data.mean_90);
test.equal(100, timer_data.upper_90);
test.equal(100, timer_data.sum_90);
test.done();
},
timers_single_time_multiple_percentiles: function(test) {
test.expect(6);
this.metrics.timers['a'] = [100];
this.metrics.pctThreshold = [90, 80];
pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
var timer_data = this.metrics.timer_data['a'];
test.equal(100, timer_data.mean_90);
test.equal(100, timer_data.upper_90);
test.equal(100, timer_data.sum_90);
test.equal(100, timer_data.mean_80);
test.equal(100, timer_data.upper_80);
test.equal(100, timer_data.sum_80);
test.done();
},
timers_multiple_times_single_percentiles: function(test) {
test.expect(3);
this.metrics.timers['a'] = [100, 200, 300];
this.metrics.pctThreshold = [90];
pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
var timer_data = this.metrics.timer_data['a'];
test.equal(200, timer_data.mean_90);
test.equal(300, timer_data.upper_90);
test.equal(600, timer_data.sum_90);
test.done();
},
timers_multiple_times_multiple_percentiles: function(test) {
test.expect(6);
this.metrics.timers['a'] = [100, 200, 300];
this.metrics.pctThreshold = [90, 80];
pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
var timer_data = this.metrics.timer_data['a'];
test.equal(200, timer_data.mean_90);
test.equal(300, timer_data.upper_90);
test.equal(600, timer_data.sum_90);
test.equal(150, timer_data.mean_80);
test.equal(200, timer_data.upper_80);
test.equal(300, timer_data.sum_80);
test.done();
},
statsd_metrics_exist: function(test) {
test.expect(1);
pm.process_metrics(this.metrics, 100, this.time_stamp, function(){});
var statsd_metrics = this.metrics.statsd_metrics;
test.notEqual(undefined, statsd_metrics["processing_time"]);
test.done();
}
}
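The timer expectations above (the std of 81.649…, and the `mean_$PCT`/`upper_$PCT`/`sum_$PCT` values) follow from the population standard deviation plus the threshold rule of keeping only the lowest `round(pct/100 * count)` samples. A sketch of that arithmetic (`aggregateTimer` is an illustrative name, not the library's actual API):

```javascript
// Hypothetical sketch of the timer aggregation the tests above exercise.
function aggregateTimer(values, pct) {
  var sorted = values.slice().sort(function (a, b) { return a - b; });
  var count = sorted.length;
  var sum = sorted.reduce(function (acc, v) { return acc + v; }, 0);
  var mean = sum / count;
  var sumOfSquares = sorted.reduce(function (acc, v) {
    return acc + (v - mean) * (v - mean);
  }, 0);
  // Threshold rule: keep only the lowest round(pct/100 * count) samples.
  var kept = sorted.slice(0, Math.round((pct / 100) * count));
  var pctSum = kept.reduce(function (acc, v) { return acc + v; }, 0);
  return {
    lower: sorted[0],
    upper: sorted[count - 1],
    count: count,
    sum: sum,
    mean: mean,
    std: Math.sqrt(sumOfSquares / count),  // population std deviation
    pct_mean: pctSum / kept.length,
    pct_upper: kept[kept.length - 1],
    pct_sum: pctSum
  };
}

var d = aggregateTimer([100, 200, 300], 80);
console.log(d.std);       // ~81.6497, as in timers_multiple_times
console.log(d.pct_mean);  // 150: only [100, 200] fall under the 80th percentile
```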