URI: 
       tMove getnodeaddresses RPC calls to main thread - electrum-personal-server - Maximally lightweight electrum server for a single user
  HTML git clone https://git.parazyd.org/electrum-personal-server
   DIR Log
   DIR Files
   DIR Refs
   DIR README
       ---
   DIR commit dd9695403c88ae5e4b54ef682564a315203caf2d
   DIR parent fe44aa79ca54d3a1bf805e1ea203f9cb07ab9b14
  HTML Author: chris-belcher <chris-belcher@users.noreply.github.com>
       Date:   Mon, 24 Jun 2019 00:12:34 +0100
       
       Move getnodeaddresses RPC calls to main thread
       
       This avoids a RPC concurrency issue arising from having
       8 threads connect to the node RPC port at once
       
       Diffstat:
         M electrumpersonalserver/server/comm… |      15 +++++----------
         M electrumpersonalserver/server/peer… |      64 ++++++++++++++++++++++---------
       
       2 files changed, 50 insertions(+), 29 deletions(-)
       ---
   DIR diff --git a/electrumpersonalserver/server/common.py b/electrumpersonalserver/server/common.py
       t@@ -5,14 +5,13 @@ import traceback, sys, platform
        from ipaddress import ip_network, ip_address
        import logging
        import tempfile
       -import threading
        
        from electrumpersonalserver.server.jsonrpc import JsonRpc, JsonRpcError
        import electrumpersonalserver.server.hashes as hashes
        import electrumpersonalserver.server.merkleproof as merkleproof
        import electrumpersonalserver.server.deterministicwallet as deterministicwallet
        import electrumpersonalserver.server.transactionmonitor as transactionmonitor
       -import electrumpersonalserver.server.peertopeer as p2p
       +import electrumpersonalserver.server.peertopeer as peertopeer
        
        SERVER_VERSION_NUMBER = "0.1.7"
        
       t@@ -247,21 +246,17 @@ def handle_query(sock, line, rpc, txmonitor, disable_mempool_fee_histogram,
                            try:
                                rpc.call("sendrawtransaction", [txhex])
                            except JsonRpcError as e:
       -                        pass
       +                        logger.error("Error broadcasting: " + repr(e))
                    elif broadcast_method == "tor":
       -                TOR_CONNECTIONS = 8
                        network = "mainnet"
                        chaininfo = rpc.call("getblockchaininfo", [])
                        if chaininfo["chain"] == "test":
                            network = "testnet"
                        elif chaininfo["chain"] == "regtest":
                            network = "regtest"
       -                for i in range(TOR_CONNECTIONS):
       -                    t = threading.Thread(target=p2p.tor_broadcast_tx,
       -                        args=(txhex, tor_hostport, network, rpc, logger),
       -                        daemon=True)
       -                    t.start()
       -                    time.sleep(0.1)
       +                logger.debug("broadcasting to network: " + network)
       +                peertopeer.tor_broadcast_tx(txhex, tor_hostport, network, rpc,
       +                    logger)
                    elif broadcast_method.startswith("system "):
                        with tempfile.NamedTemporaryFile() as fd:
                            system_line = broadcast_method[7:].replace("%s", fd.name)
   DIR diff --git a/electrumpersonalserver/server/peertopeer.py b/electrumpersonalserver/server/peertopeer.py
       t@@ -2,6 +2,7 @@
        
        import socket, time
        import base64
       +import threading
        from struct import pack, unpack
        from datetime import datetime
        
       t@@ -13,8 +14,6 @@ from electrumpersonalserver.server.socks import (
        )
        from electrumpersonalserver.server.jsonrpc import JsonRpcError
        
       -import logging
       -
        PROTOCOL_VERSION = 70012
        DEFAULT_USER_AGENT = '/Satoshi:0.18.0/'
        NODE_WITNESS = (1 << 3)
       t@@ -187,9 +186,9 @@ class P2PProtocol(object):
                                   + pack('<I', start_height)
                                   + b'\x01')
        
       -        self.logger.debug('Connecting to bitcoin peer (magic=' +
       -                hex(self.magic) + ') at ' + str(self.remote_hostport) +
       -                ' with proxy ' + str(self.socks5_hostport))
       +        self.logger.debug('Connecting to bitcoin peer at ' +
       +                str(self.remote_hostport) + ' with proxy ' +
       +                str(self.socks5_hostport))
                setdefaultproxy(PROXY_TYPE_SOCKS5, self.socks5_hostport[0],
                                self.socks5_hostport[1], True)
                self.sock = socksocket()
       t@@ -325,23 +324,15 @@ class P2PBroadcastTx(P2PMessageHandler):
                        hash_id = payload[ptr[0] : ptr[0] + 32]
                        ptr[0] += 32
                        if hash_id == self.txid:
       -                    self.logger.info("Uploading tx to " +
       -                        str(p2p.remote_hostport))
                            p2p.sock.sendall(p2p.create_message('tx', self.txhex))
                            self.uploaded_tx = True
       +                    self.logger.info("Uploaded transaction via tor to peer at "
       +                        + str(p2p.remote_hostport))
                            p2p.close()
        
       -def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
       -    ATTEMPTS = 8 # how many times to search for a node that accepts txes
       -    try:
       -        node_addrs = rpc.call("getnodeaddresses", [ATTEMPTS])
       -    except JsonRpcError as e:
       -        logger.debug(repr(e))
       -        logger.error("BitcoinCore v0.18.0 is required to broadcast through Tor")
       -        return False
       -    node_addrs = [a for a in node_addrs if a["services"] & NODE_WITNESS]
       -    for i in range(len(node_addrs)):
       -        remote_hostport = (node_addrs[i]["address"], node_addrs[i]["port"])
       +def broadcaster_thread(txhex, node_addrs, tor_hostport, network, rpc, logger):
       +    for node_addr in node_addrs:
       +        remote_hostport = (node_addr["address"], node_addr["port"])
                p2p_msg_handler = P2PBroadcastTx(txhex, logger)
                p2p = P2PProtocol(p2p_msg_handler, remote_hostport=remote_hostport,
                    network=network, logger=logger, socks5_hostport=tor_hostport,
       t@@ -349,7 +340,7 @@ def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
                try:
                    p2p.run()
                except IOError as e:
       -            logger.debug("p2p.run(): " + repr(e))
       +            logger.debug("p2p.run() exited: " + repr(e))
                    continue
                if p2p_msg_handler.uploaded_tx:
                    break
       t@@ -358,3 +349,38 @@ def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
            # return false if never found a node that accepted unconfirms
            return p2p_msg_handler.uploaded_tx
        
       +def chunk_list(d, n):
       +    return [d[x:x + n] for x in range(0, len(d), n)]
       +
       +def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
       +    CONNECTION_THREADS = 8
       +    CONNECTION_ATTEMPTS_PER_THREAD = 10
       +
       +    required_address_count = CONNECTION_ATTEMPTS_PER_THREAD * CONNECTION_THREADS
       +    node_addrs_witness = []
       +    while True:
       +        try:
       +            new_node_addrs = rpc.call("getnodeaddresses",
       +                [required_address_count])
       +        except JsonRpcError as e:
       +            logger.debug(repr(e))
       +            logger.error("Bitcoin Core v0.18.0 or higher is required "
       +                "to broadcast through Tor")
       +            return False
       +        node_addrs_witness.extend(
       +            [a for a in new_node_addrs if a["services"] & NODE_WITNESS]
       +        )
       +        logger.debug("len(new_node_addrs) = " + str(len(new_node_addrs)) +
       +            " len(node_addrs_witness) = " + str(len(node_addrs_witness)))
       +        if len(node_addrs_witness) > required_address_count:
       +            break
       +    node_addrs_chunks = chunk_list(
       +        node_addrs_witness[:required_address_count],
       +        CONNECTION_ATTEMPTS_PER_THREAD
       +    )
       +    for node_addrs in node_addrs_chunks:
       +        t = threading.Thread(target=broadcaster_thread,
       +            args=(txhex, node_addrs, tor_hostport, network, rpc, logger),
       +            daemon=True)
       +        t.start()
       +