tLNWorker: connect to multiple peers. save exceptions in aiosafe. enable adding peer in GUI. - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 8f779f504f76a48899722203d045c1e3ac850d11 DIR parent 35adc3231b297d03ad0a0534d65665ad14c0d9f6 HTML Author: ThomasV <thomasv@electrum.org> Date: Fri, 13 Jul 2018 17:05:04 +0200 LNWorker: connect to multiple peers. save exceptions in aiosafe. enable adding peer in GUI. Diffstat: M electrum/gui/qt/channels_list.py | 40 +++++++++++++++++++++++++------- M electrum/lnbase.py | 28 +++++++++++++++++----------- M electrum/lnrouter.py | 4 ++-- M electrum/lnworker.py | 38 +++++++++++++++++++++---------- 4 files changed, 77 insertions(+), 33 deletions(-) --- DIR diff --git a/electrum/gui/qt/channels_list.py b/electrum/gui/qt/channels_list.py t@@ -87,7 +87,7 @@ class ChannelsList(MyTreeWidget): push_amt_inp.setAmount(0) h.addWidget(QLabel(_('Your Node ID')), 0, 0) h.addWidget(local_nodeid, 0, 1) - h.addWidget(QLabel(_('Remote Node ID')), 1, 0) + h.addWidget(QLabel(_('Remote Node ID or connection string')), 1, 0) h.addWidget(remote_nodeid, 1, 1) h.addWidget(QLabel('Local amount'), 2, 0) h.addWidget(local_amt_inp, 2, 1) t@@ -97,19 +97,43 @@ class ChannelsList(MyTreeWidget): vbox.addLayout(Buttons(CancelButton(d), OkButton(d))) if not d.exec_(): return - nodeid_hex = str(remote_nodeid.text()) local_amt = local_amt_inp.get_amount() push_amt = push_amt_inp.get_amount() + connect_contents = str(remote_nodeid.text()) + rest = None + try: + nodeid_hex, rest = connect_contents.split("@") + except ValueError: + nodeid_hex = connect_contents try: node_id = bfh(nodeid_hex) + assert len(node_id) == 33 except: - self.parent.show_error(_('Invalid node ID')) - return - if node_id not in self.parent.wallet.lnworker.peers and node_id not in self.parent.network.lightning_nodes: - self.parent.show_error(_('Unknown node:') + ' ' + nodeid_hex) + self.parent.show_error(_('Invalid node ID, must be 33 bytes and hexadecimal')) return - assert local_amt >= 200000 - assert local_amt >= push_amt + peer = self.parent.wallet.lnworker.peers.get(node_id) + + if not peer: + known = node_id in self.parent.network.lightning_nodes + if rest is not None: + try: + host, port = rest.split(":") + except ValueError: + self.parent.show_error(_('Connection strings must be in <node_pubkey>@<host>:<port> format')) + elif known: + node = self.network.lightning_nodes.get(node_id) + host, port = node['addresses'][0] + else: + self.parent.show_error(_('Unknown node:') + ' ' + nodeid_hex) + return + try: + int(port) + except: + self.parent.show_error(_('Port number must be decimal')) + return + + self.parent.wallet.lnworker.add_peer(host, port, node_id) + self.main_window.protect(self.open_channel, (node_id, local_amt, push_amt)) def open_channel(self, *args, **kwargs): DIR diff --git a/electrum/lnbase.py b/electrum/lnbase.py t@@ -267,21 +267,24 @@ def create_ephemeral_key(privkey): def aiosafe(f): + # save exception in object. + # f must be a method of a PrintError instance. + # aiosafe calls should not be nested async def f2(*args, **kwargs): + self = args[0] try: return await f(*args, **kwargs) - except: - # if the loop isn't stopped - # run_forever in network.py would not return, - # the asyncioThread would not die, - # and we would block on shutdown - asyncio.get_event_loop().stop() - traceback.print_exc() + except BaseException as e: + self.print_msg("Exception in", f.__name__, ":", e.__class__.__name__, str(e)) + self.exception = e return f2 + + class Peer(PrintError): def __init__(self, lnworker, host, port, pubkey, request_initial_sync=False): + self.exception = None # set by aiosafe self.host = host self.port = port self.pubkey = pubkey t@@ -307,7 +310,7 @@ class Peer(PrintError): self.attempted_route = {} def diagnostic_name(self): - return self.host + return 'lnbase:' + self.host def ping_if_required(self): if time.time() - self.ping_time > 120: t@@ -455,7 +458,7 @@ class Peer(PrintError): 'alias': alias, 'addresses': addresses } - self.print_error('node announcement', binascii.hexlify(pubkey), alias, addresses) + #self.print_error('node announcement', binascii.hexlify(pubkey), alias, addresses) self.network.trigger_callback('ln_status') def on_init(self, payload): t@@ -476,8 +479,7 @@ class Peer(PrintError): else: self.announcement_signatures[channel_id].put_nowait(payload) - @aiosafe - async def main_loop(self): + async def initialize(self): self.reader, self.writer = await asyncio.open_connection(self.host, self.port) await self.handshake() # send init t@@ -486,6 +488,10 @@ class Peer(PrintError): msg = await self.read_message() self.process_message(msg) self.initialized.set_result(True) + + @aiosafe + async def main_loop(self): + await asyncio.wait_for(self.initialize(), 5) # loop while True: self.ping_if_required() DIR diff --git a/electrum/lnrouter.py b/electrum/lnrouter.py t@@ -70,7 +70,7 @@ class ChannelInfo(PrintError): self.policy_node1 = ChannelInfoDirectedPolicy(msg_payload) else: self.policy_node2 = ChannelInfoDirectedPolicy(msg_payload) - self.print_error('channel update', binascii.hexlify(self.channel_id).decode("ascii"), flags) + #self.print_error('channel update', binascii.hexlify(self.channel_id).decode("ascii"), flags) def get_policy_for_node(self, node_id): if node_id == self.node_id_1: t@@ -112,7 +112,7 @@ class ChannelDB(PrintError): def on_channel_announcement(self, msg_payload): short_channel_id = msg_payload['short_channel_id'] - self.print_error('channel announcement', binascii.hexlify(short_channel_id).decode("ascii")) + #self.print_error('channel announcement', binascii.hexlify(short_channel_id).decode("ascii")) channel_info = ChannelInfo(msg_payload) self._id_to_channel_info[short_channel_id] = channel_info self._channels_for_node[channel_info.node_id_1].add(short_channel_id) DIR diff --git a/electrum/lnworker.py b/electrum/lnworker.py t@@ -5,12 +5,13 @@ import os from decimal import Decimal import threading from collections import defaultdict +import random from . import constants from .bitcoin import sha256, COIN from .util import bh2u, bfh, PrintError from .constants import set_testnet, set_simnet -from .lnbase import Peer, privkey_to_pubkey +from .lnbase import Peer, privkey_to_pubkey, aiosafe from .lnaddr import lnencode, LnAddr, lndecode from .ecc import der_sig_from_sig_string from .transaction import Transaction t@@ -39,15 +40,13 @@ class LNWorker(PrintError): self.peers = {} self.channels = {x.channel_id: x for x in map(HTLCStateMachine, wallet.storage.get("channels", []))} self.invoices = wallet.storage.get('lightning_invoices', {}) - peer_list = network.config.get('lightning_peers', node_list) self.channel_state = {chan.channel_id: "DISCONNECTED" for chan in self.channels.values()} for chan_id, chan in self.channels.items(): self.network.lnwatcher.watch_channel(chan, self.on_channel_utxos) - for host, port, pubkey in peer_list: - self.add_peer(host, int(port), bfh(pubkey)) # wait until we see confirmations self.network.register_callback(self.on_network_update, ['updated', 'verified']) # thread safe self.on_network_update('updated') # shortcut (don't block) if funding tx locked and verified + asyncio.run_coroutine_threadsafe(self.main_loop(), asyncio.get_event_loop()) def channels_for_peer(self, node_id): assert type(node_id) is bytes t@@ -118,15 +117,9 @@ class LNWorker(PrintError): conf = self.wallet.get_tx_height(chan.funding_outpoint.txid)[1] peer.on_network_update(chan, conf) - async def _open_channel_coroutine(self, node_id, amount_sat, push_sat, password): - if node_id not in self.peers: - node = self.network.lightning_nodes.get(node_id) - if node is None: - return "node not found, peers available are: " + str(self.network.lightning_nodes.keys()) - host, port = node['addresses'][0] - self.add_peer(host, port, node_id) + async def _open_channel_coroutine(self, node_id, local_amount_sat, push_sat, password): peer = self.peers[node_id] - openingchannel = await peer.channel_establishment_flow(self.wallet, self.config, password, amount_sat, push_sat * 1000, temp_channel_id=os.urandom(32)) + openingchannel = await peer.channel_establishment_flow(self.wallet, self.config, password, local_amount_sat + push_sat, push_sat * 1000, temp_channel_id=os.urandom(32)) self.save_channel(openingchannel) self.network.lnwatcher.watch_channel(openingchannel, self.on_channel_utxos) self.on_channels_updated() t@@ -192,3 +185,24 @@ class LNWorker(PrintError): tx.add_signature_to_txin(0, none_idx, bh2u(remote_sig)) assert tx.is_complete() return self.network.broadcast_transaction(tx) + + @aiosafe + async def main_loop(self): + peer_list = self.config.get('lightning_peers', node_list) + for host, port, pubkey in peer_list: + self.add_peer(host, int(port), bfh(pubkey)) + while True: + await asyncio.sleep(1) + for k, peer in list(self.peers.items()): + if peer.exception: + self.print_error("removing peer", peer.host) + self.peers.pop(k) + if len(self.peers) > 3: + continue + node_id = random.choice(list(self.network.lightning_nodes.keys())) + node = self.network.lightning_nodes.get(node_id) + addresses = node.get('addresses') + if addresses: + host, port = addresses[0] + self.print_error("trying node", bh2u(node_id)) + self.add_peer(host, port, node_id)