Skip to content

Commit

Permalink
share cancellation source
Browse files Browse the repository at this point in the history
  • Loading branch information
colombod committed Feb 16, 2023
1 parent bfc84b4 commit 1a54b56
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 7 deletions.
46 changes: 44 additions & 2 deletions src/Microsoft.DotNet.Interactive.Tests/CancelCommandTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

namespace Microsoft.DotNet.Interactive.Tests;
#pragma warning disable xUnit1000
internal class CancelCommandTests : LanguageKernelTestBase
public class CancelCommandTests : LanguageKernelTestBase
{
public CancelCommandTests(ITestOutputHelper output) : base(output)
{
Expand Down Expand Up @@ -134,7 +134,7 @@ public async Task when_cancelling_command_it_reports_what_command_was_cancelled(
}
}

[Fact]
[Theory]
public async Task can_cancel_user_loop_using_CancellationToken()
{
// todo: this test is flaky and timeouts in CI
Expand Down Expand Up @@ -175,6 +175,48 @@ public async Task can_cancel_user_loop_using_CancellationToken()
}
}

[Fact]
public async Task can_cancel_user_code_when_commands_are_split()
{
// todo: this test is flaky and timeouts in CI
while (true)
{
using var kernel = CreateKernel();

var cancelCommand = new Cancel();

var commandToCancel = new SubmitCode(@"
#!csharp
using Microsoft.DotNet.Interactive;
var cancellationToken = KernelInvocationContext.Current.CancellationToken;
while(!cancellationToken.IsCancellationRequested){
await Task.Delay(10);
}");
try
{
var resultForCommandToCancel = kernel.SendAsync(commandToCancel);

await Task.Delay(200);

await kernel.SendAsync(cancelCommand).Timeout(10.Seconds());

var result = await resultForCommandToCancel.Timeout(10.Seconds());

result.Events
.Should()
.ContainSingle<CommandFailed>()
.Which
.Command
.Should()
.Be(commandToCancel);
break;
}
catch (TimeoutException)
{
}
}
}

[Fact]
public void user_code_can_react_to_cancel_command_using_KernelInvocationContext_cancellation_token()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,11 @@ await composite.SendAsync(new SubmitCode(@$"#!value --name data --mime-type appl
.Which
.Should()
.BeEquivalentTo(expected, JsonEquivalenceConfig);

static EquivalencyAssertionOptions<JsonDocument> JsonEquivalenceConfig(EquivalencyAssertionOptions<JsonDocument> opt) => opt.ComparingByMembers<JsonElement>();

}

public EquivalencyAssertionOptions<JsonDocument> JsonEquivalenceConfig(EquivalencyAssertionOptions<JsonDocument> opt) => opt.ComparingByMembers<JsonElement>();


[Fact]
Expand Down
22 changes: 18 additions & 4 deletions src/Microsoft.DotNet.Interactive/KernelInvocationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.CommandLine;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
Expand All @@ -21,6 +22,7 @@ namespace Microsoft.DotNet.Interactive;
public class KernelInvocationContext : IDisposable
{
private static readonly AsyncLocal<KernelInvocationContext> _current = new();
private static readonly ConcurrentDictionary<string, CancellationTokenSource> _cancellationTokenSources = new ();

private readonly ReplaySubject<KernelEvent> _events = new();

Expand All @@ -32,6 +34,8 @@ public class KernelInvocationContext : IDisposable

private readonly CancellationTokenSource _cancellationTokenSource;

private bool _ownsCancellationTokenSource;

private KernelInvocationContext(KernelCommand command)
{
var operation = new OperationLogger(
Expand All @@ -47,16 +51,22 @@ private KernelInvocationContext(KernelCommand command)
category: nameof(KernelInvocationContext),
logOnStart: true);

_cancellationTokenSource = new CancellationTokenSource();
_cancellationTokenSource =
_cancellationTokenSources.GetOrAdd(
command.GetOrCreateToken(),
s =>
{
_ownsCancellationTokenSource = true;
return new CancellationTokenSource();
}
);

Command = command;

Result = new KernelCommandResult(command);

_disposables.Add(_events.Subscribe(Result.AddEvent));

_disposables.Add(_cancellationTokenSource);

_disposables.Add(ConsoleOutput.Subscribe(c =>
{
return new CompositeDisposable
Expand Down Expand Up @@ -321,7 +331,11 @@ public void Dispose()
}

Complete(Command);

if (_ownsCancellationTokenSource)
{
_cancellationTokenSources.TryRemove(Command.GetOrCreateToken(), out _);
_cancellationTokenSource.Dispose();
}
_disposables.Dispose();
}

Expand Down

0 comments on commit 1a54b56

Please sign in to comment.