tlnpeer.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
tlnpeer.py (101473B)
---
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2018 The Electrum developers
4 # Distributed under the MIT software license, see the accompanying
5 # file LICENCE or http://www.opensource.org/licenses/mit-license.php
6
7 import zlib
8 from collections import OrderedDict, defaultdict
9 import asyncio
10 import os
11 import time
12 from typing import Tuple, Dict, TYPE_CHECKING, Optional, Union, Set
13 from datetime import datetime
14 import functools
15
16 import aiorpcx
17 from aiorpcx import TaskGroup
18
19 from .crypto import sha256, sha256d
20 from . import bitcoin, util
21 from . import ecc
22 from .ecc import sig_string_from_r_and_s, der_sig_from_sig_string
23 from . import constants
24 from .util import (bh2u, bfh, log_exceptions, ignore_exceptions, chunks, SilentTaskGroup,
25 UnrelatedTransactionException)
26 from . import transaction
27 from .transaction import PartialTxOutput, match_script_against_template
28 from .logging import Logger
29 from .lnonion import (new_onion_packet, OnionFailureCode, calc_hops_data_for_payment,
30 process_onion_packet, OnionPacket, construct_onion_error, OnionRoutingFailure,
31 ProcessedOnionPacket, UnsupportedOnionPacketVersion, InvalidOnionMac, InvalidOnionPubkey,
32 OnionFailureCodeMetaFlag)
33 from .lnchannel import Channel, RevokeAndAck, RemoteCtnTooFarInFuture, ChannelState, PeerState
34 from . import lnutil
35 from .lnutil import (Outpoint, LocalConfig, RECEIVED, UpdateAddHtlc,
36 RemoteConfig, OnlyPubkeyKeypair, ChannelConstraints, RevocationStore,
37 funding_output_script, get_per_commitment_secret_from_seed,
38 secret_to_pubkey, PaymentFailure, LnFeatures,
39 LOCAL, REMOTE, HTLCOwner,
40 ln_compare_features, privkey_to_pubkey, MIN_FINAL_CLTV_EXPIRY_ACCEPTED,
41 LightningPeerConnectionClosed, HandshakeFailed,
42 RemoteMisbehaving, ShortChannelID,
43 IncompatibleLightningFeatures, derive_payment_secret_from_payment_preimage,
44 LN_MAX_FUNDING_SAT, calc_fees_for_commitment_tx,
45 UpfrontShutdownScriptViolation)
46 from .lnutil import FeeUpdate, channel_id_from_funding_tx
47 from .lntransport import LNTransport, LNTransportBase
48 from .lnmsg import encode_msg, decode_msg
49 from .interface import GracefulDisconnect
50 from .lnrouter import fee_for_edge_msat
51 from .lnutil import ln_dummy_address
52 from .json_db import StoredDict
53 from .invoices import PR_PAID
54
55 if TYPE_CHECKING:
56 from .lnworker import LNGossip, LNWallet
57 from .lnrouter import LNPaymentRoute
58 from .transaction import PartialTransaction
59
60
# Timeout (in seconds, as passed to asyncio.wait_for) used in this module when
# waiting for peer initialization and for individual replies from the remote.
LN_P2P_NETWORK_TIMEOUT = 20
62
63
64 class Peer(Logger):
65 LOGGING_SHORTCUT = 'P'
66
def __init__(
        self,
        lnworker: Union['LNGossip', 'LNWallet'],
        pubkey: bytes,
        transport: LNTransportBase,
        *, is_channel_backup: bool = False):
    """Set up the per-connection state for one remote Lightning node.

    lnworker: the worker that owns this peer (gossip worker or wallet worker).
    pubkey: node pubkey of the remote.
    transport: transport to the remote; also provides our local privkey.
    is_channel_backup: when True, process_message() ignores every incoming
        message except 'init'.
    """
    self.is_channel_backup = is_channel_backup
    self._sent_init = False  # type: bool
    self._received_init = False  # type: bool
    self.initialized = asyncio.Future()
    self.got_disconnected = asyncio.Event()
    self.querying = asyncio.Event()
    self.transport = transport
    self.pubkey = pubkey  # remote pubkey
    self.lnworker = lnworker
    self.privkey = self.transport.privkey  # local privkey
    self.features = self.lnworker.features  # type: LnFeatures
    self.their_features = LnFeatures(0)  # type: LnFeatures
    self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)]
    assert self.node_ids[0] != self.node_ids[1]
    self.network = lnworker.network
    self.ping_time = 0
    self.reply_channel_range = asyncio.Queue()
    # gossip uses a single queue to preserve message order
    self.gossip_queue = asyncio.Queue()
    # message types that are routed to per-channel queues instead of on_* handlers
    # (only used for membership tests, so each name is listed once)
    self.ordered_messages = [
        'accept_channel',
        'funding_signed',
        'funding_created',
        'channel_reestablish',
        'closing_signed',
    ]
    self.ordered_message_queues = defaultdict(asyncio.Queue)  # for messages that are ordered
    self.temp_id_to_id = {}  # to forward error messages
    self.funding_created_sent = set()  # for channels in PREOPENING
    self.funding_signed_sent = set()  # for channels in PREOPENING
    self.shutdown_received = {}  # chan_id -> asyncio.Future()
    self.announcement_signatures = defaultdict(asyncio.Queue)
    self.orphan_channel_updates = OrderedDict()
    # NOTE: Logger.__init__ is deliberately called only after lnworker/transport are set
    # (diagnostic_name() reads both) -- presumably the logger name depends on it; keep the order.
    Logger.__init__(self)
    self.taskgroup = SilentTaskGroup()
    # HTLCs offered by REMOTE, that we started removing but are still active:
    self.received_htlcs_pending_removal = set()  # type: Set[Tuple[Channel, int]]
    self.received_htlc_removed_event = asyncio.Event()
    self._htlc_switch_iterstart_event = asyncio.Event()
    self._htlc_switch_iterdone_event = asyncio.Event()
108
def send_message(self, message_name: str, **kwargs):
    """Encode the named message with the given fields and send it over the transport.

    Only 'init' may be sent before the peer is initialized; anything else
    raises. Channel update / commitment messages are additionally recorded
    via _store_raw_msg_if_local_update() before they hit the wire.
    """
    assert type(message_name) is str
    upper_name = message_name.upper()
    self.logger.debug(f"Sending {upper_name}")
    is_init_msg = upper_name == "INIT"
    if not is_init_msg and not self.is_initialized():
        raise Exception("tried to send message before we are initialized")
    encoded = encode_msg(message_name, **kwargs)
    self._store_raw_msg_if_local_update(
        encoded,
        message_name=message_name,
        channel_id=kwargs.get("channel_id"),
    )
    self.transport.send_bytes(encoded)
117
118 def _store_raw_msg_if_local_update(self, raw_msg: bytes, *, message_name: str, channel_id: Optional[bytes]):
119 is_commitment_signed = message_name == "commitment_signed"
120 if not (message_name.startswith("update_") or is_commitment_signed):
121 return
122 assert channel_id
123 chan = self.get_channel_by_id(channel_id)
124 if not chan:
125 raise Exception(f"channel {channel_id.hex()} not found for peer {self.pubkey.hex()}")
126 chan.hm.store_local_update_raw_msg(raw_msg, is_commitment_signed=is_commitment_signed)
127 if is_commitment_signed:
128 # saving now, to ensure replaying updates works (in case of channel reestablishment)
129 self.lnworker.save_channel(chan)
130
def maybe_set_initialized(self):
    """Resolve the `initialized` future once init was both sent and received."""
    fut = self.initialized
    if fut.done():
        return
    if not (self._sent_init and self._received_init):
        return
    fut.set_result(True)
136
def is_initialized(self) -> bool:
    """Return True only if the `initialized` future completed with result True."""
    fut = self.initialized
    # the checks below must stay in this order: exception()/result() are only
    # consulted on a future that is done and not cancelled
    if not fut.done():
        return False
    if fut.cancelled():
        return False
    if fut.exception() is not None:
        return False
    return fut.result() is True
142
async def initialize(self):
    """Perform the transport handshake (if applicable) and send our 'init' message."""
    if isinstance(self.transport, LNTransport):
        await self.transport.handshake()
    init_features = self.features.for_init_message()
    # number of bytes needed to hold the feature bits, rounded up
    nbits = int.bit_length(init_features)
    nbytes, leftover_bits = divmod(nbits, 8)
    if leftover_bits:
        nbytes += 1
    networks_tlv = {'chains': constants.net.rev_genesis_bytes()}
    self.send_message(
        "init",
        gflen=0,
        flen=nbytes,
        features=init_features,
        init_tlvs={'networks': networks_tlv},
    )
    self._sent_init = True
    self.maybe_set_initialized()
158
@property
def channels(self) -> Dict[bytes, Channel]:
    """Channels the lnworker reports for this peer's pubkey."""
    peer_channels = self.lnworker.channels_for_peer(self.pubkey)
    return peer_channels
162
def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]:
    """Look up a channel by id; return it only if it belongs to this peer, else None."""
    # note: this is faster than self.channels.get(channel_id)
    chan = self.lnworker.get_channel_by_id(channel_id)
    if not chan or chan.node_id != self.pubkey:
        return None
    return chan
171
def diagnostic_name(self):
    """Return "<lnworker class name>, <transport name>"."""
    worker_cls_name = self.lnworker.__class__.__name__
    return ', '.join((worker_cls_name, self.transport.name()))
174
def ping_if_required(self):
    """Send a ping if more than 120 seconds passed since the last one we sent."""
    since_last_ping = time.time() - self.ping_time
    if since_last_ping <= 120:
        return
    self.send_message('ping', num_pong_bytes=4, byteslen=4)
    self.ping_time = time.time()
179
def process_message(self, message):
    """Decode one raw message and dispatch it.

    Message types listed in self.ordered_messages are queued per channel
    (consumed by wait_for_message); everything else is passed to the
    matching on_<message_type> handler, if one exists.
    """
    message_type, payload = decode_msg(message)
    # only process INIT if we are a backup
    if self.is_channel_backup is True and message_type != 'init':
        return
    if message_type in self.ordered_messages:
        chan_id = payload.get('channel_id') or payload["temporary_channel_id"]
        self.ordered_message_queues[chan_id].put_nowait((message_type, payload))
        return
    # channel-scoped messages (except 'error') get the Channel object as first argument
    if message_type != 'error' and 'channel_id' in payload:
        chan = self.get_channel_by_id(payload['channel_id'])
        if chan is None:
            raise Exception('Got unknown '+ message_type)
        handler_args = (chan, payload)
    else:
        handler_args = (payload,)
    handler = getattr(self, 'on_' + message_type, None)
    if handler is None:
        # no handler for this message type: silently drop it
        return
    # raw message is needed to check signature
    if message_type in ['node_announcement', 'channel_announcement', 'channel_update']:
        payload['raw'] = message
    outcome = handler(*handler_args)
    if asyncio.iscoroutinefunction(handler):
        # async handlers return a coroutine; run it inside the peer's taskgroup
        asyncio.ensure_future(self.taskgroup.spawn(outcome))
207
def on_error(self, payload):
    """Handle an 'error' message from the remote.

    The (untrusted) error text is logged, and the raw error bytes are pushed
    onto the ordered message queue of the affected channel so that a pending
    wait_for_message() sees it. A temporary channel id is translated to the
    final channel id when known.
    """
    error_bytes = payload['data']
    # The data is attacker-controlled and need not be ASCII: escape undecodable
    # bytes instead of raising, so the error is still forwarded below.
    error_text = error_bytes.decode('ascii', errors='backslashreplace')
    self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {error_text}")
    chan_id = payload.get("channel_id")
    if chan_id in self.temp_id_to_id:
        chan_id = self.temp_id_to_id[chan_id]
    self.ordered_message_queues[chan_id].put_nowait((None, {'error': error_bytes}))
214
def on_ping(self, payload):
    """Answer a 'ping' with a 'pong' carrying the requested number of bytes.

    Per BOLT #1, a ping whose num_pong_bytes is not less than 65532 must be
    ignored (no pong is sent).
    """
    num_pong_bytes = payload['num_pong_bytes']
    if num_pong_bytes >= 65532:
        return
    self.send_message('pong', byteslen=num_pong_bytes)
218
def on_pong(self, payload):
    """Handler for 'pong'; intentionally does nothing with the payload."""
    return None
221
async def wait_for_message(self, expected_name, channel_id):
    """Wait for the next ordered message on `channel_id` and return its payload.

    Raises if the wait times out, if the remote reported an error, or if the
    message that arrived is not `expected_name`.
    """
    pending = self.ordered_message_queues[channel_id]
    name, payload = await asyncio.wait_for(pending.get(), LN_P2P_NETWORK_TIMEOUT)
    remote_error = payload.get('error')
    if remote_error:
        raise Exception('Remote peer reported error [DO NOT TRUST THIS MESSAGE]: ' + repr(remote_error))
    if name == expected_name:
        return payload
    raise Exception(f"Received unexpected '{name}'")
230
def on_init(self, payload):
    """Handle the remote's 'init' message: record and negotiate features, check the chain.

    Raises GracefulDisconnect when the remote's feature set is inconsistent or
    incompatible with ours, or when no common chain is advertised.
    """
    if self._received_init:
        self.logger.info("ALREADY INITIALIZED BUT RECEIVED INIT")
        return
    feature_bits = int.from_bytes(payload['features'], byteorder="big")
    self.their_features = LnFeatures(feature_bits)
    global_feature_bits = int.from_bytes(payload['globalfeatures'], byteorder="big")
    self.their_features |= global_feature_bits
    # check transitive dependencies for received features
    dependencies_ok = self.their_features.validate_transitive_dependencies()
    if not dependencies_ok:
        raise GracefulDisconnect("remote did not set all dependencies for the features they sent")
    # check if features are compatible, and set self.features to what we negotiated
    try:
        self.features = ln_compare_features(self.features, self.their_features)
    except IncompatibleLightningFeatures as exc:
        # propagate the failure to anyone awaiting self.initialized
        self.initialized.set_exception(exc)
        raise GracefulDisconnect(f"{str(exc)}")
    # check that they are on the same chain as us, if provided
    their_networks = payload["init_tlvs"].get("networks")
    if their_networks:
        their_chains = list(chunks(their_networks["chains"], 32))
        our_chain = constants.net.rev_genesis_bytes()
        if our_chain not in their_chains:
            raise GracefulDisconnect(f"no common chain found with remote. (they sent: {their_chains})")
    # all checks passed
    self.lnworker.on_peer_successfully_established(self)
    self._received_init = True
    self.maybe_set_initialized()
