tkeystore: use abstract base classes, introduce MPKMixin - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 0ab88b821c702b6a424bae8508ce1a9e30b7fa59 DIR parent f2d42d79ba4f59352697e31cc275f9b436f0b487 HTML Author: SomberNight <somber.night@protonmail.com> Date: Tue, 10 Dec 2019 00:31:01 +0100 keystore: use abstract base classes, introduce MPKMixin Diffstat: M electrum/keystore.py | 186 +++++++++++++++++++------------ M electrum/plugins/coldcard/coldcard… | 7 ++----- M electrum/wallet.py | 4 ++-- 3 files changed, 117 insertions(+), 80 deletions(-) --- DIR diff --git a/electrum/keystore.py b/electrum/keystore.py t@@ -29,6 +29,7 @@ import hashlib import re from typing import Tuple, TYPE_CHECKING, Union, Sequence, Optional, Dict, List, NamedTuple from functools import lru_cache +from abc import ABC, abstractmethod from . import bitcoin, ecc, constants, bip32 from .bitcoin import deserialize_privkey, serialize_privkey t@@ -50,7 +51,7 @@ if TYPE_CHECKING: from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase -class KeyStore(Logger): +class KeyStore(Logger, ABC): type: str def __init__(self): t@@ -69,9 +70,10 @@ class KeyStore(Logger): def get_type_text(self) -> str: return f'{self.type}' + @abstractmethod def may_have_password(self): """Returns whether the keystore can be encrypted with a password.""" - raise NotImplementedError() + pass def get_tx_derivations(self, tx: 'PartialTransaction') -> Dict[str, Union[Sequence[int], str]]: keypairs = {} t@@ -96,21 +98,27 @@ class KeyStore(Logger): def ready_to_sign(self) -> bool: return not self.is_watching_only() + @abstractmethod def dump(self) -> dict: - raise NotImplementedError() # implemented by subclasses + pass + @abstractmethod def is_deterministic(self) -> bool: - raise NotImplementedError() # implemented by subclasses + pass + @abstractmethod def sign_message(self, sequence, message, password) -> bytes: - raise NotImplementedError() # implemented by subclasses + pass + @abstractmethod def decrypt_message(self, sequence, message, password) -> bytes: - raise NotImplementedError() # implemented by subclasses + pass + @abstractmethod def sign_transaction(self, tx: 'PartialTransaction', password) -> None: - raise NotImplementedError() # implemented by subclasses + pass + @abstractmethod def get_pubkey_derivation(self, pubkey: bytes, txinout: Union['PartialTxInput', 'PartialTxOutput'], *, only_der_suffix=True) \ t@@ -119,41 +127,7 @@ class KeyStore(Logger): the pubkey itself (hex) if the pubkey belongs to the keystore but not HD derived, or None if the pubkey is unrelated. """ - def test_der_suffix_against_pubkey(der_suffix: Sequence[int], pubkey: bytes) -> bool: - if len(der_suffix) != 2: - return False - if pubkey.hex() != self.derive_pubkey(*der_suffix): - return False - return True - - if hasattr(self, 'get_root_fingerprint'): - if pubkey not in txinout.bip32_paths: - return None - fp_found, path_found = txinout.bip32_paths[pubkey] - der_suffix = None - full_path = None - # try fp against our root - my_root_fingerprint_hex = self.get_root_fingerprint() - my_der_prefix_str = self.get_derivation_prefix() - ks_der_prefix = convert_bip32_path_to_list_of_uint32(my_der_prefix_str) if my_der_prefix_str else None - if (my_root_fingerprint_hex is not None and ks_der_prefix is not None and - fp_found.hex() == my_root_fingerprint_hex): - if path_found[:len(ks_der_prefix)] == ks_der_prefix: - der_suffix = path_found[len(ks_der_prefix):] - if not test_der_suffix_against_pubkey(der_suffix, pubkey): - der_suffix = None - # try fp against our intermediate fingerprint - if (der_suffix is None and hasattr(self, 'get_bip32_node_for_xpub') and - fp_found == self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()): - der_suffix = path_found - if not test_der_suffix_against_pubkey(der_suffix, pubkey): - der_suffix = None - if der_suffix is None: - return None - if ks_der_prefix is not None: - full_path = ks_der_prefix + list(der_suffix) - return der_suffix if only_der_suffix else full_path - return None + pass def find_my_pubkey_in_txinout( self, txinout: Union['PartialTxInput', 'PartialTxOutput'], t@@ -202,14 +176,17 @@ class Software_KeyStore(KeyStore): if keypairs: tx.sign(keypairs) + @abstractmethod def update_password(self, old_password, new_password): - raise NotImplementedError() # implemented by subclasses + pass + @abstractmethod def check_password(self, password): - raise NotImplementedError() # implemented by subclasses + pass + @abstractmethod def get_private_key(self, *args, **kwargs) -> Tuple[bytes, bool]: - raise NotImplementedError() # implemented by subclasses + pass class Imported_KeyStore(Software_KeyStore): t@@ -224,9 +201,6 @@ class Imported_KeyStore(Software_KeyStore): def is_deterministic(self): return False - def get_master_public_key(self): - return None - def dump(self): return { 'type': self.type, t@@ -308,6 +282,10 @@ class Deterministic_KeyStore(Software_KeyStore): def is_watching_only(self): return not self.has_seed() + @abstractmethod + def format_seed(self, seed: str) -> str: + pass + def add_seed(self, seed): if self.seed: raise Exception("a seed exists") t@@ -325,7 +303,81 @@ class Deterministic_KeyStore(Software_KeyStore): return '' -class Xpub: +class MasterPublicKeyMixin(ABC): + + @abstractmethod + def get_master_public_key(self) -> str: + pass + + @abstractmethod + def get_derivation_prefix(self) -> Optional[str]: + """Returns to bip32 path from some root node to self.xpub + Note that the return value might be None; if it is unknown. + """ + pass + + @abstractmethod + def get_root_fingerprint(self) -> Optional[str]: + """Returns the bip32 fingerprint of the top level node. + This top level node is the node at the beginning of the derivation prefix, + i.e. applying the derivation prefix to it will result self.xpub + Note that the return value might be None; if it is unknown. + """ + pass + + @abstractmethod + def get_fp_and_derivation_to_be_used_in_partial_tx(self, der_suffix: Sequence[int], *, + only_der_suffix: bool = True) -> Tuple[bytes, Sequence[int]]: + """Returns fingerprint and derivation path corresponding to a derivation suffix. + The fingerprint is either the root fp or the intermediate fp, depending on what is available + and 'only_der_suffix', and the derivation path is adjusted accordingly. + """ + pass + + @abstractmethod + def derive_pubkey(self, for_change: int, n: int) -> str: + pass + + def get_pubkey_derivation(self, pubkey: bytes, + txinout: Union['PartialTxInput', 'PartialTxOutput'], + *, only_der_suffix=True) \ + -> Union[Sequence[int], str, None]: + def test_der_suffix_against_pubkey(der_suffix: Sequence[int], pubkey: bytes) -> bool: + if len(der_suffix) != 2: + return False + if pubkey.hex() != self.derive_pubkey(*der_suffix): + return False + return True + + if pubkey not in txinout.bip32_paths: + return None + fp_found, path_found = txinout.bip32_paths[pubkey] + der_suffix = None + full_path = None + # try fp against our root + my_root_fingerprint_hex = self.get_root_fingerprint() + my_der_prefix_str = self.get_derivation_prefix() + ks_der_prefix = convert_bip32_path_to_list_of_uint32(my_der_prefix_str) if my_der_prefix_str else None + if (my_root_fingerprint_hex is not None and ks_der_prefix is not None and + fp_found.hex() == my_root_fingerprint_hex): + if path_found[:len(ks_der_prefix)] == ks_der_prefix: + der_suffix = path_found[len(ks_der_prefix):] + if not test_der_suffix_against_pubkey(der_suffix, pubkey): + der_suffix = None + # try fp against our intermediate fingerprint + if (der_suffix is None and isinstance(self, Xpub) and + fp_found == self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()): + der_suffix = path_found + if not test_der_suffix_against_pubkey(der_suffix, pubkey): + der_suffix = None + if der_suffix is None: + return None + if ks_der_prefix is not None: + full_path = ks_der_prefix + list(der_suffix) + return der_suffix if only_der_suffix else full_path + + +class Xpub(MasterPublicKeyMixin): def __init__(self, *, derivation_prefix: str = None, root_fingerprint: str = None): self.xpub = None t@@ -348,25 +400,13 @@ class Xpub: return self._xpub_bip32_node def get_derivation_prefix(self) -> Optional[str]: - """Returns to bip32 path from some root node to self.xpub - Note that the return value might be None; if it is unknown. - """ return self._derivation_prefix def get_root_fingerprint(self) -> Optional[str]: - """Returns the bip32 fingerprint of the top level node. - This top level node is the node at the beginning of the derivation prefix, - i.e. applying the derivation prefix to it will result self.xpub - Note that the return value might be None; if it is unknown. - """ return self._root_fingerprint def get_fp_and_derivation_to_be_used_in_partial_tx(self, der_suffix: Sequence[int], *, only_der_suffix: bool = True) -> Tuple[bytes, Sequence[int]]: - """Returns fingerprint and derivation path corresponding to a derivation suffix. - The fingerprint is either the root fp or the intermediate fp, depending on what is available - and 'only_der_suffix', and the derivation path is adjusted accordingly. - """ fingerprint_hex = self.get_root_fingerprint() der_prefix_str = self.get_derivation_prefix() if not only_der_suffix and fingerprint_hex is not None and der_prefix_str is not None: t@@ -437,7 +477,7 @@ class Xpub: return node.eckey.get_public_key_bytes(compressed=True) -class BIP32_KeyStore(Deterministic_KeyStore, Xpub): +class BIP32_KeyStore(Xpub, Deterministic_KeyStore): type = 'bip32' t@@ -512,7 +552,8 @@ class BIP32_KeyStore(Deterministic_KeyStore, Xpub): cK = ecc.ECPrivkey(k).get_public_key_bytes() return cK, k -class Old_KeyStore(Deterministic_KeyStore): + +class Old_KeyStore(MasterPublicKeyMixin, Deterministic_KeyStore): type = 'old' t@@ -585,7 +626,7 @@ class Old_KeyStore(Deterministic_KeyStore): def derive_pubkey(self, for_change, n) -> str: return self.get_pubkey_from_mpk(self.mpk, for_change, n) - def get_private_key_from_stretched_exponent(self, for_change, n, secexp): + def _get_private_key_from_stretched_exponent(self, for_change, n, secexp): secexp = (secexp + self.get_sequence(self.mpk, for_change, n)) % ecc.CURVE_ORDER pk = number_to_string(secexp, ecc.CURVE_ORDER) return pk t@@ -593,12 +634,12 @@ class Old_KeyStore(Deterministic_KeyStore): def get_private_key(self, sequence, password): seed = self.get_hex_seed(password) secexp = self.stretch_key(seed) - self.check_seed(seed, secexp=secexp) + self._check_seed(seed, secexp=secexp) for_change, n = sequence - pk = self.get_private_key_from_stretched_exponent(for_change, n, secexp) + pk = self._get_private_key_from_stretched_exponent(for_change, n, secexp) return pk, False - def check_seed(self, seed, *, secexp=None): + def _check_seed(self, seed, *, secexp=None): if secexp is None: secexp = self.stretch_key(seed) master_private_key = ecc.ECPrivkey.from_secret_scalar(secexp) t@@ -608,7 +649,7 @@ class Old_KeyStore(Deterministic_KeyStore): def check_password(self, password): seed = self.get_hex_seed(password) - self.check_seed(seed) + self._check_seed(seed) def get_master_public_key(self): return self.mpk t@@ -623,7 +664,6 @@ class Old_KeyStore(Deterministic_KeyStore): self._root_fingerprint = xfp.hex().lower() return self._root_fingerprint - # TODO Old_KeyStore and Xpub could share a common baseclass? def get_fp_and_derivation_to_be_used_in_partial_tx(self, der_suffix: Sequence[int], *, only_der_suffix: bool = True) -> Tuple[bytes, Sequence[int]]: fingerprint_hex = self.get_root_fingerprint() t@@ -643,7 +683,7 @@ class Old_KeyStore(Deterministic_KeyStore): self.pw_hash_version = PW_HASH_VERSION_LATEST -class Hardware_KeyStore(KeyStore, Xpub): +class Hardware_KeyStore(Xpub, KeyStore): hw_type: str device: str plugin: 'HW_PluginBase' t@@ -694,9 +734,6 @@ class Hardware_KeyStore(KeyStore, Xpub): called in any thread context.''' self.logger.info("paired") - def can_export(self): - return False - def is_watching_only(self): '''The wallet is not watching-only; the user will be prompted for pin and passphrase as appropriate when needed.''' t@@ -732,6 +769,9 @@ class Hardware_KeyStore(KeyStore, Xpub): self.is_requesting_to_be_rewritten_to_wallet_file = True +KeyStoreWithMPK = Union[KeyStore, MasterPublicKeyMixin] # intersection really... + + def bip39_normalize_passphrase(passphrase): return normalize('NFKD', passphrase or '') DIR diff --git a/electrum/plugins/coldcard/coldcard.py b/electrum/plugins/coldcard/coldcard.py t@@ -11,7 +11,7 @@ from electrum import bip32 from electrum.bip32 import BIP32Node, InvalidMasterKeyVersionBytes from electrum.i18n import _ from electrum.plugin import Device, hook -from electrum.keystore import Hardware_KeyStore +from electrum.keystore import Hardware_KeyStore, KeyStoreWithMPK from electrum.transaction import PartialTransaction from electrum.wallet import Standard_Wallet, Multisig_Wallet, Abstract_Wallet from electrum.util import bfh, bh2u, versiontuple, UserFacingException t@@ -21,9 +21,6 @@ from electrum.logging import get_logger from ..hw_wallet import HW_PluginBase, HardwareClientBase from ..hw_wallet.plugin import LibraryFoundButUnusable, only_hook_if_libraries_available -if TYPE_CHECKING: - from electrum.keystore import Xpub - _logger = get_logger(__name__) t@@ -571,7 +568,7 @@ class ColdcardPlugin(HW_PluginBase): xpubs = [] derivs = set() - for xpub, ks in zip(wallet.get_master_public_keys(), wallet.get_keystores()): + for xpub, ks in zip(wallet.get_master_public_keys(), wallet.get_keystores()): # type: str, KeyStoreWithMPK fp_bytes, der_full = ks.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix=[], only_der_suffix=False) fp_hex = fp_bytes.hex().upper() der_prefix_str = bip32.convert_bip32_intpath_to_strpath(der_full) DIR diff --git a/electrum/wallet.py b/electrum/wallet.py t@@ -55,7 +55,7 @@ from .bitcoin import (COIN, is_address, address_to_script, is_minikey, relayfee, dust_threshold) from .crypto import sha256d from . import keystore -from .keystore import load_keystore, Hardware_KeyStore, KeyStore +from .keystore import load_keystore, Hardware_KeyStore, KeyStore, KeyStoreWithMPK from .util import multisig_type from .storage import StorageEncryptionVersion, WalletStorage from . import transaction, bitcoin, coinchooser, paymentrequest, ecc, bip32 t@@ -454,7 +454,7 @@ class Abstract_Wallet(AddressSynchronizer): def get_public_keys(self, address): return [self.get_public_key(address)] - def get_public_keys_with_deriv_info(self, address: str) -> Dict[str, Tuple[KeyStore, Sequence[int]]]: + def get_public_keys_with_deriv_info(self, address: str) -> Dict[str, Tuple[KeyStoreWithMPK, Sequence[int]]]: """Returns a map: pubkey_hex -> (keystore, derivation_suffix)""" return {}