tstorage.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
tstorage.py (7860B)
---
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2015 Thomas Voegtlin
5 #
6 # Permission is hereby granted, free of charge, to any person
7 # obtaining a copy of this software and associated documentation files
8 # (the "Software"), to deal in the Software without restriction,
9 # including without limitation the rights to use, copy, modify, merge,
10 # publish, distribute, sublicense, and/or sell copies of the Software,
11 # and to permit persons to whom the Software is furnished to do so,
12 # subject to the following conditions:
13 #
14 # The above copyright notice and this permission notice shall be
15 # included in all copies or substantial portions of the Software.
16 #
17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 # SOFTWARE.
25 import os
26 import threading
27 import stat
28 import hashlib
29 import base64
30 import zlib
31 from enum import IntEnum
32
33 from . import ecc
34 from .util import (profiler, InvalidPassword, WalletFileException, bfh, standardize_path,
35 test_read_write_permissions)
36
37 from .wallet_db import WalletDB
38 from .logging import Logger
39
40
def get_derivation_used_for_hw_device_encryption():
    """Return the fixed derivation path string for hw-device storage encryption.

    The path has two levels below the master node; each level is the
    big-endian integer value of a short ASCII tag, written with a trailing
    apostrophe.
    """
    purpose_level = "4541509'"    # ascii 'ELE' as decimal ("BIP43 purpose")
    magic_level = "1112098098'"   # ascii 'BIE2' as decimal
    return "/".join(("m", purpose_level, magic_level))
45
46
class StorageEncryptionVersion(IntEnum):
    """Identifies which encryption scheme (if any) protects a storage file."""

    # No encryption: the file content is stored as-is.
    PLAINTEXT = 0
    # Encrypted; the password is provided by the user.
    USER_PASSWORD = 1
    # Encrypted; the password is derived from an xpub (used with hw wallets).
    XPUB_PASSWORD = 2
51
52
class StorageReadWriteError(Exception):
    """Signals that the storage file could not be read from or written to."""
