tnetwork.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
tnetwork.py (58124B)
---
1 # Electrum - Lightweight Bitcoin Client
2 # Copyright (c) 2011-2016 Thomas Voegtlin
3 #
4 # Permission is hereby granted, free of charge, to any person
5 # obtaining a copy of this software and associated documentation files
6 # (the "Software"), to deal in the Software without restriction,
7 # including without limitation the rights to use, copy, modify, merge,
8 # publish, distribute, sublicense, and/or sell copies of the Software,
9 # and to permit persons to whom the Software is furnished to do so,
10 # subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be
13 # included in all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 # SOFTWARE.
23 import asyncio
24 import time
25 import queue
26 import os
27 import random
28 import re
29 from collections import defaultdict
30 import threading
31 import socket
32 import json
33 import sys
34 import asyncio
35 from typing import NamedTuple, Optional, Sequence, List, Dict, Tuple, TYPE_CHECKING, Iterable, Set, Any
36 import traceback
37 import concurrent
38 from concurrent import futures
39 import copy
40 import functools
41
42 import aiorpcx
43 from aiorpcx import TaskGroup, ignore_after
44 from aiohttp import ClientResponse
45
46 from . import util
47 from .util import (log_exceptions, ignore_exceptions,
48 bfh, SilentTaskGroup, make_aiohttp_session, send_exception_to_crash_reporter,
49 is_hash256_str, is_non_negative_integer, MyEncoder, NetworkRetryManager,
50 nullcontext)
51 from .bitcoin import COIN
52 from . import constants
53 from . import blockchain
54 from . import bitcoin
55 from . import dns_hacks
56 from .transaction import Transaction
57 from .blockchain import Blockchain, HEADER_SIZE
58 from .interface import (Interface, PREFERRED_NETWORK_PROTOCOL,
59 RequestTimedOut, NetworkTimeout, BUCKET_NAME_OF_ONION_SERVERS,
60 NetworkException, RequestCorrupted, ServerAddr)
61 from .version import PROTOCOL_VERSION
62 from .simple_config import SimpleConfig
63 from .i18n import _
64 from .logging import get_logger, Logger
65 from .lnutil import ChannelBlackList
66
67 if TYPE_CHECKING:
68 from .channel_db import ChannelDB
69 from .lnworker import LNGossip
70 from .lnwatcher import WatchTower
71 from .daemon import Daemon
72
73
74 _logger = get_logger(__name__)
75
76
# target number of concurrent server connections (see _maybe_set_oneserver)
NUM_TARGET_CONNECTED_SERVERS = 10
# max number of recent servers given reconnect priority (see _get_next_server_to_try)
NUM_STICKY_SERVERS = 4
# cap on the persisted recent-servers list (see _add_recent_server)
NUM_RECENT_SERVERS = 20
80
81
def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]:
    """Convert servers list (from protocol method "server.peers.subscribe") into dict format.
    Also validate values, such as IP addresses and ports.

    Each input item is (ip, hostname, [feature strings]); the feature strings
    encode protocol/port ("s"/"t" + optional port), version ("v..."), and
    pruning level ("p..."). Input is untrusted server data.
    """
    servers = {}
    for item in result:
        host = item[1]  # item[0] is the IP; the hostname is what we key on
        out = {}
        version = None
        pruning_level = '-'
        if len(item) > 2:
            for v in item[2]:
                if re.match(r"[st]\d*", v):
                    # 's' = SSL, 't' = TCP; digits (if any) are the port
                    protocol, port = v[0], v[1:]
                    if port == '': port = constants.net.DEFAULT_PORTS[protocol]
                    ServerAddr(host, port, protocol=protocol)  # check if raises
                    out[protocol] = port
                elif re.match("v(.?)+", v):
                    version = v[1:]  # e.g. "v1.4" -> "1.4"
                elif re.match(r"p\d*", v):
                    pruning_level = v[1:]
                    if pruning_level == '': pruning_level = '0'
        if out:
            # only keep hosts that advertise at least one usable protocol
            out['pruning'] = pruning_level
            out['version'] = version
            servers[host] = out
    return servers
109
110
def filter_version(servers):
    """Return only the servers whose advertised protocol version is at least ours."""
    def is_recent(version):
        try:
            return util.versiontuple(version) >= util.versiontuple(PROTOCOL_VERSION)
        except Exception:
            # unparseable/missing version string -> treat as too old
            return False
    recent = {}
    for server, info in servers.items():
        if is_recent(info.get('version')):
            recent[server] = info
    return recent
118
119
def filter_noonion(servers):
    """Return a copy of *servers* without Tor hidden-service (".onion") hosts."""
    filtered = {}
    for host, info in servers.items():
        if host.endswith('.onion'):
            continue
        filtered[host] = info
    return filtered
122
123
def filter_protocol(hostmap, *, allowed_protocols: Iterable[str] = None) -> Sequence[ServerAddr]:
    """Return a ServerAddr for each (host, protocol) pair in *hostmap*
    that speaks one of *allowed_protocols* (default: the preferred protocol)."""
    if allowed_protocols is None:
        allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}
    eligible = []
    for host, portmap in hostmap.items():
        for protocol in allowed_protocols:
            port = portmap.get(protocol)
            if not port:
                continue  # host does not offer this protocol
            eligible.append(ServerAddr(host, port, protocol=protocol))
    return eligible
135
136
def pick_random_server(hostmap=None, *, allowed_protocols: Iterable[str],
                       exclude_set: Set[ServerAddr] = None) -> Optional[ServerAddr]:
    """Pick one eligible server uniformly at random, or None if there is none."""
    if hostmap is None:
        hostmap = constants.net.DEFAULT_SERVERS
    if exclude_set is None:
        exclude_set = set()
    candidates = set(filter_protocol(hostmap, allowed_protocols=allowed_protocols))
    candidates -= exclude_set
    if not candidates:
        return None
    return random.choice(list(candidates))
146
147
class NetworkParameters(NamedTuple):
    """Immutable bundle of the user-facing network settings."""
    server: ServerAddr      # main server
    proxy: Optional[dict]   # see (de)serialize_proxy for the dict shape
    auto_connect: bool      # whether the client may auto-switch servers
    oneserver: bool = False  # connect only to a single server
153
154
155 proxy_modes = ['socks4', 'socks5']
156
157
def serialize_proxy(p):
    """Serialize a proxy dict as 'mode:host:port:user:password'.

    Returns None if *p* is not a dict. user/password default to empty strings.
    """
    if not isinstance(p, dict):
        return None
    fields = [p.get('mode'), p.get('host'), p.get('port'),
              p.get('user', ''), p.get('password', '')]
    return ':'.join(fields)
163
164
def deserialize_proxy(s: str) -> Optional[dict]:
    """Parse 'mode:host:port:user:password' (everything after mode optional)
    into a proxy dict; returns None for non-strings and the string 'none'."""
    if not isinstance(s, str):
        return None
    if s.lower() == 'none':
        return None
    proxy = {"mode": "socks5", "host": "localhost"}
    # FIXME raw IPv6 address fails here
    args = s.split(':')
    idx = 0
    # proxy_modes has unique entries, so membership == count(...) == 1
    if args[idx] in proxy_modes:
        proxy["mode"] = args[idx]
        idx += 1
    if idx < len(args):
        proxy["host"] = args[idx]
        idx += 1
    if idx < len(args):
        proxy["port"] = args[idx]
        idx += 1
    else:
        proxy["port"] = "8080" if proxy["mode"] == "http" else "1080"
    if idx < len(args):
        proxy["user"] = args[idx]
        idx += 1
    if idx < len(args):
        proxy["password"] = args[idx]
    return proxy
