URI: 
       tsplit network main_taskgroup: create daemon.taskgroup - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 37747d74695daf000ffb0db9626191193a6b0555
   DIR parent 796853106537e7909729172a1315935028e6765c
  HTML Author: SomberNight <somber.night@protonmail.com>
       Date:   Thu,  9 Jan 2020 17:50:05 +0100
       
       split network main_taskgroup: create daemon.taskgroup
       
       network.main_taskgroup restarts every time the proxy settings are changed,
       many long-running tasks (some introduced with lightning) are not prepared for and do not want this.
       
       Diffstat:
         M electrum/daemon.py                  |      52 ++++++++++++++++++++++---------
         M electrum/lnworker.py                |      10 ++++++----
         M electrum/network.py                 |      19 ++++++++++++++-----
       
       3 files changed, 58 insertions(+), 23 deletions(-)
       ---
   DIR diff --git a/electrum/daemon.py b/electrum/daemon.py
       t@@ -29,7 +29,7 @@ import time
        import traceback
        import sys
        import threading
       -from typing import Dict, Optional, Tuple
       +from typing import Dict, Optional, Tuple, Iterable
        from base64 import b64decode
        from collections import defaultdict
        
       t@@ -39,6 +39,7 @@ import jsonrpcclient
        import jsonrpcserver
        from jsonrpcserver import response
        from jsonrpcclient.clients.aiohttp_client import AiohttpClient
       +from aiorpcx import TaskGroup
        
        from .network import Network
        from .util import (json_decode, to_bytes, to_string, profiler, standardize_path, constant_time_compare)
       t@@ -280,28 +281,44 @@ class Daemon(Logger):
                    if fd is None:
                        raise Exception('failed to lock daemon; already running?')
                self.asyncio_loop = asyncio.get_event_loop()
       -        if config.get('offline'):
       -            self.network = None
       -        else:
       -            self.network = Network(config)
       +        self.network = None
       +        if not config.get('offline'):
       +            self.network = Network(config, daemon=self)
                self.fx = FxThread(config, self.network)
                self.gui_object = None
                # path -> wallet;   make sure path is standardized.
                self._wallets = {}  # type: Dict[str, Abstract_Wallet]
       -        jobs = [self.fx.run]
       +        daemon_jobs = []
                # Setup JSONRPC server
                if listen_jsonrpc:
       -            jobs.append(self.start_jsonrpc(config, fd))
       +            daemon_jobs.append(self.start_jsonrpc(config, fd))
                # request server
       -        if self.config.get('run_payserver'):
       +        self.pay_server = None
       +        if not config.get('offline') and self.config.get('run_payserver'):
                    self.pay_server = PayServer(self)
       -            jobs.append(self.pay_server.run())
       +            daemon_jobs.append(self.pay_server.run())
                # server-side watchtower
       -        if self.config.get('run_watchtower'):
       +        self.watchtower = None
       +        if not config.get('offline') and self.config.get('run_watchtower'):
                    self.watchtower = WatchTowerServer(self.network)
       -            jobs.append(self.watchtower.run)
       +            daemon_jobs.append(self.watchtower.run)
                if self.network:
       -            self.network.start(jobs)
       +            self.network.start(jobs=[self.fx.run])
       +
       +        self.taskgroup = TaskGroup()
       +        asyncio.run_coroutine_threadsafe(self._run(jobs=daemon_jobs), self.asyncio_loop)
       +
       +    @log_exceptions
       +    async def _run(self, jobs: Iterable = None):
       +        if jobs is None:
       +            jobs = []
       +        try:
       +            async with self.taskgroup as group:
       +                [await group.spawn(job) for job in jobs]
       +        except BaseException as e:
       +            self.logger.exception('daemon.taskgroup died.')
       +        finally:
       +            self.logger.info("stopping daemon.taskgroup")
        
            async def authenticate(self, headers):
                if self.rpc_password == '':
       t@@ -462,7 +479,7 @@ class Daemon(Logger):
        
            def is_running(self):
                with self.running_lock:
       -            return self.running
       +            return self.running and not self.taskgroup.closed()
        
            def stop(self):
                with self.running_lock:
       t@@ -477,8 +494,15 @@ class Daemon(Logger):
                if self.network:
                    self.logger.info("shutting down network")
                    self.network.stop()
       -        self.logger.info("stopping, removing lockfile")
       +        self.logger.info("stopping taskgroup")
       +        fut = asyncio.run_coroutine_threadsafe(self.taskgroup.cancel_remaining(), self.asyncio_loop)
       +        try:
       +            fut.result(timeout=2)
       +        except (asyncio.TimeoutError, asyncio.CancelledError):
       +            pass
       +        self.logger.info("removing lockfile")
                remove_lockfile(get_lockfile(self.config))
       +        self.logger.info("stopped")
        
            def run_gui(self, config, plugins):
                threading.current_thread().setName('GUI')
   DIR diff --git a/electrum/lnworker.py b/electrum/lnworker.py
       t@@ -188,10 +188,11 @@ class LNWorker(Logger):
            def start_network(self, network: 'Network'):
                self.network = network
                self.config = network.config
       +        daemon = network.daemon
                self.channel_db = self.network.channel_db
                self._last_tried_peer = {}  # type: Dict[LNPeerAddr, float]  # LNPeerAddr -> unix timestamp
                self._add_peers_from_config()
       -        asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.main_loop()), self.network.asyncio_loop)
       +        asyncio.run_coroutine_threadsafe(daemon.taskgroup.spawn(self.main_loop()), self.network.asyncio_loop)
        
            def _add_peers_from_config(self):
                peer_list = self.config.get('lightning_peers', [])
       t@@ -306,7 +307,7 @@ class LNGossip(LNWorker):
        
            def start_network(self, network: 'Network'):
                super().start_network(network)
       -        asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.maintain_db()), self.network.asyncio_loop)
       +        asyncio.run_coroutine_threadsafe(network.daemon.taskgroup.spawn(self.maintain_db()), self.network.asyncio_loop)
        
            async def maintain_db(self):
                await self.channel_db.load_data()
       t@@ -409,6 +410,7 @@ class LNWallet(LNWorker):
                self.lnwatcher = LNWatcher(network)
                self.lnwatcher.start_network(network)
                self.network = network
       +        daemon = network.daemon
                self.network.register_callback(self.on_update_open_channel, ['update_open_channel'])
                self.network.register_callback(self.on_update_closed_channel, ['update_closed_channel'])
                for chan_id, chan in self.channels.items():
       t@@ -422,8 +424,8 @@ class LNWallet(LNWorker):
                        self.sync_with_local_watchtower(),
                        self.sync_with_remote_watchtower(),
                ]:
       -            # FIXME: exceptions in those coroutines will cancel network.main_taskgroup
       -            asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(coro), self.network.asyncio_loop)
       +            # FIXME: exceptions in those coroutines will cancel daemon.taskgroup
       +            asyncio.run_coroutine_threadsafe(daemon.taskgroup.spawn(coro), self.network.asyncio_loop)
        
            def peer_closed(self, peer):
                for chan in self.channels_for_peer(peer.pubkey).values():
   DIR diff --git a/electrum/network.py b/electrum/network.py
       t@@ -33,7 +33,7 @@ import json
        import sys
        import ipaddress
        import asyncio
       -from typing import NamedTuple, Optional, Sequence, List, Dict, Tuple, TYPE_CHECKING
       +from typing import NamedTuple, Optional, Sequence, List, Dict, Tuple, TYPE_CHECKING, Iterable
        import traceback
        import concurrent
        from concurrent import futures
       t@@ -67,6 +67,7 @@ if TYPE_CHECKING:
            from .lnworker import LNGossip
            from .lnwatcher import WatchTower
            from .transaction import Transaction
       +    from .daemon import Daemon
        
        
        _logger = get_logger(__name__)
       t@@ -237,7 +238,7 @@ class Network(Logger):
        
            LOGGING_SHORTCUT = 'n'
        
       -    def __init__(self, config: SimpleConfig):
       +    def __init__(self, config: SimpleConfig, *, daemon: 'Daemon' = None):
                global _INSTANCE
                assert _INSTANCE is None, "Network is a singleton!"
                _INSTANCE = self
       t@@ -250,6 +251,9 @@ class Network(Logger):
        
                assert isinstance(config, SimpleConfig), f"config should be a SimpleConfig instead of {type(config)}"
                self.config = config
       +
       +        self.daemon = daemon
       +
                blockchain.read_blockchains(self.config)
                self.logger.info(f"blockchains {list(map(lambda b: b.forkpoint, blockchain.blockchains.values()))}")
                self._blockchain_preferred_block = self.config.get('blockchain_preferred_block', None)  # type: Optional[Dict]
       t@@ -747,7 +751,7 @@ class Network(Logger):
                    self.trigger_callback('network_updated')
                    if blockchain_updated: self.trigger_callback('blockchain_updated')
        
       -    async def _close_interface(self, interface):
       +    async def _close_interface(self, interface: Interface):
                if interface:
                    with self.interfaces_lock:
                        if self.interfaces.get(interface.server) == interface:
       t@@ -1185,7 +1189,12 @@ class Network(Logger):
        
                self.trigger_callback('network_updated')
        
       -    def start(self, jobs: List=None):
       +    def start(self, jobs: Iterable = None):
       +        """Schedule starting the network, along with the given job co-routines.
       +
       +        Note: the jobs will *restart* every time the network restarts, e.g. on proxy
       +        setting changes.
       +        """
                self._jobs = jobs or []
                asyncio.run_coroutine_threadsafe(self._start(), self.asyncio_loop)
        
       t@@ -1264,7 +1273,7 @@ class Network(Logger):
                    except asyncio.CancelledError:
                        # suppress spurious cancellations
                        group = self.main_taskgroup
       -                if not group or group._closed:
       +                if not group or group.closed():
                            raise
                    await asyncio.sleep(0.1)