Combine keepkey and trezor wallet code

Essentially identical so silly to have two copies.
Also fix a double-dialog during sign bug that caused one to not
disappear (on MacOSX at least).
This commit is contained in:
Neil Booth 2015-12-26 16:25:10 +09:00
parent 96c7f9e7cb
commit 3a1bb5b61a
5 changed files with 205 additions and 365 deletions

View File

@ -25,6 +25,7 @@ import time
import json
import copy
from functools import partial
from struct import pack
from i18n import _
from util import NotEnoughFunds, PrintError, profiler
@ -33,7 +34,7 @@ from bitcoin import *
from account import *
from version import *
from transaction import Transaction
from transaction import Transaction, is_extended_pubkey, x_to_xpub
from plugins import run_hook
import bitcoin
from coinchooser import COIN_CHOOSERS
@ -1705,6 +1706,177 @@ class BIP32_HD_Wallet(BIP32_Wallet):
def accounts_all_used(self):
return all(self.account_is_used(acc_id) for acc_id in self.accounts)
class BIP32_Hardware_Wallet(BIP32_HD_Wallet):
# Derived classes must set:
# - device
# - wallet_type
# - root_derivation
# - plugin implementing handler, sign_transaction() and get_client()
def __init__(self, storage):
BIP32_HD_Wallet.__init__(self, storage)
self.mpk = None
self.checked_device = False
self.proper_device = False
def give_error(self, message):
self.print_error(message)
raise Exception(message)
def get_action(self):
if not self.accounts:
return 'create_accounts'
def can_import(self):
return False
def can_sign_xpubkey(self, x_pubkey):
xpub, sequence = BIP32_Account.parse_xpubkey(x_pubkey)
return xpub in self.master_public_keys.values()
def can_export(self):
return False
def is_watching_only(self):
return self.checked_device and not self.proper_device
def can_create_accounts(self):
return True
def can_change_password(self):
return False
def get_client(self):
return self.plugin.get_client()
def prefix(self):
return "/".join(self.root_derivation.split("/")[1:])
def account_derivation(self, account_id):
return self.prefix() + "/" + account_id + "'"
def address_id(self, address):
acc_id, (change, address_index) = self.get_address_index(address)
account_derivation = self.account_derivation(acc_id)
return "%s/%d/%d" % (account_derivation, change, address_index)
def mnemonic_to_seed(self, mnemonic, passphrase):
# trezor uses bip39
import pbkdf2, hashlib, hmac
PBKDF2_ROUNDS = 2048
mnemonic = unicodedata.normalize('NFKD', ' '.join(mnemonic.split()))
passphrase = unicodedata.normalize('NFKD', passphrase)
return pbkdf2.PBKDF2(mnemonic, 'mnemonic' + passphrase,
iterations = PBKDF2_ROUNDS, macmodule = hmac,
digestmodule = hashlib.sha512).read(64)
def derive_xkeys(self, root, derivation, password):
x = self.master_private_keys.get(root)
if x:
root_xprv = pw_decode(x, password)
xprv, xpub = bip32_private_derivation(root_xprv, root, derivation)
return xpub, xprv
else:
derivation = derivation.replace(self.root_name, self.prefix()+"/")
xpub = self.get_public_key(derivation)
return xpub, None
def get_public_key(self, bip32_path):
address_n = self.get_client().expand_path(bip32_path)
node = self.get_client().get_public_node(address_n).node
xpub = ("0488B21E".decode('hex') + chr(node.depth)
+ self.i4b(node.fingerprint) + self.i4b(node.child_num)
+ node.chain_code + node.public_key)
return EncodeBase58Check(xpub)
def get_master_public_key(self):
if not self.mpk:
self.mpk = self.get_public_key(self.prefix())
return self.mpk
def i4b(self, x):
return pack('>I', x)
def decrypt_message(self, pubkey, message, password):
raise RuntimeError(_('Decrypt method is not implemented'))
def sign_message(self, address, message, password):
if self.has_seed():
return BIP32_HD_Wallet.sign_message(self, address, message, password)
self.check_proper_device()
try:
address_path = self.address_id(address)
address_n = self.get_client().expand_path(address_path)
except Exception as e:
self.give_error(e)
try:
msg_sig = self.get_client().sign_message('Bitcoin', address_n,
message)
except Exception as e:
self.give_error(e)
finally:
self.plugin.handler.stop()
return msg_sig.signature
def sign_transaction(self, tx, password):
if tx.is_complete() or self.is_watching_only():
return
if self.has_seed():
return BIP32_HD_Wallet.sign_transaction(self, tx, password)
self.check_proper_device()
# previous transactions used as inputs
prev_tx = {}
# path of the xpubs that are involved
xpub_path = {}
for txin in tx.inputs:
tx_hash = txin['prevout_hash']
ptx = self.transactions.get(tx_hash)
if ptx is None:
ptx = self.network.synchronous_get(('blockchain.transaction.get', [tx_hash]))
ptx = Transaction(ptx)
prev_tx[tx_hash] = ptx
for x_pubkey in txin['x_pubkeys']:
if not is_extended_pubkey(x_pubkey):
continue
xpub = x_to_xpub(x_pubkey)
for k, v in self.master_public_keys.items():
if v == xpub:
acc_id = re.match("x/(\d+)'", k).group(1)
xpub_path[xpub] = self.account_derivation(acc_id)
self.plugin.sign_transaction(tx, prev_tx, xpub_path)
def is_proper_device(self):
self.get_client().ping('t')
if not self.checked_device:
address = self.addresses(False)[0]
address_id = self.address_id(address)
n = self.get_client().expand_path(address_id)
device_address = self.get_client().get_address('Bitcoin', n)
self.checked_device = True
self.proper_device = (device_address == address)
return self.proper_device
def check_proper_device(self):
if not self.is_proper_device():
self.give_error(_('Wrong device or password'))
def sanity_check(self):
try:
self.get_client().ping('t')
except BaseException as e:
return _("%s device not detected. Continuing in watching-only "
"mode.") % self.device + "\n\n" + str(e)
if self.addresses() and not self.is_proper_device():
return _("This wallet does not match your %s device") % self.device
return None
class NewWallet(BIP32_Wallet, Mnemonic):
# Standard wallet

