Nano S 1.4 support, cleanup Python 3 support

This commit is contained in:
BTChip github 2018-03-06 10:21:37 +01:00
parent 9914b3746a
commit e834117229
No known key found for this signature in database
GPG Key ID: 48BCF826EBFA4D17
16 changed files with 1135 additions and 216 deletions

55
ledgerblue/Dongle.py Normal file
View File

@ -0,0 +1,55 @@
"""
*******************************************************************************
* Ledger Blue
* (c) 2016 Ledger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
********************************************************************************
"""
from abc import ABCMeta, abstractmethod
from binascii import hexlify
import sys
TIMEOUT=20000
def hexstr(bstr):
if (sys.version_info.major == 3):
return hexlify(bstr).decode()
if (sys.version_info.major == 2):
return hexlify(bstr)
return "<undecoded APDU<"
class DongleWait(object):
__metaclass__ = ABCMeta
@abstractmethod
def waitFirstResponse(self, timeout):
pass
class Dongle(object):
__metaclass__ = ABCMeta
@abstractmethod
def exchange(self, apdu, timeout=TIMEOUT):
pass
@abstractmethod
def apduMaxDataSize(self):
pass
@abstractmethod
def close(self):
pass
def setWaitImpl(self, waitImpl):
self.waitImpl = waitImpl

View File

