URI: 
       tdaemon.py - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
       tdaemon.py (22371B)
       ---
            1 #!/usr/bin/env python
            2 #
            3 # Electrum - lightweight Bitcoin client
            4 # Copyright (C) 2015 Thomas Voegtlin
            5 #
            6 # Permission is hereby granted, free of charge, to any person
            7 # obtaining a copy of this software and associated documentation files
            8 # (the "Software"), to deal in the Software without restriction,
            9 # including without limitation the rights to use, copy, modify, merge,
           10 # publish, distribute, sublicense, and/or sell copies of the Software,
           11 # and to permit persons to whom the Software is furnished to do so,
           12 # subject to the following conditions:
           13 #
           14 # The above copyright notice and this permission notice shall be
           15 # included in all copies or substantial portions of the Software.
           16 #
           17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
           18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
           19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
           20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
           21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
           22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
           23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
           24 # SOFTWARE.
           25 import asyncio
           26 import ast
           27 import os
           28 import time
           29 import traceback
           30 import sys
           31 import threading
           32 from typing import Dict, Optional, Tuple, Iterable, Callable, Union, Sequence, Mapping, TYPE_CHECKING
           33 from base64 import b64decode, b64encode
           34 from collections import defaultdict
           35 import json
           36 
           37 import aiohttp
           38 from aiohttp import web, client_exceptions
           39 from aiorpcx import TaskGroup, timeout_after, TaskTimeout, ignore_after
           40 
           41 from . import util
           42 from .network import Network
           43 from .util import (json_decode, to_bytes, to_string, profiler, standardize_path, constant_time_compare)
           44 from .invoices import PR_PAID, PR_EXPIRED
           45 from .util import log_exceptions, ignore_exceptions, randrange
           46 from .wallet import Wallet, Abstract_Wallet
           47 from .storage import WalletStorage
           48 from .wallet_db import WalletDB
           49 from .commands import known_commands, Commands
           50 from .simple_config import SimpleConfig
           51 from .exchange_rate import FxThread
           52 from .logging import get_logger, Logger
           53 
           54 if TYPE_CHECKING:
           55     from electrum import gui
           56 
           57 
# Module-level logger; request() uses it to report failed connections
# to the JSON-RPC server.
_logger = get_logger(__name__)
           59 
           60 
           61 class DaemonNotRunning(Exception):
           62     pass
           63 
           64 def get_lockfile(config: SimpleConfig):
           65     return os.path.join(config.path, 'daemon')
           66 
           67 
           68 def remove_lockfile(lockfile):
           69     os.unlink(lockfile)
           70 
           71 
           72 def get_file_descriptor(config: SimpleConfig):
           73     '''Tries to create the lockfile, using O_EXCL to
           74     prevent races.  If it succeeds it returns the FD.
           75     Otherwise try and connect to the server specified in the lockfile.
           76     If this succeeds, the server is returned.  Otherwise remove the
           77     lockfile and try again.'''
           78     lockfile = get_lockfile(config)
           79     while True:
           80         try:
           81             return os.open(lockfile, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
           82         except OSError:
           83             pass
           84         try:
           85             request(config, 'ping')
           86             return None
           87         except DaemonNotRunning:
           88             # Couldn't connect; remove lockfile and try again.
           89             remove_lockfile(lockfile)
           90 
           91 
           92 
           93 def request(config: SimpleConfig, endpoint, args=(), timeout=60):
           94     lockfile = get_lockfile(config)
           95     while True:
           96         create_time = None
           97         try:
           98             with open(lockfile) as f:
           99                 (host, port), create_time = ast.literal_eval(f.read())
          100         except Exception:
          101             raise DaemonNotRunning()
          102         rpc_user, rpc_password = get_rpc_credentials(config)
          103         server_url = 'http://%s:%d' % (host, port)
          104         auth = aiohttp.BasicAuth(login=rpc_user, password=rpc_password)
          105         loop = asyncio.get_event_loop()
          106         async def request_coroutine():
          107             async with aiohttp.ClientSession(auth=auth) as session:
          108                 c = util.JsonRPCClient(session, server_url)
          109                 return await c.request(endpoint, *args)
          110         try:
          111             fut = asyncio.run_coroutine_threadsafe(request_coroutine(), loop)
          112             return fut.result(timeout=timeout)
          113         except aiohttp.client_exceptions.ClientConnectorError as e:
          114             _logger.info(f"failed to connect to JSON-RPC server {e}")
          115             if not create_time or create_time < time.time() - 1.0:
          116                 raise DaemonNotRunning()
          117         # Sleep a bit and try again; it might have just been started
          118         time.sleep(1.0)
          119 
          120 
          121 def get_rpc_credentials(config: SimpleConfig) -> Tuple[str, str]:
          122     rpc_user = config.get('rpcuser', None)
          123     rpc_password = config.get('rpcpassword', None)
          124     if rpc_user == '':
          125         rpc_user = None
          126     if rpc_password == '':
          127         rpc_password = None
          128     if rpc_user is None or rpc_password is None:
          129         rpc_user = 'user'
          130         bits = 128
          131         nbytes = bits // 8 + (bits % 8 > 0)
          132         pw_int = randrange(pow(2, bits))
          133         pw_b64 = b64encode(
          134             pw_int.to_bytes(nbytes, 'big'), b'-_')
          135         rpc_password = to_string(pw_b64, 'ascii')
          136         config.set_key('rpcuser', rpc_user)
          137         config.set_key('rpcpassword', rpc_password, save=True)
          138     return rpc_user, rpc_password
          139 
          140 
          141 class AuthenticationError(Exception):
          142     pass
          143 
          144 class AuthenticationInvalidOrMissing(AuthenticationError):
          145     pass
          146 
          147 class AuthenticationCredentialsInvalid(AuthenticationError):
          148     pass
          149 
          150 class AuthenticatedServer(Logger):
          151 
          152     def __init__(self, rpc_user, rpc_password):
          153         Logger.__init__(self)
          154         self.rpc_user = rpc_user
          155         self.rpc_password = rpc_password
          156         self.auth_lock = asyncio.Lock()
          157         self._methods = {}  # type: Dict[str, Callable]
          158 
          159     def register_method(self, f):
          160         assert f.__name__ not in self._methods, f"name collision for {f.__name__}"
          161         self._methods[f.__name__] = f
          162 
          163     async def authenticate(self, headers):
          164         if self.rpc_password == '':
          165             # RPC authentication is disabled
          166             return
          167         auth_string = headers.get('Authorization', None)
          168         if auth_string is None:
          169             raise AuthenticationInvalidOrMissing('CredentialsMissing')
          170         basic, _, encoded = auth_string.partition(' ')
          171         if basic != 'Basic':
          172             raise AuthenticationInvalidOrMissing('UnsupportedType')
          173         encoded = to_bytes(encoded, 'utf8')
          174         credentials = to_string(b64decode(encoded), 'utf8')
          175         username, _, password = credentials.partition(':')
          176         if not (constant_time_compare(username, self.rpc_user)
          177                 and constant_time_compare(password, self.rpc_password)):
          178             await asyncio.sleep(0.050)
          179             raise AuthenticationCredentialsInvalid('Invalid Credentials')
          180 
          181     async def handle(self, request):
          182         async with self.auth_lock:
          183             try:
          184                 await self.authenticate(request.headers)
          185             except AuthenticationInvalidOrMissing:
          186                 return web.Response(headers={"WWW-Authenticate": "Basic realm=Electrum"},
          187                                     text='Unauthorized', status=401)
          188             except AuthenticationCredentialsInvalid:
          189                 return web.Response(text='Forbidden', status=403)
          190         try:
          191             request = await request.text()
          192             request = json.loads(request)
          193             method = request['method']
          194             _id = request['id']
          195             params = request.get('params', [])  # type: Union[Sequence, Mapping]
          196             if method not in self._methods:
          197                 raise Exception(f"attempting to use unregistered method: {method}")
          198             f = self._methods[method]
          199         except Exception as e:
          200             self.logger.exception("invalid request")
          201             return web.Response(text='Invalid Request', status=500)
          202         response = {
          203             'id': _id,
          204             'jsonrpc': '2.0',
          205         }
          206         try:
          207             if isinstance(params, dict):
          208                 response['result'] = await f(**params)
          209             else:
          210                 response['result'] = await f(*params)
          211         except BaseException as e:
          212             self.logger.exception("internal error while executing RPC")
          213             response['error'] = {
          214                 'code': 1,
          215                 'message': str(e),
          216             }
          217         return web.json_response(response)
          218 
          219 
          220 class CommandsServer(AuthenticatedServer):
          221 
          222     def __init__(self, daemon, fd):
          223         rpc_user, rpc_password = get_rpc_credentials(daemon.config)
          224         AuthenticatedServer.__init__(self, rpc_user, rpc_password)
          225         self.daemon = daemon
          226         self.fd = fd
          227         self.config = daemon.config
          228         self.host = self.config.get('rpchost', '127.0.0.1')
          229         self.port = self.config.get('rpcport', 0)
          230         self.app = web.Application()
          231         self.app.router.add_post("/", self.handle)
          232         self.register_method(self.ping)
          233         self.register_method(self.gui)
          234         self.cmd_runner = Commands(config=self.config, network=self.daemon.network, daemon=self.daemon)
          235         for cmdname in known_commands:
          236             self.register_method(getattr(self.cmd_runner, cmdname))
          237         self.register_method(self.run_cmdline)
          238 
          239     async def run(self):
          240         self.runner = web.AppRunner(self.app)
          241         await self.runner.setup()
          242         site = web.TCPSite(self.runner, self.host, self.port)
          243         await site.start()
          244         socket = site._server.sockets[0]
          245         os.write(self.fd, bytes(repr((socket.getsockname(), time.time())), 'utf8'))
          246         os.close(self.fd)
          247 
          248     async def ping(self):
          249         return True
          250 
          251     async def gui(self, config_options):
          252         if self.daemon.gui_object:
          253             if hasattr(self.daemon.gui_object, 'new_window'):
          254                 path = self.config.get_wallet_path(use_gui_last_wallet=True)
          255                 self.daemon.gui_object.new_window(path, config_options.get('url'))
          256                 response = "ok"
          257             else:
          258                 response = "error: current GUI does not support multiple windows"
          259         else:
          260             response = "Error: Electrum is running in daemon mode. Please stop the daemon first."
          261         return response
          262 
          263     async def run_cmdline(self, config_options):
          264         cmdname = config_options['cmd']
          265         cmd = known_commands[cmdname]
          266         # arguments passed to function
          267         args = [config_options.get(x) for x in cmd.params]
          268         # decode json arguments
          269         args = [json_decode(i) for i in args]
          270         # options
          271         kwargs = {}
          272         for x in cmd.options:
          273             kwargs[x] = config_options.get(x)
          274         if 'wallet_path' in cmd.options:
          275             kwargs['wallet_path'] = config_options.get('wallet_path')
          276         elif 'wallet' in cmd.options:
          277             kwargs['wallet'] = config_options.get('wallet_path')
          278         func = getattr(self.cmd_runner, cmd.name)
          279         # fixme: not sure how to retrieve message in jsonrpcclient
          280         try:
          281             result = await func(*args, **kwargs)
          282         except Exception as e:
          283             result = {'error':str(e)}
          284         return result
          285 
          286 
          287 class WatchTowerServer(AuthenticatedServer):
          288 
          289     def __init__(self, network, netaddress):
          290         self.addr = netaddress
          291         self.config = network.config
          292         self.network = network
          293         watchtower_user = self.config.get('watchtower_user', '')
          294         watchtower_password = self.config.get('watchtower_password', '')
          295         AuthenticatedServer.__init__(self, watchtower_user, watchtower_password)
          296         self.lnwatcher = network.local_watchtower
          297         self.app = web.Application()
          298         self.app.router.add_post("/", self.handle)
          299         self.register_method(self.get_ctn)
          300         self.register_method(self.add_sweep_tx)
          301 
          302     async def run(self):
          303         self.runner = web.AppRunner(self.app)
          304         await self.runner.setup()
          305         site = web.TCPSite(self.runner, host=str(self.addr.host), port=self.addr.port, ssl_context=self.config.get_ssl_context())
          306         await site.start()
          307 
          308     async def get_ctn(self, *args):
          309         return await self.lnwatcher.sweepstore.get_ctn(*args)
          310 
          311     async def add_sweep_tx(self, *args):
          312         return await self.lnwatcher.sweepstore.add_sweep_tx(*args)
          313 
          314 
          315 class PayServer(Logger):
          316 
          317     def __init__(self, daemon: 'Daemon', netaddress):
          318         Logger.__init__(self)
          319         self.addr = netaddress
          320         self.daemon = daemon
          321         self.config = daemon.config
          322         self.pending = defaultdict(asyncio.Event)
          323         util.register_callback(self.on_payment, ['request_status'])
          324 
          325     @property
          326     def wallet(self):
          327         # FIXME specify wallet somehow?
          328         return list(self.daemon.get_wallets().values())[0]
          329 
          330     async def on_payment(self, evt, wallet, key, status):
          331         if status == PR_PAID:
          332             self.pending[key].set()
          333 
          334     @ignore_exceptions
          335     @log_exceptions
          336     async def run(self):
          337         root = self.config.get('payserver_root', '/r')
          338         app = web.Application()
          339         app.add_routes([web.get('/api/get_invoice', self.get_request)])
          340         app.add_routes([web.get('/api/get_status', self.get_status)])
          341         app.add_routes([web.get('/bip70/{key}.bip70', self.get_bip70_request)])
          342         app.add_routes([web.static(root, os.path.join(os.path.dirname(__file__), 'www'))])
          343         if self.config.get('payserver_allow_create_invoice'):
          344             app.add_routes([web.post('/api/create_invoice', self.create_request)])
          345         runner = web.AppRunner(app)
          346         await runner.setup()
          347         site = web.TCPSite(runner, host=str(self.addr.host), port=self.addr.port, ssl_context=self.config.get_ssl_context())
          348         await site.start()
          349 
          350     async def create_request(self, request):
          351         params = await request.post()
          352         wallet = self.wallet
          353         if 'amount_sat' not in params or not params['amount_sat'].isdigit():
          354             raise web.HTTPUnsupportedMediaType()
          355         amount = int(params['amount_sat'])
          356         message = params['message'] or "donation"
          357         payment_hash = wallet.lnworker.add_request(
          358             amount_sat=amount,
          359             message=message,
          360             expiry=3600)
          361         key = payment_hash.hex()
          362         raise web.HTTPFound(self.root + '/pay?id=' + key)
          363 
          364     async def get_request(self, r):
          365         key = r.query_string
          366         request = self.wallet.get_formatted_request(key)
          367         return web.json_response(request)
          368 
          369     async def get_bip70_request(self, r):
          370         from .paymentrequest import make_request
          371         key = r.match_info['key']
          372         request = self.wallet.get_request(key)
          373         if not request:
          374             return web.HTTPNotFound()
          375         pr = make_request(self.config, request)
          376         return web.Response(body=pr.SerializeToString(), content_type='application/bitcoin-paymentrequest')
          377 
          378     async def get_status(self, request):
          379         ws = web.WebSocketResponse()
          380         await ws.prepare(request)
          381         key = request.query_string
          382         info = self.wallet.get_formatted_request(key)
          383         if not info:
          384             await ws.send_str('unknown invoice')
          385             await ws.close()
          386             return ws
          387         if info.get('status') == PR_PAID:
          388             await ws.send_str(f'paid')
          389             await ws.close()
          390             return ws
          391         if info.get('status') == PR_EXPIRED:
          392             await ws.send_str(f'expired')
          393             await ws.close()
          394             return ws
          395         while True:
          396             try:
          397                 await asyncio.wait_for(self.pending[key].wait(), 1)
          398                 break
          399             except asyncio.TimeoutError:
          400                 # send data on the websocket, to keep it alive
          401                 await ws.send_str('waiting')
          402         await ws.send_str('paid')
          403         await ws.close()
          404         return ws
          405 
          406 
          407 
          408 class Daemon(Logger):
          409 
          410     network: Optional[Network]
          411     gui_object: Optional[Union['gui.qt.ElectrumGui', 'gui.kivy.ElectrumGui']]
          412 
          413     @profiler
          414     def __init__(self, config: SimpleConfig, fd=None, *, listen_jsonrpc=True):
          415         Logger.__init__(self)
          416         self.running = False
          417         self.running_lock = threading.Lock()
          418         self.config = config
          419         if fd is None and listen_jsonrpc:
          420             fd = get_file_descriptor(config)
          421             if fd is None:
          422                 raise Exception('failed to lock daemon; already running?')
          423         if 'wallet_path' in config.cmdline_options:
          424             self.logger.warning("Ignoring parameter 'wallet_path' for daemon. "
          425                                 "Use the load_wallet command instead.")
          426         self.asyncio_loop = asyncio.get_event_loop()
          427         self.network = None
          428         if not config.get('offline'):
          429             self.network = Network(config, daemon=self)
          430         self.fx = FxThread(config, self.network)
          431         self.gui_object = None
          432         # path -> wallet;   make sure path is standardized.
          433         self._wallets = {}  # type: Dict[str, Abstract_Wallet]
          434         daemon_jobs = []
          435         # Setup commands server
          436         self.commands_server = None
          437         if listen_jsonrpc:
          438             self.commands_server = CommandsServer(self, fd)
          439             daemon_jobs.append(self.commands_server.run())
          440         # pay server
          441         self.pay_server = None
          442         payserver_address = self.config.get_netaddress('payserver_address')
          443         if not config.get('offline') and payserver_address:
          444             self.pay_server = PayServer(self, payserver_address)
          445             daemon_jobs.append(self.pay_server.run())
          446         # server-side watchtower
          447         self.watchtower = None
          448         watchtower_address = self.config.get_netaddress('watchtower_address')
          449         if not config.get('offline') and watchtower_address:
          450             self.watchtower = WatchTowerServer(self.network, watchtower_address)
          451             daemon_jobs.append(self.watchtower.run)
          452         if self.network:
          453             self.network.start(jobs=[self.fx.run])
          454             # prepare lightning functionality, also load channel db early
          455             if self.config.get('use_gossip', False):
          456                 self.network.start_gossip()
          457 
          458         self.taskgroup = TaskGroup()
          459         asyncio.run_coroutine_threadsafe(self._run(jobs=daemon_jobs), self.asyncio_loop)
          460 
          461     @log_exceptions
          462     async def _run(self, jobs: Iterable = None):
          463         if jobs is None:
          464             jobs = []
          465         self.logger.info("starting taskgroup.")
          466         try:
          467             async with self.taskgroup as group:
          468                 [await group.spawn(job) for job in jobs]
          469                 await group.spawn(asyncio.Event().wait)  # run forever (until cancel)
          470         except asyncio.CancelledError:
          471             raise
          472         except Exception as e:
          473             self.logger.exception("taskgroup died.")
          474         finally:
          475             self.logger.info("taskgroup stopped.")
          476 
          477     def load_wallet(self, path, password, *, manual_upgrades=True) -> Optional[Abstract_Wallet]:
          478         path = standardize_path(path)
          479         # wizard will be launched if we return
          480         if path in self._wallets:
          481             wallet = self._wallets[path]
          482             return wallet
          483         storage = WalletStorage(path)
          484         if not storage.file_exists():
          485             return
          486         if storage.is_encrypted():
          487             if not password:
          488                 return
          489             storage.decrypt(password)
          490         # read data, pass it to db
          491         db = WalletDB(storage.read(), manual_upgrades=manual_upgrades)
          492         if db.requires_split():
          493             return
          494         if db.requires_upgrade():
          495             return
          496         if db.get_action():
          497             return
          498         wallet = Wallet(db, storage, config=self.config)
          499         wallet.start_network(self.network)
          500         self._wallets[path] = wallet
          501         return wallet
          502 
          503     def add_wallet(self, wallet: Abstract_Wallet) -> None:
          504         path = wallet.storage.path
          505         path = standardize_path(path)
          506         self._wallets[path] = wallet
          507 
          508     def get_wallet(self, path: str) -> Optional[Abstract_Wallet]:
          509         path = standardize_path(path)
          510         return self._wallets.get(path)
          511 
          512     def get_wallets(self) -> Dict[str, Abstract_Wallet]:
          513         return dict(self._wallets)  # copy
          514 
          515     def delete_wallet(self, path: str) -> bool:
          516         self.stop_wallet(path)
          517         if os.path.exists(path):
          518             os.unlink(path)
          519             return True
          520         return False
          521 
          522     def stop_wallet(self, path: str) -> bool:
          523         """Returns True iff a wallet was found."""
          524         path = standardize_path(path)
          525         wallet = self._wallets.pop(path, None)
          526         if not wallet:
          527             return False
          528         fut = asyncio.run_coroutine_threadsafe(wallet.stop(), self.asyncio_loop)
          529         fut.result()
          530         return True
          531 
          532     def run_daemon(self):
          533         self.running = True
          534         try:
          535             while self.is_running():
          536                 time.sleep(0.1)
          537         except KeyboardInterrupt:
          538             self.running = False
          539         self.on_stop()
          540 
          541     def is_running(self):
          542         with self.running_lock:
          543             return self.running and not self.taskgroup.closed()
          544 
          545     def stop(self):
          546         with self.running_lock:
          547             self.running = False
          548 
          549     def on_stop(self):
          550         self.logger.info("on_stop() entered. initiating shutdown")
          551         if self.gui_object:
          552             self.gui_object.stop()
          553 
          554         @log_exceptions
          555         async def stop_async():
          556             self.logger.info("stopping all wallets")
          557             async with TaskGroup() as group:
          558                 for k, wallet in self._wallets.items():
          559                     await group.spawn(wallet.stop())
          560             self.logger.info("stopping network and taskgroup")
          561             async with ignore_after(2):
          562                 async with TaskGroup() as group:
          563                     if self.network:
          564                         await group.spawn(self.network.stop(full_shutdown=True))
          565                     await group.spawn(self.taskgroup.cancel_remaining())
          566 
          567         fut = asyncio.run_coroutine_threadsafe(stop_async(), self.asyncio_loop)
          568         fut.result()
          569         self.logger.info("removing lockfile")
          570         remove_lockfile(get_lockfile(self.config))
          571         self.logger.info("stopped")
          572 
          573     def run_gui(self, config, plugins):
          574         threading.current_thread().setName('GUI')
          575         gui_name = config.get('gui', 'qt')
          576         if gui_name in ['lite', 'classic']:
          577             gui_name = 'qt'
          578         self.logger.info(f'launching GUI: {gui_name}')
          579         try:
          580             gui = __import__('electrum.gui.' + gui_name, fromlist=['electrum'])
          581             self.gui_object = gui.ElectrumGui(config, self, plugins)
          582             self.gui_object.main()
          583         except BaseException as e:
          584             self.logger.error(f'GUI raised exception: {repr(e)}. shutting down.')
          585             raise
          586         finally:
          587             # app will exit now
          588             self.on_stop()