URI: 
       tredo inter-thread communication using pipes - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 035ecbc7cd41036bf96dd8a9b6b57d09b7c4eabc
   DIR parent bd3bfb5e535b59f2ad2f0450f4e5a2ce600f2a7b
  HTML Author: ThomasV <thomasv@gitorious>
       Date:   Sun, 27 Jul 2014 11:33:02 +0200
       
       redo inter-thread communication using pipes
       
       Diffstat:
         M electrum                            |      11 ++++-------
         M lib/daemon.py                       |     187 ++++++++++---------------------
         M lib/network.py                      |      98 +++++++++++++++++++++++--------
         M lib/network_proxy.py                |     100 +++++++++++++++++--------------
         M lib/synchronizer.py                 |       2 +-
         M lib/util.py                         |      83 +++++++++++++++++++++++++++++++
         M lib/wallet.py                       |       2 +-
         M scripts/block_headers               |      30 ++++++++++++++++++++----------
       
       8 files changed, 298 insertions(+), 215 deletions(-)
       ---
   DIR diff --git a/electrum b/electrum
       t@@ -43,6 +43,7 @@ if is_local:
            sys.path.append('packages')
        
        
       +from electrum import util
        from electrum import SimpleConfig, Network, Wallet, WalletStorage, NetworkProxy, Commands, known_commands, pick_random_server
        from electrum.util import print_msg, print_stderr, print_json, set_verbosity
        
       t@@ -150,7 +151,7 @@ def do_start_daemon():
            import subprocess
            logfile = open(os.path.join(config.path, 'daemon.log'),'w')
            p = subprocess.Popen([__file__,"daemon"], stderr=logfile, stdout=logfile, close_fds=True)
       -    print "starting daemon (PID %d)"%p.pid
       +    print_stderr("starting daemon (PID %d)"%p.pid)
        
        
        def daemon_socket(start_daemon=True):
       t@@ -222,12 +223,8 @@ if __name__ == '__main__':
                # network interface
                if not options.offline:
                    s = daemon_socket(start_daemon=options.daemon)
       -            if s:
       -                network = NetworkProxy(s, config)
       -                network.start()
       -            else:
       -                network = Network(config)
       -                network.start()
       +            network = NetworkProxy(s, config)
       +            network.start()
                else:
                    network = None
        
   DIR diff --git a/lib/daemon.py b/lib/daemon.py
       t@@ -24,140 +24,72 @@ import threading
        import traceback
        import json
        import Queue
       +
       +import util
        from network import Network
        from util import print_error, print_stderr, parse_json
        from simple_config import SimpleConfig
        
        
       -"""
       -The Network object is not aware of clients/subscribers
       -It only does subscribe/unsubscribe to addresses
       -Which client has wich address is managed by the daemon
       -Network also reports status changes
       -"""
        
        DAEMON_PORT=8001
        
        
        
       -
       -
        class ClientThread(threading.Thread):
       -    # read messages from client (socket), and sends them to Network
       -    # responses are sent back on the same socket
        
       -    def __init__(self, server, network, s):
       +    def __init__(self, server, s):
                threading.Thread.__init__(self)
                self.server = server
                self.daemon = True
       -        self.s = s
       -        self.s.settimeout(0.1)
       -        self.network = network
       -        self.queue = Queue.Queue()
       -        self.unanswered_requests = {}
       -        self.debug = False
       +        self.client_pipe = util.SocketPipe(s)
       +        self.daemon_pipe = util.QueuePipe(send_queue = self.server.network.requests_queue)
                self.server.add_client(self)
        
       -
       -    def run(self):
       -
       -        message = ''
       -        while True:
       -            try:
       -                self.send_responses()
       -            except socket.error:
       -                break
       -
       +    def reading_thread(self):
       +        while self.running:
                    try:
       -                data = self.s.recv(1024)
       -            except socket.timeout:
       +                request = self.client_pipe.get()
       +            except util.timeout:
                        continue
       -            except:
       -                data = ''
       -            if not data:
       +            if request is None:
       +                self.running = False
                        break
       -            message += data
       -            while True:
       -                cmd, message = parse_json(message)
       -                if not cmd:
       -                    break
       -                self.process(cmd)
       -
       -        self.server.remove_client(self)
        
       +            if request.get('method') == 'daemon.stop':
       +                self.server.stop()
       +                continue
        
       +            self.daemon_pipe.send(request)
        
       -
       -    def process(self, request):
       -        if self.debug: 
       -            print_error("<--", request)
       -        method = request['method']
       -        params = request['params']
       -        _id = request['id']
       -
       -        if method == ('daemon.stop'):
       -            self.server.stop()
       -            return
       -
       -        if method.startswith('network.'):
       -            out = {'id':_id}
       +    def run(self):
       +        self.running = True
       +        threading.Thread(target=self.reading_thread).start()
       +        while self.running:
                    try:
       -                f = getattr(self.network, method[8:])
       -            except AttributeError:
       -                out['error'] = "unknown method"
       +                response = self.daemon_pipe.get()
       +            except util.timeout:
       +                continue
                    try:
       -                out['result'] = f(*params)
       -            except BaseException as e:
       -                out['error'] = str(e)
       -                print_error("network error", str(e))
       -
       -            self.queue.put(out)
       -            return
       -
       -        def cb(i,r):
       -            _id = r.get('id')
       -            if _id is not None:
       -                my_id = self.unanswered_requests.pop(_id)
       -                r['id'] = my_id
       -            self.queue.put(r)
       -
       -        try:
       -            new_id = self.network.interface.send([(method, params)], cb) [0]
       -        except Exception as e:
       -            self.queue.put({'id':_id, 'error':str(e)}) 
       -            print_error("network interface error", str(e))
       -            return
       -
       -        self.unanswered_requests[new_id] = _id
       +                self.client_pipe.send(response)
       +            except socket.error:
       +                self.running = False
       +                break
       +        self.server.remove_client(self)
        
        
       -    def send_responses(self):
       -        while True:
       -            try:
       -                r = self.queue.get_nowait()
       -            except Queue.Empty:
       -                break
       -            out = json.dumps(r) + '\n'
       -            while out:
       -                n = self.s.send(out)
       -                out = out[n:]
       -            if self.debug: 
       -                print_error("-->", r)
        
        
        
        class NetworkServer:
        
            def __init__(self, config):
       +        self.config = config
                self.network = Network(config)
       -        self.network.trigger_callback = self.trigger_callback
       -        self.network.start()
       -        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       -        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
       -        self.daemon_port = config.get('daemon_port', DAEMON_PORT)
       -        self.socket.bind(('', self.daemon_port))
       -        self.socket.listen(5)
       -        self.socket.settimeout(1)
       +        # network sends responses on that queue
       +        self.network_queue = Queue.Queue()
       +        self.network.start(self.network_queue)
       +
                self.running = False
                # daemon terminates after period of inactivity
                self.timeout = config.get('daemon_timeout', 5*60)
       t@@ -165,16 +97,21 @@ class NetworkServer:
        
                # each GUI is a client of the daemon
                self.clients = []
       -        # daemon needs to know which client subscribed to which address
       +        # todo: the daemon needs to know which client subscribed to which address
       +
       +    def is_running(self):
       +        with self.lock:
       +            return self.running
        
            def stop(self):
       +        self.network.stop()
                with self.lock:
                    self.running = False
        
            def add_client(self, client):
                for key in ['status','banner','updated','servers','interfaces']:
       -            value = self.get_status_value(key)
       -            client.queue.put({'method':'network.status', 'params':[key, value]})
       +            value = self.network.get_status_value(key)
       +            client.daemon_pipe.get_queue.put({'method':'network.status', 'params':[key, value]})
                with self.lock:
                    self.clients.append(client)
        
       t@@ -184,27 +121,28 @@ class NetworkServer:
                    self.clients.remove(client)
                print_error("client quit:", len(self.clients))
        
       -    def get_status_value(self, key):
       -        if key == 'status':
       -            value = self.network.connection_status
       -        elif key == 'banner':
       -            value = self.network.banner
       -        elif key == 'updated':
       -            value = (self.network.get_local_height(), self.network.get_server_height())
       -        elif key == 'servers':
       -            value = self.network.get_servers()
       -        elif key == 'interfaces':
       -            value = self.network.get_interfaces()
       -        return value
       -
       -    def trigger_callback(self, key):
       -        value = self.get_status_value(key)
       -        print_error("daemon trigger callback", key, len(self.clients))
       -        for client in self.clients:
       -            client.queue.put({'method':'network.status', 'params':[key, value]})
       +
        
            def main_loop(self):
                self.running = True
       +        threading.Thread(target=self.listen_thread).start()
       +        while self.is_running():
       +            try:
       +                response = self.network_queue.get(timeout=0.1)
       +            except Queue.Empty:
       +                continue
       +            for client in self.clients:
       +                client.daemon_pipe.get_queue.put(response)
       +
       +        print_error("Daemon exiting")
       +
       +    def listen_thread(self):
       +        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       +        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
       +        self.daemon_port = self.config.get('daemon_port', DAEMON_PORT)
       +        self.socket.bind(('', self.daemon_port))
       +        self.socket.listen(5)
       +        self.socket.settimeout(1)
                t = time.time()
                while self.running:
                    try:
       t@@ -218,11 +156,10 @@ class NetworkServer:
                            t = time.time()
                        continue
                    t = time.time()
       -            client = ClientThread(self, self.network, connection)
       +            client = ClientThread(self, connection)
                    client.start()
       -        print_error("Daemon exiting")
       -
       -
       +        self.stop()
       +        print_error("listen thread exiting")
        
        
        if __name__ == '__main__':
   DIR diff --git a/lib/network.py b/lib/network.py
       t@@ -85,7 +85,6 @@ class Network(threading.Thread):
                self.blockchain = Blockchain(self.config, self)
                self.interfaces = {}
                self.queue = Queue.Queue()
       -        self.callbacks = {}
                self.protocol = self.config.get('protocol','s')
                self.running = False
        
       t@@ -118,6 +117,9 @@ class Network(threading.Thread):
        
                self.connection_status = 'connecting'
        
       +        self.requests_queue = Queue.Queue()
       +        self.unanswered_requests = {}
       +
            def get_server_height(self):
                return self.heights.get(self.default_server,0)
        
       t@@ -162,21 +164,22 @@ class Network(threading.Thread):
                else:
                    return False
        
       -
       -    def register_callback(self, event, callback):
       -        with self.lock:
       -            if not self.callbacks.get(event):
       -                self.callbacks[event] = []
       -            self.callbacks[event].append(callback)
       -
       -
       -    def trigger_callback(self, event):
       -        # note: this method is overwritten by daemon
       -        with self.lock:
       -            callbacks = self.callbacks.get(event,[])[:]
       -        if callbacks:
       -            [callback() for callback in callbacks]
       -
       +    def get_status_value(self, key):
       +        if key == 'status':
       +            value = self.connection_status
       +        elif key == 'banner':
       +            value = self.banner
       +        elif key == 'updated':
       +            value = (self.get_local_height(), self.get_server_height())
       +        elif key == 'servers':
       +            value = self.get_servers()
       +        elif key == 'interfaces':
       +            value = self.get_interfaces()
       +        return value
       +
       +    def trigger_callback(self, key):
       +        value = self.get_status_value(key)
       +        self.response_queue.put({'method':'network.status', 'params':[key, value]})
        
            def random_server(self):
                choice_list = []
       t@@ -234,9 +237,13 @@ class Network(threading.Thread):
                for i in range(self.num_server):
                    self.start_random_interface()
                    
       -    def start(self):
       +    def start(self, response_queue):
       +        self.running = True
       +        self.response_queue = response_queue
                self.start_interfaces()
                threading.Thread.start(self)
       +        threading.Thread(target=self.process_thread).start()
       +        self.blockchain.start()
        
            def set_parameters(self, host, port, protocol, proxy, auto_connect):
                self.config.set_key('auto_cycle', auto_connect, True)
       t@@ -321,16 +328,55 @@ class Network(threading.Thread):
                        print_error( "Server is lagging", blockchain_height, self.get_server_height())
                        if self.config.get('auto_cycle'):
                            self.set_server(i.server)
       -        
                self.trigger_callback('updated')
        
        
       -    def run(self):
       -        self.blockchain.start()
       +    def process_thread(self):
       +        while self.is_running():
       +            try:
       +                request = self.requests_queue.get(timeout=0.1)
       +            except Queue.Empty:
       +                continue
       +            self.process(request)
        
       -        with self.lock:
       -            self.running = True
       +    def process(self, request):
       +        method = request['method']
       +        params = request['params']
       +        _id = request['id']
       +
       +        if method.startswith('network.'):
       +            out = {'id':_id}
       +            try:
       +                f = getattr(self, method[8:])
       +            except AttributeError:
       +                out['error'] = "unknown method"
       +            try:
       +                out['result'] = f(*params)
       +            except BaseException as e:
       +                out['error'] = str(e)
       +                print_error("network error", str(e))
       +
       +            self.response_queue.put(out)
       +            return
       +
       +        def cb(i,r):
       +            _id = r.get('id')
       +            if _id is not None:
       +                my_id = self.unanswered_requests.pop(_id)
       +                r['id'] = my_id
       +            self.response_queue.put(r)
       +
       +        try:
       +            new_id = self.interface.send([(method, params)], cb) [0]
       +        except Exception as e:
       +            self.response_queue.put({'id':_id, 'error':str(e)}) 
       +            print_error("network interface error", str(e))
       +            return
        
       +        self.unanswered_requests[new_id] = _id
       +
       +
       +    def run(self):
                while self.is_running():
                    try:
                        i = self.queue.get(timeout = 30 if self.interfaces else 3)
       t@@ -382,10 +428,8 @@ class Network(threading.Thread):
                    if self.server_is_lagging() and self.config.get('auto_cycle'):
                        print_error( "Server lagging, stopping interface")
                        self.stop_interface()
       -
                    self.trigger_callback('updated')
        
       -
            def on_peers(self, i, r):
                if not r: return
                self.irc_servers = parse_servers(r.get('result'))
       t@@ -396,10 +440,12 @@ class Network(threading.Thread):
                self.trigger_callback('banner')
        
            def stop(self):
       -        with self.lock: self.running = False
       +        with self.lock:
       +            self.running = False
        
            def is_running(self):
       -        with self.lock: return self.running
       +        with self.lock:
       +            return self.running
        
            
            def synchronous_get(self, requests, timeout=100000000):
   DIR diff --git a/lib/network_proxy.py b/lib/network_proxy.py
       t@@ -24,25 +24,23 @@ import threading
        import traceback
        import json
        import Queue
       +
       +import util
        from network import Network
        from util import print_error, print_stderr, parse_json
        from simple_config import SimpleConfig
       -
        from daemon import NetworkServer, DAEMON_PORT
        
        
        
       -
       -
        class NetworkProxy(threading.Thread):
        
            def __init__(self, socket, config=None):
       +
                if config is None:
                    config = {}  # Do not use mutables as default arguments!
                threading.Thread.__init__(self)
                self.config = SimpleConfig(config) if type(config) == type({}) else config
       -        self.socket = socket
       -        self.socket.settimeout(0.1)
                self.message_id = 0
                self.unanswered_requests = {}
                self.subscriptions = {}
       t@@ -53,6 +51,14 @@ class NetworkProxy(threading.Thread):
                self.running = True
                self.daemon = True
        
       +        if socket:
       +            self.pipe = util.SocketPipe(socket)
       +            self.network = None
       +        else:
       +            self.network = Network(config)
       +            self.pipe = util.QueuePipe(send_queue=self.network.requests_queue)
       +            self.network.start(self.pipe.get_queue)
       +
                # status variables
                self.status = 'connecting'
                self.servers = {}
       t@@ -65,35 +71,23 @@ class NetworkProxy(threading.Thread):
                return self.running
        
            def run(self):
       -        # read responses and trigger callbacks
       -        message = ''
                while self.is_running():
                    try:
       -                data = self.socket.recv(1024)
       -            except socket.timeout:
       +                response = self.pipe.get()
       +            except util.timeout:
                        continue
       -            except:
       -                data = ''
       -            if not data:
       +            if response is None:
                        break
       -            message += data
       -            while True:
       -                response, message = parse_json(message)
       -                if response is not None: 
       -                    self.process(response)
       -                else:
       -                    break
       -        # fixme: server does not detect if we don't call shutdown
       -        self.socket.shutdown(2)
       -        self.socket.close()
       +            self.process(response)
       +
                print_error("NetworkProxy thread terminating")
       +        self.stop()
        
            def process(self, response):
                if self.debug: 
                    print_error("<--", response)
        
                if response.get('method') == 'network.status':
       -            #print_error("<--", response)
                    key, value = response.get('params')
                    if key == 'status':
                        self.status = value
       t@@ -109,48 +103,63 @@ class NetworkProxy(threading.Thread):
                    return
        
                msg_id = response.get('id')
       -        with self.lock: 
       -            method, params, callback = self.unanswered_requests.pop(msg_id)
       -
                result = response.get('result')
       -        callback(None, {'method':method, 'params':params, 'result':result, 'id':msg_id})
       -
       +        if msg_id is not None:
       +            with self.lock:
       +                method, params, callback = self.unanswered_requests.pop(msg_id)
       +        else:
       +            method = response.get('method')
       +            params = response.get('params')
       +            with self.lock:
       +                for k,v in self.subscriptions.items():
       +                    if (method, params) in v:
       +                        callback = k
       +                        break
       +                else:
       +                    print_error( "received unexpected notification", method, params)
       +                    return
        
       -    def subscribe(self, messages, callback):
       -        # detect if it is a subscription
       -        with self.lock:
       -            if self.subscriptions.get(callback) is None: 
       -                self.subscriptions[callback] = []
       -            for message in messages:
       -                if message not in self.subscriptions[callback]:
       -                    self.subscriptions[callback].append(message)
       +        callback({'method':method, 'params':params, 'result':result, 'id':msg_id})
        
       -        self.send( messages, callback )
        
        
            def send(self, messages, callback):
                """return the ids of the requests that we sent"""
       +
       +        # detect subscriptions
       +        sub = []
       +        for message in messages:
       +            m, v = message
       +            if m[-10:] == '.subscribe':
       +                sub.append(message)
       +        if sub:
       +            with self.lock:
       +                if self.subscriptions.get(callback) is None: 
       +                    self.subscriptions[callback] = []
       +                for message in sub:
       +                    if message not in self.subscriptions[callback]:
       +                        self.subscriptions[callback].append(message)
       +
                with self.lock:
       -            out = ''
       +            requests = []
                    ids = []
                    for m in messages:
                        method, params = m 
                        request = { 'id':self.message_id, 'method':method, 'params':params }
                        self.unanswered_requests[self.message_id] = method, params, callback
                        ids.append(self.message_id)
       +                requests.append(request)
                        if self.debug: 
                            print_error("-->", request)
                        self.message_id += 1
       -                out += json.dumps(request) + '\n'
       -            while out:
       -                sent = self.socket.send( out )
       -                out = out[sent:]
       +
       +            self.pipe.send_all(requests)
                    return ids
        
        
            def synchronous_get(self, requests, timeout=100000000):
                queue = Queue.Queue()
       -        ids = self.send(requests, lambda i,x: queue.put(x))
       +        ids = self.send(requests, queue.put)
                id2 = ids[:]
                res = {}
                while ids:
       t@@ -189,7 +198,7 @@ class NetworkProxy(threading.Thread):
                return self.status == 'connecting'
        
            def is_up_to_date(self):
       -        return self.synchronous_get([('network.is_up_to_date',[])])[0]
       +        return self.unanswered_requests == {}
        
            def get_parameters(self):
                return self.synchronous_get([('network.get_parameters',[])])[0]
       t@@ -199,6 +208,8 @@ class NetworkProxy(threading.Thread):
        
            def stop(self):
                self.running = False
       +        if self.network:
       +            self.network.stop()
        
            def stop_daemon(self):
                return self.send([('daemon.stop',[])], None)
       t@@ -215,4 +226,3 @@ class NetworkProxy(threading.Thread):
                if callbacks:
                    [callback() for callback in callbacks]
        
       -        print_error("trigger_callback", event, len(callbacks))
   DIR diff --git a/lib/synchronizer.py b/lib/synchronizer.py
       t@@ -54,7 +54,7 @@ class WalletSynchronizer(threading.Thread):
                messages = []
                for addr in addresses:
                    messages.append(('blockchain.address.subscribe', [addr]))
       -        self.network.subscribe( messages, lambda i,r: self.queue.put(r))
       +        self.network.send(messages, lambda r: self.queue.put(r))
        
            def run(self):
                with self.lock:
   DIR diff --git a/lib/util.py b/lib/util.py
       t@@ -223,3 +223,86 @@ def parse_json(message):
            except:
                j = None
            return j, message[n+1:]
       +
       +
       +
       +
       +class timeout(Exception):
       +    pass
       +
       +import socket, json
       +
       +class SocketPipe:
       +
       +    def __init__(self, socket):
       +        self.socket = socket
       +        self.message = ''
       +        self.set_timeout(0.1)
       +
       +    def set_timeout(self, t):
       +        self.socket.settimeout(t)
       +
       +    def get(self):
       +        while True:
       +            response, self.message = parse_json(self.message)
       +            if response:
       +                return response
       +            try:
       +                data = self.socket.recv(1024)
       +            except socket.timeout:
       +                raise timeout
       +            except:
       +                data = ''
       +            if not data:
       +                self.socket.close()
       +                return None
       +            self.message += data
       +
       +    def send(self, request):
       +        out = json.dumps(request) + '\n'
       +        while out:
       +            sent = self.socket.send( out )
       +            out = out[sent:]
       +
       +    def send_all(self, requests):
       +        out = ''.join(map(lambda x: json.dumps(x) + '\n', requests))
       +        while out:
       +            sent = self.socket.send( out )
       +            out = out[sent:]
       +
       +
       +import Queue
       +
       +class QueuePipe:
       +
       +    def __init__(self, send_queue=None, get_queue=None):
       +        self.send_queue = send_queue if send_queue else Queue.Queue()
       +        self.get_queue = get_queue if get_queue else Queue.Queue()
       +        self.set_timeout(0.1)
       +
       +    def get(self):
       +        try:
       +            return self.get_queue.get(timeout=self.timeout)
       +        except Queue.Empty:
       +            raise timeout
       +
       +    def get_all(self):
       +        responses = []
       +        while True:
       +            try:
       +                r = self.get_queue.get_nowait()
       +                responses.append(r)
       +            except Queue.Empty:
       +                break
       +        return responses
       +
       +    def set_timeout(self, t):
       +        self.timeout = t
       +
       +    def send(self, request):
       +        self.send_queue.put(request)
       +
       +    def send_all(self, requests):
       +        for request in requests:
       +            self.send(request)
       +
   DIR diff --git a/lib/wallet.py b/lib/wallet.py
       t@@ -791,7 +791,7 @@ class Abstract_Wallet(object):
                self.network.send([('blockchain.transaction.broadcast', [str(tx)])], self.on_broadcast)
                return tx.hash()
        
       -    def on_broadcast(self, i, r):
       +    def on_broadcast(self, r):
                self.tx_result = r.get('result')
                self.tx_event.set()
        
   DIR diff --git a/scripts/block_headers b/scripts/block_headers
       t@@ -4,17 +4,27 @@
        
        import time, electrum
        
       -# 1. start the interface and wait for connection
       -interface = electrum.Interface('ecdsa.net:50002:s')
       -interface.start(wait = True)
       -if not interface.is_connected:
       -    print "not connected"
       -    exit()
       +# start network
       +network = electrum.NetworkProxy(False)
       +network.start()
       +
       +# wait until connected
       +while network.is_connecting():
       +    time.sleep(0.1)
       +
       +if not network.is_connected():
       +    print_msg("daemon is not connected")
       +    sys.exit(1)
        
        # 2. send the subscription
       -callback = lambda _,result: electrum.print_json(result.get('result'))
       -interface.send([('blockchain.headers.subscribe',[])], callback)
       +callback = lambda result: electrum.print_json(result.get('result'))
       +network.send([('blockchain.headers.subscribe',[])], callback)
        
        # 3. wait for results
       -while interface.is_connected:
       -    time.sleep(1)
       +while network.is_connected():
       +    try:
       +        time.sleep(1)
       +    except:
       +        break
       +
       +network.stop()