ttransaction.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
ttransaction.py (87885B)
---
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2011 Thomas Voegtlin
5 #
6 # Permission is hereby granted, free of charge, to any person
7 # obtaining a copy of this software and associated documentation files
8 # (the "Software"), to deal in the Software without restriction,
9 # including without limitation the rights to use, copy, modify, merge,
10 # publish, distribute, sublicense, and/or sell copies of the Software,
11 # and to permit persons to whom the Software is furnished to do so,
12 # subject to the following conditions:
13 #
14 # The above copyright notice and this permission notice shall be
15 # included in all copies or substantial portions of the Software.
16 #
17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 # SOFTWARE.
25
26
27
28 # Note: The deserialization code originally comes from ABE.
29
30 import struct
31 import traceback
32 import sys
33 import io
34 import base64
35 from typing import (Sequence, Union, NamedTuple, Tuple, Optional, Iterable,
36 Callable, List, Dict, Set, TYPE_CHECKING)
37 from collections import defaultdict
38 from enum import IntEnum
39 import itertools
40 import binascii
41 import copy
42
43 from . import ecc, bitcoin, constants, segwit_addr, bip32
44 from .bip32 import BIP32Node
45 from .util import profiler, to_bytes, bh2u, bfh, chunks, is_hex_str
46 from .bitcoin import (TYPE_ADDRESS, TYPE_SCRIPT, hash_160,
47 hash160_to_p2sh, hash160_to_p2pkh, hash_to_segwit_addr,
48 var_int, TOTAL_COIN_SUPPLY_LIMIT_IN_BTC, COIN,
49 int_to_hex, push_script, b58_address_to_hash160,
50 opcodes, add_number_to_script, base_decode, is_segwit_script_type,
51 base_encode, construct_witness, construct_script)
52 from .crypto import sha256d
53 from .logging import get_logger
54
55 if TYPE_CHECKING:
56 from .wallet import Abstract_Wallet
57
58
# Logger named after this module.
_logger = get_logger(__name__)

# Module-wide flag, off by default.
# NOTE(review): presumably enables extra diagnostics while parsing PSBTs --
# confirm against the code that reads it (not visible in this part of the file).
DEBUG_PSBT_PARSING = False
61
62
class SerializationError(Exception):
    """Raised when serializing or deserializing data fails."""


class UnknownTxinType(Exception):
    """Raised when a script or witness cannot be built for an input's script type."""


class BadHeaderMagic(SerializationError):
    """Specialised SerializationError (raised outside this part of the file)."""


class UnexpectedEndOfStream(SerializationError):
    """Specialised SerializationError (raised outside this part of the file)."""


class PSBTInputConsistencyFailure(SerializationError):
    """Specialised SerializationError (raised outside this part of the file)."""


class MalformedBitcoinScript(Exception):
    """Raised when a script cannot be decoded into opcodes and pushes."""


class MissingTxInputAmount(Exception):
    """Plain Exception subclass (raised outside this part of the file)."""
89
90
# Signature hash type "ALL" (numeric value 1).
SIGHASH_ALL = 0x01
92
93
class TxOutput:
    """A transaction output: an amount paid to a scriptPubKey.

    `value` is in satoshis, or the string '!' when the output is set to
    "max". Equality and hashing are based on (scriptpubkey, value).
    """
    scriptpubkey: bytes
    value: Union[int, str]

    def __init__(self, *, scriptpubkey: bytes, value: Union[int, str]):
        self.scriptpubkey = scriptpubkey
        self.value = value  # str when the output is set to max: '!'  # in satoshis

    @classmethod
    def from_address_and_value(cls, address: str, value: Union[int, str]) -> Union['TxOutput', 'PartialTxOutput']:
        """Build an output paying `value` to `address`."""
        return cls(scriptpubkey=bfh(bitcoin.address_to_script(address)),
                   value=value)

    def serialize_to_network(self) -> bytes:
        """Return the network encoding: 8-byte LE value, var_int script length, script."""
        buf = int.to_bytes(self.value, 8, byteorder="little", signed=False)
        script = self.scriptpubkey
        # len(script) is the byte length; no need for a hex round-trip
        buf += bfh(var_int(len(script)))
        buf += script
        return buf

    @classmethod
    def from_network_bytes(cls, raw: bytes) -> 'TxOutput':
        """Parse exactly one network-serialized output from `raw`.

        Raises SerializationError if bytes remain after the output.
        """
        vds = BCDataStream()
        vds.write(raw)
        txout = parse_output(vds)
        if vds.can_read_more():
            raise SerializationError('extra junk at the end of TxOutput bytes')
        return txout

    def to_legacy_tuple(self) -> Tuple[int, str, Union[int, str]]:
        """Return (type, address-or-script-hex, value)."""
        addr = self.address  # decode the script only once
        if addr:
            return TYPE_ADDRESS, addr, self.value
        return TYPE_SCRIPT, self.scriptpubkey.hex(), self.value

    @classmethod
    def from_legacy_tuple(cls, _type: int, addr: str, val: Union[int, str]) -> Union['TxOutput', 'PartialTxOutput']:
        """Inverse of to_legacy_tuple(); raises Exception on an unknown `_type`."""
        if _type == TYPE_ADDRESS:
            return cls.from_address_and_value(addr, val)
        if _type == TYPE_SCRIPT:
            return cls(scriptpubkey=bfh(addr), value=val)
        raise Exception(f"unexpected legacy address type: {_type}")

    @property
    def address(self) -> Optional[str]:
        """Address for the scriptPubKey, or None if it has no address form."""
        return get_address_from_output_script(self.scriptpubkey)  # TODO cache this?

    def get_ui_address_str(self) -> str:
        """Return the address, or 'SCRIPT <hex>' when there is none."""
        addr = self.address
        if addr is not None:
            return addr
        return f"SCRIPT {self.scriptpubkey.hex()}"

    def __repr__(self):
        return f"<TxOutput script={self.scriptpubkey.hex()} address={self.address} value={self.value}>"

    def __eq__(self, other):
        if not isinstance(other, TxOutput):
            return False
        return self.scriptpubkey == other.scriptpubkey and self.value == other.value

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self) -> int:
        # Defining __eq__ alone sets __hash__ to None, which makes instances
        # unusable in sets and as dict keys; hash the fields __eq__ compares.
        return hash((self.scriptpubkey, self.value))

    def to_json(self):
        """Return a JSON-friendly dict describing this output."""
        d = {
            'scriptpubkey': self.scriptpubkey.hex(),
            'address': self.address,
            'value_sats': self.value,
        }
        return d
164
165
class BIP143SharedTxDigestFields(NamedTuple):
    """The three hashes shared by all inputs of a tx (stored as hex strings)."""
    hashPrevouts: str   # hash over all input outpoints
    hashSequence: str   # hash over all input nsequence values
    hashOutputs: str    # hash over all serialized outputs
170
171
class TxOutpoint(NamedTuple):
    """Pointer to one output of a transaction: (txid, output index)."""
    txid: bytes  # endianness same as hex string displayed; reverse of tx serialization order
    out_idx: int

    @classmethod
    def from_str(cls, s: str) -> 'TxOutpoint':
        """Parse the '<64 hex chars>:<index>' form produced by to_str()."""
        hash_str, idx_str = s.split(':')
        assert len(hash_str) == 64, f"{hash_str} should be a sha256 hash"
        txid = bfh(hash_str)
        out_idx = int(idx_str)
        return TxOutpoint(txid=txid, out_idx=out_idx)

    def to_str(self) -> str:
        """Return '<txid hex>:<index>'."""
        return "{}:{}".format(self.txid.hex(), self.out_idx)

    def to_json(self):
        """Return [txid hex, index]."""
        txid_hex = self.txid.hex()
        return [txid_hex, self.out_idx]

    def serialize_to_network(self) -> bytes:
        """Return the byte-reversed txid followed by the 4-byte index."""
        reversed_txid = self.txid[::-1]
        index_bytes = bfh(int_to_hex(self.out_idx, 4))
        return reversed_txid + index_bytes

    def is_coinbase(self) -> bool:
        """Whether the txid is 32 zero bytes."""
        return self.txid == b'\x00' * 32
194
195
class TxInput:
    """A transaction input: the outpoint being spent plus scriptSig/witness data."""
    prevout: 'TxOutpoint'
    script_sig: Optional[bytes]
    nsequence: int
    witness: Optional[bytes]
    _is_coinbase_output: bool

    def __init__(self, *,
                 prevout: 'TxOutpoint',
                 script_sig: Optional[bytes] = None,
                 nsequence: int = 0xffffffff - 1,
                 witness: Optional[bytes] = None,
                 is_coinbase_output: bool = False):
        self.prevout = prevout
        self.script_sig = script_sig
        self.nsequence = nsequence
        self.witness = witness
        self._is_coinbase_output = is_coinbase_output

    def is_coinbase_input(self) -> bool:
        """Whether this is the input of a coinbase tx."""
        return self.prevout.is_coinbase()

    def is_coinbase_output(self) -> bool:
        """Whether the coin being spent is an output of a coinbase tx.
        This matters for coin maturity.
        """
        return self._is_coinbase_output

    def value_sats(self) -> Optional[int]:
        """Amount being spent; always None in this class."""
        return None

    def to_json(self):
        """Return a JSON-friendly dict; scriptSig/witness are included only when set."""
        d = {
            'prevout_hash': self.prevout.txid.hex(),
            'prevout_n': self.prevout.out_idx,
            'coinbase': self.is_coinbase_output(),
            'nsequence': self.nsequence,
        }
        if self.script_sig is not None:
            d['scriptSig'] = self.script_sig.hex()
        if self.witness is not None:
            d['witness'] = self.witness.hex()
        return d

    def witness_elements(self) -> Sequence[bytes]:
        """Split the serialized witness into its stack elements.

        Returns an empty list when there is no witness (None or b'');
        previously that case failed inside the stream parser.
        """
        if not self.witness:
            return []
        vds = BCDataStream()
        vds.write(self.witness)
        n = vds.read_compact_size()
        return [vds.read_bytes(vds.read_compact_size()) for _ in range(n)]

    def is_segwit(self, *, guess_for_address=False) -> bool:
        """Whether a non-empty witness is present.

        `guess_for_address` is accepted but not used by this implementation.
        """
        # b'\x00' is a serialized witness with zero elements
        return self.witness not in (b'\x00', b'', None)
251
252
class BCDataStream(object):
    """Workalike python implementation of Bitcoin's CDataStream class.

    Bytes are appended with write(); the read_* methods consume them from
    `read_cursor` onwards. Read failures are reported as SerializationError.
    """

    def __init__(self):
        self.input = None  # type: Optional[bytearray]
        self.read_cursor = 0

    def clear(self):
        """Drop all buffered data and rewind the read cursor."""
        self.input = None
        self.read_cursor = 0

    def write(self, _bytes: Union[bytes, bytearray]):  # Initialize with string of _bytes
        """Append `_bytes` to the buffer."""
        assert isinstance(_bytes, (bytes, bytearray))
        if self.input is None:
            self.input = bytearray(_bytes)
        else:
            self.input += bytearray(_bytes)

    def read_string(self, encoding='ascii'):
        """Read a compact-size-prefixed string and decode it."""
        # Strings are encoded depending on length:
        # 0 to 252 :  1-byte-length followed by bytes (if any)
        # 253 to 65,535 : byte'253' 2-byte-length followed by bytes
        # 65,536 to 4,294,967,295 : byte '254' 4-byte-length followed by bytes
        # ... and the Bitcoin client is coded to understand:
        # greater than 4,294,967,295 : byte '255' 8-byte-length followed by bytes of string
        # ... but I don't think it actually handles any strings that big.
        if self.input is None:
            raise SerializationError("call write(bytes) before trying to deserialize")

        length = self.read_compact_size()

        return self.read_bytes(length).decode(encoding)

    def write_string(self, string, encoding='ascii'):
        """Write `string` with a compact-size length prefix."""
        string = to_bytes(string, encoding)
        # Length-encoded as with read-string
        self.write_compact_size(len(string))
        self.write(string)

    def read_bytes(self, length: int) -> bytes:
        """Consume and return exactly `length` bytes.

        Raises SerializationError if nothing was written yet or the read
        would run past the end of the buffer.
        """
        if self.input is None:
            raise SerializationError("call write(bytes) before trying to deserialize")
        assert length >= 0
        input_len = len(self.input)
        read_begin = self.read_cursor
        read_end = read_begin + length
        if 0 <= read_begin <= read_end <= input_len:
            result = self.input[read_begin:read_end]  # type: bytearray
            self.read_cursor += length
            return bytes(result)
        else:
            raise SerializationError('attempt to read past end of buffer')

    def write_bytes(self, _bytes: Union[bytes, bytearray], length: int):
        """Append `_bytes`, asserting that it is exactly `length` bytes long."""
        assert len(_bytes) == length, len(_bytes)
        self.write(_bytes)

    def can_read_more(self) -> bool:
        """Whether unread bytes remain."""
        if not self.input:
            return False
        return self.read_cursor < len(self.input)

    # fixed-width little-endian readers
    def read_boolean(self) -> bool: return self.read_bytes(1) != b'\x00'
    def read_int16(self): return self._read_num('<h')
    def read_uint16(self): return self._read_num('<H')
    def read_int32(self): return self._read_num('<i')
    def read_uint32(self): return self._read_num('<I')
    def read_int64(self): return self._read_num('<q')
    def read_uint64(self): return self._read_num('<Q')

    # fixed-width little-endian writers
    def write_boolean(self, val): return self.write(b'\x01' if val else b'\x00')
    def write_int16(self, val): return self._write_num('<h', val)
    def write_uint16(self, val): return self._write_num('<H', val)
    def write_int32(self, val): return self._write_num('<i', val)
    def write_uint32(self, val): return self._write_num('<I', val)
    def write_int64(self, val): return self._write_num('<q', val)
    def write_uint64(self, val): return self._write_num('<Q', val)

    def read_compact_size(self):
        """Read a compact-size integer (1, 3, 5 or 9 bytes).

        Raises SerializationError if nothing was written yet or the buffer
        is exhausted.
        """
        if self.input is None:
            # consistent with read_bytes()/read_string(); indexing None
            # would otherwise surface as a TypeError
            raise SerializationError("call write(bytes) before trying to deserialize")
        try:
            size = self.input[self.read_cursor]
            self.read_cursor += 1
            if size == 253:
                size = self._read_num('<H')
            elif size == 254:
                size = self._read_num('<I')
            elif size == 255:
                size = self._read_num('<Q')
            return size
        except IndexError as e:
            raise SerializationError("attempt to read past end of buffer") from e

    def write_compact_size(self, size):
        """Write `size` using the shortest compact-size encoding.

        Raises SerializationError for negative sizes and Exception for
        sizes that do not fit in 64 bits.
        """
        if size < 0:
            raise SerializationError("attempt to write size < 0")
        elif size < 253:
            self.write(bytes([size]))
        elif size < 2**16:
            self.write(b'\xfd')
            self._write_num('<H', size)
        elif size < 2**32:
            self.write(b'\xfe')
            self._write_num('<I', size)
        elif size < 2**64:
            self.write(b'\xff')
            self._write_num('<Q', size)
        else:
            raise Exception(f"size {size} too large for compact_size")

    def _read_num(self, format):
        """Unpack one number with struct `format` at the cursor and advance."""
        try:
            (i,) = struct.unpack_from(format, self.input, self.read_cursor)
            self.read_cursor += struct.calcsize(format)
        except Exception as e:
            raise SerializationError(e) from e
        return i

    def _write_num(self, format, num):
        """Pack `num` with struct `format` and append it."""
        s = struct.pack(format, num)
        self.write(s)
