Skip to content

Commit

Permalink
Add dedicated wasm implementation for Parallel.Invoke and Parallel.For (
Browse files Browse the repository at this point in the history
#57283)

Introduces a simple single-threaded implementation for Parallel.Invoke, .For, and .ForEach that bypasses Task and wait primitives so that in browser environments parallel loop operations complete synchronously on the current thread
  • Loading branch information
kg authored Aug 17, 2021
1 parent 09c2902 commit addbbcb
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<Compile Include="System\Runtime\InteropServices\JavaScript\DelegateTests.cs" />
<Compile Include="System\Runtime\InteropServices\JavaScript\HelperMarshal.cs" />
<Compile Include="System\Runtime\InteropServices\JavaScript\Http\HttpRequestMessageTest.cs" />
<Compile Include="System\Runtime\InteropServices\JavaScript\ParallelTests.cs" />
</ItemGroup>
<ItemGroup>
<!-- Part of the shared framework but not exposed. -->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections;
using System.Collections.Generic;
using System.Runtime.InteropServices.JavaScript;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace System.Runtime.InteropServices.JavaScript.Tests
{
public static class ParallelTests
{
// The behavior of APIs like Invoke depends on how many items they are asked to invoke
[Theory]
[InlineData(0)]
[InlineData(1)]
[InlineData(2)]
[InlineData(5)]
[InlineData(32)]
[InlineData(250)]
public static void ParallelInvokeActionArray(int count)
{
var actions = new List<Action>();
int sum = 0, expected = 0;
for (int i = 0; i < count; i++) {
int j = i;
actions.Add(() => {
sum += j;
});
expected += j;
}

Parallel.Invoke(actions.ToArray());
Assert.Equal(expected, sum);
}

[Theory]
[InlineData(0)]
[InlineData(1)]
[InlineData(32)]
[InlineData(250)]
public static void ParallelFor(int count)
{
int sum = 0, expected = 0;
for (int i = 0; i < count; i++)
expected += i;
Parallel.For(0, count, (i) => { sum += i; });
Assert.Equal(expected, sum);
}

[Theory]
[InlineData(0)]
[InlineData(1)]
[InlineData(32)]
[InlineData(250)]
public static void ParallelForEach(int count)
{
int sum = 0, expected = 0;
var items = new List<int>();
for (int i = 0; i < count; i++) {
items.Add(i);
expected += i;
}
Parallel.ForEach(items, (i) => { sum += i; });
Assert.Equal(expected, sum);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,12 @@ public static void Invoke(ParallelOptions parallelOptions, params Action[] actio
{
// If we've gotten this far, it's time to process the actions.

// This is more efficient for a large number of actions, or for enforcing MaxDegreeOfParallelism:
if ((actionsCopy.Length > SMALL_ACTIONCOUNT_LIMIT) ||
(parallelOptions.MaxDegreeOfParallelism != -1 && parallelOptions.MaxDegreeOfParallelism < actionsCopy.Length))
// Web browsers need special treatment that is implemented in TaskReplicator
if (OperatingSystem.IsBrowser() ||
// This is more efficient for a large number of actions, or for enforcing MaxDegreeOfParallelism:
(actionsCopy.Length > SMALL_ACTIONCOUNT_LIMIT) ||
(parallelOptions.MaxDegreeOfParallelism != -1 && parallelOptions.MaxDegreeOfParallelism < actionsCopy.Length)
)
{
// Used to hold any exceptions encountered during action processing
ConcurrentQueue<Exception>? exceptionQ = null; // will be lazily initialized if necessary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,29 @@ private TaskReplicator(ParallelOptions options, bool stopOnFirstFailure)

public static void Run<TState>(ReplicatableUserAction<TState> action, ParallelOptions options, bool stopOnFirstFailure)
{
int maxConcurrencyLevel = (options.EffectiveMaxConcurrencyLevel > 0) ? options.EffectiveMaxConcurrencyLevel : int.MaxValue;

TaskReplicator replicator = new TaskReplicator(options, stopOnFirstFailure);
new Replica<TState>(replicator, maxConcurrencyLevel, CooperativeMultitaskingTaskTimeout_RootTask, action).Start();

Replica? nextReplica;
while (replicator._pendingReplicas.TryDequeue(out nextReplica))
nextReplica.Wait();

if (replicator._exceptions != null)
throw new AggregateException(replicator._exceptions);
// Browser hosts do not support synchronous Wait so we want to run the
// replicated task directly instead of going through Task infrastructure
if (OperatingSystem.IsBrowser()) {
// Since we are running on a single thread, we don't want the action to time out
var timeout = int.MaxValue - 1;
var state = default(TState)!;

action(ref state, timeout, out bool yieldedBeforeCompletion);
if (yieldedBeforeCompletion)
throw new Exception("Replicated tasks cannot yield in this single-threaded browser environment");
} else {
int maxConcurrencyLevel = (options.EffectiveMaxConcurrencyLevel > 0) ? options.EffectiveMaxConcurrencyLevel : int.MaxValue;

TaskReplicator replicator = new TaskReplicator(options, stopOnFirstFailure);
new Replica<TState>(replicator, maxConcurrencyLevel, CooperativeMultitaskingTaskTimeout_RootTask, action).Start();

Replica? nextReplica;
while (replicator._pendingReplicas.TryDequeue(out nextReplica))
nextReplica.Wait();

if (replicator._exceptions != null)
throw new AggregateException(replicator._exceptions);
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace System.Threading.Tasks.Tests
{
public static class BreakTests
{
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Theory]
[InlineData(100, 10)]
[InlineData(100, 20)]
[InlineData(1000, 100)]
Expand Down Expand Up @@ -46,7 +46,7 @@ public static void TestFor_Break_Basic(int loopsize, int breakpoint)
Assert.True(result, "TestForBreak: Failed: Could not detect any interruption of For-loop.");
}

[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Theory]
[InlineData(100, 10)]
[InlineData(100, 20)]
[InlineData(1000, 100)]
Expand Down Expand Up @@ -86,7 +86,7 @@ public static void TestFor_Break_64Bits(int loopsize, int breakpoint)
Assert.True(result, "TestFor64Break: Failed: Could not detect any interruption of For-loop.");
}

[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Theory]
[InlineData(500, 10)]
[InlineData(500, 20)]
[InlineData(1000, 100)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace System.Threading.Tasks.Tests
{
public static class ParallelForUnitTests
{
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Theory]
[InlineData(API.For64, StartIndexBase.Int32, 0, WithParallelOption.None, ActionWithState.None, ActionWithLocal.None)]
[InlineData(API.For64, StartIndexBase.Int32, 10, WithParallelOption.None, ActionWithState.Stop, ActionWithLocal.HasFinally)]
[InlineData(API.For64, StartIndexBase.Int32, 10, WithParallelOption.WithDOP, ActionWithState.None, ActionWithLocal.None)]
Expand Down

0 comments on commit addbbcb

Please sign in to comment.