        parallelize short_channel_id requests - electrum - Electrum Bitcoin wallet
        git clone https://git.parazyd.org/electrum
       ---
        commit 0e42fd9f1738a05f9059f00fa3b9d9868326e7e0
        parent 1011245c5e29a128edf4629d024c4cf345190282
        Author: ThomasV <thomasv@electrum.org>
       Date:   Mon, 13 May 2019 22:33:56 +0200
       
       parallelize short_channel_id requests
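        
        Previously LNGossip.gossip_task picked a single peer, sent one
        query_channel_range, and then queried short_channel_ids from that peer
        serially in batches of 500. This change moves the gossip loop into each
        Peer (Peer._run_gossip): every connected gossip peer pulls its own batch
        of 250 ids from a shared LNGossip.unknown_ids set, so several peers are
        queried in parallel. The lightning dialog now updates via the 'ln_status'
        network callback instead of a GUI timer and shows progress as
        "Channels: known of known+unknown".
        
        Roughly, the batch hand-out works like this (simplified sketch;
        Gossip and peer_query_loop are placeholder names, the real code is in
        LNGossip and Peer._run_gossip):
        
            import asyncio
        
            class Gossip:
                def __init__(self):
                    # short_channel_ids announced by peers but not yet queried
                    self.unknown_ids = set()
        
                def add_new_ids(self, ids):
                    self.unknown_ids.update(ids)
        
                def get_ids_to_query(self, batch_size=250):
                    # hand out one batch; whichever peer asks first gets it
                    batch = list(self.unknown_ids)[:batch_size]
                    self.unknown_ids -= set(batch)
                    return batch
        
            async def peer_query_loop(gossip, query_batch):
                # each connected gossip peer runs its own copy of this loop
                while True:
                    todo = gossip.get_ids_to_query()
                    if not todo:
                        await asyncio.sleep(1)
                        continue
                    await query_batch(todo)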
       
       Diffstat:
         M electrum/gui/qt/lightning_dialog.py |      12 +++++++++---
         M electrum/lnpeer.py                  |      58 ++++++++++++++++++++++++-------
         M electrum/lnrouter.py                |       2 --
         M electrum/lnworker.py                |      72 ++++++++++++-------------------
       
       4 files changed, 82 insertions(+), 62 deletions(-)
       ---
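        One detail in the lnpeer.py hunks below: the new Peer.get_channel_range
        keeps collecting reply_channel_range messages and merging the returned
        (first_block, first_block + num) intervals until a single interval covers
        the requested range. A rough equivalent of that coverage check (a sketch
        only; covers is a hypothetical helper, and unlike the diff it also merges
        overlapping rather than strictly adjacent intervals):
        
            def covers(intervals, req_index, req_num):
                # merge sorted (start, end) block intervals, then check whether a
                # single merged interval spans [req_index, req_index + req_num)
                merged = []
                for start, end in sorted(intervals):
                    if merged and start <= merged[-1][1]:
                        merged[-1] = (merged[-1][0], max(merged[-1][1], end))
                    else:
                        merged.append((start, end))
                return (len(merged) == 1
                        and merged[0][0] <= req_index
                        and merged[0][1] >= req_index + req_num)
        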
        diff --git a/electrum/gui/qt/lightning_dialog.py b/electrum/gui/qt/lightning_dialog.py
        @@ -78,8 +78,11 @@ class LightningDialog(QDialog):
                # channel_db
                network_w = QWidget()
                network_vbox = QVBoxLayout(network_w)
       +        self.num_peers = QLabel('')
       +        network_vbox.addWidget(self.num_peers)
                self.status = QLabel('')
                network_vbox.addWidget(self.status)
       +        network_vbox.addStretch(1)
                # local
                local_w = QWidget()
                vbox_local = QVBoxLayout(local_w)
        @@ -105,14 +108,17 @@
                b.clicked.connect(self.on_close)
                vbox.addLayout(Buttons(b))
                self.watcher_list.update()
       -        self.gui_object.timer.timeout.connect(self.update_status)
       +        self.network.register_callback(self.update_status, ['ln_status'])
        
       -    def update_status(self):
       +    def update_status(self, event):
                if self.network.lngossip is None:
                    return
                channel_db = self.network.channel_db
                num_peers = sum([p.initialized.is_set() for p in self.network.lngossip.peers.values()])
       -        msg = _('{} peers, {} nodes, {} channels.').format(num_peers, channel_db.num_nodes, channel_db.num_channels)
       +        self.num_peers.setText(f'{num_peers} peers, {channel_db.num_nodes} nodes')
       +        known = channel_db.num_channels
       +        unknown = len(self.network.lngossip.unknown_ids)
       +        msg = _(f'Channels: {known} of {known + unknown}')
                self.status.setText(msg)
        
            def on_close(self):
        diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py
        @@ -83,7 +83,6 @@ class Peer(Logger):
                self.recv_commitment_for_ctn_last = defaultdict(lambda: None)  # type: Dict[Channel, Optional[int]]
                self._local_changed_events = defaultdict(asyncio.Event)
                self._remote_changed_events = defaultdict(asyncio.Event)
       -        self.receiving_channels = False
                Logger.__init__(self)
        
            def send_message(self, message_name: str, **kwargs):
        @@ -197,7 +196,7 @@
                    try:
                        return await func(self, *args, **kwargs)
                    except Exception as e:
       -                self.logger.info("Disconnecting: {}".format(e))
       +                self.logger.info("Disconnecting: {}".format(repr(e)))
                    finally:
                        self.close_and_cleanup()
                return wrapper_func
        @@ -207,8 +206,47 @@
            async def main_loop(self):
                async with aiorpcx.TaskGroup() as group:
                    await group.spawn(self._message_loop())
       -            # kill group if the peer times out
       -            await group.spawn(asyncio.wait_for(self.initialized.wait(), 10))
       +            await group.spawn(self._run_gossip())
       +
       +    @log_exceptions
       +    async def _run_gossip(self):
       +        await asyncio.wait_for(self.initialized.wait(), 5)
       +        if self.lnworker == self.lnworker.network.lngossip:
       +            ids, complete = await asyncio.wait_for(self.get_channel_range(), 10)
       +            self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete))
       +            self.lnworker.add_new_ids(ids)
       +            while True:
       +                todo = self.lnworker.get_ids_to_query()
       +                if not todo:
       +                    await asyncio.sleep(1)
       +                    continue
       +                await self.querying_lock.acquire()
       +                self.logger.info(f'Querying {len(todo)} short_channel_ids')
       +                self.query_short_channel_ids(todo)
       +
       +    async def get_channel_range(self):
       +        req_index = self.lnworker.first_block
       +        req_num = self.lnworker.network.get_local_height() - req_index
       +        self.query_channel_range(req_index, req_num)
       +        intervals = []
       +        ids = set()
       +        while True:
       +            index, num, complete, _ids = await self.reply_channel_range.get()
       +            ids.update(_ids)
       +            intervals.append((index, index+num))
       +            intervals.sort()
       +            while len(intervals) > 1:
       +                a,b = intervals[0]
       +                c,d = intervals[1]
       +                if b == c:
       +                    intervals = [(a,d)] + intervals[2:]
       +                else:
       +                    break
       +            if len(intervals) == 1:
       +                a, b = intervals[0]
       +                if a <= req_index and b >= req_index + req_num:
       +                    break
       +        return ids, complete
        
            def request_gossip(self, timestamp=0):
                if timestamp == 0:
        @@ -222,7 +260,7 @@
                    timestamp_range=b'\xff'*4)
        
            def query_channel_range(self, index, num):
       -        self.logger.info(f'query channel range')
       +        self.logger.info(f'query channel range {index} {num}')
                self.send_message(
                    'query_channel_range',
                    chain_hash=constants.net.rev_genesis_bytes(),
        @@ -250,9 +288,7 @@
                ids = self.decode_short_ids(encoded)
                self.reply_channel_range.put_nowait((first, num, complete, ids))
        
       -    async def query_short_channel_ids(self, ids, compressed=True):
       -        await self.querying_lock.acquire()
       -        #self.logger.info('querying {} short_channel_ids'.format(len(ids)))
       +    def query_short_channel_ids(self, ids, compressed=True):
                s = b''.join(ids)
                encoded = zlib.compress(s) if compressed else s
                prefix = b'\x01' if compressed else b'\x00'
        @@ -282,11 +318,7 @@
                        self.transport.close()
                except:
                    pass
       -        for chan in self.channels.values():
       -            if chan.get_state() != 'FORCE_CLOSING':
       -                chan.set_state('DISCONNECTED')
       -            self.network.trigger_callback('channel', chan)
       -        self.lnworker.peers.pop(self.pubkey)
       +        self.lnworker.peer_closed(self)
        
            def make_local_config(self, funding_sat: int, push_msat: int, initiator: HTLCOwner) -> LocalConfig:
                # key derivation
        diff --git a/electrum/lnrouter.py b/electrum/lnrouter.py
        @@ -359,7 +359,6 @@ class ChannelDB(SqlDB):
                self.DBSession.commit()
                self._update_counts()
                self.logger.info('on_channel_announcement: %d/%d'%(len(new_channels), len(msg_payloads)))
       -        self.network.trigger_callback('ln_status')
        
            @sql
            def get_last_timestamp(self):
        @@ -457,7 +456,6 @@
                        self.DBSession.add(new_addr)
                self.DBSession.commit()
                self._update_counts()
       -        self.network.trigger_callback('ln_status')
        
            def get_routing_policy_for_channel(self, start_node_id: bytes,
                                               short_channel_id: bytes) -> Optional[bytes]:
        diff --git a/electrum/lnworker.py b/electrum/lnworker.py
        @@ -101,6 +101,7 @@ class LNWorker(Logger):
                        self.network.trigger_callback('ln_status')
                    await asyncio.start_server(cb, addr, int(port))
        
       +    @log_exceptions
            async def main_loop(self):
                while True:
                    await asyncio.sleep(1)
        @@ -165,7 +166,7 @@
                        host, port = self.choose_preferred_address(addrs)
                        peer = LNPeerAddr(host, port, bytes.fromhex(node.node_id))
                        if peer in self._last_tried_peer: continue
       -                self.logger.info('taking random ln peer from our channel db')
       +                #self.logger.info('taking random ln peer from our channel db')
                        return [peer]
        
                # TODO remove this. For some reason the dns seeds seem to ignore the realm byte
        @@ -237,62 +238,38 @@ class LNGossip(LNWorker):
                node = BIP32Node.from_rootseed(seed, xtype='standard')
                xprv = node.to_xprv()
                super().__init__(xprv)
       +        self.localfeatures |= LnLocalFeatures.GOSSIP_QUERIES_OPT
                self.localfeatures |= LnLocalFeatures.GOSSIP_QUERIES_REQ
       +        self.unknown_ids = set()
        
            def start_network(self, network: 'Network'):
                super().start_network(network)
                asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.gossip_task()), self.network.asyncio_loop)
        
       -    async def gossip_task(self):
       -        req_index = self.first_block
       -        req_num = self.network.get_local_height() - req_index
       -        while len(self.peers) == 0:
       -            await asyncio.sleep(1)
       -            continue
       -        # todo: parallelize over peers
       -        peer = list(self.peers.values())[0]
       -        await peer.initialized.wait()
       -        # send channels_range query. peer will reply with several intervals
       -        peer.query_channel_range(req_index, req_num)
       -        intervals = []
       -        ids = set()
       -        # wait until requested range is covered
       -        while True:
       -            index, num, complete, _ids = await peer.reply_channel_range.get()
       -            ids.update(_ids)
       -            intervals.append((index, index+num))
       -            intervals.sort()
       -            while len(intervals) > 1:
       -                a,b = intervals[0]
       -                c,d = intervals[1]
       -                if b == c:
       -                    intervals = [(a,d)] + intervals[2:]
       -                else:
       -                    break
       -            if len(intervals) == 1:
       -                a, b = intervals[0]
       -                if a <= req_index and b >= req_index + req_num:
       -                    break
       -        self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete))
       -        # TODO: filter results by date of last channel update, purge DB
       +    def add_new_ids(self, ids):
                #if complete:
                #    self.channel_db.purge_unknown_channels(ids)
                known = self.channel_db.compare_channels(ids)
       -        unknown = list(ids - set(known))
       -        total = len(unknown)
       -        N = 500
       -        while unknown:
       -            self.channel_db.process_gossip()
       -            await peer.query_short_channel_ids(unknown[0:N])
       -            unknown = unknown[N:]
       -            self.logger.info(f'Querying channels: {total - len(unknown)}/{total}. Count: {self.channel_db.num_channels}')
       +        new = ids - set(known)
       +        self.unknown_ids.update(new)
        
       -        # request gossip fromm current time
       -        now = int(time.time())
       -        peer.request_gossip(now)
       +    def get_ids_to_query(self):
       +        N = 250
       +        l = list(self.unknown_ids)
       +        self.unknown_ids = set(l[N:])
       +        return l[0:N]
       +
       +    async def gossip_task(self):
                while True:
                    await asyncio.sleep(5)
                    self.channel_db.process_gossip()
       +            known = self.channel_db.num_channels
       +            unknown = len(self.unknown_ids)
       +            self.logger.info(f'Channels: {known} of {known+unknown}')
       +            self.network.trigger_callback('ln_status')
       +
       +    def peer_closed(self, peer):
       +        self.peers.pop(peer.pubkey)
        
        
        class LNWallet(LNWorker):
        @@ -343,6 +320,13 @@ class LNWallet(LNWorker):
                ]:
                    asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(coro), self.network.asyncio_loop)
        
       +    def peer_closed(self, peer):
       +        for chan in self.channels_for_peer(peer.pubkey).values():
       +            if chan.get_state() != 'FORCE_CLOSING':
       +                chan.set_state('DISCONNECTED')
       +            self.network.trigger_callback('channel', chan)
       +        self.peers.pop(peer.pubkey)
       +
            def payment_completed(self, chan: Channel, direction: Direction,
                                  htlc: UpdateAddHtlc):
                chan_id = chan.channel_id