tkeystore.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
tkeystore.py (40334B)
---
1 #!/usr/bin/env python2
2 # -*- mode: python -*-
3 #
4 # Electrum - lightweight Bitcoin client
5 # Copyright (C) 2016 The Electrum developers
6 #
7 # Permission is hereby granted, free of charge, to any person
8 # obtaining a copy of this software and associated documentation files
9 # (the "Software"), to deal in the Software without restriction,
10 # including without limitation the rights to use, copy, modify, merge,
11 # publish, distribute, sublicense, and/or sell copies of the Software,
12 # and to permit persons to whom the Software is furnished to do so,
13 # subject to the following conditions:
14 #
15 # The above copyright notice and this permission notice shall be
16 # included in all copies or substantial portions of the Software.
17 #
18 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
20 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
22 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
23 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
24 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25 # SOFTWARE.
26
27 from unicodedata import normalize
28 import hashlib
29 import re
30 from typing import Tuple, TYPE_CHECKING, Union, Sequence, Optional, Dict, List, NamedTuple
31 from functools import lru_cache
32 from abc import ABC, abstractmethod
33
34 from . import bitcoin, ecc, constants, bip32
35 from .bitcoin import deserialize_privkey, serialize_privkey, BaseDecodeError
36 from .transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput, TxInput
37 from .bip32 import (convert_bip32_path_to_list_of_uint32, BIP32_PRIME,
38 is_xpub, is_xprv, BIP32Node, normalize_bip32_derivation,
39 convert_bip32_intpath_to_strpath, is_xkey_consistent_with_key_origin_info)
40 from .ecc import string_to_number
41 from .crypto import (pw_decode, pw_encode, sha256, sha256d, PW_HASH_VERSION_LATEST,
42 SUPPORTED_PW_HASH_VERSIONS, UnsupportedPasswordHashVersion, hash_160)
43 from .util import (InvalidPassword, WalletFileException,
44 BitcoinException, bh2u, bfh, inv_dict, is_hex_str)
45 from .mnemonic import Mnemonic, Wordlist, seed_type, is_seed
46 from .plugin import run_hook
47 from .logging import Logger
48
49 if TYPE_CHECKING:
50 from .gui.qt.util import TaskThread
51 from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
52 from .wallet_db import WalletDB
53
54
class CannotDerivePubkey(Exception):
    """Raised when a public key cannot be derived for the requested path."""
56
57
class KeyStore(Logger, ABC):
    """Abstract base class shared by every kind of keystore.

    The "can this keystore sign ...?" queries are implemented here on top
    of get_pubkey_derivation(); subclasses provide that method together
    with the actual signing / decryption primitives.
    """

    type: str

    def __init__(self):
        Logger.__init__(self)
        # Flag asking for this keystore to be written back to the wallet file.
        self.is_requesting_to_be_rewritten_to_wallet_file = False  # type: bool

    def has_seed(self) -> bool:
        """Whether a seed is stored; False unless a subclass overrides it."""
        return False

    def is_watching_only(self) -> bool:
        """Whether the keystore is watching-only; False unless overridden."""
        return False

    def can_import(self) -> bool:
        """Whether keys can be imported; False unless overridden."""
        return False

    def get_type_text(self) -> str:
        """Return the keystore type formatted as text."""
        return f'{self.type}'

    @abstractmethod
    def may_have_password(self):
        """Returns whether the keystore can be encrypted with a password."""

    def _get_tx_derivations(self, tx: 'PartialTransaction') -> Dict[str, Union[Sequence[int], str]]:
        """Merge the per-input derivation maps of all inputs of *tx*."""
        return {
            pubkey_hex: derivation
            for txin in tx.inputs()
            for pubkey_hex, derivation in self._get_txin_derivations(txin).items()
        }

    def _get_txin_derivations(self, txin: 'PartialTxInput') -> Dict[str, Union[Sequence[int], str]]:
        """Map hex pubkey -> derivation for the pubkeys of *txin* that belong
        to this keystore and have not signed yet.
        """
        if txin.is_complete():
            return {}
        found = {}
        for pubkey in txin.pubkeys:
            if pubkey in txin.part_sigs:
                # this pubkey already signed
                continue
            derivation = self.get_pubkey_derivation(pubkey, txin)
            if derivation:
                found[pubkey.hex()] = derivation
        return found

    def can_sign(self, tx: 'Transaction', *, ignore_watching_only=False) -> bool:
        """Returns whether this keystore could sign *something* in this tx."""
        # is_watching_only() is only consulted when it is not being ignored
        watching_only = False if ignore_watching_only else self.is_watching_only()
        if watching_only or not isinstance(tx, PartialTransaction):
            return False
        return bool(self._get_tx_derivations(tx))

    def can_sign_txin(self, txin: 'TxInput', *, ignore_watching_only=False) -> bool:
        """Returns whether this keystore could sign this txin."""
        # is_watching_only() is only consulted when it is not being ignored
        watching_only = False if ignore_watching_only else self.is_watching_only()
        if watching_only or not isinstance(txin, PartialTxInput):
            return False
        return bool(self._get_txin_derivations(txin))

    def ready_to_sign(self) -> bool:
        """A keystore is ready to sign unless it is watching-only."""
        return not self.is_watching_only()

    @abstractmethod
    def dump(self) -> dict:
        """Return the keystore state as a dict."""

    @abstractmethod
    def is_deterministic(self) -> bool:
        """Return whether the keystore is deterministic."""

    @abstractmethod
    def sign_message(self, sequence: 'AddressIndexGeneric', message, password) -> bytes:
        """Sign *message* with the key identified by *sequence*."""

    @abstractmethod
    def decrypt_message(self, sequence: 'AddressIndexGeneric', message, password) -> bytes:
        """Decrypt *message* with the key identified by *sequence*."""

    @abstractmethod
    def sign_transaction(self, tx: 'PartialTransaction', password) -> None:
        """Add this keystore's signatures to *tx*."""

    @abstractmethod
    def get_pubkey_derivation(self, pubkey: bytes,
                              txinout: Union['PartialTxInput', 'PartialTxOutput'],
                              *, only_der_suffix=True) \
            -> Union[Sequence[int], str, None]:
        """Returns either a derivation int-list if the pubkey can be HD derived from this keystore,
        the pubkey itself (hex) if the pubkey belongs to the keystore but not HD derived,
        or None if the pubkey is unrelated.
        """

    def find_my_pubkey_in_txinout(
            self, txinout: Union['PartialTxInput', 'PartialTxOutput'],
            *, only_der_suffix: bool = False
    ) -> Tuple[Optional[bytes], Optional[List[int]]]:
        """Return (pubkey, derivation path) of the first HD-derived pubkey of
        this keystore listed in txinout.bip32_paths, or (None, None).
        """
        # note: we assume that this cosigner only has one pubkey in this txin/txout
        for pubkey in txinout.bip32_paths:
            derivation = self.get_pubkey_derivation(pubkey, txinout, only_der_suffix=only_der_suffix)
            # skip unrelated pubkeys and non-HD matches (reported as str)
            if not derivation or isinstance(derivation, (str, bytes)):
                continue
            return pubkey, list(derivation)
        return None, None
