Keychain with pbkdf2 and passphrase support

This commit is contained in:
Mariano Sorgente 2020-06-27 20:48:35 +09:00 committed by Gene Hoffman
parent a1b4392bb1
commit 2fc6f89603
8 changed files with 208 additions and 111 deletions

View File

@ -63,7 +63,6 @@ def check_keys(new_root):
)
config["farmer"]["xch_target_puzzle_hash"] = all_targets[0]
elif config["farmer"]["xch_target_puzzle_hash"] not in all_targets:
print("Target:", config["farmer"]["xch_target_puzzle_hash"])
assert len(config["farmer"]["xch_target_puzzle_hash"]) == 64
print(
"WARNING: farmer using a puzzle hash which we don't have the private keys for"

View File

@ -1,11 +1,10 @@
from pathlib import Path
from blspy import ExtendedPrivateKey
from src.cmds.init import check_keys
from src.util.keychain import (
generate_mnemonic,
bytes_to_mnemonic,
Keychain,
seed_from_mnemonic,
bytes_from_mnemonic,
)
from src.types.BLSSignature import BLSPublicKey
from src.consensus.coinbase import create_puzzlehash_for_pk
@ -98,16 +97,15 @@ def add_private_key_seed(mnemonic):
"""
try:
seed = seed_from_mnemonic(mnemonic)
fingerprint = (
ExtendedPrivateKey.from_seed(seed).get_public_key().get_fingerprint()
)
entropy = bytes_from_mnemonic(mnemonic)
passphrase = ""
esk = keychain.add_private_key(entropy, passphrase)
fingerprint = esk.get_public_key().get_fingerprint()
print(
f"Adding private key with public key fingerprint {fingerprint} and mnemonic"
f"Added private key with public key fingerprint {fingerprint} and mnemonic"
)
print(f"{mnemonic_to_string(mnemonic)}")
keychain.add_private_key_seed(seed)
except ValueError as e:
print(e)
return

View File

@ -2,13 +2,12 @@ import asyncio
import logging
import time
from pathlib import Path
from blspy import ExtendedPrivateKey
from typing import List, Optional, Tuple, Dict, Callable
from src.util.byte_types import hexstr_to_bytes
from src.util.keychain import (
seed_from_mnemonic,
bytes_from_mnemonic,
generate_mnemonic,
bytes_to_mnemonic,
)
@ -470,9 +469,9 @@ class WalletRpcApi:
if "mnemonic" in request:
# Adding a key from 24 word mnemonic
mnemonic = request["mnemonic"]
seed = seed_from_mnemonic(mnemonic)
self.service.keychain.add_private_key_seed(seed)
esk = ExtendedPrivateKey.from_seed(seed)
entropy = bytes_from_mnemonic(mnemonic)
passphrase = ""
esk = self.service.keychain.add_private_key(entropy, passphrase)
else:
return {"success": False}

View File

@ -6,10 +6,11 @@ import pkg_resources
from bitstring import BitArray
from blspy import ExtendedPrivateKey, ExtendedPublicKey
from src.util.byte_types import hexstr_to_bytes
from src.util.hash import std_hash
from sys import platform
from keyrings.cryptfile.cryptfile import CryptFileKeyring
from hashlib import pbkdf2_hmac
import unicodedata
MAX_KEYS = 100
@ -33,16 +34,16 @@ def bip39_word_list() -> str:
def generate_mnemonic() -> List[str]:
seed_bytes = token_bytes(32)
mnemonic = bytes_to_mnemonic(seed_bytes)
mnemonic_bytes = token_bytes(32)
mnemonic = bytes_to_mnemonic(mnemonic_bytes)
return mnemonic
def bytes_to_mnemonic(seed_bytes: bytes):
seed_array = bytearray(seed_bytes)
def bytes_to_mnemonic(mnemonic_bytes: bytes):
seed_array = bytearray(mnemonic_bytes)
word_list = bip39_word_list().splitlines()
checksum = bytes(std_hash(seed_bytes))
checksum = bytes(std_hash(mnemonic_bytes))
seed_array.append(checksum[0])
bytes_for_mnemonic = bytes(seed_array)
@ -60,7 +61,7 @@ def bytes_to_mnemonic(seed_bytes: bytes):
return mnemonics
def seed_from_mnemonic(mnemonic: List[str]):
def bytes_from_mnemonic(mnemonic: List[str]):
word_list = {word: i for i, word in enumerate(bip39_word_list().splitlines())}
bit_array = BitArray()
for i in range(0, 24):
@ -79,6 +80,18 @@ def seed_from_mnemonic(mnemonic: List[str]):
return entropy_bytes
def entropy_to_seed(entropy: bytes, passphrase):
"""
Uses BIP39 standard to derive a seed from entropy bytes.
"""
salt_str: str = "mnemonic" + passphrase
salt = unicodedata.normalize("NFKD", salt_str).encode("utf-8")
seed = pbkdf2_hmac("sha512", entropy, salt, 2048)
assert len(seed) == 64
return seed
class Keychain:
"""
The keychain stores two types of keys: private keys, which are ExtendedPrivateKeys from blspy,
@ -98,15 +111,33 @@ class Keychain:
self.user = user
def _get_service(self):
"""
The keychain stores keys under a different name for tests.
"""
if self.testing:
return f"chia-{self.user}-test"
else:
return f"chia-{self.user}"
def _get_stored_entropy(self, user: str):
return keyring.get_password(self._get_service(), user)
def _get_pk_and_entropy(
self, user: str
) -> Optional[Tuple[ExtendedPublicKey, bytes]]:
"""
Returns the keychain conntents for a specific 'user' (key index). The contents
include an ExtendedPublicKey and the entropy required to generate the private key.
Note that generating the actual private key also requires the passphrase.
"""
epks = ExtendedPublicKey.EXTENDED_PUBLIC_KEY_SIZE
read_str = keyring.get_password(self._get_service(), user)
if read_str is None or len(read_str) == 0:
return None
str_bytes = bytes.fromhex(read_str)
return (ExtendedPublicKey.from_bytes(str_bytes[:epks]), str_bytes[epks:])
def _get_private_key_seed_user(self, index: int):
def _get_private_key_user(self, index: int):
"""
Returns the keychain user string for a key index.
"""
if self.testing:
return f"wallet-{self.user}-test-{index}"
else:
@ -114,72 +145,112 @@ class Keychain:
return f"wallet-{self.user}-raw-{index}"
def _get_free_private_key_seed_index(self) -> int:
def _get_free_private_key_index(self) -> int:
"""
Get the index of the first free spot in the keychain.
"""
index = 0
while True:
key = self._get_stored_entropy(self._get_private_key_seed_user(index))
if key is None:
pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
if pkent is None:
return index
index += 1
def add_private_key_seed(self, seed: bytes):
def add_private_key(self, entropy: bytes, passphrase: str) -> ExtendedPrivateKey:
"""
Adds a private key seed to the keychain. This is the best way to add keys, since they can
be backed up to mnemonics. A seed is used to generate a BLS ExtendedPrivateKey.
Adds a private key to the keychain, with the given entropy and passphrase. The
keychain itself will store the extended public key, and the entropy bytes,
but not the passphrase.
"""
index = self._get_free_private_key_seed_index()
seed = entropy_to_seed(entropy, passphrase)
index = self._get_free_private_key_index()
key = ExtendedPrivateKey.from_seed(seed)
if key.get_public_key().get_fingerprint() in [
fingerprint = key.get_public_key().get_fingerprint()
if fingerprint in [
epk.get_public_key().get_fingerprint() for epk in self.get_all_public_keys()
]:
# Prevents duplicate add
return
return key
keyring.set_password(
self._get_service(), self._get_private_key_seed_user(index), seed.hex()
self._get_service(),
self._get_private_key_user(index),
bytes(key.get_extended_public_key()).hex() + entropy.hex(),
)
return key
def get_first_private_key(
self,
self, passphrases: List[str] = [""]
) -> Optional[Tuple[ExtendedPrivateKey, Optional[bytes]]]:
"""
Returns the first key in the keychain that has one of the passed in passphrases.
"""
index = 0
seed_hex = self._get_stored_entropy(self._get_private_key_seed_user(index))
pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
while index <= MAX_KEYS:
if seed_hex is not None and len(seed_hex) > 0:
key = ExtendedPrivateKey.from_seed(hexstr_to_bytes(seed_hex))
return (key, hexstr_to_bytes(seed_hex))
if pkent is not None:
epk, ent = pkent
for pp in passphrases:
seed = entropy_to_seed(ent, pp)
key = ExtendedPrivateKey.from_seed(seed)
if key.get_extended_public_key() == epk:
return (key, ent)
index += 1
seed_hex = self._get_stored_entropy(self._get_private_key_seed_user(index))
pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
return None
def get_all_private_keys(self) -> List[Tuple[ExtendedPrivateKey, bytes]]:
def get_all_private_keys(
self, passphrases: List[str] = [""]
) -> List[Tuple[ExtendedPrivateKey, bytes]]:
"""
Returns all private keys (both seed-derived keys and raw ExtendedPrivateKeys), and
the second value in the tuple is the bytes seed if it exists, otherwise None.
Returns all private keys which can be retrieved, with the given passphrases.
A tuple of key, and entropy bytes (i.e. mnemonic) is returned for each key.
"""
all_keys: List[Tuple[ExtendedPrivateKey, bytes]] = []
# Keys that have a seed are added first
index = 0
seed_hex = self._get_stored_entropy(self._get_private_key_seed_user(index))
pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
while index <= MAX_KEYS:
if seed_hex is not None and len(seed_hex) > 0:
key = ExtendedPrivateKey.from_seed(hexstr_to_bytes(seed_hex))
all_keys.append((key, hexstr_to_bytes(seed_hex)))
if pkent is not None:
epk, ent = pkent
for pp in passphrases:
seed = entropy_to_seed(ent, pp)
key = ExtendedPrivateKey.from_seed(seed)
if key.get_extended_public_key() == epk:
all_keys.append((key, ent))
index += 1
seed_hex = self._get_stored_entropy(self._get_private_key_seed_user(index))
pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
return all_keys
def get_all_public_keys(self) -> List[ExtendedPublicKey]:
"""
Returns all public keys (both seed-derived keys and raw keys).
Returns all extended public keys.
"""
return [sk.get_extended_public_key() for (sk, _) in self.get_all_private_keys()]
all_keys: List[Tuple[ExtendedPublicKey, bytes]] = []
index = 0
pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
while index <= MAX_KEYS:
if pkent is not None:
epk, ent = pkent
all_keys.append(epk)
index += 1
pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
return all_keys
def get_first_public_key(self) -> Optional[ExtendedPublicKey]:
pair = self.get_first_private_key()
if pair is None:
return None
return pair[0].get_extended_public_key()
"""
Returns the first extended public key.
"""
index = 0
pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
while index <= MAX_KEYS:
if pkent is not None:
epk, ent = pkent
return epk
index += 1
pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
return None
def delete_key_by_fingerprint(self, fingerprint: int):
"""
@ -187,16 +258,16 @@ class Keychain:
"""
index = 0
seed_hex = self._get_stored_entropy(self._get_private_key_seed_user(index))
pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
while index <= MAX_KEYS:
if seed_hex is not None and len(seed_hex) > 0:
key = ExtendedPrivateKey.from_seed(hexstr_to_bytes(seed_hex))
if key.get_public_key().get_fingerprint() == fingerprint:
if pkent is not None:
epk, ent = pkent
if epk.get_public_key().get_fingerprint() == fingerprint:
keyring.delete_password(
self._get_service(), self._get_private_key_seed_user(index)
self._get_service(), self._get_private_key_user(index)
)
index += 1
seed_hex = self._get_stored_entropy(self._get_private_key_seed_user(index))
pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
def delete_all_keys(self):
"""
@ -205,40 +276,38 @@ class Keychain:
index = 0
delete_exception = False
password = None
pkent = None
while True:
try:
password = self._get_stored_entropy(
self._get_private_key_seed_user(index)
)
pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
keyring.delete_password(
self._get_service(), self._get_private_key_seed_user(index)
self._get_service(), self._get_private_key_user(index)
)
except Exception:
# Some platforms might throw on no existing key
delete_exception = True
# Stop when there are no more keys to delete
if (
password is None or len(password) == 0 or delete_exception
) and index > MAX_KEYS:
if (pkent is None or delete_exception) and index > MAX_KEYS:
break
index += 1
index = 0
delete_exception = True
password = None
pkent = None
while True:
try:
password = self._get_stored_entropy(self._get_private_key_user(index))
pkent = self._get_fingerprint_and_entropy(
self._get_private_key_user(index)
)
keyring.delete_password(
self._get_service(), self._get_private_key_user(index)
)
except Exception:
# Some platforms might throw on no existing key
delete_exception = True
# Stop when there are no more keys to delete
if (
password is None or len(password) == 0 or delete_exception
) and index > MAX_KEYS:
if (pkent is None or delete_exception) and index > MAX_KEYS:
break
index += 1

View File

@ -92,8 +92,16 @@ class BlockTools:
self.keychain = Keychain("testing-1.8", True)
self.keychain.delete_all_keys()
self.keychain.add_private_key_seed(b"block_tools farmer key")
self.keychain.add_private_key_seed(b"block_tools pool key")
self.farmer_pk = (
self.keychain.add_private_key(b"block_tools farmer key", "")
.public_child(0)
.get_public_key()
)
self.pool_pk = (
self.keychain.add_private_key(b"block_tools pool key", "")
.public_child(0)
.get_public_key()
)
plot_dir = get_plot_dir()
mkdir(plot_dir)
@ -110,27 +118,17 @@ class BlockTools:
try:
for pn, filename in enumerate(filenames):
sk: PrivateKey = PrivateKey.from_seed(pn.to_bytes(4, "big"))
ekeys: List[ExtendedPublicKey] = self.keychain.get_all_public_keys()
assert len(ekeys) >= 2
farmer_pk: PublicKey = PublicKey.from_bytes(
bytes(ekeys[0].public_child(0).get_public_key())
)
pool_pk: PublicKey = PublicKey.from_bytes(
bytes(ekeys[1].public_child(0).get_public_key())
)
plot_public_key = ProofOfSpace.generate_plot_public_key(
sk.get_public_key(), farmer_pk
sk.get_public_key(), self.farmer_pk
)
plot_seed: bytes32 = ProofOfSpace.calculate_plot_seed(
pool_pk, plot_public_key
self.pool_pk, plot_public_key
)
self.farmer_pk = farmer_pk
self.pool_pk = pool_pk
self.farmer_ph = create_puzzlehash_for_pk(
BLSPublicKey(bytes(farmer_pk))
BLSPublicKey(bytes(self.farmer_pk))
)
self.pool_ph = create_puzzlehash_for_pk(
BLSPublicKey(bytes(pool_pk))
BLSPublicKey(bytes(self.pool_pk))
)
if not (plot_dir / filename).exists():
plotter = DiskPlotter()
@ -140,7 +138,7 @@ class BlockTools:
str(plot_dir),
str(filename),
k,
stream_plot_info(pool_pk, farmer_pk, sk),
stream_plot_info(self.pool_pk, self.farmer_pk, sk),
plot_seed,
128,
)

View File

@ -1,5 +1,5 @@
[pytest]
; logging options
log_cli = 1
log_level = INFO
log_level = WARNING
log_format = %(asctime)s %(name)s: %(levelname)s %(message)s

View File

@ -134,13 +134,15 @@ async def setup_wallet_node(
config["starting_height"] = starting_height
config["initial_num_public_keys"] = 5
key_seed = token_bytes(32)
keychain = Keychain(key_seed.hex(), True)
keychain.add_private_key_seed(key_seed)
consensus_constants = constants_for_dic(dic)
db_path_key_suffix = str(
keychain.get_first_public_key().get_public_key().get_fingerprint()
)
entropy = token_bytes(32)
keychain = Keychain(entropy.hex(), True)
keychain.add_private_key(entropy, "")
test_constants_copy = test_constants.copy()
for k in dic.keys():
test_constants_copy[k] = dic[k]
first_pk = keychain.get_first_public_key()
assert first_pk is not None
db_path_key_suffix = str(first_pk.get_public_key().get_fingerprint())
db_name = f"test-wallet-db-{port}"
db_path = bt.root_path / f"test-wallet-db-{port}-{db_path_key_suffix}"
if db_path.exists():

View File

@ -1,6 +1,12 @@
import unittest
from secrets import token_bytes
from blspy import ExtendedPrivateKey
from src.util.keychain import Keychain, generate_mnemonic, seed_from_mnemonic
from src.util.keychain import (
Keychain,
generate_mnemonic,
bytes_from_mnemonic,
entropy_to_seed,
)
class TesKeychain(unittest.TestCase):
@ -8,26 +14,26 @@ class TesKeychain(unittest.TestCase):
kc: Keychain = Keychain(testing=True)
kc.delete_all_keys()
assert kc._get_free_private_key_seed_index() == 0
assert kc._get_free_private_key_index() == 0
assert len(kc.get_all_private_keys()) == 0
assert kc.get_first_private_key() is None
assert kc.get_first_public_key() is None
mnemonic = generate_mnemonic()
seed = seed_from_mnemonic(mnemonic)
entropy = bytes_from_mnemonic(mnemonic)
mnemonic_2 = generate_mnemonic()
seed_2 = seed_from_mnemonic(mnemonic_2)
entropy_2 = bytes_from_mnemonic(mnemonic_2)
kc.add_private_key_seed(seed)
assert kc._get_free_private_key_seed_index() == 1
kc.add_private_key(entropy, "")
assert kc._get_free_private_key_index() == 1
assert len(kc.get_all_private_keys()) == 1
kc.add_private_key_seed(seed_2)
kc.add_private_key_seed(seed_2) # checks to not add duplicates
assert kc._get_free_private_key_seed_index() == 2
kc.add_private_key(entropy_2, "")
kc.add_private_key(entropy_2, "") # checks to not add duplicates
assert kc._get_free_private_key_index() == 2
assert len(kc.get_all_private_keys()) == 2
assert kc._get_free_private_key_seed_index() == 2
assert kc._get_free_private_key_index() == 2
assert len(kc.get_all_private_keys()) == 2
assert len(kc.get_all_public_keys()) == 2
assert kc.get_all_private_keys()[0] == kc.get_first_private_key()
@ -35,11 +41,37 @@ class TesKeychain(unittest.TestCase):
assert len(kc.get_all_private_keys()) == 2
seed_2 = entropy_to_seed(entropy_2, "")
seed_key_2 = ExtendedPrivateKey.from_seed(seed_2)
kc.delete_key_by_fingerprint(seed_key_2.get_public_key().get_fingerprint())
assert kc._get_free_private_key_seed_index() == 1
assert kc._get_free_private_key_index() == 1
assert len(kc.get_all_private_keys()) == 1
kc.delete_all_keys()
assert kc._get_free_private_key_seed_index() == 0
assert kc._get_free_private_key_index() == 0
assert len(kc.get_all_private_keys()) == 0
kc.add_private_key(token_bytes(32), "my passphrase")
kc.add_private_key(token_bytes(32), "")
kc.add_private_key(token_bytes(32), "third passphrase")
assert len(kc.get_all_public_keys()) == 3
assert len(kc.get_all_private_keys()) == 1
assert len(kc.get_all_private_keys(["my passphrase", ""])) == 2
assert (
len(
kc.get_all_private_keys(
["my passphrase", "", "third passphrase", "another"]
)
)
== 3
)
assert len(kc.get_all_private_keys(["my passhrase wrong"])) == 0
assert kc.get_first_private_key() is not None
assert kc.get_first_private_key(["bad passphrase"]) is None
assert kc.get_first_public_key() is not None
kc.delete_all_keys()
kc.add_private_key(token_bytes(32), "my passphrase")
assert kc.get_first_public_key() is not None