tsafe_t.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
tsafe_t.py (19535B)
---
1 from binascii import hexlify, unhexlify
2 import traceback
3 import sys
4 from typing import NamedTuple, Any, Optional, Dict, Union, List, Tuple, TYPE_CHECKING
5
6 from electrum.util import bfh, bh2u, versiontuple, UserCancelled, UserFacingException
7 from electrum.bip32 import BIP32Node
8 from electrum import constants
9 from electrum.i18n import _
10 from electrum.plugin import Device, runs_in_hwd_thread
11 from electrum.transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput
12 from electrum.keystore import Hardware_KeyStore
13 from electrum.base_wizard import ScriptTypeNotSupported
14
15 from ..hw_wallet import HW_PluginBase
16 from ..hw_wallet.plugin import (is_any_tx_output_on_change_branch, trezor_validate_op_return_output_and_get_data,
17 get_xpubs_and_der_suffixes_from_txinout)
18
19 if TYPE_CHECKING:
20 from .client import SafeTClient
21
22 # Safe-T mini initialization methods
23 TIM_NEW, TIM_RECOVER, TIM_MNEMONIC, TIM_PRIVKEY = range(0, 4)
24
25
26 class SafeTKeyStore(Hardware_KeyStore):
27 hw_type = 'safe_t'
28 device = 'Safe-T mini'
29
30 plugin: 'SafeTPlugin'
31
32 def get_client(self, force_pair=True):
33 return self.plugin.get_client(self, force_pair)
34
35 def decrypt_message(self, sequence, message, password):
36 raise UserFacingException(_('Encryption and decryption are not implemented by {}').format(self.device))
37
38 @runs_in_hwd_thread
39 def sign_message(self, sequence, message, password):
40 client = self.get_client()
41 address_path = self.get_derivation_prefix() + "/%d/%d"%sequence
42 address_n = client.expand_path(address_path)
43 msg_sig = client.sign_message(self.plugin.get_coin_name(), address_n, message)
44 return msg_sig.signature
45
46 @runs_in_hwd_thread
47 def sign_transaction(self, tx, password):
48 if tx.is_complete():
49 return
50 # previous transactions used as inputs
51 prev_tx = {}
52 for txin in tx.inputs():
53 tx_hash = txin.prevout.txid.hex()
54 if txin.utxo is None and not txin.is_segwit():
55 raise UserFacingException(_('Missing previous tx for legacy input.'))
56 prev_tx[tx_hash] = txin.utxo
57
58 self.plugin.sign_transaction(self, tx, prev_tx)
59
60
61 class SafeTPlugin(HW_PluginBase):
62 # Derived classes provide:
63 #
64 # class-static variables: client_class, firmware_URL, handler_class,
65 # libraries_available, libraries_URL, minimum_firmware,
66 # wallet_class, types
67
68 firmware_URL = 'https://safe-t.io'
69 libraries_URL = 'https://github.com/archos-safe-t/python-safet'
70 minimum_firmware = (1, 0, 5)
71 keystore_class = SafeTKeyStore
72 minimum_library = (0, 1, 0)
73 SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')
74
75 MAX_LABEL_LEN = 32
76
77 def __init__(self, parent, config, name):
78 HW_PluginBase.__init__(self, parent, config, name)
79
80 self.libraries_available = self.check_libraries_available()
81 if not self.libraries_available:
82 return
83
84 from . import client
85 from . import transport
86 import safetlib.messages
87 self.client_class = client.SafeTClient
88 self.types = safetlib.messages
89 self.DEVICE_IDS = ('Safe-T mini',)
90
91 self.transport_handler = transport.SafeTTransport()
92 self.device_manager().register_enumerate_func(self.enumerate)
93
94 def get_library_version(self):
95 import safetlib
96 try:
97 return safetlib.__version__
98 except AttributeError:
99 return 'unknown'
100
101 @runs_in_hwd_thread
102 def enumerate(self):
103 devices = self.transport_handler.enumerate_devices()
104 return [Device(path=d.get_path(),
105 interface_number=-1,
106 id_=d.get_path(),
107 product_key='Safe-T mini',
108 usage_page=0,
109 transport_ui_string=d.get_path())
110 for d in devices]
111
112 @runs_in_hwd_thread
113 def create_client(self, device, handler):
114 try:
115 self.logger.info(f"connecting to device at {device.path}")
116 transport = self.transport_handler.get_transport(device.path)
117 except BaseException as e:
118 self.logger.info(f"cannot connect at {device.path} {e}")
119 return None
120
121 if not transport:
122 self.logger.info(f"cannot connect at {device.path}")
123 return
124
125 self.logger.info(f"connected to device at {device.path}")
126 client = self.client_class(transport, handler, self)
127
128 # Try a ping for device sanity
129 try:
130 client.ping('t')
131 except BaseException as e:
132 self.logger.info(f"ping failed {e}")
133 return None
134
135 if not client.atleast_version(*self.minimum_firmware):
136 msg = (_('Outdated {} firmware for device labelled {}. Please '
137 'download the updated firmware from {}')
138 .format(self.device, client.label(), self.firmware_URL))
139 self.logger.info(msg)
140 if handler:
141 handler.show_error(msg)
142 else:
143 raise UserFacingException(msg)
144 return None
145
146 return client
147
148 @runs_in_hwd_thread
149 def get_client(self, keystore, force_pair=True, *,
150 devices=None, allow_user_interaction=True) -> Optional['SafeTClient']:
151 client = super().get_client(keystore, force_pair,
152 devices=devices,
153 allow_user_interaction=allow_user_interaction)
154 # returns the client for a given keystore. can use xpub
155 if client:
156 client.used()
157 return client
158
159 def get_coin_name(self):
160 return "Testnet" if constants.net.TESTNET else "Bitcoin"
161
162 def initialize_device(self, device_id, wizard, handler):
163 # Initialization method
164 msg = _("Choose how you want to initialize your {}.\n\n"
165 "The first two methods are secure as no secret information "
166 "is entered into your computer.\n\n"
167 "For the last two methods you input secrets on your keyboard "
168 "and upload them to your {}, and so you should "
169 "only do those on a computer you know to be trustworthy "
170 "and free of malware."
171 ).format(self.device, self.device)
172 choices = [
173 # Must be short as QT doesn't word-wrap radio button text
174 (TIM_NEW, _("Let the device generate a completely new seed randomly")),
175 (TIM_RECOVER, _("Recover from a seed you have previously written down")),
176 (TIM_MNEMONIC, _("Upload a BIP39 mnemonic to generate the seed")),
177 (TIM_PRIVKEY, _("Upload a master private key"))
178 ]
179 def f(method):
180 import threading
181 settings = self.request_safe_t_init_settings(wizard, method, self.device)
182 t = threading.Thread(target=self._initialize_device_safe, args=(settings, method, device_id, wizard, handler))
183 t.setDaemon(True)
184 t.start()
185 exit_code = wizard.loop.exec_()
186 if exit_code != 0:
187 # this method (initialize_device) was called with the expectation
188 # of leaving the device in an initialized state when finishing.
189 # signal that this is not the case:
190 raise UserCancelled()
191 wizard.choice_dialog(title=_('Initialize Device'), message=msg, choices=choices, run_next=f)
192
193 def _initialize_device_safe(self, settings, method, device_id, wizard, handler):
194 exit_code = 0
195 try:
196 self._initialize_device(settings, method, device_id, wizard, handler)
197 except UserCancelled:
198 exit_code = 1
199 except BaseException as e:
200 self.logger.exception('')
201 handler.show_error(repr(e))
202 exit_code = 1
203 finally:
204 wizard.loop.exit(exit_code)
205
206 @runs_in_hwd_thread
207 def _initialize_device(self, settings, method, device_id, wizard, handler):
208 item, label, pin_protection, passphrase_protection = settings
209
210 if method == TIM_RECOVER:
211 handler.show_error(_(
212 "You will be asked to enter 24 words regardless of your "
213 "seed's actual length. If you enter a word incorrectly or "
214 "misspell it, you cannot change it or go back - you will need "
215 "to start again from the beginning.\n\nSo please enter "
216 "the words carefully!"),
217 blocking=True)
218
219 language = 'english'
220 devmgr = self.device_manager()
221 client = devmgr.client_by_id(device_id)
222 if not client:
223 raise Exception(_("The device was disconnected."))
224
225 if method == TIM_NEW:
226 strength = 64 * (item + 2) # 128, 192 or 256
227 u2f_counter = 0
228 skip_backup = False
229 client.reset_device(True, strength, passphrase_protection,
230 pin_protection, label, language,
231 u2f_counter, skip_backup)
232 elif method == TIM_RECOVER:
233 word_count = 6 * (item + 2) # 12, 18 or 24
234 client.step = 0
235 client.recovery_device(word_count, passphrase_protection,
236 pin_protection, label, language)
237 elif method == TIM_MNEMONIC:
238 pin = pin_protection # It's the pin, not a boolean
239 client.load_device_by_mnemonic(str(item), pin,
240 passphrase_protection,
241 label, language)
242 else:
243 pin = pin_protection # It's the pin, not a boolean
244 client.load_device_by_xprv(item, pin, passphrase_protection,
245 label, language)
246
247 def _make_node_path(self, xpub, address_n):
248 bip32node = BIP32Node.from_xkey(xpub)
249 node = self.types.HDNodeType(
250 depth=bip32node.depth,
251 fingerprint=int.from_bytes(bip32node.fingerprint, 'big'),
252 child_num=int.from_bytes(bip32node.child_number, 'big'),
253 chain_code=bip32node.chaincode,
254 public_key=bip32node.eckey.get_public_key_bytes(compressed=True),
255 )
256 return self.types.HDNodePathType(node=node, address_n=address_n)
257
258 def setup_device(self, device_info, wizard, purpose):
259 device_id = device_info.device.id_
260 client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
261 if not device_info.initialized:
262 self.initialize_device(device_id, wizard, client.handler)
263 wizard.run_task_without_blocking_gui(
264 task=lambda: client.get_xpub("m", 'standard'))
265 client.used()
266 return client
267
268 def get_xpub(self, device_id, derivation, xtype, wizard):
269 if xtype not in self.SUPPORTED_XTYPES:
270 raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device))
271 client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
272 xpub = client.get_xpub(derivation, xtype)
273 client.used()
274 return xpub
275
276 def get_safet_input_script_type(self, electrum_txin_type: str):
277 if electrum_txin_type in ('p2wpkh', 'p2wsh'):
278 return self.types.InputScriptType.SPENDWITNESS
279 if electrum_txin_type in ('p2wpkh-p2sh', 'p2wsh-p2sh'):
280 return self.types.InputScriptType.SPENDP2SHWITNESS
281 if electrum_txin_type in ('p2pkh', ):
282 return self.types.InputScriptType.SPENDADDRESS
283 if electrum_txin_type in ('p2sh', ):
284 return self.types.InputScriptType.SPENDMULTISIG
285 raise ValueError('unexpected txin type: {}'.format(electrum_txin_type))
286
287 def get_safet_output_script_type(self, electrum_txin_type: str):
288 if electrum_txin_type in ('p2wpkh', 'p2wsh'):
289 return self.types.OutputScriptType.PAYTOWITNESS
290 if electrum_txin_type in ('p2wpkh-p2sh', 'p2wsh-p2sh'):
291 return self.types.OutputScriptType.PAYTOP2SHWITNESS
292 if electrum_txin_type in ('p2pkh', ):
293 return self.types.OutputScriptType.PAYTOADDRESS
294 if electrum_txin_type in ('p2sh', ):
295 return self.types.OutputScriptType.PAYTOMULTISIG
296 raise ValueError('unexpected txin type: {}'.format(electrum_txin_type))
297
298 @runs_in_hwd_thread
299 def sign_transaction(self, keystore, tx: PartialTransaction, prev_tx):
300 self.prev_tx = prev_tx
301 client = self.get_client(keystore)
302 inputs = self.tx_inputs(tx, for_sig=True, keystore=keystore)
303 outputs = self.tx_outputs(tx, keystore=keystore)
304 signatures = client.sign_tx(self.get_coin_name(), inputs, outputs,
305 lock_time=tx.locktime, version=tx.version)[0]
306 signatures = [(bh2u(x) + '01') for x in signatures]
307 tx.update_signatures(signatures)
308
309 @runs_in_hwd_thread
310 def show_address(self, wallet, address, keystore=None):
311 if keystore is None:
312 keystore = wallet.get_keystore()
313 if not self.show_address_helper(wallet, address, keystore):
314 return
315 client = self.get_client(keystore)
316 if not client.atleast_version(1, 0):
317 keystore.handler.show_error(_("Your device firmware is too old"))
318 return
319 deriv_suffix = wallet.get_address_index(address)
320 derivation = keystore.get_derivation_prefix()
321 address_path = "%s/%d/%d"%(derivation, *deriv_suffix)
322 address_n = client.expand_path(address_path)
323 script_type = self.get_safet_input_script_type(wallet.txin_type)
324
325 # prepare multisig, if available:
326 xpubs = wallet.get_master_public_keys()
327 if len(xpubs) > 1:
328 pubkeys = wallet.get_public_keys(address)
329 # sort xpubs using the order of pubkeys
330 sorted_pairs = sorted(zip(pubkeys, xpubs))
331 multisig = self._make_multisig(
332 wallet.m,
333 [(xpub, deriv_suffix) for pubkey, xpub in sorted_pairs])
334 else:
335 multisig = None
336
337 client.get_address(self.get_coin_name(), address_n, True, multisig=multisig, script_type=script_type)
338
339 def tx_inputs(self, tx: Transaction, *, for_sig=False, keystore: 'SafeTKeyStore' = None):
340 inputs = []
341 for txin in tx.inputs():
342 txinputtype = self.types.TxInputType()
343 if txin.is_coinbase_input():
344 prev_hash = b"\x00"*32
345 prev_index = 0xffffffff # signed int -1
346 else:
347 if for_sig:
348 assert isinstance(tx, PartialTransaction)
349 assert isinstance(txin, PartialTxInput)
350 assert keystore
351 if len(txin.pubkeys) > 1:
352 xpubs_and_deriv_suffixes = get_xpubs_and_der_suffixes_from_txinout(tx, txin)
353 multisig = self._make_multisig(txin.num_sig, xpubs_and_deriv_suffixes)
354 else:
355 multisig = None
356 script_type = self.get_safet_input_script_type(txin.script_type)
357 txinputtype = self.types.TxInputType(
358 script_type=script_type,
359 multisig=multisig)
360 my_pubkey, full_path = keystore.find_my_pubkey_in_txinout(txin)
361 if full_path:
362 txinputtype._extend_address_n(full_path)
363
364 prev_hash = txin.prevout.txid
365 prev_index = txin.prevout.out_idx
366
367 if txin.value_sats() is not None:
368 txinputtype.amount = txin.value_sats()
369 txinputtype.prev_hash = prev_hash
370 txinputtype.prev_index = prev_index
371
372 if txin.script_sig is not None:
373 txinputtype.script_sig = txin.script_sig
374
375 txinputtype.sequence = txin.nsequence
376
377 inputs.append(txinputtype)
378
379 return inputs
380
381 def _make_multisig(self, m, xpubs):
382 if len(xpubs) == 1:
383 return None
384 pubkeys = [self._make_node_path(xpub, deriv) for xpub, deriv in xpubs]
385 return self.types.MultisigRedeemScriptType(
386 pubkeys=pubkeys,
387 signatures=[b''] * len(pubkeys),
388 m=m)
389
390 def tx_outputs(self, tx: PartialTransaction, *, keystore: 'SafeTKeyStore'):
391
392 def create_output_by_derivation():
393 script_type = self.get_safet_output_script_type(txout.script_type)
394 if len(txout.pubkeys) > 1:
395 xpubs_and_deriv_suffixes = get_xpubs_and_der_suffixes_from_txinout(tx, txout)
396 multisig = self._make_multisig(txout.num_sig, xpubs_and_deriv_suffixes)
397 else:
398 multisig = None
399 my_pubkey, full_path = keystore.find_my_pubkey_in_txinout(txout)
400 assert full_path
401 txoutputtype = self.types.TxOutputType(
402 multisig=multisig,
403 amount=txout.value,
404 address_n=full_path,
405 script_type=script_type)
406 return txoutputtype
407
408 def create_output_by_address():
409 txoutputtype = self.types.TxOutputType()
410 txoutputtype.amount = txout.value
411 if address:
412 txoutputtype.script_type = self.types.OutputScriptType.PAYTOADDRESS
413 txoutputtype.address = address
414 else:
415 txoutputtype.script_type = self.types.OutputScriptType.PAYTOOPRETURN
416 txoutputtype.op_return_data = trezor_validate_op_return_output_and_get_data(txout)
417 return txoutputtype
418
419 outputs = []
420 has_change = False
421 any_output_on_change_branch = is_any_tx_output_on_change_branch(tx)
422
423 for txout in tx.outputs():
424 address = txout.address
425 use_create_by_derivation = False
426
427 if txout.is_mine and not has_change:
428 # prioritise hiding outputs on the 'change' branch from user
429 # because no more than one change address allowed
430 # note: ^ restriction can be removed once we require fw
431 # that has https://github.com/trezor/trezor-mcu/pull/306
432 if txout.is_change == any_output_on_change_branch:
433 use_create_by_derivation = True
434 has_change = True
435
436 if use_create_by_derivation:
437 txoutputtype = create_output_by_derivation()
438 else:
439 txoutputtype = create_output_by_address()
440 outputs.append(txoutputtype)
441
442 return outputs
443
444 def electrum_tx_to_txtype(self, tx: Optional[Transaction]):
445 t = self.types.TransactionType()
446 if tx is None:
447 # probably for segwit input and we don't need this prev txn
448 return t
449 tx.deserialize()
450 t.version = tx.version
451 t.lock_time = tx.locktime
452 inputs = self.tx_inputs(tx)
453 t._extend_inputs(inputs)
454 for out in tx.outputs():
455 o = t._add_bin_outputs()
456 o.amount = out.value
457 o.script_pubkey = out.scriptpubkey
458 return t
459
460 # This function is called from the TREZOR libraries (via tx_api)
461 def get_tx(self, tx_hash):
462 tx = self.prev_tx[tx_hash]
463 return self.electrum_tx_to_txtype(tx)