[backends] Add Consul.io Key/Value store as a backend
Add support for Consul as a backend using the Key/Value store of Consul. Consul has a HTTP API where through you can store keys with their values. The backend extends KeyValueStoreBackend and implements most of the methods. Mainly to set, get and remove objects. This allows Celery to store Task results in the K/V store of Consul. Consul also allows to set a TTL on keys using the Sessions from Consul. This way the backend supports auto expiry of Task results. For more information on Consul visit http://consul.io/ The backend uses python-consul for talking to the HTTP API. This package is fully Python 3 compliant just as this backend is. pip install python-consul That installs the required package to talk to Consul's HTTP API from Python.
This commit is contained in:
Родитель
320777611a
Коммит
77e44d8a17
|
@ -38,6 +38,7 @@ BACKEND_ALIASES = {
|
|||
'riak': 'celery.backends.riak:RiakBackend',
|
||||
'file': 'celery.backends.filesystem:FilesystemBackend',
|
||||
'disabled': 'celery.backends.base:DisabledBackend',
|
||||
'consul': 'celery.backends.consul:ConsulBackend'
|
||||
}
|
||||
|
||||
#: deprecated alias to ``current_app.backend``.
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
celery.backends.consul
|
||||
~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Consul result store backend.
|
||||
|
||||
- :class:`ConsulBackend` implements KeyValueStoreBackend to store results
|
||||
the key-value store of Consul.
|
||||
|
||||
"""
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
from kombu.utils.url import parse_url
|
||||
|
||||
from celery.exceptions import ImproperlyConfigured
|
||||
from celery.backends.base import KeyValueStoreBackend, PY3
|
||||
from celery.utils.log import get_logger
|
||||
|
||||
try:
|
||||
import consul
|
||||
except ImportError:
|
||||
consul = None
|
||||
|
||||
LOGGER = get_logger(__name__)
|
||||
|
||||
__all__ = ['ConsulBackend']
|
||||
|
||||
CONSUL_MISSING = """\
|
||||
You need to install the python-consul library in order to use \
|
||||
the Consul result store backend."""
|
||||
|
||||
|
||||
class ConsulBackend(KeyValueStoreBackend):
|
||||
"""
|
||||
Consul.io K/V store backend for Celery
|
||||
"""
|
||||
consul = consul
|
||||
|
||||
supports_autoexpire = True
|
||||
|
||||
client = None
|
||||
consistency = 'consistent'
|
||||
path = None
|
||||
|
||||
def __init__(self, url=None, expires=None, **kwargs):
|
||||
super(ConsulBackend, self).__init__(**kwargs)
|
||||
|
||||
if self.consul is None:
|
||||
raise ImproperlyConfigured(CONSUL_MISSING)
|
||||
|
||||
self.url = url
|
||||
self.expires = self.prepare_expires(expires, int)
|
||||
|
||||
params = parse_url(self.url)
|
||||
self.path = params['virtual_host']
|
||||
LOGGER.debug('Setting on Consul client to connect to %s:%d',
|
||||
params['hostname'], params['port'])
|
||||
self.client = consul.Consul(host=params['hostname'],
|
||||
port=params['port'],
|
||||
consistency=self.consistency)
|
||||
|
||||
def _key_to_consul_key(self, key):
|
||||
if PY3:
|
||||
key = key.decode('utf-8')
|
||||
|
||||
if self.path is not None:
|
||||
return '{0}/{1}'.format(self.path, key)
|
||||
|
||||
return key
|
||||
|
||||
def get(self, key):
|
||||
LOGGER.debug('Trying to fetch key %s from Consul',
|
||||
self._key_to_consul_key(key))
|
||||
try:
|
||||
_, data = self.client.kv.get(self._key_to_consul_key(key))
|
||||
return data['Value']
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
def mget(self, keys):
|
||||
for key in keys:
|
||||
yield self.get(key)
|
||||
|
||||
def set(self, key, value):
|
||||
"""Set a key in Consul
|
||||
|
||||
Before creating the key it will create a session inside Consul
|
||||
where it creates a session with a TTL
|
||||
|
||||
The key created afterwards will reference to the session's ID.
|
||||
|
||||
If the session expires it will remove the key so that results
|
||||
can auto expire from the K/V store
|
||||
"""
|
||||
session_name = key
|
||||
|
||||
if PY3:
|
||||
session_name = key.decode('utf-8')
|
||||
|
||||
LOGGER.debug('Trying to create Consul session %s with TTL %d',
|
||||
session_name, self.expires)
|
||||
session_id = self.client.session.create(name=session_name,
|
||||
behavior='delete',
|
||||
ttl=self.expires)
|
||||
LOGGER.debug('Created Consul session %s', session_id)
|
||||
|
||||
LOGGER.debug('Writing key %s to Consul', self._key_to_consul_key(key))
|
||||
return self.client.kv.put(key=self._key_to_consul_key(key),
|
||||
value=value,
|
||||
acquire=session_id)
|
||||
|
||||
def delete(self, key):
|
||||
LOGGER.debug('Removing key %s from Consul',
|
||||
self._key_to_consul_key(key))
|
||||
return self.client.kv.delete(self._key_to_consul_key(key))
|
|
@ -0,0 +1,37 @@
|
|||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
from celery.tests.case import AppCase, Mock, skip
|
||||
from celery.backends.consul import ConsulBackend
|
||||
|
||||
try:
|
||||
import consul
|
||||
except ImportError:
|
||||
consul = None
|
||||
|
||||
|
||||
@skip.unless_module('consul')
|
||||
class test_ConsulBackend(AppCase):
|
||||
|
||||
def setup(self):
|
||||
if consul is None:
|
||||
raise SkipTest('python-consul is not installed.')
|
||||
self.backend = ConsulBackend(app=self.app)
|
||||
|
||||
def test_supports_autoexpire(self):
|
||||
self.assertTrue(self.backend.supports_autoexpire)
|
||||
|
||||
def test_consul_consistency(self):
|
||||
self.assertEqual('consistent', self.backend.consistency)
|
||||
|
||||
def test_get(self):
|
||||
c = ConsulBackend(app=self.app)
|
||||
c.client = Mock()
|
||||
c.client.kv = Mock()
|
||||
c.client.kv.get = Mock()
|
||||
index = 100
|
||||
data = {'Key': 'test-consul-1', 'Value': 'mypayload'}
|
||||
r = (index, data)
|
||||
c.client.kv.get.return_value = r
|
||||
i, d = c.get(data['Key'])
|
||||
self.assertEqual(i, 100)
|
||||
self.assertEqual(d['Key'], data['Key'])
|
|
@ -552,6 +552,10 @@ Can be one of the following:
|
|||
Older AMQP backend (badly) emulating a database-based backend.
|
||||
See :ref:`conf-amqp-result-backend`.
|
||||
|
||||
* ``consul``
|
||||
Use the `Consul`_ K/V store to store the results
|
||||
See :ref:`conf-consul-result-backend`.
|
||||
|
||||
.. warning:
|
||||
|
||||
While the AMQP result backend is very efficient, you must make sure
|
||||
|
@ -566,6 +570,7 @@ Can be one of the following:
|
|||
.. _`IronCache`: http://www.iron.io/cache
|
||||
.. _`CouchDB`: http://www.couchdb.com/
|
||||
.. _`Couchbase`: http://www.couchbase.com/
|
||||
.. _`Consul`: http://consul.io/
|
||||
|
||||
.. setting:: result_serializer
|
||||
|
||||
|
@ -1297,6 +1302,19 @@ without any further configuration. For larger clusters you could use NFS,
|
|||
.. _`GlusterFS`: http://www.gluster.org/
|
||||
.. _`HDFS`: http://hadoop.apache.org/
|
||||
|
||||
.. _conf-consul-result-backend:
|
||||
|
||||
Consul K/V store backend settings
|
||||
---------------------------------
|
||||
|
||||
The Consul backend can be configured using a URL, for example:
|
||||
|
||||
CELERY_RESULT_BACKEND = 'consul://localhost:8500/'
|
||||
|
||||
The backend will storage results in the K/V store of Consul
|
||||
as individual keys.
|
||||
|
||||
The backend supports auto expire of results using TTLs in Consul.
|
||||
|
||||
.. _conf-messaging:
|
||||
|
||||
|
|
|
@ -121,6 +121,9 @@ Transports and Backends
|
|||
:``celery[slmq]``:
|
||||
for using the SoftLayer Message Queue transport (*experimental*).
|
||||
|
||||
:``celery[consul]``:
|
||||
for using the Consul.io Key/Value store as a message transport or result backend (*experimental*).
|
||||
|
||||
.. _celery-installing-from-source:
|
||||
|
||||
Downloading and installing from source
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
==========================================
|
||||
celery.backends.consul
|
||||
==========================================
|
||||
|
||||
.. contents::
|
||||
:local:
|
||||
.. currentmodule:: celery.backends.consul
|
||||
|
||||
.. automodule:: celery.backends.consul
|
||||
:members:
|
||||
:undoc-members:
|
|
@ -0,0 +1 @@
|
|||
python-consul
|
2
setup.py
2
setup.py
|
@ -194,7 +194,7 @@ features = set([
|
|||
'auth', 'cassandra', 'elasticsearch', 'memcache', 'pymemcache',
|
||||
'couchbase', 'threads', 'eventlet', 'gevent', 'msgpack', 'yaml',
|
||||
'redis', 'mongodb', 'sqs', 'couchdb', 'riak', 'beanstalk', 'zookeeper',
|
||||
'zeromq', 'sqlalchemy', 'librabbitmq', 'pyro', 'slmq', 'tblib'
|
||||
'zeromq', 'sqlalchemy', 'librabbitmq', 'pyro', 'slmq', 'tblib', 'consul'
|
||||
])
|
||||
extras_require = dict((x, extras(x + '.txt')) for x in features)
|
||||
extra['extras_require'] = extras_require
|
||||
|
|
Загрузка…
Ссылка в новой задаче