162
163
class Software_KeyStore(KeyStore):
    """Base class of keystores that obtain private keys via get_private_key()."""

    def __init__(self, d):
        KeyStore.__init__(self)
        version = d.get('pw_hash_version', 1)
        self.pw_hash_version = version
        if version not in SUPPORTED_PW_HASH_VERSIONS:
            raise UnsupportedPasswordHashVersion(version)

    def may_have_password(self):
        """A password is possible unless the keystore is watching-only."""
        return not self.is_watching_only()

    def sign_message(self, sequence, message, password) -> bytes:
        """Sign *message* with the private key identified by *sequence*."""
        privkey, compressed = self.get_private_key(sequence, password)
        return ecc.ECPrivkey(privkey).sign_message(message, compressed)

    def decrypt_message(self, sequence, message, password) -> bytes:
        """Decrypt *message* with the private key identified by *sequence*."""
        privkey, _compressed = self.get_private_key(sequence, password)
        return ecc.ECPrivkey(privkey).decrypt_message(message)

    def sign_transaction(self, tx, password):
        """Sign the inputs of *tx* this keystore has keys for.

        Does nothing for watching-only keystores; check_password() is
        expected to raise when *password* is wrong.
        """
        if self.is_watching_only():
            return
        # Raise if password is not correct.
        self.check_password(password)
        # Replace each derivation by the matching private key.
        privkeys = {
            pubkey_hex: self.get_private_key(derivation, password)
            for pubkey_hex, derivation in self._get_tx_derivations(tx).items()
        }
        # Sign
        if privkeys:
            tx.sign(privkeys)

    @abstractmethod
    def update_password(self, old_password, new_password):
        """Re-encrypt the stored secrets under *new_password*."""

    @abstractmethod
    def check_password(self, password):
        """Verify *password* against the stored secrets."""

    @abstractmethod
    def get_private_key(self, sequence: 'AddressIndexGeneric', password) -> Tuple[bytes, bool]:
        """Returns (privkey, is_compressed)"""
211
212
class Imported_KeyStore(Software_KeyStore):
    """Keystore for imported private keys.

    self.keypairs maps a hex pubkey to its serialized private key, which is
    passed through pw_encode()/pw_decode() with the keystore password.
    """

    type = 'imported'

    def __init__(self, d):
        Software_KeyStore.__init__(self, d)
        self.keypairs = d.get('keypairs', {})  # type: Dict[str, str]

    def is_deterministic(self):
        return False

    def dump(self):
        """Return the persistent state of the keystore as a dict."""
        return {
            'type': self.type,
            'keypairs': self.keypairs,
            'pw_hash_version': self.pw_hash_version,
        }

    def can_import(self):
        return True

    def check_password(self, password):
        """Validate *password* by decoding the first stored key.

        Raises InvalidPassword (via get_private_key) on mismatch.
        NOTE(review): raises IndexError when no key has been imported yet.
        """
        pubkey = list(self.keypairs.keys())[0]
        self.get_private_key(pubkey, password)

    def import_privkey(self, sec, password):
        """Store the serialized private key *sec* and return (txin_type, pubkey_hex)."""
        txin_type, privkey, compressed = deserialize_privkey(sec)
        pubkey = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed)
        # re-serialize the key so the internal storage format is consistent
        serialized_privkey = serialize_privkey(
            privkey, compressed, txin_type, internal_use=True)
        # NOTE: if the same pubkey is reused for multiple addresses (script types),
        #       there will only be one pubkey-privkey pair for it in self.keypairs,
        #       and the privkey will encode a txin_type but that txin_type cannot be trusted.
        #       Removing keys complicates this further.
        self.keypairs[pubkey] = pw_encode(serialized_privkey, password, version=self.pw_hash_version)
        return txin_type, pubkey

    def delete_imported_key(self, key):
        """Remove the entry for hex pubkey *key*; KeyError if it is unknown."""
        self.keypairs.pop(key)

    def get_private_key(self, pubkey: str, password):
        """Return (privkey, is_compressed) for hex *pubkey*.

        Raises InvalidPassword when the decoded secret cannot be
        deserialized or does not correspond to *pubkey*.
        """
        sec = pw_decode(self.keypairs[pubkey], password, version=self.pw_hash_version)
        try:
            txin_type, privkey, compressed = deserialize_privkey(sec)
        except BaseDecodeError as e:
            raise InvalidPassword() from e
        if pubkey != ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed):
            raise InvalidPassword()
        return privkey, compressed

    def get_pubkey_derivation(self, pubkey, txin, *, only_der_suffix=True):
        """Return the hex pubkey if it is stored in this keystore, else None."""
        pubkey_hex = pubkey.hex()
        if pubkey_hex in self.keypairs:
            return pubkey_hex
        return None

    def update_password(self, old_password, new_password):
        """Re-encrypt every stored key under *new_password*.

        An empty new password is treated as "no password". All entries are
        re-encrypted before any of them is written back, so a failure while
        decoding one entry cannot leave the keystore with a mix of old and
        new ciphertexts.
        """
        self.check_password(old_password)
        if new_password == '':
            new_password = None
        reencrypted = {
            pubkey: pw_encode(
                pw_decode(encrypted, old_password, version=self.pw_hash_version),
                new_password,
                version=PW_HASH_VERSION_LATEST)
            for pubkey, encrypted in self.keypairs.items()
        }
        # Write back through item assignment so the existing mapping object
        # is updated in place rather than replaced.
        for pubkey, encrypted in reencrypted.items():
            self.keypairs[pubkey] = encrypted
        self.pw_hash_version = PW_HASH_VERSION_LATEST
