From b7be601533f0c5a72d8b4c25331c17630d915e7e Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 29 Sep 2021 15:22:01 -0700 Subject: [PATCH 1/5] feat(responseassembler): add response skipping --- .../responseassembler/peerlinktracker.go | 36 ++++++--- .../responseassembler/responseBuilder.go | 4 +- .../responseassembler/responseassembler.go | 5 ++ .../responseassembler_test.go | 78 +++++++++++++++++++ 4 files changed, 110 insertions(+), 13 deletions(-) diff --git a/responsemanager/responseassembler/peerlinktracker.go b/responsemanager/responseassembler/peerlinktracker.go index 3d3c5bbe..3bea04c3 100644 --- a/responsemanager/responseassembler/peerlinktracker.go +++ b/responsemanager/responseassembler/peerlinktracker.go @@ -10,17 +10,21 @@ import ( ) type peerLinkTracker struct { - linkTrackerLk sync.RWMutex - linkTracker *linktracker.LinkTracker - altTrackers map[string]*linktracker.LinkTracker - dedupKeys map[graphsync.RequestID]string + linkTrackerLk sync.RWMutex + linkTracker *linktracker.LinkTracker + altTrackers map[string]*linktracker.LinkTracker + dedupKeys map[graphsync.RequestID]string + blockSentCount map[graphsync.RequestID]int64 + skipFirstBlocks map[graphsync.RequestID]int64 } func newTracker() *peerLinkTracker { return &peerLinkTracker{ - linkTracker: linktracker.New(), - dedupKeys: make(map[graphsync.RequestID]string), - altTrackers: make(map[string]*linktracker.LinkTracker), + linkTracker: linktracker.New(), + dedupKeys: make(map[graphsync.RequestID]string), + altTrackers: make(map[string]*linktracker.LinkTracker), + blockSentCount: make(map[graphsync.RequestID]int64), + skipFirstBlocks: make(map[graphsync.RequestID]int64), } } @@ -54,6 +58,12 @@ func (prs *peerLinkTracker) IgnoreBlocks(requestID graphsync.RequestID, links [] prs.linkTrackerLk.Unlock() } +func (prs *peerLinkTracker) SkipFirstBlocks(requestID graphsync.RequestID, blocksToSkip int64) { + prs.linkTrackerLk.Lock() + prs.skipFirstBlocks[requestID] = blocksToSkip + prs.linkTrackerLk.Unlock() +} + // FinishTracking clears link tracking data for the request. func (prs *peerLinkTracker) FinishTracking(requestID graphsync.RequestID) bool { prs.linkTrackerLk.Lock() @@ -74,16 +84,20 @@ func (prs *peerLinkTracker) FinishTracking(requestID graphsync.RequestID) bool { delete(prs.altTrackers, key) } } + delete(prs.blockSentCount, requestID) + delete(prs.skipFirstBlocks, requestID) return allBlocks } // RecordLinkTraversal records whether a link is found for a request. func (prs *peerLinkTracker) RecordLinkTraversal(requestID graphsync.RequestID, - link ipld.Link, hasBlock bool) (isUnique bool) { + link ipld.Link, hasBlock bool) bool { prs.linkTrackerLk.Lock() + defer prs.linkTrackerLk.Unlock() + prs.blockSentCount[requestID]++ + notSkipped := prs.skipFirstBlocks[requestID] < prs.blockSentCount[requestID] linkTracker := prs.getLinkTracker(requestID) - isUnique = linkTracker.BlockRefCount(link) == 0 + isUnique := linkTracker.BlockRefCount(link) == 0 linkTracker.RecordLinkTraversal(requestID, link, hasBlock) - prs.linkTrackerLk.Unlock() - return + return hasBlock && notSkipped && isUnique } diff --git a/responsemanager/responseassembler/responseBuilder.go b/responsemanager/responseassembler/responseBuilder.go index 7ccee6cf..7752813c 100644 --- a/responsemanager/responseassembler/responseBuilder.go +++ b/responsemanager/responseassembler/responseBuilder.go @@ -60,9 +60,9 @@ func (rb *responseBuilder) AddNotifee(notifee notifications.Notifee) { func (rb *responseBuilder) setupBlockOperation( link ipld.Link, data []byte) blockOperation { hasBlock := data != nil - isUnique := rb.linkTracker.RecordLinkTraversal(rb.requestID, link, hasBlock) + send := rb.linkTracker.RecordLinkTraversal(rb.requestID, link, hasBlock) return blockOperation{ - data, hasBlock && isUnique, link, rb.requestID, + data, send, link, rb.requestID, } } diff --git a/responsemanager/responseassembler/responseassembler.go b/responsemanager/responseassembler/responseassembler.go index 761f744d..8a6ebc02 100644 --- a/responsemanager/responseassembler/responseassembler.go +++ b/responsemanager/responseassembler/responseassembler.go @@ -86,6 +86,11 @@ func (ra *ResponseAssembler) IgnoreBlocks(p peer.ID, requestID graphsync.Request ra.GetProcess(p).(*peerLinkTracker).IgnoreBlocks(requestID, links) } +// SkipFirstBlocks tells the assembler for the given request to not send the first N blocks +func (ra *ResponseAssembler) SkipFirstBlocks(p peer.ID, requestID graphsync.RequestID, skipFirstBlocks int64) { + ra.GetProcess(p).(*peerLinkTracker).SkipFirstBlocks(requestID, skipFirstBlocks) +} + // Transaction builds a response, and queues it for sending in the next outgoing message func (ra *ResponseAssembler) Transaction(p peer.ID, requestID graphsync.RequestID, transaction Transaction) error { rb := &responseBuilder{ diff --git a/responsemanager/responseassembler/responseassembler_test.go b/responsemanager/responseassembler/responseassembler_test.go index e6b644ba..6ce8710c 100644 --- a/responsemanager/responseassembler/responseassembler_test.go +++ b/responsemanager/responseassembler/responseassembler_test.go @@ -259,6 +259,84 @@ func TestResponseAssemblerIgnoreBlocks(t *testing.T) { } +func TestResponseAssemblerSkipFirstBlocks(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + p := testutil.GeneratePeers(1)[0] + requestID1 := graphsync.RequestID(rand.Int31()) + requestID2 := graphsync.RequestID(rand.Int31()) + blks := testutil.GenerateBlocksOfSize(5, 100) + links := make([]ipld.Link, 0, len(blks)) + for _, block := range blks { + links = append(links, cidlink.Link{Cid: block.Cid()}) + } + fph := newFakePeerHandler(ctx, t) + responseAssembler := New(ctx, fph) + + responseAssembler.SkipFirstBlocks(p, requestID1, 3) + + var bd1, bd2, bd3, bd4, bd5 graphsync.BlockData + err := responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error { + bd1 = b.SendResponse(links[0], blks[0].RawData()) + return nil + }) + require.NoError(t, err) + + assertSentNotOnWire(t, bd1, blks[0]) + fph.RefuteBlocks() + fph.AssertResponses(expectedResponses{requestID1: graphsync.PartialResponse}) + + err = responseAssembler.Transaction(p, requestID2, func(b ResponseBuilder) error { + bd1 = b.SendResponse(links[0], blks[0].RawData()) + return nil + }) + require.NoError(t, err) + fph.AssertResponses(expectedResponses{ + requestID2: graphsync.PartialResponse, + }) + + err = responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error { + bd2 = b.SendResponse(links[1], blks[1].RawData()) + bd3 = b.SendResponse(links[2], blks[2].RawData()) + return nil + }) + require.NoError(t, err) + + assertSentNotOnWire(t, bd1, blks[0]) + assertSentNotOnWire(t, bd2, blks[1]) + assertSentNotOnWire(t, bd3, blks[2]) + + fph.RefuteBlocks() + fph.AssertResponses(expectedResponses{ + requestID1: graphsync.PartialResponse, + }) + err = responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error { + bd4 = b.SendResponse(links[3], blks[3].RawData()) + bd5 = b.SendResponse(links[4], blks[4].RawData()) + b.FinishRequest() + return nil + }) + require.NoError(t, err) + + assertSentOnWire(t, bd4, blks[3]) + assertSentOnWire(t, bd5, blks[4]) + + fph.AssertBlocks(blks[3], blks[4]) + fph.AssertResponses(expectedResponses{requestID1: graphsync.RequestCompletedFull}) + + err = responseAssembler.Transaction(p, requestID2, func(b ResponseBuilder) error { + b.SendResponse(links[3], blks[3].RawData()) + b.FinishRequest() + return nil + }) + require.NoError(t, err) + + fph.AssertBlocks(blks[3]) + fph.AssertResponses(expectedResponses{requestID2: graphsync.RequestCompletedFull}) + +} + func TestResponseAssemblerDupKeys(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Second) From a4f1ac3a93a23d703c2d715f3335493de6ed66c9 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 29 Sep 2021 15:42:59 -0700 Subject: [PATCH 2/5] feat(responsemanager): process do not send first blocks define extension and add processing to response manager --- donotsendfirstblocks/donotsendfirstblocks.go | 28 +++++++++++++++ graphsync.go | 4 +++ responsemanager/client.go | 1 + responsemanager/querypreparer.go | 22 ++++++++++++ responsemanager/responsemanager_test.go | 37 ++++++++++++++++++++ 5 files changed, 92 insertions(+) create mode 100644 donotsendfirstblocks/donotsendfirstblocks.go diff --git a/donotsendfirstblocks/donotsendfirstblocks.go b/donotsendfirstblocks/donotsendfirstblocks.go new file mode 100644 index 00000000..a6ee8a42 --- /dev/null +++ b/donotsendfirstblocks/donotsendfirstblocks.go @@ -0,0 +1,28 @@ +package donotsendfirstblocks + +import ( + basicnode "github.com/ipld/go-ipld-prime/node/basic" + + "github.com/ipfs/go-graphsync/ipldutil" +) + +// EncodeDoNotSendFirstBlocks returns encoded cbor data for the given number +// of blocks to skip +func EncodeDoNotSendFirstBlocks(skipBlockCount int64) ([]byte, error) { + nb := basicnode.Prototype.Int.NewBuilder() + err := nb.AssignInt(skipBlockCount) + if err != nil { + return nil, err + } + nd := nb.Build() + return ipldutil.EncodeNode(nd) +} + +// DecodeDoNotSendFirstBlocks returns the number of blocks to skip +func DecodeDoNotSendFirstBlocks(data []byte) (int64, error) { + nd, err := ipldutil.DecodeNode(data) + if err != nil { + return 0, err + } + return nd.AsInt() +} diff --git a/graphsync.go b/graphsync.go index 599d98bc..17f2dc07 100644 --- a/graphsync.go +++ b/graphsync.go @@ -45,6 +45,10 @@ const ( // https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md ExtensionDoNotSendCIDs = ExtensionName("graphsync/do-not-send-cids") + // ExtensionsDoNotSendFirstBlocks tells the responding peer not to wait till the given + // number of blocks have been traversed before it begins to send blocks over the wire + ExtensionsDoNotSendFirstBlocks = ExtensionName("graphsync/do-not-send-first-blocks") + // ExtensionDeDupByKey tells the responding peer to only deduplicate block sending // for requests that have the same key. The data for the extension is a string key ExtensionDeDupByKey = ExtensionName("graphsync/dedup-by-key") diff --git a/responsemanager/client.go b/responsemanager/client.go index d0a20dba..6f2cd63c 100644 --- a/responsemanager/client.go +++ b/responsemanager/client.go @@ -126,6 +126,7 @@ type NetworkErrorListeners interface { type ResponseAssembler interface { DedupKey(p peer.ID, requestID graphsync.RequestID, key string) IgnoreBlocks(p peer.ID, requestID graphsync.RequestID, links []ipld.Link) + SkipFirstBlocks(p peer.ID, requestID graphsync.RequestID, skipCount int64) Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error } diff --git a/responsemanager/querypreparer.go b/responsemanager/querypreparer.go index 811e285f..be12c43b 100644 --- a/responsemanager/querypreparer.go +++ b/responsemanager/querypreparer.go @@ -12,6 +12,7 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/cidset" "github.com/ipfs/go-graphsync/dedupkey" + "github.com/ipfs/go-graphsync/donotsendfirstblocks" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/notifications" @@ -62,6 +63,9 @@ func (qe *queryPreparer) prepareQuery(ctx context.Context, if err := qe.processDoNoSendCids(request, p, failNotifee); err != nil { return nil, nil, false, err } + if err := qe.processDoNotSendFirstBlocks(request, p, failNotifee); err != nil { + return nil, nil, false, err + } rootLink := cidlink.Link{Cid: request.Root()} linkSystem := result.CustomLinkSystem if linkSystem.StorageReadOpener == nil { @@ -120,3 +124,21 @@ func (qe *queryPreparer) processDoNoSendCids(request gsmsg.GraphSyncRequest, p p qe.responseAssembler.IgnoreBlocks(p, request.ID(), links) return nil } + +func (qe *queryPreparer) processDoNotSendFirstBlocks(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error { + doNotSendFirstBlocksData, has := request.Extension(graphsync.ExtensionsDoNotSendFirstBlocks) + if !has { + return nil + } + skipCount, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData) + if err != nil { + _ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error { + rb.FinishWithError(graphsync.RequestFailedUnknown) + rb.AddNotifee(failNotifee) + return nil + }) + return err + } + qe.responseAssembler.SkipFirstBlocks(p, request.ID(), skipCount) + return nil +} diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index 0d0c04f9..ef5267f1 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -20,6 +20,7 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/cidset" "github.com/ipfs/go-graphsync/dedupkey" + "github.com/ipfs/go-graphsync/donotsendfirstblocks" "github.com/ipfs/go-graphsync/listeners" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/messagequeue" @@ -348,6 +349,29 @@ func TestValidationAndExtensions(t *testing.T) { td.assertCompleteRequestWith(graphsync.RequestCompletedFull) td.assertIgnoredCids(set) }) + + t.Run("do-not-send-first-blocks extension", func(t *testing.T) { + td := newTestData(t) + defer td.cancel() + responseManager := td.newResponseManager() + responseManager.Startup() + td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { + hookActions.ValidateRequest() + }) + data, err := donotsendfirstblocks.EncodeDoNotSendFirstBlocks(4) + require.NoError(t, err) + requests := []gsmsg.GraphSyncRequest{ + gsmsg.NewRequest(td.requestID, td.blockChain.TipLink.(cidlink.Link).Cid, td.blockChain.Selector(), graphsync.Priority(0), + graphsync.ExtensionData{ + Name: graphsync.ExtensionsDoNotSendFirstBlocks, + Data: data, + }), + } + responseManager.ProcessRequests(td.ctx, td.p, requests) + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) + td.assertSkippedFirstBlocks(4) + }) + t.Run("dedup-by-key extension", func(t *testing.T) { td := newTestData(t) defer td.cancel() @@ -797,6 +821,7 @@ type fakeResponseAssembler struct { pausedRequests chan pausedRequest clearedRequests chan clearedRequest ignoredLinks chan []ipld.Link + skippedFirstBlocks chan int64 notifeePublisher *testutil.MockPublisher dedupKeys chan string missingBlock bool @@ -812,6 +837,9 @@ func (fra *fakeResponseAssembler) IgnoreBlocks(p peer.ID, requestID graphsync.Re fra.ignoredLinks <- links } +func (fra *fakeResponseAssembler) SkipFirstBlocks(p peer.ID, requestID graphsync.RequestID, skipCount int64) { + fra.skippedFirstBlocks <- skipCount +} func (fra *fakeResponseAssembler) DedupKey(p peer.ID, requestID graphsync.RequestID, key string) { fra.dedupKeys <- key } @@ -947,6 +975,7 @@ type testData struct { pausedRequests chan pausedRequest clearedRequests chan clearedRequest ignoredLinks chan []ipld.Link + skippedFirstBlocks chan int64 dedupKeys chan string responseAssembler *fakeResponseAssembler queryQueue *fakeQueryQueue @@ -995,6 +1024,7 @@ func newTestData(t *testing.T) testData { td.pausedRequests = make(chan pausedRequest, 1) td.clearedRequests = make(chan clearedRequest, 1) td.ignoredLinks = make(chan []ipld.Link, 1) + td.skippedFirstBlocks = make(chan int64, 1) td.dedupKeys = make(chan string, 1) td.blockSends = make(chan graphsync.BlockData, td.blockChainLength*2) td.completedResponseStatuses = make(chan graphsync.ResponseStatusCode, 1) @@ -1007,6 +1037,7 @@ func newTestData(t *testing.T) testData { pausedRequests: td.pausedRequests, clearedRequests: td.clearedRequests, ignoredLinks: td.ignoredLinks, + skippedFirstBlocks: td.skippedFirstBlocks, dedupKeys: td.dedupKeys, notifeePublisher: td.notifeePublisher, } @@ -1170,6 +1201,12 @@ func (td *testData) assertIgnoredCids(set *cid.Set) { } } +func (td *testData) assertSkippedFirstBlocks(expectedSkipCount int64) { + var skippedFirstBlocks int64 + testutil.AssertReceive(td.ctx, td.t, td.skippedFirstBlocks, &skippedFirstBlocks, "should skip blocks") + require.Equal(td.t, expectedSkipCount, skippedFirstBlocks) +} + func (td *testData) notifyStatusMessagesSent() { td.notifeePublisher.PublishMatchingEvents(func(data notifications.TopicData) bool { _, isSn := data.(graphsync.ResponseStatusCode) From 03f28ec3690c9880a014bcdc4df810224cb14846 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 29 Sep 2021 15:47:33 -0700 Subject: [PATCH 3/5] test(impl): add integration test add test demonstrating do not send first blocks flow --- impl/graphsync_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index c975b4e5..004b03f4 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -40,6 +40,7 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/cidset" + "github.com/ipfs/go-graphsync/donotsendfirstblocks" gsmsg "github.com/ipfs/go-graphsync/message" gsnet "github.com/ipfs/go-graphsync/network" "github.com/ipfs/go-graphsync/storeutil" @@ -331,6 +332,55 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { require.Equal(t, blockChainLength-set.Len(), totalSentOnWire) } +func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { + // create network + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + td := newGsTestData(ctx, t) + + // initialize graphsync on first node to make requests + requestor := td.GraphSyncHost1() + + // setup receiving peer to just record message coming in + blockChainLength := 100 + blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength) + + // store blocks locally + firstHalf := blockChain.Blocks(0, 50) + for _, blk := range firstHalf { + td.blockStore1[cidlink.Link{Cid: blk.Cid()}] = blk.RawData() + } + + doNotSendFirstBlocksData, err := donotsendfirstblocks.EncodeDoNotSendFirstBlocks(50) + require.NoError(t, err) + extension := graphsync.ExtensionData{ + Name: graphsync.ExtensionsDoNotSendFirstBlocks, + Data: doNotSendFirstBlocksData, + } + + // initialize graphsync on second node to response to requests + responder := td.GraphSyncHost2() + + totalSent := 0 + totalSentOnWire := 0 + responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) { + totalSent++ + if blockData.BlockSizeOnWire() > 0 { + totalSentOnWire++ + } + }) + + progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension) + + blockChain.VerifyWholeChain(ctx, progressChan) + testutil.VerifyEmptyErrors(ctx, t, errChan) + require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks") + + require.Equal(t, blockChainLength, totalSent) + require.Equal(t, blockChainLength-50, totalSentOnWire) +} + func TestPauseResume(t *testing.T) { // create network ctx := context.Background() From 3c695b776178ed2a3348b15cb760e90892ad1d1f Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Wed, 29 Sep 2021 16:54:32 -0700 Subject: [PATCH 4/5] Update donotsendfirstblocks/donotsendfirstblocks.go Co-authored-by: Will --- donotsendfirstblocks/donotsendfirstblocks.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/donotsendfirstblocks/donotsendfirstblocks.go b/donotsendfirstblocks/donotsendfirstblocks.go index a6ee8a42..629533a2 100644 --- a/donotsendfirstblocks/donotsendfirstblocks.go +++ b/donotsendfirstblocks/donotsendfirstblocks.go @@ -9,12 +9,7 @@ import ( // EncodeDoNotSendFirstBlocks returns encoded cbor data for the given number // of blocks to skip func EncodeDoNotSendFirstBlocks(skipBlockCount int64) ([]byte, error) { - nb := basicnode.Prototype.Int.NewBuilder() - err := nb.AssignInt(skipBlockCount) - if err != nil { - return nil, err - } - nd := nb.Build() + nd := basicnode.NewInt(skipBlockCount) return ipldutil.EncodeNode(nd) } From 0127c2beef7331907369641d5c7b243cc5d29e63 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Wed, 29 Sep 2021 16:54:51 -0700 Subject: [PATCH 5/5] Update impl/graphsync_test.go Co-authored-by: Will --- impl/graphsync_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 004b03f4..1ec45dce 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -334,8 +334,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { // create network - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() td := newGsTestData(ctx, t)