tMove getnodeaddresses RPC calls to main thread - electrum-personal-server - Maximally lightweight electrum server for a single user HTML git clone https://git.parazyd.org/electrum-personal-server DIR Log DIR Files DIR Refs DIR README --- DIR commit dd9695403c88ae5e4b54ef682564a315203caf2d DIR parent fe44aa79ca54d3a1bf805e1ea203f9cb07ab9b14 HTML Author: chris-belcher <chris-belcher@users.noreply.github.com> Date: Mon, 24 Jun 2019 00:12:34 +0100 Move getnodeaddresses RPC calls to main thread This avoids a RPC concurrency issue arising from having 8 threads connect to the node RPC port at once Diffstat: M electrumpersonalserver/server/comm… | 15 +++++---------- M electrumpersonalserver/server/peer… | 64 ++++++++++++++++++++++--------- 2 files changed, 50 insertions(+), 29 deletions(-) --- DIR diff --git a/electrumpersonalserver/server/common.py b/electrumpersonalserver/server/common.py t@@ -5,14 +5,13 @@ import traceback, sys, platform from ipaddress import ip_network, ip_address import logging import tempfile -import threading from electrumpersonalserver.server.jsonrpc import JsonRpc, JsonRpcError import electrumpersonalserver.server.hashes as hashes import electrumpersonalserver.server.merkleproof as merkleproof import electrumpersonalserver.server.deterministicwallet as deterministicwallet import electrumpersonalserver.server.transactionmonitor as transactionmonitor -import electrumpersonalserver.server.peertopeer as p2p +import electrumpersonalserver.server.peertopeer as peertopeer SERVER_VERSION_NUMBER = "0.1.7" t@@ -247,21 +246,17 @@ def handle_query(sock, line, rpc, txmonitor, disable_mempool_fee_histogram, try: rpc.call("sendrawtransaction", [txhex]) except JsonRpcError as e: - pass + logger.error("Error broadcasting: " + repr(e)) elif broadcast_method == "tor": - TOR_CONNECTIONS = 8 network = "mainnet" chaininfo = rpc.call("getblockchaininfo", []) if chaininfo["chain"] == "test": network = "testnet" elif chaininfo["chain"] == "regtest": network = "regtest" - for i in range(TOR_CONNECTIONS): - t = threading.Thread(target=p2p.tor_broadcast_tx, - args=(txhex, tor_hostport, network, rpc, logger), - daemon=True) - t.start() - time.sleep(0.1) + logger.debug("broadcasting to network: " + network) + peertopeer.tor_broadcast_tx(txhex, tor_hostport, network, rpc, + logger) elif broadcast_method.startswith("system "): with tempfile.NamedTemporaryFile() as fd: system_line = broadcast_method[7:].replace("%s", fd.name) DIR diff --git a/electrumpersonalserver/server/peertopeer.py b/electrumpersonalserver/server/peertopeer.py t@@ -2,6 +2,7 @@ import socket, time import base64 +import threading from struct import pack, unpack from datetime import datetime t@@ -13,8 +14,6 @@ from electrumpersonalserver.server.socks import ( ) from electrumpersonalserver.server.jsonrpc import JsonRpcError -import logging - PROTOCOL_VERSION = 70012 DEFAULT_USER_AGENT = '/Satoshi:0.18.0/' NODE_WITNESS = (1 << 3) t@@ -187,9 +186,9 @@ class P2PProtocol(object): + pack('<I', start_height) + b'\x01') - self.logger.debug('Connecting to bitcoin peer (magic=' + - hex(self.magic) + ') at ' + str(self.remote_hostport) + - ' with proxy ' + str(self.socks5_hostport)) + self.logger.debug('Connecting to bitcoin peer at ' + + str(self.remote_hostport) + ' with proxy ' + + str(self.socks5_hostport)) setdefaultproxy(PROXY_TYPE_SOCKS5, self.socks5_hostport[0], self.socks5_hostport[1], True) self.sock = socksocket() t@@ -325,23 +324,15 @@ class P2PBroadcastTx(P2PMessageHandler): hash_id = payload[ptr[0] : ptr[0] + 32] ptr[0] += 32 if hash_id == self.txid: - self.logger.info("Uploading tx to " + - str(p2p.remote_hostport)) p2p.sock.sendall(p2p.create_message('tx', self.txhex)) self.uploaded_tx = True + self.logger.info("Uploaded transaction via tor to peer at " + + str(p2p.remote_hostport)) p2p.close() -def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger): - ATTEMPTS = 8 # how many times to search for a node that accepts txes - try: - node_addrs = rpc.call("getnodeaddresses", [ATTEMPTS]) - except JsonRpcError as e: - logger.debug(repr(e)) - logger.error("BitcoinCore v0.18.0 is required to broadcast through Tor") - return False - node_addrs = [a for a in node_addrs if a["services"] & NODE_WITNESS] - for i in range(len(node_addrs)): - remote_hostport = (node_addrs[i]["address"], node_addrs[i]["port"]) +def broadcaster_thread(txhex, node_addrs, tor_hostport, network, rpc, logger): + for node_addr in node_addrs: + remote_hostport = (node_addr["address"], node_addr["port"]) p2p_msg_handler = P2PBroadcastTx(txhex, logger) p2p = P2PProtocol(p2p_msg_handler, remote_hostport=remote_hostport, network=network, logger=logger, socks5_hostport=tor_hostport, t@@ -349,7 +340,7 @@ def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger): try: p2p.run() except IOError as e: - logger.debug("p2p.run(): " + repr(e)) + logger.debug("p2p.run() exited: " + repr(e)) continue if p2p_msg_handler.uploaded_tx: break t@@ -358,3 +349,38 @@ def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger): # return false if never found a node that accepted unconfirms return p2p_msg_handler.uploaded_tx +def chunk_list(d, n): + return [d[x:x + n] for x in range(0, len(d), n)] + +def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger): + CONNECTION_THREADS = 8 + CONNECTION_ATTEMPTS_PER_THREAD = 10 + + required_address_count = CONNECTION_ATTEMPTS_PER_THREAD * CONNECTION_THREADS + node_addrs_witness = [] + while True: + try: + new_node_addrs = rpc.call("getnodeaddresses", + [required_address_count]) + except JsonRpcError as e: + logger.debug(repr(e)) + logger.error("Bitcoin Core v0.18.0 or higher is required " + "to broadcast through Tor") + return False + node_addrs_witness.extend( + [a for a in new_node_addrs if a["services"] & NODE_WITNESS] + ) + logger.debug("len(new_node_addrs) = " + str(len(new_node_addrs)) + + " len(node_addrs_witness) = " + str(len(node_addrs_witness))) + if len(node_addrs_witness) > required_address_count: + break + node_addrs_chunks = chunk_list( + node_addrs_witness[:required_address_count], + CONNECTION_ATTEMPTS_PER_THREAD + ) + for node_addrs in node_addrs_chunks: + t = threading.Thread(target=broadcaster_thread, + args=(txhex, node_addrs, tor_hostport, network, rpc, logger), + daemon=True) + t.start() +