зеркало из https://github.com/mozilla/treeherder.git
Bug 1076710 - Remove socketio remnants
We're not currently using socketio - and if we start doing so in the future we'll likely want to update to a newer version/adjust the implementation anyway. Removing the dependencies from common.txt speeds up the pip install on Travis. The old files will still be in version control should we wish to refer to them :-)
This commit is contained in:
Родитель
5e7f316619
Коммит
e1631abac8
|
@ -70,7 +70,7 @@ You can run this file with the following command
|
|||
|
||||
(venv)vagrant@precise32:~/treeherder-service$ sudo puppet apply puppet/your_manifest_file.pp
|
||||
|
||||
Once puppet has finished, the only thing left to do is to start all the treeherder services (gunicorn, socketio, celery, etc).
|
||||
Once puppet has finished, the only thing left to do is to start all the treeherder services (gunicorn, celery, etc).
|
||||
The easiest way to do it is via supervisord.
|
||||
A supervisord configuration file is included in the repo under deployment/supervisord/treeherder.conf.
|
||||
|
||||
|
|
|
@ -15,18 +15,6 @@ Gunicorn
|
|||
A wsgi server in charge of serving the restful api and the django admin.
|
||||
All the requests to this server are proxied through varnish and apache.
|
||||
|
||||
Gevent-socketio
|
||||
---------------
|
||||
|
||||
A gevent-based implementation of a `socket.io`_ server that can send soft-realtime updates to the clients.
|
||||
It only serves socketio-related request, typically namespaced with /socket.io.
|
||||
When executing, it consumes messages from rabbitmq using a "events.#" routing key.
|
||||
As soon as a new event is detected, it's sent down to the consumers who subscribed to it.
|
||||
To separate the socket.io connection from the standard http ones we use varnish with the following configuration
|
||||
|
||||
.. literalinclude:: ../puppet/files/varnish/default.vcl
|
||||
:lines: 14-26
|
||||
|
||||
Celery task worker
|
||||
------------------
|
||||
|
||||
|
@ -48,7 +36,3 @@ Celerymon task monitor
|
|||
|
||||
This process provides an interface to the status of the worker and the running tasks. It can be used to provide such informations
|
||||
to monitoring tools like munin.
|
||||
|
||||
|
||||
|
||||
.. _socket.io: http://socket.io
|
||||
|
|
|
@ -30,6 +30,5 @@ Where are my log files?
|
|||
You can find the various services log files under
|
||||
* /var/log/celery
|
||||
* /var/log/gunicorn
|
||||
* /var/log/socketio
|
||||
|
||||
You may also want to inspect the main treeherder log file ~/treeherder-service/treeherder.log
|
||||
|
|
|
@ -2,26 +2,13 @@ backend apache {
|
|||
.host = "127.0.0.1";
|
||||
.port = "8080";
|
||||
}
|
||||
backend socketio {
|
||||
.host = "127.0.0.1";
|
||||
.port = "8005";
|
||||
}
|
||||
sub vcl_pipe {
|
||||
if (req.http.upgrade) {
|
||||
set bereq.http.upgrade = req.http.upgrade;
|
||||
}
|
||||
}
|
||||
sub vcl_recv {
|
||||
if (req.url ~ "socket.io/[0-9]") {
|
||||
set req.backend = socketio;
|
||||
|
||||
if(req.http.upgrade ~ "(?i)websocket"){
|
||||
return (pipe);
|
||||
}
|
||||
}
|
||||
else {
|
||||
set req.backend = apache;
|
||||
}
|
||||
set req.backend = apache;
|
||||
return (pass);
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,6 @@ class treeherder {
|
|||
file { [
|
||||
"/var/log/gunicorn",
|
||||
"/var/log/celery",
|
||||
"/var/log/socketio"
|
||||
]:
|
||||
ensure => "directory",
|
||||
owner => "${APP_USER}",
|
||||
|
|
|
@ -7,9 +7,6 @@ kombu==3.0.23
|
|||
simplejson==3.3.0
|
||||
Cython==0.19.2
|
||||
gevent==1.0
|
||||
# gevent-socketio 3.6 hasn't been released yet
|
||||
https://github.com/abourget/gevent-socketio/archive/0f9bd2744af033b7cba57bfd5b82106592e9f667.zip#egg=gevent-socketio
|
||||
gevent-websocket==0.9
|
||||
|
||||
# Required by gevent
|
||||
greenlet==0.4.1
|
||||
|
|
|
@ -1,57 +0,0 @@
|
|||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
from kombu.mixins import ConsumerMixin
|
||||
from kombu import Exchange, Queue
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EventsConsumer(ConsumerMixin):
|
||||
|
||||
"""
|
||||
A specialized message consumer for the 'events' exchange.
|
||||
|
||||
The subscription mechanism is based on a simple routing key with the
|
||||
following structure:
|
||||
|
||||
[ * | try | mozilla-inbound | ...]( [ * | job | job_failure | resultset ] )
|
||||
|
||||
The first member is the branch name (or a * wildcard) and the second
|
||||
optional member is the event type (again * is allowed).
|
||||
|
||||
For example you can subscribe using the following keys:
|
||||
|
||||
* (equivalent to *.*)
|
||||
*.job
|
||||
try.*
|
||||
try.job_failure
|
||||
"""
|
||||
|
||||
def __init__(self, connection):
|
||||
self.connection = connection
|
||||
self.exchange = Exchange("events", type="topic")
|
||||
self.consumers = []
|
||||
|
||||
def get_consumers(self, Consumer, channel):
|
||||
return [
|
||||
Consumer(**c) for c in self.consumers
|
||||
]
|
||||
|
||||
def listen_to(self, routing_key, callback):
|
||||
logger.info("message consumer listening to : {0}".format(
|
||||
routing_key
|
||||
))
|
||||
|
||||
queue = Queue(
|
||||
name="",
|
||||
channel=self.connection.channel(),
|
||||
exchange=self.exchange,
|
||||
routing_key=routing_key,
|
||||
durable=False,
|
||||
auto_delete=True
|
||||
)
|
||||
|
||||
self.consumers.append(dict(queues=queue, callbacks=[callback]))
|
|
@ -1,152 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
import sys
|
||||
import argparse
|
||||
import logging
|
||||
from os.path import dirname
|
||||
import gevent
|
||||
from gevent import monkey
|
||||
monkey.patch_all() # noqa
|
||||
from socketio.server import SocketIOServer
|
||||
from socketio import socketio_manage
|
||||
from kombu import Connection
|
||||
|
||||
sys.path.append(dirname(dirname(dirname(__file__)))) # noqa
|
||||
|
||||
from treeherder.events.consumer import EventsConsumer
|
||||
from treeherder.events.sockets import EventsNamespace
|
||||
|
||||
logger = logging.getLogger("treeherder.events")
|
||||
|
||||
|
||||
class Application(object):
|
||||
|
||||
"""wsgi application with socketio enabled"""
|
||||
|
||||
def __init__(self):
|
||||
self.buffer = []
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
path = environ['PATH_INFO'].strip('/') or 'index.html'
|
||||
|
||||
if path.startswith("socket.io"):
|
||||
socketio_manage(environ, {'/events': EventsNamespace})
|
||||
else:
|
||||
return not_found(start_response)
|
||||
|
||||
|
||||
def not_found(start_response):
|
||||
start_response('404 Not Found', [])
|
||||
return ['<h1>Not Found</h1>']
|
||||
|
||||
|
||||
def broadcast_subscribers(body, msg):
|
||||
"""
|
||||
This is the main function where all the magic happens
|
||||
It broadcasts the events to the clients subscribed to
|
||||
them.
|
||||
"""
|
||||
pkt = dict(type="event", name=body['event'],
|
||||
args=body, endpoint='/events')
|
||||
|
||||
logger.debug("emitting event {0} on branch {1}".format(
|
||||
body["event"], body["branch"]
|
||||
))
|
||||
|
||||
for session_id, socket in server.sockets.iteritems():
|
||||
# loop over all the open connections
|
||||
# and send a message when needed
|
||||
if "subscriptions" not in socket.session:
|
||||
continue
|
||||
|
||||
for branch, events in socket.session['subscriptions'].items():
|
||||
if branch == body["branch"] or branch == "*":
|
||||
if body["event"] in events or "*" in events:
|
||||
logger.debug("sending packet {0} to {1}".format(
|
||||
pkt, session_id
|
||||
))
|
||||
socket.send_packet(pkt)
|
||||
break
|
||||
msg.ack()
|
||||
|
||||
|
||||
def start_consumer(broker_url):
|
||||
with Connection(broker_url) as conn:
|
||||
consumer = EventsConsumer(conn)
|
||||
consumer.listen_to("events.#", broadcast_subscribers)
|
||||
consumer.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
LOG_LEVELS = {
|
||||
"DEBUG": logging.DEBUG,
|
||||
"INFO": logging.INFO,
|
||||
"WARNING": logging.WARNING,
|
||||
"ERROR": logging.ERROR,
|
||||
"CRITICAL": logging.CRITICAL
|
||||
}
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--host",
|
||||
help="interface to bind the server to",
|
||||
default="0.0.0.0")
|
||||
parser.add_argument("--port",
|
||||
help="port to bind the server to",
|
||||
default="8005",
|
||||
type=int)
|
||||
parser.add_argument("--broker-url",
|
||||
help="url of the broker to use",
|
||||
required=True)
|
||||
parser.add_argument('--log-file',
|
||||
default=None,
|
||||
help="""the file where the log should be written to.
|
||||
Default to stdout""")
|
||||
parser.add_argument("--log-level",
|
||||
help="minimum level to log",
|
||||
default="DEBUG",
|
||||
choices=LOG_LEVELS.keys())
|
||||
args = parser.parse_args()
|
||||
|
||||
# logging system setup
|
||||
root_logger = logging.getLogger('')
|
||||
root_logger.setLevel(LOG_LEVELS[args.log_level])
|
||||
if args.log_file:
|
||||
log_handler = logging.FileHandler(args.log_file)
|
||||
else:
|
||||
log_handler = logging.StreamHandler()
|
||||
|
||||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
log_handler.setFormatter(formatter)
|
||||
root_logger.addHandler(log_handler)
|
||||
|
||||
try:
|
||||
logger.info("Starting SocketIOServer")
|
||||
|
||||
available_transports = [
|
||||
'htmlfile',
|
||||
'xhr-multipart',
|
||||
'xhr-polling',
|
||||
'jsonp-polling'
|
||||
]
|
||||
server = SocketIOServer(
|
||||
(args.host, args.port),
|
||||
Application(),
|
||||
resource="socket.io",
|
||||
transports=available_transports,
|
||||
policy_server=False
|
||||
)
|
||||
logger.info("Listening to http://{0}:{1}".format(args.host, args.port))
|
||||
logger.debug("writing logs to %s" % args.log_file)
|
||||
gevent.spawn(start_consumer, args.broker_url)
|
||||
server.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Socketio server stopped")
|
||||
for handler in logger.handlers:
|
||||
try:
|
||||
handler.close()
|
||||
except AttributeError:
|
||||
pass
|
|
@ -1,61 +0,0 @@
|
|||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
from socketio.namespace import BaseNamespace
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EventsNamespace(BaseNamespace):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(EventsNamespace, self).__init__(*args, **kwargs)
|
||||
logger.info("New connection")
|
||||
self.session['subscriptions'] = defaultdict(set)
|
||||
|
||||
def log(self, message, level="DEBUG"):
|
||||
logger.log(getattr(logging, level),
|
||||
"[{0}] {1}".format(self.socket.sessid, message))
|
||||
|
||||
def on_subscribe(self, subscription):
|
||||
"""
|
||||
this method is triggered by a new client subscription.
|
||||
subscription is a string indicating a branch or branch.event
|
||||
"""
|
||||
|
||||
tokens = subscription.split(".")
|
||||
self.log("subscribing to {0}".format(subscription))
|
||||
if len(tokens) == 1:
|
||||
# event is implicitly set to 'all'
|
||||
self.session['subscriptions'][tokens[0]].add("*")
|
||||
elif len(tokens) == 2:
|
||||
# event subscription
|
||||
self.session['subscriptions'][tokens[0]].add(tokens[1])
|
||||
else:
|
||||
error_message = 'malformed subscription'
|
||||
self.emit('error', error_message)
|
||||
self.log(error_message, "ERROR")
|
||||
|
||||
def on_unsubscribe(self, subscription=None):
|
||||
"""
|
||||
this method is triggered by a new client subscription.
|
||||
subscription is a string indicating a branch or branch.event
|
||||
if no subscription is passed, all the subscriptions are cleared
|
||||
"""
|
||||
|
||||
self.log("unsubscribing from channels: {0}".format(subscription))
|
||||
if not subscription:
|
||||
self.session['subscriptions'] = defaultdict(set)
|
||||
else:
|
||||
tokens = subscription.split(".")
|
||||
if len(tokens) == 1:
|
||||
del self.session['subscriptions'][tokens[0]]
|
||||
else:
|
||||
self.session['subscriptions'][tokens[0]].remove(tokens[1])
|
||||
|
||||
def recv_disconnect(self):
|
||||
self.log("Disconnected")
|
||||
return True
|
Загрузка…
Ссылка в новой задаче