Skip to content

Commit

Permalink
Add coalesce_streams (#279)
Browse files Browse the repository at this point in the history
* Add coalesce_streams

Co-authored-by: chrisjsewell <chrisj_sewell@hotmail.com>

* Add test

---------

Co-authored-by: chrisjsewell <chrisj_sewell@hotmail.com>
  • Loading branch information
davidbrochart and chrisjsewell authored Apr 3, 2023
1 parent 0f6a79d commit aa62bc7
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
50 changes: 50 additions & 0 deletions nbclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import base64
import collections
import datetime
import re
import signal
import typing as t
from contextlib import asynccontextmanager, contextmanager
Expand All @@ -28,6 +29,9 @@
from .output_widget import OutputWidget
from .util import ensure_async, run_hook, run_sync

_RGX_CARRIAGERETURN = re.compile(r".*\r(?=[^\n])")
_RGX_BACKSPACE = re.compile(r"[^\n]\b")


def timestamp(msg: t.Optional[t.Dict] = None) -> str:
"""Get the timestamp for a message."""
Expand Down Expand Up @@ -426,6 +430,14 @@ def _kernel_manager_class_default(self) -> t.Type[KernelManager]:
)
)

coalesce_streams = Bool(
help=dedent(
"""
Merge all stream outputs with shared names into single streams.
"""
)
)

def __init__(self, nb: NotebookNode, km: t.Optional[KernelManager] = None, **kw: t.Any) -> None:
"""Initializes the execution manager.
Expand Down Expand Up @@ -1006,6 +1018,44 @@ async def async_execute_cell(
self.on_cell_executed, cell=cell, cell_index=cell_index, execute_reply=exec_reply
)
await self._check_raise_for_error(cell, cell_index, exec_reply)

if self.coalesce_streams and cell.outputs:
new_outputs = []
streams: dict[str, NotebookNode] = {}
for output in cell.outputs:
if output["output_type"] == "stream":
if output["name"] in streams:
streams[output["name"]]["text"] += output["text"]
else:
new_outputs.append(output)
streams[output["name"]] = output
else:
new_outputs.append(output)

# process \r and \b characters
for output in streams.values():
old = output["text"]
while len(output["text"]) < len(old):
old = output["text"]
# Cancel out anything-but-newline followed by backspace
output["text"] = _RGX_BACKSPACE.sub("", output["text"])
# Replace all carriage returns not followed by newline
output["text"] = _RGX_CARRIAGERETURN.sub("", output["text"])

# We also want to ensure stdout and stderr are always in the same consecutive order,
# because they are asynchronous, so order isn't guaranteed.
for i, output in enumerate(new_outputs):
if output["output_type"] == "stream" and output["name"] == "stderr":
if (
len(new_outputs) >= i + 2
and new_outputs[i + 1]["output_type"] == "stream"
and new_outputs[i + 1]["name"] == "stdout"
):
stdout = new_outputs.pop(i + 1)
new_outputs.insert(i, stdout)

cell.outputs = new_outputs

self.nb['cells'][cell_index] = cell
return cell

Expand Down
31 changes: 31 additions & 0 deletions nbclient/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1830,3 +1830,34 @@ def test_error_async_cell_hooks(self, executor, cell_mock, message_mock):
hooks["on_notebook_start"].assert_not_called()
hooks["on_notebook_complete"].assert_not_called()
hooks["on_notebook_error"].assert_not_called()

@prepare_cell_mocks(
{
'msg_type': 'stream',
'header': {'msg_type': 'stream'},
'content': {'name': 'stdout', 'text': 'foo1'},
},
{
'msg_type': 'stream',
'header': {'msg_type': 'stream'},
'content': {'name': 'stderr', 'text': 'bar1'},
},
{
'msg_type': 'stream',
'header': {'msg_type': 'stream'},
'content': {'name': 'stdout', 'text': 'foo2'},
},
{
'msg_type': 'stream',
'header': {'msg_type': 'stream'},
'content': {'name': 'stderr', 'text': 'bar2'},
},
)
def test_coalesce_streams(self, executor, cell_mock, message_mock):
executor.coalesce_streams = True
executor.execute_cell(cell_mock, 0)

assert cell_mock.outputs == [
{'output_type': 'stream', 'name': 'stdout', 'text': 'foo1foo2'},
{'output_type': 'stream', 'name': 'stderr', 'text': 'bar1bar2'},
]

0 comments on commit aa62bc7

Please sign in to comment.