279
280
class Deterministic_KeyStore(Software_KeyStore):
    """Software keystore that may hold a seed and a passphrase."""

    def __init__(self, d):
        Software_KeyStore.__init__(self, d)
        self.seed = d.get('seed', '')
        self.passphrase = d.get('passphrase', '')

    def is_deterministic(self):
        return True

    def dump(self):
        """Return the persistent state; seed/passphrase only when set."""
        state = {
            'type': self.type,
            'pw_hash_version': self.pw_hash_version,
        }
        if self.seed:
            state['seed'] = self.seed
        if self.passphrase:
            state['passphrase'] = self.passphrase
        return state

    def has_seed(self):
        return bool(self.seed)

    def is_watching_only(self):
        """Without a seed the keystore is considered watching-only."""
        return not self.has_seed()

    @abstractmethod
    def format_seed(self, seed: str) -> str:
        """Return *seed* in the form in which it is stored."""

    def add_seed(self, seed):
        """Store *seed* (after format_seed); refuses to replace an existing one."""
        if self.seed:
            raise Exception("a seed exists")
        self.seed = self.format_seed(seed)

    def get_seed(self, password):
        """Return the stored seed decoded with *password*."""
        if not self.has_seed():
            raise Exception("This wallet has no seed words")
        return pw_decode(self.seed, password, version=self.pw_hash_version)

    def get_passphrase(self, password):
        """Return the stored passphrase decoded with *password*, or ''."""
        if not self.passphrase:
            return ''
        return pw_decode(self.passphrase, password, version=self.pw_hash_version)
327
328
class MasterPublicKeyMixin(ABC):
    """Interface for keystores that can derive pubkeys from a master public key.

    Also provides get_pubkey_derivation(), which matches the bip32 path
    information attached to a tx input/output against this keystore.
    """

    @abstractmethod
    def get_master_public_key(self) -> str:
        """Return the master public key."""
        pass

    @abstractmethod
    def get_derivation_prefix(self) -> Optional[str]:
        """Returns the bip32 path from some root node to self.xpub
        Note that the return value might be None; if it is unknown.
        """
        pass

    @abstractmethod
    def get_root_fingerprint(self) -> Optional[str]:
        """Returns the bip32 fingerprint of the top level node.
        This top level node is the node at the beginning of the derivation prefix,
        i.e. applying the derivation prefix to it will result in self.xpub
        Note that the return value might be None; if it is unknown.
        """
        pass

    @abstractmethod
    def get_fp_and_derivation_to_be_used_in_partial_tx(
            self,
            der_suffix: Sequence[int],
            *,
            only_der_suffix: bool,
    ) -> Tuple[bytes, Sequence[int]]:
        """Returns fingerprint and derivation path corresponding to a derivation suffix.
        The fingerprint is either the root fp or the intermediate fp, depending on what is available
        and 'only_der_suffix', and the derivation path is adjusted accordingly.
        """
        pass

    @abstractmethod
    def derive_pubkey(self, for_change: int, n: int) -> bytes:
        """Returns pubkey at given path.
        May raise CannotDerivePubkey.
        """
        pass

    def get_pubkey_derivation(
            self,
            pubkey: bytes,
            txinout: Union['PartialTxInput', 'PartialTxOutput'],
            *,
            only_der_suffix=True,
    ) -> Union[Sequence[int], str, None]:
        """Return the derivation of *pubkey* within this keystore, or None.

        The (fingerprint, path) pair stored for *pubkey* in
        txinout.bip32_paths is tried against this keystore in three ways
        (root fingerprint, intermediate fingerprint, fingerprint ignored);
        a candidate is only accepted if derive_pubkey() reproduces *pubkey*.

        Returns the 2-element derivation suffix when only_der_suffix is
        true, otherwise the derivation prefix followed by the suffix. The
        latter is None when the derivation prefix is unknown.
        """
        EXPECTED_DER_SUFFIX_LEN = 2
        def test_der_suffix_against_pubkey(der_suffix: Sequence[int], pubkey: bytes) -> bool:
            # A candidate suffix is valid iff it has the expected length and
            # deriving along it yields exactly the given pubkey.
            if len(der_suffix) != EXPECTED_DER_SUFFIX_LEN:
                return False
            try:
                if pubkey != self.derive_pubkey(*der_suffix):
                    return False
            except CannotDerivePubkey:
                return False
            return True

        if pubkey not in txinout.bip32_paths:
            return None
        fp_found, path_found = txinout.bip32_paths[pubkey]
        der_suffix = None
        full_path = None
        # 1. try fp against our root
        ks_root_fingerprint_hex = self.get_root_fingerprint()
        ks_der_prefix_str = self.get_derivation_prefix()
        ks_der_prefix = convert_bip32_path_to_list_of_uint32(ks_der_prefix_str) if ks_der_prefix_str else None
        if (ks_root_fingerprint_hex is not None and ks_der_prefix is not None and
                fp_found.hex() == ks_root_fingerprint_hex):
            # the path in the tx must start with our own derivation prefix
            if path_found[:len(ks_der_prefix)] == ks_der_prefix:
                der_suffix = path_found[len(ks_der_prefix):]
                if not test_der_suffix_against_pubkey(der_suffix, pubkey):
                    der_suffix = None
        # 2. try fp against our intermediate fingerprint
        if (der_suffix is None and isinstance(self, Xpub) and
                fp_found == self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()):
            # here the whole path found in the tx is treated as the suffix
            der_suffix = path_found
            if not test_der_suffix_against_pubkey(der_suffix, pubkey):
                der_suffix = None
        # 3. hack/bruteforce: ignore fp and check pubkey anyway
        # This is only to resolve the following scenario/problem:
        # problem: if we don't know our root fp, but tx contains root fp and full path,
        #          we will miss the pubkey (false negative match). Though it might still work
        #          within gap limit due to tx.add_info_from_wallet overwriting the fields.
        # Example: keystore has intermediate xprv without root fp; tx contains root fp and full path.
        if der_suffix is None:
            der_suffix = path_found[-EXPECTED_DER_SUFFIX_LEN:]
            if not test_der_suffix_against_pubkey(der_suffix, pubkey):
                der_suffix = None
        # if all attempts/methods failed, we give up now:
        if der_suffix is None:
            return None
        # full_path stays None when our own derivation prefix is unknown
        if ks_der_prefix is not None:
            full_path = ks_der_prefix + list(der_suffix)
        return der_suffix if only_der_suffix else full_path
