Insert into both Redis & Azure table stores. Read only from Redis for now.

This commit is contained in:
Gene Hazan 2017-03-14 11:20:33 -07:00
Родитель caaef167a0
Коммит e5469fcfda
2 изменённых файла: 34 добавления и 23 удаления

Просмотреть файл

@@ -410,7 +410,8 @@ class CrawlerFactory {
const baseStore = CrawlerFactory.createAzureStorageStore(options, name);
const account = config.get('CRAWLER_STORAGE_ACCOUNT');
const key = config.get('CRAWLER_STORAGE_KEY');
return new AzureTableMappingStore(baseStore, CrawlerFactory.createTableService(account, key), baseStore.name, options);
//TODO: remove "CrawlerFactory.getRedisClient(options.logger)" after Redis to Azure table data migration
return new AzureTableMappingStore(baseStore, CrawlerFactory.createTableService(account, key), CrawlerFactory.getRedisClient(options.logger), baseStore.name, options);
}
static createAzureStorageStore(options, name = null) {

Просмотреть файл

@@ -5,9 +5,10 @@ const azure = require('azure-storage');
const Q = require('q');
class AzureTableMappingStore {
constructor(baseStore, tableService, name, options) {
constructor(baseStore, tableService, redisClient, name, options) { //TODO: remove "redisClient" after Redis to Azure table data migration
this.baseStore = baseStore;
this.service = tableService;
this.redisClient = redisClient; //TODO: remove after Redis to Azure table data migration
this.name = name;
this.options = options;
}
@@ -22,29 +23,37 @@ class AzureTableMappingStore {
return this.baseStore.upsert(document).then(blobName => {
const url = document._metadata.url;
const urn = document._metadata.links.self.href;
const type = document._metadata.type;
const deferred = Q.defer();
const urlBatch = { PartitionKey: { '_': this.name }, RowKey: { '_': encodeURIComponent(url) }, blobName: { '_': blobName } };
const urnBatch = { PartitionKey: { '_': this.name }, RowKey: { '_': encodeURIComponent(urn) }, blobName: { '_': blobName } };
const batch = new azure.TableBatch();
batch.insertOrReplaceEntity(urlBatch);
batch.insertOrReplaceEntity(urnBatch);
this.service.executeBatch(this.name, batch, this._callbackToPromise(deferred));
const urlEntity = { PartitionKey: { '_': `url:${type}` }, RowKey: { '_': encodeURIComponent(url) }, blobName: { '_': blobName } };
const urnEntity = { PartitionKey: { '_': `urn:${type}` }, RowKey: { '_': encodeURIComponent(urn) }, blobName: { '_': blobName } };
this.redisClient.hmset(this.name, [urn, blobName, url, blobName], (error) => { //TODO: remove after the data migration except for "this.service..."
if (error) {
return deferred.reject(error);
}
this.service.insertOrReplaceEntity(this.name, urlEntity, (error) => {
if (error) {
return deferred.reject(error);
}
this.service.insertOrReplaceEntity(this.name, urnEntity, this._callbackToPromise(deferred));
});
});
return deferred.promise;
});
}
get(type, url) {
return this._getBlobNameForUrl(url).then(urn => {
if (!urn) {
return this._getBlobNameForUrl(type, url).then(blobName => {
if (!blobName) {
throw new Error(`Document not found at ${url}`);
}
return this.baseStore.get(type, urn);
return this.baseStore.get(type, blobName);
});
}
etag(type, url) {
return this._getBlobNameForUrl(url).then(urn => {
return urn ? this.baseStore.etag(type, urn) : null;
return this._getBlobNameForUrl(type, url).then(blobName => {
return blobName ? this.baseStore.etag(type, blobName) : null;
});
}
@@ -69,17 +78,18 @@ class AzureTableMappingStore {
return createTableIfNotExists(name);
}
_getBlobNameForUrl(url) {
_getBlobNameForUrl(type, url) {
const deferred = Q.defer();
this.service.retrieveEntity(this.name, this.name, encodeURIComponent(url), (error, result) => {
if (!error) {
return deferred.resolve(result.blobName._);
}
if (error && error.code === 'ResourceNotFound') {
return deferred.resolve(null);
}
deferred.reject(error);
});
this.redisClient.hget(this.name, url, this._callbackToPromise(deferred)); // TODO: remove this line and uncomment below after the data migration
// this.service.retrieveEntity(this.name, `url:${type}`, encodeURIComponent(url), (error, result) => {
// if (!error) {
// return deferred.resolve(result.blobName._);
// }
// if (error && error.code === 'ResourceNotFound') {
// return deferred.resolve(null);
// }
// deferred.reject(error);
// });
return deferred.promise;
}