URI: 
       twallet synchronizer thread - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit c3bbc35fa865fcf4fc18bdd1329b59a1ad4eba7b
   DIR parent 96eaf00af855fe59b81bdd5fcce1e818ad31dc10
  HTML Author: ThomasV <thomasv@gitorious>
       Date:   Sat, 31 Mar 2012 11:47:16 +0200
       
       wallet synchronizer thread
       
       Diffstat:
         M client/electrum                     |      14 ++++----------
         M client/interface.py                 |     206 +++++++++++++++++++++++++------
         M client/wallet.py                    |     136 ++++---------------------------
       
       3 files changed, 189 insertions(+), 167 deletions(-)
       ---
   DIR diff --git a/client/electrum b/client/electrum
       t@@ -20,12 +20,9 @@ import re, sys, getpass
        
        from optparse import OptionParser
        from wallet import Wallet, SecretToASecret
       +from interface import WalletSynchronizer
        from decimal import Decimal
       -
       -import thread
       -
        from wallet import format_satoshis
       -from interface import loop_interfaces_thread
        
        known_commands = ['help', 'validateaddress', 'balance', 'contacts', 'create', 'restore', 'payto', 'sendtx', 'password', 'addresses', 'history', 'label', 'mktx','seed','import','signmessage','verifymessage','eval']
        offline_commands = ['password', 'mktx', 'history', 'label', 'contacts', 'help', 'validateaddress', 'signmessage', 'verifymessage', 'eval', 'create', 'addresses', 'import', 'seed']
       t@@ -71,7 +68,7 @@ if __name__ == '__main__':
                    exit(1)
        
                gui = gui.ElectrumGui(wallet)
       -        thread.start_new_thread(loop_interfaces_thread, (wallet,))
       +        WalletSynchronizer(wallet,True).start()
        
                try:
                    found = wallet.file_exists
       t@@ -138,11 +135,9 @@ if __name__ == '__main__':
                        sys.exit(1)
        
                    wallet.seed = str(seed)
       -            wallet.start_interface()
       +            WalletSynchronizer(wallet).start()
                    print "recovering wallet..."
                    wallet.init_mpk( wallet.seed )
       -            wallet.start_interface()
       -            thread.start_new_thread(wallet.run, ())
                    wallet.update()
                    if wallet.is_found():
                        wallet.fill_addressbook()
       t@@ -175,8 +170,7 @@ if __name__ == '__main__':
        
            # open session
            if cmd not in offline_commands:
       -        wallet.start_interface()
       -        thread.start_new_thread(wallet.run, ())
       +        WalletSynchronizer(wallet).start()
                wallet.update()
                wallet.save()
        
   DIR diff --git a/client/interface.py b/client/interface.py
       t@@ -17,8 +17,8 @@
        # along with this program. If not, see <http://www.gnu.org/licenses/>.
        
        
       -import random, socket, ast
       -import thread, threading, traceback, sys, time, json, Queue
       +import random, socket, ast, re
       +import threading, traceback, sys, time, json, Queue
        
        DEFAULT_TIMEOUT = 5
        DEFAULT_SERVERS = ['ecdsa.org:50001:t'] #  ['electrum.bitcoins.sk','ecdsa.org','electrum.novit.ro']  # list of default servers
       t@@ -33,8 +33,10 @@ def old_to_new(s):
            return s
        
        
       -class Interface:
       +class Interface(threading.Thread):
            def __init__(self, host, port):
       +        threading.Thread.__init__(self)
       +        self.daemon = True
                self.host = host
                self.port = port
        
       t@@ -98,7 +100,6 @@ class Interface:
        
            def start_session(self, addresses, version):
                #print "Starting new session: %s:%d"%(self.host,self.port)
       -        self.start()
                self.send([('server.version', [version]), ('server.banner',[]), ('blockchain.numblocks.subscribe',[]), ('server.peers.subscribe',[])])
                self.subscribe(addresses)
        
       t@@ -106,13 +107,15 @@ class Interface:
        class PollingInterface(Interface):
            """ non-persistent connection. synchronous calls"""
        
       +    def __init__(self, host, port):
       +        Interface.__init__(self, host, port)
       +        self.session_id = None
        
            def get_history(self, address):
                self.send([('blockchain.address.get_history', [address] )])
        
            def poll(self):
       -        self.send([('session.poll', [])])
       -
       +        pass
                #if is_new or wallet.remote_url:
                #    self.was_updated = True
                #    is_new = wallet.synchronize()
       t@@ -122,10 +125,12 @@ class PollingInterface(Interface):
                #else:
                #    return False
        
       -    def poll_thread(self):
       +    def run(self):
       +        self.is_connected = True
                while self.is_connected:
                    try:
       -                self.poll()
       +                if self.session_id:
       +                    self.poll()
                        time.sleep(self.poll_interval)
                    except socket.gaierror:
                        break
       t@@ -136,7 +141,7 @@ class PollingInterface(Interface):
                        break
                    
                self.is_connected = False
       -        self.responses.put(None)
       +        self.poke()
        
                        
        
       t@@ -148,7 +153,9 @@ class NativeInterface(PollingInterface):
            def start_session(self, addresses, version):
                self.send([('session.new', [ version, addresses ])] )
                self.send([('server.peers.subscribe',[])])
       -        thread.start_new_thread(self.poll_thread, ())
       +
       +    def poll(self):
       +        self.send([('session.poll', [])])
        
            def send(self, messages):
                import time
       t@@ -211,13 +218,8 @@ class NativeInterface(PollingInterface):
        
        class HttpInterface(PollingInterface):
        
       -    def start(self):
       -        self.session_id = None
       -        thread.start_new_thread(self.poll_thread, ())
       -
            def poll(self):
       -        if self.session_id:
       -            self.send( [] )
       +        self.send([])
        
            def send(self, messages):
                import urllib2, json, time, cookielib
       t@@ -278,13 +280,25 @@ class HttpInterface(PollingInterface):
        class AsynchronousInterface(Interface):
            """json-rpc over persistent TCP connection, asynchronous"""
        
       -    def listen_thread(self):
       +    def __init__(self, host, port):
       +        Interface.__init__(self, host, port)
       +        self.s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
       +        self.s.settimeout(5)
       +        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                try:
       +            self.s.connect(( self.host, self.port))
                    self.is_connected = True
       +        except:
       +            self.is_connected = False
       +            print "not connected"
       +
       +    def run(self):
       +        try:
                    out = ''
                    while self.is_connected:
                        try: msg = self.s.recv(1024)
       -                except socket.timeout: continue
       +                except socket.timeout: 
       +                    continue
                        out += msg
                        if msg == '': 
                            self.is_connected = False
       t@@ -316,31 +330,149 @@ class AsynchronousInterface(Interface):
            def get_history(self, addr):
                self.send([('blockchain.address.get_history', [addr])])
        
       -    def start(self):
       -        self.s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
       -        self.s.settimeout(5)
       -        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
       -        self.s.connect(( self.host, self.port))
       -        thread.start_new_thread(self.listen_thread, ())
        
        
        
        
       +class WalletSynchronizer(threading.Thread):
       +
       +    def __init__(self, wallet, loop=False):
       +        threading.Thread.__init__(self)
       +        self.daemon = True
       +        self.wallet = wallet
       +        self.loop = loop
       +        self.start_interface()
        
       -    
        
       -def loop_interfaces_thread(wallet):
       -    while True:
       +    def handle_response(self, r):
       +        if r is None:
       +            return
       +
       +        method = r['method']
       +        params = r['params']
       +        result = r['result']
       +
       +        if method == 'server.banner':
       +            self.wallet.banner = result
       +            self.wallet.was_updated = True
       +
       +        elif method == 'session.poll':
       +            # native poll
       +            blocks, changed_addresses = result 
       +            if blocks == -1: raise BaseException("session not found")
       +            self.wallet.blocks = int(blocks)
       +            if changed_addresses:
       +                self.wallet.was_updated = True
       +                for addr, status in changed_addresses.items():
       +                    self.wallet.receive_status_callback(addr, status)
       +
       +        elif method == 'server.peers.subscribe':
       +            servers = []
       +            for item in result:
       +                s = []
       +                host = item[1]
       +                if len(item)>2:
       +                    for v in item[2]:
       +                        if re.match("[thn]\d+",v):
       +                            s.append(host+":"+v[1:]+":"+v[0])
       +                    #if not s:
       +                    #    s.append(host+":50000:n")
       +                #else:
       +                #    s.append(host+":50000:n")
       +                servers = servers + s
       +            self.interface.servers = servers
       +
       +        elif method == 'blockchain.address.subscribe':
       +            addr = params[-1]
       +            self.wallet.receive_status_callback(addr, result)
       +                            
       +        elif method == 'blockchain.address.get_history':
       +            addr = params[0]
       +            self.wallet.receive_history_callback(addr, result)
       +            self.wallet.was_updated = True
       +
       +        elif method == 'blockchain.transaction.broadcast':
       +            self.wallet.tx_result = result
       +            self.wallet.tx_event.set()
       +
       +        elif method == 'blockchain.numblocks.subscribe':
       +            self.wallet.blocks = result
       +
       +        elif method == 'server.version':
       +            pass
       +
       +        else:
       +            print "unknown message:", method, params, result
       +
       +
       +    def start_interface(self):
                try:
       -            wallet.start_interface()
       -            wallet.run()
       -        except socket.error:
       -            print "socket error"
       -            wallet.interface.is_connected = False
       -            time.sleep(5)
       +            host, port, protocol = self.wallet.server.split(':')
       +            port = int(port)
                except:
       -            traceback.print_exc(file=sys.stdout)
       -            wallet.interface.is_connected = False
       -            time.sleep(5)
       -            continue
       +            self.wallet.pick_random_server()
       +            host, port, protocol = self.wallet.server.split(':')
       +            port = int(port)
       +
       +        #print protocol, host, port
       +        if protocol == 'n':
       +            InterfaceClass = NativeInterface
       +        elif protocol == 't':
       +            InterfaceClass = AsynchronousInterface
       +        elif protocol == 'h':
       +            InterfaceClass = HttpInterface
       +        else:
       +            print "unknown protocol"
       +            InterfaceClass = NativeInterface
       +
       +        self.interface = InterfaceClass(host, port)
       +        self.wallet.interface = self.interface
       +
       +        with self.wallet.lock:
       +            self.wallet.addresses_waiting_for_status = []
       +            self.wallet.addresses_waiting_for_history = []
       +            addresses = self.wallet.all_addresses()
       +            version = self.wallet.electrum_version
       +            for addr in addresses:
       +                self.wallet.addresses_waiting_for_status.append(addr)
       +
       +        try:
       +            self.interface.start()
       +            self.interface.start_session(addresses,version)
       +        except:
       +            self.interface.is_connected = False
       +
       +
       +    def run(self):
       +        import socket, time
       +        while True:
       +            try:
       +                while self.interface.is_connected:
       +                    new_addresses = self.wallet.synchronize()
       +                    if new_addresses:
       +                        self.interface.subscribe(new_addresses)
       +                        for addr in new_addresses:
       +                            with self.wallet.lock:
       +                                self.wallet.addresses_waiting_for_status.append(addr)
       +
       +                    if self.wallet.is_up_to_date():
       +                        self.wallet.up_to_date = True
       +                        self.wallet.up_to_date_event.set()
       +                    else:
       +                        self.wallet.up_to_date = False
       +
       +                    response = self.interface.responses.get(True,100000000000) # workaround so that it can be keyboard interrupted
       +                    self.handle_response(response)
       +            except socket.error:
       +                print "socket error"
       +                wallet.interface.is_connected = False
       +
       +            if self.loop:
       +                time.sleep(5)
       +                self.start_interface()
       +                continue
       +            else:
       +                break
       +
       +
        
   DIR diff --git a/client/wallet.py b/client/wallet.py
       t@@ -271,7 +271,7 @@ class Wallet:
                self.up_to_date_event = threading.Event()
                self.up_to_date_event.clear()
                self.up_to_date = False
       -        self.interface_lock = threading.Lock()
       +        self.lock = threading.Lock()
                self.tx_event = threading.Event()
        
                #
       t@@ -571,7 +571,7 @@ class Wallet:
                    self.fee = int( d.get('fee') )
                    self.seed = d.get('seed')
                    self.server = d.get('server')
       -            blocks = d.get('blocks')
       +            #blocks = d.get('blocks')
                    self.addresses = d.get('addresses')
                    self.change_addresses = d.get('change_addresses')
                    self.history = d.get('history')
       t@@ -703,18 +703,21 @@ class Wallet:
                return status
        
            def receive_status_callback(self, addr, status):
       -        if self.get_status(addr) != status:
       -            #print "updating status for", addr, status
       -            self.addresses_waiting_for_history.append(addr)
       -            self.interface.get_history(addr)
       -        if addr in self.addresses_waiting_for_status: self.addresses_waiting_for_status.remove(addr)
       -
       -    def receive_history_callback(self, addr, data):
       +        with self.lock:
       +            if self.get_status(addr) != status:
       +                #print "updating status for", addr, status
       +                self.addresses_waiting_for_history.append(addr)
       +                self.interface.get_history(addr)
       +            if addr in self.addresses_waiting_for_status: 
       +                self.addresses_waiting_for_status.remove(addr)
       +
       +    def receive_history_callback(self, addr, data): 
                #print "updating history for", addr
       -        self.history[addr] = data
       -        self.update_tx_history()
       -        self.save()
       -        if addr in self.addresses_waiting_for_history: self.addresses_waiting_for_history.remove(addr)
       +        with self.lock:
       +            self.history[addr] = data
       +            self.update_tx_history()
       +            self.save()
       +            if addr in self.addresses_waiting_for_history: self.addresses_waiting_for_history.remove(addr)
        
            def get_tx_history(self):
                lines = self.tx_history.values()
       t@@ -929,116 +932,9 @@ class Wallet:
                return address, amount, label, message, signature, identity, url
        
        
       -
       -    def handle_response(self, r):
       -        if r is None:
       -            return
       -
       -        method = r['method']
       -        params = r['params']
       -        result = r['result']
       -
       -        if method == 'server.banner':
       -            self.banner = result
       -            self.was_updated = True
       -
       -        elif method == 'session.poll':
       -            # native poll
       -            blocks, changed_addresses = result 
       -            if blocks == -1: raise BaseException("session not found")
       -            self.blocks = int(blocks)
       -            if changed_addresses:
       -                self.was_updated = True
       -                for addr, status in changed_addresses.items():
       -                    self.receive_status_callback(addr, status)
       -
       -        elif method == 'server.peers.subscribe':
       -            servers = []
       -            for item in result:
       -                s = []
       -                host = item[1]
       -                if len(item)>2:
       -                    for v in item[2]:
       -                        if re.match("[thn]\d+",v):
       -                            s.append(host+":"+v[1:]+":"+v[0])
       -                    #if not s:
       -                    #    s.append(host+":50000:n")
       -                #else:
       -                #    s.append(host+":50000:n")
       -                servers = servers + s
       -            self.interface.servers = servers
       -
       -        elif method == 'blockchain.address.subscribe':
       -            addr = params[-1]
       -            self.receive_status_callback(addr, result)
       -                            
       -        elif method == 'blockchain.address.get_history':
       -            addr = params[0]
       -            self.receive_history_callback(addr, result)
       -            self.was_updated = True
       -
       -        elif method == 'blockchain.transaction.broadcast':
       -            self.tx_result = result
       -            self.tx_event.set()
       -
       -        elif method == 'blockchain.numblocks.subscribe':
       -            self.blocks = result
       -
       -        elif method == 'server.version':
       -            pass
       -
       -        else:
       -            print "unknown message:", method, params, result
       -
       -
            def update(self):
                self.interface.poke()
                self.up_to_date_event.wait()
        
        
       -    def run(self):
       -        while self.interface.is_connected:
       -            new_addresses = self.synchronize()
       -            if new_addresses:
       -                self.interface.subscribe(new_addresses)
       -                for addr in new_addresses:
       -                    self.addresses_waiting_for_status.append(addr)
        
       -            if self.is_up_to_date():
       -                self.up_to_date = True
       -                self.up_to_date_event.set()
       -            else:
       -                self.up_to_date = False
       -
       -            response = self.interface.responses.get(True,100000000000) # workaround so that it can be keyboard interrupted
       -            self.handle_response(response)
       -
       -
       -    def start_interface(self):
       -
       -        try:
       -            host, port, protocol = self.server.split(':')
       -            port = int(port)
       -        except:
       -            self.pick_random_server()
       -            host, port, protocol = self.server.split(':')
       -            port = int(port)
       -
       -        if protocol == 'n':
       -            InterfaceClass = NativeInterface
       -        elif protocol == 't':
       -            InterfaceClass = AsynchronousInterface
       -        elif protocol == 'h':
       -            InterfaceClass = HttpInterface
       -        else:
       -            print "unknown protocol"
       -            InterfaceClass = NativeInterface
       -
       -        self.interface = InterfaceClass(host, port)
       -        addresses = self.all_addresses()
       -        version = self.electrum_version
       -        for addr in addresses:
       -            self.addresses_waiting_for_status.append(addr)
       -        self.interface.start_session(addresses,version)
       -
       -