426
427
class Xpub(MasterPublicKeyMixin):
    """MasterPublicKeyMixin implementation backed by a BIP32 extended public key.

    Besides the xpub itself, it tracks optional "key origin" information:
    the derivation prefix leading to the xpub and the fingerprint of the
    root node that prefix starts from.
    """

    def __init__(self, *, derivation_prefix: Optional[str] = None,
                 root_fingerprint: Optional[str] = None):
        self.xpub = None
        # xpubs of the receive (0) / change (1) branches, filled by derive_pubkey()
        self.xpub_receive = None
        self.xpub_change = None
        self._xpub_bip32_node = None  # type: Optional[BIP32Node]

        # "key origin" info (subclass should persist these):
        self._derivation_prefix = derivation_prefix  # type: Optional[str]
        self._root_fingerprint = root_fingerprint  # type: Optional[str]

    def get_master_public_key(self):
        return self.xpub

    def get_bip32_node_for_xpub(self) -> Optional[BIP32Node]:
        """Return the parsed node for self.xpub (parsed once, then reused),
        or None while no xpub is set.
        """
        if self._xpub_bip32_node is None:
            if self.xpub is None:
                return None
            self._xpub_bip32_node = BIP32Node.from_xkey(self.xpub)
        return self._xpub_bip32_node

    def get_derivation_prefix(self) -> Optional[str]:
        return self._derivation_prefix

    def get_root_fingerprint(self) -> Optional[str]:
        return self._root_fingerprint

    def get_fp_and_derivation_to_be_used_in_partial_tx(
            self,
            der_suffix: Sequence[int],
            *,
            only_der_suffix: bool,
    ) -> Tuple[bytes, Sequence[int]]:
        """Return (fingerprint, path) describing *der_suffix* in a partial tx.

        The root fingerprint and full path are used when both pieces of key
        origin info are known and only_der_suffix is false; otherwise the
        fingerprint of the xpub node itself is paired with the bare suffix.
        """
        fingerprint_hex = self.get_root_fingerprint()
        der_prefix_str = self.get_derivation_prefix()
        if not only_der_suffix and fingerprint_hex is not None and der_prefix_str is not None:
            # use root fp, and true full path
            fingerprint_bytes = bfh(fingerprint_hex)
            der_prefix_ints = convert_bip32_path_to_list_of_uint32(der_prefix_str)
        else:
            # use intermediate fp, and claim der suffix is the full path
            fingerprint_bytes = self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()
            der_prefix_ints = convert_bip32_path_to_list_of_uint32('m')
        der_full = der_prefix_ints + list(der_suffix)
        return fingerprint_bytes, der_full

    def get_xpub_to_be_used_in_partial_tx(self, *, only_der_suffix: bool) -> str:
        """Return the xpub re-serialized with depth, parent fingerprint and
        child number adjusted to the path reported by
        get_fp_and_derivation_to_be_used_in_partial_tx().
        """
        assert self.xpub
        fp_bytes, der_full = self.get_fp_and_derivation_to_be_used_in_partial_tx(
            der_suffix=[], only_der_suffix=only_der_suffix)
        bip32node = self.get_bip32_node_for_xpub()
        depth = len(der_full)
        child_number_int = der_full[-1] if der_full else 0
        child_number_bytes = child_number_int.to_bytes(length=4, byteorder="big")
        # at depth 0 the parent fingerprint field is all zero bytes
        fingerprint = bytes(4) if depth == 0 else bip32node.fingerprint
        bip32node = bip32node._replace(depth=depth,
                                       fingerprint=fingerprint,
                                       child_number=child_number_bytes)
        return bip32node.to_xpub()

    def add_key_origin_from_root_node(self, *, derivation_prefix: str, root_node: BIP32Node):
        """Record key origin info after verifying that *root_node* derived
        along *derivation_prefix* yields the same pubkey as self.xpub.
        """
        assert self.xpub
        # try to derive ourselves from what we were given
        child_node1 = root_node.subkey_at_private_derivation(derivation_prefix)
        child_pubkey_bytes1 = child_node1.eckey.get_public_key_bytes(compressed=True)
        child_node2 = self.get_bip32_node_for_xpub()
        child_pubkey_bytes2 = child_node2.eckey.get_public_key_bytes(compressed=True)
        if child_pubkey_bytes1 != child_pubkey_bytes2:
            raise Exception("(xpub, derivation_prefix, root_node) inconsistency")
        self.add_key_origin(derivation_prefix=derivation_prefix,
                            root_fingerprint=root_node.calc_fingerprint_of_this_node().hex().lower())

    def add_key_origin(self, *, derivation_prefix: Optional[str] = None,
                       root_fingerprint: Optional[str] = None) -> None:
        """Set derivation prefix and/or root fingerprint after validation.

        Values passed as None leave the corresponding stored value as is.
        Raises Exception for a malformed fingerprint or for info that is
        inconsistent with self.xpub.
        """
        assert self.xpub
        if not (root_fingerprint is None or (is_hex_str(root_fingerprint) and len(root_fingerprint) == 8)):
            raise Exception("root fp must be 8 hex characters")
        derivation_prefix = normalize_bip32_derivation(derivation_prefix)
        if not is_xkey_consistent_with_key_origin_info(self.xpub,
                                                       derivation_prefix=derivation_prefix,
                                                       root_fingerprint=root_fingerprint):
            raise Exception("xpub inconsistent with provided key origin info")
        if root_fingerprint is not None:
            self._root_fingerprint = root_fingerprint
        if derivation_prefix is not None:
            self._derivation_prefix = derivation_prefix
        self.is_requesting_to_be_rewritten_to_wallet_file = True

    # NOTE(review): lru_cache on a method includes `self` in the cache key, so
    # instances must be hashable and are kept referenced by the cache.
    @lru_cache(maxsize=None)
    def derive_pubkey(self, for_change: int, n: int) -> bytes:
        """Return the compressed pubkey at path (for_change, n) below the xpub.

        Raises CannotDerivePubkey when for_change is neither 0 nor 1.
        """
        for_change = int(for_change)
        if for_change not in (0, 1):
            raise CannotDerivePubkey("forbidden path")
        xpub = self.xpub_change if for_change else self.xpub_receive
        if xpub is None:
            # derive the branch xpub once and remember it on the instance
            rootnode = self.get_bip32_node_for_xpub()
            xpub = rootnode.subkey_at_public_derivation((for_change,)).to_xpub()
            if for_change:
                self.xpub_change = xpub
            else:
                self.xpub_receive = xpub
        return self.get_pubkey_from_xpub(xpub, (n,))

    @classmethod
    def get_pubkey_from_xpub(cls, xpub: str, sequence) -> bytes:
        """Return the compressed pubkey reached from *xpub* via public
        derivation along *sequence*.
        """
        node = BIP32Node.from_xkey(xpub).subkey_at_public_derivation(sequence)
        return node.eckey.get_public_key_bytes(compressed=True)
