From 998100505dc9efa94bcb8ca38f35f7ea0d72a311 Mon Sep 17 00:00:00 2001 From: Craig Raw Date: Tue, 13 Oct 2020 12:48:32 +0200 Subject: [PATCH] change electrum call ordering and add retry handling for better behaviour on electrs --- .../sparrow/net/BatchedElectrumServerRpc.java | 68 ++++++++++----- .../sparrow/net/ElectrumServer.java | 63 ++++++++++---- .../sparrowwallet/sparrow/net/RetryLogic.java | 54 ++++++++++++ .../sparrow/net/SimpleElectrumServerRpc.java | 83 ++++++++++--------- .../sparrow/net/TcpTransport.java | 3 +- 5 files changed, 193 insertions(+), 78 deletions(-) create mode 100644 src/main/java/com/sparrowwallet/sparrow/net/RetryLogic.java diff --git a/src/main/java/com/sparrowwallet/sparrow/net/BatchedElectrumServerRpc.java b/src/main/java/com/sparrowwallet/sparrow/net/BatchedElectrumServerRpc.java index e97aecc7..11963d38 100644 --- a/src/main/java/com/sparrowwallet/sparrow/net/BatchedElectrumServerRpc.java +++ b/src/main/java/com/sparrowwallet/sparrow/net/BatchedElectrumServerRpc.java @@ -18,6 +18,8 @@ import java.util.concurrent.atomic.AtomicLong; public class BatchedElectrumServerRpc implements ElectrumServerRpc { private static final Logger log = LoggerFactory.getLogger(BatchedElectrumServerRpc.class); + private static final int MAX_RETRIES = 3; + private static final int RETRY_DELAY = 0; private final AtomicLong idCounter = new AtomicLong(); @@ -25,8 +27,9 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { public void ping(Transport transport) { try { JsonRpcClient client = new JsonRpcClient(transport); - client.createRequest().method("server.ping").id(idCounter.incrementAndGet()).executeNullable(); - } catch(JsonRpcException e) { + new RetryLogic<>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> + client.createRequest().method("server.ping").id(idCounter.incrementAndGet()).executeNullable()); + } catch(Exception e) { throw new ElectrumServerRpcException("Error pinging server", e); } } @@ -35,8 +38,9 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { public List getServerVersion(Transport transport, String clientName, String[] supportedVersions) { try { JsonRpcClient client = new JsonRpcClient(transport); - return client.createRequest().returnAsList(String.class).method("server.version").id(idCounter.incrementAndGet()).param("client_name", clientName).param("protocol_version", supportedVersions).execute(); - } catch(JsonRpcException e) { + return new RetryLogic>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> + client.createRequest().returnAsList(String.class).method("server.version").id(idCounter.incrementAndGet()).param("client_name", clientName).param("protocol_version", supportedVersions).execute()); + } catch(Exception e) { throw new ElectrumServerRpcException("Error getting server version", e); } } @@ -45,8 +49,9 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { public String getServerBanner(Transport transport) { try { JsonRpcClient client = new JsonRpcClient(transport); - return client.createRequest().returnAs(String.class).method("server.banner").id(idCounter.incrementAndGet()).execute(); - } catch(JsonRpcException e) { + return new RetryLogic(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> + client.createRequest().returnAs(String.class).method("server.banner").id(idCounter.incrementAndGet()).execute()); + } catch(Exception e) { throw new ElectrumServerRpcException("Error getting server banner", e); } } @@ -55,8 +60,9 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { public BlockHeaderTip subscribeBlockHeaders(Transport transport) { try { JsonRpcClient client = new JsonRpcClient(transport); - return client.createRequest().returnAs(BlockHeaderTip.class).method("blockchain.headers.subscribe").id(idCounter.incrementAndGet()).execute(); - } catch(JsonRpcException e) { + return new RetryLogic(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> + client.createRequest().returnAs(BlockHeaderTip.class).method("blockchain.headers.subscribe").id(idCounter.incrementAndGet()).execute()); + } catch(Exception e) { throw new ElectrumServerRpcException("Error subscribing to block headers", e); } } @@ -73,7 +79,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { } try { - return batchRequest.execute(); + return new RetryLogic>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); } catch (JsonRpcBatchException e) { if(failOnError) { throw new ElectrumServerRpcException("Failed to retrieve references for paths: " + e.getErrors().keySet(), e); @@ -85,6 +91,8 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { } return result; + } catch(Exception e) { + throw new ElectrumServerRpcException("Failed to retrieve references for paths: " + pathScriptHashes.keySet(), e); } } @@ -99,8 +107,8 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { } try { - return batchRequest.execute(); - } catch (JsonRpcBatchException e) { + return new RetryLogic>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); + } catch(JsonRpcBatchException e) { if(failOnError) { throw new ElectrumServerRpcException("Failed to retrieve references for paths: " + e.getErrors().keySet(), e); } @@ -111,6 +119,8 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { } return result; + } catch(Exception e) { + throw new ElectrumServerRpcException("Failed to retrieve references for paths: " + pathScriptHashes.keySet(), e); } } @@ -125,10 +135,12 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { } try { - return batchRequest.execute(); + return new RetryLogic>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); } catch(JsonRpcBatchException e) { //Even if we have some successes, failure to subscribe for all script hashes will result in outdated wallet view. Don't proceed. throw new ElectrumServerRpcException("Failed to subscribe for updates for paths: " + e.getErrors().keySet(), e); + } catch(Exception e) { + throw new ElectrumServerRpcException("Failed to subscribe for updates for paths: " + pathScriptHashes.keySet(), e); } } @@ -144,9 +156,11 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { } try { - return batchRequest.execute(); - } catch (JsonRpcBatchException e) { + return new RetryLogic>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); + } catch(JsonRpcBatchException e) { return (Map)e.getSuccesses(); + } catch(Exception e) { + throw new ElectrumServerRpcException("Failed to block headers for block heights: " + blockHeights, e); } } @@ -162,8 +176,8 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { } try { - return batchRequest.execute(); - } catch (JsonRpcBatchException e) { + return new RetryLogic>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); + } catch(JsonRpcBatchException e) { Map result = (Map)e.getSuccesses(); String strErrorTx = Sha256Hash.ZERO_HASH.toString(); @@ -173,6 +187,8 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { } return result; + } catch(Exception e) { + throw new ElectrumServerRpcException("Failed to retrieve transactions for txids: " + txids, e); } } @@ -186,10 +202,12 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { } try { - return batchRequest.execute(); - } catch (JsonRpcBatchException e) { + return new RetryLogic>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); + } catch(JsonRpcBatchException e) { log.warn("Some errors retrieving transactions: " + e.getErrors()); return (Map)e.getSuccesses(); + } catch(Exception e) { + throw new ElectrumServerRpcException("Failed to retrieve verbose transactions for txids: " + txids, e); } } @@ -202,9 +220,11 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { } try { - return batchRequest.execute(); + return new RetryLogic>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); } catch(JsonRpcBatchException e) { throw new ElectrumServerRpcException("Error getting fee estimates", e); + } catch(Exception e) { + throw new ElectrumServerRpcException("Error getting fee estimates for target blocks: " + targetBlocks, e); } } @@ -212,8 +232,9 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { public Double getMinimumRelayFee(Transport transport) { try { JsonRpcClient client = new JsonRpcClient(transport); - return client.createRequest().returnAs(Double.class).method("blockchain.relayfee").id(idCounter.incrementAndGet()).execute(); - } catch(JsonRpcException e) { + return new RetryLogic(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> + client.createRequest().returnAs(Double.class).method("blockchain.relayfee").id(idCounter.incrementAndGet()).execute()); + } catch(Exception e) { throw new ElectrumServerRpcException("Error getting minimum relay fee", e); } } @@ -222,9 +243,12 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { public String broadcastTransaction(Transport transport, String txHex) { try { JsonRpcClient client = new JsonRpcClient(transport); - return client.createRequest().returnAs(String.class).method("blockchain.transaction.broadcast").id(idCounter.incrementAndGet()).param("raw_tx", txHex).execute(); + return new RetryLogic(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> + client.createRequest().returnAs(String.class).method("blockchain.transaction.broadcast").id(idCounter.incrementAndGet()).param("raw_tx", txHex).execute()); } catch(JsonRpcException e) { throw new ElectrumServerRpcException(e.getErrorMessage().getMessage(), e); + } catch(Exception e) { + throw new ElectrumServerRpcException("Error broadcasting transaction", e); } } } diff --git a/src/main/java/com/sparrowwallet/sparrow/net/ElectrumServer.java b/src/main/java/com/sparrowwallet/sparrow/net/ElectrumServer.java index a3b13192..8f574636 100644 --- a/src/main/java/com/sparrowwallet/sparrow/net/ElectrumServer.java +++ b/src/main/java/com/sparrowwallet/sparrow/net/ElectrumServer.java @@ -138,14 +138,19 @@ public class ElectrumServer { public Map> getHistory(Wallet wallet, Collection nodes) throws ServerException { Map> nodeTransactionMap = new TreeMap<>(); subscribeWalletNodes(wallet, nodes, nodeTransactionMap, 0); - getReferences(wallet, nodes, nodeTransactionMap); + getReferences(wallet, nodeTransactionMap.keySet(), nodeTransactionMap, 0); return nodeTransactionMap; } public void getHistory(Wallet wallet, KeyPurpose keyPurpose, Map> nodeTransactionMap) throws ServerException { WalletNode purposeNode = wallet.getNode(keyPurpose); + //Subscribe to all existing address WalletNodes and add them to nodeTransactionMap as keys to empty sets if they have history subscribeWalletNodes(wallet, purposeNode.getChildren(), nodeTransactionMap, 0); + //All WalletNode keys in nodeTransactionMap need to have their history fetched (nodes without history will not be keys in the map yet) + getReferences(wallet, nodeTransactionMap.keySet(), nodeTransactionMap, 0); + //Fetch all referenced transaction to wallet transactions map + getReferencedTransactions(wallet, nodeTransactionMap); //Because node children are added sequentially in WalletNode.fillToIndex, we can simply look at the number of children to determine the highest filled index int historySize = purposeNode.getChildren().size(); @@ -154,16 +159,13 @@ public class ElectrumServer { while(historySize < gapLimitSize) { purposeNode.fillToIndex(gapLimitSize - 1); subscribeWalletNodes(wallet, purposeNode.getChildren(), nodeTransactionMap, historySize); + getReferences(wallet, nodeTransactionMap.keySet(), nodeTransactionMap, historySize); + getReferencedTransactions(wallet, nodeTransactionMap); historySize = purposeNode.getChildren().size(); gapLimitSize = getGapLimitSize(wallet, nodeTransactionMap); } - //All WalletNode keys in nodeTransactionMap need to have their history fetched - Collection usedNodes = new ArrayList<>(nodeTransactionMap.keySet()); - log.debug("Retrieving history for " + usedNodes.stream().map(WalletNode::getDerivationPath).collect(Collectors.joining(", "))); - getReferences(wallet, usedNodes, nodeTransactionMap); - - //Set the remaining WalletNode keys to empty sets to indicate no history + //Set the remaining WalletNode keys in nodeTransactionMap to empty sets to indicate no history purposeNode.getChildren().stream().filter(node -> !nodeTransactionMap.containsKey(node)).forEach(node -> nodeTransactionMap.put(node, Collections.emptySet())); } @@ -172,11 +174,13 @@ public class ElectrumServer { return highestIndex + wallet.getGapLimit() + 1; } - public void getReferences(Wallet wallet, Collection nodes, Map> nodeTransactionMap) throws ServerException { + public void getReferences(Wallet wallet, Collection nodes, Map> nodeTransactionMap, int startIndex) throws ServerException { try { Map pathScriptHashes = new LinkedHashMap<>(nodes.size()); for(WalletNode node : nodes) { - pathScriptHashes.put(node.getDerivationPath(), getScriptHash(wallet, node)); + if(node.getIndex() >= startIndex) { + pathScriptHashes.put(node.getDerivationPath(), getScriptHash(wallet, node)); + } } if(pathScriptHashes.isEmpty()) { @@ -226,7 +230,11 @@ public class ElectrumServer { Set scriptHashes = new HashSet<>(); Map pathScriptHashes = new LinkedHashMap<>(); for(WalletNode node : nodes) { - if(node.getIndex() >= startIndex) { + if(node == null) { + log.error("Null node for wallet " + wallet.getName() + " subscribing nodes " + nodes + " startIndex " + startIndex); + } + + if(node != null && node.getIndex() >= startIndex) { String scriptHash = getScriptHash(wallet, node); if(getSubscribedScriptHashStatus(scriptHash) != null) { //Already subscribed, but still need to fetch history from a used node @@ -318,6 +326,14 @@ public class ElectrumServer { references.addAll(nodeReferences); } + for(Iterator iter = references.iterator(); iter.hasNext(); ) { + BlockTransactionHash reference = iter.next(); + BlockTransaction blockTransaction = wallet.getTransactions().get(reference.getHash()); + if(blockTransaction != null && reference.getHeight() == blockTransaction.getHeight()) { + iter.remove(); + } + } + Map transactionMap = new HashMap<>(); if(!references.isEmpty()) { Map blockHeaderMap = getBlockHeaders(references); @@ -388,7 +404,14 @@ public class ElectrumServer { } byte[] rawtx = Utils.hexToBytes(strRawTx); - Transaction transaction = new Transaction(rawtx); + Transaction transaction; + + try { + transaction = new Transaction(rawtx); + } catch(ProtocolException e) { + log.error("Could not parse tx: " + strRawTx); + continue; + } Optional optionalReference = references.stream().filter(reference -> reference.getHash().equals(hash)).findFirst(); if(optionalReference.isEmpty()) { @@ -439,7 +462,9 @@ public class ElectrumServer { Set history = nodeTransactionMap.get(node); for(BlockTransactionHash reference : history) { BlockTransaction blockTransaction = wallet.getTransactions().get(reference.getHash()); - if(blockTransaction == null || blockTransaction.equals(UNFETCHABLE_BLOCK_TRANSACTION)) { + if(blockTransaction == null) { + throw new IllegalStateException("Did not retrieve transaction for hash " + reference.getHashAsString()); + } else if(blockTransaction.equals(UNFETCHABLE_BLOCK_TRANSACTION)) { throw new IllegalStateException("Could not retrieve transaction for hash " + reference.getHashAsString()); } Transaction transaction = blockTransaction.getTransaction(); @@ -760,6 +785,7 @@ public class ElectrumServer { public static class TransactionHistoryService extends Service { private final Wallet wallet; private final Set nodes; + private final static Map walletSynchronizeLocks = new HashMap<>(); public TransactionHistoryService(Wallet wallet) { this.wallet = wallet; @@ -775,11 +801,14 @@ public class ElectrumServer { protected Task createTask() { return new Task<>() { protected Boolean call() throws ServerException { - ElectrumServer electrumServer = new ElectrumServer(); - Map> nodeTransactionMap = (nodes == null ? electrumServer.getHistory(wallet) : electrumServer.getHistory(wallet, nodes)); - electrumServer.getReferencedTransactions(wallet, nodeTransactionMap); - electrumServer.calculateNodeHistory(wallet, nodeTransactionMap); - return true; + walletSynchronizeLocks.putIfAbsent(wallet, new Object()); + synchronized(walletSynchronizeLocks.get(wallet)) { + ElectrumServer electrumServer = new ElectrumServer(); + Map> nodeTransactionMap = (nodes == null ? electrumServer.getHistory(wallet) : electrumServer.getHistory(wallet, nodes)); + electrumServer.getReferencedTransactions(wallet, nodeTransactionMap); + electrumServer.calculateNodeHistory(wallet, nodeTransactionMap); + return true; + } } }; } diff --git a/src/main/java/com/sparrowwallet/sparrow/net/RetryLogic.java b/src/main/java/com/sparrowwallet/sparrow/net/RetryLogic.java new file mode 100644 index 00000000..81e057ae --- /dev/null +++ b/src/main/java/com/sparrowwallet/sparrow/net/RetryLogic.java @@ -0,0 +1,54 @@ +package com.sparrowwallet.sparrow.net; + +import java.util.List; + +/** + * Generic retry logic. Delegate must throw the specified exception type to trigger the retry logic. + */ +public class RetryLogic { + public static interface Delegate { + T call() throws Exception; + } + + private final int maxAttempts; + private final int retryWaitSeconds; + @SuppressWarnings("rawtypes") + private final List retryExceptionTypes; + + public RetryLogic(int maxAttempts, int retryWaitSeconds, @SuppressWarnings("rawtypes") Class retryExceptionType) { + this(maxAttempts, retryWaitSeconds, List.of(retryExceptionType)); + } + + public RetryLogic(int maxAttempts, int retryWaitSeconds, @SuppressWarnings("rawtypes") List retryExceptionTypes) { + this.maxAttempts = maxAttempts; + this.retryWaitSeconds = retryWaitSeconds; + this.retryExceptionTypes = retryExceptionTypes; + } + + public T getResult(Delegate caller) throws Exception { + T result = null; + int remainingAttempts = maxAttempts; + do { + try { + return caller.call(); + } catch(Exception e) { + if(retryExceptionTypes.contains(e.getClass())) { + if(--remainingAttempts == 0) { + throw new ServerException("Retries exhausted", e); + } else { + try { + Thread.sleep((1000 * retryWaitSeconds)); + } catch(InterruptedException ie) { + //ignore + } + } + } else { + throw e; + } + } + + } while(remainingAttempts > 0); + + throw new IllegalStateException("Should be impossible"); + } +} \ No newline at end of file diff --git a/src/main/java/com/sparrowwallet/sparrow/net/SimpleElectrumServerRpc.java b/src/main/java/com/sparrowwallet/sparrow/net/SimpleElectrumServerRpc.java index a79bb377..abcaf587 100644 --- a/src/main/java/com/sparrowwallet/sparrow/net/SimpleElectrumServerRpc.java +++ b/src/main/java/com/sparrowwallet/sparrow/net/SimpleElectrumServerRpc.java @@ -18,7 +18,8 @@ import java.util.concurrent.atomic.AtomicLong; public class SimpleElectrumServerRpc implements ElectrumServerRpc { private static final Logger log = LoggerFactory.getLogger(SimpleElectrumServerRpc.class); private static final int MAX_TARGET_BLOCKS = 25; - private static final int PER_REQUEST_DELAY_MILLIS = 50; + private static final int MAX_RETRIES = 3; + private static final int RETRY_DELAY = 0; private final AtomicLong idCounter = new AtomicLong(); @@ -26,8 +27,9 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc { public void ping(Transport transport) { try { JsonRpcClient client = new JsonRpcClient(transport); - client.createRequest().method("server.ping").id(idCounter.incrementAndGet()).executeNullable(); - } catch(JsonRpcException | IllegalStateException | IllegalArgumentException e) { + new RetryLogic<>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> + client.createRequest().method("server.ping").id(idCounter.incrementAndGet()).executeNullable()); + } catch(Exception e) { throw new ElectrumServerRpcException("Error pinging server", e); } } @@ -36,9 +38,10 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc { public List getServerVersion(Transport transport, String clientName, String[] supportedVersions) { try { JsonRpcClient client = new JsonRpcClient(transport); - //Using 1.4 as the version number as EPS tries to parse this number to a float - return client.createRequest().returnAsList(String.class).method("server.version").id(idCounter.incrementAndGet()).params(clientName, "1.4").execute(); - } catch(JsonRpcException | IllegalStateException | IllegalArgumentException e) { + //Using 1.4 as the version number as EPS tries to parse this number to a float :( + return new RetryLogic>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> + client.createRequest().returnAsList(String.class).method("server.version").id(idCounter.incrementAndGet()).params(clientName, "1.4").execute()); + } catch(Exception e) { throw new ElectrumServerRpcException("Error getting server version", e); } } @@ -47,8 +50,9 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc { public String getServerBanner(Transport transport) { try { JsonRpcClient client = new JsonRpcClient(transport); - return client.createRequest().returnAs(String.class).method("server.banner").id(idCounter.incrementAndGet()).execute(); - } catch(JsonRpcException | IllegalStateException | IllegalArgumentException e) { + return new RetryLogic(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> + client.createRequest().returnAs(String.class).method("server.banner").id(idCounter.incrementAndGet()).execute()); + } catch(Exception e) { throw new ElectrumServerRpcException("Error getting server banner", e); } } @@ -57,8 +61,9 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc { public BlockHeaderTip subscribeBlockHeaders(Transport transport) { try { JsonRpcClient client = new JsonRpcClient(transport); - return client.createRequest().returnAs(BlockHeaderTip.class).method("blockchain.headers.subscribe").id(idCounter.incrementAndGet()).execute(); - } catch(JsonRpcException | IllegalStateException | IllegalArgumentException e) { + return new RetryLogic(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> + client.createRequest().returnAs(BlockHeaderTip.class).method("blockchain.headers.subscribe").id(idCounter.incrementAndGet()).execute()); + } catch(Exception e) { throw new ElectrumServerRpcException("Error subscribing to block headers", e); } } @@ -71,10 +76,10 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc { for(String path : pathScriptHashes.keySet()) { EventManager.get().post(new WalletHistoryStatusEvent(false, "Loading transactions for " + path)); try { - ScriptHashTx[] scriptHashTxes = client.createRequest().returnAs(ScriptHashTx[].class).method("blockchain.scripthash.get_history").id(path + "-" + idCounter.incrementAndGet()).params(pathScriptHashes.get(path)).execute(); + ScriptHashTx[] scriptHashTxes = new RetryLogic(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(() -> + client.createRequest().returnAs(ScriptHashTx[].class).method("blockchain.scripthash.get_history").id(path + "-" + idCounter.incrementAndGet()).params(pathScriptHashes.get(path)).execute()); result.put(path, scriptHashTxes); - Thread.sleep(PER_REQUEST_DELAY_MILLIS); - } catch(JsonRpcException | IllegalStateException | IllegalArgumentException | InterruptedException e) { + } catch(Exception e) { if(failOnError) { throw new ElectrumServerRpcException("Failed to retrieve reference for path: " + path, e); } @@ -93,10 +98,10 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc { Map result = new LinkedHashMap<>(); for(String path : pathScriptHashes.keySet()) { try { - ScriptHashTx[] scriptHashTxes = client.createRequest().returnAs(ScriptHashTx[].class).method("blockchain.scripthash.get_mempool").id(path + "-" + idCounter.incrementAndGet()).params(pathScriptHashes.get(path)).execute(); + ScriptHashTx[] scriptHashTxes = new RetryLogic(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(() -> + client.createRequest().returnAs(ScriptHashTx[].class).method("blockchain.scripthash.get_mempool").id(path + "-" + idCounter.incrementAndGet()).params(pathScriptHashes.get(path)).execute()); result.put(path, scriptHashTxes); - Thread.sleep(PER_REQUEST_DELAY_MILLIS); - } catch(JsonRpcException | IllegalStateException | IllegalArgumentException | InterruptedException e) { + } catch(Exception e) { if(failOnError) { throw new ElectrumServerRpcException("Failed to retrieve reference for path: " + path, e); } @@ -116,10 +121,10 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc { for(String path : pathScriptHashes.keySet()) { EventManager.get().post(new WalletHistoryStatusEvent(false, "Finding transactions for " + path)); try { - String scriptHash = client.createRequest().returnAs(String.class).method("blockchain.scripthash.subscribe").id(path + "-" + idCounter.incrementAndGet()).params(pathScriptHashes.get(path)).executeNullable(); + String scriptHash = new RetryLogic(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(() -> + client.createRequest().returnAs(String.class).method("blockchain.scripthash.subscribe").id(path + "-" + idCounter.incrementAndGet()).params(pathScriptHashes.get(path)).executeNullable()); result.put(path, scriptHash); - Thread.sleep(PER_REQUEST_DELAY_MILLIS); - } catch(JsonRpcException | IllegalStateException | IllegalArgumentException | InterruptedException e) { + } catch(Exception e) { //Even if we have some successes, failure to subscribe for all script hashes will result in outdated wallet view. Don't proceed. throw new ElectrumServerRpcException("Failed to retrieve reference for path: " + path, e); } @@ -136,13 +141,13 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc { for(Integer blockHeight : blockHeights) { EventManager.get().post(new WalletHistoryStatusEvent(false, "Retrieving block at height " + blockHeight)); try { - String blockHeader = client.createRequest().returnAs(String.class).method("blockchain.block.header").id(idCounter.incrementAndGet()).params(blockHeight).execute(); + String blockHeader = new RetryLogic(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(() -> + client.createRequest().returnAs(String.class).method("blockchain.block.header").id(idCounter.incrementAndGet()).params(blockHeight).execute()); result.put(blockHeight, blockHeader); - Thread.sleep(PER_REQUEST_DELAY_MILLIS); - } catch(IllegalStateException | IllegalArgumentException | InterruptedException e) { - log.warn("Failed to retrieve block header for block height: " + blockHeight + " (" + e.getMessage() + ")"); } catch(JsonRpcException e) { log.warn("Failed to retrieve block header for block height: " + blockHeight + " (" + e.getErrorMessage() + ")"); + } catch(Exception e) { + log.warn("Failed to retrieve block header for block height: " + blockHeight + " (" + e.getMessage() + ")"); } } @@ -157,10 +162,10 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc { for(String txid : txids) { EventManager.get().post(new WalletHistoryStatusEvent(false, "Retrieving transaction [" + txid.substring(0, 6) + "]")); try { - String rawTxHex = client.createRequest().returnAs(String.class).method("blockchain.transaction.get").id(idCounter.incrementAndGet()).params(txid).execute(); + String rawTxHex = new RetryLogic(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(() -> + client.createRequest().returnAs(String.class).method("blockchain.transaction.get").id(idCounter.incrementAndGet()).params(txid).execute()); result.put(txid, rawTxHex); - Thread.sleep(PER_REQUEST_DELAY_MILLIS); - } catch(JsonRpcException | IllegalStateException | IllegalArgumentException | InterruptedException e) { + } catch(Exception e) { result.put(txid, Sha256Hash.ZERO_HASH.toString()); } } @@ -175,9 +180,9 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc { Map result = new LinkedHashMap<>(); for(String txid : txids) { try { - VerboseTransaction verboseTransaction = client.createRequest().returnAs(VerboseTransaction.class).method("blockchain.transaction.get").id(idCounter.incrementAndGet()).params(txid, true).execute(); + VerboseTransaction verboseTransaction = new RetryLogic(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> + client.createRequest().returnAs(VerboseTransaction.class).method("blockchain.transaction.get").id(idCounter.incrementAndGet()).params(txid, true).execute()); result.put(txid, verboseTransaction); - Thread.sleep(PER_REQUEST_DELAY_MILLIS); } catch(Exception e) { //electrs-esplora does not currently support the verbose parameter, so try to fetch an incomplete VerboseTransaction without it //Note that without the script hash associated with the transaction, we can't get a block height as there is no way in the Electrum RPC protocol to do this @@ -223,14 +228,14 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc { for(Integer targetBlock : targetBlocks) { if(targetBlock <= MAX_TARGET_BLOCKS) { try { - Double targetBlocksFeeRateBtcKb = client.createRequest().returnAs(Double.class).method("blockchain.estimatefee").id(idCounter.incrementAndGet()).params(targetBlock).execute(); + Double targetBlocksFeeRateBtcKb = new RetryLogic(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> + client.createRequest().returnAs(Double.class).method("blockchain.estimatefee").id(idCounter.incrementAndGet()).params(targetBlock).execute()); result.put(targetBlock, targetBlocksFeeRateBtcKb); - Thread.sleep(PER_REQUEST_DELAY_MILLIS); - } catch(IllegalStateException | IllegalArgumentException | InterruptedException e) { - log.warn("Failed to retrieve fee rate for target blocks: " + targetBlock + " (" + e.getMessage() + ")"); - result.put(targetBlock, result.values().stream().mapToDouble(v -> v).min().orElse(0.0001d)); } catch(JsonRpcException e) { throw new ElectrumServerRpcException("Failed to retrieve fee rate for target blocks: " + targetBlock, e); + } catch(Exception e) { + log.warn("Failed to retrieve fee rate for target blocks: " + targetBlock + " (" + e.getMessage() + ")"); + result.put(targetBlock, result.values().stream().mapToDouble(v -> v).min().orElse(0.0001d)); } } else { result.put(targetBlock, result.values().stream().mapToDouble(v -> v).min().orElse(0.0001d)); @@ -244,8 +249,9 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc { public Double getMinimumRelayFee(Transport transport) { try { JsonRpcClient client = new JsonRpcClient(transport); - return client.createRequest().returnAs(Double.class).method("blockchain.relayfee").id(idCounter.incrementAndGet()).execute(); - } catch(JsonRpcException e) { + return new RetryLogic(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> + client.createRequest().returnAs(Double.class).method("blockchain.relayfee").id(idCounter.incrementAndGet()).execute()); + } catch(Exception e) { throw new ElectrumServerRpcException("Error getting minimum relay fee", e); } } @@ -254,11 +260,12 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc { public String broadcastTransaction(Transport transport, String txHex) { try { JsonRpcClient client = new JsonRpcClient(transport); - return client.createRequest().returnAs(String.class).method("blockchain.transaction.broadcast").id(idCounter.incrementAndGet()).params(txHex).execute(); - } catch(IllegalStateException | IllegalArgumentException e) { - throw new ElectrumServerRpcException(e.getMessage(), e); + return new RetryLogic(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> + client.createRequest().returnAs(String.class).method("blockchain.transaction.broadcast").id(idCounter.incrementAndGet()).params(txHex).execute()); } catch(JsonRpcException e) { throw new ElectrumServerRpcException(e.getErrorMessage().getMessage(), e); + } catch(Exception e) { + throw new ElectrumServerRpcException(e.getMessage(), e); } } } diff --git a/src/main/java/com/sparrowwallet/sparrow/net/TcpTransport.java b/src/main/java/com/sparrowwallet/sparrow/net/TcpTransport.java index ac1abf08..dbe4fea0 100644 --- a/src/main/java/com/sparrowwallet/sparrow/net/TcpTransport.java +++ b/src/main/java/com/sparrowwallet/sparrow/net/TcpTransport.java @@ -19,6 +19,7 @@ public class TcpTransport implements Transport, Closeable { private static final Logger log = LoggerFactory.getLogger(TcpTransport.class); public static final int DEFAULT_PORT = 50001; + private static final int READ_TIMEOUT_SECS = 3; protected final HostAndPort server; protected final SocketFactory socketFactory; @@ -64,7 +65,7 @@ public class TcpTransport implements Transport, Closeable { private String readResponse() throws IOException { try { - if(!readLock.tryLock(60, TimeUnit.SECONDS)) { + if(!readLock.tryLock(READ_TIMEOUT_SECS, TimeUnit.SECONDS)) { throw new IOException("No response from server"); } } catch(InterruptedException e) {