Skip to content

Commit

Permalink
continued indexer implementation, added finalization processing
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Jul 31, 2024
1 parent 38778c7 commit a2ef656
Show file tree
Hide file tree
Showing 30 changed files with 1,641 additions and 151 deletions.
8 changes: 8 additions & 0 deletions clients/consensus/chainstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,11 @@ func (cs *ChainState) SlotToSlotIndex(slot phase0.Slot) phase0.Slot {

return slot % phase0.Slot(cs.specs.SlotsPerEpoch)
}

func (cs *ChainState) EpochStartSlot(epoch phase0.Epoch) phase0.Slot {
if cs.specs == nil {
return 0
}

return phase0.Slot(epoch) * phase0.Slot(cs.specs.SlotsPerEpoch)
}
7 changes: 7 additions & 0 deletions clients/consensus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ func (client *Client) GetLastClientError() error {
return client.lastError
}

func (client *Client) GetFinalityCheckpoint() (finalitedEpoch phase0.Epoch, finalizedRoot phase0.Root, justifiedEpoch phase0.Epoch, justifiedRoot phase0.Root) {
client.headMutex.RLock()
defer client.headMutex.RUnlock()

return client.finalizedEpoch, client.finalizedRoot, client.justifiedEpoch, client.justifiedRoot
}

func (client *Client) GetStatus() ClientStatus {
switch {
case client.isSyncing:
Expand Down
5 changes: 5 additions & 0 deletions clients/consensus/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/ethpandaops/ethwallclock"
"github.com/sirupsen/logrus"
"golang.org/x/exp/rand"
)

type Pool struct {
Expand Down Expand Up @@ -75,6 +76,10 @@ func (pool *Pool) GetReadyEndpoint(clientType ClientType) *Client {
readyClients = append(readyClients, client)
}

rand.Shuffle(len(readyClients), func(i, j int) {
readyClients[i], readyClients[j] = readyClients[j], readyClients[i]
})

return readyClients[0]
}

Expand Down
12 changes: 7 additions & 5 deletions db/deposits.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ func InsertDeposits(deposits []*dbtypes.Deposit, tx *sqlx.Tx) error {
dbtypes.DBEnginePgsql: "INSERT INTO deposits ",
dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO deposits ",
}),
"(deposit_index, slot_number, slot_index, slot_root, orphaned, publickey, withdrawalcredentials, amount)",
"(deposit_index, slot_number, slot_index, slot_root, orphaned, publickey, withdrawalcredentials, amount, fork_id)",
" VALUES ",
)
argIdx := 0
fieldCount := 8
fieldCount := 9