View File

@ -1,25 +1,16 @@
from binascii import unhexlify
from struct import pack
from sys import stderr
from time import sleep
import unicodedata
import threading
import re
from functools import partial
import electrum
from electrum import bitcoin
from electrum.account import BIP32_Account
from electrum.bitcoin import EncodeBase58Check, public_key_to_bc_address, bc_address_to_hash_160, xpub_from_pubkey
from electrum.bitcoin import bc_address_to_hash_160, xpub_from_pubkey
from electrum.i18n import _
from electrum.plugins import BasePlugin, hook, always_hook, run_hook
from electrum.transaction import Transaction, deserialize, is_extended_pubkey, x_to_xpub
from electrum.wallet import BIP32_HD_Wallet
from electrum.util import print_error, print_msg
from electrum.wallet import pw_decode, bip32_private_derivation, bip32_root
from electrum.plugins import BasePlugin, hook
from electrum.transaction import deserialize, is_extended_pubkey
from electrum.wallet import BIP32_Hardware_Wallet
from electrum.util import print_error
try:
from keepkeylib.client import types
@ -41,167 +32,10 @@ def give_error(message):
raise Exception(message)
class KeepKeyWallet(BIP32_HD_Wallet):
class KeepKeyWallet(BIP32_Hardware_Wallet):
wallet_type = 'keepkey'
root_derivation = "m/44'/0'"
def __init__(self, storage):
BIP32_HD_Wallet.__init__(self, storage)
self.mpk = None
self.device_checked = False
self.proper_device = False
self.force_watching_only = False
def get_action(self):
if not self.accounts:
return 'create_accounts'
def can_import(self):
return False
def can_sign_xpubkey(self, x_pubkey):
xpub, sequence = BIP32_Account.parse_xpubkey(x_pubkey)
return xpub in self.master_public_keys.values()
def can_export(self):
return False
def can_create_accounts(self):
return True
def can_change_password(self):
return False
def is_watching_only(self):
return self.force_watching_only
def get_client(self):
return self.plugin.get_client()
def address_id(self, address):
account_id, (change, address_index) = self.get_address_index(address)
return "44'/0'/%s'/%d/%d" % (account_id, change, address_index)
def mnemonic_to_seed(self, mnemonic, passphrase):
# keepkey uses bip39
import pbkdf2, hashlib, hmac
PBKDF2_ROUNDS = 2048
mnemonic = unicodedata.normalize('NFKD', ' '.join(mnemonic.split()))
passphrase = unicodedata.normalize('NFKD', passphrase)
return pbkdf2.PBKDF2(mnemonic, 'mnemonic' + passphrase, iterations = PBKDF2_ROUNDS, macmodule = hmac, digestmodule = hashlib.sha512).read(64)
def derive_xkeys(self, root, derivation, password):
x = self.master_private_keys.get(root)
if x:
root_xprv = pw_decode(x, password)
xprv, xpub = bip32_private_derivation(root_xprv, root, derivation)
return xpub, xprv
else:
derivation = derivation.replace(self.root_name,"44'/0'/")
xpub = self.get_public_key(derivation)
return xpub, None
def get_public_key(self, bip32_path):
address_n = self.plugin.get_client().expand_path(bip32_path)
node = self.plugin.get_client().get_public_node(address_n).node
xpub = "0488B21E".decode('hex') + chr(node.depth) + self.i4b(node.fingerprint) + self.i4b(node.child_num) + node.chain_code + node.public_key
return EncodeBase58Check(xpub)
def get_master_public_key(self):
if not self.mpk:
self.mpk = self.get_public_key("44'/0'")
return self.mpk
def i4b(self, x):
return pack('>I', x)
def add_keypairs(self, tx, keypairs, password):
#do nothing - no priv keys available
pass
def decrypt_message(self, pubkey, message, password):
raise BaseException( _('Decrypt method is not implemented in KeepKey') )
#address = public_key_to_bc_address(pubkey.decode('hex'))
#address_path = self.address_id(address)
#address_n = self.get_client().expand_path(address_path)
#try:
# decrypted_msg = self.get_client().decrypt_message(address_n, b64decode(message))
#except Exception, e:
# give_error(e)
#finally:
# twd.stop()
#return str(decrypted_msg)
def sign_message(self, address, message, password):
if self.has_seed():
return BIP32_HD_Wallet.sign_message(self, address, message, password)
if not self.check_proper_device():
give_error('Wrong device or password')
try:
address_path = self.address_id(address)
address_n = self.plugin.get_client().expand_path(address_path)
except Exception, e:
give_error(e)
try:
msg_sig = self.plugin.get_client().sign_message('Bitcoin', address_n, message)
except Exception, e:
give_error(e)
finally:
self.plugin.handler.stop()
return msg_sig.signature
def sign_transaction(self, tx, password):
if self.has_seed():
return BIP32_HD_Wallet.sign_transaction(self, tx, password)
if tx.is_complete():
return
if not self.check_proper_device():
give_error('Wrong device or password')
# previous transactions used as inputs
prev_tx = {}
# path of the xpubs that are involved
xpub_path = {}
for txin in tx.inputs:
tx_hash = txin['prevout_hash']
ptx = self.transactions.get(tx_hash)
if ptx is None:
ptx = self.network.synchronous_get(('blockchain.transaction.get', [tx_hash]))
ptx = Transaction(ptx)
prev_tx[tx_hash] = ptx
for x_pubkey in txin['x_pubkeys']:
account_derivation = None
if not is_extended_pubkey(x_pubkey):
continue
xpub = x_to_xpub(x_pubkey)
for k, v in self.master_public_keys.items():
if v == xpub:
account_id = re.match("x/(\d+)'", k).group(1)
account_derivation = "44'/0'/%s'"%account_id
if account_derivation is not None:
xpub_path[xpub] = account_derivation
self.plugin.sign_transaction(tx, prev_tx, xpub_path)
def check_proper_device(self):
self.get_client().ping('t')
if not self.device_checked:
address = self.addresses(False)[0]
address_id = self.address_id(address)
n = self.get_client().expand_path(address_id)
device_address = self.get_client().get_address('Bitcoin', n)
self.device_checked = True
if device_address != address:
self.proper_device = False
else:
self.proper_device = True
return self.proper_device
device = 'KeepKey'
class KeepKeyPlugin(BasePlugin):
@ -277,8 +111,7 @@ class KeepKeyPlugin(BasePlugin):
def show_address(self, address):
if not self.wallet.check_proper_device():
give_error('Wrong device or password')
self.wallet.check_proper_device()
try:
address_path = self.wallet.address_id(address)
address_n = self.get_client().expand_path(address_path)

