URI: 
       tuse a single queue for gossip messages, so that they are processed in the correct order - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 308dc6aa6b2ae94d142374720ca0f4fdd200baa2
   DIR parent e68f318b12775fc26e7e3a3a45605bc5a1311f4c
  HTML Author: ThomasV <thomasv@electrum.org>
       Date:   Wed, 15 May 2019 12:30:19 +0200
       
       use a single queue for gossip messages, so that they are processed in the correct order
       
       Diffstat:
         M electrum/lnpeer.py                  |      71 ++++++++++++++++---------------
         M electrum/lnrouter.py                |      16 ++--------------
         M electrum/lnworker.py                |      25 ++++++++++++++++++++++---
       
       3 files changed, 60 insertions(+), 52 deletions(-)
       ---
   DIR diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py
       t@@ -69,10 +69,8 @@ class Peer(Logger):
                self.channel_db = lnworker.network.channel_db
                self.ping_time = 0
                self.reply_channel_range = asyncio.Queue()
       -        # gossip message queues
       -        self.channel_announcements = asyncio.Queue()
       -        self.channel_updates = asyncio.Queue()
       -        self.node_announcements = asyncio.Queue()
       +        # gossip uses a single queue to preserve message order
       +        self.gossip_queue = asyncio.Queue()
                # channel messsage queues
                self.shutdown_received = defaultdict(asyncio.Future)
                self.channel_accepted = defaultdict(asyncio.Queue)
       t@@ -181,13 +179,13 @@ class Peer(Logger):
                self.initialized.set()
        
            def on_node_announcement(self, payload):
       -        self.node_announcements.put_nowait(payload)
       +        self.gossip_queue.put_nowait(('node_announcement', payload))
        
            def on_channel_announcement(self, payload):
       -        self.channel_announcements.put_nowait(payload)
       +        self.gossip_queue.put_nowait(('channel_announcement', payload))
        
            def on_channel_update(self, payload):
       -        self.channel_updates.put_nowait(payload)
       +        self.gossip_queue.put_nowait(('channel_update', payload))
        
            def on_announcement_signatures(self, payload):
                channel_id = payload['channel_id']
       t@@ -212,39 +210,42 @@ class Peer(Logger):
            async def main_loop(self):
                async with aiorpcx.TaskGroup() as group:
                    await group.spawn(self._message_loop())
       -            await group.spawn(self._run_gossip())
       -            await group.spawn(self.verify_node_announcements())
       -            await group.spawn(self.verify_channel_announcements())
       -            await group.spawn(self.verify_channel_updates())
       +            await group.spawn(self.query_gossip())
       +            await group.spawn(self.process_gossip())
        
       -    async def verify_node_announcements(self):
       +    @log_exceptions
       +    async def process_gossip(self):
       +        # verify in peer's TaskGroup so that we fail the connection
       +        # forward to channel_db.gossip_queue
                while True:
       -            payload = await self.node_announcements.get()
       -            pubkey = payload['node_id']
       -            signature = payload['signature']
       -            h = sha256d(payload['raw'][66:])
       -            if not ecc.verify_signature(pubkey, signature, h):
       +            name, payload = await self.gossip_queue.get()
       +            if name == 'node_announcement':
       +                self.verify_node_announcement(payload)
       +            elif name == 'channel_announcement':
       +                self.verify_channel_announcement(payload)
       +            elif name == 'channel_update':
       +                pass
       +            else:
       +                raise Exception('unknown message')
       +            self.channel_db.gossip_queue.put_nowait((name, payload))
       +
       +    def verify_node_announcement(self, payload):
       +        pubkey = payload['node_id']
       +        signature = payload['signature']
       +        h = sha256d(payload['raw'][66:])
       +        if not ecc.verify_signature(pubkey, signature, h):
       +            raise Exception('signature failed')
       +
       +    def verify_channel_announcement(self, payload):
       +        h = sha256d(payload['raw'][2+256:])
       +        pubkeys = [payload['node_id_1'], payload['node_id_2'], payload['bitcoin_key_1'], payload['bitcoin_key_2']]
       +        sigs = [payload['node_signature_1'], payload['node_signature_2'], payload['bitcoin_signature_1'], payload['bitcoin_signature_2']]
       +        for pubkey, sig in zip(pubkeys, sigs):
       +            if not ecc.verify_signature(pubkey, sig, h):
                        raise Exception('signature failed')
       -            self.channel_db.node_anns.append(payload)
       -
       -    async def verify_channel_announcements(self):
       -        while True:
       -            payload = await self.channel_announcements.get()
       -            h = sha256d(payload['raw'][2+256:])
       -            pubkeys = [payload['node_id_1'], payload['node_id_2'], payload['bitcoin_key_1'], payload['bitcoin_key_2']]
       -            sigs = [payload['node_signature_1'], payload['node_signature_2'], payload['bitcoin_signature_1'], payload['bitcoin_signature_2']]
       -            for pubkey, sig in zip(pubkeys, sigs):
       -                if not ecc.verify_signature(pubkey, sig, h):
       -                    raise Exception('signature failed')
       -            self.channel_db.chan_anns.append(payload)
       -
       -    async def verify_channel_updates(self):
       -        while True:
       -            payload = await self.channel_updates.get()
       -            self.channel_db.chan_upds.append(payload)
        
            @log_exceptions
       -    async def _run_gossip(self):
       +    async def query_gossip(self):
                await asyncio.wait_for(self.initialized.wait(), 10)
                if self.lnworker == self.lnworker.network.lngossip:
                    ids, complete = await asyncio.wait_for(self.get_channel_range(), 10)
   DIR diff --git a/electrum/lnrouter.py b/electrum/lnrouter.py
       t@@ -35,6 +35,7 @@ from collections import defaultdict
        from typing import Sequence, List, Tuple, Optional, Dict, NamedTuple, TYPE_CHECKING, Set
        import binascii
        import base64
       +import asyncio
        
        from sqlalchemy import Column, ForeignKey, Integer, String, Boolean
        from sqlalchemy.orm.query import Query
       t@@ -223,20 +224,7 @@ class ChannelDB(SqlDB):
                self._channel_updates_for_private_channels = {}  # type: Dict[Tuple[bytes, bytes], dict]
                self.ca_verifier = LNChannelVerifier(network, self)
                self.update_counts()
       -        self.node_anns = []
       -        self.chan_anns = []
       -        self.chan_upds = []
       -
       -    def process_gossip(self):
       -        if self.chan_anns:
       -            self.on_channel_announcement(self.chan_anns)
       -            self.chan_anns = []
       -        if self.chan_upds:
       -            self.on_channel_update(self.chan_upds)
       -            self.chan_upds = []
       -        if self.node_anns:
       -            self.on_node_announcement(self.node_anns)
       -            self.node_anns = []
       +        self.gossip_queue = asyncio.Queue()
        
            @sql
            def update_counts(self):
   DIR diff --git a/electrum/lnworker.py b/electrum/lnworker.py
       t@@ -244,7 +244,7 @@ class LNGossip(LNWorker):
        
            def start_network(self, network: 'Network'):
                super().start_network(network)
       -        asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.gossip_task()), self.network.asyncio_loop)
       +        asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.process_gossip()), self.network.asyncio_loop)
        
            def add_new_ids(self, ids):
                #if complete:
       t@@ -259,10 +259,29 @@ class LNGossip(LNWorker):
                self.unknown_ids = set(l[N:])
                return l[0:N]
        
       -    async def gossip_task(self):
       +    @log_exceptions
       +    async def process_gossip(self):
                while True:
                    await asyncio.sleep(5)
       -            self.channel_db.process_gossip()
       +            chan_anns = []
       +            chan_upds = []
       +            node_anns = []
       +            while True:
       +                name, payload = await self.channel_db.gossip_queue.get()
       +                if name == 'channel_announcement':
       +                    chan_anns.append(payload)
       +                elif name == 'channel_update':
       +                    chan_upds.append(payload)
       +                elif name == 'node_announcement':
       +                    node_anns.append(payload)
       +                else:
       +                    raise Exception('unknown message')
       +                if self.channel_db.gossip_queue.empty():
       +                    break
       +            self.channel_db.on_channel_announcement(chan_anns)
       +            self.channel_db.on_channel_update(chan_upds)
       +            self.channel_db.on_node_announcement(node_anns)
       +            # refresh gui
                    known = self.channel_db.num_channels
                    unknown = len(self.unknown_ids)
                    self.logger.info(f'Channels: {known} of {known+unknown}')