Skip to content

Commit

Permalink
Ack htlc settlement commands after writing state (#1615)
Browse files Browse the repository at this point in the history
There are two issues:
1. because we forward commands *before* writing to disk in
   `PendingRelayDb.safeSend` in order to reduce latency, there is a race
   where the channel can process and acknowledge the command before the
   db write. As a result, the command stays in the pending relay db and
   will be cleaned up by the post-restart-htlc-cleaner at next restart.
2. in the general case, the channel acknowledges commands *before* it
   writes its state to disk, which opens a window for losing the command
   if we stop eclair at that exact time.

In order to fix 2., we introduce a new `acking` transition method, which
will be called after `storing`. This method adds a delay before actually
acknowledging the commands, which should be more than enough to solve 1.

I'm not sure we need that additional delay, because now that we
acknowledge the commands *after* storing the state, the channel should
lose the race most of the time.
  • Loading branch information
pm47 committed Dec 1, 2020
1 parent 6330e76 commit 848b433
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 19 deletions.
41 changes: 25 additions & 16 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -657,8 +657,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
handleCommandSuccess(c, d.copy(commitments = commitments1)) sending fulfill
case Failure(cause) =>
// we acknowledge the command right away in case of failure
PendingRelayDb.ackCommand(nodeParams.db.pendingRelay, d.channelId, c)
handleCommandError(cause, c)
handleCommandError(cause, c) acking(d.channelId, c)
}

case Event(fulfill: UpdateFulfillHtlc, d: DATA_NORMAL) =>
Expand All @@ -678,8 +677,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
handleCommandSuccess(c, d.copy(commitments = commitments1)) sending fail
case Failure(cause) =>
// we acknowledge the command right away in case of failure
PendingRelayDb.ackCommand(nodeParams.db.pendingRelay, d.channelId, c)
handleCommandError(cause, c)
handleCommandError(cause, c) acking(d.channelId, c)
}

case Event(c: CMD_FAIL_MALFORMED_HTLC, d: DATA_NORMAL) =>
Expand All @@ -690,8 +688,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
handleCommandSuccess(c, d.copy(commitments = commitments1)) sending fail
case Failure(cause) =>
// we acknowledge the command right away in case of failure
PendingRelayDb.ackCommand(nodeParams.db.pendingRelay, d.channelId, c)
handleCommandError(cause, c)
handleCommandError(cause, c) acking(d.channelId, c)
}

case Event(fail: UpdateFailHtlc, d: DATA_NORMAL) =>
Expand Down Expand Up @@ -730,7 +727,6 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
Commitments.sendCommit(d.commitments, keyManager) match {
case Success((commitments1, commit)) =>
log.debug("sending a new sig, spec:\n{}", Commitments.specs2String(commitments1))
PendingRelayDb.ackPendingFailsAndFulfills(nodeParams.db.pendingRelay, commitments1.localChanges.signed)
val nextRemoteCommit = commitments1.remoteNextCommitInfo.left.get.nextRemoteCommit
val nextCommitNumber = nextRemoteCommit.index
// we persist htlc data in order to be able to claim htlc outputs in case a revoked tx is published by our
Expand All @@ -749,7 +745,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
context.system.eventStream.publish(ChannelSignatureSent(self, commitments1))
// we expect a quick response from our peer
setTimer(RevocationTimeout.toString, RevocationTimeout(commitments1.remoteCommit.index, peer), timeout = nodeParams.revocationTimeout, repeat = false)
handleCommandSuccess(c, d.copy(commitments = commitments1)) storing() sending commit
handleCommandSuccess(c, d.copy(commitments = commitments1)) storing() sending commit acking(commitments1.localChanges.signed)
case Failure(cause) => handleCommandError(cause, c)
}
case Left(waitForRevocation) =>
Expand Down Expand Up @@ -1011,8 +1007,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
handleCommandSuccess(c, d.copy(commitments = commitments1)) sending fulfill
case Failure(cause) =>
// we acknowledge the command right away in case of failure
PendingRelayDb.ackCommand(nodeParams.db.pendingRelay, d.channelId, c)
handleCommandError(cause, c)
handleCommandError(cause, c) acking(d.channelId, c)
}