191
192
193 class BestEffortRequestFailed(NetworkException): pass
194
195
class TxBroadcastError(NetworkException):
    """Base class for errors raised while broadcasting a transaction."""
    def get_message_for_gui(self):
        # subclasses return a translatable message that is safe to show the user
        raise NotImplementedError()
199
200
class TxBroadcastHashMismatch(TxBroadcastError):
    """The server acknowledged the broadcast but returned a different txid."""
    def get_message_for_gui(self):
        return "{}\n{}\n\n{}" \
            .format(_("The server returned an unexpected transaction ID when broadcasting the transaction."),
                    _("Consider trying to connect to a different server, or updating Electrum."),
                    str(self))
207
208
class TxBroadcastServerReturnedError(TxBroadcastError):
    """The server explicitly rejected the broadcast with an error."""
    def get_message_for_gui(self):
        return "{}\n{}\n\n{}" \
            .format(_("The server returned an error when broadcasting the transaction."),
                    _("Consider trying to connect to a different server, or updating Electrum."),
                    str(self))
215
216
class TxBroadcastUnknownError(TxBroadcastError):
    """The broadcast failed for a reason we could not classify."""
    def get_message_for_gui(self):
        return "{}\n{}" \
            .format(_("Unknown error when broadcasting the transaction."),
                    _("Consider trying to connect to a different server, or updating Electrum."))
222
223
class UntrustedServerReturnedError(NetworkException):
    """Wraps an error reported by a server.

    The original message is untrusted input, so __str__ / get_message_for_gui
    deliberately do NOT include it; only __repr__ (for logs) exposes it.
    """
    def __init__(self, *, original_exception):
        self.original_exception = original_exception

    def get_message_for_gui(self) -> str:
        return str(self)

    def __str__(self):
        return _("The server returned an error.")

    def __repr__(self):
        return (f"<UntrustedServerReturnedError "
                f"[DO NOT TRUST THIS MESSAGE] original_exception: {repr(self.original_exception)}>")
237
238
239 _INSTANCE = None
240
241
class Network(Logger, NetworkRetryManager[ServerAddr]):
    """The Network class manages a set of connections to remote electrum
    servers, each connected socket is handled by an Interface() object.
    """

    LOGGING_SHORTCUT = 'n'

    # class-level annotations; instance values are assigned in __init__
    taskgroup: Optional[TaskGroup]
    interface: Optional[Interface]           # the main interface, if any
    interfaces: Dict[ServerAddr, Interface]  # connected-and-usable interfaces
    _connecting_ifaces: Set[ServerAddr]
    _closing_ifaces: Set[ServerAddr]
    default_server: ServerAddr
    _recent_servers: List[ServerAddr]

    channel_blacklist: 'ChannelBlackList'
    # lightning components; None until started (see start_gossip / config flags)
    channel_db: Optional['ChannelDB'] = None
    lngossip: Optional['LNGossip'] = None
    local_watchtower: Optional['WatchTower'] = None
261
    def __init__(self, config: SimpleConfig, *, daemon: 'Daemon' = None):
        """Create the singleton Network. Must be called on a running asyncio loop."""
        global _INSTANCE
        assert _INSTANCE is None, "Network is a singleton!"
        _INSTANCE = self

        Logger.__init__(self)
        NetworkRetryManager.__init__(
            self,
            max_retry_delay_normal=600,
            init_retry_delay_normal=15,
            max_retry_delay_urgent=10,
            init_retry_delay_urgent=1,
        )

        self.asyncio_loop = asyncio.get_event_loop()
        assert self.asyncio_loop.is_running(), "event loop not running"
        try:
            self._loop_thread = self.asyncio_loop._mythread  # type: threading.Thread  # only used for sanity checks
        except AttributeError as e:
            self.logger.warning(f"asyncio loop does not have _mythread set: {e!r}")
            self._loop_thread = None

        assert isinstance(config, SimpleConfig), f"config should be a SimpleConfig instead of {type(config)}"
        self.config = config

        self.daemon = daemon

        blockchain.read_blockchains(self.config)
        blockchain.init_headers_file_for_best_chain()
        self.logger.info(f"blockchains {list(map(lambda b: b.forkpoint, blockchain.blockchains.values()))}")
        self._blockchain_preferred_block = self.config.get('blockchain_preferred_block', None)  # type: Dict[str, Any]
        if self._blockchain_preferred_block is None:
            self._set_preferred_chain(None)
        self._blockchain = blockchain.get_best_chain()

        self._allowed_protocols = {PREFERRED_NETWORK_PROTOCOL}

        # Server for addresses and transactions
        self.default_server = self.config.get('server', None)
        # Sanitize default server
        if self.default_server:
            try:
                self.default_server = ServerAddr.from_str(self.default_server)
            except:
                self.logger.warning('failed to parse server-string; falling back to localhost:1:s.')
                self.default_server = ServerAddr.from_str("localhost:1:s")
        else:
            self.default_server = pick_random_server(allowed_protocols=self._allowed_protocols)
        assert isinstance(self.default_server, ServerAddr), f"invalid type for default_server: {self.default_server!r}"

        self.taskgroup = None

        # locks
        self.restart_lock = asyncio.Lock()
        self.bhi_lock = asyncio.Lock()
        self.recent_servers_lock = threading.RLock()  # <- re-entrant
        self.interfaces_lock = threading.Lock()  # for mutating/iterating self.interfaces

        self.server_peers = {}  # returned by interface (servers that the main interface knows about)
        self._recent_servers = self._read_recent_servers()  # note: needs self.recent_servers_lock

        self.banner = ''
        self.donation_address = ''
        self.relay_fee = None  # type: Optional[int]

        dir_path = os.path.join(self.config.path, 'certs')
        util.make_dir(dir_path)

        # the main server we are currently communicating with
        self.interface = None
        self.default_server_changed_event = asyncio.Event()
        # Set of servers we have an ongoing connection with.
        # For any ServerAddr, at most one corresponding Interface object
        # can exist at any given time. Depending on the state of that Interface,
        # the ServerAddr can be found in one of the following sets.
        # Note: during a transition, the ServerAddr can appear in two sets momentarily.
        self._connecting_ifaces = set()
        self.interfaces = {}  # these are the ifaces in "initialised and usable" state
        self._closing_ifaces = set()

        self.auto_connect = self.config.get('auto_connect', True)
        self.proxy = None
        self._maybe_set_oneserver()

        # Dump network messages (all interfaces). Set at runtime from the console.
        self.debug = False

        self._set_status('disconnected')
        self._has_ever_managed_to_connect_to_server = False

        # lightning network
        self.channel_blacklist = ChannelBlackList()
        if self.config.get('run_local_watchtower', False):
            from . import lnwatcher
            self.local_watchtower = lnwatcher.WatchTower(self)
            self.local_watchtower.start_network(self)
            asyncio.ensure_future(self.local_watchtower.start_watching())
