From 4bad46c9e311e7cb626654544cf4537087e0ab2e Mon Sep 17 00:00:00 2001 From: Craig Raw Date: Thu, 17 Sep 2020 15:10:56 +0200 Subject: [PATCH] optimize electrum server queries --- drongo | 2 +- .../sparrow/net/ElectrumServer.java | 82 +++++++++---- .../sparrow/net/SimpleElectrumServerRpc.java | 4 +- .../sparrow/net/SubscriptionService.java | 15 ++- .../sparrow/net/TcpTransport.java | 116 +++++++++++------- .../sparrow/wallet/WalletForm.java | 12 +- .../wallet/WalletTransactionsEntry.java | 33 ++--- 7 files changed, 174 insertions(+), 90 deletions(-) diff --git a/drongo b/drongo index db8ef9e4..5e281982 160000 --- a/drongo +++ b/drongo @@ -1 +1 @@ -Subproject commit db8ef9e4a1362671128c694c6b8068c6c9e620b6 +Subproject commit 5e281982cb3f4711486f53b02b1ea9d9b12a493e diff --git a/src/main/java/com/sparrowwallet/sparrow/net/ElectrumServer.java b/src/main/java/com/sparrowwallet/sparrow/net/ElectrumServer.java index a6b46084..8201dfd4 100644 --- a/src/main/java/com/sparrowwallet/sparrow/net/ElectrumServer.java +++ b/src/main/java/com/sparrowwallet/sparrow/net/ElectrumServer.java @@ -1,6 +1,7 @@ package com.sparrowwallet.sparrow.net; import com.github.arteam.simplejsonrpc.client.Transport; +import com.google.common.collect.Iterables; import com.google.common.eventbus.Subscribe; import com.google.common.net.HostAndPort; import com.sparrowwallet.drongo.KeyPurpose; @@ -33,7 +34,7 @@ public class ElectrumServer { private static Transport transport; - private static final Map subscribedScriptHashes = Collections.synchronizedMap(new HashMap<>()); + private static final Map> subscribedScriptHashes = Collections.synchronizedMap(new HashMap<>()); private static ElectrumServerRpc electrumServerRpc = new SimpleElectrumServerRpc(); @@ -134,9 +135,18 @@ public class ElectrumServer { return receiveTransactionMap; } + public Map> getHistory(Wallet wallet, WalletNode walletNode) throws ServerException { + Map> nodeTransactionMap = new TreeMap<>(); + Collection nodes = Set.of(walletNode); + subscribeWalletNodes(wallet, nodes, nodeTransactionMap, 0); + getReferences(wallet, nodes, nodeTransactionMap); + + return nodeTransactionMap; + } + public void getHistory(Wallet wallet, KeyPurpose keyPurpose, Map> nodeTransactionMap) throws ServerException { WalletNode purposeNode = wallet.getNode(keyPurpose); - getHistory(wallet, purposeNode.getChildren(), nodeTransactionMap, 0); + subscribeWalletNodes(wallet, purposeNode.getChildren(), nodeTransactionMap, 0); //Because node children are added sequentially in WalletNode.fillToIndex, we can simply look at the number of children to determine the highest filled index int historySize = purposeNode.getChildren().size(); @@ -144,10 +154,18 @@ public class ElectrumServer { int gapLimitSize = getGapLimitSize(wallet, nodeTransactionMap); while(historySize < gapLimitSize) { purposeNode.fillToIndex(gapLimitSize - 1); - getHistory(wallet, purposeNode.getChildren(), nodeTransactionMap, historySize); + subscribeWalletNodes(wallet, purposeNode.getChildren(), nodeTransactionMap, historySize); historySize = purposeNode.getChildren().size(); gapLimitSize = getGapLimitSize(wallet, nodeTransactionMap); } + + //All WalletNode keys in nodeTransactionMap with non-null values need to have their history fetched + Collection usedNodes = nodeTransactionMap.entrySet().stream().filter(entry -> entry.getValue() != null).map(Map.Entry::getKey).collect(Collectors.toList()); + log.debug("Retrieving history for " + usedNodes.stream().map(WalletNode::getDerivationPath).collect(Collectors.joining(", "))); + getReferences(wallet, usedNodes, nodeTransactionMap); + + //Set the remaining WalletNode keys to empty sets to indicate no history + nodeTransactionMap.entrySet().stream().filter(entry -> entry.getValue() == null).forEach(entry -> entry.setValue(Collections.emptySet())); } private int getGapLimitSize(Wallet wallet, Map> nodeTransactionMap) { @@ -155,22 +173,11 @@ public class ElectrumServer { return highestIndex + wallet.getGapLimit() + 1; } - public void getHistory(Wallet wallet, Collection nodes, Map> nodeTransactionMap, int startIndex) throws ServerException { - getReferences(wallet, "blockchain.scripthash.get_history", nodes, nodeTransactionMap, startIndex); - subscribeWalletNodes(wallet, nodes, startIndex); - } - - public void getMempool(Wallet wallet, Collection nodes, Map> nodeTransactionMap, int startIndex) throws ServerException { - getReferences(wallet, "blockchain.scripthash.get_mempool", nodes, nodeTransactionMap, startIndex); - } - - public void getReferences(Wallet wallet, String method, Collection nodes, Map> nodeTransactionMap, int startIndex) throws ServerException { + public void getReferences(Wallet wallet, Collection nodes, Map> nodeTransactionMap) throws ServerException { try { Map pathScriptHashes = new LinkedHashMap<>(nodes.size()); for(WalletNode node : nodes) { - if(node.getIndex() >= startIndex) { - pathScriptHashes.put(node.getDerivationPath(), getScriptHash(wallet, node)); - } + pathScriptHashes.put(node.getDerivationPath(), getScriptHash(wallet, node)); } //Even if we have some successes, failure to retrieve all references will result in an incomplete wallet history. Don't proceed if that's the case. @@ -211,14 +218,18 @@ public class ElectrumServer { } } - public void subscribeWalletNodes(Wallet wallet, Collection nodes, int startIndex) throws ServerException { + public void subscribeWalletNodes(Wallet wallet, Collection nodes, Map> nodeTransactionMap, int startIndex) throws ServerException { try { Set scriptHashes = new HashSet<>(); Map pathScriptHashes = new LinkedHashMap<>(); for(WalletNode node : nodes) { if(node.getIndex() >= startIndex) { String scriptHash = getScriptHash(wallet, node); - if(!subscribedScriptHashes.containsKey(scriptHash) && scriptHashes.add(scriptHash)) { + if(getSubscribedScriptHashStatus(scriptHash) != null) { + //Already subscribed, but still need to fetch history from a used node + nodeTransactionMap.put(node, new TreeSet<>()); + } else if(!subscribedScriptHashes.containsKey(scriptHash) && scriptHashes.add(scriptHash)) { + //Unique script hash we are not yet subscribed to pathScriptHashes.put(node.getDerivationPath(), scriptHash); } } @@ -236,7 +247,15 @@ public class ElectrumServer { Optional optionalNode = nodes.stream().filter(n -> n.getDerivationPath().equals(path)).findFirst(); if(optionalNode.isPresent()) { WalletNode node = optionalNode.get(); - subscribedScriptHashes.put(getScriptHash(wallet, node), status); + String scriptHash = getScriptHash(wallet, node); + + //Check if there is history for this script hash + if(status != null) { + //Set the value for this node to be an empty set to mark it as requiring a get_history RPC call for this wallet + nodeTransactionMap.put(node, new TreeSet<>()); + } + + updateSubscribedScriptHashStatus(scriptHash, status); } } } catch (ElectrumServerRpcException e) { @@ -543,10 +562,24 @@ public class ElectrumServer { return Utils.bytesToHex(reversed); } - public static Map getSubscribedScriptHashes() { + public static Map> getSubscribedScriptHashes() { return subscribedScriptHashes; } + public static String getSubscribedScriptHashStatus(String scriptHash) { + Set existingStatuses = subscribedScriptHashes.get(scriptHash); + if(existingStatuses != null) { + return Iterables.getLast(existingStatuses); + } + + return null; + } + + public static void updateSubscribedScriptHashStatus(String scriptHash, String status) { + Set existingStatuses = subscribedScriptHashes.computeIfAbsent(scriptHash, k -> new LinkedHashSet<>()); + existingStatuses.add(status); + } + public static boolean supportsBatching(List serverVersion) { return serverVersion.size() > 0 && serverVersion.get(0).toLowerCase().contains("electrumx"); } @@ -707,9 +740,16 @@ public class ElectrumServer { public static class TransactionHistoryService extends Service { private final Wallet wallet; + private final WalletNode node; public TransactionHistoryService(Wallet wallet) { this.wallet = wallet; + this.node = null; + } + + public TransactionHistoryService(Wallet wallet, WalletNode node) { + this.wallet = wallet; + this.node = node; } @Override @@ -717,7 +757,7 @@ public class ElectrumServer { return new Task<>() { protected Boolean call() throws ServerException { ElectrumServer electrumServer = new ElectrumServer(); - Map> nodeTransactionMap = electrumServer.getHistory(wallet); + Map> nodeTransactionMap = (node == null ? electrumServer.getHistory(wallet) : electrumServer.getHistory(wallet, node)); electrumServer.getReferencedTransactions(wallet, nodeTransactionMap); electrumServer.calculateNodeHistory(wallet, nodeTransactionMap); return true; diff --git a/src/main/java/com/sparrowwallet/sparrow/net/SimpleElectrumServerRpc.java b/src/main/java/com/sparrowwallet/sparrow/net/SimpleElectrumServerRpc.java index 03240d20..4061d118 100644 --- a/src/main/java/com/sparrowwallet/sparrow/net/SimpleElectrumServerRpc.java +++ b/src/main/java/com/sparrowwallet/sparrow/net/SimpleElectrumServerRpc.java @@ -108,8 +108,8 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc { Map result = new LinkedHashMap<>(); for(String path : pathScriptHashes.keySet()) { try { - client.createRequest().method("blockchain.scripthash.subscribe").id(path).params(pathScriptHashes.get(path)).executeNullable(); - result.put(path, ""); + String scriptHash = client.createRequest().returnAs(String.class).method("blockchain.scripthash.subscribe").id(path).params(pathScriptHashes.get(path)).executeNullable(); + result.put(path, scriptHash); } catch(JsonRpcException | IllegalStateException | IllegalArgumentException e) { //Even if we have some successes, failure to subscribe for all script hashes will result in outdated wallet view. Don't proceed. throw new ElectrumServerRpcException("Failed to retrieve reference for path: " + path, e); diff --git a/src/main/java/com/sparrowwallet/sparrow/net/SubscriptionService.java b/src/main/java/com/sparrowwallet/sparrow/net/SubscriptionService.java index 71edffed..1d90bce6 100644 --- a/src/main/java/com/sparrowwallet/sparrow/net/SubscriptionService.java +++ b/src/main/java/com/sparrowwallet/sparrow/net/SubscriptionService.java @@ -3,6 +3,7 @@ package com.sparrowwallet.sparrow.net; import com.github.arteam.simplejsonrpc.core.annotation.JsonRpcMethod; import com.github.arteam.simplejsonrpc.core.annotation.JsonRpcParam; import com.github.arteam.simplejsonrpc.core.annotation.JsonRpcService; +import com.google.common.collect.Iterables; import com.sparrowwallet.sparrow.EventManager; import com.sparrowwallet.sparrow.event.NewBlockEvent; import com.sparrowwallet.sparrow.event.WalletNodeHistoryChangedEvent; @@ -10,7 +11,7 @@ import javafx.application.Platform; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Objects; +import java.util.Set; @JsonRpcService public class SubscriptionService { @@ -23,9 +24,17 @@ public class SubscriptionService { @JsonRpcMethod("blockchain.scripthash.subscribe") public void scriptHashStatusUpdated(@JsonRpcParam("scripthash") final String scriptHash, @JsonRpcParam("status") final String status) { - String oldStatus = ElectrumServer.getSubscribedScriptHashes().put(scriptHash, status); - if(Objects.equals(oldStatus, status)) { + Set existingStatuses = ElectrumServer.getSubscribedScriptHashes().get(scriptHash); + if(existingStatuses == null) { + log.warn("Received script hash status update for unsubscribed script hash: " + scriptHash); + ElectrumServer.updateSubscribedScriptHashStatus(scriptHash, status); + } else if(existingStatuses.contains(status)) { log.warn("Received script hash status update, but status has not changed"); + return; + } else { + String oldStatus = Iterables.getLast(existingStatuses); + log.debug("Status updated for script hash " + scriptHash + ", was " + oldStatus + " now " + status); + existingStatuses.add(status); } Platform.runLater(() -> EventManager.get().post(new WalletNodeHistoryChangedEvent(scriptHash))); diff --git a/src/main/java/com/sparrowwallet/sparrow/net/TcpTransport.java b/src/main/java/com/sparrowwallet/sparrow/net/TcpTransport.java index e43faa20..ac1abf08 100644 --- a/src/main/java/com/sparrowwallet/sparrow/net/TcpTransport.java +++ b/src/main/java/com/sparrowwallet/sparrow/net/TcpTransport.java @@ -11,6 +11,8 @@ import org.slf4j.LoggerFactory; import javax.net.SocketFactory; import java.io.*; import java.net.Socket; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class TcpTransport implements Transport, Closeable { @@ -25,6 +27,9 @@ public class TcpTransport implements Transport, Closeable { private String response; + private final ReentrantLock readLock = new ReentrantLock(); + private final Condition readingCondition = readLock.newCondition(); + private final ReentrantLock clientRequestLock = new ReentrantLock(); private boolean running = false; private boolean reading = true; @@ -57,64 +62,83 @@ public class TcpTransport implements Transport, Closeable { out.flush(); } - private synchronized String readResponse() throws IOException { - if(firstRead) { - notifyAll(); - firstRead = false; - } - - while(reading) { - try { - wait(); - } catch(InterruptedException e) { - //Restore interrupt status and continue - Thread.currentThread().interrupt(); + private String readResponse() throws IOException { + try { + if(!readLock.tryLock(60, TimeUnit.SECONDS)) { + throw new IOException("No response from server"); } + } catch(InterruptedException e) { + throw new IOException("Read thread interrupted"); } - if(lastException != null) { - throw new IOException("Error reading response: " + lastException.getMessage(), lastException); + try { + if(firstRead) { + readingCondition.signal(); + firstRead = false; + } + + while(reading) { + try { + readingCondition.await(); + } catch(InterruptedException e) { + //Restore interrupt status and continue + Thread.currentThread().interrupt(); + } + } + + if(lastException != null) { + throw new IOException("Error reading response: " + lastException.getMessage(), lastException); + } + + reading = true; + + readingCondition.signal(); + return response; + } finally { + readLock.unlock(); } - - reading = true; - - notifyAll(); - return response; } - public synchronized void readInputLoop() throws ServerException { - try { - //Don't start reading until first RPC request is sent - wait(); - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); - } + public void readInputLoop() throws ServerException { + readLock.lock(); - while(running) { + try { try { - String received = readInputStream(); - if(received.contains("method")) { - //Handle subscription notification - jsonRpcServer.handle(received, subscriptionService); - } else { - //Handle client's response - response = received; - reading = false; - notifyAll(); - wait(); - } + //Don't start reading until first RPC request is sent + readingCondition.await(); } catch(InterruptedException e) { - //Restore interrupt status and continue Thread.currentThread().interrupt(); - } catch(Exception e) { - if(running) { - lastException = e; - reading = false; - notifyAll(); - //Allow this thread to terminate as we will need to reconnect with a new transport anyway - running = false; + } + + while(running) { + try { + String received = readInputStream(); + if(received.contains("method")) { + //Handle subscription notification + jsonRpcServer.handle(received, subscriptionService); + } else { + //Handle client's response + response = received; + reading = false; + readingCondition.signal(); + readingCondition.await(); + } + } catch(InterruptedException e) { + //Restore interrupt status and continue + Thread.currentThread().interrupt(); + } catch(Exception e) { + log.debug("Connection error while reading", e); + if(running) { + lastException = e; + reading = false; + readingCondition.signal(); + //Allow this thread to terminate as we will need to reconnect with a new transport anyway + running = false; + } } } + } finally { + readLock.unlock(); } } diff --git a/src/main/java/com/sparrowwallet/sparrow/wallet/WalletForm.java b/src/main/java/com/sparrowwallet/sparrow/wallet/WalletForm.java index d289af0c..50a3daac 100644 --- a/src/main/java/com/sparrowwallet/sparrow/wallet/WalletForm.java +++ b/src/main/java/com/sparrowwallet/sparrow/wallet/WalletForm.java @@ -65,9 +65,14 @@ public class WalletForm { } public void refreshHistory(Integer blockHeight) { + refreshHistory(blockHeight, null); + } + + public void refreshHistory(Integer blockHeight, WalletNode node) { Wallet previousWallet = wallet.copy(); if(wallet.isValid() && AppController.isOnline()) { - ElectrumServer.TransactionHistoryService historyService = new ElectrumServer.TransactionHistoryService(wallet); + log.debug(node == null ? "Refreshing full wallet history" : "Requesting node wallet history for " + node.getDerivationPath()); + ElectrumServer.TransactionHistoryService historyService = new ElectrumServer.TransactionHistoryService(wallet, node); historyService.setOnSucceeded(workerStateEvent -> { updateWallet(previousWallet, blockHeight); }); @@ -213,8 +218,9 @@ public class WalletForm { @Subscribe public void walletNodeHistoryChanged(WalletNodeHistoryChangedEvent event) { - if(event.getWalletNode(wallet) != null) { - refreshHistory(AppController.getCurrentBlockHeight()); + WalletNode walletNode = event.getWalletNode(wallet); + if(walletNode != null) { + refreshHistory(AppController.getCurrentBlockHeight(), walletNode); } } diff --git a/src/main/java/com/sparrowwallet/sparrow/wallet/WalletTransactionsEntry.java b/src/main/java/com/sparrowwallet/sparrow/wallet/WalletTransactionsEntry.java index d2c3c2c4..aa677831 100644 --- a/src/main/java/com/sparrowwallet/sparrow/wallet/WalletTransactionsEntry.java +++ b/src/main/java/com/sparrowwallet/sparrow/wallet/WalletTransactionsEntry.java @@ -89,21 +89,26 @@ public class WalletTransactionsEntry extends Entry { for(WalletNode addressNode : purposeNode.getChildren()) { for(BlockTransactionHashIndex hashIndex : addressNode.getTransactionOutputs()) { BlockTransaction inputTx = wallet.getTransactions().get(hashIndex.getHash()); - WalletTransaction inputWalletTx = walletTransactionMap.get(inputTx); - if(inputWalletTx == null) { - inputWalletTx = new WalletTransaction(wallet, inputTx); - walletTransactionMap.put(inputTx, inputWalletTx); - } - inputWalletTx.incoming.put(hashIndex, keyPurpose); - - if(hashIndex.getSpentBy() != null) { - BlockTransaction outputTx = wallet.getTransactions().get(hashIndex.getSpentBy().getHash()); - WalletTransaction outputWalletTx = walletTransactionMap.get(outputTx); - if(outputWalletTx == null) { - outputWalletTx = new WalletTransaction(wallet, outputTx); - walletTransactionMap.put(outputTx, outputWalletTx); + //A null inputTx here means the wallet is still updating - ignore as the WalletHistoryChangedEvent will run this again + if(inputTx != null) { + WalletTransaction inputWalletTx = walletTransactionMap.get(inputTx); + if(inputWalletTx == null) { + inputWalletTx = new WalletTransaction(wallet, inputTx); + walletTransactionMap.put(inputTx, inputWalletTx); + } + inputWalletTx.incoming.put(hashIndex, keyPurpose); + + if(hashIndex.getSpentBy() != null) { + BlockTransaction outputTx = wallet.getTransactions().get(hashIndex.getSpentBy().getHash()); + if(outputTx != null) { + WalletTransaction outputWalletTx = walletTransactionMap.get(outputTx); + if(outputWalletTx == null) { + outputWalletTx = new WalletTransaction(wallet, outputTx); + walletTransactionMap.put(outputTx, outputWalletTx); + } + outputWalletTx.outgoing.put(hashIndex.getSpentBy(), keyPurpose); + } } - outputWalletTx.outgoing.put(hashIndex.getSpentBy(), keyPurpose); } } }