diff --git a/.jshintrc b/.jshintrc
index 08a8bed..5780e54 100644
--- a/.jshintrc
+++ b/.jshintrc
@@ -3,7 +3,7 @@
   "curly": true,
   "eqeqeq": true,
   "es3": false,
-  "forin": false,
+  "forin": true,
   "freeze": true,
   "immed": false,
   "indent": 2,
@@ -26,5 +26,6 @@
   "maxlen": 80,
   "node": true,
   "browser": false,
-  "esnext": true
+  "esnext": true,
+  "sub": true
 }
diff --git a/config/default.yml b/config/default.yml
index b2ffca1..b21ba4d 100644
--- a/config/default.yml
+++ b/config/default.yml
@@ -15,3 +15,16 @@ pac:
   gzip:
     # Compression level for gzip.
     level: 9
+
+# Cache configuration.
+cache:
+  # Enable cache. Don't rename it to 'enabled' as this will crash the parser.
+  use: true
+  # Supported cache types are:
+  # - basic (in-memory JS)
+  # - redis (requires Redis database)
+  type: basic
+  # Configuration for non-basic database server.
+  database:
+    host: localhost
+    port: 55255
diff --git a/lib/cache.js b/lib/cache.js
new file mode 100644
index 0000000..60a070a
--- /dev/null
+++ b/lib/cache.js
@@ -0,0 +1,41 @@
+var storage = require('./storage');
+
+var DEF_MAX_SIZE = 3000;
+
+// Resource caching interface, creates a new storage.
+var Cache = function(options) {
+  this.maxSize = options.maxCacheSize || DEF_MAX_SIZE;
+  this.storage = storage.create(options.cache.type);
+};
+Cache.prototype.save = function(key, value, expire) {
+  this.storage.save(key, value, expire);
+};
+Cache.prototype.load = function(key, callback) {
+  this.storage.load(key, callback);
+};
+
+// Default cache instance.
+var instance = null;
+
+// Initializes a new cache for given storage type.
+exports.init = function(options) {
+  if (options.cache.use) {
+    instance = new Cache(options);
+  }
+};
+
+// Saves new cache entry.
+exports.save = function(key, value, expire) {
+  if (instance) {
+    instance.save(key, value, expire);
+  }
+};
+
+// Returns cache entry for given key if available.
+exports.load = function(key, callback) {
+  if (instance) {
+    instance.load(key, callback);
+  } else {
+    callback();
+  }
+};
diff --git a/lib/index.js b/lib/index.js
index 7fc2642..a38d0af 100644
--- a/lib/index.js
+++ b/lib/index.js
@@ -38,7 +38,6 @@ function addVersionConfig() {
 CONFIG.getConfigSources().forEach(function(config) {
   addVersionConfig();
   console.log('Using configuration %s:', config.name);
-  console.dir(config.parsed);
 });
 
 var SpdyProxy = require('./proxy');
diff --git a/lib/plugins/cacheload.js b/lib/plugins/cacheload.js
new file mode 100644
index 0000000..ccb7602
--- /dev/null
+++ b/lib/plugins/cacheload.js
@@ -0,0 +1,22 @@
+var url = require('url');
+var cache = require('../cache');
+
+exports.load = function(request, dest, options, callback) {
+  if (!options.cache.use || request.method !== 'GET') {
+    // Skip the cache lookup for non-GET requests or when caching is disabled.
+    return callback(null, false);
+  }
+
+  var path = request.headers.path || url.parse(request.url).path;
+  var key = [request.headers.host, path];
+  cache.load(key, function(error, cached) {
+    if (!error && cached) {
+      var value = cached.value;
+      request.log('delivering %d bytes from cache', cached.size);
+      dest.writeHead(value[0][0], value[0][1], value[0][2]);
+      // TODO(esawin): fix explicit buffer creation workaround for redis.
+      dest.end(new Buffer(value[1]));
+    }
+    callback(error, Boolean(cached));
+  });
+};
diff --git a/lib/plugins/cachesave.js b/lib/plugins/cachesave.js
new file mode 100644
index 0000000..7ee5210
--- /dev/null
+++ b/lib/plugins/cachesave.js
@@ -0,0 +1,86 @@
+var url = require('url');
+var cache = require('../cache');
+
+var MAX_EXPIRE = 7 * 24 * 60 * 60;
+// var DEF_EXPIRE = 1 * 24 * 60 * 60;
+var DEF_EXPIRE = 30;
+
+// Parses cache-control, expires and last-modified headers.
+function parseCacheControl(headers) {
+  var lastMod = headers['last-modified'];
+  var expires = headers.expires;
+
+  var cacheHeaders = {
+    'last-modified': lastMod ? new Date(lastMod) : null,
+    expires: expires ? new Date(expires) : null
+  };
+
+  var cacheControl = headers['cache-control'];
+  if (cacheControl) {
+    cacheControl.split(',').forEach(function(elem) {
+      elem = elem.trim();
+      var i = elem.indexOf('=');
+      if (i === -1) {
+        cacheHeaders[elem] = true;
+      } else {
+        cacheHeaders[elem.substr(0, i)] = elem.substr(i + 1);
+      }
+    });
+  }
+
+  return cacheHeaders;
+}
+
+// Returns the expire time in seconds.
+function maxAge(cacheHeaders) {
+  var expire = cacheHeaders['s-maxage'] || cacheHeaders['max-age'];
+  if (expire) {
+    expire = parseInt(expire, 10);
+  } else if (cacheHeaders.expires) {
+    expire = (cacheHeaders.expires.getTime() - (new Date()).getTime()) / 1000;
+  } else {
+    expire = DEF_EXPIRE;
+  }
+  return Math.min(MAX_EXPIRE, expire);
+}
+
+// Aggregates data and caches it when appropriate.
+exports.handleResponse = function(request, source, dest, options) {
+  var cacheControl = parseCacheControl(source.headers);
+
+  if (!options.cache.use || request.method !== 'GET' ||
+      cacheControl['private'] || cacheControl['no-store']) {
+    // Pass through non-GET requests, uncacheable responses or disabled cache.
+    source.pipe(dest);
+    source.resume();
+    return;
+  }
+
+  // Expire time in seconds.
+  var expire = maxAge(cacheControl);
+  var count = 0;
+  var data = [];
+
+  source.on('data', function(chunk) {
+    count += chunk.length;
+    data.push(chunk);
+    dest.write(chunk);
+  });
+
+  source.on('end', function() {
+    if (expire > 0) {
+      // Cache data.
+      data = Buffer.concat(data);
+      var path = request.headers.path || url.parse(request.url).path;
+      var key = [request.headers.host, path];
+      var header = [source.statusCode, '', source.headers];
+      cache.save(key,
+        { value: [header, data, cacheControl], size: data.length }, expire);
+      request.log('cached %d bytes for %d s', count, expire);
+    }
+
+    dest.end();
+  });
+
+  source.resume();
+};
diff --git a/lib/plugins/index.js b/lib/plugins/index.js
index 0dad597..729c523 100644
--- a/lib/plugins/index.js
+++ b/lib/plugins/index.js
@@ -4,9 +4,10 @@ var Duplex = require('stream').Duplex;
 var PassthroughPlugin = require('./passthrough');
 var GzipPlugin = require('./gzip');
 var DeliverPlugin = require('./deliver');
+var CacheSavePlugin = require('./cachesave');
 
 var plugins = {
-  response: [PassthroughPlugin, GzipPlugin, DeliverPlugin]
+  response: [PassthroughPlugin, GzipPlugin, CacheSavePlugin, DeliverPlugin]
 };
 
 function PipedResponse(response, options) {
diff --git a/lib/proxy.js b/lib/proxy.js
index 4e67251..506e6a6 100644
--- a/lib/proxy.js
+++ b/lib/proxy.js
@@ -4,7 +4,11 @@ var util = require('util');
 var net = require('net');
 var http = require('http');
 var spdy = require('spdy');
+var sync = require('synchronize');
+
 var plugins = require('./plugins');
+var cache = require('./cache');
+var CacheLoadPlugin = require('./plugins/cacheload');
 
 // Shortens the given URL to given maxLen by inserting '...'.
 function shortenUrl(url, maxLen) {
@@ -22,6 +26,7 @@ function shortenUrl(url, maxLen) {
 
 var SpdyProxy = function(options) {
   function handleListen() {
+    cache.init(options);
     console.log('%s listens on port %d', options.name, options.proxy.port);
   }
 
@@ -43,31 +48,38 @@ var SpdyProxy = function(options) {
       shortenUrl(httpOpts.path)
     );
 
-    var forwardRequest = http.request(httpOpts, function(forwardResponse) {
-      forwardResponse.headers['proxy-agent'] = options.title;
+    request.log = function() {
+      console.log('%s\t%s\t%s',
+        new Date().toISOString(),
+        shortenUrl(request.url),
+        util.format.apply(null, Array.prototype.slice.call(arguments, 0)));
+    };
 
-      request.log = function() {
-        console.log('%s\t%s\t%s',
-          new Date().toISOString(),
-          shortenUrl(request.url),
-          util.format.apply(null,
-            Array.prototype.slice.call(arguments, 0)));
-      };
+    sync.fiber(function() {
+      // Load from cache, if available.
+      var cached = sync.await(CacheLoadPlugin.load(request, response, options,
+        sync.defer()));
+      if (cached) {
+        return;
+      }
 
-      plugins.handleResponse(request, forwardResponse, response, options);
-    });
+      var forwardRequest = http.request(httpOpts, function(forwardResponse) {
+        forwardResponse.headers['proxy-agent'] = options.title;
+        plugins.handleResponse(request, forwardResponse, response, options);
+      });
 
-    forwardRequest.on('error', function(e) {
-      console.error('Client error: %s', e.message);
-      response.writeHead(502, 'Proxy fetch failed');
-      response.end();
-    });
+      forwardRequest.on('error', function(e) {
+        console.error('Client error: %s', e.message);
+        response.writeHead(502, 'Proxy fetch failed');
+        response.end();
+      });
 
-    // Pipe POST data.
-    request.pipe(forwardRequest);
+      // Pipe POST data.
+      request.pipe(forwardRequest);
 
-    response.on('close', function() {
-      forwardRequest.abort();
+      response.on('close', function() {
+        forwardRequest.abort();
+      });
     });
   }
 
diff --git a/lib/storage/basic.js b/lib/storage/basic.js
new file mode 100644
index 0000000..cc5f973
--- /dev/null
+++ b/lib/storage/basic.js
@@ -0,0 +1,24 @@
+var basicCache = require('memory-cache');
+
+var TYPE = 'basic';
+
+// Basic in-memory pure JS database.
+var BasicDb = function() {
+  this.type = TYPE;
+};
+BasicDb.prototype.save = function(key, value, expire, onSuccess, onExpire) {
+  basicCache.put(key, value, expire * 1000, onExpire);
+  onSuccess();
+};
+BasicDb.prototype.load = function(key, callback) {
+  var value = basicCache.get(key);
+  callback(null, value);
+};
+
+var instance = new BasicDb();
+
+exports.type = TYPE;
+
+exports.connect = function() {
+  return instance;
+};
diff --git a/lib/storage/index.js b/lib/storage/index.js
new file mode 100644
index 0000000..c940a68
--- /dev/null
+++ b/lib/storage/index.js
@@ -0,0 +1,49 @@
+var basic = require('./basic');
+var redis = require('./redis');
+
+// Available database modules.
+var DATABASES = [basic, redis];
+
+// Creates a new storage connected to given database.
+var Storage = function(db) {
+  this.db = db;
+  this.size = 0;
+  this.memSize = 0;
+  console.log('### new %s storage created', this.db.type);
+};
+Storage.prototype.save = function(key, value, expire) {
+  var that = this;
+  var memSize = value.size;
+
+  function onSuccess() {
+    that.size += 1;
+    that.memSize += memSize;
+    console.log('+++ storage %d items %d MB', that.size,
+      (that.memSize / 1048576).toFixed(2));
+  }
+
+  function onExpire() {
+    that.size -= 1;
+    that.memSize -= memSize;
+    console.log('--- storage %d items %d MB', that.size,
+      (that.memSize / 1048576).toFixed(2));
+  }
+
+  this.db.save(key, value, expire, onSuccess, onExpire);
+};
+Storage.prototype.load = function(key, callback) {
+  this.db.load(key, callback);
+};
+
+// Creates a new storage instance connected to given database type.
+exports.create = function(type) {
+  var db = null;
+
+  DATABASES.forEach(function(d) {
+    if (d.type === type) {
+      db = d.connect();
+    }
+  });
+
+  return db && new Storage(db);
+};
diff --git a/lib/storage/redis.js b/lib/storage/redis.js
new file mode 100644
index 0000000..1d3b536
--- /dev/null
+++ b/lib/storage/redis.js
@@ -0,0 +1,23 @@
+var redis = require('redis');
+
+var TYPE = 'redis';
+
+var RedisDb = function() {
+  this.type = TYPE;
+  this.client = redis.createClient();
+};
+RedisDb.prototype.save = function(key, value, expire, onSuccess) {
+  this.client.set(key.toString(), JSON.stringify(value), onSuccess);
+  this.client.expire(key.toString(), expire);
+};
+RedisDb.prototype.load = function(key, callback) {
+  this.client.get(key.toString(), function(error, reply) {
+    callback(error, JSON.parse(reply));
+  });
+};
+
+exports.type = TYPE;
+
+exports.connect = function() {
+  return new RedisDb();
+};
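
The following is a minimal usage sketch, not part of the patch, showing how the new lib/cache.js module added above is expected to be driven. The key layout ([host, path]) and the { value: [header, body, cacheControl], size: n } entry shape mirror what cachesave.js stores; the host, path and header values below are purely illustrative.

// Hypothetical usage sketch of lib/cache.js (not part of the patch).
var cache = require('./lib/cache');

// Options shaped like the 'cache' section added to config/default.yml.
var options = { cache: { use: true, type: 'basic' } };
cache.init(options);

// cachesave.js stores [statusCode, reasonPhrase, headers] plus the body.
var key = ['example.com', '/index.html'];
var body = new Buffer('<html></html>');
var entry = {
  value: [[200, '', { 'content-type': 'text/html' }], body, {}],
  size: body.length
};

cache.save(key, entry, 30); // expires after 30 seconds

cache.load(key, function(error, cached) {
  if (!error && cached) {
    console.log('cache hit: %d bytes', cached.size);
  } else {
    console.log('cache miss');
  }
});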