Родитель
1fbad08ee5
Коммит
bc30023330
36
cascade.py
36
cascade.py
|
@ -53,6 +53,7 @@ _SEED_BIAS = 3
|
|||
_ALLOW_PUBLIC_PULL_WITH_PRIVATE = False
|
||||
_SAVELOAD_FILE_EXTENSION = 'tar.gz'
|
||||
_REGISTRY = None
|
||||
_RECORD_PERF = int(os.getenv('CASCADE_TIMING', default='0'))
|
||||
# mutable global state
|
||||
_CBHANDLES = {}
|
||||
_QUEUE_MESSAGES = {}
|
||||
|
@ -215,6 +216,8 @@ def scantree(path):
|
|||
|
||||
|
||||
async def _record_perf_async(loop, event, message):
|
||||
if not _RECORD_PERF:
|
||||
return
|
||||
proc = await asyncio.subprocess.create_subprocess_shell(
|
||||
'perf.py cascade {ev} --prefix {pr} --message "{msg}"'.format(
|
||||
ev=event, pr=_PREFIX, msg=message), loop=loop)
|
||||
|
@ -225,6 +228,8 @@ async def _record_perf_async(loop, event, message):
|
|||
|
||||
|
||||
def _record_perf(event, message):
    """Record a cascade perf event by running perf.py synchronously.

    Does nothing when the module-level _RECORD_PERF flag is falsy.

    The command is passed as an argument list rather than a formatted
    shell string so that quotes, '$' or other shell metacharacters in
    the message cannot break or alter the invoked command.

    :param event: event name, passed to perf.py after 'cascade'
    :param message: free-form text passed via --message
    :raises subprocess.CalledProcessError: if perf.py exits non-zero
    """
    if not _RECORD_PERF:
        return
    # str() mirrors what str.format() did to each value in the old
    # shell-string form (e.g. a None prefix becomes 'None')
    subprocess.check_call([
        'perf.py', 'cascade', str(event),
        '--prefix', str(_PREFIX),
        '--message', str(message),
    ])
