URI: 
       tkeystore: use abstract base classes, introduce MPKMixin - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 0ab88b821c702b6a424bae8508ce1a9e30b7fa59
   DIR parent f2d42d79ba4f59352697e31cc275f9b436f0b487
  HTML Author: SomberNight <somber.night@protonmail.com>
       Date:   Tue, 10 Dec 2019 00:31:01 +0100
       
       keystore: use abstract base classes, introduce MPKMixin
       
       Diffstat:
         M electrum/keystore.py                |     186 +++++++++++++++++++------------
         M electrum/plugins/coldcard/coldcard… |       7 ++-----
         M electrum/wallet.py                  |       4 ++--
       
       3 files changed, 117 insertions(+), 80 deletions(-)
       ---
   DIR diff --git a/electrum/keystore.py b/electrum/keystore.py
       t@@ -29,6 +29,7 @@ import hashlib
        import re
        from typing import Tuple, TYPE_CHECKING, Union, Sequence, Optional, Dict, List, NamedTuple
        from functools import lru_cache
       +from abc import ABC, abstractmethod
        
        from . import bitcoin, ecc, constants, bip32
        from .bitcoin import deserialize_privkey, serialize_privkey
       t@@ -50,7 +51,7 @@ if TYPE_CHECKING:
            from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase
        
        
       -class KeyStore(Logger):
       +class KeyStore(Logger, ABC):
            type: str
        
            def __init__(self):
       t@@ -69,9 +70,10 @@ class KeyStore(Logger):
            def get_type_text(self) -> str:
                return f'{self.type}'
        
       +    @abstractmethod
            def may_have_password(self):
                """Returns whether the keystore can be encrypted with a password."""
       -        raise NotImplementedError()
       +        pass
        
            def get_tx_derivations(self, tx: 'PartialTransaction') -> Dict[str, Union[Sequence[int], str]]:
                keypairs = {}
       t@@ -96,21 +98,27 @@ class KeyStore(Logger):
            def ready_to_sign(self) -> bool:
                return not self.is_watching_only()
        
       +    @abstractmethod
            def dump(self) -> dict:
       -        raise NotImplementedError()  # implemented by subclasses
       +        pass
        
       +    @abstractmethod
            def is_deterministic(self) -> bool:
       -        raise NotImplementedError()  # implemented by subclasses
       +        pass
        
       +    @abstractmethod
            def sign_message(self, sequence, message, password) -> bytes:
       -        raise NotImplementedError()  # implemented by subclasses
       +        pass
        
       +    @abstractmethod
            def decrypt_message(self, sequence, message, password) -> bytes:
       -        raise NotImplementedError()  # implemented by subclasses
       +        pass
        
       +    @abstractmethod
            def sign_transaction(self, tx: 'PartialTransaction', password) -> None:
       -        raise NotImplementedError()  # implemented by subclasses
       +        pass
        
       +    @abstractmethod
            def get_pubkey_derivation(self, pubkey: bytes,
                                      txinout: Union['PartialTxInput', 'PartialTxOutput'],
                                      *, only_der_suffix=True) \
       t@@ -119,41 +127,7 @@ class KeyStore(Logger):
                the pubkey itself (hex) if the pubkey belongs to the keystore but not HD derived,
                or None if the pubkey is unrelated.
                """
       -        def test_der_suffix_against_pubkey(der_suffix: Sequence[int], pubkey: bytes) -> bool:
       -            if len(der_suffix) != 2:
       -                return False
       -            if pubkey.hex() != self.derive_pubkey(*der_suffix):
       -                return False
       -            return True
       -
       -        if hasattr(self, 'get_root_fingerprint'):
       -            if pubkey not in txinout.bip32_paths:
       -                return None
       -            fp_found, path_found = txinout.bip32_paths[pubkey]
       -            der_suffix = None
       -            full_path = None
       -            # try fp against our root
       -            my_root_fingerprint_hex = self.get_root_fingerprint()
       -            my_der_prefix_str = self.get_derivation_prefix()
       -            ks_der_prefix = convert_bip32_path_to_list_of_uint32(my_der_prefix_str) if my_der_prefix_str else None
       -            if (my_root_fingerprint_hex is not None and ks_der_prefix is not None and
       -                    fp_found.hex() == my_root_fingerprint_hex):
       -                if path_found[:len(ks_der_prefix)] == ks_der_prefix:
       -                    der_suffix = path_found[len(ks_der_prefix):]
       -                    if not test_der_suffix_against_pubkey(der_suffix, pubkey):
       -                        der_suffix = None
       -            # try fp against our intermediate fingerprint
       -            if (der_suffix is None and hasattr(self, 'get_bip32_node_for_xpub') and
       -                    fp_found == self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()):
       -                der_suffix = path_found
       -                if not test_der_suffix_against_pubkey(der_suffix, pubkey):
       -                    der_suffix = None
       -            if der_suffix is None:
       -                return None
       -            if ks_der_prefix is not None:
       -                full_path = ks_der_prefix + list(der_suffix)
       -            return der_suffix if only_der_suffix else full_path
       -        return None
       +        pass
        
            def find_my_pubkey_in_txinout(
                    self, txinout: Union['PartialTxInput', 'PartialTxOutput'],
       t@@ -202,14 +176,17 @@ class Software_KeyStore(KeyStore):
                if keypairs:
                    tx.sign(keypairs)
        
       +    @abstractmethod
            def update_password(self, old_password, new_password):
       -        raise NotImplementedError()  # implemented by subclasses
       +        pass
        
       +    @abstractmethod
            def check_password(self, password):
       -        raise NotImplementedError()  # implemented by subclasses
       +        pass
        
       +    @abstractmethod
            def get_private_key(self, *args, **kwargs) -> Tuple[bytes, bool]:
       -        raise NotImplementedError()  # implemented by subclasses
       +        pass
        
        
        class Imported_KeyStore(Software_KeyStore):
       t@@ -224,9 +201,6 @@ class Imported_KeyStore(Software_KeyStore):
            def is_deterministic(self):
                return False
        
       -    def get_master_public_key(self):
       -        return None
       -
            def dump(self):
                return {
                    'type': self.type,
       t@@ -308,6 +282,10 @@ class Deterministic_KeyStore(Software_KeyStore):
            def is_watching_only(self):
                return not self.has_seed()
        
       +    @abstractmethod
       +    def format_seed(self, seed: str) -> str:
       +        pass
       +
            def add_seed(self, seed):
                if self.seed:
                    raise Exception("a seed exists")
       t@@ -325,7 +303,81 @@ class Deterministic_KeyStore(Software_KeyStore):
                    return ''
        
        
       -class Xpub:
       +class MasterPublicKeyMixin(ABC):
       +
       +    @abstractmethod
       +    def get_master_public_key(self) -> str:
       +        pass
       +
       +    @abstractmethod
       +    def get_derivation_prefix(self) -> Optional[str]:
       +        """Returns to bip32 path from some root node to self.xpub
       +        Note that the return value might be None; if it is unknown.
       +        """
       +        pass
       +
       +    @abstractmethod
       +    def get_root_fingerprint(self) -> Optional[str]:
       +        """Returns the bip32 fingerprint of the top level node.
       +        This top level node is the node at the beginning of the derivation prefix,
       +        i.e. applying the derivation prefix to it will result self.xpub
       +        Note that the return value might be None; if it is unknown.
       +        """
       +        pass
       +
       +    @abstractmethod
       +    def get_fp_and_derivation_to_be_used_in_partial_tx(self, der_suffix: Sequence[int], *,
       +                                                       only_der_suffix: bool = True) -> Tuple[bytes, Sequence[int]]:
       +        """Returns fingerprint and derivation path corresponding to a derivation suffix.
       +        The fingerprint is either the root fp or the intermediate fp, depending on what is available
       +        and 'only_der_suffix', and the derivation path is adjusted accordingly.
       +        """
       +        pass
       +
       +    @abstractmethod
       +    def derive_pubkey(self, for_change: int, n: int) -> str:
       +        pass
       +
       +    def get_pubkey_derivation(self, pubkey: bytes,
       +                              txinout: Union['PartialTxInput', 'PartialTxOutput'],
       +                              *, only_der_suffix=True) \
       +            -> Union[Sequence[int], str, None]:
       +        def test_der_suffix_against_pubkey(der_suffix: Sequence[int], pubkey: bytes) -> bool:
       +            if len(der_suffix) != 2:
       +                return False
       +            if pubkey.hex() != self.derive_pubkey(*der_suffix):
       +                return False
       +            return True
       +
       +        if pubkey not in txinout.bip32_paths:
       +            return None
       +        fp_found, path_found = txinout.bip32_paths[pubkey]
       +        der_suffix = None
       +        full_path = None
       +        # try fp against our root
       +        my_root_fingerprint_hex = self.get_root_fingerprint()
       +        my_der_prefix_str = self.get_derivation_prefix()
       +        ks_der_prefix = convert_bip32_path_to_list_of_uint32(my_der_prefix_str) if my_der_prefix_str else None
       +        if (my_root_fingerprint_hex is not None and ks_der_prefix is not None and
       +                fp_found.hex() == my_root_fingerprint_hex):
       +            if path_found[:len(ks_der_prefix)] == ks_der_prefix:
       +                der_suffix = path_found[len(ks_der_prefix):]
       +                if not test_der_suffix_against_pubkey(der_suffix, pubkey):
       +                    der_suffix = None
       +        # try fp against our intermediate fingerprint
       +        if (der_suffix is None and isinstance(self, Xpub) and
       +                fp_found == self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()):
       +            der_suffix = path_found
       +            if not test_der_suffix_against_pubkey(der_suffix, pubkey):
       +                der_suffix = None
       +        if der_suffix is None:
       +            return None
       +        if ks_der_prefix is not None:
       +            full_path = ks_der_prefix + list(der_suffix)
       +        return der_suffix if only_der_suffix else full_path
       +
       +
       +class Xpub(MasterPublicKeyMixin):
        
            def __init__(self, *, derivation_prefix: str = None, root_fingerprint: str = None):
                self.xpub = None
       t@@ -348,25 +400,13 @@ class Xpub:
                return self._xpub_bip32_node
        
            def get_derivation_prefix(self) -> Optional[str]:
       -        """Returns to bip32 path from some root node to self.xpub
       -        Note that the return value might be None; if it is unknown.
       -        """
                return self._derivation_prefix
        
            def get_root_fingerprint(self) -> Optional[str]:
       -        """Returns the bip32 fingerprint of the top level node.
       -        This top level node is the node at the beginning of the derivation prefix,
       -        i.e. applying the derivation prefix to it will result self.xpub
       -        Note that the return value might be None; if it is unknown.
       -        """
                return self._root_fingerprint
        
            def get_fp_and_derivation_to_be_used_in_partial_tx(self, der_suffix: Sequence[int], *,
                                                               only_der_suffix: bool = True) -> Tuple[bytes, Sequence[int]]:
       -        """Returns fingerprint and derivation path corresponding to a derivation suffix.
       -        The fingerprint is either the root fp or the intermediate fp, depending on what is available
       -        and 'only_der_suffix', and the derivation path is adjusted accordingly.
       -        """
                fingerprint_hex = self.get_root_fingerprint()
                der_prefix_str = self.get_derivation_prefix()
                if not only_der_suffix and fingerprint_hex is not None and der_prefix_str is not None:
       t@@ -437,7 +477,7 @@ class Xpub:
                return node.eckey.get_public_key_bytes(compressed=True)
        
        
       -class BIP32_KeyStore(Deterministic_KeyStore, Xpub):
       +class BIP32_KeyStore(Xpub, Deterministic_KeyStore):
        
            type = 'bip32'
        
       t@@ -512,7 +552,8 @@ class BIP32_KeyStore(Deterministic_KeyStore, Xpub):
                cK = ecc.ECPrivkey(k).get_public_key_bytes()
                return cK, k
        
       -class Old_KeyStore(Deterministic_KeyStore):
       +
       +class Old_KeyStore(MasterPublicKeyMixin, Deterministic_KeyStore):
        
            type = 'old'
        
       t@@ -585,7 +626,7 @@ class Old_KeyStore(Deterministic_KeyStore):
            def derive_pubkey(self, for_change, n) -> str:
                return self.get_pubkey_from_mpk(self.mpk, for_change, n)
        
       -    def get_private_key_from_stretched_exponent(self, for_change, n, secexp):
       +    def _get_private_key_from_stretched_exponent(self, for_change, n, secexp):
                secexp = (secexp + self.get_sequence(self.mpk, for_change, n)) % ecc.CURVE_ORDER
                pk = number_to_string(secexp, ecc.CURVE_ORDER)
                return pk
       t@@ -593,12 +634,12 @@ class Old_KeyStore(Deterministic_KeyStore):
            def get_private_key(self, sequence, password):
                seed = self.get_hex_seed(password)
                secexp = self.stretch_key(seed)
       -        self.check_seed(seed, secexp=secexp)
       +        self._check_seed(seed, secexp=secexp)
                for_change, n = sequence
       -        pk = self.get_private_key_from_stretched_exponent(for_change, n, secexp)
       +        pk = self._get_private_key_from_stretched_exponent(for_change, n, secexp)
                return pk, False
        
       -    def check_seed(self, seed, *, secexp=None):
       +    def _check_seed(self, seed, *, secexp=None):
                if secexp is None:
                    secexp = self.stretch_key(seed)
                master_private_key = ecc.ECPrivkey.from_secret_scalar(secexp)
       t@@ -608,7 +649,7 @@ class Old_KeyStore(Deterministic_KeyStore):
        
            def check_password(self, password):
                seed = self.get_hex_seed(password)
       -        self.check_seed(seed)
       +        self._check_seed(seed)
        
            def get_master_public_key(self):
                return self.mpk
       t@@ -623,7 +664,6 @@ class Old_KeyStore(Deterministic_KeyStore):
                    self._root_fingerprint = xfp.hex().lower()
                return self._root_fingerprint
        
       -    # TODO Old_KeyStore and Xpub could share a common baseclass?
            def get_fp_and_derivation_to_be_used_in_partial_tx(self, der_suffix: Sequence[int], *,
                                                               only_der_suffix: bool = True) -> Tuple[bytes, Sequence[int]]:
                fingerprint_hex = self.get_root_fingerprint()
       t@@ -643,7 +683,7 @@ class Old_KeyStore(Deterministic_KeyStore):
                self.pw_hash_version = PW_HASH_VERSION_LATEST
        
        
       -class Hardware_KeyStore(KeyStore, Xpub):
       +class Hardware_KeyStore(Xpub, KeyStore):
            hw_type: str
            device: str
            plugin: 'HW_PluginBase'
       t@@ -694,9 +734,6 @@ class Hardware_KeyStore(KeyStore, Xpub):
                called in any thread context.'''
                self.logger.info("paired")
        
       -    def can_export(self):
       -        return False
       -
            def is_watching_only(self):
                '''The wallet is not watching-only; the user will be prompted for
                pin and passphrase as appropriate when needed.'''
       t@@ -732,6 +769,9 @@ class Hardware_KeyStore(KeyStore, Xpub):
                    self.is_requesting_to_be_rewritten_to_wallet_file = True
        
        
       +KeyStoreWithMPK = Union[KeyStore, MasterPublicKeyMixin]  # intersection really...
       +
       +
        def bip39_normalize_passphrase(passphrase):
            return normalize('NFKD', passphrase or '')
        
   DIR diff --git a/electrum/plugins/coldcard/coldcard.py b/electrum/plugins/coldcard/coldcard.py
       t@@ -11,7 +11,7 @@ from electrum import bip32
        from electrum.bip32 import BIP32Node, InvalidMasterKeyVersionBytes
        from electrum.i18n import _
        from electrum.plugin import Device, hook
       -from electrum.keystore import Hardware_KeyStore
       +from electrum.keystore import Hardware_KeyStore, KeyStoreWithMPK
        from electrum.transaction import PartialTransaction
        from electrum.wallet import Standard_Wallet, Multisig_Wallet, Abstract_Wallet
        from electrum.util import bfh, bh2u, versiontuple, UserFacingException
       t@@ -21,9 +21,6 @@ from electrum.logging import get_logger
        from ..hw_wallet import HW_PluginBase, HardwareClientBase
        from ..hw_wallet.plugin import LibraryFoundButUnusable, only_hook_if_libraries_available
        
       -if TYPE_CHECKING:
       -    from electrum.keystore import Xpub
       -
        
        _logger = get_logger(__name__)
        
       t@@ -571,7 +568,7 @@ class ColdcardPlugin(HW_PluginBase):
        
                xpubs = []
                derivs = set()
       -        for xpub, ks in zip(wallet.get_master_public_keys(), wallet.get_keystores()):
       +        for xpub, ks in zip(wallet.get_master_public_keys(), wallet.get_keystores()):  # type: str, KeyStoreWithMPK
                    fp_bytes, der_full = ks.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix=[], only_der_suffix=False)
                    fp_hex = fp_bytes.hex().upper()
                    der_prefix_str = bip32.convert_bip32_intpath_to_strpath(der_full)
   DIR diff --git a/electrum/wallet.py b/electrum/wallet.py
       t@@ -55,7 +55,7 @@ from .bitcoin import (COIN, is_address, address_to_script,
                              is_minikey, relayfee, dust_threshold)
        from .crypto import sha256d
        from . import keystore
       -from .keystore import load_keystore, Hardware_KeyStore, KeyStore
       +from .keystore import load_keystore, Hardware_KeyStore, KeyStore, KeyStoreWithMPK
        from .util import multisig_type
        from .storage import StorageEncryptionVersion, WalletStorage
        from . import transaction, bitcoin, coinchooser, paymentrequest, ecc, bip32
       t@@ -454,7 +454,7 @@ class Abstract_Wallet(AddressSynchronizer):
            def get_public_keys(self, address):
                return [self.get_public_key(address)]
        
       -    def get_public_keys_with_deriv_info(self, address: str) -> Dict[str, Tuple[KeyStore, Sequence[int]]]:
       +    def get_public_keys_with_deriv_info(self, address: str) -> Dict[str, Tuple[KeyStoreWithMPK, Sequence[int]]]:
                """Returns a map: pubkey_hex -> (keystore, derivation_suffix)"""
                return {}