From 17e7892036a696b9a349eb79b42cccbc4a6a48f1 Mon Sep 17 00:00:00 2001 From: chenminjian <727180553@qq.com> Date: Fri, 2 Nov 2018 16:27:05 +0800 Subject: [PATCH] enhance(cmd/verify): add goroutine count to improve verify speed License: MIT Signed-off-by: chenminjian <727180553@qq.com> --- core/commands/repo.go | 56 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 50 insertions(+), 6 deletions(-) diff --git a/core/commands/repo.go b/core/commands/repo.go index ecca547f256..74a6204d120 100644 --- a/core/commands/repo.go +++ b/core/commands/repo.go @@ -2,12 +2,15 @@ package commands import ( "bytes" + "context" "errors" "fmt" "io" "os" "path/filepath" + "runtime" "strings" + "sync" "text/tabwriter" oldcmds "github.com/ipfs/go-ipfs/commands" @@ -276,6 +279,48 @@ type VerifyProgress struct { Progress int } +func verifyWorkerRun(ctx context.Context, wg *sync.WaitGroup, keys <-chan cid.Cid, results chan<- string, bs bstore.Blockstore) { + defer wg.Done() + + for k := range keys { + _, err := bs.Get(k) + if err != nil { + select { + case results <- fmt.Sprintf("block %s was corrupt (%s)", k, err): + case <-ctx.Done(): + return + } + + continue + } + + select { + case results <- "": + case <-ctx.Done(): + return + } + } +} + +func verifyResultChan(ctx context.Context, keys <-chan cid.Cid, bs bstore.Blockstore) <-chan string { + results := make(chan string) + + go func() { + defer close(results) + + var wg sync.WaitGroup + + for i := 0; i < runtime.NumCPU()*2; i++ { + wg.Add(1) + go verifyWorkerRun(ctx, &wg, keys, results, bs) + } + + wg.Wait() + }() + + return results +} + var repoVerifyCmd = &oldcmds.Command{ Helptext: cmdkit.HelpText{ Tagline: "Verify all blocks in repo are not corrupted.", @@ -300,15 +345,14 @@ var repoVerifyCmd = &oldcmds.Command{ return } + results := verifyResultChan(req.Context(), keys, bs) + var fails int var i int - for k := range keys { - _, err := bs.Get(k) - if err != nil { + for msg := range results { + if msg != "" { select { - case out <- &VerifyProgress{ - Msg: fmt.Sprintf("block %s was corrupt (%s)", k, err), - }: + case out <- &VerifyProgress{Msg: msg}: case <-req.Context().Done(): return }