257
def on_node_announcement(self, payload):
    """Queue a node_announcement for gossip processing (only if we have a channel_db)."""
    if not self.lnworker.channel_db:
        return
    self.gossip_queue.put_nowait(('node_announcement', payload))
261
def on_channel_announcement(self, payload):
    """Queue a channel_announcement for gossip processing (only if we have a channel_db)."""
    if not self.lnworker.channel_db:
        return
    self.gossip_queue.put_nowait(('channel_announcement', payload))
265
def on_channel_update(self, payload):
    """Handle a channel_update: maybe store it for our own channel, then queue it as gossip."""
    self.maybe_save_remote_update(payload)
    if not self.lnworker.channel_db:
        return
    self.gossip_queue.put_nowait(('channel_update', payload))
270
def maybe_save_remote_update(self, payload):
    """Store a channel_update on the channel it refers to, if that is one of ours with this peer.

    Does nothing when we have no channels with this peer. When no channel
    matches the short channel id, the update is kept in a bounded "orphan"
    store instead.
    """
    if not self.channels:
        return
    target_scid = payload['short_channel_id']
    matching_chan = next(
        (chan for chan in self.channels.values() if chan.short_channel_id == target_scid),
        None)
    if matching_chan is not None:
        matching_chan.set_remote_update(payload['raw'])
        self.logger.info("saved remote_update")
        return
    # Save (some bounded number of) orphan channel updates for later
    # as it might be for our own direct channel with this peer
    # (and we might not yet know the short channel id for that)
    # Background: this code is here to deal with a bug in LND,
    # see https://github.com/lightningnetwork/lnd/issues/3651
    # and https://github.com/lightningnetwork/lightning-rfc/pull/657
    # This code assumes gossip_queries is set. BOLT7: "if the
    # gossip_queries feature is negotiated, [a node] MUST NOT
    # send gossip it did not generate itself"
    short_channel_id = ShortChannelID(payload['short_channel_id'])
    self.logger.info(f'received orphan channel update {short_channel_id}')
    self.orphan_channel_updates[short_channel_id] = payload
    # evict oldest entries first, keeping at most 25
    while len(self.orphan_channel_updates) > 25:
        self.orphan_channel_updates.popitem(last=False)
294
def on_announcement_signatures(self, chan: Channel, payload):
    """Handle announcement_signatures for `chan`.

    If our side was already announced, (re)send our announcement signatures;
    otherwise queue the payload for whoever is waiting on this channel.
    """
    if not chan.config[LOCAL].was_announced:
        self.announcement_signatures[chan.channel_id].put_nowait(payload)
        return
    # the returned triple is not used here
    _msg_hash, _local_node_sig, _local_bitcoin_sig = self.send_announcement_signatures(chan)
300
def handle_disconnect(func):
    """Decorator for peer coroutines: log expected disconnect errors and always clean up.

    GracefulDisconnect and the listed connection errors are logged and
    swallowed; any other exception propagates. In every case
    self.close_and_cleanup() runs when the wrapped coroutine finishes.
    """
    @functools.wraps(func)
    async def guarded(self, *args, **kwargs):
        try:
            return await func(self, *args, **kwargs)
        except GracefulDisconnect as exc:
            self.logger.log(exc.log_level, f"Disconnecting: {repr(exc)}")
        except (LightningPeerConnectionClosed,
                IncompatibleLightningFeatures,
                aiorpcx.socks.SOCKSError) as exc:
            self.logger.info(f"Disconnecting: {repr(exc)}")
        finally:
            self.close_and_cleanup()
    return guarded
314
@ignore_exceptions  # do not kill outer taskgroup
@log_exceptions
@handle_disconnect
async def main_loop(self):
    """Run the peer's long-lived tasks inside its taskgroup."""
    task_factories = (
        self._message_loop,
        self.htlc_switch,
        self.query_gossip,
        self.process_gossip,
    )
    async with self.taskgroup as group:
        for make_coro in task_factories:
            await group.spawn(make_coro())
324
async def process_gossip(self):
    """Forever: every 5 seconds, drain the gossip queue, verify, and hand the batch to lngossip."""
    while True:
        await asyncio.sleep(5)
        if not self.network.lngossip:
            continue
        batches = {
            'channel_announcement': [],
            'channel_update': [],
            'node_announcement': [],
        }
        # wait for at least one message, then keep taking until the queue is empty
        while True:
            name, payload = await self.gossip_queue.get()
            batch = batches.get(name)
            if batch is None:
                raise Exception('unknown message')
            batch.append(payload)
            if self.gossip_queue.empty():
                break
        chan_anns = batches['channel_announcement']
        chan_upds = batches['channel_update']
        node_anns = batches['node_announcement']
        # verify in peer's TaskGroup so that we fail the connection
        self.verify_channel_announcements(chan_anns)
        self.verify_node_announcements(node_anns)
        if self.network.lngossip:
            await self.network.lngossip.process_gossip(chan_anns, node_anns, chan_upds)
350
def verify_channel_announcements(self, chan_anns):
    """Check all four signatures of each channel_announcement; raise on the first bad one."""
    pubkey_fields = ('node_id_1', 'node_id_2', 'bitcoin_key_1', 'bitcoin_key_2')
    sig_fields = ('node_signature_1', 'node_signature_2', 'bitcoin_signature_1', 'bitcoin_signature_2')
    for ann in chan_anns:
        # hash of the raw message after the first 2+256 bytes
        digest = sha256d(ann['raw'][2+256:])
        pubkeys = [ann[field] for field in pubkey_fields]
        sigs = [ann[field] for field in sig_fields]
        for pubkey, sig in zip(pubkeys, sigs):
            if ecc.verify_signature(pubkey, sig, digest):
                continue
            raise Exception('signature failed')
359
def verify_node_announcements(self, node_anns):
    """Check the signature of each node_announcement; raise on the first bad one."""
    for ann in node_anns:
        node_id, sig = ann['node_id'], ann['signature']
        # hash of the raw message after the first 66 bytes
        digest = sha256d(ann['raw'][66:])
        if ecc.verify_signature(node_id, sig, digest):
            continue
        raise Exception('signature failed')
367
async def query_gossip(self):
    """Wait for initialization; if we are the gossip worker's peer, query channel ids forever.

    Raises GracefulDisconnect if initialization fails or the channel range
    query times out.
    """
    try:
        await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT)
    except Exception as e:
        raise GracefulDisconnect(f"Failed to initialize: {e!r}") from e
    is_gossip_worker = self.lnworker == self.lnworker.network.lngossip
    if not is_gossip_worker:
        return
    try:
        ids, complete = await asyncio.wait_for(self.get_channel_range(), LN_P2P_NETWORK_TIMEOUT)
    except asyncio.TimeoutError as e:
        raise GracefulDisconnect("query_channel_range timed out") from e
    self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete))
    await self.lnworker.add_new_ids(ids)
    while True:
        todo = self.lnworker.get_ids_to_query()
        if todo:
            await self.get_short_channel_ids(todo)
        else:
            # nothing to ask for right now; poll again shortly
            await asyncio.sleep(1)
386
async def get_channel_range(self):
    """Query the remote for all short channel ids and collect its replies.

    Sends query_channel_range covering
    [BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS, local height) and consumes
    reply_channel_range messages until the replied block intervals merge into
    a single interval that covers the whole queried range and the last reply
    is flagged complete.

    Returns (ids, complete): the set of received short channel ids and the
    `complete` flag of the last reply.
    Raises if the replied intervals are inconsistent.
    """
    first_block = constants.net.BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS
    num_blocks = self.lnworker.network.get_local_height() - first_block
    self.query_channel_range(first_block, num_blocks)
    intervals = []
    ids = set()
    # note: implementations behave differently...
    # "sane implementation that follows BOLT-07" example:
    # query_channel_range. <<< first_block 497000, num_blocks 79038
    # on_reply_channel_range. >>> first_block 497000, num_blocks 39516, num_ids 4648, complete True
    # on_reply_channel_range. >>> first_block 536516, num_blocks 19758, num_ids 5734, complete True
    # on_reply_channel_range. >>> first_block 556274, num_blocks 9879, num_ids 13712, complete True
    # on_reply_channel_range. >>> first_block 566153, num_blocks 9885, num_ids 18114, complete True
    # lnd example:
    # query_channel_range. <<< first_block 497000, num_blocks 79038
    # on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
    # on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
    # on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
    # on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
    # on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 5344, complete True
    while True:
        index, num, complete, _ids = await self.reply_channel_range.get()
        ids.update(_ids)
        intervals.append((index, index+num))
        intervals.sort()
        # merge the two lowest intervals for as long as they touch or overlap
        while len(intervals) > 1:
            a,b = intervals[0]
            c,d = intervals[1]
            if not (a <= c and a <= b and c <= d):
                raise Exception(f"insane reply_channel_range intervals {(a,b,c,d)}")
            if b >= c:
                # the second interval may lie entirely inside the first (d < b),
                # so the merged end must be the larger of the two ends
                intervals = [(a, max(b, d))] + intervals[2:]
            else:
                break
        if len(intervals) == 1 and complete:
            a, b = intervals[0]
            if a <= first_block and b >= first_block + num_blocks:
                break
    return ids, complete
426
def request_gossip(self, timestamp=0):
    """Send a gossip_timestamp_filter asking for gossip from `timestamp` onwards (0 = everything)."""
    if timestamp != 0:
        self.logger.info(f'requesting channel graph since {datetime.fromtimestamp(timestamp).ctime()}')
    else:
        self.logger.info('requesting whole channel graph')
    filter_fields = {
        'chain_hash': constants.net.rev_genesis_bytes(),
        'first_timestamp': timestamp,
        'timestamp_range': b'\xff'*4,
    }
    self.send_message('gossip_timestamp_filter', **filter_fields)
437
def query_channel_range(self, first_block, num_blocks):
    """Send a query_channel_range message for `num_blocks` blocks starting at `first_block`."""
    self.logger.info(f'query channel range {first_block} {num_blocks}')
    query_fields = {
        'chain_hash': constants.net.rev_genesis_bytes(),
        'first_blocknum': first_block,
        'number_of_blocks': num_blocks,
    }
    self.send_message('query_channel_range', **query_fields)
445
def decode_short_ids(self, encoded):
    """Split an encoded_short_ids blob into 8-byte short channel ids.

    The first byte selects the encoding: 0 = uncompressed, 1 = zlib.
    Any other value raises.
    """
    scheme = encoded[0]
    if scheme not in (0, 1):
        raise Exception(f'decode_short_ids: unexpected first byte: {encoded[0]}')
    raw_ids = encoded[1:]
    if scheme == 1:
        raw_ids = zlib.decompress(raw_ids)
    ids = []
    for start in range(0, len(raw_ids), 8):
        ids.append(raw_ids[start:start + 8])
    return ids
455
def on_reply_channel_range(self, payload):
    """Decode a reply_channel_range message and queue it for get_channel_range()."""
    first_blocknum = payload['first_blocknum']
    number_of_blocks = payload['number_of_blocks']
    is_complete = bool(int.from_bytes(payload['complete'], 'big'))
    short_ids = self.decode_short_ids(payload['encoded_short_ids'])
    #self.logger.info(f"on_reply_channel_range. >>> first_block {first}, num_blocks {num}, num_ids {len(ids)}, complete {repr(payload['complete'])}")
    reply = (first_blocknum, number_of_blocks, is_complete, short_ids)
    self.reply_channel_range.put_nowait(reply)
464
async def get_short_channel_ids(self, ids):
    """Query the given short channel ids and wait until the remote signals the end of its reply."""
    self.logger.info(f'Querying {len(ids)} short_channel_ids')
    reply_done = self.querying
    # only one query may be in flight at a time
    assert not reply_done.is_set()
    self.query_short_channel_ids(ids)
    await reply_done.wait()
    reply_done.clear()
471
def query_short_channel_ids(self, ids, compressed=True):
    """Send a query_short_channel_ids message for `ids` (sorted; zlib-compressed by default)."""
    joined = b''.join(sorted(ids))
    if compressed:
        prefix, encoded = b'\x01', zlib.compress(joined)
    else:
        prefix, encoded = b'\x00', joined
    self.send_message(
        'query_short_channel_ids',
        chain_hash=constants.net.rev_genesis_bytes(),
        len=1+len(encoded),  # +1 for the encoding prefix byte
        encoded_short_ids=prefix+encoded)
482
async def _message_loop(self):
    """Initialize the connection, then process incoming messages until the transport ends.

    Raises GracefulDisconnect if initialization fails or times out.
    """
    init_coro = self.initialize()
    try:
        await asyncio.wait_for(init_coro, LN_P2P_NETWORK_TIMEOUT)
    except (OSError, asyncio.TimeoutError, HandshakeFailed) as exc:
        raise GracefulDisconnect(f'initialize failed: {repr(exc)}') from exc
    async for raw_msg in self.transport.read_messages():
        self.process_message(raw_msg)
        # brief pause between messages, yielding to other tasks
        await asyncio.sleep(.01)
491
def on_reply_short_channel_ids_end(self, payload):
    """Signal get_short_channel_ids() that the remote finished answering the query."""
    query_finished = self.querying
    query_finished.set()
494
def close_and_cleanup(self):
    """Close the transport (best effort), notify the lnworker and flag the disconnect.

    A failure while closing the transport is logged and otherwise ignored, so
    the remaining cleanup always runs.
    """
    try:
        if self.transport:
            self.transport.close()
    except Exception as e:
        # best effort: the connection may already be gone
        self.logger.debug(f"ignoring error while closing transport: {e!r}")
    self.lnworker.peer_closed(self)
    self.got_disconnected.set()
503
def is_static_remotekey(self):
    """Whether self.features supports OPTION_STATIC_REMOTEKEY_OPT."""
    wanted = LnFeatures.OPTION_STATIC_REMOTEKEY_OPT
    return self.features.supports(wanted)
506
def is_upfront_shutdown_script(self):
    """Whether self.features supports OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT."""
    wanted = LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT
    return self.features.supports(wanted)
