diff --git a/.changeset/many-pillows-reflect.md b/.changeset/many-pillows-reflect.md new file mode 100644 index 00000000000..6de57ecc2a4 --- /dev/null +++ b/.changeset/many-pillows-reflect.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +core/services/keeper: switch to sqlutil.DataSource #internal diff --git a/core/internal/cltest/factories.go b/core/internal/cltest/factories.go index b1c705ea027..d7e1036bcac 100644 --- a/core/internal/cltest/factories.go +++ b/core/internal/cltest/factories.go @@ -44,7 +44,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/keeper" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" - "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/store/models" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -379,15 +378,16 @@ func MakeDirectRequestJobSpec(t *testing.T) *job.Job { return spec } -func MustInsertKeeperJob(t *testing.T, db *sqlx.DB, korm keeper.ORM, from evmtypes.EIP55Address, contract evmtypes.EIP55Address) job.Job { +func MustInsertKeeperJob(t *testing.T, db *sqlx.DB, korm *keeper.ORM, from evmtypes.EIP55Address, contract evmtypes.EIP55Address) job.Job { t.Helper() + ctx := testutils.Context(t) var keeperSpec job.KeeperSpec - err := korm.Q().Get(&keeperSpec, `INSERT INTO keeper_specs (contract_address, from_address, created_at, updated_at,evm_chain_id) VALUES ($1, $2, NOW(), NOW(), $3) RETURNING *`, contract, from, testutils.SimulatedChainID.Int64()) + err := korm.DataSource().GetContext(ctx, &keeperSpec, `INSERT INTO keeper_specs (contract_address, from_address, created_at, updated_at,evm_chain_id) VALUES ($1, $2, NOW(), NOW(), $3) RETURNING *`, contract, from, testutils.SimulatedChainID.Int64()) require.NoError(t, err) var pipelineSpec pipeline.Spec - err = korm.Q().Get(&pipelineSpec, `INSERT INTO pipeline_specs (dot_dag_source,created_at) VALUES ('',NOW()) RETURNING *`) + err = korm.DataSource().GetContext(ctx, &pipelineSpec, `INSERT INTO pipeline_specs (dot_dag_source,created_at) VALUES ('',NOW()) RETURNING *`) require.NoError(t, err) jb := job.Job{ @@ -411,10 +411,11 @@ func MustInsertKeeperJob(t *testing.T, db *sqlx.DB, korm keeper.ORM, from evmtyp return jb } -func MustInsertKeeperRegistry(t *testing.T, db *sqlx.DB, korm keeper.ORM, ethKeyStore keystore.Eth, keeperIndex, numKeepers, blockCountPerTurn int32) (keeper.Registry, job.Job) { +func MustInsertKeeperRegistry(t *testing.T, db *sqlx.DB, korm *keeper.ORM, ethKeyStore keystore.Eth, keeperIndex, numKeepers, blockCountPerTurn int32) (keeper.Registry, job.Job) { + t.Helper() + ctx := testutils.Context(t) key, _ := MustInsertRandomKey(t, ethKeyStore, *ubig.New(testutils.SimulatedChainID)) from := key.EIP55Address - t.Helper() contractAddress := NewEIP55Address() jb := MustInsertKeeperJob(t, db, korm, from, contractAddress) registry := keeper.Registry{ @@ -429,13 +430,14 @@ func MustInsertKeeperRegistry(t *testing.T, db *sqlx.DB, korm keeper.ORM, ethKey from: keeperIndex, }, } - err := korm.UpsertRegistry(®istry) + err := korm.UpsertRegistry(ctx, ®istry) require.NoError(t, err) return registry, jb } -func MustInsertUpkeepForRegistry(t *testing.T, db *sqlx.DB, cfg pg.QConfig, registry keeper.Registry) keeper.UpkeepRegistration { - korm := keeper.NewORM(db, logger.TestLogger(t), cfg) +func MustInsertUpkeepForRegistry(t *testing.T, db *sqlx.DB, registry 
keeper.Registry) keeper.UpkeepRegistration { + ctx := testutils.Context(t) + korm := keeper.NewORM(db, logger.TestLogger(t)) upkeepID := ubig.NewI(int64(mathrand.Uint32())) upkeep := keeper.UpkeepRegistration{ UpkeepID: upkeepID, @@ -447,7 +449,7 @@ func MustInsertUpkeepForRegistry(t *testing.T, db *sqlx.DB, cfg pg.QConfig, regi positioningConstant, err := keeper.CalcPositioningConstant(upkeepID, registry.ContractAddress) require.NoError(t, err) upkeep.PositioningConstant = positioningConstant - err = korm.UpsertUpkeep(&upkeep) + err = korm.UpsertUpkeep(ctx, &upkeep) require.NoError(t, err) return upkeep } diff --git a/core/services/job/job_orm_test.go b/core/services/job/job_orm_test.go index b4cdc8d012a..a6e3622df1b 100644 --- a/core/services/job/job_orm_test.go +++ b/core/services/job/job_orm_test.go @@ -349,8 +349,7 @@ func TestORM_DeleteJob_DeletesAssociatedRecords(t *testing.T) { pipelineORM := pipeline.NewORM(db, lggr, config.Database(), config.JobPipeline().MaxSuccessfulRuns()) bridgesORM := bridges.NewORM(db) jobORM := NewTestORM(t, db, pipelineORM, bridgesORM, keyStore, config.Database()) - scopedConfig := evmtest.NewChainScopedConfig(t, config) - korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database()) + korm := keeper.NewORM(db, logger.TestLogger(t)) t.Run("it deletes records for offchainreporting jobs", func(t *testing.T) { _, bridge := cltest.MustCreateBridge(t, db, cltest.BridgeOpts{}) @@ -381,8 +380,7 @@ func TestORM_DeleteJob_DeletesAssociatedRecords(t *testing.T) { t.Run("it deletes records for keeper jobs", func(t *testing.T) { registry, keeperJob := cltest.MustInsertKeeperRegistry(t, db, korm, keyStore.Eth(), 0, 1, 20) - scoped := evmtest.NewChainScopedConfig(t, config) - cltest.MustInsertUpkeepForRegistry(t, db, scoped.Database(), registry) + cltest.MustInsertUpkeepForRegistry(t, db, registry) cltest.AssertCount(t, db, "keeper_specs", 1) cltest.AssertCount(t, db, "keeper_registries", 1) diff --git a/core/services/keeper/delegate.go b/core/services/keeper/delegate.go index 8cadb8cd77f..679ccf3053d 100644 --- a/core/services/keeper/delegate.go +++ b/core/services/keeper/delegate.go @@ -66,7 +66,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) (services return nil, err } registryAddress := spec.KeeperSpec.ContractAddress - orm := NewORM(d.db, d.logger, chain.Config().Database()) + orm := NewORM(d.db, d.logger) svcLogger := d.logger.With( "jobID", spec.ID, "registryAddress", registryAddress.Hex(), diff --git a/core/services/keeper/helpers_test.go b/core/services/keeper/helpers_test.go index fdcb12b01b1..3fb9d7760a4 100644 --- a/core/services/keeper/helpers_test.go +++ b/core/services/keeper/helpers_test.go @@ -1,14 +1,15 @@ package keeper import ( + "context" "math/big" "github.com/ethereum/go-ethereum/core/types" "github.com/pkg/errors" ) -func (rs *RegistrySynchronizer) ExportedFullSync() { - rs.fullSync() +func (rs *RegistrySynchronizer) ExportedFullSync(ctx context.Context) { + rs.fullSync(ctx) } func (rw *RegistryWrapper) GetUpkeepIdFromRawRegistrationLog(rawLog types.Log) (*big.Int, error) { diff --git a/core/services/keeper/integration_test.go b/core/services/keeper/integration_test.go index d78b1fb2ca5..49073c8de56 100644 --- a/core/services/keeper/integration_test.go +++ b/core/services/keeper/integration_test.go @@ -32,7 +32,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest/heavyweight" 
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/job" @@ -247,8 +246,7 @@ func TestKeeperEthIntegration(t *testing.T) { c.EVM[0].MinIncomingConfirmations = ptr[uint32](1) // disable reorg protection for this test c.EVM[0].HeadTracker.MaxBufferSize = ptr[uint32](100) // helps prevent missed heads }) - scopedConfig := evmtest.NewChainScopedConfig(t, config) - korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database()) + korm := keeper.NewORM(db, logger.TestLogger(t)) app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, backend.Backend(), nodeKey) require.NoError(t, app.Start(testutils.Context(t))) @@ -328,6 +326,7 @@ func TestKeeperEthIntegration(t *testing.T) { func TestKeeperForwarderEthIntegration(t *testing.T) { t.Parallel() t.Run("keeper_forwarder_flow", func(t *testing.T) { + ctx := testutils.Context(t) g := gomega.NewWithT(t) // setup node key @@ -407,15 +406,14 @@ func TestKeeperForwarderEthIntegration(t *testing.T) { c.EVM[0].Transactions.ForwardersEnabled = ptr(true) // Enable Operator Forwarder flow c.EVM[0].ChainID = (*ubig.Big)(testutils.SimulatedChainID) }) - scopedConfig := evmtest.NewChainScopedConfig(t, config) - korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database()) + korm := keeper.NewORM(db, logger.TestLogger(t)) app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, backend.Backend(), nodeKey) - require.NoError(t, app.Start(testutils.Context(t))) + require.NoError(t, app.Start(ctx)) forwarderORM := forwarders.NewORM(db) chainID := ubig.Big(*backend.ConfiguredChainID()) - _, err = forwarderORM.CreateForwarder(testutils.Context(t), fwdrAddress, chainID) + _, err = forwarderORM.CreateForwarder(ctx, fwdrAddress, chainID) require.NoError(t, err) addr, err := app.GetRelayers().LegacyEVMChains().Slice()[0].TxManager().GetForwarderForEOA(nodeAddress) @@ -452,7 +450,7 @@ func TestKeeperForwarderEthIntegration(t *testing.T) { evmtypes.EIP55AddressFromAddress(nelly.From): 1, }, } - err = korm.UpsertRegistry(®istry) + err = korm.UpsertRegistry(ctx, ®istry) require.NoError(t, err) callOpts := bind.CallOpts{From: nodeAddress} @@ -464,7 +462,7 @@ func TestKeeperForwarderEthIntegration(t *testing.T) { } require.Equal(t, lastKeeper(), common.Address{}) - err = app.JobSpawner().StartService(testutils.Context(t), jb) + err = app.JobSpawner().StartService(ctx, jb) require.NoError(t, err) // keeper job is triggered and payload is received @@ -551,8 +549,7 @@ func TestMaxPerformDataSize(t *testing.T) { c.EVM[0].MinIncomingConfirmations = ptr[uint32](1) // disable reorg protection for this test c.EVM[0].HeadTracker.MaxBufferSize = ptr[uint32](100) // helps prevent missed heads }) - scopedConfig := evmtest.NewChainScopedConfig(t, config) - korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database()) + korm := keeper.NewORM(db, logger.TestLogger(t)) app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, backend.Backend(), nodeKey) require.NoError(t, app.Start(testutils.Context(t))) diff --git a/core/services/keeper/orm.go b/core/services/keeper/orm.go index 55dd6c52e68..ed3196bb660 100644 --- a/core/services/keeper/orm.go +++ b/core/services/keeper/orm.go @@ -1,60 +1,60 @@ package keeper import ( + "context" 
"math/rand" - "github.com/jmoiron/sqlx" "github.com/lib/pq" "github.com/pkg/errors" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) // ORM implements ORM layer using PostgreSQL type ORM struct { - q pg.Q + ds sqlutil.DataSource logger logger.Logger } // NewORM is the constructor of postgresORM -func NewORM(db *sqlx.DB, lggr logger.Logger, config pg.QConfig) ORM { +func NewORM(ds sqlutil.DataSource, lggr logger.Logger) *ORM { lggr = lggr.Named("KeeperORM") - return ORM{ - q: pg.NewQ(db, lggr, config), + return &ORM{ + ds: ds, logger: lggr, } } -func (korm ORM) Q() pg.Q { - return korm.q +func (o *ORM) DataSource() sqlutil.DataSource { + return o.ds } // Registries returns all registries -func (korm ORM) Registries() ([]Registry, error) { +func (o *ORM) Registries(ctx context.Context) ([]Registry, error) { var registries []Registry - err := korm.q.Select(®istries, `SELECT * FROM keeper_registries ORDER BY id ASC`) + err := o.ds.SelectContext(ctx, ®istries, `SELECT * FROM keeper_registries ORDER BY id ASC`) return registries, errors.Wrap(err, "failed to get registries") } // RegistryByContractAddress returns a single registry based on provided address -func (korm ORM) RegistryByContractAddress(registryAddress types.EIP55Address) (Registry, error) { +func (o *ORM) RegistryByContractAddress(ctx context.Context, registryAddress types.EIP55Address) (Registry, error) { var registry Registry - err := korm.q.Get(®istry, `SELECT * FROM keeper_registries WHERE keeper_registries.contract_address = $1`, registryAddress) + err := o.ds.GetContext(ctx, ®istry, `SELECT * FROM keeper_registries WHERE keeper_registries.contract_address = $1`, registryAddress) return registry, errors.Wrap(err, "failed to get registry") } // RegistryForJob returns a specific registry for a job with the given ID -func (korm ORM) RegistryForJob(jobID int32) (Registry, error) { +func (o *ORM) RegistryForJob(ctx context.Context, jobID int32) (Registry, error) { var registry Registry - err := korm.q.Get(®istry, `SELECT * FROM keeper_registries WHERE job_id = $1 LIMIT 1`, jobID) + err := o.ds.GetContext(ctx, ®istry, `SELECT * FROM keeper_registries WHERE job_id = $1 LIMIT 1`, jobID) return registry, errors.Wrapf(err, "failed to get registry with job_id %d", jobID) } // UpsertRegistry upserts registry by the given input -func (korm ORM) UpsertRegistry(registry *Registry) error { +func (o *ORM) UpsertRegistry(ctx context.Context, registry *Registry) error { stmt := ` INSERT INTO keeper_registries (job_id, keeper_index, contract_address, from_address, check_gas, block_count_per_turn, num_keepers, keeper_index_map) VALUES ( :job_id, :keeper_index, :contract_address, :from_address, :check_gas, :block_count_per_turn, :num_keepers, :keeper_index_map @@ -66,12 +66,16 @@ INSERT INTO keeper_registries (job_id, keeper_index, contract_address, from_addr keeper_index_map = :keeper_index_map RETURNING * ` - err := korm.q.GetNamed(stmt, registry, registry) + query, args, err := o.ds.BindNamed(stmt, registry) + if err != nil { + return errors.Wrap(err, "failed to upsert registry") + } + err = o.ds.GetContext(ctx, registry, query, args...) 
return errors.Wrap(err, "failed to upsert registry") } // UpsertUpkeep upserts upkeep by the given input -func (korm ORM) UpsertUpkeep(registration *UpkeepRegistration) error { +func (o *ORM) UpsertUpkeep(ctx context.Context, registration *UpkeepRegistration) error { stmt := ` INSERT INTO upkeep_registrations (registry_id, execute_gas, check_data, upkeep_id, positioning_constant, last_run_block_height) VALUES ( :registry_id, :execute_gas, :check_data, :upkeep_id, :positioning_constant, :last_run_block_height @@ -81,13 +85,17 @@ INSERT INTO upkeep_registrations (registry_id, execute_gas, check_data, upkeep_i positioning_constant = :positioning_constant RETURNING * ` - err := korm.q.GetNamed(stmt, registration, registration) + query, args, err := o.ds.BindNamed(stmt, registration) + if err != nil { + return errors.Wrap(err, "failed to upsert upkeep") + } + err = o.ds.GetContext(ctx, registration, query, args...) return errors.Wrap(err, "failed to upsert upkeep") } // UpdateUpkeepLastKeeperIndex updates the last keeper index for an upkeep -func (korm ORM) UpdateUpkeepLastKeeperIndex(jobID int32, upkeepID *big.Big, fromAddress types.EIP55Address) error { - _, err := korm.q.Exec(` +func (o *ORM) UpdateUpkeepLastKeeperIndex(ctx context.Context, jobID int32, upkeepID *big.Big, fromAddress types.EIP55Address) error { + _, err := o.ds.ExecContext(ctx, ` UPDATE upkeep_registrations SET last_keeper_index = CAST((SELECT keeper_index_map -> $3 FROM keeper_registries WHERE job_id = $1) AS int) @@ -98,12 +106,12 @@ func (korm ORM) UpdateUpkeepLastKeeperIndex(jobID int32, upkeepID *big.Big, from } // BatchDeleteUpkeepsForJob deletes all upkeeps by the given IDs for the job with the given ID -func (korm ORM) BatchDeleteUpkeepsForJob(jobID int32, upkeepIDs []big.Big) (int64, error) { +func (o *ORM) BatchDeleteUpkeepsForJob(ctx context.Context, jobID int32, upkeepIDs []big.Big) (int64, error) { strIds := []string{} for _, upkeepID := range upkeepIDs { strIds = append(strIds, upkeepID.String()) } - res, err := korm.q.Exec(` + res, err := o.ds.ExecContext(ctx, ` DELETE FROM upkeep_registrations WHERE registry_id IN ( SELECT id FROM keeper_registries WHERE job_id = $1 ) AND upkeep_id = ANY($2) @@ -125,7 +133,7 @@ DELETE FROM upkeep_registrations WHERE registry_id IN ( // -- OR is it my buddy's turn AND they were the last keeper to do the perform for this upkeep // DEV: note we cast upkeep_id and binaryHash as 32 bits, even though both are 256 bit numbers when performing XOR. 
This is enough information // to distribute the upkeeps over the keepers so long as num keepers < 4294967296 -func (korm ORM) EligibleUpkeepsForRegistry(registryAddress types.EIP55Address, blockNumber int64, gracePeriod int64, binaryHash string) (upkeeps []UpkeepRegistration, err error) { +func (o *ORM) EligibleUpkeepsForRegistry(ctx context.Context, registryAddress types.EIP55Address, blockNumber int64, gracePeriod int64, binaryHash string) (upkeeps []UpkeepRegistration, err error) { stmt := ` SELECT upkeep_registrations.* FROM upkeep_registrations @@ -165,10 +173,10 @@ WHERE keeper_registries.contract_address = $1 ) ) ` - if err = korm.q.Select(&upkeeps, stmt, registryAddress, gracePeriod, blockNumber, binaryHash); err != nil { + if err = o.ds.SelectContext(ctx, &upkeeps, stmt, registryAddress, gracePeriod, blockNumber, binaryHash); err != nil { return upkeeps, errors.Wrap(err, "EligibleUpkeepsForRegistry failed to get upkeep_registrations") } - if err = loadUpkeepsRegistry(korm.q, upkeeps); err != nil { + if err = o.loadUpkeepsRegistry(ctx, upkeeps); err != nil { return upkeeps, errors.Wrap(err, "EligibleUpkeepsForRegistry failed to load Registry on upkeeps") } @@ -179,7 +187,7 @@ WHERE keeper_registries.contract_address = $1 return upkeeps, err } -func loadUpkeepsRegistry(q pg.Queryer, upkeeps []UpkeepRegistration) error { +func (o *ORM) loadUpkeepsRegistry(ctx context.Context, upkeeps []UpkeepRegistration) error { registryIDM := make(map[int64]*Registry) var registryIDs []int64 for _, upkeep := range upkeeps { @@ -189,7 +197,7 @@ func loadUpkeepsRegistry(q pg.Queryer, upkeeps []UpkeepRegistration) error { } } var registries []*Registry - err := q.Select(®istries, `SELECT * FROM keeper_registries WHERE id = ANY($1)`, pq.Array(registryIDs)) + err := o.ds.SelectContext(ctx, ®istries, `SELECT * FROM keeper_registries WHERE id = ANY($1)`, pq.Array(registryIDs)) if err != nil { return errors.Wrap(err, "loadUpkeepsRegistry failed") } @@ -202,8 +210,8 @@ func loadUpkeepsRegistry(q pg.Queryer, upkeeps []UpkeepRegistration) error { return nil } -func (korm ORM) AllUpkeepIDsForRegistry(regID int64) (upkeeps []big.Big, err error) { - err = korm.q.Select(&upkeeps, ` +func (o *ORM) AllUpkeepIDsForRegistry(ctx context.Context, regID int64) (upkeeps []big.Big, err error) { + err = o.ds.SelectContext(ctx, &upkeeps, ` SELECT upkeep_id FROM upkeep_registrations WHERE registry_id = $1 @@ -212,8 +220,8 @@ WHERE registry_id = $1 } // SetLastRunInfoForUpkeepOnJob sets the last run block height and the associated keeper index only if the new block height is greater than the previous. 
-func (korm ORM) SetLastRunInfoForUpkeepOnJob(jobID int32, upkeepID *big.Big, height int64, fromAddress types.EIP55Address, qopts ...pg.QOpt) (int64, error) { - res, err := korm.q.WithOpts(qopts...).Exec(` +func (o *ORM) SetLastRunInfoForUpkeepOnJob(ctx context.Context, jobID int32, upkeepID *big.Big, height int64, fromAddress types.EIP55Address) (int64, error) { + res, err := o.ds.ExecContext(ctx, ` UPDATE upkeep_registrations SET last_run_block_height = $1, last_keeper_index = CAST((SELECT keeper_index_map -> $4 FROM keeper_registries WHERE job_id = $3) AS int) diff --git a/core/services/keeper/orm_test.go b/core/services/keeper/orm_test.go index ed58554ef0d..1e5d927fe2f 100644 --- a/core/services/keeper/orm_test.go +++ b/core/services/keeper/orm_test.go @@ -19,6 +19,7 @@ import ( evmutils "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" @@ -36,12 +37,12 @@ var ( func setupKeeperDB(t *testing.T) ( *sqlx.DB, evmconfig.ChainScopedConfig, - keeper.ORM, + *keeper.ORM, ) { gcfg := configtest.NewGeneralConfig(t, nil) db := pgtest.NewSqlxDB(t) cfg := evmtest.NewChainScopedConfig(t, gcfg) - orm := keeper.NewORM(db, logger.TestLogger(t), cfg.Database()) + orm := keeper.NewORM(db, logger.TestLogger(t)) return db, cfg, orm } @@ -74,32 +75,35 @@ func assertLastRunHeight(t *testing.T, db *sqlx.DB, upkeep keeper.UpkeepRegistra func TestKeeperDB_Registries(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) db, config, orm := setupKeeperDB(t) ethKeyStore := cltest.NewKeyStore(t, db, config.Database()).Eth() cltest.MustInsertKeeperRegistry(t, db, orm, ethKeyStore, 0, 1, 20) cltest.MustInsertKeeperRegistry(t, db, orm, ethKeyStore, 0, 1, 20) - existingRegistries, err := orm.Registries() + existingRegistries, err := orm.Registries(ctx) require.NoError(t, err) require.Equal(t, 2, len(existingRegistries)) } func TestKeeperDB_RegistryByContractAddress(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) db, config, orm := setupKeeperDB(t) ethKeyStore := cltest.NewKeyStore(t, db, config.Database()).Eth() registry, _ := cltest.MustInsertKeeperRegistry(t, db, orm, ethKeyStore, 0, 1, 20) cltest.MustInsertKeeperRegistry(t, db, orm, ethKeyStore, 0, 1, 20) - registryByContractAddress, err := orm.RegistryByContractAddress(registry.ContractAddress) + registryByContractAddress, err := orm.RegistryByContractAddress(ctx, registry.ContractAddress) require.NoError(t, err) require.Equal(t, registry, registryByContractAddress) } func TestKeeperDB_UpsertUpkeep(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) db, config, orm := setupKeeperDB(t) ethKeyStore := cltest.NewKeyStore(t, db, config.Database()).Eth() @@ -113,7 +117,7 @@ func TestKeeperDB_UpsertUpkeep(t *testing.T) { LastRunBlockHeight: 1, PositioningConstant: 1, } - require.NoError(t, orm.UpsertUpkeep(&upkeep)) + require.NoError(t, orm.UpsertUpkeep(ctx, &upkeep)) cltest.AssertCount(t, db, "upkeep_registrations", 1) // update upkeep @@ -121,7 +125,7 @@ func TestKeeperDB_UpsertUpkeep(t *testing.T) { upkeep.CheckData = common.Hex2Bytes("8888") upkeep.LastRunBlockHeight = 2 - err := orm.UpsertUpkeep(&upkeep) 
+ err := orm.UpsertUpkeep(ctx, &upkeep) require.NoError(t, err) cltest.AssertCount(t, db, "upkeep_registrations", 1) @@ -135,21 +139,22 @@ func TestKeeperDB_UpsertUpkeep(t *testing.T) { func TestKeeperDB_BatchDeleteUpkeepsForJob(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) db, config, orm := setupKeeperDB(t) ethKeyStore := cltest.NewKeyStore(t, db, config.Database()).Eth() registry, job := cltest.MustInsertKeeperRegistry(t, db, orm, ethKeyStore, 0, 1, 20) - expectedUpkeepID := cltest.MustInsertUpkeepForRegistry(t, db, config.Database(), registry).UpkeepID + expectedUpkeepID := cltest.MustInsertUpkeepForRegistry(t, db, registry).UpkeepID var upkeepIDs []ubig.Big for i := 0; i < 2; i++ { - upkeep := cltest.MustInsertUpkeepForRegistry(t, db, config.Database(), registry) + upkeep := cltest.MustInsertUpkeepForRegistry(t, db, registry) upkeepIDs = append(upkeepIDs, *upkeep.UpkeepID) } cltest.AssertCount(t, db, "upkeep_registrations", 3) - _, err := orm.BatchDeleteUpkeepsForJob(job.ID, upkeepIDs) + _, err := orm.BatchDeleteUpkeepsForJob(ctx, job.ID, upkeepIDs) require.NoError(t, err) cltest.AssertCount(t, db, "upkeep_registrations", 1) @@ -161,6 +166,7 @@ func TestKeeperDB_BatchDeleteUpkeepsForJob(t *testing.T) { func TestKeeperDB_EligibleUpkeeps_Shuffle(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) db, config, orm := setupKeeperDB(t) ethKeyStore := cltest.NewKeyStore(t, db, config.Database()).Eth() @@ -173,12 +179,12 @@ func TestKeeperDB_EligibleUpkeeps_Shuffle(t *testing.T) { for i := 0; i < 100; i++ { k := newUpkeep(registry, int64(i)) ordered[i] = int64(i) - err := orm.UpsertUpkeep(&k) + err := orm.UpsertUpkeep(ctx, &k) require.NoError(t, err) } cltest.AssertCount(t, db, "upkeep_registrations", 100) - eligibleUpkeeps, err := orm.EligibleUpkeepsForRegistry(registry.ContractAddress, blockheight, gracePeriod, fmt.Sprintf("%b", evmutils.NewHash().Big())) + eligibleUpkeeps, err := orm.EligibleUpkeepsForRegistry(ctx, registry.ContractAddress, blockheight, gracePeriod, fmt.Sprintf("%b", evmutils.NewHash().Big())) assert.NoError(t, err) require.Len(t, eligibleUpkeeps, 100) @@ -191,13 +197,14 @@ func TestKeeperDB_EligibleUpkeeps_Shuffle(t *testing.T) { func TestKeeperDB_NewEligibleUpkeeps_GracePeriod(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) db, config, orm := setupKeeperDB(t) ethKeyStore := cltest.NewKeyStore(t, db, config.Database()).Eth() registry, _ := cltest.MustInsertKeeperRegistry(t, db, orm, ethKeyStore, 0, 2, 20) for i := 0; i < 100; i++ { - cltest.MustInsertUpkeepForRegistry(t, db, config.Database(), registry) + cltest.MustInsertUpkeepForRegistry(t, db, registry) } cltest.AssertCount(t, db, "keeper_registries", 1) @@ -206,38 +213,39 @@ func TestKeeperDB_NewEligibleUpkeeps_GracePeriod(t *testing.T) { // if current keeper index = 0 and all upkeeps last perform was done by index = 0 and still within grace period upkeep := keeper.UpkeepRegistration{} require.NoError(t, db.Get(&upkeep, `UPDATE upkeep_registrations SET last_keeper_index = 0, last_run_block_height = 10 RETURNING *`)) - list0, err := orm.EligibleUpkeepsForRegistry(registry.ContractAddress, 21, 100, fmt.Sprintf("%b", evmutils.NewHash().Big())) // none eligible + list0, err := orm.EligibleUpkeepsForRegistry(ctx, registry.ContractAddress, 21, 100, fmt.Sprintf("%b", evmutils.NewHash().Big())) // none eligible require.NoError(t, err) require.Equal(t, 0, len(list0), "should be 0 as all last perform was done by current node") // once passed grace period - list1, err := 
orm.EligibleUpkeepsForRegistry(registry.ContractAddress, 121, 100, fmt.Sprintf("%b", evmutils.NewHash().Big())) // none eligible + list1, err := orm.EligibleUpkeepsForRegistry(ctx, registry.ContractAddress, 121, 100, fmt.Sprintf("%b", evmutils.NewHash().Big())) // none eligible require.NoError(t, err) require.NotEqual(t, 0, len(list1), "should get some eligible upkeeps now that they are outside grace period") } func TestKeeperDB_EligibleUpkeeps_TurnsRandom(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) db, config, orm := setupKeeperDB(t) ethKeyStore := cltest.NewKeyStore(t, db, config.Database()).Eth() registry, _ := cltest.MustInsertKeeperRegistry(t, db, orm, ethKeyStore, 0, 3, 10) for i := 0; i < 1000; i++ { - cltest.MustInsertUpkeepForRegistry(t, db, config.Database(), registry) + cltest.MustInsertUpkeepForRegistry(t, db, registry) } cltest.AssertCount(t, db, "keeper_registries", 1) cltest.AssertCount(t, db, "upkeep_registrations", 1000) // 3 keepers 10 block turns should be different every turn - list1, err := orm.EligibleUpkeepsForRegistry(registry.ContractAddress, 20, 100, fmt.Sprintf("%b", evmutils.NewHash().Big())) + list1, err := orm.EligibleUpkeepsForRegistry(ctx, registry.ContractAddress, 20, 100, fmt.Sprintf("%b", evmutils.NewHash().Big())) require.NoError(t, err) - list2, err := orm.EligibleUpkeepsForRegistry(registry.ContractAddress, 31, 100, fmt.Sprintf("%b", evmutils.NewHash().Big())) + list2, err := orm.EligibleUpkeepsForRegistry(ctx, registry.ContractAddress, 31, 100, fmt.Sprintf("%b", evmutils.NewHash().Big())) require.NoError(t, err) - list3, err := orm.EligibleUpkeepsForRegistry(registry.ContractAddress, 42, 100, fmt.Sprintf("%b", evmutils.NewHash().Big())) + list3, err := orm.EligibleUpkeepsForRegistry(ctx, registry.ContractAddress, 42, 100, fmt.Sprintf("%b", evmutils.NewHash().Big())) require.NoError(t, err) - list4, err := orm.EligibleUpkeepsForRegistry(registry.ContractAddress, 53, 100, fmt.Sprintf("%b", evmutils.NewHash().Big())) + list4, err := orm.EligibleUpkeepsForRegistry(ctx, registry.ContractAddress, 53, 100, fmt.Sprintf("%b", evmutils.NewHash().Big())) require.NoError(t, err) // sort before compare @@ -261,13 +269,14 @@ func TestKeeperDB_EligibleUpkeeps_TurnsRandom(t *testing.T) { func TestKeeperDB_NewEligibleUpkeeps_SkipIfLastPerformedByCurrentKeeper(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) db, config, orm := setupKeeperDB(t) ethKeyStore := cltest.NewKeyStore(t, db, config.Database()).Eth() registry, _ := cltest.MustInsertKeeperRegistry(t, db, orm, ethKeyStore, 0, 2, 20) for i := 0; i < 100; i++ { - cltest.MustInsertUpkeepForRegistry(t, db, config.Database(), registry) + cltest.MustInsertUpkeepForRegistry(t, db, registry) } cltest.AssertCount(t, db, "keeper_registries", 1) @@ -276,20 +285,21 @@ func TestKeeperDB_NewEligibleUpkeeps_SkipIfLastPerformedByCurrentKeeper(t *testi // if current keeper index = 0 and all upkeeps last perform was done by index = 0 then skip as it would not pass required turn taking upkeep := keeper.UpkeepRegistration{} require.NoError(t, db.Get(&upkeep, `UPDATE upkeep_registrations SET last_keeper_index = 0 RETURNING *`)) - list0, err := orm.EligibleUpkeepsForRegistry(registry.ContractAddress, 21, 100, fmt.Sprintf("%b", evmutils.NewHash().Big())) // none eligible + list0, err := orm.EligibleUpkeepsForRegistry(ctx, registry.ContractAddress, 21, 100, fmt.Sprintf("%b", evmutils.NewHash().Big())) // none eligible require.NoError(t, err) require.Equal(t, 0, len(list0), "should be 0 as all last perform was 
done by current node") } func TestKeeperDB_NewEligibleUpkeeps_CoverBuddy(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) db, config, orm := setupKeeperDB(t) ethKeyStore := cltest.NewKeyStore(t, db, config.Database()).Eth() registry, _ := cltest.MustInsertKeeperRegistry(t, db, orm, ethKeyStore, 1, 2, 20) for i := 0; i < 100; i++ { - cltest.MustInsertUpkeepForRegistry(t, db, config.Database(), registry) + cltest.MustInsertUpkeepForRegistry(t, db, registry) } cltest.AssertCount(t, db, "keeper_registries", 1) @@ -297,23 +307,24 @@ func TestKeeperDB_NewEligibleUpkeeps_CoverBuddy(t *testing.T) { upkeep := keeper.UpkeepRegistration{} binaryHash := fmt.Sprintf("%b", evmutils.NewHash().Big()) - listBefore, err := orm.EligibleUpkeepsForRegistry(registry.ContractAddress, 21, 100, binaryHash) // normal + listBefore, err := orm.EligibleUpkeepsForRegistry(ctx, registry.ContractAddress, 21, 100, binaryHash) // normal require.NoError(t, err) require.NoError(t, db.Get(&upkeep, `UPDATE upkeep_registrations SET last_keeper_index = 0 RETURNING *`)) - listAfter, err := orm.EligibleUpkeepsForRegistry(registry.ContractAddress, 21, 100, binaryHash) // covering buddy + listAfter, err := orm.EligibleUpkeepsForRegistry(ctx, registry.ContractAddress, 21, 100, binaryHash) // covering buddy require.NoError(t, err) require.Greater(t, len(listAfter), len(listBefore), "after our buddy runs all the performs we should have more eligible then a normal turn") } func TestKeeperDB_NewEligibleUpkeeps_FirstTurn(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) db, config, orm := setupKeeperDB(t) ethKeyStore := cltest.NewKeyStore(t, db, config.Database()).Eth() registry, _ := cltest.MustInsertKeeperRegistry(t, db, orm, ethKeyStore, 0, 2, 20) for i := 0; i < 100; i++ { - cltest.MustInsertUpkeepForRegistry(t, db, config.Database(), registry) + cltest.MustInsertUpkeepForRegistry(t, db, registry) } cltest.AssertCount(t, db, "keeper_registries", 1) @@ -321,29 +332,30 @@ func TestKeeperDB_NewEligibleUpkeeps_FirstTurn(t *testing.T) { binaryHash := fmt.Sprintf("%b", evmutils.NewHash().Big()) // last keeper index is null to simulate a normal first run - listKpr0, err := orm.EligibleUpkeepsForRegistry(registry.ContractAddress, 21, 100, binaryHash) // someone eligible only kpr0 turn + listKpr0, err := orm.EligibleUpkeepsForRegistry(ctx, registry.ContractAddress, 21, 100, binaryHash) // someone eligible only kpr0 turn require.NoError(t, err) require.NotEqual(t, 0, len(listKpr0), "kpr0 should have some eligible as a normal turn") } func TestKeeperDB_NewEligibleUpkeeps_FiltersByRegistry(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) db, config, orm := setupKeeperDB(t) ethKeyStore := cltest.NewKeyStore(t, db, config.Database()).Eth() registry1, _ := cltest.MustInsertKeeperRegistry(t, db, orm, ethKeyStore, 0, 1, 20) registry2, _ := cltest.MustInsertKeeperRegistry(t, db, orm, ethKeyStore, 0, 1, 20) - cltest.MustInsertUpkeepForRegistry(t, db, config.Database(), registry1) - cltest.MustInsertUpkeepForRegistry(t, db, config.Database(), registry2) + cltest.MustInsertUpkeepForRegistry(t, db, registry1) + cltest.MustInsertUpkeepForRegistry(t, db, registry2) cltest.AssertCount(t, db, "keeper_registries", 2) cltest.AssertCount(t, db, "upkeep_registrations", 2) binaryHash := fmt.Sprintf("%b", evmutils.NewHash().Big()) - list1, err := orm.EligibleUpkeepsForRegistry(registry1.ContractAddress, 20, 100, binaryHash) + list1, err := orm.EligibleUpkeepsForRegistry(ctx, registry1.ContractAddress, 20, 100, binaryHash) 
require.NoError(t, err) - list2, err := orm.EligibleUpkeepsForRegistry(registry2.ContractAddress, 20, 100, binaryHash) + list2, err := orm.EligibleUpkeepsForRegistry(ctx, registry2.ContractAddress, 20, 100, binaryHash) require.NoError(t, err) assert.Equal(t, 1, len(list1)) @@ -352,25 +364,26 @@ func TestKeeperDB_NewEligibleUpkeeps_FiltersByRegistry(t *testing.T) { func TestKeeperDB_AllUpkeepIDsForRegistry(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) db, config, orm := setupKeeperDB(t) ethKeyStore := cltest.NewKeyStore(t, db, config.Database()).Eth() registry, _ := cltest.MustInsertKeeperRegistry(t, db, orm, ethKeyStore, 0, 1, 20) - upkeepIDs, err := orm.AllUpkeepIDsForRegistry(registry.ID) + upkeepIDs, err := orm.AllUpkeepIDsForRegistry(ctx, registry.ID) require.NoError(t, err) // No upkeeps returned require.Len(t, upkeepIDs, 0) upkeep := newUpkeep(registry, 3) - err = orm.UpsertUpkeep(&upkeep) + err = orm.UpsertUpkeep(ctx, &upkeep) require.NoError(t, err) upkeep = newUpkeep(registry, 8) - err = orm.UpsertUpkeep(&upkeep) + err = orm.UpsertUpkeep(ctx, &upkeep) require.NoError(t, err) // We should get two upkeeps IDs, 3 & 8 - upkeepIDs, err = orm.AllUpkeepIDsForRegistry(registry.ID) + upkeepIDs, err = orm.AllUpkeepIDsForRegistry(ctx, registry.ID) require.NoError(t, err) // No upkeeps returned require.Len(t, upkeepIDs, 2) @@ -380,12 +393,13 @@ func TestKeeperDB_AllUpkeepIDsForRegistry(t *testing.T) { func TestKeeperDB_UpdateUpkeepLastKeeperIndex(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) db, config, orm := setupKeeperDB(t) ethKeyStore := cltest.NewKeyStore(t, db, config.Database()).Eth() registry, j := cltest.MustInsertKeeperRegistry(t, db, orm, ethKeyStore, 0, 1, 20) - upkeep := cltest.MustInsertUpkeepForRegistry(t, db, config.Database(), registry) + upkeep := cltest.MustInsertUpkeepForRegistry(t, db, registry) - require.NoError(t, orm.UpdateUpkeepLastKeeperIndex(j.ID, upkeep.UpkeepID, registry.FromAddress)) + require.NoError(t, orm.UpdateUpkeepLastKeeperIndex(ctx, j.ID, upkeep.UpkeepID, registry.FromAddress)) err := db.Get(&upkeep, `SELECT * FROM upkeep_registrations WHERE upkeep_id = $1`, upkeep.UpkeepID) require.NoError(t, err) @@ -394,36 +408,37 @@ func TestKeeperDB_UpdateUpkeepLastKeeperIndex(t *testing.T) { func TestKeeperDB_NewSetLastRunInfoForUpkeepOnJob(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) db, config, orm := setupKeeperDB(t) ethKeyStore := cltest.NewKeyStore(t, db, config.Database()).Eth() registry, j := cltest.MustInsertKeeperRegistry(t, db, orm, ethKeyStore, 0, 1, 20) - upkeep := cltest.MustInsertUpkeepForRegistry(t, db, config.Database(), registry) + upkeep := cltest.MustInsertUpkeepForRegistry(t, db, registry) registry.NumKeepers = 2 registry.KeeperIndexMap = map[types.EIP55Address]int32{ registry.FromAddress: 0, types.EIP55AddressFromAddress(evmutils.ZeroAddress): 1, } - err := orm.UpsertRegistry(®istry) + err := orm.UpsertRegistry(ctx, ®istry) require.NoError(t, err, "UPDATE keeper_registries") // update - rowsAffected, err := orm.SetLastRunInfoForUpkeepOnJob(j.ID, upkeep.UpkeepID, 100, registry.FromAddress) + rowsAffected, err := orm.SetLastRunInfoForUpkeepOnJob(ctx, j.ID, upkeep.UpkeepID, 100, registry.FromAddress) require.NoError(t, err) require.Equal(t, rowsAffected, int64(1)) assertLastRunHeight(t, db, upkeep, 100, 0) // update to lower block height not allowed - rowsAffected, err = orm.SetLastRunInfoForUpkeepOnJob(j.ID, upkeep.UpkeepID, 0, registry.FromAddress) + rowsAffected, err = orm.SetLastRunInfoForUpkeepOnJob(ctx, 
j.ID, upkeep.UpkeepID, 0, registry.FromAddress) require.NoError(t, err) require.Equal(t, rowsAffected, int64(0)) assertLastRunHeight(t, db, upkeep, 100, 0) // update to same block height allowed - rowsAffected, err = orm.SetLastRunInfoForUpkeepOnJob(j.ID, upkeep.UpkeepID, 100, types.EIP55AddressFromAddress(evmutils.ZeroAddress)) + rowsAffected, err = orm.SetLastRunInfoForUpkeepOnJob(ctx, j.ID, upkeep.UpkeepID, 100, types.EIP55AddressFromAddress(evmutils.ZeroAddress)) require.NoError(t, err) require.Equal(t, rowsAffected, int64(1)) assertLastRunHeight(t, db, upkeep, 100, 1) // update to higher block height allowed - rowsAffected, err = orm.SetLastRunInfoForUpkeepOnJob(j.ID, upkeep.UpkeepID, 101, registry.FromAddress) + rowsAffected, err = orm.SetLastRunInfoForUpkeepOnJob(ctx, j.ID, upkeep.UpkeepID, 101, registry.FromAddress) require.NoError(t, err) require.Equal(t, rowsAffected, int64(1)) assertLastRunHeight(t, db, upkeep, 101, 0) diff --git a/core/services/keeper/registry1_1_synchronizer_test.go b/core/services/keeper/registry1_1_synchronizer_test.go index e0c2ebb2b3a..24a6a7288a7 100644 --- a/core/services/keeper/registry1_1_synchronizer_test.go +++ b/core/services/keeper/registry1_1_synchronizer_test.go @@ -20,7 +20,6 @@ import ( registry1_1 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/keeper_registry_wrapper1_1" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -72,8 +71,7 @@ func mockRegistry1_1( func Test_LogListenerOpts1_1(t *testing.T) { db := pgtest.NewSqlxDB(t) - scopedConfig := evmtest.NewChainScopedConfig(t, configtest.NewGeneralConfig(t, nil)) - korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database()) + korm := keeper.NewORM(db, logger.TestLogger(t)) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) j := cltest.MustInsertKeeperJob(t, db, korm, cltest.NewEIP55Address(), cltest.NewEIP55Address()) @@ -129,6 +127,7 @@ func Test_RegistrySynchronizer_CalcPositioningConstant(t *testing.T) { } func Test_RegistrySynchronizer1_1_FullSync(t *testing.T) { + ctx := testutils.Context(t) g := gomega.NewWithT(t) db, synchronizer, ethMock, _, job := setupRegistrySync(t, keeper.RegistryVersion_1_1) @@ -149,7 +148,7 @@ func Test_RegistrySynchronizer1_1_FullSync(t *testing.T) { upkeepConfig, 2) // sync only 2 (#0,#2) - synchronizer.ExportedFullSync() + synchronizer.ExportedFullSync(ctx) cltest.AssertCount(t, db, "keeper_registries", 1) cltest.AssertCount(t, db, "upkeep_registrations", 2) @@ -194,7 +193,7 @@ func Test_RegistrySynchronizer1_1_FullSync(t *testing.T) { big.NewInt(5), upkeepConfig1_1, 2) // sync all 2 upkeeps (#2, #4) - synchronizer.ExportedFullSync() + synchronizer.ExportedFullSync(ctx) cltest.AssertCount(t, db, "keeper_registries", 1) cltest.AssertCount(t, db, "upkeep_registrations", 2) diff --git a/core/services/keeper/registry1_2_synchronizer_test.go b/core/services/keeper/registry1_2_synchronizer_test.go index 387452dddf9..23e6c0355ec 100644 --- a/core/services/keeper/registry1_2_synchronizer_test.go +++ b/core/services/keeper/registry1_2_synchronizer_test.go @@ -19,7 +19,6 @@ import ( registry1_2 
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/keeper_registry_wrapper1_2" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -95,8 +94,7 @@ func mockRegistry1_2( func Test_LogListenerOpts1_2(t *testing.T) { db := pgtest.NewSqlxDB(t) - scopedConfig := evmtest.NewChainScopedConfig(t, configtest.NewGeneralConfig(t, nil)) - korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database()) + korm := keeper.NewORM(db, logger.TestLogger(t)) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) j := cltest.MustInsertKeeperJob(t, db, korm, cltest.NewEIP55Address(), cltest.NewEIP55Address()) @@ -148,6 +146,7 @@ func Test_RegistrySynchronizer1_2_Start(t *testing.T) { } func Test_RegistrySynchronizer1_2_FullSync(t *testing.T) { + ctx := testutils.Context(t) g := gomega.NewWithT(t) db, synchronizer, ethMock, _, job := setupRegistrySync(t, keeper.RegistryVersion_1_2) @@ -167,7 +166,7 @@ func Test_RegistrySynchronizer1_2_FullSync(t *testing.T) { 3, // sync all 3 2, 1) - synchronizer.ExportedFullSync() + synchronizer.ExportedFullSync(ctx) cltest.AssertCount(t, db, "keeper_registries", 1) cltest.AssertCount(t, db, "upkeep_registrations", 3) @@ -213,7 +212,7 @@ func Test_RegistrySynchronizer1_2_FullSync(t *testing.T) { 3, // sync all 3 active upkeeps 2, 1) - synchronizer.ExportedFullSync() + synchronizer.ExportedFullSync(ctx) cltest.AssertCount(t, db, "keeper_registries", 1) cltest.AssertCount(t, db, "upkeep_registrations", 3) diff --git a/core/services/keeper/registry1_3_synchronizer_test.go b/core/services/keeper/registry1_3_synchronizer_test.go index 6fc919775cc..2b5900ac189 100644 --- a/core/services/keeper/registry1_3_synchronizer_test.go +++ b/core/services/keeper/registry1_3_synchronizer_test.go @@ -21,7 +21,6 @@ import ( registry1_3 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/keeper_registry_wrapper1_3" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -97,8 +96,7 @@ func mockRegistry1_3( func Test_LogListenerOpts1_3(t *testing.T) { db := pgtest.NewSqlxDB(t) - scopedConfig := evmtest.NewChainScopedConfig(t, configtest.NewGeneralConfig(t, nil)) - korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database()) + korm := keeper.NewORM(db, logger.TestLogger(t)) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) j := cltest.MustInsertKeeperJob(t, db, korm, cltest.NewEIP55Address(), cltest.NewEIP55Address()) @@ -153,6 +151,7 @@ func Test_RegistrySynchronizer1_3_Start(t *testing.T) { } func Test_RegistrySynchronizer1_3_FullSync(t *testing.T) { + ctx := testutils.Context(t) g := gomega.NewWithT(t) db, synchronizer, ethMock, _, job := setupRegistrySync(t, keeper.RegistryVersion_1_3) @@ -172,7 +171,7 @@ func Test_RegistrySynchronizer1_3_FullSync(t *testing.T) { 3, // sync all 3 2, 1) - 
synchronizer.ExportedFullSync() + synchronizer.ExportedFullSync(ctx) cltest.AssertCount(t, db, "keeper_registries", 1) cltest.AssertCount(t, db, "upkeep_registrations", 3) @@ -218,7 +217,7 @@ func Test_RegistrySynchronizer1_3_FullSync(t *testing.T) { 3, // sync all 3 upkeeps 2, 1) - synchronizer.ExportedFullSync() + synchronizer.ExportedFullSync(ctx) cltest.AssertCount(t, db, "keeper_registries", 1) cltest.AssertCount(t, db, "upkeep_registrations", 3) diff --git a/core/services/keeper/registry_synchronizer_core.go b/core/services/keeper/registry_synchronizer_core.go index 86c79ac0007..a720fa8f13c 100644 --- a/core/services/keeper/registry_synchronizer_core.go +++ b/core/services/keeper/registry_synchronizer_core.go @@ -27,7 +27,7 @@ var ( type RegistrySynchronizerOptions struct { Job job.Job RegistryWrapper RegistryWrapper - ORM ORM + ORM *ORM JRM job.ORM LogBroadcaster log.Broadcaster MailMon *mailbox.Monitor @@ -49,7 +49,7 @@ type RegistrySynchronizer struct { mbLogs *mailbox.Mailbox[log.Broadcast] minIncomingConfirmations uint32 effectiveKeeperAddress common.Address - orm ORM + orm *ORM logger logger.SugaredLogger wgDone sync.WaitGroup syncUpkeepQueueSize uint32 //Represents the max number of upkeeps that can be synced in parallel @@ -117,14 +117,14 @@ func (rs *RegistrySynchronizer) run() { ctx, cancel := rs.chStop.NewCtx() defer cancel() - rs.fullSync() + rs.fullSync(ctx) for { select { case <-rs.chStop: return case <-syncTicker.Ticks(): - rs.fullSync() + rs.fullSync(ctx) syncTicker.Reset(rs.interval) case <-rs.mbLogs.Notify(): rs.processLogs(ctx) diff --git a/core/services/keeper/registry_synchronizer_helper_test.go b/core/services/keeper/registry_synchronizer_helper_test.go index 19ba2eedbbb..c97d3c0c92c 100644 --- a/core/services/keeper/registry_synchronizer_helper_test.go +++ b/core/services/keeper/registry_synchronizer_helper_test.go @@ -38,8 +38,7 @@ func setupRegistrySync(t *testing.T, version keeper.RegistryVersion) ( ) { db := pgtest.NewSqlxDB(t) cfg := configtest.NewGeneralConfig(t, nil) - scopedConfig := evmtest.NewChainScopedConfig(t, cfg) - korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database()) + korm := keeper.NewORM(db, logger.TestLogger(t)) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) keyStore := cltest.NewKeyStore(t, db, cfg.Database()) lbMock := logmocks.NewBroadcaster(t) @@ -47,7 +46,6 @@ func setupRegistrySync(t *testing.T, version keeper.RegistryVersion) ( j := cltest.MustInsertKeeperJob(t, db, korm, cltest.NewEIP55Address(), cltest.NewEIP55Address()) relayExtenders := evmtest.NewChainRelayExtenders(t, evmtest.TestChainOpts{DB: db, Client: ethClient, LogBroadcaster: lbMock, GeneralConfig: cfg, KeyStore: keyStore.Eth()}) legacyChains := evmrelay.NewLegacyChainsFromRelayerExtenders(relayExtenders) - ch := evmtest.MustGetDefaultChain(t, legacyChains) jpv2 := cltest.NewJobPipelineV2(t, cfg.WebServer(), cfg.JobPipeline(), cfg.Database(), legacyChains, db, keyStore, nil, nil) contractAddress := j.KeeperSpec.ContractAddress.Address() @@ -75,7 +73,7 @@ func setupRegistrySync(t *testing.T, version keeper.RegistryVersion) ( mailMon := servicetest.Run(t, mailboxtest.NewMonitor(t)) - orm := keeper.NewORM(db, logger.TestLogger(t), ch.Config().Database()) + orm := keeper.NewORM(db, logger.TestLogger(t)) synchronizer := keeper.NewRegistrySynchronizer(keeper.RegistrySynchronizerOptions{ Job: j, RegistryWrapper: *registryWrapper, diff --git a/core/services/keeper/registry_synchronizer_process_logs.go 
b/core/services/keeper/registry_synchronizer_process_logs.go index 5dfdc7b6950..0a0e1613c95 100644 --- a/core/services/keeper/registry_synchronizer_process_logs.go +++ b/core/services/keeper/registry_synchronizer_process_logs.go @@ -38,43 +38,43 @@ func (rs *RegistrySynchronizer) processLogs(ctx context.Context) { *registry1_2.KeeperRegistryConfigSet, *registry1_3.KeeperRegistryKeepersUpdated, *registry1_3.KeeperRegistryConfigSet: - err = rs.handleSyncRegistryLog(broadcast) + err = rs.handleSyncRegistryLog(ctx, broadcast) case *registry1_1.KeeperRegistryUpkeepCanceled, *registry1_2.KeeperRegistryUpkeepCanceled, *registry1_3.KeeperRegistryUpkeepCanceled: - err = rs.handleUpkeepCancelled(broadcast) + err = rs.handleUpkeepCancelled(ctx, broadcast) case *registry1_1.KeeperRegistryUpkeepRegistered, *registry1_2.KeeperRegistryUpkeepRegistered, *registry1_3.KeeperRegistryUpkeepRegistered: - err = rs.handleUpkeepRegistered(broadcast) + err = rs.handleUpkeepRegistered(ctx, broadcast) case *registry1_1.KeeperRegistryUpkeepPerformed, *registry1_2.KeeperRegistryUpkeepPerformed, *registry1_3.KeeperRegistryUpkeepPerformed: - err = rs.handleUpkeepPerformed(broadcast) + err = rs.handleUpkeepPerformed(ctx, broadcast) case *registry1_2.KeeperRegistryUpkeepGasLimitSet, *registry1_3.KeeperRegistryUpkeepGasLimitSet: - err = rs.handleUpkeepGasLimitSet(broadcast) + err = rs.handleUpkeepGasLimitSet(ctx, broadcast) case *registry1_2.KeeperRegistryUpkeepReceived, *registry1_3.KeeperRegistryUpkeepReceived: - err = rs.handleUpkeepReceived(broadcast) + err = rs.handleUpkeepReceived(ctx, broadcast) case *registry1_2.KeeperRegistryUpkeepMigrated, *registry1_3.KeeperRegistryUpkeepMigrated: - err = rs.handleUpkeepMigrated(broadcast) + err = rs.handleUpkeepMigrated(ctx, broadcast) case *registry1_3.KeeperRegistryUpkeepPaused: - err = rs.handleUpkeepPaused(broadcast) + err = rs.handleUpkeepPaused(ctx, broadcast) case *registry1_3.KeeperRegistryUpkeepUnpaused: - err = rs.handleUpkeepUnpaused(broadcast) + err = rs.handleUpkeepUnpaused(ctx, broadcast) case *registry1_3.KeeperRegistryUpkeepCheckDataUpdated: - err = rs.handleUpkeepCheckDataUpdated(broadcast) + err = rs.handleUpkeepCheckDataUpdated(ctx, broadcast) default: rs.logger.Warn("unexpected log type") @@ -92,17 +92,17 @@ func (rs *RegistrySynchronizer) processLogs(ctx context.Context) { } } -func (rs *RegistrySynchronizer) handleSyncRegistryLog(broadcast log.Broadcast) error { +func (rs *RegistrySynchronizer) handleSyncRegistryLog(ctx context.Context, broadcast log.Broadcast) error { rs.logger.Debugw("processing SyncRegistry log", "txHash", broadcast.RawLog().TxHash.Hex()) - _, err := rs.syncRegistry() + _, err := rs.syncRegistry(ctx) if err != nil { return errors.Wrap(err, "unable to sync registry") } return nil } -func (rs *RegistrySynchronizer) handleUpkeepCancelled(broadcast log.Broadcast) error { +func (rs *RegistrySynchronizer) handleUpkeepCancelled(ctx context.Context, broadcast log.Broadcast) error { rs.logger.Debugw("processing UpkeepCanceled log", "txHash", broadcast.RawLog().TxHash.Hex()) cancelledID, err := rs.registryWrapper.GetCancelledUpkeepIDFromLog(broadcast) @@ -110,7 +110,7 @@ func (rs *RegistrySynchronizer) handleUpkeepCancelled(broadcast log.Broadcast) e return errors.Wrap(err, "Unable to fetch cancelled upkeep ID from log") } - affected, err := rs.orm.BatchDeleteUpkeepsForJob(rs.job.ID, []big.Big{*big.New(cancelledID)}) + affected, err := rs.orm.BatchDeleteUpkeepsForJob(ctx, rs.job.ID, []big.Big{*big.New(cancelledID)}) if err != nil { return 
errors.Wrap(err, "unable to batch delete upkeeps") } @@ -118,10 +118,10 @@ func (rs *RegistrySynchronizer) handleUpkeepCancelled(broadcast log.Broadcast) e return nil } -func (rs *RegistrySynchronizer) handleUpkeepRegistered(broadcast log.Broadcast) error { +func (rs *RegistrySynchronizer) handleUpkeepRegistered(ctx context.Context, broadcast log.Broadcast) error { rs.logger.Debugw("processing UpkeepRegistered log", "txHash", broadcast.RawLog().TxHash.Hex()) - registry, err := rs.orm.RegistryForJob(rs.job.ID) + registry, err := rs.orm.RegistryForJob(ctx, rs.job.ID) if err != nil { return errors.Wrap(err, "unable to find registry for job") } @@ -131,21 +131,21 @@ func (rs *RegistrySynchronizer) handleUpkeepRegistered(broadcast log.Broadcast) return errors.Wrap(err, "Unable to fetch upkeep ID from registration log") } - err = rs.syncUpkeep(&rs.registryWrapper, registry, big.New(upkeepID)) + err = rs.syncUpkeep(ctx, &rs.registryWrapper, registry, big.New(upkeepID)) if err != nil { return errors.Wrapf(err, "failed to sync upkeep, log: %v", broadcast.String()) } return nil } -func (rs *RegistrySynchronizer) handleUpkeepPerformed(broadcast log.Broadcast) error { +func (rs *RegistrySynchronizer) handleUpkeepPerformed(ctx context.Context, broadcast log.Broadcast) error { rs.logger.Debugw("processing UpkeepPerformed log", "jobID", rs.job.ID, "txHash", broadcast.RawLog().TxHash.Hex()) log, err := rs.registryWrapper.ParseUpkeepPerformedLog(broadcast) if err != nil { return errors.Wrap(err, "Unable to fetch upkeep ID from performed log") } - rowsAffected, err := rs.orm.SetLastRunInfoForUpkeepOnJob(rs.job.ID, big.New(log.UpkeepID), int64(broadcast.RawLog().BlockNumber), types.EIP55AddressFromAddress(log.FromKeeper)) + rowsAffected, err := rs.orm.SetLastRunInfoForUpkeepOnJob(ctx, rs.job.ID, big.New(log.UpkeepID), int64(broadcast.RawLog().BlockNumber), types.EIP55AddressFromAddress(log.FromKeeper)) if err != nil { return errors.Wrap(err, "failed to set last run to 0") } @@ -159,10 +159,10 @@ func (rs *RegistrySynchronizer) handleUpkeepPerformed(broadcast log.Broadcast) e return nil } -func (rs *RegistrySynchronizer) handleUpkeepGasLimitSet(broadcast log.Broadcast) error { +func (rs *RegistrySynchronizer) handleUpkeepGasLimitSet(ctx context.Context, broadcast log.Broadcast) error { rs.logger.Debugw("processing UpkeepGasLimitSet log", "jobID", rs.job.ID, "txHash", broadcast.RawLog().TxHash.Hex()) - registry, err := rs.orm.RegistryForJob(rs.job.ID) + registry, err := rs.orm.RegistryForJob(ctx, rs.job.ID) if err != nil { return errors.Wrap(err, "unable to find registry for job") } @@ -172,17 +172,17 @@ func (rs *RegistrySynchronizer) handleUpkeepGasLimitSet(broadcast log.Broadcast) return errors.Wrap(err, "Unable to fetch upkeep ID from gas limit set log") } - err = rs.syncUpkeep(&rs.registryWrapper, registry, big.New(upkeepID)) + err = rs.syncUpkeep(ctx, &rs.registryWrapper, registry, big.New(upkeepID)) if err != nil { return errors.Wrapf(err, "failed to sync upkeep, log: %v", broadcast.String()) } return nil } -func (rs *RegistrySynchronizer) handleUpkeepReceived(broadcast log.Broadcast) error { +func (rs *RegistrySynchronizer) handleUpkeepReceived(ctx context.Context, broadcast log.Broadcast) error { rs.logger.Debugw("processing UpkeepReceived log", "txHash", broadcast.RawLog().TxHash.Hex()) - registry, err := rs.orm.RegistryForJob(rs.job.ID) + registry, err := rs.orm.RegistryForJob(ctx, rs.job.ID) if err != nil { return errors.Wrap(err, "unable to find registry for job") } @@ -192,14 +192,14 @@ func (rs 
*RegistrySynchronizer) handleUpkeepReceived(broadcast log.Broadcast) er return errors.Wrap(err, "Unable to fetch upkeep ID from received log") } - err = rs.syncUpkeep(&rs.registryWrapper, registry, big.New(upkeepID)) + err = rs.syncUpkeep(ctx, &rs.registryWrapper, registry, big.New(upkeepID)) if err != nil { return errors.Wrapf(err, "failed to sync upkeep, log: %v", broadcast.String()) } return nil } -func (rs *RegistrySynchronizer) handleUpkeepMigrated(broadcast log.Broadcast) error { +func (rs *RegistrySynchronizer) handleUpkeepMigrated(ctx context.Context, broadcast log.Broadcast) error { rs.logger.Debugw("processing UpkeepMigrated log", "txHash", broadcast.RawLog().TxHash.Hex()) migratedID, err := rs.registryWrapper.GetUpkeepIdFromMigratedLog(broadcast) @@ -207,7 +207,7 @@ func (rs *RegistrySynchronizer) handleUpkeepMigrated(broadcast log.Broadcast) er return errors.Wrap(err, "Unable to fetch migrated upkeep ID from log") } - affected, err := rs.orm.BatchDeleteUpkeepsForJob(rs.job.ID, []big.Big{*big.New(migratedID)}) + affected, err := rs.orm.BatchDeleteUpkeepsForJob(ctx, rs.job.ID, []big.Big{*big.New(migratedID)}) if err != nil { return errors.Wrap(err, "unable to batch delete upkeeps") } @@ -215,7 +215,7 @@ func (rs *RegistrySynchronizer) handleUpkeepMigrated(broadcast log.Broadcast) er return nil } -func (rs *RegistrySynchronizer) handleUpkeepPaused(broadcast log.Broadcast) error { +func (rs *RegistrySynchronizer) handleUpkeepPaused(ctx context.Context, broadcast log.Broadcast) error { rs.logger.Debugw("processing UpkeepPaused log", "txHash", broadcast.RawLog().TxHash.Hex()) pausedUpkeepId, err := rs.registryWrapper.GetUpkeepIdFromUpkeepPausedLog(broadcast) @@ -223,7 +223,7 @@ func (rs *RegistrySynchronizer) handleUpkeepPaused(broadcast log.Broadcast) erro return errors.Wrap(err, "Unable to fetch upkeep ID from upkeep paused log") } - _, err = rs.orm.BatchDeleteUpkeepsForJob(rs.job.ID, []big.Big{*big.New(pausedUpkeepId)}) + _, err = rs.orm.BatchDeleteUpkeepsForJob(ctx, rs.job.ID, []big.Big{*big.New(pausedUpkeepId)}) if err != nil { return errors.Wrap(err, "unable to batch delete upkeeps") } @@ -231,10 +231,10 @@ func (rs *RegistrySynchronizer) handleUpkeepPaused(broadcast log.Broadcast) erro return nil } -func (rs *RegistrySynchronizer) handleUpkeepUnpaused(broadcast log.Broadcast) error { +func (rs *RegistrySynchronizer) handleUpkeepUnpaused(ctx context.Context, broadcast log.Broadcast) error { rs.logger.Debugw("processing UpkeepUnpaused log", "txHash", broadcast.RawLog().TxHash.Hex()) - registry, err := rs.orm.RegistryForJob(rs.job.ID) + registry, err := rs.orm.RegistryForJob(ctx, rs.job.ID) if err != nil { return errors.Wrap(err, "unable to find registry for job") } @@ -244,7 +244,7 @@ func (rs *RegistrySynchronizer) handleUpkeepUnpaused(broadcast log.Broadcast) er return errors.Wrap(err, "Unable to fetch upkeep ID from upkeep unpaused log") } - err = rs.syncUpkeep(&rs.registryWrapper, registry, big.New(unpausedUpkeepId)) + err = rs.syncUpkeep(ctx, &rs.registryWrapper, registry, big.New(unpausedUpkeepId)) if err != nil { return errors.Wrapf(err, "failed to sync upkeep, log: %s", broadcast.String()) } @@ -252,10 +252,10 @@ func (rs *RegistrySynchronizer) handleUpkeepUnpaused(broadcast log.Broadcast) er return nil } -func (rs *RegistrySynchronizer) handleUpkeepCheckDataUpdated(broadcast log.Broadcast) error { +func (rs *RegistrySynchronizer) handleUpkeepCheckDataUpdated(ctx context.Context, broadcast log.Broadcast) error { rs.logger.Debugw("processing Upkeep check data updated 
log", "txHash", broadcast.RawLog().TxHash.Hex()) - registry, err := rs.orm.RegistryForJob(rs.job.ID) + registry, err := rs.orm.RegistryForJob(ctx, rs.job.ID) if err != nil { return errors.Wrap(err, "unable to find registry for job") } @@ -265,7 +265,7 @@ func (rs *RegistrySynchronizer) handleUpkeepCheckDataUpdated(broadcast log.Broad return errors.Wrap(err, "Unable to parse update log from upkeep check data updated log") } - err = rs.syncUpkeep(&rs.registryWrapper, registry, big.New(updateLog.UpkeepID)) + err = rs.syncUpkeep(ctx, &rs.registryWrapper, registry, big.New(updateLog.UpkeepID)) if err != nil { return errors.Wrapf(err, "unable to update check data for upkeep %s", updateLog.UpkeepID.String()) } diff --git a/core/services/keeper/registry_synchronizer_sync.go b/core/services/keeper/registry_synchronizer_sync.go index cdca9512976..6c0e12d844b 100644 --- a/core/services/keeper/registry_synchronizer_sync.go +++ b/core/services/keeper/registry_synchronizer_sync.go @@ -1,6 +1,7 @@ package keeper import ( + "context" "encoding/binary" "math" "sync" @@ -12,29 +13,29 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" ) -func (rs *RegistrySynchronizer) fullSync() { +func (rs *RegistrySynchronizer) fullSync(ctx context.Context) { rs.logger.Debugf("fullSyncing registry %s", rs.job.KeeperSpec.ContractAddress.Hex()) - registry, err := rs.syncRegistry() + registry, err := rs.syncRegistry(ctx) if err != nil { rs.logger.Error(errors.Wrap(err, "failed to sync registry during fullSyncing registry")) return } - if err := rs.fullSyncUpkeeps(registry); err != nil { + if err := rs.fullSyncUpkeeps(ctx, registry); err != nil { rs.logger.Error(errors.Wrap(err, "failed to sync upkeeps during fullSyncing registry")) return } rs.logger.Debugf("fullSyncing registry successful %s", rs.job.KeeperSpec.ContractAddress.Hex()) } -func (rs *RegistrySynchronizer) syncRegistry() (Registry, error) { +func (rs *RegistrySynchronizer) syncRegistry(ctx context.Context) (Registry, error) { registry, err := rs.newRegistryFromChain() if err != nil { return Registry{}, errors.Wrap(err, "failed to get new registry from chain") } - err = rs.orm.UpsertRegistry(®istry) + err = rs.orm.UpsertRegistry(ctx, ®istry) if err != nil { return Registry{}, errors.Wrap(err, "failed to upsert registry") } @@ -42,13 +43,13 @@ func (rs *RegistrySynchronizer) syncRegistry() (Registry, error) { return registry, nil } -func (rs *RegistrySynchronizer) fullSyncUpkeeps(reg Registry) error { +func (rs *RegistrySynchronizer) fullSyncUpkeeps(ctx context.Context, reg Registry) error { activeUpkeepIDs, err := rs.registryWrapper.GetActiveUpkeepIDs(nil) if err != nil { return errors.Wrap(err, "unable to get active upkeep IDs") } - existingUpkeepIDs, err := rs.orm.AllUpkeepIDsForRegistry(reg.ID) + existingUpkeepIDs, err := rs.orm.AllUpkeepIDsForRegistry(ctx, reg.ID) if err != nil { return errors.Wrap(err, "unable to fetch existing upkeep IDs from DB") } @@ -59,7 +60,7 @@ func (rs *RegistrySynchronizer) fullSyncUpkeeps(reg Registry) error { activeSet[upkeepID.String()] = true allActiveUpkeeps = append(allActiveUpkeeps, *big.New(upkeepID)) } - rs.batchSyncUpkeepsOnRegistry(reg, allActiveUpkeeps) + rs.batchSyncUpkeepsOnRegistry(ctx, reg, allActiveUpkeeps) // All upkeeps in existingUpkeepIDs, not in activeUpkeepIDs should be deleted canceled := make([]big.Big, 0) @@ -68,7 +69,7 @@ func (rs *RegistrySynchronizer) fullSyncUpkeeps(reg Registry) error { canceled = append(canceled, upkeepID) } } - if _, err := 
diff --git a/core/services/keeper/registry_synchronizer_sync.go b/core/services/keeper/registry_synchronizer_sync.go
index cdca9512976..6c0e12d844b 100644
--- a/core/services/keeper/registry_synchronizer_sync.go
+++ b/core/services/keeper/registry_synchronizer_sync.go
@@ -1,6 +1,7 @@
 package keeper
 
 import (
+	"context"
 	"encoding/binary"
 	"math"
 	"sync"
@@ -12,29 +13,29 @@ import (
 	"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
 )
 
-func (rs *RegistrySynchronizer) fullSync() {
+func (rs *RegistrySynchronizer) fullSync(ctx context.Context) {
 	rs.logger.Debugf("fullSyncing registry %s", rs.job.KeeperSpec.ContractAddress.Hex())
 
-	registry, err := rs.syncRegistry()
+	registry, err := rs.syncRegistry(ctx)
 	if err != nil {
 		rs.logger.Error(errors.Wrap(err, "failed to sync registry during fullSyncing registry"))
 		return
 	}
 
-	if err := rs.fullSyncUpkeeps(registry); err != nil {
+	if err := rs.fullSyncUpkeeps(ctx, registry); err != nil {
 		rs.logger.Error(errors.Wrap(err, "failed to sync upkeeps during fullSyncing registry"))
 		return
 	}
 	rs.logger.Debugf("fullSyncing registry successful %s", rs.job.KeeperSpec.ContractAddress.Hex())
 }
 
-func (rs *RegistrySynchronizer) syncRegistry() (Registry, error) {
+func (rs *RegistrySynchronizer) syncRegistry(ctx context.Context) (Registry, error) {
 	registry, err := rs.newRegistryFromChain()
 	if err != nil {
 		return Registry{}, errors.Wrap(err, "failed to get new registry from chain")
 	}
 
-	err = rs.orm.UpsertRegistry(&registry)
+	err = rs.orm.UpsertRegistry(ctx, &registry)
 	if err != nil {
 		return Registry{}, errors.Wrap(err, "failed to upsert registry")
 	}
@@ -42,13 +43,13 @@ func (rs *RegistrySynchronizer) syncRegistry() (Registry, error) {
 	return registry, nil
 }
 
-func (rs *RegistrySynchronizer) fullSyncUpkeeps(reg Registry) error {
+func (rs *RegistrySynchronizer) fullSyncUpkeeps(ctx context.Context, reg Registry) error {
 	activeUpkeepIDs, err := rs.registryWrapper.GetActiveUpkeepIDs(nil)
 	if err != nil {
 		return errors.Wrap(err, "unable to get active upkeep IDs")
 	}
 
-	existingUpkeepIDs, err := rs.orm.AllUpkeepIDsForRegistry(reg.ID)
+	existingUpkeepIDs, err := rs.orm.AllUpkeepIDsForRegistry(ctx, reg.ID)
 	if err != nil {
 		return errors.Wrap(err, "unable to fetch existing upkeep IDs from DB")
 	}
@@ -59,7 +60,7 @@ func (rs *RegistrySynchronizer) fullSyncUpkeeps(reg Registry) error {
 		activeSet[upkeepID.String()] = true
 		allActiveUpkeeps = append(allActiveUpkeeps, *big.New(upkeepID))
 	}
-	rs.batchSyncUpkeepsOnRegistry(reg, allActiveUpkeeps)
+	rs.batchSyncUpkeepsOnRegistry(ctx, reg, allActiveUpkeeps)
 
 	// All upkeeps in existingUpkeepIDs, not in activeUpkeepIDs should be deleted
 	canceled := make([]big.Big, 0)
@@ -68,7 +69,7 @@ func (rs *RegistrySynchronizer) fullSyncUpkeeps(reg Registry) error {
 			canceled = append(canceled, upkeepID)
 		}
 	}
-	if _, err := rs.orm.BatchDeleteUpkeepsForJob(rs.job.ID, canceled); err != nil {
+	if _, err := rs.orm.BatchDeleteUpkeepsForJob(ctx, rs.job.ID, canceled); err != nil {
 		return errors.Wrap(err, "failed to batch delete upkeeps from job")
 	}
 	return nil
@@ -76,28 +77,27 @@ func (rs *RegistrySynchronizer) fullSyncUpkeeps(reg Registry) error {
 
 // batchSyncUpkeepsOnRegistry syncs <syncUpkeepQueueSize> upkeeps at a time in parallel
 // for all the IDs within newUpkeeps slice
-func (rs *RegistrySynchronizer) batchSyncUpkeepsOnRegistry(reg Registry, newUpkeeps []big.Big) {
+func (rs *RegistrySynchronizer) batchSyncUpkeepsOnRegistry(ctx context.Context, reg Registry, newUpkeeps []big.Big) {
 	wg := sync.WaitGroup{}
-	wg.Add(len(newUpkeeps))
 	chSyncUpkeepQueue := make(chan struct{}, rs.syncUpkeepQueueSize)
 
 	done := func() { <-chSyncUpkeepQueue; wg.Done() }
 	for i := range newUpkeeps {
 		select {
-		case <-rs.chStop:
-			return
+		case <-ctx.Done():
 		case chSyncUpkeepQueue <- struct{}{}:
-			go rs.syncUpkeepWithCallback(&rs.registryWrapper, reg, &newUpkeeps[i], done)
+			wg.Add(1)
+			go rs.syncUpkeepWithCallback(ctx, &rs.registryWrapper, reg, &newUpkeeps[i], done)
 		}
 	}
 
 	wg.Wait()
 }
 
-func (rs *RegistrySynchronizer) syncUpkeepWithCallback(getter upkeepGetter, registry Registry, upkeepID *big.Big, doneCallback func()) {
+func (rs *RegistrySynchronizer) syncUpkeepWithCallback(ctx context.Context, getter upkeepGetter, registry Registry, upkeepID *big.Big, doneCallback func()) {
 	defer doneCallback()
 
-	if err := rs.syncUpkeep(getter, registry, upkeepID); err != nil {
+	if err := rs.syncUpkeep(ctx, getter, registry, upkeepID); err != nil {
 		rs.logger.With("err", err.Error()).With(
 			"upkeepID", NewUpkeepIdentifier(upkeepID).String(),
 			"registryContract", registry.ContractAddress.Hex(),
@@ -105,7 +105,7 @@ func (rs *RegistrySynchronizer) syncUpkeepWithCallback(getter upkeepGetter, regi
 	}
 }
 
-func (rs *RegistrySynchronizer) syncUpkeep(getter upkeepGetter, registry Registry, upkeepID *big.Big) error {
+func (rs *RegistrySynchronizer) syncUpkeep(ctx context.Context, getter upkeepGetter, registry Registry, upkeepID *big.Big) error {
 	upkeep, err := getter.GetUpkeep(nil, upkeepID.ToInt())
 	if err != nil {
 		return errors.Wrap(err, "failed to get upkeep config")
@@ -126,11 +126,11 @@ func (rs *RegistrySynchronizer) syncUpkeep(getter upkeepGetter, registry Registr
 		PositioningConstant: positioningConstant,
 		UpkeepID:            upkeepID,
 	}
-	if err := rs.orm.UpsertUpkeep(&newUpkeep); err != nil {
+	if err := rs.orm.UpsertUpkeep(ctx, &newUpkeep); err != nil {
 		return errors.Wrap(err, "failed to upsert upkeep")
 	}
 
-	if err := rs.orm.UpdateUpkeepLastKeeperIndex(rs.job.ID, upkeepID, types.EIP55AddressFromAddress(upkeep.LastKeeper)); err != nil {
+	if err := rs.orm.UpdateUpkeepLastKeeperIndex(ctx, rs.job.ID, upkeepID, types.EIP55AddressFromAddress(upkeep.LastKeeper)); err != nil {
 		return errors.Wrap(err, "failed to update upkeep last keeper index")
 	}
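The batchSyncUpkeepsOnRegistry rework above replaces the single up-front wg.Add(len(newUpkeeps)) with a wg.Add(1) next to each goroutine actually launched, so a cancelled context can no longer leave Wait() stuck behind counts that never get Done(). A self-contained sketch of the same bounded-parallelism pattern, with generic names chosen for the example (only the channel-as-semaphore plus WaitGroup shape is taken from the patch):

package main

import (
	"context"
	"fmt"
	"sync"
)

// runBounded launches at most `limit` workers at a time and returns once every
// launched worker has finished. Iterations that see a cancelled context launch
// nothing, and since wg.Add(1) only happens next to `go`, Wait() stays balanced.
func runBounded(ctx context.Context, limit int, tasks []int, work func(int)) {
	var wg sync.WaitGroup
	sem := make(chan struct{}, limit) // buffered channel used as a semaphore
	done := func() { <-sem; wg.Done() }

	for _, task := range tasks {
		select {
		case <-ctx.Done():
			// skip this task; no goroutine, no Add, so no leaked Wait count
		case sem <- struct{}{}:
			wg.Add(1)
			task := task
			go func() {
				defer done()
				work(task)
			}()
		}
	}
	wg.Wait()
}

func main() {
	runBounded(context.Background(), 2, []int{1, 2, 3, 4}, func(task int) {
		fmt.Println("synced upkeep", task)
	})
}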
diff --git a/core/services/keeper/registry_synchronizer_sync_test.go b/core/services/keeper/registry_synchronizer_sync_test.go
index e4d8e44e20a..7cc1c5a11cc 100644
--- a/core/services/keeper/registry_synchronizer_sync_test.go
+++ b/core/services/keeper/registry_synchronizer_sync_test.go
@@ -27,6 +27,7 @@ func (g *GetUpkeepFailure) GetUpkeep(opts *bind.CallOpts, id *big.Int) (*UpkeepC
 }
 
 func TestSyncUpkeepWithCallback_UpkeepNotFound(t *testing.T) {
+	ctx := testutils.Context(t)
 	log, logObserver := logger.TestLoggerObserved(t, zapcore.ErrorLevel)
 	synchronizer := &RegistrySynchronizer{
 		logger: log.(logger.SugaredLogger),
@@ -49,7 +50,7 @@ func TestSyncUpkeepWithCallback_UpkeepNotFound(t *testing.T) {
 	}
 
 	getter := &GetUpkeepFailure{}
-	synchronizer.syncUpkeepWithCallback(getter, registry, id, doneFunc)
+	synchronizer.syncUpkeepWithCallback(ctx, getter, registry, id, doneFunc)
 
 	// logs should have the upkeep identifier included in the error context properly formatted
 	require.Equal(t, 1, logObserver.Len())
diff --git a/core/services/keeper/upkeep_executer.go b/core/services/keeper/upkeep_executer.go
index f86859f7452..c66f2d31c5a 100644
--- a/core/services/keeper/upkeep_executer.go
+++ b/core/services/keeper/upkeep_executer.go
@@ -23,7 +23,6 @@ import (
 	"github.com/smartcontractkit/chainlink/v2/core/config"
 	"github.com/smartcontractkit/chainlink/v2/core/logger"
 	"github.com/smartcontractkit/chainlink/v2/core/services/job"
-	"github.com/smartcontractkit/chainlink/v2/core/services/pg"
 	"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
 )
 
@@ -64,7 +63,7 @@ type UpkeepExecuter struct {
 	gasEstimator gas.EvmFeeEstimator
 	job          job.Job
 	mailbox      *mailbox.Mailbox[*evmtypes.Head]
-	orm          ORM
+	orm          *ORM
 	pr           pipeline.Runner
 	logger       logger.Logger
 	wgDone       sync.WaitGroup
@@ -74,7 +73,7 @@ type UpkeepExecuter struct {
 // NewUpkeepExecuter is the constructor of UpkeepExecuter
 func NewUpkeepExecuter(
 	job job.Job,
-	orm ORM,
+	orm *ORM,
 	pr pipeline.Runner,
 	ethClient evmclient.Client,
 	headBroadcaster httypes.HeadBroadcaster,
@@ -133,17 +132,19 @@ func (ex *UpkeepExecuter) OnNewLongestChain(_ context.Context, head *evmtypes.He
 func (ex *UpkeepExecuter) run() {
 	defer ex.wgDone.Done()
+	ctx, cancel := ex.chStop.NewCtx()
+	defer cancel()
 	for {
 		select {
 		case <-ex.chStop:
 			return
 		case <-ex.mailbox.Notify():
-			ex.processActiveUpkeeps()
+			ex.processActiveUpkeeps(ctx)
 		}
 	}
 }
 
-func (ex *UpkeepExecuter) processActiveUpkeeps() {
+func (ex *UpkeepExecuter) processActiveUpkeeps(ctx context.Context) {
 	// Keepers could miss their turn in the turn taking algo if they are too overloaded
 	// with work because processActiveUpkeeps() blocks
 	head, exists := ex.mailbox.Retrieve()
@@ -154,7 +155,7 @@ func (ex *UpkeepExecuter) processActiveUpkeeps() {
 
 	ex.logger.Debugw("checking active upkeeps", "blockheight", head.Number)
 
-	registry, err := ex.orm.RegistryByContractAddress(ex.job.KeeperSpec.ContractAddress)
+	registry, err := ex.orm.RegistryByContractAddress(ctx, ex.job.KeeperSpec.ContractAddress)
 	if err != nil {
 		ex.logger.Error(errors.Wrap(err, "unable to load registry"))
 		return
@@ -167,6 +168,7 @@ func (ex *UpkeepExecuter) processActiveUpkeeps() {
 		return
 	}
 	activeUpkeeps, err2 = ex.orm.EligibleUpkeepsForRegistry(
+		ctx,
 		ex.job.KeeperSpec.ContractAddress,
 		head.Number,
 		ex.config.MaxGracePeriod(),
@@ -232,7 +234,7 @@ func (ex *UpkeepExecuter) execute(upkeep UpkeepRegistration, head *evmtypes.Head
 
 	// Only after task runs where a tx was broadcast
 	if run.State == pipeline.RunStatusCompleted {
-		rowsAffected, err := ex.orm.SetLastRunInfoForUpkeepOnJob(ex.job.ID, upkeep.UpkeepID, head.Number, upkeep.Registry.FromAddress, pg.WithParentCtx(ctxService))
+		rowsAffected, err := ex.orm.SetLastRunInfoForUpkeepOnJob(ctxService, ex.job.ID, upkeep.UpkeepID, head.Number, upkeep.Registry.FromAddress)
 		if err != nil {
 			svcLogger.Error(errors.Wrap(err, "failed to set last run height for upkeep"))
 		}
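run() above now derives one long-lived context from the executer's stop channel via chStop.NewCtx() and threads it into every ORM call, instead of decorating individual queries with pg.WithParentCtx. A self-contained sketch of that pattern, with a plain channel standing in for the stop channel type (the helper below is an assumption written for this example, not the repository's implementation):

package main

import (
	"context"
	"fmt"
	"time"
)

// stopChanCtx returns a context that is cancelled either explicitly via the
// returned CancelFunc or when the stop channel is closed, mimicking the
// chStop.NewCtx() call used in the patch above.
func stopChanCtx(stop <-chan struct{}) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-stop:
			cancel()
		case <-ctx.Done():
		}
	}()
	return ctx, cancel
}

func main() {
	stop := make(chan struct{})
	ctx, cancel := stopChanCtx(stop)
	defer cancel()

	// Simulate service shutdown: closing stop cancels ctx, so any database
	// query issued with ctx (for example an ORM call) is aborted promptly.
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(stop)
	}()

	<-ctx.Done()
	fmt.Println("stopped:", ctx.Err())
}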
diff --git a/core/services/keeper/upkeep_executer_test.go b/core/services/keeper/upkeep_executer_test.go
index fbe61f35743..10995ec3c0d 100644
--- a/core/services/keeper/upkeep_executer_test.go
+++ b/core/services/keeper/upkeep_executer_test.go
@@ -66,7 +66,7 @@ func setup(t *testing.T, estimator gas.EvmFeeEstimator, overrideFn func(c *chain
 	*txmmocks.MockEvmTxManager,
 	keystore.Master,
 	legacyevm.Chain,
-	keeper.ORM,
+	*keeper.ORM,
 ) {
 	cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) {
 		c.Keeper.TurnLookBack = ptr[int64](0)
@@ -85,12 +85,12 @@ func setup(t *testing.T, estimator gas.EvmFeeEstimator, overrideFn func(c *chain
 	legacyChains := evmrelay.NewLegacyChainsFromRelayerExtenders(relayExtenders)
 	jpv2 := cltest.NewJobPipelineV2(t, cfg.WebServer(), cfg.JobPipeline(), cfg.Database(), legacyChains, db, keyStore, nil, nil)
 	ch := evmtest.MustGetDefaultChain(t, legacyChains)
-	orm := keeper.NewORM(db, logger.TestLogger(t), ch.Config().Database())
+	orm := keeper.NewORM(db, logger.TestLogger(t))
 	registry, jb := cltest.MustInsertKeeperRegistry(t, db, orm, keyStore.Eth(), 0, 1, 20)
 	lggr := logger.TestLogger(t)
 	executer := keeper.NewUpkeepExecuter(jb, orm, jpv2.Pr, ethClient, ch.HeadBroadcaster(), ch.GasEstimator(), lggr, ch.Config().Keeper(), jb.KeeperSpec.FromAddress.Address())
-	upkeep := cltest.MustInsertUpkeepForRegistry(t, db, ch.Config().Database(), registry)
+	upkeep := cltest.MustInsertUpkeepForRegistry(t, db, registry)
 	servicetest.Run(t, executer)
 	return db, cfg, ethClient, executer, registry, upkeep, jb, jpv2, txm, keyStore, ch, orm
 }
@@ -267,7 +267,7 @@ func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) {
 		registry, jb := cltest.MustInsertKeeperRegistry(t, db, orm, keyStore.Eth(), 0, 1, 20)
 		// change chain ID to non-configured chain
 		jb.KeeperSpec.EVMChainID = (*ubig.Big)(big.NewInt(999))
-		cltest.MustInsertUpkeepForRegistry(t, db, ch.Config().Database(), registry)
+		cltest.MustInsertUpkeepForRegistry(t, db, registry)
 		lggr := logger.TestLogger(t)
 		executer := keeper.NewUpkeepExecuter(jb, orm, jpv2.Pr, ethMock, ch.HeadBroadcaster(), ch.GasEstimator(), lggr, ch.Config().Keeper(), jb.KeeperSpec.FromAddress.Address())
 		err := executer.Start(testutils.Context(t))