URI: 
       tUnify message IDs between network and interfaces - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 6f1367fea69306452b070f3d4c9b4fff06a97519
   DIR parent ec9eccd0dd8f83a10f547fff57fee230374e28b6
  HTML Author: Neil Booth <kyuupichan@gmail.com>
       Date:   Wed,  9 Sep 2015 20:41:08 +0900
       
       Unify message IDs between network and interfaces
       
       Previously network.py had its own idea of request IDs,
       and each interface had its own which was sent on the wire.
       The interface would jump through hoops to translate one
       tto the other.
       
       This unifies them so that a message ID is passed when
       queueing a request, in addition to the method and params.
       network.py is now solely responsible for message ID management.
       
       Apart from being simpler and clearer, this also should be faster
       as there is much less data structure manipulation and rebuilding
       happening.
       
       Diffstat:
         M lib/interface.py                    |      67 ++++++++++++++-----------------
         M lib/network.py                      |     105 ++++++++++++++++---------------
         M scripts/estimate_fee                |       6 +-----
         M scripts/peers                       |       5 +----
         M scripts/servers                     |       2 +-
         M scripts/txradar                     |       3 +--
         M scripts/util.py                     |      20 ++++++++++----------
       
       7 files changed, 98 insertions(+), 110 deletions(-)
       ---
   DIR diff --git a/lib/interface.py b/lib/interface.py
       t@@ -221,7 +221,6 @@ class Interface(util.PrintError):
                self.pipe.set_timeout(0.0)  # Don't wait for data
                # Dump network messages.  Set at runtime from the console.
                self.debug = False
       -        self.message_id = 0
                self.unsent_requests = []
                self.unanswered_requests = {}
                # Set last ping to zero to ensure immediate ping
       t@@ -241,32 +240,26 @@ class Interface(util.PrintError):
                    self.socket.shutdown(socket.SHUT_RDWR)
                self.socket.close()
        
       -    def queue_request(self, request):
       -        '''Queue a request.'''
       +    def queue_request(self, *args):  # method, params, _id
       +        '''Queue a request, later to be send with send_requests when the
       +        socket is available for writing.
       +        '''
                self.request_time = time.time()
       -        self.unsent_requests.append(request)
       +        self.unsent_requests.append(args)
        
            def send_requests(self):
                '''Sends all queued requests.  Returns False on failure.'''
       -        def copy_request(orig):
       -            # Replace ID after making copy - mustn't change caller's copy
       -            request = orig.copy()
       -            request['id'] = self.message_id
       -            self.message_id += 1
       -            if self.debug:
       -                self.print_error("-->", request, orig.get('id'))
       -            return request
       -
       -        requests_as_sent = map(copy_request, self.unsent_requests)
       +        make_dict = lambda (m, p, i): {'method': m, 'params': p, 'id': i}
       +        wire_requests = map(make_dict, self.unsent_requests)
                try:
       -            self.pipe.send_all(requests_as_sent)
       +            self.pipe.send_all(wire_requests)
                except socket.error, e:
                    self.print_error("socket error:", e)
                    return False
       -        # unanswered_requests stores the original unmodified user
       -        # request, keyed by wire ID
       -        for n, request in enumerate(self.unsent_requests):
       -            self.unanswered_requests[requests_as_sent[n]['id']] = request
       +        for request in self.unsent_requests:
       +            if self.debug:
       +                self.print_error("-->", request)
       +            self.unanswered_requests[request[2]] = request
                self.unsent_requests = []
                return True
        
       t@@ -291,37 +284,39 @@ class Interface(util.PrintError):
        
            def get_responses(self):
                '''Call if there is data available on the socket.  Returns a list of
       -        notifications and a list of responses.  The notifications are
       -        singleton unsolicited responses presumably as a result of
       -        prior subscriptions.  The responses are (request, response)
       -        pairs.  If the connection was closed remotely or the remote
       -        server is misbehaving, the last notification will be None.
       +        (request, response) pairs.  Notifications are singleton
       +        unsolicited responses presumably as a result of prior
       +        subscriptions, so request is None and there is no 'id' member.
       +        Otherwise it is a response, which has an 'id' member and a
       +        corresponding request.  If the connection was closed remotely
       +        or the remote server is misbehaving, a (None, None) will appear.
                '''
       -        notifications, responses = [], []
       +        responses = []
                while True:
                    try:
                        response = self.pipe.get()
                    except util.timeout:
                        break
                    if response is None:
       -                notifications.append(None)
       +                responses.append((None, None))
                        self.closed_remotely = True
                        self.print_error("connection closed remotely")
                        break
                    if self.debug:
                        self.print_error("<--", response)
       -            wire_id = response.pop('id', None)
       -            if wire_id is None:
       -                notifications.append(response)
       -            elif wire_id in self.unanswered_requests:
       -                request = self.unanswered_requests.pop(wire_id)
       -                responses.append((request, response))
       +            wire_id = response.get('id', None)
       +            if wire_id is None:  # Notification
       +                responses.append((None, response))
                    else:
       -                notifications.append(None)
       -                self.print_error("unknown wire ID", wire_id)
       -                break
       +                request = self.unanswered_requests.pop(wire_id, None)
       +                if request:
       +                    responses.append((request, response))
       +                else:
       +                    self.print_error("unknown wire ID", wire_id)
       +                    responses.append(None, None) # Signal
       +                    break
        
       -        return notifications, responses
       +        return responses
        
        
        def check_cert(host, cert):
   DIR diff --git a/lib/network.py b/lib/network.py
       t@@ -254,15 +254,26 @@ class Network(util.DaemonThread):
            def is_up_to_date(self):
                return self.unanswered_requests == {}
        
       -    def queue_request(self, method, params):
       -        self.interface.queue_request({'method': method, 'params': params})
       +    def queue_request(self, method, params, interface=None):
       +        # If you want to queue a request on any interface it must go
       +        # through this function so message ids are properly tracked
       +        if interface is None:
       +            interface = self.interface
       +        message_id = self.message_id
       +        self.message_id += 1
       +        interface.queue_request(method, params, message_id)
       +        return message_id
        
            def send_subscriptions(self):
                # clear cache
                self.cached_responses = {}
                self.print_error('sending subscriptions to', self.interface.server, len(self.unanswered_requests), len(self.subscribed_addresses))
       -        for r in self.unanswered_requests.values():
       -            self.interface.queue_request(r[0])
       +        # Resend unanswered requests
       +        requests = self.unanswered_requests.values()
       +        self.unanswered_requests = {}
       +        for request in requests:
       +            message_id = self.queue_request(request[0], request[1])
       +            self.unanswered_requests[message_id] = request
                for addr in self.subscribed_addresses:
                    self.queue_request('blockchain.address.subscribe', [addr])
                self.queue_request('server.banner', [])
       t@@ -488,38 +499,39 @@ class Network(util.DaemonThread):
                        callback(response)
        
            def process_responses(self, interface):
       -        notifications, responses = interface.get_responses()
       +        responses = interface.get_responses()
        
                for request, response in responses:
       -            # Client ID was given by the daemon or proxy
       -            client_id = request.get('id')
       -            if client_id is not None:
       -                if interface != self.interface:
       -                    continue
       -                _req, callback = self.unanswered_requests.pop(client_id)
       +            callback = None
       +            if request:
       +                method, params, message_id = request
       +                # client requests go through self.send() with a
       +                # callback, are only sent to the current interface,
       +                # and are placed in the unanswered_requests dictionary
       +                client_req = self.unanswered_requests.pop(message_id, None)
       +                if client_req:
       +                    assert interface == self.interface
       +                    callback = client_req[2]
       +                # Copy the request method and params to the response
       +                response['method'] = method
       +                response['params'] = params
                    else:
       -                callback = None
       -            # Copy the request method and params to the response
       -            response['method'] = request.get('method')
       -            response['params'] = request.get('params')
       -            response['id'] = client_id
       +                if not response:  # Closed remotely / misbehaving
       +                    self.connection_down(interface.server)
       +                    break
       +                # Rewrite response shape to match subscription request response
       +                method = response.get('method')
       +                params = response.get('params')
       +                if method == 'blockchain.headers.subscribe':
       +                    response['result'] = params[0]
       +                    response['params'] = []
       +                elif method == 'blockchain.address.subscribe':
       +                    response['params'] = [params[0]]  # addr
       +                    response['result'] = params[1]
       +
       +            # Response is now in canonical form
                    self.process_response(interface, response, callback)
        
       -        for response in notifications:
       -            if not response:  # Closed remotely
       -                self.connection_down(interface.server)
       -                break
       -            # Rewrite response shape to match subscription request response
       -            method = response.get('method')
       -            params = response.get('params')
       -            if method == 'blockchain.headers.subscribe':
       -                response['result'] = params[0]
       -                response['params'] = []
       -            elif method == 'blockchain.address.subscribe':
       -                response['params'] = [params[0]]  # addr
       -                response['result'] = params[1]
       -            self.process_response(interface, response, None)
       -
            def send(self, messages, callback):
                '''Messages is a list of (method, value) tuples'''
                with self.lock:
       t@@ -535,16 +547,11 @@ class Network(util.DaemonThread):
                        for sub in subs:
                            if sub not in self.subscriptions[callback]:
                                self.subscriptions[callback].append(sub)
       -                _id = self.message_id
       -                self.message_id += len(messages)
        
                    unsent = []
                    for message in messages:
       -                method, params = message
       -                request = {'id': _id, 'method': method, 'params': params}
       -                if not self.process_request(request, callback):
       +                if not self.process_request(message, callback):
                            unsent.append(message)
       -                _id += 1
        
                    if unsent:
                        with self.lock:
       t@@ -553,12 +560,10 @@ class Network(util.DaemonThread):
            # FIXME: inline this function
            def process_request(self, request, callback):
                '''Returns true if the request was processed.'''
       -        method = request['method']
       -        params = request['params']
       -        _id = request['id']
       +        method, params = request
        
                if method.startswith('network.'):
       -            out = {'id':_id}
       +            out = {}
                    try:
                        f = getattr(self, method[8:])
                        out['result'] = f(*params)
       t@@ -585,8 +590,8 @@ class Network(util.DaemonThread):
        
                if self.debug:
                    self.print_error("-->", request)
       -        self.unanswered_requests[_id] = request, callback
       -        self.interface.queue_request(request)
       +        message_id = self.queue_request(method, params)
       +        self.unanswered_requests[message_id] = method, params, callback
                return True
        
            def connection_down(self, server):
       t@@ -603,8 +608,7 @@ class Network(util.DaemonThread):
            def new_interface(self, server, socket):
                self.add_recent_server(server)
                self.interfaces[server] = interface = Interface(server, socket)
       -        interface.queue_request({'method': 'blockchain.headers.subscribe',
       -                                 'params': []})
       +        self.queue_request('blockchain.headers.subscribe', [], interface)
                if server == self.default_server:
                    self.switch_to_interface(server)
                self.notify('interfaces')
       t@@ -625,9 +629,8 @@ class Network(util.DaemonThread):
                    if interface.has_timed_out():
                        self.connection_down(interface.server)
                    elif interface.ping_required():
       -                version_req = {'method': 'server.version',
       -                               'params': [ELECTRUM_VERSION, PROTOCOL_VERSION]}
       -                interface.queue_request(version_req)
       +                params = [ELECTRUM_VERSION, PROTOCOL_VERSION]
       +                self.queue_request('server.version', params, interface)
        
                now = time.time()
                # nodes
       t@@ -653,8 +656,7 @@ class Network(util.DaemonThread):
        
            def request_chunk(self, interface, data, idx):
                interface.print_error("requesting chunk %d" % idx)
       -        interface.queue_request({'method':'blockchain.block.get_chunk',
       -                                 'params':[idx]})
       +        self.queue_request('blockchain.block.get_chunk', [idx], interface)
                data['chunk_idx'] = idx
                data['req_time'] = time.time()
        
       t@@ -675,8 +677,7 @@ class Network(util.DaemonThread):
        
            def request_header(self, interface, data, height):
                interface.print_error("requesting header %d" % height)
       -        interface.queue_request({'method':'blockchain.block.get_header',
       -                                 'params':[height]})
       +        self.queue_request('blockchain.block.get_header', [height], interface)
                data['header_height'] = height
                data['req_time'] = time.time()
                if not 'chain' in data:
   DIR diff --git a/scripts/estimate_fee b/scripts/estimate_fee
       t@@ -1,9 +1,5 @@
        #!/usr/bin/env python
        import util, json
        peers = util.get_peers()
       -results = util.send_request(peers, {'method':'blockchain.estimatefee','params':[2]})
       +results = util.send_request(peers, 'blockchain.estimatefee', [2])
        print json.dumps(results, indent=4)
       -
       -
       -
       -
   DIR diff --git a/scripts/peers b/scripts/peers
       t@@ -31,12 +31,9 @@ def analyze(results):
        
        
        peers = util.get_peers()
       -results = util.send_request(peers, {'method':'blockchain.headers.subscribe','params':[]})
       +results = util.send_request(peers, 'blockchain.headers.subscribe', [])
        
        errors = analyze(results).keys()
        
        for n,v in sorted(results.items(), key=lambda x:x[1].get('block_height')):
            print "%40s"%n, v.get('block_height'), v.get('utxo_root'), "error" if n in errors else "ok"
       -
       -
       -
   DIR diff --git a/scripts/servers b/scripts/servers
       t@@ -10,7 +10,7 @@ set_verbosity(False)
        
        config = SimpleConfig()
        servers = filter_protocol(protocol = 't')
       -results = util.send_request(servers, {'method':'blockchain.headers.subscribe', 'params':[]})
       +results = util.send_request(servers, 'blockchain.headers.subscribe', [])
        
        d = defaultdict(int)
        
   DIR diff --git a/scripts/txradar b/scripts/txradar
       t@@ -7,7 +7,7 @@ except:
            sys.exit(1)
        
        peers = util.get_peers()
       -results = util.send_request(peers, {'method':'blockchain.transaction.get','params':[tx]})
       +results = util.send_request(peers, 'blockchain.transaction.get', [tx])
        
        r1 = []
        r2 = []
       t@@ -17,4 +17,3 @@ for k, v in results.items():
        
        print "Received %d answers"%len(results)
        print "Propagation rate: %.1f percent" % (len(r1) *100./(len(r1)+ len(r2)))
       -
   DIR diff --git a/scripts/util.py b/scripts/util.py
       t@@ -16,13 +16,15 @@ def get_interfaces(servers, timeout=10):
                    connecting[server] = Connection(server, socket_queue, config.path)
            interfaces = {}
            timeout = time.time() + timeout
       -    while time.time() < timeout:
       +    count = 0
       +    while time.time() < timeout and count < len(servers):
                try:
       -            server, socket = socket_queue.get(True, 1)
       +            server, socket = socket_queue.get(True, 0.3)
                except Queue.Empty:
                    continue
                if socket:
                    interfaces[server] = Interface(server, socket)
       +        count += 1
            return interfaces
        
        def wait_on_interfaces(interfaces, timeout=10):
       t@@ -37,7 +39,7 @@ def wait_on_interfaces(interfaces, timeout=10):
                for interface in wout:
                    interface.send_requests()
                for interface in rout:
       -            notifications, responses = interface.get_responses()
       +            responses = interface.get_responses()
                    if responses:
                        result[interface.server].extend(responses)
            return result
       t@@ -52,25 +54,23 @@ def get_peers():
                return []
            # 2. get list of peers
            interface = interfaces[server]
       -    interface.queue_request({'id':0, 'method': 'server.peers.subscribe',
       -                             'params': []})
       -    responses = wait_on_interfaces(interfaces)
       -    responses = responses.get(server)
       +    interface.queue_request('server.peers.subscribe', [], 0)
       +    responses = wait_on_interfaces(interfaces).get(server)
            if responses:
                response = responses[0][1]  # One response, (req, response) tuple
                peers = parse_servers(response.get('result'))
                peers = filter_protocol(peers,'s')
            return peers
        
       -def send_request(peers, request):
       +def send_request(peers, method, params):
            print "Contacting %d servers"%len(peers)
            interfaces = get_interfaces(peers)
            print "%d servers could be reached" % len(interfaces)
            for peer in peers:
                if not peer in interfaces:
                    print "Connection failed:", peer
       -    for i in interfaces.values():
       -        i.queue_request(request)
       +    for msg_id, i in enumerate(interfaces.values()):
       +        i.queue_request(method, params, msg_id)
            responses = wait_on_interfaces(interfaces)
            for peer in interfaces:
                if not peer in responses: