import unicodedata
from hashlib import pbkdf2_hmac
from secrets import token_bytes
from sys import platform
from typing import Iterator, List, Optional, Tuple

import keyring as keyring_main
import pkg_resources
from bitstring import BitArray
from blspy import AugSchemeMPL, G1Element, PrivateKey
from keyrings.cryptfile.cryptfile import CryptFileKeyring

from chia.util.hash import std_hash

# Highest key index scanned when enumerating keychain entries (inclusive).
MAX_KEYS = 100

# Select the keyring backend for the current OS. After this block the module
# level name `keyring` is either the `keyring` package itself (with a backend
# installed via set_keyring) or a CryptFileKeyring instance; both expose
# get_password / set_password / delete_password.
if platform == "win32" or platform == "cygwin":
    import keyring.backends.Windows

    keyring.set_keyring(keyring.backends.Windows.WinVaultKeyring())
elif platform == "darwin":
    import keyring.backends.macOS

    keyring.set_keyring(keyring.backends.macOS.Keyring())
elif platform == "linux":
    keyring = CryptFileKeyring()
    # NOTE(review): the encryption key of the file keyring is a hard-coded
    # constant, so it offers no real confidentiality. Changing it would make
    # existing key stores unreadable, so it is intentionally left as is.
    keyring.keyring_key = "your keyring password"  # type: ignore
else:
    keyring = keyring_main


def bip39_word_list() -> str:
    """
    Returns the contents of the packaged "english.txt" word list as one string
    (one word per line).
    """
    return pkg_resources.resource_string(__name__, "english.txt").decode()


def generate_mnemonic() -> str:
    """
    Generates a new mnemonic sentence from 32 bytes of cryptographically
    secure random entropy.
    """
    return bytes_to_mnemonic(token_bytes(32))


def bytes_to_mnemonic(mnemonic_bytes: bytes) -> str:
    """
    Encodes entropy bytes as a space separated mnemonic sentence.

    The entropy is followed by a checksum made of the first len(entropy) // 4
    bits of std_hash(entropy); the resulting bit string is cut into 11-bit
    groups, each of which indexes one word of the word list.

    Raises ValueError if the entropy is not 16, 20, 24, 28 or 32 bytes long.
    """
    if len(mnemonic_bytes) not in [16, 20, 24, 28, 32]:
        raise ValueError(
            f"Data length should be one of the following: [16, 20, 24, 28, 32], but it is {len(mnemonic_bytes)}."
        )
    word_list = bip39_word_list().splitlines()
    CS = len(mnemonic_bytes) // 4

    checksum = BitArray(bytes(std_hash(mnemonic_bytes)))[:CS]
    bitarray = BitArray(mnemonic_bytes) + checksum

    assert len(bitarray) % 11 == 0

    # Every 11-bit group is the index of one word.
    mnemonics = [word_list[bitarray[start : start + 11].uint] for start in range(0, len(bitarray), 11)]
    return " ".join(mnemonics)


def bytes_from_mnemonic(mnemonic_str: str) -> bytes:
    """
    Decodes a mnemonic sentence back into its entropy bytes and verifies the
    checksum.

    The words must be separated by exactly one space each, which is the form
    produced by bytes_to_mnemonic.

    Raises ValueError if the number of words is not 12, 15, 18, 21 or 24, or
    if the checksum does not match. Raises KeyError if a word is not part of
    the word list.
    """
    mnemonic: List[str] = mnemonic_str.split(" ")
    if len(mnemonic) not in [12, 15, 18, 21, 24]:
        raise ValueError("Invalid mnemonic length")

    word_list = {word: i for i, word in enumerate(bip39_word_list().splitlines())}
    bit_array = BitArray()
    for word in mnemonic:
        bit_array.append(BitArray(uint=word_list[word], length=11))

    # One checksum bit per three words; everything else is entropy.
    CS: int = len(mnemonic) // 3
    ENT: int = len(mnemonic) * 11 - CS
    assert len(bit_array) == len(mnemonic) * 11
    assert ENT % 32 == 0

    entropy_bytes = bit_array[:ENT].bytes
    checksum_bytes = bit_array[ENT:]
    checksum = BitArray(std_hash(entropy_bytes))[:CS]

    assert len(checksum_bytes) == CS

    if checksum != checksum_bytes:
        raise ValueError("Invalid order of mnemonic words")

    return entropy_bytes


