URI: 
       tlightning: post aiorpcx rebase fixup - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 3eabd70df59dc43e6f627198a6530ffe90f26766
   DIR parent 7edba633413b48e2ffdaa5dc61ece8d8c4c915fe
  HTML Author: Janus <ysangkok@gmail.com>
       Date:   Mon, 10 Sep 2018 18:01:55 +0200
       
       lightning: post aiorpcx rebase fixup
       
       Diffstat:
         M electrum/lnbase.py                  |      19 ++-----------------
         M electrum/lnchanannverifier.py       |      40 +++++++++++++-------------------
         M electrum/lnrouter.py                |       4 +++-
         M electrum/lnwatcher.py               |      79 +++++++++++++++----------------
         M electrum/lnworker.py                |      62 +++++++++++++++----------------
       
       5 files changed, 88 insertions(+), 116 deletions(-)
       ---
   DIR diff --git a/electrum/lnbase.py b/electrum/lnbase.py
       t@@ -29,7 +29,7 @@ from . import crypto
        from .crypto import sha256
        from . import constants
        from . import transaction
       -from .util import PrintError, bh2u, print_error, bfh
       +from .util import PrintError, bh2u, print_error, bfh, aiosafe
        from .transaction import opcodes, Transaction, TxOutput
        from .lnonion import new_onion_packet, OnionHopsDataSingle, OnionPerHop, decode_onion_error, ONION_FAILURE_CODE_MAP
        from .lnaddr import lndecode
       t@@ -266,21 +266,6 @@ def create_ephemeral_key() -> (bytes, bytes):
            return privkey.get_secret_bytes(), privkey.get_public_key_bytes()
        
        
       -def aiosafe(f):
       -    # save exception in object.
       -    # f must be a method of a PrintError instance.
       -    # aiosafe calls should not be nested
       -    async def f2(*args, **kwargs):
       -        self = args[0]
       -        try:
       -            return await f(*args, **kwargs)
       -        except BaseException as e:
       -            self.print_error("Exception in", f.__name__, ":", e.__class__.__name__, str(e))
       -            self.exception = e
       -    return f2
       -
       -
       -
        class Peer(PrintError):
        
            def __init__(self, lnworker, host, port, pubkey, request_initial_sync=False):
       t@@ -612,7 +597,7 @@ class Peer(PrintError):
                remote_sig = payload['signature']
                m.receive_new_commitment(remote_sig, [])
                # broadcast funding tx
       -        success, _txid = self.network.broadcast_transaction(funding_tx)
       +        success, _txid = await self.network.broadcast_transaction(funding_tx)
                assert success, success
                m.remote_state = m.remote_state._replace(ctn=0)
                m.local_state = m.local_state._replace(ctn=0, current_commitment_signature=remote_sig)
   DIR diff --git a/electrum/lnchanannverifier.py b/electrum/lnchanannverifier.py
       t@@ -23,7 +23,9 @@
        # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
        # SOFTWARE.
        
       +import asyncio
        import threading
       +from aiorpcx import TaskGroup
        
        from . import lnbase
        from . import bitcoin
       t@@ -60,7 +62,13 @@ class LNChanAnnVerifier(ThreadJob):
            def get_pending_channel_info(self, short_channel_id):
                return self.unverified_channel_info.get(short_channel_id, None)
        
       -    def run(self):
       +    async def main(self):
       +        while True:
       +            async with TaskGroup() as tg:
       +                await self.iteration(tg)
       +            await asyncio.sleep(0.1)
       +
       +    async def iteration(self, tg):
                interface = self.network.interface
                if not interface:
                    return
       t@@ -81,40 +89,24 @@ class LNChanAnnVerifier(ThreadJob):
                    if header is None:
                        index = block_height // 2016
                        if index < len(blockchain.checkpoints):
       -                    self.network.request_chunk(interface, index)
       +                    await tg.spawn(self.network.request_chunk(interface, index))
                        continue
       -            callback = lambda resp, short_channel_id=short_channel_id: self.on_txid_and_merkle(resp, short_channel_id)
       -            self.network.get_txid_from_txpos(block_height, tx_pos, True,
       -                                             callback=callback)
       +            await tg.spawn(self.verify_channel(block_height, tx_pos, short_channel_id))
                    #self.print_error('requested short_channel_id', bh2u(short_channel_id))
       -            with self.lock:
       -                self.started_verifying_channel.add(short_channel_id)
        
       -    def on_txid_and_merkle(self, response, short_channel_id):
       -        if response.get('error'):
       -            self.print_error('received an error:', response)
       -            return
       -        result = response['result']
       +    async def verify_channel(self, block_height, tx_pos, short_channel_id):
       +        with self.lock:
       +            self.started_verifying_channel.add(short_channel_id)
       +        result = await self.network.get_txid_from_txpos(block_height, tx_pos, True)
                tx_hash = result['tx_hash']
                merkle_branch = result['merkle']
       -        block_height, tx_pos, output_idx = invert_short_channel_id(short_channel_id)
                header = self.network.blockchain().read_header(block_height)
                try:
                    verify_tx_is_in_block(tx_hash, merkle_branch, tx_pos, header, block_height)
                except MerkleVerificationFailure as e:
                    self.print_error(str(e))
                    return
       -        callback = lambda resp, short_channel_id=short_channel_id: self.on_tx_response(resp, short_channel_id)
       -        self.network.get_transaction(tx_hash, callback=callback)
       -
       -    def on_tx_response(self, response, short_channel_id):
       -        if response.get('error'):
       -            self.print_error('received an error:', response)
       -            return
       -        params = response['params']
       -        result = response['result']
       -        tx_hash = params[0]
       -        tx = Transaction(result)
       +        tx = Transaction(await self.network.get_transaction(tx_hash))
                try:
                    tx.deserialize()
                except Exception:
   DIR diff --git a/electrum/lnrouter.py b/electrum/lnrouter.py
       t@@ -31,6 +31,7 @@ from collections import namedtuple, defaultdict
        from typing import Sequence, Union, Tuple, Optional
        import binascii
        import base64
       +import asyncio
        
        from . import constants
        from .util import PrintError, bh2u, profiler, get_headers_dir, bfh, is_ip_address, list_enabled_bits
       t@@ -277,7 +278,8 @@ class ChannelDB(JsonDB):
                self._last_good_address = {}  # node_id -> LNPeerAddr
        
                self.ca_verifier = LNChanAnnVerifier(network, self)
       -        self.network.add_jobs([self.ca_verifier])
       +        # FIXME if the channel verifier raises, it kills network.main_taskgroup
       +        asyncio.run_coroutine_threadsafe(self.network.add_job(self.ca_verifier.main()), network.asyncio_loop)
        
                self.load_data()
        
   DIR diff --git a/electrum/lnwatcher.py b/electrum/lnwatcher.py
       t@@ -1,12 +1,13 @@
        import threading
       +import asyncio
        
       -from .util import PrintError, bh2u, bfh, NoDynamicFeeEstimates
       +from .util import PrintError, bh2u, bfh, NoDynamicFeeEstimates, aiosafe
        from .lnutil import (extract_ctn_from_tx, derive_privkey,
                             get_per_commitment_secret_from_seed, derive_pubkey,
                             make_commitment_output_to_remote_address,
                             RevocationStore, UnableToDeriveSecret)
        from . import lnutil
       -from .bitcoin import redeem_script_to_address, TYPE_ADDRESS
       +from .bitcoin import redeem_script_to_address, TYPE_ADDRESS, address_to_scripthash
        from . import transaction
        from .transaction import Transaction, TxOutput
        from . import ecc
       t@@ -22,33 +23,27 @@ class LNWatcher(PrintError):
                self.watched_channels = {}
                self.address_status = {}  # addr -> status
        
       -    def parse_response(self, response):
       -        if response.get('error'):
       -            self.print_error("response error:", response)
       -            return None, None
       -        return response['params'], response['result']
       +    @aiosafe
       +    async def handle_addresses(self, funding_address):
       +        queue = asyncio.Queue()
       +        params = [address_to_scripthash(funding_address)]
       +        await self.network.interface.session.subscribe('blockchain.scripthash.subscribe', params, queue)
       +        await queue.get()
       +        while True:
       +            result = await queue.get()
       +            await self.on_address_status(funding_address, result)
        
            def watch_channel(self, chan, callback):
                funding_address = chan.get_funding_address()
                self.watched_channels[funding_address] = chan, callback
       -        self.network.subscribe_to_addresses([funding_address], self.on_address_status)
       +        asyncio.get_event_loop().create_task(self.handle_addresses(funding_address))
        
       -    def on_address_status(self, response):
       -        params, result = self.parse_response(response)
       -        if not params:
       -            return
       -        addr = params[0]
       +    async def on_address_status(self, addr, result):
                if self.address_status.get(addr) != result:
                    self.address_status[addr] = result
       -            self.network.request_address_utxos(addr, self.on_utxos)
       -
       -    def on_utxos(self, response):
       -        params, result = self.parse_response(response)
       -        if not params:
       -            return
       -        addr = params[0]
       -        chan, callback = self.watched_channels[addr]
       -        callback(chan, result)
       +            result = await self.network.interface.session.send_request('blockchain.scripthash.listunspent', [address_to_scripthash(addr)])
       +            chan, callback = self.watched_channels[addr]
       +            await callback(chan, result)
        
        
        
       t@@ -69,9 +64,9 @@ class LNChanCloseHandler(PrintError):
                network.register_callback(self.on_network_update, ['updated'])
                self.watch_address(self.funding_address)
        
       -    def on_network_update(self, event, *args):
       +    async def on_network_update(self, event, *args):
                if self.wallet.synchronizer.is_up_to_date():
       -            self.check_onchain_situation()
       +            await self.check_onchain_situation()
        
            def stop_and_delete(self):
                self.network.unregister_callback(self.on_network_update)
       t@@ -82,7 +77,7 @@ class LNChanCloseHandler(PrintError):
                    self.watched_addresses.add(addr)
                    self.wallet.synchronizer.add(addr)
        
       -    def check_onchain_situation(self):
       +    async def check_onchain_situation(self):
                funding_outpoint = self.chan.funding_outpoint
                ctx_candidate_txid = self.wallet.spent_outpoints[funding_outpoint.txid].get(funding_outpoint.output_index)
                if ctx_candidate_txid is None:
       t@@ -104,13 +99,13 @@ class LNChanCloseHandler(PrintError):
                conf = self.wallet.get_tx_height(ctx_candidate_txid).conf
                if conf == 0:
                    return
       -        keep_watching_this = self.inspect_ctx_candidate(ctx_candidate, i)
       +        keep_watching_this = await self.inspect_ctx_candidate(ctx_candidate, i)
                if not keep_watching_this:
                    self.stop_and_delete()
        
            # TODO batch sweeps
            # TODO sweep HTLC outputs
       -    def inspect_ctx_candidate(self, ctx, txin_idx: int):
       +    async def inspect_ctx_candidate(self, ctx, txin_idx: int):
                """Returns True iff found any not-deeply-spent outputs that we could
                potentially sweep at some point."""
                keep_watching_this = False
       t@@ -127,7 +122,7 @@ class LNChanCloseHandler(PrintError):
                    # note that we might also get here if this is our ctx and the ctn just happens to match
                    their_cur_pcp = chan.remote_state.current_per_commitment_point
                    if their_cur_pcp is not None:
       -                keep_watching_this |= self.find_and_sweep_their_ctx_to_remote(ctx, their_cur_pcp)
       +                keep_watching_this |= await self.find_and_sweep_their_ctx_to_remote(ctx, their_cur_pcp)
                # see if we have a revoked secret for this ctn ("breach")
                try:
                    per_commitment_secret = chan.remote_state.revocation_store.retrieve_secret(
       t@@ -138,13 +133,13 @@ class LNChanCloseHandler(PrintError):
                    # note that we might also get here if this is our ctx and we just happen to have
                    # the secret for the symmetric ctn
                    their_pcp = ecc.ECPrivkey(per_commitment_secret).get_public_key_bytes(compressed=True)
       -            keep_watching_this |= self.find_and_sweep_their_ctx_to_remote(ctx, their_pcp)
       -            keep_watching_this |= self.find_and_sweep_their_ctx_to_local(ctx, per_commitment_secret)
       +            keep_watching_this |= await self.find_and_sweep_their_ctx_to_remote(ctx, their_pcp)
       +            keep_watching_this |= await self.find_and_sweep_their_ctx_to_local(ctx, per_commitment_secret)
                # see if it's our ctx
                our_per_commitment_secret = get_per_commitment_secret_from_seed(
                    chan.local_state.per_commitment_secret_seed, RevocationStore.START_INDEX - ctn)
                our_per_commitment_point = ecc.ECPrivkey(our_per_commitment_secret).get_public_key_bytes(compressed=True)
       -        keep_watching_this |= self.find_and_sweep_our_ctx_to_local(ctx, our_per_commitment_point)
       +        keep_watching_this |= await self.find_and_sweep_our_ctx_to_local(ctx, our_per_commitment_point)
                return keep_watching_this
        
            def get_tx_mined_status(self, txid):
       t@@ -166,7 +161,7 @@ class LNChanCloseHandler(PrintError):
                else:
                    raise NotImplementedError()
        
       -    def find_and_sweep_their_ctx_to_remote(self, ctx, their_pcp: bytes):
       +    async def find_and_sweep_their_ctx_to_remote(self, ctx, their_pcp: bytes):
                """Returns True iff found a not-deeply-spent output that we could
                potentially sweep at some point."""
                payment_bp_privkey = ecc.ECPrivkey(self.chan.local_config.payment_basepoint.privkey)
       t@@ -193,12 +188,12 @@ class LNChanCloseHandler(PrintError):
                    return True
                sweep_tx = create_sweeptx_their_ctx_to_remote(self.network, self.sweep_address, ctx,
                                                              output_idx, our_payment_privkey)
       -        self.network.broadcast_transaction(sweep_tx,
       -                                           lambda res: self.print_tx_broadcast_result('sweep_their_ctx_to_remote', res))
       +        res = await self.network.broadcast_transaction(sweep_tx)
       +        self.print_tx_broadcast_result('sweep_their_ctx_to_remote', res)
                return True
        
        
       -    def find_and_sweep_their_ctx_to_local(self, ctx, per_commitment_secret: bytes):
       +    async def find_and_sweep_their_ctx_to_local(self, ctx, per_commitment_secret: bytes):
                """Returns True iff found a not-deeply-spent output that we could
                potentially sweep at some point."""
                per_commitment_point = ecc.ECPrivkey(per_commitment_secret).get_public_key_bytes(compressed=True)
       t@@ -230,11 +225,11 @@ class LNChanCloseHandler(PrintError):
                    return True
                sweep_tx = create_sweeptx_ctx_to_local(self.network, self.sweep_address, ctx, output_idx,
                                                       witness_script, revocation_privkey, True)
       -        self.network.broadcast_transaction(sweep_tx,
       -                                           lambda res: self.print_tx_broadcast_result('sweep_their_ctx_to_local', res))
       +        res = await self.network.broadcast_transaction(sweep_tx)
       +        self.print_tx_broadcast_result('sweep_their_ctx_to_local', res)
                return True
        
       -    def find_and_sweep_our_ctx_to_local(self, ctx, our_pcp: bytes):
       +    async def find_and_sweep_our_ctx_to_local(self, ctx, our_pcp: bytes):
                """Returns True iff found a not-deeply-spent output that we could
                potentially sweep at some point."""
                delayed_bp_privkey = ecc.ECPrivkey(self.chan.local_config.delayed_basepoint.privkey)
       t@@ -272,14 +267,14 @@ class LNChanCloseHandler(PrintError):
                sweep_tx = create_sweeptx_ctx_to_local(self.network, self.sweep_address, ctx, output_idx,
                                                       witness_script, our_localdelayed_privkey.get_secret_bytes(),
                                                       False, to_self_delay)
       -        self.network.broadcast_transaction(sweep_tx,
       -                                           lambda res: self.print_tx_broadcast_result('sweep_our_ctx_to_local', res))
       +        res = await self.network.broadcast_transaction(sweep_tx)
       +        self.print_tx_broadcast_result('sweep_our_ctx_to_local', res)
                return True
        
            def print_tx_broadcast_result(self, name, res):
       -        error = res.get('error')
       +        error, msg = res
                if error:
       -            self.print_error('{} broadcast failed: {}'.format(name, error))
       +            self.print_error('{} broadcast failed: {}'.format(name, msg))
                else:
                    self.print_error('{} broadcast succeeded'.format(name))
        
   DIR diff --git a/electrum/lnworker.py b/electrum/lnworker.py
       t@@ -11,8 +11,8 @@ import dns.exception
        
        from . import constants
        from .bitcoin import sha256, COIN
       -from .util import bh2u, bfh, PrintError, InvoiceError, resolve_dns_srv
       -from .lnbase import Peer, privkey_to_pubkey, aiosafe
       +from .util import bh2u, bfh, PrintError, InvoiceError, resolve_dns_srv, aiosafe
       +from .lnbase import Peer, privkey_to_pubkey
        from .lnaddr import lnencode, LnAddr, lndecode
        from .ecc import der_sig_from_sig_string
        from .lnhtlc import HTLCStateMachine
       t@@ -55,8 +55,7 @@ class LNWorker(PrintError):
                self._add_peers_from_config()
                # wait until we see confirmations
                self.network.register_callback(self.on_network_update, ['updated', 'verified', 'fee']) # thread safe
       -        self.on_network_update('updated') # shortcut (don't block) if funding tx locked and verified
       -        self.network.futures.append(asyncio.run_coroutine_threadsafe(self.main_loop(), asyncio.get_event_loop()))
       +        asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.main_loop()), self.network.asyncio_loop)
        
            def _add_peers_from_config(self):
                peer_list = self.config.get('lightning_peers', [])
       t@@ -84,7 +83,7 @@ class LNWorker(PrintError):
                self._last_tried_peer[peer_addr] = time.time()
                self.print_error("adding peer", peer_addr)
                peer = Peer(self, host, port, node_id, request_initial_sync=self.config.get("request_initial_sync", True))
       -        self.network.futures.append(asyncio.run_coroutine_threadsafe(peer.main_loop(), asyncio.get_event_loop()))
       +        asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(peer.main_loop()), self.network.asyncio_loop)
                self.peers[node_id] = peer
                self.network.trigger_callback('ln_status')
        
       t@@ -119,7 +118,7 @@ class LNWorker(PrintError):
                    return True
                return False
        
       -    def on_channel_utxos(self, chan, utxos):
       +    async def on_channel_utxos(self, chan, utxos):
                outpoints = [Outpoint(x["tx_hash"], x["tx_pos"]) for x in utxos]
                if chan.funding_outpoint not in outpoints:
                    chan.set_funding_txo_spentness(True)
       t@@ -131,33 +130,32 @@ class LNWorker(PrintError):
                    chan.set_funding_txo_spentness(False)
                self.network.trigger_callback('channel', chan)
        
       -    def on_network_update(self, event, *args):
       -        """ called from network thread """
       +    @aiosafe
       +    async def on_network_update(self, event, *args):
       +        # TODO
                # Race discovered in save_channel (assertion failing):
                # since short_channel_id could be changed while saving.
       -        # Mitigated by posting to loop:
       -        async def network_jobs():
       -            with self.lock:
       -                channels = list(self.channels.values())
       -            for chan in channels:
       -                if chan.get_state() == "OPENING":
       -                    res = self.save_short_chan_id(chan)
       -                    if not res:
       -                        self.print_error("network update but funding tx is still not at sufficient depth")
       -                        continue
       -                    # this results in the channel being marked OPEN
       -                    peer = self.peers[chan.node_id]
       -                    peer.funding_locked(chan)
       -                elif chan.get_state() == "OPEN":
       -                    peer = self.peers.get(chan.node_id)
       -                    if peer is None:
       -                        self.print_error("peer not found for {}".format(bh2u(chan.node_id)))
       -                        return
       -                    if event == 'fee':
       -                        peer.on_bitcoin_fee_update(chan)
       -                    conf = self.wallet.get_tx_height(chan.funding_outpoint.txid).conf
       -                    peer.on_network_update(chan, conf)
       -        asyncio.run_coroutine_threadsafe(network_jobs(), self.network.asyncio_loop).result()
       +        with self.lock:
       +            channels = list(self.channels.values())
       +        for chan in channels:
       +            print("update", chan.get_state())
       +            if chan.get_state() == "OPENING":
       +                res = self.save_short_chan_id(chan)
       +                if not res:
       +                    self.print_error("network update but funding tx is still not at sufficient depth")
       +                    continue
       +                # this results in the channel being marked OPEN
       +                peer = self.peers[chan.node_id]
       +                peer.funding_locked(chan)
       +            elif chan.get_state() == "OPEN":
       +                peer = self.peers.get(chan.node_id)
       +                if peer is None:
       +                    self.print_error("peer not found for {}".format(bh2u(chan.node_id)))
       +                    return
       +                if event == 'fee':
       +                    peer.on_bitcoin_fee_update(chan)
       +                conf = self.wallet.get_tx_height(chan.funding_outpoint.txid).conf
       +                peer.on_network_update(chan, conf)
        
            async def _open_channel_coroutine(self, node_id, local_amount_sat, push_sat, password):
                peer = self.peers[node_id]
       t@@ -345,8 +343,8 @@ class LNWorker(PrintError):
                        coro = peer.reestablish_channel(chan)
                        asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
        
       -    @aiosafe
            async def main_loop(self):
       +        await self.on_network_update('updated') # shortcut (don't block) if funding tx locked and verified
                while True:
                    await asyncio.sleep(1)
                    now = time.time()