tcoldcard.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
tcoldcard.py (24379B)
---
1 #
2 # Coldcard Electrum plugin main code.
3 #
4 #
5 import os, time, io
6 import traceback
7 from typing import TYPE_CHECKING, Optional
8 import struct
9
10 from electrum import bip32
11 from electrum.bip32 import BIP32Node, InvalidMasterKeyVersionBytes
12 from electrum.i18n import _
13 from electrum.plugin import Device, hook, runs_in_hwd_thread
14 from electrum.keystore import Hardware_KeyStore, KeyStoreWithMPK
15 from electrum.transaction import PartialTransaction
16 from electrum.wallet import Standard_Wallet, Multisig_Wallet, Abstract_Wallet
17 from electrum.util import bfh, bh2u, versiontuple, UserFacingException
18 from electrum.base_wizard import ScriptTypeNotSupported
19 from electrum.logging import get_logger
20
21 from ..hw_wallet import HW_PluginBase, HardwareClientBase
22 from ..hw_wallet.plugin import LibraryFoundButUnusable, only_hook_if_libraries_available
23
24
25 _logger = get_logger(__name__)
26
27
# Optional dependencies: the plugin module must stay importable even when the
# Coldcard libraries ('hid', 'ckcc') are missing, so everything that needs
# them is defined inside this try block.
try:
    import hid
    from ckcc.protocol import CCProtocolPacker, CCProtocolUnpacker
    from ckcc.protocol import CCProtoError, CCUserRefused, CCBusyError
    from ckcc.constants import (MAX_MSG_LEN, MAX_BLK_LEN, MSG_SIGNING_MAX_LENGTH, MAX_TXN_LEN,
        AF_CLASSIC, AF_P2SH, AF_P2WPKH, AF_P2WSH, AF_P2WPKH_P2SH, AF_P2WSH_P2SH)

    from ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID, CKCC_SIMULATOR_PATH

    requirements_ok = True


    class ElectrumColdcardDevice(ColdcardDevice):
        """ColdcardDevice that checks the MiTM signature with Electrum's EC code."""

        # avoid use of pycoin for MiTM message signature test
        def mitm_verify(self, sig, expect_xpub):
            """Return True if `sig` is a valid signature over the session key.

            - sig: signature (65 bytes); bytes 1..64 are handed to the verifier.
            - expect_xpub: extended public key whose EC key must have signed.
            """
            # verify a signature (65 bytes) over the session key, using the master bip32 node
            # - customized to use specific EC library of Electrum.
            pubkey = BIP32Node.from_xkey(expect_xpub).eckey
            try:
                pubkey.verify_message_hash(sig[1:65], self.session_key)
                return True
            except Exception:
                # any verification failure means "not verified"; unlike a bare
                # except, KeyboardInterrupt/SystemExit are allowed to propagate
                return False

except ImportError as e:
    # a missing 'ckcc' package is the normal "not installed" case; anything
    # else (e.g. a broken sub-dependency) is worth a traceback in the log
    if not (isinstance(e, ModuleNotFoundError) and e.name == 'ckcc'):
        _logger.exception('error importing coldcard plugin deps')
    requirements_ok = False

    # fallback USB ids, since they could not be imported from ckcc.client
    COINKITE_VID = 0xd13e
    CKCC_PID = 0xcc10

# product id under which the simulator is registered by this plugin
CKCC_SIMULATED_PID = CKCC_PID ^ 0x55aa
61
62
class CKCCClient(HardwareClientBase):
    """Client for one connected Coldcard (real HID device or simulator).

    Wraps an ElectrumColdcardDevice (stored as `self.dev`) and exposes the
    protocol requests used by the plugin and the keystore.
    """

    def __init__(self, plugin, handler, dev_path, *, is_simulator=False):
        """Open the device found at `dev_path`.

        - plugin: owning plugin; its `device` name is copied onto the client.
        - handler: UI handler, stored as `self.handler`.
        - dev_path: HID path, or the simulator path when `is_simulator` is set.
        - is_simulator: connect to the simulator instead of opening HID.
        """
        HardwareClientBase.__init__(self, plugin=plugin)
        self.device = plugin.device
        self.handler = handler

        # if we know what the (xfp, xpub) "should be" then track it here
        self._expected_device = None

        if is_simulator:
            self.dev = ElectrumColdcardDevice(dev_path, encrypt=True)
        else:
            # open the real HID device
            hd = hid.device(path=dev_path)
            hd.open_path(dev_path)

            self.dev = ElectrumColdcardDevice(dev=hd, encrypt=True)

        # NOTE: MiTM test is delayed until we have a hint as to what XPUB we
        # should expect. It's also kinda slow.

    def __repr__(self):
        return '<CKCCClient: xfp=%s label=%r>' % (xfp2str(self.dev.master_fingerprint),
                                                  self.label())

    @runs_in_hwd_thread
    def verify_connection(self, expected_xfp: int, expected_xpub=None):
        """Check that the connected device is the expected one, then run the MiTM test.

        - expected_xfp: master fingerprint (as int) the device must report.
        - expected_xpub: master xpub the device must report; when None the
          value reported by the device itself is used.

        Raises RuntimeError when the device does not match, or when a
        different (xfp, xpub) pair was already verified on this client.
        """
        ex = (expected_xfp, expected_xpub)

        if self._expected_device == ex:
            # all is as expected
            return

        if expected_xpub is None:
            expected_xpub = self.dev.master_xpub

        if ((self._expected_device is not None)
                or (self.dev.master_fingerprint != expected_xfp)
                or (self.dev.master_xpub != expected_xpub)):
            # probably indicating programming error, not hacking
            _logger.info(f"xpubs. reported by device: {self.dev.master_xpub}. "
                         f"stored in file: {expected_xpub}")
            raise RuntimeError("Expecting %s but that's not what's connected?!" %
                               xfp2str(expected_xfp))

        # check signature over session key
        # - mitm might have lied about xfp and xpub up to here
        # - important that we use value capture at wallet creation time, not some value
        #   we read over USB today
        self.dev.check_mitm(expected_xpub=expected_xpub)

        self._expected_device = ex

        if not getattr(self, 'ckcc_xpub', None):
            self.ckcc_xpub = expected_xpub

        _logger.info("Successfully verified against MiTM")

    def is_pairable(self):
        """Return True if the device reports a master xpub."""
        # can't do anything w/ devices that aren't setup (this code not normally reachable)
        return bool(self.dev.master_xpub)

    @runs_in_hwd_thread
    def close(self):
        """Close the underlying device and drop the reference to it."""
        # close the HID device (so can be reused)
        self.dev.close()
        self.dev = None

    def is_initialized(self):
        """Return True if the device reports a master xpub."""
        return bool(self.dev.master_xpub)

    def label(self):
        """Return a human-readable label for this Coldcard."""
        # 'label' of this Coldcard. Warning: gets saved into wallet file, which might
        # not be encrypted, so better for privacy if based on xpub/fingerprint rather than
        # USB serial number.
        if self.dev.is_simulator:
            lab = 'Coldcard Simulator ' + xfp2str(self.dev.master_fingerprint)
        elif not self.dev.master_fingerprint:
            # fallback; not expected
            lab = 'Coldcard #' + self.dev.serial
        else:
            lab = 'Coldcard ' + xfp2str(self.dev.master_fingerprint)

        return lab

    def manipulate_keystore_dict_during_wizard_setup(self, d: dict):
        """Store the device's master xpub in `d['ckcc_xpub']` (if it has one).

        Raises UserFacingException when the xpub cannot be parsed because of
        its version bytes.
        """
        master_xpub = self.dev.master_xpub
        if master_xpub is not None:
            try:
                # parsed only to validate the version bytes; result not needed
                BIP32Node.from_xkey(master_xpub)
            except InvalidMasterKeyVersionBytes:
                raise UserFacingException(
                    _('Invalid xpub magic. Make sure your {} device is set to the correct chain.').format(self.device) + ' ' +
                    _('You might have to unplug and plug it in again.')
                ) from None
            d['ckcc_xpub'] = master_xpub

    @runs_in_hwd_thread
    def has_usable_connection_with_device(self):
        """Return True if an end-to-end ping succeeds, False otherwise."""
        # Do end-to-end ping test
        try:
            self.ping_check()
            return True
        except Exception:
            # any failure of the ping means "not usable"; interpreter-exit
            # signals (KeyboardInterrupt, SystemExit) are not swallowed
            return False

    @runs_in_hwd_thread
    def get_xpub(self, bip32_path, xtype):
        """Ask the device for the xpub at `bip32_path`, re-typed as `xtype`.

        Raises UserFacingException when the returned xpub cannot be parsed
        because of its version bytes.
        """
        assert xtype in ColdcardPlugin.SUPPORTED_XTYPES
        _logger.info('Derive xtype = %r' % xtype)
        xpub = self.dev.send_recv(CCProtocolPacker.get_xpub(bip32_path), timeout=5000)
        # TODO handle timeout?
        # change type of xpub to the requested type
        try:
            node = BIP32Node.from_xkey(xpub)
        except InvalidMasterKeyVersionBytes:
            raise UserFacingException(_('Invalid xpub magic. Make sure your {} device is set to the correct chain.')
                                      .format(self.device)) from None
        if xtype != 'standard':
            xpub = node._replace(xtype=xtype).to_xpub()
        return xpub

    @runs_in_hwd_thread
    def ping_check(self):
        """Ping the device and verify that it echoes the payload back.

        Raises RuntimeError if the request fails or the echo does not match.
        """
        # check connection is working
        assert self.dev.session_key, 'not encrypted?'
        req = b'1234 Electrum Plugin 4321'      # free up to 59 bytes
        try:
            echo = self.dev.send_recv(CCProtocolPacker.ping(req))
        except Exception as exc:
            raise RuntimeError("Communication trouble with Coldcard") from exc
        # explicit comparison instead of assert, so the check is not
        # stripped when Python runs with -O
        if echo != req:
            raise RuntimeError("Communication trouble with Coldcard")

    @runs_in_hwd_thread
    def show_address(self, path, addr_fmt):
        """Send a show-address request for `path` and return the device's reply."""
        # prompt user w/ address, also returns it immediately.
        return self.dev.send_recv(CCProtocolPacker.show_address(path, addr_fmt), timeout=None)

    @runs_in_hwd_thread
    def show_p2sh_address(self, *args, **kws):
        """Send a show-p2sh-address request and return the device's reply.

        All arguments are forwarded to CCProtocolPacker.show_p2sh_address.
        """
        # prompt user w/ p2sh address, also returns it immediately.
        return self.dev.send_recv(CCProtocolPacker.show_p2sh_address(*args, **kws), timeout=None)

    @runs_in_hwd_thread
    def get_version(self):
        """Return the device's version reply, split into lines."""
        # gives list of strings
        return self.dev.send_recv(CCProtocolPacker.version(), timeout=1000).split('\n')

    @runs_in_hwd_thread
    def sign_message_start(self, path, msg):
        """Send the sign-message request for `msg` with key `path`."""
        # this starts the UX experience.
        self.dev.send_recv(CCProtocolPacker.sign_message(msg, path), timeout=None)

    @runs_in_hwd_thread
    def sign_message_poll(self):
        """Poll for the signed message; callers treat None as "not ready yet"."""
        # poll device... if user has approved, will get tuple: (addr, sig) else None
        return self.dev.send_recv(CCProtocolPacker.get_signed_msg(), timeout=None)

    @runs_in_hwd_thread
    def sign_transaction_start(self, raw_psbt: bytes, *, finalize: bool = False):
        """Upload `raw_psbt` and send the sign-transaction request.

        Raises ValueError (carrying the response) if the device answers the
        sign request with anything other than None.
        """
        # Multiple steps to sign:
        # - upload binary
        # - start signing UX
        # - wait for coldcard to complete process, or have it refused.
        # - download resulting txn
        assert 20 <= len(raw_psbt) < MAX_TXN_LEN, 'PSBT is too big'
        dlen, chk = self.dev.upload_file(raw_psbt)

        resp = self.dev.send_recv(CCProtocolPacker.sign_transaction(dlen, chk, finalize=finalize),
                                  timeout=None)

        if resp is not None:
            raise ValueError(resp)

    @runs_in_hwd_thread
    def sign_transaction_poll(self):
        """Poll for the signed transaction; callers treat None as "not ready yet"."""
        # poll device... if user has approved, will get tuple: (length, checksum) else None
        return self.dev.send_recv(CCProtocolPacker.get_signed_txn(), timeout=None)

    @runs_in_hwd_thread
    def download_file(self, length, checksum, file_number=1):
        """Download a file from the device; thin wrapper over `dev.download_file`."""
        # get a file
        return self.dev.download_file(length, checksum, file_number=file_number)
