diff --git a/.circleci/config.yml b/.circleci/config.yml
index f56bd85..4dfc60f 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -94,7 +94,6 @@ jobs:
           name: Test Code
           command: docker run app:build pytest --flake8 --black
 
-
   build-job-example_job:
     docker:
       - image: docker:stable-git
@@ -123,6 +122,20 @@ jobs:
           command: docker build -t app:build jobs/experiments-monitoring-data-export/
 
 
+  build-job-influxdb-to-bigquery:
+    docker:
+      - image: docker:stable-git
+    steps:
+      - checkout
+      - compare-branch:
+          pattern: ^jobs/influxdb-to-bigquery/
+      - setup_remote_docker:
+          version: 19.03.13
+      - run:
+          name: Build Docker image
+          command: docker build -t app:build jobs/influxdb-to-bigquery/
+
+
   build-job-kpi-forecasting:
     docker:
       - image: docker:stable-git
@@ -257,6 +270,7 @@ workflows:
           branches:
             only: main
 
+
   job-dap-collector:
     jobs:
       - build-job-dap-collector
@@ -275,7 +289,6 @@ workflows:
     jobs:
       - build-job-desktop-mobile-mau-2020
 
-
   job-example_job:
     jobs:
       - build-job-example_job
@@ -304,6 +317,21 @@ workflows:
           branches:
             only: main
 
+  job-influxdb-to-bigquery:
+    jobs:
+      - build-job-influxdb-to-bigquery
+      - gcp-gcr/build-and-push-image:
+          context: data-eng-airflow-gcr
+          docker-context: jobs/influxdb-to-bigquery/
+          path: jobs/influxdb-to-bigquery/
+          image: influxdb-to-bigquery_docker_etl
+          requires:
+            - build-job-influxdb-to-bigquery
+          filters:
+            branches:
+              only: main
+
+
   job-kpi-forecasting:
     jobs:
       - build-job-kpi-forecasting
