watch_address.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
watch_address.py (1249B)
---
1 #!/usr/bin/env python3
2
3 import sys
4 import asyncio
5
6 from electrum.network import Network
7 from electrum.util import print_msg, create_and_start_event_loop
8 from electrum.synchronizer import SynchronizerBase
9 from electrum.simple_config import SimpleConfig
10
11
# The address to watch is the first (and only required) command-line argument.
try:
    addr = sys.argv[1]
except IndexError:
    # A missing argument is the only failure expected here; catching anything
    # broader would disguise unrelated errors as a usage problem.
    print("usage: watch_address <bitcoin_address>")
    sys.exit(1)

config = SimpleConfig()

# start network
# create_and_start_event_loop() returns several values; only the first one
# (the event loop) is needed by this script.
loop = create_and_start_event_loop()[0]
network = Network(config)
network.start()
24
25
class Notifier(SynchronizerBase):
    """Print a message for every address status reported to this object.

    Addresses are submitted through ``watch_queue``. Each address taken off
    the queue is recorded in ``watched_addresses`` and then passed to
    ``_add_address`` (not defined in this class; presumably inherited from
    SynchronizerBase -- confirm against the base class).
    """

    def __init__(self, network):
        """Initialise the base class with *network* and create empty state."""
        super().__init__(network)
        # Inbox of addresses waiting to be picked up by main().
        self.watch_queue = asyncio.Queue()
        # Every address that main() has taken off the queue so far.
        self.watched_addresses = set()

    async def main(self):
        """Re-add remembered addresses, then process the queue forever."""
        # resend existing subscriptions if we were restarted
        for known in self.watched_addresses:
            await self._add_address(known)
        # main loop: record each requested address before adding it
        while True:
            requested = await self.watch_queue.get()
            self.watched_addresses.add(requested)
            await self._add_address(requested)

    async def _on_address_status(self, addr, status):
        """Print *addr* together with its *status* using ``print_msg``.

        NOTE(review): presumably called by SynchronizerBase whenever an
        address status arrives -- confirm against the base class.
        """
        message = f"addr {addr}, status {status}"
        print_msg(message)
44
45
# Create the notifier on the network started above, then queue the address
# given on the command line so that Notifier.main() picks it up.
notifier = Notifier(network)
# run_coroutine_threadsafe submits the coroutine to `loop` in a thread-safe
# way; the future it returns is not used.
asyncio.run_coroutine_threadsafe(
    coro=notifier.watch_queue.put(addr),
    loop=loop,
)