URI: 
       tsynchronizer.py - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
       tsynchronizer.py (13823B)
       ---
            1 #!/usr/bin/env python
            2 #
            3 # Electrum - lightweight Bitcoin client
            4 # Copyright (C) 2014 Thomas Voegtlin
            5 #
            6 # Permission is hereby granted, free of charge, to any person
            7 # obtaining a copy of this software and associated documentation files
            8 # (the "Software"), to deal in the Software without restriction,
            9 # including without limitation the rights to use, copy, modify, merge,
           10 # publish, distribute, sublicense, and/or sell copies of the Software,
           11 # and to permit persons to whom the Software is furnished to do so,
           12 # subject to the following conditions:
           13 #
           14 # The above copyright notice and this permission notice shall be
           15 # included in all copies or substantial portions of the Software.
           16 #
           17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
           18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
           19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
           20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
           21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
           22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
           23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
           24 # SOFTWARE.
           25 import asyncio
           26 import hashlib
           27 from typing import Dict, List, TYPE_CHECKING, Tuple, Set
           28 from collections import defaultdict
           29 import logging
           30 
           31 from aiorpcx import TaskGroup, run_in_thread, RPCError
           32 
           33 from . import util
           34 from .transaction import Transaction, PartialTransaction
           35 from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer, random_shuffled_copy
           36 from .bitcoin import address_to_scripthash, is_address
           37 from .logging import Logger
           38 from .interface import GracefulDisconnect, NetworkTimeout
           39 
           40 if TYPE_CHECKING:
           41     from .network import Network
           42     from .address_synchronizer import AddressSynchronizer
           43 
           44 
           45 class SynchronizerFailure(Exception): pass
           46 
           47 
           48 def history_status(h):
           49     if not h:
           50         return None
           51     status = ''
           52     for tx_hash, height in h:
           53         status += tx_hash + ':%d:' % height
           54     return bh2u(hashlib.sha256(status.encode('ascii')).digest())
           55 
           56 
           57 class SynchronizerBase(NetworkJobOnDefaultServer):
           58     """Subscribe over the network to a set of addresses, and monitor their statuses.
           59     Every time a status changes, run a coroutine provided by the subclass.
           60     """
           61     def __init__(self, network: 'Network'):
           62         self.asyncio_loop = network.asyncio_loop
           63         self._reset_request_counters()
           64 
           65         NetworkJobOnDefaultServer.__init__(self, network)
           66 
           67     def _reset(self):
           68         super()._reset()
           69         self.requested_addrs = set()
           70         self.scripthash_to_address = {}
           71         self._processed_some_notifications = False  # so that we don't miss them
           72         self._reset_request_counters()
           73         # Queues
           74         self.add_queue = asyncio.Queue()
           75         self.status_queue = asyncio.Queue()
           76 
           77     async def _run_tasks(self, *, taskgroup):
           78         await super()._run_tasks(taskgroup=taskgroup)
           79         try:
           80             async with taskgroup as group:
           81                 await group.spawn(self.send_subscriptions())
           82                 await group.spawn(self.handle_status())
           83                 await group.spawn(self.main())
           84         finally:
           85             # we are being cancelled now
           86             # TODO: libbitcoin (this would be handled by zeromq.Client.stop()
           87             print("self.session.unsubscribe(self.status_queue)")
           88 
           89     def _reset_request_counters(self):
           90         self._requests_sent = 0
           91         self._requests_answered = 0
           92 
           93     def add(self, addr):
           94         asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop)
           95 
           96     async def _add_address(self, addr: str):
           97         # note: this method is async as add_queue.put_nowait is not thread-safe.
           98         if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}")
           99         if addr in self.requested_addrs: return
          100         self.requested_addrs.add(addr)
          101         self.add_queue.put_nowait(addr)
          102 
          103     async def _on_address_status(self, addr, status):
          104         """Handle the change of the status of an address."""
          105         raise NotImplementedError()  # implemented by subclasses
          106 
          107     async def send_subscriptions(self):
          108         async def subscribe_to_address(addr):
          109             h = address_to_scripthash(addr)
          110             self.scripthash_to_address[h] = addr
          111             self._requests_sent += 1
          112             async with self._network_request_semaphore:
          113                 # TODO: libbitcoin
          114                 print("await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)")
          115                 # TODO: libbitcoin XXX: Review this, it's probably incorrect
          116                 print(f"DEBUG: network: subscribe_to_address: {h}")
          117                 await self.interface.client._subscribe_to_scripthash(h, self.status_queue)
          118             self._requests_answered += 1
          119             self.requested_addrs.remove(addr)
          120 
          121         while True:
          122             addr = await self.add_queue.get()
          123             await self.taskgroup.spawn(subscribe_to_address, addr)
          124 
          125     async def handle_status(self):
          126         while True:
          127             h, status = await self.status_queue.get()
          128             addr = self.scripthash_to_address[h]
          129             await self.taskgroup.spawn(self._on_address_status, addr, status)
          130             self._processed_some_notifications = True
          131 
          132     def num_requests_sent_and_answered(self) -> Tuple[int, int]:
          133         return self._requests_sent, self._requests_answered
          134 
          135     async def main(self):
          136         raise NotImplementedError()  # implemented by subclasses
          137 
          138 
          139 class Synchronizer(SynchronizerBase):
          140     '''The synchronizer keeps the wallet up-to-date with its set of
          141     addresses and their transactions.  It subscribes over the network
          142     to wallet addresses, gets the wallet to generate new addresses
          143     when necessary, requests the transaction history of any addresses
          144     we don't have the full history of, and requests binary transaction
          145     data of any transactions the wallet doesn't have.
          146     '''
          147     def __init__(self, wallet: 'AddressSynchronizer'):
          148         self.wallet = wallet
          149         SynchronizerBase.__init__(self, wallet.network)
          150 
          151     def _reset(self):
          152         super()._reset()
          153         self.requested_tx = {}
          154         self.requested_histories = set()
          155         self._stale_histories = dict()  # type: Dict[str, asyncio.Task]
          156 
          157     def diagnostic_name(self):
          158         return self.wallet.diagnostic_name()
          159 
          160     def is_up_to_date(self):
          161         return (not self.requested_addrs
          162                 and not self.requested_histories
          163                 and not self.requested_tx
          164                 and not self._stale_histories)
          165 
          166     async def _on_address_status(self, addr, status):
          167         history = self.wallet.db.get_addr_history(addr)
          168         if history_status(history) == status:
          169             return
          170         # No point in requesting history twice for the same announced status.
          171         # However if we got announced a new status, we should request history again:
          172         if (addr, status) in self.requested_histories:
          173             return
          174         # request address history
          175         self.requested_histories.add((addr, status))
          176         self._stale_histories.pop(addr, asyncio.Future()).cancel()
          177         h = address_to_scripthash(addr)
          178         self._requests_sent += 1
          179         async with self._network_request_semaphore:
          180             result = await self.interface.get_history_for_scripthash(h)
          181         self._requests_answered += 1
          182         self.logger.info(f"receiving history {addr} {len(result)}")
          183         hist = list(map(lambda item: (item['tx_hash'], item['height']), result))
          184         # tx_fees
          185         tx_fees = [(item['tx_hash'], item.get('fee')) for item in result]
          186         tx_fees = dict(filter(lambda x:x[1] is not None, tx_fees))
          187         # Check that the status corresponds to what was announced
          188         if history_status(hist) != status:
          189             # could happen naturally if history changed between getting status and history (race)
          190             self.logger.info(f"error: status mismatch: {addr}. we'll wait a bit for status update.")
          191             # The server is supposed to send a new status notification, which will trigger a new
          192             # get_history. We shall wait a bit for this to happen, otherwise we disconnect.
          193             async def disconnect_if_still_stale():
          194                 timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Generic)
          195                 await asyncio.sleep(timeout)
          196                 raise SynchronizerFailure(f"timeout reached waiting for addr {addr}: history still stale")
          197             self._stale_histories[addr] = await self.taskgroup.spawn(disconnect_if_still_stale)
          198         else:
          199             self._stale_histories.pop(addr, asyncio.Future()).cancel()
          200             # Store received history
          201             self.wallet.receive_history_callback(addr, hist, tx_fees)
          202             # Request transactions we don't have
          203             await self._request_missing_txs(hist)
          204 
          205         # Remove request; this allows up_to_date to be True
          206         self.requested_histories.discard((addr, status))
          207 
          208     async def _request_missing_txs(self, hist, *, allow_server_not_finding_tx=False):
          209         # "hist" is a list of [tx_hash, tx_height] lists
          210         transaction_hashes = []
          211         for tx_hash, tx_height in hist:
          212             if tx_hash in self.requested_tx:
          213                 continue
          214             tx = self.wallet.db.get_transaction(tx_hash)
          215             if tx and not isinstance(tx, PartialTransaction):
          216                 continue  # already have complete tx
          217             transaction_hashes.append(tx_hash)
          218             self.requested_tx[tx_hash] = tx_height
          219 
          220         if not transaction_hashes: return
          221         async with TaskGroup() as group:
          222             for tx_hash in transaction_hashes:
          223                 await group.spawn(self._get_transaction(tx_hash, allow_server_not_finding_tx=allow_server_not_finding_tx))
          224 
          225     async def _get_transaction(self, tx_hash, *, allow_server_not_finding_tx=False):
          226         self._requests_sent += 1
          227         try:
          228             async with self._network_request_semaphore:
          229                 raw_tx = await self.interface.get_transaction(tx_hash)
          230         except RPCError as e:
          231             # most likely, "No such mempool or blockchain transaction"
          232             if allow_server_not_finding_tx:
          233                 self.requested_tx.pop(tx_hash)
          234                 return
          235             else:
          236                 raise
          237         finally:
          238             self._requests_answered += 1
          239         tx = Transaction(raw_tx)
          240         if tx_hash != tx.txid():
          241             raise SynchronizerFailure(f"received tx does not match expected txid ({tx_hash} != {tx.txid()})")
          242         tx_height = self.requested_tx.pop(tx_hash)
          243         self.wallet.receive_tx_callback(tx_hash, tx, tx_height)
          244         self.logger.info(f"received tx {tx_hash} height: {tx_height} bytes: {len(raw_tx)}")
          245         # callbacks
          246         util.trigger_callback('new_transaction', self.wallet, tx)
          247 
          248     async def main(self):
          249         self.wallet.set_up_to_date(False)
          250         # request missing txns, if any
          251         for addr in random_shuffled_copy(self.wallet.db.get_history()):
          252             history = self.wallet.db.get_addr_history(addr)
          253             # Old electrum servers returned ['*'] when all history for the address
          254             # was pruned. This no longer happens but may remain in old wallets.
          255             if history == ['*']: continue
          256             await self._request_missing_txs(history, allow_server_not_finding_tx=True)
          257         # add addresses to bootstrap
          258         for addr in random_shuffled_copy(self.wallet.get_addresses()):
          259             await self._add_address(addr)
          260         # main loop
          261         while True:
          262             await asyncio.sleep(0.1)
          263             await run_in_thread(self.wallet.synchronize)
          264             up_to_date = self.is_up_to_date()
          265             if (up_to_date != self.wallet.is_up_to_date()
          266                     or up_to_date and self._processed_some_notifications):
          267                 self._processed_some_notifications = False
          268                 if up_to_date:
          269                     self._reset_request_counters()
          270                 self.wallet.set_up_to_date(up_to_date)
          271                 util.trigger_callback('wallet_updated', self.wallet)
          272 
          273 
          274 class Notifier(SynchronizerBase):
          275     """Watch addresses. Every time the status of an address changes,
          276     an HTTP POST is sent to the corresponding URL.
          277     """
          278     def __init__(self, network):
          279         SynchronizerBase.__init__(self, network)
          280         self.watched_addresses = defaultdict(list)  # type: Dict[str, List[str]]
          281         self._start_watching_queue = asyncio.Queue()  # type: asyncio.Queue[Tuple[str, str]]
          282 
          283     async def main(self):
          284         # resend existing subscriptions if we were restarted
          285         for addr in self.watched_addresses:
          286             await self._add_address(addr)
          287         # main loop
          288         while True:
          289             addr, url = await self._start_watching_queue.get()
          290             self.watched_addresses[addr].append(url)
          291             await self._add_address(addr)
          292 
          293     async def start_watching_addr(self, addr: str, url: str):
          294         await self._start_watching_queue.put((addr, url))
          295 
          296     async def stop_watching_addr(self, addr: str):
          297         self.watched_addresses.pop(addr, None)
          298         # TODO blockchain.scripthash.unsubscribe
          299 
          300     async def _on_address_status(self, addr, status):
          301         if addr not in self.watched_addresses:
          302             return
          303         self.logger.info(f'new status for addr {addr}')
          304         headers = {'content-type': 'application/json'}
          305         data = {'address': addr, 'status': status}
          306         for url in self.watched_addresses[addr]:
          307             try:
          308                 async with make_aiohttp_session(proxy=self.network.proxy, headers=headers) as session:
          309                     async with session.post(url, json=data, headers=headers) as resp:
          310                         await resp.text()
          311             except Exception as e:
          312                 self.logger.info(repr(e))
          313             else:
          314                 self.logger.info(f'Got Response for {addr}')