373
374
def script_GetOp(_bytes : bytes):
    """Iterate over a raw script, yielding (opcode, pushed_data, next_index).

    `pushed_data` is None for opcodes that do not push data. `next_index`
    is the offset of the byte following the current item.

    Raises MalformedBitcoinScript when a push length cannot be read or when
    a push claims more data than is left in the script (previously such a
    push was silently truncated).
    """
    i = 0
    script_len = len(_bytes)
    while i < script_len:
        vch = None
        opcode = _bytes[i]
        i += 1

        if opcode <= opcodes.OP_PUSHDATA4:
            # opcodes below OP_PUSHDATA1 encode the push length directly
            nSize = opcode
            if opcode == opcodes.OP_PUSHDATA1:
                try:
                    nSize = _bytes[i]
                except IndexError:
                    raise MalformedBitcoinScript()
                i += 1
            elif opcode == opcodes.OP_PUSHDATA2:
                try:
                    (nSize,) = struct.unpack_from('<H', _bytes, i)
                except struct.error:
                    raise MalformedBitcoinScript()
                i += 2
            elif opcode == opcodes.OP_PUSHDATA4:
                try:
                    (nSize,) = struct.unpack_from('<I', _bytes, i)
                except struct.error:
                    raise MalformedBitcoinScript()
                i += 4
            if i + nSize > script_len:
                raise MalformedBitcoinScript(
                    f"Push of data element that is larger than remaining data: "
                    f"{nSize} vs {script_len - i}")
            vch = _bytes[i:i + nSize]
            i += nSize

        yield opcode, vch, i
400
401
class OPPushDataGeneric:
    """Template placeholder that matches a data push.

    Passing `pushlen` installs it on the instance as `check_data_len`,
    replacing the class-level default check.
    """

    def __init__(self, pushlen: Callable=None):
        if pushlen is None:
            return
        self.check_data_len = pushlen

    @classmethod
    def check_data_len(cls, datalen: int) -> bool:
        # Opcodes below OP_PUSHDATA4 all just push data onto stack, and are equivalent.
        return 0 <= datalen <= opcodes.OP_PUSHDATA4

    @classmethod
    def is_instance(cls, item):
        """True for instances of this class and for this class or its subclasses."""
        if isinstance(item, cls):
            return True
        return isinstance(item, type) and issubclass(item, cls)
418
419
# Matches a push of 33 or 65 bytes.
OPPushDataPubkey = OPPushDataGeneric(lambda x: x in (33, 65))

# scriptPubKey templates: each entry is either an exact opcode or an
# OPPushDataGeneric that constrains the length of a data push.
SCRIPTPUBKEY_TEMPLATE_P2PKH = [
    opcodes.OP_DUP,
    opcodes.OP_HASH160,
    OPPushDataGeneric(lambda x: x == 20),
    opcodes.OP_EQUALVERIFY,
    opcodes.OP_CHECKSIG,
]
SCRIPTPUBKEY_TEMPLATE_P2SH = [
    opcodes.OP_HASH160,
    OPPushDataGeneric(lambda x: x == 20),
    opcodes.OP_EQUAL,
]
SCRIPTPUBKEY_TEMPLATE_WITNESS_V0 = [
    opcodes.OP_0,
    OPPushDataGeneric(lambda x: x in (20, 32)),
]
SCRIPTPUBKEY_TEMPLATE_P2WPKH = [
    opcodes.OP_0,
    OPPushDataGeneric(lambda x: x == 20),
]
SCRIPTPUBKEY_TEMPLATE_P2WSH = [
    opcodes.OP_0,
    OPPushDataGeneric(lambda x: x == 32),
]
429
430
def match_script_against_template(script, template) -> bool:
    """Returns whether 'script' matches 'template'.

    `script` may be raw bytes (decoded here) or an already decoded list of
    script_GetOp() items. None and undecodable scripts do not match.
    """
    if script is None:
        return False
    # optionally decode script now:
    if isinstance(script, (bytes, bytearray)):
        try:
            script = list(script_GetOp(script))
        except MalformedBitcoinScript:
            return False
    if len(script) != len(template):
        return False
    for template_item, script_item in zip(template, script):
        opcode = script_item[0]
        # a push placeholder matches when it accepts this push length
        if OPPushDataGeneric.is_instance(template_item) and template_item.check_data_len(opcode):
            continue
        if template_item != opcode:
            return False
    return True
451
def get_script_type_from_output_script(_bytes: bytes) -> Optional[str]:
    """Return 'p2pkh', 'p2sh', 'p2wpkh' or 'p2wsh' for a scriptPubKey.

    Returns None for None input, undecodable scripts and unrecognised
    script shapes.
    """
    if _bytes is None:
        return None
    try:
        ops = list(script_GetOp(_bytes))
    except MalformedBitcoinScript:
        return None
    # checked in this order; the first matching template wins
    known_templates = (
        (SCRIPTPUBKEY_TEMPLATE_P2PKH, 'p2pkh'),
        (SCRIPTPUBKEY_TEMPLATE_P2SH, 'p2sh'),
        (SCRIPTPUBKEY_TEMPLATE_P2WPKH, 'p2wpkh'),
        (SCRIPTPUBKEY_TEMPLATE_P2WSH, 'p2wsh'),
    )
    for template, script_type in known_templates:
        if match_script_against_template(ops, template):
            return script_type
    return None
468
def get_address_from_output_script(_bytes: bytes, *, net=None) -> Optional[str]:
    """Return the address encoded by a scriptPubKey, or None.

    Recognises p2pkh, p2sh, witness v0 and witness v1-16 programs. `net`
    is forwarded to the address encoders.
    """
    try:
        ops = list(script_GetOp(_bytes))
    except MalformedBitcoinScript:
        return None

    # p2pkh
    if match_script_against_template(ops, SCRIPTPUBKEY_TEMPLATE_P2PKH):
        return hash160_to_p2pkh(ops[2][1], net=net)

    # p2sh
    if match_script_against_template(ops, SCRIPTPUBKEY_TEMPLATE_P2SH):
        return hash160_to_p2sh(ops[1][1], net=net)

    # segwit address (version 0)
    if match_script_against_template(ops, SCRIPTPUBKEY_TEMPLATE_WITNESS_V0):
        return hash_to_segwit_addr(ops[1][1], witver=0, net=net)

    # segwit address (version 1-16)
    version_opcodes = range(opcodes.OP_1, opcodes.OP_16 + 1)
    for witver, opcode in enumerate(version_opcodes, start=1):
        template = [opcode, OPPushDataGeneric(lambda x: 2 <= x <= 40)]
        if match_script_against_template(ops, template):
            return hash_to_segwit_addr(ops[1][1], witver=witver, net=net)

    return None
495
496
def parse_input(vds: BCDataStream) -> TxInput:
    """Read one transaction input (without witness) from `vds`."""
    # the txid is stored byte-reversed on the wire
    txid = vds.read_bytes(32)[::-1]
    out_idx = vds.read_uint32()
    outpoint = TxOutpoint(txid=txid, out_idx=out_idx)
    script_len = vds.read_compact_size()
    script_sig = vds.read_bytes(script_len)
    nsequence = vds.read_uint32()
    return TxInput(prevout=outpoint, script_sig=script_sig, nsequence=nsequence)
504
505
def parse_witness(vds: BCDataStream, txin: TxInput) -> None:
    """Read one input's witness stack from `vds` and store it on `txin`."""
    count = vds.read_compact_size()
    elements = [vds.read_bytes(vds.read_compact_size()) for _ in range(count)]
    txin.witness = bfh(construct_witness(elements))
510
511
def parse_output(vds: BCDataStream) -> TxOutput:
    """Read one transaction output from `vds`.

    Raises SerializationError if the amount is negative or exceeds the
    total coin supply.
    """
    amount = vds.read_int64()
    max_amount = TOTAL_COIN_SUPPLY_LIMIT_IN_BTC * COIN
    if amount > max_amount:
        raise SerializationError('invalid output amount (too large)')
    if amount < 0:
        raise SerializationError('invalid output amount (negative)')
    script_len = vds.read_compact_size()
    script = vds.read_bytes(script_len)
    return TxOutput(value=amount, scriptpubkey=script)
520
521
522 # pay & redeem scripts
523
def multisig_script(public_keys: Sequence[str], m: int) -> str:
    """Build an m-of-n OP_CHECKMULTISIG script over `public_keys` (n <= 15)."""
    n = len(public_keys)
    assert 1 <= m <= n <= 15, f'm {m}, n {n}'
    items = [m]
    items.extend(public_keys)
    items.append(n)
    items.append(opcodes.OP_CHECKMULTISIG)
    return construct_script(items)
528
529
530
531
532 class Transaction:
533 _cached_network_ser: Optional[str]
534
535 def __str__(self):
536 return self.serialize()
537
538 def __init__(self, raw):
539 if raw is None:
540 self._cached_network_ser = None
541 elif isinstance(raw, str):
542 self._cached_network_ser = raw.strip() if raw else None
543 assert is_hex_str(self._cached_network_ser)
544 elif isinstance(raw, (bytes, bytearray)):
545 self._cached_network_ser = bh2u(raw)
546 else:
547 raise Exception(f"cannot initialize transaction from {raw}")
548 self._inputs = None # type: List[TxInput]
549 self._outputs = None # type: List[TxOutput]
550 self._locktime = 0
551 self._version = 2
552
553 self._cached_txid = None # type: Optional[str]
554
@property
def locktime(self):
    """The tx locktime; reading it triggers deserialize() first."""
    self.deserialize()
    return self._locktime

@locktime.setter
def locktime(self, value: int):
    # setting a new locktime makes any cached serialization/txid stale
    assert isinstance(value, int), f"locktime must be int, not {value!r}"
    self._locktime = value
    self.invalidate_ser_cache()
565
@property
def version(self):
    """The tx version; reading it triggers deserialize() first."""
    self.deserialize()
    return self._version

@version.setter
def version(self, value):
    # setting a new version makes any cached serialization/txid stale
    self._version = value
    self.invalidate_ser_cache()
575
def to_json(self) -> dict:
    """Return version, locktime, inputs and outputs as a JSON-friendly dict."""
    return {
        'version': self.version,
        'locktime': self.locktime,
        'inputs': [txin.to_json() for txin in self.inputs()],
        'outputs': [txout.to_json() for txout in self.outputs()],
    }
