import asyncio import dataclasses import logging import random import time import traceback from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union import aiosqlite from blspy import AugSchemeMPL import chia.server.ws_connection as ws # lgtm [py/import-and-import-from] from chia.consensus.block_creation import unfinished_block_to_full_block from chia.consensus.block_record import BlockRecord from chia.consensus.blockchain import Blockchain, ReceiveBlockResult from chia.consensus.constants import ConsensusConstants from chia.consensus.difficulty_adjustment import get_next_sub_slot_iters_and_difficulty from chia.consensus.make_sub_epoch_summary import next_sub_epoch_summary from chia.consensus.multiprocess_validation import PreValidationResult from chia.consensus.pot_iterations import calculate_sp_iters from chia.full_node.block_store import BlockStore from chia.full_node.bundle_tools import detect_potential_template_generator from chia.full_node.coin_store import CoinStore from chia.full_node.full_node_store import FullNodeStore from chia.full_node.mempool_manager import MempoolManager from chia.full_node.signage_point import SignagePoint from chia.full_node.sync_store import SyncStore from chia.full_node.weight_proof import WeightProofHandler from chia.protocols import farmer_protocol, full_node_protocol, timelord_protocol, wallet_protocol from chia.protocols.full_node_protocol import ( RejectBlocks, RequestBlocks, RespondBlock, RespondBlocks, RespondSignagePoint, ) from chia.protocols.protocol_message_types import ProtocolMessageTypes from chia.server.node_discovery import FullNodePeers from chia.server.outbound_message import Message, NodeType, make_msg from chia.server.server import ChiaServer from chia.types.blockchain_format.classgroup import ClassgroupElement from chia.types.blockchain_format.pool_target import PoolTarget from chia.types.blockchain_format.sized_bytes import bytes32 from chia.types.blockchain_format.sub_epoch_summary import SubEpochSummary from chia.types.blockchain_format.vdf import CompressibleVDFField, VDFInfo, VDFProof from chia.types.end_of_slot_bundle import EndOfSubSlotBundle from chia.types.full_block import FullBlock from chia.types.header_block import HeaderBlock from chia.types.mempool_inclusion_status import MempoolInclusionStatus from chia.types.spend_bundle import SpendBundle from chia.types.unfinished_block import UnfinishedBlock from chia.util.bech32m import encode_puzzle_hash from chia.util.db_wrapper import DBWrapper from chia.util.errors import ConsensusError, Err from chia.util.ints import uint8, uint32, uint64, uint128 from chia.util.path import mkdir, path_from_root from chia.util.safe_cancel_task import cancel_task_safe from chia.util.profiler import profile_task class FullNode: block_store: BlockStore full_node_store: FullNodeStore full_node_peers: Optional[FullNodePeers] sync_store: Any coin_store: CoinStore mempool_manager: MempoolManager connection: aiosqlite.Connection _sync_task: Optional[asyncio.Task] _init_weight_proof: Optional[asyncio.Task] = None blockchain: Blockchain config: Dict server: Any log: logging.Logger constants: ConsensusConstants _shut_down: bool root_path: Path state_changed_callback: Optional[Callable] timelord_lock: asyncio.Lock initialized: bool weight_proof_handler: Optional[WeightProofHandler] def __init__( self, config: Dict, root_path: Path, consensus_constants: ConsensusConstants, name: str = None, ): self.initialized = False self.root_path = root_path self.config = config self.server = None self._shut_down = False # Set to true to close all infinite loops self.constants = consensus_constants self.pow_creation: Dict[uint32, asyncio.Event] = {} self.state_changed_callback: Optional[Callable] = None self.full_node_peers = None self.sync_store = None self.signage_point_times = [time.time() for _ in range(self.constants.NUM_SPS_SUB_SLOT)] self.full_node_store = FullNodeStore(self.constants) if name: self.log = logging.getLogger(name) else: self.log = logging.getLogger(__name__) db_path_replaced: str = config["database_path"].replace("CHALLENGE", config["selected_network"]) self.db_path = path_from_root(root_path, db_path_replaced) mkdir(self.db_path.parent) def _set_state_changed_callback(self, callback: Callable): self.state_changed_callback = callback async def _start(self): self.timelord_lock = asyncio.Lock() self.compact_vdf_sem = asyncio.Semaphore(4) self.new_peak_sem = asyncio.Semaphore(8) # create the store (db) and full node instance self.connection = await aiosqlite.connect(self.db_path) self.db_wrapper = DBWrapper(self.connection) self.block_store = await BlockStore.create(self.db_wrapper) self.sync_store = await SyncStore.create() self.coin_store = await CoinStore.create(self.db_wrapper) self.log.info("Initializing blockchain from disk") start_time = time.time() self.blockchain = await Blockchain.create(self.coin_store, self.block_store, self.constants) self.mempool_manager = MempoolManager(self.coin_store, self.constants) self.weight_proof_handler = None self._init_weight_proof = asyncio.create_task(self.initialize_weight_proof()) if self.config.get("enable_profiler", False): asyncio.create_task(profile_task(self.root_path, "node", self.log)) self._sync_task = None self._segment_task = None time_taken = time.time() - start_time if self.blockchain.get_peak() is None: self.log.info(f"Initialized with empty blockchain time taken: {int(time_taken)}s") else: self.log.info( f"Blockchain initialized to peak {self.blockchain.get_peak().header_hash} height" f" {self.blockchain.get_peak().height}, " f"time taken: {int(time_taken)}s" ) pending_tx = await self.mempool_manager.new_peak(self.blockchain.get_peak()) assert len(pending_tx) == 0 # no pending transactions when starting up peak: Optional[BlockRecord] = self.blockchain.get_peak() self.uncompact_task = None if peak is not None: full_peak = await self.blockchain.get_full_peak() await self.peak_post_processing(full_peak, peak, max(peak.height - 1, 0), None) if self.config["send_uncompact_interval"] != 0: sanitize_weight_proof_only = False if "sanitize_weight_proof_only" in self.config: sanitize_weight_proof_only = self.config["sanitize_weight_proof_only"] assert self.config["target_uncompact_proofs"] != 0 self.uncompact_task = asyncio.create_task( self.broadcast_uncompact_blocks( self.config["send_uncompact_interval"], self.config["target_uncompact_proofs"], sanitize_weight_proof_only, ) ) self.initialized = True if self.full_node_peers is not None: asyncio.create_task(self.full_node_peers.start()) async def initialize_weight_proof(self): self.weight_proof_handler = WeightProofHandler(self.constants, self.blockchain) peak = self.blockchain.get_peak() if peak is not None: await self.weight_proof_handler.create_sub_epoch_segments() def set_server(self, server: ChiaServer): self.server = server dns_servers = [] if "dns_servers" in self.config: dns_servers = self.config["dns_servers"] elif self.config["port"] == 8444: # If `dns_servers` misses from the `config`, hardcode it if we're running mainnet. dns_servers.append("dns-introducer.chia.net") try: self.full_node_peers = FullNodePeers( self.server, self.root_path, self.config["target_peer_count"] - self.config["target_outbound_peer_count"], self.config["target_outbound_peer_count"], self.config["peer_db_path"], self.config["introducer_peer"], dns_servers, self.config["peer_connect_interval"], self.log, ) except Exception as e: error_stack = traceback.format_exc() self.log.error(f"Exception: {e}") self.log.error(f"Exception in peer discovery: {e}") self.log.error(f"Exception Stack: {error_stack}") def _state_changed(self, change: str): if self.state_changed_callback is not None: self.state_changed_callback(change) async def short_sync_batch(self, peer: ws.WSChiaConnection, start_height: uint32, target_height: uint32) -> bool: """ Tries to sync to a chain which is not too far in the future, by downloading batches of blocks. If the first block that we download is not connected to our chain, we return False and do an expensive long sync instead. Long sync is not preferred because it requires downloading and validating a weight proof. Args: peer: peer to sync from start_height: height that we should start downloading at. (Our peak is higher) target_height: target to sync to Returns: False if the fork point was not found, and we need to do a long sync. True otherwise. """ # Don't trigger multiple batch syncs to the same peer if ( peer.peer_node_id in self.sync_store.backtrack_syncing and self.sync_store.backtrack_syncing[peer.peer_node_id] > 0 ): return True # Don't batch sync, we are already in progress of a backtrack sync if peer.peer_node_id in self.sync_store.batch_syncing: return True # Don't trigger a long sync self.sync_store.batch_syncing.add(peer.peer_node_id) self.log.info(f"Starting batch short sync from {start_height} to height {target_height}") if start_height > 0: first = await peer.request_block(full_node_protocol.RequestBlock(uint32(start_height), False)) if first is None or not isinstance(first, full_node_protocol.RespondBlock): self.sync_store.batch_syncing.remove(peer.peer_node_id) raise ValueError(f"Error short batch syncing, could not fetch block at height {start_height}") if not self.blockchain.contains_block(first.block.prev_header_hash): self.log.info("Batch syncing stopped, this is a deep chain") self.sync_store.batch_syncing.remove(peer.peer_node_id) # First sb not connected to our blockchain, do a long sync instead return False batch_size = self.constants.MAX_BLOCK_COUNT_PER_REQUESTS if self._segment_task is not None and (not self._segment_task.done()): try: self._segment_task.cancel() except Exception as e: self.log.warning(f"failed to cancel segment task {e}") self._segment_task = None try: for height in range(start_height, target_height, batch_size): end_height = min(target_height, height + batch_size) request = RequestBlocks(uint32(height), uint32(end_height), True) response = await peer.request_blocks(request) if not response: raise ValueError(f"Error short batch syncing, invalid/no response for {height}-{end_height}") async with self.blockchain.lock: success, advanced_peak, fork_height = await self.receive_block_batch(response.blocks, peer, None) if not success: raise ValueError(f"Error short batch syncing, failed to validate blocks {height}-{end_height}") if advanced_peak: peak = self.blockchain.get_peak() peak_fb: Optional[FullBlock] = await self.blockchain.get_full_peak() assert peak is not None and peak_fb is not None and fork_height is not None await self.peak_post_processing(peak_fb, peak, fork_height, peer) self.log.info(f"Added blocks {height}-{end_height}") except Exception: self.sync_store.batch_syncing.remove(peer.peer_node_id) raise self.sync_store.batch_syncing.remove(peer.peer_node_id) return True async def short_sync_backtrack( self, peer: ws.WSChiaConnection, peak_height: uint32, target_height: uint32, target_unf_hash: bytes32 ): """ Performs a backtrack sync, where blocks are downloaded one at a time from newest to oldest. If we do not find the fork point 5 deeper than our peak, we return False and do a long sync instead. Args: peer: peer to sync from peak_height: height of our peak target_height: target height target_unf_hash: partial hash of the unfinished block of the target Returns: True iff we found the fork point, and we do not need to long sync. """ try: if peer.peer_node_id not in self.sync_store.backtrack_syncing: self.sync_store.backtrack_syncing[peer.peer_node_id] = 0 self.sync_store.backtrack_syncing[peer.peer_node_id] += 1 unfinished_block: Optional[UnfinishedBlock] = self.full_node_store.get_unfinished_block(target_unf_hash) curr_height: int = target_height found_fork_point = False responses = [] while curr_height > peak_height - 5: # If we already have the unfinished block, don't fetch the transactions. In the normal case, we will # already have the unfinished block, from when it was broadcast, so we just need to download the header, # but not the transactions fetch_tx: bool = unfinished_block is None or curr_height != target_height curr = await peer.request_block(full_node_protocol.RequestBlock(uint32(curr_height), fetch_tx)) if curr is None: raise ValueError(f"Failed to fetch block {curr_height} from {peer.get_peer_info()}, timed out") if curr is None or not isinstance(curr, full_node_protocol.RespondBlock): raise ValueError( f"Failed to fetch block {curr_height} from {peer.get_peer_info()}, wrong type {type(curr)}" ) responses.append(curr) if self.blockchain.contains_block(curr.block.prev_header_hash) or curr_height == 0: found_fork_point = True break curr_height -= 1 if found_fork_point: for response in reversed(responses): await self.respond_block(response, peer) except Exception as e: self.sync_store.backtrack_syncing[peer.peer_node_id] -= 1 raise e self.sync_store.backtrack_syncing[peer.peer_node_id] -= 1 return found_fork_point async def new_peak(self, request: full_node_protocol.NewPeak, peer: ws.WSChiaConnection): """ We have received a notification of a new peak from a peer. This happens either when we have just connected, or when the peer has updated their peak. Args: request: information about the new peak peer: peer that sent the message """ # Store this peak/peer combination in case we want to sync to it, and to keep track of peers self.sync_store.peer_has_block(request.header_hash, peer.peer_node_id, request.weight, request.height, True) if self.blockchain.contains_block(request.header_hash): return None # Not interested in less heavy peaks peak: Optional[BlockRecord] = self.blockchain.get_peak() curr_peak_height = uint32(0) if peak is None else peak.height if peak is not None and peak.weight > request.weight: return None if self.sync_store.get_sync_mode(): # If peer connects while we are syncing, check if they have the block we are syncing towards peak_sync_hash = self.sync_store.get_sync_target_hash() peak_sync_height = self.sync_store.get_sync_target_height() if peak_sync_hash is not None and request.header_hash != peak_sync_hash and peak_sync_height is not None: peak_peers: Set[bytes32] = self.sync_store.get_peers_that_have_peak([peak_sync_hash]) # Don't ask if we already know this peer has the peak if peer.peer_node_id not in peak_peers: target_peak_response: Optional[RespondBlock] = await peer.request_block( full_node_protocol.RequestBlock(uint32(peak_sync_height), False), timeout=10 ) if target_peak_response is not None and isinstance(target_peak_response, RespondBlock): self.sync_store.peer_has_block( peak_sync_hash, peer.peer_node_id, target_peak_response.block.weight, peak_sync_height, False, ) else: if request.height <= curr_peak_height + self.config["short_sync_blocks_behind_threshold"]: # This is the normal case of receiving the next block if await self.short_sync_backtrack( peer, curr_peak_height, request.height, request.unfinished_reward_block_hash ): return None if request.height < self.constants.WEIGHT_PROOF_RECENT_BLOCKS: # This is the case of syncing up more than a few blocks, at the start of the chain # TODO(almog): fix weight proofs so they work at the beginning as well self.log.debug("Doing batch sync, no backup") await self.short_sync_batch(peer, uint32(0), request.height) return None if request.height < curr_peak_height + self.config["sync_blocks_behind_threshold"]: # This case of being behind but not by so much if await self.short_sync_batch(peer, uint32(max(curr_peak_height - 6, 0)), request.height): return None # This is the either the case where we were not able to sync successfully (for example, due to the fork # point being in the past), or we are very far behind. Performs a long sync. self._sync_task = asyncio.create_task(self._sync()) async def send_peak_to_timelords( self, peak_block: Optional[FullBlock] = None, peer: Optional[ws.WSChiaConnection] = None ): """ Sends current peak to timelords """ if peak_block is None: peak_block = await self.blockchain.get_full_peak() if peak_block is not None: peak = self.blockchain.block_record(peak_block.header_hash) difficulty = self.blockchain.get_next_difficulty(peak.header_hash, False) ses: Optional[SubEpochSummary] = next_sub_epoch_summary( self.constants, self.blockchain, peak.required_iters, peak_block, True, ) recent_rc = self.blockchain.get_recent_reward_challenges() curr = peak while not curr.is_challenge_block(self.constants) and not curr.first_in_sub_slot: curr = self.blockchain.block_record(curr.prev_hash) if curr.is_challenge_block(self.constants): last_csb_or_eos = curr.total_iters else: last_csb_or_eos = curr.ip_sub_slot_total_iters(self.constants) curr = peak passed_ses_height_but_not_yet_included = True while (curr.height % self.constants.SUB_EPOCH_BLOCKS) != 0: if curr.sub_epoch_summary_included: passed_ses_height_but_not_yet_included = False curr = self.blockchain.block_record(curr.prev_hash) if curr.sub_epoch_summary_included or curr.height == 0: passed_ses_height_but_not_yet_included = False timelord_new_peak: timelord_protocol.NewPeakTimelord = timelord_protocol.NewPeakTimelord( peak_block.reward_chain_block, difficulty, peak.deficit, peak.sub_slot_iters, ses, recent_rc, last_csb_or_eos, passed_ses_height_but_not_yet_included, ) msg = make_msg(ProtocolMessageTypes.new_peak_timelord, timelord_new_peak) if peer is None: await self.server.send_to_all([msg], NodeType.TIMELORD) else: await self.server.send_to_specific([msg], peer.peer_node_id) async def synced(self) -> bool: curr: Optional[BlockRecord] = self.blockchain.get_peak() if curr is None: return False while curr is not None and not curr.is_transaction_block: curr = self.blockchain.try_block_record(curr.prev_hash) now = time.time() if ( curr is None or curr.timestamp is None or curr.timestamp < uint64(int(now - 60 * 7)) or self.sync_store.get_sync_mode() ): return False else: return True async def on_connect(self, connection: ws.WSChiaConnection): """ Whenever we connect to another node / wallet, send them our current heads. Also send heads to farmers and challenges to timelords. """ self._state_changed("add_connection") self._state_changed("sync_mode") if self.full_node_peers is not None: asyncio.create_task(self.full_node_peers.on_connect(connection)) if self.initialized is False: return None if connection.connection_type is NodeType.FULL_NODE: # Send filter to node and request mempool items that are not in it (Only if we are currently synced) synced = await self.synced() peak_height = self.blockchain.get_peak_height() current_time = int(time.time()) if synced and peak_height is not None and current_time > self.constants.INITIAL_FREEZE_END_TIMESTAMP: my_filter = self.mempool_manager.get_filter() mempool_request = full_node_protocol.RequestMempoolTransactions(my_filter) msg = make_msg(ProtocolMessageTypes.request_mempool_transactions, mempool_request) await connection.send_message(msg) peak_full: Optional[FullBlock] = await self.blockchain.get_full_peak() if peak_full is not None: peak: BlockRecord = self.blockchain.block_record(peak_full.header_hash) if connection.connection_type is NodeType.FULL_NODE: request_node = full_node_protocol.NewPeak( peak.header_hash, peak.height, peak.weight, peak.height, peak_full.reward_chain_block.get_unfinished().get_hash(), ) await connection.send_message(make_msg(ProtocolMessageTypes.new_peak, request_node)) elif connection.connection_type is NodeType.WALLET: # If connected to a wallet, send the Peak request_wallet = wallet_protocol.NewPeakWallet( peak.header_hash, peak.height, peak.weight, peak.height, ) await connection.send_message(make_msg(ProtocolMessageTypes.new_peak_wallet, request_wallet)) elif connection.connection_type is NodeType.TIMELORD: await self.send_peak_to_timelords() def on_disconnect(self, connection: ws.WSChiaConnection): self.log.info(f"peer disconnected {connection.get_peer_info()}") self._state_changed("close_connection") self._state_changed("sync_mode") if self.sync_store is not None: self.sync_store.peer_disconnected(connection.peer_node_id) def _num_needed_peers(self) -> int: assert self.server is not None assert self.server.all_connections is not None diff = self.config["target_peer_count"] - len(self.server.all_connections) return diff if diff >= 0 else 0 def _close(self): self._shut_down = True if self._init_weight_proof is not None: self._init_weight_proof.cancel() if self.blockchain is not None: self.blockchain.shut_down() if self.mempool_manager is not None: self.mempool_manager.shut_down() if self.full_node_peers is not None: asyncio.create_task(self.full_node_peers.close()) if self.uncompact_task is not None: self.uncompact_task.cancel() async def _await_closed(self): cancel_task_safe(self._sync_task, self.log) for task_id, task in list(self.full_node_store.tx_fetch_tasks.items()): cancel_task_safe(task, self.log) await self.connection.close() if self._init_weight_proof is not None: await asyncio.wait([self._init_weight_proof]) async def _sync(self): """ Performs a full sync of the blockchain up to the peak. - Wait a few seconds for peers to send us their peaks - Select the heaviest peak, and request a weight proof from a peer with that peak - Validate the weight proof, and disconnect from the peer if invalid - Find the fork point to see where to start downloading blocks - Download blocks in batch (and in parallel) and verify them one at a time - Disconnect peers that provide invalid blocks or don't have the blocks """ if self.weight_proof_handler is None: return None # Ensure we are only syncing once and not double calling this method if self.sync_store.get_sync_mode(): return None if self.sync_store.get_long_sync(): self.log.debug("already in long sync") return None self.sync_store.set_long_sync(True) self.log.debug("long sync started") try: self.log.info("Starting to perform sync.") self.log.info("Waiting to receive peaks from peers.") # Wait until we have 3 peaks or up to a max of 30 seconds peaks = [] for i in range(300): peaks = [tup[0] for tup in self.sync_store.get_peak_of_each_peer().values()] if len(self.sync_store.get_peers_that_have_peak(peaks)) < 3: if self._shut_down: return None await asyncio.sleep(0.1) self.log.info(f"Collected a total of {len(peaks)} peaks.") self.sync_peers_handler = None # Based on responses from peers about the current peaks, see which peak is the heaviest # (similar to longest chain rule). target_peak = self.sync_store.get_heaviest_peak() if target_peak is None: raise RuntimeError("Not performing sync, no peaks collected") heaviest_peak_hash, heaviest_peak_height, heaviest_peak_weight = target_peak self.sync_store.set_peak_target(heaviest_peak_hash, heaviest_peak_height) self.log.info(f"Selected peak {heaviest_peak_height}, {heaviest_peak_hash}") # Check which peers are updated to this height peers = [] coroutines = [] for peer in self.server.all_connections.values(): if peer.connection_type == NodeType.FULL_NODE: peers.append(peer.peer_node_id) coroutines.append( peer.request_block( full_node_protocol.RequestBlock(uint32(heaviest_peak_height), True), timeout=10 ) ) for i, target_peak_response in enumerate(await asyncio.gather(*coroutines)): if target_peak_response is not None and isinstance(target_peak_response, RespondBlock): self.sync_store.peer_has_block( heaviest_peak_hash, peers[i], heaviest_peak_weight, heaviest_peak_height, False ) # TODO: disconnect from peer which gave us the heaviest_peak, if nobody has the peak peer_ids: Set[bytes32] = self.sync_store.get_peers_that_have_peak([heaviest_peak_hash]) peers_with_peak: List = [c for c in self.server.all_connections.values() if c.peer_node_id in peer_ids] # Request weight proof from a random peer self.log.info(f"Total of {len(peers_with_peak)} peers with peak {heaviest_peak_height}") weight_proof_peer = random.choice(peers_with_peak) self.log.info( f"Requesting weight proof from peer {weight_proof_peer.peer_host} up to height" f" {heaviest_peak_height}" ) if self.blockchain.get_peak() is not None and heaviest_peak_weight <= self.blockchain.get_peak().weight: raise ValueError("Not performing sync, already caught up.") wp_timeout = 360 if "weight_proof_timeout" in self.config: wp_timeout = self.config["weight_proof_timeout"] self.log.debug(f"weight proof timeout is {wp_timeout} sec") request = full_node_protocol.RequestProofOfWeight(heaviest_peak_height, heaviest_peak_hash) response = await weight_proof_peer.request_proof_of_weight(request, timeout=wp_timeout) # Disconnect from this peer, because they have not behaved properly if response is None or not isinstance(response, full_node_protocol.RespondProofOfWeight): await weight_proof_peer.close(600) raise RuntimeError(f"Weight proof did not arrive in time from peer: {weight_proof_peer.peer_host}") if response.wp.recent_chain_data[-1].reward_chain_block.height != heaviest_peak_height: await weight_proof_peer.close(600) raise RuntimeError(f"Weight proof had the wrong height: {weight_proof_peer.peer_host}") if response.wp.recent_chain_data[-1].reward_chain_block.weight != heaviest_peak_weight: await weight_proof_peer.close(600) raise RuntimeError(f"Weight proof had the wrong weight: {weight_proof_peer.peer_host}") # dont sync to wp if local peak is heavier, # dont ban peer, we asked for this peak current_peak = self.blockchain.get_peak() if current_peak is not None: if response.wp.recent_chain_data[-1].reward_chain_block.weight <= current_peak.weight: raise RuntimeError(f"current peak is heavier than Weight proof peek: {weight_proof_peer.peer_host}") try: validated, fork_point, summaries = await self.weight_proof_handler.validate_weight_proof(response.wp) except Exception as e: await weight_proof_peer.close(600) raise ValueError(f"Weight proof validation threw an error {e}") if not validated: await weight_proof_peer.close(600) raise ValueError("Weight proof validation failed") self.log.info(f"Re-checked peers: total of {len(peers_with_peak)} peers with peak {heaviest_peak_height}") self.sync_store.set_sync_mode(True) self._state_changed("sync_mode") # Ensures that the fork point does not change async with self.blockchain.lock: await self.blockchain.warmup(fork_point) await self.sync_from_fork_point(fork_point, heaviest_peak_height, heaviest_peak_hash, summaries) except asyncio.CancelledError: self.log.warning("Syncing failed, CancelledError") except Exception as e: tb = traceback.format_exc() self.log.error(f"Error with syncing: {type(e)}{tb}") finally: if self._shut_down: return None await self._finish_sync() async def sync_from_fork_point( self, fork_point_height: int, target_peak_sb_height: uint32, peak_hash: bytes32, summaries: List[SubEpochSummary], ): self.log.info(f"Start syncing from fork point at {fork_point_height} up to {target_peak_sb_height}") peer_ids: Set[bytes32] = self.sync_store.get_peers_that_have_peak([peak_hash]) peers_with_peak: List = [c for c in self.server.all_connections.values() if c.peer_node_id in peer_ids] if len(peers_with_peak) == 0: raise RuntimeError(f"Not syncing, no peers with header_hash {peak_hash} ") advanced_peak = False batch_size = self.constants.MAX_BLOCK_COUNT_PER_REQUESTS our_peak_height = self.blockchain.get_peak_height() ses_heigths = self.blockchain.get_ses_heights() if len(ses_heigths) > 2 and our_peak_height is not None: ses_heigths.sort() max_fork_ses_height = ses_heigths[-3] # This is fork point in SES in case where fork was not detected if self.blockchain.get_peak_height() is not None and fork_point_height == max_fork_ses_height: for peer in peers_with_peak: # Grab a block at peak + 1 and check if fork point is actually our current height block_response: Optional[Any] = await peer.request_block( full_node_protocol.RequestBlock(uint32(our_peak_height + 1), True) ) if block_response is not None and isinstance(block_response, full_node_protocol.RespondBlock): peak = self.blockchain.get_peak() if peak is not None and block_response.block.prev_header_hash == peak.header_hash: fork_point_height = our_peak_height break for i in range(fork_point_height, target_peak_sb_height, batch_size): start_height = i end_height = min(target_peak_sb_height, start_height + batch_size) request = RequestBlocks(uint32(start_height), uint32(end_height), True) self.log.info(f"Requesting blocks: {start_height} to {end_height}") batch_added = False to_remove = [] for peer in peers_with_peak: if peer.closed: to_remove.append(peer) continue response = await peer.request_blocks(request, timeout=60) if response is None: await peer.close() to_remove.append(peer) continue if isinstance(response, RejectBlocks): to_remove.append(peer) continue elif isinstance(response, RespondBlocks): success, advanced_peak, _ = await self.receive_block_batch( response.blocks, peer, None if advanced_peak else uint32(fork_point_height), summaries ) if success is False: await peer.close(600) continue else: batch_added = True break peak = self.blockchain.get_peak() assert peak is not None msg = make_msg( ProtocolMessageTypes.new_peak_wallet, wallet_protocol.NewPeakWallet( peak.header_hash, peak.height, peak.weight, uint32(max(peak.height - 1, uint32(0))), ), ) await self.server.send_to_all([msg], NodeType.WALLET) for peer in to_remove: peers_with_peak.remove(peer) if self.sync_store.peers_changed.is_set(): peer_ids = self.sync_store.get_peers_that_have_peak([peak_hash]) peers_with_peak = [c for c in self.server.all_connections.values() if c.peer_node_id in peer_ids] self.log.info(f"Number of peers we are syncing from: {len(peers_with_peak)}") self.sync_store.peers_changed.clear() if batch_added is False: self.log.info(f"Failed to fetch blocks {start_height} to {end_height} from peers: {peers_with_peak}") break else: self.log.info(f"Added blocks {start_height} to {end_height}") self.blockchain.clean_block_record( min( end_height - self.constants.BLOCKS_CACHE_SIZE, peak.height - self.constants.BLOCKS_CACHE_SIZE, ) ) async def receive_block_batch( self, all_blocks: List[FullBlock], peer: ws.WSChiaConnection, fork_point: Optional[uint32], wp_summaries: Optional[List[SubEpochSummary]] = None, ) -> Tuple[bool, bool, Optional[uint32]]: advanced_peak = False fork_height: Optional[uint32] = uint32(0) blocks_to_validate: List[FullBlock] = [] for i, block in enumerate(all_blocks): if not self.blockchain.contains_block(block.header_hash): blocks_to_validate = all_blocks[i:] break if len(blocks_to_validate) == 0: return True, False, fork_height pre_validate_start = time.time() pre_validation_results: Optional[ List[PreValidationResult] ] = await self.blockchain.pre_validate_blocks_multiprocessing(blocks_to_validate, {}) self.log.debug(f"Block pre-validation time: {time.time() - pre_validate_start}") if pre_validation_results is None: return False, False, None for i, block in enumerate(blocks_to_validate): if pre_validation_results[i].error is not None: self.log.error( f"Invalid block from peer: {peer.get_peer_info()} {Err(pre_validation_results[i].error)}" ) return False, advanced_peak, fork_height for i, block in enumerate(blocks_to_validate): assert pre_validation_results[i].required_iters is not None (result, error, fork_height,) = await self.blockchain.receive_block( block, pre_validation_results[i], None if advanced_peak else fork_point, wp_summaries ) if result == ReceiveBlockResult.NEW_PEAK: advanced_peak = True elif result == ReceiveBlockResult.INVALID_BLOCK or result == ReceiveBlockResult.DISCONNECTED_BLOCK: if error is not None: self.log.error(f"Error: {error}, Invalid block from peer: {peer.get_peer_info()} ") return False, advanced_peak, fork_height block_record = self.blockchain.block_record(block.header_hash) if block_record.sub_epoch_summary_included is not None: if self.weight_proof_handler is not None: await self.weight_proof_handler.create_prev_sub_epoch_segments() if advanced_peak: self._state_changed("new_peak") self.log.debug( f"Total time for {len(blocks_to_validate)} blocks: {time.time() - pre_validate_start}, " f"advanced: {advanced_peak}" ) return True, advanced_peak, fork_height async def _finish_sync(self): """ Finalize sync by setting sync mode to False, clearing all sync information, and adding any final blocks that we have finalized recently. """ self.log.info("long sync done") self.sync_store.set_long_sync(False) self.sync_store.set_sync_mode(False) self._state_changed("sync_mode") if self.server is None: return None peak: Optional[BlockRecord] = self.blockchain.get_peak() async with self.blockchain.lock: await self.sync_store.clear_sync_info() peak_fb: FullBlock = await self.blockchain.get_full_peak() if peak is not None: await self.peak_post_processing(peak_fb, peak, peak.height - 1, None) if peak is not None and self.weight_proof_handler is not None: await self.weight_proof_handler.get_proof_of_weight(peak.header_hash) self._state_changed("block") def has_valid_pool_sig(self, block: Union[UnfinishedBlock, FullBlock]): if ( block.foliage.foliage_block_data.pool_target == PoolTarget(self.constants.GENESIS_PRE_FARM_POOL_PUZZLE_HASH, uint32(0)) and block.foliage.prev_block_hash != self.constants.GENESIS_CHALLENGE and block.reward_chain_block.proof_of_space.pool_public_key is not None ): if not AugSchemeMPL.verify( block.reward_chain_block.proof_of_space.pool_public_key, bytes(block.foliage.foliage_block_data.pool_target), block.foliage.foliage_block_data.pool_signature, ): return False return True async def signage_point_post_processing( self, request: full_node_protocol.RespondSignagePoint, peer: ws.WSChiaConnection, ip_sub_slot: Optional[EndOfSubSlotBundle], ): self.log.info( f"⏲️ Finished signage point {request.index_from_challenge}/" f"{self.constants.NUM_SPS_SUB_SLOT}: " f"CC: {request.challenge_chain_vdf.output.get_hash()} " f"RC: {request.reward_chain_vdf.output.get_hash()} " ) self.signage_point_times[request.index_from_challenge] = time.time() sub_slot_tuple = self.full_node_store.get_sub_slot(request.challenge_chain_vdf.challenge) if sub_slot_tuple is not None: prev_challenge = sub_slot_tuple[0].challenge_chain.challenge_chain_end_of_slot_vdf.challenge else: prev_challenge = None # Notify nodes of the new signage point broadcast = full_node_protocol.NewSignagePointOrEndOfSubSlot( prev_challenge, request.challenge_chain_vdf.challenge, request.index_from_challenge, request.reward_chain_vdf.challenge, ) msg = make_msg(ProtocolMessageTypes.new_signage_point_or_end_of_sub_slot, broadcast) await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id) peak = self.blockchain.get_peak() if peak is not None and peak.height > self.constants.MAX_SUB_SLOT_BLOCKS: sub_slot_iters = peak.sub_slot_iters difficulty = uint64(peak.weight - self.blockchain.block_record(peak.prev_hash).weight) # Makes sure to potentially update the difficulty if we are past the peak (into a new sub-slot) assert ip_sub_slot is not None if request.challenge_chain_vdf.challenge != ip_sub_slot.challenge_chain.get_hash(): next_difficulty = self.blockchain.get_next_difficulty(peak.header_hash, True) next_sub_slot_iters = self.blockchain.get_next_slot_iters(peak.header_hash, True) difficulty = next_difficulty sub_slot_iters = next_sub_slot_iters else: difficulty = self.constants.DIFFICULTY_STARTING sub_slot_iters = self.constants.SUB_SLOT_ITERS_STARTING # Notify farmers of the new signage point broadcast_farmer = farmer_protocol.NewSignagePoint( request.challenge_chain_vdf.challenge, request.challenge_chain_vdf.output.get_hash(), request.reward_chain_vdf.output.get_hash(), difficulty, sub_slot_iters, request.index_from_challenge, ) msg = make_msg(ProtocolMessageTypes.new_signage_point, broadcast_farmer) await self.server.send_to_all([msg], NodeType.FARMER) async def peak_post_processing( self, block: FullBlock, record: BlockRecord, fork_height: uint32, peer: Optional[ws.WSChiaConnection] ): """ Must be called under self.blockchain.lock. This updates the internal state of the full node with the latest peak information. It also notifies peers about the new peak. """ difficulty = self.blockchain.get_next_difficulty(record.header_hash, False) sub_slot_iters = self.blockchain.get_next_slot_iters(record.header_hash, False) self.log.info( f"🌱 Updated peak to height {record.height}, weight {record.weight}, " f"hh {record.header_hash}, " f"forked at {fork_height}, rh: {record.reward_infusion_new_challenge}, " f"total iters: {record.total_iters}, " f"overflow: {record.overflow}, " f"deficit: {record.deficit}, " f"difficulty: {difficulty}, " f"sub slot iters: {sub_slot_iters}, " f"Generator size: " f"{len(bytes(block.transactions_generator)) if block.transactions_generator else 'No tx'}, " f"Generator ref list size: " f"{len(block.transactions_generator_ref_list) if block.transactions_generator else 'No tx'}" ) sub_slots = await self.blockchain.get_sp_and_ip_sub_slots(record.header_hash) assert sub_slots is not None if not self.sync_store.get_sync_mode(): self.blockchain.clean_block_records() fork_block: Optional[BlockRecord] = None if fork_height != block.height - 1 and block.height != 0: # This is a reorg fork_block = self.blockchain.block_record(self.blockchain.height_to_hash(fork_height)) added_eos, new_sps, new_ips = self.full_node_store.new_peak( record, block, sub_slots[0], sub_slots[1], fork_block, self.blockchain, ) if sub_slots[1] is None: assert record.ip_sub_slot_total_iters(self.constants) == 0 # Ensure the signage point is also in the store, for consistency self.full_node_store.new_signage_point( record.signage_point_index, self.blockchain, record, record.sub_slot_iters, SignagePoint( block.reward_chain_block.challenge_chain_sp_vdf, block.challenge_chain_sp_proof, block.reward_chain_block.reward_chain_sp_vdf, block.reward_chain_sp_proof, ), skip_vdf_validation=True, ) # Update the mempool (returns successful pending transactions added to the mempool) for bundle, result, spend_name in await self.mempool_manager.new_peak(self.blockchain.get_peak()): self.log.debug(f"Added transaction to mempool: {spend_name}") mempool_item = self.mempool_manager.get_mempool_item(spend_name) assert mempool_item is not None fees = mempool_item.fee assert fees >= 0 assert mempool_item.cost is not None new_tx = full_node_protocol.NewTransaction( spend_name, mempool_item.cost, uint64(bundle.fees()), ) msg = make_msg(ProtocolMessageTypes.new_transaction, new_tx) await self.server.send_to_all([msg], NodeType.FULL_NODE) # If there were pending end of slots that happen after this peak, broadcast them if they are added if added_eos is not None: broadcast = full_node_protocol.NewSignagePointOrEndOfSubSlot( added_eos.challenge_chain.challenge_chain_end_of_slot_vdf.challenge, added_eos.challenge_chain.get_hash(), uint8(0), added_eos.reward_chain.end_of_slot_vdf.challenge, ) msg = make_msg(ProtocolMessageTypes.new_signage_point_or_end_of_sub_slot, broadcast) await self.server.send_to_all([msg], NodeType.FULL_NODE) if new_sps is not None and peer is not None: for index, sp in new_sps: assert ( sp.cc_vdf is not None and sp.cc_proof is not None and sp.rc_vdf is not None and sp.rc_proof is not None ) await self.signage_point_post_processing( RespondSignagePoint(index, sp.cc_vdf, sp.cc_proof, sp.rc_vdf, sp.rc_proof), peer, sub_slots[1] ) # TODO: maybe add and broadcast new IPs as well if record.height % 1000 == 0: # Occasionally clear data in full node store to keep memory usage small self.full_node_store.clear_seen_unfinished_blocks() self.full_node_store.clear_old_cache_entries() if self.sync_store.get_sync_mode() is False: await self.send_peak_to_timelords(block) # Tell full nodes about the new peak msg = make_msg( ProtocolMessageTypes.new_peak, full_node_protocol.NewPeak( record.header_hash, record.height, record.weight, fork_height, block.reward_chain_block.get_unfinished().get_hash(), ), ) if peer is not None: await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id) else: await self.server.send_to_all([msg], NodeType.FULL_NODE) # Tell wallets about the new peak msg = make_msg( ProtocolMessageTypes.new_peak_wallet, wallet_protocol.NewPeakWallet( record.header_hash, record.height, record.weight, fork_height, ), ) await self.server.send_to_all([msg], NodeType.WALLET) # Check if we detected a spent transaction, to load up our generator cache if block.transactions_generator is not None and self.full_node_store.previous_generator is None: generator_arg = detect_potential_template_generator(block.height, block.transactions_generator) if generator_arg: self.log.info(f"Saving previous generator for height {block.height}") self.full_node_store.previous_generator = generator_arg self._state_changed("new_peak") async def respond_block( self, respond_block: full_node_protocol.RespondBlock, peer: Optional[ws.WSChiaConnection] = None, ) -> Optional[Message]: """ Receive a full block from a peer full node (or ourselves). """ block: FullBlock = respond_block.block if self.sync_store.get_sync_mode(): return None # Adds the block to seen, and check if it's seen before (which means header is in memory) header_hash = block.header_hash if self.blockchain.contains_block(header_hash): return None pre_validation_result: Optional[PreValidationResult] = None if ( block.is_transaction_block() and block.transactions_info is not None and block.transactions_info.generator_root != bytes([0] * 32) and block.transactions_generator is None ): # This is the case where we already had the unfinished block, and asked for this block without # the transactions (since we already had them). Therefore, here we add the transactions. unfinished_rh: bytes32 = block.reward_chain_block.get_unfinished().get_hash() unf_block: Optional[UnfinishedBlock] = self.full_node_store.get_unfinished_block(unfinished_rh) if ( unf_block is not None and unf_block.transactions_generator is not None and unf_block.foliage_transaction_block == block.foliage_transaction_block ): pre_validation_result = self.full_node_store.get_unfinished_block_result(unfinished_rh) assert pre_validation_result is not None block = dataclasses.replace( block, transactions_generator=unf_block.transactions_generator, transactions_generator_ref_list=unf_block.transactions_generator_ref_list, ) else: # We still do not have the correct information for this block, perhaps there is a duplicate block # with the same unfinished block hash in the cache, so we need to fetch the correct one if peer is None: return None block_response: Optional[Any] = await peer.request_block( full_node_protocol.RequestBlock(block.height, True) ) if block_response is None or not isinstance(block_response, full_node_protocol.RespondBlock): self.log.warning( f"Was not able to fetch the correct block for height {block.height} {block_response}" ) return None new_block: FullBlock = block_response.block if new_block.foliage_transaction_block != block.foliage_transaction_block: self.log.warning(f"Received the wrong block for height {block.height} {new_block.header_hash}") return None assert new_block.transactions_generator is not None self.log.debug( f"Wrong info in the cache for bh {new_block.header_hash}, there might be multiple blocks from the " f"same farmer with the same pospace." ) # This recursion ends here, we cannot recurse again because transactions_generator is not None return await self.respond_block(block_response, peer) async with self.blockchain.lock: # After acquiring the lock, check again, because another asyncio thread might have added it if self.blockchain.contains_block(header_hash): return None validation_start = time.time() # Tries to add the block to the blockchain, if we already validated transactions, don't do it again npc_results = {} if pre_validation_result is not None and pre_validation_result.npc_result is not None: npc_results[block.height] = pre_validation_result.npc_result pre_validation_results: Optional[ List[PreValidationResult] ] = await self.blockchain.pre_validate_blocks_multiprocessing([block], npc_results) if pre_validation_results is None: raise ValueError(f"Failed to validate block {header_hash} height {block.height}") if pre_validation_results[0].error is not None: if Err(pre_validation_results[0].error) == Err.INVALID_PREV_BLOCK_HASH: added: ReceiveBlockResult = ReceiveBlockResult.DISCONNECTED_BLOCK error_code: Optional[Err] = Err.INVALID_PREV_BLOCK_HASH fork_height: Optional[uint32] = None else: raise ValueError( f"Failed to validate block {header_hash} height " f"{block.height}: {Err(pre_validation_results[0].error).name}" ) else: result_to_validate = ( pre_validation_results[0] if pre_validation_result is None else pre_validation_result ) assert result_to_validate.required_iters == pre_validation_results[0].required_iters added, error_code, fork_height = await self.blockchain.receive_block(block, result_to_validate, None) if ( self.full_node_store.previous_generator is not None and fork_height is not None and fork_height < self.full_node_store.previous_generator.block_height ): self.full_node_store.previous_generator = None validation_time = time.time() - validation_start if added == ReceiveBlockResult.ALREADY_HAVE_BLOCK: return None elif added == ReceiveBlockResult.INVALID_BLOCK: assert error_code is not None self.log.error(f"Block {header_hash} at height {block.height} is invalid with code {error_code}.") raise ConsensusError(error_code, header_hash) elif added == ReceiveBlockResult.DISCONNECTED_BLOCK: self.log.info(f"Disconnected block {header_hash} at height {block.height}") return None elif added == ReceiveBlockResult.NEW_PEAK: # Only propagate blocks which extend the blockchain (becomes one of the heads) new_peak: Optional[BlockRecord] = self.blockchain.get_peak() assert new_peak is not None and fork_height is not None await self.peak_post_processing(block, new_peak, fork_height, peer) elif added == ReceiveBlockResult.ADDED_AS_ORPHAN: self.log.info( f"Received orphan block of height {block.height} rh " f"{block.reward_chain_block.get_hash()}" ) else: # Should never reach here, all the cases are covered raise RuntimeError(f"Invalid result from receive_block {added}") percent_full_str = ( ( ", percent full: " + str(round(100.0 * float(block.transactions_info.cost) / self.constants.MAX_BLOCK_COST_CLVM, 3)) + "%" ) if block.transactions_info is not None else "" ) self.log.info( f"Block validation time: {validation_time}, " f"cost: {block.transactions_info.cost if block.transactions_info is not None else 'None'}" f"{percent_full_str}" ) # This code path is reached if added == ADDED_AS_ORPHAN or NEW_TIP peak = self.blockchain.get_peak() assert peak is not None # Removes all temporary data for old blocks clear_height = uint32(max(0, peak.height - 50)) self.full_node_store.clear_candidate_blocks_below(clear_height) self.full_node_store.clear_unfinished_blocks_below(clear_height) if peak.height % 1000 == 0 and not self.sync_store.get_sync_mode(): await self.sync_store.clear_sync_info() # Occasionally clear sync peer info self._state_changed("block") record = self.blockchain.block_record(block.header_hash) if self.weight_proof_handler is not None and record.sub_epoch_summary_included is not None: if self._segment_task is None or self._segment_task.done(): self._segment_task = asyncio.create_task(self.weight_proof_handler.create_prev_sub_epoch_segments()) return None async def respond_unfinished_block( self, respond_unfinished_block: full_node_protocol.RespondUnfinishedBlock, peer: Optional[ws.WSChiaConnection], farmed_block: bool = False, ): """ We have received an unfinished block, either created by us, or from another peer. We can validate it and if it's a good block, propagate it to other peers and timelords. """ block = respond_unfinished_block.unfinished_block if block.prev_header_hash != self.constants.GENESIS_CHALLENGE and not self.blockchain.contains_block( block.prev_header_hash ): # No need to request the parent, since the peer will send it to us anyway, via NewPeak self.log.debug("Received a disconnected unfinished block") return None # Adds the unfinished block to seen, and check if it's seen before, to prevent # processing it twice. This searches for the exact version of the unfinished block (there can be many different # foliages for the same trunk). This is intentional, to prevent DOS attacks. # Note that it does not require that this block was successfully processed if self.full_node_store.seen_unfinished_block(block.get_hash()): return None block_hash = block.reward_chain_block.get_hash() # This searched for the trunk hash (unfinished reward hash). If we have already added a block with the same # hash, return if self.full_node_store.get_unfinished_block(block_hash) is not None: return None peak: Optional[BlockRecord] = self.blockchain.get_peak() if peak is not None: if block.total_iters < peak.sp_total_iters(self.constants): # This means this unfinished block is pretty far behind, it will not add weight to our chain return None if block.prev_header_hash == self.constants.GENESIS_CHALLENGE: prev_b = None else: prev_b = self.blockchain.block_record(block.prev_header_hash) # Count the blocks in sub slot, and check if it's a new epoch if len(block.finished_sub_slots) > 0: num_blocks_in_ss = 1 # Curr else: curr = self.blockchain.try_block_record(block.prev_header_hash) num_blocks_in_ss = 2 # Curr and prev while (curr is not None) and not curr.first_in_sub_slot: curr = self.blockchain.try_block_record(curr.prev_hash) num_blocks_in_ss += 1 if num_blocks_in_ss > self.constants.MAX_SUB_SLOT_BLOCKS: # TODO: potentially allow overflow blocks here, which count for the next slot self.log.warning("Too many blocks added, not adding block") return None async with self.blockchain.lock: # TODO: pre-validate VDFs outside of lock validation_start = time.time() validate_result = await self.blockchain.validate_unfinished_block(block) if validate_result.error is not None: if validate_result.error == Err.COIN_AMOUNT_NEGATIVE.value: # TODO: remove in the future, hotfix for 1.1.5 peers to not disconnect older peers self.log.info(f"Consensus error {validate_result.error}, not disconnecting") return raise ConsensusError(Err(validate_result.error)) validation_time = time.time() - validation_start assert validate_result.required_iters is not None # Perform another check, in case we have already concurrently added the same unfinished block if self.full_node_store.get_unfinished_block(block_hash) is not None: return None if block.prev_header_hash == self.constants.GENESIS_CHALLENGE: height = uint32(0) else: height = uint32(self.blockchain.block_record(block.prev_header_hash).height + 1) ses: Optional[SubEpochSummary] = next_sub_epoch_summary( self.constants, self.blockchain, validate_result.required_iters, block, True, ) self.full_node_store.add_unfinished_block(height, block, validate_result) if farmed_block is True: self.log.info( f"🍀 ️Farmed unfinished_block {block_hash}, SP: {block.reward_chain_block.signage_point_index}, " f"validation time: {validation_time}, " f"cost: {block.transactions_info.cost if block.transactions_info else 'None'}" ) else: percent_full_str = ( ( ", percent full: " + str(round(100.0 * float(block.transactions_info.cost) / self.constants.MAX_BLOCK_COST_CLVM, 3)) + "%" ) if block.transactions_info is not None else "" ) self.log.info( f"Added unfinished_block {block_hash}, not farmed by us," f" SP: {block.reward_chain_block.signage_point_index} farmer response time: " f"{time.time() - self.signage_point_times[block.reward_chain_block.signage_point_index]}, " f"Pool pk {encode_puzzle_hash(block.foliage.foliage_block_data.pool_target.puzzle_hash, 'xch')}, " f"validation time: {validation_time}, " f"cost: {block.transactions_info.cost if block.transactions_info else 'None'}" f"{percent_full_str}" ) sub_slot_iters, difficulty = get_next_sub_slot_iters_and_difficulty( self.constants, len(block.finished_sub_slots) > 0, prev_b, self.blockchain, ) if block.reward_chain_block.signage_point_index == 0: res = self.full_node_store.get_sub_slot(block.reward_chain_block.pos_ss_cc_challenge_hash) if res is None: if block.reward_chain_block.pos_ss_cc_challenge_hash == self.constants.GENESIS_CHALLENGE: rc_prev = self.constants.GENESIS_CHALLENGE else: self.log.warning(f"Do not have sub slot {block.reward_chain_block.pos_ss_cc_challenge_hash}") return None else: rc_prev = res[0].reward_chain.get_hash() else: assert block.reward_chain_block.reward_chain_sp_vdf is not None rc_prev = block.reward_chain_block.reward_chain_sp_vdf.challenge timelord_request = timelord_protocol.NewUnfinishedBlockTimelord( block.reward_chain_block, difficulty, sub_slot_iters, block.foliage, ses, rc_prev, ) timelord_msg = make_msg(ProtocolMessageTypes.new_unfinished_block_timelord, timelord_request) await self.server.send_to_all([timelord_msg], NodeType.TIMELORD) full_node_request = full_node_protocol.NewUnfinishedBlock(block.reward_chain_block.get_hash()) msg = make_msg(ProtocolMessageTypes.new_unfinished_block, full_node_request) if peer is not None: await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id) else: await self.server.send_to_all([msg], NodeType.FULL_NODE) self._state_changed("unfinished_block") async def new_infusion_point_vdf( self, request: timelord_protocol.NewInfusionPointVDF, timelord_peer: Optional[ws.WSChiaConnection] = None ) -> Optional[Message]: # Lookup unfinished blocks unfinished_block: Optional[UnfinishedBlock] = self.full_node_store.get_unfinished_block( request.unfinished_reward_hash ) if unfinished_block is None: self.log.warning( f"Do not have unfinished reward chain block {request.unfinished_reward_hash}, cannot finish." ) return None prev_b: Optional[BlockRecord] = None target_rc_hash = request.reward_chain_ip_vdf.challenge last_slot_cc_hash = request.challenge_chain_ip_vdf.challenge # Backtracks through end of slot objects, should work for multiple empty sub slots for eos, _, _ in reversed(self.full_node_store.finished_sub_slots): if eos is not None and eos.reward_chain.get_hash() == target_rc_hash: target_rc_hash = eos.reward_chain.end_of_slot_vdf.challenge if target_rc_hash == self.constants.GENESIS_CHALLENGE: prev_b = None else: # Find the prev block, starts looking backwards from the peak. target_rc_hash must be the hash of a block # and not an end of slot (since we just looked through the slots and backtracked) curr: Optional[BlockRecord] = self.blockchain.get_peak() for _ in range(10): if curr is None: break if curr.reward_infusion_new_challenge == target_rc_hash: # Found our prev block prev_b = curr break curr = self.blockchain.try_block_record(curr.prev_hash) # If not found, cache keyed on prev block if prev_b is None: self.full_node_store.add_to_future_ip(request) self.log.warning(f"Previous block is None, infusion point {request.reward_chain_ip_vdf.challenge}") return None finished_sub_slots: Optional[List[EndOfSubSlotBundle]] = self.full_node_store.get_finished_sub_slots( self.blockchain, prev_b, last_slot_cc_hash, ) if finished_sub_slots is None: return None sub_slot_iters, difficulty = get_next_sub_slot_iters_and_difficulty( self.constants, len(finished_sub_slots) > 0, prev_b, self.blockchain, ) if unfinished_block.reward_chain_block.pos_ss_cc_challenge_hash == self.constants.GENESIS_CHALLENGE: sub_slot_start_iters = uint128(0) else: ss_res = self.full_node_store.get_sub_slot(unfinished_block.reward_chain_block.pos_ss_cc_challenge_hash) if ss_res is None: self.log.warning(f"Do not have sub slot {unfinished_block.reward_chain_block.pos_ss_cc_challenge_hash}") return None _, _, sub_slot_start_iters = ss_res sp_total_iters = uint128( sub_slot_start_iters + calculate_sp_iters( self.constants, sub_slot_iters, unfinished_block.reward_chain_block.signage_point_index, ) ) block: FullBlock = unfinished_block_to_full_block( unfinished_block, request.challenge_chain_ip_vdf, request.challenge_chain_ip_proof, request.reward_chain_ip_vdf, request.reward_chain_ip_proof, request.infused_challenge_chain_ip_vdf, request.infused_challenge_chain_ip_proof, finished_sub_slots, prev_b, self.blockchain, sp_total_iters, difficulty, ) if not self.has_valid_pool_sig(block): self.log.warning("Trying to make a pre-farm block but height is not 0") return None try: await self.respond_block(full_node_protocol.RespondBlock(block)) except Exception as e: self.log.warning(f"Consensus error validating block: {e}") if timelord_peer is not None: # Only sends to the timelord who sent us this VDF, to reset them to the correct peak await self.send_peak_to_timelords(peer=timelord_peer) return None async def respond_end_of_sub_slot( self, request: full_node_protocol.RespondEndOfSubSlot, peer: ws.WSChiaConnection ) -> Tuple[Optional[Message], bool]: fetched_ss = self.full_node_store.get_sub_slot(request.end_of_slot_bundle.challenge_chain.get_hash()) if fetched_ss is not None: # Already have the sub-slot return None, True async with self.timelord_lock: fetched_ss = self.full_node_store.get_sub_slot( request.end_of_slot_bundle.challenge_chain.challenge_chain_end_of_slot_vdf.challenge ) if ( (fetched_ss is None) and request.end_of_slot_bundle.challenge_chain.challenge_chain_end_of_slot_vdf.challenge != self.constants.GENESIS_CHALLENGE ): # If we don't have the prev, request the prev instead full_node_request = full_node_protocol.RequestSignagePointOrEndOfSubSlot( request.end_of_slot_bundle.challenge_chain.challenge_chain_end_of_slot_vdf.challenge, uint8(0), bytes([0] * 32), ) return ( make_msg(ProtocolMessageTypes.request_signage_point_or_end_of_sub_slot, full_node_request), False, ) peak = self.blockchain.get_peak() if peak is not None and peak.height > 2: next_sub_slot_iters = self.blockchain.get_next_slot_iters(peak.header_hash, True) next_difficulty = self.blockchain.get_next_difficulty(peak.header_hash, True) else: next_sub_slot_iters = self.constants.SUB_SLOT_ITERS_STARTING next_difficulty = self.constants.DIFFICULTY_STARTING # Adds the sub slot and potentially get new infusions new_infusions = self.full_node_store.new_finished_sub_slot( request.end_of_slot_bundle, self.blockchain, peak, await self.blockchain.get_full_peak(), ) # It may be an empty list, even if it's not None. Not None means added successfully if new_infusions is not None: self.log.info( f"⏲️ Finished sub slot, SP {self.constants.NUM_SPS_SUB_SLOT}/{self.constants.NUM_SPS_SUB_SLOT}, " f"{request.end_of_slot_bundle.challenge_chain.get_hash()}, " f"number of sub-slots: {len(self.full_node_store.finished_sub_slots)}, " f"RC hash: {request.end_of_slot_bundle.reward_chain.get_hash()}, " f"Deficit {request.end_of_slot_bundle.reward_chain.deficit}" ) # Notify full nodes of the new sub-slot broadcast = full_node_protocol.NewSignagePointOrEndOfSubSlot( request.end_of_slot_bundle.challenge_chain.challenge_chain_end_of_slot_vdf.challenge, request.end_of_slot_bundle.challenge_chain.get_hash(), uint8(0), request.end_of_slot_bundle.reward_chain.end_of_slot_vdf.challenge, ) msg = make_msg(ProtocolMessageTypes.new_signage_point_or_end_of_sub_slot, broadcast) await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id) for infusion in new_infusions: await self.new_infusion_point_vdf(infusion) # Notify farmers of the new sub-slot broadcast_farmer = farmer_protocol.NewSignagePoint( request.end_of_slot_bundle.challenge_chain.get_hash(), request.end_of_slot_bundle.challenge_chain.get_hash(), request.end_of_slot_bundle.reward_chain.get_hash(), next_difficulty, next_sub_slot_iters, uint8(0), ) msg = make_msg(ProtocolMessageTypes.new_signage_point, broadcast_farmer) await self.server.send_to_all([msg], NodeType.FARMER) return None, True else: self.log.info( f"End of slot not added CC challenge " f"{request.end_of_slot_bundle.challenge_chain.challenge_chain_end_of_slot_vdf.challenge}" ) return None, False async def respond_transaction( self, transaction: SpendBundle, spend_name: bytes32, peer: Optional[ws.WSChiaConnection] = None, test: bool = False, ) -> Tuple[MempoolInclusionStatus, Optional[Err]]: if self.sync_store.get_sync_mode(): return MempoolInclusionStatus.FAILED, Err.NO_TRANSACTIONS_WHILE_SYNCING if not test and not (await self.synced()): return MempoolInclusionStatus.FAILED, Err.NO_TRANSACTIONS_WHILE_SYNCING # No transactions in mempool in initial client. Remove 6 weeks after launch if int(time.time()) <= self.constants.INITIAL_FREEZE_END_TIMESTAMP: return MempoolInclusionStatus.FAILED, Err.INITIAL_TRANSACTION_FREEZE if self.mempool_manager.seen(spend_name): return MempoolInclusionStatus.FAILED, Err.ALREADY_INCLUDING_TRANSACTION self.mempool_manager.add_and_maybe_pop_seen(spend_name) self.log.debug(f"Processing transaction: {spend_name}") # Ignore if syncing if self.sync_store.get_sync_mode(): status = MempoolInclusionStatus.FAILED error: Optional[Err] = Err.NO_TRANSACTIONS_WHILE_SYNCING self.mempool_manager.remove_seen(spend_name) else: try: cost_result = await self.mempool_manager.pre_validate_spendbundle(transaction) except Exception as e: self.mempool_manager.remove_seen(spend_name) raise e async with self.mempool_manager.lock: if self.mempool_manager.get_spendbundle(spend_name) is not None: self.mempool_manager.remove_seen(spend_name) return MempoolInclusionStatus.FAILED, Err.ALREADY_INCLUDING_TRANSACTION cost, status, error = await self.mempool_manager.add_spendbundle(transaction, cost_result, spend_name) if status == MempoolInclusionStatus.SUCCESS: self.log.debug( f"Added transaction to mempool: {spend_name} mempool size: " f"{self.mempool_manager.mempool.total_mempool_cost}" ) # Only broadcast successful transactions, not pending ones. Otherwise it's a DOS # vector. mempool_item = self.mempool_manager.get_mempool_item(spend_name) assert mempool_item is not None fees = mempool_item.fee assert fees >= 0 assert cost is not None new_tx = full_node_protocol.NewTransaction( spend_name, cost, fees, ) msg = make_msg(ProtocolMessageTypes.new_transaction, new_tx) if peer is None: await self.server.send_to_all([msg], NodeType.FULL_NODE) else: await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id) else: self.mempool_manager.remove_seen(spend_name) self.log.debug( f"Wasn't able to add transaction with id {spend_name}, " f"status {status} error: {error}" ) return status, error async def _needs_compact_proof( self, vdf_info: VDFInfo, header_block: HeaderBlock, field_vdf: CompressibleVDFField ) -> bool: if field_vdf == CompressibleVDFField.CC_EOS_VDF: for sub_slot in header_block.finished_sub_slots: if sub_slot.challenge_chain.challenge_chain_end_of_slot_vdf == vdf_info: if ( sub_slot.proofs.challenge_chain_slot_proof.witness_type == 0 and sub_slot.proofs.challenge_chain_slot_proof.normalized_to_identity ): return False return True if field_vdf == CompressibleVDFField.ICC_EOS_VDF: for sub_slot in header_block.finished_sub_slots: if ( sub_slot.infused_challenge_chain is not None and sub_slot.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf == vdf_info ): assert sub_slot.proofs.infused_challenge_chain_slot_proof is not None if ( sub_slot.proofs.infused_challenge_chain_slot_proof.witness_type == 0 and sub_slot.proofs.infused_challenge_chain_slot_proof.normalized_to_identity ): return False return True if field_vdf == CompressibleVDFField.CC_SP_VDF: if header_block.reward_chain_block.challenge_chain_sp_vdf is None: return False if vdf_info == header_block.reward_chain_block.challenge_chain_sp_vdf: assert header_block.challenge_chain_sp_proof is not None if ( header_block.challenge_chain_sp_proof.witness_type == 0 and header_block.challenge_chain_sp_proof.normalized_to_identity ): return False return True if field_vdf == CompressibleVDFField.CC_IP_VDF: if vdf_info == header_block.reward_chain_block.challenge_chain_ip_vdf: if ( header_block.challenge_chain_ip_proof.witness_type == 0 and header_block.challenge_chain_ip_proof.normalized_to_identity ): return False return True return False async def _can_accept_compact_proof( self, vdf_info: VDFInfo, vdf_proof: VDFProof, height: uint32, header_hash: bytes32, field_vdf: CompressibleVDFField, ) -> bool: """ - Checks if the provided proof is indeed compact. - Checks if proof verifies given the vdf_info from the start of sub-slot. - Checks if the provided vdf_info is correct, assuming it refers to the start of sub-slot. - Checks if the existing proof was non-compact. Ignore this proof if we already have a compact proof. """ is_fully_compactified = await self.block_store.is_fully_compactified(header_hash) if is_fully_compactified is None or is_fully_compactified: self.log.info(f"Already compactified block: {header_hash}. Ignoring.") return False if vdf_proof.witness_type > 0 or not vdf_proof.normalized_to_identity: self.log.error(f"Received vdf proof is not compact: {vdf_proof}.") return False if not vdf_proof.is_valid(self.constants, ClassgroupElement.get_default_element(), vdf_info): self.log.error(f"Received compact vdf proof is not valid: {vdf_proof}.") return False header_block = await self.blockchain.get_header_block_by_height(height, header_hash, tx_filter=False) if header_block is None: self.log.error(f"Can't find block for given compact vdf. Height: {height} Header hash: {header_hash}") return False is_new_proof = await self._needs_compact_proof(vdf_info, header_block, field_vdf) if not is_new_proof: self.log.info(f"Duplicate compact proof. Height: {height}. Header hash: {header_hash}.") return is_new_proof async def _replace_proof( self, vdf_info: VDFInfo, vdf_proof: VDFProof, height: uint32, field_vdf: CompressibleVDFField, ): full_blocks = await self.block_store.get_full_blocks_at([height]) assert len(full_blocks) > 0 for block in full_blocks: new_block = None block_record = await self.blockchain.get_block_record_from_db(self.blockchain.height_to_hash(height)) assert block_record is not None if field_vdf == CompressibleVDFField.CC_EOS_VDF: for index, sub_slot in enumerate(block.finished_sub_slots): if sub_slot.challenge_chain.challenge_chain_end_of_slot_vdf == vdf_info: new_proofs = dataclasses.replace(sub_slot.proofs, challenge_chain_slot_proof=vdf_proof) new_subslot = dataclasses.replace(sub_slot, proofs=new_proofs) new_finished_subslots = block.finished_sub_slots new_finished_subslots[index] = new_subslot new_block = dataclasses.replace(block, finished_sub_slots=new_finished_subslots) break if field_vdf == CompressibleVDFField.ICC_EOS_VDF: for index, sub_slot in enumerate(block.finished_sub_slots): if ( sub_slot.infused_challenge_chain is not None and sub_slot.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf == vdf_info ): new_proofs = dataclasses.replace(sub_slot.proofs, infused_challenge_chain_slot_proof=vdf_proof) new_subslot = dataclasses.replace(sub_slot, proofs=new_proofs) new_finished_subslots = block.finished_sub_slots new_finished_subslots[index] = new_subslot new_block = dataclasses.replace(block, finished_sub_slots=new_finished_subslots) break if field_vdf == CompressibleVDFField.CC_SP_VDF: if block.reward_chain_block.challenge_chain_sp_vdf == vdf_info: assert block.challenge_chain_sp_proof is not None new_block = dataclasses.replace(block, challenge_chain_sp_proof=vdf_proof) if field_vdf == CompressibleVDFField.CC_IP_VDF: if block.reward_chain_block.challenge_chain_ip_vdf == vdf_info: new_block = dataclasses.replace(block, challenge_chain_ip_proof=vdf_proof) if new_block is None: self.log.debug("did not replace any proof, vdf does not match") return async with self.db_wrapper.lock: await self.block_store.add_full_block(new_block.header_hash, new_block, block_record) await self.block_store.db_wrapper.commit_transaction() async def respond_compact_proof_of_time(self, request: timelord_protocol.RespondCompactProofOfTime): field_vdf = CompressibleVDFField(int(request.field_vdf)) if not await self._can_accept_compact_proof( request.vdf_info, request.vdf_proof, request.height, request.header_hash, field_vdf ): return None async with self.blockchain.compact_proof_lock: await self._replace_proof(request.vdf_info, request.vdf_proof, request.height, field_vdf) msg = make_msg( ProtocolMessageTypes.new_compact_vdf, full_node_protocol.NewCompactVDF(request.height, request.header_hash, request.field_vdf, request.vdf_info), ) if self.server is not None: await self.server.send_to_all([msg], NodeType.FULL_NODE) async def new_compact_vdf(self, request: full_node_protocol.NewCompactVDF, peer: ws.WSChiaConnection): is_fully_compactified = await self.block_store.is_fully_compactified(request.header_hash) if is_fully_compactified is None or is_fully_compactified: return False header_block = await self.blockchain.get_header_block_by_height( request.height, request.header_hash, tx_filter=False ) if header_block is None: return None field_vdf = CompressibleVDFField(int(request.field_vdf)) if await self._needs_compact_proof(request.vdf_info, header_block, field_vdf): peer_request = full_node_protocol.RequestCompactVDF( request.height, request.header_hash, request.field_vdf, request.vdf_info ) response = await peer.request_compact_vdf(peer_request, timeout=10) if response is not None and isinstance(response, full_node_protocol.RespondCompactVDF): await self.respond_compact_vdf(response, peer) async def request_compact_vdf(self, request: full_node_protocol.RequestCompactVDF, peer: ws.WSChiaConnection): header_block = await self.blockchain.get_header_block_by_height( request.height, request.header_hash, tx_filter=False ) if header_block is None: return None vdf_proof: Optional[VDFProof] = None field_vdf = CompressibleVDFField(int(request.field_vdf)) if field_vdf == CompressibleVDFField.CC_EOS_VDF: for sub_slot in header_block.finished_sub_slots: if sub_slot.challenge_chain.challenge_chain_end_of_slot_vdf == request.vdf_info: vdf_proof = sub_slot.proofs.challenge_chain_slot_proof break if field_vdf == CompressibleVDFField.ICC_EOS_VDF: for sub_slot in header_block.finished_sub_slots: if ( sub_slot.infused_challenge_chain is not None and sub_slot.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf == request.vdf_info ): vdf_proof = sub_slot.proofs.infused_challenge_chain_slot_proof break if ( field_vdf == CompressibleVDFField.CC_SP_VDF and header_block.reward_chain_block.challenge_chain_sp_vdf == request.vdf_info ): vdf_proof = header_block.challenge_chain_sp_proof if ( field_vdf == CompressibleVDFField.CC_IP_VDF and header_block.reward_chain_block.challenge_chain_ip_vdf == request.vdf_info ): vdf_proof = header_block.challenge_chain_ip_proof if vdf_proof is None or vdf_proof.witness_type > 0 or not vdf_proof.normalized_to_identity: self.log.error(f"{peer} requested compact vdf we don't have, height: {request.height}.") return None compact_vdf = full_node_protocol.RespondCompactVDF( request.height, request.header_hash, request.field_vdf, request.vdf_info, vdf_proof, ) msg = make_msg(ProtocolMessageTypes.respond_compact_vdf, compact_vdf) await peer.send_message(msg) async def respond_compact_vdf(self, request: full_node_protocol.RespondCompactVDF, peer: ws.WSChiaConnection): field_vdf = CompressibleVDFField(int(request.field_vdf)) if not await self._can_accept_compact_proof( request.vdf_info, request.vdf_proof, request.height, request.header_hash, field_vdf ): return None async with self.blockchain.compact_proof_lock: if self.blockchain.seen_compact_proofs(request.vdf_info, request.height): return None await self._replace_proof(request.vdf_info, request.vdf_proof, request.height, field_vdf) msg = make_msg( ProtocolMessageTypes.new_compact_vdf, full_node_protocol.NewCompactVDF(request.height, request.header_hash, request.field_vdf, request.vdf_info), ) if self.server is not None: await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id) async def broadcast_uncompact_blocks( self, uncompact_interval_scan: int, target_uncompact_proofs: int, sanitize_weight_proof_only: bool ): min_height: Optional[int] = 0 try: while not self._shut_down: while self.sync_store.get_sync_mode(): if self._shut_down: return None await asyncio.sleep(30) broadcast_list: List[timelord_protocol.RequestCompactProofOfTime] = [] new_min_height = None max_height = self.blockchain.get_peak_height() if max_height is None: await asyncio.sleep(30) continue # Calculate 'min_height' correctly the first time this task is launched, using the db assert min_height is not None min_height = await self.block_store.get_first_not_compactified(min_height) if min_height is None or min_height > max(0, max_height - 1000): min_height = max(0, max_height - 1000) batches_finished = 0 self.log.info("Scanning the blockchain for uncompact blocks.") assert max_height is not None assert min_height is not None for h in range(min_height, max_height, 100): # Got 10 times the target header count, sampling the target headers should contain # enough randomness to split the work between blueboxes. if len(broadcast_list) > target_uncompact_proofs * 10: break stop_height = min(h + 99, max_height) assert min_height is not None headers = await self.blockchain.get_header_blocks_in_range(min_height, stop_height, tx_filter=False) records: Dict[bytes32, BlockRecord] = {} if sanitize_weight_proof_only: records = await self.blockchain.get_block_records_in_range(min_height, stop_height) for header in headers.values(): prev_broadcast_list_len = len(broadcast_list) expected_header_hash = self.blockchain.height_to_hash(header.height) if header.header_hash != expected_header_hash: continue if sanitize_weight_proof_only: assert header.header_hash in records record = records[header.header_hash] for sub_slot in header.finished_sub_slots: if ( sub_slot.proofs.challenge_chain_slot_proof.witness_type > 0 or not sub_slot.proofs.challenge_chain_slot_proof.normalized_to_identity ): broadcast_list.append( timelord_protocol.RequestCompactProofOfTime( sub_slot.challenge_chain.challenge_chain_end_of_slot_vdf, header.header_hash, header.height, uint8(CompressibleVDFField.CC_EOS_VDF), ) ) if sub_slot.proofs.infused_challenge_chain_slot_proof is not None and ( sub_slot.proofs.infused_challenge_chain_slot_proof.witness_type > 0 or not sub_slot.proofs.infused_challenge_chain_slot_proof.normalized_to_identity ): assert sub_slot.infused_challenge_chain is not None broadcast_list.append( timelord_protocol.RequestCompactProofOfTime( sub_slot.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf, header.header_hash, header.height, uint8(CompressibleVDFField.ICC_EOS_VDF), ) ) # Running in 'sanitize_weight_proof_only' ignores CC_SP_VDF and CC_IP_VDF # unless this is a challenge block. if sanitize_weight_proof_only: if not record.is_challenge_block(self.constants): # Calculates 'new_min_height' as described below. if ( prev_broadcast_list_len == 0 and len(broadcast_list) > 0 and h <= max(0, max_height - 1000) ): new_min_height = header.height # Skip calculations for CC_SP_VDF and CC_IP_VDF. continue if header.challenge_chain_sp_proof is not None and ( header.challenge_chain_sp_proof.witness_type > 0 or not header.challenge_chain_sp_proof.normalized_to_identity ): assert header.reward_chain_block.challenge_chain_sp_vdf is not None broadcast_list.append( timelord_protocol.RequestCompactProofOfTime( header.reward_chain_block.challenge_chain_sp_vdf, header.header_hash, header.height, uint8(CompressibleVDFField.CC_SP_VDF), ) ) if ( header.challenge_chain_ip_proof.witness_type > 0 or not header.challenge_chain_ip_proof.normalized_to_identity ): broadcast_list.append( timelord_protocol.RequestCompactProofOfTime( header.reward_chain_block.challenge_chain_ip_vdf, header.header_hash, header.height, uint8(CompressibleVDFField.CC_IP_VDF), ) ) # This is the first header with uncompact proofs. Store its height so next time we iterate # only from here. Fix header block iteration window to at least 1000, so reorgs will be # handled correctly. if prev_broadcast_list_len == 0 and len(broadcast_list) > 0 and h <= max(0, max_height - 1000): new_min_height = header.height # Small sleep between batches. batches_finished += 1 if batches_finished % 10 == 0: await asyncio.sleep(1) # We have no uncompact blocks, but mentain the block iteration window to at least 1000 blocks. if new_min_height is None: new_min_height = max(0, max_height - 1000) min_height = new_min_height if len(broadcast_list) > target_uncompact_proofs: random.shuffle(broadcast_list) broadcast_list = broadcast_list[:target_uncompact_proofs] if self.sync_store.get_sync_mode(): continue if self.server is not None: for new_pot in broadcast_list: msg = make_msg(ProtocolMessageTypes.request_compact_proof_of_time, new_pot) await self.server.send_to_all([msg], NodeType.TIMELORD) await asyncio.sleep(uncompact_interval_scan) except Exception as e: error_stack = traceback.format_exc() self.log.error(f"Exception in broadcast_uncompact_blocks: {e}") self.log.error(f"Exception Stack: {error_stack}")