509
def upfront_shutdown_script_from_payload(self, payload, msg_identifier: str) -> Optional[bytes]:
    """Extract the remote's upfront shutdown script from an open/accept channel payload.

    msg_identifier: 'open' or 'accept', selecting which "<id>_channel_tlvs"
        entry of the payload to read; anything else raises ValueError.

    Returns the script, or b'' when the TLV is absent/empty or the feature is
    not supported.
    """
    if msg_identifier not in ['accept', 'open']:
        raise ValueError("msg_identifier must be either 'accept' or 'open'")

    channel_tlvs = payload[msg_identifier + '_channel_tlvs']
    uss_tlv = channel_tlvs.get('upfront_shutdown_script')

    upfront_shutdown_script = b''
    if uss_tlv and self.is_upfront_shutdown_script():
        upfront_shutdown_script = uss_tlv['shutdown_scriptpubkey']
    self.logger.info(f"upfront shutdown script received: {upfront_shutdown_script}")
    return upfront_shutdown_script
523
def make_local_config(self, funding_sat: int, push_msat: int, initiator: HTLCOwner) -> LocalConfig:
    """Build and validate our LocalConfig for a new channel of `funding_sat` satoshis.

    initiator: LOCAL if we fund the channel (our initial balance is the
        funding amount minus push_msat), otherwise our initial balance is
        push_msat.
    """
    channel_seed = os.urandom(32)
    if initiator == LOCAL:
        initial_msat = funding_sat * 1000 - push_msat
    else:
        initial_msat = push_msat

    # sending empty bytes as the upfront_shutdown_script will give us the
    # flexibility to decide an address at closing time
    upfront_shutdown_script = b''

    static_remotekey = None
    if self.is_static_remotekey():
        wallet = self.lnworker.wallet
        assert wallet.txin_type == 'p2wpkh'
        sweep_addr = wallet.get_new_sweep_address_for_channel()
        static_remotekey = bfh(wallet.get_public_key(sweep_addr))

    dust_limit_sat = bitcoin.DUST_LIMIT_DEFAULT_SAT_LEGACY
    # reserve is 1% of the channel, but never below the dust limit
    reserve_sat = max(funding_sat // 100, dust_limit_sat)
    # for comparison of defaults, see
    # https://github.com/ACINQ/eclair/blob/afa378fbb73c265da44856b4ad0f2128a88ae6c6/eclair-core/src/main/resources/reference.conf#L66
    # https://github.com/ElementsProject/lightning/blob/0056dd75572a8857cff36fcbdb1a2295a1ac9253/lightningd/options.c#L657
    # https://github.com/lightningnetwork/lnd/blob/56b61078c5b2be007d318673a5f3b40c6346883a/config.go#L81
    config_params = dict(
        channel_seed=channel_seed,
        static_remotekey=static_remotekey,
        upfront_shutdown_script=upfront_shutdown_script,
        to_self_delay=self.network.config.get('lightning_to_self_delay', 7 * 144),
        dust_limit_sat=dust_limit_sat,
        max_htlc_value_in_flight_msat=funding_sat * 1000,
        max_accepted_htlcs=30,
        initial_msat=initial_msat,
        reserve_sat=reserve_sat,
        funding_locked_received=False,
        was_announced=False,
        current_commitment_signature=None,
        current_htlc_signatures=b'',
        htlc_minimum_msat=1,
    )
    local_config = LocalConfig.from_seed(**config_params)
    local_config.validate_params(funding_sat=funding_sat)
    return local_config
564
def temporarily_reserve_funding_tx_change_address(func):
    """Decorator: mark the funding tx's change addresses as reserved while `func` runs.

    The wrapped coroutine must be called with a `funding_tx` keyword argument.
    """
    # During the channel open flow, if we initiated, we might have used a change address
    # of ours in the funding tx. The funding tx is not part of the wallet history
    # at that point yet, but we should already consider this change address as 'used'.
    @functools.wraps(func)
    async def wrapper(self: 'Peer', *args, **kwargs):
        funding_tx = kwargs['funding_tx']  # type: PartialTransaction
        wallet = self.lnworker.wallet
        change_addresses = []
        for txout in funding_tx.outputs():
            if wallet.is_change(txout.address):
                change_addresses.append(txout.address)
        for change_addr in change_addresses:
            wallet.set_reserved_state_of_address(change_addr, reserved=True)
        try:
            return await func(self, *args, **kwargs)
        finally:
            # always release the reservation, whether or not the flow succeeded
            for change_addr in change_addresses:
                self.lnworker.wallet.set_reserved_state_of_address(change_addr, reserved=False)
    return wrapper
583
@log_exceptions
@temporarily_reserve_funding_tx_change_address
async def channel_establishment_flow(
        self, *,
        funding_tx: 'PartialTransaction',
        funding_sat: int,
        push_msat: int,
        temp_channel_id: bytes
) -> Tuple[Channel, 'PartialTransaction']:
    """Implements the channel opening flow.

    -> open_channel message
    <- accept_channel message
    -> funding_created message
    <- funding_signed message

    Channel configurations are initialized in this method.

    funding_tx: transaction that funds the channel. It must contain a
        placeholder output paying `funding_sat` to ln_dummy_address(); that
        output is swapped for the real funding output here, and RBF is
        disabled on the transaction.
    funding_sat: channel capacity in satoshis.
    push_msat: amount (in msat) that starts on the remote side.
    temp_channel_id: temporary id used in open_channel / funding_created and
        for matching the accept_channel reply.

    Returns the new Channel (set to state OPENING and registered with the
    lnworker) together with the updated funding_tx.

    Raises on timeouts, on errors reported by the remote, and whenever a
    local or remote parameter fails one of the checks below.
    """
    # will raise if init fails
    await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT)
    # trampoline is not yet in features
    # (without a channel_db we only open channels to known trampoline peers)
    if not self.lnworker.channel_db and not self.lnworker.is_trampoline_peer(self.pubkey):
        raise Exception('Not a trampoline node: ' + str(self.their_features))

    feerate = self.lnworker.current_feerate_per_kw()
    local_config = self.make_local_config(funding_sat, push_msat, LOCAL)

    # sanity checks on the amounts we are about to propose
    if funding_sat > LN_MAX_FUNDING_SAT:
        raise Exception(
            f"MUST set funding_satoshis to less than 2^24 satoshi. "
            f"{funding_sat} sat > {LN_MAX_FUNDING_SAT}")
    if push_msat > 1000 * funding_sat:
        raise Exception(
            f"MUST set push_msat to equal or less than 1000 * funding_satoshis: "
            f"{push_msat} msat > {1000 * funding_sat} msat")
    if funding_sat < lnutil.MIN_FUNDING_SAT:
        raise Exception(f"funding_sat too low: {funding_sat} < {lnutil.MIN_FUNDING_SAT}")

    # for the first commitment transaction
    per_commitment_secret_first = get_per_commitment_secret_from_seed(
        local_config.per_commitment_secret_seed,
        RevocationStore.START_INDEX
    )
    per_commitment_point_first = secret_to_pubkey(
        int.from_bytes(per_commitment_secret_first, 'big'))
    # -> open_channel
    self.send_message(
        "open_channel",
        temporary_channel_id=temp_channel_id,
        chain_hash=constants.net.rev_genesis_bytes(),
        funding_satoshis=funding_sat,
        push_msat=push_msat,
        dust_limit_satoshis=local_config.dust_limit_sat,
        feerate_per_kw=feerate,
        max_accepted_htlcs=local_config.max_accepted_htlcs,
        funding_pubkey=local_config.multisig_key.pubkey,
        revocation_basepoint=local_config.revocation_basepoint.pubkey,
        htlc_basepoint=local_config.htlc_basepoint.pubkey,
        payment_basepoint=local_config.payment_basepoint.pubkey,
        delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
        first_per_commitment_point=per_commitment_point_first,
        to_self_delay=local_config.to_self_delay,
        max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
        channel_flags=0x00,  # not willing to announce channel
        channel_reserve_satoshis=local_config.reserve_sat,
        htlc_minimum_msat=local_config.htlc_minimum_msat,
        open_channel_tlvs={
            'upfront_shutdown_script':
                {'shutdown_scriptpubkey': local_config.upfront_shutdown_script}
        }
    )

    # <- accept_channel
    payload = await self.wait_for_message('accept_channel', temp_channel_id)
    remote_per_commitment_point = payload['first_per_commitment_point']
    # only accept a confirmation depth in the range 1..30
    funding_txn_minimum_depth = payload['minimum_depth']
    if funding_txn_minimum_depth <= 0:
        raise Exception(f"minimum depth too low, {funding_txn_minimum_depth}")
    if funding_txn_minimum_depth > 30:
        raise Exception(f"minimum depth too high, {funding_txn_minimum_depth}")

    upfront_shutdown_script = self.upfront_shutdown_script_from_payload(
        payload, 'accept')

    # the remote's channel parameters, as announced in accept_channel
    remote_config = RemoteConfig(
        payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
        multisig_key=OnlyPubkeyKeypair(payload["funding_pubkey"]),
        htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
        delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
        revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
        to_self_delay=payload['to_self_delay'],
        dust_limit_sat=payload['dust_limit_satoshis'],
        max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
        max_accepted_htlcs=payload["max_accepted_htlcs"],
        initial_msat=push_msat,
        reserve_sat=payload["channel_reserve_satoshis"],
        htlc_minimum_msat=payload['htlc_minimum_msat'],
        next_per_commitment_point=remote_per_commitment_point,
        current_per_commitment_point=None,
        upfront_shutdown_script=upfront_shutdown_script
    )
    remote_config.validate_params(funding_sat=funding_sat)
    # The reserve the remote announced in accept_channel must not be below the
    # dust limit we sent in open_channel:
    # MUST reject the channel.
    if remote_config.reserve_sat < local_config.dust_limit_sat:
        raise Exception("violated constraint: remote_config.reserve_sat < local_config.dust_limit_sat")
    # The reserve we sent in open_channel must not be below the dust limit the
    # remote announced in accept_channel:
    # MUST reject the channel.
    if local_config.reserve_sat < remote_config.dust_limit_sat:
        raise Exception("violated constraint: local_config.reserve_sat < remote_config.dust_limit_sat")

    # -> funding created
    # replace dummy output in funding tx
    redeem_script = funding_output_script(local_config, remote_config)
    funding_address = bitcoin.redeem_script_to_address('p2wsh', redeem_script)
    funding_output = PartialTxOutput.from_address_and_value(funding_address, funding_sat)
    dummy_output = PartialTxOutput.from_address_and_value(ln_dummy_address(), funding_sat)
    # NOTE(review): relies on outputs() exposing the transaction's own list so
    # that remove() takes effect -- confirm against PartialTransaction.
    funding_tx.outputs().remove(dummy_output)
    funding_tx.add_outputs([funding_output])
    funding_tx.set_rbf(False)
    if not funding_tx.is_segwit():
        raise Exception('Funding transaction is not segwit')
    funding_txid = funding_tx.txid()
    assert funding_txid
    funding_index = funding_tx.outputs().index(funding_output)
    # build remote commitment transaction
    channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_index)
    outpoint = Outpoint(funding_txid, funding_index)
    constraints = ChannelConstraints(
        capacity=funding_sat,
        is_initiator=True,
        funding_txn_minimum_depth=funding_txn_minimum_depth
    )
    chan_dict = self.create_channel_storage(
        channel_id, outpoint, local_config, remote_config, constraints)
    chan = Channel(
        chan_dict,
        sweep_address=self.lnworker.sweep_address,
        lnworker=self.lnworker,
        initial_feerate=feerate
    )
    # remember which outpoints the funding tx spends
    chan.storage['funding_inputs'] = [txin.prevout.to_json() for txin in funding_tx.inputs()]
    if isinstance(self.transport, LNTransport):
        chan.add_or_update_peer_addr(self.transport.peer_addr)
    sig_64, _ = chan.sign_next_commitment()
    # from now on, errors sent for the temporary id are forwarded to the final channel id
    self.temp_id_to_id[temp_channel_id] = channel_id

    self.send_message("funding_created",
        temporary_channel_id=temp_channel_id,
        funding_txid=funding_txid_bytes,
        funding_output_index=funding_index,
        signature=sig_64)
    self.funding_created_sent.add(channel_id)

    # <- funding signed
    payload = await self.wait_for_message('funding_signed', channel_id)
    self.logger.info('received funding_signed')
    remote_sig = payload['signature']
    chan.receive_new_commitment(remote_sig, [])
    chan.open_with_first_pcp(remote_per_commitment_point, remote_sig)
    chan.set_state(ChannelState.OPENING)
    self.lnworker.add_new_channel(chan)
    return chan, funding_tx
746
def create_channel_storage(self, channel_id, outpoint, local_config, remote_config, constraints):
    """Build the initial StoredDict for a new channel, in state PREOPENING."""
    node_id_hex = self.pubkey.hex()
    channel_id_hex = channel_id.hex()
    initial_state = ChannelState.PREOPENING.name
    # stored because it cannot be "downgraded", per BOLT2
    static_remotekey_enabled = self.is_static_remotekey()
    chan_dict = {
        "node_id": node_id_hex,
        "channel_id": channel_id_hex,
        "short_channel_id": None,
        "funding_outpoint": outpoint,
        "remote_config": remote_config,
        "local_config": local_config,
        "constraints": constraints,
        "remote_update": None,
        "state": initial_state,
        'onion_keys': {},
        'data_loss_protect_remote_pcp': {},
        "log": {},
        "revocation_store": {},
        "static_remotekey_enabled": static_remotekey_enabled,
    }
    if self.lnworker:
        db = self.lnworker.db
    else:
        db = None
    return StoredDict(chan_dict, db, [])
