tImplement responsive mempool sync - electrum-personal-server - Maximally lightweight electrum server for a single user HTML git clone https://git.parazyd.org/electrum-personal-server DIR Log DIR Files DIR Refs DIR README --- DIR commit 136b95767a1519c7025f384ea0200452613773eb DIR parent fcbd3ce5fc3573e14dcebac3ab7ac44737a95d7a HTML Author: chris-belcher <chris-belcher@users.noreply.github.com> Date: Thu, 4 Mar 2021 15:45:03 +0000 Implement responsive mempool sync Previously when generating fee histogram required by Electrum, the server would use the RPC call `getrawmempool true` which would be very slow during times of large mempools, and cause the server to be unresponsive. This commit instead uses `getrawmempool false` and `getmempoolentry` to obtain all the mempool fees. Because the mempool synchronization is split up over many different RPC calls, the server can always remain responsive even while obtaining the mempool. The typical lag will be at most 1 or 2 seconds. See issue #96 Diffstat: M config.ini_sample | 4 ++++ M electrumpersonalserver/server/__in… | 3 +++ M electrumpersonalserver/server/comm… | 66 +++++++++++++++++++++++++------ M electrumpersonalserver/server/elec… | 45 ++++--------------------------- A electrumpersonalserver/server/memp… | 114 +++++++++++++++++++++++++++++++ 5 files changed, 180 insertions(+), 52 deletions(-) --- DIR diff --git a/config.ini_sample b/config.ini_sample t@@ -70,6 +70,10 @@ keyfile = certs/cert.key # This is useful on low powered devices at times when the node mempool is large disable_mempool_fee_histogram = false +# How often in seconds to update the mempool +# this mempool is used to calculate the mempool fee histogram +mempool_update_interval = 60 + # Parameter for broadcasting unconfirmed transactions # Options are: # * tor-or-own-node (use tor if tor is running locally, otherwise own-node) DIR diff --git a/electrumpersonalserver/server/__init__.py b/electrumpersonalserver/server/__init__.py t@@ -46,3 +46,6 @@ from electrumpersonalserver.server.electrumprotocol import ( get_block_headers_hex, DONATION_ADDR, ) +from electrumpersonalserver.server.mempoolhistogram import ( + MempoolSync +) DIR diff --git a/electrumpersonalserver/server/common.py b/electrumpersonalserver/server/common.py t@@ -1,6 +1,6 @@ import socket import time -import datetime +from datetime import datetime import ssl import os import os.path t@@ -26,12 +26,23 @@ from electrumpersonalserver.server.electrumprotocol import ( get_block_headers_hex, DONATION_ADDR, ) +from electrumpersonalserver.server.mempoolhistogram import ( + MempoolSync, + PollIntervalChange +) ##python has demented rules for variable scope, so these ## global variables are actually mutable lists bestblockhash = [None] -def on_heartbeat_listening(txmonitor): +last_heartbeat_listening = [datetime.now()] +last_heartbeat_connected = [datetime.now()] + +def on_heartbeat_listening(poll_interval_listening, txmonitor): + if ((datetime.now() - last_heartbeat_listening[0]).total_seconds() + < poll_interval_listening): + return True + last_heartbeat_listening[0] = datetime.now() logger = logging.getLogger('ELECTRUMPERSONALSERVER') try: txmonitor.check_for_updated_txes() t@@ -40,7 +51,11 @@ def on_heartbeat_listening(txmonitor): is_node_reachable = False return is_node_reachable -def on_heartbeat_connected(rpc, txmonitor, protocol): +def on_heartbeat_connected(poll_interval_connected, rpc, txmonitor, protocol): + if ((datetime.now() - last_heartbeat_connected[0]).total_seconds() + < poll_interval_connected): + return + last_heartbeat_connected[0] = datetime.now() logger = logging.getLogger('ELECTRUMPERSONALSERVER') is_tip_updated, header = check_for_new_blockchain_tip(rpc, protocol.are_headers_raw) t@@ -90,17 +105,26 @@ def run_electrum_server(rpc, txmonitor, config): logger.debug('using cert: {}, key: {}'.format(certfile, keyfile)) disable_mempool_fee_histogram = config.getboolean("electrum-server", "disable_mempool_fee_histogram", fallback=False) + mempool_update_interval = int(config.get("bitcoin-rpc", + "mempool_update_interval", fallback=60)) broadcast_method = config.get("electrum-server", "broadcast_method", fallback="own-node") tor_host = config.get("electrum-server", "tor_host", fallback="localhost") tor_port = int(config.get("electrum-server", "tor_port", fallback="9050")) tor_hostport = (tor_host, tor_port) + mempool_sync = MempoolSync(rpc, + disable_mempool_fee_histogram, mempool_update_interval) + mempool_sync.initial_sync(logger) + protocol = ElectrumProtocol(rpc, txmonitor, logger, broadcast_method, - tor_hostport, disable_mempool_fee_histogram) + tor_hostport, mempool_sync) + normal_listening_timeout = min(poll_interval_listening, + mempool_update_interval) + fast_listening_timeout = 0.5 server_sock = create_server_socket(hostport) - server_sock.settimeout(poll_interval_listening) + server_sock.settimeout(normal_listening_timeout) accepting_clients = True while True: # main server loop, runs forever t@@ -121,7 +145,14 @@ def run_electrum_server(rpc, txmonitor, config): certfile=certfile, keyfile=keyfile, ssl_version=ssl.PROTOCOL_SSLv23) except socket.timeout: - is_node_reachable = on_heartbeat_listening(txmonitor) + poll_interval_change = mempool_sync.poll_update(1) + if poll_interval_change == PollIntervalChange.FAST_POLLING: + server_sock.settimeout(fast_listening_timeout) + elif poll_interval_change == PollIntervalChange.NORMAL_POLLING: + server_sock.settimeout(normal_listening_timeout) + + is_node_reachable = on_heartbeat_listening( + poll_interval_listening, txmonitor) accepting_clients = is_node_reachable except (ConnectionRefusedError, ssl.SSLError, IOError): sock.close() t@@ -135,7 +166,10 @@ def run_electrum_server(rpc, txmonitor, config): protocol.set_send_reply_fun(send_reply_fun) try: - sock.settimeout(poll_interval_connected) + normal_connected_timeout = min(poll_interval_connected, + mempool_update_interval) + fast_connected_timeout = 0.5 + sock.settimeout(normal_connected_timeout) recv_buffer = bytearray() while True: # loop for replying to client queries t@@ -159,7 +193,15 @@ def run_electrum_server(rpc, txmonitor, config): logger.debug("=> " + line) protocol.handle_query(query) except socket.timeout: - on_heartbeat_connected(rpc, txmonitor, protocol) + poll_interval_change = mempool_sync.poll_update(1) + if poll_interval_change == PollIntervalChange.FAST_POLLING: + sock.settimeout(fast_connected_timeout) + elif (poll_interval_change + == PollIntervalChange.NORMAL_POLLING): + sock.settimeout(normal_connected_timeout) + + on_heartbeat_connected(poll_interval_connected, rpc, + txmonitor, protocol) except JsonRpcError as e: logger.debug("Error with node connection, e = " + repr(e) + "\ntraceback = " + str(traceback.format_exc())) t@@ -454,14 +496,14 @@ def main(): def search_for_block_height_of_date(datestr, rpc): logger = logging.getLogger('ELECTRUMPERSONALSERVER') - target_time = datetime.datetime.strptime(datestr, "%d/%m/%Y") + target_time = datetime.strptime(datestr, "%d/%m/%Y") bestblockhash = rpc.call("getbestblockhash", []) best_head = rpc.call("getblockheader", [bestblockhash]) - if target_time > datetime.datetime.fromtimestamp(best_head["time"]): + if target_time > datetime.fromtimestamp(best_head["time"]): logger.error("date in the future") return -1 genesis_block = rpc.call("getblockheader", [rpc.call("getblockhash", [0])]) - if target_time < datetime.datetime.fromtimestamp(genesis_block["time"]): + if target_time < datetime.fromtimestamp(genesis_block["time"]): logger.warning("date is before the creation of bitcoin") return 0 first_height = 0 t@@ -469,7 +511,7 @@ def search_for_block_height_of_date(datestr, rpc): while True: m = (first_height + last_height) // 2 m_header = rpc.call("getblockheader", [rpc.call("getblockhash", [m])]) - m_header_time = datetime.datetime.fromtimestamp(m_header["time"]) + m_header_time = datetime.fromtimestamp(m_header["time"]) m_time_diff = (m_header_time - target_time).total_seconds() if abs(m_time_diff) < 60*60*2: #2 hours return m_header["height"] DIR diff --git a/electrumpersonalserver/server/electrumprotocol.py b/electrumpersonalserver/server/electrumprotocol.py t@@ -134,19 +134,18 @@ class ElectrumProtocol(object): """ def __init__(self, rpc, txmonitor, logger, broadcast_method, - tor_hostport, disable_mempool_fee_histogram): + tor_hostport, mempool_sync): self.rpc = rpc self.txmonitor = txmonitor self.logger = logger self.broadcast_method = broadcast_method self.tor_hostport = tor_hostport - self.disable_mempool_fee_histogram = disable_mempool_fee_histogram + self.mempool_sync = mempool_sync self.protocol_version = 0 self.subscribed_to_headers = False self.are_headers_raw = False self.txid_blockhash_map = {} - self.printed_slow_mempool_warning = False def set_send_reply_fun(self, send_reply_fun): self.send_reply_fun = send_reply_fun t@@ -379,43 +378,9 @@ class ElectrumProtocol(object): else: self._send_error(query["id"], error) elif method == "mempool.get_fee_histogram": - if self.disable_mempool_fee_histogram: - result = [[0, 0]] - self.logger.debug("fee histogram disabled, sending back empty " - + "mempool") - else: - st = time.time() - mempool = self.rpc.call("getrawmempool", [True]) - et = time.time() - MEMPOOL_WARNING_DURATION = 10 #seconds - if et - st > MEMPOOL_WARNING_DURATION: - if not self.printed_slow_mempool_warning: - self.logger.warning("Mempool very large resulting in" - + " slow response by server (" - + str(round(et-st, 1)) + "sec). Consider setting " - + "`disable_mempool_fee_histogram = true`") - self.printed_slow_mempool_warning = True - #algorithm copied from the relevant place in ElectrumX - #https://github.com/kyuupichan/electrumx/blob/e92c9bd4861c1e35989ad2773d33e01219d33280/server/mempool.py - fee_hist = defaultdict(int) - for txid, details in mempool.items(): - size = (details["size"] if "size" in details else - details["vsize"]) - fee_rate = 1e8*details["fee"] // size - fee_hist[fee_rate] += size - l = list(reversed(sorted(fee_hist.items()))) - out = [] - size = 0 - r = 0 - binsize = 100000 - for fee, s in l: - size += s - if size + r > binsize: - out.append((fee, size)) - r += size - binsize - size = 0 - binsize *= 1.1 - result = out + result = self.mempool_sync.get_fee_histogram() + self.logger.debug("mempool entry count = " + + str(len(self.mempool_sync.mempool))) self._send_response(query, result) elif method == "blockchain.estimatefee": estimate = self.rpc.call("estimatesmartfee", [query["params"][0]]) DIR diff --git a/electrumpersonalserver/server/mempoolhistogram.py b/electrumpersonalserver/server/mempoolhistogram.py t@@ -0,0 +1,114 @@ + +import time +from collections import defaultdict +from datetime import datetime +from enum import Enum + +from electrumpersonalserver.server.jsonrpc import JsonRpcError + +def calc_histogram(mempool): + #algorithm copied from the relevant place in ElectrumX + #https://github.com/kyuupichan/electrumx/blob/e92c9bd4861c1e35989ad2773d33e01219d33280/server/mempool.py + fee_hist = defaultdict(int) + for fee_rate, size in mempool.values(): + fee_hist[fee_rate] += size + l = list(reversed(sorted(fee_hist.items()))) + out = [] + size = 0 + r = 0 + binsize = 100000 + for fee, s in l: + size += s + if size + r > binsize: + out.append((fee, size)) + r += size - binsize + size = 0 + binsize *= 1.1 + return out + +class PollIntervalChange(Enum): + UNCHANGED = "unchanged" + FAST_POLLING = "fastpolling" + NORMAL_POLLING = "normalpolling" + +class MempoolSync(object): + def __init__(self, rpc, disabled, polling_interval): + self.rpc = rpc + self.disabled = disabled + self.polling_interval = polling_interval + self.mempool = dict() + self.cached_fee_histogram = [[0, 0]] + self.added_txids = None + self.last_poll = None + self.state = "gettxids" + + def set_polling_interval(self, polling_interval): + self.polling_interval = polling_interval + + def get_fee_histogram(self): + return self.cached_fee_histogram + + def initial_sync(self, logger): + if self.disabled: + return + logger.info("Synchronizing mempool . . .") + st = time.time() + for _ in range(2): + self.poll_update(-1) + self.state = "gettxids" + for _ in range(2): + self.poll_update(-1) + #run once for the getrawmempool + #again for the getmempoolentry + #and all that again because the first time will take so long + # that new txes could arrive in that time + et = time.time() + logger.info("Found " + str(len(self.mempool)) + " mempool entries. " + + "Synchronized mempool in " + str(et - st) + "sec") + + #-1 for no timeout + def poll_update(self, timeout): + poll_interval_change = PollIntervalChange.UNCHANGED + if self.disabled: + return poll_interval_change + if self.state == "waiting": + if ((datetime.now() - self.last_poll).total_seconds() + > self.polling_interval): + poll_interval_change = PollIntervalChange.FAST_POLLING + self.state = "gettxids" + elif self.state == "gettxids": + mempool_txids = self.rpc.call("getrawmempool", []) + self.last_poll = datetime.now() + mempool_txids = set(mempool_txids) + + removed_txids = set(self.mempool.keys()).difference(mempool_txids) + self.added_txids = iter(mempool_txids.difference( + set(self.mempool.keys()))) + + for txid in removed_txids: + del self.mempool[txid] + + self.state = "getfeerates" + elif self.state == "getfeerates": + if timeout == -1: + timeout = 2**32 + start_time = datetime.now() + while self.state != "waiting" and ((datetime.now() - start_time + ).total_seconds() < timeout): + try: + txid = next(self.added_txids) + except StopIteration: + self.cached_fee_histogram = calc_histogram(self.mempool) + self.state = "waiting" + poll_interval_change = \ + PollIntervalChange.NORMAL_POLLING + self.last_poll = datetime.now() + continue + try: + mempool_tx = self.rpc.call("getmempoolentry", [txid]) + except JsonRpcError: + continue + fee_rate = 1e8*mempool_tx["fee"] // mempool_tx["vsize"] + self.mempool[txid] = (fee_rate, mempool_tx["vsize"]) + + return poll_interval_change