URI: 
       ttrezor.py - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
       ttrezor.py (20338B)
       ---
            1 import traceback
            2 import sys
            3 from typing import NamedTuple, Any, Optional, Dict, Union, List, Tuple, TYPE_CHECKING
            4 
            5 from electrum.util import bfh, bh2u, versiontuple, UserCancelled, UserFacingException
            6 from electrum.bip32 import BIP32Node, convert_bip32_path_to_list_of_uint32 as parse_path
            7 from electrum import constants
            8 from electrum.i18n import _
            9 from electrum.plugin import Device, runs_in_hwd_thread
           10 from electrum.transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput
           11 from electrum.keystore import Hardware_KeyStore
           12 from electrum.base_wizard import ScriptTypeNotSupported, HWD_SETUP_NEW_WALLET
           13 from electrum.logging import get_logger
           14 
           15 from ..hw_wallet import HW_PluginBase
           16 from ..hw_wallet.plugin import (is_any_tx_output_on_change_branch, trezor_validate_op_return_output_and_get_data,
           17                                 LibraryFoundButUnusable, OutdatedHwFirmwareException,
           18                                 get_xpubs_and_der_suffixes_from_txinout)
           19 
           20 _logger = get_logger(__name__)
           21 
           22 
# trezorlib is an optional dependency. Try to import it together with the
# message types used by this plugin and record the outcome in TREZORLIB.
try:
    import trezorlib
    import trezorlib.transport
    from trezorlib.transport.bridge import BridgeTransport, call_bridge

    from .clientbase import TrezorClientBase

    from trezorlib.messages import (
        Capability, BackupType, RecoveryDeviceType, HDNodeType, HDNodePathType,
        InputScriptType, OutputScriptType, MultisigRedeemScriptType,
        TxInputType, TxOutputType, TxOutputBinType, TransactionType, SignTx)

    from trezorlib.client import PASSPHRASE_ON_DEVICE

    TREZORLIB = True
except Exception as e:
    # A missing 'trezorlib' module is the expected "not installed" case and is
    # not logged; every other failure is logged with its traceback.
    if not (isinstance(e, ModuleNotFoundError) and e.name == 'trezorlib'):
        _logger.exception('error importing trezor plugin deps')
    TREZORLIB = False

    class _EnumMissing:
        """Stand-in for a trezorlib enum that could not be imported.

        Looking up any attribute name returns an int: each new name is given
        the next counter value (starting at 0) and the same name always maps
        to the same value afterwards.
        """

        def __init__(self):
            self.counter = 0
            self.values = {}

        def __getattr__(self, key):
            # Only reached for names not found as regular attributes.
            if key not in self.values:
                self.values[key] = self.counter
                self.counter += 1
            return self.values[key]

    # Placeholders so that module-level references such as
    # ``BackupType.Bip39`` still evaluate without trezorlib.
    Capability = _EnumMissing()
    BackupType = _EnumMissing()
    RecoveryDeviceType = _EnumMissing()

    # Unique sentinel standing in for trezorlib's constant of the same name.
    PASSPHRASE_ON_DEVICE = object()
           59 
           60 
# Trezor initialization methods offered when setting up a device:
# TIM_NEW (0) creates a new seed, TIM_RECOVER (1) restores an existing one.
TIM_NEW, TIM_RECOVER = range(2)

