URI: 
       tkeystore.py - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
       tkeystore.py (40334B)
       ---
            1 #!/usr/bin/env python2
            2 # -*- mode: python -*-
            3 #
            4 # Electrum - lightweight Bitcoin client
            5 # Copyright (C) 2016  The Electrum developers
            6 #
            7 # Permission is hereby granted, free of charge, to any person
            8 # obtaining a copy of this software and associated documentation files
            9 # (the "Software"), to deal in the Software without restriction,
           10 # including without limitation the rights to use, copy, modify, merge,
           11 # publish, distribute, sublicense, and/or sell copies of the Software,
           12 # and to permit persons to whom the Software is furnished to do so,
           13 # subject to the following conditions:
           14 #
           15 # The above copyright notice and this permission notice shall be
           16 # included in all copies or substantial portions of the Software.
           17 #
           18 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
           19 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
           20 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
           21 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
           22 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
           23 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
           24 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
           25 # SOFTWARE.
           26 
           27 from unicodedata import normalize
           28 import hashlib
           29 import re
           30 from typing import Tuple, TYPE_CHECKING, Union, Sequence, Optional, Dict, List, NamedTuple
           31 from functools import lru_cache
           32 from abc import ABC, abstractmethod
           33 
           34 from . import bitcoin, ecc, constants, bip32
           35 from .bitcoin import deserialize_privkey, serialize_privkey, BaseDecodeError
           36 from .transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput, TxInput
           37 from .bip32 import (convert_bip32_path_to_list_of_uint32, BIP32_PRIME,
           38                     is_xpub, is_xprv, BIP32Node, normalize_bip32_derivation,
           39                     convert_bip32_intpath_to_strpath, is_xkey_consistent_with_key_origin_info)
           40 from .ecc import string_to_number
           41 from .crypto import (pw_decode, pw_encode, sha256, sha256d, PW_HASH_VERSION_LATEST,
           42                      SUPPORTED_PW_HASH_VERSIONS, UnsupportedPasswordHashVersion, hash_160)
           43 from .util import (InvalidPassword, WalletFileException,
           44                    BitcoinException, bh2u, bfh, inv_dict, is_hex_str)
           45 from .mnemonic import Mnemonic, Wordlist, seed_type, is_seed
           46 from .plugin import run_hook
           47 from .logging import Logger
           48 
           49 if TYPE_CHECKING:
           50     from .gui.qt.util import TaskThread
           51     from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase, HardwareHandlerBase
           52     from .wallet_db import WalletDB
           53 
           54 
           55 class CannotDerivePubkey(Exception): pass
           56 
           57 
           58 class KeyStore(Logger, ABC):
           59     type: str
           60 
           61     def __init__(self):
           62         Logger.__init__(self)
           63         self.is_requesting_to_be_rewritten_to_wallet_file = False  # type: bool
           64 
           65     def has_seed(self) -> bool:
           66         return False
           67 
           68     def is_watching_only(self) -> bool:
           69         return False
           70 
           71     def can_import(self) -> bool:
           72         return False
           73 
           74     def get_type_text(self) -> str:
           75         return f'{self.type}'
           76 
           77     @abstractmethod
           78     def may_have_password(self):
           79         """Returns whether the keystore can be encrypted with a password."""
           80         pass
           81 
           82     def _get_tx_derivations(self, tx: 'PartialTransaction') -> Dict[str, Union[Sequence[int], str]]:
           83         keypairs = {}
           84         for txin in tx.inputs():
           85             keypairs.update(self._get_txin_derivations(txin))
           86         return keypairs
           87 
           88     def _get_txin_derivations(self, txin: 'PartialTxInput') -> Dict[str, Union[Sequence[int], str]]:
           89         if txin.is_complete():
           90             return {}
           91         keypairs = {}
           92         for pubkey in txin.pubkeys:
           93             if pubkey in txin.part_sigs:
           94                 # this pubkey already signed
           95                 continue
           96             derivation = self.get_pubkey_derivation(pubkey, txin)
           97             if not derivation:
           98                 continue
           99             keypairs[pubkey.hex()] = derivation
          100         return keypairs
          101 
          102     def can_sign(self, tx: 'Transaction', *, ignore_watching_only=False) -> bool:
          103         """Returns whether this keystore could sign *something* in this tx."""
          104         if not ignore_watching_only and self.is_watching_only():
          105             return False
          106         if not isinstance(tx, PartialTransaction):
          107             return False
          108         return bool(self._get_tx_derivations(tx))
          109 
          110     def can_sign_txin(self, txin: 'TxInput', *, ignore_watching_only=False) -> bool:
          111         """Returns whether this keystore could sign this txin."""
          112         if not ignore_watching_only and self.is_watching_only():
          113             return False
          114         if not isinstance(txin, PartialTxInput):
          115             return False
          116         return bool(self._get_txin_derivations(txin))
          117 
          118     def ready_to_sign(self) -> bool:
          119         return not self.is_watching_only()
          120 
          121     @abstractmethod
          122     def dump(self) -> dict:
          123         pass
          124 
          125     @abstractmethod
          126     def is_deterministic(self) -> bool:
          127         pass
          128 
          129     @abstractmethod
          130     def sign_message(self, sequence: 'AddressIndexGeneric', message, password) -> bytes:
          131         pass
          132 
          133     @abstractmethod
          134     def decrypt_message(self, sequence: 'AddressIndexGeneric', message, password) -> bytes:
          135         pass
          136 
          137     @abstractmethod
          138     def sign_transaction(self, tx: 'PartialTransaction', password) -> None:
          139         pass
          140 
          141     @abstractmethod
          142     def get_pubkey_derivation(self, pubkey: bytes,
          143                               txinout: Union['PartialTxInput', 'PartialTxOutput'],
          144                               *, only_der_suffix=True) \
          145             -> Union[Sequence[int], str, None]:
          146         """Returns either a derivation int-list if the pubkey can be HD derived from this keystore,
          147         the pubkey itself (hex) if the pubkey belongs to the keystore but not HD derived,
          148         or None if the pubkey is unrelated.
          149         """
          150         pass
          151 
          152     def find_my_pubkey_in_txinout(
          153             self, txinout: Union['PartialTxInput', 'PartialTxOutput'],
          154             *, only_der_suffix: bool = False
          155     ) -> Tuple[Optional[bytes], Optional[List[int]]]:
          156         # note: we assume that this cosigner only has one pubkey in this txin/txout
          157         for pubkey in txinout.bip32_paths:
          158             path = self.get_pubkey_derivation(pubkey, txinout, only_der_suffix=only_der_suffix)
          159             if path and not isinstance(path, (str, bytes)):
          160                 return pubkey, list(path)
          161         return None, None
          162 
          163 
          164 class Software_KeyStore(KeyStore):
          165 
          166     def __init__(self, d):
          167         KeyStore.__init__(self)
          168         self.pw_hash_version = d.get('pw_hash_version', 1)
          169         if self.pw_hash_version not in SUPPORTED_PW_HASH_VERSIONS:
          170             raise UnsupportedPasswordHashVersion(self.pw_hash_version)
          171 
          172     def may_have_password(self):
          173         return not self.is_watching_only()
          174 
          175     def sign_message(self, sequence, message, password) -> bytes:
          176         privkey, compressed = self.get_private_key(sequence, password)
          177         key = ecc.ECPrivkey(privkey)
          178         return key.sign_message(message, compressed)
          179 
          180     def decrypt_message(self, sequence, message, password) -> bytes:
          181         privkey, compressed = self.get_private_key(sequence, password)
          182         ec = ecc.ECPrivkey(privkey)
          183         decrypted = ec.decrypt_message(message)
          184         return decrypted
          185 
          186     def sign_transaction(self, tx, password):
          187         if self.is_watching_only():
          188             return
          189         # Raise if password is not correct.
          190         self.check_password(password)
          191         # Add private keys
          192         keypairs = self._get_tx_derivations(tx)
          193         for k, v in keypairs.items():
          194             keypairs[k] = self.get_private_key(v, password)
          195         # Sign
          196         if keypairs:
          197             tx.sign(keypairs)
          198 
          199     @abstractmethod
          200     def update_password(self, old_password, new_password):
          201         pass
          202 
          203     @abstractmethod
          204     def check_password(self, password):
          205         pass
          206 
          207     @abstractmethod
          208     def get_private_key(self, sequence: 'AddressIndexGeneric', password) -> Tuple[bytes, bool]:
          209         """Returns (privkey, is_compressed)"""
          210         pass
          211 
          212 
          213 class Imported_KeyStore(Software_KeyStore):
          214     # keystore for imported private keys
          215 
          216     type = 'imported'
          217 
          218     def __init__(self, d):
          219         Software_KeyStore.__init__(self, d)
          220         self.keypairs = d.get('keypairs', {})  # type: Dict[str, str]
          221 
          222     def is_deterministic(self):
          223         return False
          224 
          225     def dump(self):
          226         return {
          227             'type': self.type,
          228             'keypairs': self.keypairs,
          229             'pw_hash_version': self.pw_hash_version,
          230         }
          231 
          232     def can_import(self):
          233         return True
          234 
          235     def check_password(self, password):
          236         pubkey = list(self.keypairs.keys())[0]
          237         self.get_private_key(pubkey, password)
          238 
          239     def import_privkey(self, sec, password):
          240         txin_type, privkey, compressed = deserialize_privkey(sec)
          241         pubkey = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed)
          242         # re-serialize the key so the internal storage format is consistent
          243         serialized_privkey = serialize_privkey(
          244             privkey, compressed, txin_type, internal_use=True)
          245         # NOTE: if the same pubkey is reused for multiple addresses (script types),
          246         # there will only be one pubkey-privkey pair for it in self.keypairs,
          247         # and the privkey will encode a txin_type but that txin_type cannot be trusted.
          248         # Removing keys complicates this further.
          249         self.keypairs[pubkey] = pw_encode(serialized_privkey, password, version=self.pw_hash_version)
          250         return txin_type, pubkey
          251 
          252     def delete_imported_key(self, key):
          253         self.keypairs.pop(key)
          254 
          255     def get_private_key(self, pubkey: str, password):
          256         sec = pw_decode(self.keypairs[pubkey], password, version=self.pw_hash_version)
          257         try:
          258             txin_type, privkey, compressed = deserialize_privkey(sec)
          259         except BaseDecodeError as e:
          260             raise InvalidPassword() from e
          261         if pubkey != ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed):
          262             raise InvalidPassword()
          263         return privkey, compressed
          264 
          265     def get_pubkey_derivation(self, pubkey, txin, *, only_der_suffix=True):
          266         if pubkey.hex() in self.keypairs:
          267             return pubkey.hex()
          268         return None
          269 
          270     def update_password(self, old_password, new_password):
          271         self.check_password(old_password)
          272         if new_password == '':
          273             new_password = None
          274         for k, v in self.keypairs.items():
          275             b = pw_decode(v, old_password, version=self.pw_hash_version)
          276             c = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST)
          277             self.keypairs[k] = c
          278         self.pw_hash_version = PW_HASH_VERSION_LATEST
          279 
          280 
          281 class Deterministic_KeyStore(Software_KeyStore):
          282 
          283     def __init__(self, d):
          284         Software_KeyStore.__init__(self, d)
          285         self.seed = d.get('seed', '')
          286         self.passphrase = d.get('passphrase', '')
          287 
          288     def is_deterministic(self):
          289         return True
          290 
          291     def dump(self):
          292         d = {
          293             'type': self.type,
          294             'pw_hash_version': self.pw_hash_version,
          295         }
          296         if self.seed:
          297             d['seed'] = self.seed
          298         if self.passphrase:
          299             d['passphrase'] = self.passphrase
          300         return d
          301 
          302     def has_seed(self):
          303         return bool(self.seed)
          304 
          305     def is_watching_only(self):
          306         return not self.has_seed()
          307 
          308     @abstractmethod
          309     def format_seed(self, seed: str) -> str:
          310         pass
          311 
          312     def add_seed(self, seed):
          313         if self.seed:
          314             raise Exception("a seed exists")
          315         self.seed = self.format_seed(seed)
          316 
          317     def get_seed(self, password):
          318         if not self.has_seed():
          319             raise Exception("This wallet has no seed words")
          320         return pw_decode(self.seed, password, version=self.pw_hash_version)
          321 
          322     def get_passphrase(self, password):
          323         if self.passphrase:
          324             return pw_decode(self.passphrase, password, version=self.pw_hash_version)
          325         else:
          326             return ''
          327 
          328 
          329 class MasterPublicKeyMixin(ABC):
          330 
          331     @abstractmethod
          332     def get_master_public_key(self) -> str:
          333         pass
          334 
          335     @abstractmethod
          336     def get_derivation_prefix(self) -> Optional[str]:
          337         """Returns to bip32 path from some root node to self.xpub
          338         Note that the return value might be None; if it is unknown.
          339         """
          340         pass
          341 
          342     @abstractmethod
          343     def get_root_fingerprint(self) -> Optional[str]:
          344         """Returns the bip32 fingerprint of the top level node.
          345         This top level node is the node at the beginning of the derivation prefix,
          346         i.e. applying the derivation prefix to it will result self.xpub
          347         Note that the return value might be None; if it is unknown.
          348         """
          349         pass
          350 
          351     @abstractmethod
          352     def get_fp_and_derivation_to_be_used_in_partial_tx(
          353             self,
          354             der_suffix: Sequence[int],
          355             *,
          356             only_der_suffix: bool,
          357     ) -> Tuple[bytes, Sequence[int]]:
          358         """Returns fingerprint and derivation path corresponding to a derivation suffix.
          359         The fingerprint is either the root fp or the intermediate fp, depending on what is available
          360         and 'only_der_suffix', and the derivation path is adjusted accordingly.
          361         """
          362         pass
          363 
          364     @abstractmethod
          365     def derive_pubkey(self, for_change: int, n: int) -> bytes:
          366         """Returns pubkey at given path.
          367         May raise CannotDerivePubkey.
          368         """
          369         pass
          370 
          371     def get_pubkey_derivation(
          372             self,
          373             pubkey: bytes,
          374             txinout: Union['PartialTxInput', 'PartialTxOutput'],
          375             *,
          376             only_der_suffix=True,
          377     ) -> Union[Sequence[int], str, None]:
          378         EXPECTED_DER_SUFFIX_LEN = 2
          379         def test_der_suffix_against_pubkey(der_suffix: Sequence[int], pubkey: bytes) -> bool:
          380             if len(der_suffix) != EXPECTED_DER_SUFFIX_LEN:
          381                 return False
          382             try:
          383                 if pubkey != self.derive_pubkey(*der_suffix):
          384                     return False
          385             except CannotDerivePubkey:
          386                 return False
          387             return True
          388 
          389         if pubkey not in txinout.bip32_paths:
          390             return None
          391         fp_found, path_found = txinout.bip32_paths[pubkey]
          392         der_suffix = None
          393         full_path = None
          394         # 1. try fp against our root
          395         ks_root_fingerprint_hex = self.get_root_fingerprint()
          396         ks_der_prefix_str = self.get_derivation_prefix()
          397         ks_der_prefix = convert_bip32_path_to_list_of_uint32(ks_der_prefix_str) if ks_der_prefix_str else None
          398         if (ks_root_fingerprint_hex is not None and ks_der_prefix is not None and
          399                 fp_found.hex() == ks_root_fingerprint_hex):
          400             if path_found[:len(ks_der_prefix)] == ks_der_prefix:
          401                 der_suffix = path_found[len(ks_der_prefix):]
          402                 if not test_der_suffix_against_pubkey(der_suffix, pubkey):
          403                     der_suffix = None
          404         # 2. try fp against our intermediate fingerprint
          405         if (der_suffix is None and isinstance(self, Xpub) and
          406                 fp_found == self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()):
          407             der_suffix = path_found
          408             if not test_der_suffix_against_pubkey(der_suffix, pubkey):
          409                 der_suffix = None
          410         # 3. hack/bruteforce: ignore fp and check pubkey anyway
          411         #    This is only to resolve the following scenario/problem:
          412         #    problem: if we don't know our root fp, but tx contains root fp and full path,
          413         #             we will miss the pubkey (false negative match). Though it might still work
          414         #             within gap limit due to tx.add_info_from_wallet overwriting the fields.
          415         #             Example: keystore has intermediate xprv without root fp; tx contains root fp and full path.
          416         if der_suffix is None:
          417             der_suffix = path_found[-EXPECTED_DER_SUFFIX_LEN:]
          418             if not test_der_suffix_against_pubkey(der_suffix, pubkey):
          419                 der_suffix = None
          420         # if all attempts/methods failed, we give up now:
          421         if der_suffix is None:
          422             return None
          423         if ks_der_prefix is not None:
          424             full_path = ks_der_prefix + list(der_suffix)
          425         return der_suffix if only_der_suffix else full_path
          426 
          427 
          428 class Xpub(MasterPublicKeyMixin):
          429 
          430     def __init__(self, *, derivation_prefix: str = None, root_fingerprint: str = None):
          431         self.xpub = None
          432         self.xpub_receive = None
          433         self.xpub_change = None
          434         self._xpub_bip32_node = None  # type: Optional[BIP32Node]
          435 
          436         # "key origin" info (subclass should persist these):
          437         self._derivation_prefix = derivation_prefix  # type: Optional[str]
          438         self._root_fingerprint = root_fingerprint  # type: Optional[str]
          439 
          440     def get_master_public_key(self):
          441         return self.xpub
          442 
          443     def get_bip32_node_for_xpub(self) -> Optional[BIP32Node]:
          444         if self._xpub_bip32_node is None:
          445             if self.xpub is None:
          446                 return None
          447             self._xpub_bip32_node = BIP32Node.from_xkey(self.xpub)
          448         return self._xpub_bip32_node
          449 
          450     def get_derivation_prefix(self) -> Optional[str]:
          451         return self._derivation_prefix
          452 
          453     def get_root_fingerprint(self) -> Optional[str]:
          454         return self._root_fingerprint
          455 
          456     def get_fp_and_derivation_to_be_used_in_partial_tx(
          457             self,
          458             der_suffix: Sequence[int],
          459             *,
          460             only_der_suffix: bool,
          461     ) -> Tuple[bytes, Sequence[int]]:
          462         fingerprint_hex = self.get_root_fingerprint()
          463         der_prefix_str = self.get_derivation_prefix()
          464         if not only_der_suffix and fingerprint_hex is not None and der_prefix_str is not None:
          465             # use root fp, and true full path
          466             fingerprint_bytes = bfh(fingerprint_hex)
          467             der_prefix_ints = convert_bip32_path_to_list_of_uint32(der_prefix_str)
          468         else:
          469             # use intermediate fp, and claim der suffix is the full path
          470             fingerprint_bytes = self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()
          471             der_prefix_ints = convert_bip32_path_to_list_of_uint32('m')
          472         der_full = der_prefix_ints + list(der_suffix)
          473         return fingerprint_bytes, der_full
          474 
          475     def get_xpub_to_be_used_in_partial_tx(self, *, only_der_suffix: bool) -> str:
          476         assert self.xpub
          477         fp_bytes, der_full = self.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix=[],
          478                                                                                  only_der_suffix=only_der_suffix)
          479         bip32node = self.get_bip32_node_for_xpub()
          480         depth = len(der_full)
          481         child_number_int = der_full[-1] if len(der_full) >= 1 else 0
          482         child_number_bytes = child_number_int.to_bytes(length=4, byteorder="big")
          483         fingerprint = bytes(4) if depth == 0 else bip32node.fingerprint
          484         bip32node = bip32node._replace(depth=depth,
          485                                        fingerprint=fingerprint,
          486                                        child_number=child_number_bytes)
          487         return bip32node.to_xpub()
          488 
          489     def add_key_origin_from_root_node(self, *, derivation_prefix: str, root_node: BIP32Node):
          490         assert self.xpub
          491         # try to derive ourselves from what we were given
          492         child_node1 = root_node.subkey_at_private_derivation(derivation_prefix)
          493         child_pubkey_bytes1 = child_node1.eckey.get_public_key_bytes(compressed=True)
          494         child_node2 = self.get_bip32_node_for_xpub()
          495         child_pubkey_bytes2 = child_node2.eckey.get_public_key_bytes(compressed=True)
          496         if child_pubkey_bytes1 != child_pubkey_bytes2:
          497             raise Exception("(xpub, derivation_prefix, root_node) inconsistency")
          498         self.add_key_origin(derivation_prefix=derivation_prefix,
          499                             root_fingerprint=root_node.calc_fingerprint_of_this_node().hex().lower())
          500 
          501     def add_key_origin(self, *, derivation_prefix: str = None, root_fingerprint: str = None) -> None:
          502         assert self.xpub
          503         if not (root_fingerprint is None or (is_hex_str(root_fingerprint) and len(root_fingerprint) == 8)):
          504             raise Exception("root fp must be 8 hex characters")
          505         derivation_prefix = normalize_bip32_derivation(derivation_prefix)
          506         if not is_xkey_consistent_with_key_origin_info(self.xpub,
          507                                                        derivation_prefix=derivation_prefix,
          508                                                        root_fingerprint=root_fingerprint):
          509             raise Exception("xpub inconsistent with provided key origin info")
          510         if root_fingerprint is not None:
          511             self._root_fingerprint = root_fingerprint
          512         if derivation_prefix is not None:
          513             self._derivation_prefix = derivation_prefix
          514         self.is_requesting_to_be_rewritten_to_wallet_file = True
          515 
          516     @lru_cache(maxsize=None)
          517     def derive_pubkey(self, for_change: int, n: int) -> bytes:
          518         for_change = int(for_change)
          519         if for_change not in (0, 1):
          520             raise CannotDerivePubkey("forbidden path")
          521         xpub = self.xpub_change if for_change else self.xpub_receive
          522         if xpub is None:
          523             rootnode = self.get_bip32_node_for_xpub()
          524             xpub = rootnode.subkey_at_public_derivation((for_change,)).to_xpub()
          525             if for_change:
          526                 self.xpub_change = xpub
          527             else:
          528                 self.xpub_receive = xpub
          529         return self.get_pubkey_from_xpub(xpub, (n,))
          530 
          531     @classmethod
          532     def get_pubkey_from_xpub(self, xpub: str, sequence) -> bytes:
          533         node = BIP32Node.from_xkey(xpub).subkey_at_public_derivation(sequence)
          534         return node.eckey.get_public_key_bytes(compressed=True)
          535 
          536 
          537 class BIP32_KeyStore(Xpub, Deterministic_KeyStore):
          538 
          539     type = 'bip32'
          540 
          541     def __init__(self, d):
          542         Xpub.__init__(self, derivation_prefix=d.get('derivation'), root_fingerprint=d.get('root_fingerprint'))
          543         Deterministic_KeyStore.__init__(self, d)
          544         self.xpub = d.get('xpub')
          545         self.xprv = d.get('xprv')
          546 
          547     def format_seed(self, seed):
          548         return ' '.join(seed.split())
          549 
          550     def dump(self):
          551         d = Deterministic_KeyStore.dump(self)
          552         d['xpub'] = self.xpub
          553         d['xprv'] = self.xprv
          554         d['derivation'] = self.get_derivation_prefix()
          555         d['root_fingerprint'] = self.get_root_fingerprint()
          556         return d
          557 
          558     def get_master_private_key(self, password):
          559         return pw_decode(self.xprv, password, version=self.pw_hash_version)
          560 
          561     def check_password(self, password):
          562         xprv = pw_decode(self.xprv, password, version=self.pw_hash_version)
          563         try:
          564             bip32node = BIP32Node.from_xkey(xprv)
          565         except BaseDecodeError as e:
          566             raise InvalidPassword() from e
          567         if bip32node.chaincode != self.get_bip32_node_for_xpub().chaincode:
          568             raise InvalidPassword()
          569 
          570     def update_password(self, old_password, new_password):
          571         self.check_password(old_password)
          572         if new_password == '':
          573             new_password = None
          574         if self.has_seed():
          575             decoded = self.get_seed(old_password)
          576             self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
          577         if self.passphrase:
          578             decoded = self.get_passphrase(old_password)
          579             self.passphrase = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
          580         if self.xprv is not None:
          581             b = pw_decode(self.xprv, old_password, version=self.pw_hash_version)
          582             self.xprv = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST)
          583         self.pw_hash_version = PW_HASH_VERSION_LATEST
          584 
          585     def is_watching_only(self):
          586         return self.xprv is None
          587 
          588     def add_xpub(self, xpub):
          589         assert is_xpub(xpub)
          590         self.xpub = xpub
          591         root_fingerprint, derivation_prefix = bip32.root_fp_and_der_prefix_from_xkey(xpub)
          592         self.add_key_origin(derivation_prefix=derivation_prefix, root_fingerprint=root_fingerprint)
          593 
          594     def add_xprv(self, xprv):
          595         assert is_xprv(xprv)
          596         self.xprv = xprv
          597         self.add_xpub(bip32.xpub_from_xprv(xprv))
          598 
          599     def add_xprv_from_seed(self, bip32_seed, xtype, derivation):
          600         rootnode = BIP32Node.from_rootseed(bip32_seed, xtype=xtype)
          601         node = rootnode.subkey_at_private_derivation(derivation)
          602         self.add_xprv(node.to_xprv())
          603         self.add_key_origin_from_root_node(derivation_prefix=derivation, root_node=rootnode)
          604 
          605     def get_private_key(self, sequence: Sequence[int], password):
          606         xprv = self.get_master_private_key(password)
          607         node = BIP32Node.from_xkey(xprv).subkey_at_private_derivation(sequence)
          608         pk = node.eckey.get_secret_bytes()
          609         return pk, True
          610 
          611     def get_keypair(self, sequence, password):
          612         k, _ = self.get_private_key(sequence, password)
          613         cK = ecc.ECPrivkey(k).get_public_key_bytes()
          614         return cK, k
          615 
          616 
          617 class Old_KeyStore(MasterPublicKeyMixin, Deterministic_KeyStore):
          618 
          619     type = 'old'
          620 
          621     def __init__(self, d):
          622         Deterministic_KeyStore.__init__(self, d)
          623         self.mpk = d.get('mpk')
          624         self._root_fingerprint = None
          625 
          626     def get_hex_seed(self, password):
          627         return pw_decode(self.seed, password, version=self.pw_hash_version).encode('utf8')
          628 
          629     def dump(self):
          630         d = Deterministic_KeyStore.dump(self)
          631         d['mpk'] = self.mpk
          632         return d
          633 
          634     def add_seed(self, seedphrase):
          635         Deterministic_KeyStore.add_seed(self, seedphrase)
          636         s = self.get_hex_seed(None)
          637         self.mpk = self.mpk_from_seed(s)
          638 
          639     def add_master_public_key(self, mpk):
          640         self.mpk = mpk
          641 
          642     def format_seed(self, seed):
          643         from . import old_mnemonic, mnemonic
          644         seed = mnemonic.normalize_text(seed)
          645         # see if seed was entered as hex
          646         if seed:
          647             try:
          648                 bfh(seed)
          649                 return str(seed)
          650             except Exception:
          651                 pass
          652         words = seed.split()
          653         seed = old_mnemonic.mn_decode(words)
          654         if not seed:
          655             raise Exception("Invalid seed")
          656         return seed
          657 
          658     def get_seed(self, password):
          659         from . import old_mnemonic
          660         s = self.get_hex_seed(password)
          661         return ' '.join(old_mnemonic.mn_encode(s))
          662 
          663     @classmethod
          664     def mpk_from_seed(klass, seed):
          665         secexp = klass.stretch_key(seed)
          666         privkey = ecc.ECPrivkey.from_secret_scalar(secexp)
          667         return privkey.get_public_key_hex(compressed=False)[2:]
          668 
          669     @classmethod
          670     def stretch_key(self, seed):
          671         x = seed
          672         for i in range(100000):
          673             x = hashlib.sha256(x + seed).digest()
          674         return string_to_number(x)
          675 
          676     @classmethod
          677     def get_sequence(self, mpk, for_change, n):
          678         return string_to_number(sha256d(("%d:%d:"%(n, for_change)).encode('ascii') + bfh(mpk)))
          679 
          680     @classmethod
          681     def get_pubkey_from_mpk(cls, mpk, for_change, n) -> bytes:
          682         z = cls.get_sequence(mpk, for_change, n)
          683         master_public_key = ecc.ECPubkey(bfh('04'+mpk))
          684         public_key = master_public_key + z*ecc.GENERATOR
          685         return public_key.get_public_key_bytes(compressed=False)
          686 
          687     @lru_cache(maxsize=None)
          688     def derive_pubkey(self, for_change, n) -> bytes:
          689         for_change = int(for_change)
          690         if for_change not in (0, 1):
          691             raise CannotDerivePubkey("forbidden path")
          692         return self.get_pubkey_from_mpk(self.mpk, for_change, n)
          693 
          694     def _get_private_key_from_stretched_exponent(self, for_change, n, secexp):
          695         secexp = (secexp + self.get_sequence(self.mpk, for_change, n)) % ecc.CURVE_ORDER
          696         pk = int.to_bytes(secexp, length=32, byteorder='big', signed=False)
          697         return pk
          698 
          699     def get_private_key(self, sequence: Sequence[int], password):
          700         seed = self.get_hex_seed(password)
          701         secexp = self.stretch_key(seed)
          702         self._check_seed(seed, secexp=secexp)
          703         for_change, n = sequence
          704         pk = self._get_private_key_from_stretched_exponent(for_change, n, secexp)
          705         return pk, False
          706 
          707     def _check_seed(self, seed, *, secexp=None):
          708         if secexp is None:
          709             secexp = self.stretch_key(seed)
          710         master_private_key = ecc.ECPrivkey.from_secret_scalar(secexp)
          711         master_public_key = master_private_key.get_public_key_bytes(compressed=False)[1:]
          712         if master_public_key != bfh(self.mpk):
          713             raise InvalidPassword()
          714 
          715     def check_password(self, password):
          716         seed = self.get_hex_seed(password)
          717         self._check_seed(seed)
          718 
          719     def get_master_public_key(self):
          720         return self.mpk
          721 
          722     def get_derivation_prefix(self) -> str:
          723         return 'm'
          724 
          725     def get_root_fingerprint(self) -> str:
          726         if self._root_fingerprint is None:
          727             master_public_key = ecc.ECPubkey(bfh('04'+self.mpk))
          728             xfp = hash_160(master_public_key.get_public_key_bytes(compressed=True))[0:4]
          729             self._root_fingerprint = xfp.hex().lower()
          730         return self._root_fingerprint
          731 
          732     def get_fp_and_derivation_to_be_used_in_partial_tx(
          733             self,
          734             der_suffix: Sequence[int],
          735             *,
          736             only_der_suffix: bool,
          737     ) -> Tuple[bytes, Sequence[int]]:
          738         fingerprint_hex = self.get_root_fingerprint()
          739         der_prefix_str = self.get_derivation_prefix()
          740         fingerprint_bytes = bfh(fingerprint_hex)
          741         der_prefix_ints = convert_bip32_path_to_list_of_uint32(der_prefix_str)
          742         der_full = der_prefix_ints + list(der_suffix)
          743         return fingerprint_bytes, der_full
          744 
          745     def update_password(self, old_password, new_password):
          746         self.check_password(old_password)
          747         if new_password == '':
          748             new_password = None
          749         if self.has_seed():
          750             decoded = pw_decode(self.seed, old_password, version=self.pw_hash_version)
          751             self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
          752         self.pw_hash_version = PW_HASH_VERSION_LATEST
          753 
          754 
          755 class Hardware_KeyStore(Xpub, KeyStore):
          756     hw_type: str
          757     device: str
          758     plugin: 'HW_PluginBase'
          759     thread: Optional['TaskThread'] = None
          760 
          761     type = 'hardware'
          762 
          763     def __init__(self, d):
          764         Xpub.__init__(self, derivation_prefix=d.get('derivation'), root_fingerprint=d.get('root_fingerprint'))
          765         KeyStore.__init__(self)
          766         # Errors and other user interaction is done through the wallet's
          767         # handler.  The handler is per-window and preserved across
          768         # device reconnects
          769         self.xpub = d.get('xpub')
          770         self.label = d.get('label')
          771         self.soft_device_id = d.get('soft_device_id')  # type: Optional[str]
          772         self.handler = None  # type: Optional[HardwareHandlerBase]
          773         run_hook('init_keystore', self)
          774 
          775     def set_label(self, label):
          776         self.label = label
          777 
          778     def may_have_password(self):
          779         return False
          780 
          781     def is_deterministic(self):
          782         return True
          783 
          784     def get_type_text(self) -> str:
          785         return f'hw[{self.hw_type}]'
          786 
          787     def dump(self):
          788         return {
          789             'type': self.type,
          790             'hw_type': self.hw_type,
          791             'xpub': self.xpub,
          792             'derivation': self.get_derivation_prefix(),
          793             'root_fingerprint': self.get_root_fingerprint(),
          794             'label':self.label,
          795             'soft_device_id': self.soft_device_id,
          796         }
          797 
          798     def unpaired(self):
          799         '''A device paired with the wallet was disconnected.  This can be
          800         called in any thread context.'''
          801         self.logger.info("unpaired")
          802 
          803     def paired(self):
          804         '''A device paired with the wallet was (re-)connected.  This can be
          805         called in any thread context.'''
          806         self.logger.info("paired")
          807 
          808     def is_watching_only(self):
          809         '''The wallet is not watching-only; the user will be prompted for
          810         pin and passphrase as appropriate when needed.'''
          811         assert not self.has_seed()
          812         return False
          813 
          814     def get_password_for_storage_encryption(self) -> str:
          815         client = self.plugin.get_client(self)
          816         return client.get_password_for_storage_encryption()
          817 
          818     def has_usable_connection_with_device(self) -> bool:
          819         if not hasattr(self, 'plugin'):
          820             return False
          821         client = self.plugin.get_client(self, force_pair=False)
          822         if client is None:
          823             return False
          824         return client.has_usable_connection_with_device()
          825 
          826     def ready_to_sign(self):
          827         return super().ready_to_sign() and self.has_usable_connection_with_device()
          828 
          829     def opportunistically_fill_in_missing_info_from_device(self, client: 'HardwareClientBase'):
          830         assert client is not None
          831         if self._root_fingerprint is None:
          832             self._root_fingerprint = client.request_root_fingerprint_from_device()
          833             self.is_requesting_to_be_rewritten_to_wallet_file = True
          834         if self.label != client.label():
          835             self.label = client.label()
          836             self.is_requesting_to_be_rewritten_to_wallet_file = True
          837         if self.soft_device_id != client.get_soft_device_id():
          838             self.soft_device_id = client.get_soft_device_id()
          839             self.is_requesting_to_be_rewritten_to_wallet_file = True
          840 
          841 
          842 KeyStoreWithMPK = Union[KeyStore, MasterPublicKeyMixin]  # intersection really...
          843 AddressIndexGeneric = Union[Sequence[int], str]  # can be hex pubkey str
          844 
          845 
          846 def bip39_normalize_passphrase(passphrase):
          847     return normalize('NFKD', passphrase or '')
          848 
          849 def bip39_to_seed(mnemonic, passphrase):
          850     import hashlib, hmac
          851     PBKDF2_ROUNDS = 2048
          852     mnemonic = normalize('NFKD', ' '.join(mnemonic.split()))
          853     passphrase = bip39_normalize_passphrase(passphrase)
          854     return hashlib.pbkdf2_hmac('sha512', mnemonic.encode('utf-8'),
          855         b'mnemonic' + passphrase.encode('utf-8'), iterations = PBKDF2_ROUNDS)
          856 
          857 
          858 def bip39_is_checksum_valid(mnemonic: str) -> Tuple[bool, bool]:
          859     """Test checksum of bip39 mnemonic assuming English wordlist.
          860     Returns tuple (is_checksum_valid, is_wordlist_valid)
          861     """
          862     words = [ normalize('NFKD', word) for word in mnemonic.split() ]
          863     words_len = len(words)
          864     wordlist = Wordlist.from_file("english.txt")
          865     n = len(wordlist)
          866     i = 0
          867     words.reverse()
          868     while words:
          869         w = words.pop()
          870         try:
          871             k = wordlist.index(w)
          872         except ValueError:
          873             return False, False
          874         i = i*n + k
          875     if words_len not in [12, 15, 18, 21, 24]:
          876         return False, True
          877     checksum_length = 11 * words_len // 33  # num bits
          878     entropy_length = 32 * checksum_length  # num bits
          879     entropy = i >> checksum_length
          880     checksum = i % 2**checksum_length
          881     entropy_bytes = int.to_bytes(entropy, length=entropy_length//8, byteorder="big")
          882     hashed = int.from_bytes(sha256(entropy_bytes), byteorder="big")
          883     calculated_checksum = hashed >> (256 - checksum_length)
          884     return checksum == calculated_checksum, True
          885 
          886 
          887 def from_bip39_seed(seed, passphrase, derivation, xtype=None):
          888     k = BIP32_KeyStore({})
          889     bip32_seed = bip39_to_seed(seed, passphrase)
          890     if xtype is None:
          891         xtype = xtype_from_derivation(derivation)
          892     k.add_xprv_from_seed(bip32_seed, xtype, derivation)
          893     return k
          894 
          895 
          896 PURPOSE48_SCRIPT_TYPES = {
          897     'p2wsh-p2sh': 1,  # specifically multisig
          898     'p2wsh': 2,       # specifically multisig
          899 }
          900 PURPOSE48_SCRIPT_TYPES_INV = inv_dict(PURPOSE48_SCRIPT_TYPES)
          901 
          902 
          903 def xtype_from_derivation(derivation: str) -> str:
          904     """Returns the script type to be used for this derivation."""
          905     bip32_indices = convert_bip32_path_to_list_of_uint32(derivation)
          906     if len(bip32_indices) >= 1:
          907         if bip32_indices[0] == 84 + BIP32_PRIME:
          908             return 'p2wpkh'
          909         elif bip32_indices[0] == 49 + BIP32_PRIME:
          910             return 'p2wpkh-p2sh'
          911         elif bip32_indices[0] == 44 + BIP32_PRIME:
          912             return 'standard'
          913         elif bip32_indices[0] == 45 + BIP32_PRIME:
          914             return 'standard'
          915 
          916     if len(bip32_indices) >= 4:
          917         if bip32_indices[0] == 48 + BIP32_PRIME:
          918             # m / purpose' / coin_type' / account' / script_type' / change / address_index
          919             script_type_int = bip32_indices[3] - BIP32_PRIME
          920             script_type = PURPOSE48_SCRIPT_TYPES_INV.get(script_type_int)
          921             if script_type is not None:
          922                 return script_type
          923     return 'standard'
          924 
          925 
          926 hw_keystores = {}
          927 
          928 def register_keystore(hw_type, constructor):
          929     hw_keystores[hw_type] = constructor
          930 
          931 def hardware_keystore(d) -> Hardware_KeyStore:
          932     hw_type = d['hw_type']
          933     if hw_type in hw_keystores:
          934         constructor = hw_keystores[hw_type]
          935         return constructor(d)
          936     raise WalletFileException(f'unknown hardware type: {hw_type}. '
          937                               f'hw_keystores: {list(hw_keystores)}')
          938 
          939 def load_keystore(db: 'WalletDB', name: str) -> KeyStore:
          940     d = db.get(name, {})
          941     t = d.get('type')
          942     if not t:
          943         raise WalletFileException(
          944             'Wallet format requires update.\n'
          945             'Cannot find keystore for name {}'.format(name))
          946     keystore_constructors = {ks.type: ks for ks in [Old_KeyStore, Imported_KeyStore, BIP32_KeyStore]}
          947     keystore_constructors['hardware'] = hardware_keystore
          948     try:
          949         ks_constructor = keystore_constructors[t]
          950     except KeyError:
          951         raise WalletFileException(f'Unknown type {t} for keystore named {name}')
          952     k = ks_constructor(d)
          953     return k
          954 
          955 
          956 def is_old_mpk(mpk: str) -> bool:
          957     try:
          958         int(mpk, 16)  # test if hex string
          959     except:
          960         return False
          961     if len(mpk) != 128:
          962         return False
          963     try:
          964         ecc.ECPubkey(bfh('04' + mpk))
          965     except:
          966         return False
          967     return True
          968 
          969 
          970 def is_address_list(text):
          971     parts = text.split()
          972     return bool(parts) and all(bitcoin.is_address(x) for x in parts)
          973 
          974 
          975 def get_private_keys(text, *, allow_spaces_inside_key=True, raise_on_error=False):
          976     if allow_spaces_inside_key:  # see #1612
          977         parts = text.split('\n')
          978         parts = map(lambda x: ''.join(x.split()), parts)
          979         parts = list(filter(bool, parts))
          980     else:
          981         parts = text.split()
          982     if bool(parts) and all(bitcoin.is_private_key(x, raise_on_error=raise_on_error) for x in parts):
          983         return parts
          984 
          985 
          986 def is_private_key_list(text, *, allow_spaces_inside_key=True, raise_on_error=False):
          987     return bool(get_private_keys(text,
          988                                  allow_spaces_inside_key=allow_spaces_inside_key,
          989                                  raise_on_error=raise_on_error))
          990 
          991 
          992 def is_master_key(x):
          993     return is_old_mpk(x) or is_bip32_key(x)
          994 
          995 
          996 def is_bip32_key(x):
          997     return is_xprv(x) or is_xpub(x)
          998 
          999 
         1000 def bip44_derivation(account_id, bip43_purpose=44):
         1001     coin = constants.net.BIP44_COIN_TYPE
         1002     der = "m/%d'/%d'/%d'" % (bip43_purpose, coin, int(account_id))
         1003     return normalize_bip32_derivation(der)
         1004 
         1005 
         1006 def purpose48_derivation(account_id: int, xtype: str) -> str:
         1007     # m / purpose' / coin_type' / account' / script_type' / change / address_index
         1008     bip43_purpose = 48
         1009     coin = constants.net.BIP44_COIN_TYPE
         1010     account_id = int(account_id)
         1011     script_type_int = PURPOSE48_SCRIPT_TYPES.get(xtype)
         1012     if script_type_int is None:
         1013         raise Exception('unknown xtype: {}'.format(xtype))
         1014     der = "m/%d'/%d'/%d'/%d'" % (bip43_purpose, coin, account_id, script_type_int)
         1015     return normalize_bip32_derivation(der)
         1016 
         1017 
         1018 def from_seed(seed, passphrase, is_p2sh=False):
         1019     t = seed_type(seed)
         1020     if t == 'old':
         1021         keystore = Old_KeyStore({})
         1022         keystore.add_seed(seed)
         1023     elif t in ['standard', 'segwit']:
         1024         keystore = BIP32_KeyStore({})
         1025         keystore.add_seed(seed)
         1026         keystore.passphrase = passphrase
         1027         bip32_seed = Mnemonic.mnemonic_to_seed(seed, passphrase)
         1028         if t == 'standard':
         1029             der = "m/"
         1030             xtype = 'standard'
         1031         else:
         1032             der = "m/1'/" if is_p2sh else "m/0'/"
         1033             xtype = 'p2wsh' if is_p2sh else 'p2wpkh'
         1034         keystore.add_xprv_from_seed(bip32_seed, xtype, der)
         1035     else:
         1036         raise BitcoinException('Unexpected seed type {}'.format(repr(t)))
         1037     return keystore
         1038 
         1039 def from_private_key_list(text):
         1040     keystore = Imported_KeyStore({})
         1041     for x in get_private_keys(text):
         1042         keystore.import_privkey(x, None)
         1043     return keystore
         1044 
         1045 def from_old_mpk(mpk):
         1046     keystore = Old_KeyStore({})
         1047     keystore.add_master_public_key(mpk)
         1048     return keystore
         1049 
         1050 def from_xpub(xpub):
         1051     k = BIP32_KeyStore({})
         1052     k.add_xpub(xpub)
         1053     return k
         1054 
         1055 def from_xprv(xprv):
         1056     k = BIP32_KeyStore({})
         1057     k.add_xprv(xprv)
         1058     return k
         1059 
         1060 def from_master_key(text):
         1061     if is_xprv(text):
         1062         k = from_xprv(text)
         1063     elif is_old_mpk(text):
         1064         k = from_old_mpk(text)
         1065     elif is_xpub(text):
         1066         k = from_xpub(text)
         1067     else:
         1068         raise BitcoinException('Invalid master key')
         1069     return k