This commit is contained in:
Ben Burkert 2013-06-29 18:36:30 -07:00
Родитель 08c92544f8
Коммит 79ae28a29e
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 7F8B7CF7590D2B35
14 изменённых файлов: 376 добавлений и 395 удалений

Просмотреть файл

@ -24,7 +24,7 @@ end
opts = Trollop::options do
opt :uuid, "The UUID of the stream.", :short => '-u', :type => String
opt :'content-type', "The content-type of the stream.", :short => '-c', :type => String, :default => 'text/plain'
opt :endpoint, "The endpoint of the htttee service.", :short => '-e', :type => String, :default => 'http://htttee.engineyard.com/'
opt :endpoint, "The endpoint of the htttee service.", :short => '-e', :type => String, :default => 'http://localhost:3000/'
opt :quiet, "Don't echo to stdout.", :short => '-q', :default => false
end
@ -34,6 +34,6 @@ $stdin.sync = true
$stdout.sync = true
input = opts[:quiet] ? $stdin : MultiplexedIO.new($stdin, $stdout)
client = EY::Tea::Client.new(:endpoint => opts[:endpoint])
client = HTTTee::Client.new(:endpoint => opts[:endpoint])
client.up(input, opts[:uuid], opts[:'content-type'])

Просмотреть файл

@ -3,4 +3,4 @@ $:.unshift File.join(File.dirname(__FILE__), 'lib')
require 'htttee/server'
use Rack::CommonLogger
run EY::Tea::Server.app
run HTTTee::Server.app

Просмотреть файл

@ -2,12 +2,10 @@ require 'net/http'
require 'rack/client'
#require 'uuidtools'
module EY
module Tea
module Client
def self.new(*a)
Consumer.new(*a)
end
module HTTTee
module Client
def self.new(*a)
Consumer.new(*a)
end
end
end

Просмотреть файл

@ -1,48 +1,46 @@
module EY
module Tea
module Client
class Consumer < Rack::Client::Base
def initialize(options = {})
@base_uri = URI.parse(options.fetch(:endpoint, 'http://tea.engineyard.com/'))
inner_app = options.fetch(:app, Rack::Client::Handler::NetHTTP.new)
module HTTTee
module Client
class Consumer < Rack::Client::Base
def initialize(options = {})
@base_uri = URI.parse(options.fetch(:endpoint, 'http://localhost:3000/'))
inner_app = options.fetch(:app, Rack::Client::Handler::NetHTTP.new)
super(inner_app)
super(inner_app)
end
def up(io, uuid, content_type = 'text/plain')
post("/#{uuid}", {'Content-Type' => content_type, 'Transfer-Encoding' => 'chunked'}, io)
end
def down(uuid)
get("/#{uuid}") do |status, headers, response_body|
response_body.each do |chunk|
yield chunk
end
def up(io, uuid, content_type = 'text/plain')
post("/#{uuid}", {'Content-Type' => content_type, 'Transfer-Encoding' => 'chunked'}, io)
end
end
def down(uuid)
get("/#{uuid}") do |status, headers, response_body|
response_body.each do |chunk|
yield chunk
end
end
def build_env(request_method, url, headers = {}, body = nil)
uri = @base_uri.nil? ? URI.parse(url) : @base_uri + url
env = super(request_method, uri.to_s, headers, body)
env['HTTP_HOST'] ||= http_host_for(uri)
env['HTTP_USER_AGENT'] ||= http_user_agent
env
end
def http_host_for(uri)
if uri.to_s.include?(":#{uri.port}")
[uri.host, uri.port].join(':')
else
uri.host
end
end
def build_env(request_method, url, headers = {}, body = nil)
uri = @base_uri.nil? ? URI.parse(url) : @base_uri + url
env = super(request_method, uri.to_s, headers, body)
env['HTTP_HOST'] ||= http_host_for(uri)
env['HTTP_USER_AGENT'] ||= http_user_agent
env
end
def http_host_for(uri)
if uri.to_s.include?(":#{uri.port}")
[uri.host, uri.port].join(':')
else
uri.host
end
end
def http_user_agent
"htttee (rack-client #{Rack::Client::VERSION} (app: #{@app.class}))"
end
def http_user_agent
"htttee (rack-client #{Rack::Client::VERSION} (app: #{@app.class}))"
end
end
end

Просмотреть файл

