DEV: Add a new way to run specs in parallel with better output (#7778)

* DEV: Add a new way to run specs in parallel with better output

This commit:

 1. adds a new executable, `bin/interleaved_rspec` which works much like
    `rspec`, but runs the tests in parallel.

 2. adds a rake task, `rake interleaved:spec` which runs the whole test
    suite.

 3. makes autospec use this new wrapper by default. You can disable this
    by running `PARALLEL_SPEC=0 rake autospec`.

It works much like the `parallel_tests` gem (and relies on it), but
makes each subprocess use a machine-readable formatter and parses this
output in order to provide a better overall summary.

(It's called interleaved, because parallel was taken and naming is
hard).

* Make popen3 invocation safer

* Use FileUtils instead of shelling out

* DRY up reporter

* Moved summary logic into Reporter

* s/interleaved/turbo/g

* Move Reporter into its own file

* Moved run into its own class

* Moved Runner into its own file

* Move JsonRowsFormatter under TurboTests

* Join on threads at the end

* Acted on feedback from eviltrout
This commit is contained in:
Daniel Waterworth
2019-06-21 01:59:01 +01:00
committed by Sam
parent 9f0574dcfd
commit e18ce56f4b
9 changed files with 460 additions and 30 deletions

View File

@ -0,0 +1,93 @@
module TurboTests
# An RSpec formatter used for each subprocess during parallel test execution
class JsonRowsFormatter
RSpec::Core::Formatters.register(
self,
:close,
:example_failed,
:example_passed,
:example_pending,
:seed
)
attr_reader :output
def initialize(output)
@output = output
end
def exception_to_json(exception)
if exception
{
backtrace: exception.backtrace,
message: exception.message,
cause: exception.cause
}
end
end
def execution_result_to_json(result)
{
example_skipped?: result.example_skipped?,
pending_message: result.pending_message,
status: result.status,
pending_fixed?: result.pending_fixed?,
exception: exception_to_json(result.exception)
}
end
def example_to_json(example)
{
execution_result: execution_result_to_json(example.execution_result),
location: example.location,
full_description: example.full_description,
metadata: {
shared_group_inclusion_backtrace:
example.metadata[:shared_group_inclusion_backtrace]
},
location_rerun_argument: example.location_rerun_argument
}
end
def example_passed(notification)
output_row({
type: :example_passed,
example: example_to_json(notification.example)
})
end
def example_pending(notification)
output_row({
type: :example_pending,
example: example_to_json(notification.example)
})
end
def example_failed(notification)
output_row({
type: :example_failed,
example: example_to_json(notification.example)
})
end
def seed(notification)
output_row({
type: :seed,
seed: notification.seed,
})
end
def close(notification)
output_row({
type: :close,
})
end
private
def output_row(obj)
output.puts(obj.to_json)
output.flush
end
end
end

103
lib/turbo_tests/reporter.rb Normal file
View File

@ -0,0 +1,103 @@
module TurboTests
class Reporter
def self.from_config(formatter_config, start_time)
reporter = new(start_time)
formatter_config.each do |config|
name, outputs = config.values_at(:name, :outputs)
outputs.map! do |filename|
filename == '-' ? STDOUT : File.open(filename, 'w')
end
reporter.add(name, outputs)
end
reporter
end
attr_reader :pending_examples
attr_reader :failed_examples
def initialize(start_time)
@formatters = []
@pending_examples = []
@failed_examples = []
@all_examples = []
@start_time = start_time
end
def add(name, outputs)
outputs.each do |output|
formatter_class =
case name
when 'p', 'progress'
RSpec::Core::Formatters::ProgressFormatter
else
Kernel.const_get(name)
end
@formatters << formatter_class.new(output)
end
end
def example_passed(example)
delegate_to_formatters(:example_passed, example.notification)
@all_examples << example
end
def example_pending(example)
delegate_to_formatters(:example_pending, example.notification)
@all_examples << example
@pending_examples << example
end
def example_failed(example)
delegate_to_formatters(:example_failed, example.notification)
@all_examples << example
@failed_examples << example
end
def finish
end_time = Time.now
delegate_to_formatters(:start_dump,
RSpec::Core::Notifications::NullNotification
)
delegate_to_formatters(:dump_pending,
RSpec::Core::Notifications::ExamplesNotification.new(
self
)
)
delegate_to_formatters(:dump_failures,
RSpec::Core::Notifications::ExamplesNotification.new(
self
)
)
delegate_to_formatters(:dump_summary,
RSpec::Core::Notifications::SummaryNotification.new(
end_time - @start_time,
@all_examples,
@failed_examples,
@pending_examples,
0,
0
)
)
delegate_to_formatters(:close,
RSpec::Core::Notifications::NullNotification
)
end
protected
def delegate_to_formatters(method, *args)
@formatters.each do |formatter|
formatter.send(method, *args) if formatter.respond_to?(method)
end
end
end
end

134
lib/turbo_tests/runner.rb Normal file
View File

@ -0,0 +1,134 @@
module TurboTests
class Runner
def self.run(formatter_config, files, start_time=Time.now)
reporter = Reporter.from_config(formatter_config, start_time)
new(reporter, files).run
end
def initialize(reporter, files)
@reporter = reporter
@files = files
@messages = Queue.new
@threads = []
end
def run
@num_processes = ParallelTests.determine_number_of_processes(nil)
tests_in_groups =
ParallelTests::RSpec::Runner.tests_in_groups(
@files,
@num_processes,
group_by: :filesize
)
setup_tmp_dir
tests_in_groups.each_with_index do |tests, process_num|
start_subprocess(tests, process_num + 1)
end
handle_messages
@reporter.finish
@threads.each(&:join)
end
protected
def setup_tmp_dir
begin
FileUtils.rm_r('tmp/test-pipes')
rescue Errno::ENOENT
end
FileUtils.mkdir_p('tmp/test-pipes/')
end
def start_subprocess(tests, process_num)
if tests.empty?
@messages << {type: 'exit', process_num: process_num}
else
begin
File.mkfifo("tmp/test-pipes/subprocess-#{process_num}")
rescue Errno::EEXIST
end
stdin, stdout, stderr, wait_thr =
Open3.popen3(
{'TEST_ENV_NUMBER' => process_num.to_s},
"bundle", "exec", "rspec",
"-f", "TurboTests::JsonRowsFormatter",
"-o", "tmp/test-pipes/subprocess-#{process_num}",
*tests
)
@threads <<
Thread.new do
File.open("tmp/test-pipes/subprocess-#{process_num}") do |fd|
fd.each_line do |line|
message = JSON.parse(line)
message = message.symbolize_keys
message[:process_num] = process_num
@messages << message
end
end
@messages << {type: 'exit', process_num: process_num}
end
@threads << start_copy_thread(stdout, STDOUT)
@threads << start_copy_thread(stderr, STDERR)
end
end
def start_copy_thread(src, dst)
Thread.new do
while true
begin
msg = src.readpartial(4096)
rescue EOFError
break
else
dst.write(msg)
end
end
end
end
def handle_messages
exited = 0
begin
while true
message = @messages.pop
case message[:type]
when 'example_passed'
example = FakeExample.from_obj(message[:example])
@reporter.example_passed(example)
when 'example_pending'
example = FakeExample.from_obj(message[:example])
@reporter.example_pending(example)
when 'example_failed'
example = FakeExample.from_obj(message[:example])
@reporter.example_failed(example)
when 'seed'
when 'close'
when 'exit'
exited += 1
if exited == @num_processes
break
end
else
STDERR.puts("Unhandled message in main process: #{message}")
end
STDOUT.flush
end
rescue Interrupt
end
end
end
end