tprune channels older than two weeks from database - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit b055eeace297bd1c8a91f68ecc7dbffe2e36de8e DIR parent 522ce5bb9f20e3790f622e4865aeaf137a3818d1 HTML Author: ThomasV <thomasv@electrum.org> Date: Thu, 16 May 2019 09:56:16 +0200 prune channels older than two weeks from database Diffstat: M electrum/lnpeer.py | 6 ++---- M electrum/lnrouter.py | 61 ++++++++++++++++++++++--------- M electrum/lnworker.py | 27 ++++++++++++++++++++++++--- 3 files changed, 69 insertions(+), 25 deletions(-) --- DIR diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py t@@ -248,10 +248,8 @@ class Peer(Logger): self.verify_channel_updates(good) self.channel_db.on_channel_update(good) # refresh gui - known = self.channel_db.num_channels - unknown = len(self.network.lngossip.unknown_ids) - self.logger.info(f'Channels: {known} of {known+unknown}') - self.network.trigger_callback('ln_status') + if chan_anns or node_anns or chan_upds: + self.network.lngossip.refresh_gui() def verify_channel_announcements(self, chan_anns): for payload in chan_anns: DIR diff --git a/electrum/lnrouter.py b/electrum/lnrouter.py t@@ -235,20 +235,6 @@ class ChannelDB(SqlDB): @sql @profiler - def purge_unknown_channels(self, channel_ids): - ids = [x.hex() for x in channel_ids] - missing = self.DBSession \ - .query(ChannelInfo) \ - .filter(not_(ChannelInfo.short_channel_id.in_(ids))) \ - .all() - if missing: - self.logger.info("deleting {} channels".format(len(missing))) - delete_query = ChannelInfo.__table__.delete().where(not_(ChannelInfo.short_channel_id.in_(ids))) - self.DBSession.execute(delete_query) - self.DBSession.commit() - - @sql - @profiler def compare_channels(self, channel_ids): ids = [x.hex() for x in channel_ids] # I need to get the unknown, and also the channels that need refresh t@@ -344,7 +330,7 @@ class ChannelDB(SqlDB): self.DBSession.add(channel_info) self.DBSession.commit() self._update_counts() - self.logger.info('on_channel_announcement: %d/%d'%(len(new_channels), len(msg_payloads))) + self.logger.debug('on_channel_announcement: %d/%d'%(len(new_channels), len(msg_payloads))) @sql def get_last_timestamp(self): t@@ -390,6 +376,7 @@ class ChannelDB(SqlDB): @sql @profiler def on_channel_update(self, msg_payloads): + now = int(time.time()) if type(msg_payloads) is dict: msg_payloads = [msg_payloads] new_policies = {} t@@ -397,7 +384,9 @@ class ChannelDB(SqlDB): short_channel_id = msg_payload['short_channel_id'].hex() node_id = msg_payload['node_id'].hex() new_policy = Policy.from_msg(msg_payload, node_id, short_channel_id) - #self.logger.info(f'on_channel_update {datetime.fromtimestamp(new_policy.timestamp).ctime()}') + # must not be older than two weeks + if new_policy.timestamp < now - 14*24*3600: + continue old_policy = self.DBSession.query(Policy).filter_by(short_channel_id=short_channel_id, start_node=node_id).one_or_none() if old_policy: if old_policy.timestamp >= new_policy.timestamp: t@@ -414,7 +403,7 @@ class ChannelDB(SqlDB): self.DBSession.add(new_policy) self.DBSession.commit() if new_policies: - self.logger.info(f'on_channel_update: {len(new_policies)}/{len(msg_payloads)}') + self.logger.debug(f'on_channel_update: {len(new_policies)}/{len(msg_payloads)}') #self.logger.info(f'last timestamp: {datetime.fromtimestamp(self._get_last_timestamp()).ctime()}') self._update_counts() t@@ -446,7 +435,7 @@ class ChannelDB(SqlDB): new_nodes[node_id] = node_info for addr in node_addresses: new_addresses[(addr.node_id,addr.host,addr.port)] = addr - self.logger.info("on_node_announcement: %d/%d"%(len(new_nodes), len(msg_payloads))) + self.logger.debug("on_node_announcement: %d/%d"%(len(new_nodes), len(msg_payloads))) for node_info in new_nodes.values(): self.DBSession.add(node_info) for new_addr in new_addresses.values(): t@@ -467,6 +456,42 @@ class ChannelDB(SqlDB): return None return Policy.from_msg(msg, None, short_channel_id) # won't actually be written to DB + @sql + @profiler + def get_old_policies(self, delta): + timestamp = int(time.time()) - delta + old_policies = self.DBSession.query(Policy.short_channel_id).filter(Policy.timestamp <= timestamp) + return old_policies.distinct().count() + + @sql + @profiler + def prune_old_policies(self, delta): + # note: delete queries are order sensitive + timestamp = int(time.time()) - delta + old_policies = self.DBSession.query(Policy.short_channel_id).filter(Policy.timestamp <= timestamp) + delete_old_channels = ChannelInfo.__table__.delete().where(ChannelInfo.short_channel_id.in_(old_policies)) + delete_old_policies = Policy.__table__.delete().where(Policy.timestamp <= timestamp) + self.DBSession.execute(delete_old_channels) + self.DBSession.execute(delete_old_policies) + self.DBSession.commit() + self._update_counts() + + @sql + @profiler + def get_orphaned_channels(self): + subquery = self.DBSession.query(Policy.short_channel_id) + orphaned = self.DBSession.query(ChannelInfo).filter(not_(ChannelInfo.short_channel_id.in_(subquery))) + return orphaned.count() + + @sql + @profiler + def prune_orphaned_channels(self): + subquery = self.DBSession.query(Policy.short_channel_id) + delete_orphaned = ChannelInfo.__table__.delete().where(not_(ChannelInfo.short_channel_id.in_(subquery))) + self.DBSession.execute(delete_orphaned) + self.DBSession.commit() + self._update_counts() + def add_channel_update_for_private_channel(self, msg_payload: dict, start_node_id: bytes): if not verify_sig_for_channel_update(msg_payload, start_node_id): return # ignore DIR diff --git a/electrum/lnworker.py b/electrum/lnworker.py t@@ -232,6 +232,7 @@ class LNWorker(Logger): class LNGossip(LNWorker): # height of first channel announcements first_block = 497000 + max_age = 14*24*3600 def __init__(self, network): seed = os.urandom(32) t@@ -244,16 +245,36 @@ class LNGossip(LNWorker): def start_network(self, network: 'Network'): super().start_network(network) + asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.maintain_db()), self.network.asyncio_loop) + + def refresh_gui(self): + # refresh gui + known = self.channel_db.num_channels + unknown = len(self.unknown_ids) + self.logger.info(f'Channels: {known} of {known+unknown}') + self.network.trigger_callback('ln_status') + + async def maintain_db(self): + n = self.channel_db.get_orphaned_channels() + if n: + self.logger.info(f'Deleting {n} orphaned channels') + self.channel_db.prune_orphaned_channels() + self.refresh_gui() + while True: + n = self.channel_db.get_old_policies(self.max_age) + if n: + self.logger.info(f'Deleting {n} old channels') + self.channel_db.prune_old_policies(self.max_age) + self.refresh_gui() + await asyncio.sleep(5) def add_new_ids(self, ids): - #if complete: - # self.channel_db.purge_unknown_channels(ids) known = self.channel_db.compare_channels(ids) new = set(ids) - set(known) self.unknown_ids.update(new) def get_ids_to_query(self): - N = 250 + N = 500 l = list(self.unknown_ids) self.unknown_ids = set(l[N:]) return l[0:N]