tsynchronizer.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
tsynchronizer.py (13823B)
---
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2014 Thomas Voegtlin
5 #
6 # Permission is hereby granted, free of charge, to any person
7 # obtaining a copy of this software and associated documentation files
8 # (the "Software"), to deal in the Software without restriction,
9 # including without limitation the rights to use, copy, modify, merge,
10 # publish, distribute, sublicense, and/or sell copies of the Software,
11 # and to permit persons to whom the Software is furnished to do so,
12 # subject to the following conditions:
13 #
14 # The above copyright notice and this permission notice shall be
15 # included in all copies or substantial portions of the Software.
16 #
17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 # SOFTWARE.
25 import asyncio
26 import hashlib
27 from typing import Dict, List, TYPE_CHECKING, Tuple, Set
28 from collections import defaultdict
29 import logging
30
31 from aiorpcx import TaskGroup, run_in_thread, RPCError
32
33 from . import util
34 from .transaction import Transaction, PartialTransaction
35 from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer, random_shuffled_copy
36 from .bitcoin import address_to_scripthash, is_address
37 from .logging import Logger
38 from .interface import GracefulDisconnect, NetworkTimeout
39
40 if TYPE_CHECKING:
41 from .network import Network
42 from .address_synchronizer import AddressSynchronizer
43
44
45 class SynchronizerFailure(Exception): pass
46
47
48 def history_status(h):
49 if not h:
50 return None
51 status = ''
52 for tx_hash, height in h:
53 status += tx_hash + ':%d:' % height
54 return bh2u(hashlib.sha256(status.encode('ascii')).digest())
55
56
57 class SynchronizerBase(NetworkJobOnDefaultServer):
58 """Subscribe over the network to a set of addresses, and monitor their statuses.
59 Every time a status changes, run a coroutine provided by the subclass.
60 """
61 def __init__(self, network: 'Network'):
62 self.asyncio_loop = network.asyncio_loop
63 self._reset_request_counters()
64
65 NetworkJobOnDefaultServer.__init__(self, network)
66
67 def _reset(self):
68 super()._reset()
69 self.requested_addrs = set()
70 self.scripthash_to_address = {}
71 self._processed_some_notifications = False # so that we don't miss them
72 self._reset_request_counters()
73 # Queues
74 self.add_queue = asyncio.Queue()
75 self.status_queue = asyncio.Queue()
76
77 async def _run_tasks(self, *, taskgroup):
78 await super()._run_tasks(taskgroup=taskgroup)
79 try:
80 async with taskgroup as group:
81 await group.spawn(self.send_subscriptions())
82 await group.spawn(self.handle_status())
83 await group.spawn(self.main())
84 finally:
85 # we are being cancelled now
86 # TODO: libbitcoin (this would be handled by zeromq.Client.stop()
87 print("self.session.unsubscribe(self.status_queue)")
88
89 def _reset_request_counters(self):
90 self._requests_sent = 0
91 self._requests_answered = 0
92
93 def add(self, addr):
94 asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop)
95
96 async def _add_address(self, addr: str):
97 # note: this method is async as add_queue.put_nowait is not thread-safe.
98 if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}")
99 if addr in self.requested_addrs: return
100 self.requested_addrs.add(addr)
101 self.add_queue.put_nowait(addr)
102
103 async def _on_address_status(self, addr, status):
104 """Handle the change of the status of an address."""
105 raise NotImplementedError() # implemented by subclasses
106
107 async def send_subscriptions(self):
108 async def subscribe_to_address(addr):
109 h = address_to_scripthash(addr)
110 self.scripthash_to_address[h] = addr
111 self._requests_sent += 1
112 async with self._network_request_semaphore:
113 # TODO: libbitcoin
114 print("await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)")
115 # TODO: libbitcoin XXX: Review this, it's probably incorrect
116 print(f"DEBUG: network: subscribe_to_address: {h}")
117 await self.interface.client._subscribe_to_scripthash(h, self.status_queue)
118 self._requests_answered += 1
119 self.requested_addrs.remove(addr)
120
121 while True:
122 addr = await self.add_queue.get()
123 await self.taskgroup.spawn(subscribe_to_address, addr)
124
125 async def handle_status(self):
126 while True:
127 h, status = await self.status_queue.get()
128 addr = self.scripthash_to_address[h]
129 await self.taskgroup.spawn(self._on_address_status, addr, status)
130 self._processed_some_notifications = True
131
132 def num_requests_sent_and_answered(self) -> Tuple[int, int]:
133 return self._requests_sent, self._requests_answered
134
135 async def main(self):
136 raise NotImplementedError() # implemented by subclasses
137
138
139 class Synchronizer(SynchronizerBase):
140 '''The synchronizer keeps the wallet up-to-date with its set of
141 addresses and their transactions. It subscribes over the network
142 to wallet addresses, gets the wallet to generate new addresses
143 when necessary, requests the transaction history of any addresses
144 we don't have the full history of, and requests binary transaction
145 data of any transactions the wallet doesn't have.
146 '''
147 def __init__(self, wallet: 'AddressSynchronizer'):
148 self.wallet = wallet
149 SynchronizerBase.__init__(self, wallet.network)
150
151 def _reset(self):
152 super()._reset()
153 self.requested_tx = {}
154 self.requested_histories = set()
155 self._stale_histories = dict() # type: Dict[str, asyncio.Task]
156
157 def diagnostic_name(self):
158 return self.wallet.diagnostic_name()
159
160 def is_up_to_date(self):
161 return (not self.requested_addrs
162 and not self.requested_histories
163 and not self.requested_tx
164 and not self._stale_histories)
165
166 async def _on_address_status(self, addr, status):
167 history = self.wallet.db.get_addr_history(addr)
168 if history_status(history) == status:
169 return
170 # No point in requesting history twice for the same announced status.
171 # However if we got announced a new status, we should request history again:
172 if (addr, status) in self.requested_histories:
173 return
174 # request address history
175 self.requested_histories.add((addr, status))
176 self._stale_histories.pop(addr, asyncio.Future()).cancel()
177 h = address_to_scripthash(addr)
178 self._requests_sent += 1
179 async with self._network_request_semaphore:
180 result = await self.interface.get_history_for_scripthash(h)
181 self._requests_answered += 1
182 self.logger.info(f"receiving history {addr} {len(result)}")
183 hist = list(map(lambda item: (item['tx_hash'], item['height']), result))
184 # tx_fees
185 tx_fees = [(item['tx_hash'], item.get('fee')) for item in result]
186 tx_fees = dict(filter(lambda x:x[1] is not None, tx_fees))
187 # Check that the status corresponds to what was announced
188 if history_status(hist) != status:
189 # could happen naturally if history changed between getting status and history (race)
190 self.logger.info(f"error: status mismatch: {addr}. we'll wait a bit for status update.")
191 # The server is supposed to send a new status notification, which will trigger a new
192 # get_history. We shall wait a bit for this to happen, otherwise we disconnect.
193 async def disconnect_if_still_stale():
194 timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Generic)
195 await asyncio.sleep(timeout)
196 raise SynchronizerFailure(f"timeout reached waiting for addr {addr}: history still stale")
197 self._stale_histories[addr] = await self.taskgroup.spawn(disconnect_if_still_stale)
198 else:
199 self._stale_histories.pop(addr, asyncio.Future()).cancel()
200 # Store received history
201 self.wallet.receive_history_callback(addr, hist, tx_fees)
202 # Request transactions we don't have
203 await self._request_missing_txs(hist)
204
205 # Remove request; this allows up_to_date to be True
206 self.requested_histories.discard((addr, status))
207
208 async def _request_missing_txs(self, hist, *, allow_server_not_finding_tx=False):
209 # "hist" is a list of [tx_hash, tx_height] lists
210 transaction_hashes = []
211 for tx_hash, tx_height in hist:
212 if tx_hash in self.requested_tx:
213 continue
214 tx = self.wallet.db.get_transaction(tx_hash)
215 if tx and not isinstance(tx, PartialTransaction):
216 continue # already have complete tx
217 transaction_hashes.append(tx_hash)
218 self.requested_tx[tx_hash] = tx_height
219
220 if not transaction_hashes: return
221 async with TaskGroup() as group:
222 for tx_hash in transaction_hashes:
223 await group.spawn(self._get_transaction(tx_hash, allow_server_not_finding_tx=allow_server_not_finding_tx))
224
225 async def _get_transaction(self, tx_hash, *, allow_server_not_finding_tx=False):
226 self._requests_sent += 1
227 try:
228 async with self._network_request_semaphore:
229 raw_tx = await self.interface.get_transaction(tx_hash)
230 except RPCError as e:
231 # most likely, "No such mempool or blockchain transaction"
232 if allow_server_not_finding_tx:
233 self.requested_tx.pop(tx_hash)
234 return
235 else:
236 raise
237 finally:
238 self._requests_answered += 1
239 tx = Transaction(raw_tx)
240 if tx_hash != tx.txid():
241 raise SynchronizerFailure(f"received tx does not match expected txid ({tx_hash} != {tx.txid()})")
242 tx_height = self.requested_tx.pop(tx_hash)
243 self.wallet.receive_tx_callback(tx_hash, tx, tx_height)
244 self.logger.info(f"received tx {tx_hash} height: {tx_height} bytes: {len(raw_tx)}")
245 # callbacks
246 util.trigger_callback('new_transaction', self.wallet, tx)
247
248 async def main(self):
249 self.wallet.set_up_to_date(False)
250 # request missing txns, if any
251 for addr in random_shuffled_copy(self.wallet.db.get_history()):
252 history = self.wallet.db.get_addr_history(addr)
253 # Old electrum servers returned ['*'] when all history for the address
254 # was pruned. This no longer happens but may remain in old wallets.
255 if history == ['*']: continue
256 await self._request_missing_txs(history, allow_server_not_finding_tx=True)
257 # add addresses to bootstrap
258 for addr in random_shuffled_copy(self.wallet.get_addresses()):
259 await self._add_address(addr)
260 # main loop
261 while True:
262 await asyncio.sleep(0.1)
263 await run_in_thread(self.wallet.synchronize)
264 up_to_date = self.is_up_to_date()
265 if (up_to_date != self.wallet.is_up_to_date()
266 or up_to_date and self._processed_some_notifications):
267 self._processed_some_notifications = False
268 if up_to_date:
269 self._reset_request_counters()
270 self.wallet.set_up_to_date(up_to_date)
271 util.trigger_callback('wallet_updated', self.wallet)
272
273
274 class Notifier(SynchronizerBase):
275 """Watch addresses. Every time the status of an address changes,
276 an HTTP POST is sent to the corresponding URL.
277 """
278 def __init__(self, network):
279 SynchronizerBase.__init__(self, network)
280 self.watched_addresses = defaultdict(list) # type: Dict[str, List[str]]
281 self._start_watching_queue = asyncio.Queue() # type: asyncio.Queue[Tuple[str, str]]
282
283 async def main(self):
284 # resend existing subscriptions if we were restarted
285 for addr in self.watched_addresses:
286 await self._add_address(addr)
287 # main loop
288 while True:
289 addr, url = await self._start_watching_queue.get()
290 self.watched_addresses[addr].append(url)
291 await self._add_address(addr)
292
293 async def start_watching_addr(self, addr: str, url: str):
294 await self._start_watching_queue.put((addr, url))
295
296 async def stop_watching_addr(self, addr: str):
297 self.watched_addresses.pop(addr, None)
298 # TODO blockchain.scripthash.unsubscribe
299
300 async def _on_address_status(self, addr, status):
301 if addr not in self.watched_addresses:
302 return
303 self.logger.info(f'new status for addr {addr}')
304 headers = {'content-type': 'application/json'}
305 data = {'address': addr, 'status': status}
306 for url in self.watched_addresses[addr]:
307 try:
308 async with make_aiohttp_session(proxy=self.network.proxy, headers=headers) as session:
309 async with session.post(url, json=data, headers=headers) as resp:
310 await resp.text()
311 except Exception as e:
312 self.logger.info(repr(e))
313 else:
314 self.logger.info(f'Got Response for {addr}')