URI: 
       tnetwork.py - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
       tnetwork.py (58124B)
       ---
            1 # Electrum - Lightweight Bitcoin Client
            2 # Copyright (c) 2011-2016 Thomas Voegtlin
            3 #
            4 # Permission is hereby granted, free of charge, to any person
            5 # obtaining a copy of this software and associated documentation files
            6 # (the "Software"), to deal in the Software without restriction,
            7 # including without limitation the rights to use, copy, modify, merge,
            8 # publish, distribute, sublicense, and/or sell copies of the Software,
            9 # and to permit persons to whom the Software is furnished to do so,
           10 # subject to the following conditions:
           11 #
           12 # The above copyright notice and this permission notice shall be
           13 # included in all copies or substantial portions of the Software.
           14 #
           15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
           16 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
           17 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
           18 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
           19 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
           20 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
           21 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
           22 # SOFTWARE.
           23 import asyncio
           24 import time
           25 import queue
           26 import os
           27 import random
           28 import re
           29 from collections import defaultdict
           30 import threading
           31 import socket
           32 import json
           33 import sys
           34 import asyncio
           35 from typing import NamedTuple, Optional, Sequence, List, Dict, Tuple, TYPE_CHECKING, Iterable, Set, Any
           36 import traceback
           37 import concurrent
           38 from concurrent import futures
           39 import copy
           40 import functools
           41 
           42 import aiorpcx
           43 from aiorpcx import TaskGroup, ignore_after
           44 from aiohttp import ClientResponse
           45 
           46 from . import util
           47 from .util import (log_exceptions, ignore_exceptions,
           48                    bfh, SilentTaskGroup, make_aiohttp_session, send_exception_to_crash_reporter,
           49                    is_hash256_str, is_non_negative_integer, MyEncoder, NetworkRetryManager,
           50                    nullcontext)
           51 from .bitcoin import COIN
           52 from . import constants
           53 from . import blockchain
           54 from . import bitcoin
           55 from . import dns_hacks
           56 from .transaction import Transaction
           57 from .blockchain import Blockchain, HEADER_SIZE
           58 from .interface import (Interface, PREFERRED_NETWORK_PROTOCOL,
           59                         RequestTimedOut, NetworkTimeout, BUCKET_NAME_OF_ONION_SERVERS,
           60                         NetworkException, RequestCorrupted, ServerAddr)
           61 from .version import PROTOCOL_VERSION
           62 from .simple_config import SimpleConfig
           63 from .i18n import _
           64 from .logging import get_logger, Logger
           65 from .lnutil import ChannelBlackList
           66 
           67 if TYPE_CHECKING:
           68     from .channel_db import ChannelDB
           69     from .lnworker import LNGossip
           70     from .lnwatcher import WatchTower
           71     from .daemon import Daemon
           72 
           73 
# Logger for this module, obtained from the project's logging helper.
_logger = get_logger(__name__)


# Value assigned to Network.num_server when not in "oneserver" mode
# (see Network._maybe_set_oneserver).
NUM_TARGET_CONNECTED_SERVERS = 10
# Recent servers are only given priority while fewer than this many of them
# are connected/connecting/closing (see Network._get_next_server_to_try).
NUM_STICKY_SERVERS = 4
# Added to the number of hardcoded servers to cap how many peers are accepted
# from a server (see Network._request_server_info).
NUM_RECENT_SERVERS = 20
           80 
           81 
           82 def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]:
           83     """Convert servers list (from protocol method "server.peers.subscribe") into dict format.
           84     Also validate values, such as IP addresses and ports.
           85     """
           86     servers = {}
           87     for item in result:
           88         host = item[1]
           89         out = {}
           90         version = None
           91         pruning_level = '-'
           92         if len(item) > 2:
           93             for v in item[2]:
           94                 if re.match(r"[st]\d*", v):
           95                     protocol, port = v[0], v[1:]
           96                     if port == '': port = constants.net.DEFAULT_PORTS[protocol]
           97                     ServerAddr(host, port, protocol=protocol)  # check if raises
           98                     out[protocol] = port
           99                 elif re.match("v(.?)+", v):
          100                     version = v[1:]
          101                 elif re.match(r"p\d*", v):
          102                     pruning_level = v[1:]
          103                 if pruning_level == '': pruning_level = '0'
          104         if out:
          105             out['pruning'] = pruning_level
          106             out['version'] = version
          107             servers[host] = out
          108     return servers
          109 
          110 
          111 def filter_version(servers):
          112     def is_recent(version):
          113         try:
          114             return util.versiontuple(version) >= util.versiontuple(PROTOCOL_VERSION)
          115         except Exception as e:
          116             return False
          117     return {k: v for k, v in servers.items() if is_recent(v.get('version'))}
          118 
          119 
          120 def filter_noonion(servers):
          121     return {k: v for k, v in servers.items() if not k.endswith('.onion')}
          122 
          123 
          124 def filter_protocol(hostmap, *, allowed_protocols: Iterable[str] = None) -> Sequence[ServerAddr]:
          125     """Filters the hostmap for those implementing protocol."""
          126     if allowed_protocols is None:
          127         allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
          128     eligible = []
          129     for host, portmap in hostmap.items():
          130         for protocol in allowed_protocols:
          131             port = portmap.get(protocol)
          132             if port:
          133                 eligible.append(ServerAddr(host, port, protocol=protocol))
          134     return eligible
          135 
          136 
          137 def pick_random_server(hostmap=None, *, allowed_protocols: Iterable[str],
          138                        exclude_set: Set[ServerAddr] = None) -> Optional[ServerAddr]:
          139     if hostmap is None:
          140         hostmap = constants.net.DEFAULT_SERVERS
          141     if exclude_set is None:
          142         exclude_set = set()
          143     servers = set(filter_protocol(hostmap, allowed_protocols=allowed_protocols))
          144     eligible = list(servers - exclude_set)
          145     return random.choice(eligible) if eligible else None
          146 
          147 
class NetworkParameters(NamedTuple):
    """Bundle of connection settings, as built by Network.get_parameters()
    and consumed by Network.set_parameters().
    """
    # server to use as the main ("default") server
    server: ServerAddr
    # proxy settings dict (see deserialize_proxy for the keys), or None
    proxy: Optional[dict]
    auto_connect: bool
    oneserver: bool = False
          153 
          154 
# Proxy modes recognised by deserialize_proxy() and accepted by Network.set_parameters().
proxy_modes = ['socks4', 'socks5']
          156 
          157 
          158 def serialize_proxy(p):
          159     if not isinstance(p, dict):
          160         return None
          161     return ':'.join([p.get('mode'), p.get('host'), p.get('port'),
          162                      p.get('user', ''), p.get('password', '')])
          163 
          164 
          165 def deserialize_proxy(s: str) -> Optional[dict]:
          166     if not isinstance(s, str):
          167         return None
          168     if s.lower() == 'none':
          169         return None
          170     proxy = { "mode":"socks5", "host":"localhost" }
          171     # FIXME raw IPv6 address fails here
          172     args = s.split(':')
          173     n = 0
          174     if proxy_modes.count(args[n]) == 1:
          175         proxy["mode"] = args[n]
          176         n += 1
          177     if len(args) > n:
          178         proxy["host"] = args[n]
          179         n += 1
          180     if len(args) > n:
          181         proxy["port"] = args[n]
          182         n += 1
          183     else:
          184         proxy["port"] = "8080" if proxy["mode"] == "http" else "1080"
          185     if len(args) > n:
          186         proxy["user"] = args[n]
          187         n += 1
          188     if len(args) > n:
          189         proxy["password"] = args[n]
          190     return proxy
          191 
          192 
