Configuration changes
- Move more private registry settings from hardcode to config - Split config file into two
This commit is contained in:
Родитель
0271e54b1a
Коммит
d06d3649a0
45
cascade.py
45
cascade.py
|
@ -50,6 +50,7 @@ _STORAGE_CONTAINERS = {
|
|||
_SELF_REGISTRY_PTR = None
|
||||
_REGISTRIES = {}
|
||||
_TORRENTS = {}
|
||||
_TORRENT_REVERSE_LOOKUP = {}
|
||||
_DIRECTDL = {}
|
||||
_DHT_ROUTERS = []
|
||||
_LR_LOCK_ASYNC = asyncio.Lock()
|
||||
|
@ -94,19 +95,18 @@ def _create_credentials() -> tuple:
|
|||
return blob_client, queue_client, table_client
|
||||
|
||||
|
||||
def generate_torrent(incl_file: str) -> dict:
|
||||
def generate_torrent(incl_file: pathlib.Path) -> dict:
|
||||
"""Generate torrent file for a given file and write it to disk
|
||||
:param str incl_file: file to include in torrent
|
||||
:param pathlib.Path incl_file: file to include in torrent
|
||||
:rtype: tuple
|
||||
:return: (torrent file as pathlib, torrent file encoded as base64,
|
||||
torrent file data sha1 hash)
|
||||
"""
|
||||
fs = libtorrent.file_storage()
|
||||
libtorrent.add_files(fs, incl_file)
|
||||
libtorrent.add_files(fs, str(incl_file))
|
||||
tor = libtorrent.create_torrent(fs)
|
||||
tor.set_creator('libtorrent {}'.format(libtorrent.version))
|
||||
path = pathlib.Path(incl_file)
|
||||
libtorrent.set_piece_hashes(tor, str(path.parent))
|
||||
libtorrent.set_piece_hashes(tor, str(incl_file.parent))
|
||||
torrent = tor.generate()
|
||||
torrent_data = libtorrent.bencode(torrent)
|
||||
torrent_b64 = base64.b64encode(torrent_data).decode('ascii')
|
||||
|
@ -255,6 +255,7 @@ async def _direct_download_resources_async(
|
|||
file = None
|
||||
# download data
|
||||
resource = msg.content
|
||||
resource_hash = hashlib.sha1(resource.encode('utf8')).hexdigest()
|
||||
if resource.startswith(_DOCKER_TAG):
|
||||
if len(_REGISTRIES) < 1:
|
||||
raise RuntimeError(
|
||||
|
@ -299,9 +300,7 @@ async def _direct_download_resources_async(
|
|||
await _record_perf_async(loop, 'save-start', 'img={}'.format(
|
||||
image))
|
||||
start = datetime.datetime.now()
|
||||
file = _TORRENT_DIR / '{}.tar.gz'.format(image)
|
||||
print('creating path to store torrent: {}'.format(file.parent))
|
||||
file.parent.mkdir(parents=True, exist_ok=True)
|
||||
file = _TORRENT_DIR / '{}.tar.gz'.format(resource_hash)
|
||||
print('saving docker image {} to {} for seeding'.format(
|
||||
image, file))
|
||||
proc = await asyncio.subprocess.create_subprocess_shell(
|
||||
|
@ -323,7 +322,7 @@ async def _direct_download_resources_async(
|
|||
raise NotImplementedError()
|
||||
# generate torrent file
|
||||
start = datetime.datetime.now()
|
||||
future = loop.run_in_executor(None, generate_torrent, str(file))
|
||||
future = loop.run_in_executor(None, generate_torrent, file)
|
||||
torrent_file, torrent_b64, torrent_sha1 = await future
|
||||
diff = (datetime.datetime.now() - start).total_seconds()
|
||||
print('took {} sec to generate torrent file: {}'.format(
|
||||
|
@ -332,7 +331,7 @@ async def _direct_download_resources_async(
|
|||
# add to torrent dict (effectively enqueues for torrent start)
|
||||
entity = {
|
||||
'PartitionKey': _PARTITION_KEY,
|
||||
'RowKey': hashlib.sha1(resource.encode('utf8')).hexdigest(),
|
||||
'RowKey': resource_hash,
|
||||
'Resource': resource,
|
||||
'TorrentFileBase64': torrent_b64,
|
||||
'TorrentFileSHA1': torrent_sha1,
|
||||
|
@ -347,6 +346,7 @@ async def _direct_download_resources_async(
|
|||
'loaded': True,
|
||||
'registered': False,
|
||||
}
|
||||
_TORRENT_REVERSE_LOOKUP[resource_hash] = resource
|
||||
# wait until torrent has started
|
||||
print('waiting for torrent {} to start'.format(resource))
|
||||
while not _TORRENTS[resource]['started']:
|
||||
|
@ -411,9 +411,10 @@ def _merge_service(
|
|||
|
||||
def _get_torrent_info(resource, th):
|
||||
s = th.status()
|
||||
print(('%s wanted: %d %.2f%% complete (down: %.1f kB/s up: %.1f kB/s '
|
||||
print(('%s %s bytes: %d %.2f%% complete (down: %.1f kB/s up: %.1f kB/s '
|
||||
'peers: %d) %s') %
|
||||
(th.name(), s.total_wanted, s.progress * 100, s.download_rate / 1000,
|
||||
(_TORRENT_REVERSE_LOOKUP[th.name().split('.')[0]], th.name(),
|
||||
s.total_wanted, s.progress * 100, s.download_rate / 1000,
|
||||
s.upload_rate / 1000, s.num_peers, _TORRENT_STATE[s.state]))
|
||||
# ss = _TORRENT_SESSION.status()
|
||||
# print(_TORRENT_SESSION.is_dht_running(), ss.dht_global_nodes,
|
||||
|
@ -463,7 +464,7 @@ async def _load_and_register_async(
|
|||
loop: asyncio.BaseEventLoop,
|
||||
table_client: azure.storage.table.TableService,
|
||||
nglobalresources: int):
|
||||
global _LR_LOCK_ASYNC
|
||||
global _LR_LOCK_ASYNC, _GR_DONE
|
||||
async with _LR_LOCK_ASYNC:
|
||||
nfinished = 0
|
||||
for resource in _TORRENTS:
|
||||
|
@ -472,12 +473,14 @@ async def _load_and_register_async(
|
|||
if _TORRENTS[resource]['handle'].is_seed():
|
||||
# docker load image
|
||||
if not _TORRENTS[resource]['loaded']:
|
||||
resource_hash = hashlib.sha1(
|
||||
resource.encode('utf8')).hexdigest()
|
||||
image = resource[
|
||||
resource.find(_DOCKER_TAG) + len(_DOCKER_TAG):]
|
||||
await _record_perf_async(
|
||||
loop, 'load-start', 'img={}'.format(image))
|
||||
start = datetime.datetime.now()
|
||||
file = _TORRENT_DIR / '{}.tar.gz'.format(image)
|
||||
file = _TORRENT_DIR / '{}.tar.gz'.format(resource_hash)
|
||||
print('loading docker image {} from {}'.format(
|
||||
image, file))
|
||||
proc = await \
|
||||
|
@ -507,7 +510,6 @@ async def _load_and_register_async(
|
|||
await _record_perf_async(
|
||||
loop, 'gr-done',
|
||||
'nglobalresources={}'.format(nglobalresources))
|
||||
global _GR_DONE
|
||||
_GR_DONE = True
|
||||
|
||||
|
||||
|
@ -516,10 +518,10 @@ async def manage_torrents_async(
|
|||
table_client: azure.storage.table.TableService,
|
||||
ipaddress: str,
|
||||
nglobalresources: int):
|
||||
global _LR_LOCK_ASYNC
|
||||
global _LR_LOCK_ASYNC, _GR_DONE
|
||||
while True:
|
||||
# async schedule load and register
|
||||
if not _LR_LOCK_ASYNC.locked():
|
||||
if not _GR_DONE and not _LR_LOCK_ASYNC.locked():
|
||||
asyncio.ensure_future(_load_and_register_async(
|
||||
loop, table_client, nglobalresources))
|
||||
# start applicable torrent sessions
|
||||
|
@ -532,11 +534,8 @@ async def manage_torrents_async(
|
|||
print(('creating torrent session for {} ipaddress={} '
|
||||
'seed={}').format(resource, ipaddress, seed))
|
||||
image = resource[resource.find(_DOCKER_TAG) + len(_DOCKER_TAG):]
|
||||
parent = (_TORRENT_DIR / image).parent
|
||||
print('creating torrent download directory: {}'.format(parent))
|
||||
parent.mkdir(parents=True, exist_ok=True)
|
||||
_TORRENTS[resource]['handle'] = create_torrent_session(
|
||||
resource, parent, seed)
|
||||
resource, _TORRENT_DIR, seed)
|
||||
await _record_perf_async(loop, 'torrent-start', 'img={}'.format(
|
||||
image))
|
||||
del image
|
||||
|
@ -590,8 +589,7 @@ def _check_resource_has_torrent(
|
|||
else:
|
||||
# write torrent file to disk
|
||||
torrent = base64.b64decode(entity['TorrentFileBase64'])
|
||||
torrent_file = _TORRENT_DIR / '{}.torrent'.format(
|
||||
entity['TorrentFileSHA1'])
|
||||
torrent_file = _TORRENT_DIR / '{}.torrent'.format(entity['RowKey'])
|
||||
with open(str(torrent_file), 'wb') as f:
|
||||
f.write(torrent)
|
||||
_TORRENTS[resource] = {
|
||||
|
@ -602,6 +600,7 @@ def _check_resource_has_torrent(
|
|||
'loaded': False,
|
||||
'registered': False,
|
||||
}
|
||||
_TORRENT_REVERSE_LOOKUP[entity['RowKey']] = resource
|
||||
print('found torrent for resource {}'.format(resource))
|
||||
return True
|
||||
|
||||
|
|
|
@ -6,13 +6,17 @@ offer=
|
|||
p2p=0
|
||||
prefix=
|
||||
privatereg=
|
||||
privateregarchive=
|
||||
privateregimageid=
|
||||
sku=
|
||||
|
||||
while getopts "h?o:p:r:s:t" opt; do
|
||||
while getopts "h?a:i:o:p:r:s:t" opt; do
|
||||
case "$opt" in
|
||||
h|\?)
|
||||
echo "nodeprep.sh parameters"
|
||||
echo ""
|
||||
echo "-a [registry archive] registry archive file"
|
||||
echo "-i [registry image id] registry image id"
|
||||
echo "-o [offer] VM offer"
|
||||
echo "-p [prefix] storage container prefix"
|
||||
echo "-r [container] enable private registry"
|
||||
|
@ -21,6 +25,12 @@ while getopts "h?o:p:r:s:t" opt; do
|
|||
echo ""
|
||||
exit 1
|
||||
;;
|
||||
a)
|
||||
privateregarchive="--regarchive $OPTARG"
|
||||
;;
|
||||
i)
|
||||
privateregimageid="--regimageid $OPTARG"
|
||||
;;
|
||||
o)
|
||||
offer=${OPTARG,,}
|
||||
;;
|
||||
|
@ -110,7 +120,7 @@ if [ $offer == "ubuntuserver" ]; then
|
|||
if [ ! -z "$privatereg" ]; then
|
||||
# mark private registry start
|
||||
./perf.py privateregistry start $prefix --message "ipaddress=$ipaddress"
|
||||
./setup_private_registry.py $offer $sku $ipaddress $prefix $privatereg
|
||||
./setup_private_registry.py $offer $sku $ipaddress $prefix $privatereg $privateregarchive $privateregimageid
|
||||
rc=$?
|
||||
./perf.py privateregistry end $prefix
|
||||
# mark private registry end
|
||||
|
|
|
@ -14,8 +14,6 @@ import azure.storage.table as azuretable
|
|||
|
||||
# global defines
|
||||
_DEFAULT_PRIVATE_REGISTRY_PORT = 5000
|
||||
_REGISTRY_ARCHIVE = 'docker-registry-v2.tar.gz'
|
||||
_REGISTRY_IMAGE_ID = '8ff6a4aae657'
|
||||
_STORAGEACCOUNT = os.environ['PRIVATE_REGISTRY_SA']
|
||||
_STORAGEACCOUNTKEY = os.environ['PRIVATE_REGISTRY_SAKEY']
|
||||
_BATCHACCOUNT = os.environ['AZ_BATCH_ACCOUNT_NAME']
|
||||
|
@ -86,10 +84,13 @@ def _renew_queue_message_lease(
|
|||
|
||||
|
||||
async def _start_private_registry_instance_async(
|
||||
loop: asyncio.BaseEventLoop, container: str):
|
||||
loop: asyncio.BaseEventLoop, container: str,
|
||||
registry_archive: str, registry_image_id: str):
|
||||
"""Start private docker registry instance
|
||||
:param asyncio.BaseEventLoop loop: event loop
|
||||
:param str container: storage container holding registry info
|
||||
:param str registry_archive: registry archive file
|
||||
:param str registry_image_id: registry image id
|
||||
"""
|
||||
proc = await asyncio.subprocess.create_subprocess_shell(
|
||||
'docker images | grep -E \'^registry.*2\' | awk -e \'{print $3}\'',
|
||||
|
@ -98,13 +99,12 @@ async def _start_private_registry_instance_async(
|
|||
if proc.returncode != 0:
|
||||
raise RuntimeError('docker images non-zero rc: {}'.format(
|
||||
proc.returncode))
|
||||
if (stdout[0].strip() != _REGISTRY_IMAGE_ID and
|
||||
pathlib.Path(_REGISTRY_ARCHIVE).exists()):
|
||||
if (stdout[0].strip() != registry_image_id and
|
||||
pathlib.Path(registry_archive).exists()):
|
||||
print('importing registry from local file: {}'.format(
|
||||
_REGISTRY_ARCHIVE))
|
||||
registry_archive))
|
||||
proc = await asyncio.subprocess.create_subprocess_shell(
|
||||
'gunzip -c {} | docker load'.format(
|
||||
_REGISTRY_ARCHIVE), loop=loop)
|
||||
'gunzip -c {} | docker load'.format(registry_archive), loop=loop)
|
||||
await proc.wait()
|
||||
if proc.returncode != 0:
|
||||
raise RuntimeError('docker load non-zero rc: {}'.format(
|
||||
|
@ -135,20 +135,24 @@ async def setup_private_registry_async(
|
|||
loop: asyncio.BaseEventLoop,
|
||||
queue_client: azure.storage.queue.QueueService,
|
||||
table_client: azure.storage.table.TableService,
|
||||
ipaddress: str, container: str):
|
||||
ipaddress: str, container: str,
|
||||
registry_archive: str, registry_image_id: str):
|
||||
"""Set up a docker private registry if a ticket exists
|
||||
:param asyncio.BaseEventLoop loop: event loop
|
||||
:param azure.storage.queue.QueueService queue_client: queue client
|
||||
:param azure.storage.table.TableService table_client: table client
|
||||
:param str ipaddress: ip address
|
||||
:param str container: container holding registry
|
||||
:param str registry_archive: registry archive file
|
||||
:param str registry_image_id: registry image id
|
||||
"""
|
||||
# first check if we've registered before
|
||||
try:
|
||||
entity = table_client.get_entity(
|
||||
_STORAGE_CONTAINERS['table_registry'], _PARTITION_KEY, ipaddress)
|
||||
print('private registry row already exists: {}'.format(entity))
|
||||
await _start_private_registry_instance_async(loop, container)
|
||||
await _start_private_registry_instance_async(
|
||||
loop, container, registry_archive, registry_image_id)
|
||||
return
|
||||
except azure.common.AzureMissingResourceHttpError:
|
||||
pass
|
||||
|
@ -179,7 +183,8 @@ async def setup_private_registry_async(
|
|||
15, _renew_queue_message_lease, loop, queue_client,
|
||||
'queue_registry', msg.id)
|
||||
# install docker registy container
|
||||
await _start_private_registry_instance_async(loop, container)
|
||||
await _start_private_registry_instance_async(
|
||||
loop, container, registry_archive, registry_image_id)
|
||||
entity = {
|
||||
'PartitionKey': _PARTITION_KEY,
|
||||
'RowKey': ipaddress,
|
||||
|
@ -292,7 +297,8 @@ def main():
|
|||
|
||||
# set up private registry
|
||||
loop.run_until_complete(setup_private_registry_async(
|
||||
loop, queue_client, table_client, args.ipaddress, args.container))
|
||||
loop, queue_client, table_client, args.ipaddress, args.container,
|
||||
args.regarchive, args.regimageid))
|
||||
|
||||
# get private registries
|
||||
registries = get_private_registries(table_client)
|
||||
|
@ -324,11 +330,14 @@ def parseargs():
|
|||
'sku', help='vm sku')
|
||||
parser.add_argument(
|
||||
'ipaddress', nargs='?', default=None, help='ip address')
|
||||
parser.add_argument(
|
||||
'--regarchive', help='private registry archive')
|
||||
parser.add_argument(
|
||||
'--regimageid', help='private registry image id')
|
||||
parser.add_argument(
|
||||
'--prefix', help='storage container prefix')
|
||||
parser.add_argument(
|
||||
'--container',
|
||||
help='private registry container name')
|
||||
'--container', help='private registry container name')
|
||||
return parser.parse_args()
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
173
shipyard.py
173
shipyard.py
|
@ -2,6 +2,7 @@
|
|||
|
||||
# stdlib imports
|
||||
import argparse
|
||||
import copy
|
||||
import datetime
|
||||
import json
|
||||
import hashlib
|
||||
|
@ -33,29 +34,24 @@ _STORAGE_CONTAINERS = {
|
|||
'queue_registry': None,
|
||||
'queue_globalresources': None,
|
||||
}
|
||||
_REGISTRY_FILENAME = 'docker-registry-v2.tar.gz'
|
||||
_NODEPREP_FILE = ('nodeprep.sh', 'scripts/nodeprep.sh')
|
||||
_CASCADE_FILE = ('cascade.py', 'cascade.py')
|
||||
_SETUP_PR_FILE = ('setup_private_registry.py', 'setup_private_registry.py')
|
||||
_PERF_FILE = ('perf.py', 'perf.py')
|
||||
_REGISTRY_FILE = (
|
||||
_REGISTRY_FILENAME, 'resources/{}'.format(_REGISTRY_FILENAME)
|
||||
)
|
||||
_REGISTRY_FILE = None
|
||||
|
||||
|
||||
def _populate_global_settings(config: dict):
|
||||
"""Populate global settings from config
|
||||
:param dict config: configuration dict
|
||||
"""
|
||||
global _STORAGEACCOUNT, _STORAGEACCOUNTKEY, _BATCHACCOUNTKEY
|
||||
_STORAGEACCOUNT = config[
|
||||
'global_settings']['credentials']['storage_account']
|
||||
_STORAGEACCOUNTKEY = config[
|
||||
'global_settings']['credentials']['storage_account_key']
|
||||
_BATCHACCOUNTKEY = config[
|
||||
'global_settings']['credentials']['batch_account_key']
|
||||
global _STORAGEACCOUNT, _STORAGEACCOUNTKEY, _BATCHACCOUNTKEY, \
|
||||
_REGISTRY_FILE
|
||||
_STORAGEACCOUNT = config['credentials']['storage_account']
|
||||
_STORAGEACCOUNTKEY = config['credentials']['storage_account_key']
|
||||
_BATCHACCOUNTKEY = config['credentials']['batch_account_key']
|
||||
try:
|
||||
sep = config['global_settings']['storage_entity_prefix']
|
||||
sep = config['storage_entity_prefix']
|
||||
except KeyError:
|
||||
sep = None
|
||||
if sep is None:
|
||||
|
@ -69,12 +65,22 @@ def _populate_global_settings(config: dict):
|
|||
_STORAGE_CONTAINERS['table_perf'] = sep + 'perf'
|
||||
_STORAGE_CONTAINERS['queue_registry'] = '-'.join(
|
||||
(sep + 'registry',
|
||||
config['global_settings']['credentials']['batch_account'].lower(),
|
||||
config['addpool']['poolspec']['id'].lower()))
|
||||
config['credentials']['batch_account'].lower(),
|
||||
config['poolspec']['id'].lower()))
|
||||
_STORAGE_CONTAINERS['queue_globalresources'] = '-'.join(
|
||||
(sep + 'globalresources',
|
||||
config['global_settings']['credentials']['batch_account'].lower(),
|
||||
config['addpool']['poolspec']['id'].lower()))
|
||||
config['credentials']['batch_account'].lower(),
|
||||
config['poolspec']['id'].lower()))
|
||||
try:
|
||||
rf = config['docker_registry']['private']['docker_save_registry_file']
|
||||
_REGISTRY_FILE = (
|
||||
pathlib.Path(rf).name,
|
||||
rf,
|
||||
config['docker_registry']['private'][
|
||||
'docker_save_registry_image_id']
|
||||
)
|
||||
except Exception:
|
||||
_REGISTRY_FILE = (None, None, None)
|
||||
|
||||
|
||||
def _wrap_commands_in_shell(commands: List[str], wait: bool=True) -> str:
|
||||
|
@ -95,29 +101,26 @@ def _create_credentials(config: dict) -> tuple:
|
|||
:return: (batch client, blob client, queue client, table client)
|
||||
"""
|
||||
credentials = batchauth.SharedKeyCredentials(
|
||||
config['global_settings']['credentials']['batch_account'],
|
||||
config['credentials']['batch_account'],
|
||||
_BATCHACCOUNTKEY)
|
||||
batch_client = batch.BatchServiceClient(
|
||||
credentials,
|
||||
base_url='https://{}.{}.{}'.format(
|
||||
config['global_settings']['credentials']['batch_account'],
|
||||
config['global_settings']['credentials']['batch_account_region'],
|
||||
config['global_settings']['credentials']['batch_endpoint']))
|
||||
config['credentials']['batch_account'],
|
||||
config['credentials']['batch_account_region'],
|
||||
config['credentials']['batch_endpoint']))
|
||||
blob_client = azureblob.BlockBlobService(
|
||||
account_name=_STORAGEACCOUNT,
|
||||
account_key=_STORAGEACCOUNTKEY,
|
||||
endpoint_suffix=config[
|
||||
'global_settings']['credentials']['storage_endpoint'])
|
||||
endpoint_suffix=config['credentials']['storage_endpoint'])
|
||||
queue_client = azurequeue.QueueService(
|
||||
account_name=_STORAGEACCOUNT,
|
||||
account_key=_STORAGEACCOUNTKEY,
|
||||
endpoint_suffix=config[
|
||||
'global_settings']['credentials']['storage_endpoint'])
|
||||
endpoint_suffix=config['credentials']['storage_endpoint'])
|
||||
table_client = azuretable.TableService(
|
||||
account_name=_STORAGEACCOUNT,
|
||||
account_key=_STORAGEACCOUNTKEY,
|
||||
endpoint_suffix=config[
|
||||
'global_settings']['credentials']['storage_endpoint'])
|
||||
endpoint_suffix=config['credentials']['storage_endpoint'])
|
||||
return batch_client, blob_client, queue_client, table_client
|
||||
|
||||
|
||||
|
@ -132,12 +135,15 @@ def upload_resource_files(
|
|||
"""
|
||||
sas_urls = {}
|
||||
for file in files:
|
||||
# skip if no file is specified
|
||||
if file[0] is None:
|
||||
continue
|
||||
upload = True
|
||||
if file[0] == _REGISTRY_FILENAME:
|
||||
if file[0] == _REGISTRY_FILE[0]:
|
||||
fp = pathlib.Path(file[1])
|
||||
if not fp.exists():
|
||||
print('skipping optional docker registry image: {}'.format(
|
||||
_REGISTRY_FILENAME))
|
||||
_REGISTRY_FILE[0]))
|
||||
continue
|
||||
else:
|
||||
# check if blob exists
|
||||
|
@ -145,11 +151,11 @@ def upload_resource_files(
|
|||
prop = blob_client.get_blob_properties(
|
||||
_STORAGE_CONTAINERS['blob_resourcefiles'], file[0])
|
||||
# TODO use MD5 instead
|
||||
if (prop.name == _REGISTRY_FILENAME and
|
||||
if (prop.name == _REGISTRY_FILE[0] and
|
||||
prop.properties.content_length ==
|
||||
fp.stat().st_size):
|
||||
print(('remote file size is the same '
|
||||
'for {}, skipping').format(_REGISTRY_FILENAME))
|
||||
'for {}, skipping').format(_REGISTRY_FILE[0]))
|
||||
upload = False
|
||||
except azure.common.AzureMissingResourceHttpError:
|
||||
pass
|
||||
|
@ -159,7 +165,7 @@ def upload_resource_files(
|
|||
_STORAGE_CONTAINERS['blob_resourcefiles'], file[0], file[1])
|
||||
sas_urls[file[0]] = 'https://{}.blob.{}/{}/{}?{}'.format(
|
||||
_STORAGEACCOUNT,
|
||||
config['global_settings']['credentials']['storage_endpoint'],
|
||||
config['credentials']['storage_endpoint'],
|
||||
_STORAGE_CONTAINERS['blob_resourcefiles'], file[0],
|
||||
blob_client.generate_blob_shared_access_signature(
|
||||
_STORAGE_CONTAINERS['blob_resourcefiles'], file[0],
|
||||
|
@ -179,26 +185,26 @@ def add_pool(
|
|||
:param azure.storage.blob.BlockBlobService blob_client: blob client
|
||||
:param dict config: configuration dict
|
||||
"""
|
||||
publisher = config['addpool']['poolspec']['publisher']
|
||||
offer = config['addpool']['poolspec']['offer']
|
||||
sku = config['addpool']['poolspec']['sku']
|
||||
publisher = config['poolspec']['publisher']
|
||||
offer = config['poolspec']['offer']
|
||||
sku = config['poolspec']['sku']
|
||||
try:
|
||||
p2p = config['addpool']['peer_to_peer']['enabled']
|
||||
p2p = config['peer_to_peer']['enabled']
|
||||
except KeyError:
|
||||
p2p = True
|
||||
try:
|
||||
preg = 'private' in config['addpool']['docker_registry']
|
||||
pcont = config['addpool']['docker_registry']['private']['container']
|
||||
preg = 'private' in config['docker_registry']
|
||||
pcont = config['docker_registry']['private']['container']
|
||||
except KeyError:
|
||||
preg = False
|
||||
try:
|
||||
dockeruser = config['addpool']['docker_registry']['login']['username']
|
||||
dockerpw = config['addpool']['docker_registry']['login']['password']
|
||||
dockeruser = config['docker_registry']['login']['username']
|
||||
dockerpw = config['docker_registry']['login']['password']
|
||||
except KeyError:
|
||||
dockeruser = None
|
||||
dockerpw = None
|
||||
try:
|
||||
prefix = config['global_settings']['storage_entity_prefix']
|
||||
prefix = config['storage_entity_prefix']
|
||||
if len(prefix) == 0:
|
||||
prefix = None
|
||||
except KeyError:
|
||||
|
@ -226,18 +232,22 @@ def add_pool(
|
|||
)
|
||||
# create pool param
|
||||
pool = batchmodels.PoolAddParameter(
|
||||
id=config['addpool']['poolspec']['id'],
|
||||
id=config['poolspec']['id'],
|
||||
virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
|
||||
image_reference=image_ref_to_use,
|
||||
node_agent_sku_id=sku_to_use.id),
|
||||
vm_size=config['addpool']['poolspec']['vm_size'],
|
||||
target_dedicated=config['addpool']['poolspec']['vm_count'],
|
||||
vm_size=config['poolspec']['vm_size'],
|
||||
target_dedicated=config['poolspec']['vm_count'],
|
||||
enable_inter_node_communication=True,
|
||||
start_task=batchmodels.StartTask(
|
||||
command_line='{} -o {} -s {}{}{}{}'.format(
|
||||
command_line='{} -o {} -s {}{}{}{}{}{}'.format(
|
||||
_NODEPREP_FILE[0], offer, sku,
|
||||
' -p {}'.format(prefix) if prefix else '',
|
||||
' -r {}'.format(pcont) if preg else '',
|
||||
' -a {}'.format(_REGISTRY_FILE[0])
|
||||
if _REGISTRY_FILE[0] else '',
|
||||
' -i {}'.format(_REGISTRY_FILE[2])
|
||||
if _REGISTRY_FILE[2] else '',
|
||||
' -t' if p2p else ''
|
||||
),
|
||||
run_elevated=True,
|
||||
|
@ -261,14 +271,12 @@ def add_pool(
|
|||
pool.start_task.environment_settings.append(
|
||||
batchmodels.EnvironmentSetting(
|
||||
'PRIVATE_REGISTRY_SA',
|
||||
config['addpool']['docker_registry'][
|
||||
'private']['storage_account'])
|
||||
config['docker_registry']['private']['storage_account'])
|
||||
)
|
||||
pool.start_task.environment_settings.append(
|
||||
batchmodels.EnvironmentSetting(
|
||||
'PRIVATE_REGISTRY_SAKEY',
|
||||
config['addpool']['docker_registry'][
|
||||
'private']['storage_account_key'])
|
||||
config['docker_registry']['private']['storage_account_key'])
|
||||
)
|
||||
if (dockeruser is not None and len(dockeruser) > 0 and
|
||||
dockerpw is not None and len(dockerpw) > 0):
|
||||
|
@ -414,8 +422,8 @@ def _clear_table(table_client, table_name, config):
|
|||
print('clearing table: {}'.format(table_name))
|
||||
ents = table_client.query_entities(
|
||||
table_name, filter='PartitionKey eq \'{}${}\''.format(
|
||||
config['global_settings']['credentials']['batch_account'],
|
||||
config['addpool']['poolspec']['id'])
|
||||
config['credentials']['batch_account'],
|
||||
config['poolspec']['id'])
|
||||
)
|
||||
# batch delete entities
|
||||
i = 0
|
||||
|
@ -458,12 +466,12 @@ def create_storage_containers(blob_client, queue_client, table_client, config):
|
|||
|
||||
def populate_queues(queue_client, table_client, config):
|
||||
try:
|
||||
use_hub = 'private' not in config['addpool']['docker_registry']
|
||||
use_hub = 'private' not in config['docker_registry']
|
||||
except KeyError:
|
||||
use_hub = True
|
||||
pk = '{}${}'.format(
|
||||
config['global_settings']['credentials']['batch_account'],
|
||||
config['addpool']['poolspec']['id'])
|
||||
config['credentials']['batch_account'],
|
||||
config['poolspec']['id'])
|
||||
# if using docker public hub, then populate registry table with hub
|
||||
if use_hub:
|
||||
table_client.insert_or_replace_entity(
|
||||
|
@ -476,12 +484,18 @@ def populate_queues(queue_client, table_client, config):
|
|||
)
|
||||
else:
|
||||
# populate registry queue
|
||||
for i in range(0, 3):
|
||||
try:
|
||||
nregistries = config['docker_registry']['private']['replication']
|
||||
if nregistries < 1:
|
||||
nregistries = 1
|
||||
except Exception:
|
||||
nregistries = 1
|
||||
for i in range(0, nregistries):
|
||||
queue_client.put_message(
|
||||
_STORAGE_CONTAINERS['queue_registry'], 'create-{}'.format(i))
|
||||
# populate global resources
|
||||
try:
|
||||
for gr in config['addpool']['global_resources']:
|
||||
for gr in config['global_resources']:
|
||||
table_client.insert_or_replace_entity(
|
||||
_STORAGE_CONTAINERS['table_globalresources'],
|
||||
{
|
||||
|
@ -496,18 +510,48 @@ def populate_queues(queue_client, table_client, config):
|
|||
pass
|
||||
|
||||
|
||||
def merge_dict(dict1, dict2):
|
||||
"""Recursively merge dictionaries: dict2 on to dict1. This differs
|
||||
from dict.update() in that values that are dicts are recursively merged.
|
||||
Note that only dict value types are merged, not lists, etc.
|
||||
|
||||
Code adapted from:
|
||||
https://www.xormedia.com/recursively-merge-dictionaries-in-python/
|
||||
|
||||
:param dict dict1: dictionary to merge to
|
||||
:param dict dict2: dictionary to merge with
|
||||
:rtype: dict
|
||||
:return: merged dictionary
|
||||
"""
|
||||
if not isinstance(dict1, dict) or not isinstance(dict2, dict):
|
||||
raise ValueError('dict1 or dict2 is not a dictionary')
|
||||
result = copy.deepcopy(dict1)
|
||||
for k, v in dict2.items():
|
||||
if k in result and isinstance(result[k], dict):
|
||||
result[k] = merge_dict(result[k], v)
|
||||
else:
|
||||
result[k] = copy.deepcopy(v)
|
||||
return result
|
||||
|
||||
|
||||
def main():
|
||||
"""Main function"""
|
||||
# get command-line args
|
||||
args = parseargs()
|
||||
args.action = args.action.lower()
|
||||
|
||||
if args.json is not None:
|
||||
with open(args.json, 'r') as f:
|
||||
config = json.load(f)
|
||||
print('config:')
|
||||
print(json.dumps(config, indent=4))
|
||||
_populate_global_settings(config)
|
||||
if args.settings is None:
|
||||
raise ValueError('global settings not specified')
|
||||
if args.config is None:
|
||||
raise ValueError('config settings for action not specified')
|
||||
|
||||
with open(args.settings, 'r') as f:
|
||||
config = json.load(f)
|
||||
with open(args.config, 'r') as f:
|
||||
config = merge_dict(config, json.load(f))
|
||||
print('config:')
|
||||
print(json.dumps(config, indent=4))
|
||||
_populate_global_settings(config)
|
||||
|
||||
batch_client, blob_client, queue_client, table_client = \
|
||||
_create_credentials(config)
|
||||
|
@ -554,8 +598,11 @@ def parseargs():
|
|||
'action', help='action: addpool, addjob, addtask, delpool, deljob, '
|
||||
'delalljobs, grl, delstorage, clearstorage')
|
||||
parser.add_argument(
|
||||
'--json',
|
||||
help='json file config for option. required for all add actions')
|
||||
'--settings',
|
||||
help='global settings json file config. required for all actions')
|
||||
parser.add_argument(
|
||||
'--config',
|
||||
help='json file config for option. required for all actions')
|
||||
parser.add_argument('--poolid', help='pool id')
|
||||
parser.add_argument('--jobid', help='job id')
|
||||
return parser.parse_args()
|
||||
|
|
Загрузка…
Ссылка в новой задаче