--HG--
extra : rebase_source : 614f412af96c31740b98f118a1d577de2563eeb4
This commit is contained in:
Eugene Lazutkin 2010-03-18 06:11:18 -05:00
Родитель 9775d8731d
Коммит 4ff04f2a3b
3 изменённых файлов: 96 добавлений и 7 удалений

Просмотреть файл

@ -104,10 +104,12 @@ c.static_override = None
# turns on asynchronous running of long jobs (like vcs)
c.async_jobs = True
# can be "" or False, "beanstalk" or True, or "restmq" for now
# beanstalkd host and port
# queue host, port, and a polling interval in seconds (for now)
c.queue_host = None
c.queue_port = None
c.queue_timeout = 0.3
# holds the actual queue object
c.queue = None
@ -305,7 +307,11 @@ def activate_profile():
c.queue_port = int(c.queue_port)
from bespin import queue
c.queue = queue.BeanstalkQueue(c.queue_host, c.queue_port)
if c.async_jobs is True or c.async_jobs == "beanstalk":
c.queue = queue.BeanstalkQueue(c.queue_host, c.queue_port)
elif c.async_jobs == "restmq":
c.queue = queue.RestMqQueue(c.queue_host, c.queue_port, timeout=float(c.queue_timeout))
if c.redis_port:
c.redis_port = int(c.redis_port)

Просмотреть файл

@ -581,6 +581,22 @@ def unfollow(request, response):
request.user.unfollow(other_user)
return _users_followed_response(request.user, response)
@expose(r'^/network/broadcast/', 'POST')
def broadcast(request, response):
user = request.user
try:
text = request.POST['text']
except KeyError:
text = "*UNSPECIFIED*"
followers = user.users_following_me()
for follower in followers:
follower.publish({
"msgtargetid": "42",
"from": user.username,
"text": text
})
return _users_followed_response(request.user, response)
@expose(r'^/group/list/all', 'GET')
def group_list_all(request, response):
groups = request.user.get_groups()

Просмотреть файл

@ -42,6 +42,10 @@ import time
import logging
import sys
import urllib
import urllib2
import time
from bespin import config
try:
@ -55,7 +59,7 @@ class QueueItem(object):
next_jobid = 0
def __init__(self, id, queue, message, execute, error_handler=None,
job=None, use_db=True):
job=None, use_db=True, origin=None):
if id == None:
self.id = QueueItem.next_jobid
QueueItem.next_jobid = QueueItem.next_jobid + 1
@ -67,6 +71,7 @@ class QueueItem(object):
self.error_handler = error_handler
self.job = job
self.use_db = use_db
self.origin = origin
self.session = None
def run(self):
@ -109,8 +114,12 @@ class QueueItem(object):
error_handler(self, e)
def done(self):
if self.job:
self.job.delete()
if origin == "beanstalk":
if self.job:
self.job.delete()
elif origin == "restmq":
if self.job:
self.job.delete(self.queue, self.id)
class BeanstalkQueue(object):
"""Manages Bespin jobs within a beanstalkd server.
@ -153,12 +162,71 @@ class BeanstalkQueue(object):
use_db = message.pop('__use_db')
qi = QueueItem(item.jid, name, message,
execute, error_handler=error_handler,
job=item, use_db=use_db)
job=item, use_db=use_db, origin="beanstalk")
yield qi
def close(self):
self.conn.close()
class RestMqQueue(object):
"""
Manages Bespin jobs within a RestMQ server.
http://github.com/gleicon/restmq
"""
def __init__(self, host, port, timeout=0.3):
self.host = host or "localhost"
self.port = port or 8888
self.timeout = timeout
self.url = "http://" + self.host + ":" + self.port + "/queue"
def _do_cmd(self, **kwargs):
# special treatment of 'value'
if 'value' in kwargs:
kwargs['value'] = simplejson.dumps(kwargs['value'])
req = urllib2.Request(
self.url,
urllib.urlencode({
'body': simplejson.dumps(kwargs)
})
)
rsp = urllib2.urlopen(req)
obj = simplejson.loads(rsp.read())
return obj
def enqueue(self, name, message, execute, error_handler, use_db):
message['__execute'] = execute
message['__error_handler'] = error_handler
message['__use_db'] = use_db
obj = self._do_cmd(cmd="add", queue=name, value=message)
return obj and obj['key'] or None
def delete(name, id):
return self._do_cmd(cmd="del", queue=name, key=id)
def read_queue(self, name):
log.debug("Starting to read %s on %s", name, self.url)
while True:
log.debug("Reserving next job")
item = self._do_cmd(cmd="get", queue=name)
if item is None or ('error' in item):
time.sleep(self.timeout)
continue
log.debug("Job received (%s)", item['key'])
message = simplejson.loads(item['value'])
execute = message.pop('__execute')
error_handler = message.pop('__error_handler')
use_db = message.pop('__use_db')
qi = QueueItem(item['key'], name, message,
execute, error_handler=error_handler,
job=self, use_db=use_db, origin="restmq")
yield qi
def close(self):
# don't need to close anything
pass
def _resolve_function(namestring):
modulename, funcname = namestring.split(":")
module = __import__(modulename, fromlist=[funcname])
@ -199,4 +267,3 @@ def process_queue(args=None):
log.debug("Message: %s", qi.message)
qi.run()
qi.done()