Dedup by key extension #83

Merged · 2 commits · Aug 13, 2020
26 changes: 26 additions & 0 deletions dedupkey/dedupkey.go
@@ -0,0 +1,26 @@
package dedupkey

import (
	"github.com/ipfs/go-graphsync/ipldutil"
	basicnode "github.com/ipld/go-ipld-prime/node/basic"
)

// EncodeDedupKey returns encoded cbor data for string key
func EncodeDedupKey(key string) ([]byte, error) {
	nb := basicnode.Style.String.NewBuilder()
	err := nb.AssignString(key)
	if err != nil {
		return nil, err
	}
	nd := nb.Build()
	return ipldutil.EncodeNode(nd)
}

// DecodeDedupKey returns a string key decoded from cbor data
func DecodeDedupKey(data []byte) (string, error) {
	nd, err := ipldutil.DecodeNode(data)
	if err != nil {
		return "", err
	}
	return nd.AsString()
}
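
The key is wrapped in an IPLD string node and serialized as CBOR via the package's ipldutil helpers, so it travels as opaque extension data. A minimal round-trip sketch using just the two functions above ("chainstore" is an arbitrary example key):

package main

import (
	"fmt"

	"github.com/ipfs/go-graphsync/dedupkey"
)

func main() {
	// Encode a persistence-option name as CBOR extension data.
	data, err := dedupkey.EncodeDedupKey("chainstore")
	if err != nil {
		panic(err)
	}

	// Decode it back on the receiving side.
	key, err := dedupkey.DecodeDedupKey(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // prints "chainstore"
}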
4 changes: 4 additions & 0 deletions graphsync.go
@@ -42,6 +42,10 @@ const (
// https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md
ExtensionDoNotSendCIDs = ExtensionName("graphsync/do-not-send-cids")

// ExtensionDeDupByKey tells the responding peer to deduplicate block sending only
// among requests that share the same key. The data for the extension is a string key
ExtensionDeDupByKey = ExtensionName("graphsync/dedup-by-key")

// GraphSync Response Status Codes

// Informational Response Codes (partial)
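
On the requesting side the extension is not attached by hand: the request manager (see the requestmanager change further down) derives the key from whichever persistence option an outgoing request hook selects. A minimal requestor-side sketch, reusing the hook and persistence APIs exercised in the tests in this PR; the helper itself and its parameters are illustrative assumptions, not part of the change:

package dedupexample

import (
	"github.com/ipfs/go-graphsync"
	"github.com/libp2p/go-libp2p-core/peer"
)

// useStoreForAllRequests assumes the persistence option named storeName was
// already registered on gs via RegisterPersistenceOption.
func useStoreForAllRequests(gs graphsync.GraphExchange, storeName string) {
	gs.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
		// Every request routed to the same persistence option goes out with the
		// same graphsync/dedup-by-key extension, so the responder deduplicates
		// block sending only across that group of requests.
		hookActions.UsePersistenceOption(storeName)
	})
}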
65 changes: 65 additions & 0 deletions impl/graphsync_test.go
@@ -611,6 +611,71 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store")
}

func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// initialize graphsync on second node to respond to requests
_ = td.GraphSyncHost2()

// first alternate storing location for requestor
altStore1 := make(map[ipld.Link][]byte)
altLoader1, altStorer1 := testutil.NewTestStore(altStore1)

// second alternate storing location for requestor
altStore2 := make(map[ipld.Link][]byte)
altLoader2, altStorer2 := testutil.NewTestStore(altStore2)

err := requestor.RegisterPersistenceOption("chainstore1", altLoader1, altStorer1)
require.NoError(t, err)

err = requestor.RegisterPersistenceOption("chainstore2", altLoader2, altStorer2)
require.NoError(t, err)

blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)

extensionName1 := graphsync.ExtensionName("blockchain1")
extension1 := graphsync.ExtensionData{
Name: extensionName1,
Data: nil,
}

extensionName2 := graphsync.ExtensionName("blockchain2")
extension2 := graphsync.ExtensionData{
Name: extensionName2,
Data: nil,
}

requestor.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
_, has := requestData.Extension(extensionName1)
if has {
hookActions.UsePersistenceOption("chainstore1")
}
_, has = requestData.Extension(extensionName2)
if has {
hookActions.UsePersistenceOption("chainstore2")
}
})

progressChan1, errChan1 := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension1)
progressChan2, errChan2 := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension2)

blockChain.VerifyWholeChain(ctx, progressChan1)
testutil.VerifyEmptyErrors(ctx, t, errChan1)
require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store 1")
blockChain.VerifyWholeChain(ctx, progressChan2)
testutil.VerifyEmptyErrors(ctx, t, errChan2)
require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store 2")

}

// TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work
// under a specific set of adverse conditions:
// -- large blocks being returned by a query
5 changes: 5 additions & 0 deletions linktracker/linktracker.go
@@ -78,3 +78,8 @@ func (lt *LinkTracker) FinishRequest(requestID graphsync.RequestID) (hasAllBlock

return
}

// Empty returns true if the link tracker is empty
func (lt *LinkTracker) Empty() bool {
return len(lt.missingBlocks) == 0 && len(lt.traversalsWithBlocksInProgress) == 0
}
13 changes: 13 additions & 0 deletions requestmanager/requestmanager.go
@@ -15,6 +15,7 @@ import (

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/dedupkey"
ipldutil "github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/metadata"
@@ -514,6 +515,18 @@ func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.
}
request := gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, defaultPriority, extensions...)
hooksResult := rm.requestHooks.ProcessRequestHooks(p, request)
if hooksResult.PersistenceOption != "" {
dedupData, err := dedupkey.EncodeDedupKey(hooksResult.PersistenceOption)
if err != nil {
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
}
request = request.ReplaceExtensions([]graphsync.ExtensionData{
{
Name: graphsync.ExtensionDeDupByKey,
Data: dedupData,
},
})
}
err = rm.asyncLoader.StartRequest(requestID, hooksResult.PersistenceOption)
if err != nil {
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
7 changes: 7 additions & 0 deletions requestmanager/requestmanager_test.go
@@ -15,6 +15,7 @@ import (

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/dedupkey"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/metadata"
"github.com/ipfs/go-graphsync/requestmanager/hooks"
@@ -646,6 +647,12 @@ func TestOutgoingRequestHooks(t *testing.T) {

requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)

dedupData, has := requestRecords[0].gsr.Extension(graphsync.ExtensionDeDupByKey)
require.True(t, has)
key, err := dedupkey.DecodeDedupKey(dedupData)
require.NoError(t, err)
require.Equal(t, "chainstore", key)

md := metadataForBlocks(td.blockChain.AllBlocks(), true)
mdEncoded, err := metadata.EncodeMetadata(md)
require.NoError(t, err)
46 changes: 39 additions & 7 deletions responsemanager/peerresponsemanager/peerresponsesender.go
@@ -42,6 +42,8 @@ type peerResponseSender struct {

linkTrackerLk sync.RWMutex
linkTracker *linktracker.LinkTracker
altTrackers map[string]*linktracker.LinkTracker
dedupKeys map[graphsync.RequestID]string
responseBuildersLk sync.RWMutex
responseBuilders []*responsebuilder.ResponseBuilder
}
@@ -50,6 +52,7 @@ type peerResponseSender struct {
// a given peer across multiple requests.
type PeerResponseSender interface {
peermanager.PeerProcess
DedupKey(requestID graphsync.RequestID, key string)
IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link)
SendResponse(
requestID graphsync.RequestID,
@@ -90,6 +93,8 @@ func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHa
peerHandler: peerHandler,
outgoingWork: make(chan struct{}, 1),
linkTracker: linktracker.New(),
dedupKeys: make(map[graphsync.RequestID]string),
altTrackers: make(map[string]*linktracker.LinkTracker),
}
}

@@ -98,10 +103,29 @@ func (prs *peerResponseSender) Startup() {
go prs.run()
}

func (prs *peerResponseSender) getLinkTracker(requestID graphsync.RequestID) *linktracker.LinkTracker {
key, ok := prs.dedupKeys[requestID]
if ok {
return prs.altTrackers[key]
}
return prs.linkTracker
}

func (prs *peerResponseSender) DedupKey(requestID graphsync.RequestID, key string) {
prs.linkTrackerLk.Lock()
defer prs.linkTrackerLk.Unlock()
prs.dedupKeys[requestID] = key
_, ok := prs.altTrackers[key]
if !ok {
prs.altTrackers[key] = linktracker.New()
}
}

func (prs *peerResponseSender) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) {
prs.linkTrackerLk.Lock()
linkTracker := prs.getLinkTracker(requestID)
for _, link := range links {
prs.linkTracker.RecordLinkTraversal(requestID, link, true)
linkTracker.RecordLinkTraversal(requestID, link, true)
}
prs.linkTrackerLk.Unlock()
}
@@ -235,8 +259,9 @@ func (prs *peerResponseSender) setupBlockOperation(requestID graphsync.RequestID
link ipld.Link, data []byte) blockOperation {
hasBlock := data != nil
prs.linkTrackerLk.Lock()
sendBlock := hasBlock && prs.linkTracker.BlockRefCount(link) == 0
prs.linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
linkTracker := prs.getLinkTracker(requestID)
sendBlock := hasBlock && linkTracker.BlockRefCount(link) == 0
linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
prs.linkTrackerLk.Unlock()
return blockOperation{
data, sendBlock, link, requestID,
@@ -273,7 +298,16 @@ func (fo statusOperation) size() uint64 {
func (prs *peerResponseSender) finishTracking(requestID graphsync.RequestID) bool {
prs.linkTrackerLk.Lock()
defer prs.linkTrackerLk.Unlock()
return prs.linkTracker.FinishRequest(requestID)
linkTracker := prs.getLinkTracker(requestID)
allBlocks := linkTracker.FinishRequest(requestID)
key, ok := prs.dedupKeys[requestID]
if ok {
delete(prs.dedupKeys, requestID)
if linkTracker.Empty() {
delete(prs.altTrackers, key)
}
}
return allBlocks
}

func (prs *peerResponseSender) setupFinishOperation(requestID graphsync.RequestID) statusOperation {
@@ -295,9 +329,7 @@ func (prs *peerResponseSender) FinishRequest(requestID graphsync.RequestID) grap
}

func (prs *peerResponseSender) setupFinishWithErrOperation(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) statusOperation {
prs.linkTrackerLk.Lock()
prs.linkTracker.FinishRequest(requestID)
prs.linkTrackerLk.Unlock()
prs.finishTracking(requestID)
return statusOperation{requestID, status}
}

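
The responder-side plumbing that reads this extension off an incoming request is not visible in the hunks above. A rough sketch of what that step has to do, assuming access to the incoming request data and the peer's PeerResponseSender (the helper and its parameters are hypothetical):

package dedupexample

import (
	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/dedupkey"
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
)

// applyDedupKey decodes the graphsync/dedup-by-key extension, if present, and
// registers the key with the peer's response sender before any blocks are sent.
func applyDedupKey(request graphsync.RequestData, sender peerresponsemanager.PeerResponseSender) error {
	data, has := request.Extension(graphsync.ExtensionDeDupByKey)
	if !has {
		return nil // no key: keep using the default per-peer link tracker
	}
	key, err := dedupkey.DecodeDedupKey(data)
	if err != nil {
		return err
	}
	// Requests carrying the same key share one alternate link tracker, so a
	// block is sent at most once across that group of requests.
	sender.DedupKey(request.ID(), key)
	return nil
}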
105 changes: 105 additions & 0 deletions responsemanager/peerresponsemanager/peerresponsesender_test.go
@@ -415,6 +415,111 @@ func TestPeerResponseSenderIgnoreBlocks(t *testing.T) {
require.Equal(t, graphsync.RequestCompletedFull, response2.Status(), "did not send correct response code in third message")
}

func TestPeerResponseSenderDupKeys(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
p := testutil.GeneratePeers(1)[0]
requestID1 := graphsync.RequestID(rand.Int31())
requestID2 := graphsync.RequestID(rand.Int31())
requestID3 := graphsync.RequestID(rand.Int31())
blks := testutil.GenerateBlocksOfSize(5, 100)
links := make([]ipld.Link, 0, len(blks))
for _, block := range blks {
links = append(links, cidlink.Link{Cid: block.Cid()})
}
done := make(chan struct{}, 1)
sent := make(chan struct{}, 1)
fph := &fakePeerHandler{
done: done,
sent: sent,
}
peerResponseSender := NewResponseSender(ctx, p, fph)
peerResponseSender.Startup()

peerResponseSender.DedupKey(requestID1, "applesauce")
peerResponseSender.DedupKey(requestID3, "applesauce")

bd := peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData())
require.Equal(t, links[0], bd.Link())
require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSizeOnWire())
testutil.AssertDoesReceive(ctx, t, sent, "did not send first message")

require.Len(t, fph.lastBlocks, 1)
require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "did not send correct blocks for first message")

require.Len(t, fph.lastResponses, 1)
require.Equal(t, requestID1, fph.lastResponses[0].RequestID())
require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())

bd = peerResponseSender.SendResponse(requestID2, links[0], blks[0].RawData())
require.Equal(t, links[0], bd.Link())
require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSizeOnWire())
bd = peerResponseSender.SendResponse(requestID1, links[1], blks[1].RawData())
require.Equal(t, links[1], bd.Link())
require.Equal(t, uint64(len(blks[1].RawData())), bd.BlockSize())
require.Equal(t, uint64(len(blks[1].RawData())), bd.BlockSizeOnWire())
bd = peerResponseSender.SendResponse(requestID1, links[2], nil)
require.Equal(t, links[2], bd.Link())
require.Equal(t, uint64(0), bd.BlockSize())
require.Equal(t, uint64(0), bd.BlockSizeOnWire())

// let peer response manager know last message was sent so message sending can continue
done <- struct{}{}

testutil.AssertDoesReceive(ctx, t, sent, "did not send second message")

require.Len(t, fph.lastBlocks, 2)
require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "did not dedup blocks correctly on second message")
require.Equal(t, blks[1].Cid(), fph.lastBlocks[1].Cid(), "did not dedup blocks correctly on second message")

require.Len(t, fph.lastResponses, 2, "did not send correct number of responses")
response1, err := findResponseForRequestID(fph.lastResponses, requestID1)
require.NoError(t, err)
require.Equal(t, graphsync.PartialResponse, response1.Status(), "did not send correct response code in second message")
response2, err := findResponseForRequestID(fph.lastResponses, requestID2)
require.NoError(t, err)
require.Equal(t, graphsync.PartialResponse, response2.Status(), "did not send corrent response code in second message")

peerResponseSender.SendResponse(requestID2, links[3], blks[3].RawData())
peerResponseSender.SendResponse(requestID3, links[4], blks[4].RawData())
peerResponseSender.FinishRequest(requestID2)

// let peer response manager know last message was sent so message sending can continue
done <- struct{}{}

testutil.AssertDoesReceive(ctx, t, sent, "did not send third message")

require.Equal(t, 2, len(fph.lastBlocks))
testutil.AssertContainsBlock(t, fph.lastBlocks, blks[3])
testutil.AssertContainsBlock(t, fph.lastBlocks, blks[4])

require.Len(t, fph.lastResponses, 2, "did not send correct number of responses")
response2, err = findResponseForRequestID(fph.lastResponses, requestID2)
require.NoError(t, err)
require.Equal(t, graphsync.RequestCompletedFull, response2.Status(), "did not send correct response code in third message")
response3, err := findResponseForRequestID(fph.lastResponses, requestID3)
require.NoError(t, err)
require.Equal(t, graphsync.PartialResponse, response3.Status(), "did not send correct response code in third message")

peerResponseSender.SendResponse(requestID3, links[0], blks[0].RawData())
peerResponseSender.SendResponse(requestID3, links[4], blks[4].RawData())

// let peer response manager know last message was sent so message sending can continue
done <- struct{}{}

testutil.AssertDoesReceive(ctx, t, sent, "did not send fourth message")

require.Len(t, fph.lastBlocks, 0)

require.Len(t, fph.lastResponses, 1)
require.Equal(t, requestID3, fph.lastResponses[0].RequestID())
require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())

}

func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID graphsync.RequestID) (gsmsg.GraphSyncResponse, error) {
for _, response := range responses {
if response.RequestID() == requestID {