URI: 
       tupdate to aiorpcx 0.17 - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 51e0672da6ed4d8d8b078b0aad8cd515d59aaeef
   DIR parent d92a4e8365c48b1531ad8fb561a804a0ff91fcb2
  HTML Author: SomberNight <somber.night@protonmail.com>
       Date:   Tue, 30 Apr 2019 21:24:39 +0200
       
       update to aiorpcx 0.17
       
       Diffstat:
         M contrib/requirements/requirements.… |       2 +-
         M electrum/interface.py               |      71 +++++++++++++++++--------------
       
       2 files changed, 40 insertions(+), 33 deletions(-)
       ---
   DIR diff --git a/contrib/requirements/requirements.txt b/contrib/requirements/requirements.txt
       t@@ -5,7 +5,7 @@ protobuf
        dnspython
        jsonrpclib-pelix
        qdarkstyle<2.6
       -aiorpcx>=0.9,<0.11
       +aiorpcx>=0.17,<0.18
        aiohttp>=3.3.0
        aiohttp_socks
        certifi
   DIR diff --git a/electrum/interface.py b/electrum/interface.py
       t@@ -34,6 +34,7 @@ from ipaddress import IPv4Network, IPv6Network, ip_address
        
        import aiorpcx
        from aiorpcx import RPCSession, Notification
       +from aiorpcx.curio import timeout_after, TaskTimeout
        import certifi
        
        from .util import PrintError, ignore_exceptions, log_exceptions, bfh, SilentTaskGroup
       t@@ -72,10 +73,10 @@ class NotificationSession(RPCSession):
                super(NotificationSession, self).__init__(*args, **kwargs)
                self.subscriptions = defaultdict(list)
                self.cache = {}
       -        self.in_flight_requests_semaphore = asyncio.Semaphore(100)
                self.default_timeout = NetworkTimeout.Generic.NORMAL
                self._msg_counter = 0
                self.interface = None  # type: Optional[Interface]
       +        self.cost_hard_limit = 0  # disable aiorpcx resource limits
        
            def _get_and_inc_msg_counter(self):
                # runs in event loop thread, no need for lock
       t@@ -84,35 +85,40 @@ class NotificationSession(RPCSession):
        
            async def handle_request(self, request):
                self.maybe_log(f"--> {request}")
       -        # note: if server sends malformed request and we raise, the superclass
       -        # will catch the exception, count errors, and at some point disconnect
       -        if isinstance(request, Notification):
       -            params, result = request.args[:-1], request.args[-1]
       -            key = self.get_hashable_key_for_rpc_call(request.method, params)
       -            if key in self.subscriptions:
       -                self.cache[key] = result
       -                for queue in self.subscriptions[key]:
       -                    await queue.put(request.args)
       +        try:
       +            if isinstance(request, Notification):
       +                params, result = request.args[:-1], request.args[-1]
       +                key = self.get_hashable_key_for_rpc_call(request.method, params)
       +                if key in self.subscriptions:
       +                    self.cache[key] = result
       +                    for queue in self.subscriptions[key]:
       +                        await queue.put(request.args)
       +                else:
       +                    raise Exception(f'unexpected notification')
                    else:
       -                raise Exception('unexpected request: {}'.format(repr(request)))
       +                raise Exception(f'unexpected request. not a notification')
       +        except Exception as e:
       +            self.interface.print_error(f"error handling request {request}. exc: {repr(e)}")
       +            await self.close()
        
            async def send_request(self, *args, timeout=None, **kwargs):
       -        # note: the timeout starts after the request touches the wire!
       -        if timeout is None:
       -            timeout = self.default_timeout
       -        # note: the semaphore implementation guarantees no starvation
       -        async with self.in_flight_requests_semaphore:
       -            msg_id = self._get_and_inc_msg_counter()
       -            self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
       -            try:
       -                response = await asyncio.wait_for(
       -                    super().send_request(*args, **kwargs),
       -                    timeout)
       -            except asyncio.TimeoutError as e:
       -                raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
       -            else:
       -                self.maybe_log(f"--> {response} (id: {msg_id})")
       -                return response
       +        # note: semaphores/timeouts/backpressure etc are handled by
       +        # aiorpcx. the timeout arg here in most cases should not be set
       +        msg_id = self._get_and_inc_msg_counter()
       +        self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
       +        try:
       +            response = await asyncio.wait_for(
       +                super().send_request(*args, **kwargs),
       +                timeout)
       +        except (TaskTimeout, asyncio.TimeoutError) as e:
       +            raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
       +        else:
       +            self.maybe_log(f"--> {response} (id: {msg_id})")
       +            return response
       +
       +    def set_default_timeout(self, timeout):
       +        self.sent_request_timeout = timeout
       +        self.max_send_delay = timeout
        
            async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
                # note: until the cache is written for the first time,
       t@@ -212,10 +218,11 @@ class Interface(PrintError):
                        auth = None
                    else:
                        auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
       +            addr = "{}:{}".format(proxy['host'], proxy['port'])
                    if proxy['mode'] == "socks4":
       -                self.proxy = aiorpcx.socks.SOCKSProxy((proxy['host'], int(proxy['port'])), aiorpcx.socks.SOCKS4a, auth)
       +                self.proxy = aiorpcx.socks.SOCKSProxy(addr, aiorpcx.socks.SOCKS4a, auth)
                    elif proxy['mode'] == "socks5":
       -                self.proxy = aiorpcx.socks.SOCKSProxy((proxy['host'], int(proxy['port'])), aiorpcx.socks.SOCKS5, auth)
       +                self.proxy = aiorpcx.socks.SOCKSProxy(addr, aiorpcx.socks.SOCKS5, auth)
                    else:
                        raise NotImplementedError  # http proxy not available with aiorpcx
                else:
       t@@ -408,7 +415,7 @@ class Interface(PrintError):
                                             ssl=sslc, proxy=self.proxy) as session:
                    self.session = session  # type: NotificationSession
                    self.session.interface = self
       -            self.session.default_timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Generic)
       +            self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
                    try:
                        ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION])
                    except aiorpcx.jsonrpc.RPCError as e:
       t@@ -620,9 +627,9 @@ class Interface(PrintError):
            def ip_addr(self) -> Optional[str]:
                session = self.session
                if not session: return None
       -        peer_addr = session.peer_address()
       +        peer_addr = session.remote_address()
                if not peer_addr: return None
       -        return peer_addr[0]
       +        return str(peer_addr.host)
        
            def bucket_based_on_ipaddress(self) -> str:
                def do_bucket():