@ -30,10 +30,10 @@ device.""")
def auto_int(x):
return int(x, 0)
def getDeployedSecretV2(dongle, masterPrivate, targetid, issuerKey):
def getDeployedSecretV2(dongle, masterPrivate, targetId, issuerKey):
testMaster = PrivateKey(bytes(masterPrivate))
testMasterPublic = bytearray(testMaster.pubkey.serialize(compressed=False))
targetid = bytearray(struct.pack('>I', targetid))
targetid = bytearray(struct.pack('>I', targetId))
# identify
apdu = bytearray([0xe0, 0x04, 0x00, 0x00]) + bytearray([len(targetid)]) + targetid
@ -70,6 +70,7 @@ def getDeployedSecretV2(dongle, masterPrivate, targetid, issuerKey):
# walk the device certificates to retrieve the public key to use for authentication
index = 0
last_pub_key = PublicKey(binascii.unhexlify(issuerKey), raw=True)
devicePublicKey = None
while True:
if index == 0:
certificate = bytearray(dongle.exchange(bytearray.fromhex('E052000000')))
@ -86,6 +87,7 @@ def getDeployedSecretV2(dongle, masterPrivate, targetid, issuerKey):
certificateSignature = last_pub_key.ecdsa_deserialize(bytes(certificateSignatureArray))
# first cert contains a header field which holds the certificate's public key role
if index == 0:
devicePublicKey = certificatePublicKey
certificateSignedData = bytearray([0x02]) + certificateHeader + certificatePublicKey
# Could check if the device certificate is signed by the issuer public key
# ephemeral key certificate
@ -99,7 +101,13 @@ def getDeployedSecretV2(dongle, masterPrivate, targetid, issuerKey):
# Commit device ECDH channel
dongle.exchange(bytearray.fromhex('E053000000'))
secret = last_pub_key.ecdh(binascii.unhexlify(ephemeralPrivate.serialize()))
return secret[0:16]
if targetId&0xF == 0x2:
return secret[0:16]
elif targetId&0xF == 0x3:
ret = {}
ret['ecdh_secret'] = secret
ret['devicePublicKey'] = devicePublicKey
return ret
if __name__ == '__main__':
from .ecWrapper import PrivateKey, PublicKey
@ -123,20 +131,38 @@ if __name__ == '__main__':
args.rootPrivateKey = privateKey.serialize()
genuine = False
ui = False
customCA = False
dongle = getDongle(args.apdu)
version = None
secret = getDeployedSecretV2(dongle, bytearray.fromhex(args.rootPrivateKey), args.targetId, args.issuerKey)
if secret != None:
loader = HexLoader(dongle, 0xe0, True, secret)
data = b'\xFF'
data = loader.encryptAES(data)
try:
loader.exchange(loader.cla, 0x00, 0x00, 0x00, data)
except CommException as e:
genuine = (e.sw == 0x6D00)
loader = HexLoader(dongle, 0xe0, True, secret)
version = loader.getVersion()
genuine = True
apps = loader.listApp()
while len(apps) != 0:
for app in apps:
if (app['flags'] & 0x08):
ui = True
if (app['flags'] & 0x400):
customCA = True
apps = loader.listApp(False)
except:
genuine = False
if genuine:
if ui:
print ("WARNING : Product is genuine but has a UI application loaded")
if customCA:
print ("WARNING : Product is genuine but has a Custom CA loaded")
if not ui and not customCA:
print ("Product is genuine")
print ("SE Version " + version['osVersion'])
print ("MCU Version " + version['mcuVersion'])
if 'mcuHash' in version:
print ("MCU Hash " + binascii.hexlify(version['mcuHash']).decode('ascii'))
else:
print ("Product is NOT genuine")
print ("Product is NOT genuine")

View File

@ -20,49 +20,38 @@
from abc import ABCMeta, abstractmethod
from .commException import CommException
from .ledgerWrapper import wrapCommandAPDU, unwrapResponseAPDU
from .Dongle import *
from binascii import hexlify
import hid
import time
import os
import sys
from .commU2F import getDongle as getDongleU2F
from .commHTTP import getDongle as getDongleHTTP
import hid
TIMEOUT=20000
try:
from smartcard.Exceptions import NoCardException
from smartcard.System import readers
from smartcard.util import toHexString, toBytes
SCARD = True
except ImportError:
SCARD = False
APDUGEN=None
if "APDUGEN" in os.environ and len(os.environ["APDUGEN"]) != 0:
APDUGEN=os.environ["APDUGEN"]
# Force use of U2F if required
U2FKEY=None
if "U2FKEY" in os.environ and len(os.environ["U2FKEY"]) != 0:
U2FKEY=os.environ["U2FKEY"]
# Force use of MCUPROXY if required
MCUPROXY=None
if "MCUPROXY" in os.environ and len(os.environ["MCUPROXY"]) != 0:
MCUPROXY=os.environ["MCUPROXY"]
def hexstr(bstr):
if (sys.version_info.major == 3):
return hexlify(bstr).decode()
if (sys.version_info.major == 2):
return hexlify(bstr)
return "<undecoded APDU<"
class DongleWait(object):
__metaclass__ = ABCMeta
@abstractmethod
def waitFirstResponse(self, timeout):
pass
class Dongle(object):
__metaclass__ = ABCMeta
@abstractmethod
def exchange(self, apdu, timeout=TIMEOUT):
pass
@abstractmethod
def close(self):
pass
def setWaitImpl(self, waitImpl):
self.waitImpl = waitImpl
# Force use of MCUPROXY if required
PCSC=None
if "PCSC" in os.environ and len(os.environ["PCSC"]) != 0:
PCSC=os.environ["PCSC"]
if PCSC:
try:
from smartcard.Exceptions import NoCardException
from smartcard.System import readers
from smartcard.util import toHexString, toBytes
except ImportError:
PCSC = False
class HIDDongleHIDAPI(Dongle, DongleWait):
@ -74,8 +63,12 @@ class HIDDongleHIDAPI(Dongle, DongleWait):
self.opened = True
def exchange(self, apdu, timeout=TIMEOUT):
if APDUGEN:
print("%s" % hexstr(apdu))
return
if self.debug:
print("=> %s" % hexstr(apdu))
print("HID => %s" % hexstr(apdu))
if self.ledger:
apdu = wrapCommandAPDU(0x0101, apdu, 64)
padSize = len(apdu) % 64
@ -86,7 +79,8 @@ class HIDDongleHIDAPI(Dongle, DongleWait):
while(offset != len(tmp)):
data = tmp[offset:offset + 64]
data = bytearray([0]) + data
self.device.write(data)
if self.device.write(data) < 0:
raise BaseException("Error while writing")
offset += 64
dataLength = 0
dataStart = 2
@ -125,7 +119,7 @@ class HIDDongleHIDAPI(Dongle, DongleWait):
sw = (result[swOffset] << 8) + result[swOffset + 1]
response = result[dataStart : dataLength + dataStart]
if self.debug:
print("<= %s%.2x" % (hexstr(response), sw))
print("HID <= %s%.2x" % (hexstr(response), sw))
if sw != 0x9000:
raise CommException("Invalid status %04x" % sw, sw, response)
return response
@ -141,6 +135,9 @@ class HIDDongleHIDAPI(Dongle, DongleWait):
time.sleep(0.0001)
return bytearray(data)
def apduMaxDataSize(self):
return 255
def close(self):
if self.opened:
try:
@ -159,11 +156,11 @@ class DongleSmartcard(Dongle):
def exchange(self, apdu, timeout=TIMEOUT):
if self.debug:
print("=> %s" % hexstr(apdu))
print("SC => %s" % hexstr(apdu))
response, sw1, sw2 = self.device.transmit(toBytes(hexlify(apdu)))
sw = (sw1 << 8) | sw2
if self.debug:
print("<= %s%.2x" % (hexstr(response).replace(" ", ""), sw))
print("SC <= %s%.2x" % (hexstr(response).replace(" ", ""), sw))
if sw != 0x9000:
raise CommException("Invalid status %04x" % sw, sw, bytearray(response))
return bytearray(response)
@ -177,18 +174,25 @@ class DongleSmartcard(Dongle):
self.opened = False
def getDongle(debug=False, selectCommand=None):
if APDUGEN:
return HIDDongleHIDAPI(None, True, debug)
if not U2FKEY is None:
return getDongleU2F(scrambleKey=U2FKEY, debug=debug)
if MCUPROXY is not None:
return getDongleHTTP(remote_host=MCUPROXY, debug=debug)
dev = None
hidDevicePath = None
ledger = True
for hidDevice in hid.enumerate(0, 0):
if hidDevice['vendor_id'] == 0x2c97:
if hidDevice['vendor_id'] == 0x2c97 and ('interface_number' not in hidDevice or hidDevice['interface_number'] == 0):
hidDevicePath = hidDevice['path']
if hidDevicePath is not None:
dev = hid.device()
dev.open_path(hidDevicePath)
dev.set_nonblocking(True)
return HIDDongleHIDAPI(dev, ledger, debug)
if SCARD:
if PCSC:
connection = None
for reader in readers():
try:

112
ledgerblue/commHTTP.py Normal file
View File

@ -0,0 +1,112 @@
"""
*******************************************************************************
* Ledger Blue
* (c) 2016 Ledger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
********************************************************************************
"""
from abc import ABCMeta, abstractmethod
from .commException import CommException
from .ledgerWrapper import wrapCommandAPDU, unwrapResponseAPDU
from binascii import hexlify
import time
import os
import sys
import requests
import json
def hexstr(bstr):
if (sys.version_info.major == 3):
return hexlify(bstr).decode()
if (sys.version_info.major == 2):
return hexlify(bstr)
return "<undecoded APDU<"
class HTTPProxy(object):
def __init__(self, remote_host="localhost:8081", debug=False):
self.remote_host = "http://" + remote_host
self.debug = debug
def exchange(self, apdu):
if self.debug:
print("=> %s" % hexstr(apdu))
try:
ret = requests.post(self.remote_host + "/send_apdu", params={"data": hexstr(apdu)})
while True:
ret = requests.post(self.remote_host + "/fetch_apdu")
if ret.text != "no response apdu yet":
print("<= %s" % ret.text)
break
else:
time.sleep(0.1)
return bytearray(str(ret.text).decode("hex"))
except Exception as e:
print(e)
def exchange_seph_event(self, event):
if self.debug >= 3:
print("=> %s" % hexstr(event))
try:
ret = requests.post(self.remote_host + "/send_seph_event", params={"data": event.encode("hex")})
return ret.text
except Exception as e:
print(e)
def poll_status(self):
if self.debug >= 5:
print("=> Waiting for a status")
try:
while True:
ret = requests.post(self.remote_host + "/fetch_status")
if ret.text != "no status yet":
break
else:
time.sleep(0.05)
return bytearray(str(ret.text).decode("hex"))
except Exception as e:
print(e)
def reset(self):
if self.debug:
print("=> Reset")
try:
ret = requests.post(self.remote_host + "/reset")
except Exception as e:
print(e)
def getDongle(remote_host="localhost", debug=False):
return HTTPProxy(remote_host, debug)

333
ledgerblue/commU2F.py Normal file
View File

@ -0,0 +1,333 @@
# Copyright (c) 2013 Yubico AB
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
*******************************************************************************
* Ledger Blue
* (c) 2016 Ledger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
********************************************************************************
"""
import os
import traceback
from abc import ABCMeta, abstractmethod
from .ledgerWrapper import wrapCommandAPDU, unwrapResponseAPDU
from binascii import hexlify
from .Dongle import *
import binascii
import time
import sys
import hid
from u2flib_host.device import U2FDevice
from u2flib_host.yubicommon.compat import byte2int, int2byte
from u2flib_host.constants import INS_ENROLL, INS_SIGN
from u2flib_host import u2f, exc
from u2flib_host.utils import websafe_decode, websafe_encode
from hashlib import sha256
from .commException import CommException
TIMEOUT=30000
DEVICES = [
(0x1050, 0x0200), # Gnubby
(0x1050, 0x0113), # YubiKey NEO U2F
(0x1050, 0x0114), # YubiKey NEO OTP+U2F
(0x1050, 0x0115), # YubiKey NEO U2F+CCID
(0x1050, 0x0116), # YubiKey NEO OTP+U2F+CCID
(0x1050, 0x0120), # Security Key by Yubico
(0x1050, 0x0410), # YubiKey Plus
(0x1050, 0x0402), # YubiKey 4 U2F
(0x1050, 0x0403), # YubiKey 4 OTP+U2F
(0x1050, 0x0406), # YubiKey 4 U2F+CCID
(0x1050, 0x0407), # YubiKey 4 OTP+U2F+CCID
(0x2581, 0xf1d0), # Plug-Up U2F Security Key
(0x2581, 0xf1d1), # Ledger Production U2F Dongle
(0x2c97, 0x0000), # Ledger Blue
(0x2c97, 0x0001), # Ledger Nano S
(0x2c97, 0x0002), # Ledger Aramis
(0x2c97, 0x0003), # Ledger HW2
(0x2c97, 0x0004), # Ledger Blend
(0x2c97, 0xf1d0), # Plug-Up U2F Security Key
]
HID_RPT_SIZE = 64
TYPE_INIT = 0x80
U2F_VENDOR_FIRST = 0x40
CMD_INIT = 0x06
CMD_WINK = 0x08
CMD_APDU = 0x03
U2FHID_YUBIKEY_DEVICE_CONFIG = U2F_VENDOR_FIRST
STAT_ERR = 0xbf
def _read_timeout(dev, size, timeout=TIMEOUT):
if (timeout > 0):
timeout += time.time()
while timeout == 0 or time.time() < timeout:
resp = dev.read(size)
if resp:
return resp
time.sleep(0.01)
return []
class U2FHIDError(Exception):
def __init__(self, code):
super(Exception, self).__init__("U2FHIDError: 0x%02x" % code)
self.code = code
class HIDDevice(U2FDevice):
"""
U2FDevice implementation using the HID transport.
"""
def __init__(self, path):
self.path = path
self.cid = b"\xff\xff\xff\xff"
def open(self):
self.handle = hid.device()
self.handle.open_path(self.path)
self.handle.set_nonblocking(True)
self.init()
def close(self):
if hasattr(self, 'handle'):
self.handle.close()
del self.handle
def init(self):
nonce = os.urandom(8)
resp = self.call(CMD_INIT, nonce)
while resp[:8] != nonce:
print("Wrong nonce, read again...")
resp = self._read_resp(self.cid, CMD_INIT)
self.cid = resp[8:12]
def set_mode(self, mode):
data = mode + b"\x0f\x00\x00"
self.call(U2FHID_YUBIKEY_DEVICE_CONFIG, data)
def _do_send_apdu(self, apdu_data):
return self.call(CMD_APDU, apdu_data)
def wink(self):
self.call(CMD_WINK)
def _send_req(self, cid, cmd, data):
size = len(data)
bc_l = int2byte(size & 0xff)
bc_h = int2byte(size >> 8 & 0xff)
payload = cid + int2byte(TYPE_INIT | cmd) + bc_h + bc_l + \
data[:HID_RPT_SIZE - 7]
payload += b'\0' * (HID_RPT_SIZE - len(payload))
if self.handle.write([0] + [byte2int(c) for c in payload]) < 0:
raise exc.DeviceError("Cannot write to device!")
data = data[HID_RPT_SIZE - 7:]
seq = 0
while len(data) > 0:
payload = cid + int2byte(0x7f & seq) + data[:HID_RPT_SIZE - 5]
payload += b'\0' * (HID_RPT_SIZE - len(payload))
if self.handle.write([0] + [byte2int(c) for c in payload]) < 0:
raise exc.DeviceError("Cannot write to device!")
data = data[HID_RPT_SIZE - 5:]
seq += 1
def _read_resp(self, cid, cmd):
resp = b'.'
header = cid + int2byte(TYPE_INIT | cmd)
while resp and resp[:5] != header:
# allow for timeout
resp_vals = _read_timeout(self.handle, HID_RPT_SIZE)
resp = b''.join(int2byte(v) for v in resp_vals)
if resp[:5] == cid + int2byte(STAT_ERR):
raise U2FHIDError(byte2int(resp[7]))
if not resp:
raise exc.DeviceError("Invalid response from device!")
data_len = (byte2int(resp[5]) << 8) + byte2int(resp[6])
data = resp[7:min(7 + data_len, HID_RPT_SIZE)]
data_len -= len(data)
seq = 0
while data_len > 0:
resp_vals = _read_timeout(self.handle, HID_RPT_SIZE)
resp = b''.join(int2byte(v) for v in resp_vals)
if resp[:4] != cid:
raise exc.DeviceError("Wrong CID from device!")
if byte2int(resp[4:5]) != seq & 0x7f:
raise exc.DeviceError("Wrong SEQ from device!")
seq += 1
new_data = resp[5:min(5 + data_len, HID_RPT_SIZE)]
data_len -= len(new_data)
data += new_data
return data
def call(self, cmd, data=b''):
if isinstance(data, int):
data = int2byte(data)
self._send_req(self.cid, cmd, data)
return self._read_resp(self.cid, cmd)
class U2FTunnelDongle(Dongle, DongleWait):
def __init__(self, device, scrambleKey="", ledger=False, debug=False):
self.device = device
self.scrambleKey = scrambleKey
self.ledger = ledger
self.debug = debug
self.waitImpl = self
self.opened = True
self.device.open()
def exchange(self, apdu, timeout=TIMEOUT):
if self.debug:
print("U2F => %s" % hexstr(apdu))
if (len(apdu)>=256):
raise CommException("Too long APDU to transport")
# wrap apdu
i=0
keyHandle = ""
while i < len(apdu):
val = apdu[i:i+1]
if len(self.scrambleKey) > 0:
val = chr(ord(val) ^ ord(self.scrambleKey[i % len(self.scrambleKey)]))
keyHandle += val
i+=1
client_param = sha256("u2f_tunnel".encode('utf8')).digest()
app_param = sha256("u2f_tunnel".encode('utf8')).digest()
request = client_param + app_param + int2byte(len(keyHandle)) + keyHandle
#p1 = 0x07 if check_only else 0x03
p1 = 0x03
p2 = 0
response = self.device.send_apdu(INS_SIGN, p1, p2, request)
if self.debug:
print("U2F <= %s%.2x" % (hexstr(response), 0x9000))
# check replied status words of the command (within the APDU tunnel)
if hexstr(response[-2:]) != "9000":
raise CommException("Invalid status words received: " + hexstr(response[-2:]));
# api expect a byte array, remove the appended status words
return bytearray(response[:-2])
def apduMaxDataSize(self):
return 256-5
def close(self):
self.device.close()
def waitFirstResponse(self, timeout):
raise CommException("Invalid use")
def getDongles(dev_class=None, scrambleKey="", debug=False):
dev_class = dev_class or HIDDevice
devices = []
for d in hid.enumerate(0, 0):
usage_page = d['usage_page']
if usage_page == 0xf1d0 and d['usage'] == 1:
devices.append(U2FTunnelDongle(dev_class(d['path']),scrambleKey, debug=debug))
# Usage page doesn't work on Linux
# well known devices
elif (d['vendor_id'], d['product_id']) in DEVICES:
device = HIDDevice(d['path'])
try:
device.open()
device.close()
devices.append(U2FTunnelDongle(dev_class(d['path']),scrambleKey, debug=debug))
except (exc.DeviceError, IOError, OSError):
pass
# unknown devices
else:
device = HIDDevice(d['path'])
try:
device.open()
# try a ping command to ensure a FIDO device, else timeout (BEST here, modulate the timeout, 2 seconds is way too big)
device.ping()
device.close()
devices.append(U2FTunnelDongle(dev_class(d['path']),scrambleKey, debug=debug))
except (exc.DeviceError, IOError, OSError):
pass
return devices
def getDongle(path=None, dev_class=None, scrambleKey="", debug=False):
# if path is none, then use the first device
dev_class = dev_class or HIDDevice
devices = []
for d in hid.enumerate(0, 0):
if path is None or d['path'] == path:
usage_page = d['usage_page']
if usage_page == 0xf1d0 and d['usage'] == 1:
return U2FTunnelDongle(dev_class(d['path']),scrambleKey, debug=debug)
# Usage page doesn't work on Linux
# well known devices
elif (d['vendor_id'], d['product_id']) in DEVICES and ('interface_number' not in d or d['interface_number'] == 1):
#print d
device = HIDDevice(d['path'])
try:
device.open()
device.close()
return U2FTunnelDongle(dev_class(d['path']),scrambleKey, debug=debug)
except (exc.DeviceError, IOError, OSError):
traceback.print_exc()
pass
# unknown devices
# else:
# device = HIDDevice(d['path'])
# try:
# device.open()
# # try a ping command to ensure a FIDO device, else timeout (BEST here, modulate the timeout, 2 seconds is way too big)
# device.ping()
# device.close()
# return U2FTunnelDongle(dev_class(d['path']),scrambleKey, debug=debug)
# except (exc.DeviceError, IOError, OSError):
# traceback.print_exc()
# pass
raise CommException("No dongle found")

