URI: 
       taddress_synchronizer.py - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
       taddress_synchronizer.py (38895B)
       ---
            1 # Electrum - lightweight Bitcoin client
            2 # Copyright (C) 2018 The Electrum Developers
            3 #
            4 # Permission is hereby granted, free of charge, to any person
            5 # obtaining a copy of this software and associated documentation files
            6 # (the "Software"), to deal in the Software without restriction,
            7 # including without limitation the rights to use, copy, modify, merge,
            8 # publish, distribute, sublicense, and/or sell copies of the Software,
            9 # and to permit persons to whom the Software is furnished to do so,
           10 # subject to the following conditions:
           11 #
           12 # The above copyright notice and this permission notice shall be
           13 # included in all copies or substantial portions of the Software.
           14 #
           15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
           16 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
           17 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
           18 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
           19 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
           20 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
           21 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
           22 # SOFTWARE.
           23 
           24 import asyncio
           25 import threading
           26 import asyncio
           27 import itertools
           28 from collections import defaultdict
           29 from typing import TYPE_CHECKING, Dict, Optional, Set, Tuple, NamedTuple, Sequence, List
           30 
           31 from aiorpcx import TaskGroup
           32 
           33 from . import bitcoin, util
           34 from .bitcoin import COINBASE_MATURITY
           35 from .util import profiler, bfh, TxMinedInfo, UnrelatedTransactionException
           36 from .transaction import Transaction, TxOutput, TxInput, PartialTxInput, TxOutpoint, PartialTransaction
           37 from .synchronizer import Synchronizer
           38 from .verifier import SPV
           39 from .blockchain import hash_header
           40 from .i18n import _
           41 from .logging import Logger
           42 
           43 if TYPE_CHECKING:
           44     from .network import Network
           45     from .wallet_db import WalletDB
           46 
           47 
           48 TX_HEIGHT_FUTURE = -3
           49 TX_HEIGHT_LOCAL = -2
           50 TX_HEIGHT_UNCONF_PARENT = -1
           51 TX_HEIGHT_UNCONFIRMED = 0
           52 
           53 
           54 class HistoryItem(NamedTuple):
           55     txid: str
           56     tx_mined_status: TxMinedInfo
           57     delta: int
           58     fee: Optional[int]
           59     balance: int
           60 
           61 
           62 class TxWalletDelta(NamedTuple):
           63     is_relevant: bool  # "related to wallet?"
           64     is_any_input_ismine: bool
           65     is_all_input_ismine: bool
           66     delta: int
           67     fee: Optional[int]
           68 
           69 
           70 class AddressSynchronizer(Logger):
           71     """
           72     inherited by wallet
           73     """
           74 
           75     network: Optional['Network']
           76     synchronizer: Optional['Synchronizer']
           77     verifier: Optional['SPV']
           78 
           79     def __init__(self, db: 'WalletDB'):
           80         self.db = db
           81         self.network = None
           82         Logger.__init__(self)
           83         # verifier (SPV) and synchronizer are started in start_network
           84         self.synchronizer = None
           85         self.verifier = None
           86         # locks: if you need to take multiple ones, acquire them in the order they are defined here!
           87         self.lock = threading.RLock()
           88         self.transaction_lock = threading.RLock()
           89         self.future_tx = {}  # type: Dict[str, int]  # txid -> blocks remaining
           90         # Transactions pending verification.  txid -> tx_height. Access with self.lock.
           91         self.unverified_tx = defaultdict(int)
           92         # true when synchronized
           93         self.up_to_date = False
           94         # thread local storage for caching stuff
           95         self.threadlocal_cache = threading.local()
           96 
           97         self._get_addr_balance_cache = {}
           98 
           99         self.load_and_cleanup()
          100 
          101     def with_lock(func):
          102         def func_wrapper(self: 'AddressSynchronizer', *args, **kwargs):
          103             with self.lock:
          104                 return func(self, *args, **kwargs)
          105         return func_wrapper
          106 
          107     def with_transaction_lock(func):
          108         def func_wrapper(self: 'AddressSynchronizer', *args, **kwargs):
          109             with self.transaction_lock:
          110                 return func(self, *args, **kwargs)
          111         return func_wrapper
          112 
          113     def load_and_cleanup(self):
          114         self.load_local_history()
          115         self.check_history()
          116         self.load_unverified_transactions()
          117         self.remove_local_transactions_we_dont_have()
          118 
          119     def is_mine(self, address: Optional[str]) -> bool:
          120         if not address: return False
          121         return self.db.is_addr_in_history(address)
          122 
          123     def get_addresses(self):
          124         return sorted(self.db.get_history())
          125 
          126     def get_address_history(self, addr: str) -> Sequence[Tuple[str, int]]:
          127         """Returns the history for the address, in the format that would be returned by a server.
          128 
          129         Note: The difference between db.get_addr_history and this method is that
          130         db.get_addr_history stores the response from a server, so it only includes txns
          131         a server sees, i.e. that does not contain local and future txns.
          132         """
          133         h = []
          134         # we need self.transaction_lock but get_tx_height will take self.lock
          135         # so we need to take that too here, to enforce order of locks
          136         with self.lock, self.transaction_lock:
          137             related_txns = self._history_local.get(addr, set())
          138             for tx_hash in related_txns:
          139                 tx_height = self.get_tx_height(tx_hash).height
          140                 h.append((tx_hash, tx_height))
          141         return h
          142 
          143     def get_address_history_len(self, addr: str) -> int:
          144         """Return number of transactions where address is involved."""
          145         return len(self._history_local.get(addr, ()))
          146 
          147     def get_txin_address(self, txin: TxInput) -> Optional[str]:
          148         if isinstance(txin, PartialTxInput):
          149             if txin.address:
          150                 return txin.address
          151         prevout_hash = txin.prevout.txid.hex()
          152         prevout_n = txin.prevout.out_idx
          153         for addr in self.db.get_txo_addresses(prevout_hash):
          154             d = self.db.get_txo_addr(prevout_hash, addr)
          155             if prevout_n in d:
          156                 return addr
          157         tx = self.db.get_transaction(prevout_hash)
          158         if tx:
          159             return tx.outputs()[prevout_n].address
          160         return None
          161 
          162     def get_txin_value(self, txin: TxInput, *, address: str = None) -> Optional[int]:
          163         if txin.value_sats() is not None:
          164             return txin.value_sats()
          165         prevout_hash = txin.prevout.txid.hex()
          166         prevout_n = txin.prevout.out_idx
          167         if address is None:
          168             address = self.get_txin_address(txin)
          169         if address:
          170             d = self.db.get_txo_addr(prevout_hash, address)
          171             try:
          172                 v, cb = d[prevout_n]
          173                 return v
          174             except KeyError:
          175                 pass
          176         tx = self.db.get_transaction(prevout_hash)
          177         if tx:
          178             return tx.outputs()[prevout_n].value
          179         return None
          180 
          181     def get_txout_address(self, txo: TxOutput) -> Optional[str]:
          182         return txo.address
          183 
          184     def load_unverified_transactions(self):
          185         # review transactions that are in the history
          186         for addr in self.db.get_history():
          187             hist = self.db.get_addr_history(addr)
          188             for tx_hash, tx_height in hist:
          189                 # add it in case it was previously unconfirmed
          190                 self.add_unverified_tx(tx_hash, tx_height)
          191 
          192     def start_network(self, network: Optional['Network']) -> None:
          193         self.network = network
          194         if self.network is not None:
          195             self.synchronizer = Synchronizer(self)
          196             self.verifier = SPV(self.network, self)
          197             util.register_callback(self.on_blockchain_updated, ['blockchain_updated'])
          198 
          199     def on_blockchain_updated(self, event, *args):
          200         self._get_addr_balance_cache = {}  # invalidate cache
          201 
          202     async def stop(self):
          203         if self.network:
          204             try:
          205                 async with TaskGroup() as group:
          206                     if self.synchronizer:
          207                         await group.spawn(self.synchronizer.stop())
          208                     if self.verifier:
          209                         await group.spawn(self.verifier.stop())
          210             finally:  # even if we get cancelled
          211                 self.synchronizer = None
          212                 self.verifier = None
          213                 util.unregister_callback(self.on_blockchain_updated)
          214                 self.db.put('stored_height', self.get_local_height())
          215 
          216     def add_address(self, address):
          217         if not self.db.get_addr_history(address):
          218             self.db.history[address] = []
          219             self.set_up_to_date(False)
          220         if self.synchronizer:
          221             self.synchronizer.add(address)
          222 
          223     def get_conflicting_transactions(self, tx_hash, tx: Transaction, include_self=False):
          224         """Returns a set of transaction hashes from the wallet history that are
          225         directly conflicting with tx, i.e. they have common outpoints being
          226         spent with tx.
          227 
          228         include_self specifies whether the tx itself should be reported as a
          229         conflict (if already in wallet history)
          230         """
          231         conflicting_txns = set()
          232         with self.transaction_lock:
          233             for txin in tx.inputs():
          234                 if txin.is_coinbase_input():
          235                     continue
          236                 prevout_hash = txin.prevout.txid.hex()
          237                 prevout_n = txin.prevout.out_idx
          238                 spending_tx_hash = self.db.get_spent_outpoint(prevout_hash, prevout_n)
          239                 if spending_tx_hash is None:
          240                     continue
          241                 # this outpoint has already been spent, by spending_tx
          242                 # annoying assert that has revealed several bugs over time:
          243                 assert self.db.get_transaction(spending_tx_hash), "spending tx not in wallet db"
          244                 conflicting_txns |= {spending_tx_hash}
          245             if tx_hash in conflicting_txns:
          246                 # this tx is already in history, so it conflicts with itself
          247                 if len(conflicting_txns) > 1:
          248                     raise Exception('Found conflicting transactions already in wallet history.')
          249                 if not include_self:
          250                     conflicting_txns -= {tx_hash}
          251             return conflicting_txns
          252 
          253     def add_transaction(self, tx: Transaction, *, allow_unrelated=False) -> bool:
          254         """Returns whether the tx was successfully added to the wallet history."""
          255         assert tx, tx
          256         # note: tx.is_complete() is not necessarily True; tx might be partial
          257         # but it *needs* to have a txid:
          258         tx_hash = tx.txid()
          259         if tx_hash is None:
          260             raise Exception("cannot add tx without txid to wallet history")
          261         # we need self.transaction_lock but get_tx_height will take self.lock
          262         # so we need to take that too here, to enforce order of locks
          263         with self.lock, self.transaction_lock:
          264             # NOTE: returning if tx in self.transactions might seem like a good idea
          265             # BUT we track is_mine inputs in a txn, and during subsequent calls
          266             # of add_transaction tx, we might learn of more-and-more inputs of
          267             # being is_mine, as we roll the gap_limit forward
          268             is_coinbase = tx.inputs()[0].is_coinbase_input()
          269             tx_height = self.get_tx_height(tx_hash).height
          270             if not allow_unrelated:
          271                 # note that during sync, if the transactions are not properly sorted,
          272                 # it could happen that we think tx is unrelated but actually one of the inputs is is_mine.
          273                 # this is the main motivation for allow_unrelated
          274                 is_mine = any([self.is_mine(self.get_txin_address(txin)) for txin in tx.inputs()])
          275                 is_for_me = any([self.is_mine(self.get_txout_address(txo)) for txo in tx.outputs()])
          276                 if not is_mine and not is_for_me:
          277                     raise UnrelatedTransactionException()
          278             # Find all conflicting transactions.
          279             # In case of a conflict,
          280             #     1. confirmed > mempool > local
          281             #     2. this new txn has priority over existing ones
          282             # When this method exits, there must NOT be any conflict, so
          283             # either keep this txn and remove all conflicting (along with dependencies)
          284             #     or drop this txn
          285             conflicting_txns = self.get_conflicting_transactions(tx_hash, tx)
          286             if conflicting_txns:
          287                 existing_mempool_txn = any(
          288                     self.get_tx_height(tx_hash2).height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT)
          289                     for tx_hash2 in conflicting_txns)
          290                 existing_confirmed_txn = any(
          291                     self.get_tx_height(tx_hash2).height > 0
          292                     for tx_hash2 in conflicting_txns)
          293                 if existing_confirmed_txn and tx_height <= 0:
          294                     # this is a non-confirmed tx that conflicts with confirmed txns; drop.
          295                     return False
          296                 if existing_mempool_txn and tx_height == TX_HEIGHT_LOCAL:
          297                     # this is a local tx that conflicts with non-local txns; drop.
          298                     return False
          299                 # keep this txn and remove all conflicting
          300                 for tx_hash2 in conflicting_txns:
          301                     self.remove_transaction(tx_hash2)
          302             # add inputs
          303             def add_value_from_prev_output():
          304                 # note: this takes linear time in num is_mine outputs of prev_tx
          305                 addr = self.get_txin_address(txi)
          306                 if addr and self.is_mine(addr):
          307                     outputs = self.db.get_txo_addr(prevout_hash, addr)
          308                     try:
          309                         v, is_cb = outputs[prevout_n]
          310                     except KeyError:
          311                         pass
          312                     else:
          313                         self.db.add_txi_addr(tx_hash, addr, ser, v)
          314                         self._get_addr_balance_cache.pop(addr, None)  # invalidate cache
          315             for txi in tx.inputs():
          316                 if txi.is_coinbase_input():
          317                     continue
          318                 prevout_hash = txi.prevout.txid.hex()
          319                 prevout_n = txi.prevout.out_idx
          320                 ser = txi.prevout.to_str()
          321                 self.db.set_spent_outpoint(prevout_hash, prevout_n, tx_hash)
          322                 add_value_from_prev_output()
          323             # add outputs
          324             for n, txo in enumerate(tx.outputs()):
          325                 v = txo.value
          326                 ser = tx_hash + ':%d'%n
          327                 scripthash = bitcoin.script_to_scripthash(txo.scriptpubkey.hex())
          328                 self.db.add_prevout_by_scripthash(scripthash, prevout=TxOutpoint.from_str(ser), value=v)
          329                 addr = self.get_txout_address(txo)
          330                 if addr and self.is_mine(addr):
          331                     self.db.add_txo_addr(tx_hash, addr, n, v, is_coinbase)
          332                     self._get_addr_balance_cache.pop(addr, None)  # invalidate cache
          333                     # give v to txi that spends me
          334                     next_tx = self.db.get_spent_outpoint(tx_hash, n)
          335                     if next_tx is not None:
          336                         self.db.add_txi_addr(next_tx, addr, ser, v)
          337                         self._add_tx_to_local_history(next_tx)
          338             # add to local history
          339             self._add_tx_to_local_history(tx_hash)
          340             # save
          341             self.db.add_transaction(tx_hash, tx)
          342             self.db.add_num_inputs_to_tx(tx_hash, len(tx.inputs()))
          343             return True
          344 
          345     def remove_transaction(self, tx_hash: str) -> None:
          346         """Removes a transaction AND all its dependents/children
          347         from the wallet history.
          348         """
          349         with self.lock, self.transaction_lock:
          350             to_remove = {tx_hash}
          351             to_remove |= self.get_depending_transactions(tx_hash)
          352             for txid in to_remove:
          353                 self._remove_transaction(txid)
          354 
          355     def _remove_transaction(self, tx_hash: str) -> None:
          356         """Removes a single transaction from the wallet history, and attempts
          357          to undo all effects of the tx (spending inputs, creating outputs, etc).
          358         """
          359         def remove_from_spent_outpoints():
          360             # undo spends in spent_outpoints
          361             if tx is not None:
          362                 # if we have the tx, this branch is faster
          363                 for txin in tx.inputs():
          364                     if txin.is_coinbase_input():
          365                         continue
          366                     prevout_hash = txin.prevout.txid.hex()
          367                     prevout_n = txin.prevout.out_idx
          368                     self.db.remove_spent_outpoint(prevout_hash, prevout_n)
          369             else:
          370                 # expensive but always works
          371                 for prevout_hash, prevout_n in self.db.list_spent_outpoints():
          372                     spending_txid = self.db.get_spent_outpoint(prevout_hash, prevout_n)
          373                     if spending_txid == tx_hash:
          374                         self.db.remove_spent_outpoint(prevout_hash, prevout_n)
          375 
          376         with self.lock, self.transaction_lock:
          377             self.logger.info(f"removing tx from history {tx_hash}")
          378             tx = self.db.remove_transaction(tx_hash)
          379             remove_from_spent_outpoints()
          380             self._remove_tx_from_local_history(tx_hash)
          381             for addr in itertools.chain(self.db.get_txi_addresses(tx_hash), self.db.get_txo_addresses(tx_hash)):
          382                 self._get_addr_balance_cache.pop(addr, None)  # invalidate cache
          383             self.db.remove_txi(tx_hash)
          384             self.db.remove_txo(tx_hash)
          385             self.db.remove_tx_fee(tx_hash)
          386             self.db.remove_verified_tx(tx_hash)
          387             self.unverified_tx.pop(tx_hash, None)
          388             if tx:
          389                 for idx, txo in enumerate(tx.outputs()):
          390                     scripthash = bitcoin.script_to_scripthash(txo.scriptpubkey.hex())
          391                     prevout = TxOutpoint(bfh(tx_hash), idx)
          392                     self.db.remove_prevout_by_scripthash(scripthash, prevout=prevout, value=txo.value)
          393 
          394     def get_depending_transactions(self, tx_hash: str) -> Set[str]:
          395         """Returns all (grand-)children of tx_hash in this wallet."""
          396         with self.transaction_lock:
          397             children = set()
          398             for n in self.db.get_spent_outpoints(tx_hash):
          399                 other_hash = self.db.get_spent_outpoint(tx_hash, n)
          400                 children.add(other_hash)
          401                 children |= self.get_depending_transactions(other_hash)
          402             return children
          403 
          404     def receive_tx_callback(self, tx_hash: str, tx: Transaction, tx_height: int) -> None:
          405         self.add_unverified_tx(tx_hash, tx_height)
          406         self.add_transaction(tx, allow_unrelated=True)
          407 
          408     def receive_history_callback(self, addr: str, hist, tx_fees: Dict[str, int]):
          409         with self.lock:
          410             old_hist = self.get_address_history(addr)
          411             for tx_hash, height in old_hist:
          412                 if (tx_hash, height) not in hist:
          413                     # make tx local
          414                     self.unverified_tx.pop(tx_hash, None)
          415                     self.db.remove_verified_tx(tx_hash)
          416                     if self.verifier:
          417                         self.verifier.remove_spv_proof_for_tx(tx_hash)
          418             self.db.set_addr_history(addr, hist)
          419 
          420         for tx_hash, tx_height in hist:
          421             # add it in case it was previously unconfirmed
          422             self.add_unverified_tx(tx_hash, tx_height)
          423             # if addr is new, we have to recompute txi and txo
          424             tx = self.db.get_transaction(tx_hash)
          425             if tx is None:
          426                 continue
          427             self.add_transaction(tx, allow_unrelated=True)
          428 
          429         # Store fees
          430         for tx_hash, fee_sat in tx_fees.items():
          431             self.db.add_tx_fee_from_server(tx_hash, fee_sat)
          432 
          433     @profiler
          434     def load_local_history(self):
          435         self._history_local = {}  # type: Dict[str, Set[str]]  # address -> set(txid)
          436         self._address_history_changed_events = defaultdict(asyncio.Event)  # address -> Event
          437         for txid in itertools.chain(self.db.list_txi(), self.db.list_txo()):
          438             self._add_tx_to_local_history(txid)
          439 
          440     @profiler
          441     def check_history(self):
          442         hist_addrs_mine = list(filter(lambda k: self.is_mine(k), self.db.get_history()))
          443         hist_addrs_not_mine = list(filter(lambda k: not self.is_mine(k), self.db.get_history()))
          444         for addr in hist_addrs_not_mine:
          445             self.db.remove_addr_history(addr)
          446         for addr in hist_addrs_mine:
          447             hist = self.db.get_addr_history(addr)
          448             for tx_hash, tx_height in hist:
          449                 if self.db.get_txi_addresses(tx_hash) or self.db.get_txo_addresses(tx_hash):
          450                     continue
          451                 tx = self.db.get_transaction(tx_hash)
          452                 if tx is not None:
          453                     self.add_transaction(tx, allow_unrelated=True)
          454 
          455     def remove_local_transactions_we_dont_have(self):
          456         for txid in itertools.chain(self.db.list_txi(), self.db.list_txo()):
          457             tx_height = self.get_tx_height(txid).height
          458             if tx_height == TX_HEIGHT_LOCAL and not self.db.get_transaction(txid):
          459                 self.remove_transaction(txid)
          460 
          461     def clear_history(self):
          462         with self.lock:
          463             with self.transaction_lock:
          464                 self.db.clear_history()
          465                 self._history_local.clear()
          466                 self._get_addr_balance_cache = {}  # invalidate cache
          467 
          468     def get_txpos(self, tx_hash):
          469         """Returns (height, txpos) tuple, even if the tx is unverified."""
          470         with self.lock:
          471             verified_tx_mined_info = self.db.get_verified_tx(tx_hash)
          472             if verified_tx_mined_info:
          473                 return verified_tx_mined_info.height, verified_tx_mined_info.txpos
          474             elif tx_hash in self.unverified_tx:
          475                 height = self.unverified_tx[tx_hash]
          476                 return (height, -1) if height > 0 else ((1e9 - height), -1)
          477             else:
          478                 return (1e9+1, -1)
          479 
          480     def with_local_height_cached(func):
          481         # get local height only once, as it's relatively expensive.
          482         # take care that nested calls work as expected
          483         def f(self, *args, **kwargs):
          484             orig_val = getattr(self.threadlocal_cache, 'local_height', None)
          485             self.threadlocal_cache.local_height = orig_val or self.get_local_height()
          486             try:
          487                 return func(self, *args, **kwargs)
          488             finally:
          489                 self.threadlocal_cache.local_height = orig_val
          490         return f
          491 
          492     @with_lock
          493     @with_transaction_lock
          494     @with_local_height_cached
          495     def get_history(self, *, domain=None) -> Sequence[HistoryItem]:
          496         # get domain
          497         if domain is None:
          498             domain = self.get_addresses()
          499         domain = set(domain)
          500         # 1. Get the history of each address in the domain, maintain the
          501         #    delta of a tx as the sum of its deltas on domain addresses
          502         tx_deltas = defaultdict(int)  # type: Dict[str, int]
          503         for addr in domain:
          504             h = self.get_address_history(addr)
          505             for tx_hash, height in h:
          506                 tx_deltas[tx_hash] += self.get_tx_delta(tx_hash, addr)
          507         # 2. create sorted history
          508         history = []
          509         for tx_hash in tx_deltas:
          510             delta = tx_deltas[tx_hash]
          511             tx_mined_status = self.get_tx_height(tx_hash)
          512             fee = self.get_tx_fee(tx_hash)
          513             history.append((tx_hash, tx_mined_status, delta, fee))
          514         history.sort(key = lambda x: self.get_txpos(x[0]), reverse=True)
          515         # 3. add balance
          516         c, u, x = self.get_balance(domain)
          517         balance = c + u + x
          518         h2 = []
          519         for tx_hash, tx_mined_status, delta, fee in history:
          520             h2.append(HistoryItem(txid=tx_hash,
          521                                   tx_mined_status=tx_mined_status,
          522                                   delta=delta,
          523                                   fee=fee,
          524                                   balance=balance))
          525             balance -= delta
          526         h2.reverse()
          527 
          528         if balance != 0:
          529             raise Exception("wallet.get_history() failed balance sanity-check")
          530 
          531         return h2
          532 
          533     def _add_tx_to_local_history(self, txid):
          534         with self.transaction_lock:
          535             for addr in itertools.chain(self.db.get_txi_addresses(txid), self.db.get_txo_addresses(txid)):
          536                 cur_hist = self._history_local.get(addr, set())
          537                 cur_hist.add(txid)
          538                 self._history_local[addr] = cur_hist
          539                 self._mark_address_history_changed(addr)
          540 
          541     def _remove_tx_from_local_history(self, txid):
          542         with self.transaction_lock:
          543             for addr in itertools.chain(self.db.get_txi_addresses(txid), self.db.get_txo_addresses(txid)):
          544                 cur_hist = self._history_local.get(addr, set())
          545                 try:
          546                     cur_hist.remove(txid)
          547                 except KeyError:
          548                     pass
          549                 else:
          550                     self._history_local[addr] = cur_hist
          551 
          552     def _mark_address_history_changed(self, addr: str) -> None:
          553         # history for this address changed, wake up coroutines:
          554         self._address_history_changed_events[addr].set()
          555         # clear event immediately so that coroutines can wait() for the next change:
          556         self._address_history_changed_events[addr].clear()
          557 
          558     async def wait_for_address_history_to_change(self, addr: str) -> None:
          559         """Wait until the server tells us about a new transaction related to addr.
          560 
          561         Unconfirmed and confirmed transactions are not distinguished, and so e.g. SPV
          562         is not taken into account.
          563         """
          564         assert self.is_mine(addr), "address needs to be is_mine to be watched"
          565         await self._address_history_changed_events[addr].wait()
          566 
          567     def add_unverified_tx(self, tx_hash, tx_height):
          568         if self.db.is_in_verified_tx(tx_hash):
          569             if tx_height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT):
          570                 with self.lock:
          571                     self.db.remove_verified_tx(tx_hash)
          572                 if self.verifier:
          573                     self.verifier.remove_spv_proof_for_tx(tx_hash)
          574         else:
          575             with self.lock:
          576                 # tx will be verified only if height > 0
          577                 self.unverified_tx[tx_hash] = tx_height
          578 
          579     def remove_unverified_tx(self, tx_hash, tx_height):
          580         with self.lock:
          581             new_height = self.unverified_tx.get(tx_hash)
          582             if new_height == tx_height:
          583                 self.unverified_tx.pop(tx_hash, None)
          584 
          585     def add_verified_tx(self, tx_hash: str, info: TxMinedInfo):
          586         # Remove from the unverified map and add to the verified map
          587         with self.lock:
          588             self.unverified_tx.pop(tx_hash, None)
          589             self.db.add_verified_tx(tx_hash, info)
          590         tx_mined_status = self.get_tx_height(tx_hash)
          591         util.trigger_callback('verified', self, tx_hash, tx_mined_status)
          592 
          593     def get_unverified_txs(self):
          594         '''Returns a map from tx hash to transaction height'''
          595         with self.lock:
          596             return dict(self.unverified_tx)  # copy
          597 
          598     def undo_verifications(self, blockchain, above_height):
          599         '''Used by the verifier when a reorg has happened'''
          600         txs = set()
          601         with self.lock:
          602             for tx_hash in self.db.list_verified_tx():
          603                 info = self.db.get_verified_tx(tx_hash)
          604                 tx_height = info.height
          605                 if tx_height > above_height:
          606                     header = blockchain.read_header(tx_height)
          607                     if not header or hash_header(header) != info.header_hash:
          608                         self.db.remove_verified_tx(tx_hash)
          609                         # NOTE: we should add these txns to self.unverified_tx,
          610                         # but with what height?
          611                         # If on the new fork after the reorg, the txn is at the
          612                         # same height, we will not get a status update for the
          613                         # address. If the txn is not mined or at a diff height,
          614                         # we should get a status update. Unless we put tx into
          615                         # unverified_tx, it will turn into local. So we put it
          616                         # into unverified_tx with the old height, and if we get
          617                         # a status update, that will overwrite it.
          618                         self.unverified_tx[tx_hash] = tx_height
          619                         txs.add(tx_hash)
          620         return txs
          621 
          622     def get_local_height(self) -> int:
          623         """ return last known height if we are offline """
          624         cached_local_height = getattr(self.threadlocal_cache, 'local_height', None)
          625         if cached_local_height is not None:
          626             return cached_local_height
          627         return self.network.get_local_height() if self.network else self.db.get('stored_height', 0)
          628 
          629     def add_future_tx(self, tx: Transaction, num_blocks: int) -> bool:
          630         assert num_blocks > 0, num_blocks
          631         with self.lock:
          632             tx_was_added = self.add_transaction(tx)
          633             if tx_was_added:
          634                 self.future_tx[tx.txid()] = num_blocks
          635             return tx_was_added
          636 
          637     def get_tx_height(self, tx_hash: str) -> TxMinedInfo:
          638         if tx_hash is None:  # ugly backwards compat...
          639             return TxMinedInfo(height=TX_HEIGHT_LOCAL, conf=0)
          640         with self.lock:
          641             verified_tx_mined_info = self.db.get_verified_tx(tx_hash)
          642             if verified_tx_mined_info:
          643                 conf = max(self.get_local_height() - verified_tx_mined_info.height + 1, 0)
          644                 return verified_tx_mined_info._replace(conf=conf)
          645             elif tx_hash in self.unverified_tx:
          646                 height = self.unverified_tx[tx_hash]
          647                 return TxMinedInfo(height=height, conf=0)
          648             elif tx_hash in self.future_tx:
          649                 num_blocks_remainining = self.future_tx[tx_hash]
          650                 assert num_blocks_remainining > 0, num_blocks_remainining
          651                 return TxMinedInfo(height=TX_HEIGHT_FUTURE, conf=-num_blocks_remainining)
          652             else:
          653                 # local transaction
          654                 return TxMinedInfo(height=TX_HEIGHT_LOCAL, conf=0)
          655 
          656     def set_up_to_date(self, up_to_date):
          657         with self.lock:
          658             status_changed = self.up_to_date != up_to_date
          659             self.up_to_date = up_to_date
          660         if self.network:
          661             self.network.notify('status')
          662         if status_changed:
          663             self.logger.info(f'set_up_to_date: {up_to_date}')
          664 
          665     def is_up_to_date(self):
          666         with self.lock: return self.up_to_date
          667 
          668     def get_history_sync_state_details(self) -> Tuple[int, int]:
          669         if self.synchronizer:
          670             return self.synchronizer.num_requests_sent_and_answered()
          671         else:
          672             return 0, 0
          673 
          674     @with_transaction_lock
          675     def get_tx_delta(self, tx_hash: str, address: str) -> int:
          676         """effect of tx on address"""
          677         delta = 0
          678         # subtract the value of coins sent from address
          679         d = self.db.get_txi_addr(tx_hash, address)
          680         for n, v in d:
          681             delta -= v
          682         # add the value of the coins received at address
          683         d = self.db.get_txo_addr(tx_hash, address)
          684         for n, (v, cb) in d.items():
          685             delta += v
          686         return delta
          687 
          688     def get_wallet_delta(self, tx: Transaction) -> TxWalletDelta:
          689         """effect of tx on wallet"""
          690         is_relevant = False  # "related to wallet?"
          691         num_input_ismine = 0
          692         v_in = v_in_mine = v_out = v_out_mine = 0
          693         with self.lock, self.transaction_lock:
          694             for txin in tx.inputs():
          695                 addr = self.get_txin_address(txin)
          696                 value = self.get_txin_value(txin, address=addr)
          697                 if self.is_mine(addr):
          698                     num_input_ismine += 1
          699                     is_relevant = True
          700                     assert value is not None
          701                     v_in_mine += value
          702                 if value is None:
          703                     v_in = None
          704                 elif v_in is not None:
          705                     v_in += value
          706             for txout in tx.outputs():
          707                 v_out += txout.value
          708                 if self.is_mine(txout.address):
          709                     v_out_mine += txout.value
          710                     is_relevant = True
          711         delta = v_out_mine - v_in_mine
          712         if v_in is not None:
          713             fee = v_in - v_out
          714         else:
          715             fee = None
          716         if fee is None and isinstance(tx, PartialTransaction):
          717             fee = tx.get_fee()
          718         return TxWalletDelta(
          719             is_relevant=is_relevant,
          720             is_any_input_ismine=num_input_ismine > 0,
          721             is_all_input_ismine=num_input_ismine == len(tx.inputs()),
          722             delta=delta,
          723             fee=fee,
          724         )
          725 
          726     def get_tx_fee(self, txid: str) -> Optional[int]:
          727         """ Returns tx_fee or None. Use server fee only if tx is unconfirmed and not mine"""
          728         # check if stored fee is available
          729         fee = self.db.get_tx_fee(txid, trust_server=False)
          730         if fee is not None:
          731             return fee
          732         # delete server-sent fee for confirmed txns
          733         confirmed = self.get_tx_height(txid).conf > 0
          734         if confirmed:
          735             self.db.add_tx_fee_from_server(txid, None)
          736         # if all inputs are ismine, try to calc fee now;
          737         # otherwise, return stored value
          738         num_all_inputs = self.db.get_num_all_inputs_of_tx(txid)
          739         if num_all_inputs is not None:
          740             # check if tx is mine
          741             num_ismine_inputs = self.db.get_num_ismine_inputs_of_tx(txid)
          742             assert num_ismine_inputs <= num_all_inputs, (num_ismine_inputs, num_all_inputs)
          743             # trust server if tx is unconfirmed and not mine
          744             if num_ismine_inputs < num_all_inputs:
          745                 return None if confirmed else self.db.get_tx_fee(txid, trust_server=True)
          746         # lookup tx and deserialize it.
          747         # note that deserializing is expensive, hence above hacks
          748         tx = self.db.get_transaction(txid)
          749         if not tx:
          750             return None
          751         fee = self.get_wallet_delta(tx).fee
          752         # save result
          753         self.db.add_tx_fee_we_calculated(txid, fee)
          754         self.db.add_num_inputs_to_tx(txid, len(tx.inputs()))
          755         return fee
          756 
          757     def get_addr_io(self, address):
          758         with self.lock, self.transaction_lock:
          759             h = self.get_address_history(address)
          760             received = {}
          761             sent = {}
          762             for tx_hash, height in h:
          763                 d = self.db.get_txo_addr(tx_hash, address)
          764                 for n, (v, is_cb) in d.items():
          765                     received[tx_hash + ':%d'%n] = (height, v, is_cb)
          766             for tx_hash, height in h:
          767                 l = self.db.get_txi_addr(tx_hash, address)
          768                 for txi, v in l:
          769                     sent[txi] = height
          770         return received, sent
          771 
          772 
          773     def get_addr_outputs(self, address: str) -> Dict[TxOutpoint, PartialTxInput]:
          774         coins, spent = self.get_addr_io(address)
          775         out = {}
          776         for prevout_str, v in coins.items():
          777             tx_height, value, is_cb = v
          778             prevout = TxOutpoint.from_str(prevout_str)
          779             utxo = PartialTxInput(prevout=prevout, is_coinbase_output=is_cb)
          780             utxo._trusted_address = address
          781             utxo._trusted_value_sats = value
          782             utxo.block_height = tx_height
          783             utxo.spent_height = spent.get(prevout_str, None)
          784             out[prevout] = utxo
          785         return out
          786 
          787     def get_addr_utxo(self, address: str) -> Dict[TxOutpoint, PartialTxInput]:
          788         out = self.get_addr_outputs(address)
          789         for k, v in list(out.items()):
          790             if v.spent_height is not None:
          791                 out.pop(k)
          792         return out
          793 
          794     # return the total amount ever received by an address
          795     def get_addr_received(self, address):
          796         received, sent = self.get_addr_io(address)
          797         return sum([v for height, v, is_cb in received.values()])
          798 
          799     @with_local_height_cached
          800     def get_addr_balance(self, address, *, excluded_coins: Set[str] = None) -> Tuple[int, int, int]:
          801         """Return the balance of a bitcoin address:
          802         confirmed and matured, unconfirmed, unmatured
          803         """
          804         if not excluded_coins:  # cache is only used if there are no excluded_coins
          805             cached_value = self._get_addr_balance_cache.get(address)
          806             if cached_value:
          807                 return cached_value
          808         if excluded_coins is None:
          809             excluded_coins = set()
          810         assert isinstance(excluded_coins, set), f"excluded_coins should be set, not {type(excluded_coins)}"
          811         received, sent = self.get_addr_io(address)
          812         c = u = x = 0
          813         mempool_height = self.get_local_height() + 1  # height of next block
          814         for txo, (tx_height, v, is_cb) in received.items():
          815             if txo in excluded_coins:
          816                 continue
          817             if is_cb and tx_height + COINBASE_MATURITY > mempool_height:
          818                 x += v
          819             elif tx_height > 0:
          820                 c += v
          821             else:
          822                 u += v
          823             if txo in sent:
          824                 if sent[txo] > 0:
          825                     c -= v
          826                 else:
          827                     u -= v
          828         result = c, u, x
          829         # cache result.
          830         if not excluded_coins:
          831             # Cache needs to be invalidated if a transaction is added to/
          832             # removed from history; or on new blocks (maturity...)
          833             self._get_addr_balance_cache[address] = result
          834         return result
          835 
          836     @with_local_height_cached
          837     def get_utxos(self, domain=None, *, excluded_addresses=None,
          838                   mature_only: bool = False, confirmed_only: bool = False,
          839                   nonlocal_only: bool = False) -> Sequence[PartialTxInput]:
          840         coins = []
          841         if domain is None:
          842             domain = self.get_addresses()
          843         domain = set(domain)
          844         if excluded_addresses:
          845             domain = set(domain) - set(excluded_addresses)
          846         mempool_height = self.get_local_height() + 1  # height of next block
          847         for addr in domain:
          848             utxos = self.get_addr_utxo(addr)
          849             for utxo in utxos.values():
          850                 if confirmed_only and utxo.block_height <= 0:
          851                     continue
          852                 if nonlocal_only and utxo.block_height == TX_HEIGHT_LOCAL:
          853                     continue
          854                 if (mature_only and utxo.is_coinbase_output()
          855                         and utxo.block_height + COINBASE_MATURITY > mempool_height):
          856                     continue
          857                 coins.append(utxo)
          858                 continue
          859         return coins
          860 
          861     def get_balance(self, domain=None, *, excluded_addresses: Set[str] = None,
          862                     excluded_coins: Set[str] = None) -> Tuple[int, int, int]:
          863         if domain is None:
          864             domain = self.get_addresses()
          865         if excluded_addresses is None:
          866             excluded_addresses = set()
          867         assert isinstance(excluded_addresses, set), f"excluded_addresses should be set, not {type(excluded_addresses)}"
          868         domain = set(domain) - excluded_addresses
          869         cc = uu = xx = 0
          870         for addr in domain:
          871             c, u, x = self.get_addr_balance(addr, excluded_coins=excluded_coins)
          872             cc += c
          873             uu += u
          874             xx += x
          875         return cc, uu, xx
          876 
          877     def is_used(self, address: str) -> bool:
          878         return self.get_address_history_len(address) != 0
          879 
          880     def is_empty(self, address: str) -> bool:
          881         c, u, x = self.get_addr_balance(address)
          882         return c+u+x == 0
          883 
          884     def synchronize(self):
          885         pass