@ -5,59 +5,57 @@ require 'em-redis'
EM.epoll
module EY
module Tea
module Server
module HTTTee
module Server
def self.app
mocking? ? mock_app : rack_app
def self.app
mocking? ? mock_app : rack_app
end
def self.api(host = (ENV['REDIS_HOST'] || 'localhost' ), port = (ENV['REDIS_PORT'] || 6379).to_i)
Api.new(host, port)
end
def self.rack_app
Rack::Builder.app do |builder|
builder.use AsyncFixer
builder.use Dechunker
builder.run Server.api
end
end
def self.api(host = (ENV['REDIS_HOST'] || 'localhost' ), port = (ENV['REDIS_PORT'] || 6379).to_i)
Api.new(host, port)
def self.mock_app
Rack::Builder.app do |builder|
builder.use Mock::ThinMuxer
builder.use Mock::EchoUri
builder.run Server.rack_app
end
end
def self.rack_app
Rack::Builder.app do |builder|
builder.use AsyncFixer
builder.use Dechunker
builder.run Server.api
def self.mock!
require 'htttee/server/mock'
@mocking = true
@mock_uri = Mock.boot_forking_server
end
def self.reset!
raise "Can't reset in non-mocked mode." unless mocking?
EM.run do
EM::Protocols::Redis.connect.flush_all do
EM.stop
end
end
end
def self.mock_app
Rack::Builder.app do |builder|
builder.use Mock::ThinMuxer
builder.use Mock::EchoUri
builder.run Server.rack_app
end
end
def self.mocking?
@mocking
end
def self.mock!
require 'htttee/server/mock'
@mocking = true
@mock_uri = Mock.boot_forking_server
end
def self.reset!
raise "Can't reset in non-mocked mode." unless mocking?
EM.run do
EM::Protocols::Redis.connect.flush_all do
EM.stop
end
end
end
def self.mocking?
@mocking
end
def self.mock_uri
raise "Not in mock mode!" unless mocking?
@mock_uri
end
def self.mock_uri
raise "Not in mock mode!" unless mocking?
@mock_uri
end
end
end

Просмотреть файл

