URI: 
       tnetwork: implement exponential backoff for retries - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 8baa79be882745375226c1bd1a241c0c168bc2ab
   DIR parent cf1f2ba4dca51f15f485211a530022165a89c4c4
  HTML Author: SomberNight <somber.night@protonmail.com>
       Date:   Tue, 14 Apr 2020 18:28:41 +0200
       
       network: implement exponential backoff for retries
       
       Diffstat:
         M electrum/network.py                 |     106 +++++++++++++++++++------------
       
       1 file changed, 65 insertions(+), 41 deletions(-)
       ---
   DIR diff --git a/electrum/network.py b/electrum/network.py
       t@@ -71,11 +71,12 @@ if TYPE_CHECKING:
        _logger = get_logger(__name__)
        
        
       -
       -NODES_RETRY_INTERVAL = 60
       -SERVER_RETRY_INTERVAL = 10
        NUM_TARGET_CONNECTED_SERVERS = 10
        NUM_RECENT_SERVERS = 20
       +MAX_RETRY_DELAY_FOR_SERVERS = 600  # sec
       +INIT_RETRY_DELAY_FOR_SERVERS = 15  # sec
       +MAX_RETRY_DELAY_FOR_MAIN_SERVER = 10  # sec
       +INIT_RETRY_DELAY_FOR_MAIN_SERVER = 1  # sec
        
        
        def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]:
       t@@ -245,8 +246,8 @@ class Network(Logger):
            interfaces: Dict[ServerAddr, Interface]
            connecting: Set[ServerAddr]
            server_queue: 'Optional[queue.Queue[ServerAddr]]'
       -    disconnected_servers: Set[ServerAddr]
            default_server: ServerAddr
       +    _recent_servers: List[ServerAddr]
        
            def __init__(self, config: SimpleConfig, *, daemon: 'Daemon' = None):
                global _INSTANCE
       t@@ -291,7 +292,7 @@ class Network(Logger):
                self.interfaces_lock = threading.Lock()            # for mutating/iterating self.interfaces
        
                self.server_peers = {}  # returned by interface (servers that the main interface knows about)
       -        self.recent_servers = self._read_recent_servers()  # note: needs self.recent_servers_lock
       +        self._recent_servers = self._read_recent_servers()  # note: needs self.recent_servers_lock
        
                self.banner = ''
                self.donation_address = ''
       t@@ -301,8 +302,7 @@ class Network(Logger):
                util.make_dir(dir_path)
        
                # retry times
       -        self.server_retry_time = time.time()
       -        self.nodes_retry_time = time.time()
       +        self._last_tried_server = {}  # type: Dict[ServerAddr, Tuple[float, int]]  # unix ts, num_attempts
                # the main server we are currently communicating with
                self.interface = None
                self.default_server_changed_event = asyncio.Event()
       t@@ -373,7 +373,7 @@ class Network(Logger):
                if not self.config.path:
                    return
                path = os.path.join(self.config.path, "recent_servers")
       -        s = json.dumps(self.recent_servers, indent=4, sort_keys=True, cls=MyEncoder)
       +        s = json.dumps(self._recent_servers, indent=4, sort_keys=True, cls=MyEncoder)
                try:
                    with open(path, "w", encoding='utf-8') as f:
                        f.write(s)
       t@@ -526,7 +526,7 @@ class Network(Logger):
                # hardcoded servers
                out.update(constants.net.DEFAULT_SERVERS)
                # add recent servers
       -        for server in self.recent_servers:
       +        for server in self._recent_servers:
                    port = str(server.port)
                    if server.host in out:
                        out[server.host].update({server.protocol: port})
       t@@ -538,20 +538,52 @@ class Network(Logger):
                return out
        
            def _start_interface(self, server: ServerAddr):
       -        if server not in self.interfaces and server not in self.connecting:
       -            if server == self.default_server:
       -                self.logger.info(f"connecting to {server} as new interface")
       -                self._set_status('connecting')
       -            self.connecting.add(server)
       -            self.server_queue.put(server)
       -
       -    def _start_random_interface(self) -> Optional[ServerAddr]:
       +        if server in self.interfaces or server in self.connecting:
       +            return
       +        if server == self.default_server:
       +            self.logger.info(f"connecting to {server} as new interface")
       +            self._set_status('connecting')
       +        self.connecting.add(server)
       +        self.server_queue.put(server)
       +        # update _last_tried_server
       +        last_time, num_attempts = self._last_tried_server.get(server, (0, 0))
       +        self._last_tried_server[server] = time.time(), num_attempts + 1
       +
       +    def _can_retry_server(self, server: ServerAddr, *, now: float = None) -> bool:
       +        if now is None:
       +            now = time.time()
       +        last_time, num_attempts = self._last_tried_server.get(server, (0, 0))
       +        if server == self.default_server:
       +            delay = min(MAX_RETRY_DELAY_FOR_MAIN_SERVER,
       +                        INIT_RETRY_DELAY_FOR_MAIN_SERVER * 2 ** num_attempts)
       +        else:
       +            delay = min(MAX_RETRY_DELAY_FOR_SERVERS,
       +                        INIT_RETRY_DELAY_FOR_SERVERS * 2 ** num_attempts)
       +        next_time = last_time + delay
       +        return next_time < now
       +
       +    def _get_next_server_to_try(self) -> Optional[ServerAddr]:
       +        now = time.time()
                with self.interfaces_lock:
       -            exclude_set = self.disconnected_servers | set(self.interfaces) | self.connecting
       -        server = pick_random_server(self.get_servers(), protocol=self.protocol, exclude_set=exclude_set)
       -        if server:
       -            self._start_interface(server)
       -        return server
       +            exclude_set = set(self.interfaces) | self.connecting
       +        # first try from recent servers
       +        with self.recent_servers_lock:
       +            recent_servers = list(self._recent_servers)
       +        recent_servers = [s for s in recent_servers if s.protocol == self.protocol]
       +        for server in recent_servers:
       +            if server in exclude_set:
       +                continue
       +            if not self._can_retry_server(server, now=now):
       +                continue
       +            return server
       +        # try all servers we know about
       +        hostmap = self.get_servers()
       +        servers = set(filter_protocol(hostmap, self.protocol)) - exclude_set
       +        for server in servers:
       +            if not self._can_retry_server(server, now=now):
       +                continue
       +            return server
       +        return None
        
            def _set_proxy(self, proxy: Optional[dict]):
                self.proxy = proxy
       t@@ -701,11 +733,12 @@ class Network(Logger):
        
            @with_recent_servers_lock
            def _add_recent_server(self, server):
       +        self._last_tried_server[server] = time.time(), 0
                # list is ordered
       -        if server in self.recent_servers:
       -            self.recent_servers.remove(server)
       -        self.recent_servers.insert(0, server)
       -        self.recent_servers = self.recent_servers[:NUM_RECENT_SERVERS]
       +        if server in self._recent_servers:
       +            self._recent_servers.remove(server)
       +        self._recent_servers.insert(0, server)
       +        self._recent_servers = self._recent_servers[:NUM_RECENT_SERVERS]
                self._save_recent_servers()
        
            async def connection_down(self, interface: Interface):
       t@@ -713,7 +746,6 @@ class Network(Logger):
                We distinguish by whether it is in self.interfaces.'''
                if not interface: return
                server = interface.server
       -        self.disconnected_servers.add(server)
                if server == self.default_server:
                    self._set_status('disconnected')
                await self._close_interface(interface)
       t@@ -752,7 +784,7 @@ class Network(Logger):
                self._add_recent_server(server)
                util.trigger_callback('network_updated')
        
       -    def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check) -> bool:
       +    def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check: Interface) -> bool:
                # main interface is exempt. this makes switching servers easier
                if iface_to_check.is_main_server():
                    return True
       t@@ -1115,7 +1147,7 @@ class Network(Logger):
                assert not self.interface and not self.interfaces
                assert not self.connecting and not self.server_queue
                self.logger.info('starting network')
       -        self.disconnected_servers = set([])
       +        self._last_tried_server.clear()
                self.protocol = self.default_server.protocol
                self.server_queue = queue.Queue()
                self._set_proxy(deserialize_proxy(self.config.get('proxy')))
       t@@ -1174,17 +1206,12 @@ class Network(Logger):
            async def _ensure_there_is_a_main_interface(self):
                if self.is_connected():
                    return
       -        now = time.time()
                # if auto_connect is set, try a different server
                if self.auto_connect and not self.is_connecting():
                    await self._switch_to_random_interface()
                # if auto_connect is not set, or still no main interface, retry current
                if not self.is_connected() and not self.is_connecting():
       -            if self.default_server in self.disconnected_servers:
       -                if now - self.server_retry_time > SERVER_RETRY_INTERVAL:
       -                    self.disconnected_servers.remove(self.default_server)
       -                    self.server_retry_time = now
       -            else:
       +            if self._can_retry_server(self.default_server):
                        await self.switch_to_interface(self.default_server)
        
            async def _maintain_sessions(self):
       t@@ -1193,14 +1220,11 @@ class Network(Logger):
                        server = self.server_queue.get()
                        await self.taskgroup.spawn(self._run_new_interface(server))
                async def maybe_queue_new_interfaces_to_be_launched_later():
       -            now = time.time()
                    for i in range(self.num_server - len(self.interfaces) - len(self.connecting)):
                        # FIXME this should try to honour "healthy spread of connected servers"
       -                self._start_random_interface()
       -            if now - self.nodes_retry_time > NODES_RETRY_INTERVAL:
       -                self.logger.info('network: retrying connections')
       -                self.disconnected_servers = set([])
       -                self.nodes_retry_time = now
       +                server = self._get_next_server_to_try()
       +                if server:
       +                    self._start_interface(server)
                async def maintain_healthy_spread_of_connected_servers():
                    with self.interfaces_lock: interfaces = list(self.interfaces.values())
                    random.shuffle(interfaces)