tclientbase.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
tclientbase.py (10111B)
---
1 import time
2 from struct import pack
3 from typing import Optional
4
5 from electrum import ecc
6 from electrum.i18n import _
7 from electrum.util import UserCancelled
8 from electrum.keystore import bip39_normalize_passphrase
9 from electrum.bip32 import BIP32Node, convert_bip32_path_to_list_of_uint32
10 from electrum.logging import Logger
11 from electrum.plugin import runs_in_hwd_thread
12 from electrum.plugins.hw_wallet.plugin import HardwareClientBase, HardwareHandlerBase
13
14
class GuiMixin:
    """Mixin implementing the device-protocol callbacks that need user interaction.

    Each ``callback_*`` method receives a protocol message from the device,
    interacts with the user through ``self.handler`` and returns the protocol
    message to send back.

    The host class must provide: ``self.proto``, ``self.device``,
    ``self.types``, ``self.msg``, ``self.creating_wallet``, ``self.cancel``
    and (for ``callback_WordRequest``) ``self.step``.
    """
    # Requires: self.proto, self.device
    handler: Optional[HardwareHandlerBase]

    # Prompt templates keyed by ButtonRequest code; '{}' is filled in with
    # the device name. 'default' is used for any code that is not listed.
    # ref: https://github.com/trezor/trezor-common/blob/44dfb07cfaafffada4b2ce0d15ba1d90d17cf35e/protob/types.proto#L89
    messages = {
        3: _("Confirm the transaction output on your {} device"),
        4: _("Confirm internal entropy on your {} device to begin"),
        5: _("Write down the seed word shown on your {}"),
        6: _("Confirm on your {} that you want to wipe it clean"),
        7: _("Confirm on your {} device the message to sign"),
        8: _("Confirm the total amount spent and the transaction fee on your "
             "{} device"),
        10: _("Confirm wallet address on your {} device"),
        14: _("Choose on your {} device where to enter your passphrase"),
        'default': _("Check your {} device to continue"),
    }

    def callback_Failure(self, msg):
        """Translate a device Failure message into an exception.

        Raises:
            UserCancelled: if the failure code is one of the "cancelled" codes.
            RuntimeError: for every other failure, carrying ``msg.message``.
        """
        # BaseClient's unfortunate call() implementation forces us to
        # raise exceptions on failure in order to unwind the stack.
        # However, making the user acknowledge they cancelled
        # gets old very quickly, so we suppress those.  The NotInitialized
        # one is misnamed and indicates a passphrase request was cancelled.
        failure_type = self.types.FailureType
        cancelled_codes = (failure_type.PinCancelled,
                           failure_type.ActionCancelled,
                           failure_type.NotInitialized)
        if msg.code in cancelled_codes:
            raise UserCancelled()
        raise RuntimeError(msg.message)

    def callback_ButtonRequest(self, msg):
        """Show the user what to confirm on the device and acknowledge the request.

        An operation-specific ``self.msg`` takes precedence; otherwise the
        prompt is looked up in ``self.messages`` by ``msg.code``.
        """
        message = self.msg
        if not message:
            message = self.messages.get(msg.code, self.messages['default'])
        self.handler.show_message(message.format(self.device), self.cancel)
        return self.proto.ButtonAck()

    def callback_PinMatrixRequest(self, msg):
        """Ask the user for a PIN and return ``PinMatrixAck`` or ``Cancel``.

        ``Cancel`` is returned when no PIN was entered or when the PIN is
        longer than 9 characters (an error is shown in the latter case).
        """
        # NOTE(review): 2 and 3 presumably correspond to the protocol's
        # "new PIN" / "repeat new PIN" request types -- confirm against the
        # device protocol definition.
        if msg.type == 2:
            prompt = _("Enter a new PIN for your {}:")
            show_strength = True
        elif msg.type == 3:
            prompt = (_("Re-enter the new PIN for your {}.\n\n"
                        "NOTE: the positions of the numbers have changed!"))
            show_strength = True
        else:
            prompt = _("Enter your current {} PIN:")
            show_strength = False
        pin = self.handler.get_pin(prompt.format(self.device), show_strength=show_strength)
        # Test for "nothing entered" first so that a handler returning None
        # (rather than '') results in a clean Cancel instead of a TypeError
        # from len(None).
        if not pin:
            return self.proto.Cancel()
        if len(pin) > 9:
            self.handler.show_error(_('The PIN cannot be longer than 9 characters.'))
            return self.proto.Cancel()
        return self.proto.PinMatrixAck(pin=pin)

    def callback_PassphraseRequest(self, req):
        """Obtain the passphrase from the user and return the matching ack.

        Returns an empty ``PassphraseAck`` when the request says the
        passphrase is entered on the device, ``Cancel`` when the user gives
        no passphrase or it exceeds 50 characters, and otherwise a
        ``PassphraseAck`` carrying the normalized passphrase.
        """
        if req and hasattr(req, 'on_device') and req.on_device is True:
            return self.proto.PassphraseAck()

        if self.creating_wallet:
            msg = _("Enter a passphrase to generate this wallet.  Each time "
                    "you use this wallet your {} will prompt you for the "
                    "passphrase.  If you forget the passphrase you cannot "
                    "access the bitcoins in the wallet.").format(self.device)
        else:
            msg = _("Enter the passphrase to unlock this wallet:")
        passphrase = self.handler.get_passphrase(msg, self.creating_wallet)
        if passphrase is None:
            return self.proto.Cancel()
        passphrase = bip39_normalize_passphrase(passphrase)

        # The length is measured on the value stored in the ack message,
        # i.e. after normalization.
        ack = self.proto.PassphraseAck(passphrase=passphrase)
        length = len(ack.passphrase)
        if length > 50:
            self.handler.show_error(_("Too long passphrase ({} > 50 chars).").format(length))
            return self.proto.Cancel()
        return ack

    def callback_PassphraseStateRequest(self, msg):
        """Acknowledge a passphrase-state request with an empty ack."""
        return self.proto.PassphraseStateAck()

    def callback_WordRequest(self, msg):
        """Ask the user for the next seed word and return it as ``WordAck``.

        Increments ``self.step`` so the prompt shows the current step number.
        """
        self.step += 1
        prompt = _("Step {}/24.  Enter seed word as explained on "
                   "your {}:").format(self.step, self.device)
        word = self.handler.get_word(prompt)
        # Unfortunately the device can't handle self.proto.Cancel()
        return self.proto.WordAck(word=word)