|
||||
|
@ -247,7 +252,7 @@ class DockerSaveThread(threading.Thread):
|
|||
def run(self):
|
||||
success = False
|
||||
try:
|
||||
self._pull_and_load()
|
||||
self._pull_and_save()
|
||||
success = True
|
||||
except Exception as ex:
|
||||
logger.exception(ex)
|
||||
|
@ -271,7 +276,7 @@ class DockerSaveThread(threading.Thread):
|
|||
_DIRECTDL_DOWNLOADING.remove(self.resource)
|
||||
_DIRECTDL.remove(self.resource)
|
||||
|
||||
def _pull_and_load(self):
|
||||
def _pull_and_save(self):
|
||||
if _REGISTRY is None:
|
||||
raise RuntimeError(
|
||||
('{} image specified for global resource, but there are '
|
||||
|
@ -536,8 +541,11 @@ def _merge_service(
|
|||
if_match=etag)
|
||||
entity = existing
|
||||
break
|
||||
except azure.common.AzureConflictHttpError:
|
||||
pass
|
||||
except azure.common.AzureHttpError as ex:
|
||||
if (ex.status_code == 412):
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
logger.info('entity {} merged to services table'.format(entity))
|
||||
global _GR_DONE
|
||||
if not _GR_DONE:
|
||||
|
@ -779,9 +787,15 @@ def _start_torrent_via_storage(
|
|||
return
|
||||
if entity is None:
|
||||
rk = hashlib.sha1(resource.encode('utf8')).hexdigest()
|
||||
entity = table_client.get_entity(
|
||||
_STORAGE_CONTAINERS['table_torrentinfo'],
|
||||
_PARTITION_KEY, rk)
|
||||
# entity may not be populated yet, keep trying until ready
|
||||
while True:
|
||||
try:
|
||||
entity = table_client.get_entity(
|
||||
_STORAGE_CONTAINERS['table_torrentinfo'],
|
||||
_PARTITION_KEY, rk)
|
||||
break
|
||||
except azure.common.AzureMissingResourceHttpError:
|
||||
time.sleep(1)
|
||||
# retrieve torrent file
|
||||
torrent_file = _TORRENT_DIR / '{}.torrent'.format(entity['RowKey'])
|
||||
tc, tp = entity['TorrentFileLocator'].split(',')
|
||||
|
@ -897,21 +911,21 @@ async def _get_ipaddress_async(loop: asyncio.BaseEventLoop) -> str:
|
|||
|
||||
def main():
|
||||
"""Main function"""
|
||||
global _ENABLE_P2P, _NON_P2P_CONCURRENT_DOWNLOADING
|
||||
global _ENABLE_P2P, _NON_P2P_CONCURRENT_DOWNLOADING, \
|
||||
_ALLOW_PUBLIC_PULL_WITH_PRIVATE
|
||||
# get command-line args
|
||||
args = parseargs()
|
||||
p2popts = args.p2popts.split(':')
|
||||
_ENABLE_P2P = p2popts[0] == 'true'
|
||||
_NON_P2P_CONCURRENT_DOWNLOADING = p2popts[1]
|
||||
_ALLOW_PUBLIC_PULL_WITH_PRIVATE = p2popts[4] == 'true'
|
||||
# set p2p options
|
||||
if _ENABLE_P2P:
|
||||
if not _LIBTORRENT_IMPORTED:
|
||||
raise ImportError('No module named \'libtorrent\'')
|
||||
global _COMPRESSION, _SEED_BIAS, _ALLOW_PUBLIC_PULL_WITH_PRIVATE, \
|
||||
_SAVELOAD_FILE_EXTENSION
|
||||
global _COMPRESSION, _SEED_BIAS, _SAVELOAD_FILE_EXTENSION
|
||||
_COMPRESSION = p2popts[3] == 'true'
|
||||
_SEED_BIAS = int(p2popts[2])
|
||||
_ALLOW_PUBLIC_PULL_WITH_PRIVATE = p2popts[4] == 'true'
|
||||
if not _COMPRESSION:
|
||||
_SAVELOAD_FILE_EXTENSION = 'tar'
|
||||
logger.info('peer-to-peer options: compression={} seedbias={}'.format(
|
||||
|
|
82
graph.py
82
graph.py
|
@ -6,6 +6,8 @@ import copy
|
|||
import datetime
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
# non-stdlib imports
|
||||
import azure.storage.table as azuretable
|
||||
|
||||
|
@ -158,6 +160,8 @@ def coalesce_data(table_client):
|
|||
'privateregistry:end', 0)
|
||||
except KeyError:
|
||||
timing['private_registry_setup'] = 0
|
||||
data[nodeid]['start'] = data[
|
||||
nodeid]['nodeprep:start'][0]['timestamp'].timestamp()
|
||||
data[nodeid].pop('nodeprep:start')
|
||||
data[nodeid].pop('nodeprep:end')
|
||||
data[nodeid].pop('privateregistry:start', None)
|
||||
|
@ -195,9 +199,83 @@ def coalesce_data(table_client):
|
|||
|
||||
def graph_data(data, sizes):
    """Write per-node timing data plus a gnuplot script and render a PNG.

    Three files are produced in the current directory, all named after
    _PARTITION_KEY with '$' replaced by '-': a '.dat' data file with one
    row per node ordered by node prep start time, a '.plot' gnuplot
    script, and a '.png' holding gnuplot's standard output.

    :param data: mapping of node id to a dict holding 'start' (node prep
        start timestamp) and 'timing' (mapping of event name to a number;
        must contain 'docker_install', 'private_registry_setup' and
        'global_resources_loaded'; keys prefixed with 'pull:', 'save:',
        'load:' or 'torrent:' are summed per prefix)
    :param sizes: only printed by this function
    :raises RuntimeError: if two nodes share the same start timestamp
    :raises subprocess.CalledProcessError: if gnuplot exits non-zero
    """
    print(sizes)
    if not data:
        # Without nodes there is nothing to plot and the averages below
        # would divide by zero, so stop before writing any file.
        print('no node data to graph')
        return
    base_fname = _PARTITION_KEY.replace('$', '-')
    # create data file
    dat_fname = base_fname + '.dat'
    # reverse mapping of start timestamp -> node id so that rows can be
    # emitted in start-time order
    rdata = {}
    for nodeid in data:
        print(nodeid)
        print(data[nodeid])
        start = data[nodeid]['start']
        if start in rdata:
            raise RuntimeError('cannot create reverse mapping')
        rdata[start] = nodeid
    mintime = min(rdata)
    maxtime = max(rdata)
    print('delta:', maxtime - mintime)
    total_gr = 0
    total_ac = 0
    with open(dat_fname, 'w') as f:
        f.write(
            'NodePrepStartTime NodeId DockerInstall PrivateRegistrySetup '
            'GlobalResourcesLoad TotalPull TotalSave TotalLoad '
            'TotalTorrent\n')
        for start in sorted(rdata):
            nodeid = rdata[start]
            timing = data[nodeid]['timing']
            totals = {'pull': 0, 'save': 0, 'load': 0, 'torrent': 0}
            for event in timing:
                # only "<category>:..." events contribute to the totals
                category, sep, _ = event.partition(':')
                if sep and category in totals:
                    totals[category] += timing[event]
            total_ac += totals['pull'] + totals['torrent'] + totals['load']
            print(nodeid, timing)
            f.write(
                '{0} {1} {2} {3} {4} {5:.5f} {6:.5f} {7:.5f} {8:.5f}\n'.format(
                    datetime.datetime.fromtimestamp(start).strftime(
                        '%Y-%m-%d-%H:%M:%S.%f'),
                    nodeid,
                    timing['docker_install'],
                    timing['private_registry_setup'],
                    timing['global_resources_loaded'],
                    totals['pull'],
                    totals['save'],
                    totals['load'],
                    totals['torrent']))
            total_gr += timing['global_resources_loaded']
    print('total gr: {} avg: {}'.format(total_gr, total_gr / len(data)))
    print('total acq: {} avg: {}'.format(total_ac, total_ac / len(data)))
    # create plot file
    plot_fname = base_fname + '.plot'
    with open(plot_fname, 'w') as f:
        f.write('set terminal pngcairo enhanced transparent crop\n')
        f.write('set key top left outside horizontal autotitle columnhead\n')
        f.write('set xtics rotate by 45 right font ", 8"\n')
        f.write('set ytics\n')
        f.write('set xlabel "Node Prep Start Time"\n')
        f.write('set ylabel "Seconds"\n')
        f.write('set format x "%H:%M:%.3S"\n')
        f.write('set xdata time\n')
        f.write('set timefmt "%Y-%m-%d-%H:%M:%S"\n')
        f.write('set style fill solid border -1\n')
        # box width is 1/100th of the observed start-time span
        f.write('set boxwidth {0:.5f} absolute\n'.format(
            (maxtime - mintime) / 100.0))
        # boxes are drawn tallest first so the shorter ones overlay them
        f.write(
            'plot "{}" using 1:($3+$4+$5) with boxes lc rgb "red", \\\n'.format(
                dat_fname))
        f.write('\t"" using 1:($3+$4) with boxes lc rgb "yellow", \\\n')
        f.write('\t"" using 1:3 with boxes lc rgb "green"\n')
    png_fname = base_fname + '.png'
    # The plot script sets no output file, so gnuplot emits the image on
    # stdout; capture it directly instead of going through a shell redirect.
    with open(png_fname, 'wb') as png:
        subprocess.check_call(['gnuplot', plot_fname], stdout=png)
|
||||
|
||||
|
||||
def merge_dict(dict1, dict2):
|
||||
|
|
|
@ -150,7 +150,7 @@ if [ $offer == "ubuntuserver" ]; then
|
|||
set -e
|
||||
# install azure storage python dependency
|
||||
pip3 install --no-cache-dir azure-storage
|
||||
if [ ! -f ".node_prep_finished" ]; then
|
||||
if [ ! -z ${CASCADE_TIMING+x} ] && [ ! -f ".node_prep_finished" ]; then
|
||||
./perf.py nodeprep start $prefix --ts $npstart --message "offer=$offer,sku=$sku"
|
||||
fi
|
||||
# install cascade dependencies
|
||||
|
@ -160,12 +160,12 @@ if [ $offer == "ubuntuserver" ]; then
|
|||
# install private registry if required
|
||||
if [ ! -z "$privatereg" ]; then
|
||||
# mark private registry start
|
||||
if [ ! -f ".node_prep_finished" ]; then
|
||||
if [ ! -z ${CASCADE_TIMING+x} ] && [ ! -f ".node_prep_finished" ]; then
|
||||
./perf.py privateregistry start $prefix --message "ipaddress=$ipaddress"
|
||||
fi
|
||||
./setup_private_registry.py $privatereg $ipaddress $prefix
|
||||
# mark private registry end
|
||||
if [ ! -f ".node_prep_finished" ]; then
|
||||
if [ ! -z ${CASCADE_TIMING+x} ] && [ ! -f ".node_prep_finished" ]; then
|
||||
./perf.py privateregistry end $prefix
|
||||
fi
|
||||
fi
|
||||
|
@ -181,13 +181,17 @@ fi
|
|||
|
||||
# mark node prep finished
|
||||
if [ ! -f ".node_prep_finished" ]; then
|
||||
./perf.py nodeprep end $prefix
|
||||
if [ ! -z ${CASCADE_TIMING+x} ]; then
|
||||
./perf.py nodeprep end $prefix
|
||||
fi
|
||||
# touch file to prevent subsequent perf recording if rebooted
|
||||
touch .node_prep_finished
|
||||
fi
|
||||
|
||||
# start cascade
|
||||
./perf.py cascade start $prefix
|
||||
if [ ! -z ${CASCADE_TIMING+x} ]; then
|
||||
./perf.py cascade start $prefix
|
||||
fi
|
||||
./cascade.py $p2p --ipaddress $ipaddress $prefix &
|
||||
# if not in p2p mode, then wait for cascade exit
|
||||
if [ -z "$p2p" ]; then
|
||||
|
|
42
shipyard.py
42
shipyard.py
|
@ -349,6 +349,10 @@ def add_pool(batch_client, blob_client, config):
|
|||
publisher = config['pool_specification']['publisher']
|
||||
offer = config['pool_specification']['offer']
|
||||
sku = config['pool_specification']['sku']
|
||||
try:
|
||||
perf = config['store_timing_metrics']
|
||||
except KeyError:
|
||||
perf = False
|
||||
# peer-to-peer settings
|
||||
try:
|
||||
p2p = config['data_replication']['peer_to_peer']['enabled']
|
||||
|
@ -359,8 +363,12 @@ def add_pool(batch_client, blob_client, config):
|
|||
try:
|
||||
p2psbias = config['data_replication'][
|
||||
'peer_to_peer']['direct_download_seed_bias']
|
||||
if p2psbias is None or p2psbias < 1:
|
||||
raise KeyError()
|
||||
except KeyError:
|
||||
p2psbias = 3
|
||||
p2psbias = config['pool_specification']['vm_count'] // 10
|
||||
if p2psbias < 1:
|
||||
p2psbias = 1
|
||||
try:
|
||||
p2pcomp = config[
|
||||
'data_replication']['peer_to_peer']['compression']
|
||||
|
@ -520,6 +528,10 @@ def add_pool(batch_client, blob_client, config):
|
|||
pool.start_task.environment_settings.append(
|
||||
batchmodels.EnvironmentSetting('DOCKER_LOGIN_PASSWORD', dockerpw)
|
||||
)
|
||||
if perf:
|
||||
pool.start_task.environment_settings.append(
|
||||
batchmodels.EnvironmentSetting('CASCADE_TIMING', '1')
|
||||
)
|
||||
# create pool if not exists
|
||||
try:
|
||||
logger.info('Attempting to create pool: {}'.format(pool.id))
|
||||
|
@ -1150,8 +1162,12 @@ def populate_queues(queue_client, table_client, config):
|
|||
try:
|
||||
p2pcsd = config['data_replication']['peer_to_peer'][
|
||||
'concurrent_source_downloads']
|
||||
if p2pcsd is None or p2pcsd < 1:
|
||||
raise KeyError()
|
||||
except KeyError:
|
||||
p2pcsd = 1
|
||||
p2pcsd = config['pool_specification']['vm_count'] // 6
|
||||
if p2pcsd < 1:
|
||||
p2pcsd = 1
|
||||
else:
|
||||
p2pcsd = 1
|
||||
# add global resources
|
||||
|
@ -1159,6 +1175,27 @@ def populate_queues(queue_client, table_client, config):
|
|||
queue_client, table_client, config, pk, p2pcsd, 'docker_images')
|
||||
|
||||
|
||||
def _adjust_settings_for_pool_creation(config):
|
||||
# type: (dict) -> None
|
||||
publisher = config['pool_specification']['publisher']
|
||||
vm_count = int(config['pool_specification']['vm_count'])
|
||||
try:
|
||||
p2p = config['data_replication']['peer_to_peer']['enabled']
|
||||
except KeyError:
|
||||
p2p = True
|
||||
max_vms = 20 if publisher.lower() == 'microsoftwindowsserver' else 40
|
||||
if p2p and vm_count > max_vms:
|
||||
logger.warning(
|
||||
('disabling peer-to-peer transfer as pool size of {} exceeds '
|
||||
'max limit of {} vms for inter-node communication').format(
|
||||
vm_count, max_vms))
|
||||
if 'data_replication' not in config:
|
||||
config['data_replication'] = {}
|
||||
if 'peer_to_peer' not in config['data_replication']:
|
||||
config['data_replication']['peer_to_peer'] = {}
|
||||
config['data_replication']['peer_to_peer']['enabled'] = False
|
||||
|
||||
|
||||
def merge_dict(dict1, dict2):
|
||||
# type: (dict, dict) -> dict
|
||||
"""Recursively merge dictionaries: dict2 on to dict1. This differs
|
||||
|
@ -1225,6 +1262,7 @@ def main():
|
|||
blob_client, queue_client, table_client, config)
|
||||
clear_storage_containers(
|
||||
blob_client, queue_client, table_client, config)
|
||||
_adjust_settings_for_pool_creation(config)
|
||||
populate_queues(queue_client, table_client, config)
|
||||
add_pool(batch_client, blob_client, config)
|
||||
elif args.action == 'resizepool':
|
||||
|
|
Загрузка…
Ссылка в новой задаче