535
536
class BIP32_KeyStore(Xpub, Deterministic_KeyStore):
    """Deterministic keystore built on a BIP32 xpub / (encoded) xprv pair."""

    type = 'bip32'

    def __init__(self, d):
        Xpub.__init__(self,
                      derivation_prefix=d.get('derivation'),
                      root_fingerprint=d.get('root_fingerprint'))
        Deterministic_KeyStore.__init__(self, d)
        self.xpub = d.get('xpub')
        self.xprv = d.get('xprv')

    def format_seed(self, seed):
        """Collapse every run of whitespace in *seed* into a single space."""
        words = seed.split()
        return ' '.join(words)

    def dump(self):
        """Return the persistent state, including keys and key origin info."""
        state = Deterministic_KeyStore.dump(self)
        state['xpub'] = self.xpub
        state['xprv'] = self.xprv
        state['derivation'] = self.get_derivation_prefix()
        state['root_fingerprint'] = self.get_root_fingerprint()
        return state

    def get_master_private_key(self, password):
        """Return the stored xprv decoded with *password*."""
        return pw_decode(self.xprv, password, version=self.pw_hash_version)

    def check_password(self, password):
        """Raise InvalidPassword unless *password* decodes the stored xprv
        to a key whose chaincode matches that of the xpub.
        """
        decoded_xprv = pw_decode(self.xprv, password, version=self.pw_hash_version)
        try:
            node = BIP32Node.from_xkey(decoded_xprv)
        except BaseDecodeError as e:
            raise InvalidPassword() from e
        if node.chaincode != self.get_bip32_node_for_xpub().chaincode:
            raise InvalidPassword()

    def update_password(self, old_password, new_password):
        """Re-encrypt seed, passphrase and xprv under *new_password*.

        An empty new password is treated as "no password".
        """
        self.check_password(old_password)
        if new_password == '':
            new_password = None
        if self.has_seed():
            plain_seed = self.get_seed(old_password)
            self.seed = pw_encode(plain_seed, new_password, version=PW_HASH_VERSION_LATEST)
        if self.passphrase:
            plain_passphrase = self.get_passphrase(old_password)
            self.passphrase = pw_encode(plain_passphrase, new_password, version=PW_HASH_VERSION_LATEST)
        if self.xprv is not None:
            plain_xprv = pw_decode(self.xprv, old_password, version=self.pw_hash_version)
            self.xprv = pw_encode(plain_xprv, new_password, version=PW_HASH_VERSION_LATEST)
        self.pw_hash_version = PW_HASH_VERSION_LATEST

    def is_watching_only(self):
        """Watching-only means no xprv is stored."""
        return self.xprv is None

    def add_xpub(self, xpub):
        """Set the xpub and the key origin info read from it."""
        assert is_xpub(xpub)
        self.xpub = xpub
        root_fingerprint, derivation_prefix = bip32.root_fp_and_der_prefix_from_xkey(xpub)
        self.add_key_origin(derivation_prefix=derivation_prefix,
                            root_fingerprint=root_fingerprint)

    def add_xprv(self, xprv):
        """Set the xprv together with the xpub computed from it."""
        assert is_xprv(xprv)
        self.xprv = xprv
        self.add_xpub(bip32.xpub_from_xprv(xprv))

    def add_xprv_from_seed(self, bip32_seed, xtype, derivation):
        """Derive the account key at *derivation* from *bip32_seed* and store it."""
        rootnode = BIP32Node.from_rootseed(bip32_seed, xtype=xtype)
        account_node = rootnode.subkey_at_private_derivation(derivation)
        self.add_xprv(account_node.to_xprv())
        self.add_key_origin_from_root_node(derivation_prefix=derivation, root_node=rootnode)

    def get_private_key(self, sequence: Sequence[int], password):
        """Return (privkey, True) for the key at *sequence* below the xprv."""
        master = BIP32Node.from_xkey(self.get_master_private_key(password))
        child = master.subkey_at_private_derivation(sequence)
        return child.eckey.get_secret_bytes(), True

    def get_keypair(self, sequence, password):
        """Return (pubkey bytes, privkey bytes) for the key at *sequence*."""
        privkey, _ = self.get_private_key(sequence, password)
        pubkey = ecc.ECPrivkey(privkey).get_public_key_bytes()
        return pubkey, privkey
