URI: 
       tledger.py - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
       tledger.py (32370B)
       ---
            1 from struct import pack, unpack
            2 import hashlib
            3 import sys
            4 import traceback
            5 from typing import Optional, Tuple
            6 
            7 from electrum import ecc
            8 from electrum import bip32
            9 from electrum.crypto import hash_160
           10 from electrum.bitcoin import int_to_hex, var_int, is_segwit_script_type, is_b58_address
           11 from electrum.bip32 import BIP32Node, convert_bip32_intpath_to_strpath
           12 from electrum.i18n import _
           13 from electrum.keystore import Hardware_KeyStore
           14 from electrum.transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput
           15 from electrum.wallet import Standard_Wallet
           16 from electrum.util import bfh, bh2u, versiontuple, UserFacingException
           17 from electrum.base_wizard import ScriptTypeNotSupported
           18 from electrum.logging import get_logger
           19 from electrum.plugin import runs_in_hwd_thread, Device
           20 
           21 from ..hw_wallet import HW_PluginBase, HardwareClientBase
           22 from ..hw_wallet.plugin import is_any_tx_output_on_change_branch, validate_op_return_output, LibraryFoundButUnusable
           23 
           24 
           25 _logger = get_logger(__name__)
           26 
           27 
           28 try:
           29     import hid
           30     from btchip.btchipComm import HIDDongleHIDAPI, DongleWait
           31     from btchip.btchip import btchip
           32     from btchip.btchipUtils import compress_public_key,format_transaction, get_regular_input_script, get_p2sh_input_script
           33     from btchip.bitcoinTransaction import bitcoinTransaction
           34     from btchip.btchipFirmwareWizard import checkFirmware, updateFirmware
           35     from btchip.btchipException import BTChipException
           36     BTCHIP = True
           37     BTCHIP_DEBUG = False
           38 except ImportError as e:
           39     if not (isinstance(e, ModuleNotFoundError) and e.name == 'btchip'):
           40         _logger.exception('error importing ledger plugin deps')
           41     BTCHIP = False
           42 
           43 MSG_NEEDS_FW_UPDATE_GENERIC = _('Firmware version too old. Please update at') + \
           44                       ' https://www.ledgerwallet.com'
           45 MSG_NEEDS_FW_UPDATE_SEGWIT = _('Firmware version (or "Bitcoin" app) too old for Segwit support. Please update at') + \
           46                       ' https://www.ledgerwallet.com'
           47 MULTI_OUTPUT_SUPPORT = '1.1.4'
           48 SEGWIT_SUPPORT = '1.1.10'
           49 SEGWIT_SUPPORT_SPECIAL = '1.0.4'
           50 SEGWIT_TRUSTEDINPUTS = '1.4.0'
           51 
           52 
           53 def test_pin_unlocked(func):
           54     """Function decorator to test the Ledger for being unlocked, and if not,
           55     raise a human-readable exception.
           56     """
           57     def catch_exception(self, *args, **kwargs):
           58         try:
           59             return func(self, *args, **kwargs)
           60         except BTChipException as e:
           61             if e.sw == 0x6982:
           62                 raise UserFacingException(_('Your Ledger is locked. Please unlock it.'))
           63             else:
           64                 raise
           65     return catch_exception
           66 
           67 
           68 class Ledger_Client(HardwareClientBase):
           69     def __init__(self, hidDevice, *, product_key: Tuple[int, int],
           70                  plugin: HW_PluginBase):
           71         HardwareClientBase.__init__(self, plugin=plugin)
           72         self.dongleObject = btchip(hidDevice)
           73         self.preflightDone = False
           74         self._product_key = product_key
           75         self._soft_device_id = None
           76 
           77     def is_pairable(self):
           78         return True
           79 
           80     @runs_in_hwd_thread
           81     def close(self):
           82         self.dongleObject.dongle.close()
           83 
           84     def is_initialized(self):
           85         return True
           86 
           87     @runs_in_hwd_thread
           88     def get_soft_device_id(self):
           89         if self._soft_device_id is None:
           90             # modern ledger can provide xpub without user interaction
           91             # (hw1 would prompt for PIN)
           92             if not self.is_hw1():
           93                 self._soft_device_id = self.request_root_fingerprint_from_device()
           94         return self._soft_device_id
           95 
           96     def is_hw1(self) -> bool:
           97         return self._product_key[0] == 0x2581
           98 
           99     def device_model_name(self):
          100         return LedgerPlugin.device_name_from_product_key(self._product_key)
          101 
          102     @runs_in_hwd_thread
          103     def has_usable_connection_with_device(self):
          104         try:
          105             self.dongleObject.getFirmwareVersion()
          106         except BaseException:
          107             return False
          108         return True
          109 
          110     @runs_in_hwd_thread
          111     @test_pin_unlocked
          112     def get_xpub(self, bip32_path, xtype):
          113         self.checkDevice()
          114         # bip32_path is of the form 44'/0'/1'
          115         # S-L-O-W - we don't handle the fingerprint directly, so compute
          116         # it manually from the previous node
          117         # This only happens once so it's bearable
          118         #self.get_client() # prompt for the PIN before displaying the dialog if necessary
          119         #self.handler.show_message("Computing master public key")
          120         if xtype in ['p2wpkh', 'p2wsh'] and not self.supports_native_segwit():
          121             raise UserFacingException(MSG_NEEDS_FW_UPDATE_SEGWIT)
          122         if xtype in ['p2wpkh-p2sh', 'p2wsh-p2sh'] and not self.supports_segwit():
          123             raise UserFacingException(MSG_NEEDS_FW_UPDATE_SEGWIT)
          124         bip32_path = bip32.normalize_bip32_derivation(bip32_path)
          125         bip32_intpath = bip32.convert_bip32_path_to_list_of_uint32(bip32_path)
          126         bip32_path = bip32_path[2:]  # cut off "m/"
          127         if len(bip32_intpath) >= 1:
          128             prevPath = bip32.convert_bip32_intpath_to_strpath(bip32_intpath[:-1])[2:]
          129             nodeData = self.dongleObject.getWalletPublicKey(prevPath)
          130             publicKey = compress_public_key(nodeData['publicKey'])
          131             fingerprint_bytes = hash_160(publicKey)[0:4]
          132             childnum_bytes = bip32_intpath[-1].to_bytes(length=4, byteorder="big")
          133         else:
          134             fingerprint_bytes = bytes(4)
          135             childnum_bytes = bytes(4)
          136         nodeData = self.dongleObject.getWalletPublicKey(bip32_path)
          137         publicKey = compress_public_key(nodeData['publicKey'])
          138         depth = len(bip32_intpath)
          139         return BIP32Node(xtype=xtype,
          140                          eckey=ecc.ECPubkey(bytes(publicKey)),
          141                          chaincode=nodeData['chainCode'],
          142                          depth=depth,
          143                          fingerprint=fingerprint_bytes,
          144                          child_number=childnum_bytes).to_xpub()
          145 
          146     def has_detached_pin_support(self, client):
          147         try:
          148             client.getVerifyPinRemainingAttempts()
          149             return True
          150         except BTChipException as e:
          151             if e.sw == 0x6d00:
          152                 return False
          153             raise e
          154 
          155     def is_pin_validated(self, client):
          156         try:
          157             # Invalid SET OPERATION MODE to verify the PIN status
          158             client.dongle.exchange(bytearray([0xe0, 0x26, 0x00, 0x00, 0x01, 0xAB]))
          159         except BTChipException as e:
          160             if (e.sw == 0x6982):
          161                 return False
          162             if (e.sw == 0x6A80):
          163                 return True
          164             raise e
          165 
          166     def supports_multi_output(self):
          167         return self.multiOutputSupported
          168 
          169     def supports_segwit(self):
          170         return self.segwitSupported
          171 
          172     def supports_native_segwit(self):
          173         return self.nativeSegwitSupported
          174 
          175     def supports_segwit_trustedInputs(self):
          176         return self.segwitTrustedInputs
          177 
          178     @runs_in_hwd_thread
          179     def perform_hw1_preflight(self):
          180         try:
          181             firmwareInfo = self.dongleObject.getFirmwareVersion()
          182             firmware = firmwareInfo['version']
          183             self.multiOutputSupported = versiontuple(firmware) >= versiontuple(MULTI_OUTPUT_SUPPORT)
          184             self.nativeSegwitSupported = versiontuple(firmware) >= versiontuple(SEGWIT_SUPPORT)
          185             self.segwitSupported = self.nativeSegwitSupported or (firmwareInfo['specialVersion'] == 0x20 and versiontuple(firmware) >= versiontuple(SEGWIT_SUPPORT_SPECIAL))
          186             self.segwitTrustedInputs = versiontuple(firmware) >= versiontuple(SEGWIT_TRUSTEDINPUTS)
          187 
          188             if not checkFirmware(firmwareInfo):
          189                 self.close()
          190                 raise UserFacingException(MSG_NEEDS_FW_UPDATE_GENERIC)
          191             try:
          192                 self.dongleObject.getOperationMode()
          193             except BTChipException as e:
          194                 if (e.sw == 0x6985):
          195                     self.close()
          196                     self.handler.get_setup( )
          197                     # Acquire the new client on the next run
          198                 else:
          199                     raise e
          200             if self.has_detached_pin_support(self.dongleObject) and not self.is_pin_validated(self.dongleObject):
          201                 assert self.handler, "no handler for client"
          202                 remaining_attempts = self.dongleObject.getVerifyPinRemainingAttempts()
          203                 if remaining_attempts != 1:
          204                     msg = "Enter your Ledger PIN - remaining attempts : " + str(remaining_attempts)
          205                 else:
          206                     msg = "Enter your Ledger PIN - WARNING : LAST ATTEMPT. If the PIN is not correct, the dongle will be wiped."
          207                 confirmed, p, pin = self.password_dialog(msg)
          208                 if not confirmed:
          209                     raise UserFacingException('Aborted by user - please unplug the dongle and plug it again before retrying')
          210                 pin = pin.encode()
          211                 self.dongleObject.verifyPin(pin)
          212         except BTChipException as e:
          213             if (e.sw == 0x6faa):
          214                 raise UserFacingException("Dongle is temporarily locked - please unplug it and replug it again")
          215             if ((e.sw & 0xFFF0) == 0x63c0):
          216                 raise UserFacingException("Invalid PIN - please unplug the dongle and plug it again before retrying")
          217             if e.sw == 0x6f00 and e.message == 'Invalid channel':
          218                 # based on docs 0x6f00 might be a more general error, hence we also compare message to be sure
          219                 raise UserFacingException("Invalid channel.\n"
          220                                           "Please make sure that 'Browser support' is disabled on your device.")
          221             raise e
          222 
          223     @runs_in_hwd_thread
          224     def checkDevice(self):
          225         if not self.preflightDone:
          226             try:
          227                 self.perform_hw1_preflight()
          228             except BTChipException as e:
          229                 if (e.sw == 0x6d00 or e.sw == 0x6700):
          230                     raise UserFacingException(_("Device not in Bitcoin mode")) from e
          231                 raise e
          232             self.preflightDone = True
          233 
          234     def password_dialog(self, msg=None):
          235         response = self.handler.get_word(msg)
          236         if response is None:
          237             return False, None, None
          238         return True, response, response
          239 
          240 
          241 class Ledger_KeyStore(Hardware_KeyStore):
          242     hw_type = 'ledger'
          243     device = 'Ledger'
          244 
          245     plugin: 'LedgerPlugin'
          246 
          247     def __init__(self, d):
          248         Hardware_KeyStore.__init__(self, d)
          249         # Errors and other user interaction is done through the wallet's
          250         # handler.  The handler is per-window and preserved across
          251         # device reconnects
          252         self.force_watching_only = False
          253         self.signing = False
          254         self.cfg = d.get('cfg', {'mode': 0})
          255 
          256     def dump(self):
          257         obj = Hardware_KeyStore.dump(self)
          258         obj['cfg'] = self.cfg
          259         return obj
          260 
          261     def get_client(self):
          262         return self.plugin.get_client(self).dongleObject
          263 
          264     def get_client_electrum(self) -> Optional[Ledger_Client]:
          265         return self.plugin.get_client(self)
          266 
          267     def give_error(self, message, clear_client = False):
          268         _logger.info(message)
          269         if not self.signing:
          270             self.handler.show_error(message)
          271         else:
          272             self.signing = False
          273         if clear_client:
          274             self.client = None
          275         raise UserFacingException(message)
          276 
          277     def set_and_unset_signing(func):
          278         """Function decorator to set and unset self.signing."""
          279         def wrapper(self, *args, **kwargs):
          280             try:
          281                 self.signing = True
          282                 return func(self, *args, **kwargs)
          283             finally:
          284                 self.signing = False
          285         return wrapper
          286 
          287     def decrypt_message(self, pubkey, message, password):
          288         raise UserFacingException(_('Encryption and decryption are currently not supported for {}').format(self.device))
          289 
          290     @runs_in_hwd_thread
          291     @test_pin_unlocked
          292     @set_and_unset_signing
          293     def sign_message(self, sequence, message, password):
          294         message = message.encode('utf8')
          295         message_hash = hashlib.sha256(message).hexdigest().upper()
          296         # prompt for the PIN before displaying the dialog if necessary
          297         client_ledger = self.get_client()
          298         client_electrum = self.get_client_electrum()
          299         address_path = self.get_derivation_prefix()[2:] + "/%d/%d"%sequence
          300         self.handler.show_message("Signing message ...\r\nMessage hash: "+message_hash)
          301         try:
          302             info = client_ledger.signMessagePrepare(address_path, message)
          303             pin = ""
          304             if info['confirmationNeeded']:
          305                 # do the authenticate dialog and get pin:
          306                 pin = self.handler.get_auth(info, client=client_electrum)
          307                 if not pin:
          308                     raise UserWarning(_('Cancelled by user'))
          309                 pin = str(pin).encode()
          310             signature = client_ledger.signMessageSign(pin)
          311         except BTChipException as e:
          312             if e.sw == 0x6a80:
          313                 self.give_error("Unfortunately, this message cannot be signed by the Ledger wallet. Only alphanumerical messages shorter than 140 characters are supported. Please remove any extra characters (tab, carriage return) and retry.")
          314             elif e.sw == 0x6985:  # cancelled by user
          315                 return b''
          316             elif e.sw == 0x6982:
          317                 raise  # pin lock. decorator will catch it
          318             else:
          319                 self.give_error(e, True)
          320         except UserWarning:
          321             self.handler.show_error(_('Cancelled by user'))
          322             return b''
          323         except Exception as e:
          324             self.give_error(e, True)
          325         finally:
          326             self.handler.finished()
          327         # Parse the ASN.1 signature
          328         rLength = signature[3]
          329         r = signature[4 : 4 + rLength]
          330         sLength = signature[4 + rLength + 1]
          331         s = signature[4 + rLength + 2:]
          332         if rLength == 33:
          333             r = r[1:]
          334         if sLength == 33:
          335             s = s[1:]
          336         # And convert it
          337 
          338         # Pad r and s points with 0x00 bytes when the point is small to get valid signature.
          339         r_padded = bytes([0x00]) * (32 - len(r)) + r
          340         s_padded = bytes([0x00]) * (32 - len(s)) + s
          341         
          342         return bytes([27 + 4 + (signature[0] & 0x01)]) + r_padded + s_padded
          343 
          344     @runs_in_hwd_thread
          345     @test_pin_unlocked
          346     @set_and_unset_signing
          347     def sign_transaction(self, tx, password):
          348         if tx.is_complete():
          349             return
          350         inputs = []
          351         inputsPaths = []
          352         chipInputs = []
          353         redeemScripts = []
          354         changePath = ""
          355         output = None
          356         p2shTransaction = False
          357         segwitTransaction = False
          358         pin = ""
          359         client_ledger = self.get_client() # prompt for the PIN before displaying the dialog if necessary
          360         client_electrum = self.get_client_electrum()
          361         assert client_electrum
          362 
          363         # Fetch inputs of the transaction to sign
          364         for txin in tx.inputs():
          365             if txin.is_coinbase_input():
          366                 self.give_error("Coinbase not supported")     # should never happen
          367 
          368             if txin.script_type in ['p2sh']:
          369                 p2shTransaction = True
          370 
          371             if txin.script_type in ['p2wpkh-p2sh', 'p2wsh-p2sh']:
          372                 if not client_electrum.supports_segwit():
          373                     self.give_error(MSG_NEEDS_FW_UPDATE_SEGWIT)
          374                 segwitTransaction = True
          375 
          376             if txin.script_type in ['p2wpkh', 'p2wsh']:
          377                 if not client_electrum.supports_native_segwit():
          378                     self.give_error(MSG_NEEDS_FW_UPDATE_SEGWIT)
          379                 segwitTransaction = True
          380 
          381             my_pubkey, full_path = self.find_my_pubkey_in_txinout(txin)
          382             if not full_path:
          383                 self.give_error("No matching pubkey for sign_transaction")  # should never happen
          384             full_path = convert_bip32_intpath_to_strpath(full_path)[2:]
          385 
          386             redeemScript = Transaction.get_preimage_script(txin)
          387             txin_prev_tx = txin.utxo
          388             if txin_prev_tx is None and not txin.is_segwit():
          389                 raise UserFacingException(_('Missing previous tx for legacy input.'))
          390             txin_prev_tx_raw = txin_prev_tx.serialize() if txin_prev_tx else None
          391             inputs.append([txin_prev_tx_raw,
          392                            txin.prevout.out_idx,
          393                            redeemScript,
          394                            txin.prevout.txid.hex(),
          395                            my_pubkey,
          396                            txin.nsequence,
          397                            txin.value_sats()])
          398             inputsPaths.append(full_path)
          399 
          400         # Sanity check
          401         if p2shTransaction:
          402             for txin in tx.inputs():
          403                 if txin.script_type != 'p2sh':
          404                     self.give_error("P2SH / regular input mixed in same transaction not supported") # should never happen
          405 
          406         txOutput = var_int(len(tx.outputs()))
          407         for o in tx.outputs():
          408             txOutput += int_to_hex(o.value, 8)
          409             script = o.scriptpubkey.hex()
          410             txOutput += var_int(len(script)//2)
          411             txOutput += script
          412         txOutput = bfh(txOutput)
          413 
          414         if not client_electrum.supports_multi_output():
          415             if len(tx.outputs()) > 2:
          416                 self.give_error("Transaction with more than 2 outputs not supported")
          417         for txout in tx.outputs():
          418             if client_electrum.is_hw1() and txout.address and not is_b58_address(txout.address):
          419                 self.give_error(_("This {} device can only send to base58 addresses.").format(self.device))
          420             if not txout.address:
          421                 if client_electrum.is_hw1():
          422                     self.give_error(_("Only address outputs are supported by {}").format(self.device))
          423                 # note: max_size based on https://github.com/LedgerHQ/ledger-app-btc/commit/3a78dee9c0484821df58975803e40d58fbfc2c38#diff-c61ccd96a6d8b54d48f54a3bc4dfa7e2R26
          424                 validate_op_return_output(txout, max_size=190)
          425 
          426         # Output "change" detection
          427         # - only one output and one change is authorized (for hw.1 and nano)
          428         # - at most one output can bypass confirmation (~change) (for all)
          429         if not p2shTransaction:
          430             has_change = False
          431             any_output_on_change_branch = is_any_tx_output_on_change_branch(tx)
          432             for txout in tx.outputs():
          433                 if txout.is_mine and len(tx.outputs()) > 1 \
          434                         and not has_change:
          435                     # prioritise hiding outputs on the 'change' branch from user
          436                     # because no more than one change address allowed
          437                     if txout.is_change == any_output_on_change_branch:
          438                         my_pubkey, changePath = self.find_my_pubkey_in_txinout(txout)
          439                         assert changePath
          440                         changePath = convert_bip32_intpath_to_strpath(changePath)[2:]
          441                         has_change = True
          442                     else:
          443                         output = txout.address
          444                 else:
          445                     output = txout.address
          446 
          447         self.handler.show_message(_("Confirm Transaction on your Ledger device..."))
          448         try:
          449             # Get trusted inputs from the original transactions
          450             for utxo in inputs:
          451                 sequence = int_to_hex(utxo[5], 4)
          452                 if segwitTransaction and not client_electrum.supports_segwit_trustedInputs():
          453                     tmp = bfh(utxo[3])[::-1]
          454                     tmp += bfh(int_to_hex(utxo[1], 4))
          455                     tmp += bfh(int_to_hex(utxo[6], 8))  # txin['value']
          456                     chipInputs.append({'value' : tmp, 'witness' : True, 'sequence' : sequence})
          457                     redeemScripts.append(bfh(utxo[2]))
          458                 elif (not p2shTransaction) or client_electrum.supports_multi_output():
          459                     txtmp = bitcoinTransaction(bfh(utxo[0]))
          460                     trustedInput = client_ledger.getTrustedInput(txtmp, utxo[1])
          461                     trustedInput['sequence'] = sequence
          462                     if segwitTransaction:
          463                         trustedInput['witness'] = True
          464                     chipInputs.append(trustedInput)
          465                     if p2shTransaction or segwitTransaction:
          466                         redeemScripts.append(bfh(utxo[2]))
          467                     else:
          468                         redeemScripts.append(txtmp.outputs[utxo[1]].script)
          469                 else:
          470                     tmp = bfh(utxo[3])[::-1]
          471                     tmp += bfh(int_to_hex(utxo[1], 4))
          472                     chipInputs.append({'value' : tmp, 'sequence' : sequence})
          473                     redeemScripts.append(bfh(utxo[2]))
          474 
          475             # Sign all inputs
          476             firstTransaction = True
          477             inputIndex = 0
          478             rawTx = tx.serialize_to_network()
          479             client_ledger.enableAlternate2fa(False)
          480             if segwitTransaction:
          481                 client_ledger.startUntrustedTransaction(True, inputIndex,
          482                                                             chipInputs, redeemScripts[inputIndex], version=tx.version)
          483                 # we don't set meaningful outputAddress, amount and fees
          484                 # as we only care about the alternateEncoding==True branch
          485                 outputData = client_ledger.finalizeInput(b'', 0, 0, changePath, bfh(rawTx))
          486                 outputData['outputData'] = txOutput
          487                 if outputData['confirmationNeeded']:
          488                     outputData['address'] = output
          489                     self.handler.finished()
          490                     # do the authenticate dialog and get pin:
          491                     pin = self.handler.get_auth(outputData, client=client_electrum)
          492                     if not pin:
          493                         raise UserWarning()
          494                     self.handler.show_message(_("Confirmed. Signing Transaction..."))
          495                 while inputIndex < len(inputs):
          496                     singleInput = [ chipInputs[inputIndex] ]
          497                     client_ledger.startUntrustedTransaction(False, 0,
          498                                                             singleInput, redeemScripts[inputIndex], version=tx.version)
          499                     inputSignature = client_ledger.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime)
          500                     inputSignature[0] = 0x30 # force for 1.4.9+
          501                     my_pubkey = inputs[inputIndex][4]
          502                     tx.add_signature_to_txin(txin_idx=inputIndex,
          503                                              signing_pubkey=my_pubkey.hex(),
          504                                              sig=inputSignature.hex())
          505                     inputIndex = inputIndex + 1
          506             else:
          507                 while inputIndex < len(inputs):
          508                     client_ledger.startUntrustedTransaction(firstTransaction, inputIndex,
          509                                                                 chipInputs, redeemScripts[inputIndex], version=tx.version)
          510                     # we don't set meaningful outputAddress, amount and fees
          511                     # as we only care about the alternateEncoding==True branch
          512                     outputData = client_ledger.finalizeInput(b'', 0, 0, changePath, bfh(rawTx))
          513                     outputData['outputData'] = txOutput
          514                     if outputData['confirmationNeeded']:
          515                         outputData['address'] = output
          516                         self.handler.finished()
          517                         # do the authenticate dialog and get pin:
          518                         pin = self.handler.get_auth(outputData, client=client_electrum)
          519                         if not pin:
          520                             raise UserWarning()
          521                         self.handler.show_message(_("Confirmed. Signing Transaction..."))
          522                     else:
          523                         # Sign input with the provided PIN
          524                         inputSignature = client_ledger.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime)
          525                         inputSignature[0] = 0x30 # force for 1.4.9+
          526                         my_pubkey = inputs[inputIndex][4]
          527                         tx.add_signature_to_txin(txin_idx=inputIndex,
          528                                                  signing_pubkey=my_pubkey.hex(),
          529                                                  sig=inputSignature.hex())
          530                         inputIndex = inputIndex + 1
          531                     firstTransaction = False
          532         except UserWarning:
          533             self.handler.show_error(_('Cancelled by user'))
          534             return
          535         except BTChipException as e:
          536             if e.sw in (0x6985, 0x6d00):  # cancelled by user
          537                 return
          538             elif e.sw == 0x6982:
          539                 raise  # pin lock. decorator will catch it
          540             else:
          541                 self.logger.exception('')
          542                 self.give_error(e, True)
          543         except BaseException as e:
          544             self.logger.exception('')
          545             self.give_error(e, True)
          546         finally:
          547             self.handler.finished()
          548 
          549     @runs_in_hwd_thread
          550     @test_pin_unlocked
          551     @set_and_unset_signing
          552     def show_address(self, sequence, txin_type):
          553         client = self.get_client()
          554         address_path = self.get_derivation_prefix()[2:] + "/%d/%d"%sequence
          555         self.handler.show_message(_("Showing address ..."))
          556         segwit = is_segwit_script_type(txin_type)
          557         segwitNative = txin_type == 'p2wpkh'
          558         try:
          559             client.getWalletPublicKey(address_path, showOnScreen=True, segwit=segwit, segwitNative=segwitNative)
          560         except BTChipException as e:
          561             if e.sw == 0x6985:  # cancelled by user
          562                 pass
          563             elif e.sw == 0x6982:
          564                 raise  # pin lock. decorator will catch it
          565             elif e.sw == 0x6b00:  # hw.1 raises this
          566                 self.handler.show_error('{}\n{}\n{}'.format(
          567                     _('Error showing address') + ':',
          568                     e,
          569                     _('Your device might not have support for this functionality.')))
          570             else:
          571                 self.logger.exception('')
          572                 self.handler.show_error(e)
          573         except BaseException as e:
          574             self.logger.exception('')
          575             self.handler.show_error(e)
          576         finally:
          577             self.handler.finished()
          578 
          579 class LedgerPlugin(HW_PluginBase):
          580     keystore_class = Ledger_KeyStore
          581     minimum_library = (0, 1, 32)
          582     client = None
          583     DEVICE_IDS = [
          584                    (0x2581, 0x1807), # HW.1 legacy btchip
          585                    (0x2581, 0x2b7c), # HW.1 transitional production
          586                    (0x2581, 0x3b7c), # HW.1 ledger production
          587                    (0x2581, 0x4b7c), # HW.1 ledger test
          588                    (0x2c97, 0x0000), # Blue
          589                    (0x2c97, 0x0001), # Nano-S
          590                    (0x2c97, 0x0004), # Nano-X
          591                    (0x2c97, 0x0005), # RFU
          592                    (0x2c97, 0x0006), # RFU
          593                    (0x2c97, 0x0007), # RFU
          594                    (0x2c97, 0x0008), # RFU
          595                    (0x2c97, 0x0009), # RFU
          596                    (0x2c97, 0x000a)  # RFU
          597                  ]
          598     VENDOR_IDS = (0x2c97, )
          599     LEDGER_MODEL_IDS = {
          600         0x10: "Ledger Nano S",
          601         0x40: "Ledger Nano X",
          602     }
          603     SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')
          604 
          605     def __init__(self, parent, config, name):
          606         self.segwit = config.get("segwit")
          607         HW_PluginBase.__init__(self, parent, config, name)
          608         self.libraries_available = self.check_libraries_available()
          609         if not self.libraries_available:
          610             return
          611         # to support legacy devices and legacy firmwares
          612         self.device_manager().register_devices(self.DEVICE_IDS, plugin=self)
          613         # to support modern firmware
          614         self.device_manager().register_vendor_ids(self.VENDOR_IDS, plugin=self)
          615 
          616     def get_library_version(self):
          617         try:
          618             import btchip
          619             version = btchip.__version__
          620         except ImportError:
          621             raise
          622         except:
          623             version = "unknown"
          624         if BTCHIP:
          625             return version
          626         else:
          627             raise LibraryFoundButUnusable(library_version=version)
          628 
          629     @classmethod
          630     def _recognize_device(cls, product_key) -> Tuple[bool, Optional[str]]:
          631         """Returns (can_recognize, model_name) tuple."""
          632         # legacy product_keys
          633         if product_key in cls.DEVICE_IDS:
          634             if product_key[0] == 0x2581:
          635                 return True, "Ledger HW.1"
          636             if product_key == (0x2c97, 0x0000):
          637                 return True, "Ledger Blue"
          638             if product_key == (0x2c97, 0x0001):
          639                 return True, "Ledger Nano S"
          640             if product_key == (0x2c97, 0x0004):
          641                 return True, "Ledger Nano X"
          642             return True, None
          643         # modern product_keys
          644         if product_key[0] == 0x2c97:
          645             product_id = product_key[1]
          646             model_id = product_id >> 8
          647             if model_id in cls.LEDGER_MODEL_IDS:
          648                 model_name = cls.LEDGER_MODEL_IDS[model_id]
          649                 return True, model_name
          650         # give up
          651         return False, None
          652 
          653     def can_recognize_device(self, device: Device) -> bool:
          654         return self._recognize_device(device.product_key)[0]
          655 
          656     @classmethod
          657     def device_name_from_product_key(cls, product_key) -> Optional[str]:
          658         return cls._recognize_device(product_key)[1]
          659 
          660     def create_device_from_hid_enumeration(self, d, *, product_key):
          661         device = super().create_device_from_hid_enumeration(d, product_key=product_key)
          662         if not self.can_recognize_device(device):
          663             return None
          664         return device
          665 
          666     @runs_in_hwd_thread
          667     def get_btchip_device(self, device):
          668         ledger = False
          669         if device.product_key[0] == 0x2581 and device.product_key[1] == 0x3b7c:
          670             ledger = True
          671         if device.product_key[0] == 0x2581 and device.product_key[1] == 0x4b7c:
          672             ledger = True
          673         if device.product_key[0] == 0x2c97:
          674             if device.interface_number == 0 or device.usage_page == 0xffa0:
          675                 ledger = True
          676             else:
          677                 return None  # non-compatible interface of a Nano S or Blue
          678         dev = hid.device()
          679         dev.open_path(device.path)
          680         dev.set_nonblocking(True)
          681         return HIDDongleHIDAPI(dev, ledger, BTCHIP_DEBUG)
          682 
          683     @runs_in_hwd_thread
          684     def create_client(self, device, handler):
          685         if handler:
          686             self.handler = handler
          687 
          688         client = self.get_btchip_device(device)
          689         if client is not None:
          690             client = Ledger_Client(client, product_key=device.product_key, plugin=self)
          691         return client
          692 
          693     def setup_device(self, device_info, wizard, purpose):
          694         device_id = device_info.device.id_
          695         client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
          696         wizard.run_task_without_blocking_gui(
          697             task=lambda: client.get_xpub("m/0'", 'standard'))  # TODO replace by direct derivation once Nano S > 1.1
          698         return client
          699 
          700     def get_xpub(self, device_id, derivation, xtype, wizard):
          701         if xtype not in self.SUPPORTED_XTYPES:
          702             raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device))
          703         client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
          704         client.checkDevice()
          705         xpub = client.get_xpub(derivation, xtype)
          706         return xpub
          707 
          708     @runs_in_hwd_thread
          709     def get_client(self, keystore, force_pair=True, *,
          710                    devices=None, allow_user_interaction=True):
          711         # All client interaction should not be in the main GUI thread
          712         client = super().get_client(keystore, force_pair,
          713                                     devices=devices,
          714                                     allow_user_interaction=allow_user_interaction)
          715         # returns the client for a given keystore. can use xpub
          716         #if client:
          717         #    client.used()
          718         if client is not None:
          719             client.checkDevice()
          720         return client
          721 
          722     @runs_in_hwd_thread
          723     def show_address(self, wallet, address, keystore=None):
          724         if keystore is None:
          725             keystore = wallet.get_keystore()
          726         if not self.show_address_helper(wallet, address, keystore):
          727             return
          728         if type(wallet) is not Standard_Wallet:
          729             keystore.handler.show_error(_('This function is only available for standard wallets when using {}.').format(self.device))
          730             return
          731         sequence = wallet.get_address_index(address)
          732         txin_type = wallet.get_txin_type(address)
          733         keystore.show_address(sequence, txin_type)