URI: 
       tzeromq.py - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
       tzeromq.py (18114B)
       ---
            1 #!/usr/bin/env python
            2 #
            3 # Electrum - lightweight Bitcoin client
            4 # Copyright (C) 2011 thomasv@gitorious
            5 # Copyright (C) 2021 Ivan J. <parazyd@dyne.org>
            6 # Copyright (C) 2018 Harm Aarts <harmaarts@gmail.com>
            7 #
            8 # Permission is hereby granted, free of charge, to any person
            9 # obtaining a copy of this software and associated documentation files
           10 # (the "Software"), to deal in the Software without restriction,
           11 # including without limitation the rights to use, copy, modify, merge,
           12 # publish, distribute, sublicense, and/or sell copies of the Software,
           13 # and to permit persons to whom the Software is furnished to do so,
           14 # subject to the following conditions:
           15 #
           16 # The above copyright notice and this permission notice shall be
           17 # included in all copies or substantial portions of the Software.
           18 #
           19 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
           20 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
           21 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
           22 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
           23 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
           24 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
           25 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
           26 # SOFTWARE.
           27 import asyncio
           28 import logging
           29 import functools
           30 import hashlib
           31 import struct
           32 from random import randint
           33 from binascii import hexlify, unhexlify
           34 
           35 import zmq
           36 import zmq.asyncio
           37 
           38 from .logging import Logger
           39 from .libbitcoin_errors import make_error_code, ErrorCode
           40 from .util import bh2u
           41 
           42 
           43 from datetime import datetime
           44 def __(msg):
           45     print("***********************")
           46     print("*** DEBUG %s ***: %s" % (datetime.now().strftime("%H:%M:%S"), msg))
           47 
           48 
           49 def create_random_id():
           50     """Generate a random request ID"""
           51     max_uint32 = 4294967295
           52     return randint(0, max_uint32)
           53 
           54 
           55 def checksum(hash_, index):
           56     """This method takes a transaction hash and an index and returns
           57     a checksum.
           58     This checksum is based on 49 bits starting from the 12th byte of the
           59     reversed hash. Combined with the last 15 bits of the 4 byte index.
           60     """
           61     mask = 0xffffffffffff8000
           62     magic_start_position = 12
           63 
           64     hash_bytes = bytes.fromhex(hash_)[::-1]
           65     last_20_bytes = hash_bytes[magic_start_position:]
           66 
           67     assert len(hash_bytes) == 32
           68     assert index < 2**32
           69 
           70     hash_upper_49_bits = to_int(last_20_bytes) & mask
           71     index_lower_15_bits = index & ~mask
           72 
           73     return hash_upper_49_bits | index_lower_15_bits
           74 
           75 
           76 def to_int(some_bytes):
           77     return int.from_bytes(some_bytes, byteorder='little')
           78 
           79 
           80 def pack_block_index(index):
           81     if isinstance(index, str):
           82         index = unhexlify(index)
           83         assert len(index) == 32
           84         return index
           85     elif isinstance(index, int):
           86         return struct.pack('<I', index)
           87     else:
           88         raise ValueError(f"Unknown index type {type(index)} v:{index}, should be int or bytearray")
           89 
           90 
           91 def unpack_table(row_fmt, data):
           92     # get the number of rows
           93     row_size = struct.calcsize(row_fmt)
           94     nrows = len(data) // row_size
           95 
           96     # unpack
           97     rows = []
           98     for idx in range(nrows):
           99         offset = idx * row_size
          100         row = struct.unpack_from(row_fmt, data, offset)
          101         rows.append(row)
          102     return rows
          103 
          104 
          105 class ClientSettings:
          106     """Class implementing client settings"""
          107     def __init__(self, timeout=10, context=None, loop=None):
          108         __("Zeromq ClientSettings: __init__")
          109         self._timeout = timeout
          110         self._context = context
          111         self._loop = loop
          112 
          113     @property
          114     def context(self):
          115         """zmq context property"""
          116         if not self._context:
          117             ctx = zmq.asyncio.Context()
          118             ctx.linger = 500  # in milliseconds
          119             self._context = ctx
          120         return self._context
          121 
          122     @context.setter
          123     def context(self, context):
          124         self._context = context
          125 
          126     @property
          127     def timeout(self):
          128         """Set to None for no timeout"""
          129         return self._timeout
          130 
          131     @timeout.setter
          132     def timeout(self, timeout):
          133         self._timeout = timeout
          134 
          135 
          136 class Request:
          137     """Class implementing a _send_ request.
          138     This is either a simple request/response affair or a subscription.
          139     """
          140     def __init__(self, socket, command, data):
          141         __("Zeromq Request: __init__")
          142         self.id_ = create_random_id()
          143         self.socket = socket
          144         self.command = command
          145         self.data = data
          146         self.future = asyncio.Future()
          147         self.queue = None
          148 
          149     async def send(self):
          150         """Send the zmq request"""
          151         __(f"Zeromq Request: send: {self.command}, {self.data}")
          152         request = [self.command, struct.pack('<I', self.id_), self.data]
          153         await self.socket.send_multipart(request)
          154 
          155     def is_subscription(self):
          156         """If the request is a subscription, then the response to this
          157         request is a notification.
          158         """
          159         return self.queue is not None
          160 
          161     def __str__(self):
          162         return 'Request(command, ID) {}, {:d}'.format(self.command,
          163                                                       self.id_)
          164 
          165 
          166 class InvalidServerResponseException(Exception): pass
          167 
          168 
          169 class Response:
          170     """Class implementing a request response"""
          171     def __init__(self, frame):
          172         __("Zeromq Response: __init__")
          173         if len(frame) != 3:
          174             raise InvalidServerResponseException(
          175                 'Length of the frame was not 3: %d' % len(frame))
          176 
          177         self.command = frame[0]
          178         self.request_id = struct.unpack('<I', frame[1])[0]
          179         error_code = struct.unpack('<I', frame[2][:4])[0]
          180         self.error_code = make_error_code(error_code)
          181         self.data = frame[2][4:]
          182 
          183     def is_bound_for_queue(self):
          184         return len(self.data) > 0
          185 
          186     def __str__(self):
          187         return 'Response(command, request ID, error code, data):' \
          188             + ' %s, %d, %s, %s' \
          189             % (self.command, self.request_id, self.error_code, self.data)
          190 
          191 
          192 class RequestCollection:
          193     """RequestCollection carries a list of Requests and matches incoming
          194     responses to them.
          195     """
          196     def __init__(self, socket, loop):
          197         __("Zeromq RequestCollection: __init__")
          198         self._socket = socket
          199         self._requests = {}
          200         self._task = asyncio.ensure_future(self._run(), loop=loop)
          201 
          202     async def _run(self):
          203         while True:
          204             __("Zeromq RequestCollection: _run loop iteration")
          205             await self._receive()
          206 
          207     async def stop(self):
          208         """Stops listening for incoming responses (or subscription) messages).
          209         Returns the number of _responses_ expected but which are now dropped
          210         on the floor.
          211         """
          212         __("Zeromq RequestCollection: stop")
          213         self._task.cancel()
          214         try:
          215             await self._task
          216         except asyncio.CancelledError:
          217             return len(self._requests)
          218 
          219     async def _receive(self):
          220         __("Zeromq RequestCollection: receive")
          221         frame = await self._socket.recv_multipart()
          222         response = Response(frame)
          223 
          224         if response.request_id in self._requests:
          225             self._handle_response(response)
          226         else:
          227             __("Zeromq RequestCollection: receive: unhandled response %s:%s" % (response.command, response.request_id))
          228 
          229     def _handle_response(self, response):
          230         __("Zeromq RequestCollection: _handle_response")
          231         request = self._requests[response.request_id]
          232 
          233         if request.is_subscription():
          234             if response.is_bound_for_queue():
          235                 # TODO: decode the data into something usable
          236                 request.queue.put_nowait(response.data)
          237             else:
          238                 request.future.set_result(response)
          239         else:
          240             self.delete_request(request)
          241             request.future.set_result(response)
          242 
          243     def add_request(self, request):
          244         __("Zeromq RequestCollection: add_request")
          245         # TODO: we should maybe check if the request.id_ is unique
          246         self._requests[request.id_] = request
          247 
          248     def delete_request(self, request):
          249         __("Zeromq RequestCollection: delete_request")
          250         del self._requests[request.id_]
          251 
          252 
          253 class Client:
          254     """This class represents a connection to a libbitcoin server.
          255     hostname -- the server DNS name to connect to.
          256     ports -- a dictionary containing four keys; query/heartbeat/block/tx
          257     """
          258     # def __init__(self, hostname, ports, settings=ClientSettings()):
          259     def __init__(self, hostname, ports, loop):
          260         __("Zeromq Client: __init__")
          261         self._hostname = hostname
          262         self._ports = ports
          263         # self._settings = settings
          264         self._settings = ClientSettings(loop=loop)
          265         self._query_socket = self._create_query_socket()
          266         self._block_socket = self._create_block_socket()
          267         self._request_collection = RequestCollection(
          268             self._query_socket, self._settings._loop)
          269 
          270     async def stop(self):
          271         __("Zeromq Client: stop")
          272         self._query_socket.close()
          273         self._block_socket.close()
          274         return await self._request_collection.stop()
          275 
          276     def _create_block_socket(self):
          277         __("Zeromq Client: _create_block_socket")
          278         socket = self._settings.context.socket(
          279             zmq.SUB, io_loop=self._settings._loop)  # pylint: disable=E1101
          280         socket.connect(self.__server_url(self._hostname,
          281                                          self._ports['block']))
          282         socket.setsockopt_string(zmq.SUBSCRIBE, '')  # pylint: disable=E1101
          283         return socket
          284 
          285     def _create_query_socket(self):
          286         __("Zeromq Client: _create_query_socket")
          287         socket = self._settings.context.socket(
          288             zmq.DEALER, io_loop=self._settings._loop)  # pylint: disable=E1101
          289         socket.connect(self.__server_url(self._hostname,
          290                                          self._ports['query']))
          291         return socket
          292 
          293     async def _subscription_request(self, command, data):
          294         __("Zeromq Client: _subscription_request")
          295         request = await self._request(command, data)
          296         request.queue = asyncio.Queue(loop=self._settings._loop)
          297         error_code, _ = await self._wait_for_response(request)
          298         return error_code, request.queue
          299 
          300     async def _simple_request(self, command, data):
          301         __("Zeromq Client: _simple_request")
          302         return await self._wait_for_response(
          303             await self._request(command, data))
          304 
          305     async def _request(self, command, data):
          306         """Make a generic request. Both options are byte objects
          307         specified like b'blockchain.fetch_block_header' as an example.
          308         """
          309         __("Zeromq Client: _request")
          310         request = Request(self._query_socket, command, data)
          311         await request.send()
          312         self._request_collection.add_request(request)
          313         return request
          314 
          315     async def _wait_for_response(self, request):
          316         __("Zeromq Client: _wait_for_response")
          317         try:
          318             response = await asyncio.wait_for(request.future,
          319                                               self._settings.timeout)
          320         except asyncio.TimeoutError:
          321             self._request_collection.delete_request(request)
          322             return ErrorCode.channel_timeout, None
          323 
          324         assert response.command == request.command
          325         assert response.request_id == request.id_
          326         return response.error_code, response.data
          327 
          328     @staticmethod
          329     def __server_url(hostname, port):
          330         return 'tcp://' + hostname + ':' + str(port)
          331 
          332     async def last_height(self):
          333         __("Zeromq Client: last_height")
          334         command = b'blockchain.fetch_last_height'
          335         error_code, data = await self._simple_request(command, b'')
          336         if error_code:
          337             return error_code, None
          338         height = struct.unpack('<I', data)[0]
          339         return error_code, height
          340 
          341     async def subscribe_to_blocks(self, queue):
          342         __("Zeromq Client: subscribe_to_blocks")
          343         asyncio.ensure_future(self._listen_for_blocks(queue))
          344         return queue
          345 
          346     async def _listen_for_blocks(self, queue):
          347         __("Zeromq Client: _listen_for_blocks")
          348         _ec, tip = await self.last_height()
          349         _, header = await self.block_header(tip)
          350         queue.put_nowait((0, tip, header))
          351         while True:
          352             __("Zeromq Client: _listen_for_blocks loop iteration")
          353             frame = await self._block_socket.recv_multipart()
          354             seq = struct.unpack('<H', frame[0])[0]
          355             height = struct.unpack('<I', frame[1])[0]
          356             block_data = frame[2]
          357             block_header = block_data[:80]
          358             # block_header = raw[:80]
          359             # version = block_header[:4]
          360             # prev_merkle_root = block_header[4:36]
          361             # merkle_root = block_header[36:68]
          362             # timestamp = block_header[68:72]
          363             # bits = block_header[72:76]
          364             # nonce = blockheader[76:80]
          365             queue.put_nowait((seq, height, block_header))
          366 
          367     async def _subscribe_to_scripthash(self, sh, queue):
          368         __("Zeromq Client: _subscribe_to_scripthash (stub)")
          369         # TODO: libbitcoin here get history and make status (also review this entire func)
          370         # https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-subscribe
          371         # https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html#status
          372         # https://parazyd.org/git/electrum-obelisk/file/electrumobelisk/protocol.py.html#l57
          373         # queue.put_nowait((something,))
          374         # while True:
          375         #     recv and put in queue
          376 
          377 
          378     async def block_header(self, index):
          379         """Fetches the block header by height or integer index"""
          380         __("Zeromq Client: block_header")
          381         command = b'blockchain.fetch_block_header'
          382         data = pack_block_index(index)
          383         error_code, data = await self._simple_request(command, data)
          384         if error_code:
          385             return error_code, None
          386         return error_code, data
          387 
          388     async def block_transaction_hashes(self, index):
          389         __("Zeromq Client: block_transaction_hashes")
          390         command = b'blockchain.fetch_block_transaction_hashes'
          391         data = pack_block_index(index)
          392         error_code, data = await self._simple_request(command, data)
          393         if error_code:
          394             return error_code, None
          395         data = unpack_table('32s', data)
          396         return error_code, data
          397 
          398     async def transaction(self, hash_):
          399         __("Zeromq Client: transaction")
          400         command = b'blockchain.fetch_transaction2'
          401         error_code, data = await self._simple_request(
          402             command, bytes.fromhex(hash_)[::-1])
          403         if error_code:
          404             return error_code, None
          405         return None, data
          406 
          407     async def mempool_transaction(self, hash_):
          408         __("Zeromq Client: mempool_transaction")
          409         command = b'transaction_pool.fetch_transaction2'
          410         error_code, data = await self._simple_request(
          411             command, bytes.fromhex(hash_)[::-1])
          412         if error_code:
          413             return error_code, None
          414         return None, bh2u(data)
          415 
          416     async def broadcast_transaction(self, rawtx):
          417         __("Zeromq Client: broadcast_transaction")
          418         __(rawtx)
          419         command = b'transaction_pool.broadcast'
          420         return await self._simple_request(command, unhexlify(rawtx))
          421 
          422     async def history4(self, scripthash, height=0):
          423         __("Zeromq Client: history4")
          424         command = b'blockchain.fetch_history4'
          425         decoded_address = unhexlify(scripthash)[::-1]  # TODO: check byte order
          426         error_code, raw_points = await self._simple_request(
          427             command, decoded_address + struct.pack('<I', height))
          428         if error_code:
          429             return error_code, None
          430 
          431         def make_tuple(row):
          432             kind, tx_hash, index, height, value = row
          433             return (
          434                 kind,
          435                 #COutPoint(tx_hash, index),  # TODO: libbitcoin XXX:
          436                 (tx_hash, index),
          437                 height,
          438                 value,
          439                 checksum(tx_hash[::-1].hex(), index),
          440             )
          441 
          442         rows = unpack_table('<B32sIIQ', raw_points)
          443         points = [make_tuple(row) for row in rows]
          444         correlated_points = Client.__correlate(points)
          445         return None, correlated_points
          446 
          447     async def balance(self, scripthash):
          448         __("Zeromq Client: balance")
          449         error, hist = await self.history4(scripthash)
          450         if error:
          451             return error, None
          452         utxo = Client.__receives_without_spends(hist)
          453         return None, functools.reduce(
          454             lambda accumulator, point: accumulator + point['value'], utxo, 0)
          455 
          456     async def unspent(self, scripthash):
          457         __("Zeromq Client: unspent")
          458         error, hist = await self.history4(scripthash)
          459         if error:
          460             return error, None
          461         return None, Client.__receives_without_spends(hist)
          462 
          463     @staticmethod
          464     def __receives_without_spends(hist):
          465         return (point for point in hist if 'spent' not in point)
          466 
          467     @staticmethod
          468     def __correlate(points):
          469         transfers, checksum_to_index = Client.__find_receives(points)
          470         transfers = Client.__correlate_spends_to_receives(
          471             points, transfers, checksum_to_index)
          472         return transfers
          473 
          474     @staticmethod
          475     def __correlate_spends_to_receives(points, transfers, checksum_to_index):
          476         for point in points:
          477             if point[0] == 0: # receive
          478                 continue
          479 
          480             spent = {
          481                 'hash': point[1].hash,
          482                 'height': point[2],
          483                 'index': point[1].n,
          484             }
          485             if point[3] not in checksum_to_index:
          486                 transfers.append({'spent': spent})
          487             else:
          488                 transfers[checksum_to_index[point[3]]]['spent'] = spent
          489 
          490         return transfers
          491 
          492     @staticmethod
          493     def __find_receives(points):
          494         transfers = []
          495         checksum_to_index = {}
          496 
          497         for point in points:
          498             if point[0] == 1:  # spent
          499                 continue
          500 
          501             transfers.append({
          502                 'received': {
          503                     'hash': point[1].hash,
          504                     'height': point[2],
          505                     'index': point[1].n,
          506                 },
          507                 'value': point[3],
          508             })
          509 
          510             checksum_to_index[point[4]] = len(transfers) - 1
          511 
          512         return transfers, checksum_to_index