args := make([]any, len(deposits)*fieldCount)
for i, deposit := range deposits {
Expand All @@ -100,10 +100,11 @@ func InsertDeposits(deposits []*dbtypes.Deposit, tx *sqlx.Tx) error {
args[argIdx+5] = deposit.PublicKey[:]
args[argIdx+6] = deposit.WithdrawalCredentials[:]
args[argIdx+7] = deposit.Amount
args[argIdx+8] = deposit.ForkId
argIdx += fieldCount
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: " ON CONFLICT (slot_index, slot_root) DO UPDATE SET deposit_index = excluded.deposit_index, orphaned = excluded.orphaned",
dbtypes.DBEnginePgsql: " ON CONFLICT (slot_index, slot_root) DO UPDATE SET deposit_index = excluded.deposit_index, orphaned = excluded.orphaned, fork_id = excluded.fork_id",
dbtypes.DBEngineSqlite: "",
}))
_, err := tx.Exec(sql.String(), args...)
Expand Down Expand Up @@ -239,7 +240,7 @@ func GetDepositsFiltered(offset uint64, limit uint32, finalizedBlock uint64, fil
fmt.Fprint(&sql, `
WITH cte AS (
SELECT
deposit_index, slot_number, slot_index, slot_root, orphaned, publickey, withdrawalcredentials, amount
deposit_index, slot_number, slot_index, slot_root, orphaned, publickey, withdrawalcredentials, amount, fork_id
FROM deposits
`)

Expand Down Expand Up @@ -289,7 +290,8 @@ func GetDepositsFiltered(offset uint64, limit uint32, finalizedBlock uint64, fil
false AS orphaned,
null AS publickey,
null AS withdrawalcredentials,
0 AS amount
0 AS amount,
0 AS fork_id
FROM cte
UNION ALL SELECT * FROM (
SELECT * FROM cte
Expand Down
8 changes: 0 additions & 8 deletions db/schema/pgsql/20230820050910_indexer-cache.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,6 @@ CREATE INDEX IF NOT EXISTS "unfinalized_blocks_slot_idx"
ON public."unfinalized_blocks"
("slot" ASC NULLS LAST);

CREATE TABLE IF NOT EXISTS public."unfinalized_duties"
(
"epoch" bigint NOT NULL,
"dependent_root" bytea NOT NULL,
"duties" bytea NOT NULL,
CONSTRAINT "unfinalized_duties_pkey" PRIMARY KEY ("epoch", "dependent_root")
);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
Expand Down
2 changes: 0 additions & 2 deletions db/schema/pgsql/20230923231035_unfinalized-epochs.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
-- +goose Up
-- +goose StatementBegin

DROP TABLE IF EXISTS public."unfinalized_duties";

CREATE TABLE IF NOT EXISTS public."unfinalized_epochs"
(
"epoch" bigint NOT NULL,
Expand Down
48 changes: 48 additions & 0 deletions db/schema/pgsql/20240730183116_unfinalized-blocks.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,54 @@ CREATE INDEX IF NOT EXISTS "forks_base_root_idx"
ON public."forks"
("base_root" ASC NULLS LAST);

CREATE TABLE IF NOT EXISTS public."unfinalized_duties"
(
"epoch" bigint NOT NULL,
"dependent_root" bytea NOT NULL,
"duties" bytea NOT NULL,
CONSTRAINT "unfinalized_duties_pkey" PRIMARY KEY ("epoch", "dependent_root")
);

-- add fork_id to slots
ALTER TABLE public."slots"
ADD "fork_id" BIGINT NOT NULL DEFAULT 0;

UPDATE "slots" SET "fork_id" = 1 WHERE "status" = 2;

CREATE INDEX IF NOT EXISTS "slots_fork_id_idx"
ON public."slots"
("fork_id" ASC NULLS LAST);

-- add fork_id to deposits
ALTER TABLE public."deposits"
ADD "fork_id" BIGINT NOT NULL DEFAULT 0;

UPDATE "deposits" SET "fork_id" = 1 WHERE "orphaned" = TRUE;

CREATE INDEX IF NOT EXISTS "deposits_fork_id_idx"
ON public."deposits"
("fork_id" ASC NULLS LAST);

-- add fork_id to voluntary_exits
ALTER TABLE public."voluntary_exits"
ADD "fork_id" BIGINT NOT NULL DEFAULT 0;

UPDATE "voluntary_exits" SET "fork_id" = 1 WHERE "orphaned" = TRUE;

CREATE INDEX IF NOT EXISTS "voluntary_exits_fork_id_idx"
ON public."voluntary_exits"
("fork_id" ASC NULLS LAST);

-- add fork_id to slashings
ALTER TABLE public."slashings"
ADD "fork_id" BIGINT NOT NULL DEFAULT 0;

UPDATE "slashings" SET "fork_id" = 1 WHERE "orphaned" = TRUE;

CREATE INDEX IF NOT EXISTS "slashings_fork_id_idx"
ON public."slashings"
("fork_id" ASC NULLS LAST);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
Expand Down
8 changes: 0 additions & 8 deletions db/schema/sqlite/20230820050910_indexer-cache.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,6 @@ CREATE INDEX IF NOT EXISTS "unfinalized_blocks_slot_idx"
ON "unfinalized_blocks"
("slot" ASC);

CREATE TABLE IF NOT EXISTS "unfinalized_duties"
(
"epoch" bigint NOT NULL,
"dependent_root" BLOB NOT NULL,
"duties" BLOB NOT NULL,
CONSTRAINT "unfinalized_duties_pkey" PRIMARY KEY ("epoch", "dependent_root")
);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
Expand Down
2 changes: 0 additions & 2 deletions db/schema/sqlite/20230923231035_unfinalized-epochs.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
-- +goose Up
-- +goose StatementBegin

DROP TABLE IF EXISTS "unfinalized_duties";

CREATE TABLE IF NOT EXISTS "unfinalized_epochs"
(
"epoch" BIGINT NOT NULL,
Expand Down
47 changes: 47 additions & 0 deletions db/schema/sqlite/20240730183116_unfinalized-blocks.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,53 @@ CREATE INDEX IF NOT EXISTS "forks_base_root_idx"
ON "forks"
("base_root" ASC);

CREATE TABLE IF NOT EXISTS "unfinalized_duties"
(
"epoch" bigint NOT NULL,
"dependent_root" BLOB NOT NULL,
"duties" BLOB NOT NULL,
CONSTRAINT "unfinalized_duties_pkey" PRIMARY KEY ("epoch", "dependent_root")
);

-- add fork_id to slots
ALTER TABLE "slots"
ADD "fork_id" BIGINT NOT NULL DEFAULT 0;

UPDATE "slots" SET "fork_id" = 1 WHERE "status" = 2;

CREATE INDEX IF NOT EXISTS "slots_fork_id_idx"
ON "slots"
("fork_id" ASC);

-- add fork_id to deposits
ALTER TABLE "deposits"
ADD "fork_id" BIGINT NOT NULL DEFAULT 0;

UPDATE "deposits" SET "fork_id" = 1 WHERE "orphaned" = TRUE;

CREATE INDEX IF NOT EXISTS "deposits_fork_id_idx"
ON "deposits"
("fork_id" ASC);

-- add fork_id to voluntary_exits
ALTER TABLE "voluntary_exits"
ADD "fork_id" BIGINT NOT NULL DEFAULT 0;

UPDATE "voluntary_exits" SET "fork_id" = 1 WHERE "orphaned" = TRUE;

CREATE INDEX IF NOT EXISTS "voluntary_exits_fork_id_idx"
ON "voluntary_exits"
("fork_id" ASC);

-- add fork_id to slashings
ALTER TABLE "slashings"
ADD "fork_id" BIGINT NOT NULL DEFAULT 0;

UPDATE "slashings" SET "fork_id" = 1 WHERE "orphaned" = TRUE;

CREATE INDEX IF NOT EXISTS "slashings_fork_id_idx"
ON "slashings"
("fork_id" ASC);

-- +goose StatementEnd
-- +goose Down
Expand Down
14 changes: 8 additions & 6 deletions db/slashings.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ func InsertSlashings(slashings []*dbtypes.Slashing, tx *sqlx.Tx) error {
dbtypes.DBEnginePgsql: "INSERT INTO slashings ",
dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO slashings ",
}),
"(slot_number, slot_index, slot_root, orphaned, validator, slasher, reason)",
"(slot_number, slot_index, slot_root, orphaned, validator, slasher, reason, fork_id)",
" VALUES ",
)
argIdx := 0
fieldCount := 7
fieldCount := 8

args := make([]any, len(slashings)*fieldCount)
for i, slashing := range slashings {
Expand All @@ -43,10 +43,11 @@ func InsertSlashings(slashings []*dbtypes.Slashing, tx *sqlx.Tx) error {
args[argIdx+4] = slashing.ValidatorIndex
args[argIdx+5] = slashing.SlasherIndex
args[argIdx+6] = slashing.Reason
args[argIdx+7] = slashing.ForkId
argIdx += fieldCount
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: " ON CONFLICT (slot_root, slot_index, validator) DO UPDATE SET orphaned = excluded.orphaned",
dbtypes.DBEnginePgsql: " ON CONFLICT (slot_root, slot_index, validator) DO UPDATE SET orphaned = excluded.orphaned, fork_id = excluded.fork_id",
dbtypes.DBEngineSqlite: "",
}))

Expand All @@ -64,7 +65,7 @@ func GetSlashingForValidator(validator uint64) *dbtypes.Slashing {
}
fmt.Fprint(&sql, `
SELECT
slot_number, slot_index, slot_root, orphaned, validator, slasher, reason
slot_number, slot_index, slot_root, orphaned, validator, slasher, reason, fork_id
FROM slashings
WHERE validator = $1
`)
Expand All @@ -83,7 +84,7 @@ func GetSlashingsFiltered(offset uint64, limit uint32, finalizedBlock uint64, fi
fmt.Fprint(&sql, `
WITH cte AS (
SELECT
slot_number, slot_index, slot_root, orphaned, validator, slasher, reason
slot_number, slot_index, slot_root, orphaned, validator, slasher, reason, fork_id
FROM slashings
`)

Expand Down Expand Up @@ -163,7 +164,8 @@ func GetSlashingsFiltered(offset uint64, limit uint32, finalizedBlock uint64, fi
false AS orphaned,
0 AS validator,
0 AS slasher,
0 AS reason
0 AS reason,
0 AS fork_id
FROM cte
UNION ALL SELECT * FROM (
SELECT * FROM cte
Expand Down
24 changes: 12 additions & 12 deletions db/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ func InsertSlot(slot *dbtypes.Slot, tx *sqlx.Tx) error {
slot, proposer, status, root, parent_root, state_root, graffiti, graffiti_text,
attestation_count, deposit_count, exit_count, withdraw_count, withdraw_amount, attester_slashing_count,
proposer_slashing_count, bls_change_count, eth_transaction_count, eth_block_number, eth_block_hash,
eth_block_extra, eth_block_extra_text, sync_participation
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22)
eth_block_extra, eth_block_extra_text, sync_participation, fork_id
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)
ON CONFLICT (slot, root) DO UPDATE SET
status = excluded.status,
eth_block_extra = excluded.eth_block_extra,
Expand All @@ -28,13 +28,13 @@ func InsertSlot(slot *dbtypes.Slot, tx *sqlx.Tx) error {
slot, proposer, status, root, parent_root, state_root, graffiti, graffiti_text,
attestation_count, deposit_count, exit_count, withdraw_count, withdraw_amount, attester_slashing_count,
proposer_slashing_count, bls_change_count, eth_transaction_count, eth_block_number, eth_block_hash,
eth_block_extra, eth_block_extra_text, sync_participation
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22)`,
eth_block_extra, eth_block_extra_text, sync_participation, fork_id
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)`,
}),
slot.Slot, slot.Proposer, slot.Status, slot.Root, slot.ParentRoot, slot.StateRoot, slot.Graffiti, slot.GraffitiText,
slot.AttestationCount, slot.DepositCount, slot.ExitCount, slot.WithdrawCount, slot.WithdrawAmount, slot.AttesterSlashingCount,
slot.ProposerSlashingCount, slot.BLSChangeCount, slot.EthTransactionCount, slot.EthBlockNumber, slot.EthBlockHash,
slot.EthBlockExtra, slot.EthBlockExtraText, slot.SyncParticipation)
slot.EthBlockExtra, slot.EthBlockExtraText, slot.SyncParticipation, slot.ForkId)
if err != nil {
return err
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func GetSlots(firstSlot uint64, limit uint32, withMissing bool, withOrphaned boo
"state_root", "root", "slot", "proposer", "status", "parent_root", "graffiti", "graffiti_text",
"attestation_count", "deposit_count", "exit_count", "withdraw_count", "withdraw_amount", "attester_slashing_count",
"proposer_slashing_count", "bls_change_count", "eth_transaction_count", "eth_block_number", "eth_block_hash",
"eth_block_extra", "eth_block_extra_text", "sync_participation",
"eth_block_extra", "eth_block_extra_text", "sync_participation", "fork_id",
}
for _, blockField := range blockFields {
fmt.Fprintf(&sql, ", slots.%v AS \"block.%v\"", blockField, blockField)
Expand Down Expand Up @@ -121,7 +121,7 @@ func GetSlotsRange(firstSlot uint64, lastSlot uint64, withMissing bool, withOrph
"state_root", "root", "slot", "proposer", "status", "parent_root", "graffiti", "graffiti_text",
"attestation_count", "deposit_count", "exit_count", "withdraw_count", "withdraw_amount", "attester_slashing_count",
"proposer_slashing_count", "bls_change_count", "eth_transaction_count", "eth_block_number", "eth_block_hash",
"eth_block_extra", "eth_block_extra_text", "sync_participation",
"eth_block_extra", "eth_block_extra_text", "sync_participation", "fork_id",
}
for _, blockField := range blockFields {
fmt.Fprintf(&sql, ", slots.%v AS \"block.%v\"", blockField, blockField)
Expand Down Expand Up @@ -153,7 +153,7 @@ func GetSlotsByParentRoot(parentRoot []byte) []*dbtypes.Slot {
slot, proposer, status, root, parent_root, state_root, graffiti, graffiti_text,
attestation_count, deposit_count, exit_count, withdraw_count, withdraw_amount, attester_slashing_count,
proposer_slashing_count, bls_change_count, eth_transaction_count, eth_block_number, eth_block_hash,
eth_block_extra, eth_block_extra_text, sync_participation
eth_block_extra, eth_block_extra_text, sync_participation, fork_id
FROM slots
WHERE parent_root = $1
ORDER BY slot DESC
Expand All @@ -172,7 +172,7 @@ func GetSlotByRoot(root []byte) *dbtypes.Slot {
root, slot, parent_root, state_root, status, proposer, graffiti, graffiti_text,
attestation_count, deposit_count, exit_count, withdraw_count, withdraw_amount, attester_slashing_count,
proposer_slashing_count, bls_change_count, eth_transaction_count, eth_block_number, eth_block_hash,
eth_block_extra, eth_block_extra_text, sync_participation
eth_block_extra, eth_block_extra_text, sync_participation, fork_id
FROM slots
WHERE root = $1
`, root)
Expand All @@ -187,7 +187,7 @@ func GetBlockHeadByRoot(root []byte) *dbtypes.BlockHead {
blockHead := dbtypes.BlockHead{}
err := ReaderDb.Get(&blockHead, `
SELECT
root, slot, parent_root
root, slot, parent_root, fork_id
FROM slots
WHERE root = $1
`, root)
Expand All @@ -204,7 +204,7 @@ func GetSlotsByBlockHash(blockHash []byte) []*dbtypes.Slot {
slot, proposer, status, root, parent_root, state_root, graffiti, graffiti_text,
attestation_count, deposit_count, exit_count, withdraw_count, withdraw_amount, attester_slashing_count,
proposer_slashing_count, bls_change_count, eth_transaction_count, eth_block_number, eth_block_hash,
eth_block_extra, eth_block_extra_text, sync_participation
eth_block_extra, eth_block_extra_text, sync_participation, fork_id
FROM slots
WHERE eth_block_hash = $1
ORDER BY slot DESC
Expand Down Expand Up @@ -264,7 +264,7 @@ func GetFilteredSlots(filter *dbtypes.BlockFilter, firstSlot uint64, offset uint
"state_root", "root", "slot", "proposer", "status", "parent_root", "graffiti", "graffiti_text",
"attestation_count", "deposit_count", "exit_count", "withdraw_count", "withdraw_amount", "attester_slashing_count",
"proposer_slashing_count", "bls_change_count", "eth_transaction_count", "eth_block_number", "eth_block_hash",
"eth_block_extra", "eth_block_extra_text", "sync_participation",
"eth_block_extra", "eth_block_extra_text", "sync_participation", "fork_id",
}
for _, blockField := range blockFields {
fmt.Fprintf(&sql, ", slots.%v AS \"block.%v\"", blockField, blockField)
Expand Down
Loading

0 comments on commit a2ef656

Please sign in to comment.