Skip to content

Commit

Permalink
[BCF-2630] Add pipeline runner wrapper (#11091)
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier authored Oct 30, 2023
1 parent b655080 commit db64df9
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 9 deletions.
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb // indirect
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230906073235-9e478e5e19f1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1458,8 +1458,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47 h1:vdieOW3CZGdD2R5zvCSMS+0vksyExPN3/Fa1uVfld/A=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47/go.mod h1:xMwqRdj5vqYhCJXgKVqvyAwdcqM6ZAEhnwEQ4Khsop8=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 h1:fFD5SgSJtnXvkGLK3CExNKpUIz4sGrNNkKv3Ljw63Hk=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 h1:QFMxPq7AqU4qXeW7UBv0eP/mpLt2pG2QkASUyFjKoIE=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8=
Expand Down
94 changes: 94 additions & 0 deletions core/services/ocr2/plugins/generic/pipeline_runner_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package generic

import (
"context"
"time"

"github.com/smartcontractkit/chainlink-relay/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
)

var _ types.PipelineRunnerService = (*PipelineRunnerAdapter)(nil)

type pipelineRunner interface {
ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error)
}

type PipelineRunnerAdapter struct {
runner pipelineRunner
job job.Job
logger logger.Logger
}

func (p *PipelineRunnerAdapter) ExecuteRun(ctx context.Context, spec string, vars types.Vars, options types.Options) ([]types.TaskResult, error) {
s := pipeline.Spec{
DotDagSource: spec,
CreatedAt: time.Now(),
MaxTaskDuration: models.Interval(options.MaxTaskDuration),
JobID: p.job.ID,
JobName: p.job.Name.ValueOrZero(),
JobType: string(p.job.Type),
}

defaultVars := map[string]interface{}{
"jb": map[string]interface{}{
"databaseID": p.job.ID,
"externalJobID": p.job.ExternalJobID,
"name": p.job.Name.ValueOrZero(),
},
}

err := merge(defaultVars, vars.Vars)
if err != nil {
return nil, err
}

finalVars := pipeline.NewVarsFrom(defaultVars)
_, trrs, err := p.runner.ExecuteRun(ctx, s, finalVars, p.logger)
if err != nil {
return nil, err
}

taskResults := make([]types.TaskResult, len(trrs))
for i, trr := range trrs {
taskResults[i] = types.TaskResult{
ID: trr.ID.String(),
Type: string(trr.Task.Type()),
Value: trr.Result.Value,
Error: trr.Result.Error,
Index: int(trr.TaskRun.Index),
}
}
return taskResults, nil
}

func NewPipelineRunnerAdapter(logger logger.Logger, job job.Job, runner pipelineRunner) *PipelineRunnerAdapter {
return &PipelineRunnerAdapter{
logger: logger,
job: job,
runner: runner,
}
}

// merge merges mapTwo into mapOne, modifying mapOne in the process.
func merge(mapOne, mapTwo map[string]interface{}) error {
for k, v := range mapTwo {
// if `mapOne` doesn't have `k`, then nothing to do, just assign v to `mapOne`.
if _, ok := mapOne[k]; !ok {
mapOne[k] = v
} else {
vAsMap, vOK := v.(map[string]interface{})
mapOneVAsMap, moOK := mapOne[k].(map[string]interface{})
if vOK && moOK {
merge(mapOneVAsMap, vAsMap)
} else {
mapOne[k] = v
}
}
}

return nil
}
144 changes: 144 additions & 0 deletions core/services/ocr2/plugins/generic/pipeline_runner_adapter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package generic