case Event(fulfill: UpdateFulfillHtlc, d: DATA_SHUTDOWN) =>
Expand All @@ -1031,8 +1026,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
handleCommandSuccess(c, d.copy(commitments = commitments1)) sending fail
case Failure(cause) =>
// we acknowledge the command right away in case of failure
PendingRelayDb.ackCommand(nodeParams.db.pendingRelay, d.channelId, c)
handleCommandError(cause, c)
handleCommandError(cause, c) acking(d.channelId, c)
}

case Event(c: CMD_FAIL_MALFORMED_HTLC, d: DATA_SHUTDOWN) =>
Expand All @@ -1042,8 +1036,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
handleCommandSuccess(c, d.copy(commitments = commitments1)) sending fail
case Failure(cause) =>
// we acknowledge the command right away in case of failure
PendingRelayDb.ackCommand(nodeParams.db.pendingRelay, d.channelId, c)
handleCommandError(cause, c)
handleCommandError(cause, c) acking(d.channelId, c)
}

case Event(fail: UpdateFailHtlc, d: DATA_SHUTDOWN) =>
Expand Down Expand Up @@ -1082,11 +1075,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
Commitments.sendCommit(d.commitments, keyManager) match {
case Success((commitments1, commit)) =>
log.debug("sending a new sig, spec:\n{}", Commitments.specs2String(commitments1))
PendingRelayDb.ackPendingFailsAndFulfills(nodeParams.db.pendingRelay, commitments1.localChanges.signed)
context.system.eventStream.publish(ChannelSignatureSent(self, commitments1))
// we expect a quick response from our peer
setTimer(RevocationTimeout.toString, RevocationTimeout(commitments1.remoteCommit.index, peer), timeout = nodeParams.revocationTimeout, repeat = false)
handleCommandSuccess(c, d.copy(commitments = commitments1)) storing() sending commit
handleCommandSuccess(c, d.copy(commitments = commitments1)) storing() sending commit acking(commitments1.localChanges.signed)
case Failure(cause) => handleCommandError(cause, c)
}
case Left(waitForRevocation) =>
Expand Down Expand Up @@ -2390,6 +2382,23 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
state
}

/**
* We don't acknowledge htlc commands immediately, because we send them to the channel as soon as possible, and they
* may not yet have been written to the database.
* @param cmd fail/fulfill command that has been processed
*/
def acking(channelId: ByteVector32, cmd: HtlcSettlementCommand): FSM.State[fr.acinq.eclair.channel.State, Data] = {
log.debug("scheduling acknowledgement of cmd id={}", cmd.id)
context.system.scheduler.scheduleOnce(10 seconds)(PendingRelayDb.ackCommand(nodeParams.db.pendingRelay, channelId, cmd))(context.system.dispatcher)
state
}

def acking(updates: List[UpdateMessage]): FSM.State[fr.acinq.eclair.channel.State, Data] = {
log.debug("scheduling acknowledgement of cmds ids={}", updates.collect { case s: HtlcSettlementMessage => s.id }.mkString(","))
context.system.scheduler.scheduleOnce(10 seconds)(PendingRelayDb.ackPendingFailsAndFulfills(nodeParams.db.pendingRelay, updates))(context.system.dispatcher)
state
}

}

