Skip to content

Commit

Permalink
Workflow unit testing changes for 1.10 release (dapr#1038)
Browse files Browse the repository at this point in the history
* Update DurableTask SDK dependency to get ARM64 compatibility (dapr#1024)

* Update DurableTask SDK dependency to get ARM64 compatibility

Signed-off-by: Chris Gillum <cgillum@microsoft.com>

* Fix issue with gRPC address override behavior

Signed-off-by: Chris Gillum <cgillum@microsoft.com>

* Workflow SDK changes to enable unit testing

Signed-off-by: Chris Gillum <cgillum@microsoft.com>

---------

Signed-off-by: Chris Gillum <cgillum@microsoft.com>
  • Loading branch information
cgillum authored Feb 16, 2023
1 parent 0f1e1bf commit 23ba7a2
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 57 deletions.
98 changes: 98 additions & 0 deletions src/Dapr.Workflow/DaprWorkflowContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// ------------------------------------------------------------------------
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Workflow
{
using System;
using Microsoft.DurableTask;
using System.Threading.Tasks;
using System.Threading;

class DaprWorkflowContext : WorkflowContext
{
readonly TaskOrchestrationContext innerContext;

internal DaprWorkflowContext(TaskOrchestrationContext innerContext)
{
this.innerContext = innerContext ?? throw new ArgumentNullException(nameof(innerContext));
}

public override string Name => this.innerContext.Name;

public override string InstanceId => this.innerContext.InstanceId;

public override DateTime CurrentUtcDateTime => this.innerContext.CurrentUtcDateTime;

public override bool IsReplaying => this.innerContext.IsReplaying;

public override Task CallActivityAsync(string name, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallActivityAsync(name, input, options);
}

public override Task<T> CallActivityAsync<T>(string name, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallActivityAsync<T>(name, input, options);
}

public override Task CreateTimer(TimeSpan delay, CancellationToken cancellationToken = default)
{
return this.innerContext.CreateTimer(delay, cancellationToken);
}

public override Task CreateTimer(DateTime fireAt, CancellationToken cancellationToken)
{
return this.innerContext.CreateTimer(fireAt, cancellationToken);
}

public override Task<T> WaitForExternalEventAsync<T>(string eventName, CancellationToken cancellationToken = default)
{
return this.innerContext.WaitForExternalEvent<T>(eventName, cancellationToken);
}

public override Task<T> WaitForExternalEventAsync<T>(string eventName, TimeSpan timeout)
{
return this.innerContext.WaitForExternalEvent<T>(eventName, timeout);
}

public override void SendEvent(string instanceId, string eventName, object payload)
{
this.innerContext.SendEvent(instanceId, eventName, payload);
}

public override void SetCustomStatus(object? customStatus)
{
this.innerContext.SetCustomStatus(customStatus);
}

public override Task<TResult> CallChildWorkflowAsync<TResult>(string workflowName, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallSubOrchestratorAsync<TResult>(workflowName, input, options);
}

public override Task CallChildWorkflowAsync(string workflowName, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallSubOrchestratorAsync(workflowName, input, options);
}

public override void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true)
{
this.innerContext.ContinueAsNew(newInput!, preserveUnprocessedEvents);
}

public override Guid NewGuid()
{
return this.innerContext.NewGuid();
}
}
}
74 changes: 20 additions & 54 deletions src/Dapr.Workflow/WorkflowContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,17 @@ namespace Dapr.Workflow
/// Context object used by workflow implementations to perform actions such as scheduling activities, durable timers, waiting for
/// external events, and for getting basic information about the current workflow instance.
/// </summary>
public class WorkflowContext
public abstract class WorkflowContext
{
readonly TaskOrchestrationContext innerContext;

internal WorkflowContext(TaskOrchestrationContext innerContext)
{
this.innerContext = innerContext ?? throw new ArgumentNullException(nameof(innerContext));
}

/// <summary>
/// Gets the name of the current workflow.
/// </summary>
public string Name => this.innerContext.Name;
public abstract string Name { get; }

/// <summary>
/// Gets the instance ID of the current workflow.
/// </summary>
public string InstanceId => this.innerContext.InstanceId;
public abstract string InstanceId { get; }

/// <summary>
/// Gets the current workflow time in UTC.
Expand All @@ -51,7 +44,7 @@ internal WorkflowContext(TaskOrchestrationContext innerContext)
/// the current time, such as <see cref="DateTime.UtcNow"/> and <see cref="DateTimeOffset.UtcNow"/>
/// (which should not be used).
/// </remarks>
public DateTime CurrentUtcDateTime => this.innerContext.CurrentUtcDateTime;
public abstract DateTime CurrentUtcDateTime { get; }

/// <summary>
/// Gets a value indicating whether the workflow is currently replaying a previous execution.
Expand All @@ -72,7 +65,7 @@ internal WorkflowContext(TaskOrchestrationContext innerContext)
/// <value>
/// <c>true</c> if the workflow is currently replaying a previous execution; otherwise <c>false</c>.
/// </value>
public bool IsReplaying => this.innerContext.IsReplaying;
public abstract bool IsReplaying { get; }

/// <summary>
/// Asynchronously invokes an activity by name and with the specified input value.
Expand Down Expand Up @@ -108,19 +101,16 @@ internal WorkflowContext(TaskOrchestrationContext innerContext)
/// The activity failed with an unhandled exception. The details of the failure can be found in the
/// <see cref="TaskFailedException.FailureDetails"/> property.
/// </exception>
public Task CallActivityAsync(string name, object? input = null, TaskOptions? options = null)
public virtual Task CallActivityAsync(string name, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallActivityAsync(name, input, options);
return this.CallActivityAsync<object>(name, input, options);
}

/// <returns>
/// A task that completes when the activity completes or fails. The result of the task is the activity's return value.
/// </returns>
/// <inheritdoc cref="CallActivityAsync"/>
public Task<T> CallActivityAsync<T>(string name, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallActivityAsync<T>(name, input, options);
}
public abstract Task<T> CallActivityAsync<T>(string name, object? input = null, TaskOptions? options = null);

/// <summary>
/// Creates a durable timer that expires after the specified delay.
Expand All @@ -131,9 +121,9 @@ public Task<T> CallActivityAsync<T>(string name, object? input = null, TaskOptio
/// <exception cref="InvalidOperationException">
/// Thrown if the calling thread is not the workflow dispatch thread.
/// </exception>
public Task CreateTimer(TimeSpan delay, CancellationToken cancellationToken = default)
public virtual Task CreateTimer(TimeSpan delay, CancellationToken cancellationToken = default)
{
return this.innerContext.CreateTimer(delay, cancellationToken);
return this.CreateTimer(this.CurrentUtcDateTime.Add(delay), cancellationToken);
}

/// <summary>
Expand All @@ -142,10 +132,7 @@ public Task CreateTimer(TimeSpan delay, CancellationToken cancellationToken = de
/// <param name="fireAt">The time at which the timer should expire.</param>
/// <param name="cancellationToken">Used to cancel the durable timer.</param>
/// <inheritdoc cref="CreateTimer(TimeSpan, CancellationToken)"/>
public Task CreateTimer(DateTime fireAt, CancellationToken cancellationToken)
{
return this.innerContext.CreateTimer(fireAt, cancellationToken);
}
public abstract Task CreateTimer(DateTime fireAt, CancellationToken cancellationToken);

/// <summary>
/// Waits for an event to be raised with name <paramref name="eventName"/> and returns the event data.
Expand Down Expand Up @@ -176,10 +163,7 @@ public Task CreateTimer(DateTime fireAt, CancellationToken cancellationToken)
/// <exception cref="InvalidOperationException">
/// Thrown if the calling thread is not the workflow dispatch thread.
/// </exception>
public Task<T> WaitForExternalEventAsync<T>(string eventName, CancellationToken cancellationToken = default)
{
return this.innerContext.WaitForExternalEvent<T>(eventName, cancellationToken);
}
public abstract Task<T> WaitForExternalEventAsync<T>(string eventName, CancellationToken cancellationToken = default);

/// <summary>
/// Waits for an event to be raised with name <paramref name="eventName"/> and returns the event data.
Expand All @@ -190,10 +174,7 @@ public Task<T> WaitForExternalEventAsync<T>(string eventName, CancellationToken
/// </param>
/// <param name="timeout">The amount of time to wait before cancelling the external event task.</param>
/// <inheritdoc cref="WaitForExternalEventAsync{T}(string, CancellationToken)"/>
public Task<T> WaitForExternalEventAsync<T>(string eventName, TimeSpan timeout)
{
return this.innerContext.WaitForExternalEvent<T>(eventName, timeout);
}
public abstract Task<T> WaitForExternalEventAsync<T>(string eventName, TimeSpan timeout);

/// <summary>
/// Raises an external event for the specified workflow instance.
Expand All @@ -208,10 +189,7 @@ public Task<T> WaitForExternalEventAsync<T>(string eventName, TimeSpan timeout)
/// <param name="instanceId">The ID of the workflow instance to send the event to.</param>
/// <param name="eventName">The name of the event to wait for. Event names are case-insensitive.</param>
/// <param name="payload">The serializable payload of the external event.</param>
public void SendEvent(string instanceId, string eventName, object payload)
{
this.SendEvent(instanceId, eventName, payload);
}
public abstract void SendEvent(string instanceId, string eventName, object payload);

/// <summary>
/// Assigns a custom status value to the current workflow.
Expand All @@ -226,10 +204,7 @@ public void SendEvent(string instanceId, string eventName, object payload)
/// <exception cref="InvalidOperationException">
/// Thrown if the calling thread is not the workflow dispatch thread.
/// </exception>
public void SetCustomStatus(object? customStatus)
{
this.innerContext.SetCustomStatus(customStatus);
}
public abstract void SetCustomStatus(object? customStatus);

/// <summary>
/// Executes the specified workflow as a child workflow and returns the result.
Expand All @@ -238,10 +213,7 @@ public void SetCustomStatus(object? customStatus)
/// The type into which to deserialize the child workflow's output.
/// </typeparam>
/// <inheritdoc cref="CallChildWorkflowAsync(string, object?, TaskOptions?)"/>
public Task<TResult> CallChildWorkflowAsync<TResult>(string workflowName, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallSubOrchestratorAsync<TResult>(workflowName, input, options);
}
public abstract Task<TResult> CallChildWorkflowAsync<TResult>(string workflowName, object? input = null, TaskOptions? options = null);

/// <summary>
/// Executes the specified workflow as a child workflow.
Expand Down Expand Up @@ -284,9 +256,9 @@ public Task<TResult> CallChildWorkflowAsync<TResult>(string workflowName, object
/// The child workflow failed with an unhandled exception. The details of the failure can be found in the
/// <see cref="TaskFailedException.FailureDetails"/> property.
/// </exception>
public Task CallChildWorkflowAsync(string workflowName, object? input = null, TaskOptions? options = null)
public virtual Task CallChildWorkflowAsync(string workflowName, object? input = null, TaskOptions? options = null)
{
return this.innerContext.CallSubOrchestratorAsync(workflowName, input, options);
return this.CallChildWorkflowAsync<object>(workflowName, input, options);
}

/// <summary>
Expand Down Expand Up @@ -320,10 +292,7 @@ public Task CallChildWorkflowAsync(string workflowName, object? input = null, Ta
/// history when the workflow instance restarts. If <c>false</c>, any unprocessed
/// external events will be discarded when the workflow instance restarts.
/// </param>
public void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true)
{
this.innerContext.ContinueAsNew(newInput!, preserveUnprocessedEvents);
}
public abstract void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true);

/// <summary>
/// Creates a new GUID that is safe for replay within a workflow.
Expand All @@ -334,9 +303,6 @@ public void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvent
/// and an internally managed sequence number.
/// </remarks>
/// <returns>The new <see cref="Guid"/> value.</returns>
public Guid NewGuid()
{
return this.innerContext.NewGuid();
}
public abstract Guid NewGuid();
}
}
5 changes: 2 additions & 3 deletions src/Dapr.Workflow/WorkflowRuntimeOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ namespace Dapr.Workflow
using System.Threading.Tasks;
using Microsoft.DurableTask;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

/// <summary>
/// Defines runtime options for workflows.
Expand Down Expand Up @@ -52,7 +51,7 @@ public void RegisterWorkflow<TInput, TOutput>(string name, Func<WorkflowContext,
{
registry.AddOrchestratorFunc<TInput, TOutput>(name, (innerContext, input) =>
{
WorkflowContext workflowContext = new(innerContext);
WorkflowContext workflowContext = new DaprWorkflowContext(innerContext);
return implementation(workflowContext, input);
});
});
Expand Down Expand Up @@ -145,7 +144,7 @@ public OrchestratorWrapper(IWorkflow workflow)

public Task<object?> RunAsync(TaskOrchestrationContext context, object? input)
{
return this.workflow.RunAsync(new WorkflowContext(context), input);
return this.workflow.RunAsync(new DaprWorkflowContext(context), input);
}
}

Expand Down

0 comments on commit 23ba7a2

Please sign in to comment.