* InfuxDB to BQ ETL

* fix ci issues

* Incorporate feedback
This commit is contained in:
Alekhya 2023-05-16 16:47:54 -04:00 коммит произвёл GitHub
Родитель bffcf0af3b
Коммит 9f1363d3ec
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
14 изменённых файлов: 285 добавлений и 4 удалений

Просмотреть файл

@ -94,7 +94,6 @@ jobs:
name: Test Code
command: docker run app:build pytest --flake8 --black
build-job-example_job:
docker:
- image: docker:stable-git
@ -123,6 +122,20 @@ jobs:
command: docker build -t app:build jobs/experiments-monitoring-data-export/
build-job-influxdb-to-bigquery:
docker:
- image: docker:stable-git
steps:
- checkout
- compare-branch:
pattern: ^jobs/influxdb-to-bigquery/
- setup_remote_docker:
version: 19.03.13
- run:
name: Build Docker image
command: docker build -t app:build jobs/influxdb-to-bigquery/
build-job-kpi-forecasting:
docker:
- image: docker:stable-git
@ -257,6 +270,7 @@ workflows:
branches:
only: main
job-dap-collector:
jobs:
- build-job-dap-collector
@ -275,7 +289,6 @@ workflows:
jobs:
- build-job-desktop-mobile-mau-2020
job-example_job:
jobs:
- build-job-example_job
@ -304,6 +317,21 @@ workflows:
branches:
only: main
job-influxdb-to-bigquery:
jobs:
- build-job-influxdb-to-bigquery
- gcp-gcr/build-and-push-image:
context: data-eng-airflow-gcr
docker-context: jobs/influxdb-to-bigquery/
path: jobs/influxdb-to-bigquery/
image: influxdb-to-bigquery_docker_etl
requires:
- build-job-influxdb-to-bigquery
filters:
branches:
only: main
job-kpi-forecasting:
jobs:
- build-job-kpi-forecasting

Просмотреть файл

@ -0,0 +1,7 @@
.ci_job.yaml
.ci_workflow.yaml
.DS_Store
*.pyc
.pytest_cache/
__pycache__/
venv/

Просмотреть файл

@ -0,0 +1,2 @@
[flake8]
max-line-length = 88

4
jobs/influxdb-to-bigquery/.gitignore поставляемый Normal file
Просмотреть файл

@ -0,0 +1,4 @@
.DS_Store
*.pyc
__pycache__/
venv/

Просмотреть файл

@ -0,0 +1,26 @@
FROM python:3.8
MAINTAINER <akommasani@mozilla.com>
# https://github.com/mozilla-services/Dockerflow/blob/master/docs/building-container.md
ARG USER_ID="10001"
ARG GROUP_ID="app"
ARG HOME="/app"
ENV HOME=${HOME}
RUN groupadd --gid ${USER_ID} ${GROUP_ID} && \
useradd --create-home --uid ${USER_ID} --gid ${GROUP_ID} --home-dir ${HOME} ${GROUP_ID}
WORKDIR ${HOME}
RUN pip install --upgrade pip
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
COPY . .
RUN pip install .
# Drop root and change ownership of the application folder to the user
RUN chown -R ${USER_ID}:${GROUP_ID} ${HOME}
USER ${USER_ID}

Просмотреть файл

@ -0,0 +1,48 @@
# InfluxDB to Bigquery import job
This job collects AMP data from Influxdb and puts them into BQ.
For more information on see https://mozilla-hub.atlassian.net/browse/RS-683
## Usage
This script is intended to be run in a docker container.
Build the docker image with:
```sh
podman build -t influxdb-to-bigquery .
```
To run locally, install dependencies with (in jobs/influxdb-to-bigquery):
```sh
cd ..
pip install -r requirements.txt
```
Run the script with (needs gcloud auth):
```sh
python3 influxdb_to_bigquery/main.py "--bq_project_id"=test_bq_project "--bq_dataset_id"=test_bq_dataset "--bq_table_id"=test_bq_table "--influxdb_measurement"=test_influx_measurement "--influxdb_username"=test_influx_un "--influxdb_password"=test_influx_pwd "--influxdb_host"=test_influx_host --date=test_date
```
## Development
Run tests with:
```sh
pytest
```
`flake8` and `black` are included for code linting and formatting:
```sh
pytest --black --flake8
```
or
```sh
flake8 python_template_job/ tests/
black --diff python_template_job/ tests/
```

Просмотреть файл

@ -0,0 +1,12 @@
build-job-influxdb-to-bigquery:
docker:
- image: docker:stable-git
steps:
- checkout
- compare-branch:
pattern: ^jobs/influxdb-to-bigquery/
- setup_remote_docker:
version: 19.03.13
- run:
name: Build Docker image
command: docker build -t app:build jobs/influxdb-to-bigquery/

Просмотреть файл

@ -0,0 +1,13 @@
job-influxdb-to-bigquery:
jobs:
- build-job-influxdb-to-bigquery
- gcp-gcr/build-and-push-image:
context: data-eng-airflow-gcr
docker-context: jobs/influxdb-to-bigquery/
path: jobs/influxdb-to-bigquery/
image: influxdb-to-bigquery_docker_etl
requires:
- build-job-influxdb-to-bigquery
filters:
branches:
only: main

Просмотреть файл

@ -0,0 +1,115 @@
from google.cloud import bigquery
from influxdb import InfluxDBClient
import pandas as pd
import click
from datetime import datetime
import logging
def collect_influxdb_data(
influxdb_host,
influxdb_port,
influxdb_username,
influxdb_password,
influxdb_measurement,
date,
bq_project_id,
bq_dataset_id,
bq_table_id,
):
# Create InfluxDB client and extract data
client = InfluxDBClient(
host=influxdb_host,
port=influxdb_port,
username=influxdb_username,
password=influxdb_password,
ssl=True,
verify_ssl=True,
)
query_template = "SELECT * FROM {influxdb_measurement} WHERE time >= '{date}T00:00:00Z' AND time < '{date}T23:59:59Z' AND \"environment\"='prod' " # noqa: E501,E261
query = query_template.format(date=date, influxdb_measurement=influxdb_measurement)
results = client.query(query)
# Convert the resultset to pd dataframe
df = pd.DataFrame(list(results.get_points()))
if len(df) > 0:
# rename the columns to be compatible with BQ
df.columns = df.columns.str.replace(".", "_")
df["time"] = pd.to_datetime(df["time"])
df["submission_date"] = df["time"].apply(lambda x: x.date())
load_bigquery_table(df, bq_project_id, bq_dataset_id, bq_table_id, date)
else:
logging.info(f"{influxdb_measurement} is empty".format(influxdb_measurement))
def load_bigquery_table(df, bq_project_id, bq_dataset_id, bq_table_id, date):
# Create BigQuery client and insert data
bq_client = bigquery.Client(project=bq_project_id)
bq_dataset_ref = bq_client.dataset(bq_dataset_id)
bq_table_ref = bq_dataset_ref.table(bq_table_id)
# Configure BigQuery job and write dataframe to table
job_config = bigquery.LoadJobConfig(
schema_update_options=bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
write_disposition=bigquery.job.WriteDisposition.WRITE_TRUNCATE,
time_partitioning=bigquery.table.TimePartitioning(field="submission_date"),
)
partition = f"{bq_table_ref}${str(date).replace('-', '')}"
job = bq_client.load_table_from_dataframe(df, partition, job_config=job_config)
job.result()
@click.command()
@click.option("--bq_project_id", help="GCP BigQuery project id", required=True)
@click.option("--bq_dataset_id", help="GCP BigQuery dataset id", required=True)
@click.option(
"--bq_table_id",
help="GCP BigQuery table id",
required=True,
)
@click.option("--influxdb_measurement", help="Influx measurement to fetch", required=True)
@click.option(
"--influxdb_username",
help="Influxdb username",
required=True,
)
@click.option(
"--influxdb_password",
help="Influxdb password",
required=True,
)
@click.option(
"--influxdb_host",
help="Influxdb host URL",
required=True,
)
@click.option("--influxdb_port", default=8086, help="Influxdb port")
@click.option(
"--date", type=lambda x: datetime.strptime(x, "%Y-%m-%d").date(), required=True
)
def main(
bq_project_id,
bq_dataset_id,
bq_table_id,
influxdb_measurement,
influxdb_username,
influxdb_password,
influxdb_host,
influxdb_port,
date,
):
collect_influxdb_data(
influxdb_host,
influxdb_port,
influxdb_username,
influxdb_password,
influxdb_measurement,
date,
bq_project_id,
bq_dataset_id,
bq_table_id,
)
if __name__ == "__main__":
main()

Просмотреть файл

@ -0,0 +1,3 @@
[pytest]
testpaths =
tests

Просмотреть файл

@ -0,0 +1,8 @@
click==8.0.4
pytest==6.0.2
pytest-black==0.3.11
pytest-flake8==1.0.6
influxdb==5.3.1
google-cloud-bigquery==3.5.0
pandas==1.5.1
pyarrow==8.0.0

Просмотреть файл

@ -0,0 +1,15 @@
#!/usr/bin/env python
from setuptools import setup, find_packages
readme = open("README.md").read()
setup(
name="influxdb-to-bigquery",
version="0.1.0",
author="akommasani@mozilla.com",
packages=find_packages(include=["influxdb_to_bigquery"]),
long_description=readme,
include_package_data=True,
license="MPL 2.0",
)

Просмотреть файл

Просмотреть файл

@ -1,6 +1,6 @@
#
# This file is autogenerated by pip-compile with python 3.8
# To update, run:
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --generate-hashes requirements.in
#