tmove event loop construction to daemon - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit c2ecfaf239b584e1a3ba9f10f03be0a5441ceac6 DIR parent ca8eae919f14f13de7e0a3a35ba544d100d4f5de HTML Author: SomberNight <somber.night@protonmail.com> Date: Thu, 1 Nov 2018 16:30:03 +0100 move event loop construction to daemon Diffstat: M electrum/daemon.py | 30 ++++++++++++++++++++++++++++-- M electrum/network.py | 45 +++++++++---------------------- 2 files changed, 41 insertions(+), 34 deletions(-) --- DIR diff --git a/electrum/daemon.py b/electrum/daemon.py t@@ -30,6 +30,7 @@ import traceback import sys import threading from typing import Dict, Optional, Tuple +import re import jsonrpclib t@@ -127,10 +128,12 @@ class Daemon(DaemonThread): if fd is None and listen_jsonrpc: fd, server = get_fd_or_server(config) if fd is None: raise Exception('failed to lock daemon; already running?') + self.create_and_start_event_loop() if config.get('offline'): self.network = None else: self.network = Network(config) + self.network._loop_thread = self._loop_thread self.fx = FxThread(config, self.network) if self.network: self.network.start([self.fx.run]) t@@ -170,7 +173,7 @@ class Daemon(DaemonThread): return True def run_daemon(self, config_options): - asyncio.set_event_loop(self.network.asyncio_loop) # FIXME what if self.network is None? + asyncio.set_event_loop(self.asyncio_loop) config = SimpleConfig(config_options) sub = config.get('subcommand') assert sub in [None, 'start', 'stop', 'status', 'load_wallet', 'close_wallet'] t@@ -265,7 +268,7 @@ class Daemon(DaemonThread): wallet.stop_threads() def run_cmdline(self, config_options): - asyncio.set_event_loop(self.network.asyncio_loop) # FIXME what if self.network is None? + asyncio.set_event_loop(self.asyncio_loop) password = config_options.get('password') new_password = config_options.get('new_password') config = SimpleConfig(config_options) t@@ -297,11 +300,15 @@ class Daemon(DaemonThread): def run(self): while self.is_running(): self.server.handle_request() if self.server else time.sleep(0.1) + # stop network/wallets for k, wallet in self.wallets.items(): wallet.stop_threads() if self.network: self.print_error("shutting down network") self.network.stop() + # stop event loop + self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) + self._loop_thread.join(timeout=1) self.on_stop() def stop(self): t@@ -323,3 +330,22 @@ class Daemon(DaemonThread): except BaseException as e: traceback.print_exc(file=sys.stdout) # app will exit now + + def create_and_start_event_loop(self): + def on_exception(loop, context): + """Suppress spurious messages it appears we cannot control.""" + SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|' + 'SSL error in data received') + message = context.get('message') + if message and SUPPRESS_MESSAGE_REGEX.match(message): + return + loop.default_exception_handler(context) + + self.asyncio_loop = asyncio.get_event_loop() + self.asyncio_loop.set_exception_handler(on_exception) + # self.asyncio_loop.set_debug(1) + self._stop_loop = asyncio.Future() + self._loop_thread = threading.Thread(target=self.asyncio_loop.run_until_complete, + args=(self._stop_loop,), + name='EventLoop') + self._loop_thread.start() DIR diff --git a/electrum/network.py b/electrum/network.py t@@ -168,6 +168,10 @@ class Network(PrintError): def __init__(self, config: SimpleConfig=None): global INSTANCE INSTANCE = self + + self.asyncio_loop = asyncio.get_event_loop() + self._loop_thread = None # type: threading.Thread # set by caller; only used for sanity checks + if config is None: config = {} # Do not use mutables as default values! self.config = SimpleConfig(config) if isinstance(config, dict) else config # type: SimpleConfig t@@ -221,17 +225,8 @@ class Network(PrintError): self.server_queue = None self.proxy = None - self.asyncio_loop = asyncio.get_event_loop() - self.asyncio_loop.set_exception_handler(self.on_event_loop_exception) - #self.asyncio_loop.set_debug(1) - self._run_forever = asyncio.Future() - self._thread = threading.Thread(target=self.asyncio_loop.run_until_complete, - args=(self._run_forever,), - name='Network') - self._thread.start() - def run_from_another_thread(self, coro): - assert self._thread != threading.current_thread(), 'must not be called from network thread' + assert self._loop_thread != threading.current_thread(), 'must not be called from network thread' fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop) return fut.result() t@@ -239,15 +234,6 @@ class Network(PrintError): def get_instance(): return INSTANCE - def on_event_loop_exception(self, loop, context): - """Suppress spurious messages it appears we cannot control.""" - SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|' - 'SSL error in data received') - message = context.get('message') - if message and SUPPRESS_MESSAGE_REGEX.match(message): - return - loop.default_exception_handler(context) - def with_recent_servers_lock(func): def func_wrapper(self, *args, **kwargs): with self.recent_servers_lock: t@@ -845,25 +831,20 @@ class Network(PrintError): await asyncio.wait_for(self.main_taskgroup.cancel_remaining(), timeout=2) except (asyncio.TimeoutError, asyncio.CancelledError) as e: self.print_error(f"exc during main_taskgroup cancellation: {repr(e)}") - try: - self.main_taskgroup = None - self.interface = None # type: Interface - self.interfaces = {} # type: Dict[str, Interface] - self.connecting.clear() - self.server_queue = None - if not full_shutdown: - self.trigger_callback('network_updated') - finally: - if full_shutdown: - self._run_forever.set_result(1) + self.main_taskgroup = None + self.interface = None # type: Interface + self.interfaces = {} # type: Dict[str, Interface] + self.connecting.clear() + self.server_queue = None + if not full_shutdown: + self.trigger_callback('network_updated') def stop(self): - assert self._thread != threading.current_thread(), 'must not be called from network thread' + assert self._loop_thread != threading.current_thread(), 'must not be called from network thread' fut = asyncio.run_coroutine_threadsafe(self._stop(full_shutdown=True), self.asyncio_loop) try: fut.result(timeout=2) except (asyncio.TimeoutError, asyncio.CancelledError): pass - self._thread.join(timeout=1) async def _ensure_there_is_a_main_interface(self): if self.is_connected():