tutil.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
tutil.py (52263B)
---
1 # Electrum - lightweight Bitcoin client
2 # Copyright (C) 2011 Thomas Voegtlin
3 #
4 # Permission is hereby granted, free of charge, to any person
5 # obtaining a copy of this software and associated documentation files
6 # (the "Software"), to deal in the Software without restriction,
7 # including without limitation the rights to use, copy, modify, merge,
8 # publish, distribute, sublicense, and/or sell copies of the Software,
9 # and to permit persons to whom the Software is furnished to do so,
10 # subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be
13 # included in all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 # SOFTWARE.
23 import binascii
24 import os, sys, re, json
25 from collections import defaultdict, OrderedDict
26 from typing import (NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any,
27 Sequence, Dict, Generic, TypeVar, List, Iterable)
28 from datetime import datetime
29 import decimal
30 from decimal import Decimal
31 import traceback
32 import urllib
33 import threading
34 import hmac
35 import stat
36 from locale import localeconv
37 import asyncio
38 import urllib.request, urllib.parse, urllib.error
39 import builtins
40 import json
41 import time
42 from typing import NamedTuple, Optional
43 import ssl
44 import ipaddress
45 from ipaddress import IPv4Address, IPv6Address
46 import random
47 import secrets
48 import functools
49 from abc import abstractmethod, ABC
50
51 import attr
52 import aiohttp
53 from aiohttp_socks import ProxyConnector, ProxyType
54 import aiorpcx
55 from aiorpcx import TaskGroup
56 import certifi
57 import dns.resolver
58
59 from .i18n import _
60 from .logging import get_logger, Logger
61
62 if TYPE_CHECKING:
63 from .network import Network
64 from .interface import Interface
65 from .simple_config import SimpleConfig
66
67
68 _logger = get_logger(__name__)
69
70
def inv_dict(d):
    """Return a new dict that maps every value of ``d`` back to its key.

    When several keys share the same value, the key visited last wins.
    """
    inverted = {}
    for key, value in d.items():
        inverted[value] = key
    return inverted
73
74
# location of the CA bundle, as reported by certifi.where()
ca_path = certifi.where()
76
77
# base unit name -> decimal point (power of ten by which a satoshi amount is
# divided when it is displayed in that unit)
base_units = {'BTC':8, 'mBTC':5, 'bits':2, 'sat':0}
# reverse lookup: decimal point -> base unit name
base_units_inverse = inv_dict(base_units)
base_units_list = ['BTC', 'mBTC', 'bits', 'sat']  # list(dict) does not guarantee order

DECIMAL_POINT_DEFAULT = 5  # mBTC
83
84
# raised by the base-unit lookup helpers when the name / decimal point is unknown
class UnknownBaseUnit(Exception): pass
86
87
def decimal_point_to_base_unit_name(dp: int) -> str:
    """Translate a decimal point into its base unit name, e.g. 8 -> "BTC".

    Raises UnknownBaseUnit if no base unit uses that decimal point.
    """
    if dp not in base_units_inverse:
        raise UnknownBaseUnit(dp) from None
    return base_units_inverse[dp]
94
95
def base_unit_name_to_decimal_point(unit_name: str) -> int:
    """Translate a base unit name into its decimal point, e.g. "BTC" -> 8.

    Raises UnknownBaseUnit if the name is not a known base unit.
    """
    if unit_name not in base_units:
        raise UnknownBaseUnit(unit_name) from None
    return base_units[unit_name]
102
103
class NotEnoughFunds(Exception):
    """Exception whose string form is the translated "Insufficient funds" message."""
    def __str__(self):
        return _("Insufficient funds")
107
108
class NoDynamicFeeEstimates(Exception):
    """Exception whose string form is the translated
    'Dynamic fee estimates not available' message."""
    def __str__(self):
        return _('Dynamic fee estimates not available')
112
113
class MultipleSpendMaxTxOutputs(Exception):
    """Exception whose string form is the translated
    'At most one output can be set to spend max' message."""
    def __str__(self):
        return _('At most one output can be set to spend max')
117
118
class InvalidPassword(Exception):
    """Exception whose string form is the translated "Incorrect password" message."""
    def __str__(self):
        return _("Incorrect password")
122
123
class AddTransactionException(Exception):
    """Base class; UnrelatedTransactionException derives from it."""
    pass
126
127
class UnrelatedTransactionException(AddTransactionException):
    """AddTransactionException whose string form is the translated
    "Transaction is unrelated to this wallet." message."""
    def __str__(self):
        return _("Transaction is unrelated to this wallet.")
131
132
class FileImportFailed(Exception):
    """Import-from-file failure; ``message`` holds the stringified detail."""

    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        # translated headline, then the detail on its own line
        headline = _("Failed to import from file.")
        return "\n".join((headline, self.message))
139
140
class FileExportFailed(Exception):
    """Export-to-file failure; ``message`` holds the stringified detail."""

    def __init__(self, message=''):
        self.message = str(message)

    def __str__(self):
        # translated headline, then the detail on its own line
        headline = _("Failed to export to file.")
        return "\n".join((headline, self.message))
147
148
# plain Exception subclass; adds no behaviour of its own
class WalletFileException(Exception): pass
150
151
# plain Exception subclass; adds no behaviour of its own
class BitcoinException(Exception): pass
153
154
class UserFacingException(Exception):
    """Exception that contains information intended to be shown to the user."""
157
158
# a UserFacingException subclass; adds no behaviour of its own
class InvoiceError(UserFacingException): pass
160
161
# Throw this exception to unwind the stack like when an error occurs.
# However, unlike other exceptions, the user won't be informed.
class UserCancelled(Exception):
    '''An exception that is suppressed from the user'''
    pass
167
168
# note: this is not a NamedTuple as then its json encoding cannot be customized
class Satoshis(object):
    """Wrapper around an amount, stored in the single slot ``value``.

    Equality compares the wrapped values.  Comparing against anything that is
    not a Satoshis instance yields "not equal" instead of raising.
    Instances are unhashable, because __eq__ is defined without __hash__.
    """
    __slots__ = ('value',)

    def __new__(cls, value):
        self = super(Satoshis, cls).__new__(cls)
        # note: 'value' sometimes has msat precision
        self.value = value
        return self

    def __repr__(self):
        return f'Satoshis({self.value})'

    def __str__(self):
        # note: precision is truncated to satoshis here
        return format_satoshis(self.value)

    def __eq__(self, other):
        # Without this guard, e.g. ``Satoshis(1) == None`` raised
        # AttributeError, because ``other`` has no 'value' attribute.
        if not isinstance(other, Satoshis):
            return NotImplemented
        return self.value == other.value

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        return Satoshis(self.value + other.value)
194
195
# note: this is not a NamedTuple as then its json encoding cannot be customized
class Fiat(object):
    """A fiat amount (``value``: Decimal or None) tagged with a currency code (``ccy``).

    A value of None or NaN is rendered as the translated 'No Data' string.
    """
    __slots__ = ('value', 'ccy')

    def __new__(cls, value: Optional[Decimal], ccy: str):
        if value is not None and not isinstance(value, Decimal):
            raise TypeError(f"value should be Decimal or None, not {type(value)}")
        instance = super(Fiat, cls).__new__(cls)
        instance.ccy = ccy
        instance.value = value
        return instance

    def __repr__(self):
        return f'Fiat({self.__str__()})'

    def __str__(self):
        amount = self.value
        if amount is None or amount.is_nan():
            return _('No Data')
        return f"{amount:.2f}"

    def to_ui_string(self):
        amount = self.value
        if amount is None or amount.is_nan():
            return _('No Data')
        return f"{amount:.2f}" + ' ' + self.ccy

    def __eq__(self, other):
        if not isinstance(other, Fiat) or self.ccy != other.ccy:
            return False
        mine, theirs = self.value, other.value
        both_decimal = isinstance(mine, Decimal) and isinstance(theirs, Decimal)
        # two NaN amounts count as equal, although Decimal NaN != NaN
        if both_decimal and mine.is_nan() and theirs.is_nan():
            return True
        return mine == theirs

    def __ne__(self, other):
        return not (self == other)

    def __add__(self, other):
        assert self.ccy == other.ccy
        total = self.value + other.value
        return Fiat(total, self.ccy)
239
240
class MyEncoder(json.JSONEncoder):
    """JSON encoder that knows how to serialise the project's own types."""

    def default(self, obj):
        # note: this does not get called for namedtuples :( https://bugs.python.org/issue30343
        from .transaction import Transaction, TxOutput
        from .lnutil import UpdateAddHtlc
        if isinstance(obj, UpdateAddHtlc):
            return obj.to_tuple()
        if isinstance(obj, Transaction):
            return obj.serialize()
        if isinstance(obj, TxOutput):
            return obj.to_legacy_tuple()
        # these three are all encoded through their str() form
        if isinstance(obj, (Satoshis, Fiat, Decimal)):
            return str(obj)
        if isinstance(obj, datetime):
            return obj.isoformat(' ')[:-3]
        if isinstance(obj, set):
            return list(obj)
        if isinstance(obj, bytes):  # for namedtuples in lnchannel
            return obj.hex()
        to_json = getattr(obj, 'to_json', None)
        if callable(to_json):
            return obj.to_json()
        return super().default(obj)
