Skip to content

Commit

Permalink
peer+wire: add addrv2 message, protocol negotiation
Browse files Browse the repository at this point in the history
  • Loading branch information
Crypt-iQ committed Feb 11, 2022
1 parent 44f9fd2 commit 746021c
Show file tree
Hide file tree
Showing 8 changed files with 712 additions and 80 deletions.
228 changes: 189 additions & 39 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

const (
// MaxProtocolVersion is the max protocol version the peer supports.
MaxProtocolVersion = wire.FeeFilterVersion
MaxProtocolVersion = wire.AddrV2Version

// DefaultTrickleInterval is the min time between attempts to send an
// inv message to a peer.
Expand Down Expand Up @@ -102,6 +102,9 @@ type MessageListeners struct {
// OnAddr is invoked when a peer receives an addr bitcoin message.
OnAddr func(p *Peer, msg *wire.MsgAddr)

// OnAddrV2 is invoked when a peer receives an addrv2 bitcoin message.
OnAddrV2 func(p *Peer, msg *wire.MsgAddrV2)

// OnPing is invoked when a peer receives a ping bitcoin message.
OnPing func(p *Peer, msg *wire.MsgPing)

Expand Down Expand Up @@ -197,6 +200,9 @@ type MessageListeners struct {
// message.
OnSendHeaders func(p *Peer, msg *wire.MsgSendHeaders)

// OnSendAddrV2 is invoked when a peer receives a sendaddrv2 message.
OnSendAddrV2 func(p *Peer, msg *wire.MsgSendAddrV2)

// OnRead is invoked when a peer receives a bitcoin message. It
// consists of the number of bytes read, the message, and whether or not
// an error in the read occurred. Typically, callers will opt to use
Expand Down Expand Up @@ -399,7 +405,7 @@ type AddrFunc func(remoteAddr *wire.NetAddress) *wire.NetAddress
// HostToNetAddrFunc is a func which takes a host, port, services and returns
// the netaddress.
type HostToNetAddrFunc func(host string, port uint16,
services wire.ServiceFlag) (*wire.NetAddress, error)
services wire.ServiceFlag) (*wire.NetAddressV2, error)

// NOTE: The overall data flow of a peer is split into 3 goroutines. Inbound
// messages are read via the inHandler goroutine and generally dispatched to
Expand Down Expand Up @@ -445,7 +451,7 @@ type Peer struct {
inbound bool

flagsMtx sync.Mutex // protects the peer flags below
na *wire.NetAddress
na *wire.NetAddressV2
id int32
userAgent string
services wire.ServiceFlag
Expand All @@ -455,6 +461,7 @@ type Peer struct {
sendHeadersPreferred bool // peer sent a sendheaders message
verAckReceived bool
witnessEnabled bool
sendAddrV2 bool

wireEncoding wire.MessageEncoding

Expand Down Expand Up @@ -585,7 +592,7 @@ func (p *Peer) ID() int32 {
// NA returns the peer network address.
//
// This function is safe for concurrent access.
func (p *Peer) NA() *wire.NetAddress {
func (p *Peer) NA() *wire.NetAddressV2 {
p.flagsMtx.Lock()
na := p.na
p.flagsMtx.Unlock()
Expand Down Expand Up @@ -820,6 +827,16 @@ func (p *Peer) IsWitnessEnabled() bool {
return witnessEnabled
}

// WantsAddrV2 returns if the peer supports addrv2 messages instead of the
// legacy addr messages.
func (p *Peer) WantsAddrV2() bool {
p.flagsMtx.Lock()
wantsAddrV2 := p.sendAddrV2
p.flagsMtx.Unlock()

return wantsAddrV2
}

// PushAddrMsg sends an addr message to the connected peer using the provided
// addresses. This function is useful over manually sending the message via
// QueueMessage since it automatically limits the addresses to the maximum
Expand Down Expand Up @@ -856,6 +873,40 @@ func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress) ([]*wire.NetAddress, er
return msg.AddrList, nil
}

// PushAddrV2Msg is used to push an addrv2 message to the remote peer.
//
// This function is safe for concurrent access.
func (p *Peer) PushAddrV2Msg(addrs []*wire.NetAddressV2) (
[]*wire.NetAddressV2, error) {

count := len(addrs)

// Nothing to send.
if count == 0 {
return nil, nil
}

m := wire.NewMsgAddrV2()
m.AddrList = make([]*wire.NetAddressV2, count)
copy(m.AddrList, addrs)

// Randomize the addresses sent if there are more than the maximum.
if count > wire.MaxV2AddrPerMsg {
// Shuffle the address list.
for i := 0; i < wire.MaxV2AddrPerMsg; i++ {
j := i + rand.Intn(count-i)
m.AddrList[i] = m.AddrList[j]
m.AddrList[j] = m.AddrList[i]
}

// Truncate it to the maximum size.
m.AddrList = m.AddrList[:wire.MaxV2AddrPerMsg]
}

p.QueueMessage(m, nil)
return m.AddrList, nil
}

// PushGetBlocksMsg sends a getblocks message for the provided block locator
// and stop hash. It will ignore back-to-back duplicate requests.
//
Expand Down Expand Up @@ -1363,6 +1414,19 @@ out:
continue
}

// Since the protocol version is 70016 but we don't
// implement compact blocks, we have to ignore unknown
// messages after the version-verack handshake. This
// matches bitcoind's behavior and is necessary since
// compact blocks negotiation occurs after the
// handshake.
if err == wire.ErrUnknownMessage {
log.Debugf("Received unknown message from %s:"+
" %v", p, err)
idleTimer.Reset(idleTimeout)
continue
}

// Only log the error and send reject message if the
// local peer is not forcibly disconnecting and the
// remote peer has not disconnected.
Expand Down Expand Up @@ -1404,6 +1468,11 @@ out:
)
break out

case *wire.MsgSendAddrV2:
// Disconnect if peer sends this after the handshake is
// completed.
break out

case *wire.MsgGetAddr:
if p.cfg.Listeners.OnGetAddr != nil {
p.cfg.Listeners.OnGetAddr(p, msg)
Expand All @@ -1414,6 +1483,11 @@ out:
p.cfg.Listeners.OnAddr(p, msg)
}

case *wire.MsgAddrV2:
if p.cfg.Listeners.OnAddrV2 != nil {
p.cfg.Listeners.OnAddrV2(p, msg)
}

case *wire.MsgPing:
p.handlePingMsg(msg)
if p.cfg.Listeners.OnPing != nil {
Expand Down Expand Up @@ -1986,38 +2060,15 @@ func (p *Peer) readRemoteVersionMsg() error {
return nil
}

// readRemoteVerAckMsg waits for the next message to arrive from the remote
// peer. If this message is not a verack message, then an error is returned.
// This method is to be used as part of the version negotiation upon a new
// connection.
func (p *Peer) readRemoteVerAckMsg() error {
// Read the next message from the wire.
remoteMsg, _, err := p.readMessage(wire.LatestEncoding)
if err != nil {
return err
}

// It should be a verack message, otherwise send a reject message to the
// peer explaining why.
msg, ok := remoteMsg.(*wire.MsgVerAck)
if !ok {
reason := "a verack message must follow version"
rejectMsg := wire.NewMsgReject(
msg.Command(), wire.RejectMalformed, reason,
)
_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
return errors.New(reason)
}

// processRemoteVerAckMsg takes the verack from the remote peer and handles it.
func (p *Peer) processRemoteVerAckMsg(msg *wire.MsgVerAck) {
p.flagsMtx.Lock()
p.verAckReceived = true
p.flagsMtx.Unlock()

if p.cfg.Listeners.OnVerAck != nil {
p.cfg.Listeners.OnVerAck(p, msg)
}

return nil
}

// localVersionMsg creates a version message that can be used to send to the
Expand All @@ -2032,15 +2083,23 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
}
}

theirNA := p.na
theirNA := p.na.ToLegacy()

// If p.na is a torv3 hidden service address, we'll need to send over
// an empty NetAddress for their address.
if p.na.IsTorV3() {
theirNA = wire.NewNetAddressIPPort(
net.IP([]byte{0, 0, 0, 0}), p.na.Port, p.na.Services,
)
}

// If we are behind a proxy and the connection comes from the proxy then
// we return an unroutable address as their address. This is to prevent
// leaking the tor proxy address.
if p.cfg.Proxy != "" {
proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy)
// invalid proxy means poorly configured, be on the safe side.
if err != nil || p.na.IP.String() == proxyaddress {
if err != nil || p.na.Addr.String() == proxyaddress {
theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0,
theirNA.Services)
}
Expand Down Expand Up @@ -2092,14 +2151,71 @@ func (p *Peer) writeLocalVersionMsg() error {
return p.writeMessage(localVerMsg, wire.LatestEncoding)
}

// writeSendAddrV2Msg writes our sendaddrv2 message to the remote peer if the
// peer supports protocol version 70016 and above.
func (p *Peer) writeSendAddrV2Msg(pver uint32) error {
if pver < wire.AddrV2Version {
return nil
}

sendAddrMsg := wire.NewMsgSendAddrV2()
return p.writeMessage(sendAddrMsg, wire.LatestEncoding)
}

// waitToFinishNegotiation waits until desired negotiation messages are
// received, recording the remote peer's preference for sendaddrv2 as an
// example. The list of negotiated features can be expanded in the future. If a
// verack is received, negotiation stops and the connection is live.
func (p *Peer) waitToFinishNegotiation(pver uint32) error {
// There are several possible messages that can be received here. We
// could immediately receive verack and be done with the handshake. We
// could receive sendaddrv2 and still have to wait for verack. Or we
// can receive unknown messages before and after sendaddrv2 and still
// have to wait for verack.
for {
remoteMsg, _, err := p.readMessage(wire.LatestEncoding)
if err == wire.ErrUnknownMessage {
continue
} else if err != nil {
return err
}

switch m := remoteMsg.(type) {
case *wire.MsgSendAddrV2:
if pver >= wire.AddrV2Version {
p.flagsMtx.Lock()
p.sendAddrV2 = true
p.flagsMtx.Unlock()

if p.cfg.Listeners.OnSendAddrV2 != nil {
p.cfg.Listeners.OnSendAddrV2(p, m)
}
}
case *wire.MsgVerAck:
// Receiving a verack means we are done with the
// handshake.
p.processRemoteVerAckMsg(m)
return nil
default:
// This is triggered if the peer sends, for example, a
// GETDATA message during this negotiation.
return wire.ErrInvalidHandshake
}
}
}

// negotiateInboundProtocol performs the negotiation protocol for an inbound
// peer. The events should occur in the following order, otherwise an error is
// returned:
//
// 1. Remote peer sends their version.
// 2. We send our version.
// 3. We send our verack.
// 4. Remote peer sends their verack.
// 3. We send sendaddrv2 if their version is >= 70016.
// 4. We send our verack.
// 5. Wait until sendaddrv2 or verack is received. Unknown messages are
// skipped as it could be wtxidrelay or a different message in the future
// that btcd does not implement but bitcoind does.
// 6. If remote peer sent sendaddrv2 above, wait until receipt of verack.
func (p *Peer) negotiateInboundProtocol() error {
if err := p.readRemoteVersionMsg(); err != nil {
return err
Expand All @@ -2109,12 +2225,22 @@ func (p *Peer) negotiateInboundProtocol() error {
return err
}

var protoVersion uint32
p.flagsMtx.Lock()
protoVersion = p.protocolVersion
p.flagsMtx.Unlock()

if err := p.writeSendAddrV2Msg(protoVersion); err != nil {
return err
}

err := p.writeMessage(wire.NewMsgVerAck(), wire.LatestEncoding)
if err != nil {
return err
}

return p.readRemoteVerAckMsg()
// Finish the negotiation by waiting for negotiable messages or verack.
return p.waitToFinishNegotiation(protoVersion)
}

// negotiateOutboundProtocol performs the negotiation protocol for an outbound
Expand All @@ -2123,8 +2249,11 @@ func (p *Peer) negotiateInboundProtocol() error {
//
// 1. We send our version.
// 2. Remote peer sends their version.
// 3. Remote peer sends their verack.
// 3. We send sendaddrv2 if their version is >= 70016.
// 4. We send our verack.
// 5. We wait to receive sendaddrv2 or verack, skipping unknown messages as
// in the inbound case.
// 6. If sendaddrv2 was received, wait for receipt of verack.
func (p *Peer) negotiateOutboundProtocol() error {
if err := p.writeLocalVersionMsg(); err != nil {
return err
Expand All @@ -2134,11 +2263,22 @@ func (p *Peer) negotiateOutboundProtocol() error {
return err
}

if err := p.readRemoteVerAckMsg(); err != nil {
var protoVersion uint32
p.flagsMtx.Lock()
protoVersion = p.protocolVersion
p.flagsMtx.Unlock()

if err := p.writeSendAddrV2Msg(protoVersion); err != nil {
return err
}

err := p.writeMessage(wire.NewMsgVerAck(), wire.LatestEncoding)
if err != nil {
return err
}

return p.writeMessage(wire.NewMsgVerAck(), wire.LatestEncoding)
// Finish the negotiation by waiting for negotiable messages or verack.
return p.waitToFinishNegotiation(protoVersion)
}

// start begins processing input and output messages.
Expand Down Expand Up @@ -2201,7 +2341,12 @@ func (p *Peer) AssociateConnection(conn net.Conn) {
p.Disconnect()
return
}
p.na = na

// Convert the NetAddress created above into NetAddressV2.
currentNa := wire.NetAddressV2FromBytes(
na.Timestamp, na.Services, na.IP, na.Port,
)
p.na = currentNa
}

go func() {
Expand Down Expand Up @@ -2289,7 +2434,12 @@ func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
}
p.na = na
} else {
p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 0)
// If host is an onion hidden service, it is likely that a
// nil-pointer-dereference will occur. Connecting to onion
// hidden services should have HostToNetAddress set.
p.na = wire.NetAddressV2FromBytes(
time.Now(), 0, net.ParseIP(host), uint16(port),
)
}

return p, nil
Expand Down
Loading

0 comments on commit 746021c

Please sign in to comment.