don't use generic Exception, but rather specific subclass

This commit is contained in:
Pavol Rusnak 2017-11-06 11:09:54 +01:00
parent 1ab602423c
commit a5fc76d8c9
No known key found for this signature in database
GPG Key ID: 91F3B339B9A02A3D
13 changed files with 80 additions and 80 deletions

View File

@ -31,22 +31,22 @@ tx_api.cache_dir = '../txcache'
try:
from trezorlib.transport_hid import HidTransport
HID_ENABLED = True
except Exception as e:
print('HID transport disabled:', e.message, e.args)
except ImportError as e:
print('HID transport disabled:', e)
HID_ENABLED = False
try:
from trezorlib.transport_pipe import PipeTransport
PIPE_ENABLED = True
except Exception as e:
print('PIPE transport disabled:', e.message, e.args)
except ImportError as e:
print('PIPE transport disabled:', e)
PIPE_ENABLED = False
try:
from trezorlib.transport_udp import UdpTransport
UDP_ENABLED = True
except Exception as e:
print('UDP transport disabled:', e.message, e.args)
except ImportError as e:
print('UDP transport disabled:', e)
UDP_ENABLED = False
@ -132,24 +132,24 @@ def generate_entropy(strength, internal_entropy, external_entropy):
random - binary stream of random data from external HRNG
'''
if strength not in (128, 192, 256):
raise Exception("Invalid strength")
raise ValueError("Invalid strength")
if not internal_entropy:
raise Exception("Internal entropy is not provided")
raise ValueError("Internal entropy is not provided")
if len(internal_entropy) < 32:
raise Exception("Internal entropy too short")
raise ValueError("Internal entropy too short")
if not external_entropy:
raise Exception("External entropy is not provided")
raise ValueError("External entropy is not provided")
if len(external_entropy) < 32:
raise Exception("External entropy too short")
raise ValueError("External entropy too short")
entropy = hashlib.sha256(internal_entropy + external_entropy).digest()
entropy_stripped = entropy[:strength // 8]
if len(entropy_stripped) * 8 != strength:
raise Exception("Entropy length mismatch")
raise ValueError("Entropy length mismatch")
return entropy_stripped

View File

@ -37,13 +37,13 @@ def wait_for_devices():
def choose_device(devices):
if not len(devices):
raise Exception("No TREZOR connected!")
raise RuntimeError("No TREZOR connected!")
if len(devices) == 1:
try:
return devices[0]
except IOError:
raise Exception("Device is currently in use")
raise RuntimeError("Device is currently in use")
i = 0
sys.stderr.write("----------------------------\n")
@ -69,7 +69,7 @@ def choose_device(devices):
device_id = int(input())
return devices[device_id]
except:
raise Exception("Invalid choice, exiting...")
raise ValueError("Invalid choice, exiting...")
def main():
@ -101,7 +101,7 @@ def main():
passw = hashlib.sha256(trezor_entropy + urandom_entropy).digest()
if len(passw) != 32:
raise Exception("32 bytes password expected")
raise ValueError("32 bytes password expected")
bip32_path = [10, 0]
passw_encrypted = client.encrypt_keyvalue(bip32_path, label, passw, False, True)

View File

@ -29,25 +29,25 @@ def generate_entropy(strength, internal_entropy, external_entropy):
random - binary stream of random data from external HRNG
'''
if strength not in (128, 192, 256):
raise Exception("Invalid strength")
raise ValueError("Invalid strength")
if not internal_entropy:
raise Exception("Internal entropy is not provided")
raise ValueError("Internal entropy is not provided")
if len(internal_entropy) < 32:
raise Exception("Internal entropy too short")
raise ValueError("Internal entropy too short")
if not external_entropy:
raise Exception("External entropy is not provided")
raise ValueError("External entropy is not provided")
if len(external_entropy) < 32:
raise Exception("External entropy too short")
raise ValueError("External entropy too short")
entropy = hashlib.sha256(internal_entropy + external_entropy).digest()
entropy_stripped = entropy[:strength // 8]
if len(entropy_stripped) * 8 != strength:
raise Exception("Entropy length mismatch")
raise ValueError("Entropy length mismatch")
return entropy_stripped

View File

@ -450,7 +450,7 @@ def sign_tx(connect, coin):
try:
txapi = coins_txapi[coin]
except:
raise Exception('Coin "%s" is not supported' % coin)
raise ValueError('Coin "%s" is not supported' % coin)
client.set_tx_api(txapi)
if sys.version_info.major < 3:

View File

@ -53,7 +53,7 @@ def sec_to_public_pair(pubkey):
x = string_to_number(pubkey[1:33])
sec0 = pubkey[:1]
if sec0 not in (b'\2', b'\3'):
raise Exception("Compressed pubkey expected")
raise ValueError("Compressed pubkey expected")
def public_pair_for_x(generator, x, is_even):
curve = generator.curve()
@ -81,7 +81,7 @@ def get_address(public_node, address_type):
def public_ckd(public_node, n):
if not isinstance(n, list):
raise Exception('Parameter must be a list')
raise ValueError('Parameter must be a list')
node = proto_types.HDNodeType()
node.CopyFrom(public_node)
@ -97,7 +97,7 @@ def get_subnode(node, i):
i_as_bytes = struct.pack(">L", i)
if is_prime(i):
raise Exception("Prime derivation not supported")
raise ValueError("Prime derivation not supported")
# Public derivation
data = node.public_key + i_as_bytes
@ -116,7 +116,7 @@ def get_subnode(node, i):
point = I_left_as_exponent * SECP256k1.generator + Point(SECP256k1.curve, x, y, SECP256k1.order)
if point == INFINITY:
raise Exception("Point cannot be INFINITY")
raise ValueError("Point cannot be INFINITY")
# Convert public point to compressed public key
node_out.public_key = point_to_pubkey(point)
@ -143,7 +143,7 @@ def deserialize(xpub):
data = tools.b58decode(xpub, None)
if tools.Hash(data[:-4])[:4] != data[-4:]:
raise Exception("Checksum failed")
raise ValueError("Checksum failed")
node = proto_types.HDNodeType()
node.depth = struct.unpack('>B', data[4:5])[0]

View File

@ -144,7 +144,7 @@ class expect(object):
def wrapped_f(*args, **kwargs):
ret = f(*args, **kwargs)
if not isinstance(ret, self.expected):
raise Exception("Got %s, expected %s" % (ret.__class__, self.expected))
raise RuntimeError("Got %s, expected %s" % (ret.__class__, self.expected))
return ret
return wrapped_f
@ -174,7 +174,7 @@ def normalize_nfc(txt):
if isinstance(txt, str):
return unicodedata.normalize('NFC', txt)
raise Exception('unicode/str or bytes/str expected')
raise ValueError('unicode/str or bytes/str expected')
class BaseClient(object):
@ -204,7 +204,7 @@ class BaseClient(object):
if handler is not None:
msg = handler(resp)
if msg is None:
raise Exception("Callback %s must return protobuf message, not None" % handler)
raise ValueError("Callback %s must return protobuf message, not None" % handler)
resp = self.call(msg)
return resp
@ -355,7 +355,7 @@ class DebugLinkMixin(object):
# return isinstance(value, TypeError)
# Evaluate missed responses in 'with' statement
if self.expected_responses is not None and len(self.expected_responses):
raise Exception("Some of expected responses didn't come from device: %s" %
raise RuntimeError("Some of expected responses didn't come from device: %s" %
[pprint(x) for x in self.expected_responses])
# Cleanup
@ -364,7 +364,7 @@ class DebugLinkMixin(object):
def set_expected_responses(self, expected):
if not self.in_with_statement:
raise Exception("Must be called inside 'with' statement")
raise RuntimeError("Must be called inside 'with' statement")
self.expected_responses = expected
def setup_debuglink(self, button, pin_correct):
@ -441,7 +441,7 @@ class DebugLinkMixin(object):
if pos != 0:
return proto.WordAck(word=self.mnemonic[pos - 1])
raise Exception("Unexpected call")
raise RuntimeError("Unexpected call")
class ProtocolMixin(object):
@ -459,7 +459,7 @@ class ProtocolMixin(object):
def init_device(self):
self.features = expect(proto.Features)(self.call)(proto.Initialize())
if str(self.features.vendor) not in self.VENDORS:
raise Exception("Unsupported device")
raise RuntimeError("Unsupported device")
def _get_local_entropy(self):
return os.urandom(32)
@ -844,7 +844,7 @@ class ProtocolMixin(object):
if self.tx_api:
tx.CopyFrom(self.tx_api.get_tx(binascii.hexlify(inp.prev_hash).decode('utf-8')))
else:
raise Exception('TX_API not defined')
raise RuntimeError('TX_API not defined')
known_hashes.append(inp.prev_hash)
return msg
@ -869,7 +869,7 @@ class ProtocolMixin(object):
if self.tx_api:
txes[inp.prev_hash] = self.tx_api.get_tx(binascii.hexlify(inp.prev_hash).decode('utf-8'))
else:
raise Exception('TX_API not defined')
raise RuntimeError('TX_API not defined')
known_hashes.append(inp.prev_hash)
return txes
@ -912,7 +912,7 @@ class ProtocolMixin(object):
if res.HasField('serialized') and res.serialized.HasField('signature_index'):
if signatures[res.serialized.signature_index] is not None:
raise Exception("Signature for index %d already filled" % res.serialized.signature_index)
raise ValueError("Signature for index %d already filled" % res.serialized.signature_index)
signatures[res.serialized.signature_index] = res.serialized.signature
if res.request_type == types.TXFINISHED:
@ -971,7 +971,7 @@ class ProtocolMixin(object):
continue
if None in signatures:
raise Exception("Some signatures are missing!")
raise RuntimeError("Some signatures are missing!")
log("SIGNED IN %.03f SECONDS, CALLED %d MESSAGES, %d BYTES" %
(time.time() - start, counter, len(serialized_tx)))
@ -989,10 +989,10 @@ class ProtocolMixin(object):
@expect(proto.Success)
def recovery_device(self, word_count, passphrase_protection, pin_protection, label, language, type=types.RecoveryDeviceType_ScrambledWords, expand=False, dry_run=False):
if self.features.initialized and not dry_run:
raise Exception("Device is initialized already. Call wipe_device() and try again.")
raise RuntimeError("Device is initialized already. Call wipe_device() and try again.")
if word_count not in (12, 18, 24):
raise Exception("Invalid word count. Use 12/18/24")
raise ValueError("Invalid word count. Use 12/18/24")
self.recovery_matrix_first_pass = True
@ -1019,7 +1019,7 @@ class ProtocolMixin(object):
@session
def reset_device(self, display_random, strength, passphrase_protection, pin_protection, label, language, u2f_counter=0, skip_backup=False):
if self.features.initialized:
raise Exception("Device is initialized already. Call wipe_device() and try again.")
raise RuntimeError("Device is initialized already. Call wipe_device() and try again.")
# Begin with device reset workflow
msg = proto.ResetDevice(display_random=display_random,
@ -1033,7 +1033,7 @@ class ProtocolMixin(object):
resp = self.call(msg)
if not isinstance(resp, proto.EntropyRequest):
raise Exception("Invalid response, expected EntropyRequest")
raise RuntimeError("Invalid response, expected EntropyRequest")
external_entropy = self._get_local_entropy()
log("Computer generated entropy: " + binascii.hexlify(external_entropy).decode('ascii'))
@ -1062,10 +1062,10 @@ class ProtocolMixin(object):
mnemonic = m.expand(mnemonic)
if not skip_checksum and not m.check(mnemonic):
raise Exception("Invalid mnemonic checksum")
raise ValueError("Invalid mnemonic checksum")
if self.features.initialized:
raise Exception("Device is initialized already. Call wipe_device() and try again.")
raise RuntimeError("Device is initialized already. Call wipe_device() and try again.")
resp = self.call(proto.LoadDevice(mnemonic=mnemonic, pin=pin,
passphrase_protection=passphrase_protection,
@ -1079,23 +1079,23 @@ class ProtocolMixin(object):
@expect(proto.Success)
def load_device_by_xprv(self, xprv, pin, passphrase_protection, label, language):
if self.features.initialized:
raise Exception("Device is initialized already. Call wipe_device() and try again.")
raise RuntimeError("Device is initialized already. Call wipe_device() and try again.")
if xprv[0:4] not in ('xprv', 'tprv'):
raise Exception("Unknown type of xprv")
raise ValueError("Unknown type of xprv")
if len(xprv) < 100 and len(xprv) > 112:
raise Exception("Invalid length of xprv")
raise ValueError("Invalid length of xprv")
node = types.HDNodeType()
data = binascii.hexlify(tools.b58decode(xprv, None))
if data[90:92] != b'00':
raise Exception("Contain invalid private key")
raise ValueError("Contain invalid private key")
checksum = binascii.hexlify(hashlib.sha256(hashlib.sha256(binascii.unhexlify(data[:156])).digest()).digest()[:4])
if checksum != data[156:]:
raise Exception("Checksum doesn't match")
raise ValueError("Checksum doesn't match")
# version 0488ade4
# depth 00
@ -1122,7 +1122,7 @@ class ProtocolMixin(object):
@session
def firmware_update(self, fp):
if self.features.bootloader_mode is False:
raise Exception("Device must be in bootloader mode")
raise RuntimeError("Device must be in bootloader mode")
data = fp.read()
@ -1139,7 +1139,7 @@ class ProtocolMixin(object):
return True
elif isinstance(resp, proto.Failure) and resp.code == types.Failure_FirmwareError:
return False
raise Exception("Unexpected result %s" % resp)
raise RuntimeError("Unexpected result %s" % resp)
# TREZORv2 method
if isinstance(resp, proto.FirmwareRequest):
@ -1154,15 +1154,15 @@ class ProtocolMixin(object):
return True
elif isinstance(resp, proto.Failure) and resp.code == types.Failure_FirmwareError:
return False
raise Exception("Unexpected result %s" % resp)
raise RuntimeError("Unexpected result %s" % resp)
raise Exception("Unexpected message %s" % resp)
raise RuntimeError("Unexpected message %s" % resp)
@field('message')
@expect(proto.Success)
def self_test(self):
if self.features.bootloader_mode is False:
raise Exception("Device must be in bootloader mode")
raise RuntimeError("Device must be in bootloader mode")
return self.call(proto.SelfTest(payload=b'\x00\xFF\x55\xAA\x66\x99\x33\xCCABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!\x00\xFF\x55\xAA\x66\x99\x33\xCC'))

View File

@ -15,7 +15,7 @@ def H(m):
def expmod(b, e, m):
if e < 0:
raise Exception('negative exponent')
raise ValueError('negative exponent')
if e == 0:
return 1
t = expmod(b, e >> 1, m) ** 2 % m
@ -130,18 +130,18 @@ def decodepoint(s):
x = q - x
P = [x, y]
if not isoncurve(P):
raise Exception('decoding point that is not on curve')
raise ValueError('decoding point that is not on curve')
return P
def checkvalid(s, m, pk):
if len(s) != b >> 2:
raise Exception('signature length is wrong')
raise ValueError('signature length is wrong')
if len(pk) != b >> 3:
raise Exception('public-key length is wrong')
raise ValueError('public-key length is wrong')
R = decodepoint(s[0:b >> 3])
A = decodepoint(pk)
S = decodeint(s[b >> 3:b >> 2])
h = Hint(encodepoint(R) + pk + m)
if scalarmult(B, S) != edwards(R, scalarmult(A, h)):
raise Exception('signature does not pass verification')
raise ValueError('signature does not pass verification')

View File

@ -48,7 +48,7 @@ def check_missing():
missing = list(set(types) - set(map_type_to_class.values()))
if len(missing):
raise Exception("Following protobuf messages are not defined in mapping: %s" % missing)
raise ValueError("Following protobuf messages are not defined in mapping: %s" % missing)
build_map()

View File

@ -64,17 +64,17 @@ class ProtocolV1(object):
def parse_first(self, chunk):
if chunk[:3] != b'?##':
raise Exception('Unexpected magic characters')
raise RuntimeError('Unexpected magic characters')
try:
headerlen = struct.calcsize('>HL')
(msg_type, datalen) = struct.unpack('>HL', chunk[3:3 + headerlen])
except:
raise Exception('Cannot parse header')
raise RuntimeError('Cannot parse header')
data = chunk[3 + headerlen:]
return (msg_type, datalen, data)
def parse_next(self, chunk):
if chunk[:1] != b'?':
raise Exception('Unexpected magic characters')
raise RuntimeError('Unexpected magic characters')
return chunk[1:]

View File

@ -45,12 +45,12 @@ class ProtocolV2(object):
resp = transport.read_chunk()
(magic, ) = struct.unpack('>B', resp[:1])
if magic != 0x04:
raise Exception('Expected session close')
raise RuntimeError('Expected session close')
self.session = None
def write(self, transport, msg):
if not self.session:
raise Exception('Missing session for v2 protocol')
raise RuntimeError('Missing session for v2 protocol')
# Serialize whole message
data = bytearray(msg.SerializeToString())
@ -73,7 +73,7 @@ class ProtocolV2(object):
def read(self, transport):
if not self.session:
raise Exception('Missing session for v2 protocol')
raise RuntimeError('Missing session for v2 protocol')
# Read header with first part of message data
chunk = transport.read_chunk()
@ -98,11 +98,11 @@ class ProtocolV2(object):
headerlen = struct.calcsize('>BLLL')
(magic, session, msg_type, datalen) = struct.unpack('>BLLL', chunk[:headerlen])
except:
raise Exception('Cannot parse header')
raise RuntimeError('Cannot parse header')
if magic != 0x01:
raise Exception('Unexpected magic character')
raise RuntimeError('Unexpected magic character')
if session != self.session:
raise Exception('Session id mismatch')
raise RuntimeError('Session id mismatch')
return msg_type, datalen, chunk[headerlen:]
def parse_next(self, chunk):
@ -110,11 +110,11 @@ class ProtocolV2(object):
headerlen = struct.calcsize('>BLL')
(magic, session, sequence) = struct.unpack('>BLL', chunk[:headerlen])
except:
raise Exception('Cannot parse header')
raise RuntimeError('Cannot parse header')
if magic != 0x02:
raise Exception('Unexpected magic characters')
raise RuntimeError('Unexpected magic characters')
if session != self.session:
raise Exception('Session id mismatch')
raise RuntimeError('Session id mismatch')
return chunk[headerlen:]
def parse_session_open(self, chunk):
@ -122,7 +122,7 @@ class ProtocolV2(object):
headerlen = struct.calcsize('>BL')
(magic, session) = struct.unpack('>BL', chunk[:headerlen])
except:
raise Exception('Cannot parse header')
raise RuntimeError('Cannot parse header')
if magic != 0x03:
raise Exception('Unexpected magic character')
raise RuntimeError('Unexpected magic character')
return session

View File

@ -25,7 +25,7 @@ class PinButton(QPushButton):
elif QT_VERSION_STR >= '4':
QObject.connect(self, SIGNAL('clicked()'), self._pressed)
else:
raise Exception('Unsupported Qt version')
raise RuntimeError('Unsupported Qt version')
def _pressed(self):
self.password.setText(self.password.text() + str(self.encoded_value))
@ -52,7 +52,7 @@ class PinMatrixWidget(QWidget):
elif QT_VERSION_STR >= '4':
QObject.connect(self.password, SIGNAL('textChanged(QString)'), self._password_changed)
else:
raise Exception('Unsupported Qt version')
raise RuntimeError('Unsupported Qt version')
self.strength = QLabel()
self.strength.setMinimumWidth(75)
@ -123,7 +123,7 @@ if __name__ == '__main__':
elif QT_VERSION_STR >= '4':
QObject.connect(ok, SIGNAL('clicked()'), clicked)
else:
raise Exception('Unsupported Qt version')
raise RuntimeError('Unsupported Qt version')
vbox = QVBoxLayout()
vbox.addWidget(matrix)

View File

@ -54,7 +54,7 @@ def hash_160_to_bc_address(h160, address_type):
def compress_pubkey(public_key):
if byteindex(public_key, 0) == 4:
return bytes((byteindex(public_key, 64) & 1) + 2) + public_key[1:33]
raise Exception("Pubkey is already compressed")
raise ValueError("Pubkey is already compressed")
def public_key_to_bc_address(public_key, address_type, compress=True):

View File

@ -46,7 +46,7 @@ class TxApi(object):
r = requests.get(url, headers={'User-agent': 'Mozilla/5.0'})
j = r.json()
except:
raise Exception('URL error: %s' % url)
raise RuntimeError('URL error: %s' % url)
if cache_dir and cache_file:
try: # saving into cache
json.dump(j, open(cache_file, 'w'))