Merge pull request #374 from zezha-msft/md5-mismatch-bug
Changed upload algorithm for large block blobs when using MD5 validation
This commit is contained in:
Commit 769868f9ee
@@ -1,3 +1,8 @@
 # Change Log azure-storage-blob
 
 > See [BreakingChanges](BreakingChanges.md) for a detailed list of API breaks.
+
+## Version XX.XX.XX:
+
+- Enabling MD5 validation no longer uses the memory-efficient algorithm for large block blobs, since computing the MD5 hash requires reading the entire block into memory.
+- Fixed a bug in the _SubStream class which was at risk of causing data corruption when using the memory-efficient algorithm for large block blobs.
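Background for the first bullet (a sketch, not part of the commit): the Content-MD5 value sent with a block must be computed over the complete block before the block body goes on the wire, so a validated upload has to materialize each block in memory instead of streaming it through a small buffer. The helper name below is hypothetical:

    import base64
    import hashlib

    def content_md5_header(block_data):
        # block_data must be the entire block as bytes: the header value
        # has to be known before the body is transmitted, which is why
        # validate_content forces full-block buffering.
        digest = hashlib.md5(block_data).digest()
        return {'Content-MD5': base64.b64encode(digest).decode('utf-8')}

    print(content_md5_header(b'\x00' * (4 * 1024 * 1024)))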
@@ -18,3 +18,6 @@ __version__ = '0.37.0'
 
 # x-ms-version for storage service.
 X_MS_VERSION = '2017-04-17'
+
+# internal configurations, should not be changed
+_LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024
@@ -27,6 +27,9 @@ from ._encryption import (
     _get_blob_encryptor_and_padder,
 )
 from .models import BlobBlock
+from ._constants import (
+    _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE
+)
 
 
 def _upload_blob_chunks(blob_service, container_name, blob_name,
@@ -342,6 +345,7 @@ class _SubStream(IOBase):
         # derivations of io.IOBase and thus do not implement seekable().
         # Python > 3.0: file-like objects created with open() are derived from io.IOBase.
         try:
+            # only the main thread runs this, so there's no need grabbing the lock
             wrapped_stream.seek(0, SEEK_CUR)
         except:
             raise ValueError("Wrapped stream must support seek().")
@@ -351,9 +355,14 @@
         self._position = 0
         self._stream_begin_index = stream_begin_index
         self._length = length
-        self._count = 0
         self._buffer = BytesIO()
-        self._read_buffer_size = 4 * 1024 * 1024
+
+        # we must avoid buffering more than necessary, and also not use up too much memory
+        # so the max buffer size is capped at 4MB
+        self._max_buffer_size = length if length < _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE \
+            else _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE
+        self._current_buffer_start = 0
+        self._current_buffer_size = 0
 
     def __len__(self):
         return self._length
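The capped buffer size in the hunk above is a conditional-expression spelling of min(); a standalone check, with the constant duplicated here for illustration:

    _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024

    def max_buffer_size(length):
        # same value as the ternary in __init__ above
        return min(length, _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE)

    assert max_buffer_size(2 * 1024 * 1024) == 2 * 1024 * 1024     # small block: buffer it whole
    assert max_buffer_size(100 * 1024 * 1024) == 4 * 1024 * 1024   # large block: capped at 4MB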
@@ -382,35 +391,45 @@
         if n is 0 or self._buffer.closed:
             return b''
 
-        # attempt first read from the read buffer
+        # attempt first read from the read buffer and update position
         read_buffer = self._buffer.read(n)
         bytes_read = len(read_buffer)
         bytes_remaining = n - bytes_read
+        self._position += bytes_read
 
         # repopulate the read buffer from the underlying stream to fulfill the request
         # ensure the seek and read operations are done atomically (only if a lock is provided)
         if bytes_remaining > 0:
             with self._buffer:
+                # either read in the max buffer size specified on the class
+                # or read in just enough data for the current block/sub stream
+                current_max_buffer_size = min(self._max_buffer_size, self._length - self._position)
+
                 # lock is only defined if max_connections > 1 (parallel uploads)
                 if self._lock:
                     with self._lock:
-                        # reposition the underlying stream to match the start of the substream
+                        # reposition the underlying stream to match the start of the data to read
                         absolute_position = self._stream_begin_index + self._position
                         self._wrapped_stream.seek(absolute_position, SEEK_SET)
                         # If we can't seek to the right location, our read will be corrupted so fail fast.
                         if self._wrapped_stream.tell() != absolute_position:
                             raise IOError("Stream failed to seek to the desired location.")
-                        buffer_from_stream = self._wrapped_stream.read(self._read_buffer_size)
+                        buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size)
                 else:
-                    buffer_from_stream = self._wrapped_stream.read(self._read_buffer_size)
+                    buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size)
 
             if buffer_from_stream:
                 # update the buffer with new data from the wrapped stream
+                # we need to note down the start position and size of the buffer, in case seek is performed later
                 self._buffer = BytesIO(buffer_from_stream)
-                second_read_buffer = self._buffer.read(bytes_remaining)
-                bytes_read += len(second_read_buffer)
-                read_buffer += second_read_buffer
+                self._current_buffer_start = self._position
+                self._current_buffer_size = len(buffer_from_stream)
+
+                # read the remaining bytes from the new buffer and update position
+                second_read_buffer = self._buffer.read(bytes_remaining)
+                read_buffer += second_read_buffer
+                self._position += len(second_read_buffer)
+
-        self._position += bytes_read
         return read_buffer
 
     def readable(self):
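A self-contained sketch of the read path above (hypothetical names, assuming the same 4MB cap): serve bytes from the in-memory buffer first, and when it runs dry, refill it from the wrapped stream with at most min(cap, bytes left in the substream), so the buffer never outgrows the cap or crosses the substream boundary. Like the real read(), a single call refills at most once and may therefore return fewer than n bytes:

    from io import BytesIO

    CAP = 4 * 1024 * 1024  # mirrors _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE

    def read_with_refill(wrapped, state, n):
        # state: dict with 'buffer', 'position', 'begin', 'length'
        chunk = state['buffer'].read(n)
        state['position'] += len(chunk)
        missing = n - len(chunk)
        if missing > 0:
            # refill with at most min(CAP, bytes left in this substream)
            wrapped.seek(state['begin'] + state['position'])
            refill = wrapped.read(min(CAP, state['length'] - state['position']))
            state['buffer'] = BytesIO(refill)
            extra = state['buffer'].read(missing)
            chunk += extra
            state['position'] += len(extra)
        return chunk

    wrapped = BytesIO(bytes(12 * 1024 * 1024))
    state = {'buffer': BytesIO(), 'position': 0, 'begin': 0, 'length': 6 * 1024 * 1024}
    assert len(read_with_refill(wrapped, state, 2 * 1024 * 1024)) == 2 * 1024 * 1024  # fills a 4MB buffer, serves 2MB
    assert len(read_with_refill(wrapped, state, 4 * 1024 * 1024)) == 4 * 1024 * 1024  # 2MB from buffer + 2MB refill
    assert state['position'] == 6 * 1024 * 1024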
@@ -437,6 +456,15 @@
         elif pos < 0:
             pos = 0
 
+        # check if buffer is still valid
+        # if not, drop buffer
+        if pos < self._current_buffer_start or pos >= self._current_buffer_start + self._current_buffer_size:
+            self._buffer.close()
+            self._buffer = BytesIO()
+        else:  # if yes seek to correct position
+            delta = pos - self._current_buffer_start
+            self._buffer.seek(delta, SEEK_SET)
+
         self._position = pos
         return pos
 
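The seek() change above keeps the buffer only when the target still falls inside it; a standalone statement of that interval test (function name is hypothetical):

    def buffer_covers(pos, buffer_start, buffer_size):
        # the in-memory buffer holds bytes [buffer_start, buffer_start + buffer_size)
        return buffer_start <= pos < buffer_start + buffer_size

    MB = 1024 * 1024
    assert buffer_covers(5 * MB, 4 * MB, 4 * MB)        # inside: reuse buffer, seek within it
    assert not buffer_covers(3 * MB, 4 * MB, 4 * MB)    # before the buffer: drop it
    assert not buffer_covers(8 * MB, 4 * MB, 4 * MB)    # at/after the end: drop it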
@@ -348,7 +348,9 @@ class BlockBlobService(BaseBlobService):
             that was sent. This is primarily valuable for detecting bitflips on
             the wire if using http instead of https as https (the default) will
             already validate. Note that this MD5 hash is not stored with the
-            blob.
+            blob. Also note that if enabled, the memory-efficient upload algorithm
+            will not be used, because computing the MD5 hash requires buffering
+            entire blocks, and doing so defeats the purpose of the memory-efficient algorithm.
         :param progress_callback:
             Callback for progress with signature function(current, total) where
             current is the number of bytes transfered so far, and total is the
@@ -441,7 +443,9 @@ class BlockBlobService(BaseBlobService):
             that was sent. This is primarily valuable for detecting bitflips on
             the wire if using http instead of https as https (the default) will
             already validate. Note that this MD5 hash is not stored with the
-            blob.
+            blob. Also note that if enabled, the memory-efficient upload algorithm
+            will not be used, because computing the MD5 hash requires buffering
+            entire blocks, and doing so defeats the purpose of the memory-efficient algorithm.
         :param progress_callback:
             Callback for progress with signature function(current, total) where
             current is the number of bytes transfered so far, and total is the
@@ -507,6 +511,7 @@ class BlockBlobService(BaseBlobService):
         if (self.key_encryption_key is not None) and (adjusted_count is not None):
             adjusted_count += (16 - (count % 16))
 
+        # Do single put if the size is smaller than MAX_SINGLE_PUT_SIZE
         if adjusted_count is not None and (adjusted_count < self.MAX_SINGLE_PUT_SIZE):
            if progress_callback:
                progress_callback(0, count)
@@ -530,10 +535,10 @@
                 progress_callback(count, count)
 
             return resp
-        else:
+        else:  # Size is larger than MAX_SINGLE_PUT_SIZE, must upload with multiple put_block calls
             cek, iv, encryption_data = None, None, None
 
-            use_original_upload_path = use_byte_buffer or self.require_encryption or \
+            use_original_upload_path = use_byte_buffer or validate_content or self.require_encryption or \
                 self.MAX_BLOCK_SIZE < self.MIN_LARGE_BLOCK_UPLOAD_THRESHOLD or \
                 hasattr(stream, 'seekable') and not stream.seekable() or \
                 not hasattr(stream, 'seek') or not hasattr(stream, 'tell')
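For readability, the updated condition above is equivalent to the following sketch (the function name is hypothetical; note that `hasattr(stream, 'seekable') and not stream.seekable()` binds tighter than the surrounding `or`s):

    def _needs_original_upload_path(svc, stream, use_byte_buffer, validate_content):
        return (use_byte_buffer
                or validate_content  # new in this commit: MD5 validation disables the substream path
                or svc.require_encryption
                or svc.MAX_BLOCK_SIZE < svc.MIN_LARGE_BLOCK_UPLOAD_THRESHOLD
                or (hasattr(stream, 'seekable') and not stream.seekable())
                or not hasattr(stream, 'seek')
                or not hasattr(stream, 'tell'))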
@@ -36,11 +36,12 @@ from tests.testcase import (
 # ------------------------------------------------------------------------------
 TEST_BLOB_PREFIX = 'largeblob'
 FILE_PATH = 'blob_large_input.temp.dat'
-LARGE_BLOB_SIZE = 6 * 1024 * 1024
+LARGE_BLOB_SIZE = 12 * 1024 * 1024
+LARGE_BLOCK_SIZE = 6 * 1024 * 1024
 
 # ------------------------------------------------------------------------------
 
 
 class StorageLargeBlockBlobTest(StorageTestCase):
     def setUp(self):
         super(StorageLargeBlockBlobTest, self).setUp()
@@ -56,7 +57,7 @@ class StorageLargeBlockBlobTest(StorageTestCase):
         # for chunking and the size of each chunk, otherwise
         # the tests would take too long to execute
         self.bs.MAX_BLOCK_SIZE = 2 * 1024 * 1024
-        self.bs.MIN_LARGE_BLOCK_UPLOAD_THRESHOLD = 1 * 1024 * 1024;
+        self.bs.MIN_LARGE_BLOCK_UPLOAD_THRESHOLD = 1 * 1024 * 1024
         self.bs.MAX_SINGLE_PUT_SIZE = 32 * 1024
 
     def tearDown(self):
@@ -89,8 +90,7 @@ class StorageLargeBlockBlobTest(StorageTestCase):
 
     # --Test cases for block blobs --------------------------------------------
 
-
-    def test_put_block_large(self):
+    def test_put_block_bytes_large(self):
         if TestMode.need_recording_file(self.test_mode):
             return
 
@@ -101,12 +101,28 @@ class StorageLargeBlockBlobTest(StorageTestCase):
         for i in range(5):
             resp = self.bs.put_block(self.container_name,
                                      blob_name,
-                                     'block {0}'.format(i).encode('utf-8'),
-                                     i)
+                                     os.urandom(LARGE_BLOCK_SIZE),
+                                     'block {0}'.format(i).encode('utf-8'),)
             self.assertIsNone(resp)
 
         # Assert
 
+    def test_put_block_bytes_large_with_md5(self):
+        if TestMode.need_recording_file(self.test_mode):
+            return
+
+        # Arrange
+        blob_name = self._create_blob()
+
+        # Act
+        for i in range(5):
+            resp = self.bs.put_block(self.container_name,
+                                     blob_name,
+                                     os.urandom(LARGE_BLOCK_SIZE),
+                                     'block {0}'.format(i).encode('utf-8'),
+                                     validate_content=True)
+            self.assertIsNone(resp)
+
     def test_put_block_stream_large(self):
         if TestMode.need_recording_file(self.test_mode):
             return
@@ -116,17 +132,16 @@ class StorageLargeBlockBlobTest(StorageTestCase):
 
         # Act
         for i in range(5):
-            stream = BytesIO(bytearray(self.bs.MAX_BLOCK_SIZE))
+            stream = BytesIO(bytearray(LARGE_BLOCK_SIZE))
             resp = self.bs.put_block(self.container_name,
                                      blob_name,
-                                     'block {0}'.format(i).encode('utf-8'),
-                                     stream)
+                                     stream,
+                                     'block {0}'.format(i).encode('utf-8'),)
             self.assertIsNone(resp)
 
         # Assert
 
-
-    def test_put_block_stream_with_md5(self):
+    def test_put_block_stream_large_with_md5(self):
         if TestMode.need_recording_file(self.test_mode):
             return
 
@@ -135,11 +150,11 @@ class StorageLargeBlockBlobTest(StorageTestCase):
 
         # Act
         for i in range(5):
-            stream = BytesIO(bytearray(self.bs.MAX_BLOCK_SIZE))
+            stream = BytesIO(bytearray(LARGE_BLOCK_SIZE))
             resp = self.bs.put_block(self.container_name,
                                      blob_name,
-                                     'block {0}'.format(i).encode('utf-8'),
                                      stream,
+                                     'block {0}'.format(i).encode('utf-8'),
                                      validate_content=True)
             self.assertIsNone(resp)
 
@@ -162,6 +177,22 @@ class StorageLargeBlockBlobTest(StorageTestCase):
         # Assert
         self.assertBlobEqual(self.container_name, blob_name, data)
 
+    def test_create_large_blob_from_path_with_md5(self):
+        # parallel tests introduce random order of requests, can only run live
+        if TestMode.need_recording_file(self.test_mode):
+            return
+
+        # Arrange
+        blob_name = self._get_blob_reference()
+        data = bytearray(os.urandom(LARGE_BLOB_SIZE))
+        with open(FILE_PATH, 'wb') as stream:
+            stream.write(data)
+
+        # Act
+        self.bs.create_blob_from_path(self.container_name, blob_name, FILE_PATH, validate_content=True)
+
+        # Assert
+        self.assertBlobEqual(self.container_name, blob_name, data)
+
     def test_create_large_blob_from_path_non_parallel(self):
         if TestMode.need_recording_file(self.test_mode):
@@ -339,5 +370,7 @@ class StorageLargeBlockBlobTest(StorageTestCase):
         self.assertEqual(properties.content_settings.content_language, content_settings.content_language)
 
 # ------------------------------------------------------------------------------
+
+
 if __name__ == '__main__':
     unittest.main()
@@ -0,0 +1,120 @@
+# coding: utf-8
+
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# --------------------------------------------------------------------------
+import os
+
+from azure.storage.blob._upload_chunking import _SubStream
+from threading import Lock
+from io import (BytesIO, SEEK_SET)
+
+from tests.testcase import (
+    StorageTestCase,
+)
+
+# ------------------------------------------------------------------------------
+
+
+class StorageBlobUploadChunkingTest(StorageTestCase):
+
+    # this is a white box test that's designed to make sure _Substream behaves properly
+    # when the buffer needs to be swapped out at least once
+    def test_sub_stream_with_length_larger_than_buffer(self):
+        data = os.urandom(12 * 1024 * 1024)
+
+        # assuming the max size of the buffer is 4MB, this test needs to be updated if that has changed
+        # the block size is 6MB for this test
+        expected_data = data[0: 6 * 1024 * 1024]
+        wrapped_stream = BytesIO(data)  # simulate stream given by user
+        lockObj = Lock()  # simulate multi-threaded environment
+        substream = _SubStream(wrapped_stream, stream_begin_index=0, length=6 * 1024 * 1024, lockObj=lockObj)
+
+        try:
+            # substream should start with position at 0
+            self.assertEqual(substream.tell(), 0)
+
+            # reading a chunk that is smaller than the buffer
+            data_chunk_1 = substream.read(2 * 1024 * 1024)
+            self.assertEqual(len(data_chunk_1), 2 * 1024 * 1024)
+
+            # reading a chunk that is bigger than the data remaining in buffer, force a buffer swap
+            data_chunk_2 = substream.read(4 * 1024 * 1024)
+            self.assertEqual(len(data_chunk_2), 4 * 1024 * 1024)
+
+            # assert data is consistent
+            self.assertEqual(data_chunk_1 + data_chunk_2, expected_data)
+            self.assertEqual(6 * 1024 * 1024, substream.tell())
+
+            # attempt to read more than what the sub stream contains should return nothing
+            empty_data = substream.read(1 * 1024 * 1024)
+            self.assertEqual(0, len(empty_data))
+            self.assertEqual(6 * 1024 * 1024, substream.tell())
+
+            # test seek outside of current buffer, which is at the moment the last 2MB of data
+            substream.seek(0, SEEK_SET)
+            data_chunk_1 = substream.read(4 * 1024 * 1024)
+            data_chunk_2 = substream.read(2 * 1024 * 1024)
+
+            # assert data is consistent
+            self.assertEqual(data_chunk_1 + data_chunk_2, expected_data)
+
+            # test seek inside of buffer, which is at the moment the last 2MB of data
+            substream.seek(4 * 1024 * 1024, SEEK_SET)
+            data_chunk_2 = substream.read(2 * 1024 * 1024)
+
+            # assert data is consistent
+            self.assertEqual(data_chunk_1 + data_chunk_2, expected_data)
+
+        finally:
+            wrapped_stream.close()
+            substream.close()
+
+    # this is a white box test that's designed to make sure _Substream behaves properly
+    # when block size is smaller than 4MB, thus there's no need for buffer swap
+    def test_sub_stream_with_length_equal_to_buffer(self):
+        data = os.urandom(6 * 1024 * 1024)
+
+        # assuming the max size of the buffer is 4MB, this test needs to be updated if that has changed
+        # the block size is 2MB for this test
+        expected_data = data[0: 2 * 1024 * 1024]
+        wrapped_stream = BytesIO(expected_data)  # simulate stream given by user
+        lockObj = Lock()  # simulate multi-threaded environment
+        substream = _SubStream(wrapped_stream, stream_begin_index=0, length=2 * 1024 * 1024, lockObj=lockObj)
+
+        try:
+            # substream should start with position at 0
+            self.assertEqual(substream.tell(), 0)
+
+            # reading a chunk that is smaller than the buffer
+            data_chunk_1 = substream.read(1 * 1024 * 1024)
+            self.assertEqual(len(data_chunk_1), 1 * 1024 * 1024)
+
+            # reading a chunk that is bigger than the buffer, should not read anything beyond
+            data_chunk_2 = substream.read(4 * 1024 * 1024)
+            self.assertEqual(len(data_chunk_2), 1 * 1024 * 1024)
+
+            # assert data is consistent
+            self.assertEqual(data_chunk_1 + data_chunk_2, expected_data)
+
+            # test seek
+            substream.seek(1 * 1024 * 1024, SEEK_SET)
+            data_chunk_2 = substream.read(1 * 1024 * 1024)
+
+            # assert data is consistent
+            self.assertEqual(data_chunk_1 + data_chunk_2, expected_data)
+
+        finally:
+            wrapped_stream.close()
+            substream.close()