Skip to content

Commit

Permalink
Dedup by key extension (#83)
Browse files Browse the repository at this point in the history
* feat(responsemanager): dedup-by-key extension

add deduping requests by a given key to the response manager

* feat(requestmanager): use dedup extension

use the dedup extension so that block data is not deduplicated across different stores
  • Loading branch information
hannahhoward authored Aug 13, 2020
1 parent 7dbe280 commit 8e60042
Show file tree
Hide file tree
Showing 10 changed files with 316 additions and 7 deletions.
26 changes: 26 additions & 0 deletions dedupkey/dedupkey.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package dedupkey

import (
"github.com/ipfs/go-graphsync/ipldutil"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
)

// EncodeDedupKey returns encoded cbor data for string key
func EncodeDedupKey(key string) ([]byte, error) {
nb := basicnode.Style.String.NewBuilder()
err := nb.AssignString(key)
if err != nil {
return nil, err
}
nd := nb.Build()
return ipldutil.EncodeNode(nd)
}

// DecodeDedupKey returns a string key decoded from cbor data
func DecodeDedupKey(data []byte) (string, error) {
	decoded, decodeErr := ipldutil.DecodeNode(data)
	if decodeErr != nil {
		return "", decodeErr
	}
	return decoded.AsString()
}
4 changes: 4 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ const (
// https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md
ExtensionDoNotSendCIDs = ExtensionName("graphsync/do-not-send-cids")

// ExtensionDeDupByKey tells the responding peer to only deduplicate block sending
// for requests that have the same key. The data for the extension is a string key
ExtensionDeDupByKey = ExtensionName("graphsync/dedup-by-key")

// GraphSync Response Status Codes

// Informational Response Codes (partial)
Expand Down
65 changes: 65 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,71 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store")
}

// TestGraphsyncRoundTripMultipleAlternatePersistence verifies that a single
// requestor can issue two concurrent requests for the same chain and have
// each routed into a different persistence option, selected per-request by
// an outgoing-request hook keyed off a request extension.
func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) {
	// create network
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
	defer cancel()
	td := newGsTestData(ctx, t)

	// initialize graphsync on first node to make requests
	requestor := td.GraphSyncHost1()

	// initialize graphsync on second node to respond to requests
	_ = td.GraphSyncHost2()

	// first alternate storing location for requestor
	altStore1 := make(map[ipld.Link][]byte)
	altLoader1, altStorer1 := testutil.NewTestStore(altStore1)

	// second alternate storing location for requestor
	altStore2 := make(map[ipld.Link][]byte)
	altLoader2, altStorer2 := testutil.NewTestStore(altStore2)

	err := requestor.RegisterPersistenceOption("chainstore1", altLoader1, altStorer1)
	require.NoError(t, err)

	err = requestor.RegisterPersistenceOption("chainstore2", altLoader2, altStorer2)
	require.NoError(t, err)

	blockChainLength := 100
	blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)

	extensionName1 := graphsync.ExtensionName("blockchain1")
	extension1 := graphsync.ExtensionData{
		Name: extensionName1,
		Data: nil,
	}

	extensionName2 := graphsync.ExtensionName("blockchain2")
	extension2 := graphsync.ExtensionData{
		Name: extensionName2,
		Data: nil,
	}

	// route each request to its persistence option based on which marker
	// extension it carries
	requestor.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
		_, has := requestData.Extension(extensionName1)
		if has {
			hookActions.UsePersistenceOption("chainstore1")
		}
		_, has = requestData.Extension(extensionName2)
		if has {
			hookActions.UsePersistenceOption("chainstore2")
		}
	})

	progressChan1, errChan1 := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension1)
	progressChan2, errChan2 := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension2)

	blockChain.VerifyWholeChain(ctx, progressChan1)
	testutil.VerifyEmptyErrors(ctx, t, errChan1)
	require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store 1")
	blockChain.VerifyWholeChain(ctx, progressChan2)
	testutil.VerifyEmptyErrors(ctx, t, errChan2)
	// fix: assert against altStore2 — the second request's store. The
	// original asserted altStore1 twice, leaving chainstore2 unchecked.
	require.Len(t, altStore2, blockChainLength, "did not store all blocks in alternate store 2")
}

// TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work
// under a specific of adverse conditions:
// -- large blocks being returned by a query
Expand Down
5 changes: 5 additions & 0 deletions linktracker/linktracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,8 @@ func (lt *LinkTracker) FinishRequest(requestID graphsync.RequestID) (hasAllBlock

return
}

// Empty returns true if the link tracker is empty
func (lt *LinkTracker) Empty() bool {
	if len(lt.missingBlocks) > 0 {
		return false
	}
	return len(lt.traversalsWithBlocksInProgress) == 0
}
13 changes: 13 additions & 0 deletions requestmanager/requestmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/dedupkey"
ipldutil "github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/metadata"
Expand Down Expand Up @@ -514,6 +515,18 @@ func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.
}
request := gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, defaultPriority, extensions...)
hooksResult := rm.requestHooks.ProcessRequestHooks(p, request)
if hooksResult.PersistenceOption != "" {
dedupData, err := dedupkey.EncodeDedupKey(hooksResult.PersistenceOption)
if err != nil {
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
}
request = request.ReplaceExtensions([]graphsync.ExtensionData{
{
Name: graphsync.ExtensionDeDupByKey,
Data: dedupData,
},
})
}
err = rm.asyncLoader.StartRequest(requestID, hooksResult.PersistenceOption)
if err != nil {
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
Expand Down
7 changes: 7 additions & 0 deletions requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/dedupkey"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/metadata"
"github.com/ipfs/go-graphsync/requestmanager/hooks"
Expand Down Expand Up @@ -646,6 +647,12 @@ func TestOutgoingRequestHooks(t *testing.T) {

requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)

dedupData, has := requestRecords[0].gsr.Extension(graphsync.ExtensionDeDupByKey)
require.True(t, has)
key, err := dedupkey.DecodeDedupKey(dedupData)
require.NoError(t, err)
require.Equal(t, "chainstore", key)

md := metadataForBlocks(td.blockChain.AllBlocks(), true)
mdEncoded, err := metadata.EncodeMetadata(md)
require.NoError(t, err)
Expand Down
46 changes: 39 additions & 7 deletions responsemanager/peerresponsemanager/peerresponsesender.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type peerResponseSender struct {

linkTrackerLk sync.RWMutex
linkTracker *linktracker.LinkTracker
altTrackers map[string]*linktracker.LinkTracker
dedupKeys map[graphsync.RequestID]string
responseBuildersLk sync.RWMutex
responseBuilders []*responsebuilder.ResponseBuilder
}
Expand All @@ -50,6 +52,7 @@ type peerResponseSender struct {
// a given peer across multiple requests.
type PeerResponseSender interface {
peermanager.PeerProcess
DedupKey(requestID graphsync.RequestID, key string)
IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link)
SendResponse(
requestID graphsync.RequestID,
Expand Down Expand Up @@ -90,6 +93,8 @@ func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHa
peerHandler: peerHandler,
outgoingWork: make(chan struct{}, 1),
linkTracker: linktracker.New(),
dedupKeys: make(map[graphsync.RequestID]string),
altTrackers: make(map[string]*linktracker.LinkTracker),
}
}

Expand All @@ -98,10 +103,29 @@ func (prs *peerResponseSender) Startup() {
go prs.run()
}

// getLinkTracker returns the tracker to use for a request: the shared
// alternate tracker for its dedup key when one was assigned via DedupKey,
// otherwise the default tracker. Callers must hold linkTrackerLk.
func (prs *peerResponseSender) getLinkTracker(requestID graphsync.RequestID) *linktracker.LinkTracker {
	if key, assigned := prs.dedupKeys[requestID]; assigned {
		return prs.altTrackers[key]
	}
	return prs.linkTracker
}

// DedupKey assigns the given request to a keyed link tracker, so block
// deduplication happens only among requests sharing that key. The tracker
// for the key is created lazily on first use.
func (prs *peerResponseSender) DedupKey(requestID graphsync.RequestID, key string) {
	prs.linkTrackerLk.Lock()
	defer prs.linkTrackerLk.Unlock()
	prs.dedupKeys[requestID] = key
	if _, exists := prs.altTrackers[key]; !exists {
		prs.altTrackers[key] = linktracker.New()
	}
}

// IgnoreBlocks marks the given links as already traversed-with-block for
// this request, so their block data will not be sent again.
func (prs *peerResponseSender) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) {
	prs.linkTrackerLk.Lock()
	defer prs.linkTrackerLk.Unlock()
	tracker := prs.getLinkTracker(requestID)
	for _, l := range links {
		tracker.RecordLinkTraversal(requestID, l, true)
	}
}
Expand Down Expand Up @@ -235,8 +259,9 @@ func (prs *peerResponseSender) setupBlockOperation(requestID graphsync.RequestID
link ipld.Link, data []byte) blockOperation {
hasBlock := data != nil
prs.linkTrackerLk.Lock()
sendBlock := hasBlock && prs.linkTracker.BlockRefCount(link) == 0
prs.linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
linkTracker := prs.getLinkTracker(requestID)
sendBlock := hasBlock && linkTracker.BlockRefCount(link) == 0
linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
prs.linkTrackerLk.Unlock()
return blockOperation{
data, sendBlock, link, requestID,
Expand Down Expand Up @@ -273,7 +298,16 @@ func (fo statusOperation) size() uint64 {
// finishTracking completes link tracking for a request, reporting whether
// every traversed link had its block, and cleans up the request's dedup-key
// state — dropping the keyed tracker once no requests still reference it.
func (prs *peerResponseSender) finishTracking(requestID graphsync.RequestID) bool {
	prs.linkTrackerLk.Lock()
	defer prs.linkTrackerLk.Unlock()
	tracker := prs.getLinkTracker(requestID)
	hasAllBlocks := tracker.FinishRequest(requestID)
	if key, assigned := prs.dedupKeys[requestID]; assigned {
		delete(prs.dedupKeys, requestID)
		if tracker.Empty() {
			delete(prs.altTrackers, key)
		}
	}
	return hasAllBlocks
}

func (prs *peerResponseSender) setupFinishOperation(requestID graphsync.RequestID) statusOperation {
Expand All @@ -295,9 +329,7 @@ func (prs *peerResponseSender) FinishRequest(requestID graphsync.RequestID) grap
}

// setupFinishWithErrOperation finishes tracking for a request that ended in
// an error and returns the status operation to enqueue for it.
// Fix: the diff's superseded lines (manual lock + direct
// linkTracker.FinishRequest) were left intermixed with the replacement call,
// which would finish the request twice and deadlock on the already-held
// lock; only the finishTracking call belongs here.
func (prs *peerResponseSender) setupFinishWithErrOperation(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) statusOperation {
	prs.finishTracking(requestID)
	return statusOperation{requestID, status}
}

Expand Down
105 changes: 105 additions & 0 deletions responsemanager/peerresponsemanager/peerresponsesender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,111 @@ func TestPeerResponseSenderIgnoreBlocks(t *testing.T) {
require.Equal(t, graphsync.RequestCompletedFull, response2.Status(), "did not send correct response code in third message")
}

// TestPeerResponseSenderDupKeys verifies dedup-by-key behavior: requests 1
// and 3 share the key "applesauce" (and so dedupe blocks against each
// other), while request 2 uses the default tracker and receives blocks
// independently.
// Fixes typos in comments ("reponse") and in an assertion message
// ("corrent" -> "correct"); behavior of the test is unchanged.
func TestPeerResponseSenderDupKeys(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	p := testutil.GeneratePeers(1)[0]
	requestID1 := graphsync.RequestID(rand.Int31())
	requestID2 := graphsync.RequestID(rand.Int31())
	requestID3 := graphsync.RequestID(rand.Int31())
	blks := testutil.GenerateBlocksOfSize(5, 100)
	links := make([]ipld.Link, 0, len(blks))
	for _, block := range blks {
		links = append(links, cidlink.Link{Cid: block.Cid()})
	}
	done := make(chan struct{}, 1)
	sent := make(chan struct{}, 1)
	fph := &fakePeerHandler{
		done: done,
		sent: sent,
	}
	peerResponseSender := NewResponseSender(ctx, p, fph)
	peerResponseSender.Startup()

	// requests 1 and 3 share a dedup key; request 2 stays on the default tracker
	peerResponseSender.DedupKey(requestID1, "applesauce")
	peerResponseSender.DedupKey(requestID3, "applesauce")

	bd := peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData())
	require.Equal(t, links[0], bd.Link())
	require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
	require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSizeOnWire())
	testutil.AssertDoesReceive(ctx, t, sent, "did not send first message")

	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "did not send correct blocks for first message")

	require.Len(t, fph.lastResponses, 1)
	require.Equal(t, requestID1, fph.lastResponses[0].RequestID())
	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())

	bd = peerResponseSender.SendResponse(requestID2, links[0], blks[0].RawData())
	require.Equal(t, links[0], bd.Link())
	require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
	require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSizeOnWire())
	bd = peerResponseSender.SendResponse(requestID1, links[1], blks[1].RawData())
	require.Equal(t, links[1], bd.Link())
	require.Equal(t, uint64(len(blks[1].RawData())), bd.BlockSize())
	require.Equal(t, uint64(len(blks[1].RawData())), bd.BlockSizeOnWire())
	bd = peerResponseSender.SendResponse(requestID1, links[2], nil)
	require.Equal(t, links[2], bd.Link())
	require.Equal(t, uint64(0), bd.BlockSize())
	require.Equal(t, uint64(0), bd.BlockSizeOnWire())

	// let peer response manager know last message was sent so message sending can continue
	done <- struct{}{}

	testutil.AssertDoesReceive(ctx, t, sent, "did not send second message")

	require.Len(t, fph.lastBlocks, 2)
	require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "did not dedup blocks correctly on second message")
	require.Equal(t, blks[1].Cid(), fph.lastBlocks[1].Cid(), "did not dedup blocks correctly on second message")

	require.Len(t, fph.lastResponses, 2, "did not send correct number of responses")
	response1, err := findResponseForRequestID(fph.lastResponses, requestID1)
	require.NoError(t, err)
	require.Equal(t, graphsync.PartialResponse, response1.Status(), "did not send correct response code in second message")
	response2, err := findResponseForRequestID(fph.lastResponses, requestID2)
	require.NoError(t, err)
	require.Equal(t, graphsync.PartialResponse, response2.Status(), "did not send correct response code in second message")

	peerResponseSender.SendResponse(requestID2, links[3], blks[3].RawData())
	peerResponseSender.SendResponse(requestID3, links[4], blks[4].RawData())
	peerResponseSender.FinishRequest(requestID2)

	// let peer response manager know last message was sent so message sending can continue
	done <- struct{}{}

	testutil.AssertDoesReceive(ctx, t, sent, "did not send third message")

	require.Equal(t, 2, len(fph.lastBlocks))
	testutil.AssertContainsBlock(t, fph.lastBlocks, blks[3])
	testutil.AssertContainsBlock(t, fph.lastBlocks, blks[4])

	require.Len(t, fph.lastResponses, 2, "did not send correct number of responses")
	response2, err = findResponseForRequestID(fph.lastResponses, requestID2)
	require.NoError(t, err)
	require.Equal(t, graphsync.RequestCompletedFull, response2.Status(), "did not send correct response code in third message")
	response3, err := findResponseForRequestID(fph.lastResponses, requestID3)
	require.NoError(t, err)
	require.Equal(t, graphsync.PartialResponse, response3.Status(), "did not send correct response code in third message")

	// request 3 resends links already sent under the shared key — both
	// should be deduped away, producing a response with no blocks
	peerResponseSender.SendResponse(requestID3, links[0], blks[0].RawData())
	peerResponseSender.SendResponse(requestID3, links[4], blks[4].RawData())

	// let peer response manager know last message was sent so message sending can continue
	done <- struct{}{}

	testutil.AssertDoesReceive(ctx, t, sent, "did not send fourth message")

	require.Len(t, fph.lastBlocks, 0)

	require.Len(t, fph.lastResponses, 1)
	require.Equal(t, requestID3, fph.lastResponses[0].RequestID())
	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
}

func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID graphsync.RequestID) (gsmsg.GraphSyncResponse, error) {
for _, response := range responses {
if response.RequestID() == requestID {
Expand Down
Loading

0 comments on commit 8e60042

Please sign in to comment.