diff --git a/cascade.py b/cascade.py index 65d95e8..5974318 100755 --- a/cascade.py +++ b/cascade.py @@ -9,7 +9,10 @@ import hashlib import os import pathlib import random +import subprocess import sys +import threading +import time # non-stdlib imports import azure.common import azure.storage.blob as azureblob @@ -35,9 +38,17 @@ _NODEID = os.environ['AZ_BATCH_NODE_ID'] _SHARED_DIR = os.environ['AZ_BATCH_NODE_SHARED_DIR'] _TORRENT_DIR = pathlib.Path(_SHARED_DIR, '.torrents') _PARTITION_KEY = '{}${}'.format(_BATCHACCOUNT, _POOLID) +_LR_LOCK_ASYNC = asyncio.Lock() +_PT_LOCK = threading.Lock() +_DIRECTDL_LOCK = threading.Lock() +_SELF_REGISTRY_PTR = None +_ENABLE_P2P = True +_NON_P2P_CONCURRENT_DOWNLOADING = True +_REGISTRIES = {} # mutable global state _CBHANDLES = {} _QUEUE_MESSAGES = {} +_DHT_ROUTERS = [] _PREFIX = None _STORAGE_CONTAINERS = { 'table_dht': None, @@ -47,16 +58,12 @@ _STORAGE_CONTAINERS = { 'table_globalresources': None, 'queue_globalresources': None, } -_SELF_REGISTRY_PTR = None -_REGISTRIES = {} _TORRENTS = {} +_PENDING_TORRENTS = {} _TORRENT_REVERSE_LOOKUP = {} -_DIRECTDL = {} -_DHT_ROUTERS = [] -_LR_LOCK_ASYNC = asyncio.Lock() +_DIRECTDL = [] +_DIRECTDL_DOWNLOADING = [] _GR_DONE = False -_ENABLE_P2P = True -_NON_P2P_CONCURRENT_DOWNLOADING = True def _setup_container_names(sep: str): @@ -97,9 +104,10 @@ def _create_credentials() -> tuple: return blob_client, queue_client, table_client -def generate_torrent(incl_file: pathlib.Path) -> dict: +def generate_torrent(incl_file: pathlib.Path, resource_hash: str) -> dict: """Generate torrent file for a given file and write it to disk :param pathlib.Path incl_file: file to include in torrent + :param str resource_hash: resource hash :rtype: tuple :return: (torrent file as pathlib, torrent file encoded as base64, torrent file data sha1 hash) @@ -113,7 +121,7 @@ def generate_torrent(incl_file: pathlib.Path) -> dict: torrent_data = libtorrent.bencode(torrent) torrent_b64 = base64.b64encode(torrent_data).decode('ascii') torrent_sha1 = hashlib.sha1(torrent_data).hexdigest() - fp = _TORRENT_DIR / '{}.torrent'.format(torrent_sha1) + fp = _TORRENT_DIR / '{}.torrent'.format(resource_hash) with fp.open('wb') as f: f.write(torrent_data) return fp, torrent_b64, torrent_sha1 @@ -160,8 +168,6 @@ def _renew_queue_message_lease( :param str queue_key: queue name key index into _STORAGE_CONTAINERS :param str msg_id: message id """ - print('updating queue message id={} pr={}'.format( - msg_id, _QUEUE_MESSAGES[msg_id].pop_receipt)) msg = queue_client.update_message( _STORAGE_CONTAINERS[queue_key], message_id=msg_id, @@ -172,8 +178,6 @@ def _renew_queue_message_lease( 'update message failed for id={} pr={}'.format( msg_id, _QUEUE_MESSAGES[msg_id].pop_receipt)) _QUEUE_MESSAGES[msg_id].pop_receipt = msg.pop_receipt - print('queue message updated id={} pr={}'.format( - msg_id, _QUEUE_MESSAGES[msg_id].pop_receipt)) _CBHANDLES[queue_key] = loop.call_later( 15, _renew_queue_message_lease, loop, queue_client, queue_key, msg_id) @@ -210,33 +214,168 @@ async def _record_perf_async(loop, event, message): print('could not record perf to storage for event: {}'.format(event)) +def _record_perf(event, message): + subprocess.check_call( + 'perf.py cascade {ev} --prefix {pr} --message "{msg}"'.format( + ev=event, pr=_PREFIX, msg=message), shell=True) + + +class DockerSaveThread(threading.Thread): + def __init__(self, queue_client, resource, msg_id): + threading.Thread.__init__(self) + self.queue_client = queue_client + self.resource = resource + self.msg_id = msg_id + with _DIRECTDL_LOCK: + _DIRECTDL_DOWNLOADING.append(self.resource) + + def run(self): + file = None + resource_hash = hashlib.sha1(self.resource.encode('utf8')).hexdigest() + if self.resource.startswith(_DOCKER_TAG): + if len(_REGISTRIES) < 1: + raise RuntimeError( + ('{} image specified for global resource, but there are ' + 'no registries available').format(self.resource)) + image = self.resource[ + self.resource.find(_DOCKER_TAG) + len(_DOCKER_TAG):] + registry = None + _record_perf('pull-start', 'img={}'.format(image)) + start = datetime.datetime.now() + while True: + # pick random registry to download from + registry = _REGISTRIES[_pick_random_registry_key()] + print('pulling image {} from {}'.format(image, registry)) + if registry == 'registry.hub.docker.com': + proc = subprocess.Popen( + 'docker pull {}'.format(image), shell=True) + else: + proc = subprocess.Popen( + 'docker pull {}/{}'.format(registry, image), + shell=True) + proc.wait() + if proc.returncode == 0: + break + else: + print('docker pull non-zero rc: {}'.format( + proc.returncode)) + time.sleep(1) + # tag image to remove registry ip + if registry != 'registry.hub.docker.com': + subprocess.check_call( + 'docker tag {}/{} {}'.format(registry, image, image), + shell=True) + diff = (datetime.datetime.now() - start).total_seconds() + print('took {} sec to pull docker image {} from {}'.format( + diff, image, registry)) + _record_perf('pull-end', 'img={},diff={}'.format(image, diff)) + # save docker image to seed to torrent + if _ENABLE_P2P: + _record_perf('save-start', 'img={}'.format(image)) + start = datetime.datetime.now() + file = _TORRENT_DIR / '{}.tar.gz'.format(resource_hash) + print('saving docker image {} to {} for seeding'.format( + image, file)) + subprocess.check_call( + 'docker save {} | gzip -c > {}'.format(image, file), + shell=True) + print('docker image {} saved for seeding'.format(image)) + diff = (datetime.datetime.now() - start).total_seconds() + print('took {} sec to save docker image {} to {}'.format( + diff, image, file.parent)) + _record_perf('save-end', 'img={},size={},diff={}'.format( + image, file.stat().st_size, diff)) + else: + # TODO download via blob, explode uri to get container/blob + # use download to path into /tmp and move to _TORRENT_DIR + raise NotImplementedError() + # generate torrent file + if _ENABLE_P2P: + start = datetime.datetime.now() + torrent_file, torrent_b64, torrent_sha1 = generate_torrent( + file, resource_hash) + diff = (datetime.datetime.now() - start).total_seconds() + print('took {} sec to generate torrent file: {}'.format( + diff, torrent_file)) + start = datetime.datetime.now() + # add to torrent dict (effectively enqueues for torrent start) + entity = { + 'PartitionKey': _PARTITION_KEY, + 'RowKey': resource_hash, + 'Resource': self.resource, + 'TorrentFileBase64': torrent_b64, + 'TorrentFileSHA1': torrent_sha1, + 'FileSizeBytes': file.stat().st_size, + # 'FileSHA1': compute_sha1_for_file(file), + } + with _PT_LOCK: + _PENDING_TORRENTS[self.resource] = { + 'entity': entity, + 'torrent_file': torrent_file, + 'started': False, + 'seed': True, + 'loaded': True, + 'loading': False, + 'registered': False, + } + _TORRENT_REVERSE_LOOKUP[resource_hash] = self.resource + # wait until torrent has started + print('waiting for torrent {} to start'.format(self.resource)) + while (self.resource not in _TORRENTS or + not _TORRENTS[self.resource]['started']): + time.sleep(0.1) + diff = (datetime.datetime.now() - start).total_seconds() + print('took {} sec for {} torrent to start'.format( + diff, self.resource)) + # cancel callback + if _ENABLE_P2P or not _NON_P2P_CONCURRENT_DOWNLOADING: + _CBHANDLES['queue_globalresources'].cancel() + _CBHANDLES.pop('queue_globalresources') + # release queue message + self.queue_client.update_message( + _STORAGE_CONTAINERS['queue_globalresources'], + message_id=self.msg_id, + pop_receipt=_QUEUE_MESSAGES[self.msg_id].pop_receipt, + visibility_timeout=0) + _QUEUE_MESSAGES.pop(self.msg_id) + print('queue message released for {}'.format(self.resource)) + # remove from downloading list + with _DIRECTDL_LOCK: + _DIRECTDL_DOWNLOADING.remove(self.resource) + _DIRECTDL.remove(self.resource) + + async def _direct_download_resources_async( loop, blob_client, queue_client, table_client, ipaddress): # iterate through downloads to see if there are any torrents available - # TODO allow multiple downloads - rmdl = [] - for dl in _DIRECTDL: - if _check_resource_has_torrent(loop, table_client, dl, False): - rmdl.append(dl) - if len(rmdl) > 0: - for dl in rmdl: - _DIRECTDL.pop(dl, None) - if len(_DIRECTDL) == 0: - return + with _DIRECTDL_LOCK: + if len(_DIRECTDL) == 0: + return # go through queue and find resources we can download msg = None + rmdl = [] _release_list = [] while True: msgs = queue_client.get_messages( - _STORAGE_CONTAINERS['queue_globalresources'], num_messages=32, + _STORAGE_CONTAINERS['queue_globalresources'], num_messages=1, visibility_timeout=45) if len(msgs) == 0: break - for _msg in msgs: - if _msg.content in _DIRECTDL and msg is None: - msg = _msg - else: - _release_list.append(_msg) + with _DIRECTDL_LOCK: + for _msg in msgs: + if (msg is None and _msg.content in _DIRECTDL and + _msg.content not in _DIRECTDL_DOWNLOADING): + # TODO modify this to work with concurrent source downloads + # check number of seeds + nseeds = _get_torrent_num_seeds(table_client, _msg.content) + # TODO determine a good number of seeds to cut off directdl + if nseeds < 3: + msg = _msg + else: + rmdl.append(_msg.content) + _release_list.append(_msg) + else: + _release_list.append(_msg) if msg is not None: break # renew lease and create renew callback @@ -256,126 +395,19 @@ async def _direct_download_resources_async( pop_receipt=_msg.pop_receipt, visibility_timeout=0) del _release_list + # remove messages out of rmdl + if len(rmdl) > 0: + with _DIRECTDL_LOCK: + for dl in rmdl: + try: + _DIRECTDL.remove(dl) + except ValueError: + pass if msg is None: return - file = None - # download data - resource = msg.content - resource_hash = hashlib.sha1(resource.encode('utf8')).hexdigest() - if resource.startswith(_DOCKER_TAG): - if len(_REGISTRIES) < 1: - raise RuntimeError( - ('{} image specified for global resource, but there are ' - 'no registries available').format(resource)) - image = resource[resource.find(_DOCKER_TAG) + len(_DOCKER_TAG):] - registry = None - await _record_perf_async(loop, 'pull-start', 'img={}'.format(image)) - start = datetime.datetime.now() - while True: - # pick random registry to download from - registry = _REGISTRIES[_pick_random_registry_key()] - print('pulling image {} from {}'.format(image, registry)) - if registry == 'registry.hub.docker.com': - proc = await asyncio.subprocess.create_subprocess_shell( - 'docker pull {}'.format(image), loop=loop) - else: - proc = await asyncio.subprocess.create_subprocess_shell( - 'docker pull {}/{}'.format(registry, image), loop=loop) - await proc.wait() - if proc.returncode == 0: - break - else: - print('docker pull non-zero rc: {}'.format( - proc.returncode)) - await asyncio.sleep(1) - # tag image to remove registry ip - if registry != 'registry.hub.docker.com': - proc = await asyncio.subprocess.create_subprocess_shell( - 'docker tag {}/{} {}'.format(registry, image, image), - loop=loop) - await proc.wait() - if proc.returncode != 0: - raise RuntimeError('docker tag non-zero rc: {}'.format( - proc.returncode)) - diff = (datetime.datetime.now() - start).total_seconds() - print('took {} sec to pull docker image {} from {}'.format( - diff, image, registry)) - await _record_perf_async(loop, 'pull-end', 'img={},diff={}'.format( - image, diff)) - # save docker image to seed to torrent - if _ENABLE_P2P: - await _record_perf_async(loop, 'save-start', 'img={}'.format( - image)) - start = datetime.datetime.now() - file = _TORRENT_DIR / '{}.tar.gz'.format(resource_hash) - print('saving docker image {} to {} for seeding'.format( - image, file)) - proc = await asyncio.subprocess.create_subprocess_shell( - 'docker save {} | gzip -c > {}'.format(image, file), loop=loop) - await proc.wait() - if proc.returncode != 0: - raise RuntimeError('docker save non-zero rc: {}'.format( - proc.returncode)) - else: - print('docker image {} saved for seeding'.format(image)) - diff = (datetime.datetime.now() - start).total_seconds() - print('took {} sec to save docker image {} to {}'.format( - diff, image, file.parent)) - await _record_perf_async( - loop, 'save-end', 'img={},size={},diff={}'.format( - image, file.stat().st_size, diff)) - else: - # TODO download via blob, explode uri to get container/blob - # use download to path into /tmp and move to _TORRENT_DIR - raise NotImplementedError() - # generate torrent file - if _ENABLE_P2P: - start = datetime.datetime.now() - future = loop.run_in_executor(None, generate_torrent, file) - torrent_file, torrent_b64, torrent_sha1 = await future - diff = (datetime.datetime.now() - start).total_seconds() - print('took {} sec to generate torrent file: {}'.format( - diff, torrent_file)) - start = datetime.datetime.now() - # add to torrent dict (effectively enqueues for torrent start) - entity = { - 'PartitionKey': _PARTITION_KEY, - 'RowKey': resource_hash, - 'Resource': resource, - 'TorrentFileBase64': torrent_b64, - 'TorrentFileSHA1': torrent_sha1, - 'FileSizeBytes': file.stat().st_size, - # 'FileSHA1': compute_sha1_for_file(file), - } - _TORRENTS[resource] = { - 'entity': entity, - 'torrent_file': torrent_file, - 'started': False, - 'seed': True, - 'loaded': True, - 'registered': False, - } - _TORRENT_REVERSE_LOOKUP[resource_hash] = resource - # wait until torrent has started - print('waiting for torrent {} to start'.format(resource)) - while not _TORRENTS[resource]['started']: - await asyncio.sleep(0.1) - diff = (datetime.datetime.now() - start).total_seconds() - print('took {} sec for {} torrent to start'.format(diff, resource)) - # cancel callback - if _ENABLE_P2P or not _NON_P2P_CONCURRENT_DOWNLOADING: - _CBHANDLES['queue_globalresources'].cancel() - _CBHANDLES.pop('queue_globalresources') - # release queue message - queue_client.update_message( - _STORAGE_CONTAINERS['queue_globalresources'], - message_id=msg.id, - pop_receipt=_QUEUE_MESSAGES[msg.id].pop_receipt, - visibility_timeout=0) - _QUEUE_MESSAGES.pop(msg.id) - print('queue message released for {}'.format(resource)) - # remove resources from download list - _DIRECTDL.pop(resource) + # pull and save docker image in thread + thr = DockerSaveThread(queue_client, msg.content, msg.id) + thr.start() def _merge_service( @@ -470,6 +502,32 @@ def bootstrap_dht_nodes( loop.call_later(1, bootstrap_dht_nodes, loop, table_client, ipaddress) +class DockerLoadThread(threading.Thread): + def __init__(self, resource): + threading.Thread.__init__(self) + self.resource = resource + _TORRENTS[self.resource]['seed'] = True + _TORRENTS[self.resource]['loading'] = True + + def run(self): + print('loading resource: {}'.format(self.resource)) + resource_hash = hashlib.sha1(self.resource.encode('utf8')).hexdigest() + image = self.resource[ + self.resource.find(_DOCKER_TAG) + len(_DOCKER_TAG):] + file = _TORRENT_DIR / '{}.tar.gz'.format(resource_hash) + _record_perf('load-start', 'img={},size={}'.format( + image, file.stat().st_size)) + start = datetime.datetime.now() + print('loading docker image {} from {}'.format(image, file)) + subprocess.check_call( + 'gunzip -c {} | docker load'.format(file), shell=True) + diff = (datetime.datetime.now() - start).total_seconds() + print('took {} sec to load docker image from {}'.format(diff, file)) + _record_perf('load-end', 'img={},diff={}'.format(image, diff)) + _TORRENTS[self.resource]['loading'] = False + _TORRENTS[self.resource]['loaded'] = True + + async def _load_and_register_async( loop: asyncio.BaseEventLoop, table_client: azure.storage.table.TableService, @@ -482,41 +540,18 @@ async def _load_and_register_async( if _TORRENTS[resource]['started']: if _TORRENTS[resource]['handle'].is_seed(): # docker load image - if not _TORRENTS[resource]['loaded']: - resource_hash = hashlib.sha1( - resource.encode('utf8')).hexdigest() - image = resource[ - resource.find(_DOCKER_TAG) + len(_DOCKER_TAG):] - file = _TORRENT_DIR / '{}.tar.gz'.format(resource_hash) - await _record_perf_async( - loop, 'load-start', 'img={},size={}'.format( - image, file.stat().st_size)) - start = datetime.datetime.now() - print('loading docker image {} from {}'.format( - image, file)) - proc = await \ - asyncio.subprocess.create_subprocess_shell( - 'gunzip -c {} | docker load'.format(file), - loop=loop) - await proc.wait() - if proc.returncode != 0: - raise RuntimeError( - 'docker load non-zero rc: {}'.format( - proc.returncode)) - _TORRENTS[resource]['loaded'] = True - diff = (datetime.datetime.now() - - start).total_seconds() - print(('took {} sec to load docker image ' - 'from {}').format(diff, file)) - await _record_perf_async( - loop, 'load-end', 'img={},diff={}'.format( - image, diff)) + if (not _TORRENTS[resource]['loaded'] and + not _TORRENTS[resource]['loading']): + thr = DockerLoadThread(resource) + thr.start() # register to services table - if not _TORRENTS[resource]['registered']: - _merge_service(table_client, resource) - _TORRENTS[resource]['registered'] = True - else: - nfinished += 1 + if (_TORRENTS[resource]['loaded'] and + not _TORRENTS[resource]['loading']): + if not _TORRENTS[resource]['registered']: + _merge_service(table_client, resource) + _TORRENTS[resource]['registered'] = True + else: + nfinished += 1 if not _GR_DONE and nfinished == nglobalresources: await _record_perf_async( loop, 'gr-done', @@ -535,6 +570,11 @@ async def manage_torrents_async( if not _GR_DONE and not _LR_LOCK_ASYNC.locked(): asyncio.ensure_future(_load_and_register_async( loop, table_client, nglobalresources)) + # move pending torrents into torrents + with _PT_LOCK: + for pt in _PENDING_TORRENTS: + _TORRENTS[pt] = _PENDING_TORRENTS[pt] + _PENDING_TORRENTS.clear() # start applicable torrent sessions for resource in _TORRENTS: if _TORRENTS[resource]['started']: @@ -586,6 +626,20 @@ async def download_monitor_async( await asyncio.sleep(1) +def _get_torrent_num_seeds( + table_client: azure.storage.table.TableService, + resource: str) -> int: + try: + rk = hashlib.sha1(resource.encode('utf8')).hexdigest() + se = table_client.get_entity( + _STORAGE_CONTAINERS['table_services'], + _PARTITION_KEY, rk) + numseeds = len(se['VmList'].split(',')) + except azure.common.AzureMissingResourceHttpError: + numseeds = 0 + return numseeds + + def _check_resource_has_torrent( loop: asyncio.BaseEventLoop, table_client: azure.storage.table.TableService, @@ -600,7 +654,8 @@ def _check_resource_has_torrent( _PARTITION_KEY, rk) except azure.common.AzureMissingResourceHttpError: if add_to_dict: - _DIRECTDL[resource] = None + with _DIRECTDL_LOCK: + _DIRECTDL.append(resource) return False else: # write torrent file to disk @@ -608,15 +663,17 @@ def _check_resource_has_torrent( torrent_file = _TORRENT_DIR / '{}.torrent'.format(entity['RowKey']) with open(str(torrent_file), 'wb') as f: f.write(torrent) - _TORRENTS[resource] = { - 'entity': entity, - 'torrent_file': torrent_file, - 'started': False, - 'seed': False, - 'loaded': False, - 'registered': False, - } - _TORRENT_REVERSE_LOOKUP[entity['RowKey']] = resource + with _PT_LOCK: + _PENDING_TORRENTS[resource] = { + 'entity': entity, + 'torrent_file': torrent_file, + 'started': False, + 'seed': False, + 'loaded': False, + 'loading': False, + 'registered': False, + } + _TORRENT_REVERSE_LOOKUP[entity['RowKey']] = resource print('found torrent for resource {}'.format(resource)) return True diff --git a/graph.py b/graph.py index 1895394..e9e84bf 100755 --- a/graph.py +++ b/graph.py @@ -62,11 +62,16 @@ def _parse_message(msg): return m -def _diff_events(data, nodeid, event, end_event, timing, prefix): +def _diff_events(data, nodeid, event, end_event, timing, prefix, sizes=None): for i in range(0, len(data[nodeid][event])): + # torrent start -> load start may not always exist due to pull + if (event == 'cascade:torrent-start' and + end_event == 'cascade:load-start' and + end_event not in data[nodeid]): + return + # find end event for this img subevent = data[nodeid][event][i] img = subevent['message']['img'] - # find end event for this img found = False for j in range(0, len(data[nodeid][end_event])): pei = data[ @@ -74,15 +79,21 @@ def _diff_events(data, nodeid, event, end_event, timing, prefix): if pei == img: timing[prefix + img] = _compute_delta_t( data, nodeid, event, i, end_event, j) + if sizes is not None and img not in sizes: + if event == 'cascade:load-start': + sizes[img] = data[nodeid][event][j]['message']['size'] + else: + sizes[img] = data[ + nodeid][end_event][j]['message']['size'] found = True break - if not found: + if not found and event != 'cascade:torrent-start': raise RuntimeError( 'could not find corresponding event for {}:{}'.format( - subevent, img)) + event, img)) -def graph_data(table_client): +def coalesce_data(table_client): print('graphing data from {} with pk={}'.format( _TABLE_NAME, _PARTITION_KEY)) entities = table_client.query_entities( @@ -106,8 +117,8 @@ def graph_data(table_client): ev['message'] = None data[nodeid][event].append(ev) del entities + sizes = {} for nodeid in data: - print(nodeid) # calculate dt timings timing = { 'docker_install': _compute_delta_t( @@ -132,19 +143,33 @@ def graph_data(table_client): _diff_events( data, nodeid, event, 'cascade:pull-end', timing, 'pull:') elif event == 'cascade:save-start': - pass - elif event == 'cascade:save-end': - # message will contain size info - pass + _diff_events( + data, nodeid, event, 'cascade:save-end', timing, 'save:', + sizes) elif event == 'cascade:torrent-start': - pass + _diff_events( + data, nodeid, event, 'cascade:load-start', timing, + 'torrent:') elif event == 'cascade:load-start': - # load start also marks torrent-seed - # message will contain size info - pass - elif event == 'cascade:load-end': - pass - print(timing) + _diff_events( + data, nodeid, event, 'cascade:load-end', timing, + 'load:', sizes) + data[nodeid].pop('cascade:pull-start', None) + data[nodeid].pop('cascade:pull-end', None) + data[nodeid].pop('cascade:save-start', None) + data[nodeid].pop('cascade:save-end', None) + data[nodeid].pop('cascade:torrent-start') + data[nodeid].pop('cascade:load-start', None) + data[nodeid].pop('cascade:load-end', None) + data[nodeid]['timing'] = timing + return data, sizes + + +def graph_data(data, sizes): + print(sizes) + for nodeid in data: + print(nodeid) + print(data[nodeid]) def merge_dict(dict1, dict2): @@ -189,7 +214,8 @@ def main(): # create storage credentials table_client = _create_credentials(config) # graph data - graph_data(table_client) + data, sizes = coalesce_data(table_client) + graph_data(data, sizes) def parseargs():