tcrypto.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
tcrypto.py (15661B)
---
1 # -*- coding: utf-8 -*-
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2018 The Electrum developers
5 #
6 # Permission is hereby granted, free of charge, to any person
7 # obtaining a copy of this software and associated documentation files
8 # (the "Software"), to deal in the Software without restriction,
9 # including without limitation the rights to use, copy, modify, merge,
10 # publish, distribute, sublicense, and/or sell copies of the Software,
11 # and to permit persons to whom the Software is furnished to do so,
12 # subject to the following conditions:
13 #
14 # The above copyright notice and this permission notice shall be
15 # included in all copies or substantial portions of the Software.
16 #
17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 # SOFTWARE.
25
26 import base64
27 import os
28 import sys
29 import hashlib
30 import hmac
31 from typing import Union
32
33 from .util import assert_bytes, InvalidPassword, to_bytes, to_string, WalletFileException, versiontuple
34 from .i18n import _
35 from .logging import get_logger
36
37
38 _logger = get_logger(__name__)
39
40
# Optional pure-python AES backend. The flag records whether the import worked.
HAS_PYAES = False
try:
    import pyaes
except Exception:
    # pyaes is optional: any ordinary import failure just leaves the flag False.
    # Deliberately not a bare ``except:`` so that KeyboardInterrupt and
    # SystemExit raised while importing are not silently swallowed.
    pass
else:
    HAS_PYAES = True
48
# Probe for the 'Cryptodome' package (pycryptodomex). The cipher helpers in
# this file check HAS_CRYPTODOME before any other backend flag.
HAS_CRYPTODOME = False
MIN_CRYPTODOME_VERSION = "3.7"
try:
    import Cryptodome
    if versiontuple(Cryptodome.__version__) < versiontuple(MIN_CRYPTODOME_VERSION):
        _logger.warning(f"found module 'Cryptodome' but it is too old: {Cryptodome.__version__}<{MIN_CRYPTODOME_VERSION}")
        # jump to the handler below so that the backend stays disabled
        raise Exception()
    from Cryptodome.Cipher import ChaCha20_Poly1305 as CD_ChaCha20_Poly1305
    from Cryptodome.Cipher import ChaCha20 as CD_ChaCha20
    from Cryptodome.Cipher import AES as CD_AES
except Exception:
    # Missing, too old or otherwise unusable: leave HAS_CRYPTODOME False.
    # Not a bare ``except:`` so KeyboardInterrupt/SystemExit still propagate.
    pass
else:
    HAS_CRYPTODOME = True
63
# Probe for the 'cryptography' package. The cipher helpers in this file
# consult HAS_CRYPTOGRAPHY after HAS_CRYPTODOME.
HAS_CRYPTOGRAPHY = False
MIN_CRYPTOGRAPHY_VERSION = "2.1"
try:
    import cryptography
    if versiontuple(cryptography.__version__) < versiontuple(MIN_CRYPTOGRAPHY_VERSION):
        _logger.warning(f"found module 'cryptography' but it is too old: {cryptography.__version__}<{MIN_CRYPTOGRAPHY_VERSION}")
        # jump to the handler below so that the backend stays disabled
        raise Exception()
    from cryptography import exceptions
    from cryptography.hazmat.primitives.ciphers import Cipher as CG_Cipher
    from cryptography.hazmat.primitives.ciphers import algorithms as CG_algorithms
    from cryptography.hazmat.primitives.ciphers import modes as CG_modes
    from cryptography.hazmat.backends import default_backend as CG_default_backend
    import cryptography.hazmat.primitives.ciphers.aead as CG_aead
except Exception:
    # Missing, too old or otherwise unusable: leave HAS_CRYPTOGRAPHY False.
    # Not a bare ``except:`` so KeyboardInterrupt/SystemExit still propagate.
    pass
else:
    HAS_CRYPTOGRAPHY = True
81
82
# Abort at import time when neither of the two required backends was found.
if not HAS_CRYPTODOME and not HAS_CRYPTOGRAPHY:
    sys.exit(f"Error: at least one of ('pycryptodomex', 'cryptography') needs to be installed.")
85
86
class InvalidPadding(Exception):
    """Raised by strip_PKCS7_padding when the padding is malformed."""
89
90
def append_PKCS7_padding(data: bytes) -> bytes:
    """Return *data* padded to a multiple of 16 bytes.

    Between 1 and 16 bytes are appended, each holding the number of bytes
    added; input that is already a multiple of 16 gets a full extra block.
    """
    assert_bytes(data)
    remainder = len(data) % 16
    pad_count = 16 - remainder
    padding = bytes([pad_count] * pad_count)
    return data + padding
