Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(2/6) Remove onBatch interfaces in P2PDataStore #3621

Merged
merged 8 commits into from
Nov 26, 2019
32 changes: 19 additions & 13 deletions core/src/main/java/bisq/core/alert/AlertManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

import java.math.BigInteger;

import java.util.Collection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -79,22 +81,26 @@ public AlertManager(P2PService p2PService,
if (!ignoreDevMsg) {
p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
Alert alert = (Alert) protectedStoragePayload;
if (verifySignature(alert))
alertMessageProperty.set(alert);
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
Alert alert = (Alert) protectedStoragePayload;
if (verifySignature(alert))
alertMessageProperty.set(alert);
}
});
}

@Override
public void onRemoved(ProtectedStorageEntry data) {
final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
if (verifySignature((Alert) protectedStoragePayload))
alertMessageProperty.set(null);
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
if (verifySignature((Alert) protectedStoragePayload))
alertMessageProperty.set(null);
}
});
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@
import bisq.core.btc.wallet.BsqWalletService;
import bisq.core.dao.DaoSetupService;
import bisq.core.dao.governance.proposal.storage.appendonly.ProposalPayload;
import bisq.core.dao.governance.proposal.storage.temp.TempProposalPayload;
import bisq.core.dao.state.DaoStateListener;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.governance.Proposal;

import bisq.network.p2p.storage.HashMapChangedListener;
import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;

import bisq.common.UserThread;

