URI: 
       tMerge branch 'master' of gitorious.org:electrum/electrum - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 782cc4fe226a099332fe169aa52a4cd0febd1cc6
   DIR parent 6d8965401cba90565b358935979417b8a50281c9
  HTML Author: ThomasV <thomasv@gitorious>
       Date:   Sun, 18 Mar 2012 01:33:57 +0300
       
       Merge branch 'master' of gitorious.org:electrum/electrum
       
       Diffstat:
         A client/blocks                       |      12 ++++++++++++
         M client/electrum                     |       6 +++++-
         M client/interface.py                 |     379 +++++++++++++++++--------------
         M client/wallet.py                    |       7 ++++---
       
       4 files changed, 230 insertions(+), 174 deletions(-)
       ---
   DIR diff --git a/client/blocks b/client/blocks
       t@@ -0,0 +1,12 @@
       +#!/usr/bin/env python
       +
       +import socket, time, interface
       +
       +def cb(block_number):  
       +    print block_number
       +
       +i = interface.AsynchronousInterface('ecdsa.org', 50001, newblock_callback=cb)
       +i.start_session([],"blocks")
       +
       +while True:
       +    time.sleep(1)
   DIR diff --git a/client/electrum b/client/electrum
       t@@ -164,7 +164,11 @@ if __name__ == '__main__':
        
            # open session
            if cmd not in ['password', 'mktx', 'history', 'label', 'contacts', 'help', 'validateaddress', 'signmessage', 'verifymessage', 'eval', 'create', 'addresses', 'import']:
       -        interface.start_session(wallet)
       +
       +        addresses = wallet.all_addresses()
       +        version = wallet.electrum_version
       +        address_callback = wallet.retrieve_status_callback
       +        interface.start_session(addresses, version, address_callback)
                interface.update_wallet(wallet)
                wallet.save()
        
   DIR diff --git a/client/interface.py b/client/interface.py
       t@@ -27,21 +27,32 @@ DEFAULT_SERVERS = ['ecdsa.org','electrum.novit.ro']  # list of default servers
        
        
        class Interface:
       -    def __init__(self, host, port):
       +    def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None):
                self.host = host
                self.port = port
       +        self.address_callback = address_callback
       +        self.history_callback = history_callback
       +        self.newblock_callback = newblock_callback
        
                self.servers = DEFAULT_SERVERS                            # actual list from IRC
                self.rtime = 0
                self.blocks = 0 
                self.message = ''
                self.was_updated = True # fixme: use a semaphore
       -        self.is_up_to_date = False # True after the first poll
       +        self.is_up_to_date = False
        
                self.is_connected = False
                self.disconnected_event = threading.Event()
                self.disconnected_event.clear()
        
       +        #only asynchrnous
       +        self.addresses_waiting_for_status = []
       +        self.addresses_waiting_for_history = []
       +        self.tx_event = threading.Event()
       +        self.up_to_date_event = threading.Event()
       +        self.up_to_date_event.clear()
       +
       +
            def send_tx(self, data):
                out = self.handler('transaction.broadcast', data )
                return out
       t@@ -49,84 +60,98 @@ class Interface:
            def get_servers(self):
                pass
        
       -    def start_session(self, wallet):
       +    def start_session(self, addresses, version):
                pass
        
        
       -class NativeInterface(Interface):
       -    """This is the original Electrum protocol. It uses polling, and a non-persistent tcp connection"""
       +    def handle_json_response(self, c):
       +        #print c
       +        msg_id = c.get('id')
       +        result = c.get('result')
       +        error = c.get('error')
       +        if msg_id is None:
       +            print "error: message without ID"
       +            return
        
       -    def __init__(self, host, port):
       -        Interface.__init__(self, host, port)
       +        method, params = self.messages[msg_id]
       +        if error:
       +            print "received error:", c, method, params
       +        else:
       +            self.handle_response(method, params, result)
        
       -    def start_session(self, wallet):
       -        addresses = wallet.all_addresses()
       -        version = wallet.electrum_version
       -        out = self.handler('session.new', [ version, addresses ] )
       -        self.session_id, self.message = ast.literal_eval( out )
       -        thread.start_new_thread(self.poll_thread, (wallet,))
        
       -    def update_session(self, addresses):
       -        out = self.handler('session.update', [ self.session_id, addresses ] )
       -        return out    
        
       -    def handler(self, method, params = ''):
       -        import time
       -        cmds = {'session.new':'new_session',
       -                'peers':'peers',
       -                'session.poll':'poll',
       -                'session.update':'update_session',
       -                'transaction.broadcast':'tx',
       -                'address.get_history':'h',
       -                'address.subscribe':'address.subscribe'
       -                }
       -        cmd = cmds[method]
       -        if type(params) != type(''): params = repr( params )
       -        t1 = time.time()
       -        request = repr ( (cmd, params) ) + "#"
       -        s = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
       -        s.settimeout(DEFAULT_TIMEOUT)
       -        s.connect(( self.host if cmd!='peers' else self.peers_server, self.port) )
       -        s.send( request )
       -        out = ''
       -        while 1:
       -            msg = s.recv(1024)
       -            if msg: out += msg
       -            else: break
       -        s.close()
       -        self.rtime = time.time() - t1
       -        self.is_connected = True
       -        if cmd in[ 'peers','h']:
       -            out = ast.literal_eval( out )
       -        return out
       +    def handle_response(self, method, params, result):
        
       -    def poll_interval(self):
       -        return 5
       -
       -    def retrieve_history(self, address):
       -        out = self.handler('address.get_history', address )
       -        return out
       -
       -    def get_history(self, addr, history_callback):
       -        data = self.retrieve_history(addr)
       -        apply(history_callback, (addr, data) )
       -        self.was_updated = True
       +        if method == 'session.new':
       +            self.session_id, self.message = ast.literal_eval( result )
       +            self.was_updated = True
        
       -    def subscribe(self, addr, status_callback):
       -        status = self.handler('address.subscribe', [ self.session_id, addr ] )
       -        apply(status_callback, (addr, status) )
       +        elif method == 'server.banner':
       +            self.message = result
       +            self.was_updated = True
        
       -    def update_wallet(self, wallet):
       -        while True:
       -            changed_addresses = self.poll()
       +        elif method == 'session.poll':
       +            blocks, changed_addresses = ast.literal_eval( result )
       +            if blocks == -1: raise BaseException("session not found")
       +            self.blocks = int(blocks)
                    if changed_addresses:
                        self.is_up_to_date = False
       +                self.was_updated = True
       +                for addr, status in changed_addresses.items():
       +                    apply(self.address_callback, (addr, status))
                    else:
                        self.is_up_to_date = True
       -                break
        
       -            for addr, status in changed_addresses.items():
       -                wallet.receive_status_callback(addr, status)
       +        elif method == 'server.peers':
       +            self.servers = map( lambda x:x[1], result )
       +
       +        elif method == 'address.subscribe':
       +            addr = params[-1]
       +            if addr in self.addresses_waiting_for_status:
       +                self.addresses_waiting_for_status.remove(addr)
       +            apply(self.address_callback,(addr, result))
       +                            
       +        elif method == 'address.get_history':
       +            addr = params[0]
       +            if addr in self.addresses_waiting_for_history:
       +                self.addresses_waiting_for_history.remove(addr)
       +            apply(self.history_callback, (addr, result))
       +            self.was_updated = True
       +
       +        elif method == 'transaction.broadcast':
       +            self.tx_result = result
       +            self.tx_event.set()
       +
       +        elif method == 'numblocks.subscribe':
       +            self.blocks = result
       +            if self.newblock_callback: apply(self.newblock_callback,(result,))
       +        else:
       +            print "received message:", c, method, params
       +
       +
       +
       +class PollingInterface(Interface):
       +    """ non-persistent connection. synchronous calls"""
       +
       +    def start_session(self, addresses, version):
       +        self.handler([('session.new', [ version, addresses ])] )
       +        thread.start_new_thread(self.poll_thread, ())
       +
       +    def poll_interval(self):
       +        return 5
       +
       +    def get_history(self, address):
       +        self.handler([('address.get_history', [address] )])
       +
       +    def subscribe(self, addresses):
       +        for addr in addresses:
       +            self.handler([('address.subscribe', [ self.session_id, addr ] )])
       +
       +    def update_wallet(self):
       +        while True:
       +            self.handler([('session.poll', self.session_id )])
       +            if self.is_up_to_date: break
        
                #if is_new or wallet.remote_url:
                #    self.was_updated = True
       t@@ -137,17 +162,10 @@ class NativeInterface(Interface):
                #else:
                #    return False
        
       -    def poll(self):
       -        out = self.handler('session.poll', self.session_id )
       -        blocks, changed_addr = ast.literal_eval( out )
       -        if blocks == -1: raise BaseException("session not found")
       -        self.blocks = int(blocks)
       -        return changed_addr
       -
       -    def poll_thread(self, wallet):
       +    def poll_thread(self):
                while self.is_connected:
                    try:
       -                self.update_wallet(wallet)
       +                self.update_wallet()
                        time.sleep(self.poll_interval())
                    except socket.gaierror:
                        break
       t@@ -171,8 +189,7 @@ class NativeInterface(Interface):
                    for server in DEFAULT_SERVERS:
                        try:
                            self.peers_server = server
       -                    out = self.handler('peers')
       -                    self.servers = map( lambda x:x[1], out )
       +                    self.handler([('server.peers',[])])
                            # print "Received server list from %s" % self.peers_server, out
                            break
                        except socket.timeout:
       t@@ -186,53 +203,100 @@ class NativeInterface(Interface):
        
        
        
       -class HttpInterface(NativeInterface):
        
       -    def handler(self, method, params = []):
       +
       +class NativeInterface(PollingInterface):
       +
       +    def handler(self, messages):
       +        import time
       +        cmds = {'session.new':'new_session',
       +                'server.peers':'peers',
       +                'session.poll':'poll',
       +                'transaction.broadcast':'tx',
       +                'address.get_history':'h',
       +                'address.subscribe':'address.subscribe'
       +                }
       +
       +        for m in messages:
       +            method, params = m
       +            cmd = cmds[method]
       +
       +            if cmd=='h':
       +                str_params = params[0]
       +            elif type(params) != type(''): 
       +                str_params = repr( params )
       +            else:
       +                str_params = params
       +            t1 = time.time()
       +            request = repr ( (cmd, str_params) ) + "#"
       +            s = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
       +            s.settimeout(DEFAULT_TIMEOUT)
       +            s.connect(( self.host if cmd!='peers' else self.peers_server, self.port) )
       +            s.send( request )
       +            out = ''
       +            while 1:
       +                msg = s.recv(1024)
       +                if msg: out += msg
       +                else: break
       +            s.close()
       +            self.rtime = time.time() - t1
       +            self.is_connected = True
       +            if cmd in[ 'peers','h']:
       +                out = ast.literal_eval( out )
       +
       +            if out=='': out=None #fixme
       +
       +            self.handle_response(method, params, out)
       +
       +
       +
       +
       +
       +class HttpInterface(PollingInterface):
       +
       +    def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None):
       +        Interface.__init__(self, host, port, address_callback, history_callback, newblock_callback)
       +        self.message_id = 0
       +        self.messages = {}
       +
       +    def handler(self, messages):
                import urllib2, json, time
       -        if type(params) != type([]): params = [ params ]
       -        t1 = time.time()
       -        data = { 'method':method, 'id':'jsonrpc', 'params':params }
       +
       +        data = []
       +        for m in messages:
       +            method, params = m
       +            if type(params) != type([]): params = [params]
       +            t1 = time.time()
       +            data.append( { 'method':method, 'id':self.message_id, 'params':params } )
       +            self.messages[self.message_id] = (method, params)
       +            self.message_id += 1
       +
                data_json = json.dumps(data)
       -        host = 'http://%s:%d'%( self.host if method!='peers' else self.peers_server, self.port )
       +        host = 'http://%s:%d'%( self.host if method!='server.peers' else self.peers_server, self.port )
                req = urllib2.Request(host, data_json, {'content-type': 'application/json'})
                response_stream = urllib2.urlopen(req)
                response = json.loads( response_stream.read() )
       -        out = response.get('result')
       -        if not out:
       -            print response
       +
                self.rtime = time.time() - t1
                self.is_connected = True
       -        return out
       +
       +        for item in response:
       +            self.handle_json_response(item)
        
        
        
        
        import threading
        
       -class TCPInterface(Interface):
       -    """json-rpc over persistent TCP connection"""
       +class AsynchronousInterface(Interface):
       +    """json-rpc over persistent TCP connection, asynchronous"""
        
       -    def __init__(self, host, port):
       -        Interface.__init__(self, host, port)
       +    def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None):
       +        Interface.__init__(self, host, port, address_callback, history_callback, newblock_callback)
                self.message_id = 0
                self.messages = {}
        
       -        self.tx_event = threading.Event()
       -        self.addresses_waiting_for_status = []
       -        self.addresses_waiting_for_history = []
       -        # up to date
       -        self.is_up_to_date = False
       -        self.up_to_date_event = threading.Event()
       -        self.up_to_date_event.clear()
       -
       -    def send(self, method, params = []):
       -        request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
       -        self.messages[self.message_id] = (method, params)
       -        self.s.send( request + '\n' )
       -        self.message_id += 1
       -
       -    def listen_thread(self, wallet):
       +    def listen_thread(self):
                try:
                    self.is_connected = True
                    out = ''
       t@@ -250,90 +314,59 @@ class TCPInterface(Interface):
                            c = out[0:s]
                            out = out[s+1:]
                            c = json.loads(c)
       -
       -                    #print c
       -                    msg_id = c.get('id')
       -                    result = c.get('result')
       -                    error = c.get('error')
       -
       -                    if msg_id is None:
       -                        print "error: message without ID"
       -                        continue
       -
       -                    method, params = self.messages[msg_id]
       -
       -                    if method == 'server.banner':
       -                        self.message = result
       -                        self.was_updated = True
       -
       -                    elif method == 'server.peers':
       -                        self.servers = map( lambda x:x[1], result )
       -
       -                    elif method == 'address.subscribe':
       -                        addr = params[0]
       -                        if addr in self.addresses_waiting_for_status:
       -                            self.addresses_waiting_for_status.remove(addr)
       -                        wallet.receive_status_callback(addr, result)
       -                            
       -                    elif method == 'address.get_history':
       -                        addr = params[0]
       -                        if addr in self.addresses_waiting_for_history:
       -                            self.addresses_waiting_for_history.remove(addr)
       -                        wallet.receive_history_callback(addr, result)
       -                        self.was_updated = True
       -
       -                    elif method == 'transaction.broadcast':
       -                        self.tx_result = result
       -                        self.tx_event.set()
       -
       -                    elif method == 'numblocks.subscribe':
       -                        self.blocks = result
       -
       -                    else:
       -                        print "received message:", c
       -
       +                    self.handle_json_response(c)
                            if self.addresses_waiting_for_status or self.addresses_waiting_for_history:
                                self.is_up_to_date = False
                            else:
                                self.is_up_to_date = True
                                self.up_to_date_event.set()
       +
                except:
                    traceback.print_exc(file=sys.stdout)
        
                self.is_connected = False
                self.disconnected_event.set()
        
       -    def update_wallet(self,wallet):
       +    def update_wallet(self,cb):
                self.up_to_date_event.wait()
        
       +    def send(self, messages):
       +        out = ''
       +        for m in messages:
       +            method, params = m 
       +            request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
       +            self.messages[self.message_id] = (method, params)
       +            self.message_id += 1
       +            out += request + '\n'
       +        self.s.send( out )
       +
            def send_tx(self, data):
                self.tx_event.clear()
       -        self.send('transaction.broadcast', [data] )
       +        self.send([('transaction.broadcast', [data])])
                self.tx_event.wait()
                return self.tx_result
        
       -    def subscribe(self, address, callback):
       -        self.send('address.subscribe', [address])
       -        self.addresses_waiting_for_status.append(address)
       -        
       +    def subscribe(self, addresses):
       +        messages = []
       +        for addr in addresses:
       +            messages.append(('address.subscribe', [addr]))
       +            self.addresses_waiting_for_status.append(addr)
       +        self.send(messages)
       +
            def get_servers(self):
       -        self.send('server.peers')
       +        self.send([('server.peers',[])])
        
       -    def get_history(self, addr, callback):
       -        self.send('address.get_history', [addr])
       -        self.addresses_waiting_for_history.append(addr) 
       +    def get_history(self, addr):
       +        self.send([('address.get_history', [addr])])
       +        self.addresses_waiting_for_history.append(addr)
        
       -    def start_session(self, wallet):
       +    def start_session(self, addresses, version):
                self.s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
                self.s.settimeout(1)
                self.s.connect(( self.host, self.port))
       -        thread.start_new_thread(self.listen_thread, (wallet,))
       -        self.send('client.version', [wallet.electrum_version])
       -        self.send('server.banner')
       -        self.send('numblocks.subscribe')
       -        for address in wallet.all_addresses():
       -            self.subscribe(address, wallet.receive_status_callback)
       -
       +        thread.start_new_thread(self.listen_thread, ())
       +        self.send([('client.version', [version]), ('server.banner',[]), ('numblocks.subscribe',[])])
       +        self.subscribe(addresses)
        
        
        
       t@@ -345,16 +378,20 @@ def new_interface(wallet):
            else:
                host = random.choice( DEFAULT_SERVERS )         # random choice when the wallet is created
            port = wallet.port
       +    address_cb = wallet.receive_status_callback
       +    history_cb = wallet.receive_history_callback
        
            if port == 50000:
       -        interface = NativeInterface(host,port)
       +        InterfaceClass = NativeInterface
            elif port == 50001:
       -        interface = TCPInterface(host,port)
       +        InterfaceClass = AsynchronousInterface
            elif port in [80, 81, 8080, 8081]:
       -        interface = HttpInterface(host,port)            
       +        InterfaceClass = HttpInterface
            else:
                print "unknown port number: %d. using native protocol."%port
       -        interface = NativeInterface(host,port)
       +        InterfaceClass = NativeInterface
       +
       +    interface = InterfaceClass(host, port, address_cb, history_cb)
                
            return interface
               
       t@@ -362,7 +399,9 @@ def new_interface(wallet):
        def loop_interfaces_thread(wallet):
            while True:
                try:
       -            wallet.interface.start_session(wallet)
       +            addresses = wallet.all_addresses()
       +            version = wallet.electrum_version
       +            wallet.interface.start_session(addresses, version)
                    wallet.interface.get_servers()
        
                    wallet.interface.disconnected_event.wait()
   DIR diff --git a/client/wallet.py b/client/wallet.py
       t@@ -456,7 +456,7 @@ class Wallet:
        
            def create_new_address(self, bool):
                address = self.create_new_address_without_history(bool)
       -        self.interface.subscribe(address, self.receive_status_callback)
       +        self.interface.subscribe([address])
                return address
        
        
       t@@ -701,11 +701,12 @@ class Wallet:
        
            def receive_status_callback(self, addr, status):
                if self.status.get(addr) != status:
       +            #print "updating status for", addr, repr(self.status.get(addr)), repr(status)
                    self.status[addr] = status
       -            self.interface.get_history(addr, self.receive_history_callback)
       +            self.interface.get_history(addr)
        
            def receive_history_callback(self, addr, data):
       -        #print "updating history for", addr
       +        #print "updating history for", addr, repr(data)
                self.history[addr] = data
                self.synchronize()
                self.update_tx_history()