Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
feat: Add tests for affinity
Browse files Browse the repository at this point in the history
  • Loading branch information
AmeanAsad committed Sep 19, 2023
1 parent c1ab0e9 commit 1975f49
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 6 deletions.
10 changes: 5 additions & 5 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,14 @@ func (c *Caboose) Fetch(ctx context.Context, path string, cb DataCallback) error
ctx, span := spanTrace(ctx, "Fetch", trace.WithAttributes(attribute.String("path", path)))
defer span.End()

return c.pool.fetchResourceWith(ctx, path, cb, c.getAffinity(ctx))
return c.pool.fetchResourceWith(ctx, path, cb, c.GetAffinity(ctx))
}

func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) {
ctx, span := spanTrace(ctx, "Has", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return false, err
}
Expand All @@ -213,7 +213,7 @@ func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {
ctx, span := spanTrace(ctx, "Get", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return nil, err
}
Expand All @@ -225,14 +225,14 @@ func (c *Caboose) GetSize(ctx context.Context, it cid.Cid) (int, error) {
ctx, span := spanTrace(ctx, "GetSize", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}

func (c *Caboose) getAffinity(ctx context.Context) string {
func (c *Caboose) GetAffinity(ctx context.Context) string {
// https://github.com/ipfs/bifrost-gateway/issues/53#issuecomment-1442732865
if affG := ctx.Value(gateway.ContentPathKey); affG != nil {
contentPath := affG.(ipath.Path).String()
Expand Down
2 changes: 2 additions & 0 deletions internal/util/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt
ch.CabooseActiveNodes = conf.Harness.ActiveNodes.(*caboose.NodeRing)
ch.CabooseAllNodes = conf.Harness.AllNodes.(*caboose.NodeHeap)
ch.CaboosePool = conf.Harness.PoolController
ch.Config = conf
return ch
}

Expand All @@ -89,6 +90,7 @@ type CabooseHarness struct {
CabooseActiveNodes *caboose.NodeRing
CabooseAllNodes *caboose.NodeHeap
CaboosePool state.PoolController
Config *caboose.Config

gol sync.Mutex
goodOrch bool
Expand Down
108 changes: 107 additions & 1 deletion pool_dynamics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package caboose_test

import (
"context"
cryptoRand "crypto/rand"
"fmt"
"math/rand"
"net/url"
"testing"
Expand All @@ -15,8 +17,9 @@ import (
)

const (
nodesSize = 200
nodesSize = 6

This comment has been minimized.

Copy link
@aarshkshah1992

aarshkshah1992 Sep 19, 2023

Contributor

@AmeanAsad Why did we reduce the node size here ?

This comment has been minimized.

Copy link
@AmeanAsad

AmeanAsad Sep 19, 2023

Author Contributor

Just to make it easier to debug and get the tests passing. Was plannign to bump it again once we get all the tests passing again

)
const blockPathPattern = "/ipfs/%s?format=car&dag-scope=block"

/*
This function tests if the caboose pool converges to a set of nodes that are expected
Expand Down Expand Up @@ -209,6 +212,94 @@ func TestPoolDynamics(t *testing.T) {

}

func TestPoolAffinity(t *testing.T) {
baseStatSize := 100000
baseStatLatency := 100
// statVarianceFactor := 0.1
poolRefreshNo := 10
simReqCount := 10000
ctx := context.Background()
cidList := generateRandomCIDs(20)

t.Run("selected nodes remain consistent for same cid reqs", func(t *testing.T) {
ch, controlGroup := getHarnessAndControlGroup(t, nodesSize, nodesSize/2)
_, _ = ch.Caboose.Get(ctx, cidList[0])

goodNodes := make([]*caboose.Node, 0)
badNodes := make([]*caboose.Node, 0)

for _, n := range ch.CabooseAllNodes.Nodes {
_, ok := controlGroup[n.URL]
if ok {
goodNodes = append(goodNodes, n)
} else {
badNodes = append(badNodes, n)
}
}

// Send requests to control group nodes to bump their selection into the pool.
for i := 0; i < poolRefreshNo; i++ {
baseStats := util.NodeStats{
Start: time.Now().Add(-time.Second * 2),
Latency: float64(baseStatLatency) / float64(10),
Size: float64(baseStatSize) * float64(10),
}

ch.RecordSuccesses(t, goodNodes, baseStats, 1000)
ch.CaboosePool.DoRefresh()
}

// Make a bunch of requests to similar cids to establish a stable hashring
for i := 0; i < simReqCount; i++ {
rand.New(rand.NewSource(time.Now().Unix()))
idx := rand.Intn(len(cidList))
_, _ = ch.Caboose.Get(ctx, cidList[idx])
}
ch.CaboosePool.DoRefresh()

// Introduce new nodes by sendng same stats to those nodes.
for i := 0; i < poolRefreshNo/2; i++ {
baseStats := util.NodeStats{
Start: time.Now().Add(-time.Second * 2),
Latency: float64(baseStatLatency) / float64(10),
Size: float64(baseStatSize) * float64(10),
}

// variedStats := util.NodeStats{
// Start: time.Now().Add(-time.Second * 2),
// Latency: float64(baseStatLatency) / (float64(10) + (1 + statVarianceFactor)),
// Size: float64(baseStatSize) * float64(10) * (1 + statVarianceFactor),
// }

ch.RecordSuccesses(t, goodNodes, baseStats, 100)
ch.RecordSuccesses(t, badNodes, baseStats, 10)

ch.CaboosePool.DoRefresh()
}

// for _, i := range ch.CabooseAllNodes.Nodes {
// fmt.Println(i.URL, i.Priority(), i.PredictedLatency)
// }

// Get the candidate nodes for a few cids from our formed cid list using
// the affinity of each cid.
for i := 0; i < 10; i++ {
rand.New(rand.NewSource(time.Now().Unix()))
idx := rand.Intn(len(cidList))
c := cidList[idx]
aff := ch.Caboose.GetAffinity(ctx)
if aff == "" {
aff = fmt.Sprintf(blockPathPattern, c)
}
nodes, _ := ch.CabooseActiveNodes.GetNodes(aff, ch.Config.MaxRetrievalAttempts)

// We expect that the candidate nodes are part of the "good nodes" list.
assert.Contains(t, goodNodes, nodes[0])
}

})
}

func getHarnessAndControlGroup(t *testing.T, nodesSize int, poolSize int) (*util.CabooseHarness, map[string]string) {
ch := util.BuildCabooseHarness(t, nodesSize, 3, func(config *caboose.Config) {
config.PoolTargetSize = nodesSize / 2
Expand All @@ -231,3 +322,18 @@ func getHarnessAndControlGroup(t *testing.T, nodesSize int, poolSize int) (*util

return ch, controlGroup
}

func generateRandomCIDs(count int) []cid.Cid {
var cids []cid.Cid
for i := 0; i < count; i++ {
block := make([]byte, 32)
cryptoRand.Read(block)
c, _ := cid.V1Builder{
Codec: uint64(multicodec.Raw),
MhType: uint64(multicodec.Sha2_256),
}.Sum(block)

cids = append(cids, c)
}
return cids
}

0 comments on commit 1975f49

Please sign in to comment.