Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(chainwatch): use height to determine unsynced blocks and fix deadlock in sector deal table #3275

Merged
merged 8 commits into from
Aug 28, 2020
48 changes: 0 additions & 48 deletions cmd/lotus-chainwatch/processor/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ func (p *Processor) HandleMarketChanges(ctx context.Context, marketTips ActorTip
log.Fatalw("Failed to persist market actors", "error", err)
}

// we persist the dealID <--> minerID,sectorID here since the dealID needs to be stored above first
if err := p.storePreCommitDealInfo(p.sectorDealEvents); err != nil {
close(p.sectorDealEvents)
return err
}

if err := p.updateMarket(ctx, marketChanges); err != nil {
log.Fatalw("Failed to update market actors", "error", err)
}
Expand Down Expand Up @@ -272,48 +266,6 @@ func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTip

}

func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error {
tx, err := p.db.Begin()
if err != nil {
return err
}

if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err)
}

stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`)
if err != nil {
return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err)
}

for sde := range dealEvents {
for _, did := range sde.DealIDs {
if _, err := stmt.Exec(
uint64(did),
sde.MinerID.String(),
sde.SectorID,
); err != nil {
return err
}
}
}

if err := stmt.Close(); err != nil {
return xerrors.Errorf("Failed to close miner sector deals statement: %w", err)
}

if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil {
return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err)
}

if err := tx.Commit(); err != nil {
return xerrors.Errorf("Failed to commit miner deal sector table: %w", err)
}
return nil

}

func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error {
start := time.Now()
defer func() {
Expand Down
13 changes: 0 additions & 13 deletions cmd/lotus-chainwatch/processor/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package processor
import (
"context"
"sync"
"time"

"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -120,10 +119,6 @@ func (p *Processor) persistMessagesAndReceipts(ctx context.Context, blocks map[c
}

func (p *Processor) storeReceipts(recs map[mrec]*types.MessageReceipt) error {
start := time.Now()
defer func() {
log.Debugw("Persisted Receipts", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin()
if err != nil {
return err
Expand Down Expand Up @@ -164,10 +159,6 @@ create temp table recs (like receipts excluding constraints) on commit drop;
}

func (p *Processor) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
start := time.Now()
defer func() {
log.Debugw("Persisted Message Inclusions", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin()
if err != nil {
return err
Expand Down Expand Up @@ -206,10 +197,6 @@ create temp table mi (like block_messages excluding constraints) on commit drop;
}

func (p *Processor) storeMessages(msgs map[cid.Cid]*types.Message) error {
start := time.Now()
defer func() {
log.Debugw("Persisted Messages", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin()
if err != nil {
return err
Expand Down
52 changes: 49 additions & 3 deletions cmd/lotus-chainwatch/processor/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
preCommitEvents := make(chan *MinerSectorsEvent, 8)
sectorEvents := make(chan *MinerSectorsEvent, 8)
partitionEvents := make(chan *MinerSectorsEvent, 8)
p.sectorDealEvents = make(chan *SectorDealEvent, 8)
dealEvents := make(chan *SectorDealEvent, 8)

grp.Go(func() error {
return p.storePreCommitDealInfo(dealEvents)
})

grp.Go(func() error {
return p.storeMinerSectorEvents(ctx, sectorEvents, preCommitEvents, partitionEvents)
Expand All @@ -280,9 +284,9 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
grp.Go(func() error {
defer func() {
close(preCommitEvents)
close(p.sectorDealEvents)
close(dealEvents)
}()
return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, p.sectorDealEvents)
return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, dealEvents)
})

grp.Go(func() error {
Expand Down Expand Up @@ -911,6 +915,48 @@ func (p *Processor) storeMinersActorInfoState(ctx context.Context, miners []mine
return tx.Commit()
}

func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error {
tx, err := p.db.Begin()
if err != nil {
return err
}

if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err)
}

stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`)
if err != nil {
return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err)
}

for sde := range dealEvents {
for _, did := range sde.DealIDs {
if _, err := stmt.Exec(
uint64(did),
sde.MinerID.String(),
sde.SectorID,
); err != nil {
return err
}
}
}

