URI: 
       tchannel verifier: NetworkJobOnDefaultServer, and some error handling - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 17ccb79ca4d5b357249dcc64c57f049f8a21dc80
   DIR parent 5a081b2131e7e90b472b191d73ae6204704ade28
  HTML Author: SomberNight <somber.night@protonmail.com>
       Date:   Sun, 14 Oct 2018 18:07:48 +0200
       
       channel verifier: NetworkJobOnDefaultServer, and some error handling
       
       Diffstat:
         M electrum/daemon.py                  |       1 -
         M electrum/lnchannelverifier.py       |      81 ++++++++++++++++++++++---------
         M electrum/verifier.py                |       2 +-
       
       3 files changed, 59 insertions(+), 25 deletions(-)
       ---
   DIR diff --git a/electrum/daemon.py b/electrum/daemon.py
       t@@ -169,7 +169,6 @@ class Daemon(DaemonThread):
                    self.network.start([
                        self.fx.run,
                        self.network.lnwatcher.watchtower_task,
       -                self.network.channel_db.ca_verifier.main
                    ])
                self.start()
        
   DIR diff --git a/electrum/lnchannelverifier.py b/electrum/lnchannelverifier.py
       t@@ -26,37 +26,45 @@
        import asyncio
        import threading
        
       -from aiorpcx import TaskGroup
       +import aiorpcx
        
        from . import lnbase
        from . import bitcoin
        from . import ecc
        from . import constants
       -from .util import ThreadJob, bh2u, bfh
       +from .util import bh2u, bfh, NetworkJobOnDefaultServer
        from .lnutil import invert_short_channel_id, funding_output_script_from_keys
        from .verifier import verify_tx_is_in_block, MerkleVerificationFailure
        from .transaction import Transaction
       +from .interface import GracefulDisconnect
        
        
       -class LNChannelVerifier(ThreadJob):
       +class LNChannelVerifier(NetworkJobOnDefaultServer):
            """ Verify channel announcements for the Channel DB """
        
       +    # FIXME the initial routing sync is bandwidth-heavy, and the electrum server
       +    # will start throttling us, making it even slower. one option would be to
       +    # spread it over multiple servers.
       +
            def __init__(self, network, channel_db):
       -        self.network = network
       +        NetworkJobOnDefaultServer.__init__(self, network)
                self.channel_db = channel_db
                self.lock = threading.Lock()
       +        self.unverified_channel_info = {}  # short_channel_id -> channel_info
       +        # channel announcements that seem to be invalid:
       +        self.blacklist = set()  # short_channel_id
        
       -        # items only removed when whole verification succeeds for them.
       -        # fixme: if it fails, it will never succeed
       +    def _reset(self):
       +        super()._reset()
                self.started_verifying_channel = set()  # short_channel_id
        
       -        self.unverified_channel_info = {}  # short_channel_id -> channel_info
       -
            # TODO make async; and rm self.lock completely
            def add_new_channel_info(self, channel_info):
                short_channel_id = channel_info.channel_id
                if short_channel_id in self.unverified_channel_info:
                    return
       +        if short_channel_id in self.blacklist:
       +            return
                if not verify_sigs_for_channel_announcement(channel_info.msg_payload):
                    return
                with self.lock:
       t@@ -65,13 +73,16 @@ class LNChannelVerifier(ThreadJob):
            def get_pending_channel_info(self, short_channel_id):
                return self.unverified_channel_info.get(short_channel_id, None)
        
       +    async def _start_tasks(self):
       +        async with self.group as group:
       +            await group.spawn(self.main)
       +
            async def main(self):
                while True:
       -            async with TaskGroup() as group:
       -                await self.iteration(group)
       +            await self._verify_some_channels()
                    await asyncio.sleep(0.1)
        
       -    async def iteration(self, group: TaskGroup):
       +    async def _verify_some_channels(self):
                blockchain = self.network.blockchain()
                local_height = blockchain.height()
        
       t@@ -88,15 +99,22 @@ class LNChannelVerifier(ThreadJob):
                    header = blockchain.read_header(block_height)
                    if header is None:
                        if block_height < constants.net.max_checkpoint():
       -                    await group.spawn(self.network.request_chunk(block_height, None, can_return_early=True))
       +                    await self.group.spawn(self.network.request_chunk(block_height, None, can_return_early=True))
                        continue
       -            await group.spawn(self.verify_channel(block_height, tx_pos, short_channel_id))
       +            self.started_verifying_channel.add(short_channel_id)
       +            await self.group.spawn(self.verify_channel(block_height, tx_pos, short_channel_id))
                    #self.print_error('requested short_channel_id', bh2u(short_channel_id))
        
            async def verify_channel(self, block_height, tx_pos, short_channel_id):
       -        with self.lock:
       -            self.started_verifying_channel.add(short_channel_id)
       -        result = await self.network.get_txid_from_txpos(block_height, tx_pos, True)
       +        # we are verifying channel announcements as they are from untrusted ln peers.
       +        # we use electrum servers to do this. however we don't trust electrum servers either...
       +        try:
       +            result = await self.network.get_txid_from_txpos(block_height, tx_pos, True)
       +        except aiorpcx.jsonrpc.RPCError:
       +            # the electrum server is complaining about the tx_pos for given block.
       +            # it is not clear what to do now, but let's believe the server.
       +            self._blacklist_short_channel_id(short_channel_id)
       +            return
                tx_hash = result['tx_hash']
                merkle_branch = result['merkle']
                # we need to wait if header sync/reorg is still ongoing, hence lock:
       t@@ -105,17 +123,26 @@ class LNChannelVerifier(ThreadJob):
                try:
                    verify_tx_is_in_block(tx_hash, merkle_branch, tx_pos, header, block_height)
                except MerkleVerificationFailure as e:
       -            self.print_error(str(e))
       -            return
       -        tx = Transaction(await self.network.get_transaction(tx_hash))
       +            # the electrum server sent an incorrect proof. blame is on server, not the ln peer
       +            raise GracefulDisconnect(e) from e
       +        try:
       +            raw_tx = await self.network.get_transaction(tx_hash)
       +        except aiorpcx.jsonrpc.RPCError as e:
       +            # the electrum server can't find the tx; but it was the
       +            # one who told us about the txid!! blame is on server
       +            raise GracefulDisconnect(e) from e
       +        tx = Transaction(raw_tx)
                try:
                    tx.deserialize()
                except Exception:
       +            # either bug in client, or electrum server is evil.
       +            # if we connect to a diff server at some point, let's try again.
                    self.print_msg("cannot deserialize transaction, skipping", tx_hash)
                    return
                if tx_hash != tx.txid():
       -            self.print_error("received tx does not match expected txid ({} != {})"
       -                             .format(tx_hash, tx.txid()))
       +            # either bug in client, or electrum server is evil.
       +            # if we connect to a diff server at some point, let's try again.
       +            self.print_error(f"received tx does not match expected txid ({tx_hash} != {tx.txid()})")
                    return
                # check funding output
                channel_info = self.unverified_channel_info[short_channel_id]
       t@@ -126,8 +153,12 @@ class LNChannelVerifier(ThreadJob):
                try:
                    actual_output = tx.outputs()[output_idx]
                except IndexError:
       +            self._blacklist_short_channel_id(short_channel_id)
                    return
                if expected_address != actual_output.address:
       +            # FIXME what now? best would be to ban the originating ln peer.
       +            self.print_error(f"funding output script mismatch for {bh2u(short_channel_id)}")
       +            self.started_verifying_channel.remove(short_channel_id)
                    return
                # put channel into channel DB
                channel_info.set_capacity(actual_output.value)
       t@@ -135,8 +166,12 @@ class LNChannelVerifier(ThreadJob):
                # remove channel from unverified
                with self.lock:
                    self.unverified_channel_info.pop(short_channel_id, None)
       -            try: self.started_verifying_channel.remove(short_channel_id)
       -            except KeyError: pass
       +        self.started_verifying_channel.remove(short_channel_id)
       +
       +    def _blacklist_short_channel_id(self, short_channel_id: bytes) -> None:
       +        self.blacklist.add(short_channel_id)
       +        with self.lock:
       +            self.unverified_channel_info.pop(short_channel_id, None)
        
        
        def verify_sigs_for_channel_announcement(chan_ann: dict) -> bool:
   DIR diff --git a/electrum/verifier.py b/electrum/verifier.py
       t@@ -122,7 +122,7 @@ class SPV(NetworkJobOnDefaultServer):
                        self.logger.info(f"skipping merkle proof check {tx_hash}")
                    else:
                        self.logger.info(repr(e))
       -                raise GracefulDisconnect(e)
       +                raise GracefulDisconnect(e) from e
                # we passed all the tests
                self.merkle_roots[tx_hash] = header.get('merkle_root')
                self.requested_merkle.discard(tx_hash)