taddress_synchronizer.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
taddress_synchronizer.py (38895B)
---
1 # Electrum - lightweight Bitcoin client
2 # Copyright (C) 2018 The Electrum Developers
3 #
4 # Permission is hereby granted, free of charge, to any person
5 # obtaining a copy of this software and associated documentation files
6 # (the "Software"), to deal in the Software without restriction,
7 # including without limitation the rights to use, copy, modify, merge,
8 # publish, distribute, sublicense, and/or sell copies of the Software,
9 # and to permit persons to whom the Software is furnished to do so,
10 # subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be
13 # included in all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 # SOFTWARE.
23
24 import asyncio
25 import threading
26 import asyncio
27 import itertools
28 from collections import defaultdict
29 from typing import TYPE_CHECKING, Dict, Optional, Set, Tuple, NamedTuple, Sequence, List
30
31 from aiorpcx import TaskGroup
32
33 from . import bitcoin, util
34 from .bitcoin import COINBASE_MATURITY
35 from .util import profiler, bfh, TxMinedInfo, UnrelatedTransactionException
36 from .transaction import Transaction, TxOutput, TxInput, PartialTxInput, TxOutpoint, PartialTransaction
37 from .synchronizer import Synchronizer
38 from .verifier import SPV
39 from .blockchain import hash_header
40 from .i18n import _
41 from .logging import Logger
42
43 if TYPE_CHECKING:
44 from .network import Network
45 from .wallet_db import WalletDB
46
47
48 TX_HEIGHT_FUTURE = -3
49 TX_HEIGHT_LOCAL = -2
50 TX_HEIGHT_UNCONF_PARENT = -1
51 TX_HEIGHT_UNCONFIRMED = 0
52
53
54 class HistoryItem(NamedTuple):
55 txid: str
56 tx_mined_status: TxMinedInfo
57 delta: int
58 fee: Optional[int]
59 balance: int
60
61
62 class TxWalletDelta(NamedTuple):
63 is_relevant: bool # "related to wallet?"
64 is_any_input_ismine: bool
65 is_all_input_ismine: bool
66 delta: int
67 fee: Optional[int]
68
69
70 class AddressSynchronizer(Logger):
71 """
72 inherited by wallet
73 """
74
75 network: Optional['Network']
76 synchronizer: Optional['Synchronizer']
77 verifier: Optional['SPV']
78
79 def __init__(self, db: 'WalletDB'):
80 self.db = db
81 self.network = None
82 Logger.__init__(self)
83 # verifier (SPV) and synchronizer are started in start_network
84 self.synchronizer = None
85 self.verifier = None
86 # locks: if you need to take multiple ones, acquire them in the order they are defined here!
87 self.lock = threading.RLock()
88 self.transaction_lock = threading.RLock()
89 self.future_tx = {} # type: Dict[str, int] # txid -> blocks remaining
90 # Transactions pending verification. txid -> tx_height. Access with self.lock.
91 self.unverified_tx = defaultdict(int)
92 # true when synchronized
93 self.up_to_date = False
94 # thread local storage for caching stuff
95 self.threadlocal_cache = threading.local()
96
97 self._get_addr_balance_cache = {}
98
99 self.load_and_cleanup()
100
101 def with_lock(func):
102 def func_wrapper(self: 'AddressSynchronizer', *args, **kwargs):
103 with self.lock:
104 return func(self, *args, **kwargs)
105 return func_wrapper
106
107 def with_transaction_lock(func):
108 def func_wrapper(self: 'AddressSynchronizer', *args, **kwargs):
109 with self.transaction_lock:
110 return func(self, *args, **kwargs)
111 return func_wrapper
112
113 def load_and_cleanup(self):
114 self.load_local_history()
115 self.check_history()
116 self.load_unverified_transactions()
117 self.remove_local_transactions_we_dont_have()
118
119 def is_mine(self, address: Optional[str]) -> bool:
120 if not address: return False
121 return self.db.is_addr_in_history(address)
122
123 def get_addresses(self):
124 return sorted(self.db.get_history())
125
126 def get_address_history(self, addr: str) -> Sequence[Tuple[str, int]]:
127 """Returns the history for the address, in the format that would be returned by a server.
128
129 Note: The difference between db.get_addr_history and this method is that
130 db.get_addr_history stores the response from a server, so it only includes txns
131 a server sees, i.e. that does not contain local and future txns.
132 """
133 h = []
134 # we need self.transaction_lock but get_tx_height will take self.lock
135 # so we need to take that too here, to enforce order of locks
136 with self.lock, self.transaction_lock:
137 related_txns = self._history_local.get(addr, set())
138 for tx_hash in related_txns:
139 tx_height = self.get_tx_height(tx_hash).height
140 h.append((tx_hash, tx_height))
141 return h
142
143 def get_address_history_len(self, addr: str) -> int:
144 """Return number of transactions where address is involved."""
145 return len(self._history_local.get(addr, ()))
146
147 def get_txin_address(self, txin: TxInput) -> Optional[str]:
148 if isinstance(txin, PartialTxInput):
149 if txin.address:
150 return txin.address
151 prevout_hash = txin.prevout.txid.hex()
152 prevout_n = txin.prevout.out_idx
153 for addr in self.db.get_txo_addresses(prevout_hash):
154 d = self.db.get_txo_addr(prevout_hash, addr)
155 if prevout_n in d:
156 return addr
157 tx = self.db.get_transaction(prevout_hash)
158 if tx:
159 return tx.outputs()[prevout_n].address
160 return None
161
162 def get_txin_value(self, txin: TxInput, *, address: str = None) -> Optional[int]:
163 if txin.value_sats() is not None:
164 return txin.value_sats()
165 prevout_hash = txin.prevout.txid.hex()
166 prevout_n = txin.prevout.out_idx
167 if address is None:
168 address = self.get_txin_address(txin)
169 if address:
170 d = self.db.get_txo_addr(prevout_hash, address)
171 try:
172 v, cb = d[prevout_n]
173 return v
174 except KeyError:
175 pass
176 tx = self.db.get_transaction(prevout_hash)
177 if tx:
178 return tx.outputs()[prevout_n].value
179 return None
180
181 def get_txout_address(self, txo: TxOutput) -> Optional[str]:
182 return txo.address
183
184 def load_unverified_transactions(self):
185 # review transactions that are in the history
186 for addr in self.db.get_history():
187 hist = self.db.get_addr_history(addr)
188 for tx_hash, tx_height in hist:
189 # add it in case it was previously unconfirmed
190 self.add_unverified_tx(tx_hash, tx_height)
191
192 def start_network(self, network: Optional['Network']) -> None:
193 self.network = network
194 if self.network is not None:
195 self.synchronizer = Synchronizer(self)
196 self.verifier = SPV(self.network, self)
197 util.register_callback(self.on_blockchain_updated, ['blockchain_updated'])
198
199 def on_blockchain_updated(self, event, *args):
200 self._get_addr_balance_cache = {} # invalidate cache
201
202 async def stop(self):
203 if self.network:
204 try:
205 async with TaskGroup() as group:
206 if self.synchronizer:
207 await group.spawn(self.synchronizer.stop())
208 if self.verifier:
209 await group.spawn(self.verifier.stop())
210 finally: # even if we get cancelled
211 self.synchronizer = None
212 self.verifier = None
213 util.unregister_callback(self.on_blockchain_updated)
214 self.db.put('stored_height', self.get_local_height())
215
216 def add_address(self, address):
217 if not self.db.get_addr_history(address):
218 self.db.history[address] = []
219 self.set_up_to_date(False)
220 if self.synchronizer:
221 self.synchronizer.add(address)
222
223 def get_conflicting_transactions(self, tx_hash, tx: Transaction, include_self=False):
224 """Returns a set of transaction hashes from the wallet history that are
225 directly conflicting with tx, i.e. they have common outpoints being
226 spent with tx.
227
228 include_self specifies whether the tx itself should be reported as a
229 conflict (if already in wallet history)
230 """
231 conflicting_txns = set()
232 with self.transaction_lock:
233 for txin in tx.inputs():
234 if txin.is_coinbase_input():
235 continue
236 prevout_hash = txin.prevout.txid.hex()
237 prevout_n = txin.prevout.out_idx
238 spending_tx_hash = self.db.get_spent_outpoint(prevout_hash, prevout_n)
239 if spending_tx_hash is None:
240 continue
241 # this outpoint has already been spent, by spending_tx
242 # annoying assert that has revealed several bugs over time:
243 assert self.db.get_transaction(spending_tx_hash), "spending tx not in wallet db"
244 conflicting_txns |= {spending_tx_hash}
245 if tx_hash in conflicting_txns:
246 # this tx is already in history, so it conflicts with itself
247 if len(conflicting_txns) > 1:
248 raise Exception('Found conflicting transactions already in wallet history.')
249 if not include_self:
250 conflicting_txns -= {tx_hash}
251 return conflicting_txns
252
253 def add_transaction(self, tx: Transaction, *, allow_unrelated=False) -> bool:
254 """Returns whether the tx was successfully added to the wallet history."""
255 assert tx, tx
256 # note: tx.is_complete() is not necessarily True; tx might be partial
257 # but it *needs* to have a txid:
258 tx_hash = tx.txid()
259 if tx_hash is None:
260 raise Exception("cannot add tx without txid to wallet history")
261 # we need self.transaction_lock but get_tx_height will take self.lock
262 # so we need to take that too here, to enforce order of locks
263 with self.lock, self.transaction_lock:
264 # NOTE: returning if tx in self.transactions might seem like a good idea
265 # BUT we track is_mine inputs in a txn, and during subsequent calls
266 # of add_transaction tx, we might learn of more-and-more inputs of
267 # being is_mine, as we roll the gap_limit forward
268 is_coinbase = tx.inputs()[0].is_coinbase_input()
269 tx_height = self.get_tx_height(tx_hash).height
270 if not allow_unrelated:
271 # note that during sync, if the transactions are not properly sorted,
272 # it could happen that we think tx is unrelated but actually one of the inputs is is_mine.
273 # this is the main motivation for allow_unrelated
274 is_mine = any([self.is_mine(self.get_txin_address(txin)) for txin in tx.inputs()])
275 is_for_me = any([self.is_mine(self.get_txout_address(txo)) for txo in tx.outputs()])
276 if not is_mine and not is_for_me:
277 raise UnrelatedTransactionException()
278 # Find all conflicting transactions.
279 # In case of a conflict,
280 # 1. confirmed > mempool > local
281 # 2. this new txn has priority over existing ones
282 # When this method exits, there must NOT be any conflict, so
283 # either keep this txn and remove all conflicting (along with dependencies)
284 # or drop this txn
285 conflicting_txns = self.get_conflicting_transactions(tx_hash, tx)
286 if conflicting_txns:
287 existing_mempool_txn = any(
288 self.get_tx_height(tx_hash2).height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT)
289 for tx_hash2 in conflicting_txns)
290 existing_confirmed_txn = any(
291 self.get_tx_height(tx_hash2).height > 0
292 for tx_hash2 in conflicting_txns)
293 if existing_confirmed_txn and tx_height <= 0:
294 # this is a non-confirmed tx that conflicts with confirmed txns; drop.
295 return False
296 if existing_mempool_txn and tx_height == TX_HEIGHT_LOCAL:
297 # this is a local tx that conflicts with non-local txns; drop.
298 return False
299 # keep this txn and remove all conflicting
300 for tx_hash2 in conflicting_txns:
301 self.remove_transaction(tx_hash2)
302 # add inputs
303 def add_value_from_prev_output():
304 # note: this takes linear time in num is_mine outputs of prev_tx
305 addr = self.get_txin_address(txi)
306 if addr and self.is_mine(addr):
307 outputs = self.db.get_txo_addr(prevout_hash, addr)
308 try:
309 v, is_cb = outputs[prevout_n]
310 except KeyError:
311 pass
312 else:
313 self.db.add_txi_addr(tx_hash, addr, ser, v)
314 self._get_addr_balance_cache.pop(addr, None) # invalidate cache
315 for txi in tx.inputs():
316 if txi.is_coinbase_input():
317 continue
318 prevout_hash = txi.prevout.txid.hex()
319 prevout_n = txi.prevout.out_idx
320 ser = txi.prevout.to_str()
321 self.db.set_spent_outpoint(prevout_hash, prevout_n, tx_hash)
322 add_value_from_prev_output()
323 # add outputs
324 for n, txo in enumerate(tx.outputs()):
325 v = txo.value
326 ser = tx_hash + ':%d'%n
327 scripthash = bitcoin.script_to_scripthash(txo.scriptpubkey.hex())
328 self.db.add_prevout_by_scripthash(scripthash, prevout=TxOutpoint.from_str(ser), value=v)
329 addr = self.get_txout_address(txo)
330 if addr and self.is_mine(addr):
331 self.db.add_txo_addr(tx_hash, addr, n, v, is_coinbase)
332 self._get_addr_balance_cache.pop(addr, None) # invalidate cache
333 # give v to txi that spends me
334 next_tx = self.db.get_spent_outpoint(tx_hash, n)
335 if next_tx is not None:
336 self.db.add_txi_addr(next_tx, addr, ser, v)
337 self._add_tx_to_local_history(next_tx)
338 # add to local history
339 self._add_tx_to_local_history(tx_hash)
340 # save
341 self.db.add_transaction(tx_hash, tx)
342 self.db.add_num_inputs_to_tx(tx_hash, len(tx.inputs()))
343 return True
344
345 def remove_transaction(self, tx_hash: str) -> None:
346 """Removes a transaction AND all its dependents/children
347 from the wallet history.
348 """
349 with self.lock, self.transaction_lock:
350 to_remove = {tx_hash}
351 to_remove |= self.get_depending_transactions(tx_hash)
352 for txid in to_remove:
353 self._remove_transaction(txid)
354
355 def _remove_transaction(self, tx_hash: str) -> None:
356 """Removes a single transaction from the wallet history, and attempts
357 to undo all effects of the tx (spending inputs, creating outputs, etc).
358 """
359 def remove_from_spent_outpoints():
360 # undo spends in spent_outpoints
361 if tx is not None:
362 # if we have the tx, this branch is faster
363 for txin in tx.inputs():
364 if txin.is_coinbase_input():
365 continue
366 prevout_hash = txin.prevout.txid.hex()
367 prevout_n = txin.prevout.out_idx
368 self.db.remove_spent_outpoint(prevout_hash, prevout_n)
369 else:
370 # expensive but always works
371 for prevout_hash, prevout_n in self.db.list_spent_outpoints():
372 spending_txid = self.db.get_spent_outpoint(prevout_hash, prevout_n)
373 if spending_txid == tx_hash:
374 self.db.remove_spent_outpoint(prevout_hash, prevout_n)
375
376 with self.lock, self.transaction_lock:
377 self.logger.info(f"removing tx from history {tx_hash}")
378 tx = self.db.remove_transaction(tx_hash)
379 remove_from_spent_outpoints()
380 self._remove_tx_from_local_history(tx_hash)
381 for addr in itertools.chain(self.db.get_txi_addresses(tx_hash), self.db.get_txo_addresses(tx_hash)):
382 self._get_addr_balance_cache.pop(addr, None) # invalidate cache
383 self.db.remove_txi(tx_hash)
384 self.db.remove_txo(tx_hash)
385 self.db.remove_tx_fee(tx_hash)
386 self.db.remove_verified_tx(tx_hash)
387 self.unverified_tx.pop(tx_hash, None)
388 if tx:
389 for idx, txo in enumerate(tx.outputs()):
390 scripthash = bitcoin.script_to_scripthash(txo.scriptpubkey.hex())
391 prevout = TxOutpoint(bfh(tx_hash), idx)
392 self.db.remove_prevout_by_scripthash(scripthash, prevout=prevout, value=txo.value)
393
394 def get_depending_transactions(self, tx_hash: str) -> Set[str]:
395 """Returns all (grand-)children of tx_hash in this wallet."""
396 with self.transaction_lock:
397 children = set()
398 for n in self.db.get_spent_outpoints(tx_hash):
399 other_hash = self.db.get_spent_outpoint(tx_hash, n)
400 children.add(other_hash)
401 children |= self.get_depending_transactions(other_hash)
402 return children
403
404 def receive_tx_callback(self, tx_hash: str, tx: Transaction, tx_height: int) -> None:
405 self.add_unverified_tx(tx_hash, tx_height)
406 self.add_transaction(tx, allow_unrelated=True)
407
408 def receive_history_callback(self, addr: str, hist, tx_fees: Dict[str, int]):
409 with self.lock:
410 old_hist = self.get_address_history(addr)
411 for tx_hash, height in old_hist:
412 if (tx_hash, height) not in hist:
413 # make tx local
414 self.unverified_tx.pop(tx_hash, None)
415 self.db.remove_verified_tx(tx_hash)
416 if self.verifier:
417 self.verifier.remove_spv_proof_for_tx(tx_hash)
418 self.db.set_addr_history(addr, hist)
419
420 for tx_hash, tx_height in hist:
421 # add it in case it was previously unconfirmed
422 self.add_unverified_tx(tx_hash, tx_height)
423 # if addr is new, we have to recompute txi and txo
424 tx = self.db.get_transaction(tx_hash)
425 if tx is None:
426 continue
427 self.add_transaction(tx, allow_unrelated=True)
428
429 # Store fees
430 for tx_hash, fee_sat in tx_fees.items():
431 self.db.add_tx_fee_from_server(tx_hash, fee_sat)
432
433 @profiler
434 def load_local_history(self):
435 self._history_local = {} # type: Dict[str, Set[str]] # address -> set(txid)
436 self._address_history_changed_events = defaultdict(asyncio.Event) # address -> Event
437 for txid in itertools.chain(self.db.list_txi(), self.db.list_txo()):
438 self._add_tx_to_local_history(txid)
439
440 @profiler
441 def check_history(self):
442 hist_addrs_mine = list(filter(lambda k: self.is_mine(k), self.db.get_history()))
443 hist_addrs_not_mine = list(filter(lambda k: not self.is_mine(k), self.db.get_history()))
444 for addr in hist_addrs_not_mine:
445 self.db.remove_addr_history(addr)
446 for addr in hist_addrs_mine:
447 hist = self.db.get_addr_history(addr)
448 for tx_hash, tx_height in hist:
449 if self.db.get_txi_addresses(tx_hash) or self.db.get_txo_addresses(tx_hash):
450 continue
451 tx = self.db.get_transaction(tx_hash)
452 if tx is not None:
453 self.add_transaction(tx, allow_unrelated=True)
454
455 def remove_local_transactions_we_dont_have(self):
456 for txid in itertools.chain(self.db.list_txi(), self.db.list_txo()):
457 tx_height = self.get_tx_height(txid).height
458 if tx_height == TX_HEIGHT_LOCAL and not self.db.get_transaction(txid):
459 self.remove_transaction(txid)
460
461 def clear_history(self):
462 with self.lock:
463 with self.transaction_lock:
464 self.db.clear_history()
465 self._history_local.clear()
466 self._get_addr_balance_cache = {} # invalidate cache
467
468 def get_txpos(self, tx_hash):
469 """Returns (height, txpos) tuple, even if the tx is unverified."""
470 with self.lock:
471 verified_tx_mined_info = self.db.get_verified_tx(tx_hash)
472 if verified_tx_mined_info:
473 return verified_tx_mined_info.height, verified_tx_mined_info.txpos
474 elif tx_hash in self.unverified_tx:
475 height = self.unverified_tx[tx_hash]
476 return (height, -1) if height > 0 else ((1e9 - height), -1)
477 else:
478 return (1e9+1, -1)
479
480 def with_local_height_cached(func):
481 # get local height only once, as it's relatively expensive.
482 # take care that nested calls work as expected
483 def f(self, *args, **kwargs):
484 orig_val = getattr(self.threadlocal_cache, 'local_height', None)
485 self.threadlocal_cache.local_height = orig_val or self.get_local_height()
486 try:
487 return func(self, *args, **kwargs)
488 finally:
489 self.threadlocal_cache.local_height = orig_val
490 return f
491
492 @with_lock
493 @with_transaction_lock
494 @with_local_height_cached
495 def get_history(self, *, domain=None) -> Sequence[HistoryItem]:
496 # get domain
497 if domain is None:
498 domain = self.get_addresses()
499 domain = set(domain)
500 # 1. Get the history of each address in the domain, maintain the
501 # delta of a tx as the sum of its deltas on domain addresses
502 tx_deltas = defaultdict(int) # type: Dict[str, int]
503 for addr in domain:
504 h = self.get_address_history(addr)
505 for tx_hash, height in h:
506 tx_deltas[tx_hash] += self.get_tx_delta(tx_hash, addr)
507 # 2. create sorted history
508 history = []
509 for tx_hash in tx_deltas:
510 delta = tx_deltas[tx_hash]
511 tx_mined_status = self.get_tx_height(tx_hash)
512 fee = self.get_tx_fee(tx_hash)
513 history.append((tx_hash, tx_mined_status, delta, fee))
514 history.sort(key = lambda x: self.get_txpos(x[0]), reverse=True)
515 # 3. add balance
516 c, u, x = self.get_balance(domain)
517 balance = c + u + x
518 h2 = []
519 for tx_hash, tx_mined_status, delta, fee in history:
520 h2.append(HistoryItem(txid=tx_hash,
521 tx_mined_status=tx_mined_status,
522 delta=delta,
523 fee=fee,
524 balance=balance))
525 balance -= delta
526 h2.reverse()
527
528 if balance != 0:
529 raise Exception("wallet.get_history() failed balance sanity-check")
530
531 return h2
532
533 def _add_tx_to_local_history(self, txid):
534 with self.transaction_lock:
535 for addr in itertools.chain(self.db.get_txi_addresses(txid), self.db.get_txo_addresses(txid)):
536 cur_hist = self._history_local.get(addr, set())
537 cur_hist.add(txid)
538 self._history_local[addr] = cur_hist
539 self._mark_address_history_changed(addr)
540
541 def _remove_tx_from_local_history(self, txid):
542 with self.transaction_lock:
543 for addr in itertools.chain(self.db.get_txi_addresses(txid), self.db.get_txo_addresses(txid)):
544 cur_hist = self._history_local.get(addr, set())
545 try:
546 cur_hist.remove(txid)
547 except KeyError:
548 pass
549 else:
550 self._history_local[addr] = cur_hist
551
552 def _mark_address_history_changed(self, addr: str) -> None:
553 # history for this address changed, wake up coroutines:
554 self._address_history_changed_events[addr].set()
555 # clear event immediately so that coroutines can wait() for the next change:
556 self._address_history_changed_events[addr].clear()
557
558 async def wait_for_address_history_to_change(self, addr: str) -> None:
559 """Wait until the server tells us about a new transaction related to addr.
560
561 Unconfirmed and confirmed transactions are not distinguished, and so e.g. SPV
562 is not taken into account.
563 """
564 assert self.is_mine(addr), "address needs to be is_mine to be watched"
565 await self._address_history_changed_events[addr].wait()
566
567 def add_unverified_tx(self, tx_hash, tx_height):
568 if self.db.is_in_verified_tx(tx_hash):
569 if tx_height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT):
570 with self.lock:
571 self.db.remove_verified_tx(tx_hash)
572 if self.verifier:
573 self.verifier.remove_spv_proof_for_tx(tx_hash)
574 else:
575 with self.lock:
576 # tx will be verified only if height > 0
577 self.unverified_tx[tx_hash] = tx_height
578
579 def remove_unverified_tx(self, tx_hash, tx_height):
580 with self.lock:
581 new_height = self.unverified_tx.get(tx_hash)
582 if new_height == tx_height:
583 self.unverified_tx.pop(tx_hash, None)
584
585 def add_verified_tx(self, tx_hash: str, info: TxMinedInfo):
586 # Remove from the unverified map and add to the verified map
587 with self.lock:
588 self.unverified_tx.pop(tx_hash, None)
589 self.db.add_verified_tx(tx_hash, info)
590 tx_mined_status = self.get_tx_height(tx_hash)
591 util.trigger_callback('verified', self, tx_hash, tx_mined_status)
592
593 def get_unverified_txs(self):
594 '''Returns a map from tx hash to transaction height'''
595 with self.lock:
596 return dict(self.unverified_tx) # copy
597
598 def undo_verifications(self, blockchain, above_height):
599 '''Used by the verifier when a reorg has happened'''
600 txs = set()
601 with self.lock:
602 for tx_hash in self.db.list_verified_tx():
603 info = self.db.get_verified_tx(tx_hash)
604 tx_height = info.height
605 if tx_height > above_height:
606 header = blockchain.read_header(tx_height)
607 if not header or hash_header(header) != info.header_hash:
608 self.db.remove_verified_tx(tx_hash)
609 # NOTE: we should add these txns to self.unverified_tx,
610 # but with what height?
611 # If on the new fork after the reorg, the txn is at the
612 # same height, we will not get a status update for the
613 # address. If the txn is not mined or at a diff height,
614 # we should get a status update. Unless we put tx into
615 # unverified_tx, it will turn into local. So we put it
616 # into unverified_tx with the old height, and if we get
617 # a status update, that will overwrite it.
618 self.unverified_tx[tx_hash] = tx_height
619 txs.add(tx_hash)
620 return txs
621
622 def get_local_height(self) -> int:
623 """ return last known height if we are offline """
624 cached_local_height = getattr(self.threadlocal_cache, 'local_height', None)
625 if cached_local_height is not None:
626 return cached_local_height
627 return self.network.get_local_height() if self.network else self.db.get('stored_height', 0)
628
629 def add_future_tx(self, tx: Transaction, num_blocks: int) -> bool:
630 assert num_blocks > 0, num_blocks
631 with self.lock:
632 tx_was_added = self.add_transaction(tx)
633 if tx_was_added:
634 self.future_tx[tx.txid()] = num_blocks
635 return tx_was_added
636
637 def get_tx_height(self, tx_hash: str) -> TxMinedInfo:
638 if tx_hash is None: # ugly backwards compat...
639 return TxMinedInfo(height=TX_HEIGHT_LOCAL, conf=0)
640 with self.lock:
641 verified_tx_mined_info = self.db.get_verified_tx(tx_hash)
642 if verified_tx_mined_info:
643 conf = max(self.get_local_height() - verified_tx_mined_info.height + 1, 0)
644 return verified_tx_mined_info._replace(conf=conf)
645 elif tx_hash in self.unverified_tx:
646 height = self.unverified_tx[tx_hash]
647 return TxMinedInfo(height=height, conf=0)
648 elif tx_hash in self.future_tx:
649 num_blocks_remainining = self.future_tx[tx_hash]
650 assert num_blocks_remainining > 0, num_blocks_remainining
651 return TxMinedInfo(height=TX_HEIGHT_FUTURE, conf=-num_blocks_remainining)
652 else:
653 # local transaction
654 return TxMinedInfo(height=TX_HEIGHT_LOCAL, conf=0)
655
656 def set_up_to_date(self, up_to_date):
657 with self.lock:
658 status_changed = self.up_to_date != up_to_date
659 self.up_to_date = up_to_date
660 if self.network:
661 self.network.notify('status')
662 if status_changed:
663 self.logger.info(f'set_up_to_date: {up_to_date}')
664
665 def is_up_to_date(self):
666 with self.lock: return self.up_to_date
667
668 def get_history_sync_state_details(self) -> Tuple[int, int]:
669 if self.synchronizer:
670 return self.synchronizer.num_requests_sent_and_answered()
671 else:
672 return 0, 0
673
674 @with_transaction_lock
675 def get_tx_delta(self, tx_hash: str, address: str) -> int:
676 """effect of tx on address"""
677 delta = 0
678 # subtract the value of coins sent from address
679 d = self.db.get_txi_addr(tx_hash, address)
680 for n, v in d:
681 delta -= v
682 # add the value of the coins received at address
683 d = self.db.get_txo_addr(tx_hash, address)
684 for n, (v, cb) in d.items():
685 delta += v
686 return delta
687
688 def get_wallet_delta(self, tx: Transaction) -> TxWalletDelta:
689 """effect of tx on wallet"""
690 is_relevant = False # "related to wallet?"
691 num_input_ismine = 0
692 v_in = v_in_mine = v_out = v_out_mine = 0
693 with self.lock, self.transaction_lock:
694 for txin in tx.inputs():
695 addr = self.get_txin_address(txin)
696 value = self.get_txin_value(txin, address=addr)
697 if self.is_mine(addr):
698 num_input_ismine += 1
699 is_relevant = True
700 assert value is not None
701 v_in_mine += value
702 if value is None:
703 v_in = None
704 elif v_in is not None:
705 v_in += value
706 for txout in tx.outputs():
707 v_out += txout.value
708 if self.is_mine(txout.address):
709 v_out_mine += txout.value
710 is_relevant = True
711 delta = v_out_mine - v_in_mine
712 if v_in is not None:
713 fee = v_in - v_out
714 else:
715 fee = None
716 if fee is None and isinstance(tx, PartialTransaction):
717 fee = tx.get_fee()
718 return TxWalletDelta(
719 is_relevant=is_relevant,
720 is_any_input_ismine=num_input_ismine > 0,
721 is_all_input_ismine=num_input_ismine == len(tx.inputs()),
722 delta=delta,
723 fee=fee,
724 )
725
726 def get_tx_fee(self, txid: str) -> Optional[int]:
727 """ Returns tx_fee or None. Use server fee only if tx is unconfirmed and not mine"""
728 # check if stored fee is available
729 fee = self.db.get_tx_fee(txid, trust_server=False)
730 if fee is not None:
731 return fee
732 # delete server-sent fee for confirmed txns
733 confirmed = self.get_tx_height(txid).conf > 0
734 if confirmed:
735 self.db.add_tx_fee_from_server(txid, None)
736 # if all inputs are ismine, try to calc fee now;
737 # otherwise, return stored value
738 num_all_inputs = self.db.get_num_all_inputs_of_tx(txid)
739 if num_all_inputs is not None:
740 # check if tx is mine
741 num_ismine_inputs = self.db.get_num_ismine_inputs_of_tx(txid)
742 assert num_ismine_inputs <= num_all_inputs, (num_ismine_inputs, num_all_inputs)
743 # trust server if tx is unconfirmed and not mine
744 if num_ismine_inputs < num_all_inputs:
745 return None if confirmed else self.db.get_tx_fee(txid, trust_server=True)
746 # lookup tx and deserialize it.
747 # note that deserializing is expensive, hence above hacks
748 tx = self.db.get_transaction(txid)
749 if not tx:
750 return None
751 fee = self.get_wallet_delta(tx).fee
752 # save result
753 self.db.add_tx_fee_we_calculated(txid, fee)
754 self.db.add_num_inputs_to_tx(txid, len(tx.inputs()))
755 return fee
756
757 def get_addr_io(self, address):
758 with self.lock, self.transaction_lock:
759 h = self.get_address_history(address)
760 received = {}
761 sent = {}
762 for tx_hash, height in h:
763 d = self.db.get_txo_addr(tx_hash, address)
764 for n, (v, is_cb) in d.items():
765 received[tx_hash + ':%d'%n] = (height, v, is_cb)
766 for tx_hash, height in h:
767 l = self.db.get_txi_addr(tx_hash, address)
768 for txi, v in l:
769 sent[txi] = height
770 return received, sent
771
772
773 def get_addr_outputs(self, address: str) -> Dict[TxOutpoint, PartialTxInput]:
774 coins, spent = self.get_addr_io(address)
775 out = {}
776 for prevout_str, v in coins.items():
777 tx_height, value, is_cb = v
778 prevout = TxOutpoint.from_str(prevout_str)
779 utxo = PartialTxInput(prevout=prevout, is_coinbase_output=is_cb)
780 utxo._trusted_address = address
781 utxo._trusted_value_sats = value
782 utxo.block_height = tx_height
783 utxo.spent_height = spent.get(prevout_str, None)
784 out[prevout] = utxo
785 return out
786
787 def get_addr_utxo(self, address: str) -> Dict[TxOutpoint, PartialTxInput]:
788 out = self.get_addr_outputs(address)
789 for k, v in list(out.items()):
790 if v.spent_height is not None:
791 out.pop(k)
792 return out
793
794 # return the total amount ever received by an address
795 def get_addr_received(self, address):
796 received, sent = self.get_addr_io(address)
797 return sum([v for height, v, is_cb in received.values()])
798
799 @with_local_height_cached
800 def get_addr_balance(self, address, *, excluded_coins: Set[str] = None) -> Tuple[int, int, int]:
801 """Return the balance of a bitcoin address:
802 confirmed and matured, unconfirmed, unmatured
803 """
804 if not excluded_coins: # cache is only used if there are no excluded_coins
805 cached_value = self._get_addr_balance_cache.get(address)
806 if cached_value:
807 return cached_value
808 if excluded_coins is None:
809 excluded_coins = set()
810 assert isinstance(excluded_coins, set), f"excluded_coins should be set, not {type(excluded_coins)}"
811 received, sent = self.get_addr_io(address)
812 c = u = x = 0
813 mempool_height = self.get_local_height() + 1 # height of next block
814 for txo, (tx_height, v, is_cb) in received.items():
815 if txo in excluded_coins:
816 continue
817 if is_cb and tx_height + COINBASE_MATURITY > mempool_height:
818 x += v
819 elif tx_height > 0:
820 c += v
821 else:
822 u += v
823 if txo in sent:
824 if sent[txo] > 0:
825 c -= v
826 else:
827 u -= v
828 result = c, u, x
829 # cache result.
830 if not excluded_coins:
831 # Cache needs to be invalidated if a transaction is added to/
832 # removed from history; or on new blocks (maturity...)
833 self._get_addr_balance_cache[address] = result
834 return result
835
836 @with_local_height_cached
837 def get_utxos(self, domain=None, *, excluded_addresses=None,
838 mature_only: bool = False, confirmed_only: bool = False,
839 nonlocal_only: bool = False) -> Sequence[PartialTxInput]:
840 coins = []
841 if domain is None:
842 domain = self.get_addresses()
843 domain = set(domain)
844 if excluded_addresses:
845 domain = set(domain) - set(excluded_addresses)
846 mempool_height = self.get_local_height() + 1 # height of next block
847 for addr in domain:
848 utxos = self.get_addr_utxo(addr)
849 for utxo in utxos.values():
850 if confirmed_only and utxo.block_height <= 0:
851 continue
852 if nonlocal_only and utxo.block_height == TX_HEIGHT_LOCAL:
853 continue
854 if (mature_only and utxo.is_coinbase_output()
855 and utxo.block_height + COINBASE_MATURITY > mempool_height):
856 continue
857 coins.append(utxo)
858 continue
859 return coins
860
861 def get_balance(self, domain=None, *, excluded_addresses: Set[str] = None,
862 excluded_coins: Set[str] = None) -> Tuple[int, int, int]:
863 if domain is None:
864 domain = self.get_addresses()
865 if excluded_addresses is None:
866 excluded_addresses = set()
867 assert isinstance(excluded_addresses, set), f"excluded_addresses should be set, not {type(excluded_addresses)}"
868 domain = set(domain) - excluded_addresses
869 cc = uu = xx = 0
870 for addr in domain:
871 c, u, x = self.get_addr_balance(addr, excluded_coins=excluded_coins)
872 cc += c
873 uu += u
874 xx += x
875 return cc, uu, xx
876
877 def is_used(self, address: str) -> bool:
878 return self.get_address_history_len(address) != 0
879
880 def is_empty(self, address: str) -> bool:
881 c, u, x = self.get_addr_balance(address)
882 return c+u+x == 0
883
884 def synchronize(self):
885 pass