-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove goprocess from Host #865
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ import ( | |
"context" | ||
"io" | ||
"net" | ||
"sync" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p/p2p/protocol/identify" | ||
|
@@ -21,8 +22,6 @@ import ( | |
inat "github.com/libp2p/go-libp2p-nat" | ||
|
||
logging "github.com/ipfs/go-log" | ||
"github.com/jbenet/goprocess" | ||
goprocessctx "github.com/jbenet/goprocess/context" | ||
|
||
ma "github.com/multiformats/go-multiaddr" | ||
madns "github.com/multiformats/go-multiaddr-dns" | ||
|
@@ -68,6 +67,13 @@ const NATPortMap Option = iota | |
// * uses an identity service to send + receive node information | ||
// * uses a nat service to establish NAT port mappings | ||
type BasicHost struct { | ||
ctx context.Context | ||
ctxCancel context.CancelFunc | ||
// ensures we shutdown ONLY once | ||
closeSync sync.Once | ||
// keep track of resources we need to wait on before shutting down | ||
refCount sync.WaitGroup | ||
|
||
network network.Network | ||
mux *msmux.MultistreamMuxer | ||
ids *identify.IDService | ||
|
@@ -81,8 +87,6 @@ type BasicHost struct { | |
|
||
negtimeout time.Duration | ||
|
||
proc goprocess.Process | ||
|
||
emitters struct { | ||
evtLocalProtocolsUpdated event.Emitter | ||
evtLocalAddrsUpdated event.Emitter | ||
|
@@ -128,6 +132,8 @@ type HostOpts struct { | |
|
||
// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network. | ||
func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHost, error) { | ||
hostCtx, cancel := context.WithCancel(ctx) | ||
|
||
h := &BasicHost{ | ||
network: net, | ||
mux: msmux.NewMultistreamMuxer(), | ||
|
@@ -136,6 +142,8 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo | |
maResolver: madns.DefaultResolver, | ||
eventbus: eventbus.NewBus(), | ||
addrChangeChan: make(chan struct{}, 1), | ||
ctx: hostCtx, | ||
ctxCancel: cancel, | ||
} | ||
|
||
var err error | ||
|
@@ -146,28 +154,12 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo | |
return nil, err | ||
} | ||
|
||
h.proc = goprocessctx.WithContextAndTeardown(ctx, func() error { | ||
if h.natmgr != nil { | ||
h.natmgr.Close() | ||
} | ||
if h.cmgr != nil { | ||
h.cmgr.Close() | ||
} | ||
_ = h.emitters.evtLocalProtocolsUpdated.Close() | ||
_ = h.emitters.evtLocalAddrsUpdated.Close() | ||
return h.Network().Close() | ||
}) | ||
|
||
if opts.MultistreamMuxer != nil { | ||
h.mux = opts.MultistreamMuxer | ||
} | ||
|
||
// we can't set this as a default above because it depends on the *BasicHost. | ||
h.ids = identify.NewIDService( | ||
goprocessctx.WithProcessClosing(ctx, h.proc), | ||
h, | ||
identify.UserAgent(opts.UserAgent), | ||
) | ||
h.ids = identify.NewIDService(h, identify.UserAgent(opts.UserAgent)) | ||
|
||
if uint64(opts.NegotiationTimeout) != 0 { | ||
h.negtimeout = opts.NegotiationTimeout | ||
|
@@ -242,7 +234,7 @@ func New(net network.Network, opts ...interface{}) *BasicHost { | |
|
||
// Start starts background tasks in the host | ||
func (h *BasicHost) Start() { | ||
h.proc.Go(h.background) | ||
go h.background() | ||
} | ||
|
||
// newConnHandler is the remote-opened conn handler for inet.Network | ||
|
@@ -343,7 +335,12 @@ func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddresses | |
return &evt | ||
} | ||
|
||
func (h *BasicHost) background(p goprocess.Process) { | ||
func (h *BasicHost) background() { | ||
h.refCount.Add(1) | ||
defer func() { | ||
h.refCount.Done() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
}() | ||
|
||
// periodically schedules an IdentifyPush to update our peers for changes | ||
// in our address set (if needed) | ||
ticker := time.NewTicker(10 * time.Second) | ||
|
@@ -356,7 +353,7 @@ func (h *BasicHost) background(p goprocess.Process) { | |
select { | ||
case <-ticker.C: | ||
case <-h.addrChangeChan: | ||
case <-p.Closing(): | ||
case <-h.ctx.Done(): | ||
return | ||
} | ||
|
||
|
@@ -805,14 +802,26 @@ func (h *BasicHost) AllAddrs() []ma.Multiaddr { | |
|
||
// Close shuts down the Host's services (network, etc). | ||
func (h *BasicHost) Close() error { | ||
// You're thinking of adding some teardown logic here, right? Well | ||
// don't! Add any process teardown logic to the teardown function in the | ||
// constructor. | ||
// | ||
// This: | ||
// 1. May be called multiple times. | ||
// 2. May _never_ be called if the host is stopped by the context. | ||
return h.proc.Close() | ||
h.closeSync.Do(func() { | ||
h.ctxCancel() | ||
if h.natmgr != nil { | ||
h.natmgr.Close() | ||
} | ||
if h.cmgr != nil { | ||
h.cmgr.Close() | ||
} | ||
if h.ids != nil { | ||
h.ids.Close() | ||
} | ||
|
||
_ = h.emitters.evtLocalProtocolsUpdated.Close() | ||
_ = h.emitters.evtLocalAddrsUpdated.Close() | ||
h.Network().Close() | ||
|
||
h.refCount.Wait() | ||
}) | ||
|
||
return nil | ||
Stebalien marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
type streamWrapper struct { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,6 @@ import ( | |
"sync" | ||
"time" | ||
|
||
"github.com/libp2p/go-eventbus" | ||
ic "github.com/libp2p/go-libp2p-core/crypto" | ||
"github.com/libp2p/go-libp2p-core/event" | ||
"github.com/libp2p/go-libp2p-core/helpers" | ||
|
@@ -17,6 +16,7 @@ import ( | |
"github.com/libp2p/go-libp2p-core/peerstore" | ||
"github.com/libp2p/go-libp2p-core/protocol" | ||
|
||
"github.com/libp2p/go-eventbus" | ||
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb" | ||
|
||
ggio "github.com/gogo/protobuf/io" | ||
|
@@ -71,7 +71,12 @@ type IDService struct { | |
Host host.Host | ||
UserAgent string | ||
|
||
ctx context.Context | ||
ctx context.Context | ||
ctxCancel context.CancelFunc | ||
// ensure we shutdown ONLY once | ||
closeSync sync.Once | ||
// track resources that need to be shut down before we shut down | ||
refCount sync.WaitGroup | ||
|
||
// connections undergoing identification | ||
// for wait purposes | ||
|
@@ -94,7 +99,7 @@ type IDService struct { | |
|
||
// NewIDService constructs a new *IDService and activates it by | ||
// attaching its stream handler to the given host.Host. | ||
func NewIDService(ctx context.Context, h host.Host, opts ...Option) *IDService { | ||
func NewIDService(h host.Host, opts ...Option) *IDService { | ||
var cfg config | ||
for _, opt := range opts { | ||
opt(&cfg) | ||
|
@@ -105,13 +110,15 @@ func NewIDService(ctx context.Context, h host.Host, opts ...Option) *IDService { | |
userAgent = cfg.userAgent | ||
} | ||
|
||
hostCtx, cancel := context.WithCancel(context.Background()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. being able to have the ID service run within a larger context continues to seem valuable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see why. It's a separate service and exposes a What are your concerns here ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if a consumer is spinning up a bunch of separate services, it's nice to have them all wired up to a central context, and then cancel that context to shut things down, rather than explicitly calling close on each service. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see where you are coming from but we've had problems in the past using contexts for cancellation across service boundaries. Please take a look at libp2p/go-libp2p-kbucket#50 (comment) & the linked discussions in that comment. The most important benefit to me is being able to control the order in which the services, sub-components etc. are shut down. |
||
s := &IDService{ | ||
Host: h, | ||
UserAgent: userAgent, | ||
|
||
ctx: ctx, | ||
ctx: hostCtx, | ||
ctxCancel: cancel, | ||
currid: make(map[network.Conn]chan struct{}), | ||
observedAddrs: NewObservedAddrSet(ctx), | ||
observedAddrs: NewObservedAddrSet(hostCtx), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also have a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some day, yes. But not today. |
||
} | ||
|
||
// handle local protocol handler updates, and push deltas to peers. | ||
|
@@ -143,13 +150,24 @@ func NewIDService(ctx context.Context, h host.Host, opts ...Option) *IDService { | |
return s | ||
} | ||
|
||
// Close shuts down the IDService | ||
func (ids *IDService) Close() error { | ||
ids.closeSync.Do(func() { | ||
ids.ctxCancel() | ||
ids.refCount.Wait() | ||
}) | ||
return nil | ||
} | ||
|
||
func (ids *IDService) handleEvents() { | ||
ids.refCount.Add(1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
sub := ids.subscription | ||
defer func() { | ||
_ = sub.Close() | ||
// drain the channel. | ||
for range sub.Out() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note: we no longer need this (close will do it for us). We can simplify to: defer ids.refCount.Done()
defer sub.Close() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Had a look at Sub.Close() in |
||
} | ||
ids.refCount.Done() | ||
}() | ||
|
||
for { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, this is racy. See the note on https://golang.org/pkg/sync/#WaitGroup.Add.
We have to call Add before we go async.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(otherwise
background
might not start running till after the user callsClose
)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have no idea how I missed this. Thanks for the catch.