зеркало из https://github.com/github/ruby.git
* lib/test/unit.rb: --jobs-status won't puts over 2 lines.
* test/testunit/test_parallel.rb: Fix test for above. * lib/test/*: refactoring. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@30959 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
Родитель
10e2dbee16
Коммит
86c7e68442
|
@ -1,3 +1,11 @@
|
|||
Sat Feb 26 16:10:23 2011 Shota Fukumori <sorah@tubusu.net>
|
||||
|
||||
* lib/test/unit.rb: --jobs-status won't puts over 2 lines.
|
||||
|
||||
* test/testunit/test_parallel.rb: Fix test for above.
|
||||
|
||||
* lib/test/*: refactoring.
|
||||
|
||||
Sat Feb 26 07:10:05 2011 Aaron Patterson <aaron@tenderlovemaking.com>
|
||||
|
||||
* ext/psych/lib/psych/scalar_scanner.rb: fix parsing timezone's whose
|
||||
|
|
469
lib/test/unit.rb
469
lib/test/unit.rb
|
@ -229,33 +229,87 @@ module Test
|
|||
include Test::Unit::GCStressOption
|
||||
include Test::Unit::RunCount
|
||||
|
||||
class Worker
|
||||
def self.launch(ruby,args=[])
|
||||
i,o = IO.pipe("ASCII-8BIT") # worker o>|i> master
|
||||
j,k = IO.pipe("ASCII-8BIT") # worker <j|<k master
|
||||
k.sync = true
|
||||
pid = spawn(*ruby,
|
||||
"#{File.dirname(__FILE__)}/unit/parallel.rb",
|
||||
*args, out: o, in: j)
|
||||
[o,j].each(&:close)
|
||||
new(in: k, out: i, pid: pid, status: :waiting)
|
||||
end
|
||||
|
||||
def initialize(h={})
|
||||
@worker = h
|
||||
@hooks = {}
|
||||
end
|
||||
|
||||
def run(task,type)
|
||||
@worker[:file] = File.basename(task).gsub(/\.rb/,"")
|
||||
@worker[:real_file] = task
|
||||
begin
|
||||
@worker[:loadpath] ||= []
|
||||
@worker[:in].puts "loadpath #{[Marshal.dump($:-@worker[:loadpath])].pack("m").gsub("\n","")}"
|
||||
@worker[:loadpath] = $:.dup
|
||||
@worker[:in].puts "run #{task} #{type}"
|
||||
@worker[:status] = :prepare
|
||||
rescue Errno::EPIPE
|
||||
dead
|
||||
rescue IOError
|
||||
raise unless ["stream closed","closed stream"].include? $!.message
|
||||
dead
|
||||
end
|
||||
end
|
||||
|
||||
def hook(id,&block)
|
||||
@hooks[id] ||= []
|
||||
@hooks[id] << block
|
||||
self
|
||||
end
|
||||
|
||||
def read
|
||||
((self[:status] == :quit) ? self[:out].read : self[:out].gets).chomp
|
||||
end
|
||||
|
||||
def [](k); @worker[k]; end
|
||||
def []=(k,v); @worker[k]=v; end
|
||||
|
||||
def dead(*additional)
|
||||
@worker[:status] = :quit
|
||||
@worker[:in].close
|
||||
@worker[:out].close
|
||||
|
||||
call_hook(:dead,*additional)
|
||||
end
|
||||
|
||||
def to_s
|
||||
if self[:file]
|
||||
"#{self[:pid]}=#{self[:file]}"
|
||||
else
|
||||
"#{self[:pid]}:#{self[:status].to_s.ljust(7)}"
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def call_hook(id,*additional)
|
||||
@hooks[id] ||= []
|
||||
@hooks[id].each{|hook| hook[self,additional] }
|
||||
self
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
class << self; undef autorun; end
|
||||
|
||||
alias orig_run_anything _run_anything
|
||||
undef _run_anything
|
||||
undef options
|
||||
|
||||
def options
|
||||
@optss ||= (@options||{}).merge(@opts)
|
||||
end
|
||||
|
||||
def _run_anything type
|
||||
if @opts[:parallel] && @warnings
|
||||
warn ""
|
||||
ary = []
|
||||
@warnings.reject! do |w|
|
||||
r = ary.include?(w[1].message)
|
||||
ary << w[1].message
|
||||
r
|
||||
end
|
||||
@warnings.each do |w|
|
||||
warn "#{w[0]}: #{w[1].message} (#{w[1].class})"
|
||||
end
|
||||
warn ""
|
||||
end
|
||||
orig_run_anything(type)
|
||||
end
|
||||
|
||||
@@stop_auto_run = false
|
||||
def self.autorun
|
||||
at_exit {
|
||||
|
@ -269,7 +323,6 @@ module Test
|
|||
def after_worker_down(worker, e=nil, c=1)
|
||||
return unless @opts[:parallel]
|
||||
return if @interrupt
|
||||
after_worker_dead worker
|
||||
if e
|
||||
b = e.backtrace
|
||||
warn "#{b.shift}: #{e.message} (#{e.class})"
|
||||
|
@ -288,40 +341,20 @@ module Test
|
|||
def jobs_status
|
||||
return unless @opts[:job_status]
|
||||
puts "" unless @opts[:verbose]
|
||||
if @opts[:job_status]
|
||||
line2 = []
|
||||
line1 = @workers.map { |worker|
|
||||
a = "#{worker[:pid]}:#{worker[:status].to_s.ljust(7)}"
|
||||
if worker[:file]
|
||||
if @opts[:job_status_type] == :replace
|
||||
a = "#{worker[:pid]}=#{worker[:file]}"
|
||||
else
|
||||
if a.size > worker[:file].size
|
||||
line2 << worker[:file].ljust(a.size)
|
||||
else
|
||||
a << " "*(worker[:file].size-a.size)
|
||||
line2 << worker[:file]
|
||||
end
|
||||
end
|
||||
else
|
||||
line2 << " "*a.size
|
||||
end
|
||||
a
|
||||
}.join(" ")
|
||||
if @opts[:job_status_type] == :replace
|
||||
@terminal_width ||= %x{stty size 2>/dev/null}.split[1].to_i.nonzero? \
|
||||
|| %x{tput cols 2>/dev/null}.to_i.nonzero? \
|
||||
|| 80
|
||||
@jstr_size ||= 0
|
||||
del_jobs_status
|
||||
STDOUT.flush
|
||||
print line1[0...@terminal_width]
|
||||
STDOUT.flush
|
||||
@jstr_size = line1.size > @terminal_width ? @terminal_width : line1.size
|
||||
else
|
||||
puts line1
|
||||
puts line2.join(" ")
|
||||
end
|
||||
status_line = @workers.map(&:to_s).join(" ")
|
||||
if @opts[:job_status_type] == :replace
|
||||
@terminal_width ||= %x{stty size 2>/dev/null}.split[1].to_i.nonzero? \
|
||||
|| %x{tput cols 2>/dev/null}.to_i.nonzero? \
|
||||
|| 80
|
||||
@jstr_size ||= 0
|
||||
del_jobs_status
|
||||
STDOUT.flush
|
||||
print status_line[0...@terminal_width]
|
||||
STDOUT.flush
|
||||
@jstr_size = status_line.size > @terminal_width ? \
|
||||
@terminal_width : status_line.size
|
||||
else
|
||||
puts status_line
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -333,184 +366,174 @@ module Test
|
|||
def after_worker_dead(worker)
|
||||
return unless @opts[:parallel]
|
||||
return if @interrupt
|
||||
worker[:status] = :quit
|
||||
worker[:in].close
|
||||
worker[:out].close
|
||||
@workers.delete(worker)
|
||||
@dead_workers << worker
|
||||
@ios = @workers.map{|w| w[:out] }
|
||||
end
|
||||
|
||||
def _run_parallel suites, type, result
|
||||
begin
|
||||
# Require needed things for parallel running
|
||||
require 'thread'
|
||||
require 'timeout'
|
||||
@tasks = @files.dup # Array of filenames.
|
||||
@need_quit = false
|
||||
@dead_workers = [] # Array of dead workers.
|
||||
@warnings = []
|
||||
shutting_down = false
|
||||
rep = [] # FIXME: more good naming
|
||||
|
||||
# Array of workers.
|
||||
@workers = @opts[:parallel].times.map {
|
||||
begin
|
||||
worker = Worker.launch(@opts[:ruby],@args)
|
||||
worker.hook(:dead) do |w,info|
|
||||
after_worker_dead w
|
||||
after_worker_down w, *info unless info.empty?
|
||||
end
|
||||
worker
|
||||
rescue Exception; puts "#{$!.class}: #{$!.message}\n#{$!.backtrace}"
|
||||
end
|
||||
}
|
||||
|
||||
# Thread: watchdog
|
||||
watchdog = Thread.new do
|
||||
while stat = Process.wait2
|
||||
break if @interrupt # Break when interrupt
|
||||
w = (@workers + @dead_workers).find{|x| stat[0] == x[:pid] }.dup
|
||||
next unless w
|
||||
unless w[:status] == :quit
|
||||
# Worker down
|
||||
w.dead(nil, stat[1].to_i)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@workers_hash = Hash[@workers.map {|w| [w[:out],w] }] # out-IO => worker
|
||||
@ios = @workers.map{|w| w[:out] } # Array of worker IOs
|
||||
|
||||
while _io = IO.select(@ios)[0]
|
||||
break unless _io.each do |io|
|
||||
break if @need_quit
|
||||
worker = @workers_hash[io]
|
||||
case worker.read
|
||||
when /^okay$/
|
||||
worker[:status] = :running
|
||||
jobs_status
|
||||
when /^ready$/
|
||||
worker[:status] = :ready
|
||||
if @tasks.empty?
|
||||
break unless @workers.find{|x| x[:status] == :running }
|
||||
else
|
||||
worker.run(@tasks.shift, type)
|
||||
end
|
||||
|
||||
jobs_status
|
||||
when /^done (.+?)$/
|
||||
r = Marshal.load($1.unpack("m")[0])
|
||||
result << r[0..1]
|
||||
rep << {file: worker[:real_file],
|
||||
report: r[2], result: r[3], testcase: r[5]}
|
||||
$:.push(*r[4]).uniq!
|
||||
when /^p (.+?)$/
|
||||
del_jobs_status
|
||||
print $1.unpack("m")[0]
|
||||
jobs_status if @opts[:job_status_type] == :replace
|
||||
when /^after (.+?)$/
|
||||
@warnings << Marshal.load($1.unpack("m")[0])
|
||||
when /^bye (.+?)$/
|
||||
after_worker_down worker, Marshal.load($1.unpack("m")[0])
|
||||
when /^bye$/
|
||||
if shutting_down
|
||||
after_worker_dead worker
|
||||
else
|
||||
after_worker_down worker
|
||||
end
|
||||
end
|
||||
break if @need_quit
|
||||
end
|
||||
end
|
||||
rescue Interrupt => e
|
||||
@interrupt = e
|
||||
return result
|
||||
ensure
|
||||
shutting_down = true
|
||||
|
||||
watchdog.kill if watchdog
|
||||
@workers.each do |worker|
|
||||
begin
|
||||
timeout(1) do
|
||||
worker[:in].puts "quit"
|
||||
end
|
||||
rescue Errno::EPIPE
|
||||
rescue Timeout::Error
|
||||
end
|
||||
[:in,:out].each { |name| worker[name].close }
|
||||
end
|
||||
begin
|
||||
timeout(0.2*@workers.size) do
|
||||
Process.waitall
|
||||
end
|
||||
rescue Timeout::Error
|
||||
@workers.each do |worker|
|
||||
begin
|
||||
Process.kill(:KILL,worker[:pid])
|
||||
rescue Errno::ESRCH; end
|
||||
end
|
||||
end
|
||||
|
||||
if @interrupt || @opts[:no_retry]
|
||||
rep.each do |r|
|
||||
report.push(*r[:report])
|
||||
end
|
||||
@errors += rep.map{|x| x[:result][0] }.inject(:+)
|
||||
@failures += rep.map{|x| x[:result][1] }.inject(:+)
|
||||
@skips += rep.map{|x| x[:result][2] }.inject(:+)
|
||||
elsif @need_quit
|
||||
rep.each do |r|
|
||||
report.push(*r[:report])
|
||||
@errors += r[:result][0]
|
||||
@failures += r[:result][1]
|
||||
@skips += r[:result][2]
|
||||
end
|
||||
else
|
||||
puts ""
|
||||
puts "Retrying..."
|
||||
puts ""
|
||||
@options = @opts
|
||||
rep.each do |r|
|
||||
if r[:testcase] && r[:file] && !r[:report].empty?
|
||||
require r[:file]
|
||||
_run_suite(eval(r[:testcase]),type)
|
||||
else
|
||||
report.push(*r[:report])
|
||||
@errors += r[:result][0]
|
||||
@failures += r[:result][1]
|
||||
@skips += r[:result][2]
|
||||
end
|
||||
end
|
||||
end
|
||||
if @warnings
|
||||
warn ""
|
||||
ary = []
|
||||
@warnings.reject! do |w|
|
||||
r = ary.include?(w[1].message)
|
||||
ary << w[1].message
|
||||
r
|
||||
end
|
||||
@warnings.each do |w|
|
||||
warn "#{w[0]}: #{w[1].message} (#{w[1].class})"
|
||||
end
|
||||
warn ""
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def _run_suites suites, type
|
||||
@interrupt = nil
|
||||
result = []
|
||||
if @opts[:parallel]
|
||||
begin
|
||||
# Require needed things for parallel running
|
||||
require 'thread'
|
||||
require 'timeout'
|
||||
@tasks = @files.dup # Array of filenames.
|
||||
@need_quit = false
|
||||
@dead_workers = [] # Array of dead workers.
|
||||
@warnings = []
|
||||
shutting_down = false
|
||||
errors = []
|
||||
failures = []
|
||||
skips = []
|
||||
rep = []
|
||||
|
||||
# Array of workers.
|
||||
@workers = @opts[:parallel].times.map do
|
||||
i,o = IO.pipe("ASCII-8BIT") # worker o>|i> master
|
||||
j,k = IO.pipe("ASCII-8BIT") # worker <j|<k master
|
||||
k.sync = true
|
||||
pid = spawn(*@opts[:ruby],
|
||||
"#{File.dirname(__FILE__)}/unit/parallel.rb",
|
||||
*@args, out: o, in: j)
|
||||
[o,j].each{|io| io.close }
|
||||
{in: k, out: i, pid: pid, status: :waiting}
|
||||
end
|
||||
|
||||
# Thread: watchdog
|
||||
watchdog = Thread.new do
|
||||
while stat = Process.wait2
|
||||
break if @interrupt # Break when interrupt
|
||||
w = (@workers + @dead_workers).find{|x| stat[0] == x[:pid] }.dup
|
||||
next unless w
|
||||
unless w[:status] == :quit
|
||||
# Worker down
|
||||
after_worker_down w, nil, stat[1].to_i
|
||||
end
|
||||
end
|
||||
end
|
||||
@workers_hash = Hash[@workers.map {|w| [w[:out],w] }] # out-IO => worker
|
||||
@ios = @workers.map{|w| w[:out] } # Array of worker IOs
|
||||
|
||||
while _io = IO.select(@ios)[0]
|
||||
break unless _io.each do |io|
|
||||
break if @need_quit
|
||||
worker = @workers_hash[io]
|
||||
buf = ((worker[:status] == :quit) ? io.read : io.gets).chomp
|
||||
case buf
|
||||
when /^okay$/ # Worker will run task
|
||||
worker[:status] = :running
|
||||
jobs_status
|
||||
when /^ready$/ # Worker is ready
|
||||
worker[:status] = :ready
|
||||
if @tasks.empty?
|
||||
break unless @workers.find{|x| x[:status] == :running }
|
||||
else
|
||||
task = @tasks.shift
|
||||
worker[:file] = File.basename(task).gsub(/\.rb/,"")
|
||||
worker[:real_file] = task
|
||||
begin
|
||||
worker[:loadpath] ||= []
|
||||
worker[:in].puts "loadpath #{[Marshal.dump($:-worker[:loadpath])].pack("m").gsub("\n","")}"
|
||||
worker[:loadpath] = $:.dup
|
||||
worker[:in].puts "run #{task} #{type}"
|
||||
worker[:status] = :prepare
|
||||
rescue Errno::EPIPE
|
||||
after_worker_down worker
|
||||
rescue IOError
|
||||
raise unless ["stream closed","closed stream"].include? $!.message
|
||||
after_worker_down worker
|
||||
end
|
||||
end
|
||||
|
||||
jobs_status
|
||||
when /^done (.+?)$/ # Worker ran a one of suites in a file
|
||||
r = Marshal.load($1.unpack("m")[0])
|
||||
# [result,result,report,$:]
|
||||
result << r[0..1]
|
||||
rep << {file: worker[:real_file], report: r[2], result: r[3],
|
||||
testcase: r[5]}
|
||||
errors << [worker[:real_file],r[5],r[3][0]]
|
||||
failures << [worker[:real_file],r[5],r[3][1]]
|
||||
skips << [worker[:real_file],r[5],r[3][2]]
|
||||
$:.push(*r[4]).uniq!
|
||||
worker[:status] = :done
|
||||
jobs_status if @opts[:job_status_type] == :replace
|
||||
worker[:status] = :running
|
||||
when /^p (.+?)$/ # Worker wanna print to STDOUT
|
||||
del_jobs_status
|
||||
print $1.unpack("m")[0]
|
||||
jobs_status if @opts[:job_status_type] == :replace
|
||||
when /^after (.+?)$/
|
||||
@warnings << Marshal.load($1.unpack("m")[0])
|
||||
when /^bye (.+?)$/ # Worker will shutdown
|
||||
e = Marshal.load($1.unpack("m")[0])
|
||||
after_worker_down worker, e
|
||||
when /^bye$/ # Worker will shutdown
|
||||
if shutting_down
|
||||
after_worker_dead worker
|
||||
else
|
||||
after_worker_down worker
|
||||
end
|
||||
end
|
||||
break if @need_quit
|
||||
end
|
||||
end
|
||||
|
||||
# Retry
|
||||
# TODO: Interrupt?
|
||||
rescue Interrupt => e
|
||||
@interrupt = e
|
||||
return result
|
||||
ensure
|
||||
shutting_down = true
|
||||
|
||||
watchdog.kill if watchdog
|
||||
@workers.each do |worker|
|
||||
begin
|
||||
timeout(1) do
|
||||
worker[:in].puts "quit"
|
||||
end
|
||||
rescue Errno::EPIPE
|
||||
rescue Timeout::Error
|
||||
end
|
||||
[:in,:out].each do |name|
|
||||
worker[name].close
|
||||
end
|
||||
end
|
||||
begin
|
||||
timeout(0.2*@workers.size) do
|
||||
Process.waitall
|
||||
end
|
||||
rescue Timeout::Error
|
||||
@workers.each do |worker|
|
||||
begin
|
||||
Process.kill(:KILL,worker[:pid])
|
||||
rescue Errno::ESRCH; end
|
||||
end
|
||||
end
|
||||
|
||||
unless @need_quit
|
||||
if @interrupt || @opts[:no_retry]
|
||||
rep.each do |r|
|
||||
report.push(*r[:report])
|
||||
end
|
||||
@errors += errors.map(&:last).inject(:+)
|
||||
@failures += failures.map(&:last).inject(:+)
|
||||
@skips += skips.map(&:last).inject(:+)
|
||||
else
|
||||
puts ""
|
||||
puts "Retrying..."
|
||||
puts ""
|
||||
@options = @opts
|
||||
rep.each do |r|
|
||||
if r[:testcase] && r[:file] && !r[:report].empty?
|
||||
require r[:file]
|
||||
_run_suite(eval(r[:testcase]),type)
|
||||
else
|
||||
report.push(*r[:report])
|
||||
@errors += r[:result][0]
|
||||
@failures += r[:result][1]
|
||||
@skips += r[:result][2]
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
_run_parallel suites, type, result
|
||||
else
|
||||
suites.each {|suite|
|
||||
begin
|
||||
|
|
|
@ -26,9 +26,9 @@ module Test
|
|||
|
||||
stdout = STDOUT.dup
|
||||
|
||||
th = Thread.new(i.dup) do |io|
|
||||
th = Thread.new do
|
||||
begin
|
||||
while buf = (self.verbose ? io.gets : io.read(5))
|
||||
while buf = (self.verbose ? i.gets : i.read(5))
|
||||
stdout.puts "p #{[buf].pack("m").gsub("\n","")}"
|
||||
end
|
||||
rescue IOError
|
||||
|
@ -70,13 +70,11 @@ module Test
|
|||
@@stop_auto_run = true
|
||||
@opts = @options.dup
|
||||
|
||||
STDOUT.sync = true
|
||||
STDOUT.puts "ready"
|
||||
Signal.trap(:INT,"IGNORE")
|
||||
|
||||
|
||||
@old_loadpath = []
|
||||
begin
|
||||
STDOUT.sync = true
|
||||
STDOUT.puts "ready"
|
||||
stdin = STDIN.dup
|
||||
stdout = STDOUT.dup
|
||||
while buf = stdin.gets
|
||||
|
@ -123,6 +121,7 @@ module Test
|
|||
exit
|
||||
end
|
||||
end
|
||||
rescue Errno::EPIPE
|
||||
rescue Exception => e
|
||||
begin
|
||||
STDOUT.puts "bye #{[Marshal.dump(e)].pack("m").gsub("\n","")}"
|
||||
|
|
|
@ -169,8 +169,7 @@ module TestParallel
|
|||
def test_jobs_status
|
||||
spawn_runner "--jobs-status"
|
||||
buf = timeout(10){@test_out.read}
|
||||
assert_match(/\d+:(ready|prepare|running) */,buf)
|
||||
assert_match(/test_(first|second|third|forth) */,buf)
|
||||
assert_match(/\d+=test_(first|second|third|forth) */,buf)
|
||||
end
|
||||
|
||||
end
|
||||
|
|
Загрузка…
Ссылка в новой задаче