247
248
249
class Coldcard_KeyStore(Hardware_KeyStore):
    """Keystore backed by a Coldcard; signing and address display go via the device."""

    hw_type = 'coldcard'
    device = 'Coldcard'

    plugin: 'ColdcardPlugin'

    def __init__(self, d):
        """Build the keystore from its stored dict `d` (reads optional 'ckcc_xpub')."""
        Hardware_KeyStore.__init__(self, d)
        # Errors and other user interaction is done through the wallet's
        # handler. The handler is per-window and preserved across
        # device reconnects
        self.force_watching_only = False
        self.ux_busy = False

        # we need to know at least the fingerprint of the master xpub to verify against MiTM
        # - device reports these value during encryption setup process
        # - full xpub value now optional
        self.ckcc_xpub = d.get('ckcc_xpub', None)

    def dump(self):
        """Return the stored form of the keystore, including 'ckcc_xpub'."""
        # our additions to the stored data about keystore -- only during creation?
        d = Hardware_KeyStore.dump(self)
        d['ckcc_xpub'] = self.ckcc_xpub
        return d

    def get_xfp_int(self) -> int:
        """Return the root fingerprint as the integer form used for the device."""
        xfp = self.get_root_fingerprint()
        assert xfp is not None
        return xfp_int_from_xfp_bytes(bfh(xfp))

    def get_client(self):
        """Return the plugin's client for this keystore, after verifying the connection."""
        # called when user tries to do something like view address, sign something.
        # - not called during probing/setup
        # - will fail if indicated device can't produce the xpub (at derivation) expected
        rv = self.plugin.get_client(self)
        if rv:
            xfp_int = self.get_xfp_int()
            rv.verify_connection(xfp_int, self.ckcc_xpub)

        return rv

    def give_error(self, message, clear_client=False):
        """Log `message`, show it unless the UX is busy, then raise UserFacingException.

        When `clear_client` is true, `self.client` is reset to None first.
        """
        self.logger.info(message)
        if not self.ux_busy:
            self.handler.show_error(message)
        else:
            self.ux_busy = False
        if clear_client:
            self.client = None
        raise UserFacingException(message)

    def wrap_busy(func):
        """Decorator: set `self.ux_busy` while `func` runs, and always clear it after."""
        # decorator: function takes over the UX on the device.
        def wrapper(self, *args, **kwargs):
            try:
                self.ux_busy = True
                return func(self, *args, **kwargs)
            finally:
                self.ux_busy = False
        return wrapper

    def decrypt_message(self, pubkey, message, password):
        """Not supported on this device; always raises UserFacingException."""
        raise UserFacingException(_('Encryption and decryption are currently not supported for {}').format(self.device))

    @wrap_busy
    def sign_message(self, sequence, message, password):
        """Sign `message` on the device with the key at derivation suffix `sequence`.

        Returns the raw signature bytes, or b'' when the message is rejected
        up front or the device refuses, is busy, or reports a protocol error.
        Other failures go through give_error(), which raises UserFacingException.
        """
        # Sign a message on device. Since we have big screen, of course we
        # have to show the message unambiguously there first!
        try:
            msg = message.encode('ascii', errors='strict')
        except UnicodeError:
            msg = None
        # explicit length check (not an assert) so it still runs under python -O
        if msg is None or not (1 <= len(msg) <= MSG_SIGNING_MAX_LENGTH):
            # there are other restrictions on message content,
            # but let the device enforce and report those
            self.handler.show_error('Only short (%d max) ASCII messages can be signed.'
                                    % MSG_SIGNING_MAX_LENGTH)
            return b''

        path = self.get_derivation_prefix() + ("/%d/%d" % sequence)
        try:
            cl = self.get_client()
            try:
                self.handler.show_message("Signing message (using %s)..." % path)

                cl.sign_message_start(path, msg)

                while True:
                    # How to kill some time, without locking UI?
                    time.sleep(0.250)

                    resp = cl.sign_message_poll()
                    if resp is not None:
                        break

            finally:
                self.handler.finished()

            assert len(resp) == 2
            addr, raw_sig = resp

            # already encoded in Bitcoin fashion, binary.
            assert 40 < len(raw_sig) <= 65

            return raw_sig

        except (CCUserRefused, CCBusyError) as exc:
            self.handler.show_error(str(exc))
        except CCProtoError as exc:
            # this path is about message signing, so say so in log and dialog
            self.logger.exception('Error signing message')
            self.handler.show_error('{}\n\n{}'.format(
                _('Error signing message') + ':', str(exc)))
        except Exception as e:
            self.give_error(e, True)

        # give empty bytes for error cases; it seems to clear the old signature box
        return b''

    @wrap_busy
    def sign_transaction(self, tx, password):
        """Have the device sign `tx` and merge the returned PSBT back into it.

        Does nothing when `tx` is already complete. Returns None in all cases;
        the result is the mutation of `tx`.
        """
        # Upload PSBT for signing.
        # - we can also work offline (without paired device present)
        if tx.is_complete():
            return

        client = self.get_client()

        assert client.dev.master_fingerprint == self.get_xfp_int()

        raw_psbt = tx.serialize_as_bytes()

        try:
            try:
                self.handler.show_message("Authorize Transaction...")

                client.sign_transaction_start(raw_psbt)

                while True:
                    # How to kill some time, without locking UI?
                    time.sleep(0.250)

                    resp = client.sign_transaction_poll()
                    if resp is not None:
                        break

                rlen, rsha = resp

                # download the resulting txn.
                raw_resp = client.download_file(rlen, rsha)

            finally:
                self.handler.finished()

        except (CCUserRefused, CCBusyError) as exc:
            self.logger.info(f'Did not sign: {exc}')
            self.handler.show_error(str(exc))
            return
        except BaseException as e:
            self.logger.exception('')
            self.give_error(e, True)
            return

        tx2 = PartialTransaction.from_raw_psbt(raw_resp)
        # apply partial signatures back into txn
        tx.combine_with_other_psbt(tx2)
        # caller's logic looks at tx now and if it's sufficiently signed,
        # will send it if that's the user's intent.

    @staticmethod
    def _encode_txin_type(txin_type):
        """Map an Electrum txin type name to the ckcc AF_* address-format code.

        Raises KeyError for a name that is not in the table.
        """
        # Map from Electrum code names to our code numbers.
        return {'standard': AF_CLASSIC, 'p2pkh': AF_CLASSIC,
                'p2sh': AF_P2SH,
                'p2wpkh-p2sh': AF_P2WPKH_P2SH,
                'p2wpkh': AF_P2WPKH,
                'p2wsh-p2sh': AF_P2WSH_P2SH,
                'p2wsh': AF_P2WSH,
                }[txin_type]

    @wrap_busy
    def show_address(self, sequence, txin_type):
        """Ask the device to display the single-key address at `sequence`.

        Errors are logged and shown through the handler, not raised.
        """
        client = self.get_client()
        # first two characters of the prefix are dropped
        # (presumably the leading "m/" -- TODO confirm)
        address_path = self.get_derivation_prefix()[2:] + "/%d/%d" % sequence
        addr_fmt = self._encode_txin_type(txin_type)
        try:
            try:
                self.handler.show_message(_("Showing address ..."))
                dev_addr = client.show_address(address_path, addr_fmt)
                # we could double check address here
            finally:
                self.handler.finished()
        except CCProtoError as exc:
            self.logger.exception('Error showing address')
            self.handler.show_error('{}\n\n{}'.format(
                _('Error showing address') + ':', str(exc)))
        except BaseException as exc:
            self.logger.exception('')
            self.handler.show_error(exc)

    @wrap_busy
    def show_p2sh_address(self, M, script, xfp_paths, txin_type):
        """Ask the device to display a multisig address.

        `M`, `xfp_paths` and `script` are passed to the client's
        show_p2sh_address. Errors are logged and shown through the handler,
        not raised.
        """
        client = self.get_client()
        addr_fmt = self._encode_txin_type(txin_type)
        try:
            try:
                self.handler.show_message(_("Showing address ..."))
                dev_addr = client.show_p2sh_address(M, xfp_paths, script, addr_fmt=addr_fmt)
                # we could double check address here
            finally:
                self.handler.finished()
        except CCProtoError as exc:
            self.logger.exception('Error showing address')
            self.handler.show_error('{}.\n{}\n\n{}'.format(
                _('Error showing address'),
                _('Make sure you have imported the correct wallet description '
                  'file on the device for this multisig wallet.'),
                str(exc)))
        except BaseException as exc:
            self.logger.exception('')
            self.handler.show_error(exc)
