URI: 
       tRefactor LNPeer in order to support HTLC forwarding: 1. Do not perform channel updates in coroutines, because they would get executed in random order. 2. After applying channel updates, wait only for the relevant commitment (local or remote) and not for both, because local and remote might be out of sync (BOLT 2). 3. When waiting for a commitment, wait until a given ctn has been reached, because a queue cannot be shared by several coroutines - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit a40207cbbbee56c54875ebe3a27780878cbb9def
   DIR parent 50b4f785a9b6d212575767cbb050dd4318f9903b
  HTML Author: ThomasV <thomasv@electrum.org>
       Date:   Mon,  4 Feb 2019 12:37:30 +0100
       
       Refactor LNPeer in order to support HTLC forwarding:
        1. Do not perform channel updates in coroutines, because they would get executed in random order.
        2. After applying channel updates, wait only for the relevant commitment (local or remote) and not for both, because local and remote might be out of sync (BOLT 2).
        3. When waiting for a commitment, wait until a given ctn has been reached, because a queue cannot be shared by several coroutines
       
       Diffstat:
         M electrum/lnchannel.py               |       3 +++
         M electrum/lnpeer.py                  |     116 +++++++++++++++++--------------
       
       2 files changed, 67 insertions(+), 52 deletions(-)
       ---
   DIR diff --git a/electrum/lnchannel.py b/electrum/lnchannel.py
       t@@ -594,6 +594,9 @@ class Channel(PrintError):
                feerate = self.constraints.feerate
                return self.make_commitment(subject, this_point, ctn, feerate, False)
        
       +    def get_current_ctn(self, subject):
       +        return self.config[subject].ctn
       +
            def total_msat(self, direction):
                assert type(direction) is Direction
                sub = LOCAL if direction == SENT else REMOTE
   DIR diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py
       t@@ -68,8 +68,6 @@ class Peer(PrintError):
                self.channel_reestablished = defaultdict(asyncio.Future)
                self.funding_signed = defaultdict(asyncio.Queue)
                self.funding_created = defaultdict(asyncio.Queue)
       -        self.revoke_and_ack = defaultdict(asyncio.Queue)
       -        self.commitment_signed = defaultdict(asyncio.Queue)
                self.announcement_signatures = defaultdict(asyncio.Queue)
                self.closing_signed = defaultdict(asyncio.Queue)
                self.payment_preimages = defaultdict(asyncio.Queue)
       t@@ -79,10 +77,11 @@ class Peer(PrintError):
                self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_REQ
                self.attempted_route = {}
                self.orphan_channel_updates = OrderedDict()
       +        self.pending_updates = defaultdict(bool)
        
            def send_message(self, message_name: str, **kwargs):
                assert type(message_name) is str
       -        self.print_error("Sending '%s'"%message_name.upper())
       +        #self.print_error("Sending '%s'"%message_name.upper())
                self.transport.send_bytes(encode_msg(message_name, **kwargs))
        
            async def initialize(self):
       t@@ -108,7 +107,7 @@ class Peer(PrintError):
                try:
                    f = getattr(self, 'on_' + message_type)
                except AttributeError:
       -            self.print_error("Received '%s'" % message_type.upper(), payload)
       +            #self.print_error("Received '%s'" % message_type.upper(), payload)
                    return
                # raw message is needed to check signature
                if message_type=='node_announcement':
       t@@ -122,7 +121,7 @@ class Peer(PrintError):
                self.print_error("error", payload["data"].decode("ascii"))
                chan_id = payload.get("channel_id")
                for d in [ self.channel_accepted, self.funding_signed,
       -                   self.funding_created, self.revoke_and_ack, self.commitment_signed,
       +                   self.funding_created,
                           self.announcement_signatures, self.closing_signed ]:
                    if chan_id in d:
                        d[chan_id].put_nowait({'error':payload['data']})
       t@@ -749,8 +748,7 @@ class Peer(PrintError):
        
                return h, node_signature, bitcoin_signature
        
       -    @log_exceptions
       -    async def on_update_fail_htlc(self, payload):
       +    def on_update_fail_htlc(self, payload):
                channel_id = payload["channel_id"]
                htlc_id = int.from_bytes(payload["id"], "big")
                key = (channel_id, htlc_id)
       t@@ -762,7 +760,7 @@ class Peer(PrintError):
                    self.print_error("UPDATE_FAIL_HTLC. cannot decode! attempted route is MISSING. {}".format(key))
                else:
                    try:
       -                await self._handle_error_code_from_failed_htlc(payload["reason"], route, channel_id, htlc_id)
       +                self._handle_error_code_from_failed_htlc(payload["reason"], route, channel_id, htlc_id)
                    except Exception:
                        # exceptions are suppressed as failing to handle an error code
                        # should not block us from removing the htlc
       t@@ -770,10 +768,15 @@ class Peer(PrintError):
                # process update_fail_htlc on channel
                chan = self.channels[channel_id]
                chan.receive_fail_htlc(htlc_id)
       -        await self.receive_and_revoke(chan)
       +        local_ctn = chan.get_current_ctn(LOCAL)
       +        asyncio.ensure_future(self._on_update_fail_htlc(chan, htlc_id, local_ctn))
       +
       +    @log_exceptions
       +    async def _on_update_fail_htlc(self, chan, htlc_id, local_ctn):
       +        await self.await_local(chan, local_ctn)
                self.network.trigger_callback('ln_message', self.lnworker, 'Payment failed', htlc_id)
        
       -    async def _handle_error_code_from_failed_htlc(self, error_reason, route: List['RouteEdge'], channel_id, htlc_id):
       +    def _handle_error_code_from_failed_htlc(self, error_reason, route: List['RouteEdge'], channel_id, htlc_id):
                chan = self.channels[channel_id]
                failure_msg, sender_idx = decode_onion_error(error_reason,
                                                             [x.node_id for x in route],
       t@@ -814,23 +817,22 @@ class Peer(PrintError):
                    else:
                        self.network.path_finder.blacklist.add(short_chan_id)
        
       -    def send_commitment(self, chan: Channel):
       +    def maybe_send_commitment(self, chan: Channel):
       +        if not self.pending_updates[chan]:
       +            return
       +        self.print_error('send_commitment')
                sig_64, htlc_sigs = chan.sign_next_commitment()
                self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs))
       -        return len(htlc_sigs)
       +        self.pending_updates[chan] = False
        
       -    async def send_and_revoke(self, chan: Channel):
       -        """ generic channel update flow """
       -        self.send_commitment(chan)
       -        await self.receive_revoke_and_ack(chan)
       -        await self.receive_commitment(chan)
       -        self.send_revoke_and_ack(chan)
       +    async def await_remote(self, chan: Channel, ctn: int):
       +        self.maybe_send_commitment(chan)
       +        while chan.get_current_ctn(REMOTE) <= ctn:
       +            await asyncio.sleep(0.1)
        
       -    async def receive_and_revoke(self, chan: Channel):
       -        await self.receive_commitment(chan)
       -        self.send_revoke_and_ack(chan)
       -        self.send_commitment(chan)
       -        await self.receive_revoke_and_ack(chan)
       +    async def await_local(self, chan: Channel, ctn: int):
       +        while chan.get_current_ctn(LOCAL) <= ctn:
       +            await asyncio.sleep(0.1)
        
            async def pay(self, route: List['RouteEdge'], chan: Channel, amount_msat: int,
                          payment_hash: bytes, min_final_cltv_expiry: int):
       t@@ -845,6 +847,7 @@ class Peer(PrintError):
                # create htlc
                htlc = {'amount_msat':amount_msat, 'payment_hash':payment_hash, 'cltv_expiry':cltv}
                htlc_id = chan.add_htlc(htlc)
       +        remote_ctn = chan.get_current_ctn(REMOTE)
                chan.onion_keys[htlc_id] = secret_key
                self.attempted_route[(chan.channel_id, htlc_id)] = route
                self.print_error(f"starting payment. route: {route}")
       t@@ -855,14 +858,10 @@ class Peer(PrintError):
                                  amount_msat=amount_msat,
                                  payment_hash=payment_hash,
                                  onion_routing_packet=onion.to_bytes())
       -        await self.send_and_revoke(chan)
       +        self.pending_updates[chan] = True
       +        await self.await_remote(chan, remote_ctn)
                return UpdateAddHtlc(**htlc, htlc_id=htlc_id)
        
       -    async def receive_revoke_and_ack(self, chan: Channel):
       -        revoke_and_ack_msg = await self.revoke_and_ack[chan.channel_id].get()
       -        chan.receive_revocation(RevokeAndAck(revoke_and_ack_msg["per_commitment_secret"], revoke_and_ack_msg["next_per_commitment_point"]))
       -        self.lnworker.save_channel(chan)
       -
            def send_revoke_and_ack(self, chan: Channel):
                rev, _ = chan.revoke_current_commitment()
                self.lnworker.save_channel(chan)
       t@@ -871,36 +870,34 @@ class Peer(PrintError):
                    per_commitment_secret=rev.per_commitment_secret,
                    next_per_commitment_point=rev.next_per_commitment_point)
        
       -    async def receive_commitment(self, chan: Channel, commitment_signed_msg=None):
       -        if commitment_signed_msg is None:
       -            commitment_signed_msg = await self.commitment_signed[chan.channel_id].get()
       -        data = commitment_signed_msg["htlc_signature"]
       -        htlc_sigs = [data[i:i+64] for i in range(0, len(data), 64)]
       -        chan.receive_new_commitment(commitment_signed_msg["signature"], htlc_sigs)
       -        return len(htlc_sigs)
       -
            def on_commitment_signed(self, payload):
       -        self.print_error("commitment_signed", payload)
       +        self.print_error("on_commitment_signed")
                channel_id = payload['channel_id']
       -        self.commitment_signed[channel_id].put_nowait(payload)
       +        chan = self.channels[channel_id]
       +        data = payload["htlc_signature"]
       +        htlc_sigs = [data[i:i+64] for i in range(0, len(data), 64)]
       +        chan.receive_new_commitment(payload["signature"], htlc_sigs)
       +        self.send_revoke_and_ack(chan)
        
       -    @log_exceptions
       -    async def on_update_fulfill_htlc(self, update_fulfill_htlc_msg):
       +    def on_update_fulfill_htlc(self, update_fulfill_htlc_msg):
                self.print_error("update_fulfill")
                chan = self.channels[update_fulfill_htlc_msg["channel_id"]]
                preimage = update_fulfill_htlc_msg["payment_preimage"]
                htlc_id = int.from_bytes(update_fulfill_htlc_msg["id"], "big")
                chan.receive_htlc_settle(preimage, htlc_id)
       -        await self.receive_and_revoke(chan)
       +        local_ctn = chan.get_current_ctn(LOCAL)
       +        asyncio.ensure_future(self._on_update_fulfill_htlc(chan, htlc_id, preimage, local_ctn))
       +
       +    @log_exceptions
       +    async def _on_update_fulfill_htlc(self, chan, htlc_id, preimage, local_ctn):
       +        await self.await_local(chan, local_ctn)
                self.network.trigger_callback('ln_message', self.lnworker, 'Payment sent', htlc_id)
       -        # used in lightning-integration
                self.payment_preimages[sha256(preimage)].put_nowait(preimage)
        
            def on_update_fail_malformed_htlc(self, payload):
                self.print_error("error", payload["data"].decode("ascii"))
        
       -    @log_exceptions
       -    async def on_update_add_htlc(self, payload):
       +    def on_update_add_htlc(self, payload):
                # no onion routing for the moment: we assume we are the end node
                self.print_error('on_update_add_htlc')
                # check if this in our list of requests
       t@@ -919,7 +916,12 @@ class Peer(PrintError):
                # add htlc
                htlc = {'amount_msat': amount_msat_htlc, 'payment_hash':payment_hash, 'cltv_expiry':cltv_expiry}
                htlc_id = chan.receive_htlc(htlc)
       -        await self.receive_and_revoke(chan)
       +        local_ctn = chan.get_current_ctn(LOCAL)
       +        asyncio.ensure_future(self._on_update_add_htlc(chan, local_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion))
       +
       +    @log_exceptions
       +    async def _on_update_add_htlc(self, chan, local_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion):
       +        await self.await_local(chan, local_ctn)
                # Forward HTLC
                # FIXME: this is not robust to us going offline before payment is fulfilled
                if not processed_onion.are_we_final:
       t@@ -936,6 +938,7 @@ class Peer(PrintError):
                    next_amount_msat_htlc = int.from_bytes(dph.amt_to_forward, 'big')
                    next_htlc = {'amount_msat':next_amount_msat_htlc, 'payment_hash':payment_hash, 'cltv_expiry':next_cltv_expiry}
                    next_htlc_id = next_chan.add_htlc(next_htlc)
       +            next_remote_ctn = next_chan.get_current_ctn(REMOTE)
                    next_peer.send_message(
                        "update_add_htlc",
                        channel_id=next_chan.channel_id,
       t@@ -945,7 +948,8 @@ class Peer(PrintError):
                        payment_hash=payment_hash,
                        onion_routing_packet=processed_onion.next_packet.to_bytes()
                    )
       -            await next_peer.send_and_revoke(next_chan)
       +            next_peer.pending_updates[next_chan] = True
       +            await next_peer.await_remote(next_chan, next_remote_ctn)
                    # wait until we get paid
                    preimage = await next_peer.payment_preimages[payment_hash].get()
                    # fulfill the original htlc
       t@@ -989,29 +993,35 @@ class Peer(PrintError):
        
            async def fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes):
                chan.settle_htlc(preimage, htlc_id)
       +        remote_ctn = chan.get_current_ctn(REMOTE)
                self.send_message("update_fulfill_htlc",
                                  channel_id=chan.channel_id,
                                  id=htlc_id,
                                  payment_preimage=preimage)
       -        await self.send_and_revoke(chan)
       +        self.pending_updates[chan] = True
       +        await self.await_remote(chan, remote_ctn)
                self.network.trigger_callback('ln_message', self.lnworker, 'Payment received', htlc_id)
        
            async def fail_htlc(self, chan: Channel, htlc_id: int, onion_packet: OnionPacket,
                                reason: OnionRoutingFailureMessage):
                self.print_error(f"failing received htlc {(bh2u(chan.channel_id), htlc_id)}. reason: {reason}")
                chan.fail_htlc(htlc_id)
       +        remote_ctn = chan.get_current_ctn(REMOTE)
                error_packet = construct_onion_error(reason, onion_packet, our_onion_private_key=self.privkey)
                self.send_message("update_fail_htlc",
                                  channel_id=chan.channel_id,
                                  id=htlc_id,
                                  len=len(error_packet),
                                  reason=error_packet)
       -        await self.send_and_revoke(chan)
       +        self.pending_updates[chan] = True
       +        await self.await_remote(chan, remote_ctn)
        
            def on_revoke_and_ack(self, payload):
       -        self.print_error("got revoke_and_ack")
       +        self.print_error("on_revoke_and_ack")
                channel_id = payload["channel_id"]
       -        self.revoke_and_ack[channel_id].put_nowait(payload)
       +        chan = self.channels[channel_id]
       +        chan.receive_revocation(RevokeAndAck(payload["per_commitment_secret"], payload["next_per_commitment_point"]))
       +        self.lnworker.save_channel(chan)
        
            def on_update_fee(self, payload):
                channel_id = payload["channel_id"]
       t@@ -1036,10 +1046,12 @@ class Peer(PrintError):
                else:
                    return
                chan.update_fee(feerate_per_kw, True)
       +        remote_ctn = chan.get_current_ctn(REMOTE)
                self.send_message("update_fee",
                                  channel_id=chan.channel_id,
                                  feerate_per_kw=feerate_per_kw)
       -        await self.send_and_revoke(chan)
       +        self.pending_updates[chan] = True
       +        await self.await_remote(chan, remote_ctn)
        
            def on_closing_signed(self, payload):
                chan_id = payload["channel_id"]