import org.bitcoinj.core.TransactionConfidence;
Expand All @@ -55,8 +50,7 @@
* our own proposal that is not critical). Foreign proposals are only shown if confirmed and fully validated.
*/
@Slf4j
public class ProposalListPresentation implements DaoStateListener, HashMapChangedListener,
MyProposalListService.Listener, DaoSetupService {
public class ProposalListPresentation implements DaoStateListener, MyProposalListService.Listener, DaoSetupService {
private final ProposalService proposalService;
private final DaoStateService daoStateService;
private final MyProposalListService myProposalListService;
Expand All @@ -66,7 +60,6 @@ public class ProposalListPresentation implements DaoStateListener, HashMapChange
@Getter
private final FilteredList<Proposal> activeOrMyUnconfirmedProposals = new FilteredList<>(allProposals);
private final ListChangeListener<Proposal> proposalListChangeListener;
private boolean tempProposalsChanged;


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -76,7 +69,6 @@ public class ProposalListPresentation implements DaoStateListener, HashMapChange
@Inject
public ProposalListPresentation(ProposalService proposalService,
DaoStateService daoStateService,
P2PDataStorage p2PDataStorage,
MyProposalListService myProposalListService,
BsqWalletService bsqWalletService,
ProposalValidatorProvider validatorProvider) {
Expand All @@ -87,7 +79,6 @@ public ProposalListPresentation(ProposalService proposalService,
this.validatorProvider = validatorProvider;

daoStateService.addDaoStateListener(this);
p2PDataStorage.addHashMapChangedListener(this);
myProposalListService.addListener(this);

proposalListChangeListener = c -> updateLists();
Expand Down Expand Up @@ -124,44 +115,6 @@ public void onParseBlockCompleteAfterBatchProcessing(Block block) {
updateLists();
}


///////////////////////////////////////////////////////////////////////////////////////////
// HashMapChangedListener
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void onAdded(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
}

@Override
public void onRemoved(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
}

@Override
public void onBatchRemoveExpiredDataStarted() {
// We temporary remove the listener when batch processing starts to avoid that we rebuild our lists at each
// remove call. After batch processing at onBatchRemoveExpiredDataCompleted we add again our listener and call
// the updateLists method.
proposalService.getTempProposals().removeListener(proposalListChangeListener);
}

@Override
public void onBatchRemoveExpiredDataCompleted() {
proposalService.getTempProposals().addListener(proposalListChangeListener);
// We only call updateLists if tempProposals have changed. updateLists() is an expensive call and takes 200 ms.
if (tempProposalsChanged) {
updateLists();
tempProposalsChanged = false;
}
}


///////////////////////////////////////////////////////////////////////////////////////////
// MyProposalListService.Listener
///////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -133,13 +135,15 @@ public void start() {
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void onAdded(ProtectedStorageEntry entry) {
onProtectedDataAdded(entry, true);
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
onProtectedDataAdded(protectedStorageEntry, true);
});
}

@Override
public void onRemoved(ProtectedStorageEntry entry) {
onProtectedDataRemoved(entry);
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
onProtectedDataRemoved(protectedStorageEntries);
}


Expand Down Expand Up @@ -266,30 +270,39 @@ private void onProtectedDataAdded(ProtectedStorageEntry entry, boolean fromBroad
}
}

private void onProtectedDataRemoved(ProtectedStorageEntry entry) {
ProtectedStoragePayload protectedStoragePayload = entry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof TempProposalPayload) {
Proposal proposal = ((TempProposalPayload) protectedStoragePayload).getProposal();
// We allow removal only if we are in the proposal phase.
boolean inPhase = periodService.isInPhase(daoStateService.getChainHeight(), DaoPhase.Phase.PROPOSAL);
boolean txInPastCycle = periodService.isTxInPastCycle(proposal.getTxId(), daoStateService.getChainHeight());
Optional<Tx> tx = daoStateService.getTx(proposal.getTxId());
boolean unconfirmedOrNonBsqTx = !tx.isPresent();
// if the tx is unconfirmed we need to be in the PROPOSAL phase, otherwise the tx must be confirmed.
if (inPhase || txInPastCycle || unconfirmedOrNonBsqTx) {
if (tempProposals.contains(proposal)) {
tempProposals.remove(proposal);
log.debug("We received a remove request for a TempProposalPayload and have removed the proposal " +
"from our list. proposal creation date={}, proposalTxId={}, inPhase={}, " +
"txInPastCycle={}, unconfirmedOrNonBsqTx={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), inPhase, txInPastCycle, unconfirmedOrNonBsqTx);
private void onProtectedDataRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {

// The listeners of tmpProposals can do large amounts of work that cause performance issues. Apply all of the
// updates at once using retainAll which will cause all listeners to be updated only once.
ArrayList<Proposal> tempProposalsWithUpdates = new ArrayList<>(tempProposals);

protectedStorageEntries.forEach(protectedStorageEntry -> {
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof TempProposalPayload) {
Proposal proposal = ((TempProposalPayload) protectedStoragePayload).getProposal();
// We allow removal only if we are in the proposal phase.
boolean inPhase = periodService.isInPhase(daoStateService.getChainHeight(), DaoPhase.Phase.PROPOSAL);
boolean txInPastCycle = periodService.isTxInPastCycle(proposal.getTxId(), daoStateService.getChainHeight());
Optional<Tx> tx = daoStateService.getTx(proposal.getTxId());
boolean unconfirmedOrNonBsqTx = !tx.isPresent();
// if the tx is unconfirmed we need to be in the PROPOSAL phase, otherwise the tx must be confirmed.
if (inPhase || txInPastCycle || unconfirmedOrNonBsqTx) {
if (tempProposalsWithUpdates.contains(proposal)) {
tempProposalsWithUpdates.remove(proposal);
log.debug("We received a remove request for a TempProposalPayload and have removed the proposal " +
"from our list. proposal creation date={}, proposalTxId={}, inPhase={}, " +
"txInPastCycle={}, unconfirmedOrNonBsqTx={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), inPhase, txInPastCycle, unconfirmedOrNonBsqTx);
}
} else {
log.warn("We received a remove request outside the PROPOSAL phase. " +
"Proposal creation date={}, proposal.txId={}, current blockHeight={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), daoStateService.getChainHeight());
}
} else {
log.warn("We received a remove request outside the PROPOSAL phase. " +
"Proposal creation date={}, proposal.txId={}, current blockHeight={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), daoStateService.getChainHeight());
}
}
});

tempProposals.retainAll(tempProposalsWithUpdates);
}

private void onAppendOnlyDataAdded(PersistableNetworkPayload persistableNetworkPayload, boolean fromBroadcastMessage) {
Expand Down
31 changes: 18 additions & 13 deletions core/src/main/java/bisq/core/filter/FilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.math.BigInteger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -133,23 +134,27 @@ public void onAllServicesInitialized() {

p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
if (data.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) data.getProtectedStoragePayload();
boolean wasValid = addFilter(filter);
if (!wasValid) {
UserThread.runAfter(() -> p2PService.getP2PDataStorage().removeInvalidProtectedStorageEntry(data), 1);
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) protectedStorageEntry.getProtectedStoragePayload();
boolean wasValid = addFilter(filter);
if (!wasValid) {
UserThread.runAfter(() -> p2PService.getP2PDataStorage().removeInvalidProtectedStorageEntry(protectedStorageEntry), 1);
}
}
}
});
}

@Override
public void onRemoved(ProtectedStorageEntry data) {
if (data.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) data.getProtectedStoragePayload();
if (verifySignature(filter))
resetFilters();
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) protectedStorageEntry.getProtectedStoragePayload();
if (verifySignature(filter))
resetFilters();
}
});
}
});
}
Expand Down
37 changes: 21 additions & 16 deletions core/src/main/java/bisq/core/offer/OfferBookService.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import java.io.File;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -87,26 +88,30 @@ public OfferBookService(P2PService p2PService,

p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
offerBookChangedListeners.stream().forEach(listener -> {
if (data.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) data.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onAdded(offer);
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
offerBookChangedListeners.stream().forEach(listener -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) protectedStorageEntry.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onAdded(offer);
}
});
});
}

@Override
public void onRemoved(ProtectedStorageEntry data) {
offerBookChangedListeners.stream().forEach(listener -> {
if (data.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) data.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onRemoved(offer);
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
offerBookChangedListeners.stream().forEach(listener -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) protectedStorageEntry.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onRemoved(offer);
}
});
});
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.math.BigInteger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -131,18 +132,22 @@ public DisputeAgentManager(KeyRing keyRing,
public void onAllServicesInitialized() {
disputeAgentService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
if (isExpectedInstance(data)) {
updateMap();
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (isExpectedInstance(protectedStorageEntry)) {
updateMap();
}
});
}

@Override
public void onRemoved(ProtectedStorageEntry data) {
if (isExpectedInstance(data)) {
updateMap();
removeAcceptedDisputeAgentFromUser(data);
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (isExpectedInstance(protectedStorageEntry)) {
updateMap();
removeAcceptedDisputeAgentFromUser(protectedStorageEntry);
}
});
}
});

Expand Down
Loading