359
    def has_internet_connection(self) -> bool:
        """Our guess whether the device has Internet-connectivity."""
        # set once any server connection succeeds (see _run_new_interface)
        return self._has_ever_managed_to_connect_to_server
363
    def has_channel_db(self):
        # True once start_gossip() has initialized the lightning channel DB
        return self.channel_db is not None
366
    def start_gossip(self):
        """Start lightning gossip: channel DB, path finder and LNGossip worker.

        No-op unless config 'use_gossip' is set; no-op if already running.
        """
        from . import lnrouter
        from . import channel_db
        from . import lnworker
        if not self.config.get('use_gossip'):
            return
        if self.lngossip is None:
            self.channel_db = channel_db.ChannelDB(self)
            self.path_finder = lnrouter.LNPathFinder(self.channel_db)
            self.channel_db.load_data()
            self.lngossip = lnworker.LNGossip()
            self.lngossip.start_network(self)
379
    async def stop_gossip(self, *, full_shutdown: bool = False):
        """Stop lightning gossip. With *full_shutdown*, also wait for the
        channel DB to fully stop and drop the reference."""
        if self.lngossip:
            await self.lngossip.stop()
            self.lngossip = None
            self.channel_db.stop()
            if full_shutdown:
                await self.channel_db.stopped_event.wait()
                self.channel_db = None
388
    def run_from_another_thread(self, coro, *, timeout=None):
        """Schedule *coro* on the network event loop from a foreign thread and
        block until it completes (or *timeout* seconds elapse)."""
        assert self._loop_thread != threading.current_thread(), 'must not be called from network thread'
        fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop)
        return fut.result(timeout)
393
    @staticmethod
    def get_instance() -> Optional["Network"]:
        # the singleton; None if no Network has been constructed yet
        return _INSTANCE
397
398 def with_recent_servers_lock(func):
399 def func_wrapper(self, *args, **kwargs):
400 with self.recent_servers_lock:
401 return func(self, *args, **kwargs)
402 return func_wrapper
403
404 def _read_recent_servers(self) -> List[ServerAddr]:
405 if not self.config.path:
406 return []
407 path = os.path.join(self.config.path, "recent_servers")
408 try:
409 with open(path, "r", encoding='utf-8') as f:
410 data = f.read()
411 servers_list = json.loads(data)
412 return [ServerAddr.from_str(s) for s in servers_list]
413 except:
414 return []
415
416 @with_recent_servers_lock
417 def _save_recent_servers(self):
418 if not self.config.path:
419 return
420 path = os.path.join(self.config.path, "recent_servers")
421 s = json.dumps(self._recent_servers, indent=4, sort_keys=True, cls=MyEncoder)
422 try:
423 with open(path, "w", encoding='utf-8') as f:
424 f.write(s)
425 except:
426 pass
427
428 async def _server_is_lagging(self) -> bool:
429 sh = self.get_server_height()
430 if not sh:
431 self.logger.info('no height for main interface')
432 return True
433 lh = self.get_local_height()
434 result = (lh - sh) > 1
435 if result:
436 self.logger.info(f'{self.default_server} is lagging ({sh} vs {lh})')
437 return result
438
    def _set_status(self, status):
        # remember the new connection status and fan it out to callbacks
        self.connection_status = status
        self.notify('status')
442
443 def is_connected(self):
444 interface = self.interface
445 return interface is not None and interface.ready.done()
446
    def is_connecting(self):
        # 'connecting' is set while the default server's interface starts up
        return self.connection_status == 'connecting'
449
    async def _request_server_info(self, interface: 'Interface'):
        """Once *interface* is ready, concurrently fetch server metadata:
        banner, donation address, peer list, relay fee and fee estimates."""
        await interface.ready
        # TODO: libbitcoin: session = interface.session

        async def get_banner():
            self.banner = await interface.get_server_banner()
            self.notify('banner')
        async def get_donation_address():
            self.donation_address = await interface.get_donation_address()
        async def get_server_peers():
            # ORIG: server_peers = await session.send_request('server.peers.subscribe')
            # TODO: libbitcoin
            server_peers = []
            random.shuffle(server_peers)
            # cap how many peers we accept from a single (untrusted) server
            max_accepted_peers = len(constants.net.DEFAULT_SERVERS) + NUM_RECENT_SERVERS
            server_peers = server_peers[:max_accepted_peers]
            # note that 'parse_servers' also validates the data (which is untrusted input!)
            self.server_peers = parse_servers(server_peers)
            self.notify('servers')
        async def get_relay_fee():
            self.relay_fee = await interface.get_relay_fee()

        async with TaskGroup() as group:
            await group.spawn(get_banner)
            await group.spawn(get_donation_address)
            await group.spawn(get_server_peers)
            await group.spawn(get_relay_fee)
            await group.spawn(self._request_fee_estimates(interface))
478
    async def _request_fee_estimates(self, interface):
        """Fetch the mempool fee histogram from *interface* and publish it."""
        self.config.requested_fee_estimates()
        histogram = await interface.get_fee_histogram()
        self.config.mempool_fees = histogram
        self.logger.info(f'fee_histogram {histogram}')
        self.notify('fee_histogram')
485
486 def get_status_value(self, key):
487 if key == 'status':
488 value = self.connection_status
489 elif key == 'banner':
490 value = self.banner
491 elif key == 'fee':
492 value = self.config.fee_estimates
493 elif key == 'fee_histogram':
494 value = self.config.mempool_fees
495 elif key == 'servers':
496 value = self.get_servers()
497 else:
498 raise Exception('unexpected trigger key {}'.format(key))
499 return value
500
    def notify(self, key):
        """Trigger callbacks for *key*; most keys also carry their current value."""
        if key in ['status', 'updated']:
            util.trigger_callback(key)
        else:
            util.trigger_callback(key, self.get_status_value(key))
506
    def get_parameters(self) -> NetworkParameters:
        """Snapshot the current user-facing network settings."""
        return NetworkParameters(server=self.default_server,
                                 proxy=self.proxy,
                                 auto_connect=self.auto_connect,
                                 oneserver=self.oneserver)
512
    def get_donation_address(self):
        # only meaningful while connected; implicitly returns None otherwise
        if self.is_connected():
            return self.donation_address
516
    def get_interfaces(self) -> List[ServerAddr]:
        """The list of servers for the connected interfaces."""
        # snapshot under the lock; callers may mutate the returned list freely
        with self.interfaces_lock:
            return list(self.interfaces)
521
    def get_fee_estimates(self):
        """Return a {num_blocks: fee} mapping of fee estimates.

        With auto_connect, take the median across all connected interfaces
        (harder for one server to manipulate); otherwise trust only the main
        interface.
        """
        from statistics import median
        from .simple_config import FEE_ETA_TARGETS
        if self.auto_connect:
            with self.interfaces_lock:
                out = {}
                for n in FEE_ETA_TARGETS:
                    try:
                        out[n] = int(median(filter(None, [i.fee_estimates_eta.get(n) for i in self.interfaces.values()])))
                    except:
                        # no data for this target (e.g. median of empty) -- skip it
                        continue
                return out
        else:
            if not self.interface:
                return {}
            return self.interface.fee_estimates_eta
