Skip to content

Commit

Permalink
log exception/backtrace when a process run in parallel fails (#3423)
Browse files Browse the repository at this point in the history
What is the motivation for this PR?
Currently when a Process run via parallel_run function in parallel.py fails because of an exception, we
are only able to get the exit code, and fail a test. We have to dig through the debug test logs to
figure out why the process failed to find the exception.

How did you do it?
By wrapping the multiprocessing.Process class in SonicProcess class, we can catch the exception when the Process is run by
enclosing the run call in a try/catch block and storing the exception. parallel_run would then create instances of SonicProcess instead of Process.
If any of the SonicProcess has an non-zero exit code, we can get the actual exception and log it.

How did you verify/test it?
Ran sanity check where we have an exception in a log message (using hostname1 instead of hostname). Here is the output in the log
  • Loading branch information
sanmalho-git authored May 14, 2021
1 parent 1562eb1 commit 9dc1c17
Showing 1 changed file with 45 additions and 7 deletions.
52 changes: 45 additions & 7 deletions tests/common/helpers/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,41 @@
import shutil
import tempfile
import signal
from multiprocessing import Process, Manager
import traceback
from multiprocessing import Process, Manager, Pipe
from tests.common.helpers.assertions import pytest_assert as pt_assert

logger = logging.getLogger(__name__)


class SonicProcess(Process):
"""
Wrapper class around multiprocessing.Process that would capture the exception thrown if the Process throws
an exception when run.
This exception (including backtrace) can be logged in test log to provide better info of why a particular Process failed.
"""
def __init__(self, *args, **kwargs):
Process.__init__(self, *args, **kwargs)
self._pconn, self._cconn = Pipe()
self._exception = None

def run(self):
try:
Process.run(self)
self._cconn.send(None)
except Exception as e:
tb = traceback.format_exc()
self._cconn.send((e, tb))
raise e

@property
def exception(self):
if self._pconn.poll():
self._exception = self._pconn.recv()
return self._exception


def parallel_run(target, args, kwargs, nodes, timeout=None):
"""Run target function on nodes in parallel
Expand Down Expand Up @@ -39,7 +68,7 @@ def parallel_run(target, args, kwargs, nodes, timeout=None):
kwargs['node'] = node
kwargs['results'] = results
process_name = "{}--{}".format(target.__name__, node)
worker = Process(name=process_name, target=target, args=args, kwargs=kwargs)
worker = SonicProcess(name=process_name, target=target, args=args, kwargs=kwargs)
worker.start()
logger.debug('Started process {} running target "{}"'.format(worker.pid, process_name))
workers.append(worker)
Expand All @@ -56,7 +85,12 @@ def parallel_run(target, args, kwargs, nodes, timeout=None):
break

# check if we have any processes that failed - have exitcode non-zero
failed_processes = [worker for worker in workers if worker.exitcode != 0]
failed_processes = {}
for worker in workers:
if worker.exitcode != 0:
failed_processes[worker.name] = {}
failed_processes[worker.name]['exit_code'] = worker.exitcode
failed_processes[worker.name]['exception'] = worker.exception

# Force terminate spawned processes
for worker in workers:
Expand All @@ -80,10 +114,14 @@ def parallel_run(target, args, kwargs, nodes, timeout=None):
pt_assert(False, \
'Processes running target "{}" could not be terminated. Tried killing them. But please check'.format(target.__name__))

# if we have failed processes, we should throw an exception and fail
if len(failed_processes):
logger.error('Processes "{}" had failures. Please check the debug logs'.format(failed_processes))
pt_assert(False, 'Processes "{}" had failures. Please check the debug logs'.format(failed_processes))
# if we have failed processes, we should log the exception and exit code of each Process and fail
if len(failed_processes.keys()):
for process_name, process in failed_processes.items():
p_exception = process['exception'][0]
p_traceback = process['exception'][1]
p_exitcode = process['exit_code']
logger.error('Process {} had exit code {} and exception {} and traceback {}'.format(process_name, p_exitcode, p_exception, p_traceback))
pt_assert(False, 'Processes "{}" had failures. Please check the logs'.format(failed_processes.keys()))

logger.info('Completed running processes for target "{}" in {} seconds'.format(target.__name__, str(delta_time)))

Expand Down

0 comments on commit 9dc1c17

Please sign in to comment.