Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(chainwatch): use height to determine unsynced blocks and fix deadlock in sector deal table #3275

Merged
merged 8 commits into from
Aug 28, 2020
48 changes: 0 additions & 48 deletions cmd/lotus-chainwatch/processor/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ func (p *Processor) HandleMarketChanges(ctx context.Context, marketTips ActorTip
log.Fatalw("Failed to persist market actors", "error", err)
}

// we persist the dealID <--> minerID,sectorID here since the dealID needs to be stored above first
if err := p.storePreCommitDealInfo(p.sectorDealEvents); err != nil {
close(p.sectorDealEvents)
return err
}

if err := p.updateMarket(ctx, marketChanges); err != nil {
log.Fatalw("Failed to update market actors", "error", err)
}
Expand Down Expand Up @@ -272,48 +266,6 @@ func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTip

}

func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error {
tx, err := p.db.Begin()
if err != nil {
return err
}

if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err)
}

stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`)
if err != nil {
return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err)
}

for sde := range dealEvents {
for _, did := range sde.DealIDs {
if _, err := stmt.Exec(
uint64(did),
sde.MinerID.String(),
sde.SectorID,
); err != nil {
return err
}
}
}

if err := stmt.Close(); err != nil {
return xerrors.Errorf("Failed to close miner sector deals statement: %w", err)
}

if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil {
return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err)
}

if err := tx.Commit(); err != nil {
return xerrors.Errorf("Failed to commit miner deal sector table: %w", err)
}
return nil

}

func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error {
start := time.Now()
defer func() {
Expand Down
13 changes: 0 additions & 13 deletions cmd/lotus-chainwatch/processor/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package processor
import (
"context"
"sync"
"time"

"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -120,10 +119,6 @@ func (p *Processor) persistMessagesAndReceipts(ctx context.Context, blocks map[c
}

func (p *Processor) storeReceipts(recs map[mrec]*types.MessageReceipt) error {
start := time.Now()
defer func() {
log.Debugw("Persisted Receipts", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin()
if err != nil {
return err
Expand Down Expand Up @@ -164,10 +159,6 @@ create temp table recs (like receipts excluding constraints) on commit drop;
}

func (p *Processor) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
start := time.Now()
defer func() {
log.Debugw("Persisted Message Inclusions", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin()
if err != nil {
return err
Expand Down Expand Up @@ -206,10 +197,6 @@ create temp table mi (like block_messages excluding constraints) on commit drop;
}

func (p *Processor) storeMessages(msgs map[cid.Cid]*types.Message) error {
start := time.Now()
defer func() {
log.Debugw("Persisted Messages", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin()
if err != nil {
return err
Expand Down
52 changes: 49 additions & 3 deletions cmd/lotus-chainwatch/processor/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
preCommitEvents := make(chan *MinerSectorsEvent, 8)
sectorEvents := make(chan *MinerSectorsEvent, 8)
partitionEvents := make(chan *MinerSectorsEvent, 8)
p.sectorDealEvents = make(chan *SectorDealEvent, 8)
dealEvents := make(chan *SectorDealEvent, 8)

grp.Go(func() error {
return p.storePreCommitDealInfo(dealEvents)
})

grp.Go(func() error {
return p.storeMinerSectorEvents(ctx, sectorEvents, preCommitEvents, partitionEvents)
Expand All @@ -280,9 +284,9 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
grp.Go(func() error {
defer func() {
close(preCommitEvents)
close(p.sectorDealEvents)
close(dealEvents)
}()
return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, p.sectorDealEvents)
return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, dealEvents)
})

grp.Go(func() error {
Expand Down Expand Up @@ -911,6 +915,48 @@ func (p *Processor) storeMinersActorInfoState(ctx context.Context, miners []mine
return tx.Commit()
}

func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error {
tx, err := p.db.Begin()
if err != nil {
return err
}

if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err)
}

stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`)
if err != nil {
return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err)
}

