URI: 
       tstratum http server - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 8d88b0702da6102f2f2aeae801ab679382506367
   DIR parent ec373602587223cab273f19fa399a80b5cd94fc1
  HTML Author: ThomasV <thomasv@gitorious>
       Date:   Mon, 19 Mar 2012 21:19:36 +0300
       
       stratum http server
       
       Diffstat:
         A server/StratumJSONRPCServer.py      |     296 +++++++++++++++++++++++++++++++
         M server/server.py                    |     140 +++++++++++++++++++++++--------
       
       2 files changed, 401 insertions(+), 35 deletions(-)
       ---
   DIR diff --git a/server/StratumJSONRPCServer.py b/server/StratumJSONRPCServer.py
       t@@ -0,0 +1,296 @@
       +import jsonrpclib
       +from jsonrpclib import Fault
       +from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS
       +import SimpleXMLRPCServer
       +import SocketServer
       +import socket
       +import logging
       +import os
       +import types
       +import traceback
       +import sys
       +try:
       +    import fcntl
       +except ImportError:
       +    # For Windows
       +    fcntl = None
       +
       +import json
       +
       +def get_version(request):
       +    # must be a dict
       +    if 'jsonrpc' in request.keys():
       +        return 2.0
       +    if 'id' in request.keys():
       +        return 1.0
       +    return None
       +    
       +def validate_request(request):
       +    if type(request) is not types.DictType:
       +        fault = Fault(
       +            -32600, 'Request must be {}, not %s.' % type(request)
       +        )
       +        return fault
       +    rpcid = request.get('id', None)
       +    version = get_version(request)
       +    if not version:
       +        fault = Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)
       +        return fault        
       +    request.setdefault('params', [])
       +    method = request.get('method', None)
       +    params = request.get('params')
       +    param_types = (types.ListType, types.DictType, types.TupleType)
       +    if not method or type(method) not in types.StringTypes or \
       +        type(params) not in param_types:
       +        fault = Fault(
       +            -32600, 'Invalid request parameters or method.', rpcid=rpcid
       +        )
       +        return fault
       +    return True
       +
       +class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
       +
       +    def __init__(self, encoding=None):
       +        SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self,
       +                                        allow_none=True,
       +                                        encoding=encoding)
       +
       +    def _marshaled_dispatch(self, data, dispatch_method = None):
       +        response = None
       +        try:
       +            request = jsonrpclib.loads(data)
       +        except Exception, e:
       +            fault = Fault(-32700, 'Request %s invalid. (%s)' % (data, e))
       +            response = fault.response()
       +            return response
       +
       +        responses = []
       +        if type(request) is not types.ListType:
       +            request = [ request ]
       +
       +        for req_entry in request:
       +            result = validate_request(req_entry)
       +            if type(result) is Fault:
       +                responses.append(result.response())
       +                continue
       +            resp_entry = self._marshaled_single_dispatch(req_entry)
       +            if resp_entry is not None:
       +                responses.append(resp_entry)
       +
       +        # poll
       +        r = self._marshaled_single_dispatch({'method':'session.poll', 'params':[], 'id':'z' })
       +        r = jsonrpclib.loads(r)
       +        r = r.get('result')
       +        for item in r:
       +            responses.append(json.dumps(item))
       +            
       +        if len(responses) > 1:
       +            response = '[%s]' % ','.join(responses)
       +        elif len(responses) == 1:
       +            response = responses[0]
       +        else:
       +            response = ''
       +
       +        return response
       +
       +    def _marshaled_single_dispatch(self, request):
       +        # TODO - Use the multiprocessing and skip the response if
       +        # it is a notification
       +        # Put in support for custom dispatcher here
       +        # (See SimpleXMLRPCServer._marshaled_dispatch)
       +        method = request.get('method')
       +        params = request.get('params')
       +        if params is None: params=[]
       +        params = [ self.session_id, request['id'] ] + params
       +        print method, params
       +        try:
       +            response = self._dispatch(method, params)
       +        except:
       +            exc_type, exc_value, exc_tb = sys.exc_info()
       +            fault = Fault(-32603, '%s:%s' % (exc_type, exc_value))
       +            return fault.response()
       +        if 'id' not in request.keys() or request['id'] == None:
       +            # It's a notification
       +            return None
       +
       +        try:
       +            response = jsonrpclib.dumps(response,
       +                                        methodresponse=True,
       +                                        rpcid=request['id']
       +                                        )
       +            return response
       +        except:
       +            exc_type, exc_value, exc_tb = sys.exc_info()
       +            fault = Fault(-32603, '%s:%s' % (exc_type, exc_value))
       +            return fault.response()
       +
       +    def _dispatch(self, method, params):
       +        func = None
       +        try:
       +            func = self.funcs[method]
       +        except KeyError:
       +            if self.instance is not None:
       +                if hasattr(self.instance, '_dispatch'):
       +                    return self.instance._dispatch(method, params)
       +                else:
       +                    try:
       +                        func = SimpleXMLRPCServer.resolve_dotted_attribute(
       +                            self.instance,
       +                            method,
       +                            True
       +                            )
       +                    except AttributeError:
       +                        pass
       +        if func is not None:
       +            try:
       +                if type(params) is types.ListType:
       +                    response = func(*params)
       +                else:
       +                    response = func(**params)
       +                return response
       +            except TypeError:
       +                return Fault(-32602, 'Invalid parameters.')
       +            except:
       +                err_lines = traceback.format_exc().splitlines()
       +                trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
       +                fault = jsonrpclib.Fault(-32603, 'Server error: %s' % 
       +                                         trace_string)
       +                return fault
       +        else:
       +            return Fault(-32601, 'Method %s not supported.' % method)
       +
       +class StratumJSONRPCRequestHandler(
       +        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
       +    
       +    def do_GET(self):
       +        if not self.is_rpc_path_valid():
       +            self.report_404()
       +            return
       +        try:
       +            print "GET"
       +
       +            self.server.session_id = None
       +            c = self.headers.get('cookie')
       +            if c:
       +                if c[0:8]=='SESSION=':
       +                    print "found cookie", c[8:]
       +                    self.server.session_id = c[8:]
       +
       +            if self.server.session_id is None:
       +                r = self.server._marshaled_single_dispatch({'method':'session.create', 'params':[], 'id':'z' })
       +                r = jsonrpclib.loads(r)
       +                self.server.session_id = r.get('result')
       +                print "setting cookie", self.server.session_id
       +
       +            data = json.dumps([])
       +            response = self.server._marshaled_dispatch(data)
       +            self.send_response(200)
       +        except Exception, e:
       +            self.send_response(500)
       +            err_lines = traceback.format_exc().splitlines()
       +            trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
       +            fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
       +            response = fault.response()
       +            print "500", trace_string
       +        if response == None:
       +            response = ''
       +
       +        if hasattr(self.server, 'session_id'):
       +            if self.server.session_id:
       +                self.send_header("Set-Cookie", "SESSION=%s"%self.server.session_id)
       +                self.session_id = None
       +
       +        self.send_header("Content-type", "application/json-rpc")
       +        self.send_header("Content-length", str(len(response)))
       +        self.end_headers()
       +        self.wfile.write(response)
       +        self.wfile.flush()
       +        self.connection.shutdown(1)
       +
       +
       +    def do_POST(self):
       +        if not self.is_rpc_path_valid():
       +            self.report_404()
       +            return
       +        try:
       +            max_chunk_size = 10*1024*1024
       +            size_remaining = int(self.headers["content-length"])
       +            L = []
       +            while size_remaining:
       +                chunk_size = min(size_remaining, max_chunk_size)
       +                L.append(self.rfile.read(chunk_size))
       +                size_remaining -= len(L[-1])
       +            data = ''.join(L)
       +
       +            self.server.session_id = None
       +            c = self.headers.get('cookie')
       +            if c:
       +                if c[0:8]=='SESSION=':
       +                    print "found cookie", c[8:]
       +                    self.server.session_id = c[8:]
       +
       +            if self.server.session_id is None:
       +                r = self.server._marshaled_single_dispatch({'method':'session.create', 'params':[], 'id':'z' })
       +                r = jsonrpclib.loads(r)
       +                self.server.session_id = r.get('result')
       +                #print "setting cookie", self.server.session_id
       +
       +            response = self.server._marshaled_dispatch(data)
       +            self.send_response(200)
       +        except Exception, e:
       +            self.send_response(500)
       +            err_lines = traceback.format_exc().splitlines()
       +            trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
       +            fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
       +            response = fault.response()
       +            print "500", trace_string
       +        if response == None:
       +            response = ''
       +
       +        if hasattr(self.server, 'session_id'):
       +            if self.server.session_id:
       +                self.send_header("Set-Cookie", "SESSION=%s"%self.server.session_id)
       +                self.session_id = None
       +
       +        self.send_header("Content-type", "application/json-rpc")
       +        self.send_header("Content-length", str(len(response)))
       +        self.end_headers()
       +        self.wfile.write(response)
       +        self.wfile.flush()
       +        self.connection.shutdown(1)
       +
       +
       +class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher):
       +
       +    allow_reuse_address = True
       +
       +    def __init__(self, addr, requestHandler=StratumJSONRPCRequestHandler,
       +                 logRequests=True, encoding=None, bind_and_activate=True,
       +                 address_family=socket.AF_INET):
       +        self.logRequests = logRequests
       +        StratumJSONRPCDispatcher.__init__(self, encoding)
       +        # TCPServer.__init__ has an extra parameter on 2.6+, so
       +        # check Python version and decide on how to call it
       +        vi = sys.version_info
       +        self.address_family = address_family
       +        if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX:
       +            # Unix sockets can't be bound if they already exist in the
       +            # filesystem. The convention of e.g. X11 is to unlink
       +            # before binding again.
       +            if os.path.exists(addr): 
       +                try:
       +                    os.unlink(addr)
       +                except OSError:
       +                    logging.warning("Could not unlink socket %s", addr)
       +        # if python 2.5 and lower
       +        if vi[0] < 3 and vi[1] < 6:
       +            SocketServer.TCPServer.__init__(self, addr, requestHandler)
       +        else:
       +            SocketServer.TCPServer.__init__(self, addr, requestHandler,
       +                bind_and_activate)
       +        if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
       +            flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
       +            flags |= fcntl.FD_CLOEXEC
       +            fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
       +
       +
   DIR diff --git a/server/server.py b/server/server.py
       t@@ -78,6 +78,8 @@ old_block_number = -1
        sessions = {}
        sessions_sub_numblocks = {} # sessions that have subscribed to the service
        
       +m_sessions = [{}] # served by http
       +
        dblock = thread.allocate_lock()
        peer_list = {}
        
       t@@ -88,6 +90,8 @@ input_queue = Queue()
        output_queue = Queue()
        address_queue = Queue()
        
       +
       +
        class MyStore(Datastore_class):
        
            def import_block(self, b, chain_ids=frozenset()):
       t@@ -384,41 +388,39 @@ def random_string(N):
        
            
        
       -def cmd_stop(data):
       +def cmd_stop(_,__,pw):
            global stopping
       -    if password == data:
       +    if password == pw:
                stopping = True
                return 'ok'
            else:
                return 'wrong password'
        
       -def cmd_load(pw):
       +def cmd_load(_,__,pw):
            if password == pw:
                return repr( len(sessions) )
            else:
                return 'wrong password'
        
        
       -def clear_cache(pw):
       +def clear_cache(_,__,pw):
            if password == pw:
                store.tx_cache = {}
                return 'ok'
            else:
                return 'wrong password'
        
       -def get_cache(pw,addr):
       +def get_cache(_,__,pw,addr):
            if password == pw:
                return store.tx_cache.get(addr)
            else:
                return 'wrong password'
        
        
       -def poll_session(session_id):
       -    session = sessions.get(session_id)
       -    if session is None:
       -        print time.asctime(), "session not found", session_id
       -        out = repr( (-1, {}))
       -    else:
       +
       +
       +def modified_addresses(session):
       +    if 1:
                t1 = time.time()
                addresses = session['addresses']
                session['last_time'] = time.time()
       t@@ -427,16 +429,47 @@ def poll_session(session_id):
                for addr in addresses:
                    if store.tx_cache.get( addr ) is not None: k += 1
                    status = get_address_status( addr )
       -            last_status = addresses.get( addr )
       +            msg_id, last_status = addresses.get( addr )
                    if last_status != status:
       -                addresses[addr] = status
       +                addresses[addr] = msg_id, status
                        ret[addr] = status
       -        if ret:
       -            sessions[session_id]['addresses'] = addresses
       -        out = repr( (block_number, ret ) )
       +
                t2 = time.time() - t1 
       -        if t2 > 10:
       -            print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
       +        #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
       +        return ret, addresses
       +
       +
       +def poll_session(session_id): 
       +    # native
       +    session = sessions.get(session_id)
       +    if session is None:
       +        print time.asctime(), "session not found", session_id
       +        return -1, {}
       +    else:
       +        ret, addresses = modified_addresses(session)
       +        if ret: sessions[session_id]['addresses'] = addresses
       +        return repr( (block_number,ret))
       +
       +
       +def poll_session_json(session_id, message_id):
       +    session = m_sessions[0].get(session_id)
       +    if session is None:
       +        raise BaseException("session not found %s"%session_id)
       +    else:
       +        print "poll: session found", session_id
       +        out = []
       +        ret, addresses = modified_addresses(session)
       +        if ret: 
       +            m_sessions[0][session_id]['addresses'] = addresses
       +            for addr in ret:
       +                msg_id, status = addresses[addr]
       +                out.append(  { 'id':msg_id, 'result':status } )
       +
       +        msg_id, last_nb = session.get('numblocks')
       +        if last_nb:
       +            if last_nb != block_number:
       +                m_sessions[0][session_id]['numblocks'] = msg_id, block_number
       +                out.append( {'id':msg_id, 'result':block_number} )
        
                return out
        
       t@@ -444,6 +477,7 @@ def poll_session(session_id):
        def do_update_address(addr):
            # an address was involved in a transaction; we check if it was subscribed to in a session
            # the address can be subscribed in several sessions; the cache should ensure that we don't do redundant requests
       +
            for session_id in sessions.keys():
                session = sessions[session_id]
                if session.get('type') != 'persistent': continue
       t@@ -457,7 +491,6 @@ def do_update_address(addr):
                        send_status(session_id,message_id,addr,status)
                        sessions[session_id]['addresses'][addr] = (message_id,status)
        
       -
        def get_address_status(addr):
            # get address status, i.e. the last block for that address.
            tx_points = store.get_history(addr)
       t@@ -481,19 +514,36 @@ def send_status(session_id, message_id, address, status):
            out = json.dumps( { 'id':message_id, 'result':status } )
            output_queue.put((session_id, out))
        
       +def address_get_history_json(_,message_id,address):
       +    return store.get_history(address)
       +
        def subscribe_to_numblocks(session_id, message_id):
            sessions_sub_numblocks[session_id] = message_id
            send_numblocks(session_id)
        
       +def subscribe_to_numblocks_json(session_id, message_id):
       +    global m_sessions
       +    m_sessions[0][session_id]['numblocks'] = message_id,block_number
       +    return block_number
       +
        def subscribe_to_address(session_id, message_id, address):
            status = get_address_status(address)
            sessions[session_id]['addresses'][address] = (message_id, status)
            sessions[session_id]['last_time'] = time.time()
            send_status(session_id, message_id, address, status)
        
       +def add_address_to_session_json(session_id, message_id, address):
       +    global m_sessions
       +    sessions = m_sessions[0]
       +    status = get_address_status(address)
       +    sessions[session_id]['addresses'][address] = (message_id, status)
       +    sessions[session_id]['last_time'] = time.time()
       +    m_sessions[0] = sessions
       +    return status
       +
        def add_address_to_session(session_id, address):
            status = get_address_status(address)
       -    sessions[session_id]['addresses'][addr] = status
       +    sessions[session_id]['addresses'][addr] = ("", status)
            sessions[session_id]['last_time'] = time.time()
            return status
        
       t@@ -501,13 +551,30 @@ def new_session(version, addresses):
            session_id = random_string(10)
            sessions[session_id] = { 'addresses':{}, 'version':version }
            for a in addresses:
       -        sessions[session_id]['addresses'][a] = ''
       +        sessions[session_id]['addresses'][a] = ('','')
            out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
            sessions[session_id]['last_time'] = time.time()
            return out
        
       -def get_banner():
       -    print "get banner"
       +
       +def client_version_json(session_id, _, version):
       +    global m_sessions
       +    sessions = m_sessions[0]
       +    sessions[session_id]['version'] = version
       +    m_sessions[0] = sessions
       +
       +def create_session_json(_, __):
       +    sessions = m_sessions[0]
       +    session_id = random_string(10)
       +    print "creating session", session_id
       +    sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
       +    sessions[session_id]['last_time'] = time.time()
       +    m_sessions[0] = sessions
       +    return session_id
       +
       +
       +
       +def get_banner(_,__):
            return config.get('server','banner').replace('\\n','\n')
        
        def update_session(session_id,addresses):
       t@@ -892,26 +959,29 @@ def irc_thread():
                    s.close()
        
        
       +def get_peers_json(_,__):
       +    return peer_list.values()
        
        def http_server_thread(store):
            # see http://code.google.com/p/jsonrpclib/
            from SocketServer import ThreadingMixIn
       -    from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
       -    class SimpleThreadedJSONRPCServer(ThreadingMixIn, SimpleJSONRPCServer): pass
       -    server = SimpleThreadedJSONRPCServer(( config.get('server','host'), 8081))
       -    server.register_function(lambda : peer_list.values(), 'server.peers')
       +    from StratumJSONRPCServer import StratumJSONRPCServer
       +    class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
       +    server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))
       +    server.register_function(get_peers_json, 'server.peers')
            server.register_function(cmd_stop, 'stop')
            server.register_function(cmd_load, 'load')
       -    server.register_function(lambda : block_number, 'blocks')
            server.register_function(clear_cache, 'clear_cache')
            server.register_function(get_cache, 'get_cache')
            server.register_function(get_banner, 'server.banner')
            server.register_function(send_tx, 'transaction.broadcast')
       -    server.register_function(store.get_history, 'address.get_history')
       -    server.register_function(add_address_to_session, 'address.subscribe')
       -    server.register_function(new_session, 'session.new')
       -    server.register_function(update_session, 'session.update')
       -    server.register_function(poll_session, 'session.poll')
       +    server.register_function(address_get_history_json, 'address.get_history')
       +    server.register_function(add_address_to_session_json, 'address.subscribe')
       +    server.register_function(create_session_json, 'session.create') #internal message
       +    server.register_function(poll_session_json, 'session.poll')
       +    server.register_function(subscribe_to_numblocks_json, 'numblocks.subscribe')
       +    server.register_function(client_version_json, 'client.version')
       +    server.register_function(lambda a,b:None, 'ping')
            server.serve_forever()
        
        
       t@@ -927,7 +997,7 @@ if __name__ == '__main__':
                if cmd == 'load':
                    out = server.load(password)
                elif cmd == 'peers':
       -            out = server.peers()
       +            out = server.server.peers()
                elif cmd == 'stop':
                    out = server.stop(password)
                elif cmd == 'clear_cache':
       t@@ -939,7 +1009,7 @@ if __name__ == '__main__':
                elif cmd == 'tx':
                    out = server.transaction.broadcast(sys.argv[2])
                elif cmd == 'b':
       -            out = server.blocks()
       +            out = server.numblocks.subscribe()
                else:
                    out = "Unknown command: '%s'" % cmd
                print out