tnetwork.best_effort_reliable: use curio APIs instead of asyncio - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 24e4aa3ab91cd1d7ace8c1996ce86249ed4c7492 DIR parent 65d263801a7e419c953db38eb084deb3ea959ef1 HTML Author: SomberNight <somber.night@protonmail.com> Date: Fri, 12 Mar 2021 17:53:13 +0100 network.best_effort_reliable: use curio APIs instead of asyncio Diffstat: M electrum/network.py | 42 ++++++++++++++++---------------- 1 file changed, 21 insertions(+), 21 deletions(-) --- DIR diff --git a/electrum/network.py b/electrum/network.py t@@ -37,6 +37,7 @@ import traceback import concurrent from concurrent import futures import copy +import functools import aiorpcx from aiorpcx import TaskGroup, ignore_after t@@ -829,40 +830,39 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): return True def best_effort_reliable(func): + @functools.wraps(func) async def make_reliable_wrapper(self: 'Network', *args, **kwargs): for i in range(10): iface = self.interface # retry until there is a main interface if not iface: - try: - await asyncio.wait_for(self.default_server_changed_event.wait(), 1) - except asyncio.TimeoutError: - pass + async with ignore_after(1): + await self.default_server_changed_event.wait() continue # try again assert iface.ready.done(), "interface not ready yet" # try actual request - success_fut = asyncio.ensure_future(func(self, *args, **kwargs)) - await asyncio.wait([success_fut, iface.got_disconnected.wait()], return_when=asyncio.FIRST_COMPLETED) - if success_fut.done() and not success_fut.cancelled(): - if success_fut.exception(): - try: - raise success_fut.exception() - except RequestTimedOut: - await iface.close() - await iface.got_disconnected.wait() - continue # try again - except RequestCorrupted as e: - # TODO ban server? - iface.logger.exception(f"RequestCorrupted: {e}") - await iface.close() - await iface.got_disconnected.wait() - continue # try again - return success_fut.result() + try: + async with TaskGroup(wait=any) as group: + task = await group.spawn(func(self, *args, **kwargs)) + await group.spawn(iface.got_disconnected.wait()) + except RequestTimedOut: + await iface.close() + await iface.got_disconnected.wait() + continue # try again + except RequestCorrupted as e: + # TODO ban server? + iface.logger.exception(f"RequestCorrupted: {e}") + await iface.close() + await iface.got_disconnected.wait() + continue # try again + if task.done() and not task.cancelled(): + return task.result() # otherwise; try again raise BestEffortRequestFailed('no interface to do request on... gave up.') return make_reliable_wrapper def catch_server_exceptions(func): + @functools.wraps(func) async def wrapper(self, *args, **kwargs): try: return await func(self, *args, **kwargs)