765
async def on_open_channel(self, payload):
    """Run the accepting side of the channel establishment handshake.

    Message flow:

        <- open_channel      (``payload``)
        -> accept_channel
        <- funding_created
        -> funding_signed

    Both the local and the remote channel configuration are created here.
    A violated requirement on the received ``open_channel`` is reported by
    raising ``Exception``. On success the new channel is put into the
    OPENING state and handed to ``self.lnworker.add_new_channel``.
    """
    # <- open_channel
    if payload['chain_hash'] != constants.net.rev_genesis_bytes():
        raise Exception('wrong chain_hash')
    funding_sat = payload['funding_satoshis']
    push_msat = payload['push_msat']
    feerate = payload['feerate_per_kw']  # note: we are not validating this
    temp_chan_id = payload['temporary_channel_id']
    local_config = self.make_local_config(funding_sat, push_msat, REMOTE)

    # amount limits
    if funding_sat > LN_MAX_FUNDING_SAT:
        raise Exception(
            "MUST set funding_satoshis to less than 2^24 satoshi. "
            f"{funding_sat} sat > {LN_MAX_FUNDING_SAT}")
    funding_msat = 1000 * funding_sat
    if push_msat > funding_msat:
        raise Exception(
            "MUST set push_msat to equal or less than 1000 * funding_satoshis: "
            f"{push_msat} msat > {funding_msat} msat")
    if funding_sat < lnutil.MIN_FUNDING_SAT:
        raise Exception(f"funding_sat too low: {funding_sat} < {lnutil.MIN_FUNDING_SAT}")

    upfront_shutdown_script = self.upfront_shutdown_script_from_payload(payload, 'open')

    remote_config = RemoteConfig(
        payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
        multisig_key=OnlyPubkeyKeypair(payload['funding_pubkey']),
        htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
        delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
        revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
        to_self_delay=payload['to_self_delay'],
        dust_limit_sat=payload['dust_limit_satoshis'],
        max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
        max_accepted_htlcs=payload['max_accepted_htlcs'],
        initial_msat=funding_msat - push_msat,
        reserve_sat=payload['channel_reserve_satoshis'],
        htlc_minimum_msat=payload['htlc_minimum_msat'],
        next_per_commitment_point=payload['first_per_commitment_point'],
        current_per_commitment_point=None,
        upfront_shutdown_script=upfront_shutdown_script,
    )
    remote_config.validate_params(funding_sat=funding_sat)

    # The receiving node MUST fail the channel if the funder's amount for the
    # initial commitment transaction is not sufficient for full fee payment.
    funder_fee = calc_fees_for_commitment_tx(
        num_htlcs=0,
        feerate=feerate,
        is_local_initiator=False)[REMOTE]
    if remote_config.initial_msat < funder_fee:
        raise Exception(
            "the funder's amount for the initial commitment transaction "
            "is not sufficient for full fee payment")

    # The receiving node MUST fail the channel if both to_local and to_remote
    # amounts for the initial commitment transaction are less than or equal to
    # channel_reserve_satoshis (see BOLT 3).
    reserve_msat = 1000 * payload['channel_reserve_satoshis']
    if local_config.initial_msat <= reserve_msat and remote_config.initial_msat <= reserve_msat:
        raise Exception(
            "both to_local and to_remote amounts for the initial commitment "
            "transaction are less than or equal to channel_reserve_satoshis")

    # note: payload['channel_flags'] (which carries e.g. 'announce_channel') is
    #       ignored. Even if the remote asks for the channel to be announced, we
    #       do not take part in announcing it, so the channel stays private.

    # -> accept_channel
    # per-commitment point for the first commitment transaction
    first_secret = get_per_commitment_secret_from_seed(
        local_config.per_commitment_secret_seed,
        RevocationStore.START_INDEX,
    )
    first_point = secret_to_pubkey(int.from_bytes(first_secret, 'big'))
    min_depth = 3
    shutdown_tlv = {'shutdown_scriptpubkey': local_config.upfront_shutdown_script}
    self.send_message(
        'accept_channel',
        temporary_channel_id=temp_chan_id,
        dust_limit_satoshis=local_config.dust_limit_sat,
        max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
        channel_reserve_satoshis=local_config.reserve_sat,
        htlc_minimum_msat=local_config.htlc_minimum_msat,
        minimum_depth=min_depth,
        to_self_delay=local_config.to_self_delay,
        max_accepted_htlcs=local_config.max_accepted_htlcs,
        funding_pubkey=local_config.multisig_key.pubkey,
        revocation_basepoint=local_config.revocation_basepoint.pubkey,
        payment_basepoint=local_config.payment_basepoint.pubkey,
        delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
        htlc_basepoint=local_config.htlc_basepoint.pubkey,
        first_per_commitment_point=first_point,
        accept_channel_tlvs={'upfront_shutdown_script': shutdown_tlv},
    )

    # <- funding_created
    funding_created = await self.wait_for_message('funding_created', temp_chan_id)

    # -> funding_signed
    funding_idx = funding_created['funding_output_index']
    # the txid arrives in wire byte order; reverse it before hex-encoding
    funding_txid = bh2u(funding_created['funding_txid'][::-1])
    channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_idx)
    constraints = ChannelConstraints(
        capacity=funding_sat,
        is_initiator=False,
        funding_txn_minimum_depth=min_depth,
    )
    outpoint = Outpoint(funding_txid, funding_idx)
    chan_dict = self.create_channel_storage(
        channel_id, outpoint, local_config, remote_config, constraints)
    chan = Channel(
        chan_dict,
        sweep_address=self.lnworker.sweep_address,
        lnworker=self.lnworker,
        initial_feerate=feerate,
    )
    chan.storage['init_timestamp'] = int(time.time())
    if isinstance(self.transport, LNTransport):
        chan.add_or_update_peer_addr(self.transport.peer_addr)

    # verify their signature on our first commitment, then sign theirs
    remote_sig = funding_created['signature']
    chan.receive_new_commitment(remote_sig, [])
    sig_64, _ = chan.sign_next_commitment()
    self.send_message(
        'funding_signed',
        channel_id=channel_id,
        signature=sig_64,
    )
    self.funding_signed_sent.add(chan.channel_id)
    chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig)
    chan.set_state(ChannelState.OPENING)
    self.lnworker.add_new_channel(chan)
905
async def trigger_force_close(self, channel_id: bytes):
    """Send a ``channel_reestablish`` for *channel_id* with all counters zeroed.

    Waits for ``self.initialized`` before sending.
    NOTE(review): presumably the zeroed counters are meant to make the remote
    peer force-close the channel, as the method name suggests; confirm
    against the callers.
    """
    await self.initialized
    # we need a valid point (BOLT2)
    placeholder_point = secret_to_pubkey(42)
    reestablish_fields = dict(
        channel_id=channel_id,
        next_commitment_number=0,
        next_revocation_number=0,
        your_last_per_commitment_secret=0,
        my_current_per_commitment_point=placeholder_point,
    )
    self.send_message("channel_reestablish", **reestablish_fields)
916
async def reestablish_channel(self, chan: Channel):
    """Exchange ``channel_reestablish`` with the peer and resynchronise *chan*.

    Steps, in order:
      1. send our ``channel_reestablish`` and wait for theirs,
      2. replay our local updates the peer has not acknowledged,
      3. compare commitment numbers to find out whether either side is ahead,
         re-sending our last ``revoke_and_ack`` if the peer lacks exactly that,
      4. validate the data-loss-protect fields,
      5. either mark the peer state BAD (they are ahead), try to force-close
         (we are ahead), or mark the peer state GOOD and resume operation.

    Returns early without doing anything if the channel's peer state is not
    DISCONNECTED. Raises ``RemoteMisbehaving`` on invalid received values.
    """
    await self.initialized
    chan_id = chan.channel_id
    assert ChannelState.PREOPENING < chan.get_state() < ChannelState.FORCE_CLOSING
    if chan.peer_state != PeerState.DISCONNECTED:
        self.logger.info(
            f'reestablish_channel was called but channel {chan.get_id_for_log()} '
            f'already in peer_state {chan.peer_state!r}')
        return
    chan.peer_state = PeerState.REESTABLISHING
    util.trigger_callback('channel', self.lnworker.wallet, chan)
    # BOLT-02: "A node [...] upon disconnection [...] MUST reverse any uncommitted updates sent by the other side"
    chan.hm.discard_unsigned_remote_updates()

    # snapshot of the commitment numbers on both sides
    oldest_unrevoked_local_ctn = chan.get_oldest_unrevoked_ctn(LOCAL)
    latest_local_ctn = chan.get_latest_ctn(LOCAL)
    next_local_ctn = chan.get_next_ctn(LOCAL)
    oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
    latest_remote_ctn = chan.get_latest_ctn(REMOTE)
    next_remote_ctn = chan.get_next_ctn(REMOTE)
    assert self.features.supports(LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT)

    # --- send our channel_reestablish ---
    point_ctn = 0 if chan.is_static_remotekey_enabled() else latest_local_ctn
    _, latest_point = chan.get_secret_and_point(LOCAL, point_ctn)
    if oldest_unrevoked_remote_ctn == 0:
        last_rev_secret = 0
    else:
        last_rev_index = oldest_unrevoked_remote_ctn - 1
        last_rev_secret = chan.revocation_store.retrieve_secret(
            RevocationStore.START_INDEX - last_rev_index)
    self.send_message(
        "channel_reestablish",
        channel_id=chan_id,
        next_commitment_number=next_local_ctn,
        next_revocation_number=oldest_unrevoked_remote_ctn,
        your_last_per_commitment_secret=last_rev_secret,
        my_current_per_commitment_point=latest_point)
    self.logger.info(
        f'channel_reestablish ({chan.get_id_for_log()}): sent channel_reestablish with '
        f'(next_local_ctn={next_local_ctn}, '
        f'oldest_unrevoked_remote_ctn={oldest_unrevoked_remote_ctn})')

    # --- wait for their channel_reestablish, retrying on every timeout ---
    while True:
        try:
            msg = await self.wait_for_message('channel_reestablish', chan_id)
        except asyncio.TimeoutError:
            self.logger.info('waiting to receive channel_reestablish...')
        else:
            break
    their_next_local_ctn = msg["next_commitment_number"]
    their_oldest_unrevoked_remote_ctn = msg["next_revocation_number"]
    their_local_pcp = msg.get("my_current_per_commitment_point")
    their_claim_of_our_last_per_commitment_secret = msg.get("your_last_per_commitment_secret")
    self.logger.info(
        f'channel_reestablish ({chan.get_id_for_log()}): received channel_reestablish with '
        f'(their_next_local_ctn={their_next_local_ctn}, '
        f'their_oldest_unrevoked_remote_ctn={their_oldest_unrevoked_remote_ctn})')

    # sanity checks of received values
    if their_next_local_ctn < 0:
        raise RemoteMisbehaving("channel reestablish: their_next_local_ctn < 0")
    if their_oldest_unrevoked_remote_ctn < 0:
        raise RemoteMisbehaving("channel reestablish: their_oldest_unrevoked_remote_ctn < 0")

    # Replay un-acked local updates (including commitment_signed) byte-for-byte.
    # If the peer "lost" a commitment signature we sent (due to the disconnect),
    # replaying the very same updates keeps them from ending up with two or more
    # valid signed commitment transactions at one ctn. Several valid ctxs at the
    # same ctn would be a major headache for pre-signed spending txns (e.g. for
    # watchtowers). Updates are replayed even if they were not yet committed.
    n_replayed_msgs = 0
    for ctn, raw_messages in chan.hm.get_unacked_local_updates().items():
        # Anything below their_next_local_ctn they claim to have received
        # together with its commitment_signed, so it must not be replayed.
        if ctn >= their_next_local_ctn:
            for raw_upd_msg in raw_messages:
                self.transport.send_bytes(raw_upd_msg)
                n_replayed_msgs += 1
    self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replayed {n_replayed_msgs} unacked messages')

    we_are_ahead = False
    they_are_ahead = False

    # compare remote ctns
    if next_remote_ctn != their_next_local_ctn:
        # If a revoke_and_ack from them is still pending, the replay above should
        # have contained the missing commitment_signed and thereby fixed this.
        fixed_by_replay = (their_next_local_ctn == latest_remote_ctn
                           and chan.hm.is_revack_pending(REMOTE))
        if not fixed_by_replay:
            self.logger.warning(
                f"channel_reestablish ({chan.get_id_for_log()}): "
                f"expected remote ctn {next_remote_ctn}, got {their_next_local_ctn}")
            if their_next_local_ctn < next_remote_ctn:
                we_are_ahead = True
            else:
                they_are_ahead = True

    # compare local ctns
    if oldest_unrevoked_local_ctn != their_oldest_unrevoked_remote_ctn:
        if oldest_unrevoked_local_ctn - 1 == their_oldest_unrevoked_remote_ctn:
            # A node:
            #    if next_revocation_number is equal to the commitment number of the
            #    last revoke_and_ack the receiving node sent, AND the receiving node
            #    hasn't already received a closing_signed:
            #        MUST re-send the revoke_and_ack.
            last_secret, _ = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn - 1)
            _, next_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn + 1)
            self.send_message(
                "revoke_and_ack",
                channel_id=chan.channel_id,
                per_commitment_secret=last_secret,
                next_per_commitment_point=next_point)
        else:
            self.logger.warning(
                f"channel_reestablish ({chan.get_id_for_log()}): "
                f"expected local ctn {oldest_unrevoked_local_ctn}, got {their_oldest_unrevoked_remote_ctn}")
            if their_oldest_unrevoked_remote_ctn < oldest_unrevoked_local_ctn:
                we_are_ahead = True
            else:
                they_are_ahead = True

    # option_data_loss_protect
    def are_datalossprotect_fields_valid() -> bool:
        """Check the peer's DLP fields against what we can derive ourselves."""
        if their_local_pcp is None or their_claim_of_our_last_per_commitment_secret is None:
            return False
        # the secret they should hold for our last revoked commitment
        if their_oldest_unrevoked_remote_ctn > 0:
            our_pcs, __ = chan.get_secret_and_point(LOCAL, their_oldest_unrevoked_remote_ctn - 1)
        else:
            assert their_oldest_unrevoked_remote_ctn == 0
            our_pcs = bytes(32)
        if our_pcs != their_claim_of_our_last_per_commitment_secret:
            self.logger.error(
                f"channel_reestablish ({chan.get_id_for_log()}): "
                f"(DLP) local PCS mismatch: {bh2u(our_pcs)} != {bh2u(their_claim_of_our_last_per_commitment_secret)}")
            return False
        if chan.is_static_remotekey_enabled():
            return True
        try:
            __, our_remote_pcp = chan.get_secret_and_point(REMOTE, their_next_local_ctn - 1)
        except RemoteCtnTooFarInFuture:
            # we cannot derive that point, so there is nothing to compare
            return True
        if our_remote_pcp != their_local_pcp:
            self.logger.error(
                f"channel_reestablish ({chan.get_id_for_log()}): "
                f"(DLP) remote PCP mismatch: {bh2u(our_remote_pcp)} != {bh2u(their_local_pcp)}")
            return False
        return True

    if not are_datalossprotect_fields_valid():
        raise RemoteMisbehaving("channel_reestablish: data loss protect fields invalid")

    if they_are_ahead:
        self.logger.warning(
            f"channel_reestablish ({chan.get_id_for_log()}): "
            f"remote is ahead of us! They should force-close. Remote PCP: {bh2u(their_local_pcp)}")
        # data_loss_protect_remote_pcp is used in lnsweep
        chan.set_data_loss_protect_remote_pcp(their_next_local_ctn - 1, their_local_pcp)
        self.lnworker.save_channel(chan)
        chan.peer_state = PeerState.BAD
        return
    if we_are_ahead:
        self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): we are ahead of remote! trying to force-close.")
        await self.lnworker.try_force_closing(chan_id)
        return

    # both sides agree: resume normal operation
    chan.peer_state = PeerState.GOOD
    if chan.is_funded() and their_next_local_ctn == next_local_ctn == 1:
        self.send_funding_locked(chan)
    # checks done
    if chan.is_funded() and chan.config[LOCAL].funding_locked_received:
        self.mark_open(chan)
    util.trigger_callback('channel', self.lnworker.wallet, chan)
    # if we have sent a previous shutdown, it must be retransmitted (Bolt2)
    if chan.get_state() == ChannelState.SHUTDOWN:
        await self.send_shutdown(chan)
