Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Sequential and Parallel Routers #58

Merged
merged 15 commits into from
Sep 16, 2022
27 changes: 27 additions & 0 deletions compconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package routinghelpers

import (
"context"
"time"

"github.com/libp2p/go-libp2p/core/routing"
"github.com/multiformats/go-multihash"
)

// ParallelRouter pairs a routing.Routing implementation with the settings the
// parallel composable router applies when invoking it.
type ParallelRouter struct {
// Timeout bounds a single call to Router. It is applied with
// context.WithTimeout once the ExecuteAfter delay has elapsed.
Timeout time.Duration
// IgnoreError, when true, keeps an error returned by Router from being
// reported as a failure of the combined call.
IgnoreError bool
// Router is the wrapped routing implementation.
Router routing.Routing
// ExecuteAfter is how long to wait before Router is invoked.
ExecuteAfter time.Duration
}

// SequentialRouter pairs a routing.Routing implementation with per-router
// settings.
// NOTE(review): presumably consumed by the sequential composable router, which
// is not part of this file — confirm the exact field semantics there.
type SequentialRouter struct {
// Timeout is the per-router time limit (presumably applied to each call — confirm).
Timeout time.Duration
// IgnoreError marks errors from Router as non-fatal (presumably mirrors
// ParallelRouter.IgnoreError — confirm).
IgnoreError bool
// Router is the wrapped routing implementation.
Router routing.Routing
}

// ProvideManyRouter is the optional capability of a router that can provide a
// batch of keys in a single call.
type ProvideManyRouter interface {
// ProvideMany provides all the given multihash keys, returning an error on failure.
ProvideMany(ctx context.Context, keys []multihash.Multihash) error
// Ready reports whether the router is ready
// (presumably: ready to accept ProvideMany calls — confirm with implementers).
Ready() bool
}
Comment on lines +24 to +27
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface is duplicated in multiple places and is ultimately what we need to consume a router in Kubo, right? Is there a way to move this somewhere authoritative? Should this be in libp2p? Also does this need to be exported?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should live wherever the routing.Routing interface lives. It is exported so that there is at least one source of truth that can be used anywhere.

323 changes: 323 additions & 0 deletions compparallel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,323 @@
package routinghelpers

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/multiformats/go-multihash"
)

// Compile-time checks that *composableParallel satisfies both interfaces.
var (
	_ routing.Routing   = (*composableParallel)(nil)
	_ ProvideManyRouter = (*composableParallel)(nil)
)

// composableParallel is a router that forwards each routing call to all of
// its configured routers in parallel.
type composableParallel struct {
// routers is the set of routers, with their per-router settings, that every call fans out to.
routers []*ParallelRouter
}

// NewComposableParallel builds a router that runs each routing method on all
// of the given routers in parallel.
//
// For every method:
//   - a router whose IgnoreError flag is set does not stop the whole execution when it fails;
//   - a router whose ExecuteAfter is set is only executed once that delay has elapsed;
//   - a router's own Timeout starts counting AFTER its ExecuteAfter delay.
func NewComposableParallel(routers []*ParallelRouter) *composableParallel {
	cp := new(composableParallel)
	cp.routers = routers
	return cp
}

// Provide invokes Provide on every configured router in parallel, forwarding
// cid and the provide flag unchanged. Errors are collected by executeParallel.
func (r *composableParallel) Provide(ctx context.Context, cid cid.Cid, provide bool) error {
	call := func(ctx context.Context, rt routing.Routing) error {
		return rt.Provide(ctx, cid, provide)
	}
	return executeParallel(ctx, r.routers, call)
}

// ProvideMany invokes ProvideMany in parallel on every configured router that
// implements ProvideManyRouter. Routers without that capability are treated as
// an immediate success.
func (r *composableParallel) ProvideMany(ctx context.Context, keys []multihash.Multihash) error {
	call := func(ctx context.Context, rt routing.Routing) error {
		if pm, ok := rt.(ProvideManyRouter); ok {
			return pm.ProvideMany(ctx, keys)
		}
		// Router has no batch-provide support: nothing to do.
		return nil
	}
	return executeParallel(ctx, r.routers, call)
}

// Ready asks each configured router that implements ProvideManyRouter, one
// after another (SEQUENTIALLY), whether it is ready. It returns false as soon
// as one of them reports not ready, and true otherwise. Routers that do not
// implement ProvideManyRouter are skipped.
func (r *composableParallel) Ready() bool {
	for i := range r.routers {
		if pm, ok := r.routers[i].Router.(ProvideManyRouter); ok && !pm.Ready() {
			return false
		}
	}
	return true
}