def send(msg: LightningMessage) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ sealed trait HasTemporaryChannelId extends LightningMessage { def temporaryChann
sealed trait HasChannelId extends LightningMessage { def channelId: ByteVector32 } // <- not in the spec
sealed trait HasChainHash extends LightningMessage { def chainHash: ByteVector32 } // <- not in the spec
sealed trait UpdateMessage extends HtlcMessage // <- not in the spec
sealed trait HtlcSettlementMessage extends UpdateMessage { def id: Long } // <- not in the spec
// @formatter:on

case class Init(features: Features, tlvs: TlvStream[InitTlv] = TlvStream.empty) extends SetupMessage {
Expand Down Expand Up @@ -132,16 +133,16 @@ case class UpdateAddHtlc(channelId: ByteVector32,

case class UpdateFulfillHtlc(channelId: ByteVector32,
id: Long,
paymentPreimage: ByteVector32) extends HtlcMessage with UpdateMessage with HasChannelId
paymentPreimage: ByteVector32) extends HtlcMessage with UpdateMessage with HasChannelId with HtlcSettlementMessage

case class UpdateFailHtlc(channelId: ByteVector32,
id: Long,
reason: ByteVector) extends HtlcMessage with UpdateMessage with HasChannelId
reason: ByteVector) extends HtlcMessage with UpdateMessage with HasChannelId with HtlcSettlementMessage

case class UpdateFailMalformedHtlc(channelId: ByteVector32,
id: Long,
onionHash: ByteVector32,
failureCode: Int) extends HtlcMessage with UpdateMessage with HasChannelId
failureCode: Int) extends HtlcMessage with UpdateMessage with HasChannelId with HtlcSettlementMessage

case class CommitSig(channelId: ByteVector32,
signature: ByteVector64,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import fr.acinq.eclair.channel.Channel._
import fr.acinq.eclair.channel.states.StateTestsHelperMethods
import fr.acinq.eclair.channel.{ChannelErrorOccurred, _}
import fr.acinq.eclair.crypto.Sphinx
import fr.acinq.eclair.db.PendingRelayDb
import fr.acinq.eclair.io.Peer
import fr.acinq.eclair.payment.OutgoingPacket
import fr.acinq.eclair.payment.relay.Relayer._
Expand Down Expand Up @@ -1253,6 +1254,24 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
assert(initialState == bob.stateData)
}

test("recv CMD_FULFILL_HTLC (acknowledge in case of success)") { f =>
import f._
val sender = TestProbe()
val (r, htlc) = addHtlc(50000000 msat, alice, bob, alice2bob, bob2alice)
crossSign(alice, bob, alice2bob, bob2alice)

// actual test begins
val initialState = bob.stateData.asInstanceOf[DATA_NORMAL]
val c = CMD_FULFILL_HTLC(htlc.id, r, replyTo_opt = Some(sender.ref))
// this would be done automatically when the relayer calls safeSend
bob.underlyingActor.nodeParams.db.pendingRelay.addPendingRelay(initialState.channelId, c)
bob ! c
bob2alice.expectMsgType[UpdateFulfillHtlc]
bob ! CMD_SIGN(replyTo_opt = Some(sender.ref))
bob2alice.expectMsgType[CommitSig]
awaitCond(bob.underlyingActor.nodeParams.db.pendingRelay.listPendingRelay(initialState.channelId).isEmpty)
}

test("recv CMD_FULFILL_HTLC (acknowledge in case of failure)") { f =>
import f._
val sender = TestProbe()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,22 @@ class ShutdownStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wit
assert(initialState == bob.stateData)
}

test("recv CMD_FULFILL_HTLC (acknowledge in case of success)") { f =>
import f._
val sender = TestProbe()

// actual test begins
val initialState = bob.stateData.asInstanceOf[DATA_SHUTDOWN]
val c = CMD_FULFILL_HTLC(0, r1, replyTo_opt = Some(sender.ref))
// this would be done automatically when the relayer calls safeSend
bob.underlyingActor.nodeParams.db.pendingRelay.addPendingRelay(initialState.channelId, c)
bob ! c
bob2alice.expectMsgType[UpdateFulfillHtlc]
bob ! CMD_SIGN(replyTo_opt = Some(sender.ref))
bob2alice.expectMsgType[CommitSig]
awaitCond(bob.underlyingActor.nodeParams.db.pendingRelay.listPendingRelay(initialState.channelId).isEmpty)
}

test("recv CMD_FULFILL_HTLC (acknowledge in case of failure)") { f =>
import f._
val sender = TestProbe()
Expand Down

0 comments on commit 848b433

Please sign in to comment.