URI: 
       told_interface.py - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
       told_interface.py (47478B)
       ---
            1 #!/usr/bin/env python
            2 #
            3 # Electrum - lightweight Bitcoin client
            4 # Copyright (C) 2011 thomasv@gitorious
            5 #
            6 # Permission is hereby granted, free of charge, to any person
            7 # obtaining a copy of this software and associated documentation files
            8 # (the "Software"), to deal in the Software without restriction,
            9 # including without limitation the rights to use, copy, modify, merge,
           10 # publish, distribute, sublicense, and/or sell copies of the Software,
           11 # and to permit persons to whom the Software is furnished to do so,
           12 # subject to the following conditions:
           13 #
           14 # The above copyright notice and this permission notice shall be
           15 # included in all copies or substantial portions of the Software.
           16 #
           17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
           18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
           19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
           20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
           21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
           22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
           23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
           24 # SOFTWARE.
           25 import os
           26 import re
           27 import ssl
           28 import sys
           29 import traceback
           30 import asyncio
           31 import socket
           32 from typing import Tuple, Union, List, TYPE_CHECKING, Optional, Set, NamedTuple, Any, Sequence
           33 from collections import defaultdict
           34 from ipaddress import IPv4Network, IPv6Network, ip_address, IPv6Address, IPv4Address
           35 import itertools
           36 import logging
           37 import hashlib
           38 import functools
           39 
           40 import aiorpcx
           41 from aiorpcx import TaskGroup
           42 from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
           43 from aiorpcx.curio import timeout_after, TaskTimeout
           44 from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
           45 from aiorpcx.rawsocket import RSClient
           46 import certifi
           47 
           48 from .util import (ignore_exceptions, log_exceptions, bfh, SilentTaskGroup, MySocksProxy,
           49                    is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
           50                    is_int_or_float, is_non_negative_int_or_float)
           51 from . import util
           52 from . import x509
           53 from . import pem
           54 from . import version
           55 from . import blockchain
           56 from .blockchain import Blockchain, HEADER_SIZE
           57 from . import bitcoin
           58 from . import constants
           59 from .i18n import _
           60 from .logging import Logger
           61 from .transaction import Transaction
           62 
           63 if TYPE_CHECKING:
           64     from .network import Network
           65     from .simple_config import SimpleConfig
           66 
           67 
           68 ca_path = certifi.where()
           69 
           70 BUCKET_NAME_OF_ONION_SERVERS = 'onion'
           71 
           72 MAX_INCOMING_MSG_SIZE = 1_000_000  # in bytes
           73 
           74 _KNOWN_NETWORK_PROTOCOLS = {'t', 's'}
           75 PREFERRED_NETWORK_PROTOCOL = 's'
           76 assert PREFERRED_NETWORK_PROTOCOL in _KNOWN_NETWORK_PROTOCOLS
           77 
           78 
           79 class NetworkTimeout:
           80     # seconds
           81     class Generic:
           82         NORMAL = 30
           83         RELAXED = 45
           84         MOST_RELAXED = 600
           85 
           86     class Urgent(Generic):
           87         NORMAL = 10
           88         RELAXED = 20
           89         MOST_RELAXED = 60
           90 
           91 
           92 def assert_non_negative_integer(val: Any) -> None:
           93     if not is_non_negative_integer(val):
           94         raise RequestCorrupted(f'{val!r} should be a non-negative integer')
           95 
           96 
           97 def assert_integer(val: Any) -> None:
           98     if not is_integer(val):
           99         raise RequestCorrupted(f'{val!r} should be an integer')
          100 
          101 
          102 def assert_int_or_float(val: Any) -> None:
          103     if not is_int_or_float(val):
          104         raise RequestCorrupted(f'{val!r} should be int or float')
          105 
          106 
          107 def assert_non_negative_int_or_float(val: Any) -> None:
          108     if not is_non_negative_int_or_float(val):
          109         raise RequestCorrupted(f'{val!r} should be a non-negative int or float')
          110 
          111 
          112 def assert_hash256_str(val: Any) -> None:
          113     if not is_hash256_str(val):
          114         raise RequestCorrupted(f'{val!r} should be a hash256 str')
          115 
          116 
          117 def assert_hex_str(val: Any) -> None:
          118     if not is_hex_str(val):
          119         raise RequestCorrupted(f'{val!r} should be a hex str')
          120 
          121 
          122 def assert_dict_contains_field(d: Any, *, field_name: str) -> Any:
          123     if not isinstance(d, dict):
          124         raise RequestCorrupted(f'{d!r} should be a dict')
          125     if field_name not in d:
          126         raise RequestCorrupted(f'required field {field_name!r} missing from dict')
          127     return d[field_name]
          128 
          129 
          130 def assert_list_or_tuple(val: Any) -> None:
          131     if not isinstance(val, (list, tuple)):
          132         raise RequestCorrupted(f'{val!r} should be a list or tuple')
          133 
          134 
          135 class NotificationSession(RPCSession):
          136 
          137     def __init__(self, *args, interface: 'Interface', **kwargs):
          138         super(NotificationSession, self).__init__(*args, **kwargs)
          139         self.subscriptions = defaultdict(list)
          140         self.cache = {}
          141         self.default_timeout = NetworkTimeout.Generic.NORMAL
          142         self._msg_counter = itertools.count(start=1)
          143         self.interface = interface
          144         self.cost_hard_limit = 0  # disable aiorpcx resource limits
          145 
          146     async def handle_request(self, request):
          147         self.maybe_log(f"--> {request}")
          148         try:
          149             if isinstance(request, Notification):
          150                 params, result = request.args[:-1], request.args[-1]
          151                 key = self.get_hashable_key_for_rpc_call(request.method, params)
          152                 if key in self.subscriptions:
          153                     self.cache[key] = result
          154                     for queue in self.subscriptions[key]:
          155                         await queue.put(request.args)
          156                 else:
          157                     raise Exception(f'unexpected notification')
          158             else:
          159                 raise Exception(f'unexpected request. not a notification')
          160         except Exception as e:
          161             self.interface.logger.info(f"error handling request {request}. exc: {repr(e)}")
          162             await self.close()
          163 
          164     async def send_request(self, *args, timeout=None, **kwargs):
          165         # note: semaphores/timeouts/backpressure etc are handled by
          166         # aiorpcx. the timeout arg here in most cases should not be set
          167         msg_id = next(self._msg_counter)
          168         self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
          169         try:
          170             # note: RPCSession.send_request raises TaskTimeout in case of a timeout.
          171             # TaskTimeout is a subclass of CancelledError, which is *suppressed* in TaskGroups
          172             response = await asyncio.wait_for(
          173                 super().send_request(*args, **kwargs),
          174                 timeout)
          175         except (TaskTimeout, asyncio.TimeoutError) as e:
          176             raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
          177         except CodeMessageError as e:
          178             self.maybe_log(f"--> {repr(e)} (id: {msg_id})")
          179             raise
          180         else:
          181             self.maybe_log(f"--> {response} (id: {msg_id})")
          182             return response
          183 
          184     def set_default_timeout(self, timeout):
          185         self.sent_request_timeout = timeout
          186         self.max_send_delay = timeout
          187 
          188     async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
          189         # note: until the cache is written for the first time,
          190         # each 'subscribe' call might make a request on the network.
          191         key = self.get_hashable_key_for_rpc_call(method, params)
          192         self.subscriptions[key].append(queue)
          193         if key in self.cache:
          194             result = self.cache[key]
          195         else:
          196             result = await self.send_request(method, params)
          197             self.cache[key] = result
          198         await queue.put(params + [result])
          199 
          200     def unsubscribe(self, queue):
          201         """Unsubscribe a callback to free object references to enable GC."""
          202         # note: we can't unsubscribe from the server, so we keep receiving
          203         # subsequent notifications
          204         for v in self.subscriptions.values():
          205             if queue in v:
          206                 v.remove(queue)
          207 
          208     @classmethod
          209     def get_hashable_key_for_rpc_call(cls, method, params):
          210         """Hashable index for subscriptions and cache"""
          211         return str(method) + repr(params)
          212 
          213     def maybe_log(self, msg: str) -> None:
          214         if not self.interface: return
          215         if self.interface.debug or self.interface.network.debug:
          216             self.interface.logger.debug(msg)
          217 
          218     def default_framer(self):
          219         # overridden so that max_size can be customized
          220         max_size = int(self.interface.network.config.get('network_max_incoming_msg_size',
          221                                                          MAX_INCOMING_MSG_SIZE))
          222         return NewlineFramer(max_size=max_size)
          223 
          224 
          225 class NetworkException(Exception): pass
          226 
          227 
          228 class GracefulDisconnect(NetworkException):
          229     log_level = logging.INFO
          230 
          231     def __init__(self, *args, log_level=None, **kwargs):
          232         Exception.__init__(self, *args, **kwargs)
          233         if log_level is not None:
          234             self.log_level = log_level
          235 
          236 
          237 class RequestTimedOut(GracefulDisconnect):
          238     def __str__(self):
          239         return _("Network request timed out.")
          240 
          241 
          242 class RequestCorrupted(Exception): pass
          243 
          244 class ErrorParsingSSLCert(Exception): pass
          245 class ErrorGettingSSLCertFromServer(Exception): pass
          246 class ErrorSSLCertFingerprintMismatch(Exception): pass
          247 class InvalidOptionCombination(Exception): pass
          248 class ConnectError(NetworkException): pass
          249 
          250 
          251 class _RSClient(RSClient):
          252     async def create_connection(self):
          253         try:
          254             return await super().create_connection()
          255         except OSError as e:
          256             # note: using "from e" here will set __cause__ of ConnectError
          257             raise ConnectError(e) from e
          258 
          259 
          260 class ServerAddr:
          261 
          262     def __init__(self, host: str, port: Union[int, str], *, protocol: str = None):
          263         assert isinstance(host, str), repr(host)
          264         if protocol is None:
          265             protocol = 's'
          266         if not host:
          267             raise ValueError('host must not be empty')
          268         if host[0] == '[' and host[-1] == ']':  # IPv6
          269             host = host[1:-1]
          270         try:
          271             net_addr = NetAddress(host, port)  # this validates host and port
          272         except Exception as e:
          273             raise ValueError(f"cannot construct ServerAddr: invalid host or port (host={host}, port={port})") from e
          274         if protocol not in _KNOWN_NETWORK_PROTOCOLS:
          275             raise ValueError(f"invalid network protocol: {protocol}")
          276         self.host = str(net_addr.host)  # canonical form (if e.g. IPv6 address)
          277         self.port = int(net_addr.port)
          278         self.protocol = protocol
          279         self._net_addr_str = str(net_addr)
          280 
          281     @classmethod
          282     def from_str(cls, s: str) -> 'ServerAddr':
          283         # host might be IPv6 address, hence do rsplit:
          284         host, port, protocol = str(s).rsplit(':', 2)
          285         return ServerAddr(host=host, port=port, protocol=protocol)
          286 
          287     @classmethod
          288     def from_str_with_inference(cls, s: str) -> Optional['ServerAddr']:
          289         """Construct ServerAddr from str, guessing missing details.
          290         Ongoing compatibility not guaranteed.
          291         """
          292         if not s:
          293             return None
          294         items = str(s).rsplit(':', 2)
          295         if len(items) < 2:
          296             return None  # although maybe we could guess the port too?
          297         host = items[0]
          298         port = items[1]
          299         if len(items) >= 3:
          300             protocol = items[2]
          301         else:
          302             protocol = PREFERRED_NETWORK_PROTOCOL
          303         return ServerAddr(host=host, port=port, protocol=protocol)
          304 
          305     def to_friendly_name(self) -> str:
          306         # note: this method is closely linked to from_str_with_inference
          307         if self.protocol == 's':  # hide trailing ":s"
          308             return self.net_addr_str()
          309         return str(self)
          310 
          311     def __str__(self):
          312         return '{}:{}'.format(self.net_addr_str(), self.protocol)
          313 
          314     def to_json(self) -> str:
          315         return str(self)
          316 
          317     def __repr__(self):
          318         return f'<ServerAddr host={self.host} port={self.port} protocol={self.protocol}>'
          319 
          320     def net_addr_str(self) -> str:
          321         return self._net_addr_str
          322 
          323     def __eq__(self, other):
          324         if not isinstance(other, ServerAddr):
          325             return False
          326         return (self.host == other.host
          327                 and self.port == other.port
          328                 and self.protocol == other.protocol)
          329 
          330     def __ne__(self, other):
          331         return not (self == other)
          332 
          333     def __hash__(self):
          334         return hash((self.host, self.port, self.protocol))
          335 
          336 
          337 def _get_cert_path_for_host(*, config: 'SimpleConfig', host: str) -> str:
          338     filename = host
          339     try:
          340         ip = ip_address(host)
          341     except ValueError:
          342         pass
          343     else:
          344         if isinstance(ip, IPv6Address):
          345             filename = f"ipv6_{ip.packed.hex()}"
          346     return os.path.join(config.path, 'certs', filename)
          347 
          348 
          349 class Interface(Logger):
          350 
          351     LOGGING_SHORTCUT = 'i'
          352 
          353     def __init__(self, *, network: 'Network', server: ServerAddr, proxy: Optional[dict]):
          354         self.ready = asyncio.Future()
          355         self.got_disconnected = asyncio.Event()
          356         self.server = server
          357         Logger.__init__(self)
          358         assert network.config.path
          359         self.cert_path = _get_cert_path_for_host(config=network.config, host=self.host)
          360         self.blockchain = None  # type: Optional[Blockchain]
          361         self._requested_chunks = set()  # type: Set[int]
          362         self.network = network
          363         self.proxy = MySocksProxy.from_proxy_dict(proxy)
          364         self.session = None  # type: Optional[NotificationSession]
          365         self._ipaddr_bucket = None
          366 
          367         # Latest block header and corresponding height, as claimed by the server.
          368         # Note that these values are updated before they are verified.
          369         # Especially during initial header sync, verification can take a long time.
          370         # Failing verification will get the interface closed.
          371         self.tip_header = None
          372         self.tip = 0
          373 
          374         self.fee_estimates_eta = {}
          375 
          376         # Dump network messages (only for this interface).  Set at runtime from the console.
          377         self.debug = False
          378 
          379         self.taskgroup = SilentTaskGroup()
          380 
          381         async def spawn_task():
          382             task = await self.network.taskgroup.spawn(self.run())
          383             if sys.version_info >= (3, 8):
          384                 task.set_name(f"interface::{str(server)}")
          385         asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)
          386 
          387     @property
          388     def host(self):
          389         return self.server.host
          390 
          391     @property
          392     def port(self):
          393         return self.server.port
          394 
          395     @property
          396     def protocol(self):
          397         return self.server.protocol
          398 
          399     def diagnostic_name(self):
          400         return self.server.net_addr_str()
          401 
          402     def __str__(self):
          403         return f"<Interface {self.diagnostic_name()}>"
          404 
          405     async def is_server_ca_signed(self, ca_ssl_context):
          406         """Given a CA enforcing SSL context, returns True if the connection
          407         can be established. Returns False if the server has a self-signed
          408         certificate but otherwise is okay. Any other failures raise.
          409         """
          410         try:
          411             await self.open_session(ca_ssl_context, exit_early=True)
          412         except ConnectError as e:
          413             cause = e.__cause__
          414             if isinstance(cause, ssl.SSLError) and cause.reason == 'CERTIFICATE_VERIFY_FAILED':
          415                 # failures due to self-signed certs are normal
          416                 return False
          417             raise
          418         return True
          419 
          420     async def _try_saving_ssl_cert_for_first_time(self, ca_ssl_context):
          421         ca_signed = await self.is_server_ca_signed(ca_ssl_context)
          422         if ca_signed:
          423             if self._get_expected_fingerprint():
          424                 raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
          425             with open(self.cert_path, 'w') as f:
          426                 # empty file means this is CA signed, not self-signed
          427                 f.write('')
          428         else:
          429             await self._save_certificate()
          430 
          431     def _is_saved_ssl_cert_available(self):
          432         if not os.path.exists(self.cert_path):
          433             return False
          434         with open(self.cert_path, 'r') as f:
          435             contents = f.read()
          436         if contents == '':  # CA signed
          437             if self._get_expected_fingerprint():
          438                 raise InvalidOptionCombination("cannot use --serverfingerprint with CA signed servers")
          439             return True
          440         # pinned self-signed cert
          441         try:
          442             b = pem.dePem(contents, 'CERTIFICATE')
          443         except SyntaxError as e:
          444             self.logger.info(f"error parsing already saved cert: {e}")
          445             raise ErrorParsingSSLCert(e) from e
          446         try:
          447             x = x509.X509(b)
          448         except Exception as e:
          449             self.logger.info(f"error parsing already saved cert: {e}")
          450             raise ErrorParsingSSLCert(e) from e
          451         try:
          452             x.check_date()
          453         except x509.CertificateError as e:
          454             self.logger.info(f"certificate has expired: {e}")
          455             os.unlink(self.cert_path)  # delete pinned cert only in this case
          456             return False
          457         self._verify_certificate_fingerprint(bytearray(b))
          458         return True
          459 
          460     async def _get_ssl_context(self):
          461         if self.protocol != 's':
          462             # using plaintext TCP
          463             return None
          464 
          465         # see if we already have cert for this server; or get it for the first time
          466         ca_sslc = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
          467         if not self._is_saved_ssl_cert_available():
          468             try:
          469                 await self._try_saving_ssl_cert_for_first_time(ca_sslc)
          470             except (OSError, ConnectError, aiorpcx.socks.SOCKSError) as e:
          471                 raise ErrorGettingSSLCertFromServer(e) from e
          472         # now we have a file saved in our certificate store
          473         siz = os.stat(self.cert_path).st_size
          474         if siz == 0:
          475             # CA signed cert
          476             sslc = ca_sslc
          477         else:
          478             # pinned self-signed cert
          479             sslc = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
          480             sslc.check_hostname = 0
          481         return sslc
          482 
          483     def handle_disconnect(func):
          484         @functools.wraps(func)
          485         async def wrapper_func(self: 'Interface', *args, **kwargs):
          486             try:
          487                 return await func(self, *args, **kwargs)
          488             except GracefulDisconnect as e:
          489                 self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
          490             except aiorpcx.jsonrpc.RPCError as e:
          491                 self.logger.warning(f"disconnecting due to {repr(e)}")
          492                 self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True)
          493             finally:
          494                 self.got_disconnected.set()
          495                 await self.network.connection_down(self)
          496                 # if was not 'ready' yet, schedule waiting coroutines:
          497                 self.ready.cancel()
          498         return wrapper_func
          499 
          500     @ignore_exceptions  # do not kill network.taskgroup
          501     @log_exceptions
          502     @handle_disconnect
          503     async def run(self):
          504         try:
          505             ssl_context = await self._get_ssl_context()
          506         except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
          507             self.logger.info(f'disconnecting due to: {repr(e)}')
          508             return
          509         try:
          510             await self.open_session(ssl_context)
          511         except (asyncio.CancelledError, ConnectError, aiorpcx.socks.SOCKSError) as e:
          512             # make SSL errors for main interface more visible (to help servers ops debug cert pinning issues)
          513             if (isinstance(e, ConnectError) and isinstance(e.__cause__, ssl.SSLError)
          514                     and self.is_main_server() and not self.network.auto_connect):
          515                 self.logger.warning(f'Cannot connect to main server due to SSL error '
          516                                     f'(maybe cert changed compared to "{self.cert_path}"). Exc: {repr(e)}')
          517             else:
          518                 self.logger.info(f'disconnecting due to: {repr(e)}')
          519             return
          520 
          521     def _mark_ready(self) -> None:
          522         if self.ready.cancelled():
          523             raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
          524         if self.ready.done():
          525             return
          526 
          527         assert self.tip_header
          528         chain = blockchain.check_header(self.tip_header)
          529         if not chain:
          530             self.blockchain = blockchain.get_best_chain()
          531         else:
          532             self.blockchain = chain
          533         assert self.blockchain is not None
          534 
          535         self.logger.info(f"set blockchain with height {self.blockchain.height()}")
          536 
          537         self.ready.set_result(1)
          538 
          539     async def _save_certificate(self) -> None:
          540         if not os.path.exists(self.cert_path):
          541             # we may need to retry this a few times, in case the handshake hasn't completed
          542             for _ in range(10):
          543                 dercert = await self._fetch_certificate()
          544                 if dercert:
          545                     self.logger.info("succeeded in getting cert")
          546                     self._verify_certificate_fingerprint(dercert)
          547                     with open(self.cert_path, 'w') as f:
          548                         cert = ssl.DER_cert_to_PEM_cert(dercert)
          549                         # workaround android bug
          550                         cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
          551                         f.write(cert)
          552                         # even though close flushes we can't fsync when closed.
          553                         # and we must flush before fsyncing, cause flush flushes to OS buffer
          554                         # fsync writes to OS buffer to disk
          555                         f.flush()
          556                         os.fsync(f.fileno())
          557                     break
          558                 await asyncio.sleep(1)
          559             else:
          560                 raise GracefulDisconnect("could not get certificate after 10 tries")
          561 
          562     async def _fetch_certificate(self) -> bytes:
          563         sslc = ssl.SSLContext()
          564         async with _RSClient(session_factory=RPCSession,
          565                              host=self.host, port=self.port,
          566                              ssl=sslc, proxy=self.proxy) as session:
          567             asyncio_transport = session.transport._asyncio_transport  # type: asyncio.BaseTransport
          568             ssl_object = asyncio_transport.get_extra_info("ssl_object")  # type: ssl.SSLObject
          569             return ssl_object.getpeercert(binary_form=True)
          570 
          571     def _get_expected_fingerprint(self) -> Optional[str]:
          572         if self.is_main_server():
          573             return self.network.config.get("serverfingerprint")
          574 
          575     def _verify_certificate_fingerprint(self, certificate):
          576         expected_fingerprint = self._get_expected_fingerprint()
          577         if not expected_fingerprint:
          578             return
          579         fingerprint = hashlib.sha256(certificate).hexdigest()
          580         fingerprints_match = fingerprint.lower() == expected_fingerprint.lower()
          581         if not fingerprints_match:
          582             util.trigger_callback('cert_mismatch')
          583             raise ErrorSSLCertFingerprintMismatch('Refusing to connect to server due to cert fingerprint mismatch')
          584         self.logger.info("cert fingerprint verification passed")
          585 
          586     async def get_block_header(self, height, assert_mode):
          587         self.logger.info(f'requesting block header {height} in mode {assert_mode}')
          588         # use lower timeout as we usually have network.bhi_lock here
          589         timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
          590         res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
          591         return blockchain.deserialize_header(bytes.fromhex(res), height)
          592 
          593     async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
          594         if not is_non_negative_integer(height):
          595             raise Exception(f"{repr(height)} is not a block height")
          596         index = height // 2016
          597         if can_return_early and index in self._requested_chunks:
          598             return
          599         self.logger.info(f"requesting chunk from height {height}")
          600         size = 2016
          601         if tip is not None:
          602             size = min(size, tip - index * 2016 + 1)
          603             size = max(size, 0)
          604         try:
          605             self._requested_chunks.add(index)
          606             res = await self.session.send_request('blockchain.block.headers', [index * 2016, size])
          607         finally:
          608             self._requested_chunks.discard(index)
          609         assert_dict_contains_field(res, field_name='count')
          610         assert_dict_contains_field(res, field_name='hex')
          611         assert_dict_contains_field(res, field_name='max')
          612         assert_non_negative_integer(res['count'])
          613         assert_non_negative_integer(res['max'])
          614         assert_hex_str(res['hex'])
          615         if len(res['hex']) != HEADER_SIZE * 2 * res['count']:
          616             raise RequestCorrupted('inconsistent chunk hex and count')
          617         # we never request more than 2016 headers, but we enforce those fit in a single response
          618         if res['max'] < 2016:
          619             raise RequestCorrupted(f"server uses too low 'max' count for block.headers: {res['max']} < 2016")
          620         if res['count'] != size:
          621             raise RequestCorrupted(f"expected {size} headers but only got {res['count']}")
          622         conn = self.blockchain.connect_chunk(index, res['hex'])
          623         if not conn:
          624             return conn, 0
          625         return conn, res['count']
          626 
          627     def is_main_server(self) -> bool:
          628         return (self.network.interface == self or
          629                 self.network.interface is None and self.network.default_server == self.server)
          630 
          631     async def open_session(self, sslc, exit_early=False):
          632         session_factory = lambda *args, iface=self, **kwargs: NotificationSession(*args, **kwargs, interface=iface)
          633         async with _RSClient(session_factory=session_factory,
          634                              host=self.host, port=self.port,
          635                              ssl=sslc, proxy=self.proxy) as session:
          636             self.session = session  # type: NotificationSession
          637             self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
          638             try:
          639                 ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION])
          640             except aiorpcx.jsonrpc.RPCError as e:
          641                 raise GracefulDisconnect(e)  # probably 'unsupported protocol version'
          642             if exit_early:
          643                 return
          644             if ver[1] != version.PROTOCOL_VERSION:
          645                 raise GracefulDisconnect(f'server violated protocol-version-negotiation. '
          646                                          f'we asked for {version.PROTOCOL_VERSION!r}, they sent {ver[1]!r}')
          647             if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
          648                 raise GracefulDisconnect(f'too many connected servers already '
          649                                          f'in bucket {self.bucket_based_on_ipaddress()}')
          650             self.logger.info(f"connection established. version: {ver}")
          651 
          652             try:
          653                 async with self.taskgroup as group:
          654                     await group.spawn(self.ping)
          655                     await group.spawn(self.request_fee_estimates)
          656                     await group.spawn(self.run_fetch_blocks)
          657                     await group.spawn(self.monitor_connection)
          658             except aiorpcx.jsonrpc.RPCError as e:
          659                 if e.code in (JSONRPC.EXCESSIVE_RESOURCE_USAGE,
          660                               JSONRPC.SERVER_BUSY,
          661                               JSONRPC.METHOD_NOT_FOUND):
          662                     raise GracefulDisconnect(e, log_level=logging.WARNING) from e
          663                 raise
          664 
          665     async def monitor_connection(self):
          666         while True:
          667             await asyncio.sleep(1)
          668             if not self.session or self.session.is_closing():
          669                 raise GracefulDisconnect('session was closed')
          670 
          671     async def ping(self):
          672         while True:
          673             await asyncio.sleep(300)
          674             await self.session.send_request('server.ping')
          675 
          676     async def request_fee_estimates(self):
          677         from .simple_config import FEE_ETA_TARGETS
          678         while True:
          679             async with TaskGroup() as group:
          680                 fee_tasks = []
          681                 for i in FEE_ETA_TARGETS:
          682                     fee_tasks.append((i, await group.spawn(self.get_estimatefee(i))))
          683             for nblock_target, task in fee_tasks:
          684                 fee = task.result()
          685                 if fee < 0: continue
          686                 self.fee_estimates_eta[nblock_target] = fee
          687             self.network.update_fee_estimates()
          688             await asyncio.sleep(60)
          689 
          690     async def close(self, *, force_after: int = None):
          691         """Closes the connection and waits for it to be closed.
          692         We try to flush buffered data to the wire, so this can take some time.
          693         """
          694         if force_after is None:
          695             # We give up after a while and just abort the connection.
          696             # Note: specifically if the server is running Fulcrum, waiting seems hopeless,
          697             #       the connection must be aborted (see https://github.com/cculianu/Fulcrum/issues/76)
          698             force_after = 1  # seconds
          699         if self.session:
          700             await self.session.close(force_after=force_after)
          701         # monitor_connection will cancel tasks
          702 
          703     async def run_fetch_blocks(self):
          704         header_queue = asyncio.Queue()
          705         await self.session.subscribe('blockchain.headers.subscribe', [], header_queue)
          706         while True:
          707             item = await header_queue.get()
          708             raw_header = item[0]
          709             height = raw_header['height']
          710             header = blockchain.deserialize_header(bfh(raw_header['hex']), height)
          711             self.tip_header = header
          712             self.tip = height
          713             if self.tip < constants.net.max_checkpoint():
          714                 raise GracefulDisconnect('server tip below max checkpoint')
          715             self._mark_ready()
          716             await self._process_header_at_tip()
          717             # header processing done
          718             util.trigger_callback('blockchain_updated')
          719             util.trigger_callback('network_updated')
          720             await self.network.switch_unwanted_fork_interface()
          721             await self.network.switch_lagging_interface()
          722 
          723     async def _process_header_at_tip(self):
          724         height, header = self.tip, self.tip_header
          725         async with self.network.bhi_lock:
          726             if self.blockchain.height() >= height and self.blockchain.check_header(header):
          727                 # another interface amended the blockchain
          728                 self.logger.info(f"skipping header {height}")
          729                 return
          730             _, height = await self.step(height, header)
          731             # in the simple case, height == self.tip+1
          732             if height <= self.tip:
          733                 await self.sync_until(height)
          734 
          735     async def sync_until(self, height, next_height=None):
          736         if next_height is None:
          737             next_height = self.tip
          738         last = None
          739         while last is None or height <= next_height:
          740             prev_last, prev_height = last, height
          741             if next_height > height + 10:
          742                 could_connect, num_headers = await self.request_chunk(height, next_height)
          743                 if not could_connect:
          744                     if height <= constants.net.max_checkpoint():
          745                         raise GracefulDisconnect('server chain conflicts with checkpoints or genesis')
          746                     last, height = await self.step(height)
          747                     continue
          748                 util.trigger_callback('network_updated')
          749                 height = (height // 2016 * 2016) + num_headers
          750                 assert height <= next_height+1, (height, self.tip)
          751                 last = 'catchup'
          752             else:
          753                 last, height = await self.step(height)
          754             assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until'
          755         return last, height
          756 
          757     async def step(self, height, header=None):
          758         assert 0 <= height <= self.tip, (height, self.tip)
          759         if header is None:
          760             header = await self.get_block_header(height, 'catchup')
          761 
          762         chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
          763         if chain:
          764             self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
          765             # note: there is an edge case here that is not handled.
          766             # we might know the blockhash (enough for check_header) but
          767             # not have the header itself. e.g. regtest chain with only genesis.
          768             # this situation resolves itself on the next block
          769             return 'catchup', height+1
          770 
          771         can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
          772         if not can_connect:
          773             self.logger.info(f"can't connect {height}")
          774             height, header, bad, bad_header = await self._search_headers_backwards(height, header)
          775             chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
          776             can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
          777             assert chain or can_connect
          778         if can_connect:
          779             self.logger.info(f"could connect {height}")
          780             height += 1
          781             if isinstance(can_connect, Blockchain):  # not when mocking
          782                 self.blockchain = can_connect
          783                 self.blockchain.save_header(header)
          784             return 'catchup', height
          785 
          786         good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain)
          787         return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header)
          788 
          789     async def _search_headers_binary(self, height, bad, bad_header, chain):
          790         assert bad == bad_header['block_height']
          791         _assert_header_does_not_check_against_any_chain(bad_header)
          792 
          793         self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
          794         good = height
          795         while True:
          796             assert good < bad, (good, bad)
          797             height = (good + bad) // 2
          798             self.logger.info(f"binary step. good {good}, bad {bad}, height {height}")
          799             header = await self.get_block_header(height, 'binary')
          800             chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
          801             if chain:
          802                 self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain
          803                 good = height
          804             else:
          805                 bad = height
          806                 bad_header = header
          807             if good + 1 == bad:
          808                 break
          809 
          810         mock = 'mock' in bad_header and bad_header['mock']['connect'](height)
          811         real = not mock and self.blockchain.can_connect(bad_header, check_height=False)
          812         if not real and not mock:
          813             raise Exception('unexpected bad header during binary: {}'.format(bad_header))
          814         _assert_header_does_not_check_against_any_chain(bad_header)
          815 
          816         self.logger.info(f"binary search exited. good {good}, bad {bad}")
          817         return good, bad, bad_header
          818 
          819     async def _resolve_potential_chain_fork_given_forkpoint(self, good, bad, bad_header):
          820         assert good + 1 == bad
          821         assert bad == bad_header['block_height']
          822         _assert_header_does_not_check_against_any_chain(bad_header)
          823         # 'good' is the height of a block 'good_header', somewhere in self.blockchain.
          824         # bad_header connects to good_header; bad_header itself is NOT in self.blockchain.
          825 
          826         bh = self.blockchain.height()
          827         assert bh >= good, (bh, good)
          828         if bh == good:
          829             height = good + 1
          830             self.logger.info(f"catching up from {height}")
          831             return 'no_fork', height
          832 
          833         # this is a new fork we don't yet have
          834         height = bad + 1
          835         self.logger.info(f"new fork at bad height {bad}")
          836         forkfun = self.blockchain.fork if 'mock' not in bad_header else bad_header['mock']['fork']
          837         b = forkfun(bad_header)  # type: Blockchain
          838         self.blockchain = b
          839         assert b.forkpoint == bad
          840         return 'fork', height
          841 
          842     async def _search_headers_backwards(self, height, header):
          843         async def iterate():
          844             nonlocal height, header
          845             checkp = False
          846             if height <= constants.net.max_checkpoint():
          847                 height = constants.net.max_checkpoint()
          848                 checkp = True
          849             header = await self.get_block_header(height, 'backward')
          850             chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
          851             can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height)
          852             if chain or can_connect:
          853                 return False
          854             if checkp:
          855                 raise GracefulDisconnect("server chain conflicts with checkpoints")
          856             return True
          857 
          858         bad, bad_header = height, header
          859         _assert_header_does_not_check_against_any_chain(bad_header)
          860         with blockchain.blockchains_lock: chains = list(blockchain.blockchains.values())
          861         local_max = max([0] + [x.height() for x in chains]) if 'mock' not in header else float('inf')
          862         height = min(local_max + 1, height - 1)
          863         while await iterate():
          864             bad, bad_header = height, header
          865             delta = self.tip - height
          866             height = self.tip - 2 * delta
          867 
          868         _assert_header_does_not_check_against_any_chain(bad_header)
          869         self.logger.info(f"exiting backward mode at {height}")
          870         return height, header, bad, bad_header
          871 
          872     @classmethod
          873     def client_name(cls) -> str:
          874         return f'electrum/{version.ELECTRUM_VERSION}'
          875 
          876     def is_tor(self):
          877         return self.host.endswith('.onion')
          878 
          879     def ip_addr(self) -> Optional[str]:
          880         session = self.session
          881         if not session: return None
          882         peer_addr = session.remote_address()
          883         if not peer_addr: return None
          884         return str(peer_addr.host)
          885 
          886     def bucket_based_on_ipaddress(self) -> str:
          887         def do_bucket():
          888             if self.is_tor():
          889                 return BUCKET_NAME_OF_ONION_SERVERS
          890             try:
          891                 ip_addr = ip_address(self.ip_addr())  # type: Union[IPv4Address, IPv6Address]
          892             except ValueError:
          893                 return ''
          894             if not ip_addr:
          895                 return ''
          896             if ip_addr.is_loopback:  # localhost is exempt
          897                 return ''
          898             if ip_addr.version == 4:
          899                 slash16 = IPv4Network(ip_addr).supernet(prefixlen_diff=32-16)
          900                 return str(slash16)
          901             elif ip_addr.version == 6:
          902                 slash48 = IPv6Network(ip_addr).supernet(prefixlen_diff=128-48)
          903                 return str(slash48)
          904             return ''
          905 
          906         if not self._ipaddr_bucket:
          907             self._ipaddr_bucket = do_bucket()
          908         return self._ipaddr_bucket
          909 
          910     async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
          911         if not is_hash256_str(tx_hash):
          912             raise Exception(f"{repr(tx_hash)} is not a txid")
          913         if not is_non_negative_integer(tx_height):
          914             raise Exception(f"{repr(tx_height)} is not a block height")
          915         # do request
          916         res = await self.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
          917         # check response
          918         block_height = assert_dict_contains_field(res, field_name='block_height')
          919         merkle = assert_dict_contains_field(res, field_name='merkle')
          920         pos = assert_dict_contains_field(res, field_name='pos')
          921         # note: tx_height was just a hint to the server, don't enforce the response to match it
          922         assert_non_negative_integer(block_height)
          923         assert_non_negative_integer(pos)
          924         assert_list_or_tuple(merkle)
          925         for item in merkle:
          926             assert_hash256_str(item)
          927         return res
          928 
          929     async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
          930         if not is_hash256_str(tx_hash):
          931             raise Exception(f"{repr(tx_hash)} is not a txid")
          932         raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
          933         # validate response
          934         if not is_hex_str(raw):
          935             raise RequestCorrupted(f"received garbage (non-hex) as tx data (txid {tx_hash}): {raw!r}")
          936         tx = Transaction(raw)
          937         try:
          938             tx.deserialize()  # see if raises
          939         except Exception as e:
          940             raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from e
          941         if tx.txid() != tx_hash:
          942             raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {tx.txid()})")
          943         return raw
          944 
          945     async def get_history_for_scripthash(self, sh: str) -> List[dict]:
          946         if not is_hash256_str(sh):
          947             raise Exception(f"{repr(sh)} is not a scripthash")
          948         # do request
          949         res = await self.session.send_request('blockchain.scripthash.get_history', [sh])
          950         # check response
          951         assert_list_or_tuple(res)
          952         prev_height = 1
          953         for tx_item in res:
          954             height = assert_dict_contains_field(tx_item, field_name='height')
          955             assert_dict_contains_field(tx_item, field_name='tx_hash')
          956             assert_integer(height)
          957             assert_hash256_str(tx_item['tx_hash'])
          958             if height in (-1, 0):
          959                 assert_dict_contains_field(tx_item, field_name='fee')
          960                 assert_non_negative_integer(tx_item['fee'])
          961                 prev_height = - float("inf")  # this ensures confirmed txs can't follow mempool txs
          962             else:
          963                 # check monotonicity of heights
          964                 if height < prev_height:
          965                     raise RequestCorrupted(f'heights of confirmed txs must be in increasing order')
          966                 prev_height = height
          967         hashes = set(map(lambda item: item['tx_hash'], res))
          968         if len(hashes) != len(res):
          969             # Either server is sending garbage... or maybe if server is race-prone
          970             # a recently mined tx could be included in both last block and mempool?
          971             # Still, it's simplest to just disregard the response.
          972             raise RequestCorrupted(f"server history has non-unique txids for sh={sh}")
          973         return res
          974 
          975     async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
          976         if not is_hash256_str(sh):
          977             raise Exception(f"{repr(sh)} is not a scripthash")
          978         # do request
          979         res = await self.session.send_request('blockchain.scripthash.listunspent', [sh])
          980         # check response
          981         assert_list_or_tuple(res)
          982         for utxo_item in res:
          983             assert_dict_contains_field(utxo_item, field_name='tx_pos')
          984             assert_dict_contains_field(utxo_item, field_name='value')
          985             assert_dict_contains_field(utxo_item, field_name='tx_hash')
          986             assert_dict_contains_field(utxo_item, field_name='height')
          987             assert_non_negative_integer(utxo_item['tx_pos'])
          988             assert_non_negative_integer(utxo_item['value'])
          989             assert_non_negative_integer(utxo_item['height'])
          990             assert_hash256_str(utxo_item['tx_hash'])
          991         return res
          992 
          993     async def get_balance_for_scripthash(self, sh: str) -> dict:
          994         if not is_hash256_str(sh):
          995             raise Exception(f"{repr(sh)} is not a scripthash")
          996         # do request
          997         res = await self.session.send_request('blockchain.scripthash.get_balance', [sh])
          998         # check response
          999         assert_dict_contains_field(res, field_name='confirmed')
         1000         assert_dict_contains_field(res, field_name='unconfirmed')
         1001         assert_non_negative_integer(res['confirmed'])
         1002         assert_non_negative_integer(res['unconfirmed'])
         1003         return res
         1004 
         1005     async def get_txid_from_txpos(self, tx_height: int, tx_pos: int, merkle: bool):
         1006         if not is_non_negative_integer(tx_height):
         1007             raise Exception(f"{repr(tx_height)} is not a block height")
         1008         if not is_non_negative_integer(tx_pos):
         1009             raise Exception(f"{repr(tx_pos)} should be non-negative integer")
         1010         # do request
         1011         res = await self.session.send_request(
         1012             'blockchain.transaction.id_from_pos',
         1013             [tx_height, tx_pos, merkle],
         1014         )
         1015         # check response
         1016         if merkle:
         1017             assert_dict_contains_field(res, field_name='tx_hash')
         1018             assert_dict_contains_field(res, field_name='merkle')
         1019             assert_hash256_str(res['tx_hash'])
         1020             assert_list_or_tuple(res['merkle'])
         1021             for node_hash in res['merkle']:
         1022                 assert_hash256_str(node_hash)
         1023         else:
         1024             assert_hash256_str(res)
         1025         return res
         1026 
         1027     async def get_fee_histogram(self) -> Sequence[Tuple[Union[float, int], int]]:
         1028         # do request
         1029         res = await self.session.send_request('mempool.get_fee_histogram')
         1030         # check response
         1031         assert_list_or_tuple(res)
         1032         prev_fee = float('inf')
         1033         for fee, s in res:
         1034             assert_non_negative_int_or_float(fee)
         1035             assert_non_negative_integer(s)
         1036             if fee >= prev_fee:  # check monotonicity
         1037                 raise RequestCorrupted(f'fees must be in decreasing order')
         1038             prev_fee = fee
         1039         return res
         1040 
         1041     async def get_server_banner(self) -> str:
         1042         # do request
         1043         res = await self.session.send_request('server.banner')
         1044         # check response
         1045         if not isinstance(res, str):
         1046             raise RequestCorrupted(f'{res!r} should be a str')
         1047         return res
         1048 
         1049     async def get_donation_address(self) -> str:
         1050         # do request
         1051         res = await self.session.send_request('server.donation_address')
         1052         # check response
         1053         if not res:  # ignore empty string
         1054             return ''
         1055         if not bitcoin.is_address(res):
         1056             # note: do not hard-fail -- allow server to use future-type
         1057             #       bitcoin address we do not recognize
         1058             self.logger.info(f"invalid donation address from server: {repr(res)}")
         1059             res = ''
         1060         return res
         1061 
         1062     async def get_relay_fee(self) -> int:
         1063         """Returns the min relay feerate in sat/kbyte."""
         1064         # do request
         1065         res = await self.session.send_request('blockchain.relayfee')
         1066         # check response
         1067         assert_non_negative_int_or_float(res)
         1068         relayfee = int(res * bitcoin.COIN)
         1069         relayfee = max(0, relayfee)
         1070         return relayfee
         1071 
         1072     async def get_estimatefee(self, num_blocks: int) -> int:
         1073         """Returns a feerate estimate for getting confirmed within
         1074         num_blocks blocks, in sat/kbyte.
         1075         """
         1076         if not is_non_negative_integer(num_blocks):
         1077             raise Exception(f"{repr(num_blocks)} is not a num_blocks")
         1078         # do request
         1079         res = await self.session.send_request('blockchain.estimatefee', [num_blocks])
         1080         # check response
         1081         if res != -1:
         1082             assert_non_negative_int_or_float(res)
         1083             res = int(res * bitcoin.COIN)
         1084         return res
         1085 
         1086 
         1087 def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
         1088     chain_bad = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
         1089     if chain_bad:
         1090         raise Exception('bad_header must not check!')
         1091 
         1092 
         1093 def check_cert(host, cert):
         1094     try:
         1095         b = pem.dePem(cert, 'CERTIFICATE')
         1096         x = x509.X509(b)
         1097     except:
         1098         traceback.print_exc(file=sys.stdout)
         1099         return
         1100 
         1101     try:
         1102         x.check_date()
         1103         expired = False
         1104     except:
         1105         expired = True
         1106 
         1107     m = "host: %s\n"%host
         1108     m += "has_expired: %s\n"% expired
         1109     util.print_msg(m)
         1110 
         1111 
         1112 # Used by tests
         1113 def _match_hostname(name, val):
         1114     if val == name:
         1115         return True
         1116 
         1117     return val.startswith('*.') and name.endswith(val[1:])
         1118 
         1119 
         1120 def test_certificates():
         1121     from .simple_config import SimpleConfig
         1122     config = SimpleConfig()
         1123     mydir = os.path.join(config.path, "certs")
         1124     certs = os.listdir(mydir)
         1125     for c in certs:
         1126         p = os.path.join(mydir,c)
         1127         with open(p, encoding='utf-8') as f:
         1128             cert = f.read()
         1129         check_cert(c, cert)
         1130 
         1131 if __name__ == "__main__":
         1132     test_certificates()