detect fulcrum batching version, ensure monotonically increasing ids are used for all requests in a session

This commit is contained in:
Craig Raw 2022-01-25 11:50:06 +01:00
parent 9faf036e4d
commit 7d459a9115
5 changed files with 70 additions and 30 deletions

View file

@ -2,7 +2,6 @@ package com.sparrowwallet.sparrow.net;
import com.github.arteam.simplejsonrpc.client.JsonRpcClient; import com.github.arteam.simplejsonrpc.client.JsonRpcClient;
import com.github.arteam.simplejsonrpc.client.Transport; import com.github.arteam.simplejsonrpc.client.Transport;
import com.github.arteam.simplejsonrpc.client.builder.BatchRequestBuilder;
import com.github.arteam.simplejsonrpc.client.exception.JsonRpcBatchException; import com.github.arteam.simplejsonrpc.client.exception.JsonRpcBatchException;
import com.github.arteam.simplejsonrpc.client.exception.JsonRpcException; import com.github.arteam.simplejsonrpc.client.exception.JsonRpcException;
import com.sparrowwallet.drongo.protocol.Sha256Hash; import com.sparrowwallet.drongo.protocol.Sha256Hash;
@ -21,16 +20,24 @@ import static com.sparrowwallet.drongo.wallet.WalletNode.nodeRangesToString;
public class BatchedElectrumServerRpc implements ElectrumServerRpc { public class BatchedElectrumServerRpc implements ElectrumServerRpc {
private static final Logger log = LoggerFactory.getLogger(BatchedElectrumServerRpc.class); private static final Logger log = LoggerFactory.getLogger(BatchedElectrumServerRpc.class);
static final int MAX_RETRIES = 5; static final int DEFAULT_MAX_ATTEMPTS = 5;
static final int RETRY_DELAY = 1; static final int RETRY_DELAY_SECS = 1;
private final AtomicLong idCounter = new AtomicLong(); private final AtomicLong idCounter;
public BatchedElectrumServerRpc() {
this.idCounter = new AtomicLong();
}
public BatchedElectrumServerRpc(long idCounterValue) {
this.idCounter = new AtomicLong(idCounterValue);
}
@Override @Override
public void ping(Transport transport) { public void ping(Transport transport) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
new RetryLogic<>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> new RetryLogic<>(DEFAULT_MAX_ATTEMPTS, RETRY_DELAY_SECS, IllegalStateException.class).getResult(() ->
client.createRequest().method("server.ping").id(idCounter.incrementAndGet()).executeNullable()); client.createRequest().method("server.ping").id(idCounter.incrementAndGet()).executeNullable());
} catch(Exception e) { } catch(Exception e) {
throw new ElectrumServerRpcException("Error pinging server", e); throw new ElectrumServerRpcException("Error pinging server", e);
@ -41,7 +48,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
public List<String> getServerVersion(Transport transport, String clientName, String[] supportedVersions) { public List<String> getServerVersion(Transport transport, String clientName, String[] supportedVersions) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
return new RetryLogic<List<String>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> return new RetryLogic<List<String>>(DEFAULT_MAX_ATTEMPTS, RETRY_DELAY_SECS, IllegalStateException.class).getResult(() ->
client.createRequest().returnAsList(String.class).method("server.version").id(idCounter.incrementAndGet()).param("client_name", clientName).param("protocol_version", supportedVersions).execute()); client.createRequest().returnAsList(String.class).method("server.version").id(idCounter.incrementAndGet()).param("client_name", clientName).param("protocol_version", supportedVersions).execute());
} catch(Exception e) { } catch(Exception e) {
throw new ElectrumServerRpcException("Error getting server version", e); throw new ElectrumServerRpcException("Error getting server version", e);
@ -52,7 +59,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
public String getServerBanner(Transport transport) { public String getServerBanner(Transport transport) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
return new RetryLogic<String>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> return new RetryLogic<String>(DEFAULT_MAX_ATTEMPTS, RETRY_DELAY_SECS, IllegalStateException.class).getResult(() ->
client.createRequest().returnAs(String.class).method("server.banner").id(idCounter.incrementAndGet()).execute()); client.createRequest().returnAs(String.class).method("server.banner").id(idCounter.incrementAndGet()).execute());
} catch(Exception e) { } catch(Exception e) {
throw new ElectrumServerRpcException("Error getting server banner", e); throw new ElectrumServerRpcException("Error getting server banner", e);
@ -63,7 +70,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
public BlockHeaderTip subscribeBlockHeaders(Transport transport) { public BlockHeaderTip subscribeBlockHeaders(Transport transport) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
return new RetryLogic<BlockHeaderTip>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> return new RetryLogic<BlockHeaderTip>(DEFAULT_MAX_ATTEMPTS, RETRY_DELAY_SECS, IllegalStateException.class).getResult(() ->
client.createRequest().returnAs(BlockHeaderTip.class).method("blockchain.headers.subscribe").id(idCounter.incrementAndGet()).execute()); client.createRequest().returnAs(BlockHeaderTip.class).method("blockchain.headers.subscribe").id(idCounter.incrementAndGet()).execute());
} catch(Exception e) { } catch(Exception e) {
throw new ElectrumServerRpcException("Error subscribing to block headers", e); throw new ElectrumServerRpcException("Error subscribing to block headers", e);
@ -194,15 +201,14 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Map<String, VerboseTransaction> getVerboseTransactions(Transport transport, Set<String> txids, String scriptHash) { public Map<String, VerboseTransaction> getVerboseTransactions(Transport transport, Set<String> txids, String scriptHash) {
JsonRpcClient client = new JsonRpcClient(transport); PagedBatchRequestBuilder<String, VerboseTransaction> batchRequest = PagedBatchRequestBuilder.create(transport, idCounter).keysType(String.class).returnType(VerboseTransaction.class);
BatchRequestBuilder<String, VerboseTransaction> batchRequest = client.createBatchRequest().keysType(String.class).returnType(VerboseTransaction.class);
for(String txid : txids) { for(String txid : txids) {
batchRequest.add(txid, "blockchain.transaction.get", txid, true); batchRequest.add(txid, "blockchain.transaction.get", txid, true);
} }
try { try {
//The server may return an error if the transaction has not yet been broadcasted - this is a valid state so only try once //The server may return an error if the transaction has not yet been broadcasted - this is a valid state so only try once
return new RetryLogic<Map<String, VerboseTransaction>>(1, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); return batchRequest.execute(1);
} catch(JsonRpcBatchException e) { } catch(JsonRpcBatchException e) {
log.debug("Some errors retrieving transactions: " + e.getErrors()); log.debug("Some errors retrieving transactions: " + e.getErrors());
return (Map<String, VerboseTransaction>)e.getSuccesses(); return (Map<String, VerboseTransaction>)e.getSuccesses();
@ -213,14 +219,13 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
@Override @Override
public Map<Integer, Double> getFeeEstimates(Transport transport, List<Integer> targetBlocks) { public Map<Integer, Double> getFeeEstimates(Transport transport, List<Integer> targetBlocks) {
JsonRpcClient client = new JsonRpcClient(transport); PagedBatchRequestBuilder<Integer, Double> batchRequest = PagedBatchRequestBuilder.create(transport, idCounter).keysType(Integer.class).returnType(Double.class);
BatchRequestBuilder<Integer, Double> batchRequest = client.createBatchRequest().keysType(Integer.class).returnType(Double.class);
for(Integer targetBlock : targetBlocks) { for(Integer targetBlock : targetBlocks) {
batchRequest.add(targetBlock, "blockchain.estimatefee", targetBlock); batchRequest.add(targetBlock, "blockchain.estimatefee", targetBlock);
} }
try { try {
return new RetryLogic<Map<Integer, Double>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); return batchRequest.execute();
} catch(JsonRpcBatchException e) { } catch(JsonRpcBatchException e) {
throw new ElectrumServerRpcException("Error getting fee estimates", e); throw new ElectrumServerRpcException("Error getting fee estimates", e);
} catch(Exception e) { } catch(Exception e) {
@ -232,7 +237,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
public Map<Long, Long> getFeeRateHistogram(Transport transport) { public Map<Long, Long> getFeeRateHistogram(Transport transport) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
BigInteger[][] feesArray = new RetryLogic<BigInteger[][]>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> BigInteger[][] feesArray = new RetryLogic<BigInteger[][]>(DEFAULT_MAX_ATTEMPTS, RETRY_DELAY_SECS, IllegalStateException.class).getResult(() ->
client.createRequest().returnAs(BigInteger[][].class).method("mempool.get_fee_histogram").id(idCounter.incrementAndGet()).execute()); client.createRequest().returnAs(BigInteger[][].class).method("mempool.get_fee_histogram").id(idCounter.incrementAndGet()).execute());
Map<Long, Long> feeRateHistogram = new TreeMap<>(); Map<Long, Long> feeRateHistogram = new TreeMap<>();
@ -252,7 +257,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
public Double getMinimumRelayFee(Transport transport) { public Double getMinimumRelayFee(Transport transport) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
return new RetryLogic<Double>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> return new RetryLogic<Double>(DEFAULT_MAX_ATTEMPTS, RETRY_DELAY_SECS, IllegalStateException.class).getResult(() ->
client.createRequest().returnAs(Double.class).method("blockchain.relayfee").id(idCounter.incrementAndGet()).execute()); client.createRequest().returnAs(Double.class).method("blockchain.relayfee").id(idCounter.incrementAndGet()).execute());
} catch(Exception e) { } catch(Exception e) {
throw new ElectrumServerRpcException("Error getting minimum relay fee", e); throw new ElectrumServerRpcException("Error getting minimum relay fee", e);
@ -263,7 +268,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
public String broadcastTransaction(Transport transport, String txHex) { public String broadcastTransaction(Transport transport, String txHex) {
try { try {
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
return new RetryLogic<String>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(() -> return new RetryLogic<String>(DEFAULT_MAX_ATTEMPTS, RETRY_DELAY_SECS, IllegalStateException.class).getResult(() ->
client.createRequest().returnAs(String.class).method("blockchain.transaction.broadcast").id(idCounter.incrementAndGet()).params(txHex).execute()); client.createRequest().returnAs(String.class).method("blockchain.transaction.broadcast").id(idCounter.incrementAndGet()).params(txHex).execute());
} catch(JsonRpcException e) { } catch(JsonRpcException e) {
throw new ElectrumServerRpcException(e.getErrorMessage().getMessage(), e); throw new ElectrumServerRpcException(e.getErrorMessage().getMessage(), e);
@ -271,4 +276,9 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
throw new ElectrumServerRpcException("Error broadcasting transaction", e); throw new ElectrumServerRpcException("Error broadcasting transaction", e);
} }
} }
@Override
public long getIdCounterValue() {
return idCounter.get();
}
} }

View file

@ -38,6 +38,8 @@ public class ElectrumServer {
private static final Version ELECTRS_MIN_BATCHING_VERSION = new Version("0.9.0"); private static final Version ELECTRS_MIN_BATCHING_VERSION = new Version("0.9.0");
private static final Version FULCRUM_MIN_BATCHING_VERSION = new Version("1.6.0");
private static final int MINIMUM_BROADCASTS = 2; private static final int MINIMUM_BROADCASTS = 2;
public static final BlockTransaction UNFETCHABLE_BLOCK_TRANSACTION = new BlockTransaction(Sha256Hash.ZERO_HASH, 0, null, null, null); public static final BlockTransaction UNFETCHABLE_BLOCK_TRANSACTION = new BlockTransaction(Sha256Hash.ZERO_HASH, 0, null, null, null);
@ -968,6 +970,22 @@ public class ElectrumServer {
//ignore //ignore
} }
} }
if(server.startsWith("fulcrum")) {
String fulcrumVersion = server.substring("fulcrum".length()).trim();
int dashIndex = fulcrumVersion.indexOf('-');
if(dashIndex > -1) {
fulcrumVersion = fulcrumVersion.substring(0, dashIndex);
}
try {
Version version = new Version(fulcrumVersion);
if(version.compareTo(FULCRUM_MIN_BATCHING_VERSION) >= 0) {
return true;
}
} catch(Exception e) {
//ignore
}
}
} }
return false; return false;
@ -1083,7 +1101,7 @@ public class ElectrumServer {
//If electrumx is detected, we can upgrade to batched RPC. Electrs/EPS do not support batching. //If electrumx is detected, we can upgrade to batched RPC. Electrs/EPS do not support batching.
if(supportsBatching(serverVersion)) { if(supportsBatching(serverVersion)) {
log.debug("Upgrading to batched JSON-RPC"); log.debug("Upgrading to batched JSON-RPC");
electrumServerRpc = new BatchedElectrumServerRpc(); electrumServerRpc = new BatchedElectrumServerRpc(electrumServerRpc.getIdCounterValue());
} }
BlockHeaderTip tip; BlockHeaderTip tip;

View file

@ -35,4 +35,6 @@ public interface ElectrumServerRpc {
Double getMinimumRelayFee(Transport transport); Double getMinimumRelayFee(Transport transport);
String broadcastTransaction(Transport transport, String txHex); String broadcastTransaction(Transport transport, String txHex);
long getIdCounterValue();
} }

View file

@ -13,8 +13,8 @@ import org.jetbrains.annotations.Nullable;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static com.sparrowwallet.sparrow.net.BatchedElectrumServerRpc.MAX_RETRIES; import static com.sparrowwallet.sparrow.net.BatchedElectrumServerRpc.DEFAULT_MAX_ATTEMPTS;
import static com.sparrowwallet.sparrow.net.BatchedElectrumServerRpc.RETRY_DELAY; import static com.sparrowwallet.sparrow.net.BatchedElectrumServerRpc.RETRY_DELAY_SECS;
public class PagedBatchRequestBuilder<K, V> extends AbstractBuilder { public class PagedBatchRequestBuilder<K, V> extends AbstractBuilder {
public static final int DEFAULT_PAGE_SIZE = 500; public static final int DEFAULT_PAGE_SIZE = 500;
@ -64,12 +64,12 @@ public class PagedBatchRequestBuilder<K, V> extends AbstractBuilder {
* *
* @param id request id as a text value * @param id request id as a text value
* @param method request method * @param method request method
* @param param request param * @param params request params
* @return the current builder * @return the current builder
*/ */
@NotNull @NotNull
public PagedBatchRequestBuilder<K, V> add(K id, @NotNull String method, @NotNull Object param) { public PagedBatchRequestBuilder<K, V> add(K id, @NotNull String method, @NotNull Object... params) {
requests.add(new Request<K>(id, counter == null ? null : counter.incrementAndGet(), method, param)); requests.add(new Request<K>(id, counter == null ? null : counter.incrementAndGet(), method, params));
return this; return this;
} }
@ -97,13 +97,18 @@ public class PagedBatchRequestBuilder<K, V> extends AbstractBuilder {
return new PagedBatchRequestBuilder<K, NV>(transport, mapper, requests, keysType, valuesClass, counter); return new PagedBatchRequestBuilder<K, NV>(transport, mapper, requests, keysType, valuesClass, counter);
} }
public Map<K, V> execute() throws Exception {
return execute(DEFAULT_MAX_ATTEMPTS);
}
/** /**
* Validates, executes the request and process response * Validates, executes the request and process response
* *
* @param maxAttempts number of times to try the request
* @return map of responses by request ids * @return map of responses by request ids
*/ */
@NotNull @NotNull
public Map<K, V> execute() throws Exception { public Map<K, V> execute(int maxAttempts) throws Exception {
Map<K, V> allResults = new HashMap<>(); Map<K, V> allResults = new HashMap<>();
JsonRpcClient client = new JsonRpcClient(transport); JsonRpcClient client = new JsonRpcClient(transport);
@ -114,10 +119,10 @@ public class PagedBatchRequestBuilder<K, V> extends AbstractBuilder {
BatchRequestBuilder<Long, V> batchRequest = client.createBatchRequest().keysType(Long.class).returnType(returnType); BatchRequestBuilder<Long, V> batchRequest = client.createBatchRequest().keysType(Long.class).returnType(returnType);
for(Request<K> request : page) { for(Request<K> request : page) {
counterIdMap.put(request.counterId, request.id); counterIdMap.put(request.counterId, request.id);
batchRequest.add(request.counterId, request.method, request.param); batchRequest.add(request.counterId, request.method, request.params);
} }
Map<Long, V> pageResult = new RetryLogic<Map<Long, V>>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(batchRequest::execute); Map<Long, V> pageResult = new RetryLogic<Map<Long, V>>(maxAttempts, RETRY_DELAY_SECS, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(batchRequest::execute);
for(Map.Entry<Long, V> pageEntry : pageResult.entrySet()) { for(Map.Entry<Long, V> pageEntry : pageResult.entrySet()) {
allResults.put(counterIdMap.get(pageEntry.getKey()), pageEntry.getValue()); allResults.put(counterIdMap.get(pageEntry.getKey()), pageEntry.getValue());
} }
@ -125,15 +130,15 @@ public class PagedBatchRequestBuilder<K, V> extends AbstractBuilder {
BatchRequestBuilder<K, V> batchRequest = client.createBatchRequest().keysType(keysType).returnType(returnType); BatchRequestBuilder<K, V> batchRequest = client.createBatchRequest().keysType(keysType).returnType(returnType);
for(Request<K> request : page) { for(Request<K> request : page) {
if(request.id instanceof String strReq) { if(request.id instanceof String strReq) {
batchRequest.add(strReq, request.method, request.param); batchRequest.add(strReq, request.method, request.params);
} else if(request.id instanceof Integer intReq) { } else if(request.id instanceof Integer intReq) {
batchRequest.add(intReq, request.method, request.param); batchRequest.add(intReq, request.method, request.params);
} else { } else {
throw new IllegalArgumentException("Id of class " + request.id.getClass().getName() + " not supported"); throw new IllegalArgumentException("Id of class " + request.id.getClass().getName() + " not supported");
} }
} }
Map<K, V> pageResult = new RetryLogic<Map<K, V>>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(batchRequest::execute); Map<K, V> pageResult = new RetryLogic<Map<K, V>>(maxAttempts, RETRY_DELAY_SECS, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(batchRequest::execute);
allResults.putAll(pageResult); allResults.putAll(pageResult);
} }
} }
@ -170,5 +175,5 @@ public class PagedBatchRequestBuilder<K, V> extends AbstractBuilder {
return new PagedBatchRequestBuilder<Object, Object>(transport, new ObjectMapper(), counter); return new PagedBatchRequestBuilder<Object, Object>(transport, new ObjectMapper(), counter);
} }
private static record Request<K>(K id, Long counterId, String method, Object param) {} private static record Request<K>(K id, Long counterId, String method, Object[] params) {}
} }

View file

@ -297,4 +297,9 @@ public class SimpleElectrumServerRpc implements ElectrumServerRpc {
throw new ElectrumServerRpcException(e.getMessage(), e); throw new ElectrumServerRpcException(e.getMessage(), e);
} }
} }
@Override
public long getIdCounterValue() {
return idCounter.get();
}
} }