Removes the experimental threads pool

This commit is contained in:
Ask Solem 2016-06-29 14:54:00 -07:00
Родитель 4b7af113fa
Коммит 586f3d3182
20 изменённых файлов: 12 добавлений и 154 удалений

Просмотреть файл

@ -141,7 +141,7 @@ It supports...
- **Concurrency** - **Concurrency**
- Prefork, Eventlet_, gevent_, threads/single threaded - Prefork, Eventlet_, gevent_, single threaded (``solo``)
- **Result Stores** - **Result Stores**

Просмотреть файл

@ -24,7 +24,7 @@ is_pypy = hasattr(sys, 'pypy_version_info')
DEFAULT_POOL = 'prefork' DEFAULT_POOL = 'prefork'
if is_jython: if is_jython:
DEFAULT_POOL = 'threads' DEFAULT_POOL = 'solo'
elif is_pypy: elif is_pypy:
if sys.pypy_version_info[0:3] < (1, 5, 0): if sys.pypy_version_info[0:3] < (1, 5, 0):
DEFAULT_POOL = 'solo' DEFAULT_POOL = 'solo'

Просмотреть файл

@ -18,7 +18,7 @@ The :program:`celery worker` command (previously known as ``celeryd``)
Pool implementation: Pool implementation:
prefork (default), eventlet, gevent, solo or threads. prefork (default), eventlet, gevent or solo.
.. cmdoption:: -n, --hostname .. cmdoption:: -n, --hostname

Просмотреть файл

@ -19,7 +19,6 @@ ALIASES = {
'prefork': 'celery.concurrency.prefork:TaskPool', 'prefork': 'celery.concurrency.prefork:TaskPool',
'eventlet': 'celery.concurrency.eventlet:TaskPool', 'eventlet': 'celery.concurrency.eventlet:TaskPool',
'gevent': 'celery.concurrency.gevent:TaskPool', 'gevent': 'celery.concurrency.gevent:TaskPool',
'threads': 'celery.concurrency.threads:TaskPool',
'solo': 'celery.concurrency.solo:TaskPool', 'solo': 'celery.concurrency.solo:TaskPool',
'processes': 'celery.concurrency.prefork:TaskPool', # XXX compat alias 'processes': 'celery.concurrency.prefork:TaskPool', # XXX compat alias
} }

Просмотреть файл

@ -1,60 +0,0 @@
# -*- coding: utf-8 -*-
"""
celery.concurrency.threads
~~~~~~~~~~~~~~~~~~~~~~~~~~
Pool implementation using threads.
"""
from __future__ import absolute_import, unicode_literals
from celery.five import UserDict
from .base import apply_target, BasePool
__all__ = ['TaskPool']
class NullDict(UserDict):
def __setitem__(self, key, value):
pass
class TaskPool(BasePool):
def __init__(self, *args, **kwargs):
try:
import threadpool
except ImportError:
raise ImportError(
'The threaded pool requires the threadpool module.')
self.WorkRequest = threadpool.WorkRequest
self.ThreadPool = threadpool.ThreadPool
super(TaskPool, self).__init__(*args, **kwargs)
def on_start(self):
# make sure all threads have the same current_app.
self.app.set_default()
self._pool = self.ThreadPool(self.limit)
# threadpool stores all work requests until they are processed
# we don't need this dict, and it occupies way too much memory.
self._pool.workRequests = NullDict()
self._quick_put = self._pool.putRequest
self._quick_clear = self._pool._results_queue.queue.clear
def on_stop(self):
self._pool.dismissWorkers(self.limit, do_join=True)
def on_apply(self, target, args=None, kwargs=None, callback=None,
accept_callback=None, **_):
req = self.WorkRequest(apply_target, (target, args, kwargs, callback,
accept_callback))
self._quick_put(req)
# threadpool also has callback support,
# but for some reason the callback is not triggered
# before you've collected the results.
# Clear the results (if any), so it doesn't grow too large.
self._quick_clear()
return req

Просмотреть файл

