ruby/sample/drb/old_tuplespace.rb

213 строки
3.7 KiB
Ruby

#!/usr/local/bin/ruby
# TupleSpace
# Copyright (c) 1999-2000 Masatoshi SEKI
# You can redistribute it and/or modify it under the same terms as Ruby.
class TupleSpace
class Template
def initialize(list)
@list = list
@check_idx = []
@list.each_with_index do |x, i|
@check_idx.push i if x
end
@size = @list.size
end
attr :size
alias length size
def match(tuple)
return nil if tuple.size != self.size
@check_idx.each do |i|
unless @list[i] === tuple[i]
return false
end
end
return true
end
end
def initialize
@que = {}
@waiting = {}
@que.taint # enable tainted communication
@waiting.taint
self.taint
end
def wakeup_waiting(tuple)
sz = tuple.length
return nil unless @waiting[sz]
x = nil
i = -1
found = false
@waiting[sz] = @waiting[sz].find_all { |x|
if x[0].match(tuple)
begin
x[1].wakeup
rescue ThreadError
end
false
else
true
end
}
end
def put_waiting(template, thread)
sz = template.length
@waiting[sz] = [] unless @waiting[sz]
@waiting[sz].push([Template.new(template), thread])
end
private :wakeup_waiting
private :put_waiting
def get_que(template)
sz = template.length
return nil unless @que[sz]
template = Template.new(template)
x = nil
i = -1
found = false
@que[sz].each_with_index do |x, i|
if template.match(x)
found = true
break
end
end
return nil unless found
@que[sz].delete_at(i)
return x
end
def put_que(tuple)
sz = tuple.length
@que[sz] = [] unless @que[sz]
@que[sz].push tuple
end
private :get_que
private :put_que
def out(*tuples)
tuples.each do |tuple|
Thread.critical = true
put_que(tuple)
wakeup_waiting(tuple)
Thread.critical = false
end
end
alias put out
alias write out
def in(template, non_block=false)
begin
loop do
Thread.critical = true
tuple = get_que(template)
unless tuple
if non_block
raise ThreadError, "queue empty"
end
put_waiting(template, Thread.current)
Thread.stop
else
return tuple
end
end
ensure
Thread.critical = false
end
end
alias get in
alias take in
def rd(template, non_block=false)
tuple = self.in(template, non_block)
out(tuple)
tuple
end
alias read rd
def mv(dest, template, non_block=false)
tuple = self.in(template, non_block)
begin
dest.out(tuple)
rescue
self.out(tuple)
end
end
alias move mv
end
if __FILE__ == $0
ts = TupleSpace.new
clients = []
servers = []
def server(ts, id)
Thread.start {
loop do
req = ts.in(['req', nil, nil])
ac = req[1]
num = req[2]
sleep id
ts.out([ac, id, num, num * num])
end
}
end
def client(ts, n)
Thread.start {
ac = Object.new
tuples = (1..10).collect { |i|
['req', ac, i * 10 + n]
}
ts.out(*tuples)
ts.out(tuples[0])
puts "out: #{n}"
11.times do |i|
ans = ts.in([ac, nil, nil, nil])
puts "client(#{n}) server(#{ans[1]}) #{ans[2]} #{ans[3]}"
end
}
end
def watcher(ts)
Thread.start {
loop do
begin
sleep 1
p ts.rd(['req', nil, nil], true)
rescue ThreadError
puts "'req' not found."
end
end
}
end
(0..3).each do |n|
servers.push(server(ts, n))
end
(1..6).each do |n|
clients.push(client(ts, n))
end
(1..3).each do
watcher(ts)
end
clients.each do |t|
t.join
end
end