diff --git a/dual/dual.go b/dual/dual.go index b839e43c4..0db05868d 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -97,8 +97,9 @@ func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) error { func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { reqCtx, cancel := context.WithCancel(ctx) outCh := make(chan peer.AddrInfo) - wanCh := dht.WAN.FindProvidersAsync(reqCtx, key, count) - lanCh := dht.LAN.FindProvidersAsync(reqCtx, key, count) + subCtx, errCh := routing.RegisterForQueryEvents(reqCtx) + wanCh := dht.WAN.FindProvidersAsync(subCtx, key, count) + lanCh := dht.LAN.FindProvidersAsync(subCtx, key, count) zeroCount := (count == 0) go func() { defer cancel() @@ -106,9 +107,15 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) found := make(map[peer.ID]struct{}, count) var pi peer.AddrInfo + var qEv *routing.QueryEvent for (zeroCount || count > 0) && (wanCh != nil || lanCh != nil) { var ok bool select { + case qEv, ok = <-errCh: + if ok && qEv != nil && qEv.Type != routing.QueryError { + routing.PublishQueryEvent(reqCtx, qEv) + } + continue case pi, ok = <-wanCh: if !ok { wanCh = nil @@ -133,6 +140,9 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) return } } + if qEv != nil && qEv.Type == routing.QueryError && len(found) == 0 { + routing.PublishQueryEvent(reqCtx, qEv) + } }() return outCh }