import os import shutil from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import yaml from chia import __version__ from chia.consensus.coinbase import create_puzzlehash_for_pk from chia.ssl.create_ssl import generate_ca_signed_cert, get_chia_ca_crt_key, make_ca_cert from chia.util.bech32m import encode_puzzle_hash from chia.util.config import ( create_default_chia_config, initial_config_file, load_config, save_config, unflatten_properties, ) from chia.util.ints import uint32 from chia.util.keychain import Keychain from chia.util.path import mkdir from chia.wallet.derive_keys import master_sk_to_pool_sk, master_sk_to_wallet_sk private_node_names = {"full_node", "wallet", "farmer", "harvester", "timelord", "daemon"} public_node_names = {"full_node", "wallet", "farmer", "introducer", "timelord"} def dict_add_new_default(updated: Dict, default: Dict, do_not_migrate_keys: Dict[str, Any]): for k in do_not_migrate_keys: if k in updated and do_not_migrate_keys[k] == "": updated.pop(k) for k, v in default.items(): ignore = False if k in do_not_migrate_keys: do_not_data = do_not_migrate_keys[k] if isinstance(do_not_data, dict): ignore = False else: ignore = True if isinstance(v, dict) and k in updated and ignore is False: # If there is an intermediate key with empty string value, do not migrate all descendants if do_not_migrate_keys.get(k, None) == "": do_not_migrate_keys[k] = v dict_add_new_default(updated[k], default[k], do_not_migrate_keys.get(k, {})) elif k not in updated or ignore is True: updated[k] = v def check_keys(new_root: Path) -> None: keychain: Keychain = Keychain() all_sks = keychain.get_all_private_keys() if len(all_sks) == 0: print("No keys are present in the keychain. Generate them with 'chia keys generate'") return None config: Dict = load_config(new_root, "config.yaml") pool_child_pubkeys = [master_sk_to_pool_sk(sk).get_g1() for sk, _ in all_sks] all_targets = [] stop_searching_for_farmer = "xch_target_address" not in config["farmer"] stop_searching_for_pool = "xch_target_address" not in config["pool"] number_of_ph_to_search = 500 selected = config["selected_network"] prefix = config["network_overrides"]["config"][selected]["address_prefix"] for i in range(number_of_ph_to_search): if stop_searching_for_farmer and stop_searching_for_pool and i > 0: break for sk, _ in all_sks: all_targets.append( encode_puzzle_hash(create_puzzlehash_for_pk(master_sk_to_wallet_sk(sk, uint32(i)).get_g1()), prefix) ) if all_targets[-1] == config["farmer"].get("xch_target_address"): stop_searching_for_farmer = True if all_targets[-1] == config["pool"].get("xch_target_address"): stop_searching_for_pool = True # Set the destinations if "xch_target_address" not in config["farmer"]: print(f"Setting the xch destination address for coinbase fees reward to {all_targets[0]}") config["farmer"]["xch_target_address"] = all_targets[0] elif config["farmer"]["xch_target_address"] not in all_targets: print( f"WARNING: using a farmer address which we don't have the private" f" keys for. We searched the first {number_of_ph_to_search} addresses. Consider overriding " f"{config['farmer']['xch_target_address']} with {all_targets[0]}" ) if "pool" not in config: config["pool"] = {} if "xch_target_address" not in config["pool"]: print(f"Setting the xch destination address for coinbase reward to {all_targets[0]}") config["pool"]["xch_target_address"] = all_targets[0] elif config["pool"]["xch_target_address"] not in all_targets: print( f"WARNING: using a pool address which we don't have the private" f" keys for. We searched the first {number_of_ph_to_search} addresses. Consider overriding " f"{config['pool']['xch_target_address']} with {all_targets[0]}" ) # Set the pool pks in the farmer pool_pubkeys_hex = set(bytes(pk).hex() for pk in pool_child_pubkeys) if "pool_public_keys" in config["farmer"]: for pk_hex in config["farmer"]["pool_public_keys"]: # Add original ones in config pool_pubkeys_hex.add(pk_hex) config["farmer"]["pool_public_keys"] = pool_pubkeys_hex save_config(new_root, "config.yaml", config) def copy_files_rec(old_path: Path, new_path: Path): if old_path.is_file(): print(f"{new_path}") mkdir(new_path.parent) shutil.copy(old_path, new_path) elif old_path.is_dir(): for old_path_child in old_path.iterdir(): new_path_child = new_path / old_path_child.name copy_files_rec(old_path_child, new_path_child) def migrate_from( old_root: Path, new_root: Path, manifest: List[str], do_not_migrate_settings: List[str], ): """ Copy all the files in "manifest" to the new config directory. """ if old_root == new_root: print("same as new path, exiting") return 1 if not old_root.is_dir(): print(f"{old_root} not found - this is ok if you did not install this version") return 0 print(f"\n{old_root} found") print(f"Copying files from {old_root} to {new_root}\n") for f in manifest: old_path = old_root / f new_path = new_root / f copy_files_rec(old_path, new_path) # update config yaml with new keys config: Dict = load_config(new_root, "config.yaml") config_str: str = initial_config_file("config.yaml") default_config: Dict = yaml.safe_load(config_str) flattened_keys = unflatten_properties({k: "" for k in do_not_migrate_settings}) dict_add_new_default(config, default_config, flattened_keys) save_config(new_root, "config.yaml", config) create_all_ssl(new_root) return 1 def create_all_ssl(root: Path): # remove old key and crt config_dir = root / "config" old_key_path = config_dir / "trusted.key" old_crt_path = config_dir / "trusted.crt" if old_key_path.exists(): print(f"Old key not needed anymore, deleting {old_key_path}") os.remove(old_key_path) if old_crt_path.exists(): print(f"Old crt not needed anymore, deleting {old_crt_path}") os.remove(old_crt_path) ssl_dir = config_dir / "ssl" if not ssl_dir.exists(): ssl_dir.mkdir() ca_dir = ssl_dir / "ca" if not ca_dir.exists(): ca_dir.mkdir() private_ca_key_path = ca_dir / "private_ca.key" private_ca_crt_path = ca_dir / "private_ca.crt" chia_ca_crt, chia_ca_key = get_chia_ca_crt_key() chia_ca_crt_path = ca_dir / "chia_ca.crt" chia_ca_key_path = ca_dir / "chia_ca.key" chia_ca_crt_path.write_bytes(chia_ca_crt) chia_ca_key_path.write_bytes(chia_ca_key) if not private_ca_key_path.exists() or not private_ca_crt_path.exists(): # Create private CA print(f"Can't find private CA, creating a new one in {root} to generate TLS certificates") make_ca_cert(private_ca_crt_path, private_ca_key_path) # Create private certs for each node ca_key = private_ca_key_path.read_bytes() ca_crt = private_ca_crt_path.read_bytes() generate_ssl_for_nodes(ssl_dir, ca_crt, ca_key, True) else: # This is entered when user copied over private CA print(f"Found private CA in {root}, using it to generate TLS certificates") ca_key = private_ca_key_path.read_bytes() ca_crt = private_ca_crt_path.read_bytes() generate_ssl_for_nodes(ssl_dir, ca_crt, ca_key, True) chia_ca_crt, chia_ca_key = get_chia_ca_crt_key() generate_ssl_for_nodes(ssl_dir, chia_ca_crt, chia_ca_key, False, overwrite=False) def generate_ssl_for_nodes(ssl_dir: Path, ca_crt: bytes, ca_key: bytes, private: bool, overwrite=True): if private: names = private_node_names else: names = public_node_names for node_name in names: node_dir = ssl_dir / node_name if not node_dir.exists(): node_dir.mkdir() if private: prefix = "private" else: prefix = "public" key_path = node_dir / f"{prefix}_{node_name}.key" crt_path = node_dir / f"{prefix}_{node_name}.crt" if key_path.exists() and crt_path.exists() and overwrite is False: continue generate_ca_signed_cert(ca_crt, ca_key, crt_path, key_path) def copy_cert_files(cert_path: Path, new_path: Path): for ext in "*.crt", "*.key": for old_path_child in cert_path.glob(ext): new_path_child = new_path / old_path_child.name copy_files_rec(old_path_child, new_path_child) def init(create_certs: Optional[Path], root_path: Path): if create_certs is not None: if root_path.exists(): if os.path.isdir(create_certs): ca_dir: Path = root_path / "config/ssl/ca" if ca_dir.exists(): print(f"Deleting your OLD CA in {ca_dir}") shutil.rmtree(ca_dir) print(f"Copying your CA from {create_certs} to {ca_dir}") copy_cert_files(create_certs, ca_dir) create_all_ssl(root_path) else: print(f"** Directory {create_certs} does not exist **") else: print(f"** {root_path} does not exist. Executing core init **") # sanity check here to prevent infinite recursion if chia_init(root_path) == 0 and root_path.exists(): return init(create_certs, root_path) print(f"** {root_path} was not created. Exiting **") return -1 else: return chia_init(root_path) def chia_version_number() -> Tuple[str, str, str, str]: scm_full_version = __version__ left_full_version = scm_full_version.split("+") version = left_full_version[0].split(".") scm_major_version = version[0] scm_minor_version = version[1] if len(version) > 2: smc_patch_version = version[2] patch_release_number = smc_patch_version else: smc_patch_version = "" major_release_number = scm_major_version minor_release_number = scm_minor_version dev_release_number = "" # If this is a beta dev release - get which beta it is if "0b" in scm_minor_version: original_minor_ver_list = scm_minor_version.split("0b") major_release_number = str(1 - int(scm_major_version)) # decrement the major release for beta minor_release_number = scm_major_version patch_release_number = original_minor_ver_list[1] if smc_patch_version and "dev" in smc_patch_version: dev_release_number = "." + smc_patch_version elif "0rc" in version[1]: original_minor_ver_list = scm_minor_version.split("0rc") major_release_number = str(1 - int(scm_major_version)) # decrement the major release for release candidate minor_release_number = str(int(scm_major_version) + 1) # RC is 0.2.1 for RC 1 patch_release_number = original_minor_ver_list[1] if smc_patch_version and "dev" in smc_patch_version: dev_release_number = "." + smc_patch_version else: major_release_number = scm_major_version minor_release_number = scm_minor_version patch_release_number = smc_patch_version dev_release_number = "" install_release_number = major_release_number + "." + minor_release_number if len(patch_release_number) > 0: install_release_number += "." + patch_release_number if len(dev_release_number) > 0: install_release_number += dev_release_number return major_release_number, minor_release_number, patch_release_number, dev_release_number def chia_minor_release_number(): res = int(chia_version_number()[2]) print(f"Install release number: {res}") return res def chia_full_version_str() -> str: major, minor, patch, dev = chia_version_number() return f"{major}.{minor}.{patch}{dev}" def chia_init(root_path: Path): if os.environ.get("CHIA_ROOT", None) is not None: print( f"warning, your CHIA_ROOT is set to {os.environ['CHIA_ROOT']}. " f"Please unset the environment variable and run chia init again\n" f"or manually migrate config.yaml" ) print(f"Chia directory {root_path}") if root_path.is_dir() and Path(root_path / "config" / "config.yaml").exists(): # This is reached if CHIA_ROOT is set, or if user has run chia init twice # before a new update. check_keys(root_path) print(f"{root_path} already exists, no migration action taken") return -1 create_default_chia_config(root_path) create_all_ssl(root_path) check_keys(root_path) print("") print("To see your keys, run 'chia keys show --show-mnemonic-seed'") return 0