tredo inter-thread communication using pipes - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 035ecbc7cd41036bf96dd8a9b6b57d09b7c4eabc DIR parent bd3bfb5e535b59f2ad2f0450f4e5a2ce600f2a7b HTML Author: ThomasV <thomasv@gitorious> Date: Sun, 27 Jul 2014 11:33:02 +0200 redo inter-thread communication using pipes Diffstat: M electrum | 11 ++++------- M lib/daemon.py | 187 ++++++++++--------------------- M lib/network.py | 98 +++++++++++++++++++++++-------- M lib/network_proxy.py | 100 +++++++++++++++++-------------- M lib/synchronizer.py | 2 +- M lib/util.py | 83 +++++++++++++++++++++++++++++++ M lib/wallet.py | 2 +- M scripts/block_headers | 30 ++++++++++++++++++++---------- 8 files changed, 298 insertions(+), 215 deletions(-) --- DIR diff --git a/electrum b/electrum t@@ -43,6 +43,7 @@ if is_local: sys.path.append('packages') +from electrum import util from electrum import SimpleConfig, Network, Wallet, WalletStorage, NetworkProxy, Commands, known_commands, pick_random_server from electrum.util import print_msg, print_stderr, print_json, set_verbosity t@@ -150,7 +151,7 @@ def do_start_daemon(): import subprocess logfile = open(os.path.join(config.path, 'daemon.log'),'w') p = subprocess.Popen([__file__,"daemon"], stderr=logfile, stdout=logfile, close_fds=True) - print "starting daemon (PID %d)"%p.pid + print_stderr("starting daemon (PID %d)"%p.pid) def daemon_socket(start_daemon=True): t@@ -222,12 +223,8 @@ if __name__ == '__main__': # network interface if not options.offline: s = daemon_socket(start_daemon=options.daemon) - if s: - network = NetworkProxy(s, config) - network.start() - else: - network = Network(config) - network.start() + network = NetworkProxy(s, config) + network.start() else: network = None DIR diff --git a/lib/daemon.py b/lib/daemon.py t@@ -24,140 +24,72 @@ import threading import traceback import json import Queue + +import util from network import Network from util import print_error, print_stderr, parse_json from simple_config import SimpleConfig -""" -The Network object is not aware of clients/subscribers -It only does subscribe/unsubscribe to addresses -Which client has wich address is managed by the daemon -Network also reports status changes -""" DAEMON_PORT=8001 - - class ClientThread(threading.Thread): - # read messages from client (socket), and sends them to Network - # responses are sent back on the same socket - def __init__(self, server, network, s): + def __init__(self, server, s): threading.Thread.__init__(self) self.server = server self.daemon = True - self.s = s - self.s.settimeout(0.1) - self.network = network - self.queue = Queue.Queue() - self.unanswered_requests = {} - self.debug = False + self.client_pipe = util.SocketPipe(s) + self.daemon_pipe = util.QueuePipe(send_queue = self.server.network.requests_queue) self.server.add_client(self) - - def run(self): - - message = '' - while True: - try: - self.send_responses() - except socket.error: - break - + def reading_thread(self): + while self.running: try: - data = self.s.recv(1024) - except socket.timeout: + request = self.client_pipe.get() + except util.timeout: continue - except: - data = '' - if not data: + if request is None: + self.running = False break - message += data - while True: - cmd, message = parse_json(message) - if not cmd: - break - self.process(cmd) - - self.server.remove_client(self) + if request.get('method') == 'daemon.stop': + self.server.stop() + continue + self.daemon_pipe.send(request) - - def process(self, request): - if self.debug: - print_error("<--", request) - method = request['method'] - params = request['params'] - _id = request['id'] - - if method == ('daemon.stop'): - self.server.stop() - return - - if method.startswith('network.'): - out = {'id':_id} + def run(self): + self.running = True + threading.Thread(target=self.reading_thread).start() + while self.running: try: - f = getattr(self.network, method[8:]) - except AttributeError: - out['error'] = "unknown method" + response = self.daemon_pipe.get() + except util.timeout: + continue try: - out['result'] = f(*params) - except BaseException as e: - out['error'] = str(e) - print_error("network error", str(e)) - - self.queue.put(out) - return - - def cb(i,r): - _id = r.get('id') - if _id is not None: - my_id = self.unanswered_requests.pop(_id) - r['id'] = my_id - self.queue.put(r) - - try: - new_id = self.network.interface.send([(method, params)], cb) [0] - except Exception as e: - self.queue.put({'id':_id, 'error':str(e)}) - print_error("network interface error", str(e)) - return - - self.unanswered_requests[new_id] = _id + self.client_pipe.send(response) + except socket.error: + self.running = False + break + self.server.remove_client(self) - def send_responses(self): - while True: - try: - r = self.queue.get_nowait() - except Queue.Empty: - break - out = json.dumps(r) + '\n' - while out: - n = self.s.send(out) - out = out[n:] - if self.debug: - print_error("-->", r) class NetworkServer: def __init__(self, config): + self.config = config self.network = Network(config) - self.network.trigger_callback = self.trigger_callback - self.network.start() - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.daemon_port = config.get('daemon_port', DAEMON_PORT) - self.socket.bind(('', self.daemon_port)) - self.socket.listen(5) - self.socket.settimeout(1) + # network sends responses on that queue + self.network_queue = Queue.Queue() + self.network.start(self.network_queue) + self.running = False # daemon terminates after period of inactivity self.timeout = config.get('daemon_timeout', 5*60) t@@ -165,16 +97,21 @@ class NetworkServer: # each GUI is a client of the daemon self.clients = [] - # daemon needs to know which client subscribed to which address + # todo: the daemon needs to know which client subscribed to which address + + def is_running(self): + with self.lock: + return self.running def stop(self): + self.network.stop() with self.lock: self.running = False def add_client(self, client): for key in ['status','banner','updated','servers','interfaces']: - value = self.get_status_value(key) - client.queue.put({'method':'network.status', 'params':[key, value]}) + value = self.network.get_status_value(key) + client.daemon_pipe.get_queue.put({'method':'network.status', 'params':[key, value]}) with self.lock: self.clients.append(client) t@@ -184,27 +121,28 @@ class NetworkServer: self.clients.remove(client) print_error("client quit:", len(self.clients)) - def get_status_value(self, key): - if key == 'status': - value = self.network.connection_status - elif key == 'banner': - value = self.network.banner - elif key == 'updated': - value = (self.network.get_local_height(), self.network.get_server_height()) - elif key == 'servers': - value = self.network.get_servers() - elif key == 'interfaces': - value = self.network.get_interfaces() - return value - - def trigger_callback(self, key): - value = self.get_status_value(key) - print_error("daemon trigger callback", key, len(self.clients)) - for client in self.clients: - client.queue.put({'method':'network.status', 'params':[key, value]}) + def main_loop(self): self.running = True + threading.Thread(target=self.listen_thread).start() + while self.is_running(): + try: + response = self.network_queue.get(timeout=0.1) + except Queue.Empty: + continue + for client in self.clients: + client.daemon_pipe.get_queue.put(response) + + print_error("Daemon exiting") + + def listen_thread(self): + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.daemon_port = self.config.get('daemon_port', DAEMON_PORT) + self.socket.bind(('', self.daemon_port)) + self.socket.listen(5) + self.socket.settimeout(1) t = time.time() while self.running: try: t@@ -218,11 +156,10 @@ class NetworkServer: t = time.time() continue t = time.time() - client = ClientThread(self, self.network, connection) + client = ClientThread(self, connection) client.start() - print_error("Daemon exiting") - - + self.stop() + print_error("listen thread exiting") if __name__ == '__main__': DIR diff --git a/lib/network.py b/lib/network.py t@@ -85,7 +85,6 @@ class Network(threading.Thread): self.blockchain = Blockchain(self.config, self) self.interfaces = {} self.queue = Queue.Queue() - self.callbacks = {} self.protocol = self.config.get('protocol','s') self.running = False t@@ -118,6 +117,9 @@ class Network(threading.Thread): self.connection_status = 'connecting' + self.requests_queue = Queue.Queue() + self.unanswered_requests = {} + def get_server_height(self): return self.heights.get(self.default_server,0) t@@ -162,21 +164,22 @@ class Network(threading.Thread): else: return False - - def register_callback(self, event, callback): - with self.lock: - if not self.callbacks.get(event): - self.callbacks[event] = [] - self.callbacks[event].append(callback) - - - def trigger_callback(self, event): - # note: this method is overwritten by daemon - with self.lock: - callbacks = self.callbacks.get(event,[])[:] - if callbacks: - [callback() for callback in callbacks] - + def get_status_value(self, key): + if key == 'status': + value = self.connection_status + elif key == 'banner': + value = self.banner + elif key == 'updated': + value = (self.get_local_height(), self.get_server_height()) + elif key == 'servers': + value = self.get_servers() + elif key == 'interfaces': + value = self.get_interfaces() + return value + + def trigger_callback(self, key): + value = self.get_status_value(key) + self.response_queue.put({'method':'network.status', 'params':[key, value]}) def random_server(self): choice_list = [] t@@ -234,9 +237,13 @@ class Network(threading.Thread): for i in range(self.num_server): self.start_random_interface() - def start(self): + def start(self, response_queue): + self.running = True + self.response_queue = response_queue self.start_interfaces() threading.Thread.start(self) + threading.Thread(target=self.process_thread).start() + self.blockchain.start() def set_parameters(self, host, port, protocol, proxy, auto_connect): self.config.set_key('auto_cycle', auto_connect, True) t@@ -321,16 +328,55 @@ class Network(threading.Thread): print_error( "Server is lagging", blockchain_height, self.get_server_height()) if self.config.get('auto_cycle'): self.set_server(i.server) - self.trigger_callback('updated') - def run(self): - self.blockchain.start() + def process_thread(self): + while self.is_running(): + try: + request = self.requests_queue.get(timeout=0.1) + except Queue.Empty: + continue + self.process(request) - with self.lock: - self.running = True + def process(self, request): + method = request['method'] + params = request['params'] + _id = request['id'] + + if method.startswith('network.'): + out = {'id':_id} + try: + f = getattr(self, method[8:]) + except AttributeError: + out['error'] = "unknown method" + try: + out['result'] = f(*params) + except BaseException as e: + out['error'] = str(e) + print_error("network error", str(e)) + + self.response_queue.put(out) + return + + def cb(i,r): + _id = r.get('id') + if _id is not None: + my_id = self.unanswered_requests.pop(_id) + r['id'] = my_id + self.response_queue.put(r) + + try: + new_id = self.interface.send([(method, params)], cb) [0] + except Exception as e: + self.response_queue.put({'id':_id, 'error':str(e)}) + print_error("network interface error", str(e)) + return + self.unanswered_requests[new_id] = _id + + + def run(self): while self.is_running(): try: i = self.queue.get(timeout = 30 if self.interfaces else 3) t@@ -382,10 +428,8 @@ class Network(threading.Thread): if self.server_is_lagging() and self.config.get('auto_cycle'): print_error( "Server lagging, stopping interface") self.stop_interface() - self.trigger_callback('updated') - def on_peers(self, i, r): if not r: return self.irc_servers = parse_servers(r.get('result')) t@@ -396,10 +440,12 @@ class Network(threading.Thread): self.trigger_callback('banner') def stop(self): - with self.lock: self.running = False + with self.lock: + self.running = False def is_running(self): - with self.lock: return self.running + with self.lock: + return self.running def synchronous_get(self, requests, timeout=100000000): DIR diff --git a/lib/network_proxy.py b/lib/network_proxy.py t@@ -24,25 +24,23 @@ import threading import traceback import json import Queue + +import util from network import Network from util import print_error, print_stderr, parse_json from simple_config import SimpleConfig - from daemon import NetworkServer, DAEMON_PORT - - class NetworkProxy(threading.Thread): def __init__(self, socket, config=None): + if config is None: config = {} # Do not use mutables as default arguments! threading.Thread.__init__(self) self.config = SimpleConfig(config) if type(config) == type({}) else config - self.socket = socket - self.socket.settimeout(0.1) self.message_id = 0 self.unanswered_requests = {} self.subscriptions = {} t@@ -53,6 +51,14 @@ class NetworkProxy(threading.Thread): self.running = True self.daemon = True + if socket: + self.pipe = util.SocketPipe(socket) + self.network = None + else: + self.network = Network(config) + self.pipe = util.QueuePipe(send_queue=self.network.requests_queue) + self.network.start(self.pipe.get_queue) + # status variables self.status = 'connecting' self.servers = {} t@@ -65,35 +71,23 @@ class NetworkProxy(threading.Thread): return self.running def run(self): - # read responses and trigger callbacks - message = '' while self.is_running(): try: - data = self.socket.recv(1024) - except socket.timeout: + response = self.pipe.get() + except util.timeout: continue - except: - data = '' - if not data: + if response is None: break - message += data - while True: - response, message = parse_json(message) - if response is not None: - self.process(response) - else: - break - # fixme: server does not detect if we don't call shutdown - self.socket.shutdown(2) - self.socket.close() + self.process(response) + print_error("NetworkProxy thread terminating") + self.stop() def process(self, response): if self.debug: print_error("<--", response) if response.get('method') == 'network.status': - #print_error("<--", response) key, value = response.get('params') if key == 'status': self.status = value t@@ -109,48 +103,63 @@ class NetworkProxy(threading.Thread): return msg_id = response.get('id') - with self.lock: - method, params, callback = self.unanswered_requests.pop(msg_id) - result = response.get('result') - callback(None, {'method':method, 'params':params, 'result':result, 'id':msg_id}) - + if msg_id is not None: + with self.lock: + method, params, callback = self.unanswered_requests.pop(msg_id) + else: + method = response.get('method') + params = response.get('params') + with self.lock: + for k,v in self.subscriptions.items(): + if (method, params) in v: + callback = k + break + else: + print_error( "received unexpected notification", method, params) + return - def subscribe(self, messages, callback): - # detect if it is a subscription - with self.lock: - if self.subscriptions.get(callback) is None: - self.subscriptions[callback] = [] - for message in messages: - if message not in self.subscriptions[callback]: - self.subscriptions[callback].append(message) + callback({'method':method, 'params':params, 'result':result, 'id':msg_id}) - self.send( messages, callback ) def send(self, messages, callback): """return the ids of the requests that we sent""" + + # detect subscriptions + sub = [] + for message in messages: + m, v = message + if m[-10:] == '.subscribe': + sub.append(message) + if sub: + with self.lock: + if self.subscriptions.get(callback) is None: + self.subscriptions[callback] = [] + for message in sub: + if message not in self.subscriptions[callback]: + self.subscriptions[callback].append(message) + with self.lock: - out = '' + requests = [] ids = [] for m in messages: method, params = m request = { 'id':self.message_id, 'method':method, 'params':params } self.unanswered_requests[self.message_id] = method, params, callback ids.append(self.message_id) + requests.append(request) if self.debug: print_error("-->", request) self.message_id += 1 - out += json.dumps(request) + '\n' - while out: - sent = self.socket.send( out ) - out = out[sent:] + + self.pipe.send_all(requests) return ids def synchronous_get(self, requests, timeout=100000000): queue = Queue.Queue() - ids = self.send(requests, lambda i,x: queue.put(x)) + ids = self.send(requests, queue.put) id2 = ids[:] res = {} while ids: t@@ -189,7 +198,7 @@ class NetworkProxy(threading.Thread): return self.status == 'connecting' def is_up_to_date(self): - return self.synchronous_get([('network.is_up_to_date',[])])[0] + return self.unanswered_requests == {} def get_parameters(self): return self.synchronous_get([('network.get_parameters',[])])[0] t@@ -199,6 +208,8 @@ class NetworkProxy(threading.Thread): def stop(self): self.running = False + if self.network: + self.network.stop() def stop_daemon(self): return self.send([('daemon.stop',[])], None) t@@ -215,4 +226,3 @@ class NetworkProxy(threading.Thread): if callbacks: [callback() for callback in callbacks] - print_error("trigger_callback", event, len(callbacks)) DIR diff --git a/lib/synchronizer.py b/lib/synchronizer.py t@@ -54,7 +54,7 @@ class WalletSynchronizer(threading.Thread): messages = [] for addr in addresses: messages.append(('blockchain.address.subscribe', [addr])) - self.network.subscribe( messages, lambda i,r: self.queue.put(r)) + self.network.send(messages, lambda r: self.queue.put(r)) def run(self): with self.lock: DIR diff --git a/lib/util.py b/lib/util.py t@@ -223,3 +223,86 @@ def parse_json(message): except: j = None return j, message[n+1:] + + + + +class timeout(Exception): + pass + +import socket, json + +class SocketPipe: + + def __init__(self, socket): + self.socket = socket + self.message = '' + self.set_timeout(0.1) + + def set_timeout(self, t): + self.socket.settimeout(t) + + def get(self): + while True: + response, self.message = parse_json(self.message) + if response: + return response + try: + data = self.socket.recv(1024) + except socket.timeout: + raise timeout + except: + data = '' + if not data: + self.socket.close() + return None + self.message += data + + def send(self, request): + out = json.dumps(request) + '\n' + while out: + sent = self.socket.send( out ) + out = out[sent:] + + def send_all(self, requests): + out = ''.join(map(lambda x: json.dumps(x) + '\n', requests)) + while out: + sent = self.socket.send( out ) + out = out[sent:] + + +import Queue + +class QueuePipe: + + def __init__(self, send_queue=None, get_queue=None): + self.send_queue = send_queue if send_queue else Queue.Queue() + self.get_queue = get_queue if get_queue else Queue.Queue() + self.set_timeout(0.1) + + def get(self): + try: + return self.get_queue.get(timeout=self.timeout) + except Queue.Empty: + raise timeout + + def get_all(self): + responses = [] + while True: + try: + r = self.get_queue.get_nowait() + responses.append(r) + except Queue.Empty: + break + return responses + + def set_timeout(self, t): + self.timeout = t + + def send(self, request): + self.send_queue.put(request) + + def send_all(self, requests): + for request in requests: + self.send(request) + DIR diff --git a/lib/wallet.py b/lib/wallet.py t@@ -791,7 +791,7 @@ class Abstract_Wallet(object): self.network.send([('blockchain.transaction.broadcast', [str(tx)])], self.on_broadcast) return tx.hash() - def on_broadcast(self, i, r): + def on_broadcast(self, r): self.tx_result = r.get('result') self.tx_event.set() DIR diff --git a/scripts/block_headers b/scripts/block_headers t@@ -4,17 +4,27 @@ import time, electrum -# 1. start the interface and wait for connection -interface = electrum.Interface('ecdsa.net:50002:s') -interface.start(wait = True) -if not interface.is_connected: - print "not connected" - exit() +# start network +network = electrum.NetworkProxy(False) +network.start() + +# wait until connected +while network.is_connecting(): + time.sleep(0.1) + +if not network.is_connected(): + print_msg("daemon is not connected") + sys.exit(1) # 2. send the subscription -callback = lambda _,result: electrum.print_json(result.get('result')) -interface.send([('blockchain.headers.subscribe',[])], callback) +callback = lambda result: electrum.print_json(result.get('result')) +network.send([('blockchain.headers.subscribe',[])], callback) # 3. wait for results -while interface.is_connected: - time.sleep(1) +while network.is_connected(): + try: + time.sleep(1) + except: + break + +network.stop()