# Updating Airflow
This file documents any backwards-incompatible changes in Airflow and
assists users migrating to a new version.
## Airflow Master
### Setting UTF-8 as default mime_charset in email utils
### Add a configuration variable (default_dag_run_display_number) to control the number of DAG runs displayed
The new `default_dag_run_display_number` configuration variable, under the `[webserver]` section, controls the number of DAG runs shown in the UI.
### Default executor for SubDagOperator is changed to SequentialExecutor
### New Webserver UI with Role-Based Access Control
The current webserver UI uses the Flask-Admin extension. The new webserver UI uses the [Flask-AppBuilder (FAB)](https://github.com/dpgaspar/Flask-AppBuilder) extension. FAB has built-in authentication support and Role-Based Access Control (RBAC), which provides configurable roles and permissions for individual users.
To turn on this feature, set the configuration variable `rbac = True` in your airflow.cfg file (under `[webserver]`), and then run the `airflow` command, which will generate the `webserver_config.py` file in your $AIRFLOW_HOME.
#### Setting up Authentication
FAB has built-in authentication support for DB, OAuth, OpenID, LDAP, and REMOTE_USER. The default auth type is `AUTH_DB`.
For any other authentication type (OAuth, OpenID, LDAP, REMOTE_USER), see the [Authentication section of FAB docs](http://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-methods) for how to configure variables in the `webserver_config.py` file.
Once you modify your config file, run `airflow initdb` to generate new tables for RBAC support (these tables will have the prefix `ab_`).
#### Creating an Admin Account
Once configuration settings have been updated and new tables have been generated, create an admin account with the `airflow create_user` command.
#### Using your new UI
Run `airflow webserver` to start the new UI. This will bring up a login page; enter the recently created admin username and password.
There are five roles created for Airflow by default: Admin, User, Op, Viewer, and Public. To configure roles/permissions, go to the `Security` tab and click `List Roles` in the new UI.
#### Breaking changes
- AWS Batch Operator renamed property queue to job_queue to prevent conflict with the internal queue from CeleryExecutor - AIRFLOW-2542
- Users created and stored in the old users table will not be migrated automatically. FAB's built-in authentication support must be reconfigured.
- Airflow dag home page is now `/home` (instead of `/admin`).
- All ModelViews in Flask-AppBuilder follow a different pattern from Flask-Admin. The `/admin` part of the url path will no longer exist. For example: `/admin/connection` becomes `/connection/list`, `/admin/connection/new` becomes `/connection/add`, `/admin/connection/edit` becomes `/connection/edit`, etc.
- Due to security concerns, the new webserver will no longer support the features in the `Data Profiling` menu of old UI, including `Ad Hoc Query`, `Charts`, and `Known Events`.
- HiveServer2Hook.get_results() always returns a list of tuples, even when a single column is queried, as per the Python DB-API 2.0.
### airflow.contrib.sensors.hdfs_sensors renamed to airflow.contrib.sensors.hdfs_sensor
`airflow.contrib.sensors.hdfs_sensors` has been renamed to `airflow.contrib.sensors.hdfs_sensor` for consistency.
### MySQL setting required
We now rely on stricter ANSI SQL settings for MySQL in order to have sane defaults. Make sure
you have specified `explicit_defaults_for_timestamp=1` in your my.cnf under `[mysqld]`.
### Celery config
To make the config of Airflow compatible with Celery, some properties have been renamed:
```
celeryd_concurrency -> worker_concurrency
celery_result_backend -> result_backend
celery_ssl_active -> ssl_active
celery_ssl_cert -> ssl_cert
celery_ssl_key -> ssl_key
```
The resulting config parameters have the same names as in Celery 4, which makes the configuration more transparent.
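The renames listed above can be applied to an existing configuration file mechanically. The following is a minimal sketch using only the Python standard library; the helper name `rename_celery_keys` is hypothetical, and the sketch scans every section rather than assuming where the old keys live.

```python
import configparser
import io

# Old -> new names, taken from the list of renamed properties.
RENAMES = {
    "celeryd_concurrency": "worker_concurrency",
    "celery_result_backend": "result_backend",
    "celery_ssl_active": "ssl_active",
    "celery_ssl_cert": "ssl_cert",
    "celery_ssl_key": "ssl_key",
}


def rename_celery_keys(cfg_text):
    """Return cfg_text with the old key names replaced by the new ones."""
    # interpolation=None leaves '%' characters in values untouched.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    for section in parser.sections():
        for old, new in RENAMES.items():
            if parser.has_option(section, old):
                value = parser.get(section, old)
                parser.remove_option(section, old)
                # Keep a value that was already set under the new name.
                if not parser.has_option(section, new):
                    parser.set(section, new, value)
    buffer = io.StringIO()
    parser.write(buffer)
    return buffer.getvalue()


example = "[celery]\nceleryd_concurrency = 16\ncelery_result_backend = db+mysql://x\n"
print(rename_celery_keys(example))
```

Note that `configparser` drops comments when it rewrites a file, so treat the output as a starting point to review rather than a drop-in replacement.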
### GCP Dataflow Operators
Dataflow job labeling is now supported in Dataflow{Java,Python}Operator with a default
"airflow-version" label. Please upgrade your google-cloud-dataflow or apache-beam version
to 2.2.0 or greater.
### BigQuery Hooks and Operator
The `bql` parameter passed to `BigQueryOperator` and `BigQueryBaseCursor.run_query` has been deprecated and renamed to `sql` for consistency. Using `bql` will still work (and raise a `DeprecationWarning`), but is no longer
supported and will be removed entirely in Airflow 2.0.
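To illustrate what "still works but raises a `DeprecationWarning`" means in practice, here is a generic sketch of a renamed keyword argument. It is not the BigQuery hook's actual code; the function name `run_query_sketch` is hypothetical and only the `sql`/`bql` parameter names come from the text above.

```python
import warnings


def run_query_sketch(sql=None, bql=None):
    """Hypothetical stand-in for a function whose `bql` argument was renamed to `sql`."""
    if bql is not None:
        # The old name still works, but callers are told to migrate.
        warnings.warn(
            "The `bql` parameter is deprecated; use `sql` instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        if sql is None:
            sql = bql
    if sql is None:
        raise TypeError("run_query_sketch() requires the `sql` argument")
    return sql


# Record warnings so that the deprecation is visible in this demo.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    run_query_sketch(bql="SELECT 1")

print([str(w.message) for w in caught])
```

Running your own code with `python -W error::DeprecationWarning` turns such warnings into exceptions, which can help locate remaining uses of a deprecated name.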
### Redshift to S3 Operator
With Airflow 1.9 or lower, the unload operation always included a header row. Including a header row
requires turning off parallel unload, but it is preferable to unload using all nodes because that is
faster for larger tables. A parameter called `include_header` has therefore been added, and its default is False.
The header row is added only if this parameter is set to True, in which case parallel unload is automatically turned off (`PARALLEL OFF`).
### Google cloud connection string
With Airflow 1.9 or lower, there were two connection strings for the Google Cloud operators, both `google_cloud_storage_default` and `google_cloud_default`. This can be confusing and therefore the `google_cloud_storage_default` connection id has been replaced with `google_cloud_default` to make the connection id consistent across Airflow.
### Logging Configuration
With Airflow 1.9 or lower, `FILENAME_TEMPLATE`, `PROCESSOR_FILENAME_TEMPLATE`, `LOG_ID_TEMPLATE`, and `END_OF_LOG_MARK` were configured in `airflow_local_settings.py`. These have been moved into the configuration file, so if you were using a custom configuration file, the following defaults need to be added:
```
[core]
fab_logging_level = WARN
log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log
log_processor_filename_template = {{{{ filename }}}}.log
[elasticsearch]
elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
elasticsearch_end_of_log_mark = end_of_log
```
## Airflow 1.9
### SSH Hook updates, along with new SSH Operator & SFTP Operator
SSH Hook now uses the Paramiko library to create an ssh client connection, instead of the sub-process based ssh command execution previously (<1.9.0), so this is backward incompatible.
- update SSHHook constructor
- use SSHOperator class in place of SSHExecuteOperator which is removed now. Refer to test_ssh_operator.py for usage info.
- SFTPOperator is added to perform secure file transfer from serverA to serverB. Refer to test_sftp_operator.py for usage info.
- No updates are required if you are using ftpHook, it will continue to work as is.
### S3Hook switched to use Boto3
The airflow.hooks.S3_hook.S3Hook has been switched to use boto3 instead of the older boto (a.k.a. boto2). This results in a few backwards incompatible changes to the following classes: S3Hook:
- the constructor no longer accepts `s3_conn_id`. It is now called `aws_conn_id`.
- the default connection is now "aws_default" instead of "s3_default"
- the return type of objects returned by `get_bucket` is now boto3.s3.Bucket
- the return type of `get_key` and `get_wildcard_key` is now a boto3.S3.Object.
If you are using any of these in your DAGs and specify a connection ID you will need to update the parameter name for the connection to "aws_conn_id": S3ToHiveTransfer, S3PrefixSensor, S3KeySensor, RedshiftToS3Transfer.
### Logging update
The logging structure of Airflow has been rewritten to make configuration easier and the logging system more transparent.
#### A quick recap about logging
A logger is the entry point into the logging system. Each logger is a named bucket to which messages can be written for processing. A logger is configured to have a log level. This log level describes the severity of the messages that the logger will handle. Python defines the following log levels: DEBUG, INFO, WARNING, ERROR or CRITICAL.
Each message that is written to the logger is a Log Record. Each log record contains a log level indicating the severity of that specific message. A log record can also contain useful metadata that describes the event that is being logged. This can include details such as a stack trace or an error code.
When a message is given to the logger, the log level of the message is compared to the log level of the logger. If the log level of the message meets or exceeds the log level of the logger itself, the message will undergo further processing. If it doesn't, the message will be ignored.
Once a logger has determined that a message needs to be processed, it is passed to a Handler. This configuration is now more flexible and can easily be maintained in a single file.
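The level comparison described in this recap can be seen with the standard `logging` module alone. This is a small sketch; the logger name `recap_demo` and the `ListHandler` class are made up for the demonstration and are not part of Airflow.

```python
import logging


class ListHandler(logging.Handler):
    """Handler that collects messages in a list (for demonstration only)."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(f"{record.levelname}:{record.getMessage()}")


logger = logging.getLogger("recap_demo")  # hypothetical logger name
logger.setLevel(logging.WARNING)
logger.propagate = False  # keep the demo isolated from the root logger
handler = ListHandler()
logger.addHandler(handler)

logger.info("below the logger level, so it is ignored")
logger.warning("meets the logger level, so it is handled")
logger.error("exceeds the logger level, so it is handled")

print(handler.messages)
```

Only the WARNING and ERROR records reach the handler, because INFO is below the level configured on the logger.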
#### Changes in Airflow Logging
Airflow's logging mechanism has been refactored to use Python's built-in `logging` module to perform logging of the application. By extending classes with the existing `LoggingMixin`, all the logging will go through a central logger. The `BaseHook` and `BaseOperator` already extend this class, so logging is readily available in hooks and operators.
The main benefit is easier configuration of the logging through a single centralized Python file. Disclaimer: there is still some inline configuration, but this will be removed eventually. The new logging class is defined by setting the dotted classpath in your `~/airflow/airflow.cfg` file:
```
# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
```
The logging configuration file needs to be on the `PYTHONPATH`, for example `$AIRFLOW_HOME/config`. This directory is loaded by default. Any directory may be added to the `PYTHONPATH`; this might be handy when the config is in another directory or, in the case of Docker, when a volume is mounted.
The config can be taken from `airflow/config_templates/airflow_local_settings.py` as a starting point. Copy the contents to `${AIRFLOW_HOME}/config/airflow_local_settings.py`, and alter the config as is preferred.
```
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from airflow import configuration as conf
# TODO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
# there are other log format and level configurations in
# settings.py and cli.py. Please see AIRFLOW-1455.
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
DEFAULT_LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        }
        # When using s3 or gcs, provide a customized LOGGING_CONFIG
        # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
        # for details
        # 's3.task': {
        #     'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
        #     'formatter': 'airflow.task',
        #     'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        #     's3_log_folder': S3_LOG_FOLDER,
        #     'filename_template': FILENAME_TEMPLATE,
        # },
        # 'gcs.task': {
        #     'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
        #     'formatter': 'airflow.task',
        #     'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        #     'gcs_log_folder': GCS_LOG_FOLDER,
        #     'filename_template': FILENAME_TEMPLATE,
        # },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['file.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['file.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}
```
To customize the logging (for example, to use log rotation), define one or more of the logging handlers that [Python has to offer](https://docs.python.org/3/library/logging.handlers.html). For more details about Python logging, please refer to the [official logging documentation](https://docs.python.org/3/library/logging.html).
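As an illustration of such a customization, the sketch below wires the standard library's `RotatingFileHandler` into a `dictConfig` dictionary. It shows the generic Python mechanism only; the logger name `rotation_demo`, the formatter name `plain`, the file name, and the size limits are all hypothetical, and how you combine such a handler with Airflow's own task handlers is left to your configuration.

```python
import logging
import logging.config
import os
import tempfile

log_dir = tempfile.mkdtemp()  # stand-in for a real log directory

ROTATING_LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "plain": {"format": "%(asctime)s %(levelname)s - %(message)s"},
    },
    "handlers": {
        "rotating_file": {
            "class": "logging.handlers.RotatingFileHandler",
            "formatter": "plain",
            "filename": os.path.join(log_dir, "example.log"),
            "maxBytes": 1024 * 1024,  # roll over after roughly 1 MiB
            "backupCount": 3,  # keep example.log.1 to example.log.3
        },
    },
    "loggers": {
        "rotation_demo": {  # hypothetical logger name
            "handlers": ["rotating_file"],
            "level": "INFO",
            "propagate": False,
        },
    },
}

logging.config.dictConfig(ROTATING_LOGGING_CONFIG)
logging.getLogger("rotation_demo").info("written through the rotating handler")
```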
Furthermore, this change also simplifies logging within the DAG itself:
```
root@ae1bc863e815:/airflow# python
Python 3.6.2 (default, Sep 13 2017, 14:26:54)
[GCC 4.9.2] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from airflow.settings import *
>>>
>>> from datetime import datetime
>>> from airflow import DAG
>>> from airflow.operators.dummy_operator import DummyOperator
>>>
>>> dag = DAG('simple_dag', start_date=datetime(2017, 9, 1))
>>>
>>> task = DummyOperator(task_id='task_1', dag=dag)
>>>
>>> task.log.error('I want to say something..')
[2017-09-25 20:17:04,927] {<stdin>:1} ERROR - I want to say something..
```
#### Template path of the file_task_handler
The `file_task_handler` logger has been made more flexible. The default format, `{dag_id}/{task_id}/{execution_date}/{try_number}.log`, can be changed by supplying Jinja templating in the `FILENAME_TEMPLATE` configuration variable. See the `file_task_handler` for more information.
#### I'm using S3Log or GCSLogs, what do I do!?
If you are logging to Google cloud storage, please see the [Google cloud platform documentation](https://airflow.incubator.apache.org/integration.html#gcp-google-cloud-platform) for logging instructions.
If you are using S3, the instructions should be largely the same as the Google cloud platform instructions above. You will need a custom logging config. The `REMOTE_BASE_LOG_FOLDER` configuration key in your airflow config has been removed, therefore you will need to take the following steps:
- Copy the logging configuration from [`airflow/config_templates/airflow_local_settings.py`](https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py).
- Place it in a directory inside the Python import path `PYTHONPATH`. If you are using Python 2.7, ensure that any `__init__.py` files exist so that it is importable.
- Update the config by setting the path of `REMOTE_BASE_LOG_FOLDER` explicitly in the config. The `REMOTE_BASE_LOG_FOLDER` key is not used anymore.
- Set the `logging_config_class` to the filename and dict. For example, if you place `custom_logging_config.py` on the base of your pythonpath, you will need to set `logging_config_class = custom_logging_config.LOGGING_CONFIG` in your config as Airflow 1.8.
### New Features
#### Dask Executor
A new DaskExecutor allows Airflow tasks to be run in Dask Distributed clusters.
### Deprecated Features
These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer
supported and will be removed entirely in Airflow 2.0.
- If you're using the `google_cloud_conn_id` or `dataproc_cluster` argument names explicitly in `contrib.operators.Dataproc{*}Operator`(s), be sure to rename them to `gcp_conn_id` or `cluster_name`, respectively. We've renamed these arguments for consistency. (AIRFLOW-1323)
- `post_execute()` hooks now take two arguments, `context` and `result`
(AIRFLOW-886)
Previously, post_execute() only took one argument, `context`.
- `contrib.hooks.gcp_dataflow_hook.DataFlowHook` starts to use `--runner=DataflowRunner` instead of `DataflowPipelineRunner`, which is removed from the package `google-cloud-dataflow-0.6.0`.
- The pickle type for XCom messages has been replaced by json to prevent RCE attacks.
Note that JSON serialization is stricter than pickling, so if you want to e.g. pass
raw bytes through XCom you must encode them using an encoding like base64.
By default pickling is still enabled until Airflow 2.0. To disable it
set enable_xcom_pickling = False in your Airflow config.
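
The note about raw bytes can be made concrete with the standard library. This sketch only shows the encode/decode round trip; how the resulting string is pushed to and pulled from XCom is not shown, and the `payload` key is an arbitrary choice for the example.

```python
import base64
import json

raw = b"\x00\xffbinary payload"

# json.dumps(raw) would raise TypeError, because bytes are not JSON serializable.
encoded = base64.b64encode(raw).decode("ascii")
message = json.dumps({"payload": encoded})

# On the receiving side, reverse the two steps.
decoded = base64.b64decode(json.loads(message)["payload"])
print(decoded == raw)
```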
## Airflow 1.8.1
The Airflow package name was changed from `airflow` to `apache-airflow` during this release. You must uninstall
a previously installed version of Airflow before installing 1.8.1.
## Airflow 1.8
### Database
The database schema needs to be upgraded. Make sure to shut down Airflow and make a backup of your database. To
upgrade the schema, issue `airflow upgradedb`.
### Upgrade systemd unit files
Systemd unit files have been updated. If you use systemd, please make sure to update these.
> Please note that the webserver does not detach properly; this will be fixed in a future version.
### Tasks not starting although dependencies are met due to stricter pool checking
Airflow 1.7.1 has issues with being able to oversubscribe a pool, i.e. more slots could be used than were
available. This is fixed in Airflow 1.8.0, but due to the past issue, jobs may fail to start after an upgrade although their
dependencies are met. To work around this, either temporarily increase the number of slots above
the number of queued tasks or use a new pool.
### Less forgiving scheduler on dynamic start_date
Using a dynamic start_date (e.g. `start_date = datetime.now()`) is not considered a best practice. The 1.8.0 scheduler
is less forgiving in this area. If you encounter DAGs not being scheduled you can try using a fixed start_date and
renaming your DAG. The last step is required to make sure you start with a clean slate, otherwise the old schedule can
interfere.
### New and updated scheduler options
Please read through the new scheduler options; defaults have changed since 1.7.1.
#### child_process_log_directory
In order to increase the robustness of the scheduler, DAGs are now processed in their own process. Therefore each
DAG has its own log file for the scheduler. These log files are placed in `child_process_log_directory`, which defaults to
`<AIRFLOW_HOME>/scheduler/latest`. You will need to make sure these log files are removed.
> DAG logs or processor logs ignore any command line settings for log file locations.
#### run_duration
Previously the command line option `num_runs` was used to let the scheduler terminate after a certain number of
loops. This is now time bound and defaults to `-1`, which means run continuously. See also num_runs.
#### num_runs
Previously `num_runs` was used to let the scheduler terminate after a certain number of loops. Now num_runs specifies
the number of times to try to schedule each DAG file within `run_duration` time. Defaults to `-1`, which means try
indefinitely. This is only available on the command line.
#### min_file_process_interval
The time after which an updated DAG should be picked up from the filesystem.
#### min_file_parsing_loop_time
How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
#### dag_dir_list_interval
The frequency with which the scheduler should relist the contents of the DAG directory. If DAGs are not being picked up while you are developing them, have a look at this number and decrease it when necessary.
#### catchup_by_default
By default the scheduler will fill any missing interval DAG Runs between the last execution date and the current date.
This setting changes that behavior to only execute the latest interval. This can also be specified per DAG as
`catchup = False / True`. Command line backfills will still work.
### Faulty DAGs do not show an error in the Web UI
Due to changes in the way Airflow processes DAGs, the Web UI does not show an error when processing a faulty DAG. To
find processing errors, go to the `child_process_log_directory`, which defaults to `<AIRFLOW_HOME>/scheduler/latest`.
### New DAGs are paused by default
Previously, new DAGs would be scheduled immediately. To retain the old behavior, add this to airflow.cfg:
```
[core]
dags_are_paused_at_creation = False
```
### Airflow context variables are passed to Hive config if conf is specified
If you specify a hive conf to the run_cli command of the HiveHook, Airflow adds some
convenience variables to the config. If you run a secure Hadoop setup, it might be
required to whitelist these variables by adding the following to your configuration:
```
<property>
<name>hive.security.authorization.sqlstd.confwhitelist.append</name>
<value>airflow\.ctx\..*</value>
</property>
```
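To see which property names the whitelist pattern above covers, the expression can be tried out in Python. This is only a sketch: Hive evaluates the pattern with Java's regex engine (for this simple expression the two behave the same), whole-name matching is assumed, and the candidate names below are hypothetical examples rather than the exact variables Airflow sets.

```python
import re

# Pattern copied from the <value> element of the property above.
WHITELIST_PATTERN = re.compile(r"airflow\.ctx\..*")

# Hypothetical property names, for illustration only.
candidates = ["airflow.ctx.dag_id", "airflow_ctx_dag_id", "mapred.job.name"]

# fullmatch requires the whole name to match the pattern.
allowed = [name for name in candidates if WHITELIST_PATTERN.fullmatch(name)]
print(allowed)
```

The escaped dots matter: without the backslashes, `.` would match any character and the pattern would be broader than intended.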
### Google Cloud Operator and Hook alignment
All Google Cloud Operators and Hooks are aligned and use the same client library. Now you have a single connection
type for all kinds of Google Cloud Operators.
If you experience problems connecting with your operator make sure you set the connection type "Google Cloud Platform".
Also the old P12 key file type is not supported anymore and only the new JSON key files are supported as a service
account.
### Deprecated Features
These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer
supported and will be removed entirely in Airflow 2.0.
- Hooks and operators must be imported from their respective submodules
`airflow.operators.PigOperator` is no longer supported; `from airflow.operators.pig_operator import PigOperator` is.
(AIRFLOW-31, AIRFLOW-200)
- Operators no longer accept arbitrary arguments
Previously, `Operator.__init__()` accepted any arguments (either positional `*args` or keyword `**kwargs`) without
complaint. Now, invalid arguments will be rejected. (https://github.com/apache/incubator-airflow/pull/1285)
- The config value secure_mode will default to True which will disable some insecure endpoints/features
### Known Issues
There is a report that the default of "-1" for num_runs creates an issue where errors are reported while parsing tasks.
It was not confirmed, but a workaround was found by changing the default back to `None`.
To do this, edit `cli.py` and find the following:
```
'num_runs': Arg(
("-n", "--num_runs"),
default=-1, type=int,
help="Set the number of runs to execute before exiting"),
```
and change `default=-1` to `default=None`. If you have this issue please report it on the mailing list.
## Airflow 1.7.1.2
### Changes to Configuration
#### Email configuration change
To continue using the default smtp email backend, change the email_backend line in your config file from:
```
[email]
email_backend = airflow.utils.send_email_smtp
```
to:
```
[email]
email_backend = airflow.utils.email.send_email_smtp
```
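If you maintain several configuration files, the email backend change above can be scripted. The following is a minimal sketch with the standard library; the helper name `update_email_backend` is hypothetical, and it only touches the value when it is exactly the old default.

```python
import configparser
import io

OLD_BACKEND = "airflow.utils.send_email_smtp"
NEW_BACKEND = "airflow.utils.email.send_email_smtp"


def update_email_backend(cfg_text):
    """Return cfg_text with the old default email backend replaced by the new one."""
    # interpolation=None leaves '%' characters in other values untouched.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    # fallback=None covers files without an [email] section or without the key.
    if parser.get("email", "email_backend", fallback=None) == OLD_BACKEND:
        parser.set("email", "email_backend", NEW_BACKEND)
    buffer = io.StringIO()
    parser.write(buffer)
    return buffer.getvalue()


print(update_email_backend("[email]\nemail_backend = airflow.utils.send_email_smtp\n"))
```

As with any `configparser` rewrite, comments in the original file are not preserved, so review the result before replacing your config.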
#### S3 configuration change
To continue using S3 logging, update your config file so:
```
s3_log_folder = s3://my-airflow-log-bucket/logs
```
becomes:
```
remote_base_log_folder = s3://my-airflow-log-bucket/logs
remote_log_conn_id = <your desired s3 connection>
```