Skip to content

Commit

Permalink
Add randomization on node addresses (#2123)
Browse files Browse the repository at this point in the history
Instead of always using the last known peer address from the `PeersDb`,
we randomly pick the address from the `PeersDb` and the `NetworkDb`.

We use clearnet or tor addresses depending on the node configuration:
- tor not supported: always use clearnet addresses
- tor supported but not enabled for clearnet addresses: use clearnet
  addresses when available
- tor supported and enabled for clearnet addresses: use tor
  addresses when available
  • Loading branch information
pm47 committed Jan 21, 2022
1 parent c180ca2 commit 0a37213
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import akka.event.Logging.MDC
import com.google.common.net.HostAndPort
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair.db.{NetworkDb, PeersDb}
import fr.acinq.eclair.io.Monitoring.Metrics
import fr.acinq.eclair.wire.protocol.{NodeAddress, OnionAddress}
import fr.acinq.eclair.{FSMDiagnosticActorLogging, Logs, NodeParams, TimestampMilli}

import java.net.InetSocketAddress
Expand Down Expand Up @@ -65,7 +65,7 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends
when(WAITING) {
case Event(TickReconnect, d: WaitingData) =>
// we query the db every time because it may have been updated in the meantime (e.g. with network announcements)
getPeerAddressFromDb(nodeParams.db.peers, nodeParams.db.network, remoteNodeId) match {
getPeerAddressFromDb(nodeParams, remoteNodeId) match {
case Some(address) =>
connect(address, origin = self, isPersistent = true)
goto(CONNECTING) using ConnectingData(address, d.nextReconnectionDelay)
Expand Down Expand Up @@ -136,7 +136,7 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends
// if we are already connecting/connected, the peer will kill any duplicate connections
hostAndPort_opt
.map(hostAndPort2InetSocketAddress)
.orElse(getPeerAddressFromDb(nodeParams.db.peers, nodeParams.db.network, remoteNodeId)) match {
.orElse(getPeerAddressFromDb(nodeParams, remoteNodeId)) match {
case Some(address) => connect(address, origin = replyTo, isPersistent)
case None => replyTo ! PeerConnection.ConnectionResult.NoAddressFound
}
Expand Down Expand Up @@ -189,10 +189,33 @@ object ReconnectionTask {
case class WaitingData(nextReconnectionDelay: FiniteDuration) extends Data
// @formatter:on

def getPeerAddressFromDb(peersDb: PeersDb, networkDb: NetworkDb, remoteNodeId: PublicKey): Option[InetSocketAddress] = {
peersDb.getPeer(remoteNodeId) // TODO should we start with the network db which may be more up to date? or rotate?
.orElse(networkDb.getNode(remoteNodeId).flatMap(_.addresses.headOption)) // TODO gets the first of the list, improve selection?
.map(_.socketAddress)
def selectNodeAddress(nodeParams: NodeParams, nodeAddresses: Seq[NodeAddress]): Option[NodeAddress] = {
// it doesn't make sense to mix tor and clearnet addresses, so we separate them and decide whether we use one or the other
val torAddresses = nodeAddresses.collect { case o: OnionAddress => o }
val clearnetAddresses = nodeAddresses diff torAddresses
val selectedAddresses = nodeParams.socksProxy_opt match {
case Some(params) if clearnetAddresses.nonEmpty && params.useForTor && (!params.useForIPv4 || !params.useForIPv6) =>
// Remote has clearnet (and possibly tor addresses), and we support tor, but we have configured it to only use
// tor when strictly necessary. In this case we will only connect over clearnet.
clearnetAddresses
case Some(params) if torAddresses.nonEmpty && params.useForTor =>
// In all other cases, if they have a tor address and we support tor, we use tor.
torAddresses
case _ =>
// Otherwise, if we don't support tor or they don't have a tor address, we use clearnet.
clearnetAddresses
}
// finally, we pick an address at random
if (selectedAddresses.nonEmpty) {
Some(selectedAddresses(Random.nextInt(selectedAddresses.size)))
} else {
None
}
}

def getPeerAddressFromDb(nodeParams: NodeParams, remoteNodeId: PublicKey): Option[InetSocketAddress] = {
val nodeAddresses = nodeParams.db.peers.getPeer(remoteNodeId).toSeq ++ nodeParams.db.network.getNode(remoteNodeId).toSeq.flatMap(_.addresses)
selectNodeAddress(nodeParams, nodeAddresses).map(_.socketAddress)
}

def hostAndPort2InetSocketAddress(hostAndPort: HostAndPort): InetSocketAddress = new InetSocketAddress(hostAndPort.getHost, hostAndPort.getPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair._
import fr.acinq.eclair.io.Peer.ChannelId
import fr.acinq.eclair.io.ReconnectionTask.WaitingData
import fr.acinq.eclair.wire.protocol.{Color, NodeAddress, NodeAnnouncement}
import fr.acinq.eclair.tor.Socks5ProxyParams
import fr.acinq.eclair.wire.protocol.{Color, IPv4, NodeAddress, NodeAnnouncement}
import org.mockito.IdiomaticMockito.StubbingOps
import org.mockito.MockitoSugar.mock
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
import org.scalatest.{Outcome, ParallelTestExecution, Tag}

import java.net.Inet4Address
import scala.concurrent.duration._

class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ParallelTestExecution {
Expand Down Expand Up @@ -221,4 +225,45 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
mockServer.close()
}

test("select peer address for reconnection") { _ =>
val nodeParams = mock[NodeParams]
val clearnet = NodeAddress.fromParts("1.2.3.4", 9735).get
val tor = NodeAddress.fromParts("iq7zhmhck54vcax2vlrdcavq2m32wao7ekh6jyeglmnuuvv3js57r4id.onion", 9735).get

// NB: we don't test randomization here, but it makes tests unnecessary more complex for little value

{
// tor not supported: always return clearnet addresses
nodeParams.socksProxy_opt returns None
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet)) === Some(clearnet))
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(tor)) === None)
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet, tor)) === Some(clearnet))
}

{
// tor supported but not enabled for clearnet addresses: return clearnet addresses when available
val socksParams = mock[Socks5ProxyParams]
socksParams.useForTor returns true
socksParams.useForIPv4 returns false
socksParams.useForIPv6 returns false
nodeParams.socksProxy_opt returns Some(socksParams)
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet)) === Some(clearnet))
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(tor)) === Some(tor))
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet, tor)) === Some(clearnet))
}

{
// tor supported and enabled for clearnet addresses: return tor addresses when available
val socksParams = mock[Socks5ProxyParams]
socksParams.useForTor returns true
socksParams.useForIPv4 returns true
socksParams.useForIPv6 returns true
nodeParams.socksProxy_opt returns Some(socksParams)
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet)) === Some(clearnet))
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(tor)) === Some(tor))
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet, tor)) === Some(tor))
}

}

}

0 comments on commit 0a37213

Please sign in to comment.