URI: 
       tlnpeer.py - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
       tlnpeer.py (101473B)
       ---
            1 #!/usr/bin/env python3
            2 #
            3 # Copyright (C) 2018 The Electrum developers
            4 # Distributed under the MIT software license, see the accompanying
            5 # file LICENCE or http://www.opensource.org/licenses/mit-license.php
            6 
            7 import zlib
            8 from collections import OrderedDict, defaultdict
            9 import asyncio
           10 import os
           11 import time
           12 from typing import Tuple, Dict, TYPE_CHECKING, Optional, Union, Set
           13 from datetime import datetime
           14 import functools
           15 
           16 import aiorpcx
           17 from aiorpcx import TaskGroup
           18 
           19 from .crypto import sha256, sha256d
           20 from . import bitcoin, util
           21 from . import ecc
           22 from .ecc import sig_string_from_r_and_s, der_sig_from_sig_string
           23 from . import constants
           24 from .util import (bh2u, bfh, log_exceptions, ignore_exceptions, chunks, SilentTaskGroup,
           25                    UnrelatedTransactionException)
           26 from . import transaction
           27 from .transaction import PartialTxOutput, match_script_against_template
           28 from .logging import Logger
           29 from .lnonion import (new_onion_packet, OnionFailureCode, calc_hops_data_for_payment,
           30                       process_onion_packet, OnionPacket, construct_onion_error, OnionRoutingFailure,
           31                       ProcessedOnionPacket, UnsupportedOnionPacketVersion, InvalidOnionMac, InvalidOnionPubkey,
           32                       OnionFailureCodeMetaFlag)
           33 from .lnchannel import Channel, RevokeAndAck, RemoteCtnTooFarInFuture, ChannelState, PeerState
           34 from . import lnutil
           35 from .lnutil import (Outpoint, LocalConfig, RECEIVED, UpdateAddHtlc,
           36                      RemoteConfig, OnlyPubkeyKeypair, ChannelConstraints, RevocationStore,
           37                      funding_output_script, get_per_commitment_secret_from_seed,
           38                      secret_to_pubkey, PaymentFailure, LnFeatures,
           39                      LOCAL, REMOTE, HTLCOwner,
           40                      ln_compare_features, privkey_to_pubkey, MIN_FINAL_CLTV_EXPIRY_ACCEPTED,
           41                      LightningPeerConnectionClosed, HandshakeFailed,
           42                      RemoteMisbehaving, ShortChannelID,
           43                      IncompatibleLightningFeatures, derive_payment_secret_from_payment_preimage,
           44                      LN_MAX_FUNDING_SAT, calc_fees_for_commitment_tx,
           45                      UpfrontShutdownScriptViolation)
           46 from .lnutil import FeeUpdate, channel_id_from_funding_tx
           47 from .lntransport import LNTransport, LNTransportBase
           48 from .lnmsg import encode_msg, decode_msg
           49 from .interface import GracefulDisconnect
           50 from .lnrouter import fee_for_edge_msat
           51 from .lnutil import ln_dummy_address
           52 from .json_db import StoredDict
           53 from .invoices import PR_PAID
           54 
           55 if TYPE_CHECKING:
           56     from .lnworker import LNGossip, LNWallet
           57     from .lnrouter import LNPaymentRoute
           58     from .transaction import PartialTransaction
           59 
           60 
