tzeromq.py - obelisk - Electrum server using libbitcoin as its backend
HTML git clone https://git.parazyd.org/obelisk
DIR Log
DIR Files
DIR Refs
DIR README
DIR LICENSE
---
tzeromq.py (17187B)
---
1 #!/usr/bin/env python3
2 # Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org>
3 #
4 # This file is part of obelisk
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License version 3
8 # as published by the Free Software Foundation.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """ZeroMQ implementation for libbitcoin"""
18 import asyncio
19 import functools
20 import struct
21 from binascii import unhexlify
22 from random import randint
23
24 import zmq
25 import zmq.asyncio
26
27 from obelisk.errors_libbitcoin import make_error_code, ZMQError
28 from obelisk.util import hash_to_hex_str
29
30
31 def create_random_id():
32 """Generate a random request ID"""
33 max_uint32 = 4294967295
34 return randint(0, max_uint32)
35
36
37 def pack_block_index(index):
38 """struct.pack given index"""
39 if isinstance(index, str):
40 index = unhexlify(index)
41 assert len(index) == 32
42 return index
43 if isinstance(index, int):
44 return struct.pack("<I", index)
45
46 raise ValueError(
47 f"Unknown index type {type(index)} v:{index}, should be int or bytearray"
48 )
49
50
51 def to_int(xbytes):
52 """Make little-endian integer from given bytes"""
53 return int.from_bytes(xbytes, byteorder="little")
54
55
56 def checksum(xhash, index):
57 """
58 This method takes a transaction hash and an index and returns a checksum.
59
60 This checksum is based on 49 bits starting from the 12th byte of the
61 reversed hash. Combined with the last 15 bits of the 4 byte index.
62 """
63 mask = 0xFFFFFFFFFFFF8000
64 magic_start_position = 12
65
66 hash_bytes = bytes.fromhex(xhash)[::-1]
67 last_20_bytes = hash_bytes[magic_start_position:]
68
69 assert len(hash_bytes) == 32
70 assert index < 2**32
71
72 hash_upper_49_bits = to_int(last_20_bytes) & mask
73 index_lower_15_bits = index & ~mask
74 return hash_upper_49_bits | index_lower_15_bits
75
76
77 def make_tuple(row):
78 kind, height, tx_hash, index, value = row
79 return (
80 kind,
81 {
82 "hash": tx_hash,
83 "index": index
84 },
85 height,
86 value,
87 checksum(hash_to_hex_str(tx_hash), index),
88 )
89
90
91 def unpack_table(row_fmt, data):
92 """Function to unpack table received from libbitcoin"""
93 # Get the number of rows
94 row_size = struct.calcsize(row_fmt)
95 nrows = len(data) // row_size
96 # Unpack
97 rows = []
98 for idx in range(nrows):
99 offset = idx * row_size
100 row = struct.unpack_from(row_fmt, data, offset)
101 rows.append(row)
102 return rows
103
104
105 class ClientSettings:
106 """Class implementing ZMQ client settings"""
107
108 def __init__(self, timeout=10, context=None, loop=None):
109 self._timeout = timeout
110 self._context = context
111 self._loop = loop
112
113 @property
114 def context(self):
115 """ZMQ context property"""
116 if not self._context:
117 ctx = zmq.asyncio.Context()
118 ctx.linger = 500 # in milliseconds
119 self._context = ctx
120 return self._context
121
122 @context.setter
123 def context(self, context):
124 self._context = context # pragma: no cover
125
126 @property
127 def timeout(self):
128 """Set to None for no timeout"""
129 return self._timeout
130
131 @timeout.setter
132 def timeout(self, timeout):
133 self._timeout = timeout # pragma: no cover
134
135
136 class Request:
137 """Class implementing a _send_ request.
138 This is either a simple request/response affair or a subscription.
139 """
140
141 def __init__(self, socket, command, data):
142 self.id_ = create_random_id()
143 self.socket = socket
144 self.command = command
145 self.data = data
146 self.future = asyncio.Future()
147 self.queue = None
148
149 async def send(self):
150 """Send the ZMQ request"""
151 request = [self.command, struct.pack("<I", self.id_), self.data]
152 await self.socket.send_multipart(request)
153
154 def is_subscription(self):
155 """If the request is a subscription, then the response to this
156 request is a notification.
157 """
158 return self.queue is not None
159
160 def __str__(self):
161 return "Request(command, ID) {}, {:d}".format(self.command, self.id_)
162
163
164 class InvalidServerResponseException(Exception):
165 """Exception for invalid server responses"""
166
167
168 class Response:
169 """Class implementing a request response"""
170
171 def __init__(self, frame):
172 if len(frame) != 3:
173 raise InvalidServerResponseException(
174 f"Length of the frame was not 3: {len(frame)}")
175
176 self.command = frame[0]
177 self.request_id = struct.unpack("<I", frame[1])[0]
178 error_code = struct.unpack("<I", frame[2][:4])[0]
179 self.error_code = make_error_code(error_code)
180 self.data = frame[2][4:]
181
182 def is_bound_for_queue(self):
183 return len(self.data) > 0
184
185 def __str__(self):
186 return (
187 "Response(command, request ID, error code, data):" +
188 f" {self.command}, {self.request_id}, {self.error_code}, {self.data}"
189 )
190
191
192 class RequestCollection:
193 """RequestCollection carries a list of Requests and matches incoming
194 responses to them.
195 """
196
197 def __init__(self, socket, loop):
198 self._socket = socket
199 self._requests = {}
200 self._task = asyncio.ensure_future(self._run(), loop=loop)
201
202 async def _run(self):
203 while True:
204 await self._receive()
205
206 async def stop(self):
207 """Stops listening for incoming responses (or subscription) messages.
208 Returns the number of _responses_ expected but which are now dropped
209 on the floor.
210 """
211 self._task.cancel()
212 try:
213 await self._task
214 except asyncio.CancelledError:
215 return len(self._requests)
216
217 async def _receive(self):
218 frame = await self._socket.recv_multipart()
219 response = Response(frame)
220
221 if response.request_id in self._requests:
222 self._handle_response(response)
223 else:
224 print("DEBUG; RequestCollection unhandled response %s:%s" %
225 (response.command, response.request_id))
226
227 def _handle_response(self, response):
228 request = self._requests[response.request_id]
229
230 if request.is_subscription():
231 if response.is_bound_for_queue():
232 # TODO: decode the data into something usable
233 request.queue.put_nowait(response.data)
234 else:
235 request.future.set_result(response)
236 else:
237 self.delete_request(request)
238 request.future.set_result(response)
239
240 def add_request(self, request):
241 # TODO: we should maybe check if the request.id_ is unique
242 self._requests[request.id_] = request
243
244 def delete_request(self, request):
245 del self._requests[request.id_]
246
247
248 class Client:
249 """This class represents a connection to a libbitcoin server."""
250
251 def __init__(self, log, endpoints, loop):
252 self.log = log
253 self._endpoints = endpoints
254 self._settings = ClientSettings(loop=loop)
255 self._query_socket = self._create_query_socket()
256 self._block_socket = self._create_block_socket()
257 self._request_collection = RequestCollection(self._query_socket,
258 self._settings._loop)
259
260 async def stop(self):
261 self.log.debug("zmq Client.stop()")
262 self._query_socket.close()
263 self._block_socket.close()
264 return await self._request_collection.stop()
265
266 def _create_block_socket(self):
267 socket = self._settings.context.socket(
268 zmq.SUB, # pylint: disable=E1101
269 io_loop=self._settings._loop, # pylint: disable=W0212
270 )
271 socket.connect(self._endpoints["block"])
272 socket.setsockopt_string(zmq.SUBSCRIBE, "") # pylint: disable=E1101
273 return socket
274
275 def _create_query_socket(self):
276 socket = self._settings.context.socket(
277 zmq.DEALER, # pylint: disable=E1101
278 io_loop=self._settings._loop, # pylint: disable=W0212
279 )
280 socket.connect(self._endpoints["query"])
281 return socket
282
283 async def _subscription_request(self, command, data, queue):
284 request = await self._request(command, data)
285 request.queue = queue
286 error_code, _ = await self._wait_for_response(request)
287 return error_code
288
289 async def _simple_request(self, command, data):
290 return await self._wait_for_response(await self._request(command, data))
291
292 async def _request(self, command, data):
293 """Make a generic request. Both options are byte objects specified
294 like b'blockchain.fetch_block_header' as an example.
295 """
296 request = Request(self._query_socket, command, data)
297 await request.send()
298 self._request_collection.add_request(request)
299 return request
300
301 async def _wait_for_response(self, request):
302 try:
303 response = await asyncio.wait_for(request.future,
304 self._settings.timeout)
305 except asyncio.TimeoutError:
306 self._request_collection.delete_request(request)
307 return ZMQError.channel_timeout, None
308
309 assert response.command == request.command
310 assert response.request_id == request.id_
311 return response.error_code, response.data
312
313 async def server_version(self):
314 """Get the libbitcoin-server version"""
315 command = b"server.version"
316 error_code, data = await self._simple_request(command, b"")
317 if error_code:
318 return error_code, None
319 return error_code, data
320
321 async def fetch_last_height(self):
322 """Fetch the blockchain tip and return integer height"""
323 command = b"blockchain.fetch_last_height"
324 error_code, data = await self._simple_request(command, b"")
325 if error_code:
326 return error_code, None
327 return error_code, struct.unpack("<I", data)[0]
328
329 async def fetch_block_header(self, index):
330 """Fetch a block header by its height or integer index"""
331 command = b"blockchain.fetch_block_header"
332 data = pack_block_index(index)
333 return await self._simple_request(command, data)
334
335 async def fetch_block_transaction_hashes(self, index):
336 """Fetch transaction hashes in a block at height index"""
337 command = b"blockchain.fetch_block_transaction_hashes"
338 data = pack_block_index(index)
339 error_code, data = await self._simple_request(command, data)
340 if error_code:
341 return error_code, None
342 return error_code, unpack_table("32s", data)
343
344 async def fetch_blockchain_transaction(self, txid):
345 """Fetch transaction by txid (not including mempool)"""
346 command = b"blockchain.fetch_transaction2"
347 error_code, data = await self._simple_request(command,
348 bytes.fromhex(txid)[::-1])
349 if error_code:
350 return error_code, None
351 return error_code, data
352
353 async def fetch_mempool_transaction(self, txid):
354 """Fetch transaction by txid (including mempool)"""
355 command = b"transaction_pool.fetch_transaction2"
356 error_code, data = await self._simple_request(command,
357 bytes.fromhex(txid)[::-1])
358 if error_code:
359 return error_code, None
360 return error_code, data
361
362 async def subscribe_scripthash(self, scripthash, queue):
363 """Subscribe to scripthash"""
364 command = b"subscribe.key"
365 decoded_address = unhexlify(scripthash)
366 return await self._subscription_request(command, decoded_address, queue)
367
368 async def unsubscribe_scripthash(self, scripthash):
369 """Unsubscribe scripthash"""
370 # This call solicits a final call from the server with an
371 # `error::service_stopped` error code.
372 command = b"unsubscribe.key"
373 decoded_address = unhexlify(scripthash)
374 return await self._simple_request(command, decoded_address)
375
376 async def fetch_history4(self, scripthash, height=0):
377 """Fetch history for given scripthash"""
378 command = b"blockchain.fetch_history4"
379 decoded_address = unhexlify(scripthash)
380 error_code, raw_points = await self._simple_request(
381 command, decoded_address + struct.pack("<I", height))
382 if error_code:
383 return error_code, None
384
385 rows = unpack_table("<BI32sIQ", raw_points)
386 points = [make_tuple(row) for row in rows]
387 correlated_points = Client.__correlate(points)
388 # self.log.debug("history points: %s", points)
389 # self.log.debug("history correlated: %s", correlated_points)
390
391 # BUG: In libbitcoin v4 sometimes transactions mess up and double
392 # https://github.com/libbitcoin/libbitcoin-server/issues/545
393 #
394 # The following is not a very efficient solution
395 correlated = [
396 i for n, i in enumerate(correlated_points)
397 if i not in correlated_points[n + 1:]
398 ]
399 return error_code, self._sort_correlated_points(correlated)
400
401 @staticmethod
402 def _sort_correlated_points(points):
403 """Sort by ascending height"""
404 if len(points) < 2:
405 return points
406 return sorted(points, key=lambda x: list(x.values())[0]["height"])
407
408 async def broadcast_transaction(self, rawtx):
409 """Broadcast given raw transaction"""
410 command = b"transaction_pool.broadcast"
411 return await self._simple_request(command, rawtx)
412
413 async def fetch_balance(self, scripthash):
414 """Fetch balance for given scripthash"""
415 error_code, history = await self.fetch_history4(scripthash)
416 if error_code:
417 return error_code, None
418
419 utxo = Client.__receives_without_spends(history)
420
421 return error_code, (
422 # confirmed
423 functools.reduce(
424 lambda accumulator, point: accumulator + point["value"]
425 if point["received"]["height"] != 4294967295 else 0,
426 utxo,
427 0,
428 ),
429 # unconfirmed
430 functools.reduce(
431 lambda accumulator, point: accumulator + point["value"]
432 if point["received"]["height"] == 4294967295 else 0,
433 utxo,
434 0,
435 ),
436 )
437
438 async def fetch_utxo(self, scripthash):
439 """Find UTXO for given scripthash"""
440 error_code, history = await self.fetch_history4(scripthash)
441 if error_code:
442 return error_code, None
443 return error_code, Client.__receives_without_spends(history)
444
445 async def subscribe_to_blocks(self, queue):
446 asyncio.ensure_future(self._listen_for_blocks(queue))
447 return queue
448
449 async def _listen_for_blocks(self, queue):
450 """Infinite loop for block subscription.
451 Returns raw blocks as they're received.
452 """
453 while True:
454 frame = await self._block_socket.recv_multipart()
455 seq = struct.unpack("<H", frame[0])[0]
456 height = struct.unpack("<I", frame[1])[0]
457 block_data = frame[2]
458 queue.put_nowait((seq, height, block_data))
459
460 @staticmethod
461 def __receives_without_spends(history):
462 return (point for point in history if "spent" not in point)
463
464 @staticmethod
465 def __correlate(points):
466 transfers, checksum_to_index = Client.__find_receives(points)
467 transfers = Client.__correlate_spends_to_receives(
468 points, transfers, checksum_to_index)
469 return transfers
470
471 @staticmethod
472 def __correlate_spends_to_receives(points, transfers, checksum_to_index):
473 for point in points:
474 if point[0] == 1: # receive
475 continue
476
477 spent = {
478 "hash": point[1]["hash"],
479 "height": point[2],
480 "index": point[1]["index"],
481 }
482 if point[3] not in checksum_to_index:
483 transfers.append({"spent": spent})
484 else:
485 transfers[checksum_to_index[point[3]]]["spent"] = spent
486
487 return transfers
488
489 @staticmethod
490 def __find_receives(points):
491 transfers = []
492 checksum_to_index = {}
493
494 for point in points:
495 if point[0] == 0: # spent
496 continue
497
498 transfers.append({
499 "received": {
500 "hash": point[1]["hash"],
501 "height": point[2],
502 "index": point[1]["index"],
503 },
504 "value": point[3],
505 })
506
507 checksum_to_index[point[4]] = len(transfers) - 1
508
509 return transfers, checksum_to_index