URI: 
       tInitial ZeroMQ Interface implementation. - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 28ec2b6be35f4d2e8dfdfbc33200ffa31478ecae
   DIR parent 139fc78e196ec76ebc22ec7b50a8e6b84522062b
  HTML Author: parazyd <parazyd@dyne.org>
       Date:   Fri, 12 Mar 2021 18:20:26 +0100
       
       Initial ZeroMQ Interface implementation.
       
       We can now fully sync the headers, and get notified about new headers.
       Testing with:
       
               ./run_electrum --testnet -V i --oneserver
       
       The libbitcoin v4 testnet server is hardcoded in the Interface class for now.
       
       grep for "TODO: libbitcoin" for further work.
       
       Diffstat:
         M electrum/interface.py               |     597 ++++++++++---------------------
         A electrum/libbitcoin_errors.py       |      72 +++++++++++++++++++++++++++++++
         A electrum/merkle.py                  |      62 +++++++++++++++++++++++++++++++
         M electrum/network.py                 |       9 +++++++--
         M electrum/synchronizer.py            |       6 ++++--
         M electrum/util.py                    |       5 ++++-
         A electrum/zeromq.py                  |     493 +++++++++++++++++++++++++++++++
       
       7 files changed, 829 insertions(+), 415 deletions(-)
       ---
   DIR diff --git a/electrum/interface.py b/electrum/interface.py
       t@@ -2,6 +2,7 @@
        #
        # Electrum - lightweight Bitcoin client
        # Copyright (C) 2011 thomasv@gitorious
       +# Copyright (C) 2021 Ivan J. <parazyd@dyne.org>
        #
        # Permission is hereby granted, free of charge, to any person
        # obtaining a copy of this software and associated documentation files
       t@@ -23,42 +24,34 @@
        # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
        # SOFTWARE.
        import os
       -import re
       -import ssl
        import sys
       -import traceback
        import asyncio
       -import socket
       -from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence
       +from typing import (Tuple, Union, List, TYPE_CHECKING, Optional, Set,
       +                    NamedTuple, Any, Sequence)
        from collections import defaultdict
       -from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
       -import itertools
       +from ipaddress import (IPv4Network, IPv6Network, ip_address, IPv6Address,
       +                       IPv4Address)
       +from binascii import hexlify, unhexlify
        import logging
       -import hashlib
       -import functools
       -
       -import aiorpcx
       -from aiorpcx import TaskGroup
       -from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
       -from aiorpcx.curio import timeout_after, TaskTimeout
       -from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
       -from aiorpcx.rawsocket import RSClient
       +
       +from aiorpcx import NetAddress
        import certifi
        
       -from .util import (ignore_exceptions, log_exceptions, bfh, SilentTaskGroup, MySocksProxy,
       -                   is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
       -                   is_int_or_float, is_non_negative_int_or_float)
       +from .util import (ignore_exceptions, log_exceptions, bfh, SilentTaskGroup,
       +                   MySocksProxy, is_integer, is_non_negative_integer,
       +                   is_hash256_str, is_hex_str, is_int_or_float,
       +                   is_non_negative_int_or_float)
        from . import util
       -from . import x509
       -from . import pem
        from . import version
        from . import blockchain
        from .blockchain import Blockchain, HEADER_SIZE
        from . import bitcoin
        from . import constants
       +from . import zeromq
        from .i18n import _
        from .logging import Logger
        from .transaction import Transaction
       +from .merkle import merkle_branch
        
        if TYPE_CHECKING:
            from .network import Network
       t@@ -126,102 +119,11 @@ def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
                raise RequestCorrupted(f'required field {field_name!r} missing from dict')
            return d[field_name]
        
       -
        def assert_list_or_tuple(val: Any) -> None:
            if not isinstance(val, (list, tuple)):
                raise RequestCorrupted(f'{val!r} should be a list or tuple')
        
        
       -class NotificationSession(RPCSession):
       -
       -    def __init__(self, *args, interface: 'Interface', **kwargs):
       -        super(NotificationSession, self).__init__(*args, **kwargs)
       -        self.subscriptions = defaultdict(list)
       -        self.cache = {}
       -        self.default_timeout = NetworkTimeout.Generic.NORMAL
       -        self._msg_counter = itertools.count(start=1)
       -        self.interface = interface
       -        self.cost_hard_limit = 0  # disable aiorpcx resource limits
       -
       -    async def handle_request(self, request):
       -        self.maybe_log(f"--> {request}")
       -        try:
       -            if isinstance(request, Notification):
       -                params, result = request.args[:-1], request.args[-1]
       -                key = self.get_hashable_key_for_rpc_call(request.method, params)
       -                if key in self.subscriptions:
       -                    self.cache[key] = result
       -                    for queue in self.subscriptions[key]:
       -                        await queue.put(request.args)
       -                else:
       -                    raise Exception(f'unexpected notification')
       -            else:
       -                raise Exception(f'unexpected request. not a notification')
       -        except Exception as e:
       -            self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
       -            await self.close()
       -
       -    async def send_request(self, *args, timeout=None, **kwargs):
       -        # note: semaphores/timeouts/backpressure etc are handled by
       -        # aiorpcx. the timeout arg here in most cases should not be set
       -        msg_id = next(self._msg_counter)
       -        self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
       -        try:
       -            # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
       -            # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
       -            response = await asyncio.wait_for(
       -                super().send_request(*args, **kwargs),
       -                timeout)
       -        except (TaskTimeout, asyncio.TimeoutError) as e:
       -            raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
       -        except CodeMessageError as e:
       -            self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
       -            raise
       -        else:
       -            self.maybe_log(f"--> {response} (id: {msg_id})")
       -            return response
       -
       -    def set_default_timeout(self, timeout):
       -        self.sent_request_timeout = timeout
       -        self.max_send_delay = timeout
       -
       -    async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
       -        # note: until the cache is written for the first time,
       -        # each 'subscribe' call might make a request on the network.
       -        key = self.get_hashable_key_for_rpc_call(method, params)
       -        self.subscriptions[key].append(queue)
       -        if key in self.cache:
       -            result = self.cache[key]
       -        else:
       -            result = await self.send_request(method, params)
       -            self.cache[key] = result
       -        await queue.put(params + [result])
       -
       -    def unsubscribe(self, queue):
       -        """Unsubscribe a callback to free object references to enable GC."""
       -        # note: we can't unsubscribe from the server, so we keep receiving
       -        # subsequent notifications
       -        for v in self.subscriptions.values():
       -            if queue in v:
       -                v.remove(queue)
       -
       -    @classmethod
       -    def get_hashable_key_for_rpc_call(cls, method, params):
       -        """Hashable index for subscriptions and cache"""
       -        return str(method) + repr(params)
       -
       -    def maybe_log(self, msg: str) -> None:
       -        if not self.interface: return
       -        if self.interface.debug or self.interface.network.debug:
       -            self.interface.logger.debug(msg)
       -
       -    def default_framer(self):
       -        # overridden so that max_size can be customized
       -        max_size = int(self.interface.network.config.get('network_max_incoming_msg_size',
       -                                                         MAX_INCOMING_MSG_SIZE))
       -        return NewlineFramer(max_size=max_size)
       -
       -
        class NetworkException(Exception): pass
        
        
       t@@ -248,15 +150,6 @@ class InvalidOptionCombination(Exception): pass
        class ConnectError(NetworkException): pass
        
        
       -class _RSClient(RSClient):
       -    async def create_connection(self):
       -        try:
       -            return await super().create_connection()
       -        except OSError as e:
       -            # note: using "from e" here will set __cause__ of ConnectError
       -            raise ConnectError(e) from e
       -
       -
        class ServerAddr:
        
            def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
       t@@ -284,6 +177,7 @@ class ServerAddr:
                host, port, protocol = str(s).rsplit(':', 2)
                return ServerAddr(host=host, port=port, protocol=protocol)
        
       +
            @classmethod
            def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
                """Construct ServerAddr from str, guessing missing details.
       t@@ -328,7 +222,7 @@ class ServerAddr:
                        and self.protocol == other.protocol)
        
            def __ne__(self, other):
       -        return not (self == other)
       +        return not self == other
        
            def __hash__(self):
                return hash((self.host, self.port, self.protocol))
       t@@ -346,11 +240,18 @@ def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
            return os.path.join(config.path, 'certs', filename)
        
        
       +from datetime import datetime
       +def __(msg):
       +    print("***********************")
       +    print("*** DEBUG %s ***: %s" % (datetime.now().strftime("%H:%M:%S"), msg))
       +
       +
        class Interface(Logger):
        
            LOGGING_SHORTCUT = 'i'
        
            def __init__(self, *, network: 'Network', server: ServerAddr, proxy: Optional[dict]):
       +        __("Interface: __init__")
                self.ready = asyncio.Future()
                self.got_disconnected = asyncio.Event()
                self.server = server
       t@@ -364,6 +265,15 @@ class Interface(Logger):
                self.session = None  # type: Optional[NotificationSession]
                self._ipaddr_bucket = None
        
       +        # TODO: libbitcoin (these are for testnet2.libbitcoin.net)
       +        # This should be incorporated with ServerAddr somehow.
       +        self.client = None
       +        self.bs = 'testnet2.libbitcoin.net'
       +        self.bsports = {'query': 29091,
       +                        'heartbeat': 29092,
       +                        'block': 29093,
       +                        'tx': 29094}
       +
                # Latest block header and corresponding height, as claimed by the server.
                # Note that these values are updated before they are verified.
                # Especially during initial header sync, verification can take a long time.
       t@@ -379,6 +289,7 @@ class Interface(Logger):
                self.taskgroup = SilentTaskGroup()
        
                async def spawn_task():
       +            __("Interface: spawn_task")
                    task = await self.network.taskgroup.spawn(self.run())
                    if sys.version_info >= (3, 8):
                        task.set_name(f"interface::{str(server)}")
       t@@ -402,125 +313,23 @@ class Interface(Logger):
            def __str__(self):
                return f"<Interface {self.diagnostic_name()}>"
        
       -    async def is_server_ca_signed(self, ca_ssl_context):
       -        """Given a CA enforcing SSL context, returns True if the connection
       -        can be established. Returns False if the server has a self-signed
       -        certificate but otherwise is okay. Any other failures raise.
       -        """
       -        try:
       -            await self.open_session(ca_ssl_context, exit_early=True)
       -        except ConnectError as e:
       -            cause = e.__cause__
       -            if isinstance(cause, ssl.SSLError) and cause.reason == 'CERTIFICATE_VERIFY_FAILED':
       -                # failures due to self-signed certs are normal
       -                return False
       -            raise
       -        return True
       -
       -    async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context):
       -        ca_signed = await self.is_server_ca_signed(ca_ssl_context)
       -        if ca_signed:
       -            if self._get_expected_fingerprint():
       -                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
       -            with open(self.cert_path, 'w') as f:
       -                # empty file means this is CA signed, not self-signed
       -                f.write('')
       -        else:
       -            await self._save_certificate()
       -
       -    def _is_saved_ssl_cert_available(self):
       -        if not os.path.exists(self.cert_path):
       -            return False
       -        with open(self.cert_path, 'r') as f:
       -            contents = f.read()
       -        if contents == '':  # CA signed
       -            if self._get_expected_fingerprint():
       -                raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
       -            return True
       -        # pinned self-signed cert
       -        try:
       -            b = pem.dePem(contents, 'CERTIFICATE')
       -        except SyntaxError as e:
       -            self.logger.info(f"error parsing already saved cert: {e}")
       -            raise ErrorParsingSSLCert(e) from e
       -        try:
       -            x = x509.X509(b)
       -        except Exception as e:
       -            self.logger.info(f"error parsing already saved cert: {e}")
       -            raise ErrorParsingSSLCert(e) from e
       -        try:
       -            x.check_date()
       -        except x509.CertificateError as e:
       -            self.logger.info(f"certificate has expired: {e}")
       -            os.unlink(self.cert_path)  # delete pinned cert only in this case
       -            return False
       -        self._verify_certificate_fingerprint(bytearray(b))
       -        return True
       -
       -    async def _get_ssl_context(self):
       -        if self.protocol != 's':
       -            # using plaintext TCP
       -            return None
       -
       -        # see if we already have cert for this server; or get it for the first time
       -        ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
       -        if not self._is_saved_ssl_cert_available():
       -            try:
       -                await self._try_saving_ssl_cert_for_first_time(ca_sslc)
       -            except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
       -                raise ErrorGettingSSLCertFromServer(e) from e
       -        # now we have a file saved in our certificate store
       -        siz = os.stat(self.cert_path).st_size
       -        if siz == 0:
       -            # CA signed cert
       -            sslc = ca_sslc
       -        else:
       -            # pinned self-signed cert
       -            sslc = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
       -            sslc.check_hostname = 0
       -        return sslc
       -
       -    def handle_disconnect(func):
       -        @functools.wraps(func)
       -        async def wrapper_func(self: 'Interface', *args, **kwargs):
       -            try:
       -                return await func(self, *args, **kwargs)
       -            except GracefulDisconnect as e:
       -                self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
       -            except aiorpcx.jsonrpc.RPCError as e:
       -                self.logger.warning(f"disconnecting due to {repr(e)}")
       -                self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
       -            finally:
       -                self.got_disconnected.set()
       -                await self.network.connection_down(self)
       -                # if was not 'ready' yet, schedule waiting coroutines:
       -                self.ready.cancel()
       -        return wrapper_func
       -
       -    @ignore_exceptions  # do not kill network.taskgroup
       +    # @ignore_exceptions  # do not kill network.taskgroup
            @log_exceptions
       -    @handle_disconnect
       +    # @handle_disconnect
            async def run(self):
       -        try:
       -            ssl_context = await self._get_ssl_context()
       -        except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
       -            self.logger.info(f'disconnecting due to: {repr(e)}')
       -            return
       -        try:
       -            await self.open_session(ssl_context)
       -        except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e:
       -            # make SSL errors for main interface more visible (to help servers ops debug cert pinning issues)
       -            if (isinstance(e, ConnectError) and isinstance(e.__cause__, ssl.SSLError)
       -                    and self.is_main_server() and not self.network.auto_connect):
       -                self.logger.warning(f'Cannot connect to main server due to SSL error '
       -                                    f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}')
       -            else:
       -                self.logger.info(f'disconnecting due to: {repr(e)}')
       -            return
       +        __("Interface: run")
       +        self.client = zeromq.Client(self.bs, self.bsports,
       +                                    loop=self.network.asyncio_loop)
       +        async with self.taskgroup as group:
       +            await group.spawn(self.ping)
       +            await group.spawn(self.request_fee_estimates)
       +            await group.spawn(self.run_fetch_blocks)
       +            await group.spawn(self.monitor_connection)
        
            def _mark_ready(self) -> None:
       +        __("Interface: _mark_ready")
                if self.ready.cancelled():
       -            raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
       +            raise GracefulDisconnect('conn establishment was too slow; %s' % '*ready* future was cancelled')
                if self.ready.done():
                    return
        
       t@@ -536,61 +345,20 @@ class Interface(Logger):
        
                self.ready.set_result(1)
        
       -    async def _save_certificate(self) -> None:
       -        if not os.path.exists(self.cert_path):
       -            # we may need to retry this a few times, in case the handshake hasn't completed
       -            for _ in range(10):
       -                dercert = await self._fetch_certificate()
       -                if dercert:
       -                    self.logger.info("succeeded in getting cert")
       -                    self._verify_certificate_fingerprint(dercert)
       -                    with open(self.cert_path, 'w') as f:
       -                        cert = ssl.DER_cert_to_PEM_cert(dercert)
       -                        # workaround android bug
       -                        cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
       -                        f.write(cert)
       -                        # even though close flushes we can't fsync when closed.
       -                        # and we must flush before fsyncing, cause flush flushes to OS buffer
       -                        # fsync writes to OS buffer to disk
       -                        f.flush()
       -                        os.fsync(f.fileno())
       -                    break
       -                await asyncio.sleep(1)
       -            else:
       -                raise GracefulDisconnect("could not get certificate after 10 tries")
       -
       -    async def _fetch_certificate(self) -> bytes:
       -        sslc = ssl.SSLContext()
       -        async with _RSClient(session_factory=RPCSession,
       -                             host=self.host, port=self.port,
       -                             ssl=sslc, proxy=self.proxy) as session:
       -            asyncio_transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
       -            ssl_object = asyncio_transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
       -            return ssl_object.getpeercert(binary_form=True)
       -
       -    def _get_expected_fingerprint(self) -> Optional[str]:
       -        if self.is_main_server():
       -            return self.network.config.get("serverfingerprint")
       -
       -    def _verify_certificate_fingerprint(self, certificate):
       -        expected_fingerprint = self._get_expected_fingerprint()
       -        if not expected_fingerprint:
       -            return
       -        fingerprint = hashlib.sha256(certificate).hexdigest()
       -        fingerprints_match = fingerprint.lower() == expected_fingerprint.lower()
       -        if not fingerprints_match:
       -            util.trigger_callback('cert_mismatch')
       -            raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
       -        self.logger.info("cert fingerprint verification passed")
       -
            async def get_block_header(self, height, assert_mode):
       +        __(f"Interface: get_block_header: {height}")
                self.logger.info(f'requesting block header {height} in mode {assert_mode}')
                # use lower timeout as we usually have network.bhi_lock here
                timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
       -        res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
       -        return blockchain.deserialize_header(bytes.fromhex(res), height)
       +        # ORIG: res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
       +        _ec, res = await self.client.block_header(height)
       +        if _ec is not None and _ec != 0:
       +            raise RequestCorrupted(f'got error {_ec}')
       +        #return blockchain.deserialize_header(bytes.fromhex(res), height)
       +        return blockchain.deserialize_header(res, height)
        
            async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
       +        __("Interface: request_chunk")
                if not is_non_negative_integer(height):
                    raise Exception(f"{repr(height)} is not a block height")
                index = height // 2016
       t@@ -603,9 +371,23 @@ class Interface(Logger):
                    size = max(size, 0)
                try:
                    self._requested_chunks.add(index)
       -            res = await self.session.send_request('blockchain.block.headers', [index * 2016, size])
       +            #ORIG: res = await self.session.send_request('blockchain.block.headers', [index * 2016, size])
       +            concat = bytearray()
       +            for i in range(size):
       +                _ec, data = await self.client.block_header(index*2016+i)
       +                if _ec is not None and _ec != 0:
       +                    # TODO: Don't imply error means we reached tip
       +                    break
       +                concat.extend(data)
                finally:
                    self._requested_chunks.discard(index)
       +        # TODO: max in case of libbitcoin is unnecessary
       +        res = {
       +            'hex': str(hexlify(concat), 'utf-8'),
       +            'count': len(concat)//80,
       +            'max': 2016,
       +        }
       +        # TODO: cleanup
                assert_dict_contains_field(res, field_name='count')
                assert_dict_contains_field(res, field_name='hex')
                assert_dict_contains_field(res, field_name='max')
       t@@ -625,58 +407,30 @@ class Interface(Logger):
                return conn, res['count']
        
            def is_main_server(self) -> bool:
       +        # __("Interface: is_main_server")
                return (self.network.interface == self or
                        self.network.interface is None and self.network.default_server == self.server)
        
       -    async def open_session(self, sslc, exit_early=False):
       -        session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
       -        async with _RSClient(session_factory=session_factory,
       -                             host=self.host, port=self.port,
       -                             ssl=sslc, proxy=self.proxy) as session:
       -            self.session = session  # type: NotificationSession
       -            self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
       -            try:
       -                ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION])
       -            except aiorpcx.jsonrpc.RPCError as e:
       -                raise GracefulDisconnect(e)  # probably 'unsupported protocol version'
       -            if exit_early:
       -                return
       -            if ver[1] != version.PROTOCOL_VERSION:
       -                raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
       -                                         f'we asked for {version.PROTOCOL_VERSION!r}, they sent {ver[1]!r}')
       -            if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
       -                raise GracefulDisconnect(f'too many connected servers already '
       -                                         f'in bucket {self.bucket_based_on_ipaddress()}')
       -            self.logger.info(f"connection established. version: {ver}")
       -
       -            try:
       -                async with self.taskgroup as group:
       -                    await group.spawn(self.ping)
       -                    await group.spawn(self.request_fee_estimates)
       -                    await group.spawn(self.run_fetch_blocks)
       -                    await group.spawn(self.monitor_connection)
       -            except aiorpcx.jsonrpc.RPCError as e:
       -                if e.code in (JSONRPC.EXCESSIVE_RESOURCE_USAGE,
       -                              JSONRPC.SERVER_BUSY,
       -                              JSONRPC.METHOD_NOT_FOUND):
       -                    raise GracefulDisconnect(e, log_level=logging.WARNING) from e
       -                raise
       -
            async def monitor_connection(self):
       +        __("Interface: monitor_connection")
                while True:
                    await asyncio.sleep(1)
       -            if not self.session or self.session.is_closing():
       +            if not self.client:
       +                # TODO: libbitcoin ^ Implement is_closing() in zeromq.Client and check ^
                        raise GracefulDisconnect('session was closed')
        
            async def ping(self):
       +        __("Interface: ping")
                while True:
                    await asyncio.sleep(300)
       -            await self.session.send_request('server.ping')
       +            __("Interface: ping loop iteration")
       +            # TODO: libbitcoin bs heartbeat service here?
        
            async def request_fee_estimates(self):
       +        __("Interface: request_fee_estimates")
                from .simple_config import FEE_ETA_TARGETS
                while True:
       -            async with TaskGroup() as group:
       +            async with SilentTaskGroup() as group:
                        fee_tasks = []
                        for i in FEE_ETA_TARGETS:
                            fee_tasks.append((i, await group.spawn(self.get_estimatefee(i))))
       t@@ -688,26 +442,24 @@ class Interface(Logger):
                    await asyncio.sleep(60)
        
            async def close(self, *, force_after: int = None):
       -        """Closes the connection and waits for it to be closed.
       -        We try to flush buffered data to the wire, so this can take some time.
       -        """
       -        if force_after is None:
       -            # We give up after a while and just abort the connection.
       -            # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
       -            #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
       -            force_after = 1  # seconds
       +        __("Interface: close")
       +        # TODO: libbitcoin
                if self.session:
       -            await self.session.close(force_after=force_after)
       -        # monitor_connection will cancel tasks
       +            await self.session.stop()
       +        if self.client:
       +            await self.client.stop()
        
            async def run_fetch_blocks(self):
       +        __("Interface: run_fetch_blocks")
                header_queue = asyncio.Queue()
       -        await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
       +        # ORIG: await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
       +        await self.client.subscribe_to_blocks(header_queue)
                while True:
                    item = await header_queue.get()
       -            raw_header = item[0]
       -            height = raw_header['height']
       -            header = blockchain.deserialize_header(bfh(raw_header['hex']), height)
       +            # TODO: block to header
       +            header = item[2]
       +            height = item[1]
       +            header = blockchain.deserialize_header(header, height)
                    self.tip_header = header
                    self.tip = height
                    if self.tip < constants.net.max_checkpoint():
       t@@ -721,6 +473,7 @@ class Interface(Logger):
                    await self.network.switch_lagging_interface()
        
            async def _process_header_at_tip(self):
       +        __("Interface: _process_header_at_tip")
                height, header = self.tip, self.tip_header
                async with self.network.bhi_lock:
                    if self.blockchain.height() >= height and self.blockchain.check_header(header):
       t@@ -733,6 +486,7 @@ class Interface(Logger):
                        await self.sync_until(height)
        
            async def sync_until(self, height, next_height=None):
       +        __("Interface: sync_until")
                if next_height is None:
                    next_height = self.tip
                last = None
       t@@ -755,6 +509,7 @@ class Interface(Logger):
                return last, height
        
            async def step(self, height, header=None):
       +        __("Interface: step")
                assert 0 <= height <= self.tip, (height, self.tip)
                if header is None:
                    header = await self.get_block_header(height, 'catchup')
       t@@ -787,6 +542,7 @@ class Interface(Logger):
                return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)
        
            async def _search_headers_binary(self, height, bad, bad_header, chain):
       +        __("Interface: _search_headers_binary")
                assert bad == bad_header['block_height']
                _assert_header_does_not_check_against_any_chain(bad_header)
        
       t@@ -817,6 +573,7 @@ class Interface(Logger):
                return good, bad, bad_header
        
            async def _resolve_potential_chain_fork_given_forkpoint(self, good, bad, bad_header):
       +        __("Interface: _resolve_potential_chain_fork_given_forkpoint")
                assert good + 1 == bad
                assert bad == bad_header['block_height']
                _assert_header_does_not_check_against_any_chain(bad_header)
       t@@ -840,6 +597,7 @@ class Interface(Logger):
                return 'fork', height
        
            async def _search_headers_backwards(self, height, header):
       +        __("Interface: _search_headers_backwards")
                async def iterate():
                    nonlocal height, header
                    checkp = False
       t@@ -871,24 +629,31 @@ class Interface(Logger):
        
            @classmethod
            def client_name(cls) -> str:
       +        __("Interface: client_name")
                return f'electrum/{version.ELECTRUM_VERSION}'
        
            def is_tor(self):
       +        __("Interface: is_tor")
                return self.host.endswith('.onion')
        
            def ip_addr(self) -> Optional[str]:
       -        session = self.session
       -        if not session: return None
       -        peer_addr = session.remote_address()
       -        if not peer_addr: return None
       -        return str(peer_addr.host)
       +        __("Interface: ip_addr")
       +        return None
       +        # TODO: libbitcoin
       +        # This seems always None upstream since remote_address does not exist?
       +        # session = self.session
       +        # if not session: return None
       +        # peer_addr = session.remote_address()
       +        # if not peer_addr: return None
       +        # return str(peer_addr.host)
        
            def bucket_based_on_ipaddress(self) -> str:
       +        __("Interface: bucket_based_on_ipaddress")
                def do_bucket():
                    if self.is_tor():
                        return BUCKET_NAME_OF_ONION_SERVERS
                    try:
       -                ip_addr = ip_address(self.ip_addr())  # type: Union[IPv4Address, IPv6Address]
       +                ip_addr = ip_address(self.ip_addr())  # type: Union[IPv5Address, IPv6Address]
                    except ValueError:
                        return ''
                    if not ip_addr:
       t@@ -908,13 +673,20 @@ class Interface(Logger):
                return self._ipaddr_bucket
        
            async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
       +        __("Interface: get_merkle_for_transaction")
                if not is_hash256_str(tx_hash):
                    raise Exception(f"{repr(tx_hash)} is not a txid")
                if not is_non_negative_integer(tx_height):
                    raise Exception(f"{repr(tx_height)} is not a block height")
                # do request
       -        res = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
       -        # check response
       +        # ORIG: res = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
       +        # TODO: Rework to use txid rather than height with libbitcoin?
       +        _ec, hashes = await self.client.block_transaction_hashes(tx_height)
       +        if _ec is not None and _ec != 0:
       +            raise RequestCorrupted(f'got error {_ec}')
       +        tx_pos = hashes.index(unhexlify(tx_hash)[::-1])
       +        branch = merkle_branch(hashes, tx_pos)
       +        res = {'block_height': tx_height, 'merkle': branch, 'pos': tx_pos}
                block_height = assert_dict_contains_field(res, field_name='block_height')
                merkle = assert_dict_contains_field(res, field_name='merkle')
                pos = assert_dict_contains_field(res, field_name='pos')
       t@@ -927,9 +699,12 @@ class Interface(Logger):
                return res
        
            async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
       +        __("Interface: get_transaction")
                if not is_hash256_str(tx_hash):
                    raise Exception(f"{repr(tx_hash)} is not a txid")
       -        raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
       +        # ORIG: raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
       +        raw = self.client.mempool_transaction(tx_hash)
       +        # raw = self.client.transaction(tx_hash)
                # validate response
                if not is_hex_str(raw):
                    raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
       t@@ -942,11 +717,18 @@ class Interface(Logger):
                    raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {tx.txid()})")
                return raw
        
       +
            async def get_history_for_scripthash(self, sh: str) -> List[dict]:
       +        __(f"Interface: get_history_for_scripthash {sh}")
                if not is_hash256_str(sh):
                    raise Exception(f"{repr(sh)} is not a scripthash")
                # do request
       -        res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
       +        # ORIG: res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
       +        _ec, history = await self.client.history4(sh)
       +        if _ec is not None and _ec != 0:
       +            raise RequestCorrupted('got error %d' % _ec)
       +        __("Interface: get_history_for_scripthash: got history: %s" % (history))
       +        res = {}
                # check response
                assert_list_or_tuple(res)
                prev_height = 1
       t@@ -970,13 +752,20 @@ class Interface(Logger):
                    # a recently mined tx could be included in both last block and mempool?
                    # Still, it's simplest to just disregard the response.
                    raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
       +
                return res
        
            async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
       +        __(f"Interface: listunspent_for_scripthash {sh}")
                if not is_hash256_str(sh):
                    raise Exception(f"{repr(sh)} is not a scripthash")
                # do request
       -        res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
       +        # ORIG: res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
       +        _ec, unspent = await self.client.unspent(sh)
       +        if _ec is not None and _ec != 0:
       +            raise RequestCorrupted('got error %d' % _ec)
       +        __("Interface: listunspent_for_scripthash: got unspent: %s" % unspent)
       +        res = {}
                # check response
                assert_list_or_tuple(res)
                for utxo_item in res:
       t@@ -991,10 +780,17 @@ class Interface(Logger):
                return res
        
            async def get_balance_for_scripthash(self, sh: str) -> dict:
       +        __(f"Interface: get_balance_for_scripthash {sh}")
                if not is_hash256_str(sh):
                    raise Exception(f"{repr(sh)} is not a scripthash")
                # do request
       -        res = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
       +        # ORIG: res = await self.sessions.send_request('blockchains.scripthash.get_balance', [sh])
       +        _ec, balance = await self.client.balance(sh)
       +        if _ec is not None and _ec != 0:
       +            raise RequestCorrupted('got error %d' % _ec)
       +        __("Interface: get_balance_for_scripthash: got balance: %s" % balance)
       +        # TODO: libbitcoin
       +        res = {}
                # check response
                assert_dict_contains_field(res, field_name='confirmed')
                assert_dict_contains_field(res, field_name='unconfirmed')
       t@@ -1003,30 +799,40 @@ class Interface(Logger):
                return res
        
            async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
       +        __("Interface: get_txid_from_txpos")
                if not is_non_negative_integer(tx_height):
                    raise Exception(f"{repr(tx_height)} is not a block height")
                if not is_non_negative_integer(tx_pos):
                    raise Exception(f"{repr(tx_pos)} should be non-negative integer")
                # do request
       -        res = await self.session.send_request(
       -            'blockchain.transaction.id_from_pos',
       -            [tx_height, tx_pos, merkle],
       -        )
       +        # ORIG: res = await self.session.send_request(
       +            # 'blockchain.transaction.id_from_pos',
       +            # [tx_height, tx_pos, merkle],
       +        # )
       +        _ec, hashes = await self.client.block_transaction_hashes(tx_height)
       +        if _ec is not None and _ec != 0:
       +            raise RequestCorrupted('got error %d' % _ec)
       +        txid = hexlify(hashes[tx_pos][::-1])
                # check response
       -        if merkle:
       -            assert_dict_contains_field(res, field_name='tx_hash')
       -            assert_dict_contains_field(res, field_name='merkle')
       -            assert_hash256_str(res['tx_hash'])
       -            assert_list_or_tuple(res['merkle'])
       -            for node_hash in res['merkle']:
       -                assert_hash256_str(node_hash)
       -        else:
       -            assert_hash256_str(res)
       +        if not merkle:
       +            assert_hash256_str(txid)
       +            return txid
       +        branch = merkle_branch(hashes, tx_pos)
       +        res = {'tx_hash': txid, 'merkle': branch}
       +        assert_dict_contains_field(res, field_name='tx_hash')
       +        assert_dict_contains_field(res, field_name='merkle')
       +        assert_hash256_str(res['tx_hash'])
       +        assert_list_or_tuple(res['merkle'])
       +        for node_hash in res['merkle']:
       +            assert_hash256_str(node_hash)
                return res
        
            async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
       +        __("Interface: get_fee_histogram")
                # do request
       -        res = await self.session.send_request('mempool.get_fee_histogram')
       +        # ORIG: res = await self.session.send_request('mempool.get_fee_histogram')
       +        # TODO: libbitcoin
       +        res = [[0, 0]]
                # check response
                assert_list_or_tuple(res)
                prev_fee = float('inf')
       t@@ -1039,16 +845,22 @@ class Interface(Logger):
                return res
        
            async def get_server_banner(self) -> str:
       +        __("Interface: get_server_banner")
                # do request
       -        res = await self.session.send_request('server.banner')
       +        # ORIG: res = await self.session.send_request('server.banner')
       +        # TODO: libbitcoin
       +        res = 'libbitcoin'
                # check response
                if not isinstance(res, str):
                    raise RequestCorrupted(f'{res!r} should be a str')
                return res
        
            async def get_donation_address(self) -> str:
       +        __("Interface: get_donation_address")
                # do request
       -        res = await self.session.send_request('server.donation_address')
       +        # ORIG: res = await self.session.send_request('server.donation_address')
       +        # TODO: libbitcoin
       +        res = None
                # check response
                if not res:  # ignore empty string
                    return ''
       t@@ -1061,8 +873,11 @@ class Interface(Logger):
        
            async def get_relay_fee(self) -> int:
                """Returns the min relay feerate in sat/kbyte."""
       +        __("Interface: get_relay_fee")
                # do request
       -        res = await self.session.send_request('blockchain.relayfee')
       +        # ORIG: res = await self.session.send_request('blockchain.relayfee')
       +        # TODO: libbitcoin
       +        res = 0.00001
                # check response
                assert_non_negative_int_or_float(res)
                relayfee = int(res * bitcoin.COIN)
       t@@ -1070,13 +885,16 @@ class Interface(Logger):
                return relayfee
        
            async def get_estimatefee(self, num_blocks: int) -> int:
       -        """Returns a feerate estimate for getting confirmed within
       +        """Returns a feerate estimtte for getting confirmed within
                num_blocks blocks, in sat/kbyte.
                """
       +        __("Interface: get_estimatefee")
                if not is_non_negative_integer(num_blocks):
                    raise Exception(f"{repr(num_blocks)} is not a num_blocks")
                # do request
       -        res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
       +        # ORIG: res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
       +        # TODO: libbitcoin
       +        res = -1
                # check response
                if res != -1:
                    assert_non_negative_int_or_float(res)
       t@@ -1085,48 +903,7 @@ class Interface(Logger):
        
        
        def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
       +    __("Interface: _assert_header_does_not_check_against_any_chain")
            chain_bad = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
            if chain_bad:
                raise Exception('bad_header must not check!')
       -
       -
       -def check_cert(host, cert):
       -    try:
       -        b = pem.dePem(cert, 'CERTIFICATE')
       -        x = x509.X509(b)
       -    except:
       -        traceback.print_exc(file=sys.stdout)
       -        return
       -
       -    try:
       -        x.check_date()
       -        expired = False
       -    except:
       -        expired = True
       -
       -    m = "host: %s\n"%host
       -    m += "has_expired: %s\n"% expired
       -    util.print_msg(m)
       -
       -
       -# Used by tests
       -def _match_hostname(name, val):
       -    if val == name:
       -        return True
       -
       -    return val.startswith('*.') and name.endswith(val[1:])
       -
       -
       -def test_certificates():
       -    from .simple_config import SimpleConfig
       -    config = SimpleConfig()
       -    mydir = os.path.join(config.path, "certs")
       -    certs = os.listdir(mydir)
       -    for c in certs:
       -        p = os.path.join(mydir,c)
       -        with open(p, encoding='utf-8') as f:
       -            cert = f.read()
       -        check_cert(c, cert)
       -
       -if __name__ == "__main__":
       -    test_certificates()
   DIR diff --git a/electrum/libbitcoin_errors.py b/electrum/libbitcoin_errors.py
       t@@ -0,0 +1,72 @@
       +import enum
       +
       +
       +def make_error_code(ec):
       +    if not ec:
       +        return None
       +    return ErrorCode(ec)
       +
       +
       +class ErrorCode(enum.Enum):
       +
       +    nothing = 0
       +
       +    service_stopped = 1
       +    operation_failed = 2
       +
       +    # blockchain errors
       +    not_found = 3
       +    duplicate = 4
       +    unspent_output = 5
       +    unsupported_payment_type = 6
       +
       +    # network errors
       +    resolve_failed = 7
       +    network_unreachable = 8
       +    address_in_use = 9
       +    listen_failed = 10
       +    accept_failed = 11
       +    bad_stream = 12
       +    channel_timeout = 13
       +
       +    # transaction pool
       +    blockchain_reorganized = 14
       +    pool_filled = 15
       +
       +    # validate tx
       +    coinbase_transaction = 16
       +    is_not_standard = 17
       +    double_spend = 18
       +    input_not_found = 19
       +
       +    # check_transaction()
       +    empty_transaction = 20
       +    output_value_overflow = 21
       +    invalid_coinbase_script_size = 22
       +    previous_output_null = 23
       +
       +    # validate block
       +    previous_block_invalid = 24
       +
       +    # check_block()
       +    size_limits = 25
       +    proof_of_work = 26
       +    futuristic_timestamp = 27
       +    first_not_coinbase = 28
       +    extra_coinbases = 29
       +    too_many_sigs = 30
       +    merkle_mismatch = 31
       +
       +    # accept_block()
       +    incorrect_proof_of_work = 32
       +    timestamp_too_early = 33
       +    non_final_transaction = 34
       +    checkpoints_failed = 35
       +    old_version_block = 36
       +    coinbase_height_mismatch = 37
       +
       +    # connect_block()
       +    duplicate_or_spent = 38
       +    validate_inputs_failed = 39
       +    fees_out_of_range = 40
       +    coinbase_too_large = 41
   DIR diff --git a/electrum/merkle.py b/electrum/merkle.py
       t@@ -0,0 +1,62 @@
       +#!/usr/bin/env python
       +#
       +# Electrum -lightweight Bitcoin client
       +# Copyright (C) 2021 Ivan J. <parazyd@dyne.org>
       +#
       +# Permission is hereby granted, free of charge, to any person
       +# obtaining a copy of this software and associated documentation files
       +# (the "Software"), to deal in the Software without restriction,
       +# including without limitation the rights to use, copy, modify, merge,
       +# publish, distribute, sublicense, and/or sell copies of the Software,
       +# and to permit persons to whom the Software is furnished to do so,
       +# subject to the following conditions:
       +#
       +# The above copyright notice and this permission notice shall be
       +# included in all copies or substantial portions of the Software.
       +#
       +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
       +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
       +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
       +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
       +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
       +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
       +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
       +# SOFTWARE.
       +"""Module for calculating merkle branches"""
       +from math import ceil, log
       +
       +from .crypto import sha256d
       +
       +
       +def branch_length(hash_count):
       +    """Return the length of a merkle branch given the number of hashes"""
       +    return ceil(log(hash_count, 2))
       +
       +
       +def merkle_branch_and_root(hashes, index):
       +    """Return a (merkle branch, merkle_root) pair given hashes, and the
       +    index of one of those hashes.
       +    """
       +    hashes = list(hashes)
       +    if not isinstance(index, int):
       +        raise TypeError('index must be an integer')
       +    # This also asserts hashes is not empty
       +    if not 0 <= index < len(hashes):
       +        raise ValueError('index out of range')
       +    length = branch_length(len(hashes))
       +
       +    branch = []
       +    for _ in range(length):
       +        if len(hashes) & 1:
       +            hashes.append(hashes[-1])
       +        branch.append(hashes[index ^ 1])
       +        index >>= 1
       +        hashes = [sha256d(hashes[n] + hashes[n+1])
       +                  for n in range(0, len(hashes), 2)]
       +    return branch, hashes[0]
       +
       +def merkle_branch(tx_hashes, tx_pos):
       +    """Return a merkle branch given hashes and the tx position"""
       +    branch, _root = merkle_branch_and_root(tx_hashes, tx_pos)
       +    branch = [bytes(reversed(h)).hex() for h in branch]
       +    return branch
   DIR diff --git a/electrum/network.py b/electrum/network.py
       t@@ -449,7 +449,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
        
            async def _request_server_info(self, interface: 'Interface'):
                await interface.ready
       -        session = interface.session
       +        # TODO: libbitcoin: session = interface.session
        
                async def get_banner():
                    self.banner = await interface.get_server_banner()
       t@@ -457,7 +457,9 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
                async def get_donation_address():
                    self.donation_address = await interface.get_donation_address()
                async def get_server_peers():
       -            server_peers = await session.send_request('server.peers.subscribe')
       +            # ORIG: server_peers = await session.send_request('server.peers.subscribe')
       +            # TODO: libbitcoin
       +            server_peers = []
                    random.shuffle(server_peers)
                    max_accepted_peers = len(constants.net.DEFAULT_SERVERS) + NUM_RECENT_SERVERS
                    server_peers = server_peers[:max_accepted_peers]
       t@@ -880,6 +882,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
                if timeout is None:
                    timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
                try:
       +            # TODO: libbitcoin
                    out = await self.interface.session.send_request('blockchain.transaction.broadcast', [tx.serialize()], timeout=timeout)
                    # note: both 'out' and exception messages are untrusted input from the server
                except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
       t@@ -1329,6 +1332,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
                while not self.is_connected():
                    await asyncio.sleep(1)
                session = self.interface.session
       +        # TODO: libbitcoin
                return parse_servers(await session.send_request('server.peers.subscribe'))
        
            async def send_multiple_requests(self, servers: Sequence[ServerAddr], method: str, params: Sequence):
       t@@ -1342,6 +1346,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
                        await interface.close()
                        return
                    try:
       +                # TODO: libbitcoin
                        res = await interface.session.send_request(method, params, timeout=10)
                    except Exception as e:
                        res = e
   DIR diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py
       t@@ -83,7 +83,8 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
                        await group.spawn(self.main())
                finally:
                    # we are being cancelled now
       -            self.session.unsubscribe(self.status_queue)
       +            # TODO: libbitcoin
       +            print("self.session.unsubscribe(self.status_queue)")
        
            def _reset_request_counters(self):
                self._requests_sent = 0
       t@@ -110,7 +111,8 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
                    self._requests_sent += 1
                    try:
                        async with self._network_request_semaphore:
       -                    await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
       +                    # TODO: libbitcoin
       +                    print("await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)")
                    except RPCError as e:
                        if e.message == 'history too large':  # no unique error code
                            raise GracefulDisconnect(e, log_level=logging.ERROR) from e
   DIR diff --git a/electrum/util.py b/electrum/util.py
       t@@ -1225,7 +1225,9 @@ class NetworkJobOnDefaultServer(Logger, ABC):
        
            @property
            def session(self):
       -        s = self.interface.session
       +        # ORIG: s = self.interface.session
       +        # TODO: libbitcoin
       +        s = self.interface.client
                assert s is not None
                return s
        
       t@@ -1525,6 +1527,7 @@ class JsonRPCClient:
                self._id = 0
        
            async def request(self, endpoint, *args):
       +        # TODO: libbitcoin
                self._id += 1
                data = ('{"jsonrpc": "2.0", "id":"%d", "method": "%s", "params": %s }'
                        % (self._id, endpoint, json.dumps(args)))
   DIR diff --git a/electrum/zeromq.py b/electrum/zeromq.py
       t@@ -0,0 +1,493 @@
       +#!/usr/bin/env python
       +#
       +# Electrum - lightweight Bitcoin client
       +# Copyright (C) 2011 thomasv@gitorious
       +# Copyright (C) 2021 Ivan J. <parazyd@dyne.org>
       +# Copyright (C) 2018 Harm Aarts <harmaarts@gmail.com>
       +#
       +# Permission is hereby granted, free of charge, to any person
       +# obtaining a copy of this software and associated documentation files
       +# (the "Software"), to deal in the Software without restriction,
       +# including without limitation the rights to use, copy, modify, merge,
       +# publish, distribute, sublicense, and/or sell copies of the Software,
       +# and to permit persons to whom the Software is furnished to do so,
       +# subject to the following conditions:
       +#
       +# The above copyright notice and this permission notice shall be
       +# included in all copies or substantial portions of the Software.
       +#
       +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
       +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
       +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
       +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
       +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
       +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
       +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
       +# SOFTWARE.
       +import asyncio
       +import logging
       +import functools
       +import hashlib
       +import struct
       +from random import randint
       +from binascii import hexlify, unhexlify
       +
       +import zmq
       +import zmq.asyncio
       +
       +from .logging import Logger
       +from .libbitcoin_errors import make_error_code, ErrorCode
       +
       +from datetime import datetime
       +def __(msg):
       +    print("***********************")
       +    print("*** DEBUG %s ***: %s" % (datetime.now().strftime("%H:%M:%S"), msg))
       +
       +
       +def create_random_id():
       +    """Generate a random request ID"""
       +    max_uint32 = 4294967295
       +    return randint(0, max_uint32)
       +
       +
       +def checksum(hash_, index):
       +    """This method takes a transaction hash and an index and returns
       +    a checksum.
       +    This checksum is based on 49 bits starting from the 12th byte of the
       +    reversed hash. Combined with the last 15 bits of the 4 byte index.
       +    """
       +    mask = 0xffffffffffff8000
       +    magic_start_position = 12
       +
       +    hash_bytes = bytes.fromhex(hash_)[::-1]
       +    last_20_bytes = hash_bytes[magic_start_position:]
       +
       +    assert len(hash_bytes) == 32
       +    assert index < 2**32
       +
       +    hash_upper_49_bits = to_int(last_20_bytes) & mask
       +    index_lower_15_bits = index & ~mask
       +
       +    return hash_upper_49_bits | index_lower_15_bits
       +
       +
       +def to_int(some_bytes):
       +    return int.from_bytes(some_bytes, byteorder='little')
       +
       +
       +def pack_block_index(index):
       +    if isinstance(index, str):
       +        index = unhexlify(index)
       +        assert len(index) == 32
       +        return index
       +    elif isinstance(index, int):
       +        return struct.pack('<I', index)
       +    else:
       +        raise ValueError(f"Unknown index type {type(index)} v:{index}, should be int or bytearray")
       +
       +
       +def unpack_table(row_fmt, data):
       +    # get the number of rows
       +    row_size = struct.calcsize(row_fmt)
       +    nrows = len(data) // row_size
       +
       +    # unpack
       +    rows = []
       +    for idx in range(nrows):
       +        offset = idx * row_size
       +        row = struct.unpack_from(row_fmt, data, offset)
       +        rows.append(row)
       +    return rows
       +
       +
       +class ClientSettings:
       +    """Class implementing client settings"""
       +    def __init__(self, timeout=10, context=None, loop=None):
       +        __("Zeromq ClientSettings: __init__")
       +        self._timeout = timeout
       +        self._context = context
       +        self._loop = loop
       +
       +    @property
       +    def context(self):
       +        """zmq context property"""
       +        if not self._context:
       +            ctx = zmq.asyncio.Context()
       +            ctx.linger = 500  # in milliseconds
       +            self._context = ctx
       +        return self._context
       +
       +    @context.setter
       +    def context(self, context):
       +        self._context = context
       +
       +    @property
       +    def timeout(self):
       +        """Set to None for no timeout"""
       +        return self._timeout
       +
       +    @timeout.setter
       +    def timeout(self, timeout):
       +        self._timeout = timeout
       +
       +
       +class Request:
       +    """Class implementing a _send_ request.
       +    This is either a simple request/response affair or a subscription.
       +    """
       +    def __init__(self, socket, command, data):
       +        __("Zeromq Request: __init__")
       +        self.id_ = create_random_id()
       +        self.socket = socket
       +        self.command = command
       +        self.data = data
       +        self.future = asyncio.Future()
       +        self.queue = None
       +
       +    async def send(self):
       +        """Send the zmq request"""
       +        __(f"Zeromq Request: send: {self.command}, {self.data}")
       +        request = [self.command, struct.pack('<I', self.id_), self.data]
       +        await self.socket.send_multipart(request)
       +
       +    def is_subscription(self):
       +        """If the request is a subscription, then the response to this
       +        request is a notification.
       +        """
       +        return self.queue is not None
       +
       +    def __str__(self):
       +        return 'Request(command, ID) {}, {:d}'.format(self.command,
       +                                                      self.id_)
       +
       +
       +class InvalidServerResponseException(Exception): pass
       +
       +
       +class Response:
       +    """Class implementing a request response"""
       +    def __init__(self, frame):
       +        __("Zeromq Response: __init__")
       +        if len(frame) != 3:
       +            raise InvalidServerResponseException(
       +                'Length of the frame was not 3: %d' % len(frame))
       +
       +        self.command = frame[0]
       +        self.request_id = struct.unpack('<I', frame[1])[0]
       +        error_code = struct.unpack('<I', frame[2][:4])[0]
       +        self.error_code = make_error_code(error_code)
       +        self.data = frame[2][4:]
       +
       +    def is_bound_for_queue(self):
       +        return len(self.data) > 0
       +
       +    def __str__(self):
       +        return 'Response(command, request ID, error code, data):' \
       +            + ' %s, %d, %s, %s' \
       +            % (self.command, self.request_id, self.error_code, self.data)
       +
       +
       +class RequestCollection:
       +    """RequestCollection carries a list of Requests and matches incoming
       +    responses to them.
       +    """
       +    def __init__(self, socket, loop):
       +        __("Zeromq RequestCollection: __init__")
       +        self._socket = socket
       +        self._requests = {}
       +        self._task = asyncio.ensure_future(self._run(), loop=loop)
       +
       +    async def _run(self):
       +        while True:
       +            __("Zeromq RequestCollection: _run loop iteration")
       +            await self._receive()
       +
       +    async def stop(self):
       +        """Stops listening for incoming responses (or subscription) messages).
       +        Returns the number of _responses_ expected but which are now dropped
       +        on the floor.
       +        """
       +        __("Zeromq RequestCollection: stop")
       +        self._task.cancel()
       +        try:
       +            await self._task
       +        except asyncio.CancelledError:
       +            return len(self._requests)
       +
       +    async def _receive(self):
       +        __("Zeromq RequestCollection: receive")
       +        frame = await self._socket.recv_multipart()
       +        response = Response(frame)
       +
       +        if response.request_id in self._requests:
       +            self._handle_response(response)
       +        else:
       +            __("Zeromq RequestCollection: receive: unhandled response %s:%s" % (response.command, response.request_id))
       +
       +    def _handle_response(self, response):
       +        __("Zeromq RequestCollection: _handle_response")
       +        request = self._requests[response.request_id]
       +
       +        if request.is_subscription():
       +            if response.is_bound_for_queue():
       +                # TODO: decode the data into something usable
       +                request.queue.put_nowait(response.data)
       +            else:
       +                request.future.set_result(response)
       +        else:
       +            self.delete_request(request)
       +            request.future.set_result(response)
       +
       +    def add_request(self, request):
       +        __("Zeromq RequestCollection: add_request")
       +        # TODO: we should maybe check if the request.id_ is unique
       +        self._requests[request.id_] = request
       +
       +    def delete_request(self, request):
       +        __("Zeromq RequestCollection: delete_request")
       +        del self._requests[request.id_]
       +
       +
       +class Client:
       +    """This class represents a connection to a libbitcoin server.
       +    hostname -- the server DNS name to connect to.
       +    ports -- a dictionary containing four keys; query/heartbeat/block/tx
       +    """
       +    # def __init__(self, hostname, ports, settings=ClientSettings()):
       +    def __init__(self, hostname, ports, loop):
       +        __("Zeromq Client: __init__")
       +        self._hostname = hostname
       +        self._ports = ports
       +        # self._settings = settings
       +        self._settings = ClientSettings(loop=loop)
       +        self._query_socket = self._create_query_socket()
       +        self._block_socket = self._create_block_socket()
       +        self._request_collection = RequestCollection(
       +            self._query_socket, self._settings._loop)
       +
       +    async def stop(self):
       +        __("Zeromq Client: stop")
       +        self._query_socket.close()
       +        self._block_socket.close()
       +        return await self._request_collection.stop()
       +
       +    def _create_block_socket(self):
       +        __("Zeromq Client: _create_block_socket")
       +        socket = self._settings.context.socket(
       +            zmq.SUB, io_loop=self._settings._loop)  # pylint: disable=E1101
       +        socket.connect(self.__server_url(self._hostname,
       +                                         self._ports['block']))
       +        socket.setsockopt_string(zmq.SUBSCRIBE, '')  # pylint: disable=E1101
       +        return socket
       +
       +    def _create_query_socket(self):
       +        __("Zeromq Client: _create_query_socket")
       +        socket = self._settings.context.socket(
       +            zmq.DEALER, io_loop=self._settings._loop)  # pylint: disable=E1101
       +        socket.connect(self.__server_url(self._hostname,
       +                                         self._ports['query']))
       +        return socket
       +
       +    async def _subscription_request(self, command, data):
       +        __("Zeromq Client: _subscription_request")
       +        request = await self._request(command, data)
       +        request.queue = asyncio.Queue(loop=self._settings._loop)
       +        error_code, _ = await self._wait_for_response(request)
       +        return error_code, request.queue
       +
       +    async def _simple_request(self, command, data):
       +        __("Zeromq Client: _simple_request")
       +        return await self._wait_for_response(
       +            await self._request(command, data))
       +
       +    async def _request(self, command, data):
       +        """Make a generic request. Both options are byte objects
       +        specified like b'blockchain.fetch_block_header' as an example.
       +        """
       +        __("Zeromq Client: _request")
       +        request = Request(self._query_socket, command, data)
       +        await request.send()
       +        self._request_collection.add_request(request)
       +        return request
       +
       +    async def _wait_for_response(self, request):
       +        __("Zeromq Client: _wait_for_response")
       +        try:
       +            response = await asyncio.wait_for(request.future,
       +                                              self._settings.timeout)
       +        except asyncio.TimeoutError:
       +            self._request_collection.delete_request(request)
       +            return ErrorCode.channel_timeout, None
       +
       +        assert response.command == request.command
       +        assert response.request_id == request.id_
       +        return response.error_code, response.data
       +
       +    @staticmethod
       +    def __server_url(hostname, port):
       +        return 'tcp://' + hostname + ':' + str(port)
       +
       +    async def last_height(self):
       +        __("Zeromq Client: last_height")
       +        command = b'blockchain.fetch_last_height'
       +        error_code, data = await self._simple_request(command, b'')
       +        if error_code:
       +            return error_code, None
       +        height = struct.unpack('<I', data)[0]
       +        return error_code, height
       +
       +    async def subscribe_to_blocks(self, queue):
       +        __("Zeromq Client: subscribe_to_blocks")
       +        asyncio.ensure_future(self._listen_for_blocks(queue))
       +        return queue
       +
       +    async def _listen_for_blocks(self, queue):
       +        __("Zeromq Client: _listen_for_blocks")
       +        _ec, tip = await self.last_height()
       +        _, header = await self.block_header(tip)
       +        queue.put_nowait((0, tip, header))
       +        while True:
       +            __("Zeromq Client: _listen_for_blocks loop iteration")
       +            frame = await self._block_socket.recv_multipart()
       +            seq = struct.unpack('<H', frame[0])[0]
       +            height = struct.unpack('<I', frame[1])[0]
       +            block_data = frame[2]
       +            block_header = block_data[:80]
       +            # block_header = raw[:80]
       +            # version = block_header[:4]
       +            # prev_merkle_root = block_header[4:36]
       +            # merkle_root = block_header[36:68]
       +            # timestamp = block_header[68:72]
       +            # bits = block_header[72:76]
       +            # nonce = blockheader[76:80]
       +            queue.put_nowait((seq, height, block_header))
       +
       +    async def block_header(self, index):
       +        """Fetches the block header by height or integer index"""
       +        __("Zeromq Client: block_header")
       +        command = b'blockchain.fetch_block_header'
       +        data = pack_block_index(index)
       +        error_code, data = await self._simple_request(command, data)
       +        if error_code:
       +            return error_code, None
       +        return error_code, data
       +
       +    async def block_transaction_hashes(self, index):
       +        __("Zeromq Client: block_transaction_hashes")
       +        command = b'blockchain.fetch_block_transaction_hashes'
       +        data = pack_block_index(index)
       +        error_code, data = await self._simple_request(command, data)
       +        if error_code:
       +            return error_code, None
       +        data = unpack_table('32s', data)
       +        return error_code, data
       +
       +    async def transaction(self, hash_):
       +        __("Zeromq Client: transaction")
       +        command = b'blockchain.fetch_transaction2'
       +        error_code, data = await self._simple_request(
       +            command, bytes.fromhex(hash_)[::-1])
       +        if error_code:
       +            return error_code, None
       +        return None, data
       +
       +    async def mempool_transaction(self, hash_):
       +        __("Zeromq Client: mempool_transaction")
       +        command = b'transaction_pool.fetch_transaction2'
       +        error_code, data = await self._simple_request(
       +            command, bytes.fromhex(hash_)[::-1])
       +        if error_code:
       +            return error_code, None
       +        return None, data
       +
       +    async def history4(self, scripthash, height=0):
       +        __("Zeromq Client: history4")
       +        command = b'blockchain.fetch_history4'
       +        decoded_address = unhexlify(scripthash)[::-1]  # TODO: check byte order
       +        error_code, raw_points = await self._simple_request(
       +            command, decoded_address + struct.pack('<I', height))
       +        if error_code:
       +            return error_code, None
       +
       +        def make_tuple(row):
       +            kind, tx_hash, index, height, value = row
       +            return (
       +                kind,
       +                #COutPoint(tx_hash, index),
       +                (tx_hash, index),
       +                height,
       +                value,
       +                checksum(tx_hash[::-1].hex(), index),
       +            )
       +
       +        rows = unpack_table('<B32sIIQ', raw_points)
       +        points = [make_tuple(row) for row in rows]
       +        correlated_points = Client.__correlate(points)
       +        return None, correlated_points
       +
       +    async def balance(self, scripthash):
       +        __("Zeromq Client: balance")
       +        error, hist = await self.history4(scripthash)
       +        if error:
       +            return error, None
       +        utxo = Client.__receives_without_spends(hist)
       +        return None, functools.reduce(
       +            lambda accumulator, point: accumulator + point['value'], utxo, 0)
       +
       +    async def unspent(self, scripthash):
       +        __("Zeromq Client: unspent")
       +        error, hist = await self.history4(scripthash)
       +        if error:
       +            return error, None
       +        return None, Client.__receives_without_spends(hist)
       +
       +    @staticmethod
       +    def __receives_without_spends(hist):
       +        return (point for point in hist if 'spent' not in point)
       +
       +    @staticmethod
       +    def __correlate(points):
       +        transfers, checksum_to_index = Client.__find_receives(points)
       +        transfers = Client.__correlate_spends_to_receives(
       +            points, transfers, checksum_to_index)
       +        return transfers
       +
       +    @staticmethod
       +    def __correlate_spends_to_receives(points, transfers, checksum_to_index):
       +        for point in points:
       +            if point[0] == 0: # receive
       +                continue
       +
       +            spent = {
       +                'hash': point[1].hash,
       +                'height': point[2],
       +                'index': point[1].n,
       +            }
       +            if point[3] not in checksum_to_index:
       +                transfers.append({'spent': spent})
       +            else:
       +                transfers[checksum_to_index[point[3]]]['spent'] = spent
       +
       +        return transfers
       +
       +    @staticmethod
       +    def __find_receives(points):
       +        transfers = []
       +        checksum_to_index = {}
       +
       +        for point in points:
       +            if point[0] == 1:  # spent
       +                continue
       +
       +            transfers.append({
       +                'received': {
       +                    'hash': point[1].hash,
       +                    'height': point[2],
       +                    'index': point[1].n,
       +                },
       +                'value': point[3],
       +            })
       +
       +            checksum_to_index[point[4]] = len(transfers) - 1
       +
       +        return transfers, checksum_to_index