        git clone https://git.parazyd.org/electrum
       ---
        commit aa398993cfcbd27e4431e05fc90c0585a81d83e8
        parent e7888a50bedd8133fec92e960c8a44bedf8c1311
        Author: ThomasV <thomasv@electrum.org>
       Date:   Mon, 18 Mar 2019 11:03:37 +0100
       
       lnrouter fixes:
        - use gossip_queries_req instead of initial_routing_sync
        - add connected nodes to recent peers only after successful init
         - derive timestamp used with gossip_timestamp_filter from channel_db (see the sketch after this message)
        - fix query_short_channel_ids:
             1. channel IDs must be sorted before zlib encoding
             2. limit each request to 100 channel IDs
            3. do not abuse this to request node_announcements; it is fine not to have all nodes.
        - fix get_recent_peers:
             1. do not set last_connected_date to 'now' if we never connected to a node
             2. the SQL query was misconstructed and returned only one peer
        - populate FALLBACK_NODE_LIST_MAINNET with nodes that have the requested flags
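
        For context on the gossip_timestamp_filter bullet: the filter window is
        now derived from the newest channel_update timestamp already stored in
        channel_db, instead of a wall-clock value persisted by lnworker. A
        minimal sketch of the arithmetic performed by the lnpeer.py hunk below
        (get_last_timestamp is the ChannelDB query added in this commit):

            import time

            def gossip_filter_args(last_known: int):
                # last_known is the newest channel_update timestamp stored in
                # channel_db; 0 means the database is empty, which asks the
                # peer for the whole channel graph.
                first_timestamp = last_known
                # Cover everything from that point up to "now".
                timestamp_range = int(time.time()) - first_timestamp
                return first_timestamp, timestamp_range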
       
       Diffstat:
         M electrum/lnpeer.py                  |      70 ++++++++++++++++++++-----------
         M electrum/lnrouter.py                |      56 ++++++++++++++-----------------
         M electrum/lnworker.py                |      41 +++++++++----------------------
         M electrum/tests/test_lnpeer.py       |       6 +++---
       
       4 files changed, 86 insertions(+), 87 deletions(-)
       ---
        diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py
        @@ -14,6 +14,7 @@ from functools import partial
        from typing import List, Tuple, Dict, TYPE_CHECKING, Optional, Callable
        import traceback
        import sys
       +from datetime import datetime
        
        import aiorpcx
        
        @@ -53,8 +54,7 @@ def channel_id_from_funding_tx(funding_txid: str, funding_index: int) -> Tuple[b
        
        class Peer(PrintError):
        
       -    def __init__(self, lnworker: 'LNWorker', pubkey:bytes, transport: LNTransportBase,
       -                 request_initial_sync=False):
       +    def __init__(self, lnworker: 'LNWorker', pubkey:bytes, transport: LNTransportBase):
                self.initialized = asyncio.Event()
                self.node_anns = []
                self.chan_anns = []
        @@ -77,8 +77,7 @@ class Peer(PrintError):
                self.closing_signed = defaultdict(asyncio.Queue)
                self.payment_preimages = defaultdict(asyncio.Queue)
                self.localfeatures = LnLocalFeatures(0)
       -        if request_initial_sync:
       -            self.localfeatures |= LnLocalFeatures.INITIAL_ROUTING_SYNC
       +        self.localfeatures |= LnLocalFeatures.GOSSIP_QUERIES_REQ
                self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_REQ
                self.attempted_route = {}
                self.orphan_channel_updates = OrderedDict()
        @@ -96,7 +95,6 @@ class Peer(PrintError):
            async def initialize(self):
                if isinstance(self.transport, LNTransport):
                    await self.transport.handshake()
       -            self.channel_db.add_recent_peer(self.transport.peer_addr)
                self.send_message("init", gflen=0, lflen=1, localfeatures=self.localfeatures)
        
            @property
        @@ -172,8 +170,8 @@ class Peer(PrintError):
                            raise LightningPeerConnectionClosed("remote does not have even flag {}"
                                                                .format(str(LnLocalFeatures(1 << flag))))
                        self.localfeatures ^= 1 << flag  # disable flag
       -        first_timestamp = self.lnworker.get_first_timestamp()
       -        self.send_message('gossip_timestamp_filter', chain_hash=constants.net.rev_genesis_bytes(), first_timestamp=first_timestamp, timestamp_range=b"\xff"*4)
       +        if isinstance(self.transport, LNTransport):
       +            self.channel_db.add_recent_peer(self.transport.peer_addr)
                self.initialized.set()
        
            def on_node_announcement(self, payload):
        @@ -215,6 +213,17 @@ class Peer(PrintError):
            @log_exceptions
            async def _gossip_loop(self):
                await self.initialized.wait()
       +        timestamp = self.channel_db.get_last_timestamp()
       +        if timestamp == 0:
       +            self.print_error('requesting whole channel graph')
       +        else:
       +            self.print_error('requesting channel graph since', datetime.fromtimestamp(timestamp).ctime())
       +        timestamp_range = int(time.time()) - timestamp
       +        self.send_message(
       +            'gossip_timestamp_filter',
       +            chain_hash=constants.net.rev_genesis_bytes(),
       +            first_timestamp=timestamp,
       +            timestamp_range=timestamp_range)
                while True:
                    await asyncio.sleep(5)
                    if self.node_anns:
        @@ -226,13 +235,13 @@ class Peer(PrintError):
                    if self.chan_upds:
                        self.channel_db.on_channel_update(self.chan_upds)
                        self.chan_upds = []
       -            need_to_get = self.channel_db.missing_short_chan_ids() #type: Set[int]
       +            need_to_get = sorted(self.channel_db.missing_short_chan_ids())
                    if need_to_get and not self.receiving_channels:
       -                self.print_error('QUERYING SHORT CHANNEL IDS; missing', len(need_to_get), 'channels')
       -                zlibencoded = zlib.compress(bfh(''.join(need_to_get)))
       +                self.print_error('missing', len(need_to_get), 'channels')
       +                zlibencoded = zlib.compress(bfh(''.join(need_to_get[0:100])))
                        self.send_message(
                            'query_short_channel_ids',
       -                    chain_hash=bytes.fromhex(bitcoin.rev_hex(constants.net.GENESIS)),
       +                    chain_hash=constants.net.rev_genesis_bytes(),
                            len=1+len(zlibencoded),
                            encoded_short_ids=b'\x01' + zlibencoded)
                        self.receiving_channels = True
        @@ -705,20 +714,33 @@ class Peer(PrintError):
                #   that the remote sends, even if the channel was not announced
                #   (from BOLT-07: "MAY create a channel_update to communicate the channel
                #    parameters to the final node, even though the channel has not yet been announced")
       -        self.channel_db.on_channel_announcement({"short_channel_id": chan.short_channel_id, "node_id_1": node_ids[0], "node_id_2": node_ids[1],
       -                                                 'chain_hash': constants.net.rev_genesis_bytes(), 'len': b'\x00\x00', 'features': b'',
       -                                                 'bitcoin_key_1': bitcoin_keys[0], 'bitcoin_key_2': bitcoin_keys[1]},
       -                                                trusted=True)
       +        self.channel_db.on_channel_announcement(
       +            {
       +                "short_channel_id": chan.short_channel_id,
       +                "node_id_1": node_ids[0],
       +                "node_id_2": node_ids[1],
       +                'chain_hash': constants.net.rev_genesis_bytes(),
       +                'len': b'\x00\x00',
       +                'features': b'',
       +                'bitcoin_key_1': bitcoin_keys[0],
       +                'bitcoin_key_2': bitcoin_keys[1]
       +            },
       +            trusted=True)
                # only inject outgoing direction:
       -        if node_ids[0] == privkey_to_pubkey(self.privkey):
       -            channel_flags = b'\x00'
       -        else:
       -            channel_flags = b'\x01'
       -        now = int(time.time()).to_bytes(4, byteorder="big")
       -        self.channel_db.on_channel_update({"short_channel_id": chan.short_channel_id, 'channel_flags': channel_flags, 'cltv_expiry_delta': b'\x90',
       -                                           'htlc_minimum_msat': b'\x03\xe8', 'fee_base_msat': b'\x03\xe8', 'fee_proportional_millionths': b'\x01',
       -                                           'chain_hash': constants.net.rev_genesis_bytes(), 'timestamp': now},
       -                                          trusted=True)
       +        channel_flags = b'\x00' if node_ids[0] == privkey_to_pubkey(self.privkey) else b'\x01'
       +        now = int(time.time())
       +        self.channel_db.on_channel_update(
       +            {
       +                "short_channel_id": chan.short_channel_id,
       +                'channel_flags': channel_flags,
       +                'cltv_expiry_delta': b'\x90',
       +                'htlc_minimum_msat': b'\x03\xe8',
       +                'fee_base_msat': b'\x03\xe8',
       +                'fee_proportional_millionths': b'\x01',
       +                'chain_hash': constants.net.rev_genesis_bytes(),
       +                'timestamp': now.to_bytes(4, byteorder="big")
       +            },
       +            trusted=True)
                # peer may have sent us a channel update for the incoming direction previously
                # note: if we were offline when the 3rd conf happened, lnd will never send us this channel_update
                # see https://github.com/lightningnetwork/lnd/issues/1347
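
        The query_short_channel_ids hunk above follows the BOLT 7 wire format:
        short channel IDs are sorted, at most 100 are requested per message,
        and encoded_short_ids is a one-byte encoding type (0x01 for zlib)
        followed by the zlib-compressed concatenation of the 8-byte IDs. A
        standalone sketch of that encoding, using bytes.fromhex in place of
        Electrum's bfh helper:

            import zlib

            def encode_query_short_channel_ids(short_ids_hex, limit=100):
                # The commit message notes the IDs must be sorted for the
                # zlib encoding; cap the batch so one query stays small.
                batch = sorted(short_ids_hex)[:limit]
                raw = bytes.fromhex(''.join(batch))  # 8 bytes per channel ID
                compressed = zlib.compress(raw)
                # 0x01 marks zlib encoding; the len field counts it too.
                return 1 + len(compressed), b'\x01' + compressed
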
        diff --git a/electrum/lnrouter.py b/electrum/lnrouter.py
        @@ -23,6 +23,7 @@
        # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
        # SOFTWARE.
        
       +from datetime import datetime
        import time
        import random
        import queue
        @@ -157,9 +158,8 @@ class NodeInfo(Base):
                addresses = NodeInfo.parse_addresses_field(payload['addresses'])
                alias = payload['alias'].rstrip(b'\x00').hex()
                timestamp = int.from_bytes(payload['timestamp'], "big")
       -        now = int(time.time())
                return NodeInfo(node_id=node_id, features=features, timestamp=timestamp, alias=alias), [
       -            Address(host=host, port=port, node_id=node_id, last_connected_date=now) for host, port in addresses]
       +            Address(host=host, port=port, node_id=node_id, last_connected_date=None) for host, port in addresses]
        
            @staticmethod
            def parse_addresses_field(addresses_field):
        @@ -206,8 +206,7 @@ class Address(Base):
            node_id = Column(String(66), ForeignKey('node_info.node_id'), primary_key=True)
            host = Column(String(256), primary_key=True)
            port = Column(Integer, primary_key=True)
       -    last_connected_date = Column(Integer(), nullable=False)
       -
       +    last_connected_date = Column(Integer(), nullable=True)
        
        
        
        @@ -273,11 +272,8 @@ class ChannelDB(SqlDB):
        
            @sql
            def get_recent_peers(self):
       -        return [LNPeerAddr(x.host, x.port, bytes.fromhex(x.node_id)) for x in self.DBSession \
       -            .query(Address) \
       -            .select_from(NodeInfo) \
       -            .order_by(Address.last_connected_date.desc()) \
       -            .limit(self.NUM_MAX_RECENT_PEERS)]
       +        r = self.DBSession.query(Address).filter(Address.last_connected_date.isnot(None)).order_by(Address.last_connected_date.desc()).limit(self.NUM_MAX_RECENT_PEERS).all()
       +        return [LNPeerAddr(x.host, x.port, bytes.fromhex(x.node_id)) for x in r]
        
            @sql
            def get_channel_info(self, channel_id: bytes):
        @@ -298,15 +294,6 @@ class ChannelDB(SqlDB):
                chan_ids_from_policy = set(x[0] for x in self.DBSession.query(Policy.short_channel_id).filter(expr).all())
                if chan_ids_from_policy:
                    return chan_ids_from_policy
       -        # fetch channels for node_ids missing in node_info. that will also give us node_announcement
       -        expr = not_(ChannelInfo.node1_id.in_(self.DBSession.query(NodeInfo.node_id)))
       -        chan_ids_from_id1 = set(x[0] for x in self.DBSession.query(ChannelInfo.short_channel_id).filter(expr).all())
       -        if chan_ids_from_id1:
       -            return chan_ids_from_id1
       -        expr = not_(ChannelInfo.node2_id.in_(self.DBSession.query(NodeInfo.node_id)))
       -        chan_ids_from_id2 = set(x[0] for x in self.DBSession.query(ChannelInfo.short_channel_id).filter(expr).all())
       -        if chan_ids_from_id2:
       -            return chan_ids_from_id2
                return set()
        
            @sql
        @@ -318,7 +305,7 @@ class ChannelDB(SqlDB):
                self.DBSession.commit()
        
            @sql
       -    #@profiler
       +    @profiler
            def on_channel_announcement(self, msg_payloads, trusted=False):
                if type(msg_payloads) is dict:
                    msg_payloads = [msg_payloads]
        @@ -342,11 +329,17 @@ class ChannelDB(SqlDB):
                for channel_info in new_channels.values():
                    self.DBSession.add(channel_info)
                self.DBSession.commit()
       -        self.print_error('on_channel_announcement: %d/%d'%(len(new_channels), len(msg_payloads)))
       +        #self.print_error('on_channel_announcement: %d/%d'%(len(new_channels), len(msg_payloads)))
                self._update_counts()
                self.network.trigger_callback('ln_status')
        
            @sql
       +    def get_last_timestamp(self):
       +        from sqlalchemy.sql import func
       +        r = self.DBSession.query(func.max(Policy.timestamp).label('max_timestamp')).one()
       +        return r.max_timestamp or 0
       +
       +    @sql
            @profiler
            def on_channel_update(self, msg_payloads, trusted=False):
                if type(msg_payloads) is dict:
        @@ -368,7 +361,8 @@ class ChannelDB(SqlDB):
                    if not trusted and not verify_sig_for_channel_update(msg_payload, bytes.fromhex(node_id)):
                        continue
                    short_channel_id = channel_info.short_channel_id
       -            new_policy = Policy.from_msg(msg_payload, node_id, channel_info.short_channel_id)
       +            new_policy = Policy.from_msg(msg_payload, node_id, short_channel_id)
       +            #self.print_error('on_channel_update', datetime.fromtimestamp(new_policy.timestamp).ctime())
                    old_policy = self.DBSession.query(Policy).filter_by(short_channel_id=short_channel_id, start_node=node_id).one_or_none()
                    if old_policy:
                        if old_policy.timestamp >= new_policy.timestamp:
        @@ -378,6 +372,7 @@ class ChannelDB(SqlDB):
                    if p and p.timestamp >= new_policy.timestamp:
                        continue
                    new_policies[(short_channel_id, node_id)] = new_policy
       +        self.print_error('on_channel_update: %d/%d'%(len(new_policies), len(msg_payloads)))
                # commit pending removals
                self.DBSession.commit()
                # add and commit new policies
        @@ -386,7 +381,7 @@ class ChannelDB(SqlDB):
                self.DBSession.commit()
        
            @sql
       -    #@profiler
       +    @profiler
            def on_node_announcement(self, msg_payloads):
                if type(msg_payloads) is dict:
                    msg_payloads = [msg_payloads]
        @@ -403,7 +398,13 @@ class ChannelDB(SqlDB):
                        node_info, node_addresses = NodeInfo.from_msg(msg_payload)
                    except UnknownEvenFeatureBits:
                        continue
       +            #self.print_error('received node announcement from', datetime.fromtimestamp(node_info.timestamp).ctime())
                    node_id = node_info.node_id
       +            # Ignore node if it has no associated channel (DoS protection)
       +            expr = or_(ChannelInfo.node1_id==node_id, ChannelInfo.node2_id==node_id)
       +            if self.DBSession.query(ChannelInfo.short_channel_id).filter(expr).count() == 0:
       +                #self.print_error('ignoring orphan node_announcement')
       +                continue
                    node = self.DBSession.query(NodeInfo).filter_by(node_id=node_id).one_or_none()
                    if node and node.timestamp >= node_info.timestamp:
                        continue
        @@ -413,20 +414,13 @@ class ChannelDB(SqlDB):
                    new_nodes[node_id] = node_info
                    for addr in node_addresses:
                        new_addresses[(addr.node_id,addr.host,addr.port)] = addr
       -
       -        self.print_error("on_node_announcements: %d/%d"%(len(new_nodes), len(msg_payloads)))
       +        self.print_error("on_node_announcement: %d/%d"%(len(new_nodes), len(msg_payloads)))
                for node_info in new_nodes.values():
                    self.DBSession.add(node_info)
                for new_addr in new_addresses.values():
                    old_addr = self.DBSession.query(Address).filter_by(node_id=new_addr.node_id, host=new_addr.host, port=new_addr.port).one_or_none()
       -            if old_addr:
       -                old_addr.last_connected_date = new_addr.last_connected_date
       -            else:
       +            if not old_addr:
                        self.DBSession.add(new_addr)
       -            # TODO if this message is for a new node, and if we have no associated
       -            # channels for this node, we should ignore the message and return here,
       -            # to mitigate DOS. but race condition: the channels we have for this
       -            # node, might be under verification in self.ca_verifier, what then?
                self.DBSession.commit()
                self._update_counts()
                self.network.trigger_callback('ln_status')
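
        Both database fixes above hinge on last_connected_date being nullable:
        an address learned only from gossip keeps a NULL value, so it never
        shadows peers we actually connected to. A condensed sketch of the two
        queries, assuming the Address and Policy models defined in lnrouter.py
        above:

            from sqlalchemy.sql import func

            def recent_peers(session, limit):
                # NULL last_connected_date means "never connected"; skip those.
                return (session.query(Address)
                        .filter(Address.last_connected_date.isnot(None))
                        .order_by(Address.last_connected_date.desc())
                        .limit(limit)
                        .all())

            def last_gossip_timestamp(session):
                # Newest channel_update timestamp on record, or 0 when the
                # database is empty (request the whole graph).
                row = session.query(func.max(Policy.timestamp)).one()
                return row[0] or 0
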
        diff --git a/electrum/lnworker.py b/electrum/lnworker.py
        @@ -59,10 +59,16 @@ FALLBACK_NODE_LIST_TESTNET = (
            LNPeerAddr('148.251.87.112', 9735, bfh('021a8bd8d8f1f2e208992a2eb755cdc74d44e66b6a0c924d3a3cce949123b9ce40')), # janus test server
            LNPeerAddr('122.199.61.90', 9735, bfh('038863cf8ab91046230f561cd5b386cbff8309fa02e3f0c3ed161a3aeb64a643b9')), # popular node https://1ml.com/testnet/node/038863cf8ab91046230f561cd5b386cbff8309fa02e3f0c3ed161a3aeb64a643b9
        )
       -FALLBACK_NODE_LIST_MAINNET = (
       -    LNPeerAddr('104.198.32.198', 9735, bfh('02f6725f9c1c40333b67faea92fd211c183050f28df32cac3f9d69685fe9665432')), # Blockstream
       -    LNPeerAddr('13.80.67.162', 9735, bfh('02c0ac82c33971de096d87ce5ed9b022c2de678f08002dc37fdb1b6886d12234b5')),   # Stampery
       -)
       +
       +FALLBACK_NODE_LIST_MAINNET = [
       +    LNPeerAddr(host='52.168.166.221', port=9735, pubkey=b'\x02\x148+\xdc\xe7u\r\xfc\xb8\x12m\xf8\xe2\xb1-\xe3\x856\x90-\xc3j\xbc\xeb\xda\xee\xfd\xec\xa1\xdf\x82\x84'),
       +    LNPeerAddr(host='35.230.100.60', port=9735, pubkey=b'\x02?^5\x82qk\xed\x96\xf6\xf2l\xfc\xd8\x03~\x07GM{GC\xaf\xdc\x8b\x07\xe6\x92\xdfcFM~'),
       +    LNPeerAddr(host='40.69.71.114', port=9735, pubkey=b'\x02\x83\x03\x18,\x98\x85\xda\x93\xb3\xb2\\\x96!\xd2,\xf3Du\xe6<\x129B\xe4\x02\xabS\x0c\x05V\xe6u'),
       +    LNPeerAddr(host='62.210.110.5', port=9735, pubkey=b'\x02v\xe0\x9a&u\x92\xe7E\x1a\x93\x9c\x93,\xf6\x85\xf0uM\xe3\x82\xa3\xca\x85\xd2\xfb:\x86ML6Z\xd5'),
       +    LNPeerAddr(host='34.236.113.58', port=9735, pubkey=b'\x02\xfaP\xc7.\xe1\xe2\xeb_\x1bm\x9c02\x08\x0cL\x86Cs\xc4 \x1d\xfa)f\xaa4\xee\xe1\x05\x1f\x97'),
       +    LNPeerAddr(host='52.168.166.221', port=9735, pubkey=b'\x02\x148+\xdc\xe7u\r\xfc\xb8\x12m\xf8\xe2\xb1-\xe3\x856\x90-\xc3j\xbc\xeb\xda\xee\xfd\xec\xa1\xdf\x82\x84'),
       +    LNPeerAddr(host='34.236.113.58', port=9735, pubkey=b'\x02\xfaP\xc7.\xe1\xe2\xeb_\x1bm\x9c02\x08\x0cL\x86Cs\xc4 \x1d\xfa)f\xaa4\xee\xe1\x05\x1f\x97'),
       +]
        
        encoder = ChannelJsonEncoder()
        
        @@ -103,29 +109,6 @@ class LNWorker(PrintError):
                asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.main_loop()), self.network.asyncio_loop)
                self.first_timestamp_requested = None
        
       -    def get_first_timestamp(self):
       -        first_request = False
       -        if self.first_timestamp_requested is None:
       -            self.first_timestamp_requested = time.time()
       -            first_request = True
       -        first_timestamp = self.storage.get('lightning_gossip_until', 0)
       -        if first_timestamp == 0:
       -            self.print_error('requesting whole channel graph')
       -        else:
       -            self.print_error('requesting channel graph since', datetime.fromtimestamp(first_timestamp).ctime())
       -        if first_request:
       -            asyncio.run_coroutine_threadsafe(self.save_gossip_timestamp(), self.network.asyncio_loop)
       -        return first_timestamp
       -
       -    @log_exceptions
       -    async def save_gossip_timestamp(self):
       -        while True:
       -            await asyncio.sleep(GRAPH_DOWNLOAD_SECONDS)
       -            yesterday = int(time.time()) - 24*60*60 # now minus a day
       -            self.storage.put('lightning_gossip_until', yesterday)
       -            self.storage.write()
       -            self.print_error('saved lightning gossip timestamp')
       -
            def payment_completed(self, chan: Channel, direction: Direction,
                                  htlc: UpdateAddHtlc):
                chan_id = chan.channel_id
        @@ -258,7 +241,7 @@ class LNWorker(PrintError):
                transport = LNTransport(self.node_keypair.privkey, peer_addr)
                self._last_tried_peer[peer_addr] = time.time()
                self.print_error("adding peer", peer_addr)
       -        peer = Peer(self, node_id, transport, request_initial_sync=self.config.get("request_initial_sync", True))
       +        peer = Peer(self, node_id, transport)
                await self.network.main_taskgroup.spawn(peer.main_loop())
                self.peers[node_id] = peer
                self.network.trigger_callback('ln_status')
        @@ -839,7 +822,7 @@ class LNWorker(PrintError):
                        except:
                            self.print_error('handshake failure from incoming connection')
                            return
       -                peer = Peer(self, node_id, transport, request_initial_sync=self.config.get("request_initial_sync", True))
       +                peer = Peer(self, node_id, transport)
                        self.peers[node_id] = peer
                        await self.network.main_taskgroup.spawn(peer.main_loop())
                        self.network.trigger_callback('ln_status')
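
        The raw byte literals in the new FALLBACK_NODE_LIST_MAINNET are hard to
        audit by eye; each pubkey is a 33-byte compressed public key. Assuming
        LNPeerAddr keeps the pubkey attribute it is constructed with, the
        conventional hex form can be recovered interactively:

            addr = FALLBACK_NODE_LIST_MAINNET[0]
            print(addr.pubkey.hex())
            # 0214382bdce7750dfcb8126df8e2b12de38536902dc36abcebdaeefdeca1df8284
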
        diff --git a/electrum/tests/test_lnpeer.py b/electrum/tests/test_lnpeer.py
        @@ -178,7 +178,7 @@ class TestPeer(SequentialTestCase):
            def test_require_data_loss_protect(self):
                mock_lnworker = MockLNWorker(keypair(), keypair(), self.alice_channel, tx_queue=None)
                mock_transport = NoFeaturesTransport('')
       -        p1 = Peer(mock_lnworker, b"\x00" * 33, mock_transport, request_initial_sync=False)
       +        p1 = Peer(mock_lnworker, b"\x00" * 33, mock_transport)
                mock_lnworker.peer = p1
                with self.assertRaises(LightningPeerConnectionClosed):
                    run(asyncio.wait_for(p1._message_loop(), 1))
        @@ -189,8 +189,8 @@ class TestPeer(SequentialTestCase):
                q1, q2 = asyncio.Queue(), asyncio.Queue()
                w1 = MockLNWorker(k1, k2, self.alice_channel, tx_queue=q1)
                w2 = MockLNWorker(k2, k1, self.bob_channel, tx_queue=q2)
       -        p1 = Peer(w1, k1.pubkey, t1, request_initial_sync=False)
       -        p2 = Peer(w2, k2.pubkey, t2, request_initial_sync=False)
       +        p1 = Peer(w1, k1.pubkey, t1)
       +        p2 = Peer(w2, k2.pubkey, t2)
                w1.peer = p1
                w2.peer = p2
                # mark_open won't work if state is already OPEN.