tnetwork: implement exponential backoff for retries - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 8baa79be882745375226c1bd1a241c0c168bc2ab DIR parent cf1f2ba4dca51f15f485211a530022165a89c4c4 HTML Author: SomberNight <somber.night@protonmail.com> Date: Tue, 14 Apr 2020 18:28:41 +0200 network: implement exponential backoff for retries Diffstat: M electrum/network.py | 106 +++++++++++++++++++------------ 1 file changed, 65 insertions(+), 41 deletions(-) --- DIR diff --git a/electrum/network.py b/electrum/network.py t@@ -71,11 +71,12 @@ if TYPE_CHECKING: _logger = get_logger(__name__) - -NODES_RETRY_INTERVAL = 60 -SERVER_RETRY_INTERVAL = 10 NUM_TARGET_CONNECTED_SERVERS = 10 NUM_RECENT_SERVERS = 20 +MAX_RETRY_DELAY_FOR_SERVERS = 600 # sec +INIT_RETRY_DELAY_FOR_SERVERS = 15 # sec +MAX_RETRY_DELAY_FOR_MAIN_SERVER = 10 # sec +INIT_RETRY_DELAY_FOR_MAIN_SERVER = 1 # sec def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]: t@@ -245,8 +246,8 @@ class Network(Logger): interfaces: Dict[ServerAddr, Interface] connecting: Set[ServerAddr] server_queue: 'Optional[queue.Queue[ServerAddr]]' - disconnected_servers: Set[ServerAddr] default_server: ServerAddr + _recent_servers: List[ServerAddr] def __init__(self, config: SimpleConfig, *, daemon: 'Daemon' = None): global _INSTANCE t@@ -291,7 +292,7 @@ class Network(Logger): self.interfaces_lock = threading.Lock() # for mutating/iterating self.interfaces self.server_peers = {} # returned by interface (servers that the main interface knows about) - self.recent_servers = self._read_recent_servers() # note: needs self.recent_servers_lock + self._recent_servers = self._read_recent_servers() # note: needs self.recent_servers_lock self.banner = '' self.donation_address = '' t@@ -301,8 +302,7 @@ class Network(Logger): util.make_dir(dir_path) # retry times - self.server_retry_time = time.time() - self.nodes_retry_time = time.time() + self._last_tried_server = {} # type: Dict[ServerAddr, Tuple[float, int]] # unix ts, num_attempts # the main server we are currently communicating with self.interface = None self.default_server_changed_event = asyncio.Event() t@@ -373,7 +373,7 @@ class Network(Logger): if not self.config.path: return path = os.path.join(self.config.path, "recent_servers") - s = json.dumps(self.recent_servers, indent=4, sort_keys=True, cls=MyEncoder) + s = json.dumps(self._recent_servers, indent=4, sort_keys=True, cls=MyEncoder) try: with open(path, "w", encoding='utf-8') as f: f.write(s) t@@ -526,7 +526,7 @@ class Network(Logger): # hardcoded servers out.update(constants.net.DEFAULT_SERVERS) # add recent servers - for server in self.recent_servers: + for server in self._recent_servers: port = str(server.port) if server.host in out: out[server.host].update({server.protocol: port}) t@@ -538,20 +538,52 @@ class Network(Logger): return out def _start_interface(self, server: ServerAddr): - if server not in self.interfaces and server not in self.connecting: - if server == self.default_server: - self.logger.info(f"connecting to {server} as new interface") - self._set_status('connecting') - self.connecting.add(server) - self.server_queue.put(server) - - def _start_random_interface(self) -> Optional[ServerAddr]: + if server in self.interfaces or server in self.connecting: + return + if server == self.default_server: + self.logger.info(f"connecting to {server} as new interface") + self._set_status('connecting') + self.connecting.add(server) + self.server_queue.put(server) + # update _last_tried_server + last_time, num_attempts = self._last_tried_server.get(server, (0, 0)) + self._last_tried_server[server] = time.time(), num_attempts + 1 + + def _can_retry_server(self, server: ServerAddr, *, now: float = None) -> bool: + if now is None: + now = time.time() + last_time, num_attempts = self._last_tried_server.get(server, (0, 0)) + if server == self.default_server: + delay = min(MAX_RETRY_DELAY_FOR_MAIN_SERVER, + INIT_RETRY_DELAY_FOR_MAIN_SERVER * 2 ** num_attempts) + else: + delay = min(MAX_RETRY_DELAY_FOR_SERVERS, + INIT_RETRY_DELAY_FOR_SERVERS * 2 ** num_attempts) + next_time = last_time + delay + return next_time < now + + def _get_next_server_to_try(self) -> Optional[ServerAddr]: + now = time.time() with self.interfaces_lock: - exclude_set = self.disconnected_servers | set(self.interfaces) | self.connecting - server = pick_random_server(self.get_servers(), protocol=self.protocol, exclude_set=exclude_set) - if server: - self._start_interface(server) - return server + exclude_set = set(self.interfaces) | self.connecting + # first try from recent servers + with self.recent_servers_lock: + recent_servers = list(self._recent_servers) + recent_servers = [s for s in recent_servers if s.protocol == self.protocol] + for server in recent_servers: + if server in exclude_set: + continue + if not self._can_retry_server(server, now=now): + continue + return server + # try all servers we know about + hostmap = self.get_servers() + servers = set(filter_protocol(hostmap, self.protocol)) - exclude_set + for server in servers: + if not self._can_retry_server(server, now=now): + continue + return server + return None def _set_proxy(self, proxy: Optional[dict]): self.proxy = proxy t@@ -701,11 +733,12 @@ class Network(Logger): @with_recent_servers_lock def _add_recent_server(self, server): + self._last_tried_server[server] = time.time(), 0 # list is ordered - if server in self.recent_servers: - self.recent_servers.remove(server) - self.recent_servers.insert(0, server) - self.recent_servers = self.recent_servers[:NUM_RECENT_SERVERS] + if server in self._recent_servers: + self._recent_servers.remove(server) + self._recent_servers.insert(0, server) + self._recent_servers = self._recent_servers[:NUM_RECENT_SERVERS] self._save_recent_servers() async def connection_down(self, interface: Interface): t@@ -713,7 +746,6 @@ class Network(Logger): We distinguish by whether it is in self.interfaces.''' if not interface: return server = interface.server - self.disconnected_servers.add(server) if server == self.default_server: self._set_status('disconnected') await self._close_interface(interface) t@@ -752,7 +784,7 @@ class Network(Logger): self._add_recent_server(server) util.trigger_callback('network_updated') - def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check) -> bool: + def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check: Interface) -> bool: # main interface is exempt. this makes switching servers easier if iface_to_check.is_main_server(): return True t@@ -1115,7 +1147,7 @@ class Network(Logger): assert not self.interface and not self.interfaces assert not self.connecting and not self.server_queue self.logger.info('starting network') - self.disconnected_servers = set([]) + self._last_tried_server.clear() self.protocol = self.default_server.protocol self.server_queue = queue.Queue() self._set_proxy(deserialize_proxy(self.config.get('proxy'))) t@@ -1174,17 +1206,12 @@ class Network(Logger): async def _ensure_there_is_a_main_interface(self): if self.is_connected(): return - now = time.time() # if auto_connect is set, try a different server if self.auto_connect and not self.is_connecting(): await self._switch_to_random_interface() # if auto_connect is not set, or still no main interface, retry current if not self.is_connected() and not self.is_connecting(): - if self.default_server in self.disconnected_servers: - if now - self.server_retry_time > SERVER_RETRY_INTERVAL: - self.disconnected_servers.remove(self.default_server) - self.server_retry_time = now - else: + if self._can_retry_server(self.default_server): await self.switch_to_interface(self.default_server) async def _maintain_sessions(self): t@@ -1193,14 +1220,11 @@ class Network(Logger): server = self.server_queue.get() await self.taskgroup.spawn(self._run_new_interface(server)) async def maybe_queue_new_interfaces_to_be_launched_later(): - now = time.time() for i in range(self.num_server - len(self.interfaces) - len(self.connecting)): # FIXME this should try to honour "healthy spread of connected servers" - self._start_random_interface() - if now - self.nodes_retry_time > NODES_RETRY_INTERVAL: - self.logger.info('network: retrying connections') - self.disconnected_servers = set([]) - self.nodes_retry_time = now + server = self._get_next_server_to_try() + if server: + self._start_interface(server) async def maintain_healthy_spread_of_connected_servers(): with self.interfaces_lock: interfaces = list(self.interfaces.values()) random.shuffle(interfaces)