tmake daemon usable with the GUI - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 9ee0614edb3c53d482bb6e95ebcd8b7b5ed7d951 DIR parent ab41c6f940490883a7a2d6922e50aae7bf2bac7e HTML Author: ThomasV <thomasv@gitorious> Date: Thu, 24 Jul 2014 10:59:13 +0200 make daemon usable with the GUI Diffstat: M electrum | 75 +++++++++++++++++++++++++++---- M gui/qt/lite_window.py | 6 +++--- M gui/qt/main_window.py | 6 +++--- M lib/__init__.py | 3 ++- M lib/commands.py | 14 +------------- M lib/daemon.py | 269 ++++++++----------------------- M lib/network.py | 23 +---------------------- A lib/network_proxy.py | 187 +++++++++++++++++++++++++++++++ M lib/synchronizer.py | 6 ++++-- M setup.py | 1 + 10 files changed, 332 insertions(+), 258 deletions(-) --- DIR diff --git a/electrum b/electrum t@@ -69,6 +69,7 @@ def arg_parser(): parser.add_option("-g", "--gui", dest="gui", help="User interface: qt, lite, gtk, text or stdio") parser.add_option("-w", "--wallet", dest="wallet_path", help="wallet path (default: electrum.dat)") parser.add_option("-o", "--offline", action="store_true", dest="offline", default=False, help="remain offline") + parser.add_option("-d", "--daemon", action="store_true", dest="daemon", default=False, help="use daemon") parser.add_option("-C", "--concealed", action="store_true", dest="concealed", default=False, help="don't echo seed to console when restoring") parser.add_option("-a", "--all", action="store_true", dest="show_all", default=False, help="show all addresses") parser.add_option("-l", "--labels", action="store_true", dest="show_labels", default=False, help="show the labels of listed addresses") t@@ -109,11 +110,9 @@ def run_command(cmd, password=None, args=None): if args is None: args = [] # Do not use mutables as default values! if cmd.requires_network and not options.offline: - network = NetworkProxy(config) - if not network.start(start_daemon= (True if cmd.name!='daemon' else False)): - print "Daemon not running" - sys.exit(1) - + s = daemon_socket() + network = NetworkProxy(s, config) + network.start() if wallet: wallet.start_threads(network) wallet.update() t@@ -133,6 +132,7 @@ def run_command(cmd, password=None, args=None): if cmd.requires_network and not options.offline: if wallet: wallet.stop_threads() + network.stop() if type(result) == str: t@@ -142,6 +142,28 @@ def run_command(cmd, password=None, args=None): +def daemon_socket(start_daemon=True): + import socket + from electrum.daemon import DAEMON_PORT + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + daemon_port = config.get('daemon_port', DAEMON_PORT) + daemon_started = False + while True: + try: + s.connect(('', daemon_port)) + return s + except socket.error: + if not start_daemon: + return False + elif not daemon_started: + import subprocess + logfile = open(os.path.join(config.path, 'daemon.log'),'w') + p = subprocess.Popen([__file__,"daemon","start"], stderr=logfile, stdout=logfile) + print "starting daemon (PID %d)"%p.pid + daemon_started = True + else: + time.sleep(0.1) + t@@ -193,8 +215,13 @@ if __name__ == '__main__': # network interface if not options.offline: - network = Network(config) - network.start() + if options.daemon: + s = daemon_socket() + network = NetworkProxy(s, config) + network.start() + else: + network = Network(config) + network.start() else: network = None t@@ -209,6 +236,38 @@ if __name__ == '__main__': time.sleep(0.1) sys.exit(0) + if cmd == 'daemon': + arg = args[1] + if arg=='start': + from electrum import daemon, util + util.set_verbosity(True) + print_stderr( "Starting daemon [%s]"%config.get('server')) + server = daemon.NetworkServer(config) + try: + server.main_loop() + except KeyboardInterrupt: + print_msg("Ctrl C - Stopping server") + sys.exit(0) + elif arg in ['status','stop']: + s = daemon_socket(start_daemon=False) + if not s: + print_msg("Daemon not running") + sys.exit(1) + network = NetworkProxy(s, config) + network.start() + if arg == 'status': + print_json({ + 'server':network.main_server(), + 'connected':network.is_connected() + }) + elif arg == 'stop': + network.stop_daemon() + network.stop() + else: + print "unknown command \"%s\""% arg + sys.exit(0) + + if cmd not in known_commands: cmd = 'help' t@@ -217,7 +276,6 @@ if __name__ == '__main__': # instanciate wallet for command-line storage = WalletStorage(config) - if cmd.name in ['create', 'restore']: if storage.file_exists: sys.exit("Error: Remove the existing wallet first!") t@@ -428,6 +486,5 @@ if __name__ == '__main__': else: run_command(cmd, password, args) - time.sleep(0.1) sys.exit(0) DIR diff --git a/gui/qt/lite_window.py b/gui/qt/lite_window.py t@@ -811,9 +811,9 @@ class MiniDriver(QObject): def update(self): if not self.network: self.initializing() - elif not self.network.interface: - self.initializing() - elif not self.network.interface.is_connected: + #elif not self.network.interface: + # self.initializing() + elif not self.network.is_connected(): self.connecting() if self.g.wallet is None: DIR diff --git a/gui/qt/main_window.py b/gui/qt/main_window.py t@@ -471,9 +471,9 @@ class ElectrumWindow(QMainWindow): if not self.wallet.up_to_date: text = _("Synchronizing...") icon = QIcon(":icons/status_waiting.png") - elif self.network.server_lag > 1: - text = _("Server is lagging (%d blocks)"%self.network.server_lag) - icon = QIcon(":icons/status_lagging.png") + #elif self.network.server_lag > 1: + # text = _("Server is lagging (%d blocks)"%self.network.server_lag) + # icon = QIcon(":icons/status_lagging.png") else: c, u = self.wallet.get_account_balance(self.current_account) text = _( "Balance" ) + ": %s "%( self.format_amount(c) ) + self.base_unit() DIR diff --git a/lib/__init__.py b/lib/__init__.py t@@ -14,4 +14,5 @@ from plugins import BasePlugin from mnemonic import mn_encode as mnemonic_encode from mnemonic import mn_decode as mnemonic_decode from commands import Commands, known_commands -from daemon import NetworkProxy, NetworkServer +from daemon import NetworkServer +from network_proxy import NetworkProxy DIR diff --git a/lib/commands.py b/lib/commands.py t@@ -19,7 +19,7 @@ import datetime import time import copy -from util import print_msg, format_satoshis +from util import print_msg, format_satoshis, print_stderr from bitcoin import is_valid, hash_160_to_bc_address, hash_160 from decimal import Decimal import bitcoin t@@ -105,7 +105,6 @@ register_command('verifymessage', 3,-1, False, False, False, 'Verifies a register_command('encrypt', 2,-1, False, False, False, 'encrypt a message with pubkey','encrypt <pubkey> <message>') register_command('decrypt', 2,-1, False, True, True, 'decrypt a message encrypted with pubkey','decrypt <pubkey> <message>') -register_command('daemon', 1, 1, True, False, False, '<stop|status>') register_command('getproof', 1, 1, True, False, False, 'get merkle proof', 'getproof <address>') register_command('getutxoaddress', 2, 2, True, False, False, 'get the address of an unspent transaction output','getutxoaddress <txid> <pos>') register_command('sweep', 2, 3, True, False, False, 'Sweep a private key.', 'sweep privkey addr [fee]') t@@ -133,17 +132,6 @@ class Commands: def getaddresshistory(self, addr): return self.network.synchronous_get([ ('blockchain.address.get_history',[addr]) ])[0] - def daemon(self, arg): - if arg=='stop': - return self.network.stop() - elif arg=='status': - return { - 'server':self.network.main_server(), - 'connected':self.network.is_connected() - } - else: - return "unknown command \"%s\""% arg - def listunspent(self): l = copy.deepcopy(self.wallet.get_unspent_coins()) for i in l: i["value"] = str(Decimal(i["value"])/100000000) DIR diff --git a/lib/daemon.py b/lib/daemon.py t@@ -25,182 +25,29 @@ import traceback import json import Queue from network import Network -from util import print_msg, print_stderr +from util import print_error, print_stderr from simple_config import SimpleConfig -DAEMON_PORT=8001 - -class NetworkProxy(threading.Thread): - # connects to daemon - # sends requests, runs callbacks - - def __init__(self, config=None): - if config is None: - config = {} # Do not use mutables as default arguments! - threading.Thread.__init__(self) - self.daemon = True - self.config = SimpleConfig(config) if type(config) == type({}) else config - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.daemon_port = config.get('daemon_port', DAEMON_PORT) - self.message_id = 0 - self.unanswered_requests = {} - self.subscriptions = {} - self.debug = False - self.lock = threading.Lock() - self.pending_transactions_for_notifications = [] - - - def start(self, start_daemon=False): - daemon_started = False - while True: - try: - self.socket.connect(('', self.daemon_port)) - threading.Thread.start(self) - return True - - except socket.error: - if not start_daemon: - return False - - elif not daemon_started: - print_stderr( "Starting daemon [%s]"%self.config.get('server')) - daemon_started = True - pid = os.fork() - if (pid == 0): # The first child. - os.chdir("/") - os.setsid() - os.umask(0) - pid2 = os.fork() - if (pid2 == 0): # Second child - server = NetworkServer(self.config) - try: - server.main_loop() - except KeyboardInterrupt: - print "Ctrl C - Stopping server" - sys.exit(1) - sys.exit(0) - else: - time.sleep(0.1) - - - - def parse_json(self, message): - s = message.find('\n') - if s==-1: - return None, message - j = json.loads( message[0:s] ) - return j, message[s+1:] - - - def run(self): - # read responses and trigger callbacks - message = '' - while True: - try: - data = self.socket.recv(1024) - except: - data = '' - if not data: - break - - message += data - while True: - response, message = self.parse_json(message) - if response is not None: - self.process(response) - else: - break - - print "NetworkProxy: exiting" - - def process(self, response): - # runs callbacks - if self.debug: print "<--", response - - msg_id = response.get('id') - with self.lock: - method, params, callback = self.unanswered_requests.pop(msg_id) - - result = response.get('result') - callback(None, {'method':method, 'params':params, 'result':result, 'id':msg_id}) - - - def subscribe(self, messages, callback): - # detect if it is a subscription - with self.lock: - if self.subscriptions.get(callback) is None: - self.subscriptions[callback] = [] - for message in messages: - if message not in self.subscriptions[callback]: - self.subscriptions[callback].append(message) - - self.send( messages, callback ) - - - def send(self, messages, callback): - """return the ids of the requests that we sent""" - out = '' - ids = [] - for m in messages: - method, params = m - request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } ) - self.unanswered_requests[self.message_id] = method, params, callback - ids.append(self.message_id) - if self.debug: print "-->", request - self.message_id += 1 - out += request + '\n' - while out: - sent = self.socket.send( out ) - out = out[sent:] - return ids - - - def synchronous_get(self, requests, timeout=100000000): - queue = Queue.Queue() - ids = self.send(requests, lambda i,x: queue.put(x)) - id2 = ids[:] - res = {} - while ids: - r = queue.get(True, timeout) - _id = r.get('id') - if _id in ids: - ids.remove(_id) - res[_id] = r.get('result') - out = [] - for _id in id2: - out.append(res[_id]) - return out - - - def get_servers(self): - return self.synchronous_get([('network.get_servers',[])])[0] - - def get_header(self, height): - return self.synchronous_get([('network.get_header',[height])])[0] - - def get_local_height(self): - return self.synchronous_get([('network.get_local_height',[])])[0] - - def is_connected(self): - return self.synchronous_get([('network.is_connected',[])])[0] - - def is_up_to_date(self): - return self.synchronous_get([('network.is_up_to_date',[])])[0] - - def main_server(self): - return self.synchronous_get([('network.main_server',[])])[0] - - def stop(self): - return self.synchronous_get([('daemon.shutdown',[])])[0] - - - def trigger_callback(self, cb): - pass +""" +The Network object is not aware of clients/subscribers +It only does subscribe/unsubscribe to addresses +Which client has wich address is managed by the daemon +Network also reports status changes +""" +DAEMON_PORT=8001 +def parse_json(message): + n = message.find('\n') + if n==-1: + return None, message + try: + j = json.loads( message[0:n] ) + except: + j = None + return j, message[n+1:] t@@ -208,12 +55,11 @@ class ClientThread(threading.Thread): # read messages from client (socket), and sends them to Network # responses are sent back on the same socket - def __init__(self, server, network, socket): + def __init__(self, server, network, s): threading.Thread.__init__(self) self.server = server self.daemon = True - self.s = socket - self.s.settimeout(0.1) + self.s = s self.network = network self.queue = Queue.Queue() self.unanswered_requests = {} t@@ -221,6 +67,7 @@ class ClientThread(threading.Thread): def run(self): + self.server.add_client(self) message = '' while True: self.send_responses() t@@ -228,30 +75,25 @@ class ClientThread(threading.Thread): data = self.s.recv(1024) except socket.timeout: continue - + except: + data = '' if not data: break message += data - while True: - cmd, message = self.parse_json(message) + cmd, message = parse_json(message) if not cmd: break self.process(cmd) - #print "client thread terminating" + self.server.remove_client(self) - def parse_json(self, message): - n = message.find('\n') - if n==-1: - return None, message - j = json.loads( message[0:n] ) - return j, message[n+1:] def process(self, request): - if self.debug: print "<--", request + if self.debug: + print_error("<--", request) method = request['method'] params = request['params'] _id = request['id'] t@@ -269,11 +111,6 @@ class ClientThread(threading.Thread): self.queue.put(out) return - if method == 'daemon.shutdown': - self.server.running = False - self.queue.put({'id':_id, 'result':True}) - return - def cb(i,r): _id = r.get('id') if _id is not None: t@@ -295,8 +132,8 @@ class ClientThread(threading.Thread): while out: n = self.s.send(out) out = out[n:] - if self.debug: print "-->", r - + if self.debug: + print_error("-->", r) t@@ -304,39 +141,61 @@ class NetworkServer: def __init__(self, config): network = Network(config) - if not network.start(wait=True): - print_msg("Not connected, aborting.") - sys.exit(1) + network.start(wait=False) self.network = network - self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.daemon_port = config.get('daemon_port', DAEMON_PORT) - self.server.bind(('', self.daemon_port)) - self.server.listen(5) - self.server.settimeout(1) + self.socket.bind(('', self.daemon_port)) + self.socket.listen(5) + self.socket.settimeout(1) self.running = False self.timeout = config.get('daemon_timeout', 60) + # + self.lock = threading.RLock() + self.clients = [] + # need to know which client subscribed to which address + # + # report status + self.network.status_callback = self.on_status + + def add_client(self, client): + with self.lock: + self.clients.append(client) + + def remove_client(self, client): + with self.lock: + self.clients.remove(client) + print_error("client quit:", len(self.clients)) + + def on_status(self, status): + for client in self.clients: + client.queue.put({'method':'network.subscribe', 'status':status}) def main_loop(self): self.running = True t = time.time() while self.running: try: - connection, address = self.server.accept() + connection, address = self.socket.accept() except socket.timeout: - if time.time() - t > self.timeout: - break + if not self.clients: + if time.time() - t > self.timeout: + break + else: + t = time.time() continue - t = time.time() client = ClientThread(self, self.network, connection) client.start() + print_error("daemon: timed out") if __name__ == '__main__': - import simple_config - config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.net:50002:s'}) + import simple_config, util + config = simple_config.SimpleConfig() + util.set_verbosity(True) server = NetworkServer(config) try: server.main_loop() DIR diff --git a/lib/network.py b/lib/network.py t@@ -221,6 +221,7 @@ class Network(threading.Thread): self.start_interfaces() threading.Thread.start(self) if wait: + raise return self.wait_until_connected() def wait_until_connected(self): t@@ -420,25 +421,3 @@ class Network(threading.Thread): return self.blockchain.height() - - #def retrieve_transaction(self, tx_hash, tx_height=0): - # import transaction - # r = self.synchronous_get([ ('blockchain.transaction.get',[tx_hash, tx_height]) ])[0] - # if r: - # return transaction.Transaction(r) - - - - - -if __name__ == "__main__": - network = NetworkProxy({}) - network.start() - print network.get_servers() - - q = Queue.Queue() - network.send([('blockchain.headers.subscribe',[])], q.put) - while True: - r = q.get(timeout=10000) - print r - DIR diff --git a/lib/network_proxy.py b/lib/network_proxy.py t@@ -0,0 +1,187 @@ +#!/usr/bin/env python +# +# Electrum - lightweight Bitcoin client +# Copyright (C) 2014 Thomas Voegtlin +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import socket +import time +import sys +import os +import threading +import traceback +import json +import Queue +from network import Network +from util import print_error, print_stderr +from simple_config import SimpleConfig + +from daemon import parse_json, NetworkServer, DAEMON_PORT + + + + + +class NetworkProxy(threading.Thread): + + def __init__(self, socket, config=None): + if config is None: + config = {} # Do not use mutables as default arguments! + threading.Thread.__init__(self) + self.config = SimpleConfig(config) if type(config) == type({}) else config + self.socket = socket + self.socket.settimeout(0.1) + self.message_id = 0 + self.unanswered_requests = {} + self.subscriptions = {} + self.debug = False + self.lock = threading.Lock() + self.pending_transactions_for_notifications = [] + self.banner = '' + self.callbacks = {} + self.running = True + self.daemon = True + + def is_running(self): + return self.running + + def run(self): + # read responses and trigger callbacks + message = '' + while self.is_running(): + try: + data = self.socket.recv(1024) + except socket.timeout: + continue + except: + data = '' + if not data: + break + message += data + while True: + response, message = parse_json(message) + if response is not None: + self.process(response) + else: + break + # fixme: server does not detect if we don't call shutdown + self.socket.shutdown(2) + self.socket.close() + print_error("NetworkProxy thread terminating") + + def process(self, response): + if self.debug: + print_error("<--", response) + + if response.get('method') == 'network.subscribe': + status = response.get('status') + self.trigger_callback(status) + return + + msg_id = response.get('id') + with self.lock: + method, params, callback = self.unanswered_requests.pop(msg_id) + + result = response.get('result') + callback(None, {'method':method, 'params':params, 'result':result, 'id':msg_id}) + + + def subscribe(self, messages, callback): + # detect if it is a subscription + with self.lock: + if self.subscriptions.get(callback) is None: + self.subscriptions[callback] = [] + for message in messages: + if message not in self.subscriptions[callback]: + self.subscriptions[callback].append(message) + + self.send( messages, callback ) + + + def send(self, messages, callback): + """return the ids of the requests that we sent""" + with self.lock: + out = '' + ids = [] + for m in messages: + method, params = m + request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } ) + self.unanswered_requests[self.message_id] = method, params, callback + ids.append(self.message_id) + if self.debug: + print_error("-->", request) + self.message_id += 1 + out += request + '\n' + while out: + sent = self.socket.send( out ) + out = out[sent:] + return ids + + + def synchronous_get(self, requests, timeout=100000000): + queue = Queue.Queue() + ids = self.send(requests, lambda i,x: queue.put(x)) + id2 = ids[:] + res = {} + while ids: + r = queue.get(True, timeout) + _id = r.get('id') + if _id in ids: + ids.remove(_id) + res[_id] = r.get('result') + else: + raise + out = [] + for _id in id2: + out.append(res[_id]) + return out + + + def get_servers(self): + return self.synchronous_get([('network.get_servers',[])])[0] + + def get_header(self, height): + return self.synchronous_get([('network.get_header',[height])])[0] + + def get_local_height(self): + return self.synchronous_get([('network.get_local_height',[])])[0] + + def is_connected(self): + return self.synchronous_get([('network.is_connected',[])])[0] + + def is_up_to_date(self): + return self.synchronous_get([('network.is_up_to_date',[])])[0] + + def main_server(self): + return self.synchronous_get([('network.main_server',[])])[0] + + def stop(self): + self.running = False + + + def register_callback(self, event, callback): + with self.lock: + if not self.callbacks.get(event): + self.callbacks[event] = [] + self.callbacks[event].append(callback) + + + def trigger_callback(self, event): + with self.lock: + callbacks = self.callbacks.get(event,[])[:] + if callbacks: + [callback() for callback in callbacks] + + print_error("trigger_callback", event, len(callbacks)) DIR diff --git a/lib/synchronizer.py b/lib/synchronizer.py t@@ -56,8 +56,10 @@ class WalletSynchronizer(threading.Thread): with self.lock: self.running = True while self.is_running(): - if not self.network.is_connected(): - self.network.wait_until_connected() + while not self.network.is_connected(): + import time + time.sleep(5) + #self.network.wait_until_connected() self.run_interface() def run_interface(self): DIR diff --git a/setup.py b/setup.py t@@ -76,6 +76,7 @@ setup( 'electrum.mnemonic', 'electrum.msqr', 'electrum.network', + 'electrum.network_proxy', 'electrum.paymentrequest', 'electrum.paymentrequest_pb2', 'electrum.plugins',