1083
def send_funding_locked(self, chan: Channel):
    """Send ``funding_locked`` for *chan*, carrying our second per-commitment point.

    Afterwards the channel is marked open if it is funded and the peer's
    ``funding_locked`` has already been received.
    """
    channel_id = chan.channel_id
    second_index = RevocationStore.START_INDEX - 1
    second_secret = get_per_commitment_secret_from_seed(
        chan.config[LOCAL].per_commitment_secret_seed, second_index)
    second_point = secret_to_pubkey(int.from_bytes(second_secret, 'big'))
    # note: if funding_locked was not yet received, we might send it multiple times
    self.send_message(
        "funding_locked",
        channel_id=channel_id,
        next_per_commitment_point=second_point)
    if chan.is_funded() and chan.config[LOCAL].funding_locked_received:
        self.mark_open(chan)
1093
def on_funding_locked(self, chan: Channel, payload):
    """Handle the peer's ``funding_locked`` message for *chan*.

    The first time it arrives, the peer's next per-commitment point is stored,
    the local ``funding_locked_received`` flag is set and the channel is saved.
    In every case the channel is then marked open if it is funded.
    """
    self.logger.info(f"on_funding_locked. channel: {bh2u(chan.channel_id)}")
    already_received = chan.config[LOCAL].funding_locked_received
    if not already_received:
        chan.config[REMOTE].next_per_commitment_point = payload["next_per_commitment_point"]
        chan.config[LOCAL].funding_locked_received = True
        self.lnworker.save_channel(chan)
    if chan.is_funded():
        self.mark_open(chan)
1103
def on_network_update(self, chan: Channel, funding_tx_depth: int):
    """
    Only called when the channel is OPEN.

    Runs on the Network thread.

    No reachable statement in this method has a side effect: channel
    announcement is switched off by the early return below.
    """
    announceable = not chan.config[LOCAL].was_announced and funding_tx_depth >= 6
    if announceable:
        # don't announce our channels
        # FIXME should this be a field in chan.local_state maybe?
        return
        # NOTE(review): the statements below are unreachable. This assumes they
        # sit inside this branch, after the return (the indentation could not be
        # recovered from the listing); confirm against upstream.
        chan.config[LOCAL].was_announced = True
        self.lnworker.save_channel(chan)
        coro = self.handle_announcements(chan)
        asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
1118
@log_exceptions
async def handle_announcements(self, chan: Channel):
    """Exchange announcement signatures for *chan* and send ``channel_announcement``.

    Our own signatures are sent first; the peer's are then awaited from
    ``self.announcement_signatures[chan.channel_id]`` and verified against the
    peer's funding key and node key. Raises ``Exception`` if either remote
    signature does not verify. The two sides' fields are ordered by comparing
    ``self.node_ids[0]`` with ``self.node_ids[1]``.
    """
    msg_hash, local_node_sig, local_bitcoin_sig = self.send_announcement_signatures(chan)
    their_msg = await self.announcement_signatures[chan.channel_id].get()
    remote_node_sig = their_msg["node_signature"]
    remote_bitcoin_sig = their_msg["bitcoin_signature"]

    remote_funding_key = chan.config[REMOTE].multisig_key.pubkey
    if not ecc.verify_signature(remote_funding_key, remote_bitcoin_sig, msg_hash):
        raise Exception("bitcoin_sig invalid in announcement_signatures")
    if not ecc.verify_signature(self.pubkey, remote_node_sig, msg_hash):
        raise Exception("node_sig invalid in announcement_signatures")

    # entries are collected in (remote, local) order ...
    node_sigs = [remote_node_sig, local_node_sig]
    bitcoin_sigs = [remote_bitcoin_sig, local_bitcoin_sig]
    bitcoin_keys = [chan.config[REMOTE].multisig_key.pubkey, chan.config[LOCAL].multisig_key.pubkey]
    node_ids = self.node_ids
    # ... and all flipped together when the first node id is the larger one
    if self.node_ids[0] > self.node_ids[1]:
        node_ids = list(reversed(self.node_ids))
        for seq in (node_sigs, bitcoin_sigs, bitcoin_keys):
            seq.reverse()

    sig_1, sig_2 = node_sigs[0], node_sigs[1]
    btc_sig_1, btc_sig_2 = bitcoin_sigs[0], bitcoin_sigs[1]
    self.send_message(
        "channel_announcement",
        node_signatures_1=sig_1,
        node_signatures_2=sig_2,
        bitcoin_signature_1=btc_sig_1,
        bitcoin_signature_2=btc_sig_2,
        len=0,
        # features not set (defaults to zeros)
        chain_hash=constants.net.rev_genesis_bytes(),
        short_channel_id=chan.short_channel_id,
        node_id_1=node_ids[0],
        node_id_2=node_ids[1],
        bitcoin_key_1=bitcoin_keys[0],
        bitcoin_key_2=bitcoin_keys[1],
    )
1156
def mark_open(self, chan: Channel):
    """Move *chan* from FUNDED to OPEN; do nothing for any other state.

    After the transition, a previously stored orphan channel update for the
    channel's short id is applied, and, if the 'lightning_forward_payments'
    config option is set, our outgoing channel update is sent to the peer.
    """
    assert chan.is_funded()
    # the only permitted transition is "FUNDED" -> "OPEN"
    old_state = chan.get_state()
    if old_state == ChannelState.OPEN:
        return
    if old_state != ChannelState.FUNDED:
        self.logger.info(f"cannot mark open ({chan.get_id_for_log()}), current state: {old_state!r}")
        return
    assert chan.config[LOCAL].funding_locked_received
    chan.set_state(ChannelState.OPEN)
    util.trigger_callback('channel', self.lnworker.wallet, chan)
    # the peer may already have sent a channel update for the incoming direction
    pending_channel_update = self.orphan_channel_updates.get(chan.short_channel_id)
    if pending_channel_update:
        chan.set_remote_update(pending_channel_update['raw'])
    self.logger.info(f"CHANNEL OPENING COMPLETED ({chan.get_id_for_log()})")
    if not self.network.config.get('lightning_forward_payments', False):
        return
    # tell the peer about our outgoing edge so the channel can be used
    # to receive payments
    self.logger.info(f"sending channel update for outgoing edge ({chan.get_id_for_log()})")
    self.transport.send_bytes(chan.get_outgoing_gossip_channel_update())
1181
def send_announcement_signatures(self, chan: Channel):
    """Sign the channel announcement of *chan* and send ``announcement_signatures``.

    The signed digest is the double-SHA256 of the unsigned announcement with
    its first ``256+2`` bytes skipped. Returns the tuple
    ``(msg_hash, node_signature, bitcoin_signature)``.
    """
    unsigned_ann = chan.construct_channel_announcement_without_sigs()
    msg_hash = sha256d(unsigned_ann[256+2:])
    funding_privkey = ecc.ECPrivkey(chan.config[LOCAL].multisig_key.privkey)
    bitcoin_signature = funding_privkey.sign(msg_hash, sig_string_from_r_and_s)
    node_privkey = ecc.ECPrivkey(self.privkey)
    node_signature = node_privkey.sign(msg_hash, sig_string_from_r_and_s)
    self.send_message(
        "announcement_signatures",
        channel_id=chan.channel_id,
        short_channel_id=chan.short_channel_id,
        node_signature=node_signature,
        bitcoin_signature=bitcoin_signature,
    )
    return msg_hash, node_signature, bitcoin_signature
