From 102f01fcec39ce6c634bbf961f9b8a102e74a4dd Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 30 Dec 2022 13:57:24 -0800 Subject: [PATCH] Fix race condition with rust x go --- multidim-interop/go/v0.22/main.go | 23 +++++++++++++++++++++-- multidim-interop/go/v0.23/main.go | 23 +++++++++++++++++++++-- multidim-interop/go/v0.24/main.go | 23 +++++++++++++++++++++-- multidim-interop/rust/src/lib.rs | 2 +- 4 files changed, 64 insertions(+), 7 deletions(-) diff --git a/multidim-interop/go/v0.22/main.go b/multidim-interop/go/v0.22/main.go index 7c3d8e95f..11e89e156 100644 --- a/multidim-interop/go/v0.22/main.go +++ b/multidim-interop/go/v0.22/main.go @@ -9,6 +9,8 @@ import ( "golang.org/x/sync/errgroup" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/event" + corenetwork "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/muxer/mplex" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" @@ -119,6 +121,12 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error { // Record our listen addrs. runenv.RecordMessage("my listen addrs: %v", host.Addrs()) + // Subscribe to connectedness events. + connectedEvtBus, err := host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) + if err != nil { + return err + } + // Obtain our own address info, and use the sync service to publish it to a // 'peersTopic' topic, where others will read from. var ( @@ -202,7 +210,7 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error { if ai.ID == hostId { break } - runenv.RecordMessage("Dial peer: %s", ai.ID) + runenv.RecordMessage("Dial peer: %s on %s", ai.ID, ai.Addrs) if err := host.Connect(ctx, *ai); err != nil { return err } @@ -210,6 +218,17 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error { runenv.RecordMessage("done dialling my peers") + // Wait for a connection to all peers + connectedPeers := 0 + for e := range connectedEvtBus.Out() { + if e.(event.EvtPeerConnectednessChanged).Connectedness == corenetwork.Connected { + connectedPeers++ + } + if connectedPeers == runenv.TestInstanceCount-1 { + break + } + } + // Wait for all peers to signal that they're done with the connection phase. initCtx.SyncClient.MustSignalAndWait(ctx, "connected", runenv.TestInstanceCount) @@ -232,7 +251,7 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error { // Let's initialize the random seed to the current timestamp + our global sequence number. // Otherwise all instances will end up generating the same "random" latencies 🤦‍ rand.Seed(time.Now().UnixNano() + initCtx.GlobalSeq) - iterations := 3 + iterations := 1 maxLatencyMs := 100 for i := 1; i <= iterations; i++ { diff --git a/multidim-interop/go/v0.23/main.go b/multidim-interop/go/v0.23/main.go index 8f916bbdc..0e4a9d0f4 100644 --- a/multidim-interop/go/v0.23/main.go +++ b/multidim-interop/go/v0.23/main.go @@ -9,6 +9,8 @@ import ( "golang.org/x/sync/errgroup" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/event" + corenetwork "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/muxer/mplex" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" @@ -123,6 +125,12 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error { // Record our listen addrs. runenv.RecordMessage("my listen addrs: %v", host.Addrs()) + // Subscribe to connectedness events. + connectedEvtBus, err := host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) + if err != nil { + return err + } + // Obtain our own address info, and use the sync service to publish it to a // 'peersTopic' topic, where others will read from. var ( @@ -206,7 +214,7 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error { if ai.ID == hostId { break } - runenv.RecordMessage("Dial peer: %s", ai.ID) + runenv.RecordMessage("Dial peer: %s on %s", ai.ID, ai.Addrs) if err := host.Connect(ctx, *ai); err != nil { return err } @@ -214,6 +222,17 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error { runenv.RecordMessage("done dialling my peers") + // Wait for a connection to all peers + connectedPeers := 0 + for e := range connectedEvtBus.Out() { + if e.(event.EvtPeerConnectednessChanged).Connectedness == corenetwork.Connected { + connectedPeers++ + } + if connectedPeers == runenv.TestInstanceCount-1 { + break + } + } + // Wait for all peers to signal that they're done with the connection phase. initCtx.SyncClient.MustSignalAndWait(ctx, "connected", runenv.TestInstanceCount) @@ -236,7 +255,7 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error { // Let's initialize the random seed to the current timestamp + our global sequence number. // Otherwise all instances will end up generating the same "random" latencies 🤦‍ rand.Seed(time.Now().UnixNano() + initCtx.GlobalSeq) - iterations := 3 + iterations := 1 maxLatencyMs := 100 for i := 1; i <= iterations; i++ { diff --git a/multidim-interop/go/v0.24/main.go b/multidim-interop/go/v0.24/main.go index be2d7c179..29b32171f 100644 --- a/multidim-interop/go/v0.24/main.go +++ b/multidim-interop/go/v0.24/main.go @@ -9,6 +9,8 @@ import ( "golang.org/x/sync/errgroup" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/event" + corenetwork "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/muxer/mplex" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" @@ -126,6 +128,12 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error { // Record our listen addrs. runenv.RecordMessage("my listen addrs: %v", host.Addrs()) + // Subscribe to connectedness events. + connectedEvtBus, err := host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) + if err != nil { + return err + } + // Obtain our own address info, and use the sync service to publish it to a // 'peersTopic' topic, where others will read from. var ( @@ -209,7 +217,7 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error { if ai.ID == hostId { break } - runenv.RecordMessage("Dial peer: %s", ai.ID) + runenv.RecordMessage("Dial peer: %s on %s", ai.ID, ai.Addrs) if err := host.Connect(ctx, *ai); err != nil { return err } @@ -217,6 +225,17 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error { runenv.RecordMessage("done dialling my peers") + // Wait for a connection to all peers + connectedPeers := 0 + for e := range connectedEvtBus.Out() { + if e.(event.EvtPeerConnectednessChanged).Connectedness == corenetwork.Connected { + connectedPeers++ + } + if connectedPeers == runenv.TestInstanceCount-1 { + break + } + } + // Wait for all peers to signal that they're done with the connection phase. initCtx.SyncClient.MustSignalAndWait(ctx, "connected", runenv.TestInstanceCount) @@ -239,7 +258,7 @@ func runInterop(runenv *runtime.RunEnv, initCtx *run.InitContext) error { // Let's initialize the random seed to the current timestamp + our global sequence number. // Otherwise all instances will end up generating the same "random" latencies 🤦‍ rand.Seed(time.Now().UnixNano() + initCtx.GlobalSeq) - iterations := 3 + iterations := 1 maxLatencyMs := 100 for i := 1; i <= iterations; i++ { diff --git a/multidim-interop/rust/src/lib.rs b/multidim-interop/rust/src/lib.rs index 03d5cc74e..aa7ee4cdd 100644 --- a/multidim-interop/rust/src/lib.rs +++ b/multidim-interop/rust/src/lib.rs @@ -114,7 +114,7 @@ where ping(&client, &mut swarm, "initial".to_string()).await?; - let iterations: usize = 3; + let iterations: usize = 1; let max_latency_ms: u64 = 100; for i in 1..iterations + 1 {