@ -54,10 +54,6 @@ class test_defaults(AppCase):
for key in _TO_OLD_KEY: for key in _TO_OLD_KEY:
self.assertIn(key, SETTING_KEYS) self.assertIn(key, SETTING_KEYS)
@mock.sys_platform('java 1.6.51')
def test_default_pool_jython(self):
self.assertEqual(self.defaults.DEFAULT_POOL, 'threads')
def test_find(self): def test_find(self):
find = self.defaults.find find = self.defaults.find

Просмотреть файл

@ -1,60 +0,0 @@
from __future__ import absolute_import, unicode_literals
from celery.concurrency.threads import NullDict, TaskPool, apply_target
from celery.tests.case import AppCase, Case, Mock, mock
class test_NullDict(Case):
def test_setitem(self):
x = NullDict()
x['foo'] = 1
with self.assertRaises(KeyError):
x['foo']
class test_TaskPool(AppCase):
def test_without_threadpool(self):
with mock.mask_modules('threadpool'):
with self.assertRaises(ImportError):
TaskPool(app=self.app)
def test_with_threadpool(self):
with mock.module('threadpool'):
x = TaskPool(app=self.app)
self.assertTrue(x.ThreadPool)
self.assertTrue(x.WorkRequest)
def test_on_start(self):
with mock.module('threadpool'):
x = TaskPool(app=self.app)
x.on_start()
self.assertTrue(x._pool)
self.assertIsInstance(x._pool.workRequests, NullDict)
def test_on_stop(self):
with mock.module('threadpool'):
x = TaskPool(app=self.app)
x.on_start()
x.on_stop()
x._pool.dismissWorkers.assert_called_with(x.limit, do_join=True)
def test_on_apply(self):
with mock.module('threadpool'):
x = TaskPool(app=self.app)
x.on_start()
callback = Mock()
accept_callback = Mock()
target = Mock()
req = x.on_apply(target, args=(1, 2), kwargs={'a': 10},
callback=callback,
accept_callback=accept_callback)
x.WorkRequest.assert_called_with(
apply_target,
(target, (1, 2), {'a': 10}, callback, accept_callback),
)
x._pool.putRequest.assert_called_with(req)
x._pool._results_queue.queue.clear.assert_called_with()

Просмотреть файл

@ -133,7 +133,7 @@ Celery is…
- prefork (multiprocessing), - prefork (multiprocessing),
- Eventlet_, gevent_ - Eventlet_, gevent_
- threads/single threaded - `solo` (single threaded)
- **Result Stores** - **Result Stores**

Просмотреть файл

@ -111,7 +111,7 @@ of CPU's is rarely effective, and likely to degrade performance
instead. instead.
Including the default prefork pool, Celery also supports using Including the default prefork pool, Celery also supports using
Eventlet, Gevent, and threads (see :ref:`concurrency`). Eventlet, Gevent, and running in a single thread (see :ref:`concurrency`).
-- *Events* is an option that when enabled causes Celery to send -- *Events* is an option that when enabled causes Celery to send
monitoring messages (events) for actions occurring in the worker. monitoring messages (events) for actions occurring in the worker.

Просмотреть файл

@ -59,9 +59,6 @@ Concurrency
:``celery[gevent]``: :``celery[gevent]``:
for using the :pypi:`gevent` pool. for using the :pypi:`gevent` pool.
:``celery[threads]``:
for using the thread pool.
Transports and Backends Transports and Backends
~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~

Просмотреть файл

@ -133,7 +133,7 @@ It supports…
- **Concurrency** - **Concurrency**
- Prefork, Eventlet_, gevent_, threads/single threaded - Prefork, Eventlet_, gevent_, single threaded (``solo``)
- **Result Stores** - **Result Stores**

Просмотреть файл

@ -268,7 +268,7 @@ Module Overview
- celery.concurrency - celery.concurrency
Execution pool implementations (prefork, eventlet, gevent, threads). Execution pool implementations (prefork, eventlet, gevent, solo).
- celery.db - celery.db

Просмотреть файл

@ -1,11 +0,0 @@
===================================================================
``celery.concurrency.threads``‡ (**minefield**)
===================================================================
.. contents::
:local:
.. currentmodule:: celery.concurrency.threads
.. automodule:: celery.concurrency.threads
:members:
:undoc-members:

Просмотреть файл

