diff --git a/internal/session/sessionwants.go b/internal/session/sessionwants.go index 803e2e73..0d4ded01 100644 --- a/internal/session/sessionwants.go +++ b/internal/session/sessionwants.go @@ -8,15 +8,20 @@ import ( cid "github.com/ipfs/go-cid" ) +// liveWantsOrder and liveWants will get out of sync as blocks are received. +// This constant is the maximum amount to allow them to be out of sync before +// cleaning up the ordering array. +const liveWantsOrderGCLimit = 32 + // sessionWants keeps track of which cids are waiting to be sent out, and which // peers are "live" - ie, we've sent a request but haven't received a block yet type sessionWants struct { // The wants that have not yet been sent out toFetch *cidQueue // Wants that have been sent but have not received a response - liveWants *cidQueue - // The time at which live wants were sent - sentAt map[cid.Cid]time.Time + liveWants map[cid.Cid]time.Time + // The order in which wants were requested + liveWantsOrder []cid.Cid // The maximum number of want-haves to send in a broadcast broadcastLimit int } @@ -24,14 +29,13 @@ type sessionWants struct { func newSessionWants(broadcastLimit int) sessionWants { return sessionWants{ toFetch: newCidQueue(), - liveWants: newCidQueue(), - sentAt: make(map[cid.Cid]time.Time), + liveWants: make(map[cid.Cid]time.Time), broadcastLimit: broadcastLimit, } } func (sw *sessionWants) String() string { - return fmt.Sprintf("%d pending / %d live", sw.toFetch.Len(), sw.liveWants.Len()) + return fmt.Sprintf("%d pending / %d live", sw.toFetch.Len(), len(sw.liveWants)) } // BlocksRequested is called when the client makes a request for blocks @@ -48,16 +52,17 @@ func (sw *sessionWants) BlocksRequested(newWants []cid.Cid) { func (sw *sessionWants) GetNextWants() []cid.Cid { now := time.Now() - // Move CIDs from fetch queue to the live wants queue (up to the limit) - currentLiveCount := sw.liveWants.Len() + // Move CIDs from fetch queue to the live wants queue (up to the broadcast + // limit) + currentLiveCount := len(sw.liveWants) toAdd := sw.broadcastLimit - currentLiveCount var live []cid.Cid for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- { c := sw.toFetch.Pop() live = append(live, c) - sw.liveWants.Push(c) - sw.sentAt[c] = now + sw.liveWantsOrder = append(sw.liveWantsOrder, c) + sw.liveWants[c] = now } return live @@ -67,10 +72,10 @@ func (sw *sessionWants) GetNextWants() []cid.Cid { func (sw *sessionWants) WantsSent(ks []cid.Cid) { now := time.Now() for _, c := range ks { - if _, ok := sw.sentAt[c]; !ok && sw.toFetch.Has(c) { + if _, ok := sw.liveWants[c]; !ok && sw.toFetch.Has(c) { sw.toFetch.Remove(c) - sw.liveWants.Push(c) - sw.sentAt[c] = now + sw.liveWantsOrder = append(sw.liveWantsOrder, c) + sw.liveWants[c] = now } } } @@ -85,24 +90,36 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) return wanted, totalLatency } + // Filter for blocks that were actually wanted (as opposed to duplicates) now := time.Now() for _, c := range ks { if sw.isWanted(c) { wanted = append(wanted, c) // Measure latency - sentAt, ok := sw.sentAt[c] + sentAt, ok := sw.liveWants[c] if ok && !sentAt.IsZero() { totalLatency += now.Sub(sentAt) } // Remove the CID from the live wants / toFetch queue - sw.liveWants.Remove(c) - delete(sw.sentAt, c) + delete(sw.liveWants, c) sw.toFetch.Remove(c) } } + // If the live wants ordering array is a long way out of sync with the + // live wants map, clean up the ordering array + if len(sw.liveWantsOrder)-len(sw.liveWants) > liveWantsOrderGCLimit { + cleaned := sw.liveWantsOrder[:0] + for _, c := range sw.liveWantsOrder { + if _, ok := sw.liveWants[c]; ok { + cleaned = append(cleaned, c) + } + } + sw.liveWantsOrder = cleaned + } + return wanted, totalLatency } @@ -110,13 +127,20 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) // live want CIDs up to the broadcast limit. func (sw *sessionWants) PrepareBroadcast() []cid.Cid { now := time.Now() - live := sw.liveWants.Cids() - if len(live) > sw.broadcastLimit { - live = live[:sw.broadcastLimit] - } - for _, c := range live { - sw.sentAt[c] = now + live := make([]cid.Cid, 0, len(sw.liveWants)) + for _, c := range sw.liveWantsOrder { + if _, ok := sw.liveWants[c]; ok { + // No response was received for the want, so reset the sent time + // to now as we're about to broadcast + sw.liveWants[c] = now + + live = append(live, c) + if len(live) == sw.broadcastLimit { + break + } + } } + return live } @@ -129,18 +153,23 @@ func (sw *sessionWants) CancelPending(keys []cid.Cid) { // LiveWants returns a list of live wants func (sw *sessionWants) LiveWants() []cid.Cid { - return sw.liveWants.Cids() + live := make([]cid.Cid, 0, len(sw.liveWants)) + for c := range sw.liveWants { + live = append(live, c) + } + + return live } // RandomLiveWant returns a randomly selected live want func (sw *sessionWants) RandomLiveWant() cid.Cid { - if len(sw.sentAt) == 0 { + if len(sw.liveWants) == 0 { return cid.Cid{} } // picking a random live want - i := rand.Intn(len(sw.sentAt)) - for k := range sw.sentAt { + i := rand.Intn(len(sw.liveWants)) + for k := range sw.liveWants { if i == 0 { return k } @@ -151,12 +180,12 @@ func (sw *sessionWants) RandomLiveWant() cid.Cid { // Has live wants indicates if there are any live wants func (sw *sessionWants) HasLiveWants() bool { - return sw.liveWants.Len() > 0 + return len(sw.liveWants) > 0 } // Indicates whether the want is in either of the fetch or live queues func (sw *sessionWants) isWanted(c cid.Cid) bool { - ok := sw.liveWants.Has(c) + _, ok := sw.liveWants[c] if !ok { ok = sw.toFetch.Has(c) } diff --git a/internal/session/sessionwants_test.go b/internal/session/sessionwants_test.go index 07c23a13..b6e6c94f 100644 --- a/internal/session/sessionwants_test.go +++ b/internal/session/sessionwants_test.go @@ -116,7 +116,7 @@ func TestPrepareBroadcast(t *testing.T) { // Add 6 new wants // toFetch Live // 543210 - sw.BlocksRequested(cids[0:6]) + sw.BlocksRequested(cids[:6]) // Get next wants with a limit of 3 // The first 3 cids should go move into the live queue @@ -139,7 +139,7 @@ func TestPrepareBroadcast(t *testing.T) { // One block received // Remove a cid from the live queue - sw.BlocksReceived(cids[0:1]) + sw.BlocksReceived(cids[:1]) // toFetch Live // 543 21_ @@ -167,3 +167,23 @@ func TestPrepareBroadcast(t *testing.T) { } } } + +// Test that even after GC broadcast returns correct wants +func TestPrepareBroadcastAfterGC(t *testing.T) { + sw := newSessionWants(5) + cids := testutil.GenerateCids(liveWantsOrderGCLimit * 2) + + sw.BlocksRequested(cids) + + // Trigger a sessionWants internal GC of the live wants + sw.BlocksReceived(cids[:liveWantsOrderGCLimit+1]) + cids = cids[:liveWantsOrderGCLimit+1] + + // Broadcast should contain wants in order + ws := sw.PrepareBroadcast() + for i, c := range ws { + if !c.Equals(cids[i]) { + t.Fatal("broadcast should always return wants in order") + } + } +}