URI: 
       tlnonion.py - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
       tlnonion.py (23032B)
       ---
            1 # -*- coding: utf-8 -*-
            2 #
            3 # Electrum - lightweight Bitcoin client
            4 # Copyright (C) 2018 The Electrum developers
            5 #
            6 # Permission is hereby granted, free of charge, to any person
            7 # obtaining a copy of this software and associated documentation files
            8 # (the "Software"), to deal in the Software without restriction,
            9 # including without limitation the rights to use, copy, modify, merge,
           10 # publish, distribute, sublicense, and/or sell copies of the Software,
           11 # and to permit persons to whom the Software is furnished to do so,
           12 # subject to the following conditions:
           13 #
           14 # The above copyright notice and this permission notice shall be
           15 # included in all copies or substantial portions of the Software.
           16 #
           17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
           18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
           19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
           20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
           21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
           22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
           23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
           24 # SOFTWARE.
           25 
           26 import io
           27 import hashlib
           28 from typing import Sequence, List, Tuple, NamedTuple, TYPE_CHECKING
           29 from enum import IntEnum, IntFlag
           30 
           31 from . import ecc
           32 from .crypto import sha256, hmac_oneshot, chacha20_encrypt
           33 from .util import bh2u, profiler, xor_bytes, bfh
           34 from .lnutil import (get_ecdh, PaymentFailure, NUM_MAX_HOPS_IN_PAYMENT_PATH,
           35                      NUM_MAX_EDGES_IN_PAYMENT_PATH, ShortChannelID, OnionFailureCodeMetaFlag)
           36 from .lnmsg import OnionWireSerializer, read_bigsize_int, write_bigsize_int
           37 
           38 if TYPE_CHECKING:
           39     from .lnrouter import LNPaymentRoute
           40 
           41 
# Sizes, in bytes, of the fixed-length pieces of an onion packet.
HOPS_DATA_SIZE = 1300      # also sometimes called routingInfoSize in bolt-04
TRAMPOLINE_HOPS_DATA_SIZE = 400  # hops_data size accepted for trampoline onions
LEGACY_PER_HOP_FULL_SIZE = 65  # realm byte (1) + legacy payload (32) + hmac (32)
PER_HOP_HMAC_SIZE = 32  # length of the per-hop HMAC
           46 
           47 
           48 class UnsupportedOnionPacketVersion(Exception): pass
           49 class InvalidOnionMac(Exception): pass
           50 class InvalidOnionPubkey(Exception): pass
           51 
           52 
           53 class LegacyHopDataPayload:
           54 
           55     def __init__(self, *, short_channel_id: bytes, amt_to_forward: int, outgoing_cltv_value: int):
           56         self.short_channel_id = ShortChannelID(short_channel_id)
           57         self.amt_to_forward = amt_to_forward
           58         self.outgoing_cltv_value = outgoing_cltv_value
           59 
           60     def to_bytes(self) -> bytes:
           61         ret = self.short_channel_id
           62         ret += int.to_bytes(self.amt_to_forward, length=8, byteorder="big", signed=False)
           63         ret += int.to_bytes(self.outgoing_cltv_value, length=4, byteorder="big", signed=False)
           64         ret += bytes(12)  # padding
           65         if len(ret) != 32:
           66             raise Exception('unexpected length {}'.format(len(ret)))
           67         return ret
           68 
           69     def to_tlv_dict(self) -> dict:
           70         d = {
           71             "amt_to_forward": {"amt_to_forward": self.amt_to_forward},
           72             "outgoing_cltv_value": {"outgoing_cltv_value": self.outgoing_cltv_value},
           73             "short_channel_id": {"short_channel_id": self.short_channel_id},
           74         }
           75         return d
           76 
           77     @classmethod
           78     def from_bytes(cls, b: bytes) -> 'LegacyHopDataPayload':
           79         if len(b) != 32:
           80             raise Exception('unexpected length {}'.format(len(b)))
           81         return LegacyHopDataPayload(
           82             short_channel_id=b[:8],
           83             amt_to_forward=int.from_bytes(b[8:16], byteorder="big", signed=False),
           84             outgoing_cltv_value=int.from_bytes(b[16:20], byteorder="big", signed=False),
           85         )
           86 
           87     @classmethod
           88     def from_tlv_dict(cls, d: dict) -> 'LegacyHopDataPayload':
           89         return LegacyHopDataPayload(
           90             short_channel_id=d["short_channel_id"]["short_channel_id"] if "short_channel_id" in d else b"\x00" * 8,
           91             amt_to_forward=d["amt_to_forward"]["amt_to_forward"],
           92             outgoing_cltv_value=d["outgoing_cltv_value"]["outgoing_cltv_value"],
           93         )
           94 
           95 
           96 class OnionHopsDataSingle:  # called HopData in lnd
           97 
           98     def __init__(self, *, is_tlv_payload: bool, payload: dict = None):
           99         self.is_tlv_payload = is_tlv_payload
          100         if payload is None:
          101             payload = {}
          102         self.payload = payload
          103         self.hmac = None
          104         self._raw_bytes_payload = None  # used in unit tests
          105 
          106     def to_bytes(self) -> bytes:
          107         hmac_ = self.hmac if self.hmac is not None else bytes(PER_HOP_HMAC_SIZE)
          108         if self._raw_bytes_payload is not None:
          109             ret = write_bigsize_int(len(self._raw_bytes_payload))
          110             ret += self._raw_bytes_payload
          111             ret += hmac_
          112             return ret
          113         if not self.is_tlv_payload:
          114             ret = b"\x00"  # realm==0
          115             legacy_payload = LegacyHopDataPayload.from_tlv_dict(self.payload)
          116             ret += legacy_payload.to_bytes()
          117             ret += hmac_
          118             if len(ret) != LEGACY_PER_HOP_FULL_SIZE:
          119                 raise Exception('unexpected length {}'.format(len(ret)))
          120             return ret
          121         else:  # tlv
          122             payload_fd = io.BytesIO()
          123             OnionWireSerializer.write_tlv_stream(fd=payload_fd,
          124                                                  tlv_stream_name="tlv_payload",
          125                                                  **self.payload)
          126             payload_bytes = payload_fd.getvalue()
          127             with io.BytesIO() as fd:
          128                 fd.write(write_bigsize_int(len(payload_bytes)))
          129                 fd.write(payload_bytes)
          130                 fd.write(hmac_)
          131                 return fd.getvalue()
          132 
          133     @classmethod
          134     def from_fd(cls, fd: io.BytesIO) -> 'OnionHopsDataSingle':
          135         first_byte = fd.read(1)
          136         if len(first_byte) == 0:
          137             raise Exception(f"unexpected EOF")
          138         fd.seek(-1, io.SEEK_CUR)  # undo read
          139         if first_byte == b'\x00':
          140             # legacy hop data format
          141             b = fd.read(LEGACY_PER_HOP_FULL_SIZE)
          142             if len(b) != LEGACY_PER_HOP_FULL_SIZE:
          143                 raise Exception(f'unexpected length {len(b)}')
          144             ret = OnionHopsDataSingle(is_tlv_payload=False)
          145             legacy_payload = LegacyHopDataPayload.from_bytes(b[1:33])
          146             ret.payload = legacy_payload.to_tlv_dict()
          147             ret.hmac = b[33:]
          148             return ret
          149         elif first_byte == b'\x01':
          150             # reserved for future use
          151             raise Exception("unsupported hop payload: length==1")
          152         else:
          153             hop_payload_length = read_bigsize_int(fd)
          154             hop_payload = fd.read(hop_payload_length)
          155             if hop_payload_length != len(hop_payload):
          156                 raise Exception(f"unexpected EOF")
          157             ret = OnionHopsDataSingle(is_tlv_payload=True)
          158             ret.payload = OnionWireSerializer.read_tlv_stream(fd=io.BytesIO(hop_payload),
          159                                                               tlv_stream_name="tlv_payload")
          160             ret.hmac = fd.read(PER_HOP_HMAC_SIZE)
          161             assert len(ret.hmac) == PER_HOP_HMAC_SIZE
          162             return ret
          163 
          164     def __repr__(self):
          165         return f"<OnionHopsDataSingle. is_tlv_payload={self.is_tlv_payload}. payload={self.payload}. hmac={self.hmac}>"
          166 
          167 
          168 class OnionPacket:
          169 
          170     def __init__(self, public_key: bytes, hops_data: bytes, hmac: bytes):
          171         assert len(public_key) == 33
          172         assert len(hops_data) in [ HOPS_DATA_SIZE, TRAMPOLINE_HOPS_DATA_SIZE ]
          173         assert len(hmac) == PER_HOP_HMAC_SIZE
          174         self.version = 0
          175         self.public_key = public_key
          176         self.hops_data = hops_data  # also called RoutingInfo in bolt-04
          177         self.hmac = hmac
          178         if not ecc.ECPubkey.is_pubkey_bytes(public_key):
          179             raise InvalidOnionPubkey()
          180 
          181     def to_bytes(self) -> bytes:
          182         ret = bytes([self.version])
          183         ret += self.public_key
          184         ret += self.hops_data
          185         ret += self.hmac
          186         if len(ret) - 66 not in [ HOPS_DATA_SIZE, TRAMPOLINE_HOPS_DATA_SIZE ]:
          187             raise Exception('unexpected length {}'.format(len(ret)))
          188         return ret
          189 
          190     @classmethod
          191     def from_bytes(cls, b: bytes):
          192         if len(b) - 66 not in [ HOPS_DATA_SIZE, TRAMPOLINE_HOPS_DATA_SIZE ]:
          193             raise Exception('unexpected length {}'.format(len(b)))
          194         version = b[0]
          195         if version != 0:
          196             raise UnsupportedOnionPacketVersion('version {} is not supported'.format(version))
          197         return OnionPacket(
          198             public_key=b[1:34],
          199             hops_data=b[34:-32],
          200             hmac=b[-32:]
          201         )
          202 
          203 
          204 def get_bolt04_onion_key(key_type: bytes, secret: bytes) -> bytes:
          205     if key_type not in (b'rho', b'mu', b'um', b'ammag', b'pad'):
          206         raise Exception('invalid key_type {}'.format(key_type))
          207     key = hmac_oneshot(key_type, msg=secret, digest=hashlib.sha256)
          208     return key
          209 
          210 
          211 def get_shared_secrets_along_route(payment_path_pubkeys: Sequence[bytes],
          212                                    session_key: bytes) -> Sequence[bytes]:
          213     num_hops = len(payment_path_pubkeys)
          214     hop_shared_secrets = num_hops * [b'']
          215     ephemeral_key = session_key
          216     # compute shared key for each hop
          217     for i in range(0, num_hops):
          218         hop_shared_secrets[i] = get_ecdh(ephemeral_key, payment_path_pubkeys[i])
          219         ephemeral_pubkey = ecc.ECPrivkey(ephemeral_key).get_public_key_bytes()
          220         blinding_factor = sha256(ephemeral_pubkey + hop_shared_secrets[i])
          221         blinding_factor_int = int.from_bytes(blinding_factor, byteorder="big")
          222         ephemeral_key_int = int.from_bytes(ephemeral_key, byteorder="big")
          223         ephemeral_key_int = ephemeral_key_int * blinding_factor_int % ecc.CURVE_ORDER
          224         ephemeral_key = ephemeral_key_int.to_bytes(32, byteorder="big")
          225     return hop_shared_secrets
          226 
          227 
          228 def new_onion_packet(payment_path_pubkeys: Sequence[bytes], session_key: bytes,
          229                      hops_data: Sequence[OnionHopsDataSingle], associated_data: bytes, trampoline=False) -> OnionPacket:
          230     num_hops = len(payment_path_pubkeys)
          231     assert num_hops == len(hops_data)
          232     hop_shared_secrets = get_shared_secrets_along_route(payment_path_pubkeys, session_key)
          233 
          234     data_size = TRAMPOLINE_HOPS_DATA_SIZE if trampoline else HOPS_DATA_SIZE
          235     filler = _generate_filler(b'rho', hops_data, hop_shared_secrets, data_size)
          236     next_hmac = bytes(PER_HOP_HMAC_SIZE)
          237 
          238     # Our starting packet needs to be filled out with random bytes, we
          239     # generate some deterministically using the session private key.
          240     pad_key = get_bolt04_onion_key(b'pad', session_key)
          241     mix_header = generate_cipher_stream(pad_key, data_size)
          242 
          243     # compute routing info and MAC for each hop
          244     for i in range(num_hops-1, -1, -1):
          245         rho_key = get_bolt04_onion_key(b'rho', hop_shared_secrets[i])
          246         mu_key = get_bolt04_onion_key(b'mu', hop_shared_secrets[i])
          247         hops_data[i].hmac = next_hmac
          248         stream_bytes = generate_cipher_stream(rho_key, data_size)
          249         hop_data_bytes = hops_data[i].to_bytes()
          250         mix_header = mix_header[:-len(hop_data_bytes)]
          251         mix_header = hop_data_bytes + mix_header
          252         mix_header = xor_bytes(mix_header, stream_bytes)
          253         if i == num_hops - 1 and len(filler) != 0:
          254             mix_header = mix_header[:-len(filler)] + filler
          255         packet = mix_header + associated_data
          256         next_hmac = hmac_oneshot(mu_key, msg=packet, digest=hashlib.sha256)
          257 
          258     return OnionPacket(
          259         public_key=ecc.ECPrivkey(session_key).get_public_key_bytes(),
          260         hops_data=mix_header,
          261         hmac=next_hmac)
          262 
          263 
          264 def calc_hops_data_for_payment(
          265         route: 'LNPaymentRoute',
          266         amount_msat: int,
          267         final_cltv: int, *,
          268         total_msat=None,
          269         payment_secret: bytes = None) -> Tuple[List[OnionHopsDataSingle], int, int]:
          270 
          271     """Returns the hops_data to be used for constructing an onion packet,
          272     and the amount_msat and cltv to be used on our immediate channel.
          273     """
          274     if len(route) > NUM_MAX_EDGES_IN_PAYMENT_PATH:
          275         raise PaymentFailure(f"too long route ({len(route)} edges)")
          276     # payload that will be seen by the last hop:
          277     amt = amount_msat
          278     cltv = final_cltv
          279     hop_payload = {
          280         "amt_to_forward": {"amt_to_forward": amt},
          281         "outgoing_cltv_value": {"outgoing_cltv_value": cltv},
          282     }
          283     # for multipart payments we need to tell the receiver about the total and
          284     # partial amounts
          285     if payment_secret is not None:
          286         hop_payload["payment_data"] = {
          287             "payment_secret": payment_secret,
          288             "total_msat": total_msat,
          289             "amount_msat": amt
          290         }
          291     hops_data = [OnionHopsDataSingle(
          292         is_tlv_payload=route[-1].has_feature_varonion(), payload=hop_payload)]
          293     # payloads, backwards from last hop (but excluding the first edge):
          294     for edge_index in range(len(route) - 1, 0, -1):
          295         route_edge = route[edge_index]
          296         is_trampoline = route_edge.is_trampoline()
          297         if is_trampoline:
          298             amt += route_edge.fee_for_edge(amt)
          299             cltv += route_edge.cltv_expiry_delta
          300         hop_payload = {
          301             "amt_to_forward": {"amt_to_forward": amt},
          302             "outgoing_cltv_value": {"outgoing_cltv_value": cltv},
          303             "short_channel_id": {"short_channel_id": route_edge.short_channel_id},
          304         }
          305         hops_data.append(
          306             OnionHopsDataSingle(
          307                 is_tlv_payload=route[edge_index-1].has_feature_varonion(),
          308                 payload=hop_payload))
          309         if not is_trampoline:
          310             amt += route_edge.fee_for_edge(amt)
          311             cltv += route_edge.cltv_expiry_delta
          312     hops_data.reverse()
          313     return hops_data, amt, cltv
          314 
          315 
          316 def _generate_filler(key_type: bytes, hops_data: Sequence[OnionHopsDataSingle],
          317                      shared_secrets: Sequence[bytes], data_size:int) -> bytes:
          318     num_hops = len(hops_data)
          319 
          320     # generate filler that matches all but the last hop (no HMAC for last hop)
          321     filler_size = 0
          322     for hop_data in hops_data[:-1]:
          323         filler_size += len(hop_data.to_bytes())
          324     filler = bytearray(filler_size)
          325 
          326     for i in range(0, num_hops-1):  # -1, as last hop does not obfuscate
          327         # Sum up how many frames were used by prior hops.
          328         filler_start = data_size
          329         for hop_data in hops_data[:i]:
          330             filler_start -= len(hop_data.to_bytes())
          331         # The filler is the part dangling off of the end of the
          332         # routingInfo, so offset it from there, and use the current
          333         # hop's frame count as its size.
          334         filler_end = data_size + len(hops_data[i].to_bytes())
          335 
          336         stream_key = get_bolt04_onion_key(key_type, shared_secrets[i])
          337         stream_bytes = generate_cipher_stream(stream_key, 2 * data_size)
          338         filler = xor_bytes(filler, stream_bytes[filler_start:filler_end])
          339         filler += bytes(filler_size - len(filler))  # right pad with zeroes
          340 
          341     return filler
          342 
          343 
          344 def generate_cipher_stream(stream_key: bytes, num_bytes: int) -> bytes:
          345     return chacha20_encrypt(key=stream_key,
          346                             nonce=bytes(8),
          347                             data=bytes(num_bytes))
          348 
          349 