// FindProvidersAsync runs FindProvidersAsync on all routers in parallel and
// merges their results into a single channel, in unspecified order.
//
// If count is non-zero, at most count providers are forwarded, with no
// guarantee about which router they come from. To prefer providers from some
// routers, delay the others with ExecuteAfter.
//
// The returned channel is never nil: when the fan-out cannot be started (for
// example because ctx is already cancelled) an already-closed channel is
// returned, so callers ranging over the result terminate instead of blocking
// forever on a nil channel.
func (r *composableParallel) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo {
	var totalCount int64
	ch, err := getChannelOrErrorParallel(
		ctx,
		r.routers,
		func(ctx context.Context, r routing.Routing) (<-chan peer.AddrInfo, error) {
			return r.FindProvidersAsync(ctx, cid, count), nil
		},
		func() bool {
			// count == 0 means "no limit"; otherwise stop once more than
			// count values have been seen across all routers.
			return count != 0 && atomic.AddInt64(&totalCount, 1) > int64(count)
		},
	)
	if err != nil || ch == nil {
		// This method has no error return, so signal "no results" with a
		// closed channel rather than handing back a nil one.
		closed := make(chan peer.AddrInfo)
		close(closed)
		return closed
	}

	return ch
}

// FindPeer queries all routers in parallel and returns the first AddrInfo
// found, cancelling the remaining router calls. A result whose ID is empty is
// treated as "nothing found" for that router.
func (r *composableParallel) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
	lookup := func(ctx context.Context, rt routing.Routing) (peer.AddrInfo, bool, error) {
		info, err := rt.FindPeer(ctx, id)
		empty := info.ID == ""
		return info, empty, err
	}
	return getValueOrErrorParallel(ctx, r.routers, lookup)
}

// PutValue stores val under key on all routers in parallel. If a router fails
// and its IgnoreError flag is not set, the call as a whole returns an error.
// Puts on other routers may still have succeeded even when an error is returned.
func (r *composableParallel) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error {
	put := func(ctx context.Context, rt routing.Routing) error {
		return rt.PutValue(ctx, key, val, opts...)
	}
	return executeParallel(ctx, r.routers, put)
}

// GetValue queries all routers in parallel and returns the first value found,
// cancelling the remaining router calls. A zero-length value is treated as
// "nothing found" for that router.
func (r *composableParallel) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
	fetch := func(ctx context.Context, rt routing.Routing) ([]byte, bool, error) {
		data, err := rt.GetValue(ctx, key, opts...)
		empty := len(data) == 0
		return data, empty, err
	}
	return getValueOrErrorParallel(ctx, r.routers, fetch)
}

// SearchValue runs SearchValue on all routers in parallel and merges the
// values they stream into a single channel. No limit is placed on the number
// of forwarded values.
func (r *composableParallel) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
	search := func(ctx context.Context, rt routing.Routing) (<-chan []byte, error) {
		return rt.SearchValue(ctx, key, opts...)
	}
	neverStop := func() bool { return false }
	return getChannelOrErrorParallel(ctx, r.routers, search, neverStop)
}

// Bootstrap invokes Bootstrap on every configured router in parallel. Errors
// are collected by executeParallel.
func (r *composableParallel) Bootstrap(ctx context.Context) error {
	boot := func(ctx context.Context, rt routing.Routing) error {
		return rt.Bootstrap(ctx)
	}
	return executeParallel(ctx, r.routers, boot)
}