View File

@ -23,6 +23,7 @@ def get_argparser():
parser = argparse.ArgumentParser(description="Delete the app with the specified name.")
parser.add_argument("--targetId", help="The device's target ID (default is Ledger Blue)", type=auto_int)
parser.add_argument("--appName", help="The name of the application to delete")
parser.add_argument("--appHash", help="Set the application hash")
parser.add_argument("--rootPrivateKey", help="A private key used to establish a Secure Channel (hex encoded)")
parser.add_argument("--apdu", help="Display APDU log", action='store_true')
parser.add_argument("--deployLegacy", help="Use legacy deployment API", action='store_true')
@ -41,13 +42,24 @@ if __name__ == '__main__':
args = get_argparser().parse_args()
if args.appName == None:
raise Exception("Missing appName")
if args.appName == None and args.appHash == None:
raise Exception("Missing appName or appHash")
if args.appName != None and args.appHash != None:
raise Exception("Set either appName or appHash")
if args.appName != None:
if (sys.version_info.major == 3):
args.appName = bytes(args.appName,'ascii')
if (sys.version_info.major == 2):
args.appName = bytes(args.appName)
if args.appHash != None:
if (sys.version_info.major == 3):
args.appHash = bytes(args.appHash,'ascii')
if (sys.version_info.major == 2):
args.appHash = bytes(args.appHash)
args.appHash = bytearray.fromhex(args.appHash)
if (sys.version_info.major == 3):
args.appName = bytes(args.appName,'ascii')
if (sys.version_info.major == 2):
args.appName = bytes(args.appName)
if args.targetId == None:
args.targetId = 0x31000002
@ -64,4 +76,8 @@ if __name__ == '__main__':
else:
secret = getDeployedSecretV2(dongle, bytearray.fromhex(args.rootPrivateKey), args.targetId)
loader = HexLoader(dongle, 0xe0, True, secret)
loader.deleteApp(args.appName)
if args.appName != None:
loader.deleteApp(args.appName)
if args.appHash != None:
loader.deleteAppByHash(args.appHash)

