Skip to content

Commit

Permalink
Improve getsentinfo accuracy (#2142)
Browse files Browse the repository at this point in the history
The PaymentInitiator now stores all pending payments, where it previously
only stored trampoline payments (for which it handled the retries).

Whenever the DB doesn't contain any pending payment we also ask the
payment initiator whether a payment attempt is being started, which lets
us provide more accurate information with the `sentinfo` RPC.
  • Loading branch information
t-bast committed Jan 24, 2022
1 parent 2d64187 commit d59d434
Show file tree
Hide file tree
Showing 6 changed files with 297 additions and 183 deletions.
36 changes: 29 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ import fr.acinq.eclair.blockchain.fee.{FeeratePerByte, FeeratePerKw}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.crypto.Sphinx
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.{IncomingPayment, OutgoingPayment}
import fr.acinq.eclair.db.{IncomingPayment, OutgoingPayment, OutgoingPaymentStatus}
import fr.acinq.eclair.io.Peer.{GetPeerInfo, PeerInfo}
import fr.acinq.eclair.io._
import fr.acinq.eclair.message.{OnionMessages, Postman}
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceivePayment
import fr.acinq.eclair.payment.relay.Relayer.{GetOutgoingChannels, OutgoingChannels, RelayFees, UsableBalance}
import fr.acinq.eclair.payment.send.MultiPartPaymentLifecycle.PreimageReceived
import fr.acinq.eclair.payment.send.PaymentInitiator.{SendPaymentToNode, SendPaymentToRoute, SendPaymentToRouteResponse, SendSpontaneousPayment}
import fr.acinq.eclair.payment.send.PaymentInitiator._
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.wire.protocol.MessageOnionCodecs.blindedRouteCodec
Expand Down Expand Up @@ -375,11 +375,33 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
}
}