import (
"context"
"net/http"
"reflect"
"testing"
"time"

"github.com/google/uuid"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink-relay/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
_ "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

const spec = `
answer [type=sum values=<[ $(val), 2 ]>]
answer;
`

func TestAdapter_Integration(t *testing.T) {
logger := logger.TestLogger(t)
cfg := configtest.NewTestGeneralConfig(t)
url := cfg.Database().URL()
db, err := pg.NewConnection(url.String(), cfg.Database().Dialect(), cfg.Database())
require.NoError(t, err)

keystore := keystore.NewInMemory(db, utils.FastScryptParams, logger, cfg.Database())
pipelineORM := pipeline.NewORM(db, logger, cfg.Database(), cfg.JobPipeline().MaxSuccessfulRuns())
bridgesORM := bridges.NewORM(db, logger, cfg.Database())
pr := pipeline.NewRunner(
pipelineORM,
bridgesORM,
cfg.JobPipeline(),
cfg.WebServer(),
nil,
keystore.Eth(),
keystore.VRF(),
logger,
http.DefaultClient,
http.DefaultClient,
)
pra := NewPipelineRunnerAdapter(logger, job.Job{}, pr)
results, err := pra.ExecuteRun(context.Background(), spec, types.Vars{Vars: map[string]interface{}{"val": 1}}, types.Options{})
require.NoError(t, err)

finalResult := results[0].Value.(decimal.Decimal)

assert.True(t, decimal.NewFromInt(3).Equal(finalResult))
}

func newMockPipelineRunner() *mockPipelineRunner {
return &mockPipelineRunner{}
}

type mockPipelineRunner struct {
results pipeline.TaskRunResults
err error
run *pipeline.Run
spec pipeline.Spec
vars pipeline.Vars
}

func (m *mockPipelineRunner) ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (*pipeline.Run, pipeline.TaskRunResults, error) {
m.spec = spec
m.vars = vars
return m.run, m.results, m.err
}

func TestAdapter_AddsDefaultVars(t *testing.T) {
logger := logger.TestLogger(t)
mpr := newMockPipelineRunner()
jobID, externalJobID, name := int32(100), uuid.New(), null.StringFrom("job-name")
pra := NewPipelineRunnerAdapter(logger, job.Job{ID: jobID, ExternalJobID: externalJobID, Name: name}, mpr)

_, err := pra.ExecuteRun(context.Background(), spec, types.Vars{}, types.Options{})
require.NoError(t, err)

gotName, err := mpr.vars.Get("jb.name")
require.NoError(t, err)
assert.Equal(t, name.String, gotName)

gotID, err := mpr.vars.Get("jb.databaseID")
require.NoError(t, err)
assert.Equal(t, jobID, gotID)

gotExternalID, err := mpr.vars.Get("jb.externalJobID")
require.NoError(t, err)
assert.Equal(t, externalJobID, gotExternalID)
}

func TestPipelineRunnerAdapter_SetsVarsOnSpec(t *testing.T) {
logger := logger.TestLogger(t)
mpr := newMockPipelineRunner()
jobID, externalJobID, name, jobType := int32(100), uuid.New(), null.StringFrom("job-name"), job.Type("generic")
pra := NewPipelineRunnerAdapter(logger, job.Job{ID: jobID, ExternalJobID: externalJobID, Name: name, Type: jobType}, mpr)

maxDuration := time.Duration(100 * time.Second)
_, err := pra.ExecuteRun(context.Background(), spec, types.Vars{}, types.Options{MaxTaskDuration: maxDuration})
require.NoError(t, err)

assert.Equal(t, jobID, mpr.spec.JobID)
assert.Equal(t, name.ValueOrZero(), mpr.spec.JobName)
assert.Equal(t, string(jobType), mpr.spec.JobType)
assert.Equal(t, maxDuration, mpr.spec.MaxTaskDuration.Duration())

}

func TestMerge(t *testing.T) {
vars := map[string]interface{}{
"jb": map[string]interface{}{
"databaseID": "some-job-id",
},
}
addedVars := map[string]interface{}{
"jb": map[string]interface{}{
"some-other-var": "foo",
},
"val": 0,
}

err := merge(vars, addedVars)
require.NoError(t, err)

assert.True(t, reflect.DeepEqual(vars, map[string]interface{}{
"jb": map[string]interface{}{
"databaseID": "some-job-id",
"some-other-var": "foo",
},
"val": 0,
}), vars)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ require (
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb
github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1459,8 +1459,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47 h1:vdieOW3CZGdD2R5zvCSMS+0vksyExPN3/Fa1uVfld/A=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47/go.mod h1:xMwqRdj5vqYhCJXgKVqvyAwdcqM6ZAEhnwEQ4Khsop8=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 h1:fFD5SgSJtnXvkGLK3CExNKpUIz4sGrNNkKv3Ljw63Hk=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 h1:QFMxPq7AqU4qXeW7UBv0eP/mpLt2pG2QkASUyFjKoIE=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8=
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb // indirect
github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb // indirect
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2364,8 +2364,8 @@ github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc4
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47/go.mod h1:xMwqRdj5vqYhCJXgKVqvyAwdcqM6ZAEhnwEQ4Khsop8=
github.com/smartcontractkit/chainlink-env v0.38.3 h1:ZtOnwkG622R0VCTxL5V09AnT/QXhlFwkGTjd0Lsfpfg=
github.com/smartcontractkit/chainlink-env v0.38.3/go.mod h1:7z4sw/hN8TxioQCLwFqQdhK3vaOV0a22Qe99z4bRUcg=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 h1:fFD5SgSJtnXvkGLK3CExNKpUIz4sGrNNkKv3Ljw63Hk=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 h1:QFMxPq7AqU4qXeW7UBv0eP/mpLt2pG2QkASUyFjKoIE=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8=
Expand Down

0 comments on commit db64df9

Please sign in to comment.