Skip to content

Commit

Permalink
Add listener to pick up warning tx broadcast
Browse files Browse the repository at this point in the history
Provide a 'SetupWarningTxListener' trade task, which runs at the opening
of the trade and upon initialisation of the trade manager at application
startup. It adds a listener which picks up either warning tx and updates
the dispute state to 'WARNING_SENT(_BY_PEER)', as appropriate.

As the peer's warning tx may be unknown (at least in the unlikely event
that sensitive data was cleared out of an unfailed trade), the listener
detects any spend of the deposit tx escrow output. (This functionality
will also be needed to pick up the peer's claim tx, which has a
completely unknown txId.) To this end, provide a new listener type,
'OutputSpendConfidenceListener', which can be added to or removed from a
'WalletService' instance and detects change in the confidence of any tx
spending the provided (non-detached) 'TranactionOutput' instance.

(Also do some minor cleanup of the 'WalletService' class.)
  • Loading branch information
stejbac committed Sep 22, 2024
1 parent ce5087c commit 21b0c35
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.core.btc.listeners;

import org.bitcoinj.core.TransactionConfidence;
import org.bitcoinj.core.TransactionOutput;

import lombok.Getter;

public abstract class OutputSpendConfidenceListener {
@Getter
private final TransactionOutput output;

public OutputSpendConfidenceListener(TransactionOutput output) {
this.output = output;
}

public abstract void onOutputSpendConfidenceChanged(TransactionConfidence confidence);
}
52 changes: 34 additions & 18 deletions core/src/main/java/bisq/core/btc/wallet/WalletService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import bisq.core.btc.exceptions.WalletException;
import bisq.core.btc.listeners.AddressConfidenceListener;
import bisq.core.btc.listeners.BalanceListener;
import bisq.core.btc.listeners.OutputSpendConfidenceListener;
import bisq.core.btc.listeners.TxConfidenceListener;
import bisq.core.btc.setup.WalletsSetup;
import bisq.core.btc.wallet.http.MemPoolSpaceTxBroadcaster;
Expand Down Expand Up @@ -125,6 +126,7 @@ public abstract class WalletService {
private final BisqWalletListener walletEventListener = new BisqWalletListener();
private final CopyOnWriteArraySet<AddressConfidenceListener> addressConfidenceListeners = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<TxConfidenceListener> txConfidenceListeners = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<OutputSpendConfidenceListener> spendConfidenceListeners = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<BalanceListener> balanceListeners = new CopyOnWriteArraySet<>();
private final WalletChangeEventListener cacheInvalidationListener;
private final AtomicReference<Multiset<Address>> txOutputAddressCache = new AtomicReference<>();
Expand Down Expand Up @@ -228,6 +230,14 @@ public void removeTxConfidenceListener(TxConfidenceListener listener) {
txConfidenceListeners.remove(listener);
}

public void addSpendConfidenceListener(OutputSpendConfidenceListener listener) {
spendConfidenceListeners.add(listener);
}

public void removeSpendConfidenceListener(OutputSpendConfidenceListener listener) {
spendConfidenceListeners.remove(listener);
}

public void addBalanceListener(BalanceListener listener) {
balanceListeners.add(listener);
}
Expand Down Expand Up @@ -446,29 +456,28 @@ public void broadcastTx(Transaction tx, TxBroadcaster.Callback callback, int tim

@Nullable
public TransactionConfidence getConfidenceForAddress(Address address) {
List<TransactionConfidence> transactionConfidenceList = new ArrayList<>();
if (wallet != null) {
Set<Transaction> transactions = getAddressToMatchingTxSetMultimap().get(address);
transactionConfidenceList.addAll(transactions.stream().map(tx ->
getTransactionConfidence(tx, address)).collect(Collectors.toList()));
return getMostRecentConfidence(transactions.stream()
.map(tx -> getTransactionConfidence(tx, address))
.collect(Collectors.toList()));
}
return getMostRecentConfidence(transactionConfidenceList);
return null;
}

@Nullable
public TransactionConfidence getConfidenceForAddressFromBlockHeight(Address address, long targetHeight) {
List<TransactionConfidence> transactionConfidenceList = new ArrayList<>();
if (wallet != null) {
Set<Transaction> transactions = getAddressToMatchingTxSetMultimap().get(address);
// "acceptable confidence" is either a new (pending) Tx, or a Tx confirmed after target block height
transactionConfidenceList.addAll(transactions.stream()
return getMostRecentConfidence(transactions.stream()
.map(tx -> getTransactionConfidence(tx, address))
.filter(Objects::nonNull)
.filter(con -> con.getConfidenceType() == PENDING ||
(con.getConfidenceType() == BUILDING && con.getAppearedAtChainHeight() > targetHeight))
.collect(Collectors.toList()));
}
return getMostRecentConfidence(transactionConfidenceList);
return null;
}

private SetMultimap<Address, Transaction> getAddressToMatchingTxSetMultimap() {
Expand Down Expand Up @@ -500,7 +509,7 @@ public TransactionConfidence getConfidenceForTxId(@Nullable String txId) {
}

@Nullable
private TransactionConfidence getTransactionConfidence(Transaction tx, Address address) {
private static TransactionConfidence getTransactionConfidence(Transaction tx, Address address) {
List<TransactionConfidence> transactionConfidenceList = getOutputsWithConnectedOutputs(tx).stream()
.filter(output -> address != null && address.equals(getAddressFromOutput(output)))
.flatMap(o -> Stream.ofNullable(o.getParentTransaction()))
Expand All @@ -510,7 +519,7 @@ private TransactionConfidence getTransactionConfidence(Transaction tx, Address a
}


private List<TransactionOutput> getOutputsWithConnectedOutputs(Transaction tx) {
private static List<TransactionOutput> getOutputsWithConnectedOutputs(Transaction tx) {
List<TransactionOutput> transactionOutputs = tx.getOutputs();
List<TransactionOutput> connectedOutputs = new ArrayList<>();

Expand All @@ -530,7 +539,7 @@ private List<TransactionOutput> getOutputsWithConnectedOutputs(Transaction tx) {
}

@Nullable
private TransactionConfidence getMostRecentConfidence(List<TransactionConfidence> transactionConfidenceList) {
private static TransactionConfidence getMostRecentConfidence(List<TransactionConfidence> transactionConfidenceList) {
TransactionConfidence transactionConfidence = null;
for (TransactionConfidence confidence : transactionConfidenceList) {
if (confidence != null) {
Expand Down Expand Up @@ -932,7 +941,7 @@ public void onCoinsSent(Wallet wallet, Transaction tx, Coin prevBalance, Coin ne

@Override
public void onReorganize(Wallet wallet) {
log.warn("onReorganize ");
log.warn("onReorganize");
}

@Override
Expand All @@ -941,13 +950,20 @@ public void onTransactionConfidenceChanged(Wallet wallet, Transaction tx) {
TransactionConfidence confidence = getTransactionConfidence(tx, addressConfidenceListener.getAddress());
addressConfidenceListener.onTransactionConfidenceChanged(confidence);
}
txConfidenceListeners.stream()
.filter(txConfidenceListener -> tx != null &&
tx.getTxId().toString() != null &&
txConfidenceListener != null &&
tx.getTxId().toString().equals(txConfidenceListener.getTxId()))
.forEach(txConfidenceListener ->
txConfidenceListener.onTransactionConfidenceChanged(tx.getConfidence()));
for (OutputSpendConfidenceListener listener : spendConfidenceListeners) {
TransactionInput spentBy = listener.getOutput().getSpentBy();
if (spentBy != null && tx.equals(spentBy.getParentTransaction())) {
listener.onOutputSpendConfidenceChanged(tx.getConfidence());
}
}
if (!txConfidenceListeners.isEmpty()) {
String txId = tx.getTxId().toString();
for (TxConfidenceListener listener : txConfidenceListeners) {
if (txId.equals(listener.getTxId())) {
listener.onTransactionConfidenceChanged(tx.getConfidence());
}
}
}
}

void notifyBalanceListeners(Transaction tx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import bisq.core.trade.model.bisq_v1.BuyerTrade;
import bisq.core.trade.model.bisq_v1.Trade;
import bisq.core.trade.model.bisq_v1.Trade.Phase;
import bisq.core.trade.protocol.BuyerProtocol;
import bisq.core.trade.protocol.FluentProtocol;
import bisq.core.trade.protocol.TradeMessage;
Expand All @@ -35,6 +36,7 @@
import bisq.core.trade.protocol.bisq_v1.tasks.buyer.BuyerSetupPayoutTxListener;
import bisq.core.trade.protocol.bisq_v1.tasks.buyer.BuyerSignPayoutTx;
import bisq.core.trade.protocol.bisq_v5.messages.DepositTxAndSellerPaymentAccountMessage;
import bisq.core.trade.protocol.bisq_v5.tasks.SetupWarningTxListener;
import bisq.core.trade.protocol.bisq_v5.tasks.buyer.BuyerProcessDepositTxAndSellerPaymentAccountMessage;

import bisq.network.p2p.NodeAddress;
Expand Down Expand Up @@ -64,17 +66,23 @@ protected void onInitialized() {
super.onInitialized();
// We get called the constructor with any possible state and phase. As we don't want to log an error for such
// cases we use the alternative 'given' method instead of 'expect'.
given(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
given(phase(Phase.TAKER_FEE_PUBLISHED)
.with(BuyerEvent.STARTUP))
.setup(tasks(BuyerSetupDepositTxListener.class))
.executeTasks();

given(anyPhase(Trade.Phase.FIAT_SENT, Trade.Phase.FIAT_RECEIVED)
given(anyPhase(Phase.DEPOSIT_PUBLISHED, Phase.DEPOSIT_CONFIRMED, Phase.FIAT_SENT, Phase.FIAT_RECEIVED)
.preCondition(trade.hasV5Protocol()) // FIXME: If trade opened with v4 protocol, should use BaseBuyerProtocol_v4.
.with(BuyerEvent.STARTUP))
.setup(tasks(SetupWarningTxListener.class))
.executeTasks();

given(anyPhase(Phase.FIAT_SENT, Phase.FIAT_RECEIVED)
.with(BuyerEvent.STARTUP))
.setup(tasks(BuyerSetupPayoutTxListener.class))
.executeTasks();

given(anyPhase(Trade.Phase.FIAT_SENT, Trade.Phase.FIAT_RECEIVED)
given(anyPhase(Phase.FIAT_SENT, Phase.FIAT_RECEIVED)
.anyState(Trade.State.BUYER_STORED_IN_MAILBOX_FIAT_PAYMENT_INITIATED_MSG,
Trade.State.BUYER_SEND_FAILED_FIAT_PAYMENT_INITIATED_MSG)
.with(BuyerEvent.STARTUP))
Expand Down Expand Up @@ -110,20 +118,21 @@ public void onMailboxMessage(TradeMessage message, NodeAddress peer) {
// mailbox message but the stored in mailbox case is not expected and the seller would try to send the message again
// in the hope to reach the buyer directly in case of network issues.
protected void handle(DepositTxAndSellerPaymentAccountMessage message, NodeAddress peer) {
expect(anyPhase(Trade.Phase.TAKER_FEE_PUBLISHED, Trade.Phase.DEPOSIT_PUBLISHED)
expect(anyPhase(Phase.TAKER_FEE_PUBLISHED, Phase.DEPOSIT_PUBLISHED)
.with(message)
.from(peer)
.preCondition(trade.getDepositTx() == null || processModel.getTradePeer().getPaymentAccountPayload() == null,
() -> {
log.warn("We received a DepositTxAndDelayedPayoutTxMessage but we have already processed the deposit and " +
"delayed payout tx so we ignore the message. This can happen if the ACK message to the peer did not " +
log.warn("We received a DepositTxAndSellerPaymentAccountMessage but we have already processed the deposit tx and " +
"seller payment account so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
stopTimeout();
sendAckMessage(message, true, null);
removeMailboxMessageAfterProcessing(message);
}))
.setup(tasks(BuyerProcessDepositTxAndSellerPaymentAccountMessage.class,
ApplyFilter.class,
SetupWarningTxListener.class,
VerifyPeersAccountAgeWitness.class,
BuyerSendsShareBuyerPaymentAccountMessage.class)
.using(new TradeTaskRunner(trade,
Expand All @@ -142,7 +151,7 @@ protected void handle(DepositTxAndSellerPaymentAccountMessage message, NodeAddre
@Override
public void onPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
BuyerEvent event = BuyerEvent.PAYMENT_SENT;
expect(phase(Trade.Phase.DEPOSIT_CONFIRMED)
expect(phase(Phase.DEPOSIT_CONFIRMED)
.with(event)
.preCondition(trade.confirmPermitted()))
.setup(tasks(ApplyFilter.class,
Expand Down Expand Up @@ -171,7 +180,7 @@ public void onPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler er
///////////////////////////////////////////////////////////////////////////////////////////

protected void handle(PayoutTxPublishedMessage message, NodeAddress peer) {
expect(anyPhase(Trade.Phase.FIAT_SENT, Trade.Phase.PAYOUT_PUBLISHED)
expect(anyPhase(Phase.FIAT_SENT, Phase.PAYOUT_PUBLISHED)
.with(message)
.from(peer))
.setup(tasks(BuyerProcessPayoutTxPublishedMessage.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import bisq.core.trade.model.bisq_v1.SellerTrade;
import bisq.core.trade.model.bisq_v1.Trade;
import bisq.core.trade.model.bisq_v1.Trade.Phase;
import bisq.core.trade.protocol.FluentProtocol;
import bisq.core.trade.protocol.SellerProtocol;
import bisq.core.trade.protocol.TradeMessage;
Expand All @@ -38,6 +39,7 @@
import bisq.core.trade.protocol.bisq_v1.tasks.seller.SellerSignAndFinalizePayoutTx;
import bisq.core.trade.protocol.bisq_v5.messages.PreparedTxBuyerSignaturesMessage;
import bisq.core.trade.protocol.bisq_v5.tasks.AddWatchedScriptsToWallet;
import bisq.core.trade.protocol.bisq_v5.tasks.SetupWarningTxListener;
import bisq.core.trade.protocol.bisq_v5.tasks.seller.SellerProcessPreparedTxBuyerSignaturesMessage;
import bisq.core.trade.protocol.bisq_v5.tasks.seller.SellerSendsDepositTxAndSellerPaymentAccountMessage;

Expand All @@ -55,10 +57,26 @@ enum SellerEvent implements FluentProtocol.Event {
PAYMENT_RECEIVED
}

///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////

protected BaseSellerProtocol_v5(SellerTrade trade) {
super(trade);
}

@Override
protected void onInitialized() {
super.onInitialized();
// We get called the constructor with any possible state and phase. As we don't want to log an error for such
// cases we use the alternative 'given' method instead of 'expect'.
given(anyPhase(Phase.DEPOSIT_PUBLISHED, Phase.DEPOSIT_CONFIRMED, Phase.FIAT_SENT, Phase.FIAT_RECEIVED)
.preCondition(trade.hasV5Protocol()) // FIXME: If trade opened with v4 protocol, should use BaseSellerProtocol_v4.
.with(SellerEvent.STARTUP))
.setup(tasks(SetupWarningTxListener.class))
.executeTasks();
}


///////////////////////////////////////////////////////////////////////////////////////////
// Mailbox
Expand All @@ -79,19 +97,20 @@ public void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress)
///////////////////////////////////////////////////////////////////////////////////////////

protected void handle(PreparedTxBuyerSignaturesMessage message, NodeAddress peer) {
expect(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
expect(phase(Phase.TAKER_FEE_PUBLISHED)
.with(message)
.from(peer))
.setup(tasks(SellerProcessPreparedTxBuyerSignaturesMessage.class,
AddWatchedScriptsToWallet.class,
SellerSendsDepositTxAndSellerPaymentAccountMessage.class,
SellerPublishesDepositTx.class,
SetupWarningTxListener.class,
SellerPublishesTradeStatistics.class))
.executeTasks();
}

protected void handle(ShareBuyerPaymentAccountMessage message, NodeAddress peer) {
expect(anyPhase(Trade.Phase.TAKER_FEE_PUBLISHED, Trade.Phase.DEPOSIT_PUBLISHED, Trade.Phase.DEPOSIT_CONFIRMED)
expect(anyPhase(Phase.TAKER_FEE_PUBLISHED, Phase.DEPOSIT_PUBLISHED, Phase.DEPOSIT_CONFIRMED)
.with(message)
.from(peer))
.setup(tasks(SellerProcessShareBuyerPaymentAccountMessage.class,
Expand All @@ -117,7 +136,7 @@ protected void handle(CounterCurrencyTransferStartedMessage message, NodeAddress
// a mailbox message with CounterCurrencyTransferStartedMessage.
// TODO A better fix would be to add a listener for the wallet sync state and process
// the mailbox msg once wallet is ready and trade state set.
expect(anyPhase(Trade.Phase.DEPOSIT_CONFIRMED, Trade.Phase.DEPOSIT_PUBLISHED)
expect(anyPhase(Phase.DEPOSIT_CONFIRMED, Phase.DEPOSIT_PUBLISHED)
.with(message)
.from(peer)
.preCondition(trade.getPayoutTx() == null,
Expand All @@ -143,7 +162,7 @@ protected void handle(CounterCurrencyTransferStartedMessage message, NodeAddress
@Override
public void onPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
SellerEvent event = SellerEvent.PAYMENT_RECEIVED;
expect(anyPhase(Trade.Phase.FIAT_SENT, Trade.Phase.PAYOUT_PUBLISHED)
expect(anyPhase(Phase.FIAT_SENT, Phase.PAYOUT_PUBLISHED)
.with(event)
.preCondition(trade.confirmPermitted()))
.setup(tasks(
Expand Down
Loading

0 comments on commit 21b0c35

Please sign in to comment.