# Product key put on enumerated Device entries and used as the keystore's device name.
TREZOR_PRODUCT_KEY = 'Trezor'
           65 
           66 
           67 class TrezorKeyStore(Hardware_KeyStore):
           68     hw_type = 'trezor'
           69     device = TREZOR_PRODUCT_KEY
           70 
           71     plugin: 'TrezorPlugin'
           72 
           73     def get_client(self, force_pair=True):
           74         return self.plugin.get_client(self, force_pair)
           75 
           76     def decrypt_message(self, sequence, message, password):
           77         raise UserFacingException(_('Encryption and decryption are not implemented by {}').format(self.device))
           78 
           79     def sign_message(self, sequence, message, password):
           80         client = self.get_client()
           81         address_path = self.get_derivation_prefix() + "/%d/%d"%sequence
           82         msg_sig = client.sign_message(address_path, message)
           83         return msg_sig.signature
           84 
           85     def sign_transaction(self, tx, password):
           86         if tx.is_complete():
           87             return
           88         # previous transactions used as inputs
           89         prev_tx = {}
           90         for txin in tx.inputs():
           91             tx_hash = txin.prevout.txid.hex()
           92             if txin.utxo is None:
           93                 raise UserFacingException(_('Missing previous tx.'))
           94             prev_tx[tx_hash] = txin.utxo
           95 
           96         self.plugin.sign_transaction(self, tx, prev_tx)
           97 
           98 
class TrezorInitSettings(NamedTuple):
    """Options used when initializing (resetting or recovering) a device."""
    # Seed length in words; mapped to an entropy strength for new seeds and
    # passed through as-is when recovering.
    word_count: int
    # Label given to the device.
    label: str
    # Forwarded to the device as ``pin_protection``.
    pin_enabled: bool
    # Forwarded to the device as ``passphrase_protection``.
    passphrase_enabled: bool
    # Compared against RecoveryDeviceType members; only read when recovering.
    recovery_type: Any = None
    # Forwarded to ``reset_device`` when creating a new seed.
    backup_type: int = BackupType.Bip39
    # Forwarded to ``reset_device`` when creating a new seed.
    no_backup: bool = False
          107 
          108 
