URI: 
       tclean implementation of daemon threads - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 72688a5cfa25066d1fd6eee646b874725359ff04
   DIR parent 58f9ab3492dfb9f7401f92db6b75f97819e1e5d8
  HTML Author: ThomasV <thomasv@gitorious>
       Date:   Fri, 13 Mar 2015 23:04:29 +0100
       
       clean implementation of daemon threads
       
       Diffstat:
         M lib/blockchain.py                   |      41 ++++++-------------------------
         M lib/daemon.py                       |      27 ++++++---------------------
         M lib/network.py                      |      18 +++---------------
         M lib/network_proxy.py                |      11 ++---------
         M lib/synchronizer.py                 |      17 +++--------------
         M lib/util.py                         |      25 +++++++++++++++++++++++++
         M lib/verifier.py                     |      20 ++++----------------
         M lib/wallet.py                       |       5 ++---
       
       8 files changed, 53 insertions(+), 111 deletions(-)
       ---
   DIR diff --git a/lib/blockchain.py b/lib/blockchain.py
       t@@ -19,74 +19,53 @@
        
        import threading, time, Queue, os, sys, shutil
        from util import user_dir, print_error
       +import util
        from bitcoin import *
        
        
       -class Blockchain(threading.Thread):
       +class Blockchain(util.DaemonThread):
        
            def __init__(self, config, network):
       -        threading.Thread.__init__(self)
       -        self.daemon = True
       +        util.DaemonThread.__init__(self)
                self.config = config
                self.network = network
                self.lock = threading.Lock()
                self.local_height = 0
       -        self.running = False
                self.headers_url = 'http://headers.electrum.org/blockchain_headers'
                self.set_local_height()
                self.queue = Queue.Queue()
        
       -
            def height(self):
                return self.local_height
        
       -
       -    def stop(self):
       -        with self.lock: self.running = False
       -
       -
       -    def is_running(self):
       -        with self.lock: return self.running
       -
       -
            def run(self):
                self.init_headers_file()
                self.set_local_height()
                print_error( "blocks:", self.local_height )
        
       -        with self.lock:
       -            self.running = True
       -
                while self.is_running():
       -
                    try:
       -                result = self.queue.get()
       +                result = self.queue.get(timeout=0.1)
                    except Queue.Empty:
                        continue
       -
       -            if not result: continue
       -
       +            if not result:
       +                continue
                    i, header = result
       -            if not header: continue
       -
       +            if not header:
       +                continue
                    height = header.get('block_height')
       -
                    if height <= self.local_height:
                        continue
       -
                    if height > self.local_height + 50:
                        if not self.get_and_verify_chunks(i, header, height):
                            continue
       -
                    if height > self.local_height:
                        # get missing parts from interface (until it connects to my chain)
                        chain = self.get_chain( i, header )
       -
                        # skip that server if the result is not consistent
                        if not chain:
                            print_error('e')
                            continue
       -
                        # verify the chain
                        if self.verify_chain( chain ):
                            print_error("height:", height, i.server)
       t@@ -96,13 +75,9 @@ class Blockchain(threading.Thread):
                            print_error("error", i.server)
                            # todo: dismiss that server
                            continue
       -
       -
                    self.network.new_blockchain_height(height, i)
        
        
       -
       -
            def verify_chain(self, chain):
        
                first_header = chain[0]
   DIR diff --git a/lib/daemon.py b/lib/daemon.py
       t@@ -66,18 +66,17 @@ def get_daemon(config, start_daemon=True):
        
        
        
       -class ClientThread(threading.Thread):
       +class ClientThread(util.DaemonThread):
        
            def __init__(self, server, s):
       -        threading.Thread.__init__(self)
       +        util.DaemonThread.__init__(self)
                self.server = server
       -        self.daemon = True
                self.client_pipe = util.SocketPipe(s)
                self.response_queue = Queue.Queue()
                self.server.add_client(self)
        
            def reading_thread(self):
       -        while self.running:
       +        while self.is_running():
                    try:
                        request = self.client_pipe.get()
                    except util.timeout:
       t@@ -91,9 +90,8 @@ class ClientThread(threading.Thread):
                    self.server.send_request(self, request)
        
            def run(self):
       -        self.running = True
                threading.Thread(target=self.reading_thread).start()
       -        while self.running:
       +        while self.is_running():
                    try:
                        response = self.response_queue.get(timeout=0.1)
                    except Queue.Empty:
       t@@ -109,11 +107,10 @@ class ClientThread(threading.Thread):
        
        
        
       -class NetworkServer(threading.Thread):
       +class NetworkServer(util.DaemonThread):
        
            def __init__(self, config):
       -        threading.Thread.__init__(self)
       -        self.daemon = True
       +        util.DaemonThread.__init__(self)
                self.debug = False
                self.config = config
                self.network = Network(config)
       t@@ -128,18 +125,6 @@ class NetworkServer(threading.Thread):
                self.request_id = 0
                self.requests = {}
        
       -    def is_running(self):
       -        with self.lock:
       -            return self.running
       -
       -    def stop(self):
       -        with self.lock:
       -            self.running = False
       -
       -    def start(self):
       -        self.running = True
       -        threading.Thread.start(self)
       -
            def add_client(self, client):
                for key in ['status','banner','updated','servers','interfaces']:
                    value = self.network.get_status_value(key)
   DIR diff --git a/lib/network.py b/lib/network.py
       t@@ -120,20 +120,18 @@ def serialize_server(host, port, protocol):
            return str(':'.join([host, port, protocol]))
        
        
       -class Network(threading.Thread):
       +class Network(util.DaemonThread):
        
            def __init__(self, config=None):
                if config is None:
                    config = {}  # Do not use mutables as default values!
       -        threading.Thread.__init__(self)
       -        self.daemon = True
       +        util.DaemonThread.__init__(self)
                self.config = SimpleConfig(config) if type(config) == type({}) else config
                self.lock = threading.Lock()
                self.num_server = 8 if not self.config.get('oneserver') else 0
                self.blockchain = Blockchain(self.config, self)
                self.interfaces = {}
                self.queue = Queue.Queue()
       -        self.running = False
                # Server for addresses and transactions
                self.default_server = self.config.get('server')
                # Sanitize default server
       t@@ -270,10 +268,9 @@ class Network(threading.Thread):
                self.response_queue = response_queue
                self.start_interfaces()
                t = threading.Thread(target=self.process_requests_thread)
       -        t.daemon = True
                t.start()
                self.blockchain.start()
       -        threading.Thread.start(self)
       +        util.DaemonThread.start(self)
        
            def set_proxy(self, proxy):
                self.proxy = proxy
       t@@ -540,15 +537,6 @@ class Network(threading.Thread):
                self.addresses[addr] = result
                self.response_queue.put(r)
        
       -    def stop(self):
       -        self.print_error("stopping network")
       -        with self.lock:
       -            self.running = False
       -
       -    def is_running(self):
       -        with self.lock:
       -            return self.running
       -
            def get_header(self, tx_height):
                return self.blockchain.read_header(tx_height)
        
   DIR diff --git a/lib/network_proxy.py b/lib/network_proxy.py
       t@@ -33,13 +33,13 @@ from daemon import NetworkServer
        
        
        
       -class NetworkProxy(threading.Thread):
       +class NetworkProxy(util.DaemonThread):
        
            def __init__(self, socket, config=None):
        
                if config is None:
                    config = {}  # Do not use mutables as default arguments!
       -        threading.Thread.__init__(self)
       +        util.DaemonThread.__init__(self)
                self.config = SimpleConfig(config) if type(config) == type({}) else config
                self.message_id = 0
                self.unanswered_requests = {}
       t@@ -48,8 +48,6 @@ class NetworkProxy(threading.Thread):
                self.lock = threading.Lock()
                self.pending_transactions_for_notifications = []
                self.callbacks = {}
       -        self.running = True
       -        self.daemon = True
        
                if socket:
                    self.pipe = util.SocketPipe(socket)
       t@@ -70,8 +68,6 @@ class NetworkProxy(threading.Thread):
                self.server_height = 0
                self.interfaces = []
        
       -    def is_running(self):
       -        return self.running
        
            def run(self):
                while self.is_running():
       t@@ -213,9 +209,6 @@ class NetworkProxy(threading.Thread):
            def set_parameters(self, *args):
                return self.synchronous_get([('network.set_parameters', args)])[0]
        
       -    def stop(self):
       -        self.running = False
       -
            def stop_daemon(self):
                return self.send([('daemon.stop',[])], None)
        
   DIR diff --git a/lib/synchronizer.py b/lib/synchronizer.py
       t@@ -22,31 +22,22 @@ import time
        import Queue
        
        import bitcoin
       +import util
        from util import print_error
        from transaction import Transaction
        
        
       -class WalletSynchronizer(threading.Thread):
       +class WalletSynchronizer(util.DaemonThread):
        
            def __init__(self, wallet, network):
       -        threading.Thread.__init__(self)
       -        self.daemon = True
       +        util.DaemonThread.__init__(self)
                self.wallet = wallet
                self.network = network
                self.was_updated = True
       -        self.running = False
                self.lock = threading.Lock()
                self.queue = Queue.Queue()
                self.address_queue = Queue.Queue()
        
       -    def stop(self):
       -        with self.lock:
       -            self.running = False
       -
       -    def is_running(self):
       -        with self.lock:
       -            return self.running
       -
            def add(self, address):
                self.address_queue.put(address)
        
       t@@ -57,8 +48,6 @@ class WalletSynchronizer(threading.Thread):
                self.network.send(messages, self.queue.put)
        
            def run(self):
       -        with self.lock:
       -            self.running = True
                while self.is_running():
                    while not self.network.is_connected():
                        time.sleep(0.1)
   DIR diff --git a/lib/util.py b/lib/util.py
       t@@ -4,6 +4,7 @@ import shutil
        from datetime import datetime
        import urlparse
        import urllib
       +import threading
        
        class NotEnoughFunds(Exception): pass
        
       t@@ -20,6 +21,30 @@ class MyEncoder(json.JSONEncoder):
                return super(MyEncoder, self).default(obj)
        
        
       +class DaemonThread(threading.Thread):
       +    """ daemon thread that terminates cleanly """
       +
       +    def __init__(self):
       +        threading.Thread.__init__(self)
       +        self.parent_thread = threading.currentThread()
       +        self.running = False
       +        self.running_lock = threading.Lock()
       +
       +    def start(self):
       +        with self.running_lock:
       +            self.running = True
       +        return threading.Thread.start(self)
       +
       +    def is_running(self):
       +        with self.running_lock:
       +            return self.running and self.parent_thread.is_alive()
       +
       +    def stop(self):
       +        with self.running_lock:
       +            self.running = False
       +
       +
       +
        is_verbose = False
        def set_verbosity(b):
            global is_verbose
   DIR diff --git a/lib/verifier.py b/lib/verifier.py
       t@@ -18,28 +18,27 @@
        
        
        import threading, time, Queue, os, sys, shutil
       +
       +import util
        from util import user_dir, print_error
        from bitcoin import *
        
        
        
        
       -class TxVerifier(threading.Thread):
       +class TxVerifier(util.DaemonThread):
            """ Simple Payment Verification """
        
            def __init__(self, network, storage):
       -        threading.Thread.__init__(self)
       -        self.daemon = True
       +        util.DaemonThread.__init__(self)
                self.storage = storage
                self.network = network
                self.transactions    = {}                                 # requested verifications (with height sent by the requestor)
                self.verified_tx     = storage.get('verified_tx3',{})      # height, timestamp of verified transactions
                self.merkle_roots    = storage.get('merkle_roots',{})      # hashed by me
                self.lock = threading.Lock()
       -        self.running = False
                self.queue = Queue.Queue()
        
       -
            def get_confirmations(self, tx):
                """ return the number of confirmations of a monitored transaction. """
                with self.lock:
       t@@ -47,11 +46,9 @@ class TxVerifier(threading.Thread):
                        height, timestamp, pos = self.verified_tx[tx]
                        conf = (self.network.get_local_height() - height + 1)
                        if conf <= 0: timestamp = None
       -
                    elif tx in self.transactions:
                        conf = -1
                        timestamp = None
       -
                    else:
                        conf = 0
                        timestamp = None
       t@@ -87,17 +84,8 @@ class TxVerifier(threading.Thread):
                    if tx_hash not in self.transactions.keys():
                        self.transactions[tx_hash] = tx_height
        
       -    def stop(self):
       -        with self.lock: self.running = False
       -
       -    def is_running(self):
       -        with self.lock: return self.running
       -
            def run(self):
       -        with self.lock:
       -            self.running = True
                requested_merkle = []
       -
                while self.is_running():
                    # request missing tx
                    for tx_hash, tx_height in self.transactions.items():
   DIR diff --git a/lib/wallet.py b/lib/wallet.py
       t@@ -134,9 +134,8 @@ class WalletStorage(object):
        
            def write(self):
                s = json.dumps(self.data, indent=4, sort_keys=True)
       -        f = open(self.path,"w")
       -        f.write(s)
       -        f.close()
       +        with open(self.path,"w") as f:
       +            f.write(s)
                if 'ANDROID_DATA' not in os.environ:
                    import stat
                    os.chmod(self.path,stat.S_IREAD | stat.S_IWRITE)