Use the `airflow-provider-fivetran` package instead of our backport. (#3479)

And use the `airflow-provider-fivetran` package's new feature to pass the return value from the Fivetran operator to the Fivetran sensor via XCom so the sensor doesn't miss syncs that finish before it can check.
This commit is contained in:
Sean Rose 2023-01-03 15:52:13 -08:00 коммит произвёл GitHub
Родитель e11d5fc637
Коммит e1eb6df342
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 17 добавлений и 11 удалений

Просмотреть файл

@ -12,8 +12,8 @@ from utils.gcp import bigquery_etl_query, gke_command
{% for task in tasks -%}
{% if task.depends_on_fivetran != None and task.depends_on_fivetran|length > 0 and ns.uses_fivetran == False -%}
{% set ns.uses_fivetran = True -%}
from operators.backport.fivetran.operator import FivetranOperator
from operators.backport.fivetran.sensor import FivetranSensor
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor
{% endif -%}
{% endfor -%}
@ -217,7 +217,8 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
{{ fivetran_task.task_id }}_sync_wait = FivetranSensor(
connector_id='{% raw %}{{ var.value.{% endraw %}{{ fivetran_task.task_id }}{% raw %}_connector_id }}{% endraw %}',
task_id='{{ fivetran_task.task_id }}_sensor',
poke_interval=5
poke_interval=5,
xcom="{% raw %}{{{% endraw %} task_instance.xcom_pull('{{ fivetran_task.task_id }}_task') {% raw %}}}{% endraw %}",
)
{{ fivetran_task.task_id }}_sync_wait.set_upstream({{ fivetran_task.task_id }}_sync_start)

Просмотреть файл

@ -8,8 +8,8 @@ import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from operators.backport.fivetran.operator import FivetranOperator
from operators.backport.fivetran.sensor import FivetranSensor
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor
docs = """
### bqetl_cjms_nonprod
@ -98,6 +98,7 @@ with DAG(
connector_id="{{ var.value.fivetran_stripe_nonprod_connector_id }}",
task_id="fivetran_stripe_nonprod_sensor",
poke_interval=5,
xcom="{{ task_instance.xcom_pull('fivetran_stripe_nonprod_task') }}",
)
fivetran_stripe_nonprod_sync_wait.set_upstream(fivetran_stripe_nonprod_sync_start)

Просмотреть файл

@ -8,8 +8,8 @@ import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from operators.backport.fivetran.operator import FivetranOperator
from operators.backport.fivetran.sensor import FivetranSensor
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor
docs = """
### bqetl_monitoring_airflow
@ -142,6 +142,7 @@ with DAG(
connector_id="{{ var.value.fivetran_airflow_metadata_import_connector_id }}",
task_id="fivetran_airflow_metadata_import_sensor",
poke_interval=5,
xcom="{{ task_instance.xcom_pull('fivetran_airflow_metadata_import_task') }}",
)
fivetran_airflow_metadata_import_sync_wait.set_upstream(

Просмотреть файл

@ -8,8 +8,8 @@ import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from operators.backport.fivetran.operator import FivetranOperator
from operators.backport.fivetran.sensor import FivetranSensor
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor
docs = """
### bqetl_subplat
@ -643,6 +643,7 @@ with DAG(
connector_id="{{ var.value.fivetran_stripe_connector_id }}",
task_id="fivetran_stripe_sensor",
poke_interval=5,
xcom="{{ task_instance.xcom_pull('fivetran_stripe_task') }}",
)
fivetran_stripe_sync_wait.set_upstream(fivetran_stripe_sync_start)

Просмотреть файл

@ -8,8 +8,8 @@ import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from operators.backport.fivetran.operator import FivetranOperator
from operators.backport.fivetran.sensor import FivetranSensor
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor
docs = """
### bqetl_test_dag
@ -65,6 +65,7 @@ with DAG(
connector_id="{{ var.value.fivetran_import_1_connector_id }}",
task_id="fivetran_import_1_sensor",
poke_interval=5,
xcom="{{ task_instance.xcom_pull('fivetran_import_1_task') }}",
)
fivetran_import_1_sync_wait.set_upstream(fivetran_import_1_sync_start)
@ -80,6 +81,7 @@ with DAG(
connector_id="{{ var.value.fivetran_import_2_connector_id }}",
task_id="fivetran_import_2_sensor",
poke_interval=5,
xcom="{{ task_instance.xcom_pull('fivetran_import_2_task') }}",
)
fivetran_import_2_sync_wait.set_upstream(fivetran_import_2_sync_start)