override def sentInfo(id: Either[UUID, ByteVector32])(implicit timeout: Timeout): Future[Seq[OutgoingPayment]] = Future {
id match {
case Left(uuid) => appKit.nodeParams.db.payments.listOutgoingPayments(uuid)
case Right(paymentHash) => appKit.nodeParams.db.payments.listOutgoingPayments(paymentHash)
}
override def sentInfo(id: Either[UUID, ByteVector32])(implicit timeout: Timeout): Future[Seq[OutgoingPayment]] = {
Future {
id match {
case Left(uuid) => appKit.nodeParams.db.payments.listOutgoingPayments(uuid)
case Right(paymentHash) => appKit.nodeParams.db.payments.listOutgoingPayments(paymentHash)
}
}.flatMap(outgoingDbPayments => {
if (!outgoingDbPayments.exists(_.status == OutgoingPaymentStatus.Pending)) {
// We don't have any pending payment in the DB, but we may have just started a payment that hasn't written to the DB yet.
// We ask the payment initiator and if that's the case, we build a dummy payment placeholder to let the caller know
// that a payment attempt is in progress (even though we don't have information yet about the actual HTLCs).
(appKit.paymentInitiator ? GetPayment(id)).mapTo[GetPaymentResponse].map {
case NoPendingPayment(_) => outgoingDbPayments
case PaymentIsPending(paymentId, paymentHash, pending) =>
val paymentType = "placeholder"
val dummyOutgoingPayment = pending match {
case PendingSpontaneousPayment(_, r) => OutgoingPayment(paymentId, paymentId, r.externalId, paymentHash, paymentType, r.recipientAmount, r.recipientAmount, r.recipientNodeId, TimestampMilli.now(), None, OutgoingPaymentStatus.Pending)
case PendingPaymentToNode(_, r) => OutgoingPayment(paymentId, paymentId, r.externalId, paymentHash, paymentType, r.recipientAmount, r.recipientAmount, r.recipientNodeId, TimestampMilli.now(), Some(r.paymentRequest), OutgoingPaymentStatus.Pending)
case PendingPaymentToRoute(_, r) => OutgoingPayment(paymentId, paymentId, r.externalId, paymentHash, paymentType, r.recipientAmount, r.recipientAmount, r.recipientNodeId, TimestampMilli.now(), Some(r.paymentRequest), OutgoingPaymentStatus.Pending)
case PendingTrampolinePayment(_, _, r) => OutgoingPayment(paymentId, paymentId, None, paymentHash, paymentType, r.recipientAmount, r.recipientAmount, r.recipientNodeId, TimestampMilli.now(), Some(r.paymentRequest), OutgoingPaymentStatus.Pending)
}
dummyOutgoingPayment +: outgoingDbPayments
}
} else {
Future.successful(outgoingDbPayments)
}
})
}

override def receivedInfo(paymentHash: ByteVector32)(implicit timeout: Timeout): Future[Option[IncomingPayment]] = Future {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,13 @@ class PaymentInitiator(nodeParams: NodeParams, outgoingPaymentFactory: PaymentIn
sender() ! PaymentFailed(paymentId, r.paymentHash, LocalFailure(r.recipientAmount, Nil, PaymentSecretMissing) :: Nil)
case Some(paymentSecret) if r.paymentRequest.features.allowMultiPart && nodeParams.features.hasFeature(BasicMultiPartPayment) =>
val fsm = outgoingPaymentFactory.spawnOutgoingMultiPartPayment(context, paymentCfg)
fsm ! SendMultiPartPayment(sender(), paymentSecret, r.recipientNodeId, r.recipientAmount, finalExpiry, r.maxAttempts, r.paymentRequest.paymentMetadata, r.assistedRoutes, r.routeParams, userCustomTlvs = r.userCustomTlvs)
fsm ! SendMultiPartPayment(self, paymentSecret, r.recipientNodeId, r.recipientAmount, finalExpiry, r.maxAttempts, r.paymentRequest.paymentMetadata, r.assistedRoutes, r.routeParams, userCustomTlvs = r.userCustomTlvs)
context become main(pending + (paymentId -> PendingPaymentToNode(sender(), r)))
case Some(paymentSecret) =>
val finalPayload = PaymentOnion.createSinglePartPayload(r.recipientAmount, finalExpiry, paymentSecret, r.paymentRequest.paymentMetadata, r.userCustomTlvs)
val fsm = outgoingPaymentFactory.spawnOutgoingPayment(context, paymentCfg)
fsm ! PaymentLifecycle.SendPaymentToNode(sender(), r.recipientNodeId, finalPayload, r.maxAttempts, r.assistedRoutes, r.routeParams)
fsm ! PaymentLifecycle.SendPaymentToNode(self, r.recipientNodeId, finalPayload, r.maxAttempts, r.assistedRoutes, r.routeParams)
context become main(pending + (paymentId -> PendingPaymentToNode(sender(), r)))
}

case r: SendSpontaneousPayment =>
Expand All @@ -74,7 +76,8 @@ class PaymentInitiator(nodeParams: NodeParams, outgoingPaymentFactory: PaymentIn
val finalExpiry = Channel.MIN_CLTV_EXPIRY_DELTA.toCltvExpiry(nodeParams.currentBlockHeight + 1)
val finalPayload = PaymentOnion.FinalTlvPayload(TlvStream(Seq(OnionPaymentPayloadTlv.AmountToForward(r.recipientAmount), OnionPaymentPayloadTlv.OutgoingCltv(finalExpiry), OnionPaymentPayloadTlv.PaymentData(randomBytes32(), r.recipientAmount), OnionPaymentPayloadTlv.KeySend(r.paymentPreimage)), r.userCustomTlvs))
val fsm = outgoingPaymentFactory.spawnOutgoingPayment(context, paymentCfg)
fsm ! PaymentLifecycle.SendPaymentToNode(sender(), r.recipientNodeId, finalPayload, r.maxAttempts, routeParams = r.routeParams)
fsm ! PaymentLifecycle.SendPaymentToNode(self, r.recipientNodeId, finalPayload, r.maxAttempts, routeParams = r.routeParams)
context become main(pending + (paymentId -> PendingSpontaneousPayment(sender(), r)))

case r: SendTrampolinePayment =>
val paymentId = UUID.randomUUID()
Expand All @@ -88,56 +91,13 @@ class PaymentInitiator(nodeParams: NodeParams, outgoingPaymentFactory: PaymentIn
log.info(s"sending trampoline payment with trampoline fees=$trampolineFees and expiry delta=$trampolineExpiryDelta")
sendTrampolinePayment(paymentId, r, trampolineFees, trampolineExpiryDelta) match {
case Success(_) =>
context become main(pending + (paymentId -> PendingPayment(sender(), remainingAttempts, r)))
context become main(pending + (paymentId -> PendingTrampolinePayment(sender(), remainingAttempts, r)))
case Failure(t) =>
log.warning("cannot send outgoing trampoline payment: {}", t.getMessage)
sender() ! PaymentFailed(paymentId, r.paymentHash, LocalFailure(r.recipientAmount, Nil, t) :: Nil)
}
}

case pf: PaymentFailed => pending.get(pf.id).foreach(pp => {
val trampolineRoute = Seq(
NodeHop(nodeParams.nodeId, pp.r.trampolineNodeId, nodeParams.expiryDelta, 0 msat),
NodeHop(pp.r.trampolineNodeId, pp.r.recipientNodeId, pp.r.trampolineAttempts.last._2, pp.r.trampolineAttempts.last._1)
)
val decryptedFailures = pf.failures.collect { case RemoteFailure(_, _, Sphinx.DecryptedFailurePacket(_, f)) => f }
val shouldRetry = decryptedFailures.contains(TrampolineFeeInsufficient) || decryptedFailures.contains(TrampolineExpiryTooSoon)
if (shouldRetry) {
pp.remainingAttempts match {
case (trampolineFees, trampolineExpiryDelta) :: remaining =>
log.info(s"retrying trampoline payment with trampoline fees=$trampolineFees and expiry delta=$trampolineExpiryDelta")
sendTrampolinePayment(pf.id, pp.r, trampolineFees, trampolineExpiryDelta) match {
case Success(_) =>
context become main(pending + (pf.id -> pp.copy(remainingAttempts = remaining)))
case Failure(t) =>
log.warning("cannot send outgoing trampoline payment: {}", t.getMessage)
val localFailure = pf.copy(failures = Seq(LocalFailure(pp.r.recipientAmount, trampolineRoute, t)))
pp.sender ! localFailure
context.system.eventStream.publish(localFailure)
context become main(pending - pf.id)
}
case Nil =>
log.info("trampoline node couldn't find a route after all retries")
val localFailure = pf.copy(failures = Seq(LocalFailure(pp.r.recipientAmount, trampolineRoute, RouteNotFound)))
pp.sender ! localFailure
context.system.eventStream.publish(localFailure)
context become main(pending - pf.id)
}
} else {
pp.sender ! pf
context.system.eventStream.publish(pf)
context become main(pending - pf.id)
}
})

case _: PreimageReceived => // we received the preimage, but we wait for the PaymentSent event that will contain more data

case ps: PaymentSent => pending.get(ps.id).foreach(pp => {
pp.sender ! ps
context.system.eventStream.publish(ps)
context become main(pending - ps.id)
})

case r: SendPaymentToRoute =>
val paymentId = UUID.randomUUID()
val parentPaymentId = r.parentId.getOrElse(UUID.randomUUID())
Expand All @@ -155,18 +115,81 @@ class PaymentInitiator(nodeParams: NodeParams, outgoingPaymentFactory: PaymentIn
val trampolineSecret = r.trampolineSecret.getOrElse(randomBytes32())
sender() ! SendPaymentToRouteResponse(paymentId, parentPaymentId, Some(trampolineSecret))
val payFsm = outgoingPaymentFactory.spawnOutgoingPayment(context, paymentCfg)
payFsm ! PaymentLifecycle.SendPaymentToRoute(sender(), Left(r.route), PaymentOnion.createMultiPartPayload(r.amount, trampolineAmount, trampolineExpiry, trampolineSecret, r.paymentRequest.paymentMetadata, Seq(OnionPaymentPayloadTlv.TrampolineOnion(trampolineOnion))), r.paymentRequest.routingInfo)
payFsm ! PaymentLifecycle.SendPaymentToRoute(self, Left(r.route), PaymentOnion.createMultiPartPayload(r.amount, trampolineAmount, trampolineExpiry, trampolineSecret, r.paymentRequest.paymentMetadata, Seq(OnionPaymentPayloadTlv.TrampolineOnion(trampolineOnion))), r.paymentRequest.routingInfo)
context become main(pending + (paymentId -> PendingPaymentToRoute(sender(), r)))
case Failure(t) =>
log.warning("cannot send outgoing trampoline payment: {}", t.getMessage)
sender() ! PaymentFailed(paymentId, r.paymentHash, LocalFailure(r.recipientAmount, Nil, t) :: Nil)
}
case Nil =>
sender() ! SendPaymentToRouteResponse(paymentId, parentPaymentId, None)
val payFsm = outgoingPaymentFactory.spawnOutgoingPayment(context, paymentCfg)
payFsm ! PaymentLifecycle.SendPaymentToRoute(sender(), Left(r.route), PaymentOnion.createMultiPartPayload(r.amount, r.recipientAmount, finalExpiry, r.paymentRequest.paymentSecret.get, r.paymentRequest.paymentMetadata), r.paymentRequest.routingInfo)
payFsm ! PaymentLifecycle.SendPaymentToRoute(self, Left(r.route), PaymentOnion.createMultiPartPayload(r.amount, r.recipientAmount, finalExpiry, r.paymentRequest.paymentSecret.get, r.paymentRequest.paymentMetadata), r.paymentRequest.routingInfo)
context become main(pending + (paymentId -> PendingPaymentToRoute(sender(), r)))
case _ =>
sender() ! PaymentFailed(paymentId, r.paymentHash, LocalFailure(r.recipientAmount, Nil, TrampolineMultiNodeNotSupported) :: Nil)
}

case pf: PaymentFailed => pending.get(pf.id).foreach {
case pp: PendingTrampolinePayment =>
val trampolineRoute = Seq(
NodeHop(nodeParams.nodeId, pp.r.trampolineNodeId, nodeParams.expiryDelta, 0 msat),
NodeHop(pp.r.trampolineNodeId, pp.r.recipientNodeId, pp.r.trampolineAttempts.last._2, pp.r.trampolineAttempts.last._1)
)
val decryptedFailures = pf.failures.collect { case RemoteFailure(_, _, Sphinx.DecryptedFailurePacket(_, f)) => f }
val shouldRetry = decryptedFailures.contains(TrampolineFeeInsufficient) || decryptedFailures.contains(TrampolineExpiryTooSoon)
if (shouldRetry) {
pp.remainingAttempts match {
case (trampolineFees, trampolineExpiryDelta) :: remaining =>
log.info(s"retrying trampoline payment with trampoline fees=$trampolineFees and expiry delta=$trampolineExpiryDelta")
sendTrampolinePayment(pf.id, pp.r, trampolineFees, trampolineExpiryDelta) match {
case Success(_) =>
context become main(pending + (pf.id -> pp.copy(remainingAttempts = remaining)))
case Failure(t) =>
log.warning("cannot send outgoing trampoline payment: {}", t.getMessage)
val localFailure = pf.copy(failures = Seq(LocalFailure(pp.r.recipientAmount, trampolineRoute, t)))
pp.sender ! localFailure
context.system.eventStream.publish(localFailure)
context become main(pending - pf.id)
}
case Nil =>
log.info("trampoline node couldn't find a route after all retries")
val localFailure = pf.copy(failures = Seq(LocalFailure(pp.r.recipientAmount, trampolineRoute, RouteNotFound)))
pp.sender ! localFailure
context.system.eventStream.publish(localFailure)
context become main(pending - pf.id)
}
} else {
pp.sender ! pf
context.system.eventStream.publish(pf)
context become main(pending - pf.id)
}
case pp =>
pp.sender ! pf
context become main(pending - pf.id)
}

case _: PreimageReceived => // we received the preimage, but we wait for the PaymentSent event that will contain more data

case ps: PaymentSent => pending.get(ps.id).foreach(pp => {
pp.sender ! ps
pp match {
case _: PendingTrampolinePayment => context.system.eventStream.publish(ps)
case _ => // other types of payment internally handle publishing the event
}
context become main(pending - ps.id)
})

case GetPayment(id) =>
val pending_opt = id match {
case Left(paymentId) => pending.get(paymentId).map(pp => (paymentId, pp))
case Right(paymentHash) => pending.collectFirst { case (paymentId, pp) if pp.paymentHash == paymentHash => (paymentId, pp) }
}
pending_opt match {
case Some((paymentId, pp)) => sender() ! PaymentIsPending(paymentId, pp.paymentHash, pp)
case None => sender() ! NoPendingPayment(id)
}

}

private def buildTrampolinePayment(r: SendRequestedPayment, trampolineNodeId: PublicKey, trampolineFees: MilliSatoshi, trampolineExpiryDelta: CltvExpiryDelta): Try[(MilliSatoshi, CltvExpiry, OnionRoutingPacket)] = {
Expand Down Expand Up @@ -225,7 +248,23 @@ object PaymentInitiator {

def props(nodeParams: NodeParams, outgoingPaymentFactory: MultiPartPaymentFactory) = Props(new PaymentInitiator(nodeParams, outgoingPaymentFactory))

case class PendingPayment(sender: ActorRef, remainingAttempts: Seq[(MilliSatoshi, CltvExpiryDelta)], r: SendTrampolinePayment)
// @formatter:off
sealed trait PendingPayment {
def sender: ActorRef
def paymentHash: ByteVector32
}
case class PendingSpontaneousPayment(sender: ActorRef, request: SendSpontaneousPayment) extends PendingPayment { override def paymentHash: ByteVector32 = request.paymentHash }
case class PendingPaymentToNode(sender: ActorRef, request: SendPaymentToNode) extends PendingPayment { override def paymentHash: ByteVector32 = request.paymentHash }
case class PendingPaymentToRoute(sender: ActorRef, request: SendPaymentToRoute) extends PendingPayment { override def paymentHash: ByteVector32 = request.paymentHash }
case class PendingTrampolinePayment(sender: ActorRef, remainingAttempts: Seq[(MilliSatoshi, CltvExpiryDelta)], r: SendTrampolinePayment) extends PendingPayment { override def paymentHash: ByteVector32 = r.paymentHash }
// @formatter:on

// @formatter:off
case class GetPayment(id: Either[UUID, ByteVector32])
sealed trait GetPaymentResponse
case class NoPendingPayment(id: Either[UUID, ByteVector32]) extends GetPaymentResponse
case class PaymentIsPending(paymentId: UUID, paymentHash: ByteVector32, pending: PendingPayment) extends GetPaymentResponse
// @formatter:on

sealed trait SendRequestedPayment {
// @formatter:off
Expand Down
Loading

0 comments on commit d59d434

Please sign in to comment.