URI: 
       tln: restore channels correctly after restart - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit ae3971259de76dda850ffdf949f786a1677d8c24
   DIR parent aafbe74a28a48a50369ab81133b341941f1077d1
  HTML Author: Janus <ysangkok@gmail.com>
       Date:   Tue, 29 May 2018 18:12:48 +0200
       
       ln: restore channels correctly after restart
       
       * save funding_locked_received: if a node already sent us
       funding_locked, save it to avoid superfluous messages
       
       * use Queues instead of Futures: this ensure that we don't error if we
       receive two messages of the same type, and in avoids having to delete
       futures in finally blocks. A queue monitor could be added to detect
       queue elements that are not popped.
       
       * request initial routing sync: since we don't store the graph yet, it
       is better to request the graph from the Peer so that we can route
       
       * channel_state cleanup: now each channel should have a state, which is
       initialized to OPENING and only marked OPEN once we have verified that
       tthe funding_tx has been mined
       
       Diffstat:
         M lib/lnbase.py                       |     116 +++++++++----------------------
         M lib/lnworker.py                     |      68 +++++++++++++++++++------------
       
       2 files changed, 73 insertions(+), 111 deletions(-)
       ---
   DIR diff --git a/lib/lnbase.py b/lib/lnbase.py
       t@@ -273,7 +273,7 @@ ChannelConfig = namedtuple("ChannelConfig", [
            "to_self_delay", "dust_limit_sat", "max_htlc_value_in_flight_msat", "max_accepted_htlcs"])
        OnlyPubkeyKeypair = namedtuple("OnlyPubkeyKeypair", ["pubkey"])
        RemoteState = namedtuple("RemoteState", ["ctn", "next_per_commitment_point", "amount_msat", "revocation_store", "last_per_commitment_point", "next_htlc_id"])
       -LocalState = namedtuple("LocalState", ["ctn", "per_commitment_secret_seed", "amount_msat", "next_htlc_id"])
       +LocalState = namedtuple("LocalState", ["ctn", "per_commitment_secret_seed", "amount_msat", "next_htlc_id", "funding_locked_received"])
        ChannelConstraints = namedtuple("ChannelConstraints", ["feerate", "capacity", "is_initiator", "funding_txn_minimum_depth"])
        OpenChannel = namedtuple("OpenChannel", ["channel_id", "short_channel_id", "funding_outpoint", "local_config", "remote_config", "remote_state", "local_state", "constraints", "node_id"])
        
       t@@ -580,22 +580,12 @@ class Peer(PrintError):
                self.path_finder = path_finder
                self.read_buffer = b''
                self.ping_time = 0
       -        self.futures = ["channel_accepted",
       -            "funding_signed",
       -            "local_funding_locked",
       -            "remote_funding_locked",
       -            "revoke_and_ack",
       -            "channel_reestablish",
       -            "update_fulfill_htlc",
       -            "commitment_signed"]
       -        self.channel_accepted = defaultdict(asyncio.Future)
       -        self.funding_signed = defaultdict(asyncio.Future)
       -        self.local_funding_locked = defaultdict(asyncio.Future)
       -        self.remote_funding_locked = defaultdict(asyncio.Future)
       -        self.revoke_and_ack = defaultdict(asyncio.Future)
       -        self.update_fulfill_htlc = defaultdict(asyncio.Future)
       -        self.commitment_signed = defaultdict(asyncio.Future)
       -        self.initialized = asyncio.Future()
       +        self.channel_accepted = defaultdict(asyncio.Queue)
       +        self.funding_signed = defaultdict(asyncio.Queue)
       +        self.remote_funding_locked = defaultdict(asyncio.Queue)
       +        self.revoke_and_ack = defaultdict(asyncio.Queue)
       +        self.update_fulfill_htlc = defaultdict(asyncio.Queue)
       +        self.commitment_signed = defaultdict(asyncio.Queue)
                self.localfeatures = (0x08 if request_initial_sync else 0)
                self.unfulfilled_htlcs = []
                self.channel_state = channel_state
       t@@ -703,11 +693,7 @@ class Peer(PrintError):
                f(payload)
        
            def on_error(self, payload):
       -        for i in self.futures:
       -            if payload["channel_id"] in getattr(self, i):
       -                getattr(self, i)[payload["channel_id"]].set_exception(LightningError(payload["data"]))
       -                return
       -        self.print_error("no future found to resolve", payload)
       +        self.print_error("error", payload)
        
            def on_ping(self, payload):
                l = int.from_bytes(payload['num_pong_bytes'], 'big')
       t@@ -716,17 +702,17 @@ class Peer(PrintError):
            def on_accept_channel(self, payload):
                temp_chan_id = payload["temporary_channel_id"]
                if temp_chan_id not in self.channel_accepted: raise Exception("Got unknown accept_channel")
       -        self.channel_accepted[temp_chan_id].set_result(payload)
       +        self.channel_accepted[temp_chan_id].put_nowait(payload)
        
            def on_funding_signed(self, payload):
                channel_id = int.from_bytes(payload['channel_id'], 'big')
                if channel_id not in self.funding_signed: raise Exception("Got unknown funding_signed")
       -        self.funding_signed[channel_id].set_result(payload)
       +        self.funding_signed[channel_id].put_nowait(payload)
        
            def on_funding_locked(self, payload):
                channel_id = int.from_bytes(payload['channel_id'], 'big')
                if channel_id not in self.funding_signed: print("Got unknown funding_locked", payload)
       -        self.remote_funding_locked[channel_id].set_result(payload)
       +        self.remote_funding_locked[channel_id].put_nowait(payload)
        
            def on_node_announcement(self, payload):
                pubkey = payload['node_id']
       t@@ -785,8 +771,6 @@ class Peer(PrintError):
                # read init
                msg = await self.read_message()
                self.process_message(msg)
       -        # initialized
       -        self.initialized.set_result(msg)
                # reestablish channels
                [await self.reestablish_channel(c) for c in self.channels]
                # loop
       t@@ -799,7 +783,6 @@ class Peer(PrintError):
                self.writer.close()
        
            async def channel_establishment_flow(self, wallet, config, password, funding_sat, push_msat, temp_channel_id):
       -        await self.initialized
                # see lnd/keychain/derivation.go
                keyfamilymultisig = 0
                keyfamilyrevocationbase = 1
       t@@ -849,10 +832,7 @@ class Peer(PrintError):
                    channel_reserve_satoshis=10
                )
                self.send_message(msg)
       -        try:
       -            payload = await self.channel_accepted[temp_channel_id]
       -        finally:
       -            del self.channel_accepted[temp_channel_id]
       +        payload = await self.channel_accepted[temp_channel_id].get()
                remote_per_commitment_point = payload['first_per_commitment_point']
                remote_config=ChannelConfig(
                    payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
       t@@ -903,16 +883,12 @@ class Peer(PrintError):
                sig_64 = sign_and_get_sig_string(remote_ctx, local_config, remote_config)
                funding_txid_bytes = bytes.fromhex(funding_txid)[::-1]
                channel_id = int.from_bytes(funding_txid_bytes, 'big') ^ funding_index
       -        self.channel_state[channel_id] = "OPENING"
                self.send_message(gen_msg("funding_created",
                    temporary_channel_id=temp_channel_id,
                    funding_txid=funding_txid_bytes,
                    funding_output_index=funding_index,
                    signature=sig_64))
       -        try:
       -            payload = await self.funding_signed[channel_id]
       -        finally:
       -            del self.funding_signed[channel_id]
       +        payload = await self.funding_signed[channel_id].get()
                self.print_error('received funding_signed')
                remote_sig = payload['signature']
                # verify remote signature
       t@@ -949,14 +925,14 @@ class Peer(PrintError):
                            ctn = 0,
                            per_commitment_secret_seed=per_commitment_secret_seed,
                            amount_msat=local_amount,
       -                    next_htlc_id = 0
       +                    next_htlc_id = 0,
       +                    funding_locked_received = False
                        ),
                        constraints=ChannelConstraints(capacity=funding_sat, feerate=local_feerate, is_initiator=True, funding_txn_minimum_depth=funding_txn_minimum_depth)
                )
                return chan
        
            async def reestablish_channel(self, chan):
       -        assert chan.channel_id not in self.channel_state
                self.send_message(gen_msg("channel_reestablish",
                    channel_id=chan.channel_id,
                    next_local_commitment_number=chan.local_state.ctn+1,
       t@@ -980,27 +956,21 @@ class Peer(PrintError):
                    raise Exception("expected local ctn {}, got {}".format(chan.local_state.ctn, local_ctn))
                if channel_reestablish_msg["my_current_per_commitment_point"] != chan.remote_state.last_per_commitment_point:
                    raise Exception("Remote PCP mismatch")
       -        self.channel_state[chan.channel_id] = "OPEN"
        
            async def funding_locked(self, chan):
                channel_id = chan.channel_id
       -        try:
       -            short_channel_id = await self.local_funding_locked[channel_id]
       -        finally:
       -            del self.local_funding_locked[channel_id]
       +        short_channel_id = chan.short_channel_id
        
                per_commitment_secret_index = 2**48 - 2
                per_commitment_point_second = secret_to_pubkey(int.from_bytes(
                    get_per_commitment_secret_from_seed(chan.local_state.per_commitment_secret_seed, per_commitment_secret_index), 'big'))
                self.send_message(gen_msg("funding_locked", channel_id=channel_id, next_per_commitment_point=per_commitment_point_second))
                # wait until we receive funding_locked
       -        try:
       -            remote_funding_locked_msg = await self.remote_funding_locked[channel_id]
       -        finally:
       -            del self.remote_funding_locked[channel_id]
       +        remote_funding_locked_msg = await self.remote_funding_locked[channel_id].get()
                self.print_error('Done waiting for remote_funding_locked', remote_funding_locked_msg)
       -        self.channel_state[chan.channel_id] = "OPEN"
       -        return chan._replace(short_channel_id=short_channel_id, remote_state=chan.remote_state._replace(next_per_commitment_point=remote_funding_locked_msg["next_per_commitment_point"]))
       +        new_remote_state = chan.remote_state._replace(next_per_commitment_point=remote_funding_locked_msg["next_per_commitment_point"])
       +        new_local_state = chan.local_state._replace(funding_locked_received = True)
       +        return chan._replace(short_channel_id=short_channel_id, remote_state=new_remote_state, local_state=new_local_state)
        
            def on_update_fail_htlc(self, payload):
                print("UPDATE_FAIL_HTLC", decode_onion_error(payload["reason"], self.node_keys, self.secret_key))
       t@@ -1097,10 +1067,7 @@ class Peer(PrintError):
        
                self.send_message(gen_msg("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=1, htlc_signature=htlc_sig))
        
       -        try:
       -            revoke_and_ack_msg = await self.revoke_and_ack[chan.channel_id]
       -        finally:
       -            del self.revoke_and_ack[chan.channel_id]
       +        revoke_and_ack_msg = await self.revoke_and_ack[chan.channel_id].get()
                # TODO check revoke_and_ack results
        
                last_secret, _, next_point = derive_and_incr()
       t@@ -1112,16 +1079,10 @@ class Peer(PrintError):
        
                print("waiting for update_fulfill")
        
       -        try:
       -            update_fulfill_htlc_msg = await self.update_fulfill_htlc[chan.channel_id]
       -        finally:
       -            del self.update_fulfill_htlc[chan.channel_id]
       +        update_fulfill_htlc_msg = await self.update_fulfill_htlc[chan.channel_id].get()
        
                print("waiting for commitment_signed")
       -        try:
       -            commitment_signed_msg = await self.commitment_signed[chan.channel_id]
       -        finally:
       -            del self.commitment_signed[chan.channel_id]
       +        commitment_signed_msg = await self.commitment_signed[chan.channel_id].get()
        
                last_secret, _, next_point = derive_and_incr()
                their_revstore.add_next_entry(last_secret)
       t@@ -1139,10 +1100,7 @@ class Peer(PrintError):
        
                self.send_message(gen_msg("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=0))
        
       -        try:
       -            revoke_and_ack_msg = await self.revoke_and_ack[chan.channel_id]
       -        finally:
       -            del self.revoke_and_ack[chan.channel_id]
       +        revoke_and_ack_msg = await self.revoke_and_ack[chan.channel_id].get()
                # TODO check revoke_and_ack results
        
                return chan._replace(
       t@@ -1182,10 +1140,7 @@ class Peer(PrintError):
                their_revstore = chan.remote_state.revocation_store
        
                channel_id = chan.channel_id
       -        try:
       -            commitment_signed_msg = await self.commitment_signed[channel_id]
       -        finally:
       -            del self.commitment_signed[channel_id]
       +        commitment_signed_msg = await self.commitment_signed[channel_id].get()
        
                if int.from_bytes(commitment_signed_msg["num_htlcs"], "big") < 1:
                    while len(self.unfulfilled_htlcs) < 1:
       t@@ -1267,10 +1222,7 @@ class Peer(PrintError):
        
                self.send_message(gen_msg("commitment_signed", channel_id=channel_id, signature=sig_64, num_htlcs=1, htlc_signature=htlc_sig))
        
       -        try:
       -            revoke_and_ack_msg = await self.revoke_and_ack[channel_id]
       -        finally:
       -            del self.revoke_and_ack[channel_id]
       +        revoke_and_ack_msg = await self.revoke_and_ack[channel_id].get()
        
                # TODO check revoke_and_ack_msg contents
        
       t@@ -1285,17 +1237,11 @@ class Peer(PrintError):
                sig_64 = sign_and_get_sig_string(bare_ctx, chan.local_config, chan.remote_config)
        
                self.send_message(gen_msg("commitment_signed", channel_id=channel_id, signature=sig_64, num_htlcs=0))
       -        try:
       -            revoke_and_ack_msg = await self.revoke_and_ack[channel_id]
       -        finally:
       -            del self.revoke_and_ack[channel_id]
       +        revoke_and_ack_msg = await self.revoke_and_ack[channel_id].get()
        
                # TODO check revoke_and_ack results
        
       -        try:
       -            commitment_signed_msg = await self.commitment_signed[channel_id]
       -        finally:
       -            del self.commitment_signed[channel_id]
       +        commitment_signed_msg = await self.commitment_signed[channel_id].get()
        
                # TODO check commitment_signed results
        
       t@@ -1325,11 +1271,11 @@ class Peer(PrintError):
            def on_commitment_signed(self, payload):
                self.print_error("commitment_signed", payload)
                channel_id = int.from_bytes(payload['channel_id'], 'big')
       -        self.commitment_signed[channel_id].set_result(payload)
       +        self.commitment_signed[channel_id].put_nowait(payload)
        
            def on_update_fulfill_htlc(self, payload):
                channel_id = int.from_bytes(payload["channel_id"], 'big')
       -        self.update_fulfill_htlc[channel_id].set_result(payload)
       +        self.update_fulfill_htlc[channel_id].put_nowait(payload)
        
            def on_update_fail_malformed_htlc(self, payload):
                self.on_error(payload)
       t@@ -1343,7 +1289,7 @@ class Peer(PrintError):
        
            def on_revoke_and_ack(self, payload):
                channel_id = int.from_bytes(payload["channel_id"], 'big')
       -        self.revoke_and_ack[channel_id].set_result(payload)
       +        self.revoke_and_ack[channel_id].put_nowait(payload)
        
        
        def count_trailing_zeros(index):
   DIR diff --git a/lib/lnworker.py b/lib/lnworker.py
       t@@ -12,7 +12,7 @@ import asyncio
        
        from . import constants
        from .bitcoin import sha256, COIN
       -from .util import bh2u, bfh
       +from .util import bh2u, bfh, PrintError
        from .constants import set_testnet, set_simnet
        from .simple_config import SimpleConfig
        from .network import Network
       t@@ -86,7 +86,7 @@ node_list = [
        
        
        
       -class LNWorker:
       +class LNWorker(PrintError):
        
            def __init__(self, wallet, network):
                self.wallet = wallet
       t@@ -100,7 +100,7 @@ class LNWorker:
                self.path_finder = lnrouter.LNPathFinder(self.channel_db)
                self.channels = [reconstruct_namedtuples(x) for x in wallet.storage.get("channels", {})]
                peer_list = network.config.get('lightning_peers', node_list)
       -        self.channel_state = {}
       +        self.channel_state = {chan.channel_id: "OPENING" for chan in self.channels}
                for host, port, pubkey in peer_list:
                    self.add_peer(host, int(port), pubkey)
                # wait until we see confirmations
       t@@ -110,49 +110,65 @@ class LNWorker:
            def add_peer(self, host, port, pubkey):
                node_id = bfh(pubkey)
                channels = list(filter(lambda x: x.node_id == node_id, self.channels))
       -        peer = Peer(host, int(port), node_id, self.privkey, self.network, self.channel_db, self.path_finder, self.channel_state, channels)
       +        peer = Peer(host, int(port), node_id, self.privkey, self.network, self.channel_db, self.path_finder, self.channel_state, channels, request_initial_sync=True)
                self.network.futures.append(asyncio.run_coroutine_threadsafe(peer.main_loop(), asyncio.get_event_loop()))
                self.peers[node_id] = peer
        
            def save_channel(self, openchannel):
       +        if openchannel.channel_id not in self.channel_state:
       +            self.channel_state[openchannel.channel_id] = "OPENING"
                self.channels = [openchannel] # TODO multiple channels
                dumped = serialize_channels(self.channels)
                self.wallet.storage.put("channels", dumped)
                self.wallet.storage.write()
        
       +    def save_short_chan_id(self, chan):
       +        """
       +        Checks if the Funding TX has been mined. If it has save the short channel ID to disk and return the new OpenChannel.
       +
       +        If the Funding TX has not been mined, return None
       +        """
       +        assert self.channel_state[chan.channel_id] == "OPENING"
       +        peer = self.peers[chan.node_id]
       +        conf = self.wallet.get_tx_height(chan.funding_outpoint.txid)[1]
       +        if conf >= chan.constraints.funding_txn_minimum_depth:
       +            block_height, tx_pos = self.wallet.get_txpos(chan.funding_outpoint.txid)
       +            if tx_pos == -1:
       +                self.print_error('funding tx is not yet SPV verified.. but there are '
       +                                 'already enough confirmations (currently {})'.format(conf))
       +                return None
       +            chan = chan._replace(short_channel_id = calc_short_channel_id(block_height, tx_pos, chan.funding_outpoint.output_index))
       +            self.save_channel(chan)
       +            return chan
       +        return None
       +
            def on_network_update(self, event, *args):
                for chan in self.channels:
       +            if self.channel_state[chan.channel_id] == "OPEN":
       +                continue
       +            chan = self.save_short_chan_id(chan)
       +            if not chan:
       +                self.print_error("network update but funding tx is still not at sufficient depth")
       +                continue
                    peer = self.peers[chan.node_id]
       -            conf = self.wallet.get_tx_height(chan.funding_outpoint.txid)[1]
       -            if conf >= chan.constraints.funding_txn_minimum_depth:
       -                block_height, tx_pos = self.wallet.get_txpos(chan.funding_outpoint.txid)
       -                if tx_pos == -1:
       -                    self.print_error('funding tx is not yet SPV verified.. but there are '
       -                                     'already enough confirmations (currently {})'.format(conf))
       -                    return
       -                if chan.channel_id not in self.channel_state or self.channel_state[chan.channel_id] != "OPENING":
       -                    return
       -                asyncio.run_coroutine_threadsafe(self.set_local_funding_locked_result(peer, chan, block_height, tx_pos), asyncio.get_event_loop())
       +            asyncio.run_coroutine_threadsafe(self.wait_funding_locked_and_mark_open(peer, chan), asyncio.get_event_loop())
        
            # aiosafe because we don't wait for result
            @aiosafe
       -    async def set_local_funding_locked_result(self, peer, chan, block_height, tx_pos):
       -        channel_id = chan.channel_id
       -        short_channel_id = calc_short_channel_id(block_height, tx_pos, chan.funding_outpoint.output_index)
       -        try:
       -            peer.local_funding_locked[channel_id].set_result(short_channel_id)
       -        except (asyncio.InvalidStateError, KeyError) as e:
       -            # FIXME race condition if updates come in quickly, set_result might be called multiple times
       -            # or self.local_funding_locked[channel_id] might be deleted already
       -            self.print_error('local_funding_locked.set_result error for channel {}: {}'.format(channel_id, e))
       -        openchannel = await peer.funding_locked(chan)
       -        self.save_channel(openchannel)
       -        print("CHANNEL OPENING COMPLETED")
       +    async def wait_funding_locked_and_mark_open(self, peer, chan):
       +        if self.channel_state[chan.channel_id] == "OPEN":
       +            return
       +        if not chan.local_state.funding_locked_received:
       +            chan = await peer.funding_locked(chan)
       +            self.save_channel(chan)
       +            self.print_error("CHANNEL OPENING COMPLETED")
       +        self.channel_state[chan.channel_id] = "OPEN"
        
            # not aiosafe because we call .result() which will propagate an exception
            async def _open_channel_coroutine(self, node_id, amount, push_msat, password):
                peer = self.peers[bfh(node_id)]
                openingchannel = await peer.channel_establishment_flow(self.wallet, self.config, password, amount, push_msat, temp_channel_id=os.urandom(32))
       +        self.print_error("SAVING OPENING CHANNEL")
                self.save_channel(openingchannel)
        
            def open_channel(self, node_id, local_amt, push_amt, pw):