CANParser: check if signals exist (#870)

* check if signals exist

* add failing test

* better args and test msg in checks

* also need to check message addrs

* fix up new_msg

* consistent

* check signals if msg addresses are used

* cleanup

* cleanup

---------

Co-authored-by: Shane Smiskol <shane@smiskol.com>
This commit is contained in:
Dean Lee 2023-07-01 17:04:33 +08:00 committed by GitHub
parent ecd0613872
commit 008104f940
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 12 deletions

View File

@ -41,12 +41,17 @@ cdef class CANParser:
self.vl_all = {}
self.ts_nanos = {}
msg_name_to_address = {}
msg_address_to_signals = {}
for i in range(self.dbc[0].msgs.size()):
msg = self.dbc[0].msgs[i]
name = msg.name.decode("utf8")
msg_name_to_address[name] = msg.address
msg_address_to_signals[msg.address] = set()
for sig in msg.sigs:
msg_address_to_signals[msg.address].add(sig.name.decode("utf8"))
self.address_to_msg_name[msg.address] = name
self.vl[msg.address] = {}
self.vl[name] = self.vl[msg.address]
@ -58,12 +63,13 @@ cdef class CANParser:
# Convert message names into addresses
for i in range(len(signals)):
s = signals[i]
if not isinstance(s[1], numbers.Number):
if s[1] not in msg_name_to_address:
print(msg_name_to_address)
raise RuntimeError(f"could not find message {repr(s[1])} in DBC {self.dbc_name}")
s = (s[0], msg_name_to_address[s[1]])
signals[i] = s
address = s[1] if isinstance(s[1], numbers.Number) else msg_name_to_address.get(s[1])
if address not in msg_address_to_signals:
raise RuntimeError(f"could not find message {repr(s[1])} in DBC {self.dbc_name}")
if s[0] not in msg_address_to_signals[address]:
raise RuntimeError(f"could not find signal {repr(s[0])} in {repr(s[1])}, DBC {self.dbc_name}")
signals[i] = (s[0], address)
for i in range(len(checks)):
c = checks[i]

View File

@ -319,16 +319,22 @@ class TestCanParserPacker(unittest.TestCase):
self.assertEqual(set(ts_nanos), {0})
def test_undefined_signals(self):
# Ensure we don't allow messages not in the DBC
existing_signals = {"STEERING_CONTROL": ["STEER_TORQUE_REQUEST", "SET_ME_X00_2", "COUNTER"],
"CAN_FD_MESSAGE": ["SIGNED", "64_BIT_LE", "64_BIT_BE", "COUNTER"]}
# Ensure we don't allow messages or signals not in the DBC
existing_signals = {
"STEERING_CONTROL": ["STEER_TORQUE_REQUEST", "SET_ME_X00_2", "COUNTER"],
228: ["STEER_TORQUE_REQUEST", "SET_ME_X00_2", "COUNTER"],
"CAN_FD_MESSAGE": ["SIGNED", "64_BIT_LE", "64_BIT_BE", "COUNTER"],
245: ["SIGNED", "64_BIT_LE", "64_BIT_BE", "COUNTER"],
}
for msg, sigs in existing_signals.items():
for sig in sigs:
CANParser(TEST_DBC, [(sig, msg)], [(msg, 0)])
self.assertRaises(RuntimeError, partial(CANParser, TEST_DBC, [(sig, msg + "123")], [(msg, 0)]))
self.assertRaises(RuntimeError, partial(CANParser, TEST_DBC, [(sig, msg)], [(msg + "123", 0)]))
self.assertRaises(RuntimeError, partial(CANParser, TEST_DBC, [(sig, msg + "123")], [(msg + "123", 0)]))
new_msg = msg + "1" if isinstance(msg, str) else msg + 1
self.assertRaises(RuntimeError, partial(CANParser, TEST_DBC, [(sig + "1", msg)], [(msg, 0)]))
self.assertRaises(RuntimeError, partial(CANParser, TEST_DBC, [(sig, new_msg)], [(msg, 0)]))
self.assertRaises(RuntimeError, partial(CANParser, TEST_DBC, [(sig, msg)], [(new_msg, 0)]))
self.assertRaises(RuntimeError, partial(CANParser, TEST_DBC, [(sig, new_msg)], [(new_msg, 0)]))
if __name__ == "__main__":