def mnemonic_to_seed(mnemonic: str, passphrase: str) -> bytes:
    """
    Uses BIP39 standard to derive a seed from a mnemonic sentence.

    Runs PBKDF2-HMAC-SHA512 with 2048 iterations over the NFKD normalized
    mnemonic, salted with the NFKD normalized string "mnemonic" + passphrase,
    and returns the 64 byte result.
    """
    salt_str: str = "mnemonic" + passphrase
    salt = unicodedata.normalize("NFKD", salt_str).encode("utf-8")
    mnemonic_normalized = unicodedata.normalize("NFKD", mnemonic).encode("utf-8")
    seed = pbkdf2_hmac("sha512", mnemonic_normalized, salt, 2048)

    assert len(seed) == 64
    return seed


class Keychain:
    """
    Stores wallet keys in the OS keyring selected at module import.

    Each entry is kept under a per-index user name and holds a hex string made
    of the serialized public key (G1Element) followed by the entropy bytes of
    the mnemonic. The passphrase is never stored, so callers must supply
    candidate passphrases to get private keys back. Entries with an index of
    0 through MAX_KEYS are considered when enumerating keys.
    """

    testing: bool
    user: str

    def __init__(self, user: str = "user-chia-1.8", testing: bool = False):
        self.testing = testing
        self.user = user

    def _get_service(self) -> str:
        """
        Returns the keyring service name. Tests use a different name, so they
        do not touch the real keys.
        """
        if self.testing:
            return f"chia-{self.user}-test"
        return f"chia-{self.user}"

    def _get_pk_and_entropy(self, user: str) -> Optional[Tuple[G1Element, bytes]]:
        """
        Returns the keychain contents for a specific 'user' (key index). The contents
        include an G1Element and the entropy required to generate the private key.
        Note that generating the actual private key also requires the passphrase.
        Returns None if nothing (or an empty string) is stored for that user.
        """
        read_str = keyring.get_password(self._get_service(), user)
        if not read_str:
            return None
        str_bytes = bytes.fromhex(read_str)
        # Layout: G1Element.SIZE bytes of public key, then the entropy.
        public_key = G1Element.from_bytes(str_bytes[: G1Element.SIZE])
        entropy = str_bytes[G1Element.SIZE :]
        return (public_key, entropy)

    def _get_private_key_user(self, index: int) -> str:
        """
        Returns the keychain user string for a key index.
        """
        if self.testing:
            return f"wallet-{self.user}-test-{index}"
        return f"wallet-{self.user}-{index}"

    def _get_free_private_key_index(self) -> int:
        """
        Get the index of the first free spot in the keychain.
        """
        # NOTE(review): this search is not capped at MAX_KEYS, while all the
        # enumerating methods stop at MAX_KEYS; a key stored at a higher index
        # would not be listed. Confirm whether a cap is wanted here.
        index = 0
        while self._get_pk_and_entropy(self._get_private_key_user(index)) is not None:
            index += 1
        return index

    def _iter_pk_and_entropy(self) -> Iterator[Tuple[int, G1Element, bytes]]:
        """
        Yields (index, public key, entropy) for every occupied keychain slot
        with an index from 0 to MAX_KEYS (inclusive), in index order.
        """
        for index in range(MAX_KEYS + 1):
            pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
            if pkent is not None:
                pk, ent = pkent
                yield index, pk, ent

    @staticmethod
    def _derive_private_key(entropy: bytes, passphrase: str) -> PrivateKey:
        """
        Derives the private key belonging to the given entropy and passphrase.
        """
        seed = mnemonic_to_seed(bytes_to_mnemonic(entropy), passphrase)
        return AugSchemeMPL.key_gen(seed)

    def add_private_key(self, mnemonic: str, passphrase: str) -> PrivateKey:
        """
        Adds a private key to the keychain, with the given entropy and passphrase. The
        keychain itself will store the public key, and the entropy bytes,
        but not the passphrase. If a key with the same public key fingerprint
        is already stored, nothing is written. The derived key is returned in
        both cases.
        """
        seed = mnemonic_to_seed(mnemonic, passphrase)
        entropy = bytes_from_mnemonic(mnemonic)
        key = AugSchemeMPL.key_gen(seed)
        fingerprint = key.get_g1().get_fingerprint()

        if any(pk.get_fingerprint() == fingerprint for pk in self.get_all_public_keys()):
            # Prevents duplicate add
            return key

        index = self._get_free_private_key_index()
        keyring.set_password(
            self._get_service(),
            self._get_private_key_user(index),
            bytes(key.get_g1()).hex() + entropy.hex(),
        )
        return key

    def get_first_private_key(self, passphrases: Optional[List[str]] = None) -> Optional[Tuple[PrivateKey, bytes]]:
        """
        Returns the first key in the keychain that has one of the passed in passphrases,
        together with its entropy bytes, or None if there is no such key.
        When no passphrases are given, only the empty passphrase is tried.
        """
        if passphrases is None:
            passphrases = [""]
        for _, pk, ent in self._iter_pk_and_entropy():
            for pp in passphrases:
                key = self._derive_private_key(ent, pp)
                if key.get_g1() == pk:
                    return (key, ent)
        return None

    def get_private_key_by_fingerprint(
        self, fingerprint: int, passphrases: Optional[List[str]] = None
    ) -> Optional[Tuple[PrivateKey, bytes]]:
        """
        Return first private key which have the given public key fingerprint,
        together with its entropy bytes. Returns None if no stored public key
        has that fingerprint, or if none of the passphrases reproduces it.
        When no passphrases are given, only the empty passphrase is tried.
        """
        if passphrases is None:
            passphrases = [""]
        for _, pk, ent in self._iter_pk_and_entropy():
            # Skip the expensive key derivation for entries that cannot match.
            if pk.get_fingerprint() != fingerprint:
                continue
            for pp in passphrases:
                key = self._derive_private_key(ent, pp)
                # Only hand out a private key that really belongs to the
                # stored public key, i.e. the passphrase was the right one.
                if key.get_g1() == pk:
                    return (key, ent)
        return None

    def get_all_private_keys(self, passphrases: Optional[List[str]] = None) -> List[Tuple[PrivateKey, bytes]]:
        """
        Returns all private keys which can be retrieved, with the given passphrases.
        A tuple of key, and entropy bytes (i.e. mnemonic) is returned for each key.
        When no passphrases are given, only the empty passphrase is tried.
        """
        if passphrases is None:
            passphrases = [""]
        all_keys: List[Tuple[PrivateKey, bytes]] = []
        for _, pk, ent in self._iter_pk_and_entropy():
            for pp in passphrases:
                key = self._derive_private_key(ent, pp)
                if key.get_g1() == pk:
                    all_keys.append((key, ent))
                    # One entry per stored key, even if a passphrase repeats.
                    break
        return all_keys

    def get_all_public_keys(self) -> List[G1Element]:
        """
        Returns all public keys.
        """
        return [pk for _, pk, _ in self._iter_pk_and_entropy()]

    def get_first_public_key(self) -> Optional[G1Element]:
        """
        Returns the first public key, or None if the keychain is empty.
        """
        for _, pk, _ in self._iter_pk_and_entropy():
            return pk
        return None

    def delete_key_by_fingerprint(self, fingerprint: int):
        """
        Deletes all keys which have the given public key fingerprint.
        """
        for index, pk, _ in self._iter_pk_and_entropy():
            if pk.get_fingerprint() == fingerprint:
                keyring.delete_password(self._get_service(), self._get_private_key_user(index))

    def delete_all_keys(self):
        """
        Deletes all keys from the keychain.

        Every index from 0 to MAX_KEYS + 1 is visited; beyond that the scan
        goes on for as long as entries are found and deleted successfully.
        """
        index = 0
        while True:
            user = self._get_private_key_user(index)
            pkent = None
            delete_failed = False
            try:
                pkent = self._get_pk_and_entropy(user)
                keyring.delete_password(self._get_service(), user)
            except Exception:
                # Deliberately best effort: some platforms might throw on no
                # existing key, and the remaining slots must still be visited.
                delete_failed = True

            # Stop when there are no more keys to delete
            if (pkent is None or delete_failed) and index > MAX_KEYS:
                break
            index += 1