diff --git a/README.rst b/README.rst index 35bfee0a0..50bd4cdd6 100644 --- a/README.rst +++ b/README.rst @@ -141,7 +141,7 @@ It supports... - **Concurrency** - - Prefork, Eventlet_, gevent_, threads/single threaded + - Prefork, Eventlet_, gevent_, single threaded (``solo``) - **Result Stores** diff --git a/celery/app/defaults.py b/celery/app/defaults.py index e4e989137..8c964e65b 100644 --- a/celery/app/defaults.py +++ b/celery/app/defaults.py @@ -24,7 +24,7 @@ is_pypy = hasattr(sys, 'pypy_version_info') DEFAULT_POOL = 'prefork' if is_jython: - DEFAULT_POOL = 'threads' + DEFAULT_POOL = 'solo' elif is_pypy: if sys.pypy_version_info[0:3] < (1, 5, 0): DEFAULT_POOL = 'solo' diff --git a/celery/bin/worker.py b/celery/bin/worker.py index 80c92c5d8..e47921a72 100644 --- a/celery/bin/worker.py +++ b/celery/bin/worker.py @@ -18,7 +18,7 @@ The :program:`celery worker` command (previously known as ``celeryd``) Pool implementation: - prefork (default), eventlet, gevent, solo or threads. + prefork (default), eventlet, gevent or solo. .. cmdoption:: -n, --hostname diff --git a/celery/concurrency/__init__.py b/celery/concurrency/__init__.py index 1aa949af3..f7cd8ab66 100644 --- a/celery/concurrency/__init__.py +++ b/celery/concurrency/__init__.py @@ -19,7 +19,6 @@ ALIASES = { 'prefork': 'celery.concurrency.prefork:TaskPool', 'eventlet': 'celery.concurrency.eventlet:TaskPool', 'gevent': 'celery.concurrency.gevent:TaskPool', - 'threads': 'celery.concurrency.threads:TaskPool', 'solo': 'celery.concurrency.solo:TaskPool', 'processes': 'celery.concurrency.prefork:TaskPool', # XXX compat alias } diff --git a/celery/concurrency/threads.py b/celery/concurrency/threads.py deleted file mode 100644 index 4453c3ae5..000000000 --- a/celery/concurrency/threads.py +++ /dev/null @@ -1,60 +0,0 @@ -# -*- coding: utf-8 -*- -""" - celery.concurrency.threads - ~~~~~~~~~~~~~~~~~~~~~~~~~~ - - Pool implementation using threads. - -""" -from __future__ import absolute_import, unicode_literals - -from celery.five import UserDict - -from .base import apply_target, BasePool - -__all__ = ['TaskPool'] - - -class NullDict(UserDict): - - def __setitem__(self, key, value): - pass - - -class TaskPool(BasePool): - - def __init__(self, *args, **kwargs): - try: - import threadpool - except ImportError: - raise ImportError( - 'The threaded pool requires the threadpool module.') - self.WorkRequest = threadpool.WorkRequest - self.ThreadPool = threadpool.ThreadPool - super(TaskPool, self).__init__(*args, **kwargs) - - def on_start(self): - # make sure all threads have the same current_app. - self.app.set_default() - - self._pool = self.ThreadPool(self.limit) - # threadpool stores all work requests until they are processed - # we don't need this dict, and it occupies way too much memory. - self._pool.workRequests = NullDict() - self._quick_put = self._pool.putRequest - self._quick_clear = self._pool._results_queue.queue.clear - - def on_stop(self): - self._pool.dismissWorkers(self.limit, do_join=True) - - def on_apply(self, target, args=None, kwargs=None, callback=None, - accept_callback=None, **_): - req = self.WorkRequest(apply_target, (target, args, kwargs, callback, - accept_callback)) - self._quick_put(req) - # threadpool also has callback support, - # but for some reason the callback is not triggered - # before you've collected the results. - # Clear the results (if any), so it doesn't grow too large. - self._quick_clear() - return req diff --git a/celery/tests/app/test_defaults.py b/celery/tests/app/test_defaults.py index c0edb87a1..514f3de97 100644 --- a/celery/tests/app/test_defaults.py +++ b/celery/tests/app/test_defaults.py @@ -54,10 +54,6 @@ class test_defaults(AppCase): for key in _TO_OLD_KEY: self.assertIn(key, SETTING_KEYS) - @mock.sys_platform('java 1.6.51') - def test_default_pool_jython(self): - self.assertEqual(self.defaults.DEFAULT_POOL, 'threads') - def test_find(self): find = self.defaults.find diff --git a/celery/tests/concurrency/test_threads.py b/celery/tests/concurrency/test_threads.py deleted file mode 100644 index 6ec14f997..000000000 --- a/celery/tests/concurrency/test_threads.py +++ /dev/null @@ -1,60 +0,0 @@ -from __future__ import absolute_import, unicode_literals - -from celery.concurrency.threads import NullDict, TaskPool, apply_target - -from celery.tests.case import AppCase, Case, Mock, mock - - -class test_NullDict(Case): - - def test_setitem(self): - x = NullDict() - x['foo'] = 1 - with self.assertRaises(KeyError): - x['foo'] - - -class test_TaskPool(AppCase): - - def test_without_threadpool(self): - - with mock.mask_modules('threadpool'): - with self.assertRaises(ImportError): - TaskPool(app=self.app) - - def test_with_threadpool(self): - with mock.module('threadpool'): - x = TaskPool(app=self.app) - self.assertTrue(x.ThreadPool) - self.assertTrue(x.WorkRequest) - - def test_on_start(self): - with mock.module('threadpool'): - x = TaskPool(app=self.app) - x.on_start() - self.assertTrue(x._pool) - self.assertIsInstance(x._pool.workRequests, NullDict) - - def test_on_stop(self): - with mock.module('threadpool'): - x = TaskPool(app=self.app) - x.on_start() - x.on_stop() - x._pool.dismissWorkers.assert_called_with(x.limit, do_join=True) - - def test_on_apply(self): - with mock.module('threadpool'): - x = TaskPool(app=self.app) - x.on_start() - callback = Mock() - accept_callback = Mock() - target = Mock() - req = x.on_apply(target, args=(1, 2), kwargs={'a': 10}, - callback=callback, - accept_callback=accept_callback) - x.WorkRequest.assert_called_with( - apply_target, - (target, (1, 2), {'a': 10}, callback, accept_callback), - ) - x._pool.putRequest.assert_called_with(req) - x._pool._results_queue.queue.clear.assert_called_with() diff --git a/docs/getting-started/introduction.rst b/docs/getting-started/introduction.rst index be4fbef13..38d61bb5d 100644 --- a/docs/getting-started/introduction.rst +++ b/docs/getting-started/introduction.rst @@ -133,7 +133,7 @@ Celery is… - prefork (multiprocessing), - Eventlet_, gevent_ - - threads/single threaded + - `solo` (single threaded) - **Result Stores** diff --git a/docs/getting-started/next-steps.rst b/docs/getting-started/next-steps.rst index b2ad33f86..f085492d6 100644 --- a/docs/getting-started/next-steps.rst +++ b/docs/getting-started/next-steps.rst @@ -111,7 +111,7 @@ of CPU's is rarely effective, and likely to degrade performance instead. Including the default prefork pool, Celery also supports using -Eventlet, Gevent, and threads (see :ref:`concurrency`). +Eventlet, Gevent, and running in a single thread (see :ref:`concurrency`). -- *Events* is an option that when enabled causes Celery to send monitoring messages (events) for actions occurring in the worker. diff --git a/docs/includes/installation.txt b/docs/includes/installation.txt index 7e4756ade..df38ebd49 100644 --- a/docs/includes/installation.txt +++ b/docs/includes/installation.txt @@ -59,9 +59,6 @@ Concurrency :``celery[gevent]``: for using the :pypi:`gevent` pool. -:``celery[threads]``: - for using the thread pool. - Transports and Backends ~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/docs/includes/introduction.txt b/docs/includes/introduction.txt index 35ef1487a..241f2def3 100644 --- a/docs/includes/introduction.txt +++ b/docs/includes/introduction.txt @@ -133,7 +133,7 @@ It supports… - **Concurrency** - - Prefork, Eventlet_, gevent_, threads/single threaded + - Prefork, Eventlet_, gevent_, single threaded (``solo``) - **Result Stores** diff --git a/docs/internals/guide.rst b/docs/internals/guide.rst index 02bd3908c..de7535008 100644 --- a/docs/internals/guide.rst +++ b/docs/internals/guide.rst @@ -268,7 +268,7 @@ Module Overview - celery.concurrency - Execution pool implementations (prefork, eventlet, gevent, threads). + Execution pool implementations (prefork, eventlet, gevent, solo). - celery.db diff --git a/docs/internals/reference/celery.concurrency.threads.rst b/docs/internals/reference/celery.concurrency.threads.rst deleted file mode 100644 index e5bddfb14..000000000 --- a/docs/internals/reference/celery.concurrency.threads.rst +++ /dev/null @@ -1,11 +0,0 @@ -=================================================================== - ``celery.concurrency.threads``‡ (**minefield**) -=================================================================== - -.. contents:: - :local: -.. currentmodule:: celery.concurrency.threads - -.. automodule:: celery.concurrency.threads - :members: - :undoc-members: diff --git a/docs/internals/reference/index.rst b/docs/internals/reference/index.rst index 266176c4a..a5206fdca 100644 --- a/docs/internals/reference/index.rst +++ b/docs/internals/reference/index.rst @@ -19,7 +19,6 @@ celery.concurrency.eventlet celery.concurrency.gevent celery.concurrency.base - celery.concurrency.threads celery.backends celery.backends.base celery.backends.async diff --git a/docs/userguide/workers.rst b/docs/userguide/workers.rst index 471e544fa..4b98706ef 100644 --- a/docs/userguide/workers.rst +++ b/docs/userguide/workers.rst @@ -232,7 +232,7 @@ Remote control commands from the command-line. It supports all of the commands listed below. See :ref:`monitoring-control` for more information. -:pool support: *prefork, eventlet, gevent*, blocking:*threads/solo* (see note) +:pool support: *prefork, eventlet, gevent*, blocking:*solo* (see note) :broker support: *amqp* Workers have the ability to be remote controlled using a high-priority @@ -255,7 +255,7 @@ to the number of destination hosts. .. note:: - The solo and threads pool supports remote control commands, + The ``solo`` pool supports remote control commands, but any task executing will block any waiting control command, so it is of limited use if the worker is very busy. In that case you must increase the timeout waiting for replies in the client. diff --git a/extra/bash-completion/celery.bash b/extra/bash-completion/celery.bash index dd2952fee..5124a3425 100644 --- a/extra/bash-completion/celery.bash +++ b/extra/bash-completion/celery.bash @@ -16,7 +16,7 @@ _celery() dopts="--detach --umask= --gid= --uid= --pidfile= --logfile= --loglevel= --executable=" controlargs="--timeout --destination" - pools="prefork eventlet gevent threads solo" + pools="prefork eventlet gevent solo" loglevels="critical error warning info debug" in_opt=0 diff --git a/extra/zsh-completion/celery.zsh b/extra/zsh-completion/celery.zsh index 5dd964431..712c9493b 100644 --- a/extra/zsh-completion/celery.zsh +++ b/extra/zsh-completion/celery.zsh @@ -40,7 +40,7 @@ case "$words[1]" in worker) _arguments \ '(-C --concurrency=)'{-C,--concurrency=}'[Number of child processes processing the queue. The default is the number of CPUs.]' \ - '(--pool)--pool=:::(prefork eventlet gevent threads solo)' \ + '(--pool)--pool=:::(prefork eventlet gevent solo)' \ '(--purge --discard)'{--discard,--purge}'[Purges all waiting tasks before the daemon is started.]' \ '(-f --logfile=)'{-f,--logfile=}'[Path to log file. If no logfile is specified, stderr is used.]' \ '(--loglevel=)--loglevel=:::(critical error warning info debug)' \ diff --git a/requirements/extras/threads.txt b/requirements/extras/threads.txt deleted file mode 100644 index c88d74e56..000000000 --- a/requirements/extras/threads.txt +++ /dev/null @@ -1 +0,0 @@ -threadpool diff --git a/requirements/jython.txt b/requirements/jython.txt index 16a2ad15f..fee61a17d 100644 --- a/requirements/jython.txt +++ b/requirements/jython.txt @@ -1,2 +1 @@ multiprocessing --r extras/threads.txt diff --git a/setup.py b/setup.py index 79143e017..738c1f02a 100644 --- a/setup.py +++ b/setup.py @@ -193,7 +193,7 @@ def extras(*p): # Celery specific features = set([ 'auth', 'cassandra', 'elasticsearch', 'memcache', 'pymemcache', - 'couchbase', 'threads', 'eventlet', 'gevent', 'msgpack', 'yaml', + 'couchbase', 'eventlet', 'gevent', 'msgpack', 'yaml', 'redis', 'sqs', 'couchdb', 'riak', 'zookeeper', 'zeromq', 'sqlalchemy', 'librabbitmq', 'pyro', 'slmq', 'tblib', 'consul' ])