diff --git a/celery/app/amqp.py b/celery/app/amqp.py index 687e0eb80..af053b932 100644 --- a/celery/app/amqp.py +++ b/celery/app/amqp.py @@ -24,7 +24,7 @@ from kombu.utils.functional import maybe_list from celery import signals from celery.five import items, string_t from celery.local import try_import -from celery.utils import anon_nodename +from celery.utils.nodenames import anon_nodename from celery.utils.saferepr import saferepr from celery.utils.text import indent as textindent from celery.utils.timeutils import maybe_make_aware, to_utc diff --git a/celery/app/log.py b/celery/app/log.py index 3c738c578..78ff17f01 100644 --- a/celery/app/log.py +++ b/celery/app/log.py @@ -23,12 +23,13 @@ from kombu.utils.encoding import set_default_encoding_file from celery import signals from celery._state import get_current_task from celery.five import class_property, string_t -from celery.utils import isatty, node_format +from celery.utils import isatty from celery.utils.log import ( get_logger, mlevel, ColorFormatter, LoggingProxy, get_multiprocessing_logger, reset_multiprocessing_logger, ) +from celery.utils.nodenames import node_format from celery.utils.term import colored __all__ = ['TaskFormatter', 'Logging'] diff --git a/celery/app/trace.py b/celery/app/trace.py index 54001427d..f8ec64076 100644 --- a/celery/app/trace.py +++ b/celery/app/trace.py @@ -34,8 +34,8 @@ from celery.app import set_default_app from celery.app.task import Task as BaseTask, Context from celery.exceptions import Ignore, Reject, Retry, InvalidTaskError from celery.five import monotonic -from celery.utils import gethostname from celery.utils.log import get_logger +from celery.utils.nodenames import gethostname from celery.utils.objects import mro_lookup from celery.utils.saferepr import saferepr from celery.utils.serialization import ( diff --git a/celery/bin/base.py b/celery/bin/base.py index 5befcc2df..9eb697630 100644 --- a/celery/bin/base.py +++ b/celery/bin/base.py @@ -24,7 +24,7 @@ from celery.five import ( from celery.platforms import EX_FAILURE, EX_OK, EX_USAGE from celery.utils import term from celery.utils import text -from celery.utils import node_format, host_format +from celery.utils.nodenames import node_format, host_format from celery.utils.imports import symbol_by_name, import_from_cwd try: diff --git a/celery/bin/celeryd_detach.py b/celery/bin/celeryd_detach.py index 1f073491c..81ab86c00 100644 --- a/celery/bin/celeryd_detach.py +++ b/celery/bin/celeryd_detach.py @@ -19,8 +19,8 @@ import sys from optparse import OptionParser, BadOptionError from celery.platforms import EX_FAILURE, detached -from celery.utils import default_nodename, node_format from celery.utils.log import get_logger +from celery.utils.nodenames import default_nodename, node_format from celery.bin.base import daemon_options diff --git a/celery/bin/multi.py b/celery/bin/multi.py index f0c1c4577..9453a3c1d 100644 --- a/celery/bin/multi.py +++ b/celery/bin/multi.py @@ -114,7 +114,9 @@ from celery import VERSION_BANNER from celery.five import items from celery.platforms import Pidfile, IS_WINDOWS from celery.utils import term -from celery.utils import gethostname, host_format, node_format, nodesplit +from celery.utils.nodenames import ( + gethostname, host_format, node_format, nodesplit, +) from celery.utils.text import pluralize __all__ = ['MultiTool'] diff --git a/celery/bin/worker.py b/celery/bin/worker.py index a7e2b3397..80c92c5d8 100644 --- a/celery/bin/worker.py +++ b/celery/bin/worker.py @@ -173,8 +173,8 @@ from celery.bin.base import Command, daemon_options from celery.bin.celeryd_detach import detached_celeryd from celery.five import string_t from celery.platforms import maybe_drop_privileges -from celery.utils import default_nodename from celery.utils.log import LOG_LEVELS, mlevel +from celery.utils.nodenames import default_nodename __all__ = ['worker', 'main'] diff --git a/celery/contrib/migrate.py b/celery/contrib/migrate.py index 54ba7e2d1..d5ad7b46c 100644 --- a/celery/contrib/migrate.py +++ b/celery/contrib/migrate.py @@ -19,7 +19,7 @@ from kombu.utils.encoding import ensure_bytes from celery.app import app_or_default from celery.five import python_2_unicode_compatible, string, string_t -from celery.utils import worker_direct +from celery.utils.nodenames import worker_direct __all__ = [ 'StopFiltering', 'State', 'republish', 'migrate_task', diff --git a/celery/events/__init__.py b/celery/events/__init__.py index fe23e9c0a..c526a767c 100644 --- a/celery/events/__init__.py +++ b/celery/events/__init__.py @@ -22,12 +22,12 @@ from operator import itemgetter from kombu import Exchange, Queue, Producer from kombu.connection import maybe_channel from kombu.mixins import ConsumerMixin -from kombu.utils import cached_property +from kombu.utils import cached_property, uuid from celery.app import app_or_default from celery.five import items -from celery.utils import anon_nodename, uuid from celery.utils.functional import dictfilter +from celery.utils.nodenames import anon_nodename from celery.utils.timeutils import adjust_timestamp, utcoffset, maybe_s_to_ms __all__ = ['Events', 'Event', 'EventDispatcher', 'EventReceiver'] diff --git a/celery/tests/bin/test_base.py b/celery/tests/bin/test_base.py index 66c8fd270..39eb30018 100644 --- a/celery/tests/bin/test_base.py +++ b/celery/tests/bin/test_base.py @@ -255,7 +255,7 @@ class test_Command(AppCase): def test_host_format(self): cmd = MockCommand(app=self.app) - with patch('celery.utils.gethostname') as hn: + with patch('celery.utils.nodenames.gethostname') as hn: hn.return_value = 'blacktron.example.com' self.assertEqual(cmd.host_format(''), '') self.assertEqual( diff --git a/celery/tests/utils/test_nodenames.py b/celery/tests/utils/test_nodenames.py new file mode 100644 index 000000000..ce6ab38a7 --- /dev/null +++ b/celery/tests/utils/test_nodenames.py @@ -0,0 +1,18 @@ +from __future__ import absolute_import, unicode_literals + +from kombu import Queue + +from celery.utils import ( + worker_direct, +) + +from celery.tests.case import Case, Mock, patch + + +class test_worker_direct(Case): + + def test_returns_if_queue(self): + q = Queue('foo') + self.assertIs(worker_direct(q), q) + + diff --git a/celery/tests/worker/test_worker.py b/celery/tests/worker/test_worker.py index e1a6e8341..1524bc427 100644 --- a/celery/tests/worker/test_worker.py +++ b/celery/tests/worker/test_worker.py @@ -31,7 +31,7 @@ from celery.worker import state from celery.worker.consumer import Consumer from celery.worker.pidbox import gPidbox from celery.worker.request import Request -from celery.utils import worker_direct +from celery.utils.nodenames import worker_direct from celery.utils.serialization import pickle from celery.utils.timer2 import Timer diff --git a/celery/utils/__init__.py b/celery/utils/__init__.py index 759069e0b..6c691847d 100644 --- a/celery/utils/__init__.py +++ b/celery/utils/__init__.py @@ -10,7 +10,6 @@ from __future__ import absolute_import, print_function, unicode_literals import numbers import os -import socket import sys import traceback import datetime @@ -18,12 +17,11 @@ import datetime from functools import partial from pprint import pprint -from kombu.entity import Exchange, Queue - from celery.five import WhateverIO, items, reraise, string_t -from .functional import memoize -from .text import simple_format +from .functional import memoize # noqa + +from .nodenames import worker_direct, nodename, nodesplit __all__ = ['worker_direct', 'lpmerge', 'is_iterable', 'isatty', 'cry', 'maybe_reraise', 'strtobool', @@ -38,37 +36,6 @@ PY3 = sys.version_info[0] == 3 #: task to be that of ``App.main``. MP_MAIN_FILE = os.environ.get('MP_MAIN_FILE') -#: Exchange for worker direct queues. -WORKER_DIRECT_EXCHANGE = Exchange('C.dq2') - -#: Format for worker direct queue names. -WORKER_DIRECT_QUEUE_FORMAT = '{hostname}.dq2' - -#: Separator for worker node name and hostname. -NODENAME_SEP = '@' - -NODENAME_DEFAULT = 'celery' - -gethostname = memoize(1, Cache=dict)(socket.gethostname) - - -def worker_direct(hostname): - """Return :class:`kombu.Queue` that is a direct route to - a worker by hostname. - - :param hostname: The fully qualified node name of a worker - (e.g. ``w1@example.com``). If passed a - :class:`kombu.Queue` instance it will simply return - that instead. - """ - if isinstance(hostname, Queue): - return hostname - return Queue( - WORKER_DIRECT_QUEUE_FORMAT.format(hostname=hostname), - WORKER_DIRECT_EXCHANGE, - hostname, - ) - def lpmerge(L, R): """In place left precedent dictionary merge. @@ -217,53 +184,6 @@ def gen_task_name(app, name, module_name): return '.'.join(p for p in (module_name, name) if p) -def nodename(name, hostname): - """Create node name from name/hostname pair.""" - return NODENAME_SEP.join((name, hostname)) - - -def anon_nodename(hostname=None, prefix='gen'): - return nodename(''.join([prefix, str(os.getpid())]), - hostname or gethostname()) - - -def nodesplit(nodename): - """Split node name into tuple of name/hostname.""" - parts = nodename.split(NODENAME_SEP, 1) - if len(parts) == 1: - return None, parts[0] - return parts - - -def default_nodename(hostname): - name, host = nodesplit(hostname or '') - return nodename(name or NODENAME_DEFAULT, host or gethostname()) - - -def node_format(s, nodename, **extra): - name, host = nodesplit(nodename) - return host_format( - s, host, name or NODENAME_DEFAULT, p=nodename, **extra) - - -def _fmt_process_index(prefix='', default='0'): - from .log import current_process_index - index = current_process_index() - return '{0}{1}'.format(prefix, index) if index else default -_fmt_process_index_with_prefix = partial(_fmt_process_index, '-', '') - - -def host_format(s, host=None, name=None, **extra): - host = host or gethostname() - hname, _, domain = host.partition('.') - name = name or hname - keys = dict({ - 'h': host, 'n': name, 'd': domain, - 'i': _fmt_process_index, 'I': _fmt_process_index_with_prefix, - }, **extra) - return simple_format(s, keys) - - # ------------------------------------------------------------------------ # # > XXX Compat from .log import LOG_LEVELS # noqa diff --git a/celery/utils/nodenames.py b/celery/utils/nodenames.py new file mode 100644 index 000000000..371f2c006 --- /dev/null +++ b/celery/utils/nodenames.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +""" + celery.utils.nodenames + ~~~~~~~~~~~~~~~~~~~~~~ + + Worker name utilities. + +""" +from __future__ import absolute_import, unicode_literals + +import os +import socket + +from functools import partial + +from kombu.entity import Exchange, Queue + +from .functional import memoize +from .text import simple_format + +#: Exchange for worker direct queues. +WORKER_DIRECT_EXCHANGE = Exchange('C.dq2') + +#: Format for worker direct queue names. +WORKER_DIRECT_QUEUE_FORMAT = '{hostname}.dq2' + +#: Separator for worker node name and hostname. +NODENAME_SEP = '@' + +NODENAME_DEFAULT = 'celery' + +gethostname = memoize(1, Cache=dict)(socket.gethostname) + +__all__ = [ + 'worker_direct', 'gethostname', 'nodename', + 'anon_nodename', 'nodesplit', 'default_nodename', + 'node_format', 'host_format', +] + + +def worker_direct(hostname): + """Return :class:`kombu.Queue` that is a direct route to + a worker by hostname. + + :param hostname: The fully qualified node name of a worker + (e.g. ``w1@example.com``). If passed a + :class:`kombu.Queue` instance it will simply return + that instead. + """ + if isinstance(hostname, Queue): + return hostname + return Queue( + WORKER_DIRECT_QUEUE_FORMAT.format(hostname=hostname), + WORKER_DIRECT_EXCHANGE, + hostname, + ) + + +def nodename(name, hostname): + """Create node name from name/hostname pair.""" + return NODENAME_SEP.join((name, hostname)) + + +def anon_nodename(hostname=None, prefix='gen'): + return nodename(''.join([prefix, str(os.getpid())]), + hostname or gethostname()) + + +def nodesplit(nodename): + """Split node name into tuple of name/hostname.""" + parts = nodename.split(NODENAME_SEP, 1) + if len(parts) == 1: + return None, parts[0] + return parts + + +def default_nodename(hostname): + name, host = nodesplit(hostname or '') + return nodename(name or NODENAME_DEFAULT, host or gethostname()) + + +def node_format(s, nodename, **extra): + name, host = nodesplit(nodename) + return host_format( + s, host, name or NODENAME_DEFAULT, p=nodename, **extra) + + +def _fmt_process_index(prefix='', default='0'): + from .log import current_process_index + index = current_process_index() + return '{0}{1}'.format(prefix, index) if index else default +_fmt_process_index_with_prefix = partial(_fmt_process_index, '-', '') + + +def host_format(s, host=None, name=None, **extra): + host = host or gethostname() + hname, _, domain = host.partition('.') + name = name or hname + keys = dict({ + 'h': host, 'n': name, 'd': domain, + 'i': _fmt_process_index, 'I': _fmt_process_index_with_prefix, + }, **extra) + return simple_format(s, keys) diff --git a/celery/worker/__init__.py b/celery/worker/__init__.py index be296a02b..3881397f0 100644 --- a/celery/worker/__init__.py +++ b/celery/worker/__init__.py @@ -32,9 +32,9 @@ from celery.exceptions import ( ) from celery.five import python_2_unicode_compatible, values from celery.platforms import EX_FAILURE, create_pidlock -from celery.utils import default_nodename, worker_direct from celery.utils.imports import reload_from_cwd from celery.utils.log import mlevel, worker_logger as logger +from celery.utils.nodenames import default_nodename, worker_direct from celery.utils.text import str_to_list from celery.utils.threads import default_socket_timeout diff --git a/celery/worker/consumer/consumer.py b/celery/worker/consumer/consumer.py index 9cbac35d4..a51abf2dd 100644 --- a/celery/worker/consumer/consumer.py +++ b/celery/worker/consumer/consumer.py @@ -30,9 +30,9 @@ from celery import signals from celery.app.trace import build_tracer from celery.exceptions import InvalidTaskError, NotRegistered from celery.five import buffer_t, items, python_2_unicode_compatible, values -from celery.utils import gethostname from celery.utils.functional import noop from celery.utils.log import get_logger +from celery.utils.nodenames import gethostname from celery.utils.objects import Bunch from celery.utils.text import truncate from celery.utils.timeutils import humanize_seconds, rate diff --git a/celery/worker/request.py b/celery/worker/request.py index 5d8c668c9..c3821722b 100644 --- a/celery/worker/request.py +++ b/celery/worker/request.py @@ -16,6 +16,7 @@ from datetime import datetime from weakref import ref from billiard.common import TERM_SIGNAME +from kombu.utils import cached_property from kombu.utils.encoding import safe_repr, safe_str from celery import signals @@ -27,9 +28,9 @@ from celery.exceptions import ( ) from celery.five import python_2_unicode_compatible, string from celery.platforms import signals as _signals -from celery.utils import cached_property, gethostname from celery.utils.functional import noop from celery.utils.log import get_logger +from celery.utils.nodenames import gethostname from celery.utils.timeutils import maybe_iso8601, timezone, maybe_make_aware from celery.utils.serialization import get_pickled_exception diff --git a/docs/internals/reference/celery.utils.nodenames.rst b/docs/internals/reference/celery.utils.nodenames.rst new file mode 100644 index 000000000..d0affbbe2 --- /dev/null +++ b/docs/internals/reference/celery.utils.nodenames.rst @@ -0,0 +1,11 @@ +========================================== + ``celery.utils.nodenames`` +========================================== + +.. contents:: + :local: +.. currentmodule:: celery.utils.nodenames + +.. automodule:: celery.utils.nodenames + :members: + :undoc-members: diff --git a/docs/internals/reference/index.rst b/docs/internals/reference/index.rst index 45ea70713..266176c4a 100644 --- a/docs/internals/reference/index.rst +++ b/docs/internals/reference/index.rst @@ -51,6 +51,7 @@ celery.utils celery.utils.abstract celery.utils.collections + celery.utils.nodenames celery.utils.deprecated celery.utils.functional celery.utils.graph