1195
def on_update_fail_htlc(self, chan: Channel, payload):
    """Handle ``update_fail_htlc``: record the failure on *chan*, then sign if possible."""
    htlc_id, reason = payload["id"], payload["reason"]
    self.logger.info(f"on_update_fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
    # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
    chan.receive_fail_htlc(htlc_id, error_bytes=reason)
    self.maybe_send_commitment(chan)
1202
def maybe_send_commitment(self, chan: Channel):
    """Sign and send the next remote commitment for *chan* when allowed.

    Nothing is sent while the remote's revoke_and_ack is still pending
    (REMOTE has to revoke before we may sign a new ctx), nor when there are
    no pending changes (in which case we will not, and must not, send one).
    """
    if chan.hm.is_revack_pending(REMOTE) or not chan.has_pending_changes(REMOTE):
        return
    self.logger.info(f'send_commitment. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(REMOTE)}.')
    sig_64, htlc_sigs = chan.sign_next_commitment()
    self.send_message(
        "commitment_signed",
        channel_id=chan.channel_id,
        signature=sig_64,
        num_htlcs=len(htlc_sigs),
        htlc_signature=b"".join(htlc_sigs),
    )
1213
def pay(self, *,
        route: 'LNPaymentRoute',
        chan: Channel,
        amount_msat: int,
        total_msat: int,
        payment_hash: bytes,
        min_final_cltv_expiry: int,
        payment_secret: bytes = None,
        trampoline_onion=None) -> UpdateAddHtlc:
    """Offer an HTLC along *route* over *chan* and return the added HTLC.

    Builds the per-hop payloads and the onion packet, adds the HTLC to the
    channel, sends ``update_add_htlc`` and then tries to send a commitment.
    If *trampoline_onion* is given, it is embedded into one of the hop
    payloads before the onion is built.

    Raises ``PaymentFailure`` if the channel cannot send ``update_add_htlc``
    or if the resulting HTLC expiry lies too far in the future.
    """
    assert amount_msat > 0, "amount_msat is not greater zero"
    assert len(route) > 0
    if not chan.can_send_update_add_htlc():
        raise PaymentFailure("Channel cannot send update_add_htlc")
    # add features learned during "init" for direct neighbour:
    route[0].node_features |= self.features
    local_height = self.network.get_local_height()
    final_cltv = local_height + min_final_cltv_expiry
    hops_data, amount_msat, cltv = calc_hops_data_for_payment(
        route,
        amount_msat,
        final_cltv,
        total_msat=total_msat,
        payment_secret=payment_secret)
    num_hops = len(hops_data)
    self.logger.info(f"lnpeer.pay len(route)={len(route)}")
    for i, edge in enumerate(route):
        self.logger.info(f" {i}: edge={edge.short_channel_id} hop_data={hops_data[i]!r}")
    assert final_cltv <= cltv, (final_cltv, cltv)
    session_key = os.urandom(32)  # session_key
    # when forwarding a trampoline payment, embed the trampoline onion
    if trampoline_onion:
        self.logger.info('adding trampoline onion to final payload')
        target_payload = hops_data[num_hops - 2].payload
        target_payload["trampoline_onion_packet"] = {
            "version": trampoline_onion.version,
            "public_key": trampoline_onion.public_key,
            "hops_data": trampoline_onion.hops_data,
            "hmac": trampoline_onion.hmac,
        }
    # create onion packet
    payment_path_pubkeys = [edge.node_id for edge in route]
    # must use another sessionkey
    onion = new_onion_packet(
        payment_path_pubkeys, session_key, hops_data, associated_data=payment_hash)
    self.logger.info(f"starting payment. len(route)={len(hops_data)}.")
    # create htlc
    if cltv > local_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE:
        raise PaymentFailure(f"htlc expiry too far into future. (in {cltv-local_height} blocks)")
    htlc = chan.add_htlc(UpdateAddHtlc(
        amount_msat=amount_msat,
        payment_hash=payment_hash,
        cltv_expiry=cltv,
        timestamp=int(time.time())))
    # should it be the outer onion secret?
    chan.set_onion_key(htlc.htlc_id, session_key)
    self.logger.info(f"starting payment. htlc: {htlc}")
    self.send_message(
        "update_add_htlc",
        channel_id=chan.channel_id,
        id=htlc.htlc_id,
        cltv_expiry=htlc.cltv_expiry,
        amount_msat=htlc.amount_msat,
        payment_hash=htlc.payment_hash,
        onion_routing_packet=onion.to_bytes())
    self.maybe_send_commitment(chan)
    return htlc
1275
def send_revoke_and_ack(self, chan: Channel):
    """Revoke our current commitment on *chan* and send ``revoke_and_ack``.

    The channel is saved after revoking and before the message is sent;
    afterwards a new commitment is sent if one is due.
    """
    self.logger.info(f'send_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(LOCAL)}')
    revocation = chan.revoke_current_commitment()
    self.lnworker.save_channel(chan)
    self.send_message(
        "revoke_and_ack",
        channel_id=chan.channel_id,
        per_commitment_secret=revocation.per_commitment_secret,
        next_per_commitment_point=revocation.next_per_commitment_point,
    )
    self.maybe_send_commitment(chan)
1285
def on_commitment_signed(self, chan: Channel, payload):
    """Handle ``commitment_signed``: accept the new commitment and revoke the old one.

    Ignored while the channel's peer state is BAD. Raises ``RemoteMisbehaving``
    if there are no pending changes to sign, or if our previous revocation is
    still outstanding.
    """
    if chan.peer_state == PeerState.BAD:
        return
    self.logger.info(f'on_commitment_signed. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(LOCAL)}.')
    # the ctx must actually have changed, otherwise the remote peer is misbehaving
    if not chan.has_pending_changes(LOCAL):
        # TODO if feerate changed A->B->A; so there were updates but the value is identical,
        #      then it might be legal to send a commitment_signature
        #      see https://github.com/lightningnetwork/lightning-rfc/pull/618
        raise RemoteMisbehaving('received commitment_signed without pending changes')
    # REMOTE should wait until we have revoked
    if chan.hm.is_revack_pending(LOCAL):
        raise RemoteMisbehaving('received commitment_signed before we revoked previous ctx')
    # the HTLC signatures arrive concatenated, 64 bytes each
    htlc_sigs = [*chunks(payload["htlc_signature"], 64)]
    chan.receive_new_commitment(payload["signature"], htlc_sigs)
    self.send_revoke_and_ack(chan)
1303
def on_update_fulfill_htlc(self, chan: Channel, payload):
    """Handle ``update_fulfill_htlc``: settle the HTLC and store the preimage.

    The preimage is saved under its SHA256 hash, then a commitment is sent
    if one is due.
    """
    preimage = payload["payment_preimage"]
    payment_hash = sha256(preimage)
    htlc_id = payload["id"]
    self.logger.info(f"on_update_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
    # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
    chan.receive_htlc_settle(preimage, htlc_id)
    self.lnworker.save_preimage(payment_hash, preimage)
    self.maybe_send_commitment(chan)
1312
def on_update_fail_malformed_htlc(self, chan: Channel, payload):
    """Handle ``update_fail_malformed_htlc`` for *chan*.

    A failure code without the BADONION bit schedules a force-close attempt
    and raises ``RemoteMisbehaving``. Otherwise the HTLC is failed with an
    ``OnionRoutingFailure`` built from the code and the reported onion hash.
    """
    htlc_id = payload["id"]
    failure_code = payload["failure_code"]
    self.logger.info(f"on_update_fail_malformed_htlc. chan {chan.get_id_for_log()}. "
                     f"htlc_id {htlc_id}. failure_code={failure_code}")
    has_badonion_bit = (failure_code & OnionFailureCodeMetaFlag.BADONION) != 0
    if not has_badonion_bit:
        asyncio.ensure_future(self.lnworker.try_force_closing(chan.channel_id))
        raise RemoteMisbehaving(f"received update_fail_malformed_htlc with unexpected failure code: {failure_code}")
    failure = OnionRoutingFailure(code=failure_code, data=payload["sha256_of_onion"])
    chan.receive_fail_htlc(htlc_id, error_bytes=None, reason=failure)
    self.maybe_send_commitment(chan)
1324
def on_update_add_htlc(self, chan: Channel, payload):
    """Handle ``update_add_htlc``: register the offered HTLC on *chan*.

    Raises ``RemoteMisbehaving`` if the channel is not OPEN, or if the HTLC's
    ``cltv_expiry`` exceeds ``bitcoin.NLOCKTIME_BLOCKHEIGHT_MAX`` (in which
    case a force-close attempt is scheduled first). On success the
    'htlc_added' callback is triggered.
    """
    payment_hash = payload["payment_hash"]
    htlc_id = payload["id"]
    cltv_expiry = payload["cltv_expiry"]
    amount_msat_htlc = payload["amount_msat"]
    onion_packet = payload["onion_routing_packet"]
    htlc = UpdateAddHtlc(
        amount_msat=amount_msat_htlc,
        payment_hash=payment_hash,
        cltv_expiry=cltv_expiry,
        timestamp=int(time.time()),
        htlc_id=htlc_id,
    )
    self.logger.info(f"on_update_add_htlc. chan {chan.short_channel_id}. htlc={htlc!s}")
    if chan.get_state() != ChannelState.OPEN:
        raise RemoteMisbehaving(f"received update_add_htlc while chan.get_state() != OPEN. state was {chan.get_state()!r}")
    if cltv_expiry > bitcoin.NLOCKTIME_BLOCKHEIGHT_MAX:
        asyncio.ensure_future(self.lnworker.try_force_closing(chan.channel_id))
        raise RemoteMisbehaving(f"received update_add_htlc with cltv_expiry > BLOCKHEIGHT_MAX. value was {cltv_expiry}")
    # add htlc
    chan.receive_htlc(htlc, onion_packet)
    util.trigger_callback('htlc_added', chan, htlc, RECEIVED)
1346
def maybe_forward_htlc(
        self, *,
        htlc: UpdateAddHtlc,
        processed_onion: ProcessedOnionPacket) -> Tuple[bytes, int]:
    """Forward the incoming *htlc* to the next hop named in *processed_onion*.

    Returns ``(next_chan_scid, htlc_id)``: the short channel id of the
    outgoing channel and the id of the HTLC added to it.

    Raises ``OnionRoutingFailure`` when the HTLC cannot be forwarded:
    forwarding disabled, stale chain tip, missing or malformed onion fields,
    unknown or unusable outgoing channel, bad CLTV values, insufficient fee,
    or a failure while sending ``update_add_htlc`` to the next peer.
    """
    # Forward HTLC
    # FIXME: there are critical safety checks MISSING here
    #        - for example; atm we forward first and then persist "forwarding_info",
    #          so if we segfault in-between and restart, we might forward an HTLC twice...
    #          (same for trampoline forwarding)

    def read_onion_field(key: str):
        """Return ``payload[key][key]`` from the hop payload, or fail the onion."""
        try:
            return processed_onion.hop_data.payload[key][key]
        except Exception:
            # Exception rather than a bare except: a missing or malformed field
            # must fail the HTLC, but KeyboardInterrupt/SystemExit must not be
            # turned into an onion failure.
            raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')

    forwarding_enabled = self.network.config.get('lightning_forward_payments', False)
    if not forwarding_enabled:
        self.logger.info("forwarding is disabled. failing htlc.")
        raise OnionRoutingFailure(code=OnionFailureCode.PERMANENT_CHANNEL_FAILURE, data=b'')
    chain = self.network.blockchain()
    if chain.is_tip_stale():
        raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
    next_chan_scid = read_onion_field("short_channel_id")
    next_chan = self.lnworker.get_channel_by_short_id(next_chan_scid)
    local_height = chain.height()
    if next_chan is None:
        self.logger.info(f"cannot forward htlc. cannot find next_chan {next_chan_scid}")
        raise OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'')
    # several failure codes below carry our channel_update (first two bytes
    # stripped), prefixed with its 2-byte big-endian length
    outgoing_chan_upd = next_chan.get_outgoing_gossip_channel_update()[2:]
    outgoing_chan_upd_len = len(outgoing_chan_upd).to_bytes(2, byteorder="big")
    if not next_chan.can_send_update_add_htlc():
        self.logger.info(f"cannot forward htlc. next_chan {next_chan_scid} cannot send ctx updates. "
                         f"chan state {next_chan.get_state()!r}, peer state: {next_chan.peer_state!r}")
        data = outgoing_chan_upd_len + outgoing_chan_upd
        raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=data)
    next_cltv_expiry = read_onion_field("outgoing_cltv_value")
    if htlc.cltv_expiry - next_cltv_expiry < next_chan.forwarding_cltv_expiry_delta:
        data = htlc.cltv_expiry.to_bytes(4, byteorder="big") + outgoing_chan_upd_len + outgoing_chan_upd
        raise OnionRoutingFailure(code=OnionFailureCode.INCORRECT_CLTV_EXPIRY, data=data)
    if htlc.cltv_expiry - lnutil.MIN_FINAL_CLTV_EXPIRY_ACCEPTED <= local_height \
            or next_cltv_expiry <= local_height:
        data = outgoing_chan_upd_len + outgoing_chan_upd
        raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_SOON, data=data)
    if max(htlc.cltv_expiry, next_cltv_expiry) > local_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE:
        raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_FAR, data=b'')
    next_amount_msat_htlc = read_onion_field("amt_to_forward")
    forwarding_fees = fee_for_edge_msat(
        forwarded_amount_msat=next_amount_msat_htlc,
        fee_base_msat=next_chan.forwarding_fee_base_msat,
        fee_proportional_millionths=next_chan.forwarding_fee_proportional_millionths)
    if htlc.amount_msat - next_amount_msat_htlc < forwarding_fees:
        data = next_amount_msat_htlc.to_bytes(8, byteorder="big") + outgoing_chan_upd_len + outgoing_chan_upd
        raise OnionRoutingFailure(code=OnionFailureCode.FEE_INSUFFICIENT, data=data)
    self.logger.info(f'forwarding htlc to {next_chan.node_id}')
    next_htlc = UpdateAddHtlc(
        amount_msat=next_amount_msat_htlc,
        payment_hash=htlc.payment_hash,
        cltv_expiry=next_cltv_expiry,
        timestamp=int(time.time()))
    next_htlc = next_chan.add_htlc(next_htlc)
    next_peer = self.lnworker.peers[next_chan.node_id]
    try:
        next_peer.send_message(
            "update_add_htlc",
            channel_id=next_chan.channel_id,
            id=next_htlc.htlc_id,
            cltv_expiry=next_cltv_expiry,
            amount_msat=next_amount_msat_htlc,
            payment_hash=next_htlc.payment_hash,
            onion_routing_packet=processed_onion.next_packet.to_bytes()
        )
    except BaseException as e:
        # NOTE(review): left deliberately broad; the HTLC has already been added
        # to next_chan at this point, so any send failure is reported upstream.
        self.logger.info(f"failed to forward htlc: error sending message. {e}")
        data = outgoing_chan_upd_len + outgoing_chan_upd
        raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=data)
    return next_chan_scid, next_htlc.htlc_id
1427
def maybe_forward_trampoline(
        self, *,
        chan: Channel,
        htlc: UpdateAddHtlc,
        trampoline_onion: ProcessedOnionPacket):
    """Start forwarding a trampoline payment to the node named in the onion.

    Reads the forwarding instructions from the trampoline payload and spawns
    a background task that calls lnworker.pay_to_node. This method does not
    wait for the payment: a failure of that task is stored in
    lnworker.trampoline_forwarding_failures under the payment hash.

    Raises OnionRoutingFailure(INVALID_ONION_PAYLOAD) if reading the payload
    fails for any reason.
    """
    hop_payload = trampoline_onion.hop_data.payload
    payment_hash = htlc.payment_hash
    payment_secret = os.urandom(32)  # random secret for the outgoing payment
    try:
        outgoing_node_id = hop_payload["outgoing_node_id"]["outgoing_node_id"]
        amt_to_forward = hop_payload["amt_to_forward"]["amt_to_forward"]
        cltv_from_onion = hop_payload["outgoing_cltv_value"]["outgoing_cltv_value"]
        if "invoice_features" not in hop_payload:
            self.logger.info('forward_trampoline: end-to-end')
            invoice_features = LnFeatures.BASIC_MPP_OPT
            next_trampoline_onion = trampoline_onion.next_packet
        else:
            self.logger.info('forward_trampoline: legacy')
            next_trampoline_onion = None
            invoice_features = hop_payload["invoice_features"]["invoice_features"]
            # value is not used below, but the lookup rejects payloads lacking the field
            _routing_info = hop_payload["invoice_routing_info"]["invoice_routing_info"]
    except Exception:
        self.logger.exception('')
        raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')

    # what the sender paid on top of what has to be passed on;
    # pay_to_node will raise if these are not sufficient
    sender_cltv_delta = htlc.cltv_expiry - cltv_from_onion
    sender_fee_msat = htlc.amount_msat - amt_to_forward

    @log_exceptions
    async def forward_trampoline_payment():
        failure = None
        try:
            await self.lnworker.pay_to_node(
                node_pubkey=outgoing_node_id,
                payment_hash=payment_hash,
                payment_secret=payment_secret,
                amount_to_pay=amt_to_forward,
                min_cltv_expiry=cltv_from_onion,
                r_tags=[],
                invoice_features=invoice_features,
                fwd_trampoline_onion=next_trampoline_onion,
                fwd_trampoline_fee=sender_fee_msat,
                fwd_trampoline_cltv_delta=sender_cltv_delta,
                attempts=1)
        except OnionRoutingFailure as e:
            failure = e
        except PaymentFailure:
            # FIXME: adapt the error code
            failure = OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'')
        if failure is not None:
            # FIXME: cannot use payment_hash as key
            self.lnworker.trampoline_forwarding_failures[payment_hash] = failure

    # fire and forget: the result is picked up later through trampoline_forwarding_failures
    asyncio.ensure_future(forward_trampoline_payment())
