tmoved transaction monitoring code to a new file transactionmonitor.py - electrum-personal-server - Maximally lightweight electrum server for a single user HTML git clone https://git.parazyd.org/electrum-personal-server DIR Log DIR Files DIR Refs DIR README --- DIR commit 3a91eb324efc00ba935fc2467121b49937d22b6a DIR parent 8e80656387d16054d80842de41fa544132b9a68a HTML Author: chris-belcher <chris-belcher@users.noreply.github.com> Date: Wed, 21 Mar 2018 00:41:37 +0000 moved transaction monitoring code to a new file transactionmonitor.py Diffstat: M server.py | 348 +++---------------------------- A transactionmonitor.py | 326 +++++++++++++++++++++++++++++++ 2 files changed, 354 insertions(+), 320 deletions(-) --- DIR diff --git a/server.py b/server.py t@@ -3,12 +3,11 @@ #the electrum protocol uses hash(scriptpubkey) as a key for lookups # as an alternative to address or scriptpubkey -import socket, time, json, datetime, struct, binascii, math, pprint, ssl +import socket, time, json, datetime, struct, binascii, ssl from configparser import ConfigParser, NoSectionError -from decimal import Decimal from jsonrpc import JsonRpc, JsonRpcError -import hashes, merkleproof, deterministicwallet +import hashes, merkleproof, deterministicwallet, transactionmonitor ADDRESSES_LABEL = "electrum-watchonly-addresses" t@@ -31,7 +30,6 @@ Pruning: {pruning} ## global variables are actually mutable lists subscribed_to_headers = [False] bestblockhash = [None] -last_known_recent_txid = [None] #log for checking up/seeing your wallet, debug for when something has gone wrong def debugorlog(line, ttype): t@@ -55,14 +53,11 @@ def send_update(sock, update): sock.sendall(json.dumps(update).encode('utf-8') + b'\n') debug('<= ' + json.dumps(update)) -def on_heartbeat_listening(rpc, address_history, unconfirmed_txes, - deterministic_wallets): +def on_heartbeat_listening(txmonitor): debug("on heartbeat listening") - check_for_updated_txes(rpc, address_history, unconfirmed_txes, - deterministic_wallets) + txmonitor.check_for_updated_txes() -def on_heartbeat_connected(sock, rpc, address_history, unconfirmed_txes, - deterministic_wallets): +def on_heartbeat_connected(sock, rpc, txmonitor): debug("on heartbeat connected") is_tip_updated, header = check_for_new_blockchain_tip(rpc) if is_tip_updated: t@@ -71,23 +66,18 @@ def on_heartbeat_connected(sock, rpc, address_history, unconfirmed_txes, update = {"method": "blockchain.headers.subscribe", "params": [header]} send_update(sock, update) - updated_scripthashes = check_for_updated_txes(rpc, address_history, - unconfirmed_txes, deterministic_wallets) + updated_scripthashes = txmonitor.check_for_updated_txes() for scrhash in updated_scripthashes: - if not address_history[scrhash]["subscribed"]: - continue - history_hash = hashes.get_status_electrum( ((h["tx_hash"], h["height"]) - for h in address_history[scrhash]["history"]) ) + history_hash = txmonitor.get_electrum_history_hash(scrhash) update = {"method": "blockchain.scripthash.subscribe", "params": [scrhash, history_hash]} send_update(sock, update) -def on_disconnect(address_history): +def on_disconnect(txmonitor): subscribed_to_headers[0] = False - for srchash, his in address_history.items(): - his["subscribed"] = False + txmonitor.unsubscribe_all_addresses() -def handle_query(sock, line, rpc, address_history, deterministic_wallets): +def handle_query(sock, line, rpc, txmonitor): debug("=> " + line) try: query = json.loads(line) t@@ -123,20 +113,16 @@ def handle_query(sock, line, rpc, address_history, deterministic_wallets): send_response(sock, query, reply) elif method == "blockchain.scripthash.subscribe": scrhash = query["params"][0] - if scrhash in address_history: - address_history[scrhash]["subscribed"] = True - history_hash = hashes.get_status_electrum(( - (h["tx_hash"], h["height"]) - for h in address_history[scrhash]["history"])) + if txmonitor.subscribe_address(scrhash): + history_hash = txmonitor.get_electrum_history_hash(scrhash) else: log("WARNING: address scripthash not known to us: " + scrhash) history_hash = hashes.get_status_electrum([]) send_response(sock, query, history_hash) elif method == "blockchain.scripthash.get_history": scrhash = query["params"][0] - if scrhash in address_history: - history = address_history[scrhash]["history"] - else: + history = txmonitor.get_electrum_history(scrhash) + if history == None: history = [] log("WARNING: address scripthash history not known to us: " + scrhash) t@@ -180,7 +166,7 @@ def handle_query(sock, line, rpc, address_history, deterministic_wallets): debug("tx broadcast result = " + str(result)) send_response(sock, query, result) elif method == "mempool.get_fee_histogram": - result = [] #not handling, sending empty + result = [] #TODO not handling, sending empty send_response(sock, query, result) elif method == "blockchain.estimatefee": estimate = rpc.call("estimatesmartfee", [query["params"][0]]) t@@ -196,8 +182,8 @@ def handle_query(sock, line, rpc, address_history, deterministic_wallets): blockchaininfo = rpc.call("getblockchaininfo", []) uptime = rpc.call("uptime", []) send_response(sock, query, BANNER.format( - detwallets=len(deterministic_wallets), - addr=len(address_history), + detwallets=len(txmonitor.deterministic_wallets), + addr=len(txmonitor.address_history), useragent=networkinfo["subversion"], peers=networkinfo["connections"], uptime=str(datetime.timedelta(seconds=uptime)), t@@ -246,8 +232,7 @@ def create_server_socket(hostport): log("Listening on " + str(hostport)) return server_sock -def run_electrum_server(hostport, rpc, address_history, unconfirmed_txes, - deterministic_wallets, poll_interval_listening, +def run_electrum_server(hostport, rpc, txmonitor, poll_interval_listening, poll_interval_connected, certfile, keyfile): log("Starting electrum server") server_sock = create_server_socket(hostport) t@@ -262,8 +247,7 @@ def run_electrum_server(hostport, rpc, address_history, unconfirmed_txes, certfile=certfile, keyfile=keyfile, ssl_version=ssl.PROTOCOL_SSLv23) except socket.timeout: - on_heartbeat_listening(rpc, address_history, - unconfirmed_txes, deterministic_wallets) + on_heartbeat_listening(txmonitor) except ssl.SSLError: sock.close() sock = None t@@ -285,10 +269,9 @@ def run_electrum_server(hostport, rpc, address_history, unconfirmed_txes, recv_buffer = recv_buffer[lb + 1:] lb = recv_buffer.find(b'\n') handle_query(sock, line.decode("utf-8"), rpc, - address_history, deterministic_wallets) + txmonitor) except socket.timeout: - on_heartbeat_connected(sock, rpc, address_history, - unconfirmed_txes, deterministic_wallets) + on_heartbeat_connected(sock, rpc, txmonitor) except (IOError, EOFError) as e: if isinstance(e, EOFError): log("Electrum wallet disconnected") t@@ -299,284 +282,9 @@ def run_electrum_server(hostport, rpc, address_history, unconfirmed_txes, except IOError: pass sock = None - on_disconnect(address_history) + on_disconnect(txmonitor) time.sleep(0.2) -def get_input_and_output_scriptpubkeys(rpc, txid): - gettx = rpc.call("gettransaction", [txid]) - txd = rpc.call("decoderawtransaction", [gettx["hex"]]) - output_scriptpubkeys = [out["scriptPubKey"]["hex"] for out in txd["vout"]] - input_scriptpubkeys = [] - for inn in txd["vin"]: - try: - wallet_tx = rpc.call("gettransaction", [inn["txid"]]) - except JsonRpcError: - #wallet doesnt know about this tx, so the input isnt ours - continue - input_decoded = rpc.call("decoderawtransaction", [wallet_tx["hex"]]) - script = input_decoded["vout"][inn["vout"]]["scriptPubKey"]["hex"] - input_scriptpubkeys.append(script) - return output_scriptpubkeys, input_scriptpubkeys, txd - -def generate_new_history_element(rpc, tx, txd): - if tx["confirmations"] == 0: - unconfirmed_input = False - total_input_value = 0 - for inn in txd["vin"]: - utxo = rpc.call("gettxout", [inn["txid"], inn["vout"], True]) - if utxo is None: - utxo = rpc.call("gettxout", [inn["txid"], inn["vout"], False]) - if utxo is None: - debug("utxo not found(!)") - #TODO detect this and figure out how to tell - # electrum that we dont know the fee - total_input_value += int(Decimal(utxo["value"]) * Decimal(1e8)) - unconfirmed_input = unconfirmed_input or utxo["confirmations"] == 0 - debug("total_input_value = " + str(total_input_value)) - - fee = total_input_value - sum([int(Decimal(out["value"])*Decimal(1e8)) - for out in txd["vout"]]) - height = -1 if unconfirmed_input else 0 - new_history_element = ({"tx_hash": tx["txid"], "height": height, - "fee": fee}) - else: - blockheader = rpc.call("getblockheader", [tx['blockhash']]) - new_history_element = ({"tx_hash": tx["txid"], - "height": blockheader["height"]}) - return new_history_element - -def sort_address_history_list(his): - unconfirm_txes = list(filter(lambda h:h["height"] == 0, his["history"])) - confirm_txes = filter(lambda h:h["height"] != 0, his["history"]) - #TODO txes must be "in blockchain order" - # the order they appear in the block - # it might be "blockindex" in listtransactions and gettransaction - #so must sort with key height+':'+blockindex - #perhaps check if any heights are the same then get the pos only for those - #a better way to do this is to have a separate dict that isnt in history - # which maps txid => blockindex - # and then sort by key height+":"+idx[txid] - his["history"] = sorted(confirm_txes, key=lambda h:h["height"]) - his["history"].extend(unconfirm_txes) - return unconfirm_txes - -def check_for_updated_txes(rpc, address_history, unconfirmed_txes, - deterministic_wallets): - updated_srchashes1 = check_for_new_txes(rpc, address_history, - unconfirmed_txes, deterministic_wallets) - updated_srchashes2 = check_for_confirmations(rpc, address_history, - unconfirmed_txes) - updated_srchashes = updated_srchashes1 | updated_srchashes2 - for ush in updated_srchashes: - his = address_history[ush] - sort_address_history_list(his) - if len(updated_srchashes) > 0: - debug("new tx address_history =\n" + pprint.pformat(address_history)) - debug("unconfirmed txes = " + pprint.pformat(unconfirmed_txes)) - debug("updated_scripthashes = " + str(updated_srchashes)) - else: - debug("no updated txes") - return updated_srchashes - -def check_for_confirmations(rpc, address_history, unconfirmed_txes): - confirmed_txes_srchashes = [] - debug("check4con unconfirmed_txes = " + pprint.pformat(unconfirmed_txes)) - for uc_txid, srchashes in unconfirmed_txes.items(): - tx = rpc.call("gettransaction", [uc_txid]) - debug("uc_txid=" + uc_txid + " => " + str(tx)) - if tx["confirmations"] == 0: - continue #still unconfirmed - log("A transaction confirmed: " + uc_txid) - confirmed_txes_srchashes.append((uc_txid, srchashes)) - block = rpc.call("getblockheader", [tx["blockhash"]]) - for srchash in srchashes: - #delete the old unconfirmed entry in address_history - deleted_entries = [h for h in address_history[srchash][ - "history"] if h["tx_hash"] == uc_txid] - for d_his in deleted_entries: - address_history[srchash]["history"].remove(d_his) - #create the new confirmed entry in address_history - address_history[srchash]["history"].append({"height": - block["height"], "tx_hash": uc_txid}) - updated_srchashes = set() - for tx, srchashes in confirmed_txes_srchashes: - del unconfirmed_txes[tx] - updated_srchashes.update(set(srchashes)) - return updated_srchashes - -def check_for_new_txes(rpc, address_history, unconfirmed_txes, - deterministic_wallets): - MAX_TX_REQUEST_COUNT = 256 - tx_request_count = 2 - max_attempts = int(math.log(MAX_TX_REQUEST_COUNT, 2)) - for i in range(max_attempts): - debug("listtransactions tx_request_count=" + str(tx_request_count)) - ret = rpc.call("listtransactions", ["*", tx_request_count, 0, True]) - ret = ret[::-1] - if last_known_recent_txid[0] == None: - recent_tx_index = len(ret) #=0 means no new txes - break - else: - txid_list = [(tx["txid"], tx["address"]) for tx in ret] - recent_tx_index = next((i for i, (txid, addr) - in enumerate(txid_list) if - txid == last_known_recent_txid[0][0] and - addr == last_known_recent_txid[0][1]), -1) - if recent_tx_index != -1: - break - tx_request_count *= 2 - - #TODO low priority: handle a user getting more than 255 new - # transactions in 15 seconds - debug("recent tx index = " + str(recent_tx_index) + " ret = " + str(ret)) - # str([(t["txid"], t["address"]) for t in ret])) - if len(ret) > 0: - last_known_recent_txid[0] = (ret[0]["txid"], ret[0]["address"]) - debug("last_known_recent_txid = " + str(last_known_recent_txid[0])) - assert(recent_tx_index != -1) - if recent_tx_index == 0: - return set() - new_txes = ret[:recent_tx_index][::-1] - debug("new txes = " + str(new_txes)) - #tests: finding one unconfirmed tx, finding one confirmed tx - #sending a tx that has nothing to do with our wallets - #getting a new tx on a completely empty wallet - #finding a confirmed and unconfirmed tx, in that order, then both confirm - #finding an unconfirmed and confirmed tx, in that order, then both confirm - #send a tx to an address which hasnt been used before - #import two addresses, transaction from one to the other, unc then confirm - obtained_txids = set() - updated_scripthashes = [] - for tx in new_txes: - if "txid" not in tx or "category" not in tx: - continue - if tx["category"] not in ("receive", "send"): - continue - if tx["txid"] in obtained_txids: - continue - obtained_txids.add(tx["txid"]) - output_scriptpubkeys, input_scriptpubkeys, txd = \ - get_input_and_output_scriptpubkeys(rpc, tx["txid"]) - matching_scripthashes = [] - for spk in (output_scriptpubkeys + input_scriptpubkeys): - scripthash = hashes.script_to_scripthash(spk) - if scripthash in address_history: - matching_scripthashes.append(scripthash) - if len(matching_scripthashes) == 0: - continue - - for wal in deterministic_wallets: - overrun_depths = wal.have_scriptpubkeys_overrun_gaplimit( - output_scriptpubkeys) - if overrun_depths != None: - for change, import_count in overrun_depths.items(): - spks = wal.get_new_scriptpubkeys(change, import_count) - new_addrs = [hashes.script_to_address(s, rpc) for s in spks] - debug("Importing " + str(len(spks)) + " into change=" - + str(change)) - import_addresses(rpc, new_addrs) - - updated_scripthashes.extend(matching_scripthashes) - new_history_element = generate_new_history_element(rpc, tx, txd) - log("Found new tx: " + str(new_history_element)) - for srchash in matching_scripthashes: - address_history[srchash]["history"].append(new_history_element) - if new_history_element["height"] == 0: - if tx["txid"] in unconfirmed_txes: - unconfirmed_txes[tx["txid"]].append(srchash) - else: - unconfirmed_txes[tx["txid"]] = [srchash] - #check whether the gap limits have been overrun and import more addrs - return set(updated_scripthashes) - -def build_address_history(rpc, monitored_scriptpubkeys, deterministic_wallets): - log("Building history with " + str(len(monitored_scriptpubkeys)) + - " addresses") - st = time.time() - address_history = {} - for spk in monitored_scriptpubkeys: - address_history[hashes.script_to_scripthash(spk)] = {'history': [], - 'subscribed': False} - wallet_addr_scripthashes = set(address_history.keys()) - #populate history - #which is a blockheight-ordered list of ("txhash", height) - #unconfirmed transactions go at the end as ("txhash", 0, fee) - # 0=unconfirmed -1=unconfirmed with unconfirmed parents - - BATCH_SIZE = 1000 - ret = list(range(BATCH_SIZE)) - t = 0 - count = 0 - obtained_txids = set() - while len(ret) == BATCH_SIZE: - ret = rpc.call("listtransactions", ["*", BATCH_SIZE, t, True]) - debug("listtransactions skip=" + str(t) + " len(ret)=" + str(len(ret))) - t += len(ret) - for tx in ret: - if "txid" not in tx or "category" not in tx: - continue - if tx["category"] not in ("receive", "send"): - continue - if tx["txid"] in obtained_txids: - continue - debug("adding obtained tx=" + str(tx["txid"])) - obtained_txids.add(tx["txid"]) - - #obtain all the addresses this transaction is involved with - output_scriptpubkeys, input_scriptpubkeys, txd = \ - get_input_and_output_scriptpubkeys(rpc, tx["txid"]) - output_scripthashes = [hashes.script_to_scripthash(sc) - for sc in output_scriptpubkeys] - sh_to_add = wallet_addr_scripthashes.intersection(set( - output_scripthashes)) - input_scripthashes = [hashes.script_to_scripthash(sc) - for sc in input_scriptpubkeys] - sh_to_add |= wallet_addr_scripthashes.intersection(set( - input_scripthashes)) - if len(sh_to_add) == 0: - continue - - for wal in deterministic_wallets: - overrun_depths = wal.have_scriptpubkeys_overrun_gaplimit( - output_scriptpubkeys) - if overrun_depths != None: - log("ERROR: Not enough addresses imported. Exiting.") - log("Delete wallet.dat and increase the value of " + - "`initial_import_count` in the file `config.cfg` " + - "then reimport and rescan") - #TODO make it so users dont have to delete wallet.dat - # check whether all initial_import_count addresses are - # imported rather than just the first one - return None, None - new_history_element = generate_new_history_element(rpc, tx, txd) - for scripthash in sh_to_add: - address_history[scripthash][ - "history"].append(new_history_element) - count += 1 - - unconfirmed_txes = {} - for srchash, his in address_history.items(): - uctx = sort_address_history_list(his) - for u in uctx: - if u["tx_hash"] in unconfirmed_txes: - unconfirmed_txes[u["tx_hash"]].append(srchash) - else: - unconfirmed_txes[u["tx_hash"]] = [srchash] - debug("unconfirmed_txes = " + str(unconfirmed_txes)) - if len(ret) > 0: - #txid doesnt uniquely identify transactions from listtransactions - #but the tuple (txid, address) does - last_known_recent_txid[0] = (ret[-1]["txid"], ret[-1]["address"]) - else: - last_known_recent_txid[0] = None - debug("last_known_recent_txid = " + str(last_known_recent_txid[0])) - - et = time.time() - log("Found " + str(count) + " txes. History built in " + str(et - st) - + "sec") - debug("address_history =\n" + pprint.pformat(address_history)) - return address_history, unconfirmed_txes - def get_scriptpubkeys_to_monitor(rpc, config): imported_addresses = set(rpc.call("getaddressesbyaccount", [ADDRESSES_LABEL])) t@@ -693,9 +401,9 @@ def main(): "that the wallets are new\nand empty then there's no need to " + "rescan, just restart this script") else: - address_history, unconfirmed_txes = build_address_history( - rpc, relevant_spks_addrs, deterministic_wallets) - if address_history == None: + txmonitor = transactionmonitor.TransactionMonitor(rpc, + deterministic_wallets) + if not txmonitor.build_address_history(relevant_spks_addrs): return hostport = (config.get("electrum-server", "host"), int(config.get("electrum-server", "port"))) t@@ -705,8 +413,8 @@ def main(): "poll_interval_connected")) certfile = config.get("electrum-server", "certfile") keyfile = config.get("electrum-server", "keyfile") - run_electrum_server(hostport, rpc, address_history, unconfirmed_txes, - deterministic_wallets, poll_interval_listening, + run_electrum_server(hostport, rpc, txmonitor, poll_interval_listening, poll_interval_connected, certfile, keyfile) -main() +if __name__ == "__main__": + main() DIR diff --git a/transactionmonitor.py b/transactionmonitor.py t@@ -0,0 +1,326 @@ + +import time, pprint, math +from decimal import Decimal + +from jsonrpc import JsonRpcError +from server import debug, log, import_addresses +import hashes + +class TransactionMonitor(object): + def __init__(self, rpc, deterministic_wallets): + self.rpc = rpc + self.deterministic_wallets = deterministic_wallets + self.last_known_recent_txid = None + self.address_history = None + self.unconfirmed_txes = None + + def get_electrum_history_hash(self, scrhash): + return hashes.get_status_electrum( ((h["tx_hash"], h["height"]) + for h in self.address_history[scrhash]["history"]) ) + + def get_electrum_history(self, scrhash): + if scrhash in self.address_history: + return self.address_history[scrhash]["history"] + else: + return None + + def subscribe_address(self, scrhash): + if scrhash in self.address_history: + self.address_history[scrhash]["subscribed"] = True + return True + else: + return False + + def unsubscribe_all_addresses(self): + for srchash, his in self.address_history.items(): + his["subscribed"] = False + + def build_address_history(self, monitored_scriptpubkeys): + log("Building history with " + str(len(monitored_scriptpubkeys)) + + " addresses") + st = time.time() + address_history = {} + for spk in monitored_scriptpubkeys: + address_history[hashes.script_to_scripthash(spk)] = {'history': [], + 'subscribed': False} + wallet_addr_scripthashes = set(address_history.keys()) + #populate history + #which is a blockheight-ordered list of ("txhash", height) + #unconfirmed transactions go at the end as ("txhash", 0, fee) + # 0=unconfirmed -1=unconfirmed with unconfirmed parents + + BATCH_SIZE = 1000 + ret = list(range(BATCH_SIZE)) + t = 0 + count = 0 + obtained_txids = set() + while len(ret) == BATCH_SIZE: + ret = self.rpc.call("listtransactions", ["*", BATCH_SIZE, t, True]) + debug("listtransactions skip=" + str(t) + " len(ret)=" + + str(len(ret))) + t += len(ret) + for tx in ret: + if "txid" not in tx or "category" not in tx: + continue + if tx["category"] not in ("receive", "send"): + continue + if tx["txid"] in obtained_txids: + continue + debug("adding obtained tx=" + str(tx["txid"])) + obtained_txids.add(tx["txid"]) + + #obtain all the addresses this transaction is involved with + output_scriptpubkeys, input_scriptpubkeys, txd = \ + self.get_input_and_output_scriptpubkeys(tx["txid"]) + output_scripthashes = [hashes.script_to_scripthash(sc) + for sc in output_scriptpubkeys] + sh_to_add = wallet_addr_scripthashes.intersection(set( + output_scripthashes)) + input_scripthashes = [hashes.script_to_scripthash(sc) + for sc in input_scriptpubkeys] + sh_to_add |= wallet_addr_scripthashes.intersection(set( + input_scripthashes)) + if len(sh_to_add) == 0: + continue + + for wal in self.deterministic_wallets: + overrun_depths = wal.have_scriptpubkeys_overrun_gaplimit( + output_scriptpubkeys) + if overrun_depths != None: + log("ERROR: Not enough addresses imported.") + log("Delete wallet.dat and increase the value " + + "of `initial_import_count` in the file " + + "`config.cfg` then reimport and rescan") + #TODO make it so users dont have to delete wallet.dat + # check whether all initial_import_count addresses are + # imported rather than just the first one + return False + new_history_element = self.generate_new_history_element(tx, txd) + for scripthash in sh_to_add: + address_history[scripthash][ + "history"].append(new_history_element) + count += 1 + + unconfirmed_txes = {} + for srchash, his in address_history.items(): + uctx = self.sort_address_history_list(his) + for u in uctx: + if u["tx_hash"] in unconfirmed_txes: + unconfirmed_txes[u["tx_hash"]].append(srchash) + else: + unconfirmed_txes[u["tx_hash"]] = [srchash] + debug("unconfirmed_txes = " + str(unconfirmed_txes)) + if len(ret) > 0: + #txid doesnt uniquely identify transactions from listtransactions + #but the tuple (txid, address) does + self.last_known_recent_txid = (ret[-1]["txid"], ret[-1]["address"]) + else: + self.last_known_recent_txid = None + debug("last_known_recent_txid = " + str(self.last_known_recent_txid)) + + et = time.time() + log("Found " + str(count) + " txes. History built in " + + str(et - st) + "sec") + debug("address_history =\n" + pprint.pformat(address_history)) + self.address_history = address_history + self.unconfirmed_txes = unconfirmed_txes + return True + + def get_input_and_output_scriptpubkeys(self, txid): + gettx = self.rpc.call("gettransaction", [txid]) + txd = self.rpc.call("decoderawtransaction", [gettx["hex"]]) + output_scriptpubkeys = [out["scriptPubKey"]["hex"] + for out in txd["vout"]] + input_scriptpubkeys = [] + for inn in txd["vin"]: + try: + wallet_tx = self.rpc.call("gettransaction", [inn["txid"]]) + except JsonRpcError: + #wallet doesnt know about this tx, so the input isnt ours + continue + input_decoded = self.rpc.call("decoderawtransaction", [wallet_tx[ + "hex"]]) + script = input_decoded["vout"][inn["vout"]]["scriptPubKey"]["hex"] + input_scriptpubkeys.append(script) + return output_scriptpubkeys, input_scriptpubkeys, txd + + def generate_new_history_element(self, tx, txd): + if tx["confirmations"] == 0: + unconfirmed_input = False + total_input_value = 0 + for inn in txd["vin"]: + utxo = self.rpc.call("gettxout", [inn["txid"], inn["vout"], + True]) + if utxo is None: + utxo = self.rpc.call("gettxout", [inn["txid"], inn["vout"], + False]) + if utxo is None: + debug("utxo not found(!)") + #TODO detect this and figure out how to tell + # electrum that we dont know the fee + total_input_value += int(Decimal(utxo["value"]) * Decimal(1e8)) + unconfirmed_input = (unconfirmed_input or + utxo["confirmations"] == 0) + debug("total_input_value = " + str(total_input_value)) + + fee = total_input_value - sum([int(Decimal(out["value"]) + * Decimal(1e8)) for out in txd["vout"]]) + height = -1 if unconfirmed_input else 0 + new_history_element = ({"tx_hash": tx["txid"], "height": height, + "fee": fee}) + else: + blockheader = self.rpc.call("getblockheader", [tx['blockhash']]) + new_history_element = ({"tx_hash": tx["txid"], + "height": blockheader["height"]}) + return new_history_element + + def sort_address_history_list(self, his): + unconfirm_txes = list(filter(lambda h:h["height"] == 0, his["history"])) + confirm_txes = filter(lambda h:h["height"] != 0, his["history"]) + #TODO txes must be "in blockchain order" + # the order they appear in the block + # it might be "blockindex" in listtransactions and gettransaction + #so must sort with key height+':'+blockindex + #maybe check if any heights are the same then get the pos only for those + #better way to do this is to have a separate dict that isnt in history + # which maps txid => blockindex + # and then sort by key height+":"+idx[txid] + his["history"] = sorted(confirm_txes, key=lambda h:h["height"]) + his["history"].extend(unconfirm_txes) + return unconfirm_txes + + def check_for_updated_txes(self): + updated_srchashes1 = self.check_for_new_txes() + updated_srchashes2 = self.check_for_confirmations() + updated_srchashes = updated_srchashes1 | updated_srchashes2 + for ush in updated_srchashes: + his = self.address_history[ush] + self.sort_address_history_list(his) + if len(updated_srchashes) > 0: + debug("new tx address_history =\n" + + pprint.pformat(self.address_history)) + debug("unconfirmed txes = " + pprint.pformat(self.unconfirmed_txes)) + debug("updated_scripthashes = " + str(updated_srchashes)) + else: + debug("no updated txes") + updated_srchashes = filter(lambda sh:self.address_history[sh][ + "subscribed"], updated_srchashes) + #TODO srchashes is misspelled, should be scrhashes + return updated_srchashes + + def check_for_confirmations(self): + confirmed_txes_srchashes = [] + debug("check4con unconfirmed_txes = " + + pprint.pformat(self.unconfirmed_txes)) + for uc_txid, srchashes in self.unconfirmed_txes.items(): + tx = self.rpc.call("gettransaction", [uc_txid]) + debug("uc_txid=" + uc_txid + " => " + str(tx)) + if tx["confirmations"] == 0: + continue #still unconfirmed + log("A transaction confirmed: " + uc_txid) + confirmed_txes_srchashes.append((uc_txid, srchashes)) + block = self.rpc.call("getblockheader", [tx["blockhash"]]) + for srchash in srchashes: + #delete the old unconfirmed entry in address_history + deleted_entries = [h for h in self.address_history[srchash][ + "history"] if h["tx_hash"] == uc_txid] + for d_his in deleted_entries: + self.address_history[srchash]["history"].remove(d_his) + #create the new confirmed entry in address_history + self.address_history[srchash]["history"].append({"height": + block["height"], "tx_hash": uc_txid}) + updated_srchashes = set() + for tx, srchashes in confirmed_txes_srchashes: + del self.unconfirmed_txes[tx] + updated_srchashes.update(set(srchashes)) + return updated_srchashes + + def check_for_new_txes(self): + MAX_TX_REQUEST_COUNT = 256 + tx_request_count = 2 + max_attempts = int(math.log(MAX_TX_REQUEST_COUNT, 2)) + for i in range(max_attempts): + debug("listtransactions tx_request_count=" + str(tx_request_count)) + ret = self.rpc.call("listtransactions", ["*", tx_request_count, 0, + True]) + ret = ret[::-1] + if self.last_known_recent_txid == None: + recent_tx_index = len(ret) #=0 means no new txes + break + else: + txid_list = [(tx["txid"], tx["address"]) for tx in ret] + recent_tx_index = next((i for i, (txid, addr) + in enumerate(txid_list) if + txid == self.last_known_recent_txid[0] and + addr == self.last_known_recent_txid[1]), -1) + if recent_tx_index != -1: + break + tx_request_count *= 2 + + #TODO low priority: handle a user getting more than 255 new + # transactions in 15 seconds + debug("recent tx index = " + str(recent_tx_index) + " ret = " + + str(ret)) + # str([(t["txid"], t["address"]) for t in ret])) + if len(ret) > 0: + self.last_known_recent_txid = (ret[0]["txid"], ret[0]["address"]) + debug("last_known_recent_txid = " + str( + self.last_known_recent_txid)) + assert(recent_tx_index != -1) + if recent_tx_index == 0: + return set() + new_txes = ret[:recent_tx_index][::-1] + debug("new txes = " + str(new_txes)) + #tests: finding one unconfirmed tx, finding one confirmed tx + #sending a tx that has nothing to do with our wallets + #getting a new tx on a completely empty wallet + #finding confirmed and unconfirmed tx, in that order, then both confirm + #finding unconfirmed and confirmed tx, in that order, then both confirm + #send a tx to an address which hasnt been used before + #import two addresses, transaction from one to the other + obtained_txids = set() + updated_scripthashes = [] + for tx in new_txes: + if "txid" not in tx or "category" not in tx: + continue + if tx["category"] not in ("receive", "send"): + continue + if tx["txid"] in obtained_txids: + continue + obtained_txids.add(tx["txid"]) + output_scriptpubkeys, input_scriptpubkeys, txd = \ + self.get_input_and_output_scriptpubkeys(tx["txid"]) + matching_scripthashes = [] + for spk in (output_scriptpubkeys + input_scriptpubkeys): + scripthash = hashes.script_to_scripthash(spk) + if scripthash in self.address_history: + matching_scripthashes.append(scripthash) + if len(matching_scripthashes) == 0: + continue + + for wal in self.deterministic_wallets: + overrun_depths = wal.have_scriptpubkeys_overrun_gaplimit( + output_scriptpubkeys) + if overrun_depths != None: + for change, import_count in overrun_depths.items(): + spks = wal.get_new_scriptpubkeys(change, import_count) + new_addrs = [hashes.script_to_address(s, rpc) + for s in spks] + debug("Importing " + str(len(spks)) + " into change=" + + str(change)) + import_addresses(rpc, new_addrs) + + updated_scripthashes.extend(matching_scripthashes) + new_history_element = self.generate_new_history_element(tx, txd) + log("Found new tx: " + str(new_history_element)) + for srchash in matching_scripthashes: + self.address_history[srchash]["history"].append( + new_history_element) + if new_history_element["height"] == 0: + if tx["txid"] in self.unconfirmed_txes: + self.unconfirmed_txes[tx["txid"]].append(srchash) + else: + self.unconfirmed_txes[tx["txid"]] = [srchash] + #check whether gap limits have been overrun and import more addrs + return set(updated_scripthashes) +