Following srose suggestion and removing the usage of fivetran sensor to avoid double wait / blocking (#5080)

This commit is contained in:
kik-kik 2024-02-21 19:28:16 +01:00 коммит произвёл GitHub
Родитель 5572947730
Коммит 3ba5b32bb8
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
2 изменённых файлов: 3 добавлений и 39 удалений

Просмотреть файл

@ -27,8 +27,6 @@ from utils.gcp import bigquery_etl_query, bigquery_dq_check
{% if ns.uses_fivetran -%}
from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor
from utils.callbacks import retry_tasks_callback
{% endif -%}
docs = """
@ -387,19 +385,9 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
task_id='{{ fivetran_task.task_id }}_task',
)
{{ fivetran_task.task_id }}_sync_wait = FivetranSensor(
connector_id='{% raw %}{{ var.value.{% endraw %}{{ fivetran_task.task_id }}{% raw %}_connector_id }}{% endraw %}',
task_id='{{ fivetran_task.task_id }}_sensor',
poke_interval=30,
xcom="{% raw %}{{{% endraw %} task_instance.xcom_pull('{{ fivetran_task.task_id }}_task') {% raw %}}}{% endraw %}",
on_retry_callback=retry_tasks_callback,
params={'retry_tasks': ['{{ fivetran_task.task_id }}_task']},
)
{{ fivetran_task.task_id }}_sync_wait.set_upstream({{ fivetran_task.task_id }}_sync_start)
{% do fivetran_seen.append(fivetran_task) %}
{% endif -%}
{{ task.task_name }}.set_upstream({{ fivetran_task.task_id }}_sync_wait)
{{ task.task_name }}.set_upstream({{ fivetran_task.task_id }}_sync_start)
{% endfor -%}
{% endif -%}

Просмотреть файл

@ -10,8 +10,6 @@ from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, bigquery_dq_check
from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor
from utils.callbacks import retry_tasks_callback
docs = """
### bqetl_test_dag
@ -67,33 +65,11 @@ with DAG(
task_id="fivetran_import_1_task",
)
fivetran_import_1_sync_wait = FivetranSensor(
connector_id="{{ var.value.fivetran_import_1_connector_id }}",
task_id="fivetran_import_1_sensor",
poke_interval=30,
xcom="{{ task_instance.xcom_pull('fivetran_import_1_task') }}",
on_retry_callback=retry_tasks_callback,
params={"retry_tasks": ["fivetran_import_1_task"]},
)
fivetran_import_1_sync_wait.set_upstream(fivetran_import_1_sync_start)
test__non_incremental_query__v1.set_upstream(fivetran_import_1_sync_wait)
test__non_incremental_query__v1.set_upstream(fivetran_import_1_sync_start)
fivetran_import_2_sync_start = FivetranOperator(
connector_id="{{ var.value.fivetran_import_2_connector_id }}",
task_id="fivetran_import_2_task",
)
fivetran_import_2_sync_wait = FivetranSensor(
connector_id="{{ var.value.fivetran_import_2_connector_id }}",
task_id="fivetran_import_2_sensor",
poke_interval=30,
xcom="{{ task_instance.xcom_pull('fivetran_import_2_task') }}",
on_retry_callback=retry_tasks_callback,
params={"retry_tasks": ["fivetran_import_2_task"]},
)
fivetran_import_2_sync_wait.set_upstream(fivetran_import_2_sync_start)
test__non_incremental_query__v1.set_upstream(fivetran_import_2_sync_wait)
test__non_incremental_query__v1.set_upstream(fivetran_import_2_sync_start)