54
55
56 # TODO: Rename to Storage
class WalletStorage(Logger):
    """Wallet file on disk, with optional encryption of its content.

    Attributes:
        path: standardized path of the storage file.
        raw: file content exactly as read from disk ('' if the file is new).
        decrypted: decrypted content; only meaningful once ``decrypt`` ran.
        pubkey: hex public key used to encrypt on write, or None.
    """

    def __init__(self, path):
        """Open the storage at ``path`` and detect its encryption version.

        Raises:
            StorageReadWriteError: if the read/write permission test on the
                path fails with an IOError.
        """
        Logger.__init__(self)
        self.path = standardize_path(path)
        self._file_exists = bool(self.path and os.path.exists(self.path))
        self.logger.info(f"wallet path {self.path}")
        self.pubkey = None
        self.decrypted = ''
        try:
            test_read_write_permissions(self.path)
        except IOError as e:
            raise StorageReadWriteError(e) from e
        if self.file_exists():
            with open(self.path, "r", encoding='utf-8') as f:
                self.raw = f.read()
            self._encryption_version = self._init_encryption_version()
        else:
            self.raw = ''
            self._encryption_version = StorageEncryptionVersion.PLAINTEXT

    def read(self):
        """Return the decrypted content if encryption is enabled, else the raw content."""
        return self.decrypted if self.is_encrypted() else self.raw

    def write(self, data: str) -> None:
        """Encrypt ``data`` if needed and atomically replace the storage file.

        The content is first written and fsynced to a per-process temp file
        next to the target, which is then moved over the target with
        ``os.replace``. The permission bits of an existing file are kept; a
        new file gets owner read/write only.

        Raises:
            AssertionError: if this storage believes the file does not exist
                yet, but a file is found at the target path.
        """
        s = self.encrypt_before_writing(data)
        temp_path = "%s.tmp.%s" % (self.path, os.getpid())
        with open(temp_path, "w", encoding='utf-8') as f:
            f.write(s)
            f.flush()
            os.fsync(f.fileno())

        try:
            mode = os.stat(self.path).st_mode
        except FileNotFoundError:
            mode = stat.S_IREAD | stat.S_IWRITE

        # Make sure the wallet file does not exist, to prevent wallet corruption
        # (see issue #5082). Raised explicitly instead of via an `assert`
        # statement so that the check is not stripped when running with -O.
        if not self.file_exists() and os.path.exists(self.path):
            raise AssertionError(f"wallet file unexpectedly exists: {self.path}")
        os.replace(temp_path, self.path)
        os.chmod(self.path, mode)
        self._file_exists = True
        self.logger.info(f"saved {self.path}")

    def file_exists(self) -> bool:
        """Return whether the storage file existed at init or has been written since."""
        return self._file_exists

    def is_past_initial_decryption(self) -> bool:
        """Return if storage is in a usable state for normal operations.

        The value is True exactly
        if encryption is disabled completely (self.is_encrypted() == False),
        or if encryption is enabled but the contents have already been decrypted.
        """
        return not self.is_encrypted() or bool(self.pubkey)

    def is_encrypted(self) -> bool:
        """Return if storage encryption is currently enabled."""
        return self.get_encryption_version() != StorageEncryptionVersion.PLAINTEXT

    def is_encrypted_with_user_pw(self) -> bool:
        """Return if the storage is encrypted with a user-provided password."""
        return self.get_encryption_version() == StorageEncryptionVersion.USER_PASSWORD

    def is_encrypted_with_hw_device(self) -> bool:
        """Return if the storage is encrypted with an xpub-derived password."""
        return self.get_encryption_version() == StorageEncryptionVersion.XPUB_PASSWORD

    def get_encryption_version(self):
        """Return the version of encryption used for this storage.

        0: plaintext / no encryption

        ECIES, private key derived from a password,
        1: password is provided by user
        2: password is derived from an xpub; used with hw wallets
        """
        return self._encryption_version

    def _init_encryption_version(self):
        """Detect the encryption version from the magic bytes of ``self.raw``.

        The raw content is base64-decoded and its first four bytes compared
        against the known magics. Content that cannot be decoded is treated
        as plaintext.
        """
        # Only the decode can fail; keep the try body minimal and do not
        # swallow KeyboardInterrupt / SystemExit.
        try:
            magic = base64.b64decode(self.raw)[0:4]
        except Exception:
            return StorageEncryptionVersion.PLAINTEXT
        if magic == b'BIE1':
            return StorageEncryptionVersion.USER_PASSWORD
        if magic == b'BIE2':
            return StorageEncryptionVersion.XPUB_PASSWORD
        return StorageEncryptionVersion.PLAINTEXT

    @staticmethod
    def get_eckey_from_password(password):
        """Derive the EC private key for ``password``.

        The secret is PBKDF2-HMAC-SHA512 of the UTF-8 encoded password with
        an empty salt and 1024 iterations.
        """
        secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024)
        ec_key = ecc.ECPrivkey.from_arbitrary_size_secret(secret)
        return ec_key

    def _get_encryption_magic(self):
        """Return the magic bytes matching the current encryption version.

        Raises:
            WalletFileException: if the current version has no magic
                (i.e. it is not one of the encrypted versions).
        """
        v = self._encryption_version
        if v == StorageEncryptionVersion.USER_PASSWORD:
            return b'BIE1'
        elif v == StorageEncryptionVersion.XPUB_PASSWORD:
            return b'BIE2'
        else:
            raise WalletFileException('no encryption magic for version: %s' % v)

    def decrypt(self, password) -> None:
        """Decrypt the raw content with ``password`` and remember the public key.

        Does nothing if the storage is already usable (not encrypted, or
        already decrypted). On success ``self.decrypted`` holds the plaintext
        and ``self.pubkey`` the hex public key of the derived key.
        """
        if self.is_past_initial_decryption():
            return
        ec_key = self.get_eckey_from_password(password)
        if self.raw:
            enc_magic = self._get_encryption_magic()
            # NOTE(review): presumably decrypt_message raises on a wrong
            # password -- confirm against the ecc module.
            s = zlib.decompress(ec_key.decrypt_message(self.raw, enc_magic))
            s = s.decode('utf8')
        else:
            s = ''
        self.pubkey = ec_key.get_public_key_hex()
        self.decrypted = s

    def encrypt_before_writing(self, plaintext: str) -> str:
        """Return the string to store on disk for ``plaintext``.

        If a public key is set, the text is compressed and encrypted to that
        key; otherwise it is returned unchanged.
        """
        s = plaintext
        if self.pubkey:
            s = bytes(s, 'utf8')
            c = zlib.compress(s, level=zlib.Z_BEST_SPEED)
            enc_magic = self._get_encryption_magic()
            public_key = ecc.ECPubkey(bfh(self.pubkey))
            s = public_key.encrypt_message(c, enc_magic)
            s = s.decode('utf8')
        return s

    def check_password(self, password) -> None:
        """Raises an InvalidPassword exception on invalid password"""
        if not self.is_encrypted():
            return
        if not self.is_past_initial_decryption():
            self.decrypt(password)  # this sets self.pubkey
        assert self.pubkey is not None
        if self.pubkey != self.get_eckey_from_password(password).get_public_key_hex():
            raise InvalidPassword()

    def set_password(self, password, enc_version=None):
        """Set a password to be used for encrypting this storage.

        An empty ``password`` or a PLAINTEXT ``enc_version`` disables
        encryption. When ``enc_version`` is None the current version is kept.

        Raises:
            Exception: if the storage is encrypted and not yet decrypted.
        """
        if not self.is_past_initial_decryption():
            raise Exception("storage needs to be decrypted before changing password")
        if enc_version is None:
            enc_version = self._encryption_version
        if password and enc_version != StorageEncryptionVersion.PLAINTEXT:
            ec_key = self.get_eckey_from_password(password)
            self.pubkey = ec_key.get_public_key_hex()
            self._encryption_version = enc_version
        else:
            self.pubkey = None
            self._encryption_version = StorageEncryptionVersion.PLAINTEXT

    def basename(self) -> str:
        """Return the final path component of the storage file."""
        return os.path.basename(self.path)
212