URI: 
       tSynchronize watchtower asynchronously: - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit f7c05f2602d3b21c132a3a2c0be49947b0a35a0e
   DIR parent f060e53912cdbf58d0a0dbb186cd8a82d0d3068b
  HTML Author: ThomasV <thomasv@electrum.org>
       Date:   Fri,  5 Jul 2019 14:42:09 +0200
       
       Synchronize watchtower asynchronously:
       
       - remove remote_commitment_to_be_revoked
       - pass old ctns to lnsweep.create_sweeptxs_for_watchtower
       - store the ctn of sweeptxs in sweepStore database
       - request the highest ctn from sweepstore using get_ctn
       - send sweeptxs asynchronously in LNWallet.sync_with_watchtower
       
       Diffstat:
         M electrum/daemon.py                  |      11 ++++++-----
         M electrum/gui/qt/lightning_dialog.py |       2 +-
         M electrum/lnchannel.py               |      18 +++++++++---------
         M electrum/lnpeer.py                  |      13 -------------
         M electrum/lnsweep.py                 |       1 -
         M electrum/lnwatcher.py               |     186 ++++++++++++++-----------------
         M electrum/lnworker.py                |      56 ++++++++++++++++++++++++++++---
         M electrum/network.py                 |       9 +++++----
         M electrum/tests/regtest/regtest.sh   |      27 +++++++++++++++++++++++++++
         M electrum/tests/test_regtest.py      |       3 +++
       
       10 files changed, 189 insertions(+), 137 deletions(-)
       ---
   DIR diff --git a/electrum/daemon.py b/electrum/daemon.py
       t@@ -122,12 +122,12 @@ def get_rpc_credentials(config: SimpleConfig) -> Tuple[str, str]:
            return rpc_user, rpc_password
        
        
       -class WatchTower(DaemonThread):
       +class WatchTowerServer(DaemonThread):
        
       -    def __init__(self, config, lnwatcher):
       +    def __init__(self, network):
                DaemonThread.__init__(self)
       -        self.config = config
       -        self.lnwatcher = lnwatcher
       +        self.config = network.config
       +        self.lnwatcher = network.local_watchtower
                self.start()
        
            def run(self):
       t@@ -136,6 +136,7 @@ class WatchTower(DaemonThread):
                server = SimpleJSONRPCServer((host, port), logRequests=True)
                server.register_function(self.lnwatcher.add_sweep_tx, 'add_sweep_tx')
                server.register_function(self.lnwatcher.add_channel, 'add_channel')
       +        server.register_function(self.lnwatcher.get_ctn, 'get_ctn')
                server.register_function(self.lnwatcher.get_num_tx, 'get_num_tx')
                server.timeout = 0.1
                while self.is_running():
       t@@ -165,7 +166,7 @@ class Daemon(DaemonThread):
                if listen_jsonrpc:
                    self.init_server(config, fd)
                # server-side watchtower
       -        self.watchtower = WatchTower(self.config, self.network.lnwatcher) if self.config.get('watchtower_host') else None
       +        self.watchtower = WatchTowerServer(self.network) if self.config.get('watchtower_host') else None
                if self.network:
                    self.network.start([
                        self.fx.run,
   DIR diff --git a/electrum/gui/qt/lightning_dialog.py b/electrum/gui/qt/lightning_dialog.py
       t@@ -72,7 +72,7 @@ class LightningDialog(QDialog):
                self.gui_object = gui_object
                self.config = gui_object.config
                self.network = gui_object.daemon.network
       -        self.lnwatcher = self.network.lnwatcher
       +        self.lnwatcher = self.network.local_watchtower
                self.setWindowTitle(_('Lightning'))
                self.setMinimumSize(600, 20)
                self.watcher_list = WatcherList(self)
   DIR diff --git a/electrum/lnchannel.py b/electrum/lnchannel.py
       t@@ -133,12 +133,6 @@ class Channel(Logger):
                self.onion_keys = str_bytes_dict_from_save(state.get('onion_keys', {}))
                self.force_closed = state.get('force_closed')
        
       -        # FIXME this is a tx serialised in the custom electrum partial tx format.
       -        # we should not persist txns in this format. we should persist htlcs, and be able to derive
       -        # any past commitment transaction and use that instead; until then...
       -        self.remote_commitment_to_be_revoked = Transaction(state["remote_commitment_to_be_revoked"])
       -        self.remote_commitment_to_be_revoked.deserialize(True)
       -
                log = state.get('log')
                self.hm = HTLCManager(local_ctn=self.config[LOCAL].ctn,
                                      remote_ctn=self.config[REMOTE].ctn,
       t@@ -187,7 +181,6 @@ class Channel(Logger):
                self.remote_commitment = self.current_commitment(REMOTE)
        
            def open_with_first_pcp(self, remote_pcp, remote_sig):
       -        self.remote_commitment_to_be_revoked = self.pending_commitment(REMOTE)
                self.config[REMOTE] = self.config[REMOTE]._replace(ctn=0, current_per_commitment_point=remote_pcp, next_per_commitment_point=None)
                self.config[LOCAL] = self.config[LOCAL]._replace(ctn=0, current_commitment_signature=remote_sig)
                self.hm.channel_open_finished()
       t@@ -450,7 +443,6 @@ class Channel(Logger):
                    next_per_commitment_point=revocation.next_per_commitment_point,
                )
                self.set_remote_commitment()
       -        self.remote_commitment_to_be_revoked = prev_remote_commitment
        
            def balance(self, whose, *, ctx_owner=HTLCOwner.LOCAL, ctn=None):
                """
       t@@ -540,6 +532,15 @@ class Channel(Logger):
                feerate = self.get_feerate(subject, ctn)
                return self.make_commitment(subject, this_point, ctn, feerate, False)
        
       +    def create_sweeptxs(self, ctn):
       +        from .lnsweep import create_sweeptxs_for_watchtower
       +        their_conf = self.config[REMOTE]
       +        feerate = self.get_feerate(REMOTE, ctn)
       +        secret = their_conf.revocation_store.retrieve_secret(RevocationStore.START_INDEX - ctn)
       +        point = secret_to_pubkey(int.from_bytes(secret, 'big'))
       +        ctx = self.make_commitment(REMOTE, point, ctn, feerate, False)
       +        return create_sweeptxs_for_watchtower(self, ctx, secret, self.sweep_address)
       +
            def get_current_ctn(self, subject):
                return self.config[subject].ctn
        
       t@@ -609,7 +610,6 @@ class Channel(Logger):
                        "constraints": self.constraints,
                        "funding_outpoint": self.funding_outpoint,
                        "node_id": self.node_id,
       -                "remote_commitment_to_be_revoked": str(self.remote_commitment_to_be_revoked),
                        "log": self.hm.to_save(),
                        "onion_keys": str_bytes_dict_to_save(self.onion_keys),
                        "force_closed": self.force_closed,
   DIR diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py
       t@@ -41,7 +41,6 @@ from .lnutil import (Outpoint, LocalConfig, RECEIVED, UpdateAddHtlc,
                             MINIMUM_MAX_HTLC_VALUE_IN_FLIGHT_ACCEPTED, MAXIMUM_HTLC_MINIMUM_MSAT_ACCEPTED,
                             MAXIMUM_REMOTE_TO_SELF_DELAY_ACCEPTED, RemoteMisbehaving, DEFAULT_TO_SELF_DELAY)
        from .lnutil import FeeUpdate
       -from .lnsweep import create_sweeptxs_for_watchtower
        from .lntransport import LNTransport, LNTransportBase
        from .lnmsg import encode_msg, decode_msg
        from .interface import GracefulDisconnect
       t@@ -545,7 +544,6 @@ class Peer(Logger):
                    "remote_config": remote_config,
                    "local_config": local_config,
                    "constraints": ChannelConstraints(capacity=funding_sat, is_initiator=True, funding_txn_minimum_depth=funding_txn_minimum_depth),
       -            "remote_commitment_to_be_revoked": None,
                }
                chan = Channel(chan_dict,
                               sweep_address=self.lnworker.sweep_address,
       t@@ -633,7 +631,6 @@ class Peer(Logger):
                        ),
                        "local_config": local_config,
                        "constraints": ChannelConstraints(capacity=funding_sat, is_initiator=False, funding_txn_minimum_depth=min_depth),
       -                "remote_commitment_to_be_revoked": None,
                }
                chan = Channel(chan_dict,
                               sweep_address=self.lnworker.sweep_address,
       t@@ -1261,22 +1258,12 @@ class Peer(Logger):
                self.logger.info("on_revoke_and_ack")
                channel_id = payload["channel_id"]
                chan = self.channels[channel_id]
       -        ctx = chan.remote_commitment_to_be_revoked  # FIXME can't we just reconstruct it?
                rev = RevokeAndAck(payload["per_commitment_secret"], payload["next_per_commitment_point"])
                chan.receive_revocation(rev)
                self._remote_changed_events[chan.channel_id].set()
                self._remote_changed_events[chan.channel_id].clear()
                self.lnworker.save_channel(chan)
                self.maybe_send_commitment(chan)
       -        asyncio.ensure_future(self._on_revoke_and_ack(chan, ctx, rev.per_commitment_secret))
       -
       -    @ignore_exceptions
       -    @log_exceptions
       -    async def _on_revoke_and_ack(self, chan, ctx, per_commitment_secret):
       -        outpoint = chan.funding_outpoint.to_str()
       -        sweeptxs = create_sweeptxs_for_watchtower(chan, ctx, per_commitment_secret, chan.sweep_address)
       -        for tx in sweeptxs:
       -            await self.lnworker.lnwatcher.add_sweep_tx(outpoint, tx.prevout(0), str(tx))
        
            def on_update_fee(self, payload):
                channel_id = payload["channel_id"]
   DIR diff --git a/electrum/lnsweep.py b/electrum/lnsweep.py
       t@@ -77,7 +77,6 @@ def create_sweeptxs_for_watchtower(chan: 'Channel', ctx: Transaction, per_commit
                    is_revocation=True)
        
            ctn = extract_ctn_from_tx_and_chan(ctx, chan)
       -    assert ctn == chan.config[REMOTE].ctn - 1
            # received HTLCs, in their ctx
            received_htlcs = chan.included_htlcs(REMOTE, RECEIVED, ctn)
            for htlc in received_htlcs:
   DIR diff --git a/electrum/lnwatcher.py b/electrum/lnwatcher.py
       t@@ -41,10 +41,9 @@ class TxMinedDepth(IntEnum):
        create_sweep_txs="""
        CREATE TABLE IF NOT EXISTS sweep_txs (
        funding_outpoint VARCHAR(34) NOT NULL,
       -"index" INTEGER NOT NULL,
       +ctn INTEGER NOT NULL,
        prevout VARCHAR(34),
       -tx VARCHAR,
       -PRIMARY KEY(funding_outpoint, "index")
       +tx VARCHAR
        )"""
        
        create_channel_info="""
       t@@ -73,24 +72,15 @@ class SweepStore(SqlDB):
                return [Transaction(bh2u(r[0])) for r in c.fetchall()]
        
            @sql
       -    def get_tx_by_index(self, funding_outpoint, index):
       -        c = self.conn.cursor()
       -        c.execute("""SELECT prevout, tx FROM sweep_txs WHERE funding_outpoint=? AND "index"=?""", (funding_outpoint, index))
       -        r = c.fetchone()[0]
       -        return str(r[0]), bh2u(r[1])
       -
       -    @sql
            def list_sweep_tx(self):
                c = self.conn.cursor()
                c.execute("SELECT funding_outpoint FROM sweep_txs")
                return set([r[0] for r in c.fetchall()])
        
            @sql
       -    def add_sweep_tx(self, funding_outpoint, prevout, tx):
       +    def add_sweep_tx(self, funding_outpoint, ctn, prevout, tx):
                c = self.conn.cursor()
       -        c.execute("SELECT count(*) FROM sweep_txs WHERE funding_outpoint=?", (funding_outpoint,))
       -        n = int(c.fetchone()[0])
       -        c.execute("""INSERT INTO sweep_txs (funding_outpoint, "index", prevout, tx) VALUES (?,?,?,?)""", (funding_outpoint, n, prevout, bfh(str(tx))))
       +        c.execute("""INSERT INTO sweep_txs (funding_outpoint, ctn, prevout, tx) VALUES (?,?,?,?)""", (funding_outpoint, ctn, prevout, bfh(str(tx))))
                self.conn.commit()
        
            @sql
       t@@ -100,13 +90,20 @@ class SweepStore(SqlDB):
                return int(c.fetchone()[0])
        
            @sql
       +    def get_ctn(self, outpoint, addr):
       +        if not self._has_channel(outpoint):
       +            self._add_channel(outpoint, addr)
       +        c = self.conn.cursor()
       +        c.execute("SELECT max(ctn) FROM sweep_txs WHERE funding_outpoint=?", (outpoint,))
       +        return int(c.fetchone()[0] or 0)
       +
       +    @sql
            def remove_sweep_tx(self, funding_outpoint):
                c = self.conn.cursor()
                c.execute("DELETE FROM sweep_txs WHERE funding_outpoint=?", (funding_outpoint,))
                self.conn.commit()
        
       -    @sql
       -    def add_channel(self, outpoint, address):
       +    def _add_channel(self, outpoint, address):
                c = self.conn.cursor()
                c.execute("INSERT INTO channel_info (address, outpoint) VALUES (?,?)", (address, outpoint))
                self.conn.commit()
       t@@ -117,8 +114,7 @@ class SweepStore(SqlDB):
                c.execute("DELETE FROM channel_info WHERE outpoint=?", (outpoint,))
                self.conn.commit()
        
       -    @sql
       -    def has_channel(self, outpoint):
       +    def _has_channel(self, outpoint):
                c = self.conn.cursor()
                c.execute("SELECT * FROM channel_info WHERE outpoint=?", (outpoint,))
                r = c.fetchone()
       t@@ -132,9 +128,9 @@ class SweepStore(SqlDB):
                return r[0] if r else None
        
            @sql
       -    def list_channel_info(self):
       +    def list_channels(self):
                c = self.conn.cursor()
       -        c.execute("SELECT address, outpoint FROM channel_info")
       +        c.execute("SELECT outpoint, address FROM channel_info")
                return [(r[0], r[1]) for r in c.fetchall()]
        
        
       t@@ -145,77 +141,22 @@ class LNWatcher(AddressSynchronizer):
            def __init__(self, network: 'Network'):
                AddressSynchronizer.__init__(self, JsonDB({}, manual_upgrades=False))
                self.config = network.config
       -        self.start_network(network)
       -        self.lock = threading.RLock()
       -        self.sweepstore = None
                self.channels = {}
       -        if self.config.get('sweepstore', False):
       -            self.sweepstore = SweepStore(os.path.join(network.config.path, "watchtower_db"), network)
       -        self.watchtower = None
       -        if self.config.get('watchtower_url'):
       -            self.set_remote_watchtower()
       +        self.network = network
                self.network.register_callback(self.on_network_update,
                                               ['network_updated', 'blockchain_updated', 'verified', 'wallet_updated'])
       -        # this maps funding_outpoints to ListenerItems, which have an event for when the watcher is done,
       -        # and a queue for seeing which txs are being published
       -        self.tx_progress = {} # type: Dict[str, ListenerItem]
                # status gets populated when we run
                self.channel_status = {}
        
            def get_channel_status(self, outpoint):
                return self.channel_status.get(outpoint, 'unknown')
        
       -    def set_remote_watchtower(self):
       -        watchtower_url = self.config.get('watchtower_url')
       -        try:
       -            self.watchtower = jsonrpclib.Server(watchtower_url) if watchtower_url else None
       -        except:
       -            self.watchtower = None
       -            self.watchtower_queue = asyncio.Queue()
       -
       -    def get_num_tx(self, outpoint):
       -        if not self.sweepstore:
       -            return 0
       -        async def f():
       -            return await self.sweepstore.get_num_tx(outpoint)
       -        return self.network.run_from_another_thread(f())
       -
       -    def list_sweep_tx(self):
       -        if not self.sweepstore:
       -            return []
       -        async def f():
       -            return await self.sweepstore.list_sweep_tx()
       -        return self.network.run_from_another_thread(f())
       -
       -    @ignore_exceptions
       -    @log_exceptions
       -    async def watchtower_task(self):
       -        if not self.watchtower:
       -            return
       -        self.logger.info('watchtower task started')
       -        while True:
       -            outpoint, prevout, tx = await self.watchtower_queue.get()
       -            try:
       -                self.watchtower.add_sweep_tx(outpoint, prevout, tx)
       -                self.logger.info("transaction sent to watchtower")
       -            except ConnectionRefusedError:
       -                self.logger.info('could not reach watchtower, will retry in 5s')
       -                await asyncio.sleep(5)
       -                await self.watchtower_queue.put((outpoint, prevout, tx))
       -
            def add_channel(self, outpoint, address):
                self.add_address(address)
                self.channels[address] = outpoint
       -        #if self.sweepstore:
       -        #    if not await self.sweepstore.has_channel(outpoint):
       -        #        await self.sweepstore.add_channel(outpoint, address)
        
            async def unwatch_channel(self, address, funding_outpoint):
       -        self.logger.info(f'unwatching {funding_outpoint}')
       -        await self.sweepstore.remove_sweep_tx(funding_outpoint)
       -        await self.sweepstore.remove_channel(funding_outpoint)
       -        if funding_outpoint in self.tx_progress:
       -            self.tx_progress[funding_outpoint].all_done.set()
       +        pass
        
            @log_exceptions
            async def on_network_update(self, event, *args):
       t@@ -281,6 +222,44 @@ class LNWatcher(AddressSynchronizer):
                        result.update(r)
                return keep_watching, result
        
       +    def get_tx_mined_depth(self, txid: str):
       +        if not txid:
       +            return TxMinedDepth.FREE
       +        tx_mined_depth = self.get_tx_height(txid)
       +        height, conf = tx_mined_depth.height, tx_mined_depth.conf
       +        if conf > 100:
       +            return TxMinedDepth.DEEP
       +        elif conf > 0:
       +            return TxMinedDepth.SHALLOW
       +        elif height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT):
       +            return TxMinedDepth.MEMPOOL
       +        elif height == TX_HEIGHT_LOCAL:
       +            return TxMinedDepth.FREE
       +        elif height > 0 and conf == 0:
       +            # unverified but claimed to be mined
       +            return TxMinedDepth.MEMPOOL
       +        else:
       +            raise NotImplementedError()
       +
       +
       +class WatchTower(LNWatcher):
       +
       +    verbosity_filter = 'W'
       +
       +    def __init__(self, network):
       +        LNWatcher.__init__(self, network)
       +        self.network = network
       +        self.sweepstore = SweepStore(os.path.join(self.network.config.path, "watchtower_db"), network)
       +        # this maps funding_outpoints to ListenerItems, which have an event for when the watcher is done,
       +        # and a queue for seeing which txs are being published
       +        self.tx_progress = {} # type: Dict[str, ListenerItem]
       +
       +    async def start_watching(self):
       +        # I need to watch the addresses from sweepstore
       +        l = await self.sweepstore.list_channels()
       +        for outpoint, address in l:
       +            self.add_channel(outpoint, address)
       +
            async def do_breach_remedy(self, funding_outpoint, spenders):
                for prevout, spender in spenders.items():
                    if spender is not None:
       t@@ -303,27 +282,34 @@ class LNWatcher(AddressSynchronizer):
                        await self.tx_progress[funding_outpoint].tx_queue.put(tx)
                    return txid
        
       -    async def add_sweep_tx(self, funding_outpoint: str, prevout: str, tx: str):
       -        if self.sweepstore:
       -            await self.sweepstore.add_sweep_tx(funding_outpoint, prevout, tx)
       -        if self.watchtower:
       -            self.watchtower_queue.put_nowait(funding_outpoint, prevout, tx)
       +    def get_ctn(self, outpoint, addr):
       +        async def f():
       +            return await self.sweepstore.get_ctn(outpoint, addr)
       +        return self.network.run_from_another_thread(f())
        
       -    def get_tx_mined_depth(self, txid: str):
       -        if not txid:
       -            return TxMinedDepth.FREE
       -        tx_mined_depth = self.get_tx_height(txid)
       -        height, conf = tx_mined_depth.height, tx_mined_depth.conf
       -        if conf > 100:
       -            return TxMinedDepth.DEEP
       -        elif conf > 0:
       -            return TxMinedDepth.SHALLOW
       -        elif height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT):
       -            return TxMinedDepth.MEMPOOL
       -        elif height == TX_HEIGHT_LOCAL:
       -            return TxMinedDepth.FREE
       -        elif height > 0 and conf == 0:
       -            # unverified but claimed to be mined
       -            return TxMinedDepth.MEMPOOL
       -        else:
       -            raise NotImplementedError()
       +    def get_num_tx(self, outpoint):
       +        async def f():
       +            return await self.sweepstore.get_num_tx(outpoint)
       +        return self.network.run_from_another_thread(f())
       +
       +    def add_sweep_tx(self, funding_outpoint: str, address:str, ctn:int, prevout: str, tx: str):
       +        async def f():
       +            return await self.sweepstore.add_sweep_tx(funding_outpoint, ctn, prevout, tx)
       +        return self.network.run_from_another_thread(f())
       +
       +    def list_sweep_tx(self):
       +        async def f():
       +            return await self.sweepstore.list_sweep_tx()
       +        return self.network.run_from_another_thread(f())
       +
       +    def list_channels(self):
       +        async def f():
       +            return await self.sweepstore.list_channels()
       +        return self.network.run_from_another_thread(f())
       +
       +    async def unwatch_channel(self, address, funding_outpoint):
       +        self.logger.info(f'unwatching {funding_outpoint}')
       +        await self.sweepstore.remove_sweep_tx(funding_outpoint)
       +        await self.sweepstore.remove_channel(funding_outpoint)
       +        if funding_outpoint in self.tx_progress:
       +            self.tx_progress[funding_outpoint].all_done.set()
   DIR diff --git a/electrum/lnworker.py b/electrum/lnworker.py
       t@@ -28,6 +28,7 @@ from .transaction import Transaction
        from .crypto import sha256
        from .bip32 import BIP32Node
        from .util import bh2u, bfh, InvoiceError, resolve_dns_srv, is_ip_address, log_exceptions
       +from .util import ignore_exceptions
        from .util import timestamp_to_datetime
        from .logging import Logger
        from .lntransport import LNTransport, LNResponderTransport
       t@@ -46,7 +47,6 @@ from .i18n import _
        from .lnrouter import RouteEdge, is_route_sane_to_use
        from .address_synchronizer import TX_HEIGHT_LOCAL
        from . import lnsweep
       -from .lnsweep import create_sweeptxs_for_their_ctx, create_sweeptxs_for_our_ctx
        from .lnwatcher import LNWatcher
        
        if TYPE_CHECKING:
       t@@ -300,7 +300,7 @@ class LNWallet(LNWorker):
                    node = BIP32Node.from_rootseed(seed, xtype='standard')
                    xprv = node.to_xprv()
                    self.storage.put('lightning_privkey2', xprv)
       -        super().__init__(xprv)
       +        LNWorker.__init__(self, xprv)
                self.ln_keystore = keystore.from_xprv(xprv)
                #self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_REQ
                self.invoices = self.storage.get('lightning_invoices', {})        # RHASH -> (invoice, direction, is_paid)
       t@@ -317,13 +317,59 @@ class LNWallet(LNWorker):
                self.channel_timestamps = self.storage.get('lightning_channel_timestamps', {})
                self.pending_payments = defaultdict(asyncio.Future)
        
       +    @ignore_exceptions
       +    @log_exceptions
       +    async def sync_with_local_watchtower(self):
       +        watchtower = self.network.local_watchtower
       +        if watchtower:
       +            while True:
       +                for chan in self.channels.values():
       +                    await self.sync_channel_with_watchtower(chan, watchtower.sweepstore, True)
       +                await asyncio.sleep(5)
       +
       +    @ignore_exceptions
       +    @log_exceptions
       +    async def sync_with_remote_watchtower(self):
       +        # FIXME: jsonrpclib blocks the asyncio loop.
       +        # we should use aiohttp instead
       +        import jsonrpclib
       +        while True:
       +            watchtower_url = self.config.get('watchtower_url')
       +            if watchtower_url:
       +                watchtower = jsonrpclib.Server(watchtower_url)
       +                for chan in self.channels.values():
       +                    try:
       +                        await self.sync_channel_with_watchtower(chan, watchtower, False)
       +                    except ConnectionRefusedError:
       +                        self.logger.info(f'could not contact watchtower {watchtower_url}')
       +                        break
       +            await asyncio.sleep(5)
       +
       +    async def sync_channel_with_watchtower(self, chan, watchtower, is_local):
       +        outpoint = chan.funding_outpoint.to_str()
       +        addr = chan.get_funding_address()
       +        current_ctn = chan.get_current_ctn(REMOTE)
       +        if is_local:
       +            watchtower_ctn = await watchtower.get_ctn(outpoint, addr)
       +        else:
       +            watchtower_ctn = watchtower.get_ctn(outpoint, addr)
       +        for ctn in range(watchtower_ctn + 1, current_ctn):
       +            sweeptxs = chan.create_sweeptxs(ctn)
       +            self.logger.info(f'sync with watchtower: {outpoint}, {ctn}, {len(sweeptxs)}')
       +            for tx in sweeptxs:
       +                if is_local:
       +                    await watchtower.add_sweep_tx(outpoint, addr, ctn, tx.prevout(0), str(tx))
       +                else:
       +                    watchtower.add_sweep_tx(outpoint, addr, ctn, tx.prevout(0), str(tx))
       +
            def start_network(self, network: 'Network'):
       +        self.config = network.config
                self.lnwatcher = LNWatcher(network)
       +        self.lnwatcher.start_network(network)
                self.network = network
                self.network.register_callback(self.on_network_update, ['wallet_updated', 'network_updated', 'verified', 'fee'])  # thread safe
                self.network.register_callback(self.on_channel_open, ['channel_open'])
                self.network.register_callback(self.on_channel_closed, ['channel_closed'])
       -
                for chan_id, chan in self.channels.items():
                    self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address())
        
       t@@ -332,7 +378,9 @@ class LNWallet(LNWorker):
                        self.maybe_listen(),
                        self.on_network_update('network_updated'),  # shortcut (don't block) if funding tx locked and verified
                        self.lnwatcher.on_network_update('network_updated'),  # ping watcher to check our channels
       -                self.reestablish_peers_and_channels()
       +                self.reestablish_peers_and_channels(),
       +                self.sync_with_local_watchtower(),
       +                self.sync_with_remote_watchtower(),
                ]:
                    asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(coro), self.network.asyncio_loop)
        
   DIR diff --git a/electrum/network.py b/electrum/network.py
       t@@ -304,12 +304,12 @@ class Network(Logger):
                    from . import channel_db
                    self.channel_db = channel_db.ChannelDB(self)
                    self.path_finder = lnrouter.LNPathFinder(self.channel_db)
       -            self.lnwatcher = lnwatcher.LNWatcher(self)
                    self.lngossip = lnworker.LNGossip(self)
       +            self.local_watchtower = lnwatcher.WatchTower(self) if self.config.get('local_watchtower', True) else None
                else:
                    self.channel_db = None
       -            self.lnwatcher = None
                    self.lngossip = None
       +            self.local_watchtower = None
        
            def run_from_another_thread(self, coro, *, timeout=None):
                assert self._loop_thread != threading.current_thread(), 'must not be called from network thread'
       t@@ -1152,10 +1152,11 @@ class Network(Logger):
                self._set_oneserver(self.config.get('oneserver', False))
                self._start_interface(self.default_server)
        
       -        if self.lnwatcher:
       -            self._jobs.append(self.lnwatcher.watchtower_task)
                if self.lngossip:
                    self.lngossip.start_network(self)
       +        if self.local_watchtower:
       +            self.local_watchtower.start_network(self)
       +            await self.local_watchtower.start_watching()
        
                async def main():
                    try:
   DIR diff --git a/electrum/tests/regtest/regtest.sh b/electrum/tests/regtest/regtest.sh
       t@@ -301,3 +301,30 @@ if [[ $1 == "breach_with_spent_htlc" ]]; then
            fi
            echo "bob balance $balance"
        fi
       +
       +if [[ $1 == "watchtower" ]]; then
       +    # carol is a watchtower of alice
       +    $alice daemon stop
       +    $carol daemon stop
       +    $alice setconfig watchtower_url http://127.0.0.1:12345
       +    $carol setconfig watchtower_host 127.0.0.1
       +    $carol setconfig watchtower_port 12345
       +    $carol daemon -s 127.0.0.1:51001:t start
       +    $alice daemon -s 127.0.0.1:51001:t start
       +    $alice daemon load_wallet
       +    echo "waiting until alice funded"
       +    wait_until_funded
       +    echo "alice opens channel"
       +    bob_node=$($bob nodeid)
       +    channel=$($alice open_channel $bob_node 0.5)
       +    new_blocks 3
       +    wait_until_channel_open
       +    echo "alice pays bob"
       +    invoice1=$($bob addinvoice 0.05 "invoice1")
       +    $alice lnpay $invoice1
       +    invoice2=$($bob addinvoice 0.05 "invoice2")
       +    $alice lnpay $invoice2
       +    invoice3=$($bob addinvoice 0.05 "invoice3")
       +    $alice lnpay $invoice3
       +
       +fi
   DIR diff --git a/electrum/tests/test_regtest.py b/electrum/tests/test_regtest.py
       t@@ -38,3 +38,6 @@ class TestLightning(unittest.TestCase):
        
            def test_breach_with_spent_htlc(self):
                self.run_shell(['breach_with_spent_htlc'])
       +
       +    def test_watchtower(self):
       +        self.run_shell(['watchtower'])