import copy import logging import os import random import shutil import sys import tempfile import time from argparse import Namespace from dataclasses import replace from pathlib import Path from typing import Callable, Dict, List, Optional, Tuple from blspy import AugSchemeMPL, G1Element, G2Element, PrivateKey from chia.cmds.init_funcs import create_all_ssl, create_default_chia_config from chia.full_node.bundle_tools import ( best_solution_generator_from_template, detect_potential_template_generator, simple_solution_generator, ) from chia.plotting.create_plots import create_plots from chia.consensus.block_creation import create_unfinished_block, unfinished_block_to_full_block from chia.consensus.block_record import BlockRecord from chia.consensus.blockchain_interface import BlockchainInterface from chia.consensus.coinbase import create_puzzlehash_for_pk from chia.consensus.constants import ConsensusConstants from chia.consensus.default_constants import DEFAULT_CONSTANTS from chia.consensus.deficit import calculate_deficit from chia.consensus.full_block_to_block_record import block_to_block_record from chia.consensus.make_sub_epoch_summary import next_sub_epoch_summary from chia.consensus.pot_iterations import ( calculate_ip_iters, calculate_iterations_quality, calculate_sp_interval_iters, calculate_sp_iters, is_overflow_block, ) from chia.consensus.vdf_info_computation import get_signage_point_vdf_info from chia.full_node.signage_point import SignagePoint from chia.plotting.plot_tools import PlotInfo, load_plots, parse_plot_info from chia.types.blockchain_format.classgroup import ClassgroupElement from chia.types.blockchain_format.coin import Coin from chia.types.blockchain_format.pool_target import PoolTarget from chia.types.blockchain_format.proof_of_space import ProofOfSpace from chia.types.blockchain_format.sized_bytes import bytes32 from chia.types.blockchain_format.slots import ( ChallengeChainSubSlot, InfusedChallengeChainSubSlot, RewardChainSubSlot, SubSlotProofs, ) from chia.types.blockchain_format.sub_epoch_summary import SubEpochSummary from chia.types.blockchain_format.vdf import VDFInfo, VDFProof from chia.types.end_of_slot_bundle import EndOfSubSlotBundle from chia.types.full_block import FullBlock from chia.types.generator_types import BlockGenerator, CompressorArg from chia.types.spend_bundle import SpendBundle from chia.types.unfinished_block import UnfinishedBlock from chia.util.bech32m import encode_puzzle_hash from chia.util.block_cache import BlockCache from chia.util.config import load_config, save_config from chia.util.hash import std_hash from chia.util.ints import uint8, uint32, uint64, uint128 from chia.util.keychain import Keychain, bytes_to_mnemonic from chia.util.path import mkdir from chia.util.vdf_prover import get_vdf_info_and_proof from chia.util.wallet_tools import WalletTool from chia.wallet.derive_keys import ( master_sk_to_farmer_sk, master_sk_to_local_sk, master_sk_to_pool_sk, master_sk_to_wallet_sk, ) test_constants = DEFAULT_CONSTANTS.replace( **{ "MIN_PLOT_SIZE": 18, "MIN_BLOCKS_PER_CHALLENGE_BLOCK": 12, "DIFFICULTY_STARTING": 2 ** 12, "DISCRIMINANT_SIZE_BITS": 16, "SUB_EPOCH_BLOCKS": 170, "WEIGHT_PROOF_THRESHOLD": 2, "WEIGHT_PROOF_RECENT_BLOCKS": 380, "DIFFICULTY_CONSTANT_FACTOR": 33554432, "NUM_SPS_SUB_SLOT": 16, # Must be a power of 2 "MAX_SUB_SLOT_BLOCKS": 50, "EPOCH_BLOCKS": 340, "BLOCKS_CACHE_SIZE": 340 + 3 * 50, # Coordinate with the above values "SUB_SLOT_TIME_TARGET": 600, # The target number of seconds per slot, mainnet 600 "SUB_SLOT_ITERS_STARTING": 2 ** 10, # Must be a multiple of 64 "NUMBER_ZERO_BITS_PLOT_FILTER": 1, # H(plot signature of the challenge) must start with these many zeroes "MAX_FUTURE_TIME": 3600 * 24 * 10, # Allows creating blockchains with timestamps up to 10 days in the future, for testing "COST_PER_BYTE": 1337, "MEMPOOL_BLOCK_BUFFER": 6, "INITIAL_FREEZE_END_TIMESTAMP": int(time.time()) - 1, "NETWORK_TYPE": 1, } ) log = logging.getLogger(__name__) class BlockTools: """ Tools to generate blocks for testing. """ def __init__( self, constants: ConsensusConstants = test_constants, root_path: Optional[Path] = None, const_dict=None ): self._tempdir = None if root_path is None: self._tempdir = tempfile.TemporaryDirectory() root_path = Path(self._tempdir.name) self.root_path = root_path create_default_chia_config(root_path) self.keychain = Keychain("testing-1.8.0", True) self.keychain.delete_all_keys() self.farmer_master_sk_entropy = std_hash(b"block_tools farmer key") self.pool_master_sk_entropy = std_hash(b"block_tools pool key") self.farmer_master_sk = self.keychain.add_private_key(bytes_to_mnemonic(self.farmer_master_sk_entropy), "") self.pool_master_sk = self.keychain.add_private_key(bytes_to_mnemonic(self.pool_master_sk_entropy), "") self.farmer_pk = master_sk_to_farmer_sk(self.farmer_master_sk).get_g1() self.pool_pk = master_sk_to_pool_sk(self.pool_master_sk).get_g1() self.farmer_ph: bytes32 = create_puzzlehash_for_pk( master_sk_to_wallet_sk(self.farmer_master_sk, uint32(0)).get_g1() ) self.pool_ph: bytes32 = create_puzzlehash_for_pk( master_sk_to_wallet_sk(self.pool_master_sk, uint32(0)).get_g1() ) self.init_plots(root_path) create_all_ssl(root_path) self.all_sks: List[PrivateKey] = [sk for sk, _ in self.keychain.get_all_private_keys()] self.pool_pubkeys: List[G1Element] = [master_sk_to_pool_sk(sk).get_g1() for sk in self.all_sks] farmer_pubkeys: List[G1Element] = [master_sk_to_farmer_sk(sk).get_g1() for sk in self.all_sks] if len(self.pool_pubkeys) == 0 or len(farmer_pubkeys) == 0: raise RuntimeError("Keys not generated. Run `chia generate keys`") _, loaded_plots, _, _ = load_plots({}, {}, farmer_pubkeys, self.pool_pubkeys, None, False, root_path) self.plots: Dict[Path, PlotInfo] = loaded_plots self.local_sk_cache: Dict[bytes32, PrivateKey] = {} self._config = load_config(self.root_path, "config.yaml") self._config["logging"]["log_stdout"] = True self._config["selected_network"] = "testnet0" for service in ["harvester", "farmer", "full_node", "wallet", "introducer", "timelord", "pool"]: self._config[service]["selected_network"] = "testnet0" save_config(self.root_path, "config.yaml", self._config) overrides = self._config["network_overrides"]["constants"][self._config["selected_network"]] updated_constants = constants.replace_str_to_bytes(**overrides) if const_dict is not None: updated_constants = updated_constants.replace(**const_dict) self.constants = updated_constants def change_config(self, new_config: Dict): self._config = new_config overrides = self._config["network_overrides"]["constants"][self._config["selected_network"]] updated_constants = self.constants.replace_str_to_bytes(**overrides) self.constants = updated_constants save_config(self.root_path, "config.yaml", self._config) def init_plots(self, root_path: Path): plot_dir = get_plot_dir() mkdir(plot_dir) temp_dir = plot_dir / "tmp" mkdir(temp_dir) num_pool_public_key_plots = 15 num_pool_address_plots = 5 args = Namespace() # Can't go much lower than 20, since plots start having no solutions and more buggy args.size = 22 # Uses many plots for testing, in order to guarantee proofs of space at every height args.num = num_pool_public_key_plots # Some plots created to a pool public key, and some to a pool puzzle hash args.buffer = 100 args.farmer_public_key = bytes(self.farmer_pk).hex() args.pool_public_key = bytes(self.pool_pk).hex() args.pool_contract_address = None args.tmp_dir = temp_dir args.tmp2_dir = plot_dir args.final_dir = plot_dir args.plotid = None args.memo = None args.buckets = 0 args.stripe_size = 2000 args.num_threads = 0 args.nobitfield = False args.exclude_final_dir = False args.list_duplicates = False test_private_keys = [ AugSchemeMPL.key_gen(std_hash(i.to_bytes(2, "big"))) for i in range(num_pool_public_key_plots + num_pool_address_plots) ] try: # No datetime in the filename, to get deterministic filenames and not re-plot create_plots( args, root_path, use_datetime=False, test_private_keys=test_private_keys[:num_pool_public_key_plots], ) # Create more plots, but to a pool address instead of public key args.pool_public_key = None args.pool_contract_address = encode_puzzle_hash(self.pool_ph, "xch") args.num = num_pool_address_plots create_plots( args, root_path, use_datetime=False, test_private_keys=test_private_keys[num_pool_public_key_plots:], ) except KeyboardInterrupt: shutil.rmtree(plot_dir, ignore_errors=True) sys.exit(1) @property def config(self) -> Dict: return copy.deepcopy(self._config) def get_plot_signature(self, m: bytes32, plot_pk: G1Element) -> G2Element: """ Returns the plot signature of the header data. """ farmer_sk = master_sk_to_farmer_sk(self.all_sks[0]) for _, plot_info in self.plots.items(): if plot_pk == plot_info.plot_public_key: # Look up local_sk from plot to save locked memory if plot_info.prover.get_id() in self.local_sk_cache: local_master_sk = self.local_sk_cache[plot_info.prover.get_id()] else: _, _, local_master_sk = parse_plot_info(plot_info.prover.get_memo()) self.local_sk_cache[plot_info.prover.get_id()] = local_master_sk local_sk = master_sk_to_local_sk(local_master_sk) agg_pk = ProofOfSpace.generate_plot_public_key(local_sk.get_g1(), farmer_sk.get_g1()) assert agg_pk == plot_pk harv_share = AugSchemeMPL.sign(local_sk, m, agg_pk) farm_share = AugSchemeMPL.sign(farmer_sk, m, agg_pk) return AugSchemeMPL.aggregate([harv_share, farm_share]) raise ValueError(f"Do not have key {plot_pk}") def get_pool_key_signature(self, pool_target: PoolTarget, pool_pk: Optional[G1Element]) -> Optional[G2Element]: # Returns the pool signature for the corresponding pk. If no pk is provided, returns None. if pool_pk is None: return None for sk in self.all_sks: sk_child = master_sk_to_pool_sk(sk) if sk_child.get_g1() == pool_pk: return AugSchemeMPL.sign(sk_child, bytes(pool_target)) raise ValueError(f"Do not have key {pool_pk}") def get_farmer_wallet_tool(self) -> WalletTool: return WalletTool(self.constants, self.farmer_master_sk) def get_pool_wallet_tool(self) -> WalletTool: return WalletTool(self.constants, self.pool_master_sk) def get_consecutive_blocks( self, num_blocks: int, block_list_input: List[FullBlock] = None, farmer_reward_puzzle_hash: Optional[bytes32] = None, pool_reward_puzzle_hash: Optional[bytes32] = None, transaction_data: Optional[SpendBundle] = None, seed: bytes = b"", time_per_block: Optional[float] = None, force_overflow: bool = False, skip_slots: int = 0, # Force at least this number of empty slots before the first SB guarantee_transaction_block: bool = False, # Force that this block must be a tx block normalized_to_identity_cc_eos: bool = False, normalized_to_identity_icc_eos: bool = False, normalized_to_identity_cc_sp: bool = False, normalized_to_identity_cc_ip: bool = False, current_time: bool = False, previous_generator: CompressorArg = None, genesis_timestamp: Optional[uint64] = None, ) -> List[FullBlock]: assert num_blocks > 0 if block_list_input is not None: block_list = block_list_input.copy() else: block_list = [] constants = self.constants transaction_data_included = False if time_per_block is None: time_per_block = float(constants.SUB_SLOT_TIME_TARGET) / float(constants.SLOT_BLOCKS_TARGET) if farmer_reward_puzzle_hash is None: farmer_reward_puzzle_hash = self.farmer_ph if len(block_list) == 0: initial_block_list_len = 0 genesis = self.create_genesis_block( constants, seed, force_overflow=force_overflow, skip_slots=skip_slots, timestamp=(uint64(int(time.time())) if genesis_timestamp is None else genesis_timestamp), ) log.info(f"Created block 0 iters: {genesis.total_iters}") num_empty_slots_added = skip_slots block_list = [genesis] num_blocks -= 1 else: initial_block_list_len = len(block_list) num_empty_slots_added = uint32(0) # Allows forcing empty slots in the beginning, for testing purposes if num_blocks == 0: return block_list height_to_hash, difficulty, blocks = load_block_list(block_list, constants) latest_block: BlockRecord = blocks[block_list[-1].header_hash] curr = latest_block while not curr.is_transaction_block: curr = blocks[curr.prev_hash] start_timestamp = curr.timestamp start_height = curr.height curr = latest_block blocks_added_this_sub_slot = 1 while not curr.first_in_sub_slot: curr = blocks[curr.prev_hash] blocks_added_this_sub_slot += 1 finished_sub_slots_at_sp: List[EndOfSubSlotBundle] = [] # Sub-slots since last block, up to signage point finished_sub_slots_at_ip: List[EndOfSubSlotBundle] = [] # Sub-slots since last block, up to infusion point sub_slot_iters: uint64 = latest_block.sub_slot_iters # The number of iterations in one sub-slot same_slot_as_last = True # Only applies to first slot, to prevent old blocks from being added sub_slot_start_total_iters: uint128 = latest_block.ip_sub_slot_total_iters(constants) sub_slots_finished = 0 pending_ses: bool = False # Start at the last block in block list # Get the challenge for that slot while True: slot_cc_challenge, slot_rc_challenge = get_challenges( constants, blocks, finished_sub_slots_at_sp, latest_block.header_hash, ) prev_num_of_blocks = num_blocks if num_empty_slots_added < skip_slots: # If did not reach the target slots to skip, don't make any proofs for this sub-slot num_empty_slots_added += 1 else: # Loop over every signage point (Except for the last ones, which are used for overflows) for signage_point_index in range(0, constants.NUM_SPS_SUB_SLOT - constants.NUM_SP_INTERVALS_EXTRA): curr = latest_block while curr.total_iters > sub_slot_start_total_iters + calculate_sp_iters( constants, sub_slot_iters, uint8(signage_point_index) ): if curr.height == 0: break curr = blocks[curr.prev_hash] if curr.total_iters > sub_slot_start_total_iters: finished_sub_slots_at_sp = [] if same_slot_as_last: if signage_point_index < latest_block.signage_point_index: # Ignore this signage_point because it's in the past continue signage_point: SignagePoint = get_signage_point( constants, BlockCache(blocks), latest_block, sub_slot_start_total_iters, uint8(signage_point_index), finished_sub_slots_at_sp, sub_slot_iters, normalized_to_identity_cc_sp, ) if signage_point_index == 0: cc_sp_output_hash: bytes32 = slot_cc_challenge else: assert signage_point.cc_vdf is not None cc_sp_output_hash = signage_point.cc_vdf.output.get_hash() qualified_proofs: List[Tuple[uint64, ProofOfSpace]] = self.get_pospaces_for_challenge( constants, slot_cc_challenge, cc_sp_output_hash, seed, difficulty, sub_slot_iters, ) for required_iters, proof_of_space in sorted(qualified_proofs, key=lambda t: t[0]): if blocks_added_this_sub_slot == constants.MAX_SUB_SLOT_BLOCKS or force_overflow: break if same_slot_as_last: if signage_point_index == latest_block.signage_point_index: # Ignore this block because it's in the past if required_iters <= latest_block.required_iters: continue assert latest_block.header_hash in blocks additions = None removals = None if transaction_data_included: transaction_data = None if transaction_data is not None and not transaction_data_included: additions = transaction_data.additions() removals = transaction_data.removals() assert start_timestamp is not None if proof_of_space.pool_contract_puzzle_hash is not None: if pool_reward_puzzle_hash is not None: # The caller wants to be paid to a specific address, but this PoSpace is tied to an # address, so continue until a proof of space tied to a pk is found continue pool_target = PoolTarget(proof_of_space.pool_contract_puzzle_hash, uint32(0)) else: if pool_reward_puzzle_hash is not None: pool_target = PoolTarget(pool_reward_puzzle_hash, uint32(0)) else: pool_target = PoolTarget(self.pool_ph, uint32(0)) if transaction_data is not None: if previous_generator is not None: block_generator: Optional[BlockGenerator] = best_solution_generator_from_template( previous_generator, transaction_data ) else: block_generator = simple_solution_generator(transaction_data) aggregate_signature = transaction_data.aggregated_signature else: block_generator = None aggregate_signature = G2Element() full_block, block_record = get_full_block_and_block_record( constants, blocks, sub_slot_start_total_iters, uint8(signage_point_index), proof_of_space, slot_cc_challenge, slot_rc_challenge, farmer_reward_puzzle_hash, pool_target, start_timestamp, start_height, time_per_block, block_generator, aggregate_signature, additions, removals, height_to_hash, difficulty, required_iters, sub_slot_iters, self.get_plot_signature, self.get_pool_key_signature, finished_sub_slots_at_ip, signage_point, latest_block, seed, normalized_to_identity_cc_ip, current_time=current_time, ) if block_record.is_transaction_block: transaction_data_included = True else: if guarantee_transaction_block: continue if pending_ses: pending_ses = False block_list.append(full_block) if full_block.transactions_generator is not None: compressor_arg = detect_potential_template_generator( full_block.height, full_block.transactions_generator ) if compressor_arg is not None: previous_generator = compressor_arg blocks_added_this_sub_slot += 1 blocks[full_block.header_hash] = block_record log.info(f"Created block {block_record.height} ove=False, iters " f"{block_record.total_iters}") height_to_hash[uint32(full_block.height)] = full_block.header_hash latest_block = blocks[full_block.header_hash] finished_sub_slots_at_ip = [] num_blocks -= 1 if num_blocks == 0: return block_list # Finish the end of sub-slot and try again next sub-slot # End of sub-slot logic if len(finished_sub_slots_at_ip) == 0: # Block has been created within this sub-slot eos_iters: uint64 = uint64(sub_slot_iters - (latest_block.total_iters - sub_slot_start_total_iters)) cc_input: ClassgroupElement = latest_block.challenge_vdf_output rc_challenge: bytes32 = latest_block.reward_infusion_new_challenge else: # No blocks were successfully created within this sub-slot eos_iters = sub_slot_iters cc_input = ClassgroupElement.get_default_element() rc_challenge = slot_rc_challenge cc_vdf, cc_proof = get_vdf_info_and_proof( constants, cc_input, slot_cc_challenge, eos_iters, ) rc_vdf, rc_proof = get_vdf_info_and_proof( constants, ClassgroupElement.get_default_element(), rc_challenge, eos_iters, ) eos_deficit: uint8 = ( latest_block.deficit if latest_block.deficit > 0 else constants.MIN_BLOCKS_PER_CHALLENGE_BLOCK ) icc_eos_vdf, icc_ip_proof = get_icc( constants, uint128(sub_slot_start_total_iters + sub_slot_iters), finished_sub_slots_at_ip, latest_block, blocks, sub_slot_start_total_iters, eos_deficit, ) # End of slot vdf info for icc and cc have to be from challenge block or start of slot, respectively, # in order for light clients to validate. cc_vdf = VDFInfo(cc_vdf.challenge, sub_slot_iters, cc_vdf.output) if normalized_to_identity_cc_eos: _, cc_proof = get_vdf_info_and_proof( constants, ClassgroupElement.get_default_element(), cc_vdf.challenge, sub_slot_iters, True, ) if pending_ses: sub_epoch_summary: Optional[SubEpochSummary] = None else: sub_epoch_summary = next_sub_epoch_summary( constants, BlockCache(blocks, height_to_hash), latest_block.required_iters, block_list[-1], False, ) pending_ses = True if sub_epoch_summary is not None: ses_hash = sub_epoch_summary.get_hash() new_sub_slot_iters: Optional[uint64] = sub_epoch_summary.new_sub_slot_iters new_difficulty: Optional[uint64] = sub_epoch_summary.new_difficulty log.info(f"Sub epoch summary: {sub_epoch_summary}") else: ses_hash = None new_sub_slot_iters = None new_difficulty = None if icc_eos_vdf is not None: # Icc vdf (Deficit of latest block is <= 4) if len(finished_sub_slots_at_ip) == 0: # This means there are blocks in this sub-slot curr = latest_block while not curr.is_challenge_block(constants) and not curr.first_in_sub_slot: curr = blocks[curr.prev_hash] if curr.is_challenge_block(constants): icc_eos_iters = uint64(sub_slot_start_total_iters + sub_slot_iters - curr.total_iters) else: icc_eos_iters = sub_slot_iters else: # This means there are no blocks in this sub-slot icc_eos_iters = sub_slot_iters icc_eos_vdf = VDFInfo( icc_eos_vdf.challenge, icc_eos_iters, icc_eos_vdf.output, ) if normalized_to_identity_icc_eos: _, icc_ip_proof = get_vdf_info_and_proof( constants, ClassgroupElement.get_default_element(), icc_eos_vdf.challenge, icc_eos_iters, True, ) icc_sub_slot: Optional[InfusedChallengeChainSubSlot] = InfusedChallengeChainSubSlot(icc_eos_vdf) assert icc_sub_slot is not None icc_sub_slot_hash = icc_sub_slot.get_hash() if latest_block.deficit == 0 else None cc_sub_slot = ChallengeChainSubSlot( cc_vdf, icc_sub_slot_hash, ses_hash, new_sub_slot_iters, new_difficulty, ) else: # No icc icc_sub_slot = None cc_sub_slot = ChallengeChainSubSlot(cc_vdf, None, ses_hash, new_sub_slot_iters, new_difficulty) finished_sub_slots_at_ip.append( EndOfSubSlotBundle( cc_sub_slot, icc_sub_slot, RewardChainSubSlot( rc_vdf, cc_sub_slot.get_hash(), icc_sub_slot.get_hash() if icc_sub_slot is not None else None, eos_deficit, ), SubSlotProofs(cc_proof, icc_ip_proof, rc_proof), ) ) finished_sub_slots_eos = finished_sub_slots_at_ip.copy() latest_block_eos = latest_block overflow_cc_challenge = finished_sub_slots_at_ip[-1].challenge_chain.get_hash() overflow_rc_challenge = finished_sub_slots_at_ip[-1].reward_chain.get_hash() additions = None removals = None if transaction_data_included: transaction_data = None if transaction_data is not None and not transaction_data_included: additions = transaction_data.additions() removals = transaction_data.removals() sub_slots_finished += 1 log.info( f"Sub slot finished. blocks included: {blocks_added_this_sub_slot} blocks_per_slot: " f"{(len(block_list) - initial_block_list_len)/sub_slots_finished}" ) blocks_added_this_sub_slot = 0 # Sub slot ended, overflows are in next sub slot # Handle overflows: No overflows on new epoch if new_sub_slot_iters is None and num_empty_slots_added >= skip_slots and new_difficulty is None: for signage_point_index in range( constants.NUM_SPS_SUB_SLOT - constants.NUM_SP_INTERVALS_EXTRA, constants.NUM_SPS_SUB_SLOT, ): # note that we are passing in the finished slots which include the last slot signage_point = get_signage_point( constants, BlockCache(blocks), latest_block_eos, sub_slot_start_total_iters, uint8(signage_point_index), finished_sub_slots_eos, sub_slot_iters, normalized_to_identity_cc_sp, ) if signage_point_index == 0: cc_sp_output_hash = slot_cc_challenge else: assert signage_point is not None assert signage_point.cc_vdf is not None cc_sp_output_hash = signage_point.cc_vdf.output.get_hash() # If did not reach the target slots to skip, don't make any proofs for this sub-slot qualified_proofs = self.get_pospaces_for_challenge( constants, slot_cc_challenge, cc_sp_output_hash, seed, difficulty, sub_slot_iters, ) for required_iters, proof_of_space in sorted(qualified_proofs, key=lambda t: t[0]): if blocks_added_this_sub_slot == constants.MAX_SUB_SLOT_BLOCKS: break assert start_timestamp is not None if proof_of_space.pool_contract_puzzle_hash is not None: if pool_reward_puzzle_hash is not None: # The caller wants to be paid to a specific address, but this PoSpace is tied to an # address, so continue until a proof of space tied to a pk is found continue pool_target = PoolTarget(proof_of_space.pool_contract_puzzle_hash, uint32(0)) else: if pool_reward_puzzle_hash is not None: pool_target = PoolTarget(pool_reward_puzzle_hash, uint32(0)) else: pool_target = PoolTarget(self.pool_ph, uint32(0)) if transaction_data is not None: if previous_generator is not None: block_generator = best_solution_generator_from_template( previous_generator, transaction_data ) else: block_generator = simple_solution_generator(transaction_data) aggregate_signature = transaction_data.aggregated_signature else: block_generator = None aggregate_signature = G2Element() full_block, block_record = get_full_block_and_block_record( constants, blocks, sub_slot_start_total_iters, uint8(signage_point_index), proof_of_space, slot_cc_challenge, slot_rc_challenge, farmer_reward_puzzle_hash, pool_target, start_timestamp, start_height, time_per_block, block_generator, aggregate_signature, additions, removals, height_to_hash, difficulty, required_iters, sub_slot_iters, self.get_plot_signature, self.get_pool_key_signature, finished_sub_slots_at_ip, signage_point, latest_block, seed, overflow_cc_challenge=overflow_cc_challenge, overflow_rc_challenge=overflow_rc_challenge, normalized_to_identity_cc_ip=normalized_to_identity_cc_ip, current_time=current_time, ) if block_record.is_transaction_block: transaction_data_included = True elif guarantee_transaction_block: continue if pending_ses: pending_ses = False block_list.append(full_block) if full_block.transactions_generator is not None: compressor_arg = detect_potential_template_generator( full_block.height, full_block.transactions_generator ) if compressor_arg is not None: previous_generator = compressor_arg blocks_added_this_sub_slot += 1 log.info(f"Created block {block_record.height } ov=True, iters " f"{block_record.total_iters}") num_blocks -= 1 if num_blocks == 0: return block_list blocks[full_block.header_hash] = block_record height_to_hash[uint32(full_block.height)] = full_block.header_hash latest_block = blocks[full_block.header_hash] finished_sub_slots_at_ip = [] finished_sub_slots_at_sp = finished_sub_slots_eos.copy() same_slot_as_last = False sub_slot_start_total_iters = uint128(sub_slot_start_total_iters + sub_slot_iters) if num_blocks < prev_num_of_blocks: num_empty_slots_added += 1 if new_sub_slot_iters is not None: assert new_difficulty is not None sub_slot_iters = new_sub_slot_iters difficulty = new_difficulty def create_genesis_block( self, constants: ConsensusConstants, seed: bytes32 = b"", timestamp: Optional[uint64] = None, force_overflow: bool = False, skip_slots: int = 0, ) -> FullBlock: if timestamp is None: timestamp = uint64(int(time.time())) finished_sub_slots: List[EndOfSubSlotBundle] = [] unfinished_block: Optional[UnfinishedBlock] = None ip_iters: uint64 = uint64(0) sub_slot_total_iters: uint128 = uint128(0) # Keep trying until we get a good proof of space that also passes sp filter while True: cc_challenge, rc_challenge = get_challenges(constants, {}, finished_sub_slots, None) for signage_point_index in range(0, constants.NUM_SPS_SUB_SLOT): signage_point: SignagePoint = get_signage_point( constants, BlockCache({}, {}), None, sub_slot_total_iters, uint8(signage_point_index), finished_sub_slots, constants.SUB_SLOT_ITERS_STARTING, ) if signage_point_index == 0: cc_sp_output_hash: bytes32 = cc_challenge else: assert signage_point is not None assert signage_point.cc_vdf is not None cc_sp_output_hash = signage_point.cc_vdf.output.get_hash() # If did not reach the target slots to skip, don't make any proofs for this sub-slot qualified_proofs: List[Tuple[uint64, ProofOfSpace]] = self.get_pospaces_for_challenge( constants, cc_challenge, cc_sp_output_hash, seed, constants.DIFFICULTY_STARTING, constants.SUB_SLOT_ITERS_STARTING, ) # Try each of the proofs of space for required_iters, proof_of_space in qualified_proofs: sp_iters: uint64 = calculate_sp_iters( constants, uint64(constants.SUB_SLOT_ITERS_STARTING), uint8(signage_point_index), ) ip_iters = calculate_ip_iters( constants, uint64(constants.SUB_SLOT_ITERS_STARTING), uint8(signage_point_index), required_iters, ) is_overflow = is_overflow_block(constants, uint8(signage_point_index)) if force_overflow and not is_overflow: continue if len(finished_sub_slots) < skip_slots: continue unfinished_block = create_unfinished_block( constants, sub_slot_total_iters, constants.SUB_SLOT_ITERS_STARTING, uint8(signage_point_index), sp_iters, ip_iters, proof_of_space, cc_challenge, constants.GENESIS_PRE_FARM_FARMER_PUZZLE_HASH, PoolTarget(constants.GENESIS_PRE_FARM_POOL_PUZZLE_HASH, uint32(0)), self.get_plot_signature, self.get_pool_key_signature, signage_point, timestamp, BlockCache({}), seed=seed, finished_sub_slots_input=finished_sub_slots, ) assert unfinished_block is not None if not is_overflow: cc_ip_vdf, cc_ip_proof = get_vdf_info_and_proof( constants, ClassgroupElement.get_default_element(), cc_challenge, ip_iters, ) cc_ip_vdf = replace(cc_ip_vdf, number_of_iterations=ip_iters) rc_ip_vdf, rc_ip_proof = get_vdf_info_and_proof( constants, ClassgroupElement.get_default_element(), rc_challenge, ip_iters, ) assert unfinished_block is not None total_iters_sp = uint128(sub_slot_total_iters + sp_iters) return unfinished_block_to_full_block( unfinished_block, cc_ip_vdf, cc_ip_proof, rc_ip_vdf, rc_ip_proof, None, None, finished_sub_slots, None, BlockCache({}), total_iters_sp, constants.DIFFICULTY_STARTING, ) if signage_point_index == constants.NUM_SPS_SUB_SLOT - constants.NUM_SP_INTERVALS_EXTRA - 1: # Finish the end of sub-slot and try again next sub-slot cc_vdf, cc_proof = get_vdf_info_and_proof( constants, ClassgroupElement.get_default_element(), cc_challenge, constants.SUB_SLOT_ITERS_STARTING, ) rc_vdf, rc_proof = get_vdf_info_and_proof( constants, ClassgroupElement.get_default_element(), rc_challenge, constants.SUB_SLOT_ITERS_STARTING, ) cc_slot = ChallengeChainSubSlot(cc_vdf, None, None, None, None) finished_sub_slots.append( EndOfSubSlotBundle( cc_slot, None, RewardChainSubSlot( rc_vdf, cc_slot.get_hash(), None, uint8(constants.MIN_BLOCKS_PER_CHALLENGE_BLOCK), ), SubSlotProofs(cc_proof, None, rc_proof), ) ) if unfinished_block is not None: cc_ip_vdf, cc_ip_proof = get_vdf_info_and_proof( constants, ClassgroupElement.get_default_element(), finished_sub_slots[-1].challenge_chain.get_hash(), ip_iters, ) rc_ip_vdf, rc_ip_proof = get_vdf_info_and_proof( constants, ClassgroupElement.get_default_element(), finished_sub_slots[-1].reward_chain.get_hash(), ip_iters, ) total_iters_sp = uint128( sub_slot_total_iters + calculate_sp_iters( self.constants, self.constants.SUB_SLOT_ITERS_STARTING, unfinished_block.reward_chain_block.signage_point_index, ) ) return unfinished_block_to_full_block( unfinished_block, cc_ip_vdf, cc_ip_proof, rc_ip_vdf, rc_ip_proof, None, None, finished_sub_slots, None, BlockCache({}), total_iters_sp, constants.DIFFICULTY_STARTING, ) sub_slot_total_iters = uint128(sub_slot_total_iters + constants.SUB_SLOT_ITERS_STARTING) def get_pospaces_for_challenge( self, constants: ConsensusConstants, challenge_hash: bytes32, signage_point: bytes32, seed: bytes, difficulty: uint64, sub_slot_iters: uint64, ) -> List[Tuple[uint64, ProofOfSpace]]: found_proofs: List[Tuple[uint64, ProofOfSpace]] = [] plots: List[PlotInfo] = [ plot_info for _, plot_info in sorted(list(self.plots.items()), key=lambda x: str(x[0])) ] random.seed(seed) for plot_info in plots: plot_id = plot_info.prover.get_id() if ProofOfSpace.passes_plot_filter(constants, plot_id, challenge_hash, signage_point): new_challenge: bytes32 = ProofOfSpace.calculate_pos_challenge(plot_id, challenge_hash, signage_point) qualities = plot_info.prover.get_qualities_for_challenge(new_challenge) for proof_index, quality_str in enumerate(qualities): required_iters = calculate_iterations_quality( constants.DIFFICULTY_CONSTANT_FACTOR, quality_str, plot_info.prover.get_size(), difficulty, signage_point, ) if required_iters < calculate_sp_interval_iters(constants, sub_slot_iters): proof_xs: bytes = plot_info.prover.get_full_proof(new_challenge, proof_index) # Look up local_sk from plot to save locked memory ( pool_public_key_or_puzzle_hash, farmer_public_key, local_master_sk, ) = parse_plot_info(plot_info.prover.get_memo()) local_sk = master_sk_to_local_sk(local_master_sk) plot_pk = ProofOfSpace.generate_plot_public_key( local_sk.get_g1(), farmer_public_key, ) proof_of_space: ProofOfSpace = ProofOfSpace( new_challenge, plot_info.pool_public_key, plot_info.pool_contract_puzzle_hash, plot_pk, plot_info.prover.get_size(), proof_xs, ) found_proofs.append((required_iters, proof_of_space)) random_sample = found_proofs if len(found_proofs) >= 1: if random.random() < 0.1: # Removes some proofs of space to create "random" chains, based on the seed random_sample = random.sample(found_proofs, len(found_proofs) - 1) return random_sample def get_signage_point( constants: ConsensusConstants, blocks: BlockchainInterface, latest_block: Optional[BlockRecord], sub_slot_start_total_iters: uint128, signage_point_index: uint8, finished_sub_slots: List[EndOfSubSlotBundle], sub_slot_iters: uint64, normalized_to_identity_cc_sp: bool = False, ) -> SignagePoint: if signage_point_index == 0: return SignagePoint(None, None, None, None) sp_iters = calculate_sp_iters(constants, sub_slot_iters, signage_point_index) overflow = is_overflow_block(constants, signage_point_index) sp_total_iters = uint128( sub_slot_start_total_iters + calculate_sp_iters(constants, sub_slot_iters, signage_point_index) ) ( cc_vdf_challenge, rc_vdf_challenge, cc_vdf_input, rc_vdf_input, cc_vdf_iters, rc_vdf_iters, ) = get_signage_point_vdf_info( constants, finished_sub_slots, overflow, latest_block, blocks, sp_total_iters, sp_iters, ) cc_sp_vdf, cc_sp_proof = get_vdf_info_and_proof( constants, cc_vdf_input, cc_vdf_challenge, cc_vdf_iters, ) rc_sp_vdf, rc_sp_proof = get_vdf_info_and_proof( constants, rc_vdf_input, rc_vdf_challenge, rc_vdf_iters, ) cc_sp_vdf = replace(cc_sp_vdf, number_of_iterations=sp_iters) if normalized_to_identity_cc_sp: _, cc_sp_proof = get_vdf_info_and_proof( constants, ClassgroupElement.get_default_element(), cc_sp_vdf.challenge, sp_iters, True, ) return SignagePoint(cc_sp_vdf, cc_sp_proof, rc_sp_vdf, rc_sp_proof) def finish_block( constants: ConsensusConstants, blocks: Dict[bytes32, BlockRecord], height_to_hash: Dict[uint32, bytes32], finished_sub_slots: List[EndOfSubSlotBundle], sub_slot_start_total_iters: uint128, signage_point_index: uint8, unfinished_block: UnfinishedBlock, required_iters: uint64, ip_iters: uint64, slot_cc_challenge: bytes32, slot_rc_challenge: bytes32, latest_block: BlockRecord, sub_slot_iters: uint64, difficulty: uint64, normalized_to_identity_cc_ip: bool = False, ) -> Tuple[FullBlock, BlockRecord]: is_overflow = is_overflow_block(constants, signage_point_index) cc_vdf_challenge = slot_cc_challenge if len(finished_sub_slots) == 0: new_ip_iters = unfinished_block.total_iters - latest_block.total_iters cc_vdf_input = latest_block.challenge_vdf_output rc_vdf_challenge = latest_block.reward_infusion_new_challenge else: new_ip_iters = ip_iters cc_vdf_input = ClassgroupElement.get_default_element() rc_vdf_challenge = slot_rc_challenge cc_ip_vdf, cc_ip_proof = get_vdf_info_and_proof( constants, cc_vdf_input, cc_vdf_challenge, new_ip_iters, ) cc_ip_vdf = replace(cc_ip_vdf, number_of_iterations=ip_iters) if normalized_to_identity_cc_ip: _, cc_ip_proof = get_vdf_info_and_proof( constants, ClassgroupElement.get_default_element(), cc_ip_vdf.challenge, ip_iters, True, ) deficit = calculate_deficit( constants, uint32(latest_block.height + 1), latest_block, is_overflow, len(finished_sub_slots), ) icc_ip_vdf, icc_ip_proof = get_icc( constants, unfinished_block.total_iters, finished_sub_slots, latest_block, blocks, uint128(sub_slot_start_total_iters + sub_slot_iters) if is_overflow else sub_slot_start_total_iters, deficit, ) rc_ip_vdf, rc_ip_proof = get_vdf_info_and_proof( constants, ClassgroupElement.get_default_element(), rc_vdf_challenge, new_ip_iters, ) assert unfinished_block is not None sp_total_iters = uint128( sub_slot_start_total_iters + calculate_sp_iters(constants, sub_slot_iters, signage_point_index) ) full_block: FullBlock = unfinished_block_to_full_block( unfinished_block, cc_ip_vdf, cc_ip_proof, rc_ip_vdf, rc_ip_proof, icc_ip_vdf, icc_ip_proof, finished_sub_slots, latest_block, BlockCache(blocks), sp_total_iters, difficulty, ) block_record = block_to_block_record(constants, BlockCache(blocks), required_iters, full_block, None) return full_block, block_record def get_challenges( constants: ConsensusConstants, blocks: Dict[uint32, BlockRecord], finished_sub_slots: List[EndOfSubSlotBundle], prev_header_hash: Optional[bytes32], ) -> Tuple[bytes32, bytes32]: if len(finished_sub_slots) == 0: if prev_header_hash is None: return constants.GENESIS_CHALLENGE, constants.GENESIS_CHALLENGE curr: BlockRecord = blocks[prev_header_hash] while not curr.first_in_sub_slot: curr = blocks[curr.prev_hash] assert curr.finished_challenge_slot_hashes is not None assert curr.finished_reward_slot_hashes is not None cc_challenge = curr.finished_challenge_slot_hashes[-1] rc_challenge = curr.finished_reward_slot_hashes[-1] else: cc_challenge = finished_sub_slots[-1].challenge_chain.get_hash() rc_challenge = finished_sub_slots[-1].reward_chain.get_hash() return cc_challenge, rc_challenge def get_plot_dir() -> Path: cache_path = Path(os.path.expanduser(os.getenv("CHIA_ROOT", "~/.chia/"))) / "test-plots" mkdir(cache_path) return cache_path def load_block_list( block_list: List[FullBlock], constants: ConsensusConstants ) -> Tuple[Dict[uint32, bytes32], uint64, Dict[uint32, BlockRecord]]: difficulty = 0 height_to_hash: Dict[uint32, bytes32] = {} blocks: Dict[uint32, BlockRecord] = {} for full_block in block_list: if full_block.height == 0: difficulty = uint64(constants.DIFFICULTY_STARTING) else: difficulty = full_block.weight - block_list[full_block.height - 1].weight if full_block.reward_chain_block.signage_point_index == 0: challenge = full_block.reward_chain_block.pos_ss_cc_challenge_hash sp_hash = challenge else: assert full_block.reward_chain_block.challenge_chain_sp_vdf is not None challenge = full_block.reward_chain_block.challenge_chain_sp_vdf.challenge sp_hash = full_block.reward_chain_block.challenge_chain_sp_vdf.output.get_hash() quality_str = full_block.reward_chain_block.proof_of_space.verify_and_get_quality_string( constants, challenge, sp_hash ) required_iters: uint64 = calculate_iterations_quality( constants.DIFFICULTY_CONSTANT_FACTOR, quality_str, full_block.reward_chain_block.proof_of_space.size, uint64(difficulty), sp_hash, ) blocks[full_block.header_hash] = block_to_block_record( constants, BlockCache(blocks), required_iters, full_block, None, ) height_to_hash[uint32(full_block.height)] = full_block.header_hash return height_to_hash, uint64(difficulty), blocks def get_icc( constants: ConsensusConstants, vdf_end_total_iters: uint128, finished_sub_slots: List[EndOfSubSlotBundle], latest_block: BlockRecord, blocks: Dict[bytes32, BlockRecord], sub_slot_start_total_iters: uint128, deficit: uint8, ) -> Tuple[Optional[VDFInfo], Optional[VDFProof]]: if len(finished_sub_slots) == 0: prev_deficit = latest_block.deficit else: prev_deficit = finished_sub_slots[-1].reward_chain.deficit if deficit == prev_deficit == constants.MIN_BLOCKS_PER_CHALLENGE_BLOCK: # new slot / overflow sb to new slot / overflow sb return None, None if deficit == (prev_deficit - 1) == (constants.MIN_BLOCKS_PER_CHALLENGE_BLOCK - 1): # new slot / overflow sb to challenge sb return None, None if len(finished_sub_slots) != 0: last_ss = finished_sub_slots[-1] assert last_ss.infused_challenge_chain is not None assert finished_sub_slots[-1].reward_chain.deficit <= (constants.MIN_BLOCKS_PER_CHALLENGE_BLOCK - 1) return get_vdf_info_and_proof( constants, ClassgroupElement.get_default_element(), last_ss.infused_challenge_chain.get_hash(), uint64(vdf_end_total_iters - sub_slot_start_total_iters), ) curr = latest_block # curr deficit is 0, 1, 2, 3, or 4 while not curr.is_challenge_block(constants) and not curr.first_in_sub_slot: curr = blocks[curr.prev_hash] icc_iters = uint64(vdf_end_total_iters - latest_block.total_iters) if latest_block.is_challenge_block(constants): icc_input: Optional[ClassgroupElement] = ClassgroupElement.get_default_element() else: icc_input = latest_block.infused_challenge_vdf_output assert icc_input is not None if curr.is_challenge_block(constants): # Deficit 4 icc_challenge_hash = curr.challenge_block_info_hash else: assert curr.finished_infused_challenge_slot_hashes is not None # First block in sub slot has deficit 0,1,2 or 3 icc_challenge_hash = curr.finished_infused_challenge_slot_hashes[-1] return get_vdf_info_and_proof( constants, icc_input, icc_challenge_hash, icc_iters, ) def get_full_block_and_block_record( constants: ConsensusConstants, blocks: Dict[uint32, BlockRecord], sub_slot_start_total_iters: uint128, signage_point_index: uint8, proof_of_space: ProofOfSpace, slot_cc_challenge: bytes32, slot_rc_challenge: bytes32, farmer_reward_puzzle_hash: bytes32, pool_target: PoolTarget, start_timestamp: uint64, start_height: uint32, time_per_block: float, block_generator: Optional[BlockGenerator], aggregate_signature: G2Element, additions: Optional[List[Coin]], removals: Optional[List[Coin]], height_to_hash: Dict[uint32, bytes32], difficulty: uint64, required_iters: uint64, sub_slot_iters: uint64, get_plot_signature: Callable[[bytes32, G1Element], G2Element], get_pool_signature: Callable[[PoolTarget, Optional[G1Element]], Optional[G2Element]], finished_sub_slots: List[EndOfSubSlotBundle], signage_point: SignagePoint, prev_block: BlockRecord, seed: bytes = b"", overflow_cc_challenge: bytes32 = None, overflow_rc_challenge: bytes32 = None, normalized_to_identity_cc_ip: bool = False, current_time: bool = False, ) -> Tuple[FullBlock, BlockRecord]: if current_time is True: if prev_block.timestamp is not None: timestamp = uint64(max(int(time.time()), prev_block.timestamp + int(time_per_block))) else: timestamp = uint64(int(time.time())) else: timestamp = uint64(start_timestamp + int((prev_block.height + 1 - start_height) * time_per_block)) sp_iters = calculate_sp_iters(constants, sub_slot_iters, signage_point_index) ip_iters = calculate_ip_iters(constants, sub_slot_iters, signage_point_index, required_iters) unfinished_block = create_unfinished_block( constants, sub_slot_start_total_iters, sub_slot_iters, signage_point_index, sp_iters, ip_iters, proof_of_space, slot_cc_challenge, farmer_reward_puzzle_hash, pool_target, get_plot_signature, get_pool_signature, signage_point, timestamp, BlockCache(blocks), seed, block_generator, aggregate_signature, additions, removals, prev_block, finished_sub_slots, ) if (overflow_cc_challenge is not None) and (overflow_rc_challenge is not None): slot_cc_challenge = overflow_cc_challenge slot_rc_challenge = overflow_rc_challenge full_block, block_record = finish_block( constants, blocks, height_to_hash, finished_sub_slots, sub_slot_start_total_iters, signage_point_index, unfinished_block, required_iters, ip_iters, slot_cc_challenge, slot_rc_challenge, prev_block, sub_slot_iters, difficulty, normalized_to_identity_cc_ip, ) return full_block, block_record