db lock issue, use one connection

This commit is contained in:
Yostra 2020-03-29 18:24:47 -07:00
parent 2ccdd3860d
commit 6e01eca711
13 changed files with 122 additions and 116 deletions

View File

@ -1,6 +1,5 @@
import asyncio
from typing import Dict, Optional, List
from pathlib import Path
import aiosqlite
from src.types.full_block import FullBlock
from src.types.hashable.coin import Coin
@ -37,12 +36,12 @@ class CoinStore:
cache_size: uint32
@classmethod
async def create(cls, db_path: Path, cache_size: uint32 = uint32(600000)):
async def create(cls, connection: aiosqlite.Connection, cache_size: uint32 = uint32(600000)):
self = cls()
self.cache_size = cache_size
# All full blocks which have been added to the blockchain. Header_hash -> block
self.coin_record_db = await aiosqlite.connect(db_path)
self.coin_record_db = connection
await self.coin_record_db.execute(
(
f"CREATE TABLE IF NOT EXISTS coin_record("
@ -81,9 +80,6 @@ class CoinStore:
self.head_diffs = dict()
return self
async def close(self):
await self.coin_record_db.close()
async def _clear_database(self):
cursor = await self.coin_record_db.execute("DELETE FROM coin_record")
await cursor.close()

View File

@ -1,7 +1,6 @@
import asyncio
import logging
import aiosqlite
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from src.types.hashable.program import Program
@ -58,11 +57,11 @@ class FullNodeStore:
lock: asyncio.Lock
@classmethod
async def create(cls, db_path: Path):
async def create(cls, connection):
self = cls()
# All full blocks which have been added to the blockchain. Header_hash -> block
self.db = await aiosqlite.connect(db_path)
self.db = connection
await self.db.execute(
"CREATE TABLE IF NOT EXISTS blocks(height bigint, header_hash text PRIMARY KEY, block blob)"
)
@ -110,9 +109,6 @@ class FullNodeStore:
self.lock = asyncio.Lock() # external
return self
async def close(self):
await self.db.close()
async def _clear_database(self):
async with self.lock:
await self.db.execute("DELETE FROM blocks")

View File

@ -3,6 +3,7 @@ import logging
import logging.config
import signal
import aiosqlite
import miniupnpc
try:
@ -39,11 +40,12 @@ async def main():
mkdir(db_path.parent)
# Create the store (DB) and full node instance
store = await FullNodeStore.create(db_path)
connection = await aiosqlite.connect(db_path)
store = await FullNodeStore.create(connection)
genesis: FullBlock = FullBlock.from_bytes(constants["GENESIS_BLOCK"])
await store.add_block(genesis)
unspent_store = await CoinStore.create(db_path)
unspent_store = await CoinStore.create(connection)
log.info("Initializing blockchain from disk")
blockchain = await Blockchain.create(unspent_store, store)
@ -129,11 +131,8 @@ async def main():
await rpc_cleanup()
log.info("Closed RPC server.")
await store.close()
log.info("Closed store.")
await unspent_store.close()
log.info("Closed unspent store.")
await connection.close()
log.info("Closed db connection.")
await asyncio.get_running_loop().shutdown_asyncgens()
log.info("Node fully closed.")

View File

@ -3,6 +3,8 @@ import logging
import logging.config
import signal
from src.path import mkdir, path_from_root
from pathlib import Path
import aiosqlite
from src.simulator.full_node_simulator import FullNodeSimulator
from src.simulator.simulator_constants import test_constants
@ -34,14 +36,15 @@ async def main():
db_path = path_from_root(config["simulator_database_path"])
mkdir(db_path.parent)
connection = await aiosqlite.connect(db_path)
# Create the store (DB) and full node instance
store = await FullNodeStore.create(db_path)
store = await FullNodeStore.create(connection)
await store._clear_database()
genesis: FullBlock = FullBlock.from_bytes(test_constants["GENESIS_BLOCK"])
await store.add_block(genesis)
unspent_store = await CoinStore.create(db_path)
unspent_store = await CoinStore.create(connection)
log.info("Initializing blockchain from disk")
blockchain = await Blockchain.create(unspent_store, store, test_constants)

View File

@ -1,6 +1,5 @@
import asyncio
from typing import Set, Tuple, Optional
from pathlib import Path
import aiosqlite
from src.types.sized_bytes import bytes32
from src.util.ints import uint32, uint64
@ -17,12 +16,12 @@ class WalletPuzzleStore:
cache_size: uint32
@classmethod
async def create(cls, db_path: Path, cache_size: uint32 = uint32(600000)):
async def create(cls, connection: aiosqlite.Connection, cache_size: uint32 = uint32(600000)):
self = cls()
self.cache_size = cache_size
self.db_connection = await aiosqlite.connect(db_path)
self.db_connection = connection
await self.db_connection.execute(
(

View File

@ -4,6 +4,8 @@ from pathlib import Path
from typing import Dict, Optional, List, Set, Tuple, Callable
import logging
import asyncio
import aiosqlite
from chiabip158 import PyBIP158
from src.types.hashable.coin import Coin
@ -60,6 +62,7 @@ class WalletStateManager:
state_changed_callback: Optional[Callable]
pending_tx_callback: Optional[Callable]
db_path: Path
db_connection: aiosqlite.Connection
@staticmethod
async def create(
@ -75,10 +78,11 @@ class WalletStateManager:
self.log = logging.getLogger(__name__)
self.lock = asyncio.Lock()
self.wallet_store = await WalletStore.create(db_path)
self.tx_store = await WalletTransactionStore.create(db_path)
self.puzzle_store = await WalletPuzzleStore.create(db_path)
self.user_store = await WalletUserStore.create(db_path)
self.db_connection = await aiosqlite.connect(db_path)
self.wallet_store = await WalletStore.create(self.db_connection)
self.tx_store = await WalletTransactionStore.create(self.db_connection)
self.puzzle_store = await WalletPuzzleStore.create(self.db_connection)
self.user_store = await WalletUserStore.create(self.db_connection)
self.lca = None
self.sync_mode = False
self.height_to_hash = {}
@ -1035,10 +1039,7 @@ class WalletStateManager:
async def close_all_stores(self):
async with self.lock:
await self.wallet_store.close()
await self.tx_store.close()
await self.puzzle_store.close()
await self.user_store.close()
await self.db_connection.close()
async def clear_all_stores(self):
async with self.lock:

View File

@ -1,5 +1,4 @@
from typing import Dict, Optional, List, Set
from pathlib import Path
import aiosqlite
from src.types.hashable.coin import Coin
from src.wallet.block_record import BlockRecord
@ -20,12 +19,12 @@ class WalletStore:
cache_size: uint32
@classmethod
async def create(cls, db_path: Path, cache_size: uint32 = uint32(600000)):
async def create(cls, connection: aiosqlite.Connection, cache_size: uint32 = uint32(600000)):
self = cls()
self.cache_size = cache_size
self.db_connection = await aiosqlite.connect(db_path)
self.db_connection = connection
await self.db_connection.execute(
(
f"CREATE TABLE IF NOT EXISTS coin_record("
@ -75,9 +74,6 @@ class WalletStore:
self.coin_record_cache = dict()
return self
async def close(self):
await self.db_connection.close()
async def _clear_database(self):
cursor = await self.db_connection.execute("DELETE FROM coin_record")
await cursor.close()

View File

@ -1,5 +1,4 @@
from typing import Dict, Optional, List
from pathlib import Path
import aiosqlite
from src.types.sized_bytes import bytes32
from src.util.ints import uint32, uint8
@ -18,12 +17,12 @@ class WalletTransactionStore:
tx_record_cache: Dict[bytes32, TransactionRecord]
@classmethod
async def create(cls, db_path: Path, cache_size: uint32 = uint32(600000)):
async def create(cls, connection: aiosqlite.Connection, cache_size: uint32 = uint32(600000)):
self = cls()
self.cache_size = cache_size
self.db_connection = await aiosqlite.connect(db_path)
self.db_connection = connection
await self.db_connection.execute(
(
f"CREATE TABLE IF NOT EXISTS transaction_record("
@ -78,9 +77,6 @@ class WalletTransactionStore:
self.tx_record_cache = dict()
return self
async def close(self):
await self.db_connection.close()
async def _init_cache(self):
print("init cache here")

View File

@ -1,4 +1,3 @@
from pathlib import Path
from typing import Optional, List
import aiosqlite
@ -16,10 +15,10 @@ class WalletUserStore:
cache_size: uint32
@classmethod
async def create(cls, db_path: Path):
async def create(cls, connection: aiosqlite.Connection):
self = cls()
self.db_connection = await aiosqlite.connect(db_path)
self.db_connection = connection
await self.db_connection.execute(
(
@ -52,9 +51,6 @@ class WalletUserStore:
if len(all_wallets) == 0:
await self.create_wallet("Chia Wallet", WalletType.STANDARD_WALLET, "")
async def close(self):
await self.db_connection.close()
async def _clear_database(self):
cursor = await self.db_connection.execute("DELETE FROM users_wallets")
await cursor.close()

View File

@ -3,6 +3,7 @@ import time
from typing import Any, Dict
from pathlib import Path
import aiosqlite
import pytest
from blspy import PrivateKey
@ -44,8 +45,10 @@ def event_loop():
class TestGenesisBlock:
@pytest.mark.asyncio
async def test_basic_blockchain(self):
unspent_store = await CoinStore.create(Path("blockchain_test.db"))
store = await FullNodeStore.create(Path("blockchain_test.db"))
db_path = Path("blockchain_test.db")
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
bc1 = await Blockchain.create(unspent_store, store, test_constants)
assert len(bc1.get_current_tips()) == 1
@ -56,8 +59,7 @@ class TestGenesisBlock:
) == genesis_block.weight
assert bc1.get_next_min_iters(bc1.genesis) > 0
await unspent_store.close()
await store.close()
await connection.close()
class TestBlockValidation:
@ -67,17 +69,18 @@ class TestBlockValidation:
Provides a list of 10 valid blocks, as well as a blockchain with 9 blocks added to it.
"""
blocks = bt.get_consecutive_blocks(test_constants, 10, [], 10)
store = await FullNodeStore.create(Path("blockchain_test.db"))
db_path = Path("blockchain_test.db")
connection = await aiosqlite.connect(db_path)
store = await FullNodeStore.create(connection)
await store._clear_database()
unspent_store = await CoinStore.create(Path("blockchain_test.db"))
unspent_store = await CoinStore.create(connection)
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
for i in range(1, 9):
result, removed = await b.receive_block(blocks[i])
assert result == ReceiveBlockResult.ADDED_TO_HEAD
yield (blocks, b)
await unspent_store.close()
await store.close()
await connection.close()
@pytest.mark.asyncio
async def test_prev_pointer(self, initial_blockchain):
@ -294,9 +297,10 @@ class TestBlockValidation:
num_blocks = 30
# Make it 5x faster than target time
blocks = bt.get_consecutive_blocks(test_constants, num_blocks, [], 2)
unspent_store = await CoinStore.create(Path("blockchain_test.db"))
store = await FullNodeStore.create(Path("blockchain_test.db"))
db_path = Path("blockchain_test.db")
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
for i in range(1, num_blocks):
@ -317,16 +321,17 @@ class TestBlockValidation:
assert (b.get_next_min_iters(blocks[26])) > (b.get_next_min_iters(blocks[25]))
assert (b.get_next_min_iters(blocks[27])) == (b.get_next_min_iters(blocks[26]))
await unspent_store.close()
await store.close()
await connection.close()
class TestReorgs:
@pytest.mark.asyncio
async def test_basic_reorg(self):
blocks = bt.get_consecutive_blocks(test_constants, 100, [], 9)
unspent_store = await CoinStore.create(Path("blockchain_test.db"))
store = await FullNodeStore.create(Path("blockchain_test.db"))
db_path = Path("blockchain_test.db")
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
@ -348,14 +353,15 @@ class TestReorgs:
assert result == ReceiveBlockResult.ADDED_TO_HEAD
assert b.get_current_tips()[0].height == 119
await unspent_store.close()
await store.close()
await connection.close()
@pytest.mark.asyncio
async def test_reorg_from_genesis(self):
blocks = bt.get_consecutive_blocks(test_constants, 20, [], 9, b"0")
unspent_store = await CoinStore.create(Path("blockchain_test.db"))
store = await FullNodeStore.create(Path("blockchain_test.db"))
db_path = Path("blockchain_test.db")
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
for i in range(1, len(blocks)):
@ -390,14 +396,15 @@ class TestReorgs:
result, _ = await b.receive_block(blocks_reorg_chain_2[22])
assert result == ReceiveBlockResult.ADDED_TO_HEAD
await unspent_store.close()
await store.close()
await connection.close()
@pytest.mark.asyncio
async def test_lca(self):
blocks = bt.get_consecutive_blocks(test_constants, 5, [], 9, b"0")
unspent_store = await CoinStore.create(Path("blockchain_test.db"))
store = await FullNodeStore.create(Path("blockchain_test.db"))
db_path = Path("blockchain_test.db")
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
for i in range(1, len(blocks)):
@ -417,14 +424,15 @@ class TestReorgs:
await b.receive_block(reorg[i])
assert b.lca_block.header_hash == blocks[0].header_hash
await unspent_store.close()
await store.close()
await connection.close()
@pytest.mark.asyncio
async def test_get_header_hashes(self):
blocks = bt.get_consecutive_blocks(test_constants, 5, [], 9, b"0")
unspent_store = await CoinStore.create(Path("blockchain_test.db"))
store = await FullNodeStore.create(Path("blockchain_test.db"))
db_path = Path("blockchain_test.db")
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
@ -434,5 +442,4 @@ class TestReorgs:
assert len(header_hashes) == 6
assert header_hashes == [block.header_hash for block in blocks]
await unspent_store.close()
await store.close()
await connection.close()

View File

@ -5,6 +5,7 @@ from typing import Any, Dict
import sqlite3
import random
import aiosqlite
import pytest
from src.full_node.store import FullNodeStore
from src.types.full_block import FullBlock
@ -50,8 +51,12 @@ class TestStore:
if db_filename_3.exists():
db_filename_3.unlink()
db = await FullNodeStore.create(db_filename)
db_2 = await FullNodeStore.create(db_filename_2)
connection = await aiosqlite.connect(db_filename)
connection_2 = await aiosqlite.connect(db_filename_2)
connection_3 = await aiosqlite.connect(db_filename_3)
db = await FullNodeStore.create(connection)
db_2 = await FullNodeStore.create(connection_2)
try:
await db._clear_database()
@ -144,19 +149,20 @@ class TestStore:
assert not db.seen_unfinished_block(h_hash_1)
except Exception:
await db.close()
await db_2.close()
await connection.close()
await connection_2.close()
await connection_3.close()
db_filename.unlink()
db_filename_2.unlink()
raise
# Different database should have different data
db_3 = await FullNodeStore.create(db_filename_3)
db_3 = await FullNodeStore.create(connection_3)
assert db_3.get_unfinished_block_leader() == (0, (1 << 64) - 1)
await db.close()
await db_2.close()
await db_3.close()
await connection.close()
await connection_2.close()
await connection_3.close()
db_filename.unlink()
db_filename_2.unlink()
db_filename_3.unlink()
@ -169,7 +175,8 @@ class TestStore:
if db_filename.exists():
db_filename.unlink()
db = await FullNodeStore.create(db_filename)
connection = await aiosqlite.connect(db_filename)
db = await FullNodeStore.create(connection)
tasks = []
for i in range(10000):
@ -191,5 +198,5 @@ class TestStore:
)
)
await asyncio.gather(*tasks)
await db.close()
await connection.close()
db_filename.unlink()

View File

@ -2,6 +2,7 @@ import asyncio
from typing import Any, Dict
from pathlib import Path
import aiosqlite
import pytest
from src.full_node.blockchain import Blockchain, ReceiveBlockResult
@ -40,7 +41,9 @@ class TestUnspent:
async def test_basic_unspent_store(self):
blocks = bt.get_consecutive_blocks(test_constants, 9, [], 9, b"0")
db = await CoinStore.create(Path("fndb_test.db"))
db_path = Path("fndb_test.db")
connection = await aiosqlite.connect(db_path)
db = await CoinStore.create(connection)
await db._clear_database()
# Save/get block
@ -51,14 +54,16 @@ class TestUnspent:
assert block.header.data.coinbase == unspent.coin
assert block.header.data.fees_coin == unspent_fee.coin
await db.close()
await connection.close()
Path("fndb_test.db").unlink()
@pytest.mark.asyncio
async def test_set_spent(self):
blocks = bt.get_consecutive_blocks(test_constants, 9, [], 9, b"0")
db = await CoinStore.create(Path("fndb_test.db"))
db_path = Path("fndb_test.db")
connection = await aiosqlite.connect(db_path)
db = await CoinStore.create(connection)
await db._clear_database()
# Save/get block
@ -76,14 +81,16 @@ class TestUnspent:
assert unspent.spent == 1
assert unspent_fee.spent == 1
await db.close()
await connection.close()
Path("fndb_test.db").unlink()
@pytest.mark.asyncio
async def test_rollback(self):
blocks = bt.get_consecutive_blocks(test_constants, 9, [], 9, b"0")
db = await CoinStore.create(Path("fndb_test.db"))
db_path = Path("fndb_test.db")
connection = await aiosqlite.connect(db_path)
db = await CoinStore.create(connection)
await db._clear_database()
# Save/get block
@ -114,14 +121,16 @@ class TestUnspent:
assert unspent is None
assert unspent_fee is None
await db.close()
await connection.close()
Path("fndb_test.db").unlink()
@pytest.mark.asyncio
async def test_basic_reorg(self):
blocks = bt.get_consecutive_blocks(test_constants, 100, [], 9)
unspent_store = await CoinStore.create(Path("blockchain_test.db"))
store = await FullNodeStore.create(Path("blockchain_test.db"))
db_path = Path("blockchain_test.db")
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
try:
@ -166,21 +175,21 @@ class TestUnspent:
assert unspent.spent_block_index == 0
assert b.get_current_tips()[0].height == 119
except Exception as e:
await unspent_store.close()
await store.close()
await connection.close()
Path("blockchain_test.db").unlink()
raise e
await unspent_store.close()
await store.close()
await connection.close()
Path("blockchain_test.db").unlink()
@pytest.mark.asyncio
async def test_get_puzzle_hash(self):
num_blocks = 20
blocks = bt.get_consecutive_blocks(test_constants, num_blocks, [], 9)
unspent_store = await CoinStore.create(Path("blockchain_test.db"))
store = await FullNodeStore.create(Path("blockchain_test.db"))
db_path = Path("blockchain_test.db")
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
try:
@ -197,11 +206,9 @@ class TestUnspent:
)
assert len(coins) == (num_blocks + 1) * 2
except Exception as e:
await unspent_store.close()
await store.close()
await connection.close()
Path("blockchain_test.db").unlink()
raise e
await unspent_store.close()
await store.close()
await connection.close()
Path("blockchain_test.db").unlink()

View File

@ -2,6 +2,8 @@ import signal
from typing import Any, Dict, Optional
from pathlib import Path
import asyncio
import aiosqlite
import blspy
from secrets import token_bytes
@ -55,9 +57,11 @@ async def setup_full_node_simulator(db_name, port, introducer_port=None, dic={})
for k in dic.keys():
test_constants_copy[k] = dic[k]
store_1 = await FullNodeStore.create(Path(db_name))
db_path = Path(db_name)
connection = await aiosqlite.connect(db_path)
store_1 = await FullNodeStore.create(connection)
await store_1._clear_database()
unspent_store_1 = await CoinStore.create(Path(db_name))
unspent_store_1 = await CoinStore.create(connection)
await unspent_store_1._clear_database()
mempool_1 = MempoolManager(unspent_store_1, test_constants_copy)
@ -93,8 +97,7 @@ async def setup_full_node_simulator(db_name, port, introducer_port=None, dic={})
full_node_1._shutdown()
server_1.close_all()
await server_1.await_closed()
await store_1.close()
await unspent_store_1.close()
await connection.close()
Path(db_name).unlink()
@ -104,9 +107,11 @@ async def setup_full_node(db_name, port, introducer_port=None, dic={}):
for k in dic.keys():
test_constants_copy[k] = dic[k]
store_1 = await FullNodeStore.create(Path(db_name))
db_path = Path(db_name)
connection = await aiosqlite.connect(db_path)
store_1 = await FullNodeStore.create(connection)
await store_1._clear_database()
unspent_store_1 = await CoinStore.create(Path(db_name))
unspent_store_1 = await CoinStore.create(connection)
await unspent_store_1._clear_database()
mempool_1 = MempoolManager(unspent_store_1, test_constants_copy)
@ -139,9 +144,7 @@ async def setup_full_node(db_name, port, introducer_port=None, dic={}):
# TEARDOWN
full_node_1._shutdown()
server_1.close_all()
await server_1.await_closed()
await store_1.close()
await unspent_store_1.close()
await connection.close()
Path(db_name).unlink()