tinterface: send from same thread and simplify timeouts - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 45fd3ef3436eb5122fac87daba2a9f0a8c38a695 DIR parent 78f5dbb72ebb86420c46844aaad96798c6f5c3e9 HTML Author: ThomasV <thomasv@gitorious> Date: Wed, 6 May 2015 16:42:18 +0200 interface: send from same thread and simplify timeouts Diffstat: M lib/interface.py | 63 +++++++++++++------------------ 1 file changed, 27 insertions(+), 36 deletions(-) --- DIR diff --git a/lib/interface.py b/lib/interface.py t@@ -59,13 +59,11 @@ class TcpInterface(threading.Thread): self.debug = False # dump network messages. can be changed at runtime using the console self.message_id = 0 self.response_queue = response_queue - self.lock = threading.Lock() + self.request_queue = Queue.Queue() self.unanswered_requests = {} # request timeouts - self.request_time = False - # are we waiting for a pong? - self.is_ping = False - self.ping_time = 0 + self.response_time = time.time() + self.ping_time = time.time() # parse server self.server = server self.host, self.port, self.protocol = self.server.split(':') t@@ -84,8 +82,7 @@ class TcpInterface(threading.Thread): result = response.get('result') if msg_id is not None: - with self.lock: - method, params, _id, queue = self.unanswered_requests.pop(msg_id) + method, params, _id, queue = self.unanswered_requests.pop(msg_id) if queue is None: queue = self.response_queue else: t@@ -108,7 +105,6 @@ class TcpInterface(threading.Thread): if method == 'server.version': self.server_version = result - self.is_ping = False return if error: t@@ -252,7 +248,13 @@ class TcpInterface(threading.Thread): return s def send_request(self, request, response_queue = None): - with self.lock: + '''Queue a request. Blocking only if called from other threads.''' + self.request_queue.put((request, response_queue), threading.current_thread() != self) + + def send_requests(self): + '''Sends all queued requests''' + while self.is_connected() and not self.request_queue.empty(): + request, response_queue = self.request_queue.get() method = request.get('method') params = request.get('params') r = {'id': self.message_id, 'method': method, 'params': params} t@@ -278,46 +280,32 @@ class TcpInterface(threading.Thread): self.print_error("stopped") def maybe_ping(self): - # ping the server with server.version? + # ping the server with server.version if time.time() - self.ping_time > 60: - if self.is_ping: - self.print_error("ping timeout") - self.stop() - else: - self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]}) - self.is_ping = True - self.ping_time = time.time() - - def get_and_process_one_response(self): + self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]}) + self.ping_time = time.time() + # stop interface if we have been waiting for more than 10 seconds + if self.unanswered_requests and time.time() - self.response_time > 10: + self.print_error("interface timeout", len(self.unanswered_requests)) + self.stop() + + def get_response(self): if self.is_connected(): try: response = self.pipe.get() + self.response_time = time.time() except util.timeout: - if self.unanswered_requests: - if self.request_time is False: - self.request_time = time.time() - self.print_error("setting timer") - else: - if time.time() - self.request_time > 10: - self.print_error("request timeout", len(self.unanswered_requests)) - self.stop() return - - if self.request_time is not False: - self.print_error("stopping timer") - self.request_time = False - # If remote side closed the socket, SocketPipe closes our socket and returns None if response is None: self.connected = False - else: - self.process_response(response) + return response def run(self): self.s = self.get_socket() if self.s: self.pipe = util.SocketPipe(self.s) - self.s.settimeout(2) + self.s.settimeout(0.1) self.connected = True self.print_error("connected") t@@ -327,7 +315,10 @@ class TcpInterface(threading.Thread): while self.connected: self.maybe_ping() - self.get_and_process_one_response() + self.send_requests() + response = self.get_response() + if response: + self.process_response(response) self.change_status()