-
Notifications
You must be signed in to change notification settings - Fork 13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement rendezvous protocol spec #1
Open
vyzo
wants to merge
48
commits into
master
Choose a base branch
from
implement-spec
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
48 commits
Select commit
Hold shift + click to select a range
f946163
rendezvous protobuf
vyzo 0cbcbf6
client implementation
vyzo f94b0b4
update client interface
vyzo 268abf3
more fine-grained rendezvous api
vyzo b7bc940
add error notification for background register/discover
vyzo a107e34
interface ergonomics
vyzo 13f4c67
add E_INVALID_TTL to rendezvous.proto
vyzo e3d343f
include namespace in error logs
vyzo 1506c04
simplify Rendezvous interface
vyzo 7e5664c
use discriminated registration errors
vyzo 6a1176f
annotate registration error
vyzo b7c304d
update protobuf
vyzo aeac2e2
update for response error changes in the protocol
vyzo e5a72b9
rendezvous interface should expose full registration information
vyzo 7d72fc7
service implementation
vyzo dbe6b0d
rendezvous service sync: hook for federation
vyzo 6c1d282
Registration records should have an actual peer ID
vyzo 8181424
client: use larger batch in discovery, only poll immediately if full
vyzo fbaf21c
client: add TODO comment for adaptive backoff
vyzo 4e3eaa7
refactor service constructor into two parts
vyzo aa3f46c
user parameter for ttl in Register
vyzo c703d37
database logic implementation
vyzo ae10cc6
implement binary packing details
vyzo 8c12272
better logging for service i/o
vyzo 53dfbc7
test address and cookie packing
vyzo f41fbba
test db functionality
vyzo cfbcdde
up MaxRegistrations to 1k
vyzo 4788ef7
test db functionality with multiple namespaces
vyzo a47367d
basic service test
vyzo 2b0995f
test service errors
vyzo 9ab12ab
make db nonce 32 bytes
vyzo 6c4fda5
test client specific functionality
vyzo e530204
use randomized exponential backoff in error retry for persistent clie…
vyzo aa7f9da
client: add TODO for robust discovery error recovery
vyzo c487c20
refactor database interface and implementation into db subpackage
vyzo baf1e4e
don't leak database error details in internal errors
vyzo c540724
two interfaces for client-side: RendezvousPoint and RendezvousClient
vyzo 1ee2b55
update protobuf
vyzo 8846a4b
include ttl in registration response
vyzo 3c726d2
update gx deps
vyzo f2ee9b3
expose counter in register interface
vyzo 2843bd3
update tests
vyzo 91cdb88
Switched from gx to go mod and started using go-libp2p-core interfaces
aschmahmann 7901280
Add stateful discovery client
aschmahmann 9052b53
RendezvousPoint and RendezvousClient now return the server's TTL on R…
aschmahmann 25d0082
fixed compile error from previous commit + code refactoring
aschmahmann 0e771cd
replaced sync.Map with map + RW mutex. small refactors
aschmahmann 7371441
Merge pull request #3 from aschmahmann/feat/add-discovery-client
vyzo File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,311 @@ | ||
package rendezvous | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math/rand" | ||
"time" | ||
|
||
pb "github.com/libp2p/go-libp2p-rendezvous/pb" | ||
|
||
ggio "github.com/gogo/protobuf/io" | ||
|
||
"github.com/libp2p/go-libp2p-core/host" | ||
inet "github.com/libp2p/go-libp2p-core/network" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
) | ||
|
||
var ( | ||
DiscoverAsyncInterval = 2 * time.Minute | ||
) | ||
|
||
type RendezvousPoint interface { | ||
Register(ctx context.Context, ns string, ttl int) (time.Duration, error) | ||
Unregister(ctx context.Context, ns string) error | ||
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) | ||
DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) | ||
} | ||
|
||
type Registration struct { | ||
Peer peer.AddrInfo | ||
Ns string | ||
Ttl int | ||
} | ||
|
||
type RendezvousClient interface { | ||
Register(ctx context.Context, ns string, ttl int) (time.Duration, error) | ||
Unregister(ctx context.Context, ns string) error | ||
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]peer.AddrInfo, []byte, error) | ||
DiscoverAsync(ctx context.Context, ns string) (<-chan peer.AddrInfo, error) | ||
} | ||
|
||
func NewRendezvousPoint(host host.Host, p peer.ID) RendezvousPoint { | ||
return &rendezvousPoint{ | ||
host: host, | ||
p: p, | ||
} | ||
} | ||
|
||
type rendezvousPoint struct { | ||
host host.Host | ||
p peer.ID | ||
} | ||
|
||
func NewRendezvousClient(host host.Host, rp peer.ID) RendezvousClient { | ||
return NewRendezvousClientWithPoint(NewRendezvousPoint(host, rp)) | ||
} | ||
|
||
func NewRendezvousClientWithPoint(rp RendezvousPoint) RendezvousClient { | ||
return &rendezvousClient{rp: rp} | ||
} | ||
|
||
type rendezvousClient struct { | ||
rp RendezvousPoint | ||
} | ||
|
||
func (rp *rendezvousPoint) Register(ctx context.Context, ns string, ttl int) (time.Duration, error) { | ||
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto) | ||
if err != nil { | ||
return 0, err | ||
} | ||
defer s.Close() | ||
|
||
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) | ||
w := ggio.NewDelimitedWriter(s) | ||
|
||
req := newRegisterMessage(ns, peer.AddrInfo{ID: rp.host.ID(), Addrs: rp.host.Addrs()}, ttl) | ||
err = w.WriteMsg(req) | ||
if err != nil { | ||
return 0, err | ||
} | ||
|
||
var res pb.Message | ||
err = r.ReadMsg(&res) | ||
if err != nil { | ||
return 0, err | ||
} | ||
|
||
if res.GetType() != pb.Message_REGISTER_RESPONSE { | ||
return 0, fmt.Errorf("Unexpected response: %s", res.GetType().String()) | ||
} | ||
|
||
response := res.GetRegisterResponse() | ||
status := response.GetStatus() | ||
if status != pb.Message_OK { | ||
return 0, RendezvousError{Status: status, Text: res.GetRegisterResponse().GetStatusText()} | ||
} | ||
|
||
return time.Duration(*response.Ttl) * time.Second, nil | ||
} | ||
|
||
func (rc *rendezvousClient) Register(ctx context.Context, ns string, ttl int) (time.Duration, error) { | ||
if ttl < 120 { | ||
return 0, fmt.Errorf("registration TTL is too short") | ||
} | ||
|
||
returnedTTL, err := rc.rp.Register(ctx, ns, ttl) | ||
if err != nil { | ||
return 0, err | ||
} | ||
|
||
go registerRefresh(ctx, rc.rp, ns, ttl) | ||
return returnedTTL, nil | ||
} | ||
|
||
func registerRefresh(ctx context.Context, rz RendezvousPoint, ns string, ttl int) { | ||
var refresh time.Duration | ||
errcount := 0 | ||
|
||
for { | ||
if errcount > 0 { | ||
// do randomized exponential backoff, up to ~4 hours | ||
if errcount > 7 { | ||
errcount = 7 | ||
} | ||
backoff := 2 << uint(errcount) | ||
refresh = 5*time.Minute + time.Duration(rand.Intn(backoff*60000))*time.Millisecond | ||
} else { | ||
refresh = time.Duration(ttl-30) * time.Second | ||
} | ||
|
||
select { | ||
case <-time.After(refresh): | ||
case <-ctx.Done(): | ||
return | ||
} | ||
|
||
_, err := rz.Register(ctx, ns, ttl) | ||
if err != nil { | ||
log.Errorf("Error registering [%s]: %s", ns, err.Error()) | ||
errcount++ | ||
} else { | ||
errcount = 0 | ||
} | ||
} | ||
} | ||
|
||
func (rp *rendezvousPoint) Unregister(ctx context.Context, ns string) error { | ||
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto) | ||
if err != nil { | ||
return err | ||
} | ||
defer s.Close() | ||
|
||
w := ggio.NewDelimitedWriter(s) | ||
req := newUnregisterMessage(ns, rp.host.ID()) | ||
return w.WriteMsg(req) | ||
} | ||
|
||
func (rc *rendezvousClient) Unregister(ctx context.Context, ns string) error { | ||
return rc.rp.Unregister(ctx, ns) | ||
} | ||
|
||
func (rp *rendezvousPoint) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) { | ||
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
defer s.Close() | ||
|
||
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) | ||
w := ggio.NewDelimitedWriter(s) | ||
|
||
return discoverQuery(ns, limit, cookie, r, w) | ||
} | ||
|
||
func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Writer) ([]Registration, []byte, error) { | ||
|
||
req := newDiscoverMessage(ns, limit, cookie) | ||
err := w.WriteMsg(req) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
var res pb.Message | ||
err = r.ReadMsg(&res) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
if res.GetType() != pb.Message_DISCOVER_RESPONSE { | ||
return nil, nil, fmt.Errorf("Unexpected response: %s", res.GetType().String()) | ||
} | ||
|
||
status := res.GetDiscoverResponse().GetStatus() | ||
if status != pb.Message_OK { | ||
return nil, nil, RendezvousError{Status: status, Text: res.GetDiscoverResponse().GetStatusText()} | ||
} | ||
|
||
regs := res.GetDiscoverResponse().GetRegistrations() | ||
result := make([]Registration, 0, len(regs)) | ||
for _, reg := range regs { | ||
pi, err := pbToPeerInfo(reg.GetPeer()) | ||
if err != nil { | ||
log.Errorf("Invalid peer info: %s", err.Error()) | ||
continue | ||
} | ||
result = append(result, Registration{Peer: pi, Ns: reg.GetNs(), Ttl: int(reg.GetTtl())}) | ||
} | ||
|
||
return result, res.GetDiscoverResponse().GetCookie(), nil | ||
} | ||
|
||
func (rp *rendezvousPoint) DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) { | ||
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
ch := make(chan Registration) | ||
go discoverAsync(ctx, ns, s, ch) | ||
return ch, nil | ||
} | ||
|
||
func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan Registration) { | ||
defer s.Close() | ||
defer close(ch) | ||
|
||
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) | ||
w := ggio.NewDelimitedWriter(s) | ||
|
||
const batch = 200 | ||
|
||
var ( | ||
cookie []byte | ||
regs []Registration | ||
err error | ||
) | ||
|
||
for { | ||
regs, cookie, err = discoverQuery(ns, batch, cookie, r, w) | ||
if err != nil { | ||
// TODO robust error recovery | ||
// - handle closed streams with backoff + new stream, preserving the cookie | ||
// - handle E_INVALID_COOKIE errors in that case to restart the discovery | ||
log.Errorf("Error in discovery [%s]: %s", ns, err.Error()) | ||
return | ||
} | ||
|
||
for _, reg := range regs { | ||
select { | ||
case ch <- reg: | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
|
||
if len(regs) < batch { | ||
// TODO adaptive backoff for heavily loaded rendezvous points | ||
select { | ||
case <-time.After(DiscoverAsyncInterval): | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (rc *rendezvousClient) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]peer.AddrInfo, []byte, error) { | ||
regs, cookie, err := rc.rp.Discover(ctx, ns, limit, cookie) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
pinfos := make([]peer.AddrInfo, len(regs)) | ||
for i, reg := range regs { | ||
pinfos[i] = reg.Peer | ||
} | ||
|
||
return pinfos, cookie, nil | ||
} | ||
|
||
func (rc *rendezvousClient) DiscoverAsync(ctx context.Context, ns string) (<-chan peer.AddrInfo, error) { | ||
rch, err := rc.rp.DiscoverAsync(ctx, ns) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
ch := make(chan peer.AddrInfo) | ||
go discoverPeersAsync(ctx, rch, ch) | ||
return ch, nil | ||
} | ||
|
||
func discoverPeersAsync(ctx context.Context, rch <-chan Registration, ch chan peer.AddrInfo) { | ||
defer close(ch) | ||
for { | ||
select { | ||
case reg, ok := <-rch: | ||
if !ok { | ||
return | ||
} | ||
|
||
select { | ||
case ch <- reg.Peer: | ||
case <-ctx.Done(): | ||
return | ||
} | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the service respond something like "load too high, try again in a bit" ? Seems like a nice DoS mitigation feature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if a load indicator would help, as the clients can simply ignore it.
I am in favour of load inference from the rate of registration responses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On the other hand, we could make this an Error response --
E_TEMPORARILY_UNAVAILABLE
perhaps.Now that could help, as it would force the clients to back off and possibly create a new stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I was thinking have it be an error response. So they don't get any records back, and the error tells them to just wait and try again in a bit