tuse a single queue for gossip messages, so that they are processed in the correct order - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 308dc6aa6b2ae94d142374720ca0f4fdd200baa2 DIR parent e68f318b12775fc26e7e3a3a45605bc5a1311f4c HTML Author: ThomasV <thomasv@electrum.org> Date: Wed, 15 May 2019 12:30:19 +0200 use a single queue for gossip messages, so that they are processed in the correct order Diffstat: M electrum/lnpeer.py | 71 ++++++++++++++++--------------- M electrum/lnrouter.py | 16 ++-------------- M electrum/lnworker.py | 25 ++++++++++++++++++++++--- 3 files changed, 60 insertions(+), 52 deletions(-) --- DIR diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py t@@ -69,10 +69,8 @@ class Peer(Logger): self.channel_db = lnworker.network.channel_db self.ping_time = 0 self.reply_channel_range = asyncio.Queue() - # gossip message queues - self.channel_announcements = asyncio.Queue() - self.channel_updates = asyncio.Queue() - self.node_announcements = asyncio.Queue() + # gossip uses a single queue to preserve message order + self.gossip_queue = asyncio.Queue() # channel messsage queues self.shutdown_received = defaultdict(asyncio.Future) self.channel_accepted = defaultdict(asyncio.Queue) t@@ -181,13 +179,13 @@ class Peer(Logger): self.initialized.set() def on_node_announcement(self, payload): - self.node_announcements.put_nowait(payload) + self.gossip_queue.put_nowait(('node_announcement', payload)) def on_channel_announcement(self, payload): - self.channel_announcements.put_nowait(payload) + self.gossip_queue.put_nowait(('channel_announcement', payload)) def on_channel_update(self, payload): - self.channel_updates.put_nowait(payload) + self.gossip_queue.put_nowait(('channel_update', payload)) def on_announcement_signatures(self, payload): channel_id = payload['channel_id'] t@@ -212,39 +210,42 @@ class Peer(Logger): async def main_loop(self): async with aiorpcx.TaskGroup() as group: await group.spawn(self._message_loop()) - await group.spawn(self._run_gossip()) - await group.spawn(self.verify_node_announcements()) - await group.spawn(self.verify_channel_announcements()) - await group.spawn(self.verify_channel_updates()) + await group.spawn(self.query_gossip()) + await group.spawn(self.process_gossip()) - async def verify_node_announcements(self): + @log_exceptions + async def process_gossip(self): + # verify in peer's TaskGroup so that we fail the connection + # forward to channel_db.gossip_queue while True: - payload = await self.node_announcements.get() - pubkey = payload['node_id'] - signature = payload['signature'] - h = sha256d(payload['raw'][66:]) - if not ecc.verify_signature(pubkey, signature, h): + name, payload = await self.gossip_queue.get() + if name == 'node_announcement': + self.verify_node_announcement(payload) + elif name == 'channel_announcement': + self.verify_channel_announcement(payload) + elif name == 'channel_update': + pass + else: + raise Exception('unknown message') + self.channel_db.gossip_queue.put_nowait((name, payload)) + + def verify_node_announcement(self, payload): + pubkey = payload['node_id'] + signature = payload['signature'] + h = sha256d(payload['raw'][66:]) + if not ecc.verify_signature(pubkey, signature, h): + raise Exception('signature failed') + + def verify_channel_announcement(self, payload): + h = sha256d(payload['raw'][2+256:]) + pubkeys = [payload['node_id_1'], payload['node_id_2'], payload['bitcoin_key_1'], payload['bitcoin_key_2']] + sigs = [payload['node_signature_1'], payload['node_signature_2'], payload['bitcoin_signature_1'], payload['bitcoin_signature_2']] + for pubkey, sig in zip(pubkeys, sigs): + if not ecc.verify_signature(pubkey, sig, h): raise Exception('signature failed') - self.channel_db.node_anns.append(payload) - - async def verify_channel_announcements(self): - while True: - payload = await self.channel_announcements.get() - h = sha256d(payload['raw'][2+256:]) - pubkeys = [payload['node_id_1'], payload['node_id_2'], payload['bitcoin_key_1'], payload['bitcoin_key_2']] - sigs = [payload['node_signature_1'], payload['node_signature_2'], payload['bitcoin_signature_1'], payload['bitcoin_signature_2']] - for pubkey, sig in zip(pubkeys, sigs): - if not ecc.verify_signature(pubkey, sig, h): - raise Exception('signature failed') - self.channel_db.chan_anns.append(payload) - - async def verify_channel_updates(self): - while True: - payload = await self.channel_updates.get() - self.channel_db.chan_upds.append(payload) @log_exceptions - async def _run_gossip(self): + async def query_gossip(self): await asyncio.wait_for(self.initialized.wait(), 10) if self.lnworker == self.lnworker.network.lngossip: ids, complete = await asyncio.wait_for(self.get_channel_range(), 10) DIR diff --git a/electrum/lnrouter.py b/electrum/lnrouter.py t@@ -35,6 +35,7 @@ from collections import defaultdict from typing import Sequence, List, Tuple, Optional, Dict, NamedTuple, TYPE_CHECKING, Set import binascii import base64 +import asyncio from sqlalchemy import Column, ForeignKey, Integer, String, Boolean from sqlalchemy.orm.query import Query t@@ -223,20 +224,7 @@ class ChannelDB(SqlDB): self._channel_updates_for_private_channels = {} # type: Dict[Tuple[bytes, bytes], dict] self.ca_verifier = LNChannelVerifier(network, self) self.update_counts() - self.node_anns = [] - self.chan_anns = [] - self.chan_upds = [] - - def process_gossip(self): - if self.chan_anns: - self.on_channel_announcement(self.chan_anns) - self.chan_anns = [] - if self.chan_upds: - self.on_channel_update(self.chan_upds) - self.chan_upds = [] - if self.node_anns: - self.on_node_announcement(self.node_anns) - self.node_anns = [] + self.gossip_queue = asyncio.Queue() @sql def update_counts(self): DIR diff --git a/electrum/lnworker.py b/electrum/lnworker.py t@@ -244,7 +244,7 @@ class LNGossip(LNWorker): def start_network(self, network: 'Network'): super().start_network(network) - asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.gossip_task()), self.network.asyncio_loop) + asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.process_gossip()), self.network.asyncio_loop) def add_new_ids(self, ids): #if complete: t@@ -259,10 +259,29 @@ class LNGossip(LNWorker): self.unknown_ids = set(l[N:]) return l[0:N] - async def gossip_task(self): + @log_exceptions + async def process_gossip(self): while True: await asyncio.sleep(5) - self.channel_db.process_gossip() + chan_anns = [] + chan_upds = [] + node_anns = [] + while True: + name, payload = await self.channel_db.gossip_queue.get() + if name == 'channel_announcement': + chan_anns.append(payload) + elif name == 'channel_update': + chan_upds.append(payload) + elif name == 'node_announcement': + node_anns.append(payload) + else: + raise Exception('unknown message') + if self.channel_db.gossip_queue.empty(): + break + self.channel_db.on_channel_announcement(chan_anns) + self.channel_db.on_channel_update(chan_upds) + self.channel_db.on_node_announcement(node_anns) + # refresh gui known = self.channel_db.num_channels unknown = len(self.unknown_ids) self.logger.info(f'Channels: {known} of {known+unknown}')