diff --git a/jobs/influxdb-to-bigquery/.dockerignore b/jobs/influxdb-to-bigquery/.dockerignore
new file mode 100644
index 0000000..cff5d6a
--- /dev/null
+++ b/jobs/influxdb-to-bigquery/.dockerignore
@@ -0,0 +1,7 @@
+ci_job.yaml
+ci_workflow.yaml
+.DS_Store
+*.pyc
+.pytest_cache/
+__pycache__/
+venv/
diff --git a/jobs/influxdb-to-bigquery/.flake8 b/jobs/influxdb-to-bigquery/.flake8
new file mode 100644
index 0000000..2bcd70e
--- /dev/null
+++ b/jobs/influxdb-to-bigquery/.flake8
@@ -0,0 +1,2 @@
+[flake8]
+max-line-length = 88
diff --git a/jobs/influxdb-to-bigquery/.gitignore b/jobs/influxdb-to-bigquery/.gitignore
new file mode 100644
index 0000000..2e9942c
--- /dev/null
+++ b/jobs/influxdb-to-bigquery/.gitignore
@@ -0,0 +1,4 @@
+.DS_Store
+*.pyc
+__pycache__/
+venv/
diff --git a/jobs/influxdb-to-bigquery/Dockerfile b/jobs/influxdb-to-bigquery/Dockerfile
new file mode 100644
index 0000000..60bb688
--- /dev/null
+++ b/jobs/influxdb-to-bigquery/Dockerfile
@@ -0,0 +1,27 @@
+FROM python:3.8
+LABEL org.opencontainers.image.authors="akommasani@mozilla.com"
+
+# https://github.com/mozilla-services/Dockerflow/blob/master/docs/building-container.md
+ARG USER_ID="10001"
+ARG GROUP_ID="app"
+ARG HOME="/app"
+
+ENV HOME=${HOME}
+RUN groupadd --gid ${USER_ID} ${GROUP_ID} && \
+    useradd --create-home --uid ${USER_ID} --gid ${GROUP_ID} --home-dir ${HOME} ${GROUP_ID}
+
+WORKDIR ${HOME}
+
+RUN pip install --no-cache-dir --upgrade pip
+
+# Install pinned dependencies before copying the source so this layer stays cached
+COPY requirements.txt requirements.txt
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY . .
+
+RUN pip install --no-cache-dir .
+
+# Drop root and change ownership of the application folder to the user
+RUN chown -R ${USER_ID}:${GROUP_ID} ${HOME}
+USER ${USER_ID}
diff --git a/jobs/influxdb-to-bigquery/README.md b/jobs/influxdb-to-bigquery/README.md
new file mode 100644
index 0000000..e3d192e
--- /dev/null
+++ b/jobs/influxdb-to-bigquery/README.md
@@ -0,0 +1,48 @@
+# InfluxDB to BigQuery import job
+
+This job collects AMP data from InfluxDB and loads it into BigQuery.
+
+For more information, see https://mozilla-hub.atlassian.net/browse/RS-683
+
+## Usage
+
+This script is intended to be run in a docker container.
+Build the docker image with:
+
+```sh
+podman build -t influxdb-to-bigquery .
+```
+
+To run locally, install dependencies with (from the repository root):
+
+```sh
+cd jobs/influxdb-to-bigquery
+pip install -r requirements.txt
+```
+
+Run the script with (needs gcloud auth):
+
+```sh
+python3 influxdb_to_bigquery/main.py "--bq_project_id"=test_bq_project "--bq_dataset_id"=test_bq_dataset "--bq_table_id"=test_bq_table "--influxdb_measurement"=test_influx_measurement "--influxdb_username"=test_influx_un "--influxdb_password"=test_influx_pwd "--influxdb_host"=test_influx_host --date=2023-01-01
+```
+
+## Development
+
+Run tests with:
+
+```sh
+pytest
+```
+
+`flake8` and `black` are included for code linting and formatting:
+
+```sh
+pytest --black --flake8
+```
+
+or
+
+```sh
+flake8 influxdb_to_bigquery/ tests/
+black --diff influxdb_to_bigquery/ tests/
+```
diff --git a/jobs/influxdb-to-bigquery/ci_job.yaml b/jobs/influxdb-to-bigquery/ci_job.yaml
new file mode 100644
index 0000000..8d5d306
--- /dev/null
+++ b/jobs/influxdb-to-bigquery/ci_job.yaml
@@ -0,0 +1,12 @@
+build-job-influxdb-to-bigquery:
+  docker:
+    - image: docker:stable-git
+  steps:
+    - checkout
+    - compare-branch:
+        pattern: ^jobs/influxdb-to-bigquery/
+    - setup_remote_docker:
+        version: 19.03.13
+    - run:
+        name: Build Docker image
+        command: docker build -t app:build jobs/influxdb-to-bigquery/
diff --git a/jobs/influxdb-to-bigquery/ci_workflow.yaml b/jobs/influxdb-to-bigquery/ci_workflow.yaml
new file mode 100644
index 0000000..62f7112
--- /dev/null
+++ b/jobs/influxdb-to-bigquery/ci_workflow.yaml
@@ -0,0 +1,13 @@
+job-influxdb-to-bigquery:
+  jobs:
+    - build-job-influxdb-to-bigquery
+    - gcp-gcr/build-and-push-image:
+        context: data-eng-airflow-gcr
+        docker-context: jobs/influxdb-to-bigquery/
+        path: jobs/influxdb-to-bigquery/
+        image: influxdb-to-bigquery_docker_etl
+        requires:
+          - build-job-influxdb-to-bigquery
+        filters:
+          branches:
+            only: main
diff --git a/jobs/influxdb-to-bigquery/influxdb_to_bigquery/main.py b/jobs/influxdb-to-bigquery/influxdb_to_bigquery/main.py
new file mode 100644
index 0000000..3d08a86
--- /dev/null
+++ b/jobs/influxdb-to-bigquery/influxdb_to_bigquery/main.py
@@ -0,0 +1,136 @@
+from google.cloud import bigquery
+from influxdb import InfluxDBClient
+import pandas as pd
+import click
+from datetime import datetime
+import logging
+
+
+def collect_influxdb_data(
+    influxdb_host,
+    influxdb_port,
+    influxdb_username,
+    influxdb_password,
+    influxdb_measurement,
+    date,
+    bq_project_id,
+    bq_dataset_id,
+    bq_table_id,
+):
+    """Copy one day of "prod" points from an InfluxDB measurement into BigQuery.
+
+    Queries `influxdb_measurement` for points whose time falls on `date` (UTC)
+    and hands the resulting dataframe to `load_bigquery_table`. When the query
+    returns no points, nothing is loaded and an info message is logged.
+    """
+    # Create InfluxDB client and extract data
+    client = InfluxDBClient(
+        host=influxdb_host,
+        port=influxdb_port,
+        username=influxdb_username,
+        password=influxdb_password,
+        ssl=True,
+        verify_ssl=True,
+    )
+    # Half-open interval [date, date + 1d) so points in the last second of the
+    # day (23:59:59.x) are not dropped.
+    query_template = "SELECT * FROM {influxdb_measurement} WHERE time >= '{date}T00:00:00Z' AND time < '{date}T00:00:00Z' + 1d AND \"environment\"='prod' "  # noqa: E501,E261
+    query = query_template.format(date=date, influxdb_measurement=influxdb_measurement)
+    results = client.query(query)
+
+    # Convert the resultset to pd dataframe
+    df = pd.DataFrame(list(results.get_points()))
+    if len(df) > 0:
+        # rename the columns to be compatible with BQ
+        # regex=False: "." must be replaced literally, not treated as a wildcard
+        df.columns = df.columns.str.replace(".", "_", regex=False)
+        df["time"] = pd.to_datetime(df["time"])
+        df["submission_date"] = df["time"].dt.date
+        load_bigquery_table(df, bq_project_id, bq_dataset_id, bq_table_id, date)
+    else:
+        logging.info("%s is empty", influxdb_measurement)
+
+
+def load_bigquery_table(df, bq_project_id, bq_dataset_id, bq_table_id, date):
+    """Load `df` into the `date` partition of the given BigQuery table.
+
+    The load job uses WRITE_TRUNCATE against the `<table>$YYYYMMDD` partition
+    decorator and allows new fields to be added to the table schema. Waits
+    for the load job via `job.result()`.
+    """
+    # Create BigQuery client and insert data
+    bq_client = bigquery.Client(project=bq_project_id)
+    bq_dataset_ref = bq_client.dataset(bq_dataset_id)
+    bq_table_ref = bq_dataset_ref.table(bq_table_id)
+
+    # Configure BigQuery job and write dataframe to table
+    job_config = bigquery.LoadJobConfig(
+        # schema_update_options is documented as a list of SchemaUpdateOption
+        schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
+        write_disposition=bigquery.job.WriteDisposition.WRITE_TRUNCATE,
+        time_partitioning=bigquery.table.TimePartitioning(field="submission_date"),
+    )
+    partition = f"{bq_table_ref}${str(date).replace('-', '')}"
+    job = bq_client.load_table_from_dataframe(df, partition, job_config=job_config)
+    job.result()
+
+
+@click.command()
+@click.option("--bq_project_id", help="GCP BigQuery project id", required=True)
+@click.option("--bq_dataset_id", help="GCP BigQuery dataset id", required=True)
+@click.option(
+    "--bq_table_id",
+    help="GCP BigQuery table id",
+    required=True,
+)
+@click.option("--influxdb_measurement", help="Influx measurement to fetch", required=True)
+@click.option(
+    "--influxdb_username",
+    help="Influxdb username",
+    required=True,
+)
+@click.option(
+    "--influxdb_password",
+    help="Influxdb password",
+    required=True,
+)
+@click.option(
+    "--influxdb_host",
+    help="Influxdb host URL",
+    required=True,
+)
+@click.option("--influxdb_port", default=8086, help="Influxdb port")
+@click.option(
+    "--date",
+    type=lambda x: datetime.strptime(x, "%Y-%m-%d").date(),
+    help="Date (YYYY-MM-DD) of the data to export",
+    required=True,
+)
+def main(
+    bq_project_id,
+    bq_dataset_id,
+    bq_table_id,
+    influxdb_measurement,
+    influxdb_username,
+    influxdb_password,
+    influxdb_host,
+    influxdb_port,
+    date,
+):
+    """Export one day of an InfluxDB measurement to a BigQuery table partition."""
+    collect_influxdb_data(
+        influxdb_host,
+        influxdb_port,
+        influxdb_username,
+        influxdb_password,
+        influxdb_measurement,
+        date,
+        bq_project_id,
+        bq_dataset_id,
+        bq_table_id,
+    )
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+    main()
diff --git a/jobs/influxdb-to-bigquery/pytest.ini b/jobs/influxdb-to-bigquery/pytest.ini
new file mode 100644
index 0000000..e618d7a
--- /dev/null
+++ b/jobs/influxdb-to-bigquery/pytest.ini
@@ -0,0 +1,3 @@
+[pytest]
+testpaths =
+    tests
diff --git a/jobs/influxdb-to-bigquery/requirements.txt b/jobs/influxdb-to-bigquery/requirements.txt
new file mode 100644
index 0000000..0fc82aa
--- /dev/null
+++ b/jobs/influxdb-to-bigquery/requirements.txt
@@ -0,0 +1,8 @@
+click==8.0.4
+pytest==6.0.2
+pytest-black==0.3.11
+pytest-flake8==1.0.6
+influxdb==5.3.1
+google-cloud-bigquery==3.5.0
+pandas==1.5.1
+pyarrow==8.0.0
diff --git a/jobs/influxdb-to-bigquery/setup.py b/jobs/influxdb-to-bigquery/setup.py
new file mode 100644
index 0000000..ac6fe9c
--- /dev/null
+++ b/jobs/influxdb-to-bigquery/setup.py
@@ -0,0 +1,17 @@
+#!/usr/bin/env python
+
+from setuptools import setup, find_packages
+
+# Read the README with an explicit encoding and close the file afterwards
+with open("README.md", encoding="utf-8") as readme_file:
+    readme = readme_file.read()
+
+setup(
+    name="influxdb-to-bigquery",
+    version="0.1.0",
+    author="akommasani@mozilla.com",
+    packages=find_packages(include=["influxdb_to_bigquery"]),
+    long_description=readme,
+    include_package_data=True,
+    license="MPL 2.0",
+)
diff --git a/jobs/influxdb-to-bigquery/tests/__init__.py b/jobs/influxdb-to-bigquery/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/requirements.txt b/requirements.txt
index 1ae7fae..6195d73 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
 #
-# This file is autogenerated by pip-compile with python 3.8
-# To update, run:
+# This file is autogenerated by pip-compile with Python 3.10
+# by the following command:
 #
 #    pip-compile --generate-hashes requirements.in
 #