shutdown deadlock
AskAlexSharov authored and blxdyx committed Sep 13, 2023
1 parent a7f5fdb commit a3b0611
Showing 2 changed files with 165 additions and 73 deletions.
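
The deadlock fix applies one pattern throughout the diff below: every background goroutine the Downloader spawns registers with its sync.WaitGroup and selects on ctx.Done() instead of blocking forever, so shutdown can cancel the context and then wait without hanging. A minimal sketch of that pattern, assuming a simplified Downloader that carries only a WaitGroup (not the real erigon-lib type):

package main

import (
	"context"
	"sync"
	"time"
)

// Downloader is a simplified stand-in for the real type: it owns a
// WaitGroup that every background goroutine must register with.
type Downloader struct {
	wg sync.WaitGroup
}

// spawn runs work in the background; the goroutine exits as soon as
// ctx is cancelled, so Close never waits on work that cannot finish.
func (d *Downloader) spawn(ctx context.Context, work <-chan struct{}) {
	d.wg.Add(1)
	go func() {
		defer d.wg.Done()
		select {
		case <-ctx.Done(): // shutdown requested: return instead of blocking
			return
		case <-work: // normal completion path
		}
	}()
}

// Close waits for every registered goroutine to exit.
func (d *Downloader) Close() {
	d.wg.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	d := &Downloader{}
	d.spawn(ctx, make(chan struct{})) // work that would never complete on its own
	time.Sleep(10 * time.Millisecond)
	cancel()  // without the ctx.Done case, the Wait below would deadlock
	d.Close() // returns promptly because the goroutine observed cancellation
}
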
201 changes: 162 additions & 39 deletions downloader/downloader.go
@@ -18,6 +18,7 @@ package downloader

import (
"context"
"errors"
"fmt"
"io/fs"
"os"
@@ -33,6 +34,7 @@ import (
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
prototypes "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/log/v3"
@@ -65,7 +67,8 @@ type AggStats struct {
Completed bool
Progress float32

BytesCompleted, BytesTotal uint64
BytesCompleted, BytesTotal uint64
DroppedCompleted, DroppedTotal atomic.Uint64

BytesDownload, BytesUpload uint64
UploadRate, DownloadRate uint64
@@ -113,7 +116,7 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg) (*Downloader, error) {

statsLock: &sync.RWMutex{},
}
if err := d.addSegments(); err != nil {
if err := d.addSegments(ctx); err != nil {
return nil, err
}
return d, nil
@@ -124,36 +127,110 @@ func (d *Downloader) MainLoopInBackground(ctx context.Context, silent bool) {
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.mainLoop(ctx, silent)
if err := d.mainLoop(ctx, silent); err != nil {
if !errors.Is(err, context.Canceled) {
log.Warn("[snapshots]", "err", err)
}
}
}()
}

func (d *Downloader) mainLoop(ctx context.Context, silent bool) {
func (d *Downloader) mainLoop(ctx context.Context, silent bool) error {
var sem = semaphore.NewWeighted(int64(d.cfg.DownloadSlots))

d.wg.Add(1)
go func() {
for {
torrents := d.Torrent().Torrents()
for _, t := range torrents {
<-t.GotInfo()
if t.Complete.Bool() {
continue
}
if err := sem.Acquire(ctx, 1); err != nil {
defer d.wg.Done()

// Torrents that are already taken care of
torrentMap := map[metainfo.Hash]struct{}{}
// First loop drops torrents that were downloaded or are already complete
// This improves efficiency of download by reducing number of active torrent (empirical observation)
DownloadLoop:
torrents := d.Torrent().Torrents()
for _, t := range torrents {
if _, already := torrentMap[t.InfoHash()]; already {
continue
}
select {
case <-ctx.Done():
return
case <-t.GotInfo():
}
if t.Complete.Bool() {
d.stats.DroppedCompleted.Add(uint64(t.BytesCompleted()))
d.stats.DroppedTotal.Add(uint64(t.Length()))
//t.Drop()
torrentMap[t.InfoHash()] = struct{}{}
continue
}
if err := sem.Acquire(ctx, 1); err != nil {
return
}
t.AllowDataDownload()
t.DownloadAll()
torrentMap[t.InfoHash()] = struct{}{}
d.wg.Add(1)
go func(t *torrent.Torrent) {
defer d.wg.Done()
defer sem.Release(1)
select {
case <-ctx.Done():
return
case <-t.Complete.On():
}
t.AllowDataDownload()
t.DownloadAll()
go func(t *torrent.Torrent) {
defer sem.Release(1)
//r := t.NewReader()
//r.SetReadahead(t.Length())
//_, _ = io.Copy(io.Discard, r) // enable streaming - it will prioritize sequential download

<-t.Complete.On()
}(t)
d.stats.DroppedCompleted.Add(uint64(t.BytesCompleted()))
d.stats.DroppedTotal.Add(uint64(t.Length()))
//t.Drop()
}(t)
}
if len(torrents) != len(d.Torrent().Torrents()) { //if amount of torrents changed - keep downloading
goto DownloadLoop
}

if err := d.addSegments(ctx); err != nil {
return
}
DownloadLoop2:
torrents = d.Torrent().Torrents()
for _, t := range torrents {
if _, already := torrentMap[t.InfoHash()]; already {
continue
}
select {
case <-ctx.Done():
return
case <-t.GotInfo():
}
time.Sleep(30 * time.Second)
if t.Complete.Bool() {
d.stats.DroppedCompleted.Add(uint64(t.BytesCompleted()))
d.stats.DroppedTotal.Add(uint64(t.Length()))
//t.Drop()
torrentMap[t.InfoHash()] = struct{}{}
continue
}
if err := sem.Acquire(ctx, 1); err != nil {
return
}
t.AllowDataDownload()
t.DownloadAll()
torrentMap[t.InfoHash()] = struct{}{}
d.wg.Add(1)
go func(t *torrent.Torrent) {
defer d.wg.Done()
defer sem.Release(1)
select {
case <-ctx.Done():
return
case <-t.Complete.On():
}
d.stats.DroppedCompleted.Add(uint64(t.BytesCompleted()))
d.stats.DroppedTotal.Add(uint64(t.Length()))
//t.Drop()
}(t)
}
if len(torrents) != len(d.Torrent().Torrents()) { //if amount of torrents changed - keep downloading
goto DownloadLoop2
}
}()
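
The rewritten loop above gates active downloads with a weighted semaphore sized to cfg.DownloadSlots: a slot is acquired (honoring ctx) before a torrent is allowed to download, and released only once that torrent completes or the context is cancelled. A minimal sketch of that gating pattern, assuming a hypothetical downloadItem type in place of *torrent.Torrent:

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

// downloadItem is a hypothetical stand-in for *torrent.Torrent:
// done is closed when the item has finished downloading.
type downloadItem struct {
	name string
	done chan struct{}
}

func downloadAll(ctx context.Context, items []*downloadItem, slots int64) error {
	sem := semaphore.NewWeighted(slots) // at most `slots` active downloads
	var wg sync.WaitGroup
	for _, it := range items {
		// Block here until a slot frees up (or the context is cancelled).
		if err := sem.Acquire(ctx, 1); err != nil {
			return err
		}
		wg.Add(1)
		go func(it *downloadItem) {
			defer wg.Done()
			defer sem.Release(1) // free the slot once this item is done
			fmt.Println("downloading", it.name)
			select {
			case <-ctx.Done(): // shutdown: stop waiting for completion
			case <-it.done: // normal completion
			}
		}(it)
	}
	wg.Wait()
	return nil
}

func main() {
	items := []*downloadItem{
		{name: "headers.seg", done: make(chan struct{})},
		{name: "bodies.seg", done: make(chan struct{})},
	}
	for _, it := range items {
		close(it.done) // pretend every download finishes immediately
	}
	if err := downloadAll(context.Background(), items, 2); err != nil {
		fmt.Println("downloadAll:", err)
	}
}
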

@@ -168,7 +245,7 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) {
for {
select {
case <-ctx.Done():
return
return ctx.Err()
case <-statEvery.C:
d.ReCalcStats(statInterval)

@@ -179,11 +256,6 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) {

stats := d.Stats()

if stats.MetadataReady < stats.FilesTotal {
log.Info(fmt.Sprintf("[snapshots] Waiting for torrents metadata: %d/%d", stats.MetadataReady, stats.FilesTotal))
continue
}

if stats.Completed {
if justCompleted {
justCompleted = false
@@ -237,7 +309,7 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
stats.BytesDownload = uint64(connStats.BytesReadUsefulIntendedData.Int64())
stats.BytesUpload = uint64(connStats.BytesWrittenData.Int64())

stats.BytesTotal, stats.BytesCompleted, stats.ConnectionsTotal, stats.MetadataReady = 0, 0, 0, 0
stats.BytesTotal, stats.BytesCompleted, stats.ConnectionsTotal, stats.MetadataReady = stats.DroppedTotal.Load(), stats.DroppedCompleted.Load(), 0, 0
for _, t := range torrents {
select {
case <-t.GotInfo():
@@ -351,7 +423,9 @@ func (d *Downloader) VerifyData(ctx context.Context) error {
logInterval := 20 * time.Second
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
d.wg.Add(1)
go func() {
defer d.wg.Done()
for {
select {
case <-ctx.Done():
@@ -380,7 +454,47 @@ func (d *Downloader) VerifyData(ctx context.Context) error {
return d.db.Update(context.Background(), func(tx kv.RwTx) error { return nil })
}

func (d *Downloader) addSegments() error {
func (d *Downloader) createMagnetLinkWithInfoHash(ctx context.Context, hash *prototypes.H160, snapDir string) (bool, error) {
mi := &metainfo.MetaInfo{AnnounceList: Trackers}
if hash == nil {
return false, nil
}
infoHash := Proto2InfoHash(hash)
//log.Debug("[downloader] downloading torrent and seg file", "hash", infoHash)

if _, ok := d.torrentClient.Torrent(infoHash); ok {
//log.Debug("[downloader] torrent client related to hash found", "hash", infoHash)
return true, nil
}

magnet := mi.Magnet(&infoHash, nil)
t, err := d.torrentClient.AddMagnet(magnet.String())
if err != nil {
//log.Warn("[downloader] add magnet link", "err", err)
return false, err
}
t.DisallowDataDownload()
t.AllowDataUpload()
d.wg.Add(1)
go func(t *torrent.Torrent) {
defer d.wg.Done()
select {
case <-ctx.Done():
return
case <-t.GotInfo():
}

mi := t.Metainfo()
if err := CreateTorrentFileIfNotExists(snapDir, t.Info(), &mi); err != nil {
log.Warn("[downloader] create torrent file", "err", err)
return
}
}(t)
//log.Debug("[downloader] downloaded both seg and torrent files", "hash", infoHash)
return false, nil
}

func (d *Downloader) addSegments(ctx context.Context) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
_, err := BuildTorrentFilesIfNeed(context.Background(), d.SnapDir())
@@ -396,27 +510,36 @@ func (d *Downloader) addSegments() error {
return fmt.Errorf("seedableHistorySnapshots: %w", err)
}
files = append(files, files2...)
wg := &sync.WaitGroup{}

g, ctx := errgroup.WithContext(ctx)
i := atomic.Int64{}
for _, f := range files {
wg.Add(1)
go func(f string) {
defer wg.Done()
f := f
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
_, err := AddSegment(f, d.cfg.DataDir, d.torrentClient)
if err != nil {
log.Warn("[snapshots] AddSegment", "err", err)
return
return err
}

i.Add(1)
select {
case <-ctx.Done():
return ctx.Err()
case <-logEvery.C:
log.Info("[snpshots] initializing", "files", fmt.Sprintf("%d/%d", i.Load(), len(files)))
default:
}
}(f)
return nil
})
}
if err := g.Wait(); err != nil {
return err
}
wg.Wait()
return nil
}

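addSegments now builds on errgroup.WithContext rather than a bare sync.WaitGroup: each file is added in its own goroutine, the first error cancels the shared context and is returned from g.Wait(), and a ticker throttles the progress log. A minimal sketch of that shape, assuming a hypothetical addOne helper in place of AddSegment:

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"golang.org/x/sync/errgroup"
)

// addOne is a hypothetical stand-in for AddSegment.
func addOne(file string) error {
	_ = file
	return nil
}

func addSegments(ctx context.Context, files []string) error {
	logEvery := time.NewTicker(20 * time.Second)
	defer logEvery.Stop()

	g, ctx := errgroup.WithContext(ctx) // first error cancels ctx for the siblings
	var done atomic.Int64
	for _, f := range files {
		f := f // capture the loop variable for the goroutine (needed before Go 1.22)
		g.Go(func() error {
			if err := addOne(f); err != nil {
				return err // surfaces from g.Wait and cancels the shared ctx
			}
			done.Add(1)
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-logEvery.C: // throttled progress log
				fmt.Printf("initializing %d/%d\n", done.Load(), len(files))
			default:
			}
			return nil
		})
	}
	return g.Wait() // waits for all goroutines, returns the first non-nil error
}

func main() {
	files := []string{"headers.seg", "bodies.seg", "transactions.seg"}
	if err := addSegments(context.Background(), files); err != nil {
		fmt.Println("addSegments:", err)
	}
}
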
37 changes: 3 additions & 34 deletions downloader/downloader_grpc_server.go
@@ -52,6 +52,8 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
snapDir := s.d.SnapDir()
for i, it := range request.Items {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-logEvery.C:
log.Info("[snapshots] initializing", "files", fmt.Sprintf("%d/%d", i, len(request.Items)))
default:
@@ -72,7 +74,7 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
continue
}

_, err := createMagnetLinkWithInfoHash(it.TorrentHash, torrentClient, snapDir)
_, err := s.d.createMagnetLinkWithInfoHash(ctx, it.TorrentHash, snapDir)
if err != nil {
return nil, err
}
@@ -137,36 +139,3 @@ func seedNewSnapshot(it *proto_downloader.DownloadItem, torrentClient *torrent.C
}

// we dont have .seg or .torrent so we get them through the torrent hash
func createMagnetLinkWithInfoHash(hash *prototypes.H160, torrentClient *torrent.Client, snapDir string) (bool, error) {
mi := &metainfo.MetaInfo{AnnounceList: Trackers}
if hash == nil {
return false, nil
}
infoHash := Proto2InfoHash(hash)
//log.Debug("[downloader] downloading torrent and seg file", "hash", infoHash)

if _, ok := torrentClient.Torrent(infoHash); ok {
//log.Debug("[downloader] torrent client related to hash found", "hash", infoHash)
return true, nil
}

magnet := mi.Magnet(&infoHash, nil)
t, err := torrentClient.AddMagnet(magnet.String())
if err != nil {
//log.Warn("[downloader] add magnet link", "err", err)
return false, err
}
t.DisallowDataDownload()
t.AllowDataUpload()
go func(t *torrent.Torrent) {
<-t.GotInfo()

mi := t.Metainfo()
if err := CreateTorrentFileIfNotExists(snapDir, t.Info(), &mi); err != nil {
log.Warn("[downloader] create torrent file", "err", err)
return
}
}(t)
//log.Debug("[downloader] downloaded both seg and torrent files", "hash", infoHash)
return false, nil
}
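
The Download handler change above adds a ctx.Done() case to the per-item select, so a cancelled RPC stops the loop early, while the ticker case throttles progress logging and the default case keeps the check non-blocking. A minimal sketch of that loop shape, assuming a hypothetical handleItem helper for the per-item work:

package main

import (
	"context"
	"fmt"
	"time"
)

// handleItem is a hypothetical stand-in for the per-item work done by Download.
func handleItem(item string) error {
	_ = item
	return nil
}

func download(ctx context.Context, items []string) error {
	logEvery := time.NewTicker(20 * time.Second)
	defer logEvery.Stop()

	for i, it := range items {
		select {
		case <-ctx.Done():
			return ctx.Err() // cancelled RPC: stop instead of finishing the loop
		case <-logEvery.C:
			fmt.Printf("initializing %d/%d\n", i, len(items))
		default: // neither fired: continue without blocking
		}
		if err := handleItem(it); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	_ = download(context.Background(), []string{"headers.seg", "bodies.seg"})
}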
