mirror of https://github.com/rusefi/opendbc.git
77 lines
2.5 KiB
Python
Executable File
77 lines
2.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import time
|
|
import unittest
|
|
|
|
from opendbc.can.parser import CANParser
|
|
from opendbc.can.packer import CANPacker
|
|
from opendbc.can.tests.test_packer_parser import can_list_to_can_capnp
|
|
|
|
|
|
class TestParser(unittest.TestCase):
|
|
def _benchmark(self, signals, checks, thresholds, n):
|
|
parser = CANParser('toyota_new_mc_pt_generated', signals, checks, 0, False)
|
|
packer = CANPacker('toyota_new_mc_pt_generated')
|
|
|
|
can_msgs = []
|
|
for i in range(50000):
|
|
values = {"ACC_CONTROL": {"ACC_TYPE": 1, "ALLOW_LONG_PRESS": 3}}
|
|
msgs = [packer.make_can_msg(k, 0, v) for k, v in values.items()]
|
|
bts = can_list_to_can_capnp(msgs, logMonoTime=int(0.01 * i * 1e9))
|
|
can_msgs.append(bts)
|
|
|
|
ets = []
|
|
for _ in range(25):
|
|
if n > 1:
|
|
strings = []
|
|
for i in range(0, len(can_msgs), n):
|
|
strings.append(can_msgs[i:i + n])
|
|
t1 = time.process_time_ns()
|
|
for m in strings:
|
|
parser.update_strings(m)
|
|
t2 = time.process_time_ns()
|
|
else:
|
|
t1 = time.process_time_ns()
|
|
for m in can_msgs:
|
|
parser.update_strings([m])
|
|
t2 = time.process_time_ns()
|
|
|
|
ets.append(t2 - t1)
|
|
|
|
et = sum(ets) / len(ets)
|
|
avg_nanos = et / len(can_msgs)
|
|
print('%s: [%d] %.1fms to parse %s, avg: %dns' % (self._testMethodName, n, et/1e6, len(can_msgs), avg_nanos))
|
|
|
|
minn, maxx = thresholds
|
|
self.assertLess(avg_nanos, maxx)
|
|
self.assertGreater(avg_nanos, minn, "Performance seems to have improved, update test thresholds.")
|
|
|
|
def test_performance_one_signal(self):
|
|
signals = [
|
|
("ACCEL_CMD", "ACC_CONTROL"),
|
|
]
|
|
self._benchmark(signals, [('ACC_CONTROL', 10)], (4000, 18000), 1)
|
|
self._benchmark(signals, [('ACC_CONTROL', 10)], (700, 3000), 10)
|
|
|
|
def test_performance_all_signals(self):
|
|
signals = [
|
|
("ACCEL_CMD", "ACC_CONTROL"),
|
|
("ALLOW_LONG_PRESS", "ACC_CONTROL"),
|
|
("ACC_MALFUNCTION", "ACC_CONTROL"),
|
|
("RADAR_DIRTY", "ACC_CONTROL"),
|
|
("DISTANCE", "ACC_CONTROL"),
|
|
("MINI_CAR", "ACC_CONTROL"),
|
|
("CANCEL_REQ", "ACC_CONTROL"),
|
|
("ACC_CUT_IN", "ACC_CONTROL"),
|
|
("PERMIT_BRAKING", "ACC_CONTROL"),
|
|
("RELEASE_STANDSTILL", "ACC_CONTROL"),
|
|
("ITS_CONNECT_LEAD", "ACC_CONTROL"),
|
|
("ACCEL_CMD_ALT", "ACC_CONTROL"),
|
|
("CHECKSUM", "ACC_CONTROL"),
|
|
]
|
|
self._benchmark(signals, [('ACC_CONTROL', 10)], (10000, 19000), 1)
|
|
self._benchmark(signals, [('ACC_CONTROL', 10)], (1300, 5000), 10)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|