URI: 
       tfix most "scripts" - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit e37da62a1ce68ed2374ddc18796a89bd21529b70
   DIR parent 5c4a6c0f2b531bd84d4a0c5dad91ace4b8ad3596
  HTML Author: SomberNight <somber.night@protonmail.com>
       Date:   Fri,  2 Nov 2018 20:14:59 +0100
       
       fix most "scripts"
       
       related: #4754
       
       Diffstat:
         M electrum/bitcoin.py                 |      24 +++++++++++++++---------
         M electrum/blockchain.py              |       6 +++++-
         M electrum/daemon.py                  |      26 +++-----------------------
         M electrum/network.py                 |      11 +++++++----
         M electrum/scripts/bip70.py           |       1 +
         M electrum/scripts/block_headers.py   |      38 +++++++++++++++++--------------
         M electrum/scripts/estimate_fee.py    |      32 ++++++++++++++++++++++++++-----
         M electrum/scripts/get_history.py     |      26 +++++++++++++++++++-------
         M electrum/scripts/peers.py           |      30 ++++++++++++++++++++++--------
         M electrum/scripts/servers.py         |      31 ++++++++++++++++++++++++-------
         M electrum/scripts/txradar.py         |      38 +++++++++++++++++++++++--------
         M electrum/scripts/util.py            |     119 ++++++++++---------------------
         M electrum/scripts/watch_address.py   |      48 ++++++++++++++++++-------------
         M electrum/synchronizer.py            |       5 +++--
         M electrum/util.py                    |      25 ++++++++++++++++++++++++-
       
       15 files changed, 266 insertions(+), 194 deletions(-)
       ---
   DIR diff --git a/electrum/bitcoin.py b/electrum/bitcoin.py
       t@@ -284,13 +284,15 @@ def script_to_address(script, *, net=None):
            assert t == TYPE_ADDRESS
            return addr
        
       -def address_to_script(addr, *, net=None):
       +def address_to_script(addr: str, *, net=None) -> str:
            if net is None:
                net = constants.net
       +    if not is_address(addr, net=net):
       +        raise BitcoinException(f"invalid bitcoin address: {addr}")
            witver, witprog = segwit_addr.decode(net.SEGWIT_HRP, addr)
            if witprog is not None:
                if not (0 <= witver <= 16):
       -            raise BitcoinException('impossible witness version: {}'.format(witver))
       +            raise BitcoinException(f'impossible witness version: {witver}')
                OP_n = witver + 0x50 if witver > 0 else 0
                script = bh2u(bytes([OP_n]))
                script += push_script(bh2u(bytes(witprog)))
       t@@ -305,7 +307,7 @@ def address_to_script(addr, *, net=None):
                script += push_script(bh2u(hash_160_))
                script += '87'                                       # op_equal
            else:
       -        raise BitcoinException('unknown address type: {}'.format(addrtype))
       +        raise BitcoinException(f'unknown address type: {addrtype}')
            return script
        
        def address_to_scripthash(addr):
       t@@ -491,24 +493,28 @@ def address_from_private_key(sec):
            public_key = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed)
            return pubkey_to_address(txin_type, public_key)
        
       -def is_segwit_address(addr):
       +def is_segwit_address(addr, *, net=None):
       +    if net is None: net = constants.net
            try:
       -        witver, witprog = segwit_addr.decode(constants.net.SEGWIT_HRP, addr)
       +        witver, witprog = segwit_addr.decode(net.SEGWIT_HRP, addr)
            except Exception as e:
                return False
            return witprog is not None
        
       -def is_b58_address(addr):
       +def is_b58_address(addr, *, net=None):
       +    if net is None: net = constants.net
            try:
                addrtype, h = b58_address_to_hash160(addr)
            except Exception as e:
                return False
       -    if addrtype not in [constants.net.ADDRTYPE_P2PKH, constants.net.ADDRTYPE_P2SH]:
       +    if addrtype not in [net.ADDRTYPE_P2PKH, net.ADDRTYPE_P2SH]:
                return False
            return addr == hash160_to_b58_address(h, addrtype)
        
       -def is_address(addr):
       -    return is_segwit_address(addr) or is_b58_address(addr)
       +def is_address(addr, *, net=None):
       +    if net is None: net = constants.net
       +    return is_segwit_address(addr, net=net) \
       +           or is_b58_address(addr, net=net)
        
        
        def is_private_key(key):
   DIR diff --git a/electrum/blockchain.py b/electrum/blockchain.py
       t@@ -72,7 +72,11 @@ def hash_header(header: dict) -> str:
                return '0' * 64
            if header.get('prev_block_hash') is None:
                header['prev_block_hash'] = '00'*32
       -    return hash_encode(sha256d(bfh(serialize_header(header))))
       +    return hash_raw_header(serialize_header(header))
       +
       +
       +def hash_raw_header(header: str) -> str:
       +    return hash_encode(sha256d(bfh(header)))
        
        
        blockchains = {}  # type: Dict[int, Blockchain]
   DIR diff --git a/electrum/daemon.py b/electrum/daemon.py
       t@@ -30,15 +30,14 @@ import traceback
        import sys
        import threading
        from typing import Dict, Optional, Tuple
       -import re
        
        import jsonrpclib
        
        from .jsonrpc import VerifyingJSONRPCServer
        from .version import ELECTRUM_VERSION
        from .network import Network
       -from .util import json_decode, DaemonThread
       -from .util import print_error, to_string
       +from .util import (json_decode, DaemonThread, print_error, to_string,
       +                   create_and_start_event_loop)
        from .wallet import Wallet, Abstract_Wallet
        from .storage import WalletStorage
        from .commands import known_commands, Commands
       t@@ -128,7 +127,7 @@ class Daemon(DaemonThread):
                if fd is None and listen_jsonrpc:
                    fd, server = get_fd_or_server(config)
                    if fd is None: raise Exception('failed to lock daemon; already running?')
       -        self.create_and_start_event_loop()
       +        self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop()
                if config.get('offline'):
                    self.network = None
                else:
       t@@ -330,22 +329,3 @@ class Daemon(DaemonThread):
                except BaseException as e:
                    traceback.print_exc(file=sys.stdout)
                    # app will exit now
       -
       -    def create_and_start_event_loop(self):
       -        def on_exception(loop, context):
       -            """Suppress spurious messages it appears we cannot control."""
       -            SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|'
       -                                                'SSL error in data received')
       -            message = context.get('message')
       -            if message and SUPPRESS_MESSAGE_REGEX.match(message):
       -                return
       -            loop.default_exception_handler(context)
       -
       -        self.asyncio_loop = asyncio.get_event_loop()
       -        self.asyncio_loop.set_exception_handler(on_exception)
       -        # self.asyncio_loop.set_debug(1)
       -        self._stop_loop = asyncio.Future()
       -        self._loop_thread = threading.Thread(target=self.asyncio_loop.run_until_complete,
       -                                             args=(self._stop_loop,),
       -                                             name='EventLoop')
       -        self._loop_thread.start()
   DIR diff --git a/electrum/network.py b/electrum/network.py
       t@@ -32,7 +32,7 @@ import json
        import sys
        import ipaddress
        import asyncio
       -from typing import NamedTuple, Optional, Sequence, List, Dict
       +from typing import NamedTuple, Optional, Sequence, List, Dict, Tuple
        import traceback
        
        import dns
       t@@ -53,7 +53,7 @@ NODES_RETRY_INTERVAL = 60
        SERVER_RETRY_INTERVAL = 10
        
        
       -def parse_servers(result):
       +def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]:
            """ parse servers list into dict format"""
            servers = {}
            for item in result:
       t@@ -170,6 +170,7 @@ class Network(PrintError):
                INSTANCE = self
        
                self.asyncio_loop = asyncio.get_event_loop()
       +        assert self.asyncio_loop.is_running(), "event loop not running"
                self._loop_thread = None  # type: threading.Thread  # set by caller; only used for sanity checks
        
                if config is None:
       t@@ -225,6 +226,8 @@ class Network(PrintError):
                self.server_queue = None
                self.proxy = None
        
       +        self._set_status('disconnected')
       +
            def run_from_another_thread(self, coro):
                assert self._loop_thread != threading.current_thread(), 'must not be called from network thread'
                fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop)
       t@@ -411,10 +414,10 @@ class Network(PrintError):
                    out = filter_noonion(out)
                return out
        
       -    def _start_interface(self, server):
       +    def _start_interface(self, server: str):
                if server not in self.interfaces and server not in self.connecting:
                    if server == self.default_server:
       -                self.print_error("connecting to %s as new interface" % server)
       +                self.print_error(f"connecting to {server} as new interface")
                        self._set_status('connecting')
                    self.connecting.add(server)
                    self.server_queue.put(server)
   DIR diff --git a/electrum/scripts/bip70.py b/electrum/scripts/bip70.py
       t@@ -1,5 +1,6 @@
        #!/usr/bin/env python3
        # create a BIP70 payment request signed with a certificate
       +# FIXME: the code here is outdated, and no longer working
        
        import tlslite
        
   DIR diff --git a/electrum/scripts/block_headers.py b/electrum/scripts/block_headers.py
       t@@ -3,29 +3,33 @@
        # A simple script that connects to a server and displays block headers
        
        import time
       -import sys
       +import asyncio
        
       -from .. import SimpleConfig, Network
       -from electrum.util import print_msg, json_encode
       +from electrum.network import Network
       +from electrum.util import print_msg, json_encode, create_and_start_event_loop, log_exceptions
        
        # start network
       -c = SimpleConfig()
       -network = Network(c)
       +loop, stopping_fut, loop_thread = create_and_start_event_loop()
       +network = Network()
        network.start()
        
        # wait until connected
       -while network.is_connecting():
       -    time.sleep(0.1)
       +while not network.is_connected():
       +    time.sleep(1)
       +    print_msg("waiting for network to get connected...")
        
       -if not network.is_connected():
       -    print_msg("daemon is not connected")
       -    sys.exit(1)
       +header_queue = asyncio.Queue()
        
       -# 2. send the subscription
       -callback = lambda response: print_msg(json_encode(response.get('result')))
       -network.send([('server.version',["block_headers script", "1.2"])], callback)
       -network.subscribe_to_headers(callback)
       +@log_exceptions
       +async def f():
       +    try:
       +        await network.interface.session.subscribe('blockchain.headers.subscribe', [], header_queue)
       +        # 3. wait for results
       +        while network.is_connected():
       +            header = await header_queue.get()
       +            print_msg(json_encode(header))
       +    finally:
       +        stopping_fut.set_result(1)
        
       -# 3. wait for results
       -while network.is_connected():
       -    time.sleep(1)
       +# 2. send the subscription
       +asyncio.run_coroutine_threadsafe(f(), loop)
   DIR diff --git a/electrum/scripts/estimate_fee.py b/electrum/scripts/estimate_fee.py
       t@@ -1,7 +1,29 @@
        #!/usr/bin/env python3
       -from . import util
        import json
       -from electrum.network import filter_protocol
       -peers = filter_protocol(util.get_peers())
       -results = util.send_request(peers, 'blockchain.estimatefee', [2])
       -print(json.dumps(results, indent=4))
       +import asyncio
       +from statistics import median
       +from numbers import Number
       +
       +from electrum.network import filter_protocol, Network
       +from electrum.util import create_and_start_event_loop, log_exceptions
       +
       +import util
       +
       +
       +loop, stopping_fut, loop_thread = create_and_start_event_loop()
       +network = Network()
       +network.start()
       +
       +@log_exceptions
       +async def f():
       +    try:
       +        peers = await util.get_peers(network)
       +        peers = filter_protocol(peers)
       +        results = await util.send_request(network, peers, 'blockchain.estimatefee', [2])
       +        print(json.dumps(results, indent=4))
       +        feerate_estimates = filter(lambda x: isinstance(x, Number), results.values())
       +        print(f"median feerate: {median(feerate_estimates)}")
       +    finally:
       +        stopping_fut.set_result(1)
       +
       +asyncio.run_coroutine_threadsafe(f(), loop)
   DIR diff --git a/electrum/scripts/get_history.py b/electrum/scripts/get_history.py
       t@@ -1,9 +1,12 @@
        #!/usr/bin/env python3
        
        import sys
       -from .. import Network
       -from electrum.util import json_encode, print_msg
       +import asyncio
       +
        from electrum import bitcoin
       +from electrum.network import Network
       +from electrum.util import json_encode, print_msg, create_and_start_event_loop, log_exceptions
       +
        
        try:
            addr = sys.argv[1]
       t@@ -11,8 +14,17 @@ except Exception:
            print("usage: get_history <bitcoin_address>")
            sys.exit(1)
        
       -n = Network()
       -n.start()
       -_hash = bitcoin.address_to_scripthash(addr)
       -h = n.get_history_for_scripthash(_hash)
       -print_msg(json_encode(h))
       +loop, stopping_fut, loop_thread = create_and_start_event_loop()
       +network = Network()
       +network.start()
       +
       +@log_exceptions
       +async def f():
       +    try:
       +        sh = bitcoin.address_to_scripthash(addr)
       +        hist = await network.get_history_for_scripthash(sh)
       +        print_msg(json_encode(hist))
       +    finally:
       +        stopping_fut.set_result(1)
       +
       +asyncio.run_coroutine_threadsafe(f(), loop)
   DIR diff --git a/electrum/scripts/peers.py b/electrum/scripts/peers.py
       t@@ -1,14 +1,28 @@
        #!/usr/bin/env python3
       +import asyncio
        
       -from . import util
       +from electrum.network import filter_protocol, Network
       +from electrum.util import create_and_start_event_loop, log_exceptions
       +from electrum.blockchain import hash_raw_header
        
       -from electrum.network import filter_protocol
       -from electrum.blockchain import hash_header
       +import util
        
       -peers = util.get_peers()
       -peers = filter_protocol(peers, 's')
        
       -results = util.send_request(peers, 'blockchain.headers.subscribe', [])
       +loop, stopping_fut, loop_thread = create_and_start_event_loop()
       +network = Network()
       +network.start()
        
       -for n,v in sorted(results.items(), key=lambda x:x[1].get('block_height')):
       -    print("%60s"%n, v.get('block_height'), hash_header(v))
       +@log_exceptions
       +async def f():
       +    try:
       +        peers = await util.get_peers(network)
       +        peers = filter_protocol(peers, 's')
       +        results = await util.send_request(network, peers, 'blockchain.headers.subscribe', [])
       +        for server, header in sorted(results.items(), key=lambda x: x[1].get('height')):
       +            height = header.get('height')
       +            blockhash = hash_raw_header(header.get('hex'))
       +            print("%60s" % server, height, blockhash)
       +    finally:
       +        stopping_fut.set_result(1)
       +
       +asyncio.run_coroutine_threadsafe(f(), loop)
   DIR diff --git a/electrum/scripts/servers.py b/electrum/scripts/servers.py
       t@@ -1,10 +1,27 @@
        #!/usr/bin/env python3
       -
       -from .. import set_verbosity
       -from electrum.network import filter_version
       -from . import util
        import json
       -set_verbosity(False)
       +import asyncio
       +
       +from electrum.network import filter_version, Network
       +from electrum.util import create_and_start_event_loop, log_exceptions
       +from electrum import constants
       +
       +import util
       +
       +
       +#constants.set_testnet()
       +
       +loop, stopping_fut, loop_thread = create_and_start_event_loop()
       +network = Network()
       +network.start()
       +
       +@log_exceptions
       +async def f():
       +    try:
       +        peers = await util.get_peers(network)
       +        peers = filter_version(peers)
       +        print(json.dumps(peers, sort_keys=True, indent=4))
       +    finally:
       +        stopping_fut.set_result(1)
        
       -servers = filter_version(util.get_peers())
       -print(json.dumps(servers, sort_keys = True, indent = 4))
       +asyncio.run_coroutine_threadsafe(f(), loop)
   DIR diff --git a/electrum/scripts/txradar.py b/electrum/scripts/txradar.py
       t@@ -1,20 +1,38 @@
        #!/usr/bin/env python3
       -from . import util
        import sys
       +import asyncio
       +
       +from electrum.network import filter_protocol, Network
       +from electrum.util import create_and_start_event_loop, log_exceptions
       +
       +import util
       +
       +
        try:
       -    tx = sys.argv[1]
       +    txid = sys.argv[1]
        except:
            print("usage: txradar txid")
            sys.exit(1)
        
       -peers = util.get_peers()
       -results = util.send_request(peers, 'blockchain.transaction.get', [tx])
        
       -r1 = []
       -r2 = []
       +loop, stopping_fut, loop_thread = create_and_start_event_loop()
       +network = Network()
       +network.start()
        
       -for k, v in results.items():
       -    (r1 if v else r2).append(k)
       +@log_exceptions
       +async def f():
       +    try:
       +        peers = await util.get_peers(network)
       +        peers = filter_protocol(peers, 's')
       +        results = await util.send_request(network, peers, 'blockchain.transaction.get', [txid])
       +        r1, r2 = [], []
       +        for k, v in results.items():
       +            (r1 if not isinstance(v, Exception) else r2).append(k)
       +        print(f"Received {len(results)} answers")
       +        try: propagation = len(r1) * 100. / (len(r1) + len(r2))
       +        except ZeroDivisionError: propagation = 0
       +        print(f"Propagation rate: {propagation:.1f} percent")
       +    finally:
       +        stopping_fut.set_result(1)
        
       -print("Received %d answers"%len(results))
       -print("Propagation rate: %.1f percent" % (len(r1) *100./(len(r1)+ len(r2))))
       +asyncio.run_coroutine_threadsafe(f(), loop)
   DIR diff --git a/electrum/scripts/util.py b/electrum/scripts/util.py
       t@@ -1,87 +1,46 @@
       -import select, time, queue
       -# import electrum
       -from .. import Connection, Interface, SimpleConfig
       +import asyncio
       +from typing import List, Sequence
        
       -from electrum.network import parse_servers
       -from collections import defaultdict
       +from aiorpcx import TaskGroup
       +
       +from electrum.network import parse_servers, Network
       +from electrum.interface import Interface
        
       -# electrum.util.set_verbosity(1)
       -def get_interfaces(servers, timeout=10):
       -    '''Returns a map of servers to connected interfaces.  If any
       -    connections fail or timeout, they will be missing from the map.
       -    '''
       -    assert type(servers) is list
       -    socket_queue = queue.Queue()
       -    config = SimpleConfig()
       -    connecting = {}
       -    for server in servers:
       -        if server not in connecting:
       -            connecting[server] = Connection(server, socket_queue, config.path)
       -    interfaces = {}
       -    timeout = time.time() + timeout
       -    count = 0
       -    while time.time() < timeout and count < len(servers):
       -        try:
       -            server, socket = socket_queue.get(True, 0.3)
       -        except queue.Empty:
       -            continue
       -        if socket:
       -            interfaces[server] = Interface(server, socket)
       -        count += 1
       -    return interfaces
        
       -def wait_on_interfaces(interfaces, timeout=10):
       -    '''Return a map of servers to a list of (request, response) tuples.
       -    Waits timeout seconds, or until each interface has a response'''
       -    result = defaultdict(list)
       -    timeout = time.time() + timeout
       -    while len(result) < len(interfaces) and time.time() < timeout:
       -        rin = [i for i in interfaces.values()]
       -        win = [i for i in interfaces.values() if i.unsent_requests]
       -        rout, wout, xout = select.select(rin, win, [], 1)
       -        for interface in wout:
       -            interface.send_requests()
       -        for interface in rout:
       -            responses = interface.get_responses()
       -            if responses:
       -                result[interface.server].extend(responses)
       -    return result
       +#electrum.util.set_verbosity(True)
        
       -def get_peers():
       -    config = SimpleConfig()
       -    peers = {}
       -    # 1. get connected interfaces
       -    server = config.get('server')
       -    if server is None:
       -        print("You need to set a secure server, for example (for mainnet): 'electrum setconfig server helicarrier.bauerj.eu:50002:s'")
       -        return []
       -    interfaces = get_interfaces([server])
       -    if not interfaces:
       -        print("No connection to", server)
       -        return []
       -    # 2. get list of peers
       -    interface = interfaces[server]
       -    interface.queue_request('server.peers.subscribe', [], 0)
       -    responses = wait_on_interfaces(interfaces).get(server)
       -    if responses:
       -        response = responses[0][1]  # One response, (req, response) tuple
       -        peers = parse_servers(response.get('result'))
       +async def get_peers(network: Network):
       +    while not network.is_connected():
       +        await asyncio.sleep(1)
       +    interface = network.interface
       +    session = interface.session
       +    print(f"asking server {interface.server} for its peers")
       +    peers = parse_servers(await session.send_request('server.peers.subscribe'))
       +    print(f"got {len(peers)} servers")
            return peers
        
        
       -def send_request(peers, method, params):
       -    print("Contacting %d servers"%len(peers))
       -    interfaces = get_interfaces(peers)
       -    print("%d servers could be reached" % len(interfaces))
       -    for peer in peers:
       -        if not peer in interfaces:
       -            print("Connection failed:", peer)
       -    for msg_id, i in enumerate(interfaces.values()):
       -        i.queue_request(method, params, msg_id)
       -    responses = wait_on_interfaces(interfaces)
       -    for peer in interfaces:
       -        if not peer in responses:
       -            print(peer, "did not answer")
       -    results = dict(zip(responses.keys(), [t[0][1].get('result') for t in responses.values()]))
       -    print("%d answers"%len(results))
       -    return results
       +async def send_request(network: Network, servers: List[str], method: str, params: Sequence):
       +    print(f"contacting {len(servers)} servers")
       +    num_connecting = len(network.connecting)
       +    for server in servers:
       +        network._start_interface(server)
       +    # sleep a bit
       +    for _ in range(10):
       +        if len(network.connecting) < num_connecting:
       +            break
       +        await asyncio.sleep(1)
       +    print(f"connected to {len(network.interfaces)} servers. sending request to all.")
       +    responses = dict()
       +    async def get_response(iface: Interface):
       +        try:
       +            res = await iface.session.send_request(method, params, timeout=10)
       +        except Exception as e:
       +            print(f"server {iface.server} errored or timed out: ({repr(e)})")
       +            res = e
       +        responses[iface.server] = res
       +    async with TaskGroup() as group:
       +        for interface in network.interfaces.values():
       +            await group.spawn(get_response(interface))
       +    print("%d answers" % len(responses))
       +    return responses
   DIR diff --git a/electrum/scripts/watch_address.py b/electrum/scripts/watch_address.py
       t@@ -1,10 +1,12 @@
        #!/usr/bin/env python3
        
        import sys
       -import time
       -from electrum import bitcoin
       -from .. import SimpleConfig, Network
       -from electrum.util import print_msg, json_encode
       +import asyncio
       +
       +from electrum.network import Network
       +from electrum.util import print_msg, create_and_start_event_loop
       +from electrum.synchronizer import SynchronizerBase
       +
        
        try:
            addr = sys.argv[1]
       t@@ -12,25 +14,31 @@ except Exception:
            print("usage: watch_address <bitcoin_address>")
            sys.exit(1)
        
       -sh = bitcoin.address_to_scripthash(addr)
       -
        # start network
       -c = SimpleConfig()
       -network = Network(c)
       +loop = create_and_start_event_loop()[0]
       +network = Network()
        network.start()
        
       -# wait until connected
       -while network.is_connecting():
       -    time.sleep(0.1)
        
       -if not network.is_connected():
       -    print_msg("daemon is not connected")
       -    sys.exit(1)
       +class Notifier(SynchronizerBase):
       +    def __init__(self, network):
       +        SynchronizerBase.__init__(self, network)
       +        self.watched_addresses = set()
       +        self.watch_queue = asyncio.Queue()
       +
       +    async def main(self):
       +        # resend existing subscriptions if we were restarted
       +        for addr in self.watched_addresses:
       +            await self._add_address(addr)
       +        # main loop
       +        while True:
       +            addr = await self.watch_queue.get()
       +            self.watched_addresses.add(addr)
       +            await self._add_address(addr)
       +
       +    async def _on_address_status(self, addr, status):
       +        print_msg(f"addr {addr}, status {status}")
        
       -# 2. send the subscription
       -callback = lambda response: print_msg(json_encode(response.get('result')))
       -network.subscribe_to_address(addr, callback)
        
       -# 3. wait for results
       -while network.is_connected():
       -    time.sleep(1)
       +notifier = Notifier(network)
       +asyncio.run_coroutine_threadsafe(notifier.watch_queue.put(addr), loop)
   DIR diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py
       t@@ -31,7 +31,7 @@ from aiorpcx import TaskGroup, run_in_thread
        
        from .transaction import Transaction
        from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer
       -from .bitcoin import address_to_scripthash
       +from .bitcoin import address_to_scripthash, is_address
        
        if TYPE_CHECKING:
            from .network import Network
       t@@ -77,7 +77,8 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
            def add(self, addr):
                asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop)
        
       -    async def _add_address(self, addr):
       +    async def _add_address(self, addr: str):
       +        if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}")
                if addr in self.requested_addrs: return
                self.requested_addrs.add(addr)
                await self.add_queue.put(addr)
   DIR diff --git a/electrum/util.py b/electrum/util.py
       t@@ -278,7 +278,7 @@ class DaemonThread(threading.Thread, PrintError):
                self.print_error("stopped")
        
        
       -verbosity = '*'
       +verbosity = ''
        def set_verbosity(filters: Union[str, bool]):
            global verbosity
            if type(filters) is bool:  # backwards compat
       t@@ -983,3 +983,26 @@ class NetworkJobOnDefaultServer(PrintError):
                s = self.interface.session
                assert s is not None
                return s
       +
       +
       +def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
       +                                           asyncio.Future,
       +                                           threading.Thread]:
       +    def on_exception(loop, context):
       +        """Suppress spurious messages it appears we cannot control."""
       +        SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|'
       +                                            'SSL error in data received')
       +        message = context.get('message')
       +        if message and SUPPRESS_MESSAGE_REGEX.match(message):
       +            return
       +        loop.default_exception_handler(context)
       +
       +    loop = asyncio.get_event_loop()
       +    loop.set_exception_handler(on_exception)
       +    # loop.set_debug(1)
       +    stopping_fut = asyncio.Future()
       +    loop_thread = threading.Thread(target=loop.run_until_complete,
       +                                         args=(stopping_fut,),
       +                                         name='EventLoop')
       +    loop_thread.start()
       +    return loop, stopping_fut, loop_thread