URI: 
       tcrypto.py - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
       tcrypto.py (15661B)
       ---
            1 # -*- coding: utf-8 -*-
            2 #
            3 # Electrum - lightweight Bitcoin client
            4 # Copyright (C) 2018 The Electrum developers
            5 #
            6 # Permission is hereby granted, free of charge, to any person
            7 # obtaining a copy of this software and associated documentation files
            8 # (the "Software"), to deal in the Software without restriction,
            9 # including without limitation the rights to use, copy, modify, merge,
           10 # publish, distribute, sublicense, and/or sell copies of the Software,
           11 # and to permit persons to whom the Software is furnished to do so,
           12 # subject to the following conditions:
           13 #
           14 # The above copyright notice and this permission notice shall be
           15 # included in all copies or substantial portions of the Software.
           16 #
           17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
           18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
           19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
           20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
           21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
           22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
           23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
           24 # SOFTWARE.
           25 
           26 import base64
           27 import os
           28 import sys
           29 import hashlib
           30 import hmac
           31 from typing import Union
           32 
           33 from .util import assert_bytes, InvalidPassword, to_bytes, to_string, WalletFileException, versiontuple
           34 from .i18n import _
           35 from .logging import get_logger
           36 
           37 
           38 _logger = get_logger(__name__)
           39 
           40 
           41 HAS_PYAES = False
           42 try:
           43     import pyaes
           44 except:
           45     pass
           46 else:
           47     HAS_PYAES = True
           48 
           49 HAS_CRYPTODOME = False
           50 MIN_CRYPTODOME_VERSION = "3.7"
           51 try:
           52     import Cryptodome
           53     if versiontuple(Cryptodome.__version__) < versiontuple(MIN_CRYPTODOME_VERSION):
           54         _logger.warning(f"found module 'Cryptodome' but it is too old: {Cryptodome.__version__}<{MIN_CRYPTODOME_VERSION}")
           55         raise Exception()
           56     from Cryptodome.Cipher import ChaCha20_Poly1305 as CD_ChaCha20_Poly1305
           57     from Cryptodome.Cipher import ChaCha20 as CD_ChaCha20
           58     from Cryptodome.Cipher import AES as CD_AES
           59 except:
           60     pass
           61 else:
           62     HAS_CRYPTODOME = True
           63 
           64 HAS_CRYPTOGRAPHY = False
           65 MIN_CRYPTOGRAPHY_VERSION = "2.1"
           66 try:
           67     import cryptography
           68     if versiontuple(cryptography.__version__) < versiontuple(MIN_CRYPTOGRAPHY_VERSION):
           69         _logger.warning(f"found module 'cryptography' but it is too old: {cryptography.__version__}<{MIN_CRYPTOGRAPHY_VERSION}")
           70         raise Exception()
           71     from cryptography import exceptions
           72     from cryptography.hazmat.primitives.ciphers import Cipher as CG_Cipher
           73     from cryptography.hazmat.primitives.ciphers import algorithms as CG_algorithms
           74     from cryptography.hazmat.primitives.ciphers import modes as CG_modes
           75     from cryptography.hazmat.backends import default_backend as CG_default_backend
           76     import cryptography.hazmat.primitives.ciphers.aead as CG_aead
           77 except:
           78     pass
           79 else:
           80     HAS_CRYPTOGRAPHY = True
           81 
           82 
           83 if not (HAS_CRYPTODOME or HAS_CRYPTOGRAPHY):
           84     sys.exit(f"Error: at least one of ('pycryptodomex', 'cryptography') needs to be installed.")
           85 
           86 
           87 class InvalidPadding(Exception):
           88     pass
           89 
           90 
           91 def append_PKCS7_padding(data: bytes) -> bytes:
           92     assert_bytes(data)
           93     padlen = 16 - (len(data) % 16)
           94     return data + bytes([padlen]) * padlen
           95 
           96 
           97 def strip_PKCS7_padding(data: bytes) -> bytes:
           98     assert_bytes(data)
           99     if len(data) % 16 != 0 or len(data) == 0:
          100         raise InvalidPadding("invalid length")
          101     padlen = data[-1]
          102     if not (0 < padlen <= 16):
          103         raise InvalidPadding("invalid padding byte (out of range)")
          104     for i in data[-padlen:]:
          105         if i != padlen:
          106             raise InvalidPadding("invalid padding byte (inconsistent)")
          107     return data[0:-padlen]
          108 
          109 
          110 def aes_encrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
          111     assert_bytes(key, iv, data)
          112     data = append_PKCS7_padding(data)
          113     if HAS_CRYPTODOME:
          114         e = CD_AES.new(key, CD_AES.MODE_CBC, iv).encrypt(data)
          115     elif HAS_CRYPTOGRAPHY:
          116         cipher = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
          117         encryptor = cipher.encryptor()
          118         e = encryptor.update(data) + encryptor.finalize()
          119     elif HAS_PYAES:
          120         aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
          121         aes = pyaes.Encrypter(aes_cbc, padding=pyaes.PADDING_NONE)
          122         e = aes.feed(data) + aes.feed()  # empty aes.feed() flushes buffer
          123     else:
          124         raise Exception("no AES backend found")
          125     return e
          126 
          127 
          128 def aes_decrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
          129     assert_bytes(key, iv, data)
          130     if HAS_CRYPTODOME:
          131         cipher = CD_AES.new(key, CD_AES.MODE_CBC, iv)
          132         data = cipher.decrypt(data)
          133     elif HAS_CRYPTOGRAPHY:
          134         cipher = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
          135         decryptor = cipher.decryptor()
          136         data = decryptor.update(data) + decryptor.finalize()
          137     elif HAS_PYAES:
          138         aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
          139         aes = pyaes.Decrypter(aes_cbc, padding=pyaes.PADDING_NONE)
          140         data = aes.feed(data) + aes.feed()  # empty aes.feed() flushes buffer
          141     else:
          142         raise Exception("no AES backend found")
          143     try:
          144         return strip_PKCS7_padding(data)
          145     except InvalidPadding:
          146         raise InvalidPassword()
          147 
          148 
          149 def EncodeAES_base64(secret: bytes, msg: bytes) -> bytes:
          150     """Returns base64 encoded ciphertext."""
          151     e = EncodeAES_bytes(secret, msg)
          152     return base64.b64encode(e)
          153 
          154 
          155 def EncodeAES_bytes(secret: bytes, msg: bytes) -> bytes:
          156     assert_bytes(msg)
          157     iv = bytes(os.urandom(16))
          158     ct = aes_encrypt_with_iv(secret, iv, msg)
          159     return iv + ct
          160 
          161 
          162 def DecodeAES_base64(secret: bytes, ciphertext_b64: Union[bytes, str]) -> bytes:
          163     ciphertext = bytes(base64.b64decode(ciphertext_b64))
          164     return DecodeAES_bytes(secret, ciphertext)
          165 
          166 
          167 def DecodeAES_bytes(secret: bytes, ciphertext: bytes) -> bytes:
          168     assert_bytes(ciphertext)
          169     iv, e = ciphertext[:16], ciphertext[16:]
          170     s = aes_decrypt_with_iv(secret, iv, e)
          171     return s
          172 
          173 
          174 PW_HASH_VERSION_LATEST = 1
          175 KNOWN_PW_HASH_VERSIONS = (1, 2, )
          176 SUPPORTED_PW_HASH_VERSIONS = (1, )
          177 assert PW_HASH_VERSION_LATEST in KNOWN_PW_HASH_VERSIONS
          178 assert PW_HASH_VERSION_LATEST in SUPPORTED_PW_HASH_VERSIONS
          179 
          180 
          181 class UnexpectedPasswordHashVersion(InvalidPassword, WalletFileException):
          182     def __init__(self, version):
          183         self.version = version
          184 
          185     def __str__(self):
          186         return "{unexpected}: {version}\n{instruction}".format(
          187             unexpected=_("Unexpected password hash version"),
          188             version=self.version,
          189             instruction=_('You are most likely using an outdated version of Electrum. Please update.'))
          190 
          191 
          192 class UnsupportedPasswordHashVersion(InvalidPassword, WalletFileException):
          193     def __init__(self, version):
          194         self.version = version
          195 
          196     def __str__(self):
          197         return "{unsupported}: {version}\n{instruction}".format(
          198             unsupported=_("Unsupported password hash version"),
          199             version=self.version,
          200             instruction=f"To open this wallet, try 'git checkout password_v{self.version}'.\n"
          201                         "Alternatively, restore from seed.")
          202 
          203 
          204 def _hash_password(password: Union[bytes, str], *, version: int) -> bytes:
          205     pw = to_bytes(password, 'utf8')
          206     if version not in SUPPORTED_PW_HASH_VERSIONS:
          207         raise UnsupportedPasswordHashVersion(version)
          208     if version == 1:
          209         return sha256d(pw)
          210     else:
          211         assert version not in KNOWN_PW_HASH_VERSIONS
          212         raise UnexpectedPasswordHashVersion(version)
          213 
          214 
          215 def _pw_encode_raw(data: bytes, password: Union[bytes, str], *, version: int) -> bytes:
          216     if version not in KNOWN_PW_HASH_VERSIONS:
          217         raise UnexpectedPasswordHashVersion(version)
          218     # derive key from password
          219     secret = _hash_password(password, version=version)
          220     # encrypt given data
          221     ciphertext = EncodeAES_bytes(secret, data)
          222     return ciphertext
          223 
          224 
          225 def _pw_decode_raw(data_bytes: bytes, password: Union[bytes, str], *, version: int) -> bytes:
          226     if version not in KNOWN_PW_HASH_VERSIONS:
          227         raise UnexpectedPasswordHashVersion(version)
          228     # derive key from password
          229     secret = _hash_password(password, version=version)
          230     # decrypt given data
          231     try:
          232         d = DecodeAES_bytes(secret, data_bytes)
          233     except Exception as e:
          234         raise InvalidPassword() from e
          235     return d
          236 
          237 
          238 def pw_encode_bytes(data: bytes, password: Union[bytes, str], *, version: int) -> str:
          239     """plaintext bytes -> base64 ciphertext"""
          240     ciphertext = _pw_encode_raw(data, password, version=version)
          241     ciphertext_b64 = base64.b64encode(ciphertext)
          242     return ciphertext_b64.decode('utf8')
          243 
          244 
          245 def pw_decode_bytes(data: str, password: Union[bytes, str], *, version:int) -> bytes:
          246     """base64 ciphertext -> plaintext bytes"""
          247     if version not in KNOWN_PW_HASH_VERSIONS:
          248         raise UnexpectedPasswordHashVersion(version)
          249     data_bytes = bytes(base64.b64decode(data))
          250     return _pw_decode_raw(data_bytes, password, version=version)
          251 
          252 
          253 def pw_encode_with_version_and_mac(data: bytes, password: Union[bytes, str]) -> str:
          254     """plaintext bytes -> base64 ciphertext"""
          255     # https://crypto.stackexchange.com/questions/202/should-we-mac-then-encrypt-or-encrypt-then-mac
          256     # Encrypt-and-MAC. The MAC will be used to detect invalid passwords
          257     version = PW_HASH_VERSION_LATEST
          258     mac = sha256(data)[0:4]
          259     ciphertext = _pw_encode_raw(data, password, version=version)
          260     ciphertext_b64 = base64.b64encode(bytes([version]) + ciphertext + mac)
          261     return ciphertext_b64.decode('utf8')
          262 
          263 
          264 def pw_decode_with_version_and_mac(data: str, password: Union[bytes, str]) -> bytes:
          265     """base64 ciphertext -> plaintext bytes"""
          266     data_bytes = bytes(base64.b64decode(data))
          267     version = int(data_bytes[0])
          268     encrypted = data_bytes[1:-4]
          269     mac = data_bytes[-4:]
          270     if version not in KNOWN_PW_HASH_VERSIONS:
          271         raise UnexpectedPasswordHashVersion(version)
          272     decrypted = _pw_decode_raw(encrypted, password, version=version)
          273     if sha256(decrypted)[0:4] != mac:
          274         raise InvalidPassword()
          275     return decrypted
          276 
          277 
          278 def pw_encode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
          279     """plaintext str -> base64 ciphertext"""
          280     if not password:
          281         return data
          282     plaintext_bytes = to_bytes(data, "utf8")
          283     return pw_encode_bytes(plaintext_bytes, password, version=version)
          284 
          285 
          286 def pw_decode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
          287     """base64 ciphertext -> plaintext str"""
          288     if password is None:
          289         return data
          290     plaintext_bytes = pw_decode_bytes(data, password, version=version)
          291     try:
          292         plaintext_str = to_string(plaintext_bytes, "utf8")
          293     except UnicodeDecodeError as e:
          294         raise InvalidPassword() from e
          295     return plaintext_str
          296 
          297 
          298 def sha256(x: Union[bytes, str]) -> bytes:
          299     x = to_bytes(x, 'utf8')
          300     return bytes(hashlib.sha256(x).digest())
          301 
          302 
          303 def sha256d(x: Union[bytes, str]) -> bytes:
          304     x = to_bytes(x, 'utf8')
          305     out = bytes(sha256(sha256(x)))
          306     return out
          307 
          308 
          309 def hash_160(x: bytes) -> bytes:
          310     return ripemd(sha256(x))
          311 
          312 def ripemd(x):
          313     try:
          314         md = hashlib.new('ripemd160')
          315         md.update(x)
          316         return md.digest()
          317     except BaseException:
          318         # ripemd160 is not guaranteed to be available in hashlib on all platforms.
          319         # Historically, our Android builds had hashlib/openssl which did not have it.
          320         # see https://github.com/spesmilo/electrum/issues/7093
          321         # We bundle a pure python implementation as fallback that gets used now:
          322         from . import ripemd
          323         md = ripemd.new(x)
          324         return md.digest()
          325 
          326 def hmac_oneshot(key: bytes, msg: bytes, digest) -> bytes:
          327     if hasattr(hmac, 'digest'):
          328         # requires python 3.7+; faster
          329         return hmac.digest(key, msg, digest)
          330     else:
          331         return hmac.new(key, msg, digest).digest()
          332 
          333 
          334 def chacha20_poly1305_encrypt(
          335         *,
          336         key: bytes,
          337         nonce: bytes,
          338         associated_data: bytes = None,
          339         data: bytes
          340 ) -> bytes:
          341     assert isinstance(key, (bytes, bytearray))
          342     assert isinstance(nonce, (bytes, bytearray))
          343     assert isinstance(associated_data, (bytes, bytearray, type(None)))
          344     assert isinstance(data, (bytes, bytearray))
          345     assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
          346     assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
          347     if HAS_CRYPTODOME:
          348         cipher = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
          349         if associated_data is not None:
          350             cipher.update(associated_data)
          351         ciphertext, mac = cipher.encrypt_and_digest(plaintext=data)
          352         return ciphertext + mac
          353     if HAS_CRYPTOGRAPHY:
          354         a = CG_aead.ChaCha20Poly1305(key)
          355         return a.encrypt(nonce, data, associated_data)
          356     raise Exception("no chacha20 backend found")
          357 
          358 
          359 def chacha20_poly1305_decrypt(
          360         *,
          361         key: bytes,
          362         nonce: bytes,
          363         associated_data: bytes = None,
          364         data: bytes
          365 ) -> bytes:
          366     assert isinstance(key, (bytes, bytearray))
          367     assert isinstance(nonce, (bytes, bytearray))
          368     assert isinstance(associated_data, (bytes, bytearray, type(None)))
          369     assert isinstance(data, (bytes, bytearray))
          370     assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
          371     assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
          372     if HAS_CRYPTODOME:
          373         cipher = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
          374         if associated_data is not None:
          375             cipher.update(associated_data)
          376         # raises ValueError if not valid (e.g. incorrect MAC)
          377         return cipher.decrypt_and_verify(ciphertext=data[:-16], received_mac_tag=data[-16:])
          378     if HAS_CRYPTOGRAPHY:
          379         a = CG_aead.ChaCha20Poly1305(key)
          380         try:
          381             return a.decrypt(nonce, data, associated_data)
          382         except cryptography.exceptions.InvalidTag as e:
          383             raise ValueError("invalid tag") from e
          384     raise Exception("no chacha20 backend found")
          385 
          386 
          387 def chacha20_encrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
          388     assert isinstance(key, (bytes, bytearray))
          389     assert isinstance(nonce, (bytes, bytearray))
          390     assert isinstance(data, (bytes, bytearray))
          391     assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
          392     assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
          393     if HAS_CRYPTODOME:
          394         cipher = CD_ChaCha20.new(key=key, nonce=nonce)
          395         return cipher.encrypt(data)
          396     if HAS_CRYPTOGRAPHY:
          397         nonce = bytes(16 - len(nonce)) + nonce  # cryptography wants 16 byte nonces
          398         algo = CG_algorithms.ChaCha20(key=key, nonce=nonce)
          399         cipher = CG_Cipher(algo, mode=None, backend=CG_default_backend())
          400         encryptor = cipher.encryptor()
          401         return encryptor.update(data)
          402     raise Exception("no chacha20 backend found")
          403 
          404 
          405 def chacha20_decrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
          406     assert isinstance(key, (bytes, bytearray))
          407     assert isinstance(nonce, (bytes, bytearray))
          408     assert isinstance(data, (bytes, bytearray))
          409     assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
          410     assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
          411     if HAS_CRYPTODOME:
          412         cipher = CD_ChaCha20.new(key=key, nonce=nonce)
          413         return cipher.decrypt(data)
          414     if HAS_CRYPTOGRAPHY:
          415         nonce = bytes(16 - len(nonce)) + nonce  # cryptography wants 16 byte nonces
          416         algo = CG_algorithms.ChaCha20(key=key, nonce=nonce)
          417         cipher = CG_Cipher(algo, mode=None, backend=CG_default_backend())
          418         decryptor = cipher.decryptor()
          419         return decryptor.update(data)
          420     raise Exception("no chacha20 backend found")