URI: 
       tprotocol.py - obelisk - Electrum server using libbitcoin as its backend
  HTML git clone https://git.parazyd.org/obelisk
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
       tprotocol.py (31114B)
       ---
            1 #!/usr/bin/env python3
            2 # Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org>
            3 #
            4 # This file is part of obelisk
            5 #
            6 # This program is free software: you can redistribute it and/or modify
            7 # it under the terms of the GNU Affero General Public License version 3
            8 # as published by the Free Software Foundation.
            9 #
           10 # This program is distributed in the hope that it will be useful,
           11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
           12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
           13 # GNU Affero General Public License for more details.
           14 #
           15 # You should have received a copy of the GNU Affero General Public License
           16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
           17 """Implementation of the Electrum protocol as found on
           18 https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html
           19 """
           20 import asyncio
           21 import json
           22 import struct
           23 from binascii import unhexlify
           24 from traceback import print_exc
           25 
           26 from obelisk.errors_jsonrpc import JsonRPCError
           27 from obelisk.errors_libbitcoin import ZMQError
           28 from obelisk.mempool_api import get_mempool_fee_estimates
           29 from obelisk.merkle import merkle_branch, merkle_branch_and_root
           30 from obelisk.util import (
           31     bh2u,
           32     block_to_header,
           33     is_boolean,
           34     is_hash256_str,
           35     is_hex_str,
           36     is_non_negative_integer,
           37     safe_hexlify,
           38     sha256,
           39     double_sha256,
           40     hash_to_hex_str,
           41 )
           42 from obelisk.zeromq import Client
           43 
           44 VERSION = "0.0"
           45 SERVER_PROTO_MIN = "1.4"
           46 SERVER_PROTO_MAX = "1.4.2"
           47 DONATION_ADDR = "bc1q7an9p5pz6pjwjk4r48zke2yfaevafzpglg26mz"
           48 
           49 BANNER = ("""
           50 Welcome to obelisk
           51 
           52 "Tools for the people"
           53 
           54 obelisk is a server that uses libbitcoin-server as its backend.
           55 Source code can be found at: https://github.com/parazyd/obelisk
           56 
           57 Please consider donating: %s
           58 """ % DONATION_ADDR)
           59 
           60 
           61 class ElectrumProtocol(asyncio.Protocol):  # pylint: disable=R0904,R0902
           62     """Class implementing the Electrum protocol, with async support"""
           63 
           64     def __init__(self, log, chain, endpoints, server_cfg):
           65         self.log = log
           66         self.stopped = False
           67         self.endpoints = endpoints
           68         self.server_cfg = server_cfg
           69         self.loop = asyncio.get_event_loop()
           70         self.bx = Client(log, endpoints, self.loop)
           71         self.block_queue = None
           72         self.peers = {}
           73 
           74         self.chain = chain
           75         if self.chain == "mainnet":  # pragma: no cover
           76             self.genesis = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
           77         elif self.chain == "testnet":
           78             self.genesis = "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943"
           79         else:
           80             raise ValueError(f"Invalid chain '{chain}'")  # pragma: no cover
           81 
           82         # Here we map available methods to their respective functions
           83         self.methodmap = {
           84             "blockchain.block.header": self.block_header,
           85             "blockchain.block.headers": self.block_headers,
           86             "blockchain.estimatefee": self.estimatefee,
           87             "blockchain.headers.subscribe": self.headers_subscribe,
           88             "blockchain.relayfee": self.relayfee,
           89             "blockchain.scripthash.get_balance": self.scripthash_get_balance,
           90             "blockchain.scripthash.get_history": self.scripthash_get_history,
           91             "blockchain.scripthash.get_mempool": self.scripthash_get_mempool,
           92             "blockchain.scripthash.listunspent": self.scripthash_listunspent,
           93             "blockchain.scripthash.subscribe": self.scripthash_subscribe,
           94             "blockchain.scripthash.unsubscribe": self.scripthash_unsubscribe,
           95             "blockchain.transaction.broadcast": self.transaction_broadcast,
           96             "blockchain.transaction.get": self.transaction_get,
           97             "blockchain.transaction.get_merkle": self.transaction_get_merkle,
           98             "blockchain.transaction.id_from_pos": self.transaction_id_from_pos,
           99             "mempool.get_fee_histogram": self.get_fee_histogram,
          100             "server_add_peer": self.add_peer,
          101             "server.banner": self.banner,
          102             "server.donation_address": self.donation_address,
          103             "server.features": self.server_features,
          104             "server.peers.subscribe": self.peers_subscribe,
          105             "server.ping": self.ping,
          106             "server.version": self.server_version,
          107         }
          108 
          109     async def stop(self):
          110         """Destructor function"""
          111         self.log.debug("ElectrumProtocol.stop()")
          112         self.stopped = True
          113         if self.bx:
          114             for i in self.peers:
          115                 await self._peer_cleanup(i)
          116             await self.bx.stop()
          117 
          118     async def _peer_cleanup(self, peer):
          119         """Cleanup tasks and data for peer"""
          120         self.log.debug("Cleaning up data for %s", peer)
          121         for i in self.peers[peer]["tasks"]:
          122             i.cancel()
          123         for i in self.peers[peer]["sh"]:
          124             self.peers[peer]["sh"][i]["task"].cancel()
          125 
          126     @staticmethod
          127     def _get_peer(writer):
          128         peer_t = writer._transport.get_extra_info("peername")  # pylint: disable=W0212
          129         return f"{peer_t[0]}:{peer_t[1]}"
          130 
          131     async def recv(self, reader, writer):
          132         """Loop ran upon a connection which acts as a JSON-RPC handler"""
          133         recv_buf = bytearray()
          134         self.peers[self._get_peer(writer)] = {"tasks": [], "sh": {}}
          135 
          136         while not self.stopped:
          137             data = await reader.read(4096)
          138             if not data or len(data) == 0:
          139                 await self._peer_cleanup(self._get_peer(writer))
          140                 return
          141             recv_buf.extend(data)
          142             lb = recv_buf.find(b"\n")
          143             if lb == -1:
          144                 continue
          145             while lb != -1:
          146                 line = recv_buf[:lb].rstrip()
          147                 recv_buf = recv_buf[lb + 1:]
          148                 lb = recv_buf.find(b"\n")
          149                 try:
          150                     line = line.decode("utf-8")
          151                     query = json.loads(line)
          152                 except (UnicodeDecodeError, json.JSONDecodeError) as err:
          153                     self.log.debug("%s", print_exc)
          154                     self.log.debug("Decode error: %s", repr(err))
          155                     break
          156                 self.log.debug("=> %s", line)
          157                 await self.handle_query(writer, query)
          158 
          159     async def _send_notification(self, writer, method, params):
          160         """Send JSON-RPC notification to given writer"""
          161         response = {"jsonrpc": "2.0", "method": method, "params": params}
          162         self.log.debug("<= %s", response)
          163         writer.write(json.dumps(response).encode("utf-8") + b"\n")
          164         await writer.drain()
          165 
          166     async def _send_response(self, writer, result, nid):
          167         """Send successful JSON-RPC response to given writer"""
          168         response = {"jsonrpc": "2.0", "result": result, "id": nid}
          169         self.log.debug("<= %s", response)
          170         writer.write(json.dumps(response).encode("utf-8") + b"\n")
          171         await writer.drain()
          172 
          173     async def _send_error(self, writer, error):
          174         """Send JSON-RPC error to given writer"""
          175         response = {"jsonrpc": "2.0", "error": error, "id": None}
          176         self.log.debug("<= %s", response)
          177         writer.write(json.dumps(response).encode("utf-8") + b"\n")
          178         await writer.drain()
          179 
          180     async def _send_reply(self, writer, resp, query):
          181         """Wrap function for sending replies"""
          182         if "error" in resp:
          183             return await self._send_error(writer, resp["error"])
          184         return await self._send_response(writer, resp["result"], query["id"])
          185 
          186     async def handle_query(self, writer, query):  # pylint: disable=R0915,R0912,R0911
          187         """Electrum protocol method handler mapper"""
          188         if "method" not in query or "id" not in query:
          189             return await self._send_reply(writer, JsonRPCError.invalidrequest(),
          190                                           None)
          191 
          192         method = query["method"]
          193         func = self.methodmap.get(method)
          194         if not func:
          195             self.log.error("Unhandled method %s, query=%s", method, query)
          196             return await self._send_reply(writer, JsonRPCError.methodnotfound(),
          197                                           query)
          198         resp = await func(writer, query)
          199         return await self._send_reply(writer, resp, query)
          200 
          201     async def _merkle_proof_for_headers(self, height, idx):
          202         """Extremely inefficient merkle proof for headers"""
          203         # The following works, but is extremely inefficient.
          204         # The best solution would be to figure something out in
          205         # libbitcoin-server
          206         cp_headers = []
          207 
          208         for i in range(0, height + 1):
          209             _ec, data = await self.bx.fetch_block_header(i)
          210             if _ec and _ec != ZMQError.success:
          211                 self.log.error("bx.fetch_block_header: %s", _ec.name)
          212                 return JsonRPCError.internalerror()
          213             cp_headers.append(data)
          214 
          215         branch, root = merkle_branch_and_root(
          216             [double_sha256(i) for i in cp_headers], idx)
          217 
          218         return {
          219             "branch": [hash_to_hex_str(i) for i in branch],
          220             "header": safe_hexlify(cp_headers[idx]),
          221             "root": hash_to_hex_str(root),
          222         }
          223 
          224     async def block_header(self, writer, query):  # pylint: disable=W0613,R0911
          225         """Method: blockchain.block.header
          226         Return the block header at the given height.
          227         """
          228         if "params" not in query or len(query["params"]) < 1:
          229             return JsonRPCError.invalidparams()
          230         index = query["params"][0]
          231         cp_height = query["params"][1] if len(query["params"]) == 2 else 0
          232 
          233         if not is_non_negative_integer(index):
          234             return JsonRPCError.invalidparams()
          235         if not is_non_negative_integer(cp_height):
          236             return JsonRPCError.invalidparams()
          237         if cp_height != 0 and not index <= cp_height:
          238             return JsonRPCError.invalidparams()
          239 
          240         if cp_height == 0:
          241             _ec, header = await self.bx.fetch_block_header(index)
          242             if _ec and _ec != ZMQError.success:
          243                 self.log.error("bx.fetch_block_header: %s", _ec.name)
          244                 return JsonRPCError.internalerror()
          245             return {"result": safe_hexlify(header)}
          246 
          247         res = await self._merkle_proof_for_headers(cp_height, index)
          248         return {"result": res}
          249 
          250     async def block_headers(self, writer, query):  # pylint: disable=W0613,R0911
          251         """Method: blockchain.block.headers
          252         Return a concatenated chunk of block headers from the main chain.
          253         """
          254         if "params" not in query or len(query["params"]) < 2:
          255             return JsonRPCError.invalidparams()
          256         # Electrum doesn't allow max_chunk_size to be less than 2016
          257         # gopher://bitreich.org/9/memecache/convenience-store.mkv
          258         max_chunk_size = 2016
          259         start_height = query["params"][0]
          260         count = query["params"][1]
          261         cp_height = query["params"][2] if len(query["params"]) == 3 else 0
          262 
          263         if not is_non_negative_integer(start_height):
          264             return JsonRPCError.invalidparams()
          265         if not is_non_negative_integer(count):
          266             return JsonRPCError.invalidparams()
          267         # BUG: spec says <= cp_height
          268         if cp_height != 0 and not start_height + (count - 1) < cp_height:
          269             return JsonRPCError.invalidparams()
          270 
          271         count = min(count, max_chunk_size)
          272         headers = bytearray()
          273         for i in range(count):
          274             _ec, data = await self.bx.fetch_block_header(start_height + i)
          275             if _ec and _ec != ZMQError.success:
          276                 self.log.error("bx.fetch_block_header: %s", _ec.name)
          277                 return JsonRPCError.internalerror()
          278             headers.extend(data)
          279 
          280         resp = {
          281             "hex": safe_hexlify(headers),
          282             "count": len(headers) // 80,
          283             "max": max_chunk_size,
          284         }
          285 
          286         if cp_height > 0:
          287             data = await self._merkle_proof_for_headers(
          288                 cp_height, start_height + (len(headers) // 80) - 1)
          289             resp["branch"] = data["branch"]
          290             resp["root"] = data["root"]
          291 
          292         return {"result": resp}
          293 
          294     async def estimatefee(self, writer, query):  # pylint: disable=W0613,disable=R0911
          295         """Method: blockchain.estimatefee
          296         Return the estimated transaction fee per kilobyte for a transaction
          297         to be confirmed within a certain number of blocks.
          298         """
          299         # NOTE: This solution is using the mempool.space API.
          300         # Let's try to eventually solve it with some internal logic.
          301         if "params" not in query or len(query["params"]) != 1:
          302             return JsonRPCError.invalidparams()
          303 
          304         num_blocks = query["params"][0]
          305         if not is_non_negative_integer(num_blocks):
          306             return JsonRPCError.invalidparams()
          307 
          308         if self.chain == "testnet":
          309             return {"result": 0.00001}
          310 
          311         fee_dict = get_mempool_fee_estimates()
          312         if not fee_dict:
          313             return {"result": -1}
          314 
          315         # Good enough.
          316         if num_blocks < 3:
          317             return {"result": "{:.8f}".format(fee_dict["fastestFee"] / 100000)}
          318 
          319         if num_blocks < 6:
          320             return {"result": "{:.8f}".format(fee_dict["halfHourFee"] / 100000)}
          321 
          322         if num_blocks < 10:
          323             return {"result": "{:.8f}".format(fee_dict["hourFee"] / 100000)}
          324 
          325         return {"result": "{:.8f}".format(fee_dict["minimumFee"] / 100000)}
          326 
          327     async def header_notifier(self, writer):
          328         self.block_queue = asyncio.Queue()
          329         await self.bx.subscribe_to_blocks(self.block_queue)
          330         while True:
          331             item = await self.block_queue.get()
          332             if len(item) != 3:
          333                 self.log.debug("error: item from block queue len != 3")
          334                 continue
          335 
          336             header = block_to_header(item[2])
          337             params = [{"height": item[1], "hex": safe_hexlify(header)}]
          338             await self._send_notification(writer,
          339                                           "blockchain.headers.subscribe",
          340                                           params)
          341 
          342     async def headers_subscribe(self, writer, query):  # pylint: disable=W0613
          343         """Method: blockchain.headers.subscribe
          344         Subscribe to receive block headers when a new block is found.
          345         """
          346         # Tip height and header are returned upon request
          347         _ec, height = await self.bx.fetch_last_height()
          348         if _ec and _ec != ZMQError.success:
          349             self.log.error("bx.fetch_last_height: %s", _ec.name)
          350             return JsonRPCError.internalerror()
          351         _ec, tip_header = await self.bx.fetch_block_header(height)
          352         if _ec and _ec != ZMQError.success:
          353             self.log.error("bx.fetch_block_header: %s", _ec.name)
          354             return JsonRPCError.internalerror()
          355 
          356         self.peers[self._get_peer(writer)]["tasks"].append(
          357             asyncio.create_task(self.header_notifier(writer)))
          358         ret = {"height": height, "hex": safe_hexlify(tip_header)}
          359         return {"result": ret}
          360 
          361     async def relayfee(self, writer, query):  # pylint: disable=W0613
          362         """Method: blockchain.relayfee
          363         Return the minimum fee a low-priority transaction must pay in order
          364         to be accepted to the daemon’s memory pool.
          365         """
          366         return {"result": 0.00001}
          367 
          368     async def scripthash_get_balance(self, writer, query):  # pylint: disable=W0613
          369         """Method: blockchain.scripthash.get_balance
          370         Return the confirmed and unconfirmed balances of a script hash.
          371         """
          372         if "params" not in query or len(query["params"]) != 1:
          373             return JsonRPCError.invalidparams()
          374 
          375         if not is_hash256_str(query["params"][0]):
          376             return JsonRPCError.invalidparams()
          377 
          378         _ec, data = await self.bx.fetch_balance(query["params"][0])
          379         if _ec and _ec != ZMQError.success:
          380             self.log.error("bx.fetch_balance: %s", _ec.name)
          381             return JsonRPCError.internalerror()
          382 
          383         ret = {"confirmed": data[0], "unconfirmed": data[1]}
          384         return {"result": ret}
          385 
          386     async def scripthash_get_history(self, writer, query):  # pylint: disable=W0613
          387         """Method: blockchain.scripthash.get_history
          388         Return the confirmed and unconfirmed history of a script hash.
          389         """
          390         if "params" not in query or len(query["params"]) != 1:
          391             return JsonRPCError.invalidparams()
          392 
          393         if not is_hash256_str(query["params"][0]):
          394             return JsonRPCError.invalidparams()
          395 
          396         _ec, data = await self.bx.fetch_history4(query["params"][0])
          397         if _ec and _ec != ZMQError.success:
          398             self.log.error("bx.fetch_history4: %s", _ec.name)
          399             return JsonRPCError.internalerror()
          400 
          401         self.log.debug("hist: %s", data)
          402         ret = []
          403         # TODO: mempool
          404         for i in data:
          405             if "received" in i:
          406                 ret.append({
          407                     "height": i["received"]["height"],
          408                     "tx_hash": hash_to_hex_str(i["received"]["hash"]),
          409                 })
          410             if "spent" in i:
          411                 ret.append({
          412                     "height": i["spent"]["height"],
          413                     "tx_hash": hash_to_hex_str(i["spent"]["hash"]),
          414                 })
          415 
          416         return {"result": ret}
          417 
          418     async def scripthash_get_mempool(self, writer, query):  # pylint: disable=W0613
          419         """Method: blockchain.scripthash.get_mempool
          420         Return the unconfirmed transactions of a script hash.
          421         """
          422         # TODO: Implement
          423         return JsonRPCError.invalidrequest()
          424 
          425     async def scripthash_listunspent(self, writer, query):  # pylint: disable=W0613
          426         """Method: blockchain.scripthash.listunspent
          427         Return an ordered list of UTXOs sent to a script hash.
          428         """
          429         if "params" not in query or len(query["params"]) != 1:
          430             return JsonRPCError.invalidparams()
          431 
          432         scripthash = query["params"][0]
          433         if not is_hash256_str(scripthash):
          434             return JsonRPCError.invalidparams()
          435 
          436         _ec, utxo = await self.bx.fetch_utxo(scripthash)
          437         if _ec and _ec != ZMQError.success:
          438             self.log.error("bx.fetch_utxo: %s", _ec.name)
          439             return JsonRPCError.internalerror()
          440 
          441         ret = []
          442         for i in utxo:
          443             rec = i["received"]
          444             ret.append({
          445                 "tx_pos": rec["index"],
          446                 "value": i["value"],
          447                 "tx_hash": hash_to_hex_str(rec["hash"]),
          448                 "height": rec["height"] if rec["height"] != 4294967295 else 0,
          449             })
          450 
          451         return {"result": ret}
          452 
          453     async def scripthash_renewer(self, scripthash, queue):
          454         while True:
          455             try:
          456                 self.log.debug("scriphash renewer: %s", scripthash)
          457                 _ec = await self.bx.subscribe_scripthash(scripthash, queue)
          458                 if _ec and _ec != ZMQError.success:
          459                     self.log.error("bx.subscribe_scripthash: %s", _ec.name)
          460                 await asyncio.sleep(60)
          461             except asyncio.CancelledError:
          462                 self.log.debug("subscription cancelled: %s", scripthash)
          463                 break
          464 
          465     async def scripthash_notifier(self, writer, scripthash):
          466         # TODO: Mempool
          467         # TODO: This is still flaky and not always notified. Investigate.
          468         self.log.debug("notifier")
          469         method = "blockchain.scripthash.subscribe"
          470         queue = asyncio.Queue()
          471         renew_task = asyncio.create_task(
          472             self.scripthash_renewer(scripthash, queue))
          473 
          474         while True:
          475             try:
          476                 item = await queue.get()
          477                 _ec, height, txid = struct.unpack("<HI32s", item)
          478 
          479                 self.log.debug("shnotifier: _ec: %d", _ec)
          480                 self.log.debug("shnotifier: height: %d", height)
          481                 self.log.debug("shnotifier: txid: %s", hash_to_hex_str(txid))
          482 
          483                 if (_ec == ZMQError.service_stopped.value and height == 0 and
          484                         not self.stopped):
          485                     self.log.debug("subscription expired: %s", scripthash)
          486                     # Subscription expired
          487                     continue
          488 
          489                 self.peers[self._get_peer(writer)]["sh"]["status"].append(
          490                     (hash_to_hex_str(txid), height))
          491 
          492                 params = [
          493                     scripthash,
          494                     ElectrumProtocol.__scripthash_status_encode(self.peers[
          495                         self._get_peer(writer)]["sh"]["scripthash"]["status"]),
          496                 ]
          497                 await self._send_notification(writer, method, params)
          498             except asyncio.CancelledError:
          499                 break
          500         renew_task.cancel()
          501 
          502     async def scripthash_subscribe(self, writer, query):  # pylint: disable=W0613
          503         """Method: blockchain.scripthash.subscribe
          504         Subscribe to a script hash.
          505         """
          506         if "params" not in query or len(query["params"]) != 1:
          507             return JsonRPCError.invalidparams()
          508 
          509         scripthash = query["params"][0]
          510         if not is_hash256_str(scripthash):
          511             return JsonRPCError.invalidparams()
          512 
          513         _ec, history = await self.bx.fetch_history4(scripthash)
          514         if _ec and _ec != ZMQError.success:
          515             self.log.error("bx.fetch_history4: %s", _ec.name)
          516             return JsonRPCError.internalerror()
          517 
          518         # TODO: Check how history4 acts for mempool/unconfirmed
          519         status = ElectrumProtocol.__scripthash_status_from_history(history)
          520         self.peers[self._get_peer(writer)]["sh"][scripthash] = {
          521             "status": status
          522         }
          523 
          524         task = asyncio.create_task(self.scripthash_notifier(writer, scripthash))
          525         self.peers[self._get_peer(writer)]["sh"][scripthash]["task"] = task
          526 
          527         if len(history) < 1:
          528             return {"result": None}
          529         return {"result": ElectrumProtocol.__scripthash_status_encode(status)}
          530 
          531     @staticmethod
          532     def __scripthash_status_from_history(history):
          533         status = []
          534         for i in history:
          535             if "received" in i:
          536                 status.append((
          537                     hash_to_hex_str(i["received"]["hash"]),
          538                     i["received"]["height"],
          539                 ))
          540             if "spent" in i:
          541                 status.append((
          542                     hash_to_hex_str(i["spent"]["hash"]),
          543                     i["spent"]["height"],
          544                 ))
          545         return status
          546 
          547     @staticmethod
          548     def __scripthash_status_encode(status):
          549         concat = ""
          550         for txid, height in status:
          551             concat += txid + ":%d:" % height
          552         return bh2u(sha256(concat.encode("ascii")))
          553 
          554     async def scripthash_unsubscribe(self, writer, query):  # pylint: disable=W0613
          555         """Method: blockchain.scripthash.unsubscribe
          556         Unsubscribe from a script hash, preventing future notifications
          557         if its status changes.
          558         """
          559         if "params" not in query or len(query["params"]) != 1:
          560             return JsonRPCError.invalidparams()
          561 
          562         scripthash = query["params"][0]
          563         if not is_hash256_str(scripthash):
          564             return JsonRPCError.invalidparams()
          565 
          566         if scripthash in self.peers[self._get_peer(writer)]["sh"]:
          567             self.peers[self._get_peer(
          568                 writer)]["sh"][scripthash]["task"].cancel()
          569             # await self.bx.unsubscribe_scripthash(scripthash)
          570             del self.peers[self._get_peer(writer)]["sh"][scripthash]
          571             return {"result": True}
          572 
          573         return {"result": False}
          574 
          async def transaction_broadcast(self, writer, query):  # pylint: disable=W0613
              """Method: blockchain.transaction.broadcast
              Pass a hex-encoded raw transaction to the backend for broadcast
              and reply with the transaction's id.

              Replies with an invalid-params error for a malformed request and
              an internal error when the backend reports a failure.
              """
              # Note: Not yet implemented in bs v4
              if not ("params" in query and len(query["params"]) == 1):
                  return JsonRPCError.invalidparams()

              hextx = query["params"][0]
              if not is_hex_str(hextx):
                  return JsonRPCError.invalidparams()

              # Decode once; the same bytes feed both the broadcast and the txid.
              rawtx = unhexlify(hextx)

              # The backend call receives the byte-reversed serialization.
              _ec, _ = await self.bx.broadcast_transaction(rawtx[::-1])
              if _ec and _ec != ZMQError.success:
                  self.log.error("bx.broadcast_transaction: %s", _ec.name)
                  return JsonRPCError.internalerror()

              return {"result": hash_to_hex_str(double_sha256(rawtx))}
          595 
          async def transaction_get(self, writer, query):  # pylint: disable=W0613
              """Method: blockchain.transaction.get
              Return a raw transaction as a hex string.

              Params are [tx_hash] or [tx_hash, verbose]; verbose defaults to
              False when omitted. Replies with an invalid-params error for a
              malformed request, an internal error when the backend fails or
              has no data for the hash, and an invalid-request error when
              verbose output is requested (not implemented).
              """
              if "params" not in query or len(query["params"]) < 1:
                  return JsonRPCError.invalidparams()

              tx_hash = query["params"][0]
              verbose = query["params"][1] if len(query["params"]) > 1 else False

              # Validate with the same hash check the other transaction methods
              # use, rather than accepting hex of arbitrary length and leaving
              # it to the backend to reject.
              if not is_hash256_str(tx_hash):
                  return JsonRPCError.invalidparams()

              # _ec, rawtx = await self.bx.fetch_blockchain_transaction(tx_hash)
              _ec, rawtx = await self.bx.fetch_mempool_transaction(tx_hash)
              # not_found is tolerated here; the empty result is handled below.
              if _ec and _ec != ZMQError.success and _ec != ZMQError.not_found:
                  self.log.error("fetch_mempool_transaction: %s", _ec.name)
                  return JsonRPCError.internalerror()

              # Behaviour is undefined in spec
              if not rawtx:
                  return JsonRPCError.internalerror()

              if verbose:
                  # TODO: Help needed
                  return JsonRPCError.invalidrequest()

              return {"result": bh2u(rawtx)}
          625 
          async def transaction_get_merkle(self, writer, query):  # pylint: disable=W0613
              """Method: blockchain.transaction.get_merkle
              Return the merkle branch to a confirmed transaction given its
              hash and height.

              Params are [tx_hash, height]. The result holds "block_height",
              "pos" (index of the transaction within the block) and "merkle"
              (the branch). Replies with an invalid-params error for a
              malformed request and an internal error when the backend fails
              or the transaction is not in the block at that height.
              """
              if "params" not in query or len(query["params"]) != 2:
                  return JsonRPCError.invalidparams()

              tx_hash = query["params"][0]
              height = query["params"][1]

              if not is_hash256_str(tx_hash):
                  return JsonRPCError.invalidparams()
              if not is_non_negative_integer(height):
                  return JsonRPCError.invalidparams()

              _ec, hashes = await self.bx.fetch_block_transaction_hashes(height)
              if _ec and _ec != ZMQError.success:
                  self.log.error("bx.fetch_block_transaction_hashes: %s", _ec.name)
                  return JsonRPCError.internalerror()

              # Decouple from tuples
              hashes = [i[0] for i in hashes]

              # The block's hashes are compared against the byte-reversed form
              # of the hex txid.
              wanted = unhexlify(tx_hash)[::-1]
              try:
                  tx_pos = hashes.index(wanted)
              except ValueError:
                  # The transaction is not in this block. Report an error
                  # instead of letting the exception escape the handler; the
                  # other lookups in this file answer a missing item the same way.
                  return JsonRPCError.internalerror()

              branch = merkle_branch(hashes, tx_pos)

              res = {
                  "block_height": int(height),
                  "pos": int(tx_pos),
                  "merkle": branch,
              }
              return {"result": res}
          658 
          async def transaction_id_from_pos(self, writer, query):  # pylint: disable=R0911,W0613
              """Method: blockchain.transaction.id_from_pos
              Look up the transaction at a given position inside the block at
              a given height.

              Params are [height, tx_pos] or [height, tx_pos, merkle]; merkle
              defaults to False. Without merkle the result is the txid alone,
              with merkle it is {"tx_hash": ..., "merkle": ...}.
              """
              if "params" not in query:
                  return JsonRPCError.invalidparams()
              params = query["params"]
              if len(params) < 2:
                  return JsonRPCError.invalidparams()

              height, tx_pos = params[0], params[1]
              merkle = params[2] if len(params) > 2 else False

              # Checked in order; the first failing argument rejects the request.
              if not (is_non_negative_integer(height)
                      and is_non_negative_integer(tx_pos)
                      and is_boolean(merkle)):
                  return JsonRPCError.invalidparams()

              _ec, hashes = await self.bx.fetch_block_transaction_hashes(height)
              if _ec and _ec != ZMQError.success:
                  self.log.error("bx.fetch_block_transaction_hashes: %s", _ec.name)
                  return JsonRPCError.internalerror()

              # Position lies past the last transaction of the block.
              if tx_pos > len(hashes) - 1:
                  return JsonRPCError.internalerror()

              # Each backend entry is a tuple; only its first item is the hash.
              txids = [entry[0] for entry in hashes]
              txid = hash_to_hex_str(txids[tx_pos])

              if merkle:
                  proof = merkle_branch(txids, tx_pos)
                  return {"result": {"tx_hash": txid, "merkle": proof}}

              return {"result": txid}
          695 
          async def get_fee_histogram(self, writer, query):  # pylint: disable=W0613
              """Method: mempool.get_fee_histogram
              Return a histogram of the fee rates paid by mempool transactions,
              weighted by transaction size.

              Not implemented yet: a fixed single-entry histogram is returned.
              """
              # TODO: Help wanted
              placeholder = [[0, 0]]
              return {"result": placeholder}
          703 
          async def add_peer(self, writer, query):  # pylint: disable=W0613
              """Method: server.add_peer
              Used by a newly-started server to get itself into other servers'
              peers lists. It should not be used by wallet clients.

              Not implemented yet: the reply is always {"result": False}.
              """
              # TODO: Help wanted
              accepted = False
              return {"result": accepted}
          711 
          async def banner(self, writer, query):  # pylint: disable=W0613
              """Method: server.banner
              Return a banner to be shown in the Electrum console.

              The banner is the static BANNER text followed by the obelisk
              version and the version reported by the libbitcoin server.
              Replies with an internal error when the backend version query
              fails.
              """
              _ec, bsversion = await self.bx.server_version()
              # Check the backend error code like every other backend call in
              # this file, instead of failing later on bsversion.decode().
              if _ec and _ec != ZMQError.success:
                  self.log.error("bx.server_version: %s", _ec.name)
                  return JsonRPCError.internalerror()

              text = (f"{BANNER}\nobelisk version: {VERSION}\n"
                      f"libbitcoin-server version: {bsversion.decode()}")
              return {"result": text}
          723 
          async def donation_address(self, writer, query):  # pylint: disable=W0613
              """Method: server.donation_address
              Reply with the donation address configured for this server
              (the DONATION_ADDR constant).
              """
              address = DONATION_ADDR
              return {"result": address}
          729 
          async def server_features(self, writer, query):  # pylint: disable=W0613 # pragma: no cover
              """Method: server.features
              Describe the features and services this server supports.

              Every hostname from the server configuration is advertised with
              the configured TCP port.
              """
              cfg = self.server_cfg
              hosts = {
                  name: {"tcp_port": cfg["server_port"]}
                  for name in cfg["server_hostnames"]
              }

              features = {
                  "genesis_hash": self.genesis,
                  "hosts": hosts,
                  "protocol_max": SERVER_PROTO_MAX,
                  "protocol_min": SERVER_PROTO_MIN,
                  "pruning": None,
                  "server_version": f"obelisk {VERSION}",
                  "hash_function": "sha256",
              }
              return {"result": features}
          750 
          async def peers_subscribe(self, writer, query):  # pylint: disable=W0613
              """Method: server.peers.subscribe
              Return a list of peer servers. Despite the name this is not a
              subscription and the server must send no notifications.

              Not implemented yet: the list is always empty.
              """
              # TODO: Help wanted
              known_peers = []
              return {"result": known_peers}
          758 
          async def ping(self, writer, query):  # pylint: disable=W0613
              """Method: server.ping
              Lets a client check that the server is responding and keep its
              session alive. The server may disconnect clients that have sent
              no requests for roughly 10 minutes.

              The reply carries no data: {"result": None}.
              """
              reply = {"result": None}
              return reply
          766 
          async def server_version(self, writer, query):  # pylint: disable=W0613
              """Method: server.version
              Identify the client to the server and negotiate the protocol version.

              Params are [client_name, protocol_version], where
              protocol_version is either a single version or a
              [protocol_min, protocol_max] list. The negotiated version is the
              lower of the client's maximum and SERVER_PROTO_MAX. Replies with
              [server identification, negotiated version], an invalid-params
              error for a malformed request, or a protocol-not-supported error
              when the version ranges do not overlap.
              """
              if "params" not in query or len(query["params"]) != 2:
                  return JsonRPCError.invalidparams()

              client_ver = query["params"][1]

              if isinstance(client_ver, list):
                  # A range needs both bounds; a shorter list used to raise
                  # IndexError instead of producing an error reply.
                  if len(client_ver) < 2:
                      return JsonRPCError.invalidparams()
                  client_min, client_max = client_ver[0], client_ver[1]
              else:
                  client_min = client_max = client_ver

              try:
                  version = min(client_max, SERVER_PROTO_MAX)
                  unsupported = version < max(client_min, SERVER_PROTO_MIN)
              except TypeError:
                  # The client sent version values that cannot be ordered
                  # against the server's protocol constants.
                  return JsonRPCError.invalidparams()

              if unsupported:
                  return JsonRPCError.protonotsupported()

              return {"result": [f"obelisk {VERSION}", version]}