Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Persistence Layer on top of PubSub #33

Merged
merged 28 commits into from
Aug 19, 2019
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
43ebd02
removed bootstrapping functionality
aschmahmann May 12, 2019
f57d48c
start pubsub validator changes to implement LWW pubsub
aschmahmann May 22, 2019
e2a5fca
Merge branch 'master' into feat/persistence
aschmahmann May 31, 2019
ff89016
Implement persistence --temporary
aschmahmann Jun 14, 2019
90c980a
Added message protobuf that was missing from previous commit
aschmahmann Jun 21, 2019
3ae7b1f
Use changes from ongoing pubsub PRs
aschmahmann Jul 3, 2019
6fb9435
removed unused protobufs
aschmahmann Jul 3, 2019
f3f8dd4
records past EOL should fail
aschmahmann Jul 3, 2019
9e9d778
reorder imports
aschmahmann Jul 3, 2019
a24054d
Improved the get-latest protocol (protobufs for request and response,…
aschmahmann Aug 9, 2019
092d0e1
better context cancel in get-latest protocol
aschmahmann Aug 9, 2019
3bf24fa
restore bootstrapping ... for now
aschmahmann Aug 9, 2019
e9b0864
In get-latest tests wait a bit after connecting hosts so they have ti…
aschmahmann Aug 9, 2019
f85f2bc
Changed get-latest protocol to have responses with a status code.
aschmahmann Aug 9, 2019
9756263
get-latest responds with ERR message even when sender sends an incorr…
aschmahmann Aug 9, 2019
a4d4d6c
Removed ERR from protobuf and we just reset the stream when we encoun…
aschmahmann Aug 9, 2019
97ac549
protobuf Makefile supports spaces in path name
aschmahmann Aug 9, 2019
b0eeb77
fixed potential goroutine leak. switched order of protobuf fields.
aschmahmann Aug 11, 2019
4fbc97b
Small Makefile refactor
aschmahmann Aug 11, 2019
aa737d1
changed get-latest protocol to be called fetch. some refactoring
aschmahmann Aug 12, 2019
ded823d
renamed protocol files to match protocol rename
aschmahmann Aug 12, 2019
76d8fca
made function passed into the fetch protocol a typedef
aschmahmann Aug 14, 2019
1913974
renames in the fetch protobufs
aschmahmann Aug 15, 2019
fdbeaec
Makefile is more MSYS friendly but you still need a weird GOPATH
aschmahmann Aug 15, 2019
b303f91
Updated go.mod to use unreleased version of pubsub. Refactored Fetch …
aschmahmann Aug 16, 2019
0787cb5
rebroadcast initial delay using timer
aschmahmann Aug 16, 2019
41f6fb8
Added Error status code to Fetch protobuf. Currently unused.
aschmahmann Aug 16, 2019
981b38c
use go-libp2p-pubsub v0.1.1. Fix `Fetch` function to be a pointer rec…
aschmahmann Aug 19, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions fetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package namesys

import (
"context"
"errors"
"time"

ggio "github.com/gogo/protobuf/io"
"github.com/gogo/protobuf/proto"

"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"

pb "github.com/libp2p/go-libp2p-pubsub-router/pb"
)

const FetchProtoID = protocol.ID("/libp2p/fetch/0.0.1")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/libp2p/record-fetch/1 (or something). "fetch" is too generic (does it fetch blocks? keys? everything?). Users are going to try to use it to fetch IPLD blocks and be very confused.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I figured that it would just fetch anything (i.e. []byte), and if for some reason someone wanted to configure their nodes to accept /ipfs/bafyABC via fetch and return a block that's something that would be up to them to do. This would also make it easier for people to reuse this protocol within or on top of other protocols.

Is that too generic/confusing for people? Should we just restrict it to records (i.e. things with validators)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually really like this. +1


type fetchProtocol struct {
ctx context.Context
host host.Host
}

type getValue func(key string) ([]byte, error)

func newFetchProtocol(ctx context.Context, host host.Host, getData getValue) *fetchProtocol {
p := &fetchProtocol{ctx, host}

host.SetStreamHandler(FetchProtoID, func(s network.Stream) {
p.receive(s, getData)
})

return p
}

func (p *fetchProtocol) receive(s network.Stream, getData getValue) {
defer helpers.FullClose(s)

msg := &pb.FetchRequest{}
if err := readMsg(p.ctx, s, msg); err != nil {
log.Infof("error reading request from %s: %s", s.Conn().RemotePeer(), err)
s.Reset()
return
}

response, err := getData(msg.Identifier)
var respProto pb.FetchResponse

if err != nil {
respProto = pb.FetchResponse{Status: pb.FetchResponse_NOT_FOUND}
} else {
respProto = pb.FetchResponse{Data: response}
}

if err := writeMsg(p.ctx, s, &respProto); err != nil {
return
}
}

func (p fetchProtocol) Get(ctx context.Context, pid peer.ID, key string) ([]byte, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: inconsistent receiver.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think this is fixed/better now. Just called it Fetch.

peerCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

s, err := p.host.NewStream(peerCtx, pid, FetchProtoID)
if err != nil {
return nil, err
}
defer helpers.FullClose(s)

msg := &pb.FetchRequest{Identifier: key}

if err := writeMsg(ctx, s, msg); err != nil {
return nil, err
}
s.Close()

response := &pb.FetchResponse{}
if err := readMsg(ctx, s, response); err != nil {
return nil, err
}

switch response.Status {
case pb.FetchResponse_OK:
return response.Data, nil
case pb.FetchResponse_NOT_FOUND:
return nil, nil
default:
return nil, errors.New("get-latest: received unknown status code")
}
}

func writeMsg(ctx context.Context, s network.Stream, msg proto.Message) error {
done := make(chan error, 1)
go func() {
wc := ggio.NewDelimitedWriter(s)

if err := wc.WriteMsg(msg); err != nil {
done <- err
return
}

done <- nil
}()

var retErr error
select {
case retErr = <-done:
case <-ctx.Done():
retErr = ctx.Err()
}

if retErr != nil {
s.Reset()
log.Infof("error writing response to %s: %s", s.Conn().RemotePeer(), retErr)
}
return retErr
}

func readMsg(ctx context.Context, s network.Stream, msg proto.Message) error {
done := make(chan error, 1)
go func() {
r := ggio.NewDelimitedReader(s, 1<<20)
if err := r.ReadMsg(msg); err != nil {
done <- err
return
}
done <- nil
}()

select {
case err := <-done:
return err
case <-ctx.Done():
s.Reset()
return ctx.Err()
}
}
109 changes: 109 additions & 0 deletions fetch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package namesys

import (
"bytes"
"context"
"errors"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/host"
)

func connect(t *testing.T, a, b host.Host) {
pinfo := a.Peerstore().PeerInfo(a.ID())
err := b.Connect(context.Background(), pinfo)
if err != nil {
t.Fatal(err)
}
}

type datastore struct {
data map[string][]byte
}

func (d *datastore) Lookup(key string) ([]byte, error) {
v, ok := d.data[key]
if !ok {
return nil, errors.New("key not found")
}
return v, nil
}

func TestFetchProtocolTrip(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := newNetHosts(ctx, t, 2)
connect(t, hosts[0], hosts[1])

// wait for hosts to get connected
time.Sleep(time.Millisecond * 100)

d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
h1 := newFetchProtocol(ctx, hosts[0], d1.Lookup)

d2 := &datastore{map[string][]byte{"key": []byte("value2")}}
h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup)

fetchCheck(t, ctx, h1, h2, "key", []byte("value2"))
fetchCheck(t, ctx, h2, h1, "key", []byte("value1"))
}