for sde := range dealEvents {
for _, did := range sde.DealIDs {
if _, err := stmt.Exec(
uint64(did),
sde.MinerID.String(),
sde.SectorID,
); err != nil {
return err
}
}
}

if err := stmt.Close(); err != nil {
return xerrors.Errorf("Failed to close miner sector deals statement: %w", err)
}

if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil {
return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err)
}

if err := tx.Commit(); err != nil {
return xerrors.Errorf("Failed to commit miner deal sector table: %w", err)
}
return nil

}

func (p *Processor) storeMinersPower(miners []minerActorInfo) error {
start := time.Now()
defer func() {
Expand Down
2 changes: 0 additions & 2 deletions cmd/lotus-chainwatch/processor/mpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ func (p *Processor) subMpool(ctx context.Context) {
msgs[v.Message.Message.Cid()] = &v.Message.Message
}

log.Debugf("Processing %d mpool updates", len(msgs))

err := p.storeMessages(msgs)
if err != nil {
log.Error(err)
Expand Down
104 changes: 78 additions & 26 deletions cmd/lotus-chainwatch/processor/power.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"golang.org/x/xerrors"

"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/util/smoothing"
Expand All @@ -15,7 +16,19 @@ import (
type powerActorInfo struct {
common actorInfo

epochSmoothingEstimate *smoothing.FilterEstimate
totalRawBytes big.Int
totalRawBytesCommitted big.Int
totalQualityAdjustedBytes big.Int
totalQualityAdjustedBytesCommitted big.Int
totalPledgeCollateral big.Int

newRawBytes big.Int
newQualityAdjustedBytes big.Int
newPledgeCollateral big.Int
newQAPowerSmoothed *smoothing.FilterEstimate

minerCount int64
minerCountAboveMinimumPower int64
}

func (p *Processor) setupPower() error {
Expand All @@ -25,13 +38,27 @@ func (p *Processor) setupPower() error {
}

if _, err := tx.Exec(`
create table if not exists power_smoothing_estimates
create table if not exists chain_power
(
state_root text not null
constraint power_smoothing_estimates_pk
primary key,
position_estimate text not null,
velocity_estimate text not null
state_root text not null
constraint power_smoothing_estimates_pk
primary key,

new_raw_bytes_power text not null,
new_qa_bytes_power text not null,
new_pledge_collateral text not null,

total_raw_bytes_power text not null,
total_raw_bytes_committed text not null,
total_qa_bytes_power text not null,
total_qa_bytes_committed text not null,
total_pledge_collateral text not null,

qa_smoothed_position_estimate text not null,
qa_smoothed_velocity_estimate text not null,

miner_count int not null,
minimum_consensus_miner_count int not null
);
`); err != nil {
return err
Expand Down Expand Up @@ -60,8 +87,8 @@ func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips)
}()

var out []powerActorInfo
for tipset, powers := range powerTips {
for _, act := range powers {
for tipset, powerStates := range powerTips {
for _, act := range powerStates {
var pw powerActorInfo
pw.common = act

Expand All @@ -80,54 +107,79 @@ func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips)
return nil, xerrors.Errorf("unmarshal state (@ %s): %w", pw.common.stateroot.String(), err)
}

pw.epochSmoothingEstimate = powerActorState.ThisEpochQAPowerSmoothed
pw.totalRawBytes = powerActorState.TotalRawBytePower
pw.totalRawBytesCommitted = powerActorState.TotalBytesCommitted
pw.totalQualityAdjustedBytes = powerActorState.TotalQualityAdjPower
pw.totalQualityAdjustedBytesCommitted = powerActorState.TotalQABytesCommitted
pw.totalPledgeCollateral = powerActorState.TotalPledgeCollateral

pw.newRawBytes = powerActorState.ThisEpochRawBytePower
pw.newQualityAdjustedBytes = powerActorState.ThisEpochQualityAdjPower
pw.newPledgeCollateral = powerActorState.ThisEpochPledgeCollateral
pw.newQAPowerSmoothed = powerActorState.ThisEpochQAPowerSmoothed

pw.minerCount = powerActorState.MinerCount
pw.minerCountAboveMinimumPower = powerActorState.MinerAboveMinPowerCount
out = append(out, pw)
}
}

return out, nil
}

func (p *Processor) persistPowerActors(ctx context.Context, powers []powerActorInfo) error {
func (p *Processor) persistPowerActors(ctx context.Context, powerStates []powerActorInfo) error {
// NB: use errgroup when there is more than a single store operation
return p.storePowerSmoothingEstimates(powers)
return p.storePowerSmoothingEstimates(powerStates)
}

func (p *Processor) storePowerSmoothingEstimates(powers []powerActorInfo) error {
func (p *Processor) storePowerSmoothingEstimates(powerStates []powerActorInfo) error {
tx, err := p.db.Begin()
if err != nil {
return xerrors.Errorf("begin power_smoothing_estimates tx: %w", err)
return xerrors.Errorf("begin chain_power tx: %w", err)
}

if _, err := tx.Exec(`create temp table rse (like power_smoothing_estimates) on commit drop`); err != nil {
return xerrors.Errorf("prep power_smoothing_estimates: %w", err)
if _, err := tx.Exec(`create temp table cp (like chain_power) on commit drop`); err != nil {
return xerrors.Errorf("prep chain_power: %w", err)
}

stmt, err := tx.Prepare(`copy rse (state_root, position_estimate, velocity_estimate) from stdin;`)
stmt, err := tx.Prepare(`copy cp (state_root, new_raw_bytes_power, new_qa_bytes_power, new_pledge_collateral, total_raw_bytes_power, total_raw_bytes_committed, total_qa_bytes_power, total_qa_bytes_committed, total_pledge_collateral, qa_smoothed_position_estimate, qa_smoothed_velocity_estimate, miner_count, minimum_consensus_miner_count) from stdin;`)
if err != nil {
return xerrors.Errorf("prepare tmp power_smoothing_estimates: %w", err)
return xerrors.Errorf("prepare tmp chain_power: %w", err)
}

for _, powerState := range powers {
for _, ps := range powerStates {
if _, err := stmt.Exec(
powerState.common.stateroot.String(),
powerState.epochSmoothingEstimate.PositionEstimate.String(),
powerState.epochSmoothingEstimate.VelocityEstimate.String(),
ps.common.stateroot.String(),
ps.newRawBytes.String(),
ps.newQualityAdjustedBytes.String(),
ps.newPledgeCollateral.String(),

ps.totalRawBytes.String(),
ps.totalRawBytesCommitted.String(),
ps.totalQualityAdjustedBytes.String(),
ps.totalQualityAdjustedBytesCommitted.String(),
ps.totalPledgeCollateral.String(),

ps.newQAPowerSmoothed.PositionEstimate.String(),
ps.newQAPowerSmoothed.VelocityEstimate.String(),

ps.minerCount,
ps.minerCountAboveMinimumPower,
); err != nil {
return xerrors.Errorf("failed to store smoothing estimate: %w", err)
}
}

if err := stmt.Close(); err != nil {
return xerrors.Errorf("close prepared power_smoothing_estimates: %w", err)
return xerrors.Errorf("close prepared chain_power: %w", err)
}

if _, err := tx.Exec(`insert into power_smoothing_estimates select * from rse on conflict do nothing`); err != nil {
return xerrors.Errorf("insert power_smoothing_estimates from tmp: %w", err)
if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil {
return xerrors.Errorf("insert chain_power from tmp: %w", err)
}

if err := tx.Commit(); err != nil {
return xerrors.Errorf("commit power_smoothing_estimates tx: %w", err)
return xerrors.Errorf("commit chain_power tx: %w", err)
}

return nil
Expand Down
Loading