added concurrent upload and streamming support for upload/download

This commit is contained in:
Tank Tang 2016-10-25 14:16:48 +08:00
Родитель 9e80199eeb
Коммит a2be39d372
7 изменённых файлов: 1026 добавлений и 222 удалений

Просмотреть файл

@ -31,6 +31,10 @@
depends="prepare,static-analysis,phpunit,phpdox,-check-failure"
description="Performs static analysis, runs the tests, and generates project documentation"/>
<target name="full-build-large-scale"
depends="prepare,static-analysis,phpunit-large-scale,phpdox,-check-failure"
description="Performs static analysis, runs the tests (including large scale tests), and generates project documentation"/>
<target name="full-build-parallel"
depends="prepare,static-analysis-parallel,phpunit,phpdox,-check-failure"
description="Performs static analysis (executing the tools in parallel), runs the tests, and generates project documentation"/>
@ -229,6 +233,19 @@
unless="phpunit.done"
depends="prepare"
description="Run unit tests with PHPUnit">
<exec executable="${phpunit}" resultproperty="result.phpunit" taskname="phpunit">
<arg value="--configuration"/>
<arg path="${basedir}/phpunit.xml.dist"/>
<arg value="--exclude-group=large-scale"/>
</exec>
<property name="phpunit.done" value="true"/>
</target>
<target name="phpunit-large-scale"
unless="phpunit.done"
depends="prepare"
description="Run unit tests with PHPUnit, including the large-scale tests">
<exec executable="${phpunit}" resultproperty="result.phpunit" taskname="phpunit">
<arg value="--configuration"/>
<arg path="${basedir}/phpunit.xml.dist"/>
@ -244,6 +261,7 @@
<exec executable="${phpunit}" failonerror="true" taskname="phpunit">
<arg value="--configuration"/>
<arg path="${basedir}/phpunit.xml.dist"/>
<arg value="--exclude-group=large-scale"/>
<arg value="--no-coverage"/>
</exec>
@ -257,6 +275,7 @@
<exec executable="${phpunit}" resultproperty="result-ft.phpunit" taskname="phpunit">
<arg value="--configuration"/>
<arg path="${basedir}/phpunit.functional.xml.dist"/>
<arg value="--exclude-group=large-scale"/>
</exec>
<property name="phpunit-ft.done" value="true"/>

Просмотреть файл

