Skip to content

Commit

Permalink
Bugfix. Dial all capabilities from same peer.
Browse files Browse the repository at this point in the history
Avoids deployment failures due to one capability encountering an
incompatible host.
  • Loading branch information
lthibault committed Jun 6, 2022
1 parent 7ed02a8 commit 53c1c68
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 32 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/prometheus/procfs v0.7.3 // indirect
github.com/sirupsen/logrus v1.8.1
github.com/urfave/cli/v2 v2.5.1
github.com/wetware/casm v0.0.0-20220526201835-0f6ca66923d5
github.com/wetware/casm v0.0.0-20220606184646-1f72896ced23
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/fx v1.17.1
go.uber.org/zap v1.21.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1156,8 +1156,8 @@ github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:Yko
github.com/warpfork/go-testmark v0.3.0/go.mod h1:jhEf8FVxd+F17juRubpmut64NEG6I2rgkUhlcqqXwE0=
github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a h1:G++j5e0OC488te356JvdhaM8YS6nMsjLAYF7JxCv07w=
github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/wetware/casm v0.0.0-20220526201835-0f6ca66923d5 h1:yRT6iHjOedGAtIEFwSdR/kcrGIfGMf+rzEMwMKg1cqo=
github.com/wetware/casm v0.0.0-20220526201835-0f6ca66923d5/go.mod h1:/hPIOdeX+Pnfn6PwHXvT3DTxMovLBY/4+diXleCY9/s=
github.com/wetware/casm v0.0.0-20220606184646-1f72896ced23 h1:KaMqgS7HB2YwMgKByt1vGBn3ow2Na2mN8iqrDyzEVd4=
github.com/wetware/casm v0.0.0-20220606184646-1f72896ced23/go.mod h1:/hPIOdeX+Pnfn6PwHXvT3DTxMovLBY/4+diXleCY9/s=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
Expand Down
56 changes: 35 additions & 21 deletions pkg/client/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"errors"
"fmt"

"capnproto.org/go/capnp/v3/rpc"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/lthibault/log"

"github.com/wetware/casm/pkg/boot"
"github.com/wetware/ww/pkg/ocap/cluster"
Expand All @@ -27,6 +27,7 @@ func (addr Addr) FindPeers(ctx context.Context, ns string, opt ...discovery.Opti
}

type Dialer struct {
Log log.Logger
Vat vat.Network
Boot discovery.Discoverer
}
Expand All @@ -42,45 +43,58 @@ func Dial(ctx context.Context, vat vat.Network, a Addr) (*Node, error) {

// Dial creates a client and connects it to a cluster.
func (d Dialer) Dial(ctx context.Context) (*Node, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

n := &Node{vat: d.Vat}

conn, err := d.join(ctx, pubsub.Capability)
if err != nil {
return nil, err
if d.Log == nil {
d.Log = log.New()
}
n.ps = pubsub.PubSub{Client: conn.Bootstrap(context.Background())}

conn, err = d.join(ctx, cluster.HostCapability)
if err != nil {
n.ps.Release()
return nil, err
}
n.host = cluster.Host{Client: conn.Bootstrap(context.Background())}
d.Log = d.Log.With(d.Vat)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

n.conn = conn
return n, nil
return d.join(ctx)
}

func (d Dialer) join(ctx context.Context, cap vat.Capability) (conn *rpc.Conn, err error) {
func (d Dialer) join(ctx context.Context) (n *Node, err error) {
var peers <-chan peer.AddrInfo
if peers, err = d.Boot.FindPeers(ctx, d.Vat.NS); err != nil {
return nil, fmt.Errorf("discover: %w", err)
}

for info := range peers {
conn, err = d.Vat.Connect(ctx, info, cap)
d.Log.WithField("peer_info", info).Debug("found peer")

n, err = d.dialCaps(ctx, info)
if err == nil {
break
}

d.Log.WithError(err).Debug("failed to connect to peer")
}

// no peers discovered?
if conn == nil && err == nil {
if n == nil && err == nil {
err = errors.New("bootstrap failed: no peers found")
}

return
}

func (d Dialer) dialCaps(ctx context.Context, info peer.AddrInfo) (*Node, error) {
psConn, err := d.Vat.Connect(ctx, info, pubsub.Capability)
if err != nil {
return nil, err
}

hostConn, err := d.Vat.Connect(ctx, info, cluster.HostCapability)
if err != nil {
return nil, err
}

return &Node{
vat: d.Vat,
conn: hostConn, // TODO: do we still need an rpc.Conn? Should we prefer one conn over the other?
ps: pubsub.PubSub{Client: psConn.Bootstrap(context.Background())},
host: cluster.Host{Client: hostConn.Bootstrap(context.Background())},
}, nil
}
16 changes: 8 additions & 8 deletions pkg/ocap/pubsub/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
capnp "capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/server"

"github.com/wetware/ww/internal/api/channel"
chan_api "github.com/wetware/ww/internal/api/channel"
api "github.com/wetware/ww/internal/api/pubsub"
)
Expand Down Expand Up @@ -66,22 +65,23 @@ func (t Topic) Publish(ctx context.Context, b []byte) error {
return err
}

func (t Topic) Subscribe(ctx context.Context, ch chan<- []byte) (release capnp.ReleaseFunc, err error) {
h := channel.Sender_ServerToClient(handler{
func (t Topic) Subscribe(ctx context.Context, ch chan<- []byte) (capnp.ReleaseFunc, error) {
h := chan_api.Sender_ServerToClient(handler{
ms: ch,
release: t.AddRef().Release,
}, &server.Policy{
MaxConcurrentCalls: cap(ch),
})

f, release := api.Topic(t).Subscribe(ctx, sender(h))
f, release := api.Topic(t).Subscribe(ctx, sender(h.AddRef()))
defer release()

if _, err = f.Struct(); err == nil {
release = h.Release
_, err := f.Struct()
if err != nil {
h.Release()
}

return
return h.Release, err
}

func (t Topic) Release() { t.Client.Release() }
Expand All @@ -106,7 +106,7 @@ func (h handler) Shutdown() {
h.release()
}

func (h handler) Send(ctx context.Context, call channel.Sender_send) error {
func (h handler) Send(ctx context.Context, call chan_api.Sender_send) error {
ptr, err := call.Args().Value()
if err != nil {
return err
Expand Down

0 comments on commit 53c1c68

Please sign in to comment.