tImplement blockchain.headers.subscribe - obelisk - Electrum server using libbitcoin as its backend HTML git clone https://git.parazyd.org/obelisk DIR Log DIR Files DIR Refs DIR README DIR LICENSE --- DIR commit b6b8e2b59c752401bbae754be7a380cbd203255c DIR parent bd0f5497d09c336ca58688672f4b941c5ac9c688 HTML Author: parazyd <parazyd@dyne.org> Date: Thu, 8 Apr 2021 14:50:40 +0200 Implement blockchain.headers.subscribe Diffstat: M electrumobelisk/protocol.py | 90 ++++++++++++++++++++++--------- M electrumobelisk/util.py | 15 +++++++++++++++ M electrumobelisk/zeromq.py | 25 ++++++++++++++++++++++++- 3 files changed, 104 insertions(+), 26 deletions(-) --- DIR diff --git a/electrumobelisk/protocol.py b/electrumobelisk/protocol.py t@@ -24,6 +24,7 @@ from binascii import unhexlify from electrumobelisk.hashes import double_sha256, hash_to_hex_str from electrumobelisk.merkle import merkle_branch from electrumobelisk.util import ( + block_to_header, is_boolean, is_hash256_str, is_hex_str, t@@ -60,6 +61,9 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 self.version_called = False # Consider renaming bx to something else self.bx = Client(log, endpoints, self.loop) + self.block_queue = None + # TODO: Clean up on client disconnect + self.tasks = [] if chain == "mainnet": self.genesis = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" t@@ -117,6 +121,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 data = await reader.read(4096) if not data or len(data) == 0: self.log.debug("Received EOF, disconnect") + # TODO: cancel asyncio tasks for this client here? return recv_buf.extend(data) lb = recv_buf.find(b"\n") t@@ -135,6 +140,13 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 self.log.debug("=> " + line) await self.handle_query(writer, query) + async def _send_notification(self, writer, method, params): + """Send JSON-RPC notification to given writer""" + response = {"jsonrpc": "2.0", "method": method, "params": params} + self.log.debug("<= %s", response) + writer.write(json.dumps(response).encode("utf-8") + b"\n") + await writer.drain() + async def _send_response(self, writer, result, nid): """Send successful JSON-RPC response to given writer""" response = {"jsonrpc": "2.0", "result": result, "id": nid} t@@ -169,10 +181,10 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 if not func: self.log.error("Unhandled method %s, query=%s", method, query) return - resp = await func(query) + resp = await func(writer, query) return await self._send_reply(writer, resp, query) - async def blockchain_block_header(self, query): + async def blockchain_block_header(self, writer, query): # pylint: disable=W0613 """Method: blockchain.block.header Return the block header at the given height. """ t@@ -193,7 +205,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 return {"error": "request corrupted"} return {"result": safe_hexlify(data)} - async def blockchain_block_headers(self, query): + async def blockchain_block_headers(self, writer, query): # pylint: disable=W0613 """Method: blockchain.block.headers Return a concatenated chunk of block headers from the main chain. """ t@@ -227,7 +239,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 } return {"result": resp} - async def blockchain_estimatefee(self, query): # pylint: disable=W0613 + async def blockchain_estimatefee(self, writer, query): # pylint: disable=W0613 """Method: blockchain.estimatefee Return the estimated transaction fee per kilobyte for a transaction to be confirmed within a certain number of blocks. t@@ -235,13 +247,41 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 # TODO: Help wanted return {"result": -1} - async def blockchain_headers_subscribe(self, query): + async def header_notifier(self, writer): + self.block_queue = asyncio.Queue() + await self.bx.subscribe_to_blocks(self.block_queue) + while True: + # item = (seq, height, block_data) + item = await self.block_queue.get() + if len(item) != 3: + self.log.debug("error: item from block queue len != 3") + continue + + header = block_to_header(item[2]) + params = [{"height": item[1], "hex": safe_hexlify(header)}] + await self._send_notification(writer, + "blockchain.headers.subscribe", + params) + + async def blockchain_headers_subscribe(self, writer, query): # pylint: disable=W0613 """Method: blockchain.headers.subscribe Subscribe to receive block headers when a new block is found. """ - return + # Tip height and header are returned upon request + _ec, height = await self.bx.fetch_last_height() + if _ec and _ec != 0: + self.log.debug("Got error: %s", repr(_ec)) + return {"error": "internal error"} + _ec, tip_header = await self.bx.fetch_block_header(height) + if _ec and _ec != 0: + self.log.debug("Got error: %s", repr(_ec)) + return {"error": "internal error"} + + self.tasks.append(asyncio.create_task(self.header_notifier(writer))) + ret = {"height": height, "hex": safe_hexlify(tip_header)} + return {"result": ret} - async def blockchain_relayfee(self, query): # pylint: disable=W0613 + async def blockchain_relayfee(self, writer, query): # pylint: disable=W0613 """Method: blockchain.relayfee Return the minimum fee a low-priority transaction must pay in order to be accepted to the daemon’s memory pool. t@@ -249,7 +289,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 # TODO: Help wanted return {"result": 0.00001} - async def blockchain_scripthash_get_balance(self, query): + async def blockchain_scripthash_get_balance(self, writer, query): # pylint: disable=W0613 """Method: blockchain.scripthash.get_balance Return the confirmed and unconfirmed balances of a script hash. """ t@@ -268,38 +308,38 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 ret = {"confirmed": data, "unconfirmed": 0} return {"result": ret} - async def blockchain_scripthash_get_history(self, query): + async def blockchain_scripthash_get_history(self, writer, query): # pylint: disable=W0613 """Method: blockchain.scripthash.get_history Return the confirmed and unconfirmed history of a script hash. """ return - async def blockchain_scripthash_get_mempool(self, query): + async def blockchain_scripthash_get_mempool(self, writer, query): # pylint: disable=W0613 """Method: blockchain.scripthash.get_mempool Return the unconfirmed transactions of a script hash. """ return - async def blockchain_scripthash_listunspent(self, query): + async def blockchain_scripthash_listunspent(self, writer, query): # pylint: disable=W0613 """Method: blockchain.scripthash.listunspent Return an ordered list of UTXOs sent to a script hash. """ return - async def blockchain_scripthash_subscribe(self, query): + async def blockchain_scripthash_subscribe(self, writer, query): # pylint: disable=W0613 """Method: blockchain.scripthash.subscribe Subscribe to a script hash. """ return - async def blockchain_scripthash_unsubscribe(self, query): + async def blockchain_scripthash_unsubscribe(self, writer, query): # pylint: disable=W0613 """Method: blockchain.scripthash.unsubscribe Unsubscribe from a script hash, preventing future notifications if its status changes. """ return - async def blockchain_transaction_broadcast(self, query): + async def blockchain_transaction_broadcast(self, writer, query): # pylint: disable=W0613 """Method: blockchain.transaction.broadcast Broadcast a transaction to the network. """ t@@ -319,7 +359,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 txid = double_sha256(rawtx) return {"result": hash_to_hex_str(txid)} - async def blockchain_transaction_get(self, query): + async def blockchain_transaction_get(self, writer, query): # pylint: disable=W0613 """Method: blockchain.transaction.get Return a raw transaction. """ t@@ -342,7 +382,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 return {"result", safe_hexlify(rawtx)} - async def blockchain_transaction_get_merkle(self, query): + async def blockchain_transaction_get_merkle(self, writer, query): # pylint: disable=W0613 """Method: blockchain.transaction.get_merkle Return the merkle branch to a confirmed transaction given its hash and height. t@@ -374,7 +414,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 } return {"result": res} - async def blockchain_transaction_from_pos(self, query): # pylint: disable=R0911 + async def blockchain_transaction_from_pos(self, writer, query): # pylint: disable=R0911,W0613 """Method: blockchain.transaction.id_from_pos Return a transaction hash and optionally a merkle proof, given a block height and a position in the block. t@@ -409,7 +449,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 branch = merkle_branch(hashes, tx_pos) return {"result": {"tx_hash": txid, "merkle": branch}} - async def mempool_get_fee_histogram(self, query): # pylint: disable=W0613 + async def mempool_get_fee_histogram(self, writer, query): # pylint: disable=W0613 """Method: mempool.get_fee_histogram Return a histogram of the fee rates paid by transactions in the memory pool, weighted by transaction size. t@@ -417,7 +457,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 # TODO: Help wanted return {"result": [[0, 0]]} - async def server_add_peer(self, query): # pylint: disable=W0613 + async def server_add_peer(self, writer, query): # pylint: disable=W0613 """Method: server.add_peer A newly-started server uses this call to get itself into other servers’ peers lists. It should not be used by wallet clients. t@@ -425,19 +465,19 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 # TODO: Help wanted return {"result": False} - async def server_banner(self, query): # pylint: disable=W0613 + async def server_banner(self, writer, query): # pylint: disable=W0613 """Method: server.banner Return a banner to be shown in the Electrum console. """ return {"result": BANNER} - async def server_donation_address(self, query): # pylint: disable=W0613 + async def server_donation_address(self, writer, query): # pylint: disable=W0613 """Method: server.donation_address Return a server donation address. """ return {"result": DONATION_ADDR} - async def server_features(self, query): + async def server_features(self, writer, query): # pylint: disable=W0613 """Method: server.features Return a list of features and services supported by the server. """ t@@ -461,7 +501,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 } } - async def server_peers_subscribe(self, query): # pylint: disable=W0613 + async def server_peers_subscribe(self, writer, query): # pylint: disable=W0613 """Method: server.peers.subscribe Return a list of peer servers. Despite the name this is not a subscription and the server must send no notifications. t@@ -469,7 +509,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 # TODO: Help wanted return {"result": []} - async def server_ping(self, query): # pylint: disable=W0613 + async def server_ping(self, writer, query): # pylint: disable=W0613 """Method: server.ping Ping the server to ensure it is responding, and to keep the session alive. The server may disconnect clients that have sent no requests t@@ -477,7 +517,7 @@ class ElectrumProtocol(asyncio.Protocol): # pylint: disable=R0904,R0902 """ return {"result": None} - async def server_version(self, query): + async def server_version(self, writer, query): # pylint: disable=W0613 """Method: server.version Identify the client to the server and negotiate the protocol version. Only the first server.version() message is accepted. DIR diff --git a/electrumobelisk/util.py b/electrumobelisk/util.py t@@ -72,3 +72,18 @@ def bh2u(val): '01020A' """ return val.hex() + + +def block_to_header(block): + """Return block header from raw block""" + if not isinstance(block, (bytes, bytearray)): + raise ValueError("block is not of type bytes/bytearray") + # TODO: check endianness + block_header = block[:80] + # version = block_header[:4] + # prev_merkle_root = block_header[4:36] + # merkle_root = block_header[36:68] + # timestamp = block_header[68:72] + # bits = block_header[72:76] + # nonce = block_header[76:80] + return block_header DIR diff --git a/electrumobelisk/zeromq.py b/electrumobelisk/zeromq.py t@@ -205,7 +205,7 @@ class RequestCollection: self._handle_response(response) else: print( - f"DEBUG: RequestCollection unhandled response {response.command}:{response.request_id}" + f"DEBUG: RequestCollection unhandled response {response.command}:{response.request_id}" # pylint: disable=C0301 ) def _handle_response(self, response): t@@ -294,6 +294,14 @@ class Client: assert response.request_id == request.id_ return response.error_code, response.data + async def fetch_last_height(self): + """Fetch the blockchain tip and return integer height""" + command = b"blockchain.fetch_last_height" + error_code, data = await self._simple_request(command, b"") + if error_code: + return error_code, None + return error_code, struct.unpack("<I", data)[0] + async def fetch_block_header(self, index): """Fetch a block header by its height or integer index""" command = b"blockchain.fetch_block_header" t@@ -373,6 +381,21 @@ class Client: return error_code, functools.reduce( lambda accumulator, point: accumulator + point["value"], utxo, 0) + async def subscribe_to_blocks(self, queue): + asyncio.ensure_future(self._listen_for_blocks(queue)) + return queue + + async def _listen_for_blocks(self, queue): + """Infinite loop for block subscription. + Returns raw blocks as they're received. + """ + while True: + frame = await self._block_socket.recv_multipart() + seq = struct.unpack("<H", frame[0])[0] + height = struct.unpack("<I", frame[1])[0] + block_data = frame[2] + queue.put_nowait((seq, height, block_data)) + @staticmethod def __receives_without_spends(history): return (point for point in history if "spent" not in point)