584
def inputs(self) -> Sequence['TxInput']:
    """Return the tx inputs, deserializing first if they are not loaded yet."""
    needs_parsing = self._inputs is None
    if needs_parsing:
        self.deserialize()
    return self._inputs
589
def outputs(self) -> Sequence['TxOutput']:
    """Return the tx outputs, deserializing first if they are not loaded yet."""
    needs_parsing = self._outputs is None
    if needs_parsing:
        self.deserialize()
    return self._outputs
594
def deserialize(self) -> None:
    """Parse the cached raw serialization into inputs, outputs, version and locktime.

    Does nothing when there is no cached serialization or when the inputs
    are already populated. Raises SerializationError on malformed data and
    ValueError on an invalid segwit marker byte.
    """
    if self._cached_network_ser is None or self._inputs is not None:
        return

    stream = BCDataStream()
    stream.write(bfh(self._cached_network_ser))
    self._version = stream.read_int32()
    n_vin = stream.read_compact_size()
    # an input count of zero introduces the segwit serialization format
    is_segwit = (n_vin == 0)
    if is_segwit:
        marker = stream.read_bytes(1)
        if marker != b'\x01':
            raise ValueError('invalid txn marker byte: {}'.format(marker))
        n_vin = stream.read_compact_size()
    if n_vin < 1:
        raise SerializationError('tx needs to have at least 1 input')
    self._inputs = [parse_input(stream) for _ in range(n_vin)]
    n_vout = stream.read_compact_size()
    if n_vout < 1:
        raise SerializationError('tx needs to have at least 1 output')
    self._outputs = [parse_output(stream) for _ in range(n_vout)]
    if is_segwit:
        # witnesses follow the outputs, one stack per input
        for txin in self._inputs:
            parse_witness(stream, txin)
    self._locktime = stream.read_uint32()
    if stream.can_read_more():
        raise SerializationError('extra junk at the end')
625
@classmethod
def get_siglist(cls, txin: 'PartialTxInput', *, estimate_size=False):
    """Return (pubkeys, signatures) for `txin` as two lists of hex strings.

    With `estimate_size`, placeholder values of plausible length are
    returned instead of real data. Coinbase inputs yield two empty lists.
    """
    if txin.is_coinbase_input():
        return [], []

    if not estimate_size:
        pk_list = [pubkey.hex() for pubkey in txin.pubkeys]
        sig_list = [txin.part_sigs.get(pubkey, b'').hex() for pubkey in txin.pubkeys]
        if txin.is_complete():
            # drop the empty placeholders of pubkeys that did not sign
            sig_list = [sig for sig in sig_list if sig]
        return pk_list, sig_list

    try:
        pubkey_size = len(txin.pubkeys[0])
    except IndexError:
        pubkey_size = 33  # guess it is compressed
    num_pubkeys = max(1, len(txin.pubkeys))
    pk_list = ["00" * pubkey_size] * num_pubkeys
    num_sig = max(1, txin.num_sig)
    # we guess that signatures will be 72 bytes long
    # note: DER-encoded ECDSA signatures are 71 or 72 bytes in practice
    #       See https://bitcoin.stackexchange.com/questions/77191/what-is-the-maximum-size-of-a-der-encoded-ecdsa-signature
    #       We assume low S (as that is a bitcoin standardness rule).
    #       We do not assume low R (even though the sigs we create conform), as external sigs,
    #       e.g. from a hw signer cannot be expected to have a low R.
    sig_list = ["00" * 72] * num_sig
    return pk_list, sig_list
652
@classmethod
def serialize_witness(cls, txin: 'TxInput', *, estimate_size=False) -> str:
    """Return the serialized witness of `txin` as hex.

    An already-set witness is returned as is and coinbase inputs yield ''.
    Otherwise the witness is built from the partial input's data.
    Raises UnknownTxinType for unsupported script types.
    """
    if txin.witness is not None:
        return txin.witness.hex()
    if txin.is_coinbase_input():
        return ''
    assert isinstance(txin, PartialTxInput)

    script_type = txin.script_type
    if not txin.is_segwit():
        return construct_witness([])

    if estimate_size and script_type in ('address', 'unknown'):
        script_type = cls.guess_txintype_from_address(txin.address)
    pubkeys, sig_list = cls.get_siglist(txin, estimate_size=estimate_size)
    if script_type in ('p2wpkh', 'p2wpkh-p2sh'):
        return construct_witness([sig_list[0], pubkeys[0]])
    if script_type in ('p2wsh', 'p2wsh-p2sh'):
        witness_script = multisig_script(pubkeys, txin.num_sig)
        return construct_witness([0, *sig_list, witness_script])
    if script_type in ('p2pk', 'p2pkh', 'p2sh'):
        return construct_witness([])
    raise UnknownTxinType(f'cannot construct witness for txin_type: {script_type}')
676
@classmethod
def guess_txintype_from_address(cls, addr: Optional[str]) -> str:
    """Guess a script type for `addr`; raises Exception if it is unrecognized."""
    # It's not possible to tell the script type in general
    # just from an address.
    # - "1" addresses are of course p2pkh
    # - "3" addresses are p2sh but we don't know the redeem script..
    # - "bc1" addresses (if they are 42-long) are p2wpkh
    # - "bc1" addresses that are 62-long are p2wsh but we don't know the script..
    # If we don't know the script, we _guess_ it is pubkeyhash.
    # As this method is used e.g. for tx size estimation,
    # the estimation will not be precise.
    if addr is None:
        return 'p2wpkh'
    _witver, witprog = segwit_addr.decode(constants.net.SEGWIT_HRP, addr)
    if witprog is not None:
        return 'p2wpkh'
    addrtype, _hash160 = b58_address_to_hash160(addr)
    if addrtype == constants.net.ADDRTYPE_P2PKH:
        return 'p2pkh'
    if addrtype == constants.net.ADDRTYPE_P2SH:
        return 'p2wpkh-p2sh'
    raise Exception(f'unrecognized address: {repr(addr)}')
699
@classmethod
def input_script(cls, txin: 'TxInput', *, estimate_size=False) -> str:
    """Return the scriptSig of `txin` as hex.

    An already-set script_sig is returned as is and coinbase inputs yield
    ''. Otherwise the scriptSig is built from the partial input's data.
    Raises UnknownTxinType for unsupported script types.
    """
    if txin.script_sig is not None:
        return txin.script_sig.hex()
    if txin.is_coinbase_input():
        return ''
    assert isinstance(txin, PartialTxInput)

    if txin.is_p2sh_segwit() and txin.redeem_script:
        return construct_script([txin.redeem_script])
    if txin.is_native_segwit():
        return ''

    script_type = txin.script_type
    pubkeys, sig_list = cls.get_siglist(txin, estimate_size=estimate_size)
    if estimate_size and script_type in ('address', 'unknown'):
        script_type = cls.guess_txintype_from_address(txin.address)

    if script_type == 'p2pk':
        return construct_script([sig_list[0]])
    if script_type == 'p2sh':
        # put op_0 before script
        redeem_script = multisig_script(pubkeys, txin.num_sig)
        return construct_script([0, *sig_list, redeem_script])
    if script_type == 'p2pkh':
        return construct_script([sig_list[0], pubkeys[0]])
    if script_type in ('p2wpkh', 'p2wsh'):
        return ''
    if script_type == 'p2wpkh-p2sh':
        redeem_script = bitcoin.p2wpkh_nested_script(pubkeys[0])
        return construct_script([redeem_script])
    if script_type == 'p2wsh-p2sh':
        witness_script = '' if estimate_size else cls.get_preimage_script(txin)
        redeem_script = bitcoin.p2wsh_nested_script(witness_script)
        return construct_script([redeem_script])
    raise UnknownTxinType(f'cannot construct scriptSig for txin_type: {script_type}')
738
@classmethod
def get_preimage_script(cls, txin: 'PartialTxInput') -> str:
    """Return, as hex, the script to use in the signature preimage for `txin`.

    An explicit witness script is preferred, then the redeem script of a
    non-segwit input; otherwise the script is derived from the input's
    script type and pubkeys. Scripts containing OP_CODESEPARATOR are
    rejected with Exception; unsupported script types raise UnknownTxinType.
    """
    if txin.witness_script:
        used_opcodes = {item[0] for item in script_GetOp(txin.witness_script)}
        if opcodes.OP_CODESEPARATOR in used_opcodes:
            raise Exception('OP_CODESEPARATOR black magic is not supported')
        return txin.witness_script.hex()
    if not txin.is_segwit() and txin.redeem_script:
        used_opcodes = {item[0] for item in script_GetOp(txin.redeem_script)}
        if opcodes.OP_CODESEPARATOR in used_opcodes:
            raise Exception('OP_CODESEPARATOR black magic is not supported')
        return txin.redeem_script.hex()

    pubkeys = [pk.hex() for pk in txin.pubkeys]
    script_type = txin.script_type
    if script_type in ('p2sh', 'p2wsh', 'p2wsh-p2sh'):
        return multisig_script(pubkeys, txin.num_sig)
    if script_type in ('p2pkh', 'p2wpkh', 'p2wpkh-p2sh'):
        pkh = bh2u(hash_160(bfh(pubkeys[0])))
        return bitcoin.pubkeyhash_to_p2pkh_script(pkh)
    if script_type == 'p2pk':
        return bitcoin.public_key_to_p2pk_script(pubkeys[0])
    raise UnknownTxinType(f'cannot construct preimage_script for txin_type: {txin.script_type}')
