URI: 
       tlnworker: add own taskgroup (run in daemon.taskgroup) - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit c8260249b0d9fc5ffb8a97a817ec9489563e4a7c
   DIR parent 0bf09d14a05caf920cf89a277e656a4a9e37a088
  HTML Author: SomberNight <somber.night@protonmail.com>
       Date:   Thu, 27 Feb 2020 18:50:03 +0100
       
       lnworker: add own taskgroup (run in daemon.taskgroup)
       
       Diffstat:
         M electrum/lnpeer.py                  |       2 +-
         M electrum/lnworker.py                |      32 ++++++++++++++++++++-----------
       
       2 files changed, 22 insertions(+), 12 deletions(-)
       ---
   DIR diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py
       t@@ -238,7 +238,7 @@ class Peer(Logger):
                        self.close_and_cleanup()
                return wrapper_func
        
       -    @ignore_exceptions  # do not kill main_taskgroup
       +    @ignore_exceptions  # do not kill outer taskgroup
            @log_exceptions
            @handle_disconnect
            async def main_loop(self):
   DIR diff --git a/electrum/lnworker.py b/electrum/lnworker.py
       t@@ -32,7 +32,7 @@ from .transaction import Transaction
        from .crypto import sha256
        from .bip32 import BIP32Node
        from .util import bh2u, bfh, InvoiceError, resolve_dns_srv, is_ip_address, log_exceptions
       -from .util import ignore_exceptions, make_aiohttp_session
       +from .util import ignore_exceptions, make_aiohttp_session, SilentTaskGroup
        from .util import timestamp_to_datetime
        from .util import MyEncoder
        from .logging import Logger
       t@@ -126,6 +126,7 @@ class LNWorker(Logger):
                Logger.__init__(self)
                self.node_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NODE_KEY)
                self.peers = {}  # type: Dict[bytes, Peer]  # pubkey -> Peer
       +        self.taskgroup = SilentTaskGroup()
                # set some feature flags as baseline for both LNWallet and LNGossip
                # note that e.g. DATA_LOSS_PROTECT is needed for LNGossip as many peers require it
                self.localfeatures = LnLocalFeatures(0)
       t@@ -136,6 +137,7 @@ class LNWorker(Logger):
                return {}
        
            async def maybe_listen(self):
       +        # FIXME: only one LNWorker can listen at a time (single port)
                listen_addr = self.config.get('lightning_listen')
                if listen_addr:
                    addr, port = listen_addr.rsplit(':', 2)
       t@@ -151,11 +153,21 @@ class LNWorker(Logger):
                            return
                        peer = Peer(self, node_id, transport)
                        self.peers[node_id] = peer
       -                await self.network.main_taskgroup.spawn(peer.main_loop())
       -            await asyncio.start_server(cb, addr, int(port))
       +                await self.taskgroup.spawn(peer.main_loop())
       +            try:
       +                await asyncio.start_server(cb, addr, int(port))
       +            except OSError as e:
       +                self.logger.error(f"cannot listen for lightning p2p. error: {e!r}")
        
       -    @log_exceptions
       +    @ignore_exceptions  # don't kill outer taskgroup
            async def main_loop(self):
       +        try:
       +            async with self.taskgroup as group:
       +                await group.spawn(self._maintain_connectivity())
       +        except Exception as e:
       +            self.logger.exception("taskgroup died.")
       +
       +    async def _maintain_connectivity(self):
                while True:
                    await asyncio.sleep(1)
                    now = time.time()
       t@@ -176,7 +188,7 @@ class LNWorker(Logger):
                self._last_tried_peer[peer_addr] = time.time()
                self.logger.info(f"adding peer {peer_addr}")
                peer = Peer(self, node_id, transport)
       -        await self.network.main_taskgroup.spawn(peer.main_loop())
       +        await self.taskgroup.spawn(peer.main_loop())
                self.peers[node_id] = peer
                return peer
        
       t@@ -328,7 +340,7 @@ class LNGossip(LNWorker):
            def start_network(self, network: 'Network'):
                assert network
                super().start_network(network)
       -        asyncio.run_coroutine_threadsafe(network.daemon.taskgroup.spawn(self.maintain_db()), self.network.asyncio_loop)
       +        asyncio.run_coroutine_threadsafe(self.taskgroup.spawn(self.maintain_db()), self.network.asyncio_loop)
        
            async def maintain_db(self):
                await self.channel_db.load_data()
       t@@ -429,7 +441,6 @@ class LNWallet(LNWorker):
                self.lnwatcher = LNWalletWatcher(self, network)
                self.lnwatcher.start_network(network)
                self.network = network
       -        daemon = network.daemon
                for chan_id, chan in self.channels.items():
                    self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address())
        
       t@@ -441,8 +452,8 @@ class LNWallet(LNWorker):
                        self.sync_with_local_watchtower(),
                        self.sync_with_remote_watchtower(),
                ]:
       -            # FIXME: exceptions in those coroutines will cancel daemon.taskgroup
       -            asyncio.run_coroutine_threadsafe(daemon.taskgroup.spawn(coro), self.network.asyncio_loop)
       +            tg_coro = self.taskgroup.spawn(coro)
       +            asyncio.run_coroutine_threadsafe(tg_coro, self.network.asyncio_loop)
        
            def peer_closed(self, peer):
                for chan in self.channels_for_peer(peer.pubkey).values():
       t@@ -1285,8 +1296,7 @@ class LNWallet(LNWorker):
                        if peer:
                            await peer.group.spawn(peer.reestablish_channel(chan))
                        else:
       -                    await self.network.main_taskgroup.spawn(
       -                        self.reestablish_peer_for_given_channel(chan))
       +                    await self.taskgroup.spawn(self.reestablish_peer_for_given_channel(chan))
        
            def current_feerate_per_kw(self):
                from .simple_config import FEE_LN_ETA_TARGET, FEERATE_FALLBACK_STATIC_FEE, FEERATE_REGTEST_HARDCODED