538
    def update_fee_estimates(self):
        """Push current fee estimates into config; log only when they changed."""
        e = self.get_fee_estimates()
        for nblock_target, fee in e.items():
            self.config.update_fee_estimates(nblock_target, fee)
        if not hasattr(self, "_prev_fee_est") or self._prev_fee_est != e:
            self._prev_fee_est = copy.copy(e)
            self.logger.info(f'fee_estimates {e}')
        self.notify('fee')
547
    @with_recent_servers_lock
    def get_servers(self):
        """Return a hostmap of all known servers (hardcoded + recent + peers)."""
        # note: order of sources when adding servers here is crucial!
        # don't let "server_peers" overwrite anything,
        # otherwise main server can eclipse the client
        out = dict()
        # add servers received from main interface
        server_peers = self.server_peers
        if server_peers:
            out.update(filter_version(server_peers.copy()))
        # hardcoded servers
        out.update(constants.net.DEFAULT_SERVERS)
        # add recent servers
        for server in self._recent_servers:
            port = str(server.port)
            if server.host in out:
                out[server.host].update({server.protocol: port})
            else:
                out[server.host] = {server.protocol: port}
        # potentially filter out some
        if self.config.get('noonion'):
            out = filter_noonion(out)
        return out
571
    def _get_next_server_to_try(self) -> Optional[ServerAddr]:
        """Pick the next server to attempt connecting to, or None.

        Prefers recent ("sticky") servers up to NUM_STICKY_SERVERS, then a
        random known server that is not connected and whose retry timer allows
        another attempt.
        """
        now = time.time()
        with self.interfaces_lock:
            connected_servers = set(self.interfaces) | self._connecting_ifaces | self._closing_ifaces
        # First try from recent servers. (which are persisted)
        # As these are servers we successfully connected to recently, they are
        # most likely to work. This also makes servers "sticky".
        # Note: with sticky servers, it is more difficult for an attacker to eclipse the client,
        # however if they succeed, the eclipsing would persist. To try to balance this,
        # we only give priority to recent_servers up to NUM_STICKY_SERVERS.
        with self.recent_servers_lock:
            recent_servers = list(self._recent_servers)
        recent_servers = [s for s in recent_servers if s.protocol in self._allowed_protocols]
        if len(connected_servers & set(recent_servers)) < NUM_STICKY_SERVERS:
            for server in recent_servers:
                if server in connected_servers:
                    continue
                if not self._can_retry_addr(server, now=now):
                    continue
                return server
        # try all servers we know about, pick one at random
        hostmap = self.get_servers()
        servers = list(set(filter_protocol(hostmap, allowed_protocols=self._allowed_protocols)) - connected_servers)
        random.shuffle(servers)
        for server in servers:
            if not self._can_retry_addr(server, now=now):
                continue
            return server
        return None
601
    def _set_proxy(self, proxy: Optional[dict]):
        """Apply *proxy*, reconfigure DNS accordingly and notify listeners."""
        self.proxy = proxy
        dns_hacks.configure_dns_depending_on_proxy(bool(proxy))
        self.logger.info(f'setting proxy {proxy}')
        util.trigger_callback('proxy_set', self.proxy)
607
    @log_exceptions
    async def set_parameters(self, net_params: NetworkParameters):
        """Apply new network settings: persist them to config and restart or
        switch interfaces as needed. Silently refuses invalid proxy params."""
        proxy = net_params.proxy
        proxy_str = serialize_proxy(proxy)
        server = net_params.server
        # sanitize parameters
        try:
            if proxy:
                proxy_modes.index(proxy['mode']) + 1
                int(proxy['port'])
        except:
            # invalid proxy dict -- refuse to apply any of the parameters
            return
        self.config.set_key('auto_connect', net_params.auto_connect, False)
        self.config.set_key('oneserver', net_params.oneserver, False)
        self.config.set_key('proxy', proxy_str, False)
        self.config.set_key('server', str(server), True)
        # abort if changes were not allowed by config
        if self.config.get('server') != str(server) \
                or self.config.get('proxy') != proxy_str \
                or self.config.get('oneserver') != net_params.oneserver:
            return

        async with self.restart_lock:
            self.auto_connect = net_params.auto_connect
            if self.proxy != proxy or self.oneserver != net_params.oneserver:
                # Restart the network defaulting to the given server
                await self.stop(full_shutdown=False)
                self.default_server = server
                await self._start()
            elif self.default_server != server:
                await self.switch_to_interface(server)
            else:
                await self.switch_lagging_interface()
641
    def _maybe_set_oneserver(self) -> None:
        """Refresh self.oneserver / self.num_server from config.

        In oneserver mode we keep no additional connections (num_server == 0).
        """
        oneserver = bool(self.config.get('oneserver', False))
        self.oneserver = oneserver
        self.num_server = NUM_TARGET_CONNECTED_SERVERS if not oneserver else 0
646
    async def _switch_to_random_interface(self):
        '''Switch to a random connected server other than the current one'''
        servers = self.get_interfaces()  # Those in connected state
        if self.default_server in servers:
            servers.remove(self.default_server)
        if servers:
            await self.switch_to_interface(random.choice(servers))
654
    async def switch_lagging_interface(self):
        """If auto_connect and lagging, switch interface (only within fork)."""
        if self.auto_connect and await self._server_is_lagging():
            # switch to one that has the correct header (not height)
            best_header = self.blockchain().header_at_tip()
            # snapshot interfaces under the lock, then filter outside it
            with self.interfaces_lock: interfaces = list(self.interfaces.values())
            filtered = list(filter(lambda iface: iface.tip_header == best_header, interfaces))
            if filtered:
                chosen_iface = random.choice(filtered)
                await self.switch_to_interface(chosen_iface.server)
665
    async def switch_unwanted_fork_interface(self) -> None:
        """If auto_connect, maybe switch to another fork/chain."""
        if not self.auto_connect or not self.interface:
            return
        with self.interfaces_lock: interfaces = list(self.interfaces.values())
        pref_height = self._blockchain_preferred_block['height']
        pref_hash = self._blockchain_preferred_block['hash']
        # shortcut for common case
        if pref_height == 0:
            return
        # maybe try switching chains; starting with most desirable first
        matching_chains = blockchain.get_chains_that_contain_header(pref_height, pref_hash)
        # preferred fork(s) first; the overall best chain as fallback
        chains_to_try = list(matching_chains) + [blockchain.get_best_chain()]
        for rank, chain in enumerate(chains_to_try):
            # check if main interface is already on this fork
            if self.interface.blockchain == chain:
                return
            # switch to another random interface that is on this fork, if any
            filtered = [iface for iface in interfaces
                        if iface.blockchain == chain]
            if filtered:
                self.logger.info(f"switching to (more) preferred fork (rank {rank})")
                chosen_iface = random.choice(filtered)
                await self.switch_to_interface(chosen_iface.server)
                return
        self.logger.info("tried to switch to (more) preferred fork but no interfaces are on any")