762
@classmethod
def serialize_input(cls, txin: TxInput, script: str) -> str:
    """Return the hex serialization of `txin` using `script` (hex) as its scriptSig."""
    parts = [
        # Prev hash and index
        txin.prevout.serialize_to_network().hex(),
        # Script length, script, sequence
        var_int(len(script) // 2),
        script,
        int_to_hex(txin.nsequence, 4),
    ]
    return ''.join(parts)
772
def _calc_bip143_shared_txdigest_fields(self) -> BIP143SharedTxDigestFields:
    """Compute the three double-SHA256 digests that are shared by all inputs."""
    txins = self.inputs()
    txouts = self.outputs()
    prevouts_ser = b''.join(txin.prevout.serialize_to_network() for txin in txins)
    sequences_ser = bfh(''.join(int_to_hex(txin.nsequence, 4) for txin in txins))
    outputs_ser = b''.join(o.serialize_to_network() for o in txouts)
    return BIP143SharedTxDigestFields(hashPrevouts=bh2u(sha256d(prevouts_ser)),
                                      hashSequence=bh2u(sha256d(sequences_ser)),
                                      hashOutputs=bh2u(sha256d(outputs_ser)))
782
def is_segwit(self, *, guess_for_address=False):
    """Whether at least one input reports itself as segwit."""
    for txin in self.inputs():
        if txin.is_segwit(guess_for_address=guess_for_address):
            return True
    return False
786
def invalidate_ser_cache(self):
    """Forget the cached serialization and the cached txid."""
    self._cached_network_ser = self._cached_txid = None
790
def serialize(self) -> str:
    """Return the hex serialization, computing and caching it if needed."""
    cached = self._cached_network_ser
    if cached:
        return cached
    self._cached_network_ser = self.serialize_to_network(estimate_size=False, include_sigs=True)
    return self._cached_network_ser
795
def serialize_as_bytes(self) -> bytes:
    """Return serialize() converted from hex to raw bytes."""
    hex_ser = self.serialize()
    return bfh(hex_ser)
798
def serialize_to_network(self, *, estimate_size=False, include_sigs=True, force_legacy=False) -> str:
    """Serialize the transaction as used on the Bitcoin network, into hex.
    `include_sigs` signals whether to include scriptSigs and witnesses.
    `force_legacy` signals to use the pre-segwit format
    note: (not include_sigs) implies force_legacy
    """
    self.deserialize()
    nVersion = int_to_hex(self.version, 4)
    nLocktime = int_to_hex(self.locktime, 4)
    inputs = self.inputs()
    outputs = self.outputs()

    serialized_inputs = []
    for txin in inputs:
        # scriptSigs are left empty when signatures are not wanted
        script_sig = self.input_script(txin, estimate_size=estimate_size) if include_sigs else ''
        serialized_inputs.append(self.serialize_input(txin, script_sig))
    txins = var_int(len(inputs)) + ''.join(serialized_inputs)
    txouts = var_int(len(outputs)) + ''.join(o.serialize_to_network().hex() for o in outputs)

    # when estimating, inputs may guess their segwit-ness from the address
    if estimate_size:
        use_segwit_ser = self.is_segwit(guess_for_address=True)
    else:
        use_segwit_ser = self.is_segwit()

    if not (include_sigs and not force_legacy and use_segwit_ser):
        return nVersion + txins + txouts + nLocktime

    marker = '00'
    flag = '01'
    witness = ''.join(self.serialize_witness(x, estimate_size=estimate_size) for x in inputs)
    return nVersion + marker + flag + txins + txouts + witness + nLocktime
829
def to_qr_data(self) -> str:
    """Returns tx as data to be put into a QR code. No side-effects."""
    # work on a copy, since the conversion below mutates the tx
    tx_copy = copy.deepcopy(self)
    if isinstance(tx_copy, PartialTransaction):
        # this makes QR codes a lot smaller (or just possible in the first place!)
        tx_copy.convert_all_utxos_to_witness_utxos()
    raw = tx_copy.serialize_as_bytes()
    return base_encode(raw, base=43)
838
def txid(self) -> Optional[str]:
    """Return the txid as hex (cached), or None when it cannot be determined.

    None is returned when the tx is incomplete and not every input is
    segwit, or when a scriptSig cannot be constructed.
    """
    if self._cached_txid is not None:
        return self._cached_txid

    self.deserialize()
    all_segwit = all(txin.is_segwit() for txin in self.inputs())
    if not (all_segwit or self.is_complete()):
        return None
    try:
        ser = self.serialize_to_network(force_legacy=True)
    except UnknownTxinType:
        # we might not know how to construct scriptSig for some scripts
        return None
    self._cached_txid = bh2u(sha256d(bfh(ser))[::-1])
    return self._cached_txid
852
def wtxid(self) -> Optional[str]:
    """Return the hash of the full serialization as hex, or None.

    None is returned for incomplete transactions and when a scriptSig or
    witness cannot be constructed.
    """
    self.deserialize()
    if not self.is_complete():
        return None
    try:
        full_ser = self.serialize_to_network()
    except UnknownTxinType:
        # we might not know how to construct scriptSig/witness for some scripts
        return None
    digest = sha256d(bfh(full_ser))
    return bh2u(digest[::-1])
863
def add_info_from_wallet(self, wallet: 'Abstract_Wallet', **kwargs) -> None:
    """Does nothing in this class; all arguments are ignored."""
    return None  # no-op
866
def is_final(self) -> bool:
    """Whether RBF is disabled."""
    # final means every input's nsequence is at least 0xfffffffe
    threshold = 0xffffffff - 1
    return all(txin.nsequence >= threshold for txin in self.inputs())
870
def estimated_size(self):
    """Return an estimated virtual tx size in vbytes.
    BIP-0141 defines 'Virtual transaction size' to be weight/4 rounded up.
    This definition is only for humans, and has little meaning otherwise.
    If we wanted sub-byte precision, fee calculation should use transaction
    weights, but for simplicity we approximate that with (virtual_size)x4
    """
    return self.virtual_size_from_weight(self.estimated_weight())
880
@classmethod
def estimated_input_weight(cls, txin, is_segwit_tx):
    '''Return an estimate of serialized input weight in weight units.'''
    script = cls.input_script(txin, estimate_size=True)
    input_size = len(cls.serialize_input(txin, script)) // 2

    if txin.is_segwit(guess_for_address=True):
        witness_size = len(cls.serialize_witness(txin, estimate_size=True)) // 2
    elif is_segwit_tx:
        # a non-segwit input in a segwit tx still has a 1-byte empty witness
        witness_size = 1
    else:
        witness_size = 0

    # non-witness bytes count four times, witness bytes once
    return 4 * input_size + witness_size
893
@classmethod
def estimated_output_size_for_address(cls, address: str) -> int:
    """Return an estimate of serialized output size in bytes."""
    return cls.estimated_output_size_for_script(bitcoin.address_to_script(address))
899
@classmethod
def estimated_output_size_for_script(cls, script: str) -> int:
    """Return an estimate of serialized output size in bytes."""
    value_size = 8
    script_size = len(script) // 2  # `script` is hex
    length_prefix_size = len(var_int(script_size)) // 2
    # 8 byte value + varint script len + script
    return value_size + length_prefix_size + script_size
907
@classmethod
def virtual_size_from_weight(cls, weight):
    """Convert a weight to a virtual size: weight divided by four, rounded up."""
    whole, remainder = divmod(weight, 4)
    # a non-zero remainder bumps the result up by one
    return whole + (remainder > 0)
911
@classmethod
def satperbyte_from_satperkw(cls, feerate_kw):
    """Convert a feerate given in sat/kw into sat/vbyte."""
    scaled = feerate_kw * 4
    return scaled / 1000
916
def estimated_total_size(self):
    """Return the estimated total size of the transaction, in bytes.

    The cached network serialization is used when the transaction is
    complete and a cache exists; otherwise a size-estimating
    serialization is produced.
    """
    if self.is_complete() and self._cached_network_ser is not None:
        hex_ser = self._cached_network_ser  # ASCII hex string
    else:
        hex_ser = self.serialize_to_network(estimate_size=True)
    return len(hex_ser) // 2
923
def estimated_witness_size(self):
    """Return the estimated size of the witness data, in bytes.

    Returns 0 when the transaction is not considered segwit. Witness
    sizes are estimated whenever the transaction is not complete.
    """
    needs_estimate = not self.is_complete()
    if self.is_segwit(guess_for_address=needs_estimate):
        witness_hex = ''.join(
            self.serialize_witness(txin, estimate_size=needs_estimate)
            for txin in self.inputs()
        )
        # the two extra bytes are the marker and the flag
        return len(witness_hex) // 2 + 2
    return 0
933
def estimated_base_size(self):
    """Return the estimated base size in bytes: total size minus witness size."""
    total_bytes = self.estimated_total_size()
    witness_bytes = self.estimated_witness_size()
    return total_bytes - witness_bytes
937
def estimated_weight(self):
    """Return the estimated transaction weight: 3 * base size + total size."""
    total_bytes = self.estimated_total_size()
    base_bytes = self.estimated_base_size()
    return total_bytes + 3 * base_bytes
943
def is_complete(self) -> bool:
    """Always return True in this implementation."""
    return True
946
def get_output_idxs_from_scriptpubkey(self, script: str) -> Set[int]:
    """Return the set of indices of the outputs whose script equals script (hex).

    A script -> indices mapping is built on first use and stored on the
    instance. Note: it can become stale and return incorrect data if the
    tx is modified later; that's out of scope.
    """
    assert isinstance(script, str)  # hex
    if not hasattr(self, '_script_to_output_idx'):
        idxs_by_script = defaultdict(set)
        for position, txout in enumerate(self.outputs()):
            script_hex = txout.scriptpubkey.hex()
            assert isinstance(script_hex, str)
            idxs_by_script[script_hex].add(position)
        self._script_to_output_idx = idxs_by_script
    matching = self._script_to_output_idx[script]
    return set(matching)  # copy, so callers cannot mutate the cache
961
def get_output_idxs_from_address(self, addr: str) -> Set[int]:
    """Return the set of indices of the outputs that pay to addr.

    The address is converted to its script and looked up with
    get_output_idxs_from_scriptpubkey().
    """
    return self.get_output_idxs_from_scriptpubkey(bitcoin.address_to_script(addr))
965
def output_value_for_address(self, addr):
    """Return the value of the first output whose address is addr.

    Assumes exactly one output has that address. Raises Exception if no
    output matches.
    """
    for txout in self.outputs():
        if txout.address == addr:
            return txout.value
    raise Exception('output not found', addr)
973
def get_input_idx_that_spent_prevout(self, prevout: TxOutpoint) -> Optional[int]:
    """Return the index of the input spending prevout, or None if there is none.

    A prevout -> index mapping is built on first use and stored on the
    instance. Note: it can become stale and return incorrect data if the
    tx is modified later; that's out of scope.
    """
    if not hasattr(self, '_prevout_to_input_idx'):
        idx_by_prevout = {
            txin.prevout: position
            for position, txin in enumerate(self.inputs())
        }  # type: Dict[TxOutpoint, int]
        self._prevout_to_input_idx = idx_by_prevout
    found = self._prevout_to_input_idx.get(prevout)
    if found is None:
        return None
    # sanity check of the cached index against the current inputs
    assert self.inputs()[found].prevout == prevout
    return found
987
988
def convert_raw_tx_to_hex(raw: Union[str, bytes]) -> str:
    """Sanitize tx-describing input (hex/base43/base64) into a raw tx hex string.

    The encodings are tried in order: hex, base43, base64 (only for input
    starting with the base64 PSBT prefix), and finally raw bytes.

    Raises ValueError if raw is empty or no encoding matches.
    """
    if not raw:
        raise ValueError("empty string")
    raw_unstripped = raw
    raw = raw.strip()
    # try hex
    # note: decoding failures are caught with "except Exception" rather than a
    #       bare "except", so KeyboardInterrupt/SystemExit are not swallowed
    try:
        return binascii.unhexlify(raw).hex()
    except Exception:
        pass
    # try base43
    try:
        return base_decode(raw, base=43).hex()
    except Exception:
        pass
    # try base64
    if raw[0:6] in ('cHNidP', b'cHNidP'):  # base64 psbt
        try:
            return base64.b64decode(raw).hex()
        except Exception:
            pass
    # raw bytes (do not strip whitespaces in this case)
    if isinstance(raw_unstripped, bytes):
        return raw_unstripped.hex()
    raise ValueError(f"failed to recognize transaction encoding for txt: {raw[:30]}...")
1016
1017
def tx_from_any(raw: Union[str, bytes], *,
                deserialize: bool = True) -> Union['PartialTransaction', 'Transaction']:
    """Build a transaction object from any supported textual/binary encoding.

    The input is first normalised to hex. It is then parsed as a PSBT; if
    the PSBT magic is missing, it is parsed as a regular Transaction, which
    is deserialized right away unless deserialize is False.

    Raises SerializationError for old-format partial transactions, and when
    the data cannot be parsed as a transaction.
    """
    hex_tx = convert_raw_tx_to_hex(bytes(raw) if isinstance(raw, bytearray) else raw)
    try:
        return PartialTransaction.from_raw_psbt(hex_tx)
    except BadHeaderMagic:
        # not a PSBT; detect the partial tx format of old Electrum versions
        is_legacy_partial_tx = hex_tx[:10] == b'EPTF\xff'.hex()
        if is_legacy_partial_tx:
            raise SerializationError("Partial transactions generated with old Electrum versions "
                                     "(< 4.0) are no longer supported. Please upgrade Electrum on "
                                     "the other machine where this transaction was created.")
    try:
        tx = Transaction(hex_tx)
        if deserialize:
            tx.deserialize()
    except Exception as e:
        raise SerializationError(f"Failed to recognise tx encoding, or to parse transaction. "
                                 f"raw: {hex_tx[:30]}...") from e
    return tx
1038
1039
class PSBTGlobalType(IntEnum):
    """Key types recognised in the global section of a PSBT."""
    UNSIGNED_TX = 0x00
    XPUB = 0x01
    VERSION = 0xFB
1044
1045
class PSBTInputType(IntEnum):
    """Key types recognised in the per-input sections of a PSBT."""
    NON_WITNESS_UTXO = 0x00
    WITNESS_UTXO = 0x01
    PARTIAL_SIG = 0x02
    SIGHASH_TYPE = 0x03
    REDEEM_SCRIPT = 0x04
    WITNESS_SCRIPT = 0x05
    BIP32_DERIVATION = 0x06
    FINAL_SCRIPTSIG = 0x07
    FINAL_SCRIPTWITNESS = 0x08
1056
1057
class PSBTOutputType(IntEnum):
    """Key types recognised in the per-output sections of a PSBT."""
    REDEEM_SCRIPT = 0x00
    WITNESS_SCRIPT = 0x01
    BIP32_DERIVATION = 0x02
1062
1063
1064 # Serialization/deserialization tools
def deser_compact_size(f) -> Optional[int]:
    """Read one compact-size encoded unsigned integer from the binary stream f.

    A first byte other than 253/254/255 is the value itself. The bytes
    253, 254 and 255 announce a little-endian integer of 2, 4 and 8 bytes
    respectively.

    Returns None if the stream ends before a complete integer could be
    read. This includes a stream that is cut off after the prefix byte,
    which is reported as end of stream rather than as a struct.error.
    """
    try:
        nit = f.read(1)[0]
    except IndexError:
        return None  # end of file

    # prefix byte -> (struct format, payload width) of the integer that follows
    wide_formats = {253: ("<H", 2), 254: ("<I", 4), 255: ("<Q", 8)}
    if nit not in wide_formats:
        return nit
    fmt, width = wide_formats[nit]
    payload = f.read(width)
    if len(payload) < width:
        return None  # end of file, in the middle of the integer
    return struct.unpack(fmt, payload)[0]
1078
1079
class PSBTSection:
    """Base class for objects stored as one key-value section of a PSBT.

    Subclasses implement parse_psbt_section_kv() and
    serialize_psbt_section_kvs(); this class provides the stream plumbing.
    """

    def _populate_psbt_fields_from_fd(self, fd=None):
        """Read key-value pairs from fd until the section ends, parsing each one.

        Does nothing if fd is falsy.
        """
        if fd:
            while True:
                try:
                    kv = self.get_next_kv_from_fd(fd)
                except StopIteration:
                    # section separator reached
                    return
                self.parse_psbt_section_kv(*kv)

    @classmethod
    def get_next_kv_from_fd(cls, fd) -> Tuple[int, bytes, bytes]:
        """Read the next (key_type, key, val) triple from fd.

        Raises StopIteration when a key size of zero is read, and
        UnexpectedEndOfStream when a size cannot be read.
        """
        size_of_key = deser_compact_size(fd)
        if size_of_key == 0:
            raise StopIteration()
        if size_of_key is None:
            raise UnexpectedEndOfStream()
        key_type, key = cls.get_keytype_and_key_from_fullkey(fd.read(size_of_key))

        size_of_val = deser_compact_size(fd)
        if size_of_val is None:
            raise UnexpectedEndOfStream()
        return key_type, key, fd.read(size_of_val)

    @classmethod
    def create_psbt_writer(cls, fd):
        """Return a function that writes one key-value pair to fd."""
        def write_kv(key_type: int, val: bytes, key: bytes = b''):
            full_key = cls.get_fullkey_from_keytype_and_key(key_type, key)
            key_size = bytes.fromhex(var_int(len(full_key)))
            fd.write(key_size)
            fd.write(full_key)
            val_size = bytes.fromhex(var_int(len(val)))
            fd.write(val_size)
            fd.write(val)
        return write_kv

    @classmethod
    def get_keytype_and_key_from_fullkey(cls, full_key: bytes) -> Tuple[int, bytes]:
        """Split full_key into its compact-size key type and the remaining key bytes.

        Raises UnexpectedEndOfStream if no key type can be read.
        """
        with io.BytesIO(full_key) as stream:
            key_type = deser_compact_size(stream)
            if key_type is None:
                raise UnexpectedEndOfStream()
            remainder = stream.read()
        return key_type, remainder

    @classmethod
    def get_fullkey_from_keytype_and_key(cls, key_type: int, key: bytes) -> bytes:
        """Return the full key: the var_int encoded key type followed by key."""
        return bytes.fromhex(var_int(key_type)) + key

    def _serialize_psbt_section(self, fd):
        """Write all key-value pairs of this section to fd, then the separator."""
        writer = self.create_psbt_writer(fd)
        self.serialize_psbt_section_kvs(writer)
        fd.write(b'\x00')  # section-separator

    def parse_psbt_section_kv(self, kt: int, key: bytes, val: bytes) -> None:
        """Consume one parsed key-value pair. Implemented by subclasses."""
        raise NotImplementedError()

    def serialize_psbt_section_kvs(self, wr) -> None:
        """Emit the key-value pairs through the writer wr. Implemented by subclasses."""
        raise NotImplementedError()
1142
1143
class PartialTxInput(TxInput, PSBTSection):
    """Transaction input that also holds the fields of a PSBT input section.

    On top of the PSBT data (UTXO, partial signatures, sighash, scripts,
    bip32 paths and unknown key-values), it keeps additional bookkeeping
    fields such as script_type, pubkeys and a trusted value/address.
    """

    def __init__(self, *args, **kwargs):
        TxInput.__init__(self, *args, **kwargs)
        # PSBT fields
        self._utxo = None  # type: Optional[Transaction]
        self._witness_utxo = None  # type: Optional[TxOutput]
        self.part_sigs = {}  # type: Dict[bytes, bytes]  # pubkey -> sig
        self.sighash = None  # type: Optional[int]
        self.bip32_paths = {}  # type: Dict[bytes, Tuple[bytes, Sequence[int]]]  # pubkey -> (xpub_fingerprint, path)
        self.redeem_script = None  # type: Optional[bytes]
        self.witness_script = None  # type: Optional[bytes]
        self._unknown = {}  # type: Dict[bytes, bytes]

        # additional (non-PSBT) fields
        self.script_type = 'unknown'
        self.num_sig = 0  # type: int  # num req sigs for multisig
        self.pubkeys = []  # type: List[bytes]  # note: order matters
        self._trusted_value_sats = None  # type: Optional[int]
        self._trusted_address = None  # type: Optional[str]
        self.block_height = None  # type: Optional[int]  # height at which the TXO is mined; None means unknown
        self.spent_height = None  # type: Optional[int]  # height at which the TXO got spent
        self._is_p2sh_segwit = None  # type: Optional[bool]  # None means unknown
        self._is_native_segwit = None  # type: Optional[bool]  # None means unknown

    @property
    def utxo(self):
        """The full previous transaction, if set."""
        return self._utxo

    @utxo.setter
    def utxo(self, tx: Optional[Transaction]):
        """Set the previous transaction; None and incomplete txs are ignored."""
        if tx is None:
            return
        # note that tx might be a PartialTransaction
        # serialize and de-serialize tx now. this might e.g. convert a complete PartialTx to a Tx
        tx = tx_from_any(str(tx))
        # 'utxo' field in PSBT cannot be another PSBT:
        if not tx.is_complete():
            return
        self._utxo = tx
        self.validate_data()
        self.ensure_there_is_only_one_utxo()

    @property
    def witness_utxo(self):
        """The previous output being spent, if set."""
        return self._witness_utxo

    @witness_utxo.setter
    def witness_utxo(self, value: Optional[TxOutput]):
        """Set the previous output, then re-validate and de-duplicate the UTXO data."""
        self._witness_utxo = value
        self.validate_data()
        self.ensure_there_is_only_one_utxo()

    def to_json(self):
        """Return the parent's JSON dict extended with the PSBT and bookkeeping fields."""
        d = super().to_json()
        d.update({
            'height': self.block_height,
            'value_sats': self.value_sats(),
            'address': self.address,
            'utxo': str(self.utxo) if self.utxo else None,
            'witness_utxo': self.witness_utxo.serialize_to_network().hex() if self.witness_utxo else None,
            'sighash': self.sighash,
            'redeem_script': self.redeem_script.hex() if self.redeem_script else None,
            'witness_script': self.witness_script.hex() if self.witness_script else None,
            'part_sigs': {pubkey.hex(): sig.hex() for pubkey, sig in self.part_sigs.items()},
            'bip32_paths': {pubkey.hex(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path))
                            for pubkey, (xfp, path) in self.bip32_paths.items()},
            'unknown_psbt_fields': {key.hex(): val.hex() for key, val in self._unknown.items()},
        })
        return d

    @classmethod
    def from_txin(cls, txin: TxInput, *, strip_witness: bool = True) -> 'PartialTxInput':
        """Create a PartialTxInput from txin.

        With strip_witness set, script_sig and witness are not copied over.
        """
        # FIXME: if strip_witness is True, res.is_segwit() will return False,
        # and res.estimated_size() will return an incorrect value. These methods
        # will return the correct values after we call add_input_info(). (see dscancel and bump_fee)
        # This is very fragile: the value returned by estimate_size() depends on the calling order.
        res = PartialTxInput(prevout=txin.prevout,
                             script_sig=None if strip_witness else txin.script_sig,
                             nsequence=txin.nsequence,
                             witness=None if strip_witness else txin.witness,
                             is_coinbase_output=txin.is_coinbase_output())
        return res

    def validate_data(self, *, for_signing=False) -> None:
        """Check the PSBT fields of this input for mutual consistency.

        Raises PSBTInputConsistencyFailure on the first inconsistency found.
        """
        if self.utxo:
            if self.prevout.txid.hex() != self.utxo.txid():
                raise PSBTInputConsistencyFailure(f"PSBT input validation: "
                                                  f"If a non-witness UTXO is provided, its hash must match the hash specified in the prevout")
            if self.witness_utxo:
                if self.utxo.outputs()[self.prevout.out_idx] != self.witness_utxo:
                    raise PSBTInputConsistencyFailure(f"PSBT input validation: "
                                                      f"If both non-witness UTXO and witness UTXO are provided, they must be consistent")
        # The following test is disabled, so we are willing to sign non-segwit inputs
        # without verifying the input amount. This means, given a maliciously modified PSBT,
        # for non-segwit inputs, we might end up burning coins as miner fees.
        if for_signing and False:
            if not self.is_segwit() and self.witness_utxo:
                raise PSBTInputConsistencyFailure(f"PSBT input validation: "
                                                  f"If a witness UTXO is provided, no non-witness signature may be created")
        if self.redeem_script and self.address:
            addr = hash160_to_p2sh(hash_160(self.redeem_script))
            if self.address != addr:
                raise PSBTInputConsistencyFailure(f"PSBT input validation: "
                                                  f"If a redeemScript is provided, the scriptPubKey must be for that redeemScript")
        if self.witness_script:
            if self.redeem_script:
                if self.redeem_script != bfh(bitcoin.p2wsh_nested_script(self.witness_script.hex())):
                    raise PSBTInputConsistencyFailure(f"PSBT input validation: "
                                                      f"If a witnessScript is provided, the redeemScript must be for that witnessScript")
            elif self.address:
                if self.address != bitcoin.script_to_p2wsh(self.witness_script.hex()):
                    raise PSBTInputConsistencyFailure(f"PSBT input validation: "
                                                      f"If a witnessScript is provided, the scriptPubKey must be for that witnessScript")

    def parse_psbt_section_kv(self, kt, key, val):
        """Store one key-value pair read from a PSBT input section.

        Raises SerializationError on duplicate keys, on non-empty keys for
        types that require an empty key, and on keys/values of unexpected
        length. Pairs of unknown type are kept in self._unknown.
        """
        try:
            kt = PSBTInputType(kt)
        except ValueError:
            pass  # unknown type
        if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}")
        if kt == PSBTInputType.NON_WITNESS_UTXO:
            if self.utxo is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.utxo = Transaction(val)
            self.utxo.deserialize()
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.WITNESS_UTXO:
            if self.witness_utxo is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.witness_utxo = TxOutput.from_network_bytes(val)
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.PARTIAL_SIG:
            if key in self.part_sigs:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            if len(key) not in (33, 65):  # TODO also allow 32? one of the tests in the BIP is "supposed to" fail with len==32...
                raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}")
            self.part_sigs[key] = val
        elif kt == PSBTInputType.SIGHASH_TYPE:
            if self.sighash is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            if len(val) != 4:
                raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)}")
            self.sighash = struct.unpack("<I", val)[0]
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.BIP32_DERIVATION:
            if key in self.bip32_paths:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            if len(key) not in (33, 65):  # TODO also allow 32? one of the tests in the BIP is "supposed to" fail with len==32...
                raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}")
            self.bip32_paths[key] = unpack_bip32_root_fingerprint_and_int_path(val)
        elif kt == PSBTInputType.REDEEM_SCRIPT:
            if self.redeem_script is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.redeem_script = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.WITNESS_SCRIPT:
            if self.witness_script is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.witness_script = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.FINAL_SCRIPTSIG:
            if self.script_sig is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.script_sig = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.FINAL_SCRIPTWITNESS:
            if self.witness is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.witness = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        else:
            full_key = self.get_fullkey_from_keytype_and_key(kt, key)
            if full_key in self._unknown:
                raise SerializationError(f'duplicate key. PSBT input key for unknown type: {full_key}')
            self._unknown[full_key] = val

    def serialize_psbt_section_kvs(self, wr):
        """Write the populated fields of this input section through the writer wr."""
        self.ensure_there_is_only_one_utxo()
        if self.witness_utxo:
            wr(PSBTInputType.WITNESS_UTXO, self.witness_utxo.serialize_to_network())
        if self.utxo:
            wr(PSBTInputType.NON_WITNESS_UTXO, bfh(self.utxo.serialize_to_network(include_sigs=True)))
        for pk, val in sorted(self.part_sigs.items()):
            wr(PSBTInputType.PARTIAL_SIG, val, pk)
        if self.sighash is not None:
            wr(PSBTInputType.SIGHASH_TYPE, struct.pack('<I', self.sighash))
        if self.redeem_script is not None:
            wr(PSBTInputType.REDEEM_SCRIPT, self.redeem_script)
        if self.witness_script is not None:
            wr(PSBTInputType.WITNESS_SCRIPT, self.witness_script)
        for k in sorted(self.bip32_paths):
            packed_path = pack_bip32_root_fingerprint_and_int_path(*self.bip32_paths[k])
            wr(PSBTInputType.BIP32_DERIVATION, packed_path, k)
        if self.script_sig is not None:
            wr(PSBTInputType.FINAL_SCRIPTSIG, self.script_sig)
        if self.witness is not None:
            wr(PSBTInputType.FINAL_SCRIPTWITNESS, self.witness)
        for full_key, val in sorted(self._unknown.items()):
            key_type, key = self.get_keytype_and_key_from_fullkey(full_key)
            wr(key_type, val, key=key)

    def value_sats(self) -> Optional[int]:
        """Return the value of the spent output, or None if unknown.

        Sources are tried in order: trusted value, utxo, witness_utxo.
        """
        if self._trusted_value_sats is not None:
            return self._trusted_value_sats
        if self.utxo:
            out_idx = self.prevout.out_idx
            return self.utxo.outputs()[out_idx].value
        if self.witness_utxo:
            return self.witness_utxo.value
        return None

    @property
    def address(self) -> Optional[str]:
        """The trusted address if set, else the address derived from scriptpubkey, else None."""
        if self._trusted_address is not None:
            return self._trusted_address
        scriptpubkey = self.scriptpubkey
        if scriptpubkey:
            return get_address_from_output_script(scriptpubkey)
        return None

    @property
    def scriptpubkey(self) -> Optional[bytes]:
        """The script of the spent output, or None if unknown.

        Sources are tried in order: trusted address, utxo, witness_utxo.
        """
        if self._trusted_address is not None:
            return bfh(bitcoin.address_to_script(self._trusted_address))
        if self.utxo:
            out_idx = self.prevout.out_idx
            return self.utxo.outputs()[out_idx].scriptpubkey
        if self.witness_utxo:
            return self.witness_utxo.scriptpubkey
        return None

    def set_script_type(self) -> None:
        """Set script_type from the scriptpubkey, for p2pkh, p2wpkh-p2sh and p2wpkh only.

        For any other (or unknown) script, script_type is left untouched.
        """
        if self.scriptpubkey is None:
            return
        outer_type = get_script_type_from_output_script(self.scriptpubkey)
        if outer_type is None:
            return
        inner_type = None
        if outer_type == 'p2sh':
            inner_type = get_script_type_from_output_script(self.redeem_script)
        elif outer_type == 'p2wsh':
            inner_type = get_script_type_from_output_script(self.witness_script)
        full_type = outer_type if inner_type is None else inner_type + '-' + outer_type
        if full_type in ('p2pkh', 'p2wpkh-p2sh', 'p2wpkh'):
            self.script_type = full_type

    def is_complete(self) -> bool:
        """Whether this input is finalized, or has enough signatures to be finalized."""
        if self.script_sig is not None and self.witness is not None:
            return True
        if self.is_coinbase_input():
            return True
        if self.script_sig is not None and not self.is_segwit():
            return True
        num_sigs_present = len(self.part_sigs)
        # note: The 'script_type' field is currently only set by the wallet,
        #       for its own addresses. This means we can only finalize inputs
        #       that are related to the wallet.
        #       The 'fix' would be adding extra logic that matches on templates,
        #       and figures out the script_type from available fields.
        if self.script_type in ('p2pk', 'p2pkh', 'p2wpkh', 'p2wpkh-p2sh'):
            return num_sigs_present >= 1
        if self.script_type in ('p2sh', 'p2wsh', 'p2wsh-p2sh'):
            return num_sigs_present >= self.num_sig
        return False

    def finalize(self) -> None:
        """If the input is complete, set script_sig and witness and clear signing data."""
        def clear_fields_when_finalized():
            # BIP-174: "All other data except the UTXO and unknown fields in the
            # input key-value map should be cleared from the PSBT"
            self.part_sigs = {}
            self.sighash = None
            self.bip32_paths = {}
            self.redeem_script = None
            self.witness_script = None

        if self.script_sig is not None and self.witness is not None:
            clear_fields_when_finalized()
            return  # already finalized
        if self.is_complete():
            self.script_sig = bfh(Transaction.input_script(self))
            self.witness = bfh(Transaction.serialize_witness(self))
            clear_fields_when_finalized()

    def combine_with_other_txin(self, other_txin: 'TxInput') -> None:
        """Merge the data of other_txin (same prevout) into this input, then try to finalize."""
        assert self.prevout == other_txin.prevout
        if other_txin.script_sig is not None:
            self.script_sig = other_txin.script_sig
        if other_txin.witness is not None:
            self.witness = other_txin.witness
        if isinstance(other_txin, PartialTxInput):
            if other_txin.witness_utxo:
                self.witness_utxo = other_txin.witness_utxo
            if other_txin.utxo:
                self.utxo = other_txin.utxo
            self.part_sigs.update(other_txin.part_sigs)
            if other_txin.sighash is not None:
                self.sighash = other_txin.sighash
            self.bip32_paths.update(other_txin.bip32_paths)
            if other_txin.redeem_script is not None:
                self.redeem_script = other_txin.redeem_script
            if other_txin.witness_script is not None:
                self.witness_script = other_txin.witness_script
            self._unknown.update(other_txin._unknown)
        self.ensure_there_is_only_one_utxo()
        # try to finalize now
        self.finalize()

    def ensure_there_is_only_one_utxo(self):
        """Drop witness_utxo if the full utxo is also present."""
        # we prefer having the full previous tx, even for segwit inputs. see #6198
        # for witness v1, witness_utxo will be enough though
        if self.utxo is not None and self.witness_utxo is not None:
            self.witness_utxo = None

    def convert_utxo_to_witness_utxo(self) -> None:
        """Replace the full utxo, if present, with just the output being spent."""
        if self.utxo:
            self._witness_utxo = self.utxo.outputs()[self.prevout.out_idx]
            self._utxo = None  # type: Optional[Transaction]

    def is_native_segwit(self) -> Optional[bool]:
        """Whether this input is native segwit. None means inconclusive."""
        if self._is_native_segwit is None:
            if self.address:
                self._is_native_segwit = bitcoin.is_segwit_address(self.address)
        return self._is_native_segwit

    def is_p2sh_segwit(self) -> Optional[bool]:
        """Whether this input is p2sh-embedded-segwit. None means inconclusive."""
        if self._is_p2sh_segwit is None:
            def calc_if_p2sh_segwit_now():
                if not (self.address and self.redeem_script):
                    return None
                if self.address != bitcoin.hash160_to_p2sh(hash_160(self.redeem_script)):
                    # not p2sh address
                    return False
                try:
                    decoded = list(script_GetOp(self.redeem_script))
                except MalformedBitcoinScript:
                    decoded = None
                # witness version 0
                if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_WITNESS_V0):
                    return True
                # witness version 1-16
                for opcode in range(opcodes.OP_1, opcodes.OP_16 + 1):
                    match = [opcode, OPPushDataGeneric(lambda x: 2 <= x <= 40)]
                    if match_script_against_template(decoded, match):
                        return True
                return False

            self._is_p2sh_segwit = calc_if_p2sh_segwit_now()
        return self._is_p2sh_segwit

    def is_segwit(self, *, guess_for_address=False) -> bool:
        """Whether this input is segwit, judged from all the data available."""
        if super().is_segwit():
            return True
        if self.is_native_segwit() or self.is_p2sh_segwit():
            return True
        if self.is_native_segwit() is False and self.is_p2sh_segwit() is False:
            return False
        if self.witness_script:
            return True
        _type = self.script_type
        if _type == 'address' and guess_for_address:
            _type = Transaction.guess_txintype_from_address(self.address)
        return is_segwit_script_type(_type)

    def already_has_some_signatures(self) -> bool:
        """Returns whether progress has been made towards completing this input."""
        # bool(): without it, a non-empty part_sigs dict itself would be returned
        return bool(self.part_sigs
                    or self.script_sig is not None
                    or self.witness is not None)
