tRename electrumobelisk to obelisk. - obelisk - Electrum server using libbitcoin as its backend
  HTML git clone https://git.parazyd.org/obelisk
   DIR Log
   DIR Files
   DIR Refs
   DIR commit a4c9fd4118e22f7318cc7ac59be9cf4ef3f60572
   DIR parent 1007fde5ad844e4ce2c8b517f132f7e45d270000
  HTML Author: parazyd <parazyd@dyne.org>
       Date:   Tue, 13 Apr 2021 00:01:24 +0200
       Rename electrumobelisk to obelisk.
         D electrumobelisk/merkle.py           |      57 -------------------------------
         D electrumobelisk/protocol.py         |     665 -------------------------------
         D electrumobelisk/zeromq.py           |     477 -------------------------------
         D obelisk.py                          |     109 -------------------------------
         A obelisk/__init__.py                 |       0 
         R electrumobelisk/errors.py -> obeli… |       0 
         R electrumobelisk/libbitcoin_errors.… |       0 
         A obelisk/merkle.py                   |      57 +++++++++++++++++++++++++++++++
         A obelisk/obelisk                     |       2 ++
         A obelisk/protocol.py                 |     665 +++++++++++++++++++++++++++++++
         R electrumobelisk/util.py -> obelisk… |       0 
         A obelisk/zeromq.py                   |     477 +++++++++++++++++++++++++++++++
         A run_obelisk                         |     109 +++++++++++++++++++++++++++++++
       13 files changed, 1310 insertions(+), 1308 deletions(-)
   DIR diff --git a/electrumobelisk/merkle.py b/electrumobelisk/merkle.py
       t@@ -1,57 +0,0 @@
       -#!/usr/bin/env python3
       -# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org>
       -# This file is part of obelisk
       -# This program is free software: you can redistribute it and/or modify
       -# it under the terms of the GNU Affero General Public License version 3
       -# as published by the Free Software Foundation.
       -# This program is distributed in the hope that it will be useful,
       -# but WITHOUT ANY WARRANTY; without even the implied warranty of
       -# GNU Affero General Public License for more details.
       -# You should have received a copy of the GNU Affero General Public License
       -# along with this program.  If not, see <http://www.gnu.org/licenses/>.
       -"""Module for calculating merkle branches"""
       -from math import ceil, log
       -from electrumobelisk.util import double_sha256
       -def branch_length(hash_count):
       -    """Return the length of a merkle branch given the number of hashes"""
       -    return ceil(log(hash_count, 2))
       -def merkle_branch_and_root(hashes, index):
       -    """Return a (merkle branch, merkle_root) pair given hashes, and the
       -    index of one of those hashes.
       -    """
       -    hashes = list(hashes)
       -    if not isinstance(index, int):
       -        raise TypeError("index must be an integer")
       -    # This also asserts hashes is not empty
       -    if not 0 <= index < len(hashes):
       -        raise ValueError("index out of range")
       -    length = branch_length(len(hashes))
       -    branch = []
       -    for _ in range(length):
       -        if len(hashes) & 1:
       -            hashes.append(hashes[-1])
       -        branch.append(hashes[index ^ 1])
       -        index >>= 1
       -        hashes = [
       -            double_sha256(hashes[n] + hashes[n + 1])
       -            for n in range(0, len(hashes), 2)
       -        ]
       -    return branch, hashes[0]
       -def merkle_branch(tx_hashes, tx_pos):
       -    """Return a merkle branch given hashes and the tx position"""
       -    branch, _root = merkle_branch_and_root(tx_hashes, tx_pos)
       -    branch = [bytes(reversed(h)).hex() for h in branch]
       -    return branch
   DIR diff --git a/electrumobelisk/protocol.py b/electrumobelisk/protocol.py
       t@@ -1,665 +0,0 @@
       -#!/usr/bin/env python3
       -# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org>
       -# This file is part of obelisk
       -# This program is free software: you can redistribute it and/or modify
       -# it under the terms of the GNU Affero General Public License version 3
       -# as published by the Free Software Foundation.
       -# This program is distributed in the hope that it will be useful,
       -# but WITHOUT ANY WARRANTY; without even the implied warranty of
       -# GNU Affero General Public License for more details.
       -# You should have received a copy of the GNU Affero General Public License
       -# along with this program.  If not, see <http://www.gnu.org/licenses/>.
       -"""Implementation of the Electrum protocol as found on
       -import asyncio
       -import json
       -from binascii import unhexlify
       -from electrumobelisk.errors import ERRORS
       -from electrumobelisk.merkle import merkle_branch
       -from electrumobelisk.util import (
       -    bh2u,
       -    block_to_header,
       -    is_boolean,
       -    is_hash256_str,
       -    is_hex_str,
       -    is_non_negative_integer,
       -    safe_hexlify,
       -    sha256,
       -    double_sha256,
       -    hash_to_hex_str,
       -from electrumobelisk.zeromq import Client
       -VERSION = "0.0"
       -SERVER_PROTO_MIN = "1.4"
       -SERVER_PROTO_MAX = "1.4.2"
       -DONATION_ADDR = "bc1q7an9p5pz6pjwjk4r48zke2yfaevafzpglg26mz"
       -BANNER = ("""
       -Welcome to obelisk
       -"Tools for the people"
       -obelisk is a server that uses libbitcoin-server as its backend.
       -Source code can be found at: https://github.com/parazyd/obelisk
       -Please consider donating: %s
       -""" % DONATION_ADDR)
       -class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
       -    """Class implementing the Electrum protocol, with async support"""
       -    def __init__(self, log, chain, endpoints, server_cfg):
       -        self.log = log
       -        self.stopped = False
       -        self.endpoints = endpoints
       -        self.server_cfg = server_cfg
       -        self.loop = asyncio.get_event_loop()
       -        # Consider renaming bx to something else
       -        self.bx = Client(log, endpoints, self.loop)
       -        self.block_queue = None
       -        # TODO: Clean up on client disconnect
       -        self.tasks = []
       -        self.sh_subscriptions = {}
       -        if chain == "mainnet":
       -            self.genesis = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
       -        elif chain == "testnet":
       -            self.genesis = "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943"
       -        else:
       -            raise ValueError(f"Invalid chain '{chain}'")
       -        # Here we map available methods to their respective functions
       -        self.methodmap = {
       -            "blockchain.block.header": self.blockchain_block_header,
       -            "blockchain.block.headers": self.blockchain_block_headers,
       -            "blockchain.estimatefee": self.blockchain_estimatefee,
       -            "blockchain.headers.subscribe": self.blockchain_headers_subscribe,
       -            "blockchain.relayfee": self.blockchain_relayfee,
       -            "blockchain.scripthash.get_balance":
       -            self.blockchain_scripthash_get_balance,
       -            "blockchain.scripthash.get_history":
       -            self.blockchain_scripthash_get_history,
       -            "blockchain.scripthash.get_mempool":
       -            self.blockchain_scripthash_get_mempool,
       -            "blockchain.scripthash.listunspent":
       -            self.blockchain_scripthash_listunspent,
       -            "blockchain.scripthash.subscribe":
       -            self.blockchain_scripthash_subscribe,
       -            "blockchain.scripthash.unsubscribe":
       -            self.blockchain_scripthash_unsubscribe,
       -            "blockchain.transaction.broadcast":
       -            self.blockchain_transaction_broadcast,
       -            "blockchain.transaction.get": self.blockchain_transaction_get,
       -            "blockchain.transaction.get_merkle":
       -            self.blockchain_transaction_get_merkle,
       -            "blockchain.transaction.id_from_pos":
       -            self.blockchain_transaction_from_pos,
       -            "mempool.get_fee_histogram": self.mempool_get_fee_histogram,
       -            "server_add_peer": self.server_add_peer,
       -            "server.banner": self.server_banner,
       -            "server.donation_address": self.server_donation_address,
       -            "server.features": self.server_features,
       -            "server.peers.subscribe": self.server_peers_subscribe,
       -            "server.ping": self.server_ping,
       -            "server.version": self.server_version,
       -        }
       -    async def stop(self):
       -        """Destructor function"""
       -        self.log.debug("ElectrumProtocol.stop()")
       -        if self.bx:
       -            unsub_pool = []
       -            for i in self.sh_subscriptions:
       -                self.log.debug("bx.unsubscribe %s", i)
       -                unsub_pool.append(self.bx.unsubscribe_scripthash(i))
       -            await asyncio.gather(*unsub_pool, return_exceptions=True)
       -            await self.bx.stop()
       -        # idxs = []
       -        # for task in self.tasks:
       -        # idxs.append(self.tasks.index(task))
       -        # task.cancel()
       -        # for i in idxs:
       -        # del self.tasks[i]
       -        self.stopped = True
       -    async def recv(self, reader, writer):
       -        """Loop ran upon a connection which acts as a JSON-RPC handler"""
       -        recv_buf = bytearray()
       -        while not self.stopped:
       -            data = await reader.read(4096)
       -            if not data or len(data) == 0:
       -                self.log.debug("Received EOF, disconnect")
       -                # TODO: cancel asyncio tasks for this client here?
       -                return
       -            recv_buf.extend(data)
       -            lb = recv_buf.find(b"\n")
       -            if lb == -1:
       -                continue
       -            while lb != -1:
       -                line = recv_buf[:lb].rstrip()
       -                recv_buf = recv_buf[lb + 1:]
       -                lb = recv_buf.find(b"\n")
       -                try:
       -                    line = line.decode("utf-8")
       -                    query = json.loads(line)
       -                except (UnicodeDecodeError, json.JSONDecodeError) as err:
       -                    self.log.debug("Got error: %s", repr(err))
       -                    break
       -                self.log.debug("=> " + line)
       -                await self.handle_query(writer, query)
       -    async def _send_notification(self, writer, method, params):
       -        """Send JSON-RPC notification to given writer"""
       -        response = {"jsonrpc": "2.0", "method": method, "params": params}
       -        self.log.debug("<= %s", response)
       -        writer.write(json.dumps(response).encode("utf-8") + b"\n")
       -        await writer.drain()
       -    async def _send_response(self, writer, result, nid):
       -        """Send successful JSON-RPC response to given writer"""
       -        response = {"jsonrpc": "2.0", "result": result, "id": nid}
       -        self.log.debug("<= %s", response)
       -        writer.write(json.dumps(response).encode("utf-8") + b"\n")
       -        await writer.drain()
       -    async def _send_error(self, writer, error, nid):
       -        """Send JSON-RPC error to given writer"""
       -        response = {"jsonrpc": "2.0", "error": error, "id": nid}
       -        self.log.debug("<= %s", response)
       -        writer.write(json.dumps(response).encode("utf-8") + b"\n")
       -        await writer.drain()
       -    async def _send_reply(self, writer, resp, query):
       -        """Wrap function for sending replies"""
       -        if "error" in resp:
       -            return await self._send_error(writer, resp["error"], query["id"])
       -        return await self._send_response(writer, resp["result"], query["id"])
       -    async def handle_query(self, writer, query):  # pylint: disable=R0915,R0912,R0911
       -        """Electrum protocol method handler mapper"""
       -        if "method" not in query:
       -            self.log.debug("No 'method' in query: %s", query)
       -            return
       -        if "id" not in query:
       -            self.log.debug("No 'id' in query: %s", query)
       -            return
       -        method = query["method"]
       -        func = self.methodmap.get(method)
       -        if not func:
       -            self.log.error("Unhandled method %s, query=%s", method, query)
       -            return await self._send_reply(writer, ERRORS["nomethod"], query)
       -        resp = await func(writer, query)
       -        return await self._send_reply(writer, resp, query)
       -    async def blockchain_block_header(self, writer, query):  # pylint: disable=W0613
       -        """Method: blockchain.block.header
       -        Return the block header at the given height.
       -        """
       -        if "params" not in query or len(query["params"]) < 1:
       -            return ERRORS["invalidparams"]
       -        # TODO: cp_height
       -        index = query["params"][0]
       -        cp_height = query["params"][1] if len(query["params"]) == 2 else 0
       -        if not is_non_negative_integer(index):
       -            return ERRORS["invalidparams"]
       -        if not is_non_negative_integer(cp_height):
       -            return ERRORS["invalidparams"]
       -        _ec, data = await self.bx.fetch_block_header(index)
       -        if _ec and _ec != 0:
       -            self.log.debug("Got error: %s", repr(_ec))
       -            return ERRORS["internalerror"]
       -        return {"result": safe_hexlify(data)}
       -    async def blockchain_block_headers(self, writer, query):  # pylint: disable=W0613
       -        """Method: blockchain.block.headers
       -        Return a concatenated chunk of block headers from the main chain.
       -        """
       -        if "params" not in query or len(query["params"]) < 2:
       -            return ERRORS["invalidparams"]
       -        # Electrum doesn't allow max_chunk_size to be less than 2016
       -        # gopher://bitreich.org/9/memecache/convenience-store.mkv
       -        # TODO: cp_height
       -        max_chunk_size = 2016
       -        start_height = query["params"][0]
       -        count = query["params"][1]
       -        if not is_non_negative_integer(start_height):
       -            return ERRORS["invalidparams"]
       -        if not is_non_negative_integer(count):
       -            return ERRORS["invalidparams"]
       -        count = min(count, max_chunk_size)
       -        headers = bytearray()
       -        for i in range(count):
       -            _ec, data = await self.bx.fetch_block_header(i)
       -            if _ec and _ec != 0:
       -                self.log.debug("Got error: %s", repr(_ec))
       -                return ERRORS["internalerror"]
       -            headers.extend(data)
       -        resp = {
       -            "hex": safe_hexlify(headers),
       -            "count": len(headers) // 80,
       -            "max": max_chunk_size,
       -        }
       -        return {"result": resp}
       -    async def blockchain_estimatefee(self, writer, query):  # pylint: disable=W0613
       -        """Method: blockchain.estimatefee
       -        Return the estimated transaction fee per kilobyte for a transaction
       -        to be confirmed within a certain number of blocks.
       -        """
       -        # TODO: Help wanted
       -        return {"result": -1}
       -    async def header_notifier(self, writer):
       -        self.block_queue = asyncio.Queue()
       -        await self.bx.subscribe_to_blocks(self.block_queue)
       -        while True:
       -            # item = (seq, height, block_data)
       -            item = await self.block_queue.get()
       -            if len(item) != 3:
       -                self.log.debug("error: item from block queue len != 3")
       -                continue
       -            header = block_to_header(item[2])
       -            params = [{"height": item[1], "hex": safe_hexlify(header)}]
       -            await self._send_notification(writer,
       -                                          "blockchain.headers.subscribe",
       -                                          params)
       -    async def blockchain_headers_subscribe(self, writer, query):  # pylint: disable=W0613
       -        """Method: blockchain.headers.subscribe
       -        Subscribe to receive block headers when a new block is found.
       -        """
       -        # Tip height and header are returned upon request
       -        _ec, height = await self.bx.fetch_last_height()
       -        if _ec and _ec != 0:
       -            self.log.debug("Got error: %s", repr(_ec))
       -            return ERRORS["internalerror"]
       -        _ec, tip_header = await self.bx.fetch_block_header(height)
       -        if _ec and _ec != 0:
       -            self.log.debug("Got error: %s", repr(_ec))
       -            return ERRORS["internalerror"]
       -        self.tasks.append(asyncio.create_task(self.header_notifier(writer)))
       -        ret = {"height": height, "hex": safe_hexlify(tip_header)}
       -        return {"result": ret}
       -    async def blockchain_relayfee(self, writer, query):  # pylint: disable=W0613
       -        """Method: blockchain.relayfee
       -        Return the minimum fee a low-priority transaction must pay in order
       -        to be accepted to the daemon’s memory pool.
       -        """
       -        # TODO: Help wanted
       -        return {"result": 0.00001}
       -    async def blockchain_scripthash_get_balance(self, writer, query):  # pylint: disable=W0613
       -        """Method: blockchain.scripthash.get_balance
       -        Return the confirmed and unconfirmed balances of a script hash.
       -        """
       -        if "params" not in query or len(query["params"]) != 1:
       -            return ERRORS["invalidparams"]
       -        if not is_hash256_str(query["params"][0]):
       -            return ERRORS["invalidparams"]
       -        _ec, data = await self.bx.fetch_balance(query["params"][0])
       -        if _ec and _ec != 0:
       -            self.log.debug("Got error: %s", repr(_ec))
       -            return ERRORS["internalerror"]
       -        # TODO: confirmed/unconfirmed, see what's happening in libbitcoin
       -        ret = {"confirmed": data, "unconfirmed": 0}
       -        return {"result": ret}
       -    async def blockchain_scripthash_get_history(self, writer, query):  # pylint: disable=W0613
       -        """Method: blockchain.scripthash.get_history
       -        Return the confirmed and unconfirmed history of a script hash.
       -        """
       -        if "params" not in query or len(query["params"]) != 1:
       -            return ERRORS["invalidparams"]
       -        if not is_hash256_str(query["params"][0]):
       -            return ERRORS["invalidparams"]
       -        _ec, data = await self.bx.fetch_history4(query["params"][0])
       -        if _ec and _ec != 0:
       -            self.log.debug("Got error: %s", repr(_ec))
       -            return ERRORS["internalerror"]
       -        self.log.debug("hist: %s", data)
       -        ret = []
       -        # TODO: mempool
       -        for i in data:
       -            if "received" in i:
       -                ret.append({
       -                    "height": i["received"]["height"],
       -                    "tx_hash": hash_to_hex_str(i["received"]["hash"]),
       -                })
       -            if "spent" in i:
       -                ret.append({
       -                    "height": i["spent"]["height"],
       -                    "tx_hash": hash_to_hex_str(i["spent"]["hash"]),
       -                })
       -        return {"result": ret}
       -    async def blockchain_scripthash_get_mempool(self, writer, query):  # pylint: disable=W0613
       -        """Method: blockchain.scripthash.get_mempool
       -        Return the unconfirmed transactions of a script hash.
       -        """
       -        return
       -    async def blockchain_scripthash_listunspent(self, writer, query):  # pylint: disable=W0613
       -        """Method: blockchain.scripthash.listunspent
       -        Return an ordered list of UTXOs sent to a script hash.
       -        """
       -        if "params" not in query or len(query["params"]) != 1:
       -            return ERRORS["invalidparams"]
       -        scripthash = query["params"][0]
       -        if not is_hash256_str(scripthash):
       -            return ERRORS["invalidparams"]
       -        _ec, utxo = await self.bx.fetch_utxo(scripthash)
       -        if _ec and _ec != 0:
       -            self.log.debug("Got error: %s", repr(_ec))
       -            return ERRORS["internalerror"]
       -        # TODO: Check mempool
       -        ret = []
       -        for i in utxo:
       -            rec = i["received"]
       -            ret.append({
       -                "tx_pos": rec["index"],
       -                "value": i["value"],
       -                "tx_hash": hash_to_hex_str(rec["hash"]),
       -                "height": rec["height"],
       -            })
       -        return {"result": ret}
       -    async def scripthash_notifier(self, writer, scripthash):
       -        # TODO: Figure out how this actually works
       -        _ec, sh_queue = await self.bx.subscribe_scripthash(scripthash)
       -        if _ec and _ec != 0:
       -            self.log.error("bx.subscribe_scripthash failed:", repr(_ec))
       -            return
       -        while True:
       -            # item = (seq, height, block_data)
       -            item = await sh_queue.get()
       -            self.log.debug("sh_subscription item: %s", item)
       -    async def blockchain_scripthash_subscribe(self, writer, query):  # pylint: disable=W0613
       -        """Method: blockchain.scripthash.subscribe
       -        Subscribe to a script hash.
       -        """
       -        if "params" not in query or len(query["params"]) != 1:
       -            return ERRORS["invalidparamas"]
       -        scripthash = query["params"][0]
       -        if not is_hash256_str(scripthash):
       -            return ERRORS["invalidparams"]
       -        _ec, history = await self.bx.fetch_history4(scripthash)
       -        if _ec and _ec != 0:
       -            return ERRORS["internalerror"]
       -        task = asyncio.create_task(self.scripthash_notifier(
       -            writer, scripthash))
       -        self.sh_subscriptions[scripthash] = {"task": task}
       -        if len(history) < 1:
       -            return {"result": None}
       -        # TODO: Check how history4 acts for mempool/unconfirmed
       -        status = []
       -        for i in history:
       -            if "received" in i:
       -                status.append((
       -                    hash_to_hex_str(i["received"]["hash"]),
       -                    i["received"]["height"],
       -                ))
       -            if "spent" in i:
       -                status.append((
       -                    hash_to_hex_str(i["spent"]["hash"]),
       -                    i["spent"]["height"],
       -                ))
       -        self.sh_subscriptions[scripthash]["status"] = status
       -        return {"result": ElectrumProtocol.__scripthash_status(status)}
       -    @staticmethod
       -    def __scripthash_status(status):
       -        concat = ""
       -        for txid, height in status:
       -            concat += txid + ":%d:" % height
       -        return bh2u(sha256(concat.encode("ascii")))
       -    async def blockchain_scripthash_unsubscribe(self, writer, query):  # pylint: disable=W0613
       -        """Method: blockchain.scripthash.unsubscribe
       -        Unsubscribe from a script hash, preventing future notifications
       -        if its status changes.
       -        """
       -        if "params" not in query or len(query["params"]) != 1:
       -            return ERRORS["invalidparams"]
       -        scripthash = query["params"][0]
       -        if not is_hash256_str(scripthash):
       -            return ERRORS["invalidparams"]
       -        if scripthash in self.sh_subscriptions:
       -            self.sh_subscriptions[scripthash]["task"].cancel()
       -            await self.bx.unsubscribe_scripthash(scripthash)
       -            del self.sh_subscriptions[scripthash]
       -            return {"result": True}
       -        return {"result": False}
       -    async def blockchain_transaction_broadcast(self, writer, query):  # pylint: disable=W0613
       -        """Method: blockchain.transaction.broadcast
       -        Broadcast a transaction to the network.
       -        """
       -        # Note: Not yet implemented in bs v4
       -        if "params" not in query or len(query["params"]) != 1:
       -            return ERRORS["invalidparams"]
       -        hextx = query["params"][0]
       -        if not is_hex_str(hextx):
       -            return ERRORS["invalidparams"]
       -        _ec, _ = await self.bx.broadcast_transaction(hextx)
       -        if _ec and _ec != 0:
       -            return ERRORS["internalerror"]
       -        rawtx = unhexlify(hextx)
       -        txid = double_sha256(rawtx)
       -        return {"result": hash_to_hex_str(txid)}
       -    async def blockchain_transaction_get(self, writer, query):  # pylint: disable=W0613
       -        """Method: blockchain.transaction.get
       -        Return a raw transaction.
       -        """
       -        if "params" not in query or len(query["params"]) < 1:
       -            return ERRORS["invalidparams"]
       -        tx_hash = query["params"][0]
       -        verbose = query["params"][1] if len(query["params"]) > 1 else False
       -        # _ec, rawtx = await self.bx.fetch_blockchain_transaction(tx_hash)
       -        _ec, rawtx = await self.bx.fetch_mempool_transaction(tx_hash)
       -        if _ec and _ec != 0:
       -            self.log.debug("Got error: %s", repr(_ec))
       -            return ERRORS["internalerror"]
       -        # Behaviour is undefined in spec
       -        if not rawtx:
       -            return {"result": None}
       -        if verbose:
       -            # TODO: Help needed
       -            return ERRORS["invalidrequest"]
       -        return {"result": bh2u(rawtx)}
       -    async def blockchain_transaction_get_merkle(self, writer, query):  # pylint: disable=W0613
       -        """Method: blockchain.transaction.get_merkle
       -        Return the merkle branch to a confirmed transaction given its
       -        hash and height.
       -        """
       -        if "params" not in query or len(query["params"]) != 2:
       -            return ERRORS["invalidparams"]
       -        tx_hash = query["params"][0]
       -        height = query["params"][1]
       -        if not is_hash256_str(tx_hash):
       -            return ERRORS["invalidparams"]
       -        if not is_non_negative_integer(height):
       -            return ERRORS["invalidparams"]
       -        _ec, hashes = await self.bx.fetch_block_transaction_hashes(height)
       -        if _ec and _ec != 0:
       -            self.log.debug("Got error: %s", repr(_ec))
       -            return ERRORS["internalerror"]
       -        # Decouple from tuples
       -        hashes = [i[0] for i in hashes]
       -        tx_pos = hashes.index(unhexlify(tx_hash)[::-1])
       -        branch = merkle_branch(hashes, tx_pos)
       -        res = {
       -            "block_height": int(height),
       -            "pos": int(tx_pos),
       -            "merkle": branch,
       -        }
       -        return {"result": res}
       -    async def blockchain_transaction_from_pos(self, writer, query):  # pylint: disable=R0911,W0613
       -        """Method: blockchain.transaction.id_from_pos
       -        Return a transaction hash and optionally a merkle proof, given a
       -        block height and a position in the block.
       -        """
       -        if "params" not in query or len(query["params"]) < 2:
       -            return ERRORS["invalidparams"]
       -        height = query["params"][0]
       -        tx_pos = query["params"][1]
       -        merkle = query["params"][2] if len(query["params"]) > 2 else False
       -        if not is_non_negative_integer(height):
       -            return ERRORS["invalidparams"]
       -        if not is_non_negative_integer(tx_pos):
       -            return ERRORS["invalidparams"]
       -        if not is_boolean(merkle):
       -            return ERRORS["invalidparams"]
       -        _ec, hashes = await self.bx.fetch_block_transaction_hashes(height)
       -        if _ec and _ec != 0:
       -            self.log.debug("Got error: %s", repr(_ec))
       -            return ERRORS["internalerror"]
       -        if len(hashes) - 1 < tx_pos:
       -            return ERRORS["internalerror"]
       -        # Decouple from tuples
       -        hashes = [i[0] for i in hashes]
       -        txid = hash_to_hex_str(hashes[tx_pos])
       -        if not merkle:
       -            return {"result": txid}
       -        branch = merkle_branch(hashes, tx_pos)
       -        return {"result": {"tx_hash": txid, "merkle": branch}}
       -    async def mempool_get_fee_histogram(self, writer, query):  # pylint: disable=W0613
       -        """Method: mempool.get_fee_histogram
       -        Return a histogram of the fee rates paid by transactions in the
       -        memory pool, weighted by transaction size.
       -        """
       -        # TODO: Help wanted
       -        return {"result": [[0, 0]]}
       -    async def server_add_peer(self, writer, query):  # pylint: disable=W0613
       -        """Method: server.add_peer
       -        A newly-started server uses this call to get itself into other
       -        servers’ peers lists. It should not be used by wallet clients.
       -        """
       -        # TODO: Help wanted
       -        return {"result": False}
       -    async def server_banner(self, writer, query):  # pylint: disable=W0613
       -        """Method: server.banner
       -        Return a banner to be shown in the Electrum console.
       -        """
       -        return {"result": BANNER}
       -    async def server_donation_address(self, writer, query):  # pylint: disable=W0613
       -        """Method: server.donation_address
       -        Return a server donation address.
       -        """
       -        return {"result": DONATION_ADDR}
       -    async def server_features(self, writer, query):  # pylint: disable=W0613
       -        """Method: server.features
       -        Return a list of features and services supported by the server.
       -        """
       -        cfg = self.server_cfg
       -        return {
       -            "result": {
       -                "genesis_hash": self.genesis,
       -                "hosts": {
       -                    cfg["server_hostname"]: {
       -                        "tcp_port": cfg["server_port"],
       -                        "ssl_port": None,
       -                    },
       -                },
       -                "protocol_max": SERVER_PROTO_MAX,
       -                "protocol_min": SERVER_PROTO_MIN,
       -                "pruning": None,
       -                "server_version": f"obelisk {VERSION}",
       -                "hash_function": "sha256",
       -            }
       -        }
       -    async def server_peers_subscribe(self, writer, query):  # pylint: disable=W0613
       -        """Method: server.peers.subscribe
       -        Return a list of peer servers. Despite the name this is not a
       -        subscription and the server must send no notifications.
       -        """
       -        # TODO: Help wanted
       -        return {"result": []}
       -    async def server_ping(self, writer, query):  # pylint: disable=W0613
       -        """Method: server.ping
       -        Ping the server to ensure it is responding, and to keep the session
       -        alive. The server may disconnect clients that have sent no requests
       -        for roughly 10 minutes.
       -        """
       -        return {"result": None}
       -    async def server_version(self, writer, query):  # pylint: disable=W0613
       -        """Method: server.version
       -        Identify the client to the server and negotiate the protocol version.
       -        """
       -        if "params" not in query or len(query["params"]) != 2:
       -            return ERRORS["invalidparams"]
       -        client_ver = query["params"][1]
       -        if isinstance(client_ver, list):
       -            client_min, client_max = client_ver[0], client_ver[1]
       -        else:
       -            client_min = client_max = client_ver
       -        version = min(client_max, SERVER_PROTO_MAX)
       -        if version < max(client_min, SERVER_PROTO_MIN):
       -            return ERRORS["protonotsupported"]
       -        return {"result": [f"obelisk {VERSION}", version]}
   DIR diff --git a/electrumobelisk/zeromq.py b/electrumobelisk/zeromq.py
       t@@ -1,477 +0,0 @@
       -#!/usr/bin/env python3
       -# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org>
       -# This file is part of obelisk
       -# This program is free software: you can redistribute it and/or modify
       -# it under the terms of the GNU Affero General Public License version 3
       -# as published by the Free Software Foundation.
       -# This program is distributed in the hope that it will be useful,
       -# but WITHOUT ANY WARRANTY; without even the implied warranty of
       -# GNU Affero General Public License for more details.
       -# You should have received a copy of the GNU Affero General Public License
       -# along with this program.  If not, see <http://www.gnu.org/licenses/>.
       -"""ZeroMQ implementation for libbitcoin"""
       -import asyncio
       -import functools
       -import struct
       -from binascii import unhexlify
       -from random import randint
       -import zmq
       -import zmq.asyncio
       -from electrumobelisk.libbitcoin_errors import make_error_code, ErrorCode
       -from electrumobelisk.util import bh2u
       -def create_random_id():
       -    """Generate a random request ID"""
       -    max_uint32 = 4294967295
       -    return randint(0, max_uint32)
       -def pack_block_index(index):
       -    """struct.pack given index"""
       -    if isinstance(index, str):
       -        index = unhexlify(index)
       -        assert len(index) == 32
       -        return index
       -    if isinstance(index, int):
       -        return struct.pack("<I", index)
       -    raise ValueError(
       -        f"Unknown index type {type(index)} v:{index}, should be int or bytearray"
       -    )
       -def to_int(xbytes):
       -    """Make little-endian integer from given bytes"""
       -    return int.from_bytes(xbytes, byteorder="little")
       -def checksum(xhash, index):
       -    """
       -    This method takes a transaction hash and an index and returns a checksum.
       -    This checksum is based on 49 bits starting from the 12th byte of the
       -    reversed hash. Combined with the last 15 bits of the 4 byte index.
       -    """
       -    mask = 0xFFFFFFFFFFFF8000
       -    magic_start_position = 12
       -    hash_bytes = bytes.fromhex(xhash)[::-1]
       -    last_20_bytes = hash_bytes[magic_start_position:]
       -    assert len(hash_bytes) == 32
       -    assert index < 2**32
       -    hash_upper_49_bits = to_int(last_20_bytes) & mask
       -    index_lower_15_bits = index & ~mask
       -    return hash_upper_49_bits | index_lower_15_bits
       -def unpack_table(row_fmt, data):
       -    """Function to unpack table received from libbitcoin"""
       -    # Get the number of rows
       -    row_size = struct.calcsize(row_fmt)
       -    nrows = len(data) // row_size
       -    # Unpack
       -    rows = []
       -    for idx in range(nrows):
       -        offset = idx * row_size
       -        row = struct.unpack_from(row_fmt, data, offset)
       -        rows.append(row)
       -    return rows
       -class ClientSettings:
       -    """Class implementing ZMQ client settings"""
       -    def __init__(self, timeout=10, context=None, loop=None):
       -        self._timeout = timeout
       -        self._context = context
       -        self._loop = loop
       -    @property
       -    def context(self):
       -        """ZMQ context property"""
       -        if not self._context:
       -            ctx = zmq.asyncio.Context()
       -            ctx.linger = 500  # in milliseconds
       -            self._context = ctx
       -        return self._context
       -    @context.setter
       -    def context(self, context):
       -        self._context = context
       -    @property
       -    def timeout(self):
       -        """Set to None for no timeout"""
       -        return self._timeout
       -    @timeout.setter
       -    def timeout(self, timeout):
       -        self._timeout = timeout
       -class Request:
       -    """Class implementing a _send_ request.
       -    This is either a simple request/response affair or a subscription.
       -    """
       -    def __init__(self, socket, command, data):
       -        self.id_ = create_random_id()
       -        self.socket = socket
       -        self.command = command
       -        self.data = data
       -        self.future = asyncio.Future()
       -        self.queue = None
       -    async def send(self):
       -        """Send the ZMQ request"""
       -        request = [self.command, struct.pack("<I", self.id_), self.data]
       -        await self.socket.send_multipart(request)
       -    def is_subscription(self):
       -        """If the request is a subscription, then the response to this
       -        request is a notification.
       -        """
       -        return self.queue is not None
       -    def __str__(self):
       -        return "Request(command, ID) {}, {:d}".format(self.command, self.id_)
       -class InvalidServerResponseException(Exception):
       -    """Exception for invalid server responses"""
       -class Response:
       -    """Class implementing a request response"""
       -    def __init__(self, frame):
       -        if len(frame) != 3:
       -            raise InvalidServerResponseException(
       -                f"Length of the frame was not 3: {len(frame)}")
       -        self.command = frame[0]
       -        self.request_id = struct.unpack("<I", frame[1])[0]
       -        error_code = struct.unpack("<I", frame[2][:4])[0]
       -        self.error_code = make_error_code(error_code)
       -        self.data = frame[2][4:]
       -    def is_bound_for_queue(self):
       -        return len(self.data) > 0
       -    def __str__(self):
       -        return (
       -            "Response(command, request ID, error code, data):" +
       -            f" {self.command}, {self.request_id}, {self.error_code}, {self.data}"
       -        )
       -class RequestCollection:
       -    """RequestCollection carries a list of Requests and matches incoming
       -    responses to them.
       -    """
       -    def __init__(self, socket, loop):
       -        self._socket = socket
       -        self._requests = {}
       -        self._task = asyncio.ensure_future(self._run(), loop=loop)
       -    async def _run(self):
       -        while True:
       -            await self._receive()
       -    async def stop(self):
       -        """Stops listening for incoming responses (or subscription) messages.
       -        Returns the number of _responses_ expected but which are now dropped
       -        on the floor.
       -        """
       -        self._task.cancel()
       -        try:
       -            await self._task
       -        except asyncio.CancelledError:
       -            return len(self._requests)
       -    async def _receive(self):
       -        frame = await self._socket.recv_multipart()
       -        response = Response(frame)
       -        if response.request_id in self._requests:
       -            self._handle_response(response)
       -        else:
       -            print(
       -                f"DEBUG: RequestCollection unhandled response {response.command}:{response.request_id}"  # pylint: disable=C0301
       -            )
       -    def _handle_response(self, response):
       -        request = self._requests[response.request_id]
       -        if request.is_subscription():
       -            if response.is_bound_for_queue():
       -                # TODO: decode the data into something usable
       -                request.queue.put_nowait(response.data)
       -            else:
       -                request.future.set_result(response)
       -        else:
       -            self.delete_request(request)
       -            request.future.set_result(response)
       -    def add_request(self, request):
       -        # TODO: we should maybe check if the request.id_ is unique
       -        self._requests[request.id_] = request
       -    def delete_request(self, request):
       -        del self._requests[request.id_]
       -class Client:
       -    """This class represents a connection to a libbitcoin server."""
       -    def __init__(self, log, endpoints, loop):
       -        self.log = log
       -        self._endpoints = endpoints
       -        self._settings = ClientSettings(loop=loop)
       -        self._query_socket = self._create_query_socket()
       -        self._block_socket = self._create_block_socket()
       -        self._request_collection = RequestCollection(self._query_socket,
       -                                                     self._settings._loop)
       -    async def stop(self):
       -        self.log.debug("zmq Client.stop()")
       -        self._query_socket.close()
       -        self._block_socket.close()
       -        return await self._request_collection.stop()
       -    def _create_block_socket(self):
       -        socket = self._settings.context.socket(
       -            zmq.SUB,  # pylint: disable=E1101
       -            io_loop=self._settings._loop,  # pylint: disable=W0212
       -        )
       -        socket.connect(self._endpoints["block"])
       -        socket.setsockopt_string(zmq.SUBSCRIBE, "")  # pylint: disable=E1101
       -        return socket
       -    def _create_query_socket(self):
       -        socket = self._settings.context.socket(
       -            zmq.DEALER,  # pylint: disable=E1101
       -            io_loop=self._settings._loop,  # pylint: disable=W0212
       -        )
       -        socket.connect(self._endpoints["query"])
       -        return socket
       -    async def _subscription_request(self, command, data):
       -        request = await self._request(command, data)
       -        request.queue = asyncio.Queue(loop=self._settings._loop)  # pylint: disable=W0212
       -        error_code, _ = await self._wait_for_response(request)
       -        return error_code, request.queue
       -    async def _simple_request(self, command, data):
       -        return await self._wait_for_response(await
       -                                             self._request(command, data))
       -    async def _request(self, command, data):
       -        """Make a generic request. Both options are byte objects specified
       -        like b'blockchain.fetch_block_header' as an example.
       -        """
       -        request = Request(self._query_socket, command, data)
       -        await request.send()
       -        self._request_collection.add_request(request)
       -        return request
       -    async def _wait_for_response(self, request):
       -        try:
       -            response = await asyncio.wait_for(request.future,
       -                                              self._settings.timeout)
       -        except asyncio.TimeoutError:
       -            self._request_collection.delete_request(request)
       -            return ErrorCode.channel_timeout, None
       -        assert response.command == request.command
       -        assert response.request_id == request.id_
       -        return response.error_code, response.data
       -    async def fetch_last_height(self):
       -        """Fetch the blockchain tip and return integer height"""
       -        command = b"blockchain.fetch_last_height"
       -        error_code, data = await self._simple_request(command, b"")
       -        if error_code:
       -            return error_code, None
       -        return error_code, struct.unpack("<I", data)[0]
       -    async def fetch_block_header(self, index):
       -        """Fetch a block header by its height or integer index"""
       -        command = b"blockchain.fetch_block_header"
       -        data = pack_block_index(index)
       -        return await self._simple_request(command, data)
       -    async def fetch_block_transaction_hashes(self, index):
       -        """Fetch transaction hashes in a block at height index"""
       -        command = b"blockchain.fetch_block_transaction_hashes"
       -        data = pack_block_index(index)
       -        error_code, data = await self._simple_request(command, data)
       -        if error_code:
       -            return error_code, None
       -        return error_code, unpack_table("32s", data)
       -    async def fetch_blockchain_transaction(self, txid):
       -        """Fetch transaction by txid (not including mempool)"""
       -        command = b"blockchain.fetch_transaction2"
       -        error_code, data = await self._simple_request(
       -            command,
       -            bytes.fromhex(txid)[::-1])
       -        if error_code:
       -            return error_code, None
       -        return error_code, data
       -    async def fetch_mempool_transaction(self, txid):
       -        """Fetch transaction by txid (including mempool)"""
       -        command = b"transaction_pool.fetch_transaction2"
       -        error_code, data = await self._simple_request(
       -            command,
       -            bytes.fromhex(txid)[::-1])
       -        if error_code:
       -            return error_code, None
       -        return error_code, data
       -    async def subscribe_scripthash(self, scripthash):
       -        """Subscribe to scripthash"""
       -        command = b"subscribe.key"
       -        decoded_address = unhexlify(scripthash)
       -        return await self._subscription_request(command, decoded_address)
       -    async def unsubscribe_scripthash(self, scripthash):
       -        """Unsubscribe scripthash"""
       -        # TODO: This call should ideally also remove the subscription
       -        # request from the RequestCollection.
       -        # This call solicits a final call from the server with an
       -        # `error::service_stopped` error code.
       -        command = b"unsubscribe.key"
       -        decoded_address = unhexlify(scripthash)
       -        return await self._simple_request(command, decoded_address)
       -    async def fetch_history4(self, scripthash, height=0):
       -        """Fetch history for given scripthash"""
       -        command = b"blockchain.fetch_history4"
       -        decoded_address = unhexlify(scripthash)
       -        error_code, raw_points = await self._simple_request(
       -            command, decoded_address + struct.pack("<I", height))
       -        if error_code:
       -            return error_code, None
       -        def make_tuple(row):
       -            kind, height, tx_hash, index, value = row
       -            return (
       -                kind,
       -                {
       -                    "hash": tx_hash,
       -                    "index": index
       -                },
       -                height,
       -                value,
       -                checksum(tx_hash[::-1].hex(), index),
       -            )
       -        rows = unpack_table("<BI32sIQ", raw_points)
       -        points = [make_tuple(row) for row in rows]
       -        correlated_points = Client.__correlate(points)
       -        # self.log.debug("history points: %s", points)
       -        # self.log.debug("history correlated: %s", correlated_points)
       -        return error_code, self._sort_correlated_points(correlated_points)
       -    @staticmethod
       -    def _sort_correlated_points(points):
       -        """Sort by ascending height"""
       -        if len(points) < 2:
       -            return points
       -        return sorted(points, key=lambda x: list(x.values())[0]["height"])
       -    async def broadcast_transaction(self, rawtx):
       -        """Broadcast given raw transaction"""
       -        command = b"transaction_pool.broadcast"
       -        return await self._simple_request(command, rawtx)
       -    async def fetch_balance(self, scripthash):
       -        """Fetch balance for given scripthash"""
       -        error_code, history = await self.fetch_history4(scripthash)
       -        if error_code:
       -            return error_code, None
       -        utxo = Client.__receives_without_spends(history)
       -        return error_code, functools.reduce(
       -            lambda accumulator, point: accumulator + point["value"], utxo, 0)
       -    async def fetch_utxo(self, scripthash):
       -        """Find UTXO for given scripthash"""
       -        error_code, history = await self.fetch_history4(scripthash)
       -        if error_code:
       -            return error_code, None
       -        return error_code, Client.__receives_without_spends(history)
       -    async def subscribe_to_blocks(self, queue):
       -        asyncio.ensure_future(self._listen_for_blocks(queue))
       -        return queue
       -    async def _listen_for_blocks(self, queue):
       -        """Infinite loop for block subscription.
       -        Returns raw blocks as they're received.
       -        """
       -        while True:
       -            frame = await self._block_socket.recv_multipart()
       -            seq = struct.unpack("<H", frame[0])[0]
       -            height = struct.unpack("<I", frame[1])[0]
       -            block_data = frame[2]
       -            queue.put_nowait((seq, height, block_data))
       -    @staticmethod
       -    def __receives_without_spends(history):
       -        return (point for point in history if "spent" not in point)
       -    @staticmethod
       -    def __correlate(points):
       -        transfers, checksum_to_index = Client.__find_receives(points)
       -        transfers = Client.__correlate_spends_to_receives(
       -            points, transfers, checksum_to_index)
       -        return transfers
       -    @staticmethod
       -    def __correlate_spends_to_receives(points, transfers, checksum_to_index):
       -        for point in points:
       -            if point[0] == 1:  # receive
       -                continue
       -            spent = {
       -                "hash": point[1]["hash"],
       -                "height": point[2],
       -                "index": point[1]["index"],
       -            }
       -            if point[3] not in checksum_to_index:
       -                transfers.append({"spent": spent})
       -            else:
       -                transfers[checksum_to_index[point[3]]]["spent"] = spent
       -        return transfers
       -    @staticmethod
       -    def __find_receives(points):
       -        transfers = []
       -        checksum_to_index = {}
       -        for point in points:
       -            if point[0] == 0:  # spent
       -                continue
       -            transfers.append({
       -                "received": {
       -                    "hash": point[1]["hash"],
       -                    "height": point[2],
       -                    "index": point[1]["index"],
       -                },
       -                "value": point[3],
       -            })
       -            checksum_to_index[point[4]] = len(transfers) - 1
       -        return transfers, checksum_to_index
   DIR diff --git a/obelisk.py b/obelisk.py
       t@@ -1,109 +0,0 @@
       -#!/usr/bin/env python3
       -# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org>
       -# This file is part of obelisk
       -# This program is free software: you can redistribute it and/or modify
       -# it under the terms of the GNU Affero General Public License version 3
       -# as published by the Free Software Foundation.
       -# This program is distributed in the hope that it will be useful,
       -# but WITHOUT ANY WARRANTY; without even the implied warranty of
       -# GNU Affero General Public License for more details.
       -# You should have received a copy of the GNU Affero General Public License
       -# along with this program.  If not, see <http://www.gnu.org/licenses/>.
       -import asyncio
       -import sys
       -from argparse import ArgumentParser
       -from configparser import RawConfigParser, NoSectionError
       -from logging import getLogger, FileHandler, Formatter, StreamHandler
       -from os import devnull
       -from electrumobelisk.protocol import ElectrumProtocol, VERSION
       -# Used for destructor/cleanup
       -PROTOCOL = None
       -def logger_config(log, config):
       -    """Setup logging"""
       -    fmt = Formatter("%(asctime)s\t%(levelname)s\t%(message)s")
       -    logstream = StreamHandler()
       -    logstream.setFormatter(fmt)
       -    debuglevel = config.get("obelisk", "log_level", fallback="INFO")
       -    logstream.setLevel(debuglevel)
       -    log.addHandler(logstream)
       -    filename = config.get("obelisk", "log_file", fallback=devnull)
       -    append_log = config.getboolean("obelisk", "append_log", fallback=False)
       -    logfile = FileHandler(filename, mode=("a" if append_log else "w"))
       -    logfile.setFormatter(fmt)
       -    logfile.setLevel(debuglevel)
       -    log.addHandler(logfile)
       -    log.setLevel(debuglevel)
       -    return log, filename
       -async def run_electrum_server(config, chain):
       -    """Server coroutine"""
       -    log = getLogger("obelisk")
       -    host = config.get("obelisk", "host")
       -    port = int(config.get("obelisk", "port"))
       -    endpoints = {}
       -    endpoints["query"] = config.get("obelisk", "query")
       -    endpoints["heart"] = config.get("obelisk", "heart")
       -    endpoints["block"] = config.get("obelisk", "block")
       -    endpoints["trans"] = config.get("obelisk", "trans")
       -    server_cfg = {}
       -    server_cfg["server_hostname"] = "localhost"  # TODO: <- should be public?
       -    server_cfg["server_port"] = port
       -    global PROTOCOL
       -    PROTOCOL = ElectrumProtocol(log, chain, endpoints, server_cfg)
       -    server = await asyncio.start_server(PROTOCOL.recv, host, port)
       -    async with server:
       -        await server.serve_forever()
       -def main():
       -    """Main orchestration"""
       -    parser = ArgumentParser(description=f"obelisk {VERSION}")
       -    parser.add_argument("config_file", help="Path to config file")
       -    args = parser.parse_args()
       -    try:
       -        config = RawConfigParser()
       -        config.read(args.config_file)
       -        config.options("obelisk")
       -    except NoSectionError:
       -        print(f"error: Invalid config file {args.config_file}")
       -        return 1
       -    log = getLogger("obelisk")
       -    log, logfilename = logger_config(log, config)
       -    log.info(f"Starting obelisk {VERSION}")
       -    log.info(f"Logging to {logfilename}")
       -    chain = config.get("obelisk", "chain")
       -    if chain not in ("mainnet", "testnet"):
       -        log.error("chain is not 'mainnet' or 'testnet'")
       -        return 1
       -    try:
       -        asyncio.run(run_electrum_server(config, chain))
       -    except KeyboardInterrupt:
       -        print("\r", end="")
       -        log.debug("Caught KeyboardInterrupt, exiting...")
       -        if PROTOCOL:
       -            asyncio.run(PROTOCOL.stop())
       -        return 0
       -    return 1
       -if __name__ == "__main__":
       -    sys.exit(main())
   DIR diff --git a/obelisk/__init__.py b/obelisk/__init__.py
   DIR diff --git a/electrumobelisk/errors.py b/obelisk/errors.py
   DIR diff --git a/electrumobelisk/libbitcoin_errors.py b/obelisk/libbitcoin_errors.py
   DIR diff --git a/obelisk/merkle.py b/obelisk/merkle.py
       t@@ -0,0 +1,57 @@
       +#!/usr/bin/env python3
       +# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org>
       +# This file is part of obelisk
       +# This program is free software: you can redistribute it and/or modify
       +# it under the terms of the GNU Affero General Public License version 3
       +# as published by the Free Software Foundation.
       +# This program is distributed in the hope that it will be useful,
       +# but WITHOUT ANY WARRANTY; without even the implied warranty of
       +# GNU Affero General Public License for more details.
       +# You should have received a copy of the GNU Affero General Public License
       +# along with this program.  If not, see <http://www.gnu.org/licenses/>.
       +"""Module for calculating merkle branches"""
       +from math import ceil, log
       +from obelisk.util import double_sha256
       +def branch_length(hash_count):
       +    """Return the length of a merkle branch given the number of hashes"""
       +    return ceil(log(hash_count, 2))
       +def merkle_branch_and_root(hashes, index):
       +    """Return a (merkle branch, merkle_root) pair given hashes, and the
       +    index of one of those hashes.
       +    """
       +    hashes = list(hashes)
       +    if not isinstance(index, int):
       +        raise TypeError("index must be an integer")
       +    # This also asserts hashes is not empty
       +    if not 0 <= index < len(hashes):
       +        raise ValueError("index out of range")
       +    length = branch_length(len(hashes))
       +    branch = []
       +    for _ in range(length):
       +        if len(hashes) & 1:
       +            hashes.append(hashes[-1])
       +        branch.append(hashes[index ^ 1])
       +        index >>= 1
       +        hashes = [
       +            double_sha256(hashes[n] + hashes[n + 1])
       +            for n in range(0, len(hashes), 2)
       +        ]
       +    return branch, hashes[0]
       +def merkle_branch(tx_hashes, tx_pos):
       +    """Return a merkle branch given hashes and the tx position"""
       +    branch, _root = merkle_branch_and_root(tx_hashes, tx_pos)
       +    branch = [bytes(reversed(h)).hex() for h in branch]
       +    return branch
   DIR diff --git a/obelisk/obelisk b/obelisk/obelisk
       t@@ -0,0 +1 @@
       +\ No newline at end of file
   DIR diff --git a/obelisk/protocol.py b/obelisk/protocol.py
       t@@ -0,0 +1,665 @@
       +#!/usr/bin/env python3
       +# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org>
       +# This file is part of obelisk
       +# This program is free software: you can redistribute it and/or modify
       +# it under the terms of the GNU Affero General Public License version 3
       +# as published by the Free Software Foundation.
       +# This program is distributed in the hope that it will be useful,
       +# but WITHOUT ANY WARRANTY; without even the implied warranty of
       +# GNU Affero General Public License for more details.
       +# You should have received a copy of the GNU Affero General Public License
       +# along with this program.  If not, see <http://www.gnu.org/licenses/>.
       +"""Implementation of the Electrum protocol as found on
       +import asyncio
       +import json
       +from binascii import unhexlify
       +from obelisk.errors import ERRORS
       +from obelisk.merkle import merkle_branch
       +from obelisk.util import (
       +    bh2u,
       +    block_to_header,
       +    is_boolean,
       +    is_hash256_str,
       +    is_hex_str,
       +    is_non_negative_integer,
       +    safe_hexlify,
       +    sha256,
       +    double_sha256,
       +    hash_to_hex_str,
       +from obelisk.zeromq import Client
       +VERSION = "0.0"
       +SERVER_PROTO_MIN = "1.4"
       +SERVER_PROTO_MAX = "1.4.2"
       +DONATION_ADDR = "bc1q7an9p5pz6pjwjk4r48zke2yfaevafzpglg26mz"
       +BANNER = ("""
       +Welcome to obelisk
       +"Tools for the people"
       +obelisk is a server that uses libbitcoin-server as its backend.
       +Source code can be found at: https://github.com/parazyd/obelisk
       +Please consider donating: %s
       +""" % DONATION_ADDR)
       +class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
       +    """Class implementing the Electrum protocol, with async support"""
       +    def __init__(self, log, chain, endpoints, server_cfg):
       +        self.log = log
       +        self.stopped = False
       +        self.endpoints = endpoints
       +        self.server_cfg = server_cfg
       +        self.loop = asyncio.get_event_loop()
       +        # Consider renaming bx to something else
       +        self.bx = Client(log, endpoints, self.loop)
       +        self.block_queue = None
       +        # TODO: Clean up on client disconnect
       +        self.tasks = []
       +        self.sh_subscriptions = {}
       +        if chain == "mainnet":
       +            self.genesis = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
       +        elif chain == "testnet":
       +            self.genesis = "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943"
       +        else:
       +            raise ValueError(f"Invalid chain '{chain}'")
       +        # Here we map available methods to their respective functions
       +        self.methodmap = {
       +            "blockchain.block.header": self.blockchain_block_header,
       +            "blockchain.block.headers": self.blockchain_block_headers,
       +            "blockchain.estimatefee": self.blockchain_estimatefee,
       +            "blockchain.headers.subscribe": self.blockchain_headers_subscribe,
       +            "blockchain.relayfee": self.blockchain_relayfee,
       +            "blockchain.scripthash.get_balance":
       +            self.blockchain_scripthash_get_balance,
       +            "blockchain.scripthash.get_history":
       +            self.blockchain_scripthash_get_history,
       +            "blockchain.scripthash.get_mempool":
       +            self.blockchain_scripthash_get_mempool,
       +            "blockchain.scripthash.listunspent":
       +            self.blockchain_scripthash_listunspent,
       +            "blockchain.scripthash.subscribe":
       +            self.blockchain_scripthash_subscribe,
       +            "blockchain.scripthash.unsubscribe":
       +            self.blockchain_scripthash_unsubscribe,
       +            "blockchain.transaction.broadcast":
       +            self.blockchain_transaction_broadcast,
       +            "blockchain.transaction.get": self.blockchain_transaction_get,
       +            "blockchain.transaction.get_merkle":
       +            self.blockchain_transaction_get_merkle,
       +            "blockchain.transaction.id_from_pos":
       +            self.blockchain_transaction_from_pos,
       +            "mempool.get_fee_histogram": self.mempool_get_fee_histogram,
       +            "server_add_peer": self.server_add_peer,
       +            "server.banner": self.server_banner,
       +            "server.donation_address": self.server_donation_address,
       +            "server.features": self.server_features,
       +            "server.peers.subscribe": self.server_peers_subscribe,
       +            "server.ping": self.server_ping,
       +            "server.version": self.server_version,
       +        }
       +    async def stop(self):
       +        """Destructor function"""
       +        self.log.debug("ElectrumProtocol.stop()")
       +        if self.bx:
       +            unsub_pool = []
       +            for i in self.sh_subscriptions:
       +                self.log.debug("bx.unsubscribe %s", i)
       +                unsub_pool.append(self.bx.unsubscribe_scripthash(i))
       +            await asyncio.gather(*unsub_pool, return_exceptions=True)
       +            await self.bx.stop()
       +        # idxs = []
       +        # for task in self.tasks:
       +        # idxs.append(self.tasks.index(task))
       +        # task.cancel()
       +        # for i in idxs:
       +        # del self.tasks[i]
       +        self.stopped = True
       +    async def recv(self, reader, writer):
       +        """Loop ran upon a connection which acts as a JSON-RPC handler"""
       +        recv_buf = bytearray()
       +        while not self.stopped:
       +            data = await reader.read(4096)
       +            if not data or len(data) == 0:
       +                self.log.debug("Received EOF, disconnect")
       +                # TODO: cancel asyncio tasks for this client here?
       +                return
       +            recv_buf.extend(data)
       +            lb = recv_buf.find(b"\n")
       +            if lb == -1:
       +                continue
       +            while lb != -1:
       +                line = recv_buf[:lb].rstrip()
       +                recv_buf = recv_buf[lb + 1:]
       +                lb = recv_buf.find(b"\n")
       +                try:
       +                    line = line.decode("utf-8")
       +                    query = json.loads(line)
       +                except (UnicodeDecodeError, json.JSONDecodeError) as err:
       +                    self.log.debug("Got error: %s", repr(err))
       +                    break
       +                self.log.debug("=> " + line)
       +                await self.handle_query(writer, query)
       +    async def _send_notification(self, writer, method, params):
       +        """Send JSON-RPC notification to given writer"""
       +        response = {"jsonrpc": "2.0", "method": method, "params": params}
       +        self.log.debug("<= %s", response)
       +        writer.write(json.dumps(response).encode("utf-8") + b"\n")
       +        await writer.drain()
       +    async def _send_response(self, writer, result, nid):
       +        """Send successful JSON-RPC response to given writer"""
       +        response = {"jsonrpc": "2.0", "result": result, "id": nid}
       +        self.log.debug("<= %s", response)
       +        writer.write(json.dumps(response).encode("utf-8") + b"\n")
       +        await writer.drain()
       +    async def _send_error(self, writer, error, nid):
       +        """Send JSON-RPC error to given writer"""
       +        response = {"jsonrpc": "2.0", "error": error, "id": nid}
       +        self.log.debug("<= %s", response)
       +        writer.write(json.dumps(response).encode("utf-8") + b"\n")
       +        await writer.drain()
       +    async def _send_reply(self, writer, resp, query):
       +        """Wrap function for sending replies"""
       +        if "error" in resp:
       +            return await self._send_error(writer, resp["error"], query["id"])
       +        return await self._send_response(writer, resp["result"], query["id"])
       +    async def handle_query(self, writer, query):  # pylint: disable=R0915,R0912,R0911
       +        """Electrum protocol method handler mapper"""
       +        if "method" not in query:
       +            self.log.debug("No 'method' in query: %s", query)
       +            return
       +        if "id" not in query:
       +            self.log.debug("No 'id' in query: %s", query)
       +            return
       +        method = query["method"]
       +        func = self.methodmap.get(method)
       +        if not func:
       +            self.log.error("Unhandled method %s, query=%s", method, query)
       +            return await self._send_reply(writer, ERRORS["nomethod"], query)
       +        resp = await func(writer, query)
       +        return await self._send_reply(writer, resp, query)
       +    async def blockchain_block_header(self, writer, query):  # pylint: disable=W0613
       +        """Method: blockchain.block.header
       +        Return the block header at the given height.
       +        """
       +        if "params" not in query or len(query["params"]) < 1:
       +            return ERRORS["invalidparams"]
       +        # TODO: cp_height
       +        index = query["params"][0]
       +        cp_height = query["params"][1] if len(query["params"]) == 2 else 0
       +        if not is_non_negative_integer(index):
       +            return ERRORS["invalidparams"]
       +        if not is_non_negative_integer(cp_height):
       +            return ERRORS["invalidparams"]
       +        _ec, data = await self.bx.fetch_block_header(index)
       +        if _ec and _ec != 0:
       +            self.log.debug("Got error: %s", repr(_ec))
       +            return ERRORS["internalerror"]
       +        return {"result": safe_hexlify(data)}
       +    async def blockchain_block_headers(self, writer, query):  # pylint: disable=W0613
       +        """Method: blockchain.block.headers
       +        Return a concatenated chunk of block headers from the main chain.
       +        """
       +        if "params" not in query or len(query["params"]) < 2:
       +            return ERRORS["invalidparams"]
       +        # Electrum doesn't allow max_chunk_size to be less than 2016
       +        # gopher://bitreich.org/9/memecache/convenience-store.mkv
       +        # TODO: cp_height
       +        max_chunk_size = 2016
       +        start_height = query["params"][0]
       +        count = query["params"][1]
       +        if not is_non_negative_integer(start_height):
       +            return ERRORS["invalidparams"]
       +        if not is_non_negative_integer(count):
       +            return ERRORS["invalidparams"]
       +        count = min(count, max_chunk_size)
       +        headers = bytearray()
       +        for i in range(count):
       +            _ec, data = await self.bx.fetch_block_header(i)
       +            if _ec and _ec != 0:
       +                self.log.debug("Got error: %s", repr(_ec))
       +                return ERRORS["internalerror"]
       +            headers.extend(data)
       +        resp = {
       +            "hex": safe_hexlify(headers),
       +            "count": len(headers) // 80,
       +            "max": max_chunk_size,
       +        }
       +        return {"result": resp}
       +    async def blockchain_estimatefee(self, writer, query):  # pylint: disable=W0613
       +        """Method: blockchain.estimatefee
       +        Return the estimated transaction fee per kilobyte for a transaction
       +        to be confirmed within a certain number of blocks.
       +        """
       +        # TODO: Help wanted
       +        return {"result": -1}
       +    async def header_notifier(self, writer):
       +        self.block_queue = asyncio.Queue()
       +        await self.bx.subscribe_to_blocks(self.block_queue)
       +        while True:
       +            # item = (seq, height, block_data)
       +            item = await self.block_queue.get()
       +            if len(item) != 3:
       +                self.log.debug("error: item from block queue len != 3")
       +                continue
       +            header = block_to_header(item[2])
       +            params = [{"height": item[1], "hex": safe_hexlify(header)}]
       +            await self._send_notification(writer,
       +                                          "blockchain.headers.subscribe",
       +                                          params)
       +    async def blockchain_headers_subscribe(self, writer, query):  # pylint: disable=W0613
       +        """Method: blockchain.headers.subscribe
       +        Subscribe to receive block headers when a new block is found.
       +        """
       +        # Tip height and header are returned upon request
       +        _ec, height = await self.bx.fetch_last_height()
       +        if _ec and _ec != 0:
       +            self.log.debug("Got error: %s", repr(_ec))
       +            return ERRORS["internalerror"]
       +        _ec, tip_header = await self.bx.fetch_block_header(height)
       +        if _ec and _ec != 0:
       +            self.log.debug("Got error: %s", repr(_ec))
       +            return ERRORS["internalerror"]
       +        self.tasks.append(asyncio.create_task(self.header_notifier(writer)))
       +        ret = {"height": height, "hex": safe_hexlify(tip_header)}
       +        return {"result": ret}
       +    async def blockchain_relayfee(self, writer, query):  # pylint: disable=W0613
       +        """Method: blockchain.relayfee
       +        Return the minimum fee a low-priority transaction must pay in order
       +        to be accepted to the daemon’s memory pool.
       +        """
       +        # TODO: Help wanted
       +        return {"result": 0.00001}
       +    async def blockchain_scripthash_get_balance(self, writer, query):  # pylint: disable=W0613
       +        """Method: blockchain.scripthash.get_balance
       +        Return the confirmed and unconfirmed balances of a script hash.
       +        """
       +        if "params" not in query or len(query["params"]) != 1:
       +            return ERRORS["invalidparams"]
       +        if not is_hash256_str(query["params"][0]):
       +            return ERRORS["invalidparams"]
       +        _ec, data = await self.bx.fetch_balance(query["params"][0])
       +        if _ec and _ec != 0:
       +            self.log.debug("Got error: %s", repr(_ec))
       +            return ERRORS["internalerror"]
       +        # TODO: confirmed/unconfirmed, see what's happening in libbitcoin
       +        ret = {"confirmed": data, "unconfirmed": 0}
       +        return {"result": ret}
       +    async def blockchain_scripthash_get_history(self, writer, query):  # pylint: disable=W0613
       +        """Method: blockchain.scripthash.get_history
       +        Return the confirmed and unconfirmed history of a script hash.
       +        """
       +        if "params" not in query or len(query["params"]) != 1:
       +            return ERRORS["invalidparams"]
       +        if not is_hash256_str(query["params"][0]):
       +            return ERRORS["invalidparams"]
       +        _ec, data = await self.bx.fetch_history4(query["params"][0])
       +        if _ec and _ec != 0:
       +            self.log.debug("Got error: %s", repr(_ec))
       +            return ERRORS["internalerror"]
       +        self.log.debug("hist: %s", data)
       +        ret = []
       +        # TODO: mempool
       +        for i in data:
       +            if "received" in i:
       +                ret.append({
       +                    "height": i["received"]["height"],
       +                    "tx_hash": hash_to_hex_str(i["received"]["hash"]),
       +                })
       +            if "spent" in i:
       +                ret.append({
       +                    "height": i["spent"]["height"],
       +                    "tx_hash": hash_to_hex_str(i["spent"]["hash"]),
       +                })
       +        return {"result": ret}
       +    async def blockchain_scripthash_get_mempool(self, writer, query):  # pylint: disable=W0613
       +        """Method: blockchain.scripthash.get_mempool
       +        Return the unconfirmed transactions of a script hash.
       +        """
       +        return
       +    async def blockchain_scripthash_listunspent(self, writer, query):  # pylint: disable=W0613
       +        """Method: blockchain.scripthash.listunspent
       +        Return an ordered list of UTXOs sent to a script hash.
       +        """
       +        if "params" not in query or len(query["params"]) != 1:
       +            return ERRORS["invalidparams"]
       +        scripthash = query["params"][0]
       +        if not is_hash256_str(scripthash):
       +            return ERRORS["invalidparams"]
       +        _ec, utxo = await self.bx.fetch_utxo(scripthash)
       +        if _ec and _ec != 0:
       +            self.log.debug("Got error: %s", repr(_ec))
       +            return ERRORS["internalerror"]
       +        # TODO: Check mempool
       +        ret = []
       +        for i in utxo:
       +            rec = i["received"]
       +            ret.append({
       +                "tx_pos": rec["index"],
       +                "value": i["value"],
       +                "tx_hash": hash_to_hex_str(rec["hash"]),
       +                "height": rec["height"],
       +            })
       +        return {"result": ret}
       +    async def scripthash_notifier(self, writer, scripthash):
       +        # TODO: Figure out how this actually works
       +        _ec, sh_queue = await self.bx.subscribe_scripthash(scripthash)
       +        if _ec and _ec != 0:
       +            self.log.error("bx.subscribe_scripthash failed:", repr(_ec))
       +            return
       +        while True:
       +            # item = (seq, height, block_data)
       +            item = await sh_queue.get()
       +            self.log.debug("sh_subscription item: %s", item)
       +    async def blockchain_scripthash_subscribe(self, writer, query):  # pylint: disable=W0613
       +        """Method: blockchain.scripthash.subscribe
       +        Subscribe to a script hash.
       +        """
       +        if "params" not in query or len(query["params"]) != 1:
       +            return ERRORS["invalidparamas"]
       +        scripthash = query["params"][0]
       +        if not is_hash256_str(scripthash):
       +            return ERRORS["invalidparams"]
       +        _ec, history = await self.bx.fetch_history4(scripthash)
       +        if _ec and _ec != 0:
       +            return ERRORS["internalerror"]
       +        task = asyncio.create_task(self.scripthash_notifier(
       +            writer, scripthash))
       +        self.sh_subscriptions[scripthash] = {"task": task}
       +        if len(history) < 1:
       +            return {"result": None}
       +        # TODO: Check how history4 acts for mempool/unconfirmed
       +        status = []
       +        for i in history:
       +            if "received" in i:
       +                status.append((
       +                    hash_to_hex_str(i["received"]["hash"]),
       +                    i["received"]["height"],
       +                ))
       +            if "spent" in i:
       +                status.append((
       +                    hash_to_hex_str(i["spent"]["hash"]),
       +                    i["spent"]["height"],
       +                ))
       +        self.sh_subscriptions[scripthash]["status"] = status
       +        return {"result": ElectrumProtocol.__scripthash_status(status)}
       +    @staticmethod
       +    def __scripthash_status(status):
       +        concat = ""
       +        for txid, height in status:
       +            concat += txid + ":%d:" % height
       +        return bh2u(sha256(concat.encode("ascii")))
       +    async def blockchain_scripthash_unsubscribe(self, writer, query):  # pylint: disable=W0613
       +        """Method: blockchain.scripthash.unsubscribe
       +        Unsubscribe from a script hash, preventing future notifications
       +        if its status changes.
       +        """
       +        if "params" not in query or len(query["params"]) != 1:
       +            return ERRORS["invalidparams"]
       +        scripthash = query["params"][0]
       +        if not is_hash256_str(scripthash):
       +            return ERRORS["invalidparams"]
       +        if scripthash in self.sh_subscriptions:
       +            self.sh_subscriptions[scripthash]["task"].cancel()
       +            await self.bx.unsubscribe_scripthash(scripthash)
       +            del self.sh_subscriptions[scripthash]
       +            return {"result": True}
       +        return {"result": False}
       +    async def blockchain_transaction_broadcast(self, writer, query):  # pylint: disable=W0613
       +        """Method: blockchain.transaction.broadcast
       +        Broadcast a transaction to the network.
       +        """
       +        # Note: Not yet implemented in bs v4
       +        if "params" not in query or len(query["params"]) != 1:
       +            return ERRORS["invalidparams"]
       +        hextx = query["params"][0]
       +        if not is_hex_str(hextx):
       +            return ERRORS["invalidparams"]
       +        _ec, _ = await self.bx.broadcast_transaction(hextx)
       +        if _ec and _ec != 0:
       +            return ERRORS["internalerror"]
       +        rawtx = unhexlify(hextx)
       +        txid = double_sha256(rawtx)
       +        return {"result": hash_to_hex_str(txid)}
       +    async def blockchain_transaction_get(self, writer, query):  # pylint: disable=W0613
       +        """Method: blockchain.transaction.get
       +        Return a raw transaction.
       +        """
       +        if "params" not in query or len(query["params"]) < 1:
       +            return ERRORS["invalidparams"]
       +        tx_hash = query["params"][0]
       +        verbose = query["params"][1] if len(query["params"]) > 1 else False
       +        # _ec, rawtx = await self.bx.fetch_blockchain_transaction(tx_hash)
       +        _ec, rawtx = await self.bx.fetch_mempool_transaction(tx_hash)
       +        if _ec and _ec != 0:
       +            self.log.debug("Got error: %s", repr(_ec))
       +            return ERRORS["internalerror"]
       +        # Behaviour is undefined in spec
       +        if not rawtx:
       +            return {"result": None}
       +        if verbose:
       +            # TODO: Help needed
       +            return ERRORS["invalidrequest"]
       +        return {"result": bh2u(rawtx)}
       +    async def blockchain_transaction_get_merkle(self, writer, query):  # pylint: disable=W0613
       +        """Method: blockchain.transaction.get_merkle
       +        Return the merkle branch to a confirmed transaction given its
       +        hash and height.
       +        """
       +        if "params" not in query or len(query["params"]) != 2:
       +            return ERRORS["invalidparams"]
       +        tx_hash = query["params"][0]
       +        height = query["params"][1]
       +        if not is_hash256_str(tx_hash):
       +            return ERRORS["invalidparams"]
       +        if not is_non_negative_integer(height):
       +            return ERRORS["invalidparams"]
       +        _ec, hashes = await self.bx.fetch_block_transaction_hashes(height)
       +        if _ec and _ec != 0:
       +            self.log.debug("Got error: %s", repr(_ec))
       +            return ERRORS["internalerror"]
       +        # Decouple from tuples
       +        hashes = [i[0] for i in hashes]
       +        tx_pos = hashes.index(unhexlify(tx_hash)[::-1])
       +        branch = merkle_branch(hashes, tx_pos)
       +        res = {
       +            "block_height": int(height),
       +            "pos": int(tx_pos),
       +            "merkle": branch,
       +        }
       +        return {"result": res}
       +    async def blockchain_transaction_from_pos(self, writer, query):  # pylint: disable=R0911,W0613
       +        """Method: blockchain.transaction.id_from_pos
       +        Return a transaction hash and optionally a merkle proof, given a
       +        block height and a position in the block.
       +        """
       +        if "params" not in query or len(query["params"]) < 2:
       +            return ERRORS["invalidparams"]
       +        height = query["params"][0]
       +        tx_pos = query["params"][1]
       +        merkle = query["params"][2] if len(query["params"]) > 2 else False
       +        if not is_non_negative_integer(height):
       +            return ERRORS["invalidparams"]
       +        if not is_non_negative_integer(tx_pos):
       +            return ERRORS["invalidparams"]
       +        if not is_boolean(merkle):
       +            return ERRORS["invalidparams"]
       +        _ec, hashes = await self.bx.fetch_block_transaction_hashes(height)
       +        if _ec and _ec != 0:
       +            self.log.debug("Got error: %s", repr(_ec))
       +            return ERRORS["internalerror"]
       +        if len(hashes) - 1 < tx_pos:
       +            return ERRORS["internalerror"]
       +        # Decouple from tuples
       +        hashes = [i[0] for i in hashes]
       +        txid = hash_to_hex_str(hashes[tx_pos])
       +        if not merkle:
       +            return {"result": txid}
       +        branch = merkle_branch(hashes, tx_pos)
       +        return {"result": {"tx_hash": txid, "merkle": branch}}
       +    async def mempool_get_fee_histogram(self, writer, query):  # pylint: disable=W0613
       +        """Method: mempool.get_fee_histogram
       +        Return a histogram of the fee rates paid by transactions in the
       +        memory pool, weighted by transaction size.
       +        """
       +        # TODO: Help wanted
       +        return {"result": [[0, 0]]}
       +    async def server_add_peer(self, writer, query):  # pylint: disable=W0613
       +        """Method: server.add_peer
       +        A newly-started server uses this call to get itself into other
       +        servers’ peers lists. It should not be used by wallet clients.
       +        """
       +        # TODO: Help wanted
       +        return {"result": False}
       +    async def server_banner(self, writer, query):  # pylint: disable=W0613
       +        """Method: server.banner
       +        Return a banner to be shown in the Electrum console.
       +        """
       +        return {"result": BANNER}
       +    async def server_donation_address(self, writer, query):  # pylint: disable=W0613
       +        """Method: server.donation_address
       +        Return a server donation address.
       +        """
       +        return {"result": DONATION_ADDR}
       +    async def server_features(self, writer, query):  # pylint: disable=W0613
       +        """Method: server.features
       +        Return a list of features and services supported by the server.
       +        """
       +        cfg = self.server_cfg
       +        return {
       +            "result": {
       +                "genesis_hash": self.genesis,
       +                "hosts": {
       +                    cfg["server_hostname"]: {
       +                        "tcp_port": cfg["server_port"],
       +                        "ssl_port": None,
       +                    },
       +                },
       +                "protocol_max": SERVER_PROTO_MAX,
       +                "protocol_min": SERVER_PROTO_MIN,
       +                "pruning": None,
       +                "server_version": f"obelisk {VERSION}",
       +                "hash_function": "sha256",
       +            }
       +        }
       +    async def server_peers_subscribe(self, writer, query):  # pylint: disable=W0613
       +        """Method: server.peers.subscribe
       +        Return a list of peer servers. Despite the name this is not a
       +        subscription and the server must send no notifications.
       +        """
       +        # TODO: Help wanted
       +        return {"result": []}
       +    async def server_ping(self, writer, query):  # pylint: disable=W0613
       +        """Method: server.ping
       +        Ping the server to ensure it is responding, and to keep the session
       +        alive. The server may disconnect clients that have sent no requests
       +        for roughly 10 minutes.
       +        """
       +        return {"result": None}
       +    async def server_version(self, writer, query):  # pylint: disable=W0613
       +        """Method: server.version
       +        Identify the client to the server and negotiate the protocol version.
       +        """
       +        if "params" not in query or len(query["params"]) != 2:
       +            return ERRORS["invalidparams"]
       +        client_ver = query["params"][1]
       +        if isinstance(client_ver, list):
       +            client_min, client_max = client_ver[0], client_ver[1]
       +        else:
       +            client_min = client_max = client_ver
       +        version = min(client_max, SERVER_PROTO_MAX)
       +        if version < max(client_min, SERVER_PROTO_MIN):
       +            return ERRORS["protonotsupported"]
       +        return {"result": [f"obelisk {VERSION}", version]}
   DIR diff --git a/electrumobelisk/util.py b/obelisk/util.py
   DIR diff --git a/obelisk/zeromq.py b/obelisk/zeromq.py
       t@@ -0,0 +1,477 @@
       +#!/usr/bin/env python3
       +# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org>
       +# This file is part of obelisk
       +# This program is free software: you can redistribute it and/or modify
       +# it under the terms of the GNU Affero General Public License version 3
       +# as published by the Free Software Foundation.
       +# This program is distributed in the hope that it will be useful,
       +# but WITHOUT ANY WARRANTY; without even the implied warranty of
       +# GNU Affero General Public License for more details.
       +# You should have received a copy of the GNU Affero General Public License
       +# along with this program.  If not, see <http://www.gnu.org/licenses/>.
       +"""ZeroMQ implementation for libbitcoin"""
       +import asyncio
       +import functools
       +import struct
       +from binascii import unhexlify
       +from random import randint
       +import zmq
       +import zmq.asyncio
       +from obelisk.libbitcoin_errors import make_error_code, ErrorCode
       +from obelisk.util import bh2u
       +def create_random_id():
       +    """Generate a random request ID"""
       +    max_uint32 = 4294967295
       +    return randint(0, max_uint32)
       +def pack_block_index(index):
       +    """struct.pack given index"""
       +    if isinstance(index, str):
       +        index = unhexlify(index)
       +        assert len(index) == 32
       +        return index
       +    if isinstance(index, int):
       +        return struct.pack("<I", index)
       +    raise ValueError(
       +        f"Unknown index type {type(index)} v:{index}, should be int or bytearray"
       +    )
       +def to_int(xbytes):
       +    """Make little-endian integer from given bytes"""
       +    return int.from_bytes(xbytes, byteorder="little")
       +def checksum(xhash, index):
       +    """
       +    This method takes a transaction hash and an index and returns a checksum.
       +    This checksum is based on 49 bits starting from the 12th byte of the
       +    reversed hash. Combined with the last 15 bits of the 4 byte index.
       +    """
       +    mask = 0xFFFFFFFFFFFF8000
       +    magic_start_position = 12
       +    hash_bytes = bytes.fromhex(xhash)[::-1]
       +    last_20_bytes = hash_bytes[magic_start_position:]
       +    assert len(hash_bytes) == 32
       +    assert index < 2**32
       +    hash_upper_49_bits = to_int(last_20_bytes) & mask
       +    index_lower_15_bits = index & ~mask
       +    return hash_upper_49_bits | index_lower_15_bits
       +def unpack_table(row_fmt, data):
       +    """Function to unpack table received from libbitcoin"""
       +    # Get the number of rows
       +    row_size = struct.calcsize(row_fmt)
       +    nrows = len(data) // row_size
       +    # Unpack
       +    rows = []
       +    for idx in range(nrows):
       +        offset = idx * row_size
       +        row = struct.unpack_from(row_fmt, data, offset)
       +        rows.append(row)
       +    return rows
       +class ClientSettings:
       +    """Class implementing ZMQ client settings"""
       +    def __init__(self, timeout=10, context=None, loop=None):
       +        self._timeout = timeout
       +        self._context = context
       +        self._loop = loop
       +    @property
       +    def context(self):
       +        """ZMQ context property"""
       +        if not self._context:
       +            ctx = zmq.asyncio.Context()
       +            ctx.linger = 500  # in milliseconds
       +            self._context = ctx
       +        return self._context
       +    @context.setter
       +    def context(self, context):
       +        self._context = context
       +    @property
       +    def timeout(self):
       +        """Set to None for no timeout"""
       +        return self._timeout
       +    @timeout.setter
       +    def timeout(self, timeout):
       +        self._timeout = timeout
       +class Request:
       +    """Class implementing a _send_ request.
       +    This is either a simple request/response affair or a subscription.
       +    """
       +    def __init__(self, socket, command, data):
       +        self.id_ = create_random_id()
       +        self.socket = socket
       +        self.command = command
       +        self.data = data
       +        self.future = asyncio.Future()
       +        self.queue = None
       +    async def send(self):
       +        """Send the ZMQ request"""
       +        request = [self.command, struct.pack("<I", self.id_), self.data]
       +        await self.socket.send_multipart(request)
       +    def is_subscription(self):
       +        """If the request is a subscription, then the response to this
       +        request is a notification.
       +        """
       +        return self.queue is not None
       +    def __str__(self):
       +        return "Request(command, ID) {}, {:d}".format(self.command, self.id_)
       +class InvalidServerResponseException(Exception):
       +    """Exception for invalid server responses"""
       +class Response:
       +    """Class implementing a request response"""
       +    def __init__(self, frame):
       +        if len(frame) != 3:
       +            raise InvalidServerResponseException(
       +                f"Length of the frame was not 3: {len(frame)}")
       +        self.command = frame[0]
       +        self.request_id = struct.unpack("<I", frame[1])[0]
       +        error_code = struct.unpack("<I", frame[2][:4])[0]
       +        self.error_code = make_error_code(error_code)
       +        self.data = frame[2][4:]
       +    def is_bound_for_queue(self):
       +        return len(self.data) > 0
       +    def __str__(self):
       +        return (
       +            "Response(command, request ID, error code, data):" +
       +            f" {self.command}, {self.request_id}, {self.error_code}, {self.data}"
       +        )
       +class RequestCollection:
       +    """RequestCollection carries a list of Requests and matches incoming
       +    responses to them.
       +    """
       +    def __init__(self, socket, loop):
       +        self._socket = socket
       +        self._requests = {}
       +        self._task = asyncio.ensure_future(self._run(), loop=loop)
       +    async def _run(self):
       +        while True:
       +            await self._receive()
       +    async def stop(self):
       +        """Stops listening for incoming responses (or subscription) messages.
       +        Returns the number of _responses_ expected but which are now dropped
       +        on the floor.
       +        """
       +        self._task.cancel()
       +        try:
       +            await self._task
       +        except asyncio.CancelledError:
       +            return len(self._requests)
       +    async def _receive(self):
       +        frame = await self._socket.recv_multipart()
       +        response = Response(frame)
       +        if response.request_id in self._requests:
       +            self._handle_response(response)
       +        else:
       +            print(
       +                f"DEBUG: RequestCollection unhandled response {response.command}:{response.request_id}"  # pylint: disable=C0301
       +            )
       +    def _handle_response(self, response):
       +        request = self._requests[response.request_id]
       +        if request.is_subscription():
       +            if response.is_bound_for_queue():
       +                # TODO: decode the data into something usable
       +                request.queue.put_nowait(response.data)
       +            else:
       +                request.future.set_result(response)
       +        else:
       +            self.delete_request(request)
       +            request.future.set_result(response)
       +    def add_request(self, request):
       +        # TODO: we should maybe check if the request.id_ is unique
       +        self._requests[request.id_] = request
       +    def delete_request(self, request):
       +        del self._requests[request.id_]
       +class Client:
       +    """This class represents a connection to a libbitcoin server."""
       +    def __init__(self, log, endpoints, loop):
       +        self.log = log
       +        self._endpoints = endpoints
       +        self._settings = ClientSettings(loop=loop)
       +        self._query_socket = self._create_query_socket()
       +        self._block_socket = self._create_block_socket()
       +        self._request_collection = RequestCollection(self._query_socket,
       +                                                     self._settings._loop)
       +    async def stop(self):
       +        self.log.debug("zmq Client.stop()")
       +        self._query_socket.close()
       +        self._block_socket.close()
       +        return await self._request_collection.stop()
       +    def _create_block_socket(self):
       +        socket = self._settings.context.socket(
       +            zmq.SUB,  # pylint: disable=E1101
       +            io_loop=self._settings._loop,  # pylint: disable=W0212
       +        )
       +        socket.connect(self._endpoints["block"])
       +        socket.setsockopt_string(zmq.SUBSCRIBE, "")  # pylint: disable=E1101
       +        return socket
       +    def _create_query_socket(self):
       +        socket = self._settings.context.socket(
       +            zmq.DEALER,  # pylint: disable=E1101
       +            io_loop=self._settings._loop,  # pylint: disable=W0212
       +        )
       +        socket.connect(self._endpoints["query"])
       +        return socket
       +    async def _subscription_request(self, command, data):
       +        request = await self._request(command, data)
       +        request.queue = asyncio.Queue(loop=self._settings._loop)  # pylint: disable=W0212
       +        error_code, _ = await self._wait_for_response(request)
       +        return error_code, request.queue
       +    async def _simple_request(self, command, data):
       +        return await self._wait_for_response(await
       +                                             self._request(command, data))
       +    async def _request(self, command, data):
       +        """Make a generic request. Both options are byte objects specified
       +        like b'blockchain.fetch_block_header' as an example.
       +        """
       +        request = Request(self._query_socket, command, data)
       +        await request.send()
       +        self._request_collection.add_request(request)
       +        return request
       +    async def _wait_for_response(self, request):
       +        try:
       +            response = await asyncio.wait_for(request.future,
       +                                              self._settings.timeout)
       +        except asyncio.TimeoutError:
       +            self._request_collection.delete_request(request)
       +            return ErrorCode.channel_timeout, None
       +        assert response.command == request.command
       +        assert response.request_id == request.id_
       +        return response.error_code, response.data
       +    async def fetch_last_height(self):
       +        """Fetch the blockchain tip and return integer height"""
       +        command = b"blockchain.fetch_last_height"
       +        error_code, data = await self._simple_request(command, b"")
       +        if error_code:
       +            return error_code, None
       +        return error_code, struct.unpack("<I", data)[0]
       +    async def fetch_block_header(self, index):
       +        """Fetch a block header by its height or integer index"""
       +        command = b"blockchain.fetch_block_header"
       +        data = pack_block_index(index)
       +        return await self._simple_request(command, data)
       +    async def fetch_block_transaction_hashes(self, index):
       +        """Fetch transaction hashes in a block at height index"""
       +        command = b"blockchain.fetch_block_transaction_hashes"
       +        data = pack_block_index(index)
       +        error_code, data = await self._simple_request(command, data)
       +        if error_code:
       +            return error_code, None
       +        return error_code, unpack_table("32s", data)
       +    async def fetch_blockchain_transaction(self, txid):
       +        """Fetch transaction by txid (not including mempool)"""
       +        command = b"blockchain.fetch_transaction2"
       +        error_code, data = await self._simple_request(
       +            command,
       +            bytes.fromhex(txid)[::-1])
       +        if error_code:
       +            return error_code, None
       +        return error_code, data
       +    async def fetch_mempool_transaction(self, txid):
       +        """Fetch transaction by txid (including mempool)"""
       +        command = b"transaction_pool.fetch_transaction2"
       +        error_code, data = await self._simple_request(
       +            command,
       +            bytes.fromhex(txid)[::-1])
       +        if error_code:
       +            return error_code, None
       +        return error_code, data
       +    async def subscribe_scripthash(self, scripthash):
       +        """Subscribe to scripthash"""
       +        command = b"subscribe.key"
       +        decoded_address = unhexlify(scripthash)
       +        return await self._subscription_request(command, decoded_address)
       +    async def unsubscribe_scripthash(self, scripthash):
       +        """Unsubscribe scripthash"""
       +        # TODO: This call should ideally also remove the subscription
       +        # request from the RequestCollection.
       +        # This call solicits a final call from the server with an
       +        # `error::service_stopped` error code.
       +        command = b"unsubscribe.key"
       +        decoded_address = unhexlify(scripthash)
       +        return await self._simple_request(command, decoded_address)
       +    async def fetch_history4(self, scripthash, height=0):
       +        """Fetch history for given scripthash"""
       +        command = b"blockchain.fetch_history4"
       +        decoded_address = unhexlify(scripthash)
       +        error_code, raw_points = await self._simple_request(
       +            command, decoded_address + struct.pack("<I", height))
       +        if error_code:
       +            return error_code, None
       +        def make_tuple(row):
       +            kind, height, tx_hash, index, value = row
       +            return (
       +                kind,
       +                {
       +                    "hash": tx_hash,
       +                    "index": index
       +                },
       +                height,
       +                value,
       +                checksum(tx_hash[::-1].hex(), index),
       +            )
       +        rows = unpack_table("<BI32sIQ", raw_points)
       +        points = [make_tuple(row) for row in rows]
       +        correlated_points = Client.__correlate(points)
       +        # self.log.debug("history points: %s", points)
       +        # self.log.debug("history correlated: %s", correlated_points)
       +        return error_code, self._sort_correlated_points(correlated_points)
       +    @staticmethod
       +    def _sort_correlated_points(points):
       +        """Sort by ascending height"""
       +        if len(points) < 2:
       +            return points
       +        return sorted(points, key=lambda x: list(x.values())[0]["height"])
       +    async def broadcast_transaction(self, rawtx):
       +        """Broadcast given raw transaction"""
       +        command = b"transaction_pool.broadcast"
       +        return await self._simple_request(command, rawtx)
       +    async def fetch_balance(self, scripthash):
       +        """Fetch balance for given scripthash"""
       +        error_code, history = await self.fetch_history4(scripthash)
       +        if error_code:
       +            return error_code, None
       +        utxo = Client.__receives_without_spends(history)
       +        return error_code, functools.reduce(
       +            lambda accumulator, point: accumulator + point["value"], utxo, 0)
       +    async def fetch_utxo(self, scripthash):
       +        """Find UTXO for given scripthash"""
       +        error_code, history = await self.fetch_history4(scripthash)
       +        if error_code:
       +            return error_code, None
       +        return error_code, Client.__receives_without_spends(history)
       +    async def subscribe_to_blocks(self, queue):
       +        asyncio.ensure_future(self._listen_for_blocks(queue))
       +        return queue
       +    async def _listen_for_blocks(self, queue):
       +        """Infinite loop for block subscription.
       +        Returns raw blocks as they're received.
       +        """
       +        while True:
       +            frame = await self._block_socket.recv_multipart()
       +            seq = struct.unpack("<H", frame[0])[0]
       +            height = struct.unpack("<I", frame[1])[0]
       +            block_data = frame[2]
       +            queue.put_nowait((seq, height, block_data))
       +    @staticmethod
       +    def __receives_without_spends(history):
       +        return (point for point in history if "spent" not in point)
       +    @staticmethod
       +    def __correlate(points):
       +        transfers, checksum_to_index = Client.__find_receives(points)
       +        transfers = Client.__correlate_spends_to_receives(
       +            points, transfers, checksum_to_index)
       +        return transfers
       +    @staticmethod
       +    def __correlate_spends_to_receives(points, transfers, checksum_to_index):
       +        for point in points:
       +            if point[0] == 1:  # receive
       +                continue
       +            spent = {
       +                "hash": point[1]["hash"],
       +                "height": point[2],
       +                "index": point[1]["index"],
       +            }
       +            if point[3] not in checksum_to_index:
       +                transfers.append({"spent": spent})
       +            else:
       +                transfers[checksum_to_index[point[3]]]["spent"] = spent
       +        return transfers
       +    @staticmethod
       +    def __find_receives(points):
       +        transfers = []
       +        checksum_to_index = {}
       +        for point in points:
       +            if point[0] == 0:  # spent
       +                continue
       +            transfers.append({
       +                "received": {
       +                    "hash": point[1]["hash"],
       +                    "height": point[2],
       +                    "index": point[1]["index"],
       +                },
       +                "value": point[3],
       +            })
       +            checksum_to_index[point[4]] = len(transfers) - 1
       +        return transfers, checksum_to_index
   DIR diff --git a/run_obelisk b/run_obelisk
       t@@ -0,0 +1,109 @@
       +#!/usr/bin/env python3
       +# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org>
       +# This file is part of obelisk
       +# This program is free software: you can redistribute it and/or modify
       +# it under the terms of the GNU Affero General Public License version 3
       +# as published by the Free Software Foundation.
       +# This program is distributed in the hope that it will be useful,
       +# but WITHOUT ANY WARRANTY; without even the implied warranty of
       +# GNU Affero General Public License for more details.
       +# You should have received a copy of the GNU Affero General Public License
       +# along with this program.  If not, see <http://www.gnu.org/licenses/>.
       +import asyncio
       +import sys
       +from argparse import ArgumentParser
       +from configparser import RawConfigParser, NoSectionError
       +from logging import getLogger, FileHandler, Formatter, StreamHandler
       +from os import devnull
       +from obelisk.protocol import ElectrumProtocol, VERSION
       +# Used for destructor/cleanup
       +PROTOCOL = None
       +def logger_config(log, config):
       +    """Setup logging"""
       +    fmt = Formatter("%(asctime)s\t%(levelname)s\t%(message)s")
       +    logstream = StreamHandler()
       +    logstream.setFormatter(fmt)
       +    debuglevel = config.get("obelisk", "log_level", fallback="INFO")
       +    logstream.setLevel(debuglevel)
       +    log.addHandler(logstream)
       +    filename = config.get("obelisk", "log_file", fallback=devnull)
       +    append_log = config.getboolean("obelisk", "append_log", fallback=False)
       +    logfile = FileHandler(filename, mode=("a" if append_log else "w"))
       +    logfile.setFormatter(fmt)
       +    logfile.setLevel(debuglevel)
       +    log.addHandler(logfile)
       +    log.setLevel(debuglevel)
       +    return log, filename
       +async def run_electrum_server(config, chain):
       +    """Server coroutine"""
       +    log = getLogger("obelisk")
       +    host = config.get("obelisk", "host")
       +    port = int(config.get("obelisk", "port"))
       +    endpoints = {}
       +    endpoints["query"] = config.get("obelisk", "query")
       +    endpoints["heart"] = config.get("obelisk", "heart")
       +    endpoints["block"] = config.get("obelisk", "block")
       +    endpoints["trans"] = config.get("obelisk", "trans")
       +    server_cfg = {}
       +    server_cfg["server_hostname"] = "localhost"  # TODO: <- should be public?
       +    server_cfg["server_port"] = port
       +    global PROTOCOL
       +    PROTOCOL = ElectrumProtocol(log, chain, endpoints, server_cfg)
       +    server = await asyncio.start_server(PROTOCOL.recv, host, port)
       +    async with server:
       +        await server.serve_forever()
       +def main():
       +    """Main orchestration"""
       +    parser = ArgumentParser(description=f"obelisk {VERSION}")
       +    parser.add_argument("config_file", help="Path to config file")
       +    args = parser.parse_args()
       +    try:
       +        config = RawConfigParser()
       +        config.read(args.config_file)
       +        config.options("obelisk")
       +    except NoSectionError:
       +        print(f"error: Invalid config file {args.config_file}")
       +        return 1
       +    log = getLogger("obelisk")
       +    log, logfilename = logger_config(log, config)
       +    log.info(f"Starting obelisk {VERSION}")
       +    log.info(f"Logging to {logfilename}")
       +    chain = config.get("obelisk", "chain")
       +    if chain not in ("mainnet", "testnet"):
       +        log.error("chain is not 'mainnet' or 'testnet'")
       +        return 1
       +    try:
       +        asyncio.run(run_electrum_server(config, chain))
       +    except KeyboardInterrupt:
       +        print("\r", end="")
       +        log.debug("Caught KeyboardInterrupt, exiting...")
       +        if PROTOCOL:
       +            asyncio.run(PROTOCOL.stop())
       +        return 0
       +    return 1
       +if __name__ == "__main__":
       +    sys.exit(main())