import asyncio import logging import traceback from concurrent.futures.process import ProcessPoolExecutor from dataclasses import dataclass from typing import Dict, List, Optional, Sequence, Tuple, Union, Callable from chia.consensus.block_header_validation import validate_finished_header_block from chia.consensus.block_record import BlockRecord from chia.consensus.blockchain_interface import BlockchainInterface from chia.consensus.constants import ConsensusConstants from chia.consensus.cost_calculator import NPCResult from chia.consensus.difficulty_adjustment import get_next_sub_slot_iters_and_difficulty from chia.consensus.full_block_to_block_record import block_to_block_record from chia.consensus.get_block_challenge import get_block_challenge from chia.consensus.pot_iterations import calculate_iterations_quality, is_overflow_block from chia.full_node.mempool_check_conditions import get_name_puzzle_conditions from chia.types.blockchain_format.coin import Coin from chia.types.blockchain_format.sized_bytes import bytes32 from chia.types.full_block import FullBlock from chia.types.generator_types import BlockGenerator from chia.types.header_block import HeaderBlock from chia.util.block_cache import BlockCache from chia.util.errors import Err from chia.util.generator_tools import get_block_header, tx_removals_and_additions from chia.util.ints import uint16, uint64, uint32 from chia.util.streamable import Streamable, dataclass_from_dict, streamable log = logging.getLogger(__name__) @dataclass(frozen=True) @streamable class PreValidationResult(Streamable): error: Optional[uint16] required_iters: Optional[uint64] # Iff error is None npc_result: Optional[NPCResult] # Iff error is None and block is a transaction block def batch_pre_validate_blocks( constants_dict: Dict, blocks_pickled: Dict[bytes, bytes], full_blocks_pickled: Optional[List[bytes]], header_blocks_pickled: Optional[List[bytes]], prev_transaction_generators: List[Optional[bytes]], npc_results: Dict[uint32, bytes], check_filter: bool, expected_difficulty: List[uint64], expected_sub_slot_iters: List[uint64], ) -> List[bytes]: blocks = {} for k, v in blocks_pickled.items(): blocks[k] = BlockRecord.from_bytes(v) results: List[PreValidationResult] = [] constants: ConsensusConstants = dataclass_from_dict(ConsensusConstants, constants_dict) if full_blocks_pickled is not None and header_blocks_pickled is not None: assert ValueError("Only one should be passed here") if full_blocks_pickled is not None: for i in range(len(full_blocks_pickled)): try: block: FullBlock = FullBlock.from_bytes(full_blocks_pickled[i]) tx_additions: List[Coin] = [] removals: List[bytes32] = [] npc_result: Optional[NPCResult] = None if block.height in npc_results: npc_result = NPCResult.from_bytes(npc_results[block.height]) assert npc_result is not None if npc_result.npc_list is not None: removals, tx_additions = tx_removals_and_additions(npc_result.npc_list) else: removals, tx_additions = [], [] if block.transactions_generator is not None and npc_result is None: prev_generator_bytes = prev_transaction_generators[i] assert prev_generator_bytes is not None assert block.transactions_info is not None block_generator: BlockGenerator = BlockGenerator.from_bytes(prev_generator_bytes) assert block_generator.program == block.transactions_generator npc_result = get_name_puzzle_conditions( block_generator, min(constants.MAX_BLOCK_COST_CLVM, block.transactions_info.cost), True ) removals, tx_additions = tx_removals_and_additions(npc_result.npc_list) header_block = get_block_header(block, tx_additions, removals) required_iters, error = validate_finished_header_block( constants, BlockCache(blocks), header_block, check_filter, expected_difficulty[i], expected_sub_slot_iters[i], ) error_int: Optional[uint16] = None if error is not None: error_int = uint16(error.code.value) results.append(PreValidationResult(error_int, required_iters, npc_result)) except Exception: error_stack = traceback.format_exc() log.error(f"Exception: {error_stack}") results.append(PreValidationResult(uint16(Err.UNKNOWN.value), None, None)) elif header_blocks_pickled is not None: for i in range(len(header_blocks_pickled)): try: header_block = HeaderBlock.from_bytes(header_blocks_pickled[i]) required_iters, error = validate_finished_header_block( constants, BlockCache(blocks), header_block, check_filter, expected_difficulty[i], expected_sub_slot_iters[i], ) error_int = None if error is not None: error_int = uint16(error.code.value) results.append(PreValidationResult(error_int, required_iters, None)) except Exception: error_stack = traceback.format_exc() log.error(f"Exception: {error_stack}") results.append(PreValidationResult(uint16(Err.UNKNOWN.value), None, None)) return [bytes(r) for r in results] async def pre_validate_blocks_multiprocessing( constants: ConsensusConstants, constants_json: Dict, block_records: BlockchainInterface, blocks: Sequence[Union[FullBlock, HeaderBlock]], pool: ProcessPoolExecutor, check_filter: bool, npc_results: Dict[uint32, NPCResult], get_block_generator: Optional[Callable], batch_size: int, ) -> Optional[List[PreValidationResult]]: """ This method must be called under the blockchain lock If all the full blocks pass pre-validation, (only validates header), returns the list of required iters. if any validation issue occurs, returns False. Args: check_filter: constants_json: pool: constants: block_records: blocks: list of full blocks to validate (must be connected to current chain) npc_results get_block_generator """ prev_b: Optional[BlockRecord] = None # Collects all the recent blocks (up to the previous sub-epoch) recent_blocks: Dict[bytes32, BlockRecord] = {} recent_blocks_compressed: Dict[bytes32, BlockRecord] = {} num_sub_slots_found = 0 num_blocks_seen = 0 if blocks[0].height > 0: if not block_records.contains_block(blocks[0].prev_header_hash): return [PreValidationResult(uint16(Err.INVALID_PREV_BLOCK_HASH.value), None, None)] curr = block_records.block_record(blocks[0].prev_header_hash) num_sub_slots_to_look_for = 3 if curr.overflow else 2 while ( curr.sub_epoch_summary_included is None or num_blocks_seen < constants.NUMBER_OF_TIMESTAMPS or num_sub_slots_found < num_sub_slots_to_look_for ) and curr.height > 0: if num_blocks_seen < constants.NUMBER_OF_TIMESTAMPS or num_sub_slots_found < num_sub_slots_to_look_for: recent_blocks_compressed[curr.header_hash] = curr if curr.first_in_sub_slot: assert curr.finished_challenge_slot_hashes is not None num_sub_slots_found += len(curr.finished_challenge_slot_hashes) recent_blocks[curr.header_hash] = curr if curr.is_transaction_block: num_blocks_seen += 1 curr = block_records.block_record(curr.prev_hash) recent_blocks[curr.header_hash] = curr recent_blocks_compressed[curr.header_hash] = curr block_record_was_present = [] for block in blocks: block_record_was_present.append(block_records.contains_block(block.header_hash)) diff_ssis: List[Tuple[uint64, uint64]] = [] for block in blocks: if block.height != 0: assert block_records.contains_block(block.prev_header_hash) if prev_b is None: prev_b = block_records.block_record(block.prev_header_hash) sub_slot_iters, difficulty = get_next_sub_slot_iters_and_difficulty( constants, len(block.finished_sub_slots) > 0, prev_b, block_records ) overflow = is_overflow_block(constants, block.reward_chain_block.signage_point_index) challenge = get_block_challenge(constants, block, BlockCache(recent_blocks), prev_b is None, overflow, False) if block.reward_chain_block.challenge_chain_sp_vdf is None: cc_sp_hash: bytes32 = challenge else: cc_sp_hash = block.reward_chain_block.challenge_chain_sp_vdf.output.get_hash() q_str: Optional[bytes32] = block.reward_chain_block.proof_of_space.verify_and_get_quality_string( constants, challenge, cc_sp_hash ) if q_str is None: for i, block_i in enumerate(blocks): if not block_record_was_present[i] and block_records.contains_block(block_i.header_hash): block_records.remove_block_record(block_i.header_hash) return None required_iters: uint64 = calculate_iterations_quality( constants.DIFFICULTY_CONSTANT_FACTOR, q_str, block.reward_chain_block.proof_of_space.size, difficulty, cc_sp_hash, ) block_rec = block_to_block_record( constants, block_records, required_iters, block, None, ) # Makes sure to not override the valid blocks already in block_records if not block_records.contains_block(block_rec.header_hash): block_records.add_block_record(block_rec) # Temporarily add block to dict recent_blocks[block_rec.header_hash] = block_rec recent_blocks_compressed[block_rec.header_hash] = block_rec else: recent_blocks[block_rec.header_hash] = block_records.block_record(block_rec.header_hash) recent_blocks_compressed[block_rec.header_hash] = block_records.block_record(block_rec.header_hash) prev_b = block_rec diff_ssis.append((difficulty, sub_slot_iters)) block_dict: Dict[bytes32, Union[FullBlock, HeaderBlock]] = {} for i, block in enumerate(blocks): block_dict[block.header_hash] = block if not block_record_was_present[i]: block_records.remove_block_record(block.header_hash) recent_sb_compressed_pickled = {bytes(k): bytes(v) for k, v in recent_blocks_compressed.items()} npc_results_pickled = {} for k, v in npc_results.items(): npc_results_pickled[k] = bytes(v) futures = [] # Pool of workers to validate blocks concurrently for i in range(0, len(blocks), batch_size): end_i = min(i + batch_size, len(blocks)) blocks_to_validate = blocks[i:end_i] if any([len(block.finished_sub_slots) > 0 for block in blocks_to_validate]): final_pickled = {bytes(k): bytes(v) for k, v in recent_blocks.items()} else: final_pickled = recent_sb_compressed_pickled b_pickled: Optional[List[bytes]] = None hb_pickled: Optional[List[bytes]] = None previous_generators: List[Optional[bytes]] = [] for block in blocks_to_validate: # We ONLY add blocks which are in the past, based on header hashes (which are validated later) to the # prev blocks dict. This is important since these blocks are assumed to be valid and are used as previous # generator references prev_blocks_dict: Dict[uint32, Union[FullBlock, HeaderBlock]] = {} curr_b: Union[FullBlock, HeaderBlock] = block while curr_b.prev_header_hash in block_dict: curr_b = block_dict[curr_b.prev_header_hash] prev_blocks_dict[curr_b.header_hash] = curr_b if isinstance(block, FullBlock): assert get_block_generator is not None if b_pickled is None: b_pickled = [] b_pickled.append(bytes(block)) try: block_generator: Optional[BlockGenerator] = await get_block_generator(block, prev_blocks_dict) except ValueError: return None if block_generator is not None: previous_generators.append(bytes(block_generator)) else: previous_generators.append(None) else: if hb_pickled is None: hb_pickled = [] hb_pickled.append(bytes(block)) futures.append( asyncio.get_running_loop().run_in_executor( pool, batch_pre_validate_blocks, constants_json, final_pickled, b_pickled, hb_pickled, previous_generators, npc_results_pickled, check_filter, [diff_ssis[j][0] for j in range(i, end_i)], [diff_ssis[j][1] for j in range(i, end_i)], ) ) # Collect all results into one flat list return [ PreValidationResult.from_bytes(result) for batch_result in (await asyncio.gather(*futures)) for result in batch_result ]