diff --git a/aztk/spark/helpers/submit.py b/aztk/spark/helpers/submit.py
index 6955e8fa..3edf6ca6 100644
--- a/aztk/spark/helpers/submit.py
+++ b/aztk/spark/helpers/submit.py
@@ -157,6 +157,8 @@ def submit_application(spark_client, cluster_id, application, wait: bool = False
             affinity_id=cluster.master_node_id),
         command_line=helpers.wrap_commands_in_shell(cmd),
         resource_files=resource_files,
+        constraints=batch_models.TaskConstraints(
+            max_task_retry_count=application.max_retry_count),
         user_identity=batch_models.UserIdentity(
             auto_user=batch_models.AutoUserSpecification(
                 scope=batch_models.AutoUserScope.task,
diff --git a/aztk/spark/models.py b/aztk/spark/models.py
index 0c23a187..e834a88f 100644
--- a/aztk/spark/models.py
+++ b/aztk/spark/models.py
@@ -109,7 +109,8 @@ class Application:
                  driver_memory=None,
                  executor_memory=None,
                  driver_cores=None,
-                 executor_cores=None):
+                 executor_cores=None,
+                 max_retry_count=None):
         self.name = name
         self.application = application
         self.application_args = application_args
@@ -124,6 +125,7 @@ class Application:
         self.executor_memory = executor_memory
         self.driver_cores = driver_cores
         self.executor_cores = executor_cores
+        self.max_retry_count = max_retry_count
 
 
 class ApplicationLog():
diff --git a/cli/spark/endpoints/cluster_submit.py b/cli/spark/endpoints/cluster_submit.py
index 4cde73f9..06723b57 100644
--- a/cli/spark/endpoints/cluster_submit.py
+++ b/cli/spark/endpoints/cluster_submit.py
@@ -59,7 +59,11 @@ def setup_parser(parser: argparse.ArgumentParser):
 
     parser.add_argument('--executor-cores',
                         help='Number of cores per executor. (Default: All \
-                            available cores on the worker')
+                            available cores on the worker)')
+
+    parser.add_argument('--max-retry-count',
+                        help='Number of times the Spark job may be retried \
+                            if there is a failure')
 
     parser.add_argument('app',
                         help='App jar OR python file to execute. Use absolute \
@@ -131,7 +135,8 @@ def execute(args: typing.NamedTuple):
             driver_memory=args.driver_memory,
             executor_memory=args.executor_memory,
             driver_cores=args.driver_cores,
-            executor_cores=args.executor_cores
+            executor_cores=args.executor_cores,
+            max_retry_count=args.max_retry_count
         ),
         wait=False
     )
diff --git a/docs/50-sdk.md b/docs/50-sdk.md
index d4319919..049328ae 100644
--- a/docs/50-sdk.md
+++ b/docs/50-sdk.md
@@ -270,8 +270,10 @@ Find some samples and getting stated tutorial in the `examples/sdk/` directory o
         Cores for driver (Default: 1).
     - executor_cores: str
-        Number of cores per executor. (Default: All available cores on the worker
-
+        Number of cores per executor. (Default: All available cores on the worker)
+
+    - max_retry_count: int
+        Number of times the Spark job may be retried if there is a failure
 
 - `ApplicationLog`
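
For reference, a minimal usage sketch of the new option (the job name and file path below are placeholders, not part of this change, and the remaining `Application` arguments are assumed to keep their defaults): `max_retry_count` set on the `Application` model is forwarded to Azure Batch as `TaskConstraints(max_task_retry_count=...)`, so a failed Spark job task is retried up to that many times.

```python
from aztk.spark.models import Application

# Placeholder values for illustration; only max_retry_count is introduced by this diff.
app = Application(
    name="wordcount",               # placeholder job name
    application="/path/to/app.py",  # placeholder application file
    max_retry_count=3)              # retry the job up to 3 times if it fails
```

The CLI exposes the same setting through the new `--max-retry-count` flag added to `cluster_submit.py`, which is passed straight through to `Application(max_retry_count=...)` in `execute()`.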