URI: 
       tbig refactoring of the interface addition of the wallet verifier class for SPV - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 2da0c0b77e109ba48321519870e78e223f291403
   DIR parent 1212982fec1b93332a736447cb6092472af945d2
  HTML Author: ThomasV <thomasv@gitorious>
       Date:   Sun, 21 Oct 2012 02:57:31 +0200
       
       big refactoring of the interface
       addition of the wallet verifier class for SPV
       
       Diffstat:
         M electrum                            |      11 +++++++----
         M lib/__init__.py                     |       5 +++--
         M lib/gui_lite.py                     |       2 +-
         M lib/gui_qt.py                       |       6 +++---
         M lib/interface.py                    |     342 +++++++++++++------------------
         M lib/wallet.py                       |     264 ++++++++++++++++++++++++++++---
         M scripts/blocks                      |       2 +-
         M scripts/peers                       |       4 ++--
         M scripts/servers                     |      20 ++++++++------------
       
       9 files changed, 411 insertions(+), 245 deletions(-)
       ---
   DIR diff --git a/electrum b/electrum
       t@@ -36,9 +36,9 @@ except ImportError:
            sys.exit("Error: AES does not seem to be installed. Try 'sudo pip install slowaes'")
        
        try:
       -    from lib import Wallet, WalletSynchronizer, format_satoshis, mnemonic, SimpleConfig, pick_random_server
       +    from lib import Wallet, Interface, WalletSynchronizer, WalletVerifier, format_satoshis, mnemonic, SimpleConfig, pick_random_server
        except ImportError:
       -    from electrum import Wallet, WalletSynchronizer, format_satoshis, mnemonic, SimpleConfig, pick_random_server
       +    from electrum import Wallet, Interface, WalletSynchronizer, WalletVerifier, format_satoshis, mnemonic, SimpleConfig, pick_random_server
        
        from decimal import Decimal
        
       t@@ -185,8 +185,11 @@ if __name__ == '__main__':
                    sys.exit("Error: Unknown GUI: " + pref_gui )
        
                gui = gui.ElectrumGui(wallet, config)
       -        interface = WalletSynchronizer(wallet, config, True, gui.server_list_changed)
       -        interface.start()
       +        wallet.interface = Interface(config, True, gui.server_list_changed)
       +        wallet.interface.start()
       +
       +        WalletSynchronizer(wallet, config).start()
       +        WalletVerifier(wallet, config).start()
        
                try:
                    found = config.wallet_file_exists
   DIR diff --git a/lib/__init__.py b/lib/__init__.py
       t@@ -1,4 +1,5 @@
       -from wallet import Wallet, format_satoshis
       -from interface import WalletSynchronizer, Interface, pick_random_server, DEFAULT_SERVERS
       +from util import format_satoshis
       +from wallet import Wallet, WalletSynchronizer, WalletVerifier
       +from interface import Interface, pick_random_server, DEFAULT_SERVERS
        from simple_config import SimpleConfig
        import bitcoin
   DIR diff --git a/lib/gui_lite.py b/lib/gui_lite.py
       t@@ -800,7 +800,7 @@ class MiniDriver(QObject):
                self.wallet = wallet
                self.window = window
        
       -        self.wallet.register_callback(self.update_callback)
       +        self.wallet.interface.register_callback(self.update_callback)
        
                self.state = None
        
   DIR diff --git a/lib/gui_qt.py b/lib/gui_qt.py
       t@@ -207,7 +207,7 @@ class ElectrumWindow(QMainWindow):
                QMainWindow.__init__(self)
                self.wallet = wallet
                self.config = config
       -        self.wallet.register_callback(self.update_callback)
       +        self.wallet.interface.register_callback(self.update_callback)
        
                self.detailed_view = config.get('qt_detailed_view', False)
        
       t@@ -1577,7 +1577,7 @@ class ElectrumGui:
                    wallet.init_mpk( wallet.seed )
                    wallet.up_to_date_event.clear()
                    wallet.up_to_date = False
       -            wallet.interface.poke()
       +            wallet.interface.poke('synchronizer')
                    waiting_dialog(waiting)
                    # run a dialog indicating the seed, ask the user to remember it
                    ElectrumWindow.show_seed_dialog(wallet)
       t@@ -1589,7 +1589,7 @@ class ElectrumGui:
                    wallet.init_mpk( wallet.seed )
                    wallet.up_to_date_event.clear()
                    wallet.up_to_date = False
       -            wallet.interface.poke()
       +            wallet.interface.poke('synchronizer')
                    waiting_dialog(waiting)
                    if wallet.is_found():
                        # history and addressbook
   DIR diff --git a/lib/interface.py b/lib/interface.py
       t@@ -28,11 +28,11 @@ DEFAULT_TIMEOUT = 5
        DEFAULT_SERVERS = [ 
            'electrum.novit.ro:50001:t', 
            'electrum.pdmc.net:50001:t',
       -    #'ecdsa.org:50002:s',
       +    'ecdsa.org:50001:t',
            'electrum.bitcoins.sk:50001:t',
            'uncle-enzo.info:50001:t',
            'electrum.bytesized-hosting.com:50001:t',
       -    'california.stratum.bitcoin.cz:50001:t',
       +    'electrum.bitcoin.cz:50001:t',
            'electrum.bitfoo.org:50001:t'
            ]
        
       t@@ -42,24 +42,22 @@ proxy_modes = ['socks4', 'socks5', 'http']
        def pick_random_server():
            return random.choice( DEFAULT_SERVERS )
        
       -def pick_random_interface(config):
       -    servers = DEFAULT_SERVERS
       -    while servers:
       -        server = random.choice( servers )
       -        servers.remove(server)
       -        config.set_key('server', server, False)
       -        i = Interface(config)
       -        if i.is_connected:
       -            return i
       -    raise BaseException('no server available')
        
        
        
       -class InterfaceAncestor(threading.Thread):
       +class Interface(threading.Thread):
        
       -    def __init__(self, host, port, proxy=None, use_ssl=True):
       -        threading.Thread.__init__(self)
       -        self.daemon = True
       +    def register_callback(self, update_callback):
       +        with self.lock:
       +            self.update_callbacks.append(update_callback)
       +
       +    def trigger_callbacks(self):
       +        with self.lock:
       +            callbacks = self.update_callbacks[:]
       +        [update() for update in callbacks]
       +
       +
       +    def init_server(self, host, port, proxy=None, use_ssl=True):
                self.host = host
                self.port = port
                self.proxy = proxy
       t@@ -74,13 +72,9 @@ class InterfaceAncestor(threading.Thread):
        
                #json
                self.message_id = 0
       -        self.responses = Queue.Queue()
                self.unanswered_requests = {}
        
        
       -    def poke(self):
       -        # push a fake response so that the getting thread exits its loop
       -        self.responses.put(None)
        
            def queue_json_response(self, c):
        
       t@@ -95,12 +89,19 @@ class InterfaceAncestor(threading.Thread):
                    return
        
                if msg_id is not None:
       -            method, params = self.unanswered_requests.pop(msg_id)
       +            with self.lock: 
       +                method, params, channel = self.unanswered_requests.pop(msg_id)
                    result = c.get('result')
                else:
       -            # notification
       +            # notification. we should find the channel(s)..
                    method = c.get('method')
                    params = c.get('params')
       +            with self.lock:
       +                for k,v in self.subscriptions.items():
       +                    if (method, params) in v:
       +                        channel = k
       +                else:
       +                    raise
        
                    if method == 'blockchain.numblocks.subscribe':
                        result = params[0]
       t@@ -111,32 +112,29 @@ class InterfaceAncestor(threading.Thread):
                        result = params[1]
                        params = [addr]
        
       -        self.responses.put({'method':method, 'params':params, 'result':result, 'id':msg_id})
       -
       -
       +        response_queue = self.responses[channel]
       +        response_queue.put({'method':method, 'params':params, 'result':result, 'id':msg_id})
        
       -    def subscribe(self, addresses):
       -        messages = []
       -        for addr in addresses:
       -            messages.append(('blockchain.address.subscribe', [addr]))
       -        self.send(messages)
        
        
       +    def get_response(self, channel='default', block=True, timeout=10000000000):
       +        return self.responses[channel].get(block, timeout)
        
       +    def register_channel(self, channel):
       +        with self.lock:
       +            self.responses[channel] = Queue.Queue()
        
       +    def poke(self, channel):
       +        self.responses[channel].put(None)
        
       -class HttpStratumInterface(InterfaceAncestor):
       -    """ non-persistent connection. synchronous calls"""
        
       -    def __init__(self, host, port, proxy=None, use_ssl=True):
       -        InterfaceAncestor.__init__(self, host, port, proxy, use_ssl)
       +    def init_http(self, host, port, proxy=None, use_ssl=True):
       +        self.init_server(host, port, proxy, use_ssl)
                self.session_id = None
                self.connection_msg = ('https' if self.use_ssl else 'http') + '://%s:%d'%( self.host, self.port )
        
       -    def get_history(self, address):
       -        self.send([('blockchain.address.get_history', [address] )])
        
       -    def run(self):
       +    def run_http(self):
                self.is_connected = True
                while self.is_connected:
                    try:
       t@@ -152,13 +150,13 @@ class HttpStratumInterface(InterfaceAncestor):
                        break
                    
                self.is_connected = False
       -        self.poke()
        
                        
            def poll(self):
                self.send([])
        
       -    def send(self, messages):
       +
       +    def send_http(self, messages, channel='default'):
                import urllib2, json, time, cookielib
                
                if self.proxy:
       t@@ -177,7 +175,7 @@ class HttpStratumInterface(InterfaceAncestor):
                    method, params = m
                    if type(params) != type([]): params = [params]
                    data.append( { 'method':method, 'id':self.message_id, 'params':params } )
       -            self.unanswered_requests[self.message_id] = method, params
       +            self.unanswered_requests[self.message_id] = method, params, channel
                    self.message_id += 1
        
                if data:
       t@@ -221,14 +219,9 @@ class HttpStratumInterface(InterfaceAncestor):
        
        
        
       -class TcpStratumInterface(InterfaceAncestor):
       -    """json-rpc over persistent TCP connection, asynchronous"""
       +    def init_tcp(self, host, port, proxy=None, use_ssl=True):
       +        self.init_server(host, port, proxy, use_ssl)
        
       -    def __init__(self, host, port, proxy=None, use_ssl=True):
       -        InterfaceAncestor.__init__(self, host, port, proxy, use_ssl)
       -        self.init_socket()
       -
       -    def init_socket(self):
                import ssl
                global proxy_modes
                self.connection_msg = "%s:%d"%(self.host,self.port)
       t@@ -251,17 +244,18 @@ class TcpStratumInterface(InterfaceAncestor):
                    s.settimeout(60)
                    self.s = s
                    self.is_connected = True
       -            self.send([('server.version', [ELECTRUM_VERSION])])
                except:
                    self.is_connected = False
                    self.s = None
        
       -    def run(self):
       +
       +    def run_tcp(self):
                try:
                    out = ''
                    while self.is_connected:
                        try: msg = self.s.recv(1024)
                        except socket.timeout:
       +                    print "timeout"
                            # ping the server with server.version, as a real ping does not exist yet
                            self.send([('server.version', [ELECTRUM_VERSION])])
                            continue
       t@@ -283,17 +277,16 @@ class TcpStratumInterface(InterfaceAncestor):
                    traceback.print_exc(file=sys.stdout)
        
                self.is_connected = False
       -        print "Poking"
       -        self.poke()
        
       -    def send(self, messages):
       +
       +    def send_tcp(self, messages, channel='default'):
                """return the ids of the requests that we sent"""
                out = ''
                ids = []
                for m in messages:
                    method, params = m 
                    request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
       -            self.unanswered_requests[self.message_id] = method, params
       +            self.unanswered_requests[self.message_id] = method, params, channel
                    ids.append(self.message_id)
                    # uncomment to debug
                    # print "-->",request
       t@@ -304,18 +297,55 @@ class TcpStratumInterface(InterfaceAncestor):
                    out = out[sent:]
                return ids
        
       -    def get_history(self, addr):
       -        self.send([('blockchain.address.get_history', [addr])])
        
        
       -
       -class Interface(TcpStratumInterface, HttpStratumInterface):
       -    
       -    def __init__(self, config = None):
       +    def __init__(self, config=None, loop=False, servers_loaded_callback=None):
        
                if config is None:
                    from simple_config import SimpleConfig
                    config = SimpleConfig()
       +
       +        threading.Thread.__init__(self)
       +        self.daemon = True
       +        self.loop = loop
       +        self.config = config
       +        self.servers_loaded_callback = servers_loaded_callback
       +
       +        self.subscriptions = {}
       +        self.responses = {}
       +        self.responses['default'] = Queue.Queue()
       +
       +        self.update_callbacks = []
       +        self.lock = threading.Lock()
       +        self.init_interface()
       +
       +
       +
       +    def init_interface(self):
       +        if self.config.get('server'):
       +            self.init_with_server(self.config)
       +        else:
       +            print "Using random server..."
       +            servers = DEFAULT_SERVERS
       +            while servers:
       +                server = random.choice( servers )
       +                servers.remove(server)
       +                self.config.set_key('server', server, False)
       +                self.init_with_server(self.config)
       +                if self.is_connected: break
       +
       +            if not servers:
       +                raise BaseException('no server available')
       +
       +        if self.is_connected:
       +            print "Connected to " + self.connection_msg
       +            self.send([('server.version', [ELECTRUM_VERSION])])
       +            #self.send([('server.banner',[])], 'synchronizer')
       +        else:
       +            print_error("Failed to connect " + self.connection_msg)
       +
       +
       +    def init_with_server(self, config):
                    
                s = config.get('server')
                host, port, protocol = s.split(':')
       t@@ -327,24 +357,41 @@ class Interface(TcpStratumInterface, HttpStratumInterface):
        
                #print protocol, host, port
                if protocol in 'st':
       -            TcpStratumInterface.__init__(self, host, port, proxy, use_ssl=(protocol=='s'))
       +            self.init_tcp(host, port, proxy, use_ssl=(protocol=='s'))
                elif protocol in 'gh':
       -            HttpStratumInterface.__init__(self, host, port, proxy, use_ssl=(protocol=='g'))
       +            self.init_http(host, port, proxy, use_ssl=(protocol=='g'))
                else:
                    raise BaseException('Unknown protocol: %s'%protocol)
        
        
       -    def run(self):
       -        if self.protocol  in 'st':
       -            TcpStratumInterface.run(self)
       -        else:
       -            HttpStratumInterface.run(self)
       +    def send(self, messages, channel='default'):
       +
       +        sub = []
       +        for message in messages:
       +            m, v = message
       +            if m[-10:] == '.subscribe':
       +                sub.append(message)
       +
       +        if sub:
       +            with self.lock:
       +                if self.subscriptions.get(channel) is None: 
       +                    self.subscriptions[channel] = []
       +                self.subscriptions[channel] += sub
        
       -    def send(self, messages):
                if self.protocol in 'st':
       -            return TcpStratumInterface.send(self, messages)
       +            with self.lock:
       +                out = self.send_tcp(messages, channel)
                else:
       -            return HttpStratumInterface.send(self, messages)
       +            # do not use lock, http is synchronous
       +            out = self.send_http(messages, channel)
       +
       +        return out
       +
       +    def resend_subscriptions(self):
       +        for channel, messages in self.subscriptions.items():
       +            if messages:
       +                self.send(messages, channel)
       +
        
        
            def parse_proxy_options(self, s):
       t@@ -377,12 +424,30 @@ class Interface(TcpStratumInterface, HttpStratumInterface):
                    print "changing server:", server, proxy
                    self.server = server
                    self.proxy = proxy
       +            if self.protocol in 'st':
       +                self.s.shutdown(socket.SHUT_RDWR)
       +                self.s.close()
                    self.is_connected = False  # this exits the polling loop
       -            self.poke()
        
        
       -    def is_up_to_date(self):
       -        return self.responses.empty() and not self.unanswered_requests
       +    def is_empty(self, channel):
       +        q = self.responses.get(channel)
       +        if q: 
       +            return q.empty()
       +        else:
       +            return True
       +
       +
       +    def get_pending_requests(self, channel):
       +        result = []
       +        with self.lock:
       +            for k, v in self.unanswered_requests.items():
       +                a, b, c = v
       +                if c == channel: result.append(k)
       +        return result
       +
       +    def is_up_to_date(self, channel):
       +        return self.is_empty(channel) and not self.get_pending_requests(channel)
        
        
            def synchronous_get(self, requests, timeout=100000000):
       t@@ -391,7 +456,7 @@ class Interface(TcpStratumInterface, HttpStratumInterface):
                id2 = ids[:]
                res = {}
                while ids:
       -            r = self.responses.get(True, timeout)
       +            r = self.responses['default'].get(True, timeout)
                    _id = r.get('id')
                    if _id in ids:
                        ids.remove(_id)
       t@@ -403,130 +468,15 @@ class Interface(TcpStratumInterface, HttpStratumInterface):
        
        
        
       +    def run(self):
       +        while True:
       +            self.run_tcp() if self.protocol in 'st' else self.run_http()
       +            self.trigger_callbacks()
       +            if not self.loop: break
        
       -class WalletSynchronizer(threading.Thread):
       -
       -    def __init__(self, wallet, config, loop=False, servers_loaded_callback=None):
       -        threading.Thread.__init__(self)
       -        self.daemon = True
       -        self.wallet = wallet
       -        self.loop = loop
       -        self.config = config
       -        self.init_interface()
       -        self.servers_loaded_callback = servers_loaded_callback
       -
       -    def init_interface(self):
       -        if self.config.get('server'):
       -            self.interface = Interface(self.config)
       -        else:
       -            print "Using random server..."
       -            self.interface = pick_random_interface(self.config)
       -
       -        if self.interface.is_connected:
       -            print "Connected to " + self.interface.connection_msg
       -        else:
       -            print_error("Failed to connect " + self.interface.connection_msg)
       -
       -        self.wallet.interface = self.interface
       -
       -    def handle_response(self, r):
       -        if r is None:
       -            return
       -
       -        method = r['method']
       -        params = r['params']
       -        result = r['result']
       -
       -        if method == 'server.banner':
       -            self.wallet.banner = result
       -            self.wallet.was_updated = True
       -
       -        elif method == 'server.peers.subscribe':
       -            servers = []
       -            for item in result:
       -                s = []
       -                host = item[1]
       -                ports = []
       -                version = None
       -                if len(item) > 2:
       -                    for v in item[2]:
       -                        if re.match("[stgh]\d+", v):
       -                            ports.append((v[0], v[1:]))
       -                        if re.match("v(.?)+", v):
       -                            version = v[1:]
       -                if ports and version:
       -                    servers.append((host, ports))
       -            self.interface.servers = servers
       -            # servers_loaded_callback is None for commands, but should
       -            # NEVER be None when using the GUI.
       -            if self.servers_loaded_callback is not None:
       -                self.servers_loaded_callback()
       -
       -        elif method == 'blockchain.address.subscribe':
       -            addr = params[0]
       -            self.wallet.receive_status_callback(addr, result)
       -                            
       -        elif method == 'blockchain.address.get_history':
       -            addr = params[0]
       -            self.wallet.receive_history_callback(addr, result)
       -            self.wallet.was_updated = True
       -
       -        elif method == 'blockchain.transaction.broadcast':
       -            self.wallet.tx_result = result
       -            self.wallet.tx_event.set()
       -
       -        elif method == 'blockchain.numblocks.subscribe':
       -            self.wallet.blocks = result
       -            self.wallet.was_updated = True
       -
       -        elif method == 'server.version':
       -            pass
       -
       -        else:
       -            print_error("Error: Unknown message:" + method + ", " + params + ", " + result)
       -
       -
       -    def start_interface(self):
       -        self.interface.start()
       -        if self.interface.is_connected:
       -            self.wallet.start_session(self.interface)
       -
       +            time.sleep(5)
       +            self.init_interface()
       +            self.resend_subscriptions()
        
        
       -    def run(self):
       -        import socket, time
       -        self.start_interface()
       -        while True:
       -            while self.interface.is_connected:
       -                new_addresses = self.wallet.synchronize()
       -                if new_addresses:
       -                    self.interface.subscribe(new_addresses)
       -
       -                if self.interface.is_up_to_date():
       -                    if not self.wallet.up_to_date:
       -                        self.wallet.up_to_date = True
       -                        self.wallet.was_updated = True
       -                        self.wallet.up_to_date_event.set()
       -                else:
       -                    if self.wallet.up_to_date:
       -                        self.wallet.up_to_date = False
       -                        self.wallet.was_updated = True
       -
       -                if self.wallet.was_updated:
       -                    self.wallet.trigger_callbacks()
       -                    self.wallet.was_updated = False
       -
       -                response = self.interface.responses.get()
       -                self.handle_response(response)
       -
       -            self.wallet.trigger_callbacks()
       -            if self.loop:
       -                time.sleep(5)
       -                # Server has been changed. Copy callback for new interface.
       -                self.proxy = self.interface.proxy
       -                self.init_interface()
       -                self.start_interface()
       -                continue
       -            else:
       -                break
        
   DIR diff --git a/lib/wallet.py b/lib/wallet.py
       t@@ -28,6 +28,7 @@ import threading
        import random
        import aes
        import ecdsa
       +import Queue
        
        from ecdsa.util import string_to_number, number_to_string
        from util import print_error, user_dir, format_satoshis
       t@@ -50,7 +51,6 @@ class Wallet:
        
                self.config = config
                self.electrum_version = ELECTRUM_VERSION
       -        self.update_callbacks = []
        
                # saved fields
                self.seed_version          = config.get('seed_version', SEED_VERSION)
       t@@ -94,16 +94,6 @@ class Wallet:
                    raise ValueError("This wallet seed is deprecated. Please run upgrade.py for a diagnostic.")
        
        
       -    def register_callback(self, update_callback):
       -        with self.lock:
       -            self.update_callbacks.append(update_callback)
       -
       -    def trigger_callbacks(self):
       -        with self.lock:
       -            callbacks = self.update_callbacks[:]
       -        [update() for update in callbacks]
       -
       -
            def import_key(self, keypair, password):
                address, key = keypair.split(':')
                if not self.is_valid(address):
       t@@ -480,7 +470,8 @@ class Wallet:
                    return s
        
            def get_status(self, address):
       -        h = self.history.get(address)
       +        with self.lock:
       +            h = self.history.get(address)
                if not h:
                    status = None
                else:
       t@@ -490,11 +481,6 @@ class Wallet:
                        status = status + ':%d'% len(h)
                return status
        
       -    def receive_status_callback(self, addr, status):
       -        with self.lock:
       -            if self.get_status(addr) != status:
       -                #print "updating status for", addr, status
       -                self.interface.get_history(addr)
        
            def receive_history_callback(self, addr, data): 
                #print "updating history for", addr
       t@@ -504,10 +490,26 @@ class Wallet:
                    self.save()
        
            def get_tx_history(self):
       -        lines = self.tx_history.values()
       +        with self.lock:
       +            lines = self.tx_history.values()
                lines = sorted(lines, key=operator.itemgetter("timestamp"))
                return lines
        
       +    def get_tx_hashes(self):
       +        with self.lock:
       +            hashes = self.tx_history.keys()
       +        return hashes
       +
       +    def get_transactions_at_height(self, height):
       +        with self.lock:
       +            values = self.tx_history.values()[:]
       +
       +        out = []
       +        for tx in values:
       +            if tx['height'] == height:
       +                out.append(tx['tx_hash'])
       +        return out
       +
            def update_tx_history(self):
                self.tx_history= {}
                for addr in self.all_addresses():
       t@@ -751,12 +753,6 @@ class Wallet:
                self.up_to_date_event.wait(10000000000)
        
        
       -    def start_session(self, interface):
       -        self.interface = interface
       -        self.interface.send([('server.banner',[]), ('blockchain.numblocks.subscribe',[]), ('server.peers.subscribe',[])])
       -        self.interface.subscribe(self.all_addresses())
       -
       -
            def freeze(self,addr):
                if addr in self.all_addresses() and addr not in self.frozen_addresses:
                    self.unprioritize(addr)
       t@@ -816,3 +812,223 @@ class Wallet:
                for k, v in s.items():
                    self.config.set_key(k,v)
                self.config.save()
       +
       +
       +
       +
       +
       +
       +class WalletSynchronizer(threading.Thread):
       +
       +
       +    def __init__(self, wallet, config):
       +        threading.Thread.__init__(self)
       +        self.daemon = True
       +        self.wallet = wallet
       +        self.interface = self.wallet.interface
       +        self.interface.register_channel('synchronizer')
       +
       +
       +    def synchronize_wallet(self):
       +        new_addresses = self.wallet.synchronize()
       +        if new_addresses:
       +            self.subscribe_to_addresses(new_addresses)
       +            
       +        if self.interface.is_up_to_date('synchronizer'):
       +            if not self.wallet.up_to_date:
       +                self.wallet.up_to_date = True
       +                self.wallet.was_updated = True
       +                self.wallet.up_to_date_event.set()
       +        else:
       +            if self.wallet.up_to_date:
       +                self.wallet.up_to_date = False
       +                self.wallet.was_updated = True
       +
       +        if self.wallet.was_updated:
       +            self.interface.trigger_callbacks()
       +            self.wallet.was_updated = False
       +
       +
       +    def subscribe_to_addresses(self, addresses):
       +        messages = []
       +        for addr in addresses:
       +            messages.append(('blockchain.address.subscribe', [addr]))
       +        self.interface.send( messages, 'synchronizer')
       +
       +
       +    def run(self):
       +
       +        # subscriptions
       +        self.interface.send([('blockchain.numblocks.subscribe',[]), ('server.peers.subscribe',[])], 'synchronizer')
       +        self.subscribe_to_addresses(self.wallet.all_addresses())
       +
       +        while True:
       +            # 1. send new requests
       +            self.synchronize_wallet()
       +
       +            # 2. get a response
       +            r = self.interface.get_response('synchronizer')
       +            if not r: continue
       +
       +            # 3. handle response
       +            method = r['method']
       +            params = r['params']
       +            result = r['result']
       +
       +            if method == 'blockchain.address.subscribe':
       +                addr = params[0]
       +                if self.wallet.get_status(addr) != result:
       +                    self.interface.send([('blockchain.address.get_history', [address] )])
       +                            
       +            elif method == 'blockchain.address.get_history':
       +                addr = params[0]
       +                self.wallet.receive_history_callback(addr, result)
       +                self.wallet.was_updated = True
       +
       +            elif method == 'blockchain.transaction.broadcast':
       +                self.wallet.tx_result = result
       +                self.wallet.tx_event.set()
       +
       +            elif method == 'blockchain.numblocks.subscribe':
       +                self.wallet.blocks = result
       +                self.wallet.was_updated = True
       +
       +            elif method == 'server.banner':
       +                self.wallet.banner = result
       +                self.wallet.was_updated = True
       +
       +            elif method == 'server.peers.subscribe':
       +                servers = []
       +                for item in result:
       +                    s = []
       +                    host = item[1]
       +                    ports = []
       +                    version = None
       +                    if len(item) > 2:
       +                        for v in item[2]:
       +                            if re.match("[stgh]\d+", v):
       +                                ports.append((v[0], v[1:]))
       +                            if re.match("v(.?)+", v):
       +                                version = v[1:]
       +                    if ports and version:
       +                        servers.append((host, ports))
       +                self.interface.servers = servers
       +
       +                # servers_loaded_callback is None for commands, but should
       +                # NEVER be None when using the GUI.
       +                #if self.servers_loaded_callback is not None:
       +                #    self.servers_loaded_callback()
       +
       +            elif method == 'server.version':
       +                pass
       +
       +            else:
       +                print_error("Error: Unknown message:" + method + ", " + params + ", " + result)
       +
       +
       +encode = lambda x: x[::-1].encode('hex')
       +decode = lambda x: x.decode('hex')[::-1]
       +from bitcoin import Hash, rev_hex, int_to_hex
       +
       +class WalletVerifier(threading.Thread):
       +
       +    def __init__(self, wallet, config):
       +        threading.Thread.__init__(self)
       +        self.daemon = True
       +        self.wallet = wallet
       +        self.interface = self.wallet.interface
       +        self.interface.register_channel('verifier')
       +        self.validated = []
       +        self.merkle_roots = {}
       +        self.headers = {}
       +        self.lock = threading.Lock()
       +
       +    def run(self):
       +        requested = []
       +
       +        while True:
       +            txlist = self.wallet.get_tx_hashes()
       +            for tx in txlist:
       +                if tx not in requested:
       +                    requested.append(tx)
       +                    self.request_merkle(tx)
       +                    break
       +            try:
       +                r = self.interface.get_response('verifier',timeout=1)
       +            except Queue.Empty:
       +                continue
       +
       +            # 3. handle response
       +            method = r['method']
       +            params = r['params']
       +            result = r['result']
       +
       +            if method == 'blockchain.transaction.get_merkle':
       +                tx_hash = params[0]
       +                tx_height = result.get('block_height')
       +                self.merkle_roots[tx_hash] = self.hash_merkle_root(result['merkle'], tx_hash)
       +                # if we already have the header, check merkle root directly
       +                header = self.headers.get(tx_height)
       +                if header:
       +                    self.validated.append(tx_hash)
       +                    assert header.get('merkle_root') == self.merkle_roots[tx_hash]
       +                self.request_headers(tx_height) 
       +
       +            elif method == 'blockchain.block.get_header':
       +                self.validate_header(result)
       +
       +
       +    def request_merkle(self, tx_hash):
       +        self.interface.send([ ('blockchain.transaction.get_merkle',[tx_hash]) ], 'verifier')
       +        
       +
       +    def request_headers(self, tx_height, delta=10):
       +        headers_requests = []
       +        for height in range(tx_height-delta,tx_height+delta): # we might can request blocks that do not exist yet
       +            if height not in self.headers:
       +                headers_requests.append( ('blockchain.block.get_header',[height]) )
       +        self.interface.send(headers_requests,'verifier')
       +
       +
       +    def validate_header(self, header):
       +        """ if there is a previous or a next block in the list, check the hash"""
       +        height = header.get('block_height')
       +        with self.lock:
       +            self.headers[height] = header # detect conflicts
       +            prev_header = next_header = None
       +            if height-1 in self.headers:
       +                prev_header = self.headers[height-1]
       +            if height+1 in self.headers:
       +                next_header = self.headers[height+1]
       +
       +        if prev_header:
       +            prev_hash = self.hash_header(prev_header)
       +            assert prev_hash == header.get('prev_block_hash')
       +        if next_header:
       +            _hash = self.hash_header(header)
       +            assert _hash == next_header.get('prev_block_hash')
       +            
       +        # check if there are transactions at that height
       +        for tx_hash in self.wallet.get_transactions_at_height(height):
       +            if tx_hash in self.validated: continue
       +            # check if we already have the merkle root
       +            merkle_root = self.merkle_roots.get(tx_hash)
       +            if merkle_root:
       +                self.validated.append(tx_hash)
       +                assert header.get('merkle_root') == merkle_root
       +
       +    def hash_header(self, res):
       +        header = int_to_hex(res.get('version'),4) \
       +            + rev_hex(res.get('prev_block_hash')) \
       +            + rev_hex(res.get('merkle_root')) \
       +            + int_to_hex(int(res.get('timestamp')),4) \
       +            + int_to_hex(int(res.get('bits')),4) \
       +            + int_to_hex(int(res.get('nonce')),4)
       +        return rev_hex(Hash(header.decode('hex')).encode('hex'))
       +
       +    def hash_merkle_root(self, merkle_s, target_hash):
       +        h = decode(target_hash)
       +        for item in merkle_s:
       +            is_left = item[0] == 'L'
       +            h = Hash( h + decode(item[1:]) ) if is_left else Hash( decode(item[1:]) + h )
       +        return encode(h)
   DIR diff --git a/scripts/blocks b/scripts/blocks
       t@@ -8,7 +8,7 @@ i.send([('blockchain.numblocks.subscribe',[])])
        
        while True:
            try:
       -        r = i.responses.get(True, 100000000000)
       +        r = i.get_response()
            except KeyboardInterrupt:
                break
            if r.get('method') == 'blockchain.numblocks.subscribe':
   DIR diff --git a/scripts/peers b/scripts/peers
       t@@ -2,10 +2,10 @@
        
        from electrum import Interface
        
       -i = Interface({'server':'electrum.novit.ro:50001:t'})
       +i = Interface({'server':'ecdsa.org:50001:t'})
        i.start()
        i.send([('server.peers.subscribe',[])])
        
        while True:
       -    r = i.responses.get(True, 100000000000)
       +    r = i.get_response()
            print r.get('result')
   DIR diff --git a/scripts/servers b/scripts/servers
       t@@ -5,12 +5,12 @@ import time, Queue
        
        servers = DEFAULT_SERVERS
        interfaces = map ( lambda server: Interface({'server':server} ), servers )
       -results = []
        
        for i in interfaces:
            if i.is_connected:
                i.start()
                i.send([('blockchain.numblocks.subscribe',[])])
       +        i.status = "timed out"
            else:
                servers.remove(i.server)
                i.status = "unreachable"
       t@@ -18,29 +18,25 @@ for i in interfaces:
        for i in interfaces:
            while True:
                try:
       -            r = i.responses.get(True,1)
       +            r = i.get_response(timeout=1)
                except Queue.Empty:
                    break
        
                if r.get('method') == 'blockchain.numblocks.subscribe':
       -            results.append((i.host, r.get('result')))
       -            i.status = "ok"
                    servers.remove(i.server)
       +            i.status = "ok"
       +            i.blocks = r.get('result')
                    break
        
       -for s in servers:
       -    i.status = "timed out"
        
        from collections import defaultdict
        d = defaultdict(int)
       -for e in results:
       -    d[e[1]] += 1
       +for i in interfaces:
       +    if i.status == 'ok':
       +        d[i.blocks] += 1
        v = d.values()
        numblocks = d.keys()[v.index(max(v))]
        
        for i in interfaces:
       -    print i.host, i.status
       -
       -for s,n in results:
       -    print "%30s   %d   "%(s, n), "ok" if abs(n-numblocks)<2 else "lagging"
       +    print "%30s   %s   "%(i.host, i.status) #,  "ok" if abs(n-numblocks)<2 else "lagging"