Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Persistence Layer on top of PubSub #33

Merged
merged 28 commits into from
Aug 19, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
43ebd02
removed bootstrapping functionality
aschmahmann May 12, 2019
f57d48c
start pubsub validator changes to implement LWW pubsub
aschmahmann May 22, 2019
e2a5fca
Merge branch 'master' into feat/persistence
aschmahmann May 31, 2019
ff89016
Implement persistence --temporary
aschmahmann Jun 14, 2019
90c980a
Added message protobuf that was missing from previous commit
aschmahmann Jun 21, 2019
3ae7b1f
Use changes from ongoing pubsub PRs
aschmahmann Jul 3, 2019
6fb9435
removed unused protobufs
aschmahmann Jul 3, 2019
f3f8dd4
records past EOL should fail
aschmahmann Jul 3, 2019
9e9d778
reorder imports
aschmahmann Jul 3, 2019
a24054d
Improved the get-latest protocol (protobufs for request and response,…
aschmahmann Aug 9, 2019
092d0e1
better context cancel in get-latest protocol
aschmahmann Aug 9, 2019
3bf24fa
restore bootstrapping ... for now
aschmahmann Aug 9, 2019
e9b0864
In get-latest tests wait a bit after connecting hosts so they have ti…
aschmahmann Aug 9, 2019
f85f2bc
Changed get-latest protocol to have responses with a status code.
aschmahmann Aug 9, 2019
9756263
get-latest responds with ERR message even when sender sends an incorr…
aschmahmann Aug 9, 2019
a4d4d6c
Removed ERR from protobuf and we just reset the stream when we encoun…
aschmahmann Aug 9, 2019
97ac549
protobuf Makefile supports spaces in path name
aschmahmann Aug 9, 2019
b0eeb77
fixed potential goroutine leak. switched order of protobuf fields.
aschmahmann Aug 11, 2019
4fbc97b
Small Makefile refactor
aschmahmann Aug 11, 2019
aa737d1
changed get-latest protocol to be called fetch. some refactoring
aschmahmann Aug 12, 2019
ded823d
renamed protocol files to match protocol rename
aschmahmann Aug 12, 2019
76d8fca
made function passed into the fetch protocol a typedef
aschmahmann Aug 14, 2019
1913974
renames in the fetch protobufs
aschmahmann Aug 15, 2019
fdbeaec
Makefile is more MSYS friendly but you still need a weird GOPATH
aschmahmann Aug 15, 2019
b303f91
Updated go.mod to use unreleased version of pubsub. Refactored Fetch …
aschmahmann Aug 16, 2019
0787cb5
rebroadcast initial delay using timer
aschmahmann Aug 16, 2019
41f6fb8
Added Error status code to Fetch protobuf. Currently unused.
aschmahmann Aug 16, 2019
981b38c
use go-libp2p-pubsub v0.1.1. Fix `Fetch` function to be a pointer rec…
aschmahmann Aug 19, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions getlatest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package namesys

import (
"bufio"
"context"
"io"
"time"

ggio "github.com/gogo/protobuf/io"
"github.com/gogo/protobuf/proto"

"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

pb "github.com/libp2p/go-libp2p-pubsub-router/pb"
)

type getLatestProtocol struct {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
host host.Host
}

func newGetLatestProtocol(host host.Host, getLocal func(key string) ([]byte, error)) *getLatestProtocol {
p := &getLatestProtocol{host}

host.SetStreamHandler(PSGetLatestProto, func(s network.Stream) {
p.Receive(s, getLocal)
})

return p
}

func (p *getLatestProtocol) Receive(s network.Stream, getLocal func(key string) ([]byte, error)) {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
r := ggio.NewDelimitedReader(s, 1<<20)
msg := &pb.RequestLatest{}
if err := r.ReadMsg(msg); err != nil {
if err != io.EOF {
s.Reset()
log.Infof("error reading request from %s: %s", s.Conn().RemotePeer(), err)
} else {
// Just be nice. They probably won't read this
// but it doesn't hurt to send it.
s.Close()
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
}
return
}

response, err := getLocal(*msg.Identifier)
var respProto pb.RespondLatest

if err != nil || response == nil {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
nodata := true
respProto = pb.RespondLatest{Nodata: &nodata}
} else {
respProto = pb.RespondLatest{Data: response}
}

if err := writeBytes(s, &respProto); err != nil {
s.Reset()
log.Infof("error writing response to %s: %s", s.Conn().RemotePeer(), err)
return
}
helpers.FullClose(s)
}

func (p getLatestProtocol) Send(ctx context.Context, pid peer.ID, key string) ([]byte, error) {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
peerCtx, _ := context.WithTimeout(ctx, time.Second*10)
s, err := p.host.NewStream(peerCtx, pid, PSGetLatestProto)
if err != nil {
return nil, err
}

if err := s.SetDeadline(time.Now().Add(time.Second * 5)); err != nil {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}

defer helpers.FullClose(s)

msg := pb.RequestLatest{Identifier: &key}

if err := writeBytes(s, &msg); err != nil {
s.Reset()
return nil, err
}

s.Close()

r := ggio.NewDelimitedReader(s, 1<<20)
response := &pb.RespondLatest{}
if err := r.ReadMsg(response); err != nil {
return nil, err
}

return response.Data, nil
}

func writeBytes(w io.Writer, msg proto.Message) error {
bufw := bufio.NewWriter(w)
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
wc := ggio.NewDelimitedWriter(bufw)

if err := wc.WriteMsg(msg); err != nil {
return err
}

return bufw.Flush()
}
99 changes: 99 additions & 0 deletions getlatest_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package namesys

import (
"bytes"
"context"
"errors"
"testing"

"github.com/libp2p/go-libp2p-core/host"
)

func connect(t *testing.T, a, b host.Host) {
pinfo := a.Peerstore().PeerInfo(a.ID())
err := b.Connect(context.Background(), pinfo)
if err != nil {
t.Fatal(err)
}
}

type datastore struct {
data map[string][]byte
}

func (d *datastore) Lookup(key string) ([]byte, error) {
v, ok := d.data[key]
if !ok {
return nil, errors.New("key not found")
}
return v, nil
}

func TestGetLatestProtocolTrip(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := newNetHosts(ctx, t, 2)
connect(t, hosts[0], hosts[1])

d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
h1 := newGetLatestProtocol(hosts[0], d1.Lookup)

d2 := &datastore{map[string][]byte{"key": []byte("value2")}}
h2 := newGetLatestProtocol(hosts[1], d2.Lookup)

getLatest(t, ctx, h1, h2, "key", []byte("value2"))
getLatest(t, ctx, h2, h1, "key", []byte("value1"))
}

func TestGetLatestProtocolNil(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := newNetHosts(ctx, t, 2)
connect(t, hosts[0], hosts[1])

d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
h1 := newGetLatestProtocol(hosts[0], d1.Lookup)

d2 := &datastore{make(map[string][]byte)}
h2 := newGetLatestProtocol(hosts[1], d2.Lookup)

getLatest(t, ctx, h1, h2, "key", nil)
getLatest(t, ctx, h2, h1, "key", []byte("value1"))
}

func TestGetLatestProtocolRepeated(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := newNetHosts(ctx, t, 2)
connect(t, hosts[0], hosts[1])

d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
h1 := newGetLatestProtocol(hosts[0], d1.Lookup)

d2 := &datastore{make(map[string][]byte)}
h2 := newGetLatestProtocol(hosts[1], d2.Lookup)

for i := 0; i < 10; i++ {
getLatest(t, ctx, h1, h2, "key", nil)
getLatest(t, ctx, h2, h1, "key", []byte("value1"))
}
}

func getLatest(t *testing.T, ctx context.Context,
requester *getLatestProtocol, responder *getLatestProtocol, key string, expected []byte) {
data, err := requester.Send(ctx, responder.host.ID(), key)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(data, expected) {
t.Fatalf("expected: %v, received: %v", string(expected), string(data))
}

if (data == nil && expected != nil) || (data != nil && expected == nil) {
t.Fatalf("expected []byte{} or nil and received the opposite")
}
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
module github.com/libp2p/go-libp2p-pubsub-router

require (
github.com/ipfs/go-cid v0.0.2
github.com/gogo/protobuf v1.2.1
github.com/ipfs/go-datastore v0.0.5
github.com/ipfs/go-ipfs-ds-help v0.0.1
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/libp2p/go-libp2p-blankhost v0.1.1
github.com/libp2p/go-libp2p-core v0.0.1
Expand All @@ -13,3 +12,5 @@ require (
github.com/libp2p/go-libp2p-routing-helpers v0.1.0
github.com/libp2p/go-libp2p-swarm v0.1.0
)

replace github.com/libp2p/go-libp2p-pubsub v0.1.0 => github.com/aschmahmann/go-libp2p-pubsub v0.0.4-0.20190807152749-d7996289bbcd
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now uses a version of libp2p/pubsub from master. However, there hasn't been a release yet so it's referencing v0.1.1-0.20190807100218-9f04364996b4

5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/aschmahmann/go-libp2p-pubsub v0.0.4-0.20190807152749-d7996289bbcd h1:P1//443gNhGLDpMpzwnaNDgKoHpXrlt+iIEVgwlTHoI=
github.com/aschmahmann/go-libp2p-pubsub v0.0.4-0.20190807152749-d7996289bbcd/go.mod h1:ekhyliBSJ0aBg62+j9rECrxW+UUPAj1SLS3jN7JMAGc=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
Expand Down Expand Up @@ -86,6 +88,7 @@ github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=
Expand All @@ -95,8 +98,6 @@ github.com/libp2p/go-libp2p-peer v0.2.0 h1:EQ8kMjaCUwt/Y5uLgjT8iY2qg0mGUT0N1zUje
github.com/libp2p/go-libp2p-peer v0.2.0/go.mod h1:RCffaCvUyW2CJmG2gAWVqwePwW7JMgxjsHm7+J5kjWY=
github.com/libp2p/go-libp2p-peerstore v0.1.0 h1:MKh7pRNPHSh1fLPj8u/M/s/napdmeNpoi9BRy9lPN0E=
github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY=
github.com/libp2p/go-libp2p-pubsub v0.1.0 h1:SmQeMa7IUv5vadh0fYgYsafWCBA1sCy5d/68kIYqGcU=
github.com/libp2p/go-libp2p-pubsub v0.1.0/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uzw0t1Bp9R1znpR/Q=
github.com/libp2p/go-libp2p-record v0.1.0 h1:wHwBGbFzymoIl69BpgwIu0O6ta3TXGcMPvHUAcodzRc=
github.com/libp2p/go-libp2p-record v0.1.0/go.mod h1:ujNc8iuE5dlKWVy6wuL6dd58t0n7xI4hAIl8pE6wu5Q=
github.com/libp2p/go-libp2p-routing-helpers v0.1.0 h1:BaFvpyv8TyhCN7TihawTiKuzeu8/Pyw7ZnMA4IvqIN8=
Expand Down
11 changes: 11 additions & 0 deletions pb/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)

all: $(GO)

%.pb.go: %.proto
protoc --proto_path=$(GOPATH)/src:. --gogofast_out=. $<

clean:
rm -f *.pb.go
rm -f *.go
Loading