tasynchronous processing: use a queue, handle responses in wallet class - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit f60f6c28d3d5b0da360cf076ed5652bcd5613a6b DIR parent 39895f41cc72420171de52ea7297b7ffc36dec22 HTML Author: ThomasV <thomasv@gitorious> Date: Fri, 23 Mar 2012 16:34:34 +0100 asynchronous processing: use a queue, handle responses in wallet class Diffstat: M client/electrum | 6 +++--- M client/gui_qt.py | 20 ++++++++++---------- M client/interface.py | 118 ++++++------------------------- M client/wallet.py | 83 ++++++++++++++++++++++++++++++- 4 files changed, 116 insertions(+), 111 deletions(-) --- DIR diff --git a/client/electrum b/client/electrum t@@ -22,6 +22,7 @@ from optparse import OptionParser from wallet import Wallet, SecretToASecret from decimal import Decimal +import thread from wallet import format_satoshis from interface import loop_interfaces_thread, new_interface t@@ -62,8 +63,6 @@ if __name__ == '__main__': firstarg = args[1] if len(args) > 1 else '' if cmd == 'gui': - import thread - if options.gui=='gtk': import gui elif options.gui=='qt': t@@ -169,7 +168,8 @@ if __name__ == '__main__': addresses = wallet.all_addresses() version = wallet.electrum_version interface.start_session(addresses, version) - interface.update_wallet() + thread.start_new_thread(wallet.run, ()) + wallet.update() wallet.save() # check if --from_addr not in wallet (for mktx/payto) DIR diff --git a/client/gui_qt.py b/client/gui_qt.py t@@ -187,10 +187,10 @@ class ElectrumWindow(QMainWindow): def update_wallet(self): if self.wallet.interface.is_connected: - if self.wallet.interface.blocks == 0: + if self.wallet.blocks == 0: text = "Server not ready" icon = QIcon(":icons/status_disconnected.png") - elif not self.wallet.interface.is_up_to_date: + elif not self.wallet.up_to_date: text = "Synchronizing..." icon = QIcon(":icons/status_waiting.png") else: t@@ -208,9 +208,9 @@ class ElectrumWindow(QMainWindow): self.statusBar().showMessage(text) self.status_button.setIcon( icon ) - if self.wallet.interface.was_updated and self.wallet.interface.is_up_to_date: - self.wallet.interface.was_updated = False - self.textbox.setText( self.wallet.interface.message ) + if self.wallet.was_updated and self.wallet.up_to_date: + self.wallet.was_updated = False + self.textbox.setText( self.wallet.banner ) self.update_history_tab() self.update_receive_tab() self.update_contacts_tab() t@@ -236,7 +236,7 @@ class ElectrumWindow(QMainWindow): tx = self.wallet.tx_history.get(tx_hash) if tx['height']: - conf = self.wallet.interface.blocks - tx['height'] + 1 + conf = self.wallet.blocks - tx['height'] + 1 time_str = datetime.datetime.fromtimestamp( tx['nTime']).isoformat(' ')[:-3] else: conf = 0 t@@ -309,7 +309,7 @@ class ElectrumWindow(QMainWindow): for tx in self.wallet.get_tx_history(): tx_hash = tx['tx_hash'] if tx['height']: - conf = self.wallet.interface.blocks - tx['height'] + 1 + conf = self.wallet.blocks - tx['height'] + 1 time_str = datetime.datetime.fromtimestamp( tx['nTime']).isoformat(' ')[:-3] icon = QIcon(":icons/confirmed.png") else: t@@ -832,7 +832,7 @@ class ElectrumWindow(QMainWindow): interface = wallet.interface if parent: if interface.is_connected: - status = "Connected to %s:%d\n%d blocks\nresponse time: %f"%(interface.host, interface.port, interface.blocks, interface.rtime) + status = "Connected to %s:%d\n%d blocks\nresponse time: %f"%(interface.host, interface.port, wallet.blocks, interface.rtime) else: status = "Not connected" host = wallet.host t@@ -926,7 +926,7 @@ class ElectrumGui(): if not is_recovery: wallet.new_seed(None) # generate first key - wallet.synchronize() + #wallet.synchronize() # run a dialog indicating the seed, ask the user to remember it ElectrumWindow.show_seed_dialog(wallet) #ask for password t@@ -935,7 +935,7 @@ class ElectrumGui(): # ask for seed and gap. if not ElectrumWindow.seed_dialog( wallet ): return False wallet.init_mpk( wallet.seed ) # not encrypted at this point - wallet.synchronize() + #wallet.synchronize() if wallet.is_found(): # history and addressbook DIR diff --git a/client/interface.py b/client/interface.py t@@ -25,19 +25,16 @@ DEFAULT_SERVERS = ['electrum.bitcoins.sk','ecdsa.org','electrum.novit.ro'] # li class Interface: - def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None): + def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None, sync_cb=None): self.host = host self.port = port + self.sync_callback = sync_cb self.address_callback = address_callback self.history_callback = history_callback self.newblock_callback = newblock_callback self.servers = [] # actual list from IRC self.rtime = 0 - self.blocks = 0 - self.message = '' - self.was_updated = True # fixme: use a semaphore - self.is_up_to_date = False self.is_connected = False t@@ -45,15 +42,17 @@ class Interface: self.addresses_waiting_for_status = [] self.addresses_waiting_for_history = [] self.tx_event = threading.Event() - self.up_to_date_event = threading.Event() - self.up_to_date_event.clear() #json self.message_id = 0 self.messages = {} - self.responses = Queue.Queue() + + def is_up_to_date(self): + return not ( self.addresses_waiting_for_status or self.addresses_waiting_for_history ) + + def send_tx(self, data): self.tx_event.clear() self.send([('transaction.broadcast', [data])]) t@@ -61,9 +60,6 @@ class Interface: return self.tx_result - def start_session(self, addresses, version): - pass - def queue_json_response(self, c): #print repr(c) t@@ -80,74 +76,16 @@ class Interface: print "received error:", c, method, params else: #self.handle_response(method, params, result) + if method == 'address.subscribe': + addr = params[-1] + if addr in self.addresses_waiting_for_status: + self.addresses_waiting_for_status.remove(addr) + elif method == 'address.get_history': + addr = params[0] + if addr in self.addresses_waiting_for_history: + self.addresses_waiting_for_history.remove(addr) self.responses.put({'method':method, 'params':params, 'result':result}) - #self.is_up_to_date = True - - - - - def handle_response(self, r): - if r is None: - print "empty item" - return - method = r['method'] - params = r['params'] - result = r['result'] - - if method == 'server.banner': - self.message = result - self.was_updated = True - - elif method == 'session.poll': - # native poll - blocks, changed_addresses = result - if blocks == -1: raise BaseException("session not found") - self.blocks = int(blocks) - if changed_addresses: - self.is_up_to_date = False - self.was_updated = True - for addr, status in changed_addresses.items(): - apply(self.address_callback, (addr, status)) - else: - self.is_up_to_date = True - - elif method == 'server.peers': - #print "Received server list: ", result - self.servers = map( lambda x:x[1], result ) - - elif method == 'address.subscribe': - addr = params[-1] - if addr in self.addresses_waiting_for_status: - self.addresses_waiting_for_status.remove(addr) - apply(self.address_callback,(addr, result)) - - elif method == 'address.get_history': - addr = params[0] - if addr in self.addresses_waiting_for_history: - self.addresses_waiting_for_history.remove(addr) - apply(self.history_callback, (addr, result)) - self.was_updated = True - - elif method == 'transaction.broadcast': - self.tx_result = result - self.tx_event.set() - - elif method == 'numblocks.subscribe': - self.blocks = result - if self.newblock_callback: apply(self.newblock_callback,(result,)) - - elif method == 'client.version': - pass - - else: - print "unknown message:", method, params, result - - if self.addresses_waiting_for_status or self.addresses_waiting_for_history: - self.is_up_to_date = False - else: - self.is_up_to_date = True - self.up_to_date_event.set() def subscribe(self, addresses): t@@ -196,11 +134,6 @@ class PollingInterface(Interface): def poll(self): self.send([('session.poll', [])]) - def update_wallet(self): - while True: - self.poll() - if self.is_up_to_date: break - #if is_new or wallet.remote_url: # self.was_updated = True # is_new = wallet.synchronize() t@@ -213,7 +146,7 @@ class PollingInterface(Interface): def poll_thread(self, poll_interval): while self.is_connected: try: - self.update_wallet() + self.poll() time.sleep(poll_interval) except socket.gaierror: break t@@ -281,7 +214,6 @@ class NativeInterface(PollingInterface): if cmd == 'new_session': self.session_id, self.message = ast.literal_eval( out ) - self.was_updated = True else: self.responses.put({'method':method, 'params':params, 'result':out}) t@@ -379,9 +311,6 @@ class AsynchronousInterface(Interface): self.is_connected = False self.responses.put(None) - def update_wallet(self): - self.up_to_date_event.wait() - def send(self, messages): out = '' for m in messages: t@@ -416,8 +345,6 @@ def new_interface(wallet): else: host = random.choice( DEFAULT_SERVERS ) # random choice when the wallet is created port = wallet.port - address_cb = wallet.receive_status_callback - history_cb = wallet.receive_history_callback if port == 50000: InterfaceClass = NativeInterface t@@ -429,21 +356,20 @@ def new_interface(wallet): print "unknown port number: %d. using native protocol."%port InterfaceClass = NativeInterface - interface = InterfaceClass(host, port, address_cb, history_cb) - + interface = InterfaceClass(host, port) + return interface def loop_interfaces_thread(wallet): + while True: + interface = wallet.interface try: addresses = wallet.all_addresses() version = wallet.electrum_version - wallet.interface.start_session(addresses, version) - - while wallet.interface.is_connected: - response = wallet.interface.responses.get() - wallet.interface.handle_response(response) + interface.start_session(addresses, version) + wallet.run() print "Disconnected" except socket.error: DIR diff --git a/client/wallet.py b/client/wallet.py t@@ -17,7 +17,7 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. -import sys, base64, os, re, hashlib, copy, operator, ast +import sys, base64, os, re, hashlib, copy, operator, ast, threading try: import ecdsa t@@ -265,6 +265,12 @@ class Wallet: self.imported_keys = {} self.remote_url = None + self.was_updated = False + self.blocks = 0 + self.banner = '' + self.up_to_date_event = threading.Event() + self.up_to_date_event.clear() + def set_server(self, host, port): if host!= self.host or port!=self.port: t@@ -461,6 +467,8 @@ class Wallet: def synchronize(self): + if not self.master_public_key: + return False is_new = False while True: t@@ -495,6 +503,7 @@ class Wallet: return is_new + def get_remote_number(self): import jsonrpclib server = jsonrpclib.Server(self.remote_url) t@@ -708,7 +717,6 @@ class Wallet: def receive_history_callback(self, addr, data): #print "updating history for", addr self.history[addr] = data - self.synchronize() self.update_tx_history() self.save() t@@ -920,3 +928,74 @@ class Wallet: address = address + ' <' + payto_address + '>' return address, amount, label, message, signature, identity, url + + + + def handle_response(self, r): + if r is None: + print "empty item" + return + + method = r['method'] + params = r['params'] + result = r['result'] + + if method == 'server.banner': + self.banner = result + self.was_updated = True + + elif method == 'session.poll': + # native poll + blocks, changed_addresses = result + if blocks == -1: raise BaseException("session not found") + self.blocks = int(blocks) + if changed_addresses: + self.is_up_to_date = False + self.was_updated = True + for addr, status in changed_addresses.items(): + self.receive_status_callback(addr, status) + else: + self.is_up_to_date = True + + elif method == 'server.peers': + #print "Received server list: ", result + self.servers = map( lambda x:x[1], result ) + + elif method == 'address.subscribe': + addr = params[-1] + self.receive_status_callback(addr, result) + + elif method == 'address.get_history': + addr = params[0] + self.receive_history_callback(addr, result) + self.was_updated = True + + elif method == 'transaction.broadcast': + self.tx_result = result + self.tx_event.set() + + elif method == 'numblocks.subscribe': + self.blocks = result + #self.newblock_callback,(result,)) + + elif method == 'client.version': + pass + + else: + print "unknown message:", method, params, result + + def update(self): + self.up_to_date_event.wait() + + def run(self): + while self.interface.is_connected: + new = self.synchronize() + if self.interface.is_up_to_date() and not new: + self.up_to_date = True + self.up_to_date_event.set() + else: + self.up_to_date = False + + response = self.interface.responses.get() + self.handle_response(response) +