tparallelize short_channel_id requests - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 0e42fd9f1738a05f9059f00fa3b9d9868326e7e0 DIR parent 1011245c5e29a128edf4629d024c4cf345190282 HTML Author: ThomasV <thomasv@electrum.org> Date: Mon, 13 May 2019 22:33:56 +0200 parallelize short_channel_id requests Diffstat: M electrum/gui/qt/lightning_dialog.py | 12 +++++++++--- M electrum/lnpeer.py | 58 ++++++++++++++++++++++++------- M electrum/lnrouter.py | 2 -- M electrum/lnworker.py | 72 ++++++++++++------------------- 4 files changed, 82 insertions(+), 62 deletions(-) --- DIR diff --git a/electrum/gui/qt/lightning_dialog.py b/electrum/gui/qt/lightning_dialog.py t@@ -78,8 +78,11 @@ class LightningDialog(QDialog): # channel_db network_w = QWidget() network_vbox = QVBoxLayout(network_w) + self.num_peers = QLabel('') + network_vbox.addWidget(self.num_peers) self.status = QLabel('') network_vbox.addWidget(self.status) + network_vbox.addStretch(1) # local local_w = QWidget() vbox_local = QVBoxLayout(local_w) t@@ -105,14 +108,17 @@ class LightningDialog(QDialog): b.clicked.connect(self.on_close) vbox.addLayout(Buttons(b)) self.watcher_list.update() - self.gui_object.timer.timeout.connect(self.update_status) + self.network.register_callback(self.update_status, ['ln_status']) - def update_status(self): + def update_status(self, event): if self.network.lngossip is None: return channel_db = self.network.channel_db num_peers = sum([p.initialized.is_set() for p in self.network.lngossip.peers.values()]) - msg = _('{} peers, {} nodes, {} channels.').format(num_peers, channel_db.num_nodes, channel_db.num_channels) + self.num_peers.setText(f'{num_peers} peers, {channel_db.num_nodes} nodes') + known = channel_db.num_channels + unknown = len(self.network.lngossip.unknown_ids) + msg = _(f'Channels: {known} of {known + unknown}') self.status.setText(msg) def on_close(self): DIR diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py t@@ -83,7 +83,6 @@ class Peer(Logger): self.recv_commitment_for_ctn_last = defaultdict(lambda: None) # type: Dict[Channel, Optional[int]] self._local_changed_events = defaultdict(asyncio.Event) self._remote_changed_events = defaultdict(asyncio.Event) - self.receiving_channels = False Logger.__init__(self) def send_message(self, message_name: str, **kwargs): t@@ -197,7 +196,7 @@ class Peer(Logger): try: return await func(self, *args, **kwargs) except Exception as e: - self.logger.info("Disconnecting: {}".format(e)) + self.logger.info("Disconnecting: {}".format(repr(e))) finally: self.close_and_cleanup() return wrapper_func t@@ -207,8 +206,47 @@ class Peer(Logger): async def main_loop(self): async with aiorpcx.TaskGroup() as group: await group.spawn(self._message_loop()) - # kill group if the peer times out - await group.spawn(asyncio.wait_for(self.initialized.wait(), 10)) + await group.spawn(self._run_gossip()) + + @log_exceptions + async def _run_gossip(self): + await asyncio.wait_for(self.initialized.wait(), 5) + if self.lnworker == self.lnworker.network.lngossip: + ids, complete = await asyncio.wait_for(self.get_channel_range(), 10) + self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete)) + self.lnworker.add_new_ids(ids) + while True: + todo = self.lnworker.get_ids_to_query() + if not todo: + await asyncio.sleep(1) + continue + await self.querying_lock.acquire() + self.logger.info(f'Querying {len(todo)} short_channel_ids') + self.query_short_channel_ids(todo) + + async def get_channel_range(self): + req_index = self.lnworker.first_block + req_num = self.lnworker.network.get_local_height() - req_index + self.query_channel_range(req_index, req_num) + intervals = [] + ids = set() + while True: + index, num, complete, _ids = await self.reply_channel_range.get() + ids.update(_ids) + intervals.append((index, index+num)) + intervals.sort() + while len(intervals) > 1: + a,b = intervals[0] + c,d = intervals[1] + if b == c: + intervals = [(a,d)] + intervals[2:] + else: + break + if len(intervals) == 1: + a, b = intervals[0] + if a <= req_index and b >= req_index + req_num: + break + return ids, complete def request_gossip(self, timestamp=0): if timestamp == 0: t@@ -222,7 +260,7 @@ class Peer(Logger): timestamp_range=b'\xff'*4) def query_channel_range(self, index, num): - self.logger.info(f'query channel range') + self.logger.info(f'query channel range {index} {num}') self.send_message( 'query_channel_range', chain_hash=constants.net.rev_genesis_bytes(), t@@ -250,9 +288,7 @@ class Peer(Logger): ids = self.decode_short_ids(encoded) self.reply_channel_range.put_nowait((first, num, complete, ids)) - async def query_short_channel_ids(self, ids, compressed=True): - await self.querying_lock.acquire() - #self.logger.info('querying {} short_channel_ids'.format(len(ids))) + def query_short_channel_ids(self, ids, compressed=True): s = b''.join(ids) encoded = zlib.compress(s) if compressed else s prefix = b'\x01' if compressed else b'\x00' t@@ -282,11 +318,7 @@ class Peer(Logger): self.transport.close() except: pass - for chan in self.channels.values(): - if chan.get_state() != 'FORCE_CLOSING': - chan.set_state('DISCONNECTED') - self.network.trigger_callback('channel', chan) - self.lnworker.peers.pop(self.pubkey) + self.lnworker.peer_closed(self) def make_local_config(self, funding_sat: int, push_msat: int, initiator: HTLCOwner) -> LocalConfig: # key derivation DIR diff --git a/electrum/lnrouter.py b/electrum/lnrouter.py t@@ -359,7 +359,6 @@ class ChannelDB(SqlDB): self.DBSession.commit() self._update_counts() self.logger.info('on_channel_announcement: %d/%d'%(len(new_channels), len(msg_payloads))) - self.network.trigger_callback('ln_status') @sql def get_last_timestamp(self): t@@ -457,7 +456,6 @@ class ChannelDB(SqlDB): self.DBSession.add(new_addr) self.DBSession.commit() self._update_counts() - self.network.trigger_callback('ln_status') def get_routing_policy_for_channel(self, start_node_id: bytes, short_channel_id: bytes) -> Optional[bytes]: DIR diff --git a/electrum/lnworker.py b/electrum/lnworker.py t@@ -101,6 +101,7 @@ class LNWorker(Logger): self.network.trigger_callback('ln_status') await asyncio.start_server(cb, addr, int(port)) + @log_exceptions async def main_loop(self): while True: await asyncio.sleep(1) t@@ -165,7 +166,7 @@ class LNWorker(Logger): host, port = self.choose_preferred_address(addrs) peer = LNPeerAddr(host, port, bytes.fromhex(node.node_id)) if peer in self._last_tried_peer: continue - self.logger.info('taking random ln peer from our channel db') + #self.logger.info('taking random ln peer from our channel db') return [peer] # TODO remove this. For some reason the dns seeds seem to ignore the realm byte t@@ -237,62 +238,38 @@ class LNGossip(LNWorker): node = BIP32Node.from_rootseed(seed, xtype='standard') xprv = node.to_xprv() super().__init__(xprv) + self.localfeatures |= LnLocalFeatures.GOSSIP_QUERIES_OPT self.localfeatures |= LnLocalFeatures.GOSSIP_QUERIES_REQ + self.unknown_ids = set() def start_network(self, network: 'Network'): super().start_network(network) asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.gossip_task()), self.network.asyncio_loop) - async def gossip_task(self): - req_index = self.first_block - req_num = self.network.get_local_height() - req_index - while len(self.peers) == 0: - await asyncio.sleep(1) - continue - # todo: parallelize over peers - peer = list(self.peers.values())[0] - await peer.initialized.wait() - # send channels_range query. peer will reply with several intervals - peer.query_channel_range(req_index, req_num) - intervals = [] - ids = set() - # wait until requested range is covered - while True: - index, num, complete, _ids = await peer.reply_channel_range.get() - ids.update(_ids) - intervals.append((index, index+num)) - intervals.sort() - while len(intervals) > 1: - a,b = intervals[0] - c,d = intervals[1] - if b == c: - intervals = [(a,d)] + intervals[2:] - else: - break - if len(intervals) == 1: - a, b = intervals[0] - if a <= req_index and b >= req_index + req_num: - break - self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete)) - # TODO: filter results by date of last channel update, purge DB + def add_new_ids(self, ids): #if complete: # self.channel_db.purge_unknown_channels(ids) known = self.channel_db.compare_channels(ids) - unknown = list(ids - set(known)) - total = len(unknown) - N = 500 - while unknown: - self.channel_db.process_gossip() - await peer.query_short_channel_ids(unknown[0:N]) - unknown = unknown[N:] - self.logger.info(f'Querying channels: {total - len(unknown)}/{total}. Count: {self.channel_db.num_channels}') + new = ids - set(known) + self.unknown_ids.update(new) - # request gossip fromm current time - now = int(time.time()) - peer.request_gossip(now) + def get_ids_to_query(self): + N = 250 + l = list(self.unknown_ids) + self.unknown_ids = set(l[N:]) + return l[0:N] + + async def gossip_task(self): while True: await asyncio.sleep(5) self.channel_db.process_gossip() + known = self.channel_db.num_channels + unknown = len(self.unknown_ids) + self.logger.info(f'Channels: {known} of {known+unknown}') + self.network.trigger_callback('ln_status') + + def peer_closed(self, peer): + self.peers.pop(peer.pubkey) class LNWallet(LNWorker): t@@ -343,6 +320,13 @@ class LNWallet(LNWorker): ]: asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(coro), self.network.asyncio_loop) + def peer_closed(self, peer): + for chan in self.channels_for_peer(peer.pubkey).values(): + if chan.get_state() != 'FORCE_CLOSING': + chan.set_state('DISCONNECTED') + self.network.trigger_callback('channel', chan) + self.peers.pop(peer.pubkey) + def payment_completed(self, chan: Channel, direction: Direction, htlc: UpdateAddHtlc): chan_id = chan.channel_id