URI: 
       tzeromq.py - obelisk - Electrum server using libbitcoin as its backend
  HTML git clone https://git.parazyd.org/obelisk
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
       tzeromq.py (16384B)
       ---
            1 #!/usr/bin/env python3
            2 # Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org>
            3 #
            4 # This file is part of obelisk
            5 #
            6 # This program is free software: you can redistribute it and/or modify
            7 # it under the terms of the GNU Affero General Public License version 3
            8 # as published by the Free Software Foundation.
            9 #
           10 # This program is distributed in the hope that it will be useful,
           11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
           12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
           13 # GNU Affero General Public License for more details.
           14 #
           15 # You should have received a copy of the GNU Affero General Public License
           16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
           17 """ZeroMQ implementation for libbitcoin"""
           18 import asyncio
           19 import functools
           20 import struct
           21 from binascii import unhexlify
           22 from random import randint
           23 
           24 import zmq
           25 import zmq.asyncio
           26 
           27 from electrumobelisk.libbitcoin_errors import make_error_code, ErrorCode
           28 from electrumobelisk.util import bh2u
           29 
           30 
           31 def create_random_id():
           32     """Generate a random request ID"""
           33     max_uint32 = 4294967295
           34     return randint(0, max_uint32)
           35 
           36 
           37 def pack_block_index(index):
           38     """struct.pack given index"""
           39     if isinstance(index, str):
           40         index = unhexlify(index)
           41         assert len(index) == 32
           42         return index
           43     if isinstance(index, int):
           44         return struct.pack("<I", index)
           45 
           46     raise ValueError(
           47         f"Unknown index type {type(index)} v:{index}, should be int or bytearray"
           48     )
           49 
           50 
           51 def to_int(xbytes):
           52     """Make little-endian integer from given bytes"""
           53     return int.from_bytes(xbytes, byteorder="little")
           54 
           55 
           56 def checksum(xhash, index):
           57     """
           58     This method takes a transaction hash and an index and returns a checksum.
           59 
           60     This checksum is based on 49 bits starting from the 12th byte of the
           61     reversed hash. Combined with the last 15 bits of the 4 byte index.
           62     """
           63     mask = 0xFFFFFFFFFFFF8000
           64     magic_start_position = 12
           65 
           66     hash_bytes = bytes.fromhex(xhash)[::-1]
           67     last_20_bytes = hash_bytes[magic_start_position:]
           68 
           69     assert len(hash_bytes) == 32
           70     assert index < 2**32
           71 
           72     hash_upper_49_bits = to_int(last_20_bytes) & mask
           73     index_lower_15_bits = index & ~mask
           74     return hash_upper_49_bits | index_lower_15_bits
           75 
           76 
           77 def unpack_table(row_fmt, data):
           78     """Function to unpack table received from libbitcoin"""
           79     # Get the number of rows
           80     row_size = struct.calcsize(row_fmt)
           81     nrows = len(data) // row_size
           82     # Unpack
           83     rows = []
           84     for idx in range(nrows):
           85         offset = idx * row_size
           86         row = struct.unpack_from(row_fmt, data, offset)
           87         rows.append(row)
           88     return rows
           89 
           90 
           91 class ClientSettings:
           92     """Class implementing ZMQ client settings"""
           93     def __init__(self, timeout=10, context=None, loop=None):
           94         self._timeout = timeout
           95         self._context = context
           96         self._loop = loop
           97 
           98     @property
           99     def context(self):
          100         """ZMQ context property"""
          101         if not self._context:
          102             ctx = zmq.asyncio.Context()
          103             ctx.linger = 500  # in milliseconds
          104             self._context = ctx
          105         return self._context
          106 
          107     @context.setter
          108     def context(self, context):
          109         self._context = context
          110 
          111     @property
          112     def timeout(self):
          113         """Set to None for no timeout"""
          114         return self._timeout
          115 
          116     @timeout.setter
          117     def timeout(self, timeout):
          118         self._timeout = timeout
          119 
          120 
          121 class Request:
          122     """Class implementing a _send_ request.
          123     This is either a simple request/response affair or a subscription.
          124     """
          125     def __init__(self, socket, command, data):
          126         self.id_ = create_random_id()
          127         self.socket = socket
          128         self.command = command
          129         self.data = data
          130         self.future = asyncio.Future()
          131         self.queue = None
          132 
          133     async def send(self):
          134         """Send the ZMQ request"""
          135         request = [self.command, struct.pack("<I", self.id_), self.data]
          136         await self.socket.send_multipart(request)
          137 
          138     def is_subscription(self):
          139         """If the request is a subscription, then the response to this
          140         request is a notification.
          141         """
          142         return self.queue is not None
          143 
          144     def __str__(self):
          145         return "Request(command, ID) {}, {:d}".format(self.command, self.id_)
          146 
          147 
          148 class InvalidServerResponseException(Exception):
          149     """Exception for invalid server responses"""
          150 
          151 
          152 class Response:
          153     """Class implementing a request response"""
          154     def __init__(self, frame):
          155         if len(frame) != 3:
          156             raise InvalidServerResponseException(
          157                 f"Length of the frame was not 3: {len(frame)}")
          158 
          159         self.command = frame[0]
          160         self.request_id = struct.unpack("<I", frame[1])[0]
          161         error_code = struct.unpack("<I", frame[2][:4])[0]
          162         self.error_code = make_error_code(error_code)
          163         self.data = frame[2][4:]
          164 
          165     def is_bound_for_queue(self):
          166         return len(self.data) > 0
          167 
          168     def __str__(self):
          169         return (
          170             "Response(command, request ID, error code, data):" +
          171             f" {self.command}, {self.request_id}, {self.error_code}, {self.data}"
          172         )
          173 
          174 
          175 class RequestCollection:
          176     """RequestCollection carries a list of Requests and matches incoming
          177     responses to them.
          178     """
          179     def __init__(self, socket, loop):
          180         self._socket = socket
          181         self._requests = {}
          182         self._task = asyncio.ensure_future(self._run(), loop=loop)
          183 
          184     async def _run(self):
          185         while True:
          186             await self._receive()
          187 
          188     async def stop(self):
          189         """Stops listening for incoming responses (or subscription) messages.
          190         Returns the number of _responses_ expected but which are now dropped
          191         on the floor.
          192         """
          193         self._task.cancel()
          194         try:
          195             await self._task
          196         except asyncio.CancelledError:
          197             return len(self._requests)
          198 
          199     async def _receive(self):
          200         frame = await self._socket.recv_multipart()
          201         response = Response(frame)
          202 
          203         if response.request_id in self._requests:
          204             self._handle_response(response)
          205         else:
          206             print(
          207                 f"DEBUG: RequestCollection unhandled response {response.command}:{response.request_id}"  # pylint: disable=C0301
          208             )
          209 
          210     def _handle_response(self, response):
          211         request = self._requests[response.request_id]
          212 
          213         if request.is_subscription():
          214             if response.is_bound_for_queue():
          215                 # TODO: decode the data into something usable
          216                 request.queue.put_nowait(response.data)
          217             else:
          218                 request.future.set_result(response)
          219         else:
          220             self.delete_request(request)
          221             request.future.set_result(response)
          222 
          223     def add_request(self, request):
          224         # TODO: we should maybe check if the request.id_ is unique
          225         self._requests[request.id_] = request
          226 
          227     def delete_request(self, request):
          228         del self._requests[request.id_]
          229 
          230 
          231 class Client:
          232     """This class represents a connection to a libbitcoin server."""
          233     def __init__(self, log, endpoints, loop):
          234         self.log = log
          235         self._endpoints = endpoints
          236         self._settings = ClientSettings(loop=loop)
          237         self._query_socket = self._create_query_socket()
          238         self._block_socket = self._create_block_socket()
          239         self._request_collection = RequestCollection(self._query_socket,
          240                                                      self._settings._loop)
          241 
          242     async def stop(self):
          243         self.log.debug("zmq Client.stop()")
          244         self._query_socket.close()
          245         self._block_socket.close()
          246         return await self._request_collection.stop()
          247 
          248     def _create_block_socket(self):
          249         socket = self._settings.context.socket(
          250             zmq.SUB,  # pylint: disable=E1101
          251             io_loop=self._settings._loop,  # pylint: disable=W0212
          252         )
          253         socket.connect(self._endpoints["block"])
          254         socket.setsockopt_string(zmq.SUBSCRIBE, "")  # pylint: disable=E1101
          255         return socket
          256 
          257     def _create_query_socket(self):
          258         socket = self._settings.context.socket(
          259             zmq.DEALER,  # pylint: disable=E1101
          260             io_loop=self._settings._loop,  # pylint: disable=W0212
          261         )
          262         socket.connect(self._endpoints["query"])
          263         return socket
          264 
          265     async def _subscription_request(self, command, data):
          266         request = await self._request(command, data)
          267         request.queue = asyncio.Queue(loop=self._settings._loop)  # pylint: disable=W0212
          268         error_code, _ = await self._wait_for_response(request)
          269         return error_code, request.queue
          270 
          271     async def _simple_request(self, command, data):
          272         return await self._wait_for_response(await
          273                                              self._request(command, data))
          274 
          275     async def _request(self, command, data):
          276         """Make a generic request. Both options are byte objects specified
          277         like b'blockchain.fetch_block_header' as an example.
          278         """
          279         request = Request(self._query_socket, command, data)
          280         await request.send()
          281         self._request_collection.add_request(request)
          282         return request
          283 
          284     async def _wait_for_response(self, request):
          285         try:
          286             response = await asyncio.wait_for(request.future,
          287                                               self._settings.timeout)
          288         except asyncio.TimeoutError:
          289             self._request_collection.delete_request(request)
          290             return ErrorCode.channel_timeout, None
          291 
          292         assert response.command == request.command
          293         assert response.request_id == request.id_
          294         return response.error_code, response.data
          295 
          296     async def fetch_last_height(self):
          297         """Fetch the blockchain tip and return integer height"""
          298         command = b"blockchain.fetch_last_height"
          299         error_code, data = await self._simple_request(command, b"")
          300         if error_code:
          301             return error_code, None
          302         return error_code, struct.unpack("<I", data)[0]
          303 
          304     async def fetch_block_header(self, index):
          305         """Fetch a block header by its height or integer index"""
          306         command = b"blockchain.fetch_block_header"
          307         data = pack_block_index(index)
          308         return await self._simple_request(command, data)
          309 
          310     async def fetch_block_transaction_hashes(self, index):
          311         """Fetch transaction hashes in a block at height index"""
          312         command = b"blockchain.fetch_block_transaction_hashes"
          313         data = pack_block_index(index)
          314         error_code, data = await self._simple_request(command, data)
          315         if error_code:
          316             return error_code, None
          317         return error_code, unpack_table("32s", data)
          318 
          319     async def fetch_blockchain_transaction(self, txid):
          320         """Fetch transaction by txid (not including mempool)"""
          321         command = b"blockchain.fetch_transaction2"
          322         error_code, data = await self._simple_request(
          323             command,
          324             bytes.fromhex(txid)[::-1])
          325         if error_code:
          326             return error_code, None
          327         return error_code, data
          328 
          329     async def fetch_mempool_transaction(self, txid):
          330         """Fetch transaction by txid (including mempool)"""
          331         command = b"transaction_pool.fetch_transaction2"
          332         error_code, data = await self._simple_request(
          333             command,
          334             bytes.fromhex(txid)[::-1])
          335         if error_code:
          336             return error_code, None
          337         return error_code, data
          338 
          339     async def subscribe_scripthash(self, scripthash):
          340         """Subscribe to scripthash"""
          341         command = b"subscribe.key"
          342         decoded_address = unhexlify(scripthash)
          343         return await self._subscription_request(command, decoded_address)
          344 
          345     async def unsubscribe_scripthash(self, scripthash):
          346         """Unsubscribe scripthash"""
          347         # TODO: This call should ideally also remove the subscription
          348         # request from the RequestCollection.
          349         # This call solicits a final call from the server with an
          350         # `error::service_stopped` error code.
          351         command = b"unsubscribe.key"
          352         decoded_address = unhexlify(scripthash)
          353         return await self._simple_request(command, decoded_address)
          354 
          355     async def fetch_history4(self, scripthash, height=0):
          356         """Fetch history for given scripthash"""
          357         command = b"blockchain.fetch_history4"
          358         decoded_address = unhexlify(scripthash)
          359         error_code, raw_points = await self._simple_request(
          360             command, decoded_address + struct.pack("<I", height))
          361         if error_code:
          362             return error_code, None
          363 
          364         def make_tuple(row):
          365             kind, height, tx_hash, index, value = row
          366             return (
          367                 kind,
          368                 {
          369                     "hash": tx_hash,
          370                     "index": index
          371                 },
          372                 height,
          373                 value,
          374                 checksum(tx_hash[::-1].hex(), index),
          375             )
          376 
          377         rows = unpack_table("<BI32sIQ", raw_points)
          378         points = [make_tuple(row) for row in rows]
          379         correlated_points = Client.__correlate(points)
          380         # self.log.debug("history points: %s", points)
          381         # self.log.debug("history correlated: %s", correlated_points)
          382         return error_code, self._sort_correlated_points(correlated_points)
          383 
          384     @staticmethod
          385     def _sort_correlated_points(points):
          386         """Sort by ascending height"""
          387         if len(points) < 2:
          388             return points
          389         return sorted(points, key=lambda x: list(x.values())[0]["height"])
          390 
          391     async def broadcast_transaction(self, rawtx):
          392         """Broadcast given raw transaction"""
          393         command = b"transaction_pool.broadcast"
          394         return await self._simple_request(command, rawtx)
          395 
          396     async def fetch_balance(self, scripthash):
          397         """Fetch balance for given scripthash"""
          398         error_code, history = await self.fetch_history4(scripthash)
          399         if error_code:
          400             return error_code, None
          401 
          402         utxo = Client.__receives_without_spends(history)
          403         return error_code, functools.reduce(
          404             lambda accumulator, point: accumulator + point["value"], utxo, 0)
          405 
          406     async def fetch_utxo(self, scripthash):
          407         """Find UTXO for given scripthash"""
          408         error_code, history = await self.fetch_history4(scripthash)
          409         if error_code:
          410             return error_code, None
          411         return error_code, Client.__receives_without_spends(history)
          412 
          413     async def subscribe_to_blocks(self, queue):
          414         asyncio.ensure_future(self._listen_for_blocks(queue))
          415         return queue
          416 
          417     async def _listen_for_blocks(self, queue):
          418         """Infinite loop for block subscription.
          419         Returns raw blocks as they're received.
          420         """
          421         while True:
          422             frame = await self._block_socket.recv_multipart()
          423             seq = struct.unpack("<H", frame[0])[0]
          424             height = struct.unpack("<I", frame[1])[0]
          425             block_data = frame[2]
          426             queue.put_nowait((seq, height, block_data))
          427 
          428     @staticmethod
          429     def __receives_without_spends(history):
          430         return (point for point in history if "spent" not in point)
          431 
          432     @staticmethod
          433     def __correlate(points):
          434         transfers, checksum_to_index = Client.__find_receives(points)
          435         transfers = Client.__correlate_spends_to_receives(
          436             points, transfers, checksum_to_index)
          437         return transfers
          438 
          439     @staticmethod
          440     def __correlate_spends_to_receives(points, transfers, checksum_to_index):
          441         for point in points:
          442             if point[0] == 1:  # receive
          443                 continue
          444 
          445             spent = {
          446                 "hash": point[1]["hash"],
          447                 "height": point[2],
          448                 "index": point[1]["index"],
          449             }
          450             if point[3] not in checksum_to_index:
          451                 transfers.append({"spent": spent})
          452             else:
          453                 transfers[checksum_to_index[point[3]]]["spent"] = spent
          454 
          455         return transfers
          456 
          457     @staticmethod
          458     def __find_receives(points):
          459         transfers = []
          460         checksum_to_index = {}
          461 
          462         for point in points:
          463             if point[0] == 0:  # spent
          464                 continue
          465 
          466             transfers.append({
          467                 "received": {
          468                     "hash": point[1]["hash"],
          469                     "height": point[2],
          470                     "index": point[1]["index"],
          471                 },
          472                 "value": point[3],
          473             })
          474 
          475             checksum_to_index[point[4]] = len(transfers) - 1
          476 
          477         return transfers, checksum_to_index