class ProcessedOnionPacket(NamedTuple):
    """Result of process_onion_packet: what was learned by peeling one onion layer."""
    # True when the peeled hop's hmac is all zero bytes (we are the destination)
    are_we_final: bool
    # the hop entry (payload and hmac) addressed to us
    hop_data: OnionHopsDataSingle
    # the onion packet built for the next hop
    next_packet: OnionPacket
    # packet built from the payload's 'trampoline_onion_packet' entry;
    # None when the payload has no such entry
    trampoline_onion_packet: OnionPacket
          355 
          356 
          357 # TODO replay protection
          358 def process_onion_packet(
          359         onion_packet: OnionPacket,
          360         associated_data: bytes,
          361         our_onion_private_key: bytes,
          362         is_trampoline=False) -> ProcessedOnionPacket:
          363     if not ecc.ECPubkey.is_pubkey_bytes(onion_packet.public_key):
          364         raise InvalidOnionPubkey()
          365     shared_secret = get_ecdh(our_onion_private_key, onion_packet.public_key)
          366     # check message integrity
          367     mu_key = get_bolt04_onion_key(b'mu', shared_secret)
          368     calculated_mac = hmac_oneshot(
          369         mu_key, msg=onion_packet.hops_data+associated_data,
          370         digest=hashlib.sha256)
          371     if onion_packet.hmac != calculated_mac:
          372         raise InvalidOnionMac()
          373     # peel an onion layer off
          374     rho_key = get_bolt04_onion_key(b'rho', shared_secret)
          375     data_size = TRAMPOLINE_HOPS_DATA_SIZE if is_trampoline else HOPS_DATA_SIZE
          376     stream_bytes = generate_cipher_stream(rho_key, 2 * data_size)
          377     padded_header = onion_packet.hops_data + bytes(data_size)
          378     next_hops_data = xor_bytes(padded_header, stream_bytes)
          379     next_hops_data_fd = io.BytesIO(next_hops_data)
          380     hop_data = OnionHopsDataSingle.from_fd(next_hops_data_fd)
          381     # trampoline
          382     trampoline_onion_packet = hop_data.payload.get('trampoline_onion_packet')
          383     if trampoline_onion_packet:
          384         top_version = trampoline_onion_packet.get('version')
          385         top_public_key = trampoline_onion_packet.get('public_key')
          386         top_hops_data = trampoline_onion_packet.get('hops_data')
          387         top_hops_data_fd = io.BytesIO(top_hops_data)
          388         top_hmac = trampoline_onion_packet.get('hmac')
          389         trampoline_onion_packet = OnionPacket(
          390             public_key=top_public_key,
          391             hops_data=top_hops_data_fd.read(TRAMPOLINE_HOPS_DATA_SIZE),
          392             hmac=top_hmac)
          393     # calc next ephemeral key
          394     blinding_factor = sha256(onion_packet.public_key + shared_secret)
          395     blinding_factor_int = int.from_bytes(blinding_factor, byteorder="big")
          396     next_public_key_int = ecc.ECPubkey(onion_packet.public_key) * blinding_factor_int
          397     next_public_key = next_public_key_int.get_public_key_bytes()
          398     next_onion_packet = OnionPacket(
          399         public_key=next_public_key,
          400         hops_data=next_hops_data_fd.read(data_size),
          401         hmac=hop_data.hmac)
          402     if hop_data.hmac == bytes(PER_HOP_HMAC_SIZE):
          403         # we are the destination / exit node
          404         are_we_final = True
          405     else:
          406         # we are an intermediate node; forwarding
          407         are_we_final = False
          408     return ProcessedOnionPacket(are_we_final, hop_data, next_onion_packet, trampoline_onion_packet)
          409 
          410 
          411 class FailedToDecodeOnionError(Exception): pass
          412 
          413 
          414 class OnionRoutingFailure(Exception):
          415 
          416     def __init__(self, code: int, data: bytes):
          417         self.code = code
          418         self.data = data
          419 
          420     def __repr__(self):
          421         return repr((self.code, self.data))
          422 
          423     def to_bytes(self) -> bytes:
          424         ret = self.code.to_bytes(2, byteorder="big")
          425         ret += self.data
          426         return ret
          427 
          428     @classmethod
          429     def from_bytes(cls, failure_msg: bytes):
          430         failure_code = int.from_bytes(failure_msg[:2], byteorder='big')
          431         try:
          432             failure_code = OnionFailureCode(failure_code)
          433         except ValueError:
          434             pass  # uknown failure code
          435         failure_data = failure_msg[2:]
          436         return OnionRoutingFailure(failure_code, failure_data)
          437 
          438     def code_name(self) -> str:
          439         if isinstance(self.code, OnionFailureCode):
          440             return str(self.code.name)
          441         return f"Unknown error ({self.code!r})"
          442 
          443 
          444 def construct_onion_error(
          445         reason: OnionRoutingFailure,
          446         onion_packet: OnionPacket,
          447         our_onion_private_key: bytes,
          448 ) -> bytes:
          449     # create payload
          450     failure_msg = reason.to_bytes()
          451     failure_len = len(failure_msg)
          452     pad_len = 256 - failure_len
          453     assert pad_len >= 0
          454     error_packet =  failure_len.to_bytes(2, byteorder="big")
          455     error_packet += failure_msg
          456     error_packet += pad_len.to_bytes(2, byteorder="big")
          457     error_packet += bytes(pad_len)
          458     # add hmac
          459     shared_secret = get_ecdh(our_onion_private_key, onion_packet.public_key)
          460     um_key = get_bolt04_onion_key(b'um', shared_secret)
          461     hmac_ = hmac_oneshot(um_key, msg=error_packet, digest=hashlib.sha256)
          462     error_packet = hmac_ + error_packet
          463     # obfuscate
          464     ammag_key = get_bolt04_onion_key(b'ammag', shared_secret)
          465     stream_bytes = generate_cipher_stream(ammag_key, len(error_packet))
          466     error_packet = xor_bytes(error_packet, stream_bytes)
          467     return error_packet
          468 
          469 
          470 def _decode_onion_error(error_packet: bytes, payment_path_pubkeys: Sequence[bytes],
          471                         session_key: bytes) -> Tuple[bytes, int]:
          472     """Returns the decoded error bytes, and the index of the sender of the error."""
          473     num_hops = len(payment_path_pubkeys)
          474     hop_shared_secrets = get_shared_secrets_along_route(payment_path_pubkeys, session_key)
          475     for i in range(num_hops):
          476         ammag_key = get_bolt04_onion_key(b'ammag', hop_shared_secrets[i])
          477         um_key = get_bolt04_onion_key(b'um', hop_shared_secrets[i])
          478         stream_bytes = generate_cipher_stream(ammag_key, len(error_packet))
          479         error_packet = xor_bytes(error_packet, stream_bytes)
          480         hmac_computed = hmac_oneshot(um_key, msg=error_packet[32:], digest=hashlib.sha256)
          481         hmac_found = error_packet[:32]
          482         if hmac_computed == hmac_found:
          483             return error_packet, i
          484     raise FailedToDecodeOnionError()
          485 
          486 
          487 def decode_onion_error(error_packet: bytes, payment_path_pubkeys: Sequence[bytes],
          488                        session_key: bytes) -> (OnionRoutingFailure, int):
          489     """Returns the failure message, and the index of the sender of the error."""
          490     decrypted_error, sender_index = _decode_onion_error(error_packet, payment_path_pubkeys, session_key)
          491     failure_msg = get_failure_msg_from_onion_error(decrypted_error)
          492     return failure_msg, sender_index
          493 
          494 
          495 def get_failure_msg_from_onion_error(decrypted_error_packet: bytes) -> OnionRoutingFailure:
          496     # get failure_msg bytes from error packet
          497     failure_len = int.from_bytes(decrypted_error_packet[32:34], byteorder='big')
          498     failure_msg = decrypted_error_packet[34:34+failure_len]
          499     # create failure message object
          500     return OnionRoutingFailure.from_bytes(failure_msg)
          501 
          502 
          503 
# TODO maybe we should rm this and just use OnionWireSerializer and onion_wire.csv
# Short aliases for the meta flags, so that the table below stays readable;
# they are deleted again right after the class definition.
BADONION = OnionFailureCodeMetaFlag.BADONION
PERM     = OnionFailureCodeMetaFlag.PERM
NODE     = OnionFailureCodeMetaFlag.NODE
UPDATE   = OnionFailureCodeMetaFlag.UPDATE
class OnionFailureCode(IntEnum):
    """Known onion failure codes.

    Each value is a small number, optionally OR-ed with one or more
    OnionFailureCodeMetaFlag bits (BADONION, PERM, NODE, UPDATE). Note that
    TEMPORARY_NODE_FAILURE and PERMANENT_NODE_FAILURE share the number 2 and
    differ only in the PERM flag.
    """
    INVALID_REALM =                           PERM | 1
    TEMPORARY_NODE_FAILURE =                  NODE | 2
    PERMANENT_NODE_FAILURE =                  PERM | NODE | 2
    REQUIRED_NODE_FEATURE_MISSING =           PERM | NODE | 3
    INVALID_ONION_VERSION =                   BADONION | PERM | 4
    INVALID_ONION_HMAC =                      BADONION | PERM | 5
    INVALID_ONION_KEY =                       BADONION | PERM | 6
    TEMPORARY_CHANNEL_FAILURE =               UPDATE | 7
    PERMANENT_CHANNEL_FAILURE =               PERM | 8
    REQUIRED_CHANNEL_FEATURE_MISSING =        PERM | 9
    UNKNOWN_NEXT_PEER =                       PERM | 10
    AMOUNT_BELOW_MINIMUM =                    UPDATE | 11
    FEE_INSUFFICIENT =                        UPDATE | 12
    INCORRECT_CLTV_EXPIRY =                   UPDATE | 13
    EXPIRY_TOO_SOON =                         UPDATE | 14
    INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS =    PERM | 15
    _LEGACY_INCORRECT_PAYMENT_AMOUNT =        PERM | 16
    FINAL_EXPIRY_TOO_SOON =                   17
    FINAL_INCORRECT_CLTV_EXPIRY =             18
    FINAL_INCORRECT_HTLC_AMOUNT =             19
    CHANNEL_DISABLED =                        UPDATE | 20
    EXPIRY_TOO_FAR =                          21
    INVALID_ONION_PAYLOAD =                   PERM | 22
    MPP_TIMEOUT =                             23
    TRAMPOLINE_FEE_INSUFFICIENT =             NODE | 51
    TRAMPOLINE_EXPIRY_TOO_SOON =              NODE | 52


# don't use these elsewhere, the names are ambiguous without context
del BADONION; del PERM; del NODE; del UPDATE