# flake8: noqa: F811, F401
import asyncio
import sys
from typing import Dict, List, Optional, Tuple

import aiosqlite
import pytest

from chia.consensus.block_header_validation import validate_finished_header_block
from chia.consensus.block_record import BlockRecord
from chia.consensus.blockchain import Blockchain
from chia.consensus.default_constants import DEFAULT_CONSTANTS
from chia.consensus.difficulty_adjustment import get_next_sub_slot_iters_and_difficulty
from chia.consensus.full_block_to_block_record import block_to_block_record
from chia.full_node.block_store import BlockStore
from chia.full_node.coin_store import CoinStore
from chia.server.start_full_node import SERVICE_NAME
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.blockchain_format.sub_epoch_summary import SubEpochSummary
from chia.util.block_cache import BlockCache
from chia.util.block_tools import test_constants
from chia.util.config import load_config
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.generator_tools import get_block_header
from tests.setup_nodes import bt

try:
    from reprlib import repr
except ImportError:
    pass

from chia.consensus.pot_iterations import calculate_iterations_quality
from chia.full_node.weight_proof import (  # type: ignore
    WeightProofHandler,
    _map_sub_epoch_summaries,
    _validate_sub_epoch_segments,
    _validate_summaries_weight,
)
from chia.types.full_block import FullBlock
from chia.types.header_block import HeaderBlock
from chia.util.ints import uint32, uint64
from tests.core.fixtures import (
    default_400_blocks,
    default_1000_blocks,
    default_10000_blocks,
    default_10000_blocks_compact,
    pre_genesis_empty_slots_1000_blocks,
)


@pytest.fixture(scope="session")
def event_loop():
    """Yield one asyncio event loop for the whole test session."""
    loop = asyncio.get_event_loop()
    yield loop


def count_sub_epochs(blockchain, last_hash) -> int:
    """Count ancestors of ``last_hash`` that include a sub-epoch summary.

    Walks ``prev_hash`` links through ``blockchain._sub_blocks`` down to height 0.
    The starting block itself is not inspected, only its ancestors.
    """
    curr = blockchain._sub_blocks[last_hash]
    count = 0
    while curr.height != 0:
        # next sub block
        curr = blockchain._sub_blocks[curr.prev_hash]
        # if end of sub-epoch
        if curr.sub_epoch_summary_included is not None:
            count += 1
    return count


def get_prev_ses_block(sub_blocks, last_hash) -> Tuple[BlockRecord, int]:
    """Return the nearest ancestor of ``last_hash`` that includes a sub-epoch summary.

    Returns:
        ``(record, steps)`` where ``steps`` is how many ``prev_hash`` hops were taken.

    Raises:
        AssertionError: if height 0 is reached without finding a summary.
    """
    curr = sub_blocks[last_hash]
    blocks = 1
    while curr.height != 0:
        # next sub block
        curr = sub_blocks[curr.prev_hash]
        # if end of sub-epoch
        if curr.sub_epoch_summary_included is not None:
            return curr, blocks
        blocks += 1
    # Explicit raise (not `assert False`) so the failure survives `python -O`.
    raise AssertionError("no sub-epoch summary found before reaching height 0")