1515
1516
class PartialTxOutput(TxOutput, PSBTSection):
    """Transaction output that also holds the fields of a PSBT output section."""

    def __init__(self, *args, **kwargs):
        TxOutput.__init__(self, *args, **kwargs)
        # PSBT fields
        self.redeem_script = None  # type: Optional[bytes]
        self.witness_script = None  # type: Optional[bytes]
        self.bip32_paths = {}  # type: Dict[bytes, Tuple[bytes, Sequence[int]]]  # pubkey -> (xpub_fingerprint, path)
        self._unknown = {}  # type: Dict[bytes, bytes]

        # additional (non-PSBT) fields
        self.script_type = 'unknown'
        self.num_sig = 0  # num req sigs for multisig
        self.pubkeys = []  # type: List[bytes]  # note: order matters
        self.is_mine = False  # type: bool  # whether the wallet considers the output to be ismine
        self.is_change = False  # type: bool  # whether the wallet considers the output to be change

    def to_json(self):
        """Return the parent's JSON dict extended with the PSBT fields of this output."""
        result = super().to_json()
        paths_json = {
            pubkey.hex(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path))
            for pubkey, (xfp, path) in self.bip32_paths.items()
        }
        unknown_json = {key.hex(): val.hex() for key, val in self._unknown.items()}
        result.update({
            'redeem_script': self.redeem_script.hex() if self.redeem_script else None,
            'witness_script': self.witness_script.hex() if self.witness_script else None,
            'bip32_paths': paths_json,
            'unknown_psbt_fields': unknown_json,
        })
        return result

    @classmethod
    def from_txout(cls, txout: TxOutput) -> 'PartialTxOutput':
        """Create a PartialTxOutput with the scriptpubkey and value of txout."""
        return PartialTxOutput(scriptpubkey=txout.scriptpubkey,
                               value=txout.value)

    def parse_psbt_section_kv(self, kt, key, val):
        """Store one key-value pair read from a PSBT output section.

        Raises SerializationError on duplicate keys, on non-empty keys for
        types that require an empty key, and on keys of unexpected length.
        Pairs of unknown type are kept in self._unknown.
        """
        try:
            kt = PSBTOutputType(kt)
        except ValueError:
            pass  # unknown type
        if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}")

        if kt == PSBTOutputType.REDEEM_SCRIPT:
            if self.redeem_script is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.redeem_script = val
            if key:
                raise SerializationError(f"key for {repr(kt)} must be empty")
            return

        if kt == PSBTOutputType.WITNESS_SCRIPT:
            if self.witness_script is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.witness_script = val
            if key:
                raise SerializationError(f"key for {repr(kt)} must be empty")
            return

        if kt == PSBTOutputType.BIP32_DERIVATION:
            if key in self.bip32_paths:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            if len(key) not in (33, 65):  # TODO also allow 32? one of the tests in the BIP is "supposed to" fail with len==32...
                raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}")
            self.bip32_paths[key] = unpack_bip32_root_fingerprint_and_int_path(val)
            return

        # anything else is preserved verbatim under its full key
        full_key = self.get_fullkey_from_keytype_and_key(kt, key)
        if full_key in self._unknown:
            raise SerializationError(f'duplicate key. PSBT output key for unknown type: {full_key}')
        self._unknown[full_key] = val

    def serialize_psbt_section_kvs(self, wr):
        """Write the populated fields of this output section through the writer wr."""
        script_fields = (
            (PSBTOutputType.REDEEM_SCRIPT, self.redeem_script),
            (PSBTOutputType.WITNESS_SCRIPT, self.witness_script),
        )
        for field_type, script in script_fields:
            if script is not None:
                wr(field_type, script)
        for pubkey in sorted(self.bip32_paths):
            packed = pack_bip32_root_fingerprint_and_int_path(*self.bip32_paths[pubkey])
            wr(PSBTOutputType.BIP32_DERIVATION, packed, pubkey)
        for full_key, val in sorted(self._unknown.items()):
            unknown_type, unknown_key = self.get_keytype_and_key_from_fullkey(full_key)
            wr(unknown_type, val, key=unknown_key)

    def combine_with_other_txout(self, other_txout: 'TxOutput') -> None:
        """Merge the PSBT fields of other_txout (same scriptpubkey) into this output.

        Nothing is merged unless other_txout is a PartialTxOutput.
        """
        assert self.scriptpubkey == other_txout.scriptpubkey
        if isinstance(other_txout, PartialTxOutput):
            if other_txout.redeem_script is not None:
                self.redeem_script = other_txout.redeem_script
            if other_txout.witness_script is not None:
                self.witness_script = other_txout.witness_script
            self.bip32_paths.update(other_txout.bip32_paths)
            self._unknown.update(other_txout._unknown)