615
616
class Old_KeyStore(MasterPublicKeyMixin, Deterministic_KeyStore):
    """Deterministic keystore of type 'old'.

    Keys are derived from a stretched seed and a master public key (mpk)
    instead of BIP32. Private keys are reported as uncompressed.
    """

    type = 'old'

    def __init__(self, d):
        Deterministic_KeyStore.__init__(self, d)
        self.mpk = d.get('mpk')
        # computed lazily by get_root_fingerprint()
        self._root_fingerprint = None

    def get_hex_seed(self, password):
        """Return the stored seed, decoded with *password*, as utf8 bytes."""
        return pw_decode(self.seed, password, version=self.pw_hash_version).encode('utf8')

    def dump(self):
        d = Deterministic_KeyStore.dump(self)
        d['mpk'] = self.mpk
        return d

    def add_seed(self, seedphrase):
        """Store the seed and compute the master public key from it."""
        Deterministic_KeyStore.add_seed(self, seedphrase)
        s = self.get_hex_seed(None)
        self.mpk = self.mpk_from_seed(s)

    def add_master_public_key(self, mpk):
        self.mpk = mpk

    def format_seed(self, seed):
        """Return the seed in stored form.

        Input that parses as hex is returned as is; anything else is treated
        as a word list and decoded with old_mnemonic.mn_decode().
        Raises Exception("Invalid seed") when decoding yields nothing.
        """
        from . import old_mnemonic, mnemonic
        seed = mnemonic.normalize_text(seed)
        # see if seed was entered as hex
        if seed:
            try:
                bfh(seed)
            except Exception:
                # not hex: fall through and treat it as mnemonic words
                pass
            else:
                return str(seed)
        words = seed.split()
        seed = old_mnemonic.mn_decode(words)
        if not seed:
            raise Exception("Invalid seed")
        return seed

    def get_seed(self, password):
        """Return the seed as space-separated old-mnemonic words."""
        from . import old_mnemonic
        s = self.get_hex_seed(password)
        return ' '.join(old_mnemonic.mn_encode(s))

    @classmethod
    def mpk_from_seed(cls, seed):
        """Return the master public key (hex, without the leading format byte)."""
        secexp = cls.stretch_key(seed)
        privkey = ecc.ECPrivkey.from_secret_scalar(secexp)
        return privkey.get_public_key_hex(compressed=False)[2:]

    @classmethod
    def stretch_key(cls, seed):
        """Stretch *seed* (bytes) with 100000 chained SHA256 rounds into an integer."""
        x = seed
        for _ in range(100000):
            x = hashlib.sha256(x + seed).digest()
        return string_to_number(x)

    @classmethod
    def get_sequence(cls, mpk, for_change, n):
        """Return the integer offset of key (for_change, n) relative to *mpk*."""
        return string_to_number(sha256d(("%d:%d:"%(n, for_change)).encode('ascii') + bfh(mpk)))

    @classmethod
    def get_pubkey_from_mpk(cls, mpk, for_change, n) -> bytes:
        """Return the uncompressed pubkey of key (for_change, n) under *mpk*."""
        z = cls.get_sequence(mpk, for_change, n)
        master_public_key = ecc.ECPubkey(bfh('04'+mpk))
        public_key = master_public_key + z*ecc.GENERATOR
        return public_key.get_public_key_bytes(compressed=False)

    # NOTE(review): lru_cache on a method includes `self` in the cache key, so
    # instances must be hashable and are kept referenced by the cache.
    @lru_cache(maxsize=None)
    def derive_pubkey(self, for_change, n) -> bytes:
        """Return the pubkey at (for_change, n).

        Raises CannotDerivePubkey when for_change is neither 0 nor 1.
        """
        for_change = int(for_change)
        if for_change not in (0, 1):
            raise CannotDerivePubkey("forbidden path")
        return self.get_pubkey_from_mpk(self.mpk, for_change, n)

    def _get_private_key_from_stretched_exponent(self, for_change, n, secexp):
        """Return the 32-byte private key of (for_change, n) given the
        stretched master secret exponent.
        """
        secexp = (secexp + self.get_sequence(self.mpk, for_change, n)) % ecc.CURVE_ORDER
        pk = int.to_bytes(secexp, length=32, byteorder='big', signed=False)
        return pk

    def get_private_key(self, sequence: Sequence[int], password):
        """Return (privkey, False) for sequence == (for_change, n).

        Raises InvalidPassword when the decoded seed does not match the mpk.
        """
        seed = self.get_hex_seed(password)
        secexp = self.stretch_key(seed)
        self._check_seed(seed, secexp=secexp)
        for_change, n = sequence
        pk = self._get_private_key_from_stretched_exponent(for_change, n, secexp)
        return pk, False

    def _check_seed(self, seed, *, secexp=None):
        """Raise InvalidPassword unless *seed* reproduces self.mpk.

        *secexp* may carry an already stretched seed to avoid redoing it.
        """
        if secexp is None:
            secexp = self.stretch_key(seed)
        master_private_key = ecc.ECPrivkey.from_secret_scalar(secexp)
        master_public_key = master_private_key.get_public_key_bytes(compressed=False)[1:]
        if master_public_key != bfh(self.mpk):
            raise InvalidPassword()

    def check_password(self, password):
        seed = self.get_hex_seed(password)
        self._check_seed(seed)

    def get_master_public_key(self):
        return self.mpk

    def get_derivation_prefix(self) -> str:
        return 'm'

    def get_root_fingerprint(self) -> str:
        """Return the first 4 bytes (hex) of hash_160 of the compressed
        master pubkey; computed on first use and then reused.
        """
        if self._root_fingerprint is None:
            master_public_key = ecc.ECPubkey(bfh('04'+self.mpk))
            xfp = hash_160(master_public_key.get_public_key_bytes(compressed=True))[0:4]
            self._root_fingerprint = xfp.hex().lower()
        return self._root_fingerprint

    def get_fp_and_derivation_to_be_used_in_partial_tx(
            self,
            der_suffix: Sequence[int],
            *,
            only_der_suffix: bool,
    ) -> Tuple[bytes, Sequence[int]]:
        """Return (root fingerprint bytes, derivation prefix + der_suffix).

        only_der_suffix does not influence the result for this keystore.
        """
        fingerprint_hex = self.get_root_fingerprint()
        der_prefix_str = self.get_derivation_prefix()
        fingerprint_bytes = bfh(fingerprint_hex)
        der_prefix_ints = convert_bip32_path_to_list_of_uint32(der_prefix_str)
        der_full = der_prefix_ints + list(der_suffix)
        return fingerprint_bytes, der_full

    def update_password(self, old_password, new_password):
        """Re-encrypt the seed under *new_password* ('' means no password)."""
        self.check_password(old_password)
        if new_password == '':
            new_password = None
        if self.has_seed():
            decoded = pw_decode(self.seed, old_password, version=self.pw_hash_version)
            self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
        self.pw_hash_version = PW_HASH_VERSION_LATEST
753
754
class Hardware_KeyStore(Xpub, KeyStore):
    """Keystore whose operations go through a hardware device client."""

    hw_type: str
    device: str
    plugin: 'HW_PluginBase'
    thread: Optional['TaskThread'] = None

    type = 'hardware'

    def __init__(self, d):
        Xpub.__init__(self,
                      derivation_prefix=d.get('derivation'),
                      root_fingerprint=d.get('root_fingerprint'))
        KeyStore.__init__(self)
        # Errors and other user interaction is done through the wallet's
        # handler. The handler is per-window and preserved across
        # device reconnects
        self.xpub = d.get('xpub')
        self.label = d.get('label')
        self.soft_device_id = d.get('soft_device_id')  # type: Optional[str]
        self.handler = None  # type: Optional[HardwareHandlerBase]
        run_hook('init_keystore', self)

    def set_label(self, label):
        self.label = label

    def may_have_password(self):
        return False

    def is_deterministic(self):
        return True

    def get_type_text(self) -> str:
        return f'hw[{self.hw_type}]'

    def dump(self):
        """Return the persistent state of the keystore as a dict."""
        state = {
            'type': self.type,
            'hw_type': self.hw_type,
            'xpub': self.xpub,
            'derivation': self.get_derivation_prefix(),
            'root_fingerprint': self.get_root_fingerprint(),
            'label': self.label,
            'soft_device_id': self.soft_device_id,
        }
        return state

    def unpaired(self):
        """A device paired with the wallet was disconnected. This can be
        called in any thread context."""
        self.logger.info("unpaired")

    def paired(self):
        """A device paired with the wallet was (re-)connected. This can be
        called in any thread context."""
        self.logger.info("paired")

    def is_watching_only(self):
        """The wallet is not watching-only; the user will be prompted for
        pin and passphrase as appropriate when needed."""
        assert not self.has_seed()
        return False

    def get_password_for_storage_encryption(self) -> str:
        """Ask the device client for the storage-encryption password."""
        return self.plugin.get_client(self).get_password_for_storage_encryption()

    def has_usable_connection_with_device(self) -> bool:
        """False when no plugin is attached or no client is available;
        otherwise whatever the client reports."""
        if not hasattr(self, 'plugin'):
            return False
        client = self.plugin.get_client(self, force_pair=False)
        if client is None:
            return False
        return client.has_usable_connection_with_device()

    def ready_to_sign(self):
        """Ready when the base check passes and the device is reachable."""
        return super().ready_to_sign() and self.has_usable_connection_with_device()

    def opportunistically_fill_in_missing_info_from_device(self, client: 'HardwareClientBase'):
        """Sync root fingerprint, label and soft device id from *client*,
        requesting a wallet-file rewrite for every value that changed."""
        assert client is not None
        if self._root_fingerprint is None:
            self._root_fingerprint = client.request_root_fingerprint_from_device()
            self.is_requesting_to_be_rewritten_to_wallet_file = True
        if self.label != client.label():
            self.label = client.label()
            self.is_requesting_to_be_rewritten_to_wallet_file = True
        if self.soft_device_id != client.get_soft_device_id():
            self.soft_device_id = client.get_soft_device_id()
            self.is_requesting_to_be_rewritten_to_wallet_file = True
