import asyncio import json import logging import socket import time import traceback from pathlib import Path from typing import Callable, Dict, List, Optional, Set, Tuple, Union, Any from blspy import PrivateKey from chia.consensus.block_record import BlockRecord from chia.consensus.constants import ConsensusConstants from chia.consensus.multiprocess_validation import PreValidationResult from chia.protocols import wallet_protocol from chia.protocols.full_node_protocol import RequestProofOfWeight, RespondProofOfWeight from chia.protocols.protocol_message_types import ProtocolMessageTypes from chia.protocols.wallet_protocol import ( RejectAdditionsRequest, RejectRemovalsRequest, RequestAdditions, RequestHeaderBlocks, RespondAdditions, RespondBlockHeader, RespondHeaderBlocks, RespondRemovals, ) from chia.server.node_discovery import WalletPeers from chia.server.outbound_message import Message, NodeType, make_msg from chia.server.server import ChiaServer from chia.server.ws_connection import WSChiaConnection from chia.types.blockchain_format.coin import Coin, hash_coin_list from chia.types.blockchain_format.sized_bytes import bytes32 from chia.types.header_block import HeaderBlock from chia.types.peer_info import PeerInfo from chia.util.byte_types import hexstr_to_bytes from chia.util.errors import Err, ValidationError from chia.util.ints import uint32, uint128 from chia.util.keychain import Keychain from chia.util.lru_cache import LRUCache from chia.util.merkle_set import MerkleSet, confirm_included_already_hashed, confirm_not_included_already_hashed from chia.util.path import mkdir, path_from_root from chia.wallet.block_record import HeaderBlockRecord from chia.wallet.derivation_record import DerivationRecord from chia.wallet.settings.settings_objects import BackupInitialized from chia.wallet.transaction_record import TransactionRecord from chia.wallet.util.backup_utils import open_backup_file from chia.wallet.util.wallet_types import WalletType from chia.wallet.wallet_action import WalletAction from chia.wallet.wallet_blockchain import ReceiveBlockResult from chia.wallet.wallet_state_manager import WalletStateManager from chia.util.profiler import profile_task class WalletNode: key_config: Dict config: Dict constants: ConsensusConstants server: Optional[ChiaServer] log: logging.Logger wallet_peers: WalletPeers # Maintains the state of the wallet (blockchain and transactions), handles DB connections wallet_state_manager: Optional[WalletStateManager] # How far away from LCA we must be to perform a full sync. Before then, do a short sync, # which is consecutive requests for the previous block short_sync_threshold: int _shut_down: bool root_path: Path state_changed_callback: Optional[Callable] syncing: bool full_node_peer: Optional[PeerInfo] peer_task: Optional[asyncio.Task] logged_in: bool def __init__( self, config: Dict, keychain: Keychain, root_path: Path, consensus_constants: ConsensusConstants, name: str = None, ): self.config = config self.constants = consensus_constants self.root_path = root_path if name: self.log = logging.getLogger(name) else: self.log = logging.getLogger(__name__) # Normal operation data self.cached_blocks: Dict = {} self.future_block_hashes: Dict = {} self.keychain = keychain # Sync data self._shut_down = False self.proof_hashes: List = [] self.header_hashes: List = [] self.header_hashes_error = False self.short_sync_threshold = 15 # Change the test when changing this self.potential_blocks_received: Dict = {} self.potential_header_hashes: Dict = {} self.state_changed_callback = None self.wallet_state_manager = None self.backup_initialized = False # Delay first launch sync after user imports backup info or decides to skip self.server = None self.wsm_close_task = None self.sync_task: Optional[asyncio.Task] = None self.new_peak_lock: Optional[asyncio.Lock] = None self.logged_in_fingerprint: Optional[int] = None self.peer_task = None self.logged_in = False self.last_new_peak_messages = LRUCache(5) def get_key_for_fingerprint(self, fingerprint: Optional[int]): private_keys = self.keychain.get_all_private_keys() if len(private_keys) == 0: self.log.warning("No keys present. Create keys with the UI, or with the 'chia keys' program.") return None private_key: Optional[PrivateKey] = None if fingerprint is not None: for sk, _ in private_keys: if sk.get_g1().get_fingerprint() == fingerprint: private_key = sk break else: private_key = private_keys[0][0] return private_key async def _start( self, fingerprint: Optional[int] = None, new_wallet: bool = False, backup_file: Optional[Path] = None, skip_backup_import: bool = False, ) -> bool: private_key = self.get_key_for_fingerprint(fingerprint) if private_key is None: self.logged_in = False return False if self.config.get("enable_profiler", False): asyncio.create_task(profile_task(self.root_path, "wallet", self.log)) db_path_key_suffix = str(private_key.get_g1().get_fingerprint()) db_path_replaced: str = ( self.config["database_path"] .replace("CHALLENGE", self.config["selected_network"]) .replace("KEY", db_path_key_suffix) ) path = path_from_root(self.root_path, db_path_replaced) mkdir(path.parent) assert self.server is not None self.wallet_state_manager = await WalletStateManager.create( private_key, self.config, path, self.constants, self.server ) self.wsm_close_task = None assert self.wallet_state_manager is not None backup_settings: BackupInitialized = self.wallet_state_manager.user_settings.get_backup_settings() if backup_settings.user_initialized is False: if new_wallet is True: await self.wallet_state_manager.user_settings.user_created_new_wallet() self.wallet_state_manager.new_wallet = True elif skip_backup_import is True: await self.wallet_state_manager.user_settings.user_skipped_backup_import() elif backup_file is not None: await self.wallet_state_manager.import_backup_info(backup_file) else: self.backup_initialized = False await self.wallet_state_manager.close_all_stores() self.wallet_state_manager = None self.logged_in = False return False self.backup_initialized = True if backup_file is not None: json_dict = open_backup_file(backup_file, self.wallet_state_manager.private_key) if "start_height" in json_dict["data"]: start_height = json_dict["data"]["start_height"] self.config["starting_height"] = max(0, start_height - self.config["start_height_buffer"]) else: self.config["starting_height"] = 0 else: self.config["starting_height"] = 0 if self.state_changed_callback is not None: self.wallet_state_manager.set_callback(self.state_changed_callback) self.wallet_state_manager.set_pending_callback(self._pending_tx_handler) self._shut_down = False self.peer_task = asyncio.create_task(self._periodically_check_full_node()) self.sync_event = asyncio.Event() self.sync_task = asyncio.create_task(self.sync_job()) self.logged_in_fingerprint = fingerprint self.logged_in = True return True def _close(self): self.log.info("self._close") self.logged_in_fingerprint = None self._shut_down = True async def _await_closed(self): self.log.info("self._await_closed") await self.server.close_all_connections() asyncio.create_task(self.wallet_peers.ensure_is_closed()) if self.wallet_state_manager is not None: await self.wallet_state_manager.close_all_stores() self.wallet_state_manager = None if self.sync_task is not None: self.sync_task.cancel() self.sync_task = None if self.peer_task is not None: self.peer_task.cancel() self.peer_task = None self.logged_in = False def _set_state_changed_callback(self, callback: Callable): self.state_changed_callback = callback if self.wallet_state_manager is not None: self.wallet_state_manager.set_callback(self.state_changed_callback) self.wallet_state_manager.set_pending_callback(self._pending_tx_handler) def _pending_tx_handler(self): if self.wallet_state_manager is None or self.backup_initialized is False: return None asyncio.create_task(self._resend_queue()) async def _action_messages(self) -> List[Message]: if self.wallet_state_manager is None or self.backup_initialized is False: return [] actions: List[WalletAction] = await self.wallet_state_manager.action_store.get_all_pending_actions() result: List[Message] = [] for action in actions: data = json.loads(action.data) action_data = data["data"]["action_data"] if action.name == "request_puzzle_solution": coin_name = bytes32(hexstr_to_bytes(action_data["coin_name"])) height = uint32(action_data["height"]) msg = make_msg( ProtocolMessageTypes.request_puzzle_solution, wallet_protocol.RequestPuzzleSolution(coin_name, height), ) result.append(msg) return result async def _resend_queue(self): if ( self._shut_down or self.server is None or self.wallet_state_manager is None or self.backup_initialized is None ): return None for msg, sent_peers in await self._messages_to_resend(): if ( self._shut_down or self.server is None or self.wallet_state_manager is None or self.backup_initialized is None ): return None full_nodes = self.server.get_full_node_connections() for peer in full_nodes: if peer.peer_node_id in sent_peers: continue await peer.send_message(msg) for msg in await self._action_messages(): if ( self._shut_down or self.server is None or self.wallet_state_manager is None or self.backup_initialized is None ): return None await self.server.send_to_all([msg], NodeType.FULL_NODE) async def _messages_to_resend(self) -> List[Tuple[Message, Set[bytes32]]]: if self.wallet_state_manager is None or self.backup_initialized is False or self._shut_down: return [] messages: List[Tuple[Message, Set[bytes32]]] = [] records: List[TransactionRecord] = await self.wallet_state_manager.tx_store.get_not_sent() for record in records: if record.spend_bundle is None: continue msg = make_msg( ProtocolMessageTypes.send_transaction, wallet_protocol.SendTransaction(record.spend_bundle), ) already_sent = set() for peer, status, _ in record.sent_to: already_sent.add(hexstr_to_bytes(peer)) messages.append((msg, already_sent)) return messages def set_server(self, server: ChiaServer): self.server = server # TODO: perhaps use a different set of DNS seeders for wallets, to split the traffic. self.wallet_peers = WalletPeers( self.server, self.root_path, self.config["target_peer_count"], self.config["wallet_peers_path"], self.config["introducer_peer"], [], self.config["peer_connect_interval"], self.log, ) asyncio.create_task(self.wallet_peers.start()) async def on_connect(self, peer: WSChiaConnection): if self.wallet_state_manager is None or self.backup_initialized is False: return None messages_peer_ids = await self._messages_to_resend() for msg, peer_ids in messages_peer_ids: if peer.peer_node_id in peer_ids: continue await peer.send_message(msg) if not self.has_full_node() and self.wallet_peers is not None: asyncio.create_task(self.wallet_peers.on_connect(peer)) async def _periodically_check_full_node(self) -> None: tries = 0 while not self._shut_down and tries < 5: if self.has_full_node(): await self.wallet_peers.ensure_is_closed() break tries += 1 await asyncio.sleep(self.config["peer_connect_interval"]) def has_full_node(self) -> bool: if self.server is None: return False if "full_node_peer" in self.config: full_node_peer = PeerInfo( self.config["full_node_peer"]["host"], self.config["full_node_peer"]["port"], ) peers = [c.get_peer_info() for c in self.server.get_full_node_connections()] full_node_resolved = PeerInfo(socket.gethostbyname(full_node_peer.host), full_node_peer.port) if full_node_peer in peers or full_node_resolved in peers: self.log.info(f"Will not attempt to connect to other nodes, already connected to {full_node_peer}") for connection in self.server.get_full_node_connections(): if ( connection.get_peer_info() != full_node_peer and connection.get_peer_info() != full_node_resolved ): self.log.info(f"Closing unnecessary connection to {connection.get_peer_info()}.") asyncio.create_task(connection.close()) return True return False async def complete_blocks(self, header_blocks: List[HeaderBlock], peer: WSChiaConnection): if self.wallet_state_manager is None: return None header_block_records: List[HeaderBlockRecord] = [] assert self.server trusted = self.server.is_trusted_peer(peer, self.config["trusted_peers"]) async with self.wallet_state_manager.blockchain.lock: for block in header_blocks: if block.is_transaction_block: # Find additions and removals (additions, removals,) = await self.wallet_state_manager.get_filter_additions_removals( block, block.transactions_filter, None ) # Get Additions added_coins = await self.get_additions(peer, block, additions) if added_coins is None: raise ValueError("Failed to fetch additions") # Get removals removed_coins = await self.get_removals(peer, block, added_coins, removals) if removed_coins is None: raise ValueError("Failed to fetch removals") hbr = HeaderBlockRecord(block, added_coins, removed_coins) else: hbr = HeaderBlockRecord(block, [], []) header_block_records.append(hbr) ( result, error, fork_h, ) = await self.wallet_state_manager.blockchain.receive_block(hbr, trusted=trusted) if result == ReceiveBlockResult.NEW_PEAK: if not self.wallet_state_manager.sync_mode: self.wallet_state_manager.blockchain.clean_block_records() self.wallet_state_manager.state_changed("new_block") self.wallet_state_manager.state_changed("sync_changed") elif result == ReceiveBlockResult.INVALID_BLOCK: self.log.info(f"Invalid block from peer: {peer.get_peer_info()} {error}") await peer.close() return None else: self.log.debug(f"Result: {result}") async def new_peak_wallet(self, peak: wallet_protocol.NewPeakWallet, peer: WSChiaConnection): if self.wallet_state_manager is None: return None curr_peak = self.wallet_state_manager.blockchain.get_peak() if curr_peak is not None and curr_peak.weight >= peak.weight: return None if self.new_peak_lock is None: self.new_peak_lock = asyncio.Lock() async with self.new_peak_lock: request = wallet_protocol.RequestBlockHeader(peak.height) response: Optional[RespondBlockHeader] = await peer.request_block_header(request) if response is None or not isinstance(response, RespondBlockHeader) or response.header_block is None: return None header_block = response.header_block if (curr_peak is None and header_block.height < self.constants.WEIGHT_PROOF_RECENT_BLOCKS) or ( curr_peak is not None and curr_peak.height > header_block.height - 200 ): top = header_block blocks = [top] # Fetch blocks backwards until we hit the one that we have, # then complete them with additions / removals going forward while not self.wallet_state_manager.blockchain.contains_block(top.prev_header_hash) and top.height > 0: request_prev = wallet_protocol.RequestBlockHeader(top.height - 1) response_prev: Optional[RespondBlockHeader] = await peer.request_block_header(request_prev) if response_prev is None: return None if not isinstance(response_prev, RespondBlockHeader): return None prev_head = response_prev.header_block blocks.append(prev_head) top = prev_head blocks.reverse() await self.complete_blocks(blocks, peer) await self.wallet_state_manager.create_more_puzzle_hashes() elif header_block.height >= self.constants.WEIGHT_PROOF_RECENT_BLOCKS: # Request weight proof # Sync if PoW validates if self.wallet_state_manager.sync_mode: self.last_new_peak_messages.put(peer, peak) return None weight_request = RequestProofOfWeight(header_block.height, header_block.header_hash) weight_proof_response: RespondProofOfWeight = await peer.request_proof_of_weight( weight_request, timeout=360 ) if weight_proof_response is None: return None weight_proof = weight_proof_response.wp if self.wallet_state_manager is None: return None if self.server is not None and self.server.is_trusted_peer(peer, self.config["trusted_peers"]): valid, fork_point = self.wallet_state_manager.weight_proof_handler.get_fork_point_no_validations( weight_proof ) else: valid, fork_point, _ = await self.wallet_state_manager.weight_proof_handler.validate_weight_proof( weight_proof ) if not valid: self.log.error( f"invalid weight proof, num of epochs {len(weight_proof.sub_epochs)}" f" recent blocks num ,{len(weight_proof.recent_chain_data)}" ) self.log.debug(f"{weight_proof}") return None self.log.info(f"Validated, fork point is {fork_point}") self.wallet_state_manager.sync_store.add_potential_fork_point( header_block.header_hash, uint32(fork_point) ) self.wallet_state_manager.sync_store.add_potential_peak(header_block) self.start_sync() def start_sync(self) -> None: self.log.info("self.sync_event.set()") self.sync_event.set() async def check_new_peak(self) -> None: if self.wallet_state_manager is None: return None current_peak: Optional[BlockRecord] = self.wallet_state_manager.blockchain.get_peak() if current_peak is None: return None potential_peaks: List[ Tuple[bytes32, HeaderBlock] ] = self.wallet_state_manager.sync_store.get_potential_peaks_tuples() for _, block in potential_peaks: if current_peak.weight < block.weight: await asyncio.sleep(5) self.start_sync() return None async def sync_job(self) -> None: while True: self.log.info("Loop start in sync job") if self._shut_down is True: break asyncio.create_task(self.check_new_peak()) await self.sync_event.wait() self.last_new_peak_messages = LRUCache(5) self.sync_event.clear() if self._shut_down is True: break try: assert self.wallet_state_manager is not None self.wallet_state_manager.set_sync_mode(True) await self._sync() except Exception as e: tb = traceback.format_exc() self.log.error(f"Loop exception in sync {e}. {tb}") finally: if self.wallet_state_manager is not None: self.wallet_state_manager.set_sync_mode(False) for peer, peak in self.last_new_peak_messages.cache.items(): asyncio.create_task(self.new_peak_wallet(peak, peer)) self.log.info("Loop end in sync job") async def _sync(self) -> None: """ Wallet has fallen far behind (or is starting up for the first time), and must be synced up to the LCA of the blockchain. """ if self.wallet_state_manager is None or self.backup_initialized is False or self.server is None: return None highest_weight: uint128 = uint128(0) peak_height: uint32 = uint32(0) peak: Optional[HeaderBlock] = None potential_peaks: List[ Tuple[bytes32, HeaderBlock] ] = self.wallet_state_manager.sync_store.get_potential_peaks_tuples() self.log.info(f"Have collected {len(potential_peaks)} potential peaks") for header_hash, potential_peak_block in potential_peaks: if potential_peak_block.weight > highest_weight: highest_weight = potential_peak_block.weight peak_height = potential_peak_block.height peak = potential_peak_block if peak_height is None or peak_height == 0: return None if self.wallet_state_manager.peak is not None and highest_weight <= self.wallet_state_manager.peak.weight: self.log.info("Not performing sync, already caught up.") return None peers: List[WSChiaConnection] = self.server.get_full_node_connections() if len(peers) == 0: self.log.info("No peers to sync to") return None async with self.wallet_state_manager.blockchain.lock: fork_height = None if peak is not None: fork_height = self.wallet_state_manager.sync_store.get_potential_fork_point(peak.header_hash) our_peak_height = self.wallet_state_manager.blockchain.get_peak_height() ses_heigths = self.wallet_state_manager.blockchain.get_ses_heights() if len(ses_heigths) > 2 and our_peak_height is not None: ses_heigths.sort() max_fork_ses_height = ses_heigths[-3] # This is fork point in SES in case where fork was not detected if ( self.wallet_state_manager.blockchain.get_peak_height() is not None and fork_height == max_fork_ses_height ): peers = self.server.get_full_node_connections() for peer in peers: # Grab a block at peak + 1 and check if fork point is actually our current height potential_height = uint32(our_peak_height + 1) block_response: Optional[Any] = await peer.request_header_blocks( wallet_protocol.RequestHeaderBlocks(potential_height, potential_height) ) if block_response is not None and isinstance( block_response, wallet_protocol.RespondHeaderBlocks ): our_peak = self.wallet_state_manager.blockchain.get_peak() if ( our_peak is not None and block_response.header_blocks[0].prev_header_hash == our_peak.header_hash ): fork_height = our_peak_height break if fork_height is None: fork_height = uint32(0) await self.wallet_state_manager.blockchain.warmup(fork_height) batch_size = self.constants.MAX_BLOCK_COUNT_PER_REQUESTS advanced_peak = False for i in range(max(0, fork_height - 1), peak_height, batch_size): start_height = i end_height = min(peak_height, start_height + batch_size) peers = self.server.get_full_node_connections() added = False for peer in peers: try: added, advanced_peak = await self.fetch_blocks_and_validate( peer, uint32(start_height), uint32(end_height), None if advanced_peak else fork_height ) if added: break except Exception as e: await peer.close() exc = traceback.format_exc() self.log.error(f"Error while trying to fetch from peer:{e} {exc}") if not added: raise RuntimeError(f"Was not able to add blocks {start_height}-{end_height}") peak = self.wallet_state_manager.blockchain.get_peak() assert peak is not None self.wallet_state_manager.blockchain.clean_block_record( min( end_height - self.constants.BLOCKS_CACHE_SIZE, peak.height - self.constants.BLOCKS_CACHE_SIZE, ) ) async def fetch_blocks_and_validate( self, peer: WSChiaConnection, height_start: uint32, height_end: uint32, fork_point_with_peak: Optional[uint32], ) -> Tuple[bool, bool]: """ Returns whether the blocks validated, and whether the peak was advanced """ if self.wallet_state_manager is None: return False, False self.log.info(f"Requesting blocks {height_start}-{height_end}") request = RequestHeaderBlocks(uint32(height_start), uint32(height_end)) res: Optional[RespondHeaderBlocks] = await peer.request_header_blocks(request) if res is None or not isinstance(res, RespondHeaderBlocks): raise ValueError("Peer returned no response") header_blocks: List[HeaderBlock] = res.header_blocks advanced_peak = False if header_blocks is None: raise ValueError(f"No response from peer {peer}") if ( self.full_node_peer is not None and peer.peer_host == self.full_node_peer.host or peer.peer_host == "127.0.0.1" ): trusted = True pre_validation_results: Optional[List[PreValidationResult]] = None else: trusted = False pre_validation_results = await self.wallet_state_manager.blockchain.pre_validate_blocks_multiprocessing( header_blocks ) if pre_validation_results is None: return False, advanced_peak assert len(header_blocks) == len(pre_validation_results) for i in range(len(header_blocks)): header_block = header_blocks[i] if not trusted and pre_validation_results is not None and pre_validation_results[i].error is not None: raise ValidationError(Err(pre_validation_results[i].error)) fork_point_with_old_peak = None if advanced_peak else fork_point_with_peak if header_block.is_transaction_block: # Find additions and removals (additions, removals,) = await self.wallet_state_manager.get_filter_additions_removals( header_block, header_block.transactions_filter, fork_point_with_old_peak ) # Get Additions added_coins = await self.get_additions(peer, header_block, additions) if added_coins is None: raise ValueError("Failed to fetch additions") # Get removals removed_coins = await self.get_removals(peer, header_block, added_coins, removals) if removed_coins is None: raise ValueError("Failed to fetch removals") header_block_record = HeaderBlockRecord(header_block, added_coins, removed_coins) else: header_block_record = HeaderBlockRecord(header_block, [], []) start_t = time.time() if trusted: (result, error, fork_h,) = await self.wallet_state_manager.blockchain.receive_block( header_block_record, None, trusted, fork_point_with_old_peak ) else: assert pre_validation_results is not None (result, error, fork_h,) = await self.wallet_state_manager.blockchain.receive_block( header_block_record, pre_validation_results[i], trusted, fork_point_with_old_peak ) self.log.debug( f"Time taken to validate {header_block.height} with fork " f"{fork_point_with_old_peak}: {time.time() - start_t}" ) if result == ReceiveBlockResult.NEW_PEAK: advanced_peak = True self.wallet_state_manager.state_changed("new_block") elif result == ReceiveBlockResult.INVALID_BLOCK: raise ValueError("Value error peer sent us invalid block") if advanced_peak: await self.wallet_state_manager.create_more_puzzle_hashes() return True, advanced_peak def validate_additions( self, coins: List[Tuple[bytes32, List[Coin]]], proofs: Optional[List[Tuple[bytes32, bytes, Optional[bytes]]]], root, ): if proofs is None: # Verify root additions_merkle_set = MerkleSet() # Addition Merkle set contains puzzlehash and hash of all coins with that puzzlehash for puzzle_hash, coins_l in coins: additions_merkle_set.add_already_hashed(puzzle_hash) additions_merkle_set.add_already_hashed(hash_coin_list(coins_l)) additions_root = additions_merkle_set.get_root() if root != additions_root: return False else: for i in range(len(coins)): assert coins[i][0] == proofs[i][0] coin_list_1: List[Coin] = coins[i][1] puzzle_hash_proof: bytes32 = proofs[i][1] coin_list_proof: Optional[bytes32] = proofs[i][2] if len(coin_list_1) == 0: # Verify exclusion proof for puzzle hash not_included = confirm_not_included_already_hashed( root, coins[i][0], puzzle_hash_proof, ) if not_included is False: return False else: try: # Verify inclusion proof for coin list included = confirm_included_already_hashed( root, hash_coin_list(coin_list_1), coin_list_proof, ) if included is False: return False except AssertionError: return False try: # Verify inclusion proof for puzzle hash included = confirm_included_already_hashed( root, coins[i][0], puzzle_hash_proof, ) if included is False: return False except AssertionError: return False return True def validate_removals(self, coins, proofs, root): if proofs is None: # If there are no proofs, it means all removals were returned in the response. # we must find the ones relevant to our wallets. # Verify removals root removals_merkle_set = MerkleSet() for name_coin in coins: # TODO review all verification name, coin = name_coin if coin is not None: removals_merkle_set.add_already_hashed(coin.name()) removals_root = removals_merkle_set.get_root() if root != removals_root: return False else: # This means the full node has responded only with the relevant removals # for our wallet. Each merkle proof must be verified. if len(coins) != len(proofs): return False for i in range(len(coins)): # Coins are in the same order as proofs if coins[i][0] != proofs[i][0]: return False coin = coins[i][1] if coin is None: # Verifies merkle proof of exclusion not_included = confirm_not_included_already_hashed( root, coins[i][0], proofs[i][1], ) if not_included is False: return False else: # Verifies merkle proof of inclusion of coin name if coins[i][0] != coin.name(): return False included = confirm_included_already_hashed( root, coin.name(), proofs[i][1], ) if included is False: return False return True async def get_additions(self, peer: WSChiaConnection, block_i, additions) -> Optional[List[Coin]]: if len(additions) > 0: additions_request = RequestAdditions(block_i.height, block_i.header_hash, additions) additions_res: Optional[Union[RespondAdditions, RejectAdditionsRequest]] = await peer.request_additions( additions_request ) if additions_res is None: await peer.close() return None elif isinstance(additions_res, RespondAdditions): validated = self.validate_additions( additions_res.coins, additions_res.proofs, block_i.foliage_transaction_block.additions_root, ) if not validated: await peer.close() return None added_coins = [] for ph_coins in additions_res.coins: ph, coins = ph_coins added_coins.extend(coins) return added_coins elif isinstance(additions_res, RejectRemovalsRequest): await peer.close() return None return None else: added_coins = [] return added_coins async def get_removals(self, peer: WSChiaConnection, block_i, additions, removals) -> Optional[List[Coin]]: assert self.wallet_state_manager is not None request_all_removals = False # Check if we need all removals for coin in additions: puzzle_store = self.wallet_state_manager.puzzle_store record_info: Optional[DerivationRecord] = await puzzle_store.get_derivation_record_for_puzzle_hash( coin.puzzle_hash.hex() ) if record_info is not None and record_info.wallet_type == WalletType.COLOURED_COIN: # TODO why ? request_all_removals = True break if record_info is not None and record_info.wallet_type == WalletType.DISTRIBUTED_ID: request_all_removals = True break if len(removals) > 0 or request_all_removals: if request_all_removals: removals_request = wallet_protocol.RequestRemovals(block_i.height, block_i.header_hash, None) else: removals_request = wallet_protocol.RequestRemovals(block_i.height, block_i.header_hash, removals) removals_res: Optional[Union[RespondRemovals, RejectRemovalsRequest]] = await peer.request_removals( removals_request ) if removals_res is None: return None elif isinstance(removals_res, RespondRemovals): validated = self.validate_removals( removals_res.coins, removals_res.proofs, block_i.foliage_transaction_block.removals_root, ) if validated is False: await peer.close() return None removed_coins = [] for _, coins_l in removals_res.coins: if coins_l is not None: removed_coins.append(coins_l) return removed_coins elif isinstance(removals_res, RejectRemovalsRequest): return None else: return None else: return []