add paging for batched server requests with configurable page size defaulting to 500 ids

This commit is contained in:
Craig Raw 2021-12-09 09:29:50 +02:00
parent 14db333a6f
commit b013b5f50f
3 changed files with 161 additions and 20 deletions

View file

@ -15,6 +15,7 @@ import java.lang.reflect.Type;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.sparrowwallet.sparrow.net.PagedBatchRequestBuilder.DEFAULT_PAGE_SIZE;
import static com.sparrowwallet.sparrow.net.TcpTransport.DEFAULT_MAX_TIMEOUT; import static com.sparrowwallet.sparrow.net.TcpTransport.DEFAULT_MAX_TIMEOUT;
public class Config { public class Config {
@ -59,6 +60,7 @@ public class Config {
private boolean useProxy; private boolean useProxy;
private String proxyServer; private String proxyServer;
private int maxServerTimeout = DEFAULT_MAX_TIMEOUT; private int maxServerTimeout = DEFAULT_MAX_TIMEOUT;
private int batchPageSize = DEFAULT_PAGE_SIZE;
private boolean usePayNym; private boolean usePayNym;
private boolean sameAppMixing; private boolean sameAppMixing;
private Double appWidth; private Double appWidth;
@ -501,9 +503,8 @@ public class Config {
return maxServerTimeout; return maxServerTimeout;
} }
public void setMaxServerTimeout(int maxServerTimeout) { public int getBatchPageSize() {
this.maxServerTimeout = maxServerTimeout; return batchPageSize;
flush();
} }
public boolean isUsePayNym() { public boolean isUsePayNym() {

View file

@ -21,8 +21,8 @@ import static com.sparrowwallet.drongo.wallet.WalletNode.nodeRangesToString;
public class BatchedElectrumServerRpc implements ElectrumServerRpc { public class BatchedElectrumServerRpc implements ElectrumServerRpc {
private static final Logger log = LoggerFactory.getLogger(BatchedElectrumServerRpc.class); private static final Logger log = LoggerFactory.getLogger(BatchedElectrumServerRpc.class);
private static final int MAX_RETRIES = 5; static final int MAX_RETRIES = 5;
private static final int RETRY_DELAY = 1; static final int RETRY_DELAY = 1;
private final AtomicLong idCounter = new AtomicLong(); private final AtomicLong idCounter = new AtomicLong();
@ -73,8 +73,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Map<String, ScriptHashTx[]> getScriptHashHistory(Transport transport, Wallet wallet, Map<String, String> pathScriptHashes, boolean failOnError) { public Map<String, ScriptHashTx[]> getScriptHashHistory(Transport transport, Wallet wallet, Map<String, String> pathScriptHashes, boolean failOnError) {
JsonRpcClient client = new JsonRpcClient(transport); PagedBatchRequestBuilder<String, ScriptHashTx[]> batchRequest = PagedBatchRequestBuilder.create(transport).keysType(String.class).returnType(ScriptHashTx[].class);
BatchRequestBuilder<String, ScriptHashTx[]> batchRequest = client.createBatchRequest().keysType(String.class).returnType(ScriptHashTx[].class);
EventManager.get().post(new WalletHistoryStatusEvent(wallet, true, "Loading transactions for " + nodeRangesToString(pathScriptHashes.keySet()))); EventManager.get().post(new WalletHistoryStatusEvent(wallet, true, "Loading transactions for " + nodeRangesToString(pathScriptHashes.keySet())));
for(String path : pathScriptHashes.keySet()) { for(String path : pathScriptHashes.keySet()) {
@ -82,7 +81,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
try { try {
return new RetryLogic<Map<String, ScriptHashTx[]>>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(batchRequest::execute); return batchRequest.execute();
} catch (JsonRpcBatchException e) { } catch (JsonRpcBatchException e) {
if(failOnError) { if(failOnError) {
throw new ElectrumServerRpcException("Failed to retrieve transaction history for paths: " + nodeRangesToString((Collection<String>)e.getErrors().keySet()), e); throw new ElectrumServerRpcException("Failed to retrieve transaction history for paths: " + nodeRangesToString((Collection<String>)e.getErrors().keySet()), e);
@ -102,15 +101,14 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Map<String, ScriptHashTx[]> getScriptHashMempool(Transport transport, Wallet wallet, Map<String, String> pathScriptHashes, boolean failOnError) { public Map<String, ScriptHashTx[]> getScriptHashMempool(Transport transport, Wallet wallet, Map<String, String> pathScriptHashes, boolean failOnError) {
JsonRpcClient client = new JsonRpcClient(transport); PagedBatchRequestBuilder<String, ScriptHashTx[]> batchRequest = PagedBatchRequestBuilder.create(transport).keysType(String.class).returnType(ScriptHashTx[].class);
BatchRequestBuilder<String, ScriptHashTx[]> batchRequest = client.createBatchRequest().keysType(String.class).returnType(ScriptHashTx[].class);
for(String path : pathScriptHashes.keySet()) { for(String path : pathScriptHashes.keySet()) {
batchRequest.add(path, "blockchain.scripthash.get_mempool", pathScriptHashes.get(path)); batchRequest.add(path, "blockchain.scripthash.get_mempool", pathScriptHashes.get(path));
} }
try { try {
return new RetryLogic<Map<String, ScriptHashTx[]>>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(batchRequest::execute); return batchRequest.execute();
} catch(JsonRpcBatchException e) { } catch(JsonRpcBatchException e) {
if(failOnError) { if(failOnError) {
throw new ElectrumServerRpcException("Failed to retrieve mempool transactions for paths: " + nodeRangesToString((Collection<String>)e.getErrors().keySet()), e); throw new ElectrumServerRpcException("Failed to retrieve mempool transactions for paths: " + nodeRangesToString((Collection<String>)e.getErrors().keySet()), e);
@ -130,8 +128,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Map<String, String> subscribeScriptHashes(Transport transport, Wallet wallet, Map<String, String> pathScriptHashes) { public Map<String, String> subscribeScriptHashes(Transport transport, Wallet wallet, Map<String, String> pathScriptHashes) {
JsonRpcClient client = new JsonRpcClient(transport); PagedBatchRequestBuilder<String, String> batchRequest = PagedBatchRequestBuilder.create(transport).keysType(String.class).returnType(String.class);
BatchRequestBuilder<String, String> batchRequest = client.createBatchRequest().keysType(String.class).returnType(String.class);
EventManager.get().post(new WalletHistoryStatusEvent(wallet, true, "Finding transactions for " + nodeRangesToString(pathScriptHashes.keySet()))); EventManager.get().post(new WalletHistoryStatusEvent(wallet, true, "Finding transactions for " + nodeRangesToString(pathScriptHashes.keySet())));
for(String path : pathScriptHashes.keySet()) { for(String path : pathScriptHashes.keySet()) {
@ -139,7 +136,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
try { try {
return new RetryLogic<Map<String, String>>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(batchRequest::execute); return batchRequest.execute();
} catch(JsonRpcBatchException e) { } catch(JsonRpcBatchException e) {
//Even if we have some successes, failure to subscribe for all script hashes will result in outdated wallet view. Don't proceed. //Even if we have some successes, failure to subscribe for all script hashes will result in outdated wallet view. Don't proceed.
throw new ElectrumServerRpcException("Failed to subscribe to paths: " + nodeRangesToString((Collection<String>)e.getErrors().keySet()), e); throw new ElectrumServerRpcException("Failed to subscribe to paths: " + nodeRangesToString((Collection<String>)e.getErrors().keySet()), e);
@ -151,8 +148,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Map<Integer, String> getBlockHeaders(Transport transport, Wallet wallet, Set<Integer> blockHeights) { public Map<Integer, String> getBlockHeaders(Transport transport, Wallet wallet, Set<Integer> blockHeights) {
JsonRpcClient client = new JsonRpcClient(transport); PagedBatchRequestBuilder<Integer, String> batchRequest = PagedBatchRequestBuilder.create(transport).keysType(Integer.class).returnType(String.class);
BatchRequestBuilder<Integer, String> batchRequest = client.createBatchRequest().keysType(Integer.class).returnType(String.class);
EventManager.get().post(new WalletHistoryStatusEvent(wallet, true, "Retrieving " + blockHeights.size() + " block headers")); EventManager.get().post(new WalletHistoryStatusEvent(wallet, true, "Retrieving " + blockHeights.size() + " block headers"));
for(Integer height : blockHeights) { for(Integer height : blockHeights) {
@ -160,7 +156,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
try { try {
return new RetryLogic<Map<Integer, String>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); return batchRequest.execute();
} catch(JsonRpcBatchException e) { } catch(JsonRpcBatchException e) {
return (Map<Integer, String>)e.getSuccesses(); return (Map<Integer, String>)e.getSuccesses();
} catch(Exception e) { } catch(Exception e) {
@ -171,8 +167,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Map<String, String> getTransactions(Transport transport, Wallet wallet, Set<String> txids) { public Map<String, String> getTransactions(Transport transport, Wallet wallet, Set<String> txids) {
JsonRpcClient client = new JsonRpcClient(transport); PagedBatchRequestBuilder<String, String> batchRequest = PagedBatchRequestBuilder.create(transport).keysType(String.class).returnType(String.class);
BatchRequestBuilder<String, String> batchRequest = client.createBatchRequest().keysType(String.class).returnType(String.class);
EventManager.get().post(new WalletHistoryStatusEvent(wallet, true, "Retrieving " + txids.size() + " transactions")); EventManager.get().post(new WalletHistoryStatusEvent(wallet, true, "Retrieving " + txids.size() + " transactions"));
for(String txid : txids) { for(String txid : txids) {
@ -180,7 +175,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
try { try {
return new RetryLogic<Map<String, String>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); return batchRequest.execute();
} catch(JsonRpcBatchException e) { } catch(JsonRpcBatchException e) {
Map<String, String> result = (Map<String, String>)e.getSuccesses(); Map<String, String> result = (Map<String, String>)e.getSuccesses();

View file

@ -0,0 +1,145 @@
package com.sparrowwallet.sparrow.net;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.arteam.simplejsonrpc.client.JsonRpcClient;
import com.github.arteam.simplejsonrpc.client.Transport;
import com.github.arteam.simplejsonrpc.client.builder.AbstractBuilder;
import com.github.arteam.simplejsonrpc.client.builder.BatchRequestBuilder;
import com.google.common.collect.Lists;
import com.sparrowwallet.sparrow.io.Config;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.*;
import static com.sparrowwallet.sparrow.net.BatchedElectrumServerRpc.MAX_RETRIES;
import static com.sparrowwallet.sparrow.net.BatchedElectrumServerRpc.RETRY_DELAY;
public class PagedBatchRequestBuilder<K, V> extends AbstractBuilder {
public static final int DEFAULT_PAGE_SIZE = 500;
@NotNull
private final List<Request> requests;
/**
* Type of request ids
*/
@Nullable
private final Class<K> keysType;
/**
* Expected return type for all requests
* <p/>
* This property works exclusively with {@code returnTypes}. Only one of them should be set.
*/
@Nullable
private final Class<V> returnType;
/**
* Creates a new batch request builder in an initial state
*
* @param transport transport for request performing
* @param mapper mapper for JSON processing
*/
public PagedBatchRequestBuilder(@NotNull Transport transport, @NotNull ObjectMapper mapper) {
this(transport, mapper, new ArrayList<Request>(), null, null);
}
public PagedBatchRequestBuilder(@NotNull Transport transport, @NotNull ObjectMapper mapper,
@NotNull List<Request> requests,
@Nullable Class<K> keysType, @Nullable Class<V> returnType) {
super(transport, mapper);
this.requests = requests;
this.keysType = keysType;
this.returnType = returnType;
}
/**
* Adds a new request without specifying a return type
*
* @param id request id as a text value
* @param method request method
* @param param request param
* @return the current builder
*/
@NotNull
public PagedBatchRequestBuilder<K, V> add(Object id, @NotNull String method, @NotNull Object param) {
requests.add(new Request(id, method, param));
return this;
}
/**
* Sets type of request keys.
* The purpose of this method is providing static and runtime type safety of processing of batch responses
*
* @param keysClass type of keys
* @param <NK> type of keys
* @return a new builder
*/
public <NK> PagedBatchRequestBuilder<NK, V> keysType(@NotNull Class<NK> keysClass) {
return new PagedBatchRequestBuilder<NK, V>(transport, mapper, requests, keysClass, returnType);
}
/**
* Sets an expected response type of requests.
* This method is preferred when requests have the same response type.
*
* @param valuesClass expected requests return type
* @param <NV> expected requests return type
* @return a new builder
*/
public <NV> PagedBatchRequestBuilder<K, NV> returnType(@NotNull Class<NV> valuesClass) {
return new PagedBatchRequestBuilder<K, NV>(transport, mapper, requests, keysType, valuesClass);
}
/**
* Validates, executes the request and process response
*
* @return map of responses by request ids
*/
@NotNull
public Map<K, V> execute() throws Exception {
Map<K, V> allResults = new HashMap<>();
JsonRpcClient client = new JsonRpcClient(transport);
List<List<Request>> pages = Lists.partition(requests, getPageSize());
for(List<Request> page : pages) {
BatchRequestBuilder<K, V> batchRequest = client.createBatchRequest().keysType(keysType).returnType(returnType);
for(Request request : page) {
if(request.id instanceof String strReq) {
batchRequest.add(strReq, request.method, request.param);
} else if(request.id instanceof Integer intReq) {
batchRequest.add(intReq, request.method, request.param);
} else {
throw new IllegalArgumentException("Id of class " + request.id.getClass().getName() + " not supported");
}
}
Map<K, V> pageResult = new RetryLogic<Map<K, V>>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(batchRequest::execute);
allResults.putAll(pageResult);
}
return allResults;
}
private int getPageSize() {
int pageSize = Config.get().getBatchPageSize();
if(pageSize < 1) {
pageSize = DEFAULT_PAGE_SIZE;
}
return pageSize;
}
/**
* Creates a builder of a JSON-RPC batch request in initial state
*
* @return batch request builder
*/
@NotNull
public static PagedBatchRequestBuilder<?, ?> create(Transport transport) {
return new PagedBatchRequestBuilder<Object, Object>(transport, new ObjectMapper());
}
private static record Request(Object id, String method, Object param) {}
}