зеркало из https://github.com/mozilla/treeherder.git
merged master
This commit is contained in:
Коммит
e66e63c6d2
|
@ -6,7 +6,7 @@ end
|
|||
Vagrant::Config.run do |config|
|
||||
config.vm.box = "precise32"
|
||||
config.vm.box_url = "http://files.vagrantup.com/precise32.box"
|
||||
config.vm.customize ["modifyvm", :id, "--memory", "512"]
|
||||
config.vm.customize ["modifyvm", :id, "--memory", "1024"]
|
||||
config.vm.network :hostonly, "192.168.33.10"
|
||||
|
||||
# enable this to see the GUI if vagrant cannot connect
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
#!/bin/bash
|
||||
curr_dir=$( dirname "${BASH_SOURCE[0]}" )
|
||||
cd $( dirname $curr_dir)
|
||||
source /etc/profile.d/treeherder.sh
|
||||
source ../venv/bin/activate
|
||||
exec ../venv/bin/python manage.py celerymon
|
|
@ -0,0 +1,7 @@
|
|||
#!/bin/bash
|
||||
curr_dir=$( dirname "${BASH_SOURCE[0]}" )
|
||||
cd $( dirname $curr_dir)
|
||||
|
||||
source /etc/profile.d/treeherder.sh
|
||||
source ../venv/bin/activate
|
||||
exec ../venv/bin/python manage.py celeryd -c 3 -E --maxtasksperchild=500 --logfile=/var/log/celery/treeherder_worker.log
|
|
@ -0,0 +1,6 @@
|
|||
#!/bin/bash
|
||||
curr_dir=$( dirname "${BASH_SOURCE[0]}" )
|
||||
cd $( dirname $curr_dir)
|
||||
source /etc/profile.d/treeherder.sh
|
||||
source ../venv/bin/activate
|
||||
exec ../venv/bin/python manage.py celeryd -Q log_parser_fail,log_parser -P gevent --concurrency=10 -E --logfile=/var/log/celery/treeherder_worker_gevent.log -n gevent_worker.dev3 -l info --maxtasksperchild=500
|
|
@ -0,0 +1,6 @@
|
|||
#!/bin/bash
|
||||
curr_dir=$( dirname "${BASH_SOURCE[0]}" )
|
||||
cd $( dirname $curr_dir)
|
||||
source /etc/profile.d/treeherder.sh
|
||||
source ../venv/bin/activate
|
||||
exec ../venv/bin/python manage.py celerybeat
|
|
@ -0,0 +1,19 @@
|
|||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
curr_dir=$( dirname "${BASH_SOURCE[0]}" )
|
||||
cd $( dirname $curr_dir)
|
||||
|
||||
LOGDIR=/var/log/gunicorn
|
||||
ACCESS_LOGFILE=$LOGDIR/treeherder_access.log
|
||||
ERROR_LOGFILE=$LOGDIR/treeherder_error.log
|
||||
|
||||
NUM_WORKERS=5
|
||||
|
||||
source /etc/profile.d/treeherder.sh
|
||||
source ../venv/bin/activate
|
||||
|
||||
exec ../venv/bin/gunicorn -w $NUM_WORKERS \
|
||||
--max-requests=200 \
|
||||
--access-logfile=$ACCESS_LOGFILE \
|
||||
--error-logfile=$ERROR_LOGFILE treeherder.webapp.wsgi:application
|
|
@ -0,0 +1,4 @@
|
|||
#!/bin/bash
|
||||
source /etc/profile.d/treeherder.sh
|
||||
source ../venv/bin/activate
|
||||
exec ../venv/bin/python manage.py start_pulse_consumer --start
|
|
@ -0,0 +1,21 @@
|
|||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
curr_dir=$( dirname "${BASH_SOURCE[0]}" )
|
||||
cd $( dirname $curr_dir)
|
||||
|
||||
LOGFILE=/var/log/socketio/treeherder.log
|
||||
|
||||
source /etc/profile.d/treeherder.sh
|
||||
source ../venv/bin/activate
|
||||
USER=$TREEHERDER_RABBITMQ_USER
|
||||
PASS=$TREEHERDER_RABBITMQ_PASSWORD
|
||||
HOST=$TREEHERDER_RABBITMQ_HOST
|
||||
PORT=$TREEHERDER_RABBITMQ_PORT
|
||||
VHOST=$TREEHERDER_RABBITMQ_VHOST
|
||||
|
||||
|
||||
exec python treeherder/events/run_socketio.py \
|
||||
--broker-url amqp://$USER:$PASS@$HOST:$PORT/$VHOST \
|
||||
--log-file LOGFILE
|
||||
|
|
@ -0,0 +1,55 @@
|
|||
[program:gunicorn]
|
||||
command=/home/vagrant/treeherder-service/bin/run_gunicorn
|
||||
user=vagrant
|
||||
autostart=true
|
||||
autorestart=true
|
||||
redirect_stderr=true
|
||||
|
||||
[program:socketio-server]
|
||||
command=/home/vagrant/treeherder-service/bin/run_socketio_server
|
||||
user=vagrant
|
||||
autostart=true
|
||||
autorestart=true
|
||||
stderr_logfile=/var/log/socketio/treeherder_err.log
|
||||
|
||||
[program:celerybeat]
|
||||
command=/home/vagrant/treeherder-service/bin/run_celerybeat
|
||||
user=vagrant
|
||||
autostart=true
|
||||
autorestart=true
|
||||
startsecs=10
|
||||
priority=999
|
||||
|
||||
[program:celery]
|
||||
command=/home/vagrant/treeherder-service/bin/run_celery_worker
|
||||
user=vagrant
|
||||
numprocs=1
|
||||
autostart=true
|
||||
autorestart=true
|
||||
startsecs=10
|
||||
stopwaitsecs = 600
|
||||
priority=998
|
||||
stdout_logfile=/var/log/celery/worker.log
|
||||
stderr_logfile=/var/log/celery/worker_err.log
|
||||
|
||||
[program:celery_gevent]
|
||||
command=/home/vagrant/treeherder-service/bin/run_celery_worker_gevent
|
||||
user=vagrant
|
||||
autostart=true
|
||||
autorestart=true
|
||||
startsecs=10
|
||||
stopwaitsecs = 600
|
||||
priority=998
|
||||
stdout_logfile=/var/log/celery/worker_gevent.log
|
||||
stderr_logfile=/var/log/celery/worker_gevent_err.log
|
||||
|
||||
[program:celerymon]
|
||||
command=/home/vagrant/treeherder-service/bin/run_celery_monitor
|
||||
user=vagrant
|
||||
autostart=true
|
||||
autorestart=true
|
||||
startsecs=10
|
||||
stopwaitsecs = 600
|
||||
priority=997
|
||||
stdout_logfile=/var/log/celery/celerymon.log
|
||||
stderr_logfile=/var/log/celery/celerymon_err.log
|
|
@ -0,0 +1,75 @@
|
|||
Deployment
|
||||
==========
|
||||
|
||||
The easiest way to deploy all the treeherder services on a server is to let puppet do it.
|
||||
Once puppet is installed on your machine, clone the treeherder repo on the target machine and create a puppet
|
||||
manifest like this inside the puppet directory:
|
||||
.. code-block:: ruby
|
||||
|
||||
import "classes/*.pp"
|
||||
|
||||
$APP_URL="your.webapp.url"
|
||||
$APP_USER="your_app_user"
|
||||
$APP_GROUP="your_app_group"
|
||||
$PROJ_DIR = "/home/${APP_USER}/treeherder-service"
|
||||
$VENV_DIR = "/home/${APP_USER}/venv"
|
||||
# You can make these less generic if you like, but these are box-specific
|
||||
# so it's not required.
|
||||
$DB_NAME = "db_name"
|
||||
$DB_USER = "db_user"
|
||||
$DB_PASS = "db_pass"
|
||||
$DB_HOST = "localhost"
|
||||
$DB_PORT = "3306"
|
||||
$DJANGO_SECRET_KEY = "your-django-secret"
|
||||
$RABBITMQ_USER = "your_rabbitmq_user"
|
||||
$RABBITMQ_PASSWORD = "your_rabbitmq_pass"
|
||||
$RABBITMQ_VHOST = "your_rabbitmq_vhost"
|
||||
$RABBITMQ_HOST = "your_rabbitmq_host"
|
||||
$RABBITMQ_PORT = "your_rabbitmq_port"
|
||||
|
||||
Exec {
|
||||
path => "/usr/local/bin:/usr/bin:/usr/sbin:/sbin:/bin",
|
||||
}
|
||||
|
||||
file {"/etc/profile.d/treeherder.sh":
|
||||
content => "
|
||||
export TREEHERDER_DATABASE_NAME='${DB_NAME}'
|
||||
export TREEHERDER_DATABASE_USER='${DB_USER}'
|
||||
export TREEHERDER_DATABASE_PASSWORD='${DB_PASS}'
|
||||
export TREEHERDER_DATABASE_HOST='${DB_HOST}'
|
||||
export TREEHERDER_DATABASE_PORT='${DB_PORT}'
|
||||
export TREEHERDER_DEBUG='1'
|
||||
export TREEHERDER_DJANGO_SECRET_KEY='${DJANGO_SECRET_KEY}'
|
||||
export TREEHERDER_MEMCACHED='127.0.0.1:11211'
|
||||
export TREEHERDER_RABBITMQ_USER='${RABBITMQ_USER}'
|
||||
export TREEHERDER_RABBITMQ_PASSWORD='${RABBITMQ_PASSWORD}'
|
||||
export TREEHERDER_RABBITMQ_VHOST='${RABBITMQ_VHOST}'
|
||||
export TREEHERDER_RABBITMQ_HOST='${RABBITMQ_HOST}'
|
||||
export TREEHERDER_RABBITMQ_PORT='${RABBITMQ_PORT}'
|
||||
"
|
||||
}
|
||||
|
||||
class deployment {
|
||||
class {
|
||||
init: before => Class["mysql"];
|
||||
mysql: before => Class["python"];
|
||||
python: before => Class["apache"];
|
||||
apache: before => Class["varnish"];
|
||||
varnish: before => Class["treeherder"];
|
||||
treeherder: before => Class["rabbitmq"];
|
||||
rabbitmq:;
|
||||
}
|
||||
}
|
||||
|
||||
include deployment
|
||||
|
||||
As you can see it's very similar to the file used to startup the vagrant environment.
|
||||
You can run this file with the following command
|
||||
|
||||
.. code-block:: bash
|
||||
(venv)vagrant@precise32:~/treeherder-service$ sudo puppet apply puppet/your_manifest_file.pp
|
||||
|
||||
Once puppet has finished, the only thing left to do is to start all the treeherder services (gunicorn, socketio, celery, etc).
|
||||
The easiest way to do it is via supervisord.
|
||||
A supervisord configuration file is included in the repo under deployment/supervisord/treeherder.conf.
|
||||
|
|
@ -13,6 +13,7 @@ Contents:
|
|||
|
||||
installation
|
||||
dataload
|
||||
deployment
|
||||
|
||||
|
||||
Indices and tables
|
||||
|
|
|
@ -31,20 +31,7 @@ Installation
|
|||
|
||||
(venv)vagrant@precise32:~/treeherder-service$ ./runtests.sh
|
||||
|
||||
* Init a master database and setup datasources:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
(venv)vagrant@precise32:~/treeherder-service$ python manage.py init_master_db
|
||||
(venv)vagrant@precise32:~/treeherder-service$ python manage.py init_datasources
|
||||
|
||||
* Add an entry to your vm /etc/hosts for your treeherder virtual host:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
127.0.0.1 local.treeherder.mozilla.org
|
||||
|
||||
* And one to your host machine /etc/hosts so that you can point your browser to local.treeherder.mozilla.org to reach it
|
||||
* And an entry to your host machine /etc/hosts so that you can point your browser to local.treeherder.mozilla.org to reach it
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
|
@ -54,9 +41,9 @@ Installation
|
|||
|
||||
.. code-block:: bash
|
||||
|
||||
(venv)vagrant@precise32:~/treeherder-service$ gunicorn treeherder.webapp.wsgi:application
|
||||
(venv)vagrant@precise32:~/treeherder-service$ ./bin/run_gunicorn
|
||||
|
||||
all the request sent to local.treeherder.mozilla.org will be proxied to it by apache.
|
||||
all the request sent to local.treeherder.mozilla.org will be proxied to it by varnish/apache.
|
||||
|
||||
* Start up one or more celery worker to process async tasks:
|
||||
|
||||
|
@ -82,6 +69,12 @@ Installation
|
|||
(venv)vagrant@precise32:~/treeherder-service$ python setup.py build_ext --inplace
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
* If you want to use supervisord to take care of all the services,
|
||||
|
||||
|
||||
.. _project repo: https://github.com/mozilla/treeherder-service
|
||||
.. _Vagrant: http://downloads.vagrantup.com
|
||||
.. _Virtualbox: https://www.virtualbox.org
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
<VirtualHost *:80>
|
||||
<VirtualHost *:8080>
|
||||
ServerName <%= @APP_URL %>
|
||||
ProxyRequests Off
|
||||
<Proxy *>
|
||||
|
@ -7,11 +7,14 @@
|
|||
</Proxy>
|
||||
|
||||
|
||||
Alias /static/ <%= @PROJ_DIR %>/treeherder/webapp/static/
|
||||
ProxyPass /static/ !
|
||||
Alias /static <%= @PROJ_DIR %>/treeherder/webapp/static
|
||||
ProxyPass /static !
|
||||
|
||||
Alias /media/ <%= @PROJ_DIR %>/treeherder/webapp/media/
|
||||
ProxyPass /media/ !
|
||||
Alias /media <%= @PROJ_DIR %>/treeherder/webapp/media
|
||||
ProxyPass /media !
|
||||
|
||||
Alias /ui /home/<%= @APP_USER %>/treeherder-ui/webapp/app
|
||||
ProxyPass /ui !
|
||||
|
||||
ProxyPass / http://localhost:8000/
|
||||
ProxyPassReverse / http://localhost:8000/
|
||||
|
@ -20,4 +23,5 @@
|
|||
ErrorLog /var/log/<%= @apache_service %>/treeherder-service_err.log
|
||||
LogLevel warn
|
||||
CustomLog /var/log/<%= @apache_service %>/treeherder-service_access.log combined
|
||||
|
||||
</VirtualHost>
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
backend apache {
|
||||
.host = "127.0.0.1";
|
||||
.port = "8080";
|
||||
}
|
||||
backend socketio {
|
||||
.host = "127.0.0.1";
|
||||
.port = "8005";
|
||||
}
|
||||
sub vcl_pipe {
|
||||
if (req.http.upgrade) {
|
||||
set bereq.http.upgrade = req.http.upgrade;
|
||||
}
|
||||
}
|
||||
sub vcl_recv {
|
||||
if (req.url ~ "socket.io/[0-9]") {
|
||||
set req.backend = socketio;
|
||||
return (pipe);
|
||||
}
|
||||
else {
|
||||
set req.backend = apache;
|
||||
}
|
||||
}
|
||||
|
||||
sub vcl_fetch {
|
||||
if (beresp.http.content-type ~ "json" || beresp.http.content-type ~ "text" ) {
|
||||
set beresp.do_gzip = true;
|
||||
}
|
||||
}
|
|
@ -13,6 +13,11 @@ $apache_service = $operatingsystem ? {
|
|||
default => "httpd",
|
||||
}
|
||||
|
||||
$apache_port_definition_file = $operatingsystem ? {
|
||||
ubuntu => "/etc/apache2/ports.conf",
|
||||
default => "/etc/httpd/conf/httpd.conf",
|
||||
}
|
||||
|
||||
class apache {
|
||||
package { $apache_devel:
|
||||
ensure => present
|
||||
|
@ -29,6 +34,13 @@ class apache {
|
|||
notify => Service[$apache_service],
|
||||
}
|
||||
|
||||
exec { "sed -i '/[: ]80$/ s/80/8080/' ${apache_port_definition_file}":
|
||||
require => [Package[$apache_devel]],
|
||||
before => [
|
||||
Service[$apache_service]
|
||||
]
|
||||
}
|
||||
|
||||
service { $apache_service:
|
||||
ensure => running,
|
||||
enable => true,
|
||||
|
@ -50,5 +62,8 @@ class apache {
|
|||
onlyif => 'test ! -e /etc/apache2/mods-enabled/proxy_http.load',
|
||||
before => Service[$apache_service];
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,4 +6,15 @@ class dev{
|
|||
timeout => 1800,
|
||||
}
|
||||
|
||||
exec{"init_master_db":
|
||||
cwd => '/home/vagrant/treeherder-service',
|
||||
command => "${VENV_DIR}/bin/python manage.py init_master_db --noinput",
|
||||
}
|
||||
|
||||
exec{"init_datasources":
|
||||
cwd => '/home/vagrant/treeherder-service',
|
||||
command => "${VENV_DIR}/bin/python manage.py init_datasources",
|
||||
require => Exec["init_master_db"],
|
||||
}
|
||||
|
||||
}
|
|
@ -17,4 +17,14 @@ class treeherder {
|
|||
exec{"build-extensions":
|
||||
command => "${VENV_DIR}/bin/python ${PROJ_DIR}/setup.py build_ext --inplace"
|
||||
}
|
||||
|
||||
file { [
|
||||
"/var/log/gunicorn",
|
||||
"/var/log/celery"
|
||||
]:
|
||||
ensure => "directory",
|
||||
owner => "${APP_USER}",
|
||||
group => "${APP_GROUP}",
|
||||
mode => 755,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
define line($file, $line, $ensure = 'present') {
|
||||
case $ensure {
|
||||
default : { err ( "unknown ensure value ${ensure}" ) }
|
||||
present: {
|
||||
exec { "/bin/echo '${line}' >> '${file}'":
|
||||
unless => "/bin/grep -qFx '${line}' '${file}'"
|
||||
}
|
||||
}
|
||||
absent: {
|
||||
exec { "/usr/bin/perl -ni -e 'print unless /^\\Q${line}\\E\$/' '${file}'":
|
||||
onlyif => "/bin/grep -qFx '${line}' '${file}'"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
$varnish_port_file = $operatingsystem ? {
|
||||
ubuntu => "/etc/default/varnish",
|
||||
default => "/etc/sysconfig/varnish",
|
||||
}
|
||||
|
||||
$varnish_port_change = $operatingsystem ? {
|
||||
ubuntu => "sed -i '/^DAEMON_OPTS=\"-a :6081* / s/6081/80/' ${varnish_port_file}",
|
||||
default => "sed -i '/^VARNISH_LISTEN_PORT=6081$/ s/6081/80/' ${varnish_port_file}",
|
||||
}
|
||||
|
||||
|
||||
class varnish {
|
||||
package { "varnish":
|
||||
ensure => installed;
|
||||
}
|
||||
|
||||
service { "varnish":
|
||||
ensure => running,
|
||||
enable => true,
|
||||
require => Package['varnish'],
|
||||
}
|
||||
|
||||
file {"/etc/varnish/default.vcl":
|
||||
content => template("${PROJ_DIR}/puppet/files/varnish/default.vcl"),
|
||||
owner => "root", group => "root", mode => 0644,
|
||||
require => [Package["varnish"]],
|
||||
before => Service["varnish"],
|
||||
notify => Service["varnish"],
|
||||
}
|
||||
|
||||
exec { $varnish_port_change:
|
||||
require => [
|
||||
Package[$apache_devel],
|
||||
Package["varnish"],
|
||||
],
|
||||
before => [
|
||||
Service["varnish"]
|
||||
],
|
||||
notify => Service["varnish"],
|
||||
}
|
||||
|
||||
}
|
|
@ -26,6 +26,12 @@ Exec {
|
|||
path => "/usr/local/bin:/usr/bin:/usr/sbin:/sbin:/bin",
|
||||
}
|
||||
|
||||
line {"etc-hosts":
|
||||
file => "/etc/hosts",
|
||||
line => "127.0.0.1 local.treeherder.mozilla.org",
|
||||
ensure => "present"
|
||||
}
|
||||
|
||||
file {"/etc/profile.d/treeherder.sh":
|
||||
content => "
|
||||
export TREEHERDER_DATABASE_NAME='${DB_NAME}'
|
||||
|
@ -49,7 +55,8 @@ class vagrant {
|
|||
init: before => Class["mysql"];
|
||||
mysql: before => Class["python"];
|
||||
python: before => Class["apache"];
|
||||
apache: before => Class["treeherder"];
|
||||
apache: before => Class["varnish"];
|
||||
varnish: before => Class["treeherder"];
|
||||
treeherder: before => Class["rabbitmq"];
|
||||
rabbitmq: before => Class["dev"];
|
||||
dev:;
|
||||
|
|
|
@ -1,9 +1,15 @@
|
|||
# Dependencies with compiled extensions
|
||||
MySQL-python
|
||||
gunicorn==0.17.2
|
||||
gunicorn==18.0
|
||||
django>=1.5,<1.6
|
||||
Celery==3.0.17
|
||||
django-celery==3.0.17
|
||||
celerymon==1.0.3
|
||||
kombu==2.4.7
|
||||
simplejson==3.3.0
|
||||
Cython==0.19.2
|
||||
Cython==0.19.2
|
||||
gevent==1.0
|
||||
# gevent-socketio 3.6 hasn't been released yet
|
||||
git+git://github.com/abourget/gevent-socketio@0f9bd2744a
|
||||
gevent-websocket==0.9
|
||||
greenlet==0.4.1
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
argparse==1.2.1
|
||||
South==0.7.6
|
||||
python-memcached==1.48
|
||||
mozillapulse==0.61
|
||||
|
||||
# requirements for mozillapulse
|
||||
mozillapulse==0.61
|
||||
carrot==0.10.7
|
||||
|
||||
djangorestframework==2.3.5
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
from kombu.mixins import ConsumerMixin
|
||||
from kombu import Connection, Exchange, Consumer, Queue
|
||||
|
||||
|
||||
class EventsConsumer(ConsumerMixin):
|
||||
"""
|
||||
A specialized message consumer for the 'events' exchange.
|
||||
|
||||
The subscription mechanism is based on a simple routing key with the
|
||||
following structure:
|
||||
|
||||
[ * | try | mozilla-inbound | ...]( [ * | job | job_failure | resultset ] )
|
||||
|
||||
The first member is the branch name (or a * wildcard) and the second
|
||||
optional member is the event type (again * is allowed).
|
||||
|
||||
For example you can subscribe using the following keys:
|
||||
|
||||
* (equivalent to *.*)
|
||||
*.job
|
||||
try.*
|
||||
try.job_failure
|
||||
"""
|
||||
def __init__(self, connection):
|
||||
self.connection = connection
|
||||
self.exchange = Exchange("events", type="topic")
|
||||
self.consumers = []
|
||||
|
||||
def get_consumers(self, Consumer, channel):
|
||||
return [
|
||||
Consumer(**c) for c in self.consumers
|
||||
]
|
||||
|
||||
def listen_to(self, routing_key, callback):
|
||||
queue = Queue(
|
||||
name="",
|
||||
channel=self.connection.channel(),
|
||||
exchange=self.exchange,
|
||||
routing_key=routing_key,
|
||||
durable=False,
|
||||
auto_delete=True
|
||||
)
|
||||
|
||||
self.consumers.append(dict(queues=queue, callbacks=[callback]))
|
|
@ -0,0 +1,94 @@
|
|||
import logging
|
||||
from kombu import Connection, Exchange, Producer
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class EventsPublisher(object):
|
||||
"""Generic publisher class that specific publishers inherit from."""
|
||||
|
||||
def __init__(self, connection_url, routing_key=None):
|
||||
self.connection_url = connection_url
|
||||
self.connection = None
|
||||
self.producer = None
|
||||
self.default_routing_key = routing_key
|
||||
self.exchange = Exchange("events", type="topic")
|
||||
|
||||
def connect(self):
|
||||
self.connection = Connection(self.connection_url)
|
||||
self.exchange = self.exchange(self.connection.channel)
|
||||
|
||||
def disconnect(self):
|
||||
if self.connection:
|
||||
self.connection.release()
|
||||
self.connection = None
|
||||
self.producer = None
|
||||
|
||||
def log(self, message):
|
||||
logger.info("{0}".format(message))
|
||||
|
||||
def publish(self, message, routing_key=None):
|
||||
|
||||
if not self.connection:
|
||||
self.connect()
|
||||
|
||||
if not self.producer:
|
||||
self.producer = Producer(self.connection, self.exchange)
|
||||
|
||||
routing_key = routing_key or self.default_routing_key
|
||||
if not routing_key:
|
||||
raise Exception("Routing key not specified")
|
||||
|
||||
self.log("Publishing to exchange {0} with routing key {1}".format(
|
||||
self.exchange, routing_key
|
||||
))
|
||||
|
||||
self.producer.publish(message,
|
||||
exchange=self.exchange,
|
||||
routing_key=routing_key)
|
||||
|
||||
|
||||
class JobStatusPublisher(EventsPublisher):
|
||||
|
||||
def publish(self, job_id, branch, status):
|
||||
message = {
|
||||
"id": job_id,
|
||||
"event": "job",
|
||||
"branch": branch,
|
||||
"status": status
|
||||
}
|
||||
|
||||
super(JobStatusPublisher, self).publish(
|
||||
message,
|
||||
"events.{0}.job".format(branch)
|
||||
)
|
||||
|
||||
|
||||
class JobFailurePublisher(EventsPublisher):
|
||||
|
||||
def publish(self, job_id, branch):
|
||||
message = {
|
||||
"id": job_id,
|
||||
"event": "job_failure",
|
||||
"branch": branch
|
||||
}
|
||||
|
||||
super(JobFailurePublisher, self).publish(
|
||||
message,
|
||||
"events.job_failure"
|
||||
)
|
||||
|
||||
|
||||
class ResultsetPublisher(EventsPublisher):
|
||||
|
||||
def publish(self, resultset_id, branch, author):
|
||||
message = {
|
||||
"id": resultset_id,
|
||||
"event": "resultset",
|
||||
"branch": branch,
|
||||
"author": author
|
||||
}
|
||||
|
||||
super(ResultsetPublisher, self).publish(
|
||||
message,
|
||||
"events.{0}.resultset".format(branch))
|
|
@ -0,0 +1,97 @@
|
|||
#!/usr/bin/env python
|
||||
import sys
|
||||
import argparse
|
||||
from os.path import dirname
|
||||
import gevent
|
||||
from gevent import monkey
|
||||
monkey.patch_all()
|
||||
from socketio.server import SocketIOServer
|
||||
from socketio import socketio_manage
|
||||
import os
|
||||
from kombu import Connection, Consumer
|
||||
|
||||
sys.path.append(dirname(dirname(dirname(__file__))))
|
||||
os.environ['DJANGO_SETTINGS_MODULE'] = 'treeherder.settings'
|
||||
|
||||
from treeherder.events.consumer import EventsConsumer
|
||||
from treeherder.events.sockets import EventsNamespace
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
class Application(object):
|
||||
"""wsgi application with socketio enabled"""
|
||||
|
||||
def __init__(self):
|
||||
self.buffer = []
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
path = environ['PATH_INFO'].strip('/') or 'index.html'
|
||||
|
||||
if path.startswith("socket.io"):
|
||||
socketio_manage(environ, {'/events': EventsNamespace})
|
||||
else:
|
||||
return not_found(start_response)
|
||||
|
||||
|
||||
def not_found(start_response):
|
||||
start_response('404 Not Found', [])
|
||||
return ['<h1>Not Found</h1>']
|
||||
|
||||
|
||||
def broadcast_subscribers(body, msg):
|
||||
"""
|
||||
This is the main function where all the magic happens
|
||||
It broadcasts the events to the clients subscribed to
|
||||
them.
|
||||
"""
|
||||
pkt = dict(type="event", name=body['event'],
|
||||
args=body, endpoint='/events')
|
||||
|
||||
for session_id, socket in server.sockets.iteritems():
|
||||
# loop over all the open connections
|
||||
# and send a message when needed
|
||||
if "event" not in socket.session or "branch" not in socket.session:
|
||||
continue
|
||||
branch_condition = "*" in socket.session["branch"] \
|
||||
or body["branch"] in socket.session["branch"]
|
||||
event_condition = "*" in socket.session["event"] \
|
||||
or body["event"] in socket.session["event"]
|
||||
if branch_condition and event_condition:
|
||||
socket.send_packet(pkt)
|
||||
msg.ack()
|
||||
|
||||
|
||||
def start_consumer(broker_url):
|
||||
with Connection(broker_url) as conn:
|
||||
consumer = EventsConsumer(conn)
|
||||
consumer.listen_to("events.#", broadcast_subscribers)
|
||||
consumer.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--host",
|
||||
help="interface to bind the server to",
|
||||
default="127.0.0.1")
|
||||
parser.add_argument("--port",
|
||||
help="port to bind the server to",
|
||||
default="8005",
|
||||
type=int)
|
||||
parser.add_argument("--broker-url",
|
||||
help="url of the broker to use",
|
||||
required=True)
|
||||
parser.add_argument("--log-file",
|
||||
help="where to log the access log",
|
||||
default=None)
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
server = SocketIOServer((args.host, args.port), Application(),
|
||||
resource="socket.io", log_file=args.log_file)
|
||||
print "Listening on http://{0}:{1}".format(args.host, args.port)
|
||||
print "and on port 10843 (flash policy server)"
|
||||
gevent.spawn(start_consumer, args.broker_url)
|
||||
server.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
print "Socketio server stopped"
|
|
@ -0,0 +1,44 @@
|
|||
import logging
|
||||
|
||||
from socketio.namespace import BaseNamespace
|
||||
|
||||
|
||||
class EventsNamespace(BaseNamespace):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(EventsNamespace, self).__init__(*args, **kwargs)
|
||||
self.logger = logging.getLogger("treeherder.events.socketio")
|
||||
self.log("New connection")
|
||||
self.session['branch'] = set()
|
||||
self.session['event'] = set()
|
||||
|
||||
def log(self, message):
|
||||
self.logger.info("[{0}] {1}".format(self.socket.sessid, message))
|
||||
|
||||
def on_subscribe(self, subscription):
|
||||
"""
|
||||
this method is triggered by a new client subscription.
|
||||
it adds a prefix to the routing key to prevent message sniffing
|
||||
"""
|
||||
tokens = subscription.split(".")
|
||||
|
||||
if len(tokens) == 1:
|
||||
# branch subscription
|
||||
self.session['branch'].add(tokens[0])
|
||||
# event is implicitly set to 'all'
|
||||
self.session['event'].add("*")
|
||||
elif len(tokens) == 2:
|
||||
# event subscription
|
||||
self.session['branch'].add(tokens[0])
|
||||
self.session['event'].add(tokens[1])
|
||||
else:
|
||||
self.emit('error', 'malformed subscription')
|
||||
|
||||
def on_unsubscribe(self):
|
||||
self.session['branch'] = set()
|
||||
self.session['event'] = set()
|
||||
self.log("subscription reset")
|
||||
|
||||
def recv_disconnect(self):
|
||||
self.log("Disconnected")
|
||||
return True
|
|
@ -10,13 +10,15 @@ import simplejson as json
|
|||
import re
|
||||
|
||||
from celery import task
|
||||
from django.conf import settings
|
||||
|
||||
from treeherder.model.derived import JobsModel, RefDataManager
|
||||
from treeherder.log_parser.artifactbuildercollection import ArtifactBuilderCollection
|
||||
from treeherder.events.publisher import JobFailurePublisher, JobStatusPublisher
|
||||
|
||||
|
||||
@task(name='parse-log')
|
||||
def parse_log(project, job_id, check_errors=True):
|
||||
def parse_log(project, job_id, check_errors=False):
|
||||
"""
|
||||
Call ArtifactBuilderCollection on the given job.
|
||||
"""
|
||||
|
@ -28,6 +30,9 @@ def parse_log(project, job_id, check_errors=True):
|
|||
open_bugs_cache = {}
|
||||
closed_bugs_cache = {}
|
||||
|
||||
status_publisher = JobStatusPublisher(settings.BROKER_URL)
|
||||
failure_publisher = JobFailurePublisher(settings.BROKER_URL)
|
||||
|
||||
try:
|
||||
log_references = jm.get_log_references(job_id)
|
||||
|
||||
|
@ -74,6 +79,12 @@ def parse_log(project, job_id, check_errors=True):
|
|||
|
||||
# store the artifacts generated
|
||||
jm.store_job_artifact(artifact_list)
|
||||
status_publisher.publish(job_id, project, 'processed')
|
||||
if check_errors:
|
||||
failure_publisher.publish(job_id, project)
|
||||
|
||||
finally:
|
||||
rdm.disconnect()
|
||||
jm.disconnect()
|
||||
status_publisher.disconnect()
|
||||
failure_publisher.disconnect()
|
||||
|
|
|
@ -6,6 +6,8 @@ from django.db import connection
|
|||
from django.core.management import call_command
|
||||
from django.utils.six.moves import input
|
||||
|
||||
from treeherder import path
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Init master database and call syncdb"
|
||||
|
@ -23,7 +25,7 @@ class Command(BaseCommand):
|
|||
make_option('--template-path',
|
||||
action='store',
|
||||
dest='template_path',
|
||||
default='treeherder/model/sql/template_schema/',
|
||||
default=path('model', 'sql', 'template_schema'),
|
||||
help='Directory containing the sql templates',
|
||||
),
|
||||
make_option('--skip-fixtures',
|
||||
|
|
|
@ -184,6 +184,10 @@ CELERY_QUEUES = (
|
|||
Queue('log_parser', Exchange('default'), routing_key='parse_log.success'),
|
||||
)
|
||||
|
||||
CELERY_ACCEPT_CONTENT = ['json']
|
||||
CELERY_TASK_SERIALIZER = 'json'
|
||||
CELERY_RESULT_SERIALIZER = 'json'
|
||||
|
||||
# default value when no task routing info is specified
|
||||
CELERY_DEFAULT_QUEUE = 'default'
|
||||
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
|
||||
|
|
|
@ -11,5 +11,5 @@ urlpatterns = patterns('',
|
|||
url(r'^browserid/', include('django_browserid.urls')),
|
||||
url(r'^admin/', include(admin.site.urls)),
|
||||
# by default redirect all request on / to /admin/
|
||||
url(r'^$', RedirectView.as_view(url='/api/'))
|
||||
url(r'^$', RedirectView.as_view(url='/ui/'))
|
||||
)
|
||||
|
|
Загрузка…
Ссылка в новой задаче