class TrezorPlugin(HW_PluginBase):
    """Hardware-wallet plugin that drives Trezor devices through trezorlib.

    It enumerates devices, creates clients for them, runs device
    initialization and converts Electrum transactions into trezorlib
    messages for signing.
    """
    # Derived classes provide:
    #
    #  class-static variables: client_class, firmware_URL, handler_class,
    #     libraries_available, libraries_URL, minimum_firmware,
    #     wallet_class, types

    # Shown to the user when the device firmware is outdated.
    firmware_URL = 'https://wallet.trezor.io'
    libraries_URL = 'https://pypi.org/project/trezor/'
    # NOTE(review): presumably the lowest firmware accepted by the client's
    # is_uptodate() check -- confirm in clientbase.
    minimum_firmware = (1, 5, 2)
    keystore_class = TrezorKeyStore
    # NOTE(review): presumably the supported trezorlib version range, checked
    # by the base class -- confirm whether the upper bound is exclusive.
    minimum_library = (0, 12, 0)
    maximum_library = (0, 13)
    # Script types accepted by get_xpub(); anything else is rejected.
    SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')
    DEVICE_IDS = (TREZOR_PRODUCT_KEY,)

    # NOTE(review): presumably the maximum device label length -- not used in
    # the code visible here.
    MAX_LABEL_LEN = 32
          126 
          127     def __init__(self, parent, config, name):
          128         super().__init__(parent, config, name)
          129 
          130         self.libraries_available = self.check_libraries_available()
          131         if not self.libraries_available:
          132             return
          133         self.device_manager().register_enumerate_func(self.enumerate)
          134         self._is_bridge_available = None
          135 
          136     def get_library_version(self):
          137         import trezorlib
          138         try:
          139             version = trezorlib.__version__
          140         except Exception:
          141             version = 'unknown'
          142         if TREZORLIB:
          143             return version
          144         else:
          145             raise LibraryFoundButUnusable(library_version=version)
          146 
          147     @runs_in_hwd_thread
          148     def is_bridge_available(self) -> bool:
          149         # Testing whether the Bridge is available can take several seconds
          150         # (when it is not), as it is slow to timeout, hence we cache it.
          151         if self._is_bridge_available is None:
          152             try:
          153                 call_bridge("enumerate")
          154             except Exception:
          155                 self._is_bridge_available = False
          156                 # never again try with Bridge due to slow timeout
          157                 BridgeTransport.ENABLED = False
          158             else:
          159                 self._is_bridge_available = True
          160         return self._is_bridge_available
          161 
          162     @runs_in_hwd_thread
          163     def enumerate(self):
          164         # If there is a bridge, prefer that.
          165         # On Windows, the bridge runs as Admin (and Electrum usually does not),
          166         # so the bridge has better chances of finding devices. see #5420
          167         # This also avoids duplicate entries.
          168         if self.is_bridge_available():
          169             devices = BridgeTransport.enumerate()
          170         else:
          171             devices = trezorlib.transport.enumerate_devices()
          172         return [Device(path=d.get_path(),
          173                        interface_number=-1,
          174                        id_=d.get_path(),
          175                        product_key=TREZOR_PRODUCT_KEY,
          176                        usage_page=0,
          177                        transport_ui_string=d.get_path())
          178                 for d in devices]
          179 
          180     @runs_in_hwd_thread
          181     def create_client(self, device, handler):
          182         try:
          183             self.logger.info(f"connecting to device at {device.path}")
          184             transport = trezorlib.transport.get_transport(device.path)
          185         except BaseException as e:
          186             self.logger.info(f"cannot connect at {device.path} {e}")
          187             return None
          188 
          189         if not transport:
          190             self.logger.info(f"cannot connect at {device.path}")
          191             return
          192 
          193         self.logger.info(f"connected to device at {device.path}")
          194         # note that this call can still raise!
          195         return TrezorClientBase(transport, handler, self)
          196 
          197     @runs_in_hwd_thread
          198     def get_client(self, keystore, force_pair=True, *,
          199                    devices=None, allow_user_interaction=True) -> Optional['TrezorClientBase']:
          200         client = super().get_client(keystore, force_pair,
          201                                     devices=devices,
          202                                     allow_user_interaction=allow_user_interaction)
          203         # returns the client for a given keystore. can use xpub
          204         if client:
          205             client.used()
          206         return client
          207 
          208     def get_coin_name(self):
          209         return "Testnet" if constants.net.TESTNET else "Bitcoin"
          210 
          211     def initialize_device(self, device_id, wizard, handler):
          212         # Initialization method
          213         msg = _("Choose how you want to initialize your {}.").format(self.device, self.device)
          214         choices = [
          215             # Must be short as QT doesn't word-wrap radio button text
          216             (TIM_NEW, _("Let the device generate a completely new seed randomly")),
          217             (TIM_RECOVER, _("Recover from a seed you have previously written down")),
          218         ]
          219         def f(method):
          220             import threading
          221             settings = self.request_trezor_init_settings(wizard, method, device_id)
          222             t = threading.Thread(target=self._initialize_device_safe, args=(settings, method, device_id, wizard, handler))
          223             t.setDaemon(True)
          224             t.start()
          225             exit_code = wizard.loop.exec_()
          226             if exit_code != 0:
          227                 # this method (initialize_device) was called with the expectation
          228                 # of leaving the device in an initialized state when finishing.
          229                 # signal that this is not the case:
          230                 raise UserCancelled()
          231         wizard.choice_dialog(title=_('Initialize Device'), message=msg, choices=choices, run_next=f)
          232 
          233     def _initialize_device_safe(self, settings, method, device_id, wizard, handler):
          234         exit_code = 0
          235         try:
          236             self._initialize_device(settings, method, device_id, wizard, handler)
          237         except UserCancelled:
          238             exit_code = 1
          239         except BaseException as e:
          240             self.logger.exception('')
          241             handler.show_error(repr(e))
          242             exit_code = 1
          243         finally:
          244             wizard.loop.exit(exit_code)
          245 
          246     @runs_in_hwd_thread
          247     def _initialize_device(self, settings: TrezorInitSettings, method, device_id, wizard, handler):
          248         if method == TIM_RECOVER and settings.recovery_type == RecoveryDeviceType.ScrambledWords:
          249             handler.show_error(_(
          250                 "You will be asked to enter 24 words regardless of your "
          251                 "seed's actual length.  If you enter a word incorrectly or "
          252                 "misspell it, you cannot change it or go back - you will need "
          253                 "to start again from the beginning.\n\nSo please enter "
          254                 "the words carefully!"),
          255                 blocking=True)
          256 
          257         devmgr = self.device_manager()
          258         client = devmgr.client_by_id(device_id)
          259         if not client:
          260             raise Exception(_("The device was disconnected."))
          261 
          262         if method == TIM_NEW:
          263             strength_from_word_count = {12: 128, 18: 192, 20: 128, 24: 256, 33: 256}
          264             client.reset_device(
          265                 strength=strength_from_word_count[settings.word_count],
          266                 passphrase_protection=settings.passphrase_enabled,
          267                 pin_protection=settings.pin_enabled,
          268                 label=settings.label,
          269                 backup_type=settings.backup_type,
          270                 no_backup=settings.no_backup)
          271         elif method == TIM_RECOVER:
          272             client.recover_device(
          273                 recovery_type=settings.recovery_type,
          274                 word_count=settings.word_count,
          275                 passphrase_protection=settings.passphrase_enabled,
          276                 pin_protection=settings.pin_enabled,
          277                 label=settings.label)
          278             if settings.recovery_type == RecoveryDeviceType.Matrix:
          279                 handler.close_matrix_dialog()
          280         else:
          281             raise RuntimeError("Unsupported recovery method")
          282 
          283     def _make_node_path(self, xpub, address_n):
          284         bip32node = BIP32Node.from_xkey(xpub)
          285         node = HDNodeType(
          286             depth=bip32node.depth,
          287             fingerprint=int.from_bytes(bip32node.fingerprint, 'big'),
          288             child_num=int.from_bytes(bip32node.child_number, 'big'),
          289             chain_code=bip32node.chaincode,
          290             public_key=bip32node.eckey.get_public_key_bytes(compressed=True),
          291         )
          292         return HDNodePathType(node=node, address_n=address_n)
          293 
          294     def setup_device(self, device_info, wizard, purpose):
          295         device_id = device_info.device.id_
          296         client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
          297 
          298         if not client.is_uptodate():
          299             msg = (_('Outdated {} firmware for device labelled {}. Please '
          300                      'download the updated firmware from {}')
          301                    .format(self.device, client.label(), self.firmware_URL))
          302             raise OutdatedHwFirmwareException(msg)
          303 
          304         if not device_info.initialized:
          305             self.initialize_device(device_id, wizard, client.handler)
          306         is_creating_wallet = purpose == HWD_SETUP_NEW_WALLET
          307         wizard.run_task_without_blocking_gui(
          308             task=lambda: client.get_xpub('m', 'standard', creating=is_creating_wallet))
          309         client.used()
          310         return client
          311 
          312     def get_xpub(self, device_id, derivation, xtype, wizard):
          313         if xtype not in self.SUPPORTED_XTYPES:
          314             raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device))
          315         client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
          316         xpub = client.get_xpub(derivation, xtype)
          317         client.used()
          318         return xpub
          319 
          320     def get_trezor_input_script_type(self, electrum_txin_type: str):
          321         if electrum_txin_type in ('p2wpkh', 'p2wsh'):
          322             return InputScriptType.SPENDWITNESS
          323         if electrum_txin_type in ('p2wpkh-p2sh', 'p2wsh-p2sh'):
          324             return InputScriptType.SPENDP2SHWITNESS
          325         if electrum_txin_type in ('p2pkh', ):
          326             return InputScriptType.SPENDADDRESS
          327         if electrum_txin_type in ('p2sh', ):
          328             return InputScriptType.SPENDMULTISIG
          329         raise ValueError('unexpected txin type: {}'.format(electrum_txin_type))
          330 
          331     def get_trezor_output_script_type(self, electrum_txin_type: str):
          332         if electrum_txin_type in ('p2wpkh', 'p2wsh'):
          333             return OutputScriptType.PAYTOWITNESS
          334         if electrum_txin_type in ('p2wpkh-p2sh', 'p2wsh-p2sh'):
          335             return OutputScriptType.PAYTOP2SHWITNESS
          336         if electrum_txin_type in ('p2pkh', ):
          337             return OutputScriptType.PAYTOADDRESS
          338         if electrum_txin_type in ('p2sh', ):
          339             return OutputScriptType.PAYTOMULTISIG
          340         raise ValueError('unexpected txin type: {}'.format(electrum_txin_type))
          341 
          342     @runs_in_hwd_thread
          343     def sign_transaction(self, keystore, tx: PartialTransaction, prev_tx):
          344         prev_tx = { bfh(txhash): self.electrum_tx_to_txtype(tx) for txhash, tx in prev_tx.items() }
          345         client = self.get_client(keystore)
          346         inputs = self.tx_inputs(tx, for_sig=True, keystore=keystore)
          347         outputs = self.tx_outputs(tx, keystore=keystore)
          348         details = SignTx(lock_time=tx.locktime, version=tx.version)
          349         signatures, _ = client.sign_tx(self.get_coin_name(), inputs, outputs, details=details, prev_txes=prev_tx)
          350         signatures = [(bh2u(x) + '01') for x in signatures]
          351         tx.update_signatures(signatures)
          352 
          353     @runs_in_hwd_thread
          354     def show_address(self, wallet, address, keystore=None):
          355         if keystore is None:
          356             keystore = wallet.get_keystore()
          357         if not self.show_address_helper(wallet, address, keystore):
          358             return
          359         deriv_suffix = wallet.get_address_index(address)
          360         derivation = keystore.get_derivation_prefix()
          361         address_path = "%s/%d/%d"%(derivation, *deriv_suffix)
          362         script_type = self.get_trezor_input_script_type(wallet.txin_type)
          363 
          364         # prepare multisig, if available:
          365         xpubs = wallet.get_master_public_keys()
          366         if len(xpubs) > 1:
          367             pubkeys = wallet.get_public_keys(address)
          368             # sort xpubs using the order of pubkeys
          369             sorted_pairs = sorted(zip(pubkeys, xpubs))
          370             multisig = self._make_multisig(
          371                 wallet.m,
          372                 [(xpub, deriv_suffix) for pubkey, xpub in sorted_pairs])
          373         else:
          374             multisig = None
          375 
          376         client = self.get_client(keystore)
          377         client.show_address(address_path, script_type, multisig)
          378 
    def tx_inputs(self, tx: Transaction, *, for_sig=False, keystore: 'TrezorKeyStore' = None):
        """Convert the inputs of ``tx`` into a list of trezorlib TxInputType.

        With ``for_sig=True`` each non-coinbase input additionally gets its
        script type, a multisig description (when it has more than one
        pubkey) and, if ``keystore`` finds its own key in the input, the full
        derivation path; ``tx`` must then be a PartialTransaction and
        ``keystore`` must be given. Without ``for_sig`` only the outpoint,
        amount, script_sig and sequence are filled in.
        """
        inputs = []
        for txin in tx.inputs():
            # Default message; replaced below when building inputs for signing.
            txinputtype = TxInputType()
            if txin.is_coinbase_input():
                # Coinbase inputs use the all-zero hash and maximum index.
                prev_hash = b"\x00"*32
                prev_index = 0xffffffff  # signed int -1
            else:
                if for_sig:
                    assert isinstance(tx, PartialTransaction)
                    assert isinstance(txin, PartialTxInput)
                    assert keystore
                    if len(txin.pubkeys) > 1:
                        xpubs_and_deriv_suffixes = get_xpubs_and_der_suffixes_from_txinout(tx, txin)
                        multisig = self._make_multisig(txin.num_sig, xpubs_and_deriv_suffixes)
                    else:
                        multisig = None
                    script_type = self.get_trezor_input_script_type(txin.script_type)
                    txinputtype = TxInputType(
                        script_type=script_type,
                        multisig=multisig)
                    # address_n is only set when the keystore has a key in this input.
                    my_pubkey, full_path = keystore.find_my_pubkey_in_txinout(txin)
                    if full_path:
                        txinputtype.address_n = full_path

                prev_hash = txin.prevout.txid
                prev_index = txin.prevout.out_idx

            # The amount is left unset when the input's value is unknown.
            if txin.value_sats() is not None:
                txinputtype.amount = txin.value_sats()
            txinputtype.prev_hash = prev_hash
            txinputtype.prev_index = prev_index

            if txin.script_sig is not None:
                txinputtype.script_sig = txin.script_sig

            txinputtype.sequence = txin.nsequence

            inputs.append(txinputtype)

        return inputs
          420 
          421     def _make_multisig(self, m, xpubs):
          422         if len(xpubs) == 1:
          423             return None
          424         pubkeys = [self._make_node_path(xpub, deriv) for xpub, deriv in xpubs]
          425         return MultisigRedeemScriptType(
          426             pubkeys=pubkeys,
          427             signatures=[b''] * len(pubkeys),
          428             m=m)
          429 
    def tx_outputs(self, tx: PartialTransaction, *, keystore: 'TrezorKeyStore'):
        """Convert the outputs of ``tx`` into a list of trezorlib TxOutputType.

        At most one of the wallet's own outputs is described by derivation
        path; every other output is described by address, or as OP_RETURN
        data when it has no address.
        """

        # Both helpers read the loop variables ``txout`` / ``address`` of the
        # ``for`` loop below, so they must only be called from inside it.
        def create_output_by_derivation():
            script_type = self.get_trezor_output_script_type(txout.script_type)
            if len(txout.pubkeys) > 1:
                xpubs_and_deriv_suffixes = get_xpubs_and_der_suffixes_from_txinout(tx, txout)
                multisig = self._make_multisig(txout.num_sig, xpubs_and_deriv_suffixes)
            else:
                multisig = None
            my_pubkey, full_path = keystore.find_my_pubkey_in_txinout(txout)
            assert full_path
            txoutputtype = TxOutputType(
                multisig=multisig,
                amount=txout.value,
                address_n=full_path,
                script_type=script_type)
            return txoutputtype

        def create_output_by_address():
            txoutputtype = TxOutputType()
            txoutputtype.amount = txout.value
            if address:
                txoutputtype.script_type = OutputScriptType.PAYTOADDRESS
                txoutputtype.address = address
            else:
                # No address: treat the output as OP_RETURN and pass its data.
                txoutputtype.script_type = OutputScriptType.PAYTOOPRETURN
                txoutputtype.op_return_data = trezor_validate_op_return_output_and_get_data(txout)
            return txoutputtype

        outputs = []
        # Set once an own output has been passed by derivation path.
        has_change = False
        any_output_on_change_branch = is_any_tx_output_on_change_branch(tx)

        for txout in tx.outputs():
            address = txout.address
            use_create_by_derivation = False

            if txout.is_mine and not has_change:
                # prioritise hiding outputs on the 'change' branch from user
                # because no more than one change address allowed
                # note: ^ restriction can be removed once we require fw
                # that has https://github.com/trezor/trezor-mcu/pull/306
                # If any output is on the change branch, only such an output
                # qualifies; otherwise the first own output does.
                if txout.is_change == any_output_on_change_branch:
                    use_create_by_derivation = True
                    has_change = True

            if use_create_by_derivation:
                txoutputtype = create_output_by_derivation()
            else:
                txoutputtype = create_output_by_address()
            outputs.append(txoutputtype)

        return outputs
          483 
          484     def electrum_tx_to_txtype(self, tx: Optional[Transaction]):
          485         t = TransactionType()
          486         if tx is None:
          487             # probably for segwit input and we don't need this prev txn
          488             return t
          489         tx.deserialize()
          490         t.version = tx.version
          491         t.lock_time = tx.locktime
          492         t.inputs = self.tx_inputs(tx)
          493         t.bin_outputs = [
          494             TxOutputBinType(amount=o.value, script_pubkey=o.scriptpubkey)
          495             for o in tx.outputs()
          496         ]
          497         return t