diff --git a/ChangeLog b/ChangeLog index 997c6cac7a..25d72b04cd 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,11 @@ +Sat Feb 26 16:10:23 2011 Shota Fukumori + + * lib/test/unit.rb: --jobs-status won't puts over 2 lines. + + * test/testunit/test_parallel.rb: Fix test for above. + + * lib/test/*: refactoring. + Sat Feb 26 07:10:05 2011 Aaron Patterson * ext/psych/lib/psych/scalar_scanner.rb: fix parsing timezone's whose diff --git a/lib/test/unit.rb b/lib/test/unit.rb index dee7e3cb74..4d6d6014d6 100644 --- a/lib/test/unit.rb +++ b/lib/test/unit.rb @@ -229,33 +229,87 @@ module Test include Test::Unit::GCStressOption include Test::Unit::RunCount + class Worker + def self.launch(ruby,args=[]) + i,o = IO.pipe("ASCII-8BIT") # worker o>|i> master + j,k = IO.pipe("ASCII-8BIT") # worker worker[:file].size - line2 << worker[:file].ljust(a.size) - else - a << " "*(worker[:file].size-a.size) - line2 << worker[:file] - end - end - else - line2 << " "*a.size - end - a - }.join(" ") - if @opts[:job_status_type] == :replace - @terminal_width ||= %x{stty size 2>/dev/null}.split[1].to_i.nonzero? \ - || %x{tput cols 2>/dev/null}.to_i.nonzero? \ - || 80 - @jstr_size ||= 0 - del_jobs_status - STDOUT.flush - print line1[0...@terminal_width] - STDOUT.flush - @jstr_size = line1.size > @terminal_width ? @terminal_width : line1.size - else - puts line1 - puts line2.join(" ") - end + status_line = @workers.map(&:to_s).join(" ") + if @opts[:job_status_type] == :replace + @terminal_width ||= %x{stty size 2>/dev/null}.split[1].to_i.nonzero? \ + || %x{tput cols 2>/dev/null}.to_i.nonzero? \ + || 80 + @jstr_size ||= 0 + del_jobs_status + STDOUT.flush + print status_line[0...@terminal_width] + STDOUT.flush + @jstr_size = status_line.size > @terminal_width ? \ + @terminal_width : status_line.size + else + puts status_line end end @@ -333,184 +366,174 @@ module Test def after_worker_dead(worker) return unless @opts[:parallel] return if @interrupt - worker[:status] = :quit - worker[:in].close - worker[:out].close @workers.delete(worker) @dead_workers << worker @ios = @workers.map{|w| w[:out] } end + def _run_parallel suites, type, result + begin + # Require needed things for parallel running + require 'thread' + require 'timeout' + @tasks = @files.dup # Array of filenames. + @need_quit = false + @dead_workers = [] # Array of dead workers. + @warnings = [] + shutting_down = false + rep = [] # FIXME: more good naming + + # Array of workers. + @workers = @opts[:parallel].times.map { + begin + worker = Worker.launch(@opts[:ruby],@args) + worker.hook(:dead) do |w,info| + after_worker_dead w + after_worker_down w, *info unless info.empty? + end + worker + rescue Exception; puts "#{$!.class}: #{$!.message}\n#{$!.backtrace}" + end + } + + # Thread: watchdog + watchdog = Thread.new do + while stat = Process.wait2 + break if @interrupt # Break when interrupt + w = (@workers + @dead_workers).find{|x| stat[0] == x[:pid] }.dup + next unless w + unless w[:status] == :quit + # Worker down + w.dead(nil, stat[1].to_i) + end + end + end + + @workers_hash = Hash[@workers.map {|w| [w[:out],w] }] # out-IO => worker + @ios = @workers.map{|w| w[:out] } # Array of worker IOs + + while _io = IO.select(@ios)[0] + break unless _io.each do |io| + break if @need_quit + worker = @workers_hash[io] + case worker.read + when /^okay$/ + worker[:status] = :running + jobs_status + when /^ready$/ + worker[:status] = :ready + if @tasks.empty? + break unless @workers.find{|x| x[:status] == :running } + else + worker.run(@tasks.shift, type) + end + + jobs_status + when /^done (.+?)$/ + r = Marshal.load($1.unpack("m")[0]) + result << r[0..1] + rep << {file: worker[:real_file], + report: r[2], result: r[3], testcase: r[5]} + $:.push(*r[4]).uniq! + when /^p (.+?)$/ + del_jobs_status + print $1.unpack("m")[0] + jobs_status if @opts[:job_status_type] == :replace + when /^after (.+?)$/ + @warnings << Marshal.load($1.unpack("m")[0]) + when /^bye (.+?)$/ + after_worker_down worker, Marshal.load($1.unpack("m")[0]) + when /^bye$/ + if shutting_down + after_worker_dead worker + else + after_worker_down worker + end + end + break if @need_quit + end + end + rescue Interrupt => e + @interrupt = e + return result + ensure + shutting_down = true + + watchdog.kill if watchdog + @workers.each do |worker| + begin + timeout(1) do + worker[:in].puts "quit" + end + rescue Errno::EPIPE + rescue Timeout::Error + end + [:in,:out].each { |name| worker[name].close } + end + begin + timeout(0.2*@workers.size) do + Process.waitall + end + rescue Timeout::Error + @workers.each do |worker| + begin + Process.kill(:KILL,worker[:pid]) + rescue Errno::ESRCH; end + end + end + + if @interrupt || @opts[:no_retry] + rep.each do |r| + report.push(*r[:report]) + end + @errors += rep.map{|x| x[:result][0] }.inject(:+) + @failures += rep.map{|x| x[:result][1] }.inject(:+) + @skips += rep.map{|x| x[:result][2] }.inject(:+) + elsif @need_quit + rep.each do |r| + report.push(*r[:report]) + @errors += r[:result][0] + @failures += r[:result][1] + @skips += r[:result][2] + end + else + puts "" + puts "Retrying..." + puts "" + @options = @opts + rep.each do |r| + if r[:testcase] && r[:file] && !r[:report].empty? + require r[:file] + _run_suite(eval(r[:testcase]),type) + else + report.push(*r[:report]) + @errors += r[:result][0] + @failures += r[:result][1] + @skips += r[:result][2] + end + end + end + if @warnings + warn "" + ary = [] + @warnings.reject! do |w| + r = ary.include?(w[1].message) + ary << w[1].message + r + end + @warnings.each do |w| + warn "#{w[0]}: #{w[1].message} (#{w[1].class})" + end + warn "" + end + end + end + def _run_suites suites, type @interrupt = nil result = [] if @opts[:parallel] - begin - # Require needed things for parallel running - require 'thread' - require 'timeout' - @tasks = @files.dup # Array of filenames. - @need_quit = false - @dead_workers = [] # Array of dead workers. - @warnings = [] - shutting_down = false - errors = [] - failures = [] - skips = [] - rep = [] - - # Array of workers. - @workers = @opts[:parallel].times.map do - i,o = IO.pipe("ASCII-8BIT") # worker o>|i> master - j,k = IO.pipe("ASCII-8BIT") # worker worker - @ios = @workers.map{|w| w[:out] } # Array of worker IOs - - while _io = IO.select(@ios)[0] - break unless _io.each do |io| - break if @need_quit - worker = @workers_hash[io] - buf = ((worker[:status] == :quit) ? io.read : io.gets).chomp - case buf - when /^okay$/ # Worker will run task - worker[:status] = :running - jobs_status - when /^ready$/ # Worker is ready - worker[:status] = :ready - if @tasks.empty? - break unless @workers.find{|x| x[:status] == :running } - else - task = @tasks.shift - worker[:file] = File.basename(task).gsub(/\.rb/,"") - worker[:real_file] = task - begin - worker[:loadpath] ||= [] - worker[:in].puts "loadpath #{[Marshal.dump($:-worker[:loadpath])].pack("m").gsub("\n","")}" - worker[:loadpath] = $:.dup - worker[:in].puts "run #{task} #{type}" - worker[:status] = :prepare - rescue Errno::EPIPE - after_worker_down worker - rescue IOError - raise unless ["stream closed","closed stream"].include? $!.message - after_worker_down worker - end - end - - jobs_status - when /^done (.+?)$/ # Worker ran a one of suites in a file - r = Marshal.load($1.unpack("m")[0]) - # [result,result,report,$:] - result << r[0..1] - rep << {file: worker[:real_file], report: r[2], result: r[3], - testcase: r[5]} - errors << [worker[:real_file],r[5],r[3][0]] - failures << [worker[:real_file],r[5],r[3][1]] - skips << [worker[:real_file],r[5],r[3][2]] - $:.push(*r[4]).uniq! - worker[:status] = :done - jobs_status if @opts[:job_status_type] == :replace - worker[:status] = :running - when /^p (.+?)$/ # Worker wanna print to STDOUT - del_jobs_status - print $1.unpack("m")[0] - jobs_status if @opts[:job_status_type] == :replace - when /^after (.+?)$/ - @warnings << Marshal.load($1.unpack("m")[0]) - when /^bye (.+?)$/ # Worker will shutdown - e = Marshal.load($1.unpack("m")[0]) - after_worker_down worker, e - when /^bye$/ # Worker will shutdown - if shutting_down - after_worker_dead worker - else - after_worker_down worker - end - end - break if @need_quit - end - end - - # Retry - # TODO: Interrupt? - rescue Interrupt => e - @interrupt = e - return result - ensure - shutting_down = true - - watchdog.kill if watchdog - @workers.each do |worker| - begin - timeout(1) do - worker[:in].puts "quit" - end - rescue Errno::EPIPE - rescue Timeout::Error - end - [:in,:out].each do |name| - worker[name].close - end - end - begin - timeout(0.2*@workers.size) do - Process.waitall - end - rescue Timeout::Error - @workers.each do |worker| - begin - Process.kill(:KILL,worker[:pid]) - rescue Errno::ESRCH; end - end - end - - unless @need_quit - if @interrupt || @opts[:no_retry] - rep.each do |r| - report.push(*r[:report]) - end - @errors += errors.map(&:last).inject(:+) - @failures += failures.map(&:last).inject(:+) - @skips += skips.map(&:last).inject(:+) - else - puts "" - puts "Retrying..." - puts "" - @options = @opts - rep.each do |r| - if r[:testcase] && r[:file] && !r[:report].empty? - require r[:file] - _run_suite(eval(r[:testcase]),type) - else - report.push(*r[:report]) - @errors += r[:result][0] - @failures += r[:result][1] - @skips += r[:result][2] - end - end - end - end - end + _run_parallel suites, type, result else suites.each {|suite| begin diff --git a/lib/test/unit/parallel.rb b/lib/test/unit/parallel.rb index ae1bf2961c..80dd4eae56 100644 --- a/lib/test/unit/parallel.rb +++ b/lib/test/unit/parallel.rb @@ -26,9 +26,9 @@ module Test stdout = STDOUT.dup - th = Thread.new(i.dup) do |io| + th = Thread.new do begin - while buf = (self.verbose ? io.gets : io.read(5)) + while buf = (self.verbose ? i.gets : i.read(5)) stdout.puts "p #{[buf].pack("m").gsub("\n","")}" end rescue IOError @@ -70,13 +70,11 @@ module Test @@stop_auto_run = true @opts = @options.dup - STDOUT.sync = true - STDOUT.puts "ready" Signal.trap(:INT,"IGNORE") - - @old_loadpath = [] begin + STDOUT.sync = true + STDOUT.puts "ready" stdin = STDIN.dup stdout = STDOUT.dup while buf = stdin.gets @@ -123,6 +121,7 @@ module Test exit end end + rescue Errno::EPIPE rescue Exception => e begin STDOUT.puts "bye #{[Marshal.dump(e)].pack("m").gsub("\n","")}" diff --git a/test/testunit/test_parallel.rb b/test/testunit/test_parallel.rb index 3eff960af9..24e03be736 100644 --- a/test/testunit/test_parallel.rb +++ b/test/testunit/test_parallel.rb @@ -169,8 +169,7 @@ module TestParallel def test_jobs_status spawn_runner "--jobs-status" buf = timeout(10){@test_out.read} - assert_match(/\d+:(ready|prepare|running) */,buf) - assert_match(/test_(first|second|third|forth) */,buf) + assert_match(/\d+=test_(first|second|third|forth) */,buf) end end