tsql_db.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
tsql_db.py (2392B)
---
1 import os
2 import concurrent
3 import queue
4 import threading
5 import asyncio
6 import sqlite3
7
8 from .logging import Logger
9 from .util import test_read_write_permissions
10
11
12 def sql(func):
13 """wrapper for sql methods"""
14 def wrapper(self: 'SqlDB', *args, **kwargs):
15 assert threading.currentThread() != self.sql_thread
16 f = asyncio.Future()
17 self.db_requests.put((f, func, args, kwargs))
18 return f
19 return wrapper
20
21
22 class SqlDB(Logger):
23
24 def __init__(self, asyncio_loop: asyncio.BaseEventLoop, path, commit_interval=None):
25 Logger.__init__(self)
26 self.asyncio_loop = asyncio_loop
27 self.stopping = False
28 self.stopped_event = asyncio.Event()
29 self.path = path
30 test_read_write_permissions(path)
31 self.commit_interval = commit_interval
32 self.db_requests = queue.Queue()
33 self.sql_thread = threading.Thread(target=self.run_sql)
34 self.sql_thread.start()
35
36 def stop(self):
37 self.stopping = True
38
39 def filesize(self):
40 return os.stat(self.path).st_size
41
42 def run_sql(self):
43 self.logger.info("SQL thread started")
44 self.conn = sqlite3.connect(self.path)
45 self.logger.info("Creating database")
46 self.create_database()
47 i = 0
48 while not self.stopping and self.asyncio_loop.is_running():
49 try:
50 future, func, args, kwargs = self.db_requests.get(timeout=0.1)
51 except queue.Empty:
52 continue
53 try:
54 result = func(self, *args, **kwargs)
55 except BaseException as e:
56 self.asyncio_loop.call_soon_threadsafe(future.set_exception, e)
57 continue
58 if not future.cancelled():
59 self.asyncio_loop.call_soon_threadsafe(future.set_result, result)
60 # note: in sweepstore session.commit() is called inside
61 # the sql-decorated methods, so commiting to disk is awaited
62 if self.commit_interval:
63 i = (i + 1) % self.commit_interval
64 if i == 0:
65 self.conn.commit()
66 # write
67 self.conn.commit()
68 self.conn.close()
69
70 self.logger.info("SQL thread terminated")
71 self.asyncio_loop.call_soon_threadsafe(self.stopped_event.set)
72
73 def create_database(self):
74 raise NotImplementedError()