103
104
class SafeTClientBase(HardwareClientBase, GuiMixin, Logger):
    """Client-side wrapper around a connected device.

    Adds user-facing prompts (via ``GuiMixin``), idle-timeout bookkeeping and
    convenience operations on top of the underlying protocol client, whose
    methods (``ping``, ``apply_settings``, ``change_pin``, ...) are expected
    to be supplied by another class in the final MRO.
    """

    def __init__(self, handler, plugin, proto):
        """Store the collaborators and mark the client as just used.

        Args:
            handler: object used to interact with the user.
            plugin: owning plugin; provides ``device`` and ``types`` and is
                also stored as ``tx_api``.
            proto: module/namespace holding the protocol message classes.
        """
        assert hasattr(self, 'tx_api')  # ProtocolMixin already constructed?
        HardwareClientBase.__init__(self, plugin=plugin)
        self.proto = proto
        self.device = plugin.device
        self.handler = handler
        self.tx_api = plugin
        self.types = plugin.types
        self.msg = None
        self.creating_wallet = False
        Logger.__init__(self)
        self.used()

    def __str__(self):
        """Return ``"<label>/<device_id>"``."""
        return f"{self.label()}/{self.features.device_id}"

    def label(self):
        """Return the label reported in the device features."""
        return self.features.label

    def get_soft_device_id(self):
        """Return the device id reported in the device features."""
        return self.features.device_id

    def is_initialized(self):
        """Return the ``initialized`` flag from the device features."""
        return self.features.initialized

    def is_pairable(self):
        """Return True unless the device reports bootloader mode."""
        return not self.features.bootloader_mode

    @runs_in_hwd_thread
    def has_usable_connection_with_device(self):
        """Return True if the device echoes a ping correctly, False otherwise.

        Any exception raised while pinging is treated as "no usable
        connection".
        """
        ping_text = "electrum pinging device"
        try:
            # Compare explicitly instead of using ``assert``: assertions are
            # stripped under ``python -O``, which would make a wrong echo
            # look like a healthy connection.
            return bool(self.ping(ping_text) == ping_text)
        except BaseException:
            return False

    def used(self):
        """Record the current time as the moment of the last operation."""
        self.last_operation = time.time()

    def prevent_timeouts(self):
        """Disable the idle timeout.

        With ``last_operation`` set to infinity, the ``last_operation <
        cutoff`` test in ``timeout()`` can never succeed.
        """
        self.last_operation = float('inf')

    @runs_in_hwd_thread
    def timeout(self, cutoff):
        """Time out the client if the last operation was before cutoff."""
        if self.last_operation < cutoff:
            self.logger.info("timed out")
            self.clear_session()

    @staticmethod
    def expand_path(n):
        """Convert a BIP32 path string into a list of uint32 indexes."""
        return convert_bip32_path_to_list_of_uint32(n)

    @runs_in_hwd_thread
    def cancel(self):
        """Provided here as in keepkeylib but not safetlib."""
        self.transport.write(self.proto.Cancel())

    def i4b(self, x):
        """Pack integer ``x`` as 4 bytes, big-endian unsigned."""
        return pack('>I', x)

    @runs_in_hwd_thread
    def get_xpub(self, bip32_path, xtype):
        """Fetch the public node at ``bip32_path`` and return it as an xpub string.

        Args:
            bip32_path: BIP32 derivation path string.
            xtype: extended-key type passed through to ``BIP32Node``.
        """
        address_n = self.expand_path(bip32_path)
        creating = False
        node = self.get_public_node(address_n, creating).node
        bip32_node = BIP32Node(
            xtype=xtype,
            eckey=ecc.ECPubkey(node.public_key),
            chaincode=node.chain_code,
            depth=node.depth,
            # The device reports these as integers; BIP32Node is given 4 bytes.
            fingerprint=self.i4b(node.fingerprint),
            child_number=self.i4b(node.child_num),
        )
        return bip32_node.to_xpub()

    @runs_in_hwd_thread
    def toggle_passphrase(self):
        """Flip passphrase protection on the device, with a matching prompt."""
        currently_enabled = self.features.passphrase_protection
        if currently_enabled:
            self.msg = _("Confirm on your {} device to disable passphrases")
        else:
            self.msg = _("Confirm on your {} device to enable passphrases")
        self.apply_settings(use_passphrase=not currently_enabled)

    @runs_in_hwd_thread
    def change_label(self, label):
        """Set a new device label, prompting the user to confirm on the device."""
        self.msg = _("Confirm the new label on your {} device")
        self.apply_settings(label=label)

    @runs_in_hwd_thread
    def change_homescreen(self, homescreen):
        """Set a new home screen, prompting the user to confirm on the device."""
        self.msg = _("Confirm on your {} device to change your home screen")
        self.apply_settings(homescreen=homescreen)

    @runs_in_hwd_thread
    def set_pin(self, remove):
        """Set, change or remove the device PIN.

        Args:
            remove: passed through to ``change_pin``; when truthy the prompt
                talks about disabling PIN protection.
        """
        if remove:
            self.msg = _("Confirm on your {} device to disable PIN protection")
        elif self.features.pin_protection:
            self.msg = _("Confirm on your {} device to change your PIN")
        else:
            self.msg = _("Confirm on your {} device to set a PIN")
        self.change_pin(remove)

    @runs_in_hwd_thread
    def clear_session(self):
        """Clear the session to force pin (and passphrase if enabled)
        re-entry.  Does not leak exceptions."""
        self.logger.info(f"clear session: {self}")
        self.prevent_timeouts()
        try:
            super().clear_session()
        except BaseException as e:
            # If the device was removed it has the same effect...
            self.logger.info(f"clear_session: ignoring error {e}")

    @runs_in_hwd_thread
    def get_public_node(self, address_n, creating):
        """Fetch a public node, remembering whether a wallet is being created.

        ``creating`` is stored in ``self.creating_wallet``, which selects the
        passphrase prompt shown by ``callback_PassphraseRequest``.
        """
        self.creating_wallet = creating
        return super().get_public_node(address_n)

    @runs_in_hwd_thread
    def close(self):
        """Called when our wallet was closed or the device removed."""
        self.logger.info("closing client")
        self.clear_session()
        # Release the device
        self.transport.close()

    def firmware_version(self):
        """Return the firmware version as a ``(major, minor, patch)`` tuple."""
        f = self.features
        return (f.major_version, f.minor_version, f.patch_version)

    def atleast_version(self, major, minor=0, patch=0):
        """Return True if the firmware version is >= ``(major, minor, patch)``."""
        return self.firmware_version() >= (major, minor, patch)

    @staticmethod
    def wrapper(func):
        """Wrap methods to clear any message box they opened.

        The returned function disables the idle timeout while ``func`` runs
        and, whether or not it raises, afterwards records the use, tells the
        handler the operation finished and resets the per-operation state
        (``creating_wallet`` and ``msg``).
        """
        from functools import wraps

        @wraps(func)  # keep the wrapped method's __name__/__doc__
        def wrapped(self, *args, **kwargs):
            try:
                self.prevent_timeouts()
                return func(self, *args, **kwargs)
            finally:
                self.used()
                self.handler.finished()
                self.creating_wallet = False
                self.msg = None

        return wrapped

    @staticmethod
    def wrap_methods(cls):
        """Replace the listed device-operation methods of ``cls`` with
        versions passed through ``cls.wrapper``."""
        for method in ['apply_settings', 'change_pin',
                       'get_address', 'get_public_node',
                       'load_device_by_mnemonic', 'load_device_by_xprv',
                       'recovery_device', 'reset_device', 'sign_message',
                       'sign_tx', 'wipe_device']:
            setattr(cls, method, cls.wrapper(getattr(cls, method)))