URI: 
       tnetwork: clean-up. make external API clear. rm interface_lock (mostly). - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 952e9b87e14b93dd82b603ffdd749a1d8a48212b
   DIR parent 7cc628dc7909c3ff44a5e5dfc0de60bc8e438ce1
  HTML Author: SomberNight <somber.night@protonmail.com>
       Date:   Tue, 25 Sep 2018 16:38:26 +0200
       
       network: clean-up. make external API clear. rm interface_lock (mostly).
       
       Diffstat:
         M electrum/commands.py                |       2 +-
         M electrum/daemon.py                  |       7 ++++---
         M electrum/gui/kivy/main_window.py    |      15 +++++++++------
         M electrum/gui/kivy/uix/dialogs/sett… |       5 +++--
         M electrum/gui/kivy/uix/ui_screens/p… |       2 +-
         M electrum/gui/kivy/uix/ui_screens/s… |       2 +-
         M electrum/gui/qt/main_window.py      |       3 ++-
         M electrum/gui/qt/network_dialog.py   |      16 ++++++++--------
         M electrum/gui/stdio.py               |       3 ++-
         M electrum/gui/text.py                |       6 ++++--
         M electrum/interface.py               |      44 +++++++++++++------------------
         M electrum/network.py                 |     421 +++++++++++++++----------------
         M electrum/plugin.py                  |       1 +
         M electrum/verifier.py                |      11 +++--------
       
       14 files changed, 255 insertions(+), 283 deletions(-)
       ---
   DIR diff --git a/electrum/commands.py b/electrum/commands.py
       t@@ -255,7 +255,7 @@ class Commands:
            def broadcast(self, tx):
                """Broadcast a transaction to the network. """
                tx = Transaction(tx)
       -        return self.network.broadcast_transaction_from_non_network_thread(tx)
       +        return self.network.run_from_another_thread(self.network.broadcast_transaction(tx))
        
            @command('')
            def createmultisig(self, num, pubkeys):
   DIR diff --git a/electrum/daemon.py b/electrum/daemon.py
       t@@ -28,11 +28,11 @@ import os
        import time
        import traceback
        import sys
       +import threading
        
       -# from jsonrpc import JSONRPCResponseManager
        import jsonrpclib
       -from .jsonrpc import VerifyingJSONRPCServer
        
       +from .jsonrpc import VerifyingJSONRPCServer
        from .version import ELECTRUM_VERSION
        from .network import Network
        from .util import json_decode, DaemonThread
       t@@ -129,7 +129,7 @@ class Daemon(DaemonThread):
                    self.network = Network(config)
                self.fx = FxThread(config, self.network)
                if self.network:
       -            self.network.start(self.fx.run())
       +            self.network.start([self.fx.run])
                self.gui = None
                self.wallets = {}
                # Setup JSONRPC server
       t@@ -308,6 +308,7 @@ class Daemon(DaemonThread):
                    gui_name = 'qt'
                gui = __import__('electrum.gui.' + gui_name, fromlist=['electrum'])
                self.gui = gui.ElectrumGui(config, self, plugins)
       +        threading.current_thread().setName('GUI')
                try:
                    self.gui.main()
                except BaseException as e:
   DIR diff --git a/electrum/gui/kivy/main_window.py b/electrum/gui/kivy/main_window.py
       t@@ -16,6 +16,7 @@ from electrum.plugin import run_hook
        from electrum.util import format_satoshis, format_satoshis_plain
        from electrum.paymentrequest import PR_UNPAID, PR_PAID, PR_UNKNOWN, PR_EXPIRED
        from electrum import blockchain
       +from electrum.network import Network
        from .i18n import _
        
        from kivy.app import App
       t@@ -96,7 +97,7 @@ class ElectrumWindow(App):
            def on_auto_connect(self, instance, x):
                net_params = self.network.get_parameters()
                net_params = net_params._replace(auto_connect=self.auto_connect)
       -        self.network.set_parameters(net_params)
       +        self.network.run_from_another_thread(self.network.set_parameters(net_params))
            def toggle_auto_connect(self, x):
                self.auto_connect = not self.auto_connect
        
       t@@ -116,9 +117,10 @@ class ElectrumWindow(App):
                from .uix.dialogs.choice_dialog import ChoiceDialog
                chains = self.network.get_blockchains()
                def cb(name):
       -            for index, b in blockchain.blockchains.items():
       +            with blockchain.blockchains_lock: blockchain_items = list(blockchain.blockchains.items())
       +            for index, b in blockchain_items:
                        if name == b.get_name():
       -                    self.network.follow_chain(index)
       +                    self.network.run_from_another_thread(self.network.follow_chain(index))
                names = [blockchain.blockchains[b].get_name() for b in chains]
                if len(names) > 1:
                    cur_chain = self.network.blockchain().get_name()
       t@@ -265,7 +267,7 @@ class ElectrumWindow(App):
                title = _('Electrum App')
                self.electrum_config = config = kwargs.get('config', None)
                self.language = config.get('language', 'en')
       -        self.network = network = kwargs.get('network', None)
       +        self.network = network = kwargs.get('network', None)  # type: Network
                if self.network:
                    self.num_blocks = self.network.get_local_height()
                    self.num_nodes = len(self.network.get_interfaces())
       t@@ -708,7 +710,7 @@ class ElectrumWindow(App):
                    status = _("Offline")
                elif self.network.is_connected():
                    server_height = self.network.get_server_height()
       -            server_lag = self.network.get_local_height() - server_height
       +            server_lag = self.num_blocks - server_height
                    if not self.wallet.up_to_date or server_height == 0:
                        status = _("Synchronizing...")
                    elif server_lag > 1:
       t@@ -885,7 +887,8 @@ class ElectrumWindow(App):
                Clock.schedule_once(lambda dt: on_success(tx))
        
            def _broadcast_thread(self, tx, on_complete):
       -        ok, txid = self.network.broadcast_transaction_from_non_network_thread(tx)
       +        ok, txid = self.network.run_from_another_thread(
       +            self.network.broadcast_transaction(tx))
                Clock.schedule_once(lambda dt: on_complete(ok, txid))
        
            def broadcast(self, tx, pr=None):
   DIR diff --git a/electrum/gui/kivy/uix/dialogs/settings.py b/electrum/gui/kivy/uix/dialogs/settings.py
       t@@ -159,8 +159,9 @@ class SettingsDialog(Factory.Popup):
                return proxy.get('host') +':' + proxy.get('port') if proxy else _('None')
        
            def proxy_dialog(self, item, dt):
       +        network = self.app.network
                if self._proxy_dialog is None:
       -            net_params = self.app.network.get_parameters()
       +            net_params = network.get_parameters()
                    proxy = net_params.proxy
                    def callback(popup):
                        nonlocal net_params
       t@@ -175,7 +176,7 @@ class SettingsDialog(Factory.Popup):
                        else:
                            proxy = None
                        net_params = net_params._replace(proxy=proxy)
       -                self.app.network.set_parameters(net_params)
       +                network.run_from_another_thread(network.set_parameters(net_params))
                        item.status = self.proxy_status()
                    popup = Builder.load_file('electrum/gui/kivy/uix/ui_screens/proxy.kv')
                    popup.ids.mode.text = proxy.get('mode') if proxy else 'None'
   DIR diff --git a/electrum/gui/kivy/uix/ui_screens/proxy.kv b/electrum/gui/kivy/uix/ui_screens/proxy.kv
       t@@ -72,6 +72,6 @@ Popup:
                            proxy['password']=str(root.ids.password.text)
                            if proxy['mode']=='none': proxy = None
                            net_params = net_params._replace(proxy=proxy)
       -                    app.network.set_parameters(net_params)
       +                    app.network.run_from_another_thread(app.network.set_parameters(net_params))
                            app.proxy_config = proxy if proxy else {}
                            nd.dismiss()
   DIR diff --git a/electrum/gui/kivy/uix/ui_screens/server.kv b/electrum/gui/kivy/uix/ui_screens/server.kv
       t@@ -58,5 +58,5 @@ Popup:
                        on_release:
                            net_params = app.network.get_parameters()
                            net_params = net_params._replace(host=str(root.ids.host.text), port=str(root.ids.port.text))
       -                    app.network.set_parameters(net_params)
       +                    app.network.run_from_another_thread(app.network.set_parameters(net_params))
                            nd.dismiss()
   DIR diff --git a/electrum/gui/qt/main_window.py b/electrum/gui/qt/main_window.py
       t@@ -1635,7 +1635,8 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError):
                    if pr and pr.has_expired():
                        self.payment_request = None
                        return False, _("Payment request has expired")
       -            status, msg = self.network.broadcast_transaction_from_non_network_thread(tx)
       +            status, msg = self.network.run_from_another_thread(
       +                self.network.broadcast_transaction(tx))
                    if pr and status is True:
                        self.invoices.set_paid(pr, tx.txid())
                        self.invoices.save()
   DIR diff --git a/electrum/gui/qt/network_dialog.py b/electrum/gui/qt/network_dialog.py
       t@@ -34,6 +34,7 @@ from electrum.i18n import _
        from electrum import constants, blockchain
        from electrum.util import print_error
        from electrum.interface import serialize_server, deserialize_server
       +from electrum.network import Network
        
        from .util import *
        
       t@@ -97,7 +98,7 @@ class NodesListWidget(QTreeWidget):
                pt.setX(50)
                self.customContextMenuRequested.emit(pt)
        
       -    def update(self, network):
       +    def update(self, network: Network):
                self.clear()
                self.addChild = self.addTopLevelItem
                chains = network.get_blockchains()
       t@@ -187,7 +188,7 @@ class ServerListWidget(QTreeWidget):
        
        class NetworkChoiceLayout(object):
        
       -    def __init__(self, network, config, wizard=False):
       +    def __init__(self, network: Network, config, wizard=False):
                self.network = network
                self.config = config
                self.protocol = None
       t@@ -361,7 +362,7 @@ class NetworkChoiceLayout(object):
                status = _("Connected to {0} nodes.").format(n) if n else _("Not connected")
                self.status_label.setText(status)
                chains = self.network.get_blockchains()
       -        if len(chains)>1:
       +        if len(chains) > 1:
                    chain = self.network.blockchain()
                    forkpoint = chain.get_forkpoint()
                    name = chain.get_name()
       t@@ -410,15 +411,14 @@ class NetworkChoiceLayout(object):
                self.set_server()
        
            def follow_branch(self, index):
       -        self.network.follow_chain(index)
       +        self.network.run_from_another_thread(self.network.follow_chain(index))
                self.update()
        
            def follow_server(self, server):
       -        self.network.switch_to_interface(server)
                net_params = self.network.get_parameters()
                host, port, protocol = deserialize_server(server)
                net_params = net_params._replace(host=host, port=port, protocol=protocol)
       -        self.network.set_parameters(net_params)
       +        self.network.run_from_another_thread(self.network.set_parameters(net_params))
                self.update()
        
            def server_changed(self, x):
       t@@ -451,7 +451,7 @@ class NetworkChoiceLayout(object):
                net_params = net_params._replace(host=str(self.server_host.text()),
                                                 port=str(self.server_port.text()),
                                                 auto_connect=self.autoconnect_cb.isChecked())
       -        self.network.set_parameters(net_params)
       +        self.network.run_from_another_thread(self.network.set_parameters(net_params))
        
            def set_proxy(self):
                net_params = self.network.get_parameters()
       t@@ -465,7 +465,7 @@ class NetworkChoiceLayout(object):
                    proxy = None
                    self.tor_cb.setChecked(False)
                net_params = net_params._replace(proxy=proxy)
       -        self.network.set_parameters(net_params)
       +        self.network.run_from_another_thread(self.network.set_parameters(net_params))
        
            def suggest_proxy(self, found_proxy):
                self.tor_proxy = found_proxy
   DIR diff --git a/electrum/gui/stdio.py b/electrum/gui/stdio.py
       t@@ -200,7 +200,8 @@ class ElectrumGui:
                    self.wallet.labels[tx.txid()] = self.str_description
        
                print(_("Please wait..."))
       -        status, msg = self.network.broadcast_transaction_from_non_network_thread(tx)
       +        status, msg = self.network.run_from_another_thread(
       +            self.network.broadcast_transaction(tx))
        
                if status:
                    print(_('Payment sent.'))
   DIR diff --git a/electrum/gui/text.py b/electrum/gui/text.py
       t@@ -365,7 +365,8 @@ class ElectrumGui:
                    self.wallet.labels[tx.txid()] = self.str_description
        
                self.show_message(_("Please wait..."), getchar=False)
       -        status, msg = self.network.broadcast_transaction_from_non_network_thread(tx)
       +        status, msg = self.network.run_from_another_thread(
       +            self.network.broadcast_transaction(tx))
        
                if status:
                    self.show_message(_('Payment sent.'))
       t@@ -410,7 +411,8 @@ class ElectrumGui:
                                return False
                    if out.get('server') or out.get('proxy'):
                        proxy = electrum.network.deserialize_proxy(out.get('proxy')) if out.get('proxy') else proxy_config
       -                self.network.set_parameters(NetworkParameters(host, port, protocol, proxy, auto_connect))
       +                net_params = NetworkParameters(host, port, protocol, proxy, auto_connect)
       +                self.network.run_from_another_thread(self.network.set_parameters(net_params))
        
            def settings_dialog(self):
                fee = str(Decimal(self.config.fee_per_kb()) / COIN)
   DIR diff --git a/electrum/interface.py b/electrum/interface.py
       t@@ -107,11 +107,7 @@ class NotificationSession(ClientSession):
        
        
        class GracefulDisconnect(Exception): pass
       -
       -
        class ErrorParsingSSLCert(Exception): pass
       -
       -
        class ErrorGettingSSLCertFromServer(Exception): pass
        
        
       t@@ -150,8 +146,11 @@ class Interface(PrintError):
                self.tip_header = None
                self.tip = 0
        
       -        # TODO combine?
       -        self.fut = asyncio.get_event_loop().create_task(self.run())
       +        # note that an interface dying MUST NOT kill the whole network,
       +        # hence exceptions raised by "run" need to be caught not to kill
       +        # main_taskgroup! the aiosafe decorator does this.
       +        asyncio.run_coroutine_threadsafe(
       +            self.network.main_taskgroup.spawn(self.run()), self.network.asyncio_loop)
                self.group = SilentTaskGroup()
        
            def diagnostic_name(self):
       t@@ -239,31 +238,29 @@ class Interface(PrintError):
                    sslc.check_hostname = 0
                return sslc
        
       -    def handle_graceful_disconnect(func):
       +    def handle_disconnect(func):
                async def wrapper_func(self, *args, **kwargs):
                    try:
                        return await func(self, *args, **kwargs)
                    except GracefulDisconnect as e:
                        self.print_error("disconnecting gracefully. {}".format(e))
       -                self.exception = e
       +            finally:
       +                await self.network.connection_down(self.server)
                return wrapper_func
        
            @aiosafe
       -    @handle_graceful_disconnect
       +    @handle_disconnect
            async def run(self):
                try:
                    ssl_context = await self._get_ssl_context()
                except (ErrorParsingSSLCert, ErrorGettingSSLCertFromServer) as e:
       -            self.exception = e
       +            self.print_error('disconnecting due to: {} {}'.format(e, type(e)))
                    return
                try:
                    await self.open_session(ssl_context, exit_early=False)
                except (asyncio.CancelledError, OSError, aiorpcx.socks.SOCKSFailure) as e:
                    self.print_error('disconnecting due to: {} {}'.format(e, type(e)))
       -            self.exception = e
                    return
       -        # should never get here (can only exit via exception)
       -        assert False
        
            def mark_ready(self):
                if self.ready.cancelled():
       t@@ -352,9 +349,9 @@ class Interface(PrintError):
                    self.print_error("connection established. version: {}".format(ver))
        
                    async with self.group as group:
       -                await group.spawn(self.ping())
       -                await group.spawn(self.run_fetch_blocks())
       -                await group.spawn(self.monitor_connection())
       +                await group.spawn(self.ping)
       +                await group.spawn(self.run_fetch_blocks)
       +                await group.spawn(self.monitor_connection)
                        # NOTE: group.__aexit__ will be called here; this is needed to notice exceptions in the group!
        
            async def monitor_connection(self):
       t@@ -368,11 +365,8 @@ class Interface(PrintError):
                    await asyncio.sleep(300)
                    await self.session.send_request('server.ping')
        
       -    def close(self):
       -        async def job():
       -            self.fut.cancel()
       -            await self.group.cancel_remaining()
       -        asyncio.run_coroutine_threadsafe(job(), self.network.asyncio_loop)
       +    async def close(self):
       +        await self.group.cancel_remaining()
        
            async def run_fetch_blocks(self):
                header_queue = asyncio.Queue()
       t@@ -389,7 +383,7 @@ class Interface(PrintError):
                    self.mark_ready()
                    await self._process_header_at_tip()
                    self.network.trigger_callback('network_updated')
       -            self.network.switch_lagging_interface()
       +            await self.network.switch_lagging_interface()
        
            async def _process_header_at_tip(self):
                height, header = self.tip, self.tip_header
       t@@ -517,7 +511,7 @@ class Interface(PrintError):
                        return 'fork_conflict', height
                    self.print_error('forkpoint conflicts with existing fork', branch.path())
                    self._raise_if_fork_conflicts_with_default_server(branch)
       -            self._disconnect_from_interfaces_on_conflicting_blockchain(branch)
       +            await self._disconnect_from_interfaces_on_conflicting_blockchain(branch)
                    branch.write(b'', 0)
                    branch.save_header(bad_header)
                    self.blockchain = branch
       t@@ -543,8 +537,8 @@ class Interface(PrintError):
                if chain_to_delete == chain_of_default_server:
                    raise GracefulDisconnect('refusing to overwrite blockchain of default server')
        
       -    def _disconnect_from_interfaces_on_conflicting_blockchain(self, chain: Blockchain) -> None:
       -        ifaces = self.network.disconnect_from_interfaces_on_given_blockchain(chain)
       +    async def _disconnect_from_interfaces_on_conflicting_blockchain(self, chain: Blockchain) -> None:
       +        ifaces = await self.network.disconnect_from_interfaces_on_given_blockchain(chain)
                if not ifaces: return
                servers = [interface.server for interface in ifaces]
                self.print_error("forcing disconnect of other interfaces: {}".format(servers))
   DIR diff --git a/electrum/network.py b/electrum/network.py
       t@@ -32,18 +32,19 @@ import json
        import sys
        import ipaddress
        import asyncio
       -from typing import NamedTuple, Optional, Sequence
       +from typing import NamedTuple, Optional, Sequence, List
       +import traceback
        
        import dns
        import dns.resolver
        from aiorpcx import TaskGroup
        
        from . import util
       -from .util import PrintError, print_error, aiosafe, bfh
       +from .util import PrintError, print_error, aiosafe, bfh, SilentTaskGroup
        from .bitcoin import COIN
        from . import constants
        from . import blockchain
       -from .blockchain import Blockchain
       +from .blockchain import Blockchain, HEADER_SIZE
        from .interface import Interface, serialize_server, deserialize_server
        from .version import PROTOCOL_VERSION
        from .simple_config import SimpleConfig
       t@@ -160,14 +161,6 @@ INSTANCE = None
        class Network(PrintError):
            """The Network class manages a set of connections to remote electrum
            servers, each connected socket is handled by an Interface() object.
       -    Connections are initiated by a Connection() thread which stops once
       -    the connection succeeds or fails.
       -
       -    Our external API:
       -
       -    - Member functions get_header(), get_interfaces(), get_local_height(),
       -          get_parameters(), get_server_height(), get_status_value(),
       -          is_connected(), set_parameters(), stop()
            """
            verbosity_filter = 'n'
        
       t@@ -195,14 +188,18 @@ class Network(PrintError):
                if not self.default_server:
                    self.default_server = pick_random_server()
        
       -        # locks: if you need to take multiple ones, acquire them in the order they are defined here!
       +        self.main_taskgroup = None
       +        self._jobs = []
       +
       +        # locks
       +        self.restart_lock = asyncio.Lock()
                self.bhi_lock = asyncio.Lock()
       -        self.interface_lock = threading.RLock()            # <- re-entrant
                self.callback_lock = threading.Lock()
                self.recent_servers_lock = threading.RLock()       # <- re-entrant
       +        self.interfaces_lock = threading.Lock()            # for mutating/iterating self.interfaces
        
                self.server_peers = {}  # returned by interface (servers that the main interface knows about)
       -        self.recent_servers = self.read_recent_servers()  # note: needs self.recent_servers_lock
       +        self.recent_servers = self._read_recent_servers()  # note: needs self.recent_servers_lock
        
                self.banner = ''
                self.donation_address = ''
       t@@ -219,26 +216,30 @@ class Network(PrintError):
                # kick off the network.  interface is the main server we are currently
                # communicating with.  interfaces is the set of servers we are connecting
                # to or have an ongoing connection with
       -        self.interface = None              # note: needs self.interface_lock
       -        self.interfaces = {}               # note: needs self.interface_lock
       +        self.interface = None
       +        self.interfaces = {}
                self.auto_connect = self.config.get('auto_connect', True)
                self.connecting = set()
                self.server_queue = None
       -        self.server_queue_group = None
       +        self.proxy = None
       +
                self.asyncio_loop = asyncio.get_event_loop()
       -        self.start_network(deserialize_server(self.default_server)[2],
       -                           deserialize_proxy(self.config.get('proxy')))
       +        #self.asyncio_loop.set_debug(1)
       +        self._run_forever = asyncio.Future()
       +        self._thread = threading.Thread(target=self.asyncio_loop.run_until_complete,
       +                                                args=(self._run_forever,),
       +                                                name='Network')
       +        self._thread.start()
       +
       +    def run_from_another_thread(self, coro):
       +        assert self._thread != threading.current_thread(), 'must not be called from network thread'
       +        fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop)
       +        return fut.result()
        
            @staticmethod
            def get_instance():
                return INSTANCE
        
       -    def with_interface_lock(func):
       -        def func_wrapper(self, *args, **kwargs):
       -            with self.interface_lock:
       -                return func(self, *args, **kwargs)
       -        return func_wrapper
       -
            def with_recent_servers_lock(func):
                def func_wrapper(self, *args, **kwargs):
                    with self.recent_servers_lock:
       t@@ -266,7 +267,7 @@ class Network(PrintError):
                    else:
                        self.asyncio_loop.call_soon_threadsafe(callback, event, *args)
        
       -    def read_recent_servers(self):
       +    def _read_recent_servers(self):
                if not self.config.path:
                    return []
                path = os.path.join(self.config.path, "recent_servers")
       t@@ -278,7 +279,7 @@ class Network(PrintError):
                    return []
        
            @with_recent_servers_lock
       -    def save_recent_servers(self):
       +    def _save_recent_servers(self):
                if not self.config.path:
                    return
                path = os.path.join(self.config.path, "recent_servers")
       t@@ -289,11 +290,11 @@ class Network(PrintError):
                except:
                    pass
        
       -    @with_interface_lock
            def get_server_height(self):
       -        return self.interface.tip if self.interface else 0
       +        interface = self.interface
       +        return interface.tip if interface else 0
        
       -    def server_is_lagging(self):
       +    async def _server_is_lagging(self):
                sh = self.get_server_height()
                if not sh:
                    self.print_error('no height for main interface')
       t@@ -304,7 +305,7 @@ class Network(PrintError):
                    self.print_error('%s is lagging (%d vs %d)' % (self.default_server, sh, lh))
                return result
        
       -    def set_status(self, status):
       +    def _set_status(self, status):
                self.connection_status = status
                self.notify('status')
        
       t@@ -315,7 +316,7 @@ class Network(PrintError):
            def is_connecting(self):
                return self.connection_status == 'connecting'
        
       -    async def request_server_info(self, interface):
       +    async def _request_server_info(self, interface):
                await interface.ready
                session = interface.session
        
       t@@ -340,9 +341,9 @@ class Network(PrintError):
                    await group.spawn(get_donation_address)
                    await group.spawn(get_server_peers)
                    await group.spawn(get_relay_fee)
       -            await group.spawn(self.request_fee_estimates(interface))
       +            await group.spawn(self._request_fee_estimates(interface))
        
       -    async def request_fee_estimates(self, interface):
       +    async def _request_fee_estimates(self, interface):
                session = interface.session
                from .simple_config import FEE_ETA_TARGETS
                self.config.requested_fee_estimates()
       t@@ -389,10 +390,10 @@ class Network(PrintError):
                if self.is_connected():
                    return self.donation_address
        
       -    @with_interface_lock
       -    def get_interfaces(self):
       -        '''The interfaces that are in connected state'''
       -        return list(self.interfaces.keys())
       +    def get_interfaces(self) -> List[str]:
       +        """The list of servers for the connected interfaces."""
       +        with self.interfaces_lock:
       +            return list(self.interfaces)
        
            @with_recent_servers_lock
            def get_servers(self):
       t@@ -407,31 +408,31 @@ class Network(PrintError):
                    if host not in out:
                        out[host] = {protocol: port}
                # add servers received from main interface
       -        if self.server_peers:
       -            out.update(filter_version(self.server_peers.copy()))
       +        server_peers = self.server_peers
       +        if server_peers:
       +            out.update(filter_version(server_peers.copy()))
                # potentially filter out some
                if self.config.get('noonion'):
                    out = filter_noonion(out)
                return out
        
       -    @with_interface_lock
       -    def start_interface(self, server):
       +    def _start_interface(self, server):
                if server not in self.interfaces and server not in self.connecting:
                    if server == self.default_server:
                        self.print_error("connecting to %s as new interface" % server)
       -                self.set_status('connecting')
       +                self._set_status('connecting')
                    self.connecting.add(server)
                    self.server_queue.put(server)
        
       -    def start_random_interface(self):
       -        with self.interface_lock:
       +    def _start_random_interface(self):
       +        with self.interfaces_lock:
                    exclude_set = self.disconnected_servers | set(self.interfaces) | self.connecting
                server = pick_random_server(self.get_servers(), self.protocol, exclude_set)
                if server:
       -            self.start_interface(server)
       +            self._start_interface(server)
                return server
        
       -    def set_proxy(self, proxy: Optional[dict]):
       +    def _set_proxy(self, proxy: Optional[dict]):
                self.proxy = proxy
                # Store these somewhere so we can un-monkey-patch
                if not hasattr(socket, "_getaddrinfo"):
       t@@ -467,10 +468,10 @@ class Network(PrintError):
                        addr = str(answers[0])
                    else:
                        addr = host
       -        except dns.exception.DNSException:
       +        except dns.exception.DNSException as e:
                    # dns failed for some reason, e.g. dns.resolver.NXDOMAIN
                    # this is normal. Simply report back failure:
       -            raise socket.gaierror(11001, 'getaddrinfo failed')
       +            raise socket.gaierror(11001, 'getaddrinfo failed') from e
                except BaseException as e:
                    # Possibly internal error in dnspython :( see #4483
                    # Fall back to original socket.getaddrinfo to resolve dns.
       t@@ -478,48 +479,8 @@ class Network(PrintError):
                    addr = host
                return socket._getaddrinfo(addr, *args, **kwargs)
        
       -    @with_interface_lock
       -    def start_network(self, protocol: str, proxy: Optional[dict]):
       -        assert not self.interface and not self.interfaces
       -        assert not self.connecting and not self.server_queue
       -        assert not self.server_queue_group
       -        self.print_error('starting network')
       -        self.disconnected_servers = set([])  # note: needs self.interface_lock
       -        self.protocol = protocol
       -        self._init_server_queue()
       -        self.set_proxy(proxy)
       -        self.start_interface(self.default_server)
       -        self.trigger_callback('network_updated')
       -
       -    def _init_server_queue(self):
       -        self.server_queue = queue.Queue()
       -        self.server_queue_group = server_queue_group = TaskGroup()
       -        async def job():
       -            forever = asyncio.Event()
       -            async with server_queue_group as group:
       -                await group.spawn(forever.wait())
       -        asyncio.run_coroutine_threadsafe(job(), self.asyncio_loop)
       -
       -    @with_interface_lock
       -    def stop_network(self):
       -        self.print_error("stopping network")
       -        for interface in list(self.interfaces.values()):
       -            self.close_interface(interface)
       -        if self.interface:
       -            self.close_interface(self.interface)
       -        assert self.interface is None
       -        assert not self.interfaces
       -        self.connecting.clear()
       -        self._stop_server_queue()
       -        self.trigger_callback('network_updated')
       -
       -    def _stop_server_queue(self):
       -        # Get a new queue - no old pending connections thanks!
       -        self.server_queue = None
       -        asyncio.run_coroutine_threadsafe(self.server_queue_group.cancel_remaining(), self.asyncio_loop)
       -        self.server_queue_group = None
       -
       -    def set_parameters(self, net_params: NetworkParameters):
       +    @aiosafe
       +    async def set_parameters(self, net_params: NetworkParameters):
                proxy = net_params.proxy
                proxy_str = serialize_proxy(proxy)
                host, port, protocol = net_params.host, net_params.port, net_params.protocol
       t@@ -538,30 +499,30 @@ class Network(PrintError):
                # abort if changes were not allowed by config
                if self.config.get('server') != server_str or self.config.get('proxy') != proxy_str:
                    return
       -        self.auto_connect = net_params.auto_connect
       -        if self.proxy != proxy or self.protocol != protocol:
       -            # Restart the network defaulting to the given server
       -            with self.interface_lock:
       -                self.stop_network()
       +
       +        async with self.restart_lock:
       +            self.auto_connect = net_params.auto_connect
       +            if self.proxy != proxy or self.protocol != protocol:
       +                # Restart the network defaulting to the given server
       +                await self._stop()
                        self.default_server = server_str
       -                self.start_network(protocol, proxy)
       -        elif self.default_server != server_str:
       -            self.switch_to_interface(server_str)
       -        else:
       -            self.switch_lagging_interface()
       +                await self._start()
       +            elif self.default_server != server_str:
       +                await self.switch_to_interface(server_str)
       +            else:
       +                await self.switch_lagging_interface()
        
       -    def switch_to_random_interface(self):
       +    async def _switch_to_random_interface(self):
                '''Switch to a random connected server other than the current one'''
                servers = self.get_interfaces()    # Those in connected state
                if self.default_server in servers:
                    servers.remove(self.default_server)
                if servers:
       -            self.switch_to_interface(random.choice(servers))
       +            await self.switch_to_interface(random.choice(servers))
        
       -    @with_interface_lock
       -    def switch_lagging_interface(self):
       +    async def switch_lagging_interface(self):
                '''If auto_connect and lagging, switch interface'''
       -        if self.server_is_lagging() and self.auto_connect:
       +        if await self._server_is_lagging() and self.auto_connect:
                    # switch to one that has the correct header (not height)
                    header = self.blockchain().read_header(self.get_local_height())
                    def filt(x):
       t@@ -569,111 +530,105 @@ class Network(PrintError):
                        b = header
                        assert type(a) is type(b)
                        return a == b
       -            filtered = list(map(lambda x: x[0], filter(filt, self.interfaces.items())))
       +
       +            with self.interfaces_lock: interfaces_items = list(self.interfaces.items())
       +            filtered = list(map(lambda x: x[0], filter(filt, interfaces_items)))
                    if filtered:
                        choice = random.choice(filtered)
       -                self.switch_to_interface(choice)
       -
       -    @with_interface_lock
       -    def switch_to_interface(self, server):
       -        '''Switch to server as our interface.  If no connection exists nor
       -        being opened, start a thread to connect.  The actual switch will
       -        happen on receipt of the connection notification.  Do nothing
       -        if server already is our interface.'''
       +                await self.switch_to_interface(choice)
       +
       +    async def switch_to_interface(self, server: str):
       +        """Switch to server as our main interface. If no connection exists,
       +        queue interface to be started. The actual switch will
       +        happen when the interface becomes ready.
       +        """
                self.default_server = server
       +        old_interface = self.interface
       +        old_server = old_interface.server if old_interface else None
       +
       +        # Stop any current interface in order to terminate subscriptions,
       +        # and to cancel tasks in interface.group.
       +        # However, for headers sub, give preference to this interface
       +        # over unknown ones, i.e. start it again right away.
       +        if old_server and old_server != server:
       +            await self._close_interface(old_interface)
       +            if len(self.interfaces) <= self.num_server:
       +                self._start_interface(old_server)
       +
                if server not in self.interfaces:
                    self.interface = None
       -            self.start_interface(server)
       +            self._start_interface(server)
                    return
        
                i = self.interfaces[server]
       -        if self.interface != i:
       +        if old_interface != i:
                    self.print_error("switching to", server)
       -            blockchain_updated = False
       -            if self.interface is not None:
       -                blockchain_updated = i.blockchain != self.interface.blockchain
       -                # Stop any current interface in order to terminate subscriptions,
       -                # and to cancel tasks in interface.group.
       -                # However, for headers sub, give preference to this interface
       -                # over unknown ones, i.e. start it again right away.
       -                old_server = self.interface.server
       -                self.close_interface(self.interface)
       -                if old_server != server and len(self.interfaces) <= self.num_server:
       -                    self.start_interface(old_server)
       -
       +            blockchain_updated = i.blockchain != self.blockchain()
                    self.interface = i
       -            asyncio.run_coroutine_threadsafe(
       -                i.group.spawn(self.request_server_info(i)), self.asyncio_loop)
       +            await i.group.spawn(self._request_server_info(i))
                    self.trigger_callback('default_server_changed')
       -            self.set_status('connected')
       +            self._set_status('connected')
                    self.trigger_callback('network_updated')
                    if blockchain_updated: self.trigger_callback('blockchain_updated')
        
       -    @with_interface_lock
       -    def close_interface(self, interface):
       +    async def _close_interface(self, interface):
                if interface:
       -            if interface.server in self.interfaces:
       -                self.interfaces.pop(interface.server)
       +            with self.interfaces_lock:
       +                if self.interfaces.get(interface.server) == interface:
       +                    self.interfaces.pop(interface.server)
                    if interface.server == self.default_server:
                        self.interface = None
       -            interface.close()
       +            await interface.close()
        
            @with_recent_servers_lock
       -    def add_recent_server(self, server):
       +    def _add_recent_server(self, server):
                # list is ordered
                if server in self.recent_servers:
                    self.recent_servers.remove(server)
                self.recent_servers.insert(0, server)
                self.recent_servers = self.recent_servers[0:20]
       -        self.save_recent_servers()
       +        self._save_recent_servers()
        
       -    @with_interface_lock
       -    def connection_down(self, server):
       +    async def connection_down(self, server):
                '''A connection to server either went down, or was never made.
                We distinguish by whether it is in self.interfaces.'''
                self.disconnected_servers.add(server)
                if server == self.default_server:
       -            self.set_status('disconnected')
       -        if server in self.interfaces:
       -            self.close_interface(self.interfaces[server])
       +            self._set_status('disconnected')
       +        interface = self.interfaces.get(server, None)
       +        if interface:
       +            await self._close_interface(interface)
                    self.trigger_callback('network_updated')
        
            @aiosafe
       -    async def new_interface(self, server):
       +    async def _run_new_interface(self, server):
                interface = Interface(self, server, self.config.path, self.proxy)
                timeout = 10 if not self.proxy else 20
                try:
                    await asyncio.wait_for(interface.ready, timeout)
                except BaseException as e:
       -            #import traceback
                    #traceback.print_exc()
                    self.print_error(server, "couldn't launch because", str(e), str(type(e)))
       -            # note: connection_down will not call interface.close() as
       -            # interface is not yet in self.interfaces. OTOH, calling
       -            # interface.close() here will sometimes raise deep inside the
       -            # asyncio internal select.select... instead, interface will close
       -            # itself when it detects the cancellation of interface.ready;
       -            # however this might take several seconds...
       -            self.connection_down(server)
       +            await interface.close()
                    return
                else:
       -            with self.interface_lock:
       +            with self.interfaces_lock:
       +                assert server not in self.interfaces
                        self.interfaces[server] = interface
                finally:
       -            with self.interface_lock:
       -                try: self.connecting.remove(server)
       -                except KeyError: pass
       +            try: self.connecting.remove(server)
       +            except KeyError: pass
        
                if server == self.default_server:
       -            self.switch_to_interface(server)
       +            await self.switch_to_interface(server)
        
       -        self.add_recent_server(server)
       +        self._add_recent_server(server)
                self.trigger_callback('network_updated')
        
       -    def init_headers_file(self):
       +    async def _init_headers_file(self):
                b = blockchain.blockchains[0]
                filename = b.path()
       -        length = 80 * len(constants.net.CHECKPOINTS) * 2016
       +        length = HEADER_SIZE * len(constants.net.CHECKPOINTS) * 2016
                if not os.path.exists(filename) or os.path.getsize(filename) < length:
                    with open(filename, 'wb') as f:
                        if length > 0:
       t@@ -686,11 +641,6 @@ class Network(PrintError):
            async def get_merkle_for_transaction(self, tx_hash, tx_height):
                return await self.interface.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
        
       -    def broadcast_transaction_from_non_network_thread(self, tx, timeout=10):
       -        # note: calling this from the network thread will deadlock it
       -        fut = asyncio.run_coroutine_threadsafe(self.broadcast_transaction(tx, timeout=timeout), self.asyncio_loop)
       -        return fut.result()
       -
            async def broadcast_transaction(self, tx, timeout=10):
                try:
                    out = await self.interface.session.send_request('blockchain.transaction.broadcast', [str(tx)], timeout=timeout)
       t@@ -706,101 +656,124 @@ class Network(PrintError):
            async def request_chunk(self, height, tip=None, *, can_return_early=False):
                return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early)
        
       -    @with_interface_lock
            def blockchain(self):
       -        if self.interface and self.interface.blockchain is not None:
       -            self.blockchain_index = self.interface.blockchain.forkpoint
       +        interface = self.interface
       +        if interface and interface.blockchain is not None:
       +            self.blockchain_index = interface.blockchain.forkpoint
                return blockchain.blockchains[self.blockchain_index]
        
       -    @with_interface_lock
            def get_blockchains(self):
                out = {}  # blockchain_id -> list(interfaces)
                with blockchain.blockchains_lock: blockchain_items = list(blockchain.blockchains.items())
       +        with self.interfaces_lock: interfaces_values = list(self.interfaces.values())
                for chain_id, bc in blockchain_items:
       -            r = list(filter(lambda i: i.blockchain==bc, list(self.interfaces.values())))
       +            r = list(filter(lambda i: i.blockchain==bc, interfaces_values))
                    if r:
                        out[chain_id] = r
                return out
        
       -    @with_interface_lock
       -    def disconnect_from_interfaces_on_given_blockchain(self, chain: Blockchain) -> Sequence[Interface]:
       +    async def disconnect_from_interfaces_on_given_blockchain(self, chain: Blockchain) -> Sequence[Interface]:
                chain_id = chain.forkpoint
                ifaces = self.get_blockchains().get(chain_id) or []
                for interface in ifaces:
       -            self.connection_down(interface.server)
       +            await self.connection_down(interface.server)
                return ifaces
        
       -    def follow_chain(self, index):
       -        bc = blockchain.blockchains.get(index)
       +    async def follow_chain(self, chain_id):
       +        bc = blockchain.blockchains.get(chain_id)
                if bc:
       -            self.blockchain_index = index
       -            self.config.set_key('blockchain_index', index)
       -            with self.interface_lock:
       -                interfaces = list(self.interfaces.values())
       -            for i in interfaces:
       -                if i.blockchain == bc:
       -                    self.switch_to_interface(i.server)
       +            self.blockchain_index = chain_id
       +            self.config.set_key('blockchain_index', chain_id)
       +            with self.interfaces_lock: interfaces_values = list(self.interfaces.values())
       +            for iface in interfaces_values:
       +                if iface.blockchain == bc:
       +                    await self.switch_to_interface(iface.server)
                            break
                else:
       -            raise Exception('blockchain not found', index)
       +            raise Exception('blockchain not found', chain_id)
        
       -        with self.interface_lock:
       -            if self.interface:
       -                net_params = self.get_parameters()
       -                host, port, protocol = deserialize_server(self.interface.server)
       -                net_params = net_params._replace(host=host, port=port, protocol=protocol)
       -                self.set_parameters(net_params)
       +        if self.interface:
       +            net_params = self.get_parameters()
       +            host, port, protocol = deserialize_server(self.interface.server)
       +            net_params = net_params._replace(host=host, port=port, protocol=protocol)
       +            await self.set_parameters(net_params)
        
            def get_local_height(self):
                return self.blockchain().height()
        
            def export_checkpoints(self, path):
       -        # run manually from the console to generate checkpoints
       +        """Run manually to generate blockchain checkpoints.
       +        Kept for console use only.
       +        """
                cp = self.blockchain().get_checkpoints()
                with open(path, 'w', encoding='utf-8') as f:
                    f.write(json.dumps(cp, indent=4))
        
       -    def start(self, fx=None):
       -        self.main_taskgroup = TaskGroup()
       +    async def _start(self, jobs=None):
       +        if jobs is None: jobs = self._jobs
       +        self._jobs = jobs
       +        assert not self.main_taskgroup
       +        self.main_taskgroup = SilentTaskGroup()
       +
                async def main():
       -            self.init_headers_file()
       -            async with self.main_taskgroup as group:
       -                await group.spawn(self.maintain_sessions())
       -                if fx: await group.spawn(fx)
       -        self._wrapper_thread = threading.Thread(target=self.asyncio_loop.run_until_complete, args=(main(),))
       -        self._wrapper_thread.start()
       +            try:
       +                await self._init_headers_file()
       +                async with self.main_taskgroup as group:
       +                    await group.spawn(self._maintain_sessions())
       +                    [await group.spawn(job) for job in jobs]
       +            except Exception as e:
       +                traceback.print_exc(file=sys.stderr)
       +                raise e
       +        asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop)
       +
       +        assert not self.interface and not self.interfaces
       +        assert not self.connecting and not self.server_queue
       +        self.print_error('starting network')
       +        self.disconnected_servers = set([])
       +        self.protocol = deserialize_server(self.default_server)[2]
       +        self.server_queue = queue.Queue()
       +        self._set_proxy(deserialize_proxy(self.config.get('proxy')))
       +        self._start_interface(self.default_server)
       +        self.trigger_callback('network_updated')
       +
       +    def start(self, jobs=None):
       +        asyncio.run_coroutine_threadsafe(self._start(jobs=jobs), self.asyncio_loop)
       +
       +    async def _stop(self, full_shutdown=False):
       +        self.print_error("stopping network")
       +        try:
       +            asyncio.wait_for(await self.main_taskgroup.cancel_remaining(), timeout=2)
       +        except asyncio.TimeoutError: pass
       +        self.main_taskgroup = None
       +
       +        assert self.interface is None
       +        assert not self.interfaces
       +        self.connecting.clear()
       +        self.server_queue = None
       +        self.trigger_callback('network_updated')
       +
       +        if full_shutdown:
       +            self._run_forever.set_result(1)
        
            def stop(self):
       -        asyncio.run_coroutine_threadsafe(self.main_taskgroup.cancel_remaining(), self.asyncio_loop)
       +        assert self._thread != threading.current_thread(), 'must not be called from network thread'
       +        fut = asyncio.run_coroutine_threadsafe(self._stop(full_shutdown=True), self.asyncio_loop)
       +        fut.result()
        
            def join(self):
       -        self._wrapper_thread.join(1)
       +        self._thread.join(1)
        
       -    async def maintain_sessions(self):
       +    async def _maintain_sessions(self):
                while True:
       +            # launch already queued up new interfaces
                    while self.server_queue.qsize() > 0:
                        server = self.server_queue.get()
       -                await self.server_queue_group.spawn(self.new_interface(server))
       -            remove = []
       -            for k, i in self.interfaces.items():
       -                if i.fut.done() and not i.exception:
       -                    assert False, "interface future should not finish without exception"
       -                if i.exception:
       -                    if not i.fut.done():
       -                        try: i.fut.cancel()
       -                        except Exception as e: self.print_error('exception while cancelling fut', e)
       -                    try:
       -                        raise i.exception
       -                    except BaseException as e:
       -                        self.print_error(i.server, "errored because:", str(e), str(type(e)))
       -                    remove.append(k)
       -            for k in remove:
       -                self.connection_down(k)
       -
       -            # nodes
       +                await self.main_taskgroup.spawn(self._run_new_interface(server))
       +
       +            # maybe queue new interfaces to be launched later
                    now = time.time()
                    for i in range(self.num_server - len(self.interfaces) - len(self.connecting)):
       -                self.start_random_interface()
       +                self._start_random_interface()
                    if now - self.nodes_retry_time > NODES_RETRY_INTERVAL:
                        self.print_error('network: retrying connections')
                        self.disconnected_servers = set([])
       t@@ -810,16 +783,16 @@ class Network(PrintError):
                    if not self.is_connected():
                        if self.auto_connect:
                            if not self.is_connecting():
       -                        self.switch_to_random_interface()
       +                        await self._switch_to_random_interface()
                        else:
                            if self.default_server in self.disconnected_servers:
                                if now - self.server_retry_time > SERVER_RETRY_INTERVAL:
                                    self.disconnected_servers.remove(self.default_server)
                                    self.server_retry_time = now
                            else:
       -                        self.switch_to_interface(self.default_server)
       +                        await self.switch_to_interface(self.default_server)
                    else:
                        if self.config.is_fee_estimates_update_required():
       -                    await self.interface.group.spawn(self.request_fee_estimates, self.interface)
       +                    await self.interface.group.spawn(self._request_fee_estimates, self.interface)
        
                    await asyncio.sleep(0.1)
   DIR diff --git a/electrum/plugin.py b/electrum/plugin.py
       t@@ -47,6 +47,7 @@ class Plugins(DaemonThread):
            @profiler
            def __init__(self, config, is_local, gui_name):
                DaemonThread.__init__(self)
       +        self.setName('Plugins')
                self.pkgpath = os.path.dirname(plugins.__file__)
                self.config = config
                self.hw_wallets = {}
   DIR diff --git a/electrum/verifier.py b/electrum/verifier.py
       t@@ -47,7 +47,6 @@ class SPV(PrintError):
            def __init__(self, network, wallet):
                self.wallet = wallet
                self.network = network
       -        self.blockchain = network.blockchain()
                self.merkle_roots = {}  # txid -> merkle root (once it has been verified)
                self.requested_merkle = set()  # txid set of pending requests
        
       t@@ -55,18 +54,14 @@ class SPV(PrintError):
                return '{}:{}'.format(self.__class__.__name__, self.wallet.diagnostic_name())
        
            async def main(self, group: TaskGroup):
       +        self.blockchain = self.network.blockchain()
                while True:
                    await self._maybe_undo_verifications()
                    await self._request_proofs(group)
                    await asyncio.sleep(0.1)
        
            async def _request_proofs(self, group: TaskGroup):
       -        blockchain = self.network.blockchain()
       -        if not blockchain:
       -            self.print_error("no blockchain")
       -            return
       -
       -        local_height = self.network.get_local_height()
       +        local_height = self.blockchain.height()
                unverified = self.wallet.get_unverified_txs()
        
                for tx_hash, tx_height in unverified.items():
       t@@ -77,7 +72,7 @@ class SPV(PrintError):
                    if tx_height <= 0 or tx_height > local_height:
                        continue
                    # if it's in the checkpoint region, we still might not have the header
       -            header = blockchain.read_header(tx_height)
       +            header = self.blockchain.read_header(tx_height)
                    if header is None:
                        if tx_height < constants.net.max_checkpoint():
                            await group.spawn(self.network.request_chunk(tx_height, None, can_return_early=True))