зеркало из https://github.com/microsoft/statsd.git
Update to use config library, make cacheSize a config option, and switch to using the health status
This commit is contained in:
Родитель
821cfbca8d
Коммит
f014b0115c
|
@ -8,7 +8,7 @@ Create a proxyConfig.js file
|
|||
cp exampleProxyConfig.js proxyConfig.js
|
||||
|
||||
Once you have a config file run:
|
||||
node proxy.js
|
||||
node proxy.js proxyConfig.js
|
||||
|
||||
|
||||
It uses a consistent hashring to send the unique metric names to the same statsd instances so that
|
||||
|
|
|
@ -1,18 +1,30 @@
|
|||
var nodes = [
|
||||
{host: '127.0.0.1', port: 8125, adminport: 8126},
|
||||
{host: '127.0.0.1', port: 8128, adminport: 8127},
|
||||
/*
|
||||
|
||||
Required Variables:
|
||||
|
||||
port: StatsD Cluster Proxy listening port [default: 8125]
|
||||
nodes: list of StatsD instances
|
||||
host: address of an instance of StatsD
|
||||
port: port that this instance is listening on
|
||||
adminport: port that this instance is listening on for the admininterface
|
||||
|
||||
Optional Variables:
|
||||
|
||||
udp_version: defines if the address is an IPv4 or IPv6 address ['udp4' or 'udp6', default: 'udp4']
|
||||
host: address to listen on over UDP [default: 0.0.0.0]
|
||||
checkInterval: health status check interval [default: 10000]
|
||||
cacheSize: size of the cache to store for hashring key lookups [default: 10000]
|
||||
|
||||
*/
|
||||
{
|
||||
nodes: [
|
||||
{host: '127.0.0.1', port: 8129, adminport: 8126},
|
||||
{host: '127.0.0.1', port: 8127, adminport: 8128},
|
||||
{host: '127.0.0.1', port: 8129, adminport: 8130}
|
||||
];
|
||||
|
||||
var udp_version = 'udp4';
|
||||
var host = '0.0.0.0';
|
||||
var port = '8131';
|
||||
var checkInterval = 1000;
|
||||
|
||||
|
||||
// Exports the set variables
|
||||
exports.nodes = nodes;
|
||||
exports.host = host;
|
||||
exports.port = port;
|
||||
exports.checkInterval = checkInterval;
|
||||
exports.udp_version = udp_version;
|
||||
],
|
||||
udp_version: 'udp4',
|
||||
host: '0.0.0.0',
|
||||
port: '8125',
|
||||
checkInterval: 1000,
|
||||
cacheSize: 10000
|
||||
}
|
||||
|
|
202
proxy.js
202
proxy.js
|
@ -2,106 +2,126 @@ var dgram = require('dgram')
|
|||
, net = require('net')
|
||||
, events = require('events')
|
||||
, hashring = require('hashring')
|
||||
, config = require('./proxyConfig');
|
||||
|
||||
var udp_version = config.udp_version
|
||||
, nodes = config.nodes;
|
||||
, configlib = require('./lib/config');
|
||||
|
||||
var packet = new events.EventEmitter();
|
||||
var node_status = [];
|
||||
var node_ring = {};
|
||||
var config;
|
||||
|
||||
//load the node_ring object with the available nodes and a weight of 100
|
||||
// weight is currently arbitrary but the same for all
|
||||
nodes.forEach(function(element, index, array) {
|
||||
node_ring[element.host + ':' + element.port] = 100;
|
||||
});
|
||||
configlib.configFile(process.argv[2], function (conf, oldConfig) {
|
||||
config = conf;
|
||||
var udp_version = config.udp_version
|
||||
, nodes = config.nodes;
|
||||
|
||||
var ring = new hashring(
|
||||
node_ring, 'md5', {
|
||||
'max cache size': 10000,
|
||||
//We don't want duplicate keys sent so replicas set to 0
|
||||
'replicas': 0
|
||||
});
|
||||
|
||||
// Do an initial rount of health checks prior to starting up the server
|
||||
doHealthChecks();
|
||||
|
||||
// Setup the udp listener
|
||||
var server = dgram.createSocket(udp_version, function (msg, rinfo) {
|
||||
// Convert the raw packet to a string (defaults to UTF8 encoding)
|
||||
var packet_data = msg.toString();
|
||||
// If the packet contains a \n then it contains multiple metrics
|
||||
if (packet_data.indexOf("\n") > -1) {
|
||||
var metrics = packet_data.split("\n");
|
||||
} else {
|
||||
// metrics needs to be an array to fake it for single metric packets
|
||||
var metrics = [ packet_data ] ;
|
||||
}
|
||||
|
||||
// Loop through the metrics and split on : to get mertric name for hashing
|
||||
for (var midx in metrics) {
|
||||
var bits = metrics[midx].toString().split(':');
|
||||
var key = bits.shift();
|
||||
packet.emit('send', key, msg);
|
||||
}
|
||||
});
|
||||
|
||||
// Listen for the send message, and process the metric key and msg
|
||||
packet.on('send', function(key, msg) {
|
||||
// retreives the destination for this key
|
||||
var statsd_host = ring.get(key);
|
||||
|
||||
// break the retreived host to pass to the send function
|
||||
var host_config = statsd_host.split(':');
|
||||
|
||||
var client = dgram.createSocket(udp_version);
|
||||
// Send the mesg to the backend
|
||||
client.send(msg, 0, msg.length, host_config[1], host_config[0], function(err, bytes) {
|
||||
client.close();
|
||||
});
|
||||
});
|
||||
|
||||
// Bind the listening udp server to the configured port and host
|
||||
server.bind(config.port, config.host || undefined);
|
||||
|
||||
// Set the interval for healthchecks
|
||||
setInterval(doHealthChecks, config.checkInterval || 10000);
|
||||
|
||||
// Perform health check on all nodes
|
||||
function doHealthChecks() {
|
||||
//load the node_ring object with the available nodes and a weight of 100
|
||||
// weight is currently arbitrary but the same for all
|
||||
nodes.forEach(function(element, index, array) {
|
||||
healthcheck(element);
|
||||
node_ring[element.host + ':' + element.port] = 100;
|
||||
});
|
||||
}
|
||||
|
||||
// Perform health check on node
|
||||
function healthcheck(node) {
|
||||
var node_id = node.host + ':' + node.port;
|
||||
var client = net.connect({port: node.adminport, host: node.host},
|
||||
function() {
|
||||
client.write('help\r\n');
|
||||
});
|
||||
client.on('data', function(data) {
|
||||
//could be checking data response here, but health check isn't pulled yet
|
||||
client.end();
|
||||
if (node_status[node_id] !== undefined) {
|
||||
var new_server = {};
|
||||
new_server[node_id] = 100;
|
||||
ring.add(new_server);
|
||||
}
|
||||
node_status[node_id] = 0;
|
||||
});
|
||||
client.on('error', function(e) {
|
||||
if (e.code == 'ECONNREFUSED') {
|
||||
ring.remove(node_id);
|
||||
if (node_status[node_id] === undefined) {
|
||||
node_status[node_id] = 1;
|
||||
} else if (node_status[node_id] > 0) {
|
||||
node_status[node_id]++;
|
||||
}
|
||||
var ring = new hashring(
|
||||
node_ring, 'md5', {
|
||||
'max cache size': config.cacheSize || 10000,
|
||||
//We don't want duplicate keys sent so replicas set to 0
|
||||
'replicas': 0
|
||||
});
|
||||
|
||||
// Do an initial rount of health checks prior to starting up the server
|
||||
doHealthChecks();
|
||||
|
||||
|
||||
// Setup the udp listener
|
||||
var server = dgram.createSocket(udp_version, function (msg, rinfo) {
|
||||
// Convert the raw packet to a string (defaults to UTF8 encoding)
|
||||
var packet_data = msg.toString();
|
||||
// If the packet contains a \n then it contains multiple metrics
|
||||
var metrics;
|
||||
if (packet_data.indexOf("\n") > -1) {
|
||||
metrics = packet_data.split("\n");
|
||||
} else {
|
||||
console.log('Errored with ' + e.code);
|
||||
// metrics needs to be an array to fake it for single metric packets
|
||||
metrics = [ packet_data ] ;
|
||||
}
|
||||
|
||||
// Loop through the metrics and split on : to get mertric name for hashing
|
||||
for (var midx in metrics) {
|
||||
var bits = metrics[midx].toString().split(':');
|
||||
var key = bits.shift();
|
||||
packet.emit('send', key, msg);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Listen for the send message, and process the metric key and msg
|
||||
packet.on('send', function(key, msg) {
|
||||
// retreives the destination for this key
|
||||
var statsd_host = ring.get(key);
|
||||
|
||||
// break the retreived host to pass to the send function
|
||||
var host_config = statsd_host.split(':');
|
||||
|
||||
var client = dgram.createSocket(udp_version);
|
||||
// Send the mesg to the backend
|
||||
client.send(msg, 0, msg.length, host_config[1], host_config[0], function(err, bytes) {
|
||||
client.close();
|
||||
});
|
||||
});
|
||||
|
||||
// Bind the listening udp server to the configured port and host
|
||||
server.bind(config.port, config.host || undefined);
|
||||
|
||||
// Set the interval for healthchecks
|
||||
setInterval(doHealthChecks, config.checkInterval || 10000);
|
||||
|
||||
// Perform health check on all nodes
|
||||
function doHealthChecks() {
|
||||
nodes.forEach(function(element, index, array) {
|
||||
healthcheck(element);
|
||||
});
|
||||
}
|
||||
|
||||
// Perform health check on node
|
||||
function healthcheck(node) {
|
||||
var node_id = node.host + ':' + node.port;
|
||||
var client = net.connect({port: node.adminport, host: node.host},
|
||||
function() {
|
||||
client.write('health\r\n');
|
||||
});
|
||||
client.on('data', function(data) {
|
||||
var health_status = data.toString();
|
||||
client.end();
|
||||
if (health_status.indexOf('up') < 0) {
|
||||
if (node_status[node_id] === undefined) {
|
||||
node_status[node_id] = 1;
|
||||
} else {
|
||||
node_status[node_id]++;
|
||||
}
|
||||
if (node_status[node_id] < 2) {
|
||||
ring.remove(node_id);
|
||||
}
|
||||
} else {
|
||||
if (node_status[node_id] !== undefined) {
|
||||
var new_server = {};
|
||||
new_server[node_id] = 100;
|
||||
ring.add(new_server);
|
||||
}
|
||||
node_status[node_id] = 0;
|
||||
}
|
||||
});
|
||||
client.on('error', function(e) {
|
||||
if (e.code == 'ECONNREFUSED') {
|
||||
if (node_status[node_id] === undefined) {
|
||||
node_status[node_id] = 1;
|
||||
} else {
|
||||
node_status[node_id]++;
|
||||
}
|
||||
if (node_status[node_id] < 2) {
|
||||
ring.remove(node_id);
|
||||
}
|
||||
} else {
|
||||
util.log('Errored with ' + e.code);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
});
|
||||
|
|
Загрузка…
Ссылка в новой задаче