URI: 
       tImplement responsive mempool sync - electrum-personal-server - Maximally lightweight electrum server for a single user
  HTML git clone https://git.parazyd.org/electrum-personal-server
   DIR Log
   DIR Files
   DIR Refs
   DIR README
       ---
   DIR commit 136b95767a1519c7025f384ea0200452613773eb
   DIR parent fcbd3ce5fc3573e14dcebac3ab7ac44737a95d7a
  HTML Author: chris-belcher <chris-belcher@users.noreply.github.com>
       Date:   Thu,  4 Mar 2021 15:45:03 +0000
       
       Implement responsive mempool sync
       
       Previously, when generating the fee histogram required by Electrum, the server
       would use the RPC call `getrawmempool true`, which is very slow during times of
       large mempools and caused the server to become unresponsive.
       
       This commit instead uses `getrawmempool false` and `getmempoolentry` to obtain
       all the mempool fees. Because the mempool synchronization is split up over
       many different RPC calls, the server can always remain responsive even while
       obtaining the mempool. The typical lag will be at most 1 or 2 seconds.
       
       See issue #96
       
       Diffstat:
         M config.ini_sample                   |       4 ++++
         M electrumpersonalserver/server/__in… |       3 +++
         M electrumpersonalserver/server/comm… |      66 +++++++++++++++++++++++++------
         M electrumpersonalserver/server/elec… |      45 ++++---------------------------
         A electrumpersonalserver/server/memp… |     114 +++++++++++++++++++++++++++++++
       
       5 files changed, 180 insertions(+), 52 deletions(-)
       ---
   DIR diff --git a/config.ini_sample b/config.ini_sample
       t@@ -70,6 +70,10 @@ keyfile = certs/cert.key
        # This is useful on low powered devices at times when the node mempool is large
        disable_mempool_fee_histogram = false
        
       +# How often, in seconds, to update the server's copy of the mempool.
       +# This mempool copy is used to calculate the mempool fee histogram.
       +mempool_update_interval = 60
       +
        # Parameter for broadcasting unconfirmed transactions
        # Options are:
        # * tor-or-own-node  (use tor if tor is running locally, otherwise own-node)
   DIR diff --git a/electrumpersonalserver/server/__init__.py b/electrumpersonalserver/server/__init__.py
       t@@ -46,3 +46,6 @@ from electrumpersonalserver.server.electrumprotocol import (
            get_block_headers_hex,
            DONATION_ADDR,
        )
       +from electrumpersonalserver.server.mempoolhistogram import (
       +    MempoolSync
       +)
   DIR diff --git a/electrumpersonalserver/server/common.py b/electrumpersonalserver/server/common.py
       t@@ -1,6 +1,6 @@
        import socket
        import time
       -import datetime
       +from datetime import datetime
        import ssl
        import os
        import os.path
       t@@ -26,12 +26,23 @@ from electrumpersonalserver.server.electrumprotocol import (
            get_block_headers_hex,
            DONATION_ADDR,
        )
       +from electrumpersonalserver.server.mempoolhistogram import (
       +    MempoolSync,
       +    PollIntervalChange
       +)
        
        ##python has demented rules for variable scope, so these
        ## global variables are actually mutable lists
        bestblockhash = [None]
        
       -def on_heartbeat_listening(txmonitor):
       +last_heartbeat_listening = [datetime.now()]
       +last_heartbeat_connected = [datetime.now()]
       +
       +def on_heartbeat_listening(poll_interval_listening, txmonitor):
       +    if ((datetime.now() - last_heartbeat_listening[0]).total_seconds()
       +            < poll_interval_listening):
       +        return True
       +    last_heartbeat_listening[0] = datetime.now()
            logger = logging.getLogger('ELECTRUMPERSONALSERVER')
            try:
                txmonitor.check_for_updated_txes()
       t@@ -40,7 +51,11 @@ def on_heartbeat_listening(txmonitor):
                is_node_reachable = False
            return is_node_reachable
        
       -def on_heartbeat_connected(rpc, txmonitor, protocol):
       +def on_heartbeat_connected(poll_interval_connected, rpc, txmonitor, protocol):
       +    if ((datetime.now() - last_heartbeat_connected[0]).total_seconds()
       +            < poll_interval_connected):
       +        return
       +    last_heartbeat_connected[0] = datetime.now()
            logger = logging.getLogger('ELECTRUMPERSONALSERVER')
            is_tip_updated, header = check_for_new_blockchain_tip(rpc,
                protocol.are_headers_raw)
       t@@ -90,17 +105,26 @@ def run_electrum_server(rpc, txmonitor, config):
            logger.debug('using cert: {}, key: {}'.format(certfile, keyfile))
            disable_mempool_fee_histogram = config.getboolean("electrum-server",
                "disable_mempool_fee_histogram", fallback=False)
       +    mempool_update_interval = int(config.get("bitcoin-rpc",
       +        "mempool_update_interval", fallback=60))
            broadcast_method = config.get("electrum-server", "broadcast_method",
                fallback="own-node")
            tor_host = config.get("electrum-server", "tor_host", fallback="localhost")
            tor_port = int(config.get("electrum-server", "tor_port", fallback="9050"))
            tor_hostport = (tor_host, tor_port)
        
       +    mempool_sync = MempoolSync(rpc,
       +        disable_mempool_fee_histogram, mempool_update_interval)
       +    mempool_sync.initial_sync(logger)
       +
            protocol = ElectrumProtocol(rpc, txmonitor, logger, broadcast_method,
       -        tor_hostport, disable_mempool_fee_histogram)
       +        tor_hostport, mempool_sync)
        
       +    normal_listening_timeout = min(poll_interval_listening,
       +        mempool_update_interval)
       +    fast_listening_timeout = 0.5
            server_sock = create_server_socket(hostport)
       -    server_sock.settimeout(poll_interval_listening)
       +    server_sock.settimeout(normal_listening_timeout)
            accepting_clients = True
            while True:
                # main server loop, runs forever
       t@@ -121,7 +145,14 @@ def run_electrum_server(rpc, txmonitor, config):
                            certfile=certfile, keyfile=keyfile,
                            ssl_version=ssl.PROTOCOL_SSLv23)
                    except socket.timeout:
       -                is_node_reachable = on_heartbeat_listening(txmonitor)
       +                poll_interval_change = mempool_sync.poll_update(1)
       +                if poll_interval_change == PollIntervalChange.FAST_POLLING:
       +                    server_sock.settimeout(fast_listening_timeout)
       +                elif poll_interval_change == PollIntervalChange.NORMAL_POLLING:
       +                    server_sock.settimeout(normal_listening_timeout)
       +
       +                is_node_reachable = on_heartbeat_listening(
       +                    poll_interval_listening, txmonitor)
                        accepting_clients = is_node_reachable
                    except (ConnectionRefusedError, ssl.SSLError, IOError):
                        sock.close()
       t@@ -135,7 +166,10 @@ def run_electrum_server(rpc, txmonitor, config):
                protocol.set_send_reply_fun(send_reply_fun)
        
                try:
       -            sock.settimeout(poll_interval_connected)
       +            normal_connected_timeout = min(poll_interval_connected,
       +                mempool_update_interval)
       +            fast_connected_timeout = 0.5
       +            sock.settimeout(normal_connected_timeout)
                    recv_buffer = bytearray()
                    while True:
                        # loop for replying to client queries
       t@@ -159,7 +193,15 @@ def run_electrum_server(rpc, txmonitor, config):
                                logger.debug("=> " + line)
                                protocol.handle_query(query)
                        except socket.timeout:
       -                    on_heartbeat_connected(rpc, txmonitor, protocol)
       +                    poll_interval_change = mempool_sync.poll_update(1)
       +                    if poll_interval_change == PollIntervalChange.FAST_POLLING:
       +                        sock.settimeout(fast_connected_timeout)
       +                    elif (poll_interval_change
       +                            == PollIntervalChange.NORMAL_POLLING):
       +                        sock.settimeout(normal_connected_timeout)
       +
       +                    on_heartbeat_connected(poll_interval_connected, rpc,
       +                        txmonitor, protocol)
                except JsonRpcError as e:
                    logger.debug("Error with node connection, e = " + repr(e)
                        + "\ntraceback = " + str(traceback.format_exc()))
       t@@ -454,14 +496,14 @@ def main():
        
        def search_for_block_height_of_date(datestr, rpc):
            logger = logging.getLogger('ELECTRUMPERSONALSERVER')
       -    target_time = datetime.datetime.strptime(datestr, "%d/%m/%Y")
       +    target_time = datetime.strptime(datestr, "%d/%m/%Y")
            bestblockhash = rpc.call("getbestblockhash", [])
            best_head = rpc.call("getblockheader", [bestblockhash])
       -    if target_time > datetime.datetime.fromtimestamp(best_head["time"]):
       +    if target_time > datetime.fromtimestamp(best_head["time"]):
                logger.error("date in the future")
                return -1
            genesis_block = rpc.call("getblockheader", [rpc.call("getblockhash", [0])])
       -    if target_time < datetime.datetime.fromtimestamp(genesis_block["time"]):
       +    if target_time < datetime.fromtimestamp(genesis_block["time"]):
                logger.warning("date is before the creation of bitcoin")
                return 0
            first_height = 0
       t@@ -469,7 +511,7 @@ def search_for_block_height_of_date(datestr, rpc):
            while True:
                m = (first_height + last_height) // 2
                m_header = rpc.call("getblockheader", [rpc.call("getblockhash", [m])])
       -        m_header_time = datetime.datetime.fromtimestamp(m_header["time"])
       +        m_header_time = datetime.fromtimestamp(m_header["time"])
                m_time_diff = (m_header_time - target_time).total_seconds()
                if abs(m_time_diff) < 60*60*2: #2 hours
                    return m_header["height"]
   DIR diff --git a/electrumpersonalserver/server/electrumprotocol.py b/electrumpersonalserver/server/electrumprotocol.py
       t@@ -134,19 +134,18 @@ class ElectrumProtocol(object):
            """
        
            def __init__(self, rpc, txmonitor, logger, broadcast_method,
       -            tor_hostport, disable_mempool_fee_histogram):
       +            tor_hostport, mempool_sync):
                self.rpc = rpc
                self.txmonitor = txmonitor
                self.logger = logger
                self.broadcast_method = broadcast_method
                self.tor_hostport = tor_hostport
       -        self.disable_mempool_fee_histogram = disable_mempool_fee_histogram
       +        self.mempool_sync = mempool_sync
        
                self.protocol_version = 0   
                self.subscribed_to_headers = False
                self.are_headers_raw = False
                self.txid_blockhash_map = {}
       -        self.printed_slow_mempool_warning = False
        
            def set_send_reply_fun(self, send_reply_fun):
                self.send_reply_fun = send_reply_fun
       t@@ -379,43 +378,9 @@ class ElectrumProtocol(object):
                    else:
                        self._send_error(query["id"], error)
                elif method == "mempool.get_fee_histogram":
       -            if self.disable_mempool_fee_histogram:
       -                result = [[0, 0]]
       -                self.logger.debug("fee histogram disabled, sending back empty "
       -                    + "mempool")
       -            else:
       -                st = time.time()
       -                mempool = self.rpc.call("getrawmempool", [True])
       -                et = time.time()
       -                MEMPOOL_WARNING_DURATION = 10 #seconds
       -                if et - st > MEMPOOL_WARNING_DURATION:
       -                    if not self.printed_slow_mempool_warning:
       -                        self.logger.warning("Mempool very large resulting in"
       -                            + " slow response by server ("
       -                            + str(round(et-st, 1)) + "sec). Consider setting "
       -                            + "`disable_mempool_fee_histogram = true`")
       -                    self.printed_slow_mempool_warning = True
       -                #algorithm copied from the relevant place in ElectrumX
       -                #https://github.com/kyuupichan/electrumx/blob/e92c9bd4861c1e35989ad2773d33e01219d33280/server/mempool.py
       -                fee_hist = defaultdict(int)
       -                for txid, details in mempool.items():
       -                    size = (details["size"] if "size" in details else
       -                        details["vsize"])
       -                    fee_rate = 1e8*details["fee"] // size
       -                    fee_hist[fee_rate] += size
       -                l = list(reversed(sorted(fee_hist.items())))
       -                out = []
       -                size = 0
       -                r = 0
       -                binsize = 100000
       -                for fee, s in l:
       -                    size += s
       -                    if size + r > binsize:
       -                        out.append((fee, size))
       -                        r += size - binsize
       -                        size = 0
       -                        binsize *= 1.1
       -                result = out
       +            result = self.mempool_sync.get_fee_histogram()
       +            self.logger.debug("mempool entry count = "
       +                + str(len(self.mempool_sync.mempool)))
                    self._send_response(query, result)
                elif method == "blockchain.estimatefee":
                    estimate = self.rpc.call("estimatesmartfee", [query["params"][0]])
   DIR diff --git a/electrumpersonalserver/server/mempoolhistogram.py b/electrumpersonalserver/server/mempoolhistogram.py
       t@@ -0,0 +1,114 @@
       +
       +import time
       +from collections import defaultdict
       +from datetime import datetime
       +from enum import Enum
       +
       +from electrumpersonalserver.server.jsonrpc import JsonRpcError
       +
       +def calc_histogram(mempool):
       +    #algorithm copied from the relevant place in ElectrumX
       +    #https://github.com/kyuupichan/electrumx/blob/e92c9bd4861c1e35989ad2773d33e01219d33280/server/mempool.py
       +    fee_hist = defaultdict(int)
       +    for fee_rate, size in mempool.values():
       +        fee_hist[fee_rate] += size
       +    l = list(reversed(sorted(fee_hist.items())))
       +    out = []
       +    size = 0
       +    r = 0
       +    binsize = 100000
       +    for fee, s in l:
       +        size += s
       +        if size + r > binsize:
       +            out.append((fee, size))
       +            r += size - binsize
       +            size = 0
       +            binsize *= 1.1
       +    return out
       +
       +class PollIntervalChange(Enum):
       +    UNCHANGED = "unchanged"
       +    FAST_POLLING = "fastpolling"
       +    NORMAL_POLLING = "normalpolling"
       +
       +class MempoolSync(object):
       +    def __init__(self, rpc, disabled, polling_interval):
       +        self.rpc = rpc
       +        self.disabled = disabled
       +        self.polling_interval = polling_interval
       +        self.mempool = dict()
       +        self.cached_fee_histogram = [[0, 0]]
       +        self.added_txids = None
       +        self.last_poll = None
       +        self.state = "gettxids"
       +
       +    def set_polling_interval(self, polling_interval):
       +        self.polling_interval = polling_interval
       +
       +    def get_fee_histogram(self):
       +        return self.cached_fee_histogram
       +
       +    def initial_sync(self, logger):
       +        if self.disabled:
       +            return
       +        logger.info("Synchronizing mempool . . .")
       +        st = time.time()
       +        for _ in range(2):
       +            self.poll_update(-1)
       +        self.state = "gettxids"
       +        for _ in range(2):
       +            self.poll_update(-1)
       +        #each pair of calls above does one full sync: the first call runs
       +        # getrawmempool and the second runs getmempoolentry for every new txid
       +        #the pair is run twice because the first sync can take so long
       +        # that new txes could arrive in that time
       +        et = time.time()
       +        logger.info("Found " + str(len(self.mempool)) + " mempool entries. "
       +            + "Synchronized mempool in " + str(et - st) + "sec")
       +
       +    #-1 for no timeout
       +    def poll_update(self, timeout):
       +        poll_interval_change = PollIntervalChange.UNCHANGED
       +        if self.disabled:
       +            return poll_interval_change
       +        if self.state == "waiting":
       +            if ((datetime.now() - self.last_poll).total_seconds()
       +                    > self.polling_interval):
       +                poll_interval_change = PollIntervalChange.FAST_POLLING
       +                self.state = "gettxids"
       +        elif self.state == "gettxids":
       +            mempool_txids = self.rpc.call("getrawmempool", [])
       +            self.last_poll = datetime.now()
       +            mempool_txids = set(mempool_txids)
       +
       +            removed_txids = set(self.mempool.keys()).difference(mempool_txids)
       +            self.added_txids = iter(mempool_txids.difference(
       +                set(self.mempool.keys())))
       +
       +            for txid in removed_txids:
       +                del self.mempool[txid]
       +
       +            self.state = "getfeerates"
       +        elif self.state == "getfeerates":
       +            if timeout == -1:
       +                timeout = 2**32
       +            start_time = datetime.now()
       +            while self.state != "waiting" and ((datetime.now() - start_time
       +                    ).total_seconds() < timeout):
       +                try:
       +                    txid = next(self.added_txids)
       +                except StopIteration:
       +                    self.cached_fee_histogram = calc_histogram(self.mempool)
       +                    self.state = "waiting"
       +                    poll_interval_change = \
       +                        PollIntervalChange.NORMAL_POLLING
       +                    self.last_poll = datetime.now()
       +                    continue
       +                try:
       +                    mempool_tx = self.rpc.call("getmempoolentry", [txid])
       +                except JsonRpcError:
       +                    continue
       +                fee_rate = 1e8*mempool_tx["fee"] // mempool_tx["vsize"]
       +                self.mempool[txid] = (fee_rate, mempool_tx["vsize"])
       +
       +        return poll_interval_change