@ -75,6 +75,12 @@ use MicrosoftAzure\Storage\Blob\Models\CreateBlobSnapshotResult;
use MicrosoftAzure\Storage\Blob\Models\PageRange;
use MicrosoftAzure\Storage\Blob\Models\CopyBlobResult;
use MicrosoftAzure\Storage\Blob\Models\BreakLeaseResult;
use MicrosoftAzure\Storage\Common\Internal\ServiceFunctionThread;
use GuzzleHttp\Psr7;
use GuzzleHttp\Exception\ConnectException;
use GuzzleHttp\Exception\RequestException;
use GuzzleHttp\HandlerStack;
use GuzzleHttp\Middleware;
/**
* This class constructs HTTP requests and receive HTTP responses for blob
@ -93,7 +99,7 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
/**
* @var int Defaults to 32MB
*/
private $_SingleBlobUploadThresholdInBytes = 33554432 ;
private $_SingleBlobUploadThresholdInBytes = Resources::MB_IN_BYTES_32;
/**
* Get the value for SingleBlobUploadThresholdInBytes
@ -114,12 +120,12 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
*/
public function setSingleBlobUploadThresholdInBytes($val)
{
if ($val > 67108864) {
if ($val > Resources::MB_IN_BYTES_64) {
// What should the proper action here be?
$val = 67108864;
$val = Resources::MB_IN_BYTES_64;
} elseif ($val < 1) {
// another spot that could use looking at
$val = 33554432;
$val = Resources::MB_IN_BYTES_32;
}
$this->_SingleBlobUploadThresholdInBytes = $val;
}
@ -507,7 +513,7 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
* @param string $container The container name.
* @param string $blob The blob name.
* @param PageRange $range The page ranges.
* @param string|resource $content The content stream.
* @param string $content The content string.
* @param CreateBlobPagesOptions $options The optional parameters.
*
* @return CreateBlobPagesResult
@ -518,6 +524,7 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
Validate::isString($blob, 'blob');
Validate::notNullOrEmpty($blob, 'blob');
Validate::isString($container, 'container');
Validate::isString($content, 'content');
Validate::isTrue(
$range instanceof PageRange,
sprintf(
@ -526,10 +533,7 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
get_class(new PageRange())
)
);
Validate::isTrue(
is_string($content) || is_resource($content),
sprintf(Resources::INVALID_PARAM_MSG, 'content', 'string|resource')
);
$content = Psr7\stream_for($content);
$method = Resources::HTTP_PUT;
$headers = array();
@ -580,13 +584,13 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
);
$response = $this->send(
$method,
$headers,
$queryParams,
$method,
$headers,
$queryParams,
$postParams,
$path,
$statusCode,
$body
$path,
$statusCode,
$content
);
return CreateBlobPagesResult::create(HttpFormatter::formatHeaders($response->getHeaders()));
@ -1267,7 +1271,7 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
*
* @param string $container The name of the container.
* @param string $blob The name of the blob.
* @param string|resource $content The content of the blob.
* @param string|resource|StreamInterface $content The content of the blob.
* @param Models\CreateBlobOptions $options The optional parameters.
*
* @return CopyBlobResult
@ -1279,16 +1283,13 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
Validate::isString($container, 'container');
Validate::isString($blob, 'blob');
Validate::notNullOrEmpty($blob, 'blob');
Validate::isTrue(
is_string($content) || is_resource($content),
sprintf(Resources::INVALID_PARAM_MSG, 'content', 'string|resource')
);
$body = Psr7\stream_for($content);
$method = Resources::HTTP_PUT;
$headers = array();
$postParams = array();
$queryParams = array();
$bodySize = false;
$bodySize = null;
$path = $this->_createPath($container, $blob);
$statusCode = Resources::STATUS_CREATED;
@ -1296,18 +1297,34 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
$options = new CreateBlobOptions();
}
if (is_resource($content)) {
$cStat = fstat($content);
// if the resource is a remote file, $cStat will be false
if ($cStat) {
$bodySize = $cStat['size'];
// if we have a size, and the size is lower than the threshold,
// we can try to one shot this, else failsafe on block upload
$isSingleUpload = $body->isSeekable();
if ($isSingleUpload) {
$isSingleUpload = false;
try {
$body->read($this->_SingleBlobUploadThresholdInBytes);
} catch (\RuntimeException $e) {
//if the runtime exception contains certain string,
//the body is considered to be unable to seek to the end.
$pos = strpos(
$e->getMessage(),
'to seek to stream position '
);
if ($pos == null) {
throw $e;
}
$isSingleUpload = true;
}
if ($body->eof()) {
$isSingleUpload = true;
} elseif ($body->read(1) == '') {
$isSingleUpload = true;
}
} else {
$bodySize = strlen($content);
}
$body->rewind();
// if we have a size we can try to one shot this, else failsafe on block upload
if (is_int($bodySize) && $bodySize <= $this->_SingleBlobUploadThresholdInBytes) {
if ($isSingleUpload) {
$headers = $this->_addCreateBlobOptionalHeaders($options, $headers);
$this->addOptionalHeader(
@ -1321,52 +1338,22 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
$options->getTimeout()
);
// If read file failed for any reason it will throw an exception.
$body = is_resource($content) ? stream_get_contents($content) : $content;
$response = $this->send(
$method,
$headers,
$queryParams,
$method,
$headers,
$queryParams,
$postParams,
$path,
$path,
$statusCode,
$body
);
return CopyBlobResult::create(
HttpFormatter::formatHeaders($response->getHeaders())
);
} else {
// This is for large or failsafe upload
$end = 0;
$counter = 0;
$body = '';
$blockIds = array();
// if threshold is lower than 4mb, honor threshold, else use 4mb
$blockSize = ($this->_SingleBlobUploadThresholdInBytes < 4194304) ? $this->_SingleBlobUploadThresholdInBytes : 4194304;
while(!$end) {
if (is_resource($content)) {
$body = fread($content, $blockSize);
if (feof($content)) {
$end = 1;
}
} else {
if (strlen($content) <= $blockSize) {
$body = $content;
$end = 1;
} else {
$body = substr($content, 0, $blockSize);
$content = substr_replace($content, '', 0, $blockSize);
}
}
if (!empty($body)) {
$block = new Block();
$block->setBlockId(base64_encode(str_pad($counter++, 6, '0', STR_PAD_LEFT)));
$block->setType('Uncommitted');
array_push($blockIds, $block);
$this->createBlobBlock($container, $blob, $block->getBlockId(), $body);
}
}
$response = $this->commitBlobBlocks($container, $blob, $blockIds, $options);
return $this->createBlobBlocks($container, $blob, $body, $options);
}
return CopyBlobResult::create(HttpFormatter::formatHeaders($response->getHeaders()));
}
/**
@ -1401,7 +1388,7 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
* @param string $blob name of the blob
* @param Models\PageRange $range Can be up to 4 MB in size
* Note that ranges must be aligned to 512 (0-511, 512-1023)
* @param string $content the blob contents.
* @param string|resource|StreamInterface $content the blob contents.
* @param Models\CreateBlobPagesOptions $options optional parameters
*
* @return Models\CreateBlobPagesResult.
@ -1411,12 +1398,23 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
public function createBlobPages($container, $blob, $range, $content,
$options = null
) {
$contentStream = Psr7\stream_for($content);
//because the content is at most 4MB long, can retrieve all the data
//here at once.
$body = $contentStream->getContents();
//if the range is not align to 512, throw exception.
$chunks = (int)($range->getLength() / 512);
if ($chunks * 512 != $range->getLength()) {
throw new \RuntimeException(Resources::ERROR_RANGE_NOT_ALIGN_TO_512);
}
return $this->_updatePageBlobPagesImpl(
PageWriteOption::UPDATE_OPTION,
$container,
$blob,
$range,
$content,
$body,
$options
);
}
@ -1429,14 +1427,18 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
* @param string $blockId must be less than or equal to
* 64 bytes in size. For a given blob, the length of the value specified for the
* blockid parameter must be the same size for each block.
* @param string $content the blob block contents
* @param resource|string|StreamInterface $content the blob block contents
* @param Models\CreateBlobBlockOptions $options optional parameters
*
* @return none
*
*
* @return \MicrosoftAzure\Storage\Blob\Models\CopyBlobResult
*
* @see http://msdn.microsoft.com/en-us/library/windowsazure/dd135726.aspx
*/
public function createBlobBlock($container, $blob, $blockId, $content,
public function createBlobBlock(
$container,
$blob,
$blockId,
$content,
$options = null
) {
Validate::isString($container, 'container');
@ -1444,23 +1446,42 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
Validate::notNullOrEmpty($blob, 'blob');
Validate::isString($blockId, 'blockId');
Validate::notNullOrEmpty($blockId, 'blockId');
Validate::isTrue(
is_string($content) || is_resource($content),
sprintf(Resources::INVALID_PARAM_MSG, 'content', 'string|resource')
);
$method = Resources::HTTP_PUT;
$headers = array();
$postParams = array();
$queryParams = array();
$path = $this->_createPath($container, $blob);
$statusCode = Resources::STATUS_CREATED;
$body = $content;
if (is_null($options)) {
$options = new CreateBlobBlockOptions();
}
$method = Resources::HTTP_PUT;
$headers = $this->createBlobBlockHeader($options);
$postParams = array();
$queryParams = $this->createBlobBlockQueryParams($options, $blockId);
$path = $this->_createPath($container, $blob);
$statusCode = Resources::STATUS_CREATED;
$contentStream = Psr7\stream_for($content);
$body = $contentStream->getContents();
$response = $this->send(
$method,
$headers,
$queryParams,
$postParams,
$path,
$statusCode,
$body
);
return CopyBlobResult::create(HttpFormatter::formatHeaders($response->getHeaders()));
}
/**
* create the header for createBlobBlock(s)
* @param array $options the option of the request
*
* @return array [description]
*/
protected function createBlobBlockHeader($options)
{
$headers = array();
$this->addOptionalHeader(
$headers,
Resources::X_MS_LEASE_ID,
@ -1476,6 +1497,20 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
Resources::CONTENT_TYPE,
Resources::URL_ENCODED_CONTENT_TYPE
);
return $headers;
}
/**
* create the query params for createBlobBlock(s)
* @param array $options the option of the request
* @param string $blockId the block id of the block.
*
* @return array the constructed query parameters.
*/
protected function createBlobBlockQueryParams($options, $blockId)
{
$queryParams = array();
$this->addOptionalQueryParam(
$queryParams,
Resources::QP_TIMEOUT,
@ -1491,18 +1526,165 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
Resources::QP_BLOCKID,
$blockId
);
return $queryParams;
}
protected function createRetryHandler()
{
return function (
$retries,
$request,
$response = null,
$exception = null
) {
//set the max retries.
$maxRetries = Resources::CHUNK_MAX_RETRY;
//Exceeds the retry limit. No retry.
if ($retries >= $maxRetries) {
return false;
}
//Not connection exception. No retry.
if (!(($response && $response->getStatusCode() >= 400)
|| ($exception instanceof ConnectException))) {
return false;
}
return true;
};
}
/**
* This method creates the blob blocks. This method will send the request
* concurrently for better performance.
*
* @param string $container [description]
* @param string $blob [description]
* @param resource|string|StreamInterface $content [description]
* @param array $options [description]
*
* @return \MicrosoftAzure\Storage\Blob\Models\CopyBlobResult
*/
protected function createBlobBlocks(
$container,
$blob,
$content,
$options = null
) {
Validate::isString($container, 'container');
Validate::isString($blob, 'blob');
$contentStream = Psr7\stream_for($content);
if (is_null($options)) {
$options = new CreateBlobBlockOptions();
}
$response = $this->send(
$method,
$headers,
$queryParams,
$method = Resources::HTTP_PUT;
$headers = $this->createBlobBlockHeader($options);
$postParams = array();
$path = $this->_createPath($container, $blob);
$statusCode = Resources::STATUS_CREATED;
$blockIds = array();
// if threshold is lower than 4mb, honor threshold, else use 4mb
$blockSize = (
$this->_SingleBlobUploadThresholdInBytes
< Resources::MB_IN_BYTES_4) ?
$this->_SingleBlobUploadThresholdInBytes : Resources::MB_IN_BYTES_4;
$counter = 0;
//create the generator for requests.
$generator = function () use (
$contentStream,
&$blockIds,
$blockSize,
$options,
$method,
$headers,
$postParams,
$path,
$statusCode,
$body
$path,
&$counter
) {
//read the content.
$blockContent = $contentStream->read($blockSize);
//construct the blockId
$blockId = base64_encode(
str_pad($counter++, 6, '0', STR_PAD_LEFT)
);
//add the id to array.
$size = strlen($blockContent);
if ($size == 0) {
return null;
}
array_push($blockIds, new Block($blockId, 'Uncommitted'));
$queryParams = $this->createBlobBlockQueryParams(
$options,
$blockId
);
//return the array of requests.
return $this->createRequest(
$method,
$headers,
$queryParams,
$postParams,
$path,
$blockContent
);
};
//generate the decider.
$decider = function () use ($contentStream) {
$isEnd = $contentStream->eof();
//if the content stream is read to exactly the end of file
//the content stream will still not return true for eof()
//Have to read another byte, then see if it is null.
if (!$isEnd) {
$str = $contentStream->read(1);
if ($str != '') {
$contentStream->seek(-1, SEEK_CUR);
} else {
$isEnd = true;
}
}
return (!$isEnd);
};
//initialize the first batch of request.
$requests = array();
for ($index = 0;
$index < Resources::NUMBER_OF_CONCURRENCY && $decider();
++$index) {
$requests[] = $generator();
}
//create handler stack with retry handler.
$stack = HandlerStack::create();
$stack->push(Middleware::retry($this->createRetryHandler()));
$clientOptions = ['handler' => $stack];
//Send the request concurrently.
//Does not need to evaluate the results. If operation not successful,
//exception will be thrown.
$this->sendConcurrent(
new \ArrayIterator($requests),
$generator,
$decider,
$clientOptions
);
$response = $this->commitBlobBlocks(
$container,
$blob,
$blockIds,
$options
);
return CopyBlobResult::create(
HttpFormatter::formatHeaders(
$response->getHeaders()
)
);
return CopyBlobResult::create(HttpFormatter::formatHeaders($response->getHeaders()));
}
/**
@ -2072,6 +2254,43 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
return SetBlobMetadataResult::create(HttpFormatter::formatHeaders($response->getHeaders()));
}
/**
* Downloads a blob to a file, the result include its metadata and
* properties. The result will not contain a stream pointing to the
* content of the file.
*
* @param string $path The path and name of the file
* @param string $container name of the container
* @param string $blob name of the blob
* @param Models\GetBlobOptions $options optional parameters
*
* @return Models\GetBlobResult
*
* @see http://msdn.microsoft.com/en-us/library/windowsazure/dd179440.aspx
*/
public function getBlobToFile($path, $container, $blob, $options = null)
{
$resource = fopen($path, 'w+');
if ($resource == null) {
throw new \Exception(Resources::ERROR_FILE_COULD_NOT_BE_OPENED);
}
$result = $this->getBlob($container, $blob, $options);
$content = $result->getContentStream();
while (!$content->eof()) {
fwrite($resource, $content->read(Resources::MB_IN_BYTES_4));
}
//response body has already been set to file. Set the stream of the
//response body to be null, then close the file.
$result->setContentStream(null);
fclose($resource);
return $result;
}
/**
* Reads or downloads a blob from the system, including its metadata and
@ -2134,12 +2353,14 @@ class BlobRestProxy extends ServiceRestProxy implements IBlob
);
$response = $this->send(
$method,
$headers,
$queryParams,
$postParams,
$path,
$statusCode
$method,
$headers,
$queryParams,
$postParams,
$path,
$statusCode,
Resources::EMPTY_STRING,
['stream' => true] //setting stream to true to enable streaming
);
$metadata = $this->getMetadataArray(HttpFormatter::formatHeaders($response->getHeaders()));

Просмотреть файл

@ -46,6 +46,12 @@ class Block
* @var string
*/
private $_type;
public function __construct($blockId = '', $type = '')
{
$this->_blockId = $blockId;
$this->_type = $type;
}
/**
* Sets the blockId.

Просмотреть файл

@ -50,7 +50,7 @@ class GetBlobResult
private $_metadata;
/**
* @var resource
* @var StreamInterface
*/
private $_contentStream;
@ -58,7 +58,7 @@ class GetBlobResult
* Creates GetBlobResult from getBlob call.
*
* @param array $headers The HTTP response headers.
* @param string $body The response body.
* @param StreamInterface $body The response body.
* @param array $metadata The blob metadata.
*
* @return GetBlobResult
@ -66,7 +66,7 @@ class GetBlobResult
public static function create($headers, $body, $metadata)
{
$result = new GetBlobResult();
$result->setContentStream(Utilities::stringToStream($body));
$result->setContentStream($body);
$result->setProperties(BlobProperties::create($headers));
$result->setMetadata(is_null($metadata) ? array() : $metadata);
@ -120,7 +120,7 @@ class GetBlobResult
/**
* Gets blob contentStream.
*
* @return resource
* @return StreamInterface
*/
public function getContentStream()
{
@ -130,8 +130,8 @@ class GetBlobResult
/**
* Sets blob contentStream.
*
* @param resource $contentStream The stream handle.
*
* @param StreamInterface $contentStream The stream handle.
*
* @return none
*/
public function setContentStream($contentStream)

Просмотреть файл

@ -59,6 +59,7 @@ class Resources
const CERTIFICATE_PATH_NAME = 'CertificatePath';
// Messages
const INVALID_FUNCTION_NAME = 'The class %s does not have a function named %s.';
const INVALID_TYPE_MSG = 'The provided variable should be of type: ';
const INVALID_META_MSG = 'Metadata cannot contain newline characters.';
const AZURE_ERROR_MSG = "Fail:\nCode: %s\nValue: %s\ndetails (if any): %s.";
@ -112,6 +113,11 @@ class Resources
const ERROR_OAUTH_SERVICE_MISSING = 'OAuth service missing for account name \'%s\'';
const ERROR_METHOD_NOT_FOUND = 'Method \'%s\' not found in object class \'%s\'';
const ERROR_INVALID_DATE_STRING = 'Parameter \'%s\' is not a date formatted string \'%s\'';
const ERROR_TOO_LARGE_FOR_BLOCK_BLOB = 'Error: Exceeds the uppper limit of the blob.';
const ERROR_RANGE_NOT_ALIGN_TO_512 = 'Error: Range of the page blob must be align to 512';
const ERROR_FILE_COULD_NOT_BE_OPENED = 'Error: file with given path could not be opened or created.';
const ERROR_CONTAINER_NOT_EXIST = 'The specified container does not exist';
const ERROR_BLOB_NOT_EXIST = 'The specified blob does not exist';
// HTTP Headers
const X_MS_HEADER_PREFIX = 'x-ms-';
@ -214,6 +220,10 @@ class Resources
const DEV_STORE_URI = 'http://127.0.0.1';
const SERVICE_URI_FORMAT = "%s://%s.%s";
const WRAP_ENDPOINT_URI_FORMAT = "https://%s-sb.accesscontrol.windows.net/WRAPv0.9";
const MB_IN_BYTES_4 = 4194304;
const MB_IN_BYTES_32 = 33554432;
const MB_IN_BYTES_64 = 67108864;
const MAX_BLOB_BLOCKS = 50000;
// Xml Namespaces
const WA_XML_NAMESPACE = 'http://schemas.microsoft.com/windowsazure';
@ -221,10 +231,12 @@ class Resources
const DS_XML_NAMESPACE = 'http://schemas.microsoft.com/ado/2007/08/dataservices';
const DSM_XML_NAMESPACE = 'http://schemas.microsoft.com/ado/2007/08/dataservices/metadata';
const XSI_XML_NAMESPACE = 'http://www.w3.org/2001/XMLSchema-instance';
const NUMBER_OF_CONCURRENCY = 25;//Guzzle's default value
const CHUNK_MAX_RETRY = 3;
// Header values
const SDK_VERSION = '0.10.0';
const SDK_VERSION = '0.10.2';
const STORAGE_API_LATEST_VERSION = '2015-04-05';
const DATA_SERVICE_VERSION_VALUE = '1.0;NetFx';
const MAX_DATA_SERVICE_VERSION_VALUE = '2.0;NetFx';

Просмотреть файл

@ -32,6 +32,7 @@ use GuzzleHttp\Psr7;
use GuzzleHttp\Psr7\Request;
use GuzzleHttp\Psr7\Response;
use GuzzleHttp\Psr7\Uri;
use GuzzleHttp\Pool;
/**
* Base class for all services rest proxies.
@ -94,6 +95,153 @@ class ServiceRestProxy extends RestProxy
return $this->_accountName;
}
/**
* Filter the request using the filters. This is for user to create
* request.
* @param \GuzzleHttp\Psr7\Request $request The request to be filtered.
*
* @return \GuzzleHttp\Psr7\Request The filtered request.
*/
protected function filterRequest($request)
{
// Apply filters to the requests
foreach ($this->getFilters() as $filter) {
$request = $filter->handleRequest($request);
}
return $request;
}
/**
* Static helper function to create a usable client for the proxy.
* @param array $clientOptions Added options for client.
*
* @return \GuzzleHttp\Client
*/
protected function createClient($clientOptions)
{
return (new \GuzzleHttp\Client(
array_merge(
$this->_options['http'],
array(
"defaults" => array(
"allow_redirects" => true, "exceptions" => true,
"decode_content" => true,
),
'cookies' => true,
'verify' => false,
// For testing with Fiddler
//'proxy' => "localhost:8888",
),
$clientOptions
)
));
}
/**
* Send the requests concurrently
* @param ArrayIterator $requestsIterator an iterator to the array of
* request that is filtered
* using this object's filters.
* @param callable $generator the generator function to
* generate request upon fullfilment
* @param callable $decider decide if the generator
* continues to append.
* @param array $clientOptions an array of additional options
* for the client.
*
* @return array
*/
protected function sendConcurrent(
$requestsIterator,
$generator,
$decider,
$clientOptions = []
) {
$client = $this->createClient($clientOptions);
$pool = new Pool($client, $requestsIterator, [
'concurrency' => Resources::NUMBER_OF_CONCURRENCY,
'fulfilled' => function (
$response,
$index
) use (
$requestsIterator,
$generator,
$decider
) {
//append new request using the generator.
if (is_callable($generator) && $decider()) {
$requestsIterator->append($generator());
}
},
'rejected' => function ($reason, $index) {
//Still rejected even if the retry logic has been applied.
//Throwing exception.
throw $reason;
},
]);
return $pool->promise()->wait();
}
/**
* Create the request to be sent.
*
* @param string $method The method of the HTTP request
* @param array $headers The header field of the request
* @param array $queryParams The query parameter of the request
* @param array $postParameters The HTTP POST parameters
* @param string $path URL path
* @param string $body Request body
*
* @return \GuzzleHttp\Psr7\Request
*/
protected function createRequest(
$method,
$headers,
$queryParams,
$postParameters,
$path,
$body = Resources::EMPTY_STRING
) {
// add query parameters into headers
$uri = $this->_psrUri;
if ($path != null) {
$uri = $uri->withPath($path);
}
if ($queryParams != null) {
$queryString = Psr7\build_query($queryParams);
$uri = $uri->withQuery($queryString);
}
// add post parameters into bodys
$actualBody = null;
if (empty($body)) {
if (empty($headers['content-type'])) {
$headers['content-type'] = 'application/x-www-form-urlencoded';
$actualBody = Psr7\build_query($postParameters);
}
} else {
$actualBody = $body;
}
$request = new Request(
$method,
$uri,
$headers,
$actualBody
);
//add content-length to header
$bodySize = $request->getBody()->getSize();
if ($bodySize > 0) {
$request = $request->withHeader('content-length', $bodySize);
}
// Apply filters to the requests
return $this->filterRequest($request);
}
/**
* Sends HTTP request with the specified parameters.
*
@ -115,92 +263,39 @@ class ServiceRestProxy extends RestProxy
$postParameters,
$path,
$statusCode,
$body = Resources::EMPTY_STRING
$body = Resources::EMPTY_STRING,
$clientOptions = []
) {
// add query parameters into headers
$uri = $this->_psrUri;
if ($path != NULL)
{
$uri = $uri->withPath($path);
}
if ($queryParams != NULL)
{
$queryString = Psr7\build_query($queryParams);
$uri = $uri->withQuery($queryString);
}
// add post parameters into bodys
$actualBody = NULL;
if (empty($body))
{
if (empty($headers['content-type']))
{
$headers['content-type'] = 'application/x-www-form-urlencoded';
$actualBody = Psr7\build_query($postParameters);
}
}
else
{
$actualBody = $body;
}
$request = new Request(
$method,
$uri,
$headers,
$actualBody);
$client = new \GuzzleHttp\Client(
array_merge(
$this->_options['http'],
array(
"defaults" => array(
"allow_redirects" => true, "exceptions" => true,
"decode_content" => true,
),
'cookies' => true,
'verify' => false,
// For testing with Fiddler
// 'proxy' => "localhost:8888",
)
)
$request = $this->createRequest(
$method,
$headers,
$queryParams,
$postParameters,
$path,
$body
);
$bodySize = $request->getBody()->getSize();
if ($bodySize > 0)
{
$request = $request->withHeader('content-length', $bodySize);
}
// Apply filters to the requests
foreach ($this->getFilters() as $filter) {
$request = $filter->handleRequest($request);
}
$client = $this->createClient($clientOptions);
try {
$response = $client->send($request);
self::throwIfError(
$response->getStatusCode(),
$response->getReasonPhrase(),
$response->getBody(),
$statusCode
);
return $response;
} catch (\GuzzleHttp\Exception\RequestException $e) {
if ($e->hasResponse()) {
$response = $e->getResponse();
self::throwIfError(
$response->getStatusCode(),
$response->getReasonPhrase(),
$response->getBody(),
$statusCode);
return $response;
}
catch(\GuzzleHttp\Exception\RequestException $e)
{
if ($e->hasResponse())
{
$response = $e->getResponse();
self::throwIfError(
$response->getStatusCode(),
$response->getReasonPhrase(),
$response->getBody(),
$statusCode);
$statusCode
);
return $response;
}
else
{
} else {
throw $e;
}
}
@ -209,13 +304,14 @@ class ServiceRestProxy extends RestProxy
protected function sendContext($context)
{
return $this->send(
$context->getMethod(),
$context->getHeaders(),
$context->getQueryParameters(),
$context->getPostParameters(),
$context->getPath(),
$context->getStatusCodes(),
$context->getBody());
$context->getMethod(),
$context->getHeaders(),
$context->getQueryParameters(),
$context->getPostParameters(),
$context->getPath(),
$context->getStatusCodes(),
$context->getBody()
);
}
/**
@ -285,26 +381,22 @@ class ServiceRestProxy extends RestProxy
$header = $accessCondition->getHeader();
$headerName = null;
if (!empty($header)) {
switch($header) {
case Resources::IF_MATCH:
$headerName = Resources::X_MS_SOURCE_IF_MATCH;
break;
case Resources::IF_UNMODIFIED_SINCE:
$headerName = Resources::X_MS_SOURCE_IF_UNMODIFIED_SINCE;
break;
case Resources::IF_MODIFIED_SINCE:
$headerName = Resources::X_MS_SOURCE_IF_MODIFIED_SINCE;
break;
case Resources::IF_NONE_MATCH:
$headerName = Resources::X_MS_SOURCE_IF_NONE_MATCH;
break;
default:
throw new \Exception(Resources::INVALID_ACH_MSG);
break;
switch ($header) {
case Resources::IF_MATCH:
$headerName = Resources::X_MS_SOURCE_IF_MATCH;
break;
case Resources::IF_UNMODIFIED_SINCE:
$headerName = Resources::X_MS_SOURCE_IF_UNMODIFIED_SINCE;
break;
case Resources::IF_MODIFIED_SINCE:
$headerName = Resources::X_MS_SOURCE_IF_MODIFIED_SINCE;
break;
case Resources::IF_NONE_MATCH:
$headerName = Resources::X_MS_SOURCE_IF_NONE_MATCH;
break;
default:
throw new \Exception(Resources::INVALID_ACH_MSG);
break;
}
}
$value = $accessCondition->getValue();

Просмотреть файл

@ -1424,8 +1424,9 @@ class BlobRestProxyTest extends BlobServiceRestProxyTestBase
// Assert
$sourceBlob = $this->restProxy->getBlob($sourceContainerName, $sourceBlobName);
$destinationBlob = $this->restProxy->getBlob($destinationContainerName, $destinationBlobName);
$sourceBlobContent = stream_get_contents($sourceBlob->getContentStream());
$destinationBlobContent = stream_get_contents($destinationBlob->getContentStream());
$sourceBlobContent = $sourceBlob->getContentStream()->getContents();
$destinationBlobContent =
$destinationBlob->getContentStream()->getContents();
$this->assertEquals($sourceBlobContent, $destinationBlobContent);
$this->assertNotNull($result->getETag());
@ -1458,8 +1459,10 @@ class BlobRestProxyTest extends BlobServiceRestProxyTestBase
$sourceBlob = $this->restProxy->getBlob($containerName, $sourceBlobName);
$destinationBlob = $this->restProxy->getBlob($containerName, $destinationBlobName);
$sourceBlobContent = stream_get_contents($sourceBlob->getContentStream());
$destinationBlobContent = stream_get_contents($destinationBlob->getContentStream());
$sourceBlobContent =
$sourceBlob->getContentStream()->getContents();
$destinationBlobContent =
$destinationBlob->getContentStream()->getContents();
$this->assertEquals($sourceBlobContent, $destinationBlobContent);
}
@ -1490,8 +1493,9 @@ class BlobRestProxyTest extends BlobServiceRestProxyTestBase
// Assert
$sourceBlob = $this->restProxy->getBlob($containerName, $sourceBlobName);
$destinationBlob = $this->restProxy->getBlob($containerName, $destinationBlobName);
$sourceBlobContent = stream_get_contents($sourceBlob->getContentStream());
$destinationBlobContent = stream_get_contents($destinationBlob->getContentStream());
$sourceBlobContent = $sourceBlob->getContentStream()->getContents();
$destinationBlobContent =
$destinationBlob->getContentStream()->getContents();
$this->assertEquals($sourceBlobContent, $destinationBlobContent);
$this->assertNotEquals($destinationBlobContent, $oldBlobValue);
@ -1526,8 +1530,9 @@ class BlobRestProxyTest extends BlobServiceRestProxyTestBase
// Assert
$sourceBlob = $this->restProxy->getBlob($containerName, $sourceBlobName);
$destinationBlob = $this->restProxy->getBlob($containerName, $destinationBlobName);
$sourceBlobContent = stream_get_contents($sourceBlob->getContentStream());
$destinationBlobContent = stream_get_contents($destinationBlob->getContentStream());
$sourceBlobContent = $sourceBlob->getContentStream()->getContents();
$destinationBlobContent =
$destinationBlob->getContentStream()->getContents();
$this->assertEquals($sourceBlobContent, $destinationBlobContent);
}
@ -1580,7 +1585,7 @@ class BlobRestProxyTest extends BlobServiceRestProxyTestBase
$result = $this->restProxy->getBlob($name, $blob);
// Assert
$this->assertEquals($content, stream_get_contents($result->getContentStream()));
$this->assertEquals($content, $result->getContentStream()->getContents());
}
/**
@ -1668,4 +1673,453 @@ class BlobRestProxyTest extends BlobServiceRestProxyTestBase
$blocks = $result->getCommittedBlocks();
$this->assertEquals(count($blocks), ceil(strlen($content)/(4*1024*1024)));
}
/**
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::getBlobToFile
*/
public function testGetBlockBlobToFile()
{
// Setup
$name = 'getblob' . $this->createSuffix();
$blob = 'myblob';
$metadata = array('m1' => 'v1', 'm2' => 'v2');
$contentType = 'text/plain; charset=UTF-8';
$contentStream = 'Hello world';
$this->createContainer($name);
$options = new CreateBlobOptions();
$options->setContentType($contentType);
$options->setMetadata($metadata);
$this->restProxy->createBlockBlob(
$name,
$blob,
$contentStream,
$options
);
//get current working directory for the path to download
$cwd = getcwd();
$uuid = uniqid('test-file-', true);
$path = $cwd.DIRECTORY_SEPARATOR.$uuid.'.txt';
// Test
$result = $this->restProxy->getBlobToFile($path, $name, $blob);
$contents = file_get_contents($path);
// Assert
$this->assertEquals(BlobType::BLOCK_BLOB, $result->getProperties()->getBlobType());
$this->assertEquals($metadata, $result->getMetadata());
$this->assertEquals($contentStream, $contents);
// Delete file after assertion.
unlink($path);
}
/**
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::getBlobToFile
*/
public function testGetPageBlobToFile()
{
// Setup
$name = 'createblobpages' . $this->createSuffix();
$blob = 'myblob';
$length = 512;
$range = new PageRange(0, 511);
$content = Resources::EMPTY_STRING;
$this->createContainer($name);
$this->restProxy->createPageBlob($name, $blob, $length);
for ($i = 0; $i < 512; $i++) {
$content .= 'A';
}
$actual = $this->restProxy->createBlobPages($name, $blob, $range, $content);
//get current working directory for the path to download
$cwd = getcwd();
$uuid = uniqid('test-file-', true);
$path = $cwd.DIRECTORY_SEPARATOR.$uuid.'.txt';
// Test
$result = $this->restProxy->getBlobToFile($path, $name, $blob);
$contents = file_get_contents($path);
// Assert
$this->assertEquals(
BlobType::PAGE_BLOB,
$result->getProperties()->getBlobType()
);
$this->assertEquals($content, $contents);
unlink($path);
}
/**
* @group large-scale
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::getBlob
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::createBlockBlob
*/
public function testPutGetLargeBlockBlob()
{
// Setup
//create a temp file that is 2GB in size.
$cwd = getcwd();
$uuid = uniqid('test-file-', true);
$path = $cwd.DIRECTORY_SEPARATOR.$uuid.'.txt';
$resource = fopen($path, 'w+');
$count = 2 * 1024 / 4;
for ($index = 0; $index < $count; ++$index) {
fwrite($resource, openssl_random_pseudo_bytes(Resources::MB_IN_BYTES_4));
}
rewind($resource);
//upload the blob
$name = 'getblob' . $this->createSuffix();
$blob = 'myblob';
$metadata = array('m1' => 'v1', 'm2' => 'v2');
$contentType = 'text/plain; charset=UTF-8';
$this->createContainer($name);
$options = new CreateBlobOptions();
$options->setContentType($contentType);
$options->setMetadata($metadata);
$this->restProxy->createBlockBlob(
$name,
$blob,
$resource,
$options
);
// Test
$result = $this->restProxy->getBlob($name, $blob);
//get the path for the file to be downloaded into.
$uuid = uniqid('test-file-', true);
$downloadPath = $cwd.DIRECTORY_SEPARATOR.$uuid.'.txt';
$downloadResource = fopen($downloadPath, 'w');
//download the file
$content = $result->getContentStream();
while (!$content->eof()) {
fwrite($downloadResource, $content->read(Resources::MB_IN_BYTES_4));
}
// Assert
$this->assertEquals(
BlobType::BLOCK_BLOB,
$result->getProperties()->getBlobType()
);
$this->assertEquals($metadata, $result->getMetadata());
$originMd5 = md5_file($path);
$downloadMd5 = md5_file($downloadPath);
$this->assertEquals($originMd5, $downloadMd5);
//clean-up.
if (is_resource($resource)) {
fclose($resource);
}
fclose($downloadResource);
unlink($path);
unlink($downloadPath);
}
/**
* @group large-scale
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::getBlob
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::createPageBlob
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::createBlobPages
*/
public function testGetLargePageBlob()
{
//Setup
//create a temp file that is 2GB in size.
$cwd = getcwd();
$uuid = uniqid('test-file-', true);
$path = $cwd.DIRECTORY_SEPARATOR.$uuid.'.txt';
$resource = fopen($path, 'w+');
$count = 2 * 1024 / 4;
for ($index = 0; $index < $count; ++$index) {
fwrite($resource, openssl_random_pseudo_bytes(Resources::MB_IN_BYTES_4));
}
rewind($resource);
//upload the blob
$name = 'createblobpages' . $this->createSuffix();
$blob = 'myblob';
$length = $count * Resources::MB_IN_BYTES_4;
$this->createContainer($name);
$this->restProxy->createPageBlob($name, $blob, $length);
//upload the blob for 4MB a chunk
$chunkSize = Resources::MB_IN_BYTES_4;
$uploadCount = $length / $chunkSize;
for ($chunkIdx = 0; $chunkIdx < $uploadCount; ++$chunkIdx) {
$range = new PageRange(
$chunkSize * $chunkIdx,
($chunkSize * ($chunkIdx + 1)) - 1
);
$body = fread($resource, $chunkSize);
$actual = $this->restProxy->createBlobPages(
$name,
$blob,
$range,
$body
);
}
// Test
$result = $this->restProxy->getBlob($name, $blob);
//get the path for the file to be downloaded into.
$uuid = uniqid('test-file-', true);
$downloadPath = $cwd.DIRECTORY_SEPARATOR.$uuid.'.txt';
$downloadResource = fopen($downloadPath, 'w');
//download the file
$content = $result->getContentStream();
while (!$content->eof()) {
fwrite($downloadResource, $content->read(Resources::MB_IN_BYTES_4));
}
// Assert
$this->assertEquals(
BlobType::PAGE_BLOB,
$result->getProperties()->getBlobType()
);
$originMd5 = md5_file($path);
$downloadMd5 = md5_file($downloadPath);
$this->assertEquals($originMd5, $downloadMd5);
// Delete file after assertion.
if (is_resource($resource)) {
fclose($resource);
}
fclose($downloadResource);
unlink($path);
unlink($downloadPath);
}
/**
* @group large-scale
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::getBlobToFile
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::createBlockBlob
*/
public function testGetLargeBlockBlobToFile()
{
// Setup
//create a temp file that is 2GB in size.
$cwd = getcwd();
$uuid = uniqid('test-file-', true);
$path = $cwd.DIRECTORY_SEPARATOR.$uuid.'.txt';
$resource = fopen($path, 'w+');
$count = 2 * 1024 / 4;
for ($index = 0; $index < $count; ++$index) {
fwrite($resource, openssl_random_pseudo_bytes(Resources::MB_IN_BYTES_4));
}
rewind($resource);
//upload the blob
$name = 'getblob' . $this->createSuffix();
$blob = 'myblob';
$metadata = array('m1' => 'v1', 'm2' => 'v2');
$contentType = 'text/plain; charset=UTF-8';
$this->createContainer($name);
$options = new CreateBlobOptions();
$options->setContentType($contentType);
$options->setMetadata($metadata);
$this->restProxy->createBlockBlob(
$name,
$blob,
$resource,
$options
);
//get the path for the file to be downloaded into.
$uuid = uniqid('test-file-', true);
$downloadPath = $cwd.DIRECTORY_SEPARATOR.$uuid.'.txt';
// Test
$result = $this->restProxy->getBlobToFile($downloadPath, $name, $blob);
// Assert
$this->assertEquals(
BlobType::BLOCK_BLOB,
$result->getProperties()->getBlobType()
);
$this->assertEquals($metadata, $result->getMetadata());
if (is_resource($resource)) {
fclose($resource);
}
$originMd5 = md5_file($path);
$downloadMd5 = md5_file($downloadPath);
$this->assertEquals($originMd5, $downloadMd5);
// Delete file after assertion.
unlink($path);
unlink($downloadPath);
}
/**
* @group large-scale
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::getBlobToFile
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::createPageBlob
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::createBlobPages
*/
public function testGetLargePageBlobToFile()
{
//Setup
//create a temp file that is 2GB in size.
$cwd = getcwd();
$uuid = uniqid('test-file-', true);
$path = $cwd.DIRECTORY_SEPARATOR.$uuid.'.txt';
$resource = fopen($path, 'w+');
$count = 2 * 1024 / 4;
for ($index = 0; $index < $count; ++$index) {
fwrite($resource, openssl_random_pseudo_bytes(Resources::MB_IN_BYTES_4));
}
rewind($resource);
//upload the blob
$name = 'createblobpages' . $this->createSuffix();
$blob = 'myblob';
$length = $count * Resources::MB_IN_BYTES_4;
$this->createContainer($name);
$this->restProxy->createPageBlob($name, $blob, $length);
//upload the blob for 4MB a chunk
$chunkSize = Resources::MB_IN_BYTES_4;
$uploadCount = $length / $chunkSize;
for ($chunkIdx = 0; $chunkIdx < $uploadCount; ++$chunkIdx) {
$range = new PageRange(
$chunkSize * $chunkIdx,
($chunkSize * ($chunkIdx + 1)) - 1
);
$body = fread($resource, $chunkSize);
$actual = $this->restProxy->createBlobPages(
$name,
$blob,
$range,
$body
);
}
//get the path for the file to be downloaded into.
$uuid = uniqid('test-file-', true);
$downloadPath = $cwd.DIRECTORY_SEPARATOR.$uuid.'.txt';
// Test
$result = $this->restProxy->getBlobToFile($downloadPath, $name, $blob);
// Assert
$this->assertEquals(
BlobType::PAGE_BLOB,
$result->getProperties()->getBlobType()
);
if (is_resource($resource)) {
fclose($resource);
}
$originMd5 = md5_file($path);
$downloadMd5 = md5_file($downloadPath);
$this->assertEquals($originMd5, $downloadMd5);
// Delete file after assertion.
unlink($path);
unlink($downloadPath);
}
/**
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::createBlobPages
*/
public function testPageRangeCreationWithInvalidRange()
{
$errorMsg = '';
//upload the blob
$name = 'createblobpages' . $this->createSuffix();
$blob = 'myblob';
$length = 512;
$this->createContainer($name);
$this->restProxy->createPageBlob($name, $blob, $length);
//upload the blob
$range = new PageRange(0, 255);
$body = openssl_random_pseudo_bytes(256);
try {
$actual = $this->restProxy->createBlobPages(
$name,
$blob,
$range,
$body
);
} catch (\RuntimeException $e) {
$errorMsg = $e->getMessage();
}
$this->assertEquals($errorMsg, Resources::ERROR_RANGE_NOT_ALIGN_TO_512);
}
/**
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::getBlobToFile
*/
public function testGetBlobToFileWithInvalidPath()
{
$errorMsg = '';
//Create a random string that is 8MB in size.
$contentStr = openssl_random_pseudo_bytes(Resources::MB_IN_BYTES_4 * 2);
//upload the blob
$name = 'getblob' . $this->createSuffix();
$blob = 'myblob';
$metadata = array('m1' => 'v1', 'm2' => 'v2');
$contentType = 'text/plain; charset=UTF-8';
$this->restProxy->createContainer($name);
$options = new CreateBlobOptions();
$options->setContentType($contentType);
$options->setMetadata($metadata);
$this->restProxy->createBlockBlob(
$name,
$blob,
$contentStr,
$options
);
// Test
//get the path for the file to be downloaded into.
$uuid = uniqid('test-file-', true);
$downloadPath = 'Zasdf:\\\\\\\\Invalid.PATH'.$uuid.'.txt';
error_reporting(E_ALL ^ E_WARNING);
try {
$result = $this->restProxy->getBlobToFile($downloadPath, $name, $blob);
} catch (\Exception $e) {
$errorMsg = $e->getMessage();
} finally {
error_reporting(E_ALL);
}
$this->assertEquals($errorMsg, Resources::ERROR_FILE_COULD_NOT_BE_OPENED);
}
/**
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::getBlobToFile
*/
public function testGetBlobToFileWithBlobNotExist()
{
$errorMsg = '';
$name = 'getblob' . $this->createSuffix();
$blob = 'non_existing_blob';
$this->restProxy->createContainer($name);
//get the path for the file to be downloaded into.
$uuid = uniqid('test-file-', true);
$downloadPath = getcwd().DIRECTORY_SEPARATOR.$uuid.'.txt';
$downloadResource = fopen($downloadPath, 'w');
try {
$result = $this->restProxy->getBlobToFile($downloadPath, $name, $blob);
} catch (\Exception $e) {
$errorMsg = $e->getMessage();
}
$this->assertTrue(strpos($errorMsg, Resources::ERROR_BLOB_NOT_EXIST) != 0);
}
/**
* @covers \MicrosoftAzure\Storage\Blob\BlobRestProxy::getBlobToFile
*/
public function testGetBlobToFileWithContainerNotExist()
{
$errorMsg = '';
$name = 'nonexistingcontainer';
$blob = 'non_existing_blob';
//get the path for the file to be downloaded into.
$uuid = uniqid('test-file-', true);
$downloadPath = getcwd().DIRECTORY_SEPARATOR.$uuid.'.txt';
$downloadResource = fopen($downloadPath, 'w');
try {
$result = $this->restProxy->getBlobToFile($downloadPath, $name, $blob);
} catch (\Exception $e) {
$errorMsg = $e->getMessage();
}
$this->assertTrue(strpos($errorMsg, Resources::ERROR_CONTAINER_NOT_EXIST) != 0);
}
}