95
96
def strip_PKCS7_padding(data: bytes) -> bytes:
    """Validate and remove the trailing padding from *data*.

    Raises InvalidPadding if *data* is empty or not a multiple of 16 bytes
    long, if the last byte is not in the range 1..16, or if the trailing
    bytes do not all equal the last byte.
    """
    assert_bytes(data)
    total = len(data)
    if total == 0 or total % 16 != 0:
        raise InvalidPadding("invalid length")
    padlen = data[-1]
    if padlen < 1 or padlen > 16:
        raise InvalidPadding("invalid padding byte (out of range)")
    # every one of the last `padlen` bytes must carry the value `padlen`
    if any(byte != padlen for byte in data[-padlen:]):
        raise InvalidPadding("invalid padding byte (inconsistent)")
    return data[:total - padlen]
108
109
def aes_encrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
    """AES-CBC encrypt *data* under *key* and *iv*.

    The plaintext is padded with append_PKCS7_padding first. The first
    available backend is used, in the order Cryptodome, cryptography, pyaes.
    Raises Exception if none of them is available.
    """
    assert_bytes(key, iv, data)
    padded = append_PKCS7_padding(data)
    if HAS_CRYPTODOME:
        return CD_AES.new(key, CD_AES.MODE_CBC, iv).encrypt(padded)
    if HAS_CRYPTOGRAPHY:
        cbc = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
        enc = cbc.encryptor()
        return enc.update(padded) + enc.finalize()
    if HAS_PYAES:
        # padding was already applied above, so pyaes must not add its own
        enc = pyaes.Encrypter(pyaes.AESModeOfOperationCBC(key, iv=iv), padding=pyaes.PADDING_NONE)
        return enc.feed(padded) + enc.feed()  # empty feed() flushes buffer
    raise Exception("no AES backend found")
126
127
def aes_decrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
    """AES-CBC decrypt *data* under *key* and *iv*, then strip the padding.

    The first available backend is used, in the order Cryptodome,
    cryptography, pyaes; Exception is raised if none is available.
    Raises InvalidPassword when the decrypted bytes do not end in valid
    padding.
    """
    assert_bytes(key, iv, data)
    if HAS_CRYPTODOME:
        padded = CD_AES.new(key, CD_AES.MODE_CBC, iv).decrypt(data)
    elif HAS_CRYPTOGRAPHY:
        cbc = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
        dec = cbc.decryptor()
        padded = dec.update(data) + dec.finalize()
    elif HAS_PYAES:
        # padding is stripped below, so pyaes must leave it untouched
        dec = pyaes.Decrypter(pyaes.AESModeOfOperationCBC(key, iv=iv), padding=pyaes.PADDING_NONE)
        padded = dec.feed(data) + dec.feed()  # empty feed() flushes buffer
    else:
        raise Exception("no AES backend found")
    try:
        return strip_PKCS7_padding(padded)
    except InvalidPadding:
        # bad padding is reported to callers as a wrong password
        raise InvalidPassword()
147
148
def EncodeAES_base64(secret: bytes, msg: bytes) -> bytes:
    """Encrypt *msg* with EncodeAES_bytes and return the result base64-encoded."""
    return base64.b64encode(EncodeAES_bytes(secret, msg))
153
154
def EncodeAES_bytes(secret: bytes, msg: bytes) -> bytes:
    """Encrypt *msg* under *secret* with a fresh random 16-byte IV.

    Returns the IV followed by the ciphertext.
    """
    assert_bytes(msg)
    fresh_iv = os.urandom(16)
    return fresh_iv + aes_encrypt_with_iv(secret, fresh_iv, msg)
160
161
def DecodeAES_base64(secret: bytes, ciphertext_b64: Union[bytes, str]) -> bytes:
    """Base64-decode *ciphertext_b64* and decrypt it with DecodeAES_bytes."""
    raw = bytes(base64.b64decode(ciphertext_b64))
    plaintext = DecodeAES_bytes(secret, raw)
    return plaintext
165
166
def DecodeAES_bytes(secret: bytes, ciphertext: bytes) -> bytes:
    """Decrypt *ciphertext*, whose first 16 bytes are the IV."""
    assert_bytes(ciphertext)
    iv = ciphertext[:16]
    body = ciphertext[16:]
    return aes_decrypt_with_iv(secret, iv, body)
