change electrum call ordering and add retry handling for better behaviour on electrs

This commit is contained in:
Craig Raw 2020-10-13 12:48:32 +02:00
parent a893b37bb0
commit 998100505d
5 changed files with 193 additions and 78 deletions

View file

@ -18,6 +18,8 @@ import java.util.concurrent.atomic.AtomicLong;
public class BatchedElectrumServerRpc implements ElectrumServerRpc { public class BatchedElectrumServerRpc implements ElectrumServerRpc {
private static final Logger log = LoggerFactory.getLogger(BatchedElectrumServerRpc.class); private static final Logger log = LoggerFactory.getLogger(BatchedElectrumServerRpc.class);
private static final int MAX_RETRIES = 3;
private static final int RETRY_DELAY = 0;
private final AtomicLong idCounter = new AtomicLong(); private final AtomicLong idCounter = new AtomicLong();
@ -25,8 +27,9 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
public void ping(Transport transport) { public void ping(Transport transport) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
client.createRequest().method("server.ping").id(idCounter.incrementAndGet()).executeNullable(); new RetryLogic<>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() ->
} catch(JsonRpcException e) { client.createRequest().method("server.ping").id(idCounter.incrementAndGet()).executeNullable());
} catch(Exception e) {
throw new ElectrumServerRpcException("Error pinging server", e); throw new ElectrumServerRpcException("Error pinging server", e);
} }
} }
@ -35,8 +38,9 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
public List<String> getServerVersion(Transport transport, String clientName, String[] supportedVersions) { public List<String> getServerVersion(Transport transport, String clientName, String[] supportedVersions) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
return client.createRequest().returnAsList(String.class).method("server.version").id(idCounter.incrementAndGet()).param("client_name", clientName).param("protocol_version", supportedVersions).execute(); return new RetryLogic<List<String>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() ->
} catch(JsonRpcException e) { client.createRequest().returnAsList(String.class).method("server.version").id(idCounter.incrementAndGet()).param("client_name", clientName).param("protocol_version", supportedVersions).execute());
} catch(Exception e) {
throw new ElectrumServerRpcException("Error getting server version", e); throw new ElectrumServerRpcException("Error getting server version", e);
} }
} }
@ -45,8 +49,9 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
public String getServerBanner(Transport transport) { public String getServerBanner(Transport transport) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
return client.createRequest().returnAs(String.class).method("server.banner").id(idCounter.incrementAndGet()).execute(); return new RetryLogic<String>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() ->
} catch(JsonRpcException e) { client.createRequest().returnAs(String.class).method("server.banner").id(idCounter.incrementAndGet()).execute());
} catch(Exception e) {
throw new ElectrumServerRpcException("Error getting server banner", e); throw new ElectrumServerRpcException("Error getting server banner", e);
} }
} }
@ -55,8 +60,9 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
public BlockHeaderTip subscribeBlockHeaders(Transport transport) { public BlockHeaderTip subscribeBlockHeaders(Transport transport) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
return client.createRequest().returnAs(BlockHeaderTip.class).method("blockchain.headers.subscribe").id(idCounter.incrementAndGet()).execute(); return new RetryLogic<BlockHeaderTip>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() ->
} catch(JsonRpcException e) { client.createRequest().returnAs(BlockHeaderTip.class).method("blockchain.headers.subscribe").id(idCounter.incrementAndGet()).execute());
} catch(Exception e) {
throw new ElectrumServerRpcException("Error subscribing to block headers", e); throw new ElectrumServerRpcException("Error subscribing to block headers", e);
} }
} }
@ -73,7 +79,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
try { try {
return batchRequest.execute(); return new RetryLogic<Map<String, ScriptHashTx[]>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute);
} catch (JsonRpcBatchException e) { } catch (JsonRpcBatchException e) {
if(failOnError) { if(failOnError) {
throw new ElectrumServerRpcException("Failed to retrieve references for paths: " + e.getErrors().keySet(), e); throw new ElectrumServerRpcException("Failed to retrieve references for paths: " + e.getErrors().keySet(), e);
@ -85,6 +91,8 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
return result; return result;
} catch(Exception e) {
throw new ElectrumServerRpcException("Failed to retrieve references for paths: " + pathScriptHashes.keySet(), e);
} }
} }
@ -99,7 +107,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
try { try {
return batchRequest.execute(); return new RetryLogic<Map<String, ScriptHashTx[]>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute);
} catch(JsonRpcBatchException e) { } catch(JsonRpcBatchException e) {
if(failOnError) { if(failOnError) {
throw new ElectrumServerRpcException("Failed to retrieve references for paths: " + e.getErrors().keySet(), e); throw new ElectrumServerRpcException("Failed to retrieve references for paths: " + e.getErrors().keySet(), e);
@ -111,6 +119,8 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
return result; return result;
} catch(Exception e) {
throw new ElectrumServerRpcException("Failed to retrieve references for paths: " + pathScriptHashes.keySet(), e);
} }
} }
@ -125,10 +135,12 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
try { try {
return batchRequest.execute(); return new RetryLogic<Map<String, String>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute);
} catch(JsonRpcBatchException e) { } catch(JsonRpcBatchException e) {
//Even if we have some successes, failure to subscribe for all script hashes will result in outdated wallet view. Don't proceed. //Even if we have some successes, failure to subscribe for all script hashes will result in outdated wallet view. Don't proceed.
throw new ElectrumServerRpcException("Failed to subscribe for updates for paths: " + e.getErrors().keySet(), e); throw new ElectrumServerRpcException("Failed to subscribe for updates for paths: " + e.getErrors().keySet(), e);
} catch(Exception e) {
throw new ElectrumServerRpcException("Failed to subscribe for updates for paths: " + pathScriptHashes.keySet(), e);
} }
} }
@ -144,9 +156,11 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
try { try {
return batchRequest.execute(); return new RetryLogic<Map<Integer, String>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute);
} catch(JsonRpcBatchException e) { } catch(JsonRpcBatchException e) {
return (Map<Integer, String>)e.getSuccesses(); return (Map<Integer, String>)e.getSuccesses();
} catch(Exception e) {
throw new ElectrumServerRpcException("Failed to block headers for block heights: " + blockHeights, e);
} }
} }
@ -162,7 +176,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
try { try {
return batchRequest.execute(); return new RetryLogic<Map<String, String>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute);
} catch(JsonRpcBatchException e) { } catch(JsonRpcBatchException e) {
Map<String, String> result = (Map<String, String>)e.getSuccesses(); Map<String, String> result = (Map<String, String>)e.getSuccesses();
@ -173,6 +187,8 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
return result; return result;
} catch(Exception e) {
throw new ElectrumServerRpcException("Failed to retrieve transactions for txids: " + txids, e);
} }
} }
@ -186,10 +202,12 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
try { try {
return batchRequest.execute(); return new RetryLogic<Map<String, VerboseTransaction>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute);
} catch(JsonRpcBatchException e) { } catch(JsonRpcBatchException e) {
log.warn("Some errors retrieving transactions: " + e.getErrors()); log.warn("Some errors retrieving transactions: " + e.getErrors());
return (Map<String, VerboseTransaction>)e.getSuccesses(); return (Map<String, VerboseTransaction>)e.getSuccesses();
} catch(Exception e) {
throw new ElectrumServerRpcException("Failed to retrieve verbose transactions for txids: " + txids, e);
} }
} }
@ -202,9 +220,11 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
try { try {
return batchRequest.execute(); return new RetryLogic<Map<Integer, Double>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute);
} catch(JsonRpcBatchException e) { } catch(JsonRpcBatchException e) {
throw new ElectrumServerRpcException("Error getting fee estimates", e); throw new ElectrumServerRpcException("Error getting fee estimates", e);
} catch(Exception e) {
throw new ElectrumServerRpcException("Error getting fee estimates for target blocks: " + targetBlocks, e);
} }
} }
@ -212,8 +232,9 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
public Double getMinimumRelayFee(Transport transport) { public Double getMinimumRelayFee(Transport transport) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
return client.createRequest().returnAs(Double.class).method("blockchain.relayfee").id(idCounter.incrementAndGet()).execute(); return new RetryLogic<Double>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() ->
} catch(JsonRpcException e) { client.createRequest().returnAs(Double.class).method("blockchain.relayfee").id(idCounter.incrementAndGet()).execute());
} catch(Exception e) {
throw new ElectrumServerRpcException("Error getting minimum relay fee", e); throw new ElectrumServerRpcException("Error getting minimum relay fee", e);
} }
} }
@ -222,9 +243,12 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
public String broadcastTransaction(Transport transport, String txHex) { public String broadcastTransaction(Transport transport, String txHex) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
return client.createRequest().returnAs(String.class).method("blockchain.transaction.broadcast").id(idCounter.incrementAndGet()).param("raw_tx", txHex).execute(); return new RetryLogic<String>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() ->
client.createRequest().returnAs(String.class).method("blockchain.transaction.broadcast").id(idCounter.incrementAndGet()).param("raw_tx", txHex).execute());
} catch(JsonRpcException e) { } catch(JsonRpcException e) {
throw new ElectrumServerRpcException(e.getErrorMessage().getMessage(), e); throw new ElectrumServerRpcException(e.getErrorMessage().getMessage(), e);
} catch(Exception e) {
throw new ElectrumServerRpcException("Error broadcasting transaction", e);
} }
} }
} }