if err := stmt.Close(); err != nil {
return xerrors.Errorf("Failed to close miner sector deals statement: %w", err)
}

if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil {
return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err)
}

if err := tx.Commit(); err != nil {
return xerrors.Errorf("Failed to commit miner deal sector table: %w", err)
}
return nil

}

func (p *Processor) storeMinersPower(miners []minerActorInfo) error {
start := time.Now()
defer func() {
Expand Down
2 changes: 0 additions & 2 deletions cmd/lotus-chainwatch/processor/mpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ func (p *Processor) subMpool(ctx context.Context) {
msgs[v.Message.Message.Cid()] = &v.Message.Message
}

log.Debugf("Processing %d mpool updates", len(msgs))

err := p.storeMessages(msgs)
if err != nil {
log.Error(err)
Expand Down
11 changes: 1 addition & 10 deletions cmd/lotus-chainwatch/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ type Processor struct {

// number of blocks processed at a time
batch int

// process communication channels
sectorDealEvents chan *SectorDealEvent
}

type ActorTips map[types.TipSetKey][]actorInfo
Expand Down Expand Up @@ -152,7 +149,6 @@ func (p *Processor) Start(ctx context.Context) {
log.Errorf("Failed to handle market changes: %w", err)
return
}
log.Info("Processed Market Changes")
}()

grp.Add(1)
Expand All @@ -162,7 +158,6 @@ func (p *Processor) Start(ctx context.Context) {
log.Errorf("Failed to handle miner changes: %w", err)
return
}
log.Info("Processed Miner Changes")
}()

grp.Add(1)
Expand All @@ -172,7 +167,6 @@ func (p *Processor) Start(ctx context.Context) {
log.Errorf("Failed to handle reward changes: %w", err)
return
}
log.Info("Processed Reward Changes")
}()

grp.Add(1)
Expand All @@ -182,7 +176,6 @@ func (p *Processor) Start(ctx context.Context) {
log.Errorf("Failed to handle power actor changes: %w", err)
return
}
log.Info("Processes Power Changes")
}()

grp.Add(1)
Expand All @@ -192,7 +185,6 @@ func (p *Processor) Start(ctx context.Context) {
log.Errorf("Failed to handle message changes: %w", err)
return
}
log.Info("Processed Message Changes")
}()

grp.Add(1)
Expand All @@ -202,7 +194,6 @@ func (p *Processor) Start(ctx context.Context) {
log.Errorf("Failed to handle common actor changes: %w", err)
return
}
log.Info("Processed CommonActor Changes")
}()

grp.Wait()
Expand All @@ -214,7 +205,7 @@ func (p *Processor) Start(ctx context.Context) {
if err := p.refreshViews(); err != nil {
log.Errorw("Failed to refresh views", "error", err)
}
log.Infow("Processed Batch", "duration", time.Since(loopStart).String())
log.Infow("Processed Batch Complete", "duration", time.Since(loopStart).String())
}
}
}()
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-chainwatch/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var runCmd = &cli.Command{
}
db.SetMaxOpenConns(1350)

sync := syncer.NewSyncer(db, api)
sync := syncer.NewSyncer(db, api, 1400)
sync.Start(ctx)

proc := processor.NewProcessor(ctx, db, api, maxBatch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scheduler
import (
"context"
"database/sql"
"time"

"golang.org/x/xerrors"
)
Expand Down Expand Up @@ -65,11 +64,6 @@ func refreshTopMinerByBaseReward(ctx context.Context, db *sql.DB) error {
default:
}

t := time.Now()
defer func() {
log.Debugw("refresh top_miners_by_base_reward", "duration", time.Since(t).String())
}()

_, err := db.Exec("refresh materialized view top_miners_by_base_reward;")
if err != nil {
return xerrors.Errorf("refresh top_miners_by_base_reward: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-chainwatch/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (s *Scheduler) Start(ctx context.Context) {

go func() {
// run once on start after schema has initialized
time.Sleep(5 * time.Second)
time.Sleep(1 * time.Minute)
if err := refreshTopMinerByBaseReward(ctx, s.db); err != nil {
log.Errorw("failed to refresh top miner", "error", err)
}
Expand Down
Loading