Move hardware wallet to trezor/

On second thoughts it does go better there.
This commit is contained in:
Neil Booth 2015-12-27 18:23:46 +09:00
parent 669cf74789
commit 11e1c62f28
3 changed files with 184 additions and 181 deletions

View File

@ -24,9 +24,8 @@ import random
import time import time
import json import json
import copy import copy
import re
from functools import partial from functools import partial
from struct import pack
from unicodedata import normalize
from i18n import _ from i18n import _
from util import NotEnoughFunds, PrintError, profiler from util import NotEnoughFunds, PrintError, profiler
@ -35,7 +34,7 @@ from bitcoin import *
from account import * from account import *
from version import * from version import *
from transaction import Transaction, is_extended_pubkey, x_to_xpub from transaction import Transaction
from plugins import run_hook from plugins import run_hook
import bitcoin import bitcoin
from coinchooser import COIN_CHOOSERS from coinchooser import COIN_CHOOSERS
@ -1707,177 +1706,6 @@ class BIP32_HD_Wallet(BIP32_Wallet):
def accounts_all_used(self): def accounts_all_used(self):
return all(self.account_is_used(acc_id) for acc_id in self.accounts) return all(self.account_is_used(acc_id) for acc_id in self.accounts)
class BIP32_Hardware_Wallet(BIP32_HD_Wallet):
# Derived classes must set:
# - device
# - wallet_type
# - root_derivation
# - plugin implementing handler, sign_transaction() and get_client()
def __init__(self, storage):
BIP32_HD_Wallet.__init__(self, storage)
self.mpk = None
self.checked_device = False
self.proper_device = False
def give_error(self, message):
self.print_error(message)
raise Exception(message)
def get_action(self):
if not self.accounts:
return 'create_accounts'
def can_import(self):
return False
def can_sign_xpubkey(self, x_pubkey):
xpub, sequence = BIP32_Account.parse_xpubkey(x_pubkey)
return xpub in self.master_public_keys.values()
def can_export(self):
return False
def is_watching_only(self):
return self.checked_device and not self.proper_device
def can_create_accounts(self):
return True
def can_change_password(self):
return False
def get_client(self):
return self.plugin.get_client()
def prefix(self):
return "/".join(self.root_derivation.split("/")[1:])
def account_derivation(self, account_id):
return self.prefix() + "/" + account_id + "'"
def address_id(self, address):
acc_id, (change, address_index) = self.get_address_index(address)
account_derivation = self.account_derivation(acc_id)
return "%s/%d/%d" % (account_derivation, change, address_index)
def mnemonic_to_seed(self, mnemonic, passphrase):
# trezor uses bip39
import pbkdf2, hashlib, hmac
PBKDF2_ROUNDS = 2048
mnemonic = normalize('NFKD', ' '.join(mnemonic.split()))
passphrase = normalize('NFKD', passphrase)
return pbkdf2.PBKDF2(mnemonic, 'mnemonic' + passphrase,
iterations = PBKDF2_ROUNDS, macmodule = hmac,
digestmodule = hashlib.sha512).read(64)
def derive_xkeys(self, root, derivation, password):
x = self.master_private_keys.get(root)
if x:
root_xprv = pw_decode(x, password)
xprv, xpub = bip32_private_derivation(root_xprv, root, derivation)
return xpub, xprv
else:
derivation = derivation.replace(self.root_name, self.prefix()+"/")
xpub = self.get_public_key(derivation)
return xpub, None
def get_public_key(self, bip32_path):
address_n = self.get_client().expand_path(bip32_path)
node = self.get_client().get_public_node(address_n).node
xpub = ("0488B21E".decode('hex') + chr(node.depth)
+ self.i4b(node.fingerprint) + self.i4b(node.child_num)
+ node.chain_code + node.public_key)
return EncodeBase58Check(xpub)
def get_master_public_key(self):
if not self.mpk:
self.mpk = self.get_public_key(self.prefix())
return self.mpk
def i4b(self, x):
return pack('>I', x)
def decrypt_message(self, pubkey, message, password):
raise RuntimeError(_('Decrypt method is not implemented'))
def sign_message(self, address, message, password):
if self.has_seed():
return BIP32_HD_Wallet.sign_message(self, address, message, password)
self.check_proper_device()
try:
address_path = self.address_id(address)
address_n = self.get_client().expand_path(address_path)
except Exception as e:
self.give_error(e)
try:
msg_sig = self.get_client().sign_message('Bitcoin', address_n,
message)
except Exception as e:
self.give_error(e)
finally:
self.plugin.handler.stop()
return msg_sig.signature
def sign_transaction(self, tx, password):
if tx.is_complete() or self.is_watching_only():
return
if self.has_seed():
return BIP32_HD_Wallet.sign_transaction(self, tx, password)
self.check_proper_device()
# previous transactions used as inputs
prev_tx = {}
# path of the xpubs that are involved
xpub_path = {}
for txin in tx.inputs:
tx_hash = txin['prevout_hash']
ptx = self.transactions.get(tx_hash)
if ptx is None:
ptx = self.network.synchronous_get(('blockchain.transaction.get', [tx_hash]))
ptx = Transaction(ptx)
prev_tx[tx_hash] = ptx
for x_pubkey in txin['x_pubkeys']:
if not is_extended_pubkey(x_pubkey):
continue
xpub = x_to_xpub(x_pubkey)
for k, v in self.master_public_keys.items():
if v == xpub:
acc_id = re.match("x/(\d+)'", k).group(1)
xpub_path[xpub] = self.account_derivation(acc_id)
self.plugin.sign_transaction(tx, prev_tx, xpub_path)
def is_proper_device(self):
self.get_client().ping('t')
if not self.checked_device:
address = self.addresses(False)[0]
address_id = self.address_id(address)
n = self.get_client().expand_path(address_id)
device_address = self.get_client().get_address('Bitcoin', n)
self.checked_device = True
self.proper_device = (device_address == address)
return self.proper_device
def check_proper_device(self):
if not self.is_proper_device():
self.give_error(_('Wrong device or password'))
def sanity_check(self):
try:
self.get_client().ping('t')
except BaseException as e:
return _("%s device not detected. Continuing in watching-only "
"mode.") % self.device + "\n\n" + str(e)
if self.addresses() and not self.is_proper_device():
return _("This wallet does not match your %s device") % self.device
return None
class NewWallet(BIP32_Wallet, Mnemonic): class NewWallet(BIP32_Wallet, Mnemonic):
# Standard wallet # Standard wallet

