This commit is contained in:
Gene Hazan 2017-03-13 16:17:23 -07:00
Родитель c052a36545
Коммит 2ec5863c11
3 изменённых файлов: 117 добавлений и 6 удалений

Просмотреть файл

@ -17,6 +17,7 @@ const DeltaStore = require('../providers/storage/deltaStore');
const MongoDocStore = require('../providers/storage/mongodocstore');
const AzureStorageDocStore = require('../providers/storage/storageDocStore');
const UrlToUrnMappingStore = require('../providers/storage/urlToUrnMappingStore');
const AzureTableMappingStore = require('../providers/storage/tableMappingStore');
const amqp10 = require('amqp10');
const appInsights = require('applicationinsights');
const aiLogger = require('winston-azure-application-insights').AzureApplicationInsightsLogger;
@ -373,6 +374,10 @@ class CrawlerFactory {
let store = null;
switch (options.provider) {
case 'azure': {
store = CrawlerFactory.createTableAndStorageStore(options);
break;
}
case 'azure-redis': {
store = CrawlerFactory.createRedisAndStorageStore(options);
break;
}
@ -395,11 +400,19 @@ class CrawlerFactory {
}
// Builds an Azure storage store via CrawlerFactory.createAzureStorageStore and wraps it in a
// UrlToUrnMappingStore backed by the client from CrawlerFactory.getRedisClient(options.logger).
// The wrapper is given the base store's own name and the same options object.
//   options - passed to createAzureStorageStore and to the mapping store; options.logger is read here
//   name    - optional store name forwarded to createAzureStorageStore (defaults to null)
static createRedisAndStorageStore(options, name = null) {
// NOTE(review): this view is a rendered diff without +/- markers; the two info calls below are
// presumably the before/after versions of one line — confirm which one is live in the file.
factoryLogger.info(`creating azure store`, { name: name });
factoryLogger.info(`creating azure redis store`, { name: name });
const baseStore = CrawlerFactory.createAzureStorageStore(options, name);
return new UrlToUrnMappingStore(baseStore, CrawlerFactory.getRedisClient(options.logger), baseStore.name, options);
}
static createTableAndStorageStore(options, name = null) {
factoryLogger.info(`creating azure store`, { name: name });
const baseStore = CrawlerFactory.createAzureStorageStore(options, name);
const account = config.get('CRAWLER_STORAGE_ACCOUNT');
const key = config.get('CRAWLER_STORAGE_KEY');
return new AzureTableMappingStore(baseStore, CrawlerFactory.createTableService(account, key), baseStore.name, options);
}
static createAzureStorageStore(options, name = null) {
factoryLogger.info(`creating azure storage store`);
name = name || config.get('CRAWLER_STORAGE_NAME');
@ -413,7 +426,8 @@ class CrawlerFactory {
const provider = options.provider || 'azure';
factoryLogger.info(`Create deadletter store for provider ${options.provider}`);
switch (options.provider) {
case 'azure': {
case 'azure':
case 'azure-redis': {
return CrawlerFactory.createAzureStorageStore(options, config.get('CRAWLER_STORAGE_NAME') + '-deadletter');
}
case 'mongo': {
@ -432,7 +446,8 @@ class CrawlerFactory {
}
factoryLogger.info(`creating delta store`);
switch (options.delta.provider) {
case 'azure': {
case 'azure':
case 'azure-redis': {
return CrawlerFactory.createAzureDeltaStore(inner, null, options);
}
default: throw new Error(`Invalid delta store provider: ${options.delta.provider}`);
@ -492,6 +507,12 @@ class CrawlerFactory {
return AzureStorage.createBlobService(account, key).withFilter(retryOperations);
}
static createTableService(account, key) {
factoryLogger.info(`creating table service`);
const retryOperations = new AzureStorage.ExponentialRetryPolicyFilter();
return AzureStorage.createTableService(account, key).withFilter(retryOperations);
}
static createLocker(options) {
factoryLogger.info(`creating locker`, { provider: options.provider });
if (options.provider === 'memory') {

Просмотреть файл

@ -0,0 +1,90 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
const azure = require('azure-storage');
const Q = require('q');
/**
 * Document store wrapper that keeps a key -> blob name index in an Azure Storage table.
 *
 * Every upsert stores the document in the base store and then records two table rows,
 * one keyed by the document's URL and one by its URN (`_metadata.links.self.href`),
 * both pointing at the blob name returned by the base store. `get` and `etag` resolve
 * the supplied key through the table before delegating to the base store.
 */
class AzureTableMappingStore {
  /**
   * @param {object} baseStore - underlying store; connect, upsert, get, etag, list, delete, count and close are delegated to it
   * @param {object} tableService - table service exposing createTableIfNotExists, executeBatch and retrieveEntity
   * @param {string} name - used both as the table name and as the PartitionKey of every mapping row
   * @param {object} options - stored on the instance; not read by this class
   */
  constructor(baseStore, tableService, name, options) {
    this.baseStore = baseStore;
    this.service = tableService;
    this.name = name;
    this.options = options;
  }

  /**
   * Connects the base store, then makes sure the mapping table exists.
   * @returns {Promise} settles once the table has been created or found
   */
  connect() {
    return this.baseStore.connect().then(() => {
      return this._createTable(this.name);
    });
  }

  /**
   * Stores the document in the base store and records its URL and URN mappings in one batch.
   * @param {object} document - must carry `_metadata.url` and `_metadata.links.self.href`
   * @returns {Promise} resolves with the result of the table batch
   */
  upsert(document) {
    return this.baseStore.upsert(document).then(blobName => {
      const url = document._metadata.url;
      const urn = document._metadata.links.self.href;
      const batch = new azure.TableBatch();
      batch.insertOrReplaceEntity(this._createMappingEntity(url, blobName));
      batch.insertOrReplaceEntity(this._createMappingEntity(urn, blobName));
      const deferred = Q.defer();
      this.service.executeBatch(this.name, batch, this._callbackToPromise(deferred));
      return deferred.promise;
    });
  }

  /**
   * Fetches a document by URL or URN.
   * @param {string} type - document type, forwarded to the base store
   * @param {string} url - key to look up in the mapping table
   * @returns {Promise} resolves with the base store's document
   * @throws {Error} (as a rejection) `Document not found at <url>` when no mapping exists
   */
  get(type, url) {
    return this._getBlobNameForUrl(url).then(blobName => {
      if (!blobName) {
        throw new Error(`Document not found at ${url}`);
      }
      return this.baseStore.get(type, blobName);
    });
  }

  /**
   * Looks up the etag of a document by URL or URN.
   * @param {string} type - document type, forwarded to the base store
   * @param {string} url - key to look up in the mapping table
   * @returns {Promise} resolves with the base store's etag, or null when no mapping exists
   */
  etag(type, url) {
    return this._getBlobNameForUrl(url).then(blobName => {
      return blobName ? this.baseStore.etag(type, blobName) : null;
    });
  }

  /** Delegates straight to the base store. */
  list(pattern) {
    return this.baseStore.list(pattern);
  }

  /**
   * Delegates straight to the base store.
   * NOTE(review): `url` is not translated to a blob name and the mapping rows are left in
   * place — confirm this matches the base store's delete contract.
   */
  delete(type, url) {
    return this.baseStore.delete(type, url);
  }

  /** Delegates straight to the base store. */
  count(pattern) {
    return this.baseStore.count(pattern);
  }

  /** Closes the base store; the table service is not touched. */
  close() {
    return this.baseStore.close();
  }

  /** Creates the named table unless it already exists. */
  _createTable(name) {
    const createTableIfNotExists = Q.nbind(this.service.createTableIfNotExists, this.service);
    return createTableIfNotExists(name);
  }

  /** Builds the table entity that maps `key` (URL or URN) to `blobName`. */
  _createMappingEntity(key, blobName) {
    return {
      PartitionKey: { '_': this.name },
      // Encoded so characters such as '/', '?' and '#' never reach the row key.
      RowKey: { '_': encodeURIComponent(key) },
      blobName: { '_': blobName }
    };
  }

  /**
   * Resolves the blob name recorded for `url`.
   * @param {string} url - URL or URN used as the (encoded) row key
   * @returns {Promise} resolves with the blob name, or null when there is no usable mapping;
   *   rejects with the table service error for any other failure
   */
  _getBlobNameForUrl(url) {
    const deferred = Q.defer();
    this.service.retrieveEntity(this.name, this.name, encodeURIComponent(url), (error, result) => {
      if (error) {
        // A missing row is an expected outcome, not a failure: callers treat a falsy
        // value as "no such document", so surface it as null instead of rejecting.
        if (error.code === 'ResourceNotFound') {
          return deferred.resolve(null);
        }
        return deferred.reject(error);
      }
      // Guard against rows without a blobName so a malformed entity cannot throw inside
      // the callback and leave the promise pending forever.
      const blobName = result && result.blobName ? result.blobName._ : null;
      deferred.resolve(blobName === undefined ? null : blobName);
    });
    return deferred.promise;
  }

  /** Adapts a node-style (error, value) callback onto the given deferred. */
  _callbackToPromise(deferred) {
    return (error, value) => {
      return error ? deferred.reject(error) : deferred.resolve(value);
    };
  }
}
module.exports = AzureTableMappingStore;

Просмотреть файл

@ -26,7 +26,7 @@ class UrltoUrnMappingStore {
}
get(type, url) {
return this._getUrnForUrl(url).then(urn => {
return this._getBlobNameForUrl(url).then(urn => {
if (!urn) {
throw new Error(`Document not found at ${url}`);
}
@ -35,7 +35,7 @@ class UrltoUrnMappingStore {
}
etag(type, url) {
return this._getUrnForUrl(url).then(urn => {
return this._getBlobNameForUrl(url).then(urn => {
return urn ? this.baseStore.etag(type, urn) : null;
});
}
@ -56,7 +56,7 @@ class UrltoUrnMappingStore {
return this.baseStore.close();
}
_getUrnForUrl(url) {
_getBlobNameForUrl(url) {
const deferred = Q.defer();
this.redisClient.hget(this.name, url, this._callbackToPromise(deferred));
return deferred.promise;