URI: 
       tLNWatcher refactoring: - do not store non-breach transactions - send 'channel_open' and 'channel_closed' events - force-closed channels are handled by LNWorker - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 729ddb8ec30494ee9c22284625d2c0f3196ac987
   DIR parent 1b7a3c25d1fe8ecd880ced6b98b38420c6b542b3
  HTML Author: ThomasV <thomasv@electrum.org>
       Date:   Tue,  4 Dec 2018 20:50:24 +0100
       
       LNWatcher refactoring:
        - do not store non-breach transactions
        - send 'channel_open' and 'channel_closed' events
        - force-closed channels are handled by LNWorker
       
       Diffstat:
         M electrum/lnchan.py                  |      22 ++++------------------
         M electrum/lnwatcher.py               |     141 ++++++++++++-------------------
         M electrum/lnworker.py                |      76 +++++++++++++++++++++++++++----
       
       3 files changed, 124 insertions(+), 115 deletions(-)
       ---
   DIR diff --git a/electrum/lnchan.py b/electrum/lnchan.py
       t@@ -43,8 +43,7 @@ from .lnutil import HTLC_TIMEOUT_WEIGHT, HTLC_SUCCESS_WEIGHT
        from .lnutil import funding_output_script, LOCAL, REMOTE, HTLCOwner, make_closing_tx, make_commitment_outputs
        from .lnutil import ScriptHtlc, PaymentFailure, calc_onchain_fees, RemoteMisbehaving, make_htlc_output_witness_script
        from .transaction import Transaction
       -from .lnsweep import (create_sweeptxs_for_our_latest_ctx, create_sweeptxs_for_their_latest_ctx,
       -                      create_sweeptxs_for_their_just_revoked_ctx)
       +from .lnsweep import create_sweeptxs_for_their_just_revoked_ctx
        
        
        class ChannelJsonEncoder(json.JSONEncoder):
       t@@ -204,10 +203,10 @@ class Channel(PrintError):
                for sub in (LOCAL, REMOTE):
                    self.log[sub].locked_in.update(self.log[sub].adds.keys())
        
       +        # used in lnworker.on_channel_closed
                self.local_commitment = self.current_commitment(LOCAL)
                self.remote_commitment = self.current_commitment(REMOTE)
        
       -
            def set_state(self, state: str):
                if self._state == 'FORCE_CLOSING':
                    assert state == 'FORCE_CLOSING', 'new state was not FORCE_CLOSING: ' + state
       t@@ -325,7 +324,7 @@ class Channel(PrintError):
                htlcsigs.sort()
                htlcsigs = [x[1] for x in htlcsigs]
        
       -        self.process_new_offchain_ctx(pending_remote_commitment, ours=False)
       +        self.remote_commitment = self.pending_commitment(REMOTE)
        
                # we can't know if this message arrives.
                # since we shouldn't actually throw away
       t@@ -390,7 +389,7 @@ class Channel(PrintError):
                    if self.constraints.is_initiator and self.pending_fee[FUNDEE_ACKED]:
                        self.pending_fee[FUNDER_SIGNED] = True
        
       -        self.process_new_offchain_ctx(pending_local_commitment, ours=True)
       +        self.local_commitment = self.pending_commitment(LOCAL)
        
            def verify_htlc(self, htlc: UpdateAddHtlc, htlc_sigs: Sequence[bytes], we_receive: bool) -> int:
                _, this_point, _ = self.points()
       t@@ -454,19 +453,6 @@ class Channel(PrintError):
                next_point = secret_to_pubkey(int.from_bytes(next_secret, 'big'))
                return last_secret, this_point, next_point
        
       -    # TODO don't presign txns for non-breach close
       -    def process_new_offchain_ctx(self, ctx: 'Transaction', ours: bool):
       -        if not self.lnwatcher:
       -            return
       -        outpoint = self.funding_outpoint.to_str()
       -        if ours:
       -            encumbered_sweeptxs = create_sweeptxs_for_our_latest_ctx(self, ctx, self.sweep_address)
       -        else:
       -            encumbered_sweeptxs = create_sweeptxs_for_their_latest_ctx(self, ctx, self.sweep_address)
       -        for prev_txid, encumbered_tx in encumbered_sweeptxs:
       -            if encumbered_tx is not None:
       -                self.lnwatcher.add_sweep_tx(outpoint, prev_txid, encumbered_tx.to_json())
       -
            def process_new_revocation_secret(self, per_commitment_secret: bytes):
                if not self.lnwatcher:
                    return
   DIR diff --git a/electrum/lnwatcher.py b/electrum/lnwatcher.py
       t@@ -112,6 +112,15 @@ class LNWatcher(AddressSynchronizer):
                        self.channel_info[address] = outpoint
                    self.write_to_disk()
        
       +    def unwatch_channel(self, address, funding_outpoint):
       +        self.print_error('unwatching', funding_outpoint)
       +        with self.lock:
       +            self.channel_info.pop(address)
       +            self.sweepstore.pop(funding_outpoint)
       +            self.write_to_disk()
       +        if funding_outpoint in self.tx_progress:
       +            self.tx_progress[funding_outpoint].all_done.set()
       +
            @log_exceptions
            async def on_network_update(self, event, *args):
                if event in ('verified', 'wallet_updated'):
       t@@ -125,90 +134,54 @@ class LNWatcher(AddressSynchronizer):
                with self.lock:
                    channel_info_items = list(self.channel_info.items())
                for address, outpoint in channel_info_items:
       -            await self.check_onchain_situation(outpoint)
       +            await self.check_onchain_situation(address, outpoint)
        
       -    async def check_onchain_situation(self, funding_outpoint):
       -        txid, index = funding_outpoint.split(':')
       -        ctx_candidate_txid = self.spent_outpoints[txid].get(int(index))
       -        is_spent = ctx_candidate_txid is not None
       -        self.network.trigger_callback('channel_txo', funding_outpoint, is_spent)
       -        if not is_spent:
       -            return
       -        ctx_candidate = self.transactions.get(ctx_candidate_txid)
       -        if ctx_candidate is None:
       -            return
       -        #self.print_error("funding outpoint {} is spent by {}"
       -        #                 .format(funding_outpoint, ctx_candidate_txid))
       -        conf = self.get_tx_height(ctx_candidate_txid).conf
       -        # only care about confirmed and verified ctxs. TODO is this necessary?
       -        if conf == 0:
       -            return
       -        keep_watching_this = await self.inspect_tx_candidate(funding_outpoint, ctx_candidate)
       -        if not keep_watching_this:
       -            self.stop_and_delete(funding_outpoint)
       -
       -    def stop_and_delete(self, funding_outpoint):
       -        if funding_outpoint in self.tx_progress:
       -            self.tx_progress[funding_outpoint].all_done.set()
       -        # TODO delete channel from watcher_db
       -
       -    async def inspect_tx_candidate(self, funding_outpoint, prev_tx):
       -        """Returns True iff found any not-deeply-spent outputs that we could
       -        potentially sweep at some point."""
       -        # make sure we are subscribed to all outputs of tx
       -        not_yet_watching = False
       -        for o in prev_tx.outputs():
       +    async def check_onchain_situation(self, address, funding_outpoint):
       +        keep_watching, spenders = self.inspect_tx_candidate(funding_outpoint, 0)
       +        txid = spenders.get(funding_outpoint)
       +        if txid is None:
       +            self.network.trigger_callback('channel_open', funding_outpoint)
       +        else:
       +            self.network.trigger_callback('channel_closed', funding_outpoint, txid, spenders)
       +            await self.do_breach_remedy(funding_outpoint, spenders)
       +        if not keep_watching:
       +            self.unwatch_channel(address, funding_outpoint)
       +        else:
       +            self.print_error('we will keep_watching', funding_outpoint)
       +
       +    def inspect_tx_candidate(self, outpoint, n):
       +        # FIXME: instead of stopping recursion at n == 2,
       +        # we should detect which outputs are HTLCs
       +        prev_txid, index = outpoint.split(':')
       +        txid = self.spent_outpoints[prev_txid].get(int(index))
       +        result = {outpoint:txid}
       +        if txid is None:
       +            self.print_error('keep watching because outpoint is unspent')
       +            return True, result
       +        keep_watching = (self.get_tx_mined_depth(txid) != TxMinedDepth.DEEP)
       +        if keep_watching:
       +            self.print_error('keep watching because spending tx is not deep')
       +        tx = self.transactions[txid]
       +        for i, o in enumerate(tx.outputs()):
                    if o.address not in self.get_addresses():
                        self.add_address(o.address)
       -                not_yet_watching = True
       -        if not_yet_watching:
       -            self.print_error('prev_tx', prev_tx, 'not yet watching')
       -            return True
       -        # get all possible responses we have
       -        prev_txid = prev_tx.txid()
       -        with self.lock:
       -            encumbered_sweep_txns = self.sweepstore[funding_outpoint][prev_txid]
       -        if len(encumbered_sweep_txns) == 0:
       -            if self.get_tx_mined_depth(prev_txid) == TxMinedDepth.DEEP:
       -                self.print_error('have no follow-up transactions and prevtx', prev_txid, 'mined deep, returning')
       -                return False
       -            return True
       -        # check if any response applies
       -        keep_watching_this = False
       -        local_height = self.network.get_local_height()
       -        self.print_error(funding_outpoint, 'iterating over encumbered txs')
       -        for e_tx in list(encumbered_sweep_txns):
       -            conflicts = self.get_conflicting_transactions(e_tx.tx.txid(), e_tx.tx, include_self=True)
       -            conflict_mined_depth = self.get_deepest_tx_mined_depth_for_txids(conflicts)
       -            if conflict_mined_depth != TxMinedDepth.DEEP:
       -                keep_watching_this = True
       -            if conflict_mined_depth == TxMinedDepth.FREE:
       -                tx_height = self.get_tx_height(prev_txid).height
       -                if tx_height == TX_HEIGHT_LOCAL:
       -                    continue
       -                num_conf = local_height - tx_height + 1
       -                broadcast = True
       -                if e_tx.cltv_expiry:
       -                    if local_height > e_tx.cltv_expiry:
       -                        self.print_error(e_tx.name, 'CLTV ({} > {}) fulfilled'.format(local_height, e_tx.cltv_expiry))
       -                    else:
       -                        self.print_error(e_tx.name, 'waiting for {}: CLTV ({} > {}), funding outpoint {} and tx {}'
       -                                .format(e_tx.name, local_height, e_tx.cltv_expiry, funding_outpoint[:8], prev_tx.txid()[:8]))
       -                        broadcast = False
       -                if e_tx.csv_delay:
       -                    if num_conf < e_tx.csv_delay:
       -                        self.print_error(e_tx.name, 'waiting for {}: CSV ({} >= {}), funding outpoint {} and tx {}'
       -                                .format(e_tx.name, num_conf, e_tx.csv_delay, funding_outpoint[:8], prev_tx.txid()[:8]))
       -                        broadcast = False
       -                if broadcast:
       -                    if not await self.broadcast_or_log(funding_outpoint, e_tx):
       -                        self.print_error(e_tx.name, f'could not publish encumbered tx: {str(e_tx)}, prev_txid: {prev_txid}, prev_tx height:', tx_height, 'local_height', local_height)
       -            else:
       -                self.print_error(e_tx.name, 'status', conflict_mined_depth, 'recursing...')
       -                # mined or in mempool
       -                keep_watching_this |= await self.inspect_tx_candidate(funding_outpoint, e_tx.tx)
       -
       -        return keep_watching_this
       +                keep_watching = True
       +            elif n < 2:
       +                k, r = self.inspect_tx_candidate(txid+':%d'%i, n+1)
       +                keep_watching |= k
       +                result.update(r)
       +        return keep_watching, result
       +
       +    async def do_breach_remedy(self, funding_outpoint, spenders):
       +        for prevout, spender in spenders.items():
       +            if spender is not None:
       +                continue
       +            prev_txid, prev_n = prevout.split(':')
       +            with self.lock:
       +                encumbered_sweep_txns = self.sweepstore[funding_outpoint][prev_txid]
       +            for prev_txid, e_tx in encumbered_sweep_txns:
       +                if not await self.broadcast_or_log(funding_outpoint, e_tx):
       +                    self.print_error(e_tx.name, f'could not publish encumbered tx: {str(e_tx)}, prev_txid: {prev_txid}')
        
            async def broadcast_or_log(self, funding_outpoint, e_tx):
                height = self.get_tx_height(e_tx.tx.txid()).height
       t@@ -249,9 +222,3 @@ class LNWatcher(AddressSynchronizer):
                    return TxMinedDepth.MEMPOOL
                else:
                    raise NotImplementedError()
       -
       -    def get_deepest_tx_mined_depth_for_txids(self, set_of_txids: Iterable[str]):
       -        if not set_of_txids:
       -            return TxMinedDepth.FREE
       -        # note: using "min" as lower status values are deeper
       -        return min(map(self.get_tx_mined_depth, set_of_txids))
   DIR diff --git a/electrum/lnworker.py b/electrum/lnworker.py
       t@@ -38,6 +38,7 @@ from .lnutil import (Outpoint, calc_short_channel_id, LNPeerAddr,
        from .i18n import _
        from .lnrouter import RouteEdge, is_route_sane_to_use
        from .address_synchronizer import TX_HEIGHT_LOCAL
       +from .lnsweep import create_sweeptxs_for_our_latest_ctx, create_sweeptxs_for_their_latest_ctx
        
        if TYPE_CHECKING:
            from .network import Network
       t@@ -88,7 +89,8 @@ class LNWorker(PrintError):
                self._add_peers_from_config()
                # wait until we see confirmations
                self.network.register_callback(self.on_network_update, ['wallet_updated', 'network_updated', 'verified', 'fee'])  # thread safe
       -        self.network.register_callback(self.on_channel_txo, ['channel_txo'])
       +        self.network.register_callback(self.on_channel_open, ['channel_open'])
       +        self.network.register_callback(self.on_channel_closed, ['channel_closed'])
                asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.main_loop()), self.network.asyncio_loop)
                self.first_timestamp_requested = None
        
       t@@ -282,23 +284,77 @@ class LNWorker(PrintError):
                    return True, conf
                return False, conf
        
       -    def on_channel_txo(self, event, txo, is_spent: bool):
       +    def channel_by_txo(self, txo):
                with self.lock:
                    channels = list(self.channels.values())
                for chan in channels:
                    if chan.funding_outpoint.to_str() == txo:
       -                break
       -        else:
       +                return chan
       +
       +    def on_channel_open(self, event, funding_outpoint):
       +        chan = self.channel_by_txo(funding_outpoint)
       +        if not chan:
                    return
       -        chan.set_funding_txo_spentness(is_spent)
       -        if is_spent:
       -            if chan.get_state() != 'FORCE_CLOSING':
       -                chan.set_state("CLOSED")
       -                self.on_channels_updated()
       -            self.channel_db.remove_channel(chan.short_channel_id)
       +        self.print_error('on_channel_open', funding_outpoint)
       +        chan.set_funding_txo_spentness(False)
       +        # send event to GUI
                self.network.trigger_callback('channel', chan)
        
            @log_exceptions
       +    async def on_channel_closed(self, event, funding_outpoint, txid, spenders):
       +        chan = self.channel_by_txo(funding_outpoint)
       +        if not chan:
       +            return
       +        self.print_error('on_channel_closed', funding_outpoint)
       +        chan.set_funding_txo_spentness(True)
       +        if chan.get_state() != 'FORCE_CLOSING':
       +            chan.set_state("CLOSED")
       +            self.on_channels_updated()
       +        self.network.trigger_callback('channel', chan)
       +        # remove from channel_db
       +        self.channel_db.remove_channel(chan.short_channel_id)
       +        # sweep
       +        our_ctx = chan.local_commitment
       +        their_ctx = chan.remote_commitment
       +        if txid == our_ctx.txid():
       +            self.print_error('we force closed', funding_outpoint)
       +            # we force closed
       +            encumbered_sweeptxs = create_sweeptxs_for_our_latest_ctx(chan, our_ctx, chan.sweep_address)
       +        elif txid == their_ctx.txid():
       +            self.print_error('they force closed', funding_outpoint)
       +            # they force closed
       +            encumbered_sweeptxs = create_sweeptxs_for_their_latest_ctx(chan, their_ctx, chan.sweep_address)
       +        else:
       +            # cooperative close or breach
       +            self.print_error('not sure who closed', funding_outpoint)
       +            encumbered_sweeptxs = []
       +
       +        local_height = self.network.get_local_height()
       +        for prev_txid, e_tx in encumbered_sweeptxs:
       +            spender = spenders.get(prev_txid + ':0') # we assume output index is 0
       +            if spender is not None:
       +                self.print_error('prev_tx already spent', prev_txid)
       +                continue
       +            num_conf = self.network.lnwatcher.get_tx_height(prev_txid).conf
       +            broadcast = True
       +            if e_tx.cltv_expiry:
       +                if local_height > e_tx.cltv_expiry:
       +                    self.print_error(e_tx.name, 'CLTV ({} > {}) fulfilled'.format(local_height, e_tx.cltv_expiry))
       +                else:
       +                    self.print_error(e_tx.name, 'waiting for {}: CLTV ({} > {}), funding outpoint {} and tx {}'
       +                                     .format(e_tx.name, local_height, e_tx.cltv_expiry, funding_outpoint[:8], prev_txid[:8]))
       +                    broadcast = False
       +            if e_tx.csv_delay:
       +                if num_conf < e_tx.csv_delay:
       +                    self.print_error(e_tx.name, 'waiting for {}: CSV ({} >= {}), funding outpoint {} and tx {}'
       +                                     .format(e_tx.name, num_conf, e_tx.csv_delay, funding_outpoint[:8], prev_txid[:8]))
       +                    broadcast = False
       +            if broadcast:
       +                if not await self.network.lnwatcher.broadcast_or_log(funding_outpoint, e_tx):
       +                    self.print_error(e_tx.name, f'could not publish encumbered tx: {str(e_tx)}, prev_txid: {prev_txid}, local_height', local_height)
       +
       +
       +    @log_exceptions
            async def on_network_update(self, event, *args):
                # TODO
                # Race discovered in save_channel (assertion failing):