tsimplify interface: use callbacks - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 907dca6eb9c6e012ac0cb6de32f66f364e43f443 DIR parent b363d81fc1a4c74c678a5d02898ac5c8b239774f HTML Author: ThomasV <thomasv@gitorious> Date: Thu, 12 Sep 2013 08:41:27 +0200 simplify interface: use callbacks Diffstat: M gui/gui_classic/main_window.py | 4 ++-- M gui/gui_classic/network_dialog.py | 4 ++-- M lib/__init__.py | 4 ++-- M lib/blockchain.py | 35 ++++++++++++++++++------------- M lib/interface.py | 178 ++++++------------------------- M lib/network.py | 97 ++++++++++++++++++++++++++----- M lib/verifier.py | 8 +++++--- M lib/wallet.py | 47 ++++++++++++++++--------------- 8 files changed, 169 insertions(+), 208 deletions(-) --- DIR diff --git a/gui/gui_classic/main_window.py b/gui/gui_classic/main_window.py t@@ -206,7 +206,7 @@ class ElectrumWindow(QMainWindow): QShortcut(QKeySequence("Ctrl+PgDown"), self, lambda: tabs.setCurrentIndex( (tabs.currentIndex() + 1 )%tabs.count() )) self.connect(self, QtCore.SIGNAL('update_status'), self.update_status) - self.connect(self, QtCore.SIGNAL('banner_signal'), lambda: self.console.showMessage(self.wallet.interface.banner) ) + self.connect(self, QtCore.SIGNAL('banner_signal'), lambda: self.console.showMessage(self.network.banner) ) self.connect(self, QtCore.SIGNAL('transaction_signal'), lambda: self.notify_transactions() ) self.history_list.setFocus(True) t@@ -240,7 +240,7 @@ class ElectrumWindow(QMainWindow): self.setWindowTitle( title ) self.update_wallet() # set initial message - self.console.showMessage(self.wallet.interface.banner) + self.console.showMessage(self.network.banner) # Once GUI has been initialized check if we want to announce something since the callback has been called before the GUI was initialized self.notify_transactions() DIR diff --git a/gui/gui_classic/network_dialog.py b/gui/gui_classic/network_dialog.py t@@ -23,7 +23,7 @@ import os.path, json, ast, traceback from PyQt4.QtGui import * from PyQt4.QtCore import * -from electrum.interface import DEFAULT_SERVERS, DEFAULT_PORTS +from electrum import DEFAULT_SERVERS, DEFAULT_PORTS from qt_util import * t@@ -62,7 +62,7 @@ class NetworkDialog(QDialog): status = _("Please choose a server.") + "\n" + _("Select 'Cancel' if you are offline.") server = interface.server - self.servers = interface.get_servers() + self.servers = network.get_servers() vbox = QVBoxLayout() DIR diff --git a/lib/__init__.py b/lib/__init__.py t@@ -3,8 +3,8 @@ from util import format_satoshis, print_msg, print_json, print_error, set_verbos from wallet import WalletSynchronizer, WalletStorage from wallet_factory import WalletFactory as Wallet from verifier import TxVerifier -from network import Network -from interface import Interface, pick_random_server, DEFAULT_SERVERS +from network import Network, DEFAULT_SERVERS, DEFAULT_PORTS +from interface import Interface from simple_config import SimpleConfig import bitcoin import account DIR diff --git a/lib/blockchain.py b/lib/blockchain.py t@@ -308,18 +308,23 @@ class Blockchain(threading.Thread): return new_bits, new_target - def request_header(self, i, h): + def request_header(self, i, h, queue): print_error("requesting header %d from %s"%(h, i.server)) - i.send([ ('blockchain.block.get_header',[h])], 'get_header') + i.send([ ('blockchain.block.get_header',[h])], lambda i,r: queue.put((i,r))) - def retrieve_header(self, i): + def retrieve_header(self, i, queue): while True: try: - r = i.get_response('get_header',timeout=1) + ir = queue.get(timeout=1) except Queue.Empty: print_error('timeout') continue + if not ir: + continue + + i, r = ir + if r.get('error'): print_error('Verifier received an error:', r) continue t@@ -339,11 +344,12 @@ class Blockchain(threading.Thread): header = final_header chain = [ final_header ] requested_header = False - + queue = Queue.Queue() + while self.is_running(): if requested_header: - header = self.retrieve_header(interface) + header = self.retrieve_header(interface, queue) if not header: return chain = [ header ] + chain requested_header = False t@@ -351,7 +357,7 @@ class Blockchain(threading.Thread): height = header.get('block_height') previous_header = self.read_header(height -1) if not previous_header: - self.request_header(interface, height - 1) + self.request_header(interface, height - 1, queue) requested_header = True continue t@@ -359,7 +365,7 @@ class Blockchain(threading.Thread): prev_hash = self.hash_header(previous_header) if prev_hash != header.get('prev_block_hash'): print_error("reorg") - self.request_header(interface, height - 1) + self.request_header(interface, height - 1, queue) requested_header = True continue t@@ -370,17 +376,18 @@ class Blockchain(threading.Thread): def get_chunks(self, i, header, height): requested_chunks = [] + queue = Queue.Queue() min_index = (self.local_height + 1)/2016 max_index = (height + 1)/2016 for n in range(min_index, max_index + 1): print_error( "requesting chunk", n ) - i.send([ ('blockchain.block.get_chunk',[n])], 'get_header') + i.send([ ('blockchain.block.get_chunk',[n])], lambda i,r:queue.put(r)) requested_chunks.append(n) break while requested_chunks: try: - r = i.get_response('get_header',timeout=1) + r = queue.get(timeout=1) except Queue.Empty: continue if not r: continue t@@ -390,14 +397,12 @@ class Blockchain(threading.Thread): continue # 3. handle response - method = r['method'] params = r['params'] result = r['result'] - if method == 'blockchain.block.get_chunk': - index = params[0] - self.verify_chunk(index, result) - requested_chunks.remove(index) + index = params[0] + self.verify_chunk(index, result) + requested_chunks.remove(index) DIR diff --git a/lib/interface.py b/lib/interface.py t@@ -25,36 +25,6 @@ from util import print_error, print_msg DEFAULT_TIMEOUT = 5 -DEFAULT_PORTS = {'t':'50001', 's':'50002', 'h':'8081', 'g':'8082'} - -DEFAULT_SERVERS = { - 'the9ull.homelinux.org': {'h': '8082', 't': '50001'}, - 'electrum.coinwallet.me': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.dynaloop.net': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.koh.ms': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.novit.ro': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.stepkrav.pw': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'ecdsa.org': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.mooo.com': {'h': '8081', 't': '50001'}, - 'electrum.bitcoins.sk': {'h': '8081', 's': '50002', 't': '50001', 'g': '8'}, - 'electrum.no-ip.org': {'h': '80', 's': '50002', 't': '50001', 'g': '443'}, - 'electrum.drollette.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'btc.it-zone.org': {'h': '80', 's': '110', 't': '50001', 'g': '443'}, - 'electrum.yacoin.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.be': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'} -} - - - -def filter_protocol(servers, p): - l = [] - for k, protocols in servers.items(): - if p in protocols: - l.append( ':'.join([k, protocols[p], p]) ) - return l - - - proxy_modes = ['socks4', 'socks5', 'http'] t@@ -62,12 +32,9 @@ def pick_random_server(): return random.choice( filter_protocol(DEFAULT_SERVERS,'s') ) - - class Interface(threading.Thread): - def init_server(self, host, port, proxy=None, use_ssl=True): self.host = host self.port = port t@@ -78,43 +45,9 @@ class Interface(threading.Thread): #json self.message_id = 0 self.unanswered_requests = {} - #banner - self.banner = '' self.pending_transactions_for_notifications= [] - def parse_servers(self, result): - """ parse servers list into dict format""" - - servers = {} - for item in result: - host = item[1] - out = {} - version = None - pruning_level = '-' - if len(item) > 2: - for v in item[2]: - if re.match("[stgh]\d*", v): - protocol, port = v[0], v[1:] - if port == '': port = DEFAULT_PORTS[protocol] - out[protocol] = port - elif re.match("v(.?)+", v): - version = v[1:] - elif re.match("p\d*", v): - pruning_level = v[1:] - if pruning_level == '': pruning_level = '0' - try: - is_recent = float(version)>=float(PROTOCOL_VERSION) - except: - is_recent = False - - if out and is_recent: - out['pruning'] = pruning_level - servers[host] = out - - return servers - - def queue_json_response(self, c): # uncomment to debug t@@ -127,30 +60,18 @@ class Interface(threading.Thread): print_error("received error:", c) if msg_id is not None: with self.lock: - method, params, channel = self.unanswered_requests.pop(msg_id) - response_queue = self.responses[channel] - response_queue.put((self,{'method':method, 'params':params, 'error':error, 'id':msg_id})) + method, params, callback = self.unanswered_requests.pop(msg_id) + callback(self,{'method':method, 'params':params, 'error':error, 'id':msg_id}) return if msg_id is not None: with self.lock: - method, params, channel = self.unanswered_requests.pop(msg_id) + method, params, callback = self.unanswered_requests.pop(msg_id) result = c.get('result') - if method == 'server.version': - self.server_version = result - - elif method == 'server.banner': - self.banner = result - self.network.trigger_callback('banner') - - elif method == 'server.peers.subscribe': - self.servers = self.parse_servers(result) - self.network.trigger_callback('peers') - else: - # notification: find the channel(s) + # notification method = c.get('method') params = c.get('params') t@@ -170,31 +91,19 @@ class Interface(threading.Thread): with self.lock: for k,v in self.subscriptions.items(): if (method, params) in v: - channel = k + callback = k break else: print_error( "received unexpected notification", method, params) print_error( self.subscriptions ) return - - response_queue = self.responses[channel] - response_queue.put((self, {'method':method, 'params':params, 'result':result, 'id':msg_id})) - - def get_response(self, channel='default', block=True, timeout=10000000000): - ir = self.responses[channel].get(block, timeout) - if ir: - return ir[1] + callback(self, {'method':method, 'params':params, 'result':result, 'id':msg_id}) - def register_channel(self, channel, queue=None): - if queue is None: - queue = Queue.Queue() - with self.lock: - self.responses[channel] = queue - def poke(self, channel): - self.responses[channel].put(None) + def on_version(self, i, result): + self.server_version = result def init_http(self, host, port, proxy=None, use_ssl=True): t@@ -237,7 +146,7 @@ class Interface(threading.Thread): self.send([]) - def send_http(self, messages, channel='default'): + def send_http(self, messages, callback): import urllib2, json, time, cookielib print_error( "send_http", messages ) t@@ -257,7 +166,7 @@ class Interface(threading.Thread): method, params = m if type(params) != type([]): params = [params] data.append( { 'method':method, 'id':self.message_id, 'params':params } ) - self.unanswered_requests[self.message_id] = method, params, channel + self.unanswered_requests[self.message_id] = method, params, callback self.message_id += 1 if data: t@@ -359,7 +268,7 @@ class Interface(threading.Thread): if timeout: # ping the server with server.version, as a real ping does not exist yet - self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])]) + self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])], self.on_version) continue out += msg t@@ -381,14 +290,14 @@ class Interface(threading.Thread): self.is_connected = False - def send_tcp(self, messages, channel='default'): + def send_tcp(self, messages, callback): """return the ids of the requests that we sent""" out = '' ids = [] for m in messages: method, params = m request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } ) - self.unanswered_requests[self.message_id] = method, params, channel + self.unanswered_requests[self.message_id] = method, params, callback ids.append(self.message_id) # uncomment to debug # print "-->", request t@@ -413,7 +322,7 @@ class Interface(threading.Thread): def __init__(self, config=None): - self.server = random.choice(filter_protocol(DEFAULT_SERVERS, 's')) + #self.server = random.choice(filter_protocol(DEFAULT_SERVERS, 's')) self.proxy = None if config is None: t@@ -426,9 +335,6 @@ class Interface(threading.Thread): self.connect_event = threading.Event() self.subscriptions = {} - self.responses = {} - self.responses['default'] = Queue.Queue() - self.lock = threading.Lock() self.servers = {} # actual list from IRC t@@ -475,7 +381,7 @@ class Interface(threading.Thread): raise BaseException('Unknown protocol: %s'%protocol) - def send(self, messages, channel='default'): + def send(self, messages, callback): sub = [] for message in messages: t@@ -485,21 +391,21 @@ class Interface(threading.Thread): if sub: with self.lock: - if self.subscriptions.get(channel) is None: - self.subscriptions[channel] = [] + if self.subscriptions.get(callback) is None: + self.subscriptions[callback] = [] for message in sub: - if message not in self.subscriptions[channel]: - self.subscriptions[channel].append(message) + if message not in self.subscriptions[callback]: + self.subscriptions[callback].append(message) if not self.is_connected: return if self.protocol in 'st': with self.lock: - out = self.send_tcp(messages, channel) + out = self.send_tcp(messages, callback) else: # do not use lock, http is synchronous - out = self.send_http(messages, channel) + out = self.send_http(messages, callback) return out t@@ -525,6 +431,7 @@ class Interface(threading.Thread): def set_server(self, server, proxy=None): + "todo: remove this" # raise an error if the format isnt correct a,b,c = server.split(':') b = int(b) t@@ -540,46 +447,25 @@ class Interface(threading.Thread): self.is_connected = False # this exits the polling loop self.trigger_callback('disconnecting') # for actively disconnecting + def stop(self): if self.is_connected and self.protocol in 'st' and self.s: self.s.shutdown(socket.SHUT_RDWR) self.s.close() - def get_servers(self): - if not self.servers: - return DEFAULT_SERVERS - else: - return self.servers - - - def is_empty(self, channel): - q = self.responses.get(channel) - if q: - return q.empty() - else: - return True - - - def get_pending_requests(self, channel): - result = [] - with self.lock: - for k, v in self.unanswered_requests.items(): - a, b, c = v - if c == channel: result.append(k) - return result - - def is_up_to_date(self, channel): - return self.is_empty(channel) and not self.get_pending_requests(channel) + def is_up_to_date(self): + return self.unanswered_requests == {} def synchronous_get(self, requests, timeout=100000000): # todo: use generators, unanswered_requests should be a list of arrays... - ids = self.send(requests) + q = Queue.Queue() + ids = self.send(requests, lambda i,r: queue.put(r)) id2 = ids[:] res = {} while ids: - r = self.responses['default'].get(True, timeout) + r = queue.get(True, timeout) _id = r.get('id') if _id in ids: ids.remove(_id) t@@ -595,20 +481,16 @@ class Interface(threading.Thread): threading.Thread.start(self) - def run(self): self.init_interface() if self.is_connected: - self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])]) + self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])], self.on_version) self.change_status() self.run_tcp() if self.protocol in 'st' else self.run_http() self.change_status() + def change_status(self): #print "change status", self.server, self.is_connected self.queue.put(self) - - - - DIR diff --git a/lib/network.py b/lib/network.py t@@ -4,6 +4,35 @@ from bitcoin import * import interface from blockchain import Blockchain +DEFAULT_PORTS = {'t':'50001', 's':'50002', 'h':'8081', 'g':'8082'} + +DEFAULT_SERVERS = { + 'the9ull.homelinux.org': {'h': '8082', 't': '50001'}, + 'electrum.coinwallet.me': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, + 'electrum.dynaloop.net': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, + 'electrum.koh.ms': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, + 'electrum.novit.ro': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, + 'electrum.stepkrav.pw': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, + 'ecdsa.org': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, + 'electrum.mooo.com': {'h': '8081', 't': '50001'}, + 'electrum.bitcoins.sk': {'h': '8081', 's': '50002', 't': '50001', 'g': '8'}, + 'electrum.no-ip.org': {'h': '80', 's': '50002', 't': '50001', 'g': '443'}, + 'electrum.drollette.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, + 'btc.it-zone.org': {'h': '80', 's': '110', 't': '50001', 'g': '443'}, + 'electrum.yacoin.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, + 'electrum.be': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'} +} + + + +def filter_protocol(servers, p): + l = [] + for k, protocols in servers.items(): + if p in protocols: + l.append( ':'.join([k, protocols[p], p]) ) + return l + + class Network(threading.Thread): t@@ -16,8 +45,10 @@ class Network(threading.Thread): self.interfaces = {} self.queue = Queue.Queue() self.default_server = self.config.get('server') - self.servers_list = interface.filter_protocol(interface.DEFAULT_SERVERS,'s') + self.servers_list = filter_protocol(DEFAULT_SERVERS,'s') self.callbacks = {} + #banner + self.banner = '' def register_callback(self, event, callback): t@@ -45,11 +76,17 @@ class Network(threading.Thread): return server + def get_servers(self): + if not self.servers: + return DEFAULT_SERVERS + else: + return self.servers + + def start_interface(self, server): if server in self.interfaces.keys(): return i = interface.Interface({'server':server}) - i.network = self # fixme self.interfaces[server] = i i.start(self.queue) t@@ -97,13 +134,10 @@ class Network(threading.Thread): i = self.queue.get() if i.is_connected: - i.register_channel('verifier', self.blockchain.queue) - i.register_channel('get_header') - i.send([ ('blockchain.headers.subscribe',[])], 'verifier') - + i.send([ ('blockchain.headers.subscribe',[])], self.on_header) if i == self.interface: - i.send([('server.banner',[])]) - i.send([('server.peers.subscribe',[])]) + i.send([('server.banner',[])], self.on_banner) + i.send([('server.peers.subscribe',[])], self.on_peers) else: self.servers_list.remove(i.server) self.interfaces.pop(i.server) t@@ -116,13 +150,16 @@ class Network(threading.Thread): else: self.trigger_callback('disconnected') + def on_header(self, i, result): + self.blockchain.queue.put((i,result)) - def on_peers(self, result): - # populate servers list here - pass + def on_peers(self, i, r): + self.servers = self.parse_servers(r.get('result')) + self.trigger_callback('peers') - def on_banner(self, result): - pass + def on_banner(self, i, r): + self.banner = r.get('result') + self.trigger_callback('banner') def stop(self): with self.lock: self.running = False t@@ -131,6 +168,38 @@ class Network(threading.Thread): with self.lock: return self.running + def parse_servers(self, result): + """ parse servers list into dict format""" + from version import PROTOCOL_VERSION + servers = {} + for item in result: + host = item[1] + out = {} + version = None + pruning_level = '-' + if len(item) > 2: + for v in item[2]: + if re.match("[stgh]\d*", v): + protocol, port = v[0], v[1:] + if port == '': port = DEFAULT_PORTS[protocol] + out[protocol] = port + elif re.match("v(.?)+", v): + version = v[1:] + elif re.match("p\d*", v): + pruning_level = v[1:] + if pruning_level == '': pruning_level = '0' + try: + is_recent = float(version)>=float(PROTOCOL_VERSION) + except: + is_recent = False + + if out and is_recent: + out['pruning'] = pruning_level + servers[host] = out + + return servers + + def resend_subscriptions(self, subscriptions): for channel, messages in subscriptions.items(): if messages: t@@ -141,7 +210,7 @@ class Network(threading.Thread): if __name__ == "__main__": import simple_config - config = simple_config.SimpleConfig({'verbose':True}) + config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.org:50002:s'}) network = Network(config) network.start() DIR diff --git a/lib/verifier.py b/lib/verifier.py t@@ -35,11 +35,12 @@ class TxVerifier(threading.Thread): self.blockchain = network.blockchain self.interface = network.interface self.transactions = {} # requested verifications (with height sent by the requestor) - self.interface.register_channel('txverifier') + #self.interface.register_channel('txverifier') self.verified_tx = storage.get('verified_tx3',{}) # height, timestamp of verified transactions self.merkle_roots = storage.get('merkle_roots',{}) # hashed by me self.lock = threading.Lock() self.running = False + self.queue = Queue.Queue() def get_confirmations(self, tx): t@@ -107,13 +108,14 @@ class TxVerifier(threading.Thread): if tx_hash not in self.verified_tx: if self.merkle_roots.get(tx_hash) is None and tx_hash not in requested_merkle: print_error('requesting merkle', tx_hash) - self.interface.send([ ('blockchain.transaction.get_merkle',[tx_hash, tx_height]) ], 'txverifier') + self.interface.send([ ('blockchain.transaction.get_merkle',[tx_hash, tx_height]) ], lambda i,r: self.queue.put(r)) requested_merkle.append(tx_hash) try: - r = self.interface.get_response('txverifier',timeout=1) + r = self.queue.get(timeout=1) except Queue.Empty: continue + if not r: continue if r.get('error'): DIR diff --git a/lib/wallet.py b/lib/wallet.py t@@ -217,13 +217,17 @@ class Wallet: def set_up_to_date(self,b): with self.lock: self.up_to_date = b + def is_up_to_date(self): with self.lock: return self.up_to_date + def update(self): self.up_to_date = False - self.interface.poke('synchronizer') - while not self.is_up_to_date(): time.sleep(0.1) + #self.interface.poke('synchronizer') + while not self.is_up_to_date(): + time.sleep(0.1) + def import_key(self, sec, password): # check password t@@ -652,7 +656,7 @@ class Wallet: if value >= self.gap_limit: self.gap_limit = value self.storage.put('gap_limit', self.gap_limit, True) - self.interface.poke('synchronizer') + #self.interface.poke('synchronizer') return True elif value >= self.min_acceptable_gap(): t@@ -1184,9 +1188,13 @@ class Wallet: def send_tx(self, tx): # asynchronous self.tx_event.clear() - self.interface.send([('blockchain.transaction.broadcast', [str(tx)])], 'synchronizer') + self.interface.send([('blockchain.transaction.broadcast', [str(tx)])], self.on_broadcast) return tx.hash() + def on_broadcast(self, i, result): + self.tx_result = result + self.tx_event.set() + def receive_tx(self,tx_hash): out = self.tx_result if out != tx_hash: t@@ -1378,15 +1386,14 @@ class WalletSynchronizer(threading.Thread): self.wallet = wallet wallet.synchronizer = self self.interface = self.wallet.interface - self.interface.register_channel('synchronizer') #self.wallet.network.register_callback('connected', lambda: self.wallet.set_up_to_date(False)) self.was_updated = True self.running = False self.lock = threading.Lock() + self.queue = Queue.Queue() def stop(self): with self.lock: self.running = False - self.interface.poke('synchronizer') def is_running(self): with self.lock: return self.running t@@ -1396,7 +1403,7 @@ class WalletSynchronizer(threading.Thread): messages = [] for addr in addresses: messages.append(('blockchain.address.subscribe', [addr])) - self.interface.send( messages, 'synchronizer') + self.interface.send( messages, lambda i,r: self.queue.put(r)) def run(self): t@@ -1436,26 +1443,26 @@ class WalletSynchronizer(threading.Thread): # request missing transactions for tx_hash, tx_height in missing_tx: if (tx_hash, tx_height) not in requested_tx: - self.interface.send([ ('blockchain.transaction.get',[tx_hash, tx_height]) ], 'synchronizer') + self.interface.send([ ('blockchain.transaction.get',[tx_hash, tx_height]) ], lambda i,r: self.queue.put(r)) requested_tx.append( (tx_hash, tx_height) ) missing_tx = [] # detect if situation has changed - if not self.interface.is_up_to_date('synchronizer'): - if self.wallet.is_up_to_date(): - self.wallet.set_up_to_date(False) - self.was_updated = True - else: + if self.interface.is_up_to_date() and self.queue.empty(): if not self.wallet.is_up_to_date(): self.wallet.set_up_to_date(True) self.was_updated = True + else: + if self.wallet.is_up_to_date(): + self.wallet.set_up_to_date(False) + self.was_updated = True if self.was_updated: - self.interface.network.trigger_callback('updated') + self.wallet.network.trigger_callback('updated') self.was_updated = False # 2. get a response - r = self.interface.get_response('synchronizer') + r = self.queue.get(block=True, timeout=10000000000) # poke sends None. (needed during stop) if not r: continue t@@ -1473,7 +1480,7 @@ class WalletSynchronizer(threading.Thread): addr = params[0] if self.wallet.get_status(self.wallet.get_history(addr)) != result: if requested_histories.get(addr) is None: - self.interface.send([('blockchain.address.get_history', [addr])], 'synchronizer') + self.interface.send([('blockchain.address.get_history', [addr])], lambda i,r:self.queue.put(r)) requested_histories[addr] = result elif method == 'blockchain.address.get_history': t@@ -1519,15 +1526,11 @@ class WalletSynchronizer(threading.Thread): requested_tx.remove( (tx_hash, tx_height) ) print_error("received tx:", tx_hash, len(tx.raw)) - elif method == 'blockchain.transaction.broadcast': - self.wallet.tx_result = result - self.wallet.tx_event.set() - else: print_error("Error: Unknown message:" + method + ", " + repr(params) + ", " + repr(result) ) if self.was_updated and not requested_tx: - self.interface.network.trigger_callback('updated') - self.interface.network.trigger_callback("new_transaction") # Updated gets called too many times from other places as well; if we use that signal we get the notification three times + self.wallet.network.trigger_callback('updated') + self.wallet.network.trigger_callback("new_transaction") # Updated gets called too many times from other places as well; if we use that signal we get the notification three times self.was_updated = False