Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/services/keeper: switch to sqlutil.DataSource #12820

Merged
merged 1 commit into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/many-pillows-reflect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

core/services/keeper: switch to sqlutil.DataSource #internal
22 changes: 12 additions & 10 deletions core/internal/cltest/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/keeper"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
"github.com/smartcontractkit/chainlink/v2/core/utils"
Expand Down Expand Up @@ -379,15 +378,16 @@ func MakeDirectRequestJobSpec(t *testing.T) *job.Job {
return spec
}

func MustInsertKeeperJob(t *testing.T, db *sqlx.DB, korm keeper.ORM, from evmtypes.EIP55Address, contract evmtypes.EIP55Address) job.Job {
func MustInsertKeeperJob(t *testing.T, db *sqlx.DB, korm *keeper.ORM, from evmtypes.EIP55Address, contract evmtypes.EIP55Address) job.Job {
t.Helper()
ctx := testutils.Context(t)

var keeperSpec job.KeeperSpec
err := korm.Q().Get(&keeperSpec, `INSERT INTO keeper_specs (contract_address, from_address, created_at, updated_at,evm_chain_id) VALUES ($1, $2, NOW(), NOW(), $3) RETURNING *`, contract, from, testutils.SimulatedChainID.Int64())
err := korm.DataSource().GetContext(ctx, &keeperSpec, `INSERT INTO keeper_specs (contract_address, from_address, created_at, updated_at,evm_chain_id) VALUES ($1, $2, NOW(), NOW(), $3) RETURNING *`, contract, from, testutils.SimulatedChainID.Int64())
require.NoError(t, err)

var pipelineSpec pipeline.Spec
err = korm.Q().Get(&pipelineSpec, `INSERT INTO pipeline_specs (dot_dag_source,created_at) VALUES ('',NOW()) RETURNING *`)
err = korm.DataSource().GetContext(ctx, &pipelineSpec, `INSERT INTO pipeline_specs (dot_dag_source,created_at) VALUES ('',NOW()) RETURNING *`)
require.NoError(t, err)

jb := job.Job{
Expand All @@ -411,10 +411,11 @@ func MustInsertKeeperJob(t *testing.T, db *sqlx.DB, korm keeper.ORM, from evmtyp
return jb
}

func MustInsertKeeperRegistry(t *testing.T, db *sqlx.DB, korm keeper.ORM, ethKeyStore keystore.Eth, keeperIndex, numKeepers, blockCountPerTurn int32) (keeper.Registry, job.Job) {
func MustInsertKeeperRegistry(t *testing.T, db *sqlx.DB, korm *keeper.ORM, ethKeyStore keystore.Eth, keeperIndex, numKeepers, blockCountPerTurn int32) (keeper.Registry, job.Job) {
t.Helper()
ctx := testutils.Context(t)
key, _ := MustInsertRandomKey(t, ethKeyStore, *ubig.New(testutils.SimulatedChainID))
from := key.EIP55Address
t.Helper()
contractAddress := NewEIP55Address()
jb := MustInsertKeeperJob(t, db, korm, from, contractAddress)
registry := keeper.Registry{
Expand All @@ -429,13 +430,14 @@ func MustInsertKeeperRegistry(t *testing.T, db *sqlx.DB, korm keeper.ORM, ethKey
from: keeperIndex,
},
}
err := korm.UpsertRegistry(&registry)
err := korm.UpsertRegistry(ctx, &registry)
require.NoError(t, err)
return registry, jb
}

func MustInsertUpkeepForRegistry(t *testing.T, db *sqlx.DB, cfg pg.QConfig, registry keeper.Registry) keeper.UpkeepRegistration {
korm := keeper.NewORM(db, logger.TestLogger(t), cfg)
func MustInsertUpkeepForRegistry(t *testing.T, db *sqlx.DB, registry keeper.Registry) keeper.UpkeepRegistration {
ctx := testutils.Context(t)
korm := keeper.NewORM(db, logger.TestLogger(t))
upkeepID := ubig.NewI(int64(mathrand.Uint32()))
upkeep := keeper.UpkeepRegistration{
UpkeepID: upkeepID,
Expand All @@ -447,7 +449,7 @@ func MustInsertUpkeepForRegistry(t *testing.T, db *sqlx.DB, cfg pg.QConfig, regi
positioningConstant, err := keeper.CalcPositioningConstant(upkeepID, registry.ContractAddress)
require.NoError(t, err)
upkeep.PositioningConstant = positioningConstant
err = korm.UpsertUpkeep(&upkeep)
err = korm.UpsertUpkeep(ctx, &upkeep)
require.NoError(t, err)
return upkeep
}
Expand Down
6 changes: 2 additions & 4 deletions core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,7 @@ func TestORM_DeleteJob_DeletesAssociatedRecords(t *testing.T) {
pipelineORM := pipeline.NewORM(db, lggr, config.Database(), config.JobPipeline().MaxSuccessfulRuns())
bridgesORM := bridges.NewORM(db)
jobORM := NewTestORM(t, db, pipelineORM, bridgesORM, keyStore, config.Database())
scopedConfig := evmtest.NewChainScopedConfig(t, config)
korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database())
korm := keeper.NewORM(db, logger.TestLogger(t))

t.Run("it deletes records for offchainreporting jobs", func(t *testing.T) {
_, bridge := cltest.MustCreateBridge(t, db, cltest.BridgeOpts{})
Expand Down Expand Up @@ -381,8 +380,7 @@ func TestORM_DeleteJob_DeletesAssociatedRecords(t *testing.T) {

t.Run("it deletes records for keeper jobs", func(t *testing.T) {
registry, keeperJob := cltest.MustInsertKeeperRegistry(t, db, korm, keyStore.Eth(), 0, 1, 20)
scoped := evmtest.NewChainScopedConfig(t, config)
cltest.MustInsertUpkeepForRegistry(t, db, scoped.Database(), registry)
cltest.MustInsertUpkeepForRegistry(t, db, registry)

cltest.AssertCount(t, db, "keeper_specs", 1)
cltest.AssertCount(t, db, "keeper_registries", 1)
Expand Down
2 changes: 1 addition & 1 deletion core/services/keeper/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) (services
return nil, err
}
registryAddress := spec.KeeperSpec.ContractAddress
orm := NewORM(d.db, d.logger, chain.Config().Database())
orm := NewORM(d.db, d.logger)
svcLogger := d.logger.With(
"jobID", spec.ID,
"registryAddress", registryAddress.Hex(),
Expand Down
5 changes: 3 additions & 2 deletions core/services/keeper/helpers_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package keeper

import (
"context"
"math/big"

"github.com/ethereum/go-ethereum/core/types"
"github.com/pkg/errors"
)

func (rs *RegistrySynchronizer) ExportedFullSync() {
rs.fullSync()
func (rs *RegistrySynchronizer) ExportedFullSync(ctx context.Context) {
rs.fullSync(ctx)
}

func (rw *RegistryWrapper) GetUpkeepIdFromRawRegistrationLog(rawLog types.Log) (*big.Int, error) {
Expand Down
19 changes: 8 additions & 11 deletions core/services/keeper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest/heavyweight"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
Expand Down Expand Up @@ -247,8 +246,7 @@ func TestKeeperEthIntegration(t *testing.T) {
c.EVM[0].MinIncomingConfirmations = ptr[uint32](1) // disable reorg protection for this test
c.EVM[0].HeadTracker.MaxBufferSize = ptr[uint32](100) // helps prevent missed heads
})
scopedConfig := evmtest.NewChainScopedConfig(t, config)
korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database())
korm := keeper.NewORM(db, logger.TestLogger(t))

app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, backend.Backend(), nodeKey)
require.NoError(t, app.Start(testutils.Context(t)))
Expand Down Expand Up @@ -328,6 +326,7 @@ func TestKeeperEthIntegration(t *testing.T) {
func TestKeeperForwarderEthIntegration(t *testing.T) {
t.Parallel()
t.Run("keeper_forwarder_flow", func(t *testing.T) {
ctx := testutils.Context(t)
g := gomega.NewWithT(t)

// setup node key
Expand Down Expand Up @@ -407,15 +406,14 @@ func TestKeeperForwarderEthIntegration(t *testing.T) {
c.EVM[0].Transactions.ForwardersEnabled = ptr(true) // Enable Operator Forwarder flow
c.EVM[0].ChainID = (*ubig.Big)(testutils.SimulatedChainID)
})
scopedConfig := evmtest.NewChainScopedConfig(t, config)
korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database())
korm := keeper.NewORM(db, logger.TestLogger(t))

app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, backend.Backend(), nodeKey)
require.NoError(t, app.Start(testutils.Context(t)))
require.NoError(t, app.Start(ctx))

forwarderORM := forwarders.NewORM(db)
chainID := ubig.Big(*backend.ConfiguredChainID())
_, err = forwarderORM.CreateForwarder(testutils.Context(t), fwdrAddress, chainID)
_, err = forwarderORM.CreateForwarder(ctx, fwdrAddress, chainID)
require.NoError(t, err)

addr, err := app.GetRelayers().LegacyEVMChains().Slice()[0].TxManager().GetForwarderForEOA(nodeAddress)
Expand Down Expand Up @@ -452,7 +450,7 @@ func TestKeeperForwarderEthIntegration(t *testing.T) {
evmtypes.EIP55AddressFromAddress(nelly.From): 1,
},
}
err = korm.UpsertRegistry(&registry)
err = korm.UpsertRegistry(ctx, &registry)
require.NoError(t, err)

callOpts := bind.CallOpts{From: nodeAddress}
Expand All @@ -464,7 +462,7 @@ func TestKeeperForwarderEthIntegration(t *testing.T) {
}
require.Equal(t, lastKeeper(), common.Address{})

err = app.JobSpawner().StartService(testutils.Context(t), jb)
err = app.JobSpawner().StartService(ctx, jb)
require.NoError(t, err)

// keeper job is triggered and payload is received
Expand Down Expand Up @@ -551,8 +549,7 @@ func TestMaxPerformDataSize(t *testing.T) {
c.EVM[0].MinIncomingConfirmations = ptr[uint32](1) // disable reorg protection for this test
c.EVM[0].HeadTracker.MaxBufferSize = ptr[uint32](100) // helps prevent missed heads
})
scopedConfig := evmtest.NewChainScopedConfig(t, config)
korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database())
korm := keeper.NewORM(db, logger.TestLogger(t))

app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, backend.Backend(), nodeKey)
require.NoError(t, app.Start(testutils.Context(t)))
Expand Down
70 changes: 39 additions & 31 deletions core/services/keeper/orm.go
Original file line number Diff line number Diff line change
@@ -1,60 +1,60 @@
package keeper

import (
"context"
"math/rand"

"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

// ORM implements ORM layer using PostgreSQL
type ORM struct {
q pg.Q
ds sqlutil.DataSource
logger logger.Logger
}

// NewORM is the constructor of postgresORM
func NewORM(db *sqlx.DB, lggr logger.Logger, config pg.QConfig) ORM {
func NewORM(ds sqlutil.DataSource, lggr logger.Logger) *ORM {
lggr = lggr.Named("KeeperORM")
return ORM{
q: pg.NewQ(db, lggr, config),
return &ORM{
ds: ds,
logger: lggr,
}
}

func (korm ORM) Q() pg.Q {
return korm.q
func (o *ORM) DataSource() sqlutil.DataSource {
return o.ds
}

// Registries returns all registries
func (korm ORM) Registries() ([]Registry, error) {
func (o *ORM) Registries(ctx context.Context) ([]Registry, error) {
var registries []Registry
err := korm.q.Select(&registries, `SELECT * FROM keeper_registries ORDER BY id ASC`)
err := o.ds.SelectContext(ctx, &registries, `SELECT * FROM keeper_registries ORDER BY id ASC`)
return registries, errors.Wrap(err, "failed to get registries")
}

// RegistryByContractAddress returns a single registry based on provided address
func (korm ORM) RegistryByContractAddress(registryAddress types.EIP55Address) (Registry, error) {
func (o *ORM) RegistryByContractAddress(ctx context.Context, registryAddress types.EIP55Address) (Registry, error) {
var registry Registry
err := korm.q.Get(&registry, `SELECT * FROM keeper_registries WHERE keeper_registries.contract_address = $1`, registryAddress)
err := o.ds.GetContext(ctx, &registry, `SELECT * FROM keeper_registries WHERE keeper_registries.contract_address = $1`, registryAddress)
return registry, errors.Wrap(err, "failed to get registry")
}

// RegistryForJob returns a specific registry for a job with the given ID
func (korm ORM) RegistryForJob(jobID int32) (Registry, error) {
func (o *ORM) RegistryForJob(ctx context.Context, jobID int32) (Registry, error) {
var registry Registry
err := korm.q.Get(&registry, `SELECT * FROM keeper_registries WHERE job_id = $1 LIMIT 1`, jobID)
err := o.ds.GetContext(ctx, &registry, `SELECT * FROM keeper_registries WHERE job_id = $1 LIMIT 1`, jobID)
return registry, errors.Wrapf(err, "failed to get registry with job_id %d", jobID)
}

// UpsertRegistry upserts registry by the given input
func (korm ORM) UpsertRegistry(registry *Registry) error {
func (o *ORM) UpsertRegistry(ctx context.Context, registry *Registry) error {
stmt := `
INSERT INTO keeper_registries (job_id, keeper_index, contract_address, from_address, check_gas, block_count_per_turn, num_keepers, keeper_index_map) VALUES (
:job_id, :keeper_index, :contract_address, :from_address, :check_gas, :block_count_per_turn, :num_keepers, :keeper_index_map
Expand All @@ -66,12 +66,16 @@ INSERT INTO keeper_registries (job_id, keeper_index, contract_address, from_addr
keeper_index_map = :keeper_index_map
RETURNING *
`
err := korm.q.GetNamed(stmt, registry, registry)
query, args, err := o.ds.BindNamed(stmt, registry)
if err != nil {
return errors.Wrap(err, "failed to upsert registry")
}
err = o.ds.GetContext(ctx, registry, query, args...)
return errors.Wrap(err, "failed to upsert registry")
}

// UpsertUpkeep upserts upkeep by the given input
func (korm ORM) UpsertUpkeep(registration *UpkeepRegistration) error {
func (o *ORM) UpsertUpkeep(ctx context.Context, registration *UpkeepRegistration) error {
stmt := `
INSERT INTO upkeep_registrations (registry_id, execute_gas, check_data, upkeep_id, positioning_constant, last_run_block_height) VALUES (
:registry_id, :execute_gas, :check_data, :upkeep_id, :positioning_constant, :last_run_block_height
Expand All @@ -81,13 +85,17 @@ INSERT INTO upkeep_registrations (registry_id, execute_gas, check_data, upkeep_i
positioning_constant = :positioning_constant
RETURNING *
`
err := korm.q.GetNamed(stmt, registration, registration)
query, args, err := o.ds.BindNamed(stmt, registration)
if err != nil {
return errors.Wrap(err, "failed to upsert upkeep")
}
err = o.ds.GetContext(ctx, registration, query, args...)
return errors.Wrap(err, "failed to upsert upkeep")
}

// UpdateUpkeepLastKeeperIndex updates the last keeper index for an upkeep
func (korm ORM) UpdateUpkeepLastKeeperIndex(jobID int32, upkeepID *big.Big, fromAddress types.EIP55Address) error {
_, err := korm.q.Exec(`
func (o *ORM) UpdateUpkeepLastKeeperIndex(ctx context.Context, jobID int32, upkeepID *big.Big, fromAddress types.EIP55Address) error {
_, err := o.ds.ExecContext(ctx, `
UPDATE upkeep_registrations
SET
last_keeper_index = CAST((SELECT keeper_index_map -> $3 FROM keeper_registries WHERE job_id = $1) AS int)
Expand All @@ -98,12 +106,12 @@ func (korm ORM) UpdateUpkeepLastKeeperIndex(jobID int32, upkeepID *big.Big, from
}

// BatchDeleteUpkeepsForJob deletes all upkeeps by the given IDs for the job with the given ID
func (korm ORM) BatchDeleteUpkeepsForJob(jobID int32, upkeepIDs []big.Big) (int64, error) {
func (o *ORM) BatchDeleteUpkeepsForJob(ctx context.Context, jobID int32, upkeepIDs []big.Big) (int64, error) {
strIds := []string{}
for _, upkeepID := range upkeepIDs {
strIds = append(strIds, upkeepID.String())
}
res, err := korm.q.Exec(`
res, err := o.ds.ExecContext(ctx, `
DELETE FROM upkeep_registrations WHERE registry_id IN (
SELECT id FROM keeper_registries WHERE job_id = $1
) AND upkeep_id = ANY($2)
Expand All @@ -125,7 +133,7 @@ DELETE FROM upkeep_registrations WHERE registry_id IN (
// -- OR is it my buddy's turn AND they were the last keeper to do the perform for this upkeep
// DEV: note we cast upkeep_id and binaryHash as 32 bits, even though both are 256 bit numbers when performing XOR. This is enough information
// to distribute the upkeeps over the keepers so long as num keepers < 4294967296
func (korm ORM) EligibleUpkeepsForRegistry(registryAddress types.EIP55Address, blockNumber int64, gracePeriod int64, binaryHash string) (upkeeps []UpkeepRegistration, err error) {
func (o *ORM) EligibleUpkeepsForRegistry(ctx context.Context, registryAddress types.EIP55Address, blockNumber int64, gracePeriod int64, binaryHash string) (upkeeps []UpkeepRegistration, err error) {
stmt := `
SELECT upkeep_registrations.*
FROM upkeep_registrations
Expand Down Expand Up @@ -165,10 +173,10 @@ WHERE keeper_registries.contract_address = $1
)
)
`
if err = korm.q.Select(&upkeeps, stmt, registryAddress, gracePeriod, blockNumber, binaryHash); err != nil {
if err = o.ds.SelectContext(ctx, &upkeeps, stmt, registryAddress, gracePeriod, blockNumber, binaryHash); err != nil {
return upkeeps, errors.Wrap(err, "EligibleUpkeepsForRegistry failed to get upkeep_registrations")
}
if err = loadUpkeepsRegistry(korm.q, upkeeps); err != nil {
if err = o.loadUpkeepsRegistry(ctx, upkeeps); err != nil {
return upkeeps, errors.Wrap(err, "EligibleUpkeepsForRegistry failed to load Registry on upkeeps")
}

Expand All @@ -179,7 +187,7 @@ WHERE keeper_registries.contract_address = $1
return upkeeps, err
}

func loadUpkeepsRegistry(q pg.Queryer, upkeeps []UpkeepRegistration) error {
func (o *ORM) loadUpkeepsRegistry(ctx context.Context, upkeeps []UpkeepRegistration) error {
registryIDM := make(map[int64]*Registry)
var registryIDs []int64
for _, upkeep := range upkeeps {
Expand All @@ -189,7 +197,7 @@ func loadUpkeepsRegistry(q pg.Queryer, upkeeps []UpkeepRegistration) error {
}
}
var registries []*Registry
err := q.Select(&registries, `SELECT * FROM keeper_registries WHERE id = ANY($1)`, pq.Array(registryIDs))
err := o.ds.SelectContext(ctx, &registries, `SELECT * FROM keeper_registries WHERE id = ANY($1)`, pq.Array(registryIDs))
if err != nil {
return errors.Wrap(err, "loadUpkeepsRegistry failed")
}
Expand All @@ -202,8 +210,8 @@ func loadUpkeepsRegistry(q pg.Queryer, upkeeps []UpkeepRegistration) error {
return nil
}

func (korm ORM) AllUpkeepIDsForRegistry(regID int64) (upkeeps []big.Big, err error) {
err = korm.q.Select(&upkeeps, `
func (o *ORM) AllUpkeepIDsForRegistry(ctx context.Context, regID int64) (upkeeps []big.Big, err error) {
err = o.ds.SelectContext(ctx, &upkeeps, `
SELECT upkeep_id
FROM upkeep_registrations
WHERE registry_id = $1
Expand All @@ -212,8 +220,8 @@ WHERE registry_id = $1
}

// SetLastRunInfoForUpkeepOnJob sets the last run block height and the associated keeper index only if the new block height is greater than the previous.
func (korm ORM) SetLastRunInfoForUpkeepOnJob(jobID int32, upkeepID *big.Big, height int64, fromAddress types.EIP55Address, qopts ...pg.QOpt) (int64, error) {
res, err := korm.q.WithOpts(qopts...).Exec(`
func (o *ORM) SetLastRunInfoForUpkeepOnJob(ctx context.Context, jobID int32, upkeepID *big.Big, height int64, fromAddress types.EIP55Address) (int64, error) {
res, err := o.ds.ExecContext(ctx, `
UPDATE upkeep_registrations
SET last_run_block_height = $1,
last_keeper_index = CAST((SELECT keeper_index_map -> $4 FROM keeper_registries WHERE job_id = $3) AS int)
Expand Down
Loading
Loading