Refactoring

This commit is contained in:
Anubis 2020-02-01 20:44:41 +01:00
parent fa8e2a6293
commit 7f097f925c
14 changed files with 48 additions and 50 deletions

View File

@ -56,10 +56,10 @@ If you do not want to start the tool with root privileges, you can do the follow
```
# set up CAN hardware interface
sudo sh can_setup.sh <CAN interface e.g., can0> <CAN bitrate e.g., 500000> <can-isotp.ko file pyth e.g., /lib/modules/4.19.75-v7+/kernel/net/can/can-isotp.ko>
sudo sh setup_can.sh <CAN interface e.g., can0> <CAN bitrate e.g., 500000> <can-isotp.ko file pyth e.g., /lib/modules/4.19.75-v7+/kernel/net/can/can-isotp.ko>
# or set up CAN virtual interface
sudo sh vcan_setup.sh <virtual CAN interface e.g., vcan0> <can-isotp.ko file pyth e.g., /lib/modules/4.19.75-v7+/kernel/net/can/can-isotp.ko>
sudo sh setup_vcan.sh <virtual CAN interface e.g., vcan0> <can-isotp.ko file pyth e.g., /lib/modules/4.19.75-v7+/kernel/net/can/can-isotp.ko>
# and then start the tool without sudo
python3 ecu_simulator.py

View File

@ -1,21 +1,20 @@
import os
import sys
from threading import Thread
import os
import obd_listener
import uds_listener
import can_logger
import isotp_logger
import ecu_config
import ecu_simulator_logger as logging
from obd import listener as obd_listener
from uds import listener as uds_listener
from loggers.logger_app import logger
from loggers import logger_can, logger_isotp
VCAN_SETUP_FILE = "vcan_setup.sh"
SETUP_VCAN_FILE = "setup_vcan.sh"
CAN_SETUP_FILE = "can_setup.sh"
SETUP_CAN_FILE = "setup_can.sh"
def main():
logging.configure()
logging.logger.info("Starting ECU-Simulator")
logger.configure()
logger.info("Starting ECU-Simulator")
set_up_can_interface()
star_can_logger_thread()
star_isotp_logger_thread()
@ -28,20 +27,20 @@ def set_up_can_interface():
can_interface = ecu_config.get_can_interface()
isotp_ko_file_path = ecu_config.get_isotp_ko_file_path()
if interface_type == "virtual":
logging.logger.info("Setting up virtual CAN interface: " + can_interface)
os.system("sh " + VCAN_SETUP_FILE + " " + can_interface + " " + isotp_ko_file_path)
logger.info("Setting up virtual CAN interface: " + can_interface)
os.system("sh " + SETUP_VCAN_FILE + " " + can_interface + " " + isotp_ko_file_path)
elif interface_type == "hardware":
logging.logger.info("Setting up CAN interface: " + can_interface)
logging.logger.info("Loading ISO-TP module from: " + isotp_ko_file_path)
os.system("sh " + CAN_SETUP_FILE + " " + can_interface + " " + ecu_config.get_can_bitrate() + " " + isotp_ko_file_path)
logger.info("Setting up CAN interface: " + can_interface)
logger.info("Loading ISO-TP module from: " + isotp_ko_file_path)
os.system("sh " + SETUP_CAN_FILE + " " + can_interface + " " + ecu_config.get_can_bitrate() + " " + isotp_ko_file_path)
def star_can_logger_thread():
Thread(target=can_logger.start).start()
Thread(target=logger_can.start).start()
def star_isotp_logger_thread():
Thread(target=isotp_logger.start).start()
Thread(target=logger_isotp.start).start()
def start_obd_listener_thread():

View File

@ -1,6 +1,6 @@
import logging
from logging import handlers
from logger_utils import MAX_LOG_FILE_SIZE
from loggers.logger_utils import MAX_LOG_FILE_SIZE
LOGGER_NAME = 'ecu_simulator'

View File

@ -1,5 +1,5 @@
import can
import logger_utils
from loggers import logger_utils
from addresses import ECU_ADDRESSES, TARGET_ADDRESSES
LOG_TYPE = "can"
@ -20,8 +20,7 @@ def start():
def create_can_bus():
bus = can.interface.Bus(channel=logger_utils.CAN_INTERFACE, bustype=BUS_TYPE, can_filters=get_filters())
return bus
return can.interface.Bus(channel=logger_utils.CAN_INTERFACE, bustype=BUS_TYPE, can_filters=get_filters())
def get_filters():

View File

@ -1,7 +1,7 @@
import isotp
import time
import logger_utils
from logger_utils import CAN_INTERFACE
from loggers import logger_utils
from loggers.logger_utils import CAN_INTERFACE
from addresses import UDS_ECU_ADDRESS, UDS_TARGET_ADDRESS
from addresses import OBD_BROADCAST_ADDRESS, OBD_ECU_ADDRESS, OBD_TARGET_ADDRESS

View File

@ -17,4 +17,4 @@ def create_new_file_path_if_size_exceeded(file_path, log_type):
def create_file_path(log_type):
return os.path.join(os.path.dirname(__file__), datetime.datetime.now().strftime(log_type + LOG_FILE_NAME_FORMAT))
return os.path.join(os.path.dirname("ecu_simulator"), datetime.datetime.now().strftime(log_type + LOG_FILE_NAME_FORMAT))

View File

@ -2,7 +2,7 @@ import isotp
import ecu_config
from obd import services
from addresses import OBD_BROADCAST_ADDRESS, OBD_ECU_ADDRESS, OBD_TARGET_ADDRESS
from ecu_simulator_logger import logger
from loggers.logger_app import logger
CAN_INTERFACE = ecu_config.get_can_interface()

View File

@ -1,5 +1,5 @@
from obd import service_responses as responses
from ecu_simulator_logger import logger
from obd import responses
from loggers.logger_app import logger
SUPPORTED_PIDS_RESPONSE_MASK = 0x80000000

View File

@ -1,5 +1,5 @@
import unittest
from obd import service_responses
from obd import responses
BIG_ENDIAN = "big"
@ -11,54 +11,54 @@ ENGINE_TEMP_MAX = 150
class TestServiceResponses(unittest.TestCase):
def test_get_vehicle_speed_is_one_byte(self):
self.assertEqual(1, len(service_responses.get_vehicle_speed()))
self.assertEqual(1, len(responses.get_vehicle_speed()))
def test_get_vehicle_speed_is_in_range_0_255(self):
initial_vehicle_speed = 0
self.assertEqual(initial_vehicle_speed.to_bytes(1, BIG_ENDIAN), service_responses.get_vehicle_speed())
self.assertEqual((initial_vehicle_speed + 1).to_bytes(1, BIG_ENDIAN), service_responses.get_vehicle_speed())
self.assertEqual(initial_vehicle_speed.to_bytes(1, BIG_ENDIAN), responses.get_vehicle_speed())
self.assertEqual((initial_vehicle_speed + 1).to_bytes(1, BIG_ENDIAN), responses.get_vehicle_speed())
for i in range(2, 256, 1):
self.assertEqual(i.to_bytes(1, BIG_ENDIAN), service_responses.get_vehicle_speed())
self.assertEqual(i.to_bytes(1, BIG_ENDIAN), responses.get_vehicle_speed())
self.assertEqual(initial_vehicle_speed.to_bytes(1, BIG_ENDIAN), service_responses.get_vehicle_speed())
self.assertEqual((initial_vehicle_speed + 1).to_bytes(1, BIG_ENDIAN), service_responses.get_vehicle_speed())
self.assertEqual(initial_vehicle_speed.to_bytes(1, BIG_ENDIAN), responses.get_vehicle_speed())
self.assertEqual((initial_vehicle_speed + 1).to_bytes(1, BIG_ENDIAN), responses.get_vehicle_speed())
for i in range(2, 256, 1):
self.assertEqual(i.to_bytes(1, BIG_ENDIAN), service_responses.get_vehicle_speed())
self.assertEqual(i.to_bytes(1, BIG_ENDIAN), responses.get_vehicle_speed())
def test_get_fuel_level_is_one_byte(self):
self.assertEqual(1, len(service_responses.get_fuel_level()))
self.assertEqual(1, len(responses.get_fuel_level()))
def test_get_engine_temperature_is_one_byte(self):
self.assertEqual(1, len(service_responses.get_engine_temperature()))
self.assertEqual(1, len(responses.get_engine_temperature()))
def test_get_engine_temperature(self):
self.assertAlmostEqual(ENGINE_TEMP_MIN, int(service_responses.get_engine_temperature().hex(), 16), delta=(
self.assertAlmostEqual(ENGINE_TEMP_MIN, int(responses.get_engine_temperature().hex(), 16), delta=(
ENGINE_TEMP_MAX - ENGINE_TEMP_MIN))
self.assertAlmostEqual(ENGINE_TEMP_MAX, int(service_responses.get_engine_temperature().hex(), 16), delta=(
self.assertAlmostEqual(ENGINE_TEMP_MAX, int(responses.get_engine_temperature().hex(), 16), delta=(
ENGINE_TEMP_MAX - ENGINE_TEMP_MIN))
def test_get_vin_has_18_bytes(self):
self.assertEqual(18, len(service_responses.get_vin()))
self.assertEqual(18, len(responses.get_vin()))
def test_get_vin_first_byte_is_0(self):
self.assertEqual(0, service_responses.get_vin()[0])
self.assertEqual(0, responses.get_vin()[0])
def test_get_vin_has_20_bytes(self):
self.assertEqual(20, len(service_responses.get_ecu_name()))
self.assertEqual(20, len(responses.get_ecu_name()))
def test_get_fuel_level_has_1_byte(self):
self.assertEqual(1, len(service_responses.get_fuel_level()))
self.assertEqual(1, len(responses.get_fuel_level()))
def test_get_fuel_level_is_smaller_equal_than_100(self):
fuel_level = int(service_responses.get_fuel_level().hex(), 16) * (100 / 255)
fuel_level = int(responses.get_fuel_level().hex(), 16) * (100 / 255)
self.assertTrue(100 >= fuel_level > 0)
def test_get_fuel_type_has_1_byte(self):
self.assertEqual(1, len(service_responses.get_fuel_type()))
self.assertEqual(1, len(responses.get_fuel_type()))
def test_get_fuel_type_is_smaller_equal_than_23(self):
# see https://en.wikipedia.org/wiki/OBD-II_PIDs#Fuel_Type_Coding
self.assertTrue(23 >= int(service_responses.get_fuel_type().hex(), 16) > 0)
self.assertTrue(23 >= int(responses.get_fuel_type().hex(), 16) > 0)
if __name__ == '__main__':

View File

@ -2,7 +2,7 @@ import isotp
import ecu_config
from uds import services
from addresses import UDS_ECU_ADDRESS, UDS_TARGET_ADDRESS
from ecu_simulator_logger import logger
from loggers.logger_app import logger
CAN_INTERFACE = ecu_config.get_can_interface()

View File

@ -1,6 +1,6 @@
import dtc_utils
import ecu_config as ecu_config
from ecu_simulator_logger import logger
from loggers.logger_app import logger
READ_DTC_INFO_BY_STATUS_MASK = 0x2