feat(DENG-2682): Update fivetran imports to use airflow-provider-fivetran-async Airflow Fivetran provider (#1914)

* update fivetran imports to use airflow-provider-fivetran-async

* black and ruff errors fixed
This commit is contained in:
kik-kik 2024-02-21 17:51:16 +01:00 коммит произвёл GitHub
Родитель 68bfbe9d71
Коммит ec271e2c6d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
4 изменённых файлов: 13 добавлений и 13 удалений

Просмотреть файл

@ -1,8 +1,9 @@
from datetime import datetime, timedelta
from airflow import DAG
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor
from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor
from utils.callbacks import retry_tasks_callback
from utils.tags import Tag
@ -33,25 +34,24 @@ default_args = {
tags = [Tag.ImpactTier.tier_1]
with DAG(
'fivetran_casa',
"fivetran_casa",
default_args=default_args,
doc_md=DOCS,
schedule_interval="0 5 * * *",
tags=tags,
) as dag:
casa_sync_start = FivetranOperator(
connector_id='{{ var.value.fivetran_casa_connector_id }}',
task_id='casa-task',
connector_id="{{ var.value.fivetran_casa_connector_id }}",
task_id="casa-task",
)
casa_sync_wait = FivetranSensor(
connector_id='{{ var.value.fivetran_casa_connector_id }}',
task_id='casa-sensor',
connector_id="{{ var.value.fivetran_casa_connector_id }}",
task_id="casa-sensor",
poke_interval=30,
xcom="{{ task_instance.xcom_pull('casa-task') }}",
on_retry_callback=retry_tasks_callback,
params={'retry_tasks': ['casa-task']},
params={"retry_tasks": ["casa-task"]},
)
casa_sync_start >> casa_sync_wait

Просмотреть файл

@ -5,8 +5,8 @@ from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor
from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor
from utils.acoustic.acoustic_client import AcousticClient
from utils.callbacks import retry_tasks_callback

Просмотреть файл

@ -8,7 +8,7 @@ apache-airflow[amazon,async,celery,cncf.kubernetes,github_enterprise,google_auth
apache-airflow-providers-google
apache-airflow-providers-http
apache-airflow-providers-slack
airflow-provider-fivetran==1.1.2
airflow-provider-fivetran-async==2.0.2
# Code quality
pytest==7.4.3

Просмотреть файл

@ -7,7 +7,7 @@
aiofiles==23.2.1
aiohttp==3.8.6
aiosignal==1.3.1
airflow-provider-fivetran==1.1.2
airflow-provider-fivetran-async==2.0.2
alembic==1.12.1
amqp==5.1.1
annotated-types==0.6.0