URI: 
       tlnpeer: Use a single queue per channel for messages that are ordered. Forward error messages with 'temporary_channel_id' to the correct channel_id - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit ac884eb3c34a42bd93aebc966a7dd0d764057f76
   DIR parent 28c5825f415ba115f57dbf060743734013a8ac07
  HTML Author: ThomasV <thomasv@electrum.org>
       Date:   Sun, 23 Feb 2020 17:18:45 +0100
       
       lnpeer: Use a single queue per channel for messages that are ordered.
       Forward error messages with 'temporary_channel_id' to the correct channel_id
       
       Diffstat:
         M electrum/lnpeer.py                  |      94 ++++++++++++-------------------
       
       1 file changed, 35 insertions(+), 59 deletions(-)
       ---
   DIR diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py
       t@@ -82,15 +82,11 @@ class Peer(Logger):
                self.reply_channel_range = asyncio.Queue()
                # gossip uses a single queue to preserve message order
                self.gossip_queue = asyncio.Queue()
       -        # channel messsage queues
       +        self.ordered_messages = ['accept_channel', 'funding_signed', 'funding_created', 'accept_channel', 'channel_reestablish', 'closing_signed']
       +        self.ordered_message_queues = defaultdict(asyncio.Queue) # for messsage that are ordered
       +        self.temp_id_to_id = {}   # to forward error messages
                self.shutdown_received = defaultdict(asyncio.Future)
       -        self.channel_accepted = defaultdict(asyncio.Queue)
       -        self.channel_reestablished = defaultdict(asyncio.Queue)
       -        self.funding_signed = defaultdict(asyncio.Queue)
       -        self.funding_created = defaultdict(asyncio.Queue)
                self.announcement_signatures = defaultdict(asyncio.Queue)
       -        self.closing_signed = defaultdict(asyncio.Queue)
       -        #
                self.orphan_channel_updates = OrderedDict()
                self._local_changed_events = defaultdict(asyncio.Event)
                self._remote_changed_events = defaultdict(asyncio.Event)
       t@@ -147,26 +143,28 @@ class Peer(Logger):
        
            def process_message(self, message):
                message_type, payload = decode_msg(message)
       -        try:
       -            f = getattr(self, 'on_' + message_type)
       -        except AttributeError:
       -            #self.logger.info("Received '%s'" % message_type.upper(), payload)
       -            return
       -        # raw message is needed to check signature
       -        if message_type in ['node_announcement', 'channel_announcement', 'channel_update']:
       -            payload['raw'] = message
       -        execution_result = f(payload)
       -        if asyncio.iscoroutinefunction(f):
       -            asyncio.ensure_future(execution_result)
       +        if message_type in self.ordered_messages:
       +            chan_id = payload.get('channel_id') or payload["temporary_channel_id"]
       +            self.ordered_message_queues[chan_id].put_nowait((message_type, payload))
       +        else:
       +            try:
       +                f = getattr(self, 'on_' + message_type)
       +            except AttributeError:
       +                #self.logger.info("Received '%s'" % message_type.upper(), payload)
       +                return
       +            # raw message is needed to check signature
       +            if message_type in ['node_announcement', 'channel_announcement', 'channel_update']:
       +                payload['raw'] = message
       +            execution_result = f(payload)
       +            if asyncio.iscoroutinefunction(f):
       +                asyncio.ensure_future(execution_result)
        
            def on_error(self, payload):
                self.logger.info(f"on_error: {payload['data'].decode('ascii')}")
                chan_id = payload.get("channel_id")
       -        for d in [ self.channel_accepted, self.funding_signed,
       -                   self.funding_created, self.channel_reestablished,
       -                   self.announcement_signatures, self.closing_signed ]:
       -            if chan_id in d:
       -                d[chan_id].put_nowait({'error':payload['data']})
       +        if chan_id in self.temp_id_to_id:
       +            chan_id = self.temp_id_to_id[chan_id]
       +        self.ordered_message_queues[chan_id].put_nowait((None, {'error':payload['data']}))
        
            def on_ping(self, payload):
                l = int.from_bytes(payload['num_pong_bytes'], 'big')
       t@@ -175,21 +173,14 @@ class Peer(Logger):
            def on_pong(self, payload):
                pass
        
       -    def on_accept_channel(self, payload):
       -        temp_chan_id = payload["temporary_channel_id"]
       -        if temp_chan_id not in self.channel_accepted:
       -            raise Exception("Got unknown accept_channel")
       -        self.channel_accepted[temp_chan_id].put_nowait(payload)
       -
       -    def on_funding_signed(self, payload):
       -        channel_id = payload['channel_id']
       -        if channel_id not in self.funding_signed: raise Exception("Got unknown funding_signed")
       -        self.funding_signed[channel_id].put_nowait(payload)
       -
       -    def on_funding_created(self, payload):
       -        channel_id = payload['temporary_channel_id']
       -        if channel_id not in self.funding_created: raise Exception("Got unknown funding_created")
       -        self.funding_created[channel_id].put_nowait(payload)
       +    async def wait_for_message(self, expected_name, channel_id):
       +        q = self.ordered_message_queues[channel_id]
       +        name, payload = await asyncio.wait_for(q.get(), LN_P2P_NETWORK_TIMEOUT)
       +        if payload.get('error'):
       +            raise Exception('Remote peer reported error: ' + repr(payload.get('error')))
       +        if name != expected_name:
       +            raise Exception(f"Received unexpected '{name}'")
       +        return payload
        
            def on_init(self, payload):
                if self._received_init:
       t@@ -533,9 +524,7 @@ class Peer(Logger):
                    channel_reserve_satoshis=local_config.reserve_sat,
                    htlc_minimum_msat=1,
                )
       -        payload = await asyncio.wait_for(self.channel_accepted[temp_channel_id].get(), LN_P2P_NETWORK_TIMEOUT)
       -        if payload.get('error'):
       -            raise Exception('Remote Lightning peer reported error: ' + repr(payload.get('error')))
       +        payload = await self.wait_for_message('accept_channel', temp_channel_id)
                remote_per_commitment_point = payload['first_per_commitment_point']
                funding_txn_minimum_depth = int.from_bytes(payload['minimum_depth'], 'big')
                if funding_txn_minimum_depth <= 0:
       t@@ -601,12 +590,13 @@ class Peer(Logger):
                               lnworker=self.lnworker,
                               initial_feerate=feerate)
                sig_64, _ = chan.sign_next_commitment()
       +        self.temp_id_to_id[temp_channel_id] = channel_id
                self.send_message("funding_created",
                    temporary_channel_id=temp_channel_id,
                    funding_txid=funding_txid_bytes,
                    funding_output_index=funding_index,
                    signature=sig_64)
       -        payload = await asyncio.wait_for(self.funding_signed[channel_id].get(), LN_P2P_NETWORK_TIMEOUT)
       +        payload = await self.wait_for_message('funding_signed', channel_id)
                self.logger.info('received funding_signed')
                remote_sig = payload['signature']
                chan.receive_new_commitment(remote_sig, [])
       t@@ -666,7 +656,7 @@ class Peer(Logger):
                    htlc_basepoint=local_config.htlc_basepoint.pubkey,
                    first_per_commitment_point=per_commitment_point_first,
                )
       -        funding_created = await self.funding_created[temp_chan_id].get()
       +        funding_created = await self.wait_for_message('funding_created', temp_chan_id)
                funding_idx = int.from_bytes(funding_created['funding_output_index'], 'big')
                funding_txid = bh2u(funding_created['funding_txid'][::-1])
                channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_idx)
       t@@ -715,14 +705,6 @@ class Peer(Logger):
                    raise Exception(f'reserve too high: {remote_reserve_sat}, funding_sat: {funding_sat}')
                return remote_reserve_sat
        
       -    def on_channel_reestablish(self, payload):
       -        chan_id = payload["channel_id"]
       -        chan = self.channels.get(chan_id)
       -        if not chan:
       -            self.logger.info(f"Received unknown channel_reestablish {bh2u(chan_id)} {payload}")
       -            raise Exception('Unknown channel_reestablish')
       -        self.channel_reestablished[chan_id].put_nowait(payload)
       -
            @log_exceptions
            async def reestablish_channel(self, chan: Channel):
                await self.initialized
       t@@ -765,8 +747,7 @@ class Peer(Logger):
                self.logger.info(f'channel_reestablish: sent channel_reestablish with '
                                 f'(next_local_ctn={next_local_ctn}, '
                                 f'oldest_unrevoked_remote_ctn={oldest_unrevoked_remote_ctn})')
       -
       -        msg = await self.channel_reestablished[chan_id].get()
       +        msg = await self.wait_for_message('channel_reestablish', chan_id)
                their_next_local_ctn = int.from_bytes(msg["next_local_commitment_number"], 'big')
                their_oldest_unrevoked_remote_ctn = int.from_bytes(msg["next_remote_revocation_number"], 'big')
                their_local_pcp = msg.get("my_current_per_commitment_point")
       t@@ -1356,11 +1337,6 @@ class Peer(Logger):
                                  feerate_per_kw=feerate_per_kw)
                await self.await_remote(chan, remote_ctn)
        
       -    def on_closing_signed(self, payload):
       -        chan_id = payload["channel_id"]
       -        if chan_id not in self.closing_signed: raise Exception("Got unknown closing_signed")
       -        self.closing_signed[chan_id].put_nowait(payload)
       -
            @log_exceptions
            async def close_channel(self, chan_id: bytes):
                chan = self.channels[chan_id]
       t@@ -1404,7 +1380,7 @@ class Peer(Logger):
                    our_sig, closing_tx = chan.make_closing_tx(scriptpubkey, payload['scriptpubkey'], fee_sat=our_fee)
                    self.send_message('closing_signed', channel_id=chan.channel_id, fee_satoshis=our_fee, signature=our_sig)
                    # FIXME: the remote SHOULD send closing_signed, but some don't.
       -            cs_payload = await asyncio.wait_for(self.closing_signed[chan.channel_id].get(), LN_P2P_NETWORK_TIMEOUT)
       +            cs_payload = await self.wait_for_message('closing_signed', chan.channel_id)
                    their_fee = int.from_bytes(cs_payload['fee_satoshis'], 'big')
                    their_sig = cs_payload['signature']
                    if our_fee == their_fee: