Fixed a hang issue in `StorageServiceClient` with a retry policy filter set: when the request is resent on retry, the stream is no longer readable, so the retry hangs.
This commit is contained in:
Parent: b8909b4ba4
Commit: 46e0798677
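For context, a minimal sketch of the scenario this commit describes, assuming the v1.x `azure-storage` Node.js API; the share, directory, file, and local path names are hypothetical. The upload stream turns out to be shorter than the declared length, so the connection is reset after the stream has already been consumed. Before this fix, a retry policy filter would retry the request and hang waiting on the drained stream; with the fix, the client-side error is surfaced to the caller instead.

var azure = require('azure-storage');
var fs = require('fs');

// Hypothetical names; assumes connection settings come from environment variables
// and that 'myshare' and 'dir/myfile' (declared as 100 bytes) already exist.
var fileService = azure.createFileService().withFilter(new azure.ExponentialRetryPolicyFilter());
var declaredSize = 100;                          // bytes the service is told to expect
var stream = fs.createReadStream('short.dat');   // local file actually contains fewer bytes

fileService.createRangesFromStream('myshare', 'dir', 'myfile', stream, 0, declaredSize - 1, function (err) {
  // The stream was fully consumed by the first (failed) request, so the SDK does not
  // retry; the client-side error (e.g. ECONNRESET) is returned to the caller.
  if (err) {
    console.log('Upload failed without an SDK retry:', err.code);
  }
});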
@@ -6,6 +6,7 @@ be taken. This is a GA release and the changes described below indicate the chan
 ALL

 * Fixed the issue that retry filter will fail against storage emulator.
+* Fixed a hang issue of `StorageServiceClient` with retry policy filter set when retrying sending the request, the stream is not readable anymore.

 2016.07 Version 1.2.0
@@ -126,7 +126,9 @@ RetryPolicyFilter._handle = function (self, requestOptions, next) {

     // Only in the case of success from the server but a client-side failure like an MD5 or length mismatch, returnObject.retryable has a value (we explicitly set it to false).
     // In this case, we should not retry the request.
-    if (returnObject.error && azureutil.objectIsNull(returnObject.retryable) &&
+    // If the output stream already got sent to the server and an error came back,
+    // we should NOT retry within the SDK, as the stream data is not valid anymore if we retry directly.
+    if (!returnObject.outputStreamSent && returnObject.error && azureutil.objectIsNull(returnObject.retryable) &&
       ((!azureutil.objectIsNull(returnObject.response) && retryInfo.retryable) ||
       (returnObject.error.code === 'ETIMEDOUT' || returnObject.error.code === 'ESOCKETTIMEDOUT' || returnObject.error.code === 'ECONNRESET'))) {
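To make the compound guard easier to read, here is a simplified, standalone restatement of the updated condition (a sketch only, not the SDK's actual code path; `azureutil.objectIsNull` is approximated with an `== null` check):

// Mirrors the new guard above: a retry is only considered when the output stream
// has NOT already been sent, there is an error, retryable was not explicitly set
// by a client-side check, and either the server responded and the policy marks the
// status as retryable, or the failure is a client-side timeout/connection reset.
function canConsiderRetry(returnObject, retryInfo) {
  var clientSideFailure = returnObject.error &&
    (returnObject.error.code === 'ETIMEDOUT' ||
     returnObject.error.code === 'ESOCKETTIMEDOUT' ||
     returnObject.error.code === 'ECONNRESET');

  return !returnObject.outputStreamSent &&
    !!returnObject.error &&
    returnObject.retryable == null &&
    ((returnObject.response != null && retryInfo.retryable) || clientSideFailure);
}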
@@ -185,7 +187,7 @@ RetryPolicyFilter._handle = function (self, requestOptions, next) {

 RetryPolicyFilter._shouldRetryOnError = function (statusCode, requestOptions) {
   var retryInfo = (requestOptions && requestOptions.retryContext) ? requestOptions.retryContext : {};

   // Non-timeout Cases
   if (statusCode >= 300 && statusCode != 408) {
     // Always no retry on "not implemented" and "version not supported"
@@ -269,6 +269,17 @@ StorageServiceClient.prototype._performRequest = function (webResource, body, op
   // Initialize the operationExpiryTime
   this._setOperationExpiryTime(options);

+  // If the output stream already got sent to the server and an error came back,
+  // we should NOT retry within the SDK, as the stream data is not valid anymore if we retry directly.
+  // It is also very hard for the SDK to rewind the stream.
+  //
+  // If users want to retry on this kind of error, they can implement their own logic to parse the response and
+  // determine if they need to re-prepare a stream and call our SDK API to retry.
+  //
+  // Currently, for blobs/files with a size greater than 32MB (DEFAULT_SINGLE_BLOB_PUT_THRESHOLD_IN_BYTES),
+  // we send the stream as chunked buffers, which doesn't have this issue.
+  var outputStreamSent = false;
+
   var operation = function (options, next) {
     self._validateLocation(options);
     var currentLocation = options.currentLocation;
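As the new comment says, once the stream has been consumed the SDK will not retry on its own; callers who want retry semantics for such failures can re-create the stream per attempt and call the API again. A minimal sketch of that caller-side pattern (the helper name, retry count, and error handling are hypothetical, not part of the SDK):

var fs = require('fs');

// Hypothetical caller-side retry: re-create the read stream for every attempt,
// because a stream that was already piped into a failed request cannot be reused.
function uploadWithRetry(fileService, share, file, localPath, size, attemptsLeft, callback) {
  var stream = fs.createReadStream(localPath);
  fileService.createRangesFromStream(share, '', file, stream, 0, size - 1, function (err, result, response) {
    if (err && attemptsLeft > 1) {
      // Inspect err/response to decide whether another attempt makes sense,
      // then retry with a freshly created stream.
      return uploadWithRetry(fileService, share, file, localPath, size, attemptsLeft - 1, callback);
    }
    callback(err, result, response);
  });
}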
@@ -299,7 +310,8 @@ StorageServiceClient.prototype._performRequest = function (webResource, body, op
       responseObject.operationEndTime = new Date();
       // Required for listing operations to make sure successive operations go to the same location.
       responseObject.targetLocation = currentLocation;
+      responseObject.outputStreamSent = outputStreamSent;

       callback(responseObject, next);
     };
@@ -450,6 +462,7 @@ StorageServiceClient.prototype._performRequest = function (webResource, body, op
       var index = 0;

       body.outputStream.on('data', function (d) {
+        outputStreamSent = true;
         if(self._maximumExecutionTimeExceeded(Date.now(), options.operationExpiryTime)) {
           processResponseCallback(new TimeoutError(SR.MAXIMUM_EXECUTION_TIMEOUT_EXCEPTION));
         } else {
@@ -470,6 +483,7 @@ StorageServiceClient.prototype._performRequest = function (webResource, body, op
       // NOTE: workaround for an unexpected EPIPE exception when piping streams larger than 29 MB
       if (!azureutil.objectIsNull(finalRequestOptions.headers['content-length']) && finalRequestOptions.headers['content-length'] < 29 * 1024 * 1024) {
         body.outputStream.pipe(buildRequest());
+        outputStreamSent = true;

         if (azureutil.isStreamPaused(body.outputStream)) {
           body.outputStream.resume();
@@ -15,10 +15,12 @@
 //

 var assert = require('assert');
+var fs = require('fs');

 // Test includes
 var testutil = require('../../framework/util');
 var TestSuite = require('../../framework/test-suite');
+var rfs = testutil.libRequire('common/streams/readablefs');

 // Lib includes
 var azure = testutil.libRequire('azure-storage');
@@ -31,10 +33,20 @@ var exponentialRetryPolicyFilter;
 var tableNames = [];
 var tablePrefix = 'expretry';

+var shareNames = [];
+var sharePrefix = 'expretry';
+
+var fileNames = [];
+var filePrefix = 'expretry';
+
 var tableService;
 var tableName;

+var fileService;
+var shareName;
+
 var suite = new TestSuite('exponentialretrypolicyfilter-tests');
+var runOrSkip = suite.isMocked ? it.skip : it;

 describe('exponentialretrypolicyfilter-tests', function () {
   before(function (done) {
@@ -57,10 +69,21 @@ describe('exponentialretrypolicyfilter-tests', function () {
   });

   afterEach(function (done) {
-    tableService.deleteTableIfExists(tableName, function (deleteError) {
-      assert.equal(deleteError, null);
+    if (tableName) {
+      tableService.deleteTableIfExists(tableName, function (deleteError) {
+        assert.equal(deleteError, null);
+        if (shareName) {
+          fileService.deleteShareIfExists(shareName, function (deleteError) {
+            assert.equal(deleteError, null);
+            suite.teardownTest(done);
+          });
+        } else {
+          suite.teardownTest(done);
+        }
+      });
+    } else {
       suite.teardownTest(done);
-    });
+    }
   });

   it('should fail when the table already exists', function (done) {
@@ -142,4 +165,35 @@ describe('exponentialretrypolicyfilter-tests', function () {
       done();
     });
   });
+
+  runOrSkip('should NOT retry when the output stream is already sent', function(done) {
+    fileService = azure.createFileService().withFilter(exponentialRetryPolicyFilter);
+    shareName = testutil.generateId(sharePrefix, shareNames, suite.isMocked);
+    var fileName = testutil.generateId(filePrefix, fileNames, suite.isMocked);
+    var localTempFileName = suite.getName('fileservice_test_retry');
+    var fileSize = 100;
+
+    // The real stream length is smaller than the expected data length, to mock the client-side timeout error that would trigger the retry.
+    var fileBuffer = new Buffer(fileSize / 2);
+    fileBuffer.fill(1);
+    fs.writeFileSync(localTempFileName, fileBuffer);
+
+    fileService.createShare(shareName, function(err, result) {
+      assert.equal(err, null);
+      assert.notEqual(result, null);
+      assert.equal(result.name, shareName);
+
+      fileService.createFile(shareName, '', fileName, fileSize, function(err) {
+        assert.equal(err, null);
+
+        // Expect 100 bytes to be sent, but the stream only has 50 bytes. This results in an ECONNRESET error and should NOT retry.
+        // If it retried, it would hang waiting for data from the stream, but the stream is already closed because the data was already sent out in the first (failed) request.
+        fileService.createRangesFromStream(shareName, '', fileName, rfs.createReadStream(localTempFileName), 0, fileSize - 1, function(err, result, response) {
+          assert.notEqual(err, null);
+          assert.equal(err.code, 'ECONNRESET');
+          done();
+        });
+      });
+    });
+  });
 });