from typing import Any, Callable, Dict, List, Optional from chia.consensus.block_record import BlockRecord from chia.consensus.pos_quality import UI_ACTUAL_SPACE_CONSTANT_FACTOR from chia.full_node.full_node import FullNode from chia.types.blockchain_format.sized_bytes import bytes32 from chia.types.coin_record import CoinRecord from chia.types.full_block import FullBlock from chia.types.mempool_inclusion_status import MempoolInclusionStatus from chia.types.spend_bundle import SpendBundle from chia.types.unfinished_header_block import UnfinishedHeaderBlock from chia.util.byte_types import hexstr_to_bytes from chia.util.ints import uint32, uint64, uint128 from chia.util.ws_message import WsRpcMessage, create_payload_dict class FullNodeRpcApi: def __init__(self, service: FullNode): self.service = service self.service_name = "chia_full_node" self.cached_blockchain_state: Optional[Dict] = None def get_routes(self) -> Dict[str, Callable]: return { # Blockchain "/get_blockchain_state": self.get_blockchain_state, "/get_block": self.get_block, "/get_blocks": self.get_blocks, "/get_block_record_by_height": self.get_block_record_by_height, "/get_block_record": self.get_block_record, "/get_block_records": self.get_block_records, "/get_unfinished_block_headers": self.get_unfinished_block_headers, "/get_network_space": self.get_network_space, "/get_additions_and_removals": self.get_additions_and_removals, "/get_initial_freeze_period": self.get_initial_freeze_period, "/get_network_info": self.get_network_info, # Coins "/get_coin_records_by_puzzle_hash": self.get_coin_records_by_puzzle_hash, "/get_coin_records_by_puzzle_hashes": self.get_coin_records_by_puzzle_hashes, "/get_coin_record_by_name": self.get_coin_record_by_name, "/push_tx": self.push_tx, # Mempool "/get_all_mempool_tx_ids": self.get_all_mempool_tx_ids, "/get_all_mempool_items": self.get_all_mempool_items, "/get_mempool_item_by_tx_id": self.get_mempool_item_by_tx_id, } async def _state_changed(self, change: str) -> List[WsRpcMessage]: payloads = [] if change == "new_peak" or change == "sync_mode": data = await self.get_blockchain_state({}) assert data is not None payloads.append( create_payload_dict( "get_blockchain_state", data, self.service_name, "wallet_ui", ) ) return payloads return [] async def get_initial_freeze_period(self, _: Dict): freeze_period = self.service.constants.INITIAL_FREEZE_END_TIMESTAMP return {"INITIAL_FREEZE_END_TIMESTAMP": freeze_period} async def get_blockchain_state(self, _request: Dict): """ Returns a summary of the node's view of the blockchain. """ if self.service.initialized is False: res: Dict = { "blockchain_state": { "peak": None, "genesis_challenge_initialized": self.service.initialized, "sync": { "sync_mode": False, "synced": False, "sync_tip_height": 0, "sync_progress_height": 0, }, "difficulty": 0, "sub_slot_iters": 0, "space": 0, "mempool_size": 0, }, } return res peak: Optional[BlockRecord] = self.service.blockchain.get_peak() if peak is not None and peak.height > 0: difficulty = uint64(peak.weight - self.service.blockchain.block_record(peak.prev_hash).weight) sub_slot_iters = peak.sub_slot_iters else: difficulty = self.service.constants.DIFFICULTY_STARTING sub_slot_iters = self.service.constants.SUB_SLOT_ITERS_STARTING sync_mode: bool = self.service.sync_store.get_sync_mode() or self.service.sync_store.get_long_sync() sync_tip_height: Optional[uint32] = uint32(0) if sync_mode: if self.service.sync_store.get_sync_target_height() is not None: sync_tip_height = self.service.sync_store.get_sync_target_height() assert sync_tip_height is not None if peak is not None: sync_progress_height: uint32 = peak.height else: sync_progress_height = uint32(0) else: sync_progress_height = uint32(0) if peak is not None and peak.height > 1: newer_block_hex = peak.header_hash.hex() # Average over the last day header_hash = self.service.blockchain.height_to_hash(uint32(max(1, peak.height - 4608))) assert header_hash is not None older_block_hex = header_hash.hex() space = await self.get_network_space( {"newer_block_header_hash": newer_block_hex, "older_block_header_hash": older_block_hex} ) else: space = {"space": uint128(0)} if self.service.mempool_manager is not None: mempool_size = len(self.service.mempool_manager.mempool.spends) else: mempool_size = 0 if self.service.server is not None: is_connected = len(self.service.server.get_full_node_connections()) > 0 else: is_connected = False synced = await self.service.synced() and is_connected assert space is not None response: Dict = { "blockchain_state": { "peak": peak, "genesis_challenge_initialized": self.service.initialized, "sync": { "sync_mode": sync_mode, "synced": synced, "sync_tip_height": sync_tip_height, "sync_progress_height": sync_progress_height, }, "difficulty": difficulty, "sub_slot_iters": sub_slot_iters, "space": space["space"], "mempool_size": mempool_size, }, } self.cached_blockchain_state = dict(response["blockchain_state"]) return response async def get_network_info(self, request: Dict): network_name = self.service.config["selected_network"] address_prefix = self.service.config["network_overrides"]["config"][network_name]["address_prefix"] return {"network_name": network_name, "network_prefix": address_prefix} async def get_block(self, request: Dict) -> Optional[Dict]: if "header_hash" not in request: raise ValueError("No header_hash in request") header_hash = hexstr_to_bytes(request["header_hash"]) block: Optional[FullBlock] = await self.service.block_store.get_full_block(header_hash) if block is None: raise ValueError(f"Block {header_hash.hex()} not found") return {"block": block} async def get_blocks(self, request: Dict) -> Optional[Dict]: if "start" not in request: raise ValueError("No start in request") if "end" not in request: raise ValueError("No end in request") exclude_hh = False if "exclude_header_hash" in request: exclude_hh = request["exclude_header_hash"] start = int(request["start"]) end = int(request["end"]) block_range = [] for a in range(start, end): block_range.append(uint32(a)) blocks: List[FullBlock] = await self.service.block_store.get_full_blocks_at(block_range) json_blocks = [] for block in blocks: json = block.to_json_dict() if not exclude_hh: json["header_hash"] = block.header_hash.hex() json_blocks.append(json) return {"blocks": json_blocks} async def get_block_records(self, request: Dict) -> Optional[Dict]: if "start" not in request: raise ValueError("No start in request") if "end" not in request: raise ValueError("No end in request") start = int(request["start"]) end = int(request["end"]) records = [] peak_height = self.service.blockchain.get_peak_height() if peak_height is None: raise ValueError("Peak is None") for a in range(start, end): if peak_height < uint32(a): self.service.log.warning("requested block is higher than known peak ") break header_hash: bytes32 = self.service.blockchain.height_to_hash(uint32(a)) record: Optional[BlockRecord] = self.service.blockchain.try_block_record(header_hash) if record is None: # Fetch from DB record = await self.service.blockchain.block_store.get_block_record(header_hash) if record is None: raise ValueError(f"Block {header_hash.hex()} does not exist") records.append(record) return {"block_records": records} async def get_block_record_by_height(self, request: Dict) -> Optional[Dict]: if "height" not in request: raise ValueError("No height in request") height = request["height"] header_height = uint32(int(height)) peak_height = self.service.blockchain.get_peak_height() if peak_height is None or header_height > peak_height: raise ValueError(f"Block height {height} not found in chain") header_hash: Optional[bytes32] = self.service.blockchain.height_to_hash(header_height) if header_hash is None: raise ValueError(f"Block hash {height} not found in chain") record: Optional[BlockRecord] = self.service.blockchain.try_block_record(header_hash) if record is None: # Fetch from DB record = await self.service.blockchain.block_store.get_block_record(header_hash) if record is None: raise ValueError(f"Block {header_hash} does not exist") return {"block_record": record} async def get_block_record(self, request: Dict): if "header_hash" not in request: raise ValueError("header_hash not in request") header_hash_str = request["header_hash"] header_hash = hexstr_to_bytes(header_hash_str) record: Optional[BlockRecord] = self.service.blockchain.try_block_record(header_hash) if record is None: # Fetch from DB record = await self.service.blockchain.block_store.get_block_record(header_hash) if record is None: raise ValueError(f"Block {header_hash.hex()} does not exist") return {"block_record": record} async def get_unfinished_block_headers(self, request: Dict) -> Optional[Dict]: peak: Optional[BlockRecord] = self.service.blockchain.get_peak() if peak is None: return {"headers": []} response_headers: List[UnfinishedHeaderBlock] = [] for ub_height, block, _ in (self.service.full_node_store.get_unfinished_blocks()).values(): if ub_height == peak.height: unfinished_header_block = UnfinishedHeaderBlock( block.finished_sub_slots, block.reward_chain_block, block.challenge_chain_sp_proof, block.reward_chain_sp_proof, block.foliage, block.foliage_transaction_block, b"", ) response_headers.append(unfinished_header_block) return {"headers": response_headers} async def get_network_space(self, request: Dict) -> Optional[Dict]: """ Retrieves an estimate of total space validating the chain between two block header hashes. """ if "newer_block_header_hash" not in request or "older_block_header_hash" not in request: raise ValueError("Invalid request. newer_block_header_hash and older_block_header_hash required") newer_block_hex = request["newer_block_header_hash"] older_block_hex = request["older_block_header_hash"] if newer_block_hex == older_block_hex: raise ValueError("New and old must not be the same") newer_block_bytes = hexstr_to_bytes(newer_block_hex) older_block_bytes = hexstr_to_bytes(older_block_hex) newer_block = await self.service.block_store.get_block_record(newer_block_bytes) if newer_block is None: raise ValueError("Newer block not found") older_block = await self.service.block_store.get_block_record(older_block_bytes) if older_block is None: raise ValueError("Newer block not found") delta_weight = newer_block.weight - older_block.weight delta_iters = newer_block.total_iters - older_block.total_iters weight_div_iters = delta_weight / delta_iters additional_difficulty_constant = self.service.constants.DIFFICULTY_CONSTANT_FACTOR eligible_plots_filter_multiplier = 2 ** self.service.constants.NUMBER_ZERO_BITS_PLOT_FILTER network_space_bytes_estimate = ( UI_ACTUAL_SPACE_CONSTANT_FACTOR * weight_div_iters * additional_difficulty_constant * eligible_plots_filter_multiplier ) return {"space": uint128(int(network_space_bytes_estimate))} async def get_coin_records_by_puzzle_hash(self, request: Dict) -> Optional[Dict]: """ Retrieves the coins for a given puzzlehash, by default returns unspent coins. """ if "puzzle_hash" not in request: raise ValueError("Puzzle hash not in request") kwargs: Dict[str, Any] = {"include_spent_coins": False, "puzzle_hash": hexstr_to_bytes(request["puzzle_hash"])} if "start_height" in request: kwargs["start_height"] = uint32(request["start_height"]) if "end_height" in request: kwargs["end_height"] = uint32(request["end_height"]) if "include_spent_coins" in request: kwargs["include_spent_coins"] = request["include_spent_coins"] coin_records = await self.service.blockchain.coin_store.get_coin_records_by_puzzle_hash(**kwargs) return {"coin_records": coin_records} async def get_coin_records_by_puzzle_hashes(self, request: Dict) -> Optional[Dict]: """ Retrieves the coins for a given puzzlehash, by default returns unspent coins. """ if "puzzle_hashes" not in request: raise ValueError("Puzzle hashes not in request") kwargs: Dict[str, Any] = { "include_spent_coins": False, "puzzle_hashes": [hexstr_to_bytes(ph) for ph in request["puzzle_hashes"]], } if "start_height" in request: kwargs["start_height"] = uint32(request["start_height"]) if "end_height" in request: kwargs["end_height"] = uint32(request["end_height"]) if "include_spent_coins" in request: kwargs["include_spent_coins"] = request["include_spent_coins"] coin_records = await self.service.blockchain.coin_store.get_coin_records_by_puzzle_hashes(**kwargs) return {"coin_records": coin_records} async def get_coin_record_by_name(self, request: Dict) -> Optional[Dict]: """ Retrieves a coin record by it's name. """ if "name" not in request: raise ValueError("Name not in request") name = hexstr_to_bytes(request["name"]) coin_record: Optional[CoinRecord] = await self.service.blockchain.coin_store.get_coin_record(name) if coin_record is None: raise ValueError(f"Coin record 0x{name.hex()} not found") return {"coin_record": coin_record} async def push_tx(self, request: Dict) -> Optional[Dict]: if "spend_bundle" not in request: raise ValueError("Spend bundle not in request") spend_bundle = SpendBundle.from_json_dict(request["spend_bundle"]) spend_name = spend_bundle.name() if self.service.mempool_manager.get_spendbundle(spend_name) is not None: status = MempoolInclusionStatus.SUCCESS error = None else: status, error = await self.service.respond_transaction(spend_bundle, spend_name) if status != MempoolInclusionStatus.SUCCESS: if self.service.mempool_manager.get_spendbundle(spend_name) is not None: # Already in mempool status = MempoolInclusionStatus.SUCCESS error = None if status == MempoolInclusionStatus.FAILED: assert error is not None raise ValueError(f"Failed to include transaction {spend_name}, error {error.name}") return { "status": status.name, } async def get_additions_and_removals(self, request: Dict) -> Optional[Dict]: if "header_hash" not in request: raise ValueError("No header_hash in request") header_hash = hexstr_to_bytes(request["header_hash"]) block: Optional[FullBlock] = await self.service.block_store.get_full_block(header_hash) if block is None: raise ValueError(f"Block {header_hash.hex()} not found") async with self.service.blockchain.lock: if self.service.blockchain.height_to_hash(block.height) != header_hash: raise ValueError(f"Block at {header_hash.hex()} is no longer in the blockchain (it's in a fork)") additions: List[CoinRecord] = await self.service.coin_store.get_coins_added_at_height(block.height) removals: List[CoinRecord] = await self.service.coin_store.get_coins_removed_at_height(block.height) return {"additions": additions, "removals": removals} async def get_all_mempool_tx_ids(self, request: Dict) -> Optional[Dict]: ids = list(self.service.mempool_manager.mempool.spends.keys()) return {"tx_ids": ids} async def get_all_mempool_items(self, request: Dict) -> Optional[Dict]: spends = {} for tx_id, item in self.service.mempool_manager.mempool.spends.items(): spends[tx_id.hex()] = item return {"mempool_items": spends} async def get_mempool_item_by_tx_id(self, request: Dict) -> Optional[Dict]: if "tx_id" not in request: raise ValueError("No tx_id in request") tx_id: bytes32 = hexstr_to_bytes(request["tx_id"]) item = self.service.mempool_manager.get_mempool_item(tx_id) if item is None: raise ValueError(f"Tx id 0x{tx_id.hex()} not in the mempool") return {"mempool_item": item}