1483
def maybe_fulfill_htlc(
        self, *,
        chan: Channel,
        htlc: UpdateAddHtlc,
        processed_onion: ProcessedOnionPacket,
        is_trampoline: bool = False) -> Tuple[Optional[bytes], Optional[OnionPacket]]:
    """As a final recipient of an HTLC, decide if we should fulfill it.

    Return (preimage, trampoline_onion_packet) with at most a single element
    not None. (None, None) is returned when check_received_mpp_htlc() reports
    None for a multi-part payment, i.e. no decision is taken in this call.

    With is_trampoline=True, the checks comparing the onion's cltv and amount
    against the incoming HTLC are skipped.

    Raises OnionRoutingFailure whenever the HTLC has to be failed; the reason
    is logged before raising.
    """
    def log_fail_reason(reason: str):
        self.logger.info(f"maybe_fulfill_htlc. will FAIL HTLC: chan {chan.short_channel_id}. "
                         f"{reason}. htlc={str(htlc)}. onion_payload={processed_onion.hop_data.payload}")

    # NOTE: the lookups below use "except Exception" rather than a bare
    # "except", so that KeyboardInterrupt/SystemExit are never turned into
    # an onion failure.
    try:
        amt_to_forward = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"]
    except Exception:
        log_fail_reason("'amt_to_forward' missing from onion")
        raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')

    # Check that our blockchain tip is sufficiently recent so that we have an approx idea of the height.
    # We should not release the preimage for an HTLC that its sender could already time out as
    # then they might try to force-close and it becomes a race.
    chain = self.network.blockchain()
    if chain.is_tip_stale():
        log_fail_reason("our chain tip is stale")
        raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
    local_height = chain.height()
    # one shared failure for every "wrong payment details" case below
    exc_incorrect_or_unknown_pd = OnionRoutingFailure(
        code=OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS,
        data=amt_to_forward.to_bytes(8, byteorder="big") + local_height.to_bytes(4, byteorder="big"))
    if local_height + MIN_FINAL_CLTV_EXPIRY_ACCEPTED > htlc.cltv_expiry:
        log_fail_reason("htlc.cltv_expiry is unreasonably close")
        raise exc_incorrect_or_unknown_pd
    try:
        cltv_from_onion = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"]
    except Exception:
        log_fail_reason("'outgoing_cltv_value' missing from onion")
        raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')

    if not is_trampoline:
        if cltv_from_onion != htlc.cltv_expiry:
            log_fail_reason("cltv_from_onion != htlc.cltv_expiry")
            raise OnionRoutingFailure(
                code=OnionFailureCode.FINAL_INCORRECT_CLTV_EXPIRY,
                data=htlc.cltv_expiry.to_bytes(4, byteorder="big"))
    try:
        total_msat = processed_onion.hop_data.payload["payment_data"]["total_msat"]
    except Exception:
        total_msat = amt_to_forward  # fall back to "amt_to_forward"

    if not is_trampoline and amt_to_forward != htlc.amount_msat:
        log_fail_reason("amt_to_forward != htlc.amount_msat")
        raise OnionRoutingFailure(
            code=OnionFailureCode.FINAL_INCORRECT_HTLC_AMOUNT,
            data=htlc.amount_msat.to_bytes(8, byteorder="big"))

    try:
        payment_secret_from_onion = processed_onion.hop_data.payload["payment_data"]["payment_secret"]
    except Exception:
        if total_msat > amt_to_forward:
            # payment_secret is required for MPP
            log_fail_reason("'payment_secret' missing from onion")
            raise exc_incorrect_or_unknown_pd
        # TODO fail here if invoice has set PAYMENT_SECRET_REQ
        payment_secret_from_onion = None

    if total_msat > amt_to_forward:
        # multi-part payment: lnworker decides whether the set is complete
        mpp_status = self.lnworker.check_received_mpp_htlc(payment_secret_from_onion, chan.short_channel_id, htlc, total_msat)
        if mpp_status is None:
            return None, None
        if mpp_status is False:
            log_fail_reason("MPP_TIMEOUT")
            raise OnionRoutingFailure(code=OnionFailureCode.MPP_TIMEOUT, data=b'')
        assert mpp_status is True

    # if there is a trampoline_onion, maybe_fulfill_htlc will be called again
    if processed_onion.trampoline_onion_packet:
        # TODO: we should check that all trampoline_onions are the same
        return None, processed_onion.trampoline_onion_packet

    info = self.lnworker.get_payment_info(htlc.payment_hash)
    if info is None:
        log_fail_reason(f"no payment_info found for RHASH {htlc.payment_hash.hex()}")
        raise exc_incorrect_or_unknown_pd
    preimage = self.lnworker.get_preimage(htlc.payment_hash)
    if payment_secret_from_onion:
        # derive once; used both for the comparison and for the log line
        expected_secret = derive_payment_secret_from_payment_preimage(preimage)
        if payment_secret_from_onion != expected_secret:
            log_fail_reason(f'incorrect payment secret {payment_secret_from_onion.hex()} != {expected_secret.hex()}')
            raise exc_incorrect_or_unknown_pd
    invoice_msat = info.amount_msat
    # accept anything for amountless invoices, otherwise between 1x and 2x the invoice amount
    if not (invoice_msat is None or invoice_msat <= total_msat <= 2 * invoice_msat):
        log_fail_reason(f"total_msat={total_msat} too different from invoice_msat={invoice_msat}")
        raise exc_incorrect_or_unknown_pd
    self.logger.info(f"maybe_fulfill_htlc. will FULFILL HTLC: chan {chan.short_channel_id}. htlc={str(htlc)}")
    self.lnworker.set_request_status(htlc.payment_hash, PR_PAID)
    return preimage, None
1581
def fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes):
    """Settle a received HTLC with its preimage and send 'update_fulfill_htlc'.

    The channel must currently allow commitment updates and the HTLC must
    already be irrevocably added (both are asserted).
    """
    scid = chan.short_channel_id
    self.logger.info(f"_fulfill_htlc. chan {scid}. htlc_id {htlc_id}")
    assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
    assert chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id)
    # remember the htlc until its removal has become irrevocable
    pending_key = (chan, htlc_id)
    self.received_htlcs_pending_removal.add(pending_key)
    chan.settle_htlc(preimage, htlc_id)
    self.send_message(
        "update_fulfill_htlc",
        channel_id=chan.channel_id,
        id=htlc_id,
        payment_preimage=preimage,
    )
