URI: 
       tselect the longest blockchain from several servers - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 2a6e8927ddc3e60cf18e01da395031efae032de7
   DIR parent 48efc62b2d5c30d296ea228bc91b62afbade8f90
  HTML Author: thomasv <thomasv@gitorious>
       Date:   Mon,  2 Sep 2013 17:49:12 +0200
       
       select the longest blockchain from several servers
       
       Diffstat:
         M lib/blockchain.py                   |     243 +++++++++++++++++++++----------
         M lib/interface.py                    |       3 ++-
         M lib/verifier.py                     |      12 +++++++++++-
       
       3 files changed, 176 insertions(+), 82 deletions(-)
       ---
   DIR diff --git a/lib/blockchain.py b/lib/blockchain.py
       t@@ -29,79 +29,71 @@ class BlockchainVerifier(threading.Thread):
                threading.Thread.__init__(self)
                self.daemon = True
                self.config = config
       -        self.interface = interface
       -        self.interface.register_channel('verifier')
                self.lock = threading.Lock()
       -        self.pending_headers = [] # headers that have not been verified
                self.height = 0
                self.local_height = 0
                self.running = False
                self.headers_url = 'http://headers.electrum.org/blockchain_headers'
       +        self.interface = interface
       +        interface.register_channel('verifier')
       +        self.set_local_height()
       +
       +
       +
       +    def start_interfaces(self):
       +        import interface
       +        servers = interface.DEFAULT_SERVERS
       +        servers = interface.filter_protocol(servers,'s')
       +        print_error("using %d servers"% len(servers))
       +        self.interfaces = map ( lambda server: interface.Interface({'server':server} ), servers )
       +
       +        for i in self.interfaces:
       +            i.start()
       +            # subscribe to block headers
       +            i.register_channel('verifier')
       +            i.register_channel('get_header')
       +            i.send([ ('blockchain.headers.subscribe',[])], 'verifier')
       +            # note: each interface should send its results directly to a queue, instead of channels
       +            # pass the queue to the interface, so that several can share the same queue
       +
       +
       +    def get_new_response(self):
       +        # listen to interfaces, forward to verifier using the queue
       +        while 1:
       +            for i in self.interfaces:
       +                try:
       +                    r = i.get_response('verifier',timeout=0)
       +                except Queue.Empty:
       +                    continue
       +
       +                result = r.get('result')
       +                if result:
       +                    return (i,result)
       +
       +            time.sleep(1)
       +
       +
        
        
            def stop(self):
                with self.lock: self.running = False
       -        self.interface.poke('verifier')
       +        #self.interface.poke('verifier')
        
            def is_running(self):
                with self.lock: return self.running
        
       -    def run(self):
        
       -        self.init_headers_file()
       -        self.set_local_height()
       -
       -        with self.lock:
       -            self.running = True
       -        requested_chunks = []
       -        requested_headers = []
       -        all_chunks = False
       -        
       -        # subscribe to block headers
       -        self.interface.send([ ('blockchain.headers.subscribe',[])], 'verifier')
       -
       -        while self.is_running():
       -            # request missing chunks
       -            if not all_chunks and self.height and not requested_chunks:
       -
       -                if self.local_height + 50 < self.height:
       -                    min_index = (self.local_height + 1)/2016
       -                    max_index = (self.height + 1)/2016
       -                    for i in range(min_index, max_index + 1):
       -                        print_error( "requesting chunk", i )
       -                        self.interface.send([ ('blockchain.block.get_chunk',[i])], 'verifier')
       -                        requested_chunks.append(i)
       -                        break
       -                else:
       -                    all_chunks = True
       -                    print_error("downloaded all chunks")
       -
       -
       -            # process pending headers
       -            if self.pending_headers and all_chunks:
       -                done = []
       -                for header in self.pending_headers:
       -                    if self.verify_header(header):
       -                        done.append(header)
       -                    else:
       -                        # request previous header
       -                        i = header.get('block_height') - 1
       -                        if i not in requested_headers:
       -                            print_error("requesting header %d"%i)
       -                            self.interface.send([ ('blockchain.block.get_header',[i])], 'verifier')
       -                            requested_headers.append(i)
       -                        # no point continuing
       -                        break
       -                if done:
       -                    self.interface.trigger_callback('updated')
       -                    for header in done: 
       -                        self.pending_headers.remove(header)
       +    def request_header(self, i, h):
       +        print_error("requesting header %d from %s"%(h, i.server))
       +        i.send([ ('blockchain.block.get_header',[h])], 'get_header')
        
       +    def retrieve_header(self, i):
       +        while True:
                    try:
       -                r = self.interface.get_response('verifier',timeout=1)
       +                r = i.get_response('get_header',timeout=1)
                    except Queue.Empty:
       +                print_error('timeout')
                        continue
       -            if not r: continue
        
                    if r.get('error'):
                        print_error('Verifier received an error:', r)
       t@@ -112,23 +104,66 @@ class BlockchainVerifier(threading.Thread):
                    params = r['params']
                    result = r['result']
        
       -            if method == 'blockchain.block.get_chunk':
       -                index = params[0]
       -                self.verify_chunk(index, result)
       -                requested_chunks.remove(index)
       +            if method == 'blockchain.block.get_header':
       +                return result
       +                
        
       -            elif method in ['blockchain.headers.subscribe', 'blockchain.block.get_header']:
       +    def get_chain(self, interface, final_header):
        
       -                self.pending_headers.append(result)
       -                if method == 'blockchain.block.get_header':
       -                    requested_headers.remove(result.get('block_height'))
       -                else:
       -                    self.height = result.get('block_height')
       -                    ## fixme # self.interface.poke('synchronizer')
       -                
       -                self.pending_headers.sort(key=lambda x: x.get('block_height'))
       -                # print "pending headers", map(lambda x: x.get('block_height'), self.pending_headers)
       +        header = final_header
       +        chain = [ final_header ]
       +        requested_header = False
       +        
       +        while self.is_running():
       +
       +            if requested_header:
       +                header = self.retrieve_header(interface)
       +                if not header: return
       +                chain = [ header ] + chain
       +                requested_header = False
       +
       +            height = header.get('block_height')
       +            previous_header = self.read_header(height -1)
       +            if not previous_header:
       +                self.request_header(interface, height - 1)
       +                requested_header = True
       +                continue
       +
       +            # verify that it connects to my chain
       +            prev_hash = self.hash_header(previous_header)
       +            if prev_hash != header.get('prev_block_hash'):
       +                print_error("reorg")
       +                self.request_header(interface, height - 1)
       +                requested_header = True
       +                continue
       +
       +            else:
       +                # the chain is complete
       +                return chain
       +                    
       +            
       +    def verify_chain(self, chain):
       +
       +        first_header = chain[0]
       +        prev_header = self.read_header(first_header.get('block_height') -1)
       +        
       +        for header in chain:
        
       +            height = header.get('block_height')
       +
       +            prev_hash = self.hash_header(prev_header)
       +            bits, target = self.get_target(height/2016)
       +            _hash = self.hash_header(header)
       +            try:
       +                assert prev_hash == header.get('prev_block_hash')
       +                assert bits == header.get('bits')
       +                assert eval('0x'+_hash) < target
       +            except:
       +                return False
       +
       +            prev_header = header
       +
       +        return True
        
        
        
       t@@ -184,17 +219,8 @@ class BlockchainVerifier(threading.Thread):
                except:
                    # this can be caused by a reorg.
                    print_error("verify header failed"+ repr(header))
       -            # undo verifications
       -            with self.lock:
       -                items = self.verified_tx.items()[:]
       -            for tx_hash, item in items:
       -                tx_height, timestamp, pos = item
       -                if tx_height >= height:
       -                    print_error("redoing", tx_hash)
       -                    with self.lock:
       -                        self.verified_tx.pop(tx_hash)
       -                        if tx_hash in self.merkle_roots:
       -                            self.merkle_roots.pop(tx_hash)
       +            verifier.undo_verifications()
       +
                    # return False to request previous header.
                    return False
        
       t@@ -272,6 +298,7 @@ class BlockchainVerifier(threading.Thread):
                    h = os.path.getsize(name)/80 - 1
                    if self.local_height != h:
                        self.local_height = h
       +                self.height = self.local_height
        
        
            def read_header(self, block_height):
       t@@ -327,3 +354,59 @@ class BlockchainVerifier(threading.Thread):
        
        
        
       +
       +    def run(self):
       +        self.start_interfaces()
       +        
       +        self.init_headers_file()
       +        self.set_local_height()
       +        print_error( "blocks:", self.local_height )
       +
       +        with self.lock:
       +            self.running = True
       +
       +        while self.is_running():
       +
       +            i, header = self.get_new_response()
       +            
       +            height = header.get('block_height')
       +
       +            if height > self.local_height:
       +                # get missing parts from interface (until it connects to my chain)
       +                chain = self.get_chain( i, header )
       +
       +                # skip that server if the result is not consistent
       +                if not chain: continue
       +                
       +                # verify the chain
       +                if self.verify_chain( chain ):
       +                    print_error("height:", height, i.server)
       +                    for header in chain:
       +                        self.save_header(header)
       +                        self.height = height
       +                else:
       +                    print_error("error", i.server)
       +                    # todo: dismiss that server
       +
       +    
       +
       +
       +
       +if __name__ == "__main__":
       +    import interface, simple_config
       +    
       +    config = simple_config.SimpleConfig({'verbose':True})
       +
       +    i0 = interface.Interface()
       +    i0.start()
       +
       +    bv = BlockchainVerifier(i0, config)
       +    bv.start()
       +
       +
       +    # listen to interfaces, forward to verifier using the queue
       +    while 1:
       +        time.sleep(1)
       +
       +
       +
   DIR diff --git a/lib/interface.py b/lib/interface.py
       t@@ -332,7 +332,8 @@ class Interface(threading.Thread):
                try:
                    s.connect(( self.host.encode('ascii'), int(self.port)))
                except:
       -            traceback.print_exc(file=sys.stdout)
       +            #traceback.print_exc(file=sys.stdout)
       +            print_error("failed to connect", host, port)
                    self.is_connected = False
                    self.s = None
                    return
   DIR diff --git a/lib/verifier.py b/lib/verifier.py
       t@@ -155,4 +155,14 @@ class TxVerifier(threading.Thread):
        
        
        
       -
       +    def undo_verifications(self, height):
       +        with self.lock:
       +            items = self.verified_tx.items()[:]
       +        for tx_hash, item in items:
       +            tx_height, timestamp, pos = item
       +            if tx_height >= height:
       +                print_error("redoing", tx_hash)
       +                with self.lock:
       +                    self.verified_tx.pop(tx_hash)
       +                    if tx_hash in self.merkle_roots:
       +                        self.merkle_roots.pop(tx_hash)