diff --git a/UPDATING.md b/UPDATING.md index fa3c032432..a6729e0884 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -61,6 +61,25 @@ https://developers.google.com/style/inclusive-documentation --> +### Rename parameter name in PinotAdminHook.create_segment + +Rename parameter name from ``format`` to ``segment_format`` in PinotAdminHook function create_segment fro pylint compatible + +### Rename parameter name in HiveMetastoreHook.get_partitions + +Rename parameter name from ``filter`` to ``partition_filter`` in HiveMetastoreHook function get_partitions for pylint compatible + +### Remove unnecessary parameter in FTPHook.list_directory + +Remove unnecessary parameter ``nlst`` in FTPHook function list_directory for pylint compatible + +### Remove unnecessary parameter in PostgresHook function copy_expert + +Remove unnecessary parameter ``open`` in PostgresHook function copy_expert for pylint compatible + +### Change parameter name in OpsgenieAlertOperator + +Change parameter name from ``visibleTo`` to ``visible_to`` in OpsgenieAlertOperator for pylint compatible ### Use NULL as default value for dag.description diff --git a/airflow/providers/apache/druid/hooks/druid.py b/airflow/providers/apache/druid/hooks/druid.py index 9865e77c8e..0537ca9fa1 100644 --- a/airflow/providers/apache/druid/hooks/druid.py +++ b/airflow/providers/apache/druid/hooks/druid.py @@ -59,6 +59,9 @@ class DruidHook(BaseHook): raise ValueError("Druid timeout should be equal or greater than 1") def get_conn_url(self): + """ + Get Druid connection url + """ conn = self.get_connection(self.druid_ingest_conn_id) host = conn.host port = conn.port @@ -82,6 +85,9 @@ class DruidHook(BaseHook): return None def submit_indexing_job(self, json_index_spec: str): + """ + Submit Druid ingestion job + """ url = self.get_conn_url() self.log.info("Druid ingestion spec: %s", json_index_spec) @@ -107,7 +113,7 @@ class DruidHook(BaseHook): # ensure that the job gets killed if the max ingestion time is exceeded requests.post("{0}/{1}/shutdown".format(url, druid_task_id), auth=self.get_auth()) raise AirflowException('Druid ingestion took more than ' - '%s seconds', self.max_ingestion_time) + f'{self.max_ingestion_time} seconds') time.sleep(self.timeout) @@ -122,7 +128,7 @@ class DruidHook(BaseHook): raise AirflowException('Druid indexing job failed, ' 'check console for more info') else: - raise AirflowException('Could not get status of the job, got %s', status) + raise AirflowException(f'Could not get status of the job, got {status}') self.log.info('Successful index') @@ -138,14 +144,11 @@ class DruidDbApiHook(DbApiHook): default_conn_name = 'druid_broker_default' supports_autocommit = False - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - def get_conn(self): """ Establish a connection to druid broker. """ - conn = self.get_connection(self.druid_broker_conn_id) + conn = self.get_connection(self.druid_broker_conn_id) # pylint: disable=no-member druid_broker_conn = connect( host=conn.host, port=conn.port, diff --git a/airflow/providers/apache/hive/hooks/hive.py b/airflow/providers/apache/hive/hooks/hive.py index 4446502831..d4122ce86d 100644 --- a/airflow/providers/apache/hive/hooks/hive.py +++ b/airflow/providers/apache/hive/hooks/hive.py @@ -88,6 +88,7 @@ class HiveCliHook(BaseHook): self.auth = conn.extra_dejson.get('auth', 'noSasl') self.conn = conn self.run_as = run_as + self.sub_process = None if mapred_queue_priority: mapred_queue_priority = mapred_queue_priority.upper() @@ -241,24 +242,24 @@ class HiveCliHook(BaseHook): if verbose: self.log.info("%s", " ".join(hive_cmd)) - sp = subprocess.Popen( + sub_process = subprocess.Popen( hive_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=tmp_dir, close_fds=True) - self.sp = sp + self.sub_process = sub_process stdout = '' while True: - line = sp.stdout.readline() + line = sub_process.stdout.readline() if not line: break stdout += line.decode('UTF-8') if verbose: self.log.info(line.decode('UTF-8').strip()) - sp.wait() + sub_process.wait() - if sp.returncode: + if sub_process.returncode: raise AirflowException(stdout) return stdout @@ -338,7 +339,7 @@ class HiveCliHook(BaseHook): """ def _infer_field_types_from_df(df): - DTYPE_KIND_HIVE_TYPE = { + dtype_kind_hive_type = { 'b': 'BOOLEAN', # boolean 'i': 'BIGINT', # signed integer 'u': 'BIGINT', # unsigned integer @@ -351,10 +352,10 @@ class HiveCliHook(BaseHook): 'V': 'STRING' # void } - d = OrderedDict() + order_type = OrderedDict() for col, dtype in df.dtypes.iteritems(): - d[col] = DTYPE_KIND_HIVE_TYPE[dtype.kind] - return d + order_type[col] = dtype_kind_hive_type[dtype.kind] + return order_type if pandas_kwargs is None: pandas_kwargs = {} @@ -466,12 +467,15 @@ class HiveCliHook(BaseHook): self.run_cli(hql) def kill(self): + """ + Kill Hive cli command + """ if hasattr(self, 'sp'): - if self.sp.poll() is None: + if self.sub_process.poll() is None: print("Killing the Hive job") - self.sp.terminate() + self.sub_process.terminate() time.sleep(60) - self.sp.kill() + self.sub_process.kill() class HiveMetastoreHook(BaseHook): @@ -488,9 +492,9 @@ class HiveMetastoreHook(BaseHook): def __getstate__(self): # This is for pickling to work despite the thirft hive client not # being pickable - d = dict(self.__dict__) - del d['metastore'] - return d + state = dict(self.__dict__) + del state['metastore'] + return state def __setstate__(self, d): self.__dict__.update(d) @@ -504,18 +508,18 @@ class HiveMetastoreHook(BaseHook): from thrift.transport import TSocket, TTransport from thrift.protocol import TBinaryProtocol - ms = self._find_valid_server() + conn = self._find_valid_server() - if ms is None: + if not conn: raise AirflowException("Failed to locate the valid server.") - auth_mechanism = ms.extra_dejson.get('authMechanism', 'NOSASL') + auth_mechanism = conn.extra_dejson.get('authMechanism', 'NOSASL') if conf.get('core', 'security') == 'kerberos': - auth_mechanism = ms.extra_dejson.get('authMechanism', 'GSSAPI') - kerberos_service_name = ms.extra_dejson.get('kerberos_service_name', 'hive') + auth_mechanism = conn.extra_dejson.get('authMechanism', 'GSSAPI') + kerberos_service_name = conn.extra_dejson.get('kerberos_service_name', 'hive') - conn_socket = TSocket.TSocket(ms.host, ms.port) + conn_socket = TSocket.TSocket(conn.host, conn.port) if conf.get('core', 'security') == 'kerberos' \ and auth_mechanism == 'GSSAPI': @@ -526,7 +530,7 @@ class HiveMetastoreHook(BaseHook): def sasl_factory(): sasl_client = sasl.Client() - sasl_client.setAttr("host", ms.host) + sasl_client.setAttr("host", conn.host) sasl_client.setAttr("service", kerberos_service_name) sasl_client.init() return sasl_client @@ -551,6 +555,7 @@ class HiveMetastoreHook(BaseHook): return conn else: self.log.info("Could not connect to %s:%s", conn.host, conn.port) + return None def get_conn(self): return self.metastore @@ -577,10 +582,7 @@ class HiveMetastoreHook(BaseHook): partitions = client.get_partitions_by_filter( schema, table, partition, 1) - if partitions: - return True - else: - return False + return bool(partitions) def check_for_named_partition(self, schema, table, partition_name): """ @@ -634,8 +636,7 @@ class HiveMetastoreHook(BaseHook): with self.metastore as client: return client.get_databases(pattern) - def get_partitions( - self, schema, table_name, filter=None): + def get_partitions(self, schema, table_name, partition_filter=None): """ Returns a list of all partitions in a table. Works only for tables with less than 32767 (java short max val). @@ -654,10 +655,10 @@ class HiveMetastoreHook(BaseHook): if len(table.partitionKeys) == 0: raise AirflowException("The table isn't partitioned") else: - if filter: + if partition_filter: parts = client.get_partitions_by_filter( db_name=schema, tbl_name=table_name, - filter=filter, max_parts=HiveMetastoreHook.MAX_PART_COUNT) + filter=partition_filter, max_parts=HiveMetastoreHook.MAX_PART_COUNT) else: parts = client.get_partitions( db_name=schema, tbl_name=table_name, @@ -770,7 +771,7 @@ class HiveMetastoreHook(BaseHook): try: self.get_table(table_name, db) return True - except Exception: + except Exception: # pylint: disable=broad-except return False @@ -849,7 +850,7 @@ class HiveServer2Hook(BaseHook): lowered_statement.startswith('show') or (lowered_statement.startswith('set') and '=' not in lowered_statement)): - description = [c for c in cur.description] + description = cur.description if previous_description and previous_description != description: message = '''The statements are producing different descriptions: Current: {} diff --git a/airflow/providers/apache/hive/operators/hive.py b/airflow/providers/apache/hive/operators/hive.py index 7b5814bed7..f6d06dda87 100644 --- a/airflow/providers/apache/hive/operators/hive.py +++ b/airflow/providers/apache/hive/operators/hive.py @@ -66,6 +66,7 @@ class HiveOperator(BaseOperator): template_ext = ('.hql', '.sql',) ui_color = '#f0e4ec' + # pylint: disable=too-many-arguments @apply_defaults def __init__( self, @@ -104,6 +105,9 @@ class HiveOperator(BaseOperator): self.hook = None def get_hook(self): + """ + Get Hive cli hook + """ return HiveCliHook( hive_cli_conn_id=self.hive_cli_conn_id, run_as=self.run_as, diff --git a/airflow/providers/apache/hive/operators/hive_stats.py b/airflow/providers/apache/hive/operators/hive_stats.py index f492e2dfce..2b0f968249 100644 --- a/airflow/providers/apache/hive/operators/hive_stats.py +++ b/airflow/providers/apache/hive/operators/hive_stats.py @@ -86,22 +86,25 @@ class HiveStatsCollectionOperator(BaseOperator): self.dttm = '{{ execution_date.isoformat() }}' def get_default_exprs(self, col, col_type): + """ + Get default expressions + """ if col in self.col_blacklist: return {} - d = {(col, 'non_null'): "COUNT({col})"} + exp = {(col, 'non_null'): f"COUNT({col})"} if col_type in ['double', 'int', 'bigint', 'float']: - d[(col, 'sum')] = 'SUM({col})' - d[(col, 'min')] = 'MIN({col})' - d[(col, 'max')] = 'MAX({col})' - d[(col, 'avg')] = 'AVG({col})' + exp[(col, 'sum')] = f'SUM({col})' + exp[(col, 'min')] = f'MIN({col})' + exp[(col, 'max')] = f'MAX({col})' + exp[(col, 'avg')] = f'AVG({col})' elif col_type == 'boolean': - d[(col, 'true')] = 'SUM(CASE WHEN {col} THEN 1 ELSE 0 END)' - d[(col, 'false')] = 'SUM(CASE WHEN NOT {col} THEN 1 ELSE 0 END)' + exp[(col, 'true')] = f'SUM(CASE WHEN {col} THEN 1 ELSE 0 END)' + exp[(col, 'false')] = f'SUM(CASE WHEN NOT {col} THEN 1 ELSE 0 END)' elif col_type in ['string']: - d[(col, 'len')] = 'SUM(CAST(LENGTH({col}) AS BIGINT))' - d[(col, 'approx_distinct')] = 'APPROX_DISTINCT({col})' + exp[(col, 'len')] = f'SUM(CAST(LENGTH({col}) AS BIGINT))' + exp[(col, 'approx_distinct')] = f'APPROX_DISTINCT({col})' - return {k: v.format(col=col) for k, v in d.items()} + return exp def execute(self, context=None): metastore = HiveMetastoreHook(metastore_conn_id=self.metastore_conn_id) @@ -113,12 +116,12 @@ class HiveStatsCollectionOperator(BaseOperator): } for col, col_type in list(field_types.items()): if self.assignment_func: - d = self.assignment_func(col, col_type) - if d is None: - d = self.get_default_exprs(col, col_type) + assign_exprs = self.assignment_func(col, col_type) + if assign_exprs is None: + assign_exprs = self.get_default_exprs(col, col_type) else: - d = self.get_default_exprs(col, col_type) - exprs.update(d) + assign_exprs = self.get_default_exprs(col, col_type) + exprs.update(assign_exprs) exprs.update(self.extra_exprs) exprs = OrderedDict(exprs) exprs_str = ",\n ".join([ diff --git a/airflow/providers/apache/pig/hooks/pig.py b/airflow/providers/apache/pig/hooks/pig.py index c703d206b8..6a938aaa8e 100644 --- a/airflow/providers/apache/pig/hooks/pig.py +++ b/airflow/providers/apache/pig/hooks/pig.py @@ -40,6 +40,7 @@ class PigCliHook(BaseHook): conn = self.get_connection(pig_cli_conn_id) self.pig_properties = conn.extra_dejson.get('pig_properties', '') self.conn = conn + self.sub_process = None def run_cli(self, pig, pig_opts=None, verbose=True): """ @@ -72,27 +73,30 @@ class PigCliHook(BaseHook): if verbose: self.log.info("%s", " ".join(pig_cmd)) - sp = subprocess.Popen( + sub_process = subprocess.Popen( pig_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=tmp_dir, close_fds=True) - self.sp = sp + self.sub_process = sub_process stdout = '' - for line in iter(sp.stdout.readline, b''): + for line in iter(sub_process.stdout.readline, b''): stdout += line.decode('utf-8') if verbose: self.log.info(line.strip()) - sp.wait() + sub_process.wait() - if sp.returncode: + if sub_process.returncode: raise AirflowException(stdout) return stdout def kill(self): - if hasattr(self, 'sp'): - if self.sp.poll() is None: + """ + Kill Pig job + """ + if self.sub_process: + if self.sub_process.poll() is None: print("Killing the Pig job") - self.sp.kill() + self.sub_process.kill() diff --git a/airflow/providers/apache/pig/operators/pig.py b/airflow/providers/apache/pig/operators/pig.py index db6c1657cc..99da407f40 100644 --- a/airflow/providers/apache/pig/operators/pig.py +++ b/airflow/providers/apache/pig/operators/pig.py @@ -60,9 +60,7 @@ class PigOperator(BaseOperator): self.pig = pig self.pig_cli_conn_id = pig_cli_conn_id self.pig_opts = pig_opts - - def get_hook(self): - return PigCliHook(pig_cli_conn_id=self.pig_cli_conn_id) + self.hook = None def prepare_template(self): if self.pigparams_jinja_translate: @@ -71,7 +69,7 @@ class PigOperator(BaseOperator): def execute(self, context): self.log.info('Executing: %s', self.pig) - self.hook = self.get_hook() + self.hook = PigCliHook(pig_cli_conn_id=self.pig_cli_conn_id) self.hook.run_cli(pig=self.pig, pig_opts=self.pig_opts) def on_kill(self): diff --git a/airflow/providers/apache/pinot/hooks/pinot.py b/airflow/providers/apache/pinot/hooks/pinot.py index 1ccae200f0..8f7fd2656e 100644 --- a/airflow/providers/apache/pinot/hooks/pinot.py +++ b/airflow/providers/apache/pinot/hooks/pinot.py @@ -18,6 +18,7 @@ import os import subprocess +from typing import Optional from pinotdb import connect @@ -51,6 +52,7 @@ class PinotAdminHook(BaseHook): "Exception" is in the output message. :type pinot_admin_system_exit: bool """ + def __init__(self, conn_id="pinot_admin_default", cmd_path="pinot-admin.sh", @@ -67,28 +69,45 @@ class PinotAdminHook(BaseHook): def get_conn(self): return self.conn - def add_schema(self, schema_file, exec=True): + def add_schema(self, schema_file: str, with_exec: Optional[bool] = True): + """ + Add Pinot schema by run AddSchema command + + :param schema_file: Pinot schema file + :type schema_file: str + :param with_exec: bool + :type with_exec: Optional[bool] + """ cmd = ["AddSchema"] cmd += ["-controllerHost", self.host] cmd += ["-controllerPort", self.port] cmd += ["-schemaFile", schema_file] - if exec: + if with_exec: cmd += ["-exec"] self.run_cli(cmd) - def add_table(self, file_path, exec=True): + def add_table(self, file_path: str, with_exec: Optional[bool] = True): + """ + Add Pinot table with run AddTable command + + :param file_path: Pinot table configure file + :type file_path: str + :param with_exec: bool + :type with_exec: Optional[bool] + """ cmd = ["AddTable"] cmd += ["-controllerHost", self.host] cmd += ["-controllerPort", self.port] cmd += ["-filePath", file_path] - if exec: + if with_exec: cmd += ["-exec"] self.run_cli(cmd) + # pylint: disable=too-many-arguments def create_segment(self, generator_config_file=None, data_dir=None, - format=None, + segment_format=None, out_dir=None, overwrite=None, table_name=None, @@ -104,6 +123,9 @@ class PinotAdminHook(BaseHook): num_threads=None, post_creation_verification=None, retry=None): + """ + Create Pinot segment by run CreateSegment command + """ cmd = ["CreateSegment"] if generator_config_file: @@ -112,8 +134,8 @@ class PinotAdminHook(BaseHook): if data_dir: cmd += ["-dataDir", data_dir] - if format: - cmd += ["-format", format] + if segment_format: + cmd += ["-format", segment_format] if out_dir: cmd += ["-outDir", out_dir] @@ -163,6 +185,13 @@ class PinotAdminHook(BaseHook): self.run_cli(cmd) def upload_segment(self, segment_dir, table_name=None): + """ + Upload Segment with run UploadSegment command + + :param segment_dir: + :param table_name: + :return: + """ cmd = ["UploadSegment"] cmd += ["-controllerHost", self.host] cmd += ["-controllerPort", self.port] @@ -171,7 +200,15 @@ class PinotAdminHook(BaseHook): cmd += ["-tableName", table_name] self.run_cli(cmd) - def run_cli(self, cmd, verbose=True): + def run_cli(self, cmd: list, verbose: Optional[bool] = True): + """ + Run command with pinot-admin.sh + + :param cmd: List of command going to be run by pinot-admin.sh script + :type cmd: list + :param verbose: + :type verbose: Optional[bool] + """ command = [self.cmd_path] command.extend(cmd) @@ -184,7 +221,7 @@ class PinotAdminHook(BaseHook): if verbose: self.log.info(" ".join(command)) - sp = subprocess.Popen( + sub_process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -192,18 +229,17 @@ class PinotAdminHook(BaseHook): env=env) stdout = "" - for line in iter(sp.stdout): - line = line.decode() - stdout += line + for line in iter(sub_process.stdout.readline, b''): + stdout += line.decode("utf-8") if verbose: - self.log.info(line.strip()) + self.log.info(line.decode("utf-8").strip()) - sp.wait() + sub_process.wait() # As of Pinot v0.1.0, either of "Error: ..." or "Exception caught: ..." # is expected to be in the output messages. See: # https://github.com/apache/incubator-pinot/blob/release-0.1.0/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java#L98-L101 - if ((self.pinot_admin_system_exit and sp.returncode) or + if ((self.pinot_admin_system_exit and sub_process.returncode) or ("Error" in stdout or "Exception" in stdout)): raise AirflowException(stdout) @@ -218,14 +254,11 @@ class PinotDbApiHook(DbApiHook): default_conn_name = 'pinot_broker_default' supports_autocommit = False - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - def get_conn(self): """ Establish a connection to pinot broker through pinot dbapi. """ - conn = self.get_connection(self.pinot_broker_conn_id) + conn = self.get_connection(self.pinot_broker_conn_id) # pylint: disable=no-member pinot_broker_conn = connect( host=conn.host, port=conn.port, @@ -233,7 +266,7 @@ class PinotDbApiHook(DbApiHook): scheme=conn.extra_dejson.get('schema', 'http') ) self.log.info('Get the connection to pinot ' - 'broker on {host}'.format(host=conn.host)) + 'broker on %s', conn.host) return pinot_broker_conn def get_uri(self): diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc.py b/airflow/providers/apache/spark/hooks/spark_jdbc.py index f483beeac7..45d3d60d7b 100644 --- a/airflow/providers/apache/spark/hooks/spark_jdbc.py +++ b/airflow/providers/apache/spark/hooks/spark_jdbc.py @@ -22,6 +22,7 @@ from airflow.exceptions import AirflowException from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook +# pylint: disable=too-many-instance-attributes class SparkJDBCHook(SparkSubmitHook): """ This hook extends the SparkSubmitHook specifically for performing data @@ -109,6 +110,8 @@ class SparkJDBCHook(SparkSubmitHook): The specified types should be valid spark sql data types. """ + + # pylint: disable=too-many-arguments,too-many-locals def __init__(self, spark_app_name='airflow-spark-jdbc', spark_conn_id='spark-default', @@ -237,6 +240,9 @@ class SparkJDBCHook(SparkSubmitHook): return arguments def submit_jdbc_job(self): + """ + Submit Spark JDBC job + """ self._application_args = \ self._build_jdbc_application_arguments(self._jdbc_connection) self.submit(application=os.path.dirname(os.path.abspath(__file__)) + diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc_script.py b/airflow/providers/apache/spark/hooks/spark_jdbc_script.py index b28e09dd43..efc70cba52 100644 --- a/airflow/providers/apache/spark/hooks/spark_jdbc_script.py +++ b/airflow/providers/apache/spark/hooks/spark_jdbc_script.py @@ -27,6 +27,16 @@ def set_common_options(spark_source, user='root', password='root', driver='driver'): + """ + Get Spark source from JDBC connection + + :param spark_source: Spark source, here is Spark reader or writer + :param url: JDBC resource url + :param jdbc_table: JDBC resource table name + :param user: JDBC resource user name + :param password: JDBC resource password + :param driver: JDBC resource driver + """ spark_source = spark_source \ .format('jdbc') \ @@ -38,10 +48,14 @@ def set_common_options(spark_source, return spark_source -def spark_write_to_jdbc(spark, url, user, password, metastore_table, jdbc_table, driver, +# pylint: disable=too-many-arguments +def spark_write_to_jdbc(spark_session, url, user, password, metastore_table, jdbc_table, driver, truncate, save_mode, batch_size, num_partitions, create_table_column_types): - writer = spark \ + """ + Transfer data from Spark to JDBC source + """ + writer = spark_session \ .table(metastore_table) \ .write \ @@ -62,12 +76,16 @@ def spark_write_to_jdbc(spark, url, user, password, metastore_table, jdbc_table, .save(mode=save_mode) -def spark_read_from_jdbc(spark, url, user, password, metastore_table, jdbc_table, driver, +# pylint: disable=too-many-arguments +def spark_read_from_jdbc(spark_session, url, user, password, metastore_table, jdbc_table, driver, save_mode, save_format, fetch_size, num_partitions, partition_column, lower_bound, upper_bound): + """ + Transfer data from JDBC source to Spark + """ # first set common options - reader = set_common_options(spark.read, url, jdbc_table, user, password, driver) + reader = set_common_options(spark_session.read, url, jdbc_table, user, password, driver) # now set specific read options if fetch_size: diff --git a/airflow/providers/apache/spark/hooks/spark_sql.py b/airflow/providers/apache/spark/hooks/spark_sql.py index 18b42347de..1bbd39fce1 100644 --- a/airflow/providers/apache/spark/hooks/spark_sql.py +++ b/airflow/providers/apache/spark/hooks/spark_sql.py @@ -54,6 +54,8 @@ class SparkSqlHook(BaseHook): :param yarn_queue: The YARN queue to submit to (Default: "default") :type yarn_queue: str """ + + # pylint: disable=too-many-arguments def __init__(self, sql, conf=None, @@ -167,6 +169,9 @@ class SparkSqlHook(BaseHook): ) def kill(self): + """ + Kill Spark job + """ if self._sp and self._sp.poll() is None: self.log.info("Killing the Spark-Sql job") self._sp.kill() diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py index 6ee56f392d..ac40a9db92 100644 --- a/airflow/providers/apache/spark/hooks/spark_submit.py +++ b/airflow/providers/apache/spark/hooks/spark_submit.py @@ -31,6 +31,7 @@ except ImportError: pass +# pylint: disable=too-many-instance-attributes class SparkSubmitHook(BaseHook, LoggingMixin): """ This hook is a wrapper around the spark-submit binary to kick off a spark-submit job. @@ -99,6 +100,8 @@ class SparkSubmitHook(BaseHook, LoggingMixin): Some distros may use spark2-submit. :type spark_binary: str """ + + # pylint: disable=too-many-arguments,too-many-locals,too-many-branches def __init__(self, conf=None, conn_id='spark_default', @@ -167,6 +170,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin): self._driver_id = None self._driver_status = None self._spark_exit_code = None + self._env = None def _resolve_should_track_driver_status(self): """ @@ -382,7 +386,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin): """ spark_submit_cmd = self._build_spark_submit_command(application) - if hasattr(self, '_env'): + if self._env: env = os.environ.copy() env.update(self._env) kwargs["env"] = env @@ -406,7 +410,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin): ) ) - self.log.debug("Should track driver: {}".format(self._should_track_driver_status)) + self.log.debug("Should track driver: %s", self._should_track_driver_status) # We want the Airflow job to wait until the Spark driver is finished if self._should_track_driver_status: @@ -473,8 +477,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin): match_driver_id = re.search(r'(driver-[0-9\-]+)', line) if match_driver_id: self._driver_id = match_driver_id.groups()[0] - self.log.info("identified spark driver id: {}" - .format(self._driver_id)) + self.log.info("identified spark driver id: %s", self._driver_id) self.log.info(line) @@ -495,7 +498,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin): .replace(',', '').replace('\"', '').strip() driver_found = True - self.log.debug("spark driver status log: {}".format(line)) + self.log.debug("spark driver status log: %s", line) if not driver_found: self._driver_status = "UNKNOWN" @@ -543,8 +546,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin): # Sleep for n seconds as we do not want to spam the cluster time.sleep(self._status_poll_interval) - self.log.debug("polling status of spark driver with id {}" - .format(self._driver_id)) + self.log.debug("polling status of spark driver with id %s", self._driver_id) poll_drive_status_cmd = self._build_track_driver_status_command() status_process = subprocess.Popen(poll_drive_status_cmd, @@ -592,29 +594,30 @@ class SparkSubmitHook(BaseHook, LoggingMixin): return connection_cmd def on_kill(self): + """ + Kill Spark submit command + """ self.log.debug("Kill Command is being called") if self._should_track_driver_status: if self._driver_id: - self.log.info('Killing driver {} on cluster' - .format(self._driver_id)) + self.log.info('Killing driver %s on cluster', self._driver_id) kill_cmd = self._build_spark_driver_kill_command() driver_kill = subprocess.Popen(kill_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - self.log.info("Spark driver {} killed with return code: {}" - .format(self._driver_id, driver_kill.wait())) + self.log.info("Spark driver %s killed with return code: %s", + self._driver_id, driver_kill.wait()) if self._submit_sp and self._submit_sp.poll() is None: self.log.info('Sending kill signal to %s', self._connection['spark_binary']) self._submit_sp.kill() if self._yarn_application_id: - self.log.info('Killing application {} on YARN' - .format(self._yarn_application_id)) + self.log.info('Killing application %s on YARN', self._yarn_application_id) kill_cmd = "yarn application -kill {}" \ .format(self._yarn_application_id).split() diff --git a/airflow/providers/apache/spark/operators/spark_jdbc.py b/airflow/providers/apache/spark/operators/spark_jdbc.py index 6541f4b7b0..6a45b8cf43 100644 --- a/airflow/providers/apache/spark/operators/spark_jdbc.py +++ b/airflow/providers/apache/spark/operators/spark_jdbc.py @@ -21,6 +21,7 @@ from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOpe from airflow.utils.decorators import apply_defaults +# pylint: disable=too-many-instance-attributes class SparkJDBCOperator(SparkSubmitOperator): """ This operator extends the SparkSubmitOperator specifically for performing data @@ -111,6 +112,7 @@ class SparkJDBCOperator(SparkSubmitOperator): types. """ + # pylint: disable=too-many-arguments,too-many-locals @apply_defaults def __init__(self, spark_app_name='airflow-spark-jdbc', diff --git a/airflow/providers/apache/spark/operators/spark_sql.py b/airflow/providers/apache/spark/operators/spark_sql.py index 32ed53be01..ac887af4d5 100644 --- a/airflow/providers/apache/spark/operators/spark_sql.py +++ b/airflow/providers/apache/spark/operators/spark_sql.py @@ -56,6 +56,7 @@ class SparkSqlOperator(BaseOperator): template_fields = ["_sql"] template_ext = [".sql", ".hql"] + # pylint: disable=too-many-arguments @apply_defaults def __init__(self, sql, diff --git a/airflow/providers/apache/spark/operators/spark_submit.py b/airflow/providers/apache/spark/operators/spark_submit.py index e5eb6e970d..31553684e9 100644 --- a/airflow/providers/apache/spark/operators/spark_submit.py +++ b/airflow/providers/apache/spark/operators/spark_submit.py @@ -22,6 +22,7 @@ from airflow.settings import WEB_COLORS from airflow.utils.decorators import apply_defaults +# pylint: disable=too-many-instance-attributes class SparkSubmitOperator(BaseOperator): """ This hook is a wrapper around the spark-submit binary to kick off a spark-submit job. @@ -93,6 +94,7 @@ class SparkSubmitOperator(BaseOperator): '_application_args', '_env_vars') ui_color = WEB_COLORS['LIGHTORANGE'] + # pylint: disable=too-many-arguments,too-many-locals @apply_defaults def __init__(self, application='', diff --git a/airflow/providers/apache/sqoop/hooks/sqoop.py b/airflow/providers/apache/sqoop/hooks/sqoop.py index ce7dd2d19b..9510b7b934 100644 --- a/airflow/providers/apache/sqoop/hooks/sqoop.py +++ b/airflow/providers/apache/sqoop/hooks/sqoop.py @@ -71,16 +71,17 @@ class SqoopHook(BaseHook): self.verbose = verbose self.num_mappers = num_mappers self.properties = properties or {} - self.log.info( - "Using connection to: {}:{}/{}".format( - self.conn.host, self.conn.port, self.conn.schema - ) - ) + self.log.info("Using connection to: %s:%s/%s", + self.conn.host, self.conn.port, self.conn.schema) + self.sub_process = None def get_conn(self): return self.conn def cmd_mask_password(self, cmd_orig): + """ + Mask command password for safety + """ cmd = deepcopy(cmd_orig) try: password_index = cmd.index('--password') @@ -89,7 +90,7 @@ class SqoopHook(BaseHook): self.log.debug("No password in sqoop cmd") return cmd - def Popen(self, cmd, **kwargs): + def popen(self, cmd, **kwargs): """ Remote Popen @@ -98,21 +99,21 @@ class SqoopHook(BaseHook): :return: handle to subprocess """ masked_cmd = ' '.join(self.cmd_mask_password(cmd)) - self.log.info("Executing command: {}".format(masked_cmd)) - self.sp = subprocess.Popen( + self.log.info("Executing command: %s", masked_cmd) + self.sub_process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs) - for line in iter(self.sp.stdout): + for line in iter(self.sub_process.stdout): self.log.info(line.strip()) - self.sp.wait() + self.sub_process.wait() - self.log.info("Command exited with return code %s", self.sp.returncode) + self.log.info("Command exited with return code %s", self.sub_process.returncode) - if self.sp.returncode: + if self.sub_process.returncode: raise AirflowException("Sqoop command failed: {}".format(masked_cmd)) def _prepare_command(self, export=False): @@ -200,6 +201,7 @@ class SqoopHook(BaseHook): return cmd + # pylint: disable=too-many-arguments def import_table(self, table, target_dir=None, append=False, file_type="text", columns=None, split_by=None, where=None, direct=False, driver=None, extra_import_options=None): @@ -231,7 +233,7 @@ class SqoopHook(BaseHook): if where: cmd += ["--where", where] - self.Popen(cmd) + self.popen(cmd) def import_query(self, query, target_dir, append=False, file_type="text", split_by=None, direct=None, driver=None, extra_import_options=None): @@ -254,8 +256,9 @@ class SqoopHook(BaseHook): driver, extra_import_options) cmd += ["--query", query] - self.Popen(cmd) + self.popen(cmd) + # pylint: disable=too-many-arguments def _export_cmd(self, table, export_dir, input_null_string, input_null_non_string, staging_table, clear_staging_table, enclosed_by, escaped_by, input_fields_terminated_by, @@ -312,6 +315,7 @@ class SqoopHook(BaseHook): return cmd + # pylint: disable=too-many-arguments def export_table(self, table, export_dir, input_null_string, input_null_non_string, staging_table, clear_staging_table, enclosed_by, @@ -353,4 +357,4 @@ class SqoopHook(BaseHook): input_optionally_enclosed_by, batch, relaxed_isolation, extra_export_options) - self.Popen(cmd) + self.popen(cmd) diff --git a/airflow/providers/apache/sqoop/operators/sqoop.py b/airflow/providers/apache/sqoop/operators/sqoop.py index 01f7860dea..ec8419ff3a 100644 --- a/airflow/providers/apache/sqoop/operators/sqoop.py +++ b/airflow/providers/apache/sqoop/operators/sqoop.py @@ -29,6 +29,7 @@ from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook from airflow.utils.decorators import apply_defaults +# pylint: disable=too-many-instance-attributes class SqoopOperator(BaseOperator): """ Execute a Sqoop job. @@ -91,6 +92,7 @@ class SqoopOperator(BaseOperator): 'extra_export_options', 'hcatalog_database', 'hcatalog_table',) ui_color = '#7D8CA4' + # pylint: disable=too-many-arguments,too-many-locals @apply_defaults def __init__(self, conn_id='sqoop_default', @@ -160,6 +162,7 @@ class SqoopOperator(BaseOperator): self.properties = properties self.extra_import_options = extra_import_options or {} self.extra_export_options = extra_export_options or {} + self.hook = None def execute(self, context): """ @@ -233,4 +236,4 @@ class SqoopOperator(BaseOperator): def on_kill(self): self.log.info('Sending SIGTERM signal to bash process group') - os.killpg(os.getpgid(self.hook.sp.pid), signal.SIGTERM) + os.killpg(os.getpgid(self.hook.sub_process.pid), signal.SIGTERM) diff --git a/airflow/providers/datadog/hooks/datadog.py b/airflow/providers/datadog/hooks/datadog.py index c1d00843c9..38055c4e97 100644 --- a/airflow/providers/datadog/hooks/datadog.py +++ b/airflow/providers/datadog/hooks/datadog.py @@ -56,6 +56,9 @@ class DatadogHook(BaseHook, LoggingMixin): initialize(api_key=self.api_key, app_key=self.app_key) def validate_response(self, response): + """ + Validate Datadog response + """ if response['status'] != 'ok': self.log.error("Datadog returned: %s", response) raise AirflowException("Error status received from Datadog") @@ -111,6 +114,7 @@ class DatadogHook(BaseHook, LoggingMixin): self.validate_response(response) return response + # pylint: disable=too-many-arguments def post_event(self, title, text, aggregation_key=None, alert_type=None, date_happened=None, handle=None, priority=None, related_event_id=None, tags=None, device_name=None): """ diff --git a/airflow/providers/dingding/hooks/dingding.py b/airflow/providers/dingding/hooks/dingding.py index f8e7e5ac28..00e8b4e6f4 100644 --- a/airflow/providers/dingding/hooks/dingding.py +++ b/airflow/providers/dingding/hooks/dingding.py @@ -129,5 +129,5 @@ class DingdingHook(HttpHook): # Dingding success send message will with errcode equal to 0 if int(resp.json().get('errcode')) != 0: raise AirflowException('Send Dingding message failed, receive error ' - 'message %s', resp.text) + f'message {resp.text}') self.log.info('Success Send Dingding message') diff --git a/airflow/providers/docker/hooks/docker.py b/airflow/providers/docker/hooks/docker.py index 98dfcfedf7..f84991d6ea 100644 --- a/airflow/providers/docker/hooks/docker.py +++ b/airflow/providers/docker/hooks/docker.py @@ -61,7 +61,7 @@ class DockerHook(BaseHook, LoggingMixin): self.__username = conn.login self.__password = conn.password self.__email = extra_options.get('email') - self.__reauth = False if extra_options.get('reauth') == 'no' else True + self.__reauth = extra_options.get('reauth') != 'no' def get_conn(self): client = APIClient( @@ -85,4 +85,4 @@ class DockerHook(BaseHook, LoggingMixin): self.log.debug('Login successful') except APIError as docker_error: self.log.error('Docker registry login failed: %s', str(docker_error)) - raise AirflowException('Docker registry login failed: %s', str(docker_error)) + raise AirflowException(f'Docker registry login failed: {docker_error}') diff --git a/airflow/providers/email/operators/email.py b/airflow/providers/email/operators/email.py index 10c9ba5c4c..8bd8956648 100644 --- a/airflow/providers/email/operators/email.py +++ b/airflow/providers/email/operators/email.py @@ -63,11 +63,11 @@ class EmailOperator(BaseOperator): mime_charset: str = 'utf-8', *args, **kwargs) -> None: super().__init__(*args, **kwargs) - self.to = to + self.to = to # pylint: disable=invalid-name self.subject = subject self.html_content = html_content self.files = files or [] - self.cc = cc + self.cc = cc # pylint: disable=invalid-name self.bcc = bcc self.mime_subtype = mime_subtype self.mime_charset = mime_charset diff --git a/airflow/providers/ftp/hooks/ftp.py b/airflow/providers/ftp/hooks/ftp.py index 5f322e57e7..f0e9355b4f 100644 --- a/airflow/providers/ftp/hooks/ftp.py +++ b/airflow/providers/ftp/hooks/ftp.py @@ -116,7 +116,7 @@ class FTPHook(BaseHook): files = dict(mlsd(conn)) return files - def list_directory(self, path, nlst=False): + def list_directory(self, path): """ Returns a list of files on the remote system. @@ -300,7 +300,9 @@ class FTPHook(BaseHook): class FTPSHook(FTPHook): - + """ + Interact with FTPS. + """ def get_conn(self): """ Returns a FTPS connection object. diff --git a/airflow/providers/google/cloud/operators/s3_to_gcs.py b/airflow/providers/google/cloud/operators/s3_to_gcs.py index 62027af940..947f11c1e0 100644 --- a/airflow/providers/google/cloud/operators/s3_to_gcs.py +++ b/airflow/providers/google/cloud/operators/s3_to_gcs.py @@ -90,6 +90,7 @@ class S3ToGCSOperator(S3ListOperator): template_fields = ('bucket', 'prefix', 'delimiter', 'dest_gcs') ui_color = '#e09411' + # pylint: disable=too-many-arguments @apply_defaults def __init__(self, bucket, @@ -143,6 +144,7 @@ class S3ToGCSOperator(S3ListOperator): google_cloud_storage_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) + # pylint: disable=too-many-nested-blocks if not self.replace: # if we are not replacing -> list all files in the GCS bucket # and only keep those files which are present in @@ -214,7 +216,7 @@ class S3ToGCSOperator(S3ListOperator): # Following functionality may be better suited in # airflow/contrib/hooks/gcs.py @staticmethod - def _gcs_object_is_directory(object): - _, blob = _parse_gcs_url(object) + def _gcs_object_is_directory(bucket): + _, blob = _parse_gcs_url(bucket) return len(blob) == 0 or blob.endswith('/') diff --git a/airflow/providers/http/hooks/http.py b/airflow/providers/http/hooks/http.py index 1e621cc987..38b3a3c043 100644 --- a/airflow/providers/http/hooks/http.py +++ b/airflow/providers/http/hooks/http.py @@ -176,7 +176,7 @@ class HttpHook(BaseHook): return response except requests.exceptions.ConnectionError as ex: - self.log.warning(str(ex) + ' Tenacity will retry to execute the operation') + self.log.warning('%s Tenacity will retry to execute the operation', ex) raise ex def run_with_advanced_retry(self, _retry_args, *args, **kwargs): diff --git a/airflow/providers/jdbc/operators/jdbc.py b/airflow/providers/jdbc/operators/jdbc.py index 82e73d7baa..e626abd0ff 100644 --- a/airflow/providers/jdbc/operators/jdbc.py +++ b/airflow/providers/jdbc/operators/jdbc.py @@ -57,6 +57,7 @@ class JdbcOperator(BaseOperator): self.sql = sql self.jdbc_conn_id = jdbc_conn_id self.autocommit = autocommit + self.hook = None def execute(self, context): self.log.info('Executing: %s', self.sql) diff --git a/airflow/providers/jenkins/hooks/jenkins.py b/airflow/providers/jenkins/hooks/jenkins.py index 8cab5a1e1c..e547390032 100644 --- a/airflow/providers/jenkins/hooks/jenkins.py +++ b/airflow/providers/jenkins/hooks/jenkins.py @@ -33,17 +33,20 @@ class JenkinsHook(BaseHook): super().__init__() connection = self.get_connection(conn_id) self.connection = connection - connectionPrefix = 'http' + connection_prefix = 'http' # connection.extra contains info about using https (true) or http (false) if connection.extra is None or connection.extra == '': connection.extra = 'false' # set a default value to connection.extra # to avoid rising ValueError in strtobool if strtobool(connection.extra): - connectionPrefix = 'https' - url = '%s://%s:%d' % (connectionPrefix, connection.host, connection.port) + connection_prefix = 'https' + url = f'{connection_prefix}://{connection.host}:{connection.port}' self.log.info('Trying to connect to %s', url) self.jenkins_server = jenkins.Jenkins(url, connection.login, connection.password) def get_jenkins_server(self): + """ + Get jenkins server + """ return self.jenkins_server diff --git a/airflow/providers/jenkins/operators/jenkins_job_trigger.py b/airflow/providers/jenkins/operators/jenkins_job_trigger.py index eaf610d40d..434279c6d7 100644 --- a/airflow/providers/jenkins/operators/jenkins_job_trigger.py +++ b/airflow/providers/jenkins/operators/jenkins_job_trigger.py @@ -168,11 +168,13 @@ class JenkinsJobTriggerOperator(BaseOperator): return build_number try_count += 1 time.sleep(self.sleep_time) - raise AirflowException("The job hasn't been executed" - " after polling the queue %d times", - self.max_try_before_job_appears) + raise AirflowException("The job hasn't been executed after polling " + f"the queue {self.max_try_before_job_appears} times") def get_hook(self): + """ + Instantiate jenkins hook + """ return JenkinsHook(self.jenkins_connection_id) def execute(self, context): @@ -200,6 +202,7 @@ class JenkinsJobTriggerOperator(BaseOperator): time.sleep(self.sleep_time) keep_polling_job = True build_info = None + # pylint: disable=too-many-nested-blocks while keep_polling_job: try: build_info = jenkins_server.get_build_info(name=self.job_name, @@ -217,16 +220,17 @@ class JenkinsJobTriggerOperator(BaseOperator): self.job_name, build_number) time.sleep(self.sleep_time) except jenkins.NotFoundException as err: + # pylint: disable=no-member raise AirflowException( - 'Jenkins job status check failed. Final error was: %s' - % err.resp.status) + 'Jenkins job status check failed. Final error was: ' + f'{err.resp.status}') except jenkins.JenkinsException as err: raise AirflowException( - 'Jenkins call failed with error : %s, if you have parameters ' + f'Jenkins call failed with error : {err}, if you have parameters ' 'double check them, jenkins sends back ' 'this exception for unknown parameters' 'You can also check logs for more details on this exception ' - '(jenkins_url/log/rss)', str(err)) + '(jenkins_url/log/rss)') if build_info: # If we can we return the url of the job # for later use (like retrieving an artifact) diff --git a/airflow/providers/microsoft/azure/hooks/azure_container_volume.py b/airflow/providers/microsoft/azure/hooks/azure_container_volume.py index ae40885b66..fe72c359cf 100644 --- a/airflow/providers/microsoft/azure/hooks/azure_container_volume.py +++ b/airflow/providers/microsoft/azure/hooks/azure_container_volume.py @@ -35,6 +35,9 @@ class AzureContainerVolumeHook(BaseHook): self.conn_id = wasb_conn_id def get_storagekey(self): + """ + Get Azure File Volume storage key + """ conn = self.get_connection(self.conn_id) service_options = conn.extra_dejson @@ -47,6 +50,9 @@ class AzureContainerVolumeHook(BaseHook): def get_file_volume(self, mount_name, share_name, storage_account_name, read_only=False): + """ + Get Azure File Volume + """ return Volume(name=mount_name, azure_file=AzureFileVolume(share_name=share_name, storage_account_name=storage_account_name, diff --git a/airflow/providers/microsoft/azure/hooks/azure_cosmos.py b/airflow/providers/microsoft/azure/hooks/azure_cosmos.py index 7a04b735fa..12599f5009 100644 --- a/airflow/providers/microsoft/azure/hooks/azure_cosmos.py +++ b/airflow/providers/microsoft/azure/hooks/azure_cosmos.py @@ -288,12 +288,21 @@ class AzureCosmosDBHook(BaseHook): def get_database_link(database_id): + """ + Get Azure CosmosDB database link + """ return "dbs/" + database_id def get_collection_link(database_id, collection_id): + """ + Get Azure CosmosDB collection link + """ return get_database_link(database_id) + "/colls/" + collection_id def get_document_link(database_id, collection_id, document_id): + """ + Get Azure CosmosDB document link + """ return get_collection_link(database_id, collection_id) + "/docs/" + document_id diff --git a/airflow/providers/microsoft/azure/hooks/wasb.py b/airflow/providers/microsoft/azure/hooks/wasb.py index 899abf72d5..32f9b57d17 100644 --- a/airflow/providers/microsoft/azure/hooks/wasb.py +++ b/airflow/providers/microsoft/azure/hooks/wasb.py @@ -191,7 +191,7 @@ class WasbHook(BaseHook): raise AirflowException('Blob(s) not found: {}'.format(blob_name)) for blob_uri in blobs_to_delete: - self.log.info("Deleting blob: " + blob_uri) + self.log.info("Deleting blob: %s", blob_uri) self.connection.delete_blob(container_name, blob_uri, delete_snapshots='include', diff --git a/airflow/providers/microsoft/azure/operators/azure_container_instances.py b/airflow/providers/microsoft/azure/operators/azure_container_instances.py index c134f24722..b56a0e4a4f 100644 --- a/airflow/providers/microsoft/azure/operators/azure_container_instances.py +++ b/airflow/providers/microsoft/azure/operators/azure_container_instances.py @@ -46,6 +46,7 @@ DEFAULT_MEMORY_IN_GB = 2.0 DEFAULT_CPU = 1.0 +# pylint: disable=too-many-instance-attributes class AzureContainerInstancesOperator(BaseOperator): """ Start a container on Azure Container Instances @@ -119,6 +120,7 @@ class AzureContainerInstancesOperator(BaseOperator): template_fields = ('name', 'image', 'command', 'environment_variables') + # pylint: disable=too-many-arguments @apply_defaults def __init__(self, ci_conn_id, @@ -232,7 +234,7 @@ class AzureContainerInstancesOperator(BaseOperator): self.log.info("Container group started %s/%s", self.resource_group, self.name) - exit_code = self._monitor_logging(self._ci_hook, self.resource_group, self.name) + exit_code = self._monitor_logging(self.resource_group, self.name) self.log.info("Container had exit code: %s", exit_code) if exit_code != 0: @@ -253,14 +255,15 @@ class AzureContainerInstancesOperator(BaseOperator): self.log.info("Deleting container group") try: self._ci_hook.delete(self.resource_group, self.name) - except Exception: + except Exception: # pylint: disable=broad-except self.log.exception("Could not delete container group") - def _monitor_logging(self, ci_hook, resource_group, name): + def _monitor_logging(self, resource_group, name): last_state = None last_message_logged = None last_line_logged = None + # pylint: disable=too-many-nested-blocks while True: try: cg_state = self._ci_hook.get_state(resource_group, name) @@ -310,7 +313,7 @@ class AzureContainerInstancesOperator(BaseOperator): return 1 else: self.log.exception("Exception while getting container groups") - except Exception: + except Exception: # pylint: disable=broad-except self.log.exception("Exception while getting container groups") sleep(1) @@ -330,6 +333,7 @@ class AzureContainerInstancesOperator(BaseOperator): self.log.info(line.rstrip()) return logs[-1] + return None @staticmethod def _check_name(name): diff --git a/airflow/providers/microsoft/azure/operators/oracle_to_azure_data_lake_transfer.py b/airflow/providers/microsoft/azure/operators/oracle_to_azure_data_lake_transfer.py index a78bbb1624..452543c64c 100644 --- a/airflow/providers/microsoft/azure/operators/oracle_to_azure_data_lake_transfer.py +++ b/airflow/providers/microsoft/azure/operators/oracle_to_azure_data_lake_transfer.py @@ -58,6 +58,7 @@ class OracleToAzureDataLakeTransfer(BaseOperator): template_fields = ('filename', 'sql', 'sql_params') ui_color = '#e08c8c' + # pylint: disable=too-many-arguments @apply_defaults def __init__( self, diff --git a/airflow/providers/microsoft/mssql/hooks/mssql.py b/airflow/providers/microsoft/mssql/hooks/mssql.py index f6e732ab73..4bee8abdcb 100644 --- a/airflow/providers/microsoft/mssql/hooks/mssql.py +++ b/airflow/providers/microsoft/mssql/hooks/mssql.py @@ -71,7 +71,8 @@ class MsSqlHook(DbApiHook): """ Returns a mssql connection object """ - conn = self.get_connection(self.mssql_conn_id) + conn = self.get_connection(self.mssql_conn_id) # pylint: disable=no-member + # pylint: disable=c-extension-no-member conn = pymssql.connect( server=conn.host, user=conn.login, diff --git a/airflow/providers/microsoft/winrm/operators/winrm.py b/airflow/providers/microsoft/winrm/operators/winrm.py index c8f6631dfd..53e80ebc4a 100644 --- a/airflow/providers/microsoft/winrm/operators/winrm.py +++ b/airflow/providers/microsoft/winrm/operators/winrm.py @@ -82,6 +82,7 @@ class WinRMOperator(BaseOperator): winrm_client = self.winrm_hook.get_conn() + # pylint: disable=too-many-nested-blocks try: self.log.info("Running command: '%s'...", self.command) command_id = self.winrm_hook.winrm_protocol.run_command( @@ -95,6 +96,7 @@ class WinRMOperator(BaseOperator): command_done = False while not command_done: try: + # pylint: disable=protected-access stdout, stderr, return_code, command_done = \ self.winrm_hook.winrm_protocol._raw_get_command_output( winrm_client, diff --git a/airflow/providers/mysql/hooks/mysql.py b/airflow/providers/mysql/hooks/mysql.py index 465f740723..6f610f9ab8 100644 --- a/airflow/providers/mysql/hooks/mysql.py +++ b/airflow/providers/mysql/hooks/mysql.py @@ -143,9 +143,9 @@ class MySqlHook(DbApiHook): return MySQLdb.connect(**conn_config) if client_name == 'mysql-connector-python': - import mysql.connector + import mysql.connector # pylint: disable=no-name-in-module conn_config = self._get_conn_config_mysql_connector_python(conn) - return mysql.connector.connect(**conn_config) + return mysql.connector.connect(**conn_config) # pylint: disable=no-member raise ValueError('Unknown MySQL client name provided!') diff --git a/airflow/providers/mysql/operators/vertica_to_mysql.py b/airflow/providers/mysql/operators/vertica_to_mysql.py index afbdd893ac..6f9d5c7e8d 100644 --- a/airflow/providers/mysql/operators/vertica_to_mysql.py +++ b/airflow/providers/mysql/operators/vertica_to_mysql.py @@ -142,7 +142,7 @@ class VerticaToMySqlTransfer(BaseOperator): rows=result, target_fields=selected_columns) self.log.info("Inserted rows into MySQL %s", count) - except (MySQLdb.Error, MySQLdb.Warning): + except (MySQLdb.Error, MySQLdb.Warning): # pylint: disable=no-member self.log.info("Inserted rows into MySQL 0") raise diff --git a/airflow/providers/openfaas/hooks/openfaas.py b/airflow/providers/openfaas/hooks/openfaas.py index aa022415b2..8eaf70d0d5 100644 --- a/airflow/providers/openfaas/hooks/openfaas.py +++ b/airflow/providers/openfaas/hooks/openfaas.py @@ -53,47 +53,59 @@ class OpenFaasHook(BaseHook): return conn def deploy_function(self, overwrite_function_if_exist, body): + """ + Deploy OpenFaaS function + """ if overwrite_function_if_exist: - self.log.info("Function already exist " + self.function_name + " going to update") + self.log.info("Function already exist %s going to update", self.function_name) self.update_function(body) else: url = self.get_conn().host + self.DEPLOY_FUNCTION - self.log.info("Deploying function " + url) + self.log.info("Deploying function %s", url) response = requests.post(url, body) if response.status_code != OK_STATUS_CODE: - self.log.error("Response status " + str(response.status_code)) + self.log.error("Response status %d", response.status_code) self.log.error("Failed to deploy") raise AirflowException('failed to deploy') else: - self.log.info("Function deployed " + self.function_name) + self.log.info("Function deployed %s", self.function_name) def invoke_async_function(self, body): + """ + Invoking function + """ url = self.get_conn().host + self.INVOKE_ASYNC_FUNCTION + self.function_name - self.log.info("Invoking function " + url) + self.log.info("Invoking function %s", url) response = requests.post(url, body) if response.ok: - self.log.info("Invoked " + self.function_name) + self.log.info("Invoked %s", self.function_name) else: - self.log.error("Response status " + str(response.status_code)) + self.log.error("Response status %d", response.status_code) raise AirflowException('failed to invoke function') def update_function(self, body): + """ + Update OpenFaaS function + """ url = self.get_conn().host + self.UPDATE_FUNCTION - self.log.info("Updating function " + url) + self.log.info("Updating function %s", url) response = requests.put(url, body) if response.status_code != OK_STATUS_CODE: - self.log.error("Response status " + str(response.status_code)) - self.log.error("Failed to update response " + response.content.decode("utf-8")) + self.log.error("Response status %d", response.status_code) + self.log.error("Failed to update response %s", response.content.decode("utf-8")) raise AirflowException('failed to update ' + self.function_name) else: self.log.info("Function was updated") def does_function_exist(self): + """ + Whether OpenFaaS function exists or not + """ url = self.get_conn().host + self.GET_FUNCTION + self.function_name response = requests.get(url) if response.ok: return True else: - self.log.error("Failed to find function " + self.function_name) + self.log.error("Failed to find function %s", self.function_name) return False diff --git a/airflow/providers/opsgenie/operators/opsgenie_alert.py b/airflow/providers/opsgenie/operators/opsgenie_alert.py index bbb6fd37d7..ce09574157 100644 --- a/airflow/providers/opsgenie/operators/opsgenie_alert.py +++ b/airflow/providers/opsgenie/operators/opsgenie_alert.py @@ -42,9 +42,9 @@ class OpsgenieAlertOperator(BaseOperator): :param responders: Teams, users, escalations and schedules that the alert will be routed to send notifications. :type responders: list[dict] - :param visibleTo: Teams and users that the alert will become visible + :param visible_to: Teams and users that the alert will become visible to without sending any notification. - :type visibleTo: list[dict] + :type visible_to: list[dict] :param actions: Custom actions that will be available for the alert. :type actions: list[str] :param tags: Tags of the alert. @@ -66,6 +66,7 @@ class OpsgenieAlertOperator(BaseOperator): """ template_fields = ('message', 'alias', 'description', 'entity', 'priority', 'note') + # pylint: disable=too-many-arguments @apply_defaults def __init__(self, message, @@ -73,7 +74,7 @@ class OpsgenieAlertOperator(BaseOperator): alias=None, description=None, responders=None, - visibleTo=None, + visible_to=None, actions=None, tags=None, details=None, @@ -92,7 +93,7 @@ class OpsgenieAlertOperator(BaseOperator): self.alias = alias self.description = description self.responders = responders - self.visibleTo = visibleTo + self.visible_to = visible_to self.actions = actions self.tags = tags self.details = details @@ -114,7 +115,7 @@ class OpsgenieAlertOperator(BaseOperator): for key in [ "message", "alias", "description", "responders", - "visibleTo", "actions", "tags", "details", "entity", + "visible_to", "actions", "tags", "details", "entity", "source", "priority", "user", "note" ]: val = getattr(self, key) diff --git a/airflow/providers/oracle/hooks/oracle.py b/airflow/providers/oracle/hooks/oracle.py index 699a013105..b2385c83b0 100644 --- a/airflow/providers/oracle/hooks/oracle.py +++ b/airflow/providers/oracle/hooks/oracle.py @@ -32,6 +32,7 @@ class OracleHook(DbApiHook): default_conn_name = 'oracle_default' supports_autocommit = False + # pylint: disable=c-extension-no-member def get_conn(self): """ Returns a oracle connection object @@ -50,7 +51,7 @@ class OracleHook(DbApiHook): see more param detail in `cx_Oracle.connect `_ """ - conn = self.get_connection(self.oracle_conn_id) + conn = self.get_connection(self.oracle_conn_id) # pylint: disable=no-member conn_config = { 'user': conn.login, 'password': conn.password @@ -154,7 +155,7 @@ class OracleHook(DbApiHook): lst.append("'" + str(cell).replace("'", "''") + "'") elif cell is None: lst.append('NULL') - elif type(cell) == float and \ + elif isinstance(cell, float) and \ numpy.isnan(cell): # coerce numpy NaN to NULL lst.append('NULL') elif isinstance(cell, numpy.datetime64): diff --git a/airflow/providers/oracle/operators/oracle_to_oracle_transfer.py b/airflow/providers/oracle/operators/oracle_to_oracle_transfer.py index 817f367d87..25424060c1 100644 --- a/airflow/providers/oracle/operators/oracle_to_oracle_transfer.py +++ b/airflow/providers/oracle/operators/oracle_to_oracle_transfer.py @@ -64,6 +64,7 @@ class OracleToOracleTransfer(BaseOperator): self.source_sql_params = source_sql_params self.rows_chunk = rows_chunk + # pylint: disable=unused-argument def _execute(self, src_hook, dest_hook, context): with src_hook.get_conn() as src_conn: cursor = src_conn.cursor() diff --git a/airflow/providers/papermill/operators/papermill.py b/airflow/providers/papermill/operators/papermill.py index ad8f984e09..b34e5c88f4 100644 --- a/airflow/providers/papermill/operators/papermill.py +++ b/airflow/providers/papermill/operators/papermill.py @@ -27,6 +27,9 @@ from airflow.utils.decorators import apply_defaults @attr.s(auto_attribs=True) class NoteBook(File): + """ + Jupyter notebook + """ type_hint: Optional[str] = "jupyter_notebook" parameters: Optional[Dict] = {} diff --git a/airflow/providers/postgres/hooks/postgres.py b/airflow/providers/postgres/hooks/postgres.py index 640c775d4b..fd6c23c2c5 100644 --- a/airflow/providers/postgres/hooks/postgres.py +++ b/airflow/providers/postgres/hooks/postgres.py @@ -56,6 +56,7 @@ class PostgresHook(DbApiHook): super().__init__(*args, **kwargs) self.schema = kwargs.pop("schema", None) self.connection = kwargs.pop("connection", None) + self.conn = None def _get_cursor(self, raw_cursor): _cursor = raw_cursor.lower() @@ -95,7 +96,7 @@ class PostgresHook(DbApiHook): self.conn = psycopg2.connect(**conn_args) return self.conn - def copy_expert(self, sql, filename, open=open): + def copy_expert(self, sql, filename): """ Executes SQL using psycopg2 copy_expert method. Necessary to execute COPY command without access to a superuser. @@ -129,6 +130,7 @@ class PostgresHook(DbApiHook): """ self.copy_expert("COPY {table} TO STDOUT".format(table=table), tmp_file) + # pylint: disable=signature-differs @staticmethod def _serialize_cell(cell, conn): """ diff --git a/airflow/providers/postgres/operators/postgres.py b/airflow/providers/postgres/operators/postgres.py index 647cfedbf1..f11071435a 100644 --- a/airflow/providers/postgres/operators/postgres.py +++ b/airflow/providers/postgres/operators/postgres.py @@ -60,6 +60,7 @@ class PostgresOperator(BaseOperator): self.autocommit = autocommit self.parameters = parameters self.database = database + self.hook = None def execute(self, context): self.log.info('Executing: %s', self.sql) diff --git a/airflow/providers/presto/hooks/presto.py b/airflow/providers/presto/hooks/presto.py index 6b9c18c2c7..b316311301 100644 --- a/airflow/providers/presto/hooks/presto.py +++ b/airflow/providers/presto/hooks/presto.py @@ -23,7 +23,9 @@ from airflow.hooks.dbapi_hook import DbApiHook class PrestoException(Exception): - pass + """ + Presto exception + """ class PrestoHook(DbApiHook): @@ -41,7 +43,7 @@ class PrestoHook(DbApiHook): def get_conn(self): """Returns a connection object""" - db = self.get_connection(self.presto_conn_id) + db = self.get_connection(self.presto_conn_id) # pylint: disable=no-member auth = prestodb.auth.BasicAuthentication(db.login, db.password) if db.password else None return prestodb.dbapi.connect( @@ -58,7 +60,7 @@ class PrestoHook(DbApiHook): def get_isolation_level(self): """Returns an isolation level""" - db = self.get_connection(self.presto_conn_id) + db = self.get_connection(self.presto_conn_id) # pylint: disable=no-member isolation_level = db.extra_dejson.get('isolation_level', 'AUTOCOMMIT').upper() return getattr(IsolationLevel, isolation_level, IsolationLevel.AUTOCOMMIT) diff --git a/airflow/providers/qubole/hooks/qubole_check.py b/airflow/providers/qubole/hooks/qubole_check.py index 31b162cc99..b0f31ab54b 100644 --- a/airflow/providers/qubole/hooks/qubole_check.py +++ b/airflow/providers/qubole/hooks/qubole_check.py @@ -31,6 +31,9 @@ ROW_DELIM = '\r\n' def isint(value): + """ + Whether Qubole column are integer + """ try: int(value) return True @@ -39,6 +42,9 @@ def isint(value): def isfloat(value): + """ + Whether Qubole column are float + """ try: float(value) return True @@ -47,14 +53,19 @@ def isfloat(value): def isbool(value): + """ + Whether Qubole column are boolean + """ try: - if value.lower() in ["true", "false"]: - return True + return value.lower() in ["true", "false"] except ValueError: return False def parse_first_row(row_list): + """ + Parse Qubole first record list + """ record_list = [] first_row = row_list[0] if row_list else "" @@ -71,6 +82,9 @@ def parse_first_row(row_list): class QuboleCheckHook(QuboleHook): + """ + Qubole check hook + """ def __init__(self, context, *args, **kwargs): super().__init__(*args, **kwargs) self.results_parser_callable = parse_first_row @@ -93,7 +107,10 @@ class QuboleCheckHook(QuboleHook): log.info('Cancelling the Qubole Command Id: %s', cmd_id) cmd.cancel() - def get_first(self, sql): + def get_first(self, sql): # pylint: disable=unused-argument + """ + Get Qubole query first record list + """ self.execute(context=self.context) query_result = self.get_query_results() row_list = list(filter(None, query_result.split(ROW_DELIM))) @@ -101,9 +118,12 @@ class QuboleCheckHook(QuboleHook): return record_list def get_query_results(self): + """ + Get Qubole query result + """ if self.cmd is not None: cmd_id = self.cmd.id - self.log.info("command id: " + str(cmd_id)) + self.log.info("command id: %d", cmd_id) query_result_buffer = StringIO() self.cmd.get_results(fp=query_result_buffer, inline=True, delim=COL_DELIM) query_result = query_result_buffer.getvalue() @@ -111,3 +131,4 @@ class QuboleCheckHook(QuboleHook): return query_result else: self.log.info("Qubole command not found") + return None diff --git a/airflow/providers/qubole/operators/qubole_check.py b/airflow/providers/qubole/operators/qubole_check.py index 6296a98c2b..01dd892ac8 100644 --- a/airflow/providers/qubole/operators/qubole_check.py +++ b/airflow/providers/qubole/operators/qubole_check.py @@ -209,6 +209,9 @@ class QuboleValueCheckOperator(ValueCheckOperator, QuboleOperator): def get_sql_from_qbol_cmd(params): + """ + Get Qubole sql from Qubole command + """ sql = '' if 'query' in params: sql = params['query'] @@ -218,6 +221,9 @@ def get_sql_from_qbol_cmd(params): def handle_airflow_exception(airflow_exception, hook): + """ + Qubole check handle Airflow exception + """ cmd = hook.cmd if cmd is not None: if cmd.is_success(cmd.status): diff --git a/airflow/providers/samba/hooks/samba.py b/airflow/providers/samba/hooks/samba.py index cd1e17af38..aebc61f27d 100644 --- a/airflow/providers/samba/hooks/samba.py +++ b/airflow/providers/samba/hooks/samba.py @@ -42,6 +42,9 @@ class SambaHook(BaseHook): return samba def push_from_local(self, destination_filepath, local_filepath): + """ + Push local file to samba server + """ samba = self.get_conn() if samba.exists(destination_filepath): if samba.isfile(destination_filepath): diff --git a/airflow/providers/segment/hooks/segment.py b/airflow/providers/segment/hooks/segment.py index 9d17f2085b..940621d5a5 100644 --- a/airflow/providers/segment/hooks/segment.py +++ b/airflow/providers/segment/hooks/segment.py @@ -31,6 +31,27 @@ from airflow.hooks.base_hook import BaseHook class SegmentHook(BaseHook): + """ + Create new connection to Segment + and allows you to pull data out of Segment or write to it. + + You can then use that file with other + Airflow operators to move the data around or interact with segment. + + :param segment_conn_id: the name of the connection that has the parameters + we need to connect to Segment. The connection should be type `json` and include a + write_key security token in the `Extras` field. + :type segment_conn_id: str + :param segment_debug_mode: Determines whether Segment should run in debug mode. + Defaults to False + :type segment_debug_mode: bool + + .. note:: + You must include a JSON structure in the `Extras` field. + We need a user's security token to connect to Segment. + So we define it in the `Extras` field as: + `{"write_key":"YOUR_SECURITY_TOKEN"}` + """ def __init__( self, segment_conn_id='segment_default', @@ -38,27 +59,6 @@ class SegmentHook(BaseHook): *args, **kwargs ): - """ - Create new connection to Segment - and allows you to pull data out of Segment or write to it. - - You can then use that file with other - Airflow operators to move the data around or interact with segment. - - :param segment_conn_id: the name of the connection that has the parameters - we need to connect to Segment. - The connection should be type `json` and include a - write_key security token in the `Extras` field. - :type segment_conn_id: str - :param segment_debug_mode: Determines whether Segment should run in debug mode. - Defaults to False - :type segment_debug_mode: bool - .. note:: - You must include a JSON structure in the `Extras` field. - We need a user's security token to connect to Segment. - So we define it in the `Extras` field as: - `{"write_key":"YOUR_SECURITY_TOKEN"}` - """ super().__init__() self.segment_conn_id = segment_conn_id self.segment_debug_mode = segment_debug_mode @@ -85,7 +85,6 @@ class SegmentHook(BaseHook): """ Handles error callbacks when using Segment with segment_debug_mode set to True """ - self.log.error('Encountered Segment error: {segment_error} with ' - 'items: {with_items}'.format(segment_error=error, - with_items=items)) + self.log.error('Encountered Segment error: %s with ' + 'items: %s', error, items) raise AirflowException('Segment error: {}'.format(error)) diff --git a/airflow/providers/segment/operators/segment_track_event.py b/airflow/providers/segment/operators/segment_track_event.py index 5d83881582..4d3db18748 100644 --- a/airflow/providers/segment/operators/segment_track_event.py +++ b/airflow/providers/segment/operators/segment_track_event.py @@ -65,6 +65,7 @@ class SegmentTrackEventOperator(BaseOperator): 'Sending track event (%s) for user id: %s with properties: %s', self.event, self.user_id, self.properties) + # pylint: disable=no-member hook.track( user_id=self.user_id, event=self.event, diff --git a/airflow/providers/slack/hooks/slack_webhook.py b/airflow/providers/slack/hooks/slack_webhook.py index e0ff1b13ff..de50d1f3cc 100644 --- a/airflow/providers/slack/hooks/slack_webhook.py +++ b/airflow/providers/slack/hooks/slack_webhook.py @@ -59,6 +59,7 @@ class SlackWebhookHook(HttpHook): :type proxy: str """ + # pylint: disable=too-many-arguments def __init__(self, http_conn_id=None, webhook_token=None, diff --git a/airflow/providers/slack/operators/slack_webhook.py b/airflow/providers/slack/operators/slack_webhook.py index 80ff9a8446..5e457a55f3 100644 --- a/airflow/providers/slack/operators/slack_webhook.py +++ b/airflow/providers/slack/operators/slack_webhook.py @@ -61,6 +61,7 @@ class SlackWebhookOperator(SimpleHttpOperator): template_fields = ['webhook_token', 'message', 'attachments', 'blocks', 'channel', 'username', 'proxy', ] + # pylint: disable=too-many-arguments @apply_defaults def __init__(self, http_conn_id=None, diff --git a/airflow/providers/sqlite/hooks/sqlite.py b/airflow/providers/sqlite/hooks/sqlite.py index 2af589c524..eef373428a 100644 --- a/airflow/providers/sqlite/hooks/sqlite.py +++ b/airflow/providers/sqlite/hooks/sqlite.py @@ -35,6 +35,6 @@ class SqliteHook(DbApiHook): """ Returns a sqlite connection object """ - conn = self.get_connection(self.sqlite_conn_id) + conn = self.get_connection(self.sqlite_conn_id) # pylint: disable=no-member conn = sqlite3.connect(conn.host) return conn diff --git a/airflow/providers/ssh/operators/ssh.py b/airflow/providers/ssh/operators/ssh.py index c3494698e1..532a4a246f 100644 --- a/airflow/providers/ssh/operators/ssh.py +++ b/airflow/providers/ssh/operators/ssh.py @@ -83,7 +83,7 @@ class SSHOperator(BaseOperator): if self.ssh_hook and isinstance(self.ssh_hook, SSHHook): self.log.info("ssh_conn_id is ignored when ssh_hook is provided.") else: - self.log.info("ssh_hook is not provided or invalid. " + + self.log.info("ssh_hook is not provided or invalid. " "Trying ssh_conn_id to create SSHHook.") self.ssh_hook = SSHHook(ssh_conn_id=self.ssh_conn_id, timeout=self.timeout) @@ -92,8 +92,8 @@ class SSHOperator(BaseOperator): raise AirflowException("Cannot operate without ssh_hook or ssh_conn_id.") if self.remote_host is not None: - self.log.info("remote_host is provided explicitly. " + - "It will replace the remote_host which was defined " + + self.log.info("remote_host is provided explicitly. " + "It will replace the remote_host which was defined " "in ssh_hook or predefined in connection of ssh_conn_id.") self.ssh_hook.remote_host = self.remote_host @@ -130,15 +130,13 @@ class SSHOperator(BaseOperator): channel.recv_ready() or \ channel.recv_stderr_ready(): readq, _, _ = select([channel], [], [], self.timeout) - for c in readq: - if c.recv_ready(): - line = stdout.channel.recv(len(c.in_buffer)) - line = line + for recv in readq: + if recv.recv_ready(): + line = stdout.channel.recv(len(recv.in_buffer)) agg_stdout += line self.log.info(line.decode('utf-8').strip('\n')) - if c.recv_stderr_ready(): - line = stderr.channel.recv_stderr(len(c.in_stderr_buffer)) - line = line + if recv.recv_stderr_ready(): + line = stderr.channel.recv_stderr(len(recv.in_stderr_buffer)) agg_stderr += line self.log.warning(line.decode('utf-8').strip('\n')) if stdout.channel.exit_status_ready()\ @@ -172,5 +170,8 @@ class SSHOperator(BaseOperator): return True def tunnel(self): + """ + Get ssh tunnel + """ ssh_client = self.ssh_hook.get_conn() ssh_client.get_transport() diff --git a/airflow/providers/vertica/hooks/vertica.py b/airflow/providers/vertica/hooks/vertica.py index 0f9f3aefbd..91aa682dd8 100644 --- a/airflow/providers/vertica/hooks/vertica.py +++ b/airflow/providers/vertica/hooks/vertica.py @@ -35,7 +35,7 @@ class VerticaHook(DbApiHook): """ Returns verticaql connection object """ - conn = self.get_connection(self.vertica_conn_id) + conn = self.get_connection(self.vertica_conn_id) # pylint: disable=no-member conn_config = { "user": conn.login, "password": conn.password or '', diff --git a/airflow/providers/zendesk/hooks/zendesk.py b/airflow/providers/zendesk/hooks/zendesk.py index fb365e75d7..1134bfa5d8 100644 --- a/airflow/providers/zendesk/hooks/zendesk.py +++ b/airflow/providers/zendesk/hooks/zendesk.py @@ -83,6 +83,7 @@ class ZendeskHook(BaseHook): keys += query['include'].split(',') results = {key: results[key] for key in keys} + # pylint: disable=too-many-nested-blocks if get_all_pages: while next_page is not None: try: @@ -100,15 +101,13 @@ class ZendeskHook(BaseHook): # next just refers to the current set of results. # Hence, need to deal with this special case break - else: - next_page = more_res['next_page'] + next_page = more_res['next_page'] except RateLimitError as rle: self.__handle_rate_limit_exception(rle) - except ZendeskError as ze: - if b"Use a start_time older than 5 minutes" in ze.msg: + except ZendeskError as zde: + if b"Use a start_time older than 5 minutes" in zde.msg: # We have pretty up to date data break - else: - raise ze + raise zde return results diff --git a/scripts/ci/pylint_todo.txt b/scripts/ci/pylint_todo.txt index 5e83971c23..abdccea644 100644 --- a/scripts/ci/pylint_todo.txt +++ b/scripts/ci/pylint_todo.txt @@ -24,86 +24,6 @@ ./airflow/models/taskinstance.py ./airflow/models/variable.py ./airflow/models/xcom.py -./airflow/providers/apache/druid/hooks/druid.py -./airflow/providers/apache/druid/operators/druid.py -./airflow/providers/apache/druid/operators/druid_check.py -./airflow/providers/apache/hive/hooks/hive.py -./airflow/providers/apache/hive/operators/hive.py -./airflow/providers/apache/hive/operators/hive_stats.py -./airflow/providers/apache/pig/hooks/pig.py -./airflow/providers/apache/pig/operators/pig.py -./airflow/providers/apache/pinot/hooks/pinot.py -./airflow/providers/apache/spark/hooks/spark_jdbc.py -./airflow/providers/apache/spark/hooks/spark_jdbc_script.py -./airflow/providers/apache/spark/hooks/spark_sql.py -./airflow/providers/apache/spark/hooks/spark_submit.py -./airflow/providers/apache/spark/operators/spark_jdbc.py -./airflow/providers/apache/spark/operators/spark_sql.py -./airflow/providers/apache/spark/operators/spark_submit.py -./airflow/providers/apache/sqoop/hooks/sqoop.py -./airflow/providers/apache/sqoop/operators/sqoop.py -./airflow/providers/datadog/hooks/datadog.py -./airflow/providers/ddiscord/hooks/discord_webhook.py -./airflow/providers/dindding/operators/dingding.py -./airflow/providers/dingding/hooks/dingding.py -./airflow/providers/discord/operators/discord_webhook.py -./airflow/providers/docker/hooks/docker.py -./airflow/providers/email/operators/email.py -./airflow/providers/ftp/hooks/ftp.py -./airflow/providers/google/cloud/operators/s3_to_gcs.py -./airflow/providers/grpc/operators/grpc.py -./airflow/providers/http/hooks/http.py -./airflow/providers/http/operators/http.py -./airflow/providers/jdbc/hooks/jdbc.py -./airflow/providers/jdbc/operators/jdbc.py -./airflow/providers/jenkins/hooks/jenkins.py -./airflow/providers/jenkins/operators/jenkins_job_trigger.py -./airflow/providers/jira/operators/jira.py -./airflow/providers/microsoft/azure/hooks/azure_container_instance.py -./airflow/providers/microsoft/azure/hooks/azure_container_volume.py -./airflow/providers/microsoft/azure/hooks/azure_cosmos.py -./airflow/providers/microsoft/azure/hooks/azure_data_lake.py -./airflow/providers/microsoft/azure/hooks/azure_fileshare.py -./airflow/providers/microsoft/azure/hooks/wasb.py -./airflow/providers/microsoft/azure/operators/adls_list.py -./airflow/providers/microsoft/azure/operators/azure_container_instances.py -./airflow/providers/microsoft/azure/operators/azure_cosmos.py -./airflow/providers/microsoft/azure/operators/file_to_wasb.py -./airflow/providers/microsoft/azure/operators/oracle_to_azure_data_lake_transfer.py -./airflow/providers/microsoft/azure/operators/wasb_delete_blob.py -./airflow/providers/microsoft/mssql/hooks/mssql.py -./airflow/providers/microsoft/mssql/operators/mssql.py -./airflow/providers/microsoft/winrm/operators/winrm.py -./airflow/providers/mssql/operators/mysql.py -./airflow/providers/mysql/hooks/mysql.py -./airflow/providers/mysql/operators/presto_to_mysql.py -./airflow/providers/mysql/operators/vertica_to_mysql.py -./airflow/providers/openfaas/hooks/openfaas.py -./airflow/providers/opsgenie/hooks/opsgenie_alert.py -./airflow/providers/opsgenie/operators/opsgenie_alert.py -./airflow/providers/oracle/hooks/oracle.py -./airflow/providers/oracle/operators/oracle.py -./airflow/providers/oracle/operators/oracle_to_oracle_transfer.py -./airflow/providers/papermill/operators/papermill.py -./airflow/providers/postgres/hooks/postgres.py -./airflow/providers/postgres/operators/postgres.py -./airflow/providers/presto/hooks/presto.py -./airflow/providers/presto/operators/presto_check.py -./airflow/providers/qubole/hooks/qubole_check.py -./airflow/providers/qubole/operators/qubole_check.py -./airflow/providers/redis/operators/redis_publish.py -./airflow/providers/samba/hooks/samba.py -./airflow/providers/segment/hooks/segment.py -./airflow/providers/segment/operators/segment_track_event.py -./airflow/providers/slack/hooks/slack_webhook.py -./airflow/providers/slack/operators/slack.py -./airflow/providers/slack/operators/slack_webhook.py -./airflow/providers/sqlite/hooks/sqlite.py -./airflow/providers/sqlite/operators/sqlite.py -./airflow/providers/ssh/operators/ssh.py -./airflow/providers/vertica/hooks/vertica.py -./airflow/providers/vertica/operators/vertica.py -./airflow/providers/zendesk/hooks/zendesk.py ./airflow/settings.py ./airflow/stats.py ./airflow/www/api/experimental/endpoints.py diff --git a/tests/providers/apache/pig/hooks/test_pig.py b/tests/providers/apache/pig/hooks/test_pig.py index b6f70651ed..bab66cace6 100644 --- a/tests/providers/apache/pig/hooks/test_pig.py +++ b/tests/providers/apache/pig/hooks/test_pig.py @@ -105,7 +105,7 @@ class TestPigCliHook(unittest.TestCase): def test_kill_no_sp(self): sp_mock = mock.Mock() hook = self.pig_hook() - hook.sp = sp_mock # pylint: disable=attribute-defined-outside-init + hook.sub_process = sp_mock hook.kill() self.assertFalse(sp_mock.kill.called) @@ -115,7 +115,7 @@ class TestPigCliHook(unittest.TestCase): sp_mock.poll.return_value = 0 hook = self.pig_hook() - hook.sp = sp_mock # pylint: disable=attribute-defined-outside-init + hook.sub_process = sp_mock hook.kill() self.assertFalse(sp_mock.kill.called) @@ -125,7 +125,7 @@ class TestPigCliHook(unittest.TestCase): sp_mock.poll.return_value = None hook = self.pig_hook() - hook.sp = sp_mock # pylint: disable=attribute-defined-outside-init + hook.sub_process = sp_mock hook.kill() self.assertTrue(sp_mock.kill.called) diff --git a/tests/providers/apache/pinot/hooks/test_pinot.py b/tests/providers/apache/pinot/hooks/test_pinot.py index 474704bfb9..445d71cdcd 100644 --- a/tests/providers/apache/pinot/hooks/test_pinot.py +++ b/tests/providers/apache/pinot/hooks/test_pinot.py @@ -65,7 +65,7 @@ class TestPinotAdminHook(unittest.TestCase): params = { "generator_config_file": "a", "data_dir": "b", - "format": "c", + "segment_format": "c", "out_dir": "d", "overwrite": True, "table_name": "e", @@ -89,7 +89,7 @@ class TestPinotAdminHook(unittest.TestCase): 'CreateSegment', '-generatorConfigFile', params["generator_config_file"], '-dataDir', params["data_dir"], - '-format', params["format"], + '-format', params["segment_format"], '-outDir', params["out_dir"], '-overwrite', params["overwrite"], '-tableName', params["table_name"], diff --git a/tests/providers/opsgenie/operators/test_opsgenie_alert.py b/tests/providers/opsgenie/operators/test_opsgenie_alert.py index d4a2fd0445..64469d4932 100644 --- a/tests/providers/opsgenie/operators/test_opsgenie_alert.py +++ b/tests/providers/opsgenie/operators/test_opsgenie_alert.py @@ -41,7 +41,7 @@ class TestOpsgenieAlertOperator(unittest.TestCase): {'id': '80564037-1984-4f38-b98e-8a1f662df552', 'type': 'schedule'}, {'name': 'First Responders Schedule', 'type': 'schedule'} ], - 'visibleTo': [ + 'visible_to': [ {'id': '4513b7ea-3b91-438f-b7e4-e3e54af9147c', 'type': 'team'}, {'name': 'rocket_team', 'type': 'team'}, {'id': 'bb4d9938-c3c2-455d-aaab-727aa701c0d8', 'type': 'user'}, @@ -62,7 +62,7 @@ class TestOpsgenieAlertOperator(unittest.TestCase): 'alias': _config['alias'], 'description': _config['description'], 'responders': _config['responders'], - 'visibleTo': _config['visibleTo'], + 'visible_to': _config['visible_to'], 'actions': _config['actions'], 'tags': _config['tags'], 'details': _config['details'], @@ -106,7 +106,7 @@ class TestOpsgenieAlertOperator(unittest.TestCase): self.assertEqual(self._config['alias'], operator.alias) self.assertEqual(self._config['description'], operator.description) self.assertEqual(self._config['responders'], operator.responders) - self.assertEqual(self._config['visibleTo'], operator.visibleTo) + self.assertEqual(self._config['visible_to'], operator.visible_to) self.assertEqual(self._config['actions'], operator.actions) self.assertEqual(self._config['tags'], operator.tags) self.assertEqual(self._config['details'], operator.details) diff --git a/tests/providers/postgres/hooks/test_postgres.py b/tests/providers/postgres/hooks/test_postgres.py index ed5d8560b0..0bedd972aa 100644 --- a/tests/providers/postgres/hooks/test_postgres.py +++ b/tests/providers/postgres/hooks/test_postgres.py @@ -153,7 +153,7 @@ class TestPostgresHook(unittest.TestCase): self.cur.fetchall.return_value = None - self.assertEqual(None, self.db_hook.copy_expert(statement, filename, open=open_mock)) + self.assertEqual(None, self.db_hook.copy_expert(statement, filename)) assert self.conn.close.call_count == 1 assert self.cur.close.call_count == 1