python-trezor/trezorctl

608 lines
19 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env python
2016-11-25 13:53:55 -08:00
# This file is part of the TREZOR project.
#
# Copyright (C) 2012-2016 Marek Palatinus <slush@satoshilabs.com>
# Copyright (C) 2012-2016 Pavol Rusnak <stick@satoshilabs.com>
# Copyright (C) 2016 Jochen Hoenicke <hoenicke@gmail.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
import binascii
import json
import base64
2017-07-03 08:45:56 -07:00
import click
2017-07-01 08:59:11 -07:00
from trezorlib.client import TrezorClient, TrezorClientVerbose, CallException
import trezorlib.types_pb2 as types
2017-07-01 08:59:11 -07:00
def get_transport_class_by_name(name):
2017-07-01 08:59:11 -07:00
if name == 'usb':
from trezorlib.transport_hid import HidTransport
return HidTransport
2017-07-01 08:59:11 -07:00
if name == 'udp':
2016-04-29 17:37:18 -07:00
from trezorlib.transport_udp import UdpTransport
2017-07-01 08:59:11 -07:00
return UdpTransport
2016-04-29 17:37:18 -07:00
2017-07-01 08:59:11 -07:00
if name == 'pipe':
from trezorlib.transport_pipe import PipeTransport
2017-07-01 08:59:11 -07:00
return PipeTransport
2017-07-01 08:59:11 -07:00
if name == 'bridge':
from trezorlib.transport_bridge import BridgeTransport
2017-07-01 08:59:11 -07:00
return BridgeTransport
raise NotImplementedError('Unknown transport: "%s"' % name)
2016-06-30 07:47:17 -07:00
2017-07-01 08:59:11 -07:00
def get_transport(transport_name, path):
transport = get_transport_class_by_name(transport_name)
dev = transport.find_by_path(path)
return dev
2017-07-03 08:45:56 -07:00
@click.group()
@click.option('-t', '--transport', type=click.Choice(['usb', 'udp', 'pipe', 'bridge']), default='usb', help='Select transport used for communication.')
@click.option('-p', '--path', help='Select device by transport-specific path.')
@click.option('-v', '--verbose', is_flag=True, help='Show communication messages.')
@click.option('-j', '--json', 'is_json', is_flag=True, help='Print result as JSON object')
@click.pass_context
def cli(ctx, transport, path, verbose, is_json):
if ctx.invoked_subcommand == 'list':
ctx.obj = transport
else:
t = get_transport(transport, path)
if verbose:
ctx.obj = TrezorClientVerbose(t)
else:
ctx.obj = TrezorClient(t)
@cli.resultcallback()
def print_result(res, transport, path, verbose, is_json):
if is_json:
from google.protobuf import json_format, message
if isinstance(res, message.Message):
click.echo(json_format.MessageToJson(res, preserving_proto_field_name=True))
else:
click.echo(json.dumps(res, sort_keys=True, indent=4))
else:
if isinstance(res, list):
for line in res:
click.echo(line)
elif isinstance(res, dict):
for k, v in res.items():
click.echo('%s: %s' % (k, v))
else:
click.echo(res)
2017-07-03 08:45:56 -07:00
#
# Common functions
#
2017-07-03 08:45:56 -07:00
@cli.command(name='list', help='List connected TREZOR devices.')
@click.pass_obj
def ls(transport_name):
transport_class = get_transport_class_by_name(transport_name)
devices = transport_class.enumerate()
if transport_name == 'usb':
return [dev[0] for dev in devices]
if transport_name == 'bridge':
return devices
return []
2016-05-27 00:05:54 -07:00
2017-07-03 08:45:56 -07:00
#
# Basic device functions
#
2017-07-03 08:45:56 -07:00
@cli.command(help='Send ping message.')
@click.argument('message')
@click.option('-b', '--button-protection', is_flag=True)
@click.option('-p', '--pin-protection', is_flag=True)
@click.option('-r', '--passphrase-protection', is_flag=True)
@click.pass_obj
def ping(client, message, button_protection, pin_protection, passphrase_protection):
return client.ping(message, button_protection=button_protection, pin_protection=pin_protection, passphrase_protection=passphrase_protection)
2016-05-27 00:05:54 -07:00
2017-07-03 08:45:56 -07:00
@cli.command(help='Clear session (remove cached PIN, passphrase, etc.).')
@click.pass_obj
def clear_session(client):
return client.clear_session()
2017-07-03 08:45:56 -07:00
@cli.command(help='Get example entropy.')
@click.argument('size', type=int)
@click.pass_obj
def get_entropy(client, size):
return binascii.hexlify(client.get_entropy(size))
2017-07-03 08:45:56 -07:00
@cli.command(help='Retrieve device features and settings.')
@click.pass_obj
def get_features(client):
return client.features
2017-07-03 08:45:56 -07:00
@cli.command(help='List all supported coin types by the device.')
@click.pass_obj
def list_coins(client):
return [coin.coin_name for coin in client.features.coins]
2017-07-03 08:45:56 -07:00
#
# Device management functions
#
@cli.command(help='Change new PIN or remove existing.')
@click.option('-r', '--remove', is_flag=True)
@click.pass_obj
def change_pin(client, remove):
return client.change_pin(remove)
2017-07-03 08:45:56 -07:00
@cli.command(help='Enable passphrase.')
@click.pass_obj
def enable_passphrase(client):
return client.apply_settings(use_passphrase=True)
2017-07-03 08:45:56 -07:00
@cli.command(help='Disable passphrase.')
@click.pass_obj
def disable_passphrase(client):
return client.apply_settings(use_passphrase=False)
2017-07-03 08:45:56 -07:00
@cli.command(help='Set new device label.')
@click.option('-l', '--label')
@click.pass_obj
def set_label(client, label):
return client.apply_settings(label=label)
2017-07-03 08:45:56 -07:00
@cli.command(help='Set new homescreen.')
@click.option('-f', '--filename', default=None)
@click.pass_obj
def set_homescreen(client, filename):
if filename is not None:
from PIL import Image
im = Image.open(filename)
if im.size != (128, 64):
raise CallException(types.Failure_DataError, 'Wrong size of the image')
im = im.convert('1')
pix = im.load()
img = ''
for j in range(64):
for i in range(128):
img += '1' if pix[i, j] else '0'
img = ''.join(chr(int(img[i:i + 8], 2)) for i in range(0, len(img), 8))
else:
img = '\x00'
return client.apply_settings(homescreen=img)
2017-07-03 08:45:56 -07:00
@cli.command(help='Set U2F counter.')
@click.argument('counter', type=int)
@click.pass_obj
def set_u2f_counter(client, counter):
return client.set_u2f_counter(counter)
2017-07-03 08:45:56 -07:00
@cli.command(help='Reset device to factory defaults and remove all private data.')
@click.pass_obj
def wipe_device(client):
return client.wipe_device()
2017-07-03 08:45:56 -07:00
@cli.command(help='Load custom configuration to the device.')
@click.option('-m', '--mnemonic')
@click.option('-e', '--expand', is_flag=True)
@click.option('-x', '--xprv')
@click.option('-p', '--pin', default='')
@click.option('-r', '--passphrase-protection', is_flag=True)
@click.option('-l', '--label', default='')
@click.option('-s', '--skip-checksum', is_flag=True)
@click.pass_obj
def load_device(client, mnemonic, expand, xprv, pin, passphrase_protection, label, skip_checksum):
if not mnemonic and not xprv:
raise CallException(types.Failure_DataError, 'Please provide mnemonic or xprv')
if mnemonic:
return client.load_device_by_mnemonic(
2017-07-03 08:45:56 -07:00
mnemonic,
pin,
passphrase_protection,
label,
'english',
skip_checksum,
expand
)
if xprv:
return client.load_device_by_xprv(
2017-07-03 08:45:56 -07:00
xprv,
pin,
passphrase_protection,
label,
'english'
)
@cli.command(help='Start safe recovery workflow.')
@click.option('-w', '--words', type=click.Choice(['12', '18', '24']), default='24')
2017-07-03 08:45:56 -07:00
@click.option('-e', '--expand', is_flag=True)
@click.option('-p', '--pin-protection', is_flag=True)
@click.option('-r', '--passphrase-protection', is_flag=True)
@click.option('-l', '--label')
@click.option('-t', '--type', 'rec_type', type=click.Choice(['scrambled', 'matrix']), default='scrambled')
@click.option('-d', '--dry-run', is_flag=True)
@click.pass_obj
def recovery_device(client, words, expand, pin_protection, passphrase_protection, label, rec_type, dry_run):
typemap = {
'scrambled': types.RecoveryDeviceType_ScrambledWords,
'matrix': types.RecoveryDeviceType_Matrix
}
return client.recovery_device(
int(words),
2017-07-03 08:45:56 -07:00
passphrase_protection,
pin_protection,
label,
'english',
typemap[rec_type],
expand,
dry_run
)
2017-07-03 08:45:56 -07:00
@cli.command(help='Perform device setup and generate new seed.')
@click.option('-t', '--strength', type=click.Choice(['128', '192', '256']), default='256')
2017-07-03 08:45:56 -07:00
@click.option('-p', '--pin-protection', is_flag=True)
@click.option('-r', '--passphrase-protection', is_flag=True)
@click.option('-l', '--label')
@click.option('-u', '--u2f-counter', default=0)
@click.option('-s', '--skip-backup', is_flag=True)
@click.pass_obj
def reset_device(client, strength, pin_protection, passphrase_protection, label, u2f_counter, skip_backup):
return client.reset_device(
2017-07-03 08:45:56 -07:00
True,
int(strength),
2017-07-03 08:45:56 -07:00
passphrase_protection,
pin_protection,
label,
'english',
u2f_counter,
skip_backup
)
2017-07-03 08:45:56 -07:00
@cli.command(help='Perform device seed backup.')
@click.pass_obj
def backup_device(client):
return client.backup_device()
2017-07-03 08:45:56 -07:00
#
# Firmware update
#
2017-07-03 08:45:56 -07:00
@cli.command(help='Upload new firmware to device (must be in bootloader mode).')
@click.option('-f', '--filename')
@click.option('-u', '--url')
@click.option('-v', '--version')
@click.option('-s', '--skip-check', is_flag=True)
@click.pass_obj
def firmware_update(client, filename, url, version, skip_check):
if filename:
fp = open(filename, 'rb').read()
elif url:
import requests
click.echo('Downloading from', url)
r = requests.get(url)
fp = r.content
else:
import requests
r = requests.get('https://wallet.trezor.io/data/firmware/releases.json')
releases = r.json()
def version_func(r):
return r['version']
def version_string(r):
return '.'.join(map(str, version_func(r)))
if version:
release = next((r for r in releases if version_string(r) == version))
else:
release = max(releases, key=version_func)
click.echo('Fetching version: %s' % version_string(release))
click.echo('Firmware fingerprint: %s' % release['fingerprint'])
url = 'https://wallet.trezor.io/' + release['url']
click.echo('Downloading from %s' % url)
r = requests.get(url)
fp = r.content
2017-07-03 08:45:56 -07:00
if not skip_check:
if fp[:8] == b'54525a52' or fp[:8] == b'54525a56':
fp = binascii.unhexlify(fp)
if fp[:4] != b'TRZR' and fp[:4] != b'TRZV':
raise CallException(types.Failure_FirmwareError, 'TREZOR firmware header expected')
2017-07-03 08:45:56 -07:00
click.echo('Please confirm action on device...')
2017-07-03 08:45:56 -07:00
from io import BytesIO
return client.firmware_update(fp=BytesIO(fp))
2017-07-03 08:45:56 -07:00
@cli.command(help='Perform a self-test.')
@click.pass_obj
def self_test(client):
return client.self_test()
2017-07-03 08:45:56 -07:00
#
# Basic coin functions
#
2016-06-12 14:49:52 -07:00
2017-07-03 09:49:03 -07:00
2017-07-03 08:45:56 -07:00
@cli.command(help='Get address for specified path.')
@click.option('-c', '--coin', default='Bitcoin')
@click.option('-n', '--address')
@click.option('-t', '--script-type', type=click.Choice(['address', 'segwit', 'p2shsegwit']), default='address')
@click.option('-d', '--show-display', is_flag=True)
@click.pass_obj
def get_address(client, coin, address, script_type, show_display):
address_n = client.expand_path(address)
typemap = {
'address': types.SPENDADDRESS,
'segwit': types.SPENDWITNESS,
'p2shsegwit': types.SPENDP2SHWITNESS,
}
script_type = typemap[script_type]
return client.get_address(coin, address_n, show_display, script_type=script_type)
2017-07-03 08:45:56 -07:00
@cli.command(help='Get public node of given path.')
@click.option('-c', '--coin', default='Bitcoin')
@click.option('-n', '-address')
@click.option('-e', '--curve')
@click.option('-d', '--show-display', is_flag=True)
@click.pass_obj
def get_public_node(client, coin, address, curve, show_display):
address_n = client.expand_path(address)
return client.get_public_node(address_n, ecdsa_curve_name=curve, show_display=show_display, coin_name=coin)
2017-07-03 08:45:56 -07:00
#
# Message functions
#
@cli.command(help='Sign message using address of given path.')
@click.option('-c', '--coin', default='Bitcoin')
@click.option('-n', '-address')
@click.argument('message')
@click.pass_obj
def sign_message(client, coin, address, message):
address_n = client.expand_path(address)
res = client.sign_message(coin, address_n, message)
return {
2017-07-03 08:45:56 -07:00
'message': message,
'address': res.address,
'signature': base64.b64encode(res.signature)
}
@cli.command(help='Verify message.')
@click.option('-c', '--coin', default='Bitcoin')
@click.argument('address')
@click.argument('signature')
@click.argument('message')
@click.pass_obj
def verify_message(client, coin, address, signature, message):
signature = base64.b64decode(signature)
return client.verify_message(coin, address, signature, message)
2017-07-03 08:45:56 -07:00
@cli.command(help='Encrypt value by given key and path.')
@click.option('-n', '-address')
@click.argument('key')
@click.argument('value')
@click.pass_obj
def encrypt_keyvalue(client, address, key, value):
address_n = client.expand_path(address)
res = client.encrypt_keyvalue(address_n, key, value)
return binascii.hexlify(res)
2017-07-03 08:45:56 -07:00
@cli.command(help='Decrypt value by given key and path.')
@click.option('-n', '-address')
@click.argument('key')
@click.argument('value')
@click.pass_obj
def decrypt_keyvalue(client, address, key, value):
address_n = client.expand_path(address)
return client.decrypt_keyvalue(address_n, key, value.decode('hex'))
2017-07-03 08:45:56 -07:00
@cli.command(help='Encrypt message.')
@click.option('-c', '--coin', default='Bitcoin')
@click.option('-d', '--display-only', is_flag=True)
@click.option('-n', '-address')
@click.argument('pubkey')
@click.argument('message')
@click.pass_obj
def encrypt_message(client, coin, display_only, address, pubkey, message):
pubkey = binascii.unhexlify(pubkey)
address_n = client.expand_path(address)
res = client.encrypt_message(pubkey, message, display_only, coin, address_n)
return {
2017-07-03 08:45:56 -07:00
'nonce': binascii.hexlify(res.nonce),
'message': binascii.hexlify(res.message),
'hmac': binascii.hexlify(res.hmac),
'payload': base64.b64encode(res.nonce + res.message + res.hmac),
}
@cli.command(help='Decrypt message.')
@click.option('-n', '-address')
@click.argument('payload')
@click.pass_obj
def decrypt_message(client, address, payload):
address_n = client.expand_path(address)
payload = base64.b64decode(payload)
nonce, message, msg_hmac = payload[:33], payload[33:-8], payload[-8:]
return client.decrypt_message(address_n, nonce, message, msg_hmac)
2017-07-03 08:45:56 -07:00
#
# Ethereum functions
#
2017-07-03 08:45:56 -07:00
@cli.command(help='Get Ethereum address in hex encoding.')
@click.option('-n', '-address')
@click.option('-d', '--show-display', is_flag=True)
@click.pass_obj
def ethereum_get_address(client, address, show_display):
address_n = client.expand_path(address)
address = client.ethereum_get_address(address_n, show_display)
2017-07-11 10:37:25 -07:00
return '0x%s' % binascii.hexlify(str(address))
2017-07-03 08:45:56 -07:00
@cli.command(help='Sign (and optionally publish) Ethereum transaction. Use TO as destination address or set TO to "" for contract creation.')
@click.option('-a', '--host', default='localhost:8545', help='RPC port of ethereum node for automatic gas/nonce estimation')
@click.option('-c', '--chain-id', type=int, help='EIP-155 chain id (replay protection)')
@click.option('-n', '-address', help='BIP-32 path to signing key')
@click.option('-v', '--value', default='0', help='Ether amount to transfer, e.g. "100 milliether"')
@click.option('-g', '--gas-limit', type=int, help='Gas limit - Required for offline signing')
@click.option('-t', '--gas-price', help='Gas price, e.g. "20 nanoether" - Required for offline signing')
@click.option('-i', '--nonce', type=int, help='Transaction counter - Required for offline signing')
@click.option('-d', '--data', default='', help='Data as hex string, e.g. 0x12345678')
@click.option('-p', '--publish', is_flag=True, help='Publish transaction via RPC')
@click.argument('to')
@click.pass_obj
def ethereum_sign_tx(client, host, chain_id, address, value, gas_limit, gas_price, nonce, data, publish, to):
from ethjsonrpc import EthJsonRpc
import rlp
ether_units = {
'wei': 1,
'kwei': 1000,
'babbage': 1000,
'femtoether': 1000,
'mwei': 1000000,
'lovelace': 1000000,
'picoether': 1000000,
'gwei': 1000000000,
'shannon': 1000000000,
'nanoether': 1000000000,
'nano': 1000000000,
'szabo': 1000000000000,
'microether': 1000000000000,
'micro': 1000000000000,
'finney': 1000000000000000,
'milliether': 1000000000000000,
'milli': 1000000000000000,
'ether': 1000000000000000000,
'eth': 1000000000000000000,
}
if ' ' in value:
value, unit = value.split(' ', 1)
if unit.lower() not in ether_units:
raise CallException(types.Failure_DataError, 'Unrecognized ether unit %r' % unit)
value = int(value) * ether_units[unit.lower()]
else:
value = int(value)
if gas_price is not None:
if ' ' in gas_price:
gas_price, unit = gas_price.split(' ', 1)
if unit.lower() not in ether_units:
raise CallException(types.Failure_DataError, 'Unrecognized gas price unit %r' % unit)
gas_price = int(gas_price) * ether_units[unit.lower()]
else:
2017-07-03 08:45:56 -07:00
gas_price = int(gas_price)
if gas_limit is not None:
gas_limit = int(gas_limit)
if to.startswith('0x') or to.startswith('0X'):
to_address = to[2:].decode('hex')
else:
to_address = to.decode('hex')
address_n = client.expand_path(address)
address = '0x%s' % (binascii.hexlify(client.ethereum_get_address(address_n)),)
if gas_price is None or gas_limit is None or nonce is None:
host, port = host.split(':')
eth = EthJsonRpc(host, int(port))
if not data:
data = ''
elif data.startswith('0x'):
data = data[2:]
data = binascii.unhexlify(data)
if gas_price is None:
gas_price = eth.eth_gasPrice()
if gas_limit is None:
gas_limit = eth.eth_estimateGas(
to_address=to,
from_address=address,
value=('0x%x' % value),
data='0x' + data)
if nonce is None:
nonce = eth.eth_getTransactionCount(address)
sig = client.ethereum_sign_tx(
n=address_n,
nonce=nonce,
gas_price=gas_price,
gas_limit=gas_limit,
to=to_address,
value=value,
data=data,
chain_id=chain_id)
transaction = rlp.encode(
(nonce, gas_price, gas_limit, to_address, value, data) + sig)
tx_hex = '0x%s' % binascii.hexlify(transaction)
if publish:
tx_hash = eth.eth_sendRawTransaction(tx_hex)
return 'Transaction published with ID: %s' % tx_hash
2017-07-03 08:45:56 -07:00
else:
return 'Signed raw transaction: %s' % tx_hex
2017-07-03 08:45:56 -07:00
#
# Main
#
if __name__ == '__main__':
2017-07-03 08:45:56 -07:00
cli()