electrs server robustness and tuning improvements

This commit is contained in:
Craig Raw 2020-10-15 14:38:30 +02:00
parent ff578c938e
commit b448927a6e
4 changed files with 90 additions and 8 deletions

View file

@ -70,7 +70,7 @@ import java.util.stream.Collectors;
public class AppController implements Initializable {
private static final Logger log = LoggerFactory.getLogger(AppController.class);
private static final int SERVER_PING_PERIOD = 10 * 1000;
private static final int SERVER_PING_PERIOD = 8 * 60 * 1000;
private static final int ENUMERATE_HW_PERIOD = 30 * 1000;
private static final int RATES_PERIOD = 5 * 60 * 1000;
private static final int VERSION_CHECK_PERIOD_HOURS = 24;

View file

@ -139,6 +139,42 @@ public class ElectrumServer {
Map<WalletNode, Set<BlockTransactionHash>> nodeTransactionMap = new TreeMap<>();
subscribeWalletNodes(wallet, nodes, nodeTransactionMap, 0);
getReferences(wallet, nodeTransactionMap.keySet(), nodeTransactionMap, 0);
Set<BlockTransactionHash> newReferences = nodeTransactionMap.values().stream().flatMap(Collection::stream).filter(ref -> !wallet.getTransactions().containsKey(ref.getHash())).collect(Collectors.toSet());
getReferencedTransactions(wallet, nodeTransactionMap);
if(!newReferences.isEmpty()) {
//Look for additional nodes to fetch history for by considering the inputs and outputs of new transactions found
log.debug("Found new transactions: " + newReferences);
Set<WalletNode> additionalNodes = new HashSet<>();
Map<String, WalletNode> walletScriptHashes = getAllScriptHashes(wallet);
for(BlockTransactionHash reference : newReferences) {
BlockTransaction blockTransaction = wallet.getTransactions().get(reference.getHash());
for(TransactionOutput txOutput : blockTransaction.getTransaction().getOutputs()) {
WalletNode node = walletScriptHashes.get(getScriptHash(txOutput));
if(node != null && !nodes.contains(node)) {
additionalNodes.add(node);
}
}
for(TransactionInput txInput : blockTransaction.getTransaction().getInputs()) {
BlockTransaction inputBlockTransaction = wallet.getTransactions().get(txInput.getOutpoint().getHash());
if(inputBlockTransaction != null) {
TransactionOutput txOutput = inputBlockTransaction.getTransaction().getOutputs().get((int)txInput.getOutpoint().getIndex());
WalletNode node = walletScriptHashes.get(getScriptHash(txOutput));
if(node != null && !nodes.contains(node)) {
additionalNodes.add(node);
}
}
}
}
if(!additionalNodes.isEmpty()) {
log.debug("Found additional nodes: " + additionalNodes);
subscribeWalletNodes(wallet, additionalNodes, nodeTransactionMap, 0);
getReferences(wallet, additionalNodes, nodeTransactionMap, 0);
getReferencedTransactions(wallet, nodeTransactionMap);
}
}
return nodeTransactionMap;
}
@ -149,7 +185,7 @@ public class ElectrumServer {
subscribeWalletNodes(wallet, purposeNode.getChildren(), nodeTransactionMap, 0);
//All WalletNode keys in nodeTransactionMap need to have their history fetched (nodes without history will not be keys in the map yet)
getReferences(wallet, nodeTransactionMap.keySet(), nodeTransactionMap, 0);
//Fetch all referenced transaction to wallet transactions map
//Fetch all referenced transaction to wallet transactions map. We do this now even though it is done again later to get it done before too many script hashes are subscribed
getReferencedTransactions(wallet, nodeTransactionMap);
//Because node children are added sequentially in WalletNode.fillToIndex, we can simply look at the number of children to determine the highest filled index
@ -589,6 +625,18 @@ public class ElectrumServer {
}
}
public static Map<String, WalletNode> getAllScriptHashes(Wallet wallet) {
Map<String, WalletNode> scriptHashes = new HashMap<>();
List<KeyPurpose> purposes = List.of(KeyPurpose.RECEIVE, KeyPurpose.CHANGE);
for(KeyPurpose keyPurpose : purposes) {
for(WalletNode childNode : wallet.getNode(keyPurpose).getChildren()) {
scriptHashes.put(getScriptHash(wallet, childNode), childNode);
}
}
return scriptHashes;
}
public static String getScriptHash(Wallet wallet, WalletNode node) {
byte[] hash = Sha256Hash.hash(wallet.getOutputScript(node).getProgram());
byte[] reversed = Utils.reverseBytes(hash);
@ -648,7 +696,7 @@ public class ElectrumServer {
}
public static class ConnectionService extends ScheduledService<FeeRatesUpdatedEvent> implements Thread.UncaughtExceptionHandler {
private static final int FEE_RATES_PERIOD = 5 * 60 * 1000;
private static final int FEE_RATES_PERIOD = 10 * 60 * 1000;
private final boolean subscribe;
private boolean firstCall = true;

View file

@ -18,8 +18,8 @@ import java.util.concurrent.atomic.AtomicLong;
public class SimpleElectrumServerRpc implements ElectrumServerRpc {
private static final Logger log = LoggerFactory.getLogger(SimpleElectrumServerRpc.class);
private static final int MAX_TARGET_BLOCKS = 25;
private static final int MAX_RETRIES = 3;
private static final int RETRY_DELAY = 0;
private static final int MAX_RETRIES = 10;
private static final int RETRY_DELAY = 1;
private final AtomicLong idCounter = new AtomicLong();

View file

@ -3,6 +3,7 @@ package com.sparrowwallet.sparrow.net;
import com.github.arteam.simplejsonrpc.client.Transport;
import com.github.arteam.simplejsonrpc.server.JsonRpcServer;
import com.google.common.net.HostAndPort;
import com.google.gson.Gson;
import com.sparrowwallet.sparrow.io.Config;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
@ -11,6 +12,7 @@ import org.slf4j.LoggerFactory;
import javax.net.SocketFactory;
import java.io.*;
import java.net.Socket;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@ -19,7 +21,7 @@ public class TcpTransport implements Transport, Closeable {
private static final Logger log = LoggerFactory.getLogger(TcpTransport.class);
public static final int DEFAULT_PORT = 50001;
private static final int READ_TIMEOUT_SECS = 3;
private static final int READ_TIMEOUT_SECS = 15;
protected final HostAndPort server;
protected final SocketFactory socketFactory;
@ -33,13 +35,14 @@ public class TcpTransport implements Transport, Closeable {
private final ReentrantLock clientRequestLock = new ReentrantLock();
private boolean running = false;
private boolean reading = true;
private volatile boolean reading = true;
private boolean firstRead = true;
private final JsonRpcServer jsonRpcServer = new JsonRpcServer();
private final SubscriptionService subscriptionService = new SubscriptionService();
private Exception lastException;
private final Gson gson = new Gson();
public TcpTransport(HostAndPort server) {
this.server = server;
@ -50,8 +53,17 @@ public class TcpTransport implements Transport, Closeable {
public @NotNull String pass(@NotNull String request) throws IOException {
clientRequestLock.lock();
try {
Rpc sentRpc = request.startsWith("{") ? gson.fromJson(request, Rpc.class) : null;
Rpc recvRpc;
String recv;
writeRequest(request);
return readResponse();
do {
recv = readResponse();
recvRpc = recv.startsWith("{") ? gson.fromJson(response, Rpc.class) : null;
} while(!Objects.equals(recvRpc, sentRpc));
return recv;
} finally {
clientRequestLock.unlock();
}
@ -66,6 +78,7 @@ public class TcpTransport implements Transport, Closeable {
private String readResponse() throws IOException {
try {
if(!readLock.tryLock(READ_TIMEOUT_SECS, TimeUnit.SECONDS)) {
log.debug("No response from server");
throw new IOException("No response from server");
}
} catch(InterruptedException e) {
@ -178,4 +191,25 @@ public class TcpTransport implements Transport, Closeable {
socket.close();
}
}
private static class Rpc {
public String id;
@Override
public boolean equals(Object o) {
if(this == o) {
return true;
}
if(o == null || getClass() != o.getClass()) {
return false;
}
Rpc rpc = (Rpc) o;
return Objects.equals(id, rpc.id);
}
@Override
public int hashCode() {
return Objects.hash(id);
}
}
}