[AIRFLOW-871] change logging.warn() into warning()

This silences deprecation warnings, e.g.

airflow/airflow/utils/dag_processing.py:578: DeprecationWarning: The 'warn' method is deprecated, use 'warning' instead

Closes #2082 from imbaczek/bug871
Marek Baczynski 2017-02-18 11:12:14 -05:00, committed by Jeremiah Lowin
Parent baa4cd6806
Commit 21d775a9a4
9 changed files with 75 additions and 74 deletions
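
For context, not part of the commit: on Python 3, Logger.warn is a deprecated
alias of Logger.warning, and every call emits the DeprecationWarning quoted
above. A minimal sketch to reproduce it, assuming only the standard library
(deprecation warnings are hidden by default, so the filter surfaces them):

import logging
import warnings

# DeprecationWarning is suppressed by default; surface it for this demo.
warnings.simplefilter("always", DeprecationWarning)
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("demo")

logger.warn("old spelling")     # DeprecationWarning: The 'warn' method is
                                # deprecated, use 'warning' instead
logger.warning("new spelling")  # preferred spelling; no warning emitted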

View file

@@ -65,7 +65,7 @@ def init_app(app):
         logging.info("Kerberos init: {} {}".format(service, hostname))
         principal = kerberos.getServerPrincipalDetails(service, hostname)
     except kerberos.KrbError as err:
-        logging.warn("Kerberos: {}".format(err))
+        logging.warning("Kerberos: {}".format(err))
     else:
         logging.info("Kerberos API: server is {}".format(principal))

View file

@@ -72,7 +72,7 @@ def get_ldap_connection(dn=None, password=None):
 def group_contains_user(conn, search_base, group_filter, user_name_attr, username):
     search_filter = '(&({0}))'.format(group_filter)
     if not conn.search(search_base, search_filter, attributes=[user_name_attr]):
-        LOG.warn("Unable to find group for %s %s", search_base, search_filter)
+        LOG.warning("Unable to find group for %s %s", search_base, search_filter)
     else:
         for resp in conn.response:
             if (
@@ -93,7 +93,7 @@ def groups_user(conn, search_base, user_filter, user_name_att, username):
         raise AuthenticationError("Invalid username or password")
     if conn.response and "memberOf" not in conn.response[0]["attributes"]:
-        LOG.warn("""Missing attribute "memberOf" when looked-up in Ldap database.
+        LOG.warning("""Missing attribute "memberOf" when looked-up in Ldap database.
         The user does not seem to be a member of a group and therefore won't see any dag
         if the option filter_by_owner=True and owner_mode=ldapgroup are set""")
         return []

View file

@@ -167,7 +167,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler):
         except KeyError:
             # The map may not contain an item if the framework re-registered after a failover.
             # Discard these tasks.
-            logging.warn("Unrecognised task key %s" % update.task_id.value)
+            logging.warning("Unrecognised task key %s" % update.task_id.value)
             return
 
         if update.state == mesos_pb2.TASK_FINISHED:

View file

@@ -95,8 +95,8 @@ class CgroupTaskRunner(BaseTaskRunner):
         for path_element in path_split:
             name_to_node = {x.name: x for x in node.children}
             if path_element not in name_to_node:
-                self.logger.warn("Cgroup does not exist: {}"
-                                 .format(path))
+                self.logger.warning("Cgroup does not exist: {}"
+                                    .format(path))
                 return
             else:
                 node = name_to_node[path_element]
@@ -164,11 +164,11 @@ class CgroupTaskRunner(BaseTaskRunner):
         # I wasn't able to track down the root cause of the package install failures, but
         # we might want to revisit that approach at some other point.
         if return_code == 137:
-            self.logger.warn("Task failed with return code of 137. This may indicate "
-                             "that it was killed due to excessive memory usage. "
-                             "Please consider optimizing your task or using the "
-                             "resources argument to reserve more memory for your "
-                             "task")
+            self.logger.warning("Task failed with return code of 137. This may indicate "
+                                "that it was killed due to excessive memory usage. "
+                                "Please consider optimizing your task or using the "
+                                "resources argument to reserve more memory for your "
+                                "task")
         return return_code
 
     def terminate(self):

View file

@@ -369,7 +369,7 @@ class DagFileProcessor(AbstractDagFileProcessor):
         # Arbitrarily wait 5s for the process to die
         self._process.join(5)
         if sigkill and self._process.is_alive():
-            logging.warn("Killing PID %s", self._process.pid)
+            logging.warning("Killing PID %s", self._process.pid)
             os.kill(self._process.pid, signal.SIGKILL)
 
     @property
@@ -541,7 +541,7 @@ class SchedulerJob(BaseJob):
         """
         if not any([ti.sla for ti in dag.tasks]):
             self.logger.info("Skipping SLA check for {} because "
-                "no tasks in DAG have SLAs".format(dag))
+                             "no tasks in DAG have SLAs".format(dag))
             return
 
         TI = models.TaskInstance
@ -923,25 +923,25 @@ class SchedulerJob(BaseJob):
)
if len(dag_runs) == 0:
self.logger.warn("DagRun for %s %s does not exist",
task_instance.dag_id,
task_instance.execution_date)
self.logger.warning("DagRun for %s %s does not exist",
task_instance.dag_id,
task_instance.execution_date)
continue
# There should only be one DAG run. Add some logging info if this
# is not the case for later debugging.
if len(dag_runs) > 1:
self.logger.warn("Multiple DagRuns found for {} {}: {}"
.format(task_instance.dag_id,
task_instance.execution_date,
dag_runs))
self.logger.warning("Multiple DagRuns found for {} {}: {}"
.format(task_instance.dag_id,
task_instance.execution_date,
dag_runs))
if not any(dag_run.state == State.RUNNING for dag_run in dag_runs):
self.logger.warn("Setting {} to state={} as it does not have "
"a DagRun in the {} state"
.format(task_instance,
new_state,
State.RUNNING))
self.logger.warning("Setting {} to state={} as it does not have "
"a DagRun in the {} state"
.format(task_instance,
new_state,
State.RUNNING))
task_instance.state = new_state
session.merge(task_instance)
session.commit()
@@ -1534,7 +1534,7 @@ class SchedulerJob(BaseJob):
                                .format(dagbag.dags.keys(),
                                        file_path))
         else:
-            self.logger.warn("No viable dags retrieved from {}".format(file_path))
+            self.logger.warning("No viable dags retrieved from {}".format(file_path))
             self.update_import_errors(session, dagbag)
             return []
@@ -1836,17 +1836,17 @@ class BackfillJob(BaseJob):
             # If the set of tasks that aren't ready ever equals the set of
             # tasks to run, then the backfill is deadlocked
             if not_ready and not_ready == set(tasks_to_run):
-                self.logger.warn("Deadlock discovered for tasks_to_run={}"
-                                 .format(tasks_to_run.values()))
+                self.logger.warning("Deadlock discovered for tasks_to_run={}"
+                                    .format(tasks_to_run.values()))
                 deadlocked.update(tasks_to_run.values())
                 tasks_to_run.clear()
 
             # Reacting to events
             for key, state in list(executor.get_event_buffer().items()):
                 if key not in tasks_to_run:
-                    self.logger.warn("{} state {} not in tasks_to_run={}"
-                                     .format(key, state,
-                                             tasks_to_run.values()))
+                    self.logger.warning("{} state {} not in tasks_to_run={}"
+                                        .format(key, state,
+                                                tasks_to_run.values()))
                     continue
                 ti = tasks_to_run[key]
                 ti.refresh_from_db()

View file

@@ -1308,7 +1308,7 @@ class TaskInstance(Base):
             # the current worker process was blocked on refresh_from_db
             if self.state == State.RUNNING:
                 msg = "Task Instance already running {}".format(self)
-                logging.warn(msg)
+                logging.warning(msg)
                 session.commit()
                 return

View file

@@ -307,6 +307,7 @@ class DagFileProcessorManager(LoggingMixin):
     :type _last_runtime: dict[unicode, float]
     :type _last_finish_time: dict[unicode, datetime]
     """
+
     def __init__(self,
                  dag_directory,
                  file_paths,
@@ -442,7 +443,7 @@ class DagFileProcessorManager(LoggingMixin):
             if file_path in new_file_paths:
                 filtered_processors[file_path] = processor
             else:
-                self.logger.warn("Stopping processor for {}".format(file_path))
+                self.logger.warning("Stopping processor for {}".format(file_path))
                 processor.stop()
         self._processors = filtered_processors
@@ -477,7 +478,7 @@ class DagFileProcessorManager(LoggingMixin):
         """
         now = datetime.now()
         return os.path.join(self._child_process_log_directory,
-            now.strftime("%Y-%m-%d"))
+                            now.strftime("%Y-%m-%d"))
 
     def _get_log_file_path(self, dag_file_path):
         """
@@ -518,8 +519,8 @@ class DagFileProcessorManager(LoggingMixin):
                 os.symlink(log_directory, latest_log_directory_path)
             elif (os.path.isdir(latest_log_directory_path) or
                   os.path.isfile(latest_log_directory_path)):
-                self.logger.warn("{} already exists as a dir/file. "
-                                 "Skip creating symlink."
+                self.logger.warning("{} already exists as a dir/file. "
+                                    "Skip creating symlink."
                                  .format(latest_log_directory_path))
             else:
                 os.symlink(log_directory, latest_log_directory_path)
@@ -571,11 +572,11 @@ class DagFileProcessorManager(LoggingMixin):
         simple_dags = []
         for file_path, processor in finished_processors.items():
             if processor.result is None:
-                self.logger.warn("Processor for {} exited with return code "
-                                 "{}. See {} for details."
-                                 .format(processor.file_path,
-                                         processor.exit_code,
-                                         processor.log_file))
+                self.logger.warning("Processor for {} exited with return code "
+                                    "{}. See {} for details."
+                                    .format(processor.file_path,
+                                            processor.exit_code,
+                                            processor.log_file))
             else:
                 for simple_dag in processor.result:
                     simple_dags.append(simple_dag)

View file

@@ -206,7 +206,7 @@ def kill_process_tree(logger, pid):
     try:
         root_process = psutil.Process(pid)
     except psutil.NoSuchProcess:
-        logger.warn("PID: {} does not exist".format(pid))
+        logger.warning("PID: {} does not exist".format(pid))
         return
 
     # Check child processes to reduce cases where a child process died but
@@ -215,26 +215,26 @@ def kill_process_tree(logger, pid):
                             if x.is_running()]
 
     if len(descendant_processes) != 0:
-        logger.warn("Terminating descendant processes of {} PID: {}"
-                    .format(root_process.cmdline(),
-                            root_process.pid))
+        logger.warning("Terminating descendant processes of {} PID: {}"
+                       .format(root_process.cmdline(),
+                               root_process.pid))
         temp_processes = descendant_processes[:]
         for descendant in temp_processes:
-            logger.warn("Terminating descendant process {} PID: {}"
-                        .format(descendant.cmdline(), descendant.pid))
+            logger.warning("Terminating descendant process {} PID: {}"
+                           .format(descendant.cmdline(), descendant.pid))
             try:
                 kill_using_shell(descendant.pid, signal.SIGTERM)
             except psutil.NoSuchProcess:
                 descendant_processes.remove(descendant)
 
-        logger.warn("Waiting up to {}s for processes to exit..."
-                    .format(TIME_TO_WAIT_AFTER_SIGTERM))
+        logger.warning("Waiting up to {}s for processes to exit..."
+                       .format(TIME_TO_WAIT_AFTER_SIGTERM))
         try:
             psutil.wait_procs(descendant_processes, TIME_TO_WAIT_AFTER_SIGTERM)
-            logger.warn("Done waiting")
+            logger.warning("Done waiting")
         except psutil.TimeoutExpired:
-            logger.warn("Ran out of time while waiting for "
-                        "processes to exit")
+            logger.warning("Ran out of time while waiting for "
+                           "processes to exit")
     # Then SIGKILL
     descendant_processes = [x for x in root_process.children(recursive=True)
                             if x.is_running()]
@@ -242,16 +242,16 @@ def kill_process_tree(logger, pid):
     if len(descendant_processes) > 0:
         temp_processes = descendant_processes[:]
         for descendant in temp_processes:
-            logger.warn("Killing descendant process {} PID: {}"
-                        .format(descendant.cmdline(), descendant.pid))
+            logger.warning("Killing descendant process {} PID: {}"
+                           .format(descendant.cmdline(), descendant.pid))
             try:
                 kill_using_shell(descendant.pid, signal.SIGTERM)
                 descendant.wait()
             except psutil.NoSuchProcess:
                 descendant_processes.remove(descendant)
-        logger.warn("Killed all descendant processes of {} PID: {}"
-                    .format(root_process.cmdline(),
-                            root_process.pid))
+        logger.warning("Killed all descendant processes of {} PID: {}"
+                       .format(root_process.cmdline(),
+                               root_process.pid))
     else:
         logger.debug("There are no descendant processes to kill")
@@ -279,27 +279,27 @@ def kill_descendant_processes(logger, pids_to_kill=None):
     if len(descendant_processes) == 0:
         logger.debug("There are no descendant processes that can be killed")
         return
-    logger.warn("Terminating descendant processes of {} PID: {}"
-                .format(this_process.cmdline(),
-                        this_process.pid))
+    logger.warning("Terminating descendant processes of {} PID: {}"
+                   .format(this_process.cmdline(),
+                           this_process.pid))
     temp_processes = descendant_processes[:]
     for descendant in temp_processes:
         try:
-            logger.warn("Terminating descendant process {} PID: {}"
-                        .format(descendant.cmdline(), descendant.pid))
+            logger.warning("Terminating descendant process {} PID: {}"
+                           .format(descendant.cmdline(), descendant.pid))
             descendant.terminate()
         except psutil.NoSuchProcess:
             descendant_processes.remove(descendant)
 
-    logger.warn("Waiting up to {}s for processes to exit..."
-                .format(TIME_TO_WAIT_AFTER_SIGTERM))
+    logger.warning("Waiting up to {}s for processes to exit..."
+                   .format(TIME_TO_WAIT_AFTER_SIGTERM))
     try:
         psutil.wait_procs(descendant_processes, TIME_TO_WAIT_AFTER_SIGTERM)
-        logger.warn("Done waiting")
+        logger.warning("Done waiting")
     except psutil.TimeoutExpired:
-        logger.warn("Ran out of time while waiting for "
-                    "processes to exit")
+        logger.warning("Ran out of time while waiting for "
+                       "processes to exit")
     # Then SIGKILL
     descendant_processes = [x for x in this_process.children(recursive=True)
                             if x.is_running()]
@@ -309,16 +309,16 @@ def kill_descendant_processes(logger, pids_to_kill=None):
     if len(descendant_processes) > 0:
         for descendant in descendant_processes:
-            logger.warn("Killing descendant process {} PID: {}"
-                        .format(descendant.cmdline(), descendant.pid))
+            logger.warning("Killing descendant process {} PID: {}"
+                           .format(descendant.cmdline(), descendant.pid))
             try:
                 descendant.kill()
                 descendant.wait()
             except psutil.NoSuchProcess:
                 pass
-    logger.warn("Killed all descendant processes of {} PID: {}"
-                .format(this_process.cmdline(),
-                        this_process.pid))
+    logger.warning("Killed all descendant processes of {} PID: {}"
+                   .format(this_process.cmdline(),
+                           this_process.pid))
 
 class AirflowImporter(object):

View file

@@ -68,10 +68,10 @@ def git_version(version):
         import git
         repo = git.Repo('.git')
     except ImportError:
-        logger.warn('gitpython not found: Cannot compute the git version.')
+        logger.warning('gitpython not found: Cannot compute the git version.')
         return ''
     except Exception as e:
-        logger.warn('Git repo not found: Cannot compute the git version.')
+        logger.warning('Git repo not found: Cannot compute the git version.')
         return ''
     if repo:
         sha = repo.head.commit.hexsha
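
A usage note, not from the commit: any leftover warn() calls can be made to
fail loudly by promoting deprecation warnings to errors, e.g.:

python -W error::DeprecationWarning -c \
    "import logging; logging.getLogger('demo').warn('boom')"
# raises DeprecationWarning: The 'warn' method is deprecated, use 'warning' instead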