URI: 
       tsynchronizer: fix rare race where synchronizer could get stuck - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 1dbff51fce762c212913c2c446d6e5945f283f75
   DIR parent 34413a9c306a297c833ed2b5e267e61ca297a225
  HTML Author: SomberNight <somber.night@protonmail.com>
       Date:   Fri,  5 Mar 2021 20:46:41 +0100
       
       synchronizer: fix rare race where synchronizer could get stuck
       
       Diffstat:
         M electrum/lnverifier.py              |       5 +++--
         M electrum/synchronizer.py            |       5 +++--
         M electrum/util.py                    |      19 +++++++++++++------
         M electrum/verifier.py                |       5 +++--
       
       4 files changed, 22 insertions(+), 12 deletions(-)
       ---
   DIR diff --git a/electrum/lnverifier.py b/electrum/lnverifier.py
       t@@ -74,8 +74,9 @@ class LNChannelVerifier(NetworkJobOnDefaultServer):
                    self.unverified_channel_info[short_channel_id] = msg
                    return True
        
       -    async def _start_tasks(self):
       -        async with self.taskgroup as group:
       +    async def _run_tasks(self, *, taskgroup):
       +        await super()._run_tasks(taskgroup=taskgroup)
       +        async with taskgroup as group:
                    await group.spawn(self.main)
        
            async def main(self):
   DIR diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py
       t@@ -74,9 +74,10 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
                self.add_queue = asyncio.Queue()
                self.status_queue = asyncio.Queue()
        
       -    async def _start_tasks(self):
       +    async def _run_tasks(self, *, taskgroup):
       +        await super()._run_tasks(taskgroup=taskgroup)
                try:
       -            async with self.taskgroup as group:
       +            async with taskgroup as group:
                        await group.spawn(self.send_subscriptions())
                        await group.spawn(self.handle_status())
                        await group.spawn(self.main())
   DIR diff --git a/electrum/util.py b/electrum/util.py
       t@@ -46,6 +46,7 @@ from ipaddress import IPv4Address, IPv6Address
        import random
        import secrets
        import functools
       +from abc import abstractmethod, ABC
        
        import attr
        import aiohttp
       t@@ -1163,7 +1164,7 @@ class SilentTaskGroup(TaskGroup):
                return super().spawn(*args, **kwargs)
        
        
       -class NetworkJobOnDefaultServer(Logger):
       +class NetworkJobOnDefaultServer(Logger, ABC):
            """An abstract base class for a job that runs on the main network
            interface. Every time the main interface changes, the job is
            restarted, and some of its internals are reset.
       t@@ -1179,8 +1180,10 @@ class NetworkJobOnDefaultServer(Logger):
                self._network_request_semaphore = asyncio.Semaphore(100)
        
                self._reset()
       -        asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)
       +        # every time the main interface changes, restart:
                register_callback(self._restart, ['default_server_changed'])
       +        # also schedule a one-off restart now, as there might already be a main interface:
       +        asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)
        
            def _reset(self):
                """Initialise fields. Called every time the underlying
       t@@ -1190,13 +1193,17 @@ class NetworkJobOnDefaultServer(Logger):
        
            async def _start(self, interface: 'Interface'):
                self.interface = interface
       -        await interface.taskgroup.spawn(self._start_tasks)
       +        await interface.taskgroup.spawn(self._run_tasks(taskgroup=self.taskgroup))
        
       -    async def _start_tasks(self):
       -        """Start tasks in self.taskgroup. Called every time the underlying
       +    @abstractmethod
       +    async def _run_tasks(self, *, taskgroup: TaskGroup) -> None:
       +        """Start tasks in taskgroup. Called every time the underlying
                server connection changes.
                """
       -        raise NotImplementedError()  # implemented by subclasses
       +        # If self.taskgroup changed, don't start tasks. This can happen if we have
       +        # been restarted *just now*, i.e. after the _run_tasks coroutine object was created.
       +        if taskgroup != self.taskgroup:
       +            raise asyncio.CancelledError()
        
            async def stop(self):
                unregister_callback(self._restart)
   DIR diff --git a/electrum/verifier.py b/electrum/verifier.py
       t@@ -58,8 +58,9 @@ class SPV(NetworkJobOnDefaultServer):
                self.merkle_roots = {}  # txid -> merkle root (once it has been verified)
                self.requested_merkle = set()  # txid set of pending requests
        
       -    async def _start_tasks(self):
       -        async with self.taskgroup as group:
       +    async def _run_tasks(self, *, taskgroup):
       +        await super()._run_tasks(taskgroup=taskgroup)
       +        async with taskgroup as group:
                    await group.spawn(self.main)
        
            def diagnostic_name(self):