async def load_blocks_dont_validate(
    blocks,
) -> Tuple[
    Dict[bytes32, HeaderBlock], Dict[uint32, bytes32], Dict[bytes32, BlockRecord], Dict[uint32, SubEpochSummary]
]:
    """Convert full blocks into the lookup tables the weight-proof tests feed to ``BlockCache``.

    For each block the proof of space is verified to obtain the quality string, the
    required iterations are computed, and a block record is built.

    Returns:
        ``(header_cache, height_to_hash, sub_blocks, sub_epoch_summaries)`` where
        ``sub_epoch_summaries`` is keyed by block height.
    """
    header_cache: Dict[bytes32, HeaderBlock] = {}
    height_to_hash: Dict[uint32, bytes32] = {}
    sub_blocks: Dict[bytes32, BlockRecord] = {}
    sub_epoch_summaries: Dict[uint32, SubEpochSummary] = {}
    prev_block = None
    difficulty = test_constants.DIFFICULTY_STARTING
    block: FullBlock
    for block in blocks:
        if block.height > 0:
            assert prev_block is not None
            # Difficulty is taken as the weight gained over the previous block.
            difficulty = block.reward_chain_block.weight - prev_block.weight

        if block.reward_chain_block.challenge_chain_sp_vdf is None:
            assert block.reward_chain_block.signage_point_index == 0
            cc_sp: bytes32 = block.reward_chain_block.pos_ss_cc_challenge_hash
        else:
            cc_sp = block.reward_chain_block.challenge_chain_sp_vdf.output.get_hash()

        quality_string: Optional[bytes32] = block.reward_chain_block.proof_of_space.verify_and_get_quality_string(
            test_constants,
            block.reward_chain_block.pos_ss_cc_challenge_hash,
            cc_sp,
        )
        assert quality_string is not None

        required_iters: uint64 = calculate_iterations_quality(
            test_constants.DIFFICULTY_CONSTANT_FACTOR,
            quality_string,
            block.reward_chain_block.proof_of_space.size,
            difficulty,
            cc_sp,
        )

        # NOTE(review): every other call in this file passes
        # (blocks, headers, height_to_hash, summaries); here height_to_hash lands in the
        # second positional slot. Left as-is - confirm against BlockCache's signature.
        sub_block = block_to_block_record(
            test_constants, BlockCache(sub_blocks, height_to_hash), required_iters, block, None
        )
        sub_blocks[block.header_hash] = sub_block
        height_to_hash[block.height] = block.header_hash
        header_cache[block.header_hash] = get_block_header(block, [], [])
        if sub_block.sub_epoch_summary_included is not None:
            sub_epoch_summaries[block.height] = sub_block.sub_epoch_summary_included
        prev_block = block
    return header_cache, height_to_hash, sub_blocks, sub_epoch_summaries


async def _test_map_summaries(blocks, header_cache, height_to_hash, sub_blocks, summaries):
    """Check that a weight proof maps to as many summaries as the chain records above height 0."""
    curr = sub_blocks[blocks[-1].header_hash]
    orig_summaries: Dict[int, SubEpochSummary] = {}
    while curr.height > 0:
        if curr.sub_epoch_summary_included is not None:
            orig_summaries[curr.height] = curr.sub_epoch_summary_included
        # next sub block
        curr = sub_blocks[curr.prev_hash]

    wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))

    wp = await wpf.get_proof_of_weight(blocks[-1].header_hash)
    assert wp is not None
    # sub epoch summaries validate hashes
    mapped_summaries, _, _ = _map_sub_epoch_summaries(
        test_constants.SUB_EPOCH_BLOCKS,
        test_constants.GENESIS_CHALLENGE,
        wp.sub_epochs,
        test_constants.DIFFICULTY_STARTING,
    )
    assert len(mapped_summaries) == len(orig_summaries)


