Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement activity commands #445

Merged
merged 3 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions temporalcli/commands.activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package temporalcli

import (
"fmt"

"go.temporal.io/api/common/v1"
"go.temporal.io/api/failure/v1"
"go.temporal.io/api/workflowservice/v1"
)

func (c *TemporalActivityCompleteCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

metadata := map[string][]byte{"encoding": []byte("json/plain")}
resultPayloads, err := CreatePayloads([][]byte{[]byte(c.Result)}, metadata, false)
if err != nil {
return err
}

_, err = cl.WorkflowService().RespondActivityTaskCompletedById(cctx, &workflowservice.RespondActivityTaskCompletedByIdRequest{
Copy link
Member

@cretz cretz Feb 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this is the approach of the old CLI, cl.CompleteActivityByID would have worked here too (you'd wrap the payload in rawValue). Arguably it'd be cleaner/clearer to use the high-level SDK client, but don't need to change now, but we might consider using the high-level API in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I'm happy to make that change and it would be easy, however, it looks like it doesn't support identity so I'm going to leave it for now.

Copy link
Member

@cretz cretz Feb 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually smart clients set identity at the client level not the request level. That's what makes the --identity here so strange.

Namespace: c.Parent.Namespace,
WorkflowId: c.WorkflowId,
RunId: c.RunId,
ActivityId: c.ActivityId,
Result: resultPayloads,
Identity: c.Identity,
})
if err != nil {
return fmt.Errorf("unable to complete Activity: %w", err)
}
return nil
}

func (c *TemporalActivityFailCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

metadata := map[string][]byte{"encoding": []byte("json/plain")}
var detailPayloads *common.Payloads
detailPayloads = nil
if len(c.Detail) > 0 {
detailPayloads, err = CreatePayloads([][]byte{[]byte(c.Detail)}, metadata, false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
metadata := map[string][]byte{"encoding": []byte("json/plain")}
var detailPayloads *common.Payloads
detailPayloads = nil
if len(c.Detail) > 0 {
detailPayloads, err = CreatePayloads([][]byte{[]byte(c.Detail)}, metadata, false)
var detailPayloads *common.Payloads
if len(c.Detail) > 0 {
metadata := map[string][]byte{"encoding": []byte("json/plain")}
detailPayloads, err = CreatePayloads([][]byte{[]byte(c.Detail)}, metadata, false)

Little cleanup

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

if err != nil {
return err
}
}
_, err = cl.WorkflowService().RespondActivityTaskFailedById(cctx, &workflowservice.RespondActivityTaskFailedByIdRequest{
Namespace: c.Parent.Namespace,
WorkflowId: c.WorkflowId,
RunId: c.RunId,
ActivityId: c.ActivityId,
Failure: &failure.Failure{
Message: c.Reason,
Source: "CLI",
FailureInfo: &failure.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failure.ApplicationFailureInfo{
NonRetryable: true,
Details: detailPayloads,
}},
},
Identity: c.Identity,
})
if err != nil {
return fmt.Errorf("unable to fail Activity: %w", err)
}
return nil
}
150 changes: 150 additions & 0 deletions temporalcli/commands.activity_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package temporalcli_test

import (
"context"
"time"

"go.temporal.io/api/enums/v1"
"go.temporal.io/api/history/v1"
"go.temporal.io/sdk/client"
)

func (s *SharedServerSuite) TestActivity_Complete() {
run := s.waitActivityStarted()
wid := run.GetID()
aid := "dev-activity-id"
identity := "MyIdentity"
res := s.Execute(
"activity", "complete",
"--activity-id", aid,
"--workflow-id", wid,
"--result", "\"complete-activity-result\"",
"--identity", identity,
"--address", s.Address(),
)
s.NoError(res.Err)
var actual string
s.NoError(run.Get(s.Context, &actual))
s.Equal("complete-activity-result", actual)

started, completed, failed := s.getActivityEvents(wid, aid)
s.NotNil(started)
s.Nil(failed)
s.NotNil(completed)
s.Equal("\"complete-activity-result\"", string(completed.Result.Payloads[0].GetData()))
s.Equal(identity, completed.GetIdentity())
}

func (s *SharedServerSuite) TestActivity_Fail() {
run := s.waitActivityStarted()
wid := run.GetID()
aid := "dev-activity-id"
detail := "{\"myKey\": \"myValue\"}"
reason := "MyReason"
identity := "MyIdentity"
res := s.Execute(
"activity", "fail",
"--activity-id", aid,
"--workflow-id", wid,
"--run-id", run.GetRunID(),
"--detail", detail,
"--reason", reason,
"--identity", identity,
"--address", s.Address(),
)
s.NoError(res.Err)
err := run.Get(s.Context, nil)
s.NotNil(err)

started, completed, failed := s.getActivityEvents(wid, aid)
s.NotNil(started)
s.Nil(completed)
s.NotNil(failed)
s.Equal(
detail,
string(failed.GetFailure().GetApplicationFailureInfo().GetDetails().Payloads[0].GetData()),
)
s.Equal(reason, failed.GetFailure().Message)
s.Equal(identity, failed.GetIdentity())
}

func (s *SharedServerSuite) TestActivity_Complete_InvalidResult() {
run := s.waitActivityStarted()
wid := run.GetID()
aid := "dev-activity-id"
res := s.Execute(
"activity", "complete",
"--activity-id", aid,
"--workflow-id", wid,
"--result", "{not json}",
"--address", s.Address(),
)
s.ErrorContains(res.Err, "is not valid JSON")

started, completed, failed := s.getActivityEvents(wid, aid)
s.Nil(started)
s.Nil(completed)
s.Nil(failed)
}

func (s *SharedServerSuite) TestActivity_Fail_InvalidDetail() {
run := s.waitActivityStarted()
wid := run.GetID()
aid := "dev-activity-id"
res := s.Execute(
"activity", "fail",
"--activity-id", aid,
"--workflow-id", wid,
"--detail", "{not json}",
"--address", s.Address(),
)
s.ErrorContains(res.Err, "is not valid JSON")

started, completed, failed := s.getActivityEvents(wid, aid)
s.Nil(started)
s.Nil(completed)
s.Nil(failed)
}

// Test helpers

func (s *SharedServerSuite) waitActivityStarted() client.WorkflowRun {
s.Worker.OnDevActivity(func(ctx context.Context, a any) (any, error) {
time.Sleep(0xFFFF * time.Hour)
return nil, nil
})
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue},
DevWorkflow,
"ignored",
)
s.NoError(err)
s.Eventually(func() bool {
resp, err := s.Client.DescribeWorkflowExecution(s.Context, run.GetID(), run.GetRunID())
s.NoError(err)
return len(resp.PendingActivities) > 0
}, 5*time.Second, 100*time.Millisecond)
return run
}

func (s *SharedServerSuite) getActivityEvents(workflowID, activityID string) (
started *history.ActivityTaskStartedEventAttributes,
completed *history.ActivityTaskCompletedEventAttributes,
failed *history.ActivityTaskFailedEventAttributes,
) {
iter := s.Client.GetWorkflowHistory(s.Context, workflowID, "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
for iter.HasNext() {
event, err := iter.Next()
s.NoError(err)
if attrs := event.GetActivityTaskStartedEventAttributes(); attrs != nil {
started = attrs
} else if attrs := event.GetActivityTaskCompletedEventAttributes(); attrs != nil {
completed = attrs
s.Equal("json/plain", string(completed.Result.Payloads[0].Metadata["encoding"]))
} else if attrs := event.GetActivityTaskFailedEventAttributes(); attrs != nil {
failed = attrs
}
}
return started, completed, failed
}
91 changes: 91 additions & 0 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func NewTemporalCommand(cctx *CommandContext) *TemporalCommand {
s.Command.Short = "Temporal command-line interface and development server."
s.Command.Long = ""
s.Command.Args = cobra.NoArgs
s.Command.AddCommand(&NewTemporalActivityCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalEnvCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalServerCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalTaskQueueCommand(cctx, &s).Command)
Expand All @@ -56,6 +57,96 @@ func NewTemporalCommand(cctx *CommandContext) *TemporalCommand {
return &s
}

type TemporalActivityCommand struct {
Parent *TemporalCommand
Command cobra.Command
ClientOptions
}

func NewTemporalActivityCommand(cctx *CommandContext, parent *TemporalCommand) *TemporalActivityCommand {
var s TemporalActivityCommand
s.Parent = parent
s.Command.Use = "activity"
s.Command.Short = "Complete or fail an activity."
s.Command.Long = ""
s.Command.Args = cobra.NoArgs
s.Command.AddCommand(&NewTemporalActivityCompleteCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalActivityFailCommand(cctx, &s).Command)
s.ClientOptions.buildFlags(cctx, s.Command.PersistentFlags())
return &s
}

type TemporalActivityCompleteCommand struct {
Parent *TemporalActivityCommand
Command cobra.Command
WorkflowReferenceOptions
ActivityId string
Identity string
Result string
}

func NewTemporalActivityCompleteCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityCompleteCommand {
var s TemporalActivityCompleteCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "complete [flags]"
s.Command.Short = "Complete an activity."
if hasHighlighting {
s.Command.Long = "Complete an Activity Execution.\n\n\x1b[1mtemporal activity complete --activity-id=MyActivityId --workflow-id=MyWorkflowId --result='{\"MyResultKey\": \"MyResultVal\"}'\x1b[0m"
} else {
s.Command.Long = "Complete an Activity Execution.\n\n`temporal activity complete --activity-id=MyActivityId --workflow-id=MyWorkflowId --result='{\"MyResultKey\": \"MyResultVal\"}'`"
}
s.Command.Args = cobra.NoArgs
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().StringVar(&s.ActivityId, "activity-id", "", "The Activity to be completed.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "activity-id")
s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of operator.")
s.Command.Flags().StringVar(&s.Result, "result", "", "The result with which to complete the Activity (JSON).")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "result")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalActivityFailCommand struct {
Parent *TemporalActivityCommand
Command cobra.Command
WorkflowReferenceOptions
ActivityId string
Detail string
Identity string
Reason string
}

func NewTemporalActivityFailCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityFailCommand {
var s TemporalActivityFailCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "fail [flags]"
s.Command.Short = "Fail an activity."
if hasHighlighting {
s.Command.Long = "Fail an Activity Execution.\n\n\x1b[1mtemporal activity fail --activity-id=MyActivityId --workflow-id=MyWorkflowId\x1b[0m"
} else {
s.Command.Long = "Fail an Activity Execution.\n\n`temporal activity fail --activity-id=MyActivityId --workflow-id=MyWorkflowId`"
}
s.Command.Args = cobra.NoArgs
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().StringVar(&s.ActivityId, "activity-id", "", "The Activity to be failed.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "activity-id")
s.Command.Flags().StringVar(&s.Detail, "detail", "", "JSON data describing reason for failing the Activity.")
s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of user submitting this request.")
s.Command.Flags().StringVar(&s.Reason, "reason", "", "Reason for failing the Activity.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalEnvCommand struct {
Parent *TemporalCommand
Command cobra.Command
Expand Down
20 changes: 1 addition & 19 deletions temporalcli/commands.workflow_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package temporalcli

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -312,24 +311,7 @@ func (p *PayloadInputOptions) buildRawInputPayloads() (*common.Payloads, error)
}
metadata[metaPieces[0]] = []byte(metaPieces[1])
}

// Create payloads
ret := &common.Payloads{Payloads: make([]*common.Payload, len(inData))}
for i, in := range inData {
// First, if it's JSON, validate that it is accurate
if strings.HasPrefix(string(metadata["encoding"]), "json/") && !json.Valid(in) {
return nil, fmt.Errorf("input #%v is not valid JSON", i+1)
}
// Decode base64 if base64'd (std encoding only for now)
if p.InputBase64 {
var err error
if in, err = base64.StdEncoding.DecodeString(string(in)); err != nil {
return nil, fmt.Errorf("input #%v is not valid base64", i+1)
}
}
ret.Payloads[i] = &common.Payload{Data: in, Metadata: metadata}
}
return ret, nil
return CreatePayloads(inData, metadata, p.InputBase64)
}

// Rules:
Expand Down
5 changes: 4 additions & 1 deletion temporalcli/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,10 @@ func (d *devOperations) DevWorkflow(ctx workflow.Context, input any) (any, error
if callback != nil {
return callback(ctx, input)
}
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second})
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
ActivityID: "dev-activity-id",
})
var res any
err := workflow.ExecuteActivity(ctx, DevActivity, input).Get(ctx, &res)
return res, err
Expand Down
Loading
Loading