# Timeout, in seconds, handed to asyncio.wait_for() for peer network
# operations in this module (initialization, waiting for ordered messages,
# channel range queries).
LN_P2P_NETWORK_TIMEOUT = 20
           62 
           63 
           64 class Peer(Logger):
           65     LOGGING_SHORTCUT = 'P'
           66 
    def __init__(
            self,
            lnworker: Union['LNGossip', 'LNWallet'],
            pubkey: bytes,
            transport: LNTransportBase,
            *, is_channel_backup= False):
        """Create the per-connection state for one remote peer.

        :param lnworker: owning worker; its ``features`` and ``network``
            attributes are read here.
        :param pubkey: public key of the remote node.
        :param transport: transport object; its ``privkey`` attribute is
            used as the local private key.
        :param is_channel_backup: when True, process_message() ignores
            every message except 'init'.
        """

        self.is_channel_backup = is_channel_backup
        self._sent_init = False  # type: bool
        self._received_init = False  # type: bool
        # resolved with True by maybe_set_initialized() once both init
        # messages have been exchanged; on_init() may set an exception instead
        self.initialized = asyncio.Future()
        # set by close_and_cleanup()
        self.got_disconnected = asyncio.Event()
        # set when 'reply_short_channel_ids_end' arrives,
        # cleared again by get_short_channel_ids()
        self.querying = asyncio.Event()
        self.transport = transport
        self.pubkey = pubkey  # remote pubkey
        self.lnworker = lnworker
        self.privkey = self.transport.privkey  # local privkey
        self.features = self.lnworker.features  # type: LnFeatures
        self.their_features = LnFeatures(0)  # type: LnFeatures
        self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)]
        # the remote node must not be ourselves
        assert self.node_ids[0] != self.node_ids[1]
        self.network = lnworker.network
        # time.time() of the last ping we sent (see ping_if_required)
        self.ping_time = 0
        # filled by on_reply_channel_range(), consumed by get_channel_range()
        self.reply_channel_range = asyncio.Queue()
        # gossip uses a single queue to preserve message order
        self.gossip_queue = asyncio.Queue()
        # message types that process_message() routes into per-channel queues
        self.ordered_messages = ['accept_channel', 'funding_signed', 'funding_created', 'accept_channel', 'channel_reestablish', 'closing_signed']
        self.ordered_message_queues = defaultdict(asyncio.Queue) # for messages that are ordered
        self.temp_id_to_id = {}   # to forward error messages
        self.funding_created_sent = set() # for channels in PREOPENING
        self.funding_signed_sent = set()  # for channels in PREOPENING
        self.shutdown_received = {} # chan_id -> asyncio.Future()
        self.announcement_signatures = defaultdict(asyncio.Queue)
        # bounded to 25 entries by maybe_save_remote_update()
        self.orphan_channel_updates = OrderedDict()
        # NOTE(review): presumably Logger.__init__ relies on diagnostic_name(),
        # which reads self.lnworker and self.transport -- confirm before
        # moving this call earlier.
        Logger.__init__(self)
        self.taskgroup = SilentTaskGroup()
        # HTLCs offered by REMOTE, that we started removing but are still active:
        self.received_htlcs_pending_removal = set()  # type: Set[Tuple[Channel, int]]
        self.received_htlc_removed_event = asyncio.Event()
        self._htlc_switch_iterstart_event = asyncio.Event()
        self._htlc_switch_iterdone_event = asyncio.Event()
          108 
          109     def send_message(self, message_name: str, **kwargs):
          110         assert type(message_name) is str
          111         self.logger.debug(f"Sending {message_name.upper()}")
          112         if message_name.upper() != "INIT" and not self.is_initialized():
          113             raise Exception("tried to send message before we are initialized")
          114         raw_msg = encode_msg(message_name, **kwargs)
          115         self._store_raw_msg_if_local_update(raw_msg, message_name=message_name, channel_id=kwargs.get("channel_id"))
          116         self.transport.send_bytes(raw_msg)
          117 
          118     def _store_raw_msg_if_local_update(self, raw_msg: bytes, *, message_name: str, channel_id: Optional[bytes]):
          119         is_commitment_signed = message_name == "commitment_signed"
          120         if not (message_name.startswith("update_") or is_commitment_signed):
          121             return
          122         assert channel_id
          123         chan = self.get_channel_by_id(channel_id)
          124         if not chan:
          125             raise Exception(f"channel {channel_id.hex()} not found for peer {self.pubkey.hex()}")
          126         chan.hm.store_local_update_raw_msg(raw_msg, is_commitment_signed=is_commitment_signed)
          127         if is_commitment_signed:
          128             # saving now, to ensure replaying updates works (in case of channel reestablishment)
          129             self.lnworker.save_channel(chan)
          130 
          131     def maybe_set_initialized(self):
          132         if self.initialized.done():
          133             return
          134         if self._sent_init and self._received_init:
          135             self.initialized.set_result(True)
          136 
          137     def is_initialized(self) -> bool:
          138         return (self.initialized.done()
          139                 and not self.initialized.cancelled()
          140                 and self.initialized.exception() is None
          141                 and self.initialized.result() is True)
          142 
          143     async def initialize(self):
          144         if isinstance(self.transport, LNTransport):
          145             await self.transport.handshake()
          146         features = self.features.for_init_message()
          147         b = int.bit_length(features)
          148         flen = b // 8 + int(bool(b % 8))
          149         self.send_message(
          150             "init", gflen=0, flen=flen,
          151             features=features,
          152             init_tlvs={
          153                 'networks':
          154                 {'chains': constants.net.rev_genesis_bytes()}
          155             })
          156         self._sent_init = True
          157         self.maybe_set_initialized()
          158 
    @property
    def channels(self) -> Dict[bytes, Channel]:
        """Channels that the lnworker reports for this peer's pubkey."""
        return self.lnworker.channels_for_peer(self.pubkey)
          162 
          163     def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]:
          164         # note: this is faster than self.channels.get(channel_id)
          165         chan = self.lnworker.get_channel_by_id(channel_id)
          166         if not chan:
          167             return None
          168         if chan.node_id != self.pubkey:
          169             return None
          170         return chan
          171 
          172     def diagnostic_name(self):
          173         return self.lnworker.__class__.__name__ + ', ' + self.transport.name()
          174 
          175     def ping_if_required(self):
          176         if time.time() - self.ping_time > 120:
          177             self.send_message('ping', num_pong_bytes=4, byteslen=4)
          178             self.ping_time = time.time()
          179 
          180     def process_message(self, message):
          181         message_type, payload = decode_msg(message)
          182         # only process INIT if we are a backup
          183         if self.is_channel_backup is True and message_type != 'init':
          184             return
          185         if message_type in self.ordered_messages:
          186             chan_id = payload.get('channel_id') or payload["temporary_channel_id"]
          187             self.ordered_message_queues[chan_id].put_nowait((message_type, payload))
          188         else:
          189             if message_type != 'error' and 'channel_id' in payload:
          190                 chan = self.get_channel_by_id(payload['channel_id'])
          191                 if chan is None:
          192                     raise Exception('Got unknown '+ message_type)
          193                 args = (chan, payload)
          194             else:
          195                 args = (payload,)
          196             try:
          197                 f = getattr(self, 'on_' + message_type)
          198             except AttributeError:
          199                 #self.logger.info("Received '%s'" % message_type.upper(), payload)
          200                 return
          201             # raw message is needed to check signature
          202             if message_type in ['node_announcement', 'channel_announcement', 'channel_update']:
          203                 payload['raw'] = message
          204             execution_result = f(*args)
          205             if asyncio.iscoroutinefunction(f):
          206                 asyncio.ensure_future(self.taskgroup.spawn(execution_result))
          207 
          208     def on_error(self, payload):
          209         self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {payload['data'].decode('ascii')}")
          210         chan_id = payload.get("channel_id")
          211         if chan_id in self.temp_id_to_id:
          212             chan_id = self.temp_id_to_id[chan_id]
          213         self.ordered_message_queues[chan_id].put_nowait((None, {'error':payload['data']}))
          214 
          215     def on_ping(self, payload):
          216         l = payload['num_pong_bytes']
          217         self.send_message('pong', byteslen=l)
          218 
    def on_pong(self, payload):
        """Handle a 'pong' message; nothing is done with it."""
        pass
          221 
          222     async def wait_for_message(self, expected_name, channel_id):
          223         q = self.ordered_message_queues[channel_id]
          224         name, payload = await asyncio.wait_for(q.get(), LN_P2P_NETWORK_TIMEOUT)
          225         if payload.get('error'):
          226             raise Exception('Remote peer reported error [DO NOT TRUST THIS MESSAGE]: ' + repr(payload.get('error')))
          227         if name != expected_name:
          228             raise Exception(f"Received unexpected '{name}'")
          229         return payload
          230 
          231     def on_init(self, payload):
          232         if self._received_init:
          233             self.logger.info("ALREADY INITIALIZED BUT RECEIVED INIT")
          234             return
          235         self.their_features = LnFeatures(int.from_bytes(payload['features'], byteorder="big"))
          236         their_globalfeatures = int.from_bytes(payload['globalfeatures'], byteorder="big")
          237         self.their_features |= their_globalfeatures
          238         # check transitive dependencies for received features
          239         if not self.their_features.validate_transitive_dependencies():
          240             raise GracefulDisconnect("remote did not set all dependencies for the features they sent")
          241         # check if features are compatible, and set self.features to what we negotiated
          242         try:
          243             self.features = ln_compare_features(self.features, self.their_features)
          244         except IncompatibleLightningFeatures as e:
          245             self.initialized.set_exception(e)
          246             raise GracefulDisconnect(f"{str(e)}")
          247         # check that they are on the same chain as us, if provided
          248         their_networks = payload["init_tlvs"].get("networks")
          249         if their_networks:
          250             their_chains = list(chunks(their_networks["chains"], 32))
          251             if constants.net.rev_genesis_bytes() not in their_chains:
          252                 raise GracefulDisconnect(f"no common chain found with remote. (they sent: {their_chains})")
          253         # all checks passed
          254         self.lnworker.on_peer_successfully_established(self)
          255         self._received_init = True
          256         self.maybe_set_initialized()
          257 
          258     def on_node_announcement(self, payload):
          259         if self.lnworker.channel_db:
          260             self.gossip_queue.put_nowait(('node_announcement', payload))
          261 
          262     def on_channel_announcement(self, payload):
          263         if self.lnworker.channel_db:
          264             self.gossip_queue.put_nowait(('channel_announcement', payload))
          265 
          266     def on_channel_update(self, payload):
          267         self.maybe_save_remote_update(payload)
          268         if self.lnworker.channel_db:
          269             self.gossip_queue.put_nowait(('channel_update', payload))
          270 
          271     def maybe_save_remote_update(self, payload):
          272         if not self.channels:
          273             return
          274         for chan in self.channels.values():
          275             if chan.short_channel_id == payload['short_channel_id']:
          276                 chan.set_remote_update(payload['raw'])
          277                 self.logger.info("saved remote_update")
          278                 break
          279         else:
          280             # Save (some bounded number of) orphan channel updates for later
          281             # as it might be for our own direct channel with this peer
          282             # (and we might not yet know the short channel id for that)
          283             # Background: this code is here to deal with a bug in LND,
          284             # see https://github.com/lightningnetwork/lnd/issues/3651
          285             # and https://github.com/lightningnetwork/lightning-rfc/pull/657
          286             # This code assumes gossip_queries is set. BOLT7: "if the
          287             # gossip_queries feature is negotiated, [a node] MUST NOT
          288             # send gossip it did not generate itself"
          289             short_channel_id = ShortChannelID(payload['short_channel_id'])
          290             self.logger.info(f'received orphan channel update {short_channel_id}')
          291             self.orphan_channel_updates[short_channel_id] = payload
          292             while len(self.orphan_channel_updates) > 25:
          293                 self.orphan_channel_updates.popitem(last=False)
          294 
          295     def on_announcement_signatures(self, chan: Channel, payload):
          296         if chan.config[LOCAL].was_announced:
          297             h, local_node_sig, local_bitcoin_sig = self.send_announcement_signatures(chan)
          298         else:
          299             self.announcement_signatures[chan.channel_id].put_nowait(payload)
          300 
          301     def handle_disconnect(func):
          302         @functools.wraps(func)
          303         async def wrapper_func(self, *args, **kwargs):
          304             try:
          305                 return await func(self, *args, **kwargs)
          306             except GracefulDisconnect as e:
          307                 self.logger.log(e.log_level, f"Disconnecting: {repr(e)}")
          308             except (LightningPeerConnectionClosed, IncompatibleLightningFeatures,
          309                     aiorpcx.socks.SOCKSError) as e:
          310                 self.logger.info(f"Disconnecting: {repr(e)}")
          311             finally:
          312                 self.close_and_cleanup()
          313         return wrapper_func
          314 
          315     @ignore_exceptions  # do not kill outer taskgroup
          316     @log_exceptions
          317     @handle_disconnect
          318     async def main_loop(self):
          319         async with self.taskgroup as group:
          320             await group.spawn(self._message_loop())
          321             await group.spawn(self.htlc_switch())
          322             await group.spawn(self.query_gossip())
          323             await group.spawn(self.process_gossip())
          324 
          325     async def process_gossip(self):
          326         while True:
          327             await asyncio.sleep(5)
          328             if not self.network.lngossip:
          329                 continue
          330             chan_anns = []
          331             chan_upds = []
          332             node_anns = []
          333             while True:
          334                 name, payload = await self.gossip_queue.get()
          335                 if name == 'channel_announcement':
          336                     chan_anns.append(payload)
          337                 elif name == 'channel_update':
          338                     chan_upds.append(payload)
          339                 elif name == 'node_announcement':
          340                     node_anns.append(payload)
          341                 else:
          342                     raise Exception('unknown message')
          343                 if self.gossip_queue.empty():
          344                     break
          345             # verify in peer's TaskGroup so that we fail the connection
          346             self.verify_channel_announcements(chan_anns)
          347             self.verify_node_announcements(node_anns)
          348             if self.network.lngossip:
          349                 await self.network.lngossip.process_gossip(chan_anns, node_anns, chan_upds)
          350 
          351     def verify_channel_announcements(self, chan_anns):
          352         for payload in chan_anns:
          353             h = sha256d(payload['raw'][2+256:])
          354             pubkeys = [payload['node_id_1'], payload['node_id_2'], payload['bitcoin_key_1'], payload['bitcoin_key_2']]
          355             sigs = [payload['node_signature_1'], payload['node_signature_2'], payload['bitcoin_signature_1'], payload['bitcoin_signature_2']]
          356             for pubkey, sig in zip(pubkeys, sigs):
          357                 if not ecc.verify_signature(pubkey, sig, h):
          358                     raise Exception('signature failed')
          359 
          360     def verify_node_announcements(self, node_anns):
          361         for payload in node_anns:
          362             pubkey = payload['node_id']
          363             signature = payload['signature']
          364             h = sha256d(payload['raw'][66:])
          365             if not ecc.verify_signature(pubkey, signature, h):
          366                 raise Exception('signature failed')
          367 
          368     async def query_gossip(self):
          369         try:
          370             await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT)
          371         except Exception as e:
          372             raise GracefulDisconnect(f"Failed to initialize: {e!r}") from e
          373         if self.lnworker == self.lnworker.network.lngossip:
          374             try:
          375                 ids, complete = await asyncio.wait_for(self.get_channel_range(), LN_P2P_NETWORK_TIMEOUT)
          376             except asyncio.TimeoutError as e:
          377                 raise GracefulDisconnect("query_channel_range timed out") from e
          378             self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete))
          379             await self.lnworker.add_new_ids(ids)
          380             while True:
          381                 todo = self.lnworker.get_ids_to_query()
          382                 if not todo:
          383                     await asyncio.sleep(1)
          384                     continue
          385                 await self.get_short_channel_ids(todo)
          386 
          387     async def get_channel_range(self):
          388         first_block = constants.net.BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS
          389         num_blocks = self.lnworker.network.get_local_height() - first_block
          390         self.query_channel_range(first_block, num_blocks)
          391         intervals = []
          392         ids = set()
          393         # note: implementations behave differently...
          394         # "sane implementation that follows BOLT-07" example:
          395         #   query_channel_range. <<< first_block 497000, num_blocks 79038
          396         #   on_reply_channel_range. >>> first_block 497000, num_blocks 39516, num_ids 4648, complete True
          397         #   on_reply_channel_range. >>> first_block 536516, num_blocks 19758, num_ids 5734, complete True
          398         #   on_reply_channel_range. >>> first_block 556274, num_blocks 9879, num_ids 13712, complete True
          399         #   on_reply_channel_range. >>> first_block 566153, num_blocks 9885, num_ids 18114, complete True
          400         # lnd example:
          401         #   query_channel_range. <<< first_block 497000, num_blocks 79038
          402         #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
          403         #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
          404         #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
          405         #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
          406         #   on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 5344, complete True
          407         while True:
          408             index, num, complete, _ids = await self.reply_channel_range.get()
          409             ids.update(_ids)
          410             intervals.append((index, index+num))
          411             intervals.sort()
          412             while len(intervals) > 1:
          413                 a,b = intervals[0]
          414                 c,d = intervals[1]
          415                 if not (a <= c and a <= b and c <= d):
          416                     raise Exception(f"insane reply_channel_range intervals {(a,b,c,d)}")
          417                 if b >= c:
          418                     intervals = [(a,d)] + intervals[2:]
          419                 else:
          420                     break
          421             if len(intervals) == 1 and complete:
          422                 a, b = intervals[0]
          423                 if a <= first_block and b >= first_block + num_blocks:
          424                     break
          425         return ids, complete
          426 
          427     def request_gossip(self, timestamp=0):
          428         if timestamp == 0:
          429             self.logger.info('requesting whole channel graph')
          430         else:
          431             self.logger.info(f'requesting channel graph since {datetime.fromtimestamp(timestamp).ctime()}')
          432         self.send_message(
          433             'gossip_timestamp_filter',
          434             chain_hash=constants.net.rev_genesis_bytes(),
          435             first_timestamp=timestamp,
          436             timestamp_range=b'\xff'*4)
          437 
          438     def query_channel_range(self, first_block, num_blocks):
          439         self.logger.info(f'query channel range {first_block} {num_blocks}')
          440         self.send_message(
          441             'query_channel_range',
          442             chain_hash=constants.net.rev_genesis_bytes(),
          443             first_blocknum=first_block,
          444             number_of_blocks=num_blocks)
          445 
          446     def decode_short_ids(self, encoded):
          447         if encoded[0] == 0:
          448             decoded = encoded[1:]
          449         elif encoded[0] == 1:
          450             decoded = zlib.decompress(encoded[1:])
          451         else:
          452             raise Exception(f'decode_short_ids: unexpected first byte: {encoded[0]}')
          453         ids = [decoded[i:i+8] for i in range(0, len(decoded), 8)]
          454         return ids
          455 
          456     def on_reply_channel_range(self, payload):
          457         first = payload['first_blocknum']
          458         num = payload['number_of_blocks']
          459         complete = bool(int.from_bytes(payload['complete'], 'big'))
          460         encoded = payload['encoded_short_ids']
          461         ids = self.decode_short_ids(encoded)
          462         #self.logger.info(f"on_reply_channel_range. >>> first_block {first}, num_blocks {num}, num_ids {len(ids)}, complete {repr(payload['complete'])}")
          463         self.reply_channel_range.put_nowait((first, num, complete, ids))
          464 
          465     async def get_short_channel_ids(self, ids):
          466         self.logger.info(f'Querying {len(ids)} short_channel_ids')
          467         assert not self.querying.is_set()
          468         self.query_short_channel_ids(ids)
          469         await self.querying.wait()
          470         self.querying.clear()
          471 
          472     def query_short_channel_ids(self, ids, compressed=True):
          473         ids = sorted(ids)
          474         s = b''.join(ids)
          475         encoded = zlib.compress(s) if compressed else s
          476         prefix = b'\x01' if compressed else b'\x00'
          477         self.send_message(
          478             'query_short_channel_ids',
          479             chain_hash=constants.net.rev_genesis_bytes(),
          480             len=1+len(encoded),
          481             encoded_short_ids=prefix+encoded)
          482 
          483     async def _message_loop(self):
          484         try:
          485             await asyncio.wait_for(self.initialize(), LN_P2P_NETWORK_TIMEOUT)
          486         except (OSError, asyncio.TimeoutError, HandshakeFailed) as e:
          487             raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e
          488         async for msg in self.transport.read_messages():
          489             self.process_message(msg)
          490             await asyncio.sleep(.01)
          491 
    def on_reply_short_channel_ids_end(self, payload):
        """Signal get_short_channel_ids() that the current query has finished."""
        self.querying.set()
          494 
          495     def close_and_cleanup(self):
          496         try:
          497             if self.transport:
          498                 self.transport.close()
          499         except:
          500             pass
          501         self.lnworker.peer_closed(self)
          502         self.got_disconnected.set()
          503 
    def is_static_remotekey(self):
        """Return whether ``self.features`` supports OPTION_STATIC_REMOTEKEY_OPT."""
        return self.features.supports(LnFeatures.OPTION_STATIC_REMOTEKEY_OPT)
          506 
    def is_upfront_shutdown_script(self):
        """Return whether ``self.features`` supports OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT."""
        return self.features.supports(LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT)
          509 
          510     def upfront_shutdown_script_from_payload(self, payload, msg_identifier: str) -> Optional[bytes]:
          511         if msg_identifier not in ['accept', 'open']:
          512             raise ValueError("msg_identifier must be either 'accept' or 'open'")
          513 
          514         uss_tlv = payload[msg_identifier + '_channel_tlvs'].get(
          515             'upfront_shutdown_script')
          516 
          517         if uss_tlv and self.is_upfront_shutdown_script():
          518             upfront_shutdown_script = uss_tlv['shutdown_scriptpubkey']
          519         else:
          520             upfront_shutdown_script = b''
          521         self.logger.info(f"upfront shutdown script received: {upfront_shutdown_script}")
          522         return upfront_shutdown_script
          523 
          524     def make_local_config(self, funding_sat: int, push_msat: int, initiator: HTLCOwner) -> LocalConfig:
          525         channel_seed = os.urandom(32)
          526         initial_msat = funding_sat * 1000 - push_msat if initiator == LOCAL else push_msat
          527 
          528         static_remotekey = None
          529         # sending empty bytes as the upfront_shutdown_script will give us the
          530         # flexibility to decide an address at closing time
          531         upfront_shutdown_script = b''
          532 
          533         if self.is_static_remotekey():
          534             wallet = self.lnworker.wallet
          535             assert wallet.txin_type == 'p2wpkh'
          536             addr = wallet.get_new_sweep_address_for_channel()
          537             static_remotekey = bfh(wallet.get_public_key(addr))
          538         else:
          539             static_remotekey = None
          540         dust_limit_sat = bitcoin.DUST_LIMIT_DEFAULT_SAT_LEGACY
          541         reserve_sat = max(funding_sat // 100, dust_limit_sat)
          542         # for comparison of defaults, see
          543         # https://github.com/ACINQ/eclair/blob/afa378fbb73c265da44856b4ad0f2128a88ae6c6/eclair-core/src/main/resources/reference.conf#L66
          544         # https://github.com/ElementsProject/lightning/blob/0056dd75572a8857cff36fcbdb1a2295a1ac9253/lightningd/options.c#L657
          545         # https://github.com/lightningnetwork/lnd/blob/56b61078c5b2be007d318673a5f3b40c6346883a/config.go#L81
          546         local_config = LocalConfig.from_seed(
          547             channel_seed=channel_seed,
          548             static_remotekey=static_remotekey,
          549             upfront_shutdown_script=upfront_shutdown_script,
          550             to_self_delay=self.network.config.get('lightning_to_self_delay', 7 * 144),
          551             dust_limit_sat=dust_limit_sat,
          552             max_htlc_value_in_flight_msat=funding_sat * 1000,
          553             max_accepted_htlcs=30,
          554             initial_msat=initial_msat,
          555             reserve_sat=reserve_sat,
          556             funding_locked_received=False,
          557             was_announced=False,
          558             current_commitment_signature=None,
          559             current_htlc_signatures=b'',
          560             htlc_minimum_msat=1,
          561         )
          562         local_config.validate_params(funding_sat=funding_sat)
          563         return local_config
          564 
          565     def temporarily_reserve_funding_tx_change_address(func):
          566         # During the channel open flow, if we initiated, we might have used a change address
          567         # of ours in the funding tx. The funding tx is not part of the wallet history
          568         # at that point yet, but we should already consider this change address as 'used'.
          569         @functools.wraps(func)
          570         async def wrapper(self: 'Peer', *args, **kwargs):
          571             funding_tx = kwargs['funding_tx']  # type: PartialTransaction
          572             wallet = self.lnworker.wallet
          573             change_addresses = [txout.address for txout in funding_tx.outputs()
          574                                 if wallet.is_change(txout.address)]
          575             for addr in change_addresses:
          576                 wallet.set_reserved_state_of_address(addr, reserved=True)
          577             try:
          578                 return await func(self, *args, **kwargs)
          579             finally:
          580                 for addr in change_addresses:
          581                     self.lnworker.wallet.set_reserved_state_of_address(addr, reserved=False)
          582         return wrapper
          583 
    @log_exceptions
    @temporarily_reserve_funding_tx_change_address
    async def channel_establishment_flow(
            self, *,
            funding_tx: 'PartialTransaction',
            funding_sat: int,
            push_msat: int,
            temp_channel_id: bytes
    ) -> Tuple[Channel, 'PartialTransaction']:
        """Implements the channel opening flow.

        -> open_channel message
        <- accept_channel message
        -> funding_created message
        <- funding_signed message

        Channel configurations are initialized in this method.

        All arguments are keyword-only; note that the
        temporarily_reserve_funding_tx_change_address decorator looks up
        funding_tx in the keyword arguments.

        funding_tx: partial transaction containing a dummy output (paying
            funding_sat to ln_dummy_address()); it is modified in place: the
            dummy output is replaced by the real funding output.
        funding_sat: channel capacity in satoshis.
        push_msat: amount in msat that the remote side starts with.
        temp_channel_id: temporary channel id, used until the real channel id
            can be derived from the funding outpoint.

        Returns the new Channel (set to state OPENING and handed to
        lnworker.add_new_channel) together with the modified funding_tx.

        Raises Exception if one of the sanity checks on our own parameters or
        on the values received in accept_channel fails, or if the funding tx
        is not segwit.
        """
        # will raise if init fails
        await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT)
        # trampoline is not yet in features
        if not self.lnworker.channel_db and not self.lnworker.is_trampoline_peer(self.pubkey):
            raise Exception('Not a trampoline node: ' + str(self.their_features))

        feerate = self.lnworker.current_feerate_per_kw()
        local_config = self.make_local_config(funding_sat, push_msat, LOCAL)

        # sanity checks on the requested amounts, done before anything is sent to the peer
        if funding_sat > LN_MAX_FUNDING_SAT:
            raise Exception(
                f"MUST set funding_satoshis to less than 2^24 satoshi. "
                f"{funding_sat} sat > {LN_MAX_FUNDING_SAT}")
        if push_msat > 1000 * funding_sat:
            raise Exception(
                f"MUST set push_msat to equal or less than 1000 * funding_satoshis: "
                f"{push_msat} msat > {1000 * funding_sat} msat")
        if funding_sat < lnutil.MIN_FUNDING_SAT:
            raise Exception(f"funding_sat too low: {funding_sat} < {lnutil.MIN_FUNDING_SAT}")

        # for the first commitment transaction
        per_commitment_secret_first = get_per_commitment_secret_from_seed(
            local_config.per_commitment_secret_seed,
            RevocationStore.START_INDEX
        )
        per_commitment_point_first = secret_to_pubkey(
            int.from_bytes(per_commitment_secret_first, 'big'))
        # -> open_channel
        self.send_message(
            "open_channel",
            temporary_channel_id=temp_channel_id,
            chain_hash=constants.net.rev_genesis_bytes(),
            funding_satoshis=funding_sat,
            push_msat=push_msat,
            dust_limit_satoshis=local_config.dust_limit_sat,
            feerate_per_kw=feerate,
            max_accepted_htlcs=local_config.max_accepted_htlcs,
            funding_pubkey=local_config.multisig_key.pubkey,
            revocation_basepoint=local_config.revocation_basepoint.pubkey,
            htlc_basepoint=local_config.htlc_basepoint.pubkey,
            payment_basepoint=local_config.payment_basepoint.pubkey,
            delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
            first_per_commitment_point=per_commitment_point_first,
            to_self_delay=local_config.to_self_delay,
            max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
            channel_flags=0x00,  # not willing to announce channel
            channel_reserve_satoshis=local_config.reserve_sat,
            htlc_minimum_msat=local_config.htlc_minimum_msat,
            open_channel_tlvs={
                'upfront_shutdown_script':
                    {'shutdown_scriptpubkey': local_config.upfront_shutdown_script}
            }
        )

        # <- accept_channel
        payload = await self.wait_for_message('accept_channel', temp_channel_id)
        remote_per_commitment_point = payload['first_per_commitment_point']
        funding_txn_minimum_depth = payload['minimum_depth']
        # only a minimum depth between 1 and 30 (inclusive) is accepted
        if funding_txn_minimum_depth <= 0:
            raise Exception(f"minimum depth too low, {funding_txn_minimum_depth}")
        if funding_txn_minimum_depth > 30:
            raise Exception(f"minimum depth too high, {funding_txn_minimum_depth}")

        upfront_shutdown_script = self.upfront_shutdown_script_from_payload(
            payload, 'accept')

        # the remote's channel parameters, as announced in accept_channel;
        # their initial balance is what we push to them
        remote_config = RemoteConfig(
            payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
            multisig_key=OnlyPubkeyKeypair(payload["funding_pubkey"]),
            htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
            delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
            revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
            to_self_delay=payload['to_self_delay'],
            dust_limit_sat=payload['dust_limit_satoshis'],
            max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
            max_accepted_htlcs=payload["max_accepted_htlcs"],
            initial_msat=push_msat,
            reserve_sat=payload["channel_reserve_satoshis"],
            htlc_minimum_msat=payload['htlc_minimum_msat'],
            next_per_commitment_point=remote_per_commitment_point,
            current_per_commitment_point=None,
            upfront_shutdown_script=upfront_shutdown_script
        )
        remote_config.validate_params(funding_sat=funding_sat)
        # if channel_reserve_satoshis is less than dust_limit_satoshis within the open_channel message:
        #     MUST reject the channel.
        if remote_config.reserve_sat < local_config.dust_limit_sat:
            raise Exception("violated constraint: remote_config.reserve_sat < local_config.dust_limit_sat")
        # if channel_reserve_satoshis from the open_channel message is less than dust_limit_satoshis:
        #     MUST reject the channel.
        if local_config.reserve_sat < remote_config.dust_limit_sat:
            raise Exception("violated constraint: local_config.reserve_sat < remote_config.dust_limit_sat")

        # -> funding created
        # replace dummy output in funding tx
        redeem_script = funding_output_script(local_config, remote_config)
        funding_address = bitcoin.redeem_script_to_address('p2wsh', redeem_script)
        funding_output = PartialTxOutput.from_address_and_value(funding_address, funding_sat)
        dummy_output = PartialTxOutput.from_address_and_value(ln_dummy_address(), funding_sat)
        funding_tx.outputs().remove(dummy_output)
        funding_tx.add_outputs([funding_output])
        funding_tx.set_rbf(False)
        if not funding_tx.is_segwit():
            raise Exception('Funding transaction is not segwit')
        funding_txid = funding_tx.txid()
        assert funding_txid
        # position of the real funding output within the tx outputs
        funding_index = funding_tx.outputs().index(funding_output)
        # build remote commitment transaction
        channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_index)
        outpoint = Outpoint(funding_txid, funding_index)
        constraints = ChannelConstraints(
            capacity=funding_sat,
            is_initiator=True,
            funding_txn_minimum_depth=funding_txn_minimum_depth
        )
        chan_dict = self.create_channel_storage(
            channel_id, outpoint, local_config, remote_config, constraints)
        chan = Channel(
            chan_dict,
            sweep_address=self.lnworker.sweep_address,
            lnworker=self.lnworker,
            initial_feerate=feerate
        )
        # remember the prevouts spent by the funding tx
        chan.storage['funding_inputs'] = [txin.prevout.to_json() for txin in funding_tx.inputs()]
        if isinstance(self.transport, LNTransport):
            chan.add_or_update_peer_addr(self.transport.peer_addr)
        sig_64, _ = chan.sign_next_commitment()
        # from now on the temporary channel id maps to the real channel id
        self.temp_id_to_id[temp_channel_id] = channel_id

        self.send_message("funding_created",
            temporary_channel_id=temp_channel_id,
            funding_txid=funding_txid_bytes,
            funding_output_index=funding_index,
            signature=sig_64)
        self.funding_created_sent.add(channel_id)

        # <- funding signed
        payload = await self.wait_for_message('funding_signed', channel_id)
        self.logger.info('received funding_signed')
        remote_sig = payload['signature']
        # no HTLC signatures are passed for the first commitment
        chan.receive_new_commitment(remote_sig, [])
        chan.open_with_first_pcp(remote_per_commitment_point, remote_sig)
        chan.set_state(ChannelState.OPENING)
        self.lnworker.add_new_channel(chan)
        return chan, funding_tx
          746 
          747     def create_channel_storage(self, channel_id, outpoint, local_config, remote_config, constraints):
          748         chan_dict = {
          749             "node_id": self.pubkey.hex(),
          750             "channel_id": channel_id.hex(),
          751             "short_channel_id": None,
          752             "funding_outpoint": outpoint,
          753             "remote_config": remote_config,
          754             "local_config": local_config,
          755             "constraints": constraints,
          756             "remote_update": None,
          757             "state": ChannelState.PREOPENING.name,
          758             'onion_keys': {},
          759             'data_loss_protect_remote_pcp': {},
          760             "log": {},
          761             "revocation_store": {},
          762             "static_remotekey_enabled": self.is_static_remotekey(), # stored because it cannot be "downgraded", per BOLT2
          763         }
          764         return StoredDict(chan_dict, self.lnworker.db if self.lnworker else None, [])
          765 
    async def on_open_channel(self, payload):
        """Implements the channel acceptance flow.

        <- open_channel message
        -> accept_channel message
        <- funding_created message
        -> funding_signed message

        Channel configurations are initialized in this method.

        payload: the decoded open_channel message; the amounts, feerate,
            temporary channel id, basepoints and limits of the remote are
            read from it.

        On success the new Channel is set to state OPENING and handed to
        lnworker.add_new_channel; nothing is returned.

        Raises Exception if the chain hash does not match ours or if one of
        the sanity checks on the proposed channel parameters fails.
        """
        # <- open_channel
        if payload['chain_hash'] != constants.net.rev_genesis_bytes():
            raise Exception('wrong chain_hash')
        funding_sat = payload['funding_satoshis']
        push_msat = payload['push_msat']
        feerate = payload['feerate_per_kw']  # note: we are not validating this
        temp_chan_id = payload['temporary_channel_id']
        # the remote is the initiator here, so our initial balance is push_msat
        local_config = self.make_local_config(funding_sat, push_msat, REMOTE)
        if funding_sat > LN_MAX_FUNDING_SAT:
            raise Exception(
                f"MUST set funding_satoshis to less than 2^24 satoshi. "
                f"{funding_sat} sat > {LN_MAX_FUNDING_SAT}")
        if push_msat > 1000 * funding_sat:
            raise Exception(
                f"MUST set push_msat to equal or less than 1000 * funding_satoshis: "
                f"{push_msat} msat > {1000 * funding_sat} msat")
        if funding_sat < lnutil.MIN_FUNDING_SAT:
            raise Exception(f"funding_sat too low: {funding_sat} < {lnutil.MIN_FUNDING_SAT}")

        upfront_shutdown_script = self.upfront_shutdown_script_from_payload(
            payload, 'open')

        # the remote's channel parameters, as announced in open_channel;
        # they start with the full capacity minus what they push to us
        remote_config = RemoteConfig(
            payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
            multisig_key=OnlyPubkeyKeypair(payload['funding_pubkey']),
            htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
            delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
            revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
            to_self_delay=payload['to_self_delay'],
            dust_limit_sat=payload['dust_limit_satoshis'],
            max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
            max_accepted_htlcs=payload['max_accepted_htlcs'],
            initial_msat=funding_sat * 1000 - push_msat,
            reserve_sat=payload['channel_reserve_satoshis'],
            htlc_minimum_msat=payload['htlc_minimum_msat'],
            next_per_commitment_point=payload['first_per_commitment_point'],
            current_per_commitment_point=None,
            upfront_shutdown_script=upfront_shutdown_script,
        )

        remote_config.validate_params(funding_sat=funding_sat)
        # The receiving node MUST fail the channel if:
        #     the funder's amount for the initial commitment transaction is not
        #     sufficient for full fee payment.
        if remote_config.initial_msat < calc_fees_for_commitment_tx(
                num_htlcs=0,
                feerate=feerate,
                is_local_initiator=False)[REMOTE]:
            raise Exception(
                "the funder's amount for the initial commitment transaction "
                "is not sufficient for full fee payment")
        # The receiving node MUST fail the channel if:
        #     both to_local and to_remote amounts for the initial commitment transaction are
        #     less than or equal to channel_reserve_satoshis (see BOLT 3).
        if (local_config.initial_msat <= 1000 * payload['channel_reserve_satoshis']
                and remote_config.initial_msat <= 1000 * payload['channel_reserve_satoshis']):
            raise Exception(
                "both to_local and to_remote amounts for the initial commitment "
                "transaction are less than or equal to channel_reserve_satoshis")
        # note: we ignore payload['channel_flags'],  which e.g. contains 'announce_channel'.
        #       Notably if the remote sets 'announce_channel' to True, we will ignore that too,
        #       but we will not play along with actually announcing the channel (so we keep it private).

        # -> accept channel
        # for the first commitment transaction
        per_commitment_secret_first = get_per_commitment_secret_from_seed(
            local_config.per_commitment_secret_seed,
            RevocationStore.START_INDEX
        )
        per_commitment_point_first = secret_to_pubkey(
            int.from_bytes(per_commitment_secret_first, 'big'))
        # sent to the remote as minimum_depth and stored in the channel constraints below
        min_depth = 3
        self.send_message(
            'accept_channel',
            temporary_channel_id=temp_chan_id,
            dust_limit_satoshis=local_config.dust_limit_sat,
            max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
            channel_reserve_satoshis=local_config.reserve_sat,
            htlc_minimum_msat=local_config.htlc_minimum_msat,
            minimum_depth=min_depth,
            to_self_delay=local_config.to_self_delay,
            max_accepted_htlcs=local_config.max_accepted_htlcs,
            funding_pubkey=local_config.multisig_key.pubkey,
            revocation_basepoint=local_config.revocation_basepoint.pubkey,
            payment_basepoint=local_config.payment_basepoint.pubkey,
            delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
            htlc_basepoint=local_config.htlc_basepoint.pubkey,
            first_per_commitment_point=per_commitment_point_first,
            accept_channel_tlvs={
                'upfront_shutdown_script':
                    {'shutdown_scriptpubkey': local_config.upfront_shutdown_script}
            }
        )

        # <- funding created
        funding_created = await self.wait_for_message('funding_created', temp_chan_id)

        # -> funding signed
        funding_idx = funding_created['funding_output_index']
        # the txid bytes from the message are reversed before being converted with bh2u
        funding_txid = bh2u(funding_created['funding_txid'][::-1])
        channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_idx)
        constraints = ChannelConstraints(
            capacity=funding_sat,
            is_initiator=False,
            funding_txn_minimum_depth=min_depth
        )
        outpoint = Outpoint(funding_txid, funding_idx)
        chan_dict = self.create_channel_storage(
            channel_id, outpoint, local_config, remote_config, constraints)
        chan = Channel(
            chan_dict,
            sweep_address=self.lnworker.sweep_address,
            lnworker=self.lnworker,
            initial_feerate=feerate
        )
        # record the current time (unix time, whole seconds) in the channel storage
        chan.storage['init_timestamp'] = int(time.time())
        if isinstance(self.transport, LNTransport):
            chan.add_or_update_peer_addr(self.transport.peer_addr)
        remote_sig = funding_created['signature']
        # no HTLC signatures are passed for the first commitment
        chan.receive_new_commitment(remote_sig, [])
        sig_64, _ = chan.sign_next_commitment()
        self.send_message('funding_signed',
            channel_id=channel_id,
            signature=sig_64,
        )
        self.funding_signed_sent.add(chan.channel_id)
        chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig)
        chan.set_state(ChannelState.OPENING)
        self.lnworker.add_new_channel(chan)
          905 
          906     async def trigger_force_close(self, channel_id: bytes):
          907         await self.initialized
          908         latest_point = secret_to_pubkey(42) # we need a valid point (BOLT2)
          909         self.send_message(
          910             "channel_reestablish",
          911             channel_id=channel_id,
          912             next_commitment_number=0,
          913             next_revocation_number=0,
          914             your_last_per_commitment_secret=0,
          915             my_current_per_commitment_point=latest_point)
          916 
          917     async def reestablish_channel(self, chan: Channel):
          918         await self.initialized
          919         chan_id = chan.channel_id
          920         assert ChannelState.PREOPENING < chan.get_state() < ChannelState.FORCE_CLOSING
          921         if chan.peer_state != PeerState.DISCONNECTED:
          922             self.logger.info(f'reestablish_channel was called but channel {chan.get_id_for_log()} '
          923                              f'already in peer_state {chan.peer_state!r}')
          924             return
          925         chan.peer_state = PeerState.REESTABLISHING
          926         util.trigger_callback('channel', self.lnworker.wallet, chan)
          927         # BOLT-02: "A node [...] upon disconnection [...] MUST reverse any uncommitted updates sent by the other side"
          928         chan.hm.discard_unsigned_remote_updates()
          929         # ctns
          930         oldest_unrevoked_local_ctn = chan.get_oldest_unrevoked_ctn(LOCAL)
          931         latest_local_ctn = chan.get_latest_ctn(LOCAL)
          932         next_local_ctn = chan.get_next_ctn(LOCAL)
          933         oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
          934         latest_remote_ctn = chan.get_latest_ctn(REMOTE)
          935         next_remote_ctn = chan.get_next_ctn(REMOTE)
          936         assert self.features.supports(LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT)
          937         # send message
          938         if chan.is_static_remotekey_enabled():
          939             latest_secret, latest_point = chan.get_secret_and_point(LOCAL, 0)
          940         else:
          941             latest_secret, latest_point = chan.get_secret_and_point(LOCAL, latest_local_ctn)
          942         if oldest_unrevoked_remote_ctn == 0:
          943             last_rev_secret = 0
          944         else:
          945             last_rev_index = oldest_unrevoked_remote_ctn - 1
          946             last_rev_secret = chan.revocation_store.retrieve_secret(RevocationStore.START_INDEX - last_rev_index)
          947         self.send_message(
          948             "channel_reestablish",
          949             channel_id=chan_id,
          950             next_commitment_number=next_local_ctn,
          951             next_revocation_number=oldest_unrevoked_remote_ctn,
          952             your_last_per_commitment_secret=last_rev_secret,
          953             my_current_per_commitment_point=latest_point)
          954         self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): sent channel_reestablish with '
          955                          f'(next_local_ctn={next_local_ctn}, '
          956                          f'oldest_unrevoked_remote_ctn={oldest_unrevoked_remote_ctn})')
          957         while True:
          958             try:
          959                 msg = await self.wait_for_message('channel_reestablish', chan_id)
          960                 break
          961             except asyncio.TimeoutError:
          962                 self.logger.info('waiting to receive channel_reestablish...')
          963                 continue
          964         their_next_local_ctn = msg["next_commitment_number"]
          965         their_oldest_unrevoked_remote_ctn = msg["next_revocation_number"]
          966         their_local_pcp = msg.get("my_current_per_commitment_point")
          967         their_claim_of_our_last_per_commitment_secret = msg.get("your_last_per_commitment_secret")
          968         self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): received channel_reestablish with '
          969                          f'(their_next_local_ctn={their_next_local_ctn}, '
          970                          f'their_oldest_unrevoked_remote_ctn={their_oldest_unrevoked_remote_ctn})')
          971         # sanity checks of received values
          972         if their_next_local_ctn < 0:
          973             raise RemoteMisbehaving(f"channel reestablish: their_next_local_ctn < 0")
          974         if their_oldest_unrevoked_remote_ctn < 0:
          975             raise RemoteMisbehaving(f"channel reestablish: their_oldest_unrevoked_remote_ctn < 0")
          976         # Replay un-acked local updates (including commitment_signed) byte-for-byte.
          977         # If we have sent them a commitment signature that they "lost" (due to disconnect),
          978         # we need to make sure we replay the same local updates, as otherwise they could
          979         # end up with two (or more) signed valid commitment transactions at the same ctn.
          980         # Multiple valid ctxs at the same ctn is a major headache for pre-signing spending txns,
          981         # e.g. for watchtowers, hence we must ensure these ctxs coincide.
          982         # We replay the local updates even if they were not yet committed.
          983         unacked = chan.hm.get_unacked_local_updates()
          984         n_replayed_msgs = 0
          985         for ctn, messages in unacked.items():
          986             if ctn < their_next_local_ctn:
          987                 # They claim to have received these messages and the corresponding
          988                 # commitment_signed, hence we must not replay them.
          989                 continue
          990             for raw_upd_msg in messages:
          991                 self.transport.send_bytes(raw_upd_msg)
          992                 n_replayed_msgs += 1
          993         self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replayed {n_replayed_msgs} unacked messages')
          994 
          995         we_are_ahead = False
          996         they_are_ahead = False
          997         # compare remote ctns
          998         if next_remote_ctn != their_next_local_ctn:
          999             if their_next_local_ctn == latest_remote_ctn and chan.hm.is_revack_pending(REMOTE):
         1000                 # We replayed the local updates (see above), which should have contained a commitment_signed
         1001                 # (due to is_revack_pending being true), and this should have remedied this situation.
         1002                 pass
         1003             else:
         1004                 self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): "
         1005                                     f"expected remote ctn {next_remote_ctn}, got {their_next_local_ctn}")
         1006                 if their_next_local_ctn < next_remote_ctn:
         1007                     we_are_ahead = True
         1008                 else:
         1009                     they_are_ahead = True
         1010         # compare local ctns
         1011         if oldest_unrevoked_local_ctn != their_oldest_unrevoked_remote_ctn:
         1012             if oldest_unrevoked_local_ctn - 1 == their_oldest_unrevoked_remote_ctn:
         1013                 # A node:
         1014                 #    if next_revocation_number is equal to the commitment number of the last revoke_and_ack
         1015                 #    the receiving node sent, AND the receiving node hasn't already received a closing_signed:
         1016                 #        MUST re-send the revoke_and_ack.
         1017                 last_secret, last_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn - 1)
         1018                 next_secret, next_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn + 1)
         1019                 self.send_message(
         1020                     "revoke_and_ack",
         1021                     channel_id=chan.channel_id,
         1022                     per_commitment_secret=last_secret,
         1023                     next_per_commitment_point=next_point)
         1024             else:
         1025                 self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): "
         1026                                     f"expected local ctn {oldest_unrevoked_local_ctn}, got {their_oldest_unrevoked_remote_ctn}")
         1027                 if their_oldest_unrevoked_remote_ctn < oldest_unrevoked_local_ctn:
         1028                     we_are_ahead = True
         1029                 else:
         1030                     they_are_ahead = True
         1031         # option_data_loss_protect
         1032         def are_datalossprotect_fields_valid() -> bool:
         1033             if their_local_pcp is None or their_claim_of_our_last_per_commitment_secret is None:
         1034                 return False
         1035             if their_oldest_unrevoked_remote_ctn > 0:
         1036                 our_pcs, __ = chan.get_secret_and_point(LOCAL, their_oldest_unrevoked_remote_ctn - 1)
         1037             else:
         1038                 assert their_oldest_unrevoked_remote_ctn == 0
         1039                 our_pcs = bytes(32)
         1040             if our_pcs != their_claim_of_our_last_per_commitment_secret:
         1041                 self.logger.error(f"channel_reestablish ({chan.get_id_for_log()}): "
         1042                                   f"(DLP) local PCS mismatch: {bh2u(our_pcs)} != {bh2u(their_claim_of_our_last_per_commitment_secret)}")
         1043                 return False
         1044             if chan.is_static_remotekey_enabled():
         1045                 return True
         1046             try:
         1047                 __, our_remote_pcp = chan.get_secret_and_point(REMOTE, their_next_local_ctn - 1)
         1048             except RemoteCtnTooFarInFuture:
         1049                 pass
         1050             else:
         1051                 if our_remote_pcp != their_local_pcp:
         1052                     self.logger.error(f"channel_reestablish ({chan.get_id_for_log()}): "
         1053                                       f"(DLP) remote PCP mismatch: {bh2u(our_remote_pcp)} != {bh2u(their_local_pcp)}")
         1054                     return False
         1055             return True
         1056 
         1057         if not are_datalossprotect_fields_valid():
         1058             raise RemoteMisbehaving("channel_reestablish: data loss protect fields invalid")
         1059 
         1060         if they_are_ahead:
         1061             self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): "
         1062                                 f"remote is ahead of us! They should force-close. Remote PCP: {bh2u(their_local_pcp)}")
         1063             # data_loss_protect_remote_pcp is used in lnsweep
         1064             chan.set_data_loss_protect_remote_pcp(their_next_local_ctn - 1, their_local_pcp)
         1065             self.lnworker.save_channel(chan)
         1066             chan.peer_state = PeerState.BAD
         1067             return
         1068         elif we_are_ahead:
         1069             self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): we are ahead of remote! trying to force-close.")
         1070             await self.lnworker.try_force_closing(chan_id)
         1071             return
         1072 
         1073         chan.peer_state = PeerState.GOOD
         1074         if chan.is_funded() and their_next_local_ctn == next_local_ctn == 1:
         1075             self.send_funding_locked(chan)
         1076         # checks done
         1077         if chan.is_funded() and chan.config[LOCAL].funding_locked_received:
         1078             self.mark_open(chan)
         1079         util.trigger_callback('channel', self.lnworker.wallet, chan)
         1080         # if we have sent a previous shutdown, it must be retransmitted (Bolt2)
         1081         if chan.get_state() == ChannelState.SHUTDOWN:
         1082             await self.send_shutdown(chan)
         1083 
         1084     def send_funding_locked(self, chan: Channel):
         1085         channel_id = chan.channel_id
         1086         per_commitment_secret_index = RevocationStore.START_INDEX - 1
         1087         per_commitment_point_second = secret_to_pubkey(int.from_bytes(
         1088             get_per_commitment_secret_from_seed(chan.config[LOCAL].per_commitment_secret_seed, per_commitment_secret_index), 'big'))
         1089         # note: if funding_locked was not yet received, we might send it multiple times
         1090         self.send_message("funding_locked", channel_id=channel_id, next_per_commitment_point=per_commitment_point_second)
         1091         if chan.is_funded() and chan.config[LOCAL].funding_locked_received:
         1092             self.mark_open(chan)
         1093 
         1094     def on_funding_locked(self, chan: Channel, payload):
         1095         self.logger.info(f"on_funding_locked. channel: {bh2u(chan.channel_id)}")
         1096         if not chan.config[LOCAL].funding_locked_received:
         1097             their_next_point = payload["next_per_commitment_point"]
         1098             chan.config[REMOTE].next_per_commitment_point = their_next_point
         1099             chan.config[LOCAL].funding_locked_received = True
         1100             self.lnworker.save_channel(chan)
         1101         if chan.is_funded():
         1102             self.mark_open(chan)
         1103 
    def on_network_update(self, chan: Channel, funding_tx_depth: int):
        """
        Only called when the channel is OPEN.

        Runs on the Network thread.

        Channel announcement is currently switched off: when the channel has
        not been announced yet and the funding tx has at least 6
        confirmations, the method returns immediately.  The statements after
        that ``return`` are therefore unreachable; no code path in this
        method announces anything.
        """
        if not chan.config[LOCAL].was_announced and funding_tx_depth >= 6:
            # don't announce our channels
            # FIXME should this be a field in chan.local_state maybe?
            return
            # NOTE: dead code below (after the unconditional return above).
            # If re-enabled it would flag the channel as announced, persist it,
            # and schedule handle_announcements() on the network's asyncio loop
            # (thread-safe scheduling, since this method runs on the Network thread).
            chan.config[LOCAL].was_announced = True
            self.lnworker.save_channel(chan)
            coro = self.handle_announcements(chan)
            asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
         1118 
         1119     @log_exceptions
         1120     async def handle_announcements(self, chan: Channel):
         1121         h, local_node_sig, local_bitcoin_sig = self.send_announcement_signatures(chan)
         1122         announcement_signatures_msg = await self.announcement_signatures[chan.channel_id].get()
         1123         remote_node_sig = announcement_signatures_msg["node_signature"]
         1124         remote_bitcoin_sig = announcement_signatures_msg["bitcoin_signature"]
         1125         if not ecc.verify_signature(chan.config[REMOTE].multisig_key.pubkey, remote_bitcoin_sig, h):
         1126             raise Exception("bitcoin_sig invalid in announcement_signatures")
         1127         if not ecc.verify_signature(self.pubkey, remote_node_sig, h):
         1128             raise Exception("node_sig invalid in announcement_signatures")
         1129 
         1130         node_sigs = [remote_node_sig, local_node_sig]
         1131         bitcoin_sigs = [remote_bitcoin_sig, local_bitcoin_sig]
         1132         bitcoin_keys = [chan.config[REMOTE].multisig_key.pubkey, chan.config[LOCAL].multisig_key.pubkey]
         1133 
         1134         if self.node_ids[0] > self.node_ids[1]:
         1135             node_sigs.reverse()
         1136             bitcoin_sigs.reverse()
         1137             node_ids = list(reversed(self.node_ids))
         1138             bitcoin_keys.reverse()
         1139         else:
         1140             node_ids = self.node_ids
         1141 
         1142         self.send_message("channel_announcement",
         1143             node_signatures_1=node_sigs[0],
         1144             node_signatures_2=node_sigs[1],
         1145             bitcoin_signature_1=bitcoin_sigs[0],
         1146             bitcoin_signature_2=bitcoin_sigs[1],
         1147             len=0,
         1148             #features not set (defaults to zeros)
         1149             chain_hash=constants.net.rev_genesis_bytes(),
         1150             short_channel_id=chan.short_channel_id,
         1151             node_id_1=node_ids[0],
         1152             node_id_2=node_ids[1],
         1153             bitcoin_key_1=bitcoin_keys[0],
         1154             bitcoin_key_2=bitcoin_keys[1]
         1155         )
         1156 
         1157     def mark_open(self, chan: Channel):
         1158         assert chan.is_funded()
         1159         # only allow state transition from "FUNDED" to "OPEN"
         1160         old_state = chan.get_state()
         1161         if old_state == ChannelState.OPEN:
         1162             return
         1163         if old_state != ChannelState.FUNDED:
         1164             self.logger.info(f"cannot mark open ({chan.get_id_for_log()}), current state: {repr(old_state)}")
         1165             return
         1166         assert chan.config[LOCAL].funding_locked_received
         1167         chan.set_state(ChannelState.OPEN)
         1168         util.trigger_callback('channel', self.lnworker.wallet, chan)
         1169         # peer may have sent us a channel update for the incoming direction previously
         1170         pending_channel_update = self.orphan_channel_updates.get(chan.short_channel_id)
         1171         if pending_channel_update:
         1172             chan.set_remote_update(pending_channel_update['raw'])
         1173         self.logger.info(f"CHANNEL OPENING COMPLETED ({chan.get_id_for_log()})")
         1174         forwarding_enabled = self.network.config.get('lightning_forward_payments', False)
         1175         if forwarding_enabled:
         1176             # send channel_update of outgoing edge to peer,
         1177             # so that channel can be used to to receive payments
         1178             self.logger.info(f"sending channel update for outgoing edge ({chan.get_id_for_log()})")
         1179             chan_upd = chan.get_outgoing_gossip_channel_update()
         1180             self.transport.send_bytes(chan_upd)
         1181 
         1182     def send_announcement_signatures(self, chan: Channel):
         1183         chan_ann = chan.construct_channel_announcement_without_sigs()
         1184         preimage = chan_ann[256+2:]
         1185         msg_hash = sha256d(preimage)
         1186         bitcoin_signature = ecc.ECPrivkey(chan.config[LOCAL].multisig_key.privkey).sign(msg_hash, sig_string_from_r_and_s)
         1187         node_signature = ecc.ECPrivkey(self.privkey).sign(msg_hash, sig_string_from_r_and_s)
         1188         self.send_message("announcement_signatures",
         1189             channel_id=chan.channel_id,
         1190             short_channel_id=chan.short_channel_id,
         1191             node_signature=node_signature,
         1192             bitcoin_signature=bitcoin_signature
         1193         )
         1194         return msg_hash, node_signature, bitcoin_signature
         1195 
         1196     def on_update_fail_htlc(self, chan: Channel, payload):
         1197         htlc_id = payload["id"]
         1198         reason = payload["reason"]
         1199         self.logger.info(f"on_update_fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
         1200         chan.receive_fail_htlc(htlc_id, error_bytes=reason)  # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
         1201         self.maybe_send_commitment(chan)
         1202 
         1203     def maybe_send_commitment(self, chan: Channel):
         1204         # REMOTE should revoke first before we can sign a new ctx
         1205         if chan.hm.is_revack_pending(REMOTE):
         1206             return
         1207         # if there are no changes, we will not (and must not) send a new commitment
         1208         if not chan.has_pending_changes(REMOTE):
         1209             return
         1210         self.logger.info(f'send_commitment. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(REMOTE)}.')
         1211         sig_64, htlc_sigs = chan.sign_next_commitment()
         1212         self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs))
         1213 
         1214     def pay(self, *,
         1215             route: 'LNPaymentRoute',
         1216             chan: Channel,
         1217             amount_msat: int,
         1218             total_msat: int,
         1219             payment_hash: bytes,
         1220             min_final_cltv_expiry: int,
         1221             payment_secret: bytes = None,
         1222             trampoline_onion=None) -> UpdateAddHtlc:
         1223 
         1224         assert amount_msat > 0, "amount_msat is not greater zero"
         1225         assert len(route) > 0
         1226         if not chan.can_send_update_add_htlc():
         1227             raise PaymentFailure("Channel cannot send update_add_htlc")
         1228         # add features learned during "init" for direct neighbour:
         1229         route[0].node_features |= self.features
         1230         local_height = self.network.get_local_height()
         1231         final_cltv = local_height + min_final_cltv_expiry
         1232         hops_data, amount_msat, cltv = calc_hops_data_for_payment(
         1233             route,
         1234             amount_msat,
         1235             final_cltv,
         1236             total_msat=total_msat,
         1237             payment_secret=payment_secret)
         1238         num_hops = len(hops_data)
         1239         self.logger.info(f"lnpeer.pay len(route)={len(route)}")
         1240         for i in range(len(route)):
         1241             self.logger.info(f"  {i}: edge={route[i].short_channel_id} hop_data={hops_data[i]!r}")
         1242         assert final_cltv <= cltv, (final_cltv, cltv)
         1243         session_key = os.urandom(32) # session_key
         1244         # if we are forwarding a trampoline payment, add trampoline onion
         1245         if trampoline_onion:
         1246             self.logger.info(f'adding trampoline onion to final payload')
         1247             trampoline_payload = hops_data[num_hops-2].payload
         1248             trampoline_payload["trampoline_onion_packet"] = {
         1249                 "version": trampoline_onion.version,
         1250                 "public_key": trampoline_onion.public_key,
         1251                 "hops_data": trampoline_onion.hops_data,
         1252                 "hmac": trampoline_onion.hmac
         1253             }
         1254         # create onion packet
         1255         payment_path_pubkeys = [x.node_id for x in route]
         1256         onion = new_onion_packet(payment_path_pubkeys, session_key, hops_data, associated_data=payment_hash) # must use another sessionkey
         1257         self.logger.info(f"starting payment. len(route)={len(hops_data)}.")
         1258         # create htlc
         1259         if cltv > local_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE:
         1260             raise PaymentFailure(f"htlc expiry too far into future. (in {cltv-local_height} blocks)")
         1261         htlc = UpdateAddHtlc(amount_msat=amount_msat, payment_hash=payment_hash, cltv_expiry=cltv, timestamp=int(time.time()))
         1262         htlc = chan.add_htlc(htlc)
         1263         chan.set_onion_key(htlc.htlc_id, session_key) # should it be the outer onion secret?
         1264         self.logger.info(f"starting payment. htlc: {htlc}")
         1265         self.send_message(
         1266             "update_add_htlc",
         1267             channel_id=chan.channel_id,
         1268             id=htlc.htlc_id,
         1269             cltv_expiry=htlc.cltv_expiry,
         1270             amount_msat=htlc.amount_msat,
         1271             payment_hash=htlc.payment_hash,
         1272             onion_routing_packet=onion.to_bytes())
         1273         self.maybe_send_commitment(chan)
         1274         return htlc
         1275 
         1276     def send_revoke_and_ack(self, chan: Channel):
         1277         self.logger.info(f'send_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(LOCAL)}')
         1278         rev = chan.revoke_current_commitment()
         1279         self.lnworker.save_channel(chan)
         1280         self.send_message("revoke_and_ack",
         1281             channel_id=chan.channel_id,
         1282             per_commitment_secret=rev.per_commitment_secret,
         1283             next_per_commitment_point=rev.next_per_commitment_point)
         1284         self.maybe_send_commitment(chan)
         1285 
         1286     def on_commitment_signed(self, chan: Channel, payload):
         1287         if chan.peer_state == PeerState.BAD:
         1288             return
         1289         self.logger.info(f'on_commitment_signed. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(LOCAL)}.')
         1290         # make sure there were changes to the ctx, otherwise the remote peer is misbehaving
         1291         if not chan.has_pending_changes(LOCAL):
         1292             # TODO if feerate changed A->B->A; so there were updates but the value is identical,
         1293             #      then it might be legal to send a commitment_signature
         1294             #      see https://github.com/lightningnetwork/lightning-rfc/pull/618
         1295             raise RemoteMisbehaving('received commitment_signed without pending changes')
         1296         # REMOTE should wait until we have revoked
         1297         if chan.hm.is_revack_pending(LOCAL):
         1298             raise RemoteMisbehaving('received commitment_signed before we revoked previous ctx')
         1299         data = payload["htlc_signature"]
         1300         htlc_sigs = list(chunks(data, 64))
         1301         chan.receive_new_commitment(payload["signature"], htlc_sigs)
         1302         self.send_revoke_and_ack(chan)
         1303 
         1304     def on_update_fulfill_htlc(self, chan: Channel, payload):
         1305         preimage = payload["payment_preimage"]
         1306         payment_hash = sha256(preimage)
         1307         htlc_id = payload["id"]
         1308         self.logger.info(f"on_update_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
         1309         chan.receive_htlc_settle(preimage, htlc_id)  # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
         1310         self.lnworker.save_preimage(payment_hash, preimage)
         1311         self.maybe_send_commitment(chan)
         1312 
         1313     def on_update_fail_malformed_htlc(self, chan: Channel, payload):
         1314         htlc_id = payload["id"]
         1315         failure_code = payload["failure_code"]
         1316         self.logger.info(f"on_update_fail_malformed_htlc. chan {chan.get_id_for_log()}. "
         1317                          f"htlc_id {htlc_id}. failure_code={failure_code}")
         1318         if failure_code & OnionFailureCodeMetaFlag.BADONION == 0:
         1319             asyncio.ensure_future(self.lnworker.try_force_closing(chan.channel_id))
         1320             raise RemoteMisbehaving(f"received update_fail_malformed_htlc with unexpected failure code: {failure_code}")
         1321         reason = OnionRoutingFailure(code=failure_code, data=payload["sha256_of_onion"])
         1322         chan.receive_fail_htlc(htlc_id, error_bytes=None, reason=reason)
         1323         self.maybe_send_commitment(chan)
         1324 
         1325     def on_update_add_htlc(self, chan: Channel, payload):
         1326         payment_hash = payload["payment_hash"]
         1327         htlc_id = payload["id"]
         1328         cltv_expiry = payload["cltv_expiry"]
         1329         amount_msat_htlc = payload["amount_msat"]
         1330         onion_packet = payload["onion_routing_packet"]
         1331         htlc = UpdateAddHtlc(
         1332             amount_msat=amount_msat_htlc,
         1333             payment_hash=payment_hash,
         1334             cltv_expiry=cltv_expiry,
         1335             timestamp=int(time.time()),
         1336             htlc_id=htlc_id)
         1337         self.logger.info(f"on_update_add_htlc. chan {chan.short_channel_id}. htlc={str(htlc)}")
         1338         if chan.get_state() != ChannelState.OPEN:
         1339             raise RemoteMisbehaving(f"received update_add_htlc while chan.get_state() != OPEN. state was {chan.get_state()!r}")
         1340         if cltv_expiry > bitcoin.NLOCKTIME_BLOCKHEIGHT_MAX:
         1341             asyncio.ensure_future(self.lnworker.try_force_closing(chan.channel_id))
         1342             raise RemoteMisbehaving(f"received update_add_htlc with cltv_expiry > BLOCKHEIGHT_MAX. value was {cltv_expiry}")
         1343         # add htlc
         1344         chan.receive_htlc(htlc, onion_packet)
         1345         util.trigger_callback('htlc_added', chan, htlc, RECEIVED)
         1346 
         1347     def maybe_forward_htlc(
         1348             self, *,
         1349             htlc: UpdateAddHtlc,
         1350             processed_onion: ProcessedOnionPacket) -> Tuple[bytes, int]:
         1351 
         1352         # Forward HTLC
         1353         # FIXME: there are critical safety checks MISSING here
         1354         #        - for example; atm we forward first and then persist "forwarding_info",
         1355         #          so if we segfault in-between and restart, we might forward an HTLC twice...
         1356         #          (same for trampoline forwarding)
         1357         forwarding_enabled = self.network.config.get('lightning_forward_payments', False)
         1358         if not forwarding_enabled:
         1359             self.logger.info(f"forwarding is disabled. failing htlc.")
         1360             raise OnionRoutingFailure(code=OnionFailureCode.PERMANENT_CHANNEL_FAILURE, data=b'')
         1361         chain = self.network.blockchain()
         1362         if chain.is_tip_stale():
         1363             raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
         1364         try:
         1365             next_chan_scid = processed_onion.hop_data.payload["short_channel_id"]["short_channel_id"]
         1366         except:
         1367             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
         1368         next_chan = self.lnworker.get_channel_by_short_id(next_chan_scid)
         1369         local_height = chain.height()
         1370         if next_chan is None:
         1371             self.logger.info(f"cannot forward htlc. cannot find next_chan {next_chan_scid}")
         1372             raise OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'')
         1373         outgoing_chan_upd = next_chan.get_outgoing_gossip_channel_update()[2:]
         1374         outgoing_chan_upd_len = len(outgoing_chan_upd).to_bytes(2, byteorder="big")
         1375         if not next_chan.can_send_update_add_htlc():
         1376             self.logger.info(f"cannot forward htlc. next_chan {next_chan_scid} cannot send ctx updates. "
         1377                              f"chan state {next_chan.get_state()!r}, peer state: {next_chan.peer_state!r}")
         1378             data = outgoing_chan_upd_len + outgoing_chan_upd
         1379             raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=data)
         1380         try:
         1381             next_cltv_expiry = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"]
         1382         except:
         1383             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
         1384         if htlc.cltv_expiry - next_cltv_expiry < next_chan.forwarding_cltv_expiry_delta:
         1385             data = htlc.cltv_expiry.to_bytes(4, byteorder="big") + outgoing_chan_upd_len + outgoing_chan_upd
         1386             raise OnionRoutingFailure(code=OnionFailureCode.INCORRECT_CLTV_EXPIRY, data=data)
         1387         if htlc.cltv_expiry - lnutil.MIN_FINAL_CLTV_EXPIRY_ACCEPTED <= local_height \
         1388                 or next_cltv_expiry <= local_height:
         1389             data = outgoing_chan_upd_len + outgoing_chan_upd
         1390             raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_SOON, data=data)
         1391         if max(htlc.cltv_expiry, next_cltv_expiry) > local_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE:
         1392             raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_FAR, data=b'')
         1393         try:
         1394             next_amount_msat_htlc = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"]
         1395         except:
         1396             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
         1397         forwarding_fees = fee_for_edge_msat(
         1398             forwarded_amount_msat=next_amount_msat_htlc,
         1399             fee_base_msat=next_chan.forwarding_fee_base_msat,
         1400             fee_proportional_millionths=next_chan.forwarding_fee_proportional_millionths)
         1401         if htlc.amount_msat - next_amount_msat_htlc < forwarding_fees:
         1402             data = next_amount_msat_htlc.to_bytes(8, byteorder="big") + outgoing_chan_upd_len + outgoing_chan_upd
         1403             raise OnionRoutingFailure(code=OnionFailureCode.FEE_INSUFFICIENT, data=data)
         1404         self.logger.info(f'forwarding htlc to {next_chan.node_id}')
         1405         next_htlc = UpdateAddHtlc(
         1406             amount_msat=next_amount_msat_htlc,
         1407             payment_hash=htlc.payment_hash,
         1408             cltv_expiry=next_cltv_expiry,
         1409             timestamp=int(time.time()))
         1410         next_htlc = next_chan.add_htlc(next_htlc)
         1411         next_peer = self.lnworker.peers[next_chan.node_id]
         1412         try:
         1413             next_peer.send_message(
         1414                 "update_add_htlc",
         1415                 channel_id=next_chan.channel_id,
         1416                 id=next_htlc.htlc_id,
         1417                 cltv_expiry=next_cltv_expiry,
         1418                 amount_msat=next_amount_msat_htlc,
         1419                 payment_hash=next_htlc.payment_hash,
         1420                 onion_routing_packet=processed_onion.next_packet.to_bytes()
         1421             )
         1422         except BaseException as e:
         1423             self.logger.info(f"failed to forward htlc: error sending message. {e}")
         1424             data = outgoing_chan_upd_len + outgoing_chan_upd
         1425             raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=data)
         1426         return next_chan_scid, next_htlc.htlc_id
         1427 
         1428     def maybe_forward_trampoline(
         1429             self, *,
         1430             chan: Channel,
         1431             htlc: UpdateAddHtlc,
         1432             trampoline_onion: ProcessedOnionPacket):
         1433 
         1434         payload = trampoline_onion.hop_data.payload
         1435         payment_hash = htlc.payment_hash
         1436         payment_secret = os.urandom(32)
         1437         try:
         1438             outgoing_node_id = payload["outgoing_node_id"]["outgoing_node_id"]
         1439             amt_to_forward = payload["amt_to_forward"]["amt_to_forward"]
         1440             cltv_from_onion = payload["outgoing_cltv_value"]["outgoing_cltv_value"]
         1441             if "invoice_features" in payload:
         1442                 self.logger.info('forward_trampoline: legacy')
         1443                 next_trampoline_onion = None
         1444                 invoice_features = payload["invoice_features"]["invoice_features"]
         1445                 invoice_routing_info = payload["invoice_routing_info"]["invoice_routing_info"]
         1446             else:
         1447                 self.logger.info('forward_trampoline: end-to-end')
         1448                 invoice_features = LnFeatures.BASIC_MPP_OPT
         1449                 next_trampoline_onion = trampoline_onion.next_packet
         1450         except Exception as e:
         1451             self.logger.exception('')
         1452             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
         1453 
         1454         # these are the fee/cltv paid by the sender
         1455         # pay_to_node will raise if they are not sufficient
         1456         trampoline_cltv_delta = htlc.cltv_expiry - cltv_from_onion
         1457         trampoline_fee = htlc.amount_msat - amt_to_forward
         1458 
         1459         @log_exceptions
         1460         async def forward_trampoline_payment():
         1461             try:
         1462                 await self.lnworker.pay_to_node(
         1463                     node_pubkey=outgoing_node_id,
         1464                     payment_hash=payment_hash,
         1465                     payment_secret=payment_secret,
         1466                     amount_to_pay=amt_to_forward,
         1467                     min_cltv_expiry=cltv_from_onion,
         1468                     r_tags=[],
         1469                     invoice_features=invoice_features,
         1470                     fwd_trampoline_onion=next_trampoline_onion,
         1471                     fwd_trampoline_fee=trampoline_fee,
         1472                     fwd_trampoline_cltv_delta=trampoline_cltv_delta,
         1473                     attempts=1)
         1474             except OnionRoutingFailure as e:
         1475                 # FIXME: cannot use payment_hash as key
         1476                 self.lnworker.trampoline_forwarding_failures[payment_hash] = e
         1477             except PaymentFailure as e:
         1478                 # FIXME: adapt the error code
         1479                 error_reason = OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'')
         1480                 self.lnworker.trampoline_forwarding_failures[payment_hash] = error_reason
         1481 
         1482         asyncio.ensure_future(forward_trampoline_payment())
         1483 
         1484     def maybe_fulfill_htlc(
         1485             self, *,
         1486             chan: Channel,
         1487             htlc: UpdateAddHtlc,
         1488             processed_onion: ProcessedOnionPacket,
         1489             is_trampoline: bool = False) -> Tuple[Optional[bytes], Optional[OnionPacket]]:
         1490 
         1491         """As a final recipient of an HTLC, decide if we should fulfill it.
         1492         Return (preimage, trampoline_onion_packet) with at most a single element not None
         1493         """
         1494         def log_fail_reason(reason: str):
         1495             self.logger.info(f"maybe_fulfill_htlc. will FAIL HTLC: chan {chan.short_channel_id}. "
         1496                              f"{reason}. htlc={str(htlc)}. onion_payload={processed_onion.hop_data.payload}")
         1497 
         1498         try:
         1499             amt_to_forward = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"]
         1500         except:
         1501             log_fail_reason(f"'amt_to_forward' missing from onion")
         1502             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
         1503 
         1504         # Check that our blockchain tip is sufficiently recent so that we have an approx idea of the height.
         1505         # We should not release the preimage for an HTLC that its sender could already time out as
         1506         # then they might try to force-close and it becomes a race.
         1507         chain = self.network.blockchain()
         1508         if chain.is_tip_stale():
         1509             log_fail_reason(f"our chain tip is stale")
         1510             raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
         1511         local_height = chain.height()
         1512         exc_incorrect_or_unknown_pd = OnionRoutingFailure(
         1513             code=OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS,
         1514             data=amt_to_forward.to_bytes(8, byteorder="big") + local_height.to_bytes(4, byteorder="big"))
         1515         if local_height + MIN_FINAL_CLTV_EXPIRY_ACCEPTED > htlc.cltv_expiry:
         1516             log_fail_reason(f"htlc.cltv_expiry is unreasonably close")
         1517             raise exc_incorrect_or_unknown_pd
         1518         try:
         1519             cltv_from_onion = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"]
         1520         except:
         1521             log_fail_reason(f"'outgoing_cltv_value' missing from onion")
         1522             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
         1523 
         1524         if not is_trampoline:
         1525             if cltv_from_onion != htlc.cltv_expiry:
         1526                 log_fail_reason(f"cltv_from_onion != htlc.cltv_expiry")
         1527                 raise OnionRoutingFailure(
         1528                     code=OnionFailureCode.FINAL_INCORRECT_CLTV_EXPIRY,
         1529                     data=htlc.cltv_expiry.to_bytes(4, byteorder="big"))
         1530         try:
         1531             total_msat = processed_onion.hop_data.payload["payment_data"]["total_msat"]
         1532         except:
         1533             total_msat = amt_to_forward # fall back to "amt_to_forward"
         1534 
         1535         if not is_trampoline and amt_to_forward != htlc.amount_msat:
         1536             log_fail_reason(f"amt_to_forward != htlc.amount_msat")
         1537             raise OnionRoutingFailure(
         1538                 code=OnionFailureCode.FINAL_INCORRECT_HTLC_AMOUNT,
         1539                 data=htlc.amount_msat.to_bytes(8, byteorder="big"))
         1540 
         1541         try:
         1542             payment_secret_from_onion = processed_onion.hop_data.payload["payment_data"]["payment_secret"]
         1543         except:
         1544             if total_msat > amt_to_forward:
         1545                 # payment_secret is required for MPP
         1546                 log_fail_reason(f"'payment_secret' missing from onion")
         1547                 raise exc_incorrect_or_unknown_pd
         1548             # TODO fail here if invoice has set PAYMENT_SECRET_REQ
         1549             payment_secret_from_onion = None
         1550 
         1551         if total_msat > amt_to_forward:
         1552             mpp_status = self.lnworker.check_received_mpp_htlc(payment_secret_from_onion, chan.short_channel_id, htlc, total_msat)
         1553             if mpp_status is None:
         1554                 return None, None
         1555             if mpp_status is False:
         1556                 log_fail_reason(f"MPP_TIMEOUT")
         1557                 raise OnionRoutingFailure(code=OnionFailureCode.MPP_TIMEOUT, data=b'')
         1558             assert mpp_status is True
         1559 
         1560         # if there is a trampoline_onion, maybe_fulfill_htlc will be called again
         1561         if processed_onion.trampoline_onion_packet:
         1562             # TODO: we should check that all trampoline_onions are the same
         1563             return None, processed_onion.trampoline_onion_packet
         1564 
         1565         info = self.lnworker.get_payment_info(htlc.payment_hash)
         1566         if info is None:
         1567             log_fail_reason(f"no payment_info found for RHASH {htlc.payment_hash.hex()}")
         1568             raise exc_incorrect_or_unknown_pd
         1569         preimage = self.lnworker.get_preimage(htlc.payment_hash)
         1570         if payment_secret_from_onion:
         1571             if payment_secret_from_onion != derive_payment_secret_from_payment_preimage(preimage):
         1572                 log_fail_reason(f'incorrect payment secret {payment_secret_from_onion.hex()} != {derive_payment_secret_from_payment_preimage(preimage).hex()}')
         1573                 raise exc_incorrect_or_unknown_pd
         1574         invoice_msat = info.amount_msat
         1575         if not (invoice_msat is None or invoice_msat <= total_msat <= 2 * invoice_msat):
         1576             log_fail_reason(f"total_msat={total_msat} too different from invoice_msat={invoice_msat}")
         1577             raise exc_incorrect_or_unknown_pd
         1578         self.logger.info(f"maybe_fulfill_htlc. will FULFILL HTLC: chan {chan.short_channel_id}. htlc={str(htlc)}")
         1579         self.lnworker.set_request_status(htlc.payment_hash, PR_PAID)
         1580         return preimage, None
         1581 
         1582     def fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes):
         1583         self.logger.info(f"_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
         1584         assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
         1585         assert chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id)
         1586         self.received_htlcs_pending_removal.add((chan, htlc_id))
         1587         chan.settle_htlc(preimage, htlc_id)
         1588         self.send_message(
         1589             "update_fulfill_htlc",
         1590             channel_id=chan.channel_id,
         1591             id=htlc_id,
         1592             payment_preimage=preimage)
         1593 
         1594     def fail_htlc(self, *, chan: Channel, htlc_id: int, error_bytes: bytes):
         1595         self.logger.info(f"fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
         1596         assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
         1597         self.received_htlcs_pending_removal.add((chan, htlc_id))
         1598         chan.fail_htlc(htlc_id)
         1599         self.send_message(
         1600             "update_fail_htlc",
         1601             channel_id=chan.channel_id,
         1602             id=htlc_id,
         1603             len=len(error_bytes),
         1604             reason=error_bytes)
         1605 
         1606     def fail_malformed_htlc(self, *, chan: Channel, htlc_id: int, reason: OnionRoutingFailure):
         1607         self.logger.info(f"fail_malformed_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
         1608         assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
         1609         if not (reason.code & OnionFailureCodeMetaFlag.BADONION and len(reason.data) == 32):
         1610             raise Exception(f"unexpected reason when sending 'update_fail_malformed_htlc': {reason!r}")
         1611         self.received_htlcs_pending_removal.add((chan, htlc_id))
         1612         chan.fail_htlc(htlc_id)
         1613         self.send_message(
         1614             "update_fail_malformed_htlc",
         1615             channel_id=chan.channel_id,
         1616             id=htlc_id,
         1617             sha256_of_onion=reason.data,
         1618             failure_code=reason.code)
         1619 
         1620     def on_revoke_and_ack(self, chan: Channel, payload):
         1621         if chan.peer_state == PeerState.BAD:
         1622             return
         1623         self.logger.info(f'on_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(REMOTE)}')
         1624         rev = RevokeAndAck(payload["per_commitment_secret"], payload["next_per_commitment_point"])
         1625         chan.receive_revocation(rev)
         1626         self.lnworker.save_channel(chan)
         1627         self.maybe_send_commitment(chan)
         1628 
         1629     def on_update_fee(self, chan: Channel, payload):
         1630         feerate = payload["feerate_per_kw"]
         1631         chan.update_fee(feerate, False)
         1632 
         1633     async def maybe_update_fee(self, chan: Channel):
         1634         """
         1635         called when our fee estimates change
         1636         """
         1637         if not chan.can_send_ctx_updates():
         1638             return
         1639         feerate_per_kw = self.lnworker.current_feerate_per_kw()
         1640         if not chan.constraints.is_initiator:
         1641             if constants.net is not constants.BitcoinRegtest:
         1642                 chan_feerate = chan.get_latest_feerate(LOCAL)
         1643                 ratio = chan_feerate / feerate_per_kw
         1644                 if ratio < 0.5:
         1645                     # Note that we trust the Electrum server about fee rates
         1646                     # Thus, automated force-closing might not be a good idea
         1647                     # Maybe we should display something in the GUI instead
         1648                     self.logger.warning(
         1649                         f"({chan.get_id_for_log()}) feerate is {chan_feerate} sat/kw, "
         1650                         f"current recommended feerate is {feerate_per_kw} sat/kw, consider force closing!")
         1651             return
         1652         chan_fee = chan.get_next_feerate(REMOTE)
         1653         if feerate_per_kw < chan_fee / 2:
         1654             self.logger.info("FEES HAVE FALLEN")
         1655         elif feerate_per_kw > chan_fee * 2:
         1656             self.logger.info("FEES HAVE RISEN")
         1657         else:
         1658             return
         1659         self.logger.info(f"(chan: {chan.get_id_for_log()}) current pending feerate {chan_fee}. "
         1660                          f"new feerate {feerate_per_kw}")
         1661         chan.update_fee(feerate_per_kw, True)
         1662         self.send_message(
         1663             "update_fee",
         1664             channel_id=chan.channel_id,
         1665             feerate_per_kw=feerate_per_kw)
         1666         self.maybe_send_commitment(chan)
         1667 
         1668     @log_exceptions
         1669     async def close_channel(self, chan_id: bytes):
         1670         chan = self.channels[chan_id]
         1671         self.shutdown_received[chan_id] = asyncio.Future()
         1672         await self.send_shutdown(chan)
         1673         payload = await self.shutdown_received[chan_id]
         1674         txid = await self._shutdown(chan, payload, is_local=True)
         1675         self.logger.info(f'({chan.get_id_for_log()}) Channel closed {txid}')
         1676         return txid
         1677 
         1678     async def on_shutdown(self, chan: Channel, payload):
         1679         their_scriptpubkey = payload['scriptpubkey']
         1680         their_upfront_scriptpubkey = chan.config[REMOTE].upfront_shutdown_script
         1681 
         1682         # BOLT-02 check if they use the upfront shutdown script they advertized
         1683         if their_upfront_scriptpubkey:
         1684             if not (their_scriptpubkey == their_upfront_scriptpubkey):
         1685                 raise UpfrontShutdownScriptViolation("remote didn't use upfront shutdown script it commited to in channel opening")
         1686 
         1687         # BOLT-02 restrict the scriptpubkey to some templates:
         1688         if not (match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_WITNESS_V0)
         1689                 or match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_P2SH)
         1690                 or match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_P2PKH)):
         1691             raise Exception(f'scriptpubkey in received shutdown message does not conform to any template: {their_scriptpubkey.hex()}')
         1692         chan_id = chan.channel_id
         1693         if chan_id in self.shutdown_received:
         1694             self.shutdown_received[chan_id].set_result(payload)
         1695         else:
         1696             chan = self.channels[chan_id]
         1697             await self.send_shutdown(chan)
         1698             txid = await self._shutdown(chan, payload, is_local=False)
         1699             self.logger.info(f'({chan.get_id_for_log()}) Channel closed by remote peer {txid}')
         1700 
         1701     def can_send_shutdown(self, chan: Channel):
         1702         if chan.get_state() >= ChannelState.OPENING:
         1703             return True
         1704         if chan.constraints.is_initiator and chan.channel_id in self.funding_created_sent:
         1705             return True
         1706         if not chan.constraints.is_initiator and chan.channel_id in self.funding_signed_sent:
         1707             return True
         1708         return False
         1709 
         1710     async def send_shutdown(self, chan: Channel):
         1711         if not self.can_send_shutdown(chan):
         1712             raise Exception('cannot send shutdown')
         1713 
         1714         if chan.config[LOCAL].upfront_shutdown_script:
         1715             scriptpubkey = chan.config[LOCAL].upfront_shutdown_script
         1716         else:
         1717             scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address))
         1718         assert scriptpubkey
         1719 
         1720         # wait until no more pending updates (bolt2)
         1721         chan.set_can_send_ctx_updates(False)
         1722         while chan.has_pending_changes(REMOTE):
         1723             await asyncio.sleep(0.1)
         1724         self.send_message('shutdown', channel_id=chan.channel_id, len=len(scriptpubkey), scriptpubkey=scriptpubkey)
         1725         chan.set_state(ChannelState.SHUTDOWN)
         1726         # can fullfill or fail htlcs. cannot add htlcs, because state != OPEN
         1727         chan.set_can_send_ctx_updates(True)
         1728 
    @log_exceptions
    async def _shutdown(self, chan: Channel, payload, *, is_local: bool):
        """Negotiate the closing fee with the peer, then sign, save and broadcast the closing tx.

        payload: the peer's 'shutdown' payload; only 'scriptpubkey' is read from it.
        is_local: selects whose latest commitment fee caps the negotiation
                  (LOCAL if True, else REMOTE).

        Returns the txid of the closing transaction.
        Raises Exception if the peer proposes a fee above that cap, or if its
        signature verifies neither with nor without the dropped output.
        """
        # wait until no HTLCs remain in either commitment transaction
        while len(chan.hm.htlcs(LOCAL)) + len(chan.hm.htlcs(REMOTE)) > 0:
            self.logger.info(f'(chan: {chan.short_channel_id}) waiting for htlcs to settle...')
            await asyncio.sleep(1)
        # if no HTLCs remain, we must not send updates
        chan.set_can_send_ctx_updates(False)
        their_scriptpubkey = payload['scriptpubkey']
        # same script selection as in send_shutdown
        if chan.config[LOCAL].upfront_shutdown_script:
            our_scriptpubkey = chan.config[LOCAL].upfront_shutdown_script
        else:
            our_scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address))
        assert our_scriptpubkey

        # estimate fee of closing tx
        # (a zero-fee closing tx is built only to measure its size)
        our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=0)
        fee_rate = self.network.config.fee_per_kb()
        our_fee = fee_rate * closing_tx.estimated_size() // 1000
        # BOLT2: The sending node MUST set fee less than or equal to the base fee of the final ctx
        max_fee = chan.get_latest_fee(LOCAL if is_local else REMOTE)
        our_fee = min(our_fee, max_fee)
        drop_remote = False
        # The helpers below are closures: they read our_fee / drop_remote from this
        # scope at call time, so they always see the most recently assigned values.
        def send_closing_signed():
            # our_sig / closing_tx assigned here are local to this helper
            our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=our_fee, drop_remote=drop_remote)
            self.send_message('closing_signed', channel_id=chan.channel_id, fee_satoshis=our_fee, signature=our_sig)
        def verify_signature(tx, sig):
            # check sig against the peer's multisig pubkey over the sighash preimage of input 0
            their_pubkey = chan.config[REMOTE].multisig_key.pubkey
            preimage_hex = tx.serialize_preimage(0)
            pre_hash = sha256d(bfh(preimage_hex))
            return ecc.verify_signature(their_pubkey, sig, pre_hash)
        # the funder sends the first 'closing_signed' message
        if chan.constraints.is_initiator:
            send_closing_signed()
        # negotiate fee
        while True:
            # FIXME: the remote SHOULD send closing_signed, but some don't.
            cs_payload = await self.wait_for_message('closing_signed', chan.channel_id)
            their_fee = cs_payload['fee_satoshis']
            if their_fee > max_fee:
                raise Exception(f'the proposed fee exceeds the base fee of the latest commitment transaction {is_local, their_fee, max_fee}')
            their_sig = cs_payload['signature']
            # verify their sig: they might have dropped their output
            # (try the tx with both outputs first, then the variant built with drop_remote=True)
            our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=False)
            if verify_signature(closing_tx, their_sig):
                drop_remote = False
            else:
                our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=True)
                if verify_signature(closing_tx, their_sig):
                    drop_remote = True
                else:
                    raise Exception('failed to verify their signature')
            # Agree if difference is lower or equal to one (see below)
            if abs(our_fee - their_fee) < 2:
                our_fee = their_fee
                break
            # this will be "strictly between" (as in BOLT2) previous values because of the above
            our_fee = (our_fee + their_fee) // 2
            # another round
            send_closing_signed()
        # at this point our_sig / closing_tx are those built at their_fee in the last round
        # the non-funder replies
        if not chan.constraints.is_initiator:
            send_closing_signed()
        # add signatures
        # (b'\x01' is appended to each DER signature; presumably the SIGHASH_ALL byte - TODO confirm)
        closing_tx.add_signature_to_txin(
            txin_idx=0,
            signing_pubkey=chan.config[LOCAL].multisig_key.pubkey.hex(),
            sig=bh2u(der_sig_from_sig_string(our_sig) + b'\x01'))
        closing_tx.add_signature_to_txin(
            txin_idx=0,
            signing_pubkey=chan.config[REMOTE].multisig_key.pubkey.hex(),
            sig=bh2u(der_sig_from_sig_string(their_sig) + b'\x01'))
        # save local transaction and set state
        try:
            self.lnworker.wallet.add_transaction(closing_tx)
        except UnrelatedTransactionException:
            pass  # this can happen if (~all the balance goes to REMOTE)
        chan.set_state(ChannelState.CLOSING)
        # broadcast
        await self.network.try_broadcasting(closing_tx, 'closing')
        return closing_tx.txid()
         1810 
    async def htlc_switch(self):
        """Endless loop that resolves received HTLCs which are still unfulfilled.

        After self.initialized completes, roughly every 0.1 seconds it goes over
        all channels that can send ctx updates and, for each irrevocably added
        HTLC in the channel's 'unfulfilled_htlcs' log, either
        - stores forwarding info (the HTLC was forwarded and stays pending),
        - fulfills it with a preimage, or
        - fails it (regular onion error, or 'malformed' if the onion cannot be parsed).
        """
        await self.initialized
        while True:
            # set()+clear() wakes up the tasks currently waiting on the event
            # (see wait_one_htlc_switch_iteration) without leaving it set
            self._htlc_switch_iterdone_event.set()
            self._htlc_switch_iterdone_event.clear()
            await asyncio.sleep(0.1)  # TODO maybe make this partly event-driven
            self._htlc_switch_iterstart_event.set()
            self._htlc_switch_iterstart_event.clear()
            self.ping_if_required()
            self._maybe_cleanup_received_htlcs_pending_removal()
            for chan_id, chan in self.channels.items():
                if not chan.can_send_ctx_updates():
                    continue
                self.maybe_send_commitment(chan)
                # ids of htlcs resolved in this pass; removed from 'unfulfilled' after the inner loop
                done = set()
                unfulfilled = chan.hm.log.get('unfulfilled_htlcs', {})
                for htlc_id, (local_ctn, remote_ctn, onion_packet_hex, forwarding_info) in unfulfilled.items():
                    if not chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id):
                        continue
                    htlc = chan.hm.get_htlc_by_id(REMOTE, htlc_id)
                    error_reason = None  # type: Optional[OnionRoutingFailure]
                    error_bytes = None  # type: Optional[bytes]
                    preimage = None
                    fw_info = None
                    onion_packet_bytes = bytes.fromhex(onion_packet_hex)
                    onion_packet = None
                    try:
                        onion_packet = OnionPacket.from_bytes(onion_packet_bytes)
                    except OnionRoutingFailure as e:
                        # unparseable onion: reported via fail_malformed_htlc below
                        error_reason = e
                    else:
                        try:
                            preimage, fw_info, error_bytes = await self.process_unfulfilled_htlc(
                                chan=chan,
                                htlc=htlc,
                                forwarding_info=forwarding_info,
                                onion_packet_bytes=onion_packet_bytes,
                                onion_packet=onion_packet)
                        except OnionRoutingFailure as e:
                            # we are the failing node: build the onion error ourselves
                            error_bytes = construct_onion_error(e, onion_packet, our_onion_private_key=self.privkey)
                    if fw_info:
                        # overwrites the value of an existing key only, so iterating the dict stays valid
                        unfulfilled[htlc_id] = local_ctn, remote_ctn, onion_packet_hex, fw_info
                    elif preimage or error_reason or error_bytes:
                        if preimage:
                            await self.lnworker.enable_htlc_settle.wait()
                            self.fulfill_htlc(chan, htlc.htlc_id, preimage)
                        elif error_bytes:
                            self.fail_htlc(
                                chan=chan,
                                htlc_id=htlc.htlc_id,
                                error_bytes=error_bytes)
                        else:
                            self.fail_malformed_htlc(
                                chan=chan,
                                htlc_id=htlc.htlc_id,
                                reason=error_reason)
                        done.add(htlc_id)
                # cleanup
                for htlc_id in done:
                    unfulfilled.pop(htlc_id)
         1871 
         1872     def _maybe_cleanup_received_htlcs_pending_removal(self) -> None:
         1873         done = set()
         1874         for chan, htlc_id in self.received_htlcs_pending_removal:
         1875             if chan.hm.is_htlc_irrevocably_removed_yet(htlc_proposer=REMOTE, htlc_id=htlc_id):
         1876                 done.add((chan, htlc_id))
         1877         if done:
         1878             for key in done:
         1879                 self.received_htlcs_pending_removal.remove(key)
         1880             self.received_htlc_removed_event.set()
         1881             self.received_htlc_removed_event.clear()
         1882 
         1883     async def wait_one_htlc_switch_iteration(self) -> None:
         1884         """Waits until the HTLC switch does a full iteration or the peer disconnects,
         1885         whichever happens first.
         1886         """
         1887         async def htlc_switch_iteration():
         1888             await self._htlc_switch_iterstart_event.wait()
         1889             await self._htlc_switch_iterdone_event.wait()
         1890 
         1891         async with TaskGroup(wait=any) as group:
         1892             await group.spawn(htlc_switch_iteration())
         1893             await group.spawn(self.got_disconnected.wait())
         1894 
    async def process_unfulfilled_htlc(
            self, *,
            chan: Channel,
            htlc: UpdateAddHtlc,
            forwarding_info: Tuple[str, int],
            onion_packet_bytes: bytes,
            onion_packet: OnionPacket) -> Tuple[Optional[bytes], Union[bool, None, Tuple[str, int]], Optional[bytes]]:
        """
        Decide what to do with a received HTLC that has not been resolved yet.

        forwarding_info is what an earlier call returned as fw_info (falsy if
        the HTLC has not been forwarded yet).

        return (preimage, fw_info, error_bytes) with at most a single element that is not None
        - preimage: the HTLC can be fulfilled
        - fw_info: (next_chan_id_hex, next_htlc_id) after a regular forward,
                   or True after a trampoline forward was started
        - error_bytes: onion error received for the forwarded HTLC, to be passed back
        (None, None, None) means there is nothing to do yet.

        raise an OnionRoutingFailure if we need to fail the htlc
        """
        payment_hash = htlc.payment_hash
        processed_onion = self.process_onion_packet(
            onion_packet,
            payment_hash=payment_hash,
            onion_packet_bytes=onion_packet_bytes)
        if processed_onion.are_we_final:
            # we are the last hop of the outer onion
            preimage, trampoline_onion_packet = self.maybe_fulfill_htlc(
                chan=chan,
                htlc=htlc,
                processed_onion=processed_onion)
            # trampoline forwarding
            if trampoline_onion_packet:
                if not forwarding_info:
                    # first pass: peel the inner (trampoline) onion
                    trampoline_onion = self.process_onion_packet(
                        trampoline_onion_packet,
                        payment_hash=htlc.payment_hash,
                        onion_packet_bytes=onion_packet_bytes,
                        is_trampoline=True)
                    if trampoline_onion.are_we_final:
                        # we are also the final trampoline node: treat as a payment to us
                        preimage, _ = self.maybe_fulfill_htlc(
                            chan=chan,
                            htlc=htlc,
                            processed_onion=trampoline_onion,
                            is_trampoline=True)
                    else:
                        await self.lnworker.enable_htlc_forwarding.wait()
                        self.maybe_forward_trampoline(
                            chan=chan,
                            htlc=htlc,
                            trampoline_onion=trampoline_onion)
                        # return True so that this code gets executed only once
                        return None, True, None
                else:
                    # trampoline forward already started: look for its outcome
                    preimage = self.lnworker.get_preimage(payment_hash)
                    error_reason = self.lnworker.trampoline_forwarding_failures.pop(payment_hash, None)
                    if error_reason:
                        self.logger.info(f'trampoline forwarding failure: {error_reason.code_name()}')
                        raise error_reason

        elif not forwarding_info:
            # regular forward, not attempted yet
            await self.lnworker.enable_htlc_forwarding.wait()
            next_chan_id, next_htlc_id = self.maybe_forward_htlc(
                htlc=htlc,
                processed_onion=processed_onion)
            fw_info = (next_chan_id.hex(), next_htlc_id)
            return None, fw_info, None
        else:
            # already forwarded: check whether the downstream HTLC was settled or failed
            preimage = self.lnworker.get_preimage(payment_hash)
            next_chan_id_hex, htlc_id = forwarding_info
            next_chan = self.lnworker.get_channel_by_short_id(bytes.fromhex(next_chan_id_hex))
            if next_chan:
                error_bytes, error_reason = next_chan.pop_fail_htlc_reason(htlc_id)
                if error_bytes:
                    return None, None, error_bytes
                if error_reason:
                    raise error_reason
        # reached from the final-hop branch and from the already-forwarded branch
        if preimage:
            return preimage, None, None
        return None, None, None
         1965 
         1966     def process_onion_packet(
         1967             self,
         1968             onion_packet: OnionPacket, *,
         1969             payment_hash: bytes,
         1970             onion_packet_bytes: bytes,
         1971             is_trampoline: bool = False) -> ProcessedOnionPacket:
         1972 
         1973         failure_data = sha256(onion_packet_bytes)
         1974         try:
         1975             processed_onion = process_onion_packet(
         1976                 onion_packet,
         1977                 associated_data=payment_hash,
         1978                 our_onion_private_key=self.privkey,
         1979                 is_trampoline=is_trampoline)
         1980         except UnsupportedOnionPacketVersion:
         1981             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data)
         1982         except InvalidOnionPubkey:
         1983             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_KEY, data=failure_data)
         1984         except InvalidOnionMac:
         1985             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_HMAC, data=failure_data)
         1986         except Exception as e:
         1987             self.logger.info(f"error processing onion packet: {e!r}")
         1988             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data)
         1989         if self.network.config.get('test_fail_malformed_htlc'):
         1990             raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data)
         1991         if self.network.config.get('test_fail_htlcs_with_temp_node_failure'):
         1992             raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
         1993         return processed_onion