tPass same logging instance to tor threads - electrum-personal-server - Maximally lightweight electrum server for a single user HTML git clone https://git.parazyd.org/electrum-personal-server DIR Log DIR Files DIR Refs DIR README --- DIR commit fe44aa79ca54d3a1bf805e1ea203f9cb07ab9b14 DIR parent 90da1ce20cbea9b8ffed1a4c812cde3f7b63b99b HTML Author: chris-belcher <chris-belcher@users.noreply.github.com> Date: Sat, 22 Jun 2019 01:00:50 +0100 Pass same logging instance to tor threads Also move around lines a little bit for clarity. And edit, remove or change logging level for some log messages. Diffstat: M electrumpersonalserver/server/comm… | 5 ++++- M electrumpersonalserver/server/peer… | 109 ++++++++++++++----------------- 2 files changed, 54 insertions(+), 60 deletions(-) --- DIR diff --git a/electrumpersonalserver/server/common.py b/electrumpersonalserver/server/common.py t@@ -236,6 +236,8 @@ def handle_query(sock, line, rpc, txmonitor, disable_mempool_fee_histogram, result = txreport["reject-reason"] else: result = txreport["txid"] + logger.info('Broadcasting tx ' + txreport["txid"] + " with " + + "broadcast method: " + broadcast_method) if broadcast_method == "own-node": if not rpc.call("getnetworkinfo", [])["localrelay"]: result = "Broadcast disabled when using blocksonly" t@@ -256,7 +258,8 @@ def handle_query(sock, line, rpc, txmonitor, disable_mempool_fee_histogram, network = "regtest" for i in range(TOR_CONNECTIONS): t = threading.Thread(target=p2p.tor_broadcast_tx, - args=(txhex, tor_hostport, network, rpc,)) + args=(txhex, tor_hostport, network, rpc, logger), + daemon=True) t.start() time.sleep(0.1) elif broadcast_method.startswith("system "): DIR diff --git a/electrumpersonalserver/server/peertopeer.py b/electrumpersonalserver/server/peertopeer.py t@@ -13,7 +13,7 @@ from electrumpersonalserver.server.socks import ( ) from electrumpersonalserver.server.jsonrpc import JsonRpcError -import logging as log +import logging PROTOCOL_VERSION = 70012 DEFAULT_USER_AGENT = '/Satoshi:0.18.0/' t@@ -35,17 +35,14 @@ def ip_to_hex(ip_str): # ipv4 only for now return socket.inet_pton(socket.AF_INET, ip_str) - def create_net_addr(hexip, port): # doesnt contain time as in bitcoin wiki services = 0 hex = bytes(10) + b'\xFF\xFF' + hexip return pack('<Q16s', services, hex) + pack('>H', port) - def create_var_str(s): return btc.num_to_var_int(len(s)) + s.encode() - def read_int(ptr, payload, n, littleendian=True): data = payload[ptr[0] : ptr[0]+n] if littleendian: t@@ -54,7 +51,6 @@ def read_int(ptr, payload, n, littleendian=True): ptr[0] += n return ret - def read_var_int(ptr, payload): val = payload[ptr[0]] ptr[0] += 1 t@@ -62,14 +58,12 @@ def read_var_int(ptr, payload): return val return read_int(ptr, payload, 2**(val - 252)) - def read_var_str(ptr, payload): l = read_var_int(ptr, payload) ret = payload[ptr[0]: ptr[0] + l] ptr[0] += l return ret - def ip_hex_to_str(ip_hex): # https://en.wikipedia.org/wiki/IPv6#IPv4-mapped_IPv6_addresses # https://www.cypherpunk.at/onioncat_trac/wiki/OnionCat t@@ -81,26 +75,24 @@ def ip_hex_to_str(ip_hex): else: return socket.inet_ntop(socket.AF_INET6, ip_hex) - class P2PMessageHandler(object): - def __init__(self): + def __init__(self, logger): self.last_message = datetime.now() self.waiting_for_keepalive = False - self.log = (log if log else - log.getLogger('ELECTRUMPERSONALSERVER')) + self.logger = logger def check_keepalive(self, p2p): if self.waiting_for_keepalive: if ((datetime.now() - self.last_message).total_seconds() < KEEPALIVE_TIMEOUT): return - log.info('keepalive timed out, closing') + self.logger.info('keepalive timed out, closing') p2p.sock.close() else: if ((datetime.now() - self.last_message).total_seconds() < KEEPALIVE_INTERVAL): return - log.debug('sending keepalive to peer') + self.logger.debug('sending keepalive to peer') self.waiting_for_keepalive = True p2p.sock.sendall(p2p.create_message('ping', '\x00'*8)) t@@ -128,7 +120,8 @@ class P2PMessageHandler(object): else: # must check node accepts unconfirmed txes before broadcasting relay = True - log.debug(('peer version message: version=%d services=0x%x' + self.logger.debug(('Received peer version message: version=%d' + + ' services=0x%x' + ' timestamp=%s user_agent=%s start_height=%d relay=%i' + ' them=%s:%d us=%s:%d') % (version, services, str(datetime.fromtimestamp(timestamp)), t@@ -160,11 +153,10 @@ class P2PMessageHandler(object): class P2PProtocol(object): def __init__(self, p2p_message_handler, remote_hostport, - network, user_agent=DEFAULT_USER_AGENT, + network, logger, user_agent=DEFAULT_USER_AGENT, socks5_hostport=("localhost", 9050), connect_timeout=30, heartbeat_interval=15): - self.log = (log if log else - log.getLogger('ELECTRUMPERSONALSERVER')) + self.logger = logger self.p2p_message_handler = p2p_message_handler self.network = network self.user_agent = user_agent t@@ -177,9 +169,7 @@ class P2PProtocol(object): self.magic = 0xdab5bffa else: self.magic = 0xd9b4bef9 - self.closed = False - self.remote_hostport = remote_hostport def run(self): t@@ -196,24 +186,18 @@ class P2PProtocol(object): + create_var_str(self.user_agent) + pack('<I', start_height) + b'\x01') - data = self.create_message('version', version_message) - while True: - log.info('connecting to bitcoin peer (magic=' + hex(self.magic) - + ') at ' + str(self.remote_hostport) + ' with proxy ' + - str(self.socks5_hostport)) - if self.socks5_hostport is None: - self.sock = socket.socket(socket.AF_INET, - socket.SOCK_STREAM) - else: - setdefaultproxy(PROXY_TYPE_SOCKS5, self.socks5_hostport[0], - self.socks5_hostport[1], True) - self.sock = socksocket() - self.sock.settimeout(self.connect_timeout) - self.sock.connect(self.remote_hostport) - self.sock.sendall(data) - break - log.info('connected') + self.logger.debug('Connecting to bitcoin peer (magic=' + + hex(self.magic) + ') at ' + str(self.remote_hostport) + + ' with proxy ' + str(self.socks5_hostport)) + setdefaultproxy(PROXY_TYPE_SOCKS5, self.socks5_hostport[0], + self.socks5_hostport[1], True) + self.sock = socksocket() + self.sock.settimeout(self.connect_timeout) + self.sock.connect(self.remote_hostport) + self.sock.sendall(self.create_message('version', version_message)) + + self.logger.debug('Connected to bitcoin peer') self.sock.settimeout(self.heartbeat_interval) self.closed = False try: t@@ -239,7 +223,8 @@ class P2PProtocol(object): unpack('<I12sI4s', recv_buffer[:HEADER_LENGTH]) recv_buffer = recv_buffer[HEADER_LENGTH:] if net_magic != self.magic: - log.debug('wrong MAGIC: ' + hex(net_magic)) + self.logger.debug('wrong MAGIC: ' + + hex(net_magic)) self.sock.close() break command = command.strip(b'\0') t@@ -252,7 +237,8 @@ class P2PProtocol(object): self.p2p_message_handler.handle_message( self, command, payload_length, payload) else: - log.debug("wrong checksum, dropping " + + self.logger.debug("wrong checksum, " + + "dropping " + "message, cmd=" + command + " payloadlen=" + str(payload_length)) payload_length = -1 t@@ -266,7 +252,7 @@ class P2PProtocol(object): self.closed = True except IOError as e: import traceback - log.debug("logging traceback from %s: \n" % + self.logger.debug("logging traceback from %s: \n" % traceback.format_exc()) self.closed = True finally: t@@ -283,11 +269,10 @@ class P2PProtocol(object): + btc.bin_dbl_sha256(payload)[:4] + payload) class P2PBroadcastTx(P2PMessageHandler): - def __init__(self, txhex): - P2PMessageHandler.__init__(self) + def __init__(self, txhex, logger): + P2PMessageHandler.__init__(self, logger) self.txhex = bytes.fromhex(txhex) self.txid = btc.bin_txhash(self.txhex) - log.debug('broadcasting txid ' + str(self.txid)) self.uploaded_tx = False self.time_marker = datetime.now() self.connected = False t@@ -296,15 +281,15 @@ class P2PBroadcastTx(P2PMessageHandler): addr_recv_services, addr_recv_ip, addr_trans_services, addr_trans_ip, addr_trans_port, user_agent, start_height, relay): if not relay: - log.debug('peer not accepting unconfirmed txes, trying another') + self.logger.debug('peer not accepting unconfirmed txes, trying ' + + 'another') # this happens if the other node is using blockonly=1 p2p.close() if not services & NODE_WITNESS: - log.debug('peer not accepting witness data, trying another') + self.logger.debug('peer not accepting witness data, trying another') p2p.close() def on_connected(self, p2p): - log.debug('sending inv') MSG = 1 #msg_tx inv_payload = pack('<BI', 1, MSG) + self.txid p2p.sock.sendall(p2p.create_message('inv', inv_payload)) t@@ -313,19 +298,20 @@ class P2PBroadcastTx(P2PMessageHandler): self.connected = True def on_heartbeat(self, p2p): - log.debug('broadcaster heartbeat') + self.logger.debug('broadcaster heartbeat') VERACK_TIMEOUT = 40 GETDATA_TIMEOUT = 60 if not self.connected: if ((datetime.now() - self.time_marker).total_seconds() < VERACK_TIMEOUT): return - log.debug('timed out of waiting for verack') + self.logger.debug('timed out of waiting for verack') else: if ((datetime.now() - self.time_marker).total_seconds() < GETDATA_TIMEOUT): return - log.debug('timed out of waiting for getdata, node already has tx') + self.logger.debug('timed out in waiting for getdata, node ' + + 'already has tx') self.uploaded_tx = True p2p.close() t@@ -338,32 +324,37 @@ class P2PBroadcastTx(P2PMessageHandler): ptr[0] += 4 hash_id = payload[ptr[0] : ptr[0] + 32] ptr[0] += 32 - log.debug('hashid=' + hash_id.hex()) if hash_id == self.txid: - log.debug('uploading tx') + self.logger.info("Uploading tx to " + + str(p2p.remote_hostport)) p2p.sock.sendall(p2p.create_message('tx', self.txhex)) self.uploaded_tx = True p2p.close() -def tor_broadcast_tx(txhex, tor_hostport, network, rpc): +def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger): ATTEMPTS = 8 # how many times to search for a node that accepts txes try: node_addrs = rpc.call("getnodeaddresses", [ATTEMPTS]) - except JsonRpcError: - log.error("BitcoinCore v0.18.0 must be used to broadcast through Tor") + except JsonRpcError as e: + logger.debug(repr(e)) + logger.error("BitcoinCore v0.18.0 is required to broadcast through Tor") return False node_addrs = [a for a in node_addrs if a["services"] & NODE_WITNESS] for i in range(len(node_addrs)): remote_hostport = (node_addrs[i]["address"], node_addrs[i]["port"]) - p2p_msg_handler = P2PBroadcastTx(txhex) + p2p_msg_handler = P2PBroadcastTx(txhex, logger) p2p = P2PProtocol(p2p_msg_handler, remote_hostport=remote_hostport, - network=network, socks5_hostport=tor_hostport, + network=network, logger=logger, socks5_hostport=tor_hostport, heartbeat_interval=20) try: p2p.run() - except IOError: + except IOError as e: + logger.debug("p2p.run(): " + repr(e)) continue - log.debug('uploaded={}'.format(p2p_msg_handler.uploaded_tx)) if p2p_msg_handler.uploaded_tx: - return True - return False # never find a node that accepted unconfirms + break + logger.debug("Exiting tor broadcast thread, uploaded_tx = " + + str(p2p_msg_handler.uploaded_tx)) + # return false if never found a node that accepted unconfirms + return p2p_msg_handler.uploaded_tx +