diff --git a/airflow/providers/google/cloud/example_dags/example_mysql_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_mysql_to_gcs.py new file mode 100644 index 0000000000..dbec5dcc87 --- /dev/null +++ b/airflow/providers/google/cloud/example_dags/example_mysql_to_gcs.py @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import os +from airflow import models +from airflow.providers.google.cloud.transfers.mysql_to_gcs import MySQLToGCSOperator +from airflow.utils import dates + +GCS_BUCKET = os.environ.get("GCP_GCS_BUCKET", "example-airflow-mysql-gcs") +FILENAME = 'test_file' + +SQL_QUERY = "SELECT * from test_table" + +with models.DAG( + 'example_mysql_to_gcs', + default_args=dict(start_date=dates.days_ago(1)), + schedule_interval=None, + tags=['example'], +) as dag: + # [START howto_operator_mysql_to_gcs] + upload = MySQLToGCSOperator( + task_id='mysql_to_gcs', sql=SQL_QUERY, bucket=GCS_BUCKET, filename=FILENAME, export_format='csv' + ) + # [END howto_operator_mysql_to_gcs] diff --git a/docs/build_docs.py b/docs/build_docs.py index dd73ea9292..e6eea86ee9 100755 --- a/docs/build_docs.py +++ b/docs/build_docs.py @@ -450,7 +450,6 @@ MISSING_GOOGLE_DOC_GUIDES = { 'dlp', 'gcs_to_bigquery', 'mssql_to_gcs', - 'mysql_to_gcs', 'postgres_to_gcs', 'sql_to_gcs', 'tasks', diff --git a/docs/howto/operator/google/transfer/mysql_to_gcs.rst b/docs/howto/operator/google/transfer/mysql_to_gcs.rst new file mode 100644 index 0000000000..196cafdfd0 --- /dev/null +++ b/docs/howto/operator/google/transfer/mysql_to_gcs.rst @@ -0,0 +1,58 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. + +MySQL To Google Cloud Storage Operator +====================================== +The `Google Cloud Storage <https://cloud.google.com/storage/>`__ (GCS) service is +used to store large data from various applications. This page shows how to copy +data from MySQL to GCS. + +.. contents:: + :depth: 1 + :local: + + +Prerequisite Tasks +^^^^^^^^^^^^^^^^^^ + +.. include:: /howto/operator/google/_partials/prerequisite_tasks.rst + +.. _howto/operator:MySQLToGCSOperator: + +MySQLToGCSOperator +~~~~~~~~~~~~~~~~~~ + +:class:`~airflow.providers.google.cloud.transfers.mysql_to_gcs.MySQLToGCSOperator` allows you to upload +data from MySQL database to GCS. + +When you use this operator, you can optionally compress the data being uploaded to gzip format. + +Below is an example of using this operator to upload data to GCS. + +.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mysql_to_gcs.py + :language: python + :dedent: 0 + :start-after: [START howto_operator_mysql_to_gcs] + :end-before: [END howto_operator_mysql_to_gcs] + + +Reference +--------- + +For further information, look at: +* `MySQL Documentation <https://dev.mysql.com/doc/>`__ +* `Google Cloud Storage Documentation <https://cloud.google.com/storage/docs/>`__ diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst index 632c7c68a7..885c18b020 100644 --- a/docs/operators-and-hooks-ref.rst +++ b/docs/operators-and-hooks-ref.rst @@ -1024,7 +1024,7 @@ These integrations allow you to copy data from/to Google Cloud.
* - `MySQL <https://www.mysql.com/>`__ - `Google Cloud Storage (GCS) <https://cloud.google.com/storage/>`__ - - + - :doc:`How to use <howto/operator/google/transfer/mysql_to_gcs>` - :mod:`airflow.providers.google.cloud.transfers.mysql_to_gcs` * - `PostgresSQL <https://www.postgresql.org/>`__ diff --git a/tests/providers/google/cloud/transfers/test_mysql_to_gcs_system.py b/tests/providers/google/cloud/transfers/test_mysql_to_gcs_system.py new file mode 100644 index 0000000000..65e2bc7af5 --- /dev/null +++ b/tests/providers/google/cloud/transfers/test_mysql_to_gcs_system.py @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import pytest +from MySQLdb import OperationalError, ProgrammingError + +from airflow.providers.mysql.hooks.mysql import MySqlHook +from airflow.providers.google.cloud.example_dags.example_mysql_to_gcs import GCS_BUCKET +from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY +from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context + +CREATE_QUERY = """ +CREATE TABLE test_table +( + id int auto_increment primary key, + params json +); +""" + +LOAD_QUERY = """ +INSERT INTO test_table (id, params) +VALUES + ( + 1, '{ "customer": "Lily Bush", "items": {"product": "Diaper","qty": 24}}' + ), + ( + 2, '{ "customer": "Josh William", "items": {"product": "Toy Car","qty": 1}}' + ), + ( + 3, '{ "customer": "Mary Clark", "items": {"product": "Toy Train","qty": 2}}' + ); +""" +DELETE_QUERY = "DROP TABLE test_table;" + + +@pytest.mark.backend("mysql") +@pytest.mark.credential_file(GCP_GCS_KEY) +class MySQLToGCSSystemTest(GoogleSystemTest): + @staticmethod + def init_db(): + try: + hook = MySqlHook() + hook.run(CREATE_QUERY) + hook.run(LOAD_QUERY) + except (OperationalError, ProgrammingError): + pass + + @staticmethod + def drop_db(): + hook = MySqlHook() + hook.run(DELETE_QUERY) + + @provide_gcp_context(GCP_GCS_KEY) + def setUp(self): + super().setUp() + self.create_gcs_bucket(GCS_BUCKET) + self.init_db() + + @provide_gcp_context(GCP_GCS_KEY) + def test_run_example_dag(self): + self.run_dag('example_mysql_to_gcs', CLOUD_DAG_FOLDER) + + @provide_gcp_context(GCP_GCS_KEY) + def tearDown(self): + self.delete_gcs_bucket(GCS_BUCKET) + self.drop_db() + super().tearDown() diff --git a/tests/test_project_structure.py b/tests/test_project_structure.py index fc6522dcbb..03ac050498 100644 --- a/tests/test_project_structure.py +++ b/tests/test_project_structure.py @@ -94,7 +94,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase): ('cloud', 'sql_to_gcs'), ('cloud', 'bigquery_to_mysql'), ('cloud', 
'cassandra_to_gcs'), - ('cloud', 'mysql_to_gcs'), ('cloud', 'mssql_to_gcs'), ('ads', 'ads_to_gcs'), }