Skip to content

Commit

Permalink
gh-107219: Fix concurrent.futures terminate_broken() (#109244)
Browse files Browse the repository at this point in the history
Fix a race condition in concurrent.futures. When a process in the
process pool was terminated abruptly (while the future was running or
pending), close the connection write end. If the call queue is
blocked on sending bytes to a worker process, closing the connection
write end interrupts the send, so the queue can be closed.

Changes:

* _ExecutorManagerThread.terminate_broken() now closes
  call_queue._writer.
* multiprocessing PipeConnection.close() now interrupts
  WaitForMultipleObjects() in _send_bytes() by cancelling the
  overlapped operation.
  • Loading branch information
vstinner authored Sep 11, 2023
1 parent 3b2ecbc commit a9b1f84
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,10 @@ def terminate_broken(self, cause):
# https://github.com/python/cpython/issues/94777
self.call_queue._reader.close()

# gh-107219: Close the connection writer which can unblock
# Queue._feed() if it was stuck in send_bytes().
self.call_queue._writer.close()

# clean up resources
self.join_executor_internals()

Expand Down
18 changes: 18 additions & 0 deletions Lib/multiprocessing/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]

import errno
import io
import os
import sys
Expand Down Expand Up @@ -41,6 +42,7 @@
BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.
WSA_OPERATION_ABORTED = 995

_mmap_counter = itertools.count()

Expand Down Expand Up @@ -271,12 +273,22 @@ class PipeConnection(_ConnectionBase):
with FILE_FLAG_OVERLAPPED.
"""
_got_empty_message = False
_send_ov = None

def _close(self, _CloseHandle=_winapi.CloseHandle):
ov = self._send_ov
if ov is not None:
# Interrupt WaitForMultipleObjects() in _send_bytes()
ov.cancel()
_CloseHandle(self._handle)

def _send_bytes(self, buf):
if self._send_ov is not None:
# A connection should only be used by a single thread
raise ValueError("concurrent send_bytes() calls "
"are not supported")
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
self._send_ov = ov
try:
if err == _winapi.ERROR_IO_PENDING:
waitres = _winapi.WaitForMultipleObjects(
Expand All @@ -286,7 +298,13 @@ def _send_bytes(self, buf):
ov.cancel()
raise
finally:
self._send_ov = None
nwritten, err = ov.GetOverlappedResult(True)
if err == WSA_OPERATION_ABORTED:
# close() was called by another thread while
# WaitForMultipleObjects() was waiting for the overlapped
# operation.
raise OSError(errno.EPIPE, "handle is closed")
assert err == 0
assert nwritten == len(buf)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Fix a race condition in ``concurrent.futures``. When a process in the
process pool was terminated abruptly (while the future was running or
pending), close the connection write end. If the call queue is blocked on
sending bytes to a worker process, closing the connection write end interrupts
the send, so the queue can be closed. Patch by Victor Stinner.

0 comments on commit a9b1f84

Please sign in to comment.