URI: 
       tsynchronizer: make 'add' thread-safe, and some clean-up - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 002b8a99e264a824bfee721c320ec8b763c149bd
   DIR parent eccb8ec2d6644a5e52dd85c4925e29402046ad4c
  HTML Author: SomberNight <somber.night@protonmail.com>
       Date:   Thu, 20 Sep 2018 18:11:26 +0200
       
       synchronizer: make 'add' thread-safe, and some clean-up
       
       Diffstat:
         M electrum/synchronizer.py            |      46 +++++++++++++++++--------------
       
       1 file changed, 25 insertions(+), 21 deletions(-)
       ---
   DIR diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py
       t@@ -51,6 +51,7 @@ class Synchronizer(PrintError):
            '''
            def __init__(self, wallet):
                self.wallet = wallet
       +        self.asyncio_loop = wallet.network.asyncio_loop
                self.requested_tx = {}
                self.requested_histories = {}
                self.requested_addrs = set()
       t@@ -69,10 +70,13 @@ class Synchronizer(PrintError):
                        and not self.requested_tx)
        
            def add(self, addr):
       +        asyncio.run_coroutine_threadsafe(self._add(addr), self.asyncio_loop)
       +
       +    async def _add(self, addr):
                self.requested_addrs.add(addr)
       -        self.add_queue.put_nowait(addr)
       +        await self.add_queue.put(addr)
        
       -    async def on_address_status(self, addr, status):
       +    async def _on_address_status(self, addr, status):
                history = self.wallet.history.get(addr, [])
                if history_status(history) == status:
                    return
       t@@ -98,12 +102,12 @@ class Synchronizer(PrintError):
                    # Store received history
                    self.wallet.receive_history_callback(addr, hist, tx_fees)
                    # Request transactions we don't have
       -            await self.request_missing_txs(hist)
       +            await self._request_missing_txs(hist)
        
                # Remove request; this allows up_to_date to be True
                self.requested_histories.pop(addr)
        
       -    async def request_missing_txs(self, hist):
       +    async def _request_missing_txs(self, hist):
                # "hist" is a list of [tx_hash, tx_height] lists
                transaction_hashes = []
                for tx_hash, tx_height in hist:
       t@@ -114,11 +118,12 @@ class Synchronizer(PrintError):
                    transaction_hashes.append(tx_hash)
                    self.requested_tx[tx_hash] = tx_height
        
       +        if not transaction_hashes: return
                async with TaskGroup() as group:
                    for tx_hash in transaction_hashes:
       -                await group.spawn(self.get_transaction, tx_hash)
       +                await group.spawn(self._get_transaction, tx_hash)
        
       -    async def get_transaction(self, tx_hash):
       +    async def _get_transaction(self, tx_hash):
                result = await self.session.send_request('blockchain.transaction.get', [tx_hash])
                tx = Transaction(result)
                try:
       t@@ -137,22 +142,22 @@ class Synchronizer(PrintError):
                # callbacks
                self.wallet.network.trigger_callback('new_transaction', self.wallet, tx)
        
       -    async def subscribe_to_address(self, addr):
       -        h = address_to_scripthash(addr)
       -        self.scripthash_to_address[h] = addr
       -        await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
       -        self.requested_addrs.remove(addr)
       -
            async def send_subscriptions(self, group: TaskGroup):
       +        async def subscribe_to_address(addr):
       +            h = address_to_scripthash(addr)
       +            self.scripthash_to_address[h] = addr
       +            await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
       +            self.requested_addrs.remove(addr)
       +
                while True:
                    addr = await self.add_queue.get()
       -            await group.spawn(self.subscribe_to_address, addr)
       +            await group.spawn(subscribe_to_address, addr)
        
            async def handle_status(self, group: TaskGroup):
                while True:
                    h, status = await self.status_queue.get()
                    addr = self.scripthash_to_address[h]
       -            await group.spawn(self.on_address_status, addr, status)
       +            await group.spawn(self._on_address_status, addr, status)
                    self._processed_some_notifications = True
        
            @property
       t@@ -164,15 +169,14 @@ class Synchronizer(PrintError):
            async def main(self):
                self.wallet.set_up_to_date(False)
                # request missing txns, if any
       -        async with TaskGroup() as group:
       -            for history in self.wallet.history.values():
       -                # Old electrum servers returned ['*'] when all history for the address
       -                # was pruned. This no longer happens but may remain in old wallets.
       -                if history == ['*']: continue
       -                await group.spawn(self.request_missing_txs, history)
       +        for history in self.wallet.history.values():
       +            # Old electrum servers returned ['*'] when all history for the address
       +            # was pruned. This no longer happens but may remain in old wallets.
       +            if history == ['*']: continue
       +            await self._request_missing_txs(history)
                # add addresses to bootstrap
                for addr in self.wallet.get_addresses():
       -            self.add(addr)
       +            await self._add(addr)
                # main loop
                while True:
                    await asyncio.sleep(0.1)