optimize electrum server queries

This commit is contained in:
Craig Raw 2020-09-17 15:10:56 +02:00
parent 084a3c76af
commit 4bad46c9e3
7 changed files with 174 additions and 90 deletions

2
drongo

@ -1 +1 @@
Subproject commit db8ef9e4a1362671128c694c6b8068c6c9e620b6
Subproject commit 5e281982cb3f4711486f53b02b1ea9d9b12a493e

View file

@ -1,6 +1,7 @@
package com.sparrowwallet.sparrow.net;
import com.github.arteam.simplejsonrpc.client.Transport;
import com.google.common.collect.Iterables;
import com.google.common.eventbus.Subscribe;
import com.google.common.net.HostAndPort;
import com.sparrowwallet.drongo.KeyPurpose;
@ -33,7 +34,7 @@ public class ElectrumServer {
private static Transport transport;
private static final Map<String, String> subscribedScriptHashes = Collections.synchronizedMap(new HashMap<>());
private static final Map<String, Set<String>> subscribedScriptHashes = Collections.synchronizedMap(new HashMap<>());
private static ElectrumServerRpc electrumServerRpc = new SimpleElectrumServerRpc();
@ -134,9 +135,18 @@ public class ElectrumServer {
return receiveTransactionMap;
}
public Map<WalletNode, Set<BlockTransactionHash>> getHistory(Wallet wallet, WalletNode walletNode) throws ServerException {
Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap = new TreeMap<>();
Collection<WalletNode> nodes = Set.of(walletNode);
subscribeWalletNodes(wallet, nodes, nodeTransactionMap, 0);
getReferences(wallet, nodes, nodeTransactionMap);
return nodeTransactionMap;
}
public void getHistory(Wallet wallet, KeyPurpose keyPurpose, Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap) throws ServerException {
WalletNode purposeNode = wallet.getNode(keyPurpose);
getHistory(wallet, purposeNode.getChildren(), nodeTransactionMap, 0);
subscribeWalletNodes(wallet, purposeNode.getChildren(), nodeTransactionMap, 0);
//Because node children are added sequentially in WalletNode.fillToIndex, we can simply look at the number of children to determine the highest filled index
int historySize = purposeNode.getChildren().size();
@ -144,10 +154,18 @@ public class ElectrumServer {
int gapLimitSize = getGapLimitSize(wallet, nodeTransactionMap);
while(historySize < gapLimitSize) {
purposeNode.fillToIndex(gapLimitSize - 1);
getHistory(wallet, purposeNode.getChildren(), nodeTransactionMap, historySize);
subscribeWalletNodes(wallet, purposeNode.getChildren(), nodeTransactionMap, historySize);
historySize = purposeNode.getChildren().size();
gapLimitSize = getGapLimitSize(wallet, nodeTransactionMap);
}
//All WalletNode keys in nodeTransactionMap with non-null values need to have their history fetched
Collection<WalletNode> usedNodes = nodeTransactionMap.entrySet().stream().filter(entry -> entry.getValue() != null).map(Map.Entry::getKey).collect(Collectors.toList());
log.debug("Retrieving history for " + usedNodes.stream().map(WalletNode::getDerivationPath).collect(Collectors.joining(", ")));
getReferences(wallet, usedNodes, nodeTransactionMap);
//Set the remaining WalletNode keys to empty sets to indicate no history
nodeTransactionMap.entrySet().stream().filter(entry -> entry.getValue() == null).forEach(entry -> entry.setValue(Collections.emptySet()));
}
private int getGapLimitSize(Wallet wallet, Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap) {
@ -155,22 +173,11 @@ public class ElectrumServer {
return highestIndex + wallet.getGapLimit() + 1;
}
public void getHistory(Wallet wallet, Collection<WalletNode> nodes, Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap, int startIndex) throws ServerException {
getReferences(wallet, "blockchain.scripthash.get_history", nodes, nodeTransactionMap, startIndex);
subscribeWalletNodes(wallet, nodes, startIndex);
}
public void getMempool(Wallet wallet, Collection<WalletNode> nodes, Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap, int startIndex) throws ServerException {
getReferences(wallet, "blockchain.scripthash.get_mempool", nodes, nodeTransactionMap, startIndex);
}
public void getReferences(Wallet wallet, String method, Collection<WalletNode> nodes, Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap, int startIndex) throws ServerException {
public void getReferences(Wallet wallet, Collection<WalletNode> nodes, Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap) throws ServerException {
try {
Map<String, String> pathScriptHashes = new LinkedHashMap<>(nodes.size());
for(WalletNode node : nodes) {
if(node.getIndex() >= startIndex) {
pathScriptHashes.put(node.getDerivationPath(), getScriptHash(wallet, node));
}
pathScriptHashes.put(node.getDerivationPath(), getScriptHash(wallet, node));
}
//Even if we have some successes, failure to retrieve all references will result in an incomplete wallet history. Don't proceed if that's the case.
@ -211,14 +218,18 @@ public class ElectrumServer {
}
}
public void subscribeWalletNodes(Wallet wallet, Collection<WalletNode> nodes, int startIndex) throws ServerException {
public void subscribeWalletNodes(Wallet wallet, Collection<WalletNode> nodes, Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap, int startIndex) throws ServerException {
try {
Set<String> scriptHashes = new HashSet<>();
Map<String, String> pathScriptHashes = new LinkedHashMap<>();
for(WalletNode node : nodes) {
if(node.getIndex() >= startIndex) {
String scriptHash = getScriptHash(wallet, node);
if(!subscribedScriptHashes.containsKey(scriptHash) && scriptHashes.add(scriptHash)) {
if(getSubscribedScriptHashStatus(scriptHash) != null) {
//Already subscribed, but still need to fetch history from a used node
nodeTransactionMap.put(node, new TreeSet<>());
} else if(!subscribedScriptHashes.containsKey(scriptHash) && scriptHashes.add(scriptHash)) {
//Unique script hash we are not yet subscribed to
pathScriptHashes.put(node.getDerivationPath(), scriptHash);
}
}
@ -236,7 +247,15 @@ public class ElectrumServer {
Optional<WalletNode> optionalNode = nodes.stream().filter(n -> n.getDerivationPath().equals(path)).findFirst();
if(optionalNode.isPresent()) {
WalletNode node = optionalNode.get();
subscribedScriptHashes.put(getScriptHash(wallet, node), status);
String scriptHash = getScriptHash(wallet, node);
//Check if there is history for this script hash
if(status != null) {
//Set the value for this node to be an empty set to mark it as requiring a get_history RPC call for this wallet
nodeTransactionMap.put(node, new TreeSet<>());
}
updateSubscribedScriptHashStatus(scriptHash, status);
}
}
} catch (ElectrumServerRpcException e) {
@ -543,10 +562,24 @@ public class ElectrumServer {
return Utils.bytesToHex(reversed);
}
public static Map<String, String> getSubscribedScriptHashes() {
public static Map<String, Set<String>> getSubscribedScriptHashes() {
return subscribedScriptHashes;
}
public static String getSubscribedScriptHashStatus(String scriptHash) {
Set<String> existingStatuses = subscribedScriptHashes.get(scriptHash);
if(existingStatuses != null) {
return Iterables.getLast(existingStatuses);
}
return null;
}
public static void updateSubscribedScriptHashStatus(String scriptHash, String status) {
Set<String> existingStatuses = subscribedScriptHashes.computeIfAbsent(scriptHash, k -> new LinkedHashSet<>());
existingStatuses.add(status);
}
public static boolean supportsBatching(List<String> serverVersion) {
return serverVersion.size() > 0 && serverVersion.get(0).toLowerCase().contains("electrumx");
}
@ -707,9 +740,16 @@ public class ElectrumServer {
public static class TransactionHistoryService extends Service<Boolean> {
private final Wallet wallet;
private final WalletNode node;
public TransactionHistoryService(Wallet wallet) {
this.wallet = wallet;
this.node = null;
}
public TransactionHistoryService(Wallet wallet, WalletNode node) {
this.wallet = wallet;
this.node = node;
}
@Override
@ -717,7 +757,7 @@ public class ElectrumServer {
return new Task<>() {
protected Boolean call() throws ServerException {
ElectrumServer electrumServer = new ElectrumServer();
Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap = electrumServer.getHistory(wallet);
Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap = (node == null ? electrumServer.getHistory(wallet) : electrumServer.getHistory(wallet, node));
electrumServer.getReferencedTransactions(wallet, nodeTransactionMap);
electrumServer.calculateNodeHistory(wallet, nodeTransactionMap);
return true;

View file

@ -108,8 +108,8 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc {
Map<String, String> result = new LinkedHashMap<>();
for(String path : pathScriptHashes.keySet()) {
try {
client.createRequest().method("blockchain.scripthash.subscribe").id(path).params(pathScriptHashes.get(path)).executeNullable();
result.put(path, "");
String scriptHash = client.createRequest().returnAs(String.class).method("blockchain.scripthash.subscribe").id(path).params(pathScriptHashes.get(path)).executeNullable();
result.put(path, scriptHash);
} catch(JsonRpcException | IllegalStateException | IllegalArgumentException e) {
//Even if we have some successes, failure to subscribe for all script hashes will result in outdated wallet view. Don't proceed.
throw new ElectrumServerRpcException("Failed to retrieve reference for path: " + path, e);

View file

@ -3,6 +3,7 @@ package com.sparrowwallet.sparrow.net;
import com.github.arteam.simplejsonrpc.core.annotation.JsonRpcMethod;
import com.github.arteam.simplejsonrpc.core.annotation.JsonRpcParam;
import com.github.arteam.simplejsonrpc.core.annotation.JsonRpcService;
import com.google.common.collect.Iterables;
import com.sparrowwallet.sparrow.EventManager;
import com.sparrowwallet.sparrow.event.NewBlockEvent;
import com.sparrowwallet.sparrow.event.WalletNodeHistoryChangedEvent;
@ -10,7 +11,7 @@ import javafx.application.Platform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.Set;
@JsonRpcService
public class SubscriptionService {
@ -23,9 +24,17 @@ public class SubscriptionService {
@JsonRpcMethod("blockchain.scripthash.subscribe")
public void scriptHashStatusUpdated(@JsonRpcParam("scripthash") final String scriptHash, @JsonRpcParam("status") final String status) {
String oldStatus = ElectrumServer.getSubscribedScriptHashes().put(scriptHash, status);
if(Objects.equals(oldStatus, status)) {
Set<String> existingStatuses = ElectrumServer.getSubscribedScriptHashes().get(scriptHash);
if(existingStatuses == null) {
log.warn("Received script hash status update for unsubscribed script hash: " + scriptHash);
ElectrumServer.updateSubscribedScriptHashStatus(scriptHash, status);
} else if(existingStatuses.contains(status)) {
log.warn("Received script hash status update, but status has not changed");
return;
} else {
String oldStatus = Iterables.getLast(existingStatuses);
log.debug("Status updated for script hash " + scriptHash + ", was " + oldStatus + " now " + status);
existingStatuses.add(status);
}
Platform.runLater(() -> EventManager.get().post(new WalletNodeHistoryChangedEvent(scriptHash)));

View file

@ -11,6 +11,8 @@ import org.slf4j.LoggerFactory;
import javax.net.SocketFactory;
import java.io.*;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TcpTransport implements Transport, Closeable {
@ -25,6 +27,9 @@ public class TcpTransport implements Transport, Closeable {
private String response;
private final ReentrantLock readLock = new ReentrantLock();
private final Condition readingCondition = readLock.newCondition();
private final ReentrantLock clientRequestLock = new ReentrantLock();
private boolean running = false;
private boolean reading = true;
@ -57,64 +62,83 @@ public class TcpTransport implements Transport, Closeable {
out.flush();
}
private synchronized String readResponse() throws IOException {
if(firstRead) {
notifyAll();
firstRead = false;
}
while(reading) {
try {
wait();
} catch(InterruptedException e) {
//Restore interrupt status and continue
Thread.currentThread().interrupt();
private String readResponse() throws IOException {
try {
if(!readLock.tryLock(60, TimeUnit.SECONDS)) {
throw new IOException("No response from server");
}
} catch(InterruptedException e) {
throw new IOException("Read thread interrupted");
}
if(lastException != null) {
throw new IOException("Error reading response: " + lastException.getMessage(), lastException);
try {
if(firstRead) {
readingCondition.signal();
firstRead = false;
}
while(reading) {
try {
readingCondition.await();
} catch(InterruptedException e) {
//Restore interrupt status and continue
Thread.currentThread().interrupt();
}
}
if(lastException != null) {
throw new IOException("Error reading response: " + lastException.getMessage(), lastException);
}
reading = true;
readingCondition.signal();
return response;
} finally {
readLock.unlock();
}
reading = true;
notifyAll();
return response;
}
public synchronized void readInputLoop() throws ServerException {
try {
//Don't start reading until first RPC request is sent
wait();
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
public void readInputLoop() throws ServerException {
readLock.lock();
while(running) {
try {
try {
String received = readInputStream();
if(received.contains("method")) {
//Handle subscription notification
jsonRpcServer.handle(received, subscriptionService);
} else {
//Handle client's response
response = received;
reading = false;
notifyAll();
wait();
}
//Don't start reading until first RPC request is sent
readingCondition.await();
} catch(InterruptedException e) {
//Restore interrupt status and continue
Thread.currentThread().interrupt();
} catch(Exception e) {
if(running) {
lastException = e;
reading = false;
notifyAll();
//Allow this thread to terminate as we will need to reconnect with a new transport anyway
running = false;
}
while(running) {
try {
String received = readInputStream();
if(received.contains("method")) {
//Handle subscription notification
jsonRpcServer.handle(received, subscriptionService);
} else {
//Handle client's response
response = received;
reading = false;
readingCondition.signal();
readingCondition.await();
}
} catch(InterruptedException e) {
//Restore interrupt status and continue
Thread.currentThread().interrupt();
} catch(Exception e) {
log.debug("Connection error while reading", e);
if(running) {
lastException = e;
reading = false;
readingCondition.signal();
//Allow this thread to terminate as we will need to reconnect with a new transport anyway
running = false;
}
}
}
} finally {
readLock.unlock();
}
}

View file

@ -65,9 +65,14 @@ public class WalletForm {
}
public void refreshHistory(Integer blockHeight) {
refreshHistory(blockHeight, null);
}
public void refreshHistory(Integer blockHeight, WalletNode node) {
Wallet previousWallet = wallet.copy();
if(wallet.isValid() && AppController.isOnline()) {
ElectrumServer.TransactionHistoryService historyService = new ElectrumServer.TransactionHistoryService(wallet);
log.debug(node == null ? "Refreshing full wallet history" : "Requesting node wallet history for " + node.getDerivationPath());
ElectrumServer.TransactionHistoryService historyService = new ElectrumServer.TransactionHistoryService(wallet, node);
historyService.setOnSucceeded(workerStateEvent -> {
updateWallet(previousWallet, blockHeight);
});
@ -213,8 +218,9 @@ public class WalletForm {
@Subscribe
public void walletNodeHistoryChanged(WalletNodeHistoryChangedEvent event) {
if(event.getWalletNode(wallet) != null) {
refreshHistory(AppController.getCurrentBlockHeight());
WalletNode walletNode = event.getWalletNode(wallet);
if(walletNode != null) {
refreshHistory(AppController.getCurrentBlockHeight(), walletNode);
}
}

View file

@ -89,21 +89,26 @@ public class WalletTransactionsEntry extends Entry {
for(WalletNode addressNode : purposeNode.getChildren()) {
for(BlockTransactionHashIndex hashIndex : addressNode.getTransactionOutputs()) {
BlockTransaction inputTx = wallet.getTransactions().get(hashIndex.getHash());
WalletTransaction inputWalletTx = walletTransactionMap.get(inputTx);
if(inputWalletTx == null) {
inputWalletTx = new WalletTransaction(wallet, inputTx);
walletTransactionMap.put(inputTx, inputWalletTx);
}
inputWalletTx.incoming.put(hashIndex, keyPurpose);
if(hashIndex.getSpentBy() != null) {
BlockTransaction outputTx = wallet.getTransactions().get(hashIndex.getSpentBy().getHash());
WalletTransaction outputWalletTx = walletTransactionMap.get(outputTx);
if(outputWalletTx == null) {
outputWalletTx = new WalletTransaction(wallet, outputTx);
walletTransactionMap.put(outputTx, outputWalletTx);
//A null inputTx here means the wallet is still updating - ignore as the WalletHistoryChangedEvent will run this again
if(inputTx != null) {
WalletTransaction inputWalletTx = walletTransactionMap.get(inputTx);
if(inputWalletTx == null) {
inputWalletTx = new WalletTransaction(wallet, inputTx);
walletTransactionMap.put(inputTx, inputWalletTx);
}
inputWalletTx.incoming.put(hashIndex, keyPurpose);
if(hashIndex.getSpentBy() != null) {
BlockTransaction outputTx = wallet.getTransactions().get(hashIndex.getSpentBy().getHash());
if(outputTx != null) {
WalletTransaction outputWalletTx = walletTransactionMap.get(outputTx);
if(outputWalletTx == null) {
outputWalletTx = new WalletTransaction(wallet, outputTx);
walletTransactionMap.put(outputTx, outputWalletTx);
}
outputWalletTx.outgoing.put(hashIndex.getSpentBy(), keyPurpose);
}
}
outputWalletTx.outgoing.put(hashIndex.getSpentBy(), keyPurpose);
}
}
}