tdaemon.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
tdaemon.py (22371B)
---
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2015 Thomas Voegtlin
5 #
6 # Permission is hereby granted, free of charge, to any person
7 # obtaining a copy of this software and associated documentation files
8 # (the "Software"), to deal in the Software without restriction,
9 # including without limitation the rights to use, copy, modify, merge,
10 # publish, distribute, sublicense, and/or sell copies of the Software,
11 # and to permit persons to whom the Software is furnished to do so,
12 # subject to the following conditions:
13 #
14 # The above copyright notice and this permission notice shall be
15 # included in all copies or substantial portions of the Software.
16 #
17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 # SOFTWARE.
25 import asyncio
26 import ast
27 import os
28 import time
29 import traceback
30 import sys
31 import threading
32 from typing import Dict, Optional, Tuple, Iterable, Callable, Union, Sequence, Mapping, TYPE_CHECKING
33 from base64 import b64decode, b64encode
34 from collections import defaultdict
35 import json
36
37 import aiohttp
38 from aiohttp import web, client_exceptions
39 from aiorpcx import TaskGroup, timeout_after, TaskTimeout, ignore_after
40
41 from . import util
42 from .network import Network
43 from .util import (json_decode, to_bytes, to_string, profiler, standardize_path, constant_time_compare)
44 from .invoices import PR_PAID, PR_EXPIRED
45 from .util import log_exceptions, ignore_exceptions, randrange
46 from .wallet import Wallet, Abstract_Wallet
47 from .storage import WalletStorage
48 from .wallet_db import WalletDB
49 from .commands import known_commands, Commands
50 from .simple_config import SimpleConfig
51 from .exchange_rate import FxThread
52 from .logging import get_logger, Logger
53
54 if TYPE_CHECKING:
55 from electrum import gui
56
57
58 _logger = get_logger(__name__)
59
60
61 class DaemonNotRunning(Exception):
62 pass
63
64 def get_lockfile(config: SimpleConfig):
65 return os.path.join(config.path, 'daemon')
66
67
68 def remove_lockfile(lockfile):
69 os.unlink(lockfile)
70
71
72 def get_file_descriptor(config: SimpleConfig):
73 '''Tries to create the lockfile, using O_EXCL to
74 prevent races. If it succeeds it returns the FD.
75 Otherwise try and connect to the server specified in the lockfile.
76 If this succeeds, the server is returned. Otherwise remove the
77 lockfile and try again.'''
78 lockfile = get_lockfile(config)
79 while True:
80 try:
81 return os.open(lockfile, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
82 except OSError:
83 pass
84 try:
85 request(config, 'ping')
86 return None
87 except DaemonNotRunning:
88 # Couldn't connect; remove lockfile and try again.
89 remove_lockfile(lockfile)
90
91
92
93 def request(config: SimpleConfig, endpoint, args=(), timeout=60):
94 lockfile = get_lockfile(config)
95 while True:
96 create_time = None
97 try:
98 with open(lockfile) as f:
99 (host, port), create_time = ast.literal_eval(f.read())
100 except Exception:
101 raise DaemonNotRunning()
102 rpc_user, rpc_password = get_rpc_credentials(config)
103 server_url = 'http://%s:%d' % (host, port)
104 auth = aiohttp.BasicAuth(login=rpc_user, password=rpc_password)
105 loop = asyncio.get_event_loop()
106 async def request_coroutine():
107 async with aiohttp.ClientSession(auth=auth) as session:
108 c = util.JsonRPCClient(session, server_url)
109 return await c.request(endpoint, *args)
110 try:
111 fut = asyncio.run_coroutine_threadsafe(request_coroutine(), loop)
112 return fut.result(timeout=timeout)
113 except aiohttp.client_exceptions.ClientConnectorError as e:
114 _logger.info(f"failed to connect to JSON-RPC server {e}")
115 if not create_time or create_time < time.time() - 1.0:
116 raise DaemonNotRunning()
117 # Sleep a bit and try again; it might have just been started
118 time.sleep(1.0)
119
120
121 def get_rpc_credentials(config: SimpleConfig) -> Tuple[str, str]:
122 rpc_user = config.get('rpcuser', None)
123 rpc_password = config.get('rpcpassword', None)
124 if rpc_user == '':
125 rpc_user = None
126 if rpc_password == '':
127 rpc_password = None
128 if rpc_user is None or rpc_password is None:
129 rpc_user = 'user'
130 bits = 128
131 nbytes = bits // 8 + (bits % 8 > 0)
132 pw_int = randrange(pow(2, bits))
133 pw_b64 = b64encode(
134 pw_int.to_bytes(nbytes, 'big'), b'-_')
135 rpc_password = to_string(pw_b64, 'ascii')
136 config.set_key('rpcuser', rpc_user)
137 config.set_key('rpcpassword', rpc_password, save=True)
138 return rpc_user, rpc_password
139
140
141 class AuthenticationError(Exception):
142 pass
143
144 class AuthenticationInvalidOrMissing(AuthenticationError):
145 pass
146
147 class AuthenticationCredentialsInvalid(AuthenticationError):
148 pass
149
150 class AuthenticatedServer(Logger):
151
152 def __init__(self, rpc_user, rpc_password):
153 Logger.__init__(self)
154 self.rpc_user = rpc_user
155 self.rpc_password = rpc_password
156 self.auth_lock = asyncio.Lock()
157 self._methods = {} # type: Dict[str, Callable]
158
159 def register_method(self, f):
160 assert f.__name__ not in self._methods, f"name collision for {f.__name__}"
161 self._methods[f.__name__] = f
162
163 async def authenticate(self, headers):
164 if self.rpc_password == '':
165 # RPC authentication is disabled
166 return
167 auth_string = headers.get('Authorization', None)
168 if auth_string is None:
169 raise AuthenticationInvalidOrMissing('CredentialsMissing')
170 basic, _, encoded = auth_string.partition(' ')
171 if basic != 'Basic':
172 raise AuthenticationInvalidOrMissing('UnsupportedType')
173 encoded = to_bytes(encoded, 'utf8')
174 credentials = to_string(b64decode(encoded), 'utf8')
175 username, _, password = credentials.partition(':')
176 if not (constant_time_compare(username, self.rpc_user)
177 and constant_time_compare(password, self.rpc_password)):
178 await asyncio.sleep(0.050)
179 raise AuthenticationCredentialsInvalid('Invalid Credentials')
180
181 async def handle(self, request):
182 async with self.auth_lock:
183 try:
184 await self.authenticate(request.headers)
185 except AuthenticationInvalidOrMissing:
186 return web.Response(headers={"WWW-Authenticate": "Basic realm=Electrum"},
187 text='Unauthorized', status=401)
188 except AuthenticationCredentialsInvalid:
189 return web.Response(text='Forbidden', status=403)
190 try:
191 request = await request.text()
192 request = json.loads(request)
193 method = request['method']
194 _id = request['id']
195 params = request.get('params', []) # type: Union[Sequence, Mapping]
196 if method not in self._methods:
197 raise Exception(f"attempting to use unregistered method: {method}")
198 f = self._methods[method]
199 except Exception as e:
200 self.logger.exception("invalid request")
201 return web.Response(text='Invalid Request', status=500)
202 response = {
203 'id': _id,
204 'jsonrpc': '2.0',
205 }
206 try:
207 if isinstance(params, dict):
208 response['result'] = await f(**params)
209 else:
210 response['result'] = await f(*params)
211 except BaseException as e:
212 self.logger.exception("internal error while executing RPC")
213 response['error'] = {
214 'code': 1,
215 'message': str(e),
216 }
217 return web.json_response(response)
218
219
220 class CommandsServer(AuthenticatedServer):
221
222 def __init__(self, daemon, fd):
223 rpc_user, rpc_password = get_rpc_credentials(daemon.config)
224 AuthenticatedServer.__init__(self, rpc_user, rpc_password)
225 self.daemon = daemon
226 self.fd = fd
227 self.config = daemon.config
228 self.host = self.config.get('rpchost', '127.0.0.1')
229 self.port = self.config.get('rpcport', 0)
230 self.app = web.Application()
231 self.app.router.add_post("/", self.handle)
232 self.register_method(self.ping)
233 self.register_method(self.gui)
234 self.cmd_runner = Commands(config=self.config, network=self.daemon.network, daemon=self.daemon)
235 for cmdname in known_commands:
236 self.register_method(getattr(self.cmd_runner, cmdname))
237 self.register_method(self.run_cmdline)
238
239 async def run(self):
240 self.runner = web.AppRunner(self.app)
241 await self.runner.setup()
242 site = web.TCPSite(self.runner, self.host, self.port)
243 await site.start()
244 socket = site._server.sockets[0]
245 os.write(self.fd, bytes(repr((socket.getsockname(), time.time())), 'utf8'))
246 os.close(self.fd)
247
248 async def ping(self):
249 return True
250
251 async def gui(self, config_options):
252 if self.daemon.gui_object:
253 if hasattr(self.daemon.gui_object, 'new_window'):
254 path = self.config.get_wallet_path(use_gui_last_wallet=True)
255 self.daemon.gui_object.new_window(path, config_options.get('url'))
256 response = "ok"
257 else:
258 response = "error: current GUI does not support multiple windows"
259 else:
260 response = "Error: Electrum is running in daemon mode. Please stop the daemon first."
261 return response
262
263 async def run_cmdline(self, config_options):
264 cmdname = config_options['cmd']
265 cmd = known_commands[cmdname]
266 # arguments passed to function
267 args = [config_options.get(x) for x in cmd.params]
268 # decode json arguments
269 args = [json_decode(i) for i in args]
270 # options
271 kwargs = {}
272 for x in cmd.options:
273 kwargs[x] = config_options.get(x)
274 if 'wallet_path' in cmd.options:
275 kwargs['wallet_path'] = config_options.get('wallet_path')
276 elif 'wallet' in cmd.options:
277 kwargs['wallet'] = config_options.get('wallet_path')
278 func = getattr(self.cmd_runner, cmd.name)
279 # fixme: not sure how to retrieve message in jsonrpcclient
280 try:
281 result = await func(*args, **kwargs)
282 except Exception as e:
283 result = {'error':str(e)}
284 return result
285
286
287 class WatchTowerServer(AuthenticatedServer):
288
289 def __init__(self, network, netaddress):
290 self.addr = netaddress
291 self.config = network.config
292 self.network = network
293 watchtower_user = self.config.get('watchtower_user', '')
294 watchtower_password = self.config.get('watchtower_password', '')
295 AuthenticatedServer.__init__(self, watchtower_user, watchtower_password)
296 self.lnwatcher = network.local_watchtower
297 self.app = web.Application()
298 self.app.router.add_post("/", self.handle)
299 self.register_method(self.get_ctn)
300 self.register_method(self.add_sweep_tx)
301
302 async def run(self):
303 self.runner = web.AppRunner(self.app)
304 await self.runner.setup()
305 site = web.TCPSite(self.runner, host=str(self.addr.host), port=self.addr.port, ssl_context=self.config.get_ssl_context())
306 await site.start()
307
308 async def get_ctn(self, *args):
309 return await self.lnwatcher.sweepstore.get_ctn(*args)
310
311 async def add_sweep_tx(self, *args):
312 return await self.lnwatcher.sweepstore.add_sweep_tx(*args)
313
314
315 class PayServer(Logger):
316
317 def __init__(self, daemon: 'Daemon', netaddress):
318 Logger.__init__(self)
319 self.addr = netaddress
320 self.daemon = daemon
321 self.config = daemon.config
322 self.pending = defaultdict(asyncio.Event)
323 util.register_callback(self.on_payment, ['request_status'])
324
325 @property
326 def wallet(self):
327 # FIXME specify wallet somehow?
328 return list(self.daemon.get_wallets().values())[0]
329
330 async def on_payment(self, evt, wallet, key, status):
331 if status == PR_PAID:
332 self.pending[key].set()
333
334 @ignore_exceptions
335 @log_exceptions
336 async def run(self):
337 root = self.config.get('payserver_root', '/r')
338 app = web.Application()
339 app.add_routes([web.get('/api/get_invoice', self.get_request)])
340 app.add_routes([web.get('/api/get_status', self.get_status)])
341 app.add_routes([web.get('/bip70/{key}.bip70', self.get_bip70_request)])
342 app.add_routes([web.static(root, os.path.join(os.path.dirname(__file__), 'www'))])
343 if self.config.get('payserver_allow_create_invoice'):
344 app.add_routes([web.post('/api/create_invoice', self.create_request)])
345 runner = web.AppRunner(app)
346 await runner.setup()
347 site = web.TCPSite(runner, host=str(self.addr.host), port=self.addr.port, ssl_context=self.config.get_ssl_context())
348 await site.start()
349
350 async def create_request(self, request):
351 params = await request.post()
352 wallet = self.wallet
353 if 'amount_sat' not in params or not params['amount_sat'].isdigit():
354 raise web.HTTPUnsupportedMediaType()
355 amount = int(params['amount_sat'])
356 message = params['message'] or "donation"
357 payment_hash = wallet.lnworker.add_request(
358 amount_sat=amount,
359 message=message,
360 expiry=3600)
361 key = payment_hash.hex()
362 raise web.HTTPFound(self.root + '/pay?id=' + key)
363
364 async def get_request(self, r):
365 key = r.query_string
366 request = self.wallet.get_formatted_request(key)
367 return web.json_response(request)
368
369 async def get_bip70_request(self, r):
370 from .paymentrequest import make_request
371 key = r.match_info['key']
372 request = self.wallet.get_request(key)
373 if not request:
374 return web.HTTPNotFound()
375 pr = make_request(self.config, request)
376 return web.Response(body=pr.SerializeToString(), content_type='application/bitcoin-paymentrequest')
377
378 async def get_status(self, request):
379 ws = web.WebSocketResponse()
380 await ws.prepare(request)
381 key = request.query_string
382 info = self.wallet.get_formatted_request(key)
383 if not info:
384 await ws.send_str('unknown invoice')
385 await ws.close()
386 return ws
387 if info.get('status') == PR_PAID:
388 await ws.send_str(f'paid')
389 await ws.close()
390 return ws
391 if info.get('status') == PR_EXPIRED:
392 await ws.send_str(f'expired')
393 await ws.close()
394 return ws
395 while True:
396 try:
397 await asyncio.wait_for(self.pending[key].wait(), 1)
398 break
399 except asyncio.TimeoutError:
400 # send data on the websocket, to keep it alive
401 await ws.send_str('waiting')
402 await ws.send_str('paid')
403 await ws.close()
404 return ws
405
406
407
408 class Daemon(Logger):
409
410 network: Optional[Network]
411 gui_object: Optional[Union['gui.qt.ElectrumGui', 'gui.kivy.ElectrumGui']]
412
413 @profiler
414 def __init__(self, config: SimpleConfig, fd=None, *, listen_jsonrpc=True):
415 Logger.__init__(self)
416 self.running = False
417 self.running_lock = threading.Lock()
418 self.config = config
419 if fd is None and listen_jsonrpc:
420 fd = get_file_descriptor(config)
421 if fd is None:
422 raise Exception('failed to lock daemon; already running?')
423 if 'wallet_path' in config.cmdline_options:
424 self.logger.warning("Ignoring parameter 'wallet_path' for daemon. "
425 "Use the load_wallet command instead.")
426 self.asyncio_loop = asyncio.get_event_loop()
427 self.network = None
428 if not config.get('offline'):
429 self.network = Network(config, daemon=self)
430 self.fx = FxThread(config, self.network)
431 self.gui_object = None
432 # path -> wallet; make sure path is standardized.
433 self._wallets = {} # type: Dict[str, Abstract_Wallet]
434 daemon_jobs = []
435 # Setup commands server
436 self.commands_server = None
437 if listen_jsonrpc:
438 self.commands_server = CommandsServer(self, fd)
439 daemon_jobs.append(self.commands_server.run())
440 # pay server
441 self.pay_server = None
442 payserver_address = self.config.get_netaddress('payserver_address')
443 if not config.get('offline') and payserver_address:
444 self.pay_server = PayServer(self, payserver_address)
445 daemon_jobs.append(self.pay_server.run())
446 # server-side watchtower
447 self.watchtower = None
448 watchtower_address = self.config.get_netaddress('watchtower_address')
449 if not config.get('offline') and watchtower_address:
450 self.watchtower = WatchTowerServer(self.network, watchtower_address)
451 daemon_jobs.append(self.watchtower.run)
452 if self.network:
453 self.network.start(jobs=[self.fx.run])
454 # prepare lightning functionality, also load channel db early
455 if self.config.get('use_gossip', False):
456 self.network.start_gossip()
457
458 self.taskgroup = TaskGroup()
459 asyncio.run_coroutine_threadsafe(self._run(jobs=daemon_jobs), self.asyncio_loop)
460
461 @log_exceptions
462 async def _run(self, jobs: Iterable = None):
463 if jobs is None:
464 jobs = []
465 self.logger.info("starting taskgroup.")
466 try:
467 async with self.taskgroup as group:
468 [await group.spawn(job) for job in jobs]
469 await group.spawn(asyncio.Event().wait) # run forever (until cancel)
470 except asyncio.CancelledError:
471 raise
472 except Exception as e:
473 self.logger.exception("taskgroup died.")
474 finally:
475 self.logger.info("taskgroup stopped.")
476
477 def load_wallet(self, path, password, *, manual_upgrades=True) -> Optional[Abstract_Wallet]:
478 path = standardize_path(path)
479 # wizard will be launched if we return
480 if path in self._wallets:
481 wallet = self._wallets[path]
482 return wallet
483 storage = WalletStorage(path)
484 if not storage.file_exists():
485 return
486 if storage.is_encrypted():
487 if not password:
488 return
489 storage.decrypt(password)
490 # read data, pass it to db
491 db = WalletDB(storage.read(), manual_upgrades=manual_upgrades)
492 if db.requires_split():
493 return
494 if db.requires_upgrade():
495 return
496 if db.get_action():
497 return
498 wallet = Wallet(db, storage, config=self.config)
499 wallet.start_network(self.network)
500 self._wallets[path] = wallet
501 return wallet
502
503 def add_wallet(self, wallet: Abstract_Wallet) -> None:
504 path = wallet.storage.path
505 path = standardize_path(path)
506 self._wallets[path] = wallet
507
508 def get_wallet(self, path: str) -> Optional[Abstract_Wallet]:
509 path = standardize_path(path)
510 return self._wallets.get(path)
511
512 def get_wallets(self) -> Dict[str, Abstract_Wallet]:
513 return dict(self._wallets) # copy
514
515 def delete_wallet(self, path: str) -> bool:
516 self.stop_wallet(path)
517 if os.path.exists(path):
518 os.unlink(path)
519 return True
520 return False
521
522 def stop_wallet(self, path: str) -> bool:
523 """Returns True iff a wallet was found."""
524 path = standardize_path(path)
525 wallet = self._wallets.pop(path, None)
526 if not wallet:
527 return False
528 fut = asyncio.run_coroutine_threadsafe(wallet.stop(), self.asyncio_loop)
529 fut.result()
530 return True
531
532 def run_daemon(self):
533 self.running = True
534 try:
535 while self.is_running():
536 time.sleep(0.1)
537 except KeyboardInterrupt:
538 self.running = False
539 self.on_stop()
540
541 def is_running(self):
542 with self.running_lock:
543 return self.running and not self.taskgroup.closed()
544
545 def stop(self):
546 with self.running_lock:
547 self.running = False
548
549 def on_stop(self):
550 self.logger.info("on_stop() entered. initiating shutdown")
551 if self.gui_object:
552 self.gui_object.stop()
553
554 @log_exceptions
555 async def stop_async():
556 self.logger.info("stopping all wallets")
557 async with TaskGroup() as group:
558 for k, wallet in self._wallets.items():
559 await group.spawn(wallet.stop())
560 self.logger.info("stopping network and taskgroup")
561 async with ignore_after(2):
562 async with TaskGroup() as group:
563 if self.network:
564 await group.spawn(self.network.stop(full_shutdown=True))
565 await group.spawn(self.taskgroup.cancel_remaining())
566
567 fut = asyncio.run_coroutine_threadsafe(stop_async(), self.asyncio_loop)
568 fut.result()
569 self.logger.info("removing lockfile")
570 remove_lockfile(get_lockfile(self.config))
571 self.logger.info("stopped")
572
573 def run_gui(self, config, plugins):
574 threading.current_thread().setName('GUI')
575 gui_name = config.get('gui', 'qt')
576 if gui_name in ['lite', 'classic']:
577 gui_name = 'qt'
578 self.logger.info(f'launching GUI: {gui_name}')
579 try:
580 gui = __import__('electrum.gui.' + gui_name, fromlist=['electrum'])
581 self.gui_object = gui.ElectrumGui(config, self, plugins)
582 self.gui_object.main()
583 except BaseException as e:
584 self.logger.error(f'GUI raised exception: {repr(e)}. shutting down.')
585 raise
586 finally:
587 # app will exit now
588 self.on_stop()