Add mozetl-runner for external mozetl-compatible modules (#480)

* Add initial function for generating the mozetl runner

* Add tests for generate_runner

* Generate a runner for external modules

* Add missing changes to test_mozetl
Anthony Miyaguchi authored 2019-05-01 16:07:39 -07:00, committed by GitHub
Parent: 0accda4037
Commit: b8a44075d0
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
5 changed files: 164 additions and 6 deletions
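At a high level, a DAG opts in by setting MOZETL_EXTERNAL_MODULE alongside MOZETL_COMMAND in the operator's env. Below is a minimal sketch using only the operator arguments exercised by the new tests; the argument values are placeholders and the surrounding DAG wiring is assumed, not part of this commit:

# Hypothetical usage sketch; names and values here are placeholders.
from plugins.moz_databricks import MozDatabricksSubmitRunOperator

external_job = MozDatabricksSubmitRunOperator(
    task_id="custom_module_job",
    job_name="custom_module_job",
    env={
        # subcommand passed through to the module's Click CLI
        "MOZETL_COMMAND": "example_job",
        # when set, steps/custom_module_runner.py is generated and uploaded to S3
        "MOZETL_EXTERNAL_MODULE": "custom_module",
    },
    instance_count=1,
)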

plugins/moz_databricks.py

@@ -1,8 +1,12 @@
import boto3
import botocore
from os import environ
from pprint import pformat
from airflow.plugins_manager import AirflowPlugin
from databricks.databricks_operator import DatabricksSubmitRunOperator
from mozetl import generate_runner
class MozDatabricksSubmitRunOperator(DatabricksSubmitRunOperator):
@@ -169,9 +173,25 @@ class MozDatabricksSubmitRunOperator(DatabricksSubmitRunOperator):
libraries.append({'jar': artifact_path_s3})
elif env.get("MOZETL_COMMAND"):
# create a runner if it doesn't exist
s3 = boto3.resource("s3")
bucket = "telemetry-test-bucket" if is_dev else "telemetry-airflow"
prefix = "steps"
module_name = env.get("MOZETL_EXTERNAL_MODULE", "mozetl")
runner_name = "{}_runner.py".format(module_name)
try:
s3.Object(bucket, "{}/{}".format(prefix, runner_name)).load()
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "404":
generate_runner(module_name, bucket, prefix)
else:
raise e
# options are read directly from the environment via Click
python_task = {
"python_file": "s3://telemetry-airflow/steps/mozetl_runner.py",
"python_file": "s3://{}/{}/{}".format(bucket, prefix, runner_name),
"parameters": [env["MOZETL_COMMAND"]]
}
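The comment above about options being read from the environment refers to Click's auto_envvar_prefix mechanism, which the generated runner enables via cli.entry_point(auto_envvar_prefix="MOZETL"). A minimal standalone sketch of that behaviour, with toy command and option names that are not part of mozetl:

# Illustrative only; `cli`, `example`, and `--date` are made-up names.
import click

@click.group()
def cli():
    pass

@cli.command()
@click.option("--date")
def example(date):
    click.echo(date)

if __name__ == "__main__":
    # With auto_envvar_prefix="MOZETL", the --date option of the `example`
    # subcommand falls back to the MOZETL_EXAMPLE_DATE environment variable
    # when it is not passed on the command line.
    cli(auto_envvar_prefix="MOZETL")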

plugins/mozetl.py (new file, 33 lines added)

@@ -0,0 +1,33 @@
import boto3
import logging
from textwrap import dedent
def generate_runner(module_name, bucket, prefix):
"""Generate a runner for the current module to be run in Databricks.
See https://github.com/mozilla/python_mozetl/blob/master/bin/mozetl-databricks.py for a
standalone implementation.
"""
logging.info(
"Writing new runner to {}/{} for {}".format(bucket, prefix, module_name)
)
runner_data = """
# This runner has been auto-generated from mozilla/telemetry-airflow/plugins/moz_databricks.py.
# Any changes made to the runner file may be over-written on subsequent runs.
from {module} import cli
try:
cli.entry_point(auto_envvar_prefix="MOZETL")
except SystemExit:
# avoid calling sys.exit() in databricks
# http://click.palletsprojects.com/en/7.x/api/?highlight=auto_envvar_prefix#click.BaseCommand.main
pass
""".format(
module=module_name
)
s3 = boto3.resource("s3")
runner_object = s3.Object(bucket, "{}/{}_runner.py".format(prefix, module_name))
runner_object.put(Body=dedent(runner_data))
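For reference, a short sketch of calling the helper directly; the bucket and prefix below mirror the dev defaults in the operator and are placeholders here:

# Placeholder invocation; in the operator these values come from is_dev and
# MOZETL_EXTERNAL_MODULE.
from plugins.mozetl import generate_runner

generate_runner("custom_module", "telemetry-test-bucket", "steps")
# Uploads s3://telemetry-test-bucket/steps/custom_module_runner.py, which the
# Databricks job then uses as its spark_python_task python_file.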

tests for plugins/moz_databricks.py

@@ -2,7 +2,9 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import boto3
import pytest
from moto import mock_s3
from plugins.moz_databricks import MozDatabricksSubmitRunOperator
# The environment variables required by the MozDatabricks operator must be available
@@ -11,6 +13,20 @@ from plugins.moz_databricks import MozDatabricksSubmitRunOperator
# so the variables are defined in `tox.ini` instead.
@pytest.fixture()
def client():
"""Create a moto generated fixture for s3. Using this fixture will put the function
under test in the same scope as the @mock_s3 decorator. See
https://github.com/spulec/moto/issues/620.
"""
mock_s3().start()
client = boto3.resource("s3")
client.create_bucket(Bucket="telemetry-test-bucket")
client.create_bucket(Bucket="telemetry-airflow")
yield client
mock_s3().stop()
@pytest.fixture()
def mock_hook(mocker):
mock_hook = mocker.patch("plugins.databricks.databricks_operator.DatabricksHook")
@@ -26,7 +42,10 @@ def test_missing_tbv_or_mozetl_env(mock_hook):
)
-def test_mozetl_success(mock_hook):
+def test_mozetl_success(mock_hook, client):
client.Object("telemetry-test-bucket", "steps/mozetl_runner.py").put(
Body="raise NotImplementedError"
)
operator = MozDatabricksSubmitRunOperator(
task_id="test_databricks",
job_name="test_databricks",
@@ -57,7 +76,7 @@ def test_tbv_success(mock_hook):
assert json.get("spark_jar_task") is not None
-def test_default_python_version(mock_hook):
+def test_default_python_version(mock_hook, client):
# run with default
operator = MozDatabricksSubmitRunOperator(
task_id="test_databricks",
@@ -97,7 +116,7 @@ def test_default_python_version(mock_hook):
).execute(None)
-def test_set_mozetl_path_and_branch(mock_hook):
+def test_set_mozetl_path_and_branch(mock_hook, client):
def mocked_run_submit_args(env):
MozDatabricksSubmitRunOperator(
task_id="test_databricks",
@@ -125,3 +144,66 @@ def test_set_mozetl_path_and_branch(mock_hook):
json["libraries"][0]["pypi"]["package"]
== "git+https://github.com/mozilla/python_mozetl.git@dev"
)
def test_mozetl_skips_generates_runner_if_exists(mocker, client):
client.Object("telemetry-test-bucket", "steps/mozetl_runner.py").put(
Body="raise NotImplementedError"
)
mock_hook = mocker.patch("plugins.databricks.databricks_operator.DatabricksHook")
mock_runner = mocker.patch("plugins.moz_databricks.generate_runner")
operator = MozDatabricksSubmitRunOperator(
task_id="test_databricks",
job_name="test_databricks",
env={"MOZETL_COMMAND": "test"},
instance_count=1,
)
operator.execute(None)
assert mock_hook.called
mock_runner.assert_not_called()
assert (
operator.json["spark_python_task"]["python_file"]
== "s3://telemetry-test-bucket/steps/mozetl_runner.py"
)
def test_mozetl_generates_runner_if_not_exists(mocker, client):
mock_hook = mocker.patch("plugins.databricks.databricks_operator.DatabricksHook")
mock_runner = mocker.patch("plugins.moz_databricks.generate_runner")
operator = MozDatabricksSubmitRunOperator(
task_id="test_databricks",
job_name="test_databricks",
env={"MOZETL_COMMAND": "test"},
instance_count=1,
)
operator.execute(None)
assert mock_hook.called
assert mock_runner.called
assert (
operator.json["spark_python_task"]["python_file"]
== "s3://telemetry-test-bucket/steps/mozetl_runner.py"
)
def test_mozetl_generates_runner_for_external_module(mocker, client):
client.Object("telemetry-test-bucket", "steps/mozetl_runner.py").put(
Body="raise NotImplementedError"
)
mock_hook = mocker.patch("plugins.databricks.databricks_operator.DatabricksHook")
mock_runner = mocker.patch("plugins.moz_databricks.generate_runner")
operator = MozDatabricksSubmitRunOperator(
task_id="test_databricks",
job_name="test_databricks",
env={"MOZETL_COMMAND": "test", "MOZETL_EXTERNAL_MODULE": "custom_module"},
instance_count=1,
)
operator.execute(None)
assert mock_hook.called
assert mock_runner.called
assert (
operator.json["spark_python_task"]["python_file"]
== "s3://telemetry-test-bucket/steps/custom_module_runner.py"
)

tests/test_mozetl.py (new file, 23 lines added)

@@ -0,0 +1,23 @@
import boto3
import pytest
from moto import mock_s3
from plugins.mozetl import generate_runner
@mock_s3
@pytest.mark.parametrize("module_name", ["mozetl", "custom"])
def test_generate_runner(module_name):
bucket = "test-bucket"
prefix = "test-prefix"
conn = boto3.resource("s3")
conn.create_bucket(Bucket=bucket)
generate_runner(module_name, bucket, prefix)
body = (
conn.Object(bucket, "{}/{}_runner.py".format(prefix, module_name))
.get()["Body"]
.read()
.decode("utf-8")
)
assert body.split("\n")[3] == "from {} import cli".format(module_name)

tox.ini

@@ -19,8 +19,8 @@ setenv =
SPARK_BUCKET = test
PRIVATE_OUTPUT_BUCKET = test
PUBLIC_OUTPUT_BUCKET = test
-DEPLOY_ENVIRONMENT = test
-DEPLOY_TAG = test
+DEPLOY_ENVIRONMENT = dev
+DEPLOY_TAG = dev
ARTIFACTS_BUCKET = test
DATABRICKS_DEFAULT_IAM = test
EMR_INSTANCE_TYPE = test