tMove callback manager out of Network class - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit 92244041081db96b92925c9e76b117035e241011 DIR parent 73325831b7075f461e106f36099865a3fedadb72 HTML Author: ThomasV <thomasv@electrum.org> Date: Tue, 14 Apr 2020 16:12:47 +0200 Move callback manager out of Network class Diffstat: M electrum/address_synchronizer.py | 8 ++++---- M electrum/channel_db.py | 6 +++--- M electrum/daemon.py | 3 ++- M electrum/exchange_rate.py | 10 ++++------ M electrum/gui/kivy/main_window.py | 30 +++++++++++++++--------------- M electrum/gui/qt/channel_details.py | 9 +++++---- M electrum/gui/qt/lightning_dialog.py | 7 ++++--- M electrum/gui/qt/main_window.py | 9 ++++----- M electrum/gui/qt/network_dialog.py | 4 ++-- M electrum/gui/stdio.py | 3 ++- M electrum/gui/text.py | 4 ++-- M electrum/interface.py | 6 +++--- M electrum/lnchannel.py | 8 +++----- M electrum/lnpeer.py | 8 ++++---- M electrum/lnwatcher.py | 8 +++++--- M electrum/lnworker.py | 60 ++++++++++++++++---------------- M electrum/network.py | 47 ++++++++----------------------- M electrum/synchronizer.py | 5 +++-- M electrum/util.py | 40 ++++++++++++++++++++++++++++++- 19 files changed, 146 insertions(+), 129 deletions(-) --- DIR diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py t@@ -28,7 +28,7 @@ import itertools from collections import defaultdict from typing import TYPE_CHECKING, Dict, Optional, Set, Tuple, NamedTuple, Sequence, List -from . import bitcoin +from . import bitcoin, util from .bitcoin import COINBASE_MATURITY from .util import profiler, bfh, TxMinedInfo from .transaction import Transaction, TxOutput, TxInput, PartialTxInput, TxOutpoint, PartialTransaction t@@ -161,7 +161,7 @@ class AddressSynchronizer(Logger): if self.network is not None: self.synchronizer = Synchronizer(self) self.verifier = SPV(self.network, self) - self.network.register_callback(self.on_blockchain_updated, ['blockchain_updated']) + util.register_callback(self.on_blockchain_updated, ['blockchain_updated']) def on_blockchain_updated(self, event, *args): self._get_addr_balance_cache = {} # invalidate cache t@@ -174,7 +174,7 @@ class AddressSynchronizer(Logger): if self.verifier: asyncio.run_coroutine_threadsafe(self.verifier.stop(), self.network.asyncio_loop) self.verifier = None - self.network.unregister_callback(self.on_blockchain_updated) + util.unregister_callback(self.on_blockchain_updated) self.db.put('stored_height', self.get_local_height()) def add_address(self, address): t@@ -546,7 +546,7 @@ class AddressSynchronizer(Logger): self.unverified_tx.pop(tx_hash, None) self.db.add_verified_tx(tx_hash, info) tx_mined_status = self.get_tx_height(tx_hash) - self.network.trigger_callback('verified', self, tx_hash, tx_mined_status) + util.trigger_callback('verified', self, tx_hash, tx_mined_status) def get_unverified_txs(self): '''Returns a map from tx hash to transaction height''' DIR diff --git a/electrum/channel_db.py b/electrum/channel_db.py t@@ -35,7 +35,7 @@ import threading from .sql_db import SqlDB, sql -from . import constants +from . import constants, util from .util import bh2u, profiler, get_headers_dir, bfh, is_ip_address, list_enabled_bits from .logging import Logger from .lnutil import (LNPeerAddr, format_short_channel_id, ShortChannelID, t@@ -269,8 +269,8 @@ class ChannelDB(SqlDB): self.num_nodes = len(self._nodes) self.num_channels = len(self._channels) self.num_policies = len(self._policies) - self.network.trigger_callback('channel_db', self.num_nodes, self.num_channels, self.num_policies) - self.network.trigger_callback('ln_gossip_sync_progress') + util.trigger_callback('channel_db', self.num_nodes, self.num_channels, self.num_policies) + util.trigger_callback('ln_gossip_sync_progress') def get_channel_ids(self): with self.lock: DIR diff --git a/electrum/daemon.py b/electrum/daemon.py t@@ -41,6 +41,7 @@ from jsonrpcserver import response from jsonrpcclient.clients.aiohttp_client import AiohttpClient from aiorpcx import TaskGroup +from . import util from .network import Network from .util import (json_decode, to_bytes, to_string, profiler, standardize_path, constant_time_compare) from .util import PR_PAID, PR_EXPIRED, get_request_status t@@ -181,7 +182,7 @@ class PayServer(Logger): self.daemon = daemon self.config = daemon.config self.pending = defaultdict(asyncio.Event) - self.daemon.network.register_callback(self.on_payment, ['payment_received']) + util.register_callback(self.on_payment, ['payment_received']) async def on_payment(self, evt, wallet, key, status): if status == PR_PAID: DIR diff --git a/electrum/exchange_rate.py b/electrum/exchange_rate.py t@@ -12,6 +12,7 @@ from typing import Sequence, Optional from aiorpcx.curio import timeout_after, TaskTimeout, TaskGroup +from . import util from .bitcoin import COIN from .i18n import _ from .util import (ThreadJob, make_dir, log_exceptions, t@@ -456,8 +457,7 @@ class FxThread(ThreadJob): ThreadJob.__init__(self) self.config = config self.network = network - if self.network: - self.network.register_callback(self.set_proxy, ['proxy_set']) + util.register_callback(self.set_proxy, ['proxy_set']) self.ccy = self.get_currency() self.history_used_spot = False self.ccy_combo = None t@@ -567,12 +567,10 @@ class FxThread(ThreadJob): self.exchange.read_historical_rates(self.ccy, self.cache_dir) def on_quotes(self): - if self.network: - self.network.trigger_callback('on_quotes') + util.trigger_callback('on_quotes') def on_history(self): - if self.network: - self.network.trigger_callback('on_history') + util.trigger_callback('on_history') def exchange_rate(self) -> Decimal: """Returns the exchange rate as a Decimal""" DIR diff --git a/electrum/gui/kivy/main_window.py b/electrum/gui/kivy/main_window.py t@@ -13,6 +13,7 @@ from electrum.storage import WalletStorage, StorageReadWriteError from electrum.wallet_db import WalletDB from electrum.wallet import Wallet, InternalAddressCorruption, Abstract_Wallet from electrum.plugin import run_hook +from electrum import util from electrum.util import (profiler, InvalidPassword, send_exception_to_crash_reporter, format_satoshis, format_satoshis_plain, format_fee_satoshis, PR_PAID, PR_FAILED, maybe_extract_bolt11_invoice) t@@ -50,7 +51,6 @@ from .uix.dialogs.question import Question # delayed imports: for startup speed on android notification = app = ref = None -util = False # register widget cache for keeping memory down timeout to forever to cache # the data t@@ -565,20 +565,20 @@ class ElectrumWindow(App): if self.network: interests = ['wallet_updated', 'network_updated', 'blockchain_updated', 'status', 'new_transaction', 'verified'] - self.network.register_callback(self.on_network_event, interests) - self.network.register_callback(self.on_fee, ['fee']) - self.network.register_callback(self.on_fee_histogram, ['fee_histogram']) - self.network.register_callback(self.on_quotes, ['on_quotes']) - self.network.register_callback(self.on_history, ['on_history']) - self.network.register_callback(self.on_channels, ['channels_updated']) - self.network.register_callback(self.on_channel, ['channel']) - self.network.register_callback(self.on_invoice_status, ['invoice_status']) - self.network.register_callback(self.on_request_status, ['request_status']) - self.network.register_callback(self.on_payment_failed, ['payment_failed']) - self.network.register_callback(self.on_payment_succeeded, ['payment_succeeded']) - self.network.register_callback(self.on_channel_db, ['channel_db']) - self.network.register_callback(self.set_num_peers, ['gossip_peers']) - self.network.register_callback(self.set_unknown_channels, ['unknown_channels']) + util.register_callback(self.on_network_event, interests) + util.register_callback(self.on_fee, ['fee']) + util.register_callback(self.on_fee_histogram, ['fee_histogram']) + util.register_callback(self.on_quotes, ['on_quotes']) + util.register_callback(self.on_history, ['on_history']) + util.register_callback(self.on_channels, ['channels_updated']) + util.register_callback(self.on_channel, ['channel']) + util.register_callback(self.on_invoice_status, ['invoice_status']) + util.register_callback(self.on_request_status, ['request_status']) + util.register_callback(self.on_payment_failed, ['payment_failed']) + util.register_callback(self.on_payment_succeeded, ['payment_succeeded']) + util.register_callback(self.on_channel_db, ['channel_db']) + util.register_callback(self.set_num_peers, ['gossip_peers']) + util.register_callback(self.set_unknown_channels, ['unknown_channels']) # load wallet self.load_wallet_by_name(self.electrum_config.get_wallet_path(use_gui_last_wallet=True)) # URI passed in config DIR diff --git a/electrum/gui/qt/channel_details.py b/electrum/gui/qt/channel_details.py t@@ -5,6 +5,7 @@ import PyQt5.QtWidgets as QtWidgets import PyQt5.QtCore as QtCore from PyQt5.QtWidgets import QLabel, QLineEdit +from electrum import util from electrum.i18n import _ from electrum.util import bh2u, format_time from electrum.lnutil import format_short_channel_id, LOCAL, REMOTE, UpdateAddHtlc, Direction t@@ -132,10 +133,10 @@ class ChannelDetailsDialog(QtWidgets.QDialog): self.htlc_added.connect(self.do_htlc_added) # register callbacks for updating - window.network.register_callback(self.ln_payment_completed.emit, ['ln_payment_completed']) - window.network.register_callback(self.ln_payment_failed.emit, ['ln_payment_failed']) - window.network.register_callback(self.htlc_added.emit, ['htlc_added']) - window.network.register_callback(self.state_changed.emit, ['channel']) + util.register_callback(self.ln_payment_completed.emit, ['ln_payment_completed']) + util.register_callback(self.ln_payment_failed.emit, ['ln_payment_failed']) + util.register_callback(self.htlc_added.emit, ['htlc_added']) + util.register_callback(self.state_changed.emit, ['channel']) # set attributes of QDialog self.setWindowTitle(_('Channel Details')) DIR diff --git a/electrum/gui/qt/lightning_dialog.py b/electrum/gui/qt/lightning_dialog.py t@@ -27,6 +27,7 @@ from typing import TYPE_CHECKING from PyQt5.QtWidgets import (QDialog, QLabel, QVBoxLayout, QPushButton) +from electrum import util from electrum.i18n import _ from .util import Buttons t@@ -58,9 +59,9 @@ class LightningDialog(QDialog): b = QPushButton(_('Close')) b.clicked.connect(self.close) vbox.addLayout(Buttons(b)) - self.network.register_callback(self.on_channel_db, ['channel_db']) - self.network.register_callback(self.set_num_peers, ['gossip_peers']) - self.network.register_callback(self.set_unknown_channels, ['unknown_channels']) + util.register_callback(self.on_channel_db, ['channel_db']) + util.register_callback(self.set_num_peers, ['gossip_peers']) + util.register_callback(self.set_unknown_channels, ['unknown_channels']) self.network.channel_db.update_counts() # trigger callback self.set_num_peers('', self.network.lngossip.num_peers()) self.set_unknown_channels('', len(self.network.lngossip.unknown_ids)) DIR diff --git a/electrum/gui/qt/main_window.py b/electrum/gui/qt/main_window.py t@@ -272,7 +272,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger): # window from being GC-ed when closed, callbacks should be # methods of this class only, and specifically not be # partials, lambdas or methods of subobjects. Hence... - self.network.register_callback(self.on_network, interests) + util.register_callback(self.on_network, interests) # set initial message self.console.showMessage(self.network.banner) t@@ -466,8 +466,8 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger): def load_wallet(self, wallet): wallet.thread = TaskThread(self, self.on_error) self.update_recently_visited(wallet.storage.path) - if wallet.lnworker and wallet.network: - wallet.network.trigger_callback('channels_updated', wallet) + if wallet.lnworker: + util.trigger_callback('channels_updated', wallet) self.need_update.set() # Once GUI has been initialized check if we want to announce something since the callback has been called before the GUI was initialized # update menus t@@ -2889,8 +2889,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger): def clean_up(self): self.wallet.thread.stop() - if self.network: - self.network.unregister_callback(self.on_network) + util.unregister_callback(self.on_network) self.config.set_key("is_maximized", self.isMaximized()) if not self.isMaximized(): g = self.geometry() DIR diff --git a/electrum/gui/qt/network_dialog.py b/electrum/gui/qt/network_dialog.py t@@ -35,7 +35,7 @@ from PyQt5.QtWidgets import (QTreeWidget, QTreeWidgetItem, QMenu, QGridLayout, Q from PyQt5.QtGui import QFontMetrics from electrum.i18n import _ -from electrum import constants, blockchain +from electrum import constants, blockchain, util from electrum.interface import serialize_server, deserialize_server from electrum.network import Network from electrum.logging import get_logger t@@ -61,7 +61,7 @@ class NetworkDialog(QDialog): vbox.addLayout(Buttons(CloseButton(self))) self.network_updated_signal_obj.network_updated_signal.connect( self.on_update) - network.register_callback(self.on_network, ['network_updated']) + util.register_callback(self.on_network, ['network_updated']) def on_network(self, event, *args): self.network_updated_signal_obj.network_updated_signal.emit(event, args) DIR diff --git a/electrum/gui/stdio.py b/electrum/gui/stdio.py t@@ -3,6 +3,7 @@ import getpass import datetime import logging +from electrum import util from electrum import WalletStorage, Wallet from electrum.util import format_satoshis from electrum.bitcoin import is_address, COIN t@@ -43,7 +44,7 @@ class ElectrumGui: self.wallet.start_network(self.network) self.contacts = self.wallet.contacts - self.network.register_callback(self.on_network, ['wallet_updated', 'network_updated', 'banner']) + util.register_callback(self.on_network, ['wallet_updated', 'network_updated', 'banner']) self.commands = [_("[h] - displays this help text"), \ _("[i] - display transaction history"), \ _("[o] - enter payment order"), \ DIR diff --git a/electrum/gui/text.py b/electrum/gui/text.py t@@ -8,6 +8,7 @@ import getpass import logging import electrum +from electrum import util from electrum.util import format_satoshis from electrum.bitcoin import is_address, COIN from electrum.transaction import PartialTxOutput t@@ -65,8 +66,7 @@ class ElectrumGui: self.str_fee = "" self.history = None - if self.network: - self.network.register_callback(self.update, ['wallet_updated', 'network_updated']) + util.register_callback(self.update, ['wallet_updated', 'network_updated']) self.tab_names = [_("History"), _("Send"), _("Receive"), _("Addresses"), _("Contacts"), _("Banner")] self.num_tabs = len(self.tab_names) DIR diff --git a/electrum/interface.py b/electrum/interface.py t@@ -548,7 +548,7 @@ class Interface(Logger): raise GracefulDisconnect('server tip below max checkpoint') self._mark_ready() await self._process_header_at_tip() - self.network.trigger_callback('network_updated') + util.trigger_callback('network_updated') await self.network.switch_unwanted_fork_interface() await self.network.switch_lagging_interface() t@@ -563,7 +563,7 @@ class Interface(Logger): # in the simple case, height == self.tip+1 if height <= self.tip: await self.sync_until(height) - self.network.trigger_callback('blockchain_updated') + util.trigger_callback('blockchain_updated') async def sync_until(self, height, next_height=None): if next_height is None: t@@ -578,7 +578,7 @@ class Interface(Logger): raise GracefulDisconnect('server chain conflicts with checkpoints or genesis') last, height = await self.step(height) continue - self.network.trigger_callback('network_updated') + util.trigger_callback('network_updated') height = (height // 2016 * 2016) + num_headers assert height <= next_height+1, (height, self.tip) last = 'catchup' DIR diff --git a/electrum/lnchannel.py b/electrum/lnchannel.py t@@ -33,7 +33,7 @@ from aiorpcx import NetAddress import attr from . import ecc -from . import constants +from . import constants, util from .util import bfh, bh2u, chunks, TxMinedInfo from .bitcoin import redeem_script_to_address from .crypto import sha256, sha256d t@@ -679,16 +679,14 @@ class Channel(AbstractChannel): def set_frozen_for_sending(self, b: bool) -> None: self.storage['frozen_for_sending'] = bool(b) - if self.lnworker: - self.lnworker.network.trigger_callback('channel', self) + util.trigger_callback('channel', self) def is_frozen_for_receiving(self) -> bool: return self.storage.get('frozen_for_receiving', False) def set_frozen_for_receiving(self, b: bool) -> None: self.storage['frozen_for_receiving'] = bool(b) - if self.lnworker: - self.lnworker.network.trigger_callback('channel', self) + util.trigger_callback('channel', self) def _assert_can_add_htlc(self, *, htlc_proposer: HTLCOwner, amount_msat: int) -> None: """Raises PaymentFailure if the htlc_proposer cannot add this new HTLC. DIR diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py t@@ -19,7 +19,7 @@ from datetime import datetime import aiorpcx from .crypto import sha256, sha256d -from . import bitcoin +from . import bitcoin, util from . import ecc from .ecc import sig_string_from_r_and_s, get_r_and_s_from_sig_string, der_sig_from_sig_string from . import constants t@@ -744,7 +744,7 @@ class Peer(Logger): f'already in peer_state {chan.peer_state}') return chan.peer_state = PeerState.REESTABLISHING - self.network.trigger_callback('channel', chan) + util.trigger_callback('channel', chan) # BOLT-02: "A node [...] upon disconnection [...] MUST reverse any uncommitted updates sent by the other side" chan.hm.discard_unsigned_remote_updates() # ctns t@@ -891,7 +891,7 @@ class Peer(Logger): # checks done if chan.is_funded() and chan.config[LOCAL].funding_locked_received: self.mark_open(chan) - self.network.trigger_callback('channel', chan) + util.trigger_callback('channel', chan) if chan.get_state() == ChannelState.CLOSING: await self.send_shutdown(chan) t@@ -979,7 +979,7 @@ class Peer(Logger): return assert chan.config[LOCAL].funding_locked_received chan.set_state(ChannelState.OPEN) - self.network.trigger_callback('channel', chan) + util.trigger_callback('channel', chan) # peer may have sent us a channel update for the incoming direction previously pending_channel_update = self.orphan_channel_updates.get(chan.short_channel_id) if pending_channel_update: DIR diff --git a/electrum/lnwatcher.py b/electrum/lnwatcher.py t@@ -8,6 +8,7 @@ import asyncio from enum import IntEnum, auto from typing import NamedTuple, Dict +from . import util from .sql_db import SqlDB, sql from .wallet_db import WalletDB from .util import bh2u, bfh, log_exceptions, ignore_exceptions, TxMinedInfo t@@ -139,8 +140,9 @@ class LNWatcher(AddressSynchronizer): self.config = network.config self.channels = {} self.network = network - self.network.register_callback(self.on_network_update, - ['network_updated', 'blockchain_updated', 'verified', 'wallet_updated', 'fee']) + util.register_callback( + self.on_network_update, + ['network_updated', 'blockchain_updated', 'verified', 'wallet_updated', 'fee']) # status gets populated when we run self.channel_status = {} t@@ -420,4 +422,4 @@ class LNWalletWatcher(LNWatcher): tx_was_added = False if tx_was_added: self.logger.info(f'added future tx: {name}. prevout: {prevout}') - self.network.trigger_callback('wallet_updated', self.lnworker.wallet) + util.trigger_callback('wallet_updated', self.lnworker.wallet) DIR diff --git a/electrum/lnworker.py b/electrum/lnworker.py t@@ -21,7 +21,7 @@ import dns.resolver import dns.exception from aiorpcx import run_in_thread -from . import constants +from . import constants, util from . import keystore from .util import profiler from .util import PR_UNPAID, PR_EXPIRED, PR_PAID, PR_INFLIGHT, PR_FAILED, PR_ROUTING t@@ -367,7 +367,7 @@ class LNGossip(LNWorker): max_age = 14*24*3600 LOGGING_SHORTCUT = 'g' - def __init__(self, network): + def __init__(self): seed = os.urandom(32) node = BIP32Node.from_rootseed(seed, xtype='standard') xprv = node.to_xprv() t@@ -393,16 +393,16 @@ class LNGossip(LNWorker): known = self.channel_db.get_channel_ids() new = set(ids) - set(known) self.unknown_ids.update(new) - self.network.trigger_callback('unknown_channels', len(self.unknown_ids)) - self.network.trigger_callback('gossip_peers', self.num_peers()) - self.network.trigger_callback('ln_gossip_sync_progress') + util.trigger_callback('unknown_channels', len(self.unknown_ids)) + util.trigger_callback('gossip_peers', self.num_peers()) + util.trigger_callback('ln_gossip_sync_progress') def get_ids_to_query(self): N = 500 l = list(self.unknown_ids) self.unknown_ids = set(l[N:]) - self.network.trigger_callback('unknown_channels', len(self.unknown_ids)) - self.network.trigger_callback('ln_gossip_sync_progress') + util.trigger_callback('unknown_channels', len(self.unknown_ids)) + util.trigger_callback('ln_gossip_sync_progress') return l[0:N] def get_sync_progress_estimate(self) -> Tuple[Optional[int], Optional[int]]: t@@ -514,7 +514,7 @@ class LNWallet(LNWorker): def peer_closed(self, peer): for chan in self.channels_for_peer(peer.pubkey).values(): chan.peer_state = PeerState.DISCONNECTED - self.network.trigger_callback('channel', chan) + util.trigger_callback('channel', chan) super().peer_closed(peer) def get_settled_payments(self): t@@ -645,14 +645,14 @@ class LNWallet(LNWorker): def channel_state_changed(self, chan): self.save_channel(chan) - self.network.trigger_callback('channel', chan) + util.trigger_callback('channel', chan) def save_channel(self, chan): assert type(chan) is Channel if chan.config[REMOTE].next_per_commitment_point == chan.config[REMOTE].current_per_commitment_point: raise Exception("Tried to save channel with next_point == current_point, this should not happen") self.wallet.save_db() - self.network.trigger_callback('channel', chan) + util.trigger_callback('channel', chan) def channel_by_txo(self, txo): with self.lock: t@@ -703,7 +703,7 @@ class LNWallet(LNWorker): funding_sat=funding_sat, push_msat=push_sat * 1000, temp_channel_id=os.urandom(32)) - self.network.trigger_callback('channels_updated', self.wallet) + util.trigger_callback('channels_updated', self.wallet) self.wallet.add_transaction(funding_tx) # save tx as local into the wallet self.wallet.set_label(funding_tx.txid(), _('Open channel')) if funding_tx.is_complete(): t@@ -804,10 +804,10 @@ class LNWallet(LNWorker): # note: path-finding runs in a separate thread so that we don't block the asyncio loop # graph updates might occur during the computation self.set_invoice_status(key, PR_ROUTING) - self.network.trigger_callback('invoice_status', key) + util.trigger_callback('invoice_status', key) route = await run_in_thread(self._create_route_from_invoice, lnaddr) self.set_invoice_status(key, PR_INFLIGHT) - self.network.trigger_callback('invoice_status', key) + util.trigger_callback('invoice_status', key) payment_attempt_log = await self._pay_to_route(route, lnaddr) except Exception as e: log.append(PaymentAttemptLog(success=False, exception=e)) t@@ -820,11 +820,11 @@ class LNWallet(LNWorker): break else: reason = _('Failed after {} attempts').format(attempts) - self.network.trigger_callback('invoice_status', key) + util.trigger_callback('invoice_status', key) if success: - self.network.trigger_callback('payment_succeeded', key) + util.trigger_callback('payment_succeeded', key) else: - self.network.trigger_callback('payment_failed', key, reason) + util.trigger_callback('payment_failed', key, reason) return success async def _pay_to_route(self, route: LNPaymentRoute, lnaddr: LnAddr) -> PaymentAttemptLog: t@@ -840,7 +840,7 @@ class LNWallet(LNWorker): payment_hash=lnaddr.paymenthash, min_final_cltv_expiry=lnaddr.get_min_final_cltv_expiry(), payment_secret=lnaddr.payment_secret) - self.network.trigger_callback('htlc_added', htlc, lnaddr, SENT) + util.trigger_callback('htlc_added', htlc, lnaddr, SENT) payment_attempt = await self.await_payment(lnaddr.paymenthash) if payment_attempt.success: failure_log = None t@@ -1139,9 +1139,9 @@ class LNWallet(LNWorker): f.set_result(payment_attempt) else: chan.logger.info('received unexpected payment_failed, probably from previous session') - self.network.trigger_callback('invoice_status', key) - self.network.trigger_callback('payment_failed', key, '') - self.network.trigger_callback('ln_payment_failed', payment_hash, chan.channel_id) + util.trigger_callback('invoice_status', key) + util.trigger_callback('payment_failed', key, '') + util.trigger_callback('ln_payment_failed', payment_hash, chan.channel_id) def payment_sent(self, chan, payment_hash: bytes): self.set_payment_status(payment_hash, PR_PAID) t@@ -1155,14 +1155,14 @@ class LNWallet(LNWorker): f.set_result(payment_attempt) else: chan.logger.info('received unexpected payment_sent, probably from previous session') - self.network.trigger_callback('invoice_status', key) - self.network.trigger_callback('payment_succeeded', key) - self.network.trigger_callback('ln_payment_completed', payment_hash, chan.channel_id) + util.trigger_callback('invoice_status', key) + util.trigger_callback('payment_succeeded', key) + util.trigger_callback('ln_payment_completed', payment_hash, chan.channel_id) def payment_received(self, chan, payment_hash: bytes): self.set_payment_status(payment_hash, PR_PAID) - self.network.trigger_callback('request_status', payment_hash.hex(), PR_PAID) - self.network.trigger_callback('ln_payment_completed', payment_hash, chan.channel_id) + util.trigger_callback('request_status', payment_hash.hex(), PR_PAID) + util.trigger_callback('ln_payment_completed', payment_hash, chan.channel_id) async def _calc_routing_hints_for_invoice(self, amount_sat): """calculate routing hints (BOLT-11 'r' field)""" t@@ -1251,8 +1251,8 @@ class LNWallet(LNWorker): self.channels.pop(chan_id) self.db.get('channels').pop(chan_id.hex()) - self.network.trigger_callback('channels_updated', self.wallet) - self.network.trigger_callback('wallet_updated', self.wallet) + util.trigger_callback('channels_updated', self.wallet) + util.trigger_callback('wallet_updated', self.wallet) @ignore_exceptions @log_exceptions t@@ -1355,7 +1355,7 @@ class LNBackups(Logger): self.channel_backups[bfh(channel_id)] = ChannelBackup(cb, sweep_address=self.sweep_address, lnworker=self) def channel_state_changed(self, chan): - self.network.trigger_callback('channel', chan) + util.trigger_callback('channel', chan) def peer_closed(self, chan): pass t@@ -1389,7 +1389,7 @@ class LNBackups(Logger): d[channel_id] = cb_storage self.channel_backups[bfh(channel_id)] = cb = ChannelBackup(cb_storage, sweep_address=self.sweep_address, lnworker=self) self.wallet.save_db() - self.network.trigger_callback('channels_updated', self.wallet) + util.trigger_callback('channels_updated', self.wallet) self.lnwatcher.add_channel(cb.funding_outpoint.to_str(), cb.get_funding_address()) def remove_channel_backup(self, channel_id): t@@ -1399,7 +1399,7 @@ class LNBackups(Logger): d.pop(channel_id.hex()) self.channel_backups.pop(channel_id) self.wallet.save_db() - self.network.trigger_callback('channels_updated', self.wallet) + util.trigger_callback('channels_updated', self.wallet) @log_exceptions async def request_force_close(self, channel_id): DIR diff --git a/electrum/network.py b/electrum/network.py t@@ -278,7 +278,6 @@ class Network(Logger): # locks self.restart_lock = asyncio.Lock() self.bhi_lock = asyncio.Lock() - self.callback_lock = threading.Lock() self.recent_servers_lock = threading.RLock() # <- re-entrant self.interfaces_lock = threading.Lock() # for mutating/iterating self.interfaces t@@ -288,8 +287,6 @@ class Network(Logger): self.banner = '' self.donation_address = '' self.relay_fee = None # type: Optional[int] - # callbacks set by the GUI - self.callbacks = defaultdict(list) # note: needs self.callback_lock dir_path = os.path.join(self.config.path, 'certs') util.make_dir(dir_path) t@@ -332,7 +329,7 @@ class Network(Logger): from . import channel_db self.channel_db = channel_db.ChannelDB(self) self.path_finder = lnrouter.LNPathFinder(self.channel_db) - self.lngossip = lnworker.LNGossip(self) + self.lngossip = lnworker.LNGossip() self.lngossip.start_network(self) def run_from_another_thread(self, coro, *, timeout=None): t@@ -350,27 +347,6 @@ class Network(Logger): return func(self, *args, **kwargs) return func_wrapper - def register_callback(self, callback, events): - with self.callback_lock: - for event in events: - self.callbacks[event].append(callback) - - def unregister_callback(self, callback): - with self.callback_lock: - for callbacks in self.callbacks.values(): - if callback in callbacks: - callbacks.remove(callback) - - def trigger_callback(self, event, *args): - with self.callback_lock: - callbacks = self.callbacks[event][:] - for callback in callbacks: - # FIXME: if callback throws, we will lose the traceback - if asyncio.iscoroutinefunction(callback): - asyncio.run_coroutine_threadsafe(callback(event, *args), self.asyncio_loop) - else: - self.asyncio_loop.call_soon_threadsafe(callback, event, *args) - def _read_recent_servers(self): if not self.config.path: return [] t@@ -481,9 +457,9 @@ class Network(Logger): def notify(self, key): if key in ['status', 'updated']: - self.trigger_callback(key) + util.trigger_callback(key) else: - self.trigger_callback(key, self.get_status_value(key)) + util.trigger_callback(key, self.get_status_value(key)) def get_parameters(self) -> NetworkParameters: host, port, protocol = deserialize_server(self.default_server) t@@ -574,7 +550,7 @@ class Network(Logger): self.proxy = proxy dns_hacks.configure_dns_depending_on_proxy(bool(proxy)) self.logger.info(f'setting proxy {proxy}') - self.trigger_callback('proxy_set', self.proxy) + util.trigger_callback('proxy_set', self.proxy) @log_exceptions async def set_parameters(self, net_params: NetworkParameters): t@@ -700,12 +676,13 @@ class Network(Logger): blockchain_updated = i.blockchain != self.blockchain() self.interface = i await i.taskgroup.spawn(self._request_server_info(i)) - self.trigger_callback('default_server_changed') + util.trigger_callback('default_server_changed') self.default_server_changed_event.set() self.default_server_changed_event.clear() self._set_status('connected') - self.trigger_callback('network_updated') - if blockchain_updated: self.trigger_callback('blockchain_updated') + util.trigger_callback('network_updated') + if blockchain_updated: + util.trigger_callback('blockchain_updated') async def _close_interface(self, interface: Interface): if interface: t@@ -734,7 +711,7 @@ class Network(Logger): if server == self.default_server: self._set_status('disconnected') await self._close_interface(interface) - self.trigger_callback('network_updated') + util.trigger_callback('network_updated') def get_network_timeout_seconds(self, request_type=NetworkTimeout.Generic) -> int: if self.oneserver and not self.auto_connect: t@@ -767,7 +744,7 @@ class Network(Logger): await self.switch_to_interface(server) self._add_recent_server(server) - self.trigger_callback('network_updated') + util.trigger_callback('network_updated') def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check) -> bool: # main interface is exempt. this makes switching servers easier t@@ -1152,7 +1129,7 @@ class Network(Logger): self.logger.info("taskgroup stopped.") asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop) - self.trigger_callback('network_updated') + util.trigger_callback('network_updated') def start(self, jobs: Iterable = None): """Schedule starting the network, along with the given job co-routines. t@@ -1176,7 +1153,7 @@ class Network(Logger): self.connecting.clear() self.server_queue = None if not full_shutdown: - self.trigger_callback('network_updated') + util.trigger_callback('network_updated') def stop(self): assert self._loop_thread != threading.current_thread(), 'must not be called from network thread' DIR diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py t@@ -30,6 +30,7 @@ import logging from aiorpcx import TaskGroup, run_in_thread, RPCError +from . import util from .transaction import Transaction, PartialTransaction from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer from .bitcoin import address_to_scripthash, is_address t@@ -227,7 +228,7 @@ class Synchronizer(SynchronizerBase): self.wallet.receive_tx_callback(tx_hash, tx, tx_height) self.logger.info(f"received tx {tx_hash} height: {tx_height} bytes: {len(raw_tx)}") # callbacks - self.wallet.network.trigger_callback('new_transaction', self.wallet, tx) + util.trigger_callback('new_transaction', self.wallet, tx) async def main(self): self.wallet.set_up_to_date(False) t@@ -252,7 +253,7 @@ class Synchronizer(SynchronizerBase): if up_to_date: self._reset_request_counters() self.wallet.set_up_to_date(up_to_date) - self.wallet.network.trigger_callback('wallet_updated', self.wallet) + util.trigger_callback('wallet_updated', self.wallet) class Notifier(SynchronizerBase): DIR diff --git a/electrum/util.py b/electrum/util.py t@@ -1130,7 +1130,7 @@ class NetworkJobOnDefaultServer(Logger): self._restart_lock = asyncio.Lock() self._reset() asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop) - network.register_callback(self._restart, ['default_server_changed']) + register_callback(self._restart, ['default_server_changed']) def _reset(self): """Initialise fields. Called every time the underlying t@@ -1304,3 +1304,41 @@ def randrange(bound: int) -> int: """Return a random integer k such that 1 <= k < bound, uniformly distributed across that range.""" return ecdsa.util.randrange(bound) + + +class CallbackManager: + # callbacks set by the GUI + def __init__(self): + self.callback_lock = threading.Lock() + self.callbacks = defaultdict(list) # note: needs self.callback_lock + self.asyncio_loop = None + + def register_callback(self, callback, events): + with self.callback_lock: + for event in events: + self.callbacks[event].append(callback) + + def unregister_callback(self, callback): + with self.callback_lock: + for callbacks in self.callbacks.values(): + if callback in callbacks: + callbacks.remove(callback) + + def trigger_callback(self, event, *args): + if self.asyncio_loop is None: + self.asyncio_loop = asyncio.get_event_loop() + assert self.asyncio_loop.is_running(), "event loop not running" + with self.callback_lock: + callbacks = self.callbacks[event][:] + for callback in callbacks: + # FIXME: if callback throws, we will lose the traceback + if asyncio.iscoroutinefunction(callback): + asyncio.run_coroutine_threadsafe(callback(event, *args), self.asyncio_loop) + else: + self.asyncio_loop.call_soon_threadsafe(callback, event, *args) + + +callback_mgr = CallbackManager() +trigger_callback = callback_mgr.trigger_callback +register_callback = callback_mgr.register_callback +unregister_callback = callback_mgr.unregister_callback