[AIRFLOW-4670] Make airflow/example_dags Pylint compatible (#5361)

This commit is contained in:
Bas Harenslak 2019-06-09 16:36:33 +02:00 коммит произвёл Jarek Potiuk
Родитель 3891de68af
Коммит 189bbfd85d
34 изменённых файлов: 124 добавлений и 107 удалений

Просмотреть файл

@ -166,8 +166,8 @@ disable=print-statement,
super-init-not-called, # BasPH: ignored for now but should be fixed somewhere in the future
arguments-differ, # Doesn't always raise valid messages
import-error, # Requires installing Airflow environment in CI task which takes long, therefore ignored. Tests should fail anyways if deps are missing. Possibly un-ignore in the future if we ever use pre-built Docker images for CI.
fixme # There should be a good reason for adding a TODO
fixme, # There should be a good reason for adding a TODO
pointless-statement # Is raised on the bitshift operator. Could be disabled only on /example_dags after https://github.com/PyCQA/pylint/projects/1.
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
@ -446,7 +446,8 @@ good-names=e,
i,
j,
k,
_
_,
ti # Commonly used in Airflow as shorthand for taskinstance
# Include a hint for the correct naming format with invalid-name.
include-naming-hint=no

Просмотреть файл

@ -61,4 +61,4 @@ t2 = AzureCosmosInsertDocumentOperator(
document={"id": "someuniqueid", "param1": "value1", "param2": "value2"},
azure_cosmos_conn_id='azure_cosmos_default')
t1 >> t2 # pylint: disable=pointless-statement
t1 >> t2

Просмотреть файл

@ -212,12 +212,12 @@ msg_failure_callback = DingdingOperator(
dag=dag,
)
[ # pylint: disable=pointless-statement
[
text_msg_remind_none,
text_msg_remind_specific,
text_msg_remind_include_invalid,
text_msg_remind_all
] >> link_msg >> markdown_msg >> [ # pylint: disable=pointless-statement
] >> link_msg >> markdown_msg >> [
single_action_card_msg,
multi_action_card_msg
] >> feed_card_msg >> msg_failure_callback # pylint: disable=pointless-statement
] >> feed_card_msg >> msg_failure_callback

Просмотреть файл

@ -109,7 +109,7 @@ with models.DAG(
cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE),
task_id='create_instance_task2',
)
create_instance_task >> create_instance_task2 # pylint: disable=pointless-statement
create_instance_task >> create_instance_task2
# [END howto_operator_gcp_bigtable_instance_create]
# [START howto_operator_gcp_bigtable_cluster_update]
@ -126,7 +126,7 @@ with models.DAG(
nodes=int(CBT_CLUSTER_NODES_UPDATED),
task_id='update_cluster_task2',
)
cluster_update_task >> cluster_update_task2 # pylint: disable=pointless-statement
cluster_update_task >> cluster_update_task2
# [END howto_operator_gcp_bigtable_cluster_update]
# [START howto_operator_gcp_bigtable_instance_delete]
@ -153,7 +153,7 @@ with models.DAG(
table_id=CBT_TABLE_ID,
task_id='create_table_task2',
)
create_table_task >> create_table_task2 # pylint: disable=pointless-statement
create_table_task >> create_table_task2
# [END howto_operator_gcp_bigtable_table_create]
# [START howto_operator_gcp_bigtable_table_wait_for_replication]
@ -188,7 +188,6 @@ with models.DAG(
)
# [END howto_operator_gcp_bigtable_table_delete]
# pylint: disable=pointless-statement
wait_for_table_replication_task >> delete_table_task
wait_for_table_replication_task2 >> delete_table_task
wait_for_table_replication_task >> delete_table_task2
@ -205,4 +204,3 @@ with models.DAG(
# Only delete instances after all tables are deleted
[delete_table_task, delete_table_task2] >> \
delete_instance_task >> delete_instance_task2
# pylint: enable=pointless-statement

Просмотреть файл

@ -111,7 +111,5 @@ with models.DAG(
)
# [END howto_operator_gce_set_machine_type_no_project_id]
# pylint: disable=pointless-statement
gce_instance_start >> gce_instance_start2 >> gce_instance_stop >> \
gce_instance_stop2 >> gce_set_machine_type >> gce_set_machine_type2
# pylint: enable=pointless-statement

Просмотреть файл

@ -140,8 +140,6 @@ with models.DAG(
)
# [END howto_operator_gce_igm_update_template_no_project_id]
# pylint: disable=pointless-statement
gce_instance_template_copy >> gce_instance_template_copy2 >> \
gce_instance_group_manager_update_template >> \
gce_instance_group_manager_update_template2
# pylint: enable=pointless-statement

Просмотреть файл

@ -130,4 +130,4 @@ with models.DAG(
name=FUNCTION_NAME
)
# [END howto_operator_gcf_delete]
deploy_task >> deploy2_task >> delete_task # pylint: disable=pointless-statement
deploy_task >> deploy2_task >> delete_task

Просмотреть файл

@ -107,9 +107,7 @@ with models.DAG(
)
# [END howto_operator_gcp_natural_language_analyze_classify_text_result]
# pylint: disable=pointless-statement
analyze_entities >> analyze_entities_result
analyze_entity_sentiment >> analyze_entity_sentiment_result
analyze_sentiment >> analyze_sentiment_result
analyze_classify_text >> analyze_classify_text_result
# pylint: enable=pointless-statement

Просмотреть файл

@ -183,7 +183,6 @@ with models.DAG(
)
# [END howto_operator_spanner_delete]
# pylint: disable=pointless-statement
spanner_instance_create_task \
>> spanner_instance_update_task \
>> spanner_database_deploy_task \
@ -197,4 +196,3 @@ with models.DAG(
>> spanner_database_delete_task2 \
>> spanner_instance_delete_task \
>> spanner_instance_delete_task2
# pylint: enable=pointless-statement

Просмотреть файл

@ -84,7 +84,7 @@ with models.DAG(
)
# [END howto_operator_speech_to_text_recognize]
text_to_speech_synthesize_task >> speech_to_text_recognize_task # pylint: disable=pointless-statement
text_to_speech_synthesize_task >> speech_to_text_recognize_task
# [START howto_operator_translate_speech]
translate_speech_task = GcpTranslateSpeechOperator(
@ -99,4 +99,4 @@ with models.DAG(
)
# [END howto_operator_translate_speech]
text_to_speech_synthesize_task >> translate_speech_task # pylint: disable=pointless-statement
text_to_speech_synthesize_task >> translate_speech_task

Просмотреть файл

@ -186,7 +186,7 @@ with models.DAG(
) as dag:
def next_dep(task, prev):
prev >> task # pylint: disable=pointless-statement
prev >> task
return task
# ############################################## #

Просмотреть файл

@ -287,7 +287,7 @@ with models.DAG(
)
tasks.append(task)
if prev_task:
prev_task >> task # pylint: disable=pointless-statement
prev_task >> task
prev_task = task
# [END howto_operator_cloudsql_query_operators]

Просмотреть файл

@ -248,9 +248,7 @@ with models.DAG(
project_id=GCP_PROJECT_ID,
)
# pylint: disable=pointless-statement
create_transfer_job_from_aws >> wait_for_operation_to_start >> pause_operation >> \
list_operations >> get_operation >> resume_operation >> wait_for_operation_to_end >> \
create_transfer_job_from_gcp >> wait_for_second_operation_to_start >> cancel_operation >> \
delete_transfer_from_aws_job >> delete_transfer_from_gcp_job
# pylint: enable=pointless-statement

Просмотреть файл

@ -49,5 +49,5 @@ with models.DAG(
task_id='access',
bash_command="echo '{{ task_instance.xcom_pull(\"translate\")[0] }}'"
)
product_set_create >> translation_access # pylint: disable=pointless-statement
product_set_create >> translation_access
# [END howto_operator_translate_access]

Просмотреть файл

@ -230,7 +230,6 @@ with models.DAG(
)
# [END howto_operator_vision_remove_product_from_product_set]
# pylint: disable=pointless-statement
# Product path
product_create >> product_get >> product_update >> product_delete
@ -246,7 +245,6 @@ with models.DAG(
add_product_to_product_set >> remove_product_from_product_set
remove_product_from_product_set >> product_delete
remove_product_from_product_set >> product_set_delete
# pylint: enable=pointless-statement
with models.DAG(
'example_gcp_vision_explicit_id', default_args=default_args, schedule_interval=None
@ -384,7 +382,6 @@ with models.DAG(
)
# [END howto_operator_vision_remove_product_from_product_set_2]
# pylint: disable=pointless-statement
# Product path
product_create_2 >> product_create_2_idempotence >> product_get_2 >> product_update_2 >> product_delete_2
@ -401,7 +398,6 @@ with models.DAG(
product_create_2 >> add_product_to_product_set_2
remove_product_from_product_set_2 >> product_set_delete_2
remove_product_from_product_set_2 >> product_delete_2
# pylint: enable=pointless-statement
with models.DAG(
'example_gcp_vision_annotate_image', default_args=default_args, schedule_interval=None
@ -481,11 +477,9 @@ with models.DAG(
)
# [END howto_operator_vision_detect_safe_search_result]
# pylint: disable=pointless-statement
annotate_image >> annotate_image_result
detect_text >> detect_text_result
document_detect_text >> document_detect_text_result
detect_labels >> detect_labels_result
detect_safe_search >> detect_safe_search_result
# pylint: enable=pointless-statement

Просмотреть файл

@ -74,5 +74,4 @@ with models.DAG(
)
# [END howto_operator_gcs_object_create_acl_entry_task]
# pylint: disable=pointless-statement
gcs_bucket_create_acl_entry_task >> gcs_object_create_acl_entry_task

Просмотреть файл

@ -64,4 +64,4 @@ if gcs_to_bq is not None:
bash_command='bq rm -rf airflow_test',
dag=dag)
create_test_dataset >> load_csv >> delete_test_dataset # pylint: disable=pointless-statement
create_test_dataset >> load_csv >> delete_test_dataset

Просмотреть файл

@ -82,5 +82,5 @@ with DAG('pubsub-end-to-end', default_args=default_args,
t6 = PubSubSubscriptionDeleteOperator(task_id='delete-subscription')
t7 = PubSubTopicDeleteOperator(task_id='delete-topic')
t1 >> t2 >> t3 # pylint: disable=pointless-statement
t2 >> t4 >> t5 >> t6 >> t7 # pylint: disable=pointless-statement
t1 >> t2 >> t3
t2 >> t4 >> t5 >> t6 >> t7

Просмотреть файл

@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of the BashOperator."""
from datetime import timedelta
import airflow

Просмотреть файл

@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of the BranchPythonOperator."""
import random
import airflow

Просмотреть файл

@ -17,6 +17,11 @@
# specific language governing permissions and limitations
# under the License.
"""
Example DAG demonstrating the usage of BranchPythonOperator with depends_on_past=True, where tasks may be run
or skipped on alternating runs.
"""
import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
@ -28,9 +33,6 @@ args = {
'depends_on_past': True,
}
# BranchPython operator that depends on past
# and where tasks may run or be skipped on
# alternating runs
dag = DAG(
dag_id='example_branch_dop_operator_v3',
schedule_interval='*/1 * * * *',
@ -39,6 +41,13 @@ dag = DAG(
def should_run(**kwargs):
"""
Determine which dummy_task should be run based on if the execution date minute is even or odd.
:param dict kwargs: Context
:return: Id of the task to run
:rtype: str
"""
print('------------- exec dttm = {} and minute = {}'.
format(kwargs['execution_date'], kwargs['execution_date'].minute))
if kwargs['execution_date'].minute % 2 == 0:

Просмотреть файл

@ -16,9 +16,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
### Example HTTP operator and sensor
"""
"""Example HTTP operator and sensor"""
import json
from datetime import timedelta
@ -48,7 +48,7 @@ t1 = SimpleHttpOperator(
endpoint='api/v1.0/nodes',
data=json.dumps({"priority": 5}),
headers={"Content-Type": "application/json"},
response_check=lambda response: True if len(response.json()) == 0 else False,
response_check=lambda response: len(response.json()) == 0,
dag=dag,
)
@ -92,7 +92,7 @@ sensor = HttpSensor(
http_conn_id='http_default',
endpoint='',
request_params={},
response_check=lambda response: True if "Google" in response.text else False,
response_check=lambda response: "Google" in response.text,
poke_interval=5,
dag=dag,
)

Просмотреть файл

@ -16,9 +16,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example of the LatestOnlyOperator
"""
"""Example of the LatestOnlyOperator"""
import datetime as dt
import airflow

Просмотреть файл

@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of the params arguments in templated arguments."""
from datetime import timedelta
import airflow
@ -35,10 +37,12 @@ dag = DAG(
)
def my_py_command(ds, **kwargs):
# Print out the "foo" param passed in via
# `airflow test example_passing_params_via_test_command run_this <date>
# -tp '{"foo":"bar"}'`
def my_py_command(**kwargs):
"""
Print out the "foo" param passed in via
`airflow test example_passing_params_via_test_command run_this <date>
-tp '{"foo":"bar"}'`
"""
if kwargs["test_mode"]:
print(" 'foo' was passed in via test={} command : kwargs[params][foo] \
= {}".format(kwargs["test_mode"], kwargs["params"]["foo"]))

Просмотреть файл

@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of the PigOperator."""
import airflow
from airflow.models import DAG
from airflow.operators.pig_operator import PigOperator
@ -38,5 +40,3 @@ run_this = PigOperator(
pig_opts="-x local",
dag=dag,
)
run_this

Просмотреть файл

@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of the PythonOperator."""
import time
from pprint import pprint
@ -38,6 +40,7 @@ dag = DAG(
# [START howto_operator_python]
def print_context(ds, **kwargs):
"""Print the Airflow context and ds variable from the context."""
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'

Просмотреть файл

@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of the ShortCircuitOperator."""
import airflow.utils.helpers
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

Просмотреть файл

@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the DummyOperator and a custom DummySkipOperator which skips by default."""
import airflow
from airflow.exceptions import AirflowSkipException
from airflow.models import DAG
@ -30,17 +32,26 @@ args = {
# Create some placeholder operators
class DummySkipOperator(DummyOperator):
"""Dummy operator which always skips the task."""
ui_color = '#e8b7e4'
def execute(self, context):
raise AirflowSkipException
def create_test_pipeline(suffix, trigger_rule, dag):
skip_operator = DummySkipOperator(task_id='skip_operator_{}'.format(suffix), dag=dag)
always_true = DummyOperator(task_id='always_true_{}'.format(suffix), dag=dag)
join = DummyOperator(task_id=trigger_rule, dag=dag, trigger_rule=trigger_rule)
final = DummyOperator(task_id='final_{}'.format(suffix), dag=dag)
def create_test_pipeline(suffix, trigger_rule, dag_):
"""
Instantiate a number of operators for the given DAG.
:param str suffix: Suffix to append to the operator task_ids
:param str trigger_rule: TriggerRule for the join task
:param DAG dag_: The DAG to run the operators on
"""
skip_operator = DummySkipOperator(task_id='skip_operator_{}'.format(suffix), dag=dag_)
always_true = DummyOperator(task_id='always_true_{}'.format(suffix), dag=dag_)
join = DummyOperator(task_id=trigger_rule, dag=dag_, trigger_rule=trigger_rule)
final = DummyOperator(task_id='final_{}'.format(suffix), dag=dag_)
skip_operator >> join
always_true >> join

Просмотреть файл

@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of the SubDagOperator."""
import airflow
from airflow.example_dags.subdags.subdag import subdag
from airflow.models import DAG

Просмотреть файл

@ -50,6 +50,7 @@ def conditionally_trigger(context, dag_run_obj):
dag_run_obj.payload = {'message': context['params']['message']}
pp.pprint(dag_run_obj.payload)
return dag_run_obj
return None
# Define the DAG

Просмотреть файл

@ -17,6 +17,25 @@
# specific language governing permissions and limitations
# under the License.
"""
This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
(in example_trigger_controller.py)
2. The Target DAG - DAG being triggered
This example illustrates the following features :
1. A TriggerDagRunOperator that takes:
a. A python callable that decides whether or not to trigger the Target DAG
b. An optional params dict passed to the python callable to help in
evaluating whether or not to trigger the Target DAG
c. The id (name) of the Target DAG
d. The python callable can add contextual info to the DagRun created by
way of adding a Pickleable payload (e.g. dictionary of primitives). This
state is then made available to the TargetDag
2. A Target DAG : c.f. example_trigger_target_dag.py
"""
import pprint
from datetime import datetime
@ -26,23 +45,6 @@ from airflow.operators.python_operator import PythonOperator
pp = pprint.PrettyPrinter(indent=4)
# This example illustrates the use of the TriggerDagRunOperator. There are 2
# entities at work in this scenario:
# 1. The Controller DAG - the DAG that conditionally executes the trigger
# (in example_trigger_controller.py)
# 2. The Target DAG - DAG being triggered
#
# This example illustrates the following features :
# 1. A TriggerDagRunOperator that takes:
# a. A python callable that decides whether or not to trigger the Target DAG
# b. An optional params dict passed to the python callable to help in
# evaluating whether or not to trigger the Target DAG
# c. The id (name) of the Target DAG
# d. The python callable can add contextual info to the DagRun created by
# way of adding a Pickleable payload (e.g. dictionary of primitives). This
# state is then made available to the TargetDag
# 2. A Target DAG : c.f. example_trigger_target_dag.py
args = {
'start_date': datetime.utcnow(),
'owner': 'airflow',
@ -55,7 +57,12 @@ dag = DAG(
)
def run_this_func(ds, **kwargs):
def run_this_func(**kwargs):
"""
Print the payload "message" passed to the DagRun conf attribute.
:param dict kwargs: Context
"""
print("Remotely received value of {} for key=message".
format(kwargs['dag_run'].conf['message']))

Просмотреть файл

@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of XComs."""
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
@ -38,25 +40,26 @@ def push(**kwargs):
kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)
def push_by_returning(**kwargs):
def push_by_returning():
"""Pushes an XCom without a specific target, just by returning it"""
return value_2
def puller(**kwargs):
"""Pull all previously pushed XComs and check if the pushed values match the pulled values."""
ti = kwargs['ti']
# get value_1
v1 = ti.xcom_pull(key=None, task_ids='push')
assert v1 == value_1
pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
assert pulled_value_1 == value_1
# get value_2
v2 = ti.xcom_pull(task_ids='push_by_returning')
assert v2 == value_2
pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
assert pulled_value_2 == value_2
# get both value_1 and value_2
v1, v2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
assert (v1, v2) == (value_1, value_2)
pulled_value_1, pulled_value_2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
assert (pulled_value_1, pulled_value_2) == (value_1, value_2)
push1 = PythonOperator(

Просмотреть файл

@ -17,11 +17,22 @@
# specific language governing permissions and limitations
# under the License.
"""Helper function to generate a DAG and operators given some arguments."""
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
def subdag(parent_dag_name, child_dag_name, args):
"""
Generate a DAG to be used as a subdag.
:param str parent_dag_name: Id of the parent DAG
:param str child_dag_name: Id of the child DAG
:param dict args: Default arguments to provide to the subdag
:return: DAG to use as a subdag
:rtype: airflow.models.DAG
"""
dag_subdag = DAG(
dag_id='%s.%s' % (parent_dag_name, child_dag_name),
default_args=args,

Просмотреть файл

@ -287,28 +287,6 @@
./airflow/contrib/utils/mlengine_prediction_summary.py
./airflow/contrib/utils/sendgrid.py
./airflow/contrib/utils/weekday.py
./airflow/example_dags/__init__.py
./airflow/example_dags/docker_copy_data.py
./airflow/example_dags/example_bash_operator.py
./airflow/example_dags/example_branch_operator.py
./airflow/example_dags/example_branch_python_dop_operator_3.py
./airflow/example_dags/example_docker_operator.py
./airflow/example_dags/example_http_operator.py
./airflow/example_dags/example_latest_only.py
./airflow/example_dags/example_latest_only_with_trigger.py
./airflow/example_dags/example_passing_params_via_test_command.py
./airflow/example_dags/example_pig_operator.py
./airflow/example_dags/example_python_operator.py
./airflow/example_dags/example_short_circuit_operator.py
./airflow/example_dags/example_skip_dag.py
./airflow/example_dags/example_subdag_operator.py
./airflow/example_dags/example_trigger_controller_dag.py
./airflow/example_dags/example_trigger_target_dag.py
./airflow/example_dags/example_xcom.py
./airflow/example_dags/subdags/__init__.py
./airflow/example_dags/subdags/subdag.py
./airflow/example_dags/test_utils.py
./airflow/example_dags/tutorial.py
./airflow/exceptions.py
./airflow/executors/__init__.py
./airflow/executors/base_executor.py