URI: 
       tlnwatcher: more detailed logging, support notifying test suite of txs - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit c8dcf0b4715981b39a991bd06c2f0c544fc3c397
   DIR parent 795ba1f99d36aaeab7dd83023530dff214189e10
  HTML Author: Janus <ysangkok@gmail.com>
       Date:   Thu,  1 Nov 2018 18:49:39 +0100
       
       lnwatcher: more detailed logging, support notifying test suite of txs
       
       Diffstat:
         M electrum/lnutil.py                  |       3 +++
         M electrum/lnwatcher.py               |      43 +++++++++++++++++++++++++------
       
       2 files changed, 38 insertions(+), 8 deletions(-)
       ---
   DIR diff --git a/electrum/lnutil.py b/electrum/lnutil.py
       t@@ -617,6 +617,9 @@ class EncumberedTransaction(NamedTuple("EncumberedTransaction", [('name', str),
                d2['tx'] = Transaction(d['tx'])
                return EncumberedTransaction(**d2)
        
       +    def __str__(self):
       +        return super().__str__()[:-1] + ", txid: " + self.tx.txid() + ")"
       +
        
        NUM_MAX_HOPS_IN_PAYMENT_PATH = 20
        NUM_MAX_EDGES_IN_PAYMENT_PATH = NUM_MAX_HOPS_IN_PAYMENT_PATH + 1
   DIR diff --git a/electrum/lnwatcher.py b/electrum/lnwatcher.py
       t@@ -8,6 +8,7 @@ import os
        from collections import defaultdict
        import asyncio
        from enum import IntEnum, auto
       +from typing import NamedTuple, Dict
        
        import jsonrpclib
        
       t@@ -20,6 +21,11 @@ from .address_synchronizer import AddressSynchronizer, TX_HEIGHT_LOCAL, TX_HEIGH
        if TYPE_CHECKING:
            from .network import Network
        
       +class ListenerItem(NamedTuple):
       +    # this is triggered when the lnwatcher is all done with the outpoint used as index in LNWatcher.tx_progress
       +    all_done : asyncio.Event
       +    # txs we broadcast are put on this queue so that the test can wait for them to get mined
       +    tx_queue : asyncio.Queue
        
        class TxMinedDepth(IntEnum):
            """ IntEnum because we call min() in get_deepest_tx_mined_depth_for_txids """
       t@@ -62,6 +68,9 @@ class LNWatcher(PrintError):
                watchtower_url = self.config.get('watchtower_url')
                self.watchtower = jsonrpclib.Server(watchtower_url) if watchtower_url else None
                self.watchtower_queue = asyncio.Queue()
       +        # this maps funding_outpoints to ListenerItems, which have an event for when the watcher is done,
       +        # and a queue for seeing which txs are being published
       +        self.tx_progress = {} # type: Dict[str, ListenerItem]
        
            def with_watchtower(func):
                def wrapper(self, *args, **kwargs):
       t@@ -151,8 +160,9 @@ class LNWatcher(PrintError):
                    self.stop_and_delete(funding_outpoint)
        
            def stop_and_delete(self, funding_outpoint):
       +        if funding_outpoint in self.tx_progress:
       +            self.tx_progress[funding_outpoint].all_done.set()
                # TODO delete channel from watcher_db
       -        pass
        
            async def inspect_tx_candidate(self, funding_outpoint, prev_tx):
                """Returns True iff found any not-deeply-spent outputs that we could
       t@@ -164,6 +174,7 @@ class LNWatcher(PrintError):
                        self.watch_address(o.address)
                        not_yet_watching = True
                if not_yet_watching:
       +            self.print_error('prev_tx', prev_tx, 'not yet watching')
                    return True
                # get all possible responses we have
                prev_txid = prev_tx.txid()
       t@@ -171,10 +182,12 @@ class LNWatcher(PrintError):
                    encumbered_sweep_txns = self.sweepstore[funding_outpoint][prev_txid]
                if len(encumbered_sweep_txns) == 0:
                    if self.get_tx_mined_depth(prev_txid) == TxMinedDepth.DEEP:
       +                self.print_error(e_tx.name, 'have no follow-up transactions and prevtx mined deep, returning')
                        return False
                # check if any response applies
                keep_watching_this = False
                local_height = self.network.get_local_height()
       +        self.print_error(funding_outpoint, 'iterating over encumbered txs')
                for e_tx in list(encumbered_sweep_txns):
                    conflicts = self.addr_sync.get_conflicting_transactions(e_tx.tx.txid(), e_tx.tx, include_self=True)
                    conflict_mined_depth = self.get_deepest_tx_mined_depth_for_txids(conflicts)
       t@@ -188,32 +201,46 @@ class LNWatcher(PrintError):
                        broadcast = True
                        if e_tx.cltv_expiry:
                            if local_height > e_tx.cltv_expiry:
       -                        self.print_error('CLTV ({} > {}) fulfilled'.format(local_height, e_tx.cltv_expiry))
       +                        self.print_error(e_tx.name, 'CLTV ({} > {}) fulfilled'.format(local_height, e_tx.cltv_expiry))
                            else:
       -                        self.print_error('waiting for {}: CLTV ({} > {}), funding outpoint {} and tx {}'
       +                        self.print_error(e_tx.name, 'waiting for {}: CLTV ({} > {}), funding outpoint {} and tx {}'
                                        .format(e_tx.name, local_height, e_tx.cltv_expiry, funding_outpoint[:8], prev_tx.txid()[:8]))
                                broadcast = False
                        if e_tx.csv_delay:
                            if num_conf < e_tx.csv_delay:
       -                        self.print_error('waiting for {}: CSV ({} >= {}), funding outpoint {} and tx {}'
       +                        self.print_error(e_tx.name, 'waiting for {}: CSV ({} >= {}), funding outpoint {} and tx {}'
                                        .format(e_tx.name, num_conf, e_tx.csv_delay, funding_outpoint[:8], prev_tx.txid()[:8]))
                                broadcast = False
                        if broadcast:
       -                    if not await self.broadcast_or_log(e_tx):
       -                        self.print_error(f'encumbered tx: {str(e_tx)}, prev_txid: {prev_txid}')
       +                    if not await self.broadcast_or_log(funding_outpoint, e_tx):
       +                        self.print_error(e_tx.name, f'could not publish encumbered tx: {str(e_tx)}, prev_txid: {prev_txid}, prev_tx height:', tx_height, 'local_height', local_height)
                    else:
       -                # not mined or in mempool
       +                self.print_error(e_tx.name, 'status', conflict_mined_depth, 'recursing...')
       +                # mined or in mempool
                        keep_watching_this |= await self.inspect_tx_candidate(funding_outpoint, e_tx.tx)
        
                return keep_watching_this
        
       -    async def broadcast_or_log(self, e_tx):
       +    async def broadcast_or_log(self, funding_outpoint, e_tx):
       +        height = self.addr_sync.get_tx_height(e_tx.tx.txid()).height
       +        if height != TX_HEIGHT_LOCAL:
       +            return
       +        try:
       +            await self.network.get_transaction(e_tx.tx.txid())
       +        except:
       +            pass
       +        else:
       +            self.print_error('already published, returning')
       +            return
                try:
                    txid = await self.network.broadcast_transaction(e_tx.tx)
                except Exception as e:
                    self.print_error(f'broadcast: {e_tx.name}: failure: {repr(e)}')
                else:
                    self.print_error(f'broadcast: {e_tx.name}: success. txid: {txid}')
       +            if funding_outpoint in self.tx_progress:
       +                await self.tx_progress[funding_outpoint].tx_queue.put(e_tx)
       +            return txid
        
            @with_watchtower
            def add_sweep_tx(self, funding_outpoint: str, prev_txid: str, sweeptx):