database.py (4706B)
1 import sqlite3 2 import logging 3 4 5 def create_database(name="database.db"): 6 with sqlite3.connect(name) as conn: 7 logging.info("Creating new {}...".format(name)) 8 conn.execute( 9 "CREATE TABLE payments (uuid TEXT, fiat_value DECIMAL, btc_value DECIMAL, method TEXT, address TEXT, time DECIMAL, webhook TEXT, rhash TEXT)" 10 ) 11 return 12 13 14 def _get_database_schema_version(name="database.db"): 15 with sqlite3.connect(name) as conn: 16 return conn.execute("SELECT version FROM schema_version").fetchone()[0] 17 18 19 def _set_database_schema_version(version, name="database.db"): 20 with sqlite3.connect(name) as conn: 21 conn.execute("UPDATE schema_version SET version = {}".format(version)) 22 23 24 def _log_migrate_database(from_version, to_version, message): 25 logging.info( 26 "Migrating database from {} to {}: {}".format(from_version, to_version, message) 27 ) 28 29 30 def migrate_database(name="database.db"): 31 with sqlite3.connect(name) as conn: 32 version_table_exists = conn.execute( 33 "SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'schema_version'" 34 ).fetchone() 35 if version_table_exists: 36 schema_version = _get_database_schema_version(name) 37 else: 38 schema_version = 0 39 40 if schema_version < 1: 41 _log_migrate_database(0, 1, "Creating new table for schema version") 42 with sqlite3.connect(name) as conn: 43 conn.execute("CREATE TABLE schema_version (version INT)") 44 conn.execute("INSERT INTO schema_version (version) VALUES (1)") 45 46 if schema_version < 2: 47 _log_migrate_database(1, 2, "Creating new table for generated addresses") 48 with sqlite3.connect(name) as conn: 49 conn.execute("CREATE TABLE addresses (n INTEGER, address TEXT, xpub TEXT)") 50 _set_database_schema_version(2) 51 52 if schema_version < 3: 53 _log_migrate_database(2, 3, "Adding base currency column to payments table") 54 with sqlite3.connect(name) as conn: 55 conn.execute("ALTER TABLE payments ADD fiat_currency TEXT") 56 _set_database_schema_version(3) 57 58 if schema_version < 4: 59 _log_migrate_database(3, 4, "Renaming fiat to base in payments table") 60 with sqlite3.connect(name) as conn: 61 conn.execute("ALTER TABLE payments RENAME fiat_value TO base_value") 62 conn.execute("ALTER TABLE payments RENAME fiat_currency TO base_currency") 63 _set_database_schema_version(4) 64 65 #if schema_version < 5: 66 # do next migration 67 68 new_version = _get_database_schema_version(name) 69 if schema_version != new_version: 70 logging.info( 71 "Finished migrating database schema from version {} to {}".format( 72 schema_version, new_version 73 ) 74 ) 75 76 77 def write_to_database(invoice, name="database.db"): 78 with sqlite3.connect(name) as conn: 79 cur = conn.cursor() 80 cur.execute( 81 "INSERT INTO payments (uuid,base_currency,base_value,btc_value,method,address,time,webhook,rhash) VALUES (?,?,?,?,?,?,?,?,?)", 82 ( 83 invoice["uuid"], 84 invoice["base_currency"], 85 invoice["base_value"], 86 invoice["btc_value"], 87 invoice["method"], 88 invoice["address"], 89 invoice["time"], 90 invoice["webhook"], 91 invoice["rhash"], 92 ), 93 ) 94 return 95 96 97 def load_invoices_from_db(where, name="database.db"): 98 with sqlite3.connect(name) as conn: 99 conn.row_factory = sqlite3.Row 100 cur = conn.cursor() 101 rows = cur.execute("SELECT * FROM payments WHERE {}".format(where)).fetchall() 102 return rows 103 104 105 def load_invoice_from_db(uuid, name="database.db"): 106 rows = load_invoices_from_db("uuid='{}'".format(uuid), name) 107 if len(rows) > 0: 108 return [dict(ix) for ix in rows][0] 109 else: 110 return None 111 112 113 def add_generated_address(index, address, xpub): 114 with sqlite3.connect("database.db") as conn: 115 cur = conn.cursor() 116 cur.execute( 117 "INSERT INTO addresses (n, address, xpub) VALUES (?,?,?)", 118 ( 119 index, 120 address, 121 xpub, 122 ), 123 ) 124 return 125 126 127 def get_next_address_index(xpub): 128 with sqlite3.connect("database.db") as conn: 129 conn.row_factory = sqlite3.Row 130 cur = conn.cursor() 131 addresses = cur.execute( 132 "SELECT n FROM addresses WHERE xpub='{}' ORDER BY n DESC LIMIT 1".format(xpub) 133 ).fetchall() 134 135 if len(addresses) == 0: 136 return 0 137 else: 138 return max([dict(addr)["n"] for addr in addresses]) + 1