Fred Park 2017-05-29 18:00:29 -07:00
Parent e8ab378064
Commit 53d0beb8d9
4 changed files with 149 additions and 41 deletions

View file

@@ -22,6 +22,7 @@
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
import sys
from .version import __version__ # noqa
# monkeypatch User-Agent string
@@ -31,3 +32,13 @@ azure.storage._constants.USER_AGENT_STRING = 'blobxfer/{} {}'.format(
# monkeypatch SOCKET_TIMEOUT value in Azure Storage SDK
azure.storage._constants.SOCKET_TIMEOUT = (5, 300)
# set stdin source
if sys.version_info >= (3, 0):
STDIN = sys.stdin.buffer
else:
# set stdin to binary mode on Windows
if sys.platform == 'win32':
import os, msvcrt # noqa
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
STDIN = sys.stdin
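
The net effect of this first file's hunks is a portable binary stdin handle: Python 3 exposes the raw byte stream as sys.stdin.buffer, while Python 2 on Windows must first switch the descriptor to O_BINARY so reads are not mangled by CRLF translation. A minimal standalone sketch of the same selection pattern; the iter_chunks helper is illustrative and not part of blobxfer:

import sys

# pick a binary stdin handle portably
if sys.version_info >= (3, 0):
    STDIN = sys.stdin.buffer
else:
    if sys.platform == 'win32':
        import os
        import msvcrt
        # stop Windows from translating line endings in the byte stream
        msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
    STDIN = sys.stdin

def iter_chunks(stream, chunk_size=4194304):
    # yield fixed-size chunks until EOF; the final chunk may be short
    while True:
        data = stream.read(chunk_size)
        if not data:
            break
        yield data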

View file

@@ -53,7 +53,7 @@ import blobxfer.util
logger = logging.getLogger(__name__)
# global defines
_MAX_BLOCK_BLOB_ONESHOT_BYTES = 268435456
_MAX_BLOCK_BLOB_CHUNKSIZE_BYTES = 268435456
_MAX_BLOCK_BLOB_CHUNKSIZE_BYTES = 104857600
_MAX_NONBLOCK_BLOB_CHUNKSIZE_BYTES = 4194304
_MAX_NUM_CHUNKS = 50000
_DEFAULT_AUTO_CHUNKSIZE_BYTES = 16777216
@@ -93,17 +93,29 @@ class VectoredIoDistributionMode(enum.Enum):
class LocalPath(object):
"""Local Path"""
def __init__(self, parent_path, relative_path, view=None):
# type: (LocalPath, pathlib.Path, pathlib.Path, LocalPathView) -> None
def __init__(self, parent_path, relative_path, use_stdin=False, view=None):
# type: (LocalPath, pathlib.Path, pathlib.Path, bool,
# LocalPathView) -> None
"""Ctor for LocalPath
:param LocalPath self: this
:param pathlib.Path parent_path: parent path
:param pathlib.Path relative_path: relative path
:param bool use_stdin: use stdin
:param LocalPathView view: local path view
"""
self.parent_path = parent_path
self.relative_path = relative_path
self.use_stdin = use_stdin
# populate properties
if self.use_stdin:
# create dummy stat object
self._stat = type('stat', (object,), {})
self._stat.st_size = 0
self._stat.st_mtime = 0
self._stat.st_mode = 0
self._stat.st_uid = 0
self._stat.st_gid = 0
else:
self._stat = self.absolute_path.stat()
if view is None:
self.view = LocalPathView(
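Because stdin has no backing file to stat(), the constructor above fabricates a stat-like object carrying zeroed values for just the fields read downstream (size, mtime, mode, uid, gid). A condensed sketch of that idea; make_stat is an illustrative name, not a blobxfer function:

import pathlib

def make_stat(path, use_stdin=False):
    if use_stdin:
        # no file behind stdin: synthesize an empty stat-like object
        stat = type('stat', (object,), {})
        stat.st_size = 0
        stat.st_mtime = 0
        stat.st_mode = 0
        stat.st_uid = 0
        stat.st_gid = 0
        return stat
    # regular files use the real filesystem stat
    return pathlib.Path(path).stat()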
@@ -194,13 +206,25 @@ class LocalSourcePath(blobxfer.models._BaseSourcePaths):
def can_rename(self):
# type: (LocalSourcePaths) -> bool
"""Check if ource can be renamed
"""Check if source can be renamed
:param LocalSourcePath self: this
:rtype: bool
:return: if rename possible
"""
return len(self._paths) == 1 and self._paths[0].is_file()
@staticmethod
def is_stdin(path):
# type: (str) -> bool
"""Check if path is stdin
:param str path: path to check
:rtype: bool
:return: if path is stdin
"""
if path == '-' or path == '/dev/stdin':
return True
return False
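For reference, the sentinel check accepts the conventional dash as well as the /dev/stdin device path; an illustrative session:

>>> LocalSourcePath.is_stdin('-')
True
>>> LocalSourcePath.is_stdin('/dev/stdin')
True
>>> LocalSourcePath.is_stdin('data/file.bin')
False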
def files(self):
# type: (LocalSourcePaths) -> LocalPath
"""Generator for files in paths
@@ -210,6 +234,15 @@ class LocalSourcePath(blobxfer.models._BaseSourcePaths):
"""
for _path in self._paths:
_ppath = os.path.expandvars(os.path.expanduser(str(_path)))
# check if path is stdin
if blobxfer.models.upload.LocalSourcePath.is_stdin(_ppath):
yield LocalPath(
parent_path=pathlib.Path(),
relative_path=pathlib.Path('stdin'),
use_stdin=True,
)
continue
# resolve path
_expath = pathlib.Path(_ppath).resolve()
# check if path is a single file
tmp = pathlib.Path(_ppath)
@@ -217,7 +250,8 @@ class LocalSourcePath(blobxfer.models._BaseSourcePaths):
if self._inclusion_check(tmp.name):
yield LocalPath(
parent_path=tmp.parent,
relative_path=pathlib.Path(tmp.name)
relative_path=pathlib.Path(tmp.name),
use_stdin=False,
)
continue
del tmp
@@ -225,7 +259,11 @@ class LocalSourcePath(blobxfer.models._BaseSourcePaths):
_rpath = pathlib.Path(entry.path).relative_to(_ppath)
if not self._inclusion_check(_rpath):
continue
yield LocalPath(parent_path=_expath, relative_path=_rpath)
yield LocalPath(
parent_path=_expath,
relative_path=_rpath,
use_stdin=False,
)
class Specification(object):
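When the expanded path matches the stdin sentinel, files() above short-circuits with a pseudo LocalPath whose relative path is simply 'stdin' and never touches the filesystem. A condensed sketch of that dispatch, collapsed to the single-file case (expand_source is an illustrative name):

import os
import pathlib

def expand_source(path):
    # expand env vars and ~ first, exactly as files() does
    ppath = os.path.expandvars(os.path.expanduser(str(path)))
    if ppath in ('-', '/dev/stdin'):
        # pseudo path: nothing to resolve on disk
        return pathlib.Path(), pathlib.Path('stdin'), True
    expath = pathlib.Path(ppath).resolve()
    return expath.parent, pathlib.Path(expath.name), False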
@@ -523,6 +561,11 @@ class Descriptor(object):
logger.debug(
'auto-selected chunk size of {} for {}'.format(
chunk_size, self.local_path.absolute_path))
if self.local_path.use_stdin:
self._chunk_size = max(
(chunk_size, _MAX_NONBLOCK_BLOB_CHUNKSIZE_BYTES)
)
else:
self._chunk_size = min((chunk_size, self._ase.size))
# ensure chunk sizes are compatible with mode
if self._ase.mode == blobxfer.models.azure.StorageModes.Append:
@@ -533,7 +576,8 @@ class Descriptor(object):
'from {}').format(
self._chunk_size, self.local_path.absolute_path))
elif self._ase.mode == blobxfer.models.azure.StorageModes.Block:
if self._ase.size <= options.one_shot_bytes:
if (not self.local_path.use_stdin and
self._ase.size <= options.one_shot_bytes):
self._chunk_size = min(
(self._ase.size, options.one_shot_bytes)
)
@@ -569,6 +613,8 @@ class Descriptor(object):
chunks = int(math.ceil(self._ase.size / chunk_size))
except ZeroDivisionError:
chunks = 1
if self.local_path.use_stdin and chunks == 0:
chunks = 1
if chunks > 50000:
max_vector = False
if self._ase.mode == blobxfer.models.azure.StorageModes.Block:
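Two rules fall out of these hunks for stdin uploads: the chunk size is floored at _MAX_NONBLOCK_BLOB_CHUNKSIZE_BYTES because the eventual size is unknown, and the chunk count is forced to at least one so the upload loop starts at all. A condensed sketch of the combined selection, assuming the constant value defined earlier in this file:

import math

_MAX_NONBLOCK_BLOB_CHUNKSIZE_BYTES = 4194304

def compute_chunking(size, chunk_size, use_stdin):
    if use_stdin:
        # size is a placeholder zero; floor the chunk size instead
        chunk_size = max(chunk_size, _MAX_NONBLOCK_BLOB_CHUNKSIZE_BYTES)
    else:
        # never use a chunk larger than the file itself
        chunk_size = min(chunk_size, size)
    try:
        chunks = int(math.ceil(size / float(chunk_size)))
    except ZeroDivisionError:
        chunks = 1
    if use_stdin and chunks == 0:
        # guarantee one speculative chunk for the stream
        chunks = 1
    return chunk_size, chunks

For example, compute_chunking(0, 16777216, True) returns (16777216, 1): one speculative chunk that read_data later confirms or retires.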
@@ -645,15 +691,17 @@ class Descriptor(object):
), resume_bytes
def read_data(self, offsets):
# type: (Descriptor, Offsets) -> bytes
# type: (Descriptor, Offsets) -> Tuple[bytes, Offsets]
"""Read data from file
:param Descriptor self: this
:param Offsets offsets: offsets
:rtype: bytes
:return: file data
:rtype: tuple
:return: (file data bytes, new Offsets if stdin)
"""
newoffset = None
if not self.local_path.use_stdin:
if offsets.num_bytes == 0:
return None
return None, None
# compute start from view
start = self.local_path.view.fd_start + offsets.range_start
# encrypted offsets will read past the end of the file due
@@ -661,10 +709,31 @@
with self.local_path.absolute_path.open('rb') as fd:
fd.seek(start, 0)
data = fd.read(offsets.num_bytes)
if self.must_compute_md5:
else:
data = blobxfer.STDIN.read(self._chunk_size)
if not data:
with self._meta_lock:
self._total_chunks -= 1
self._chunk_num -= 1
self._outstanding_ops -= 1
else:
num_bytes = len(data)
with self._meta_lock:
newoffset = Offsets(
chunk_num=self._chunk_num - 1,
num_bytes=num_bytes,
range_start=self._offset,
range_end=self._offset + num_bytes - 1,
pad=False,
)
self._total_chunks += 1
self._outstanding_ops += 1
self._offset += num_bytes
self._ase.size += num_bytes
if self.must_compute_md5 and data:
with self._hasher_lock:
self.md5.update(data)
return data
return data, newoffset
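This is the heart of the streaming path: the descriptor starts with a single speculative chunk, and every successful stdin read both mints a fresh Offsets record for the bytes just consumed and arms one more speculative chunk, while EOF retires the speculation by rolling the counters back. A minimal sketch of the offset-minting step; Offsets is reconstructed here as a namedtuple with the fields used above:

import collections

Offsets = collections.namedtuple(
    'Offsets',
    ['chunk_num', 'num_bytes', 'range_start', 'range_end', 'pad'],
)

def mint_offsets(data, chunk_num, stream_offset):
    # describe the chunk just read from stdin; ranges are inclusive
    num_bytes = len(data)
    return Offsets(
        chunk_num=chunk_num,
        num_bytes=num_bytes,
        range_start=stream_offset,
        range_end=stream_offset + num_bytes - 1,
        pad=False,
    )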
def generate_metadata(self):
# type: (Descriptor) -> dict
@@ -690,7 +759,7 @@
encmeta = self._ase.encryption_metadata.convert_to_json_with_mac(
md5digest, hmacdigest)
# generate file attribute metadata
if self._store_file_attr:
if self._store_file_attr and not self.local_path.use_stdin:
merged = blobxfer.models.metadata.generate_fileattr_metadata(
self.local_path, genmeta)
if merged is not None:

View file

@@ -48,9 +48,9 @@ logger = logging.getLogger(__name__)
def update_progress_bar(
go, optext, start, total_files, files_sofar, total_bytes,
bytes_sofar):
bytes_sofar, stdin_upload=False):
# type: (blobxfer.models.options.General, str, datetime.datetime, int,
# int, int, int) -> None
# int, int, int, bool) -> None
"""Update the progress bar
:param blobxfer.models.options.General go: general options
:param str optext: operation prefix text
@@ -59,6 +59,7 @@ def update_progress_bar(
:param int files_sofar: files transferred so far
:param int total_bytes: total number of bytes
:param int bytes_sofar: bytes transferred so far
:param bool stdin_upload: stdin upload
"""
if (not go.progress_bar or blobxfer.util.is_none_or_empty(go.log_file) or
start is None):
@@ -80,6 +81,13 @@
fprog = 'n/a'
else:
fprog = '{}/{}'.format(files_sofar, total_files)
if stdin_upload:
sys.stdout.write(
('\r{0} progress: [{1:30s}] n/a % {2:12.3f} MiB/sec, '
'{3} {4}').format(
optext, '>' * int(total_bytes % 30), rate, fprog, rtext)
)
else:
sys.stdout.write(
('\r{0} progress: [{1:30s}] {2:.2f}% {3:12.3f} MiB/sec, '
'{4} {5}').format(
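
Since a stdin upload has no known total, the bar cannot render a fill percentage; instead the fill length cycles with the byte count ('>' * int(total_bytes % 30)) to produce a marquee effect, and the percent field prints n/a. A self-contained toy of that rendering; all values are placeholders:

import sys
import time

total_bytes = 0
for _ in range(120):
    total_bytes += 1  # stands in for bytes arriving from stdin
    sys.stdout.write(
        '\rupload progress: [{0:30s}] n/a %'.format(
            '>' * int(total_bytes % 30)))
    sys.stdout.flush()
    time.sleep(0.05)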

View file

@@ -191,10 +191,11 @@
"""
return '{}.bxslice-{}'.format(name, slice)
def _update_progress_bar(self):
# type: (Uploader) -> None
def _update_progress_bar(self, stdin=False):
# type: (Uploader, bool) -> None
"""Update progress bar
:param Uploader self: this
:param bool stdin: stdin upload
"""
if not self._all_files_processed:
return
@@ -206,6 +207,7 @@
self._upload_sofar,
self._upload_bytes_total,
self._upload_bytes_sofar,
stdin_upload=stdin,
)
def _pre_md5_skip_on_check(self, src, rfile):
@@ -370,7 +372,9 @@
self._put_data(ud, ase, offsets, data)
# accounting
with self._transfer_lock:
if offsets.chunk_num == 0:
if ud.local_path.use_stdin:
self._upload_bytes_total += offsets.num_bytes
elif offsets.chunk_num == 0:
self._upload_bytes_total += ase.size
self._upload_bytes_sofar += offsets.num_bytes
self._transfer_set.remove(
@@ -378,7 +382,7 @@
ud.local_path, ase, offsets))
ud.complete_offset_upload()
# update progress bar
self._update_progress_bar()
self._update_progress_bar(stdin=ud.local_path.use_stdin)
def _put_data(self, ud, ase, offsets, data):
# type: (Uploader, blobxfer.models.upload.Descriptor,
@@ -462,7 +466,15 @@
:param blobxfer.models.azure.StorageEntity ase: Storage entity
:param blobxfer.models.upload.Offsets offsets: offsets
"""
if ase.mode == blobxfer.models.azure.StorageModes.Block:
if ase.mode == blobxfer.models.azure.StorageModes.Append:
# create container if necessary
blobxfer.operations.azure.blob.create_container(
ase, self._containers_created,
timeout=self._general_options.timeout_sec)
# create remote blob
blobxfer.operations.azure.blob.append.create_blob(
ase, timeout=self._general_options.timeout_sec)
elif ase.mode == blobxfer.models.azure.StorageModes.Block:
# create container if necessary
blobxfer.operations.azure.blob.create_container(
ase, self._containers_created,
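Append blobs get their own creation branch because a stream of unknown length maps naturally onto append semantics: create the blob once, then append each chunk in arrival order. A hedged sketch of the equivalent calls against the legacy azure-storage SDK that blobxfer targets here; account, key, container, and blob names are placeholders:

import sys

from azure.storage.blob import AppendBlobService

STDIN = getattr(sys.stdin, 'buffer', sys.stdin)

service = AppendBlobService(account_name='myaccount', account_key='<key>')
service.create_container('mycontainer', fail_on_exist=False)
service.create_blob('mycontainer', 'stdin-upload')
# append chunks as they arrive; order is preserved by the single reader
for chunk in iter(lambda: STDIN.read(4194304), b''):
    service.append_block('mycontainer', 'stdin-upload', chunk)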
@@ -496,7 +508,7 @@
:param Uploader self: this
:param blobxfer.models.upload.Descriptor ud: upload descriptor
"""
# get download offsets
# get upload offsets
offsets, resume_bytes = ud.next_offsets()
# add resume bytes to counter
if resume_bytes is not None:
@@ -531,7 +543,7 @@
# encrypt data
if self._crypto_offload is None:
# read data from file and encrypt
data = ud.read_data(offsets)
data, _ = ud.read_data(offsets)
encdata = blobxfer.operations.crypto.aes_cbc_encrypt_data(
ud.entity.encryption_metadata.symmetric_key,
ud.current_iv, data, offsets.pad)
@@ -552,9 +564,15 @@
# retrieved from crypto queue
# return_early = True
else:
data = ud.read_data(offsets)
data, newoffset = ud.read_data(offsets)
# set new offset if stdin
if newoffset is not None:
offsets = newoffset
# re-enqueue for other threads to upload
self._upload_queue.put(ud)
# no data can be returned on stdin uploads
if not data:
return
# add data to transfer queue
with self._transfer_lock:
self._transfer_set.add(
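The early re-enqueue is what lets a pool of workers cooperate on one sequential stream: whichever worker holds the descriptor reads the next chunk, hands the descriptor back for the following chunk, and bails out when stdin is exhausted. A stripped-down sketch of that loop; the upload callable, the shutdown sentinel, and the omitted accounting are illustrative simplifications:

try:
    import queue
except ImportError:  # Python 2
    import Queue as queue  # noqa

def worker(upload_queue, upload):
    while True:
        ud = upload_queue.get()
        if ud is None:
            break  # shutdown sentinel
        offsets, _ = ud.next_offsets()
        data, newoffset = ud.read_data(offsets)
        if newoffset is not None:
            offsets = newoffset
            # let another worker read the next stdin chunk
            upload_queue.put(ud)
        if not data:
            continue  # EOF: the speculative chunk was retired
        upload(ud, offsets, data)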
@@ -713,7 +731,7 @@
"""
lpath = local_path.absolute_path
# check if local file still exists
if not lpath.exists():
if not local_path.use_stdin and not lpath.exists():
return UploadAction.Skip
# if remote file doesn't exist, upload
if rfile is None:
@@ -849,7 +867,8 @@
:return: action, LocalPath, ase
"""
if (self._spec.options.vectored_io.distribution_mode ==
blobxfer.models.upload.VectoredIoDistributionMode.Stripe):
blobxfer.models.upload.VectoredIoDistributionMode.Stripe and
not local_path.use_stdin):
# compute total number of slices
slices = int(math.ceil(
local_path.total_size /
@@ -897,6 +916,7 @@
lp_slice = blobxfer.models.upload.LocalPath(
parent_path=local_path.parent_path,
relative_path=local_path.relative_path,
use_stdin=False,
view=blobxfer.models.upload.LocalPathView(
fd_start=start,
fd_end=end,
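
Striped vectored IO is skipped for stdin because slicing requires the total size up front; for regular files each slice becomes a LocalPath sharing the same parent and relative path but carrying its own fd_start/fd_end view. A condensed sketch of the boundary math (slice_views is an illustrative name, and whether fd_end is inclusive is glossed over here):

import math

def slice_views(total_size, stripe_chunk_size):
    # one (slice_num, fd_start, fd_end) window per stripe
    slices = int(math.ceil(total_size / float(stripe_chunk_size)))
    for slice_num in range(slices):
        start = slice_num * stripe_chunk_size
        end = min(start + stripe_chunk_size, total_size)
        yield slice_num, start, end

For a 10 MiB file with 4 MiB stripes this yields (0, 0, 4194304), (1, 4194304, 8388608), (2, 8388608, 10485760).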