View file

@ -138,14 +138,19 @@ public class ElectrumServer {
public Map<WalletNode, Set<BlockTransactionHash>> getHistory(Wallet wallet, Collection<WalletNode> nodes) throws ServerException { public Map<WalletNode, Set<BlockTransactionHash>> getHistory(Wallet wallet, Collection<WalletNode> nodes) throws ServerException {
Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap = new TreeMap<>(); Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap = new TreeMap<>();
subscribeWalletNodes(wallet, nodes, nodeTransactionMap, 0); subscribeWalletNodes(wallet, nodes, nodeTransactionMap, 0);
getReferences(wallet, nodes, nodeTransactionMap); getReferences(wallet, nodeTransactionMap.keySet(), nodeTransactionMap, 0);
return nodeTransactionMap; return nodeTransactionMap;
} }
public void getHistory(Wallet wallet, KeyPurpose keyPurpose, Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap) throws ServerException { public void getHistory(Wallet wallet, KeyPurpose keyPurpose, Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap) throws ServerException {
WalletNode purposeNode = wallet.getNode(keyPurpose); WalletNode purposeNode = wallet.getNode(keyPurpose);
//Subscribe to all existing address WalletNodes and add them to nodeTransactionMap as keys to empty sets if they have history
subscribeWalletNodes(wallet, purposeNode.getChildren(), nodeTransactionMap, 0); subscribeWalletNodes(wallet, purposeNode.getChildren(), nodeTransactionMap, 0);
//All WalletNode keys in nodeTransactionMap need to have their history fetched (nodes without history will not be keys in the map yet)
getReferences(wallet, nodeTransactionMap.keySet(), nodeTransactionMap, 0);
//Fetch all referenced transaction to wallet transactions map
getReferencedTransactions(wallet, nodeTransactionMap);
//Because node children are added sequentially in WalletNode.fillToIndex, we can simply look at the number of children to determine the highest filled index //Because node children are added sequentially in WalletNode.fillToIndex, we can simply look at the number of children to determine the highest filled index
int historySize = purposeNode.getChildren().size(); int historySize = purposeNode.getChildren().size();
@ -154,16 +159,13 @@ public class ElectrumServer {
while(historySize < gapLimitSize) { while(historySize < gapLimitSize) {
purposeNode.fillToIndex(gapLimitSize - 1); purposeNode.fillToIndex(gapLimitSize - 1);
subscribeWalletNodes(wallet, purposeNode.getChildren(), nodeTransactionMap, historySize); subscribeWalletNodes(wallet, purposeNode.getChildren(), nodeTransactionMap, historySize);
getReferences(wallet, nodeTransactionMap.keySet(), nodeTransactionMap, historySize);
getReferencedTransactions(wallet, nodeTransactionMap);
historySize = purposeNode.getChildren().size(); historySize = purposeNode.getChildren().size();
gapLimitSize = getGapLimitSize(wallet, nodeTransactionMap); gapLimitSize = getGapLimitSize(wallet, nodeTransactionMap);
} }
//All WalletNode keys in nodeTransactionMap need to have their history fetched //Set the remaining WalletNode keys in nodeTransactionMap to empty sets to indicate no history
Collection<WalletNode> usedNodes = new ArrayList<>(nodeTransactionMap.keySet());
log.debug("Retrieving history for " + usedNodes.stream().map(WalletNode::getDerivationPath).collect(Collectors.joining(", ")));
getReferences(wallet, usedNodes, nodeTransactionMap);
//Set the remaining WalletNode keys to empty sets to indicate no history
purposeNode.getChildren().stream().filter(node -> !nodeTransactionMap.containsKey(node)).forEach(node -> nodeTransactionMap.put(node, Collections.emptySet())); purposeNode.getChildren().stream().filter(node -> !nodeTransactionMap.containsKey(node)).forEach(node -> nodeTransactionMap.put(node, Collections.emptySet()));
} }
@ -172,12 +174,14 @@ public class ElectrumServer {
return highestIndex + wallet.getGapLimit() + 1; return highestIndex + wallet.getGapLimit() + 1;
} }
public void getReferences(Wallet wallet, Collection<WalletNode> nodes, Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap) throws ServerException { public void getReferences(Wallet wallet, Collection<WalletNode> nodes, Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap, int startIndex) throws ServerException {
try { try {
Map<String, String> pathScriptHashes = new LinkedHashMap<>(nodes.size()); Map<String, String> pathScriptHashes = new LinkedHashMap<>(nodes.size());
for(WalletNode node : nodes) { for(WalletNode node : nodes) {
if(node.getIndex() >= startIndex) {
pathScriptHashes.put(node.getDerivationPath(), getScriptHash(wallet, node)); pathScriptHashes.put(node.getDerivationPath(), getScriptHash(wallet, node));
} }
}
if(pathScriptHashes.isEmpty()) { if(pathScriptHashes.isEmpty()) {
return; return;
@ -226,7 +230,11 @@ public class ElectrumServer {
Set<String> scriptHashes = new HashSet<>(); Set<String> scriptHashes = new HashSet<>();
Map<String, String> pathScriptHashes = new LinkedHashMap<>(); Map<String, String> pathScriptHashes = new LinkedHashMap<>();
for(WalletNode node : nodes) { for(WalletNode node : nodes) {
if(node.getIndex() >= startIndex) { if(node == null) {
log.error("Null node for wallet " + wallet.getName() + " subscribing nodes " + nodes + " startIndex " + startIndex);
}
if(node != null && node.getIndex() >= startIndex) {
String scriptHash = getScriptHash(wallet, node); String scriptHash = getScriptHash(wallet, node);
if(getSubscribedScriptHashStatus(scriptHash) != null) { if(getSubscribedScriptHashStatus(scriptHash) != null) {
//Already subscribed, but still need to fetch history from a used node //Already subscribed, but still need to fetch history from a used node
@ -318,6 +326,14 @@ public class ElectrumServer {
references.addAll(nodeReferences); references.addAll(nodeReferences);
} }
for(Iterator<BlockTransactionHash> iter = references.iterator(); iter.hasNext(); ) {
BlockTransactionHash reference = iter.next();
BlockTransaction blockTransaction = wallet.getTransactions().get(reference.getHash());
if(blockTransaction != null && reference.getHeight() == blockTransaction.getHeight()) {
iter.remove();
}
}
Map<Sha256Hash, BlockTransaction> transactionMap = new HashMap<>(); Map<Sha256Hash, BlockTransaction> transactionMap = new HashMap<>();
if(!references.isEmpty()) { if(!references.isEmpty()) {
Map<Integer, BlockHeader> blockHeaderMap = getBlockHeaders(references); Map<Integer, BlockHeader> blockHeaderMap = getBlockHeaders(references);
@ -388,7 +404,14 @@ public class ElectrumServer {
} }
byte[] rawtx = Utils.hexToBytes(strRawTx); byte[] rawtx = Utils.hexToBytes(strRawTx);
Transaction transaction = new Transaction(rawtx); Transaction transaction;
try {
transaction = new Transaction(rawtx);
} catch(ProtocolException e) {
log.error("Could not parse tx: " + strRawTx);
continue;
}
Optional<BlockTransactionHash> optionalReference = references.stream().filter(reference -> reference.getHash().equals(hash)).findFirst(); Optional<BlockTransactionHash> optionalReference = references.stream().filter(reference -> reference.getHash().equals(hash)).findFirst();
if(optionalReference.isEmpty()) { if(optionalReference.isEmpty()) {
@ -439,7 +462,9 @@ public class ElectrumServer {
Set<BlockTransactionHash> history = nodeTransactionMap.get(node); Set<BlockTransactionHash> history = nodeTransactionMap.get(node);
for(BlockTransactionHash reference : history) { for(BlockTransactionHash reference : history) {
BlockTransaction blockTransaction = wallet.getTransactions().get(reference.getHash()); BlockTransaction blockTransaction = wallet.getTransactions().get(reference.getHash());
if(blockTransaction == null || blockTransaction.equals(UNFETCHABLE_BLOCK_TRANSACTION)) { if(blockTransaction == null) {
throw new IllegalStateException("Did not retrieve transaction for hash " + reference.getHashAsString());
} else if(blockTransaction.equals(UNFETCHABLE_BLOCK_TRANSACTION)) {
throw new IllegalStateException("Could not retrieve transaction for hash " + reference.getHashAsString()); throw new IllegalStateException("Could not retrieve transaction for hash " + reference.getHashAsString());
} }
Transaction transaction = blockTransaction.getTransaction(); Transaction transaction = blockTransaction.getTransaction();
@ -760,6 +785,7 @@ public class ElectrumServer {
public static class TransactionHistoryService extends Service<Boolean> { public static class TransactionHistoryService extends Service<Boolean> {
private final Wallet wallet; private final Wallet wallet;
private final Set<WalletNode> nodes; private final Set<WalletNode> nodes;
private final static Map<Wallet, Object> walletSynchronizeLocks = new HashMap<>();
public TransactionHistoryService(Wallet wallet) { public TransactionHistoryService(Wallet wallet) {
this.wallet = wallet; this.wallet = wallet;
@ -775,12 +801,15 @@ public class ElectrumServer {
protected Task<Boolean> createTask() { protected Task<Boolean> createTask() {
return new Task<>() { return new Task<>() {
protected Boolean call() throws ServerException { protected Boolean call() throws ServerException {
walletSynchronizeLocks.putIfAbsent(wallet, new Object());
synchronized(walletSynchronizeLocks.get(wallet)) {
ElectrumServer electrumServer = new ElectrumServer(); ElectrumServer electrumServer = new ElectrumServer();
Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap = (nodes == null ? electrumServer.getHistory(wallet) : electrumServer.getHistory(wallet, nodes)); Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap = (nodes == null ? electrumServer.getHistory(wallet) : electrumServer.getHistory(wallet, nodes));
electrumServer.getReferencedTransactions(wallet, nodeTransactionMap); electrumServer.getReferencedTransactions(wallet, nodeTransactionMap);
electrumServer.calculateNodeHistory(wallet, nodeTransactionMap); electrumServer.calculateNodeHistory(wallet, nodeTransactionMap);
return true; return true;
} }
}
}; };
} }
} }

View file

@ -0,0 +1,54 @@
package com.sparrowwallet.sparrow.net;
import java.util.List;
/**
* Generic retry logic. Delegate must throw the specified exception type to trigger the retry logic.
*/
public class RetryLogic<T> {
public static interface Delegate<T> {
T call() throws Exception;
}
private final int maxAttempts;
private final int retryWaitSeconds;
@SuppressWarnings("rawtypes")
private final List<Class> retryExceptionTypes;
public RetryLogic(int maxAttempts, int retryWaitSeconds, @SuppressWarnings("rawtypes") Class retryExceptionType) {
this(maxAttempts, retryWaitSeconds, List.of(retryExceptionType));
}
public RetryLogic(int maxAttempts, int retryWaitSeconds, @SuppressWarnings("rawtypes") List<Class> retryExceptionTypes) {
this.maxAttempts = maxAttempts;
this.retryWaitSeconds = retryWaitSeconds;
this.retryExceptionTypes = retryExceptionTypes;
}
public T getResult(Delegate<T> caller) throws Exception {
T result = null;
int remainingAttempts = maxAttempts;
do {
try {
return caller.call();
} catch(Exception e) {
if(retryExceptionTypes.contains(e.getClass())) {
if(--remainingAttempts == 0) {
throw new ServerException("Retries exhausted", e);
} else {
try {
Thread.sleep((1000 * retryWaitSeconds));
} catch(InterruptedException ie) {
//ignore
}
}
} else {
throw e;
}
}
} while(remainingAttempts > 0);
throw new IllegalStateException("Should be impossible");
}
}

View file

@ -18,7 +18,8 @@ import java.util.concurrent.atomic.AtomicLong;
public class SimpleElectrumServerRpc implements ElectrumServerRpc { public class SimpleElectrumServerRpc implements ElectrumServerRpc {
private static final Logger log = LoggerFactory.getLogger(SimpleElectrumServerRpc.class); private static final Logger log = LoggerFactory.getLogger(SimpleElectrumServerRpc.class);
private static final int MAX_TARGET_BLOCKS = 25; private static final int MAX_TARGET_BLOCKS = 25;
private static final int PER_REQUEST_DELAY_MILLIS = 50; private static final int MAX_RETRIES = 3;
private static final int RETRY_DELAY = 0;
private final AtomicLong idCounter = new AtomicLong(); private final AtomicLong idCounter = new AtomicLong();
@ -26,8 +27,9 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc {
public void ping(Transport transport) { public void ping(Transport transport) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
client.createRequest().method("server.ping").id(idCounter.incrementAndGet()).executeNullable(); new RetryLogic<>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() ->
} catch(JsonRpcException | IllegalStateException | IllegalArgumentException e) { client.createRequest().method("server.ping").id(idCounter.incrementAndGet()).executeNullable());
} catch(Exception e) {
throw new ElectrumServerRpcException("Error pinging server", e); throw new ElectrumServerRpcException("Error pinging server", e);
} }
} }
@ -36,9 +38,10 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc {
public List<String> getServerVersion(Transport transport, String clientName, String[] supportedVersions) { public List<String> getServerVersion(Transport transport, String clientName, String[] supportedVersions) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
//Using 1.4 as the version number as EPS tries to parse this number to a float //Using 1.4 as the version number as EPS tries to parse this number to a float :(
return client.createRequest().returnAsList(String.class).method("server.version").id(idCounter.incrementAndGet()).params(clientName, "1.4").execute(); return new RetryLogic<List<String>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() ->
} catch(JsonRpcException | IllegalStateException | IllegalArgumentException e) { client.createRequest().returnAsList(String.class).method("server.version").id(idCounter.incrementAndGet()).params(clientName, "1.4").execute());
} catch(Exception e) {
throw new ElectrumServerRpcException("Error getting server version", e); throw new ElectrumServerRpcException("Error getting server version", e);
} }
} }
@ -47,8 +50,9 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc {
public String getServerBanner(Transport transport) { public String getServerBanner(Transport transport) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
return client.createRequest().returnAs(String.class).method("server.banner").id(idCounter.incrementAndGet()).execute(); return new RetryLogic<String>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() ->
} catch(JsonRpcException | IllegalStateException | IllegalArgumentException e) { client.createRequest().returnAs(String.class).method("server.banner").id(idCounter.incrementAndGet()).execute());
} catch(Exception e) {
throw new ElectrumServerRpcException("Error getting server banner", e); throw new ElectrumServerRpcException("Error getting server banner", e);
} }
} }
@ -57,8 +61,9 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc {
public BlockHeaderTip subscribeBlockHeaders(Transport transport) { public BlockHeaderTip subscribeBlockHeaders(Transport transport) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
return client.createRequest().returnAs(BlockHeaderTip.class).method("blockchain.headers.subscribe").id(idCounter.incrementAndGet()).execute(); return new RetryLogic<BlockHeaderTip>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() ->
} catch(JsonRpcException | IllegalStateException | IllegalArgumentException e) { client.createRequest().returnAs(BlockHeaderTip.class).method("blockchain.headers.subscribe").id(idCounter.incrementAndGet()).execute());
} catch(Exception e) {
throw new ElectrumServerRpcException("Error subscribing to block headers", e); throw new ElectrumServerRpcException("Error subscribing to block headers", e);
} }
} }
@ -71,10 +76,10 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc {
for(String path : pathScriptHashes.keySet()) { for(String path : pathScriptHashes.keySet()) {
EventManager.get().post(new WalletHistoryStatusEvent(false, "Loading transactions for " + path)); EventManager.get().post(new WalletHistoryStatusEvent(false, "Loading transactions for " + path));
try { try {
ScriptHashTx[] scriptHashTxes = client.createRequest().returnAs(ScriptHashTx[].class).method("blockchain.scripthash.get_history").id(path + "-" + idCounter.incrementAndGet()).params(pathScriptHashes.get(path)).execute(); ScriptHashTx[] scriptHashTxes = new RetryLogic<ScriptHashTx[]>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(() ->
client.createRequest().returnAs(ScriptHashTx[].class).method("blockchain.scripthash.get_history").id(path + "-" + idCounter.incrementAndGet()).params(pathScriptHashes.get(path)).execute());
result.put(path, scriptHashTxes); result.put(path, scriptHashTxes);
Thread.sleep(PER_REQUEST_DELAY_MILLIS); } catch(Exception e) {
} catch(JsonRpcException | IllegalStateException | IllegalArgumentException | InterruptedException e) {
if(failOnError) { if(failOnError) {
throw new ElectrumServerRpcException("Failed to retrieve reference for path: " + path, e); throw new ElectrumServerRpcException("Failed to retrieve reference for path: " + path, e);
} }
@ -93,10 +98,10 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc {
Map<String, ScriptHashTx[]> result = new LinkedHashMap<>(); Map<String, ScriptHashTx[]> result = new LinkedHashMap<>();
for(String path : pathScriptHashes.keySet()) { for(String path : pathScriptHashes.keySet()) {
try { try {
ScriptHashTx[] scriptHashTxes = client.createRequest().returnAs(ScriptHashTx[].class).method("blockchain.scripthash.get_mempool").id(path + "-" + idCounter.incrementAndGet()).params(pathScriptHashes.get(path)).execute(); ScriptHashTx[] scriptHashTxes = new RetryLogic<ScriptHashTx[]>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(() ->
client.createRequest().returnAs(ScriptHashTx[].class).method("blockchain.scripthash.get_mempool").id(path + "-" + idCounter.incrementAndGet()).params(pathScriptHashes.get(path)).execute());
result.put(path, scriptHashTxes); result.put(path, scriptHashTxes);
Thread.sleep(PER_REQUEST_DELAY_MILLIS); } catch(Exception e) {
} catch(JsonRpcException | IllegalStateException | IllegalArgumentException | InterruptedException e) {
if(failOnError) { if(failOnError) {
throw new ElectrumServerRpcException("Failed to retrieve reference for path: " + path, e); throw new ElectrumServerRpcException("Failed to retrieve reference for path: " + path, e);
} }
@ -116,10 +121,10 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc {
for(String path : pathScriptHashes.keySet()) { for(String path : pathScriptHashes.keySet()) {
EventManager.get().post(new WalletHistoryStatusEvent(false, "Finding transactions for " + path)); EventManager.get().post(new WalletHistoryStatusEvent(false, "Finding transactions for " + path));
try { try {
String scriptHash = client.createRequest().returnAs(String.class).method("blockchain.scripthash.subscribe").id(path + "-" + idCounter.incrementAndGet()).params(pathScriptHashes.get(path)).executeNullable(); String scriptHash = new RetryLogic<String>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(() ->
client.createRequest().returnAs(String.class).method("blockchain.scripthash.subscribe").id(path + "-" + idCounter.incrementAndGet()).params(pathScriptHashes.get(path)).executeNullable());
result.put(path, scriptHash); result.put(path, scriptHash);
Thread.sleep(PER_REQUEST_DELAY_MILLIS); } catch(Exception e) {
} catch(JsonRpcException | IllegalStateException | IllegalArgumentException | InterruptedException e) {
//Even if we have some successes, failure to subscribe for all script hashes will result in outdated wallet view. Don't proceed. //Even if we have some successes, failure to subscribe for all script hashes will result in outdated wallet view. Don't proceed.
throw new ElectrumServerRpcException("Failed to retrieve reference for path: " + path, e); throw new ElectrumServerRpcException("Failed to retrieve reference for path: " + path, e);
} }
@ -136,13 +141,13 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc {
for(Integer blockHeight : blockHeights) { for(Integer blockHeight : blockHeights) {
EventManager.get().post(new WalletHistoryStatusEvent(false, "Retrieving block at height " + blockHeight)); EventManager.get().post(new WalletHistoryStatusEvent(false, "Retrieving block at height " + blockHeight));
try { try {
String blockHeader = client.createRequest().returnAs(String.class).method("blockchain.block.header").id(idCounter.incrementAndGet()).params(blockHeight).execute(); String blockHeader = new RetryLogic<String>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(() ->
client.createRequest().returnAs(String.class).method("blockchain.block.header").id(idCounter.incrementAndGet()).params(blockHeight).execute());
result.put(blockHeight, blockHeader); result.put(blockHeight, blockHeader);
Thread.sleep(PER_REQUEST_DELAY_MILLIS);
} catch(IllegalStateException | IllegalArgumentException | InterruptedException e) {
log.warn("Failed to retrieve block header for block height: " + blockHeight + " (" + e.getMessage() + ")");
} catch(JsonRpcException e) { } catch(JsonRpcException e) {
log.warn("Failed to retrieve block header for block height: " + blockHeight + " (" + e.getErrorMessage() + ")"); log.warn("Failed to retrieve block header for block height: " + blockHeight + " (" + e.getErrorMessage() + ")");
} catch(Exception e) {
log.warn("Failed to retrieve block header for block height: " + blockHeight + " (" + e.getMessage() + ")");
} }
} }
@ -157,10 +162,10 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc {
for(String txid : txids) { for(String txid : txids) {
EventManager.get().post(new WalletHistoryStatusEvent(false, "Retrieving transaction [" + txid.substring(0, 6) + "]")); EventManager.get().post(new WalletHistoryStatusEvent(false, "Retrieving transaction [" + txid.substring(0, 6) + "]"));
try { try {
String rawTxHex = client.createRequest().returnAs(String.class).method("blockchain.transaction.get").id(idCounter.incrementAndGet()).params(txid).execute(); String rawTxHex = new RetryLogic<String>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(() ->
client.createRequest().returnAs(String.class).method("blockchain.transaction.get").id(idCounter.incrementAndGet()).params(txid).execute());
result.put(txid, rawTxHex); result.put(txid, rawTxHex);
Thread.sleep(PER_REQUEST_DELAY_MILLIS); } catch(Exception e) {
} catch(JsonRpcException | IllegalStateException | IllegalArgumentException | InterruptedException e) {
result.put(txid, Sha256Hash.ZERO_HASH.toString()); result.put(txid, Sha256Hash.ZERO_HASH.toString());
} }
} }
@ -175,9 +180,9 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc {
Map<String, VerboseTransaction> result = new LinkedHashMap<>(); Map<String, VerboseTransaction> result = new LinkedHashMap<>();
for(String txid : txids) { for(String txid : txids) {
try { try {
VerboseTransaction verboseTransaction = client.createRequest().returnAs(VerboseTransaction.class).method("blockchain.transaction.get").id(idCounter.incrementAndGet()).params(txid, true).execute(); VerboseTransaction verboseTransaction = new RetryLogic<VerboseTransaction>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() ->
client.createRequest().returnAs(VerboseTransaction.class).method("blockchain.transaction.get").id(idCounter.incrementAndGet()).params(txid, true).execute());
result.put(txid, verboseTransaction); result.put(txid, verboseTransaction);
Thread.sleep(PER_REQUEST_DELAY_MILLIS);
} catch(Exception e) { } catch(Exception e) {
//electrs-esplora does not currently support the verbose parameter, so try to fetch an incomplete VerboseTransaction without it //electrs-esplora does not currently support the verbose parameter, so try to fetch an incomplete VerboseTransaction without it
//Note that without the script hash associated with the transaction, we can't get a block height as there is no way in the Electrum RPC protocol to do this //Note that without the script hash associated with the transaction, we can't get a block height as there is no way in the Electrum RPC protocol to do this
@ -223,14 +228,14 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc {
for(Integer targetBlock : targetBlocks) { for(Integer targetBlock : targetBlocks) {
if(targetBlock <= MAX_TARGET_BLOCKS) { if(targetBlock <= MAX_TARGET_BLOCKS) {
try { try {
Double targetBlocksFeeRateBtcKb = client.createRequest().returnAs(Double.class).method("blockchain.estimatefee").id(idCounter.incrementAndGet()).params(targetBlock).execute(); Double targetBlocksFeeRateBtcKb = new RetryLogic<Double>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() ->
client.createRequest().returnAs(Double.class).method("blockchain.estimatefee").id(idCounter.incrementAndGet()).params(targetBlock).execute());
result.put(targetBlock, targetBlocksFeeRateBtcKb); result.put(targetBlock, targetBlocksFeeRateBtcKb);
Thread.sleep(PER_REQUEST_DELAY_MILLIS);
} catch(IllegalStateException | IllegalArgumentException | InterruptedException e) {
log.warn("Failed to retrieve fee rate for target blocks: " + targetBlock + " (" + e.getMessage() + ")");
result.put(targetBlock, result.values().stream().mapToDouble(v -> v).min().orElse(0.0001d));
} catch(JsonRpcException e) { } catch(JsonRpcException e) {
throw new ElectrumServerRpcException("Failed to retrieve fee rate for target blocks: " + targetBlock, e); throw new ElectrumServerRpcException("Failed to retrieve fee rate for target blocks: " + targetBlock, e);
} catch(Exception e) {
log.warn("Failed to retrieve fee rate for target blocks: " + targetBlock + " (" + e.getMessage() + ")");
result.put(targetBlock, result.values().stream().mapToDouble(v -> v).min().orElse(0.0001d));
} }
} else { } else {
result.put(targetBlock, result.values().stream().mapToDouble(v -> v).min().orElse(0.0001d)); result.put(targetBlock, result.values().stream().mapToDouble(v -> v).min().orElse(0.0001d));
@ -244,8 +249,9 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc {
public Double getMinimumRelayFee(Transport transport) { public Double getMinimumRelayFee(Transport transport) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
return client.createRequest().returnAs(Double.class).method("blockchain.relayfee").id(idCounter.incrementAndGet()).execute(); return new RetryLogic<Double>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() ->
} catch(JsonRpcException e) { client.createRequest().returnAs(Double.class).method("blockchain.relayfee").id(idCounter.incrementAndGet()).execute());
} catch(Exception e) {
throw new ElectrumServerRpcException("Error getting minimum relay fee", e); throw new ElectrumServerRpcException("Error getting minimum relay fee", e);
} }
} }
@ -254,11 +260,12 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc {
public String broadcastTransaction(Transport transport, String txHex) { public String broadcastTransaction(Transport transport, String txHex) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
return client.createRequest().returnAs(String.class).method("blockchain.transaction.broadcast").id(idCounter.incrementAndGet()).params(txHex).execute(); return new RetryLogic<String>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() ->
} catch(IllegalStateException | IllegalArgumentException e) { client.createRequest().returnAs(String.class).method("blockchain.transaction.broadcast").id(idCounter.incrementAndGet()).params(txHex).execute());
throw new ElectrumServerRpcException(e.getMessage(), e);
} catch(JsonRpcException e) { } catch(JsonRpcException e) {
throw new ElectrumServerRpcException(e.getErrorMessage().getMessage(), e); throw new ElectrumServerRpcException(e.getErrorMessage().getMessage(), e);
} catch(Exception e) {
throw new ElectrumServerRpcException(e.getMessage(), e);
} }
} }
} }

View file

@ -19,6 +19,7 @@ public class TcpTransport implements Transport, Closeable {
private static final Logger log = LoggerFactory.getLogger(TcpTransport.class); private static final Logger log = LoggerFactory.getLogger(TcpTransport.class);
public static final int DEFAULT_PORT = 50001; public static final int DEFAULT_PORT = 50001;
private static final int READ_TIMEOUT_SECS = 3;
protected final HostAndPort server; protected final HostAndPort server;
protected final SocketFactory socketFactory; protected final SocketFactory socketFactory;
@ -64,7 +65,7 @@ public class TcpTransport implements Transport, Closeable {
private String readResponse() throws IOException { private String readResponse() throws IOException {
try { try {
if(!readLock.tryLock(60, TimeUnit.SECONDS)) { if(!readLock.tryLock(READ_TIMEOUT_SECS, TimeUnit.SECONDS)) {
throw new IOException("No response from server"); throw new IOException("No response from server");
} }
} catch(InterruptedException e) { } catch(InterruptedException e) {