URI: 
       tImplement blockchain.headers.subscribe - obelisk - Electrum server using libbitcoin as its backend
  HTML git clone https://git.parazyd.org/obelisk
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
   DIR commit b6b8e2b59c752401bbae754be7a380cbd203255c
   DIR parent bd0f5497d09c336ca58688672f4b941c5ac9c688
  HTML Author: parazyd <parazyd@dyne.org>
       Date:   Thu,  8 Apr 2021 14:50:40 +0200
       
       Implement blockchain.headers.subscribe
       
       Diffstat:
         M electrumobelisk/protocol.py         |      90 ++++++++++++++++++++++---------
         M electrumobelisk/util.py             |      15 +++++++++++++++
         M electrumobelisk/zeromq.py           |      25 ++++++++++++++++++++++++-
       
       3 files changed, 104 insertions(+), 26 deletions(-)
       ---
   DIR diff --git a/electrumobelisk/protocol.py b/electrumobelisk/protocol.py
       t@@ -24,6 +24,7 @@ from binascii import unhexlify
        from electrumobelisk.hashes import double_sha256, hash_to_hex_str
        from electrumobelisk.merkle import merkle_branch
        from electrumobelisk.util import (
       +    block_to_header,
            is_boolean,
            is_hash256_str,
            is_hex_str,
       t@@ -60,6 +61,9 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                self.version_called = False
                # Consider renaming bx to something else
                self.bx = Client(log, endpoints, self.loop)
       +        self.block_queue = None
       +        # TODO: Clean up on client disconnect
       +        self.tasks = []
        
                if chain == "mainnet":
                    self.genesis = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
       t@@ -117,6 +121,7 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                    data = await reader.read(4096)
                    if not data or len(data) == 0:
                        self.log.debug("Received EOF, disconnect")
       +                # TODO: cancel asyncio tasks for this client here?
                        return
                    recv_buf.extend(data)
                    lb = recv_buf.find(b"\n")
       t@@ -135,6 +140,13 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                        self.log.debug("=> " + line)
                        await self.handle_query(writer, query)
        
       +    async def _send_notification(self, writer, method, params):
       +        """Send JSON-RPC notification to given writer"""
       +        response = {"jsonrpc": "2.0", "method": method, "params": params}
       +        self.log.debug("<= %s", response)
       +        writer.write(json.dumps(response).encode("utf-8") + b"\n")
       +        await writer.drain()
       +
            async def _send_response(self, writer, result, nid):
                """Send successful JSON-RPC response to given writer"""
                response = {"jsonrpc": "2.0", "result": result, "id": nid}
       t@@ -169,10 +181,10 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                if not func:
                    self.log.error("Unhandled method %s, query=%s", method, query)
                    return
       -        resp = await func(query)
       +        resp = await func(writer, query)
                return await self._send_reply(writer, resp, query)
        
       -    async def blockchain_block_header(self, query):
       +    async def blockchain_block_header(self, writer, query):  # pylint: disable=W0613
                """Method: blockchain.block.header
                Return the block header at the given height.
                """
       t@@ -193,7 +205,7 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                    return {"error": "request corrupted"}
                return {"result": safe_hexlify(data)}
        
       -    async def blockchain_block_headers(self, query):
       +    async def blockchain_block_headers(self, writer, query):  # pylint: disable=W0613
                """Method: blockchain.block.headers
                Return a concatenated chunk of block headers from the main chain.
                """
       t@@ -227,7 +239,7 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                }
                return {"result": resp}
        
       -    async def blockchain_estimatefee(self, query):  # pylint: disable=W0613
       +    async def blockchain_estimatefee(self, writer, query):  # pylint: disable=W0613
                """Method: blockchain.estimatefee
                Return the estimated transaction fee per kilobyte for a transaction
                to be confirmed within a certain number of blocks.
       t@@ -235,13 +247,41 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                # TODO: Help wanted
                return {"result": -1}
        
       -    async def blockchain_headers_subscribe(self, query):
       +    async def header_notifier(self, writer):
       +        self.block_queue = asyncio.Queue()
       +        await self.bx.subscribe_to_blocks(self.block_queue)
       +        while True:
       +            # item = (seq, height, block_data)
       +            item = await self.block_queue.get()
       +            if len(item) != 3:
       +                self.log.debug("error: item from block queue len != 3")
       +                continue
       +
       +            header = block_to_header(item[2])
       +            params = [{"height": item[1], "hex": safe_hexlify(header)}]
       +            await self._send_notification(writer,
       +                                          "blockchain.headers.subscribe",
       +                                          params)
       +
       +    async def blockchain_headers_subscribe(self, writer, query):  # pylint: disable=W0613
                """Method: blockchain.headers.subscribe
                Subscribe to receive block headers when a new block is found.
                """
       -        return
       +        # Tip height and header are returned upon request
       +        _ec, height = await self.bx.fetch_last_height()
       +        if _ec and _ec != 0:
       +            self.log.debug("Got error: %s", repr(_ec))
       +            return {"error": "internal error"}
       +        _ec, tip_header = await self.bx.fetch_block_header(height)
       +        if _ec and _ec != 0:
       +            self.log.debug("Got error: %s", repr(_ec))
       +            return {"error": "internal error"}
       +
       +        self.tasks.append(asyncio.create_task(self.header_notifier(writer)))
       +        ret = {"height": height, "hex": safe_hexlify(tip_header)}
       +        return {"result": ret}
        
       -    async def blockchain_relayfee(self, query):  # pylint: disable=W0613
       +    async def blockchain_relayfee(self, writer, query):  # pylint: disable=W0613
                """Method: blockchain.relayfee
                Return the minimum fee a low-priority transaction must pay in order
                to be accepted to the daemon’s memory pool.
       t@@ -249,7 +289,7 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                # TODO: Help wanted
                return {"result": 0.00001}
        
       -    async def blockchain_scripthash_get_balance(self, query):
       +    async def blockchain_scripthash_get_balance(self, writer, query):  # pylint: disable=W0613
                """Method: blockchain.scripthash.get_balance
                Return the confirmed and unconfirmed balances of a script hash.
                """
       t@@ -268,38 +308,38 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                ret = {"confirmed": data, "unconfirmed": 0}
                return {"result": ret}
        
       -    async def blockchain_scripthash_get_history(self, query):
       +    async def blockchain_scripthash_get_history(self, writer, query):  # pylint: disable=W0613
                """Method: blockchain.scripthash.get_history
                Return the confirmed and unconfirmed history of a script hash.
                """
                return
        
       -    async def blockchain_scripthash_get_mempool(self, query):
       +    async def blockchain_scripthash_get_mempool(self, writer, query):  # pylint: disable=W0613
                """Method: blockchain.scripthash.get_mempool
                Return the unconfirmed transactions of a script hash.
                """
                return
        
       -    async def blockchain_scripthash_listunspent(self, query):
       +    async def blockchain_scripthash_listunspent(self, writer, query):  # pylint: disable=W0613
                """Method: blockchain.scripthash.listunspent
                Return an ordered list of UTXOs sent to a script hash.
                """
                return
        
       -    async def blockchain_scripthash_subscribe(self, query):
       +    async def blockchain_scripthash_subscribe(self, writer, query):  # pylint: disable=W0613
                """Method: blockchain.scripthash.subscribe
                Subscribe to a script hash.
                """
                return
        
       -    async def blockchain_scripthash_unsubscribe(self, query):
       +    async def blockchain_scripthash_unsubscribe(self, writer, query):  # pylint: disable=W0613
                """Method: blockchain.scripthash.unsubscribe
                Unsubscribe from a script hash, preventing future notifications
                if its status changes.
                """
                return
        
       -    async def blockchain_transaction_broadcast(self, query):
       +    async def blockchain_transaction_broadcast(self, writer, query):  # pylint: disable=W0613
                """Method: blockchain.transaction.broadcast
                Broadcast a transaction to the network.
                """
       t@@ -319,7 +359,7 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                txid = double_sha256(rawtx)
                return {"result": hash_to_hex_str(txid)}
        
       -    async def blockchain_transaction_get(self, query):
       +    async def blockchain_transaction_get(self, writer, query):  # pylint: disable=W0613
                """Method: blockchain.transaction.get
                Return a raw transaction.
                """
       t@@ -342,7 +382,7 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
        
                return {"result", safe_hexlify(rawtx)}
        
       -    async def blockchain_transaction_get_merkle(self, query):
       +    async def blockchain_transaction_get_merkle(self, writer, query):  # pylint: disable=W0613
                """Method: blockchain.transaction.get_merkle
                Return the merkle branch to a confirmed transaction given its
                hash and height.
       t@@ -374,7 +414,7 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                }
                return {"result": res}
        
       -    async def blockchain_transaction_from_pos(self, query):  # pylint: disable=R0911
       +    async def blockchain_transaction_from_pos(self, writer, query):  # pylint: disable=R0911,W0613
                """Method: blockchain.transaction.id_from_pos
                Return a transaction hash and optionally a merkle proof, given a
                block height and a position in the block.
       t@@ -409,7 +449,7 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                branch = merkle_branch(hashes, tx_pos)
                return {"result": {"tx_hash": txid, "merkle": branch}}
        
       -    async def mempool_get_fee_histogram(self, query):  # pylint: disable=W0613
       +    async def mempool_get_fee_histogram(self, writer, query):  # pylint: disable=W0613
                """Method: mempool.get_fee_histogram
                Return a histogram of the fee rates paid by transactions in the
                memory pool, weighted by transaction size.
       t@@ -417,7 +457,7 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                # TODO: Help wanted
                return {"result": [[0, 0]]}
        
       -    async def server_add_peer(self, query):  # pylint: disable=W0613
       +    async def server_add_peer(self, writer, query):  # pylint: disable=W0613
                """Method: server.add_peer
                A newly-started server uses this call to get itself into other
                servers’ peers lists. It should not be used by wallet clients.
       t@@ -425,19 +465,19 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                # TODO: Help wanted
                return {"result": False}
        
       -    async def server_banner(self, query):  # pylint: disable=W0613
       +    async def server_banner(self, writer, query):  # pylint: disable=W0613
                """Method: server.banner
                Return a banner to be shown in the Electrum console.
                """
                return {"result": BANNER}
        
       -    async def server_donation_address(self, query):  # pylint: disable=W0613
       +    async def server_donation_address(self, writer, query):  # pylint: disable=W0613
                """Method: server.donation_address
                Return a server donation address.
                """
                return {"result": DONATION_ADDR}
        
       -    async def server_features(self, query):
       +    async def server_features(self, writer, query):  # pylint: disable=W0613
                """Method: server.features
                Return a list of features and services supported by the server.
                """
       t@@ -461,7 +501,7 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                    }
                }
        
       -    async def server_peers_subscribe(self, query):  # pylint: disable=W0613
       +    async def server_peers_subscribe(self, writer, query):  # pylint: disable=W0613
                """Method: server.peers.subscribe
                Return a list of peer servers. Despite the name this is not a
                subscription and the server must send no notifications.
       t@@ -469,7 +509,7 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                # TODO: Help wanted
                return {"result": []}
        
       -    async def server_ping(self, query):  # pylint: disable=W0613
       +    async def server_ping(self, writer, query):  # pylint: disable=W0613
                """Method: server.ping
                Ping the server to ensure it is responding, and to keep the session
                alive. The server may disconnect clients that have sent no requests
       t@@ -477,7 +517,7 @@ class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
                """
                return {"result": None}
        
       -    async def server_version(self, query):
       +    async def server_version(self, writer, query):  # pylint: disable=W0613
                """Method: server.version
                Identify the client to the server and negotiate the protocol version.
                Only the first server.version() message is accepted.
   DIR diff --git a/electrumobelisk/util.py b/electrumobelisk/util.py
       t@@ -72,3 +72,18 @@ def bh2u(val):
            '01020A'
            """
            return val.hex()
       +
       +
       +def block_to_header(block):
       +    """Return block header from raw block"""
       +    if not isinstance(block, (bytes, bytearray)):
       +        raise ValueError("block is not of type bytes/bytearray")
       +    # TODO: check endianness
       +    block_header = block[:80]
       +    # version = block_header[:4]
       +    # prev_merkle_root = block_header[4:36]
       +    # merkle_root = block_header[36:68]
       +    # timestamp = block_header[68:72]
       +    # bits = block_header[72:76]
       +    # nonce = block_header[76:80]
       +    return block_header
   DIR diff --git a/electrumobelisk/zeromq.py b/electrumobelisk/zeromq.py
       t@@ -205,7 +205,7 @@ class RequestCollection:
                    self._handle_response(response)
                else:
                    print(
       -                f"DEBUG: RequestCollection unhandled response {response.command}:{response.request_id}"
       +                f"DEBUG: RequestCollection unhandled response {response.command}:{response.request_id}"  # pylint: disable=C0301
                    )
        
            def _handle_response(self, response):
       t@@ -294,6 +294,14 @@ class Client:
                assert response.request_id == request.id_
                return response.error_code, response.data
        
       +    async def fetch_last_height(self):
       +        """Fetch the blockchain tip and return integer height"""
       +        command = b"blockchain.fetch_last_height"
       +        error_code, data = await self._simple_request(command, b"")
       +        if error_code:
       +            return error_code, None
       +        return error_code, struct.unpack("<I", data)[0]
       +
            async def fetch_block_header(self, index):
                """Fetch a block header by its height or integer index"""
                command = b"blockchain.fetch_block_header"
       t@@ -373,6 +381,21 @@ class Client:
                return error_code, functools.reduce(
                    lambda accumulator, point: accumulator + point["value"], utxo, 0)
        
       +    async def subscribe_to_blocks(self, queue):
       +        asyncio.ensure_future(self._listen_for_blocks(queue))
       +        return queue
       +
       +    async def _listen_for_blocks(self, queue):
       +        """Infinite loop for block subscription.
       +        Returns raw blocks as they're received.
       +        """
       +        while True:
       +            frame = await self._block_socket.recv_multipart()
       +            seq = struct.unpack("<H", frame[0])[0]
       +            height = struct.unpack("<I", frame[1])[0]
       +            block_data = frame[2]
       +            queue.put_nowait((seq, height, block_data))
       +
            @staticmethod
            def __receives_without_spends(history):
                return (point for point in history if "spent" not in point)