Mirror of https://github.com/mozilla/labs-vcap.git
Merge "Merge branch 'router_v2'"
Commit fc47a89739
AUTHORS (+1)
@@ -11,6 +11,7 @@
- Adam C. Greenfield { email: adam.greenfield@gmail.com }
- Ananthan Srinivasan { email: asrinivasan@vmware.com, name: AB}
- Andrew Liu { email: aliu@vmware.com}
- Anfernee Yongkun Gui { email: agui@vmware.com }
- Christian Dupuis { email: cdupuis@vmware.com}
- Derek Collison { email: dcollison@vmware.com }
- Ezra Zygmuntowicz { email: ez@vmware.com }
@@ -1,4 +1,30 @@
include_attribute "deployment"
default[:nginx][:worker_connections] = 2048
default[:nginx][:dir] = File.join("", "etc", "nginx")

default[:nginx][:version] = "0.8.54"
default[:nginx][:source] = "http://nginx.org/download/nginx-#{nginx[:version]}.tar.gz"
default[:nginx][:patch] = "http://nginx.org/download/patch.2012.memory.txt"
default[:nginx][:pcre_source] = "http://sourceforge.net/projects/pcre/files/pcre/8.12/pcre-8.12.tar.gz"
default[:nginx][:module_upload_source] = "http://www.grid.net.ru/nginx/download/nginx_upload_module-2.2.0.tar.gz"
default[:nginx][:module_headers_more_source] = "https://github.com/agentzh/headers-more-nginx-module/tarball/v0.15rc3"
default[:nginx][:module_devel_kit_source] = "https://github.com/simpl/ngx_devel_kit/tarball/v0.2.17rc2"
default[:nginx][:module_lua_source] = "https://github.com/chaoslawful/lua-nginx-module/tarball/v0.3.1rc24"
default[:nginx][:path] = File.join(node[:deployment][:home], "deploy", "nginx", "nginx-#{nginx[:version]}")
default[:nginx][:vcap_log] = File.join(node[:deployment][:home], "sys", "log", "vcap.access.log")

default[:lua][:version] = "5.1.4"
default[:lua][:simple_version] = lua[:version].match(/\d+\.\d+/).to_s # something like 5.1
default[:lua][:source] = "http://www.lua.org/ftp/lua-#{lua[:version]}.tar.gz"
default[:lua][:path] = File.join(node[:deployment][:home], "deploy", "lua", "lua-#{lua[:version]}")
default[:lua][:cjson_source] = "http://www.kyne.com.au/~mark/software/lua-cjson-1.0.3.tar.gz"
default[:lua][:module_path] = File.join(lua[:path], 'lib', 'lua', lua[:simple_version])
default[:lua][:plugin_source_path] = File.join(node["cloudfoundry"]["path"], "router", "ext", "nginx")

default[:nginx][:worker_connections] = 2048
default[:nginx][:uls_ip] = "localhost"
default[:nginx][:uls_port] = 8081
default[:nginx][:log_home] = File.join(node[:deployment][:home], "log")
default[:nginx][:status_user] = "admin"
default[:nginx][:status_passwd] = "password"

default[:router][:session_key] = "14fbc303b76bacd1e0a3ab641c11d11400341c5d"
default[:router][:trace_key] = "222"
@@ -6,21 +6,191 @@
#
#

nginx_version = node[:nginx][:version]
nginx_source = node[:nginx][:source]
nginx_path = node[:nginx][:path]
lua_version = node[:lua][:version]
lua_source = node[:lua][:source]
lua_path = node[:lua][:path]
lua_module_path = node[:lua][:module_path]

case node['platform']
when "ubuntu"
  package "nginx"

  %w[ build-essential].each do |pkg|
    package pkg
  end

  # Lua related packages
  remote_file File.join("", "tmp", "lua-#{lua_version}.tar.gz") do
    owner node[:deployment][:user]
    source lua_source
    not_if { ::File.exists?(File.join("", "tmp", "lua-#{lua_version}.tar.gz")) }
  end

  remote_file File.join("", "tmp", "lua-cjson-1.0.3.tar.gz") do
    owner node[:deployment][:user]
    source node[:lua][:cjson_source]
    not_if { ::File.exists?(File.join("", "tmp", "lua-cjson-1.0.3.tar.gz")) }
  end

  # Nginx related packages
  remote_file File.join("", "tmp", "nginx-#{nginx_version}.tar.gz") do
    owner node[:deployment][:user]
    source nginx_source
    not_if { ::File.exists?(File.join("", "tmp", "nginx-#{nginx_version}.tar.gz")) }
  end

  remote_file File.join("", "tmp", "zero_byte_in_cstr_20120315.patch") do
    owner node[:deployment][:user]
    source node[:nginx][:patch]
    not_if { ::File.exists?(File.join("", "tmp", "zero_byte_in_cstr_20120315.patch")) }
  end

  remote_file File.join("", "tmp", "pcre-8.12.tar.gz") do
    owner node[:deployment][:user]
    source node[:nginx][:pcre_source]
    not_if { ::File.exists?(File.join("", "tmp", "pcre-8.12.tar.gz")) }
  end

  remote_file File.join("", "tmp", "nginx_upload_module-2.2.0.tar.gz") do
    owner node[:deployment][:user]
    source node[:nginx][:module_upload_source]
    not_if { ::File.exists?(File.join("", "tmp", "nginx_upload_module-2.2.0.tar.gz")) }
  end

  remote_file File.join("", "tmp", "headers-more-v0.15rc3.tar.gz") do
    owner node[:deployment][:user]
    source node[:nginx][:module_headers_more_source]
    not_if { ::File.exists?(File.join("", "tmp", "headers-more-v0.15rc3.tar.gz")) }
  end

  remote_file File.join("", "tmp", "devel-kit-v0.2.17rc2.tar.gz") do
    owner node[:deployment][:user]
    source node[:nginx][:module_devel_kit_source]
    not_if { ::File.exists?(File.join("", "tmp", "devel-kit-v0.2.17rc2.tar.gz")) }
  end

  remote_file File.join("", "tmp", "nginx-lua.v0.3.1rc24.tar.gz") do
    owner node[:deployment][:user]
    source node[:nginx][:module_lua_source]
    not_if { ::File.exists?(File.join("", "tmp", "nginx-lua.v0.3.1rc24.tar.gz")) }
  end

  directory nginx_path do
    owner node[:deployment][:user]
    group node[:deployment][:group]
    mode "0755"
    recursive true
    action :create
  end

  directory lua_path do
    owner node[:deployment][:user]
    group node[:deployment][:group]
    mode "0755"
    recursive true
    action :create
  end

  bash "Install lua" do
    cwd File.join("", "tmp")
    user node[:deployment][:user]
    code <<-EOH
      tar xzf lua-#{lua_version}.tar.gz
      cd lua-#{lua_version}
      make linux install INSTALL_TOP=#{lua_path}
    EOH
    not_if do
      ::File.exists?(File.join(lua_path, "bin", "lua"))
    end
  end

  bash "Install lua json" do
    cwd File.join("", "tmp")
    user node[:deployment][:user]
    code <<-EOH
      tar xzf lua-cjson-1.0.3.tar.gz
      cd lua-cjson-1.0.3
      sed 's!^PREFIX ?=.*!PREFIX ?='#{lua_path}'!' Makefile > tmp
      mv tmp Makefile
      make
      make install
    EOH
    not_if do
      ::File.exists?(File.join(lua_module_path, "cjson.so"))
    end
  end

  bash "Install nginx" do
    cwd File.join("", "tmp")
    user node[:deployment][:user]
    code <<-EOH
      tar xzf nginx-#{nginx_version}.tar.gz
      tar xzf pcre-8.12.tar.gz
      tar xzf nginx_upload_module-2.2.0.tar.gz
      tar xzf headers-more-v0.15rc3.tar.gz
      tar xzf devel-kit-v0.2.17rc2.tar.gz
      tar xzf nginx-lua.v0.3.1rc24.tar.gz
      cd nginx-#{nginx_version}

      patch -p0 < ../zero_byte_in_cstr_20120315.patch

      LUA_LIB=#{lua_path}/lib LUA_INC=#{lua_path}/include ./configure \
        --prefix=#{nginx_path} \
        --with-pcre=../pcre-8.12 \
        --add-module=../nginx_upload_module-2.2.0 \
        --add-module=../agentzh-headers-more-nginx-module-5fac223 \
        --add-module=../simpl-ngx_devel_kit-bc97eea \
        --add-module=../chaoslawful-lua-nginx-module-4d92cb1
      make
      make install

    EOH
  end

  template "nginx.conf" do
    path File.join(node[:nginx][:dir], "nginx.conf")
    path File.join(nginx_path, "conf", "nginx.conf")
    source "ubuntu-nginx.conf.erb"
    owner "root"
    group "root"
    owner node[:deployment][:user]
    mode 0644
  end

  template "uls.lua" do
    path File.join(lua_module_path, "uls.lua")
    source File.join(node[:lua][:plugin_source_path], "uls.lua")
    local true
    owner node[:deployment][:user]
    mode 0644
  end

  template "tablesave.lua" do
    path File.join(lua_module_path, "tablesave.lua")
    source File.join(node[:lua][:plugin_source_path], "tablesave.lua")
    local true
    owner node[:deployment][:user]
    mode 0644
  end

  template "nginx" do
    path File.join("", "etc", "init.d", "nginx")
    source "nginx.erb"
    owner node[:deployment][:user]
    mode 0755
  end

  bash "Stop running nginx" do
    code <<-EOH
      pid=`ps -ef | grep nginx | grep -v grep | awk '{print $2}'`
      [ ! -z "$pid" ] && sudo kill $pid || true
    EOH
  end

  service "nginx" do
    supports :status => true, :restart => true, :reload => true
    action [ :enable, :start ]
    action [ :enable, :restart ]
  end

else
  Chef::Log.error("Installation of nginx packages not supported on this platform.")
end
@@ -0,0 +1,46 @@
#! /bin/sh

### BEGIN INIT INFO
# Provides: nginx
# Required-Start: $local_fs $remote_fs $network $syslog
# Required-Stop: $local_fs $remote_fs $network $syslog
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: init script for nginx
# Description: init script for nginx
### END INIT INFO

PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
PROG=<%= node[:nginx][:path] %>/sbin/nginx
NAME=nginx

test -x $PROG || exit 0

set -e

. /lib/lsb/init-functions

case "$1" in
  start)
    log_action_begin_msg "Starting: $NAME"
    start-stop-daemon --start --quiet --pidfile /var/run/$NAME.pid --exec $PROG
    log_action_end_msg $?
    ;;
  stop)
    log_action_begin_msg "Stopping: $NAME"
    start-stop-daemon --stop --quiet --pidfile /var/run/$NAME.pid --exec $PROG || true
    log_action_end_msg $?
    ;;
  restart)
    $0 stop; sleep 1; $0 start
    ;;
  status)
    status_of_proc -p /var/run/$NAME.pid "$PROG" $NAME && exit 0 || exit $?
    ;;
  *)
    echo "Usage: $NAME {start|stop|restart|status}" >&2
    exit 1
    ;;
esac

exit 0
@@ -1,7 +1,8 @@
user www-data;

user root root;
worker_processes 1;

error_log /var/log/nginx/error.log;
error_log <%= node[:nginx][:log_home] %>/nginx_error.log debug;
pid /var/run/nginx.pid;

events {
@@ -11,55 +12,140 @@ events {
}

http {
  include /etc/nginx/mime.types;
  access_log /var/log/nginx/access.log;
  include mime.types;
  default_type text/html;
  server_tokens off;

  log_format main '$host - [$time_local] '
                  '"$request" $status $bytes_sent '
                  '"$http_referer" "$http_user_agent" '
                  '$remote_addr response_time:$upstream_response_time';
  log_format main '$host - [$time_local] '
                  '"$request" $status $bytes_sent '
                  '"$http_referer" "$http_user_agent" '
                  '$remote_addr response_time:$upstream_response_time';

  default_type text/html;
  access_log <%= node[:nginx][:log_home] %>/nginx_main_access.log main;

  sendfile on;
  tcp_nopush on;
  tcp_nodelay on;
  sendfile on;
  tcp_nopush on;
  tcp_nodelay on;

  keepalive_timeout 75 20;
  keepalive_timeout 75 20;

  gzip on;
  gzip_min_length 1250;
  gzip_buffers 16 8k;
  gzip_comp_level 2;
  gzip_proxied any;
  gzip_types text/plain text/css application/javascript application/x-javascript text/xml application/xml application/xml+rss text/javascript;
  gzip_vary on;
  gzip_disable "MSIE [1-6]\.(?!.*SV1)";
  gzip on;
  gzip_min_length 1250;
  gzip_buffers 16 8k;
  gzip_comp_level 2;
  gzip_proxied any;
  gzip_types text/plain text/css application/javascript application/x-javascript text/xml application/xml application/xml+rss text/javascript;
  gzip_vary on;
  gzip_disable "MSIE [1-6]\.(?!.*SV1)";

  client_max_body_size 256M;
  client_max_body_size 256M;

  upstream vcap_router {
    server unix:/tmp/router.sock;
  upstream router_status {
    server <%= node[:nginx][:uls_ip] %>:<%= node[:nginx][:uls_port] %>;
  }

  server {
    listen 80;
    server_name _;
    server_name_in_redirect off;

    #TODO: how to make this internal location totally transparent to outside
    location = /vcapuls {
      internal;
      # We should use rewrite_by_lua to scrub subrequest headers,
      # as uls doesn't care about those headers at all.
      # Given there are some exceptions to clear some headers,
      # we just leave them as is.

      proxy_pass http://unix:/tmp/router.sock:/;
    }

  server {
    listen *:80;
    server_name _;
    location / {
      access_log <%= node[:nginx][:log_home] %>/nginx_access.log main;
      proxy_buffering off;
      proxy_set_header Host $host;
      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
      proxy_redirect off;
      proxy_connect_timeout 10;
      proxy_send_timeout 30;
      proxy_read_timeout 30;

    access_log <%= node[:nginx][:vcap_log] %> main;
    server_name_in_redirect off;
    set $health_monitor '';
    if ($http_user_agent = "HTTP-Monitor/1.1") {
      set $health_monitor T;
    }
    if ($http_host = "") {
      set $health_monitor "${health_monitor}T";
    }
    if ($health_monitor = TT) {
      # Trigger a subrequest to sync the latest few stats of the worker to uls;
      # if we have multiple workers, there will still be a few stats not synced for
      # the workers which don't get this monitor request.
      access_by_lua '
        -- add package.path and package.cpath
        package.path = package.path..";<%= node[:lua][:module_path] %>/?.lua"
        package.cpath = package.cpath..";<%= node[:lua][:module_path] %>/?.so"
        local uls = require ("uls")

    location / {
      proxy_buffering off;
      proxy_set_header Host $host;
      proxy_set_header X-Real_IP $remote_addr;
      proxy_set_header X-Forwarded_For $proxy_add_x_forwarded_for;
      proxy_set_header X-Forwarded_Proto http;
      proxy_redirect off;
      proxy_connect_timeout 10;
      proxy_send_timeout 300;
      proxy_read_timeout 300;
      proxy_pass http://vcap_router;
    }
        ngx.log(ngx.DEBUG, "monitor trigger stats syncup")

        local req = uls.generate_stats_request()

        -- generate one subrequest to uls to update stats
        ngx.location.capture(
          "/vcapuls", { body = req }
        )
      ';

      more_set_input_headers "Authorization: Basic <%= Base64.encode64("#{node[:nginx][:status_user]}:#{node[:nginx][:status_passwd]}").strip %>";
      rewrite ^.*$ /healthz break;
      proxy_pass http://router_status;
    }

    # We intend to have one "if" block to avoid the above monitor location
    # twisting with the upstream locator server handling below.
    # (an "if" block effectively creates a nested location and will inherit
    # all the rewrite/access phase handlers of the outer location)
    if ($health_monitor != TT) {
      # The following variables are used by lua module code.
      # DO NOT remove or rename any of them!
      set $backend_addr ''; # Backend server address returned from uls for this request
      set $uls_req_tags ''; # Request tags returned from uls for this request to catalog statistics
      set $router_ip '';
      set $timestamp 0;
      set $trace '';
      set $sticky '';

      access_by_lua '
        -- add package.path and package.cpath
        package.path = package.path..";<%= node[:lua][:module_path] %>/?.lua"
        package.cpath = package.cpath..";<%= node[:lua][:module_path] %>/?.so"
        local uls = require ("uls")

        uls.pre_process_subrequest(ngx, "<%= node[:router][:trace_key] %>")

        local req = uls.generate_uls_request(ngx)

        -- generate one subrequest to uls for querying
        local res = ngx.location.capture(
          "/vcapuls", { body = req }
        )

        uls.post_process_subrequest(ngx, res)
      ';

      proxy_pass http://$backend_addr;

      # Handling response from backend servers
      header_filter_by_lua '
        -- add package.path and package.cpath
        package.path = package.path..";<%= node[:lua][:module_path] %>/?.lua"
        package.cpath = package.cpath..";<%= node[:lua][:module_path] %>/?.so"
        local uls = require ("uls")

        uls.post_process_response(ngx)
      ';
    }
  }
}
}
@@ -82,4 +82,3 @@ end
class Chef::Recipe
  include RubyInstall
end

@@ -7,6 +7,7 @@ gem 'eventmachine'

gem "http_parser.rb", :require => "http/parser"
gem "yajl-ruby", :require => ["yajl", "yajl/json_gem"]
gem "sinatra"

gem 'vcap_common', '~> 1.0.9'
gem 'vcap_logging', :require => ['vcap/logging']
@@ -8,14 +8,16 @@ GEM
    diff-lcs (1.1.2)
    eventmachine (0.12.11.cloudfoundry.3)
    http_parser.rb (0.5.1)
    json_pure (1.6.5)
    nats (0.4.22.beta.8)
    json_pure (1.6.6)
    nats (0.4.22)
      daemons (>= 1.1.4)
      eventmachine (>= 0.12.10)
      json_pure (>= 1.6.1)
      thin (>= 1.3.1)
    posix-spawn (0.3.6)
    rack (1.4.1)
    rack-protection (1.2.0)
      rack
    rake (0.8.7)
    rcov (0.9.9)
    rspec (2.5.0)
@@ -26,11 +28,16 @@ GEM
    rspec-expectations (2.5.0)
      diff-lcs (~> 1.1.2)
    rspec-mocks (2.5.0)
    sinatra (1.3.2)
      rack (~> 1.3, >= 1.3.6)
      rack-protection (~> 1.2)
      tilt (~> 1.3, >= 1.3.3)
    thin (1.3.1)
      daemons (>= 1.0.9)
      eventmachine (>= 0.12.6)
      rack (>= 1.0.0)
    vcap_common (1.0.9)
    tilt (1.3.3)
    vcap_common (1.0.10)
      eventmachine (~> 0.12.11.cloudfoundry.3)
      nats (~> 0.4.22.beta.8)
      posix-spawn (~> 0.3.6)
@@ -52,6 +59,7 @@ DEPENDENCIES
  rake
  rcov
  rspec
  sinatra
  vcap_common (~> 1.0.9)
  vcap_logging
  yajl-ruby
@@ -8,9 +8,12 @@ end

reports_dir = File.expand_path(File.join(File.dirname(__FILE__), "spec_reports"))

ENV['CI_REPORTS'] = reports_dir

RSpec::Core::RakeTask.new do |t|
  t.pattern = "spec/**/*_spec.rb"
  t.rspec_opts = ["--format", "documentation", "--colour"]
task "test" do |t|
  sh("cd spec && rake test")
end

task "spec" do |t|
  sh("cd spec && rake spec")
end

ENV['CI_REPORTS'] = reports_dir
@@ -0,0 +1,180 @@
--[[
  Save Table to File/Stringtable
  Load Table from File/Stringtable
  v 0.94

  Lua 5.1 compatible

  Userdata and indices of these are not saved
  Functions are saved via string.dump, so make sure it has no upvalues
  References are saved

  table.save( table [, filename] )

  Saves a table so it can be called via the table.load function again
  table must be an object of type 'table'
  filename is optional, and may be a string representing a filename or true/1

  table.save( table )
    on success: returns a string representing the table (stringtable)
    (uses a string as buffer, ideal for smaller tables)
  table.save( table, true or 1 )
    on success: returns a string representing the table (stringtable)
    (uses io.tmpfile() as buffer, ideal for bigger tables)
  table.save( table, "filename" )
    on success: returns 1
    (saves the table to file "filename")
  on failure: returns as second argument an error msg
  ----------------------------------------------------
  table.load( filename or stringtable )

  Loads a table that has been saved via the table.save function

  on success: returns a previously saved table
  on failure: returns as second argument an error msg
  ----------------------------------------------------

  chillcode, http://lua-users.org/wiki/SaveTableToFile
  Licensed under the same terms as Lua itself.
]]--
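-- A short usage sketch (illustrative only, not part of this commit; it just
-- exercises the table.save/table.load API documented above):
--   local t = { name = "router", ports = { 80, 8081 } }
--   local ok, err = table.save( t, "/tmp/state.lua" )  -- returns 1 on success
--   local t2 = table.load( "/tmp/state.lua" )          -- returns the saved table
--   print( t2.name, t2.ports[2] )                      --> router  8081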
do
  -- declare local variables
  --// exportstring( string )
  --// returns a "Lua" portable version of the string
  local function exportstring( s )
    s = string.format( "%q",s )
    -- to replace
    s = string.gsub( s,"\\\n","\\n" )
    s = string.gsub( s,"\r","\\r" )
    s = string.gsub( s,string.char(26),"\"..string.char(26)..\"" )
    return s
  end
  --// The Save Function
  function table.save( tbl,filename )
    local charS,charE = " ","\n"
    local file,err
    -- create a pseudo file that writes to a string and return the string
    if not filename then
      file = { write = function( self,newstr ) self.str = self.str..newstr end, str = "" }
      charS,charE = "",""
    -- write table to tmpfile
    elseif filename == true or filename == 1 then
      charS,charE,file = "","",io.tmpfile()
    -- write table to file
    -- use io.open here rather than io.output, since in windows when clicking on a file opened with io.output will create an error
    else
      file,err = io.open( filename, "w" )
      if err then return _,err end
    end
    -- initiate variables for save procedure
    local tables,lookup = { tbl },{ [tbl] = 1 }
    file:write( "return {"..charE )
    for idx,t in ipairs( tables ) do
      if filename and filename ~= true and filename ~= 1 then
        file:write( "-- Table: {"..idx.."}"..charE )
      end
      file:write( "{"..charE )
      local thandled = {}
      for i,v in ipairs( t ) do
        thandled[i] = true
        -- escape functions and userdata
        if type( v ) ~= "userdata" then
          -- only handle value
          if type( v ) == "table" then
            if not lookup[v] then
              table.insert( tables, v )
              lookup[v] = #tables
            end
            file:write( charS.."{"..lookup[v].."},"..charE )
          elseif type( v ) == "function" then
            file:write( charS.."loadstring("..exportstring(string.dump( v )).."),"..charE )
          else
            local value = ( type( v ) == "string" and exportstring( v ) ) or tostring( v )
            file:write( charS..value..","..charE )
          end
        end
      end
      for i,v in pairs( t ) do
        -- escape functions and userdata
        if (not thandled[i]) and type( v ) ~= "userdata" then
          -- handle index
          if type( i ) == "table" then
            if not lookup[i] then
              table.insert( tables,i )
              lookup[i] = #tables
            end
            file:write( charS.."[{"..lookup[i].."}]=" )
          else
            local index = ( type( i ) == "string" and "["..exportstring( i ).."]" ) or string.format( "[%d]",i )
            file:write( charS..index.."=" )
          end
          -- handle value
          if type( v ) == "table" then
            if not lookup[v] then
              table.insert( tables,v )
              lookup[v] = #tables
            end
            file:write( "{"..lookup[v].."},"..charE )
          elseif type( v ) == "function" then
            file:write( "loadstring("..exportstring(string.dump( v )).."),"..charE )
          else
            local value = ( type( v ) == "string" and exportstring( v ) ) or tostring( v )
            file:write( value..","..charE )
          end
        end
      end
      file:write( "},"..charE )
    end
    file:write( "}" )
    -- Return Values
    -- return stringtable from string
    if not filename then
      -- set marker for stringtable
      return file.str.."--|"
    -- return stringtable from file
    elseif filename == true or filename == 1 then
      file:seek ( "set" )
      -- no need to close file, it gets closed and removed automatically
      -- set marker for stringtable
      return file:read( "*a" ).."--|"
    -- close file and return 1
    else
      file:close()
      return 1
    end
  end

  --// The Load Function
  function table.load( sfile )
    -- catch marker for stringtable
    if string.sub( sfile,-3,-1 ) == "--|" then
      tables,err = loadstring( sfile )
    else
      tables,err = loadfile( sfile )
    end
    if err then return _,err
    end
    tables = tables()
    for idx = 1,#tables do
      local tolinkv,tolinki = {},{}
      for i,v in pairs( tables[idx] ) do
        if type( v ) == "table" and tables[v[1]] then
          table.insert( tolinkv,{ i,tables[v[1]] } )
        end
        if type( i ) == "table" and tables[i[1]] then
          table.insert( tolinki,{ i,tables[i[1]] } )
        end
      end
      -- link values, first due to possible changes of indices
      for _,v in ipairs( tolinkv ) do
        tables[idx][v[1]] = v[2]
      end
      -- link indices
      for _,v in ipairs( tolinki ) do
        tables[idx][v[2]],tables[idx][v[1]] = tables[idx][v[1]],nil
      end
    end
    return tables[1]
  end
-- close do
end
@@ -0,0 +1,253 @@
--------------------------------------------------------------------------------
-- Title:        uls.lua
-- Description:  Helper for nginx talking to uls (Upstream Locator Server)
-- Legal:        Copyright (c) 2011 VMware, Inc.
--------------------------------------------------------------------------------

-- import dependencies
local cjson = require("cjson")
require("tablesave")

module("uls", package.seeall)
_VERSION = '1.0'

VCAP_SESSION_ID = "__VCAP_ID__"
VCAP_COOKIE = "__VCAP_ID__=([^;]+)"
SET_COOKIE_HEADER = "Set-Cookie"
STICKY_SESSIONS = "JSESSIONID"
COOKIE_HEADER = "Cookie"
HOST_HEADER = "Host"
VCAP_BACKEND_HEADER = "X-Vcap-Backend"
VCAP_ROUTER_HEADER = "X-Vcap-Router"
VCAP_TRACE_HEADER = "X-Vcap-Trace"

-- From nginx to uls
ULS_HOST_QUERY = "host"
ULS_STATS_UPDATE = "stats"
ULS_STATS_LATENCY = "response_latency"
ULS_STATS_SAMPLES = "response_samples"
ULS_STATS_CODES = "response_codes"
ULS_STICKY_SESSION = "sticky_session"
-- For both directions
-- When ULS_BACKEND_ADDR is sent from nginx to uls, it means the sticky address
ULS_BACKEND_ADDR = "backend_addr"
ULS_REQEST_TAGS = "request_tags"
ULS_ROUTER_IP = "router_ip"

--[[
Message between nginx and uls (as http body)
nginx -> uls
{
  "host": api.vcap.me,
  "backend_addr": 10.117.9.178:9022,
  "stats": [
    {
      "request_tags": xxx,
      "response_latency": xxx,
      "response_samples": xxx,
      "response_codes": {
        {"responses_xxx":xxx},
        {"responses_2xx":xxx}
      }
    },
    {
      "request_tags": xxx,
      "response_latency": xxx,
      "response_samples": xxx,
      "response_codes": {
        {"responses_xxx":xxx},
        {"responses_2xx":xxx}
        {"responses_5xx":xxx},
      }
    }
  ]
}

nginx <- uls
{
  "backend_addr": xxx,
  "request_tags": xxx,
  "router_ip": xxx
}
--]]
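-- A concrete example of the exchange above (values are illustrative only, not
-- taken from a real deployment):
--   nginx -> uls:  {"host":"foo.vcap.me","sticky_session":"<encrypted __VCAP_ID__ value>"}
--   nginx <- uls:  {"backend_addr":"10.0.0.5:61001","request_tags":"...","router_ip":"10.0.0.1"}
-- generate_uls_request() below builds the first message with cjson.encode, and
-- post_process_subrequest() decodes the reply into the ngx.var.* variables.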

-- Per nginx worker global variables
-- We don't need any lock as nginx callbacks run in a single thread
stats_not_synced = {}
request_num = 0

--------------------------------------------------------------------------------
-- Utilities
--------------------------------------------------------------------------------

-- Retrieve ip:port if the input cookies have vcap cookie
function retrieve_vcap_sticky_session(cookies)
  if not cookies then return nil end
  if type(cookies) ~= "table" then cookies = {cookies} end

  for _, val in ipairs(cookies) do
    local i, j = string.find(val, VCAP_COOKIE)
    if i then
      assert(i + string.len(VCAP_SESSION_ID) + 1 < j)
      local sticky = string.sub(val, i + string.len(VCAP_SESSION_ID) + 1, j)
      return sticky
    end
  end
  return nil
end

-- Save per request stats into per worker store
function vcap_store_stats(req_tags, response_code, latency)

  request_num = request_num + 1
  local stats = stats_not_synced[req_tags]
  if not stats then
    stats = {[ULS_STATS_CODES] = {},
             [ULS_STATS_LATENCY] = 0,
             [ULS_STATS_SAMPLES] = 0}

    stats_not_synced[req_tags] = stats
  end

  local response_code_metric = "responses_xxx"
  if response_code >= 200 and response_code < 300 then
    response_code_metric = "responses_2xx"
  elseif response_code >= 300 and response_code < 400 then
    response_code_metric = "responses_3xx"
  elseif response_code >= 400 and response_code < 500 then
    response_code_metric = "responses_4xx"
  elseif response_code >= 500 and response_code < 600 then
    response_code_metric = "responses_5xx"
  end

  if not stats[ULS_STATS_CODES][response_code_metric] then
    stats[ULS_STATS_CODES][response_code_metric] = 1
  else
    stats[ULS_STATS_CODES][response_code_metric] =
      stats[ULS_STATS_CODES][response_code_metric] + 1
  end

  local t = stats[ULS_STATS_LATENCY] * stats[ULS_STATS_SAMPLES] + latency
  stats[ULS_STATS_SAMPLES] = stats[ULS_STATS_SAMPLES] + 1
  stats[ULS_STATS_LATENCY] = t / stats[ULS_STATS_SAMPLES]

end

-- Assemble saved stats to return to the caller, then cleanup
function serialize_request_statistics()
  if request_num == 0 then return nil end

  local stats = {}
  for k, v in pairs(stats_not_synced) do
    table.insert(stats, {[ULS_REQEST_TAGS] = k,
                         [ULS_STATS_LATENCY] = v[ULS_STATS_LATENCY],
                         [ULS_STATS_SAMPLES] = v[ULS_STATS_SAMPLES],
                         [ULS_STATS_CODES] = v[ULS_STATS_CODES]})
  end

  -- clean stats
  request_num = 0
  stats_not_synced = {}
  return stats
end

function vcap_handle_cookies(ngx)
  local cookies = ngx.header.set_cookie
  if not cookies then return end

  if type(cookies) ~= "table" then cookies = {cookies} end
  local sticky = false
  for _, val in ipairs(cookies) do
    local i, j = string.find(val:upper(), STICKY_SESSIONS)
    if i then
      sticky = true
      break
    end
  end
  if not sticky then return end

  local vcap_cookie = VCAP_SESSION_ID.."="..ngx.var.sticky

  ngx.log(ngx.DEBUG, "generate cookie:"..vcap_cookie.." for resp from:"..
          ngx.var.backend_addr)
  table.insert(cookies, vcap_cookie)
  -- ngx.header.set_cookie incorrectly emits the header as "set-cookie",
  -- so work around it by setting "Set-Cookie" directly
  -- ngx.header.set_cookie = cookies
  ngx.header["Set-Cookie"] = cookies
end

function vcap_add_trace_header(backend_addr, router_ip)
  ngx.header[VCAP_BACKEND_HEADER] = backend_addr
  ngx.header[VCAP_ROUTER_HEADER] = router_ip
end

function generate_stats_request()
  local uls_req_spec = {}
  local req_stats = uls.serialize_request_statistics()
  if req_stats then
    uls_req_spec[ULS_STATS_UPDATE] = req_stats
  end
  return cjson.encode(uls_req_spec)
end

function pre_process_subrequest(ngx, trace_key)
  ngx.var.timestamp = ngx.time()

  if string.len(ngx.var.http_host) == 0 then
    ngx.exit(ngx.HTTP_BAD_REQUEST)
  end

  if ngx.req.get_headers()[VCAP_TRACE_HEADER] == trace_key then
    ngx.var.trace = "Y"
  end
end

function generate_uls_request(ngx)
  local uls_req_spec = {}

  -- add host in request
  uls_req_spec[uls.ULS_HOST_QUERY] = ngx.var.http_host

  -- add sticky session in request
  local uls_sticky_session = retrieve_vcap_sticky_session(
      ngx.req.get_headers()[COOKIE_HEADER])
  if uls_sticky_session then
    uls_req_spec[ULS_STICKY_SESSION] = uls_sticky_session
    ngx.log(ngx.DEBUG, "req sticks to backend session:"..uls_sticky_session)
  end

  -- add status update in request
  local req_stats = uls.serialize_request_statistics()
  if req_stats then
    uls_req_spec[ULS_STATS_UPDATE] = req_stats
  end

  return cjson.encode(uls_req_spec)
end

function post_process_subrequest(ngx, res)
  if res.status ~= 200 then
    ngx.exit(ngx.HTTP_NOT_FOUND)
  end

  local msg = cjson.decode(res.body)
  ngx.var.backend_addr = msg[ULS_BACKEND_ADDR]
  ngx.var.uls_req_tags = msg[ULS_REQEST_TAGS]
  ngx.var.router_ip = msg[ULS_ROUTER_IP]
  ngx.var.sticky = msg[ULS_STICKY_SESSION]

  ngx.log(ngx.DEBUG, "route "..ngx.var.http_host.." to "..ngx.var.backend_addr)
end

function post_process_response(ngx)
  local latency = ( ngx.time() - ngx.var.timestamp ) * 1000
  vcap_store_stats(ngx.var.uls_req_tags, ngx.status, latency)

  if ngx.var.trace == "Y" then
    vcap_add_trace_header(ngx.var.backend_addr, ngx.var.router_ip)
  end

  vcap_handle_cookies(ngx)
end
@@ -20,8 +20,7 @@ $:.unshift(File.dirname(__FILE__))

require 'router/const'
require 'router/router'
require 'router/app_connection'
require 'router/client_connection'
require 'router/router_uls_server'
require 'router/utils'

config_path = ENV["CLOUD_FOUNDRY_CONFIG_PATH"] || File.join(File.dirname(__FILE__), '../config')
@@ -112,8 +111,8 @@ EM.run do

  begin
    # TCP/IP Socket
    Router.server = EM.start_server(inet, port, ClientConnection, false) if inet && port
    Router.local_server = EM.start_server(fn, nil, ClientConnection, true) if fn
    Router.server = Thin::Server.start(inet, port, RouterULSServer) if inet && port
    Router.local_server = Thin::Server.start(fn, RouterULSServer) if fn
  rescue => e
    Router.log.fatal "Problem starting server, #{e}"
    exit
@@ -149,6 +148,7 @@ EM.run do

  # Setup some of our varzs..
  VCAP::Component.varz[:requests] = 0
  VCAP::Component.varz[:bad_requests] = 0
  VCAP::Component.varz[:latency] = VCAP::RollingMetric.new(60)
  VCAP::Component.varz[:responses_2xx] = 0
  VCAP::Component.varz[:responses_3xx] = 0
@@ -183,4 +183,3 @@ EM.run do
  end

end

@@ -1,200 +0,0 @@
# Copyright (c) 2009-2011 VMware, Inc.
module AppConnection

  attr_reader :outstanding_requests

  def initialize(client, request, droplet)
    Router.log.debug "Creating AppConnection"
    @client, @request, @droplet = client, request, droplet
    @start_time = Time.now
    @connected = false
    @outstanding_requests = 1
    Router.outstanding_request_count += 1
  end

  def post_init
    VCAP::Component.varz[:app_connections] = Router.app_connection_count += 1
    Router.log.debug "Completed AppConnection"
    Router.log.debug Router.connection_stats
    Router.log.debug "------------"
  end

  def connection_completed
    @connected = true
    #proxy_incoming_to(@client) if @client
    send_data(@request) if @client && @request
  end

  # queue data until connection completed.
  def deliver_data(data)
    return send_data(data) if @connected
    @request << data
  end

  # We have the HTTP Headers complete from the client
  def on_headers_complete(headers)
    check_sticky_session = STICKY_SESSIONS =~ headers[SET_COOKIE_HEADER]
    sent_session_cookie = false # Only send one in case of multiple hits

    header_lines = @headers.split("\r\n")
    header_lines.each do |line|
      @client.send_data(line)
      @client.send_data(CR_LF)
      if (check_sticky_session && !sent_session_cookie && STICKY_SESSIONS =~ line)
        sid = Router.generate_session_cookie(@droplet)
        scl = line.sub(/\S+\s*=\s*\w+/, "#{VCAP_SESSION_ID}=#{sid}")
        sent_session_cookie = true
        @client.send_data(scl)
        @client.send_data(CR_LF)
      end
    end
    # Trace if properly requested
    if @client.trace
      router_trace = "#{VCAP_ROUTER_HEADER}:#{Router.inet}#{CR_LF}"
      be_trace = "#{VCAP_BACKEND_HEADER}:#{@droplet[:host]}:#{@droplet[:port]}#{CR_LF}"
      @client.send_data(router_trace)
      @client.send_data(be_trace)
    end
    # Ending CR_LF
    @client.send_data(CR_LF)
  end

  def process_response_body_chunk(data)
    return unless data and data.bytesize > 0

    # Let parser process as well to properly determine end of message.
    # TODO: Once EM 1.0, add in optional bytesize proxy if Content-Length is present.
    psize = @parser << data
    if (psize == data.bytesize)
      @client.send_data(data)
    else
      Router.log.info "Pipelined response detected!"
      # We have a pipelined response, we need to hand over the new headers and only send the proper
      # body segments to the backend
      body = data.slice!(0, psize)
      @client.send_data(body)
      receive_data(data)
    end
  end

  def record_stats
    return unless @parser

    latency = ((Time.now - @start_time) * 1000).to_i
    response_code = @parser.status_code
    response_code_metric = :responses_xxx
    if (200..299).include?(response_code)
      response_code_metric = :responses_2xx
    elsif (300..399).include?(response_code)
      response_code_metric = :responses_3xx
    elsif (400..499).include?(response_code)
      response_code_metric = :responses_4xx
    elsif (500..599).include?(response_code)
      response_code_metric = :responses_5xx
    end

    VCAP::Component.varz[response_code_metric] += 1
    VCAP::Component.varz[:latency] << latency

    if @droplet[:tags]
      @droplet[:tags].each do |key, value|
        tag_metrics = VCAP::Component.varz[:tags][key][value]
        tag_metrics[response_code_metric] += 1
        tag_metrics[:latency] << latency
      end
    end
  end

  def on_message_complete
    record_stats
    @parser = nil
    @outstanding_requests -= 1
    Router.outstanding_request_count -= 1
    :stop
  end

  def cant_be_recycled?
    error? || @parser != nil || @connected == false || @outstanding_requests > 0
  end

  def recycle
    stop_proxying
    @client = @request = @headers = nil
  end

  def receive_data(data)
    # Parser is created after headers have been received and processed.
    # If it exists we are continuing the processing of body fragments.
    # Allow the parser to process to signal proper end of message, e.g. chunked, etc
    return process_response_body_chunk(data) if @parser

    # We are awaiting headers here.
    # We buffer them if needed to determine the header/body boundary correctly.
    @buf = @buf ? @buf << data : data
    if hindex = @buf.index(HTTP_HEADERS_END) # all http headers received, figure out where to route to..
      @parser = Http::Parser.new(self)

      # split headers and rest of body out here.
      @headers = @buf.slice!(0...hindex+HTTP_HEADERS_END_SIZE)

      # Process headers
      @parser << @headers

      # Process left over body fragment if any
      process_response_body_chunk(@buf) if @parser
      @buf = @headers = nil
    end

  rescue Http::Parser::Error => e
    Router.log.debug "HTTP Parser error on response: #{e}"
    close_connection
  end

  def rebind(client, request)
    @start_time = Time.now
    @client = client
    reuse(request)
  end

  def reuse(new_request)
    @request = new_request
    @outstanding_requests += 1
    Router.outstanding_request_count += 1
    deliver_data(@request)
  end

  def proxy_target_unbound
    Router.log.debug "Proxy connection dropped"
    #close_connection_after_writing
  end

  def unbind
    Router.outstanding_request_count -= @outstanding_requests
    unless @connected
      Router.log.info "Could not connect to backend for url:#{@droplet[:url]} @ #{@droplet[:host]}:#{@droplet[:port]}"
      if @client
        @client.send_data(Router.notfound_redirect || ERROR_404_RESPONSE)
        @client.close_connection_after_writing
      end
      # TODO(dlc) fix - We will unregister bad backends here, should retry the request if possible.
      Router.unregister_droplet(@droplet[:url], @droplet[:host], @droplet[:port])
    end

    VCAP::Component.varz[:app_connections] = Router.app_connection_count -= 1
    Router.log.debug 'Unbinding AppConnection'
    Router.log.debug Router.connection_stats
    Router.log.debug "------------"

    # Remove ourselves from the connection pool
    @droplet[:connections].delete(self)

    @client.close_connection_after_writing if @client
  end

  def terminate
    stop_proxying
    close_connection
    on_message_complete if @outstanding_requests > 0
  end

end
@@ -1,212 +0,0 @@
# Copyright (c) 2009-2011 VMware, Inc.
module ClientConnection

  HTTP_11 = '11'.freeze

  attr_reader :close_connection_after_request, :trace

  def initialize(is_unix_socket)
    Router.log.debug "Created Client Connection"
    @droplet, @bound_app_conn = nil, nil, nil
    # Default to be on the safe side
    @close_connection_after_request = true
    @is_unix_socket = is_unix_socket
  end

  def post_init
    VCAP::Component.varz[:client_connections] = Router.client_connection_count += 1
    Router.log.debug Router.connection_stats
    Router.log.debug "------------"
    self.comm_inactivity_timeout = Router.client_inactivity_timeout
  end

  def recycle_app_conn
    return if (!@droplet || !@bound_app_conn) # Could we leak @bound_app_conn here?

    # Don't recycle if we are not supposed to..
    if @close_connection_after_request
      Router.log.debug "NOT placing bound AppConnection back into free list because a close was requested.."
      @bound_app_conn.close_connection_after_writing
      return
    end

    # Place any bound connections back into the droplet's connection pool.
    # This happens on client connection reuse, HTTP 1.1.
    # Check for errors, overcommits, etc..

    if (@bound_app_conn.cant_be_recycled?)
      Router.log.debug "NOT placing AppConnection back into free list, can't be recycled"
      @bound_app_conn.close_connection_after_writing
      return
    end

    if @droplet[:connections].index(@bound_app_conn)
      Router.log.debug "NOT placing AppConnection back into free list, already exists in free list.."
      return
    end

    if @droplet[:connections].size >= MAX_POOL
      Router.log.debug "NOT placing AppConnection back into free list, MAX_POOL connections already exist.."
      @bound_app_conn.close_connection_after_writing
      return
    end

    Router.log.debug "Placing bound AppConnection back into free list.."
    @bound_app_conn.recycle
    @droplet[:connections].push(@bound_app_conn)
    @bound_app_conn = nil
  end

  def terminate_app_conn
    return unless @bound_app_conn
    Router.log.debug "Terminating AppConnection"
    @bound_app_conn.terminate
    @bound_app_conn = nil
  end

  def process_request_body_chunk(data)
    return unless data
    if (@droplet && @bound_app_conn && !@bound_app_conn.error?)

      # Let parser process as well to properly determine end of message.
      psize = @parser << data

      if (psize != data.bytesize)
        Router.log.info "Pipelined request detected!"
        # We have a pipelined request, we need to hand over the new headers and only send the proper
        # body segments to the backend
        body = data.slice!(0, psize)
        @bound_app_conn.deliver_data(body)
        receive_data(data)
      else
        @bound_app_conn.deliver_data(data)
      end
    else # We do not have a backend droplet anymore
      Router.log.info "Backend connection dropped"
      terminate_app_conn
      close_connection # Should we retry here?
    end
  end

  # We have the HTTP Headers complete from the client
  def on_headers_complete(headers)
    return close_connection unless headers and host = headers[HOST_HEADER]

    # Support for HTTP/1.1 connection reuse and possible pipelining
    @close_connection_after_request = (@parser.http_version.to_s == HTTP_11) ? false : true

    # Support for Connection:Keep-Alive requests on HTTP/1.0, e.g. ApacheBench
    @close_connection_after_request = false if (headers[CONNECTION_HEADER] == KEEP_ALIVE)

    @trace = (headers[VCAP_TRACE_HEADER] == Router.trace_key)

    # Update # of requests..
    VCAP::Component.varz[:requests] += 1

    # Clear and recycle previous state
    recycle_app_conn if @bound_app_conn
    @droplet = @bound_app_conn = nil

    # Lookup a Droplet
    unless droplets = Router.lookup_droplet(host)
      Router.log.debug "No droplet registered for #{host}"
      VCAP::Component.varz[:bad_requests] += 1
      send_data(Router.notfound_redirect || ERROR_404_RESPONSE)
      close_connection_after_writing
      return
    end

    Router.log.debug "#{droplets.size} servers available for #{host}"

    # Check for session state
    if VCAP_COOKIE =~ headers[COOKIE_HEADER]
      url, host, port = Router.decrypt_session_cookie($1)
      Router.log.debug "Client has __VCAP_ID__ for #{url}@#{host}:#{port}"
      # Check host?
      droplets.each { |droplet|
        # If we already know about them just update the timestamp..
        if(droplet[:host] == host && droplet[:port] == port)
          @droplet = droplet
          break;
        end
      }
      Router.log.debug "Client's __VCAP_ID__ is stale" unless @droplet
    end

    # pick a random backend unless selected from above already
    @droplet = droplets[rand*droplets.size] unless @droplet

    if @droplet[:tags]
      @droplet[:tags].each do |key, value|
        tag_metrics = VCAP::Component.varz[:tags][key][value]
        tag_metrics[:requests] += 1
      end
    end

    @droplet[:requests] += 1

    # Client tracking, override with header if it's set (nginx to unix domain socket)
    _, client_ip = Socket.unpack_sockaddr_in(get_peername) unless @is_unix_socket
    client_ip = headers[REAL_IP_HEADER] if headers[REAL_IP_HEADER]

    @droplet[:clients][client_ip] += 1 if client_ip

    Router.log.debug "Routing on #{@droplet[:url]} to #{@droplet[:host]}:#{@droplet[:port]}"

    # Reuse an existing connection or create one.
    # Proxy the rest of the traffic without interference.
    Router.log.debug "Droplet has #{@droplet[:connections].size} pooled connections waiting.."
    @bound_app_conn = @droplet[:connections].pop
    if (@bound_app_conn && !@bound_app_conn.error?)
      Router.log.debug "Reusing pooled AppConnection.."
      @bound_app_conn.rebind(self, @headers)
    else
      host, port = @droplet[:host], @droplet[:port]
      @bound_app_conn = EM.connect(host, port, AppConnection, self, @headers, @droplet)
    end

  end

  def on_message_complete
    @parser = nil
    :stop
  end

  def receive_data(data)
    # Parser is created after headers have been received and processed.
    # If it exists we are continuing the processing of body fragments.
    # Allow the parser to process to signal proper end of message, e.g. chunked, etc
    return process_request_body_chunk(data) if @parser

    # We are awaiting headers here.
    # We buffer them if needed to determine the header/body boundary correctly.
    @buf = @buf ? @buf << data : data

    if hindex = @buf.index(HTTP_HEADERS_END) # all http headers received, figure out where to route to..
      @parser = Http::Parser.new(self)

      # split headers and rest of body out here.
      @headers = @buf.slice!(0...hindex+HTTP_HEADERS_END_SIZE)

      # Process headers
      @parser << @headers

      # Process left over body fragment if any
      process_request_body_chunk(@buf) if @parser
      @buf = @headers = nil
    end

  rescue Http::Parser::Error => e
    Router.log.debug "HTTP Parser error on request: #{e}"
    close_connection
  end

  def unbind
    Router.log.debug "Unbinding client connection"
    VCAP::Component.varz[:client_connections] = Router.client_connection_count -= 1
    Router.log.debug Router.connection_stats
    Router.log.debug "------------"
    @close_connection_after_request ? terminate_app_conn : recycle_app_conn
  end

end
@@ -2,23 +2,24 @@
# HTTP Header processing
HOST_HEADER = 'Host'.freeze
CONNECTION_HEADER = 'Connection'.freeze
REAL_IP_HEADER = 'X-Real_IP'.freeze
HTTP_HEADERS_END = "\r\n\r\n".freeze
HTTP_HEADERS_END_SIZE = HTTP_HEADERS_END.bytesize
KEEP_ALIVE = 'keep-alive'.freeze
SET_COOKIE_HEADER = 'Set-Cookie'.freeze
COOKIE_HEADER = 'Cookie'.freeze
CR_LF = "\r\n".freeze

STICKY_SESSIONS = /(JSESSIONID)/i

VCAP_SESSION_ID = '__VCAP_ID__'.freeze
VCAP_COOKIE = /__VCAP_ID__=([^;]+)/
#STICKY_SESSIONS = /(JSESSIONID)/i

VCAP_BACKEND_HEADER = 'X-Vcap-Backend'
VCAP_ROUTER_HEADER = 'X-Vcap-Router'
VCAP_TRACE_HEADER = 'X-Vcap-Trace'

ULS_HOST_QUERY = :"host"
ULS_STATS_UPDATE = :"stats"
ULS_REQUEST_TAGS = :"request_tags"
ULS_RESPONSE_STATUS = :"response_codes"
ULS_RESPONSE_SAMPLES = :"response_samples"
ULS_RESPONSE_LATENCY = :"response_latency"
ULS_BACKEND_ADDR = :"backend_addr"
ULS_ROUTER_IP = :"router_ip"
ULS_STICKY_SESSION = :"sticky_session"

# Max Connections to Pool
MAX_POOL = 32

@@ -29,7 +30,12 @@ START_SWEEPER = 30 # Timer to publish router.start for refreshing state
CHECK_SWEEPER = 30 # Check time for watching health of registered droplet
MAX_AGE_STALE = 120 # Max stale age, unregistered if older than 2 minutes

# 404 Response
ERROR_404_RESPONSE="HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\n" +
  "VCAP ROUTER: 404 - DESTINATION NOT FOUND\r\n".freeze
# 200 Response
HTTP_200_RESPONSE = "HTTP/1.1 200 OK\r\n\r\n".freeze

# 400 Response
ERROR_400_RESPONSE = "HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n".freeze

# 404 Response
ERROR_404_RESPONSE = "HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\n" +
  "VCAP ROUTER: 404 - DESTINATION NOT FOUND\r\n".freeze
@@ -6,7 +6,7 @@ class Router
  class << self
    attr_reader :log, :notfound_redirect, :session_key, :trace_key, :client_inactivity_timeout
    attr_accessor :server, :local_server, :timestamp, :shutting_down
    attr_accessor :client_connection_count, :app_connection_count, :outstanding_request_count
    attr_accessor :client_connection_count
    attr_accessor :inet, :port

    alias :shutting_down? :shutting_down
@@ -17,7 +17,7 @@ class Router

    def config(config)
      @droplets = {}
      @client_connection_count = @app_connection_count = @outstanding_request_count = 0
      @client_connection_count = 0
      VCAP::Logging.setup_from_config(config['logging'] || {})
      @log = VCAP::Logging.logger('router')
      if config['404_redirect']
@@ -120,14 +120,12 @@ class Router

    def connection_stats
      tc = EM.connection_count
      ac = Router.app_connection_count
      cc = Router.client_connection_count
      "Connections: [Clients: #{cc}, Apps: #{ac}, Total: #{tc}]"
      "Connections: [uls Clients: #{cc}, Total: #{tc}]"
    end

    def log_connection_stats
      tc = EM.connection_count
      ac = Router.app_connection_count
      cc = Router.client_connection_count
      log.info connection_stats
    end
@@ -139,7 +137,9 @@ class Router
      c.key = @session_key
      e = c.update(Marshal.dump(token))
      e << c.final
      [e].pack('m0').gsub("\n",'')
      session = [e].pack('m0').gsub("\n",'')
      droplet[:session] = session
      session
    end

    def decrypt_session_cookie(key)
@@ -154,8 +154,12 @@ class Router
      nil
    end

    def get_session_cookie(droplet)
      droplet[:session] || generate_session_cookie(droplet)
    end

    def lookup_droplet(url)
      @droplets[url]
      @droplets[url.downcase]
    end

    def register_droplet(url, host, port, tags)
@ -0,0 +1,147 @@
require "sinatra/base"

class RouterULSServer < Sinatra::Base

  class ParserError < StandardError; end

  disable :show_exceptions, :dump_errors

  get "/" do
    uls_response = {}
    VCAP::Component.varz[:requests] += 1

    # Get request body
    request.body.rewind # in case someone already read the body
    body = request.body.read
    Router.log.debug "Request body: #{body}"

    # Parse request body
    uls_req = JSON.parse(body, :symbolize_keys => true)
    raise ParserError if uls_req.nil? || !uls_req.is_a?(Hash)
    stats, url = uls_req[ULS_STATS_UPDATE], uls_req[ULS_HOST_QUERY]
    sticky = uls_req[ULS_STICKY_SESSION]

    if stats then
      update_uls_stats(stats)
    end

    if url then
      # Lookup a droplet
      unless droplets = Router.lookup_droplet(url)
        Router.log.debug "No droplet registered for #{url}"
        raise Sinatra::NotFound
      end

      # Pick a droplet based on original backend addr or pick a droplet randomly
      if sticky
        _, host, port = Router.decrypt_session_cookie(sticky)
        droplet = check_original_droplet(droplets, host, port)
      end
      droplet ||= droplets[rand*droplets.size]
      Router.log.debug "Routing #{droplet[:url]} to #{droplet[:host]}:#{droplet[:port]}"

      # Update droplet stats
      update_droplet_stats(droplet)

      # Get session cookie for droplet
      new_sticky = Router.get_session_cookie(droplet)

      uls_req_tags = Base64.encode64(Marshal.dump(droplet[:tags])).strip if droplet[:tags]
      uls_response = {
        ULS_STICKY_SESSION => new_sticky,
        ULS_BACKEND_ADDR => "#{droplet[:host]}:#{droplet[:port]}",
        ULS_REQUEST_TAGS => uls_req_tags,
        ULS_ROUTER_IP => Router.inet
      }
    end

    uls_response.to_json
  end

  not_found do
    VCAP::Component.varz[:bad_requests] += 1
    "VCAP ROUTER: 404 - DESTINATION NOT FOUND"
  end

  error [ JSON::ParserError, ParserError ] do
    VCAP::Component.varz[:bad_requests] += 1

    _, body = request.body.rewind, request.body.read
    Router.log.error "Failed to parse request body: '#{body}'"

    status 400
    "VCAP ROUTER: 400 - FAILED TO PARSE PAYLOAD"
  end

  error do
    VCAP::Component.varz[:bad_requests] += 1
    Router.log.error env['sinatra.error']
    "VCAP ROUTER: 500 - UNKNOWN"
  end

  protected

  def check_original_droplet(droplets, host, port)
    droplet = nil
    if host and port
      Router.log.debug "request has __VCAP_ID__ cookie for #{host}:#{port}"
      # Check host?
      droplets.each do |d|
        if(d[:host] == host && d[:port] == port.to_i)
          droplet = d; break
        end
      end
      Router.log.debug "request's __VCAP_ID__ is stale" unless droplet
    end
    droplet
  end

  def update_uls_stats(stats)
    stats.each do |stat|
      if stat[ULS_REQUEST_TAGS].length > 0
        tags = Marshal.load(Base64.decode64(stat[ULS_REQUEST_TAGS]))
      end

      latency = stat[ULS_RESPONSE_LATENCY]
      samples = stat[ULS_RESPONSE_SAMPLES]

      # We may find a better solution for latency
      1.upto samples do
        VCAP::Component.varz[:latency] << latency
      end

      stat[ULS_RESPONSE_STATUS].each_pair do |k, v|
        response_code_metric = k.to_sym
        VCAP::Component.varz[response_code_metric] += v
        if not tags then next end

        tags.each do |key, value|
          # In case some req tags of syncup state may be invalid at this time
          if not VCAP::Component.varz[:tags][key] or
             not VCAP::Component.varz[:tags][key][value]
            next
          end

          tag_metrics = VCAP::Component.varz[:tags][key][value]
          tag_metrics[response_code_metric] += v
          1.upto samples do
            tag_metrics[:latency] << latency
          end

        end # tags
      end # stat[ULS_RESPONSE_STATUS]
    end # stats.each
  end

  def update_droplet_stats(droplet)
    if droplet[:tags]
      droplet[:tags].each do |key, value|
        tag_metrics = VCAP::Component.varz[:tags][key][value]
        tag_metrics[:requests] += 1
      end
    end

    droplet[:requests] += 1
  end
end
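RouterULSServer is the upstream locator service (ULS) that the nginx side of router v2 consults on each request. A minimal client-side sketch of that exchange is shown below, modeled on query_uls/simple_uls_request from the functional spec helper later in this commit; the socket path and hostname are illustrative, and router/const is assumed to supply the ULS_* key constants used by the route above.

# Sketch only; assumes router/const is on the load path and the ULS is bound
# to the given UNIX socket (as it is in the integration tests).
require 'json'
require 'socket'
require 'router/const'

def locate_backend(sock_path, host)
  body = { ULS_HOST_QUERY => host }.to_json
  req  = "GET / HTTP/1.0\r\nHost: localhost\r\nContent-Type: application/json\r\n" \
         "Content-Length: #{body.length}\r\n\r\n#{body}"
  UNIXSocket.open(sock_path) do |s|
    s.send(req, 0)
    s.close_write
    raw = s.read
    # Reply body is JSON keyed by ULS_BACKEND_ADDR, ULS_STICKY_SESSION,
    # ULS_REQUEST_TAGS and ULS_ROUTER_IP (see uls_response above).
    JSON.parse(raw.split("\r\n\r\n", 2).last)
  end
end

# locate_backend('/tmp/router.sock', 'router_test.vcap.me')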
@ -17,10 +17,11 @@ def stop(pidfile)
  Router.log.info 'waiting for pending requests to complete.'
  EM.stop_server(Router.server) if Router.server
  EM.stop_server(Router.local_server) if Router.local_server
  if Router.outstanding_request_count <= 0

  if Router.client_connection_count <= 0
    exit_router(pidfile)
  else
    EM.add_periodic_timer(0.25) { exit_router(pidfile) if (Router.outstanding_request_count <= 0) }
    EM.add_periodic_timer(0.25) { exit_router(pidfile) if (Router.client_connection_count <= 0) }
    EM.add_timer(10) { exit_router(pidfile) } # Wait at most 10 secs
  end

@ -32,4 +33,3 @@ def exit_router(pidfile)
  FileUtils.rm_f(pidfile)
  exit
end
@ -0,0 +1,47 @@
require 'rake'
require 'tempfile'

require 'rubygems'
require 'bundler/setup'
Bundler.require(:default, :test)

require 'rspec/core/rake_task'
require 'ci/reporter/rake/rspec'

coverage_dir = File.expand_path(File.join(File.dirname(__FILE__), "..", "spec_coverage"))
reports_dir = File.expand_path(File.join(File.dirname(__FILE__), "..", "spec_reports"))
dump_file = File.join(Dir.tmpdir, "router.rcov")

ENV['CI_REPORTS'] = reports_dir

desc "Run specs using RCov"
task "spec:rcov" => ["ci:setup:rspec", "spec:rcov_internal", "convert_rcov_to_clover"]

RSpec::Core::RakeTask.new do |t|
  t.pattern = "{functional,unit}/*_spec.rb"
  t.rspec_opts = ["--format", "documentation", "--colour"]
end

RSpec::Core::RakeTask.new("test") do |t|
  t.pattern = "**/*_spec.rb"
  t.rspec_opts = ["--format", "documentation", "--colour"]
end

ignore_pattern = 'spec,[.]bundle,[/]gems[/]'

desc "Run specs using RCov (internal, use spec:rcov instead)"
RSpec::Core::RakeTask.new("spec:rcov_internal") do |t|
  FileUtils.rm_rf(dump_file)
  t.pattern = "**/*_spec.rb"
  t.rspec_opts = ["--format", "progress", "--colour"]
  t.rcov = true
  t.rcov_opts = ['--aggregate', dump_file, '--exclude', ignore_pattern, '--output', coverage_dir]
  # t.rcov_opts = %W{--exclude osx\/objc,gems\/,spec\/,features\/ -o "#{coverage_dir}"}
end

task "convert_rcov_to_clover" do |t|
  clover_output = File.join(coverage_dir, "clover.xml")
  analyzer = File.expand_path(File.join(File.dirname(__FILE__), "..", "..", "tests", "common", "rcov_analyzer.rb"))
  sh("ruby #{analyzer} #{dump_file} #{ignore_pattern} > #{clover_output}")
  FileUtils.rm_rf(dump_file)
end
@ -1,40 +1,36 @@
# Copyright (c) 2009-2011 VMware, Inc.
require File.dirname(__FILE__) + '/spec_helper'
require 'fileutils'
require "base64"

describe 'Router Functional Tests' do
  before :each do
    @dir = Dir.mktmpdir('router-test')
    nats_port = VCAP::grab_ephemeral_port
    @nats_server = VCAP::Spec::ForkedComponent::NatsServer.new(
      File.join(@dir, 'nats.pid'),
      nats_port,
      @dir)
    @nats_server.start
    @nats_server.running?.should be_true

    router_port = VCAP::grab_ephemeral_port
    @router = ForkedRouter.new(File.join(@dir, 'router.log'),
                               router_port,
                               nats_port,
                               @dir)
  include Functional

  ROUTER_V1_DROPLET = { :url => 'router_test.vcap.me', :host => '127.0.0.1', :port => 12345 }
  ROUTER_V1_SESSION = "zXiJv9VIyWW7kqrcqYUkzj+UEkC4UUHGaYX9fCpDMm2szLfOpt+aeRZMK7kfkpET+PDhvfKRP/M="

  before :each do
    @nats_server = NatsServer.new
    @nats_server.start_server
    @nats_server.is_running?.should be_true

    @router = RouterServer.new(@nats_server.uri)
    # The router will only announce itself after it has subscribed to 'vcap.component.discover'.
    NATS.start(:uri => @nats_server.uri) do
      NATS.subscribe('vcap.component.announce') { NATS.stop }
      # Ensure that NATS has processed our subscribe from above before we start the router
      NATS.publish('xxx') { @router.start }
      NATS.publish('xxx') { @router.start_server }
      EM.add_timer(5) { NATS.stop }
    end
    @router.is_running?.should be_true
  end

  after :each do
    @router.stop
    @router.kill_server
    @router.is_running?.should be_false

    @nats_server.stop
    @nats_server.running?.should be_false
    FileUtils.remove_entry_secure(@dir)
    @nats_server.kill_server
    @nats_server.is_running?.should be_false
  end

  it 'should respond to a discover message properly' do
@ -61,13 +57,9 @@ describe 'Router Functional Tests' do
    healthz_resp = Net::HTTP.new(host, port).start { |http| http.request(healthz_req) }
    healthz_resp.body.should =~ /ok/i

    varz_req = Net::HTTP::Get.new("/varz")
    varz_req.basic_auth *credentials
    varz_resp = Net::HTTP.new(host, port).start { |http| http.request(varz_req) }
    varz = JSON.parse(varz_resp.body, :symbolize_keys => true)
    varz = get_varz()
    varz[:requests].should be_a_kind_of(Integer)
    varz[:bad_requests].should be_a_kind_of(Integer)
    varz[:client_connections].should be_a_kind_of(Integer)
    varz[:type].should =~ /router/i
  end
@ -76,8 +68,7 @@ describe 'Router Functional Tests' do
    app = TestApp.new('router_test.vcap.me')
    dea = DummyDea.new(@nats_server.uri, '1234')
    dea.register_app(app)
    app.verify_registered('127.0.0.1', @router.port)
    app.stop
    app.verify_registered
  end

  it 'should properly unregister an application endpoint' do
@ -85,113 +76,39 @@ describe 'Router Functional Tests' do
    app = TestApp.new('router_test.vcap.me')
    dea = DummyDea.new(@nats_server.uri, '1234')
    dea.register_app(app)
    app.verify_registered('127.0.0.1', @router.port)
    app.verify_registered
    dea.unregister_app(app)
    # We should be unregistered here..
    # Send out simple request and check request and response
    req = simple_http_request('router_test.cap.me', '/')
    # force context switch to avoid a race between unregister processing and
    # accepting new connections
    sleep(0.01)
    verify_vcap_404(req, '127.0.0.1', @router.port)
    app.stop
    app.verify_unregistered
  end

  it 'should properly distribute messages between multiple backends' do
    num_apps = 10
    num_requests = 100
    dea = DummyDea.new(@nats_server.uri, '1234')

    apps = []
    for ii in (0...num_apps)
      app = TestApp.new('lb_test.vcap.me')
      dea.register_app(app)
      apps << app
    end

    req = simple_http_request('lb_test.vcap.me', '/')
    for ii in (0...num_requests)
      TCPSocket.open('127.0.0.1', @router.port) {|rs| rs.send(req, 0) }
    end
    sleep(0.25) # Wait here for requests to trip accept state

    app_sockets = apps.collect { |a| a.socket }
    ready = IO.select(app_sockets, nil, nil, 1)
    ready[0].should have(num_apps).items
    apps.each {|a| a.stop }
  it 'should generate the same token as router v1 did' do
    Router.config({})
    token = Router.generate_session_cookie(ROUTER_V1_DROPLET)
    token.should == ROUTER_V1_SESSION
  end

  it 'should properly do sticky sessions' do
    num_apps = 10
    dea = DummyDea.new(@nats_server.uri, '1234')

    apps = []
    for ii in (0...num_apps)
      app = TestApp.new('sticky.vcap.me')
      dea.register_app(app)
      apps << app
    end

    vcap_id = app_socket = nil
    app_sockets = apps.collect { |a| a.socket }

    TCPSocket.open('127.0.0.1', @router.port) do |rs|
      rs.send(STICKY_REQUEST, 0)
      ready = IO.select(app_sockets, nil, nil, 1)
      ready[0].should have(1).items
      app_socket = ready[0].first
      ss = app_socket.accept_nonblock
      req_received = ss.recv(STICKY_REQUEST.bytesize)
      req_received.should == STICKY_REQUEST
      # Send a response back.. This will set the sticky session
      ss.send(STICKY_RESPONSE, 0)
      response = rs.read(STICKY_RESPONSE.bytesize)
      # Make sure the __VCAP_ID__ has been set
      response =~ /Set-Cookie:\s*__VCAP_ID__=([^;]+);/
      (vcap_id = $1).should be
    end

    cookie = "__VCAP_ID__=#{vcap_id}"
    sticky_request = simple_sticky_request('sticky.vcap.me', '/sticky', cookie)

    # Now fire off requests, all should go to same socket as first
    (0...5).each do
      TCPSocket.open('127.0.0.1', @router.port) do |rs|
        rs.send(sticky_request, 0)
      end
    end

    ready = IO.select(app_sockets, nil, nil, 1)
    ready[0].should have(1).items
    app_socket.should == ready[0].first

    for app in apps
      dea.unregister_app(app)
    end

    # force context switch to avoid a race between unregister processing and
    # accepting new connections
    sleep(0.01)
    # Check that it is gone
    verify_vcap_404(STICKY_REQUEST, '127.0.0.1', @router.port)

    apps.each {|a| a.stop }
  it 'should decrypt router v1 session' do
    Router.config({})
    url, host, port = Router.decrypt_session_cookie(ROUTER_V1_SESSION)
    url.should == ROUTER_V1_DROPLET[:url]
    host.should == ROUTER_V1_DROPLET[:host]
    port.should == ROUTER_V1_DROPLET[:port]
  end

  it 'should properly exit when NATS fails to reconnect' do
    @nats_server.stop
    @nats_server.running?.should be_false
    @nats_server.kill_server
    @nats_server.is_running?.should be_false
    sleep(0.5)
    @router.is_running?.should be_false
  end

  it 'should not start with nats not running' do
    @nats_server.stop
    @nats_server.running?.should be_false
    @router.stop
    @nats_server.kill_server
    @nats_server.is_running?.should be_false
    @router.kill_server
    @router.is_running?.should be_false

    @router.start
    @router.start_server
    sleep(0.5)
    @router.is_running?.should be_false
  end
@ -211,11 +128,20 @@ describe 'Router Functional Tests' do
    reply
  end

  def verify_vcap_404(req, router_host, router_port)
    TCPSocket.open(router_host, router_port) do |rs|
      rs.send(req, 0)
      response = rs.read(VCAP_NOT_FOUND.bytesize)
      response.should == VCAP_NOT_FOUND
    end
  def get_varz
    reply = json_request(@nats_server.uri, 'vcap.component.discover')
    reply.should_not be_nil

    credentials = reply[:credentials]
    credentials.should_not be_nil

    host, port = reply[:host].split(":")

    varz_req = Net::HTTP::Get.new("/varz")
    varz_req.basic_auth *credentials
    varz_resp = Net::HTTP.new(host, port).start { |http| http.request(varz_req) }
    varz = JSON.parse(varz_resp.body, :symbolize_keys => true)
    varz
  end

end
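For reference, a /varz reply that would satisfy the assertions in these tests has roughly the shape sketched below. The values are purely illustrative and only the keys asserted in this commit are shown; the real document contains additional component fields.

# Illustrative shape only; values are made up.
EXAMPLE_VARZ = {
  :type               => "Router",
  :requests           => 100,
  :bad_requests       => 0,
  :client_connections => 1,
  :tags => {
    :component => { :test0 => { :requests => 10,  :responses_2xx => 10 } },
    :runtime   => { :ruby  => { :requests => 100, :responses_2xx => 99 } }
  }
}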
@ -1,174 +1,127 @@
# Copyright (c) 2009-2012 VMware, Inc.
require File.dirname(__FILE__) + '/../spec_helper'
# Copyright (c) 2009-2011 VMware, Inc.
require File.dirname(__FILE__) + '/../lib/spec_helper'

require 'fileutils'
require 'nats/client'
require 'yajl/json_gem'
require 'openssl'
require 'net/http'
require 'uri'
require 'tempfile'
require 'yaml'
require 'vcap/spec/forked_component/nats_server'
module Functional

require 'pp'
  class TestApp
    class UsageError < StandardError; end

# Full path to the Ruby we are running under as configured when it was
# compiled, so if you have moved or copied it, funny things might happen
def current_ruby
  File.join(Config::CONFIG['bindir'], Config::CONFIG['ruby_install_name'])
end
    attr_reader :uris, :droplet

class ForkedRouter < VCAP::Spec::ForkedComponent::Base
    def initialize(*uris)
      @uris = uris
    end

  ROUTER_PATH = File.expand_path('../../../bin/router', __FILE__)
    def bind_droplet(droplet)
      @droplet = droplet
    end

  attr_reader :port
  def initialize(log_file, port, nats_port, router_dir)
    @port, @nats_port, @log_file = port, nats_port, log_file
    pid_file = File.join(router_dir, 'router.pid')
    config = {
      'port' => port,
      'inet' => '127.0.0.1',
      'mbus' => "nats://127.0.0.1:#{nats_port}",
      'logging' => { 'level' => 'debug' },
      'pid' => pid_file,
    }
    def unbind_droplet
      @droplet = nil
    end

    config_file = File.join(router_dir, 'router.yml')
    nats_timeout = File.expand_path(File.join(File.dirname(__FILE__), 'nats_timeout'))
    def port
      @droplet.port if @droplet
    end

    # Write the config
    File.open(config_file, 'w') { |f| YAML.dump config, f }
    cmd = "#{current_ruby} -r#{nats_timeout} #{ROUTER_PATH} -c #{config_file}"
    def verify_registered
      for uri in @uris
        status, body = query_uls(uri)
        status.should == 200
        Yajl::Parser.parse(body)["backend_addr"].should == droplet.host_port
      end
    end

    def verify_unregistered
      for uri in @uris
        status, body = query_uls(uri)
        status.should == 404
      end
    end

    private

    def query_uls(uri)
      parser, body = nil, nil
      UNIXSocket.open(RouterServer.sock) do |socket|
        socket.send(simple_uls_request(uri), 0)
        socket.close_write
        buf = socket.read
        parser, body = parse_http_msg(buf)
        socket.close
      end
      return parser.status_code, body
    end

    def simple_uls_request(host)
      body = { :host => host }.to_json
      "GET / HTTP/1.0\r\nConnection: Keep-alive\r\nHost: localhost\r\nContent-Length: #{body.length}\r\nContent-Type: application/json\r\nX-Vcap-Service-Token: changemysqltoken\r\nUser-Agent: EventMachine HttpClient\r\n\r\n#{body}"
    end

    def parse_http_msg(buf)
      parser = Http::Parser.new
      body = ''

      parser.on_body = proc do |chunk|
        body << chunk
      end

      parser.on_message_complete = proc do
        :stop
      end

      parser << buf

      return parser, body
    end

    super(cmd, 'router', router_dir, pid_file)
  end

  def start
    return if is_running?
    super
  end
  class Droplet
    attr_reader :host, :port

  def is_running?
    require 'socket'
    s = TCPSocket.new('localhost', @port)
    s.close
    return true
  rescue
    return false
  end
end
    def initialize(host)
      @host = host
      @port = Random.rand(100_000)
    end

# HTTP REQUESTS / RESPONSES

FOO_HTTP_RESPONSE = "HTTP/1.1 200 OK\r\nContent-Type: text/html;charset=utf-8\r\nContent-Length: 53\r\nConnection: keep-alive\r\nServer: thin 1.2.7 codename No Hup\r\n\r\n<h1>Hello from the Clouds!</h1>"

VCAP_NOT_FOUND = "HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\nVCAP ROUTER: 404 - DESTINATION NOT FOUND\r\n"

STICKY_REQUEST = "GET /sticky HTTP/1.1\r\nHost: sticky.vcap.me\r\nConnection: keep-alive\r\nAccept: application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5\r\nUser-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 (KHTML, like Gecko) Chrome/8.0.552.237 Safari/534.10\r\nAccept-Encoding: gzip,deflate,sdch\r\nAccept-Language: en-US,en;q=0.8\r\nAccept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.3\r\n\r\n"

STICKY_RESPONSE = "HTTP/1.1 200 OK\r\nContent-Type: text/html;charset=utf-8\r\nContent-Length: 242\r\nSet-Cookie: _session_id=be009e56c7be0e855d951a3b49e288c98aa36ede; path=/\r\nSet-Cookie: JSESSIONID=be009e56c7be0e855d951a3b49e288c98aa36ede; path=/\r\nConnection: keep-alive\r\nServer: thin 1.2.7 codename No Hup\r\n\r\n<h1>Hello from the Cookie Monster! via: 10.0.1.222:35267</h1><h2>session = be009e56c7be0e855d951a3b49e288c98aa36ede</h2><h4>Cookies set: _session_id, JSESSIONID<h4>Note: Trigger new sticky session cookie name via ?ss=NAME appended to URL</h4>"

def simple_http_request(host, path, http_version='1.1')
  "GET #{path} HTTP/#{http_version}\r\nUser-Agent: curl/7.19.7 (i486-pc-linux-gnu) libcurl/7.19.7 OpenSSL/0.9.8k zlib/1.2.3.3 libidn/1.15\r\nHost: #{host}\r\nAccept: */*\r\n\r\n"
end

def simple_sticky_request(host, path, cookie, http_version='1.1')
  "GET #{path} HTTP/#{http_version}\r\nHost: #{host}\r\nConnection: keep-alive\r\nAccept: application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5\r\nUser-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 (KHTML, like Gecko) Chrome/8.0.552.237 Safari/534.10\r\nAccept-Encoding: gzip,deflate,sdch\r\nAccept-Language: en-US,en;q=0.8\r\nAccept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.3\r\nCookie: #{cookie}\r\n\r\n"
end

def new_app_socket
  app_socket = TCPServer.new('127.0.0.1', 0)
  app_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true)
  Socket.do_not_reverse_lookup = true
  app_port = app_socket.addr[1]
  [app_socket, app_port]
end

class TestApp
  class UsageError < StandardError; end

  attr_reader :host, :port, :uris, :socket

  def initialize(*uris)
    @uris = uris
    @port = nil
    @socket = nil
    start
  end

  def start
    raise UsageError, "Already started" if @socket
    sock, port = new_app_socket
    @socket = sock
    @port = port
  end

  def stop
    raise UsageError, "Already stopped" if !@socket
    @socket.close
    @socket = nil
    @port = nil
  end

  # Simple check that the app can be queried via the router
  def verify_registered(router_host, router_port)
    for uri in @uris
      verify_path_registered(uri, '/', router_host, router_port)
    def host_port
      "#{host}:#{port}"
    end
  end

  private
  class DummyDea

  def verify_path_registered(host, path, router_host, router_port)
    req = simple_http_request(host, path)
    # Send out simple request and check request and response
    TCPSocket.open(router_host, router_port) do |rs|
      rs.send(req, 0)
      IO.select([@socket], nil, nil, 2) # 2 secs timeout
      ss = @socket.accept_nonblock
      req_received = ss.recv(req.bytesize)
      req_received.should == req
      # Send a response back..
      ss.send(FOO_HTTP_RESPONSE, 0)
      response = rs.read(FOO_HTTP_RESPONSE.bytesize)
      response.should == FOO_HTTP_RESPONSE
      ss.close
    end
  end
end

class DummyDea

  attr_reader :nats_uri, :dea_id

  def initialize(nats_uri, dea_id, host='127.0.0.1')
    @nats_uri = nats_uri
    @dea_id = dea_id
    @host = host
  end

  def reg_hash_for_app(app)
    { :dea => @dea_id,
      :host => @host,
      :port => app.port,
      :uris => app.uris
    }
  end

  def register_app(app)
    NATS.start(:uri => @nats_uri) do
      NATS.publish('router.register', reg_hash_for_app(app).to_json) { NATS.stop }
    end
  end

  def unregister_app(app)
    NATS.start(:uri => @nats_uri) do
      NATS.publish('router.unregister', reg_hash_for_app(app).to_json) { NATS.stop }
    attr_reader :nats_uri, :dea_id

    def initialize(nats_uri, dea_id, host='127.0.0.1')
      @nats_uri = nats_uri
      @dea_id = dea_id
      @host = host
    end

    def reg_hash_for_app(app, tags = {})
      { :dea => @dea_id,
        :host => @host,
        :port => app.port,
        :uris => app.uris,
        :tags => tags
      }
    end

    def register_app(app, tags = {})
      droplet = Droplet.new(@host)
      app.bind_droplet(droplet)
      NATS.start(:uri => @nats_uri) do
        NATS.publish('router.register', reg_hash_for_app(app, tags).to_json) { NATS.stop }
      end
    end

    def unregister_app(app)
      NATS.start(:uri => @nats_uri) do
        NATS.publish('router.unregister', reg_hash_for_app(app).to_json) { NATS.stop }
      end
    end
  end

end
@ -0,0 +1,274 @@
# Copyright (c) 2009-2011 VMware, Inc.
require File.dirname(__FILE__) + '/spec_helper'
require "base64"

describe 'Router Integration Tests (require nginx running)' do
  include Integration

  before :each do
    @nats_server = NatsServer.new
    @nats_server.start_server
    @nats_server.is_running?.should be_true

    @router = RouterServer.new(@nats_server.uri)
    # The router will only announce itself after it has subscribed to 'vcap.component.discover'.
    NATS.start(:uri => @nats_server.uri) do
      NATS.subscribe('vcap.component.announce') { NATS.stop }
      # Ensure that NATS has processed our subscribe from above before we start the router
      NATS.publish('xxx') { @router.start_server }
      EM.add_timer(5) { NATS.stop }
    end
    @router.is_running?.should be_true
  end

  after :each do
    @router.kill_server
    @router.is_running?.should be_false

    @nats_server.kill_server
    @nats_server.is_running?.should be_false
  end

  it 'should get health status via nginx' do
    body = get_healthz()
    body.should =~ /ok/i
  end

  it 'should properly register an application endpoint' do
    # setup the "app"
    app = TestApp.new('router_test.vcap.me')
    dea = DummyDea.new(@nats_server.uri, '1234')
    dea.register_app(app)
    app.verify_registered('127.0.0.1', RouterServer.port)
    app.stop
  end

  it 'should properly unregister an application endpoint' do
    # setup the "app"
    app = TestApp.new('router_test.vcap.me')
    dea = DummyDea.new(@nats_server.uri, '1234')
    dea.register_app(app)
    app.verify_registered('127.0.0.1', RouterServer.port)
    dea.unregister_app(app)
    # We should be unregistered here..
    # Send out simple request and check request and response
    req = simple_http_request('router_test.cap.me', '/')
    verify_vcap_404(req, '127.0.0.1', RouterServer.port)
    app.stop
  end

  it 'should properly distribute messages between multiple backends' do
    num_apps = 10
    num_requests = 100
    dea = DummyDea.new(@nats_server.uri, '1234')

    apps = []
    for ii in (0...num_apps)
      app = TestApp.new('lb_test.vcap.me')
      dea.register_app(app)
      apps << app
    end

    req = simple_http_request('lb_test.vcap.me', '/')
    app_sockets = apps.collect { |a| a.socket }

    results = send_requests_to_apps("127.0.0.1", RouterServer.port,
                                    req, num_requests, app_sockets,
                                    FOO_HTTP_RESPONSE)
    results.should have(num_apps).items
    recv_requests = 0
    results.each { |entry|
      recv_requests += entry[:counter]
    }
    recv_requests.should == num_requests
    apps.each {|a| a.stop }

  end

  it 'should get correct statistics' do
    num_apps = 10
    num_requests = 100
    dea = DummyDea.new(@nats_server.uri, '1234')

    apps = []
    for ii in (0...num_apps)
      app = TestApp.new('lb_test.vcap.me')
      dea.register_app(app, {"component" => "test#{ii}", "runtime" => "ruby"})
      apps << app
    end

    # Before we count the statistics, we should restart the nginx workers
    # to clean up unsynced stats.
    # TODO: It's hard to tell which nginx process belongs to us since it was
    # started by vcap_dev_setup; we may figure out a more elegant way to do this
    # and get a notification when it's ready instead of sleeping 2 seconds.
    %x[ps -ef|grep "nginx: master process"|grep -v grep|awk '{print $2}'|xargs sudo kill -HUP 2> /dev/null]
    sleep(2)

    req = simple_http_request('lb_test.vcap.me', '/')
    app_sockets = apps.collect { |a| a.socket }

    results = send_requests_to_apps("127.0.0.1", RouterServer.port,
                                    req, num_requests, app_sockets,
                                    FOO_HTTP_RESPONSE)
    # Verify all apps got requests and the total number is correct
    results.should have(num_apps).items
    recv_requests = 0
    results.each { |entry|
      recv_requests += entry[:counter]
    }
    recv_requests.should == num_requests
    for app in apps
      dea.unregister_app(app)
    end

    apps.each {|a| a.stop }

    varz = get_varz()
    varz[:requests].should be_a_kind_of(Integer)
    varz[:type].should =~ /router/i

    # Requests are collected exactly the same number as we received
    # since each of them triggers a location query to uls
    varz[:requests].should == num_requests

    # send_requests_to_apps sends num_requests requests sequentially,
    # so each response to an outstanding request updates the previous one
    # and we are sure the status of the last request is still in nginx
    varz[:responses_2xx].should == (num_requests - 1)

    comp_reqs = comp_resps_2xx = 0
    # Verify the statistics for each type of request tags
    for ii in (0...num_apps)
      comp_reqs += varz[:tags][:component]["test#{ii}".to_sym][:requests]
      comp_resps_2xx += varz[:tags][:component]["test#{ii}".to_sym][:responses_2xx]
    end
    comp_reqs.should == num_requests
    comp_resps_2xx.should == num_requests - 1
    varz[:tags][:runtime][:ruby][:requests].should == num_requests
    varz[:tags][:runtime][:ruby][:responses_2xx].should == num_requests - 1

    # Send a monitor request to nginx to sync up the remaining stats
    body = get_healthz()
    body.should =~ /ok/i

    varz = get_varz()
    comp_resps_2xx = 0
    # Verify the statistics for each type of request tags
    for ii in (0...num_apps)
      comp_resps_2xx += varz[:tags][:component]["test#{ii}".to_sym][:responses_2xx]
    end
    comp_resps_2xx.should == num_requests
    varz[:tags][:runtime][:ruby][:responses_2xx].should == num_requests
  end

  it 'should properly do sticky sessions' do
    num_apps = 10
    num_requests = 100
    dea = DummyDea.new(@nats_server.uri, '1234')

    apps = []
    for ii in (0...num_apps)
      app = TestApp.new('sticky.vcap.me')
      dea.register_app(app)
      apps << app
    end

    vcap_id = app_socket = nil
    app_sockets = apps.collect { |a| a.socket }

    TCPSocket.open('127.0.0.1', RouterServer.port) do |rs|
      rs.send(STICKY_REQUEST, 0)
      ready = IO.select(app_sockets, nil, nil, 1)
      ready[0].should have(1).items
      app_socket = ready[0].first
      ss = app_socket.accept_nonblock

      smsg, sbody = parse_http_msg_from_buf(STICKY_REQUEST)
      rmsg, rbody = parse_http_msg_from_socket(ss)
      validate_recv_msg_against_send(smsg, sbody, rmsg, rbody).should == true

      ss.send(STICKY_RESPONSE, 0)
      smsg, sbody = parse_http_msg_from_buf(STICKY_RESPONSE)
      rmsg, rbody = parse_http_msg_from_socket(rs)
      validate_recv_msg_against_send(smsg, sbody, rmsg, rbody).should == true
      rmsg.headers["Set-Cookie"] =~ /\s*__VCAP_ID__=([^,;]+)/
      (vcap_id = $1).should be
    end

    cookie = "__VCAP_ID__=#{vcap_id}"
    sticky_request = simple_sticky_request('sticky.vcap.me', '/sticky', cookie)

    results = send_requests_to_apps("127.0.0.1", RouterServer.port,
                                    sticky_request, num_requests, app_sockets,
                                    FOO_HTTP_RESPONSE)
    verify_results(results, app_socket, num_requests)

    # Verify bad cookie won't fail
    bad_cookie = "__VCAP_ID__=bad_cookie"
    sticky_request = simple_sticky_request('sticky.vcap.me', '/sticky', bad_cookie)

    results = send_requests_to_apps("127.0.0.1", RouterServer.port,
                                    sticky_request, num_requests, app_sockets,
                                    FOO_HTTP_RESPONSE)
    verify_results_by_request(results, num_requests)

    # Verify cookie to backend that never exists
    Router.config({})
    droplet = { :url => 'sticky.vcap.me', :host => '10.10.10.10', :port => 10 }
    down_dea_cookie = "__VCAP_ID__=#{Router.generate_session_cookie(droplet)}"
    sticky_request = simple_sticky_request('sticky.vcap.me', '/sticky', bad_cookie)

    results = send_requests_to_apps("127.0.0.1", RouterServer.port,
                                    sticky_request, num_requests, app_sockets,
                                    FOO_HTTP_RESPONSE)

    verify_results_by_request(results, num_requests)

    for app in apps
      dea.unregister_app(app)
    end

    # Check that it is gone
    verify_vcap_404(STICKY_REQUEST, '127.0.0.1', RouterServer.port)

    apps.each {|a| a.stop }
  end

  it 'should add vcap trace headers' do
    app = TestApp.new('trace.vcap.me')
    dea = DummyDea.new(@nats_server.uri, '1234')
    dea.register_app(app, {"component" => "trace", "runtime" => "ruby"})

    resp = app.get_trace_header("127.0.0.1", RouterServer.port, TRACE_KEY)

    resp.headers["X-Vcap-Backend"].should_not be_nil
    h, p = resp.headers["X-Vcap-Backend"].split(":")
    p.to_i.should == app.port.to_i

    resp.headers["X-Vcap-Router"].should_not be_nil
    resp.headers["X-Vcap-Router"].should == RouterServer.host

    dea.unregister_app(app)

    app.stop
  end

  it 'should not add vcap trace headers when trace key is wrong' do
    app = TestApp.new('trace.vcap.me')
    dea = DummyDea.new(@nats_server.uri, '1234')
    dea.register_app(app, {"component" => "trace", "runtime" => "ruby"})

    resp = app.get_trace_header("127.0.0.1", RouterServer.port, "fake_trace_key")

    resp.headers["X-Vcap-Backend"].should be_nil
    resp.headers["X-Vcap-Router"].should be_nil

    dea.unregister_app(app)

    app.stop
  end

end
@ -0,0 +1,338 @@
# Copyright (c) 2009-2011 VMware, Inc.
require File.dirname(__FILE__) + '/../lib/spec_helper'

# HTTP REQUESTS / RESPONSES
FOO_HTTP_RESPONSE = "HTTP/1.1 200 OK\r\nContent-Type: text/html;charset=utf-8\r\nContent-Length: 31\r\nConnection: keep-alive\r\nServer: thin 1.2.7 codename No Hup\r\n\r\n<h1>Hello from the Clouds!</h1>"

VCAP_NOT_FOUND = "HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\nVCAP ROUTER: 404 - DESTINATION NOT FOUND\r\n"

STICKY_REQUEST = "GET /sticky HTTP/1.1\r\nHost: sticky.vcap.me\r\nConnection: keep-alive\r\nAccept: application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5\r\nUser-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 (KHTML, like Gecko) Chrome/8.0.552.237 Safari/534.10\r\nAccept-Encoding: gzip,deflate,sdch\r\nAccept-Language: en-US,en;q=0.8\r\nAccept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.3\r\n\r\n"

STICKY_RESPONSE = "HTTP/1.1 200 OK\r\nContent-Type: text/html;charset=utf-8\r\nContent-Length: 242\r\nSet-Cookie: _session_id=be009e56c7be0e855d951a3b49e288c98aa36ede; path=/\r\nSet-Cookie: JSESSIONID=be009e56c7be0e855d951a3b49e288c98aa36ede; path=/\r\nConnection: keep-alive\r\nServer: thin 1.2.7 codename No Hup\r\n\r\n<h1>Hello from the Cookie Monster! via: 10.0.1.222:35267</h1><h2>session = be009e56c7be0e855d951a3b49e288c98aa36ede</h2><h4>Cookies set: _session_id, JSESSIONID<h4>Note: Trigger new sticky session cookie name via ?ss=NAME appended to URL</h4>"

TRACE_KEY = "222" # Should be consistent with dev_setup deployment configuration

def simple_http_request(host, path, http_version='1.1')
  "GET #{path} HTTP/#{http_version}\r\nUser-Agent: curl/7.19.7 (i486-pc-linux-gnu) libcurl/7.19.7 OpenSSL/0.9.8k zlib/1.2.3.3 libidn/1.15\r\nHost: #{host}\r\nAccept: */*\r\nContent-Length: 11\r\n\r\nhello world"
end

def parse_http_msg_from_socket(socket)
  parser = Http::Parser.new
  complete = false
  body = ''

  parser.on_body = proc do |chunk|
    body << chunk
  end

  parser.on_message_complete = proc do
    complete = true
    :stop
  end

  while not complete
    raw_data = socket.recv(1024)
    parser << raw_data
  end

  return parser, body
end

def parse_http_msg_from_buf(buf)
  parser = Http::Parser.new
  body = ''

  parser.on_body = proc do |chunk|
    body << chunk
  end
  parser.on_message_complete = proc do
    :stop
  end

  parser << buf

  return parser, body
end

def validate_recv_msg_against_send(send_msg, send_body, recv_msg, recv_body)
  recv_body.should == send_body

  recv_msg.http_method.should == send_msg.http_method
  recv_msg.request_url.should == send_msg.request_url
  recv_msg.status_code.should == send_msg.status_code

  # Verify most of the headers are preserved when traversing the "router"
  send_msg.headers.each do |hdr, val|
    # Skip the headers nginx will rewrite
    if (hdr == "Server" or hdr == "Date" or hdr == "Connection") then next end

    if hdr == "Set-Cookie"
      # Http Parser concatenates all Set-Cookie headers together
      val.split(',').each do |cookie|
        (recv_msg.headers["Set-Cookie"].include? cookie).should == true
      end
    elsif hdr == "Host"
      # nginx will rewrite uppercase host to lowercase
      val.downcase.should == recv_msg.headers[hdr]
    else
      val.should == recv_msg.headers[hdr]
    end
  end

  return true
end

# Encodes _data_ as json, decodes reply as json
def json_request(uri, subj, data=nil, timeout=1)
  reply = nil
  data_enc = data ? Yajl::Encoder.encode(data) : nil
  NATS.start(:uri => uri) do
    NATS.request(subj, data_enc) do |msg|
      reply = JSON.parse(msg, :symbolize_keys => true)
      NATS.stop
    end
    EM.add_timer(timeout) { NATS.stop }
  end

  reply
end

def verify_vcap_404(req, router_host, router_port)
  TCPSocket.open(router_host, router_port) do |rs|
    rs.send(req, 0)
    rmsg, rbody = parse_http_msg_from_socket(rs)
    rmsg.status_code.should == 404
  end
end

def get_varz
  reply = json_request(@nats_server.uri, 'vcap.component.discover')
  reply.should_not be_nil

  credentials = reply[:credentials]
  credentials.should_not be_nil

  host, port = reply[:host].split(":")

  varz_req = Net::HTTP::Get.new("/varz")
  varz_req.basic_auth *credentials
  varz_resp = Net::HTTP.new(host, port).start { |http| http.request(varz_req) }
  varz = JSON.parse(varz_resp.body, :symbolize_keys => true)
  varz
end

def get_healthz
  reply = json_request(@nats_server.uri, 'vcap.component.discover')
  reply.should_not be_nil

  credentials = reply[:credentials]
  credentials.should_not be_nil

  rbody = nil
  TCPSocket.open("127.0.0.1", RouterServer.port) {|rs|
    rs.send(healthz_request, 0)

    resp, rbody = parse_http_msg_from_socket(rs)
    resp.status_code.should == 200
  }
  rbody
end

def simple_sticky_request(host, path, cookie, http_version='1.1')
  "GET #{path} HTTP/#{http_version}\r\nHost: #{host}\r\nConnection: keep-alive\r\nAccept: application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5\r\nUser-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 (KHTML, like Gecko) Chrome/8.0.552.237 Safari/534.10\r\nAccept-Encoding: gzip,deflate,sdch\r\nAccept-Language: en-US,en;q=0.8\r\nAccept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.3\r\nCookie: #{cookie}\r\n\r\n"
end

def healthz_request
  "GET / HTTP/1.0\r\nUser-Agent: HTTP-Monitor/1.1\r\n\r\n"
end

def trace_request(trace_key)
  "GET /trace HTTP/1.1\r\nHost: trace.vcap.me\r\nConnection: keep-alive\r\nX-Vcap-Trace: #{trace_key}\r\nAccept: application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5\r\nUser-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.10 (KHTML, like Gecko) Chrome/8.0.552.237 Safari/534.10\r\nAccept-Encoding: gzip,deflate,sdch\r\nAccept-Language: en-US,en;q=0.8\r\nAccept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.3\r\n\r\n"
end

def send_http_request(ip, port, req)
  status, body = nil, nil
  TCPSocket.open(ip, port) do |rs|
    rs.send(req, 0)
    rs.close_write
    result = rs.read
    parser, body = parse_http_msg_from_buf(result)
    status = parser.status_code
  end
  [ status, body ]
end

def send_requests_to_apps(ip, port, req, num_requests, app_sockets, resp)
  results = []
  for i in (0...num_requests)
    TCPSocket.open(ip, port) {|rs|
      rs.send(req, 0)
      ready = IO.select(app_sockets, nil, nil, 1)
      ready[0].should have(1).items
      app_socket = ready[0].first
      ss = app_socket.accept_nonblock
      # Drain the socket
      parse_http_msg_from_socket(ss)
      # Send a response back to client to emulate a full req/resp cycle
      # to avoid nginx 499 error
      ss.send(resp, 0)

      found = false
      results.each { |entry|
        if (entry[:app_socket] == app_socket)
          entry[:counter] += 1
          found = true
          break
        end
      }
      if not found
        entry = {
          :app_socket => app_socket,
          :counter => 1
        }
        results << entry
      end
    }
  end
  results
end

def verify_results_by_request(results, num_requests)
  recv_requests = 0
  results.each { |entry|
    recv_requests += entry[:counter]
  }
  recv_requests.should == num_requests
end

def verify_results_by_socket(results, app_socket)
  results.should have(1).items
  results[0][:app_socket].should == app_socket
end

def verify_results(results, app_socket, num_requests)
  verify_results_by_request(results, num_requests)
  verify_results_by_socket(results, app_socket)
end

module Integration

  class TestApp
    class UsageError < StandardError; end

    attr_reader :host, :port, :uris, :socket

    def initialize(*uris)
      @uris = uris
      @port = nil
      @socket = nil
      start
    end

    def start
      raise UsageError, "Already started" if @socket
      sock, port = new_app_socket
      @socket = sock
      @port = port
    end

    def new_app_socket
      app_socket = TCPServer.new('127.0.0.1', 0)
      app_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true)
      Socket.do_not_reverse_lookup = true
      app_port = app_socket.addr[1]
      [app_socket, app_port]
    end

    def stop
      raise UsageError, "Already stopped" if !@socket
      @socket.close
      @socket = nil
      @port = nil
    end

    # Simple check that the app can be queried via the router
    def verify_registered(router_host, router_port)
      for uri in @uris
        # verify both original uri and uppercase uri
        verify_path_registered(uri, '/', router_host, router_port)
        verify_path_registered(uri.upcase, '/', router_host, router_port)
      end
    end

    def get_trace_header(router_host, router_port, trace_key)
      req = trace_request(trace_key)
      # Send out simple request and check request and response
      TCPSocket.open(router_host, router_port) do |rs|
        rs.send(req, 0)
        IO.select([@socket], nil, nil, 2) # 2 secs timeout
        ss = @socket.accept_nonblock

        # Send a response back..
        ss.send(FOO_HTTP_RESPONSE, 0)
        rmsg, rbody = parse_http_msg_from_socket(rs)
        ss.close
        return rmsg
      end
    end

    private

    def verify_path_registered(host, path, router_host, router_port)
      req = simple_http_request(host, path)
      # Send out simple request and check request and response
      TCPSocket.open(router_host, router_port) do |rs|
        rs.send(req, 0)
        IO.select([@socket], nil, nil, 2) # 2 secs timeout
        ss = @socket.accept_nonblock

        smsg, sbody = parse_http_msg_from_buf(req)
        rmsg, rbody = parse_http_msg_from_socket(ss)
        validate_recv_msg_against_send(smsg, sbody, rmsg, rbody).should == true

        # Send a response back..
        ss.send(FOO_HTTP_RESPONSE, 0)
        smsg, sbody = parse_http_msg_from_buf(FOO_HTTP_RESPONSE)
        rmsg, rbody = parse_http_msg_from_socket(rs)
        validate_recv_msg_against_send(smsg, sbody, rmsg, rbody).should == true

        ss.close
      end
    end

  end

  class DummyDea

    attr_reader :nats_uri, :dea_id

    def initialize(nats_uri, dea_id, host='127.0.0.1')
      @nats_uri = nats_uri
      @dea_id = dea_id
      @host = host
    end

    def reg_hash_for_app(app, tags = {})
      { :dea => @dea_id,
        :host => @host,
        :port => app.port,
        :uris => app.uris,
        :tags => tags
      }
    end

    def register_app(app, tags = {})
      NATS.start(:uri => @nats_uri) do
        NATS.publish('router.register', reg_hash_for_app(app, tags).to_json) { NATS.stop }
      end
    end

    def unregister_app(app)
      NATS.start(:uri => @nats_uri) do
        NATS.publish('router.unregister', reg_hash_for_app(app).to_json) { NATS.stop }
      end
    end
  end
end
@ -0,0 +1,135 @@
# Copyright (c) 2009-2011 VMware, Inc.
require File.dirname(__FILE__) + '/../spec_helper'

require 'fileutils'
require 'nats/client'
require 'yajl/json_gem'
require 'vcap/common'
require 'openssl'
require 'net/http'
require 'uri'
require "http/parser"
require "router/const"
require "router/router"

require 'pp'

class NatsServer

  TEST_PORT = 4228

  def initialize(uri="nats://localhost:#{TEST_PORT}", pid_file='/tmp/nats-router-tests.pid')
    @uri = URI.parse(uri)
    @pid_file = pid_file
  end

  def uri
    @uri.to_s
  end

  def server_pid
    @pid ||= File.read(@pid_file).chomp.to_i
  end

  def start_server
    return if NATS.server_running? @uri
    %x[ruby -S bundle exec nats-server -p #{@uri.port} -P #{@pid_file} -d 2> /dev/null]
    NATS.wait_for_server(@uri) # New version takes opt_timeout
  end

  def is_running?
    NATS.server_running? @uri
  end

  def kill_server
    if File.exists? @pid_file
      Process.kill('KILL', server_pid)
      FileUtils.rm_f(@pid_file)
    end
    sleep(0.5)
  end
end

class RouterServer

  PID_FILE = '/tmp/router-test.pid'
  CONFIG_FILE = '/tmp/router-test.yml'
  LOG_FILE = '/tmp/router-test.log'
  UNIX_SOCK = '/tmp/router.sock' # unix socket between nginx and uls
  PORT = 80 # nginx listening port
  STATUS_PORT = 8081 # must be consistent with nginx config in dev_setup
  STATUS_USER = "admin" # must be consistent with nginx config in dev_setup
  STATUS_PASSWD = "password" # must be consistent with nginx config in dev_setup

  # We verify functionality for the whole "router" (i.e. nginx + uls).
  # In all tests, when a client wants to send a request to a test app,
  # it has to send it to the port nginx is listening on.
  def initialize(nats_uri)
    mbus = "mbus: #{nats_uri}"
    log_info = "logging:\n level: debug\n file: #{LOG_FILE}"
    @config = %Q{sock: #{UNIX_SOCK}\n#{mbus}\n#{log_info}\npid: #{PID_FILE}\nlocal_route: 127.0.0.1\nstatus:\n port: #{STATUS_PORT}\n user: #{STATUS_USER}\n password: #{STATUS_PASSWD}}
  end

  def self.port
    PORT
  end

  def self.sock
    UNIX_SOCK
  end

  def self.host
    '127.0.0.1'
  end

  def server_pid
    File.read(PID_FILE).chomp.to_i
  end

  def start_server
    return if is_running?

    # Write the config
    File.open(CONFIG_FILE, 'w') { |f| f.puts "#{@config}" }

    # Wipe old log file, but truncate so running tail works
    if (File.exists? LOG_FILE)
      File.truncate(LOG_FILE, 0)
      # %x[rm #{LOG_FILE}] if File.exists? LOG_FILE
    end

    server = File.expand_path(File.join(__FILE__, '../../../bin/router'))
    nats_timeout = File.expand_path(File.join(File.dirname(__FILE__), 'nats_timeout'))
    #pid = Process.fork { %x[#{server} -c #{CONFIG_FILE} 2> /dev/null] }
    pid = Process.fork { %x[ruby -r#{nats_timeout} #{server} -c #{CONFIG_FILE}] }
    Process.detach(pid)

    wait_for_server
  end

  def is_running?
    require 'socket'
    s = UNIXSocket.new(UNIX_SOCK)
    s.close
    return true
  rescue
    return false
  end

  def wait_for_server(max_wait = 5)
    start = Time.now
    while (Time.now - start < max_wait) # Wait max_wait seconds max
      break if is_running?
      sleep(0.2)
    end
  end

  def kill_server
    if File.exists? PID_FILE
      %x[kill -9 #{server_pid} 2> /dev/null]
      %x[rm #{PID_FILE}]
    end
    %x[rm #{CONFIG_FILE}] if File.exists? CONFIG_FILE
    sleep(0.2)
  end
end
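For reference, the @config string built in RouterServer#initialize expands to a YAML document along these lines (a sketch; the mbus value below assumes the NatsServer default URI from the same file):

# sock: /tmp/router.sock
# mbus: nats://localhost:4228
# logging:
#   level: debug
#   file: /tmp/router-test.log
# pid: /tmp/router-test.pid
# local_route: 127.0.0.1
# status:
#   port: 8081
#   user: admin
#   password: password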
@ -23,19 +23,6 @@ describe Router do
  before :each do
    clear_router
  end

  it 'should set up a session key' do
    Router.session_key.should be
  end

  it 'should set a default client inactivity timeout' do
    Router.client_inactivity_timeout.should be
  end

  it 'should respect a client_inactivity_timeout key when supplied' do
    Router.config('client_inactivity_timeout' => 30)
    Router.client_inactivity_timeout.should == 30
  end
end

describe 'Router.register_droplet' do
@ -70,6 +57,13 @@ describe Router do
    droplet[:clients].should == {}
  end

  it 'should allow looking up uppercase uri' do
    Router.register_droplet('foo.vcap.me', '10.0.1.22', 2222, {})
    droplets = Router.lookup_droplet('FOO.VCAP.ME')
    droplets.should be_instance_of Array
    droplets.should have(1).items
  end

  it 'should count droplets independent of URL' do
    Router.register_droplet('foo.vcap.me', '10.0.1.22', 2222, {})
    Router.register_droplet('foo.vcap.me', '10.0.1.22', 2224, {})
@ -139,27 +133,6 @@ describe Router do
  end
end

describe 'Router.session_keys' do
  before :each do
    clear_router
  end

  it 'should properly encrypt and decrypt session keys' do
    Router.register_droplet('foo.vcap.me', '10.0.1.22', 2222, {})
    droplets = Router.lookup_droplet('foo.vcap.me')
    droplets.should have(1).items
    droplet = droplets.first
    key = Router.generate_session_cookie(droplet)
    key.should be
    droplet_array = Router.decrypt_session_cookie(key)
    droplet_array.should be_instance_of Array
    droplet_array.should have(3).items
    droplet_array[0].should == droplet[:url]
    droplet_array[1].should == droplet[:host]
    droplet_array[2].should == droplet[:port]
  end
end

def clear_router
  Router.config({})
  Router.instance_variable_set(:@log, double(:black_hole).as_null_object)
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.