// getValueOrErrorParallel runs f against every router concurrently and returns
// the first non-empty value produced, cancelling the remaining calls.
//
// f returns the value, whether that value should be considered empty, and an
// error. For each router:
//   - the call is delayed by ExecuteAfter and then bounded by Timeout
//     (Timeout is passed straight to context.WithTimeout, so a zero Timeout
//     yields an already-expired context);
//   - an error other than routing.ErrNotFound is reported unless IgnoreError is set;
//   - an empty value is dropped silently.
//
// If every router finishes without producing a value or a reported error,
// routing.ErrNotFound is returned. If ctx ends first, ctx.Err() is returned.
func getValueOrErrorParallel[T any](
	ctx context.Context,
	routers []*ParallelRouter,
	f func(context.Context, routing.Routing) (T, bool, error),
) (value T, err error) {
	results := make(chan T)
	failures := make(chan error)

	// Shared cancellation so the first answer stops every other router.
	ctx, cancelAll := context.WithCancel(ctx)
	defer cancelAll()

	var pending sync.WaitGroup
	pending.Add(len(routers))
	for _, pr := range routers {
		go func(pr *ParallelRouter) {
			defer pending.Done()

			delay := time.NewTimer(pr.ExecuteAfter)
			defer delay.Stop()
			select {
			case <-ctx.Done():
				return
			case <-delay.C:
			}

			// The per-router timeout only starts once the delay has elapsed.
			rctx, cancel := context.WithTimeout(ctx, pr.Timeout)
			defer cancel()

			found, empty, callErr := f(rctx, pr.Router)
			fatal := callErr != nil &&
				!errors.Is(callErr, routing.ErrNotFound) &&
				!pr.IgnoreError

			switch {
			case fatal:
				select {
				case <-rctx.Done():
				case failures <- callErr:
				}
			case empty:
				// Nothing to report for this router.
			default:
				select {
				case <-rctx.Done():
				case results <- found:
				}
			}
		}(pr)
	}

	// Close both channels once every worker is done so the select below can
	// detect the "nobody found anything" case.
	go func() {
		pending.Wait()
		close(results)
		close(failures)
	}()

	select {
	case got, open := <-results:
		if open {
			return got, nil
		}
		return value, routing.ErrNotFound
	case failure, open := <-failures:
		if open {
			return value, failure
		}
		return value, routing.ErrNotFound
	case <-ctx.Done():
		return value, ctx.Err()
	}
}

// executeParallel runs f against every router concurrently, waits for all of
// them to finish, and returns the combined errors (nil when there are none).
//
// Each call is delayed by the router's ExecuteAfter and then bounded by its
// Timeout. If ctx ends while a router is still waiting for its delay, ctx.Err()
// is recorded for that router. Errors from routers with IgnoreError set are
// discarded.
func executeParallel(
	ctx context.Context,
	routers []*ParallelRouter,
	f func(context.Context, routing.Routing) error,
) error {
	failures := make(chan error)

	var pending sync.WaitGroup
	pending.Add(len(routers))
	for _, pr := range routers {
		go func(pr *ParallelRouter) {
			defer pending.Done()

			delay := time.NewTimer(pr.ExecuteAfter)
			defer delay.Stop()

			var callErr error
			select {
			case <-ctx.Done():
				callErr = ctx.Err()
			case <-delay.C:
				rctx, cancel := context.WithTimeout(ctx, pr.Timeout)
				defer cancel()
				callErr = f(rctx, pr.Router)
			}

			if callErr != nil && !pr.IgnoreError {
				failures <- callErr
			}
		}(pr)
	}

	// Closing the channel after all workers finish ends the collection loop below.
	go func() {
		pending.Wait()
		close(failures)
	}()

	var combined error
	for e := range failures {
		combined = multierror.Append(combined, e)
	}

	return combined
}

func getChannelOrErrorParallel[T any](
ctx context.Context,
routers []*ParallelRouter,
f func(context.Context, routing.Routing) (<-chan T, error),
shouldStop func() bool,
) (chan T, error) {
outCh := make(chan T)
errCh := make(chan error)
var wg sync.WaitGroup
ctx, cancelAll := context.WithCancel(ctx)
for _, r := range routers {
wg.Add(1)
go func(r *ParallelRouter) {
defer wg.Done()
tim := time.NewTimer(r.ExecuteAfter)
defer tim.Stop()
select {
case <-ctx.Done():
return
case <-tim.C:
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
defer cancel()
valueChan, err := f(ctx, r.Router)
if err != nil && !r.IgnoreError {
select {
case <-ctx.Done():
case errCh <- err:
}
return
}
for {
select {
case <-ctx.Done():
return
case val, ok := <-valueChan:
if !ok {
return
}

if shouldStop() {
return
}

select {
case <-ctx.Done():
return
case outCh <- val:
}
}
}
}
}(r)
}

// goroutine closing everything when finishing execution
go func() {
wg.Wait()
close(outCh)
close(errCh)
cancelAll()
}()

select {
case err, ok := <-errCh:
if !ok {
return nil, routing.ErrNotFound
}
return nil, err
ajnavarro marked this conversation as resolved.
Show resolved Hide resolved
case <-ctx.Done():
return nil, ctx.Err()
default:
return outCh, nil
}
}
Loading