URI: 
       ttest_electrum_protocol.py - electrum-personal-server - Maximally lightweight electrum server for a single user
  HTML git clone https://git.parazyd.org/electrum-personal-server
   DIR Log
   DIR Files
   DIR Refs
   DIR README
       ---
       ttest_electrum_protocol.py (9449B)
       ---
            1 
            2 import pytest
            3 import logging
            4 import json
            5 
            6 from electrumpersonalserver.server import (
            7     TransactionMonitor,
            8     JsonRpcError,
            9     ElectrumProtocol,
           10     get_block_header,
           11     get_current_header,
           12     get_block_headers_hex,
           13     JsonRpcError,
           14     get_status_electrum
           15 )
           16 
# Logger shared by this test module; it is the logger argument passed to each
# ElectrumProtocol constructed below. DEBUG is enabled on it explicitly.
logger = logging.getLogger('ELECTRUMPERSONALSERVER-TEST')
logger.setLevel(logging.DEBUG)

# Chain tip height a freshly constructed DummyJsonRpc starts with.
DUMMY_JSONRPC_BLOCKCHAIN_HEIGHT = 100000
           21 
           22 def get_dummy_hash_from_height(height):
           23     if height == 0:
           24         return "00"*32
           25     return str(height) + "a"*(64 - len(str(height)))
           26 
           27 def get_height_from_dummy_hash(hhash):
           28     if hhash == "00"*32:
           29         return 0
           30     return int(hhash[:hhash.index("a")])
           31 
           32 class DummyJsonRpc(object):
           33     def __init__(self):
           34         self.calls = {}
           35         self.blockchain_height = DUMMY_JSONRPC_BLOCKCHAIN_HEIGHT
           36 
           37     def call(self, method, params):
           38         if method not in self.calls:
           39             self.calls[method] = [0, []]
           40         self.calls[method][0] += 1
           41         self.calls[method][1].append(params)
           42         if method == "getbestblockhash":
           43             return get_dummy_hash_from_height(self.blockchain_height)
           44         elif method == "getblockhash":
           45             height = params[0]
           46             if height > self.blockchain_height:
           47                 raise JsonRpcError()
           48             return get_dummy_hash_from_height(height)
           49         elif method == "getblockheader":
           50             blockhash = params[0]
           51             height = get_height_from_dummy_hash(blockhash)
           52             header = {
           53                 "hash": blockhash,
           54                 "confirmations": self.blockchain_height - height + 1,
           55                 "height": height,
           56                 "version": 536870912,
           57                 "versionHex": "20000000",
           58                 "merkleroot": "aa"*32,
           59                 "time": height*100,
           60                 "mediantime": height*100,
           61                 "nonce": 1,
           62                 "bits": "207fffff",
           63                 "difficulty": 4.656542373906925e-10,
           64                 "chainwork": "000000000000000000000000000000000000000000000"
           65                     + "00000000000000000da",
           66                 "nTx": 1,
           67             }
           68             if height > 1:
           69                 header["previousblockhash"] = get_dummy_hash_from_height(
           70                     height - 1)
           71             elif height == 1:
           72                 header["previousblockhash"] = "00"*32 #genesis block
           73             elif height == 0:
           74                 pass #no prevblock for genesis
           75             else:
           76                 assert 0
           77             if height < self.blockchain_height:
           78                 header["nextblockhash"] = get_dummy_hash_from_height(height + 1)
           79             return header
           80         elif method == "gettransaction":
           81             for t in self.txlist:
           82                 if t["txid"] == params[0]:
           83                     return t
           84             raise JsonRpcError()
           85         else:
           86             raise ValueError("unknown method in dummy jsonrpc")
           87 
           88 def test_get_block_header():
           89     rpc = DummyJsonRpc()
           90     for height in [0, 1000]:
           91         for raw in [True, False]:
           92             blockhash = rpc.call("getblockhash", [height])
           93             ret = get_block_header(rpc, blockhash, raw)
           94             if raw:
           95                 assert type(ret) == dict
           96                 assert "hex" in ret
           97                 assert "height" in ret
           98                 assert len(ret["hex"]) == 160
           99             else:
          100                 assert type(ret) == dict
          101                 assert len(ret) == 7
          102 
          103 def test_get_current_header():
          104     rpc = DummyJsonRpc()
          105     for raw in [True, False]:
          106         ret = get_current_header(rpc, raw)
          107         assert type(ret[0]) == str
          108         assert len(ret[0]) == 64
          109         if raw:
          110             assert type(ret[1]) == dict
          111             assert "hex" in ret[1]
          112             assert "height" in ret[1]
          113             assert len(ret[1]["hex"]) == 160
          114         else:
          115             assert type(ret[1]) == dict
          116             assert len(ret[1]) == 7
          117 
          118 @pytest.mark.parametrize(
          119     "start_height, count",
          120     [(100, 200),
          121     (DUMMY_JSONRPC_BLOCKCHAIN_HEIGHT + 10, 5),
          122     (DUMMY_JSONRPC_BLOCKCHAIN_HEIGHT - 10, 15),
          123     (0, 250)
          124     ]
          125 )
          126 def test_get_block_headers_hex(start_height, count):
          127     rpc = DummyJsonRpc()
          128     ret = get_block_headers_hex(rpc, start_height, count)
          129     print("start_height=" + str(start_height) + " count=" + str(count))
          130     assert len(ret) == 2
          131     available_blocks = -min(0, start_height - DUMMY_JSONRPC_BLOCKCHAIN_HEIGHT
          132         - 1)
          133     expected_count = min(available_blocks, count)
          134     assert len(ret[0]) == expected_count*80*2 #80 bytes/header, 2 chars/byte
          135     assert ret[1] == expected_count
          136 
          137 @pytest.mark.parametrize(
          138     "invalid_json_query",
          139     [
          140         {"valid-json-no-method": 5}
          141     ]
          142 ) 
          143 def test_invalid_json_query_line(invalid_json_query):
          144     protocol = ElectrumProtocol(None, None, logger, None, None, None)
          145     with pytest.raises(IOError) as e:
          146         protocol.handle_query(invalid_json_query)
          147 
          148 def create_electrum_protocol_instance(broadcast_method="own-node",
          149         tor_hostport=("127.0.0.1", 9050),
          150         disable_mempool_fee_histogram=False):
          151     protocol = ElectrumProtocol(DummyJsonRpc(), DummyTransactionMonitor(),
          152         logger, broadcast_method, tor_hostport, disable_mempool_fee_histogram)
          153     sent_replies = []
          154     protocol.set_send_reply_fun(lambda l: sent_replies.append(l))
          155     assert len(sent_replies) == 0
          156     return protocol, sent_replies
          157 
          158 def dummy_script_hash_to_history(scrhash):
          159     index = int(scrhash[:scrhash.index("s")])
          160     tx_count = (index+2) % 5
          161     height = 500
          162     return [(index_to_dummy_txid(i), height) for i in range(tx_count)]
          163 
          164 def index_to_dummy_script_hash(index):
          165     return str(index) + "s"*(64 - len(str(index)))
          166 
          167 def index_to_dummy_txid(index):
          168     return str(index) + "t"*(64 - len(str(index)))
          169 
          170 def dummy_txid_to_dummy_tx(txid):
          171     return txid[::-1] * 6
          172 
          173 class DummyTransactionMonitor(object):
          174     def __init__(self):
          175         self.deterministic_wallets = list(range(5))
          176         self.address_history = list(range(5))
          177         self.subscribed_addresses = []
          178         self.history_hashes = {}
          179 
          180     def get_electrum_history_hash(self, scrhash):
          181         history = dummy_script_hash_to_history(scrhash)
          182         hhash = get_status_electrum(history)
          183         self.history_hashes[scrhash] = history
          184         return hhash
          185 
          186     def get_electrum_history(self, scrhash):
          187         return self.history_hashes[scrhash]
          188 
          189     def unsubscribe_all_addresses(self):
          190         self.subscribed_addresses = []
          191 
          192     def subscribe_address(self, scrhash):
          193         self.subscribed_addresses.append(scrhash)
          194         return True
          195 
          196     def get_address_balance(self, scrhash):
          197         pass
          198 
          199 def test_script_hash_sync():
          200     protocol, sent_replies = create_electrum_protocol_instance()
          201     scrhash_index = 0
          202     scrhash = index_to_dummy_script_hash(scrhash_index)
          203     protocol.handle_query({"method": "blockchain.scripthash.subscribe",
          204         "params": [scrhash], "id": 0})
          205     assert len(sent_replies) == 1
          206     assert len(protocol.txmonitor.subscribed_addresses) == 1
          207     assert protocol.txmonitor.subscribed_addresses[0] == scrhash
          208     assert len(sent_replies) == 1
          209     assert len(sent_replies[0]["result"]) == 64
          210     history_hash = sent_replies[0]["result"]
          211 
          212     protocol.handle_query({"method": "blockchain.scripthash.get_history",
          213         "params": [scrhash], "id": 0})
          214     assert len(sent_replies) == 2
          215     assert get_status_electrum(sent_replies[1]["result"]) == history_hash
          216 
          217     #updated scripthash but actually nothing changed, history_hash unchanged
          218     protocol.on_updated_scripthashes([scrhash])
          219     assert len(sent_replies) == 3
          220     assert sent_replies[2]["method"] == "blockchain.scripthash.subscribe"
          221     assert sent_replies[2]["params"][0] == scrhash
          222     assert sent_replies[2]["params"][1] == history_hash
          223 
          224     protocol.on_disconnect()
          225     assert len(protocol.txmonitor.subscribed_addresses) == 0
          226 
          227 def test_headers_subscribe():
          228     protocol, sent_replies = create_electrum_protocol_instance()
          229 
          230     protocol.handle_query({"method": "server.version", "params": ["test-code",
          231         1.4], "id": 0}) #protocol version of 1.4 means only raw headers used
          232     assert len(sent_replies) == 1
          233 
          234     protocol.handle_query({"method": "blockchain.headers.subscribe", "params":
          235         [], "id": 0})
          236     assert len(sent_replies) == 2
          237     assert "height" in sent_replies[1]["result"]
          238     assert sent_replies[1]["result"]["height"] == protocol.rpc.blockchain_height
          239     assert "hex" in sent_replies[1]["result"]
          240     assert len(sent_replies[1]["result"]["hex"]) == 80*2 #80 b/header, 2 b/char
          241 
          242     protocol.rpc.blockchain_height += 1
          243     new_bestblockhash, header = get_current_header(protocol.rpc,
          244         protocol.are_headers_raw)
          245     protocol.on_blockchain_tip_updated(header)
          246     assert len(sent_replies) == 3
          247     assert "method" in sent_replies[2]
          248     assert sent_replies[2]["method"] == "blockchain.headers.subscribe"
          249     assert "params" in sent_replies[2]
          250     assert "height" in sent_replies[2]["params"][0]
          251     assert sent_replies[2]["params"][0]["height"]\
          252         == protocol.rpc.blockchain_height
          253     assert "hex" in sent_replies[2]["params"][0]
          254     assert len(sent_replies[2]["params"][0]["hex"]) == 80*2 #80 b/header, 2 b/c
          255 
          256 def test_server_ping():
          257     protocol, sent_replies = create_electrum_protocol_instance()
          258     idd = 1
          259     protocol.handle_query({"method": "server.ping", "id": idd})
          260     assert len(sent_replies) == 1
          261     assert sent_replies[0]["result"] == None
          262     assert sent_replies[0]["id"] == idd
          263 
          264 #test scripthash.subscribe, scripthash.get_history transaction.get
          265 # transaction.get_merkle
          266