[api] add querying and streaming. optional capped collection.

This commit is contained in:
Christopher Jeffrey 2012-05-17 11:07:08 -05:00
Родитель d150a1d29d
Коммит 72e2c454d9
2 изменённых файлов: 167 добавлений и 26 удалений

Просмотреть файл

@ -10,6 +10,7 @@ var util = require('util');
var mongodb = require('mongodb');
var winston = require('winston');
var common = require('winston/lib/winston/common');
var Stream = require('stream').Stream;
//
// ### function MongoDB (options)
@ -28,13 +29,16 @@ var MongoDB = exports.MongoDB = function (options) {
this.db = options.db;
this.host = options.host || 'localhost';
this.port = options.port || mongodb.Connection.DEFAULT_PORT;
this.collection = options.collection || "log";
this.collection = options.collection || 'log';
this.safe = options.safe || true;
this.level = options.level || 'info';
this.silent = options.silent || false;
this.username = options.username || null;
this.password = options.password || null;
this.errorTimeout = options.errorTimeout || 10000;
this.capped = options.capped;
// TODO: possibly go by docs (`max`) instead
this.cappedSize = options.cappedSize || 10000000;
if (options.keepAlive !== true) {
//
@ -125,6 +129,10 @@ MongoDB.prototype.log = function (level, msg, meta, callback) {
return onError(err);
}
if (typeof meta !== 'object' && meta != null) {
meta = { meta: meta };
}
var entry = common.clone(meta) || {};
entry.timestamp = new Date;
entry.level = level;
@ -155,18 +163,25 @@ MongoDB.prototype.query = function (options, callback) {
options = {};
}
if (this.state !== 'opened') {
var self = this;
return this.open(function () {
return self.query(options, callback);
});
}
var self = this,
options = this.normalizeQuery(options),
query = {},
opt = {},
fields;
if (options.fields) {
fields = {};
options.fields.forEach(function (key) {
fields[key] = 1;
});
}
// if (options.fields) {
// fields = {};
// options.fields.forEach(function (key) {
// fields[key] = 1;
// });
// }
query = {
timestamp: {
@ -176,21 +191,33 @@ MongoDB.prototype.query = function (options, callback) {
};
opt = {
// .skip(options.start)
skip: options.start,
// .limit(options.rows)
limit: options.rows,
// .sort({timestamp: -1})
sort: { timestamp: options.order === 'desc' ? -1 : 1 }
};
if (fields) {
opt.fields = fields;
// if (fields) {
// // col.find({}, {field: 1}, {})
// opt.fields = fields;
// }
if (options.fields) {
// col.find({}, {field: 1}, {})
opt.fields = options.fields;
}
this._db.collection(this.collection, function (err, col) {
col.find(query, opt).toArray(function (err, docs) {
if (callback) {
if (err) return callback(err);
callback(null, docs);
if (err) return callback(err);
if (options.fields) {
docs.forEach(function (log) {
delete log._id;
});
}
if (callback) callback(null, docs);
});
});
};
@ -198,15 +225,112 @@ MongoDB.prototype.query = function (options, callback) {
//
// ### function stream (options)
// #### @options {Object} Stream options for this instance.
// #### @stream {Object} Pass in a pre-existing stream.
// Returns a log stream for this transport. Options object is optional.
// This will only work with a capped collection.
//
MongoDB.prototype.stream = function (options) {
MongoDB.prototype.stream = function (options, stream) {
var self = this,
options = options || {},
stream = new Stream,
last,
stream = stream || new Stream,
start = options.start;
if (this.state !== 'opened') {
this.open(function () {
self.stream(options, stream);
});
return stream;
}
stream.destroy = function() {
this.destroyed = true;
};
if (start === -1) {
start = null;
}
if (start == null) {
; // get collection size
}
var opt = {
skip: start || 0,
tailable: true
};
this._db.collection(this.collection, function (err, col) {
if (stream.destroyed) return;
// col.isCapped(function (err, isCapped) {
// if (err) return stream.emit('error', err);
// if (!isCapped) {
// return self.streamPoll(options, stream);
// }
// next();
// });
// if (start == null) {
// col.count({}, function (err, count) {
// opt.skip = count;
// next();
// });
// }
// .sort({ $natural: 1 })
var cursor = col.find({}, opt);
// tail cursor
var tail = cursor.stream();
stream.destroy = function() {
this.destroyed = true;
tail.destroy();
};
tail.on('data', function (doc) {
stream.emit('log', doc);
});
tail.on('error', function (err) {
if (typeof err !== 'object') {
err = new Error(err);
}
// hack because isCapped doesn't work
if (err.message === 'tailable cursor requested on non capped collection') {
//if (~(err.message + '').indexOf('non capped collection')) {
tail.destroy();
self.streamPoll(options, stream);
return;
}
stream.emit('error', err);
});
});
return stream;
};
//
// ### function streamPoll (options)
// #### @options {Object} Stream options for this instance.
// #### @stream {Object} Pass in a pre-existing stream.
// Returns a log stream for this transport. Options object is optional.
//
MongoDB.prototype.streamPoll = function (options, stream) {
var self = this,
options = options || {},
stream = stream || new Stream,
start = options.start,
row = 0;
last;
if (this.state !== 'opened') {
this.open(function () {
self.streamPoll(options, stream);
});
return stream;
}
if (start === -1) {
start = null;
@ -251,6 +375,21 @@ MongoDB.prototype.stream = function (options) {
return stream;
};
//
// ### function _ensureCollection (callback)
// #### @db {Object} The database to create the collection in.
// #### @callback {function} Continuation to respond to when complete
// Attempt to create a capped collection if possible.
//
MongoDB.prototype._ensureCollection = function (db, callback) {
if (!this.capped) return callback(null);
var opt = { capped: true, size: this.cappedSize };
db.createCollection(this.collection, opt, function (err, col) {
callback(null);
});
};
//
// ### function open (callback)
// #### @callback {function} Continuation to respond to when complete
@ -337,13 +476,15 @@ MongoDB.prototype.open = function (callback) {
return onError(err);
}
if (self.username && self.password) {
return self.client.authenticate(self.username, self.password, function (err) {
return err ? onError(err) : onSuccess(db);
});
}
self._ensureCollection(db, function () {
if (self.username && self.password) {
return self.client.authenticate(self.username, self.password, function (err) {
return err ? onError(err) : onSuccess(db);
});
}
onSuccess(db);
onSuccess(db);
});
});
};

Просмотреть файл

@ -14,9 +14,9 @@ var path = require('path'),
transport = require('winston/test/transports/transport'),
MongoDB = require('../lib/winston-mongodb').MongoDB;
var config = helpers.loadConfig(__dirname);
vows.describe('winston-mongodb').addBatch({
"An instance of the MongoDB Transport":
transport(MongoDB, config.transports.mongodb)
"An instance of the MongoDB Transport": transport(MongoDB, {
db: 'winston',
keepAlive: 1000
})
}).export(module);