@ -1,205 +1,203 @@
module EY
module Tea
module Server
class Api
STREAMING, FIN = ?0, ?1
SUBSCRIBE, UNSUBSCRIBE, MESSAGE = 'SUBSCRIBE', 'UNSUBSCRIBE', 'MESSAGE'
module HTTTee
module Server
class Api
STREAMING, FIN = ?0, ?1
SUBSCRIBE, UNSUBSCRIBE, MESSAGE = 'SUBSCRIBE', 'UNSUBSCRIBE', 'MESSAGE'
AsyncResponse = [-1, {}, []].freeze
AsyncResponse = [-1, {}, []].freeze
attr_accessor :redis
attr_accessor :redis
def initialize(host, port)
@host, @port = host, port
def initialize(host, port)
@host, @port = host, port
end
def call(env)
uuid = env['PATH_INFO'].sub(/^\//, '')
case env['REQUEST_METHOD']
when 'POST' then post(env, uuid)
when 'GET' then get(env, uuid)
end
def call(env)
uuid = env['PATH_INFO'].sub(/^\//, '')
AsyncResponse
end
case env['REQUEST_METHOD']
when 'POST' then post(env, uuid)
when 'GET' then get(env, uuid)
def post(env, uuid)
body = Thin::DeferrableBody.new
redis.set(state_key(uuid), STREAMING)
set_input_callback(env, uuid, body)
set_input_errback(env, uuid, body)
set_input_each(env, uuid, body)
end
def get(env, uuid)
body = ChunkedBody.new
with_state_for(uuid) do |state|
case state
when NilClass then four_oh_four_response(env, body)
when STREAMING then open_stream_response(env, uuid, body)
when FIN then closed_stream_response(env, uuid, body)
end
AsyncResponse
end
end
def post(env, uuid)
body = Thin::DeferrableBody.new
def set_input_callback(env, uuid, body)
rack_input(env).callback do
async_callback(env).call [204, {}, body]
redis.set(state_key(uuid), STREAMING)
set_input_callback(env, uuid, body)
set_input_errback(env, uuid, body)
set_input_each(env, uuid, body)
redis.set(state_key(uuid), FIN) do
finish(channel(uuid))
body.succeed
end
end
end
def get(env, uuid)
body = ChunkedBody.new
def set_input_errback(env, uuid, body)
rack_input(env).errback do |error|
async_callback(env).call [500, {}, body]
body.call [error.inspect]
body.succeed
end
end
def set_input_each(env, uuid, body)
rack_input(env).each do |chunk|
unless chunk.empty?
redis.pipeline ['append', data_key(uuid), chunk],
['publish', channel(uuid), encode(STREAMING, chunk)]
end
end
end
def four_oh_four_response(env, body)
env['async.callback'].call [404, {'Transfer-Encoding' => 'chunked'}, body]
body.succeed
end
def open_stream_response(env, uuid, body)
start_response(env, body)
stream_data_to(body, data_key(uuid)) do
with_state_for(uuid) do |state|
case state
when NilClass then four_oh_four_response(env, body)
when STREAMING then open_stream_response(env, uuid, body)
when FIN then closed_stream_response(env, uuid, body)
end
end
end
def set_input_callback(env, uuid, body)
rack_input(env).callback do
async_callback(env).call [204, {}, body]
redis.set(state_key(uuid), FIN) do
finish(channel(uuid))
if state == FIN
body.succeed
end
end
end
def set_input_errback(env, uuid, body)
rack_input(env).errback do |error|
async_callback(env).call [500, {}, body]
body.call [error.inspect]
body.succeed
end
end
def set_input_each(env, uuid, body)
rack_input(env).each do |chunk|
unless chunk.empty?
redis.pipeline ['append', data_key(uuid), chunk],
['publish', channel(uuid), encode(STREAMING, chunk)]
end
end
end
def four_oh_four_response(env, body)
env['async.callback'].call [404, {'Transfer-Encoding' => 'chunked'}, body]
body.succeed
end
def open_stream_response(env, uuid, body)
start_response(env, body)
stream_data_to(body, data_key(uuid)) do
with_state_for(uuid) do |state|
if state == FIN
body.succeed
else
subscribe_and_stream(env, uuid, body)
end
end
end
end
def closed_stream_response(env, uuid, body)
start_response(env, body)
stream_data_to(body, data_key(uuid)) do
body.succeed
end
end
def stream_data_to(body, key, offset = 0, chunk_size = 1024, &block)
redis.substr(key, offset, offset + chunk_size) do |chunk|
if chunk.nil? || chunk.empty?
yield
else
body.call [chunk]
stream_data_to(body, key, offset + chunk.size, chunk_size, &block)
subscribe_and_stream(env, uuid, body)
end
end
end
end
def respond_with(env, body, data)
start_response(env, body, data)
def closed_stream_response(env, uuid, body)
start_response(env, body)
stream_data_to(body, data_key(uuid)) do
body.succeed
end
end
def start_response(env, body, data = nil)
env['async.callback'].call [200, {'Transfer-Encoding' => 'chunked', 'Content-Type' => 'text/plain'}, body]
body.call [data] unless data.nil? || data.empty?
def stream_data_to(body, key, offset = 0, chunk_size = 1024, &block)
redis.substr(key, offset, offset + chunk_size) do |chunk|
if chunk.nil? || chunk.empty?
yield
else
body.call [chunk]
stream_data_to(body, key, offset + chunk.size, chunk_size, &block)
end
end
end
def subscribe_and_stream(env, uuid, body)
subscribe channel(uuid) do |type, message, *extra|
case type
def respond_with(env, body, data)
start_response(env, body, data)
body.succeed
end
def start_response(env, body, data = nil)
env['async.callback'].call [200, {'Transfer-Encoding' => 'chunked', 'Content-Type' => 'text/plain'}, body]
body.call [data] unless data.nil? || data.empty?
end
def subscribe_and_stream(env, uuid, body)
subscribe channel(uuid) do |type, message, *extra|
case type
#when SUBSCRIBE then start_response(env, body, data)
when FIN then body.succeed
when STREAMING then body.call [message]
end
when FIN then body.succeed
when STREAMING then body.call [message]
end
end
end
def with_state_for(uuid, &block)
redis.get(state_key(uuid), &block)
end
def with_state_for(uuid, &block)
redis.get(state_key(uuid), &block)
end
def with_state_and_data_for(uuid, &block)
redis.multi_get(state_key(uuid), data_key(uuid), &block)
end
def with_state_and_data_for(uuid, &block)
redis.multi_get(state_key(uuid), data_key(uuid), &block)
end
def data_key(uuid)
"#{uuid}:data"
end
def data_key(uuid)
"#{uuid}:data"
end
def state_key(uuid)
"#{uuid}:state"
end
def state_key(uuid)
"#{uuid}:state"
end
def channel(uuid)
uuid
end
def channel(uuid)
uuid
end
def rack_input(rack_env)
rack_env['rack.input']
end
def rack_input(rack_env)
rack_env['rack.input']
end
def async_callback(rack_env)
rack_env['async.callback']
end
def async_callback(rack_env)
rack_env['async.callback']
end
def publish(channel, data)
redis.publish channel, encode(STREAMING, data)
end
def publish(channel, data)
redis.publish channel, encode(STREAMING, data)
end
def finish(channel)
redis.publish channel, encode(FIN)
end
def finish(channel)
redis.publish channel, encode(FIN)
end
def subscribe(channel, &block)
conn = pubsub
conn.subscribe channel do |type, chan, data|
case type.upcase
when SUBSCRIBE then block.call(SUBSCRIBE, chan)
when MESSAGE
state, data = Yajl::Parser.parse(data)
case state
when STREAMING then block.call(STREAMING, data)
when FIN
conn.unsubscribe channel
block.call(FIN, data)
end
else
debugger
''
require 'debugger'
def subscribe(channel, &block)
conn = pubsub
conn.subscribe channel do |type, chan, data|
case type.upcase
when SUBSCRIBE then block.call(SUBSCRIBE, chan)
when MESSAGE
state, data = Yajl::Parser.parse(data)
case state
when STREAMING then block.call(STREAMING, data)
when FIN
conn.unsubscribe channel
block.call(FIN, data)
end
else
''
end
end
end
def pubsub
EM::Protocols::PubSubRedis.connect(@host, @port)
end
def pubsub
EM::Protocols::PubSubRedis.connect(@host, @port)
end
def redis
@redis ||= EM::Protocols::Redis.connect(@host, @port)
end
def redis
@@redis ||= EM::Protocols::Redis.connect(@host, @port)
end
def encode(*parts)
Yajl::Encoder.encode(parts)
end
def encode(*parts)
Yajl::Encoder.encode(parts)
end
end
end

Просмотреть файл

@ -1,21 +1,19 @@
module EY
module Tea
module Server
class ChunkedBody < Thin::DeferrableBody
def call(body)
body.each do |fragment|
@body_callback.call(chunk(fragment))
end
module HTTTee
module Server
class ChunkedBody < Thin::DeferrableBody
def call(body)
body.each do |fragment|
@body_callback.call(chunk(fragment))
end
end
def succeed(*a)
@body_callback.call("0\r\n\r\n")
super
end
def succeed(*a)
@body_callback.call("0\r\n\r\n")
super
end
def chunk(fragment)
"#{fragment.size.to_s(16)}\r\n#{fragment}\r\n"
end
def chunk(fragment)
"#{fragment.size.to_s(16)}\r\n#{fragment}\r\n"
end
end
end

Просмотреть файл

@ -1,20 +1,17 @@
module HTTTee
module Server
class AsyncFixer
def initialize(app)
@app = app
end
module EY
module Tea
module Server
class AsyncFixer
def initialize(app)
@app = app
end
def call(env)
tuple = @app.call(env)
def call(env)
tuple = @app.call(env)
if tuple.first == -1
Thin::Connection::AsyncResponse
else
tuple
end
if tuple.first == -1
Thin::Connection::AsyncResponse
else
tuple
end
end
end

Просмотреть файл

@ -1,59 +1,57 @@
module EY
module Tea
module Server
class Dechunker
def initialize(app)
@app = app
module HTTTee
module Server
class Dechunker
def initialize(app)
@app = app
end
def call(env)
env['rack.input'] = ChunkedBody.new(env['rack.input']) if chunked?(env)
@app.call(env)
end
def chunked?(env)
env['HTTP_TRANSFER_ENCODING'] == 'chunked'
end
class ChunkedBody
extend Forwardable
CRLF = "\r\n"
attr_reader :input
def_delegators :input, :callback, :errback
def initialize(input)
@input, @buffer = input, ''
end
def call(env)
env['rack.input'] = ChunkedBody.new(env['rack.input']) if chunked?(env)
@app.call(env)
end
def chunked?(env)
env['HTTP_TRANSFER_ENCODING'] == 'chunked'
end
class ChunkedBody
extend Forwardable
CRLF = "\r\n"
attr_reader :input
def_delegators :input, :callback, :errback
def initialize(input)
@input, @buffer = input, ''
def each(&blk)
@input.each do |chunk|
dechunk(chunk, &blk)
end
end
def each(&blk)
@input.each do |chunk|
dechunk(chunk, &blk)
end
end
def dechunk(chunk, &blk)
@buffer << chunk
def dechunk(chunk, &blk)
@buffer << chunk
loop do
return unless @buffer[CRLF]
loop do
return unless @buffer[CRLF]
string_length, remainder = @buffer.split(CRLF, 2)
length = string_length.to_i(16)
string_length, remainder = @buffer.split(CRLF, 2)
length = string_length.to_i(16)
if length == 0
@buffer = ''
@input.succeed
return
elsif remainder.size >= length + 2 # length + CRLF
data, @buffer = remainder.split(CRLF, 2)
blk.call(data)
else
return
end
if length == 0
@buffer = ''
@input.succeed
return
elsif remainder.size >= length + 2 # length + CRLF
data, @buffer = remainder.split(CRLF, 2)
blk.call(data)
else
return
end
end
end

Просмотреть файл

@ -1,67 +1,65 @@
require 'rack/mux'
module EY
module Tea
module Server
module Mock
module HTTTee
module Server
module Mock
def self.boot_forking_server
o,i = IO.pipe
def self.boot_forking_server
o,i = IO.pipe
if pid = fork
at_exit { Process.kill(:SIGTERM, pid) }
if pid = fork
at_exit { Process.kill(:SIGTERM, pid) }
i.close
URI.parse(o.read)
i.close
URI.parse(o.read)
else
o.close
process_child(i)
end
end
def self.process_child(i)
EM.run do
client = Rack::Client.new { run HTTTee::Server.mock_app }
uri = client.get("/mux-uri").body
i << uri
i.close
end
exit
end
class ThinMuxer
def initialize(app)
@app = Rack::Mux.new(async_safe(app), thin_options)
end
def call(env)
@app.call(env)
end
def async_safe(app)
AsyncFixer.new(app)
end
def thin_options
{ :server => Thin, :environment => 'none' }
end
end
class EchoUri
def initialize(app)
@app = app
end
def call(env)
if env['PATH_INFO'] == '/mux-uri'
[200, {'Content-Type' => 'text/plain'}, [env['X-Mux-Uri']]]
else
o.close
process_child(i)
end
end
def self.process_child(i)
EM.run do
client = Rack::Client.new { run EY::Tea::Server.mock_app }
uri = client.get("/mux-uri").body
i << uri
i.close
end
exit
end
class ThinMuxer
def initialize(app)
@app = Rack::Mux.new(async_safe(app), thin_options)
end
def call(env)
@app.call(env)
end
def async_safe(app)
AsyncFixer.new(app)
end
def thin_options
{ :server => Thin, :environment => 'none' }
end
end
class EchoUri
def initialize(app)
@app = app
end
def call(env)
if env['PATH_INFO'] == '/mux-uri'
[200, {'Content-Type' => 'text/plain'}, [env['X-Mux-Uri']]]
else
@app.call(env)
end
end
end
end
end

Просмотреть файл

@ -1,5 +1,3 @@
module EY
module Tea
VERSION = '0.0.0'
end
module HTTTee
VERSION = '0.0.0'
end

Просмотреть файл

@ -1,10 +1,10 @@
require 'spec_helper'
describe EY::Tea::Client do
describe HTTTee::Client do
subject { @client }
def new_client
EY::Tea::Client.new(:endpoint => EY::Tea::Server.mock_uri.to_s)
HTTTee::Client.new(:endpoint => HTTTee::Server.mock_uri.to_s)
end
def run(thread)

Просмотреть файл

@ -1,4 +1,4 @@
module TeaHelpers
module HTTTeeHelpers
class ThinMuxer
def initialize(app)
@app = Rack::Mux.new(thin_options)

Просмотреть файл

@ -6,16 +6,16 @@ require 'htttee/client'
require 'digest/sha2'
EY::Tea::Server.mock!
HTTTee::Server.mock!
Thin::Logging.debug = Thin::Logging.trace = true
RSpec.configure do |config|
config.color_enabled = config.tty = true #Force ANSI colors
config.around :each do |callback|
EY::Tea::Server.reset!
HTTTee::Server.reset!
@client = EY::Tea::Client.new(:endpoint => EY::Tea::Server.mock_uri.to_s)
@client = HTTTee::Client.new(:endpoint => HTTTee::Server.mock_uri.to_s)
callback.run
end
end