URI: 
       tstorage.py - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
       tstorage.py (7860B)
       ---
            1 #!/usr/bin/env python
            2 #
            3 # Electrum - lightweight Bitcoin client
            4 # Copyright (C) 2015 Thomas Voegtlin
            5 #
            6 # Permission is hereby granted, free of charge, to any person
            7 # obtaining a copy of this software and associated documentation files
            8 # (the "Software"), to deal in the Software without restriction,
            9 # including without limitation the rights to use, copy, modify, merge,
           10 # publish, distribute, sublicense, and/or sell copies of the Software,
           11 # and to permit persons to whom the Software is furnished to do so,
           12 # subject to the following conditions:
           13 #
           14 # The above copyright notice and this permission notice shall be
           15 # included in all copies or substantial portions of the Software.
           16 #
           17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
           18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
           19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
           20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
           21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
           22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
           23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
           24 # SOFTWARE.
           25 import os
           26 import threading
           27 import stat
           28 import hashlib
           29 import base64
           30 import zlib
           31 from enum import IntEnum
           32 
           33 from . import ecc
           34 from .util import (profiler, InvalidPassword, WalletFileException, bfh, standardize_path,
           35                    test_read_write_permissions)
           36 
           37 from .wallet_db import WalletDB
           38 from .logging import Logger
           39 
           40 
           41 def get_derivation_used_for_hw_device_encryption():
           42     return ("m"
           43             "/4541509'"      # ascii 'ELE'  as decimal ("BIP43 purpose")
           44             "/1112098098'")  # ascii 'BIE2' as decimal
           45 
           46 
           47 class StorageEncryptionVersion(IntEnum):
           48     PLAINTEXT = 0
           49     USER_PASSWORD = 1
           50     XPUB_PASSWORD = 2
           51 
           52 
           53 class StorageReadWriteError(Exception): pass
           54 
           55 
           56 # TODO: Rename to Storage
           57 class WalletStorage(Logger):
           58 
           59     def __init__(self, path):
           60         Logger.__init__(self)
           61         self.path = standardize_path(path)
           62         self._file_exists = bool(self.path and os.path.exists(self.path))
           63         self.logger.info(f"wallet path {self.path}")
           64         self.pubkey = None
           65         self.decrypted = ''
           66         try:
           67             test_read_write_permissions(self.path)
           68         except IOError as e:
           69             raise StorageReadWriteError(e) from e
           70         if self.file_exists():
           71             with open(self.path, "r", encoding='utf-8') as f:
           72                 self.raw = f.read()
           73             self._encryption_version = self._init_encryption_version()
           74         else:
           75             self.raw = ''
           76             self._encryption_version = StorageEncryptionVersion.PLAINTEXT
           77 
           78     def read(self):
           79         return self.decrypted if self.is_encrypted() else self.raw
           80 
           81     def write(self, data: str) -> None:
           82         s = self.encrypt_before_writing(data)
           83         temp_path = "%s.tmp.%s" % (self.path, os.getpid())
           84         with open(temp_path, "w", encoding='utf-8') as f:
           85             f.write(s)
           86             f.flush()
           87             os.fsync(f.fileno())
           88 
           89         try:
           90             mode = os.stat(self.path).st_mode
           91         except FileNotFoundError:
           92             mode = stat.S_IREAD | stat.S_IWRITE
           93 
           94         # assert that wallet file does not exist, to prevent wallet corruption (see issue #5082)
           95         if not self.file_exists():
           96             assert not os.path.exists(self.path)
           97         os.replace(temp_path, self.path)
           98         os.chmod(self.path, mode)
           99         self._file_exists = True
          100         self.logger.info(f"saved {self.path}")
          101 
          102     def file_exists(self) -> bool:
          103         return self._file_exists
          104 
          105     def is_past_initial_decryption(self):
          106         """Return if storage is in a usable state for normal operations.
          107 
          108         The value is True exactly
          109             if encryption is disabled completely (self.is_encrypted() == False),
          110             or if encryption is enabled but the contents have already been decrypted.
          111         """
          112         return not self.is_encrypted() or bool(self.pubkey)
          113 
          114     def is_encrypted(self):
          115         """Return if storage encryption is currently enabled."""
          116         return self.get_encryption_version() != StorageEncryptionVersion.PLAINTEXT
          117 
          118     def is_encrypted_with_user_pw(self):
          119         return self.get_encryption_version() == StorageEncryptionVersion.USER_PASSWORD
          120 
          121     def is_encrypted_with_hw_device(self):
          122         return self.get_encryption_version() == StorageEncryptionVersion.XPUB_PASSWORD
          123 
          124     def get_encryption_version(self):
          125         """Return the version of encryption used for this storage.
          126 
          127         0: plaintext / no encryption
          128 
          129         ECIES, private key derived from a password,
          130         1: password is provided by user
          131         2: password is derived from an xpub; used with hw wallets
          132         """
          133         return self._encryption_version
          134 
          135     def _init_encryption_version(self):
          136         try:
          137             magic = base64.b64decode(self.raw)[0:4]
          138             if magic == b'BIE1':
          139                 return StorageEncryptionVersion.USER_PASSWORD
          140             elif magic == b'BIE2':
          141                 return StorageEncryptionVersion.XPUB_PASSWORD
          142             else:
          143                 return StorageEncryptionVersion.PLAINTEXT
          144         except:
          145             return StorageEncryptionVersion.PLAINTEXT
          146 
          147     @staticmethod
          148     def get_eckey_from_password(password):
          149         secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024)
          150         ec_key = ecc.ECPrivkey.from_arbitrary_size_secret(secret)
          151         return ec_key
          152 
          153     def _get_encryption_magic(self):
          154         v = self._encryption_version
          155         if v == StorageEncryptionVersion.USER_PASSWORD:
          156             return b'BIE1'
          157         elif v == StorageEncryptionVersion.XPUB_PASSWORD:
          158             return b'BIE2'
          159         else:
          160             raise WalletFileException('no encryption magic for version: %s' % v)
          161 
          162     def decrypt(self, password) -> None:
          163         if self.is_past_initial_decryption():
          164             return
          165         ec_key = self.get_eckey_from_password(password)
          166         if self.raw:
          167             enc_magic = self._get_encryption_magic()
          168             s = zlib.decompress(ec_key.decrypt_message(self.raw, enc_magic))
          169             s = s.decode('utf8')
          170         else:
          171             s = ''
          172         self.pubkey = ec_key.get_public_key_hex()
          173         self.decrypted = s
          174 
          175     def encrypt_before_writing(self, plaintext: str) -> str:
          176         s = plaintext
          177         if self.pubkey:
          178             s = bytes(s, 'utf8')
          179             c = zlib.compress(s, level=zlib.Z_BEST_SPEED)
          180             enc_magic = self._get_encryption_magic()
          181             public_key = ecc.ECPubkey(bfh(self.pubkey))
          182             s = public_key.encrypt_message(c, enc_magic)
          183             s = s.decode('utf8')
          184         return s
          185 
          186     def check_password(self, password) -> None:
          187         """Raises an InvalidPassword exception on invalid password"""
          188         if not self.is_encrypted():
          189             return
          190         if not self.is_past_initial_decryption():
          191             self.decrypt(password)  # this sets self.pubkey
          192         assert self.pubkey is not None
          193         if self.pubkey != self.get_eckey_from_password(password).get_public_key_hex():
          194             raise InvalidPassword()
          195 
          196     def set_password(self, password, enc_version=None):
          197         """Set a password to be used for encrypting this storage."""
          198         if not self.is_past_initial_decryption():
          199             raise Exception("storage needs to be decrypted before changing password")
          200         if enc_version is None:
          201             enc_version = self._encryption_version
          202         if password and enc_version != StorageEncryptionVersion.PLAINTEXT:
          203             ec_key = self.get_eckey_from_password(password)
          204             self.pubkey = ec_key.get_public_key_hex()
          205             self._encryption_version = enc_version
          206         else:
          207             self.pubkey = None
          208             self._encryption_version = StorageEncryptionVersion.PLAINTEXT
          209 
          210     def basename(self) -> str:
          211         return os.path.basename(self.path)
          212