Move blocking pull/save/load to thread code

This commit is contained in:
Fred Park 2016-08-15 15:16:45 -07:00
Родитель d4c44f811c
Коммит d29349f7ec
2 изменённых файлов: 292 добавлений и 209 удалений

Просмотреть файл

@ -9,7 +9,10 @@ import hashlib
import os
import pathlib
import random
import subprocess
import sys
import threading
import time
# non-stdlib imports
import azure.common
import azure.storage.blob as azureblob
@ -35,9 +38,17 @@ _NODEID = os.environ['AZ_BATCH_NODE_ID']
_SHARED_DIR = os.environ['AZ_BATCH_NODE_SHARED_DIR']
_TORRENT_DIR = pathlib.Path(_SHARED_DIR, '.torrents')
_PARTITION_KEY = '{}${}'.format(_BATCHACCOUNT, _POOLID)
_LR_LOCK_ASYNC = asyncio.Lock()
_PT_LOCK = threading.Lock()
_DIRECTDL_LOCK = threading.Lock()
_SELF_REGISTRY_PTR = None
_ENABLE_P2P = True
_NON_P2P_CONCURRENT_DOWNLOADING = True
_REGISTRIES = {}
# mutable global state
_CBHANDLES = {}
_QUEUE_MESSAGES = {}
_DHT_ROUTERS = []
_PREFIX = None
_STORAGE_CONTAINERS = {
'table_dht': None,
@ -47,16 +58,12 @@ _STORAGE_CONTAINERS = {
'table_globalresources': None,
'queue_globalresources': None,
}
_SELF_REGISTRY_PTR = None
_REGISTRIES = {}
_TORRENTS = {}
_PENDING_TORRENTS = {}
_TORRENT_REVERSE_LOOKUP = {}
_DIRECTDL = {}
_DHT_ROUTERS = []
_LR_LOCK_ASYNC = asyncio.Lock()
_DIRECTDL = []
_DIRECTDL_DOWNLOADING = []
_GR_DONE = False
_ENABLE_P2P = True
_NON_P2P_CONCURRENT_DOWNLOADING = True
def _setup_container_names(sep: str):
@ -97,9 +104,10 @@ def _create_credentials() -> tuple:
return blob_client, queue_client, table_client
def generate_torrent(incl_file: pathlib.Path) -> dict:
def generate_torrent(incl_file: pathlib.Path, resource_hash: str) -> dict:
"""Generate torrent file for a given file and write it to disk
:param pathlib.Path incl_file: file to include in torrent
:param str resource_hash: resource hash
:rtype: tuple
:return: (torrent file as pathlib, torrent file encoded as base64,
torrent file data sha1 hash)
@ -113,7 +121,7 @@ def generate_torrent(incl_file: pathlib.Path) -> dict:
torrent_data = libtorrent.bencode(torrent)
torrent_b64 = base64.b64encode(torrent_data).decode('ascii')
torrent_sha1 = hashlib.sha1(torrent_data).hexdigest()
fp = _TORRENT_DIR / '{}.torrent'.format(torrent_sha1)
fp = _TORRENT_DIR / '{}.torrent'.format(resource_hash)
with fp.open('wb') as f:
f.write(torrent_data)
return fp, torrent_b64, torrent_sha1
@ -160,8 +168,6 @@ def _renew_queue_message_lease(
:param str queue_key: queue name key index into _STORAGE_CONTAINERS
:param str msg_id: message id
"""
print('updating queue message id={} pr={}'.format(
msg_id, _QUEUE_MESSAGES[msg_id].pop_receipt))
msg = queue_client.update_message(
_STORAGE_CONTAINERS[queue_key],
message_id=msg_id,
@ -172,8 +178,6 @@ def _renew_queue_message_lease(
'update message failed for id={} pr={}'.format(
msg_id, _QUEUE_MESSAGES[msg_id].pop_receipt))
_QUEUE_MESSAGES[msg_id].pop_receipt = msg.pop_receipt
print('queue message updated id={} pr={}'.format(
msg_id, _QUEUE_MESSAGES[msg_id].pop_receipt))
_CBHANDLES[queue_key] = loop.call_later(
15, _renew_queue_message_lease, loop, queue_client, queue_key, msg_id)
@ -210,33 +214,168 @@ async def _record_perf_async(loop, event, message):
print('could not record perf to storage for event: {}'.format(event))
def _record_perf(event, message):
    """Record a performance event synchronously via the perf.py helper.

    Blocking counterpart of _record_perf_async for use inside worker
    threads, where no event loop is available.

    :param str event: perf event name
    :param str message: message payload to attach to the event
    """
    cmd = 'perf.py cascade {ev} --prefix {pr} --message "{msg}"'.format(
        ev=event, pr=_PREFIX, msg=message)
    subprocess.check_call(cmd, shell=True)
class DockerSaveThread(threading.Thread):
    """Thread that pulls a docker image and, when P2P is enabled, saves it
    to disk and enqueues a torrent for seeding.

    Runs the formerly-async blocking pull/save work off the event loop.
    Mutates module-level state (_DIRECTDL_DOWNLOADING, _PENDING_TORRENTS,
    _TORRENT_REVERSE_LOOKUP, _CBHANDLES, _QUEUE_MESSAGES, _DIRECTDL) under
    the appropriate locks where one exists.
    """

    def __init__(self, queue_client, resource, msg_id):
        """Register the resource as downloading and stash queue context.

        :param queue_client: azure queue client used to release the message
        :param str resource: global resource to download
        :param str msg_id: queue message id holding this resource
        """
        threading.Thread.__init__(self)
        self.queue_client = queue_client
        self.resource = resource
        self.msg_id = msg_id
        # mark as in-flight so the downloader loop skips this resource
        with _DIRECTDL_LOCK:
            _DIRECTDL_DOWNLOADING.append(self.resource)

    def run(self):
        """Pull, save, torrent-enqueue the resource, then release its
        queue message and clear it from the download bookkeeping."""
        file = None
        # resource hash names both the saved image archive and torrent
        resource_hash = hashlib.sha1(self.resource.encode('utf8')).hexdigest()
        if self.resource.startswith(_DOCKER_TAG):
            if len(_REGISTRIES) < 1:
                raise RuntimeError(
                    ('{} image specified for global resource, but there are '
                     'no registries available').format(self.resource))
            image = self.resource[
                self.resource.find(_DOCKER_TAG) + len(_DOCKER_TAG):]
            registry = None
            _record_perf('pull-start', 'img={}'.format(image))
            start = datetime.datetime.now()
            # retry pulls indefinitely, re-picking a registry each attempt
            while True:
                # pick random registry to download from
                registry = _REGISTRIES[_pick_random_registry_key()]
                print('pulling image {} from {}'.format(image, registry))
                if registry == 'registry.hub.docker.com':
                    proc = subprocess.Popen(
                        'docker pull {}'.format(image), shell=True)
                else:
                    proc = subprocess.Popen(
                        'docker pull {}/{}'.format(registry, image),
                        shell=True)
                proc.wait()
                if proc.returncode == 0:
                    break
                else:
                    print('docker pull non-zero rc: {}'.format(
                        proc.returncode))
                    time.sleep(1)
            # tag image to remove registry ip
            if registry != 'registry.hub.docker.com':
                subprocess.check_call(
                    'docker tag {}/{} {}'.format(registry, image, image),
                    shell=True)
            diff = (datetime.datetime.now() - start).total_seconds()
            print('took {} sec to pull docker image {} from {}'.format(
                diff, image, registry))
            _record_perf('pull-end', 'img={},diff={}'.format(image, diff))
            # save docker image to seed to torrent
            if _ENABLE_P2P:
                _record_perf('save-start', 'img={}'.format(image))
                start = datetime.datetime.now()
                file = _TORRENT_DIR / '{}.tar.gz'.format(resource_hash)
                print('saving docker image {} to {} for seeding'.format(
                    image, file))
                # NOTE(review): pipeline exit code is gzip's, so a failed
                # `docker save` is not detected here — confirm acceptable
                subprocess.check_call(
                    'docker save {} | gzip -c > {}'.format(image, file),
                    shell=True)
                print('docker image {} saved for seeding'.format(image))
                diff = (datetime.datetime.now() - start).total_seconds()
                print('took {} sec to save docker image {} to {}'.format(
                    diff, image, file.parent))
                _record_perf('save-end', 'img={},size={},diff={}'.format(
                    image, file.stat().st_size, diff))
        else:
            # TODO download via blob, explode uri to get container/blob
            # use download to path into /tmp and move to _TORRENT_DIR
            raise NotImplementedError()
        # generate torrent file
        if _ENABLE_P2P:
            start = datetime.datetime.now()
            torrent_file, torrent_b64, torrent_sha1 = generate_torrent(
                file, resource_hash)
            diff = (datetime.datetime.now() - start).total_seconds()
            print('took {} sec to generate torrent file: {}'.format(
                diff, torrent_file))
            start = datetime.datetime.now()
            # add to torrent dict (effectively enqueues for torrent start)
            entity = {
                'PartitionKey': _PARTITION_KEY,
                'RowKey': resource_hash,
                'Resource': self.resource,
                'TorrentFileBase64': torrent_b64,
                'TorrentFileSHA1': torrent_sha1,
                'FileSizeBytes': file.stat().st_size,
                # 'FileSHA1': compute_sha1_for_file(file),
            }
            # hand off to the event-loop side via the pending dict; the
            # torrent session is started by manage_torrents_async
            with _PT_LOCK:
                _PENDING_TORRENTS[self.resource] = {
                    'entity': entity,
                    'torrent_file': torrent_file,
                    'started': False,
                    'seed': True,
                    'loaded': True,
                    'loading': False,
                    'registered': False,
                }
                _TORRENT_REVERSE_LOOKUP[resource_hash] = self.resource
            # wait until torrent has started
            print('waiting for torrent {} to start'.format(self.resource))
            while (self.resource not in _TORRENTS or
                   not _TORRENTS[self.resource]['started']):
                time.sleep(0.1)
            diff = (datetime.datetime.now() - start).total_seconds()
            print('took {} sec for {} torrent to start'.format(
                diff, self.resource))
        # cancel callback
        if _ENABLE_P2P or not _NON_P2P_CONCURRENT_DOWNLOADING:
            _CBHANDLES['queue_globalresources'].cancel()
            _CBHANDLES.pop('queue_globalresources')
        # release queue message
        self.queue_client.update_message(
            _STORAGE_CONTAINERS['queue_globalresources'],
            message_id=self.msg_id,
            pop_receipt=_QUEUE_MESSAGES[self.msg_id].pop_receipt,
            visibility_timeout=0)
        _QUEUE_MESSAGES.pop(self.msg_id)
        print('queue message released for {}'.format(self.resource))
        # remove from downloading list
        with _DIRECTDL_LOCK:
            _DIRECTDL_DOWNLOADING.remove(self.resource)
            _DIRECTDL.remove(self.resource)
async def _direct_download_resources_async(
loop, blob_client, queue_client, table_client, ipaddress):
# iterate through downloads to see if there are any torrents available
# TODO allow multiple downloads
rmdl = []
for dl in _DIRECTDL:
if _check_resource_has_torrent(loop, table_client, dl, False):
rmdl.append(dl)
if len(rmdl) > 0:
for dl in rmdl:
_DIRECTDL.pop(dl, None)
if len(_DIRECTDL) == 0:
return
with _DIRECTDL_LOCK:
if len(_DIRECTDL) == 0:
return
# go through queue and find resources we can download
msg = None
rmdl = []
_release_list = []
while True:
msgs = queue_client.get_messages(
_STORAGE_CONTAINERS['queue_globalresources'], num_messages=32,
_STORAGE_CONTAINERS['queue_globalresources'], num_messages=1,
visibility_timeout=45)
if len(msgs) == 0:
break
for _msg in msgs:
if _msg.content in _DIRECTDL and msg is None:
msg = _msg
else:
_release_list.append(_msg)
with _DIRECTDL_LOCK:
for _msg in msgs:
if (msg is None and _msg.content in _DIRECTDL and
_msg.content not in _DIRECTDL_DOWNLOADING):
# TODO modify this to work with concurrent source downloads
# check number of seeds
nseeds = _get_torrent_num_seeds(table_client, _msg.content)
# TODO determine a good number of seeds to cut off directdl
if nseeds < 3:
msg = _msg
else:
rmdl.append(_msg.content)
_release_list.append(_msg)
else:
_release_list.append(_msg)
if msg is not None:
break
# renew lease and create renew callback
@ -256,126 +395,19 @@ async def _direct_download_resources_async(
pop_receipt=_msg.pop_receipt,
visibility_timeout=0)
del _release_list
# remove messages out of rmdl
if len(rmdl) > 0:
with _DIRECTDL_LOCK:
for dl in rmdl:
try:
_DIRECTDL.remove(dl)
except ValueError:
pass
if msg is None:
return
file = None
# download data
resource = msg.content
resource_hash = hashlib.sha1(resource.encode('utf8')).hexdigest()
if resource.startswith(_DOCKER_TAG):
if len(_REGISTRIES) < 1:
raise RuntimeError(
('{} image specified for global resource, but there are '
'no registries available').format(resource))
image = resource[resource.find(_DOCKER_TAG) + len(_DOCKER_TAG):]
registry = None
await _record_perf_async(loop, 'pull-start', 'img={}'.format(image))
start = datetime.datetime.now()
while True:
# pick random registry to download from
registry = _REGISTRIES[_pick_random_registry_key()]
print('pulling image {} from {}'.format(image, registry))
if registry == 'registry.hub.docker.com':
proc = await asyncio.subprocess.create_subprocess_shell(
'docker pull {}'.format(image), loop=loop)
else:
proc = await asyncio.subprocess.create_subprocess_shell(
'docker pull {}/{}'.format(registry, image), loop=loop)
await proc.wait()
if proc.returncode == 0:
break
else:
print('docker pull non-zero rc: {}'.format(
proc.returncode))
await asyncio.sleep(1)
# tag image to remove registry ip
if registry != 'registry.hub.docker.com':
proc = await asyncio.subprocess.create_subprocess_shell(
'docker tag {}/{} {}'.format(registry, image, image),
loop=loop)
await proc.wait()
if proc.returncode != 0:
raise RuntimeError('docker tag non-zero rc: {}'.format(
proc.returncode))
diff = (datetime.datetime.now() - start).total_seconds()
print('took {} sec to pull docker image {} from {}'.format(
diff, image, registry))
await _record_perf_async(loop, 'pull-end', 'img={},diff={}'.format(
image, diff))
# save docker image to seed to torrent
if _ENABLE_P2P:
await _record_perf_async(loop, 'save-start', 'img={}'.format(
image))
start = datetime.datetime.now()
file = _TORRENT_DIR / '{}.tar.gz'.format(resource_hash)
print('saving docker image {} to {} for seeding'.format(
image, file))
proc = await asyncio.subprocess.create_subprocess_shell(
'docker save {} | gzip -c > {}'.format(image, file), loop=loop)
await proc.wait()
if proc.returncode != 0:
raise RuntimeError('docker save non-zero rc: {}'.format(
proc.returncode))
else:
print('docker image {} saved for seeding'.format(image))
diff = (datetime.datetime.now() - start).total_seconds()
print('took {} sec to save docker image {} to {}'.format(
diff, image, file.parent))
await _record_perf_async(
loop, 'save-end', 'img={},size={},diff={}'.format(
image, file.stat().st_size, diff))
else:
# TODO download via blob, explode uri to get container/blob
# use download to path into /tmp and move to _TORRENT_DIR
raise NotImplementedError()
# generate torrent file
if _ENABLE_P2P:
start = datetime.datetime.now()
future = loop.run_in_executor(None, generate_torrent, file)
torrent_file, torrent_b64, torrent_sha1 = await future
diff = (datetime.datetime.now() - start).total_seconds()
print('took {} sec to generate torrent file: {}'.format(
diff, torrent_file))
start = datetime.datetime.now()
# add to torrent dict (effectively enqueues for torrent start)
entity = {
'PartitionKey': _PARTITION_KEY,
'RowKey': resource_hash,
'Resource': resource,
'TorrentFileBase64': torrent_b64,
'TorrentFileSHA1': torrent_sha1,
'FileSizeBytes': file.stat().st_size,
# 'FileSHA1': compute_sha1_for_file(file),
}
_TORRENTS[resource] = {
'entity': entity,
'torrent_file': torrent_file,
'started': False,
'seed': True,
'loaded': True,
'registered': False,
}
_TORRENT_REVERSE_LOOKUP[resource_hash] = resource
# wait until torrent has started
print('waiting for torrent {} to start'.format(resource))
while not _TORRENTS[resource]['started']:
await asyncio.sleep(0.1)
diff = (datetime.datetime.now() - start).total_seconds()
print('took {} sec for {} torrent to start'.format(diff, resource))
# cancel callback
if _ENABLE_P2P or not _NON_P2P_CONCURRENT_DOWNLOADING:
_CBHANDLES['queue_globalresources'].cancel()
_CBHANDLES.pop('queue_globalresources')
# release queue message
queue_client.update_message(
_STORAGE_CONTAINERS['queue_globalresources'],
message_id=msg.id,
pop_receipt=_QUEUE_MESSAGES[msg.id].pop_receipt,
visibility_timeout=0)
_QUEUE_MESSAGES.pop(msg.id)
print('queue message released for {}'.format(resource))
# remove resources from download list
_DIRECTDL.pop(resource)
# pull and save docker image in thread
thr = DockerSaveThread(queue_client, msg.content, msg.id)
thr.start()
def _merge_service(
@ -470,6 +502,32 @@ def bootstrap_dht_nodes(
loop.call_later(1, bootstrap_dht_nodes, loop, table_client, ipaddress)
class DockerLoadThread(threading.Thread):
    """Thread that loads a saved docker image archive into the local
    docker daemon, tracking progress via the _TORRENTS flags."""

    def __init__(self, resource):
        """Flag the resource as seeding/loading before the thread starts.

        :param str resource: global resource whose archive is to be loaded
        """
        threading.Thread.__init__(self)
        self.resource = resource
        _TORRENTS[self.resource]['seed'] = True
        _TORRENTS[self.resource]['loading'] = True

    def run(self):
        """Run `docker load` on the gzipped image archive for the resource
        and flip the loading/loaded flags when complete."""
        print('loading resource: {}'.format(self.resource))
        sha1 = hashlib.sha1(self.resource.encode('utf8')).hexdigest()
        image = self.resource[
            self.resource.find(_DOCKER_TAG) + len(_DOCKER_TAG):]
        file = _TORRENT_DIR / '{}.tar.gz'.format(sha1)
        _record_perf('load-start', 'img={},size={}'.format(
            image, file.stat().st_size))
        begin = datetime.datetime.now()
        print('loading docker image {} from {}'.format(image, file))
        subprocess.check_call(
            'gunzip -c {} | docker load'.format(file), shell=True)
        elapsed = (datetime.datetime.now() - begin).total_seconds()
        print('took {} sec to load docker image from {}'.format(
            elapsed, file))
        _record_perf('load-end', 'img={},diff={}'.format(image, elapsed))
        _TORRENTS[self.resource]['loading'] = False
        _TORRENTS[self.resource]['loaded'] = True
async def _load_and_register_async(
loop: asyncio.BaseEventLoop,
table_client: azure.storage.table.TableService,
@ -482,41 +540,18 @@ async def _load_and_register_async(
if _TORRENTS[resource]['started']:
if _TORRENTS[resource]['handle'].is_seed():
# docker load image
if not _TORRENTS[resource]['loaded']:
resource_hash = hashlib.sha1(
resource.encode('utf8')).hexdigest()
image = resource[
resource.find(_DOCKER_TAG) + len(_DOCKER_TAG):]
file = _TORRENT_DIR / '{}.tar.gz'.format(resource_hash)
await _record_perf_async(
loop, 'load-start', 'img={},size={}'.format(
image, file.stat().st_size))
start = datetime.datetime.now()
print('loading docker image {} from {}'.format(
image, file))
proc = await \
asyncio.subprocess.create_subprocess_shell(
'gunzip -c {} | docker load'.format(file),
loop=loop)
await proc.wait()
if proc.returncode != 0:
raise RuntimeError(
'docker load non-zero rc: {}'.format(
proc.returncode))
_TORRENTS[resource]['loaded'] = True
diff = (datetime.datetime.now() -
start).total_seconds()
print(('took {} sec to load docker image '
'from {}').format(diff, file))
await _record_perf_async(
loop, 'load-end', 'img={},diff={}'.format(
image, diff))
if (not _TORRENTS[resource]['loaded'] and
not _TORRENTS[resource]['loading']):
thr = DockerLoadThread(resource)
thr.start()
# register to services table
if not _TORRENTS[resource]['registered']:
_merge_service(table_client, resource)
_TORRENTS[resource]['registered'] = True
else:
nfinished += 1
if (_TORRENTS[resource]['loaded'] and
not _TORRENTS[resource]['loading']):
if not _TORRENTS[resource]['registered']:
_merge_service(table_client, resource)
_TORRENTS[resource]['registered'] = True
else:
nfinished += 1
if not _GR_DONE and nfinished == nglobalresources:
await _record_perf_async(
loop, 'gr-done',
@ -535,6 +570,11 @@ async def manage_torrents_async(
if not _GR_DONE and not _LR_LOCK_ASYNC.locked():
asyncio.ensure_future(_load_and_register_async(
loop, table_client, nglobalresources))
# move pending torrents into torrents
with _PT_LOCK:
for pt in _PENDING_TORRENTS:
_TORRENTS[pt] = _PENDING_TORRENTS[pt]
_PENDING_TORRENTS.clear()
# start applicable torrent sessions
for resource in _TORRENTS:
if _TORRENTS[resource]['started']:
@ -586,6 +626,20 @@ async def download_monitor_async(
await asyncio.sleep(1)
def _get_torrent_num_seeds(
        table_client: azure.storage.table.TableService,
        resource: str) -> int:
    """Retrieve the current number of seeds for a resource's torrent.

    :param azure.storage.table.TableService table_client: table client
    :param str resource: resource to look up
    :rtype: int
    :return: number of vms seeding the resource; 0 if the services entity
        does not exist yet or has no vm list
    """
    rk = hashlib.sha1(resource.encode('utf8')).hexdigest()
    try:
        se = table_client.get_entity(
            _STORAGE_CONTAINERS['table_services'],
            _PARTITION_KEY, rk)
        vmlist = se['VmList']
        # ''.split(',') yields [''], so guard against counting an empty
        # vm list as one seed
        numseeds = len(vmlist.split(',')) if vmlist else 0
    except (azure.common.AzureMissingResourceHttpError, KeyError):
        # entity not yet merged, or entity present without a VmList field
        numseeds = 0
    return numseeds
def _check_resource_has_torrent(
loop: asyncio.BaseEventLoop,
table_client: azure.storage.table.TableService,
@ -600,7 +654,8 @@ def _check_resource_has_torrent(
_PARTITION_KEY, rk)
except azure.common.AzureMissingResourceHttpError:
if add_to_dict:
_DIRECTDL[resource] = None
with _DIRECTDL_LOCK:
_DIRECTDL.append(resource)
return False
else:
# write torrent file to disk
@ -608,15 +663,17 @@ def _check_resource_has_torrent(
torrent_file = _TORRENT_DIR / '{}.torrent'.format(entity['RowKey'])
with open(str(torrent_file), 'wb') as f:
f.write(torrent)
_TORRENTS[resource] = {
'entity': entity,
'torrent_file': torrent_file,
'started': False,
'seed': False,
'loaded': False,
'registered': False,
}
_TORRENT_REVERSE_LOOKUP[entity['RowKey']] = resource
with _PT_LOCK:
_PENDING_TORRENTS[resource] = {
'entity': entity,
'torrent_file': torrent_file,
'started': False,
'seed': False,
'loaded': False,
'loading': False,
'registered': False,
}
_TORRENT_REVERSE_LOOKUP[entity['RowKey']] = resource
print('found torrent for resource {}'.format(resource))
return True

Просмотреть файл

@ -62,11 +62,16 @@ def _parse_message(msg):
return m
def _diff_events(data, nodeid, event, end_event, timing, prefix):
def _diff_events(data, nodeid, event, end_event, timing, prefix, sizes=None):
for i in range(0, len(data[nodeid][event])):
# torrent start -> load start may not always exist due to pull
if (event == 'cascade:torrent-start' and
end_event == 'cascade:load-start' and
end_event not in data[nodeid]):
return
# find end event for this img
subevent = data[nodeid][event][i]
img = subevent['message']['img']
# find end event for this img
found = False
for j in range(0, len(data[nodeid][end_event])):
pei = data[
@ -74,15 +79,21 @@ def _diff_events(data, nodeid, event, end_event, timing, prefix):
if pei == img:
timing[prefix + img] = _compute_delta_t(
data, nodeid, event, i, end_event, j)
if sizes is not None and img not in sizes:
if event == 'cascade:load-start':
sizes[img] = data[nodeid][event][j]['message']['size']
else:
sizes[img] = data[
nodeid][end_event][j]['message']['size']
found = True
break
if not found:
if not found and event != 'cascade:torrent-start':
raise RuntimeError(
'could not find corresponding event for {}:{}'.format(
subevent, img))
event, img))
def graph_data(table_client):
def coalesce_data(table_client):
print('graphing data from {} with pk={}'.format(
_TABLE_NAME, _PARTITION_KEY))
entities = table_client.query_entities(
@ -106,8 +117,8 @@ def graph_data(table_client):
ev['message'] = None
data[nodeid][event].append(ev)
del entities
sizes = {}
for nodeid in data:
print(nodeid)
# calculate dt timings
timing = {
'docker_install': _compute_delta_t(
@ -132,19 +143,33 @@ def graph_data(table_client):
_diff_events(
data, nodeid, event, 'cascade:pull-end', timing, 'pull:')
elif event == 'cascade:save-start':
pass
elif event == 'cascade:save-end':
# message will contain size info
pass
_diff_events(
data, nodeid, event, 'cascade:save-end', timing, 'save:',
sizes)
elif event == 'cascade:torrent-start':
pass
_diff_events(
data, nodeid, event, 'cascade:load-start', timing,
'torrent:')
elif event == 'cascade:load-start':
# load start also marks torrent-seed
# message will contain size info
pass
elif event == 'cascade:load-end':
pass
print(timing)
_diff_events(
data, nodeid, event, 'cascade:load-end', timing,
'load:', sizes)
data[nodeid].pop('cascade:pull-start', None)
data[nodeid].pop('cascade:pull-end', None)
data[nodeid].pop('cascade:save-start', None)
data[nodeid].pop('cascade:save-end', None)
data[nodeid].pop('cascade:torrent-start')
data[nodeid].pop('cascade:load-start', None)
data[nodeid].pop('cascade:load-end', None)
data[nodeid]['timing'] = timing
return data, sizes
def graph_data(data, sizes):
    """Dump coalesced per-node timing data and image sizes to stdout.

    :param dict data: nodeid -> coalesced event/timing data
    :param dict sizes: image name -> size info
    """
    print(sizes)
    for nodeid, nodedata in data.items():
        print(nodeid)
        print(nodedata)
def merge_dict(dict1, dict2):
@ -189,7 +214,8 @@ def main():
# create storage credentials
table_client = _create_credentials(config)
# graph data
graph_data(table_client)
data, sizes = coalesce_data(table_client)
graph_data(data, sizes)
def parseargs():