# fivetran-connectors

Custom connectors for Fivetran implemented as Google Cloud Functions.

## Development

The tools in this repository are intended for Python 3.8+.

To install the CLI:

```sh
./fivetran bootstrap
```

## Creating a New Connector

To add a new connector, run:

```sh
./fivetran connector create <name_of_connector>
```

`<name_of_connector>` is the name of the new connector. A directory with this name will be created in the `connectors/` directory and automatically populated with boilerplate code.

## Deploying Connectors

To deploy a connector as a Google Cloud Function, the connector needs to be added to the `deploy.yaml` file:

```yaml
<connector-name>:               # name of the new Google Cloud Function (must be unique)
  connector: <connector-type>   # name of the connector as specified in connectors/
  environment: dev              # determines the GCP project the connector is deployed to
```
The connector will be deployed after the CircleCI configuration has been updated and pushed to main.

During development, connectors can also be deployed quickly via:

```sh
cd connectors/new_connector
gcloud functions deploy new_connector --entry-point main --runtime python38 --trigger-http --timeout=540 --memory=4096MB
```

This does not require the code to be merged into main.

## Updating the CircleCI Config

To update the CircleCI `config.yml` and add new connectors to the CI workflow, run:

```sh
./fivetran ci_config update
```

## Custom Connector Development

This is a collection of things to keep in mind, and useful information, when writing code for custom connectors.

By default, the `main()` method in the connector's `main.py` file is the entry point for the Cloud Function.

### Passing Data into Connectors

To pass configuration data, such as API keys, into a custom connector, a JSON object can be specified in the Fivetran settings as the "Secret". The connector can access this JSON data via the request object:

```python
def main(request):
    config = request.json['secrets']
```
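The request object is the Flask request that a Cloud Function receives, so for a quick local check it can be stubbed out. A minimal sketch, assuming a hypothetical `api_key` secret name:

```python
from types import SimpleNamespace

def main(request):
    # Secrets configured in the Fivetran connector settings arrive
    # under request.json["secrets"]
    config = request.json["secrets"]
    api_key = config["api_key"]  # hypothetical secret name
    # ... api_key would be used here to authenticate against the upstream API ...
    return {"state": {}, "insert": {}, "delete": {}, "schema": {}, "has_more": False}

# Stub standing in for the request object, for local testing only
fake_request = SimpleNamespace(json={"secrets": {"api_key": "dummy"}, "state": {}})
response = main(fake_request)
```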

### Connector Response Format

The expected response format for connectors is:

```json
{
    "state": {
        "since_id": "2018-01-02T00:00:01Z",
        /* other data */
    },
    "insert": {
        "table_name": [
            {"id": 1, "value": 100},
            /* rows that will get added to the "table_name" table in BigQuery */
        ],
        /* ... */
    },
    "delete": {},
    "schema": {
        "table_name": {
            "primary_key": ["id"]
        },
        /* ... */
    },
    "has_more": true /* true if there is more data available to import; false otherwise */
}
```

More about the response format can be found in the Fivetran documentation on Google Cloud Function connectors.
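Put together, a minimal `main()` returning a response in this shape might look like the following sketch (the table name and the single hard-coded row are purely illustrative):

```python
def main(request):
    # A single hard-coded row, purely to illustrate the response shape
    rows = [{"id": 1, "value": 100}]
    return {
        "state": {"since_id": "2018-01-02T00:00:01Z"},
        "insert": {"table_name": rows},
        "delete": {},
        "schema": {"table_name": {"primary_key": ["id"]}},
        "has_more": False,
    }

response = main(None)
```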

### Incremental Data Updates

To keep track of what data has already been imported in previous runs, Fivetran passes a `since_id` value as part of the `state` object. `since_id` needs to be updated by the connector and can be set, for example, to the date of the last data entry imported.

```python
def main(request):
    # [...]

    since_id = request.json["state"]["since_id"]

    # make API request to get data added after since_id
    data = api.get_data(last_modified=since_id)

    # [...]
    # update since_id
    return {
        "state": {
            "since_id": max_date(data)
            # [...]
        },
        # [...]
    }
```
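The `max_date` helper above is not defined in the snippet; a minimal sketch, assuming each record carries an ISO 8601 `last_modified` timestamp (the field name is hypothetical):

```python
def max_date(data):
    # ISO 8601 timestamps in the same UTC format compare chronologically
    # as plain strings, so the lexicographic maximum is the newest entry
    return max(row["last_modified"] for row in data)

latest = max_date([
    {"last_modified": "2018-01-01T00:00:00Z"},
    {"last_modified": "2018-01-02T00:00:01Z"},
])
```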

### Follow-up Calls to Fetch More Data

Google Cloud Functions have a couple of limitations: the maximum runtime is around 10 minutes and available memory is limited. In some cases, importing all data in a single run might not be possible.

`has_more` can be used to tell Fivetran to trigger the connector again to import more of the available data. To keep track of what data has already been imported in previous runs, `state` carries an `offset` value. `offset` can, for example, be set to the date of the last data record imported.

`has_more` and `offset` need to be updated every time the connector is triggered.

```python
def main(request):
    # [...]
    offset = 1
    if "offset" in request.json["state"]:
        offset = request.json["state"]["offset"]

    # fetch more data

    # [...]

    # update offset and has_more
    return {
        "state": {
            "offset": offset + 1
        },
        "has_more": has_more_data(data),
        # [...]
    }
```
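To illustrate the protocol, the sketch below simulates Fivetran repeatedly invoking a paginated connector until `has_more` comes back false. The three-page upstream API is made up, and `fake_connector` stands in for the deployed function:

```python
def fake_connector(request_json):
    # Resume from the offset stored in state; default to the first page
    offset = request_json.get("state", {}).get("offset", 1)
    total_pages = 3  # pretend the upstream API exposes three pages of data

    return {
        "state": {"offset": offset + 1},
        "insert": {"table_name": [{"id": offset, "value": offset * 100}]},
        "has_more": offset < total_pages,
    }

# Fivetran keeps calling the function, passing back the returned state,
# until the response reports has_more=False
state, rows = {}, []
while True:
    response = fake_connector({"state": state})
    rows.extend(response["insert"]["table_name"])
    state = response["state"]
    if not response["has_more"]:
        break
```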

## Connector Deployment

Once a new connector has been added and merged to main, CircleCI will automatically deploy it as a Google Cloud Function to the dev-fivetran project.

When setting up a new custom connector in Fivetran for the first time, copy the trigger URL of the deployed function:

*(Screenshot: Cloud Function trigger URL)*

In Fivetran, add a new "Google Cloud Function" connector and set the "Destination schema" to the BigQuery destination dataset that will be created. Paste the trigger URL into the "Function Trigger" field.

The "Secrets" field is an optional JSON object that the Cloud Function connector will have access to. This JSON object can, for example, contain API access keys or other configuration values. Once the new connector has been configured, hit "Save and Test". The connector will get triggered to run an initial data import.

Generally, data imports should be scheduled via telemetry-airflow instead of Fivetran's built-in scheduling. Since Fivetran usually dumps data into a destination that is not directly accessible, it is also necessary to set up some ETL to transform the data and write it to an accessible destination. Both the ETL and the scheduling can be specified in bigquery-etl.

When writing the query to create derived datasets in bigquery-etl, add the Fivetran import tasks to the scheduling config. Once changes are merged into main, the DAG in Airflow will get updated automatically. In the generated DAGs, each Fivetran task references a Fivetran connector ID that needs to be configured as an Airflow variable.

To configure this variable, add a new entry in the Airflow Admin → Variables settings. The Key needs to be set to the variable name as shown in the DAG source; the Value is the Fivetran connector ID, which can be copied from the Connection Details on the Setup tab of the Fivetran connector.

Once configured, the Airflow DAG needs to be enabled.

## How to Debug Issues

1. Check Airflow errors to see which connector is affected.
    * The Airflow logs will not offer a lot of detail on what went wrong. Instead, they link to the logs in Fivetran, which are a little more helpful.
2. Check the Fivetran logs of the connector.
    * Go to the Fivetran dashboard.
    * Check the connector logs. The `reason` field might provide some indication of what went wrong: whether it was a Fivetran-specific issue or whether the connector encountered a problem.
3. Check the connector logs.
4. Fix the connector.
    * If the connector needs to be fixed, deploy a new version (this happens automatically when the fix is merged on main).
5. Reconnect the connector in Fivetran.
    * Connectors need to be manually reconnected in Fivetran.
    * Navigate to the Setup tab of the connector and click "Test Connection". Wait for the tests to finish.