URI: 
       tasynchronous processing: use a queue, handle responses in wallet class - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit f60f6c28d3d5b0da360cf076ed5652bcd5613a6b
   DIR parent 39895f41cc72420171de52ea7297b7ffc36dec22
  HTML Author: ThomasV <thomasv@gitorious>
       Date:   Fri, 23 Mar 2012 16:34:34 +0100
       
       asynchronous processing: use a queue, handle responses in wallet class
       
       Diffstat:
         M client/electrum                     |       6 +++---
         M client/gui_qt.py                    |      20 ++++++++++----------
         M client/interface.py                 |     118 ++++++-------------------------
         M client/wallet.py                    |      83 ++++++++++++++++++++++++++++++-
       
       4 files changed, 116 insertions(+), 111 deletions(-)
       ---
   DIR diff --git a/client/electrum b/client/electrum
       t@@ -22,6 +22,7 @@ from optparse import OptionParser
        from wallet import Wallet, SecretToASecret
        from decimal import Decimal
        
       +import thread
        
        from wallet import format_satoshis
        from interface import loop_interfaces_thread, new_interface
       t@@ -62,8 +63,6 @@ if __name__ == '__main__':
                firstarg = args[1] if len(args) > 1 else ''
                
            if cmd == 'gui':
       -        import thread
       -
                if options.gui=='gtk':
                    import gui
                elif options.gui=='qt':
       t@@ -169,7 +168,8 @@ if __name__ == '__main__':
                addresses = wallet.all_addresses()
                version = wallet.electrum_version
                interface.start_session(addresses, version)
       -        interface.update_wallet()
       +        thread.start_new_thread(wallet.run, ())
       +        wallet.update()
                wallet.save()
        
            # check if --from_addr not in wallet (for mktx/payto)
   DIR diff --git a/client/gui_qt.py b/client/gui_qt.py
       t@@ -187,10 +187,10 @@ class ElectrumWindow(QMainWindow):
        
            def update_wallet(self):
                if self.wallet.interface.is_connected:
       -            if self.wallet.interface.blocks == 0:
       +            if self.wallet.blocks == 0:
                        text = "Server not ready"
                        icon = QIcon(":icons/status_disconnected.png")
       -            elif not self.wallet.interface.is_up_to_date:
       +            elif not self.wallet.up_to_date:
                        text = "Synchronizing..."
                        icon = QIcon(":icons/status_waiting.png")
                    else:
       t@@ -208,9 +208,9 @@ class ElectrumWindow(QMainWindow):
                self.statusBar().showMessage(text)
                self.status_button.setIcon( icon )
        
       -        if self.wallet.interface.was_updated and self.wallet.interface.is_up_to_date:
       -            self.wallet.interface.was_updated = False
       -            self.textbox.setText( self.wallet.interface.message )
       +        if self.wallet.was_updated and self.wallet.up_to_date:
       +            self.wallet.was_updated = False
       +            self.textbox.setText( self.wallet.banner )
                    self.update_history_tab()
                    self.update_receive_tab()
                    self.update_contacts_tab()
       t@@ -236,7 +236,7 @@ class ElectrumWindow(QMainWindow):
                tx = self.wallet.tx_history.get(tx_hash)
        
                if tx['height']:
       -            conf = self.wallet.interface.blocks - tx['height'] + 1
       +            conf = self.wallet.blocks - tx['height'] + 1
                    time_str = datetime.datetime.fromtimestamp( tx['nTime']).isoformat(' ')[:-3]
                else:
                    conf = 0
       t@@ -309,7 +309,7 @@ class ElectrumWindow(QMainWindow):
                for tx in self.wallet.get_tx_history():
                    tx_hash = tx['tx_hash']
                    if tx['height']:
       -                conf = self.wallet.interface.blocks - tx['height'] + 1
       +                conf = self.wallet.blocks - tx['height'] + 1
                        time_str = datetime.datetime.fromtimestamp( tx['nTime']).isoformat(' ')[:-3]
                        icon = QIcon(":icons/confirmed.png")
                    else:
       t@@ -832,7 +832,7 @@ class ElectrumWindow(QMainWindow):
                interface = wallet.interface
                if parent:
                    if interface.is_connected:
       -                status = "Connected to %s:%d\n%d blocks\nresponse time: %f"%(interface.host, interface.port, interface.blocks, interface.rtime)
       +                status = "Connected to %s:%d\n%d blocks\nresponse time: %f"%(interface.host, interface.port, wallet.blocks, interface.rtime)
                    else:
                        status = "Not connected"
                    host = wallet.host
       t@@ -926,7 +926,7 @@ class ElectrumGui():
                if not is_recovery:
                    wallet.new_seed(None)
                    # generate first key
       -            wallet.synchronize()
       +            #wallet.synchronize()
                    # run a dialog indicating the seed, ask the user to remember it
                    ElectrumWindow.show_seed_dialog(wallet)
                    #ask for password
       t@@ -935,7 +935,7 @@ class ElectrumGui():
                    # ask for seed and gap.
                    if not ElectrumWindow.seed_dialog( wallet ): return False
                    wallet.init_mpk( wallet.seed )  # not encrypted at this point
       -            wallet.synchronize()
       +            #wallet.synchronize()
        
                    if wallet.is_found():
                        # history and addressbook
   DIR diff --git a/client/interface.py b/client/interface.py
       t@@ -25,19 +25,16 @@ DEFAULT_SERVERS = ['electrum.bitcoins.sk','ecdsa.org','electrum.novit.ro']  # li
        
        
        class Interface:
       -    def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None):
       +    def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None, sync_cb=None):
                self.host = host
                self.port = port
       +        self.sync_callback = sync_cb
                self.address_callback = address_callback
                self.history_callback = history_callback
                self.newblock_callback = newblock_callback
        
                self.servers = [] # actual list from IRC
                self.rtime = 0
       -        self.blocks = 0 
       -        self.message = ''
       -        self.was_updated = True # fixme: use a semaphore
       -        self.is_up_to_date = False
        
                self.is_connected = False
        
       t@@ -45,15 +42,17 @@ class Interface:
                self.addresses_waiting_for_status = []
                self.addresses_waiting_for_history = []
                self.tx_event = threading.Event()
       -        self.up_to_date_event = threading.Event()
       -        self.up_to_date_event.clear()
        
                #json
                self.message_id = 0
                self.messages = {}
       -
                self.responses = Queue.Queue()
        
       +
       +    def is_up_to_date(self):
       +        return not ( self.addresses_waiting_for_status or self.addresses_waiting_for_history )
       +
       +
            def send_tx(self, data):
                self.tx_event.clear()
                self.send([('transaction.broadcast', [data])])
       t@@ -61,9 +60,6 @@ class Interface:
                return self.tx_result
        
        
       -    def start_session(self, addresses, version):
       -        pass
       -
        
            def queue_json_response(self, c):
                #print repr(c)
       t@@ -80,74 +76,16 @@ class Interface:
                    print "received error:", c, method, params
                else:
                    #self.handle_response(method, params, result)
       +            if method == 'address.subscribe':
       +                addr = params[-1]
       +                if addr in self.addresses_waiting_for_status:
       +                    self.addresses_waiting_for_status.remove(addr)
       +            elif method == 'address.get_history':
       +                addr = params[0]
       +                if addr in self.addresses_waiting_for_history:
       +                    self.addresses_waiting_for_history.remove(addr)
                    self.responses.put({'method':method, 'params':params, 'result':result})
       -            #self.is_up_to_date = True
       -        
       -
       -
       -
       -    def handle_response(self, r):
       -        if r is None:
       -            print "empty item"
       -            return
        
       -        method = r['method']
       -        params = r['params']
       -        result = r['result']
       -
       -        if method == 'server.banner':
       -            self.message = result
       -            self.was_updated = True
       -
       -        elif method == 'session.poll':
       -            # native poll
       -            blocks, changed_addresses = result 
       -            if blocks == -1: raise BaseException("session not found")
       -            self.blocks = int(blocks)
       -            if changed_addresses:
       -                self.is_up_to_date = False
       -                self.was_updated = True
       -                for addr, status in changed_addresses.items():
       -                    apply(self.address_callback, (addr, status))
       -            else:
       -                self.is_up_to_date = True
       -
       -        elif method == 'server.peers':
       -            #print "Received server list: ", result
       -            self.servers = map( lambda x:x[1], result )
       -
       -        elif method == 'address.subscribe':
       -            addr = params[-1]
       -            if addr in self.addresses_waiting_for_status:
       -                self.addresses_waiting_for_status.remove(addr)
       -            apply(self.address_callback,(addr, result))
       -                            
       -        elif method == 'address.get_history':
       -            addr = params[0]
       -            if addr in self.addresses_waiting_for_history:
       -                self.addresses_waiting_for_history.remove(addr)
       -            apply(self.history_callback, (addr, result))
       -            self.was_updated = True
       -
       -        elif method == 'transaction.broadcast':
       -            self.tx_result = result
       -            self.tx_event.set()
       -
       -        elif method == 'numblocks.subscribe':
       -            self.blocks = result
       -            if self.newblock_callback: apply(self.newblock_callback,(result,))
       -
       -        elif method == 'client.version':
       -            pass
       -
       -        else:
       -            print "unknown message:", method, params, result
       -
       -        if self.addresses_waiting_for_status or self.addresses_waiting_for_history:
       -            self.is_up_to_date = False
       -        else:
       -            self.is_up_to_date = True
       -            self.up_to_date_event.set()
        
        
            def subscribe(self, addresses):
       t@@ -196,11 +134,6 @@ class PollingInterface(Interface):
            def poll(self):
                self.send([('session.poll', [])])
        
       -    def update_wallet(self):
       -        while True:
       -            self.poll()
       -            if self.is_up_to_date: break
       -
                #if is_new or wallet.remote_url:
                #    self.was_updated = True
                #    is_new = wallet.synchronize()
       t@@ -213,7 +146,7 @@ class PollingInterface(Interface):
            def poll_thread(self, poll_interval):
                while self.is_connected:
                    try:
       -                self.update_wallet()
       +                self.poll()
                        time.sleep(poll_interval)
                    except socket.gaierror:
                        break
       t@@ -281,7 +214,6 @@ class NativeInterface(PollingInterface):
        
                    if cmd == 'new_session':
                        self.session_id, self.message = ast.literal_eval( out )
       -                self.was_updated = True
                    else:
                        self.responses.put({'method':method, 'params':params, 'result':out})
        
       t@@ -379,9 +311,6 @@ class AsynchronousInterface(Interface):
                self.is_connected = False
                self.responses.put(None)
        
       -    def update_wallet(self):
       -        self.up_to_date_event.wait()
       -
            def send(self, messages):
                out = ''
                for m in messages:
       t@@ -416,8 +345,6 @@ def new_interface(wallet):
            else:
                host = random.choice( DEFAULT_SERVERS )         # random choice when the wallet is created
            port = wallet.port
       -    address_cb = wallet.receive_status_callback
       -    history_cb = wallet.receive_history_callback
        
            if port == 50000:
                InterfaceClass = NativeInterface
       t@@ -429,21 +356,20 @@ def new_interface(wallet):
                print "unknown port number: %d. using native protocol."%port
                InterfaceClass = NativeInterface
        
       -    interface = InterfaceClass(host, port, address_cb, history_cb)
       -        
       +    interface = InterfaceClass(host, port)
       +    
            return interface
        
        
        def loop_interfaces_thread(wallet):
       +    
            while True:
       +        interface = wallet.interface
                try:
                    addresses = wallet.all_addresses()
                    version = wallet.electrum_version
       -            wallet.interface.start_session(addresses, version)
       -
       -            while wallet.interface.is_connected:
       -                response = wallet.interface.responses.get()
       -                wallet.interface.handle_response(response)
       +            interface.start_session(addresses, version)
       +            wallet.run()
        
                    print "Disconnected"
                except socket.error:
   DIR diff --git a/client/wallet.py b/client/wallet.py
       t@@ -17,7 +17,7 @@
        # along with this program. If not, see <http://www.gnu.org/licenses/>.
        
        
       -import sys, base64, os, re, hashlib, copy, operator, ast
       +import sys, base64, os, re, hashlib, copy, operator, ast, threading
        
        try:
            import ecdsa  
       t@@ -265,6 +265,12 @@ class Wallet:
                self.imported_keys = {}
                self.remote_url = None
        
       +        self.was_updated = False 
       +        self.blocks = 0 
       +        self.banner = ''
       +        self.up_to_date_event = threading.Event()
       +        self.up_to_date_event.clear()
       +
        
            def set_server(self, host, port):
                if host!= self.host or port!=self.port:
       t@@ -461,6 +467,8 @@ class Wallet:
        
        
            def synchronize(self):
       +        if not self.master_public_key:
       +            return False
        
                is_new = False
                while True:
       t@@ -495,6 +503,7 @@ class Wallet:
        
                return is_new
        
       +
            def get_remote_number(self):
                import jsonrpclib
                server = jsonrpclib.Server(self.remote_url)
       t@@ -708,7 +717,6 @@ class Wallet:
            def receive_history_callback(self, addr, data):
                #print "updating history for", addr
                self.history[addr] = data
       -        self.synchronize()
                self.update_tx_history()
                self.save()
        
       t@@ -920,3 +928,74 @@ class Wallet:
                        address = address + ' <' + payto_address + '>'
        
                return address, amount, label, message, signature, identity, url
       +
       +
       +
       +    def handle_response(self, r):
       +        if r is None:
       +            print "empty item"
       +            return
       +
       +        method = r['method']
       +        params = r['params']
       +        result = r['result']
       +
       +        if method == 'server.banner':
       +            self.banner = result
       +            self.was_updated = True
       +
       +        elif method == 'session.poll':
       +            # native poll
       +            blocks, changed_addresses = result 
       +            if blocks == -1: raise BaseException("session not found")
       +            self.blocks = int(blocks)
       +            if changed_addresses:
       +                self.is_up_to_date = False
       +                self.was_updated = True
       +                for addr, status in changed_addresses.items():
       +                    self.receive_status_callback(addr, status)
       +            else:
       +                self.is_up_to_date = True
       +
       +        elif method == 'server.peers':
       +            #print "Received server list: ", result
       +            self.servers = map( lambda x:x[1], result )
       +
       +        elif method == 'address.subscribe':
       +            addr = params[-1]
       +            self.receive_status_callback(addr, result)
       +                            
       +        elif method == 'address.get_history':
       +            addr = params[0]
       +            self.receive_history_callback(addr, result)
       +            self.was_updated = True
       +
       +        elif method == 'transaction.broadcast':
       +            self.tx_result = result
       +            self.tx_event.set()
       +
       +        elif method == 'numblocks.subscribe':
       +            self.blocks = result
       +            #self.newblock_callback,(result,))
       +
       +        elif method == 'client.version':
       +            pass
       +
       +        else:
       +            print "unknown message:", method, params, result
       +
       +    def update(self):
       +        self.up_to_date_event.wait()
       +
       +    def run(self):
       +        while self.interface.is_connected:
       +            new = self.synchronize()
       +            if self.interface.is_up_to_date() and not new:
       +                self.up_to_date = True
       +                self.up_to_date_event.set()
       +            else:
       +                self.up_to_date = False
       +
       +            response = self.interface.responses.get()
       +            self.handle_response(response)
       +