@ -19,7 +19,6 @@
celery.concurrency.eventlet celery.concurrency.eventlet
celery.concurrency.gevent celery.concurrency.gevent
celery.concurrency.base celery.concurrency.base
celery.concurrency.threads
celery.backends celery.backends
celery.backends.base celery.backends.base
celery.backends.async celery.backends.async

Просмотреть файл

@ -232,7 +232,7 @@ Remote control
commands from the command-line. It supports all of the commands commands from the command-line. It supports all of the commands
listed below. See :ref:`monitoring-control` for more information. listed below. See :ref:`monitoring-control` for more information.
:pool support: *prefork, eventlet, gevent*, blocking:*threads/solo* (see note) :pool support: *prefork, eventlet, gevent*, blocking:*solo* (see note)
:broker support: *amqp* :broker support: *amqp*
Workers have the ability to be remote controlled using a high-priority Workers have the ability to be remote controlled using a high-priority
@ -255,7 +255,7 @@ to the number of destination hosts.
.. note:: .. note::
The solo and threads pool supports remote control commands, The ``solo`` pool supports remote control commands,
but any task executing will block any waiting control command, but any task executing will block any waiting control command,
so it is of limited use if the worker is very busy. In that so it is of limited use if the worker is very busy. In that
case you must increase the timeout waiting for replies in the client. case you must increase the timeout waiting for replies in the client.

Просмотреть файл

@ -16,7 +16,7 @@ _celery()
dopts="--detach --umask= --gid= --uid= --pidfile= dopts="--detach --umask= --gid= --uid= --pidfile=
--logfile= --loglevel= --executable=" --logfile= --loglevel= --executable="
controlargs="--timeout --destination" controlargs="--timeout --destination"
pools="prefork eventlet gevent threads solo" pools="prefork eventlet gevent solo"
loglevels="critical error warning info debug" loglevels="critical error warning info debug"
in_opt=0 in_opt=0

Просмотреть файл

@ -40,7 +40,7 @@ case "$words[1]" in
worker) worker)
_arguments \ _arguments \
'(-C --concurrency=)'{-C,--concurrency=}'[Number of child processes processing the queue. The default is the number of CPUs.]' \ '(-C --concurrency=)'{-C,--concurrency=}'[Number of child processes processing the queue. The default is the number of CPUs.]' \
'(--pool)--pool=:::(prefork eventlet gevent threads solo)' \ '(--pool)--pool=:::(prefork eventlet gevent solo)' \
'(--purge --discard)'{--discard,--purge}'[Purges all waiting tasks before the daemon is started.]' \ '(--purge --discard)'{--discard,--purge}'[Purges all waiting tasks before the daemon is started.]' \
'(-f --logfile=)'{-f,--logfile=}'[Path to log file. If no logfile is specified, stderr is used.]' \ '(-f --logfile=)'{-f,--logfile=}'[Path to log file. If no logfile is specified, stderr is used.]' \
'(--loglevel=)--loglevel=:::(critical error warning info debug)' \ '(--loglevel=)--loglevel=:::(critical error warning info debug)' \

Просмотреть файл

@ -1 +0,0 @@
threadpool

Просмотреть файл

@ -1,2 +1 @@
multiprocessing multiprocessing
-r extras/threads.txt

Просмотреть файл

@ -193,7 +193,7 @@ def extras(*p):
# Celery specific # Celery specific
features = set([ features = set([
'auth', 'cassandra', 'elasticsearch', 'memcache', 'pymemcache', 'auth', 'cassandra', 'elasticsearch', 'memcache', 'pymemcache',
'couchbase', 'threads', 'eventlet', 'gevent', 'msgpack', 'yaml', 'couchbase', 'eventlet', 'gevent', 'msgpack', 'yaml',
'redis', 'sqs', 'couchdb', 'riak', 'zookeeper', 'redis', 'sqs', 'couchdb', 'riak', 'zookeeper',
'zeromq', 'sqlalchemy', 'librabbitmq', 'pyro', 'slmq', 'tblib', 'consul' 'zeromq', 'sqlalchemy', 'librabbitmq', 'pyro', 'slmq', 'tblib', 'consul'
]) ])