172
173
# Password hash version that pw_encode_with_version_and_mac writes.
PW_HASH_VERSION_LATEST = 1
# Versions accepted by the pw_* helpers without UnexpectedPasswordHashVersion.
KNOWN_PW_HASH_VERSIONS = (1, 2)
# Versions for which _hash_password can actually derive a key.
SUPPORTED_PW_HASH_VERSIONS = (1,)
# The latest version has to be both known and supported.
assert PW_HASH_VERSION_LATEST in KNOWN_PW_HASH_VERSIONS
assert PW_HASH_VERSION_LATEST in SUPPORTED_PW_HASH_VERSIONS
179
180
class UnexpectedPasswordHashVersion(InvalidPassword, WalletFileException):
    """Raised for a password hash version that is not in KNOWN_PW_HASH_VERSIONS."""

    def __init__(self, version):
        # the offending version number, shown in the message
        self.version = version

    def __str__(self):
        headline = _("Unexpected password hash version")
        shown_version = self.version
        advice = _('You are most likely using an outdated version of Electrum. Please update.')
        return f"{headline}: {shown_version}\n{advice}"
190
191
class UnsupportedPasswordHashVersion(InvalidPassword, WalletFileException):
    """Raised for a password hash version that is not in SUPPORTED_PW_HASH_VERSIONS."""

    def __init__(self, version):
        # the offending version number, shown in the message
        self.version = version

    def __str__(self):
        headline = _("Unsupported password hash version")
        shown_version = self.version
        advice = (f"To open this wallet, try 'git checkout password_v{self.version}'.\n"
                  "Alternatively, restore from seed.")
        return f"{headline}: {shown_version}\n{advice}"
202
203
def _hash_password(password: Union[bytes, str], *, version: int) -> bytes:
    """Derive the encryption key for *password* under hash *version*.

    Version 1 is sha256d of the utf8-encoded password.
    Raises UnsupportedPasswordHashVersion if *version* is not in
    SUPPORTED_PW_HASH_VERSIONS.
    """
    pw = to_bytes(password, 'utf8')
    if version not in SUPPORTED_PW_HASH_VERSIONS:
        raise UnsupportedPasswordHashVersion(version)
    if version != 1:
        # only reachable for a version that is listed as supported
        # but has no derivation implemented here
        assert version not in KNOWN_PW_HASH_VERSIONS
        raise UnexpectedPasswordHashVersion(version)
    return sha256d(pw)
213
214
def _pw_encode_raw(data: bytes, password: Union[bytes, str], *, version: int) -> bytes:
    """Encrypt *data* under a key derived from *password*.

    Returns the raw output of EncodeAES_bytes (IV followed by ciphertext).
    Raises UnexpectedPasswordHashVersion if *version* is not in
    KNOWN_PW_HASH_VERSIONS.
    """
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    key = _hash_password(password, version=version)
    return EncodeAES_bytes(key, data)
223
224
def _pw_decode_raw(data_bytes: bytes, password: Union[bytes, str], *, version: int) -> bytes:
    """Decrypt *data_bytes* under a key derived from *password*.

    Raises UnexpectedPasswordHashVersion if *version* is not in
    KNOWN_PW_HASH_VERSIONS, and InvalidPassword if decryption fails for
    any reason.
    """
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    key = _hash_password(password, version=version)
    try:
        return DecodeAES_bytes(key, data_bytes)
    except Exception as exc:
        # every decryption failure is reported as a wrong password
        raise InvalidPassword() from exc
236
237
def pw_encode_bytes(data: bytes, password: Union[bytes, str], *, version: int) -> str:
    """plaintext bytes -> base64 ciphertext"""
    raw = _pw_encode_raw(data, password, version=version)
    return base64.b64encode(raw).decode('utf8')
243
244
def pw_decode_bytes(data: str, password: Union[bytes, str], *, version:int) -> bytes:
    """base64 ciphertext -> plaintext bytes"""
    # reject unknown versions before even trying to base64-decode the input
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    raw = bytes(base64.b64decode(data))
    plaintext = _pw_decode_raw(raw, password, version=version)
    return plaintext
