do db updates in background thread and improve efficiency when refreshing a deep wallet

This commit is contained in:
Craig Raw 2022-01-09 11:06:17 +02:00
parent c28c2ed506
commit 6f11a20feb
4 changed files with 71 additions and 26 deletions

2
drongo

@ -1 +1 @@
Subproject commit 083288061ffe6e08805bb58108a9afab0d93fb0f
Subproject commit 3a557e3af8bd17abf7697f93e586baf67745b460

View file

@ -16,6 +16,7 @@ import com.sparrowwallet.sparrow.wallet.*;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import com.zaxxer.hikari.pool.HikariPool;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.FlywayException;
import org.flywaydb.core.api.exception.FlywayValidateException;
@ -33,6 +34,9 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.StandardCopyOption;
import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -55,6 +59,7 @@ public class DbPersistence implements Persistence {
private Wallet masterWallet;
private final Map<Wallet, DirtyPersistables> dirtyPersistablesMap = new HashMap<>();
private ExecutorService updateExecutor;
public DbPersistence() {
EventManager.get().register(this);
@ -94,6 +99,8 @@ public class DbPersistence implements Persistence {
Map<WalletBackupAndKey, Storage> childWallets = loadChildWallets(storage, masterWallet, backupWallet, encryptionKey);
masterWallet.setChildWallets(childWallets.keySet().stream().map(WalletBackupAndKey::getWallet).collect(Collectors.toList()));
createUpdateExecutor(masterWallet);
return new WalletBackupAndKey(masterWallet, backupWallet, encryptionKey, keyDeriver, childWallets);
}
@ -157,7 +164,21 @@ public class DbPersistence implements Persistence {
@Override
public void updateWallet(Storage storage, Wallet wallet, ECKey encryptionPubKey) throws StorageException {
updatePassword(storage, encryptionPubKey);
update(storage, wallet, getFilePassword(encryptionPubKey));
updateExecutor.execute(() -> {
try {
update(storage, wallet, getFilePassword(encryptionPubKey));
} catch(Exception e) {
log.error("Error updating wallet db", e);
}
});
}
private synchronized void createUpdateExecutor(Wallet masterWallet) {
if(updateExecutor == null) {
BasicThreadFactory factory = new BasicThreadFactory.Builder().namingPattern(masterWallet.getFullName() + "-dbupdater").daemon(false).priority(Thread.NORM_PRIORITY).build();
updateExecutor = Executors.newSingleThreadExecutor(factory);
}
}
private File renameToDbFile(File walletFile) throws IOException {
@ -337,6 +358,7 @@ public class DbPersistence implements Persistence {
if(wallet.isMasterWallet()) {
masterWallet = wallet;
createUpdateExecutor(masterWallet);
}
}
@ -549,6 +571,24 @@ public class DbPersistence implements Persistence {
@Override
public void close() {
EventManager.get().unregister(this);
if(updateExecutor != null) {
updateExecutor.shutdown();
try {
if(!updateExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
updateExecutor.shutdownNow();
}
closeDataSource();
} catch (InterruptedException e) {
updateExecutor.shutdownNow();
closeDataSource();
}
} else {
closeDataSource();
}
}
private void closeDataSource() {
if(dataSource != null && !dataSource.isClosed()) {
dataSource.close();
}
@ -639,85 +679,87 @@ public class DbPersistence implements Persistence {
@Subscribe
public void walletDeleted(WalletDeletedEvent event) {
if(persistsFor(event.getWallet())) {
dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).deleteAccount = true;
updateExecutor.execute(() -> dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).deleteAccount = true);
}
}
@Subscribe
public void walletHistoryCleared(WalletHistoryClearedEvent event) {
if(persistsFor(event.getWallet())) {
dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).clearHistory = true;
updateExecutor.execute(() -> dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).clearHistory = true);
}
}
@Subscribe
public void walletHistoryChanged(WalletHistoryChangedEvent event) {
if(persistsFor(event.getWallet())) {
dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).historyNodes.addAll(event.getHistoryChangedNodes());
updateExecutor.execute(() -> dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).historyNodes.addAll(event.getHistoryChangedNodes()));
}
}
@Subscribe
public void walletLabelChanged(WalletLabelChangedEvent event) {
if(persistsFor(event.getWallet())) {
dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).label = event.getLabel();
updateExecutor.execute(() -> dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).label = event.getLabel());
}
}
@Subscribe
public void walletBlockHeightChanged(WalletBlockHeightChangedEvent event) {
if(persistsFor(event.getWallet())) {
dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).blockHeight = event.getBlockHeight();
updateExecutor.execute(() -> dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).blockHeight = event.getBlockHeight());
}
}
@Subscribe
public void walletGapLimitChanged(WalletGapLimitChangedEvent event) {
if(persistsFor(event.getWallet())) {
dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).gapLimit = event.getGapLimit();
updateExecutor.execute(() -> dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).gapLimit = event.getGapLimit());
}
}
@Subscribe
public void walletEntryLabelsChanged(WalletEntryLabelsChangedEvent event) {
if(persistsFor(event.getWallet())) {
dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).labelEntries.addAll(event.getEntries());
updateExecutor.execute(() -> dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).labelEntries.addAll(event.getEntries()));
}
}
@Subscribe
public void walletUtxoStatusChanged(WalletUtxoStatusChangedEvent event) {
if(persistsFor(event.getWallet())) {
dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).utxoStatuses.addAll(event.getUtxos());
updateExecutor.execute(() -> dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).utxoStatuses.addAll(event.getUtxos()));
}
}
@Subscribe
public void walletMixConfigChanged(WalletMixConfigChangedEvent event) {
if(persistsFor(event.getWallet()) && event.getWallet().getMixConfig() != null) {
dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).mixConfig = true;
updateExecutor.execute(() -> dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).mixConfig = true);
}
}
@Subscribe
public void walletUtxoMixesChanged(WalletUtxoMixesChangedEvent event) {
if(persistsFor(event.getWallet())) {
dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).changedUtxoMixes.putAll(event.getChangedUtxoMixes());
dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).removedUtxoMixes.putAll(event.getRemovedUtxoMixes());
updateExecutor.execute(() -> {
dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).changedUtxoMixes.putAll(event.getChangedUtxoMixes());
dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).removedUtxoMixes.putAll(event.getRemovedUtxoMixes());
});
}
}
@Subscribe
public void keystoreLabelsChanged(KeystoreLabelsChangedEvent event) {
if(persistsFor(event.getWallet())) {
dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).labelKeystores.addAll(event.getChangedKeystores());
updateExecutor.execute(() -> dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).labelKeystores.addAll(event.getChangedKeystores()));
}
}
@Subscribe
public void keystoreEncryptionChanged(KeystoreEncryptionChangedEvent event) {
if(persistsFor(event.getWallet())) {
dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).encryptionKeystores.addAll(event.getChangedKeystores());
updateExecutor.execute(() -> dirtyPersistablesMap.computeIfAbsent(event.getWallet(), key -> new DirtyPersistables()).encryptionKeystores.addAll(event.getChangedKeystores()));
}
}

