tnetwork: stop pending connections when stopping network - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 2e61359d50cf4fbee6e0880e33a59e9b66634676 DIR parent 23f56ffa8ab64e9d22f2c1427c5114114c2ff879 HTML Author: SomberNight <somber.night@protonmail.com> Date: Thu, 13 Sep 2018 21:20:55 +0200 network: stop pending connections when stopping network Diffstat: M electrum/network.py | 35 +++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 9 deletions(-) --- DIR diff --git a/electrum/network.py b/electrum/network.py t@@ -224,10 +224,11 @@ class Network(PrintError): self.auto_connect = self.config.get('auto_connect', True) self.connecting = set() self.requested_chunks = set() - self.socket_queue = queue.Queue() + self.server_queue = None + self.server_queue_group = None + self.asyncio_loop = asyncio.get_event_loop() self.start_network(deserialize_server(self.default_server)[2], deserialize_proxy(self.config.get('proxy'))) - self.asyncio_loop = asyncio.get_event_loop() @staticmethod def get_instance(): t@@ -417,12 +418,12 @@ class Network(PrintError): @with_interface_lock def start_interface(self, server): - if (not server in self.interfaces and not server in self.connecting): + if server not in self.interfaces and server not in self.connecting: if server == self.default_server: self.print_error("connecting to %s as new interface" % server) self.set_status('connecting') self.connecting.add(server) - self.socket_queue.put(server) + self.server_queue.put(server) def start_random_interface(self): with self.interface_lock: t@@ -482,13 +483,24 @@ class Network(PrintError): @with_interface_lock def start_network(self, protocol: str, proxy: Optional[dict]): assert not self.interface and not self.interfaces - assert not self.connecting and self.socket_queue.empty() + assert not self.connecting and not self.server_queue + assert not self.server_queue_group self.print_error('starting network') self.disconnected_servers = set([]) # note: needs self.interface_lock self.protocol = protocol + self._init_server_queue() self.set_proxy(proxy) self.start_interface(self.default_server) + def _init_server_queue(self): + self.server_queue = queue.Queue() + self.server_queue_group = server_queue_group = TaskGroup() + async def job(): + forever = asyncio.Event() + async with server_queue_group as group: + await group.spawn(forever.wait()) + asyncio.run_coroutine_threadsafe(job(), self.asyncio_loop) + @with_interface_lock def stop_network(self): self.print_error("stopping network") t@@ -499,8 +511,13 @@ class Network(PrintError): assert self.interface is None assert not self.interfaces self.connecting.clear() + self._stop_server_queue() + + def _stop_server_queue(self): # Get a new queue - no old pending connections thanks! - self.socket_queue = queue.Queue() + self.server_queue = None + asyncio.run_coroutine_threadsafe(self.server_queue_group.cancel_remaining(), self.asyncio_loop) + self.server_queue_group = None def set_parameters(self, net_params: NetworkParameters): proxy = net_params.proxy t@@ -768,9 +785,9 @@ class Network(PrintError): async def maintain_sessions(self): while True: - while self.socket_queue.qsize() > 0: - server = self.socket_queue.get() - asyncio.get_event_loop().create_task(self.new_interface(server)) + while self.server_queue.qsize() > 0: + server = self.server_queue.get() + await self.server_queue_group.spawn(self.new_interface(server)) remove = [] for k, i in self.interfaces.items(): if i.fut.done() and not i.exception: