tdigitalbitbox.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
tdigitalbitbox.py (31895B)
---
1 # ----------------------------------------------------------------------------------
2 # Electrum plugin for the Digital Bitbox hardware wallet by Shift Devices AG
3 # digitalbitbox.com
4 #
5
6 import base64
7 import binascii
8 import hashlib
9 import hmac
10 import json
11 import math
12 import os
13 import re
14 import struct
15 import sys
16 import time
17 import copy
18
19 from electrum.crypto import sha256d, EncodeAES_base64, EncodeAES_bytes, DecodeAES_bytes, hmac_oneshot
20 from electrum.bitcoin import public_key_to_p2pkh
21 from electrum.bip32 import BIP32Node, convert_bip32_intpath_to_strpath, is_all_public_derivation
22 from electrum import ecc
23 from electrum.ecc import msg_magic
24 from electrum.wallet import Standard_Wallet
25 from electrum import constants
26 from electrum.transaction import Transaction, PartialTransaction, PartialTxInput
27 from electrum.i18n import _
28 from electrum.keystore import Hardware_KeyStore
29 from electrum.util import to_string, UserCancelled, UserFacingException, bfh
30 from electrum.base_wizard import ScriptTypeNotSupported, HWD_SETUP_NEW_WALLET
31 from electrum.network import Network
32 from electrum.logging import get_logger
33 from electrum.plugin import runs_in_hwd_thread, run_in_hwd_thread
34
35 from ..hw_wallet import HW_PluginBase, HardwareClientBase
36
37
# Module-level logger; used below to record swallowed HID/communication errors.
_logger = get_logger(__name__)
39
40
# The `hid` package is an optional dependency: record whether it could be
# imported so the plugin can report that its libraries are unavailable.
try:
    import hid
except ImportError:
    DIGIBOX = False
else:
    DIGIBOX = True
46
47
48
49 # ----------------------------------------------------------------------------------
50 # USB HID interface
51 #
52
def to_hexstr(s):
    """Return the lowercase hexadecimal representation of the bytes-like *s* as a str."""
    hex_bytes = binascii.b2a_hex(s)
    return str(hex_bytes, 'ascii')
55
56
def derive_keys(x):
    """Derive two 32-byte keys from *x*.

    *x* is hashed with ``sha256d`` and the result with SHA-512; the first
    half of the 64-byte digest and the second half are returned as a tuple.
    """
    digest = hashlib.sha512(sha256d(x)).digest()
    first_half, second_half = digest[:32], digest[32:]
    return (first_half, second_half)
61
# Lowest firmware major version accepted by check_device_dialog().
MIN_MAJOR_VERSION = 5

# Keys of the mobile-pairing entries, both in the plugin's 'digitalbitbox'
# config dict and in the desktop app's config.dat.
ENCRYPTION_PRIVKEY_KEY = 'encryptionprivkey'
CHANNEL_ID_KEY = 'comserverchannelid'
66
class DigitalBitbox_Client(HardwareClientBase):
    """Client talking to one Digital Bitbox over USB HID.

    Commands are JSON documents. ``hid_send_plain`` sends them as-is;
    ``hid_send_encrypt`` AES-encrypts and HMAC-authenticates them with keys
    derived from the device password stored in ``self.password``.
    """

    def __init__(self, plugin, hidDevice):
        """Wrap an already opened HID handle.

        :param plugin: owning plugin, forwarded to ``HardwareClientBase``.
        :param hidDevice: opened HID device used for every read and write.
        """
        HardwareClientBase.__init__(self, plugin=plugin)
        self.dbb_hid = hidDevice
        self.opened = True
        self.password = None  # utf8-encoded device password, None until entered
        self.isInitialized = False
        self.setupRunning = False
        self.usbReportSize = 64  # firmware > v2.0.0

    @runs_in_hwd_thread
    def close(self):
        """Close the HID handle (best effort) and mark the client as closed."""
        if self.opened:
            try:
                self.dbb_hid.close()
            except Exception as e:
                # Closing is best effort; record the failure instead of hiding it.
                _logger.info(f'Exception caught {repr(e)}')
        self.opened = False

    def is_pairable(self):
        """Always True for this client."""
        return True

    def is_initialized(self):
        """Return whether the device reports that a password has been set."""
        return self.dbb_has_password()

    def is_paired(self):
        """Return True once a device password has been stored on this client."""
        return self.password is not None

    def has_usable_connection_with_device(self):
        """Return True if a ping round-trip to the device succeeds."""
        try:
            self.dbb_has_password()
        except Exception:
            return False
        return True

    def _get_xpub(self, bip32_path):
        """Send the raw ``xpub`` command for *bip32_path*.

        Returns the decoded reply, or None when ``check_device_dialog``
        reports that the device is not ready.
        """
        if self.check_device_dialog():
            return self.hid_send_encrypt(('{"xpub": "%s"}' % bip32_path).encode('utf8'))

    def get_xpub(self, bip32_path, xtype):
        """Return the xpub at *bip32_path*, re-encoded as *xtype*.

        :raises Exception: if the device returned no usable reply.
        """
        assert xtype in self.plugin.SUPPORTED_XTYPES
        reply = self._get_xpub(bip32_path)
        if not reply:
            raise Exception('no reply')
        xpub = reply['xpub']
        # Change type of xpub to the requested type. The firmware
        # only ever returns the mainnet standard type, but it is agnostic
        # to the type when signing.
        if xtype != 'standard' or constants.net.TESTNET:
            node = BIP32Node.from_xkey(xpub, net=constants.BitcoinMainnet)
            xpub = node._replace(xtype=xtype).to_xpub()
        return xpub

    def dbb_has_password(self):
        """Ping the device and return True if it answers ``"password"``.

        :raises UserFacingException: if the reply contains no ``ping`` field.
        """
        reply = self.hid_send_plain(b'{"ping":""}')
        if 'ping' not in reply:
            raise UserFacingException(_('Device communication error. Please unplug and replug your Digital Bitbox.'))
        return reply['ping'] == 'password'

    def stretch_key(self, key: bytes):
        """Return the hex-encoded PBKDF2-HMAC-SHA512 stretch of *key*."""
        return to_hexstr(hashlib.pbkdf2_hmac('sha512', key, b'Digital Bitbox', iterations=20480))

    def _ask_password(self, msg):
        """Prompt until the user enters a 4 to 64 character password.

        Returns the password as str, or None if the user cancelled.
        """
        while True:
            password = self.handler.get_passphrase(msg, False)
            if password is None:
                return None
            if len(password) < 4:
                msg = _("Password must have at least 4 characters.") \
                      + "\n\n" + _("Enter password:")
            elif len(password) > 64:
                msg = _("Password must have less than 64 characters.") \
                      + "\n\n" + _("Enter password:")
            else:
                return password

    def backup_password_dialog(self):
        """Ask for a backup password; return it utf8-encoded, or None on cancel."""
        password = self._ask_password(_("Enter the password used when the backup was created:"))
        if password is None:
            return None
        return password.encode('utf8')

    def password_dialog(self, msg):
        """Ask for the device password and store it in ``self.password``.

        Returns True when a password was stored, False if the user cancelled.
        """
        password = self._ask_password(msg)
        if password is None:
            return False
        self.password = password.encode('utf8')
        return True

    def check_device_dialog(self):
        """Verify firmware, obtain the password and, during setup, seed the device.

        Returns ``self.isInitialized``, or False when a fresh device cannot be
        used (no setup running, or the user cancelled the new-password prompt).

        :raises Exception: if the firmware version is unreadable or too old.
        :raises UserCancelled: if the user cancels the password prompt.
        """
        # Capture the whole major number; "([0-9])+" would keep only its last digit.
        match = re.search(r'v([0-9]+)\.[0-9]+\.[0-9]+',
                          run_in_hwd_thread(self.dbb_hid.get_serial_number_string))
        if match is None:
            raise Exception("error detecting firmware version")
        major_version = int(match.group(1))
        if major_version < MIN_MAJOR_VERSION:
            raise Exception("Please upgrade to the newest firmware using the BitBox Desktop app: https://shiftcrypto.ch/start")

        # Set password if fresh device
        if self.password is None and not self.dbb_has_password():
            if not self.setupRunning:
                return False  # A fresh device cannot connect to an existing wallet
            msg = _("An uninitialized Digital Bitbox is detected.") + " " + \
                  _("Enter a new password below.") + "\n\n" + \
                  _("REMEMBER THE PASSWORD!") + "\n\n" + \
                  _("You cannot access your coins or a backup without the password.") + "\n" + \
                  _("A backup is saved automatically when generating a new wallet.")
            if not self.password_dialog(msg):
                return False
            # Serialize through json so quotes or backslashes in the password
            # cannot corrupt the command; other characters stay raw utf8.
            set_password_cmd = json.dumps(
                {"password": self.password.decode('utf8')},
                ensure_ascii=False, separators=(',', ':')).encode('utf8')
            self.hid_send_plain(set_password_cmd)

        # Get password from user if not yet set
        msg = _("Enter your Digital Bitbox password:")
        while self.password is None:
            if not self.password_dialog(msg):
                raise UserCancelled()
            reply = self.hid_send_encrypt(b'{"led":"blink"}')
            if 'error' in reply:
                self.password = None
                if reply['error']['code'] == 109:
                    msg = _("Incorrect password entered.") + "\n\n" + \
                          reply['error']['message'] + "\n\n" + \
                          _("Enter your Digital Bitbox password:")
                else:
                    # Should never occur
                    msg = _("Unexpected error occurred.") + "\n\n" + \
                          reply['error']['message'] + "\n\n" + \
                          _("Enter your Digital Bitbox password:")

        # Initialize device if not yet initialized
        if not self.setupRunning:
            self.isInitialized = True  # Wallet exists. Electrum code later checks if the device matches the wallet
        elif not self.isInitialized:
            reply = self.hid_send_encrypt(b'{"device":"info"}')
            if reply['device']['id'] != "":
                self.recover_or_erase_dialog()  # Already seeded
            else:
                self.seed_device_dialog()  # Seed if not initialized
            self.mobile_pairing_dialog()
        return self.isInitialized

    def recover_or_erase_dialog(self):
        """Let the user keep, replace or erase the seed of an already seeded device."""
        msg = _("The Digital Bitbox is already seeded. Choose an option:") + "\n"
        choices = [
            (_("Create a wallet using the current seed")),
            (_("Load a wallet from the micro SD card (the current seed is overwritten)")),
            (_("Erase the Digital Bitbox"))
        ]
        reply = self.handler.query_choice(msg, choices)
        if reply is None:
            return  # user cancelled
        if reply == 2:
            self.dbb_erase()
        elif reply == 1:
            if not self.dbb_load_backup():
                return
        else:
            if self.hid_send_encrypt(b'{"device":"info"}')['device']['lock']:
                raise UserFacingException(_("Full 2FA enabled. This is not supported yet."))
            # Use existing seed
        self.isInitialized = True

    def seed_device_dialog(self):
        """Let the user seed an unseeded device, randomly or from a backup."""
        msg = _("Choose how to initialize your Digital Bitbox:") + "\n"
        choices = [
            (_("Generate a new random wallet")),
            (_("Load a wallet from the micro SD card"))
        ]
        reply = self.handler.query_choice(msg, choices)
        if reply is None:
            return  # user cancelled
        if reply == 0:
            self.dbb_generate_wallet()
        else:
            if not self.dbb_load_backup(show_msg=False):
                return
        self.isInitialized = True

    def mobile_pairing_dialog(self):
        """Offer to import (or drop) the mobile pairing of the desktop app.

        Returns silently when the desktop app's ``config.dat`` cannot be
        located, read or parsed, or when it lacks the pairing entries.
        """
        if sys.platform == 'darwin':
            base_dir = os.environ.get("HOME")
            sub_path = ("Library", "Application Support", "DBB")
        elif sys.platform == 'win32':
            base_dir = os.environ.get("APPDATA")
            sub_path = ("DBB",)
        else:
            base_dir = os.environ.get("HOME")
            sub_path = (".dbb",)
        if not base_dir:
            # Without the base directory the config cannot be located.
            return
        dbb_user_dir = os.path.join(base_dir, *sub_path)

        try:
            with open(os.path.join(dbb_user_dir, "config.dat")) as f:
                dbb_config = json.load(f)
        except (OSError, ValueError):
            # Missing/unreadable file or invalid JSON (JSONDecodeError is a ValueError).
            return

        if not isinstance(dbb_config, dict):
            return
        if ENCRYPTION_PRIVKEY_KEY not in dbb_config or CHANNEL_ID_KEY not in dbb_config:
            return

        choices = [
            _('Do not pair'),
            _('Import pairing from the Digital Bitbox desktop app'),
        ]
        reply = self.handler.query_choice(_('Mobile pairing options'), choices)
        if reply is None:
            return  # user cancelled

        plugin_config = self.plugin.digitalbitbox_config
        if reply == 0:
            if self.plugin.is_mobile_paired():
                # pop() tolerates a config that holds only one of the two entries
                plugin_config.pop(ENCRYPTION_PRIVKEY_KEY, None)
                plugin_config.pop(CHANNEL_ID_KEY, None)
        elif reply == 1:
            # import pairing from dbb app
            plugin_config[ENCRYPTION_PRIVKEY_KEY] = dbb_config[ENCRYPTION_PRIVKEY_KEY]
            plugin_config[CHANNEL_ID_KEY] = dbb_config[CHANNEL_ID_KEY]
        self.plugin.config.set_key('digitalbitbox', plugin_config)

    def dbb_generate_wallet(self):
        """Ask the device to create a new seed, mixing in 32 bytes of host entropy.

        :raises UserFacingException: with the device's message on error.
        """
        key = self.stretch_key(self.password)
        filename = ("Electrum-" + time.strftime("%Y-%m-%d-%H-%M-%S") + ".pdf")
        msg = ('{"seed":{"source": "create", "key": "%s", "filename": "%s", "entropy": "%s"}}' % (key, filename, to_hexstr(os.urandom(32)))).encode('utf8')
        reply = self.hid_send_encrypt(msg)
        if 'error' in reply:
            raise UserFacingException(reply['error']['message'])

    def dbb_erase(self):
        """Send the erase command; always ends by raising.

        :raises UserFacingException: with the device's error message, or
            ``'Device erased'`` after a successful erase.
        """
        self.handler.show_message(_("Are you sure you want to erase the Digital Bitbox?") + "\n\n" +
                                  _("To continue, touch the Digital Bitbox's light for 3 seconds.") + "\n\n" +
                                  _("To cancel, briefly touch the light or wait for the timeout."))
        try:
            hid_reply = self.hid_send_encrypt(b'{"reset":"__ERASE__"}')
        finally:
            # Always dismiss the message, even if sending fails unexpectedly.
            self.handler.finished()
        if 'error' in hid_reply:
            raise UserFacingException(hid_reply['error']['message'])
        self.password = None
        raise UserFacingException('Device erased')

    def dbb_load_backup(self, show_msg=True):
        """Load a seed from a backup file chosen by the user.

        :param show_msg: show the "touch the light" instructions while loading.
        :returns: True on success, False if the user cancelled the file choice.
        :raises UserFacingException: with the device's message on error.
        :raises Exception: if the backup password prompt is cancelled.
        """
        backups = self.hid_send_encrypt(b'{"backup":"list"}')
        if 'error' in backups:
            raise UserFacingException(backups['error']['message'])
        f = self.handler.query_choice(_("Choose a backup file:"), backups['backup'])
        if f is None:
            return False  # user cancelled
        key = self.backup_password_dialog()
        if key is None:
            raise Exception('Canceled by user')
        key = self.stretch_key(key)
        if show_msg:
            self.handler.show_message(_("Loading backup...") + "\n\n" +
                                      _("To continue, touch the Digital Bitbox's light for 3 seconds.") + "\n\n" +
                                      _("To cancel, briefly touch the light or wait for the timeout."))
        msg = ('{"seed":{"source": "backup", "key": "%s", "filename": "%s"}}' % (key, backups['backup'][f])).encode('utf8')
        try:
            hid_reply = self.hid_send_encrypt(msg)
        finally:
            self.handler.finished()
        if 'error' in hid_reply:
            raise UserFacingException(hid_reply['error']['message'])
        return True

    @runs_in_hwd_thread
    def hid_send_frame(self, data):
        """Write *data* to the device as one INIT frame plus CONT frames.

        Every frame is padded with 0xEE bytes up to ``self.usbReportSize``.
        """
        HWW_CID = 0xFF000000
        HWW_CMD = 0x80 + 0x40 + 0x01
        data_len = len(data)
        init_capacity = self.usbReportSize - 7  # payload bytes in the INIT frame
        cont_capacity = self.usbReportSize - 5  # payload bytes in each CONT frame
        seq = 0
        idx = 0
        while idx < data_len:
            if idx == 0:
                # INIT frame
                capacity = init_capacity
                header = struct.pack(">IBH", HWW_CID, HWW_CMD, data_len & 0xFFFF)
            else:
                # CONT frame
                capacity = cont_capacity
                header = struct.pack(">IB", HWW_CID, seq)
                seq += 1
            chunk = data[idx:idx + capacity]
            padding = b'\xEE' * (capacity - len(chunk))
            self.dbb_hid.write(b'\0' + header + chunk + padding)
            idx += len(chunk)

    @runs_in_hwd_thread
    def hid_read_frame(self):
        """Read one framed reply and return its payload as a bytearray.

        The payload of the last frame is returned untrimmed.

        :raises OSError: if a frame is too short to carry header and payload.
        """
        # INIT response
        read = bytearray(self.dbb_hid.read(self.usbReportSize))
        if len(read) < 7:
            raise OSError("short INIT frame received from device")
        data_len = read[5] * 256 + read[6]
        data = read[7:]
        idx = len(read) - 7
        while idx < data_len:
            # CONT response
            read = bytearray(self.dbb_hid.read(self.usbReportSize))
            if len(read) <= 5:
                # No payload would be added; bail out instead of looping forever.
                raise OSError("short CONT frame received from device")
            data += read[5:]
            idx += len(read) - 5
        return data

    @runs_in_hwd_thread
    def hid_send_plain(self, msg):
        """Send the bytes *msg* unencrypted and return the JSON-decoded reply.

        Any failure is logged and results in ``""`` being returned.
        """
        reply = ""
        try:
            serial_number = self.dbb_hid.get_serial_number_string()
            if "v2.0." in serial_number or "v1." in serial_number:
                hidBufSize = 4096
                # msg is bytes, so prefix and padding must be bytes too.
                self.dbb_hid.write(b'\0' + msg + b'\0' * (hidBufSize - len(msg)))
                r = bytearray()
                while len(r) < hidBufSize:
                    chunk = self.dbb_hid.read(hidBufSize)
                    if not chunk:
                        raise OSError("no data received from device")
                    r += bytearray(chunk)
            else:
                self.hid_send_frame(msg)
                r = self.hid_read_frame()
            r = r.rstrip(b' \t\r\n\0')
            r = r.replace(b"\0", b'')
            r = to_string(r, 'utf8')
            reply = json.loads(r)
        except Exception as e:
            _logger.info(f'Exception caught {repr(e)}')
        return reply

    @runs_in_hwd_thread
    def hid_send_encrypt(self, msg):
        """Send *msg* encrypted and authenticated; return the decoded reply.

        A ``ciphertext`` reply is HMAC-checked, decrypted and JSON-decoded.
        If the reply carries an ``error`` entry, ``self.password`` is reset.
        Any failure is logged and whatever reply was obtained so far
        (``""`` if none) is returned.
        """
        sha256_byte_len = 32
        reply = ""
        try:
            encryption_key, authentication_key = derive_keys(self.password)
            encrypted = EncodeAES_bytes(encryption_key, msg)
            hmac_digest = hmac_oneshot(authentication_key, encrypted, hashlib.sha256)
            authenticated_msg = base64.b64encode(encrypted + hmac_digest)
            reply = self.hid_send_plain(authenticated_msg)
            if 'ciphertext' in reply:
                b64_unencoded = bytes(base64.b64decode(''.join(reply["ciphertext"])))
                ciphertext = b64_unencoded[:-sha256_byte_len]
                reply_hmac = b64_unencoded[-sha256_byte_len:]
                hmac_calculated = hmac_oneshot(authentication_key, ciphertext, hashlib.sha256)
                if not hmac.compare_digest(reply_hmac, hmac_calculated):
                    raise Exception("Failed to validate HMAC")
                reply = DecodeAES_bytes(encryption_key, ciphertext)
                reply = to_string(reply, 'utf8')
                reply = json.loads(reply)
            if 'error' in reply:
                self.password = None
        except Exception as e:
            _logger.info(f'Exception caught {repr(e)}')
        return reply
430
431
432
433 # ----------------------------------------------------------------------------------
434 #
435 #
436
class DigitalBitbox_KeyStore(Hardware_KeyStore):
    """Keystore that signs messages and transactions on a Digital Bitbox."""

    hw_type = 'digitalbitbox'
    device = 'DigitalBitbox'

    plugin: 'DigitalBitboxPlugin'

    def __init__(self, d):
        Hardware_KeyStore.__init__(self, d)
        self.force_watching_only = False
        self.maxInputs = 14  # maximum inputs per single sign command

    def give_error(self, message, clear_client=False):
        """Raise ``Exception(message)``, optionally dropping the cached client first."""
        if clear_client:
            self.client = None
        raise Exception(message)

    def decrypt_message(self, pubkey, message, password):
        """Not supported by this device; always raises RuntimeError."""
        raise RuntimeError(_('Encryption and decryption are currently not supported for {}').format(self.device))

    def sign_message(self, sequence, message, password):
        """Sign the text *message* with the key at ``<derivation prefix>/<sequence>``.

        :param sequence: pair of ints appended to the derivation prefix.
        :param message: text to sign.
        :param password: unused.
        :returns: the 65-byte signature built by ``ecc.construct_sig65``.
        :raises UserCancelled: if the device reports abort codes 600/601,
            or if it is raised while obtaining the client.
        :raises Exception: on any other failure.
        """
        sig = None
        try:
            message = message.encode('utf8')
            inputPath = self.get_derivation_prefix() + "/%d/%d" % sequence
            msg_hash = sha256d(msg_magic(message))
            hasharray = json.dumps([{'hash': to_hexstr(msg_hash), 'keypath': inputPath}])

            msg = ('{"sign":{"meta":"sign message", "data":%s}}' % hasharray).encode('utf8')

            dbb_client = self.plugin.get_client(self)

            if not dbb_client.is_paired():
                raise Exception(_("Could not sign message."))

            # Send twice, first returns an echo for smart verification (not implemented)
            dbb_client.hid_send_encrypt(msg)
            self.handler.show_message(_("Signing message ...") + "\n\n" +
                                      _("To continue, touch the Digital Bitbox's blinking light for 3 seconds.") + "\n\n" +
                                      _("To cancel, briefly touch the blinking light or wait for the timeout."))
            try:
                reply = dbb_client.hid_send_encrypt(msg)
            finally:
                self.handler.finished()

            if 'error' in reply:
                if reply['error'].get('code') in (600, 601):
                    # aborted via LED short touch or timeout
                    raise UserCancelled()
                raise Exception(reply['error']['message'])

            if 'sign' not in reply:
                raise Exception(_("Could not sign message."))

            signed = reply['sign'][0]
            if 'recid' in signed:
                # firmware > v2.1.1
                sig_string = binascii.unhexlify(signed['sig'])
                recid = int(signed['recid'], 16)
                sig = ecc.construct_sig65(sig_string, recid, True)
                pubkey, compressed = ecc.ECPubkey.from_signature65(sig, msg_hash)
                addr = public_key_to_p2pkh(pubkey.get_public_key_bytes(compressed=compressed))
                if ecc.verify_message_with_address(addr, sig, message) is False:
                    raise Exception(_("Could not sign message"))
            elif 'pubkey' in signed:
                # firmware <= v2.1.1: no recovery id in the reply, so try all
                # four until one verifies against the reported pubkey.
                sig_string = binascii.unhexlify(signed['sig'])
                for recid in range(4):
                    sig = ecc.construct_sig65(sig_string, recid, True)
                    try:
                        addr = public_key_to_p2pkh(binascii.unhexlify(signed['pubkey']))
                        if ecc.verify_message_with_address(addr, sig, message):
                            break
                    except Exception:
                        continue
                else:
                    raise Exception(_("Could not sign message"))
            else:
                # Neither reply format matched: fail instead of returning None.
                raise Exception(_("Could not sign message"))

        except UserCancelled:
            raise
        except Exception as e:
            self.give_error(e)
        return sig

    def sign_transaction(self, tx, password):
        """Sign every input of the partial transaction *tx* in place.

        Inputs are sent to the device in batches of ``self.maxInputs``; each
        batch is sent twice (echo, then signing after the user touches the
        device). Returns immediately if *tx* is already complete.

        :param password: unused.
        :raises UserCancelled: if the device reports abort codes 600/601,
            or if it is raised while obtaining the client.
        :raises Exception: on any other failure (the cached client is dropped).
        """
        if tx.is_complete():
            return

        try:
            p2pkhTransaction = True
            inputhasharray = []
            hasharray = []
            pubkeyarray = []

            # Build hasharray from inputs
            for i, txin in enumerate(tx.inputs()):
                if txin.is_coinbase_input():
                    self.give_error("Coinbase not supported")  # should never happen

                if txin.script_type != 'p2pkh':
                    p2pkhTransaction = False

                my_pubkey, inputPath = self.find_my_pubkey_in_txinout(txin)
                if not inputPath:
                    self.give_error("No matching pubkey for sign_transaction")  # should never happen
                inputPath = convert_bip32_intpath_to_strpath(inputPath)
                inputHash = sha256d(bfh(tx.serialize_preimage(i)))
                hasharray.append({'hash': to_hexstr(inputHash), 'keypath': inputPath})
                inputhasharray.append(inputHash)

            # Build pubkeyarray from outputs
            for txout in tx.outputs():
                assert txout.address
                if txout.is_change:
                    changePubkey, changePath = self.find_my_pubkey_in_txinout(txout)
                    assert changePath
                    changePath = convert_bip32_intpath_to_strpath(changePath)
                    pubkeyarray.append({'pubkey': changePubkey.hex(), 'keypath': changePath})

            # Special serialization of the unsigned transaction for
            # the mobile verification app.
            # At the moment, verification only works for p2pkh transactions.
            if p2pkhTransaction:
                tx_copy = copy.deepcopy(tx)

                # monkey-patch method of tx_copy instance to change serialization
                def input_script(self, txin: PartialTxInput, *, estimate_size=False):
                    if txin.script_type == 'p2pkh':
                        return Transaction.get_preimage_script(txin)
                    raise Exception("unsupported type %s" % txin.script_type)

                tx_copy.input_script = input_script.__get__(tx_copy, PartialTransaction)
                tx_dbb_serialized = tx_copy.serialize_to_network()
                # Identical for every batch, so hash it once.
                tx_meta = to_hexstr(sha256d(tx_dbb_serialized))
            else:
                # We only need this for the signing echo / verification.
                tx_dbb_serialized = None
                tx_meta = None

            # Build sign command
            dbb_signatures = []
            steps = math.ceil(len(hasharray) / self.maxInputs)
            for step in range(steps):
                hashes = hasharray[step * self.maxInputs: (step + 1) * self.maxInputs]

                request = {
                    "sign": {
                        "data": hashes,
                        "checkpub": pubkeyarray,
                    },
                }
                if tx_meta is not None:
                    request["sign"]["meta"] = tx_meta
                msg = json.dumps(request).encode('ascii')
                dbb_client = self.plugin.get_client(self)

                if not dbb_client.is_paired():
                    raise Exception("Could not sign transaction.")

                reply = dbb_client.hid_send_encrypt(msg)
                if 'error' in reply:
                    raise Exception(reply['error']['message'])

                if 'echo' not in reply:
                    raise Exception("Could not sign transaction.")

                if self.plugin.is_mobile_paired() and tx_dbb_serialized is not None:
                    reply['tx'] = tx_dbb_serialized
                    self.plugin.comserver_post_notification(reply)

                if steps > 1:
                    self.handler.show_message(_("Signing large transaction. Please be patient ...") + "\n\n" +
                                              _("To continue, touch the Digital Bitbox's blinking light for 3 seconds.") + " " +
                                              _("(Touch {} of {})").format((step + 1), steps) + "\n\n" +
                                              _("To cancel, briefly touch the blinking light or wait for the timeout.") + "\n\n")
                else:
                    self.handler.show_message(_("Signing transaction...") + "\n\n" +
                                              _("To continue, touch the Digital Bitbox's blinking light for 3 seconds.") + "\n\n" +
                                              _("To cancel, briefly touch the blinking light or wait for the timeout."))

                # Send twice, first returns an echo for smart verification
                try:
                    reply = dbb_client.hid_send_encrypt(msg)
                finally:
                    self.handler.finished()

                if 'error' in reply:
                    if reply["error"].get('code') in (600, 601):
                        # aborted via LED short touch or timeout
                        raise UserCancelled()
                    raise Exception(reply['error']['message'])

                if 'sign' not in reply:
                    raise Exception("Could not sign transaction.")

                dbb_signatures.extend(reply['sign'])

            # Fill signatures
            if len(dbb_signatures) != len(tx.inputs()):
                raise Exception("Incorrect number of transactions signed.")  # Should never occur
            for i, txin in enumerate(tx.inputs()):
                signed = dbb_signatures[i]
                for pubkey_bytes in txin.pubkeys:
                    if txin.is_complete():
                        break
                    if 'recid' in signed:
                        # firmware > v2.1.1
                        recid = int(signed['recid'], 16)
                        s = binascii.unhexlify(signed['sig'])
                        h = inputhasharray[i]
                        pk = ecc.ECPubkey.from_sig_string(s, recid, h)
                        pk = pk.get_public_key_hex(compressed=True)
                    elif 'pubkey' in signed:
                        # firmware <= v2.1.1
                        pk = signed['pubkey']
                    else:
                        # Without this, `pk` would be undefined or left over
                        # from a previous input.
                        raise Exception("Could not sign transaction.")
                    if pk != pubkey_bytes.hex():
                        continue
                    sig_r = int(signed['sig'][:64], 16)
                    sig_s = int(signed['sig'][64:], 16)
                    sig = ecc.der_sig_from_r_and_s(sig_r, sig_s)
                    sig = to_hexstr(sig) + '01'
                    tx.add_signature_to_txin(txin_idx=i, signing_pubkey=pubkey_bytes.hex(), sig=sig)
        except UserCancelled:
            raise
        except Exception as e:
            self.give_error(e, True)
        else:
            _logger.info(f"Transaction is_complete {tx.is_complete()}")
659
660
class DigitalBitboxPlugin(HW_PluginBase):
    """Electrum hardware-wallet plugin for the Digital Bitbox."""

    libraries_available = DIGIBOX
    keystore_class = DigitalBitbox_KeyStore
    client = None
    DEVICE_IDS = [
        (0x03eb, 0x2402)  # Digital Bitbox
    ]
    SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')

    def __init__(self, parent, config, name):
        HW_PluginBase.__init__(self, parent, config, name)
        if self.libraries_available:
            self.device_manager().register_devices(self.DEVICE_IDS, plugin=self)

        # Mobile pairing data, persisted under the 'digitalbitbox' config key.
        self.digitalbitbox_config = self.config.get('digitalbitbox', {})

    @runs_in_hwd_thread
    def get_dbb_device(self, device):
        """Open and return the HID handle located at ``device.path``."""
        dev = hid.device()
        dev.open_path(device.path)
        return dev

    def create_client(self, device, handler):
        """Return a ``DigitalBitbox_Client`` for *device*.

        Returns None when *device* is neither interface 0 nor on usage
        page 0xffff. A truthy *handler* is stored on the plugin.
        """
        if not (device.interface_number == 0 or device.usage_page == 0xffff):
            return None
        if handler:
            self.handler = handler
        client = self.get_dbb_device(device)
        if client is not None:
            client = DigitalBitbox_Client(self, client)
        return client

    def setup_device(self, device_info, wizard, purpose):
        """Create the client for the wizard and request the ``m/44'/0'`` xpub.

        When *purpose* is ``HWD_SETUP_NEW_WALLET`` the client is flagged as
        being in setup before the request is made.
        """
        device_id = device_info.device.id_
        client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
        if purpose == HWD_SETUP_NEW_WALLET:
            client.setupRunning = True
        wizard.run_task_without_blocking_gui(
            task=lambda: client.get_xpub("m/44'/0'", 'standard'))
        return client

    def is_mobile_paired(self):
        """Return True if an encryption key for mobile pairing is configured."""
        return ENCRYPTION_PRIVKEY_KEY in self.digitalbitbox_config

    def comserver_post_notification(self, payload):
        """Encrypt *payload* with the pairing key and POST it to the verification server.

        Network errors are shown through ``self.handler`` rather than raised.
        """
        assert self.is_mobile_paired(), "unexpected mobile pairing error"
        url = 'https://digitalbitbox.com/smartverification/index.php'
        key_s = base64.b64decode(self.digitalbitbox_config[ENCRYPTION_PRIVKEY_KEY])
        encrypted_payload = EncodeAES_base64(key_s, json.dumps(payload).encode('ascii')).decode('ascii')
        # NOTE(review): the base64 payload is not percent-encoded although the
        # body is declared form-urlencoded; confirm what the server expects.
        args = 'c=data&s=0&dt=0&uuid=%s&pl=%s' % (
            self.digitalbitbox_config[CHANNEL_ID_KEY],
            encrypted_payload,
        )
        try:
            text = Network.send_http_on_proxy('post', url, body=args.encode('ascii'), headers={'content-type': 'application/x-www-form-urlencoded'})
            _logger.info(f'digitalbitbox reply from server {text}')
        except Exception as e:
            self.handler.show_error(repr(e))  # repr because str(Exception()) == ''

    def get_xpub(self, device_id, derivation, xtype, wizard):
        """Return the xpub for *derivation* from the device identified by *device_id*.

        :raises ScriptTypeNotSupported: if *xtype* is not in ``SUPPORTED_XTYPES``.
        :raises Exception: if *derivation* contains no hardened step.
        """
        if xtype not in self.SUPPORTED_XTYPES:
            raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device))
        if is_all_public_derivation(derivation):
            raise Exception(f"The {self.device} does not reveal xpubs corresponding to non-hardened paths. (path: {derivation})")
        client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
        client.check_device_dialog()
        return client.get_xpub(derivation, xtype)

    def get_client(self, keystore, force_pair=True, *,
                   devices=None, allow_user_interaction=True):
        """Return the client for *keystore*, running the device dialog on it first."""
        client = super().get_client(keystore, force_pair,
                                    devices=devices,
                                    allow_user_interaction=allow_user_interaction)
        if client is not None:
            client.check_device_dialog()
        return client

    def show_address(self, wallet, address, keystore=None):
        """Push *address* to the paired mobile device for verification.

        Only standard wallets with p2pkh addresses and a configured mobile
        pairing are supported; otherwise an error is shown via the keystore's
        handler and nothing is sent.
        """
        if keystore is None:
            keystore = wallet.get_keystore()
        if not self.show_address_helper(wallet, address, keystore):
            return
        if type(wallet) is not Standard_Wallet:
            keystore.handler.show_error(_('This function is only available for standard wallets when using {}.').format(self.device))
            return
        if not self.is_mobile_paired():
            keystore.handler.show_error(_('This function is only available after pairing your {} with a mobile device.').format(self.device))
            return
        if wallet.get_txin_type(address) != 'p2pkh':
            keystore.handler.show_error(_('This function is only available for p2pkh keystores when using {}.').format(self.device))
            return
        change, index = wallet.get_address_index(address)
        keypath = '%s/%d/%d' % (keystore.get_derivation_prefix(), change, index)
        xpub = self.get_client(keystore)._get_xpub(keypath)
        # _get_xpub yields None or "" when the device is not ready or the
        # exchange failed; indexing such a reply would raise TypeError.
        if not xpub or 'echo' not in xpub:
            keystore.handler.show_error(_('Device communication error. Please unplug and replug your Digital Bitbox.'))
            return
        verify_request_payload = {
            "type": 'p2pkh',
            "echo": xpub['echo'],
        }
        self.comserver_post_notification(verify_request_payload)