func TestFetchProtocolNotFound(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := newNetHosts(ctx, t, 2)
connect(t, hosts[0], hosts[1])

// wait for hosts to get connected
time.Sleep(time.Millisecond * 100)

d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
h1 := newFetchProtocol(ctx, hosts[0], d1.Lookup)

d2 := &datastore{make(map[string][]byte)}
h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup)

fetchCheck(t, ctx, h1, h2, "key", nil)
fetchCheck(t, ctx, h2, h1, "key", []byte("value1"))
}

func TestFetchProtocolRepeated(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := newNetHosts(ctx, t, 2)
connect(t, hosts[0], hosts[1])

// wait for hosts to get connected
time.Sleep(time.Millisecond * 100)

d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
h1 := newFetchProtocol(ctx, hosts[0], d1.Lookup)

d2 := &datastore{make(map[string][]byte)}
h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup)

for i := 0; i < 10; i++ {
fetchCheck(t, ctx, h1, h2, "key", nil)
fetchCheck(t, ctx, h2, h1, "key", []byte("value1"))
}
}

func fetchCheck(t *testing.T, ctx context.Context,
requester *fetchProtocol, responder *fetchProtocol, key string, expected []byte) {
data, err := requester.Get(ctx, responder.host.ID(), key)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(data, expected) {
t.Fatalf("expected: %v, received: %v", string(expected), string(data))
}

if (data == nil && expected != nil) || (data != nil && expected == nil) {
t.Fatalf("expected []byte{} or nil and received the opposite")
}
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module github.com/libp2p/go-libp2p-pubsub-router

require (
github.com/gogo/protobuf v1.2.1
github.com/ipfs/go-cid v0.0.2
github.com/ipfs/go-datastore v0.0.5
github.com/ipfs/go-ipfs-ds-help v0.0.1
Expand All @@ -13,3 +14,5 @@ require (
github.com/libp2p/go-libp2p-routing-helpers v0.1.0
github.com/libp2p/go-libp2p-swarm v0.1.0
)

replace github.com/libp2p/go-libp2p-pubsub v0.1.0 => github.com/aschmahmann/go-libp2p-pubsub v0.0.4-0.20190807152749-d7996289bbcd
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now uses a version of libp2p/pubsub from master. However, there hasn't been a release yet so it's referencing v0.1.1-0.20190807100218-9f04364996b4

5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/aschmahmann/go-libp2p-pubsub v0.0.4-0.20190807152749-d7996289bbcd h1:P1//443gNhGLDpMpzwnaNDgKoHpXrlt+iIEVgwlTHoI=
github.com/aschmahmann/go-libp2p-pubsub v0.0.4-0.20190807152749-d7996289bbcd/go.mod h1:ekhyliBSJ0aBg62+j9rECrxW+UUPAj1SLS3jN7JMAGc=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
Expand Down Expand Up @@ -86,6 +88,7 @@ github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=
Expand All @@ -95,8 +98,6 @@ github.com/libp2p/go-libp2p-peer v0.2.0 h1:EQ8kMjaCUwt/Y5uLgjT8iY2qg0mGUT0N1zUje
github.com/libp2p/go-libp2p-peer v0.2.0/go.mod h1:RCffaCvUyW2CJmG2gAWVqwePwW7JMgxjsHm7+J5kjWY=
github.com/libp2p/go-libp2p-peerstore v0.1.0 h1:MKh7pRNPHSh1fLPj8u/M/s/napdmeNpoi9BRy9lPN0E=
github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY=
github.com/libp2p/go-libp2p-pubsub v0.1.0 h1:SmQeMa7IUv5vadh0fYgYsafWCBA1sCy5d/68kIYqGcU=
github.com/libp2p/go-libp2p-pubsub v0.1.0/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uzw0t1Bp9R1znpR/Q=
github.com/libp2p/go-libp2p-record v0.1.0 h1:wHwBGbFzymoIl69BpgwIu0O6ta3TXGcMPvHUAcodzRc=
github.com/libp2p/go-libp2p-record v0.1.0/go.mod h1:ujNc8iuE5dlKWVy6wuL6dd58t0n7xI4hAIl8pE6wu5Q=
github.com/libp2p/go-libp2p-routing-helpers v0.1.0 h1:BaFvpyv8TyhCN7TihawTiKuzeu8/Pyw7ZnMA4IvqIN8=
Expand Down
11 changes: 11 additions & 0 deletions pb/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)

all: $(GO)

%.pb.go: %.proto
protoc --proto_path="$(GOPATH)/src" --proto_path="." --gogofast_out=. $<

clean:
rm -f *.pb.go
rm -f *.go
Loading