tMove the part of process_gossip that requires access to channel_db into in LNGossip. - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit f83d2d9fee160e7b5475d397672559fc8f947470 DIR parent 7ab1a4552b579cd7b9a2197be12da4f02f396629 HTML Author: ThomasV <thomasv@electrum.org> Date: Mon, 21 Dec 2020 13:26:56 +0100 Move the part of process_gossip that requires access to channel_db into in LNGossip. Diffstat: M electrum/lnpeer.py | 45 +++++++++---------------------- M electrum/lnworker.py | 23 ++++++++++++++++++++++- 2 files changed, 35 insertions(+), 33 deletions(-) --- DIR diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py t@@ -84,7 +84,6 @@ class Peer(Logger): self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)] assert self.node_ids[0] != self.node_ids[1] self.network = lnworker.network - self.channel_db = lnworker.network.channel_db self.ping_time = 0 self.reply_channel_range = asyncio.Queue() # gossip uses a single queue to preserve message order t@@ -261,6 +260,15 @@ class Peer(Logger): if chan.short_channel_id == payload['short_channel_id']: chan.set_remote_update(payload['raw']) self.logger.info("saved remote_update") + break + else: + # Save (some bounded number of) orphan channel updates for later + # as it might be for our own direct channel with this peer + # (and we might not yet know the short channel id for that) + short_channel_id = ShortChannelID(payload['short_channel_id']) + self.orphan_channel_updates[short_channel_id] = payload + while len(self.orphan_channel_updates) > 25: + self.orphan_channel_updates.popitem(last=False) def on_announcement_signatures(self, chan: Channel, payload): if chan.config[LOCAL].was_announced: t@@ -292,8 +300,6 @@ class Peer(Logger): await group.spawn(self.process_gossip()) async def process_gossip(self): - await self.channel_db.data_loaded.wait() - # verify in peer's TaskGroup so that we fail the connection while True: await asyncio.sleep(5) chan_anns = [] t@@ -311,35 +317,10 @@ class Peer(Logger): raise Exception('unknown message') if self.gossip_queue.empty(): break - self.logger.debug(f'process_gossip {len(chan_anns)} {len(node_anns)} {len(chan_upds)}') - # note: data processed in chunks to avoid taking sql lock for too long - # channel announcements - for chan_anns_chunk in chunks(chan_anns, 300): - self.verify_channel_announcements(chan_anns_chunk) - self.channel_db.add_channel_announcement(chan_anns_chunk) - # node announcements - for node_anns_chunk in chunks(node_anns, 100): - self.verify_node_announcements(node_anns_chunk) - self.channel_db.add_node_announcement(node_anns_chunk) - # channel updates - for chan_upds_chunk in chunks(chan_upds, 1000): - categorized_chan_upds = self.channel_db.add_channel_updates( - chan_upds_chunk, max_age=self.network.lngossip.max_age) - orphaned = categorized_chan_upds.orphaned - if orphaned: - self.logger.info(f'adding {len(orphaned)} unknown channel ids') - orphaned_ids = [c['short_channel_id'] for c in orphaned] - await self.network.lngossip.add_new_ids(orphaned_ids) - # Save (some bounded number of) orphan channel updates for later - # as it might be for our own direct channel with this peer - # (and we might not yet know the short channel id for that) - for chan_upd_payload in orphaned: - short_channel_id = ShortChannelID(chan_upd_payload['short_channel_id']) - self.orphan_channel_updates[short_channel_id] = chan_upd_payload - while len(self.orphan_channel_updates) > 25: - self.orphan_channel_updates.popitem(last=False) - if categorized_chan_upds.good: - self.logger.debug(f'on_channel_update: {len(categorized_chan_upds.good)}/{len(chan_upds_chunk)}') + # verify in peer's TaskGroup so that we fail the connection + self.verify_channel_announcements(chan_anns) + self.verify_node_announcements(node_anns) + await self.network.lngossip.process_gossip(chan_anns, node_anns, chan_upds) def verify_channel_announcements(self, chan_anns): for payload in chan_anns: DIR diff --git a/electrum/lnworker.py b/electrum/lnworker.py t@@ -25,7 +25,7 @@ from aiorpcx import run_in_thread, TaskGroup, NetAddress from . import constants, util from . import keystore -from .util import profiler +from .util import profiler, chunks from .invoices import PR_TYPE_LN, PR_UNPAID, PR_EXPIRED, PR_PAID, PR_INFLIGHT, PR_FAILED, PR_ROUTING, LNInvoice, LN_EXPIRY_NEVER from .util import NetworkRetryManager, JsonRPCClient from .lnutil import LN_MAX_FUNDING_SAT t@@ -518,6 +518,27 @@ class LNGossip(LNWorker): progress_percent = 0 return current_est, total_est, progress_percent + async def process_gossip(self, chan_anns, node_anns, chan_upds): + await self.channel_db.data_loaded.wait() + self.logger.debug(f'process_gossip {len(chan_anns)} {len(node_anns)} {len(chan_upds)}') + # note: data processed in chunks to avoid taking sql lock for too long + # channel announcements + for chan_anns_chunk in chunks(chan_anns, 300): + self.channel_db.add_channel_announcement(chan_anns_chunk) + # node announcements + for node_anns_chunk in chunks(node_anns, 100): + self.channel_db.add_node_announcement(node_anns_chunk) + # channel updates + for chan_upds_chunk in chunks(chan_upds, 1000): + categorized_chan_upds = self.channel_db.add_channel_updates( + chan_upds_chunk, max_age=self.max_age) + orphaned = categorized_chan_upds.orphaned + if orphaned: + self.logger.info(f'adding {len(orphaned)} unknown channel ids') + orphaned_ids = [c['short_channel_id'] for c in orphaned] + await self.add_new_ids(orphaned_ids) + if categorized_chan_upds.good: + self.logger.debug(f'on_channel_update: {len(categorized_chan_upds.good)}/{len(chan_upds_chunk)}') class LNWallet(LNWorker):