import asyncio import dataclasses import logging import math import random from concurrent.futures.process import ProcessPoolExecutor from typing import Dict, List, Optional, Tuple from chia.consensus.block_header_validation import validate_finished_header_block from chia.consensus.block_record import BlockRecord from chia.consensus.blockchain_interface import BlockchainInterface from chia.consensus.constants import ConsensusConstants from chia.consensus.deficit import calculate_deficit from chia.consensus.full_block_to_block_record import header_block_to_sub_block_record from chia.consensus.pot_iterations import ( calculate_ip_iters, calculate_iterations_quality, calculate_sp_iters, is_overflow_block, ) from chia.consensus.vdf_info_computation import get_signage_point_vdf_info from chia.types.blockchain_format.classgroup import ClassgroupElement from chia.types.blockchain_format.sized_bytes import bytes32 from chia.types.blockchain_format.slots import ChallengeChainSubSlot, RewardChainSubSlot from chia.types.blockchain_format.sub_epoch_summary import SubEpochSummary from chia.types.blockchain_format.vdf import VDFInfo from chia.types.end_of_slot_bundle import EndOfSubSlotBundle from chia.types.header_block import HeaderBlock from chia.types.weight_proof import ( SubEpochChallengeSegment, SubEpochData, SubSlotData, WeightProof, SubEpochSegments, RecentChainData, ) from chia.util.block_cache import BlockCache from chia.util.hash import std_hash from chia.util.ints import uint8, uint32, uint64, uint128 from chia.util.streamable import dataclass_from_dict, recurse_jsonify log = logging.getLogger(__name__) class WeightProofHandler: LAMBDA_L = 100 C = 0.5 MAX_SAMPLES = 20 def __init__( self, constants: ConsensusConstants, blockchain: BlockchainInterface, ): self.tip: Optional[bytes32] = None self.proof: Optional[WeightProof] = None self.constants = constants self.blockchain = blockchain self.lock = asyncio.Lock() async def get_proof_of_weight(self, tip: bytes32) -> Optional[WeightProof]: tip_rec = self.blockchain.try_block_record(tip) if tip_rec is None: log.error("unknown tip") return None if tip_rec.height < self.constants.WEIGHT_PROOF_RECENT_BLOCKS: log.debug("chain to short for weight proof") return None async with self.lock: if self.proof is not None: if self.proof.recent_chain_data[-1].header_hash == tip: return self.proof wp = await self._create_proof_of_weight(tip) if wp is None: return None self.proof = wp self.tip = tip return wp def get_sub_epoch_data(self, tip_height: uint32, summary_heights: List[uint32]) -> List[SubEpochData]: sub_epoch_data: List[SubEpochData] = [] for sub_epoch_n, ses_height in enumerate(summary_heights): if ses_height > tip_height: break ses = self.blockchain.get_ses(ses_height) log.debug(f"handle sub epoch summary {sub_epoch_n} at height: {ses_height} ses {ses}") sub_epoch_data.append(_create_sub_epoch_data(ses)) return sub_epoch_data async def _create_proof_of_weight(self, tip: bytes32) -> Optional[WeightProof]: """ Creates a weight proof object """ assert self.blockchain is not None sub_epoch_segments: List[SubEpochChallengeSegment] = [] tip_rec = self.blockchain.try_block_record(tip) if tip_rec is None: log.error("failed not tip in cache") return None log.info(f"create weight proof peak {tip} {tip_rec.height}") recent_chain = await self._get_recent_chain(tip_rec.height) if recent_chain is None: return None summary_heights = self.blockchain.get_ses_heights() prev_ses_block = await self.blockchain.get_block_record_from_db(self.blockchain.height_to_hash(uint32(0))) if prev_ses_block is None: return None sub_epoch_data = self.get_sub_epoch_data(tip_rec.height, summary_heights) # use second to last ses as seed seed = self.get_seed_for_proof(summary_heights, tip_rec.height) rng = random.Random(seed) weight_to_check = _get_weights_for_sampling(rng, tip_rec.weight, recent_chain) sample_n = 0 ses_blocks = await self.blockchain.get_block_records_at(summary_heights) if ses_blocks is None: return None for sub_epoch_n, ses_height in enumerate(summary_heights): if ses_height > tip_rec.height: break # if we have enough sub_epoch samples, dont sample if sample_n >= self.MAX_SAMPLES: log.debug("reached sampled sub epoch cap") break # sample sub epoch # next sub block ses_block = ses_blocks[sub_epoch_n] if ses_block is None or ses_block.sub_epoch_summary_included is None: log.error("error while building proof") return None if _sample_sub_epoch(prev_ses_block.weight, ses_block.weight, weight_to_check): # type: ignore sample_n += 1 segments = await self.blockchain.get_sub_epoch_challenge_segments(ses_block.header_hash) if segments is None: segments = await self.__create_sub_epoch_segments(ses_block, prev_ses_block, uint32(sub_epoch_n)) if segments is None: log.error( f"failed while building segments for sub epoch {sub_epoch_n}, ses height {ses_height} " ) return None await self.blockchain.persist_sub_epoch_challenge_segments(ses_block.header_hash, segments) log.debug(f"sub epoch {sub_epoch_n} has {len(segments)} segments") sub_epoch_segments.extend(segments) prev_ses_block = ses_block log.debug(f"sub_epochs: {len(sub_epoch_data)}") return WeightProof(sub_epoch_data, sub_epoch_segments, recent_chain) def get_seed_for_proof(self, summary_heights: List[uint32], tip_height) -> bytes32: count = 0 ses = None for sub_epoch_n, ses_height in enumerate(reversed(summary_heights)): if ses_height <= tip_height: count += 1 if count == 2: ses = self.blockchain.get_ses(ses_height) break assert ses is not None seed = ses.get_hash() return seed async def _get_recent_chain(self, tip_height: uint32) -> Optional[List[HeaderBlock]]: recent_chain: List[HeaderBlock] = [] ses_heights = self.blockchain.get_ses_heights() min_height = 0 count_ses = 0 for ses_height in reversed(ses_heights): if ses_height <= tip_height: count_ses += 1 if count_ses == 2: min_height = ses_height - 1 break log.debug(f"start {min_height} end {tip_height}") headers = await self.blockchain.get_header_blocks_in_range(min_height, tip_height, tx_filter=False) blocks = await self.blockchain.get_block_records_in_range(min_height, tip_height) ses_count = 0 curr_height = tip_height blocks_n = 0 while ses_count < 2: if curr_height == 0: break # add to needed reward chain recent blocks header_block = headers[self.blockchain.height_to_hash(curr_height)] block_rec = blocks[header_block.header_hash] if header_block is None: log.error("creating recent chain failed") return None recent_chain.insert(0, header_block) if block_rec.sub_epoch_summary_included: ses_count += 1 curr_height = uint32(curr_height - 1) # type: ignore blocks_n += 1 header_block = headers[self.blockchain.height_to_hash(curr_height)] recent_chain.insert(0, header_block) log.info( f"recent chain, " f"start: {recent_chain[0].reward_chain_block.height} " f"end: {recent_chain[-1].reward_chain_block.height} " ) return recent_chain async def create_prev_sub_epoch_segments(self): log.debug("create prev sub_epoch_segments") heights = self.blockchain.get_ses_heights() if len(heights) < 3: return None count = len(heights) - 2 ses_sub_block = self.blockchain.height_to_block_record(heights[-2]) prev_ses_sub_block = self.blockchain.height_to_block_record(heights[-3]) assert prev_ses_sub_block.sub_epoch_summary_included is not None segments = await self.__create_sub_epoch_segments(ses_sub_block, prev_ses_sub_block, uint32(count)) assert segments is not None await self.blockchain.persist_sub_epoch_challenge_segments(ses_sub_block.header_hash, segments) log.debug("sub_epoch_segments done") return None async def create_sub_epoch_segments(self): log.debug("check segments in db") """ Creates a weight proof object """ assert self.blockchain is not None peak_height = self.blockchain.get_peak_height() if peak_height is None: log.error("no peak yet") return None summary_heights = self.blockchain.get_ses_heights() prev_ses_block = await self.blockchain.get_block_record_from_db(self.blockchain.height_to_hash(uint32(0))) if prev_ses_block is None: return None ses_blocks = await self.blockchain.get_block_records_at(summary_heights) if ses_blocks is None: return None for sub_epoch_n, ses_height in enumerate(summary_heights): log.debug(f"check db for sub epoch {sub_epoch_n}") if ses_height > peak_height: break ses_block = ses_blocks[sub_epoch_n] if ses_block is None or ses_block.sub_epoch_summary_included is None: log.error("error while building proof") return None await self.__create_persist_segment(prev_ses_block, ses_block, ses_height, sub_epoch_n) prev_ses_block = ses_block await asyncio.sleep(2) log.debug("done checking segments") return None async def __create_persist_segment(self, prev_ses_block, ses_block, ses_height, sub_epoch_n): segments = await self.blockchain.get_sub_epoch_challenge_segments(ses_block.header_hash) if segments is None: segments = await self.__create_sub_epoch_segments(ses_block, prev_ses_block, uint32(sub_epoch_n)) if segments is None: log.error(f"failed while building segments for sub epoch {sub_epoch_n}, ses height {ses_height} ") return None await self.blockchain.persist_sub_epoch_challenge_segments(ses_block.header_hash, segments) async def __create_sub_epoch_segments( self, ses_block: BlockRecord, se_start: BlockRecord, sub_epoch_n: uint32 ) -> Optional[List[SubEpochChallengeSegment]]: segments: List[SubEpochChallengeSegment] = [] start_height = await self.get_prev_two_slots_height(se_start) blocks = await self.blockchain.get_block_records_in_range( start_height, ses_block.height + self.constants.MAX_SUB_SLOT_BLOCKS ) header_blocks = await self.blockchain.get_header_blocks_in_range( start_height, ses_block.height + self.constants.MAX_SUB_SLOT_BLOCKS, tx_filter=False ) curr: Optional[HeaderBlock] = header_blocks[se_start.header_hash] height = se_start.height assert curr is not None first = True idx = 0 while curr.height < ses_block.height: if blocks[curr.header_hash].is_challenge_block(self.constants): log.debug(f"challenge segment {idx}, starts at {curr.height} ") seg, height = await self._create_challenge_segment(curr, sub_epoch_n, header_blocks, blocks, first) if seg is None: log.error(f"failed creating segment {curr.header_hash} ") return None segments.append(seg) idx += 1 first = False else: height = height + uint32(1) # type: ignore curr = header_blocks[self.blockchain.height_to_hash(height)] if curr is None: return None log.debug(f"next sub epoch starts at {height}") return segments async def get_prev_two_slots_height(self, se_start: BlockRecord) -> uint32: # find prev 2 slots height slot = 0 batch_size = 50 curr_rec = se_start blocks = await self.blockchain.get_block_records_in_range(curr_rec.height - batch_size, curr_rec.height) end = curr_rec.height while slot < 2 and curr_rec.height > 0: if curr_rec.first_in_sub_slot: slot += 1 if end - curr_rec.height == batch_size - 1: blocks = await self.blockchain.get_block_records_in_range(curr_rec.height - batch_size, curr_rec.height) end = curr_rec.height curr_rec = blocks[self.blockchain.height_to_hash(uint32(curr_rec.height - 1))] return curr_rec.height async def _create_challenge_segment( self, header_block: HeaderBlock, sub_epoch_n: uint32, header_blocks: Dict[bytes32, HeaderBlock], blocks: Dict[bytes32, BlockRecord], first_segment_in_sub_epoch: bool, ) -> Tuple[Optional[SubEpochChallengeSegment], uint32]: assert self.blockchain is not None sub_slots: List[SubSlotData] = [] log.debug(f"create challenge segment block {header_block.header_hash} block height {header_block.height} ") # VDFs from sub slots before challenge block first_sub_slots, first_rc_end_of_slot_vdf = await self.__first_sub_slot_vdfs( header_block, header_blocks, blocks, first_segment_in_sub_epoch ) if first_sub_slots is None: log.error("failed building first sub slots") return None, uint32(0) sub_slots.extend(first_sub_slots) ssd = await _challenge_block_vdfs( self.constants, header_block, blocks[header_block.header_hash], blocks, ) sub_slots.append(ssd) # # VDFs from slot after challenge block to end of slot log.debug(f"create slot end vdf for block {header_block.header_hash} height {header_block.height} ") challenge_slot_end_sub_slots, end_height = await self.__slot_end_vdf( uint32(header_block.height + 1), header_blocks, blocks ) if challenge_slot_end_sub_slots is None: log.error("failed building slot end ") return None, uint32(0) sub_slots.extend(challenge_slot_end_sub_slots) if first_segment_in_sub_epoch and sub_epoch_n != 0: return ( SubEpochChallengeSegment(sub_epoch_n, sub_slots, first_rc_end_of_slot_vdf), end_height, ) return SubEpochChallengeSegment(sub_epoch_n, sub_slots, None), end_height # returns a challenge chain vdf from slot start to signage point async def __first_sub_slot_vdfs( self, header_block: HeaderBlock, header_blocks: Dict[bytes32, HeaderBlock], blocks: Dict[bytes32, BlockRecord], first_in_sub_epoch: bool, ) -> Tuple[Optional[List[SubSlotData]], Optional[VDFInfo]]: # combine cc vdfs of all reward blocks from the start of the sub slot to end header_block_sub_rec = blocks[header_block.header_hash] # find slot start curr_sub_rec = header_block_sub_rec first_rc_end_of_slot_vdf = None if first_in_sub_epoch and curr_sub_rec.height > 0: while not curr_sub_rec.sub_epoch_summary_included: curr_sub_rec = blocks[curr_sub_rec.prev_hash] first_rc_end_of_slot_vdf = self.first_rc_end_of_slot_vdf(header_block, blocks, header_blocks) else: if header_block_sub_rec.overflow and header_block_sub_rec.first_in_sub_slot: sub_slots_num = 2 while sub_slots_num > 0 and curr_sub_rec.height > 0: if curr_sub_rec.first_in_sub_slot: assert curr_sub_rec.finished_challenge_slot_hashes is not None sub_slots_num -= len(curr_sub_rec.finished_challenge_slot_hashes) curr_sub_rec = blocks[curr_sub_rec.prev_hash] else: while not curr_sub_rec.first_in_sub_slot and curr_sub_rec.height > 0: curr_sub_rec = blocks[curr_sub_rec.prev_hash] curr = header_blocks[curr_sub_rec.header_hash] sub_slots_data: List[SubSlotData] = [] tmp_sub_slots_data: List[SubSlotData] = [] while curr.height < header_block.height: if curr is None: log.error("failed fetching block") return None, None if curr.first_in_sub_slot: # if not blue boxed if not blue_boxed_end_of_slot(curr.finished_sub_slots[0]): sub_slots_data.extend(tmp_sub_slots_data) for idx, sub_slot in enumerate(curr.finished_sub_slots): curr_icc_info = None if sub_slot.infused_challenge_chain is not None: curr_icc_info = sub_slot.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf sub_slots_data.append(handle_finished_slots(sub_slot, curr_icc_info)) tmp_sub_slots_data = [] ssd = SubSlotData( None, None, None, None, None, curr.reward_chain_block.signage_point_index, None, None, None, None, curr.reward_chain_block.challenge_chain_ip_vdf, curr.reward_chain_block.infused_challenge_chain_ip_vdf, curr.total_iters, ) tmp_sub_slots_data.append(ssd) curr = header_blocks[self.blockchain.height_to_hash(uint32(curr.height + 1))] if len(tmp_sub_slots_data) > 0: sub_slots_data.extend(tmp_sub_slots_data) for idx, sub_slot in enumerate(header_block.finished_sub_slots): curr_icc_info = None if sub_slot.infused_challenge_chain is not None: curr_icc_info = sub_slot.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf sub_slots_data.append(handle_finished_slots(sub_slot, curr_icc_info)) return sub_slots_data, first_rc_end_of_slot_vdf def first_rc_end_of_slot_vdf( self, header_block, blocks: Dict[bytes32, BlockRecord], header_blocks: Dict[bytes32, HeaderBlock], ) -> Optional[VDFInfo]: curr = blocks[header_block.header_hash] while curr.height > 0 and not curr.sub_epoch_summary_included: curr = blocks[curr.prev_hash] return header_blocks[curr.header_hash].finished_sub_slots[-1].reward_chain.end_of_slot_vdf async def __slot_end_vdf( self, start_height: uint32, header_blocks: Dict[bytes32, HeaderBlock], blocks: Dict[bytes32, BlockRecord] ) -> Tuple[Optional[List[SubSlotData]], uint32]: # gets all vdfs first sub slot after challenge block to last sub slot log.debug(f"slot end vdf start height {start_height}") curr = header_blocks[self.blockchain.height_to_hash(start_height)] curr_header_hash = curr.header_hash sub_slots_data: List[SubSlotData] = [] tmp_sub_slots_data: List[SubSlotData] = [] while not blocks[curr_header_hash].is_challenge_block(self.constants): if curr.first_in_sub_slot: sub_slots_data.extend(tmp_sub_slots_data) curr_prev_header_hash = curr.prev_header_hash # add collected vdfs for idx, sub_slot in enumerate(curr.finished_sub_slots): prev_rec = blocks[curr_prev_header_hash] eos_vdf_iters = prev_rec.sub_slot_iters if idx == 0: eos_vdf_iters = uint64(prev_rec.sub_slot_iters - prev_rec.ip_iters(self.constants)) sub_slots_data.append(handle_end_of_slot(sub_slot, eos_vdf_iters)) tmp_sub_slots_data = [] tmp_sub_slots_data.append(self.handle_block_vdfs(curr, blocks)) curr = header_blocks[self.blockchain.height_to_hash(uint32(curr.height + 1))] curr_header_hash = curr.header_hash if len(tmp_sub_slots_data) > 0: sub_slots_data.extend(tmp_sub_slots_data) log.debug(f"slot end vdf end height {curr.height} slots {len(sub_slots_data)} ") return sub_slots_data, curr.height def handle_block_vdfs(self, curr: HeaderBlock, blocks: Dict[bytes32, BlockRecord]): cc_sp_proof = None icc_ip_proof = None cc_sp_info = None icc_ip_info = None block_record = blocks[curr.header_hash] if curr.infused_challenge_chain_ip_proof is not None: assert curr.reward_chain_block.infused_challenge_chain_ip_vdf icc_ip_proof = curr.infused_challenge_chain_ip_proof icc_ip_info = curr.reward_chain_block.infused_challenge_chain_ip_vdf if curr.challenge_chain_sp_proof is not None: assert curr.reward_chain_block.challenge_chain_sp_vdf cc_sp_vdf_info = curr.reward_chain_block.challenge_chain_sp_vdf if not curr.challenge_chain_sp_proof.normalized_to_identity: (_, _, _, _, cc_vdf_iters, _,) = get_signage_point_vdf_info( self.constants, curr.finished_sub_slots, block_record.overflow, None if curr.height == 0 else blocks[curr.prev_header_hash], BlockCache(blocks), block_record.sp_total_iters(self.constants), block_record.sp_iters(self.constants), ) cc_sp_vdf_info = VDFInfo( curr.reward_chain_block.challenge_chain_sp_vdf.challenge, cc_vdf_iters, curr.reward_chain_block.challenge_chain_sp_vdf.output, ) cc_sp_proof = curr.challenge_chain_sp_proof cc_sp_info = cc_sp_vdf_info return SubSlotData( None, cc_sp_proof, curr.challenge_chain_ip_proof, icc_ip_proof, cc_sp_info, curr.reward_chain_block.signage_point_index, None, None, None, None, curr.reward_chain_block.challenge_chain_ip_vdf, icc_ip_info, curr.total_iters, ) def validate_weight_proof_single_proc(self, weight_proof: WeightProof) -> Tuple[bool, uint32]: assert self.blockchain is not None assert len(weight_proof.sub_epochs) > 0 if len(weight_proof.sub_epochs) == 0: return False, uint32(0) peak_height = weight_proof.recent_chain_data[-1].reward_chain_block.height log.info(f"validate weight proof peak height {peak_height}") summaries, sub_epoch_weight_list = _validate_sub_epoch_summaries(self.constants, weight_proof) if summaries is None: log.warning("weight proof failed sub epoch data validation") return False, uint32(0) constants, summary_bytes, wp_segment_bytes, wp_recent_chain_bytes = vars_to_bytes( self.constants, summaries, weight_proof ) log.info("validate sub epoch challenge segments") seed = summaries[-2].get_hash() rng = random.Random(seed) if not validate_sub_epoch_sampling(rng, sub_epoch_weight_list, weight_proof): log.error("failed weight proof sub epoch sample validation") return False, uint32(0) if not _validate_sub_epoch_segments(constants, rng, wp_segment_bytes, summary_bytes): return False, uint32(0) log.info("validate weight proof recent blocks") if not _validate_recent_blocks(constants, wp_recent_chain_bytes, summary_bytes): return False, uint32(0) return True, self.get_fork_point(summaries) def get_fork_point_no_validations(self, weight_proof: WeightProof) -> Tuple[bool, uint32]: log.debug("get fork point skip validations") assert self.blockchain is not None assert len(weight_proof.sub_epochs) > 0 if len(weight_proof.sub_epochs) == 0: return False, uint32(0) summaries, sub_epoch_weight_list = _validate_sub_epoch_summaries(self.constants, weight_proof) if summaries is None: log.warning("weight proof failed to validate sub epoch summaries") return False, uint32(0) return True, self.get_fork_point(summaries) async def validate_weight_proof(self, weight_proof: WeightProof) -> Tuple[bool, uint32, List[SubEpochSummary]]: assert self.blockchain is not None assert len(weight_proof.sub_epochs) > 0 if len(weight_proof.sub_epochs) == 0: return False, uint32(0), [] peak_height = weight_proof.recent_chain_data[-1].reward_chain_block.height log.info(f"validate weight proof peak height {peak_height}") summaries, sub_epoch_weight_list = _validate_sub_epoch_summaries(self.constants, weight_proof) if summaries is None: log.error("weight proof failed sub epoch data validation") return False, uint32(0), [] seed = summaries[-2].get_hash() rng = random.Random(seed) if not validate_sub_epoch_sampling(rng, sub_epoch_weight_list, weight_proof): log.error("failed weight proof sub epoch sample validation") return False, uint32(0), [] executor = ProcessPoolExecutor(1) constants, summary_bytes, wp_segment_bytes, wp_recent_chain_bytes = vars_to_bytes( self.constants, summaries, weight_proof ) segment_validation_task = asyncio.get_running_loop().run_in_executor( executor, _validate_sub_epoch_segments, constants, rng, wp_segment_bytes, summary_bytes ) recent_blocks_validation_task = asyncio.get_running_loop().run_in_executor( executor, _validate_recent_blocks, constants, wp_recent_chain_bytes, summary_bytes ) valid_segment_task = segment_validation_task valid_recent_blocks_task = recent_blocks_validation_task valid_recent_blocks = await valid_recent_blocks_task if not valid_recent_blocks: log.error("failed validating weight proof recent blocks") return False, uint32(0), [] valid_segments = await valid_segment_task if not valid_segments: log.error("failed validating weight proof sub epoch segments") return False, uint32(0), [] return True, self.get_fork_point(summaries), summaries def get_fork_point(self, received_summaries: List[SubEpochSummary]) -> uint32: # iterate through sub epoch summaries to find fork point fork_point_index = 0 ses_heights = self.blockchain.get_ses_heights() for idx, summary_height in enumerate(ses_heights): log.debug(f"check summary {idx} height {summary_height}") local_ses = self.blockchain.get_ses(summary_height) if idx == len(received_summaries) - 1: # end of wp summaries, local chain is longer or equal to wp chain break if local_ses is None or local_ses.get_hash() != received_summaries[idx].get_hash(): break fork_point_index = idx if fork_point_index > 2: # Two summeries can have different blocks and still be identical # This gets resolved after one full sub epoch height = ses_heights[fork_point_index - 2] else: height = uint32(0) return height def _get_weights_for_sampling( rng: random.Random, total_weight: uint128, recent_chain: List[HeaderBlock] ) -> Optional[List[uint128]]: weight_to_check = [] last_l_weight = recent_chain[-1].reward_chain_block.weight - recent_chain[0].reward_chain_block.weight delta = last_l_weight / total_weight prob_of_adv_succeeding = 1 - math.log(WeightProofHandler.C, delta) if prob_of_adv_succeeding <= 0: return None queries = -WeightProofHandler.LAMBDA_L * math.log(2, prob_of_adv_succeeding) for i in range(int(queries) + 1): u = rng.random() q = 1 - delta ** u # todo check division and type conversions weight = q * float(total_weight) weight_to_check.append(uint128(weight)) weight_to_check.sort() return weight_to_check def _sample_sub_epoch( start_of_epoch_weight: uint128, end_of_epoch_weight: uint128, weight_to_check: List[uint128], ) -> bool: """ weight_to_check: List[uint128] is expected to be sorted """ if weight_to_check is None: return True if weight_to_check[-1] < start_of_epoch_weight: return False if weight_to_check[0] > end_of_epoch_weight: return False choose = False for weight in weight_to_check: if weight > end_of_epoch_weight: return False if start_of_epoch_weight < weight < end_of_epoch_weight: log.debug(f"start weight: {start_of_epoch_weight}") log.debug(f"weight to check {weight}") log.debug(f"end weight: {end_of_epoch_weight}") choose = True break return choose # wp creation methods def _create_sub_epoch_data( sub_epoch_summary: SubEpochSummary, ) -> SubEpochData: reward_chain_hash: bytes32 = sub_epoch_summary.reward_chain_hash # Number of subblocks overflow in previous slot previous_sub_epoch_overflows: uint8 = sub_epoch_summary.num_blocks_overflow # total in sub epoch - expected # New work difficulty and iterations per sub-slot sub_slot_iters: Optional[uint64] = sub_epoch_summary.new_sub_slot_iters new_difficulty: Optional[uint64] = sub_epoch_summary.new_difficulty return SubEpochData(reward_chain_hash, previous_sub_epoch_overflows, sub_slot_iters, new_difficulty) async def _challenge_block_vdfs( constants: ConsensusConstants, header_block: HeaderBlock, block_rec: BlockRecord, sub_blocks: Dict[bytes32, BlockRecord], ): (_, _, _, _, cc_vdf_iters, _,) = get_signage_point_vdf_info( constants, header_block.finished_sub_slots, block_rec.overflow, None if header_block.height == 0 else sub_blocks[header_block.prev_header_hash], BlockCache(sub_blocks), block_rec.sp_total_iters(constants), block_rec.sp_iters(constants), ) cc_sp_info = None if header_block.reward_chain_block.challenge_chain_sp_vdf: cc_sp_info = header_block.reward_chain_block.challenge_chain_sp_vdf assert header_block.challenge_chain_sp_proof if not header_block.challenge_chain_sp_proof.normalized_to_identity: cc_sp_info = VDFInfo( header_block.reward_chain_block.challenge_chain_sp_vdf.challenge, cc_vdf_iters, header_block.reward_chain_block.challenge_chain_sp_vdf.output, ) ssd = SubSlotData( header_block.reward_chain_block.proof_of_space, header_block.challenge_chain_sp_proof, header_block.challenge_chain_ip_proof, None, cc_sp_info, header_block.reward_chain_block.signage_point_index, None, None, None, None, header_block.reward_chain_block.challenge_chain_ip_vdf, header_block.reward_chain_block.infused_challenge_chain_ip_vdf, block_rec.total_iters, ) return ssd def handle_finished_slots(end_of_slot: EndOfSubSlotBundle, icc_end_of_slot_info): return SubSlotData( None, None, None, None, None, None, None if end_of_slot.proofs.challenge_chain_slot_proof is None else end_of_slot.proofs.challenge_chain_slot_proof, None if end_of_slot.proofs.infused_challenge_chain_slot_proof is None else end_of_slot.proofs.infused_challenge_chain_slot_proof, end_of_slot.challenge_chain.challenge_chain_end_of_slot_vdf, icc_end_of_slot_info, None, None, None, ) def handle_end_of_slot( sub_slot: EndOfSubSlotBundle, eos_vdf_iters: uint64, ): assert sub_slot.infused_challenge_chain assert sub_slot.proofs.infused_challenge_chain_slot_proof if sub_slot.proofs.infused_challenge_chain_slot_proof.normalized_to_identity: icc_info = sub_slot.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf else: icc_info = VDFInfo( sub_slot.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf.challenge, eos_vdf_iters, sub_slot.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf.output, ) if sub_slot.proofs.challenge_chain_slot_proof.normalized_to_identity: cc_info = sub_slot.challenge_chain.challenge_chain_end_of_slot_vdf else: cc_info = VDFInfo( sub_slot.challenge_chain.challenge_chain_end_of_slot_vdf.challenge, eos_vdf_iters, sub_slot.challenge_chain.challenge_chain_end_of_slot_vdf.output, ) assert sub_slot.proofs.infused_challenge_chain_slot_proof is not None return SubSlotData( None, None, None, None, None, None, sub_slot.proofs.challenge_chain_slot_proof, sub_slot.proofs.infused_challenge_chain_slot_proof, cc_info, icc_info, None, None, None, ) def compress_segments(full_segment_index, segments: List[SubEpochChallengeSegment]) -> List[SubEpochChallengeSegment]: compressed_segments = [] compressed_segments.append(segments[0]) for idx, segment in enumerate(segments[1:]): if idx != full_segment_index: # remove all redundant values segment = compress_segment(segment) compressed_segments.append(segment) return compressed_segments def compress_segment(segment: SubEpochChallengeSegment) -> SubEpochChallengeSegment: # find challenge slot comp_seg = SubEpochChallengeSegment(segment.sub_epoch_n, [], segment.rc_slot_end_info) for slot in segment.sub_slots: comp_seg.sub_slots.append(slot) if slot.is_challenge(): break return segment # wp validation methods def _validate_sub_epoch_summaries( constants: ConsensusConstants, weight_proof: WeightProof, ) -> Tuple[Optional[List[SubEpochSummary]], Optional[List[uint128]]]: last_ses_hash, last_ses_sub_height = _get_last_ses_hash(constants, weight_proof.recent_chain_data) if last_ses_hash is None: log.warning("could not find last ses block") return None, None summaries, total, sub_epoch_weight_list = _map_sub_epoch_summaries( constants.SUB_EPOCH_BLOCKS, constants.GENESIS_CHALLENGE, weight_proof.sub_epochs, constants.DIFFICULTY_STARTING, ) log.info(f"validating {len(summaries)} sub epochs") # validate weight if not _validate_summaries_weight(constants, total, summaries, weight_proof): log.error("failed validating weight") return None, None last_ses = summaries[-1] log.debug(f"last ses sub height {last_ses_sub_height}") # validate last ses_hash if last_ses.get_hash() != last_ses_hash: log.error(f"failed to validate ses hashes block height {last_ses_sub_height}") return None, None return summaries, sub_epoch_weight_list def _map_sub_epoch_summaries( sub_blocks_for_se: uint32, ses_hash: bytes32, sub_epoch_data: List[SubEpochData], curr_difficulty: uint64, ) -> Tuple[List[SubEpochSummary], uint128, List[uint128]]: total_weight: uint128 = uint128(0) summaries: List[SubEpochSummary] = [] sub_epoch_weight_list: List[uint128] = [] for idx, data in enumerate(sub_epoch_data): ses = SubEpochSummary( ses_hash, data.reward_chain_hash, data.num_blocks_overflow, data.new_difficulty, data.new_sub_slot_iters, ) if idx < len(sub_epoch_data) - 1: delta = 0 if idx > 0: delta = sub_epoch_data[idx].num_blocks_overflow log.debug(f"sub epoch {idx} start weight is {total_weight+curr_difficulty} ") sub_epoch_weight_list.append(uint128(total_weight + curr_difficulty)) total_weight = total_weight + uint128( # type: ignore curr_difficulty * (sub_blocks_for_se + sub_epoch_data[idx + 1].num_blocks_overflow - delta) ) # if new epoch update diff and iters if data.new_difficulty is not None: curr_difficulty = data.new_difficulty # add to dict summaries.append(ses) ses_hash = std_hash(ses) # add last sub epoch weight sub_epoch_weight_list.append(uint128(total_weight + curr_difficulty)) return summaries, total_weight, sub_epoch_weight_list def _validate_summaries_weight(constants: ConsensusConstants, sub_epoch_data_weight, summaries, weight_proof) -> bool: num_over = summaries[-1].num_blocks_overflow ses_end_height = (len(summaries) - 1) * constants.SUB_EPOCH_BLOCKS + num_over - 1 curr = None for block in weight_proof.recent_chain_data: if block.reward_chain_block.height == ses_end_height: curr = block if curr is None: return False return curr.reward_chain_block.weight == sub_epoch_data_weight def _validate_sub_epoch_segments( constants_dict: Dict, rng: random.Random, weight_proof_bytes: bytes, summaries_bytes: List[bytes], ): constants, summaries = bytes_to_vars(constants_dict, summaries_bytes) sub_epoch_segments: SubEpochSegments = SubEpochSegments.from_bytes(weight_proof_bytes) rc_sub_slot_hash = constants.GENESIS_CHALLENGE total_blocks, total_ip_iters = 0, 0 total_slot_iters, total_slots = 0, 0 total_ip_iters = 0 prev_ses: Optional[SubEpochSummary] = None segments_by_sub_epoch = map_segments_by_sub_epoch(sub_epoch_segments.challenge_segments) curr_ssi = constants.SUB_SLOT_ITERS_STARTING for sub_epoch_n, segments in segments_by_sub_epoch.items(): prev_ssi = curr_ssi curr_difficulty, curr_ssi = _get_curr_diff_ssi(constants, sub_epoch_n, summaries) log.debug(f"validate sub epoch {sub_epoch_n}") # recreate RewardChainSubSlot for next ses rc_hash sampled_seg_index = rng.choice(range(len(segments))) if sub_epoch_n > 0: rc_sub_slot = __get_rc_sub_slot(constants, segments[0], summaries, curr_ssi) prev_ses = summaries[sub_epoch_n - 1] rc_sub_slot_hash = rc_sub_slot.get_hash() if not summaries[sub_epoch_n].reward_chain_hash == rc_sub_slot_hash: log.error(f"failed reward_chain_hash validation sub_epoch {sub_epoch_n}") return False for idx, segment in enumerate(segments): valid_segment, ip_iters, slot_iters, slots = _validate_segment( constants, segment, curr_ssi, prev_ssi, curr_difficulty, prev_ses, idx == 0, sampled_seg_index == idx ) if not valid_segment: log.error(f"failed to validate sub_epoch {segment.sub_epoch_n} segment {idx} slots") return False prev_ses = None total_blocks += 1 total_slot_iters += slot_iters total_slots += slots total_ip_iters += ip_iters return True def _validate_segment( constants: ConsensusConstants, segment: SubEpochChallengeSegment, curr_ssi: uint64, prev_ssi: uint64, curr_difficulty: uint64, ses: Optional[SubEpochSummary], first_segment_in_se: bool, sampled: bool, ) -> Tuple[bool, int, int, int]: ip_iters, slot_iters, slots = 0, 0, 0 after_challenge = False for idx, sub_slot_data in enumerate(segment.sub_slots): if sampled and sub_slot_data.is_challenge(): after_challenge = True required_iters = __validate_pospace(constants, segment, idx, curr_difficulty, ses, first_segment_in_se) if required_iters is None: return False, uint64(0), uint64(0), uint64(0) assert sub_slot_data.signage_point_index is not None ip_iters = ip_iters + calculate_ip_iters( # type: ignore constants, curr_ssi, sub_slot_data.signage_point_index, required_iters ) if not _validate_challenge_block_vdfs(constants, idx, segment.sub_slots, curr_ssi): log.error(f"failed to validate challenge slot {idx} vdfs") return False, uint64(0), uint64(0), uint64(0) elif sampled and after_challenge: if not _validate_sub_slot_data(constants, idx, segment.sub_slots, curr_ssi): log.error(f"failed to validate sub slot data {idx} vdfs") return False, uint64(0), uint64(0), uint64(0) slot_iters = slot_iters + curr_ssi # type: ignore slots = slots + uint64(1) # type: ignore return True, ip_iters, slot_iters, slots def _validate_challenge_block_vdfs( constants: ConsensusConstants, sub_slot_idx: int, sub_slots: List[SubSlotData], ssi: uint64, ) -> bool: sub_slot_data = sub_slots[sub_slot_idx] if sub_slot_data.cc_signage_point is not None and sub_slot_data.cc_sp_vdf_info: assert sub_slot_data.signage_point_index sp_input = ClassgroupElement.get_default_element() if not sub_slot_data.cc_signage_point.normalized_to_identity and sub_slot_idx >= 1: is_overflow = is_overflow_block(constants, sub_slot_data.signage_point_index) prev_ssd = sub_slots[sub_slot_idx - 1] sp_input = sub_slot_data_vdf_input( constants, sub_slot_data, sub_slot_idx, sub_slots, is_overflow, prev_ssd.is_end_of_slot(), ssi ) if not sub_slot_data.cc_signage_point.is_valid(constants, sp_input, sub_slot_data.cc_sp_vdf_info): log.error(f"failed to validate challenge chain signage point 2 {sub_slot_data.cc_sp_vdf_info}") return False assert sub_slot_data.cc_infusion_point assert sub_slot_data.cc_ip_vdf_info ip_input = ClassgroupElement.get_default_element() cc_ip_vdf_info = sub_slot_data.cc_ip_vdf_info if not sub_slot_data.cc_infusion_point.normalized_to_identity and sub_slot_idx >= 1: prev_ssd = sub_slots[sub_slot_idx - 1] if prev_ssd.cc_slot_end is None: assert prev_ssd.cc_ip_vdf_info assert prev_ssd.total_iters assert sub_slot_data.total_iters ip_input = prev_ssd.cc_ip_vdf_info.output ip_vdf_iters = uint64(sub_slot_data.total_iters - prev_ssd.total_iters) cc_ip_vdf_info = VDFInfo( sub_slot_data.cc_ip_vdf_info.challenge, ip_vdf_iters, sub_slot_data.cc_ip_vdf_info.output ) if not sub_slot_data.cc_infusion_point.is_valid(constants, ip_input, cc_ip_vdf_info): log.error(f"failed to validate challenge chain infusion point {sub_slot_data.cc_ip_vdf_info}") return False return True def _validate_sub_slot_data( constants: ConsensusConstants, sub_slot_idx: int, sub_slots: List[SubSlotData], ssi: uint64, ) -> bool: sub_slot_data = sub_slots[sub_slot_idx] assert sub_slot_idx > 0 prev_ssd = sub_slots[sub_slot_idx - 1] if sub_slot_data.is_end_of_slot(): if sub_slot_data.icc_slot_end is not None: input = ClassgroupElement.get_default_element() if not sub_slot_data.icc_slot_end.normalized_to_identity and prev_ssd.icc_ip_vdf_info is not None: assert prev_ssd.icc_ip_vdf_info input = prev_ssd.icc_ip_vdf_info.output assert sub_slot_data.icc_slot_end_info if not sub_slot_data.icc_slot_end.is_valid(constants, input, sub_slot_data.icc_slot_end_info, None): log.error(f"failed icc slot end validation {sub_slot_data.icc_slot_end_info} ") return False assert sub_slot_data.cc_slot_end_info assert sub_slot_data.cc_slot_end input = ClassgroupElement.get_default_element() if (not prev_ssd.is_end_of_slot()) and (not sub_slot_data.cc_slot_end.normalized_to_identity): assert prev_ssd.cc_ip_vdf_info input = prev_ssd.cc_ip_vdf_info.output if not sub_slot_data.cc_slot_end.is_valid(constants, input, sub_slot_data.cc_slot_end_info): log.error(f"failed cc slot end validation {sub_slot_data.cc_slot_end_info}") return False else: # find end of slot idx = sub_slot_idx while idx < len(sub_slots) - 1: curr_slot = sub_slots[idx] if curr_slot.is_end_of_slot(): # dont validate intermediate vdfs if slot is blue boxed assert curr_slot.cc_slot_end if curr_slot.cc_slot_end.normalized_to_identity is True: log.debug(f"skip intermediate vdfs slot {sub_slot_idx}") return True else: break idx += 1 if sub_slot_data.icc_infusion_point is not None and sub_slot_data.icc_ip_vdf_info is not None: input = ClassgroupElement.get_default_element() if not prev_ssd.is_challenge() and prev_ssd.icc_ip_vdf_info is not None: input = prev_ssd.icc_ip_vdf_info.output if not sub_slot_data.icc_infusion_point.is_valid(constants, input, sub_slot_data.icc_ip_vdf_info, None): log.error(f"failed icc infusion point vdf validation {sub_slot_data.icc_slot_end_info} ") return False assert sub_slot_data.signage_point_index is not None if sub_slot_data.cc_signage_point: assert sub_slot_data.cc_sp_vdf_info input = ClassgroupElement.get_default_element() if not sub_slot_data.cc_signage_point.normalized_to_identity: is_overflow = is_overflow_block(constants, sub_slot_data.signage_point_index) input = sub_slot_data_vdf_input( constants, sub_slot_data, sub_slot_idx, sub_slots, is_overflow, prev_ssd.is_end_of_slot(), ssi ) if not sub_slot_data.cc_signage_point.is_valid(constants, input, sub_slot_data.cc_sp_vdf_info): log.error(f"failed cc signage point vdf validation {sub_slot_data.cc_sp_vdf_info}") return False input = ClassgroupElement.get_default_element() assert sub_slot_data.cc_ip_vdf_info assert sub_slot_data.cc_infusion_point cc_ip_vdf_info = sub_slot_data.cc_ip_vdf_info if not sub_slot_data.cc_infusion_point.normalized_to_identity and prev_ssd.cc_slot_end is None: assert prev_ssd.cc_ip_vdf_info input = prev_ssd.cc_ip_vdf_info.output assert sub_slot_data.total_iters assert prev_ssd.total_iters ip_vdf_iters = uint64(sub_slot_data.total_iters - prev_ssd.total_iters) cc_ip_vdf_info = VDFInfo( sub_slot_data.cc_ip_vdf_info.challenge, ip_vdf_iters, sub_slot_data.cc_ip_vdf_info.output ) if not sub_slot_data.cc_infusion_point.is_valid(constants, input, cc_ip_vdf_info): log.error(f"failed cc infusion point vdf validation {sub_slot_data.cc_slot_end_info}") return False return True def sub_slot_data_vdf_input( constants: ConsensusConstants, sub_slot_data: SubSlotData, sub_slot_idx: int, sub_slots: List[SubSlotData], is_overflow: bool, new_sub_slot: bool, ssi: uint64, ) -> ClassgroupElement: cc_input = ClassgroupElement.get_default_element() sp_total_iters = get_sp_total_iters(constants, is_overflow, ssi, sub_slot_data) ssd: Optional[SubSlotData] = None if is_overflow and new_sub_slot: if sub_slot_idx >= 2: if sub_slots[sub_slot_idx - 2].cc_slot_end_info is None: for ssd_idx in reversed(range(0, sub_slot_idx - 1)): ssd = sub_slots[ssd_idx] if ssd.cc_slot_end_info is not None: ssd = sub_slots[ssd_idx + 1] break if not (ssd.total_iters > sp_total_iters): break if ssd and ssd.cc_ip_vdf_info is not None: if ssd.total_iters < sp_total_iters: cc_input = ssd.cc_ip_vdf_info.output return cc_input elif not is_overflow and not new_sub_slot: for ssd_idx in reversed(range(0, sub_slot_idx)): ssd = sub_slots[ssd_idx] if ssd.cc_slot_end_info is not None: ssd = sub_slots[ssd_idx + 1] break if not (ssd.total_iters > sp_total_iters): break assert ssd is not None if ssd.cc_ip_vdf_info is not None: if ssd.total_iters < sp_total_iters: cc_input = ssd.cc_ip_vdf_info.output return cc_input elif not new_sub_slot and is_overflow: slots_seen = 0 for ssd_idx in reversed(range(0, sub_slot_idx)): ssd = sub_slots[ssd_idx] if ssd.cc_slot_end_info is not None: slots_seen += 1 if slots_seen == 2: return ClassgroupElement.get_default_element() if ssd.cc_slot_end_info is None and not (ssd.total_iters > sp_total_iters): break assert ssd is not None if ssd.cc_ip_vdf_info is not None: if ssd.total_iters < sp_total_iters: cc_input = ssd.cc_ip_vdf_info.output return cc_input def _validate_recent_blocks(constants_dict: Dict, recent_chain_bytes: bytes, summaries_bytes: List[bytes]) -> bool: constants, summaries = bytes_to_vars(constants_dict, summaries_bytes) recent_chain: RecentChainData = RecentChainData.from_bytes(recent_chain_bytes) sub_blocks = BlockCache({}) first_ses_idx = _get_ses_idx(recent_chain.recent_chain_data) ses_idx = len(summaries) - len(first_ses_idx) ssi: uint64 = constants.SUB_SLOT_ITERS_STARTING diff: Optional[uint64] = constants.DIFFICULTY_STARTING last_blocks_to_validate = 100 # todo remove cap after benchmarks for summary in summaries[:ses_idx]: if summary.new_sub_slot_iters is not None: ssi = summary.new_sub_slot_iters if summary.new_difficulty is not None: diff = summary.new_difficulty ses_blocks, sub_slots, transaction_blocks = 0, 0, 0 challenge, prev_challenge = None, None tip_height = recent_chain.recent_chain_data[-1].height prev_block_record = None deficit = uint8(0) for idx, block in enumerate(recent_chain.recent_chain_data): required_iters = uint64(0) overflow = False ses = False height = block.height for sub_slot in block.finished_sub_slots: prev_challenge = challenge challenge = sub_slot.challenge_chain.get_hash() deficit = sub_slot.reward_chain.deficit if sub_slot.challenge_chain.subepoch_summary_hash is not None: ses = True assert summaries[ses_idx].get_hash() == sub_slot.challenge_chain.subepoch_summary_hash ses_idx += 1 if sub_slot.challenge_chain.new_sub_slot_iters is not None: ssi = sub_slot.challenge_chain.new_sub_slot_iters if sub_slot.challenge_chain.new_difficulty is not None: diff = sub_slot.challenge_chain.new_difficulty if (challenge is not None) and (prev_challenge is not None): overflow = is_overflow_block(constants, block.reward_chain_block.signage_point_index) deficit = get_deficit(constants, deficit, prev_block_record, overflow, len(block.finished_sub_slots)) log.debug(f"wp, validate block {block.height}") if sub_slots > 2 and transaction_blocks > 11 and (tip_height - block.height < last_blocks_to_validate): required_iters, error = validate_finished_header_block( constants, sub_blocks, block, False, diff, ssi, ses_blocks > 2 ) if error is not None: log.error(f"block {block.header_hash} failed validation {error}") return False else: required_iters = _validate_pospace_recent_chain( constants, block, challenge, diff, overflow, prev_challenge ) if required_iters is None: return False curr_block_ses = None if not ses else summaries[ses_idx - 1] block_record = header_block_to_sub_block_record( constants, required_iters, block, ssi, overflow, deficit, height, curr_block_ses ) log.debug(f"add block {block_record.height} to tmp sub blocks") sub_blocks.add_block_record(block_record) if block.first_in_sub_slot: sub_slots += 1 if block.is_transaction_block: transaction_blocks += 1 if ses: ses_blocks += 1 prev_block_record = block_record return True def _validate_pospace_recent_chain( constants: ConsensusConstants, block: HeaderBlock, challenge: bytes32, diff: uint64, overflow: bool, prev_challenge: bytes32, ): if block.reward_chain_block.challenge_chain_sp_vdf is None: # Edge case of first sp (start of slot), where sp_iters == 0 cc_sp_hash: bytes32 = challenge else: cc_sp_hash = block.reward_chain_block.challenge_chain_sp_vdf.output.get_hash() assert cc_sp_hash is not None q_str = block.reward_chain_block.proof_of_space.verify_and_get_quality_string( constants, challenge if not overflow else prev_challenge, cc_sp_hash, ) if q_str is None: log.error(f"could not verify proof of space block {block.height} {overflow}") return None required_iters = calculate_iterations_quality( constants.DIFFICULTY_CONSTANT_FACTOR, q_str, block.reward_chain_block.proof_of_space.size, diff, cc_sp_hash, ) return required_iters def __validate_pospace( constants: ConsensusConstants, segment: SubEpochChallengeSegment, idx: int, curr_diff: uint64, ses: Optional[SubEpochSummary], first_in_sub_epoch: bool, ) -> Optional[uint64]: if first_in_sub_epoch and segment.sub_epoch_n == 0 and idx == 0: cc_sub_slot_hash = constants.GENESIS_CHALLENGE else: cc_sub_slot_hash = __get_cc_sub_slot(segment.sub_slots, idx, ses).get_hash() sub_slot_data: SubSlotData = segment.sub_slots[idx] if sub_slot_data.signage_point_index and is_overflow_block(constants, sub_slot_data.signage_point_index): curr_slot = segment.sub_slots[idx - 1] assert curr_slot.cc_slot_end_info challenge = curr_slot.cc_slot_end_info.challenge else: challenge = cc_sub_slot_hash if sub_slot_data.cc_sp_vdf_info is None: cc_sp_hash = cc_sub_slot_hash else: cc_sp_hash = sub_slot_data.cc_sp_vdf_info.output.get_hash() # validate proof of space assert sub_slot_data.proof_of_space is not None q_str = sub_slot_data.proof_of_space.verify_and_get_quality_string( constants, challenge, cc_sp_hash, ) if q_str is None: log.error("could not verify proof of space") return None return calculate_iterations_quality( constants.DIFFICULTY_CONSTANT_FACTOR, q_str, sub_slot_data.proof_of_space.size, curr_diff, cc_sp_hash, ) def __get_rc_sub_slot( constants: ConsensusConstants, segment: SubEpochChallengeSegment, summaries: List[SubEpochSummary], curr_ssi: uint64, ) -> RewardChainSubSlot: ses = summaries[uint32(segment.sub_epoch_n - 1)] # find first challenge in sub epoch first_idx = None first = None for idx, curr in enumerate(segment.sub_slots): if curr.cc_slot_end is None: first_idx = idx first = curr break assert first_idx idx = first_idx slots = segment.sub_slots # number of slots to look for slots_n = 1 assert first assert first.signage_point_index is not None if is_overflow_block(constants, first.signage_point_index): if idx >= 2 and slots[idx - 2].cc_slot_end is None: slots_n = 2 new_diff = None if ses is None else ses.new_difficulty new_ssi = None if ses is None else ses.new_sub_slot_iters ses_hash = None if ses is None else ses.get_hash() overflow = is_overflow_block(constants, first.signage_point_index) if overflow: if idx >= 2 and slots[idx - 2].cc_slot_end is not None and slots[idx - 1].cc_slot_end is not None: ses_hash = None new_ssi = None new_diff = None sub_slot = slots[idx] while True: if sub_slot.cc_slot_end: slots_n -= 1 if slots_n == 0: break idx -= 1 sub_slot = slots[idx] icc_sub_slot_hash: Optional[bytes32] = None assert sub_slot is not None assert sub_slot.cc_slot_end_info is not None assert segment.rc_slot_end_info is not None if idx != 0: cc_vdf_info = VDFInfo(sub_slot.cc_slot_end_info.challenge, curr_ssi, sub_slot.cc_slot_end_info.output) if sub_slot.icc_slot_end_info is not None: icc_slot_end_info = VDFInfo( sub_slot.icc_slot_end_info.challenge, curr_ssi, sub_slot.icc_slot_end_info.output ) icc_sub_slot_hash = icc_slot_end_info.get_hash() else: cc_vdf_info = sub_slot.cc_slot_end_info if sub_slot.icc_slot_end_info is not None: icc_sub_slot_hash = sub_slot.icc_slot_end_info.get_hash() cc_sub_slot = ChallengeChainSubSlot( cc_vdf_info, icc_sub_slot_hash, ses_hash, new_ssi, new_diff, ) rc_sub_slot = RewardChainSubSlot( segment.rc_slot_end_info, cc_sub_slot.get_hash(), icc_sub_slot_hash, constants.MIN_BLOCKS_PER_CHALLENGE_BLOCK, ) return rc_sub_slot def __get_cc_sub_slot(sub_slots: List[SubSlotData], idx, ses: Optional[SubEpochSummary]) -> ChallengeChainSubSlot: sub_slot: Optional[SubSlotData] = None for i in reversed(range(0, idx)): sub_slot = sub_slots[i] if sub_slot.cc_slot_end_info is not None: break assert sub_slot is not None assert sub_slot.cc_slot_end_info is not None icc_vdf = sub_slot.icc_slot_end_info icc_vdf_hash: Optional[bytes32] = None if icc_vdf is not None: icc_vdf_hash = icc_vdf.get_hash() cc_sub_slot = ChallengeChainSubSlot( sub_slot.cc_slot_end_info, icc_vdf_hash, None if ses is None else ses.get_hash(), None if ses is None else ses.new_sub_slot_iters, None if ses is None else ses.new_difficulty, ) return cc_sub_slot def _get_curr_diff_ssi(constants: ConsensusConstants, idx, summaries): curr_difficulty = constants.DIFFICULTY_STARTING curr_ssi = constants.SUB_SLOT_ITERS_STARTING for ses in reversed(summaries[0:idx]): if ses.new_sub_slot_iters is not None: curr_ssi = ses.new_sub_slot_iters curr_difficulty = ses.new_difficulty break return curr_difficulty, curr_ssi def vars_to_bytes(constants, summaries, weight_proof): constants_dict = recurse_jsonify(dataclasses.asdict(constants)) wp_recent_chain_bytes = bytes(RecentChainData(weight_proof.recent_chain_data)) wp_segment_bytes = bytes(SubEpochSegments(weight_proof.sub_epoch_segments)) summary_bytes = [] for summary in summaries: summary_bytes.append(bytes(summary)) return constants_dict, summary_bytes, wp_segment_bytes, wp_recent_chain_bytes def bytes_to_vars(constants_dict, summaries_bytes): summaries = [] for summary in summaries_bytes: summaries.append(SubEpochSummary.from_bytes(summary)) constants: ConsensusConstants = dataclass_from_dict(ConsensusConstants, constants_dict) return constants, summaries def _get_last_ses_hash( constants: ConsensusConstants, recent_reward_chain: List[HeaderBlock] ) -> Tuple[Optional[bytes32], uint32]: for idx, block in enumerate(reversed(recent_reward_chain)): if (block.reward_chain_block.height % constants.SUB_EPOCH_BLOCKS) == 0: idx = len(recent_reward_chain) - 1 - idx # reverse # find first block after sub slot end while idx < len(recent_reward_chain): curr = recent_reward_chain[idx] if len(curr.finished_sub_slots) > 0: for slot in curr.finished_sub_slots: if slot.challenge_chain.subepoch_summary_hash is not None: return ( slot.challenge_chain.subepoch_summary_hash, curr.reward_chain_block.height, ) idx += 1 return None, uint32(0) def _get_ses_idx(recent_reward_chain: List[HeaderBlock]) -> List[int]: idxs: List[int] = [] for idx, curr in enumerate(recent_reward_chain): if len(curr.finished_sub_slots) > 0: for slot in curr.finished_sub_slots: if slot.challenge_chain.subepoch_summary_hash is not None: idxs.append(idx) return idxs def get_deficit( constants: ConsensusConstants, curr_deficit: uint8, prev_block: BlockRecord, overflow: bool, num_finished_sub_slots: int, ) -> uint8: if prev_block is None: if curr_deficit >= 1 and not (overflow and curr_deficit == constants.MIN_BLOCKS_PER_CHALLENGE_BLOCK): curr_deficit -= 1 return curr_deficit return calculate_deficit(constants, uint32(prev_block.height + 1), prev_block, overflow, num_finished_sub_slots) def get_sp_total_iters(constants: ConsensusConstants, is_overflow: bool, ssi: uint64, sub_slot_data: SubSlotData): assert sub_slot_data.cc_ip_vdf_info is not None assert sub_slot_data.total_iters is not None assert sub_slot_data.signage_point_index is not None sp_iters: uint64 = calculate_sp_iters(constants, ssi, sub_slot_data.signage_point_index) ip_iters: uint64 = sub_slot_data.cc_ip_vdf_info.number_of_iterations sp_sub_slot_total_iters = uint128(sub_slot_data.total_iters - ip_iters) if is_overflow: sp_sub_slot_total_iters = uint128(sp_sub_slot_total_iters - ssi) return sp_sub_slot_total_iters + sp_iters def blue_boxed_end_of_slot(sub_slot: EndOfSubSlotBundle): if sub_slot.proofs.challenge_chain_slot_proof.normalized_to_identity: if sub_slot.proofs.infused_challenge_chain_slot_proof is not None: if sub_slot.proofs.infused_challenge_chain_slot_proof.normalized_to_identity: return True else: return True return False def validate_sub_epoch_sampling(rng, sub_epoch_weight_list, weight_proof): tip = weight_proof.recent_chain_data[-1] weight_to_check = _get_weights_for_sampling(rng, tip.weight, weight_proof.recent_chain_data) sampled_sub_epochs: dict[int, bool] = {} for idx in range(1, len(sub_epoch_weight_list)): if _sample_sub_epoch(sub_epoch_weight_list[idx - 1], sub_epoch_weight_list[idx], weight_to_check): sampled_sub_epochs[idx - 1] = True if len(sampled_sub_epochs) == WeightProofHandler.MAX_SAMPLES: break curr_sub_epoch_n = -1 for sub_epoch_segment in weight_proof.sub_epoch_segments: if curr_sub_epoch_n < sub_epoch_segment.sub_epoch_n: if sub_epoch_segment.sub_epoch_n in sampled_sub_epochs: del sampled_sub_epochs[sub_epoch_segment.sub_epoch_n] curr_sub_epoch_n = sub_epoch_segment.sub_epoch_n if len(sampled_sub_epochs) > 0: return False return True def map_segments_by_sub_epoch(sub_epoch_segments) -> Dict[int, List[SubEpochChallengeSegment]]: segments: Dict[int, List[SubEpochChallengeSegment]] = {} curr_sub_epoch_n = -1 for idx, segment in enumerate(sub_epoch_segments): if curr_sub_epoch_n < segment.sub_epoch_n: curr_sub_epoch_n = segment.sub_epoch_n segments[curr_sub_epoch_n] = [] segments[curr_sub_epoch_n].append(segment) return segments def validate_total_iters( segment: SubEpochChallengeSegment, sub_slot_data_idx, expected_sub_slot_iters: uint64, finished_sub_slots_since_prev: int, prev_b: SubSlotData, prev_sub_slot_data_iters, genesis, ) -> bool: sub_slot_data = segment.sub_slots[sub_slot_data_idx] if genesis: total_iters: uint128 = uint128(expected_sub_slot_iters * finished_sub_slots_since_prev) elif segment.sub_slots[sub_slot_data_idx - 1].is_end_of_slot(): assert prev_b.total_iters assert prev_b.cc_ip_vdf_info total_iters = prev_b.total_iters # Add the rest of the slot of prev_b total_iters = uint128(total_iters + prev_sub_slot_data_iters - prev_b.cc_ip_vdf_info.number_of_iterations) # Add other empty slots total_iters = uint128(total_iters + (expected_sub_slot_iters * (finished_sub_slots_since_prev - 1))) else: # Slot iters is guaranteed to be the same for header_block and prev_b # This takes the beginning of the slot, and adds ip_iters assert prev_b.cc_ip_vdf_info assert prev_b.total_iters total_iters = uint128(prev_b.total_iters - prev_b.cc_ip_vdf_info.number_of_iterations) total_iters = uint128(total_iters + sub_slot_data.cc_ip_vdf_info.number_of_iterations) return total_iters == sub_slot_data.total_iters