840
841
# Annotation for a keystore that also implements MasterPublicKeyMixin.
# What is meant is "both at once"; it is spelled as a Union.
KeyStoreWithMPK = Union[KeyStore, MasterPublicKeyMixin]  # intersection really...
# Identifies a key inside a keystore: a derivation int sequence, or a hex
# pubkey string.
AddressIndexGeneric = Union[Sequence[int], str]  # can be hex pubkey str
844
845
def bip39_normalize_passphrase(passphrase):
    """Return *passphrase* in NFKD form; a falsy value yields ''."""
    return normalize('NFKD', passphrase or '')


def bip39_to_seed(mnemonic, passphrase):
    """Return the 64-byte seed for a mnemonic and optional passphrase.

    Whitespace in *mnemonic* is collapsed to single spaces and both inputs
    are NFKD-normalized. The seed is PBKDF2-HMAC-SHA512 of the mnemonic,
    salted with b'mnemonic' + passphrase, using 2048 iterations.
    """
    PBKDF2_ROUNDS = 2048
    mnemonic = normalize('NFKD', ' '.join(mnemonic.split()))
    passphrase = bip39_normalize_passphrase(passphrase)
    return hashlib.pbkdf2_hmac('sha512', mnemonic.encode('utf-8'),
                               b'mnemonic' + passphrase.encode('utf-8'),
                               iterations=PBKDF2_ROUNDS)
