tMerge branch 'master' of gitorious.org:electrum/electrum - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit b36a71cf93353aa9d390bd01396bd83bd23feee0 DIR parent 9e316b7d2ba72a89e8bf4253aae02cb3ac4a77f7 HTML Author: ThomasV <thomasv@gitorious> Date: Tue, 13 Mar 2012 16:44:10 +0100 Merge branch 'master' of gitorious.org:electrum/electrum Diffstat: M server/server.py | 115 ++++++++++++++++++++++--------- 1 file changed, 84 insertions(+), 31 deletions(-) --- DIR diff --git a/server/server.py b/server/server.py t@@ -75,12 +75,19 @@ bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.g stopping = False block_number = -1 +old_block_number = -1 sessions = {} +sessions_sub_numblocks = [] # sessions that have subscribed to the service + dblock = thread.allocate_lock() peer_list = {} wallets = {} # for ultra-light clients such as bccapi +from Queue import Queue +input_queue = Queue() +output_queue = Queue() + class MyStore(Datastore_class): def import_tx(self, tx, is_coinbase): t@@ -408,18 +415,7 @@ def poll_session(session_id): k = 0 for addr in addresses: if store.tx_cache.get( addr ) is not None: k += 1 - - # get addtess status, i.e. the last block for that address. - tx_points = store.get_history(addr) - if not tx_points: - status = None - else: - lastpoint = tx_points[-1] - status = lastpoint['blk_hash'] - # this is a temporary hack; move it up once old clients have disappeared - if status == 'mempool' and session['version'] != "old": - status = status + ':%d'% len(tx_points) - + status = get_address_status( addr ) last_status = addresses.get( addr ) if last_status != status: addresses[addr] = status t@@ -433,6 +429,36 @@ def poll_session(session_id): return out +def get_address_status(addr): + # get addtess status, i.e. the last block for that address. + tx_points = store.get_history(addr) + if not tx_points: + status = None + else: + lastpoint = tx_points[-1] + status = lastpoint['blk_hash'] + # this is a temporary hack; move it up once old clients have disappeared + if status == 'mempool': # and session['version'] != "old": + status = status + ':%d'% len(tx_points) + return status + + +def send_numblocks(session_id): + out = json.dumps( {'method':'numblocks.subscribe', 'result':block_number} ) + output_queue.put((session_id, out)) + +def subscribe_to_numblocks(session_id): + sessions_sub_numblocks.append(session_id) + send_numblocks(session_id) + +def subscribe_to_address(session_id, address): + #print "%s subscribing to %s"%(session_id,address) + status = get_address_status(address) + sessions[session_id]['type'] = 'subscribe' + sessions[session_id]['addresses'][address] = status + sessions[session_id]['last_time'] = time.time() + out = json.dumps( { 'method':'address.subscribe', 'address':address, 'status':status } ) + output_queue.put((session_id, out)) def new_session(version, addresses): session_id = random_string(10) t@@ -443,11 +469,6 @@ def new_session(version, addresses): sessions[session_id]['last_time'] = time.time() return out -def subscribe_to_address(session_id, address): - sessions[session_id]['addresses'][address] = '' - sessions[session_id]['last_time'] = time.time() - - def update_session(session_id,addresses): sessions[session_id]['addresses'] = {} for a in addresses: t@@ -611,6 +632,9 @@ def do_command(cmd, data, ipaddr): #################################################################### def tcp_server_thread(): + thread.start_new_thread(process_input_queue, ()) + thread.start_new_thread(process_output_queue, ()) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((config.get('server','host'), 50001)) t@@ -624,19 +648,19 @@ def tcp_server_thread(): traceback.print_exc(file=sys.stdout) - +# one thread per client. put requests in a queue. def tcp_client_thread(ipaddr,conn): """ use a persistent connection. put commands in a queue.""" print "persistent client thread", ipaddr global sessions session_id = random_string(10) - sessions[session_id] = { 'addresses':{}, 'version':'unknown' } + sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown' } ipaddr = ipaddr[0] msg = '' - while True: + while not stopping: d = conn.recv(1024) msg += d if not d: break t@@ -655,18 +679,41 @@ def tcp_client_thread(ipaddr,conn): print "syntax error", repr(c), ipaddr continue - out = None - if cmd == 'blockchain.address.subscribe': - subscribe_to_address(session_id,data) - elif cmd == 'client.version': - sessions[session_id]['version'] = data - elif cmd == 'server.banner': - out = json.dumps( { 'method':'server.banner', 'result':config.get('server','banner').replace('\\n','\n') } ) - else: - print "unknown command", cmd + # add to queue + input_queue.put((session_id, cmd, data)) + + +# read commands from the input queue. perform requests, etc. this should be called from the main thread. +def process_input_queue(): + while not stopping: + session_id, cmd, data = input_queue.get() + out = None + if cmd == 'address.subscribe': + subscribe_to_address(session_id,data) + elif cmd == 'numblocks.subscribe': + subscribe_to_numblocks(session_id) + elif cmd == 'client.version': + sessions[session_id]['version'] = data + elif cmd == 'server.banner': + out = json.dumps( { 'method':'server.banner', 'result':config.get('server','banner').replace('\\n','\n') } ) + elif cmd == 'address.get_history': + address = data + out = json.dumps( { 'method':'address.get_history', 'address':address, 'result':store.get_history( address ) } ) + elif cmd == 'transaction.broadcast': + out = json.dumps( { 'method':'transaction.broadcast', 'result':send_tx(data) } ) + else: + print "unknown command", cmd + if out: + output_queue.put((session_id, out)) - if out: - conn.send(out+'\n') +# this is a separate thread +def process_output_queue(): + while not stopping: + session_id, out = output_queue.get() + session = sessions.get(session_id) + if session: + conn = session.get('conn') + conn.send(out+'\n') t@@ -835,6 +882,12 @@ if __name__ == '__main__': store.catch_up() memorypool_update(store) block_number = store.get_block_number(1) + + if block_number != old_block_number: + old_block_number = block_number + for session_id in sessions_sub_numblocks: + send_numblocks(session_id) + except IOError: print "IOError: cannot reach bitcoind" block_number = 0