View File

@ -30,6 +30,9 @@ def getDeployedSecretV1(dongle, masterPrivate, targetid):
testMasterPublic = bytearray(testMaster.pubkey.serialize(compressed=False))
targetid = bytearray(struct.pack('>I', targetid))
if targetId&0xF != 0x1:
raise BaseException("Target ID does not support SCP V1")
# identify
apdu = bytearray([0xe0, 0x04, 0x00, 0x00]) + bytearray([len(targetid)]) + targetid
dongle.exchange(apdu)
@ -75,10 +78,13 @@ def getDeployedSecretV1(dongle, masterPrivate, targetid):
secret = last_pub_key.ecdh(bytes(ephemeralPrivate.serialize().decode('hex')))
return secret[0:16]
def getDeployedSecretV2(dongle, masterPrivate, targetid):
def getDeployedSecretV2(dongle, masterPrivate, targetId, signerCertChain=None, ecdh_secret_format=None):
testMaster = PrivateKey(bytes(masterPrivate))
testMasterPublic = bytearray(testMaster.pubkey.serialize(compressed=False))
targetid = bytearray(struct.pack('>I', targetid))
targetid = bytearray(struct.pack('>I', targetId))
if targetId&0xF == 0x1:
raise BaseException("Target ID does not support SCP V2")
# identify
apdu = bytearray([0xe0, 0x04, 0x00, 0x00]) + bytearray([len(targetid)]) + targetid
@ -95,13 +101,18 @@ def getDeployedSecretV2(dongle, masterPrivate, targetid):
#if cardKey != testMasterPublic:
# raise Exception("Invalid batch public key")
print("Using test master key %s " % binascii.hexlify(testMasterPublic))
dataToSign = bytes(bytearray([0x01]) + testMasterPublic)
signature = testMaster.ecdsa_sign(bytes(dataToSign))
signature = testMaster.ecdsa_serialize(signature)
certificate = bytearray([len(testMasterPublic)]) + testMasterPublic + bytearray([len(signature)]) + signature
apdu = bytearray([0xE0, 0x51, 0x00, 0x00]) + bytearray([len(certificate)]) + certificate
dongle.exchange(apdu)
if (signerCertChain):
for cert in signerCertChain:
apdu = bytearray([0xE0, 0x51, 0x00, 0x00]) + bytearray([len(cert)]) + cert
dongle.exchange(apdu)
else:
print("Using test master key %s " % binascii.hexlify(testMasterPublic))
dataToSign = bytes(bytearray([0x01]) + testMasterPublic)
signature = testMaster.ecdsa_sign(bytes(dataToSign))
signature = testMaster.ecdsa_serialize(signature)
certificate = bytearray([len(testMasterPublic)]) + testMasterPublic + bytearray([len(signature)]) + signature
apdu = bytearray([0xE0, 0x51, 0x00, 0x00]) + bytearray([len(certificate)]) + certificate
dongle.exchange(apdu)
# provide the ephemeral certificate
ephemeralPrivate = PrivateKey()
@ -116,7 +127,8 @@ def getDeployedSecretV2(dongle, masterPrivate, targetid):
# walk the device certificates to retrieve the public key to use for authentication
index = 0
last_pub_key = PublicKey(bytes(testMasterPublic), raw=True)
last_dev_pub_key = PublicKey(bytes(testMasterPublic), raw=True)
devicePublicKey = None
while True:
if index == 0:
certificate = bytearray(dongle.exchange(bytearray.fromhex('E052000000')))
@ -132,24 +144,33 @@ def getDeployedSecretV2(dongle, masterPrivate, targetid):
certificatePublicKey = certificate[offset : offset + certificate[offset-1]]
offset += certificate[offset-1] + 1
certificateSignatureArray = certificate[offset : offset + certificate[offset-1]]
certificateSignature = last_pub_key.ecdsa_deserialize(bytes(certificateSignatureArray))
certificateSignature = last_dev_pub_key.ecdsa_deserialize(bytes(certificateSignatureArray))
# first cert contains a header field which holds the certificate's public key role
if index == 0:
devicePublicKey = certificatePublicKey
certificateSignedData = bytearray([0x02]) + certificateHeader + certificatePublicKey
# Could check if the device certificate is signed by the issuer public key
# ephemeral key certificate
else:
certificateSignedData = bytearray([0x12]) + deviceNonce + nonce + certificatePublicKey
if not last_pub_key.ecdsa_verify(bytes(certificateSignedData), certificateSignature):
if not last_dev_pub_key.ecdsa_verify(bytes(certificateSignedData), certificateSignature):
if index == 0:
# Not an error if loading from user key
print("Broken certificate chain - loading from user key")
else:
raise Exception("Broken certificate chain")
last_pub_key = PublicKey(bytes(certificatePublicKey), raw=True)
last_dev_pub_key = PublicKey(bytes(certificatePublicKey), raw=True)
index = index + 1
# Commit device ECDH channel
dongle.exchange(bytearray.fromhex('E053000000'))
secret = last_pub_key.ecdh(binascii.unhexlify(ephemeralPrivate.serialize()))
return secret[0:16]
secret = last_dev_pub_key.ecdh(binascii.unhexlify(ephemeralPrivate.serialize()))
#forced to specific version
if ecdh_secret_format==1 or targetId&0xF == 0x2:
return secret[0:16]
elif targetId&0xF == 0x3:
ret = {}
ret['ecdh_secret'] = secret
ret['devicePublicKey'] = devicePublicKey
return ret

View File

@ -51,7 +51,11 @@ if __name__ == '__main__':
import sys
import os
import struct
import urllib2, urlparse
if sys.version_info.major == 3:
import urllib.request as urllib2
import urllib.parse as urlparse
else:
import urllib2, urlparse
from .BlueHSMServer_pb2 import Request, Response, Parameter
from .comm import getDongle
@ -108,13 +112,17 @@ if __name__ == '__main__':
parameter.local = False
parameter.alias = "persoKey"
parameter.name = args.perso
request.parameters = str(deviceNonce)
request.parameters = bytes(deviceNonce)
response = serverQuery(request, args.url)
offset = 0
remotePublicKeySignatureLength = ord(response.response[offset + 1]) + 2
if sys.version_info.major == 2:
responseLength = ord(response.response[offset + 1])
else:
responseLength = response.response[offset + 1]
remotePublicKeySignatureLength = responseLength + 2
remotePublicKeySignature = response.response[offset : offset + remotePublicKeySignatureLength]
certificate = bytearray([len(remotePublicKey)]) + remotePublicKey + bytearray([len(remotePublicKeySignature)]) + remotePublicKeySignature
@ -136,7 +144,7 @@ if __name__ == '__main__':
request = Request()
request.reference = "signEndorsement"
request.id = response.id
request.parameters = str(certificate)
request.parameters = bytes(certificate)
serverQuery(request, args.url)
index += 1
@ -158,7 +166,7 @@ if __name__ == '__main__':
parameter.local = False
parameter.alias = "endorsementKey"
parameter.name = args.endorsement
request.parameters = str(endorsementData)
request.parameters = bytes(endorsementData)
request.id = response.id
response = serverQuery(request, args.url)
certificate = bytearray(response.response)

View File

