Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BCF-2630] Add pipeline runner adapter server #11091

Merged
merged 1 commit into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb // indirect
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230906073235-9e478e5e19f1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1458,8 +1458,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47 h1:vdieOW3CZGdD2R5zvCSMS+0vksyExPN3/Fa1uVfld/A=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47/go.mod h1:xMwqRdj5vqYhCJXgKVqvyAwdcqM6ZAEhnwEQ4Khsop8=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 h1:fFD5SgSJtnXvkGLK3CExNKpUIz4sGrNNkKv3Ljw63Hk=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 h1:QFMxPq7AqU4qXeW7UBv0eP/mpLt2pG2QkASUyFjKoIE=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8=
Expand Down
94 changes: 94 additions & 0 deletions core/services/ocr2/plugins/generic/pipeline_runner_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package generic

import (
"context"
"time"

"github.com/smartcontractkit/chainlink-relay/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
)

var _ types.PipelineRunnerService = (*PipelineRunnerAdapter)(nil)

type pipelineRunner interface {
ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error)
}

type PipelineRunnerAdapter struct {
runner pipelineRunner
job job.Job
logger logger.Logger
}

func (p *PipelineRunnerAdapter) ExecuteRun(ctx context.Context, spec string, vars types.Vars, options types.Options) ([]types.TaskResult, error) {
s := pipeline.Spec{
DotDagSource: spec,
CreatedAt: time.Now(),
MaxTaskDuration: models.Interval(options.MaxTaskDuration),
JobID: p.job.ID,
JobName: p.job.Name.ValueOrZero(),
JobType: string(p.job.Type),
}

defaultVars := map[string]interface{}{
"jb": map[string]interface{}{
"databaseID": p.job.ID,
"externalJobID": p.job.ExternalJobID,
"name": p.job.Name.ValueOrZero(),
},
}

err := merge(defaultVars, vars.Vars)
if err != nil {
return nil, err
}

finalVars := pipeline.NewVarsFrom(defaultVars)
_, trrs, err := p.runner.ExecuteRun(ctx, s, finalVars, p.logger)
if err != nil {
return nil, err
}

taskResults := make([]types.TaskResult, len(trrs))
for i, trr := range trrs {
taskResults[i] = types.TaskResult{
ID: trr.ID.String(),
Type: string(trr.Task.Type()),
Value: trr.Result.Value,
Error: trr.Result.Error,
Index: int(trr.TaskRun.Index),
}
}
return taskResults, nil
}

func NewPipelineRunnerAdapter(logger logger.Logger, job job.Job, runner pipelineRunner) *PipelineRunnerAdapter {
return &PipelineRunnerAdapter{
logger: logger,
job: job,
runner: runner,
}
}

// merge merges mapTwo into mapOne, modifying mapOne in the process.
func merge(mapOne, mapTwo map[string]interface{}) error {
for k, v := range mapTwo {
// if `mapOne` doesn't have `k`, then nothing to do, just assign v to `mapOne`.
if _, ok := mapOne[k]; !ok {
mapOne[k] = v
} else {
vAsMap, vOK := v.(map[string]interface{})
mapOneVAsMap, moOK := mapOne[k].(map[string]interface{})
if vOK && moOK {
merge(mapOneVAsMap, vAsMap)
} else {
mapOne[k] = v
}
}
}

return nil
}
144 changes: 144 additions & 0 deletions core/services/ocr2/plugins/generic/pipeline_runner_adapter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package generic

