tseparate blockchain and network - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 328315f94baeccaa646c713a5626960709879c0a DIR parent ac26abfed3e9f4d36b4419c90ca9f48aa285c4ee HTML Author: ThomasV <thomasv@gitorious> Date: Sun, 8 Sep 2013 17:23:01 +0200 separate blockchain and network Diffstat: M electrum | 14 +++++--------- M gui/gui_classic.py | 13 ++++++------- M lib/__init__.py | 2 +- M lib/blockchain.py | 275 +++++++++++++------------------ M lib/interface.py | 78 +++++++++++-------------------- A lib/network.py | 121 +++++++++++++++++++++++++++++++ M lib/wallet.py | 9 +++++---- M setup.py | 19 ++++++++++--------- 8 files changed, 288 insertions(+), 243 deletions(-) --- DIR diff --git a/electrum b/electrum t@@ -128,18 +128,14 @@ if __name__ == '__main__': #sys.exit("Error: Unknown GUI: " + gui_name ) # network interface - interface = Interface(config, True) - interface.start(wait = False) - interface.send([('server.peers.subscribe',[])]) + network = Network(config) + network.start() + #interface.send([('server.peers.subscribe',[])]) - blockchain = BlockchainVerifier(interface, config) - blockchain.start() - - gui = gui.ElectrumGui(config, interface, blockchain) + gui = gui.ElectrumGui(config, network) gui.main(url) - interface.stop() - blockchain.stop() + network.stop() # we use daemon threads, their termination is enforced. # this sleep command gives them time to terminate cleanly. DIR diff --git a/gui/gui_classic.py b/gui/gui_classic.py t@@ -568,8 +568,6 @@ class ElectrumWindow(QMainWindow): self.config.set_key('io_dir', os.path.dirname(fileName), True) return fileName - - def close(self): QMainWindow.close(self) self.run_hook('close_main_window') t@@ -1367,7 +1365,7 @@ class ElectrumWindow(QMainWindow): console.history = self.config.get("console-history",[]) console.history_index = len(console.history) - console.updateNamespace({'wallet' : self.wallet, 'interface' : self.wallet.interface, 'gui':self}) + console.updateNamespace({'wallet' : self.wallet, 'network' : self.wallet.network, 'gui':self}) console.updateNamespace({'util' : util, 'bitcoin':bitcoin}) c = commands.Commands(self.wallet, self.wallet.interface, lambda: self.console.set_json(True)) t@@ -2258,10 +2256,11 @@ class OpenFileEventFilter(QObject): class ElectrumGui: - def __init__(self, config, interface, blockchain, app=None): - self.interface = interface + def __init__(self, config, network, app=None): + self.network = network + #self.interface = interface self.config = config - self.blockchain = blockchain + #self.blockchain = network.blockchain self.windows = [] self.efilter = OpenFileEventFilter(self.windows) if app is None: t@@ -2281,7 +2280,7 @@ class ElectrumGui: else: wallet = Wallet(storage) - wallet.start_threads(self.interface, self.blockchain) + wallet.start_threads(self.network) s = Timer() s.start() DIR diff --git a/lib/__init__.py b/lib/__init__.py t@@ -3,7 +3,7 @@ from util import format_satoshis, print_msg, print_json, print_error, set_verbos from wallet import WalletSynchronizer, WalletStorage from wallet_factory import WalletFactory as Wallet from verifier import TxVerifier -from blockchain import BlockchainVerifier +from network import Network from interface import Interface, pick_random_server, DEFAULT_SERVERS from simple_config import SimpleConfig import bitcoin DIR diff --git a/lib/blockchain.py b/lib/blockchain.py t@@ -22,10 +22,9 @@ from util import user_dir, appdata_dir, print_error from bitcoin import * -class BlockchainVerifier(threading.Thread): - """ Simple Payment Verification """ +class Blockchain(threading.Thread): - def __init__(self, interface, config): + def __init__(self, config): threading.Thread.__init__(self) self.daemon = True self.config = config t@@ -34,112 +33,62 @@ class BlockchainVerifier(threading.Thread): self.local_height = 0 self.running = False self.headers_url = 'http://headers.electrum.org/blockchain_headers' - self.interface = interface - interface.register_channel('verifier') self.set_local_height() + self.queue = Queue.Queue() - - - def start_interfaces(self): - import interface - servers = interface.DEFAULT_SERVERS - servers = interface.filter_protocol(servers,'s') - print_error("using %d servers"% len(servers)) - self.interfaces = map ( lambda server: interface.Interface({'server':server} ), servers ) - - for i in self.interfaces: - i.start() - # subscribe to block headers - i.register_channel('verifier') - i.register_channel('get_header') - i.send([ ('blockchain.headers.subscribe',[])], 'verifier') - # note: each interface should send its results directly to a queue, instead of channels - # pass the queue to the interface, so that several can share the same queue - - - def get_new_response(self): - # listen to interfaces, forward to verifier using the queue - while self.is_running(): - for i in self.interfaces: - try: - r = i.get_response('verifier',timeout=0) - except Queue.Empty: - continue - - result = r.get('result') - if result: - return (i,result) - - time.sleep(1) - - - - + def stop(self): with self.lock: self.running = False - #self.interface.poke('verifier') + def is_running(self): with self.lock: return self.running - def request_header(self, i, h): - print_error("requesting header %d from %s"%(h, i.server)) - i.send([ ('blockchain.block.get_header',[h])], 'get_header') + def run(self): + self.init_headers_file() + self.set_local_height() + print_error( "blocks:", self.local_height ) + + with self.lock: + self.running = True + + while self.is_running(): - def retrieve_header(self, i): - while True: try: - r = i.get_response('get_header',timeout=1) + i, result = self.queue.get() except Queue.Empty: - print_error('timeout') continue - if r.get('error'): - print_error('Verifier received an error:', r) - continue - - # 3. handle response - method = r['method'] - params = r['params'] - result = r['result'] + header= result.get('result') + #print_error( i.server, header ) + height = header.get('block_height') - if method == 'blockchain.block.get_header': - return result - + if height > self.local_height + 50: + self.get_chunks(i, header, height) + i.trigger_callback('updated') - def get_chain(self, interface, final_header): + if height > self.local_height: + # get missing parts from interface (until it connects to my chain) + chain = self.get_chain( i, header ) - header = final_header - chain = [ final_header ] - requested_header = False - - while self.is_running(): + # skip that server if the result is not consistent + if not chain: continue + + # verify the chain + if self.verify_chain( chain ): + print_error("height:", height, i.server) + for header in chain: + self.save_header(header) + self.height = height + else: + print_error("error", i.server) + # todo: dismiss that server - if requested_header: - header = self.retrieve_header(interface) - if not header: return - chain = [ header ] + chain - requested_header = False + i.trigger_callback('updated') - height = header.get('block_height') - previous_header = self.read_header(height -1) - if not previous_header: - self.request_header(interface, height - 1) - requested_header = True - continue - # verify that it connects to my chain - prev_hash = self.hash_header(previous_header) - if prev_hash != header.get('prev_block_hash'): - print_error("reorg") - self.request_header(interface, height - 1) - requested_header = True - continue - else: - # the chain is complete - return chain def verify_chain(self, chain): t@@ -166,37 +115,6 @@ class BlockchainVerifier(threading.Thread): return True - def get_chunks(self, i, header, height): - requested_chunks = [] - min_index = (self.local_height + 1)/2016 - max_index = (height + 1)/2016 - for n in range(min_index, max_index + 1): - print_error( "requesting chunk", n ) - i.send([ ('blockchain.block.get_chunk',[n])], 'get_header') - requested_chunks.append(n) - break - - while requested_chunks: - try: - r = i.get_response('get_header',timeout=1) - except Queue.Empty: - continue - if not r: continue - - if r.get('error'): - print_error('Verifier received an error:', r) - continue - - # 3. handle response - method = r['method'] - params = r['params'] - result = r['result'] - - if method == 'blockchain.block.get_chunk': - index = params[0] - self.verify_chunk(index, result) - requested_chunks.remove(index) - def verify_chunk(self, index, hexdata): data = hexdata.decode('hex') t@@ -259,8 +177,6 @@ class BlockchainVerifier(threading.Thread): return True - - def header_to_string(self, res): s = int_to_hex(res.get('version'),4) \ + rev_hex(res.get('prev_block_hash')) \ t@@ -383,65 +299,100 @@ class BlockchainVerifier(threading.Thread): return new_bits, new_target + def request_header(self, i, h): + print_error("requesting header %d from %s"%(h, i.server)) + i.send([ ('blockchain.block.get_header',[h])], 'get_header') + def retrieve_header(self, i): + while True: + try: + r = i.get_response('get_header',timeout=1) + except Queue.Empty: + print_error('timeout') + continue - def run(self): - self.start_interfaces() - - self.init_headers_file() - self.set_local_height() - print_error( "blocks:", self.local_height ) + if r.get('error'): + print_error('Verifier received an error:', r) + continue - with self.lock: - self.running = True + # 3. handle response + method = r['method'] + params = r['params'] + result = r['result'] + + if method == 'blockchain.block.get_header': + return result + + + + def get_chain(self, interface, final_header): + header = final_header + chain = [ final_header ] + requested_header = False + while self.is_running(): - i, header = self.get_new_response() - + if requested_header: + header = self.retrieve_header(interface) + if not header: return + chain = [ header ] + chain + requested_header = False + height = header.get('block_height') + previous_header = self.read_header(height -1) + if not previous_header: + self.request_header(interface, height - 1) + requested_header = True + continue - if height > self.local_height + 50: - self.get_chunks(i, header, height) - self.interface.trigger_callback('updated') + # verify that it connects to my chain + prev_hash = self.hash_header(previous_header) + if prev_hash != header.get('prev_block_hash'): + print_error("reorg") + self.request_header(interface, height - 1) + requested_header = True + continue - if height > self.local_height: - # get missing parts from interface (until it connects to my chain) - chain = self.get_chain( i, header ) + else: + # the chain is complete + return chain - # skip that server if the result is not consistent - if not chain: continue - - # verify the chain - if self.verify_chain( chain ): - print_error("height:", height, i.server) - for header in chain: - self.save_header(header) - self.height = height - else: - print_error("error", i.server) - # todo: dismiss that server - self.interface.trigger_callback('updated') - + def get_chunks(self, i, header, height): + requested_chunks = [] + min_index = (self.local_height + 1)/2016 + max_index = (height + 1)/2016 + for n in range(min_index, max_index + 1): + print_error( "requesting chunk", n ) + i.send([ ('blockchain.block.get_chunk',[n])], 'get_header') + requested_chunks.append(n) + break + while requested_chunks: + try: + r = i.get_response('get_header',timeout=1) + except Queue.Empty: + continue + if not r: continue + if r.get('error'): + print_error('Verifier received an error:', r) + continue -if __name__ == "__main__": - import interface, simple_config - - config = simple_config.SimpleConfig({'verbose':True}) + # 3. handle response + method = r['method'] + params = r['params'] + result = r['result'] + + if method == 'blockchain.block.get_chunk': + index = params[0] + self.verify_chunk(index, result) + requested_chunks.remove(index) - i0 = interface.Interface() - i0.start() - bv = BlockchainVerifier(i0, config) - bv.start() - # listen to interfaces, forward to verifier using the queue - while 1: - time.sleep(1) DIR diff --git a/lib/interface.py b/lib/interface.py t@@ -66,18 +66,21 @@ def pick_random_server(): class Interface(threading.Thread): + def register_callback(self, event, callback): with self.lock: if not self.callbacks.get(event): self.callbacks[event] = [] self.callbacks[event].append(callback) + def trigger_callback(self, event): with self.lock: callbacks = self.callbacks.get(event,[])[:] if callbacks: [callback() for callback in callbacks] + def init_server(self, host, port, proxy=None, use_ssl=True): self.host = host self.port = port t@@ -188,16 +191,19 @@ class Interface(threading.Thread): return response_queue = self.responses[channel] - response_queue.put({'method':method, 'params':params, 'result':result, 'id':msg_id}) + response_queue.put((self, {'method':method, 'params':params, 'result':result, 'id':msg_id})) def get_response(self, channel='default', block=True, timeout=10000000000): - return self.responses[channel].get(block, timeout) + i, r = self.responses[channel].get(block, timeout) + return r - def register_channel(self, channel): + def register_channel(self, channel, queue=None): + if queue is None: + queue = Queue.Queue() with self.lock: - self.responses[channel] = Queue.Queue() + self.responses[channel] = queue def poke(self, channel): self.responses[channel].put(None) t@@ -418,7 +424,7 @@ class Interface(threading.Thread): - def __init__(self, config=None, loop=False): + def __init__(self, config=None): self.server = random.choice(filter_protocol(DEFAULT_SERVERS, 's')) self.proxy = None t@@ -428,7 +434,6 @@ class Interface(threading.Thread): threading.Thread.__init__(self) self.daemon = True - self.loop = loop self.config = config self.connect_event = threading.Event() t@@ -457,32 +462,11 @@ class Interface(threading.Thread): if self.config.get('auto_cycle') is None: self.config.set_key('auto_cycle', True, False) - if not self.is_connected and self.config.get('auto_cycle'): - print_msg("Using random server...") - servers = filter_protocol(DEFAULT_SERVERS, 's') - while servers: - server = random.choice( servers ) - servers.remove(server) - print server - self.config.set_key('server', server, False) - self.init_with_server(self.config) - if self.is_connected: break - - if not self.is_connected: - print 'no server available' - self.connect_event.set() # to finish start - self.server = 'ecdsa.org:50001:t' - self.proxy = None - return + if not self.is_connected: + self.connect_event.set() + return self.connect_event.set() - if self.is_connected: - self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])]) - self.send([('server.banner',[])]) - self.trigger_callback('connected') - else: - self.trigger_callback('notconnected') - #print_error("Failed to connect " + self.connection_msg) def init_with_server(self, config): t@@ -532,12 +516,6 @@ class Interface(threading.Thread): return out - def resend_subscriptions(self): - for channel, messages in self.subscriptions.items(): - if messages: - self.send(messages, channel) - - def parse_proxy_options(self, s): if type(s) == type({}): return s # fixme: type should be fixed t@@ -625,26 +603,24 @@ class Interface(threading.Thread): return out - def start(self, wait=True): + def start(self, queue): + self.queue = queue threading.Thread.start(self) - if wait: - # wait until connection is established - self.connect_event.wait() - if not self.is_connected: - return False - return True + + def run(self): - while True: - self.init_interface() - if self.is_connected: - self.resend_subscriptions() - self.run_tcp() if self.protocol in 'st' else self.run_http() + self.init_interface() + if self.is_connected: + self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])]) + self.change_status() + self.run_tcp() if self.protocol in 'st' else self.run_http() + self.change_status() + + def change_status(self): + self.queue.put(self) - self.trigger_callback('disconnected') - if not self.loop: break - time.sleep(5) DIR diff --git a/lib/network.py b/lib/network.py t@@ -0,0 +1,121 @@ +import interface +from blockchain import Blockchain +import threading, time, Queue, os, sys, shutil +from util import user_dir, appdata_dir, print_error +from bitcoin import * + + +class Network(threading.Thread): + + def __init__(self, config): + threading.Thread.__init__(self) + self.daemon = True + self.config = config + self.lock = threading.Lock() + self.blockchain = Blockchain(config) + self.interfaces = {} + self.queue = Queue.Queue() + self.default_server = self.config.get('server') + self.servers_list = interface.filter_protocol(interface.DEFAULT_SERVERS,'s') + + + + def start_interfaces(self): + + for server in self.servers_list: + self.interfaces[server] = interface.Interface({'server':server}) + + for i in self.interfaces.values(): + i.start(self.queue) + + if self.default_server: + self.interface = interface.Interface({'server':self.default_server}) + self.interface.start(self.queue) + else: + self.interface = self.interfaces[0] + + + + + + + def run(self): + self.blockchain.start() + self.start_interfaces() + + with self.lock: + self.running = True + + while self.is_running(): + i = self.queue.get() + + if i.is_connected: + i.register_channel('verifier', self.blockchain.queue) + i.register_channel('get_header') + i.send([ ('blockchain.headers.subscribe',[])], 'verifier') + if i == self.interface: + i.send([('server.banner',[])]) + i.send([('server.peers.subscribe',[])]) + else: + self.interfaces.pop(i.server) + if i == self.interface: + if self.default_server is None: + print_msg("Using random server...") + server = random.choice( self.servers_list ) + self.interface = interface.Interface({'server':self.default_server}) + else: + #i.trigger_callback('disconnected') + pass + + def on_peers(self, resut): + pass + + def on_banner(self, result): + pass + + def stop(self): + with self.lock: self.running = False + + def is_running(self): + with self.lock: return self.running + + + def resend_subscriptions(self): + for channel, messages in self.subscriptions.items(): + if messages: + self.send(messages, channel) + + + def auto_cycle(self): + if not self.is_connected and self.config.get('auto_cycle'): + print_msg("Using random server...") + servers = filter_protocol(DEFAULT_SERVERS, 's') + while servers: + server = random.choice( servers ) + servers.remove(server) + print server + self.config.set_key('server', server, False) + self.init_with_server(self.config) + if self.is_connected: break + + if not self.is_connected: + print 'no server available' + self.connect_event.set() # to finish start + self.server = 'ecdsa.org:50001:t' + self.proxy = None + return + + + + +if __name__ == "__main__": + import simple_config + config = simple_config.SimpleConfig({'verbose':True}) + network = Network(config) + network.start() + + while 1: + time.sleep(1) + + + DIR diff --git a/lib/wallet.py b/lib/wallet.py t@@ -1343,10 +1343,11 @@ class Wallet: return True - def start_threads(self, interface, blockchain): + def start_threads(self, network): from verifier import TxVerifier - self.interface = interface - self.verifier = TxVerifier(interface, blockchain, self.storage) + self.network = network + self.interface = network.interface + self.verifier = TxVerifier(self.interface, network.blockchain, self.storage) self.verifier.start() self.set_verifier(self.verifier) self.synchronizer = WalletSynchronizer(self) t@@ -1370,7 +1371,7 @@ class WalletSynchronizer(threading.Thread): wallet.synchronizer = self self.interface = self.wallet.interface self.interface.register_channel('synchronizer') - self.wallet.interface.register_callback('connected', lambda: self.wallet.set_up_to_date(False)) + #self.wallet.network.register_callback('connected', lambda: self.wallet.set_up_to_date(False)) self.was_updated = True self.running = False self.lock = threading.Lock() DIR diff --git a/setup.py b/setup.py t@@ -53,22 +53,23 @@ setup(name = "Electrum", package_dir = {'electrum': 'lib', 'electrum_gui': 'gui', 'electrum_plugins':'plugins'}, scripts= ['electrum'], data_files = data_files, - py_modules = ['electrum.version', - 'electrum.wallet', - 'electrum.wallet_bitkey', - 'electrum.wallet_factory', - 'electrum.interface', + py_modules = ['electrum.account', + 'electrum.bitcoin', 'electrum.blockchain', 'electrum.commands', + 'electrum.interface', 'electrum.mnemonic', + 'electrum.msqr', + 'electrum.network', 'electrum.simple_config', 'electrum.socks', - 'electrum.msqr', - 'electrum.util', - 'electrum.account', - 'electrum.bitcoin', 'electrum.transaction', + 'electrum.util', + 'electrum.version', 'electrum.verifier', + 'electrum.wallet', + 'electrum.wallet_bitkey', + 'electrum.wallet_factory', 'electrum_gui.gui_gtk', 'electrum_gui.qt_console', 'electrum_gui.gui_classic',