URI: 
       tprune channels older than two weeks from database - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit b055eeace297bd1c8a91f68ecc7dbffe2e36de8e
   DIR parent 522ce5bb9f20e3790f622e4865aeaf137a3818d1
  HTML Author: ThomasV <thomasv@electrum.org>
       Date:   Thu, 16 May 2019 09:56:16 +0200
       
       prune channels older than two weeks from database
       
       Diffstat:
         M electrum/lnpeer.py                  |       6 ++----
         M electrum/lnrouter.py                |      61 ++++++++++++++++++++++---------
         M electrum/lnworker.py                |      27 ++++++++++++++++++++++++---
       
       3 files changed, 69 insertions(+), 25 deletions(-)
       ---
   DIR diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py
       t@@ -248,10 +248,8 @@ class Peer(Logger):
                    self.verify_channel_updates(good)
                    self.channel_db.on_channel_update(good)
                    # refresh gui
       -            known = self.channel_db.num_channels
       -            unknown = len(self.network.lngossip.unknown_ids)
       -            self.logger.info(f'Channels: {known} of {known+unknown}')
       -            self.network.trigger_callback('ln_status')
       +            if chan_anns or node_anns or chan_upds:
       +                self.network.lngossip.refresh_gui()
        
            def verify_channel_announcements(self, chan_anns):
                for payload in chan_anns:
   DIR diff --git a/electrum/lnrouter.py b/electrum/lnrouter.py
       t@@ -235,20 +235,6 @@ class ChannelDB(SqlDB):
        
            @sql
            @profiler
       -    def purge_unknown_channels(self, channel_ids):
       -        ids = [x.hex() for x in channel_ids]
       -        missing = self.DBSession \
       -                      .query(ChannelInfo) \
       -                      .filter(not_(ChannelInfo.short_channel_id.in_(ids))) \
       -                      .all()
       -        if missing:
       -            self.logger.info("deleting {} channels".format(len(missing)))
       -            delete_query = ChannelInfo.__table__.delete().where(not_(ChannelInfo.short_channel_id.in_(ids)))
       -            self.DBSession.execute(delete_query)
       -            self.DBSession.commit()
       -
       -    @sql
       -    @profiler
            def compare_channels(self, channel_ids):
                ids = [x.hex() for x in channel_ids]
                # I need to get the unknown, and also the channels that need refresh
       t@@ -344,7 +330,7 @@ class ChannelDB(SqlDB):
                    self.DBSession.add(channel_info)
                self.DBSession.commit()
                self._update_counts()
       -        self.logger.info('on_channel_announcement: %d/%d'%(len(new_channels), len(msg_payloads)))
       +        self.logger.debug('on_channel_announcement: %d/%d'%(len(new_channels), len(msg_payloads)))
        
            @sql
            def get_last_timestamp(self):
       t@@ -390,6 +376,7 @@ class ChannelDB(SqlDB):
            @sql
            @profiler
            def on_channel_update(self, msg_payloads):
       +        now = int(time.time())
                if type(msg_payloads) is dict:
                    msg_payloads = [msg_payloads]
                new_policies = {}
       t@@ -397,7 +384,9 @@ class ChannelDB(SqlDB):
                    short_channel_id = msg_payload['short_channel_id'].hex()
                    node_id = msg_payload['node_id'].hex()
                    new_policy = Policy.from_msg(msg_payload, node_id, short_channel_id)
       -            #self.logger.info(f'on_channel_update {datetime.fromtimestamp(new_policy.timestamp).ctime()}')
       +            # must not be older than two weeks
       +            if new_policy.timestamp < now - 14*24*3600:
       +                continue
                    old_policy = self.DBSession.query(Policy).filter_by(short_channel_id=short_channel_id, start_node=node_id).one_or_none()
                    if old_policy:
                        if old_policy.timestamp >= new_policy.timestamp:
       t@@ -414,7 +403,7 @@ class ChannelDB(SqlDB):
                    self.DBSession.add(new_policy)
                self.DBSession.commit()
                if new_policies:
       -            self.logger.info(f'on_channel_update: {len(new_policies)}/{len(msg_payloads)}')
       +            self.logger.debug(f'on_channel_update: {len(new_policies)}/{len(msg_payloads)}')
                    #self.logger.info(f'last timestamp: {datetime.fromtimestamp(self._get_last_timestamp()).ctime()}')
                    self._update_counts()
        
       t@@ -446,7 +435,7 @@ class ChannelDB(SqlDB):
                    new_nodes[node_id] = node_info
                    for addr in node_addresses:
                        new_addresses[(addr.node_id,addr.host,addr.port)] = addr
       -        self.logger.info("on_node_announcement: %d/%d"%(len(new_nodes), len(msg_payloads)))
       +        self.logger.debug("on_node_announcement: %d/%d"%(len(new_nodes), len(msg_payloads)))
                for node_info in new_nodes.values():
                    self.DBSession.add(node_info)
                for new_addr in new_addresses.values():
       t@@ -467,6 +456,42 @@ class ChannelDB(SqlDB):
                    return None
                return Policy.from_msg(msg, None, short_channel_id) # won't actually be written to DB
        
       +    @sql
       +    @profiler
       +    def get_old_policies(self, delta):
       +        timestamp = int(time.time()) - delta
       +        old_policies = self.DBSession.query(Policy.short_channel_id).filter(Policy.timestamp <= timestamp)
       +        return old_policies.distinct().count()
       +
       +    @sql
       +    @profiler
       +    def prune_old_policies(self, delta):
       +        # note: delete queries are order sensitive
       +        timestamp = int(time.time()) - delta
       +        old_policies = self.DBSession.query(Policy.short_channel_id).filter(Policy.timestamp <= timestamp)
       +        delete_old_channels = ChannelInfo.__table__.delete().where(ChannelInfo.short_channel_id.in_(old_policies))
       +        delete_old_policies = Policy.__table__.delete().where(Policy.timestamp <= timestamp)
       +        self.DBSession.execute(delete_old_channels)
       +        self.DBSession.execute(delete_old_policies)
       +        self.DBSession.commit()
       +        self._update_counts()
       +
       +    @sql
       +    @profiler
       +    def get_orphaned_channels(self):
       +        subquery = self.DBSession.query(Policy.short_channel_id)
       +        orphaned = self.DBSession.query(ChannelInfo).filter(not_(ChannelInfo.short_channel_id.in_(subquery)))
       +        return orphaned.count()
       +
       +    @sql
       +    @profiler
       +    def prune_orphaned_channels(self):
       +        subquery = self.DBSession.query(Policy.short_channel_id)
       +        delete_orphaned = ChannelInfo.__table__.delete().where(not_(ChannelInfo.short_channel_id.in_(subquery)))
       +        self.DBSession.execute(delete_orphaned)
       +        self.DBSession.commit()
       +        self._update_counts()
       +
            def add_channel_update_for_private_channel(self, msg_payload: dict, start_node_id: bytes):
                if not verify_sig_for_channel_update(msg_payload, start_node_id):
                    return  # ignore
   DIR diff --git a/electrum/lnworker.py b/electrum/lnworker.py
       t@@ -232,6 +232,7 @@ class LNWorker(Logger):
        class LNGossip(LNWorker):
            # height of first channel announcements
            first_block = 497000
       +    max_age = 14*24*3600
        
            def __init__(self, network):
                seed = os.urandom(32)
       t@@ -244,16 +245,36 @@ class LNGossip(LNWorker):
        
            def start_network(self, network: 'Network'):
                super().start_network(network)
       +        asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.maintain_db()), self.network.asyncio_loop)
       +
       +    def refresh_gui(self):
       +        # refresh gui
       +        known = self.channel_db.num_channels
       +        unknown = len(self.unknown_ids)
       +        self.logger.info(f'Channels: {known} of {known+unknown}')
       +        self.network.trigger_callback('ln_status')
       +
       +    async def maintain_db(self):
       +        n = self.channel_db.get_orphaned_channels()
       +        if n:
       +            self.logger.info(f'Deleting {n} orphaned channels')
       +            self.channel_db.prune_orphaned_channels()
       +            self.refresh_gui()
       +        while True:
       +            n = self.channel_db.get_old_policies(self.max_age)
       +            if n:
       +                self.logger.info(f'Deleting {n} old channels')
       +                self.channel_db.prune_old_policies(self.max_age)
       +                self.refresh_gui()
       +            await asyncio.sleep(5)
        
            def add_new_ids(self, ids):
       -        #if complete:
       -        #    self.channel_db.purge_unknown_channels(ids)
                known = self.channel_db.compare_channels(ids)
                new = set(ids) - set(known)
                self.unknown_ids.update(new)
        
            def get_ids_to_query(self):
       -        N = 250
       +        N = 500
                l = list(self.unknown_ids)
                self.unknown_ids = set(l[N:])
                return l[0:N]