tcomplete bolt11 port to ecdsa instead of secp256k1 - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 4d3c34e04e827d57e25f5b7851e1565cac3b984e DIR parent 4aa9d7ea0d660d706e3fb7d29eba942773ee68a3 HTML Author: Janus <ysangkok@gmail.com> Date: Thu, 19 Apr 2018 15:21:47 +0200 complete bolt11 port to ecdsa instead of secp256k1 Diffstat: M lib/lightning_payencode/lnaddr.py | 54 ++++++++++++++++---------------- A lib/tests/test_bolt11.py | 94 +++++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+), 27 deletions(-) --- DIR diff --git a/lib/lightning_payencode/lnaddr.py b/lib/lightning_payencode/lnaddr.py t@@ -1,7 +1,8 @@ #! /usr/bin/env python3 -import traceback import ecdsa.curves -from ..bitcoin import MyVerifyingKey, GetPubKey +from ecdsa.ecdsa import generator_secp256k1 +from ..bitcoin import MyVerifyingKey, GetPubKey, regenerate_key, hash160_to_b58_address, b58_address_to_hash160, ser_to_point, verify_signature +from hashlib import sha256 from ..segwit_addr import bech32_encode, bech32_decode, CHARSET from binascii import hexlify, unhexlify from bitstring import BitArray t@@ -9,9 +10,7 @@ from decimal import Decimal import bitstring import hashlib -import math import re -import sys import time t@@ -88,28 +87,25 @@ def encode_fallback(fallback, currency): raise ValueError("Invalid witness version {}".format(witness[0])) wprog = u5_to_bitarray(witness[1:]) else: - addr = base58.b58decode_check(fallback) - if is_p2pkh(currency, addr[0]): + addrtype, addr = b58_address_to_hash160(fallback) + if is_p2pkh(currency, addrtype): wver = 17 - elif is_p2sh(currency, addr[0]): + elif is_p2sh(currency, addrtype): wver = 18 else: raise ValueError("Unknown address type for {}".format(currency)) - wprog = addr[1:] + wprog = addr return tagged('f', bitstring.pack("uint:5", wver) + wprog) else: raise NotImplementedError("Support for currency {} not implemented".format(currency)) def parse_fallback(fallback, currency): - return None # this function disabled by Janus to avoid base58 dependency if currency == 'bc' or currency == 'tb': wver = fallback[0:5].uint if wver == 17: - addr=base58.b58encode_check(bytes([base58_prefix_map[currency][0]]) - + fallback[5:].tobytes()) + addr=hash160_to_b58_address(fallback[5:].tobytes(), base58_prefix_map[currency][0]) elif wver == 18: - addr=base58.b58encode_check(bytes([base58_prefix_map[currency][1]]) - + fallback[5:].tobytes()) + addr=hash160_to_b58_address(fallback[5:].tobytes(), base58_prefix_map[currency][1]) elif wver <= 16: addr=bech32_encode(currency, bitarray_to_u5(fallback)) else: t@@ -205,7 +201,7 @@ def lnencode(addr, privkey): expirybits = expirybits[5:] data += tagged('x', expirybits) elif k == 'h': - data += tagged_bytes('h', hashlib.sha256(v.encode('utf-8')).digest()) + data += tagged_bytes('h', sha256(v.encode('utf-8')).digest()) elif k == 'n': data += tagged_bytes('n', v) else: t@@ -224,11 +220,12 @@ def lnencode(addr, privkey): raise ValueError("Must include either 'd' or 'h'") # We actually sign the hrp, then data (padded to 8 bits with zeroes). - privkey = secp256k1.PrivateKey(bytes(unhexlify(privkey))) - sig = privkey.ecdsa_sign_recoverable(bytearray([ord(c) for c in hrp]) + data.tobytes()) - # This doesn't actually serialize, but returns a pair of values :( - sig, recid = privkey.ecdsa_recoverable_serialize(sig) - data += bytes(sig) + bytes([recid]) + msg = hrp.encode("ascii") + data.tobytes() + privkey = regenerate_key(privkey) + sig = privkey.sign_message(msg, is_compressed=False, algo=lambda x: sha256(x).digest()) + recovery_flag = bytes([sig[0] - 27]) + sig = bytes(sig[1:]) + recovery_flag + data += sig return bech32_encode(hrp, bitarray_to_u5(data)) t@@ -347,8 +344,8 @@ def lndecode(a, verbose=False): if data_length != 53: addr.unknown_tags.append((tag, tagdata)) continue - addr.pubkey = secp256k1.PublicKey(flags=secp256k1.ALL_FLAGS) - addr.pubkey.deserialize(trim_to_bytes(tagdata)) + pubkeybytes = trim_to_bytes(tagdata) + addr.pubkey = pubkeybytes else: addr.unknown_tags.append((tag, tagdata)) t@@ -357,24 +354,27 @@ def lndecode(a, verbose=False): .format(hexlify(sigdecoded[0:64]))) print('recovery flag: {}'.format(sigdecoded[64])) print('hex of data for signing: {}' - .format(hexlify(bytearray([ord(c) for c in hrp]) - + data.tobytes()))) - print('SHA256 of above: {}'.format(hashlib.sha256(bytearray([ord(c) for c in hrp]) + data.tobytes()).hexdigest())) + .format(hexlify(hrp.encode("ascii") + data.tobytes()))) + print('SHA256 of above: {}'.format(sha256(hrp.encode("ascii") + data.tobytes()).hexdigest())) # BOLT #11: # # A reader MUST check that the `signature` is valid (see the `n` tagged # field specified below). + addr.signature = sigdecoded[:65] if addr.pubkey: # Specified by `n` # BOLT #11: # # A reader MUST use the `n` field to validate the signature instead of # performing signature recovery if a valid `n` field is provided. - addr.signature = addr.pubkey.ecdsa_deserialize_compact(sigdecoded[0:64]) - if not addr.pubkey.ecdsa_verify(bytearray([ord(c) for c in hrp]) + data.tobytes(), addr.signature): + if not verify_signature(addr.pubkey, sigdecoded[:64], sha256(hrp.encode("ascii") + data.tobytes()).digest()): raise ValueError('Invalid signature') + pubkey_copy = addr.pubkey + class WrappedBytesKey: + serialize = lambda: pubkey_copy + addr.pubkey = WrappedBytesKey else: # Recover pubkey from signature. - addr.pubkey = SerializableKey(MyVerifyingKey.from_signature(sigdecoded[:64], sigdecoded[64], hashlib.sha256(bytearray([ord(c) for c in hrp]) + data.tobytes()).digest(), curve = ecdsa.curves.SECP256k1)) + addr.pubkey = SerializableKey(MyVerifyingKey.from_signature(sigdecoded[:64], sigdecoded[64], sha256(hrp.encode("ascii") + data.tobytes()).digest(), curve = ecdsa.curves.SECP256k1)) return addr DIR diff --git a/lib/tests/test_bolt11.py b/lib/tests/test_bolt11.py t@@ -0,0 +1,94 @@ +from hashlib import sha256 +from lib.lightning_payencode.lnaddr import shorten_amount, unshorten_amount, LnAddr, lnencode, lndecode, u5_to_bitarray, bitarray_to_u5 +from decimal import Decimal +from binascii import unhexlify, hexlify +from lib.segwit_addr import bech32_encode, bech32_decode +import pprint +import unittest + +RHASH=unhexlify('0001020304050607080900010203040506070809000102030405060708090102') +CONVERSION_RATE=1200 +PRIVKEY=unhexlify('e126f68f7eafcc8b74f54d269fe206be715000f94dac067d1c04a8ca3b2db734') +PUBKEY=unhexlify('03e7156ae33b0a208d0744199163177e909e80176e55d97a2f221ede0f934dd9ad') + +class TestBolt11(unittest.TestCase): + def test_shorten_amount(self): + tests = { + Decimal(10)/10**12: '10p', + Decimal(1000)/10**12: '1n', + Decimal(1200)/10**12: '1200p', + Decimal(123)/10**6: '123u', + Decimal(123)/1000: '123m', + Decimal(3): '3', + } + + for i, o in tests.items(): + assert shorten_amount(i) == o + assert unshorten_amount(shorten_amount(i)) == i + + @staticmethod + def compare(a, b): + + if len([t[1] for t in a.tags if t[0] == 'h']) == 1: + h1 = sha256([t[1] for t in a.tags if t[0] == 'h'][0].encode('utf-8')).digest() + h2 = [t[1] for t in b.tags if t[0] == 'h'][0] + assert h1 == h2 + + # Need to filter out these, since they are being modified during + # encoding, i.e., hashed + a.tags = [t for t in a.tags if t[0] != 'h' and t[0] != 'n'] + b.tags = [t for t in b.tags if t[0] != 'h' and t[0] != 'n'] + + assert b.pubkey.serialize() == PUBKEY, (hexlify(b.pubkey.serialize()), hexlify(PUBKEY)) + assert b.signature != None + + # Unset these, they are generated during encoding/decoding + b.pubkey = None + b.signature = None + + assert a.__dict__ == b.__dict__, (pprint.pformat([a.__dict__, b.__dict__])) + + def test_roundtrip(self): + longdescription = ('One piece of chocolate cake, one icecream cone, one' + ' pickle, one slice of swiss cheese, one slice of salami,' + ' one lollypop, one piece of cherry pie, one sausage, one' + ' cupcake, and one slice of watermelon') + + + tests = [ + LnAddr(RHASH, tags=[('d', '')]), + LnAddr(RHASH, amount=Decimal('0.001'), + tags=[('d', '1 cup coffee'), ('x', 60)]), + LnAddr(RHASH, amount=Decimal('1'), tags=[('h', longdescription)]), + LnAddr(RHASH, currency='tb', tags=[('f', 'mk2QpYatsKicvFVuTAQLBryyccRXMUaGHP'), ('h', longdescription)]), + LnAddr(RHASH, amount=24, tags=[ + ('r', [(unhexlify('029e03a901b85534ff1e92c43c74431f7ce72046060fcf7a95c37e148f78c77255'), unhexlify('0102030405060708'), 1, 20, 3), (unhexlify('039e03a901b85534ff1e92c43c74431f7ce72046060fcf7a95c37e148f78c77255'), unhexlify('030405060708090a'), 2, 30, 4)]), ('f', '1RustyRX2oai4EYYDpQGWvEL62BBGqN9T'), ('h', longdescription)]), + LnAddr(RHASH, amount=24, tags=[('f', '3EktnHQD7RiAE6uzMj2ZifT9YgRrkSgzQX'), ('h', longdescription)]), + LnAddr(RHASH, amount=24, tags=[('f', 'bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kv8f3t4'), ('h', longdescription)]), + LnAddr(RHASH, amount=24, tags=[('f', 'bc1qrp33g0q5c5txsp9arysrx4k6zdkfs4nce4xj0gdcccefvpysxf3qccfmv3'), ('h', longdescription)]), + LnAddr(RHASH, amount=24, tags=[('n', PUBKEY), ('h', longdescription)]), + ] + + # Roundtrip + for t in tests: + o = lndecode(lnencode(t, PRIVKEY)) + self.compare(t, o) + + def test_n_decoding(self): + # We flip the signature recovery bit, which would normally give a different + # pubkey. + hrp, data = bech32_decode(lnencode(LnAddr(RHASH, amount=24, tags=[('d', '')]), PRIVKEY), True) + databits = u5_to_bitarray(data) + databits.invert(-1) + lnaddr = lndecode(bech32_encode(hrp, bitarray_to_u5(databits)), True) + assert lnaddr.pubkey.serialize() != PUBKEY + + # But not if we supply expliciy `n` specifier! + hrp, data = bech32_decode(lnencode(LnAddr(RHASH, amount=24, + tags=[('d', ''), + ('n', PUBKEY)]), + PRIVKEY), True) + databits = u5_to_bitarray(data) + databits.invert(-1) + lnaddr = lndecode(bech32_encode(hrp, bitarray_to_u5(databits)), True) + assert lnaddr.pubkey.serialize() == PUBKEY