# Marker exception type: a NetworkException subclass that adds no behaviour of its own.
class BestEffortRequestFailed(NetworkException): pass
          194 
          195 
class TxBroadcastError(NetworkException):
    """Base class for the TxBroadcast* errors defined below it in this module.

    Subclasses override get_message_for_gui() to return a message string.
    """
    def get_message_for_gui(self):
        # abstract here: the base class has no message of its own
        raise NotImplementedError()
          199 
          200 
          201 class TxBroadcastHashMismatch(TxBroadcastError):
          202     def get_message_for_gui(self):
          203         return "{}\n{}\n\n{}" \
          204             .format(_("The server returned an unexpected transaction ID when broadcasting the transaction."),
          205                     _("Consider trying to connect to a different server, or updating Electrum."),
          206                     str(self))
          207 
          208 
          209 class TxBroadcastServerReturnedError(TxBroadcastError):
          210     def get_message_for_gui(self):
          211         return "{}\n{}\n\n{}" \
          212             .format(_("The server returned an error when broadcasting the transaction."),
          213                     _("Consider trying to connect to a different server, or updating Electrum."),
          214                     str(self))
          215 
          216 
          217 class TxBroadcastUnknownError(TxBroadcastError):
          218     def get_message_for_gui(self):
          219         return "{}\n{}" \
          220             .format(_("Unknown error when broadcasting the transaction."),
          221                     _("Consider trying to connect to a different server, or updating Electrum."))
          222 
          223 
          224 class UntrustedServerReturnedError(NetworkException):
          225     def __init__(self, *, original_exception):
          226         self.original_exception = original_exception
          227 
          228     def get_message_for_gui(self) -> str:
          229         return str(self)
          230 
          231     def __str__(self):
          232         return _("The server returned an error.")
          233 
          234     def __repr__(self):
          235         return (f"<UntrustedServerReturnedError "
          236                 f"[DO NOT TRUST THIS MESSAGE] original_exception: {repr(self.original_exception)}>")
          237 
          238 
