Skip to content

Commit

Permalink
Merge branch 'master' into yahya/send-control
Browse files Browse the repository at this point in the history
  • Loading branch information
yhassanzadeh13 authored Oct 7, 2024
2 parents 2d4d97a + f71345c commit 6a9806b
Show file tree
Hide file tree
Showing 44 changed files with 2,691 additions and 688 deletions.
19 changes: 19 additions & 0 deletions .github/workflows/go-check.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
name: Go Checks

on:
pull_request:
push:
branches: ["master"]
workflow_dispatch:
merge_group:

permissions:
contents: read

concurrency:
group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event_name == 'push' && github.sha || github.ref }}
cancel-in-progress: true

jobs:
go-check:
uses: ipdxco/unified-github-workflows/.github/workflows/go-check.yml@v1.0
4 changes: 4 additions & 0 deletions .github/workflows/go-test-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"skipOSes": ["windows", "macos"],
"skipRace": true
}
21 changes: 21 additions & 0 deletions .github/workflows/go-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
name: Go Test

on:
pull_request:
push:
branches: ["master"]
workflow_dispatch:
merge_group:

permissions:
contents: read

concurrency:
group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event_name == 'push' && github.sha || github.ref }}
cancel-in-progress: true

jobs:
go-test:
uses: ipdxco/unified-github-workflows/.github/workflows/go-test.yml@v1.0
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
21 changes: 21 additions & 0 deletions .github/workflows/release-check.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
name: Release Checker

on:
pull_request_target:
paths: ["version.json"]
types: [ opened, synchronize, reopened, labeled, unlabeled ]
workflow_dispatch:

permissions:
contents: write
pull-requests: write

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
release-check:
uses: ipdxco/unified-github-workflows/.github/workflows/release-check.yml@v1.0
with:
sources: '["version.json"]'
21 changes: 21 additions & 0 deletions .github/workflows/releaser.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
name: Releaser

on:
push:
paths: ["version.json"]
workflow_dispatch:

permissions:
contents: write

concurrency:
group: ${{ github.workflow }}-${{ github.sha }}
cancel-in-progress: true

jobs:
releaser:
uses: ipdxco/unified-github-workflows/.github/workflows/releaser.yml@v1.0
with:
sources: '["version.json"]'
secrets:
UCI_GITHUB_TOKEN: ${{ secrets.UCI_GITHUB_TOKEN }}
18 changes: 18 additions & 0 deletions .github/workflows/tagpush.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
name: Tag Push Checker

on:
push:
tags:
- v*

permissions:
contents: read
issues: write

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
releaser:
uses: ipdxco/unified-github-workflows/.github/workflows/tagpush.yml@v1.0
1 change: 0 additions & 1 deletion backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Dur
info: make(map[peer.ID]*backoffHistory),
}

rand.Seed(time.Now().UnixNano()) // used for jitter
go b.cleanupLoop(ctx)

return b
Expand Down
6 changes: 3 additions & 3 deletions blacklist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestBlacklist(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])

Expand Down Expand Up @@ -66,7 +66,7 @@ func TestBlacklist2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])

Expand Down Expand Up @@ -99,7 +99,7 @@ func TestBlacklist3(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := getNetHosts(t, ctx, 2)
hosts := getDefaultHosts(t, 2)
psubs := getPubsubs(ctx, hosts)

psubs[1].BlacklistPeer(hosts[0].ID())
Expand Down
41 changes: 20 additions & 21 deletions comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (p *PubSub) notifyPeerDead(pid peer.ID) {
}
}

func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing *rpcQueue) {
s, err := p.host.NewStream(p.ctx, pid, p.rt.Protocols()...)
if err != nil {
log.Debug("opening new stream to peer: ", err, pid)
Expand All @@ -135,7 +135,7 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
}
}

func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, backoff time.Duration, outgoing <-chan *RPC) {
func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, backoff time.Duration, outgoing *rpcQueue) {
select {
case <-time.After(backoff):
p.handleNewPeer(ctx, pid, outgoing)
Expand All @@ -156,7 +156,7 @@ func (p *PubSub) handlePeerDead(s network.Stream) {
p.notifyPeerDead(pid)
}

func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {
func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing *rpcQueue) {
writeRpc := func(rpc *RPC) error {
size := uint64(rpc.Size())

Expand All @@ -174,20 +174,17 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, ou
}

defer s.Close()
for {
select {
case rpc, ok := <-outgoing:
if !ok {
return
}
for ctx.Err() == nil {
rpc, err := outgoing.Pop(ctx)
if err != nil {
log.Debugf("popping message from the queue to send to %s: %s", s.Conn().RemotePeer(), err)
return
}

err := writeRpc(rpc)
if err != nil {
s.Reset()
log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)
return
}
case <-ctx.Done():
err = writeRpc(rpc)
if err != nil {
s.Reset()
log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)
return
}
}
Expand All @@ -209,15 +206,17 @@ func rpcWithControl(msgs []*pb.Message,
ihave []*pb.ControlIHave,
iwant []*pb.ControlIWant,
graft []*pb.ControlGraft,
prune []*pb.ControlPrune) *RPC {
prune []*pb.ControlPrune,
idontwant []*pb.ControlIDontWant) *RPC {
return &RPC{
RPC: pb.RPC{
Publish: msgs,
Control: &pb.ControlMessage{
Ihave: ihave,
Iwant: iwant,
Graft: graft,
Prune: prune,
Ihave: ihave,
Iwant: iwant,
Graft: graft,
Prune: prune,
Idontwant: idontwant,
},
},
}
Expand Down
3 changes: 2 additions & 1 deletion compat/compat.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestSimpleDiscovery(t *testing.T) {
server := newDiscoveryServer()
discOpts := []discovery.Option{discovery.Limit(numHosts), discovery.TTL(1 * time.Minute)}

hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
psubs := make([]*PubSub, numHosts)
topicHandlers := make([]*Topic, numHosts)

Expand Down Expand Up @@ -234,7 +234,7 @@ func TestGossipSubDiscoveryAfterBootstrap(t *testing.T) {
discOpts := []discovery.Option{discovery.Limit(numHosts), discovery.TTL(ttl)}

// Put the pubsub clients into two partitions
hosts := getNetHosts(t, ctx, numHosts)
hosts := getDefaultHosts(t, numHosts)
psubs := make([]*PubSub, numHosts)
topicHandlers := make([]*Topic, numHosts)

Expand Down
12 changes: 7 additions & 5 deletions floodsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus {
return AcceptAll
}

func (fs *FloodSubRouter) PreValidation([]*Message) {}

func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {}

func (fs *FloodSubRouter) Publish(msg *Message) {
Expand All @@ -83,19 +85,19 @@ func (fs *FloodSubRouter) Publish(msg *Message) {
continue
}

mch, ok := fs.p.peers[pid]
q, ok := fs.p.peers[pid]
if !ok {
continue
}

select {
case mch <- out:
fs.tracer.SendRPC(out, pid)
default:
err := q.Push(out, false)
if err != nil {
log.Infof("dropping message to peer %s: queue full", pid)
fs.tracer.DropRPC(out, pid)
// Drop it. The peer is too slow.
continue
}
fs.tracer.SendRPC(out, pid)
}
}

Expand Down
Loading

0 comments on commit 6a9806b

Please sign in to comment.