1598
1599
1600 class PartialTransaction(Transaction):
1601
def __init__(self):
    """Create an empty partial transaction, with no inputs, outputs or PSBT data."""
    Transaction.__init__(self, None)
    # global PSBT data
    self.xpubs = {}  # type: Dict[BIP32Node, Tuple[bytes, Sequence[int]]]  # intermediate bip32node -> (xfp, der_prefix)
    self._unknown = {}  # type: Dict[bytes, bytes]
    # inputs/outputs, as their PSBT-aware variants
    self._inputs = []  # type: List[PartialTxInput]
    self._outputs = []  # type: List[PartialTxOutput]
1608
def to_json(self) -> dict:
    """Return the parent's JSON dict extended with the global PSBT fields."""
    result = super().to_json()
    xpubs_json = {
        bip32node.to_xpub(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path))
        for bip32node, (xfp, path) in self.xpubs.items()
    }
    unknown_json = {key.hex(): val.hex() for key, val in self._unknown.items()}
    result.update({
        'xpubs': xpubs_json,
        'unknown_psbt_fields': unknown_json,
    })
    return result
1617
@classmethod
def from_tx(cls, tx: Transaction) -> 'PartialTransaction':
    """Create a partial transaction mirroring tx.

    Inputs are copied with their script_sig and witness stripped; outputs,
    version and locktime are carried over.
    """
    partial_tx = cls()
    stripped_inputs = []
    for txin in tx.inputs():
        stripped_inputs.append(PartialTxInput.from_txin(txin, strip_witness=True))
    partial_tx._inputs = stripped_inputs
    partial_outputs = []
    for txout in tx.outputs():
        partial_outputs.append(PartialTxOutput.from_txout(txout))
    partial_tx._outputs = partial_outputs
    partial_tx.version = tx.version
    partial_tx.locktime = tx.locktime
    return partial_tx
