URI: 
       tInitial ZeroMQ client implementation. - obelisk - Electrum server using libbitcoin as its backend
  HTML git clone https://git.parazyd.org/obelisk
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
   DIR commit bae345a9b95dff3ae89ec354a12b6bab8182c7d6
   DIR parent 3fa9d2257f2d75456b9479d22a39f633fda58858
  HTML Author: parazyd <parazyd@dyne.org>
       Date:   Wed,  7 Apr 2021 16:46:53 +0200
       
       Initial ZeroMQ client implementation.
       
       Diffstat:
         A electrumobelisk/zeromq.py           |     246 +++++++++++++++++++++++++++++++
       
       1 file changed, 246 insertions(+), 0 deletions(-)
       ---
   DIR diff --git a/electrumobelisk/zeromq.py b/electrumobelisk/zeromq.py
       t@@ -0,0 +1,246 @@
       +#!/usr/bin/env python3
       +# Copyright (C) 2020-2021 Ivan J. <parazyd@dyne.org>
       +#
       +# This file is part of obelisk
       +#
       +# This program is free software: you can redistribute it and/or modify
       +# it under the terms of the GNU Affero General Public License version 3
       +# as published by the Free Software Foundation.
       +#
       +# This program is distributed in the hope that it will be useful,
       +# but WITHOUT ANY WARRANTY; without even the implied warranty of
       +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
       +# GNU Affero General Public License for more details.
       +#
       +# You should have received a copy of the GNU Affero General Public License
       +# along with this program.  If not, see <http://www.gnu.org/licenses/>.
       +"""ZeroMQ implementation for libbitcoin"""
       +import asyncio
       +import struct
       +from random import randint
       +
       +import zmq
       +import zmq.asyncio
       +
       +from libbitcoin_errors import make_error_code, ErrorCode
       +
       +
       +def create_random_id():
       +    """Generate a random request ID"""
       +    max_uint32 = 4294967295
       +    return randint(0, max_uint32)
       +
       +
       +class ClientSettings:
       +    """Class implementing ZMQ client settings"""
       +    def __init__(self, timeout=10, context=None, loop=None):
       +        self._timeout = timeout
       +        self._context = context
       +        self._loop = loop
       +
       +    @property
       +    def context(self):
       +        """ZMQ context property"""
       +        if not self.context:
       +            ctx = zmq.asyncio.Context()
       +            ctx.linger = 500  # in milliseconds
       +            self._context = ctx
       +        return self._context
       +
       +    @context.setter
       +    def context(self, context):
       +        self._context = context
       +
       +    @property
       +    def timeout(self):
       +        """Set to None for no timeout"""
       +        return self._timeout
       +
       +    @timeout.setter
       +    def timeout(self, timeout):
       +        self._timeout = timeout
       +
       +
       +class Request:
       +    """Class implementing a _send_ request.
       +    This is either a simple request/response affair or a subscription.
       +    """
       +    def __init__(self, socket, command, data):
       +        self.id_ = create_random_id()
       +        self.socket = socket
       +        self.command = command
       +        self.data = data
       +        self.future = asyncio.Future()
       +        self.queue = None
       +
       +    async def send(self):
       +        """Send the ZMQ request"""
       +        request = [self.command, struct.pack("<I", self.id_), self.data]
       +        await self.socket.send_multipart(request)
       +
       +    def is_subscription(self):
       +        """If the request is a subscription, then the response to this
       +        request is a notification.
       +        """
       +        return self.queue is not None
       +
       +    def __str__(self):
       +        return "Request(command, ID) {}, {:d}".format(self.command, self.id_)
       +
       +
       +class InvalidServerResponseException(Exception):
       +    """Exception for invalid server responses"""
       +
       +
       +class Response:
       +    """Class implementing a request response"""
       +    def __init__(self, frame):
       +        if len(frame) != 3:
       +            raise InvalidServerResponseException(
       +                f"Length of the frame was not 3: {len(frame)}")
       +
       +        self.command = frame[0]
       +        self.request_id = struct.unpack("<I", frame[1])[0]
       +        error_code = struct.unpack("<I", frame[2][:4])[0]
       +        self.error_code = make_error_code(error_code)
       +        self.data = frame[2][4:]
       +
       +    def is_bound_for_queue(self):
       +        return len(self.data) > 0
       +
       +    def __str__(self):
       +        return (
       +            "Response(command, request ID, error code, data):" +
       +            f" {self.command}, {self.request_id}, {self.error_code}, {self.data}"
       +        )
       +
       +
       +class RequestCollection:
       +    """RequestCollection carries a list of Requests and matches incoming
       +    responses to them.
       +    """
       +    def __init__(self, socket, loop):
       +        self._socket = socket
       +        self._requests = {}
       +        self._task = asyncio.ensure_future(self._run(), loop=loop)
       +
       +    async def _run(self):
       +        while True:
       +            await self._receive()
       +
       +    async def stop(self):
       +        """Stops listening for incoming responses (or subscription) messages.
       +        Returns the number of _responses_ expected but which are now dropped
       +        on the floor.
       +        """
       +        self._task.cancel()
       +        try:
       +            await self._task
       +        except asyncio.CancelledError:
       +            return len(self._requests)
       +
       +    async def _receive(self):
       +        frame = await self._socket.recv_multipart()
       +        response = Response(frame)
       +
       +        if response.request_id in self._requests:
       +            self._handle_response(response)
       +        else:
       +            print(
       +                f"DEBUG: RequestCollection unhandled response {response.command}:{response.request_id}"
       +            )
       +
       +    def _handle_response(self, response):
       +        request = self._requests[response.request_id]
       +
       +        if request.is_subscription():
       +            if response.is_bound_for_queue():
       +                # TODO: decode the data into something usable
       +                request.queue.put_nowait(response.data)
       +            else:
       +                request.future.set_result(response)
       +        else:
       +            self.delete_request(request)
       +            request.future.set_result(response)
       +
       +    def add_request(self, request):
       +        # TODO: we should maybe check if the request.id_ is unique
       +        self._requests[request.id_] = request
       +
       +    def delete_request(self, request):
       +        del self._requests[request.id_]
       +
       +
       +class Client:
       +    """This class represents a connection to a libbitcoin server.
       +    hostname -- the server DNS name to connect to
       +    ports -- a dictionary containing four keys; query/heartbeat/block/tx
       +    """
       +
       +    # def __init__(self, hostname, ports, settings=ClientSettings()):
       +    def __init__(self, hostname, ports, loop):
       +        self._hostname = hostname
       +        self._ports = ports
       +        # self._settings = settings
       +        self._settings = ClientSettings(loop=loop)
       +        self._query_socket = self._create_query_socket()
       +        self._block_socket = self._create_block_socket()
       +        self._request_collection = RequestCollection(self._query_socket,
       +                                                     self._settings._loop)
       +
       +    async def stop(self):
       +        self._query_socket.close()
       +        self._block_socket.close()
       +        return await self._request_collection.stop()
       +
       +    def _create_block_socket(self):
       +        socket = self._settings.context.socket(
       +            zmq.SUB,  # pylint: disable=E1101
       +            io_loop=self._settings._loop,  # pylint: disable=W0212
       +        )
       +        socket.connect(self.__server_url(self._hostname, self._ports["block"]))
       +        socket.setsockopt_string(zmq.SUBSCRIBE, "")  # pylint: disable=E1101
       +        return socket
       +
       +    def _create_query_socket(self):
       +        socket = self._settings.context.socket(
       +            zmq.DEALER,  # pylint: disable=E1101
       +            io_loop=self._settings._loop,  # pylint: disable=W0212
       +        )
       +        socket.connect(self.__server_url(self._hostname, self._ports["query"]))
       +        return socket
       +
       +    async def _subscription_request(self, command, data):
       +        request = await self._request(command, data)
       +        request.queue = asyncio.Queue(loop=self._settings._loop)  # pylint: disable=W0212
       +        error_code, _ = await self._wait_for_response(request)
       +        return error_code, request.queue
       +
       +    async def _simple_request(self, command, data):
       +        return await self._wait_for_response(await
       +                                             self._request(command, data))
       +
       +    async def _request(self, command, data):
       +        """Make a generic request. Both options are byte objects specified
       +        like b'blockchain.fetch_block_header' as an example.
       +        """
       +        request = Request(self._query_socket, command, data)
       +        await request.send()
       +        self._request_collection.add_request(request)
       +        return request
       +
       +    async def _wait_for_response(self, request):
       +        try:
       +            response = await asyncio.wait_for(request.future,
       +                                              self._settings.timeout)
       +        except asyncio.TimeoutError:
       +            self._request_collection.delete_request(request)
       +            return ErrorCode.channel_timeout, None
       +
       +        assert response.command == request.command
       +        assert response.request_id == request.id_
       +        return response.error_code, response.data
       +
       +    @staticmethod
       +    def __server_url(hostname, port):
       +        return "tcp://" + hostname + ":" + str(port)