View File

@ -23,15 +23,9 @@ class Plugin(KeepKeyPlugin):
window.statusBar().addPermanentWidget(self.keepkey_button)
if self.handler is None:
self.handler = KeepKeyQtHandler(window)
try:
self.get_client().ping('t')
except BaseException as e:
window.show_error(_('KeepKey device not detected.\nContinuing in watching-only mode.\nReason:\n' + str(e)))
self.wallet.force_watching_only = True
return
if self.wallet.addresses() and not self.wallet.check_proper_device():
window.show_error(_("This wallet does not match your KeepKey device"))
self.wallet.force_watching_only = True
msg = self.wallet.sanity_check()
if msg:
window.show_error(msg)
@hook
def installwizard_load_wallet(self, wallet, window):
@ -161,6 +155,8 @@ class KeepKeyQtHandler:
self.done.set()
def message_dialog(self):
# Called more than once during signing, to confirm output and fee
self.dialog_stop()
self.d = WindowModalDialog(self.win, _('Please Check KeepKey Device'))
l = QLabel(self.message)
vbox = QVBoxLayout(self.d)
@ -173,4 +169,6 @@ class KeepKeyQtHandler:
self.d.show()
def dialog_stop(self):
self.d.hide()
if self.d:
self.d.hide()
self.d = None