251
252
def pw_encode_with_version_and_mac(data: bytes, password: Union[bytes, str]) -> str:
    """plaintext bytes -> base64 ciphertext

    The encoded payload is: one version byte, the raw ciphertext, then the
    first four bytes of sha256 over the plaintext.
    """
    # https://crypto.stackexchange.com/questions/202/should-we-mac-then-encrypt-or-encrypt-then-mac
    # Encrypt-and-MAC. The MAC will be used to detect invalid passwords
    version = PW_HASH_VERSION_LATEST
    checksum = sha256(data)[0:4]
    raw = _pw_encode_raw(data, password, version=version)
    payload = bytes([version]) + raw + checksum
    return base64.b64encode(payload).decode('utf8')
262
263
def pw_decode_with_version_and_mac(data: str, password: Union[bytes, str]) -> bytes:
    """base64 ciphertext -> plaintext bytes

    After base64-decoding, the payload is read as: one version byte, the
    raw ciphertext, then a trailing 4-byte checksum that must equal the
    first four bytes of sha256 over the decrypted plaintext.

    Raises UnexpectedPasswordHashVersion if the version byte is not in
    KNOWN_PW_HASH_VERSIONS, and InvalidPassword if decryption fails or
    the checksum does not match.
    """
    data_bytes = bytes(base64.b64decode(data))
    version = int(data_bytes[0])
    encrypted = data_bytes[1:-4]
    mac = data_bytes[-4:]
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    decrypted = _pw_decode_raw(encrypted, password, version=version)
    # Compare the checksum in constant time rather than with `!=`; inputs of
    # different length simply compare unequal, exactly as before.
    if not hmac.compare_digest(sha256(decrypted)[0:4], mac):
        raise InvalidPassword()
    return decrypted
276
277
def pw_encode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
    """plaintext str -> base64 ciphertext

    A falsy password (None or empty) means no encryption: *data* is
    returned unchanged.
    """
    if password:
        encoded_plaintext = to_bytes(data, "utf8")
        return pw_encode_bytes(encoded_plaintext, password, version=version)
    return data
284
285
def pw_decode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
    """base64 ciphertext -> plaintext str

    With a password of None, *data* is returned unchanged.
    Raises InvalidPassword if the decrypted bytes are not valid utf8.
    """
    if password is None:
        return data
    decrypted = pw_decode_bytes(data, password, version=version)
    try:
        return to_string(decrypted, "utf8")
    except UnicodeDecodeError as exc:
        # undecodable output is reported as a wrong password
        raise InvalidPassword() from exc
296
297
def sha256(x: Union[bytes, str]) -> bytes:
    """Return the SHA-256 digest of *x*, which is passed through to_bytes(x, 'utf8') first."""
    hasher = hashlib.sha256(to_bytes(x, 'utf8'))
    return bytes(hasher.digest())
301
302
def sha256d(x: Union[bytes, str]) -> bytes:
    """Return the double SHA-256 digest of *x*: sha256(sha256(x))."""
    first_pass = sha256(to_bytes(x, 'utf8'))
    return bytes(sha256(first_pass))
307
308
def hash_160(x: bytes) -> bytes:
    """Return ripemd(sha256(x))."""
    sha_digest = sha256(x)
    return ripemd(sha_digest)
311
def ripemd(x):
    """Return the RIPEMD-160 digest of *x*.

    hashlib is tried first; if that fails, the pure python implementation
    bundled with this package is used instead.
    """
    try:
        md = hashlib.new('ripemd160')
        md.update(x)
        return md.digest()
    except Exception:
        # Not BaseException: KeyboardInterrupt/SystemExit must propagate
        # instead of silently triggering the fallback.
        #
        # ripemd160 is not guaranteed to be available in hashlib on all platforms.
        # Historically, our Android builds had hashlib/openssl which did not have it.
        # see https://github.com/spesmilo/electrum/issues/7093
        # We bundle a pure python implementation as fallback that gets used now:
        from . import ripemd
        md = ripemd.new(x)
        return md.digest()
325
def hmac_oneshot(key: bytes, msg: bytes, digest) -> bytes:
    """Return the HMAC of *msg* under *key* using the given *digest*.

    Uses hmac.digest when the running Python provides it, otherwise falls
    back to hmac.new(...).digest().
    """
    fast_path = getattr(hmac, 'digest', None)
    if fast_path is None:
        return hmac.new(key, msg, digest).digest()
    # requires python 3.7+; faster
    return fast_path(key, msg, digest)