@ -18,22 +18,127 @@
"""
from Crypto.Cipher import AES
import sys
import struct
import hashlib
import binascii
from .ecWrapper import PrivateKey, PublicKey
from builtins import int
from ecpy.curves import Curve
import os
#from builtins import str
LOAD_SEGMENT_CHUNK_HEADER_LENGTH = 3
MIN_PADDING_LENGTH = 1
SCP_MAC_LENGTH = 0xE
BOLOS_TAG_APPNAME = 0x01
BOLOS_TAG_APPVERSION = 0x02
BOLOS_TAG_ICON = 0x03
BOLOS_TAG_DERIVEPATH = 0x04
BOLOS_TAG_DATASIZE = 0x05
BOLOS_TAG_DEPENDENCY = 0x06
def encodelv(v):
l = len(v)
s = ""
if l < 128:
s += struct.pack(">B", l)
elif l < 256:
s += struct.pack(">B", 0x81)
s += struct.pack(">B", l)
elif l < 65536:
s += struct.pack(">B", 0x82)
s += struct.pack(">H", l)
else:
raise Exception("Unimplemented LV encoding")
s += v
return s
def encodetlv(t, v):
l = len(v)
s = struct.pack(">B", t)
if l < 128:
s += struct.pack(">B", l)
elif l < 256:
s += struct.pack(">B", 0x81)
s += struct.pack(">B", l)
elif l < 65536:
s += struct.pack(">B", 0x82)
s += struct.pack(">H", l)
else:
raise Exception("Unimplemented TLV encoding")
s += v
return s
def str2bool(v):
if v is not None:
return v.lower() in ("yes", "true", "t", "1")
return False
SCP_DEBUG = str2bool(os.getenv("SCP_DEBUG"))
class HexLoader:
def __init__(self, card, cla=0xF0, secure=False, key=None, relative=True):
def scp_derive_key(self, ecdh_secret, keyindex):
retry = 0
# di = sha256(i || retrycounter || ecdh secret)
while True:
sha256 = hashlib.new('sha256')
sha256.update(struct.pack(">IB", keyindex, retry))
sha256.update(ecdh_secret)
# compare di with order
CURVE_SECP256K1 = Curve.get_curve('secp256k1')
if int.from_bytes(sha256.digest(), 'big') < CURVE_SECP256K1.order:
break
#regenerate a new di satisfying order upper bound
retry+=1
# Pi = di*G
privkey = PrivateKey(bytes(sha256.digest()))
pubkey = bytearray(privkey.pubkey.serialize(compressed=False))
# ki = sha256(Pi)
sha256 = hashlib.new('sha256')
sha256.update(pubkey)
#print ("Key " + str (keyindex) + ": " + sha256.hexdigest())
return sha256.digest()
def __init__(self, card, cla=0xF0, secure=False, mutauth_result=None, relative=True, cleardata_block_len=None):
self.card = card
self.cla = cla
self.secure = secure
self.key = key
self.iv = b"\x00" * 16
self.createappParams = None
#legacy unsecure SCP (pre nanos-1.4, pre blue-2.1)
self.max_mtu = 0xFE
if not self.card is None:
self.max_mtu = min(self.max_mtu, self.card.apduMaxDataSize())
self.scpVersion = 2
self.key = mutauth_result
self.iv = b'\x00' * 16
self.relative = relative
#store the aligned block len to be transported if requested
self.cleardata_block_len=cleardata_block_len
if not (self.cleardata_block_len is None):
if not self.card is None:
self.cleardata_block_len = min(self.cleardata_block_len, self.card.apduMaxDataSize())
# try:
if type(mutauth_result) is dict and 'ecdh_secret' in mutauth_result:
self.scp_enc_key = self.scp_derive_key(mutauth_result['ecdh_secret'], 0)[0:16]
self.scp_enc_iv = b"\x00" * 16
self.scp_mac_key = self.scp_derive_key(mutauth_result['ecdh_secret'], 1)[0:16]
self.scp_mac_iv = b"\x00" * 16
self.scpVersion = 3
self.max_mtu = 0xFE
if not self.card is None:
self.max_mtu = min(self.max_mtu, self.card.apduMaxDataSize()&0xF0)
# except:
# pass
def crc16(self, data):
@ -79,53 +184,122 @@ class HexLoader:
return crc
def exchange(self, cla, ins, p1, p2, data):
#wrap
data = self.scpWrap(data)
apdu = bytearray([cla, ins, p1, p2, len(data)]) + bytearray(data)
if self.card == None:
print("%s" % binascii.hexlify(apdu))
else:
return self.card.exchange(apdu)
# unwrap after exchanged
return self.scpUnwrap(bytes(self.card.exchange(apdu)))
def encryptAES(self, data):
if not self.secure:
def scpWrap(self, data):
if not self.secure or data is None or len(data) == 0:
return data
paddedData = data + b'\x80'
while (len(paddedData) % 16) != 0:
paddedData += b'\x00'
cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
encryptedData = cipher.encrypt(paddedData)
self.iv = encryptedData[len(encryptedData) - 16:]
if self.scpVersion == 3:
if SCP_DEBUG:
print(binascii.hexlify(data))
# ENC
paddedData = data + b'\x80'
while (len(paddedData) % 16) != 0:
paddedData += b'\x00'
if SCP_DEBUG:
print(binascii.hexlify(paddedData))
cipher = AES.new(self.scp_enc_key, AES.MODE_CBC, self.scp_enc_iv)
encryptedData = cipher.encrypt(paddedData)
self.scp_enc_iv = encryptedData[-16:]
if SCP_DEBUG:
print(binascii.hexlify(encryptedData))
# MAC
cipher = AES.new(self.scp_mac_key, AES.MODE_CBC, self.scp_mac_iv)
macData = cipher.encrypt(encryptedData)
self.scp_mac_iv = macData[-16:]
# only append part of the mac
encryptedData += self.scp_mac_iv[-SCP_MAC_LENGTH:]
if SCP_DEBUG:
print(binascii.hexlify(encryptedData))
else:
paddedData = data + b'\x80'
while (len(paddedData) % 16) != 0:
paddedData += b'\x00'
cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
if SCP_DEBUG:
print("wrap_old: "+binascii.hexlify(paddedData))
encryptedData = cipher.encrypt(paddedData)
self.iv = encryptedData[-16:]
#print (">>")
return encryptedData
def decryptAES(self, data):
if not self.secure or len(data) == 0:
def scpUnwrap(self, data):
if not self.secure or data is None or len(data) == 0 or len(data) == 2:
return data
cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
decryptedData = cipher.decrypt(data)
l = len(decryptedData) - 1
while (decryptedData[l] != chr(0x80)):
l-=1
decryptedData = decryptedData[0:l]
self.iv = data[len(data) - 16:]
if sys.version_info.major == 3:
padding_char = 0x80
else:
padding_char = chr(0x80)
if self.scpVersion == 3:
if SCP_DEBUG:
print(binascii.hexlify(data))
# MAC
cipher = AES.new(self.scp_mac_key, AES.MODE_CBC, self.scp_mac_iv)
macData = cipher.encrypt(data[0:-SCP_MAC_LENGTH])
self.scp_mac_iv = macData[-16:]
if self.scp_mac_iv[-SCP_MAC_LENGTH:] != data[-SCP_MAC_LENGTH:] :
raise BaseException("Invalid SCP MAC")
# consume mac
data = data[0:-SCP_MAC_LENGTH]
if SCP_DEBUG:
print(binascii.hexlify(data))
# ENC
cipher = AES.new(self.scp_enc_key, AES.MODE_CBC, self.scp_enc_iv)
self.scp_enc_iv = data[-16:]
data = cipher.decrypt(data)
l = len(data) - 1
while (data[l] != padding_char):
l-=1
if l == -1:
raise BaseException("Invalid SCP ENC padding")
data = data[0:l]
decryptedData = data
if SCP_DEBUG:
print(binascii.hexlify(data))
else:
cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
decryptedData = cipher.decrypt(data)
if SCP_DEBUG:
print("unwrap_old: "+binascii.hexlify(decryptedData))
l = len(decryptedData) - 1
while (decryptedData[l] != padding_char):
l-=1
if l == -1:
raise BaseException("Invalid SCP ENC padding")
decryptedData = decryptedData[0:l]
self.iv = data[-16:]
#print ("<<")
return decryptedData
def selectSegment(self, baseAddress):
data = b'\x05' + struct.pack('>I', baseAddress)
data = self.encryptAES(data)
self.exchange(self.cla, 0x00, 0x00, 0x00, data)
def loadSegmentChunk(self, offset, chunk):
data = b'\x06' + struct.pack('>H', offset) + chunk
data = self.encryptAES(data)
self.exchange(self.cla, 0x00, 0x00, 0x00, data)
self.exchange(self.cla, 0x00, 0x00, 0x00, data)
def flushSegment(self):
data = b'\x07'
data = self.encryptAES(data)
self.exchange(self.cla, 0x00, 0x00, 0x00, data)
def crcSegment(self, offsetSegment, lengthSegment, crcExpected):
data = b'\x08' + struct.pack('>H', offsetSegment) + struct.pack('>I', lengthSegment) + struct.pack('>H', crcExpected)
data = self.encryptAES(data)
self.exchange(self.cla, 0x00, 0x00, 0x00, data)
def validateTargetId(self, targetId):
@ -138,10 +312,15 @@ class HexLoader:
data = b'\x09' + struct.pack('>I', bootadr)
if (signature != None):
data += chr(len(signature)) + signature
data = self.encryptAES(data)
self.exchange(self.cla, 0x00, 0x00, 0x00, data)
def createApp(self, appflags, applength, appname, icon=None, path=None, iconOffset=None, iconSize=None, appversion=None):
def commit(self, signature=None):
data = b'\x09'
if (signature != None):
data += chr(len(signature)) + signature
self.exchange(self.cla, 0x00, 0x00, 0x00, data)
def createAppNoInstallParams(self, appflags, applength, appname, icon=None, path=None, iconOffset=None, iconSize=None, appversion=None):
data = b'\x0B' + struct.pack('>I', applength) + struct.pack('>I', appflags) + struct.pack('>B', len(appname)) + appname
if iconOffset is None:
if not (icon is None):
@ -160,42 +339,102 @@ class HexLoader:
if not appversion is None:
data += struct.pack('>B', len(appversion)) + appversion
data = self.encryptAES(data)
# in previous version, appparams are not part of the application hash yet
self.createappParams = None #data[1:]
self.exchange(self.cla, 0x00, 0x00, 0x00, data)
def createApp(self, code_length, data_length=0, install_params_length=0, flags=0, bootOffset=1):
#keep the create app parameters to be included in the load app hash
self.createappParams = struct.pack('>IIIII', code_length, data_length, install_params_length, flags, bootOffset)
data = b'\x0B' + self.createappParams
self.exchange(self.cla, 0x00, 0x00, 0x00, data)
def deleteApp(self, appname):
data = b'\x0C' + struct.pack('>B',len(appname)) + appname
data = self.encryptAES(data)
self.exchange(self.cla, 0x00, 0x00, 0x00, data)
def deleteAppByHash(self, appfullhash):
if len(appfullhash) != 32:
raise BaseException("Invalid hash format, sha256 expected")
data = b'\x15' + appfullhash
self.exchange(self.cla, 0x00, 0x00, 0x00, data)
def getVersion(self):
data = b'\x10'
response = self.exchange(self.cla, 0x00, 0x00, 0x00, data)
if sys.version_info.major == 2:
response = bytearray(response)
result = {}
offset = 0
result['targetId'] = (response[offset] << 24) | (response[offset + 1] << 16) | (response[offset + 2] << 8) | response[offset + 3]
offset += 4
result['osVersion'] = response[offset + 1 : offset + 1 + response[offset]].decode('utf-8')
offset += 1 + response[offset]
offset += 1
result['flags'] = (response[offset] << 24) | (response[offset + 1] << 16) | (response[offset + 2] << 8) | response[offset + 3]
offset += 4
result['mcuVersion'] = response[offset + 1 : offset + 1 + response[offset] - 1].decode('utf-8')
offset += 1 + response[offset]
if (offset < len(response)):
result['mcuHash'] = response[offset : offset + 32]
return result
def listApp(self, restart=True):
if restart:
data = b'\x0E'
else:
data = b'\x0F'
data = self.encryptAES(data)
response = str(self.exchange(self.cla, 0x00, 0x00, 0x00, data))
response = bytearray(self.decryptAES(response))
response = self.exchange(self.cla, 0x00, 0x00, 0x00, data)
if sys.version_info.major == 2:
response = bytearray(response)
#print binascii.hexlify(response[0])
result = []
offset = 0
while offset != len(response):
item = {}
offset += 1
item['name'] = response[offset + 1 : offset + 1 + response[offset]]
offset += 1 + response[offset]
item['flags'] = response[offset] << 24 | response[offset + 1] << 16 | response[offset + 2] << 8 | response[offset + 3]
offset += 4
item['hash'] = response[offset : offset + 32]
offset += 32
result.append(item)
if len(response) > 0:
if response[0] != 0x01:
# support old format
while offset != len(response):
item = {}
offset += 1
item['name'] = response[offset + 1 : offset + 1 + response[offset]].decode('utf-8')
offset += 1 + response[offset]
item['flags'] = (response[offset] << 24) | (response[offset + 1] << 16) | (response[offset + 2] << 8) | response[offset + 3]
offset += 4
item['hash'] = response[offset : offset + 32]
offset += 32
result.append(item)
else:
offset += 1
while offset != len(response):
item = {}
#skip the current entry's size
offset += 1
item['flags'] = (response[offset] << 24) | (response[offset + 1] << 16) | (response[offset + 2] << 8) | response[offset + 3]
offset += 4
item['hash_code_data'] = response[offset : offset + 32]
offset += 32
item['hash'] = response[offset : offset + 32]
offset += 32
item['name'] = response[offset + 1 : offset + 1 + response[offset]].decode('utf-8')
offset += 1 + response[offset]
result.append(item)
return result
def load(self, erase_u8, max_length_per_apdu, hexFile):
def load(self, erase_u8, max_length_per_apdu, hexFile, reverse=False, doCRC=True):
if (max_length_per_apdu > self.max_mtu):
max_length_per_apdu = self.max_mtu
initialAddress = 0
if self.relative:
initialAddress = hexFile.minAddr()
sha256 = hashlib.new('sha256')
for area in hexFile.getAreas():
# stat by hashing the create app params to ensure complete app signature
if self.createappParams:
sha256.update(self.createappParams)
areas = hexFile.getAreas()
if reverse:
areas = reversed(hexFile.getAreas())
for area in areas:
startAddress = area.getStart() - initialAddress
data = area.getData()
self.selectSegment(startAddress)
@ -206,37 +445,50 @@ class HexLoader:
crc = self.crc16(bytearray(data))
offset = 0
length = len(data)
if reverse:
offset = length
while (length > 0):
if length > max_length_per_apdu - LOAD_SEGMENT_CHUNK_HEADER_LENGTH - MIN_PADDING_LENGTH:
chunkLen = max_length_per_apdu - LOAD_SEGMENT_CHUNK_HEADER_LENGTH - MIN_PADDING_LENGTH
if length > max_length_per_apdu - LOAD_SEGMENT_CHUNK_HEADER_LENGTH - MIN_PADDING_LENGTH - SCP_MAC_LENGTH:
chunkLen = max_length_per_apdu - LOAD_SEGMENT_CHUNK_HEADER_LENGTH - MIN_PADDING_LENGTH - SCP_MAC_LENGTH
if (chunkLen%16) != 0:
chunkLen -= (chunkLen%16)
else:
chunkLen = length
chunk = data[offset : offset + chunkLen]
sha256.update(chunk)
self.loadSegmentChunk(offset, bytes(chunk))
offset += chunkLen
if self.cleardata_block_len and chunkLen%self.cleardata_block_len:
if (chunkLen < self.cleardata_block_len):
raise Exception("Cannot transport not block aligned data with fixed block len")
chunkLen -= chunkLen%self.cleardata_block_len;
# padd with 00's when not complete block and performing NENC
if reverse:
chunk = data[offset-chunkLen : offset]
self.loadSegmentChunk(offset-chunkLen, bytes(chunk))
else:
chunk = data[offset : offset + chunkLen]
sha256.update(chunk)
self.loadSegmentChunk(offset, bytes(chunk))
if reverse:
offset -= chunkLen
else:
offset += chunkLen
length -= chunkLen
self.flushSegment()
self.crcSegment(0, len(data), crc)
if doCRC:
self.crcSegment(0, len(data), crc)
return sha256.hexdigest()
def run(self, hexFile, bootaddr, signature=None):
initialAddress = 0
if self.relative:
initialAddress = hexFile.minAddr()
self.boot(bootaddr - initialAddress, signature)
def run(self, bootoffset=1, signature=None):
self.boot(bootoffset, signature)
def resetCustomCA(self):
data = b'\x13'
data = self.encryptAES(data)
self.exchange(self.cla, 0x00, 0x00, 0x00, data)
def setupCustomCA(self, name, public):
data = b'\x12' + struct.pack('>B',len(name)) + name + struct.pack('>B',len(public)) + public
data = self.encryptAES(data)
self.exchange(self.cla, 0x00, 0x00, 0x00, data)
def runApp(self, name):
data = b'\x14' + struct.pack('>B',len(name)) + name
data = self.encryptAES(data)
self.exchange(self.cla, 0x00, 0x00, 0x00, data)
data = name
self.exchange(self.cla, 0xD8, 0x00, 0x00, data)

View File

@ -66,7 +66,7 @@ class IntelHexParser:
recordType = data[3]
if recordType == 0x00:
if startZone == None:
raise Exception("Data record but no zone defined at line " + lineNumber)
raise Exception("Data record but no zone defined at line " + str(lineNumber))
if startFirst == None:
startFirst = address
current = startFirst
@ -123,10 +123,13 @@ class IntelHexParser:
import binascii
class IntelHexPrinter:
def addArea(self, startaddress, data):
#order by start address
def addArea(self, startaddress, data, insertFirst=False):
#self.areas.append(IntelHexArea(startaddress, data))
self.areas = insertAreaSorted(self.areas, IntelHexArea(startaddress, data))
if (insertFirst):
self.areas = [IntelHexArea(startaddress, data)] + self.areas
else:
#order by start address
self.areas = insertAreaSorted(self.areas, IntelHexArea(startaddress, data))
def __init__(self, parser=None, eol="\r\n"):
self.areas = []

View File

@ -18,6 +18,7 @@
"""
DEFAULT_ALIGNMENT = 1024
PAGE_ALIGNMENT = 64
import argparse
@ -32,6 +33,7 @@ def get_argparser():
repeated""", action='append')
parser.add_argument("--appName", help="The name to give the application after loading it")
parser.add_argument("--signature", help="A signature of the application (hex encoded)")
parser.add_argument("--signApp", help="Sign application with provided rootPrivateKey", action='store_true')
parser.add_argument("--appFlags", help="The application flags", type=auto_int)
parser.add_argument("--bootAddr", help="The application's boot address", type=auto_int)
parser.add_argument("--rootPrivateKey", help="""The Signer private key used to establish a Secure Channel (otherwise
@ -41,7 +43,13 @@ a random one will be generated)""")
parser.add_argument("--apilevel", help="Use given API level when interacting with the device", type=auto_int)
parser.add_argument("--delete", help="Delete the app with the same name before loading the provided one", action='store_true')
parser.add_argument("--params", help="Store icon and install parameters in a parameter section before the code", action='store_true')
parser.add_argument("--tlv", help="Use install parameters for all variable length parameters", action='store_true')
parser.add_argument("--dataSize", help="The code section's size in the provided hex file (to separate data from code, if not provided the whole allocated NVRAM section for the application will remain readonly.", type=auto_int)
parser.add_argument("--appVersion", help="The application version (as a string)")
parser.add_argument("--offline", help="Request to only output application load APDUs", action="store_true")
parser.add_argument("--installparamsSize", help="The loaded install parameters section size (when parameters are already included within the .hex file.", type=auto_int)
parser.add_argument("--tlvraw", help="Add a custom install param with the hextag:hexvalue encoding", action='append')
parser.add_argument("--dep", help="Add a dependency over an appname[:appversion]", action='append')
return parser
def auto_int(x):
@ -129,50 +137,121 @@ if __name__ == '__main__':
else:
path = parse_bip32_path(args.path[0], args.apilevel)
icon = None
if not args.icon is None:
icon = bytearray.fromhex(args.icon)
args.icon = bytearray.fromhex(args.icon)
signature = None
if not args.signature is None:
signature = bytearray.fromhex(args.signature)
#prepend app's data with the icon content (could also add other various install parameters)
printer = IntelHexPrinter(parser)
#todo build a TLV zone to keep install params
#todo dney nvm_write in that section ?
paramsSectionContent = []
if icon:
paramsSectionContent = icon
# prepend the param section (arbitrary)
if (args.params):
# Use of Nested Encryption Key within the SCP protocol is mandartory for upgrades
cleardata_block_len=None
if args.appFlags & 2:
# Not true for scp < 3
# if signature is None:
# raise BaseException('Upgrades must be signed')
# ensure data can be decoded with code decryption key without troubles.
cleardata_block_len = 16
if not args.offline:
dongle = getDongle(args.apdu)
if args.deployLegacy:
secret = getDeployedSecretV1(dongle, bytearray.fromhex(args.rootPrivateKey), args.targetId)
else:
secret = getDeployedSecretV2(dongle, bytearray.fromhex(args.rootPrivateKey), args.targetId)
loader = HexLoader(dongle, 0xe0, not(args.offline), secret, cleardata_block_len=cleardata_block_len)
#tlv mode does not support explicit by name removal, would require a list app before to identify the hash to be removed
if (not (args.appFlags & 2)) and args.delete:
loader.deleteApp(args.appName)
if (args.tlv):
#if code length is not provided, then consider the whole provided hex file is the code and no data section is split
code_length = printer.maxAddr() - printer.minAddr()
if not args.dataSize is None:
code_length -= args.dataSize
else:
args.dataSize = 0
installparams = ""
# express dependency
if (args.dep):
for dep in args.dep:
appname = dep
appversion = None
# split if version is specified
if (dep.find(":") != -1):
(appname,appversion) = dep.split(":")
depvalue = encodelv(appname)
if(appversion):
depvalue += encodelv(appversion)
installparams += encodetlv(BOLOS_TAG_DEPENDENCY, depvalue)
#add raw install parameters as requested
if (args.tlvraw):
for tlvraw in args.tlvraw:
(hextag,hexvalue) = tlvraw.split(":")
installparams += encodetlv(int(hextag, 16), binascii.unhexlify(hexvalue))
if (not (args.appFlags & 2)) and ( args.installparamsSize is None or args.installparamsSize == 0 ):
#build install parameters
#mandatory app name
installparams += encodetlv(BOLOS_TAG_APPNAME, args.appName)
if not args.appVersion is None:
installparams += encodetlv(BOLOS_TAG_APPVERSION, args.appVersion)
if not args.icon is None:
installparams += encodetlv(BOLOS_TAG_ICON, args.icon)
if len(path) > 0:
installparams += encodetlv(BOLOS_TAG_DERIVEPATH, path)
# append install parameters to the loaded file
param_start = printer.maxAddr()+(PAGE_ALIGNMENT-(args.dataSize%PAGE_ALIGNMENT))%PAGE_ALIGNMENT
# only append install param section when not an upgrade as it has already been computed in the encrypted and signed chunk
printer.addArea(param_start, installparams)
paramsSize = len(installparams)
else:
paramsSize = args.installparamsSize
# split code and install params in the code
code_length -= args.installparamsSize
# create app
#ensure the boot address is an offset
if args.bootAddr > printer.minAddr():
args.bootAddr -= printer.minAddr()
loader.createApp(code_length, args.dataSize, paramsSize, args.appFlags, args.bootAddr|1)
elif (args.params):
paramsSectionContent = []
if not args.icon is None:
paramsSectionContent = args.icon
#take care of aligning the parameters sections to avoid possible invalid dereference of aligned words in the program nvram.
#also use the default MPU alignment
param_start = printer.minAddr()-len(paramsSectionContent)-(DEFAULT_ALIGNMENT-(len(paramsSectionContent)%DEFAULT_ALIGNMENT))
printer.addArea(param_start, paramsSectionContent)
# account for added regions (install parameters, icon ...)
appLength = printer.maxAddr() - printer.minAddr()
dongle = getDongle(args.apdu)
if args.deployLegacy:
secret = getDeployedSecretV1(dongle, bytearray.fromhex(args.rootPrivateKey), args.targetId)
# account for added regions (install parameters, icon ...)
appLength = printer.maxAddr() - printer.minAddr()
loader.createAppNoInstallParams(args.appFlags, appLength, args.appName, None, path, 0, len(paramsSectionContent), args.appVersion)
else:
secret = getDeployedSecretV2(dongle, bytearray.fromhex(args.rootPrivateKey), args.targetId)
loader = HexLoader(dongle, 0xe0, True, secret)
# account for added regions (install parameters, icon ...)
appLength = printer.maxAddr() - printer.minAddr()
loader.createAppNoInstallParams(args.appFlags, appLength, args.appName, args.icon, path, None, None, args.appVersion)
if (not (args.appFlags & 2)) and args.delete:
loader.deleteApp(args.appName)
#heuristic to guess how to pass the icon
if (args.params):
loader.createApp(args.appFlags, appLength, args.appName, None, path, 0, len(paramsSectionContent), args.appVersion)
else:
loader.createApp(args.appFlags, appLength, args.appName, icon, path, None, None, args.appVersion)
hash = loader.load(0x0, 0xF0, printer)
print("Application hash : " + hash)
loader.run(printer, args.bootAddr, signature)
print("Application full hash : " + hash)
if (signature == None and args.signApp):
masterPrivate = PrivateKey(bytes(bytearray.fromhex(args.rootPrivateKey)))
signature = masterPrivate.ecdsa_serialize(masterPrivate.ecdsa_sign(bytes(binascii.unhexlify(hash)), raw=True))
print("Application signature: " + binascii.hexlify(signature))
if (args.tlv):
loader.commit(signature)
else:
loader.run(args.bootAddr-printer.minAddr(), signature)

View File

@ -29,6 +29,8 @@ bootloader mode.""")
parser.add_argument("--fileName", help="The name of the firmware file to load")
parser.add_argument("--bootAddr", help="The firmware's boot address", type=auto_int)
parser.add_argument("--apdu", help="Display APDU log", action='store_true')
parser.add_argument("--reverse", help="Load HEX file in reverse from the highest address to the lowest", action='store_true')
parser.add_argument("--nocrc", help="Load HEX file without checking CRC of loaded sections", action='store_true')
return parser
if __name__ == '__main__':
@ -53,5 +55,6 @@ if __name__ == '__main__':
loader = HexLoader(dongle, 0xe0, False, None, False)
loader.validateTargetId(args.targetId)
hash = loader.load(0xFF, 0xF0, parser)
loader.run(parser.getAreas(), args.bootAddr)
hash = loader.load(0xFF, 0xF0, parser, reverse=args.reverse, doCRC=(not args.nocrc))
loader.run(args.bootAddr)

View File

@ -55,7 +55,6 @@ if __name__ == '__main__':
dongle = getDongle(args.apdu)
secret = getDeployedSecretV2(dongle, bytearray.fromhex(args.rootPrivateKey), args.targetId)
loader = HexLoader(dongle, 0xe0, True, secret)
loader = HexLoader(dongle, 0xe0)
loader.runApp(args.appName)

View File

@ -63,39 +63,23 @@ if __name__ == '__main__':
class SCP:
def __init__(self, dongle, targetId, rootPrivateKey):
self.key = getDeployedSecretV2(dongle, rootPrivateKey, targetId)
self.iv = b'\x00' * 16
secret = getDeployedSecretV2(dongle, rootPrivateKey, targetId)
self.loader = HexLoader(dongle, 0xe0, True, secret)
def encryptAES(self, data):
paddedData = data + b'\x80'
while (len(paddedData) % 16) != 0:
paddedData += b'\x00'
cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
encryptedData = cipher.encrypt(paddedData)
self.iv = encryptedData[len(encryptedData) - 16:]
return encryptedData
return self.loader.scpWrap(data);
def decryptAES(self, data):
if len(data) == 0:
return data
cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
decryptedData = cipher.decrypt(data)
l = len(decryptedData) - 1
while (decryptedData[l] != chr(0x80)):
l -= 1
decryptedData = decryptedData[0:l]
self.iv = data[len(data) - 16:]
return decryptedData
return self.loader.scpUnwrap(data);
dongle = getDongle(args.apdu)
if args.scp:
if args.rootPrivateKey is None:
privateKey = PrivateKey()
publicKey = binascii.hexlify(
privateKey.pubkey.serialize(compressed=False))
publicKey = binascii.hexlify(privateKey.pubkey.serialize(compressed=False))
print("Generated random root public key : %s" % publicKey)
args.rootPrivateKey = privateKey.serialize()
scp = SCP(dongle, args.targetId, bytearray.fromhex(args.rootPrivateKey))
scp = SCP(dongle, args.targetId, bytearray.fromhex(args.rootPrivateKey))
for data in file:
data = data.rstrip('\r\n').decode('hex')
@ -105,13 +89,13 @@ if __name__ == '__main__':
data = bytearray(data)
if data[4] > 0 and len(data) > 5:
apduData = data[5: 5 + data[4]]
apduData = scp.encryptAES(str(apduData))
apduData = scp.encryptAES(bytes(apduData))
result = dongle.exchange(
data[0:4] + bytearray([len(apduData)]) + bytearray(apduData))
data[0:4] + bytearray([len(apduData)]) + bytearray(apduData))
else:
result = dongle.exchange(data[0:5])
result = scp.decryptAES(str(result))
if args.apdu:
print("<= Clear " + hexstr(result))
print("<= Clear " + result.encode('hex'))
else:
dongle.exchange(bytearray(data))

