* fix job submission bug and integration tests

* merge

* update tests, add vsts-ci.yml

* add python step

* debug statements

* update build

* update build add print

* build update

* fix bug

* debug

* undo

* parallelize build

* add trigger

* typo

* remove env

* whitespace

* whitespace

* whitespace

* remove debug branch
This commit is contained in:
Jacob Freck 2018-05-22 15:17:02 -07:00, committed by GitHub
Parent 1527929e30
Commit 66037fd5cb
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 453 additions and 355 deletions

.vscode/launch.json (vendored), 2 changed lines

@@ -35,7 +35,7 @@
"spark", "cluster", "create", "--id", "spark-debug"
],
"env": {},
"envFile": "${workspaceFolder}/.env",
"envFile": "${workspaceFolder}/.venv",
"debugOptions": [
"RedirectOutput"
]

.vsts-ci.yml (new file), 20 added lines

@@ -0,0 +1,20 @@
trigger:
- master
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '>= 3.5'
addToPath: true
architecture: 'x64'
- script: |
pip install -r requirements.txt
pip install -e .
condition: and(succeeded(), eq(variables['agent.os'], 'linux'))
displayName: install aztk
- script: |
pytest -n 50
condition: and(succeeded(), in(variables['agent.os'], 'linux'))
displayName: pytest
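
The pipeline's install-and-test steps can also be reproduced locally; a minimal sketch, assuming pytest-xdist is pulled in via requirements.txt (it provides the -n option used by the pytest step):

# Local approximation of the CI steps above; not part of the pipeline itself.
import subprocess
import sys

def run(cmd):
    # Echo each command and fail fast, the way a CI step would.
    print("+", " ".join(cmd))
    subprocess.run(cmd, check=True)

run([sys.executable, "-m", "pip", "install", "-r", "requirements.txt"])
run([sys.executable, "-m", "pip", "install", "-e", "."])
run([sys.executable, "-m", "pytest", "-n", "50"])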


@@ -22,7 +22,8 @@ def __app_cmd():
docker_exec.add_argument("spark /bin/bash >> output.log 2>&1 -c \"" \
"source ~/.bashrc; " \
"export PYTHONPATH=$PYTHONPATH:\$AZTK_WORKING_DIR; " \
"$AZTK_WORKING_DIR/.aztk-env/.venv/bin/python \$AZTK_WORKING_DIR/aztk/node_scripts/job_submission.py\"")
"cd \$AZ_BATCH_TASK_WORKING_DIR; " \
"\$AZTK_WORKING_DIR/.aztk-env/.venv/bin/python \$AZTK_WORKING_DIR/aztk/node_scripts/job_submission.py\"")
return docker_exec.to_str()


@@ -84,7 +84,7 @@ def generate_task(spark_client, container_id, application):
task_cmd.add_argument('spark /bin/bash >> output.log 2>&1')
task_cmd.add_argument('-c "source ~/.bashrc; ' \
'export PYTHONPATH=$PYTHONPATH:\$AZTK_WORKING_DIR; ' \
'cd $AZ_BATCH_TASK_WORKING_DIR; ' \
'cd \$AZ_BATCH_TASK_WORKING_DIR; ' \
'\$AZTK_WORKING_DIR/.aztk-env/.venv/bin/python \$AZTK_WORKING_DIR/aztk/node_scripts/submit.py"')
# Create task


@@ -8,6 +8,7 @@ from aztk.utils import constants, helpers
class SparkToolkit(aztk.models.Toolkit):
def __init__(self, version: str, environment: str = None, environment_version: str = None):
super().__init__(
software="spark",
version=version,
environment=environment,
environment_version=environment_version,
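
For reference, the integration tests below construct this toolkit with only a version argument; a minimal usage sketch:

import aztk.spark

# environment and environment_version stay at their defaults (None);
# software is now filled in by SparkToolkit itself.
toolkit = aztk.spark.models.SparkToolkit(version="2.3.0")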


@@ -45,4 +45,3 @@ def wait_for_master_to_be_ready(client, cluster_id: str):
"Master didn't become ready before timeout.")
time.sleep(10)
time.sleep(5)


@@ -1,4 +1,5 @@
import subprocess
import os
import time
from datetime import datetime
@@ -19,36 +20,90 @@ base_cluster_id = "cluster-{}".format(current_time)
# load secrets
# note: this assumes secrets are set up in .aztk/secrets
spark_client = aztk.spark.Client(config.load_aztk_secrets())
tenant_id = os.environ.get("TENANT_ID")
client_id = os.environ.get("CLIENT_ID")
credential = os.environ.get("CREDENTIAL")
batch_account_resource_id = os.environ.get("BATCH_ACCOUNT_RESOURCE_ID")
storage_account_resource_id = os.environ.get("STORAGE_ACCOUNT_RESOURCE_ID")
ssh_pub_key = os.environ.get("ID_RSA_PUB")
ssh_priv_key = os.environ.get("ID_RSA")
keys = [tenant_id, client_id, credential, batch_account_resource_id,
storage_account_resource_id, ssh_priv_key, ssh_pub_key]
if all(keys):
spark_client = aztk.spark.Client(
aztk.spark.models.SecretsConfiguration(
service_principal=aztk.spark.models.ServicePrincipalConfiguration(
tenant_id=tenant_id,
client_id=client_id,
credential=credential,
batch_account_resource_id=batch_account_resource_id,
storage_account_resource_id=storage_account_resource_id
),
ssh_pub_key=ssh_pub_key,
ssh_priv_key=ssh_priv_key
)
)
else:
# fall back to local secrets if environment variables don't exist
spark_client = aztk.spark.Client(config.load_aztk_secrets())
# helper method
def wait_until_cluster_deleted(cluster_id: str):
def clean_up_cluster(cluster_id):
try:
spark_client.delete_cluster(cluster_id=cluster_id)
except (BatchErrorException, AztkError):
# pass in the event that the cluster does not exist
pass
def ensure_spark_master(cluster_id):
results = spark_client.cluster_run(cluster_id,
"if $AZTK_IS_MASTER ; then $SPARK_HOME/sbin/spark-daemon.sh status org.apache.spark.deploy.master.Master 1 ;" \
" else echo AZTK_IS_MASTER is false ; fi")
for _, result in results:
if isinstance(result, Exception):
raise result
print(result[0])
assert result[0] in ["org.apache.spark.deploy.master.Master is running.", "AZTK_IS_MASTER is false"]
def ensure_spark_worker(cluster_id):
results = spark_client.cluster_run(cluster_id,
"if $AZTK_IS_WORKER ; then $SPARK_HOME/sbin/spark-daemon.sh status org.apache.spark.deploy.worker.Worker 1 ;" \
" else echo AZTK_IS_WORKER is false ; fi")
for _, result in results:
if isinstance(result, Exception):
raise result
assert result[0] in ["org.apache.spark.deploy.worker.Worker is running.", "AZTK_IS_WORKER is false"]
def ensure_spark_processes(cluster_id):
ensure_spark_master(cluster_id)
ensure_spark_worker(cluster_id)
def wait_for_all_nodes(cluster_id, nodes):
while True:
try:
spark_client.get_cluster(cluster_id)
time.sleep(1)
except AztkError:
# break when the cluster is not found
break
for node in nodes:
if node.state not in [batch_models.ComputeNodeState.idle, batch_models.ComputeNodeState.running]:
break
else:
nodes = spark_client.get_cluster(cluster_id).nodes
continue
break
def test_create_cluster():
test_id = "test-create-"
# TODO: make Cluster Configuration more robust, test each value
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None
)
try:
# TODO: make Cluster Configuration more robust, test each value
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
docker_repo=None,
spark_configuration=None
)
cluster = spark_client.create_cluster(cluster_configuration, wait=True)
assert cluster.pool is not None
@@ -64,26 +119,22 @@ def test_create_cluster():
assert False
finally:
success = spark_client.delete_cluster(cluster_id=cluster_configuration.cluster_id)
wait_until_cluster_deleted(cluster_id=cluster_configuration.cluster_id)
clean_up_cluster(cluster_configuration.cluster_id)
def test_get_cluster():
test_id = "test-get-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None
)
try:
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
docker_repo=None,
spark_configuration=None
)
spark_client.create_cluster(cluster_configuration, wait=True)
cluster = spark_client.get_cluster(cluster_id=cluster_configuration.cluster_id)
@@ -100,27 +151,23 @@ def test_get_cluster():
assert False
finally:
success = spark_client.delete_cluster(cluster_id=cluster_configuration.cluster_id)
wait_until_cluster_deleted(cluster_id=cluster_configuration.cluster_id)
clean_up_cluster(cluster_configuration.cluster_id)
def test_list_clusters():
test_id = "test-list-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None
)
try:
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
docker_repo=None,
spark_configuration=None
)
spark_client.create_cluster(cluster_configuration, wait=True)
clusters = spark_client.list_clusters()
@@ -130,28 +177,23 @@ def test_list_clusters():
assert False
finally:
success = spark_client.delete_cluster(cluster_id=cluster_configuration.cluster_id)
wait_until_cluster_deleted(cluster_id=cluster_configuration.cluster_id)
clean_up_cluster(cluster_configuration.cluster_id)
def test_get_remote_login_settings():
test_id = "test-get-remote-login-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None
)
try:
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
docker_repo=None,
spark_configuration=None
)
spark_client.create_cluster(cluster_configuration, wait=True)
cluster = spark_client.get_cluster(cluster_id=cluster_configuration.cluster_id)
rls = spark_client.get_remote_login_settings(cluster_id=cluster.id, node_id=cluster.master_node_id)
@@ -159,47 +201,46 @@ def test_get_remote_login_settings():
assert rls.ip_address is not None
assert rls.port is not None
except (AztkError, BatchErrorException):
except (AztkError, BatchErrorException) as e:
raise e
assert False
finally:
success = spark_client.delete_cluster(cluster_id=cluster_configuration.cluster_id)
wait_until_cluster_deleted(cluster_id=cluster_configuration.cluster_id)
clean_up_cluster(cluster_configuration.cluster_id)
def test_submit():
test_id = "test-submit-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None
)
application_configuration = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[100],
main_class=None,
jars=[],
py_files=[],
files=[],
driver_java_options=None,
driver_class_path=None,
driver_memory=None,
driver_cores=None,
executor_memory=None,
executor_cores=None,
max_retry_count=None
)
try:
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
docker_repo=None,
spark_configuration=None
)
spark_client.create_cluster(cluster_configuration, wait=True)
application_configuration = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="examples/src/main/python/pi.py",
application_args=[100],
main_class=None,
jars=[],
py_files=[],
files=[],
driver_java_options=None,
driver_class_path=None,
driver_memory=None,
driver_cores=None,
executor_memory=None,
executor_cores=None,
max_retry_count=None
)
spark_client.submit(cluster_id=cluster_configuration.cluster_id, application=application_configuration, wait=True)
assert True
@@ -207,48 +248,47 @@ def test_submit():
assert False
finally:
success = spark_client.delete_cluster(cluster_id=cluster_configuration.cluster_id)
wait_until_cluster_deleted(cluster_id=cluster_configuration.cluster_id)
clean_up_cluster(cluster_configuration.cluster_id)
def test_get_application_log():
test_id = "test-get-app-log-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None
)
application_configuration = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[100],
main_class=None,
jars=[],
py_files=[],
files=[],
driver_java_options=None,
driver_class_path=None,
driver_memory=None,
driver_cores=None,
executor_memory=None,
executor_cores=None,
max_retry_count=None
)
try:
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
docker_repo=None,
spark_configuration=None
)
spark_client.create_cluster(cluster_configuration, wait=True)
application_configuration = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="examples/src/main/python/pi.py",
application_args=[100],
main_class=None,
jars=[],
py_files=[],
files=[],
driver_java_options=None,
driver_class_path=None,
driver_memory=None,
driver_cores=None,
executor_memory=None,
executor_cores=None,
max_retry_count=None
)
spark_client.submit(cluster_id=cluster_configuration.cluster_id, application=application_configuration, wait=True)
application_log = spark_client.get_application_log(cluster_id=cluster_configuration.cluster_id,
application_name=application_configuration.name,
tail=False,
current_bytes=0)
assert application_log.exit_code == 0
assert application_log.name == application_configuration.name == "pipy100"
assert application_log.application_state == "completed"
@@ -259,9 +299,7 @@ def test_get_application_log():
assert False
finally:
success = spark_client.delete_cluster(cluster_id=cluster_configuration.cluster_id)
wait_until_cluster_deleted(cluster_id=cluster_configuration.cluster_id)
clean_up_cluster(cluster_configuration.cluster_id)
def test_create_user_password():
@@ -276,40 +314,37 @@ def test_create_user_ssh_key():
def test_get_application_status_complete():
test_id = "test-app-status-complete-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None
)
application_configuration = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[100],
main_class=None,
jars=[],
py_files=[],
files=[],
driver_java_options=None,
driver_class_path=None,
driver_memory=None,
driver_cores=None,
executor_memory=None,
executor_cores=None,
max_retry_count=None
)
try:
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
docker_repo=None,
spark_configuration=None
)
spark_client.create_cluster(cluster_configuration, wait=True)
application_configuration = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="examples/src/main/python/pi.py",
application_args=[100],
main_class=None,
jars=[],
py_files=[],
files=[],
driver_java_options=None,
driver_class_path=None,
driver_memory=None,
driver_cores=None,
executor_memory=None,
executor_cores=None,
max_retry_count=None
)
spark_client.submit(cluster_id=cluster_configuration.cluster_id, application=application_configuration, wait=True)
spark_client.submit(cluster_configuration.cluster_id, application_configuration)
spark_client.wait_until_application_done(cluster_id=cluster_configuration.cluster_id, task_id=application_configuration.name)
status = spark_client.get_application_status(cluster_id=cluster_configuration.cluster_id, app_name=application_configuration.name)
assert status == "completed"
@@ -318,29 +353,26 @@ def test_get_application_status_complete():
assert False
finally:
success = spark_client.delete_cluster(cluster_id=cluster_configuration.cluster_id)
wait_until_cluster_deleted(cluster_id=cluster_configuration.cluster_id)
clean_up_cluster(cluster_configuration.cluster_id)
def test_delete_cluster():
test_id = "test-delete-"
try:
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
docker_repo=None,
spark_configuration=None
)
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None
)
try:
spark_client.create_cluster(cluster_configuration, wait=True)
success = spark_client.delete_cluster(cluster_id=cluster_configuration.cluster_id)
wait_until_cluster_deleted(cluster_id=cluster_configuration.cluster_id)
assert success is True
@@ -348,5 +380,31 @@ def test_delete_cluster():
assert False
finally:
clean_up_cluster(cluster_configuration.cluster_id)
def test_spark_processes_up():
test_id = "test-spark-processes-up"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
vm_low_pri_count=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None
)
try:
cluster = spark_client.create_cluster(cluster_configuration, wait=True)
wait_for_all_nodes(cluster.id, cluster.nodes)
success = spark_client.delete_cluster(cluster_id=cluster_configuration.cluster_id)
wait_until_cluster_deleted(cluster_id=cluster_configuration.cluster_id)
assert success is True
except (AztkError, BatchErrorException):
assert False
finally:
clean_up_cluster(cluster_configuration.cluster_id)
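
The helpers added above (clean_up_cluster, ensure_spark_processes, wait_for_all_nodes) combine into the same create / verify / clean-up shape in each test; a condensed sketch of that pattern (example_cluster_test is illustrative only, spark_client and the helpers are the module-level names defined above):

# Sketch of the pattern the updated cluster tests follow.
def example_cluster_test(cluster_configuration):
    try:
        cluster = spark_client.create_cluster(cluster_configuration, wait=True)
        wait_for_all_nodes(cluster.id, cluster.nodes)
        ensure_spark_processes(cluster.id)  # master and worker daemons report running
    except (AztkError, BatchErrorException):
        assert False
    finally:
        clean_up_cluster(cluster_configuration.cluster_id)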


@@ -1,3 +1,4 @@
import os
import subprocess
from datetime import datetime
@@ -14,68 +15,92 @@ base_job_id = "job-{}".format(time)
# load secrets
# note: this assumes secrets are set up in .aztk/secrets
spark_client = aztk.spark.Client(config.load_aztk_secrets())
tenant_id = os.environ.get("TENANT_ID")
client_id = os.environ.get("CLIENT_ID")
credential = os.environ.get("CREDENTIAL")
batch_account_resource_id = os.environ.get("BATCH_ACCOUNT_RESOURCE_ID")
storage_account_resource_id = os.environ.get("STORAGE_ACCOUNT_RESOURCE_ID")
ssh_pub_key = os.environ.get("ID_RSA_PUB")
ssh_priv_key = os.environ.get("ID_RSA")
keys = [tenant_id, client_id, credential, batch_account_resource_id,
storage_account_resource_id, ssh_priv_key, ssh_pub_key]
if all(keys):
spark_client = aztk.spark.Client(
aztk.spark.models.SecretsConfiguration(
service_principal=aztk.spark.models.ServicePrincipalConfiguration(
tenant_id=tenant_id,
client_id=client_id,
credential=credential,
batch_account_resource_id=batch_account_resource_id,
storage_account_resource_id=storage_account_resource_id
)
)
)
else:
# fall back to local secrets if environment variables don't exist
spark_client = aztk.spark.Client(config.load_aztk_secrets())
def test_submit_job():
test_id = "submit-"
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[100]
)
app2 = aztk.spark.models.ApplicationConfiguration(
name="pipy101",
application="./examples/src/main/python/pi.py",
application_args=[100]
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id+base_job_id,
applications=[app1, app2],
vm_size="standard_f1",
spark_configuration=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
max_dedicated_nodes=2,
max_low_pri_nodes=0
)
try:
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="examples/src/main/python/pi.py",
application_args=[100]
)
app2 = aztk.spark.models.ApplicationConfiguration(
name="pipy101",
application="examples/src/main/python/pi.py",
application_args=[100]
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id+base_job_id,
applications=[app1, app2],
vm_size="standard_f1",
spark_configuration=None,
docker_repo=None,
max_dedicated_nodes=2,
max_low_pri_nodes=None
)
job = spark_client.submit_job(job_configuration=job_configuration)
spark_client.wait_until_job_finished(job_id=job_configuration.id)
assert job.id == job_configuration.id
assert job.state is not None
except (AztkError, BatchErrorException):
assert False
except (AztkError, BatchErrorException) as e:
raise e
finally:
spark_client.delete_job(job_configuration.id)
clean_up_job(job_configuration.id)
def test_list_jobs():
test_id = "list-"
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[100]
)
app2 = aztk.spark.models.ApplicationConfiguration(
name="pipy101",
application="./examples/src/main/python/pi.py",
application_args=[100]
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id+base_job_id,
applications=[app1, app2],
vm_size="standard_f1",
custom_scripts=None,
spark_configuration=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
max_dedicated_nodes=1,
max_low_pri_nodes=0,
worker_on_master=True
)
try:
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="examples/src/main/python/pi.py",
application_args=[100]
)
app2 = aztk.spark.models.ApplicationConfiguration(
name="pipy101",
application="examples/src/main/python/pi.py",
application_args=[100]
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id+base_job_id,
applications=[app1, app2],
vm_size="standard_f1",
custom_scripts=None,
spark_configuration=None,
docker_repo=None,
max_dedicated_nodes=1,
max_low_pri_nodes=None
)
spark_client.submit_job(job_configuration=job_configuration)
spark_client.wait_until_job_finished(job_configuration.id)
@@ -84,37 +109,36 @@ def test_list_jobs():
assert jobs is not None
assert job_configuration.id in [job.id for job in jobs]
except (AztkError, BatchErrorException):
assert False
except (AztkError, BatchErrorException) as e:
raise e
finally:
spark_client.delete_job(job_configuration.id)
clean_up_job(job_configuration.id)
def test_list_applications():
test_id = "list-apps-"
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[100]
)
app2 = aztk.spark.models.ApplicationConfiguration(
name="pipy101",
application="./examples/src/main/python/pi.py",
application_args=[100]
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id+base_job_id,
applications=[app1, app2],
vm_size="standard_f1",
custom_scripts=None,
spark_configuration=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
max_dedicated_nodes=2,
max_low_pri_nodes=0
)
try:
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="examples/src/main/python/pi.py",
application_args=[100]
)
app2 = aztk.spark.models.ApplicationConfiguration(
name="pipy101",
application="examples/src/main/python/pi.py",
application_args=[100]
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id+base_job_id,
applications=[app1, app2],
vm_size="standard_f1",
custom_scripts=None,
spark_configuration=None,
docker_repo=None,
max_dedicated_nodes=2,
max_low_pri_nodes=None
)
spark_client.submit_job(job_configuration=job_configuration)
spark_client.wait_until_job_finished(job_configuration.id)
@@ -125,37 +149,37 @@ def test_list_applications():
for application in applications:
assert isinstance(application, (aztk.spark.models.Application, str))
except (AztkError, BatchErrorException):
assert False
except (AztkError, BatchErrorException) as e:
raise e
finally:
spark_client.delete_job(job_configuration.id)
clean_up_job(job_configuration.id)
def test_get_job():
test_id = "get-"
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[100]
)
app2 = aztk.spark.models.ApplicationConfiguration(
name="pipy101",
application="./examples/src/main/python/pi.py",
application_args=[100]
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id+base_job_id,
applications=[app1, app2],
vm_size="standard_f1",
custom_scripts=None,
spark_configuration=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
max_dedicated_nodes=1,
max_low_pri_nodes=0,
worker_on_master=True
)
try:
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="examples/src/main/python/pi.py",
application_args=[100]
)
app2 = aztk.spark.models.ApplicationConfiguration(
name="pipy101",
application="examples/src/main/python/pi.py",
application_args=[100]
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id+base_job_id,
applications=[app1, app2],
vm_size="standard_f1",
custom_scripts=None,
spark_configuration=None,
docker_repo=None,
max_dedicated_nodes=None,
max_low_pri_nodes=1
)
spark_client.submit_job(job_configuration=job_configuration)
spark_client.wait_until_job_finished(job_configuration.id)
@@ -164,121 +188,116 @@ def test_get_job():
assert app1.name in [app.name for app in job.applications]
assert app2.name in [app.name for app in job.applications]
except (AztkError, BatchErrorException):
assert False
except (AztkError, BatchErrorException) as e:
raise e
finally:
spark_client.delete_job(job_configuration.id)
clean_up_job(job_configuration.id)
def test_get_application():
test_id = "get-app-"
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[100]
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id+base_job_id,
applications=[app1],
vm_size="standard_f1",
custom_scripts=None,
spark_configuration=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
max_dedicated_nodes=2,
max_low_pri_nodes=0
)
try:
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="examples/src/main/python/pi.py",
application_args=[100]
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id+base_job_id,
applications=[app1],
vm_size="standard_f1",
custom_scripts=None,
spark_configuration=None,
docker_repo=None,
max_dedicated_nodes=2,
max_low_pri_nodes=None
)
spark_client.submit_job(job_configuration=job_configuration)
spark_client.wait_until_job_finished(job_configuration.id)
application = spark_client.get_application(job_id=job_configuration.id, application_name="pipy100")
application = spark_client.get_application(job_id=job_configuration.id, application_name=app1.name)
assert isinstance(application, aztk.spark.models.Application)
assert application.exit_code == 0
assert application.state == "completed"
assert application.name == "pipy100"
except (AztkError, BatchErrorException):
assert False
except (AztkError, BatchErrorException) as e:
raise e
finally:
spark_client.delete_job(job_configuration.id)
clean_up_job(job_configuration.id)
def test_get_application_log():
test_id = "gal-"
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[100]
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id+base_job_id,
applications=[app1],
vm_size="standard_f1",
custom_scripts=None,
spark_configuration=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
max_dedicated_nodes=2,
max_low_pri_nodes=0
)
try:
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="examples/src/main/python/pi.py",
application_args=[100]
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id+base_job_id,
applications=[app1],
vm_size="standard_f1",
custom_scripts=None,
spark_configuration=None,
docker_repo=None,
max_dedicated_nodes=2,
max_low_pri_nodes=None
)
spark_client.submit_job(job_configuration=job_configuration)
spark_client.wait_until_job_finished(job_configuration.id)
application_log = spark_client.get_job_application_log(job_id=job_configuration.id, application_name="pipy100")
application_log = spark_client.get_job_application_log(job_id=job_configuration.id, application_name=app1.name)
assert isinstance(application_log, aztk.spark.models.ApplicationLog)
assert application_log.log is not None
assert application_log.exit_code == 0
assert application_log.name == "pipy100"
assert application_log.total_bytes != 0
except (AztkError, BatchErrorException):
assert False
except (AztkError, BatchErrorException) as e:
raise e
finally:
spark_client.delete_job(job_configuration.id)
clean_up_job(job_configuration.id)
def test_delete_job():
test_id = "delete-"
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[100]
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id+base_job_id,
applications=[app1],
vm_size="standard_f1",
custom_scripts=None,
spark_configuration=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
max_dedicated_nodes=1,
max_low_pri_nodes=0,
worker_on_master=True
)
try:
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="examples/src/main/python/pi.py",
application_args=[100]
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id+base_job_id,
applications=[app1],
vm_size="standard_f1",
custom_scripts=None,
spark_configuration=None,
docker_repo=None,
max_dedicated_nodes=1,
max_low_pri_nodes=None
)
spark_client.submit_job(job_configuration=job_configuration)
spark_client.wait_until_job_finished(job_configuration.id)
spark_client.delete_job(job_configuration.id)
assert job_configuration.id not in spark_client.list_jobs()
try:
spark_client.get_job(job_configuration.id)
except AztkError:
# this should fail
assert True
except (AztkError, BatchErrorException):
assert False
except (AztkError, BatchErrorException) as e:
raise e
finally:
try:
spark_client.delete_job(job_configuration.id)
except Exception:
pass
clean_up_job(job_configuration.id)
def clean_up_job(job_id):
try:
spark_client.delete_job(job_id)
except (BatchErrorException, AztkError):
pass
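
The job tests above share the same submit / wait / clean-up shape; condensed into one sketch (example_job_test is illustrative only, spark_client and clean_up_job are the module-level names defined above):

# Sketch of the pattern the updated job tests follow.
def example_job_test(job_configuration):
    try:
        job = spark_client.submit_job(job_configuration=job_configuration)
        spark_client.wait_until_job_finished(job_configuration.id)
        assert job.id == job_configuration.id
        assert job.state is not None
    except (AztkError, BatchErrorException) as e:
        raise e
    finally:
        clean_up_job(job_configuration.id)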