import (
"context"
"net/http"
"reflect"
"testing"
"time"

"github.com/google/uuid"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink-relay/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
_ "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

const spec = `
answer [type=sum values=<[ $(val), 2 ]>]
answer;
`

func TestAdapter_Integration(t *testing.T) {
logger := logger.TestLogger(t)
cfg := configtest.NewTestGeneralConfig(t)
url := cfg.Database().URL()
db, err := pg.NewConnection(url.String(), cfg.Database().Dialect(), cfg.Database())
require.NoError(t, err)

keystore := keystore.NewInMemory(db, utils.FastScryptParams, logger, cfg.Database())
pipelineORM := pipeline.NewORM(db, logger, cfg.Database(), cfg.JobPipeline().MaxSuccessfulRuns())
bridgesORM := bridges.NewORM(db, logger, cfg.Database())
pr := pipeline.NewRunner(
pipelineORM,
bridgesORM,
cfg.JobPipeline(),
cfg.WebServer(),
nil,
keystore.Eth(),
keystore.VRF(),
logger,
http.DefaultClient,
http.DefaultClient,
)
pra := NewPipelineRunnerAdapter(logger, job.Job{}, pr)
results, err := pra.ExecuteRun(context.Background(), spec, types.Vars{Vars: map[string]interface{}{"val": 1}}, types.Options{})
require.NoError(t, err)

finalResult := results[0].Value.(decimal.Decimal)

assert.True(t, decimal.NewFromInt(3).Equal(finalResult))
}

func newMockPipelineRunner() *mockPipelineRunner {
return &mockPipelineRunner{}
}

type mockPipelineRunner struct {
results pipeline.TaskRunResults
err error
run *pipeline.Run
spec pipeline.Spec
vars pipeline.Vars
}

func (m *mockPipelineRunner) ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (*pipeline.Run, pipeline.TaskRunResults, error) {
m.spec = spec
m.vars = vars
return m.run, m.results, m.err
}

func TestAdapter_AddsDefaultVars(t *testing.T) {
logger := logger.TestLogger(t)
mpr := newMockPipelineRunner()
jobID, externalJobID, name := int32(100), uuid.New(), null.StringFrom("job-name")
pra := NewPipelineRunnerAdapter(logger, job.Job{ID: jobID, ExternalJobID: externalJobID, Name: name}, mpr)

_, err := pra.ExecuteRun(context.Background(), spec, types.Vars{}, types.Options{})
require.NoError(t, err)

gotName, err := mpr.vars.Get("jb.name")
require.NoError(t, err)
assert.Equal(t, name.String, gotName)

gotID, err := mpr.vars.Get("jb.databaseID")
require.NoError(t, err)
assert.Equal(t, jobID, gotID)

gotExternalID, err := mpr.vars.Get("jb.externalJobID")
require.NoError(t, err)
assert.Equal(t, externalJobID, gotExternalID)
}

func TestPipelineRunnerAdapter_SetsVarsOnSpec(t *testing.T) {
logger := logger.TestLogger(t)
mpr := newMockPipelineRunner()
jobID, externalJobID, name, jobType := int32(100), uuid.New(), null.StringFrom("job-name"), job.Type("generic")
pra := NewPipelineRunnerAdapter(logger, job.Job{ID: jobID, ExternalJobID: externalJobID, Name: name, Type: jobType}, mpr)

maxDuration := time.Duration(100 * time.Second)
_, err := pra.ExecuteRun(context.Background(), spec, types.Vars{}, types.Options{MaxTaskDuration: maxDuration})
require.NoError(t, err)

assert.Equal(t, jobID, mpr.spec.JobID)
assert.Equal(t, name.ValueOrZero(), mpr.spec.JobName)
assert.Equal(t, string(jobType), mpr.spec.JobType)
assert.Equal(t, maxDuration, mpr.spec.MaxTaskDuration.Duration())

}

func TestMerge(t *testing.T) {
vars := map[string]interface{}{
"jb": map[string]interface{}{
"databaseID": "some-job-id",
},
}
addedVars := map[string]interface{}{
"jb": map[string]interface{}{
"some-other-var": "foo",
},
"val": 0,
}

err := merge(vars, addedVars)
require.NoError(t, err)

assert.True(t, reflect.DeepEqual(vars, map[string]interface{}{
"jb": map[string]interface{}{
"databaseID": "some-job-id",
"some-other-var": "foo",
},
"val": 0,
}), vars)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ require (
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb
github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1459,8 +1459,8 @@ github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 h1:T3lFWumv
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704/go.mod h1:2QuJdEouTWjh5BDy5o/vgGXQtR4Gz8yH1IYB5eT7u4M=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47 h1:vdieOW3CZGdD2R5zvCSMS+0vksyExPN3/Fa1uVfld/A=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47/go.mod h1:xMwqRdj5vqYhCJXgKVqvyAwdcqM6ZAEhnwEQ4Khsop8=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 h1:fFD5SgSJtnXvkGLK3CExNKpUIz4sGrNNkKv3Ljw63Hk=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 h1:QFMxPq7AqU4qXeW7UBv0eP/mpLt2pG2QkASUyFjKoIE=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8=
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb // indirect
github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb // indirect
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2364,8 +2364,8 @@ github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc4
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230913032705-f924d753cc47/go.mod h1:xMwqRdj5vqYhCJXgKVqvyAwdcqM6ZAEhnwEQ4Khsop8=
github.com/smartcontractkit/chainlink-env v0.38.3 h1:ZtOnwkG622R0VCTxL5V09AnT/QXhlFwkGTjd0Lsfpfg=
github.com/smartcontractkit/chainlink-env v0.38.3/go.mod h1:7z4sw/hN8TxioQCLwFqQdhK3vaOV0a22Qe99z4bRUcg=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9 h1:fFD5SgSJtnXvkGLK3CExNKpUIz4sGrNNkKv3Ljw63Hk=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231020230319-2ede955d1dc9/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04 h1:QFMxPq7AqU4qXeW7UBv0eP/mpLt2pG2QkASUyFjKoIE=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20231027131428-7dc07d302a04/go.mod h1:M9U1JV7IQi8Sfj4JR1qSi1tIh6omgW78W/8SHN/8BUQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05 h1:DaPSVnxe7oz1QJ+AVIhQWs1W3ubQvwvGo9NbHpMs1OQ=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20231023133638-72f4e799ab05/go.mod h1:o0Pn1pbaUluboaK6/yhf8xf7TiFCkyFl6WUOdwqamuU=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20231024133459-1ef3a11319eb h1:HiluOfEVGOQTM6BTDImOqYdMZZ7qq7fkZ3TJdmItNr8=
Expand Down
Loading