692
    async def switch_to_interface(self, server: ServerAddr):
        """Switch to server as our main interface. If no connection exists,
        queue interface to be started. The actual switch will
        happen when the interface becomes ready.
        """
        self.default_server = server
        old_interface = self.interface
        old_server = old_interface.server if old_interface else None

        # Stop any current interface in order to terminate subscriptions,
        # and to cancel tasks in interface.taskgroup.
        if old_server and old_server != server:
            # don't wait for old_interface to close as that might be slow:
            await self.taskgroup.spawn(self._close_interface(old_interface))

        if server not in self.interfaces:
            self.interface = None
            await self.taskgroup.spawn(self._run_new_interface(server))
            return

        i = self.interfaces[server]
        if old_interface != i:
            self.logger.info(f"switching to {server}")
            assert i.ready.done(), "interface we are switching to is not ready yet"
            blockchain_updated = i.blockchain != self.blockchain()
            self.interface = i
            await i.taskgroup.spawn(self._request_server_info(i))
            util.trigger_callback('default_server_changed')
            # set-then-clear wakes any tasks waiting on the event
            self.default_server_changed_event.set()
            self.default_server_changed_event.clear()
            self._set_status('connected')
            util.trigger_callback('network_updated')
            if blockchain_updated:
                util.trigger_callback('blockchain_updated')
727
    async def _close_interface(self, interface: Optional[Interface]):
        """Tear down *interface*: deregister it (also as main interface, if it
        is) and await its shutdown. No-op if already being closed."""
        if not interface:
            return
        if interface.server in self._closing_ifaces:
            return  # another task is already closing it
        self._closing_ifaces.add(interface.server)
        with self.interfaces_lock:
            if self.interfaces.get(interface.server) == interface:
                self.interfaces.pop(interface.server)
        if interface == self.interface:
            self.interface = None
        try:
            # this can take some time if server/connection is slow:
            await interface.close()
            await interface.got_disconnected.wait()
        finally:
            self._closing_ifaces.discard(interface.server)
745
    @with_recent_servers_lock
    def _add_recent_server(self, server: ServerAddr) -> None:
        """Record a successful connection: move *server* to the front of the
        bounded recent-servers list and persist it."""
        self._on_connection_successfully_established(server)
        # list is ordered
        if server in self._recent_servers:
            self._recent_servers.remove(server)
        self._recent_servers.insert(0, server)
        self._recent_servers = self._recent_servers[:NUM_RECENT_SERVERS]
        self._save_recent_servers()
755
    async def connection_down(self, interface: Interface):
        '''A connection to server either went down, or was never made.
        We distinguish by whether it is in self.interfaces.'''
        if not interface: return
        if interface.server == self.default_server:
            # losing the main server means we are disconnected
            self._set_status('disconnected')
        await self._close_interface(interface)
        util.trigger_callback('network_updated')
764
    def get_network_timeout_seconds(self, request_type=NetworkTimeout.Generic) -> int:
        """Pick a timeout for *request_type*: most relaxed for a user-pinned
        single server, relaxed behind a proxy, normal otherwise."""
        if self.oneserver and not self.auto_connect:
            return request_type.MOST_RELAXED
        if self.proxy:
            return request_type.RELAXED
        return request_type.NORMAL
771
    @ignore_exceptions  # do not kill outer taskgroup
    @log_exceptions
    async def _run_new_interface(self, server: ServerAddr):
        """Open a new Interface to *server*, register it once ready, and make
        it the main interface if *server* is the default server."""
        if (server in self.interfaces
                or server in self._connecting_ifaces
                or server in self._closing_ifaces):
            return  # already connected / connecting / closing
        self._connecting_ifaces.add(server)
        if server == self.default_server:
            self.logger.info(f"connecting to {server} as new interface")
            self._set_status('connecting')
        self._trying_addr_now(server)

        interface = Interface(network=self, server=server, proxy=self.proxy)
        # note: using longer timeouts here as DNS can sometimes be slow!
        timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
        try:
            await asyncio.wait_for(interface.ready, timeout)
        except BaseException as e:
            self.logger.info(f"couldn't launch iface {server} -- {repr(e)}")
            await interface.close()
            return
        else:
            with self.interfaces_lock:
                assert server not in self.interfaces
                self.interfaces[server] = interface
        finally:
            self._connecting_ifaces.discard(server)

        if server == self.default_server:
            await self.switch_to_interface(server)

        self._has_ever_managed_to_connect_to_server = True
        self._add_recent_server(server)
        util.trigger_callback('network_updated')
807
    def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check: Interface) -> bool:
        """Return False if keeping *iface_to_check* would concentrate our
        connections too much (duplicate IP bucket, or too many onion servers)."""
        # main interface is exempt. this makes switching servers easier
        if iface_to_check.is_main_server():
            return True
        if not iface_to_check.bucket_based_on_ipaddress():
            return True
        # bucket connected interfaces
        with self.interfaces_lock:
            interfaces = list(self.interfaces.values())
        if iface_to_check in interfaces:
            interfaces.remove(iface_to_check)
        buckets = defaultdict(list)
        for iface in interfaces:
            buckets[iface.bucket_based_on_ipaddress()].append(iface)
        # check proposed server against buckets
        onion_servers = buckets[BUCKET_NAME_OF_ONION_SERVERS]
        if iface_to_check.is_tor():
            # keep number of onion servers below half of all connected servers
            if len(onion_servers) > NUM_TARGET_CONNECTED_SERVERS // 2:
                return False
        else:
            bucket = iface_to_check.bucket_based_on_ipaddress()
            if len(buckets[bucket]) > 0:
                return False
        return True
833
    def best_effort_reliable(func):
        """Decorator (class-body level): retry *func* up to 10 times across
        interface disconnects/timeouts; raise BestEffortRequestFailed if all
        attempts fail."""
        @functools.wraps(func)
        async def make_reliable_wrapper(self: 'Network', *args, **kwargs):
            for i in range(10):
                iface = self.interface
                # retry until there is a main interface
                if not iface:
                    async with ignore_after(1):
                        await self.default_server_changed_event.wait()
                    continue  # try again
                assert iface.ready.done(), "interface not ready yet"
                # try actual request
                try:
                    # race the request against the interface disconnecting
                    async with TaskGroup(wait=any) as group:
                        task = await group.spawn(func(self, *args, **kwargs))
                        await group.spawn(iface.got_disconnected.wait())
                except RequestTimedOut:
                    await iface.close()
                    await iface.got_disconnected.wait()
                    continue  # try again
                except RequestCorrupted as e:
                    # TODO ban server?
                    iface.logger.exception(f"RequestCorrupted: {e}")
                    await iface.close()
                    await iface.got_disconnected.wait()
                    continue  # try again
                if task.done() and not task.cancelled():
                    return task.result()
                # otherwise; try again
            raise BestEffortRequestFailed('no interface to do request on... gave up.')
        return make_reliable_wrapper
865
    def catch_server_exceptions(func):
        """Decorator (class-body level): wrap server-reported JSON-RPC errors
        so their untrusted messages are not shown to the user directly."""
        @functools.wraps(func)
        async def wrapper(self, *args, **kwargs):
            try:
                return await func(self, *args, **kwargs)
            except aiorpcx.jsonrpc.CodeMessageError as e:
                raise UntrustedServerReturnedError(original_exception=e) from e
        return wrapper