class TestWeightProof:
    """Creation, validation and extension tests for ``WeightProofHandler``."""

    @pytest.mark.asyncio
    async def test_weight_proof_map_summaries_1(self, default_400_blocks):
        header_cache, height_to_hash, sub_blocks, summaries = await load_blocks_dont_validate(default_400_blocks)
        await _test_map_summaries(default_400_blocks, header_cache, height_to_hash, sub_blocks, summaries)

    @pytest.mark.asyncio
    async def test_weight_proof_map_summaries_2(self, default_1000_blocks):
        header_cache, height_to_hash, sub_blocks, summaries = await load_blocks_dont_validate(default_1000_blocks)
        await _test_map_summaries(default_1000_blocks, header_cache, height_to_hash, sub_blocks, summaries)

    @pytest.mark.asyncio
    async def test_weight_proof_summaries_1000_blocks(self, default_1000_blocks):
        blocks = default_1000_blocks
        header_cache, height_to_hash, sub_blocks, summaries = await load_blocks_dont_validate(blocks)
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))
        wp = await wpf.get_proof_of_weight(blocks[-1].header_hash)
        # Fail with a clear assertion rather than an AttributeError on wp.sub_epochs.
        assert wp is not None
        mapped_summaries, sub_epoch_data_weight, _ = _map_sub_epoch_summaries(
            wpf.constants.SUB_EPOCH_BLOCKS,
            wpf.constants.GENESIS_CHALLENGE,
            wp.sub_epochs,
            wpf.constants.DIFFICULTY_STARTING,
        )
        assert _validate_summaries_weight(test_constants, sub_epoch_data_weight, mapped_summaries, wp)

    @pytest.mark.asyncio
    async def test_weight_proof_bad_peak_hash(self, default_1000_blocks):
        blocks = default_1000_blocks
        header_cache, height_to_hash, sub_blocks, summaries = await load_blocks_dont_validate(blocks)
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))
        wp = await wpf.get_proof_of_weight(b"sadgfhjhgdgsfadfgh")
        assert wp is None

    @pytest.mark.asyncio
    @pytest.mark.skip(reason="broken")
    async def test_weight_proof_from_genesis(self, default_400_blocks):
        blocks = default_400_blocks
        header_cache, height_to_hash, sub_blocks, summaries = await load_blocks_dont_validate(blocks)
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))
        wp = await wpf.get_proof_of_weight(blocks[-1].header_hash)
        assert wp is not None
        wp = await wpf.get_proof_of_weight(blocks[-1].header_hash)
        assert wp is not None

    @pytest.mark.asyncio
    async def test_weight_proof_edge_cases(self, default_400_blocks):
        blocks: List[FullBlock] = default_400_blocks

        # (number of blocks to append, extra keyword arguments), applied in this order.
        extension_steps = [
            (1, {"force_overflow": True, "skip_slots": 2}),
            (1, {"force_overflow": True, "skip_slots": 1}),
            (1, {"force_overflow": True}),
            (1, {"force_overflow": True, "skip_slots": 2}),
            (1, {"force_overflow": True}),
            (1, {"force_overflow": True}),
            (1, {"force_overflow": True}),
            (1, {"force_overflow": True}),
            (1, {"force_overflow": True, "skip_slots": 4, "normalized_to_identity_cc_eos": True}),
            (10, {"force_overflow": True}),
            (1, {"force_overflow": True, "skip_slots": 4, "normalized_to_identity_icc_eos": True}),
            (10, {"force_overflow": True}),
            (1, {"force_overflow": True, "skip_slots": 4, "normalized_to_identity_cc_ip": True}),
            (10, {"force_overflow": True}),
            (1, {"force_overflow": True, "skip_slots": 4, "normalized_to_identity_cc_sp": True}),
            (1, {"force_overflow": True, "skip_slots": 4}),
            (10, {"force_overflow": True}),
            (300, {"force_overflow": False}),
        ]
        for num_blocks, options in extension_steps:
            blocks = bt.get_consecutive_blocks(num_blocks, block_list_input=blocks, seed=b"asdfghjkl", **options)

        header_cache, height_to_hash, sub_blocks, summaries = await load_blocks_dont_validate(blocks)
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))
        wp = await wpf.get_proof_of_weight(blocks[-1].header_hash)
        assert wp is not None
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, {}))
        valid, fork_point = wpf.validate_weight_proof_single_proc(wp)

        assert valid
        assert fork_point == 0

    @pytest.mark.asyncio
    async def test_weight_proof1000(self, default_1000_blocks):
        blocks = default_1000_blocks
        header_cache, height_to_hash, sub_blocks, summaries = await load_blocks_dont_validate(blocks)
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))
        wp = await wpf.get_proof_of_weight(blocks[-1].header_hash)
        assert wp is not None
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, {}))
        valid, fork_point = wpf.validate_weight_proof_single_proc(wp)

        assert valid
        assert fork_point == 0

    @pytest.mark.asyncio
    async def test_weight_proof1000_pre_genesis_empty_slots(self, pre_genesis_empty_slots_1000_blocks):
        blocks = pre_genesis_empty_slots_1000_blocks
        header_cache, height_to_hash, sub_blocks, summaries = await load_blocks_dont_validate(blocks)

        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))
        wp = await wpf.get_proof_of_weight(blocks[-1].header_hash)
        assert wp is not None
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, {}))
        valid, fork_point = wpf.validate_weight_proof_single_proc(wp)

        assert valid
        assert fork_point == 0

    @pytest.mark.asyncio
    async def test_weight_proof10000__blocks_compact(self, default_10000_blocks_compact):
        blocks = default_10000_blocks_compact
        header_cache, height_to_hash, sub_blocks, summaries = await load_blocks_dont_validate(blocks)
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))
        wp = await wpf.get_proof_of_weight(blocks[-1].header_hash)
        assert wp is not None
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, {}))
        valid, fork_point = wpf.validate_weight_proof_single_proc(wp)

        assert valid
        assert fork_point == 0

    @pytest.mark.asyncio
    async def test_weight_proof1000_partial_blocks_compact(self, default_10000_blocks_compact):
        blocks: List[FullBlock] = bt.get_consecutive_blocks(
            100,
            block_list_input=default_10000_blocks_compact,
            seed=b"asdfghjkl",
            normalized_to_identity_cc_ip=True,
            normalized_to_identity_cc_eos=True,
            normalized_to_identity_icc_eos=True,
        )
        header_cache, height_to_hash, sub_blocks, summaries = await load_blocks_dont_validate(blocks)
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))
        wp = await wpf.get_proof_of_weight(blocks[-1].header_hash)
        assert wp is not None
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, {}))
        valid, fork_point = wpf.validate_weight_proof_single_proc(wp)

        assert valid
        assert fork_point == 0

    @pytest.mark.asyncio
    async def test_weight_proof10000(self, default_10000_blocks):
        blocks = default_10000_blocks
        header_cache, height_to_hash, sub_blocks, summaries = await load_blocks_dont_validate(blocks)
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))
        wp = await wpf.get_proof_of_weight(blocks[-1].header_hash)

        assert wp is not None
        # The validating handler here gets an empty header cache as well as empty summaries.
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, {}, height_to_hash, {}))
        valid, fork_point = wpf.validate_weight_proof_single_proc(wp)

        assert valid
        assert fork_point == 0

    @pytest.mark.asyncio
    async def test_check_num_of_samples(self, default_10000_blocks):
        blocks = default_10000_blocks
        header_cache, height_to_hash, sub_blocks, summaries = await load_blocks_dont_validate(blocks)
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))
        wp = await wpf.get_proof_of_weight(blocks[-1].header_hash)
        # Fail with a clear assertion rather than an AttributeError on wp.sub_epoch_segments.
        assert wp is not None
        curr = -1
        samples = 0
        # Count each time the segment's sub_epoch_n rises above the highest seen so far.
        for sub_epoch_segment in wp.sub_epoch_segments:
            if sub_epoch_segment.sub_epoch_n > curr:
                curr = sub_epoch_segment.sub_epoch_n
                samples += 1
        assert samples <= wpf.MAX_SAMPLES

    @pytest.mark.asyncio
    async def test_weight_proof_extend_no_ses(self, default_1000_blocks):
        blocks = default_1000_blocks
        header_cache, height_to_hash, sub_blocks, summaries = await load_blocks_dont_validate(blocks)
        last_ses_height = sorted(summaries.keys())[-1]
        wpf_synced = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))
        wp = await wpf_synced.get_proof_of_weight(blocks[last_ses_height].header_hash)
        assert wp is not None
        # todo for each sampled sub epoch, validate number of segments
        wpf_not_synced = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, {}))
        valid, fork_point, _ = await wpf_not_synced.validate_weight_proof(wp)
        assert valid
        assert fork_point == 0
        # extend proof with 100 blocks
        new_wp = await wpf_synced._create_proof_of_weight(blocks[-1].header_hash)
        valid, fork_point, _ = await wpf_not_synced.validate_weight_proof(new_wp)
        assert valid
        assert fork_point == 0

    @pytest.mark.asyncio
    async def test_weight_proof_extend_new_ses(self, default_1000_blocks):
        blocks = default_1000_blocks
        header_cache, height_to_hash, sub_blocks, summaries = await load_blocks_dont_validate(blocks)
        # delete last summary
        last_ses_height = sorted(summaries.keys())[-1]
        last_ses = summaries[last_ses_height]
        del summaries[last_ses_height]
        wpf_synced = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))
        wp = await wpf_synced.get_proof_of_weight(blocks[last_ses_height - 10].header_hash)
        assert wp is not None
        # Argument order fixed: header_cache and height_to_hash were transposed here,
        # unlike every other BlockCache construction in this file.
        wpf_not_synced = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, {}))
        valid, fork_point, _ = await wpf_not_synced.validate_weight_proof(wp)
        assert valid
        assert fork_point == 0
        # extend proof with 100 blocks
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))
        summaries[last_ses_height] = last_ses
        wpf_synced.blockchain = BlockCache(sub_blocks, header_cache, height_to_hash, summaries)
        new_wp = await wpf_synced._create_proof_of_weight(blocks[-1].header_hash)
        valid, fork_point, _ = await wpf_not_synced.validate_weight_proof(new_wp)
        assert valid
        assert fork_point == 0
        wpf_synced.blockchain = BlockCache(sub_blocks, header_cache, height_to_hash, summaries)
        new_wp = await wpf_synced._create_proof_of_weight(blocks[last_ses_height].header_hash)
        valid, fork_point, _ = await wpf_not_synced.validate_weight_proof(new_wp)
        assert valid
        assert fork_point == 0
        valid, fork_point, _ = await wpf.validate_weight_proof(new_wp)
        assert valid
        assert fork_point != 0

    @pytest.mark.asyncio
    async def test_weight_proof_extend_multiple_ses(self, default_1000_blocks):
        blocks = default_1000_blocks
        header_cache, height_to_hash, sub_blocks, summaries = await load_blocks_dont_validate(blocks)
        ses_heights = sorted(summaries.keys())
        last_ses_height = ses_heights[-1]
        last_ses = summaries[last_ses_height]
        before_last_ses_height = ses_heights[-2]
        before_last_ses = summaries[before_last_ses_height]
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))
        wpf_verify = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, {}))
        # Build proofs for the 11 peaks from ten blocks below the second-to-last summary up to it.
        for x in range(10, -1, -1):
            wp = await wpf.get_proof_of_weight(blocks[before_last_ses_height - x].header_hash)
            assert wp is not None
            valid, fork_point, _ = await wpf_verify.validate_weight_proof(wp)
            assert valid
            assert fork_point == 0
        # extend proof with 100 blocks
        summaries[last_ses_height] = last_ses
        summaries[before_last_ses_height] = before_last_ses
        wpf = WeightProofHandler(test_constants, BlockCache(sub_blocks, header_cache, height_to_hash, summaries))
        new_wp = await wpf._create_proof_of_weight(blocks[-1].header_hash)
        valid, fork_point, _ = await wpf.validate_weight_proof(new_wp)
        assert valid
        assert fork_point != 0

    @pytest.mark.skip("used for debugging")
    @pytest.mark.asyncio
    async def test_weight_proof_from_database(self):
        connection = await aiosqlite.connect("path to db")
        try:
            block_store: BlockStore = await BlockStore.create(connection)
            blocks = await block_store.get_block_records_in_range(0, 0xFFFFFFFF)
            # Check for an empty store before indexing into it.
            if len(blocks) == 0:
                return
            # NOTE(review): `blocks` is indexed by position here and by prev_hash below;
            # confirm what get_block_records_in_range returns before un-skipping this test.
            peak = len(blocks) - 1
            peak_height = blocks[peak].height
            headers = await block_store.get_header_blocks_in_range(0, peak_height)
            sub_height_to_hash = {}
            sub_epoch_summaries = {}

            # Sets the other state variables (peak_height and height_to_hash)
            curr: BlockRecord = blocks[peak]
            while True:
                sub_height_to_hash[curr.height] = curr.header_hash
                if curr.sub_epoch_summary_included is not None:
                    sub_epoch_summaries[curr.height] = curr.sub_epoch_summary_included
                if curr.height == 0:
                    break
                curr = blocks[curr.prev_hash]
            assert len(sub_height_to_hash) == peak_height + 1
            block_cache = BlockCache(blocks, headers, sub_height_to_hash, sub_epoch_summaries)
            wpf = WeightProofHandler(DEFAULT_CONSTANTS, block_cache)
            wp = await wpf._create_proof_of_weight(sub_height_to_hash[peak_height - 50])
            valid, fork_point = wpf.validate_weight_proof_single_proc(wp)
        finally:
            # Close the connection even when an assertion or lookup above fails.
            await connection.close()

        assert valid
        print(f"size of proof is {get_size(wp)}")


def get_size(obj, seen=None):
    """Recursively sum ``sys.getsizeof`` over ``obj`` and everything it contains.

    Dict keys and values, instance ``__dict__`` contents and items of iterables other
    than str/bytes/bytearray are followed. ``seen`` holds the ids of objects already
    counted, so shared or self-referential objects contribute only once.
    """
    size = sys.getsizeof(obj)
    if seen is None:
        seen = set()
    obj_id = id(obj)
    if obj_id in seen:
        return 0
    # Important mark as seen *before* entering recursion to gracefully handle
    # self-referential objects
    seen.add(obj_id)
    if isinstance(obj, dict):
        size += sum(get_size(v, seen) for v in obj.values())
        size += sum(get_size(k, seen) for k in obj.keys())
    elif hasattr(obj, "__dict__"):
        size += get_size(obj.__dict__, seen)
    elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes, bytearray)):
        size += sum(get_size(i, seen) for i in obj)
    return size