View File

@ -46,9 +46,14 @@ def serverQuery(request, url):
return response
if __name__ == '__main__':
import sys
import os
import struct
import urllib2, urlparse
if sys.version_info.major == 3:
import urllib.request as urllib2
import urllib.parse as urlparse
else:
import urllib2, urlparse
from .BlueHSMServer_pb2 import Request, Response, Parameter
from .comm import getDongle
import sys
@ -81,6 +86,11 @@ if __name__ == '__main__':
parameter.local = False
parameter.alias = "persoKey"
parameter.name = args.perso
if args.targetId&0xF == 0x3:
parameter = request.remote_parameters.add()
parameter.local = False
parameter.alias = "scpv2"
parameter.name = "dummy"
request.largeStack = True
response = serverQuery(request, args.url)
@ -106,14 +116,23 @@ if __name__ == '__main__':
parameter.local = False
parameter.alias = "persoKey"
parameter.name = args.perso
request.parameters = str(deviceNonce)
if args.targetId&0xF == 0x3:
parameter = request.remote_parameters.add()
parameter.local = False
parameter.alias = "scpv2"
parameter.name = "dummy"
request.parameters = bytes(deviceNonce)
request.largeStack = True
response = serverQuery(request, args.url)
offset = 0
remotePublicKeySignatureLength = ord(response.response[offset + 1]) + 2
if sys.version_info.major == 2:
responseLength = ord(response.response[offset + 1])
else:
responseLength = response.response[offset + 1]
remotePublicKeySignatureLength = responseLength + 2
remotePublicKeySignature = response.response[offset : offset + remotePublicKeySignatureLength]
certificate = bytearray([len(remotePublicKey)]) + remotePublicKey + bytearray([len(remotePublicKeySignature)]) + remotePublicKeySignature
@ -135,7 +154,7 @@ if __name__ == '__main__':
request = Request()
request.reference = "distributeFirmware11"
request.id = response.id
request.parameters = str(certificate)
request.parameters = bytes(certificate)
request.largeStack = True
serverQuery(request, args.url)
index += 1
@ -152,6 +171,11 @@ if __name__ == '__main__':
parameter.local = False
parameter.alias = "firmwareKey"
parameter.name = args.firmwareKey
if args.targetId&0xF == 0x3:
parameter = request.remote_parameters.add()
parameter.local = False
parameter.alias = "scpv2"
parameter.name = "dummy"
request.id = response.id
request.largeStack = True

View File

@ -8,14 +8,14 @@ import os
here = dirname(__file__)
setup(
name='ledgerblue',
version='0.1.16',
version='0.1.17',
author='Ledger',
author_email='hello@ledger.fr',
description='Python library to communicate with Ledger Blue/Nano S',
long_description=open(join(here, 'README.md')).read(),
url='https://github.com/LedgerHQ/blue-loader-python',
packages=find_packages(),
install_requires=['hidapi>=0.7.99', 'protobuf>=2.6.1', 'pycrypto>=2.6.1', 'future', 'ecpy>=0.8.1', 'pillow>=3.4.0'],
install_requires=['hidapi>=0.7.99', 'protobuf>=2.6.1', 'pycrypto>=2.6.1', 'future', 'ecpy>=0.8.1', 'pillow>=3.4.0', 'python-u2flib-host>=3.0.2'],
extras_require = {
'smartcard': [ 'python-pyscard>=1.6.12-4build1' ]
},