[AIRFLOW-4947] Remove six types (#5581)

This commit is contained in:
Kamil Breguła 2019-07-14 07:09:19 +02:00 коммит произвёл Jarek Potiuk
Родитель 4a56abea3a
Коммит c2f5698eda
27 изменённых файлов: 47 добавлений и 77 удалений

Просмотреть файл

@ -66,7 +66,6 @@ from airflow.utils.log.logging_mixin import (LoggingMixin, redirect_stderr,
from airflow.www.app import cached_app, create_app, cached_appbuilder
from sqlalchemy.orm import exc
import six
api.load_auth()
api_module = import_module(conf.get('cli', 'api_client')) # type: Any
@ -371,7 +370,7 @@ def import_helper(filepath):
suc_count = fail_count = 0
for k, v in d.items():
try:
Variable.set(k, v, serialize_json=not isinstance(v, six.string_types))
Variable.set(k, v, serialize_json=not isinstance(v, str))
except Exception as e:
print('Variable import failed: {}'.format(repr(e)))
fail_count += 1

Просмотреть файл

@ -27,7 +27,6 @@ implementation for BigQuery.
import time
from copy import deepcopy
import six
from six import iteritems
from googleapiclient.discovery import build
@ -784,8 +783,8 @@ class BigQueryBaseCursor(LoggingMixin):
cluster_fields = {'fields': cluster_fields}
query_param_list = [
(sql, 'query', None, six.string_types),
(priority, 'priority', 'INTERACTIVE', six.string_types),
(sql, 'query', None, (str,)),
(priority, 'priority', 'INTERACTIVE', (str,)),
(use_legacy_sql, 'useLegacySql', self.use_legacy_sql, bool),
(query_params, 'queryParameters', None, list),
(udf_config, 'userDefinedFunctionResources', None, list),

Просмотреть файл

@ -24,7 +24,6 @@ import json
import time
from copy import deepcopy
import six
from googleapiclient.discovery import build
from airflow.exceptions import AirflowException
@ -412,7 +411,7 @@ class GCPTransferServiceHook(GoogleCloudBaseHook):
:rtype: bool
"""
expected_statuses = (
{expected_statuses} if isinstance(expected_statuses, six.string_types) else set(expected_statuses)
{expected_statuses} if isinstance(expected_statuses, str) else set(expected_statuses)
)
if not operations:
return False

Просмотреть файл

@ -22,7 +22,6 @@ import os
import pathlib
import time
import datetime
import six
import re
from airflow.exceptions import AirflowException
@ -243,7 +242,7 @@ class QuboleHook(BaseHook):
elif k in positional_args_list:
inplace_args = v
elif k == 'tags':
if isinstance(v, six.string_types):
if isinstance(v, str):
tags.add(v)
elif isinstance(v, (list, tuple)):
for val in v:

Просмотреть файл

@ -22,7 +22,6 @@ from base64 import b64encode
from cassandra.util import Date, Time, SortedSet, OrderedMapSerializedKey
from datetime import datetime
from decimal import Decimal
from six import text_type, binary_type
from tempfile import NamedTemporaryFile
from uuid import UUID
@ -215,9 +214,9 @@ class CassandraToGoogleCloudStorageOperator(BaseOperator):
def convert_value(cls, name, value):
if not value:
return value
elif isinstance(value, (text_type, int, float, bool, dict)):
elif isinstance(value, (str, int, float, bool, dict)):
return value
elif isinstance(value, binary_type):
elif isinstance(value, bytes):
return b64encode(value).decode('ascii')
elif isinstance(value, UUID):
return b64encode(value.bytes).decode('ascii')

Просмотреть файл

@ -22,7 +22,6 @@ This module contains Databricks operators.
"""
import time
import six
from airflow.exceptions import AirflowException
from airflow.contrib.hooks.databricks_hook import DatabricksHook
@ -44,9 +43,9 @@ def _deep_string_coerce(content, json_path='json'):
for numerical values.
"""
coerce = _deep_string_coerce
if isinstance(content, six.string_types):
if isinstance(content, str):
return content
elif isinstance(content, six.integer_types + (float,)):
elif isinstance(content, (int, float,)):
# Databricks can tolerate either numeric or string types in the API backend.
return str(content)
elif isinstance(content, (list, tuple)):

Просмотреть файл

@ -21,8 +21,6 @@ from copy import deepcopy
import re
from urllib.parse import urlparse, unquote
import six
from airflow import AirflowException
from airflow.contrib.hooks.gcp_cloud_build_hook import CloudBuildHook
from airflow.models import BaseOperator
@ -69,7 +67,7 @@ class BuildProcessor:
source = self.body["source"]["repoSource"]
if not isinstance(source, six.string_types):
if not isinstance(source, str):
return
self.body["source"]["repoSource"] = self._convert_repo_url_to_dict(source)
@ -80,7 +78,7 @@ class BuildProcessor:
source = self.body["source"]["storageSource"]
if not isinstance(source, six.string_types):
if not isinstance(source, str):
return
self.body["source"]["storageSource"] = self._convert_storage_url_to_dict(source)

Просмотреть файл

@ -20,8 +20,6 @@
This module contains Google Spanner operators.
"""
import six
from airflow import AirflowException
from airflow.contrib.hooks.gcp_spanner_hook import CloudSpannerHook
from airflow.models import BaseOperator
@ -205,7 +203,7 @@ class CloudSpannerInstanceDatabaseQueryOperator(BaseOperator):
def execute(self, context):
queries = self.query
if isinstance(self.query, six.string_types):
if isinstance(self.query, str):
queries = [x.strip() for x in self.query.split(';')]
self.sanitize_queries(queries)
self.log.info("Executing DML query(-ies) on "

Просмотреть файл

@ -27,7 +27,6 @@ from airflow.contrib.hooks.jenkins_hook import JenkinsHook
import jenkins
from jenkins import JenkinsException
from requests import Request
import six
from six.moves.urllib.error import HTTPError, URLError
@ -134,7 +133,7 @@ class JenkinsJobTriggerOperator(BaseOperator):
"""
# Warning if the parameter is too long, the URL can be longer than
# the maximum allowed size
if self.parameters and isinstance(self.parameters, six.string_types):
if self.parameters and isinstance(self.parameters, str):
import ast
self.parameters = ast.literal_eval(self.parameters)

Просмотреть файл

@ -29,7 +29,6 @@ from datetime import date, datetime, timedelta
from decimal import Decimal
from MySQLdb.constants import FIELD_TYPE
from tempfile import NamedTemporaryFile
from six import string_types
import unicodecsv as csv
@ -249,7 +248,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
schema_str = None
schema_file_mime_type = 'application/json'
tmp_schema_file_handle = NamedTemporaryFile(delete=True)
if self.schema is not None and isinstance(self.schema, string_types):
if self.schema is not None and isinstance(self.schema, str):
schema_str = self.schema.encode('utf-8')
elif self.schema is not None and isinstance(self.schema, list):
schema_str = json.dumps(self.schema).encode('utf-8')
@ -331,7 +330,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
Return a dict of column name and column type based on self.schema if not None.
"""
schema = []
if isinstance(self.schema, string_types):
if isinstance(self.schema, str):
schema = json.loads(self.schema)
elif isinstance(self.schema, list):
schema = self.schema

Просмотреть файл

@ -20,8 +20,6 @@
This module contains a Google Cloud Transfer sensor.
"""
import six
from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
@ -64,7 +62,7 @@ class GCPTransferServiceWaitForJobStatusSensor(BaseSensorOperator):
super().__init__(*args, **kwargs)
self.job_name = job_name
self.expected_statuses = (
{expected_statuses} if isinstance(expected_statuses, six.string_types) else expected_statuses
{expected_statuses} if isinstance(expected_statuses, str) else expected_statuses
)
self.project_id = project_id
self.gcp_cloud_conn_id = gcp_conn_id

Просмотреть файл

@ -17,7 +17,6 @@
# specific language governing permissions and limitations
# under the License.
import six
from airflow.contrib.utils.weekday import WeekDay
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils import timezone
@ -81,12 +80,12 @@ class DayOfWeekSensor(BaseSensorOperator):
super().__init__(*args, **kwargs)
self.week_day = week_day
self.use_task_execution_day = use_task_execution_day
if isinstance(self.week_day, six.string_types):
if isinstance(self.week_day, str):
self._week_day_num = {WeekDay.get_weekday_number(week_day_str=self.week_day)}
elif isinstance(self.week_day, WeekDay):
self._week_day_num = {self.week_day}
elif isinstance(self.week_day, set):
if all(isinstance(day, six.string_types) for day in self.week_day):
if all(isinstance(day, str) for day in self.week_day):
self._week_day_num = {WeekDay.get_weekday_number(day) for day in week_day}
elif all(isinstance(day, WeekDay) for day in self.week_day):
self._week_day_num = self.week_day

Просмотреть файл

@ -20,7 +20,6 @@
import MySQLdb
import MySQLdb.cursors
import json
import six
from airflow.hooks.dbapi_hook import DbApiHook
@ -106,7 +105,7 @@ class MySqlHook(DbApiHook):
# of extra/dejson we can get string if extra is passed via
# URL parameters
dejson_ssl = conn.extra_dejson['ssl']
if isinstance(dejson_ssl, six.string_types):
if isinstance(dejson_ssl, str):
dejson_ssl = json.loads(dejson_ssl)
conn_config['ssl'] = dejson_ssl
if conn.extra_dejson.get('unix_socket'):

Просмотреть файл

@ -28,7 +28,6 @@ from datetime import timedelta, datetime
from typing import Callable, Dict, Iterable, List, Optional, Set
import jinja2
import six
from airflow import configuration, settings
from airflow.exceptions import AirflowException
@ -643,7 +642,7 @@ class BaseOperator(LoggingMixin):
all elements in it. If the field has another type, it will return it as it is.
"""
rt = self.render_template
if isinstance(content, six.string_types):
if isinstance(content, str):
result = jinja_env.from_string(content).render(**context)
elif isinstance(content, (list, tuple)):
result = [rt(attr, e, context) for e in content]
@ -664,7 +663,7 @@ class BaseOperator(LoggingMixin):
exts = self.__class__.template_ext
if (
isinstance(content, six.string_types) and
isinstance(content, str) and
any([content.endswith(ext) for ext in exts])):
return jinja_env.get_template(content).render(**context)
else:
@ -689,7 +688,7 @@ class BaseOperator(LoggingMixin):
content = getattr(self, attr)
if content is None:
continue
elif isinstance(content, six.string_types) and \
elif isinstance(content, str) and \
any([content.endswith(ext) for ext in self.template_ext]):
env = self.get_template_env()
try:
@ -699,7 +698,7 @@ class BaseOperator(LoggingMixin):
elif isinstance(content, list):
env = self.dag.get_template_env()
for i in range(len(content)):
if isinstance(content[i], six.string_types) and \
if isinstance(content[i], str) and \
any([content[i].endswith(ext) for ext in self.template_ext]):
try:
content[i] = env.loader.get_source(env, content[i])[0]
@ -828,7 +827,7 @@ class BaseOperator(LoggingMixin):
self.log.info('Dry run')
for attr in self.template_fields:
content = getattr(self, attr)
if content and isinstance(content, six.string_types):
if content and isinstance(content, str):
self.log.info('Rendering template for %s', attr)
self.log.info(content)

Просмотреть файл

@ -31,7 +31,6 @@ from typing import Union, Optional, Iterable, Dict, Type, Callable, List
import jinja2
import pendulum
import six
from croniter import croniter
from dateutil.relativedelta import relativedelta
from sqlalchemy import Column, String, Boolean, Integer, Text, func, or_
@ -231,7 +230,7 @@ class DAG(BaseDag, LoggingMixin):
if start_date and start_date.tzinfo:
self.timezone = start_date.tzinfo
elif 'start_date' in self.default_args and self.default_args['start_date']:
if isinstance(self.default_args['start_date'], six.string_types):
if isinstance(self.default_args['start_date'], str):
self.default_args['start_date'] = (
timezone.parse(self.default_args['start_date'])
)
@ -242,7 +241,7 @@ class DAG(BaseDag, LoggingMixin):
# Apply the timezone we settled on to end_date if it wasn't supplied
if 'end_date' in self.default_args and self.default_args['end_date']:
if isinstance(self.default_args['end_date'], six.string_types):
if isinstance(self.default_args['end_date'], str):
self.default_args['end_date'] = (
timezone.parse(self.default_args['end_date'], timezone=self.timezone)
)
@ -261,13 +260,13 @@ class DAG(BaseDag, LoggingMixin):
)
self.schedule_interval = schedule_interval
if isinstance(schedule_interval, six.string_types) and schedule_interval in cron_presets:
if isinstance(schedule_interval, str) and schedule_interval in cron_presets:
self._schedule_interval = cron_presets.get(schedule_interval) # type: Optional[ScheduleInterval]
elif schedule_interval == '@once':
self._schedule_interval = None
else:
self._schedule_interval = schedule_interval
if isinstance(template_searchpath, six.string_types):
if isinstance(template_searchpath, str):
template_searchpath = [template_searchpath]
self.template_searchpath = template_searchpath
self.template_undefined = template_undefined
@ -374,7 +373,7 @@ class DAG(BaseDag, LoggingMixin):
:param dttm: utc datetime
:return: utc datetime
"""
if isinstance(self._schedule_interval, six.string_types):
if isinstance(self._schedule_interval, str):
# we don't want to rely on the transitions created by
# croniter as they are not always correct
dttm = pendulum.instance(dttm)
@ -402,7 +401,7 @@ class DAG(BaseDag, LoggingMixin):
:param dttm: utc datetime
:return: utc datetime
"""
if isinstance(self._schedule_interval, six.string_types):
if isinstance(self._schedule_interval, str):
# we don't want to rely on the transitions created by
# croniter as they are not always correct
dttm = pendulum.instance(dttm)

Просмотреть файл

@ -27,7 +27,6 @@ import zipfile
from collections import namedtuple
from datetime import datetime, timedelta
import six
from croniter import croniter, CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError
from sqlalchemy import or_
@ -248,7 +247,7 @@ class DagBag(BaseDagBag, LoggingMixin):
try:
dag.is_subdag = False
self.bag_dag(dag, parent_dag=dag, root_dag=dag)
if isinstance(dag._schedule_interval, six.string_types):
if isinstance(dag._schedule_interval, str):
croniter(dag._schedule_interval)
found_dags.append(dag)
found_dags += dag.subdags

Просмотреть файл

@ -171,7 +171,7 @@ class DagRun(Base, LoggingMixin):
TaskInstance.execution_date == self.execution_date,
)
if state:
if isinstance(state, six.string_types):
if isinstance(state, str):
tis = tis.filter(TaskInstance.state == state)
else:
# this is required to deal with NULL values

Просмотреть файл

@ -18,7 +18,6 @@
# under the License.
import datetime
import six
from airflow.models import BaseOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults
@ -68,7 +67,7 @@ class TriggerDagRunOperator(BaseOperator):
if isinstance(execution_date, datetime.datetime):
self.execution_date = execution_date.isoformat()
elif isinstance(execution_date, six.string_types):
elif isinstance(execution_date, str):
self.execution_date = execution_date
elif execution_date is None:
self.execution_date = execution_date

Просмотреть файл

@ -20,7 +20,6 @@
from functools import wraps
import logging
from six import string_types
import socket
import string
import textwrap
@ -56,7 +55,7 @@ ALLOWED_CHARACTERS = set(string.ascii_letters + string.digits + '_.-')
def stat_name_default_handler(stat_name, max_length=250):
if not isinstance(stat_name, string_types):
if not isinstance(stat_name, str):
raise InvalidStatsNameException('The stat_name has to be a string')
if len(stat_name) > max_length:
raise InvalidStatsNameException(textwrap.dedent("""\

Просмотреть файл

@ -20,7 +20,6 @@
from airflow.utils import timezone
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta # noqa: F401 for doctest
import six
from croniter import croniter
@ -71,7 +70,7 @@ def date_range(start_date, end_date=None, num=None, delta=None):
delta_iscron = False
tz = start_date.tzinfo
if isinstance(delta, six.string_types):
if isinstance(delta, str):
delta_iscron = True
start_date = timezone.make_naive(start_date, tz)
cron = croniter(delta, start_date)
@ -130,7 +129,7 @@ def round_time(dt, delta, start_date=timezone.make_aware(datetime.min)):
datetime.datetime(2015, 9, 14, 0, 0)
"""
if isinstance(delta, six.string_types):
if isinstance(delta, str):
# It's cron based, so it's easy
tz = start_date.tzinfo
start_date = timezone.make_naive(start_date, tz)

Просмотреть файл

@ -21,8 +21,6 @@ import logging
import sys
import warnings
import six
from contextlib import contextmanager
from logging import Handler, StreamHandler
@ -123,7 +121,7 @@ class RedirectStdHandler(StreamHandler):
sys.stderr/stdout at handler construction time.
"""
def __init__(self, stream):
if not isinstance(stream, six.string_types):
if not isinstance(stream, str):
raise Exception("Cannot use file like objects. Use 'stdout' or 'stderr'"
" as a str and without 'ext://'.")

Просмотреть файл

@ -43,7 +43,6 @@ from flask_babel import lazy_gettext
import lazy_object_proxy
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
import six
from sqlalchemy import or_, desc, and_, union_all
from wtforms import SelectField, validators
@ -2266,7 +2265,7 @@ class VariableModelView(AirflowModelView):
suc_count = fail_count = 0
for k, v in d.items():
try:
models.Variable.set(k, v, serialize_json=not isinstance(v, six.string_types))
models.Variable.set(k, v, serialize_json=not isinstance(v, str))
except Exception as e:
logging.info('Variable import failed: {}'.format(repr(e)))
fail_count += 1

Просмотреть файл

@ -120,7 +120,7 @@ class ExampleInclude(SphinxDirective):
return [retnode]
except Exception as exc:
return [document.reporter.warning(text_type(exc), line=self.lineno)]
return [document.reporter.warning(str(exc), line=self.lineno)]
def register_source(app, env, modname):
@ -138,7 +138,7 @@ def register_source(app, env, modname):
env._viewcode_modules[modname] = False # type: ignore
return False
if not isinstance(analyzer.code, text_type):
if not isinstance(analyzer.code, str):
code = analyzer.code.decode(analyzer.encoding)
else:
code = analyzer.code

Просмотреть файл

@ -21,8 +21,6 @@ import unittest
from unittest.mock import MagicMock
from datetime import datetime
import six
from airflow import models
from airflow.contrib.operators.bigquery_get_data import BigQueryGetDataOperator
from airflow.contrib.operators.bigquery_operator import \
@ -251,10 +249,10 @@ class BigQueryOperatorTest(unittest.TestCase):
cluster_fields=None,
)
self.assertTrue(isinstance(operator.sql, six.string_types))
self.assertTrue(isinstance(operator.sql, str))
ti = TaskInstance(task=operator, execution_date=DEFAULT_DATE)
ti.render_templates()
self.assertTrue(isinstance(ti.task.sql, six.string_types))
self.assertTrue(isinstance(ti.task.sql, str))
@mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
def test_bigquery_operator_extra_link(self, mock_hook):

Просмотреть файл

@ -1019,7 +1019,7 @@ class CoreTest(unittest.TestCase):
def test_trigger_dagrun_with_str_execution_date(self):
utc_now_str = timezone.utcnow().isoformat()
self.assertIsInstance(utc_now_str, six.string_types)
self.assertIsInstance(utc_now_str, (str,))
run_id = 'trig__' + utc_now_str
def payload_generator(context, object):
@ -1045,7 +1045,7 @@ class CoreTest(unittest.TestCase):
execution_date='{{ execution_date }}',
dag=self.dag)
self.assertTrue(isinstance(task.execution_date, six.string_types))
self.assertTrue(isinstance(task.execution_date, str))
self.assertEqual(task.execution_date, '{{ execution_date }}')
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)

Просмотреть файл

@ -22,8 +22,6 @@ import os
import warnings
from collections import OrderedDict
import six
from airflow import configuration
from airflow.configuration import conf, AirflowConfigParser, parameterized_config
@ -318,7 +316,7 @@ key3 = value3
self.assertTrue(isinstance(section_dict['visibility_timeout'], int))
self.assertTrue(isinstance(section_dict['_test_only_bool'], bool))
self.assertTrue(isinstance(section_dict['_test_only_float'], float))
self.assertTrue(isinstance(section_dict['_test_only_string'], six.string_types))
self.assertTrue(isinstance(section_dict['_test_only_string'], str))
def test_deprecated_options(self):
# Guarantee we have a deprecated setting, so we test the deprecation

Просмотреть файл

@ -23,7 +23,6 @@
# SOFTWARE.
import json
import six
from elasticsearch import Elasticsearch
from elasticsearch.client.utils import query_params
@ -280,7 +279,7 @@ class FakeElasticsearch(Elasticsearch):
# Ensure to have a list of index
if index is None:
searchable_indexes = self.__documents_dict.keys()
elif isinstance(index, six.string_types):
elif isinstance(index, str):
searchable_indexes = [index]
elif isinstance(index, list):
searchable_indexes = index
@ -302,7 +301,7 @@ class FakeElasticsearch(Elasticsearch):
# Ensure to have a list of index
if doc_type is None:
searchable_doc_types = []
elif isinstance(doc_type, six.string_types):
elif isinstance(doc_type, str):
searchable_doc_types = [doc_type]
elif isinstance(doc_type, list):
searchable_doc_types = doc_type