View File

@ -1,11 +1,189 @@
import re
from binascii import unhexlify from binascii import unhexlify
from struct import pack
from unicodedata import normalize
from electrum.account import BIP32_Account from electrum.account import BIP32_Account
from electrum.bitcoin import bc_address_to_hash_160, xpub_from_pubkey from electrum.bitcoin import (bc_address_to_hash_160, xpub_from_pubkey,
bip32_private_derivation, EncodeBase58Check)
from electrum.i18n import _ from electrum.i18n import _
from electrum.plugins import BasePlugin, hook from electrum.plugins import BasePlugin, hook
from electrum.transaction import deserialize, is_extended_pubkey from electrum.transaction import (deserialize, is_extended_pubkey,
Transaction, x_to_xpub)
from electrum.wallet import BIP32_HD_Wallet
class TrezorCompatibleWallet(BIP32_HD_Wallet):
# A BIP32 hierarchical deterministic wallet
#
# Derived classes must set:
# - device
# - wallet_type
root_derivation = "m/44'/0'"
def __init__(self, storage):
BIP32_HD_Wallet.__init__(self, storage)
self.mpk = None
self.checked_device = False
self.proper_device = False
def give_error(self, message):
self.print_error(message)
raise Exception(message)
def get_action(self):
if not self.accounts:
return 'create_accounts'
def can_import(self):
return False
def can_sign_xpubkey(self, x_pubkey):
xpub, sequence = BIP32_Account.parse_xpubkey(x_pubkey)
return xpub in self.master_public_keys.values()
def can_export(self):
return False
def is_watching_only(self):
return self.checked_device and not self.proper_device
def can_create_accounts(self):
return True
def can_change_password(self):
return False
def get_client(self):
return self.plugin.get_client()
def prefix(self):
return "/".join(self.root_derivation.split("/")[1:])
def account_derivation(self, account_id):
return self.prefix() + "/" + account_id + "'"
def address_id(self, address):
acc_id, (change, address_index) = self.get_address_index(address)
account_derivation = self.account_derivation(acc_id)
return "%s/%d/%d" % (account_derivation, change, address_index)
def mnemonic_to_seed(self, mnemonic, passphrase):
# trezor uses bip39
import pbkdf2, hashlib, hmac
PBKDF2_ROUNDS = 2048
mnemonic = normalize('NFKD', ' '.join(mnemonic.split()))
passphrase = normalize('NFKD', passphrase)
return pbkdf2.PBKDF2(mnemonic, 'mnemonic' + passphrase,
iterations = PBKDF2_ROUNDS, macmodule = hmac,
digestmodule = hashlib.sha512).read(64)
def derive_xkeys(self, root, derivation, password):
x = self.master_private_keys.get(root)
if x:
root_xprv = pw_decode(x, password)
xprv, xpub = bip32_private_derivation(root_xprv, root, derivation)
return xpub, xprv
else:
derivation = derivation.replace(self.root_name, self.prefix()+"/")
xpub = self.get_public_key(derivation)
return xpub, None
def get_public_key(self, bip32_path):
address_n = self.get_client().expand_path(bip32_path)
node = self.get_client().get_public_node(address_n).node
xpub = ("0488B21E".decode('hex') + chr(node.depth)
+ self.i4b(node.fingerprint) + self.i4b(node.child_num)
+ node.chain_code + node.public_key)
return EncodeBase58Check(xpub)
def get_master_public_key(self):
if not self.mpk:
self.mpk = self.get_public_key(self.prefix())
return self.mpk
def i4b(self, x):
return pack('>I', x)
def decrypt_message(self, pubkey, message, password):
raise RuntimeError(_('Decrypt method is not implemented'))
def sign_message(self, address, message, password):
if self.has_seed():
return BIP32_HD_Wallet.sign_message(self, address, message, password)
self.check_proper_device()
try:
address_path = self.address_id(address)
address_n = self.get_client().expand_path(address_path)
except Exception as e:
self.give_error(e)
try:
msg_sig = self.get_client().sign_message('Bitcoin', address_n,
message)
except Exception as e:
self.give_error(e)
finally:
self.plugin.handler.stop()
return msg_sig.signature
def sign_transaction(self, tx, password):
if tx.is_complete() or self.is_watching_only():
return
if self.has_seed():
return BIP32_HD_Wallet.sign_transaction(self, tx, password)
self.check_proper_device()
# previous transactions used as inputs
prev_tx = {}
# path of the xpubs that are involved
xpub_path = {}
for txin in tx.inputs:
tx_hash = txin['prevout_hash']
ptx = self.transactions.get(tx_hash)
if ptx is None:
ptx = self.network.synchronous_get(('blockchain.transaction.get', [tx_hash]))
ptx = Transaction(ptx)
prev_tx[tx_hash] = ptx
for x_pubkey in txin['x_pubkeys']:
if not is_extended_pubkey(x_pubkey):
continue
xpub = x_to_xpub(x_pubkey)
for k, v in self.master_public_keys.items():
if v == xpub:
acc_id = re.match("x/(\d+)'", k).group(1)
xpub_path[xpub] = self.account_derivation(acc_id)
self.plugin.sign_transaction(tx, prev_tx, xpub_path)
def is_proper_device(self):
self.get_client().ping('t')
if not self.checked_device:
address = self.addresses(False)[0]
address_id = self.address_id(address)
n = self.get_client().expand_path(address_id)
device_address = self.get_client().get_address('Bitcoin', n)
self.checked_device = True
self.proper_device = (device_address == address)
return self.proper_device
def check_proper_device(self):
if not self.is_proper_device():
self.give_error(_('Wrong device or password'))
def sanity_check(self):
try:
self.get_client().ping('t')
except BaseException as e:
return _("%s device not detected. Continuing in watching-only "
"mode.") % self.device + "\n\n" + str(e)
if self.addresses() and not self.is_proper_device():
return _("This wallet does not match your %s device") % self.device
return None
class TrezorCompatiblePlugin(BasePlugin): class TrezorCompatiblePlugin(BasePlugin):
# Derived classes provide: # Derived classes provide:

View File

@ -1,7 +1,5 @@
from electrum.wallet import BIP32_Hardware_Wallet
from plugins.trezor.client import trezor_client_class from plugins.trezor.client import trezor_client_class
from plugins.trezor.plugin import TrezorCompatiblePlugin from plugins.trezor.plugin import TrezorCompatiblePlugin, TrezorCompatibleWallet
try: try:
from trezorlib.client import proto, BaseClient, ProtocolMixin from trezorlib.client import proto, BaseClient, ProtocolMixin
@ -10,9 +8,8 @@ except ImportError:
TREZOR = False TREZOR = False
class TrezorWallet(BIP32_Hardware_Wallet): class TrezorWallet(TrezorCompatibleWallet):
wallet_type = 'trezor' wallet_type = 'trezor'
root_derivation = "m/44'/0'"
device = 'Trezor' device = 'Trezor'