856
857
def bip39_is_checksum_valid(mnemonic: str) -> Tuple[bool, bool]:
    """Test checksum of bip39 mnemonic assuming English wordlist.
    Returns tuple (is_checksum_valid, is_wordlist_valid)
    """
    words = [normalize('NFKD', word) for word in mnemonic.split()]
    wordlist = Wordlist.from_file("english.txt")
    base = len(wordlist)
    # Read the words as the digits of one big number in base len(wordlist).
    packed = 0
    for word in words:
        try:
            position = wordlist.index(word)
        except ValueError:
            return False, False
        packed = packed * base + position
    num_words = len(words)
    if num_words not in [12, 15, 18, 21, 24]:
        return False, True
    checksum_length = 11 * num_words // 33  # num bits
    entropy_length = 32 * checksum_length  # num bits
    # The low checksum_length bits are the checksum, the rest is entropy.
    entropy = packed >> checksum_length
    checksum = packed & ((1 << checksum_length) - 1)
    entropy_bytes = entropy.to_bytes(entropy_length // 8, byteorder="big")
    digest_int = int.from_bytes(sha256(entropy_bytes), byteorder="big")
    # Expected checksum: the top checksum_length bits of the 256-bit digest.
    expected_checksum = digest_int >> (256 - checksum_length)
    return checksum == expected_checksum, True
885
886
def from_bip39_seed(seed, passphrase, derivation, xtype=None):
    """Build a BIP32_KeyStore from a BIP39 mnemonic.

    The mnemonic *seed* and *passphrase* are turned into seed bytes with
    bip39_to_seed. When *xtype* is None, the script type is chosen from
    *derivation* via xtype_from_derivation.
    """
    keystore = BIP32_KeyStore({})
    root_seed = bip39_to_seed(seed, passphrase)
    script_type = xtype_from_derivation(derivation) if xtype is None else xtype
    keystore.add_xprv_from_seed(root_seed, script_type, derivation)
    return keystore
894
895
# Script types usable with purpose-48 derivation paths, keyed by xtype name.
# The value is the integer placed at the script_type' level of
#   m / purpose' / coin_type' / account' / script_type' / change / address_index
PURPOSE48_SCRIPT_TYPES = {
    'p2wsh-p2sh': 1,  # specifically multisig
    'p2wsh': 2,       # specifically multisig
}
# Reverse lookup built with inv_dict: used to map a script_type integer back
# to its xtype name.
PURPOSE48_SCRIPT_TYPES_INV = inv_dict(PURPOSE48_SCRIPT_TYPES)
901
902
def xtype_from_derivation(derivation: str) -> str:
    """Returns the script type to be used for this derivation.

    Falls back to 'standard' when the path gives no other indication.
    """
    path = convert_bip32_path_to_list_of_uint32(derivation)
    if not path:
        return 'standard'
    purpose = path[0]

    # Purposes whose script type is decided by the first path level alone.
    fixed_script_types = {
        84 + BIP32_PRIME: 'p2wpkh',
        49 + BIP32_PRIME: 'p2wpkh-p2sh',
        44 + BIP32_PRIME: 'standard',
        45 + BIP32_PRIME: 'standard',
    }
    if purpose in fixed_script_types:
        return fixed_script_types[purpose]

    if purpose == 48 + BIP32_PRIME and len(path) >= 4:
        # m / purpose' / coin_type' / account' / script_type' / change / address_index
        script_type = PURPOSE48_SCRIPT_TYPES_INV.get(path[3] - BIP32_PRIME)
        if script_type is not None:
            return script_type

    return 'standard'
924
925
# Registry of hardware keystore constructors, keyed by hw_type.
# Filled by register_keystore() and consulted by hardware_keystore().
hw_keystores = {}
927
def register_keystore(hw_type, constructor):
    """Register *constructor* as the keystore factory for *hw_type*.

    An existing entry for the same hw_type is replaced.
    """
    hw_keystores.update({hw_type: constructor})
930
def hardware_keystore(d) -> Hardware_KeyStore:
    """Build a hardware keystore from its stored dict *d*.

    The constructor is looked up in hw_keystores by d['hw_type'] and called
    with *d*. Raises WalletFileException when no constructor is registered
    for that hardware type.
    """
    hw_type = d['hw_type']
    if hw_type not in hw_keystores:
        raise WalletFileException(f'unknown hardware type: {hw_type}. '
                                  f'hw_keystores: {list(hw_keystores)}')
    build = hw_keystores[hw_type]
    return build(d)
938
def load_keystore(db: 'WalletDB', name: str) -> KeyStore:
    """Load the keystore stored under *name* in *db*.

    Raises WalletFileException when there is no entry with a 'type' under
    *name*, or when that type is not a known keystore type.
    """
    d = db.get(name, {})
    t = d.get('type')
    if not t:
        raise WalletFileException(
            'Wallet format requires update.\n'
            'Cannot find keystore for name {}'.format(name))

    # Software keystores are keyed by their class-level `type` attribute;
    # hardware keystores go through the hardware_keystore dispatcher.
    constructors = {}
    for ks_class in (Old_KeyStore, Imported_KeyStore, BIP32_KeyStore):
        constructors[ks_class.type] = ks_class
    constructors['hardware'] = hardware_keystore

    try:
        make_keystore = constructors[t]
    except KeyError:
        raise WalletFileException(f'Unknown type {t} for keystore named {name}')
    return make_keystore(d)
954
955
def is_old_mpk(mpk: str) -> bool:
    """Return True if *mpk* is accepted as an old-style master public key.

    *mpk* must parse as a hexadecimal number, be exactly 128 characters long,
    and '04' + mpk must be accepted by ecc.ECPubkey after conversion with
    bfh. Any failure yields False; this function does not raise for
    malformed input.
    """
    try:
        int(mpk, 16)  # test if hex string
    except (ValueError, TypeError):
        # ValueError: not hexadecimal; TypeError: not a string at all.
        return False
    if len(mpk) != 128:
        return False
    try:
        ecc.ECPubkey(bfh('04' + mpk))
    except Exception:
        # Any decoding/validation error means "not a valid key". Unlike a bare
        # "except:", this lets KeyboardInterrupt and SystemExit propagate.
        return False
    return True
968
969
def is_address_list(text):
    """Return True if *text* is a non-empty, whitespace-separated list in
    which every item passes bitcoin.is_address."""
    candidates = text.split()
    if not candidates:
        return False
    return all(bitcoin.is_address(candidate) for candidate in candidates)
973
974
def get_private_keys(text, *, allow_spaces_inside_key=True, raise_on_error=False):
    """Split *text* into private keys.

    With allow_spaces_inside_key, each line is one key and all whitespace
    inside a line is removed (blank lines are dropped); otherwise keys are
    separated by any whitespace.

    Returns the list of keys if there is at least one and every one passes
    bitcoin.is_private_key (called with *raise_on_error*), else None.
    """
    if allow_spaces_inside_key:  # see #1612
        squeezed = (''.join(line.split()) for line in text.split('\n'))
        keys = [key for key in squeezed if key]
    else:
        keys = text.split()

    if not keys:
        return None
    for key in keys:
        if not bitcoin.is_private_key(key, raise_on_error=raise_on_error):
            return None
    return keys
984
985
def is_private_key_list(text, *, allow_spaces_inside_key=True, raise_on_error=False):
    """Return True if get_private_keys finds a non-empty list of private keys
    in *text*; the keyword options are forwarded unchanged."""
    keys = get_private_keys(
        text,
        allow_spaces_inside_key=allow_spaces_inside_key,
        raise_on_error=raise_on_error,
    )
    return bool(keys)
990
991
def is_master_key(x):
    """Return a truthy value if *x* is an old-style master public key or a
    BIP32 extended key."""
    if is_old_mpk(x):
        return True
    return is_bip32_key(x)
994
995
def is_bip32_key(x):
    """Return a truthy value if *x* passes is_xprv or is_xpub."""
    verdict = is_xprv(x)
    if not verdict:
        # Not an xprv; the answer is whatever is_xpub says.
        verdict = is_xpub(x)
    return verdict
998
999
def bip44_derivation(account_id, bip43_purpose=44):
    """Return the normalized derivation path m/purpose'/coin_type'/account'.

    coin_type is taken from constants.net.BIP44_COIN_TYPE and *account_id*
    is converted with int().
    """
    coin_type = constants.net.BIP44_COIN_TYPE
    account = int(account_id)
    path = "m/%d'/%d'/%d'" % (bip43_purpose, coin_type, account)
    return normalize_bip32_derivation(path)
1004
1005
def purpose48_derivation(account_id: int, xtype: str) -> str:
    """Return the normalized purpose-48 derivation path for an account.

    The path has the form m / 48' / coin_type' / account' / script_type',
    where script_type is looked up from *xtype* in PURPOSE48_SCRIPT_TYPES.
    Raises Exception when *xtype* has no entry there.
    """
    # m / purpose' / coin_type' / account' / script_type' / change / address_index
    coin_type = constants.net.BIP44_COIN_TYPE
    account = int(account_id)
    script_type_int = PURPOSE48_SCRIPT_TYPES.get(xtype)
    if script_type_int is None:
        raise Exception('unknown xtype: {}'.format(xtype))
    levels = (48, coin_type, account, script_type_int)
    return normalize_bip32_derivation("m/%d'/%d'/%d'/%d'" % levels)
1016
1017
def from_seed(seed, passphrase, is_p2sh=False):
    """Create a keystore from an Electrum seed.

    The result depends on seed_type(seed):
      - 'old': an Old_KeyStore holding the seed (*passphrase* is not used).
      - 'standard': a BIP32_KeyStore with xtype 'standard' at "m/".
      - 'segwit': a BIP32_KeyStore with xtype 'p2wsh' at "m/1'/" when
        *is_p2sh* is set, otherwise 'p2wpkh' at "m/0'/".
    Any other seed type raises BitcoinException.
    """
    kind = seed_type(seed)

    if kind == 'old':
        old_keystore = Old_KeyStore({})
        old_keystore.add_seed(seed)
        return old_keystore

    if kind not in ('standard', 'segwit'):
        raise BitcoinException('Unexpected seed type {}'.format(repr(kind)))

    keystore = BIP32_KeyStore({})
    keystore.add_seed(seed)
    keystore.passphrase = passphrase
    bip32_seed = Mnemonic.mnemonic_to_seed(seed, passphrase)
    if kind == 'standard':
        der, xtype = "m/", 'standard'
    elif is_p2sh:
        der, xtype = "m/1'/", 'p2wsh'
    else:
        der, xtype = "m/0'/", 'p2wpkh'
    keystore.add_xprv_from_seed(bip32_seed, xtype, der)
    return keystore
1038
def from_private_key_list(text):
    """Create an Imported_KeyStore holding every private key found in *text*.

    Keys are extracted with get_private_keys and imported with a password
    of None.
    """
    imported = Imported_KeyStore({})
    for privkey in get_private_keys(text):
        imported.import_privkey(privkey, None)
    return imported
1044
def from_old_mpk(mpk):
    """Create an Old_KeyStore from the old-style master public key *mpk*."""
    old_keystore = Old_KeyStore({})
    old_keystore.add_master_public_key(mpk)
    return old_keystore
1049
def from_xpub(xpub):
    """Create a BIP32_KeyStore from the extended public key *xpub*."""
    keystore = BIP32_KeyStore({})
    keystore.add_xpub(xpub)
    return keystore
1054
def from_xprv(xprv):
    """Create a BIP32_KeyStore from the extended private key *xprv*."""
    keystore = BIP32_KeyStore({})
    keystore.add_xprv(xprv)
    return keystore
1059
def from_master_key(text):
    """Create a keystore from a master key given as text.

    The checks run in this order: xprv, old-style master public key, xpub.
    Raises BitcoinException when *text* matches none of them.
    """
    if is_xprv(text):
        return from_xprv(text)
    if is_old_mpk(text):
        return from_old_mpk(text)
    if is_xpub(text):
        return from_xpub(text)
    raise BitcoinException('Invalid master key')