URI: 
       tLNWorker: connect to multiple peers. save exceptions in aiosafe. enable adding peer in GUI. - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 8f779f504f76a48899722203d045c1e3ac850d11
   DIR parent 35adc3231b297d03ad0a0534d65665ad14c0d9f6
  HTML Author: ThomasV <thomasv@electrum.org>
       Date:   Fri, 13 Jul 2018 17:05:04 +0200
       
       LNWorker: connect to multiple peers.
       save exceptions in aiosafe.
       enable adding peer in GUI.
       
       Diffstat:
         M electrum/gui/qt/channels_list.py    |      40 +++++++++++++++++++++++++-------
         M electrum/lnbase.py                  |      28 +++++++++++++++++-----------
         M electrum/lnrouter.py                |       4 ++--
         M electrum/lnworker.py                |      38 +++++++++++++++++++++----------
       
       4 files changed, 77 insertions(+), 33 deletions(-)
       ---
   DIR diff --git a/electrum/gui/qt/channels_list.py b/electrum/gui/qt/channels_list.py
       t@@ -87,7 +87,7 @@ class ChannelsList(MyTreeWidget):
                push_amt_inp.setAmount(0)
                h.addWidget(QLabel(_('Your Node ID')), 0, 0)
                h.addWidget(local_nodeid, 0, 1)
       -        h.addWidget(QLabel(_('Remote Node ID')), 1, 0)
       +        h.addWidget(QLabel(_('Remote Node ID or connection string')), 1, 0)
                h.addWidget(remote_nodeid, 1, 1)
                h.addWidget(QLabel('Local amount'), 2, 0)
                h.addWidget(local_amt_inp, 2, 1)
       t@@ -97,19 +97,43 @@ class ChannelsList(MyTreeWidget):
                vbox.addLayout(Buttons(CancelButton(d), OkButton(d)))
                if not d.exec_():
                    return
       -        nodeid_hex = str(remote_nodeid.text())
                local_amt = local_amt_inp.get_amount()
                push_amt = push_amt_inp.get_amount()
       +        connect_contents = str(remote_nodeid.text())
       +        rest = None
       +        try:
       +            nodeid_hex, rest = connect_contents.split("@")
       +        except ValueError:
       +            nodeid_hex = connect_contents
                try:
                    node_id = bfh(nodeid_hex)
       +            assert len(node_id) == 33
                except:
       -            self.parent.show_error(_('Invalid node ID'))
       -            return
       -        if node_id not in self.parent.wallet.lnworker.peers and node_id not in self.parent.network.lightning_nodes:
       -            self.parent.show_error(_('Unknown node:') + ' ' + nodeid_hex)
       +            self.parent.show_error(_('Invalid node ID, must be 33 bytes and hexadecimal'))
                    return
       -        assert local_amt >= 200000
       -        assert local_amt >= push_amt
       +        peer = self.parent.wallet.lnworker.peers.get(node_id)
       +
       +        if not peer:
       +            known = node_id in self.parent.network.lightning_nodes
       +            if rest is not None:
       +                try:
       +                    host, port = rest.split(":")
       +                except ValueError:
       +                    self.parent.show_error(_('Connection strings must be in <node_pubkey>@<host>:<port> format'))
       +            elif known:
       +                node = self.network.lightning_nodes.get(node_id)
       +                host, port = node['addresses'][0]
       +            else:
       +                self.parent.show_error(_('Unknown node:') + ' ' + nodeid_hex)
       +                return
       +            try:
       +                int(port)
       +            except:
       +                self.parent.show_error(_('Port number must be decimal'))
       +                return
       +
       +            self.parent.wallet.lnworker.add_peer(host, port, node_id)
       +
                self.main_window.protect(self.open_channel, (node_id, local_amt, push_amt))
        
            def open_channel(self, *args, **kwargs):
   DIR diff --git a/electrum/lnbase.py b/electrum/lnbase.py
       t@@ -267,21 +267,24 @@ def create_ephemeral_key(privkey):
        
        
        def aiosafe(f):
       +    # save exception in object.
       +    # f must be a method of a PrintError instance.
       +    # aiosafe calls should not be nested
            async def f2(*args, **kwargs):
       +        self = args[0]
                try:
                    return await f(*args, **kwargs)
       -        except:
       -            # if the loop isn't stopped
       -            # run_forever in network.py would not return,
       -            # the asyncioThread would not die,
       -            # and we would block on shutdown
       -            asyncio.get_event_loop().stop()
       -            traceback.print_exc()
       +        except BaseException as e:
       +            self.print_msg("Exception in", f.__name__, ":", e.__class__.__name__, str(e))
       +            self.exception = e
            return f2
        
       +
       +
        class Peer(PrintError):
        
            def __init__(self, lnworker, host, port, pubkey, request_initial_sync=False):
       +        self.exception = None # set by aiosafe
                self.host = host
                self.port = port
                self.pubkey = pubkey
       t@@ -307,7 +310,7 @@ class Peer(PrintError):
                self.attempted_route = {}
        
            def diagnostic_name(self):
       -        return self.host
       +        return 'lnbase:' + self.host
        
            def ping_if_required(self):
                if time.time() - self.ping_time > 120:
       t@@ -455,7 +458,7 @@ class Peer(PrintError):
                    'alias': alias,
                    'addresses': addresses
                }
       -        self.print_error('node announcement', binascii.hexlify(pubkey), alias, addresses)
       +        #self.print_error('node announcement', binascii.hexlify(pubkey), alias, addresses)
                self.network.trigger_callback('ln_status')
        
            def on_init(self, payload):
       t@@ -476,8 +479,7 @@ class Peer(PrintError):
                else:
                    self.announcement_signatures[channel_id].put_nowait(payload)
        
       -    @aiosafe
       -    async def main_loop(self):
       +    async def initialize(self):
                self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
                await self.handshake()
                # send init
       t@@ -486,6 +488,10 @@ class Peer(PrintError):
                msg = await self.read_message()
                self.process_message(msg)
                self.initialized.set_result(True)
       +
       +    @aiosafe
       +    async def main_loop(self):
       +        await asyncio.wait_for(self.initialize(), 5)
                # loop
                while True:
                    self.ping_if_required()
   DIR diff --git a/electrum/lnrouter.py b/electrum/lnrouter.py
       t@@ -70,7 +70,7 @@ class ChannelInfo(PrintError):
                    self.policy_node1 = ChannelInfoDirectedPolicy(msg_payload)
                else:
                    self.policy_node2 = ChannelInfoDirectedPolicy(msg_payload)
       -        self.print_error('channel update', binascii.hexlify(self.channel_id).decode("ascii"), flags)
       +        #self.print_error('channel update', binascii.hexlify(self.channel_id).decode("ascii"), flags)
        
            def get_policy_for_node(self, node_id):
                if node_id == self.node_id_1:
       t@@ -112,7 +112,7 @@ class ChannelDB(PrintError):
        
            def on_channel_announcement(self, msg_payload):
                short_channel_id = msg_payload['short_channel_id']
       -        self.print_error('channel announcement', binascii.hexlify(short_channel_id).decode("ascii"))
       +        #self.print_error('channel announcement', binascii.hexlify(short_channel_id).decode("ascii"))
                channel_info = ChannelInfo(msg_payload)
                self._id_to_channel_info[short_channel_id] = channel_info
                self._channels_for_node[channel_info.node_id_1].add(short_channel_id)
   DIR diff --git a/electrum/lnworker.py b/electrum/lnworker.py
       t@@ -5,12 +5,13 @@ import os
        from decimal import Decimal
        import threading
        from collections import defaultdict
       +import random
        
        from . import constants
        from .bitcoin import sha256, COIN
        from .util import bh2u, bfh, PrintError
        from .constants import set_testnet, set_simnet
       -from .lnbase import Peer, privkey_to_pubkey
       +from .lnbase import Peer, privkey_to_pubkey, aiosafe
        from .lnaddr import lnencode, LnAddr, lndecode
        from .ecc import der_sig_from_sig_string
        from .transaction import Transaction
       t@@ -39,15 +40,13 @@ class LNWorker(PrintError):
                self.peers = {}
                self.channels = {x.channel_id: x for x in map(HTLCStateMachine, wallet.storage.get("channels", []))}
                self.invoices = wallet.storage.get('lightning_invoices', {})
       -        peer_list = network.config.get('lightning_peers', node_list)
                self.channel_state = {chan.channel_id: "DISCONNECTED" for chan in self.channels.values()}
                for chan_id, chan in self.channels.items():
                    self.network.lnwatcher.watch_channel(chan, self.on_channel_utxos)
       -        for host, port, pubkey in peer_list:
       -            self.add_peer(host, int(port), bfh(pubkey))
                # wait until we see confirmations
                self.network.register_callback(self.on_network_update, ['updated', 'verified']) # thread safe
                self.on_network_update('updated') # shortcut (don't block) if funding tx locked and verified
       +        asyncio.run_coroutine_threadsafe(self.main_loop(), asyncio.get_event_loop())
        
            def channels_for_peer(self, node_id):
                assert type(node_id) is bytes
       t@@ -118,15 +117,9 @@ class LNWorker(PrintError):
                        conf = self.wallet.get_tx_height(chan.funding_outpoint.txid)[1]
                        peer.on_network_update(chan, conf)
        
       -    async def _open_channel_coroutine(self, node_id, amount_sat, push_sat, password):
       -        if node_id not in self.peers:
       -            node = self.network.lightning_nodes.get(node_id)
       -            if node is None:
       -                return "node not found, peers available are: " + str(self.network.lightning_nodes.keys())
       -            host, port = node['addresses'][0]
       -            self.add_peer(host, port, node_id)
       +    async def _open_channel_coroutine(self, node_id, local_amount_sat, push_sat, password):
                peer = self.peers[node_id]
       -        openingchannel = await peer.channel_establishment_flow(self.wallet, self.config, password, amount_sat, push_sat * 1000, temp_channel_id=os.urandom(32))
       +        openingchannel = await peer.channel_establishment_flow(self.wallet, self.config, password, local_amount_sat + push_sat, push_sat * 1000, temp_channel_id=os.urandom(32))
                self.save_channel(openingchannel)
                self.network.lnwatcher.watch_channel(openingchannel, self.on_channel_utxos)
                self.on_channels_updated()
       t@@ -192,3 +185,24 @@ class LNWorker(PrintError):
                tx.add_signature_to_txin(0, none_idx, bh2u(remote_sig))
                assert tx.is_complete()
                return self.network.broadcast_transaction(tx)
       +
       +    @aiosafe
       +    async def main_loop(self):
       +        peer_list = self.config.get('lightning_peers', node_list)
       +        for host, port, pubkey in peer_list:
       +            self.add_peer(host, int(port), bfh(pubkey))
       +        while True:
       +            await asyncio.sleep(1)
       +            for k, peer in list(self.peers.items()):
       +                if peer.exception:
       +                    self.print_error("removing peer", peer.host)
       +                    self.peers.pop(k)
       +            if len(self.peers) > 3:
       +                continue
       +            node_id = random.choice(list(self.network.lightning_nodes.keys()))
       +            node = self.network.lightning_nodes.get(node_id)
       +            addresses = node.get('addresses')
       +            if addresses:
       +                host, port = addresses[0]
       +                self.print_error("trying node", bh2u(node_id))
       +                self.add_peer(host, port, node_id)