View file

@ -2,6 +2,7 @@ package com.sparrowwallet.sparrow.wallet;
import com.google.common.eventbus.Subscribe;
import com.sparrowwallet.drongo.KeyPurpose;
import com.sparrowwallet.drongo.protocol.HashIndex;
import com.sparrowwallet.drongo.protocol.TransactionInput;
import com.sparrowwallet.drongo.protocol.TransactionOutput;
import com.sparrowwallet.drongo.wallet.*;
@ -85,25 +86,24 @@ public class TransactionEntry extends Entry implements Comparable<TransactionEnt
}
}
public boolean isComplete() {
public boolean isComplete(Map<HashIndex, BlockTransactionHashIndex> walletTxos) {
int validEntries = 0;
Map<BlockTransactionHashIndex, WalletNode> walletTxos = getWallet().getWalletTxos();
for(TransactionInput txInput : blockTransaction.getTransaction().getInputs()) {
Optional<BlockTransactionHashIndex> optRef = walletTxos.keySet().stream().filter(ref -> ref.getHash().equals(txInput.getOutpoint().getHash()) && ref.getIndex() == txInput.getOutpoint().getIndex()).findFirst();
if(optRef.isPresent()) {
BlockTransactionHashIndex ref = walletTxos.get(new HashIndex(txInput.getOutpoint().getHash(), txInput.getOutpoint().getIndex()));
if(ref != null) {
validEntries++;
if(getChildren().stream().noneMatch(entry -> ((HashIndexEntry)entry).getHashIndex().equals(optRef.get().getSpentBy()) && ((HashIndexEntry)entry).getType().equals(HashIndexEntry.Type.INPUT))) {
log.warn("TransactionEntry " + blockTransaction.getHash() + " for wallet " + getWallet().getFullName() + " missing child for input " + optRef.get().getSpentBy() + " on output " + optRef.get());
if(getChildren().stream().noneMatch(entry -> ((HashIndexEntry)entry).getHashIndex().equals(ref.getSpentBy()) && ((HashIndexEntry)entry).getType().equals(HashIndexEntry.Type.INPUT))) {
log.warn("TransactionEntry " + blockTransaction.getHash() + " for wallet " + getWallet().getFullName() + " missing child for input " + ref.getSpentBy() + " on output " + ref);
return false;
}
}
}
for(TransactionOutput txOutput : blockTransaction.getTransaction().getOutputs()) {
Optional<BlockTransactionHashIndex> optRef = walletTxos.keySet().stream().filter(ref -> ref.getHash().equals(txOutput.getHash()) && ref.getIndex() == txOutput.getIndex()).findFirst();
if(optRef.isPresent()) {
BlockTransactionHashIndex ref = walletTxos.get(new HashIndex(txOutput.getHash(), txOutput.getIndex()));
if(ref != null) {
validEntries++;
if(getChildren().stream().noneMatch(entry -> ((HashIndexEntry)entry).getHashIndex().equals(optRef.get()) && ((HashIndexEntry)entry).getType().equals(HashIndexEntry.Type.OUTPUT))) {
log.warn("TransactionEntry " + blockTransaction.getHash() + " for wallet " + getWallet().getFullName() + " missing child for output " + optRef.get());
if(getChildren().stream().noneMatch(entry -> ((HashIndexEntry)entry).getHashIndex().equals(ref) && ((HashIndexEntry)entry).getType().equals(HashIndexEntry.Type.OUTPUT))) {
log.warn("TransactionEntry " + blockTransaction.getHash() + " for wallet " + getWallet().getFullName() + " missing child for output " + ref);
return false;
}
}

View file

@ -1,6 +1,7 @@
package com.sparrowwallet.sparrow.wallet;
import com.sparrowwallet.drongo.KeyPurpose;
import com.sparrowwallet.drongo.protocol.HashIndex;
import com.sparrowwallet.drongo.wallet.BlockTransaction;
import com.sparrowwallet.drongo.wallet.BlockTransactionHashIndex;
import com.sparrowwallet.drongo.wallet.Wallet;
@ -67,7 +68,9 @@ public class WalletTransactionsEntry extends Entry {
calculateBalances();
List<Entry> entriesComplete = entriesAdded.stream().filter(txEntry -> ((TransactionEntry)txEntry).isComplete()).collect(Collectors.toList());
Map<HashIndex, BlockTransactionHashIndex> walletTxos = getWallet().getWalletTxos().entrySet().stream()
.collect(Collectors.toUnmodifiableMap(entry -> new HashIndex(entry.getKey().getHash(), entry.getKey().getIndex()), Map.Entry::getKey));
List<Entry> entriesComplete = entriesAdded.stream().filter(txEntry -> ((TransactionEntry)txEntry).isComplete(walletTxos)).collect(Collectors.toList());
if(!entriesComplete.isEmpty()) {
EventManager.get().post(new NewWalletTransactionsEvent(getWallet(), entriesAdded.stream().map(entry -> (TransactionEntry)entry).collect(Collectors.toList())));
}