URI: 
       tavoid reading from queues concurrently in pay() - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 50b3bc939cce67694e7ce0d954f5749e544a38d4
   DIR parent a54631b8738e383f8b5649e8044da66e04afc82e
  HTML Author: Janus <ysangkok@gmail.com>
       Date:   Mon, 24 Sep 2018 20:22:43 +0200
       
       avoid reading from queues concurrently in pay()
       
       Diffstat:
         M electrum/lnbase.py                  |     102 ++++++++++++++-----------------
         M electrum/lnhtlc.py                  |       2 ++
         M electrum/tests/test_lnhtlc.py       |       3 ++-
       
       3 files changed, 49 insertions(+), 58 deletions(-)
       ---
   DIR diff --git a/electrum/lnbase.py b/electrum/lnbase.py
       t@@ -287,10 +287,8 @@ class Peer(PrintError):
                self.channel_reestablished = defaultdict(asyncio.Future)
                self.funding_signed = defaultdict(asyncio.Queue)
                self.revoke_and_ack = defaultdict(asyncio.Queue)
       -        self.update_fulfill_htlc = defaultdict(asyncio.Queue)
                self.commitment_signed = defaultdict(asyncio.Queue)
                self.announcement_signatures = defaultdict(asyncio.Queue)
       -        self.update_fail_htlc = defaultdict(asyncio.Queue)
                self.closing_signed = defaultdict(asyncio.Queue)
                self.localfeatures = (0x08 if request_initial_sync else 0)
                self.invoices = lnworker.invoices
       t@@ -404,7 +402,9 @@ class Peer(PrintError):
                # raw message is needed to check signature
                if message_type=='node_announcement':
                    payload['raw'] = message
       -        f(payload)
       +        execution_result = f(payload)
       +        if asyncio.iscoroutinefunction(f):
       +            asyncio.ensure_future(execution_result)
        
            def on_error(self, payload):
                self.print_error("error", payload["data"].decode("ascii"))
       t@@ -807,12 +807,14 @@ class Peer(PrintError):
        
                return h, node_signature, bitcoin_signature
        
       -    def on_update_fail_htlc(self, payload):
       +    @aiosafe
       +    async def on_update_fail_htlc(self, payload):
                channel_id = payload["channel_id"]
       +        chan = self.channels[channel_id]
                htlc_id = int.from_bytes(payload["id"], "big")
                key = (channel_id, htlc_id)
                route = self.attempted_route[key]
       -        failure_msg, sender_idx = decode_onion_error(payload["reason"], [x.node_id for x in route], self.secret_key)
       +        failure_msg, sender_idx = decode_onion_error(payload["reason"], [x.node_id for x in route], chan.onion_keys[htlc_id])
                code = failure_msg.code
                code_name = ONION_FAILURE_CODE_MAP.get(code, 'unknown_error!!')
                data = failure_msg.data
       t@@ -826,7 +828,18 @@ class Peer(PrintError):
                    # also, we need finer blacklisting (directed edges; nodes)
                    self.network.path_finder.blacklist.add(short_chan_id)
        
       -        self.update_fail_htlc[payload["channel_id"]].put_nowait("HTLC failure with code {} ({})".format(code, code_name))
       +        print("HTLC failure with code {} ({})".format(code, code_name))
       +        chan = self.channels[channel_id]
       +        sig_64, htlc_sigs = chan.sign_next_commitment()
       +        self.send_message(gen_msg("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs)))
       +        await self.receive_revoke(chan)
       +        chan.receive_fail_htlc(htlc_id)
       +        await self.receive_commitment(chan)
       +        self.revoke(chan)
       +        sig_64, htlc_sigs = chan.sign_next_commitment()
       +        self.send_message(gen_msg("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=0))
       +        await self.receive_revoke(chan)
       +        self.lnworker.save_channel(chan)
        
            async def pay(self, path, chan, amount_msat, payment_hash, pubkey_in_invoice, min_final_cltv_expiry):
                assert chan.get_state() == "OPEN", chan.get_state()
       t@@ -843,9 +856,9 @@ class Peer(PrintError):
                    hops_data += [OnionHopsDataSingle(OnionPerHop(route_edge.short_channel_id, amount_msat.to_bytes(8, "big"), final_cltv_expiry_without_deltas.to_bytes(4, "big")))]
                    total_fee += route_edge.channel_policy.fee_base_msat + ( amount_msat * route_edge.channel_policy.fee_proportional_millionths // 1000000 )
                associated_data = payment_hash
       -        self.secret_key = os.urandom(32)
       +        secret_key = os.urandom(32)
                hops_data += [OnionHopsDataSingle(OnionPerHop(b"\x00"*8, amount_msat.to_bytes(8, "big"), (final_cltv_expiry_without_deltas).to_bytes(4, "big")))]
       -        onion = new_onion_packet([x.node_id for x in route], self.secret_key, hops_data, associated_data)
       +        onion = new_onion_packet([x.node_id for x in route], secret_key, hops_data, associated_data)
                amount_msat += total_fee
                # FIXME this below will probably break with multiple HTLCs
                msat_local = chan.balance(LOCAL) - amount_msat
       t@@ -866,6 +879,7 @@ class Peer(PrintError):
                    raise PaymentFailure('not enough local balance')
        
                htlc_id = chan.add_htlc(htlc)
       +        chan.onion_keys[htlc_id] = secret_key
                self.send_message(gen_msg("update_add_htlc", channel_id=chan.channel_id, id=htlc_id, cltv_expiry=final_cltv_expiry_with_deltas, amount_msat=amount_msat, payment_hash=payment_hash, onion_routing_packet=onion.to_bytes()))
        
                self.attempted_route[(chan.channel_id, htlc_id)] = route
       t@@ -876,46 +890,6 @@ class Peer(PrintError):
                await self.receive_commitment(chan)
                self.revoke(chan)
        
       -        fulfill_coro = asyncio.ensure_future(self.update_fulfill_htlc[chan.channel_id].get())
       -        failure_coro = asyncio.ensure_future(self.update_fail_htlc[chan.channel_id].get())
       -
       -        done, pending = await asyncio.wait([fulfill_coro, failure_coro], return_when=FIRST_COMPLETED)
       -        # TODO what if HTLC gets stuck in multihop payment (A->B->C->D->E; on the way back C goes offline)
       -        payment_succeeded = False
       -        if failure_coro.done():
       -            fulfill_coro.cancel()
       -            sig_64, htlc_sigs = chan.sign_next_commitment()
       -            self.send_message(gen_msg("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs)))
       -            await self.receive_revoke(chan)
       -            chan.receive_fail_htlc(htlc_id)
       -            await self.receive_commitment(chan)
       -            self.revoke(chan)
       -            sig_64, htlc_sigs = chan.sign_next_commitment()
       -            res = failure_coro.result()
       -        else:
       -            failure_coro.cancel()
       -            update_fulfill_htlc_msg = fulfill_coro.result()
       -            preimage = update_fulfill_htlc_msg["payment_preimage"]
       -            chan.receive_htlc_settle(preimage, int.from_bytes(update_fulfill_htlc_msg["id"], "big"))
       -            await self.receive_commitment(chan)
       -            self.revoke(chan)
       -            # FIXME why is this not using the HTLC state machine?
       -            bare_ctx = chan.make_commitment(chan.remote_state.ctn + 1, False, chan.remote_state.next_per_commitment_point,
       -                msat_remote, msat_local)
       -            self.lnwatcher.process_new_offchain_ctx(chan, bare_ctx, ours=False)
       -            sig_64 = sign_and_get_sig_string(bare_ctx, chan.local_config, chan.remote_config)
       -            res = bh2u(preimage)
       -            payment_succeeded = True
       -
       -        self.send_message(gen_msg("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=0))
       -        await self.receive_revoke(chan)
       -        self.lnworker.save_channel(chan)
       -
       -        if payment_succeeded:
       -            return res
       -        else:
       -            raise PaymentFailure(res)
       -
            async def receive_revoke(self, m):
                revoke_and_ack_msg = await self.revoke_and_ack[m.channel_id].get()
                m.receive_revocation(RevokeAndAck(revoke_and_ack_msg["per_commitment_secret"], revoke_and_ack_msg["next_per_commitment_point"]))
       t@@ -1016,10 +990,27 @@ class Peer(PrintError):
                self.lnworker.save_channel(chan)
                self.commitment_signed[channel_id].put_nowait(payload)
        
       -    def on_update_fulfill_htlc(self, payload):
       +    @aiosafe
       +    async def on_update_fulfill_htlc(self, update_fulfill_htlc_msg):
                self.print_error("update_fulfill")
       -        channel_id = payload["channel_id"]
       -        self.update_fulfill_htlc[channel_id].put_nowait(payload)
       +        chan = self.channels[update_fulfill_htlc_msg["channel_id"]]
       +        preimage = update_fulfill_htlc_msg["payment_preimage"]
       +        htlc_id = int.from_bytes(update_fulfill_htlc_msg["id"], "big")
       +        htlc = chan.lookup_htlc(chan.log[LOCAL], htlc_id)
       +        chan.receive_htlc_settle(preimage, htlc_id)
       +        msat_local = chan.balance(LOCAL) - htlc.amount_msat
       +        msat_remote = chan.balance(REMOTE) + htlc.amount_msat
       +        await self.receive_commitment(chan)
       +        self.revoke(chan)
       +        # FIXME why is this not using the HTLC state machine?
       +        bare_ctx = chan.make_commitment(chan.remote_state.ctn + 1, False, chan.remote_state.next_per_commitment_point,
       +            msat_remote, msat_local)
       +        self.lnwatcher.process_new_offchain_ctx(chan, bare_ctx, ours=False)
       +        sig_64 = sign_and_get_sig_string(bare_ctx, chan.local_config, chan.remote_config)
       +
       +        self.send_message(gen_msg("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=0))
       +        await self.receive_revoke(chan)
       +        self.lnworker.save_channel(chan)
        
            def on_update_fail_malformed_htlc(self, payload):
                self.on_error(payload)
       t@@ -1084,11 +1075,8 @@ class Peer(PrintError):
                if chan_id not in self.closing_signed: raise Exception("Got unknown closing_signed")
                self.closing_signed[chan_id].put_nowait(payload)
        
       -    def on_shutdown(self, payload):
       -        coro = self.shutdown_coroutine(payload)
       -        asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
       -
       -    async def shutdown_coroutine(self, payload):
       +    @aiosafe
       +    async def on_shutdown(self, payload):
                # length of scripts allowed in BOLT-02
                if int.from_bytes(payload['len'], 'big') not in (3+20+2, 2+20+1, 2+20, 2+32):
                    raise Exception('scriptpubkey length in received shutdown message invalid: ' + str(payload['len']))
       t@@ -1103,7 +1091,7 @@ class Peer(PrintError):
                    except asyncio.TimeoutError:
                        pass
                    else:
       -                fee = closing_signed['fee_satoshis']
       +                fee = int.from_bytes(closing_signed['fee_satoshis'], 'big')
                        signature, _ = chan.make_closing_tx(scriptpubkey, payload['scriptpubkey'], fee_sat=fee)
                        self.send_message(gen_msg('closing_signed', channel_id=chan.channel_id, fee_satoshis=fee, signature=signature))
                self.print_error('REMOTE PEER CLOSED CHANNEL')
   DIR diff --git a/electrum/lnhtlc.py b/electrum/lnhtlc.py
       t@@ -159,6 +159,7 @@ class HTLCStateMachine(PrintError):
                self.funding_outpoint = Outpoint(**decodeAll(state["funding_outpoint"])) if type(state["funding_outpoint"]) is not Outpoint else state["funding_outpoint"]
                self.node_id = maybeDecode("node_id", state["node_id"]) if type(state["node_id"]) is not bytes else state["node_id"]
                self.short_channel_id = maybeDecode("short_channel_id", state["short_channel_id"]) if type(state["short_channel_id"]) is not bytes else state["short_channel_id"]
       +        self.onion_keys = {int(k): bfh(v) for k,v in state['onion_keys'].items()} if 'onion_keys' in state else {}
        
                # FIXME this is a tx serialised in the custom electrum partial tx format.
                # we should not persist txns in this format. we should persist htlcs, and be able to derive
       t@@ -689,6 +690,7 @@ class HTLCStateMachine(PrintError):
                        "remote_log": [(type(x).__name__, x) for x in self.log[REMOTE]],
                        "local_log": [(type(x).__name__, x) for x in self.log[LOCAL]],
                        "fee_updates": [x.to_save() for x in self.fee_mgr],
       +                "onion_keys": {str(k): bh2u(v) for k, v in self.onion_keys.items()},
                }
        
            def serialize(self):
   DIR diff --git a/electrum/tests/test_lnhtlc.py b/electrum/tests/test_lnhtlc.py
       t@@ -9,7 +9,7 @@ import electrum.util as util
        import os
        import binascii
        
       -from electrum.lnhtlc import SENT, LOCAL, REMOTE, RECEIVED
       +from electrum.lnutil import SENT, LOCAL, REMOTE, RECEIVED
        
        def create_channel_state(funding_txid, funding_index, funding_sat, local_feerate, is_initiator, local_amount, remote_amount, privkeys, other_pubkeys, seed, cur, nex, other_node_id, l_dust, r_dust, l_csv, r_csv):
            assert local_amount > 0
       t@@ -70,6 +70,7 @@ def create_channel_state(funding_txid, funding_index, funding_sat, local_feerate
                    "constraints":lnbase.ChannelConstraints(capacity=funding_sat, is_initiator=is_initiator, funding_txn_minimum_depth=3),
                    "node_id":other_node_id,
                    "remote_commitment_to_be_revoked": None,
       +            'onion_keys': {},
            }
        
        def bip32(sequence):