267
268
class ThreadJob(Logger):
    """A job that is run periodically from a thread's main loop.  run() is
    called from that thread's context.
    """

    def __init__(self):
        # only initialises the Logger base class
        Logger.__init__(self)

    def run(self):
        """Called periodically from the thread"""
        # no-op here; DebugMem below overrides it
        pass
280
class DebugMem(ThreadJob):
    '''A handy class for debugging GC memory leaks'''

    def __init__(self, classes, interval=30):
        ThreadJob.__init__(self)
        self.classes = classes
        self.interval = interval
        self.next_time = 0

    def mem_stats(self):
        """Log, per tracked class, how many live instances the GC knows about."""
        import gc
        self.logger.info("Start memscan")
        gc.collect()
        found = defaultdict(list)
        for candidate in gc.get_objects():
            for tracked in self.classes:
                if isinstance(candidate, tracked):
                    found[tracked].append(candidate)
        for tracked, instances in found.items():
            self.logger.info(f"{tracked.__name__}: {len(instances)}")
        self.logger.info("Finish memscan")

    def run(self):
        """Run a scan, then wait ``interval`` seconds before the next one."""
        if time.time() <= self.next_time:
            return
        self.mem_stats()
        self.next_time = time.time() + self.interval
306
class DaemonThread(threading.Thread, Logger):
    """ daemon thread that terminates cleanly """

    LOGGING_SHORTCUT = 'd'

    def __init__(self):
        threading.Thread.__init__(self)
        Logger.__init__(self)
        # threading.currentThread() is a deprecated alias of current_thread()
        self.parent_thread = threading.current_thread()
        self.running = False
        self.running_lock = threading.Lock()
        self.job_lock = threading.Lock()
        self.jobs = []
        self.stopped_event = threading.Event()  # set when fully stopped

    def add_jobs(self, jobs):
        """Append ``jobs`` to the list of jobs executed by run_jobs()."""
        with self.job_lock:
            self.jobs.extend(jobs)

    def run_jobs(self):
        """Call run() on every registered job, logging any exception."""
        # Don't let a throwing job disrupt the thread, future runs of
        # itself, or other jobs.  This is useful protection against
        # malformed or malicious server responses
        with self.job_lock:
            for job in self.jobs:
                try:
                    job.run()
                except Exception:
                    self.logger.exception('')

    def remove_jobs(self, jobs):
        """Remove ``jobs``; list.remove raises ValueError for an unregistered job."""
        with self.job_lock:
            for job in jobs:
                self.jobs.remove(job)

    def start(self):
        """Mark the thread as running, then start it."""
        with self.running_lock:
            self.running = True
        return threading.Thread.start(self)

    def is_running(self):
        """True while not stopped and the thread that created us is alive."""
        with self.running_lock:
            return self.running and self.parent_thread.is_alive()

    def stop(self):
        """Clear the running flag; is_running() returns False afterwards."""
        with self.running_lock:
            self.running = False

    def on_stop(self):
        """Final cleanup: detach jnius on Android, then set stopped_event."""
        if 'ANDROID_DATA' in os.environ:
            import jnius
            jnius.detach()
            self.logger.info("jnius detach")
        self.logger.info("stopped")
        self.stopped_event.set()
362
363
def print_stderr(*args):
    """Write the str() of all arguments, space-separated, as one line to stderr."""
    line = " ".join(map(str, args)) + "\n"
    sys.stderr.write(line)
    sys.stderr.flush()
368
def print_msg(*args):
    """Write the str() of all arguments, space-separated, as one line to stdout."""
    line = " ".join(map(str, args)) + "\n"
    sys.stdout.write(line)
    sys.stdout.flush()
374
def json_encode(obj):
    """Serialise ``obj`` to pretty-printed JSON with sorted keys, via MyEncoder.

    Falls back to ``repr(obj)`` when encoding raises TypeError.
    """
    try:
        return json.dumps(obj, sort_keys=True, indent=4, cls=MyEncoder)
    except TypeError:
        return repr(obj)
381
def json_decode(x):
    """Best-effort JSON decode: return the parsed value, or ``x`` itself.

    Floats are parsed as Decimal.  Any input that cannot be parsed (invalid
    JSON, or a non-string such as None) is returned unchanged.
    """
    try:
        return json.loads(x, parse_float=Decimal)
    except Exception:
        # Deliberately broad (best-effort), but no longer a bare ``except:``,
        # which would also swallow KeyboardInterrupt / SystemExit.
        return x
387
def json_normalize(x):
    """Round-trip ``x`` through json_encode / json_decode."""
    # note: The return value of commands, when going through the JSON-RPC interface,
    #       is json-encoded. The encoder used there cannot handle some types, e.g. electrum.util.Satoshis.
    # note: We should not simply do "json_encode(x)" here, as then later x would get doubly json-encoded.
    # see #5868
    encoded = json_encode(x)
    return json_decode(encoded)
394
395
# taken from Django Source Code
def constant_time_compare(val1, val2):
    """Return True if the two strings are equal, False otherwise."""
    left = to_bytes(val1, 'utf8')
    right = to_bytes(val2, 'utf8')
    return hmac.compare_digest(left, right)
400
401
# decorator that prints execution time
_profiler_logger = _logger.getChild('profiler')
def profiler(func):
    """Decorator that debug-logs how long each call of ``func`` took.

    The wrapper forwards all arguments and the return value unchanged.
    functools.wraps keeps the wrapped function's name and docstring, which
    the previous lambda-based wrapper discarded.
    """
    @functools.wraps(func)
    def do_profile(*args, **kw_args):
        name = func.__qualname__
        # perf_counter is monotonic; time.time() can jump when the clock is adjusted
        t0 = time.perf_counter()
        o = func(*args, **kw_args)
        t = time.perf_counter() - t0
        _profiler_logger.debug(f"{name} {t:,.4f}")
        return o
    return do_profile
413
414
def android_ext_dir():
    """Return android.storage.primary_external_storage_path()."""
    import android.storage
    return android.storage.primary_external_storage_path()
418
def android_backup_dir():
    """Return the 'org.electrum.electrum' folder under android_ext_dir(),
    creating it if it does not exist yet."""
    backup_dir = os.path.join(android_ext_dir(), 'org.electrum.electrum')
    if os.path.exists(backup_dir):
        return backup_dir
    os.mkdir(backup_dir)
    return backup_dir
424
def android_data_dir():
    """Return '<files dir of the Android activity>/data'."""
    import jnius
    activity_cls = jnius.autoclass('org.kivy.android.PythonActivity')
    files_dir = activity_cls.mActivity.getFilesDir().getPath()
    return files_dir + '/data'
429
def get_backup_dir(config):
    """Return the backup directory, or None.

    Off Android this is the 'backup_dir' config value.  On Android it is
    android_backup_dir(), but only when 'android_backups' is set.
    """
    if 'ANDROID_DATA' not in os.environ:
        return config.get('backup_dir')
    if config.get('android_backups'):
        return android_backup_dir()
    return None
