URI: 
       tMerge branch 'kyuupichan-synchronizer-unthread' - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 90d32038faf618031241e2ca4ea206ee7504051c
   DIR parent 0c37009cdb3cb276096cf14b5dca18b9338f7eaa
  HTML Author: ThomasV <thomasv@gitorious>
       Date:   Thu,  2 Jul 2015 09:25:43 +0200
       
       Merge branch 'kyuupichan-synchronizer-unthread'
       
       Diffstat:
         M lib/network_proxy.py                |       3 +++
         M lib/synchronizer.py                 |     310 +++++++++++++++----------------
         M lib/wallet.py                       |       7 ++++---
       
       3 files changed, 160 insertions(+), 160 deletions(-)
       ---
   DIR diff --git a/lib/network_proxy.py b/lib/network_proxy.py
       t@@ -61,10 +61,13 @@ class NetworkProxy(util.DaemonThread):
                self.blockchain_height = 0
                self.server_height = 0
                self.interfaces = []
       +        self.jobs = []
        
        
            def run(self):
                while self.is_running():
       +            for job in self.jobs:
       +                job()
                    try:
                        response = self.pipe.get()
                    except util.timeout:
   DIR diff --git a/lib/synchronizer.py b/lib/synchronizer.py
       t@@ -17,174 +17,170 @@
        # along with this program. If not, see <http://www.gnu.org/licenses/>.
        
        
       -import threading
       -import time
       -import Queue
       +from threading import Lock
        
       -import bitcoin
       -import util
       +from bitcoin import Hash, hash_encode
        from transaction import Transaction
       +from util import print_error, print_msg
        
        
       -class WalletSynchronizer(util.DaemonThread):
       +class WalletSynchronizer():
       +    '''The synchronizer keeps the wallet up-to-date with its set of
       +    addresses and their transactions.  It subscribes over the network
       +    to wallet addresses, gets the wallet to generate new addresses
       +    when necessary, requests the transaction history of any addresses
       +    we don't have the full history of, and requests binary transaction
       +    data of any transactions the wallet doesn't have.
       +
       +    External interface: __init__() and add() member functions.
       +    '''
        
            def __init__(self, wallet, network):
       -        util.DaemonThread.__init__(self)
                self.wallet = wallet
                self.network = network
       -        self.was_updated = True
       -        self.queue = Queue.Queue()
       -        self.address_queue = Queue.Queue()
       +        self.new_addresses = set()
       +        # Entries are (tx_hash, tx_height) tuples
       +        self.requested_tx = set()
       +        self.requested_histories = {}
       +        self.requested_addrs = set()
       +        self.lock = Lock()
       +        self.initialize()
       +
       +    def print_error(self, *msg):
       +        print_error("[Synchronizer]", *msg)
       +
       +    def print_msg(self, *msg):
       +        print_msg("[Synchronizer]", *msg)
       +
       +    def parse_response(self, response):
       +        if response.get('error'):
       +            self.print_error("response error:", response)
       +            return None, None
       +        return response['params'], response['result']
       +
       +    def is_up_to_date(self):
       +        return (not self.requested_tx and not self.requested_histories
       +                and not self.requested_addrs)
        
            def add(self, address):
       -        self.address_queue.put(address)
       +        '''This can be called from the proxy or GUI threads.'''
       +        with self.lock:
       +            self.new_addresses.add(address)
        
            def subscribe_to_addresses(self, addresses):
       -        messages = []
       -        for addr in addresses:
       -            messages.append(('blockchain.address.subscribe', [addr]))
       -        self.network.send(messages, self.queue.put)
       -
       -    def run(self):
       -        while self.is_running():
       -            if not self.network.is_connected():
       -                time.sleep(0.1)
       -                continue
       -            self.run_interface()
       -        self.print_error("stopped")
       -
       -    def run_interface(self):
       -        #print_error("synchronizer: connected to", self.network.get_parameters())
       -
       -        requested_tx = []
       -        missing_tx = []
       -        requested_histories = {}
       -
       -        # request any missing transactions
       +        if addresses:
       +            self.requested_addrs |= addresses
       +            msgs = map(lambda addr: ('blockchain.address.subscribe', [addr]),
       +                       addresses)
       +            self.network.send(msgs, self.addr_subscription_response)
       +
       +    def addr_subscription_response(self, response):
       +        params, result = self.parse_response(response)
       +        if not params:
       +            return
       +        addr = params[0]
       +        if addr in self.requested_addrs:  # Notifications won't be in
       +            self.requested_addrs.remove(addr)
       +        history = self.wallet.get_address_history(addr)
       +        if self.wallet.get_status(history) != result:
       +            if self.requested_histories.get(addr) is None:
       +                self.network.send([('blockchain.address.get_history', [addr])],
       +                                  self.addr_history_response)
       +                self.requested_histories[addr] = result
       +
       +    def addr_history_response(self, response):
       +        params, result = self.parse_response(response)
       +        if not params:
       +            return
       +        addr = params[0]
       +        self.print_error("receiving history", addr, len(result))
       +        server_status = self.requested_histories.pop(addr)
       +
       +        # Check that txids are unique
       +        hashes = set(map(lambda item: item['tx_hash'], result))
       +        if len(hashes) != len(result):
       +            self.print_error("error: server history has non-unique txids: %s"% addr)
       +            return
       +
       +        # Check that the status corresponds to what was announced
       +        hist = map(lambda item: (item['tx_hash'], item['height']), result)
       +        if self.wallet.get_status(hist) != server_status:
       +            self.print_error("error: status mismatch: %s" % addr)
       +            return
       +
       +        # Store received history
       +        self.wallet.receive_history_callback(addr, hist)
       +
       +        # Request transactions we don't have
       +        self.request_missing_txs(hist)
       +
       +    def tx_response(self, response):
       +        params, result = self.parse_response(response)
       +        if not params:
       +            return
       +        tx_hash, tx_height = params
       +        assert tx_hash == hash_encode(Hash(result.decode('hex')))
       +        tx = Transaction(result)
       +        try:
       +            tx.deserialize()
       +        except Exception:
       +            self.print_msg("cannot deserialize transaction, skipping", tx_hash)
       +            return
       +
       +        self.wallet.receive_tx_callback(tx_hash, tx, tx_height)
       +        self.requested_tx.remove((tx_hash, tx_height))
       +        self.print_error("received tx:", tx_hash, len(tx.raw))
       +        if not self.requested_tx:
       +            self.network.trigger_callback('updated')
       +            # Updated gets called too many times from other places as
       +            # well; if we used that signal we get the notification
       +            # three times
       +            self.network.trigger_callback("new_transaction")
       +
       +    def request_missing_txs(self, hist):
       +        # "hist" is a list of [tx_hash, tx_height] lists
       +        missing = set()
       +        for tx_hash, tx_height in hist:
       +            if self.wallet.transactions.get(tx_hash) is None:
       +                missing.add((tx_hash, tx_height))
       +        missing -= self.requested_tx
       +        if missing:
       +            requests = [('blockchain.transaction.get', tx) for tx in missing]
       +            self.network.send(requests, self.tx_response)
       +            self.requested_tx |= missing
       +
       +    def initialize(self):
       +        '''Check the initial state of the wallet.  Subscribe to all its
       +        addresses, and request any transactions in its address history
       +        we don't have.
       +        '''
                for history in self.wallet.history.values():
       -            if history == ['*']: continue
       -            for tx_hash, tx_height in history:
       -                if self.wallet.transactions.get(tx_hash) is None and (tx_hash, tx_height) not in missing_tx:
       -                    missing_tx.append( (tx_hash, tx_height) )
       -
       -        if missing_tx:
       -            self.print_error("missing tx", missing_tx)
       -
       -        # subscriptions
       -        self.subscribe_to_addresses(self.wallet.addresses(True))
       -
       -        while self.is_running():
       -
       -            # 1. create new addresses
       -            self.wallet.synchronize()
       -
       -            # request missing addresses
       -            new_addresses = []
       -            while True:
       -                try:
       -                    addr = self.address_queue.get(block=False)
       -                except Queue.Empty:
       -                    break
       -                new_addresses.append(addr)
       -            if new_addresses:
       -                self.subscribe_to_addresses(new_addresses)
       -
       -            # request missing transactions
       -            for tx_hash, tx_height in missing_tx:
       -                if (tx_hash, tx_height) not in requested_tx:
       -                    self.network.send([ ('blockchain.transaction.get',[tx_hash, tx_height]) ], self.queue.put)
       -                    requested_tx.append( (tx_hash, tx_height) )
       -            missing_tx = []
       -
       -            # detect if situation has changed
       -            if self.network.is_up_to_date() and self.queue.empty():
       -                if not self.wallet.is_up_to_date():
       -                    self.wallet.set_up_to_date(True)
       -                    self.was_updated = True
       -                    self.wallet.save_transactions()
       -            else:
       -                if self.wallet.is_up_to_date():
       -                    self.wallet.set_up_to_date(False)
       -                    self.was_updated = True
       -
       -            if self.was_updated:
       -                self.network.trigger_callback('updated')
       -                self.was_updated = False
       -
       -            # 2. get a response
       -            try:
       -                r = self.queue.get(timeout=0.1)
       -            except Queue.Empty:
       -                continue
       -
       -            # 3. process response
       -            method = r['method']
       -            params = r['params']
       -            result = r.get('result')
       -            error = r.get('error')
       -            if error:
       -                self.print_error("error", r)
       +            # Old electrum servers returned ['*'] when all history for
       +            # the address was pruned.  This no longer happens but may
       +            # remain in old wallets.
       +            if history == ['*']:
                        continue
       -
       -            if method == 'blockchain.address.subscribe':
       -                addr = params[0]
       -                if self.wallet.get_status(self.wallet.get_address_history(addr)) != result:
       -                    if requested_histories.get(addr) is None:
       -                        self.network.send([('blockchain.address.get_history', [addr])], self.queue.put)
       -                        requested_histories[addr] = result
       -
       -            elif method == 'blockchain.address.get_history':
       -                addr = params[0]
       -                self.print_error("receiving history", addr, len(result))
       -                hist = []
       -                # check that txids are unique
       -                txids = []
       -                for item in result:
       -                    tx_hash = item['tx_hash']
       -                    if tx_hash not in txids:
       -                        txids.append(tx_hash)
       -                        hist.append( (tx_hash, item['height']) )
       -
       -                if len(hist) != len(result):
       -                    self.print_error("error: server sent history with non-unique txid", result)
       -                    continue
       -
       -                # check that the status corresponds to what was announced
       -                rs = requested_histories.pop(addr)
       -                if self.wallet.get_status(hist) != rs:
       -                    self.print_error("error: status mismatch: %s" % addr)
       -                    continue
       -
       -                # store received history
       -                self.wallet.receive_history_callback(addr, hist)
       -
       -                # request transactions that we don't have
       -                for tx_hash, tx_height in hist:
       -                    if self.wallet.transactions.get(tx_hash) is None:
       -                        if (tx_hash, tx_height) not in requested_tx and (tx_hash, tx_height) not in missing_tx:
       -                            missing_tx.append( (tx_hash, tx_height) )
       -
       -            elif method == 'blockchain.transaction.get':
       -                tx_hash = params[0]
       -                tx_height = params[1]
       -                assert tx_hash == bitcoin.hash_encode(bitcoin.Hash(result.decode('hex')))
       -                tx = Transaction(result)
       -                try:
       -                    tx.deserialize()
       -                except Exception:
       -                    self.print_msg("Warning: Cannot deserialize transactions. skipping")
       -                    continue
       -
       -                self.wallet.receive_tx_callback(tx_hash, tx, tx_height)
       -                self.was_updated = True
       -                requested_tx.remove( (tx_hash, tx_height) )
       -                self.print_error("received tx:", tx_hash, len(tx.raw))
       -
       -            else:
       -                self.print_error("Error: Unknown message:" + method + ", " + repr(params) + ", " + repr(result) )
       -
       -            if self.was_updated and not requested_tx:
       -                self.network.trigger_callback('updated')
       -                # Updated gets called too many times from other places as well; if we use that signal we get the notification three times
       -                self.network.trigger_callback("new_transaction")
       -                self.was_updated = False
       +            self.request_missing_txs(history)
       +
       +        if self.requested_tx:
       +            self.print_error("missing tx", self.requested_tx)
       +        self.subscribe_to_addresses(set(self.wallet.addresses(True)))
       +
       +    def main_loop(self):
       +        '''Called from the network proxy thread main loop.'''
       +        # 1. Create new addresses
       +        self.wallet.synchronize()
       +
       +        # 2. Subscribe to new addresses
       +        with self.lock:
       +            addresses = self.new_addresses
       +            self.new_addresses = set()
       +        self.subscribe_to_addresses(addresses)
       +
       +        # 3. Detect if situation has changed
       +        up_to_date = self.is_up_to_date()
       +        if up_to_date != self.wallet.is_up_to_date():
       +            self.wallet.set_up_to_date(up_to_date)
       +            if up_to_date:
       +                self.wallet.save_transactions()
       +            self.network.trigger_callback('updated')
   DIR diff --git a/lib/wallet.py b/lib/wallet.py
       t@@ -1107,15 +1107,16 @@ class Abstract_Wallet(object):
                    self.verifier.start()
                    self.set_verifier(self.verifier)
                    self.synchronizer = WalletSynchronizer(self, network)
       -            self.synchronizer.start()
       +            network.jobs.append(self.synchronizer.main_loop)
                else:
                    self.verifier = None
       -            self.synchronizer =None
       +            self.synchronizer = None
        
            def stop_threads(self):
                if self.network:
                    self.verifier.stop()
       -            self.synchronizer.stop()
       +            self.network.jobs = []
       +            self.synchronizer = None
                    self.storage.put('stored_height', self.get_local_height(), True)
        
            def restore(self, cb):