469
470
class ColdcardPlugin(HW_PluginBase):
    """Electrum hardware-wallet plugin for the Coldcard."""

    keystore_class = Coldcard_KeyStore
    minimum_library = (0, 7, 7)

    # (vendor id, product id) pairs registered with the device manager
    DEVICE_IDS = [
        (COINKITE_VID, CKCC_PID),
        (COINKITE_VID, CKCC_SIMULATED_PID)
    ]

    SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')

    def __init__(self, parent, config, name):
        """Register device ids and the simulator probe, if the libraries are available."""
        HW_PluginBase.__init__(self, parent, config, name)

        self.libraries_available = self.check_libraries_available()
        if not self.libraries_available:
            return

        self.device_manager().register_devices(self.DEVICE_IDS, plugin=self)
        self.device_manager().register_enumerate_func(self.detect_simulator)

    def get_library_version(self):
        """Return the ckcc version string ('unknown' if it has no `__version__`).

        Raises LibraryFoundButUnusable when ckcc imports but the plugin's
        other dependencies failed to import.
        """
        import ckcc
        try:
            version = ckcc.__version__
        except AttributeError:
            version = 'unknown'
        if requirements_ok:
            return version
        else:
            raise LibraryFoundButUnusable(library_version=version)

    def detect_simulator(self):
        """Return a one-element Device list if the simulator path exists, else []."""
        # if there is a simulator running on this machine,
        # return details about it so it's offered as a pairing choice
        fn = CKCC_SIMULATOR_PATH

        if os.path.exists(fn):
            return [Device(path=fn,
                           interface_number=-1,
                           id_=fn,
                           product_key=(COINKITE_VID, CKCC_SIMULATED_PID),
                           usage_page=0,
                           transport_ui_string='simulator')]

        return []

    @runs_in_hwd_thread
    def create_client(self, device, handler):
        """Create a CKCCClient for `device`; returns None (and logs) on failure."""
        if handler:
            self.handler = handler

        # We are given a HID device, or at least some details about it.
        # Not sure why we aren't just given a HID library handle, but
        # the 'path' is unambiguous, so we'll use that.
        try:
            rv = CKCCClient(self, handler, device.path,
                            is_simulator=(device.product_key[1] == CKCC_SIMULATED_PID))
            return rv
        except Exception:
            self.logger.exception('late failure connecting to device?')
            return None

    def setup_device(self, device_info, wizard, purpose):
        """Scan for the device described by `device_info` and return its client."""
        device_id = device_info.device.id_
        client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
        return client

    def get_xpub(self, device_id, derivation, xtype, wizard):
        """Return the xpub at `derivation` from the device, typed as `xtype`.

        Raises ScriptTypeNotSupported if `xtype` is not in SUPPORTED_XTYPES.
        """
        # this seems to be part of the pairing process only, not during normal ops?
        # base_wizard:on_hw_derivation
        if xtype not in self.SUPPORTED_XTYPES:
            raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device))
        client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
        client.ping_check()

        xpub = client.get_xpub(derivation, xtype)
        return xpub

    @runs_in_hwd_thread
    def get_client(self, keystore, force_pair=True, *,
                   devices=None, allow_user_interaction=True) -> Optional['CKCCClient']:
        """Return the base class's client for `keystore`, ping-checked when not None."""
        # Acquire a connection to the hardware device (via USB)
        client = super().get_client(keystore, force_pair,
                                    devices=devices,
                                    allow_user_interaction=allow_user_interaction)

        if client is not None:
            client.ping_check()

        return client

    @staticmethod
    def export_ms_wallet(wallet: Multisig_Wallet, fp, name):
        """Write the Coldcard multisig description of `wallet` to file object `fp`.

        `name` is written on the 'Name:' line, truncated to 20 characters.
        """
        # Build the text file Coldcard needs to understand the multisig wallet
        # it is participating in. All involved Coldcards can share same file.
        assert isinstance(wallet, Multisig_Wallet)

        print('# Exported from Electrum', file=fp)
        print(f'Name: {name:.20s}', file=fp)
        print(f'Policy: {wallet.m} of {wallet.n}', file=fp)
        print(f'Format: {wallet.txin_type.upper()}', file=fp)

        xpubs = []
        for xpub, ks in zip(wallet.get_master_public_keys(), wallet.get_keystores()):  # type: str, KeyStoreWithMPK
            fp_bytes, der_full = ks.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix=[], only_der_suffix=False)
            fp_hex = fp_bytes.hex().upper()
            der_prefix_str = bip32.convert_bip32_intpath_to_strpath(der_full)
            xpubs.append((fp_hex, xpub, der_prefix_str))

        # Before v3.2.1 derivation didn't matter too much to the Coldcard, since it
        # could use key path data from PSBT or USB request as needed. However,
        # derivation data is now required.

        print('', file=fp)

        assert len(xpubs) == wallet.n
        for xfp, xpub, der_prefix in xpubs:
            print(f'Derivation: {der_prefix}', file=fp)
            print(f'{xfp}: {xpub}\n', file=fp)

    def show_address(self, wallet, address, keystore: 'Coldcard_KeyStore' = None):
        """Show `address` of `wallet` on the device.

        Uses the wallet's own keystore when `keystore` is None. Wallets that
        are neither exactly Standard_Wallet nor exactly Multisig_Wallet get an
        error dialog instead.
        """
        if keystore is None:
            keystore = wallet.get_keystore()
        if not self.show_address_helper(wallet, address, keystore):
            return

        txin_type = wallet.get_txin_type(address)

        # exact type checks (not isinstance): subclasses take the else branch
        # Standard_Wallet => not multisig, must be bip32
        if type(wallet) is Standard_Wallet:
            sequence = wallet.get_address_index(address)
            keystore.show_address(sequence, txin_type)
        elif type(wallet) is Multisig_Wallet:
            assert isinstance(wallet, Multisig_Wallet)  # only here for type-hints in IDE
            # More involved for P2SH/P2WSH addresses: need M, and all public keys, and their
            # derivation paths. Must construct script, and track fingerprints+paths for
            # all those keys

            pubkey_deriv_info = wallet.get_public_keys_with_deriv_info(address)
            # xfp_paths is built in the same sorted-by-hex order as the script
            pubkey_hexes = sorted(pk.hex() for pk in pubkey_deriv_info)
            xfp_paths = []
            for pubkey_hex in pubkey_hexes:
                pubkey = bytes.fromhex(pubkey_hex)
                ks, der_suffix = pubkey_deriv_info[pubkey]
                fp_bytes, der_full = ks.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix, only_der_suffix=False)
                xfp_int = xfp_int_from_xfp_bytes(fp_bytes)
                xfp_paths.append([xfp_int] + list(der_full))

            script = bfh(wallet.pubkeys_to_scriptcode(pubkey_hexes))

            keystore.show_p2sh_address(wallet.m, script, xfp_paths, txin_type)

        else:
            keystore.handler.show_error(_('This function is only available for standard wallets when using {}.').format(self.device))
627
628
def xfp_int_from_xfp_bytes(fp_bytes: bytes) -> int:
    """Interpret fingerprint bytes as an unsigned little-endian integer."""
    as_int = int.from_bytes(fp_bytes, "little", signed=False)
    return as_int
631
632
def xfp2str(xfp: int) -> str:
    """Render an integer fingerprint as 8 lowercase hex characters.

    The value is packed as a little-endian unsigned 32-bit integer first, so
    the text shows the fingerprint's bytes in order (it is a 4-byte string,
    not really a number; '0x%08x' would show the wrong endianness).
    """
    packed = struct.pack('<I', xfp)
    return packed.hex().lower()
637
638 # EOF