A few more fixes for better compatibility with the latest mongodb.

Keep connection as long as there is a stream of logs. Close on inactivity.
This commit is contained in:
Yosef Dinerstein 2012-03-18 21:14:31 +02:00
Родитель 74caf00193
Коммит caa4957657
1 изменённых файлов: 68 добавлений и 51 удалений

Просмотреть файл

@ -31,18 +31,17 @@ var MongoDB = exports.MongoDB = function (options) {
this.silent = options.silent || false; this.silent = options.silent || false;
this.username = options.username || null; this.username = options.username || null;
this.password = options.password || null; this.password = options.password || null;
this.timeout = options.timeout || 10000; this.errortimeout = options.errortimeout || 10000;
if (!(options.keepAlive === true)) { if (options.keepAlive !== true) {
this.keepAlive = false; // Backward compatibility for timeout delivered in keepAlive parameter.
if (!options.timeout && options.keepAlive) { this.timeout = options.timeout || options.keepAlive || 10000;
// Backward compatibility for setting timeout via keepAlive.
this.timeout = options.keepAlive;
}
} }
this.state = 'unopened'; this.state = 'unopened';
this.timeoutId = null;
this.pending = []; this.pending = [];
this.client = new mongodb.Db(this.db, new mongodb.Server(this.host, this.port, {}), { this.server = new mongodb.Server(this.host, this.port, {});
this.client = new mongodb.Db(this.db, this.server, {
native_parser: false native_parser: false
}); });
}; };
@ -69,42 +68,68 @@ winston.transports.MongoDB = MongoDB;
MongoDB.prototype.log = function (level, msg, meta, callback) { MongoDB.prototype.log = function (level, msg, meta, callback) {
var self = this; var self = this;
if (this.silent) { // Avoid reentrancy that can be not assumed by database code.
return callback(null, true); // If database logs, better not to call database itself in the same call.
} process.nextTick(function () {
this.open(function (err) { if (self.silent) {
if (err) { return callback(null, true);
self.emit('error', err);
return callback(err, null);
} }
self._db.collection(self.collection, function (err, col) { self.open(function (err) {
if (err) { if (err) {
self.emit('error', err); self.emit('error', err);
return callback(err, null); return callback(err, null);
} }
var entry = { self._db.collection(self.collection, function (err, col) {
timestamp: new Date(), // RFC3339/ISO8601 format instead of common.timestamp()
level: level,
message: msg,
meta: meta
};
col.save(entry, { safe: self.safe }, function (err, doc) {
if (err) { if (err) {
self.emit('error', err); self.emit('error', err);
return callback(err, null); return callback(err, null);
} }
self.emit('logged'); var entry = {
return callback(null, true); timestamp: new Date(), // RFC3339/ISO8601 format instead of common.timestamp()
level: level,
message: msg,
meta: meta
};
col.save(entry, { safe: self.safe }, function (err, doc) {
if (err) {
self.emit('error', err);
return callback(err, null);
}
self.emit('logged');
// Delay timeout to start from the last successful log.
self.setTimeout();
return callback(null, true);
});
}); });
}); });
}); });
}; };
MongoDB.prototype.setTimeout = function () {
var self = this;
if (!self.timeout) {
return;
}
if (self.timeoutId) {
clearTimeout(self.timeoutId);
}
//
// Set a timeout to close the client connection unless `self.keepAlive`
// has been set to true in which case it is the responsibility of the
// programmer to close the underlying connection.
//
self.timeoutId = setTimeout(function () {
self.client.close();
self.state = 'unopened';
}, self.timeout);
}
// //
// ### function open (callback) // ### function open (callback)
// #### @callback {function} Continuation to respond to when complete // #### @callback {function} Continuation to respond to when complete
@ -114,25 +139,25 @@ MongoDB.prototype.log = function (level, msg, meta, callback) {
MongoDB.prototype.open = function (callback) { MongoDB.prototype.open = function (callback) {
var self = this; var self = this;
if (this.state === 'opening' || this.state === 'unopened') { if (self.state === 'opening' || self.state === 'unopened') {
// //
// While opening our MongoDB connection, append any callback // While opening our MongoDB connection, append any callback
// to a list that is managed by this instance. // to a list that is managed by this instance.
// //
this.pending.push(callback); self.pending.push(callback);
if (this.state === 'opening') { if (self.state === 'opening') {
return; return;
} }
} }
else if (this.state === 'opened') { else if (self.state === 'opened') {
return callback(); return callback();
} }
else if (this.state === 'error') { else if (self.state === 'error') {
return callback(self.error); return callback(self.error);
} }
function completion(err) { function flushCallbacks(err) {
// //
// Iterate over all callbacks that have accumulated during // Iterate over all callbacks that have accumulated during
// the creation of the TCP socket. // the creation of the TCP socket.
@ -143,37 +168,29 @@ MongoDB.prototype.open = function (callback) {
// Quickly truncate the Array (this is more performant). // Quickly truncate the Array (this is more performant).
self.pending.length = 0; self.pending.length = 0;
//
// Set a timeout to close the client connection unless `self.keepAlive`
// has been set to true in which case it is the responsibility of the
// programmer to close the underlying connection.
//
if (!self.keepAlive || err) {
setTimeout(function () {
self.state = 'unopened';
if (self._db) {
// Connection was opened.
self._db.close();
}
}, self.timeout);
}
} }
function onError(err) { function onError(err) {
self.state = 'error'; self.state = 'error';
self.error = err; self.error = err;
completion(err); flushCallbacks(err);
// Close to be able to attempt opening later.
self.client.close();
// Retry new connection upon following request after error timeout expired.
setTimeout(function () {
self.state = 'unopened';
}, self.errortimeout);
} }
function onSuccess(db) { function onSuccess(db) {
self.state = 'opened'; self.state = 'opened';
self._db = db; self._db = db;
completion(); flushCallbacks();
self.setTimeout();
} }
this.state = 'opening'; self.state = 'opening';
this.client.open(function (err, db) { self.client.open(function (err, db) {
if (err) { if (err) {
return onError(err); return onError(err);
} }