Skip to content

Commit

Permalink
minor code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Feb 1, 2024
1 parent a6c4610 commit be4fe6d
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 21 deletions.
7 changes: 3 additions & 4 deletions delegatedrouting/cid_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ func (cq *cidQueue) recordCidNode(node *cidNode) *list.Element {
listElem.Value.(*cidNode).Timestamp = node.Timestamp
cq.nodesLl.MoveToFront(listElem)
return listElem
} else {
listElem := cq.nodesLl.PushFront(node)
cq.listNodeByCid[node.C] = listElem
return listElem
}
listElem := cq.nodesLl.PushFront(node)
cq.listNodeByCid[node.C] = listElem
return listElem
}

func (cq *cidQueue) removeCidNode(c cid.Cid) {
Expand Down
12 changes: 5 additions & 7 deletions delegatedrouting/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,11 @@ func (listener *Listener) FindProviders(ctx context.Context, key cid.Cid, limit
}

func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) {

Check failure on line 296 in delegatedrouting/listener.go

View workflow job for this annotation

GitHub Actions / go-check / All

server.BitswapWriteProvideRequest is deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: (SA1019)
const printFrequency = 10_000
cids := req.Keys
pid := req.ID
paddrs := req.Addrs
startTime := time.Now()
printFrequency := 10_000
listener.lock.Lock()
defer func() {
listener.stats.incDelegatedRoutingCallsProcessed()
Expand Down Expand Up @@ -326,12 +326,10 @@ func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.Bitswa
listener.lastSeenProviderInfo.ID = pid
listener.lastSeenProviderInfo.Addrs = paddrs

timestamp := time.Now()
for i, c := range cids {

// persisting timestamp only if this is not a snapshot
if len(cids) < listener.snapshotSize {
err := listener.dsWrapper.recordCidTimestamp(ctx, c, timestamp)
err := listener.dsWrapper.recordCidTimestamp(ctx, c, startTime)
if err != nil {
log.Errorw("Error persisting timestamp. Continuing.", "cid", c, "err", err)
continue
Expand All @@ -342,7 +340,7 @@ func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.Bitswa
if listElem == nil {
listener.cidQueue.recordCidNode(&cidNode{
C: c,
Timestamp: timestamp,
Timestamp: startTime,
})
err := listener.chunker.addCidToCurrentChunk(ctx, c, func(cc *cidsChunk) error {
return listener.notifyPutAndPersist(ctx, cc)
Expand All @@ -354,7 +352,7 @@ func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.Bitswa
}
} else {
node := listElem.Value.(*cidNode)
node.Timestamp = timestamp
node.Timestamp = startTime
listener.cidQueue.recordCidNode(node)
// if no existing chunk has been found for the cid - adding it to the current one
// This can happen in the following cases:
Expand Down Expand Up @@ -393,12 +391,12 @@ func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.Bitswa

// Revise logic here
func (listener *Listener) removeExpiredCids(ctx context.Context) (bool, error) {
const printFrequency = 100
lastElem := listener.cidQueue.nodesLl.Back()
currentTime := time.Now()
chunksToRemove := make(map[string]*cidsChunk)
cidsToRemove := make(map[cid.Cid]struct{})
removedSomeCids := false
printFrequency := 100
var cidsRemoved, chunksRemoved, chunksReplaced int
// find expired cids and their respective chunks
for {
Expand Down
2 changes: 1 addition & 1 deletion e2e_retrieve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ func newTestServer(t *testing.T, ctx context.Context, o ...engine.Option) *testS
// Explicitly override host so that the host is known for testing purposes.
h := newHost(t)
store := dssync.MutexWrap(datastore.NewMapDatastore())
dt := testutil.SetupDataTransferOnHost(t, h, store, cidlink.DefaultLinkSystem())
o = append(o, engine.WithHost(h), engine.WithDatastore(store))

var publisherAddr multiaddr.Multiaddr
Expand All @@ -289,6 +288,7 @@ func newTestServer(t *testing.T, ctx context.Context, o ...engine.Option) *testS
require.NoError(t, err)
require.NoError(t, e.Start(ctx))

dt := testutil.SetupDataTransferOnHost(t, h, store, cidlink.DefaultLinkSystem())
cs := supplier.NewCarSupplier(e, store, car.ZeroLengthSectionAsEOF(false))
require.NoError(t, cardatatransfer.StartCarDataTransfer(dt, cs))

Expand Down
15 changes: 6 additions & 9 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,8 @@ func (e *Engine) publishAdvForIndex(ctx context.Context, p peer.ID, addrs []mult
lnk, err := e.entriesChunker.Chunk(ctx, mhIter)
if err != nil {
return cid.Undef, fmt.Errorf("could not generate entries list: %s", err)
} else if lnk == nil {
}
if lnk == nil {
log.Warnw("chunking for context ID resulted in no link", "contextID", contextID)
lnk = schema.NoEntries
}
Expand Down Expand Up @@ -676,12 +677,10 @@ func (e *Engine) publishAdvForIndex(ctx context.Context, p peer.ID, addrs []mult
}

func (e *Engine) keyToCidKey(provider peer.ID, contextID []byte) datastore.Key {
switch provider {
case e.provider.ID:
if provider == e.provider.ID {
return datastore.NewKey(keyToCidMapPrefix + string(contextID))
default:
return datastore.NewKey(keyToCidMapPrefix + provider.String() + "/" + string(contextID))
}
return datastore.NewKey(keyToCidMapPrefix + provider.String() + "/" + string(contextID))
}

func (e *Engine) cidToKeyKey(c cid.Cid) datastore.Key {
Expand All @@ -693,12 +692,10 @@ func (e *Engine) cidToProviderAndKeyKey(c cid.Cid) datastore.Key {
}

func (e *Engine) keyToMetadataKey(provider peer.ID, contextID []byte) datastore.Key {
switch provider {
case e.provider.ID:
if provider == e.provider.ID {
return datastore.NewKey(keyToMetadataMapPrefix + string(contextID))
default:
return datastore.NewKey(keyToMetadataMapPrefix + provider.String() + "/" + string(contextID))
}
return datastore.NewKey(keyToMetadataMapPrefix + provider.String() + "/" + string(contextID))
}

func (e *Engine) putKeyCidMap(ctx context.Context, provider peer.ID, contextID []byte, c cid.Cid) error {
Expand Down

0 comments on commit be4fe6d

Please sign in to comment.