435
def ensure_sparse_file(filename):
    """Best-effort: mark ``filename`` as a sparse file (Windows only).

    Failures are logged and never raised.
    """
    # On modern Linux, no need to do anything.
    # On Windows, need to explicitly mark file.
    if os.name != "nt":
        return
    import subprocess
    try:
        # Argument list instead of a shell string: no quoting problems with
        # unusual file names.  os.system() signals failure via its return
        # value, so the old try/except around it never logged anything.
        proc = subprocess.run(
            ['fsutil', 'sparse', 'setflag', str(filename), '1'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except Exception as e:
        _logger.info(f'error marking file {filename} as sparse: {e}')
        return
    if proc.returncode != 0:
        _logger.info(f'error marking file {filename} as sparse: '
                     f'fsutil exited with code {proc.returncode}')
444
445
def get_headers_dir(config):
    """Return ``config.path``, which is used as the headers directory."""
    return config.path
448
449
def assert_datadir_available(config_path):
    """Raise FileNotFoundError unless ``config_path`` exists."""
    if not os.path.exists(config_path):
        raise FileNotFoundError(
            'Electrum datadir does not exist. Was it deleted while running?' + '\n' +
            'Should be at {}'.format(config_path))
458
459
def assert_file_in_datadir_available(path, config_path):
    """Raise FileNotFoundError unless ``path`` exists.

    If the datadir itself is missing, the error raised by
    assert_datadir_available() takes precedence.
    """
    if os.path.exists(path):
        return
    assert_datadir_available(config_path)
    raise FileNotFoundError(
        'Cannot find file but datadir is there.' + '\n' +
        'Should be at {}'.format(path))
468
469
def standardize_path(path):
    """Expand '~', make absolute, resolve symlinks, then normalise the case."""
    expanded = os.path.expanduser(path)
    absolute = os.path.abspath(expanded)
    resolved = os.path.realpath(absolute)
    return os.path.normcase(resolved)
477
478
def get_new_wallet_name(wallet_folder: str) -> str:
    """Return the first of "wallet_1", "wallet_2", ... that is not present
    in ``wallet_folder``."""
    # List the folder once; previously os.listdir() ran on every iteration.
    existing = set(os.listdir(wallet_folder))
    i = 1
    while "wallet_%d" % i in existing:
        i += 1
    return "wallet_%d" % i
488
489
def assert_bytes(*args):
    """
    porting helper: assert that every argument is bytes or bytearray
    """
    try:
        assert all(isinstance(arg, (bytes, bytearray)) for arg in args)
    except BaseException:
        print('assert bytes failed', [type(arg) for arg in args])
        raise
500
501
def assert_str(*args):
    """
    porting helper: assert that every argument is a str
    """
    assert all(isinstance(arg, str) for arg in args)
508
509
def to_string(x, enc) -> str:
    """Return ``x`` as str: str passes through, bytes/bytearray are decoded
    with ``enc``; anything else raises TypeError."""
    if isinstance(x, str):
        return x
    if isinstance(x, (bytes, bytearray)):
        return x.decode(enc)
    raise TypeError("Not a string or bytes like object")
517
518
def to_bytes(something, encoding='utf8') -> bytes:
    """
    Return ``something`` as bytes: bytes pass through, a bytearray is copied,
    a str is encoded with ``encoding``; anything else raises TypeError.
    """
    if isinstance(something, bytes):
        return something
    if isinstance(something, bytearray):
        return bytes(something)
    if isinstance(something, str):
        return something.encode(encoding)
    raise TypeError("Not a string or bytes like object")
531
532
# shorthand alias: hex string -> bytes
bfh = bytes.fromhex
534
535
def bh2u(x: bytes) -> str:
    """
    str with (lower-case) hex representation of a bytes-like object

    >>> x = bytes((1, 2, 10))
    >>> bh2u(x)
    '01020a'
    """
    return x.hex()
545
546
def xor_bytes(a: bytes, b: bytes) -> bytes:
    """XOR ``a`` and ``b`` byte by byte; the result is as long as the shorter input."""
    # zip() stops at the shorter operand, which gives the truncation
    return bytes(left ^ right for left, right in zip(a, b))
551
552
def user_dir():
    """Return the data directory, or None if none can be determined.

    Checked in order: $ELECTRUMDIR, the Android data dir, ~/.electrum on
    posix, then "Electrum" under %APPDATA% or %LOCALAPPDATA%.
    """
    env = os.environ
    if "ELECTRUMDIR" in env:
        return env["ELECTRUMDIR"]
    if 'ANDROID_DATA' in env:
        return android_data_dir()
    if os.name == 'posix':
        return os.path.join(env["HOME"], ".electrum")
    for variable in ("APPDATA", "LOCALAPPDATA"):
        if variable in env:
            return os.path.join(env[variable], "Electrum")
    #raise Exception("No home directory found in environment variables.")
    return None
567
568
def resource_path(*parts):
    """Join ``parts`` onto the package directory (module-level ``pkg_dir``)."""
    return os.path.join(pkg_dir, *parts)
571
572
# absolute path to python package folder of electrum ("lib")
pkg_dir = os.path.dirname(os.path.realpath(__file__))
575
576
def is_valid_email(s):
    """Loose check: ``s`` must start with something shaped like 'x@y.z'."""
    pattern = r"[^@]+@[^@]+\.[^@]+"
    # match objects are always truthy, so bool() equals "is not None"
    return bool(re.match(pattern, s))
580
581
def is_hash256_str(text: Any) -> bool:
    """True iff ``text`` is a str of exactly 64 characters that is_hex_str() accepts."""
    if not isinstance(text, str) or len(text) != 64:
        return False
    return is_hex_str(text)
586
587
def is_hex_str(text: Any) -> bool:
    """True iff ``text`` is a str made only of complete hex byte pairs.

    The empty string counts as valid.  Whitespace is rejected.
    """
    if not isinstance(text, str):
        return False
    try:
        decoded = bytes.fromhex(text)
    except ValueError:
        # bytes.fromhex signals malformed input with ValueError; a bare
        # ``except:`` would also have swallowed KeyboardInterrupt.
        return False
    # forbid whitespaces in text (bytes.fromhex silently skips them):
    return len(text) == 2 * len(decoded)
598
599
def is_integer(val: Any) -> bool:
    """True iff ``val`` is an int.  note: bool is a subclass of int, so True/False pass."""
    return isinstance(val, int)
602
603
def is_non_negative_integer(val: Any) -> bool:
    """True iff is_integer(val) holds and ``val`` is >= 0."""
    return is_integer(val) and val >= 0
608
609
def is_int_or_float(val: Any) -> bool:
    """True iff ``val`` is an int or a float.  note: bool is a subclass of int, so it passes."""
    return isinstance(val, (int, float))
612
613
def is_non_negative_int_or_float(val: Any) -> bool:
    """True iff is_int_or_float(val) holds and ``val`` is >= 0."""
    return is_int_or_float(val) and val >= 0
618
619
def chunks(items, size: int):
    """Yield consecutive slices of ``items``, each at most ``size`` long.

    ``items`` must support len() and slicing.  As this is a generator, the
    ValueError for size < 1 is raised on first iteration, not at call time.
    """
    if size < 1:
        raise ValueError(f"size must be positive, not {repr(size)}")
    total = len(items)
    start = 0
    while start < total:
        yield items[start: start + size]
        start += size
626
627
def format_satoshis_plain(x, *, decimal_point=8) -> str:
    """Display a satoshi amount scaled. Always uses a '.' as a decimal
    point and has no thousands separator"""
    if x == '!':
        return 'max'
    scaled = Decimal(x) / (10 ** decimal_point)
    fixed = "{:.8f}".format(scaled)
    # drop trailing zeros, then a decimal point left dangling
    return fixed.rstrip('0').rstrip('.')
635
636
# Check that Decimal precision is sufficient.
# We need at the very least ~20, as we deal with msat amounts, and
# log10(21_000_000 * 10**8 * 1000) ~= 18.3
# decimal.DefaultContext.prec == 28 by default, but it is mutable.
# We enforce that we have at least that available.
assert decimal.getcontext().prec >= 28, f"PyDecimal precision too low: {decimal.getcontext().prec}"

# decimal separator of the current locale; format_satoshis() puts it between
# the integer and the fractional part
DECIMAL_POINT = localeconv()['decimal_point']  # type: str
645
646
def format_satoshis(
        x,  # in satoshis
        *,
        num_zeros=0,
        decimal_point=8,
        precision=None,
        is_diff=False,
        whitespaces=False,
) -> str:
    """Format a satoshi amount as a decimal string.

    x: the amount; None yields 'unknown' and '!' yields 'max'.
    num_zeros: minimum number of fractional digits (zero-padded).
    decimal_point: the amount is divided by 10**decimal_point.
    precision: number of fractional digits requested from the formatter,
        before trailing zeros are stripped; defaults to decimal_point.
    is_diff: if set, the sign is always shown ('+' format flag).
    whitespaces: if set, pad the result with spaces on both sides.

    The module-level DECIMAL_POINT separates integer and fractional part.
    """
    if x is None:
        return 'unknown'
    if x == '!':
        return 'max'
    if precision is None:
        precision = decimal_point
    # format string
    decimal_format = "." + str(precision) if precision > 0 else ""
    if is_diff:
        decimal_format = '+' + decimal_format
    # initial result
    scale_factor = pow(10, decimal_point)
    if not isinstance(x, Decimal):
        x = Decimal(x).quantize(Decimal('1E-8'))
    result = ("{:" + decimal_format + "f}").format(x / scale_factor)
    # make sure there is a '.' to split on, then strip trailing zeros
    if "." not in result: result += "."
    result = result.rstrip('0')
    # extra decimal places
    integer_part, fract_part = result.split(".")
    if len(fract_part) < num_zeros:
        fract_part += "0" * (num_zeros - len(fract_part))
    result = integer_part + DECIMAL_POINT + fract_part
    # leading/trailing whitespaces
    if whitespaces:
        # right-pad as if the fraction had decimal_point digits, then
        # left-pad to a total width of 15
        result += " " * (decimal_point - len(fract_part))
        result = " " * (15 - len(result)) + result
    return result
683
684
FEERATE_PRECISION = 1  # num fractional decimal places for sat/byte fee rates
# quantization step used by quantize_feerate(): 10 ** -FEERATE_PRECISION
_feerate_quanta = Decimal(10) ** (-FEERATE_PRECISION)
687
688
def format_fee_satoshis(fee, *, num_zeros=0, precision=None):
    """Format ``fee`` through format_satoshis() with decimal_point=0.

    ``precision`` defaults to FEERATE_PRECISION; ``num_zeros`` is capped at
    FEERATE_PRECISION.
    """
    effective_precision = FEERATE_PRECISION if precision is None else precision
    capped_zeros = min(num_zeros, FEERATE_PRECISION)  # no more zeroes than available prec
    return format_satoshis(
        fee,
        num_zeros=capped_zeros,
        decimal_point=0,
        precision=effective_precision,
    )
694
695
def quantize_feerate(fee) -> Union[None, Decimal, int]:
    """Strip sat/byte fee rate of excess precision."""
    if fee is not None:
        return Decimal(fee).quantize(_feerate_quanta, rounding=decimal.ROUND_HALF_DOWN)
    return None
701
702
def timestamp_to_datetime(timestamp):
    """Convert a POSIX timestamp to a naive local datetime; None stays None."""
    return None if timestamp is None else datetime.fromtimestamp(timestamp)
707
def format_time(timestamp):
    """Format a timestamp as its ISO string (space separator) minus the
    last three characters; a None timestamp gives the translated "Unknown"."""
    date = timestamp_to_datetime(timestamp)
    if not date:
        return _("Unknown")
    return date.isoformat(' ')[:-3]
711
712
# Takes a timestamp and returns a string with the approximation of the age
def age(from_date, since_date = None, target_tz=None, include_seconds=False):
    """Describe how far the POSIX timestamp ``from_date`` is from ``since_date``.

    from_date: POSIX timestamp; None returns "Unknown".
    since_date: reference datetime; defaults to datetime.now(target_tz).
    target_tz: tzinfo used only to build the default ``since_date``.
    include_seconds: passed on to time_difference().

    Returns "<approximation> ago" for past timestamps, "in <approximation>" otherwise.
    """
    if from_date is None:
        return "Unknown"

    if since_date is None:
        since_date = datetime.now(target_tz)
    # Give from_date the same tz-awareness as since_date.  It used to be
    # always naive, so a target_tz (or an aware since_date) made the
    # subtraction below raise TypeError.
    from_date = datetime.fromtimestamp(from_date, since_date.tzinfo)

    td = time_difference(from_date - since_date, include_seconds)
    return td + " ago" if from_date < since_date else "in " + td


def time_difference(distance_in_time, include_seconds):
    """Return a rough, human-readable size of the timedelta ``distance_in_time``.

    Only the magnitude is used, and microseconds are ignored.  Below one
    minute the result is "less than a minute", or the number of seconds if
    ``include_seconds`` is set.
    """
    #distance_in_time = since_date - from_date
    distance_in_seconds = int(round(abs(distance_in_time.days * 86400 + distance_in_time.seconds)))
    distance_in_minutes = int(round(distance_in_seconds/60))

    if distance_in_minutes == 0:
        if include_seconds:
            return "%s seconds" % distance_in_seconds
        else:
            return "less than a minute"
    elif distance_in_minutes < 45:
        return "%s minutes" % distance_in_minutes
    elif distance_in_minutes < 90:
        return "about 1 hour"
    elif distance_in_minutes < 1440:
        return "about %d hours" % (round(distance_in_minutes / 60.0))
    elif distance_in_minutes < 2880:
        return "1 day"
    elif distance_in_minutes < 43220:
        return "%d days" % (round(distance_in_minutes / 1440))
    elif distance_in_minutes < 86400:
        return "about 1 month"
    elif distance_in_minutes < 525600:
        return "%d months" % (round(distance_in_minutes / 43200))
    elif distance_in_minutes < 1051200:
        return "about 1 year"
    else:
        return "over %d years" % (round(distance_in_minutes / 525600))
754
# explorer name -> (base URL, {kind: path prefix}); block_explorer_URL()
# concatenates base URL + path prefix + item
mainnet_block_explorers = {
    'Bitupper Explorer': ('https://bitupper.com/en/explorer/bitcoin/',
                        {'tx': 'transactions/', 'addr': 'addresses/'}),
    'Bitflyer.jp': ('https://chainflyer.bitflyer.jp/',
                        {'tx': 'Transaction/', 'addr': 'Address/'}),
    'Blockchain.info': ('https://blockchain.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'blockchainbdgpzk.onion': ('https://blockchainbdgpzk.onion/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Bitaps.com': ('https://btc.bitaps.com/',
                        {'tx': '', 'addr': ''}),
    'BTC.com': ('https://btc.com/',
                        {'tx': '', 'addr': ''}),
    'Chain.so': ('https://www.chain.so/',
                        {'tx': 'tx/BTC/', 'addr': 'address/BTC/'}),
    'Insight.is': ('https://insight.bitpay.com/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'TradeBlock.com': ('https://tradeblock.com/blockchain/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchair.com': ('https://blockchair.com/bitcoin/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'blockonomics.co': ('https://www.blockonomics.co/',
                        {'tx': 'api/tx?txid=', 'addr': '#/search?q='}),
    'mempool.space': ('https://mempool.space/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.emzy.de': ('https://mempool.emzy.de/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'OXT.me': ('https://oxt.me/',
                        {'tx': 'transaction/', 'addr': 'address/'}),
    'smartbit.com.au': ('https://www.smartbit.com.au/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mynode.local': ('http://mynode.local:3002/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain:/',
                        {'tx': 'tx/', 'addr': 'address/'}),
}
795
# same layout as mainnet_block_explorers; block_explorer_info() picks this
# table when constants.net.TESTNET is set
testnet_block_explorers = {
    'Bitaps.com': ('https://tbtc.bitaps.com/',
                       {'tx': '', 'addr': ''}),
    'BlockCypher.com': ('https://live.blockcypher.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockchain.info': ('https://www.blockchain.com/btc-testnet/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'Blockstream.info': ('https://blockstream.info/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'mempool.space': ('https://mempool.space/testnet/',
                        {'tx': 'tx/', 'addr': 'address/'}),
    'smartbit.com.au': ('https://testnet.smartbit.com.au/',
                       {'tx': 'tx/', 'addr': 'address/'}),
    'system default': ('blockchain://000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943/',
                       {'tx': 'tx/', 'addr': 'address/'}),
}
812
# path prefixes used when 'block_explorer_custom' is configured as a plain URL string
_block_explorer_default_api_loc = {'tx': 'tx/', 'addr': 'address/'}
814
815
def block_explorer_info():
    """Return the hardcoded explorer table for the active network (testnet or mainnet)."""
    from . import constants
    if constants.net.TESTNET:
        return testnet_block_explorers
    return mainnet_block_explorers
819
820
def block_explorer(config: 'SimpleConfig') -> Optional[str]:
    """Returns name of selected block explorer,
    or None if a custom one (not among hardcoded ones) is configured.
    """
    if config.get('block_explorer_custom') is not None:
        return None
    fallback = 'Blockstream.info'
    selected = config.get('block_explorer', fallback)
    # a configured name that is not in the table falls back to the default
    if block_explorer_info().get(selected) is None:
        selected = fallback
    assert isinstance(selected, str), f"{selected!r} should be str"
    return selected
834
835
def block_explorer_tuple(config: 'SimpleConfig') -> Optional[Tuple[str, dict]]:
    """Return (base URL, {kind: path prefix}) for the configured explorer.

    A truthy 'block_explorer_custom' wins: a str is paired with the default
    path prefixes, a 2-item tuple/list is returned as a tuple, and any other
    value is logged and yields None.  Otherwise the hardcoded table is used.
    """
    custom_be = config.get('block_explorer_custom')
    if not custom_be:
        # using one of the hardcoded block explorers
        return block_explorer_info().get(block_explorer(config))
    if isinstance(custom_be, str):
        return custom_be, _block_explorer_default_api_loc
    if isinstance(custom_be, (tuple, list)) and len(custom_be) == 2:
        return tuple(custom_be)
    _logger.warning(f"not using 'block_explorer_custom' from config. "
                    f"expected a str or a pair but got {custom_be!r}")
    return None
849
850
def block_explorer_URL(config: 'SimpleConfig', kind: str, item: str) -> Optional[str]:
    """Build the explorer URL for ``item`` (``kind`` is e.g. 'tx' or 'addr').

    Returns None if no explorer is available, or if the explorer has no
    path prefix for ``kind``.
    """
    be_tuple = block_explorer_tuple(config)
    if not be_tuple:
        return None
    explorer_url, explorer_dict = be_tuple
    kind_str = explorer_dict.get(kind)
    if kind_str is None:
        return None
    # endswith() instead of explorer_url[-1]: an empty base URL from a
    # custom config must not raise IndexError
    if not explorer_url.endswith("/"):
        explorer_url += "/"
    return ''.join([explorer_url, kind_str, item])
863
864 # URL decode
865 #_ud = re.compile('%([0-9a-hA-H]{2})', re.MULTILINE)
866 #urldecode = lambda x: _ud.sub(lambda m: chr(int(m.group(1), 16)), x)
867
868
# URI scheme names, without the trailing ':'
# note: when checking against these, use .lower() to support case-insensitivity
BITCOIN_BIP21_URI_SCHEME = 'bitcoin'
LIGHTNING_URI_SCHEME = 'lightning'
872
873
# raised by parse_URI() on malformed input
class InvalidBitcoinURI(Exception): pass
875
876
# TODO rename to parse_bip21_uri or similar
def parse_URI(uri: str, on_pr: Callable = None, *, loop=None) -> dict:
    """Parse a bitcoin URI (or a bare address) into a dict.

    The result holds the query parameters, plus 'address' if one is given.
    'amount' is converted to an int, 'time' and 'exp' to int, 'sig' from
    base58 to hex, and 'message' is duplicated under 'memo'.

    If ``on_pr`` is given and the URI carries 'r', or both 'name' and 'sig',
    a coroutine is scheduled on ``loop`` (default: the current event loop)
    that obtains the payment request and passes it to ``on_pr``.

    Raises InvalidBitcoinURI on malformed URI.
    """
    from . import bitcoin
    from .bitcoin import COIN

    if not isinstance(uri, str):
        raise InvalidBitcoinURI(f"expected string, not {repr(uri)}")

    if ':' not in uri:
        if not bitcoin.is_address(uri):
            raise InvalidBitcoinURI("Not a bitcoin address")
        return {'address': uri}

    u = urllib.parse.urlparse(uri)
    if u.scheme.lower() != BITCOIN_BIP21_URI_SCHEME:
        raise InvalidBitcoinURI("Not a bitcoin URI")
    address = u.path

    # python for android fails to parse query
    if address.find('?') > 0:
        # maxsplit=1: a further '?' inside the query must not make the
        # unpacking fail with a bare ValueError
        address, query = u.path.split('?', 1)
        pq = urllib.parse.parse_qs(query)
    else:
        pq = urllib.parse.parse_qs(u.query)

    for k, v in pq.items():
        if len(v) != 1:
            raise InvalidBitcoinURI(f'Duplicate Key: {repr(k)}')

    out = {k: v[0] for k, v in pq.items()}
    if address:
        if not bitcoin.is_address(address):
            raise InvalidBitcoinURI(f"Invalid bitcoin address: {address}")
        out['address'] = address
    if 'amount' in out:
        am = out['amount']
        try:
            # "<digits>X<n>": mantissa scaled by 10 ** (n - 8)
            m = re.match(r'([0-9.]+)X([0-9])', am)
            if m:
                k = int(m.group(2)) - 8
                amount = Decimal(m.group(1)) * pow(Decimal(10), k)
            else:
                amount = Decimal(am) * COIN
            out['amount'] = int(amount)
        except Exception as e:
            raise InvalidBitcoinURI(f"failed to parse 'amount' field: {repr(e)}") from e
    if 'message' in out:
        out['memo'] = out['message']
    if 'time' in out:
        try:
            out['time'] = int(out['time'])
        except Exception as e:
            raise InvalidBitcoinURI(f"failed to parse 'time' field: {repr(e)}") from e
    if 'exp' in out:
        try:
            out['exp'] = int(out['exp'])
        except Exception as e:
            raise InvalidBitcoinURI(f"failed to parse 'exp' field: {repr(e)}") from e
    if 'sig' in out:
        try:
            out['sig'] = bh2u(bitcoin.base_decode(out['sig'], base=58))
        except Exception as e:
            raise InvalidBitcoinURI(f"failed to parse 'sig' field: {repr(e)}") from e

    r = out.get('r')
    sig = out.get('sig')
    name = out.get('name')
    if on_pr and (r or (name and sig)):
        @log_exceptions
        async def get_payment_request():
            from . import paymentrequest as pr
            if name and sig:
                s = pr.serialize_request(out).SerializeToString()
                request = pr.PaymentRequest(s)
            else:
                request = await pr.get_payment_request(r)
            if on_pr:
                on_pr(request)
        loop = loop or asyncio.get_event_loop()
        asyncio.run_coroutine_threadsafe(get_payment_request(), loop)

    return out
961
962
def create_bip21_uri(addr, amount_sat: Optional[int], message: Optional[str],
                     *, extra_query_params: Optional[dict] = None) -> str:
    """Build a bitcoin: URI for ``addr``; returns "" for an invalid address.

    A truthy ``amount_sat`` / ``message`` becomes the 'amount' / 'message'
    query parameter.  ``extra_query_params`` are appended; a key that is not
    a str, or that would change under URL-quoting, raises Exception.
    """
    from . import bitcoin
    if not bitcoin.is_address(addr):
        return ""
    extras = {} if extra_query_params is None else extra_query_params
    query_parts = []
    if amount_sat:
        query_parts.append('amount=' + format_satoshis_plain(amount_sat))
    if message:
        query_parts.append('message=' + urllib.parse.quote(message))
    for key, raw_value in extras.items():
        if not isinstance(key, str) or key != urllib.parse.quote(key):
            raise Exception(f"illegal key for URI: {repr(key)}")
        quoted_value = urllib.parse.quote(raw_value)
        query_parts.append(f"{key}={quoted_value}")
    parts = urllib.parse.ParseResult(
        scheme=BITCOIN_BIP21_URI_SCHEME,
        netloc='',
        path=addr,
        params='',
        query='&'.join(query_parts),
        fragment='',
    )
    return str(urllib.parse.urlunparse(parts))
989
990
def maybe_extract_bolt11_invoice(data: str) -> Optional[str]:
    """Return the lower-cased text starting with 'ln' found in ``data``, else None.

    Surrounding whitespace and an optional 'lightning:' prefix are removed.
    """
    data = data.strip()  # whitespaces
    data = data.lower()
    uri_prefix = LIGHTNING_URI_SCHEME + ':'
    if data.startswith(uri_prefix + 'ln'):
        # strip the scheme; the length is derived from the constant instead
        # of the former magic number 10
        data = data[len(uri_prefix):]
    if data.startswith('ln'):
        return data
    return None
999
1000
# Python bug (http://bugs.python.org/issue1927) causes raw_input
# to be redirected improperly between stdin/stderr on Unix systems
#TODO: py3
def raw_input(prompt=None):
    """Replacement for builtins.input: writes the prompt to stdout itself,
    then calls the original input() without a prompt."""
    if prompt:
        sys.stdout.write(prompt)
    return builtin_raw_input()

# keep a reference to the original input(), then install the wrapper
# process-wide; this runs as a side effect of importing this module
builtin_raw_input = builtins.input
builtins.input = raw_input
1011
1012
def parse_json(message):
    """Split the first newline-terminated JSON document off ``message`` (bytes).

    Returns (parsed, rest).  Without a newline the result is (None, message).
    If the first line cannot be decoded or parsed, ``parsed`` is None and
    that line is still consumed.
    """
    # TODO: check \r\n pattern
    n = message.find(b'\n')
    if n==-1:
        return None, message
    try:
        j = json.loads(message[0:n].decode('utf8'))
    except Exception:
        # Deliberately broad (covers e.g. RecursionError on deeply nested
        # input), but no longer a bare ``except:`` that would also swallow
        # KeyboardInterrupt / SystemExit.
        j = None
    return j, message[n+1:]
1023
1024
def setup_thread_excepthook():
    """
    Workaround for `sys.excepthook` thread bug from:
    http://bugs.python.org/issue1230540

    Call once from the main thread before creating any threads.
    """

    init_original = threading.Thread.__init__

    def init(self, *args, **kwargs):

        init_original(self, *args, **kwargs)
        # capture the bound run() of this particular thread object
        run_original = self.run

        def run_with_except_hook(*args2, **kwargs2):
            try:
                run_original(*args2, **kwargs2)
            except Exception:
                # hand the exception to sys.excepthook instead of letting
                # it propagate out of run()
                sys.excepthook(*sys.exc_info())

        # shadow run() on the instance with the wrapped version
        self.run = run_with_except_hook

    # patch Thread.__init__ globally: only threads created after this call
    # get the wrapped run()
    threading.Thread.__init__ = init
1049
1050
def send_exception_to_crash_reporter(e: BaseException):
    """Pass ``e`` to sys.excepthook as a (type, value, traceback) triple."""
    exc_info = (type(e), e, e.__traceback__)
    sys.excepthook(*exc_info)
1053
1054
def versiontuple(v):
    """Convert a dotted version string such as "3.1.4" into a tuple of ints.

    Every dot-separated component must be parseable by int(); otherwise
    ValueError is raised.
    """
    components = v.split(".")
    return tuple(int(component) for component in components)
1057
1058
def read_json_file(path):
    """Read the UTF-8 text file at `path` and return its parsed JSON content.

    Raises FileImportFailed:
    - with the message "Invalid JSON code." on any ValueError (this covers
      json.JSONDecodeError and UnicodeDecodeError, both ValueError subclasses);
    - wrapping the original error for any other Exception (e.g. OSError).
    In both cases the error is logged and chained as the cause.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            data = json.loads(f.read())
    #backwards compatibility for JSONDecodeError
    except ValueError as e:
        _logger.exception('')
        raise FileImportFailed(_("Invalid JSON code.")) from e
    except Exception as e:
        # `Exception`, not `BaseException`: KeyboardInterrupt / SystemExit
        # must not be turned into an import failure.
        _logger.exception('')
        raise FileImportFailed(e) from e
    return data
1071
def write_json_file(path, data):
    """Serialise `data` as JSON and write it to `path` as UTF-8 text.

    The output is indented by 4 spaces with sorted keys and is encoded
    using MyEncoder. OS-level errors are logged and re-raised as
    FileExportFailed (with the original error chained as the cause).
    """
    try:
        # Serialise *before* opening the file: open(..., 'w+') truncates it,
        # so a serialisation error after that point would destroy whatever
        # was stored at `path` before.
        text = json.dumps(data, indent=4, sort_keys=True, cls=MyEncoder)
        with open(path, 'w+', encoding='utf-8') as f:
            f.write(text)
    except (IOError, os.error) as e:
        _logger.exception('')
        raise FileExportFailed(e) from e
1079
1080
def make_dir(path, allow_symlink=True):
    """Create the directory `path` (owner-only permissions) unless it exists.

    If `path` does not exist but is a symlink (i.e. a dangling link) and
    `allow_symlink` is False, an Exception is raised instead.
    """
    if os.path.exists(path):
        return
    if not allow_symlink and os.path.islink(path):
        raise Exception('Dangling link: ' + path)
    os.mkdir(path)
    owner_only = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
    os.chmod(path, owner_only)
1088
1089
def log_exceptions(func):
    """Decorator to log AND re-raise exceptions.

    `func` must be a coroutine function. asyncio.CancelledError is re-raised
    without logging. Any other exception is logged through the `logger`
    attribute of the first positional argument if it has one, else through
    the module-level `_logger`, and is then re-raised.
    """
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except asyncio.CancelledError:
            raise
        except BaseException as exc:
            # for methods, the first positional argument is the instance
            owner = args[0] if args else None
            log = owner.logger if hasattr(owner, 'logger') else _logger
            try:
                log.exception(f"Exception in {func.__name__}: {exc!r}")
            except BaseException as log_exc:
                # logging itself failed; fall back to stdout
                print(f"logging exception raised: {log_exc!r}... orig exc: {exc!r} in {func.__name__}")
            raise
    return wrapper
1108
1109
def ignore_exceptions(func):
    """Decorator to silently swallow all exceptions.

    `func` must be a coroutine function. If it raises an Exception, the
    wrapper returns None instead. asyncio.CancelledError always propagates.
    """
    assert asyncio.iscoroutinefunction(func), 'func needs to be a coroutine'

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            result = await func(*args, **kwargs)
        except asyncio.CancelledError:
            # note: with python 3.8, CancelledError no longer inherits Exception, so this catch is redundant
            raise
        except Exception:
            return None
        return result
    return wrapper
1123
1124
class TxMinedInfo(NamedTuple):
    """Immutable record describing the block in which a transaction was mined.

    Only `height` is required; every other field defaults to None.
    """
    height: int                        # height of block that mined tx
    conf: Optional[int] = None         # number of confirmations, SPV verified (None means unknown)
    timestamp: Optional[int] = None    # timestamp of block that mined tx
    txpos: Optional[int] = None        # position of tx in serialized block
    header_hash: Optional[str] = None  # hash of block that mined tx
1131
1132
def make_aiohttp_session(proxy: Optional[dict], headers=None, timeout=None):
    """Build an aiohttp.ClientSession, optionally going through a SOCKS proxy.

    - headers: defaults to {'User-Agent': 'Electrum'} when None.
    - timeout: None selects a total timeout of 45; an int/float is used as
      the total timeout; any other value is passed to aiohttp unchanged.
    - proxy: when truthy, a dict with keys 'mode', 'host', 'port' and
      optionally 'user' / 'password'. Mode 'socks5' selects SOCKS5, any
      other mode selects SOCKS4.

    TLS verification uses a default server-auth SSL context whose CA
    certificates are loaded from `ca_path`.
    """
    if headers is None:
        headers = {'User-Agent': 'Electrum'}
    if timeout is None or isinstance(timeout, (int, float)):
        # The default timeout is high intentionally.
        # DNS on some systems can be really slow, see e.g. #5337
        total = 45 if timeout is None else timeout
        timeout = aiohttp.ClientTimeout(total=total)
    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)

    if not proxy:
        connector = aiohttp.TCPConnector(ssl=ssl_context)
    else:
        socks_version = ProxyType.SOCKS5 if proxy['mode'] == 'socks5' else ProxyType.SOCKS4
        connector = ProxyConnector(
            proxy_type=socks_version,
            host=proxy['host'],
            port=int(proxy['port']),
            username=proxy.get('user', None),
            password=proxy.get('password', None),
            rdns=True,
            ssl=ssl_context,
        )

    return aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector)
1158
1159
class SilentTaskGroup(TaskGroup):
    """TaskGroup whose spawn() raises CancelledError once the group is closed."""

    def spawn(self, *args, **kwargs):
        # don't complain if group is already closed.
        if not self._closed:
            return super().spawn(*args, **kwargs)
        raise asyncio.CancelledError()
1167
1168
class NetworkJobOnDefaultServer(Logger, ABC):
    """An abstract base class for a job that runs on the main network
    interface. Every time the main interface changes, the job is
    restarted, and some of its internals are reset.

    Subclasses must implement `_run_tasks`.
    """
    def __init__(self, network: 'Network'):
        Logger.__init__(self)
        # Make the network's loop the current event loop of this thread.
        # NOTE(review): presumably so the asyncio primitives created below
        # get associated with that loop -- confirm.
        asyncio.set_event_loop(network.asyncio_loop)
        self.network = network
        self.interface = None  # type: Interface
        # serialises concurrent _restart() invocations
        self._restart_lock = asyncio.Lock()
        # Ensure fairness between NetworkJobs. e.g. if multiple wallets
        # are open, a large wallet's Synchronizer should not starve the small wallets:
        self._network_request_semaphore = asyncio.Semaphore(100)

        self._reset()
        # every time the main interface changes, restart:
        register_callback(self._restart, ['default_server_changed'])
        # also schedule a one-off restart now, as there might already be a main interface:
        asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)

    def _reset(self):
        """Initialise fields. Called every time the underlying
        server connection changes.
        """
        # a fresh group per connection; the previous one is cancelled in stop()
        self.taskgroup = SilentTaskGroup()

    async def _start(self, interface: 'Interface'):
        """Remember `interface` and spawn `_run_tasks` in the interface's taskgroup."""
        self.interface = interface
        await interface.taskgroup.spawn(self._run_tasks(taskgroup=self.taskgroup))

    @abstractmethod
    async def _run_tasks(self, *, taskgroup: TaskGroup) -> None:
        """Start tasks in taskgroup. Called every time the underlying
        server connection changes.

        Raises asyncio.CancelledError if `taskgroup` is no longer the
        current `self.taskgroup`.
        """
        # If self.taskgroup changed, don't start tasks. This can happen if we have
        # been restarted *just now*, i.e. after the _run_tasks coroutine object was created.
        if taskgroup != self.taskgroup:
            raise asyncio.CancelledError()

    async def stop(self, *, full_shutdown: bool = True):
        """Cancel the remaining tasks of the current taskgroup.

        With `full_shutdown` the `_restart` callback is unregistered as well.
        """
        if full_shutdown:
            unregister_callback(self._restart)
        await self.taskgroup.cancel_remaining()

    @log_exceptions
    async def _restart(self, *args):
        """Stop the current tasks, reset state and start again on the
        network's current interface. Does nothing if there is no interface.

        `*args` are accepted and ignored (the method is also registered as
        an event callback in __init__).
        """
        interface = self.network.interface
        if interface is None:
            return  # we should get called again soon

        # one restart at a time: stop -> reset -> start must not interleave
        async with self._restart_lock:
            await self.stop(full_shutdown=False)
            self._reset()
            await self._start(interface)

    @property
    def session(self):
        """Return the current interface's `client`; asserts that it is not None."""
        # ORIG: s = self.interface.session
        # TODO: libbitcoin
        s = self.interface.client
        assert s is not None
        return s
1233
1234
def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
                                           asyncio.Future,
                                           threading.Thread]:
    """Run the asyncio event loop in a new thread named 'EventLoop'.

    Returns (loop, stopping_fut, loop_thread). The thread executes
    `loop.run_until_complete(stopping_fut)`, so the loop keeps running
    until `stopping_fut` is completed. The thread is also stored on the
    loop as `loop._mythread`.

    A custom exception handler is installed on the loop; it drops messages
    that start with one of a few known SSL/read-error prefixes and defers
    to the loop's default handler for everything else.
    """
    # Compiled once here instead of on every reported exception.
    suppress_message_regex = re.compile('SSL handshake|Fatal read error on|'
                                        'SSL error in data received')

    def on_exception(loop, context):
        """Suppress spurious messages it appears we cannot control."""
        message = context.get('message')
        if message and suppress_message_regex.match(message):
            return
        loop.default_exception_handler(context)

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop for this thread (non-main thread, or a
        # Python version where get_event_loop() no longer creates one).
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    loop.set_exception_handler(on_exception)
    # loop.set_debug(1)
    # create_future() binds the future to `loop` explicitly, instead of
    # relying on the implicit "current loop" lookup done by asyncio.Future().
    stopping_fut = loop.create_future()
    loop_thread = threading.Thread(target=loop.run_until_complete,
                                   args=(stopping_fut,),
                                   name='EventLoop')
    loop_thread.start()
    loop._mythread = loop_thread
    return loop, stopping_fut, loop_thread
1257
1258
class OrderedDictWithIndex(OrderedDict):
    """An OrderedDict that keeps track of the positions of keys.

    Two lookup tables (key -> position and position -> key) are maintained
    next to the dict itself. Appending a new key updates them in place;
    every other mutation rebuilds them from scratch.

    Note: very inefficient to modify contents, except to add new items.
    """

    def __init__(self):
        super().__init__()
        self._key_to_pos = {}
        self._pos_to_key = {}

    def _recalc_index(self):
        """Rebuild both lookup tables from the current key order."""
        forward = {}
        backward = {}
        for position, key in enumerate(self.keys()):
            forward[key] = position
            backward[position] = key
        self._key_to_pos = forward
        self._pos_to_key = backward

    def pos_from_key(self, key):
        """Return the position of `key`; raises KeyError if absent."""
        return self._key_to_pos[key]

    def value_from_pos(self, pos):
        """Return the value stored at position `pos`; raises KeyError if out of range."""
        return self[self._pos_to_key[pos]]

    # --- mutators that may shift positions: delegate, then rebuild ---

    def popitem(self, *args, **kwargs):
        result = super().popitem(*args, **kwargs)
        self._recalc_index()
        return result

    def move_to_end(self, *args, **kwargs):
        result = super().move_to_end(*args, **kwargs)
        self._recalc_index()
        return result

    def clear(self):
        result = super().clear()
        self._recalc_index()
        return result

    def pop(self, *args, **kwargs):
        result = super().pop(*args, **kwargs)
        self._recalc_index()
        return result

    def update(self, *args, **kwargs):
        result = super().update(*args, **kwargs)
        self._recalc_index()
        return result

    def __delitem__(self, *args, **kwargs):
        result = super().__delitem__(*args, **kwargs)
        self._recalc_index()
        return result

    def __setitem__(self, key, *args, **kwargs):
        already_present = key in self
        result = super().__setitem__(key, *args, **kwargs)
        if not already_present:
            # a new key is always appended, so only its own entry is needed
            new_pos = len(self) - 1
            self._key_to_pos[key] = new_pos
            self._pos_to_key[new_pos] = key
        return result
1319
1320
def multisig_type(wallet_type):
    """If wallet_type is mofn multi-sig, return [m, n],
    otherwise return None.

    Only the beginning of `wallet_type` has to look like "<m>of<n>".
    """
    if not wallet_type:
        return None
    found = re.match(r'(\d+)of(\d+)', wallet_type)
    if found is None:
        return None
    m, n = found.group(1, 2)
    return [int(m), int(n)]
1330
1331
def is_ip_address(x: Union[str, bytes]) -> bool:
    """Return True if `x` is the textual form of an IPv4 or IPv6 address.

    bytes input is decoded as UTF-8 first; bytes that cannot be decoded
    are reported as "not an IP address" (False) rather than raising.
    """
    try:
        if isinstance(x, bytes):
            # Decode before parsing: ipaddress.ip_address() would interpret
            # raw bytes as a packed (binary) address, not as text.
            x = x.decode("utf-8")
        ipaddress.ip_address(x)
    except ValueError:
        # also covers UnicodeDecodeError, which is a ValueError subclass
        return False
    return True
1340
1341
def is_private_netaddress(host: str) -> bool:
    """Return True if `host` is 'localhost' or an IP address in a private range.

    - 'localhost' and 'localhost.' are always considered private.
    - A host wrapped in square brackets (IPv6 literal notation) has the
      brackets removed before parsing.
    - Anything that does not parse as an IP address (including the empty
      string) yields False.
    """
    if str(host) in ('localhost', 'localhost.',):
        return True
    # `host and ...` guards the indexing below against an empty string
    if host and host[0] == '[' and host[-1] == ']':  # IPv6
        host = host[1:-1]
    try:
        ip_addr = ipaddress.ip_address(host)  # type: Union[IPv4Address, IPv6Address]
        return ip_addr.is_private
    except ValueError:
        pass  # not an IP
    return False
1353
1354
def list_enabled_bits(x: int) -> Sequence[int]:
    """Return the positions of the set bits of `x`, least significant first.

    e.g. 77 (0b1001101) --> (0, 2, 3, 6)
    """
    digits = bin(x)[2:]
    # walk the digit string from the least significant end
    lsb_first = digits[::-1]
    return tuple(pos for pos, digit in enumerate(lsb_first) if digit == '1')
1360
1361
def resolve_dns_srv(host: str):
    """Resolve the DNS SRV records of `host`.

    Returns a list of {'host': <target as str>, 'port': <port>} dicts,
    ordered by ascending priority and, within equal priority, by
    descending weight.
    """
    answers = dns.resolver.resolve(host, 'SRV')

    def preference(record):
        # priority: prefer lower
        # weight: tie breaker; prefer higher
        return record.priority, -record.weight

    ordered = sorted(answers, key=preference)
    return [{'host': str(record.target), 'port': record.port} for record in ordered]
1374
1375
def randrange(bound: int) -> int:
    """Return a random integer k such that 1 <= k < bound, uniformly
    distributed across that range."""
    # secrets.randbelow(m) yields 0 <= r < m; with m = bound - 1 and a
    # shift by one this becomes 1 <= k < bound.
    zero_based = secrets.randbelow(bound - 1)
    return zero_based + 1
1382
1383
class CallbackManager:
    """Registry of event callbacks that are dispatched on the asyncio loop."""
    # callbacks set by the GUI or any thread
    # guarantee: the callbacks will always get triggered from the asyncio thread.

    def __init__(self):
        self.callback_lock = threading.Lock()
        self.callbacks = defaultdict(list)  # note: needs self.callback_lock
        self.asyncio_loop = None

    def register_callback(self, callback, events):
        """Subscribe `callback` to each event name in `events`."""
        with self.callback_lock:
            for event in events:
                self.callbacks[event].append(callback)

    def unregister_callback(self, callback):
        """Remove `callback` (one occurrence per event) from every event."""
        with self.callback_lock:
            for registered in self.callbacks.values():
                try:
                    registered.remove(callback)
                except ValueError:
                    pass  # not subscribed to this event

    def trigger_callback(self, event, *args):
        """Trigger a callback with given arguments.
        Can be called from any thread. The callback itself will get scheduled
        on the event loop.
        """
        loop = self.asyncio_loop
        if loop is None:
            loop = self.asyncio_loop = asyncio.get_event_loop()
        assert loop.is_running(), "event loop not running"
        # snapshot under the lock, dispatch outside of it
        with self.callback_lock:
            to_call = list(self.callbacks[event])
        for callback in to_call:
            # FIXME: if callback throws, we will lose the traceback
            if not asyncio.iscoroutinefunction(callback):
                loop.call_soon_threadsafe(callback, event, *args)
                continue
            asyncio.run_coroutine_threadsafe(callback(event, *args), loop)
1420
1421
# Module-wide CallbackManager instance. Its bound methods are exposed
# below under plain module-level names.
callback_mgr = CallbackManager()
trigger_callback = callback_mgr.trigger_callback
register_callback = callback_mgr.register_callback
unregister_callback = callback_mgr.unregister_callback
1426
1427
_NetAddrType = TypeVar("_NetAddrType")


class NetworkRetryManager(Generic[_NetAddrType]):
    """Truncated Exponential Backoff for network connections.

    For every address the time of the last attempt and the number of
    attempts are remembered. The delay before the next attempt is
    init_delay * 2**num_attempts, capped at max_delay. Two delay profiles
    exist ("normal" and "urgent"); the urgent values default to the
    normal ones.
    """

    def __init__(
            self, *,
            max_retry_delay_normal: float,
            init_retry_delay_normal: float,
            max_retry_delay_urgent: float = None,
            init_retry_delay_urgent: float = None,
    ):
        self._last_tried_addr = {}  # type: Dict[_NetAddrType, Tuple[float, int]]  # (unix ts, num_attempts)

        # note: these all use "seconds" as unit
        self._max_retry_delay_normal = max_retry_delay_normal
        self._init_retry_delay_normal = init_retry_delay_normal
        self._max_retry_delay_urgent = (
            max_retry_delay_normal if max_retry_delay_urgent is None else max_retry_delay_urgent)
        self._init_retry_delay_urgent = (
            init_retry_delay_normal if init_retry_delay_urgent is None else init_retry_delay_urgent)

    def _trying_addr_now(self, addr: _NetAddrType) -> None:
        """Record a connection attempt to `addr` happening right now."""
        _, attempts_so_far = self._last_tried_addr.get(addr, (0, 0))
        # we add up to 1 second of noise to the time, so that clients are less likely
        # to get synchronised and bombard the remote in connection waves:
        noisy_now = time.time() + random.random()
        self._last_tried_addr[addr] = noisy_now, attempts_so_far + 1

    def _on_connection_successfully_established(self, addr: _NetAddrType) -> None:
        """Reset the attempt counter of `addr` and stamp it with the current time."""
        self._last_tried_addr[addr] = time.time(), 0

    def _can_retry_addr(self, addr: _NetAddrType, *,
                        now: float = None, urgent: bool = False) -> bool:
        """Return True if the backoff delay for `addr` has fully elapsed at `now`.

        `now` defaults to the current time. Unknown addresses are treated
        as last tried at time 0 with 0 attempts.
        """
        if now is None:
            now = time.time()
        last_time, num_attempts = self._last_tried_addr.get(addr, (0, 0))
        init_delay = self._init_retry_delay_urgent if urgent else self._init_retry_delay_normal
        max_delay = self._max_retry_delay_urgent if urgent else self._max_retry_delay_normal
        delay = self.__calc_delay(multiplier=init_delay, max_delay=max_delay, num_attempts=num_attempts)
        return last_time + delay < now

    @classmethod
    def __calc_delay(cls, *, multiplier: float, max_delay: float,
                     num_attempts: int) -> float:
        """Return multiplier * 2**num_attempts, clamped to [0, max_delay]."""
        # cap the exponent so the power stays cheap to compute
        capped_attempts = min(num_attempts, 100_000)
        try:
            raw_delay = multiplier * 2 ** capped_attempts
        except OverflowError:
            # too large for a float => certainly above the cap
            return max_delay
        return max(0, min(max_delay, raw_delay))

    def _clear_addr_retry_times(self) -> None:
        """Forget all recorded attempts."""
        self._last_tried_addr.clear()
1490
1491
class MySocksProxy(aiorpcx.SOCKSProxy):
    """aiorpcx SOCKS proxy with a stream-based `open_connection` and an
    alternate constructor taking a proxy-settings dict."""

    async def open_connection(self, host=None, port=None, **kwargs):
        """Connect to host:port via `self.create_connection` and return
        an asyncio (StreamReader, StreamWriter) pair for the connection."""
        event_loop = asyncio.get_event_loop()
        stream_reader = asyncio.StreamReader(loop=event_loop)
        stream_protocol = asyncio.StreamReaderProtocol(stream_reader, loop=event_loop)

        def protocol_factory():
            return stream_protocol

        transport, _ = await self.create_connection(
            protocol_factory, host, port, **kwargs)
        stream_writer = asyncio.StreamWriter(transport, stream_protocol, stream_reader, event_loop)
        return stream_reader, stream_writer

    @classmethod
    def from_proxy_dict(cls, proxy: dict = None) -> Optional['MySocksProxy']:
        """Build a proxy object from a settings dict, or return None if `proxy` is falsy.

        Reads the keys 'host', 'port', 'mode' and optionally 'user' /
        'password'. Authentication is only used when both user and password
        are non-empty. Raises NotImplementedError for modes other than
        "socks4" and "socks5".
        """
        if not proxy:
            return None
        username, pw = proxy.get('user'), proxy.get('password')
        auth = aiorpcx.socks.SOCKSUserAuth(username, pw) if (username and pw) else None
        addr = aiorpcx.NetAddress(proxy['host'], proxy['port'])
        mode = proxy['mode']
        if mode == "socks4":
            return cls(addr, aiorpcx.socks.SOCKS4a, auth)
        if mode == "socks5":
            return cls(addr, aiorpcx.socks.SOCKS5, auth)
        raise NotImplementedError  # http proxy not available with aiorpcx
1520
1521
class JsonRPCClient:
    """Small JSON-RPC 2.0 client that POSTs each request to a fixed URL
    through the given aiohttp session."""

    def __init__(self, session: aiohttp.ClientSession, url: str):
        self.session = session
        self.url = url
        self._id = 0  # id of the most recent request; incremented per request

    async def request(self, endpoint, *args):
        """Call the remote method `endpoint` with positional params `args`.

        Returns the 'result' field of the JSON response. Failures are not
        raised: if the response carries a truthy 'error' field, or the HTTP
        status is not 200, a string starting with 'Error: ' is returned.
        """
        # TODO: libbitcoin
        self._id += 1
        # Build the payload with json.dumps rather than string formatting,
        # so that `endpoint` is always properly JSON-escaped.
        data = json.dumps({
            "jsonrpc": "2.0",
            "id": str(self._id),  # the id is sent as a string
            "method": endpoint,
            "params": args,
        })
        async with self.session.post(self.url, data=data) as resp:
            if resp.status != 200:
                text = await resp.text()
                return 'Error: ' + str(text)
            r = await resp.json()
            result = r.get('result')
            error = r.get('error')
            if error:
                return 'Error: ' + str(error)
            return result

    def add_method(self, endpoint):
        """Attach a coroutine function named `endpoint` to this instance that
        forwards its positional arguments to `request(endpoint, ...)`."""
        async def coro(*args):
            return await self.request(endpoint, *args)
        setattr(self, endpoint, coro)
1551
1552
T = TypeVar('T')

def random_shuffled_copy(x: Iterable[T]) -> List[T]:
    """Returns a shuffled copy of the input.

    The input is materialised into a new list, which is shuffled with
    `random.shuffle`; the input itself is left untouched.
    """
    shuffled = list(x)
    random.shuffle(shuffled)
    return shuffled
1560
1561
def test_read_write_permissions(path) -> None:
    """Check that `path` is readable (if it exists) and that a file next to
    it can be written and read back.

    The write test uses a separate probe file named
    "<path>.tmptest.<pid>", which is removed again afterwards.

    Raises IOError if any step fails, or if the data read back differs
    from what was written.
    """
    # note: There might already be a file at 'path'.
    #       Make sure we do NOT overwrite/corrupt that!
    temp_path = "%s.tmptest.%s" % (path, os.getpid())
    echo = "fs r/w test"
    try:
        # test READ permissions for actual path
        if os.path.exists(path):
            with open(path, "rb") as f:
                f.read(1)  # read 1 byte
        # test R/W sanity for "similar" path
        try:
            with open(temp_path, "w", encoding='utf-8') as f:
                f.write(echo)
            with open(temp_path, "r", encoding='utf-8') as f:
                echo2 = f.read()
        except Exception:
            # Best-effort cleanup so a failed probe does not leave the
            # temp file behind; the original error is what gets reported.
            try:
                os.remove(temp_path)
            except OSError:
                pass
            raise
        os.remove(temp_path)
    except Exception as e:
        raise IOError(e) from e
    if echo != echo2:
        raise IOError('echo sanity-check failed')
1582
1583
class nullcontext:
    """Context manager that does no additional processing.
    This is a ~backport of contextlib.nullcontext from Python 3.10

    Usable with both `with` and `async with`; entering yields
    `enter_result`, exiting never suppresses exceptions.
    """

    def __init__(self, enter_result=None):
        self.enter_result = enter_result

    # --- synchronous protocol ---

    def __enter__(self):
        return self.enter_result

    def __exit__(self, *excinfo):
        return None

    # --- asynchronous protocol ---

    async def __aenter__(self):
        return self.enter_result

    async def __aexit__(self, *excinfo):
        return None