import asyncio import concurrent import logging from concurrent.futures.thread import ThreadPoolExecutor from pathlib import Path from typing import Callable, Dict, List, Optional, Set, Tuple from blspy import G1Element import chia.server.ws_connection as ws # lgtm [py/import-and-import-from] from chia.consensus.constants import ConsensusConstants from chia.plotting.plot_tools import PlotInfo from chia.plotting.plot_tools import add_plot_directory as add_plot_directory_pt from chia.plotting.plot_tools import get_plot_directories as get_plot_directories_pt from chia.plotting.plot_tools import load_plots from chia.plotting.plot_tools import remove_plot_directory as remove_plot_directory_pt log = logging.getLogger(__name__) class Harvester: provers: Dict[Path, PlotInfo] failed_to_open_filenames: Dict[Path, int] no_key_filenames: Set[Path] farmer_public_keys: List[G1Element] pool_public_keys: List[G1Element] root_path: Path _is_shutdown: bool executor: ThreadPoolExecutor state_changed_callback: Optional[Callable] cached_challenges: List constants: ConsensusConstants _refresh_lock: asyncio.Lock def __init__(self, root_path: Path, config: Dict, constants: ConsensusConstants): self.root_path = root_path # From filename to prover self.provers = {} self.failed_to_open_filenames = {} self.no_key_filenames = set() self._is_shutdown = False self.farmer_public_keys = [] self.pool_public_keys = [] self.match_str = None self.show_memo: bool = False self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=config["num_threads"]) self.state_changed_callback = None self.server = None self.constants = constants self.cached_challenges = [] self.log = log self.state_changed_callback: Optional[Callable] = None self.last_load_time: float = 0 self.plot_load_frequency = config.get("plot_loading_frequency_seconds", 120) async def _start(self): self._refresh_lock = asyncio.Lock() def _close(self): self._is_shutdown = True self.executor.shutdown(wait=True) async def _await_closed(self): pass def _set_state_changed_callback(self, callback: Callable): self.state_changed_callback = callback def _state_changed(self, change: str): if self.state_changed_callback is not None: self.state_changed_callback(change) def on_disconnect(self, connection: ws.WSChiaConnection): self.log.info(f"peer disconnected {connection.get_peer_info()}") self._state_changed("close_connection") def get_plots(self) -> Tuple[List[Dict], List[str], List[str]]: response_plots: List[Dict] = [] for path, plot_info in self.provers.items(): prover = plot_info.prover response_plots.append( { "filename": str(path), "size": prover.get_size(), "plot-seed": prover.get_id(), "pool_public_key": plot_info.pool_public_key, "pool_contract_puzzle_hash": plot_info.pool_contract_puzzle_hash, "plot_public_key": plot_info.plot_public_key, "file_size": plot_info.file_size, "time_modified": plot_info.time_modified, } ) return ( response_plots, [str(s) for s, _ in self.failed_to_open_filenames.items()], [str(s) for s in self.no_key_filenames], ) async def refresh_plots(self): locked: bool = self._refresh_lock.locked() changed: bool = False if not locked: async with self._refresh_lock: # Avoid double refreshing of plots (changed, self.provers, self.failed_to_open_filenames, self.no_key_filenames,) = load_plots( self.provers, self.failed_to_open_filenames, self.farmer_public_keys, self.pool_public_keys, self.match_str, self.show_memo, self.root_path, ) if changed: self._state_changed("plots") def delete_plot(self, str_path: str): path = Path(str_path).resolve() if path in self.provers: del self.provers[path] # Remove absolute and relative paths if path.exists(): path.unlink() self._state_changed("plots") return True async def add_plot_directory(self, str_path: str) -> bool: add_plot_directory_pt(str_path, self.root_path) await self.refresh_plots() return True async def get_plot_directories(self) -> List[str]: return get_plot_directories_pt(self.root_path) async def remove_plot_directory(self, str_path: str) -> bool: remove_plot_directory_pt(str_path, self.root_path) return True def set_server(self, server): self.server = server