Avoid cascading failures when compiler crashes

Change the logic so that we never run the multiprocessing codepath with
only 1 worker. That configuration, which is the one used by the CI, was
causing all subsequent tests to spuriously fail if one test failed with a
crash (this was easy to see after sorting the tests).

Also, sort tests to make output nicer.
Also, make verbose mode more verbose so that it is easy to see in `-s`
mode which test is crashing.
pull/1296/head
Sean Silva 2022-08-25 21:44:21 +00:00
parent 493f45f939
commit a507ae498a
2 changed files with 31 additions and 18 deletions

View File

@ -117,7 +117,7 @@ def main():
sys.exit(1)
# Run the tests.
results = run_tests(tests, config, args.sequential)
results = run_tests(tests, config, args.sequential, args.verbose)
# Report the test results.
failed = report_results(results, xfail_set, args.verbose)

View File

@ -23,6 +23,7 @@ compiling or TorchScript'ing).
import abc
from typing import Any, Callable, List, NamedTuple, Optional, TypeVar, Union, Dict
import sys
import traceback
import torch
@ -280,9 +281,11 @@ def generate_golden_trace(test: Test) -> Trace:
return trace
def compile_and_run_test(test: Test, config: TestConfig) -> Any:
def compile_and_run_test(test: Test, config: TestConfig, verbose=False) -> Any:
try:
golden_trace = generate_golden_trace(test)
if verbose:
print(f"Compiling {test.unique_name}...", file=sys.stderr)
compiled = config.compile(test.program_factory())
except Exception as e:
return TestResult(unique_name=test.unique_name,
@ -293,6 +296,8 @@ def compile_and_run_test(test: Test, config: TestConfig) -> Any:
trace=None,
golden_trace=None)
try:
if verbose:
print(f"Running {test.unique_name}...", file=sys.stderr)
trace = config.run(compiled, golden_trace)
except Exception as e:
return TestResult(unique_name=test.unique_name,
@ -312,31 +317,39 @@ def compile_and_run_test(test: Test, config: TestConfig) -> Any:
queue_sentinel = "QUEUE_SENTINEL"
def run_workers_in_parallel(task_queue: mp.Queue, worker):
NUMBER_OF_PROCESSES = min(int(mp.cpu_count() * 1.1), task_queue.qsize())
# TODO: We've noticed that on certain 2-core machines, parallelizing the
# tests makes the LLVM backend's legacy pass manager 20x slower than using
# a single process. Need to investigate the root cause eventually. This is
# a hack to work around this issue.
if mp.cpu_count() == 2:
NUMBER_OF_PROCESSES = 1
def run_workers_in_parallel(task_queue: mp.Queue, worker, num_processes: int):
processes = []
for i in range(NUMBER_OF_PROCESSES):
for i in range(num_processes):
p = mp.get_context("fork").Process(target=worker, args=(task_queue, ))
p.start()
processes.append(p)
for i in range(NUMBER_OF_PROCESSES):
for i in range(num_processes):
task_queue.put(queue_sentinel)
for p in processes:
p.join()
def run_tests(tests: List[Test], config: TestConfig, sequential = False) -> List[TestResult]:
def run_tests(tests: List[Test], config: TestConfig, sequential=False, verbose=False) -> List[TestResult]:
"""Invoke the given `Test`'s with the provided `TestConfig`."""
if sequential:
return [compile_and_run_test(test, config) for test in tests]
num_processes = min(int(mp.cpu_count() * 1.1), len(tests))
# TODO: We've noticed that on certain 2-core machines, parallelizing the
# tests makes the LLVM backend's legacy pass manager 20x slower than using
# a single process. Need to investigate the root cause eventually. This is
# a hack to work around this issue.
# Also our multiprocessing implementation is not the most efficient, so
# the benefit at core count 2 is probably not worth it anyway.
if mp.cpu_count() == 2:
num_processes = 1
# If num_processes == 1, run without any of the multiprocessing machinery.
# TODO: In theory the multiprocessing path should work with a single
# worker, but any crash in the testing process seems to cause a cascade of
# failures resulting in undecipherable error messages.
if num_processes == 1 or sequential:
return [compile_and_run_test(test, config, verbose) for test in tests]
# Sort the tests to make output nicer.
tests = list(sorted(tests, key=lambda t: t.unique_name))
# To run e2e tests in parallel:
# The tests are put into a synchronized queue. Multiple worker processes are
@ -360,7 +373,7 @@ def run_tests(tests: List[Test], config: TestConfig, sequential = False) -> List
sync_results.append(
compile_and_run_test(tests_dict[test_name], config))
run_workers_in_parallel(tests_queue, worker)
run_workers_in_parallel(tests_queue, worker, num_processes)
tests_with_results = {result.unique_name for result in sync_results}
all_tests = {test.unique_name for test in tests}
# For processes that are crashed due to compile time or runtime error,