support electrs batching and improve batching read timeout handling

This commit is contained in:
Craig Raw 2021-05-24 13:39:06 +02:00
parent 42b279d22a
commit 9ebbf2557f
6 changed files with 95 additions and 65 deletions

View file

@ -21,8 +21,8 @@ import java.util.stream.Collectors;
public class BatchedElectrumServerRpc implements ElectrumServerRpc { public class BatchedElectrumServerRpc implements ElectrumServerRpc {
private static final Logger log = LoggerFactory.getLogger(BatchedElectrumServerRpc.class); private static final Logger log = LoggerFactory.getLogger(BatchedElectrumServerRpc.class);
private static final int MAX_RETRIES = 3; private static final int MAX_RETRIES = 5;
private static final int RETRY_DELAY = 0; private static final int RETRY_DELAY = 1;
private final AtomicLong idCounter = new AtomicLong(); private final AtomicLong idCounter = new AtomicLong();
@ -82,7 +82,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
try { try {
return new RetryLogic<Map<String, ScriptHashTx[]>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); return new RetryLogic<Map<String, ScriptHashTx[]>>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(batchRequest::execute);
} catch (JsonRpcBatchException e) { } catch (JsonRpcBatchException e) {
if(failOnError) { if(failOnError) {
throw new ElectrumServerRpcException("Failed to retrieve transaction history for paths: " + getScriptHashesAbbreviation((Collection<String>)e.getErrors().keySet()), e); throw new ElectrumServerRpcException("Failed to retrieve transaction history for paths: " + getScriptHashesAbbreviation((Collection<String>)e.getErrors().keySet()), e);
@ -110,7 +110,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
try { try {
return new RetryLogic<Map<String, ScriptHashTx[]>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); return new RetryLogic<Map<String, ScriptHashTx[]>>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(batchRequest::execute);
} catch(JsonRpcBatchException e) { } catch(JsonRpcBatchException e) {
if(failOnError) { if(failOnError) {
throw new ElectrumServerRpcException("Failed to retrieve mempool transactions for paths: " + getScriptHashesAbbreviation((Collection<String>)e.getErrors().keySet()), e); throw new ElectrumServerRpcException("Failed to retrieve mempool transactions for paths: " + getScriptHashesAbbreviation((Collection<String>)e.getErrors().keySet()), e);
@ -139,7 +139,7 @@ public class BatchedElectrumServerRpc implements ElectrumServerRpc {
} }
try { try {
return new RetryLogic<Map<String, String>>(MAX_RETRIES, RETRY_DELAY, IllegalStateException.class).getResult(batchRequest::execute); return new RetryLogic<Map<String, String>>(MAX_RETRIES, RETRY_DELAY, List.of(IllegalStateException.class, IllegalArgumentException.class)).getResult(batchRequest::execute);
} catch(JsonRpcBatchException e) { } catch(JsonRpcBatchException e) {
//Even if we have some successes, failure to subscribe for all script hashes will result in outdated wallet view. Don't proceed. //Even if we have some successes, failure to subscribe for all script hashes will result in outdated wallet view. Don't proceed.
throw new ElectrumServerRpcException("Failed to subscribe to paths: " + getScriptHashesAbbreviation((Collection<String>)e.getErrors().keySet()), e); throw new ElectrumServerRpcException("Failed to subscribe to paths: " + getScriptHashesAbbreviation((Collection<String>)e.getErrors().keySet()), e);

View file

@ -37,6 +37,8 @@ public class ElectrumServer {
private static final String[] SUPPORTED_VERSIONS = new String[]{"1.3", "1.4.2"}; private static final String[] SUPPORTED_VERSIONS = new String[]{"1.3", "1.4.2"};
private static final Version ELECTRS_MIN_BATCHING_VERSION = new Version("0.9.0");
private static final int MINIMUM_BROADCASTS = 2; private static final int MINIMUM_BROADCASTS = 2;
public static final BlockTransaction UNFETCHABLE_BLOCK_TRANSACTION = new BlockTransaction(Sha256Hash.ZERO_HASH, 0, null, null, null); public static final BlockTransaction UNFETCHABLE_BLOCK_TRANSACTION = new BlockTransaction(Sha256Hash.ZERO_HASH, 0, null, null, null);
@ -839,7 +841,26 @@ public class ElectrumServer {
} }
public static boolean supportsBatching(List<String> serverVersion) { public static boolean supportsBatching(List<String> serverVersion) {
return serverVersion.size() > 0 && serverVersion.get(0).toLowerCase().contains("electrumx"); if(serverVersion.size() > 0) {
String server = serverVersion.get(0).toLowerCase();
if(server.contains("electrumx")) {
return true;
}
if(server.startsWith("electrs/")) {
String electrsVersion = server.substring("electrs/".length());
try {
Version version = new Version(electrsVersion);
if(version.compareTo(ELECTRS_MIN_BATCHING_VERSION) >= 0) {
return true;
}
} catch(Exception e) {
//ignore
}
}
}
return false;
} }
public static class ServerVersionService extends Service<List<String>> { public static class ServerVersionService extends Service<List<String>> {

View file

@ -1,6 +1,7 @@
package com.sparrowwallet.sparrow.net; package com.sparrowwallet.sparrow.net;
import java.util.List; import java.util.List;
import java.util.Random;
/** /**
* Generic retry logic. Delegate must throw the specified exception type to trigger the retry logic. * Generic retry logic. Delegate must throw the specified exception type to trigger the retry logic.
@ -21,7 +22,7 @@ public class RetryLogic<T> {
public RetryLogic(int maxAttempts, int retryWaitSeconds, @SuppressWarnings("rawtypes") List<Class> retryExceptionTypes) { public RetryLogic(int maxAttempts, int retryWaitSeconds, @SuppressWarnings("rawtypes") List<Class> retryExceptionTypes) {
this.maxAttempts = maxAttempts; this.maxAttempts = maxAttempts;
this.retryWaitSeconds = retryWaitSeconds; this.retryWaitSeconds = Math.max(retryWaitSeconds, 1);
this.retryExceptionTypes = retryExceptionTypes; this.retryExceptionTypes = retryExceptionTypes;
} }
@ -37,7 +38,8 @@ public class RetryLogic<T> {
throw new ServerException("Retries exhausted", e); throw new ServerException("Retries exhausted", e);
} else { } else {
try { try {
Thread.sleep((1000 * retryWaitSeconds)); //Sleep with a +/- 2 seconds random wait time to avoid simultaneous retries
Thread.sleep((1000L * (retryWaitSeconds - 1)) + new Random().nextInt(2000));
} catch(InterruptedException ie) { } catch(InterruptedException ie) {
//ignore //ignore
} }

View file

@ -2,6 +2,7 @@ package com.sparrowwallet.sparrow.net;
import com.github.arteam.simplejsonrpc.client.Transport; import com.github.arteam.simplejsonrpc.client.Transport;
import com.github.arteam.simplejsonrpc.server.JsonRpcServer; import com.github.arteam.simplejsonrpc.server.JsonRpcServer;
import com.google.common.base.Splitter;
import com.google.common.net.HostAndPort; import com.google.common.net.HostAndPort;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.sparrowwallet.sparrow.io.Config; import com.sparrowwallet.sparrow.io.Config;
@ -24,7 +25,8 @@ public class TcpTransport implements Transport, Closeable {
private static final Logger log = LoggerFactory.getLogger(TcpTransport.class); private static final Logger log = LoggerFactory.getLogger(TcpTransport.class);
public static final int DEFAULT_PORT = 50001; public static final int DEFAULT_PORT = 50001;
private static final int[] READ_TIMEOUT_SECS = {3, 8, 16, 34}; private static final int[] BASE_READ_TIMEOUT_SECS = {3, 8, 16, 34};
public static final long PER_REQUEST_READ_TIMEOUT_MILLIS = 50;
public static final int SOCKET_READ_TIMEOUT_MILLIS = 5000; public static final int SOCKET_READ_TIMEOUT_MILLIS = 5000;
protected final HostAndPort server; protected final HostAndPort server;
@ -42,6 +44,7 @@ public class TcpTransport implements Transport, Closeable {
private volatile boolean reading = true; private volatile boolean reading = true;
private boolean firstRead = true; private boolean firstRead = true;
private int readTimeoutIndex; private int readTimeoutIndex;
private int requestIdCount = 1;
private final JsonRpcServer jsonRpcServer = new JsonRpcServer(); private final JsonRpcServer jsonRpcServer = new JsonRpcServer();
private final SubscriptionService subscriptionService = new SubscriptionService(); private final SubscriptionService subscriptionService = new SubscriptionService();
@ -66,6 +69,8 @@ public class TcpTransport implements Transport, Closeable {
Rpc recvRpc; Rpc recvRpc;
String recv; String recv;
//Count number of requests in batched query to increase read timeout appropriately
requestIdCount = Splitter.on("\"id\"").splitToList(request).size() - 1;
writeRequest(request); writeRequest(request);
do { do {
recv = readResponse(); recv = readResponse();
@ -86,16 +91,16 @@ public class TcpTransport implements Transport, Closeable {
private String readResponse() throws IOException { private String readResponse() throws IOException {
try { try {
if(!readLock.tryLock(READ_TIMEOUT_SECS[readTimeoutIndex], TimeUnit.SECONDS)) { if(!readLock.tryLock((BASE_READ_TIMEOUT_SECS[readTimeoutIndex] * 1000) + (requestIdCount * PER_REQUEST_READ_TIMEOUT_MILLIS), TimeUnit.MILLISECONDS)) {
readTimeoutIndex = Math.min(readTimeoutIndex + 1, READ_TIMEOUT_SECS.length - 1); readTimeoutIndex = Math.min(readTimeoutIndex + 1, BASE_READ_TIMEOUT_SECS.length - 1);
log.debug("No response from server, setting read timeout to " + READ_TIMEOUT_SECS[readTimeoutIndex] + " secs"); log.info("No response from server, setting read timeout to " + BASE_READ_TIMEOUT_SECS[readTimeoutIndex] + " secs");
throw new IOException("No response from server"); throw new IOException("No response from server");
} }
} catch(InterruptedException e) { } catch(InterruptedException e) {
throw new IOException("Read thread interrupted"); throw new IOException("Read thread interrupted");
} }
if(readTimeoutIndex == READ_TIMEOUT_SECS.length - 1) { if(readTimeoutIndex == BASE_READ_TIMEOUT_SECS.length - 1) {
readTimeoutIndex--; readTimeoutIndex--;
} }

View file

@ -0,0 +1,54 @@
package com.sparrowwallet.sparrow.net;
public class Version implements Comparable<Version> {
private final String version;
public final String get() {
return this.version;
}
public Version(String version) {
if(version == null) {
throw new IllegalArgumentException("Version can not be null");
}
if(!version.matches("[0-9]+(\\.[0-9]+)*")) {
throw new IllegalArgumentException("Invalid version format");
}
this.version = version;
}
@Override
public int compareTo(Version that) {
if(that == null) {
return 1;
}
String[] thisParts = this.get().split("\\.");
String[] thatParts = that.get().split("\\.");
int length = Math.max(thisParts.length, thatParts.length);
for(int i = 0; i < length; i++) {
int thisPart = i < thisParts.length ? Integer.parseInt(thisParts[i]) : 0;
int thatPart = i < thatParts.length ? Integer.parseInt(thatParts[i]) : 0;
if(thisPart < thatPart) {
return -1;
}
if(thisPart > thatPart) {
return 1;
}
}
return 0;
}
@Override
public boolean equals(Object that) {
if(this == that) {
return true;
}
if(that == null) {
return false;
}
if(this.getClass() != that.getClass()) {
return false;
}
return this.compareTo((Version) that) == 0;
}
}

View file

@ -100,56 +100,4 @@ public class VersionCheckService extends ScheduledService<VersionUpdatedEvent> {
public Map<String, String> signatures; public Map<String, String> signatures;
} }
public static class Version implements Comparable<Version> {
private final String version;
public final String get() {
return this.version;
}
public Version(String version) {
if(version == null) {
throw new IllegalArgumentException("Version can not be null");
}
if(!version.matches("[0-9]+(\\.[0-9]+)*")) {
throw new IllegalArgumentException("Invalid version format");
}
this.version = version;
}
@Override
public int compareTo(Version that) {
if(that == null) {
return 1;
}
String[] thisParts = this.get().split("\\.");
String[] thatParts = that.get().split("\\.");
int length = Math.max(thisParts.length, thatParts.length);
for(int i = 0; i < length; i++) {
int thisPart = i < thisParts.length ? Integer.parseInt(thisParts[i]) : 0;
int thatPart = i < thatParts.length ? Integer.parseInt(thatParts[i]) : 0;
if(thisPart < thatPart) {
return -1;
}
if(thisPart > thatPart) {
return 1;
}
}
return 0;
}
@Override
public boolean equals(Object that) {
if(this == that) {
return true;
}
if(that == null) {
return false;
}
if(this.getClass() != that.getClass()) {
return false;
}
return this.compareTo((Version)that) == 0;
}
}
} }