Support shipyard as docker container

- Add Dockerfile
- Add command file for container
This commit is contained in:
Fred Park 2016-08-26 15:03:56 -07:00
Parent bc30023330
Commit aa4add34d5
6 changed files: 240 additions and 68 deletions

38
Dockerfile Normal file
View file

@ -0,0 +1,38 @@
# Dockerfile for Azure/batch-shipyard
FROM gliderlabs/alpine:3.4
# build in a throwaway directory; the libtorrent tree is removed below
WORKDIR /tmp
# libtorrent release version (dotted and underscored forms for the URL)
ENV libtorrent_version=1.1.1 libtorrent_version_path=1_1_1
# CFLAGS=-lstdc++ works around missing libstdc++ link on alpine/musl;
# PYTHON/PYTHON_VERSION steer libtorrent's configure to the python3 binding
ENV CFLAGS=-lstdc++ PYTHON=/usr/bin/python3 PYTHON_VERSION=3.5
# add base packages, python dependencies, create script directory,
# build libtorrent-rasterbar for python3 and cleanup packaging.
# everything is one RUN layer so the build toolchain (build-base, *-dev,
# curl, file) is purged in the same layer that used it and never bloats
# the final image.
RUN apk update \
    && apk add --update --no-cache \
        musl build-base python3 python3-dev openssl-dev ca-certificates \
        boost boost-dev boost-python3 file curl tar pigz docker bash \
    && pip3 install --no-cache-dir --upgrade pip azure-storage==0.32.0 \
    && curl -SL https://github.com/arvidn/libtorrent/releases/download/libtorrent-${libtorrent_version_path}/libtorrent-rasterbar-${libtorrent_version}.tar.gz -o libtorrent-${libtorrent_version}.tar.gz \
    && tar zxvpf libtorrent-${libtorrent_version}.tar.gz \
    && cd libtorrent-rasterbar-${libtorrent_version} \
    && ./configure --prefix=/usr --enable-debug=no --enable-python-binding --with-boost-system=boost_system \
    && make -j4 install \
    && ldconfig /usr/lib \
    && cd .. \
    && rm -rf libtorrent-rasterbar-${libtorrent_version} \
    && rm -f libtorrent-${libtorrent_version}.tar.gz \
    && apk del --purge \
        build-base python3-dev openssl-dev boost-dev file curl \
    && apk add --no-cache boost-random \
    && rm /var/cache/apk/* \
    && mkdir -p /opt/batch-shipyard
# copy in cascade scripts and the container entrypoint script
COPY cascade.py setup_private_registry.py perf.py scripts/docker_cascade.sh /opt/batch-shipyard/
# set command
CMD ["/opt/batch-shipyard/docker_cascade.sh"]

View file

@ -77,20 +77,23 @@ _GR_DONE = False
_LAST_DHT_INFO_DUMP = None
def _setup_logger():
def _setup_logger() -> None:
"""Set up logger"""
logger.setLevel(logging.DEBUG)
logloc = pathlib.Path(
os.environ['AZ_BATCH_TASK_WORKING_DIR'],
'cascade.log')
handler = logging.handlers.RotatingFileHandler(
'cascade.log', maxBytes=10485760, backupCount=5)
str(logloc), maxBytes=10485760, backupCount=5)
formatter = logging.Formatter(
'%(asctime)s.%(msecs)03dZ %(levelname)s %(filename)s::%(funcName)s:'
'%(lineno)d %(process)d:%(threadName)s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info('logger initialized')
logger.info('logger initialized, log file: {}'.format(logloc))
def _setup_container_names(sep: str):
def _setup_container_names(sep: str) -> None:
"""Set up storage container names
:param str sep: storage container prefix
"""
@ -219,7 +222,7 @@ async def _record_perf_async(loop, event, message):
if not _RECORD_PERF:
return
proc = await asyncio.subprocess.create_subprocess_shell(
'perf.py cascade {ev} --prefix {pr} --message "{msg}"'.format(
'./perf.py cascade {ev} --prefix {pr} --message "{msg}"'.format(
ev=event, pr=_PREFIX, msg=message), loop=loop)
await proc.wait()
if proc.returncode != 0:
@ -231,7 +234,7 @@ def _record_perf(event, message):
if not _RECORD_PERF:
return
subprocess.check_call(
'perf.py cascade {ev} --prefix {pr} --message "{msg}"'.format(
'./perf.py cascade {ev} --prefix {pr} --message "{msg}"'.format(
ev=event, pr=_PREFIX, msg=message), shell=True)
@ -565,13 +568,15 @@ def _merge_service(
'gr-done',
'nglobalresources={}'.format(nglobalresources))
_GR_DONE = True
logger.info('all {} global resources loaded'.format(
nglobalresources))
def _get_torrent_info(resource, th):
global _LAST_DHT_INFO_DUMP
s = th.status()
if (s.download_rate > 0 or s.upload_rate > 0 or s.num_peers > 0 or
(s.progress - 1.0) > 1e-6):
(1.0 - s.progress) > 1e-6):
logger.debug(
('{name} {file} bytes={bytes} state={state} '
'completion={completion:.2f}% peers={peers} '
@ -583,7 +588,7 @@ def _get_torrent_info(resource, th):
up=s.upload_rate / 1000)))
now = datetime.datetime.utcnow()
if (_LAST_DHT_INFO_DUMP is None or
now > _LAST_DHT_INFO_DUMP + datetime.timedelta(minutes=1)):
now > _LAST_DHT_INFO_DUMP + datetime.timedelta(minutes=2)):
_LAST_DHT_INFO_DUMP = now
ss = _TORRENT_SESSION.status()
logger.debug(
@ -961,7 +966,9 @@ def main():
# set registry
global _REGISTRY
if pathlib.Path('.cascade_private_registry.txt').exists():
if pathlib.Path(
os.environ['AZ_BATCH_TASK_WORKING_DIR'],
'.cascade_private_registry.txt').exists():
_REGISTRY = 'localhost:5000'
else:
_REGISTRY = 'registry.hub.docker.com'
@ -990,8 +997,6 @@ def parseargs():
'--ipaddress', help='ip address')
parser.add_argument(
'--prefix', help='storage container prefix')
parser.add_argument(
'--privateregcount', help='private registry count')
parser.add_argument(
'--no-torrent', action='store_false', dest='torrent',
help='disable peer-to-peer transfer')

45
scripts/docker_cascade.sh Executable file
View file

@ -0,0 +1,45 @@
#!/usr/bin/env bash
# Container entrypoint for the batch-shipyard cascade image: replays perf
# timing markers, sets up the optional private docker registry, logs into
# Docker Hub if credentials are present, then runs cascade.py.
#
# Expected environment (injected by nodeprep.sh via docker run --env-file):
#   prefix, ipaddress, privatereg, p2p, offer, sku, npstart, drpstart
# Optional: CASCADE_TIMING, DOCKER_LOGIN_USERNAME, DOCKER_LOGIN_PASSWORD
#
# NOTE: $prefix, $privatereg and $p2p are intentionally left unquoted
# throughout -- they may be empty or expand to multiple words
# (a flag plus its value), relying on shell word splitting.
set -e
set -o pipefail
# ensure we're in the proper directory
cd /opt/batch-shipyard
# add timing markers (only when CASCADE_TIMING is set, even if empty)
if [ ! -z ${CASCADE_TIMING+x} ]; then
# backfill docker run pull start at the timestamp recorded by nodeprep.sh
python3 perf.py cascade docker-run-pull-start $prefix --ts $drpstart
# mark docker run pull end
python3 perf.py cascade docker-run-pull-end $prefix
# NOTE(review): .node_prep_finished is touched by nodeprep.sh in
# $AZ_BATCH_TASK_WORKING_DIR, but after the cd above this check looks in
# /opt/batch-shipyard -- confirm which directory is intended
if [ ! -f ".node_prep_finished" ]; then
# backfill node prep start
python3 perf.py nodeprep start $prefix --ts $npstart --message "offer=$offer,sku=$sku"
# mark private registry start
python3 perf.py privateregistry start $prefix --message "ipaddress=$ipaddress"
fi
fi
# set up private registry (setup_private_registry.py handles empty args)
python3 setup_private_registry.py $privatereg $ipaddress $prefix
# login to docker hub if credentials were passed through the env file
if [ ! -z ${DOCKER_LOGIN_USERNAME+x} ]; then
docker login -u $DOCKER_LOGIN_USERNAME -p $DOCKER_LOGIN_PASSWORD
fi
# add timing markers
if [ ! -z ${CASCADE_TIMING+x} ]; then
if [ ! -f ".node_prep_finished" ]; then
# mark private registry end
python3 perf.py privateregistry end $prefix
# mark node prep finished
python3 perf.py nodeprep end $prefix
fi
# mark cascade start time
python3 perf.py cascade start $prefix
fi
# execute cascade (replaces nothing; runs in foreground as container PID)
python3 cascade.py $p2p --ipaddress $ipaddress $prefix

View file

@ -4,18 +4,20 @@ set -e
set -o pipefail
azurefile=0
cascadecontainer=0
offer=
p2p=
prefix=
privatereg=
sku=
while getopts "h?ao:p:r:s:t:" opt; do
while getopts "h?ado:p:r:s:t:" opt; do
case "$opt" in
h|\?)
echo "nodeprep.sh parameters"
echo ""
echo "-a install azurefile docker volume driver"
echo "-d use docker container for cascade"
echo "-o [offer] VM offer"
echo "-p [prefix] storage container prefix"
echo "-r [container:archive:image id] private registry"
@ -27,6 +29,9 @@ while getopts "h?ao:p:r:s:t:" opt; do
a)
azurefile=1
;;
d)
cascadecontainer=1
;;
o)
offer=${OPTARG,,}
;;
@ -109,7 +114,7 @@ if [ $offer == "ubuntuserver" ]; then
apt-get purge -y -q lxc-docker
fi
# install required software
apt-get install -y -q -o Dpkg::Options::="--force-confnew" linux-image-extra-$(uname -r) docker-engine python3-pip
apt-get install -y -q -o Dpkg::Options::="--force-confnew" linux-image-extra-$(uname -r) docker-engine
# ensure docker opts service modifications are idempotent
set +e
grep '^DOCKER_OPTS=' /etc/default/docker
@ -148,25 +153,29 @@ if [ $offer == "ubuntuserver" ]; then
set +e
fi
set -e
# install azure storage python dependency
pip3 install --no-cache-dir azure-storage
if [ ! -z ${CASCADE_TIMING+x} ] && [ ! -f ".node_prep_finished" ]; then
./perf.py nodeprep start $prefix --ts $npstart --message "offer=$offer,sku=$sku"
fi
# install cascade dependencies
if [ ! -z "$p2p" ]; then
apt-get install -y -q python3-libtorrent pigz
fi
# install private registry if required
if [ ! -z "$privatereg" ]; then
# mark private registry start
if [ $cascadecontainer -eq 0 ]; then
# install azure storage python dependency
apt-get install -y -q python3-pip
pip3 install --no-cache-dir azure-storage==0.32.0
# backfill node prep start
if [ ! -z ${CASCADE_TIMING+x} ] && [ ! -f ".node_prep_finished" ]; then
./perf.py privateregistry start $prefix --message "ipaddress=$ipaddress"
./perf.py nodeprep start $prefix --ts $npstart --message "offer=$offer,sku=$sku"
fi
./setup_private_registry.py $privatereg $ipaddress $prefix
# mark private registry end
if [ ! -z ${CASCADE_TIMING+x} ] && [ ! -f ".node_prep_finished" ]; then
./perf.py privateregistry end $prefix
# install cascade dependencies
if [ ! -z "$p2p" ]; then
apt-get install -y -q python3-libtorrent pigz
fi
# install private registry if required
if [ ! -z "$privatereg" ]; then
# mark private registry start
if [ ! -z ${CASCADE_TIMING+x} ] && [ ! -f ".node_prep_finished" ]; then
./perf.py privateregistry start $prefix --message "ipaddress=$ipaddress"
fi
./setup_private_registry.py $privatereg $ipaddress $prefix
# mark private registry end
if [ ! -z ${CASCADE_TIMING+x} ] && [ ! -f ".node_prep_finished" ]; then
./perf.py privateregistry end $prefix
fi
fi
fi
else
@ -175,25 +184,64 @@ else
fi
# login to docker hub if no private registry
if [ ! -z $DOCKER_LOGIN_USERNAME ]; then
if [ ! -z ${DOCKER_LOGIN_USERNAME+x} ]; then
docker login -u $DOCKER_LOGIN_USERNAME -p $DOCKER_LOGIN_PASSWORD
fi
# mark node prep finished
if [ ! -f ".node_prep_finished" ]; then
if [ ! -z ${CASCADE_TIMING+x} ]; then
# touch file to prevent subsequent perf recording if rebooted
touch .node_prep_finished
# execute cascade
if [ $cascadecontainer -eq 1 ]; then
detached=
if [ -z "$p2p" ]; then
detached="--rm"
else
detached="-d"
fi
# store docker run pull start
if command -v python3 > /dev/null 2>&1; then
drpstart=`python3 -c 'import datetime;print(datetime.datetime.utcnow().timestamp())'`
else
drpstart=`python -c 'import datetime;import time;print(time.mktime(datetime.datetime.utcnow()))'`
fi
# create env file
envfile=.cascade-envfile
cat > $envfile << EOF
prefix=$prefix
ipaddress=$ipaddress
offer=$offer
sku=$sku
npstart=$npstart
drpstart=$drpstart
privatereg=$privatereg
p2p=$p2p
PRIVATE_REGISTRY_STORAGE_ENV=$PRIVATE_REGISTRY_STORAGE_ENV
`env | grep CASCADE_`
`env | grep AZ_BATCH_`
`env | grep DOCKER_LOGIN_`
EOF
# launch container
docker run $detached --env-file $envfile \
-v /var/run/docker.sock:/var/run/docker.sock \
-v $AZ_BATCH_NODE_ROOT_DIR:$AZ_BATCH_NODE_ROOT_DIR \
-w $AZ_BATCH_TASK_WORKING_DIR \
-p 6881-6891:6881-6891 -p 6881-6891:6881-6891/udp \
alfpark/shipyard
else
# mark node prep finished
if [ ! -z ${CASCADE_TIMING+x} ] && [ ! -f ".node_prep_finished" ]; then
./perf.py nodeprep end $prefix
fi
# touch file to prevent subsequent perf recording if rebooted
touch .node_prep_finished
# start cascade
if [ ! -z ${CASCADE_TIMING+x} ]; then
./perf.py cascade start $prefix
fi
./cascade.py $p2p --ipaddress $ipaddress $prefix &
fi
# start cascade
if [ ! -z ${CASCADE_TIMING+x} ]; then
./perf.py cascade start $prefix
fi
./cascade.py $p2p --ipaddress $ipaddress $prefix &
# if not in p2p mode, then wait for cascade exit
if [ -z "$p2p" ]; then
wait
fi

View file

@ -30,8 +30,6 @@ def _setup_container_names(sep: str):
if sep is None:
sep = ''
_STORAGE_CONTAINERS['table_registry'] = sep + 'registry'
_STORAGE_CONTAINERS['queue_registry'] = '-'.join(
(sep + 'registry', _BATCHACCOUNT.lower(), _POOLID.lower()))
def _create_credentials() -> azure.storage.table.TableService:
@ -74,16 +72,17 @@ async def _start_private_registry_instance_async(
if proc.returncode != 0:
raise RuntimeError('docker images non-zero rc: {}'.format(
proc.returncode))
if (stdout[0].strip() != registry_image_id and
pathlib.Path(registry_archive).exists()):
print('importing registry from local file: {}'.format(
registry_archive))
proc = await asyncio.subprocess.create_subprocess_shell(
'gunzip -c {} | docker load'.format(registry_archive), loop=loop)
await proc.wait()
if proc.returncode != 0:
raise RuntimeError('docker load non-zero rc: {}'.format(
proc.returncode))
if stdout[0].strip() != registry_image_id:
ra = pathlib.Path(
os.environ['AZ_BATCH_TASK_WORKING_DIR'], registry_archive)
if ra.exists():
print('importing registry from local file: {}'.format(ra))
proc = await asyncio.subprocess.create_subprocess_shell(
'gunzip -c {} | docker load'.format(ra), loop=loop)
await proc.wait()
if proc.returncode != 0:
raise RuntimeError('docker load non-zero rc: {}'.format(
proc.returncode))
sa, ep, sakey = os.environ['PRIVATE_REGISTRY_STORAGE_ENV'].split(':')
registry_cmd = [
'docker', 'run', '-d', '-p',
@ -147,7 +146,9 @@ async def setup_private_registry_async(
def main():
"""Main function"""
# delete existing private registry file if it exists
cprfile = pathlib.Path('.cascade_private_registry.txt')
cprfile = pathlib.Path(
os.environ['AZ_BATCH_TASK_WORKING_DIR'],
'.cascade_private_registry.txt')
try:
cprfile.unlink()
except FileNotFoundError:

View file

@ -57,6 +57,7 @@ _AZUREFILE_SYSTEMD_SERVICE_URL = (
'/master/contrib/init/systemd/azurefile-dockervolumedriver.service'
)
_AZUREFILE_SYSTEMD_SERVICE_MD5 = 'd58f2f5e9f9f78216651ac28419878f1'
_MAX_REBOOT_RETRIES = 5
_NODEPREP_FILE = ('nodeprep.sh', 'scripts/nodeprep.sh')
_JOBPREP_FILE = ('jpdockerblock.sh', 'scripts/jpdockerblock.sh')
_CASCADE_FILE = ('cascade.py', 'cascade.py')
@ -70,6 +71,7 @@ _TEMP_DIR = pathlib.Path('/tmp')
def _setup_logger():
# type: () -> None
"""Set up logger"""
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
@ -278,6 +280,13 @@ def upload_resource_files(blob_client, config, files):
def setup_azurefile_volume_driver(blob_client, config):
# type: (azure.storage.blob.BlockBlobService, dict) -> tuple
"""Set up the Azure File docker volume driver
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param dict config: configuration dict
:rtype: tuple
:return: (bin path, service file path, service env file path,
volume creation script path)
"""
# check to see if binary is downloaded
bin = pathlib.Path('resources/azurefile-dockervolumedriver')
if (not bin.exists() or
@ -353,6 +362,10 @@ def add_pool(batch_client, blob_client, config):
perf = config['store_timing_metrics']
except KeyError:
perf = False
try:
use_shipyard_docker_image = config['use_shipyard_docker_image']
except KeyError:
use_shipyard_docker_image = True
# peer-to-peer settings
try:
p2p = config['data_replication']['peer_to_peer']['enabled']
@ -415,6 +428,7 @@ def add_pool(batch_client, blob_client, config):
for key in shared_data_volumes:
if shared_data_volumes[key]['volume_driver'] == 'azurefile':
azurefile_vd = True
break
except KeyError:
pass
# prefix settings
@ -439,12 +453,14 @@ def add_pool(batch_client, blob_client, config):
]
sku_to_use, image_ref_to_use = skus_to_use[-1]
# create resource files list
_rflist = [
_NODEPREP_FILE, _JOBPREP_FILE, _CASCADE_FILE, _SETUP_PR_FILE,
_PERF_FILE, _REGISTRY_FILE
]
_rflist = [_NODEPREP_FILE, _JOBPREP_FILE, _REGISTRY_FILE]
if not use_shipyard_docker_image:
_rflist.append(_CASCADE_FILE, _SETUP_PR_FILE)
if perf:
_rflist.append(_PERF_FILE)
# handle azurefile docker volume driver
if azurefile_vd:
# only ubuntu 16.04 is supported for azurefile dvd
if (publisher != 'Canonical' or offer != 'UbuntuServer' or
sku < '16.04.0-LTS'):
raise ValueError(
@ -461,7 +477,7 @@ def add_pool(batch_client, blob_client, config):
del _rflist
# create start task commandline
start_task = [
'{} -o {} -s {}{}{}{}{}'.format(
'{} -o {} -s {}{}{}{}{}{}'.format(
_NODEPREP_FILE[0],
offer,
sku,
@ -469,6 +485,7 @@ def add_pool(batch_client, blob_client, config):
torrentflags,
' -p {}'.format(prefix) if prefix else '',
' -a' if azurefile_vd else '',
' -d' if use_shipyard_docker_image else '',
),
]
try:
@ -566,6 +583,12 @@ def add_pool(batch_client, blob_client, config):
def add_ssh_tunnel_user(batch_client, config, nodes=None):
# type: (batch.BatchServiceClient, dict,
# List[batchmodels.ComputeNode]) -> None
"""Add an SSH user to node and optionally generate an SSH tunneling script
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:param dict config: configuration dict
:param list nodes: list of nodes
"""
pool_id = config['pool_specification']['id']
try:
docker_user = config[
@ -620,6 +643,7 @@ def _wait_for_pool_ready(batch_client, node_state, pool_id, reboot_on_failed):
'waiting for all nodes in pool {} to reach one of: {!r}'.format(
pool_id, node_state))
i = 0
reboot_map = {}
while True:
# refresh pool to ensure that there is no resize error
pool = batch_client.pool.get(pool_id)
@ -635,7 +659,14 @@ def _wait_for_pool_ready(batch_client, node_state, pool_id, reboot_on_failed):
for node in nodes:
if (node.state ==
batchmodels.ComputeNodeState.starttaskfailed):
if node.id not in reboot_map:
reboot_map[node.id] = 0
if reboot_map[node.id] > _MAX_REBOOT_RETRIES:
raise RuntimeError(
('ran out of reboot retries recovering node {} '
'in pool {}').format(node.id, pool.id))
_reboot_node(batch_client, pool.id, node.id, True)
reboot_map[node.id] += 1
# refresh node list
nodes = list(batch_client.compute_node.list(pool.id))
if (len(nodes) >= pool.target_dedicated and
@ -703,16 +734,20 @@ def add_admin_user_to_compute_node(
"""
logger.info('adding user {} to node {} in pool {}'.format(
username, node.id, pool_id))
batch_client.compute_node.add_user(
pool_id,
node.id,
batchmodels.ComputeNodeUser(
username,
is_admin=True,
password=None,
ssh_public_key=open(ssh_public_key, 'rb').read().decode('utf8')
try:
batch_client.compute_node.add_user(
pool_id,
node.id,
batchmodels.ComputeNodeUser(
username,
is_admin=True,
password=None,
ssh_public_key=open(ssh_public_key, 'rb').read().decode('utf8')
)
)
)
except batchmodels.batch_error.BatchErrorException as ex:
if 'The node user already exists' not in ex.message.value:
raise
def resize_pool(batch_client, config):