tstore raw messages in gossip_db. Fixes #5960 - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 97900c09851e14fe0b5137b16cfb9b58fb4d4fd0 DIR parent 077f7786327167e70919a15880d880e565ba26af HTML Author: ThomasV <thomasv@electrum.org> Date: Fri, 28 Feb 2020 09:24:10 +0100 store raw messages in gossip_db. Fixes #5960 Diffstat: M electrum/channel_db.py | 76 +++++++++++++++---------------- 1 file changed, 37 insertions(+), 39 deletions(-) --- DIR diff --git a/electrum/channel_db.py b/electrum/channel_db.py t@@ -111,6 +111,12 @@ class Policy(NamedTuple): timestamp = int.from_bytes(payload['timestamp'], "big") ) + @staticmethod + def from_raw_msg(key:bytes, raw: bytes) -> 'Policy': + payload = decode_msg(raw)[1] + payload['start_node'] = key[8:] + return Policy.from_msg(payload) + def is_disabled(self): return self.channel_flags & FLAG_DISABLE t@@ -142,6 +148,11 @@ class NodeInfo(NamedTuple): Address(host=host, port=port, node_id=node_id, last_connected_date=None) for host, port in addresses] @staticmethod + def from_raw_msg(raw: bytes) -> 'NodeInfo': + payload_dict = decode_msg(raw)[1] + return NodeInfo.from_msg(payload_dict) + + @staticmethod def parse_addresses_field(addresses_field): buf = addresses_field def read(n): t@@ -197,35 +208,24 @@ class CategorizedChannelUpdates(NamedTuple): to_delete: List # database entries to delete -# TODO It would make more sense to store the raw gossip messages in the db. -# That is pretty much a pre-requisite of actively participating in gossip. create_channel_info = """ CREATE TABLE IF NOT EXISTS channel_info ( -short_channel_id VARCHAR(64), -node1_id VARCHAR(66), -node2_id VARCHAR(66), -capacity_sat INTEGER, +short_channel_id BLOB(8), +msg BLOB, PRIMARY KEY(short_channel_id) )""" create_policy = """ CREATE TABLE IF NOT EXISTS policy ( -key VARCHAR(66), -cltv_expiry_delta INTEGER NOT NULL, -htlc_minimum_msat INTEGER NOT NULL, -htlc_maximum_msat INTEGER, -fee_base_msat INTEGER NOT NULL, -fee_proportional_millionths INTEGER NOT NULL, -channel_flags INTEGER NOT NULL, -message_flags INTEGER NOT NULL, -timestamp INTEGER NOT NULL, +key BLOB(41), +msg BLOB, PRIMARY KEY(key) )""" create_address = """ CREATE TABLE IF NOT EXISTS address ( -node_id VARCHAR(66), +node_id BLOB(33), host STRING(256), port INTEGER NOT NULL, timestamp INTEGER, t@@ -234,10 +234,8 @@ PRIMARY KEY(node_id, host, port) create_node_info = """ CREATE TABLE IF NOT EXISTS node_info ( -node_id VARCHAR(66), -features INTEGER NOT NULL, -timestamp INTEGER NOT NULL, -alias STRING(64), +node_id BOB(33), +msg BLOB, PRIMARY KEY(node_id) )""" t@@ -247,7 +245,7 @@ class ChannelDB(SqlDB): NUM_MAX_RECENT_PEERS = 20 def __init__(self, network: 'Network'): - path = os.path.join(get_headers_dir(network.config), 'channel_db') + path = os.path.join(get_headers_dir(network.config), 'gossip_db') super().__init__(network, path, commit_interval=100) self.num_nodes = 0 self.num_channels = 0 t@@ -341,7 +339,7 @@ class ChannelDB(SqlDB): self._channels[channel_info.short_channel_id] = channel_info self._channels_for_node[channel_info.node1_id].add(channel_info.short_channel_id) self._channels_for_node[channel_info.node2_id].add(channel_info.short_channel_id) - self.save_channel(channel_info) + self.save_channel(channel_info.short_channel_id, msg['raw']) def print_change(self, old_policy: Policy, new_policy: Policy): # print what changed between policies t@@ -399,7 +397,7 @@ class ChannelDB(SqlDB): self.verify_channel_update(payload) policy = Policy.from_msg(payload) self._policies[key] = policy - self.save_policy(policy) + self.save_policy(policy.key, payload['raw']) # self.update_counts() return CategorizedChannelUpdates( t@@ -423,9 +421,9 @@ class ChannelDB(SqlDB): self.conn.commit() @sql - def save_policy(self, policy): + def save_policy(self, key, msg): c = self.conn.cursor() - c.execute("""REPLACE INTO policy (key, cltv_expiry_delta, htlc_minimum_msat, htlc_maximum_msat, fee_base_msat, fee_proportional_millionths, channel_flags, message_flags, timestamp) VALUES (?,?,?,?,?,?,?,?,?)""", list(policy)) + c.execute("""REPLACE INTO policy (key, msg) VALUES (?,?)""", [key, msg]) @sql def delete_policy(self, node_id, short_channel_id): t@@ -434,9 +432,9 @@ class ChannelDB(SqlDB): c.execute("""DELETE FROM policy WHERE key=?""", (key,)) @sql - def save_channel(self, channel_info): + def save_channel(self, short_channel_id, msg): c = self.conn.cursor() - c.execute("REPLACE INTO channel_info (short_channel_id, node1_id, node2_id, capacity_sat) VALUES (?,?,?,?)", list(channel_info)) + c.execute("REPLACE INTO channel_info (short_channel_id, msg) VALUES (?,?)", [short_channel_id, msg]) @sql def delete_channel(self, short_channel_id): t@@ -444,9 +442,9 @@ class ChannelDB(SqlDB): c.execute("""DELETE FROM channel_info WHERE short_channel_id=?""", (short_channel_id,)) @sql - def save_node(self, node_info): + def save_node_info(self, node_id, msg): c = self.conn.cursor() - c.execute("REPLACE INTO node_info (node_id, features, timestamp, alias) VALUES (?,?,?,?)", list(node_info)) + c.execute("REPLACE INTO node_info (node_id, msg) VALUES (?,?)", [node_id, msg]) @sql def save_node_address(self, node_id, peer, now): t@@ -493,7 +491,7 @@ class ChannelDB(SqlDB): continue # save self._nodes[node_id] = node_info - self.save_node(node_info) + self.save_node_info(node_id, msg_payload['raw']) for addr in node_addresses: self._addresses[node_id].add((addr.host, addr.port, 0)) self.save_node_addresses(node_id, node_addresses) t@@ -553,17 +551,17 @@ class ChannelDB(SqlDB): node_id, host, port, timestamp = x self._addresses[node_id].add((str(host), int(port), int(timestamp or 0))) c.execute("""SELECT * FROM channel_info""") - for x in c: - x = (ShortChannelID.normalize(x[0]), *x[1:]) - ci = ChannelInfo(*x) - self._channels[ci.short_channel_id] = ci + for short_channel_id, msg in c: + ci = ChannelInfo.from_raw_msg(msg) + self._channels[short_channel_id] = ci c.execute("""SELECT * FROM node_info""") - for x in c: - ni = NodeInfo(*x) - self._nodes[ni.node_id] = ni + for node_id, msg in c: + node_info, node_addresses = NodeInfo.from_raw_msg(msg) + # don't load node_addresses because they dont have timestamps + self._nodes[node_id] = node_info c.execute("""SELECT * FROM policy""") - for x in c: - p = Policy(*x) + for key, msg in c: + p = Policy.from_raw_msg(key, msg) self._policies[(p.start_node, p.short_channel_id)] = p for channel_info in self._channels.values(): self._channels_for_node[channel_info.node1_id].add(channel_info.short_channel_id)