tzeromq.py - obelisk - Electrum server using libbitcoin as its backend HTML git clone https://git.parazyd.org/obelisk DIR Log DIR Files DIR Refs DIR README DIR LICENSE --- tzeromq.py (17187B) --- 1 #!/usr/bin/env python3 2 # Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org> 3 # 4 # This file is part of obelisk 5 # 6 # This program is free software: you can redistribute it and/or modify 7 # it under the terms of the GNU Affero General Public License version 3 8 # as published by the Free Software Foundation. 9 # 10 # This program is distributed in the hope that it will be useful, 11 # but WITHOUT ANY WARRANTY; without even the implied warranty of 12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 # GNU Affero General Public License for more details. 14 # 15 # You should have received a copy of the GNU Affero General Public License 16 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 """ZeroMQ implementation for libbitcoin""" 18 import asyncio 19 import functools 20 import struct 21 from binascii import unhexlify 22 from random import randint 23 24 import zmq 25 import zmq.asyncio 26 27 from obelisk.errors_libbitcoin import make_error_code, ZMQError 28 from obelisk.util import hash_to_hex_str 29 30 31 def create_random_id(): 32 """Generate a random request ID""" 33 max_uint32 = 4294967295 34 return randint(0, max_uint32) 35 36 37 def pack_block_index(index): 38 """struct.pack given index""" 39 if isinstance(index, str): 40 index = unhexlify(index) 41 assert len(index) == 32 42 return index 43 if isinstance(index, int): 44 return struct.pack("<I", index) 45 46 raise ValueError( 47 f"Unknown index type {type(index)} v:{index}, should be int or bytearray" 48 ) 49 50 51 def to_int(xbytes): 52 """Make little-endian integer from given bytes""" 53 return int.from_bytes(xbytes, byteorder="little") 54 55 56 def checksum(xhash, index): 57 """ 58 This method takes a transaction hash and an index and returns a checksum. 59 60 This checksum is based on 49 bits starting from the 12th byte of the 61 reversed hash. Combined with the last 15 bits of the 4 byte index. 62 """ 63 mask = 0xFFFFFFFFFFFF8000 64 magic_start_position = 12 65 66 hash_bytes = bytes.fromhex(xhash)[::-1] 67 last_20_bytes = hash_bytes[magic_start_position:] 68 69 assert len(hash_bytes) == 32 70 assert index < 2**32 71 72 hash_upper_49_bits = to_int(last_20_bytes) & mask 73 index_lower_15_bits = index & ~mask 74 return hash_upper_49_bits | index_lower_15_bits 75 76 77 def make_tuple(row): 78 kind, height, tx_hash, index, value = row 79 return ( 80 kind, 81 { 82 "hash": tx_hash, 83 "index": index 84 }, 85 height, 86 value, 87 checksum(hash_to_hex_str(tx_hash), index), 88 ) 89 90 91 def unpack_table(row_fmt, data): 92 """Function to unpack table received from libbitcoin""" 93 # Get the number of rows 94 row_size = struct.calcsize(row_fmt) 95 nrows = len(data) // row_size 96 # Unpack 97 rows = [] 98 for idx in range(nrows): 99 offset = idx * row_size 100 row = struct.unpack_from(row_fmt, data, offset) 101 rows.append(row) 102 return rows 103 104 105 class ClientSettings: 106 """Class implementing ZMQ client settings""" 107 108 def __init__(self, timeout=10, context=None, loop=None): 109 self._timeout = timeout 110 self._context = context 111 self._loop = loop 112 113 @property 114 def context(self): 115 """ZMQ context property""" 116 if not self._context: 117 ctx = zmq.asyncio.Context() 118 ctx.linger = 500 # in milliseconds 119 self._context = ctx 120 return self._context 121 122 @context.setter 123 def context(self, context): 124 self._context = context # pragma: no cover 125 126 @property 127 def timeout(self): 128 """Set to None for no timeout""" 129 return self._timeout 130 131 @timeout.setter 132 def timeout(self, timeout): 133 self._timeout = timeout # pragma: no cover 134 135 136 class Request: 137 """Class implementing a _send_ request. 138 This is either a simple request/response affair or a subscription. 139 """ 140 141 def __init__(self, socket, command, data): 142 self.id_ = create_random_id() 143 self.socket = socket 144 self.command = command 145 self.data = data 146 self.future = asyncio.Future() 147 self.queue = None 148 149 async def send(self): 150 """Send the ZMQ request""" 151 request = [self.command, struct.pack("<I", self.id_), self.data] 152 await self.socket.send_multipart(request) 153 154 def is_subscription(self): 155 """If the request is a subscription, then the response to this 156 request is a notification. 157 """ 158 return self.queue is not None 159 160 def __str__(self): 161 return "Request(command, ID) {}, {:d}".format(self.command, self.id_) 162 163 164 class InvalidServerResponseException(Exception): 165 """Exception for invalid server responses""" 166 167 168 class Response: 169 """Class implementing a request response""" 170 171 def __init__(self, frame): 172 if len(frame) != 3: 173 raise InvalidServerResponseException( 174 f"Length of the frame was not 3: {len(frame)}") 175 176 self.command = frame[0] 177 self.request_id = struct.unpack("<I", frame[1])[0] 178 error_code = struct.unpack("<I", frame[2][:4])[0] 179 self.error_code = make_error_code(error_code) 180 self.data = frame[2][4:] 181 182 def is_bound_for_queue(self): 183 return len(self.data) > 0 184 185 def __str__(self): 186 return ( 187 "Response(command, request ID, error code, data):" + 188 f" {self.command}, {self.request_id}, {self.error_code}, {self.data}" 189 ) 190 191 192 class RequestCollection: 193 """RequestCollection carries a list of Requests and matches incoming 194 responses to them. 195 """ 196 197 def __init__(self, socket, loop): 198 self._socket = socket 199 self._requests = {} 200 self._task = asyncio.ensure_future(self._run(), loop=loop) 201 202 async def _run(self): 203 while True: 204 await self._receive() 205 206 async def stop(self): 207 """Stops listening for incoming responses (or subscription) messages. 208 Returns the number of _responses_ expected but which are now dropped 209 on the floor. 210 """ 211 self._task.cancel() 212 try: 213 await self._task 214 except asyncio.CancelledError: 215 return len(self._requests) 216 217 async def _receive(self): 218 frame = await self._socket.recv_multipart() 219 response = Response(frame) 220 221 if response.request_id in self._requests: 222 self._handle_response(response) 223 else: 224 print("DEBUG; RequestCollection unhandled response %s:%s" % 225 (response.command, response.request_id)) 226 227 def _handle_response(self, response): 228 request = self._requests[response.request_id] 229 230 if request.is_subscription(): 231 if response.is_bound_for_queue(): 232 # TODO: decode the data into something usable 233 request.queue.put_nowait(response.data) 234 else: 235 request.future.set_result(response) 236 else: 237 self.delete_request(request) 238 request.future.set_result(response) 239 240 def add_request(self, request): 241 # TODO: we should maybe check if the request.id_ is unique 242 self._requests[request.id_] = request 243 244 def delete_request(self, request): 245 del self._requests[request.id_] 246 247 248 class Client: 249 """This class represents a connection to a libbitcoin server.""" 250 251 def __init__(self, log, endpoints, loop): 252 self.log = log 253 self._endpoints = endpoints 254 self._settings = ClientSettings(loop=loop) 255 self._query_socket = self._create_query_socket() 256 self._block_socket = self._create_block_socket() 257 self._request_collection = RequestCollection(self._query_socket, 258 self._settings._loop) 259 260 async def stop(self): 261 self.log.debug("zmq Client.stop()") 262 self._query_socket.close() 263 self._block_socket.close() 264 return await self._request_collection.stop() 265 266 def _create_block_socket(self): 267 socket = self._settings.context.socket( 268 zmq.SUB, # pylint: disable=E1101 269 io_loop=self._settings._loop, # pylint: disable=W0212 270 ) 271 socket.connect(self._endpoints["block"]) 272 socket.setsockopt_string(zmq.SUBSCRIBE, "") # pylint: disable=E1101 273 return socket 274 275 def _create_query_socket(self): 276 socket = self._settings.context.socket( 277 zmq.DEALER, # pylint: disable=E1101 278 io_loop=self._settings._loop, # pylint: disable=W0212 279 ) 280 socket.connect(self._endpoints["query"]) 281 return socket 282 283 async def _subscription_request(self, command, data, queue): 284 request = await self._request(command, data) 285 request.queue = queue 286 error_code, _ = await self._wait_for_response(request) 287 return error_code 288 289 async def _simple_request(self, command, data): 290 return await self._wait_for_response(await self._request(command, data)) 291 292 async def _request(self, command, data): 293 """Make a generic request. Both options are byte objects specified 294 like b'blockchain.fetch_block_header' as an example. 295 """ 296 request = Request(self._query_socket, command, data) 297 await request.send() 298 self._request_collection.add_request(request) 299 return request 300 301 async def _wait_for_response(self, request): 302 try: 303 response = await asyncio.wait_for(request.future, 304 self._settings.timeout) 305 except asyncio.TimeoutError: 306 self._request_collection.delete_request(request) 307 return ZMQError.channel_timeout, None 308 309 assert response.command == request.command 310 assert response.request_id == request.id_ 311 return response.error_code, response.data 312 313 async def server_version(self): 314 """Get the libbitcoin-server version""" 315 command = b"server.version" 316 error_code, data = await self._simple_request(command, b"") 317 if error_code: 318 return error_code, None 319 return error_code, data 320 321 async def fetch_last_height(self): 322 """Fetch the blockchain tip and return integer height""" 323 command = b"blockchain.fetch_last_height" 324 error_code, data = await self._simple_request(command, b"") 325 if error_code: 326 return error_code, None 327 return error_code, struct.unpack("<I", data)[0] 328 329 async def fetch_block_header(self, index): 330 """Fetch a block header by its height or integer index""" 331 command = b"blockchain.fetch_block_header" 332 data = pack_block_index(index) 333 return await self._simple_request(command, data) 334 335 async def fetch_block_transaction_hashes(self, index): 336 """Fetch transaction hashes in a block at height index""" 337 command = b"blockchain.fetch_block_transaction_hashes" 338 data = pack_block_index(index) 339 error_code, data = await self._simple_request(command, data) 340 if error_code: 341 return error_code, None 342 return error_code, unpack_table("32s", data) 343 344 async def fetch_blockchain_transaction(self, txid): 345 """Fetch transaction by txid (not including mempool)""" 346 command = b"blockchain.fetch_transaction2" 347 error_code, data = await self._simple_request(command, 348 bytes.fromhex(txid)[::-1]) 349 if error_code: 350 return error_code, None 351 return error_code, data 352 353 async def fetch_mempool_transaction(self, txid): 354 """Fetch transaction by txid (including mempool)""" 355 command = b"transaction_pool.fetch_transaction2" 356 error_code, data = await self._simple_request(command, 357 bytes.fromhex(txid)[::-1]) 358 if error_code: 359 return error_code, None 360 return error_code, data 361 362 async def subscribe_scripthash(self, scripthash, queue): 363 """Subscribe to scripthash""" 364 command = b"subscribe.key" 365 decoded_address = unhexlify(scripthash) 366 return await self._subscription_request(command, decoded_address, queue) 367 368 async def unsubscribe_scripthash(self, scripthash): 369 """Unsubscribe scripthash""" 370 # This call solicits a final call from the server with an 371 # `error::service_stopped` error code. 372 command = b"unsubscribe.key" 373 decoded_address = unhexlify(scripthash) 374 return await self._simple_request(command, decoded_address) 375 376 async def fetch_history4(self, scripthash, height=0): 377 """Fetch history for given scripthash""" 378 command = b"blockchain.fetch_history4" 379 decoded_address = unhexlify(scripthash) 380 error_code, raw_points = await self._simple_request( 381 command, decoded_address + struct.pack("<I", height)) 382 if error_code: 383 return error_code, None 384 385 rows = unpack_table("<BI32sIQ", raw_points) 386 points = [make_tuple(row) for row in rows] 387 correlated_points = Client.__correlate(points) 388 # self.log.debug("history points: %s", points) 389 # self.log.debug("history correlated: %s", correlated_points) 390 391 # BUG: In libbitcoin v4 sometimes transactions mess up and double 392 # https://github.com/libbitcoin/libbitcoin-server/issues/545 393 # 394 # The following is not a very efficient solution 395 correlated = [ 396 i for n, i in enumerate(correlated_points) 397 if i not in correlated_points[n + 1:] 398 ] 399 return error_code, self._sort_correlated_points(correlated) 400 401 @staticmethod 402 def _sort_correlated_points(points): 403 """Sort by ascending height""" 404 if len(points) < 2: 405 return points 406 return sorted(points, key=lambda x: list(x.values())[0]["height"]) 407 408 async def broadcast_transaction(self, rawtx): 409 """Broadcast given raw transaction""" 410 command = b"transaction_pool.broadcast" 411 return await self._simple_request(command, rawtx) 412 413 async def fetch_balance(self, scripthash): 414 """Fetch balance for given scripthash""" 415 error_code, history = await self.fetch_history4(scripthash) 416 if error_code: 417 return error_code, None 418 419 utxo = Client.__receives_without_spends(history) 420 421 return error_code, ( 422 # confirmed 423 functools.reduce( 424 lambda accumulator, point: accumulator + point["value"] 425 if point["received"]["height"] != 4294967295 else 0, 426 utxo, 427 0, 428 ), 429 # unconfirmed 430 functools.reduce( 431 lambda accumulator, point: accumulator + point["value"] 432 if point["received"]["height"] == 4294967295 else 0, 433 utxo, 434 0, 435 ), 436 ) 437 438 async def fetch_utxo(self, scripthash): 439 """Find UTXO for given scripthash""" 440 error_code, history = await self.fetch_history4(scripthash) 441 if error_code: 442 return error_code, None 443 return error_code, Client.__receives_without_spends(history) 444 445 async def subscribe_to_blocks(self, queue): 446 asyncio.ensure_future(self._listen_for_blocks(queue)) 447 return queue 448 449 async def _listen_for_blocks(self, queue): 450 """Infinite loop for block subscription. 451 Returns raw blocks as they're received. 452 """ 453 while True: 454 frame = await self._block_socket.recv_multipart() 455 seq = struct.unpack("<H", frame[0])[0] 456 height = struct.unpack("<I", frame[1])[0] 457 block_data = frame[2] 458 queue.put_nowait((seq, height, block_data)) 459 460 @staticmethod 461 def __receives_without_spends(history): 462 return (point for point in history if "spent" not in point) 463 464 @staticmethod 465 def __correlate(points): 466 transfers, checksum_to_index = Client.__find_receives(points) 467 transfers = Client.__correlate_spends_to_receives( 468 points, transfers, checksum_to_index) 469 return transfers 470 471 @staticmethod 472 def __correlate_spends_to_receives(points, transfers, checksum_to_index): 473 for point in points: 474 if point[0] == 1: # receive 475 continue 476 477 spent = { 478 "hash": point[1]["hash"], 479 "height": point[2], 480 "index": point[1]["index"], 481 } 482 if point[3] not in checksum_to_index: 483 transfers.append({"spent": spent}) 484 else: 485 transfers[checksum_to_index[point[3]]]["spent"] = spent 486 487 return transfers 488 489 @staticmethod 490 def __find_receives(points): 491 transfers = [] 492 checksum_to_index = {} 493 494 for point in points: 495 if point[0] == 0: # spent 496 continue 497 498 transfers.append({ 499 "received": { 500 "hash": point[1]["hash"], 501 "height": point[2], 502 "index": point[1]["index"], 503 }, 504 "value": point[3], 505 }) 506 507 checksum_to_index[point[4]] = len(transfers) - 1 508 509 return transfers, checksum_to_index