diff --git a/core/scripts/go.mod b/core/scripts/go.mod index c8b616a4b6e..f097ea89be5 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -302,7 +302,7 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 // indirect github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47 // indirect - github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 // indirect + github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 // indirect github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 // indirect github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb // indirect github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230906073235-9e478e5e19f1 // indirect diff --git a/core/scripts/go.sum b/core/scripts/go.sum index c4d0575bb20..0461daa25e9 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1458,8 +1458,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47 h1:vdieOW3CZGdD2R5zvCSMS+0vksyExPN3/Fa1uVfld/A= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47/go.mod h1:xMwqRdj5vqYhCJXgKVqvyAwdcqM6ZAEhnwEQ4Khsop8= -github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 h1:fFD5SgSJtnXvkGLK3CExNKpUIz4sGrNNkKv3Ljw63Hk= -github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ= +github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 h1:QFMxPq7AqU4qXeW7UBv0eP/mpLt2pG2QkASUyFjKoIE= +github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ= github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ= github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU= github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8= diff --git a/core/services/ocr2/plugins/generic/pipeline_runner_adapter.go b/core/services/ocr2/plugins/generic/pipeline_runner_adapter.go new file mode 100644 index 00000000000..5c58522f409 --- /dev/null +++ b/core/services/ocr2/plugins/generic/pipeline_runner_adapter.go @@ -0,0 +1,94 @@ +package generic + +import ( + "context" + "time" + + "github.com/smartcontractkit/chainlink-relay/pkg/types" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/store/models" +) + +var _ types.PipelineRunnerService = (*PipelineRunnerAdapter)(nil) + +type pipelineRunner interface { + ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) +} + +type PipelineRunnerAdapter struct { + runner pipelineRunner + job job.Job + logger logger.Logger +} + +func (p *PipelineRunnerAdapter) ExecuteRun(ctx context.Context, spec string, vars types.Vars, options types.Options) ([]types.TaskResult, error) { + s := pipeline.Spec{ + DotDagSource: spec, + CreatedAt: time.Now(), + MaxTaskDuration: models.Interval(options.MaxTaskDuration), + JobID: p.job.ID, + JobName: p.job.Name.ValueOrZero(), + JobType: string(p.job.Type), + } + + defaultVars := map[string]interface{}{ + "jb": map[string]interface{}{ + "databaseID": p.job.ID, + "externalJobID": p.job.ExternalJobID, + "name": p.job.Name.ValueOrZero(), + }, + } + + err := merge(defaultVars, vars.Vars) + if err != nil { + return nil, err + } + + finalVars := pipeline.NewVarsFrom(defaultVars) + _, trrs, err := p.runner.ExecuteRun(ctx, s, finalVars, p.logger) + if err != nil { + return nil, err + } + + taskResults := make([]types.TaskResult, len(trrs)) + for i, trr := range trrs { + taskResults[i] = types.TaskResult{ + ID: trr.ID.String(), + Type: string(trr.Task.Type()), + Value: trr.Result.Value, + Error: trr.Result.Error, + Index: int(trr.TaskRun.Index), + } + } + return taskResults, nil +} + +func NewPipelineRunnerAdapter(logger logger.Logger, job job.Job, runner pipelineRunner) *PipelineRunnerAdapter { + return &PipelineRunnerAdapter{ + logger: logger, + job: job, + runner: runner, + } +} + +// merge merges mapTwo into mapOne, modifying mapOne in the process. +func merge(mapOne, mapTwo map[string]interface{}) error { + for k, v := range mapTwo { + // if `mapOne` doesn't have `k`, then nothing to do, just assign v to `mapOne`. + if _, ok := mapOne[k]; !ok { + mapOne[k] = v + } else { + vAsMap, vOK := v.(map[string]interface{}) + mapOneVAsMap, moOK := mapOne[k].(map[string]interface{}) + if vOK && moOK { + merge(mapOneVAsMap, vAsMap) + } else { + mapOne[k] = v + } + } + } + + return nil +} diff --git a/core/services/ocr2/plugins/generic/pipeline_runner_adapter_test.go b/core/services/ocr2/plugins/generic/pipeline_runner_adapter_test.go new file mode 100644 index 00000000000..d1f06d87662 --- /dev/null +++ b/core/services/ocr2/plugins/generic/pipeline_runner_adapter_test.go @@ -0,0 +1,144 @@ +package generic + +import ( + "context" + "net/http" + "reflect" + "testing" + "time" + + "github.com/google/uuid" + "github.com/shopspring/decimal" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/guregu/null.v4" + + "github.com/smartcontractkit/chainlink-relay/pkg/types" + "github.com/smartcontractkit/chainlink/v2/core/bridges" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" + _ "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +const spec = ` +answer [type=sum values=<[ $(val), 2 ]>] +answer; +` + +func TestAdapter_Integration(t *testing.T) { + logger := logger.TestLogger(t) + cfg := configtest.NewTestGeneralConfig(t) + url := cfg.Database().URL() + db, err := pg.NewConnection(url.String(), cfg.Database().Dialect(), cfg.Database()) + require.NoError(t, err) + + keystore := keystore.NewInMemory(db, utils.FastScryptParams, logger, cfg.Database()) + pipelineORM := pipeline.NewORM(db, logger, cfg.Database(), cfg.JobPipeline().MaxSuccessfulRuns()) + bridgesORM := bridges.NewORM(db, logger, cfg.Database()) + pr := pipeline.NewRunner( + pipelineORM, + bridgesORM, + cfg.JobPipeline(), + cfg.WebServer(), + nil, + keystore.Eth(), + keystore.VRF(), + logger, + http.DefaultClient, + http.DefaultClient, + ) + pra := NewPipelineRunnerAdapter(logger, job.Job{}, pr) + results, err := pra.ExecuteRun(context.Background(), spec, types.Vars{Vars: map[string]interface{}{"val": 1}}, types.Options{}) + require.NoError(t, err) + + finalResult := results[0].Value.(decimal.Decimal) + + assert.True(t, decimal.NewFromInt(3).Equal(finalResult)) +} + +func newMockPipelineRunner() *mockPipelineRunner { + return &mockPipelineRunner{} +} + +type mockPipelineRunner struct { + results pipeline.TaskRunResults + err error + run *pipeline.Run + spec pipeline.Spec + vars pipeline.Vars +} + +func (m *mockPipelineRunner) ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (*pipeline.Run, pipeline.TaskRunResults, error) { + m.spec = spec + m.vars = vars + return m.run, m.results, m.err +} + +func TestAdapter_AddsDefaultVars(t *testing.T) { + logger := logger.TestLogger(t) + mpr := newMockPipelineRunner() + jobID, externalJobID, name := int32(100), uuid.New(), null.StringFrom("job-name") + pra := NewPipelineRunnerAdapter(logger, job.Job{ID: jobID, ExternalJobID: externalJobID, Name: name}, mpr) + + _, err := pra.ExecuteRun(context.Background(), spec, types.Vars{}, types.Options{}) + require.NoError(t, err) + + gotName, err := mpr.vars.Get("jb.name") + require.NoError(t, err) + assert.Equal(t, name.String, gotName) + + gotID, err := mpr.vars.Get("jb.databaseID") + require.NoError(t, err) + assert.Equal(t, jobID, gotID) + + gotExternalID, err := mpr.vars.Get("jb.externalJobID") + require.NoError(t, err) + assert.Equal(t, externalJobID, gotExternalID) +} + +func TestPipelineRunnerAdapter_SetsVarsOnSpec(t *testing.T) { + logger := logger.TestLogger(t) + mpr := newMockPipelineRunner() + jobID, externalJobID, name, jobType := int32(100), uuid.New(), null.StringFrom("job-name"), job.Type("generic") + pra := NewPipelineRunnerAdapter(logger, job.Job{ID: jobID, ExternalJobID: externalJobID, Name: name, Type: jobType}, mpr) + + maxDuration := time.Duration(100 * time.Second) + _, err := pra.ExecuteRun(context.Background(), spec, types.Vars{}, types.Options{MaxTaskDuration: maxDuration}) + require.NoError(t, err) + + assert.Equal(t, jobID, mpr.spec.JobID) + assert.Equal(t, name.ValueOrZero(), mpr.spec.JobName) + assert.Equal(t, string(jobType), mpr.spec.JobType) + assert.Equal(t, maxDuration, mpr.spec.MaxTaskDuration.Duration()) + +} + +func TestMerge(t *testing.T) { + vars := map[string]interface{}{ + "jb": map[string]interface{}{ + "databaseID": "some-job-id", + }, + } + addedVars := map[string]interface{}{ + "jb": map[string]interface{}{ + "some-other-var": "foo", + }, + "val": 0, + } + + err := merge(vars, addedVars) + require.NoError(t, err) + + assert.True(t, reflect.DeepEqual(vars, map[string]interface{}{ + "jb": map[string]interface{}{ + "databaseID": "some-job-id", + "some-other-var": "foo", + }, + "val": 0, + }), vars) +} diff --git a/go.mod b/go.mod index df970160acc..3679c79ed03 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,7 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47 - github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 + github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545 diff --git a/go.sum b/go.sum index 59286787c2e..e2bc6610488 100644 --- a/go.sum +++ b/go.sum @@ -1459,8 +1459,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47 h1:vdieOW3CZGdD2R5zvCSMS+0vksyExPN3/Fa1uVfld/A= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47/go.mod h1:xMwqRdj5vqYhCJXgKVqvyAwdcqM6ZAEhnwEQ4Khsop8= -github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 h1:fFD5SgSJtnXvkGLK3CExNKpUIz4sGrNNkKv3Ljw63Hk= -github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ= +github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 h1:QFMxPq7AqU4qXeW7UBv0eP/mpLt2pG2QkASUyFjKoIE= +github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ= github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ= github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU= github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 7683a966920..d3e09d94d7c 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -385,7 +385,7 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 // indirect github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47 // indirect - github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 // indirect + github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 // indirect github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 // indirect github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb // indirect github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index d280310b9f2..725e05914cf 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -2364,8 +2364,8 @@ github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc4 github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47/go.mod h1:xMwqRdj5vqYhCJXgKVqvyAwdcqM6ZAEhnwEQ4Khsop8= github.com/smartcontractkit/chainlink-env v0.38.3 h1:ZtOnwkG622R0VCTxL5V09AnT/QXhlFwkGTjd0Lsfpfg= github.com/smartcontractkit/chainlink-env v0.38.3/go.mod h1:7z4sw/hN8TxioQCLwFqQdhK3vaOV0a22Qe99z4bRUcg= -github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 h1:fFD5SgSJtnXvkGLK3CExNKpUIz4sGrNNkKv3Ljw63Hk= -github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ= +github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 h1:QFMxPq7AqU4qXeW7UBv0eP/mpLt2pG2QkASUyFjKoIE= +github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ= github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ= github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU= github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8=