URI: 
       tseparate blockchain and network - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 328315f94baeccaa646c713a5626960709879c0a
   DIR parent ac26abfed3e9f4d36b4419c90ca9f48aa285c4ee
  HTML Author: ThomasV <thomasv@gitorious>
       Date:   Sun,  8 Sep 2013 17:23:01 +0200
       
       separate blockchain and network
       
       Diffstat:
         M electrum                            |      14 +++++---------
         M gui/gui_classic.py                  |      13 ++++++-------
         M lib/__init__.py                     |       2 +-
         M lib/blockchain.py                   |     275 +++++++++++++------------------
         M lib/interface.py                    |      78 +++++++++++--------------------
         A lib/network.py                      |     121 +++++++++++++++++++++++++++++++
         M lib/wallet.py                       |       9 +++++----
         M setup.py                            |      19 ++++++++++---------
       
       8 files changed, 288 insertions(+), 243 deletions(-)
       ---
   DIR diff --git a/electrum b/electrum
       t@@ -128,18 +128,14 @@ if __name__ == '__main__':
                    #sys.exit("Error: Unknown GUI: " + gui_name )
                
                # network interface
       -        interface = Interface(config, True)
       -        interface.start(wait = False)
       -        interface.send([('server.peers.subscribe',[])])
       +        network = Network(config)
       +        network.start()
       +        #interface.send([('server.peers.subscribe',[])])
        
       -        blockchain = BlockchainVerifier(interface, config)
       -        blockchain.start()
       -
       -        gui = gui.ElectrumGui(config, interface, blockchain)
       +        gui = gui.ElectrumGui(config, network)
                gui.main(url)
                
       -        interface.stop()
       -        blockchain.stop()
       +        network.stop()
        
                # we use daemon threads, their termination is enforced.
                # this sleep command gives them time to terminate cleanly. 
   DIR diff --git a/gui/gui_classic.py b/gui/gui_classic.py
       t@@ -568,8 +568,6 @@ class ElectrumWindow(QMainWindow):
                    self.config.set_key('io_dir', os.path.dirname(fileName), True)
                return fileName
        
       -
       -
            def close(self):
                QMainWindow.close(self)
                self.run_hook('close_main_window')
       t@@ -1367,7 +1365,7 @@ class ElectrumWindow(QMainWindow):
                console.history = self.config.get("console-history",[])
                console.history_index = len(console.history)
        
       -        console.updateNamespace({'wallet' : self.wallet, 'interface' : self.wallet.interface, 'gui':self})
       +        console.updateNamespace({'wallet' : self.wallet, 'network' : self.wallet.network, 'gui':self})
                console.updateNamespace({'util' : util, 'bitcoin':bitcoin})
        
                c = commands.Commands(self.wallet, self.wallet.interface, lambda: self.console.set_json(True))
       t@@ -2258,10 +2256,11 @@ class OpenFileEventFilter(QObject):
        
        class ElectrumGui:
        
       -    def __init__(self, config, interface, blockchain, app=None):
       -        self.interface = interface
       +    def __init__(self, config, network, app=None):
       +        self.network = network
       +        #self.interface = interface
                self.config = config
       -        self.blockchain = blockchain
       +        #self.blockchain = network.blockchain
                self.windows = []
                self.efilter = OpenFileEventFilter(self.windows)
                if app is None:
       t@@ -2281,7 +2280,7 @@ class ElectrumGui:
                else:
                    wallet = Wallet(storage)
        
       -        wallet.start_threads(self.interface, self.blockchain)
       +        wallet.start_threads(self.network)
        
                s = Timer()
                s.start()
   DIR diff --git a/lib/__init__.py b/lib/__init__.py
       t@@ -3,7 +3,7 @@ from util import format_satoshis, print_msg, print_json, print_error, set_verbos
        from wallet import WalletSynchronizer, WalletStorage
        from wallet_factory import WalletFactory as Wallet
        from verifier import TxVerifier
       -from blockchain import BlockchainVerifier
       +from network import Network
        from interface import Interface, pick_random_server, DEFAULT_SERVERS
        from simple_config import SimpleConfig
        import bitcoin
   DIR diff --git a/lib/blockchain.py b/lib/blockchain.py
       t@@ -22,10 +22,9 @@ from util import user_dir, appdata_dir, print_error
        from bitcoin import *
        
        
       -class BlockchainVerifier(threading.Thread):
       -    """ Simple Payment Verification """
       +class Blockchain(threading.Thread):
        
       -    def __init__(self, interface, config):
       +    def __init__(self, config):
                threading.Thread.__init__(self)
                self.daemon = True
                self.config = config
       t@@ -34,112 +33,62 @@ class BlockchainVerifier(threading.Thread):
                self.local_height = 0
                self.running = False
                self.headers_url = 'http://headers.electrum.org/blockchain_headers'
       -        self.interface = interface
       -        interface.register_channel('verifier')
                self.set_local_height()
       +        self.queue = Queue.Queue()
        
       -
       -
       -    def start_interfaces(self):
       -        import interface
       -        servers = interface.DEFAULT_SERVERS
       -        servers = interface.filter_protocol(servers,'s')
       -        print_error("using %d servers"% len(servers))
       -        self.interfaces = map ( lambda server: interface.Interface({'server':server} ), servers )
       -
       -        for i in self.interfaces:
       -            i.start()
       -            # subscribe to block headers
       -            i.register_channel('verifier')
       -            i.register_channel('get_header')
       -            i.send([ ('blockchain.headers.subscribe',[])], 'verifier')
       -            # note: each interface should send its results directly to a queue, instead of channels
       -            # pass the queue to the interface, so that several can share the same queue
       -
       -
       -    def get_new_response(self):
       -        # listen to interfaces, forward to verifier using the queue
       -        while self.is_running():
       -            for i in self.interfaces:
       -                try:
       -                    r = i.get_response('verifier',timeout=0)
       -                except Queue.Empty:
       -                    continue
       -
       -                result = r.get('result')
       -                if result:
       -                    return (i,result)
       -
       -            time.sleep(1)
       -
       -
       -
       -
       +    
            def stop(self):
                with self.lock: self.running = False
       -        #self.interface.poke('verifier')
       +
        
            def is_running(self):
                with self.lock: return self.running
        
        
       -    def request_header(self, i, h):
       -        print_error("requesting header %d from %s"%(h, i.server))
       -        i.send([ ('blockchain.block.get_header',[h])], 'get_header')
       +    def run(self):
       +        self.init_headers_file()
       +        self.set_local_height()
       +        print_error( "blocks:", self.local_height )
       +
       +        with self.lock:
       +            self.running = True
       +
       +        while self.is_running():
        
       -    def retrieve_header(self, i):
       -        while True:
                    try:
       -                r = i.get_response('get_header',timeout=1)
       +                i, result = self.queue.get()
                    except Queue.Empty:
       -                print_error('timeout')
                        continue
        
       -            if r.get('error'):
       -                print_error('Verifier received an error:', r)
       -                continue
       -
       -            # 3. handle response
       -            method = r['method']
       -            params = r['params']
       -            result = r['result']
       +            header= result.get('result')
       +            #print_error( i.server, header )
       +            height = header.get('block_height')
        
       -            if method == 'blockchain.block.get_header':
       -                return result
       -                
       +            if height > self.local_height + 50:
       +                self.get_chunks(i, header, height)
       +                i.trigger_callback('updated')
        
       -    def get_chain(self, interface, final_header):
       +            if height > self.local_height:
       +                # get missing parts from interface (until it connects to my chain)
       +                chain = self.get_chain( i, header )
        
       -        header = final_header
       -        chain = [ final_header ]
       -        requested_header = False
       -        
       -        while self.is_running():
       +                # skip that server if the result is not consistent
       +                if not chain: continue
       +                
       +                # verify the chain
       +                if self.verify_chain( chain ):
       +                    print_error("height:", height, i.server)
       +                    for header in chain:
       +                        self.save_header(header)
       +                        self.height = height
       +                else:
       +                    print_error("error", i.server)
       +                    # todo: dismiss that server
        
       -            if requested_header:
       -                header = self.retrieve_header(interface)
       -                if not header: return
       -                chain = [ header ] + chain
       -                requested_header = False
       +                i.trigger_callback('updated')
        
       -            height = header.get('block_height')
       -            previous_header = self.read_header(height -1)
       -            if not previous_header:
       -                self.request_header(interface, height - 1)
       -                requested_header = True
       -                continue
        
       -            # verify that it connects to my chain
       -            prev_hash = self.hash_header(previous_header)
       -            if prev_hash != header.get('prev_block_hash'):
       -                print_error("reorg")
       -                self.request_header(interface, height - 1)
       -                requested_header = True
       -                continue
        
       -            else:
       -                # the chain is complete
       -                return chain
                            
                    
            def verify_chain(self, chain):
       t@@ -166,37 +115,6 @@ class BlockchainVerifier(threading.Thread):
                return True
        
        
       -    def get_chunks(self, i, header, height):
       -        requested_chunks = []
       -        min_index = (self.local_height + 1)/2016
       -        max_index = (height + 1)/2016
       -        for n in range(min_index, max_index + 1):
       -            print_error( "requesting chunk", n )
       -            i.send([ ('blockchain.block.get_chunk',[n])], 'get_header')
       -            requested_chunks.append(n)
       -            break
       -
       -        while requested_chunks:
       -            try:
       -                r = i.get_response('get_header',timeout=1)
       -            except Queue.Empty:
       -                continue
       -            if not r: continue
       -
       -            if r.get('error'):
       -                print_error('Verifier received an error:', r)
       -                continue
       -
       -            # 3. handle response
       -            method = r['method']
       -            params = r['params']
       -            result = r['result']
       -
       -            if method == 'blockchain.block.get_chunk':
       -                index = params[0]
       -                self.verify_chunk(index, result)
       -                requested_chunks.remove(index)
       -
        
            def verify_chunk(self, index, hexdata):
                data = hexdata.decode('hex')
       t@@ -259,8 +177,6 @@ class BlockchainVerifier(threading.Thread):
                return True
                
        
       -            
       -
            def header_to_string(self, res):
                s = int_to_hex(res.get('version'),4) \
                    + rev_hex(res.get('prev_block_hash')) \
       t@@ -383,65 +299,100 @@ class BlockchainVerifier(threading.Thread):
                return new_bits, new_target
        
        
       +    def request_header(self, i, h):
       +        print_error("requesting header %d from %s"%(h, i.server))
       +        i.send([ ('blockchain.block.get_header',[h])], 'get_header')
        
       +    def retrieve_header(self, i):
       +        while True:
       +            try:
       +                r = i.get_response('get_header',timeout=1)
       +            except Queue.Empty:
       +                print_error('timeout')
       +                continue
        
       -    def run(self):
       -        self.start_interfaces()
       -        
       -        self.init_headers_file()
       -        self.set_local_height()
       -        print_error( "blocks:", self.local_height )
       +            if r.get('error'):
       +                print_error('Verifier received an error:', r)
       +                continue
        
       -        with self.lock:
       -            self.running = True
       +            # 3. handle response
       +            method = r['method']
       +            params = r['params']
       +            result = r['result']
       +
       +            if method == 'blockchain.block.get_header':
       +                return result
       +                
       +
       +
       +    def get_chain(self, interface, final_header):
        
       +        header = final_header
       +        chain = [ final_header ]
       +        requested_header = False
       +        
                while self.is_running():
        
       -            i, header = self.get_new_response()
       -            
       +            if requested_header:
       +                header = self.retrieve_header(interface)
       +                if not header: return
       +                chain = [ header ] + chain
       +                requested_header = False
       +
                    height = header.get('block_height')
       +            previous_header = self.read_header(height -1)
       +            if not previous_header:
       +                self.request_header(interface, height - 1)
       +                requested_header = True
       +                continue
        
       -            if height > self.local_height + 50:
       -                self.get_chunks(i, header, height)
       -                self.interface.trigger_callback('updated')
       +            # verify that it connects to my chain
       +            prev_hash = self.hash_header(previous_header)
       +            if prev_hash != header.get('prev_block_hash'):
       +                print_error("reorg")
       +                self.request_header(interface, height - 1)
       +                requested_header = True
       +                continue
        
       -            if height > self.local_height:
       -                # get missing parts from interface (until it connects to my chain)
       -                chain = self.get_chain( i, header )
       +            else:
       +                # the chain is complete
       +                return chain
        
       -                # skip that server if the result is not consistent
       -                if not chain: continue
       -                
       -                # verify the chain
       -                if self.verify_chain( chain ):
       -                    print_error("height:", height, i.server)
       -                    for header in chain:
       -                        self.save_header(header)
       -                        self.height = height
       -                else:
       -                    print_error("error", i.server)
       -                    # todo: dismiss that server
        
       -                self.interface.trigger_callback('updated')
       -    
       +    def get_chunks(self, i, header, height):
       +        requested_chunks = []
       +        min_index = (self.local_height + 1)/2016
       +        max_index = (height + 1)/2016
       +        for n in range(min_index, max_index + 1):
       +            print_error( "requesting chunk", n )
       +            i.send([ ('blockchain.block.get_chunk',[n])], 'get_header')
       +            requested_chunks.append(n)
       +            break
        
       +        while requested_chunks:
       +            try:
       +                r = i.get_response('get_header',timeout=1)
       +            except Queue.Empty:
       +                continue
       +            if not r: continue
        
       +            if r.get('error'):
       +                print_error('Verifier received an error:', r)
       +                continue
        
       -if __name__ == "__main__":
       -    import interface, simple_config
       -    
       -    config = simple_config.SimpleConfig({'verbose':True})
       +            # 3. handle response
       +            method = r['method']
       +            params = r['params']
       +            result = r['result']
       +
       +            if method == 'blockchain.block.get_chunk':
       +                index = params[0]
       +                self.verify_chunk(index, result)
       +                requested_chunks.remove(index)
        
       -    i0 = interface.Interface()
       -    i0.start()
        
       -    bv = BlockchainVerifier(i0, config)
       -    bv.start()
        
        
       -    # listen to interfaces, forward to verifier using the queue
       -    while 1:
       -        time.sleep(1)
        
        
        
   DIR diff --git a/lib/interface.py b/lib/interface.py
       t@@ -66,18 +66,21 @@ def pick_random_server():
        
        class Interface(threading.Thread):
        
       +
            def register_callback(self, event, callback):
                with self.lock:
                    if not self.callbacks.get(event):
                        self.callbacks[event] = []
                    self.callbacks[event].append(callback)
        
       +
            def trigger_callback(self, event):
                with self.lock:
                    callbacks = self.callbacks.get(event,[])[:]
                if callbacks:
                    [callback() for callback in callbacks]
        
       +
            def init_server(self, host, port, proxy=None, use_ssl=True):
                self.host = host
                self.port = port
       t@@ -188,16 +191,19 @@ class Interface(threading.Thread):
                            return
                        
                response_queue = self.responses[channel]
       -        response_queue.put({'method':method, 'params':params, 'result':result, 'id':msg_id})
       +        response_queue.put((self, {'method':method, 'params':params, 'result':result, 'id':msg_id}))
        
        
        
            def get_response(self, channel='default', block=True, timeout=10000000000):
       -        return self.responses[channel].get(block, timeout)
       +        i, r = self.responses[channel].get(block, timeout)
       +        return r
        
       -    def register_channel(self, channel):
       +    def register_channel(self, channel, queue=None):
       +        if queue is None:
       +            queue = Queue.Queue()
                with self.lock:
       -            self.responses[channel] = Queue.Queue()
       +            self.responses[channel] = queue
        
            def poke(self, channel):
                self.responses[channel].put(None)
       t@@ -418,7 +424,7 @@ class Interface(threading.Thread):
        
        
        
       -    def __init__(self, config=None, loop=False):
       +    def __init__(self, config=None):
                self.server = random.choice(filter_protocol(DEFAULT_SERVERS, 's'))
                self.proxy = None
        
       t@@ -428,7 +434,6 @@ class Interface(threading.Thread):
        
                threading.Thread.__init__(self)
                self.daemon = True
       -        self.loop = loop
                self.config = config
                self.connect_event = threading.Event()
        
       t@@ -457,32 +462,11 @@ class Interface(threading.Thread):
                    if self.config.get('auto_cycle') is None:
                        self.config.set_key('auto_cycle', True, False)
        
       -        if not self.is_connected and self.config.get('auto_cycle'):
       -            print_msg("Using random server...")
       -            servers = filter_protocol(DEFAULT_SERVERS, 's')
       -            while servers:
       -                server = random.choice( servers )
       -                servers.remove(server)
       -                print server
       -                self.config.set_key('server', server, False)
       -                self.init_with_server(self.config)
       -                if self.is_connected: break
       -
       -            if not self.is_connected:
       -                print 'no server available'
       -                self.connect_event.set() # to finish start
       -                self.server = 'ecdsa.org:50001:t'
       -                self.proxy = None
       -                return
       +        if not self.is_connected: 
       +            self.connect_event.set()
       +            return
        
                self.connect_event.set()
       -        if self.is_connected:
       -            self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])])
       -            self.send([('server.banner',[])])
       -            self.trigger_callback('connected')
       -        else:
       -            self.trigger_callback('notconnected')
       -            #print_error("Failed to connect " + self.connection_msg)
        
        
            def init_with_server(self, config):
       t@@ -532,12 +516,6 @@ class Interface(threading.Thread):
        
                return out
        
       -    def resend_subscriptions(self):
       -        for channel, messages in self.subscriptions.items():
       -            if messages:
       -                self.send(messages, channel)
       -
       -
        
            def parse_proxy_options(self, s):
                if type(s) == type({}): return s  # fixme: type should be fixed
       t@@ -625,26 +603,24 @@ class Interface(threading.Thread):
                return out
        
        
       -    def start(self, wait=True):
       +    def start(self, queue):
       +        self.queue = queue
                threading.Thread.start(self)
       -        if wait:
       -            # wait until connection is established
       -            self.connect_event.wait()
       -            if not self.is_connected:
       -                return False
       -        return True
       +
       +
        
            def run(self):
       -        while True:
       -            self.init_interface()
       -            if self.is_connected:
       -                self.resend_subscriptions()
       -                self.run_tcp() if self.protocol in 'st' else self.run_http()
       +        self.init_interface()
       +        if self.is_connected:
       +            self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])])
       +            self.change_status()
       +            self.run_tcp() if self.protocol in 'st' else self.run_http()
       +        self.change_status()
       +        
       +    def change_status(self):
       +        self.queue.put(self)
        
       -            self.trigger_callback('disconnected')
        
       -            if not self.loop: break
       -            time.sleep(5)
        
        
        
   DIR diff --git a/lib/network.py b/lib/network.py
       t@@ -0,0 +1,121 @@
       +import interface
       +from blockchain import Blockchain
       +import threading, time, Queue, os, sys, shutil
       +from util import user_dir, appdata_dir, print_error
       +from bitcoin import *
       +
       +
       +class Network(threading.Thread):
       +
       +    def __init__(self, config):
       +        threading.Thread.__init__(self)
       +        self.daemon = True
       +        self.config = config
       +        self.lock = threading.Lock()
       +        self.blockchain = Blockchain(config)
       +        self.interfaces = {}
       +        self.queue = Queue.Queue()
       +        self.default_server = self.config.get('server')
       +        self.servers_list = interface.filter_protocol(interface.DEFAULT_SERVERS,'s')
       +
       +
       +
       +    def start_interfaces(self):
       +
       +        for server in self.servers_list:
       +            self.interfaces[server] = interface.Interface({'server':server})
       +
       +        for i in self.interfaces.values():
       +            i.start(self.queue)
       +
       +        if self.default_server:
       +            self.interface = interface.Interface({'server':self.default_server})
       +            self.interface.start(self.queue)
       +        else:
       +            self.interface = self.interfaces[0]
       +
       +
       +
       +
       +
       +
       +    def run(self):
       +        self.blockchain.start()
       +        self.start_interfaces()
       +
       +        with self.lock:
       +            self.running = True
       +
       +        while self.is_running():
       +            i = self.queue.get()
       +
       +            if i.is_connected:
       +                i.register_channel('verifier', self.blockchain.queue)
       +                i.register_channel('get_header')
       +                i.send([ ('blockchain.headers.subscribe',[])], 'verifier')
       +                if i == self.interface:
       +                    i.send([('server.banner',[])])
       +                    i.send([('server.peers.subscribe',[])])
       +            else:
       +                self.interfaces.pop(i.server)
       +                if i == self.interface:
       +                    if self.default_server is None:
       +                        print_msg("Using random server...")
       +                        server = random.choice( self.servers_list )
       +                        self.interface = interface.Interface({'server':self.default_server})
       +                    else:
       +                        #i.trigger_callback('disconnected')
       +                        pass
       +
       +    def on_peers(self, resut):
       +        pass
       +
       +    def on_banner(self, result):
       +        pass
       +
       +    def stop(self):
       +        with self.lock: self.running = False
       +
       +    def is_running(self):
       +        with self.lock: return self.running
       +
       +
       +    def resend_subscriptions(self):
       +        for channel, messages in self.subscriptions.items():
       +            if messages:
       +                self.send(messages, channel)
       +
       +
       +    def auto_cycle(self):
       +        if not self.is_connected and self.config.get('auto_cycle'):
       +            print_msg("Using random server...")
       +            servers = filter_protocol(DEFAULT_SERVERS, 's')
       +            while servers:
       +                server = random.choice( servers )
       +                servers.remove(server)
       +                print server
       +                self.config.set_key('server', server, False)
       +                self.init_with_server(self.config)
       +                if self.is_connected: break
       +
       +            if not self.is_connected:
       +                print 'no server available'
       +                self.connect_event.set() # to finish start
       +                self.server = 'ecdsa.org:50001:t'
       +                self.proxy = None
       +                return
       +
       +
       +
       +
       +if __name__ == "__main__":
       +    import simple_config
       +    config = simple_config.SimpleConfig({'verbose':True})
       +    network = Network(config)
       +    network.start()
       +
       +    while 1:
       +        time.sleep(1)
       +
       +
       +
   DIR diff --git a/lib/wallet.py b/lib/wallet.py
       t@@ -1343,10 +1343,11 @@ class Wallet:
                return True
        
        
       -    def start_threads(self, interface, blockchain):
       +    def start_threads(self, network):
                from verifier import TxVerifier
       -        self.interface = interface
       -        self.verifier = TxVerifier(interface, blockchain, self.storage)
       +        self.network = network
       +        self.interface = network.interface
       +        self.verifier = TxVerifier(self.interface, network.blockchain, self.storage)
                self.verifier.start()
                self.set_verifier(self.verifier)
                self.synchronizer = WalletSynchronizer(self)
       t@@ -1370,7 +1371,7 @@ class WalletSynchronizer(threading.Thread):
                wallet.synchronizer = self
                self.interface = self.wallet.interface
                self.interface.register_channel('synchronizer')
       -        self.wallet.interface.register_callback('connected', lambda: self.wallet.set_up_to_date(False))
       +        #self.wallet.network.register_callback('connected', lambda: self.wallet.set_up_to_date(False))
                self.was_updated = True
                self.running = False
                self.lock = threading.Lock()
   DIR diff --git a/setup.py b/setup.py
       t@@ -53,22 +53,23 @@ setup(name = "Electrum",
            package_dir = {'electrum': 'lib', 'electrum_gui': 'gui', 'electrum_plugins':'plugins'},
            scripts= ['electrum'],
            data_files = data_files,
       -    py_modules = ['electrum.version',
       -                  'electrum.wallet',
       -                  'electrum.wallet_bitkey',
       -                  'electrum.wallet_factory',
       -                  'electrum.interface',
       +    py_modules = ['electrum.account',
       +                  'electrum.bitcoin',
                          'electrum.blockchain',
                          'electrum.commands',
       +                  'electrum.interface',
                          'electrum.mnemonic',
       +                  'electrum.msqr',
       +                  'electrum.network',
                          'electrum.simple_config',
                          'electrum.socks',
       -                  'electrum.msqr',
       -                  'electrum.util',
       -                  'electrum.account',
       -                  'electrum.bitcoin',
                          'electrum.transaction',
       +                  'electrum.util',
       +                  'electrum.version',
                          'electrum.verifier',
       +                  'electrum.wallet',
       +                  'electrum.wallet_bitkey',
       +                  'electrum.wallet_factory',
                          'electrum_gui.gui_gtk',
                          'electrum_gui.qt_console',
                          'electrum_gui.gui_classic',