Skip to content

Commit

Permalink
Merge pull request #853 from minrk/queue-abort
Browse files Browse the repository at this point in the history
  • Loading branch information
blink1073 committed Feb 4, 2022
2 parents 97cf7bd + 23c3709 commit 7a229c6
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 11 deletions.
26 changes: 22 additions & 4 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ async def execute_request(self, stream, ident, parent):
self.log.debug("%s", reply_msg)

if not silent and reply_msg['content']['status'] == 'error' and stop_on_error:
await self._abort_queues()
self._abort_queues()

def do_execute(self, code, silent, store_history=True,
user_expressions=None, allow_stdin=False):
Expand Down Expand Up @@ -974,13 +974,31 @@ def _topic(self, topic):

_aborting = Bool(False)

async def _abort_queues(self):
self.shell_stream.flush()
def _abort_queues(self):
# while this flag is true,
# execute requests will be aborted
self._aborting = True
self.log.info("Aborting queue")

# flush streams, so all currently waiting messages
# are added to the queue
self.shell_stream.flush()

# Callback to signal that we are done aborting
def stop_aborting():
self.log.info("Finishing abort")
self._aborting = False
asyncio.get_event_loop().call_later(self.stop_on_error_timeout, stop_aborting)

# put the stop-aborting event on the message queue
# so that all messages already waiting in the queue are aborted
# before we reset the flag
schedule_stop_aborting = partial(self.schedule_dispatch, stop_aborting)

# if we have a delay, give messages this long to arrive on the queue
# before we stop aborting requests
asyncio.get_event_loop().call_later(
self.stop_on_error_timeout, schedule_stop_aborting
)

def _send_abort_reply(self, stream, msg, idents):
"""Send a reply to an aborted request"""
Expand Down
22 changes: 15 additions & 7 deletions ipykernel/tests/test_message_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,23 @@ def test_execute_stop_on_error():
"""execute request should not abort execution queue with stop_on_error False"""
flush_channels()

fail = '\n'.join([
# sleep to ensure subsequent message is waiting in the queue to be aborted
'import time',
'time.sleep(0.5)',
'raise ValueError',
])
fail = "\n".join(
[
# sleep to ensure subsequent message is waiting in the queue to be aborted
# async sleep to ensure coroutines are processing while this happens
"import asyncio",
"await asyncio.sleep(1)",
"raise ValueError()",
]
)
KC.execute(code=fail)
KC.execute(code='print("Hello")')
KC.get_shell_msg(timeout=TIMEOUT)
KC.execute(code='print("world")')
reply = KC.get_shell_msg(timeout=TIMEOUT)
print(reply)
reply = KC.get_shell_msg(timeout=TIMEOUT)
assert reply["content"]["status"] == "aborted"
# second message, too
reply = KC.get_shell_msg(timeout=TIMEOUT)
assert reply['content']['status'] == 'aborted'

Expand Down

0 comments on commit 7a229c6

Please sign in to comment.