1627
@classmethod
def from_raw_psbt(cls, raw) -> 'PartialTransaction':
    """Parse a PSBT, given as raw bytes, hex or base64, into a PartialTransaction.

    Hex and base64 input may be passed either as str or as bytes.

    Raises BadHeaderMagic if the data does not start with the PSBT magic,
    and SerializationError (or UnexpectedEndOfStream) if the PSBT is
    malformed. Input consistency failures from validate_data() propagate.
    """
    # auto-detect and decode Base64 and Hex.
    if raw[0:10].lower() in (b'70736274ff', '70736274ff'):  # hex
        if isinstance(raw, (bytes, bytearray)):
            # bytes.fromhex() only accepts str
            raw = raw.decode('ascii')
        raw = bytes.fromhex(raw)
    elif raw[0:6] in (b'cHNidP', 'cHNidP'):  # base64
        raw = base64.b64decode(raw)
    if not isinstance(raw, (bytes, bytearray)) or raw[0:5] != b'psbt\xff':
        raise BadHeaderMagic("bad magic")

    tx = None  # type: Optional[PartialTransaction]

    # We parse the raw stream twice. The first pass is used to find the
    # PSBT_GLOBAL_UNSIGNED_TX key in the global section and set 'tx'.
    # The second pass does everything else.
    with io.BytesIO(raw[5:]) as fd:  # parsing "first pass"
        while True:
            try:
                kt, key, val = PSBTSection.get_next_kv_from_fd(fd)
            except StopIteration:
                break
            try:
                kt = PSBTGlobalType(kt)
            except ValueError:
                pass  # unknown type
            if kt == PSBTGlobalType.UNSIGNED_TX:
                if tx is not None:
                    raise SerializationError(f"duplicate key: {repr(kt)}")
                if key: raise SerializationError(f"key for {repr(kt)} must be empty")
                unsigned_tx = Transaction(val.hex())
                for txin in unsigned_tx.inputs():
                    if txin.script_sig or txin.witness:
                        raise SerializationError(f"PSBT {repr(kt)} must have empty scriptSigs and witnesses")
                tx = PartialTransaction.from_tx(unsigned_tx)

    if tx is None:
        raise SerializationError(f"PSBT missing required global section PSBT_GLOBAL_UNSIGNED_TX")

    with io.BytesIO(raw[5:]) as fd:  # parsing "second pass"
        # global section
        while True:
            try:
                kt, key, val = PSBTSection.get_next_kv_from_fd(fd)
            except StopIteration:
                break
            try:
                kt = PSBTGlobalType(kt)
            except ValueError:
                pass  # unknown type
            if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}")
            if kt == PSBTGlobalType.UNSIGNED_TX:
                pass  # already handled during "first" parsing pass
            elif kt == PSBTGlobalType.XPUB:
                bip32node = BIP32Node.from_bytes(key)
                if bip32node in tx.xpubs:
                    raise SerializationError(f"duplicate key: {repr(kt)}")
                xfp, path = unpack_bip32_root_fingerprint_and_int_path(val)
                if bip32node.depth != len(path):
                    raise SerializationError(f"PSBT global xpub has mismatching depth ({bip32node.depth}) "
                                             f"and derivation prefix len ({len(path)})")
                child_number_of_xpub = int.from_bytes(bip32node.child_number, 'big')
                if not ((bip32node.depth == 0 and child_number_of_xpub == 0)
                        or (bip32node.depth != 0 and child_number_of_xpub == path[-1])):
                    raise SerializationError(f"PSBT global xpub has inconsistent child_number and derivation prefix")
                tx.xpubs[bip32node] = xfp, path
            elif kt == PSBTGlobalType.VERSION:
                if len(val) > 4:
                    raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)} > 4")
                psbt_version = int.from_bytes(val, byteorder='little', signed=False)
                if psbt_version > 0:
                    raise SerializationError(f"Only PSBTs with version 0 are supported. Found version: {psbt_version}")
                if key: raise SerializationError(f"key for {repr(kt)} must be empty")
            else:
                full_key = PSBTSection.get_fullkey_from_keytype_and_key(kt, key)
                if full_key in tx._unknown:
                    raise SerializationError(f'duplicate key. PSBT global key for unknown type: {full_key}')
                tx._unknown[full_key] = val
        try:
            # inputs sections
            for txin in tx.inputs():
                if DEBUG_PSBT_PARSING: print("-> new input starts")
                txin._populate_psbt_fields_from_fd(fd)
            # outputs sections
            for txout in tx.outputs():
                if DEBUG_PSBT_PARSING: print("-> new output starts")
                txout._populate_psbt_fields_from_fd(fd)
        except UnexpectedEndOfStream:
            raise UnexpectedEndOfStream('Unexpected end of stream. Num input and output maps provided does not match unsigned tx.') from None

        # every section has been consumed; nothing may follow
        if fd.read(1) != b'':
            raise SerializationError("extra junk at the end of PSBT")

    for txin in tx.inputs():
        txin.validate_data()

    return tx
1724
@classmethod
def from_io(cls, inputs: Sequence[PartialTxInput], outputs: Sequence[PartialTxOutput], *,
            locktime: int = None, version: int = None):
    """Build a new instance from already-constructed inputs and outputs.

    The given sequences are copied into fresh lists. `locktime` and
    `version` are only assigned when they are not None; otherwise whatever
    cls() set up is left untouched. The new object is sorted via
    BIP69_sort() before it is returned.
    """
    tx = cls()
    tx._inputs = [*inputs]
    tx._outputs = [*outputs]
    for attr_name, override in (('locktime', locktime), ('version', version)):
        if override is not None:
            setattr(tx, attr_name, override)
    tx.BIP69_sort()
    return tx
1737
def _serialize_psbt(self, fd) -> None:
    """Write this transaction to `fd` in binary PSBT form.

    Output order: the magic bytes, the global key-value pairs (unsigned
    tx, xpubs, unknown entries), a 0x00 section separator, then one
    section per input followed by one section per output.
    """
    write_kv = PSBTSection.create_psbt_writer(fd)
    fd.write(b'psbt\xff')

    # global section
    unsigned_tx_hex = self.serialize_to_network(include_sigs=False)
    write_kv(PSBTGlobalType.UNSIGNED_TX, bfh(unsigned_tx_hex))
    for node, (fingerprint, int_path) in sorted(self.xpubs.items()):
        packed = pack_bip32_root_fingerprint_and_int_path(fingerprint, int_path)
        write_kv(PSBTGlobalType.XPUB, packed, key=node.to_bytes())
    for full_key, value in sorted(self._unknown.items()):
        key_type, key = PSBTSection.get_keytype_and_key_from_fullkey(full_key)
        write_kv(key_type, value, key=key)
    fd.write(b'\x00')  # section-separator

    # input sections
    for tx_input in self._inputs:
        tx_input._serialize_psbt_section(fd)
    # output sections
    for tx_output in self._outputs:
        tx_output._serialize_psbt_section(fd)
1756
def finalize_psbt(self) -> None:
    """Call finalize() on every input of this transaction, in order."""
    every_input = self.inputs()
    for tx_input in every_input:
        tx_input.finalize()
1760
def combine_with_other_psbt(self, other_tx: 'Transaction') -> None:
    """Merge into this tx the data that other_tx has (e.g. signatures).

    Both transactions must serialize to the same unsigned network tx;
    otherwise an Exception is raised before anything is merged.
    """
    own_unsigned = self.serialize_to_network(include_sigs=False)
    other_unsigned = other_tx.serialize_to_network(include_sigs=False)
    if own_unsigned != other_unsigned:
        raise Exception('A Combiner must not combine two different PSBTs.')
    # BIP-174: "The resulting PSBT must contain all of the key-value pairs from each of the PSBTs.
    #           The Combiner must remove any duplicate key-value pairs, in accordance with the specification."

    # global section: only merged when other_tx is itself a PartialTransaction
    if isinstance(other_tx, PartialTransaction):
        self.xpubs.update(other_tx.xpubs)
        self._unknown.update(other_tx._unknown)

    # input sections, paired up by position
    for mine, theirs in zip(self.inputs(), other_tx.inputs()):
        mine.combine_with_other_txin(theirs)

    # output sections, paired up by position
    for mine, theirs in zip(self.outputs(), other_tx.outputs()):
        mine.combine_with_other_txout(theirs)

    self.invalidate_ser_cache()
1780
def join_with_other_psbt(self, other_tx: 'PartialTransaction') -> None:
    """Adds inputs and outputs from other_tx into this one.

    Raises if other_tx is not a PartialTransaction, or if any prevout
    appears more than once across the inputs of both transactions.
    Signatures are removed from this transaction after the join.
    """
    if not isinstance(other_tx, PartialTransaction):
        raise Exception('Can only join partial transactions.')

    # refuse to join when a prevout would be spent more than once
    seen_prevouts = set()
    for candidate in itertools.chain(self.inputs(), other_tx.inputs()):
        outpoint = candidate.prevout.to_str()
        if outpoint in seen_prevouts:
            raise Exception(f"Duplicate inputs! "
                            f"Transactions that spend the same prevout cannot be joined.")
        seen_prevouts.add(outpoint)

    # copy global PSBT section
    self.xpubs.update(other_tx.xpubs)
    self._unknown.update(other_tx._unknown)

    # copy and add inputs and outputs
    extra_inputs = list(other_tx.inputs())
    self.add_inputs(extra_inputs)
    extra_outputs = list(other_tx.outputs())
    self.add_outputs(extra_outputs)

    self.remove_signatures()
    self.invalidate_ser_cache()
1801
def inputs(self) -> Sequence[PartialTxInput]:
    """Return the inputs of this transaction (the stored object itself, not a copy)."""
    tx_inputs = self._inputs
    return tx_inputs
1804
def outputs(self) -> Sequence[PartialTxOutput]:
    """Return the outputs of this transaction (the stored object itself, not a copy)."""
    tx_outputs = self._outputs
    return tx_outputs
1807
def add_inputs(self, inputs: List[PartialTxInput]) -> None:
    """Append `inputs`, re-sort the inputs only, and invalidate the serialization cache."""
    current_inputs = self._inputs
    current_inputs.extend(inputs)
    self.BIP69_sort(outputs=False)
    self.invalidate_ser_cache()
1812
def add_outputs(self, outputs: List[PartialTxOutput]) -> None:
    """Append `outputs`, re-sort the outputs only, and invalidate the serialization cache."""
    current_outputs = self._outputs
    current_outputs.extend(outputs)
    self.BIP69_sort(inputs=False)
    self.invalidate_ser_cache()
1817
def set_rbf(self, rbf: bool) -> None:
    """Set nsequence on every input: 0xfffffffd if `rbf` is truthy, else 0xfffffffe.

    The serialization cache is invalidated afterwards.
    """
    if rbf:
        sequence_number = 0xffffffff - 2
    else:
        sequence_number = 0xffffffff - 1
    for tx_input in self.inputs():
        tx_input.nsequence = sequence_number
    self.invalidate_ser_cache()
1823
def BIP69_sort(self, inputs=True, outputs=True):
    """Sort inputs and/or outputs in place and invalidate the serialization cache.

    Inputs are ordered by (prevout txid, prevout out_idx); outputs by
    (value, scriptpubkey). Either sort can be skipped via its flag.
    """
    # NOTE: other parts of the code rely on these sorts being *stable* sorts
    def _input_key(txin):
        return txin.prevout.txid, txin.prevout.out_idx

    def _output_key(txout):
        return txout.value, txout.scriptpubkey

    if inputs:
        self._inputs.sort(key=_input_key)
    if outputs:
        self._outputs.sort(key=_output_key)
    self.invalidate_ser_cache()
1831
def input_value(self) -> int:
    """Return the sum of value_sats() over all inputs.

    Raises MissingTxInputAmount if any input reports None as its value.
    """
    amounts = [tx_input.value_sats() for tx_input in self.inputs()]
    for amount in amounts:
        if amount is None:
            raise MissingTxInputAmount()
    return sum(amounts)
1837
def output_value(self) -> int:
    """Return the sum of the `value` attribute over all outputs (0 if there are none)."""
    total = 0
    for tx_output in self.outputs():
        total += tx_output.value
    return total
1840
def get_fee(self) -> Optional[int]:
    """Return input_value() - output_value(), or None if an input amount is missing."""
    try:
        total_in = self.input_value()
        total_out = self.output_value()
    except MissingTxInputAmount:
        return None
    return total_in - total_out
