diff --git a/composed.go b/composed.go index 5ffa0d0..69b2ea0 100644 --- a/composed.go +++ b/composed.go @@ -63,7 +63,10 @@ func (cr *Compose) Provide(ctx context.Context, c cid.Cid, local bool) error { return cr.ContentRouting.Provide(ctx, c, local) } -// FindProvidersAsync searches for peers who are able to provide a given key +// FindProvidersAsync searches for peers who are able to provide a given key. +// +// If count > 0, it returns at most count providers. If count == 0, it returns +// an unbounded number of providers. func (cr *Compose) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-chan peer.AddrInfo { if cr.ContentRouting == nil { ch := make(chan peer.AddrInfo) diff --git a/limited.go b/limited.go index ff7d88e..04ad901 100644 --- a/limited.go +++ b/limited.go @@ -16,7 +16,8 @@ type LimitedValueStore struct { Namespaces []string } -// GetPublicKey returns the public key for the given peer. +// GetPublicKey returns the public key for the given peer, if and only if this +// router supports the /pk namespace. Otherwise, it returns routing.ErrNotFound. func (lvs *LimitedValueStore) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) { for _, ns := range lvs.Namespaces { if ns == "pk" { @@ -26,7 +27,8 @@ func (lvs *LimitedValueStore) GetPublicKey(ctx context.Context, p peer.ID) (ci.P return nil, routing.ErrNotFound } -// PutValue returns ErrNotSupported +// PutValue puts the given key in the underlying value store if the namespace +// is supported. Otherwise, it returns routing.ErrNotSupported. func (lvs *LimitedValueStore) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) error { if !lvs.KeySupported(key) { return routing.ErrNotSupported @@ -51,7 +53,8 @@ func (lvs *LimitedValueStore) KeySupported(key string) bool { return false } -// GetValue returns routing.ErrNotFound if key isn't supported +// GetValue retrieves the given key from the underlying value store if the namespace +// is supported. 
Otherwise, it returns routing.ErrNotFound. func (lvs *LimitedValueStore) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { if !lvs.KeySupported(key) { return nil, routing.ErrNotFound @@ -59,8 +62,10 @@ func (lvs *LimitedValueStore) GetValue(ctx context.Context, key string, opts ... return lvs.ValueStore.GetValue(ctx, key, opts...) } -// SearchValue returns empty channel if key isn't supported or calls SearchValue -// on the underlying ValueStore +// SearchValue searches the underlying value store for the given key if the +// namespace is supported, returning results in monotonically increasing +// "freshness". Otherwise, it returns an empty, closed channel to indicate that +// the value wasn't found. func (lvs *LimitedValueStore) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { if !lvs.KeySupported(key) { out := make(chan []byte) @@ -70,6 +75,8 @@ func (lvs *LimitedValueStore) SearchValue(ctx context.Context, key string, opts return lvs.ValueStore.SearchValue(ctx, key, opts...) } +// Bootstrap signals the underlying value store to get into the "bootstrapped" +// state, if it implements the Bootstrap interface. func (lvs *LimitedValueStore) Bootstrap(ctx context.Context) error { if bs, ok := lvs.ValueStore.(Bootstrap); ok { return bs.Bootstrap(ctx) @@ -77,6 +84,8 @@ func (lvs *LimitedValueStore) Bootstrap(ctx context.Context) error { return nil } +// Close closes the underlying value store if it implements the io.Closer +// interface. func (lvs *LimitedValueStore) Close() error { if closer, ok := lvs.ValueStore.(io.Closer); ok { return closer.Close() diff --git a/parallel.go b/parallel.go index 14aef86..cd3ef7e 100644 --- a/parallel.go +++ b/parallel.go @@ -289,12 +289,17 @@ func (r Parallel) forKey(key string) Parallel { }) } +// PutValue puts the given key to all sub-routers in parallel. 
It succeeds as +// long as putting to at least one sub-router succeeds, but it waits for all +// puts to terminate. func (r Parallel) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) error { return r.forKey(key).put(func(ri routing.Routing) error { return ri.PutValue(ctx, key, value, opts...) }) } +// GetValue searches all sub-routers for the given key, returning the result +// from the first sub-router to complete the query. func (r Parallel) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { vInt, err := r.forKey(key).get(ctx, func(ri routing.Routing) (interface{}, error) { return ri.GetValue(ctx, key, opts...) @@ -303,6 +308,9 @@ func (r Parallel) GetValue(ctx context.Context, key string, opts ...routing.Opti return val, err } +// SearchValue searches all sub-routers for the given key in parallel, +// returning results in monotonically increasing "freshness" from all +// sub-routers. func (r Parallel) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { resCh, err := r.forKey(key).search(ctx, func(ri routing.Routing) (<-chan []byte, error) { return ri.SearchValue(ctx, key, opts...) @@ -342,6 +350,8 @@ func (r Parallel) SearchValue(ctx context.Context, key string, opts ...routing.O return valid, err } +// GetPublicKey retrieves the public key from all sub-routers in parallel, +// returning the first result. func (r Parallel) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) { vInt, err := r. forKey(routing.KeyForPublicKey(p)). @@ -352,6 +362,8 @@ func (r Parallel) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error return val, err } +// FindPeer finds the given peer in all sub-routers in parallel, returning the +// first result. 
func (r Parallel) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { vInt, err := r.filter(func(ri routing.Routing) bool { return supportsPeer(ri) @@ -362,6 +374,10 @@ func (r Parallel) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error return pi, err } +// Provide announces that this peer provides the content in question to all +// sub-routers in parallel. Provide returns success as long as a single +// sub-router succeeds, but still waits for all sub-routers to finish before +// returning. func (r Parallel) Provide(ctx context.Context, c cid.Cid, local bool) error { return r.filter(func(ri routing.Routing) bool { return supportsContent(ri) @@ -370,6 +386,11 @@ func (r Parallel) Provide(ctx context.Context, c cid.Cid, local bool) error { }) } +// FindProvidersAsync searches all sub-routers in parallel for peers who are +// able to provide a given key. +// +// If count > 0, it returns at most count providers. If count == 0, it returns +// an unbounded number of providers. func (r Parallel) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-chan peer.AddrInfo { routers := r.filter(func(ri routing.Routing) bool { return supportsContent(ri) @@ -496,6 +517,7 @@ func fewProviders(ctx context.Context, out chan<- peer.AddrInfo, in []<-chan pee } } +// Bootstrap signals all the sub-routers to bootstrap. func (r Parallel) Bootstrap(ctx context.Context) error { var me multierror.Error for _, b := range r.Routers { @@ -506,6 +528,7 @@ func (r Parallel) Bootstrap(ctx context.Context) error { return me.ErrorOrNil() } +// Close closes all sub-routers that implement the io.Closer interface. 
func (r Parallel) Close() error { var me multierror.Error for _, router := range r.Routers { diff --git a/tiered.go b/tiered.go index b19cfdf..829be21 100644 --- a/tiered.go +++ b/tiered.go @@ -20,6 +20,9 @@ type Tiered struct { Validator record.Validator } +// PutValue puts the given key to all sub-routers in parallel. It succeeds as +// long as putting to at least one sub-router succeeds, but it waits for all +// puts to terminate. func (r Tiered) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) error { return Parallel{Routers: r.Routers}.PutValue(ctx, key, value, opts...) } @@ -49,6 +52,8 @@ func (r Tiered) get(ctx context.Context, do func(routing.Routing) (interface{}, } } +// GetValue sequentially searches each sub-router for the given key, returning +// the value from the first sub-router to complete the query. func (r Tiered) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { valInt, err := r.get(ctx, func(ri routing.Routing) (interface{}, error) { return ri.GetValue(ctx, key, opts...) @@ -57,10 +62,15 @@ func (r Tiered) GetValue(ctx context.Context, key string, opts ...routing.Option return val, err } +// SearchValue searches all sub-routers for the given key in parallel, +// returning results in monotonically increasing "freshness" from all +// sub-routers. func (r Tiered) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { return Parallel{Routers: r.Routers, Validator: r.Validator}.SearchValue(ctx, key, opts...) } +// GetPublicKey sequentially searches each sub-router for the public key, +// returning the first result. 
func (r Tiered) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) { vInt, err := r.get(ctx, func(ri routing.Routing) (interface{}, error) { return routing.GetPublicKey(ri, ctx, p) @@ -69,14 +79,25 @@ func (r Tiered) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) return val, err } +// Provide announces that this peer provides the content in question to all +// sub-routers in parallel. Provide returns success as long as a single +// sub-router succeeds, but still waits for all sub-routers to finish before +// returning. func (r Tiered) Provide(ctx context.Context, c cid.Cid, local bool) error { return Parallel{Routers: r.Routers}.Provide(ctx, c, local) } +// FindProvidersAsync searches all sub-routers in parallel for peers who are +// able to provide a given key. +// +// If count > 0, it returns at most count providers. If count == 0, it returns +// an unbounded number of providers. func (r Tiered) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-chan peer.AddrInfo { return Parallel{Routers: r.Routers}.FindProvidersAsync(ctx, c, count) } +// FindPeer sequentially searches for given peer using each sub-router, +// returning the first result. func (r Tiered) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { valInt, err := r.get(ctx, func(ri routing.Routing) (interface{}, error) { return ri.FindPeer(ctx, p) @@ -85,10 +106,12 @@ func (r Tiered) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) return val, err } +// Bootstrap signals all the sub-routers to bootstrap. func (r Tiered) Bootstrap(ctx context.Context) error { return Parallel{Routers: r.Routers}.Bootstrap(ctx) } +// Close closes all sub-routers that implement the io.Closer interface. func (r Tiered) Close() error { var me multierror.Error for _, router := range r.Routers {