874
    @best_effort_reliable
    @catch_server_exceptions
    async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict:
        """Fetch the merkle proof for *tx_hash* at *tx_height* from the main server."""
        return await self.interface.get_merkle_for_transaction(tx_hash=tx_hash, tx_height=tx_height)
879
880 @best_effort_reliable
881 async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
882 if timeout is None:
883 timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
884 # TODO: libbitcoin
885 _ec = await self.interface.broadcast_transaction(tx.serialize(), timeout=timeout)
886 if _ec != 0:
887 raise TxBroadcastServerReturnedError(f"not validated, error: {_ec!r}")
888
889 async def try_broadcasting(self, tx, name):
890 try:
891 await self.broadcast_transaction(tx)
892 except Exception as e:
893 self.logger.info(f'error: could not broadcast {name} {tx.txid()}, {str(e)}')
894 else:
895 self.logger.info(f'success: broadcasting {name} {tx.txid()}')
896
    @staticmethod
    def sanitize_tx_broadcast_response(server_msg) -> str:
        """Turn an untrusted server broadcast-error message into a safe string.

        Each table below maps a known bitcoind error substring to either a
        translated user-facing message, or None — meaning the matched
        substring itself is safe to show. Tables are checked in order:
        policy, script, validation, raw-transaction RPC, consensus tx-verify.
        If nothing matches, a generic "Unknown error" is returned.
        """
        # Unfortunately, bitcoind and hence the Electrum protocol doesn't return a useful error code.
        # So, we use substring matching to grok the error message.
        # server_msg is untrusted input so it should not be shown to the user. see #4968
        server_msg = str(server_msg)
        # flatten newlines so multi-line server messages can't break log/UI output
        server_msg = server_msg.replace("\n", r"\n")
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/policy/policy.cpp
        # grep "reason ="
        policy_error_messages = {
            r"version": _("Transaction uses non-standard version."),
            r"tx-size": _("The transaction was rejected because it is too large (in bytes)."),
            r"scriptsig-size": None,
            r"scriptsig-not-pushonly": None,
            r"scriptpubkey":
                ("scriptpubkey\n" +
                 _("Some of the outputs pay to a non-standard script.")),
            r"bare-multisig": None,
            r"dust":
                (_("Transaction could not be broadcast due to dust outputs.\n"
                   "Some of the outputs are too small in value, probably lower than 1000 satoshis.\n"
                   "Check the units, make sure you haven't confused e.g. mBTC and BTC.")),
            r"multi-op-return": _("The transaction was rejected because it contains multiple OP_RETURN outputs."),
        }
        for substring in policy_error_messages:
            if substring in server_msg:
                msg = policy_error_messages[substring]
                # None means: the matched substring itself is the safe message
                return msg if msg else substring
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/script/script_error.cpp
        script_error_messages = {
            r"Script evaluated without error but finished with a false/empty top stack element",
            r"Script failed an OP_VERIFY operation",
            r"Script failed an OP_EQUALVERIFY operation",
            r"Script failed an OP_CHECKMULTISIGVERIFY operation",
            r"Script failed an OP_CHECKSIGVERIFY operation",
            r"Script failed an OP_NUMEQUALVERIFY operation",
            r"Script is too big",
            r"Push value size limit exceeded",
            r"Operation limit exceeded",
            r"Stack size limit exceeded",
            r"Signature count negative or greater than pubkey count",
            r"Pubkey count negative or limit exceeded",
            r"Opcode missing or not understood",
            r"Attempted to use a disabled opcode",
            r"Operation not valid with the current stack size",
            r"Operation not valid with the current altstack size",
            r"OP_RETURN was encountered",
            r"Invalid OP_IF construction",
            r"Negative locktime",
            r"Locktime requirement not satisfied",
            r"Signature hash type missing or not understood",
            r"Non-canonical DER signature",
            r"Data push larger than necessary",
            r"Only push operators allowed in signatures",
            r"Non-canonical signature: S value is unnecessarily high",
            r"Dummy CHECKMULTISIG argument must be zero",
            r"OP_IF/NOTIF argument must be minimal",
            r"Signature must be zero for failed CHECK(MULTI)SIG operation",
            r"NOPx reserved for soft-fork upgrades",
            r"Witness version reserved for soft-fork upgrades",
            r"Taproot version reserved for soft-fork upgrades",
            r"OP_SUCCESSx reserved for soft-fork upgrades",
            r"Public key version reserved for soft-fork upgrades",
            r"Public key is neither compressed or uncompressed",
            r"Stack size must be exactly one after execution",
            r"Extra items left on stack after execution",
            r"Witness program has incorrect length",
            r"Witness program was passed an empty witness",
            r"Witness program hash mismatch",
            r"Witness requires empty scriptSig",
            r"Witness requires only-redeemscript scriptSig",
            r"Witness provided for non-witness script",
            r"Using non-compressed keys in segwit",
            r"Invalid Schnorr signature size",
            r"Invalid Schnorr signature hash type",
            r"Invalid Schnorr signature",
            r"Invalid Taproot control block size",
            r"Too much signature validation relative to witness weight",
            r"OP_CHECKMULTISIG(VERIFY) is not available in tapscript",
            r"OP_IF/NOTIF argument must be minimal in tapscript",
            r"Using OP_CODESEPARATOR in non-witness script",
            r"Signature is found in scriptCode",
        }
        for substring in script_error_messages:
            # script errors are bitcoind's own phrasing and safe to show verbatim
            if substring in server_msg:
                return substring
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/validation.cpp
        # grep "REJECT_"
        # grep "TxValidationResult"
        # should come after script_error.cpp (due to e.g. non-mandatory-script-verify-flag)
        validation_error_messages = {
            r"coinbase": None,
            r"tx-size-small": None,
            r"non-final": None,
            r"txn-already-in-mempool": None,
            r"txn-mempool-conflict": None,
            r"txn-already-known": None,
            r"non-BIP68-final": None,
            r"bad-txns-nonstandard-inputs": None,
            r"bad-witness-nonstandard": None,
            r"bad-txns-too-many-sigops": None,
            r"mempool min fee not met":
                ("mempool min fee not met\n" +
                 _("Your transaction is paying a fee that is so low that the bitcoin node cannot "
                   "fit it into its mempool. The mempool is already full of hundreds of megabytes "
                   "of transactions that all pay higher fees. Try to increase the fee.")),
            r"min relay fee not met": None,
            r"absurdly-high-fee": None,
            r"max-fee-exceeded": None,
            r"too-long-mempool-chain": None,
            r"bad-txns-spends-conflicting-tx": None,
            r"insufficient fee": ("insufficient fee\n" +
                 _("Your transaction is trying to replace another one in the mempool but it "
                   "does not meet the rules to do so. Try to increase the fee.")),
            r"too many potential replacements": None,
            r"replacement-adds-unconfirmed": None,
            r"mempool full": None,
            r"non-mandatory-script-verify-flag": None,
            r"mandatory-script-verify-flag-failed": None,
            r"Transaction check failed": None,
        }
        for substring in validation_error_messages:
            if substring in server_msg:
                msg = validation_error_messages[substring]
                return msg if msg else substring
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/rpc/rawtransaction.cpp
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/util/error.cpp
        # grep "RPC_TRANSACTION"
        # grep "RPC_DESERIALIZATION_ERROR"
        rawtransaction_error_messages = {
            r"Missing inputs": None,
            r"Inputs missing or spent": None,
            r"transaction already in block chain": None,
            r"Transaction already in block chain": None,
            r"TX decode failed": None,
            r"Peer-to-peer functionality missing or disabled": None,
            r"Transaction rejected by AcceptToMemoryPool": None,
            r"AcceptToMemoryPool failed": None,
            r"Fee exceeds maximum configured by user": None,
        }
        for substring in rawtransaction_error_messages:
            if substring in server_msg:
                msg = rawtransaction_error_messages[substring]
                return msg if msg else substring
        # https://github.com/bitcoin/bitcoin/blob/5bb64acd9d3ced6e6f95df282a1a0f8b98522cb0/src/consensus/tx_verify.cpp
        # https://github.com/bitcoin/bitcoin/blob/c7ad94428ab6f54661d7a5441e1fdd0ebf034903/src/consensus/tx_check.cpp
        # grep "REJECT_"
        # grep "TxValidationResult"
        tx_verify_error_messages = {
            r"bad-txns-vin-empty": None,
            r"bad-txns-vout-empty": None,
            r"bad-txns-oversize": None,
            r"bad-txns-vout-negative": None,
            r"bad-txns-vout-toolarge": None,
            r"bad-txns-txouttotal-toolarge": None,
            r"bad-txns-inputs-duplicate": None,
            r"bad-cb-length": None,
            r"bad-txns-prevout-null": None,
            r"bad-txns-inputs-missingorspent":
                ("bad-txns-inputs-missingorspent\n" +
                 _("You might have a local transaction in your wallet that this transaction "
                   "builds on top. You need to either broadcast or remove the local tx.")),
            r"bad-txns-premature-spend-of-coinbase": None,
            r"bad-txns-inputvalues-outofrange": None,
            r"bad-txns-in-belowout": None,
            r"bad-txns-fee-outofrange": None,
        }
        for substring in tx_verify_error_messages:
            if substring in server_msg:
                msg = tx_verify_error_messages[substring]
                return msg if msg else substring
        # otherwise:
        return _("Unknown error")
