URI: 
       tnetwork: separate callbacks from unanswered_requests - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 43df795b1ffc2e99b45ff406670da1e44cb60b0d
   DIR parent d8ca881457e27f0a4368e8781bcea33766541728
  HTML Author: ThomasV <thomasv@electrum.org>
       Date:   Thu, 26 Nov 2015 10:57:43 +0100
       
       network: separate callbacks from unanswered_requests
       
       Diffstat:
         M lib/network.py                      |      51 +++++++++++++++----------------
       
       1 file changed, 25 insertions(+), 26 deletions(-)
       ---
   DIR diff --git a/lib/network.py b/lib/network.py
       t@@ -166,7 +166,9 @@ class Network(util.DaemonThread):
                self.heights = {}
                self.merkle_roots = {}
                self.utxo_roots = {}
       +        # callbacks passed with subscriptions
                self.subscriptions = defaultdict(list)
       +        # callbacks set by the GUI
                self.callbacks = defaultdict(list)
        
                dir_path = os.path.join( self.config.path, 'certs')
       t@@ -454,7 +456,7 @@ class Network(util.DaemonThread):
                self.switch_lagging_interface(i.server)
                self.notify('updated')
        
       -    def process_response(self, interface, response, callback):
       +    def process_response(self, interface, response):
                if self.debug:
                    self.print_error("<--", response)
                error = response.get('error')
       t@@ -485,17 +487,9 @@ class Network(util.DaemonThread):
                elif method == 'blockchain.block.get_header':
                    self.on_get_header(interface, response)
                else:
       -            if callback is None:
       -                params = response['params']
       -                with self.lock:
       -                    for k,v in self.subscriptions.items():
       -                        if (method, params) in v:
       -                            callback = k
       -                            break
       -            if callback is None:
       -                self.print_error("received unexpected notification",
       -                                 method, params)
       -            else:
       +            params = response['params']
       +            callbacks = self.subscriptions.get(repr((method, params)), [])
       +            for callback in callbacks:
                        callback(response)
        
            def process_responses(self, interface):
       t@@ -511,7 +505,7 @@ class Network(util.DaemonThread):
                        client_req = self.unanswered_requests.pop(message_id, None)
                        if client_req:
                            assert interface == self.interface
       -                    callback = client_req[2]
       +
                        # Copy the request method and params to the response
                        response['method'] = method
                        response['params'] = params
       t@@ -534,12 +528,21 @@ class Network(util.DaemonThread):
                            response['result'] = params[1]
        
                    # Response is now in canonical form
       -            self.process_response(interface, response, callback)
       +            self.process_response(interface, response)
        
            def send(self, messages, callback):
                '''Messages is a list of (method, params) tuples'''
                with self.lock:
       -            self.pending_sends.append((messages, callback))
       +            subs = filter(lambda (m,v): m.endswith('.subscribe'), messages)
       +            for method, params in subs:
       +                k = repr((method, params))
       +                l = self.subscriptions.get(k, [])
       +                if callback not in l:
       +                    l.append(callback)
       +                self.subscriptions[k] = l
       +
       +            self.pending_sends += messages
       +
        
            def process_pending_sends(self):
                # Requests needs connectivity.  If we don't have an interface,
       t@@ -551,23 +554,19 @@ class Network(util.DaemonThread):
                    sends = self.pending_sends
                    self.pending_sends = []
        
       -        for messages, callback in sends:
       -            subs = filter(lambda (m,v): m.endswith('.subscribe'), messages)
       -            with self.lock:
       -                for sub in subs:
       -                    if sub not in self.subscriptions[callback]:
       -                        self.subscriptions[callback].append(sub)
       -
       -            for method, params in messages:
       -                message_id = self.queue_request(method, params)
       -                self.unanswered_requests[message_id] = method, params, callback
       +        for method, params in sends:
       +            message_id = self.queue_request(method, params)
       +            self.unanswered_requests[message_id] = method, params
        
            def unsubscribe(self, callback):
                '''Unsubscribe a callback to free object references to enable GC.'''
                # Note: we can't unsubscribe from the server, so if we receive
                # subsequent notifications process_response() will emit a harmless
                # "received unexpected notification" warning
       -        self.subscriptions.pop(callback, None)
       +        with self.lock:
       +            for v in self.subscriptions.values():
       +                if callback in v:
       +                    v.remove(callback)
        
            def connection_down(self, server):
                '''A connection to server either went down, or was never made.