# The single Network instance: set in Network.__init__ (which asserts it was None)
# and returned by Network.get_instance().
_INSTANCE = None
          240 
          241 
          242 class Network(Logger, NetworkRetryManager[ServerAddr]):
          243     """The Network class manages a set of connections to remote electrum
          244     servers, each connected socket is handled by an Interface() object.
          245     """
          246 
          247     LOGGING_SHORTCUT = 'n'
          248 
          249     taskgroup: Optional[TaskGroup]
          250     interface: Optional[Interface]
          251     interfaces: Dict[ServerAddr, Interface]
          252     _connecting_ifaces: Set[ServerAddr]
          253     _closing_ifaces: Set[ServerAddr]
          254     default_server: ServerAddr
          255     _recent_servers: List[ServerAddr]
          256 
          257     channel_blacklist: 'ChannelBlackList'
          258     channel_db: Optional['ChannelDB'] = None
          259     lngossip: Optional['LNGossip'] = None
          260     local_watchtower: Optional['WatchTower'] = None
          261 
          262     def __init__(self, config: SimpleConfig, *, daemon: 'Daemon' = None):
          263         global _INSTANCE
          264         assert _INSTANCE is None, "Network is a singleton!"
          265         _INSTANCE = self
          266 
          267         Logger.__init__(self)
          268         NetworkRetryManager.__init__(
          269             self,
          270             max_retry_delay_normal=600,
          271             init_retry_delay_normal=15,
          272             max_retry_delay_urgent=10,
          273             init_retry_delay_urgent=1,
          274         )
          275 
          276         self.asyncio_loop = asyncio.get_event_loop()
          277         assert self.asyncio_loop.is_running(), "event loop not running"
          278         try:
          279             self._loop_thread = self.asyncio_loop._mythread  # type: threading.Thread  # only used for sanity checks
          280         except AttributeError as e:
          281             self.logger.warning(f"asyncio loop does not have _mythread set: {e!r}")
          282             self._loop_thread = None
          283 
          284         assert isinstance(config, SimpleConfig), f"config should be a SimpleConfig instead of {type(config)}"
          285         self.config = config
          286 
          287         self.daemon = daemon
          288 
          289         blockchain.read_blockchains(self.config)
          290         blockchain.init_headers_file_for_best_chain()
          291         self.logger.info(f"blockchains {list(map(lambda b: b.forkpoint, blockchain.blockchains.values()))}")
          292         self._blockchain_preferred_block = self.config.get('blockchain_preferred_block', None)  # type: Dict[str, Any]
          293         if self._blockchain_preferred_block is None:
          294             self._set_preferred_chain(None)
          295         self._blockchain = blockchain.get_best_chain()
          296 
          297         self._allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
          298 
          299         # Server for addresses and transactions
          300         self.default_server = self.config.get('server', None)
          301         # Sanitize default server
          302         if self.default_server:
          303             try:
          304                 self.default_server = ServerAddr.from_str(self.default_server)
          305             except:
          306                 self.logger.warning('failed to parse server-string; falling back to localhost:1:s.')
          307                 self.default_server = ServerAddr.from_str("localhost:1:s")
          308         else:
          309             self.default_server = pick_random_server(allowed_protocols=self._allowed_protocols)
          310         assert isinstance(self.default_server, ServerAddr), f"invalid type for default_server: {self.default_server!r}"
          311 
          312         self.taskgroup = None
          313 
          314         # locks
          315         self.restart_lock = asyncio.Lock()
          316         self.bhi_lock = asyncio.Lock()
          317         self.recent_servers_lock = threading.RLock()       # <- re-entrant
          318         self.interfaces_lock = threading.Lock()            # for mutating/iterating self.interfaces
          319 
          320         self.server_peers = {}  # returned by interface (servers that the main interface knows about)
          321         self._recent_servers = self._read_recent_servers()  # note: needs self.recent_servers_lock
          322 
          323         self.banner = ''
          324         self.donation_address = ''
          325         self.relay_fee = None  # type: Optional[int]
          326 
          327         dir_path = os.path.join(self.config.path, 'certs')
          328         util.make_dir(dir_path)
          329 
          330         # the main server we are currently communicating with
          331         self.interface = None
          332         self.default_server_changed_event = asyncio.Event()
          333         # Set of servers we have an ongoing connection with.
          334         # For any ServerAddr, at most one corresponding Interface object
          335         # can exist at any given time. Depending on the state of that Interface,
          336         # the ServerAddr can be found in one of the following sets.
          337         # Note: during a transition, the ServerAddr can appear in two sets momentarily.
          338         self._connecting_ifaces = set()
          339         self.interfaces = {}  # these are the ifaces in "initialised and usable" state
          340         self._closing_ifaces = set()
          341 
          342         self.auto_connect = self.config.get('auto_connect', True)
          343         self.proxy = None
          344         self._maybe_set_oneserver()
          345 
          346         # Dump network messages (all interfaces).  Set at runtime from the console.
          347         self.debug = False
          348 
          349         self._set_status('disconnected')
          350         self._has_ever_managed_to_connect_to_server = False
          351 
          352         # lightning network
          353         self.channel_blacklist = ChannelBlackList()
          354         if self.config.get('run_local_watchtower', False):
          355             from . import lnwatcher
          356             self.local_watchtower = lnwatcher.WatchTower(self)
          357             self.local_watchtower.start_network(self)
          358             asyncio.ensure_future(self.local_watchtower.start_watching())
          359 
          360     def has_internet_connection(self) -> bool:
          361         """Our guess whether the device has Internet-connectivity."""
          362         return self._has_ever_managed_to_connect_to_server
          363 
          364     def has_channel_db(self):
          365         return self.channel_db is not None
          366 
          367     def start_gossip(self):
          368         from . import lnrouter
          369         from . import channel_db
          370         from . import lnworker
          371         if not self.config.get('use_gossip'):
          372             return
          373         if self.lngossip is None:
          374             self.channel_db = channel_db.ChannelDB(self)
          375             self.path_finder = lnrouter.LNPathFinder(self.channel_db)
          376             self.channel_db.load_data()
          377             self.lngossip = lnworker.LNGossip()
          378             self.lngossip.start_network(self)
          379 
          380     async def stop_gossip(self, *, full_shutdown: bool = False):
          381         if self.lngossip:
          382             await self.lngossip.stop()
          383             self.lngossip = None
          384             self.channel_db.stop()
          385             if full_shutdown:
          386                 await self.channel_db.stopped_event.wait()
          387             self.channel_db = None
          388 
          389     def run_from_another_thread(self, coro, *, timeout=None):
          390         assert self._loop_thread != threading.current_thread(), 'must not be called from network thread'
          391         fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop)
          392         return fut.result(timeout)
          393 
          394     @staticmethod
          395     def get_instance() -> Optional["Network"]:
          396         return _INSTANCE
          397 
          398     def with_recent_servers_lock(func):
          399         def func_wrapper(self, *args, **kwargs):
          400             with self.recent_servers_lock:
          401                 return func(self, *args, **kwargs)
          402         return func_wrapper
          403 
          404     def _read_recent_servers(self) -> List[ServerAddr]:
          405         if not self.config.path:
          406             return []
          407         path = os.path.join(self.config.path, "recent_servers")
          408         try:
          409             with open(path, "r", encoding='utf-8') as f:
          410                 data = f.read()
          411                 servers_list = json.loads(data)
          412             return [ServerAddr.from_str(s) for s in servers_list]
          413         except:
          414             return []
          415 
          416     @with_recent_servers_lock
          417     def _save_recent_servers(self):
          418         if not self.config.path:
          419             return
          420         path = os.path.join(self.config.path, "recent_servers")
          421         s = json.dumps(self._recent_servers, indent=4, sort_keys=True, cls=MyEncoder)
          422         try:
          423             with open(path, "w", encoding='utf-8') as f:
          424                 f.write(s)
          425         except:
          426             pass
          427 
          428     async def _server_is_lagging(self) -> bool:
          429         sh = self.get_server_height()
          430         if not sh:
          431             self.logger.info('no height for main interface')
          432             return True
          433         lh = self.get_local_height()
          434         result = (lh - sh) > 1
          435         if result:
          436             self.logger.info(f'{self.default_server} is lagging ({sh} vs {lh})')
          437         return result
          438 
          439     def _set_status(self, status):
          440         self.connection_status = status
          441         self.notify('status')
          442 
          443     def is_connected(self):
          444         interface = self.interface
          445         return interface is not None and interface.ready.done()
          446 
          447     def is_connecting(self):
          448         return self.connection_status == 'connecting'
          449 
          450     async def _request_server_info(self, interface: 'Interface'):
          451         await interface.ready
          452         # TODO: libbitcoin: session = interface.session
          453 
          454         async def get_banner():
          455             self.banner = await interface.get_server_banner()
          456             self.notify('banner')
          457         async def get_donation_address():
          458             self.donation_address = await interface.get_donation_address()
          459         async def get_server_peers():
          460             # ORIG: server_peers = await session.send_request('server.peers.subscribe')
          461             # TODO: libbitcoin
          462             server_peers = []
          463             random.shuffle(server_peers)
          464             max_accepted_peers = len(constants.net.DEFAULT_SERVERS) + NUM_RECENT_SERVERS
          465             server_peers = server_peers[:max_accepted_peers]
          466             # note that 'parse_servers' also validates the data (which is untrusted input!)
          467             self.server_peers = parse_servers(server_peers)
          468             self.notify('servers')
          469         async def get_relay_fee():
          470             self.relay_fee = await interface.get_relay_fee()
          471 
          472         async with TaskGroup() as group:
          473             await group.spawn(get_banner)
          474             await group.spawn(get_donation_address)
          475             await group.spawn(get_server_peers)
          476             await group.spawn(get_relay_fee)
          477             await group.spawn(self._request_fee_estimates(interface))
          478 
          479     async def _request_fee_estimates(self, interface):
          480         self.config.requested_fee_estimates()
          481         histogram = await interface.get_fee_histogram()
          482         self.config.mempool_fees = histogram
          483         self.logger.info(f'fee_histogram {histogram}')
          484         self.notify('fee_histogram')
          485 
          486     def get_status_value(self, key):
          487         if key == 'status':
          488             value = self.connection_status
          489         elif key == 'banner':
          490             value = self.banner
          491         elif key == 'fee':
          492             value = self.config.fee_estimates
          493         elif key == 'fee_histogram':
          494             value = self.config.mempool_fees
          495         elif key == 'servers':
          496             value = self.get_servers()
          497         else:
          498             raise Exception('unexpected trigger key {}'.format(key))
          499         return value
          500 
          501     def notify(self, key):
          502         if key in ['status', 'updated']:
          503             util.trigger_callback(key)
          504         else:
          505             util.trigger_callback(key, self.get_status_value(key))
          506 
          507     def get_parameters(self) -> NetworkParameters:
          508         return NetworkParameters(server=self.default_server,
          509                                  proxy=self.proxy,
          510                                  auto_connect=self.auto_connect,
          511                                  oneserver=self.oneserver)
          512 
          513     def get_donation_address(self):
          514         if self.is_connected():
          515             return self.donation_address
          516 
          517     def get_interfaces(self) -> List[ServerAddr]:
          518         """The list of servers for the connected interfaces."""
          519         with self.interfaces_lock:
          520             return list(self.interfaces)
          521 
          522     def get_fee_estimates(self):
          523         from statistics import median
          524         from .simple_config import FEE_ETA_TARGETS
          525         if self.auto_connect:
          526             with self.interfaces_lock:
          527                 out = {}
          528                 for n in FEE_ETA_TARGETS:
          529                     try:
          530                         out[n] = int(median(filter(None, [i.fee_estimates_eta.get(n) for i in self.interfaces.values()])))
          531                     except:
          532                         continue
          533                 return out
          534         else:
          535             if not self.interface:
          536                 return {}
          537             return self.interface.fee_estimates_eta
          538 
          539     def update_fee_estimates(self):
          540         e = self.get_fee_estimates()
          541         for nblock_target, fee in e.items():
          542             self.config.update_fee_estimates(nblock_target, fee)
          543         if not hasattr(self, "_prev_fee_est") or self._prev_fee_est != e:
          544             self._prev_fee_est = copy.copy(e)
          545             self.logger.info(f'fee_estimates {e}')
          546         self.notify('fee')
          547 
          548     @with_recent_servers_lock
          549     def get_servers(self):
          550         # note: order of sources when adding servers here is crucial!
          551         # don't let "server_peers" overwrite anything,
          552         # otherwise main server can eclipse the client
          553         out = dict()
          554         # add servers received from main interface
          555         server_peers = self.server_peers
          556         if server_peers:
          557             out.update(filter_version(server_peers.copy()))
          558         # hardcoded servers
          559         out.update(constants.net.DEFAULT_SERVERS)
          560         # add recent servers
          561         for server in self._recent_servers:
          562             port = str(server.port)
          563             if server.host in out:
          564                 out[server.host].update({server.protocol: port})
          565             else:
          566                 out[server.host] = {server.protocol: port}
          567         # potentially filter out some
          568         if self.config.get('noonion'):
          569             out = filter_noonion(out)
          570         return out
          571 
          572     def _get_next_server_to_try(self) -> Optional[ServerAddr]:
          573         now = time.time()
          574         with self.interfaces_lock:
          575             connected_servers = set(self.interfaces) | self._connecting_ifaces | self._closing_ifaces
          576         # First try from recent servers. (which are persisted)
          577         # As these are servers we successfully connected to recently, they are
          578         # most likely to work. This also makes servers "sticky".
          579         # Note: with sticky servers, it is more difficult for an attacker to eclipse the client,
          580         #       however if they succeed, the eclipsing would persist. To try to balance this,
          581         #       we only give priority to recent_servers up to NUM_STICKY_SERVERS.
          582         with self.recent_servers_lock:
          583             recent_servers = list(self._recent_servers)
          584         recent_servers = [s for s in recent_servers if s.protocol in self._allowed_protocols]
          585         if len(connected_servers & set(recent_servers)) < NUM_STICKY_SERVERS:
          586             for server in recent_servers:
          587                 if server in connected_servers:
          588                     continue
          589                 if not self._can_retry_addr(server, now=now):
          590                     continue
          591                 return server
          592         # try all servers we know about, pick one at random
          593         hostmap = self.get_servers()
          594         servers = list(set(filter_protocol(hostmap, allowed_protocols=self._allowed_protocols)) - connected_servers)
          595         random.shuffle(servers)
          596         for server in servers:
          597             if not self._can_retry_addr(server, now=now):
          598                 continue
          599             return server
          600         return None
          601 
          602     def _set_proxy(self, proxy: Optional[dict]):
          603         self.proxy = proxy
          604         dns_hacks.configure_dns_depending_on_proxy(bool(proxy))
          605         self.logger.info(f'setting proxy {proxy}')
          606         util.trigger_callback('proxy_set', self.proxy)
          607 
          608     @log_exceptions
          609     async def set_parameters(self, net_params: NetworkParameters):
          610         proxy = net_params.proxy
          611         proxy_str = serialize_proxy(proxy)
          612         server = net_params.server
          613         # sanitize parameters
          614         try:
          615             if proxy:
          616                 proxy_modes.index(proxy['mode']) + 1
          617                 int(proxy['port'])
          618         except:
          619             return
          620         self.config.set_key('auto_connect', net_params.auto_connect, False)
          621         self.config.set_key('oneserver', net_params.oneserver, False)
          622         self.config.set_key('proxy', proxy_str, False)
          623         self.config.set_key('server', str(server), True)
          624         # abort if changes were not allowed by config
          625         if self.config.get('server') != str(server) \
          626                 or self.config.get('proxy') != proxy_str \
          627                 or self.config.get('oneserver') != net_params.oneserver:
          628             return
          629 
          630         async with self.restart_lock:
          631             self.auto_connect = net_params.auto_connect
          632             if self.proxy != proxy or self.oneserver != net_params.oneserver:
          633                 # Restart the network defaulting to the given server
          634                 await self.stop(full_shutdown=False)
          635                 self.default_server = server
          636                 await self._start()
          637             elif self.default_server != server:
          638                 await self.switch_to_interface(server)
          639             else:
          640                 await self.switch_lagging_interface()
          641 
          642     def _maybe_set_oneserver(self) -> None:
          643         oneserver = bool(self.config.get('oneserver', False))
          644         self.oneserver = oneserver
          645         self.num_server = NUM_TARGET_CONNECTED_SERVERS if not oneserver else 0
          646 
          647     async def _switch_to_random_interface(self):
          648         '''Switch to a random connected server other than the current one'''
          649         servers = self.get_interfaces()    # Those in connected state
          650         if self.default_server in servers:
          651             servers.remove(self.default_server)
          652         if servers:
          653             await self.switch_to_interface(random.choice(servers))
          654 
          655     async def switch_lagging_interface(self):
          656         """If auto_connect and lagging, switch interface (only within fork)."""
          657         if self.auto_connect and await self._server_is_lagging():
          658             # switch to one that has the correct header (not height)
          659             best_header = self.blockchain().header_at_tip()
          660             with self.interfaces_lock: interfaces = list(self.interfaces.values())
          661             filtered = list(filter(lambda iface: iface.tip_header == best_header, interfaces))
          662             if filtered:
          663                 chosen_iface = random.choice(filtered)
          664                 await self.switch_to_interface(chosen_iface.server)
          665 
          666     async def switch_unwanted_fork_interface(self) -> None:
          667         """If auto_connect, maybe switch to another fork/chain."""
          668         if not self.auto_connect or not self.interface:
          669             return
          670         with self.interfaces_lock: interfaces = list(self.interfaces.values())
          671         pref_height = self._blockchain_preferred_block['height']
          672         pref_hash   = self._blockchain_preferred_block['hash']
          673         # shortcut for common case
          674         if pref_height == 0:
          675             return
          676         # maybe try switching chains; starting with most desirable first
          677         matching_chains = blockchain.get_chains_that_contain_header(pref_height, pref_hash)
          678         chains_to_try = list(matching_chains) + [blockchain.get_best_chain()]
          679         for rank, chain in enumerate(chains_to_try):
          680             # check if main interface is already on this fork
          681             if self.interface.blockchain == chain:
          682                 return
          683             # switch to another random interface that is on this fork, if any
          684             filtered = [iface for iface in interfaces
          685                         if iface.blockchain == chain]
          686             if filtered:
          687                 self.logger.info(f"switching to (more) preferred fork (rank {rank})")
          688                 chosen_iface = random.choice(filtered)
          689                 await self.switch_to_interface(chosen_iface.server)
          690                 return
          691         self.logger.info("tried to switch to (more) preferred fork but no interfaces are on any")
          692 
    async def switch_to_interface(self, server: ServerAddr):
        """Switch to server as our main interface. If no connection exists,
        queue interface to be started. The actual switch will
        happen when the interface becomes ready.

        `server` is recorded as `self.default_server` in every case. If the
        previous main interface belongs to a different server, closing it is
        spawned as a background task on `self.taskgroup`.
        """
        self.default_server = server
        old_interface = self.interface
        old_server = old_interface.server if old_interface else None

        # Stop any current interface in order to terminate subscriptions,
        # and to cancel tasks in interface.taskgroup.
        if old_server and old_server != server:
            # don't wait for old_interface to close as that might be slow:
            await self.taskgroup.spawn(self._close_interface(old_interface))

        if server not in self.interfaces:
            # Not connected to the requested server yet: clear the main
            # interface and start connecting in the background.
            # _run_new_interface calls switch_to_interface again once the
            # interface is ready, if `server` is still the default server.
            self.interface = None
            await self.taskgroup.spawn(self._run_new_interface(server))
            return

        i = self.interfaces[server]
        if old_interface != i:
            self.logger.info(f"switching to {server}")
            assert i.ready.done(), "interface we are switching to is not ready yet"
            # compare against the current chain *before* replacing self.interface,
            # as self.blockchain() follows the main interface
            blockchain_updated = i.blockchain != self.blockchain()
            self.interface = i
            await i.taskgroup.spawn(self._request_server_info(i))
            util.trigger_callback('default_server_changed')
            # set() immediately followed by clear(): wake up whoever is
            # currently waiting on the event, leaving it unset for later waiters
            self.default_server_changed_event.set()
            self.default_server_changed_event.clear()
            self._set_status('connected')
            util.trigger_callback('network_updated')
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')
          727 
          728     async def _close_interface(self, interface: Optional[Interface]):
          729         if not interface:
          730             return
          731         if interface.server in self._closing_ifaces:
          732             return
          733         self._closing_ifaces.add(interface.server)
          734         with self.interfaces_lock:
          735             if self.interfaces.get(interface.server) == interface:
          736                 self.interfaces.pop(interface.server)
          737         if interface == self.interface:
          738             self.interface = None
          739         try:
          740             # this can take some time if server/connection is slow:
          741             await interface.close()
          742             await interface.got_disconnected.wait()
          743         finally:
          744             self._closing_ifaces.discard(interface.server)
          745 
          746     @with_recent_servers_lock
          747     def _add_recent_server(self, server: ServerAddr) -> None:
          748         self._on_connection_successfully_established(server)
          749         # list is ordered
          750         if server in self._recent_servers:
          751             self._recent_servers.remove(server)
          752         self._recent_servers.insert(0, server)
          753         self._recent_servers = self._recent_servers[:NUM_RECENT_SERVERS]
          754         self._save_recent_servers()
          755 
          756     async def connection_down(self, interface: Interface):
          757         '''A connection to server either went down, or was never made.
          758         We distinguish by whether it is in self.interfaces.'''
          759         if not interface: return
          760         if interface.server == self.default_server:
          761             self._set_status('disconnected')
          762         await self._close_interface(interface)
          763         util.trigger_callback('network_updated')
          764 
          765     def get_network_timeout_seconds(self, request_type=NetworkTimeout.Generic) -> int:
          766         if self.oneserver and not self.auto_connect:
          767             return request_type.MOST_RELAXED
          768         if self.proxy:
          769             return request_type.RELAXED
          770         return request_type.NORMAL
          771 
    @ignore_exceptions  # do not kill outer taskgroup
    @log_exceptions
    async def _run_new_interface(self, server: ServerAddr):
        """Open a new interface to `server` and register it once it is ready.

        Does nothing if `server` is already connected, currently connecting,
        or currently being closed. If the interface does not become ready
        within the timeout (or anything else is raised while waiting), it is
        closed again and not registered.
        """
        if (server in self.interfaces
                or server in self._connecting_ifaces
                or server in self._closing_ifaces):
            return
        self._connecting_ifaces.add(server)
        if server == self.default_server:
            self.logger.info(f"connecting to {server} as new interface")
            self._set_status('connecting')
        self._trying_addr_now(server)

        interface = Interface(network=self, server=server, proxy=self.proxy)
        # note: using longer timeouts here as DNS can sometimes be slow!
        timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
        try:
            await asyncio.wait_for(interface.ready, timeout)
        except BaseException as e:
            # NOTE(review): BaseException also covers cancellation, which is
            # then swallowed by the `return` below -- confirm this is intended
            self.logger.info(f"couldn't launch iface {server} -- {repr(e)}")
            await interface.close()
            return
        else:
            with self.interfaces_lock:
                assert server not in self.interfaces
                self.interfaces[server] = interface
        finally:
            # runs on every path out of the try, including the early return above
            self._connecting_ifaces.discard(server)

        if server == self.default_server:
            # the interface is ready now, so this performs the actual switch
            await self.switch_to_interface(server)

        self._has_ever_managed_to_connect_to_server = True
        self._add_recent_server(server)
        util.trigger_callback('network_updated')
          807 
          808     def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check: Interface) -> bool:
          809         # main interface is exempt. this makes switching servers easier
          810         if iface_to_check.is_main_server():
          811             return True
          812         if not iface_to_check.bucket_based_on_ipaddress():
          813             return True
          814         # bucket connected interfaces
          815         with self.interfaces_lock:
          816             interfaces = list(self.interfaces.values())
          817         if iface_to_check in interfaces:
          818             interfaces.remove(iface_to_check)
          819         buckets = defaultdict(list)
          820         for iface in interfaces:
          821             buckets[iface.bucket_based_on_ipaddress()].append(iface)
          822         # check proposed server against buckets
          823         onion_servers = buckets[BUCKET_NAME_OF_ONION_SERVERS]
          824         if iface_to_check.is_tor():
          825             # keep number of onion servers below half of all connected servers
          826             if len(onion_servers) > NUM_TARGET_CONNECTED_SERVERS // 2:
          827                 return False
          828         else:
          829             bucket = iface_to_check.bucket_based_on_ipaddress()
          830             if len(buckets[bucket]) > 0:
          831                 return False
          832         return True
          833 
    def best_effort_reliable(func):
        """Decorator for Network coroutine methods: retry the request a few times.

        The wrapped coroutine is attempted up to 10 times. An attempt is
        skipped (after a short wait) while there is no main interface, and is
        retried after RequestTimedOut / RequestCorrupted (the interface used
        is closed first) or when the request task did not finish normally.
        Other exceptions raised by `func` are not caught here.

        Raises BestEffortRequestFailed once all attempts are used up.
        """
        @functools.wraps(func)
        async def make_reliable_wrapper(self: 'Network', *args, **kwargs):
            for i in range(10):
                iface = self.interface
                # retry until there is a main interface
                if not iface:
                    # wait for a main-interface change, bounded by ignore_after(1)
                    # (presumably a 1 second timeout -- TODO confirm)
                    async with ignore_after(1):
                        await self.default_server_changed_event.wait()
                    continue  # try again
                assert iface.ready.done(), "interface not ready yet"
                # try actual request
                try:
                    # run the request alongside a watcher for disconnection
                    # NOTE(review): wait=any presumably ends the group as soon as
                    # either task finishes -- confirm against the TaskGroup docs
                    async with TaskGroup(wait=any) as group:
                        task = await group.spawn(func(self, *args, **kwargs))
                        await group.spawn(iface.got_disconnected.wait())
                except RequestTimedOut:
                    await iface.close()
                    await iface.got_disconnected.wait()
                    continue  # try again
                except RequestCorrupted as e:
                    # TODO ban server?
                    iface.logger.exception(f"RequestCorrupted: {e}")
                    await iface.close()
                    await iface.got_disconnected.wait()
                    continue  # try again
                if task.done() and not task.cancelled():
                    return task.result()
                # otherwise; try again
            # all 10 attempts used up, whatever the reason for each failure
            raise BestEffortRequestFailed('no interface to do request on... gave up.')
        return make_reliable_wrapper
          865 
          866     def catch_server_exceptions(func):
          867         @functools.wraps(func)
          868         async def wrapper(self, *args, **kwargs):
          869             try:
          870                 return await func(self, *args, **kwargs)
          871             except aiorpcx.jsonrpc.CodeMessageError as e:
          872                 raise UntrustedServerReturnedError(original_exception=e) from e
          873         return wrapper
          874 
          875     @best_effort_reliable
          876     @catch_server_exceptions
          877     async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
          878         return await self.interface.get_merkle_for_transaction(tx_hash=tx_hash, tx_height=tx_height)
          879 
          880     @best_effort_reliable
          881     async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
          882         if timeout is None:
          883             timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
          884         # TODO: libbitcoin
          885         _ec = await self.interface.broadcast_transaction(tx.serialize(), timeout=timeout)
          886         if _ec != 0:
          887             raise TxBroadcastServerReturnedError(f"not validated, error: {_ec!r}")
          888 
          889     async def try_broadcasting(self, tx, name):
          890         try:
          891             await self.broadcast_transaction(tx)
          892         except Exception as e:
          893             self.logger.info(f'error: could not broadcast {name} {tx.txid()}, {str(e)}')
          894         else:
          895             self.logger.info(f'success: broadcasting {name} {tx.txid()}')
          896 
          897     @staticmethod
          898     def sanitize_tx_broadcast_response(server_msg) -> str:
          899         # Unfortunately, bitcoind and hence the Electrum protocol doesn't return a useful error code.
          900         # So, we use substring matching to grok the error message.
          901         # server_msg is untrusted input so it should not be shown to the user. see #4968
          902         server_msg = str(server_msg)
          903         server_msg = server_msg.replace("\n", r"\n")
          904         # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp
          905         # grep "reason ="
          906         policy_error_messages = {
          907             r"version": _("Transaction uses non-standard version."),
          908             r"tx-size": _("The transaction was rejected because it is too large (in bytes)."),
          909             r"scriptsig-size": None,
          910             r"scriptsig-not-pushonly": None,
          911             r"scriptpubkey":
          912                 ("scriptpubkey\n" +
          913                  _("Some of the outputs pay to a non-standard script.")),
          914             r"bare-multisig": None,
          915             r"dust":
          916                 (_("Transaction could not be broadcast due to dust outputs.\n"
          917                    "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n"
          918                    "Check the units, make sure you haven't confused e.g. mBTC and BTC.")),
          919             r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."),
          920         }
          921         for substring in policy_error_messages:
          922             if substring in server_msg:
          923                 msg = policy_error_messages[substring]
          924                 return msg if msg else substring
          925         # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp
          926         script_error_messages = {
          927             r"Script evaluated without error but finished with a false/empty top stack element",
          928             r"Script failed an OP_VERIFY operation",
          929             r"Script failed an OP_EQUALVERIFY operation",
          930             r"Script failed an OP_CHECKMULTISIGVERIFY operation",
          931             r"Script failed an OP_CHECKSIGVERIFY operation",
          932             r"Script failed an OP_NUMEQUALVERIFY operation",
          933             r"Script is too big",
          934             r"Push value size limit exceeded",
          935             r"Operation limit exceeded",
          936             r"Stack size limit exceeded",
          937             r"Signature count negative or greater than pubkey count",
          938             r"Pubkey count negative or limit exceeded",
          939             r"Opcode missing or not understood",
          940             r"Attempted to use a disabled opcode",
          941             r"Operation not valid with the current stack size",
          942             r"Operation not valid with the current altstack size",
          943             r"OP_RETURN was encountered",
          944             r"Invalid OP_IF construction",
          945             r"Negative locktime",
          946             r"Locktime requirement not satisfied",
          947             r"Signature hash type missing or not understood",
          948             r"Non-canonical DER signature",
          949             r"Data push larger than necessary",
          950             r"Only push operators allowed in signatures",
          951             r"Non-canonical signature: S value is unnecessarily high",
          952             r"Dummy CHECKMULTISIG argument must be zero",
          953             r"OP_IF/NOTIF argument must be minimal",
          954             r"Signature must be zero for failed CHECK(MULTI)SIG operation",
          955             r"NOPx reserved for soft-fork upgrades",
          956             r"Witness version reserved for soft-fork upgrades",
          957             r"Taproot version reserved for soft-fork upgrades",
          958             r"OP_SUCCESSx reserved for soft-fork upgrades",
          959             r"Public key version reserved for soft-fork upgrades",
          960             r"Public key is neither compressed or uncompressed",
          961             r"Stack size must be exactly one after execution",
          962             r"Extra items left on stack after execution",
          963             r"Witness program has incorrect length",
          964             r"Witness program was passed an empty witness",
          965             r"Witness program hash mismatch",
          966             r"Witness requires empty scriptSig",
          967             r"Witness requires only-redeemscript scriptSig",
          968             r"Witness provided for non-witness script",
          969             r"Using non-compressed keys in segwit",
          970             r"Invalid Schnorr signature size",
          971             r"Invalid Schnorr signature hash type",
          972             r"Invalid Schnorr signature",
          973             r"Invalid Taproot control block size",
          974             r"Too much signature validation relative to witness weight",
          975             r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript",
          976             r"OP_IF/NOTIF argument must be minimal in tapscript",
          977             r"Using OP_CODESEPARATOR in non-witness script",
          978             r"Signature is found in scriptCode",
          979         }
          980         for substring in script_error_messages:
          981             if substring in server_msg:
          982                 return substring
          983         # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp
          984         # grep "REJECT_"
          985         # grep "TxValidationResult"
          986         # should come after script_error.cpp (due to e.g. non-mandatory-script-verify-flag)
          987         validation_error_messages = {
          988             r"coinbase": None,
          989             r"tx-size-small": None,
          990             r"non-final": None,
          991             r"txn-already-in-mempool": None,
          992             r"txn-mempool-conflict": None,
          993             r"txn-already-known": None,
          994             r"non-BIP68-final": None,
          995             r"bad-txns-nonstandard-inputs": None,
          996             r"bad-witness-nonstandard": None,
          997             r"bad-txns-too-many-sigops": None,
          998             r"mempool min fee not met":
          999                 ("mempool min fee not met\n" +
         1000                  _("Your transaction is paying a fee that is so low that the bitcoin node cannot "
         1001                    "fit it into its mempool. The mempool is already full of hundreds of megabytes "
         1002                    "of transactions that all pay higher fees. Try to increase the fee.")),
         1003             r"min relay fee not met": None,
         1004             r"absurdly-high-fee": None,
         1005             r"max-fee-exceeded": None,
         1006             r"too-long-mempool-chain": None,
         1007             r"bad-txns-spends-conflicting-tx": None,
         1008             r"insufficient fee": ("insufficient fee\n" +
         1009                  _("Your transaction is trying to replace another one in the mempool but it "
         1010                    "does not meet the rules to do so. Try to increase the fee.")),
         1011             r"too many potential replacements": None,
         1012             r"replacement-adds-unconfirmed": None,
         1013             r"mempool full": None,
         1014             r"non-mandatory-script-verify-flag": None,
         1015             r"mandatory-script-verify-flag-failed": None,
         1016             r"Transaction check failed": None,
         1017         }
         1018         for substring in validation_error_messages:
         1019             if substring in server_msg:
         1020                 msg = validation_error_messages[substring]
         1021                 return msg if msg else substring
         1022         # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp
         1023         # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp
         1024         # grep "RPC_TRANSACTION"
         1025         # grep "RPC_DESERIALIZATION_ERROR"
         1026         rawtransaction_error_messages = {
         1027             r"Missing inputs": None,
         1028             r"Inputs missing or spent": None,
         1029             r"transaction already in block chain": None,
         1030             r"Transaction already in block chain": None,
         1031             r"TX decode failed": None,
         1032             r"Peer-to-peer functionality missing or disabled": None,
         1033             r"Transaction rejected by AcceptToMemoryPool": None,
         1034             r"AcceptToMemoryPool failed": None,
         1035             r"Fee exceeds maximum configured by user": None,
         1036         }
         1037         for substring in rawtransaction_error_messages:
         1038             if substring in server_msg:
         1039                 msg = rawtransaction_error_messages[substring]
         1040                 return msg if msg else substring
         1041         # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp
         1042         # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp
         1043         # grep "REJECT_"
         1044         # grep "TxValidationResult"
         1045         tx_verify_error_messages = {
         1046             r"bad-txns-vin-empty": None,
         1047             r"bad-txns-vout-empty": None,
         1048             r"bad-txns-oversize": None,
         1049             r"bad-txns-vout-negative": None,
         1050             r"bad-txns-vout-toolarge": None,
         1051             r"bad-txns-txouttotal-toolarge": None,
         1052             r"bad-txns-inputs-duplicate": None,
         1053             r"bad-cb-length": None,
         1054             r"bad-txns-prevout-null": None,
         1055             r"bad-txns-inputs-missingorspent":
         1056                 ("bad-txns-inputs-missingorspent\n" +
         1057                  _("You might have a local transaction in your wallet that this transaction "
         1058                    "builds on top. You need to either broadcast or remove the local tx.")),
         1059             r"bad-txns-premature-spend-of-coinbase": None,
         1060             r"bad-txns-inputvalues-outofrange": None,
         1061             r"bad-txns-in-belowout": None,
         1062             r"bad-txns-fee-outofrange": None,
         1063         }
         1064         for substring in tx_verify_error_messages:
         1065             if substring in server_msg:
         1066                 msg = tx_verify_error_messages[substring]
         1067                 return msg if msg else substring
         1068         # otherwise:
         1069         return _("Unknown error")
         1070 
         1071     @best_effort_reliable
         1072     @catch_server_exceptions
         1073     async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
         1074         return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early)
         1075 
         1076     @best_effort_reliable
         1077     @catch_server_exceptions
         1078     async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
         1079         return await self.interface.get_transaction(tx_hash=tx_hash, timeout=timeout)
         1080 
         1081     @best_effort_reliable
         1082     @catch_server_exceptions
         1083     async def get_history_for_scripthash(self, sh: str) -> List[dict]:
         1084         return await self.interface.get_history_for_scripthash(sh)
         1085 
         1086     @best_effort_reliable
         1087     @catch_server_exceptions
         1088     async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
         1089         return await self.interface.listunspent_for_scripthash(sh)
         1090 
         1091     @best_effort_reliable
         1092     @catch_server_exceptions
         1093     async def get_balance_for_scripthash(self, sh: str) -> dict:
         1094         return await self.interface.get_balance_for_scripthash(sh)
         1095 
         1096     @best_effort_reliable
         1097     @catch_server_exceptions
         1098     async def get_txid_from_txpos(self, tx_height, tx_pos, merkle):
         1099         return await self.interface.get_txid_from_txpos(tx_height, tx_pos, merkle)
         1100 
         1101     def blockchain(self) -> Blockchain:
         1102         interface = self.interface
         1103         if interface and interface.blockchain is not None:
         1104             self._blockchain = interface.blockchain
         1105         return self._blockchain
         1106 
         1107     def get_blockchains(self):
         1108         out = {}  # blockchain_id -> list(interfaces)
         1109         with blockchain.blockchains_lock: blockchain_items = list(blockchain.blockchains.items())
         1110         with self.interfaces_lock: interfaces_values = list(self.interfaces.values())
         1111         for chain_id, bc in blockchain_items:
         1112             r = list(filter(lambda i: i.blockchain==bc, interfaces_values))
         1113             if r:
         1114                 out[chain_id] = r
         1115         return out
         1116 
         1117     def _set_preferred_chain(self, chain: Optional[Blockchain]):
         1118         if chain:
         1119             height = chain.get_max_forkpoint()
         1120             header_hash = chain.get_hash(height)
         1121         else:
         1122             height = 0
         1123             header_hash = constants.net.GENESIS
         1124         self._blockchain_preferred_block = {
         1125             'height': height,
         1126             'hash': header_hash,
         1127         }
         1128         self.config.set_key('blockchain_preferred_block', self._blockchain_preferred_block)
         1129 
         1130     async def follow_chain_given_id(self, chain_id: str) -> None:
         1131         bc = blockchain.blockchains.get(chain_id)
         1132         if not bc:
         1133             raise Exception('blockchain {} not found'.format(chain_id))
         1134         self._set_preferred_chain(bc)
         1135         # select server on this chain
         1136         with self.interfaces_lock: interfaces = list(self.interfaces.values())
         1137         interfaces_on_selected_chain = list(filter(lambda iface: iface.blockchain == bc, interfaces))
         1138         if len(interfaces_on_selected_chain) == 0: return
         1139         chosen_iface = random.choice(interfaces_on_selected_chain)  # type: Interface
         1140         # switch to server (and save to config)
         1141         net_params = self.get_parameters()
         1142         net_params = net_params._replace(server=chosen_iface.server)
         1143         await self.set_parameters(net_params)
         1144 
         1145     async def follow_chain_given_server(self, server: ServerAddr) -> None:
         1146         # note that server_str should correspond to a connected interface
         1147         iface = self.interfaces.get(server)
         1148         if iface is None:
         1149             return
         1150         self._set_preferred_chain(iface.blockchain)
         1151         # switch to server (and save to config)
         1152         net_params = self.get_parameters()
         1153         net_params = net_params._replace(server=server)
         1154         await self.set_parameters(net_params)
         1155 
         1156     def get_server_height(self) -> int:
         1157         """Length of header chain, as claimed by main interface."""
         1158         interface = self.interface
         1159         return interface.tip if interface else 0
         1160 
         1161     def get_local_height(self):
         1162         """Length of header chain, POW-verified.
         1163         In case of a chain split, this is for the branch the main interface is on,
         1164         but it is the tip of that branch (even if main interface is behind).
         1165         """
         1166         return self.blockchain().height()
         1167 
         1168     def export_checkpoints(self, path):
         1169         """Run manually to generate blockchain checkpoints.
         1170         Kept for console use only.
         1171         """
         1172         cp = self.blockchain().get_checkpoints()
         1173         with open(path, 'w', encoding='utf-8') as f:
         1174             f.write(json.dumps(cp, indent=4))
         1175 
         1176     async def _start(self):
         1177         assert not self.taskgroup
         1178         self.taskgroup = taskgroup = SilentTaskGroup()
         1179         assert not self.interface and not self.interfaces
         1180         assert not self._connecting_ifaces
         1181         assert not self._closing_ifaces
         1182         self.logger.info('starting network')
         1183         self._clear_addr_retry_times()
         1184         self._set_proxy(deserialize_proxy(self.config.get('proxy')))
         1185         self._maybe_set_oneserver()
         1186         await self.taskgroup.spawn(self._run_new_interface(self.default_server))
         1187 
         1188         async def main():
         1189             self.logger.info("starting taskgroup.")
         1190             try:
         1191                 # note: if a task finishes with CancelledError, that
         1192                 # will NOT raise, and the group will keep the other tasks running
         1193                 async with taskgroup as group:
         1194                     await group.spawn(self._maintain_sessions())
         1195                     [await group.spawn(job) for job in self._jobs]
         1196             except asyncio.CancelledError:
         1197                 raise
         1198             except Exception as e:
         1199                 self.logger.exception("taskgroup died.")
         1200             finally:
         1201                 self.logger.info("taskgroup stopped.")
         1202         asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop)
         1203 
         1204         util.trigger_callback('network_updated')
         1205 
         1206     def start(self, jobs: Iterable = None):
         1207         """Schedule starting the network, along with the given job co-routines.
         1208 
         1209         Note: the jobs will *restart* every time the network restarts, e.g. on proxy
         1210         setting changes.
         1211         """
         1212         self._jobs = jobs or []
         1213         asyncio.run_coroutine_threadsafe(self._start(), self.asyncio_loop)
         1214 
         1215     @log_exceptions
         1216     async def stop(self, *, full_shutdown: bool = True):
         1217         self.logger.info("stopping network")
         1218         # timeout: if full_shutdown, it is up to the caller to time us out,
         1219         #          otherwise if e.g. restarting due to proxy changes, we time out fast
         1220         async with (nullcontext() if full_shutdown else ignore_after(1)):
         1221             async with TaskGroup() as group:
         1222                 await group.spawn(self.taskgroup.cancel_remaining())
         1223                 if full_shutdown:
         1224                     await group.spawn(self.stop_gossip(full_shutdown=full_shutdown))
         1225         self.taskgroup = None
         1226         self.interface = None
         1227         self.interfaces = {}
         1228         self._connecting_ifaces.clear()
         1229         self._closing_ifaces.clear()
         1230         if not full_shutdown:
         1231             util.trigger_callback('network_updated')
         1232 
         1233     async def _ensure_there_is_a_main_interface(self):
         1234         if self.is_connected():
         1235             return
         1236         # if auto_connect is set, try a different server
         1237         if self.auto_connect and not self.is_connecting():
         1238             await self._switch_to_random_interface()
         1239         # if auto_connect is not set, or still no main interface, retry current
         1240         if not self.is_connected() and not self.is_connecting():
         1241             if self._can_retry_addr(self.default_server, urgent=True):
         1242                 await self.switch_to_interface(self.default_server)
         1243 
         1244     async def _maintain_sessions(self):
         1245         async def maybe_start_new_interfaces():
         1246             num_existing_ifaces = len(self.interfaces) + len(self._connecting_ifaces) + len(self._closing_ifaces)
         1247             for i in range(self.num_server - num_existing_ifaces):
         1248                 # FIXME this should try to honour "healthy spread of connected servers"
         1249                 server = self._get_next_server_to_try()
         1250                 if server:
         1251                     await self.taskgroup.spawn(self._run_new_interface(server))
         1252         async def maintain_healthy_spread_of_connected_servers():
         1253             with self.interfaces_lock: interfaces = list(self.interfaces.values())
         1254             random.shuffle(interfaces)
         1255             for iface in interfaces:
         1256                 if not self.check_interface_against_healthy_spread_of_connected_servers(iface):
         1257                     self.logger.info(f"disconnecting from {iface.server}. too many connected "
         1258                                      f"servers already in bucket {iface.bucket_based_on_ipaddress()}")
         1259                     await self._close_interface(iface)
         1260         async def maintain_main_interface():
         1261             await self._ensure_there_is_a_main_interface()
         1262             if self.is_connected():
         1263                 if self.config.is_fee_estimates_update_required():
         1264                     await self.interface.taskgroup.spawn(self._request_fee_estimates, self.interface)
         1265 
         1266         while True:
         1267             try:
         1268                 await maybe_start_new_interfaces()
         1269                 await maintain_healthy_spread_of_connected_servers()
         1270                 await maintain_main_interface()
         1271             except asyncio.CancelledError:
         1272                 # suppress spurious cancellations
         1273                 group = self.taskgroup
         1274                 if not group or group.closed():
         1275                     raise
         1276             await asyncio.sleep(0.1)
         1277 
         1278     @classmethod
         1279     async def _send_http_on_proxy(cls, method: str, url: str, params: str = None,
         1280                                   body: bytes = None, json: dict = None, headers=None,
         1281                                   on_finish=None, timeout=None):
         1282         async def default_on_finish(resp: ClientResponse):
         1283             resp.raise_for_status()
         1284             return await resp.text()
         1285         if headers is None:
         1286             headers = {}
         1287         if on_finish is None:
         1288             on_finish = default_on_finish
         1289         network = cls.get_instance()
         1290         proxy = network.proxy if network else None
         1291         async with make_aiohttp_session(proxy, timeout=timeout) as session:
         1292             if method == 'get':
         1293                 async with session.get(url, params=params, headers=headers) as resp:
         1294                     return await on_finish(resp)
         1295             elif method == 'post':
         1296                 assert body is not None or json is not None, 'body or json must be supplied if method is post'
         1297                 if body is not None:
         1298                     async with session.post(url, data=body, headers=headers) as resp:
         1299                         return await on_finish(resp)
         1300                 elif json is not None:
         1301                     async with session.post(url, json=json, headers=headers) as resp:
         1302                         return await on_finish(resp)
         1303             else:
         1304                 assert False
         1305 
         1306     @classmethod
         1307     def send_http_on_proxy(cls, method, url, **kwargs):
         1308         network = cls.get_instance()
         1309         if network:
         1310             assert network._loop_thread is not threading.currentThread()
         1311             loop = network.asyncio_loop
         1312         else:
         1313             loop = asyncio.get_event_loop()
         1314         coro = asyncio.run_coroutine_threadsafe(cls._send_http_on_proxy(method, url, **kwargs), loop)
         1315         # note: _send_http_on_proxy has its own timeout, so no timeout here:
         1316         return coro.result()
         1317 
         1318     # methods used in scripts
         1319     async def get_peers(self):
         1320         while not self.is_connected():
         1321             await asyncio.sleep(1)
         1322         session = self.interface.session
         1323         # TODO: libbitcoin
         1324         return parse_servers(await session.send_request('server.peers.subscribe'))
         1325 
         1326     async def send_multiple_requests(self, servers: Sequence[ServerAddr], method: str, params: Sequence):
         1327         responses = dict()
         1328         async def get_response(server: ServerAddr):
         1329             interface = Interface(network=self, server=server, proxy=self.proxy)
         1330             timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
         1331             try:
         1332                 await asyncio.wait_for(interface.ready, timeout)
         1333             except BaseException as e:
         1334                 await interface.close()
         1335                 return
         1336             try:
         1337                 # TODO: libbitcoin XXX:
         1338                 res = await interface.session.send_request(method, params, timeout=10)
         1339             except Exception as e:
         1340                 res = e
         1341             responses[interface.server] = res
         1342         async with TaskGroup() as group:
         1343             for server in servers:
         1344                 await group.spawn(get_response(server))
         1345         return responses