Skip to content

Commit

Permalink
Rework channel reestablish (#2036)
Browse files Browse the repository at this point in the history
In an "outdated commitment" scenario where we are on the up-to-date side, we always react by force-closing the channel immediately, not giving our peer a chance to fix their data and restart. On top of that, we consider this a commitment sync error, instead of clearly logging that our counterparty is using outdated data.

Addressing this turned out to be rabbit-holey: our sync code is quite complicated and is a bit redundant because we separate between:
- checking whether we are late
- deciding what messages we need to retransmit

Also, discovered a missing corner case when syncing in SHUTDOWN state.
  • Loading branch information
pm47 committed Oct 27, 2021
1 parent 2e9f8d9 commit 2c0c24e
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 176 deletions.
186 changes: 71 additions & 115 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
import fr.acinq.eclair.channel.Commitments.PostRevocationAction
import fr.acinq.eclair.channel.Helpers.{Closing, Funding, getRelayFees}
import fr.acinq.eclair.channel.Helpers.Syncing.SyncResult
import fr.acinq.eclair.channel.Helpers.{Closing, Funding, Syncing, getRelayFees}
import fr.acinq.eclair.channel.Monitoring.Metrics.ProcessMessage
import fr.acinq.eclair.channel.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.channel.publish.TxPublisher
Expand Down Expand Up @@ -1662,44 +1663,38 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
goto(WAIT_FOR_FUNDING_LOCKED) sending fundingLocked

case Event(channelReestablish: ChannelReestablish, d: DATA_NORMAL) =>
var sendQueue = Queue.empty[LightningMessage]
val channelKeyPath = keyManager.keyPath(d.commitments.localParams, d.commitments.channelConfig)
channelReestablish match {
case ChannelReestablish(_, _, nextRemoteRevocationNumber, yourLastPerCommitmentSecret, _, _) if !Helpers.checkLocalCommit(d, nextRemoteRevocationNumber) =>
// if next_remote_revocation_number is greater than our local commitment index, it means that either we are using an outdated commitment, or they are lying
// but first we need to make sure that the last per_commitment_secret that they claim to have received from us is correct for that next_remote_revocation_number minus 1
if (keyManager.commitmentSecret(channelKeyPath, nextRemoteRevocationNumber - 1) == yourLastPerCommitmentSecret) {
log.warning(s"counterparty proved that we have an outdated (revoked) local commitment!!! ourCommitmentNumber=${d.commitments.localCommit.index} theirCommitmentNumber=$nextRemoteRevocationNumber")
// their data checks out, we indeed seem to be using an old revoked commitment, and must absolutely *NOT* publish it, because that would be a cheating attempt and they
// would punish us by taking all the funds in the channel
val exc = PleasePublishYourCommitment(d.channelId)
val error = Error(d.channelId, exc.getMessage)
goto(WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT) using DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT(d.commitments, channelReestablish) storing() sending error
} else {
// they lied! the last per_commitment_secret they claimed to have received from us is invalid
throw InvalidRevokedCommitProof(d.channelId, d.commitments.localCommit.index, nextRemoteRevocationNumber, yourLastPerCommitmentSecret)
}
case ChannelReestablish(_, nextLocalCommitmentNumber, _, _, _, _) if !Helpers.checkRemoteCommit(d, nextLocalCommitmentNumber) =>
// if next_local_commit_number is more than one more our remote commitment index, it means that either we are using an outdated commitment, or they are lying
log.warning(s"counterparty says that they have a more recent commitment than the one we know of!!! ourCommitmentNumber=${d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.index).getOrElse(d.commitments.remoteCommit.index)} theirCommitmentNumber=$nextLocalCommitmentNumber")
// there is no way to make sure that they are saying the truth, the best thing to do is ask them to publish their commitment right now
// maybe they will publish their commitment, in that case we need to remember their commitment point in order to be able to claim our outputs
// not that if they don't comply, we could publish our own commitment (it is not stale, otherwise we would be in the case above)
val exc = PleasePublishYourCommitment(d.channelId)
val error = Error(d.channelId, exc.getMessage)
goto(WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT) using DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT(d.commitments, channelReestablish) storing() sending error
case _ =>
Syncing.checkSync(keyManager, d, channelReestablish) match {
case syncFailure: SyncResult.Failure =>
handleSyncFailure(channelReestablish, syncFailure, d)
case syncSuccess: SyncResult.Success =>
var sendQueue = Queue.empty[LightningMessage]
// normal case, our data is up-to-date
if (channelReestablish.nextLocalCommitmentNumber == 1 && d.commitments.localCommit.index == 0) {
// If next_local_commitment_number is 1 in both the channel_reestablish it sent and received, then the node MUST retransmit funding_locked, otherwise it MUST NOT
log.debug("re-sending fundingLocked")
val channelKeyPath = keyManager.keyPath(d.commitments.localParams, d.commitments.channelConfig)
val nextPerCommitmentPoint = keyManager.commitmentPoint(channelKeyPath, 1)
val fundingLocked = FundingLocked(d.commitments.channelId, nextPerCommitmentPoint)
sendQueue = sendQueue :+ fundingLocked
}

val (commitments1, sendQueue1) = handleSync(channelReestablish, d)
sendQueue = sendQueue ++ sendQueue1
// we may need to retransmit updates and/or commit_sig and/or revocation
sendQueue = sendQueue ++ syncSuccess.retransmit

// then we clean up unsigned updates
val commitments1 = Commitments.discardUnsignedUpdates(d.commitments)

commitments1.remoteNextCommitInfo match {
case Left(_) =>
// we expect them to (re-)send the revocation immediately
startSingleTimer(RevocationTimeout.toString, RevocationTimeout(commitments1.remoteCommit.index, peer), nodeParams.revocationTimeout)
case _ => ()
}

// do I have something to sign?
if (Commitments.localHasChanges(commitments1)) {
self ! CMD_SIGN()
}

// BOLT 2: A node if it has sent a previous shutdown MUST retransmit shutdown.
d.localShutdown.foreach {
Expand Down Expand Up @@ -1756,11 +1751,15 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) => handleAddDisconnected(c, d)

case Event(channelReestablish: ChannelReestablish, d: DATA_SHUTDOWN) =>
var sendQueue = Queue.empty[LightningMessage]
val (commitments1, sendQueue1) = handleSync(channelReestablish, d)
sendQueue = sendQueue ++ sendQueue1 :+ d.localShutdown
// BOLT 2: A node if it has sent a previous shutdown MUST retransmit shutdown.
goto(SHUTDOWN) using d.copy(commitments = commitments1) sending sendQueue
Syncing.checkSync(keyManager, d, channelReestablish) match {
case syncFailure: SyncResult.Failure =>
handleSyncFailure(channelReestablish, syncFailure, d)
case syncSuccess: SyncResult.Success =>
val commitments1 = Commitments.discardUnsignedUpdates(d.commitments)
val sendQueue = Queue.empty[LightningMessage] ++ syncSuccess.retransmit :+ d.localShutdown
// BOLT 2: A node if it has sent a previous shutdown MUST retransmit shutdown.
goto(SHUTDOWN) using d.copy(commitments = commitments1) sending sendQueue
}

case Event(_: ChannelReestablish, d: DATA_NEGOTIATING) =>
// BOLT 2: A node if it has sent a previous shutdown MUST retransmit shutdown.
Expand Down Expand Up @@ -2229,6 +2228,29 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
stay() using d.copy(channelUpdate = channelUpdate1) storing()
}

private def handleSyncFailure(channelReestablish: ChannelReestablish, syncFailure: SyncResult.Failure, d: HasCommitments) = {
syncFailure match {
case res: SyncResult.LocalLateProven =>
log.error(s"counterparty proved that we have an outdated (revoked) local commitment!!! ourLocalCommitmentNumber=${res.ourLocalCommitmentNumber} theirRemoteCommitmentNumber=${res.theirRemoteCommitmentNumber}")
// their data checks out, we indeed seem to be using an old revoked commitment, and must absolutely *NOT* publish it, because that would be a cheating attempt and they
// would punish us by taking all the funds in the channel
handleOutdatedCommitment(channelReestablish, d)
case res: Syncing.SyncResult.LocalLateUnproven =>
log.error(s"our local commitment is in sync, but counterparty says that they have a more recent remote commitment than the one we know of (they could be lying)!!! ourRemoteCommitmentNumber=${res.ourRemoteCommitmentNumber} theirCommitmentNumber=${res.theirLocalCommitmentNumber}")
// there is no way to make sure that they are saying the truth, the best thing to do is "call their bluff" and
// ask them to publish their commitment right now. If they weren't lying and they do publish their commitment,
// we need to remember their commitment point in order to be able to claim our outputs
handleOutdatedCommitment(channelReestablish, d)
case res: Syncing.SyncResult.RemoteLying =>
log.error(s"counterparty is lying about us having an outdated commitment!!! ourLocalCommitmentNumber=${res.ourLocalCommitmentNumber} theirRemoteCommitmentNumber=${res.theirRemoteCommitmentNumber}")
// they are deliberately trying to fool us into thinking we have a late commitment
handleLocalError(InvalidRevokedCommitProof(d.channelId, res.ourLocalCommitmentNumber, res.theirRemoteCommitmentNumber, res.invalidPerCommitmentSecret), d, Some(channelReestablish))
case SyncResult.RemoteLate =>
log.error("counterparty appears to be using an outdated commitment, they may request a force-close, standing by...")
stay()
}
}

private def maybeEmitChannelUpdateChangedEvent(newUpdate: ChannelUpdate, oldUpdate_opt: Option[ChannelUpdate], d: DATA_NORMAL): Unit = {
if (oldUpdate_opt.isEmpty || !Announcements.areSameIgnoreFlags(newUpdate, oldUpdate_opt.get)) {
context.system.eventStream.publish(ChannelUpdateParametersChanged(self, d.channelId, newUpdate.shortChannelId, d.commitments.remoteNodeId, newUpdate))
Expand Down Expand Up @@ -2267,13 +2289,16 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
private def handleLocalError(cause: Throwable, d: ChannelData, msg: Option[Any]) = {
cause match {
case _: ForcedLocalCommit => log.warning(s"force-closing channel at user request")
case _ if stateName == WAIT_FOR_OPEN_CHANNEL => log.warning(s"${cause.getMessage} while processing msg=${msg.getOrElse("n/a").getClass.getSimpleName} in state=$stateName")
case _ => log.error(s"${cause.getMessage} while processing msg=${msg.getOrElse("n/a").getClass.getSimpleName} in state=$stateName")
}
cause match {
case _: ChannelException => ()
case _ => log.error(cause, s"msg=${msg.getOrElse("n/a")} stateData=$stateData")
case _ if msg.exists(_.isInstanceOf[OpenChannel]) || msg.exists(_.isInstanceOf[AcceptChannel]) =>
// invalid remote channel parameters are logged as warning
log.warning(s"${cause.getMessage} while processing msg=${msg.getOrElse("n/a").getClass.getSimpleName} in state=$stateName")
case _: ChannelException =>
log.error(s"${cause.getMessage} while processing msg=${msg.getOrElse("n/a").getClass.getSimpleName} in state=$stateName")
case _ =>
// unhandled error: we dump the channel data, and print the stack trace
log.error(cause, s"msg=${msg.getOrElse("n/a")} stateData=$stateData:")
}

val error = Error(d.channelId, cause.getMessage)
context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, LocalError(cause), isFatal = true))

Expand Down Expand Up @@ -2535,79 +2560,10 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
goto(ERR_INFORMATION_LEAK) calling doPublish(localCommitPublished, d.commitments) sending error
}

private def handleSync(channelReestablish: ChannelReestablish, d: HasCommitments): (Commitments, Queue[LightningMessage]) = {
var sendQueue = Queue.empty[LightningMessage]
// first we clean up unacknowledged updates
log.debug("discarding proposed OUT: {}", d.commitments.localChanges.proposed.map(Commitments.msg2String(_)).mkString(","))
log.debug("discarding proposed IN: {}", d.commitments.remoteChanges.proposed.map(Commitments.msg2String(_)).mkString(","))
val commitments1 = d.commitments.copy(
localChanges = d.commitments.localChanges.copy(proposed = Nil),
remoteChanges = d.commitments.remoteChanges.copy(proposed = Nil),
localNextHtlcId = d.commitments.localNextHtlcId - d.commitments.localChanges.proposed.collect { case u: UpdateAddHtlc => u }.size,
remoteNextHtlcId = d.commitments.remoteNextHtlcId - d.commitments.remoteChanges.proposed.collect { case u: UpdateAddHtlc => u }.size)
log.debug(s"localNextHtlcId=${d.commitments.localNextHtlcId}->${commitments1.localNextHtlcId}")
log.debug(s"remoteNextHtlcId=${d.commitments.remoteNextHtlcId}->${commitments1.remoteNextHtlcId}")

def resendRevocation(): Unit = {
// let's see the state of remote sigs
if (commitments1.localCommit.index == channelReestablish.nextRemoteRevocationNumber) {
// nothing to do
} else if (commitments1.localCommit.index == channelReestablish.nextRemoteRevocationNumber + 1) {
// our last revocation got lost, let's resend it
log.debug("re-sending last revocation")
val channelKeyPath = keyManager.keyPath(d.commitments.localParams, d.commitments.channelConfig)
val localPerCommitmentSecret = keyManager.commitmentSecret(channelKeyPath, d.commitments.localCommit.index - 1)
val localNextPerCommitmentPoint = keyManager.commitmentPoint(channelKeyPath, d.commitments.localCommit.index + 1)
val revocation = RevokeAndAck(
channelId = commitments1.channelId,
perCommitmentSecret = localPerCommitmentSecret,
nextPerCommitmentPoint = localNextPerCommitmentPoint
)
sendQueue = sendQueue :+ revocation
} else throw RevocationSyncError(d.channelId)
}

// re-sending sig/rev (in the right order)
commitments1.remoteNextCommitInfo match {
case Left(waitingForRevocation) if waitingForRevocation.nextRemoteCommit.index + 1 == channelReestablish.nextLocalCommitmentNumber =>
// we had sent a new sig and were waiting for their revocation
// they had received the new sig but their revocation was lost during the disconnection
// they will send us the revocation, nothing to do here
log.debug("waiting for them to re-send their last revocation")
resendRevocation()
case Left(waitingForRevocation) if waitingForRevocation.nextRemoteCommit.index == channelReestablish.nextLocalCommitmentNumber =>
// we had sent a new sig and were waiting for their revocation
// they didn't receive the new sig because of the disconnection
// we just resend the same updates and the same sig

val revWasSentLast = commitments1.localCommit.index > waitingForRevocation.sentAfterLocalCommitIndex
if (!revWasSentLast) resendRevocation()

log.debug("re-sending previously local signed changes: {}", commitments1.localChanges.signed.map(Commitments.msg2String(_)).mkString(","))
commitments1.localChanges.signed.foreach(revocation => sendQueue = sendQueue :+ revocation)
log.debug("re-sending the exact same previous sig")
sendQueue = sendQueue :+ waitingForRevocation.sent

if (revWasSentLast) resendRevocation()
case Right(_) if commitments1.remoteCommit.index + 1 == channelReestablish.nextLocalCommitmentNumber =>
// there wasn't any sig in-flight when the disconnection occurred
resendRevocation()
case _ => throw CommitmentSyncError(d.channelId)
}

commitments1.remoteNextCommitInfo match {
case Left(_) =>
// we expect them to (re-)send the revocation immediately
startSingleTimer(RevocationTimeout.toString, RevocationTimeout(commitments1.remoteCommit.index, peer), nodeParams.revocationTimeout)
case _ => ()
}

// have I something to sign?
if (Commitments.localHasChanges(commitments1)) {
self ! CMD_SIGN()
}

(commitments1, sendQueue)
private def handleOutdatedCommitment(channelReestablish: ChannelReestablish, d: HasCommitments) = {
val exc = PleasePublishYourCommitment(d.channelId)
val error = Error(d.channelId, exc.getMessage)
goto(WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT) using DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT(d.commitments, channelReestablish) storing() sending error
}

/**
Expand Down Expand Up @@ -2696,7 +2652,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
}

// we let the peer decide what to do
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = true) { case _ => SupervisorStrategy.Escalate }
override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy(loggingEnabled = true) { case _ => SupervisorStrategy.Escalate }

override def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
KamonExt.time(ProcessMessage.withTag("MessageType", msg.getClass.getSimpleName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ case class CannotSignWithoutChanges (override val channelId: Byte
case class CannotSignBeforeRevocation (override val channelId: ByteVector32) extends ChannelException(channelId, "cannot sign until next revocation hash is received")
case class UnexpectedRevocation (override val channelId: ByteVector32) extends ChannelException(channelId, "received unexpected RevokeAndAck message")
case class InvalidRevocation (override val channelId: ByteVector32) extends ChannelException(channelId, "invalid revocation")
case class InvalidRevokedCommitProof (override val channelId: ByteVector32, ourCommitmentNumber: Long, theirCommitmentNumber: Long, perCommitmentSecret: PrivateKey) extends ChannelException(channelId, s"counterparty claimed that we have a revoked commit but their proof doesn't check out: ourCommitmentNumber=$ourCommitmentNumber theirCommitmentNumber=$theirCommitmentNumber perCommitmentSecret=$perCommitmentSecret")
case class CommitmentSyncError (override val channelId: ByteVector32) extends ChannelException(channelId, "commitment sync error")
case class RevocationSyncError (override val channelId: ByteVector32) extends ChannelException(channelId, "revocation sync error")
case class InvalidRevokedCommitProof (override val channelId: ByteVector32, ourLocalCommitmentNumber: Long, theirRemoteCommitmentNumber: Long, invalidPerCommitmentSecret: PrivateKey) extends ChannelException(channelId, s"counterparty claimed that we have a revoked commit but their proof doesn't check out: ourCommitmentNumber=$ourLocalCommitmentNumber theirCommitmentNumber=$theirRemoteCommitmentNumber perCommitmentSecret=$invalidPerCommitmentSecret")
case class InvalidFailureCode (override val channelId: ByteVector32) extends ChannelException(channelId, "UpdateFailMalformedHtlc message doesn't have BADONION bit set")
case class PleasePublishYourCommitment (override val channelId: ByteVector32) extends ChannelException(channelId, "please publish your local commitment")
case class CommandUnavailableInThisState (override val channelId: ByteVector32, command: String, state: ChannelState) extends ChannelException(channelId, s"cannot execute command=$command in state=$state")
Expand Down
Loading

0 comments on commit 2c0c24e

Please sign in to comment.