diff --git a/compconfig.go b/compconfig.go new file mode 100644 index 0000000..b3bd85f --- /dev/null +++ b/compconfig.go @@ -0,0 +1,27 @@ +package routinghelpers + +import ( + "context" + "time" + + "github.com/libp2p/go-libp2p/core/routing" + "github.com/multiformats/go-multihash" +) + +type ParallelRouter struct { + Timeout time.Duration + IgnoreError bool + Router routing.Routing + ExecuteAfter time.Duration +} + +type SequentialRouter struct { + Timeout time.Duration + IgnoreError bool + Router routing.Routing +} + +type ProvideManyRouter interface { + ProvideMany(ctx context.Context, keys []multihash.Multihash) error + Ready() bool +} diff --git a/compparallel.go b/compparallel.go new file mode 100644 index 0000000..3c38239 --- /dev/null +++ b/compparallel.go @@ -0,0 +1,323 @@ +package routinghelpers + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "time" + + "github.com/hashicorp/go-multierror" + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" + "github.com/multiformats/go-multihash" +) + +var _ routing.Routing = &composableParallel{} +var _ ProvideManyRouter = &composableParallel{} + +type composableParallel struct { + routers []*ParallelRouter +} + +// NewComposableParallel creates a Router that will execute methods from provided Routers in parallel. +// On all methods, If IgnoreError flag is set, that Router will not stop the entire execution. +// On all methods, If ExecuteAfter is set, that Router will be executed after the timer. +// Router specific timeout will start counting AFTER the ExecuteAfter timer. +func NewComposableParallel(routers []*ParallelRouter) *composableParallel { + return &composableParallel{ + routers: routers, + } +} + +// Provide will call all Routers in parallel. +func (r *composableParallel) Provide(ctx context.Context, cid cid.Cid, provide bool) error { + return executeParallel(ctx, r.routers, + func(ctx context.Context, r routing.Routing) error { + return r.Provide(ctx, cid, provide) + }, + ) +} + +// ProvideMany will call all supported Routers in parallel. +func (r *composableParallel) ProvideMany(ctx context.Context, keys []multihash.Multihash) error { + return executeParallel(ctx, r.routers, + func(ctx context.Context, r routing.Routing) error { + pm, ok := r.(ProvideManyRouter) + if !ok { + return nil + } + return pm.ProvideMany(ctx, keys) + }, + ) +} + +// Ready will call all supported ProvideMany Routers SEQUENTIALLY. +// If some of them are not ready, this method will return false. +func (r *composableParallel) Ready() bool { + for _, ro := range r.routers { + pm, ok := ro.Router.(ProvideManyRouter) + if !ok { + continue + } + + if !pm.Ready() { + return false + } + } + + return true +} + +// FindProvidersAsync will execute all Routers in parallel, iterating results from them in unspecified order. +// If count is set, only that amount of elements will be returned without any specification about from what router is obtained. +// To gather providers from a set of Routers first, you can use the ExecuteAfter timer to delay some Router execution. +func (r *composableParallel) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo { + var totalCount int64 + ch, _ := getChannelOrErrorParallel( + ctx, + r.routers, + func(ctx context.Context, r routing.Routing) (<-chan peer.AddrInfo, error) { + return r.FindProvidersAsync(ctx, cid, count), nil + }, + func() bool { + return atomic.AddInt64(&totalCount, 1) > int64(count) && count != 0 + }, + ) + + return ch +} + +// FindPeer will execute all Routers in parallel, getting the first AddrInfo found and cancelling all other Router calls. +func (r *composableParallel) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { + return getValueOrErrorParallel(ctx, r.routers, + func(ctx context.Context, r routing.Routing) (peer.AddrInfo, bool, error) { + addr, err := r.FindPeer(ctx, id) + return addr, addr.ID == "", err + }, + ) +} + +// PutValue will execute all Routers in parallel. If a Router fails and IgnoreError flag is not set, the whole execution will fail. +// Some Puts before the failure might be successful, even if we return an error. +func (r *composableParallel) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error { + return executeParallel(ctx, r.routers, + func(ctx context.Context, r routing.Routing) error { + return r.PutValue(ctx, key, val, opts...) + }, + ) +} + +// GetValue will execute all Routers in parallel. The first value found will be returned, cancelling all other executions. +func (r *composableParallel) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { + return getValueOrErrorParallel(ctx, r.routers, + func(ctx context.Context, r routing.Routing) ([]byte, bool, error) { + val, err := r.GetValue(ctx, key, opts...) + return val, len(val) == 0, err + }) +} + +func (r *composableParallel) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { + return getChannelOrErrorParallel( + ctx, + r.routers, + func(ctx context.Context, r routing.Routing) (<-chan []byte, error) { + return r.SearchValue(ctx, key, opts...) + }, + func() bool { return false }, + ) +} + +func (r *composableParallel) Bootstrap(ctx context.Context) error { + return executeParallel(ctx, r.routers, + func(ctx context.Context, r routing.Routing) error { + return r.Bootstrap(ctx) + }) +} + +func getValueOrErrorParallel[T any]( + ctx context.Context, + routers []*ParallelRouter, + f func(context.Context, routing.Routing) (T, bool, error), +) (value T, err error) { + outCh := make(chan T) + errCh := make(chan error) + + // global cancel context to stop early other router's execution. + ctx, cancelAll := context.WithCancel(ctx) + defer cancelAll() + var wg sync.WaitGroup + for _, r := range routers { + wg.Add(1) + go func(r *ParallelRouter) { + defer wg.Done() + tim := time.NewTimer(r.ExecuteAfter) + defer tim.Stop() + select { + case <-ctx.Done(): + case <-tim.C: + ctx, cancel := context.WithTimeout(ctx, r.Timeout) + defer cancel() + value, empty, err := f(ctx, r.Router) + if err != nil && + !errors.Is(err, routing.ErrNotFound) && + !r.IgnoreError { + select { + case <-ctx.Done(): + case errCh <- err: + } + return + } + if empty { + return + } + select { + case <-ctx.Done(): + return + case outCh <- value: + } + } + }(r) + } + + // goroutine closing everything when finishing execution + go func() { + wg.Wait() + close(outCh) + close(errCh) + }() + + select { + case out, ok := <-outCh: + if !ok { + return value, routing.ErrNotFound + } + return out, nil + case err, ok := <-errCh: + if !ok { + return value, routing.ErrNotFound + } + return value, err + case <-ctx.Done(): + return value, ctx.Err() + } +} + +func executeParallel( + ctx context.Context, + routers []*ParallelRouter, + f func(context.Context, routing.Routing, + ) error) error { + var wg sync.WaitGroup + errCh := make(chan error) + for _, r := range routers { + wg.Add(1) + go func(r *ParallelRouter) { + defer wg.Done() + tim := time.NewTimer(r.ExecuteAfter) + defer tim.Stop() + select { + case <-ctx.Done(): + if !r.IgnoreError { + errCh <- ctx.Err() + } + case <-tim.C: + ctx, cancel := context.WithTimeout(ctx, r.Timeout) + defer cancel() + err := f(ctx, r.Router) + if err != nil && + !r.IgnoreError { + errCh <- err + } + } + }(r) + } + + go func() { + wg.Wait() + close(errCh) + }() + + var errOut error + for err := range errCh { + errOut = multierror.Append(errOut, err) + } + + return errOut +} + +func getChannelOrErrorParallel[T any]( + ctx context.Context, + routers []*ParallelRouter, + f func(context.Context, routing.Routing) (<-chan T, error), + shouldStop func() bool, +) (chan T, error) { + outCh := make(chan T) + errCh := make(chan error) + var wg sync.WaitGroup + ctx, cancelAll := context.WithCancel(ctx) + for _, r := range routers { + wg.Add(1) + go func(r *ParallelRouter) { + defer wg.Done() + tim := time.NewTimer(r.ExecuteAfter) + defer tim.Stop() + select { + case <-ctx.Done(): + return + case <-tim.C: + ctx, cancel := context.WithTimeout(ctx, r.Timeout) + defer cancel() + valueChan, err := f(ctx, r.Router) + if err != nil && !r.IgnoreError { + select { + case <-ctx.Done(): + case errCh <- err: + } + return + } + for { + select { + case <-ctx.Done(): + return + case val, ok := <-valueChan: + if !ok { + return + } + + if shouldStop() { + return + } + + select { + case <-ctx.Done(): + return + case outCh <- val: + } + } + } + } + }(r) + } + + // goroutine closing everything when finishing execution + go func() { + wg.Wait() + close(outCh) + close(errCh) + cancelAll() + }() + + select { + case err, ok := <-errCh: + if !ok { + return nil, routing.ErrNotFound + } + return nil, err + case <-ctx.Done(): + return nil, ctx.Err() + default: + return outCh, nil + } +} diff --git a/compparallel_test.go b/compparallel_test.go new file mode 100644 index 0000000..ccfe59c --- /dev/null +++ b/compparallel_test.go @@ -0,0 +1,429 @@ +package routinghelpers + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" + "github.com/stretchr/testify/require" +) + +func TestComposableParallelFixtures(t *testing.T) { + fixtures := []struct { + Name string + routers []*ParallelRouter + GetValueFixtures []struct { + err error + key string + value string + searchValCount int + } + PutValueFixtures []struct { + err error + key string + value string + } + ProvideFixtures []struct { + err error + } + FindPeerFixtures []struct { + peerID string + err error + } + }{ + { + Name: "simple two routers, one with delay", + routers: []*ParallelRouter{ + { + Timeout: time.Second, + IgnoreError: false, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{"a", "b", "c"}, []string{"av", "bv", "cv"}), + PeerRouting: newDummyPeerRouting(t, []peer.ID{"pid1", "pid2"}), + ContentRouting: Null{}, + }, + }, + { + Timeout: time.Minute, + IgnoreError: false, + ExecuteAfter: time.Second, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{"a", "d"}, []string{"av2", "dv"}), + PeerRouting: newDummyPeerRouting(t, []peer.ID{"pid1", "pid3"}), + ContentRouting: Null{}, + }, + }, + }, + GetValueFixtures: []struct { + err error + key string + value string + searchValCount int + }{ + { + key: "d", + value: "dv", + searchValCount: 1, + }, + { + key: "a", + value: "av", + searchValCount: 2, + }, + }, + PutValueFixtures: []struct { + err error + key string + value string + }{ + { + err: errors.New("2 errors occurred:\n\t* a\n\t* a\n\n"), + key: "/error/a", + value: "a", + }, + { + key: "a", + value: "a", + }, + }, + ProvideFixtures: []struct { + err error + }{ + { + err: errors.New("2 errors occurred:\n\t* routing: operation or key not supported\n\t* routing: operation or key not supported\n\n"), + }, + }, + FindPeerFixtures: []struct { + peerID string + err error + }{ + { + peerID: "pid1", + }, + { + peerID: "pid3", + }, + }, + }, + { + Name: "two routers with ignore errors", + routers: []*ParallelRouter{ + { + Timeout: time.Second, + IgnoreError: true, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{}, []string{}), + PeerRouting: newDummyPeerRouting(t, []peer.ID{"pid1", "pid2"}), + ContentRouting: Null{}, + }, + }, + { + Timeout: time.Minute, + IgnoreError: true, + ExecuteAfter: time.Second, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{"d"}, []string{"dv"}), + PeerRouting: newDummyPeerRouting(t, []peer.ID{"pid1", "pid3"}), + ContentRouting: Null{}, + }, + }, + }, + GetValueFixtures: []struct { + err error + key string + value string + searchValCount int + }{ + { + key: "d", + value: "dv", + searchValCount: 1, + }, + { + err: routing.ErrNotFound, // even ignoring errors, if the value is not found we return not found + key: "a", + }, + }, + PutValueFixtures: []struct { + err error + key string + value string + }{ + { + key: "/error/x", + value: "xv", + }, + { + key: "/error/y", + value: "yv", + }, + }, + FindPeerFixtures: []struct { + peerID string + err error + }{ + { + peerID: "pid1", + }, + { + err: routing.ErrNotFound, // even ignoring errors, if the value is not found we return not found + peerID: "pid4", + }, + }, + }, + { + Name: "two routers with ignore errors no delay", + routers: []*ParallelRouter{ + { + Timeout: time.Second, + IgnoreError: true, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{"a"}, []string{"av"}), + PeerRouting: newDummyPeerRouting(t, []peer.ID{"pid1", "pid2"}), + ContentRouting: Null{}, + }, + }, + { + Timeout: time.Minute, + IgnoreError: true, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{"d", "e"}, []string{"dv", "ev"}), + PeerRouting: newDummyPeerRouting(t, []peer.ID{"pid1", "pid3"}), + ContentRouting: Null{}, + }, + }, + }, + GetValueFixtures: []struct { + err error + key string + value string + searchValCount int + }{ + { + key: "d", + value: "dv", + searchValCount: 1, + }, + { + key: "a", + value: "av", + searchValCount: 1, + }, + { + err: routing.ErrNotFound, + key: "/error/z", + }, + { + err: routing.ErrNotFound, + key: "/error/y", + }, + }, + PutValueFixtures: []struct { + err error + key string + value string + }{ + { + key: "/error/x", + value: "xv", + }, + { + key: "/error/y", + value: "yv", + }, + }, + FindPeerFixtures: []struct { + peerID string + err error + }{ + { + peerID: "pid1", + }, + { + peerID: "pid4", + err: routing.ErrNotFound, + }, + }, + }, + { + Name: "two routers one value store failing always", + routers: []*ParallelRouter{ + { + Timeout: time.Second, + IgnoreError: false, + Router: &Compose{ + ValueStore: failValueStore{}, + PeerRouting: Null{}, + ContentRouting: Null{}, + }, + }, + { + Timeout: time.Minute, + IgnoreError: false, + ExecuteAfter: time.Minute, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{"d", "e"}, []string{"dv", "ev"}), + PeerRouting: Null{}, + ContentRouting: Null{}, + }, + }, + }, + GetValueFixtures: []struct { + err error + key string + value string + searchValCount int + }{ + { + err: errFailValue, + key: "d", + value: "dv", + }, + { + err: errFailValue, + key: "a", + value: "av", + }, + }, + }, + { + Name: "two routers one value store failing always but ignored", + routers: []*ParallelRouter{ + { + Timeout: time.Second, + IgnoreError: true, + Router: &Compose{ + ValueStore: failValueStore{}, + PeerRouting: Null{}, + ContentRouting: Null{}, + }, + }, + { + Timeout: time.Second, + IgnoreError: false, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{"d", "e"}, []string{"dv", "ev"}), + PeerRouting: Null{}, + ContentRouting: Null{}, + }, + }, + }, + GetValueFixtures: []struct { + err error + key string + value string + searchValCount int + }{ + { + key: "d", + value: "dv", + searchValCount: 1, + }, + { + err: routing.ErrNotFound, + key: "a", + value: "av", + }, + }, + }, + } + + for _, f := range fixtures { + f := f + t.Run(f.Name, func(t *testing.T) { + t.Parallel() + require := require.New(t) + cpr := NewComposableParallel(f.routers) + for _, gvf := range f.GetValueFixtures { + val, err := cpr.GetValue(context.Background(), gvf.key) + if gvf.err != nil { + require.ErrorContains(err, gvf.err.Error()) + continue + } else { + require.NoError(err) + } + + require.Equal(gvf.value, string(val)) + + vals, err := cpr.SearchValue(context.Background(), gvf.key) + if gvf.err != nil { + require.ErrorContains(err, gvf.err.Error()) + continue + } else { + require.NoError(err) + } + + count := 0 + for range vals { + count++ + } + + require.Equal(gvf.searchValCount, count) + } + + for _, pvf := range f.PutValueFixtures { + err := cpr.PutValue(context.Background(), pvf.key, []byte(pvf.value)) + if pvf.err != nil { + require.ErrorContains(err, pvf.err.Error()) + continue + } else { + require.NoError(err) + } + } + + for _, pf := range f.ProvideFixtures { + err := cpr.Provide(context.Background(), cid.Cid{}, true) + if pf.err != nil { + require.ErrorContains(err, pf.err.Error()) + continue + } else { + require.NoError(err) + } + } + + for _, fpf := range f.FindPeerFixtures { + addr, err := cpr.FindPeer(context.Background(), peer.ID(fpf.peerID)) + if fpf.err != nil { + require.ErrorContains(err, fpf.err.Error()) + continue + } else { + require.NoError(err) + } + + require.Equal(fpf.peerID, string(addr.ID)) + } + }) + } +} + +func newDummyPeerRouting(t testing.TB, ids []peer.ID) routing.PeerRouting { + pr := dummyPeerRouter{} + for _, id := range ids { + pr[id] = struct{}{} + } + + return pr +} + +func newDummyValueStore(t testing.TB, keys []string, values []string) routing.ValueStore { + t.Helper() + + if len(keys) != len(values) { + t.Fatal("keys and values must be the same amount") + } + + dvs := &dummyValueStore{} + for i, k := range keys { + v := values[i] + err := dvs.PutValue(context.TODO(), k, []byte(v)) + if err != nil { + t.Fatal(err) + } + } + + return dvs +} diff --git a/compsequential.go b/compsequential.go new file mode 100644 index 0000000..111185b --- /dev/null +++ b/compsequential.go @@ -0,0 +1,238 @@ +package routinghelpers + +import ( + "context" + "errors" + "sync/atomic" + + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" + "github.com/multiformats/go-multihash" +) + +var _ routing.Routing = &composableSequential{} + +type composableSequential struct { + routers []*SequentialRouter +} + +func NewComposableSequential(routers []*SequentialRouter) *composableSequential { + return &composableSequential{ + routers: routers, + } +} + +// Provide calls Provide method per each router sequentially. +// If some router fails and the IgnoreError flag is true, we continue to the next router. +// Context timeout error will be also ignored if the flag is set. +func (r *composableSequential) Provide(ctx context.Context, cid cid.Cid, provide bool) error { + return executeSequential(ctx, r.routers, + func(ctx context.Context, r routing.Routing) error { + return r.Provide(ctx, cid, provide) + }) +} + +// ProvideMany will call all supported Routers sequentially. +func (r *composableSequential) ProvideMany(ctx context.Context, keys []multihash.Multihash) error { + return executeSequential(ctx, r.routers, + func(ctx context.Context, r routing.Routing) error { + pm, ok := r.(ProvideManyRouter) + if !ok { + return nil + } + return pm.ProvideMany(ctx, keys) + }, + ) +} + +// Ready will call all supported ProvideMany Routers sequentially. +// If some of them are not ready, this method will return false. +func (r *composableSequential) Ready() bool { + for _, ro := range r.routers { + pm, ok := ro.Router.(ProvideManyRouter) + if !ok { + continue + } + + if !pm.Ready() { + return false + } + } + + return true +} + +// FindProvidersAsync calls FindProvidersAsync per each router sequentially. +// If some router fails and the IgnoreError flag is true, we continue to the next router. +// Context timeout error will be also ignored if the flag is set. +// If count is set, the channel will return up to count results, stopping routers iteration. +func (r *composableSequential) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo { + var totalCount int64 + return getChannelOrErrorSequential(ctx, r.routers, + func(ctx context.Context, r routing.Routing) (<-chan peer.AddrInfo, error) { + return r.FindProvidersAsync(ctx, cid, count), nil + }, + func() bool { + return atomic.AddInt64(&totalCount, 1) > int64(count) && count != 0 + }, + ) +} + +// FindPeer calls FindPeer per each router sequentially. +// If some router fails and the IgnoreError flag is true, we continue to the next router. +// Context timeout error will be also ignored if the flag is set. +func (r *composableSequential) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) { + return getValueOrErrorSequential(ctx, r.routers, + func(ctx context.Context, r routing.Routing) (peer.AddrInfo, bool, error) { + addr, err := r.FindPeer(ctx, pid) + return addr, addr.ID == "", err + }, + ) +} + +// If some router fails and the IgnoreError flag is true, we continue to the next router. +// Context timeout error will be also ignored if the flag is set. +func (r *composableSequential) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error { + return executeSequential(ctx, r.routers, + func(ctx context.Context, r routing.Routing) error { + return r.PutValue(ctx, key, val, opts...) + }) +} + +// If some router fails and the IgnoreError flag is true, we continue to the next router. +// Context timeout error will be also ignored if the flag is set. +func (r *composableSequential) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { + return getValueOrErrorSequential(ctx, r.routers, + func(ctx context.Context, r routing.Routing) ([]byte, bool, error) { + val, err := r.GetValue(ctx, key, opts...) + return val, len(val) == 0, err + }, + ) +} + +// If some router fails and the IgnoreError flag is true, we continue to the next router. +// Context timeout error will be also ignored if the flag is set. +func (r *composableSequential) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { + ch := getChannelOrErrorSequential(ctx, r.routers, + func(ctx context.Context, r routing.Routing) (<-chan []byte, error) { + return r.SearchValue(ctx, key, opts...) + }, + func() bool { return false }, + ) + + return ch, nil + +} + +// If some router fails and the IgnoreError flag is true, we continue to the next router. +// Context timeout error will be also ignored if the flag is set. +func (r *composableSequential) Bootstrap(ctx context.Context) error { + return executeSequential(ctx, r.routers, + func(ctx context.Context, r routing.Routing) error { + return r.Bootstrap(ctx) + }, + ) +} + +func getValueOrErrorSequential[T any]( + ctx context.Context, + routers []*SequentialRouter, + f func(context.Context, routing.Routing) (T, bool, error), +) (value T, err error) { + for _, router := range routers { + if ctxErr := ctx.Err(); ctxErr != nil { + return value, ctxErr + } + + ctx, cancel := context.WithTimeout(ctx, router.Timeout) + defer cancel() + value, empty, err := f(ctx, router.Router) + if err != nil && + !errors.Is(err, routing.ErrNotFound) && + !router.IgnoreError { + return value, err + } + + if empty { + continue + } + + return value, nil + } + + return value, routing.ErrNotFound +} + +func executeSequential( + ctx context.Context, + routers []*SequentialRouter, + f func(context.Context, routing.Routing, + ) error) error { + for _, router := range routers { + if ctxErr := ctx.Err(); ctxErr != nil { + return ctxErr + } + ctx, cancel := context.WithTimeout(ctx, router.Timeout) + if err := f(ctx, router.Router); err != nil && + !errors.Is(err, routing.ErrNotFound) && + !router.IgnoreError { + cancel() + return err + } + cancel() + } + + return nil +} + +func getChannelOrErrorSequential[T any]( + ctx context.Context, + routers []*SequentialRouter, + f func(context.Context, routing.Routing) (<-chan T, error), + shouldStop func() bool, +) chan T { + chanOut := make(chan T) + + go func() { + for _, router := range routers { + if ctxErr := ctx.Err(); ctxErr != nil { + close(chanOut) + return + } + + ctx, cancel := context.WithTimeout(ctx, router.Timeout) + rch, err := f(ctx, router.Router) + if err != nil && + !errors.Is(err, routing.ErrNotFound) && + !router.IgnoreError { + cancel() + break + } + + f: + for { + select { + case <-ctx.Done(): + break f + case v, ok := <-rch: + if !ok { + break f + } + select { + case <-ctx.Done(): + break f + case chanOut <- v: + } + + } + } + + cancel() + } + + close(chanOut) + }() + + return chanOut +} diff --git a/compsequential_test.go b/compsequential_test.go new file mode 100644 index 0000000..1e21e86 --- /dev/null +++ b/compsequential_test.go @@ -0,0 +1,398 @@ +package routinghelpers + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" + "github.com/stretchr/testify/require" +) + +func TestComposableSequentialFixtures(t *testing.T) { + fixtures := []struct { + Name string + routers []*SequentialRouter + GetValueFixtures []struct { + err error + key string + value string + searchValCount int + } + PutValueFixtures []struct { + err error + key string + value string + } + ProvideFixtures []struct { + err error + } + FindPeerFixtures []struct { + peerID string + err error + } + }{ + { + Name: "simple two routers", + routers: []*SequentialRouter{ + { + Timeout: time.Second, + IgnoreError: false, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{"a", "b", "c"}, []string{"av", "bv", "cv"}), + PeerRouting: newDummyPeerRouting(t, []peer.ID{"pid1", "pid2"}), + ContentRouting: Null{}, + }, + }, + { + Timeout: time.Minute, + IgnoreError: false, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{"a", "d"}, []string{"av2", "dv"}), + PeerRouting: newDummyPeerRouting(t, []peer.ID{"pid1", "pid3"}), + ContentRouting: Null{}, + }, + }, + }, + GetValueFixtures: []struct { + err error + key string + value string + searchValCount int + }{ + { + key: "d", + value: "dv", + searchValCount: 1, + }, + { + key: "a", + value: "av", + searchValCount: 2, + }, + }, + PutValueFixtures: []struct { + err error + key string + value string + }{ + { + err: errors.New("a"), + key: "/error/a", + value: "a", + }, + { + key: "a", + value: "a", + }, + }, + ProvideFixtures: []struct { + err error + }{ + { + err: routing.ErrNotSupported, + }, + }, + FindPeerFixtures: []struct { + peerID string + err error + }{ + { + peerID: "pid1", + }, + { + peerID: "pid3", + }, + }, + }, + { + Name: "two routers with ignore errors", + routers: []*SequentialRouter{ + { + Timeout: time.Second, + IgnoreError: true, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{}, []string{}), + PeerRouting: newDummyPeerRouting(t, []peer.ID{"pid1", "pid2"}), + ContentRouting: Null{}, + }, + }, + { + Timeout: time.Minute, + IgnoreError: true, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{"d"}, []string{"dv"}), + PeerRouting: newDummyPeerRouting(t, []peer.ID{"pid1", "pid3"}), + ContentRouting: Null{}, + }, + }, + }, + GetValueFixtures: []struct { + err error + key string + value string + searchValCount int + }{ + { + key: "d", + value: "dv", + searchValCount: 1, + }, + { + err: routing.ErrNotFound, // even ignoring errors, if the value is not found we return not found + key: "a", + }, + }, + PutValueFixtures: []struct { + err error + key string + value string + }{ + { + key: "/error/x", + value: "xv", + }, + { + key: "/error/y", + value: "yv", + }, + }, + FindPeerFixtures: []struct { + peerID string + err error + }{ + { + peerID: "pid1", + }, + { + err: routing.ErrNotFound, // even ignoring errors, if the value is not found we return not found + peerID: "pid4", + }, + }, + }, + { + Name: "two routers with ignore errors no delay", + routers: []*SequentialRouter{ + { + Timeout: time.Second, + IgnoreError: true, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{"a"}, []string{"av"}), + PeerRouting: newDummyPeerRouting(t, []peer.ID{"pid1", "pid2"}), + ContentRouting: Null{}, + }, + }, + { + Timeout: time.Minute, + IgnoreError: true, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{"d", "e"}, []string{"dv", "ev"}), + PeerRouting: newDummyPeerRouting(t, []peer.ID{"pid1", "pid3"}), + ContentRouting: Null{}, + }, + }, + }, + GetValueFixtures: []struct { + err error + key string + value string + searchValCount int + }{ + { + key: "d", + value: "dv", + searchValCount: 1, + }, + { + key: "a", + value: "av", + searchValCount: 1, + }, + { + err: routing.ErrNotFound, + key: "/error/z", + }, + { + err: routing.ErrNotFound, + key: "/error/y", + }, + }, + PutValueFixtures: []struct { + err error + key string + value string + }{ + { + key: "/error/x", + value: "xv", + }, + { + key: "/error/y", + value: "yv", + }, + }, + FindPeerFixtures: []struct { + peerID string + err error + }{ + { + peerID: "pid1", + }, + { + peerID: "pid4", + err: routing.ErrNotFound, + }, + }, + }, + { + Name: "two routers one value store failing always", + routers: []*SequentialRouter{ + { + Timeout: time.Second, + IgnoreError: false, + Router: &Compose{ + ValueStore: failValueStore{}, + PeerRouting: Null{}, + ContentRouting: Null{}, + }, + }, + { + Timeout: time.Minute, + IgnoreError: false, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{"d", "e"}, []string{"dv", "ev"}), + PeerRouting: Null{}, + ContentRouting: Null{}, + }, + }, + }, + GetValueFixtures: []struct { + err error + key string + value string + searchValCount int + }{ + { + err: errFailValue, + key: "d", + value: "dv", + }, + { + err: errFailValue, + key: "a", + value: "av", + }, + }, + }, + { + Name: "two routers one value store failing always but ignored", + routers: []*SequentialRouter{ + { + Timeout: time.Second, + IgnoreError: true, + Router: &Compose{ + ValueStore: failValueStore{}, + PeerRouting: Null{}, + ContentRouting: Null{}, + }, + }, + { + Timeout: time.Second, + IgnoreError: false, + Router: &Compose{ + ValueStore: newDummyValueStore(t, []string{"d", "e"}, []string{"dv", "ev"}), + PeerRouting: Null{}, + ContentRouting: Null{}, + }, + }, + }, + GetValueFixtures: []struct { + err error + key string + value string + searchValCount int + }{ + { + key: "d", + value: "dv", + searchValCount: 1, + }, + { + err: routing.ErrNotFound, + key: "a", + value: "av", + }, + }, + }, + } + + for _, f := range fixtures { + f := f + t.Run(f.Name, func(t *testing.T) { + t.Parallel() + require := require.New(t) + cpr := NewComposableSequential(f.routers) + for _, gvf := range f.GetValueFixtures { + val, err := cpr.GetValue(context.Background(), gvf.key) + if gvf.err != nil { + require.ErrorContains(err, gvf.err.Error()) + continue + } else { + require.NoError(err) + } + + require.Equal(gvf.value, string(val)) + + vals, err := cpr.SearchValue(context.Background(), gvf.key) + if gvf.err != nil { + require.ErrorContains(err, gvf.err.Error()) + continue + } else { + require.NoError(err) + } + + count := 0 + for range vals { + count++ + } + + require.Equal(gvf.searchValCount, count) + } + + for _, pvf := range f.PutValueFixtures { + err := cpr.PutValue(context.Background(), pvf.key, []byte(pvf.value)) + if pvf.err != nil { + require.ErrorContains(err, pvf.err.Error()) + continue + } else { + require.NoError(err) + } + } + + for _, pf := range f.ProvideFixtures { + err := cpr.Provide(context.Background(), cid.Cid{}, true) + if pf.err != nil { + require.ErrorContains(err, pf.err.Error()) + continue + } else { + require.NoError(err) + } + } + + for _, fpf := range f.FindPeerFixtures { + addr, err := cpr.FindPeer(context.Background(), peer.ID(fpf.peerID)) + if fpf.err != nil { + require.ErrorContains(err, fpf.err.Error()) + continue + } else { + require.NoError(err) + } + + require.Equal(fpf.peerID, string(addr.ID)) + } + }) + } +} diff --git a/go.mod b/go.mod index 7430e17..e3476fd 100644 --- a/go.mod +++ b/go.mod @@ -9,9 +9,11 @@ require ( github.com/libp2p/go-libp2p v0.22.0 github.com/libp2p/go-libp2p-record v0.2.0 github.com/multiformats/go-multihash v0.2.1 + github.com/stretchr/testify v1.8.0 ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/klauspost/cpuid/v2 v2.1.0 // indirect @@ -26,9 +28,11 @@ require ( github.com/multiformats/go-multibase v0.1.1 // indirect github.com/multiformats/go-multicodec v0.5.0 // indirect github.com/multiformats/go-varint v0.0.6 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.1.7 // indirect ) diff --git a/go.sum b/go.sum index 7471eb7..2baad75 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1owhMVTHFZIlnvd4= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc= @@ -47,11 +49,16 @@ github.com/multiformats/go-multihash v0.2.1/go.mod h1:WxoMcYG85AZVQUyRyo9s4wULvW github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2W/KhfNY= github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -84,6 +91,10 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= lukechampine.com/blake3 v1.1.7 h1:GgRMhmdsuK8+ii6UZFDL8Nb+VyMwadAgcJyfYHxG6n0= lukechampine.com/blake3 v1.1.7/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA=