Prepare for Singularity3 work (#276)
* Remove torrent functionality * Remove torrent storage * Fix singularity permissions * Add container mode in cascade.py * Fix errors * Fix PR comments * Fix flake8 errors
This commit is contained in:
Родитель
a16e125056
Коммит
a68579c095
|
@ -28,6 +28,7 @@
|
|||
import argparse
|
||||
import asyncio
|
||||
import datetime
|
||||
import enum
|
||||
import hashlib
|
||||
import logging
|
||||
import logging.handlers
|
||||
|
@ -39,7 +40,6 @@ except ImportError:
|
|||
pass
|
||||
import queue
|
||||
import random
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
|
@ -49,33 +49,20 @@ from typing import Tuple
|
|||
import azure.common
|
||||
import azure.cosmosdb.table as azuretable
|
||||
import azure.storage.blob as azureblob
|
||||
try:
|
||||
import libtorrent
|
||||
_LIBTORRENT_IMPORTED = True
|
||||
except ImportError:
|
||||
_LIBTORRENT_IMPORTED = False
|
||||
|
||||
# create logger
|
||||
logger = logging.getLogger('cascade')
|
||||
# global defines
|
||||
_ON_WINDOWS = sys.platform == 'win32'
|
||||
_DEFAULT_PORT_BEGIN = 6881
|
||||
_DEFAULT_PORT_END = 6891
|
||||
_CONTAINER_MODE = None
|
||||
_DOCKER_TAG = 'docker:'
|
||||
_SINGULARITY_TAG = 'singularity:'
|
||||
_TORRENT_STATE = [
|
||||
'queued', 'checking', 'downloading metadata', 'downloading', 'finished',
|
||||
'seeding', 'allocating', 'checking fastresume'
|
||||
]
|
||||
_TORRENT_SESSION = None
|
||||
_NODEID = os.environ['AZ_BATCH_NODE_ID']
|
||||
_NODE_ROOT_DIR = os.environ['AZ_BATCH_NODE_ROOT_DIR']
|
||||
_TEMP_MOUNT_DIR = pathlib.Path(_NODE_ROOT_DIR, '..', '..').resolve()
|
||||
try:
|
||||
_SINGULARITY_CACHE_DIR = pathlib.Path(os.environ['SINGULARITY_CACHEDIR'])
|
||||
except KeyError:
|
||||
_SINGULARITY_CACHE_DIR = None
|
||||
_TORRENT_DIR = pathlib.Path(_NODE_ROOT_DIR, 'torrents')
|
||||
try:
|
||||
_AZBATCH_USER = pwd.getpwnam('_azbatch')
|
||||
except NameError:
|
||||
|
@ -83,35 +70,21 @@ except NameError:
|
|||
_PARTITION_KEY = None
|
||||
_MAX_VMLIST_PROPERTIES = 13
|
||||
_MAX_VMLIST_IDS_PER_PROPERTY = 800
|
||||
_LR_LOCK_ASYNC = asyncio.Lock()
|
||||
_PT_LOCK = threading.Lock()
|
||||
_DIRECTDL_LOCK = threading.Lock()
|
||||
_ENABLE_P2P = True
|
||||
_CONCURRENT_DOWNLOADS_ALLOWED = 10
|
||||
_COMPRESSION = True
|
||||
_SEED_BIAS = 3
|
||||
_SAVELOAD_FILE_EXTENSION = 'tar.gz'
|
||||
_RECORD_PERF = int(os.getenv('SHIPYARD_TIMING', default='0'))
|
||||
# mutable global state
|
||||
_CBHANDLES = {}
|
||||
_BLOB_LEASES = {}
|
||||
_DHT_ROUTERS = []
|
||||
_PREFIX = None
|
||||
_STORAGE_CONTAINERS = {
|
||||
'blob_globalresources': None,
|
||||
'blob_torrents': None,
|
||||
'table_dht': None,
|
||||
'table_torrentinfo': None,
|
||||
'table_images': None,
|
||||
'table_globalresources': None,
|
||||
}
|
||||
_TORRENTS = {}
|
||||
_PENDING_TORRENTS = {}
|
||||
_TORRENT_REVERSE_LOOKUP = {}
|
||||
_DIRECTDL_QUEUE = queue.Queue()
|
||||
_DIRECTDL_DOWNLOADING = set()
|
||||
_GR_DONE = False
|
||||
_LAST_DHT_INFO_DUMP = None
|
||||
_THREAD_EXCEPTIONS = []
|
||||
_DOCKER_PULL_ERRORS = frozenset((
|
||||
'toomanyrequests',
|
||||
|
@ -123,6 +96,11 @@ _DOCKER_PULL_ERRORS = frozenset((
|
|||
))
|
||||
|
||||
|
||||
@enum.unique
class ContainerMode(enum.Enum):
    """Container mode"""
    # the lower-cased member name is compared against the resource type
    # of each global resource to decide if the resource is processed
    DOCKER = 1
    SINGULARITY = 2
|
||||
|
||||
|
||||
class StandardStreamLogger:
|
||||
"""Standard Stream Logger"""
|
||||
def __init__(self, level):
|
||||
|
@ -178,10 +156,6 @@ def _setup_storage_names(sep: str) -> None:
|
|||
raise ValueError('storage_entity_prefix is invalid')
|
||||
_STORAGE_CONTAINERS['blob_globalresources'] = '-'.join(
|
||||
(sep + 'gr', batchaccount, poolid))
|
||||
_STORAGE_CONTAINERS['blob_torrents'] = '-'.join(
|
||||
(sep + 'tor', batchaccount, poolid))
|
||||
_STORAGE_CONTAINERS['table_dht'] = sep + 'dht'
|
||||
_STORAGE_CONTAINERS['table_torrentinfo'] = sep + 'torrentinfo'
|
||||
_STORAGE_CONTAINERS['table_images'] = sep + 'images'
|
||||
_STORAGE_CONTAINERS['table_globalresources'] = sep + 'gr'
|
||||
_PREFIX = sep
|
||||
|
@ -234,81 +208,6 @@ def _record_perf(event: str, message: str) -> None:
|
|||
ev=event, pr=_PREFIX, msg=message), shell=True)
|
||||
|
||||
|
||||
def generate_torrent(
        incl_file: pathlib.Path,
        resource_hash: str) -> Tuple[pathlib.Path, str]:
    """Generate torrent file for a given file and write it to disk
    :param pathlib.Path incl_file: file to include in torrent
    :param str resource_hash: resource hash
    :rtype: tuple
    :return: (torrent file as pathlib, torrent file data sha1 hash)
    """
    fs = libtorrent.file_storage()
    libtorrent.add_files(fs, str(incl_file))
    tor = libtorrent.create_torrent(fs)
    tor.set_creator('libtorrent {}'.format(libtorrent.version))
    # piece hashes are computed with the parent directory as the base path
    libtorrent.set_piece_hashes(tor, str(incl_file.parent))
    torrent_data = libtorrent.bencode(tor.generate())
    # torrent file is named after the resource hash in the torrent dir
    fp = _TORRENT_DIR / '{}.torrent'.format(resource_hash)
    with fp.open('wb') as f:
        f.write(torrent_data)
    # sha1 is taken over the bencoded torrent data written above
    return fp, hashlib.sha1(torrent_data).hexdigest()
|
||||
|
||||
|
||||
def create_torrent_session(
        resource: str, save_path: pathlib.Path, seed_mode: bool):
    """Add the torrent registered for a resource to the torrent session
    :param str resource: torrent resource
    :param pathlib.Path save_path: path to save torrented files to
    :param bool seed_mode: seed mode
    :return: torrent_handle
    """
    # torrent file path was recorded in the torrents dict for the resource
    torrent_file = str(_TORRENTS[resource]['torrent_file'])
    add_params = {
        'ti': libtorrent.torrent_info(torrent_file),
        'save_path': str(save_path),
        'seed_mode': seed_mode,
    }
    handle = _TORRENT_SESSION.add_torrent(add_params)
    logger.info('created torrent session for {} is_seed={}'.format(
        resource, handle.is_seed()))
    return handle
|
||||
|
||||
|
||||
def _remove_torrent_from_session(resource: str, torrent_handle) -> None:
    """Remove a torrent from the session and drain pending alerts
    :param str resource: torrent resource
    :param torrent_handle: torrent handle
    """
    _TORRENT_SESSION.remove_torrent(torrent_handle)
    # wait for removal alert: every popped alert is logged, an empty pop
    # consumes one attempt and sleeps unless it was the final attempt
    attempts_left = 5
    while attempts_left > 0:
        alert = _TORRENT_SESSION.pop_alert()
        if alert:
            text = alert if isinstance(alert, str) else alert.message()
            logger.warning('received alert: {}'.format(text))
            continue
        attempts_left -= 1
        if attempts_left > 0:
            time.sleep(1)
    logger.info('removed torrent for {}'.format(resource))
|
||||
|
||||
|
||||
def add_dht_node(ip: str, port: int):
    """Register a node as a DHT router unless its ip is already known
    :param str ip: ip address of the dht node
    :param int port: port of the dht node
    """
    # routers are tracked by ip address only
    if ip in _DHT_ROUTERS:
        return
    _TORRENT_SESSION.add_dht_router(ip, port)
    logger.debug('added {}:{} as dht router'.format(ip, port))
    _DHT_ROUTERS.append(ip)
|
||||
|
||||
|
||||
def _renew_blob_lease(
|
||||
loop: asyncio.BaseEventLoop,
|
||||
blob_client: azureblob.BlockBlobService,
|
||||
|
@ -514,8 +413,6 @@ class ContainerImageSaveThread(threading.Thread):
|
|||
|
||||
def _pull_and_save(self) -> None:
|
||||
"""Thread main logic for pulling and saving a container image"""
|
||||
file = None
|
||||
resource_hash = compute_resource_hash(self.resource)
|
||||
grtype, image = get_container_image_name_from_resource(self.resource)
|
||||
_record_perf('pull-start', 'grtype={},img={}'.format(grtype, image))
|
||||
start = datetime.datetime.now()
|
||||
|
@ -553,137 +450,22 @@ class ContainerImageSaveThread(threading.Thread):
|
|||
# register service
|
||||
_merge_service(
|
||||
self.table_client, self.resource, self.nglobalresources)
|
||||
# save image to seed to torrent
|
||||
if _ENABLE_P2P:
|
||||
# get image size
|
||||
try:
|
||||
if grtype == 'docker':
|
||||
output = subprocess.check_output(
|
||||
'docker images {}'.format(image), shell=True)
|
||||
size = ' '.join(output.decode('utf-8').split()[-2:])
|
||||
elif grtype == 'singularity':
|
||||
imgpath = singularity_image_path_on_disk(image)
|
||||
size = imgpath.stat().st_size
|
||||
_record_perf(
|
||||
'pull-end', 'grtype={},img={},diff={},size={}'.format(
|
||||
grtype, image, diff, size))
|
||||
except subprocess.CalledProcessError as ex:
|
||||
logger.exception(ex)
|
||||
_record_perf('pull-end', 'grtype={},img={},diff={}'.format(
|
||||
grtype, image, diff))
|
||||
_record_perf('save-start', 'grtype={},img={}'.format(
|
||||
grtype, image))
|
||||
start = datetime.datetime.now()
|
||||
if _COMPRESSION:
|
||||
# need to create reproducible compressed tarballs
|
||||
# 1. untar image save file
|
||||
# 2. re-tar files sorted by name and set mtime/user/group
|
||||
# to known values
|
||||
# 3. fast compress with parallel gzip ignoring certain file
|
||||
# properties
|
||||
# 4. remove temporary directory
|
||||
tmpdir = _TORRENT_DIR / '{}-tmp'.format(resource_hash)
|
||||
tmpdir.mkdir(parents=True, exist_ok=True)
|
||||
file = _TORRENT_DIR / '{}.{}'.format(
|
||||
resource_hash, _SAVELOAD_FILE_EXTENSION)
|
||||
logger.info('saving {} image {} to {} for seeding'.format(
|
||||
grtype, image, file))
|
||||
if grtype == 'docker':
|
||||
subprocess.check_call(
|
||||
('(docker save {} | tar -xf -) '
|
||||
'&& (tar --sort=name --mtime=\'1970-01-01\' '
|
||||
'--owner=0 --group=0 -cf - . '
|
||||
'| pigz --fast -n -T -c > {})').format(image, file),
|
||||
cwd=str(tmpdir), shell=True)
|
||||
elif grtype == 'singularity':
|
||||
subprocess.check_call(
|
||||
('(singularity image.export {} | tar -xf -) '
|
||||
'&& (tar --sort=name --mtime=\'1970-01-01\' '
|
||||
'--owner=0 --group=0 -cf - . '
|
||||
'| pigz --fast -n -T -c > {})').format(image, file),
|
||||
cwd=str(tmpdir), shell=True)
|
||||
shutil.rmtree(str(tmpdir), ignore_errors=True)
|
||||
del tmpdir
|
||||
fsize = file.stat().st_size
|
||||
else:
|
||||
# tarball generated by image save is not reproducible
|
||||
# we need to untar it and torrent the contents instead
|
||||
file = _TORRENT_DIR / '{}'.format(resource_hash)
|
||||
file.mkdir(parents=True, exist_ok=True)
|
||||
logger.info('saving {} image {} to {} for seeding'.format(
|
||||
grtype, image, file))
|
||||
if grtype == 'docker':
|
||||
subprocess.check_call(
|
||||
'docker save {} | tar -xf -'.format(image),
|
||||
cwd=str(file), shell=True)
|
||||
elif grtype == 'singularity':
|
||||
subprocess.check_call(
|
||||
'singularity image.export {} | tar -xf -'.format(
|
||||
image),
|
||||
cwd=str(file), shell=True)
|
||||
fsize = 0
|
||||
for entry in scantree(str(file)):
|
||||
if entry.is_file(follow_symlinks=False):
|
||||
fsize += entry.stat().st_size
|
||||
diff = (datetime.datetime.now() - start).total_seconds()
|
||||
logger.debug('took {} sec to save {} image {} to {}'.format(
|
||||
diff, grtype, image, file))
|
||||
_record_perf('save-end', 'grtype={},img={},size={},diff={}'.format(
|
||||
grtype, image, fsize, diff))
|
||||
# generate torrent file
|
||||
start = datetime.datetime.now()
|
||||
torrent_file, torrent_sha1 = generate_torrent(file, resource_hash)
|
||||
# check if blob exists and is non-zero length prior to uploading
|
||||
try:
|
||||
_bp = self.blob_client.get_blob_properties(
|
||||
_STORAGE_CONTAINERS['blob_torrents'],
|
||||
str(torrent_file.name))
|
||||
if _bp.properties.content_length == 0:
|
||||
raise ValueError()
|
||||
except Exception:
|
||||
self.blob_client.create_blob_from_path(
|
||||
_STORAGE_CONTAINERS['blob_torrents'],
|
||||
str(torrent_file.name), str(torrent_file))
|
||||
diff = (datetime.datetime.now() - start).total_seconds()
|
||||
logger.debug(
|
||||
'took {} sec to generate and upload torrent file: {}'.format(
|
||||
diff, torrent_file))
|
||||
start = datetime.datetime.now()
|
||||
# add to torrent dict (effectively enqueues for torrent start)
|
||||
entity = {
|
||||
'PartitionKey': _PARTITION_KEY,
|
||||
'RowKey': resource_hash,
|
||||
'Resource': self.resource,
|
||||
'TorrentFileLocator': '{},{}'.format(
|
||||
_STORAGE_CONTAINERS['blob_torrents'],
|
||||
str(torrent_file.name)),
|
||||
'TorrentFileSHA1': torrent_sha1,
|
||||
'TorrentIsDir': file.is_dir(),
|
||||
'TorrentContentSizeBytes': fsize,
|
||||
}
|
||||
with _PT_LOCK:
|
||||
_PENDING_TORRENTS[self.resource] = {
|
||||
'entity': entity,
|
||||
'torrent_file': torrent_file,
|
||||
'started': False,
|
||||
'seed': True,
|
||||
'loaded': True,
|
||||
'loading': False,
|
||||
'registered': True,
|
||||
}
|
||||
_TORRENT_REVERSE_LOOKUP[resource_hash] = self.resource
|
||||
# wait until torrent has started
|
||||
logger.info(
|
||||
'waiting for torrent {} to start'.format(self.resource))
|
||||
while (self.resource not in _TORRENTS or
|
||||
not _TORRENTS[self.resource]['started']):
|
||||
time.sleep(0.1)
|
||||
diff = (datetime.datetime.now() - start).total_seconds()
|
||||
logger.debug('took {} sec for {} torrent to start'.format(
|
||||
diff, self.resource))
|
||||
else:
|
||||
# get image size
|
||||
try:
|
||||
if grtype == 'docker':
|
||||
output = subprocess.check_output(
|
||||
'docker images {}'.format(image), shell=True)
|
||||
size = ' '.join(output.decode('utf-8').split()[-2:])
|
||||
elif grtype == 'singularity':
|
||||
imgpath = singularity_image_path_on_disk(image)
|
||||
size = imgpath.stat().st_size
|
||||
_record_perf(
|
||||
'pull-end', 'grtype={},img={},diff={},size={}'.format(
|
||||
grtype, image, diff, size))
|
||||
except subprocess.CalledProcessError as ex:
|
||||
logger.exception(ex)
|
||||
_record_perf('pull-end', 'grtype={},img={},diff={}'.format(
|
||||
grtype, image, diff))
|
||||
|
||||
|
||||
async def _direct_download_resources_async(
|
||||
|
@ -702,8 +484,7 @@ async def _direct_download_resources_async(
|
|||
with _DIRECTDL_LOCK:
|
||||
if len(_DIRECTDL_DOWNLOADING) > _CONCURRENT_DOWNLOADS_ALLOWED:
|
||||
return
|
||||
# retrieve a resouorce from dl queue
|
||||
_start_torrent_list = []
|
||||
# retrieve a resource from dl queue
|
||||
_seen = set()
|
||||
while True:
|
||||
try:
|
||||
|
@ -716,17 +497,9 @@ async def _direct_download_resources_async(
|
|||
resource = None
|
||||
break
|
||||
_seen.add(resource)
|
||||
# check if torrent is available for resource
|
||||
with _DIRECTDL_LOCK:
|
||||
if resource not in _DIRECTDL_DOWNLOADING:
|
||||
if _ENABLE_P2P:
|
||||
nseeds = _get_torrent_num_seeds(table_client, resource)
|
||||
if nseeds >= _SEED_BIAS:
|
||||
_start_torrent_list.append(resource)
|
||||
resource = None
|
||||
break
|
||||
else:
|
||||
break
|
||||
break
|
||||
else:
|
||||
_DIRECTDL_QUEUE.put(resource)
|
||||
resource = None
|
||||
|
@ -760,10 +533,6 @@ async def _direct_download_resources_async(
|
|||
_CBHANDLES[resource] = loop.call_later(
|
||||
15, _renew_blob_lease, loop, blob_client, 'blob_globalresources',
|
||||
resource, blob_name)
|
||||
# start any torrents
|
||||
for resource in _start_torrent_list:
|
||||
_start_torrent_via_storage(blob_client, table_client, resource)
|
||||
del _start_torrent_list
|
||||
if resource is None:
|
||||
return
|
||||
# pull and save container image in thread
|
||||
|
@ -773,7 +542,7 @@ async def _direct_download_resources_async(
|
|||
thr.start()
|
||||
else:
|
||||
# TODO download via blob, explode uri to get container/blob
|
||||
# use download to path into /tmp and move to _TORRENT_DIR
|
||||
# use download to path into /tmp and move to directory
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
|
@ -854,237 +623,6 @@ def _merge_service(
|
|||
nglobalresources))
|
||||
|
||||
|
||||
def _log_torrent_info(resource: str, th) -> None:
    """Log torrent status and, at most once a minute, DHT session info
    :param str resource: resource
    :param th: torrent handle
    """
    global _LAST_DHT_INFO_DUMP
    status = th.status()
    # only log torrents that are transferring, have peers or are incomplete
    active = (
        status.download_rate > 0 or status.upload_rate > 0 or
        status.num_peers > 0 or (1.0 - status.progress) > 1e-6
    )
    if active:
        torrent_name = th.name()
        # the text before the first dot of the torrent name is the key
        # into the reverse lookup dict
        lookup_key = torrent_name.split('.')[0]
        logger.debug(
            ('{name} {file} bytes={bytes} state={state} '
             'completion={completion:.2f}% peers={peers} '
             'down={down:.3f} kB/s up={up:.3f} kB/s').format(
                 name=_TORRENT_REVERSE_LOOKUP[lookup_key],
                 file=torrent_name,
                 bytes=status.total_wanted,
                 state=_TORRENT_STATE[status.state],
                 completion=status.progress * 100,
                 peers=status.num_peers,
                 down=status.download_rate / 1000,
                 up=status.upload_rate / 1000))
    # rate limit the dht info dump to once per minute
    now = datetime.datetime.utcnow()
    if (_LAST_DHT_INFO_DUMP is not None and
            now <= _LAST_DHT_INFO_DUMP + datetime.timedelta(minutes=1)):
        return
    _LAST_DHT_INFO_DUMP = now
    session_status = _TORRENT_SESSION.status()
    logger.debug(
        ('dht: running={} globalnodes={} nodes={} node_cache={} '
         'torrents={} incomingconn={} down={} up={}').format(
             _TORRENT_SESSION.is_dht_running(),
             session_status.dht_global_nodes,
             session_status.dht_nodes,
             session_status.dht_node_cache,
             session_status.dht_torrents,
             session_status.has_incoming_connections,
             session_status.total_dht_download,
             session_status.total_dht_upload))
|
||||
|
||||
|
||||
def bootstrap_dht_nodes(
        loop: asyncio.BaseEventLoop,
        table_client: azuretable.TableService,
        ipaddress: str,
        num_attempts: int) -> None:
    """Bootstrap DHT router nodes from the dht table, registering self
    :param asyncio.BaseEventLoop loop: event loop
    :param azuretable.TableService table_client: table client
    :param str ipaddress: ip address
    :param int num_attempts: number of attempts
    """
    known_nodes = []
    self_registered = False
    try:
        rows = table_client.query_entities(
            _STORAGE_CONTAINERS['table_dht'],
            filter='PartitionKey eq \'{}\''.format(_PARTITION_KEY))
    except azure.common.AzureMissingResourceHttpError:
        # missing table is treated as no nodes registered
        rows = ()
    for row in rows:
        known_nodes.append((row['RowKey'], row['Port']))
        if row['RowKey'] == ipaddress:
            self_registered = True
    if not self_registered:
        # register this node and place it first in the node list
        table_client.insert_entity(
            _STORAGE_CONTAINERS['table_dht'],
            {
                'PartitionKey': _PARTITION_KEY,
                'RowKey': ipaddress,
                'Port': _DEFAULT_PORT_BEGIN,
            })
        known_nodes.insert(0, (ipaddress, _DEFAULT_PORT_BEGIN))
    # TODO handle vm/ips no longer in pool
    for node_ip, node_port in known_nodes:
        if len(_DHT_ROUTERS) >= 3:
            break
        add_dht_node(node_ip, node_port)
    # ensure at least 3 DHT router nodes if possible
    if len(known_nodes) >= 3:
        return
    # reschedule with a delay that grows with the number of attempts
    num_attempts += 1
    if num_attempts < 600:
        delay = 1
    elif num_attempts < 1200:
        delay = 10
    else:
        delay = 30
    loop.call_later(
        delay, bootstrap_dht_nodes, loop, table_client, ipaddress,
        num_attempts)
|
||||
|
||||
|
||||
class ContainerImageLoadThread(threading.Thread):
    """Container Image Load Thread"""
    def __init__(self, resource):
        """ContainerImageLoadThread ctor
        :param str resource: resource
        """
        threading.Thread.__init__(self)
        self.resource = resource
        # flags are set at construction time, before the thread is started
        _TORRENTS[self.resource]['seed'] = True
        _TORRENTS[self.resource]['loading'] = True

    def run(self) -> None:
        """Main thread run logic"""
        try:
            self._load_image()
        except Exception as ex:
            # record the exception in the shared list in addition to
            # logging it
            logger.exception(ex)
            _THREAD_EXCEPTIONS.append(ex)

    def _load_image(self) -> None:
        """Load container image from the torrent directory
        :raises subprocess.CalledProcessError: if the load command fails
        """
        logger.debug('loading resource: {}'.format(self.resource))
        resource_hash = compute_resource_hash(self.resource)
        grtype, image = get_container_image_name_from_resource(self.resource)
        start = datetime.datetime.now()
        if _COMPRESSION:
            # compressed mode: a single archive file per resource
            file = _TORRENT_DIR / '{}.{}'.format(
                resource_hash, _SAVELOAD_FILE_EXTENSION)
            logger.info('loading {} image {} from {}'.format(
                grtype, image, file))
            _record_perf('load-start', 'grtype={},img={},size={}'.format(
                grtype, image, file.stat().st_size))
            if grtype == 'docker':
                subprocess.check_call(
                    'pigz -cd {} | docker load'.format(file), shell=True)
            elif grtype == 'singularity':
                imgpath = singularity_image_path_on_disk(image)
                subprocess.check_call(
                    'pigz -cd {} | singularity image.import {}'.format(
                        file, imgpath), shell=True)
        else:
            # uncompressed mode: a directory of files that is re-tarred
            # from within that directory
            file = _TORRENT_DIR / '{}'.format(resource_hash)
            logger.info('loading {} image {} from {}'.format(
                grtype, image, file))
            _record_perf('load-start', 'grtype={},img={}'.format(
                grtype, image))
            if grtype == 'docker':
                subprocess.check_call(
                    'tar -cO . | docker load', cwd=str(file), shell=True)
            elif grtype == 'singularity':
                imgpath = singularity_image_path_on_disk(image)
                # the image path is the import target and the directory
                # is the working directory of the command
                subprocess.check_call(
                    'tar -cO . | singularity image.import {}'.format(
                        imgpath),
                    cwd=str(file), shell=True)
        diff = (datetime.datetime.now() - start).total_seconds()
        logger.debug(
            'took {} sec to load {} image from {}'.format(diff, grtype, file))
        _record_perf('load-end', 'grtype={},img={},diff={}'.format(
            grtype, image, diff))
        # flags are only updated if the load did not raise
        _TORRENTS[self.resource]['loading'] = False
        _TORRENTS[self.resource]['loaded'] = True
|
||||
|
||||
|
||||
async def _load_and_register_async(
        loop: asyncio.BaseEventLoop,
        table_client: azuretable.TableService,
        nglobalresources: int) -> None:
    """Load seeding resources and register loaded ones
    :param asyncio.BaseEventLoop loop: event loop
    :param azuretable.TableService table_client: table client
    :param int nglobalresources: number of global resources
    """
    async with _LR_LOCK_ASYNC:
        for resource in _TORRENTS:
            state = _TORRENTS[resource]
            # only torrents that are started and seeding are considered
            if not (state['started'] and state['handle'].is_seed()):
                continue
            if not state['loaded'] and not state['loading']:
                if not is_container_resource(resource):
                    # TODO "load blob" - move to appropriate path
                    raise NotImplementedError()
                # container load image in a separate thread
                ContainerImageLoadThread(resource).start()
            # register to services table once the load has completed
            if (not state['registered'] and state['loaded'] and
                    not state['loading']):
                _merge_service(table_client, resource, nglobalresources)
                state['registered'] = True
|
||||
|
||||
|
||||
async def manage_torrents_async(
        loop: asyncio.BaseEventLoop,
        table_client: azuretable.TableService,
        ipaddress: str, nglobalresources: int) -> None:
    """Manage torrents: promote pending torrents and start their sessions
    :param asyncio.BaseEventLoop loop: event loop
    :param azuretable.TableService table_client: table client
    :param str ipaddress: ip address
    :param int nglobalresources: number of global resources
    """
    while True:
        # async schedule load and register unless one holds the lock
        if not (_GR_DONE or _LR_LOCK_ASYNC.locked()):
            asyncio.ensure_future(_load_and_register_async(
                loop, table_client, nglobalresources))
        # move pending torrents into torrents
        with _PT_LOCK:
            _TORRENTS.update(_PENDING_TORRENTS)
            _PENDING_TORRENTS.clear()
        # start applicable torrent sessions
        for resource in _TORRENTS:
            entry = _TORRENTS[resource]
            if entry['started']:
                # already running, log torrent info only
                _log_torrent_info(resource, entry['handle'])
                continue
            seed = entry['seed']
            logger.info(
                ('creating torrent session for {} ipaddress={} '
                 'seed={}').format(resource, ipaddress, seed))
            grtype, image = get_container_image_name_from_resource(resource)
            entry['handle'] = create_torrent_session(
                resource, _TORRENT_DIR, seed)
            await _record_perf_async(
                loop, 'torrent-start', 'grtype={},img={}'.format(
                    grtype, image))
            # insert torrent into torrentinfo table, an existing row is ok
            try:
                table_client.insert_entity(
                    _STORAGE_CONTAINERS['table_torrentinfo'],
                    entity=entry['entity'])
            except azure.common.AzureConflictHttpError:
                pass
            # mark torrent as started
            if not entry['started']:
                entry['started'] = True
        # sleep to avoid pinning cpu
        await asyncio.sleep(1)
|
||||
|
||||
|
||||
async def download_monitor_async(
|
||||
loop: asyncio.BaseEventLoop,
|
||||
blob_client: azureblob.BlockBlobService,
|
||||
|
@ -1097,13 +635,7 @@ async def download_monitor_async(
|
|||
:param str ipaddress: ip address
|
||||
:param int nglobalresource: number of global resources
|
||||
"""
|
||||
# begin async manage torrent sessions
|
||||
if _ENABLE_P2P:
|
||||
asyncio.ensure_future(
|
||||
manage_torrents_async(
|
||||
loop, table_client, ipaddress, nglobalresources)
|
||||
)
|
||||
while True:
|
||||
while not _GR_DONE:
|
||||
# check if there are any direct downloads
|
||||
if _DIRECTDL_QUEUE.qsize() > 0:
|
||||
await _direct_download_resources_async(
|
||||
|
@ -1113,126 +645,24 @@ async def download_monitor_async(
|
|||
logger.critical('Thread exceptions encountered, terminating')
|
||||
# raise first exception
|
||||
raise _THREAD_EXCEPTIONS[0]
|
||||
# fixup filemodes/ownership for singularity images
|
||||
if (_GR_DONE and _SINGULARITY_CACHE_DIR is not None and
|
||||
_AZBATCH_USER is not None):
|
||||
if _SINGULARITY_CACHE_DIR.exists():
|
||||
logger.info('chown all files in {}'.format(
|
||||
_SINGULARITY_CACHE_DIR))
|
||||
for file in scantree(str(_SINGULARITY_CACHE_DIR)):
|
||||
os.chown(
|
||||
str(file.path),
|
||||
_AZBATCH_USER[2],
|
||||
_AZBATCH_USER[3]
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
'singularity cache dir {} does not exist'.format(
|
||||
_SINGULARITY_CACHE_DIR))
|
||||
# if not in peer-to-peer mode, allow exit
|
||||
if not _ENABLE_P2P and _GR_DONE:
|
||||
break
|
||||
# sleep to avoid pinning cpu
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
def _get_torrent_num_seeds(
        table_client: azuretable.TableService,
        resource: str) -> int:
    """Get number of torrent seeders via table (for first VmList prop)
    :param azuretable.TableService table_client: table client
    :param str resource: resource
    :rtype: int
    :return: number of seeds
    """
    try:
        image_entity = table_client.get_entity(
            _STORAGE_CONTAINERS['table_images'],
            _PARTITION_KEY, compute_resource_hash(resource))
    except azure.common.AzureMissingResourceHttpError:
        # no entity for the resource
        return 0
    try:
        vmlist = image_entity['VmList0']
    except KeyError:
        # entity exists but has no VmList0 property
        return 0
    # NOTE(review): an empty VmList0 string still counts as one seed
    return len(vmlist.split(','))
|
||||
|
||||
|
||||
def _start_torrent_via_storage(
        blob_client: azureblob.BlockBlobService,
        table_client: azuretable.TableService,
        resource: str, entity: dict = None) -> None:
    """Download the torrent file for a resource and queue it as pending
    :param azureblob.BlockBlobService blob_client: blob client
    :param azuretable.TableService table_client: table client
    :param str resource: resource
    :param dict entity: entity
    """
    if not _ENABLE_P2P:
        return
    if entity is None:
        row_key = compute_resource_hash(resource)
        # entity may not be populated yet, keep trying until ready
        while True:
            try:
                entity = table_client.get_entity(
                    _STORAGE_CONTAINERS['table_torrentinfo'],
                    _PARTITION_KEY, row_key)
            except azure.common.AzureMissingResourceHttpError:
                time.sleep(1)
            else:
                break
    # retrieve torrent file: locator is "<container>,<blob name>"
    torrent_file = _TORRENT_DIR / '{}.torrent'.format(entity['RowKey'])
    container, blob_name = entity['TorrentFileLocator'].split(',')
    blob_client.get_blob_to_path(container, blob_name, str(torrent_file))
    # add to pending torrents as a not yet started, non-seeding entry
    pending = {
        'entity': entity,
        'torrent_file': torrent_file,
        'started': False,
        'seed': False,
        'loaded': False,
        'loading': False,
        'registered': False,
    }
    with _PT_LOCK:
        _PENDING_TORRENTS[resource] = pending
        _TORRENT_REVERSE_LOOKUP[entity['RowKey']] = resource
|
||||
|
||||
|
||||
def _check_resource_has_torrent(
        blob_client: azureblob.BlockBlobService,
        table_client: azuretable.TableService,
        resource: str) -> bool:
    """Check if a resource has an associated torrent with enough seeds
    :param azureblob.BlockBlobService blob_client: blob client
    :param azuretable.TableService table_client: table client
    :param str resource: resource
    :rtype: bool
    :return: if resource has torrent
    """
    if not _ENABLE_P2P:
        return False
    try:
        entity = table_client.get_entity(
            _STORAGE_CONTAINERS['table_torrentinfo'],
            _PARTITION_KEY, compute_resource_hash(resource))
        # fewer seeds than the seed bias means direct download instead
        direct_download = (
            _get_torrent_num_seeds(table_client, resource) < _SEED_BIAS)
    except azure.common.AzureMissingResourceHttpError:
        # no torrent info entity for this resource
        direct_download = True
    if not direct_download:
        logger.info('found torrent for resource {}'.format(resource))
        _start_torrent_via_storage(
            blob_client, table_client, resource, entity)
        return True
    logger.info('adding {} as resource to download'.format(resource))
    _DIRECTDL_QUEUE.put(resource)
    return False
|
||||
# fixup filemodes/ownership for singularity images
|
||||
if (_SINGULARITY_CACHE_DIR is not None and
|
||||
_AZBATCH_USER is not None):
|
||||
if _SINGULARITY_CACHE_DIR.exists():
|
||||
logger.info('chown all files in {}'.format(
|
||||
_SINGULARITY_CACHE_DIR))
|
||||
for file in scantree(str(_SINGULARITY_CACHE_DIR)):
|
||||
os.chown(
|
||||
str(file.path),
|
||||
_AZBATCH_USER[2],
|
||||
_AZBATCH_USER[3]
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
'singularity cache dir {} does not exist'.format(
|
||||
_SINGULARITY_CACHE_DIR))
|
||||
|
||||
|
||||
def distribute_global_resources(
|
||||
|
@ -1246,37 +676,24 @@ def distribute_global_resources(
|
|||
:param azuretable.TableService table_client: table client
|
||||
:param str ipaddress: ip address
|
||||
"""
|
||||
# set torrent session port listen
|
||||
if _ENABLE_P2P:
|
||||
global _TORRENT_SESSION
|
||||
# create torrent session
|
||||
logger.info('creating torrent session on {}:{}'.format(
|
||||
ipaddress, _DEFAULT_PORT_BEGIN))
|
||||
_TORRENT_SESSION = libtorrent.session()
|
||||
_TORRENT_SESSION.listen_on(_DEFAULT_PORT_BEGIN, _DEFAULT_PORT_END)
|
||||
_TORRENT_SESSION.stop_lsd()
|
||||
_TORRENT_SESSION.stop_upnp()
|
||||
_TORRENT_SESSION.stop_natpmp()
|
||||
# bootstrap dht nodes
|
||||
bootstrap_dht_nodes(loop, table_client, ipaddress, 0)
|
||||
_TORRENT_SESSION.start_dht()
|
||||
# get globalresources from table
|
||||
try:
|
||||
entities = table_client.query_entities(
|
||||
_STORAGE_CONTAINERS['table_globalresources'],
|
||||
filter='PartitionKey eq \'{}\''.format(_PARTITION_KEY))
|
||||
except azure.common.AzureMissingResourceHttpError:
|
||||
entities = None
|
||||
entities = []
|
||||
nentities = 0
|
||||
# check torrent info table for resource
|
||||
if entities is not None:
|
||||
for ent in entities:
|
||||
for ent in entities:
|
||||
resource = ent['Resource']
|
||||
grtype, _ = get_container_image_name_from_resource(resource)
|
||||
if grtype == _CONTAINER_MODE.name.lower():
|
||||
nentities += 1
|
||||
if _ENABLE_P2P:
|
||||
_check_resource_has_torrent(
|
||||
blob_client, table_client, ent['Resource'])
|
||||
else:
|
||||
_DIRECTDL_QUEUE.put(ent['Resource'])
|
||||
_DIRECTDL_QUEUE.put(resource)
|
||||
else:
|
||||
logger.info('skipping resource {}:'.format(resource) +
|
||||
'not matching container mode "{}"'
|
||||
.format(_CONTAINER_MODE.name.lower()))
|
||||
if nentities == 0:
|
||||
logger.info('no global resources specified')
|
||||
return
|
||||
|
@ -1303,29 +720,24 @@ async def _get_ipaddress_async(loop: asyncio.BaseEventLoop) -> str:
|
|||
|
||||
def main():
|
||||
"""Main function"""
|
||||
global _ENABLE_P2P, _CONCURRENT_DOWNLOADS_ALLOWED, _POOL_ID
|
||||
# get command-line args
|
||||
args = parseargs()
|
||||
p2popts = args.p2popts.split(':')
|
||||
_ENABLE_P2P = p2popts[0] == 'true'
|
||||
_CONCURRENT_DOWNLOADS_ALLOWED = int(p2popts[1])
|
||||
|
||||
global _CONCURRENT_DOWNLOADS_ALLOWED, _CONTAINER_MODE
|
||||
|
||||
# set up concurrent source downloads
|
||||
if args.concurrent is None:
|
||||
raise ValueError('concurrent source downloads is not specified')
|
||||
try:
|
||||
_CONCURRENT_DOWNLOADS_ALLOWED = int(args.concurrent)
|
||||
except ValueError:
|
||||
_CONCURRENT_DOWNLOADS_ALLOWED = None
|
||||
if (_CONCURRENT_DOWNLOADS_ALLOWED is None or
|
||||
_CONCURRENT_DOWNLOADS_ALLOWED <= 0):
|
||||
raise ValueError('concurrent source downloads is invalid: {}'
|
||||
.format(args.concurrent))
|
||||
logger.info('max concurrent downloads: {}'.format(
|
||||
_CONCURRENT_DOWNLOADS_ALLOWED))
|
||||
# set p2p options
|
||||
if _ENABLE_P2P:
|
||||
if not _LIBTORRENT_IMPORTED:
|
||||
raise ImportError('No module named \'libtorrent\'')
|
||||
global _COMPRESSION, _SEED_BIAS, _SAVELOAD_FILE_EXTENSION
|
||||
_COMPRESSION = p2popts[3] == 'true'
|
||||
_SEED_BIAS = int(p2popts[2])
|
||||
if not _COMPRESSION:
|
||||
_SAVELOAD_FILE_EXTENSION = 'tar'
|
||||
logger.info('peer-to-peer options: compression={} seedbias={}'.format(
|
||||
_COMPRESSION, _SEED_BIAS))
|
||||
# create torrent directory
|
||||
logger.debug('creating torrent dir: {}'.format(_TORRENT_DIR))
|
||||
_TORRENT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
del p2popts
|
||||
|
||||
# get event loop
|
||||
if _ON_WINDOWS:
|
||||
|
@ -1342,6 +754,17 @@ def main():
|
|||
ipaddress = args.ipaddress
|
||||
logger.debug('ip address: {}'.format(ipaddress))
|
||||
|
||||
# set up container mode
|
||||
if args.mode is None:
|
||||
raise ValueError('container mode is not specified')
|
||||
if args.mode == 'docker':
|
||||
_CONTAINER_MODE = ContainerMode.DOCKER
|
||||
elif args.mode == 'singularity':
|
||||
_CONTAINER_MODE = ContainerMode.SINGULARITY
|
||||
else:
|
||||
raise ValueError('container mode is invalid: {}'.format(args.mode))
|
||||
logger.info('container mode: {}'.format(_CONTAINER_MODE))
|
||||
|
||||
# set up storage names
|
||||
_setup_storage_names(args.prefix)
|
||||
del args
|
||||
|
@ -1359,14 +782,15 @@ def parseargs():
|
|||
:return: parsed arguments
|
||||
"""
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Cascade: Batch Shipyard P2P File/Image Replicator')
|
||||
parser.set_defaults(ipaddress=None)
|
||||
description='Cascade: Batch Shipyard File/Image Replicator')
|
||||
parser.set_defaults(concurrent=None, ipaddress=None, mode=None)
|
||||
parser.add_argument(
|
||||
'p2popts',
|
||||
help='peer to peer options [enabled:non-p2p concurrent '
|
||||
'downloading:seed bias:compression]')
|
||||
'--concurrent',
|
||||
help='concurrent source downloads')
|
||||
parser.add_argument(
|
||||
'--ipaddress', help='ip address')
|
||||
parser.add_argument(
|
||||
'--mode', help='container mode (docker/singularity)')
|
||||
parser.add_argument(
|
||||
'--prefix', help='storage container prefix')
|
||||
return parser.parse_args()
|
||||
|
|
|
@ -35,4 +35,4 @@ fi
|
|||
|
||||
# execute cascade
|
||||
# shellcheck disable=SC2086
|
||||
python3 cascade.py "$p2p" --ipaddress "$ipaddress" ${prefix}
|
||||
python3 cascade.py --concurrent "$concurrent_source_downloads" --mode "docker" --ipaddress "$ipaddress" ${prefix}
|
||||
|
|
|
@ -16,10 +16,6 @@ batch_shipyard:
|
|||
delay_docker_image_preload: false
|
||||
data_replication:
|
||||
concurrent_source_downloads: null
|
||||
peer_to_peer:
|
||||
enabled: false
|
||||
compression: true
|
||||
direct_download_seed_bias: null
|
||||
global_resources:
|
||||
additional_registries:
|
||||
docker:
|
||||
|
|
|
@ -1201,13 +1201,8 @@ def _construct_pool_object(
|
|||
block_for_gr_docker = ''
|
||||
block_for_gr = '{}#{}'.format(
|
||||
block_for_gr_docker, block_for_gr_singularity)
|
||||
# data replication and peer-to-peer settings
|
||||
# data replication settings
|
||||
dr = settings.data_replication_settings(config)
|
||||
# create torrent flags
|
||||
torrentflags = '{}:{}:{}:{}'.format(
|
||||
dr.peer_to_peer.enabled, dr.concurrent_source_downloads,
|
||||
dr.peer_to_peer.direct_download_seed_bias,
|
||||
dr.peer_to_peer.compression)
|
||||
# create resource files list
|
||||
if is_windows:
|
||||
_rflist = [_REGISTRY_LOGIN_WINDOWS_FILE, _BLOBXFER_WINDOWS_FILE]
|
||||
|
@ -1392,7 +1387,7 @@ def _construct_pool_object(
|
|||
bs.storage_entity_prefix) else '',
|
||||
q=' -q' if pool_settings.batch_insights_enabled else '',
|
||||
r=' -r' if pool_settings.ssh.allow_docker_access else '',
|
||||
s=' -s {}'.format(torrentflags),
|
||||
s=' -s {}'.format(dr.concurrent_source_downloads),
|
||||
t=' -t' if settings.can_tune_tcp(
|
||||
pool_settings.vm_size) else '',
|
||||
u=' -u' if util.is_not_empty(custom_image_na) else '',
|
||||
|
@ -2130,15 +2125,7 @@ def _update_container_images(
|
|||
:param str singularity_image: singularity image to update
|
||||
:param bool force_ssh: force update over SSH
|
||||
"""
|
||||
# first check that peer-to-peer is disabled for pool
|
||||
pool_id = settings.pool_id(config)
|
||||
try:
|
||||
if settings.data_replication_settings(config).peer_to_peer.enabled:
|
||||
raise RuntimeError(
|
||||
'cannot update container images for a pool with peer-to-peer '
|
||||
'image distribution')
|
||||
except KeyError:
|
||||
pass
|
||||
native = settings.is_native_docker_pool(config)
|
||||
if native and not force_ssh:
|
||||
logger.debug('forcing update via SSH due to native mode')
|
||||
|
@ -2481,7 +2468,6 @@ def _adjust_settings_for_pool_creation(config):
|
|||
publisher, offer, sku))
|
||||
# re-read pool and data replication settings
|
||||
pool = settings.pool_settings(config)
|
||||
dr = settings.data_replication_settings(config)
|
||||
native = settings.is_native_docker_pool(
|
||||
config, vm_config=pool.vm_configuration)
|
||||
# ensure singularity images are not specified for native pools
|
||||
|
@ -2491,18 +2477,6 @@ def _adjust_settings_for_pool_creation(config):
|
|||
raise ValueError(
|
||||
'cannot specify a native container pool with Singularity '
|
||||
'images as global resources')
|
||||
# ensure p2p/autoscale/inter-node settings are compatible
|
||||
if dr.peer_to_peer.enabled:
|
||||
if native and not bs.delay_docker_image_preload:
|
||||
raise ValueError(
|
||||
'cannot enable peer-to-peer and native container pools')
|
||||
if settings.is_pool_autoscale_enabled(config, pas=pool.autoscale):
|
||||
raise ValueError('cannot enable peer-to-peer and autoscale')
|
||||
if pool.inter_node_communication_enabled:
|
||||
logger.warning(
|
||||
'force enabling inter-node communication due to peer-to-peer '
|
||||
'transfer')
|
||||
settings.set_inter_node_communication_enabled(config, True)
|
||||
# hpn-ssh can only be used for Ubuntu currently
|
||||
try:
|
||||
if (pool.ssh.hpn_server_swap and
|
||||
|
@ -2543,9 +2517,6 @@ def _adjust_settings_for_pool_creation(config):
|
|||
'glusterfs on compute cannot be installed on an '
|
||||
'autoscale-enabled pool')
|
||||
if not pool.inter_node_communication_enabled:
|
||||
# do not modify value and proceed since this interplays
|
||||
# with p2p settings, simply raise exception and force
|
||||
# user to reconfigure
|
||||
raise ValueError(
|
||||
'inter node communication in pool configuration '
|
||||
'must be enabled for glusterfs on compute')
|
||||
|
|
|
@ -275,12 +275,7 @@ BatchShipyardSettings = collections.namedtuple(
|
|||
)
|
||||
DataReplicationSettings = collections.namedtuple(
|
||||
'DataReplicationSettings', [
|
||||
'peer_to_peer', 'concurrent_source_downloads',
|
||||
]
|
||||
)
|
||||
PeerToPeerSettings = collections.namedtuple(
|
||||
'PeerToPeerSettings', [
|
||||
'enabled', 'compression', 'direct_download_seed_bias',
|
||||
'concurrent_source_downloads',
|
||||
]
|
||||
)
|
||||
SourceSettings = collections.namedtuple(
|
||||
|
@ -2304,52 +2299,11 @@ def data_replication_settings(config):
|
|||
raise KeyError()
|
||||
except KeyError:
|
||||
concurrent_source_downloads = 10
|
||||
try:
|
||||
conf = config['data_replication']['peer_to_peer']
|
||||
except KeyError:
|
||||
conf = {}
|
||||
try:
|
||||
p2p_enabled = conf['enabled']
|
||||
except KeyError:
|
||||
p2p_enabled = False
|
||||
try:
|
||||
p2p_compression = conf['compression']
|
||||
except KeyError:
|
||||
p2p_compression = True
|
||||
pool_vm_count = _pool_vm_count(config)
|
||||
total_vm_count = pool_vm_count.dedicated + pool_vm_count.low_priority
|
||||
try:
|
||||
p2p_direct_download_seed_bias = conf['direct_download_seed_bias']
|
||||
if (p2p_direct_download_seed_bias is None or
|
||||
p2p_direct_download_seed_bias < 1):
|
||||
raise KeyError()
|
||||
except KeyError:
|
||||
p2p_direct_download_seed_bias = total_vm_count // 10
|
||||
if p2p_direct_download_seed_bias < 1:
|
||||
p2p_direct_download_seed_bias = 1
|
||||
return DataReplicationSettings(
|
||||
peer_to_peer=PeerToPeerSettings(
|
||||
enabled=p2p_enabled,
|
||||
compression=p2p_compression,
|
||||
direct_download_seed_bias=p2p_direct_download_seed_bias
|
||||
),
|
||||
concurrent_source_downloads=concurrent_source_downloads,
|
||||
)
|
||||
|
||||
|
||||
def set_peer_to_peer_enabled(config, flag):
    # type: (dict, bool) -> None
    """Set peer to peer enabled setting
    :param dict config: configuration object, modified in place
    :param bool flag: flag to set
    """
    # a section that is present but null (e.g. an empty mapping in the
    # config file) is treated the same as a missing section so that the
    # nested assignment below cannot fail on None
    data_replication = config.get('data_replication')
    if data_replication is None:
        data_replication = config['data_replication'] = {}
    peer_to_peer = data_replication.get('peer_to_peer')
    if peer_to_peer is None:
        peer_to_peer = data_replication['peer_to_peer'] = {}
    # sibling keys (compression, seed bias, ...) are left untouched
    peer_to_peer['enabled'] = flag
|
||||
|
||||
|
||||
def global_resources_docker_images(config):
|
||||
# type: (dict) -> list
|
||||
"""Get list of docker images
|
||||
|
|
|
@ -68,13 +68,11 @@ _STORAGEACCOUNTEP = None
|
|||
_STORAGE_CONTAINERS = {
|
||||
'blob_globalresources': None,
|
||||
'blob_resourcefiles': None,
|
||||
'blob_torrents': None,
|
||||
'blob_remotefs': None,
|
||||
'blob_monitoring': None,
|
||||
'blob_federation_global': None,
|
||||
'blob_federation': None,
|
||||
'table_dht': None,
|
||||
'table_torrentinfo': None,
|
||||
'table_images': None,
|
||||
'table_globalresources': None,
|
||||
'table_perf': None,
|
||||
|
@ -85,6 +83,8 @@ _STORAGE_CONTAINERS = {
|
|||
'queue_federation': None,
|
||||
# TODO remove following in future release
|
||||
'table_registry': None,
|
||||
'blob_torrents': None,
|
||||
'table_torrentinfo': None,
|
||||
}
|
||||
_CONTAINERS_CREATED = set()
|
||||
|
||||
|
@ -107,14 +107,11 @@ def set_storage_configuration(sep, postfix, sa, sakey, saep, sasexpiry):
|
|||
(sep + 'gr', postfix))
|
||||
_STORAGE_CONTAINERS['blob_resourcefiles'] = '-'.join(
|
||||
(sep + 'rf', postfix))
|
||||
_STORAGE_CONTAINERS['blob_torrents'] = '-'.join(
|
||||
(sep + 'tor', postfix))
|
||||
_STORAGE_CONTAINERS['blob_remotefs'] = sep + 'remotefs'
|
||||
_STORAGE_CONTAINERS['blob_monitoring'] = sep + 'monitor'
|
||||
_STORAGE_CONTAINERS['blob_federation'] = sep + 'fed'
|
||||
_STORAGE_CONTAINERS['blob_federation_global'] = sep + 'fedglobal'
|
||||
_STORAGE_CONTAINERS['table_dht'] = sep + 'dht'
|
||||
_STORAGE_CONTAINERS['table_torrentinfo'] = sep + 'torrentinfo'
|
||||
_STORAGE_CONTAINERS['table_images'] = sep + 'images'
|
||||
_STORAGE_CONTAINERS['table_globalresources'] = sep + 'gr'
|
||||
_STORAGE_CONTAINERS['table_perf'] = sep + 'perf'
|
||||
|
@ -125,6 +122,9 @@ def set_storage_configuration(sep, postfix, sa, sakey, saep, sasexpiry):
|
|||
_STORAGE_CONTAINERS['queue_federation'] = sep + 'fed'
|
||||
# TODO remove following containers in future release
|
||||
_STORAGE_CONTAINERS['table_registry'] = sep + 'registry'
|
||||
_STORAGE_CONTAINERS['blob_torrents'] = '-'.join(
|
||||
(sep + 'tor', postfix))
|
||||
_STORAGE_CONTAINERS['table_torrentinfo'] = sep + 'torrentinfo'
|
||||
# ensure all storage containers are between 3 and 63 chars in length
|
||||
for key in _STORAGE_CONTAINERS:
|
||||
length = len(_STORAGE_CONTAINERS[key])
|
||||
|
@ -1972,10 +1972,15 @@ def delete_storage_containers(
|
|||
:param bool skip_tables: skip deleting tables
|
||||
"""
|
||||
for key in _STORAGE_CONTAINERS:
|
||||
if key == 'table_registry':
|
||||
if key == 'table_registry' or key == 'table_torrentinfo':
|
||||
# TODO remove in future release: unused table
|
||||
logger.debug('deleting table: {}'.format(_STORAGE_CONTAINERS[key]))
|
||||
table_client.delete_table(_STORAGE_CONTAINERS[key])
|
||||
elif key == 'blob_torrents':
|
||||
# TODO remove in future release: unused container
|
||||
logger.debug('deleting container: {}'
|
||||
.format(_STORAGE_CONTAINERS[key]))
|
||||
blob_client.delete_container(_STORAGE_CONTAINERS[key])
|
||||
elif key.startswith('blob_'):
|
||||
if (key == 'blob_remotefs' or key == 'blob_monitoring' or
|
||||
key == 'blob_federation' or
|
||||
|
@ -2065,6 +2070,9 @@ def clear_storage_containers(
|
|||
"""
|
||||
bs = settings.batch_shipyard_settings(config)
|
||||
for key in _STORAGE_CONTAINERS:
|
||||
# TODO remove in a future release: unused container
|
||||
if key == 'blob_torrents':
|
||||
continue
|
||||
if not tables_only and key.startswith('blob_'):
|
||||
if (key == 'blob_remotefs' or key == 'blob_monitoring' or
|
||||
key == 'blob_federation' or
|
||||
|
@ -2072,8 +2080,8 @@ def clear_storage_containers(
|
|||
continue
|
||||
_clear_blobs(blob_client, _STORAGE_CONTAINERS[key])
|
||||
elif key.startswith('table_'):
|
||||
# TODO remove in a future release: unused registry table
|
||||
if key == 'table_registry':
|
||||
# TODO remove in a future release: unused table
|
||||
if key == 'table_registry' or key == 'table_torrentinfo':
|
||||
continue
|
||||
if (key == 'table_monitoring' or
|
||||
key == 'table_federation_global' or
|
||||
|
@ -2119,6 +2127,9 @@ def create_storage_containers(blob_client, table_client, config):
|
|||
bs = settings.batch_shipyard_settings(config)
|
||||
for key in _STORAGE_CONTAINERS:
|
||||
if key.startswith('blob_'):
|
||||
# TODO remove in a future release: unused container
|
||||
if key == 'blob_torrents':
|
||||
continue
|
||||
if (key == 'blob_remotefs' or key == 'blob_monitoring' or
|
||||
key == 'blob_federation' or
|
||||
key == 'blob_federation_global'):
|
||||
|
@ -2131,8 +2142,8 @@ def create_storage_containers(blob_client, table_client, config):
|
|||
break
|
||||
time.sleep(1)
|
||||
elif key.startswith('table_'):
|
||||
# TODO remove in a future release: unused registry table
|
||||
if key == 'table_registry':
|
||||
# TODO remove in a future release: unused table
|
||||
if key == 'table_registry' or key == 'table_torrentinfo':
|
||||
continue
|
||||
if (key == 'table_monitoring' or
|
||||
key == 'table_federation_global' or
|
||||
|
|
|
@ -24,10 +24,6 @@ batch_shipyard:
|
|||
delay_docker_image_preload: false
|
||||
data_replication:
|
||||
concurrent_source_downloads: null
|
||||
peer_to_peer:
|
||||
enabled: false
|
||||
compression: true
|
||||
direct_download_seed_bias: null
|
||||
global_resources:
|
||||
additional_registries:
|
||||
docker:
|
||||
|
@ -207,20 +203,7 @@ control of the download and data replication behavior for container images.
|
|||
image replication mechanism between compute nodes within a compute pool. The
|
||||
`concurrent_source_downloads` property specifies the number of nodes that
|
||||
can download the source images concurrently. The default, if
|
||||
not specified, is 10. The following options apply to `peer_to_peer` data
|
||||
replication options:
|
||||
* (optional) `enabled` property enables or disables private peer-to-peer
|
||||
transfer. Note that for compute pools with a relatively small number
|
||||
of VMs, peer-to-peer transfer may not provide any benefit and is
|
||||
recommended to be disabled in these cases. Compute pools with large
|
||||
number of VMs can benefit from peer-to-peer image replication. The
|
||||
default is `false`.
|
||||
* (optional) `compression` property enables or disables compression of
|
||||
image files. It is strongly recommended to keep this enabled. The
|
||||
default is `true`.
|
||||
* (optional) `direct_download_seed_bias` property sets the number of
|
||||
direct download seeds to prefer per image before switching to
|
||||
peer-to-peer transfer.
|
||||
not specified, is 10.
|
||||
|
||||
`global_resources` contains properties for populating each compute node
|
||||
with required container images and for data movement directives.
|
||||
|
|
|
@ -52,15 +52,6 @@ mapping:
|
|||
mapping:
|
||||
concurrent_source_downloads:
|
||||
type: int
|
||||
peer_to_peer:
|
||||
type: map
|
||||
mapping:
|
||||
enabled:
|
||||
type: bool
|
||||
compression:
|
||||
type: bool
|
||||
direct_download_seed_bias:
|
||||
type: int
|
||||
|
||||
global_resources:
|
||||
type: map
|
||||
|
|
|
@ -107,8 +107,7 @@ kata=0
|
|||
lis=
|
||||
networkopt=0
|
||||
native_mode=0
|
||||
p2p=
|
||||
p2penabled=0
|
||||
concurrent_source_downloads=
|
||||
prefix=
|
||||
sc_args=
|
||||
shipyardversion=
|
||||
|
@ -140,7 +139,7 @@ while getopts "h?abcde:fg:i:jkl:m:no:p:qrs:tuv:wx:yz:" opt; do
|
|||
echo "-p [prefix] storage container prefix"
|
||||
echo "-q enable batch insights"
|
||||
echo "-r enable azure batch docker group"
|
||||
echo "-s [enabled:non-p2p concurrent download:seed bias:compression] p2p sharing"
|
||||
echo "-s [concurrent source downloads] concurrent source downloads"
|
||||
echo "-t optimize network TCP settings"
|
||||
echo "-u custom image"
|
||||
echo "-v [version] batch-shipyard version"
|
||||
|
@ -203,13 +202,7 @@ while getopts "h?abcde:fg:i:jkl:m:no:p:qrs:tuv:wx:yz:" opt; do
|
|||
docker_group="\"group\": \"_azbatchsudogrp\","
|
||||
;;
|
||||
s)
|
||||
p2p=${OPTARG,,}
|
||||
IFS=':' read -ra p2pflags <<< "$p2p"
|
||||
if [ "${p2pflags[0]}" == "true" ]; then
|
||||
p2penabled=1
|
||||
else
|
||||
p2penabled=0
|
||||
fi
|
||||
concurrent_source_downloads=$OPTARG
|
||||
;;
|
||||
t)
|
||||
networkopt=1
|
||||
|
@ -1355,10 +1348,6 @@ install_cascade_dependencies() {
|
|||
rm -f get-pip.py
|
||||
pip3 install --no-cache-dir --upgrade wheel setuptools
|
||||
pip3 install --no-cache-dir -r requirements.txt
|
||||
# install cascade dependencies
|
||||
if [ $p2penabled -eq 1 ]; then
|
||||
install_packages python3-libtorrent pigz
|
||||
fi
|
||||
log INFO "Cascade on host dependencies installed"
|
||||
}
|
||||
|
||||
|
@ -1422,12 +1411,6 @@ spawn_cascade_process() {
|
|||
local cascadepid
|
||||
local envfile
|
||||
if [ $cascadecontainer -eq 1 ]; then
|
||||
local detached
|
||||
if [ $p2penabled -eq 1 ]; then
|
||||
detached="-d"
|
||||
else
|
||||
detached="--rm"
|
||||
fi
|
||||
# store docker cascade start
|
||||
if command -v python3 > /dev/null 2>&1; then
|
||||
drpstart=$(python3 -c 'import datetime;print(datetime.datetime.utcnow().timestamp())')
|
||||
|
@ -1444,7 +1427,7 @@ offer=$DISTRIB_ID
|
|||
sku=$DISTRIB_RELEASE
|
||||
npstart=$npstart
|
||||
drpstart=$drpstart
|
||||
p2p=$p2p
|
||||
concurrent_source_downloads=$concurrent_source_downloads
|
||||
$(env | grep SHIPYARD_)
|
||||
$(env | grep AZ_BATCH_)
|
||||
$(env | grep DOCKER_LOGIN_)
|
||||
|
@ -1463,7 +1446,7 @@ EOF
|
|||
# launch container
|
||||
log DEBUG "Starting Cascade"
|
||||
# shellcheck disable=SC2086
|
||||
docker run $detached --runtime runc --net=host --env-file $envfile \
|
||||
docker run --rm --runtime runc --net=host --env-file $envfile \
|
||||
-v /var/run/docker.sock:/var/run/docker.sock \
|
||||
-v /etc/passwd:/etc/passwd:ro \
|
||||
-v /etc/group:/etc/group:ro \
|
||||
|
@ -1488,22 +1471,19 @@ EOF
|
|||
fi
|
||||
log DEBUG "Starting Cascade"
|
||||
# shellcheck disable=SC2086
|
||||
PYTHONASYNCIODEBUG=1 ./cascade.py "$p2p" --ipaddress "$ipaddress" $prefix &
|
||||
PYTHONASYNCIODEBUG=1 ./cascade.py --concurrent "$concurrent_source_downloads" --mode "docker" --ipaddress "$ipaddress" $prefix &
|
||||
cascadepid=$!
|
||||
fi
|
||||
|
||||
# if not in p2p mode, then wait for cascade exit
|
||||
if [ $p2penabled -eq 0 ]; then
|
||||
local rc
|
||||
wait $cascadepid
|
||||
rc=$?
|
||||
if [ $rc -eq 0 ]; then
|
||||
log DEBUG "Cascade exited successfully"
|
||||
else
|
||||
log ERROR "cascade exited with non-zero exit code: $rc"
|
||||
rm -f "$nodeprepfinished"
|
||||
exit $rc
|
||||
fi
|
||||
local rc
|
||||
wait $cascadepid
|
||||
rc=$?
|
||||
if [ $rc -eq 0 ]; then
|
||||
log DEBUG "Cascade exited successfully"
|
||||
else
|
||||
log ERROR "cascade exited with non-zero exit code: $rc"
|
||||
rm -f "$nodeprepfinished"
|
||||
exit $rc
|
||||
fi
|
||||
set -e
|
||||
|
||||
|
@ -1633,7 +1613,7 @@ echo "Enable Azure Batch group for Docker access: $docker_group"
|
|||
echo "Fallback registry: $fallback_registry"
|
||||
echo "Docker image preload delay: $delay_preload"
|
||||
echo "Cascade via container: $cascadecontainer"
|
||||
echo "P2P: $p2penabled"
|
||||
echo "Concurrent source downloads: $concurrent_source_downloads"
|
||||
echo "Block on images: $block"
|
||||
echo ""
|
||||
|
||||
|
@ -1656,13 +1636,6 @@ save_startup_to_volatile
|
|||
# install LIS if required first (lspci won't work on certain distros without it)
|
||||
install_lis
|
||||
|
||||
# set iptables rules
|
||||
if [ $p2penabled -eq 1 ]; then
|
||||
# disable DHT connection tracking
|
||||
iptables -t raw -I PREROUTING -p udp --dport 6881 -j CT --notrack
|
||||
iptables -t raw -I OUTPUT -p udp --sport 6881 -j CT --notrack
|
||||
fi
|
||||
|
||||
# decrypt encrypted creds
|
||||
if [ -n "$encrypted" ]; then
|
||||
decrypt_encrypted_credentials
|
||||
|
|
Загрузка…
Ссылка в новой задаче