Skip to content

Commit

Permalink
Fix race condition with rust x go
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoPolo committed Dec 30, 2022
1 parent 8dabfb6 commit 6644184
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 7 deletions.
23 changes: 21 additions & 2 deletions multidim-interop/go/v0.22/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"golang.org/x/sync/errgroup"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/event"
corenetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
Expand Down Expand Up @@ -119,6 +121,12 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
// Record our listen addrs.
runenv.RecordMessage("my listen addrs: %v", host.Addrs())

// Subscribe to connectedness events.
connectedEvtBus, err := host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
return err
}

// Obtain our own address info, and use the sync service to publish it to a
// 'peersTopic' topic, where others will read from.
var (
Expand Down Expand Up @@ -202,14 +210,25 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
if ai.ID == hostId {
break
}
runenv.RecordMessage("Dial peer: %s", ai.ID)
runenv.RecordMessage("Dial peer: %s on %s", ai.ID, ai.Addrs)
if err := host.Connect(ctx, *ai); err != nil {
return err
}
}

runenv.RecordMessage("done dialling my peers")

// Wait for a connection to all peers
connectedPeers := 0
for e := range connectedEvtBus.Out() {
if e.(event.EvtPeerConnectednessChanged).Connectedness == corenetwork.Connected {
connectedPeers++
}
if connectedPeers == runenv.TestInstanceCount-1 {
break
}
}

// Wait for all peers to signal that they're done with the connection phase.
initCtx.SyncClient.MustSignalAndWait(ctx, "connected", runenv.TestInstanceCount)

Expand All @@ -232,7 +251,7 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
// Let's initialize the random seed to the current timestamp + our global sequence number.
// Otherwise all instances will end up generating the same "random" latencies 🤦‍
rand.Seed(time.Now().UnixNano() + initCtx.GlobalSeq)
iterations := 3
iterations := 1
maxLatencyMs := 100

for i := 1; i <= iterations; i++ {
Expand Down
23 changes: 21 additions & 2 deletions multidim-interop/go/v0.23/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"golang.org/x/sync/errgroup"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/event"
corenetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
Expand Down Expand Up @@ -123,6 +125,12 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
// Record our listen addrs.
runenv.RecordMessage("my listen addrs: %v", host.Addrs())

// Subscribe to connectedness events.
connectedEvtBus, err := host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
return err
}

// Obtain our own address info, and use the sync service to publish it to a
// 'peersTopic' topic, where others will read from.
var (
Expand Down Expand Up @@ -206,14 +214,25 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
if ai.ID == hostId {
break
}
runenv.RecordMessage("Dial peer: %s", ai.ID)
runenv.RecordMessage("Dial peer: %s on %s", ai.ID, ai.Addrs)
if err := host.Connect(ctx, *ai); err != nil {
return err
}
}

runenv.RecordMessage("done dialling my peers")

// Wait for a connection to all peers
connectedPeers := 0
for e := range connectedEvtBus.Out() {
if e.(event.EvtPeerConnectednessChanged).Connectedness == corenetwork.Connected {
connectedPeers++
}
if connectedPeers == runenv.TestInstanceCount-1 {
break
}
}

// Wait for all peers to signal that they're done with the connection phase.
initCtx.SyncClient.MustSignalAndWait(ctx, "connected", runenv.TestInstanceCount)

Expand All @@ -236,7 +255,7 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
// Let's initialize the random seed to the current timestamp + our global sequence number.
// Otherwise all instances will end up generating the same "random" latencies 🤦‍
rand.Seed(time.Now().UnixNano() + initCtx.GlobalSeq)
iterations := 3
iterations := 1
maxLatencyMs := 100

for i := 1; i <= iterations; i++ {
Expand Down
23 changes: 21 additions & 2 deletions multidim-interop/go/v0.24/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"golang.org/x/sync/errgroup"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/event"
corenetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
Expand Down Expand Up @@ -126,6 +128,12 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
// Record our listen addrs.
runenv.RecordMessage("my listen addrs: %v", host.Addrs())

// Subscribe to connectedness events.
connectedEvtBus, err := host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
return err
}

// Obtain our own address info, and use the sync service to publish it to a
// 'peersTopic' topic, where others will read from.
var (
Expand Down Expand Up @@ -209,14 +217,25 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
if ai.ID == hostId {
break
}
runenv.RecordMessage("Dial peer: %s", ai.ID)
runenv.RecordMessage("Dial peer: %s on %s", ai.ID, ai.Addrs)
if err := host.Connect(ctx, *ai); err != nil {
return err
}
}

runenv.RecordMessage("done dialling my peers")

// Wait for a connection to all peers
connectedPeers := 0
for e := range connectedEvtBus.Out() {
if e.(event.EvtPeerConnectednessChanged).Connectedness == corenetwork.Connected {
connectedPeers++
}
if connectedPeers == runenv.TestInstanceCount-1 {
break
}
}

// Wait for all peers to signal that they're done with the connection phase.
initCtx.SyncClient.MustSignalAndWait(ctx, "connected", runenv.TestInstanceCount)

Expand All @@ -239,7 +258,7 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
// Let's initialize the random seed to the current timestamp + our global sequence number.
// Otherwise all instances will end up generating the same "random" latencies 🤦‍
rand.Seed(time.Now().UnixNano() + initCtx.GlobalSeq)
iterations := 3
iterations := 1
maxLatencyMs := 100

for i := 1; i <= iterations; i++ {
Expand Down
2 changes: 1 addition & 1 deletion multidim-interop/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ where

ping(&client, &mut swarm, "initial".to_string()).await?;

let iterations: usize = 3;
let iterations: usize = 1;
let max_latency_ms: u64 = 100;

for i in 1..iterations + 1 {
Expand Down

0 comments on commit 6644184

Please sign in to comment.