tUse one LNWatcher instance per wallet - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 2be68ac4d2e074ed3e50eb754ce12d06c88e9745 DIR parent 4d76e84218d1b550adf13c197869dff080222ba8 HTML Author: ThomasV <thomasv@electrum.org> Date: Wed, 3 Jul 2019 08:46:00 +0200 Use one LNWatcher instance per wallet Diffstat: M electrum/lnpeer.py | 7 ++----- M electrum/lnwatcher.py | 59 +++++++++++++++---------------- M electrum/lnworker.py | 26 +++++++++++++------------- 3 files changed, 44 insertions(+), 48 deletions(-) --- DIR diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py t@@ -67,7 +67,6 @@ class Peer(Logger): self.localfeatures = self.lnworker.localfeatures self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)] self.network = lnworker.network - self.lnwatcher = lnworker.network.lnwatcher self.channel_db = lnworker.network.channel_db self.ping_time = 0 self.reply_channel_range = asyncio.Queue() t@@ -550,7 +549,6 @@ class Peer(Logger): chan = Channel(chan_dict, sweep_address=self.lnworker.sweep_address, lnworker=self.lnworker) - chan.lnwatcher = self.lnwatcher sig_64, _ = chan.sign_next_commitment() self.send_message("funding_created", temporary_channel_id=temp_channel_id, t@@ -638,7 +636,6 @@ class Peer(Logger): chan = Channel(chan_dict, sweep_address=self.lnworker.sweep_address, lnworker=self.lnworker) - chan.lnwatcher = self.lnwatcher remote_sig = funding_created['signature'] chan.receive_new_commitment(remote_sig, []) sig_64, _ = chan.sign_next_commitment() t@@ -648,7 +645,7 @@ class Peer(Logger): ) chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig) self.lnworker.save_channel(chan) - await self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address()) + self.lnworker.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address()) self.lnworker.on_channels_updated() while True: try: t@@ -1277,7 +1274,7 @@ class Peer(Logger): outpoint = chan.funding_outpoint.to_str() sweeptxs = create_sweeptxs_for_watchtower(chan, ctx, per_commitment_secret, chan.sweep_address) for tx in sweeptxs: - await self.lnwatcher.add_sweep_tx(outpoint, tx.prevout(0), str(tx)) + await self.lnworker.lnwatcher.add_sweep_tx(outpoint, tx.prevout(0), str(tx)) def on_update_fee(self, payload): channel_id = payload["channel_id"] DIR diff --git a/electrum/lnwatcher.py b/electrum/lnwatcher.py t@@ -14,6 +14,7 @@ from typing import NamedTuple, Dict import jsonrpclib from .sql_db import SqlDB, sql +from .json_db import JsonDB from .util import bh2u, bfh, log_exceptions, ignore_exceptions from . import wallet from .storage import WalletStorage t@@ -142,16 +143,19 @@ class LNWatcher(AddressSynchronizer): verbosity_filter = 'W' def __init__(self, network: 'Network'): - path = os.path.join(network.config.path, "watchtower_wallet") - storage = WalletStorage(path) - AddressSynchronizer.__init__(self, storage) + AddressSynchronizer.__init__(self, JsonDB({}, manual_upgrades=False)) self.config = network.config self.start_network(network) self.lock = threading.RLock() - self.sweepstore = SweepStore(os.path.join(network.config.path, "watchtower_db"), network) + self.sweepstore = None + self.channels = {} + if self.config.get('sweepstore', False): + self.sweepstore = SweepStore(os.path.join(network.config.path, "watchtower_db"), network) + self.watchtower = None + if self.config.get('watchtower_url'): + self.set_remote_watchtower() self.network.register_callback(self.on_network_update, ['network_updated', 'blockchain_updated', 'verified', 'wallet_updated']) - self.set_remote_watchtower() # this maps funding_outpoints to ListenerItems, which have an event for when the watcher is done, # and a queue for seeing which txs are being published self.tx_progress = {} # type: Dict[str, ListenerItem] t@@ -167,14 +171,18 @@ class LNWatcher(AddressSynchronizer): self.watchtower = jsonrpclib.Server(watchtower_url) if watchtower_url else None except: self.watchtower = None - self.watchtower_queue = asyncio.Queue() + self.watchtower_queue = asyncio.Queue() def get_num_tx(self, outpoint): + if not self.sweepstore: + return 0 async def f(): return await self.sweepstore.get_num_tx(outpoint) return self.network.run_from_another_thread(f()) def list_sweep_tx(self): + if not self.sweepstore: + return [] async def f(): return await self.sweepstore.list_sweep_tx() return self.network.run_from_another_thread(f()) t@@ -182,35 +190,25 @@ class LNWatcher(AddressSynchronizer): @ignore_exceptions @log_exceptions async def watchtower_task(self): + if not self.watchtower: + return self.logger.info('watchtower task started') - # initial check - for address, outpoint in await self.sweepstore.list_channel_info(): - await self.watchtower_queue.put(outpoint) while True: - outpoint = await self.watchtower_queue.get() - if self.watchtower is None: - continue - # synchronize with remote + outpoint, prevout, tx = await self.watchtower_queue.get() try: - local_n = await self.sweepstore.get_num_tx(outpoint) - n = self.watchtower.get_num_tx(outpoint) - if n == 0: - address = await self.sweepstore.get_address(outpoint) - self.watchtower.add_channel(outpoint, address) - self.logger.info("sending %d transactions to watchtower"%(local_n - n)) - for index in range(n, local_n): - prevout, tx = await self.sweepstore.get_tx_by_index(outpoint, index) - self.watchtower.add_sweep_tx(outpoint, prevout, tx) + self.watchtower.add_sweep_tx(outpoint, prevout, tx) + self.logger.info("transaction sent to watchtower") except ConnectionRefusedError: self.logger.info('could not reach watchtower, will retry in 5s') await asyncio.sleep(5) - await self.watchtower_queue.put(outpoint) + await self.watchtower_queue.put((outpoint, prevout, tx)) - async def add_channel(self, outpoint, address): + def add_channel(self, outpoint, address): self.add_address(address) - with self.lock: - if not await self.sweepstore.has_channel(outpoint): - await self.sweepstore.add_channel(outpoint, address) + self.channels[address] = outpoint + #if self.sweepstore: + # if not await self.sweepstore.has_channel(outpoint): + # await self.sweepstore.add_channel(outpoint, address) async def unwatch_channel(self, address, funding_outpoint): self.logger.info(f'unwatching {funding_outpoint}') t@@ -229,7 +227,7 @@ class LNWatcher(AddressSynchronizer): return if not self.up_to_date: return - for address, outpoint in await self.sweepstore.list_channel_info(): + for address, outpoint in self.channels.items(): await self.check_onchain_situation(address, outpoint) async def check_onchain_situation(self, address, funding_outpoint): t@@ -306,9 +304,10 @@ class LNWatcher(AddressSynchronizer): return txid async def add_sweep_tx(self, funding_outpoint: str, prevout: str, tx: str): - await self.sweepstore.add_sweep_tx(funding_outpoint, prevout, tx) + if self.sweepstore: + await self.sweepstore.add_sweep_tx(funding_outpoint, prevout, tx) if self.watchtower: - self.watchtower_queue.put_nowait(funding_outpoint) + self.watchtower_queue.put_nowait(funding_outpoint, prevout, tx) def get_tx_mined_depth(self, txid: str): if not txid: DIR diff --git a/electrum/lnworker.py b/electrum/lnworker.py t@@ -47,6 +47,7 @@ from .lnrouter import RouteEdge, is_route_sane_to_use from .address_synchronizer import TX_HEIGHT_LOCAL from . import lnsweep from .lnsweep import create_sweeptxs_for_their_ctx, create_sweeptxs_for_our_ctx +from .lnwatcher import LNWatcher if TYPE_CHECKING: from .network import Network t@@ -317,19 +318,20 @@ class LNWallet(LNWorker): self.pending_payments = defaultdict(asyncio.Future) def start_network(self, network: 'Network'): + self.lnwatcher = LNWatcher(network) self.network = network self.network.register_callback(self.on_network_update, ['wallet_updated', 'network_updated', 'verified', 'fee']) # thread safe self.network.register_callback(self.on_channel_open, ['channel_open']) self.network.register_callback(self.on_channel_closed, ['channel_closed']) for chan_id, chan in self.channels.items(): - self.network.lnwatcher.add_address(chan.get_funding_address()) + self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address()) super().start_network(network) for coro in [ self.maybe_listen(), self.on_network_update('network_updated'), # shortcut (don't block) if funding tx locked and verified - self.network.lnwatcher.on_network_update('network_updated'), # ping watcher to check our channels + self.lnwatcher.on_network_update('network_updated'), # ping watcher to check our channels self.reestablish_peers_and_channels() ]: asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(coro), self.network.asyncio_loop) t@@ -468,13 +470,13 @@ class LNWallet(LNWorker): if it's also deep enough, also save to disk. Returns tuple (mined_deep_enough, num_confirmations). """ - lnwatcher = self.network.lnwatcher - conf = lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf + conf = self.lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf if conf > 0: - block_height, tx_pos = lnwatcher.get_txpos(chan.funding_outpoint.txid) + block_height, tx_pos = self.lnwatcher.get_txpos(chan.funding_outpoint.txid) assert tx_pos >= 0 chan.short_channel_id_predicted = calc_short_channel_id(block_height, tx_pos, chan.funding_outpoint.output_index) if conf >= chan.constraints.funding_txn_minimum_depth > 0: + self.logger.info(f"save_short_channel_id") chan.short_channel_id = chan.short_channel_id_predicted self.save_channel(chan) self.on_channels_updated() t@@ -492,7 +494,7 @@ class LNWallet(LNWorker): chan = self.channel_by_txo(funding_outpoint) if not chan: return - #self.logger.debug(f'on_channel_open {funding_outpoint}') + self.logger.debug(f'on_channel_open {funding_outpoint}') self.channel_timestamps[bh2u(chan.channel_id)] = funding_txid, funding_height.height, funding_height.timestamp, None, None, None self.storage.put('lightning_channel_timestamps', self.channel_timestamps) chan.set_funding_txo_spentness(False) t@@ -550,7 +552,7 @@ class LNWallet(LNWorker): .format(name, local_height, cltv_expiry, prevout)) broadcast = False if csv_delay: - prev_height = self.network.lnwatcher.get_tx_height(prev_txid) + prev_height = self.lnwatcher.get_tx_height(prev_txid) remaining = csv_delay - prev_height.conf if remaining > 0: self.logger.info('waiting for {}: CSV ({} >= {}), prevout: {}' t@@ -593,9 +595,8 @@ class LNWallet(LNWorker): # since short_channel_id could be changed while saving. with self.lock: channels = list(self.channels.values()) - lnwatcher = self.network.lnwatcher if event in ('verified', 'wallet_updated'): - if args[0] != lnwatcher: + if args[0] != self.lnwatcher: return for chan in channels: if chan.is_closed(): t@@ -615,11 +616,11 @@ class LNWallet(LNWorker): return if event == 'fee': await peer.bitcoin_fee_update(chan) - conf = lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf + conf = self.lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf peer.on_network_update(chan, conf) elif chan.force_closed and chan.get_state() != 'CLOSED': txid = chan.force_close_tx().txid() - height = lnwatcher.get_tx_height(txid).height + height = self.lnwatcher.get_tx_height(txid).height self.logger.info(f"force closing tx {txid}, height {height}") if height == TX_HEIGHT_LOCAL: self.logger.info('REBROADCASTING CLOSING TX') t@@ -635,8 +636,7 @@ class LNWallet(LNWorker): push_msat=push_sat * 1000, temp_channel_id=os.urandom(32)) self.save_channel(chan) - self.network.lnwatcher.add_address(chan.get_funding_address()) - await self.network.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address()) + self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address()) self.on_channels_updated() return chan