View File

@ -23,6 +23,7 @@ class TrezorQtHandler:
self.win.connect(win, SIGNAL('pin_dialog'), self.pin_dialog)
self.win.connect(win, SIGNAL('passphrase_dialog'), self.passphrase_dialog)
self.done = threading.Event()
self.d = None
def stop(self):
self.win.emit(SIGNAL('trezor_done'))
@ -75,6 +76,8 @@ class TrezorQtHandler:
self.done.set()
def message_dialog(self):
# Called more than once during signing, to confirm output and fee
self.dialog_stop()
self.d = WindowModalDialog(self.win, _('Please Check Trezor Device'))
l = QLabel(self.message)
vbox = QVBoxLayout(self.d)
@ -82,8 +85,9 @@ class TrezorQtHandler:
self.d.show()
def dialog_stop(self):
self.d.hide()
if self.d:
self.d.hide()
self.d = None
class Plugin(TrezorPlugin):
@ -97,15 +101,9 @@ class Plugin(TrezorPlugin):
window.statusBar().addPermanentWidget(self.trezor_button)
if self.handler is None:
self.handler = TrezorQtHandler(window)
try:
self.get_client().ping('t')
except BaseException as e:
window.show_error(_('Trezor device not detected.\nContinuing in watching-only mode.\nReason:\n' + str(e)))
self.wallet.force_watching_only = True
return
if self.wallet.addresses() and not self.wallet.check_proper_device():
window.show_error(_("This wallet does not match your Trezor device"))
self.wallet.force_watching_only = True
msg = self.wallet.sanity_check()
if msg:
window.show_error(msg)
@hook
def installwizard_load_wallet(self, wallet, window):

View File