332
333
def chacha20_poly1305_encrypt(
        *,
        key: bytes,
        nonce: bytes,
        associated_data: bytes = None,
        data: bytes
) -> bytes:
    """Encrypt and authenticate *data* with ChaCha20-Poly1305.

    *key* must be 32 bytes and *nonce* 12 bytes; *associated_data* is
    optional. With the Cryptodome backend the result is the ciphertext
    followed by the MAC tag; with the cryptography backend it is the
    return value of ChaCha20Poly1305.encrypt.
    Raises Exception if neither backend is available.
    """
    byteslike = (bytes, bytearray)
    assert isinstance(key, byteslike)
    assert isinstance(nonce, byteslike)
    assert associated_data is None or isinstance(associated_data, byteslike)
    assert isinstance(data, byteslike)
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
    if HAS_CRYPTODOME:
        aead = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
        if associated_data is not None:
            aead.update(associated_data)
        body, tag = aead.encrypt_and_digest(plaintext=data)
        return body + tag
    elif HAS_CRYPTOGRAPHY:
        return CG_aead.ChaCha20Poly1305(key).encrypt(nonce, data, associated_data)
    else:
        raise Exception("no chacha20 backend found")
357
358
def chacha20_poly1305_decrypt(
        *,
        key: bytes,
        nonce: bytes,
        associated_data: bytes = None,
        data: bytes
) -> bytes:
    """Verify and decrypt ChaCha20-Poly1305 protected *data*.

    *key* must be 32 bytes and *nonce* 12 bytes; *associated_data* is
    optional. With the Cryptodome backend the last 16 bytes of *data* are
    taken as the MAC tag.
    Raises ValueError if the data does not authenticate, and Exception if
    neither backend is available.
    """
    byteslike = (bytes, bytearray)
    assert isinstance(key, byteslike)
    assert isinstance(nonce, byteslike)
    assert associated_data is None or isinstance(associated_data, byteslike)
    assert isinstance(data, byteslike)
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
    if HAS_CRYPTODOME:
        aead = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
        if associated_data is not None:
            aead.update(associated_data)
        body, tag = data[:-16], data[-16:]
        # raises ValueError if not valid (e.g. incorrect MAC)
        return aead.decrypt_and_verify(ciphertext=body, received_mac_tag=tag)
    elif HAS_CRYPTOGRAPHY:
        aead = CG_aead.ChaCha20Poly1305(key)
        try:
            return aead.decrypt(nonce, data, associated_data)
        except cryptography.exceptions.InvalidTag as exc:
            # report a bad tag as ValueError, like the Cryptodome branch
            raise ValueError("invalid tag") from exc
    else:
        raise Exception("no chacha20 backend found")
385
386
def chacha20_encrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
    """Encrypt *data* with the plain ChaCha20 stream cipher (no authentication).

    *key* must be 32 bytes and *nonce* 8 or 12 bytes.
    Raises Exception if neither backend is available.
    """
    byteslike = (bytes, bytearray)
    assert isinstance(key, byteslike)
    assert isinstance(nonce, byteslike)
    assert isinstance(data, byteslike)
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
    if HAS_CRYPTODOME:
        return CD_ChaCha20.new(key=key, nonce=nonce).encrypt(data)
    elif HAS_CRYPTOGRAPHY:
        # cryptography wants 16 byte nonces: left-pad with zero bytes
        zero_prefix = bytes(16 - len(nonce))
        algo = CG_algorithms.ChaCha20(key=key, nonce=zero_prefix + nonce)
        stream = CG_Cipher(algo, mode=None, backend=CG_default_backend()).encryptor()
        return stream.update(data)
    else:
        raise Exception("no chacha20 backend found")
403
404
def chacha20_decrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
    """Decrypt *data* with the plain ChaCha20 stream cipher (no authentication).

    *key* must be 32 bytes and *nonce* 8 or 12 bytes.
    Raises Exception if neither backend is available.
    """
    byteslike = (bytes, bytearray)
    assert isinstance(key, byteslike)
    assert isinstance(nonce, byteslike)
    assert isinstance(data, byteslike)
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
    if HAS_CRYPTODOME:
        return CD_ChaCha20.new(key=key, nonce=nonce).decrypt(data)
    elif HAS_CRYPTOGRAPHY:
        # cryptography wants 16 byte nonces: left-pad with zero bytes
        zero_prefix = bytes(16 - len(nonce))
        algo = CG_algorithms.ChaCha20(key=key, nonce=zero_prefix + nonce)
        stream = CG_Cipher(algo, mode=None, backend=CG_default_backend()).decryptor()
        return stream.update(data)
    else:
        raise Exception("no chacha20 backend found")
420 raise Exception("no chacha20 backend found")