1593
def fail_htlc(self, *, chan: Channel, htlc_id: int, error_bytes: bytes):
    """Fail a received HTLC and send 'update_fail_htlc' carrying error_bytes.

    The channel must currently allow commitment updates (asserted).
    """
    self.logger.info(f"fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
    assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
    # remember the htlc until its removal has become irrevocable
    pending_key = (chan, htlc_id)
    self.received_htlcs_pending_removal.add(pending_key)
    chan.fail_htlc(htlc_id)
    reason_len = len(error_bytes)
    self.send_message(
        "update_fail_htlc",
        channel_id=chan.channel_id,
        id=htlc_id,
        len=reason_len,
        reason=error_bytes,
    )
1605
def fail_malformed_htlc(self, *, chan: Channel, htlc_id: int, reason: OnionRoutingFailure):
    """Fail a received HTLC by sending 'update_fail_malformed_htlc'.

    reason must carry a failure code with the BADONION flag set and exactly
    32 bytes of data (sent as sha256_of_onion); otherwise an Exception is
    raised and nothing is sent. The channel must currently allow commitment
    updates (asserted).
    """
    self.logger.info(f"fail_malformed_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
    assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
    has_badonion_flag = reason.code & OnionFailureCodeMetaFlag.BADONION
    if not has_badonion_flag or len(reason.data) != 32:
        raise Exception(f"unexpected reason when sending 'update_fail_malformed_htlc': {reason!r}")
    # remember the htlc until its removal has become irrevocable
    pending_key = (chan, htlc_id)
    self.received_htlcs_pending_removal.add(pending_key)
    chan.fail_htlc(htlc_id)
    self.send_message(
        "update_fail_malformed_htlc",
        channel_id=chan.channel_id,
        id=htlc_id,
        sha256_of_onion=reason.data,
        failure_code=reason.code,
    )
1619
def on_revoke_and_ack(self, chan: Channel, payload):
    """Handle a 'revoke_and_ack' message for chan.

    Ignored when the channel's peer state is BAD. Otherwise the revocation is
    applied to the channel, the channel is saved, and a new commitment is
    sent if maybe_send_commitment() decides so.
    """
    if chan.peer_state == PeerState.BAD:
        return
    scid = chan.short_channel_id
    oldest_unrevoked = chan.get_oldest_unrevoked_ctn(REMOTE)
    self.logger.info(f'on_revoke_and_ack. chan {scid}. ctn: {oldest_unrevoked}')
    revocation = RevokeAndAck(
        payload["per_commitment_secret"],
        payload["next_per_commitment_point"],
    )
    chan.receive_revocation(revocation)
    self.lnworker.save_channel(chan)
    self.maybe_send_commitment(chan)
1628
def on_update_fee(self, chan: Channel, payload):
    """Handle an 'update_fee' message: apply the received feerate to chan."""
    # second argument is False here, and True where we send update_fee ourselves
    # (presumably a "from us" flag; confirm against Channel.update_fee)
    chan.update_fee(payload["feerate_per_kw"], False)
1632
async def maybe_update_fee(self, chan: Channel):
    """Called when our fee estimates change.

    Does nothing if the channel cannot send commitment updates. If we are
    not the channel initiator, nothing is sent: outside regtest a warning is
    logged when the channel feerate is below half the current estimate. If
    we are the initiator, an 'update_fee' is sent when the estimate is less
    than half or more than double the pending feerate.
    """
    if not chan.can_send_ctx_updates():
        return
    feerate_per_kw = self.lnworker.current_feerate_per_kw()

    if not chan.constraints.is_initiator:
        # as non-initiator we never send update_fee from here; at most we warn
        if constants.net is not constants.BitcoinRegtest:
            chan_feerate = chan.get_latest_feerate(LOCAL)
            ratio = chan_feerate / feerate_per_kw
            if ratio < 0.5:
                # Note that we trust the Electrum server about fee rates
                # Thus, automated force-closing might not be a good idea
                # Maybe we should display something in the GUI instead
                self.logger.warning(
                    f"({chan.get_id_for_log()}) feerate is {chan_feerate} sat/kw, "
                    f"current recommended feerate is {feerate_per_kw} sat/kw, consider force closing!")
        return

    pending_feerate = chan.get_next_feerate(REMOTE)
    if feerate_per_kw < pending_feerate / 2:
        direction_msg = "FEES HAVE FALLEN"
    elif feerate_per_kw > pending_feerate * 2:
        direction_msg = "FEES HAVE RISEN"
    else:
        # still within a factor of two of the pending feerate: leave it alone
        return
    self.logger.info(direction_msg)
    self.logger.info(f"(chan: {chan.get_id_for_log()}) current pending feerate {pending_feerate}. "
                     f"new feerate {feerate_per_kw}")
    chan.update_fee(feerate_per_kw, True)
    self.send_message(
        "update_fee",
        channel_id=chan.channel_id,
        feerate_per_kw=feerate_per_kw)
    self.maybe_send_commitment(chan)
1667
@log_exceptions
async def close_channel(self, chan_id: bytes):
    """Cooperatively close the channel chan_id, initiated by us.

    Sends 'shutdown', waits for the future in self.shutdown_received to be
    resolved with the remote's shutdown payload, then runs the closing
    negotiation. Returns the txid of the closing transaction.
    """
    chan = self.channels[chan_id]
    # resolved with the remote's 'shutdown' payload by on_shutdown()
    self.shutdown_received[chan_id] = asyncio.Future()
    await self.send_shutdown(chan)
    their_shutdown = await self.shutdown_received[chan_id]
    closing_txid = await self._shutdown(chan, their_shutdown, is_local=True)
    self.logger.info(f'({chan.get_id_for_log()}) Channel closed {closing_txid}')
    return closing_txid
1677
async def on_shutdown(self, chan: Channel, payload):
    """Handle a 'shutdown' message received for chan.

    Validates the remote scriptpubkey. If we have a pending entry in
    self.shutdown_received for this channel, its future is resolved with the
    payload; otherwise we reply with our own 'shutdown' and run the closing
    negotiation.

    Raises UpfrontShutdownScriptViolation if the remote committed to an
    upfront shutdown script and uses a different one, and Exception if the
    script matches none of the accepted templates.
    """
    their_scriptpubkey = payload['scriptpubkey']
    committed_script = chan.config[REMOTE].upfront_shutdown_script

    # BOLT-02: check that they use the upfront shutdown script they advertised
    if committed_script and their_scriptpubkey != committed_script:
        raise UpfrontShutdownScriptViolation("remote didn't use upfront shutdown script it commited to in channel opening")

    # BOLT-02: restrict the scriptpubkey to some templates
    accepted_templates = (
        transaction.SCRIPTPUBKEY_TEMPLATE_WITNESS_V0,
        transaction.SCRIPTPUBKEY_TEMPLATE_P2SH,
        transaction.SCRIPTPUBKEY_TEMPLATE_P2PKH,
    )
    if not any(match_script_against_template(their_scriptpubkey, template)
               for template in accepted_templates):
        raise Exception(f'scriptpubkey in received shutdown message does not conform to any template: {their_scriptpubkey.hex()}')

    chan_id = chan.channel_id
    if chan_id in self.shutdown_received:
        # a pending future exists for this channel: hand over the payload
        self.shutdown_received[chan_id].set_result(payload)
        return
    # no pending future: reply with our shutdown and negotiate the closing tx
    chan = self.channels[chan_id]
    await self.send_shutdown(chan)
    closing_txid = await self._shutdown(chan, payload, is_local=False)
    self.logger.info(f'({chan.get_id_for_log()}) Channel closed by remote peer {closing_txid}')
1700
def can_send_shutdown(self, chan: Channel):
    """Return True if a 'shutdown' message may be sent for chan.

    That is the case once the channel state is at least OPENING, or, before
    that, once our side of the funding handshake was sent (funding_created
    if we are the initiator, funding_signed otherwise).
    """
    if chan.get_state() >= ChannelState.OPENING:
        return True
    if chan.constraints.is_initiator:
        sent_funding_msgs = self.funding_created_sent
    else:
        sent_funding_msgs = self.funding_signed_sent
    return chan.channel_id in sent_funding_msgs
1709
async def send_shutdown(self, chan: Channel):
    """Send a 'shutdown' message for chan and move it to state SHUTDOWN.

    Uses our upfront shutdown script if one is configured, otherwise the
    script of the channel's sweep address. Commitment updates are disabled
    while waiting for pending changes to clear and re-enabled after the
    message was sent.

    Raises Exception if can_send_shutdown() is False.
    """
    if not self.can_send_shutdown(chan):
        raise Exception('cannot send shutdown')

    scriptpubkey = chan.config[LOCAL].upfront_shutdown_script \
        or bfh(bitcoin.address_to_script(chan.sweep_address))
    assert scriptpubkey

    # wait until no more pending updates (bolt2)
    chan.set_can_send_ctx_updates(False)
    while True:
        if not chan.has_pending_changes(REMOTE):
            break
        await asyncio.sleep(0.1)
    self.send_message(
        'shutdown',
        channel_id=chan.channel_id,
        len=len(scriptpubkey),
        scriptpubkey=scriptpubkey)
    chan.set_state(ChannelState.SHUTDOWN)
    # can fulfill or fail htlcs. cannot add htlcs, because state != OPEN
    chan.set_can_send_ctx_updates(True)
1728
@log_exceptions
async def _shutdown(self, chan: Channel, payload, *, is_local: bool):
    """Negotiate, sign and broadcast the cooperative closing transaction.

    payload is the remote's 'shutdown' message; is_local selects whose
    latest commitment fee (LOCAL if True, else REMOTE) caps the closing fee.
    Waits until no HTLCs remain, then exchanges 'closing_signed' messages
    until both fees differ by less than 2 sat. Returns the txid of the
    closing transaction.

    Raises Exception if the remote proposes a fee above the cap or if its
    signature matches neither variant of the closing transaction.
    """
    # wait until no HTLCs remain in either commitment transaction
    while True:
        n_htlcs = len(chan.hm.htlcs(LOCAL)) + len(chan.hm.htlcs(REMOTE))
        if n_htlcs <= 0:
            break
        self.logger.info(f'(chan: {chan.short_channel_id}) waiting for htlcs to settle...')
        await asyncio.sleep(1)
    # if no HTLCs remain, we must not send updates
    chan.set_can_send_ctx_updates(False)
    their_scriptpubkey = payload['scriptpubkey']
    our_scriptpubkey = chan.config[LOCAL].upfront_shutdown_script \
        or bfh(bitcoin.address_to_script(chan.sweep_address))
    assert our_scriptpubkey

    # estimate fee of closing tx, using a zero-fee tx to measure the size
    _, size_probe_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=0)
    fee_rate = self.network.config.fee_per_kb()
    estimated_fee = fee_rate * size_probe_tx.estimated_size() // 1000
    # BOLT2: The sending node MUST set fee less than or equal to the base fee of the final ctx
    max_fee = chan.get_latest_fee(LOCAL if is_local else REMOTE)
    our_fee = min(estimated_fee, max_fee)
    drop_remote = False

    def send_closing_signed(fee_sat, without_remote_output):
        # sign the closing tx for the given fee and propose it to the remote
        sig, _tx = chan.make_closing_tx(
            our_scriptpubkey, their_scriptpubkey,
            fee_sat=fee_sat, drop_remote=without_remote_output)
        self.send_message('closing_signed', channel_id=chan.channel_id, fee_satoshis=fee_sat, signature=sig)

    def their_sig_is_valid(tx, sig):
        # check sig against the remote multisig key over the preimage of input 0
        their_pubkey = chan.config[REMOTE].multisig_key.pubkey
        pre_hash = sha256d(bfh(tx.serialize_preimage(0)))
        return ecc.verify_signature(their_pubkey, sig, pre_hash)

    # the funder sends the first 'closing_signed' message
    if chan.constraints.is_initiator:
        send_closing_signed(our_fee, drop_remote)
    # negotiate fee
    while True:
        # FIXME: the remote SHOULD send closing_signed, but some don't.
        their_msg = await self.wait_for_message('closing_signed', chan.channel_id)
        their_fee = their_msg['fee_satoshis']
        if their_fee > max_fee:
            raise Exception(f'the proposed fee exceeds the base fee of the latest commitment transaction {is_local, their_fee, max_fee}')
        their_sig = their_msg['signature']
        # verify their sig: they might have dropped their output,
        # so try the tx with their output first and without it second
        for candidate in (False, True):
            our_sig, closing_tx = chan.make_closing_tx(
                our_scriptpubkey, their_scriptpubkey,
                fee_sat=their_fee, drop_remote=candidate)
            if their_sig_is_valid(closing_tx, their_sig):
                drop_remote = candidate
                break
        else:
            raise Exception('failed to verify their signature')
        # Agree if difference is lower or equal to one (see below)
        if abs(our_fee - their_fee) < 2:
            our_fee = their_fee
            break
        # this will be "strictly between" (as in BOLT2) previous values because of the above
        our_fee = (our_fee + their_fee) // 2
        # another round
        send_closing_signed(our_fee, drop_remote)
    # the non-funder replies
    if not chan.constraints.is_initiator:
        send_closing_signed(our_fee, drop_remote)
    # add signatures: ours first, then theirs, each with the 0x01 sighash byte appended
    signers = (
        (chan.config[LOCAL].multisig_key.pubkey, our_sig),
        (chan.config[REMOTE].multisig_key.pubkey, their_sig),
    )
    for pubkey, sig_string in signers:
        closing_tx.add_signature_to_txin(
            txin_idx=0,
            signing_pubkey=pubkey.hex(),
            sig=bh2u(der_sig_from_sig_string(sig_string) + b'\x01'))
    # save local transaction and set state
    try:
        self.lnworker.wallet.add_transaction(closing_tx)
    except UnrelatedTransactionException:
        pass  # this can happen if (~all the balance goes to REMOTE)
    chan.set_state(ChannelState.CLOSING)
    # broadcast
    await self.network.try_broadcasting(closing_tx, 'closing')
    return closing_tx.txid()
1810
async def htlc_switch(self):
    """Endless loop that resolves the HTLCs received from this peer.

    Roughly every 0.1 s, for each channel that can send commitment updates,
    every unfulfilled HTLC that is irrevocably added is processed with
    process_unfulfilled_htlc(). Depending on the outcome the HTLC is
    forwarded (forwarding info is stored back into the log), fulfilled,
    failed, or failed as malformed. Resolved HTLCs are removed from the
    'unfulfilled_htlcs' log afterwards.

    Never returns normally.
    """
    await self.initialized
    while True:
        # pulse "iteration done", sleep, then pulse "iteration started";
        # wait_one_htlc_switch_iteration() waits on these two events
        self._htlc_switch_iterdone_event.set()
        self._htlc_switch_iterdone_event.clear()
        await asyncio.sleep(0.1)  # TODO maybe make this partly event-driven
        self._htlc_switch_iterstart_event.set()
        self._htlc_switch_iterstart_event.clear()
        self.ping_if_required()
        self._maybe_cleanup_received_htlcs_pending_removal()
        # Both loops below contain awaits. Iterate over snapshots so that an
        # entry added by another task while we are suspended cannot raise
        # "RuntimeError: dictionary changed size during iteration".
        for chan in list(self.channels.values()):
            if not chan.can_send_ctx_updates():
                continue
            self.maybe_send_commitment(chan)
            done = set()
            unfulfilled = chan.hm.log.get('unfulfilled_htlcs', {})
            for htlc_id, (local_ctn, remote_ctn, onion_packet_hex, forwarding_info) in list(unfulfilled.items()):
                if not chan.hm.is_htlc_irrevocably_added_yet(htlc_proposer=REMOTE, htlc_id=htlc_id):
                    continue
                htlc = chan.hm.get_htlc_by_id(REMOTE, htlc_id)
                error_reason = None  # type: Optional[OnionRoutingFailure]
                error_bytes = None  # type: Optional[bytes]
                preimage = None
                fw_info = None
                onion_packet_bytes = bytes.fromhex(onion_packet_hex)
                onion_packet = None
                try:
                    onion_packet = OnionPacket.from_bytes(onion_packet_bytes)
                except OnionRoutingFailure as e:
                    # unparsable onion: reported below via fail_malformed_htlc
                    error_reason = e
                else:
                    try:
                        preimage, fw_info, error_bytes = await self.process_unfulfilled_htlc(
                            chan=chan,
                            htlc=htlc,
                            forwarding_info=forwarding_info,
                            onion_packet_bytes=onion_packet_bytes,
                            onion_packet=onion_packet)
                    except OnionRoutingFailure as e:
                        # we fail the htlc ourselves: build the onion error
                        error_bytes = construct_onion_error(e, onion_packet, our_onion_private_key=self.privkey)
                if fw_info:
                    # forwarded: remember where, the htlc stays unfulfilled for now
                    unfulfilled[htlc_id] = local_ctn, remote_ctn, onion_packet_hex, fw_info
                elif preimage or error_reason or error_bytes:
                    if preimage:
                        await self.lnworker.enable_htlc_settle.wait()
                        self.fulfill_htlc(chan, htlc.htlc_id, preimage)
                    elif error_bytes:
                        self.fail_htlc(
                            chan=chan,
                            htlc_id=htlc.htlc_id,
                            error_bytes=error_bytes)
                    else:
                        self.fail_malformed_htlc(
                            chan=chan,
                            htlc_id=htlc.htlc_id,
                            reason=error_reason)
                    done.add(htlc_id)
            # cleanup
            for htlc_id in done:
                unfulfilled.pop(htlc_id)
1871
def _maybe_cleanup_received_htlcs_pending_removal(self) -> None:
    """Forget received HTLCs whose removal has become irrevocable.

    If at least one entry was dropped from received_htlcs_pending_removal,
    received_htlc_removed_event is pulsed (set, then cleared).
    """
    pending = self.received_htlcs_pending_removal
    removed = {
        (chan, htlc_id)
        for chan, htlc_id in pending
        if chan.hm.is_htlc_irrevocably_removed_yet(htlc_proposer=REMOTE, htlc_id=htlc_id)
    }
    if not removed:
        return
    pending.difference_update(removed)
    self.received_htlc_removed_event.set()
    self.received_htlc_removed_event.clear()
1882
async def wait_one_htlc_switch_iteration(self) -> None:
    """Wait until the HTLC switch has done one full iteration or the peer
    has disconnected, whichever comes first.
    """
    async def _full_iteration():
        # an iteration has to start and then finish after we began waiting
        await self._htlc_switch_iterstart_event.wait()
        await self._htlc_switch_iterdone_event.wait()

    waiters = (_full_iteration, self.got_disconnected.wait)
    async with TaskGroup(wait=any) as group:
        for make_waiter in waiters:
            await group.spawn(make_waiter())
1894
async def process_unfulfilled_htlc(
        self, *,
        chan: Channel,
        htlc: UpdateAddHtlc,
        forwarding_info: Tuple[str, int],
        onion_packet_bytes: bytes,
        onion_packet: OnionPacket) -> Tuple[Optional[bytes], Union[bool, None, Tuple[str, int]], Optional[bytes]]:
    """Decide what to do with one unfulfilled HTLC received on chan.

    Return (preimage, fw_info, error_bytes) with at most a single element
    that is not None:
      - preimage: the HTLC can be fulfilled
      - fw_info: the HTLC was just forwarded; (next channel id hex, next
        htlc id) for a regular forward, True for a trampoline forward
      - error_bytes: the downstream failure to relay back
    All None means there is nothing to do yet.

    Raise an OnionRoutingFailure if we need to fail the htlc.
    """
    payment_hash = htlc.payment_hash
    processed_onion = self.process_onion_packet(
        onion_packet,
        payment_hash=payment_hash,
        onion_packet_bytes=onion_packet_bytes)

    if not processed_onion.are_we_final:
        if not forwarding_info:
            # regular forwarding, not attempted yet
            await self.lnworker.enable_htlc_forwarding.wait()
            next_chan_id, next_htlc_id = self.maybe_forward_htlc(
                htlc=htlc,
                processed_onion=processed_onion)
            return None, (next_chan_id.hex(), next_htlc_id), None
        # already forwarded: look for the outcome of the outgoing htlc
        preimage = self.lnworker.get_preimage(payment_hash)
        next_chan_id_hex, htlc_id = forwarding_info
        next_chan = self.lnworker.get_channel_by_short_id(bytes.fromhex(next_chan_id_hex))
        if next_chan:
            error_bytes, error_reason = next_chan.pop_fail_htlc_reason(htlc_id)
            if error_bytes:
                return None, None, error_bytes
            if error_reason:
                raise error_reason
    else:
        preimage, trampoline_onion_packet = self.maybe_fulfill_htlc(
            chan=chan,
            htlc=htlc,
            processed_onion=processed_onion)
        # trampoline forwarding
        if trampoline_onion_packet:
            if forwarding_info:
                # trampoline payment already started: look for its outcome
                preimage = self.lnworker.get_preimage(payment_hash)
                error_reason = self.lnworker.trampoline_forwarding_failures.pop(payment_hash, None)
                if error_reason:
                    self.logger.info(f'trampoline forwarding failure: {error_reason.code_name()}')
                    raise error_reason
            else:
                trampoline_onion = self.process_onion_packet(
                    trampoline_onion_packet,
                    payment_hash=htlc.payment_hash,
                    onion_packet_bytes=onion_packet_bytes,
                    is_trampoline=True)
                if not trampoline_onion.are_we_final:
                    await self.lnworker.enable_htlc_forwarding.wait()
                    self.maybe_forward_trampoline(
                        chan=chan,
                        htlc=htlc,
                        trampoline_onion=trampoline_onion)
                    # return True so that this code gets executed only once
                    return None, True, None
                # we are the final trampoline node
                preimage, _ = self.maybe_fulfill_htlc(
                    chan=chan,
                    htlc=htlc,
                    processed_onion=trampoline_onion,
                    is_trampoline=True)

    return (preimage, None, None) if preimage else (None, None, None)
1965
def process_onion_packet(
        self,
        onion_packet: OnionPacket, *,
        payment_hash: bytes,
        onion_packet_bytes: bytes,
        is_trampoline: bool = False) -> ProcessedOnionPacket:
    """Process onion_packet with our private key and return the result.

    Any error while processing is converted into an OnionRoutingFailure whose
    data is sha256(onion_packet_bytes): INVALID_ONION_KEY for a bad pubkey,
    INVALID_ONION_HMAC for a bad mac, INVALID_ONION_VERSION for an
    unsupported version and for any other exception (which is logged).

    Two config keys, 'test_fail_malformed_htlc' and
    'test_fail_htlcs_with_temp_node_failure', force a failure even when
    processing succeeded.
    """
    failure_data = sha256(onion_packet_bytes)

    def bad_onion(code):
        # failure that carries the hash of the offending onion
        return OnionRoutingFailure(code=code, data=failure_data)

    try:
        # inside the method body this name is the module-level function, not this method
        processed_onion = process_onion_packet(
            onion_packet,
            associated_data=payment_hash,
            our_onion_private_key=self.privkey,
            is_trampoline=is_trampoline)
    except UnsupportedOnionPacketVersion:
        raise bad_onion(OnionFailureCode.INVALID_ONION_VERSION)
    except InvalidOnionPubkey:
        raise bad_onion(OnionFailureCode.INVALID_ONION_KEY)
    except InvalidOnionMac:
        raise bad_onion(OnionFailureCode.INVALID_ONION_HMAC)
    except Exception as e:
        self.logger.info(f"error processing onion packet: {e!r}")
        raise bad_onion(OnionFailureCode.INVALID_ONION_VERSION)

    config = self.network.config
    if config.get('test_fail_malformed_htlc'):
        raise bad_onion(OnionFailureCode.INVALID_ONION_VERSION)
    if config.get('test_fail_htlcs_with_temp_node_failure'):
        raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
    return processed_onion