1070
1071 @best_effort_reliable
1072 @catch_server_exceptions
1073 async def request_chunk(self, height: int, tip=None, *, can_return_early=False):
1074 return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early)
1075
1076 @best_effort_reliable
1077 @catch_server_exceptions
1078 async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
1079 return await self.interface.get_transaction(tx_hash=tx_hash, timeout=timeout)
1080
1081 @best_effort_reliable
1082 @catch_server_exceptions
1083 async def get_history_for_scripthash(self, sh: str) -> List[dict]:
1084 return await self.interface.get_history_for_scripthash(sh)
1085
1086 @best_effort_reliable
1087 @catch_server_exceptions
1088 async def listunspent_for_scripthash(self, sh: str) -> List[dict]:
1089 return await self.interface.listunspent_for_scripthash(sh)
1090
1091 @best_effort_reliable
1092 @catch_server_exceptions
1093 async def get_balance_for_scripthash(self, sh: str) -> dict:
1094 return await self.interface.get_balance_for_scripthash(sh)
1095
1096 @best_effort_reliable
1097 @catch_server_exceptions
1098 async def get_txid_from_txpos(self, tx_height, tx_pos, merkle):
1099 return await self.interface.get_txid_from_txpos(tx_height, tx_pos, merkle)
1100
1101 def blockchain(self) -> Blockchain:
1102 interface = self.interface
1103 if interface and interface.blockchain is not None:
1104 self._blockchain = interface.blockchain
1105 return self._blockchain
1106
1107 def get_blockchains(self):
1108 out = {} # blockchain_id -> list(interfaces)
1109 with blockchain.blockchains_lock: blockchain_items = list(blockchain.blockchains.items())
1110 with self.interfaces_lock: interfaces_values = list(self.interfaces.values())
1111 for chain_id, bc in blockchain_items:
1112 r = list(filter(lambda i: i.blockchain==bc, interfaces_values))
1113 if r:
1114 out[chain_id] = r
1115 return out
1116
1117 def _set_preferred_chain(self, chain: Optional[Blockchain]):
1118 if chain:
1119 height = chain.get_max_forkpoint()
1120 header_hash = chain.get_hash(height)
1121 else:
1122 height = 0
1123 header_hash = constants.net.GENESIS
1124 self._blockchain_preferred_block = {
1125 'height': height,
1126 'hash': header_hash,
1127 }
1128 self.config.set_key('blockchain_preferred_block', self._blockchain_preferred_block)
1129
1130 async def follow_chain_given_id(self, chain_id: str) -> None:
1131 bc = blockchain.blockchains.get(chain_id)
1132 if not bc:
1133 raise Exception('blockchain {} not found'.format(chain_id))
1134 self._set_preferred_chain(bc)
1135 # select server on this chain
1136 with self.interfaces_lock: interfaces = list(self.interfaces.values())
1137 interfaces_on_selected_chain = list(filter(lambda iface: iface.blockchain == bc, interfaces))
1138 if len(interfaces_on_selected_chain) == 0: return
1139 chosen_iface = random.choice(interfaces_on_selected_chain) # type: Interface
1140 # switch to server (and save to config)
1141 net_params = self.get_parameters()
1142 net_params = net_params._replace(server=chosen_iface.server)
1143 await self.set_parameters(net_params)
1144
1145 async def follow_chain_given_server(self, server: ServerAddr) -> None:
1146 # note that server_str should correspond to a connected interface
1147 iface = self.interfaces.get(server)
1148 if iface is None:
1149 return
1150 self._set_preferred_chain(iface.blockchain)
1151 # switch to server (and save to config)
1152 net_params = self.get_parameters()
1153 net_params = net_params._replace(server=server)
1154 await self.set_parameters(net_params)
1155
1156 def get_server_height(self) -> int:
1157 """Length of header chain, as claimed by main interface."""
1158 interface = self.interface
1159 return interface.tip if interface else 0
1160
1161 def get_local_height(self):
1162 """Length of header chain, POW-verified.
1163 In case of a chain split, this is for the branch the main interface is on,
1164 but it is the tip of that branch (even if main interface is behind).
1165 """
1166 return self.blockchain().height()
1167
1168 def export_checkpoints(self, path):
1169 """Run manually to generate blockchain checkpoints.
1170 Kept for console use only.
1171 """
1172 cp = self.blockchain().get_checkpoints()
1173 with open(path, 'w', encoding='utf-8') as f:
1174 f.write(json.dumps(cp, indent=4))
1175
    async def _start(self):
        """Initialise network state and schedule the long-running taskgroup.

        Must only run when the network is fully stopped (the asserts below
        check that). Connects the default server first, then schedules
        main(), which runs the maintenance loop and any user jobs.
        """
        assert not self.taskgroup
        self.taskgroup = taskgroup = SilentTaskGroup()
        assert not self.interface and not self.interfaces
        assert not self._connecting_ifaces
        assert not self._closing_ifaces
        self.logger.info('starting network')
        self._clear_addr_retry_times()
        self._set_proxy(deserialize_proxy(self.config.get('proxy')))
        self._maybe_set_oneserver()
        # connect the default (main) server before anything else
        await self.taskgroup.spawn(self._run_new_interface(self.default_server))

        async def main():
            self.logger.info("starting taskgroup.")
            try:
                # note: if a task finishes with CancelledError, that
                # will NOT raise, and the group will keep the other tasks running
                async with taskgroup as group:
                    await group.spawn(self._maintain_sessions())
                    [await group.spawn(job) for job in self._jobs]
            except asyncio.CancelledError:
                raise
            except Exception as e:
                self.logger.exception("taskgroup died.")
            finally:
                self.logger.info("taskgroup stopped.")
        # schedule main() on the network's event loop (we may be on another thread)
        asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop)

        util.trigger_callback('network_updated')
