From b013b5f50fbc7c12d7397b555b203bd33bb6357c Mon Sep 17 00:00:00 2001 From: Craig Raw Date: Thu, 9 Dec 2021 09:29:50 +0200 Subject: [PATCH] add paging for batched server requests with configurable page size defaulting to 500 ids --- .../com/sparrowwallet/sparrow/io/Config.java | 7 +- .../sparrow/net/BatchedElectrumServerRpc.java | 29 ++-- .../sparrow/net/PagedBatchRequestBuilder.java | 145 ++++++++++++++++++ 3 files changed, 161 insertions(+), 20 deletions(-) create mode 100644 src/main/java/com/sparrowwallet/sparrow/net/PagedBatchRequestBuilder.java diff --git a/src/main/java/com/sparrowwallet/sparrow/io/Config.java b/src/main/java/com/sparrowwallet/sparrow/io/Config.java index 3112e677..b6f22ca6 100644 --- a/src/main/java/com/sparrowwallet/sparrow/io/Config.java +++ b/src/main/java/com/sparrowwallet/sparrow/io/Config.java @@ -15,6 +15,7 @@ import java.lang.reflect.Type; import java.util.*; import java.util.stream.Collectors; +import static com.sparrowwallet.sparrow.net.PagedBatchRequestBuilder.DEFAULT_PAGE_SIZE; import static com.sparrowwallet.sparrow.net.TcpTransport.DEFAULT_MAX_TIMEOUT; public class Config { @@ -59,6 +60,7 @@ public class Config { private boolean useProxy; private String proxyServer; private int maxServerTimeout = DEFAULT_MAX_TIMEOUT; + private int batchPageSize = DEFAULT_PAGE_SIZE; private boolean usePayNym; private boolean sameAppMixing; private Double appWidth; @@ -501,9 +503,8 @@ public class Config { return maxServerTimeout; } - public void setMaxServerTimeout(int maxServerTimeout) { - this.maxServerTimeout = maxServerTimeout; - flush(); + public int getBatchPageSize() { + return batchPageSize; } public boolean isUsePayNym() { diff --git a/src/main/java/com/sparrowwallet/sparrow/net/BatchedElectrumServerRpc.java b/src/main/java/com/sparrowwallet/sparrow/net/BatchedElectrumServerRpc.java index 5029eab6..564931e2 100644 --- a/src/main/java/com/sparrowwallet/sparrow/net/BatchedElectrumServerRpc.java +++ b/src/main/java/com/sparrowwallet/sparrow/net/BatchedElectrumServerRpc.java @@ -21,8 +21,8 @@ import static com.sparrowwallet.drongo.wallet.WalletNode.nodeRangesToString; public class BatchedElectrumServerRpc implements ElectrumServerRpc { private static final Logger log = LoggerFactory.getLogger(BatchedElectrumServerRpc.class); - private static final int MAX_RETRIES = 5; - private static final int RETRY_DELAY = 1; + static final int MAX_RETRIES = 5; + static final int RETRY_DELAY = 1; private final AtomicLong idCounter = new AtomicLong(); @@ -73,8 +73,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { @Override @SuppressWarnings("unchecked") public Map getScriptHashHistory(Transport transport, Wallet wallet, Map pathScriptHashes, boolean failOnError) { - JsonRpcClient client = new JsonRpcClient(transport); - BatchRequestBuilder batchRequest = client.createBatchRequest().keysType(String.class).returnType(ScriptHashTx[].class); + PagedBatchRequestBuilder batchRequest = PagedBatchRequestBuilder.create(transport).keysType(String.class).returnType(ScriptHashTx[].class); EventManager.get().post(new WalletHistoryStatusEvent(wallet, true, "Loading transactions for " + nodeRangesToString(pathScriptHashes.keySet()))); for(String path : pathScriptHashes.keySet()) { @@ -82,7 +81,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { } try { - return new RetryLogic>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(batchRequest::execute); + return batchRequest.execute(); } catch (JsonRpcBatchException e) { if(failOnError) { throw new ElectrumServerRpcException("Failed to retrieve transaction history for paths: " + nodeRangesToString((Collection)e.getErrors().keySet()), e); @@ -102,15 +101,14 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { @Override @SuppressWarnings("unchecked") public Map getScriptHashMempool(Transport transport, Wallet wallet, Map pathScriptHashes, boolean failOnError) { - JsonRpcClient client = new JsonRpcClient(transport); - BatchRequestBuilder batchRequest = client.createBatchRequest().keysType(String.class).returnType(ScriptHashTx[].class); + PagedBatchRequestBuilder batchRequest = PagedBatchRequestBuilder.create(transport).keysType(String.class).returnType(ScriptHashTx[].class); for(String path : pathScriptHashes.keySet()) { batchRequest.add(path, "blockchain.scripthash.get_mempool", pathScriptHashes.get(path)); } try { - return new RetryLogic>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(batchRequest::execute); + return batchRequest.execute(); } catch(JsonRpcBatchException e) { if(failOnError) { throw new ElectrumServerRpcException("Failed to retrieve mempool transactions for paths: " + nodeRangesToString((Collection)e.getErrors().keySet()), e); @@ -130,8 +128,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { @Override @SuppressWarnings("unchecked") public Map subscribeScriptHashes(Transport transport, Wallet wallet, Map pathScriptHashes) { - JsonRpcClient client = new JsonRpcClient(transport); - BatchRequestBuilder batchRequest = client.createBatchRequest().keysType(String.class).returnType(String.class); + PagedBatchRequestBuilder batchRequest = PagedBatchRequestBuilder.create(transport).keysType(String.class).returnType(String.class); EventManager.get().post(new WalletHistoryStatusEvent(wallet, true, "Finding transactions for " + nodeRangesToString(pathScriptHashes.keySet()))); for(String path : pathScriptHashes.keySet()) { @@ -139,7 +136,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { } try { - return new RetryLogic>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(batchRequest::execute); + return batchRequest.execute(); } catch(JsonRpcBatchException e) { //Even if we have some successes, failure to subscribe for all script hashes will result in outdated wallet view. Don't proceed. throw new ElectrumServerRpcException("Failed to subscribe to paths: " + nodeRangesToString((Collection)e.getErrors().keySet()), e); @@ -151,8 +148,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { @Override @SuppressWarnings("unchecked") public Map getBlockHeaders(Transport transport, Wallet wallet, Set blockHeights) { - JsonRpcClient client = new JsonRpcClient(transport); - BatchRequestBuilder batchRequest = client.createBatchRequest().keysType(Integer.class).returnType(String.class); + PagedBatchRequestBuilder batchRequest = PagedBatchRequestBuilder.create(transport).keysType(Integer.class).returnType(String.class); EventManager.get().post(new WalletHistoryStatusEvent(wallet, true, "Retrieving " + blockHeights.size() + " block headers")); for(Integer height : blockHeights) { @@ -160,7 +156,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { } try { - return new RetryLogic>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); + return batchRequest.execute(); } catch(JsonRpcBatchException e) { return (Map)e.getSuccesses(); } catch(Exception e) { @@ -171,8 +167,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { @Override @SuppressWarnings("unchecked") public Map getTransactions(Transport transport, Wallet wallet, Set txids) { - JsonRpcClient client = new JsonRpcClient(transport); - BatchRequestBuilder batchRequest = client.createBatchRequest().keysType(String.class).returnType(String.class); + PagedBatchRequestBuilder batchRequest = PagedBatchRequestBuilder.create(transport).keysType(String.class).returnType(String.class); EventManager.get().post(new WalletHistoryStatusEvent(wallet, true, "Retrieving " + txids.size() + " transactions")); for(String txid : txids) { @@ -180,7 +175,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc { } try { - return new RetryLogic>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); + return batchRequest.execute(); } catch(JsonRpcBatchException e) { Map result = (Map)e.getSuccesses(); diff --git a/src/main/java/com/sparrowwallet/sparrow/net/PagedBatchRequestBuilder.java b/src/main/java/com/sparrowwallet/sparrow/net/PagedBatchRequestBuilder.java new file mode 100644 index 00000000..fc8739b1 --- /dev/null +++ b/src/main/java/com/sparrowwallet/sparrow/net/PagedBatchRequestBuilder.java @@ -0,0 +1,145 @@ +package com.sparrowwallet.sparrow.net; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.arteam.simplejsonrpc.client.JsonRpcClient; +import com.github.arteam.simplejsonrpc.client.Transport; +import com.github.arteam.simplejsonrpc.client.builder.AbstractBuilder; +import com.github.arteam.simplejsonrpc.client.builder.BatchRequestBuilder; +import com.google.common.collect.Lists; +import com.sparrowwallet.sparrow.io.Config; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.*; + +import static com.sparrowwallet.sparrow.net.BatchedElectrumServerRpc.MAX_RETRIES; +import static com.sparrowwallet.sparrow.net.BatchedElectrumServerRpc.RETRY_DELAY; + +public class PagedBatchRequestBuilder extends AbstractBuilder { + public static final int DEFAULT_PAGE_SIZE = 500; + + @NotNull + private final List requests; + + /** + * Type of request ids + */ + @Nullable + private final Class keysType; + + /** + * Expected return type for all requests + *

+ * This property works exclusively with {@code returnTypes}. Only one of them should be set. + */ + @Nullable + private final Class returnType; + + /** + * Creates a new batch request builder in an initial state + * + * @param transport transport for request performing + * @param mapper mapper for JSON processing + */ + public PagedBatchRequestBuilder(@NotNull Transport transport, @NotNull ObjectMapper mapper) { + this(transport, mapper, new ArrayList(), null, null); + } + + public PagedBatchRequestBuilder(@NotNull Transport transport, @NotNull ObjectMapper mapper, + @NotNull List requests, + @Nullable Class keysType, @Nullable Class returnType) { + super(transport, mapper); + this.requests = requests; + this.keysType = keysType; + this.returnType = returnType; + } + + /** + * Adds a new request without specifying a return type + * + * @param id request id as a text value + * @param method request method + * @param param request param + * @return the current builder + */ + @NotNull + public PagedBatchRequestBuilder add(Object id, @NotNull String method, @NotNull Object param) { + requests.add(new Request(id, method, param)); + return this; + } + + /** + * Sets type of request keys. + * The purpose of this method is providing static and runtime type safety of processing of batch responses + * + * @param keysClass type of keys + * @param type of keys + * @return a new builder + */ + public PagedBatchRequestBuilder keysType(@NotNull Class keysClass) { + return new PagedBatchRequestBuilder(transport, mapper, requests, keysClass, returnType); + } + + /** + * Sets an expected response type of requests. + * This method is preferred when requests have the same response type. + * + * @param valuesClass expected requests return type + * @param expected requests return type + * @return a new builder + */ + public PagedBatchRequestBuilder returnType(@NotNull Class valuesClass) { + return new PagedBatchRequestBuilder(transport, mapper, requests, keysType, valuesClass); + } + + /** + * Validates, executes the request and process response + * + * @return map of responses by request ids + */ + @NotNull + public Map execute() throws Exception { + Map allResults = new HashMap<>(); + JsonRpcClient client = new JsonRpcClient(transport); + + List> pages = Lists.partition(requests, getPageSize()); + for(List page : pages) { + BatchRequestBuilder batchRequest = client.createBatchRequest().keysType(keysType).returnType(returnType); + for(Request request : page) { + if(request.id instanceof String strReq) { + batchRequest.add(strReq, request.method, request.param); + } else if(request.id instanceof Integer intReq) { + batchRequest.add(intReq, request.method, request.param); + } else { + throw new IllegalArgumentException("Id of class " + request.id.getClass().getName() + " not supported"); + } + } + + Map pageResult = new RetryLogic>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(batchRequest::execute); + allResults.putAll(pageResult); + } + + return allResults; + } + + private int getPageSize() { + int pageSize = Config.get().getBatchPageSize(); + if(pageSize < 1) { + pageSize = DEFAULT_PAGE_SIZE; + } + + return pageSize; + } + + /** + * Creates a builder of a JSON-RPC batch request in initial state + * + * @return batch request builder + */ + @NotNull + public static PagedBatchRequestBuilder create(Transport transport) { + return new PagedBatchRequestBuilder(transport, new ObjectMapper()); + } + + private static record Request(Object id, String method, Object param) {} +}