Store unfinished blocks on disk, tips/lca, db changes, cleanup

This commit is contained in:
Mariano Sorgente 2020-04-28 17:09:18 +09:00
parent d88fa32746
commit f4d8c22f94
No known key found for this signature in database
GPG Key ID: 0F866338C369278C
22 changed files with 477 additions and 445 deletions

View File

@ -1,16 +1,12 @@
import asyncio
import logging
import aiosqlite
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional
from src.types.program import Program
from src.types.full_block import FullBlock
from src.types.header import HeaderData, Header
from src.types.header_block import HeaderBlock
from src.types.proof_of_space import ProofOfSpace
from src.types.header import Header
from src.types.sized_bytes import bytes32
from src.util.hash import std_hash
from src.util.ints import uint32, uint64
from src.util.ints import uint32
log = logging.getLogger(__name__)
@ -31,12 +27,7 @@ class BlockStore:
# Headers
await self.db.execute(
"CREATE TABLE IF NOT EXISTS headers(height bigint, header_hash "
"text PRIMARY KEY, proof_hash text, header blob)"
)
# LCA
await self.db.execute(
"CREATE TABLE IF NOT EXISTS lca(header_hash text PRIMARY KEY)"
"text PRIMARY KEY, proof_hash text, header blob, is_lca tinyint, is_tip tinyint)"
)
# Height index so we can look up in order of height for sync purposes
@ -46,31 +37,43 @@ class BlockStore:
await self.db.execute(
"CREATE INDEX IF NOT EXISTS header_height on headers(height)"
)
await self.db.execute("CREATE INDEX IF NOT EXISTS lca on headers(is_lca)")
await self.db.execute("CREATE INDEX IF NOT EXISTS lca on headers(is_tip)")
await self.db.commit()
return self
async def _clear_database(self):
async with self.lock:
await self.db.execute("DELETE FROM blocks")
await self.db.execute("DELETE FROM headers")
await self.db.commit()
async def get_lca(self) -> Optional[bytes32]:
cursor = await self.db.execute("SELECT * from lca")
async def get_lca(self) -> Optional[Header]:
cursor = await self.db.execute("SELECT header from headers WHERE is_lca=1")
row = await cursor.fetchone()
await cursor.close()
if row is not None:
return bytes32(bytes.fromhex(row[0]))
return Header.from_bytes(row[0])
return None
async def set_lca(self, header_hash: bytes32) -> None:
await self.db.execute("DELETE FROM lca")
cursor_1 = await self.db.execute(
"INSERT OR REPLACE INTO lca VALUES(?)", (header_hash.hex(),)
)
cursor_1 = await self.db.execute("UPDATE headers SET is_lca=0")
await cursor_1.close()
cursor_2 = await self.db.execute(
"UPDATE headers SET is_lca=1 WHERE header_hash=?", (header_hash.hex(),)
)
await cursor_2.close()
await self.db.commit()
async def get_tips(self) -> List[bytes32]:
cursor = await self.db.execute("SELECT header from headers WHERE is_tip=1")
rows = await cursor.fetchall()
await cursor.close()
return [Header.from_bytes(row[0]) for row in rows]
async def set_tips(self, header_hashes: List[bytes32]) -> None:
cursor_1 = await self.db.execute("UPDATE headers SET is_tip=0")
await cursor_1.close()
tips_db = tuple([h.hex() for h in header_hashes])
formatted_str = f'UPDATE headers SET is_tip=1 WHERE header_hash in ({"?," * (len(tips_db) - 1)}?)'
cursor_2 = await self.db.execute(formatted_str, tips_db)
await cursor_2.close()
await self.db.commit()
async def add_block(self, block: FullBlock) -> None:
@ -84,7 +87,7 @@ class BlockStore:
block.proof_of_space.get_hash() + block.proof_of_time.output.get_hash()
)
cursor_2 = await self.db.execute(
("INSERT OR REPLACE INTO headers VALUES(?, ?, ?, ?)"),
("INSERT OR REPLACE INTO headers VALUES(?, ?, ?, ?, 0, 0)"),
(
block.height,
block.header_hash.hex(),
@ -97,12 +100,12 @@ class BlockStore:
async def get_block(self, header_hash: bytes32) -> Optional[FullBlock]:
cursor = await self.db.execute(
"SELECT * from blocks WHERE header_hash=?", (header_hash.hex(),)
"SELECT block from blocks WHERE header_hash=?", (header_hash.hex(),)
)
row = await cursor.fetchone()
await cursor.close()
if row is not None:
return FullBlock.from_bytes(row[2])
return FullBlock.from_bytes(row[0])
return None
async def get_blocks_at(self, heights: List[uint32]) -> List[FullBlock]:
@ -110,22 +113,17 @@ class BlockStore:
return []
heights_db = tuple(heights)
formatted_str = (
f'SELECT * from blocks WHERE height in ({"?," * (len(heights_db) - 1)}?)'
)
formatted_str = f'SELECT block from blocks WHERE height in ({"?," * (len(heights_db) - 1)}?)'
cursor = await self.db.execute(formatted_str, heights_db)
rows = await cursor.fetchall()
await cursor.close()
blocks: List[FullBlock] = []
for row in rows:
blocks.append(FullBlock.from_bytes(row[2]))
return blocks
return [FullBlock.from_bytes(row[0]) for row in rows]
async def get_headers(self) -> List[Header]:
cursor = await self.db.execute("SELECT * from headers")
async def get_headers(self) -> Dict[bytes32, Header]:
cursor = await self.db.execute("SELECT header_hash, header from headers")
rows = await cursor.fetchall()
await cursor.close()
return [Header.from_bytes(row[3]) for row in rows]
return {row[0]: Header.from_bytes(row[1]) for row in rows}
async def get_proof_hashes(self) -> Dict[bytes32, bytes32]:
cursor = await self.db.execute("SELECT header_hash, proof_hash from headers")

View File

@ -101,83 +101,63 @@ class Blockchain:
self.tips = []
self.height_to_hash = {}
self.headers = {}
self.coin_store = coin_store
self.block_store = block_store
self.genesis = FullBlock.from_bytes(self.constants["GENESIS_BLOCK"])
self.coinbase_freeze = self.constants["COINBASE_FREEZE_PERIOD"]
result, removed, error_code = await self.receive_block(self.genesis, sync_mode=False)
if result != ReceiveBlockResult.ADDED_TO_HEAD:
if error_code is not None:
raise ConsensusError(error_code)
else:
raise RuntimeError(f"Invalid genesis block {self.genesis}")
start = time.time()
headers_input, lca_hash = await self._load_headers_from_store()
log.info(f"Read from disk in {time.time() - start}")
start = time.time()
assert self.lca_block is not None
if len(headers_input) > 0:
self.headers = headers_input
sorted_headers = sorted(self.headers.items(), key=lambda b: b[1].height)
if lca_hash is not None:
# Add all blocks up to the LCA, and set the tip to the LCA
assert sorted_headers[-1][0] == lca_hash
for _, header in sorted_headers:
self.height_to_hash[header.height] = header.header_hash
self.tips = [header]
self.lca_block = header
await self._reconsider_heads(self.lca_block, False, False)
else:
for _, header in sorted_headers:
# Reconsider every single header, since we don't have LCA on disk
self.height_to_hash[header.height] = header.header_hash
await self._reconsider_heads(header, False, False)
assert (
self.headers[self.height_to_hash[uint32(0)]].get_hash()
== self.genesis.header_hash
)
if len(headers_input) > 1:
assert (
self.headers[self.height_to_hash[uint32(1)]].prev_header_hash
== self.genesis.header_hash
)
log.info(f"Added to chain in {time.time() - start}")
await self._load_chain_from_store()
return self
async def _load_headers_from_store(
self,
) -> Tuple[Dict[str, Header], Optional[bytes32]]:
async def _load_chain_from_store(self,) -> None:
"""
Loads headers from disk, into a list of Headers, that can be used
to initialize the Blockchain class.
Initializes the state of the Blockchain class from the database. Sets the LCA, tips,
headers, height_to_hash, and block_store DiffStores.
"""
lca_hash: Optional[bytes32] = await self.block_store.get_lca()
seen_blocks: Dict[str, Header] = {}
tips: List[Header] = []
for header in await self.block_store.get_headers():
if lca_hash is not None:
if header.header_hash == lca_hash:
tips = [header]
else:
if len(tips) == 0 or header.weight > tips[0].weight:
tips = [header]
seen_blocks[header.header_hash] = header
lca_db: Optional[Header] = await self.block_store.get_lca()
tips_db: List[Header] = await self.block_store.get_tips()
headers_db: Dict[bytes32, Header] = await self.block_store.get_headers()
headers = {}
if len(tips) > 0:
curr: Header = tips[0]
reverse_blocks: List[Header] = [curr]
while curr.height > 0:
curr = seen_blocks[curr.prev_header_hash]
reverse_blocks.append(curr)
assert (lca_db is None) == (len(tips_db) == 0) == (len(headers_db) == 0)
if lca_db is None:
result, removed, error_code = await self.receive_block(
self.genesis, sync_mode=False
)
if result != ReceiveBlockResult.ADDED_TO_HEAD:
if error_code is not None:
raise ConsensusError(error_code)
else:
raise RuntimeError(f"Invalid genesis block {self.genesis}")
return
for block in reversed(reverse_blocks):
headers[block.header_hash] = block
return headers, lca_hash
# Set the state (lca block and tips)
self.lca_block = lca_db
self.tips = tips_db
# Find the common ancestor of the tips, and add intermediate blocks to headers
cur: List[Header] = self.tips[:]
while any(b.header_hash != cur[0].header_hash for b in cur):
heights = [b.height for b in cur]
i = heights.index(max(heights))
self.headers[cur[i].header_hash] = cur[i]
cur[i] = headers_db[cur[i].prev_header_hash]
# Consistency check, tips should have an LCA equal to the DB LCA
assert cur[0] == self.lca_block
# Sets the header for remaining blocks, and height_to_hash dict
cur_b: Header = self.lca_block
while True:
self.headers[cur_b.header_hash] = cur_b
self.height_to_hash[cur_b.height] = cur_b.header_hash
if cur_b.height == 0:
break
cur_b = headers_db[cur_b.prev_header_hash]
# Asserts that the DB genesis block is correct
assert cur_b == self.genesis.header
# Adds the blocks to the db between LCA and tip
await self.recreate_diff_stores()
def get_current_tips(self) -> List[Header]:
"""
@ -873,6 +853,7 @@ class Blockchain:
self.tips.sort(key=lambda b: b.weight, reverse=True)
# This will loop only once
removed = self.tips.pop()
await self.block_store.set_tips([t.header_hash for t in self.tips])
await self._reconsider_lca(genesis, sync_mode)
return True, removed
return False, None
@ -897,6 +878,8 @@ class Blockchain:
else:
self._reconsider_heights(self.lca_block, cur[0])
self.lca_block = cur[0]
if not genesis:
await self.block_store.set_lca(self.lca_block.header_hash)
if old_lca is None:
full: Optional[FullBlock] = await self.block_store.get_block(
@ -905,8 +888,6 @@ class Blockchain:
assert full is not None
await self.coin_store.new_lca(full)
await self._create_diffs_for_tips(self.lca_block)
if not genesis:
await self.block_store.set_lca(self.lca_block.header_hash)
# If LCA changed update the unspent store
elif old_lca.header_hash != self.lca_block.header_hash:
# New LCA is lower height but not the a parent of old LCA (Reorg)
@ -920,8 +901,6 @@ class Blockchain:
await self._from_fork_to_lca(fork_head, self.lca_block)
if not sync_mode:
await self.recreate_diff_stores()
if not genesis:
await self.block_store.set_lca(self.lca_block.header_hash)
else:
# If LCA has not changed just update the difference
self.coin_store.nuke_diffs()
@ -1132,7 +1111,9 @@ class Blockchain:
additions_since_fork: Dict[bytes32, Tuple[Coin, uint32]] = {}
removals_since_fork: Set[bytes32] = set()
coinbases_since_fork: Dict[bytes32, uint32] = {}
curr: Optional[FullBlock] = await self.block_store.get_block(block.prev_header_hash)
curr: Optional[FullBlock] = await self.block_store.get_block(
block.prev_header_hash
)
assert curr is not None
log.info(f"curr.height is: {curr.height}, fork height is: {fork_h}")
while curr.height > fork_h:

View File

@ -77,11 +77,6 @@ class CoinStore:
self.head_diffs = dict()
return self
async def _clear_database(self):
cursor = await self.coin_record_db.execute("DELETE FROM coin_record")
await cursor.close()
await self.coin_record_db.commit()
async def add_lcas(self, blocks: List[FullBlock]):
for block in blocks:
await self.new_lca(block)

View File

@ -3,7 +3,7 @@ import concurrent
import logging
import time
from asyncio import Event
from typing import AsyncGenerator, List, Optional, Tuple, Dict
from typing import AsyncGenerator, List, Optional, Tuple, Dict, Type
import aiosqlite
from chiabip158 import PyBIP158
@ -14,7 +14,6 @@ from src.consensus.block_rewards import calculate_base_fee
from src.consensus.constants import constants as consensus_constants
from src.consensus.pot_iterations import calculate_iterations
from src.consensus.weight_verifier import verify_weight
from src.full_node.store import FullNodeStore
from src.protocols import (
farmer_protocol,
full_node_protocol,
@ -69,21 +68,18 @@ class FullNode:
constants: Dict
_shut_down: bool
@staticmethod
@classmethod
async def create(
config: Dict,
name: str = None,
override_constants=None,
cls: Type, config: Dict, name: str = None, override_constants={},
):
self = FullNode()
self = cls()
self.config = config
self.server = None
self._shut_down = False # Set to true to close all infinite loops
self.constants = consensus_constants.copy()
if override_constants:
for key, value in override_constants.items():
self.constants[key] = value
for key, value in override_constants.items():
self.constants[key] = value
if name:
self.log = logging.getLogger(name)
else:
@ -98,17 +94,19 @@ class FullNode:
self.block_store = await BlockStore.create(self.connection)
self.full_node_store = await FullNodeStore.create(self.connection)
self.sync_store = await SyncStore.create(self.connection)
genesis: FullBlock = FullBlock.from_bytes(self.constants["GENESIS_BLOCK"])
await self.block_store.add_block(genesis)
self.coin_store = await CoinStore.create(self.connection)
self.log.info("Initializing blockchain from disk")
self.blockchain = await Blockchain.create(self.coin_store, self.block_store)
self.log.info("Blockchain initialized")
self.blockchain = await Blockchain.create(
self.coin_store, self.block_store, self.constants
)
self.log.info(
f"Blockchain initialized to tips at {[t.height for t in self.blockchain.get_current_tips()]}"
)
self.mempool_manager = MempoolManager(self.coin_store)
self.mempool_manager = MempoolManager(self.coin_store, self.constants)
await self.mempool_manager.new_tips(await self.blockchain.get_full_tips())
return self
def _set_server(self, server: ChiaServer):
self.server = server
@ -185,7 +183,9 @@ class FullNode:
tip_hashes = [tip.header_hash for tip in tips]
tip_infos = [
tup[0]
for tup in list((self.full_node_store.get_unfinished_blocks()).items())
for tup in list(
(await self.full_node_store.get_unfinished_blocks()).items()
)
if tup[1].prev_header_hash in tip_hashes
]
for chall, iters in tip_infos:
@ -402,7 +402,9 @@ class FullNode:
blocks_missing = any(
[
not (
self.sync_store.get_potential_headers_received(uint32(h))
self.sync_store.get_potential_headers_received(
uint32(h)
)
).is_set()
for h in range(batch_start, batch_end)
]
@ -434,7 +436,9 @@ class FullNode:
# Wait for the first batch (the next "max_blocks_to_send" blocks to arrive)
awaitables = [
(self.sync_store.get_potential_headers_received(uint32(height))).wait()
(
self.sync_store.get_potential_headers_received(uint32(height))
).wait()
for height in range(height_checkpoint, end_height)
]
future = asyncio.gather(*awaitables, return_exceptions=True)
@ -534,7 +538,9 @@ class FullNode:
# Wait for the first batch (the next "max_blocks_to_send" blocks to arrive)
awaitables = [
(self.sync_store.get_potential_blocks_received(uint32(height))).wait()
(
self.sync_store.get_potential_blocks_received(uint32(height))
).wait()
for height in range(height_checkpoint, end_height)
]
future = asyncio.gather(*awaitables, return_exceptions=True)
@ -578,7 +584,9 @@ class FullNode:
result,
header_block,
error_code,
) = await self.blockchain.receive_block(block, validated, pos, sync_mode=True)
) = await self.blockchain.receive_block(
block, validated, pos, sync_mode=True
)
if (
result == ReceiveBlockResult.INVALID_BLOCK
or result == ReceiveBlockResult.DISCONNECTED_BLOCK
@ -770,14 +778,13 @@ class FullNode:
) -> OutboundMessageGenerator:
# If we don't have an unfinished block for this PoT, we don't care about it
if (
self.full_node_store.get_unfinished_block(
await self.full_node_store.get_unfinished_block(
(
new_proof_of_time.challenge_hash,
new_proof_of_time.number_of_iterations,
)
)
is None
):
) is None:
return
# If we already have the PoT in a finished block, return
@ -853,14 +860,13 @@ class FullNode:
we can complete it. Otherwise, we just verify and propagate the proof.
"""
if (
self.full_node_store.get_unfinished_block(
await self.full_node_store.get_unfinished_block(
(
respond_proof_of_time.proof.challenge_hash,
respond_proof_of_time.proof.number_of_iterations,
)
)
is not None
):
) is not None:
height: Optional[uint32] = self.full_node_store.get_proof_of_time_heights(
(
respond_proof_of_time.proof.challenge_hash,
@ -1035,10 +1041,12 @@ class FullNode:
challenge = self.blockchain.get_challenge(prev_block)
if challenge is not None:
if (
self.full_node_store.get_unfinished_block(
(
challenge.get_hash(),
new_unfinished_block.number_of_iterations,
await (
self.full_node_store.get_unfinished_block(
(
challenge.get_hash(),
new_unfinished_block.number_of_iterations,
)
)
)
is not None
@ -1060,7 +1068,7 @@ class FullNode:
async def request_unfinished_block(
self, request_unfinished_block: full_node_protocol.RequestUnfinishedBlock
) -> OutboundMessageGenerator:
for _, block in self.full_node_store.get_unfinished_blocks().items():
for _, block in (await self.full_node_store.get_unfinished_blocks()).items():
if block.header_hash == request_unfinished_block.header_hash:
yield OutboundMessage(
NodeType.FULL_NODE,
@ -1131,13 +1139,20 @@ class FullNode:
challenge_hash = challenge.get_hash()
if (
self.full_node_store.get_unfinished_block((challenge_hash, iterations_needed))
await (
self.full_node_store.get_unfinished_block(
(challenge_hash, iterations_needed)
)
)
is not None
):
return
expected_time: uint64 = uint64(
int(iterations_needed / (self.full_node_store.get_proof_of_time_estimate_ips()))
int(
iterations_needed
/ (self.full_node_store.get_proof_of_time_estimate_ips())
)
)
if expected_time > self.constants["PROPAGATION_DELAY_THRESHOLD"]:
@ -1145,13 +1160,17 @@ class FullNode:
# If this block is slow, sleep to allow faster blocks to come out first
await asyncio.sleep(5)
leader: Tuple[uint32, uint64] = self.full_node_store.get_unfinished_block_leader()
leader: Tuple[
uint32, uint64
] = self.full_node_store.get_unfinished_block_leader()
if leader is None or block.height > leader[0]:
self.log.info(
f"This is the first unfinished block at height {block.height}, so propagate."
)
# If this is the first block we see at this height, propagate
self.full_node_store.set_unfinished_block_leader((block.height, expected_time))
self.full_node_store.set_unfinished_block_leader(
(block.height, expected_time)
)
elif block.height == leader[0]:
if expected_time > leader[1] + self.constants["PROPAGATION_THRESHOLD"]:
# If VDF is expected to finish X seconds later than the best, don't propagate
@ -1162,13 +1181,17 @@ class FullNode:
elif expected_time < leader[1]:
self.log.info(f"New best unfinished block at height {block.height}")
# If this will be the first block to finalize, update our leader
self.full_node_store.set_unfinished_block_leader((leader[0], expected_time))
self.full_node_store.set_unfinished_block_leader(
(leader[0], expected_time)
)
else:
# If we have seen an unfinished block at a greater or equal height, don't propagate
self.log.info(f"Unfinished block at old height, so don't propagate")
return
self.full_node_store.add_unfinished_block((challenge_hash, iterations_needed), block)
await self.full_node_store.add_unfinished_block(
(challenge_hash, iterations_needed), block
)
timelord_request = timelord_protocol.ProofOfSpaceInfo(
challenge_hash, iterations_needed
@ -1261,7 +1284,9 @@ class FullNode:
"""
self.log.info(f"Received header block {request.header_block.height}.")
self.sync_store.add_potential_header(request.header_block)
(self.sync_store.get_potential_headers_received(request.header_block.height)).set()
(
self.sync_store.get_potential_headers_received(request.header_block.height)
).set()
for _ in []: # Yields nothing
yield _
@ -1467,7 +1492,7 @@ class FullNode:
f"PoS hash {header_signature.pos_hash} not found in database"
)
return
# Verifies that we have the correct header and body self.stored
# Verifies that we have the correct header and body stored
generator, filt, block_header_data, pos = candidate
assert block_header_data.get_hash() == header_signature.header_hash
@ -1499,9 +1524,9 @@ class FullNode:
request.proof.number_of_iterations,
)
unfinished_block_obj: Optional[FullBlock] = self.full_node_store.get_unfinished_block(
dict_key
)
unfinished_block_obj: Optional[
FullBlock
] = await self.full_node_store.get_unfinished_block(dict_key)
if not unfinished_block_obj:
self.log.warning(
f"Received a proof of time that we cannot use to complete a block {dict_key}"
@ -1564,7 +1589,8 @@ class FullNode:
await self.sync_store.add_potential_block(respond_block.block)
if (
self.full_node_store.get_sync_mode()
and respond_block.block.height in self.sync_store.potential_blocks_received
and respond_block.block.height
in self.sync_store.potential_blocks_received
):
# If we are still in sync mode, set it
self.sync_store.get_potential_blocks_received(
@ -1673,7 +1699,8 @@ class FullNode:
Message("proof_of_time_rate", rate_update),
Delivery.BROADCAST,
)
self.full_node_store.clear_seen_unfinished_blocks()
# Occasionally clear the seen list to keep it small
await self.full_node_store.clear_seen_unfinished_blocks()
challenge: Optional[Challenge] = self.blockchain.get_challenge(
respond_block.block
@ -1745,7 +1772,9 @@ class FullNode:
raise RuntimeError(f"Invalid result from receive_block {added}")
# This code path is reached if added == ADDED_AS_ORPHAN or ADDED_TO_HEAD
next_block: Optional[FullBlock] = self.full_node_store.get_disconnected_block_by_prev(
next_block: Optional[
FullBlock
] = self.full_node_store.get_disconnected_block_by_prev(
respond_block.block.header_hash
)
@ -1760,8 +1789,8 @@ class FullNode:
lowest_tip = min(tip.height for tip in self.blockchain.get_current_tips())
clear_height = uint32(max(0, lowest_tip - 30))
self.full_node_store.clear_candidate_blocks_below(clear_height)
self.full_node_store.clear_unfinished_blocks_below(clear_height)
self.full_node_store.clear_disconnected_blocks_below(clear_height)
await self.full_node_store.clear_unfinished_blocks_below(clear_height)
@api_request
async def reject_block_request(
@ -2006,7 +2035,9 @@ class FullNode:
async def request_removals(
self, request: wallet_protocol.RequestRemovals
) -> OutboundMessageGenerator:
block: Optional[FullBlock] = await self.block_store.get_block(request.header_hash)
block: Optional[FullBlock] = await self.block_store.get_block(
request.header_hash
)
if (
block is None
or block.height != request.height
@ -2076,7 +2107,9 @@ class FullNode:
async def request_additions(
self, request: wallet_protocol.RequestAdditions
) -> OutboundMessageGenerator:
block: Optional[FullBlock] = await self.block_store.get_block(request.header_hash)
block: Optional[FullBlock] = await self.block_store.get_block(
request.header_hash
)
if (
block is None
or block.height != request.height

View File

@ -1,15 +1,12 @@
import asyncio
import logging
import aiosqlite
from typing import Dict, List, Optional, Tuple
from src.types.program import Program
from src.types.full_block import FullBlock
from src.types.header import HeaderData, Header
from src.types.header_block import HeaderBlock
from src.types.header import HeaderData
from src.types.proof_of_space import ProofOfSpace
from src.types.sized_bytes import bytes32
from src.util.hash import std_hash
from src.util.ints import uint32, uint64
log = logging.getLogger(__name__)
@ -30,8 +27,6 @@ class FullNodeStore:
bytes32,
Tuple[Optional[Program], Optional[bytes], HeaderData, ProofOfSpace, uint32],
]
# Blocks which are not finalized yet (no proof of time), old ones are cleared
unfinished_blocks: Dict[Tuple[bytes32, uint64], FullBlock]
# Header hashes of unfinished blocks that we have seen recently
seen_unfinished_blocks: set
# Blocks which we have received but our blockchain does not reach, old ones are cleared
@ -53,14 +48,19 @@ class FullNodeStore:
uint64((1 << 64) - 1),
)
self.candidate_blocks = {}
self.unfinished_blocks = {}
self.seen_unfinished_blocks = set()
self.disconnected_blocks = {}
return self
async def _clear_database(self):
async with self.lock:
await self.db.commit()
await self.db.execute(
f"CREATE TABLE IF NOT EXISTS unfinished_blocks("
f"challenge_hash text,"
f"iterations bigint,"
f"block blob,"
f"height int,"
f"PRIMARY KEY (challenge_hash, iterations))"
)
await self.db.commit()
return self
def add_disconnected_block(self, block: FullBlock) -> None:
self.disconnected_blocks[block.header_hash] = block
@ -123,13 +123,28 @@ class FullNodeStore:
except KeyError:
pass
def add_unfinished_block(
async def add_unfinished_block(
self, key: Tuple[bytes32, uint64], block: FullBlock
) -> None:
self.unfinished_blocks[key] = block
cursor_1 = await self.db.execute(
"INSERT OR REPLACE INTO unfinished_blocks VALUES(?, ?, ?, ?)",
(key[0].hex(), key[1], bytes(block), block.height),
)
await cursor_1.close()
await self.db.commit()
def get_unfinished_block(self, key: Tuple[bytes32, uint64]) -> Optional[FullBlock]:
return self.unfinished_blocks.get(key, None)
async def get_unfinished_block(
self, key: Tuple[bytes32, uint64]
) -> Optional[FullBlock]:
cursor = await self.db.execute(
"SELECT block from unfinished_blocks WHERE challenge_hash=? AND iterations=?",
(key[0].hex(), key[1]),
)
row = await cursor.fetchone()
await cursor.close()
if row is not None:
return FullBlock.from_bytes(row[0])
return None
def seen_unfinished_block(self, header_hash: bytes32) -> bool:
if header_hash in self.seen_unfinished_blocks:
@ -137,22 +152,23 @@ class FullNodeStore:
self.seen_unfinished_blocks.add(header_hash)
return False
def clear_seen_unfinished_blocks(self) -> None:
async def clear_seen_unfinished_blocks(self) -> None:
self.seen_unfinished_blocks.clear()
def get_unfinished_blocks(self) -> Dict[Tuple[bytes32, uint64], FullBlock]:
return self.unfinished_blocks.copy()
async def get_unfinished_blocks(self) -> Dict[Tuple[bytes32, uint64], FullBlock]:
cursor = await self.db.execute(
"SELECT challenge_hash, iterations, block from unfinished_blocks"
)
rows = await cursor.fetchall()
await cursor.close()
return {(bytes.fromhex(a), b): FullBlock.from_bytes(c) for a, b, c in rows}
def clear_unfinished_blocks_below(self, height: uint32) -> None:
del_keys = []
for key, unf in self.unfinished_blocks.items():
if unf.height < height:
del_keys.append(key)
for key in del_keys:
try:
del self.unfinished_blocks[key]
except KeyError:
pass
async def clear_unfinished_blocks_below(self, height: uint32) -> None:
cursor = await self.db.execute(
"DELETE from unfinished_blocks WHERE height<? ", (height,)
)
await cursor.close()
await self.db.commit()
def set_unfinished_block_leader(self, key: Tuple[bytes32, uint64]) -> None:
self.unfinished_blocks_leader = key

View File

@ -3,14 +3,10 @@ import logging
import aiosqlite
from typing import Dict, List, Optional, Tuple
from src.types.program import Program
from src.types.full_block import FullBlock
from src.types.header import HeaderData, Header
from src.types.header_block import HeaderBlock
from src.types.proof_of_space import ProofOfSpace
from src.types.sized_bytes import bytes32
from src.util.hash import std_hash
from src.util.ints import uint32, uint64
from src.util.ints import uint32
log = logging.getLogger(__name__)
@ -58,11 +54,6 @@ class SyncStore:
self.potential_future_blocks = []
return self
async def _clear_database(self):
async with self.lock:
await self.db.execute("DELETE FROM potential_blocks")
await self.db.commit()
async def add_potential_block(self, block: FullBlock) -> None:
cursor = await self.db.execute(
"INSERT OR REPLACE INTO potential_blocks VALUES(?, ?)",

View File

@ -90,7 +90,9 @@ class RpcApiHandler:
raise web.HTTPBadRequest()
header_hash = hexstr_to_bytes(request_data["header_hash"])
block: Optional[FullBlock] = await self.full_node.block_store.get_block(header_hash)
block: Optional[FullBlock] = await self.full_node.block_store.get_block(
header_hash
)
if block is None:
raise web.HTTPNotFound()
return obj_to_response(block)
@ -132,7 +134,9 @@ class RpcApiHandler:
raise web.HTTPBadRequest()
height = request_data["height"]
response_headers: List[Header] = []
for block in (self.full_node.full_node_store.get_unfinished_blocks()).values():
for block in (
await self.full_node.full_node_store.get_unfinished_blocks()
).values():
if block.height == height:
response_headers.append(block.header)

View File

@ -4,27 +4,18 @@ import logging.config
import signal
import miniupnpc
import aiosqlite
try:
import uvloop
except ImportError:
uvloop = None
from src.full_node.blockchain import Blockchain
from src.consensus.constants import constants
from src.full_node.store import FullNodeStore
from src.full_node.full_node import FullNode
from src.rpc.rpc_server import start_rpc_server
from src.full_node.mempool_manager import MempoolManager
from src.server.server import ChiaServer
from src.server.connection import NodeType
from src.types.full_block import FullBlock
from src.full_node.coin_store import CoinStore
from src.util.logging import initialize_logging
from src.util.config import load_config_cli, load_config
from src.util.default_root import DEFAULT_ROOT_PATH
from src.util.path import mkdir, path_from_root
from src.util.setproctitle import setproctitle

View File

@ -1,22 +1,18 @@
from secrets import token_bytes
from src.full_node.full_node import FullNode
from typing import AsyncGenerator, List, Dict, Optional
from src.full_node.blockchain import Blockchain
from src.full_node.store import FullNodeStore
from typing import AsyncGenerator, List, Optional
from src.protocols import (
full_node_protocol,
wallet_protocol,
)
from src.simulator.simulator_protocol import FarmNewBlockProtocol, ReorgProtocol
from src.util.bundle_tools import best_solution_program
from src.full_node.mempool_manager import MempoolManager
from src.server.outbound_message import OutboundMessage
from src.server.server import ChiaServer
from src.types.full_block import FullBlock
from src.types.spend_bundle import SpendBundle
from src.types.header import Header
from src.full_node.coin_store import CoinStore
from src.util.api_decorators import api_request
from src.util.ints import uint64
from tests.block_tools import BlockTools

View File

@ -2,7 +2,6 @@ import asyncio
import logging
import logging.config
import signal
import aiosqlite
from src.simulator.full_node_simulator import FullNodeSimulator
from src.simulator.simulator_constants import test_constants
@ -11,14 +10,9 @@ try:
except ImportError:
uvloop = None
from src.full_node.blockchain import Blockchain
from src.full_node.store import FullNodeStore
from src.rpc.rpc_server import start_rpc_server
from src.full_node.mempool_manager import MempoolManager
from src.server.server import ChiaServer
from src.server.connection import NodeType
from src.types.full_block import FullBlock
from src.full_node.coin_store import CoinStore
from src.util.logging import initialize_logging
from src.util.config import load_config_cli, load_config
from src.util.default_root import DEFAULT_ROOT_PATH
@ -38,25 +32,10 @@ async def main():
db_path = path_from_root(DEFAULT_ROOT_PATH, config["simulator_database_path"])
mkdir(db_path.parent)
connection = await aiosqlite.connect(db_path)
# Create the store (DB) and full node instance
store = await FullNodeStore.create(connection)
await store._clear_database()
genesis: FullBlock = FullBlock.from_bytes(test_constants["GENESIS_BLOCK"])
await store.add_block(genesis)
coin_store = await CoinStore.create(connection)
log.info("Initializing blockchain from disk")
blockchain = await Blockchain.create(coin_store, store, test_constants)
mempool_manager = MempoolManager(coin_store, test_constants)
await mempool_manager.new_tips(await blockchain.get_full_tips())
db_path.unlink()
full_node = await FullNodeSimulator.create(
config,
override_constants=test_constants,
config, override_constants=test_constants,
)
ping_interval = net_config.get("ping_interval")
@ -78,12 +57,12 @@ async def main():
_ = await server.start_server(full_node._on_connect)
rpc_cleanup = None
def master_close_cb():
async def master_close_cb():
nonlocal server_closed
if not server_closed:
# Called by the UI, when node is closed, or when a signal is sent
log.info("Closing all connections, and server...")
full_node._shutdown()
await full_node._shutdown()
server.close_all()
server_closed = True
@ -112,12 +91,6 @@ async def main():
await rpc_cleanup()
log.info("Closed RPC server.")
await store.close()
log.info("Closed store.")
await coin_store.close()
log.info("Closed unspent store.")
await asyncio.get_running_loop().shutdown_asyncgens()
log.info("Node fully closed.")

View File

@ -87,8 +87,8 @@ full_node:
port: 8444
# Run multiple nodes with different databases by changing the database_path
database_path: db/blockchain_v3.db
simulator_database_path: sim_db/simulator_blockchain_v3.db
database_path: db/blockchain_v4.db
simulator_database_path: sim_db/simulator_blockchain_v4.db
# If True, starts an RPC server at the following port
start_rpc_server: True

View File

@ -10,7 +10,6 @@ from typing import Any, Dict, List, Optional, Tuple
import websockets
from src.types.sized_bytes import bytes32
from src.types.peer_info import PeerInfo
from src.util.byte_types import hexstr_to_bytes
from src.wallet.trade_manager import TradeManager

View File

@ -133,11 +133,7 @@ class TestWalletSimulator:
await self.time_out_assert(15, cc_wallet.get_confirmed_balance, 100)
await self.time_out_assert(15, cc_wallet.get_unconfirmed_balance, 100)
<<<<<<< HEAD
assert cc_wallet.cc_info.my_core is not None
=======
assert cc_wallet.cc_info.my_core
>>>>>>> efc3113afac720de5e9da011cff5d96cdc96cf1c
colour = cc_wallet_puzzles.get_genesis_from_core(cc_wallet.cc_info.my_core)
cc_wallet_2: CCWallet = await CCWallet.create_wallet_for_cc(
@ -240,11 +236,7 @@ class TestWalletSimulator:
await self.time_out_assert(15, cc_wallet.get_confirmed_balance, 100)
await self.time_out_assert(15, cc_wallet.get_unconfirmed_balance, 100)
<<<<<<< HEAD
assert cc_wallet.cc_info.my_core is not None
=======
assert cc_wallet.cc_info.my_core
>>>>>>> efc3113afac720de5e9da011cff5d96cdc96cf1c
colour = cc_wallet_puzzles.get_genesis_from_core(cc_wallet.cc_info.my_core)
cc_wallet_2: CCWallet = await CCWallet.create_wallet_for_cc(
@ -304,11 +296,7 @@ class TestWalletSimulator:
await self.time_out_assert(15, cc_wallet.get_confirmed_balance, 100)
await self.time_out_assert(15, cc_wallet.get_unconfirmed_balance, 100)
<<<<<<< HEAD
assert cc_wallet.cc_info.my_core is not None
=======
assert cc_wallet.cc_info.my_core
>>>>>>> efc3113afac720de5e9da011cff5d96cdc96cf1c
colour = cc_wallet_puzzles.get_genesis_from_core(cc_wallet.cc_info.my_core)
cc_wallet_2: CCWallet = await CCWallet.create_wallet_for_cc(
@ -396,11 +384,7 @@ class TestWalletSimulator:
await self.time_out_assert(15, cc_wallet.get_confirmed_balance, 100)
await self.time_out_assert(15, cc_wallet.get_unconfirmed_balance, 100)
<<<<<<< HEAD
assert cc_wallet.cc_info.my_core is not None
=======
assert cc_wallet.cc_info.my_core
>>>>>>> efc3113afac720de5e9da011cff5d96cdc96cf1c
colour = cc_wallet_puzzles.get_genesis_from_core(cc_wallet.cc_info.my_core)
cc_wallet_2: CCWallet = await CCWallet.create_wallet_for_cc(
@ -474,11 +458,7 @@ class TestWalletSimulator:
await self.time_out_assert(15, red_wallet.get_confirmed_balance, 100)
await self.time_out_assert(15, red_wallet.get_unconfirmed_balance, 100)
<<<<<<< HEAD
assert red_wallet.cc_info.my_core is not None
=======
assert red_wallet.cc_info.my_core
>>>>>>> efc3113afac720de5e9da011cff5d96cdc96cf1c
red = cc_wallet_puzzles.get_genesis_from_core(red_wallet.cc_info.my_core)
await full_node_1.farm_new_block(FarmNewBlockProtocol(ph2))
@ -491,11 +471,7 @@ class TestWalletSimulator:
for i in range(1, num_blocks):
await full_node_1.farm_new_block(FarmNewBlockProtocol(ph))
<<<<<<< HEAD
assert blue_wallet_2.cc_info.my_core is not None
=======
assert blue_wallet_2.cc_info.my_core
>>>>>>> efc3113afac720de5e9da011cff5d96cdc96cf1c
blue = cc_wallet_puzzles.get_genesis_from_core(blue_wallet_2.cc_info.my_core)
red_wallet_2: CCWallet = await CCWallet.create_wallet_for_cc(
@ -600,11 +576,7 @@ class TestWalletSimulator:
await self.time_out_assert(15, cc_wallet.get_unconfirmed_balance, 100)
await self.time_out_assert(15, cc_wallet.get_confirmed_balance, 100)
<<<<<<< HEAD
assert cc_wallet.cc_info.my_core is not None
=======
assert cc_wallet.cc_info.my_core
>>>>>>> efc3113afac720de5e9da011cff5d96cdc96cf1c
colour = cc_wallet_puzzles.get_genesis_from_core(cc_wallet.cc_info.my_core)
cc_wallet_2: CCWallet = await CCWallet.create_wallet_for_cc(

View File

@ -22,6 +22,7 @@ test_constants: Dict[str, Any] = {
"DISCRIMINANT_SIZE_BITS": 16,
"BLOCK_TIME_TARGET": 10,
"MIN_BLOCK_TIME": 2,
"MIN_ITERS_STARTING": 100,
"DIFFICULTY_EPOCH": 12, # The number of blocks per epoch
"DIFFICULTY_DELAY": 3, # EPOCH / WARP_FACTOR
}
@ -59,9 +60,8 @@ class TestBlockStore:
db = await BlockStore.create(connection)
db_2 = await BlockStore.create(connection_2)
db_3 = await BlockStore.create(connection_3)
try:
await db._clear_database()
genesis = FullBlock.from_bytes(test_constants["GENESIS_BLOCK"])
# Save/get block
@ -77,16 +77,27 @@ class TestBlockStore:
# Test LCA
assert (await db.get_lca()) is None
await db.set_lca(blocks[-3].header_hash)
assert (await db.get_lca()) == blocks[-3].header
await db.set_tips([blocks[-2].header_hash, blocks[-1].header_hash])
assert (await db.get_tips()) == [blocks[-2].header, blocks[-1].header]
unspent_store = await CoinStore.create(connection)
b: Blockchain = await Blockchain.create(unspent_store, db, test_constants)
coin_store: CoinStore = await CoinStore.create(connection_3)
b: Blockchain = await Blockchain.create(coin_store, db_3, test_constants)
assert (await db.get_lca()) == blocks[-3].header_hash
assert b.lca_block.header_hash == (await db.get_lca())
assert b.lca_block == genesis.header
assert b.tips == [genesis.header]
b_2: Blockchain = await Blockchain.create(unspent_store, db, test_constants)
assert (await db.get_lca()) == blocks[-3].header_hash
assert b_2.lca_block.header_hash == (await db.get_lca())
for block in blocks:
await b.receive_block(block)
assert b.lca_block == blocks[-3].header
assert set(b.tips) == set(
[blocks[-3].header, blocks[-2].header, blocks[-1].header]
)
left = sorted(b.tips, key=lambda t: t.height)
right = sorted((await db_3.get_tips()), key=lambda t: t.height)
assert left == right
except Exception:
await connection.close()
@ -94,6 +105,7 @@ class TestBlockStore:
await connection_3.close()
db_filename.unlink()
db_filename_2.unlink()
db_filename_3.unlink()
raise
await connection.close()
@ -102,3 +114,27 @@ class TestBlockStore:
db_filename.unlink()
db_filename_2.unlink()
db_filename_3.unlink()
# @pytest.mark.asyncio
# async def test_deadlock(self):
# blocks = bt.get_consecutive_blocks(test_constants, 10, [], 9, b"0")
# db_filename = Path("blockchain_test.db")
# if db_filename.exists():
# db_filename.unlink()
# connection = await aiosqlite.connect(db_filename)
# db = await BlockStore.create(connection)
# tasks = []
# for i in range(10000):
# rand_i = random.randint(0, 10)
# if random.random() < 0.5:
# tasks.append(asyncio.create_task(db.add_block(blocks[rand_i])))
# if random.random() < 0.5:
# tasks.append(
# asyncio.create_task(db.get_block(blocks[rand_i].header_hash))
# )
# await asyncio.gather(*tasks)
# await connection.close()
# db_filename.unlink()

View File

@ -8,17 +8,18 @@ import pytest
from blspy import PrivateKey
from src.full_node.blockchain import Blockchain, ReceiveBlockResult
from src.full_node.store import FullNodeStore
from src.types.full_block import FullBlock
from src.types.header import Header, HeaderData
from src.types.proof_of_space import ProofOfSpace
from src.full_node.coin_store import CoinStore
from src.util.ints import uint8, uint64
from src.consensus.constants import constants as consensus_constants
from tests.block_tools import BlockTools
from src.util.errors import Err
from src.consensus.coinbase import create_coinbase_coin_and_signature
from src.types.sized_bytes import bytes32
from src.full_node.block_store import BlockStore
from src.full_node.coin_store import CoinStore
bt = BlockTools()
test_constants: Dict[str, Any] = consensus_constants.copy()
@ -48,11 +49,12 @@ class TestGenesisBlock:
@pytest.mark.asyncio
async def test_basic_blockchain(self):
db_path = Path("blockchain_test.db")
if db_path.exists():
db_path.unlink()
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
bc1 = await Blockchain.create(unspent_store, store, test_constants)
coin_store = await CoinStore.create(connection)
store = await BlockStore.create(connection)
bc1 = await Blockchain.create(coin_store, store, test_constants)
assert len(bc1.get_current_tips()) == 1
genesis_block = bc1.get_current_tips()[0]
assert genesis_block.height == 0
@ -72,11 +74,12 @@ class TestBlockValidation:
"""
blocks = bt.get_consecutive_blocks(test_constants, 10, [], 10)
db_path = Path("blockchain_test.db")
if db_path.exists():
db_path.unlink()
connection = await aiosqlite.connect(db_path)
store = await FullNodeStore.create(connection)
await store._clear_database()
unspent_store = await CoinStore.create(connection)
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
store = await BlockStore.create(connection)
coin_store = await CoinStore.create(connection)
b: Blockchain = await Blockchain.create(coin_store, store, test_constants)
for i in range(1, 9):
result, removed, error_code = await b.receive_block(blocks[i])
assert result == ReceiveBlockResult.ADDED_TO_HEAD
@ -495,11 +498,12 @@ class TestBlockValidation:
# Make it 5x faster than target time
blocks = bt.get_consecutive_blocks(test_constants, num_blocks, [], 2)
db_path = Path("blockchain_test.db")
if db_path.exists():
db_path.unlink()
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
coin_store = await CoinStore.create(connection)
store = await BlockStore.create(connection)
b: Blockchain = await Blockchain.create(coin_store, store, test_constants)
for i in range(1, num_blocks):
result, removed, error_code = await b.receive_block(blocks[i])
assert result == ReceiveBlockResult.ADDED_TO_HEAD
@ -527,11 +531,12 @@ class TestReorgs:
async def test_basic_reorg(self):
blocks = bt.get_consecutive_blocks(test_constants, 100, [], 9)
db_path = Path("blockchain_test.db")
if db_path.exists():
db_path.unlink()
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
coin_store = await CoinStore.create(connection)
store = await BlockStore.create(connection)
b: Blockchain = await Blockchain.create(coin_store, store, test_constants)
for i in range(1, len(blocks)):
await b.receive_block(blocks[i])
@ -558,11 +563,12 @@ class TestReorgs:
async def test_reorg_from_genesis(self):
blocks = bt.get_consecutive_blocks(test_constants, 20, [], 9, b"0")
db_path = Path("blockchain_test.db")
if db_path.exists():
db_path.unlink()
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
coin_store = await CoinStore.create(connection)
store = await BlockStore.create(connection)
b: Blockchain = await Blockchain.create(coin_store, store, test_constants)
for i in range(1, len(blocks)):
await b.receive_block(blocks[i])
assert b.get_current_tips()[0].height == 20
@ -601,11 +607,12 @@ class TestReorgs:
async def test_lca(self):
blocks = bt.get_consecutive_blocks(test_constants, 5, [], 9, b"0")
db_path = Path("blockchain_test.db")
if db_path.exists():
db_path.unlink()
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
coin_store = await CoinStore.create(connection)
store = await BlockStore.create(connection)
b: Blockchain = await Blockchain.create(coin_store, store, test_constants)
for i in range(1, len(blocks)):
await b.receive_block(blocks[i])
@ -634,11 +641,12 @@ class TestReorgs:
blocks_reorg = bt.get_consecutive_blocks(test_constants, 3, blocks[:9], 9, b"9")
db_path = Path("blockchain_test.db")
if db_path.exists():
db_path.unlink()
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
coin_store = await CoinStore.create(connection)
store = await BlockStore.create(connection)
b: Blockchain = await Blockchain.create(coin_store, store, test_constants)
for i in range(1, len(blocks)):
await b.receive_block(blocks[i])
@ -671,11 +679,12 @@ class TestReorgs:
async def test_get_header_hashes(self):
blocks = bt.get_consecutive_blocks(test_constants, 5, [], 9, b"0")
db_path = Path("blockchain_test.db")
if db_path.exists():
db_path.unlink()
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
coin_store = await CoinStore.create(connection)
store = await BlockStore.create(connection)
b: Blockchain = await Blockchain.create(coin_store, store, test_constants)
for i in range(1, len(blocks)):
await b.receive_block(blocks[i])

View File

@ -6,8 +6,8 @@ import aiosqlite
import pytest
from src.full_node.blockchain import Blockchain, ReceiveBlockResult
from src.full_node.store import FullNodeStore
from src.full_node.coin_store import CoinStore
from src.full_node.block_store import BlockStore
from tests.block_tools import BlockTools
from src.consensus.constants import constants as consensus_constants
@ -36,15 +36,16 @@ def event_loop():
yield loop
class TestUnspent:
class TestCoinStore:
@pytest.mark.asyncio
async def test_basic_unspent_store(self):
async def test_basic_coin_store(self):
blocks = bt.get_consecutive_blocks(test_constants, 9, [], 9, b"0")
db_path = Path("fndb_test.db")
if db_path.exists():
db_path.unlink()
connection = await aiosqlite.connect(db_path)
db = await CoinStore.create(connection)
await db._clear_database()
# Save/get block
for block in blocks:
@ -62,9 +63,10 @@ class TestUnspent:
blocks = bt.get_consecutive_blocks(test_constants, 9, [], 9, b"0")
db_path = Path("fndb_test.db")
if db_path.exists():
db_path.unlink()
connection = await aiosqlite.connect(db_path)
db = await CoinStore.create(connection)
await db._clear_database()
# Save/get block
for block in blocks:
@ -89,9 +91,10 @@ class TestUnspent:
blocks = bt.get_consecutive_blocks(test_constants, 9, [], 9, b"0")
db_path = Path("fndb_test.db")
if db_path.exists():
db_path.unlink()
connection = await aiosqlite.connect(db_path)
db = await CoinStore.create(connection)
await db._clear_database()
# Save/get block
for block in blocks:
@ -128,11 +131,12 @@ class TestUnspent:
async def test_basic_reorg(self):
blocks = bt.get_consecutive_blocks(test_constants, 100, [], 9)
db_path = Path("blockchain_test.db")
if db_path.exists():
db_path.unlink()
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
coin_store = await CoinStore.create(connection)
store = await BlockStore.create(connection)
b: Blockchain = await Blockchain.create(coin_store, store, test_constants)
try:
for i in range(1, len(blocks)):
@ -140,10 +144,10 @@ class TestUnspent:
assert b.get_current_tips()[0].height == 100
for c, block in enumerate(blocks):
unspent = await unspent_store.get_coin_record(
unspent = await coin_store.get_coin_record(
block.header.data.coinbase.name(), block.header
)
unspent_fee = await unspent_store.get_coin_record(
unspent_fee = await coin_store.get_coin_record(
block.header.data.fees_coin.name(), block.header
)
assert unspent.spent == 0
@ -166,7 +170,7 @@ class TestUnspent:
assert result == ReceiveBlockResult.ADDED_AS_ORPHAN
elif reorg_block.height >= 100:
assert result == ReceiveBlockResult.ADDED_TO_HEAD
unspent = await unspent_store.get_coin_record(
unspent = await coin_store.get_coin_record(
reorg_block.header.data.coinbase.name(), reorg_block.header
)
assert unspent.name == reorg_block.header.data.coinbase.name()
@ -188,21 +192,22 @@ class TestUnspent:
num_blocks = 20
blocks = bt.get_consecutive_blocks(test_constants, num_blocks, [], 9)
db_path = Path("blockchain_test.db")
if db_path.exists():
db_path.unlink()
connection = await aiosqlite.connect(db_path)
unspent_store = await CoinStore.create(connection)
store = await FullNodeStore.create(connection)
await store._clear_database()
b: Blockchain = await Blockchain.create(unspent_store, store, test_constants)
coin_store = await CoinStore.create(connection)
store = await BlockStore.create(connection)
b: Blockchain = await Blockchain.create(coin_store, store, test_constants)
try:
for i in range(1, len(blocks)):
await b.receive_block(blocks[i])
assert b.get_current_tips()[0].height == num_blocks
unspent = await unspent_store.get_coin_record(
unspent = await coin_store.get_coin_record(
blocks[1].header.data.coinbase.name(), blocks[-1].header
)
unspent_puzzle_hash = unspent.coin.puzzle_hash
coins = await unspent_store.get_coin_records_by_puzzle_hash(
coins = await coin_store.get_coin_records_by_puzzle_hash(
unspent_puzzle_hash, blocks[-1].header
)
assert len(coins) == (num_blocks + 1) * 2

View File

@ -34,7 +34,7 @@ from src.util.errors import Err, ConsensusError
async def get_block_path(full_node: FullNode):
blocks_list = [(await full_node.blockchain.get_full_tips())[0]]
while blocks_list[0].height != 0:
b = await full_node.store.get_block(blocks_list[0].prev_header_hash)
b = await full_node.block_store.get_block(blocks_list[0].prev_header_hash)
assert b is not None
blocks_list.insert(0, b)
return blocks_list
@ -703,12 +703,12 @@ class TestFullNodeProtocol:
)
# In sync mode
full_node_1.store.set_sync_mode(True)
full_node_1.full_node_store.set_sync_mode(True)
msgs = [
_ async for _ in full_node_1.respond_block(fnp.RespondBlock(blocks_new[-5]))
]
assert len(msgs) == 0
full_node_1.store.set_sync_mode(False)
full_node_1.full_node_store.set_sync_mode(False)
# If invalid, do nothing
block_invalid = FullBlock(

View File

@ -7,8 +7,7 @@ import random
import aiosqlite
import pytest
from src.full_node.store import FullNodeStore
from src.full_node.coin_store import CoinStore
from src.full_node.full_node_store import FullNodeStore
from src.full_node.blockchain import Blockchain
from src.types.full_block import FullBlock
from src.types.sized_bytes import bytes32
@ -22,6 +21,7 @@ test_constants: Dict[str, Any] = {
"DISCRIMINANT_SIZE_BITS": 16,
"BLOCK_TIME_TARGET": 10,
"MIN_BLOCK_TIME": 2,
"MIN_ITERS_STARTING": 100,
"DIFFICULTY_EPOCH": 12, # The number of blocks per epoch
"DIFFICULTY_DELAY": 3, # EPOCH / WARP_FACTOR
}
@ -36,7 +36,7 @@ def event_loop():
yield loop
class TestStore:
class TestFullNodeStore:
@pytest.mark.asyncio
async def test_basic_store(self):
assert sqlite3.threadsafety == 1
@ -60,37 +60,11 @@ class TestStore:
db = await FullNodeStore.create(connection)
db_2 = await FullNodeStore.create(connection_2)
try:
await db._clear_database()
genesis = FullBlock.from_bytes(test_constants["GENESIS_BLOCK"])
# Save/get block
for block in blocks:
await db.add_block(block)
assert block == await db.get_block(block.header_hash)
await db.add_block(blocks_alt[2])
assert len(await db.get_blocks_at([1, 2])) == 3
# Get headers (added alt block also, so +1)
assert len(await db.get_headers()) == len(blocks) + 1
# Save/get sync
for sync_mode in (False, True):
db.set_sync_mode(sync_mode)
assert sync_mode == db.get_sync_mode()
# clear sync info
await db.clear_sync_info()
# add/get potential tip, get potential tips num
db.add_potential_tip(blocks[6])
assert blocks[6] == db.get_potential_tip(blocks[6].header_hash)
# Add potential block
await db.add_potential_block(genesis)
assert genesis == await db.get_potential_block(uint32(0))
# Add/get candidate block
assert db.get_candidate_block(0) is None
partial = (
@ -120,15 +94,15 @@ class TestStore:
key = (block.header_hash, uint64(1000))
# Different database should have different data
db_2.add_unfinished_block(key, block)
await db_2.add_unfinished_block(key, block)
assert db.get_unfinished_block(key) is None
db.add_unfinished_block(key, block)
assert db.get_unfinished_block(key) == block
assert len(db.get_unfinished_blocks()) == i
assert await db.get_unfinished_block(key) is None
await db.add_unfinished_block(key, block)
assert await db.get_unfinished_block(key) == block
assert len(await db.get_unfinished_blocks()) == i
i += 1
db.clear_unfinished_blocks_below(uint32(5))
assert len(db.get_unfinished_blocks()) == 5
await db.clear_unfinished_blocks_below(uint32(5))
assert len(await db.get_unfinished_blocks()) == 5
# Set/get unf block leader
assert db.get_unfinished_block_leader() == (0, (1 << 64) - 1)
@ -147,22 +121,9 @@ class TestStore:
h_hash_1 = bytes32(token_bytes(32))
assert not db.seen_unfinished_block(h_hash_1)
assert db.seen_unfinished_block(h_hash_1)
db.clear_seen_unfinished_blocks()
await db.clear_seen_unfinished_blocks()
assert not db.seen_unfinished_block(h_hash_1)
# Test LCA
assert (await db.get_lca()) is None
unspent_store = await CoinStore.create(connection)
b: Blockchain = await Blockchain.create(unspent_store, db, test_constants)
assert (await db.get_lca()) == blocks[-3].header_hash
assert b.lca_block.header_hash == (await db.get_lca())
b_2: Blockchain = await Blockchain.create(unspent_store, db, test_constants)
assert (await db.get_lca()) == blocks[-3].header_hash
assert b_2.lca_block.header_hash == (await db.get_lca())
except Exception:
await connection.close()
await connection_2.close()
@ -181,37 +142,3 @@ class TestStore:
db_filename.unlink()
db_filename_2.unlink()
db_filename_3.unlink()
@pytest.mark.asyncio
async def test_deadlock(self):
blocks = bt.get_consecutive_blocks(test_constants, 10, [], 9, b"0")
db_filename = Path("blockchain_test.db")
if db_filename.exists():
db_filename.unlink()
connection = await aiosqlite.connect(db_filename)
db = await FullNodeStore.create(connection)
tasks = []
for i in range(10000):
rand_i = random.randint(0, 10)
if random.random() < 0.5:
tasks.append(asyncio.create_task(db.add_block(blocks[rand_i])))
if random.random() < 0.5:
tasks.append(
asyncio.create_task(db.add_potential_block(blocks[rand_i]))
)
if random.random() < 0.5:
tasks.append(
asyncio.create_task(db.get_block(blocks[rand_i].header_hash))
)
if random.random() < 0.5:
tasks.append(
asyncio.create_task(
db.get_potential_block(blocks[rand_i].header_hash)
)
)
await asyncio.gather(*tasks)
await connection.close()
db_filename.unlink()

View File

@ -0,0 +1,110 @@
import asyncio
from secrets import token_bytes
from pathlib import Path
from typing import Any, Dict
import sqlite3
import random
import aiosqlite
import pytest
from src.full_node.sync_store import SyncStore
from src.full_node.blockchain import Blockchain
from src.types.full_block import FullBlock
from src.types.sized_bytes import bytes32
from src.util.ints import uint32, uint64
from tests.block_tools import BlockTools
bt = BlockTools()
test_constants: Dict[str, Any] = {
"DIFFICULTY_STARTING": 5,
"DISCRIMINANT_SIZE_BITS": 16,
"BLOCK_TIME_TARGET": 10,
"MIN_BLOCK_TIME": 2,
"MIN_ITERS_STARTING": 100,
"DIFFICULTY_EPOCH": 12, # The number of blocks per epoch
"DIFFICULTY_DELAY": 3, # EPOCH / WARP_FACTOR
}
test_constants["GENESIS_BLOCK"] = bytes(
bt.create_genesis_block(test_constants, bytes([0] * 32), b"0")
)
@pytest.fixture(scope="module")
def event_loop():
loop = asyncio.get_event_loop()
yield loop
class TestStore:
@pytest.mark.asyncio
async def test_basic_store(self):
assert sqlite3.threadsafety == 1
blocks = bt.get_consecutive_blocks(test_constants, 9, [], 9, b"0")
blocks_alt = bt.get_consecutive_blocks(test_constants, 3, [], 9, b"1")
db_filename = Path("blockchain_test.db")
db_filename_2 = Path("blockchain_test_2.db")
if db_filename.exists():
db_filename.unlink()
if db_filename_2.exists():
db_filename_2.unlink()
connection = await aiosqlite.connect(db_filename)
connection_2 = await aiosqlite.connect(db_filename_2)
db = await SyncStore.create(connection)
db_2 = await SyncStore.create(connection_2)
try:
genesis = FullBlock.from_bytes(test_constants["GENESIS_BLOCK"])
# clear sync info
await db.clear_sync_info()
# add/get potential tip, get potential tips num
db.add_potential_tip(blocks[6])
assert blocks[6] == db.get_potential_tip(blocks[6].header_hash)
# Add potential block
await db.add_potential_block(genesis)
assert genesis == await db.get_potential_block(uint32(0))
except Exception:
await connection.close()
await connection_2.close()
db_filename.unlink()
db_filename_2.unlink()
raise
await connection.close()
await connection_2.close()
db_filename.unlink()
db_filename_2.unlink()
@pytest.mark.asyncio
async def test_deadlock(self):
blocks = bt.get_consecutive_blocks(test_constants, 10, [], 9, b"0")
db_filename = Path("blockchain_test.db")
if db_filename.exists():
db_filename.unlink()
connection = await aiosqlite.connect(db_filename)
db = await SyncStore.create(connection)
tasks = []
for i in range(10000):
rand_i = random.randint(0, 10)
if random.random() < 0.5:
tasks.append(
asyncio.create_task(db.add_potential_block(blocks[rand_i]))
)
if random.random() < 0.5:
tasks.append(
asyncio.create_task(
db.get_potential_block(blocks[rand_i].header_hash)
)
)
await asyncio.gather(*tasks)
await connection.close()
db_filename.unlink()

View File

@ -9,7 +9,6 @@ from secrets import token_bytes
from src.full_node.blockchain import Blockchain
from src.full_node.mempool_manager import MempoolManager
from src.full_node.store import FullNodeStore
from src.full_node.full_node import FullNode
from src.server.connection import NodeType
from src.server.server import ChiaServer
@ -58,7 +57,6 @@ async def setup_full_node_simulator(db_name, port, introducer_port=None, dic={})
for k in dic.keys():
test_constants_copy[k] = dic[k]
db_path = Path(db_name)
db_path = root_path / f"{db_name}"
if db_path.exists():
db_path.unlink()
@ -74,10 +72,9 @@ async def setup_full_node_simulator(db_name, port, introducer_port=None, dic={})
config["introducer_peer"]["host"] = "127.0.0.1"
config["introducer_peer"]["port"] = introducer_port
full_node_1 = await FullNodeSimulator.create(
config,
f"full_node_{port}",
test_constants_copy,
config, f"full_node_{port}", test_constants_copy,
)
print("fULL NODE IS", full_node_1)
assert ping_interval is not None
assert network_id is not None
server_1 = ChiaServer(
@ -108,7 +105,9 @@ async def setup_full_node(db_name, port, introducer_port=None, dic={}):
for k in dic.keys():
test_constants_copy[k] = dic[k]
Path(db_name).unlink()
db_path = root_path / f"{db_name}"
if db_path.exists():
db_path.unlink()
net_config = load_config(root_path, "config.yaml")
ping_interval = net_config.get("ping_interval")
@ -120,9 +119,7 @@ async def setup_full_node(db_name, port, introducer_port=None, dic={}):
config["introducer_peer"]["host"] = "127.0.0.1"
config["introducer_peer"]["port"] = introducer_port
full_node_1 = await FullNode.create(
config,
f"full_node_{port}",
test_constants_copy,
config, f"full_node_{port}", test_constants_copy,
)
assert ping_interval is not None
assert network_id is not None
@ -141,10 +138,11 @@ async def setup_full_node(db_name, port, introducer_port=None, dic={}):
yield (full_node_1, server_1)
# TEARDOWN
full_node_1._shutdown()
await full_node_1._shutdown()
server_1.close_all()
await connection.close()
Path(db_name).unlink()
db_path = root_path / f"{db_name}"
if db_path.exists():
db_path.unlink()
async def setup_wallet_node(port, introducer_port=None, key_seed=b"", dic={}):

View File

@ -7,7 +7,6 @@ import random
import pytest
import aiosqlite
from blspy import PrivateKey
from src.full_node.store import FullNodeStore
from src.types.full_block import FullBlock
from src.types.sized_bytes import bytes32
from src.util.ints import uint32, uint64

View File

@ -7,7 +7,6 @@ import aiosqlite
import random
import pytest
from src.full_node.store import FullNodeStore
from src.types.full_block import FullBlock
from src.types.sized_bytes import bytes32
from src.util.ints import uint32, uint64, uint128