1846
def serialize_preimage(self, txin_index: int, *,
                       bip143_shared_txdigest_fields: BIP143SharedTxDigestFields = None) -> str:
    """Return the hex preimage that gets hashed when signing input `txin_index`.

    Only SIGHASH_ALL is supported: an input whose sighash is set to
    anything else raises an Exception. For segwit inputs the preimage is
    assembled from the shared digest fields (taken from
    `bip143_shared_txdigest_fields`, or computed here when that is None)
    plus the per-input fields. For all other inputs it consists of every
    input (with the preimage script placed only in the one being signed)
    followed by every output.
    """
    version_hex = int_to_hex(self.version, 4)
    locktime_hex = int_to_hex(self.locktime, 4)
    all_inputs = self.inputs()
    all_outputs = self.outputs()
    signing_txin = all_inputs[txin_index]
    sighash = SIGHASH_ALL if signing_txin.sighash is None else signing_txin.sighash
    if sighash != SIGHASH_ALL:
        raise Exception("only SIGHASH_ALL signing is supported!")
    hashtype_hex = int_to_hex(sighash, 4)
    preimage_script = self.get_preimage_script(signing_txin)

    if not signing_txin.is_segwit():
        # legacy path: only the input being signed carries a script
        input_count = var_int(len(all_inputs))
        serialized_inputs = []
        for k, other_txin in enumerate(all_inputs):
            script = preimage_script if txin_index == k else ''
            serialized_inputs.append(self.serialize_input(other_txin, script))
        txins = input_count + ''.join(serialized_inputs)
        txouts = var_int(len(all_outputs)) + ''.join(
            tx_output.serialize_to_network().hex() for tx_output in all_outputs)
        return version_hex + txins + txouts + locktime_hex + hashtype_hex

    # segwit path
    shared = bip143_shared_txdigest_fields
    if shared is None:
        shared = self._calc_bip143_shared_txdigest_fields()
    hash_prevouts = shared.hashPrevouts
    hash_sequence = shared.hashSequence
    hash_outputs = shared.hashOutputs
    outpoint = signing_txin.prevout.serialize_to_network().hex()
    # preimage_script is hex, hence the halved length
    script_code = var_int(len(preimage_script) // 2) + preimage_script
    amount = int_to_hex(signing_txin.value_sats(), 8)
    sequence_hex = int_to_hex(signing_txin.nsequence, 4)
    return (version_hex + hash_prevouts + hash_sequence + outpoint + script_code
            + amount + sequence_hex + hash_outputs + locktime_hex + hashtype_hex)
1876
def sign(self, keypairs) -> None:
    """Add signatures for every input pubkey that has an entry in `keypairs`.

    keypairs maps pubkey_hex -> (secret_bytes, is_compressed). For each
    input, its pubkeys are tried in order until the input reports that it
    is complete. The serialization cache is invalidated at the end.
    """
    shared_digest_fields = self._calc_bip143_shared_txdigest_fields()
    for txin_idx, txin in enumerate(self.inputs()):
        for pubkey in [pk.hex() for pk in txin.pubkeys]:
            if txin.is_complete():
                break
            if pubkey in keypairs:
                _logger.info(f"adding signature for {pubkey}")
                sec, _compressed = keypairs[pubkey]
                sig = self.sign_txin(txin_idx, sec,
                                     bip143_shared_txdigest_fields=shared_digest_fields)
                self.add_signature_to_txin(txin_idx=txin_idx, signing_pubkey=pubkey, sig=sig)

    _logger.debug(f"is_complete {self.is_complete()}")
    self.invalidate_ser_cache()
1894
def sign_txin(self, txin_index, privkey_bytes, *, bip143_shared_txdigest_fields=None) -> str:
    """Sign input `txin_index` with `privkey_bytes` and return the signature as hex.

    The input's data is validated for signing first. The returned hex
    string ends with '01', the SIGHASH_ALL byte.
    """
    self.inputs()[txin_index].validate_data(for_signing=True)
    preimage_hex = self.serialize_preimage(
        txin_index, bip143_shared_txdigest_fields=bip143_shared_txdigest_fields)
    digest = sha256d(bfh(preimage_hex))
    signer = ecc.ECPrivkey(privkey_bytes)
    raw_sig = signer.sign_transaction(digest)
    return bh2u(raw_sig) + '01'  # SIGHASH_ALL
1904
def is_complete(self) -> bool:
    """Return True iff every input reports is_complete() (True when there are no inputs)."""
    # every input is queried; there is no short-circuit on the first False
    per_input_status = [tx_input.is_complete() for tx_input in self.inputs()]
    return all(per_input_status)
1907
def signature_count(self) -> Tuple[int, int]:
    """Return (number of signatures present, number required), skipping coinbase inputs."""
    have = 0  # "num Sigs we have"
    need = 0  # "Required"
    for tx_input in self.inputs():
        if not tx_input.is_coinbase_input():
            have += len(list(tx_input.part_sigs.values()))
            need += tx_input.num_sig
    return have, need
1918
def serialize(self) -> str:
    """Returns PSBT as base64 text, or raw hex of network tx (if complete)."""
    self.finalize_psbt()
    if not self.is_complete():
        return self._serialize_as_base64()
    return Transaction.serialize(self)
1925
def serialize_as_bytes(self, *, force_psbt: bool = False) -> bytes:
    """Returns PSBT as raw bytes, or raw bytes of network tx (if complete).

    With force_psbt=True the PSBT form is produced without checking
    is_complete().
    """
    self.finalize_psbt()
    if not force_psbt and self.is_complete():
        return Transaction.serialize_as_bytes(self)
    with io.BytesIO() as buffer:
        self._serialize_psbt(buffer)
        return buffer.getvalue()
1935
def _serialize_as_base64(self) -> str:
    """Return the result of serialize_as_bytes() as ASCII base64 text."""
    encoded = base64.b64encode(self.serialize_as_bytes())
    return encoded.decode('ascii')
1939
def update_signatures(self, signatures: Sequence[str]):
    """Add new signatures to a transaction

    `signatures` is expected to be a list of sigs with signatures[i]
    intended for self._inputs[i].
    This is used by the Trezor, KeepKey and Safe-T plugins.

    Does nothing if the transaction is already complete. Raises if the
    number of signatures differs from the number of inputs. For each
    signature not already stored, the signing pubkey is recovered from
    the signature and it is only added if that pubkey is one of the
    input's pubkeys and the signature verifies.
    """
    if self.is_complete():
        return
    if len(self.inputs()) != len(signatures):
        raise Exception('expected {} signatures; got {}'.format(len(self.inputs()), len(signatures)))
    for i, txin in enumerate(self.inputs()):
        known_pubkeys = [pk.hex() for pk in txin.pubkeys]
        sig = signatures[i]
        already_stored = list(txin.part_sigs.values())
        if bfh(sig) in already_stored:
            continue
        pre_hash = sha256d(bfh(self.serialize_preimage(i)))
        # the last byte (two hex chars) of sig is cut off before DER-decoding
        sig_string = ecc.sig_string_from_der_sig(bfh(sig[:-2]))
        for recid in range(4):
            try:
                recovered_key = ecc.ECPubkey.from_sig_string(sig_string, recid, pre_hash)
            except ecc.InvalidECPointException:
                # the point might not be on the curve for some recid values
                continue
            pubkey_hex = recovered_key.get_public_key_hex(compressed=True)
            if pubkey_hex not in known_pubkeys:
                continue
            try:
                recovered_key.verify_message_hash(sig_string, pre_hash)
            except Exception:
                _logger.exception('')
                continue
            _logger.info(f"adding sig: txin_idx={i}, signing_pubkey={pubkey_hex}, sig={sig}")
            self.add_signature_to_txin(txin_idx=i, signing_pubkey=pubkey_hex, sig=sig)
            break
    # redo raw
    self.invalidate_ser_cache()
1976
def add_signature_to_txin(self, *, txin_idx: int, signing_pubkey: str, sig: str):
    """Store `sig` under `signing_pubkey` in the part_sigs of input `txin_idx`.

    Both strings are converted with bfh() before being stored. The input's
    script_sig and witness are reset to None so they get re-serialized,
    and the serialization cache is invalidated.
    """
    target_input = self._inputs[txin_idx]
    sig_bytes = bfh(sig)
    pubkey_bytes = bfh(signing_pubkey)
    target_input.part_sigs[pubkey_bytes] = sig_bytes
    # force re-serialization
    target_input.script_sig = target_input.witness = None
    self.invalidate_ser_cache()
1984
def add_info_from_wallet(
        self,
        wallet: 'Abstract_Wallet',
        *,
        include_xpubs: bool = False,
        ignore_network_issues: bool = True,
) -> None:
    """Let `wallet` fill in what it knows about this transaction's inputs and outputs.

    Does nothing if the transaction is already complete. When
    `include_xpubs` is set and the wallet is a Multisig_Wallet, the xpub
    of every Xpub keystore is also recorded in self.xpubs together with
    its fingerprint and full derivation.
    """
    if self.is_complete():
        return
    # only include xpubs for multisig wallets; currently only they need it in practice
    # note: coldcard fw have a limitation that if they are included then all
    #       inputs are assumed to be multisig... https://github.com/spesmilo/electrum/pull/5440#issuecomment-549504761
    # note: trezor plugin needs xpubs included, if there are multisig inputs/change_outputs
    from .wallet import Multisig_Wallet
    if include_xpubs and isinstance(wallet, Multisig_Wallet):
        from .keystore import Xpub
        for keystore in wallet.get_keystores():
            if not isinstance(keystore, Xpub):
                continue
            fingerprint, full_derivation = keystore.get_fp_and_derivation_to_be_used_in_partial_tx(
                der_suffix=[], only_der_suffix=False)
            xpub = keystore.get_xpub_to_be_used_in_partial_tx(only_der_suffix=False)
            node = BIP32Node.from_xkey(xpub)
            self.xpubs[node] = (fingerprint, full_derivation)
    for tx_input in self.inputs():
        wallet.add_input_info(
            tx_input,
            only_der_suffix=False,
            ignore_network_issues=ignore_network_issues,
        )
    for tx_output in self.outputs():
        wallet.add_output_info(
            tx_output,
            only_der_suffix=False,
        )
2019
def remove_xpubs_and_bip32_paths(self) -> None:
    """Empty self.xpubs and the bip32_paths of every input and every output."""
    self.xpubs.clear()
    for section_owner in itertools.chain(self.inputs(), self.outputs()):
        section_owner.bip32_paths.clear()
2026
def prepare_for_export_for_coinjoin(self) -> None:
    """Removes all sensitive details.

    Concretely: the global xpubs and unknown entries, the bip32 paths of
    inputs, and the redeem/witness scripts, bip32 paths and unknown
    entries of outputs.
    """
    # globals
    for global_map in (self.xpubs, self._unknown):
        global_map.clear()
    # inputs
    for tx_input in self.inputs():
        tx_input.bip32_paths.clear()
    # outputs
    for tx_output in self.outputs():
        tx_output.redeem_script = tx_output.witness_script = None
        tx_output.bip32_paths.clear()
        tx_output._unknown.clear()
2041
def convert_all_utxos_to_witness_utxos(self) -> None:
    """Replaces all NON-WITNESS-UTXOs with WITNESS-UTXOs.
    This will likely make an exported PSBT invalid spec-wise,
    but it makes e.g. QR codes significantly smaller.
    """
    every_input = self.inputs()
    for tx_input in every_input:
        tx_input.convert_utxo_to_witness_utxo()
2049
def remove_signatures(self):
    """Drop part_sigs, script_sig and witness from every input.

    Afterwards asserts that the transaction is not complete and
    invalidates the serialization cache.
    """
    for tx_input in self.inputs():
        tx_input.part_sigs = {}
        tx_input.script_sig = tx_input.witness = None
    assert not self.is_complete()
    self.invalidate_ser_cache()
2057
def update_txin_script_type(self):
    """Determine the script_type of each input by analyzing the scripts.
    It updates all tx-Inputs, NOT only the wallet owned, if the
    scriptpubkey is present.

    Only inputs whose script_type is currently 'unknown' or 'address'
    are touched.
    """
    undetermined = ('unknown', 'address')
    for tx_input in self.inputs():
        if tx_input.script_type not in undetermined:
            continue
        tx_input.set_script_type()
2066
def pack_bip32_root_fingerprint_and_int_path(xfp: bytes, path: Sequence[int]) -> bytes:
    """Concatenate a 4-byte fingerprint with each path element as 4 little-endian bytes.

    Raises Exception if `xfp` is not exactly 4 bytes long.
    """
    if len(xfp) != 4:
        raise Exception(f'unexpected xfp length. xfp={xfp}')
    encoded_elements = [element.to_bytes(4, byteorder='little', signed=False)
                        for element in path]
    return xfp + b''.join(encoded_elements)
2071
2072
def unpack_bip32_root_fingerprint_and_int_path(path: bytes) -> Tuple[bytes, Sequence[int]]:
    """Split a packed "fingerprint + derivation path" blob into its two parts.

    The first 4 bytes are returned unchanged as the fingerprint; every
    following 4-byte group is decoded as one unsigned little-endian
    integer path element.

    Returns a tuple (fingerprint bytes, list of ints).

    Raises Exception if the length of `path` is not a multiple of 4, or if
    it is too short to hold the 4-byte fingerprint.
    """
    # An empty blob satisfies the multiple-of-4 rule but has no fingerprint.
    # Accepting it would yield a 0-byte fingerprint, which
    # pack_bip32_root_fingerprint_and_int_path refuses to serialize again.
    if len(path) < 4 or len(path) % 4 != 0:
        raise Exception(f'unexpected packed path length. path={path.hex()}')
    xfp = path[0:4]
    int_path = [int.from_bytes(path[pos:pos + 4], byteorder='little', signed=False)
                for pos in range(4, len(path), 4)]
    return xfp, int_path