1205
1206 def start(self, jobs: Iterable = None):
1207 """Schedule starting the network, along with the given job co-routines.
1208
1209 Note: the jobs will *restart* every time the network restarts, e.g. on proxy
1210 setting changes.
1211 """
1212 self._jobs = jobs or []
1213 asyncio.run_coroutine_threadsafe(self._start(), self.asyncio_loop)
1214
    @log_exceptions
    async def stop(self, *, full_shutdown: bool = True):
        """Stop the network: cancel the taskgroup and reset interface state.

        full_shutdown=False is used for internal restarts (e.g. proxy change):
        callbacks are re-triggered and gossip is kept running.
        """
        self.logger.info("stopping network")
        # timeout: if full_shutdown, it is up to the caller to time us out,
        # otherwise if e.g. restarting due to proxy changes, we time out fast
        async with (nullcontext() if full_shutdown else ignore_after(1)):
            async with TaskGroup() as group:
                await group.spawn(self.taskgroup.cancel_remaining())
                if full_shutdown:
                    await group.spawn(self.stop_gossip(full_shutdown=full_shutdown))
        # reset state so a subsequent _start() passes its asserts
        self.taskgroup = None
        self.interface = None
        self.interfaces = {}
        self._connecting_ifaces.clear()
        self._closing_ifaces.clear()
        if not full_shutdown:
            util.trigger_callback('network_updated')
1232
1233 async def _ensure_there_is_a_main_interface(self):
1234 if self.is_connected():
1235 return
1236 # if auto_connect is set, try a different server
1237 if self.auto_connect and not self.is_connecting():
1238 await self._switch_to_random_interface()
1239 # if auto_connect is not set, or still no main interface, retry current
1240 if not self.is_connected() and not self.is_connecting():
1241 if self._can_retry_addr(self.default_server, urgent=True):
1242 await self.switch_to_interface(self.default_server)
1243
    async def _maintain_sessions(self):
        """Maintenance loop: top up interfaces to self.num_server, enforce a
        healthy spread of connected servers, and keep a main interface alive.
        Runs forever (cancelled via the taskgroup on shutdown)."""
        async def maybe_start_new_interfaces():
            # interfaces still connecting/closing also count against the target
            num_existing_ifaces = len(self.interfaces) + len(self._connecting_ifaces) + len(self._closing_ifaces)
            for i in range(self.num_server - num_existing_ifaces):
                # FIXME this should try to honour "healthy spread of connected servers"
                server = self._get_next_server_to_try()
                if server:
                    await self.taskgroup.spawn(self._run_new_interface(server))
        async def maintain_healthy_spread_of_connected_servers():
            with self.interfaces_lock: interfaces = list(self.interfaces.values())
            # shuffle so no particular interface is systematically disconnected first
            random.shuffle(interfaces)
            for iface in interfaces:
                if not self.check_interface_against_healthy_spread_of_connected_servers(iface):
                    self.logger.info(f"disconnecting from {iface.server}. too many connected "
                                     f"servers already in bucket {iface.bucket_based_on_ipaddress()}")
                    await self._close_interface(iface)
        async def maintain_main_interface():
            await self._ensure_there_is_a_main_interface()
            if self.is_connected():
                if self.config.is_fee_estimates_update_required():
                    await self.interface.taskgroup.spawn(self._request_fee_estimates, self.interface)

        while True:
            try:
                await maybe_start_new_interfaces()
                await maintain_healthy_spread_of_connected_servers()
                await maintain_main_interface()
            except asyncio.CancelledError:
                # suppress spurious cancellations
                group = self.taskgroup
                if not group or group.closed():
                    raise
            await asyncio.sleep(0.1)
1277
1278 @classmethod
1279 async def _send_http_on_proxy(cls, method: str, url: str, params: str = None,
1280 body: bytes = None, json: dict = None, headers=None,
1281 on_finish=None, timeout=None):
1282 async def default_on_finish(resp: ClientResponse):
1283 resp.raise_for_status()
1284 return await resp.text()
1285 if headers is None:
1286 headers = {}
1287 if on_finish is None:
1288 on_finish = default_on_finish
1289 network = cls.get_instance()
1290 proxy = network.proxy if network else None
1291 async with make_aiohttp_session(proxy, timeout=timeout) as session:
1292 if method == 'get':
1293 async with session.get(url, params=params, headers=headers) as resp:
1294 return await on_finish(resp)
1295 elif method == 'post':
1296 assert body is not None or json is not None, 'body or json must be supplied if method is post'
1297 if body is not None:
1298 async with session.post(url, data=body, headers=headers) as resp:
1299 return await on_finish(resp)
1300 elif json is not None:
1301 async with session.post(url, json=json, headers=headers) as resp:
1302 return await on_finish(resp)
1303 else:
1304 assert False
1305
1306 @classmethod
1307 def send_http_on_proxy(cls, method, url, **kwargs):
1308 network = cls.get_instance()
1309 if network:
1310 assert network._loop_thread is not threading.currentThread()
1311 loop = network.asyncio_loop
1312 else:
1313 loop = asyncio.get_event_loop()
1314 coro = asyncio.run_coroutine_threadsafe(cls._send_http_on_proxy(method, url, **kwargs), loop)
1315 # note: _send_http_on_proxy has its own timeout, so no timeout here:
1316 return coro.result()
1317
1318 # methods used in scripts
1319 async def get_peers(self):
1320 while not self.is_connected():
1321 await asyncio.sleep(1)
1322 session = self.interface.session
1323 # TODO: libbitcoin
1324 return parse_servers(await session.send_request('server.peers.subscribe'))
1325
1326 async def send_multiple_requests(self, servers: Sequence[ServerAddr], method: str, params: Sequence):
1327 responses = dict()
1328 async def get_response(server: ServerAddr):
1329 interface = Interface(network=self, server=server, proxy=self.proxy)
1330 timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
1331 try:
1332 await asyncio.wait_for(interface.ready, timeout)
1333 except BaseException as e:
1334 await interface.close()
1335 return
1336 try:
1337 # TODO: libbitcoin XXX:
1338 res = await interface.session.send_request(method, params, timeout=10)
1339 except Exception as e:
1340 res = e
1341 responses[interface.server] = res
1342 async with TaskGroup() as group:
1343 for server in servers:
1344 await group.spawn(get_response(server))
1345 return responses