diff --git a/dedupkey/dedupkey.go b/dedupkey/dedupkey.go new file mode 100644 index 00000000..0bbd5613 --- /dev/null +++ b/dedupkey/dedupkey.go @@ -0,0 +1,26 @@ +package dedupkey + +import ( + "github.com/ipfs/go-graphsync/ipldutil" + basicnode "github.com/ipld/go-ipld-prime/node/basic" +) + +// EncodeDedupKey returns encoded cbor data for string key +func EncodeDedupKey(key string) ([]byte, error) { + nb := basicnode.Style.String.NewBuilder() + err := nb.AssignString(key) + if err != nil { + return nil, err + } + nd := nb.Build() + return ipldutil.EncodeNode(nd) +} + +// DecodeDedupKey returns a string key decoded from cbor data +func DecodeDedupKey(data []byte) (string, error) { + nd, err := ipldutil.DecodeNode(data) + if err != nil { + return "", err + } + return nd.AsString() +} diff --git a/graphsync.go b/graphsync.go index 831f1737..4acdabd0 100644 --- a/graphsync.go +++ b/graphsync.go @@ -42,6 +42,10 @@ const ( // https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md ExtensionDoNotSendCIDs = ExtensionName("graphsync/do-not-send-cids") + // ExtensionDeDupByKey tells the responding peer to only deduplicate block sending + // for requests that have the same key. 
The data for the extension is a string key + ExtensionDeDupByKey = ExtensionName("graphsync/dedup-by-key") + // GraphSync Response Status Codes // Informational Response Codes (partial) diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 7fad0a12..70ebafa0 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -611,6 +611,71 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store") } +func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) { + // create network + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + td := newGsTestData(ctx, t) + + // initialize graphsync on first node to make requests + requestor := td.GraphSyncHost1() + + // initialize graphsync on second node to response to requests + _ = td.GraphSyncHost2() + + // alternate storing location for responder + altStore1 := make(map[ipld.Link][]byte) + altLoader1, altStorer1 := testutil.NewTestStore(altStore1) + + // alternate storing location for requestor + altStore2 := make(map[ipld.Link][]byte) + altLoader2, altStorer2 := testutil.NewTestStore(altStore2) + + err := requestor.RegisterPersistenceOption("chainstore1", altLoader1, altStorer1) + require.NoError(t, err) + + err = requestor.RegisterPersistenceOption("chainstore2", altLoader2, altStorer2) + require.NoError(t, err) + + blockChainLength := 100 + blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength) + + extensionName1 := graphsync.ExtensionName("blockchain1") + extension1 := graphsync.ExtensionData{ + Name: extensionName1, + Data: nil, + } + + extensionName2 := graphsync.ExtensionName("blockchain2") + extension2 := graphsync.ExtensionData{ + Name: extensionName2, + Data: nil, + } + + requestor.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions 
graphsync.OutgoingRequestHookActions) { + _, has := requestData.Extension(extensionName1) + if has { + hookActions.UsePersistenceOption("chainstore1") + } + _, has = requestData.Extension(extensionName2) + if has { + hookActions.UsePersistenceOption("chainstore2") + } + }) + + progressChan1, errChan1 := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension1) + progressChan2, errChan2 := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension2) + + blockChain.VerifyWholeChain(ctx, progressChan1) + testutil.VerifyEmptyErrors(ctx, t, errChan1) + require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store 1") + blockChain.VerifyWholeChain(ctx, progressChan2) + testutil.VerifyEmptyErrors(ctx, t, errChan2) + require.Len(t, altStore2, blockChainLength, "did not store all blocks in alternate store 2") + +} + // TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work // under a specific of adverse conditions: // -- large blocks being returned by a query diff --git a/linktracker/linktracker.go b/linktracker/linktracker.go index eafe073c..673c6062 100644 --- a/linktracker/linktracker.go +++ b/linktracker/linktracker.go @@ -78,3 +78,8 @@ func (lt *LinkTracker) FinishRequest(requestID graphsync.RequestID) (hasAllBlock return } + +// Empty returns true if the link tracker is empty +func (lt *LinkTracker) Empty() bool { + return len(lt.missingBlocks) == 0 && len(lt.traversalsWithBlocksInProgress) == 0 +} diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index 6acd39ef..58078b2f 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -15,6 +15,7 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/cidset" + "github.com/ipfs/go-graphsync/dedupkey" ipldutil "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/metadata" 
@@ -514,6 +515,18 @@ func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer. } request := gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, defaultPriority, extensions...) hooksResult := rm.requestHooks.ProcessRequestHooks(p, request) + if hooksResult.PersistenceOption != "" { + dedupData, err := dedupkey.EncodeDedupKey(hooksResult.PersistenceOption) + if err != nil { + return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err + } + request = request.ReplaceExtensions([]graphsync.ExtensionData{ + { + Name: graphsync.ExtensionDeDupByKey, + Data: dedupData, + }, + }) + } err = rm.asyncLoader.StartRequest(requestID, hooksResult.PersistenceOption) if err != nil { return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index ccd73a5d..c4325795 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -15,6 +15,7 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/cidset" + "github.com/ipfs/go-graphsync/dedupkey" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/metadata" "github.com/ipfs/go-graphsync/requestmanager/hooks" @@ -646,6 +647,12 @@ func TestOutgoingRequestHooks(t *testing.T) { requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2) + dedupData, has := requestRecords[0].gsr.Extension(graphsync.ExtensionDeDupByKey) + require.True(t, has) + key, err := dedupkey.DecodeDedupKey(dedupData) + require.NoError(t, err) + require.Equal(t, "chainstore", key) + md := metadataForBlocks(td.blockChain.AllBlocks(), true) mdEncoded, err := metadata.EncodeMetadata(md) require.NoError(t, err) diff --git a/responsemanager/peerresponsemanager/peerresponsesender.go b/responsemanager/peerresponsemanager/peerresponsesender.go index 4212770d..92c6300f 100644 --- a/responsemanager/peerresponsemanager/peerresponsesender.go +++ 
b/responsemanager/peerresponsemanager/peerresponsesender.go @@ -42,6 +42,8 @@ type peerResponseSender struct { linkTrackerLk sync.RWMutex linkTracker *linktracker.LinkTracker + altTrackers map[string]*linktracker.LinkTracker + dedupKeys map[graphsync.RequestID]string responseBuildersLk sync.RWMutex responseBuilders []*responsebuilder.ResponseBuilder } @@ -50,6 +52,7 @@ type peerResponseSender struct { // a given peer across multiple requests. type PeerResponseSender interface { peermanager.PeerProcess + DedupKey(requestID graphsync.RequestID, key string) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) SendResponse( requestID graphsync.RequestID, @@ -90,6 +93,8 @@ func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHa peerHandler: peerHandler, outgoingWork: make(chan struct{}, 1), linkTracker: linktracker.New(), + dedupKeys: make(map[graphsync.RequestID]string), + altTrackers: make(map[string]*linktracker.LinkTracker), } } @@ -98,10 +103,29 @@ func (prs *peerResponseSender) Startup() { go prs.run() } +func (prs *peerResponseSender) getLinkTracker(requestID graphsync.RequestID) *linktracker.LinkTracker { + key, ok := prs.dedupKeys[requestID] + if ok { + return prs.altTrackers[key] + } + return prs.linkTracker +} + +func (prs *peerResponseSender) DedupKey(requestID graphsync.RequestID, key string) { + prs.linkTrackerLk.Lock() + defer prs.linkTrackerLk.Unlock() + prs.dedupKeys[requestID] = key + _, ok := prs.altTrackers[key] + if !ok { + prs.altTrackers[key] = linktracker.New() + } +} + func (prs *peerResponseSender) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) { prs.linkTrackerLk.Lock() + linkTracker := prs.getLinkTracker(requestID) for _, link := range links { - prs.linkTracker.RecordLinkTraversal(requestID, link, true) + linkTracker.RecordLinkTraversal(requestID, link, true) } prs.linkTrackerLk.Unlock() } @@ -235,8 +259,9 @@ func (prs *peerResponseSender) setupBlockOperation(requestID graphsync.RequestID 
link ipld.Link, data []byte) blockOperation { hasBlock := data != nil prs.linkTrackerLk.Lock() - sendBlock := hasBlock && prs.linkTracker.BlockRefCount(link) == 0 - prs.linkTracker.RecordLinkTraversal(requestID, link, hasBlock) + linkTracker := prs.getLinkTracker(requestID) + sendBlock := hasBlock && linkTracker.BlockRefCount(link) == 0 + linkTracker.RecordLinkTraversal(requestID, link, hasBlock) prs.linkTrackerLk.Unlock() return blockOperation{ data, sendBlock, link, requestID, @@ -273,7 +298,16 @@ func (fo statusOperation) size() uint64 { func (prs *peerResponseSender) finishTracking(requestID graphsync.RequestID) bool { prs.linkTrackerLk.Lock() defer prs.linkTrackerLk.Unlock() - return prs.linkTracker.FinishRequest(requestID) + linkTracker := prs.getLinkTracker(requestID) + allBlocks := linkTracker.FinishRequest(requestID) + key, ok := prs.dedupKeys[requestID] + if ok { + delete(prs.dedupKeys, requestID) + if linkTracker.Empty() { + delete(prs.altTrackers, key) + } + } + return allBlocks } func (prs *peerResponseSender) setupFinishOperation(requestID graphsync.RequestID) statusOperation { @@ -295,9 +329,7 @@ func (prs *peerResponseSender) FinishRequest(requestID graphsync.RequestID) grap } func (prs *peerResponseSender) setupFinishWithErrOperation(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) statusOperation { - prs.linkTrackerLk.Lock() - prs.linkTracker.FinishRequest(requestID) - prs.linkTrackerLk.Unlock() + prs.finishTracking(requestID) return statusOperation{requestID, status} } diff --git a/responsemanager/peerresponsemanager/peerresponsesender_test.go b/responsemanager/peerresponsemanager/peerresponsesender_test.go index 902fa9af..d1688e5d 100644 --- a/responsemanager/peerresponsemanager/peerresponsesender_test.go +++ b/responsemanager/peerresponsemanager/peerresponsesender_test.go @@ -415,6 +415,111 @@ func TestPeerResponseSenderIgnoreBlocks(t *testing.T) { require.Equal(t, graphsync.RequestCompletedFull, response2.Status(), "did not 
send correct response code in third message") } +func TestPeerResponseSenderDupKeys(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + p := testutil.GeneratePeers(1)[0] + requestID1 := graphsync.RequestID(rand.Int31()) + requestID2 := graphsync.RequestID(rand.Int31()) + requestID3 := graphsync.RequestID(rand.Int31()) + blks := testutil.GenerateBlocksOfSize(5, 100) + links := make([]ipld.Link, 0, len(blks)) + for _, block := range blks { + links = append(links, cidlink.Link{Cid: block.Cid()}) + } + done := make(chan struct{}, 1) + sent := make(chan struct{}, 1) + fph := &fakePeerHandler{ + done: done, + sent: sent, + } + peerResponseSender := NewResponseSender(ctx, p, fph) + peerResponseSender.Startup() + + peerResponseSender.DedupKey(requestID1, "applesauce") + peerResponseSender.DedupKey(requestID3, "applesauce") + + bd := peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData()) + require.Equal(t, links[0], bd.Link()) + require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize()) + require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSizeOnWire()) + testutil.AssertDoesReceive(ctx, t, sent, "did not send first message") + + require.Len(t, fph.lastBlocks, 1) + require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "did not send correct blocks for first message") + + require.Len(t, fph.lastResponses, 1) + require.Equal(t, requestID1, fph.lastResponses[0].RequestID()) + require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status()) + + bd = peerResponseSender.SendResponse(requestID2, links[0], blks[0].RawData()) + require.Equal(t, links[0], bd.Link()) + require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize()) + require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSizeOnWire()) + bd = peerResponseSender.SendResponse(requestID1, links[1], blks[1].RawData()) + require.Equal(t, links[1], bd.Link()) + require.Equal(t, uint64(len(blks[1].RawData())), 
bd.BlockSize()) + require.Equal(t, uint64(len(blks[1].RawData())), bd.BlockSizeOnWire()) + bd = peerResponseSender.SendResponse(requestID1, links[2], nil) + require.Equal(t, links[2], bd.Link()) + require.Equal(t, uint64(0), bd.BlockSize()) + require.Equal(t, uint64(0), bd.BlockSizeOnWire()) + + // let peer response manager know last message was sent so message sending can continue + done <- struct{}{} + + testutil.AssertDoesReceive(ctx, t, sent, "did not send second message") + + require.Len(t, fph.lastBlocks, 2) + require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "did not dedup blocks correctly on second message") + require.Equal(t, blks[1].Cid(), fph.lastBlocks[1].Cid(), "did not dedup blocks correctly on second message") + + require.Len(t, fph.lastResponses, 2, "did not send correct number of responses") + response1, err := findResponseForRequestID(fph.lastResponses, requestID1) + require.NoError(t, err) + require.Equal(t, graphsync.PartialResponse, response1.Status(), "did not send correct response code in second message") + response2, err := findResponseForRequestID(fph.lastResponses, requestID2) + require.NoError(t, err) + require.Equal(t, graphsync.PartialResponse, response2.Status(), "did not send correct response code in second message") + + peerResponseSender.SendResponse(requestID2, links[3], blks[3].RawData()) + peerResponseSender.SendResponse(requestID3, links[4], blks[4].RawData()) + peerResponseSender.FinishRequest(requestID2) + + // let peer response manager know last message was sent so message sending can continue + done <- struct{}{} + + testutil.AssertDoesReceive(ctx, t, sent, "did not send third message") + + require.Equal(t, 2, len(fph.lastBlocks)) + testutil.AssertContainsBlock(t, fph.lastBlocks, blks[3]) + testutil.AssertContainsBlock(t, fph.lastBlocks, blks[4]) + + require.Len(t, fph.lastResponses, 2, "did not send correct number of responses") + response2, err = findResponseForRequestID(fph.lastResponses, requestID2) + 
require.NoError(t, err) + require.Equal(t, graphsync.RequestCompletedFull, response2.Status(), "did not send correct response code in third message") + response3, err := findResponseForRequestID(fph.lastResponses, requestID3) + require.NoError(t, err) + require.Equal(t, graphsync.PartialResponse, response3.Status(), "did not send correct response code in third message") + + peerResponseSender.SendResponse(requestID3, links[0], blks[0].RawData()) + peerResponseSender.SendResponse(requestID3, links[4], blks[4].RawData()) + + // let peer response manager know last message was sent so message sending can continue + done <- struct{}{} + + testutil.AssertDoesReceive(ctx, t, sent, "did not send fourth message") + + require.Len(t, fph.lastBlocks, 0) + + require.Len(t, fph.lastResponses, 1) + require.Equal(t, requestID3, fph.lastResponses[0].RequestID()) + require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status()) + +} + func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID graphsync.RequestID) (gsmsg.GraphSyncResponse, error) { for _, response := range responses { if response.RequestID() == requestID { diff --git a/responsemanager/queryexecutor.go b/responsemanager/queryexecutor.go index 9625ad6e..89212cd6 100644 --- a/responsemanager/queryexecutor.go +++ b/responsemanager/queryexecutor.go @@ -13,6 +13,7 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/cidset" + "github.com/ipfs/go-graphsync/dedupkey" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/responsemanager/hooks" @@ -138,6 +139,9 @@ func (qe *queryExecutor) prepareQuery(ctx context.Context, if transactionError != nil { return nil, nil, false, transactionError } + if err := qe.processDedupByKey(request, peerResponseSender); err != nil { + return nil, nil, false, err + } if err := qe.processDoNoSendCids(request, peerResponseSender); err != nil { return nil, nil, false, err } @@ -154,6 
+158,20 @@ func (qe *queryExecutor) prepareQuery(ctx context.Context, return loader, traverser, isPaused, nil } +func (qe *queryExecutor) processDedupByKey(request gsmsg.GraphSyncRequest, peerResponseSender peerresponsemanager.PeerResponseSender) error { + dedupData, has := request.Extension(graphsync.ExtensionDeDupByKey) + if !has { + return nil + } + key, err := dedupkey.DecodeDedupKey(dedupData) + if err != nil { + peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown) + return err + } + peerResponseSender.DedupKey(request.ID(), key) + return nil +} + func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, peerResponseSender peerresponsemanager.PeerResponseSender) error { doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs) if !has { diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index d9e89d3c..6ed75120 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -18,6 +18,7 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/cidset" + "github.com/ipfs/go-graphsync/dedupkey" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/responsemanager/hooks" "github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager" @@ -114,6 +115,7 @@ type fakePeerResponseSender struct { pausedRequests chan pausedRequest cancelledRequests chan cancelledRequest ignoredLinks chan []ipld.Link + dedupKeys chan string } func (fprs *fakePeerResponseSender) Startup() {} @@ -128,6 +130,10 @@ func (fprs *fakePeerResponseSender) IgnoreBlocks(requestID graphsync.RequestID, fprs.ignoredLinks <- links } +func (fprs *fakePeerResponseSender) DedupKey(requestID graphsync.RequestID, key string) { + fprs.dedupKeys <- key +} + func (fbd fakeBlkData) Link() ipld.Link { return fbd.link } @@ -556,6 +562,31 @@ func TestValidationAndExtensions(t *testing.T) { require.True(t, 
set.Has(link.(cidlink.Link).Cid)) } }) + t.Run("dedup-by-key extension", func(t *testing.T) { + td := newTestData(t) + defer td.cancel() + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager.Startup() + td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { + hookActions.ValidateRequest() + }) + data, err := dedupkey.EncodeDedupKey("applesauce") + require.NoError(t, err) + requests := []gsmsg.GraphSyncRequest{ + gsmsg.NewRequest(td.requestID, td.blockChain.TipLink.(cidlink.Link).Cid, td.blockChain.Selector(), graphsync.Priority(0), + graphsync.ExtensionData{ + Name: graphsync.ExtensionDeDupByKey, + Data: data, + }), + } + responseManager.ProcessRequests(td.ctx, td.p, requests) + var lastRequest completedRequest + testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request") + require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed") + var dedupKey string + testutil.AssertReceive(td.ctx, t, td.dedupKeys, &dedupKey, "should dedup by key") + require.Equal(t, dedupKey, "applesauce") + }) t.Run("test pause/resume", func(t *testing.T) { td := newTestData(t) defer td.cancel() @@ -931,6 +962,7 @@ type testData struct { pausedRequests chan pausedRequest cancelledRequests chan cancelledRequest ignoredLinks chan []ipld.Link + dedupKeys chan string peerManager *fakePeerManager queryQueue *fakeQueryQueue extensionData []byte @@ -968,6 +1000,7 @@ func newTestData(t *testing.T) testData { td.pausedRequests = make(chan pausedRequest, 1) td.cancelledRequests = make(chan cancelledRequest, 1) td.ignoredLinks = make(chan []ipld.Link, 1) + td.dedupKeys = make(chan string, 1) fprs := &fakePeerResponseSender{ lastCompletedRequest: td.completedRequestChan, sentResponses: td.sentResponses, @@ -975,6 +1008,7 @@ func 
newTestData(t *testing.T) testData { pausedRequests: td.pausedRequests, cancelledRequests: td.cancelledRequests, ignoredLinks: td.ignoredLinks, + dedupKeys: td.dedupKeys, } td.peerManager = &fakePeerManager{peerResponseSender: fprs} td.queryQueue = &fakeQueryQueue{}