Bug: fix spark-submit command file paths for extra files (#194)

* fix spark-submit command file paths for extra files

* fix files flag typo

* break up overly long lines
Jacob Freck 2017-11-02 15:03:55 -07:00 committed by GitHub
Parent 1e0dc78108
Commit 34d587b4b4
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 15 deletions


@@ -82,7 +82,7 @@ def execute(args: typing.NamedTuple):
         py_files = args.py_files.replace(' ', '').split(',')
     if args.files is not None:
-        files = args.py_files.replace(' ', '').split(',')
+        files = args.files.replace(' ', '').split(',')

     log.info("-------------------------------------------")
     log.info("Spark cluster id: %s", args.cluster_id)


@@ -10,6 +10,7 @@ Submit helper methods
 def __get_node(spark_client, node_id: str, cluster_id: str) -> batch_models.ComputeNode:
     return spark_client.batch_client.compute_node.get(cluster_id, node_id)

+
 def __app_submit_cmd(
         spark_client,
         cluster_id: str,
@@ -33,6 +34,12 @@ def __app_submit_cmd(
     spark_home = constants.DOCKER_SPARK_HOME

+    # set file paths to correct path on container
+    files_path = '/batch/workitems/{0}/{1}/{2}/wd/'.format(cluster_id, "job-1", name)
+    jars = [files_path + jar for jar in jars]
+    py_files = [files_path + py_file for py_file in py_files]
+    files = [files_path + f for f in files]
+
     # 2>&1 redirect stdout and stderr to be in the same file
     spark_submit_cmd = CommandBuilder(
         '{0}/bin/spark-submit >> {1} 2>&1'.format(spark_home, constants.SPARK_SUBMIT_LOGS_FILE))
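This is the core of the path fix: Azure Batch stages a task's resource files into the task's working directory on the node, so each relative file path has to be prefixed with that directory before it is handed to spark-submit inside the container. A short sketch of the same remapping with hypothetical names (note the hard-coded "job-1" segment, which assumes the cluster's first Batch job):

# Sketch of the remapping above; cluster and app names are hypothetical.
cluster_id, name = 'my-cluster', 'my-app'
files_path = '/batch/workitems/{0}/{1}/{2}/wd/'.format(cluster_id, 'job-1', name)

jars = [files_path + jar for jar in ['extra.jar']]
print(jars[0])  # /batch/workitems/my-cluster/job-1/my-app/wd/extra.jar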
@@ -42,7 +49,7 @@ def __app_submit_cmd(
     spark_submit_cmd.add_option('--class', main_class)
     spark_submit_cmd.add_option('--jars', jars and ','.join(jars))
     spark_submit_cmd.add_option('--py-files', py_files and ','.join(py_files))
-    spark_submit_cmd.add_option('--jars', files and ','.join(files))
+    spark_submit_cmd.add_option('--files', files and ','.join(files))
     spark_submit_cmd.add_option('--driver-java-options', driver_java_options)
     spark_submit_cmd.add_option('--driver-library-path', driver_library_path)
     spark_submit_cmd.add_option('--driver-class-path', driver_class_path)
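Note the `files and ','.join(files)` pattern: when the list is empty, the expression evaluates to the falsy list itself, and the option is presumably dropped by CommandBuilder rather than emitted as an empty --files flag. A hypothetical stand-in for such a builder (the real CommandBuilder lives in this repo; this only illustrates the skip-on-falsy behavior assumed here):

# Hypothetical CommandBuilder stand-in: add_option skips falsy values,
# which is why call sites pass `files and ','.join(files)`.
class CommandBuilder:
    def __init__(self, executable):
        self.parts = [executable]

    def add_option(self, flag, value):
        if value:  # None, '' or [] -> flag omitted entirely
            self.parts.append('{0} {1}'.format(flag, value))

    def to_str(self):
        return ' '.join(self.parts)

cmd = CommandBuilder('spark-submit')
cmd.add_option('--files', [] and ','.join([]))     # skipped: [] is falsy
cmd.add_option('--jars', ','.join(['extra.jar']))  # emitted
print(cmd.to_str())  # spark-submit --jars extra.jar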
@@ -63,6 +70,7 @@ def __app_submit_cmd(
         docker_exec_cmd.to_str()
     ]

+
 def submit_application(spark_client, cluster_id, application, wait: bool = False):
     """
     Submit a spark app
@@ -70,36 +78,56 @@ def submit_application(spark_client, cluster_id, application, wait: bool = False
     resource_files = []

-    app_resource_file = helpers.upload_file_to_container(container_name=application.name, file_path=application.application, blob_client=spark_client.blob_client, use_full_path=False)
+    app_resource_file = helpers.upload_file_to_container(container_name=application.name,
+                                                         file_path=application.application,
+                                                         blob_client=spark_client.blob_client,
+                                                         use_full_path=False)

     # Upload application file
     resource_files.append(app_resource_file)

     # Upload dependent JARS
+    jar_resource_file_paths = []
     for jar in application.jars:
-        resource_files.append(
-            helpers.upload_file_to_container(container_name=application.name, file_path=jar, blob_client=spark_client.blob_client, use_full_path=True))
+        current_jar_resource_file_path = helpers.upload_file_to_container(container_name=application.name,
+                                                                          file_path=jar,
+                                                                          blob_client=spark_client.blob_client,
+                                                                          use_full_path=False)
+        jar_resource_file_paths.append(current_jar_resource_file_path)
+        resource_files.append(current_jar_resource_file_path)

     # Upload dependent python files
+    py_files_resource_file_paths = []
     for py_file in application.py_files:
-        resource_files.append(
-            helpers.upload_file_to_container(container_name=application.name, file_path=py_file, blob_client=spark_client.blob_client, use_full_path=True))
+        current_py_files_resource_file_path = helpers.upload_file_to_container(container_name=application.name,
+                                                                               file_path=py_file,
+                                                                               blob_client=spark_client.blob_client,
+                                                                               use_full_path=False)
+        py_files_resource_file_paths.append(
+            current_py_files_resource_file_path)
+        resource_files.append(current_py_files_resource_file_path)

     # Upload other dependent files
+    files_resource_file_paths = []
     for file in application.files:
-        resource_files.append(
-            helpers.upload_file_to_container(container_name=application.name, file_path=file, blob_client=spark_client.blob_client, use_full_path=True))
+        files_resource_file_path = helpers.upload_file_to_container(container_name=application.name,
+                                                                    file_path=file,
+                                                                    blob_client=spark_client.blob_client,
+                                                                    use_full_path=False)
+        files_resource_file_paths.append(files_resource_file_path)
+        resource_files.append(files_resource_file_path)

     # create command to submit task
     cmd = __app_submit_cmd(
-        spark_client = spark_client,
+        spark_client=spark_client,
         cluster_id=cluster_id,
         name=application.name,
         app=app_resource_file.file_path,
         app_args=application.application_args,
         main_class=application.main_class,
-        jars=application.jars,
-        py_files=application.py_files,
-        files=application.files,
+        jars=[jar_resource_file_path.file_path for jar_resource_file_path in jar_resource_file_paths],
+        py_files=[py_files_resource.file_path for py_files_resource in py_files_resource_file_paths],
+        files=[file_resource_file_path.file_path for file_resource_file_path in files_resource_file_paths],
         driver_java_options=application.driver_java_options,
         driver_library_path=application.driver_library_path,
         driver_class_path=application.driver_class_path,
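The upload side of the fix: dependencies now go up with use_full_path=False, and the relative file_path of each returned resource file is what flows into __app_submit_cmd, so the container prefix above is applied to the same relative paths Batch stages the files under. A hypothetical stand-in for the helper, just to show the assumed shape of the return value:

# Hypothetical stand-in for helpers.upload_file_to_container, illustrating
# why use_full_path=False matters: the returned file_path must match the
# relative path Batch stages the file under in the task working directory.
import os
from collections import namedtuple

ResourceFile = namedtuple('ResourceFile', ['blob_source', 'file_path'])

def upload_file_to_container(container_name, file_path, use_full_path):
    blob_path = file_path.strip('/') if use_full_path else os.path.basename(file_path)
    # (the real helper also uploads the blob and builds a SAS URL)
    return ResourceFile('https://<account>.blob.core.windows.net/' + container_name + '/' + blob_path,
                        blob_path)

rf = upload_file_to_container('my-app', '/home/me/deps/extra.jar', use_full_path=False)
print(rf.file_path)  # extra.jar -- joins cleanly with the container prefix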
@@ -131,6 +159,6 @@ def submit_application(spark_client, cluster_id, application, wait: bool = False
     # Add task to batch job (which has the same name as cluster_id)
     job_id = cluster_id
     spark_client.batch_client.task.add(job_id=job_id, task=task)

     if wait:
-        helpers.wait_for_task_to_complete(job_id=job_id, task_id=task.id, batch_client=spark_client.batch_client)
+        helpers.wait_for_task_to_complete(job_id=job_id, task_id=task.id, batch_client=spark_client.batch_client)