@ -1,23 +1,16 @@
from binascii import unhexlify
from struct import pack
from sys import stderr
from time import sleep
import unicodedata
import threading
import re
import electrum
from electrum import bitcoin
from electrum.account import BIP32_Account
from electrum.bitcoin import EncodeBase58Check, public_key_to_bc_address, bc_address_to_hash_160, xpub_from_pubkey
from electrum.bitcoin import bc_address_to_hash_160, xpub_from_pubkey
from electrum.i18n import _
from electrum.plugins import BasePlugin, hook, always_hook, run_hook
from electrum.transaction import Transaction, deserialize, is_extended_pubkey, x_to_xpub
from electrum.wallet import BIP32_HD_Wallet
from electrum.util import print_error, print_msg
from electrum.wallet import pw_decode, bip32_private_derivation, bip32_root
from electrum.plugins import BasePlugin, hook
from electrum.transaction import deserialize, is_extended_pubkey
from electrum.wallet import BIP32_Hardware_Wallet
from electrum.util import print_error
try:
from trezorlib.client import types
@ -30,9 +23,6 @@ except ImportError:
import trezorlib.ckd_public as ckd_public
def log(msg):
stderr.write("%s\n" % msg)
stderr.flush()
@ -42,161 +32,10 @@ def give_error(message):
raise Exception(message)
class TrezorWallet(BIP32_HD_Wallet):
class TrezorWallet(BIP32_Hardware_Wallet):
wallet_type = 'trezor'
root_derivation = "m/44'/0'"
def __init__(self, storage):
BIP32_HD_Wallet.__init__(self, storage)
self.mpk = None
self.device_checked = False
self.proper_device = False
self.force_watching_only = False
def get_action(self):
if not self.accounts:
return 'create_accounts'
def can_import(self):
return False
def can_sign_xpubkey(self, x_pubkey):
xpub, sequence = BIP32_Account.parse_xpubkey(x_pubkey)
return xpub in self.master_public_keys.values()
def can_export(self):
return False
def can_create_accounts(self):
return True
def can_change_password(self):
return False
def is_watching_only(self):
return self.force_watching_only
def get_client(self):
return self.plugin.get_client()
def address_id(self, address):
account_id, (change, address_index) = self.get_address_index(address)
return "44'/0'/%s'/%d/%d" % (account_id, change, address_index)
def mnemonic_to_seed(self, mnemonic, passphrase):
# trezor uses bip39
import pbkdf2, hashlib, hmac
PBKDF2_ROUNDS = 2048
mnemonic = unicodedata.normalize('NFKD', ' '.join(mnemonic.split()))
passphrase = unicodedata.normalize('NFKD', passphrase)
return pbkdf2.PBKDF2(mnemonic, 'mnemonic' + passphrase, iterations = PBKDF2_ROUNDS, macmodule = hmac, digestmodule = hashlib.sha512).read(64)
def derive_xkeys(self, root, derivation, password):
x = self.master_private_keys.get(root)
if x:
root_xprv = pw_decode(x, password)
xprv, xpub = bip32_private_derivation(root_xprv, root, derivation)
return xpub, xprv
else:
derivation = derivation.replace(self.root_name,"44'/0'/")
xpub = self.get_public_key(derivation)
return xpub, None
def get_public_key(self, bip32_path):
address_n = self.plugin.get_client().expand_path(bip32_path)
node = self.plugin.get_client().get_public_node(address_n).node
xpub = "0488B21E".decode('hex') + chr(node.depth) + self.i4b(node.fingerprint) + self.i4b(node.child_num) + node.chain_code + node.public_key
return EncodeBase58Check(xpub)
def get_master_public_key(self):
if not self.mpk:
self.mpk = self.get_public_key("44'/0'")
return self.mpk
def i4b(self, x):
return pack('>I', x)
def add_keypairs(self, tx, keypairs, password):
#do nothing - no priv keys available
pass
def decrypt_message(self, pubkey, message, password):
raise BaseException( _('Decrypt method is not implemented in Trezor') )
#address = public_key_to_bc_address(pubkey.decode('hex'))
#address_path = self.address_id(address)
#address_n = self.get_client().expand_path(address_path)
#try:
# decrypted_msg = self.get_client().decrypt_message(address_n, b64decode(message))
#except Exception, e:
# give_error(e)
#finally:
# twd.stop()
#return str(decrypted_msg)
def sign_message(self, address, message, password):
if self.has_seed():
return BIP32_HD_Wallet.sign_message(self, address, message, password)
if not self.check_proper_device():
give_error('Wrong device or password')
try:
address_path = self.address_id(address)
address_n = self.plugin.get_client().expand_path(address_path)
except Exception, e:
give_error(e)
try:
msg_sig = self.plugin.get_client().sign_message('Bitcoin', address_n, message)
except Exception, e:
give_error(e)
finally:
self.plugin.handler.stop()
return msg_sig.signature
def sign_transaction(self, tx, password):
if self.has_seed():
return BIP32_HD_Wallet.sign_transaction(self, tx, password)
if tx.is_complete():
return
if not self.check_proper_device():
give_error('Wrong device or password')
# previous transactions used as inputs
prev_tx = {}
# path of the xpubs that are involved
xpub_path = {}
for txin in tx.inputs:
tx_hash = txin['prevout_hash']
ptx = self.transactions.get(tx_hash)
if ptx is None:
ptx = self.network.synchronous_get(('blockchain.transaction.get', [tx_hash]))
ptx = Transaction(ptx)
prev_tx[tx_hash] = ptx
for x_pubkey in txin['x_pubkeys']:
account_derivation = None
if not is_extended_pubkey(x_pubkey):
continue
xpub = x_to_xpub(x_pubkey)
for k, v in self.master_public_keys.items():
if v == xpub:
account_id = re.match("x/(\d+)'", k).group(1)
account_derivation = "44'/0'/%s'"%account_id
if account_derivation is not None:
xpub_path[xpub] = account_derivation
self.plugin.sign_transaction(tx, prev_tx, xpub_path)
def check_proper_device(self):
self.get_client().ping('t')
if not self.device_checked:
address = self.addresses(False)[0]
address_id = self.address_id(address)
n = self.get_client().expand_path(address_id)
device_address = self.get_client().get_address('Bitcoin', n)
self.device_checked = True
self.proper_device = (device_address == address)
return self.proper_device
device = 'Trezor'
class TrezorPlugin(BasePlugin):