Refactoring: Added addresses

This commit is contained in:
Anubis 2020-01-23 15:59:06 +01:00
parent 97926a55df
commit 22380d2f02
6 changed files with 35 additions and 33 deletions

15
addresses.py Normal file
View File

@ -0,0 +1,15 @@
import ecu_config
OBD_BROADCAST_ADDRESS = ecu_config.get_obd_broadcast_address()
OBD_ECU_ADDRESS = ecu_config.get_obd_ecu_address()
OBD_TARGET_ADDRESS = OBD_ECU_ADDRESS + 8
UDS_ECU_ADDRESS = ecu_config.get_uds_ecu_address()
UDS_TARGET_ADDRESS = UDS_ECU_ADDRESS + 8
ECU_ADDRESSES = [OBD_BROADCAST_ADDRESS, OBD_ECU_ADDRESS, UDS_ECU_ADDRESS]
TARGET_ADDRESSES = [OBD_TARGET_ADDRESS, UDS_TARGET_ADDRESS]

View File

@ -2,8 +2,9 @@ import can
import ecu_config
import os
import datetime
from addresses import ECU_ADDRESSES, TARGET_ADDRESSES
LOG_FILE_NAME_FORMAT = 'can_%y%m%d%H%M%S.log'
LOG_FILE_NAME_FORMAT = "can_%y%m%d%H%M%S.log"
MAX_LOG_FILE_SIZE = 1500000 # bytes
@ -32,11 +33,10 @@ def get_filters():
def get_can_ids():
ecu_can_ids = ecu_config.get_ecu_addresses()
tester_can_ids = []
for ecu_can_id in ecu_can_ids:
tester_can_ids.append(ecu_can_id+8)
return ecu_can_ids + tester_can_ids
can_ids = []
can_ids.extend(ECU_ADDRESSES)
can_ids.extend(TARGET_ADDRESSES)
return can_ids
def create_new_file_path_if_size_exceeded(file_path):

View File

@ -26,10 +26,6 @@ def get_dtcs():
return CONFIG["dtcs"].get("value")
def get_ecu_addresses():
return [get_obd_broadcast_address(), get_obd_ecu_address(), get_uds_ecu_address()]
def get_obd_broadcast_address():
return create_address(CONFIG["obd_broadcast_address"].get("value"))

View File

@ -2,11 +2,11 @@ import os
import isotp
import time
import datetime
import obd_listener
import uds_listener
import ecu_config
from addresses import UDS_ECU_ADDRESS, UDS_TARGET_ADDRESS
from addresses import OBD_BROADCAST_ADDRESS, OBD_ECU_ADDRESS, OBD_TARGET_ADDRESS
LOG_FILE_NAME_FORMAT = 'isotp_%y%m%d%H%M%S.log'
LOG_FILE_NAME_FORMAT = "isotp_%y%m%d%H%M%S.log"
MAX_LOG_FILE_SIZE = 1500000 # bytes
@ -14,14 +14,14 @@ CAN_INTERFACE = ecu_config.get_can_interface()
def start():
uds_socket_req = create_socket(rxid=uds_listener.ECU_ADDRESS, txid=uds_listener.TARGET_ADDRESS)
uds_socket_res = create_socket(rxid=uds_listener.TARGET_ADDRESS, txid=uds_listener.ECU_ADDRESS)
uds_socket_req = create_socket(rxid=UDS_ECU_ADDRESS, txid=UDS_TARGET_ADDRESS)
uds_socket_res = create_socket(rxid=UDS_TARGET_ADDRESS, txid=UDS_ECU_ADDRESS)
obd_broadcast_socket_req = create_socket(rxid=obd_listener.BROADCAST_ADDRESS, txid=obd_listener.TARGET_ADDRESS)
obd_broadcast_socket_res = create_socket(rxid=obd_listener.TARGET_ADDRESS, txid=obd_listener.BROADCAST_ADDRESS)
obd_broadcast_socket_req = create_socket(rxid=OBD_BROADCAST_ADDRESS, txid=OBD_TARGET_ADDRESS)
obd_broadcast_socket_res = create_socket(rxid=OBD_TARGET_ADDRESS, txid=OBD_BROADCAST_ADDRESS)
obd_socket_req = create_socket(rxid=obd_listener.ECU_ADDRESS, txid=obd_listener.TARGET_ADDRESS)
obd_socket_res = create_socket(rxid=obd_listener.TARGET_ADDRESS, txid=obd_listener.ECU_ADDRESS)
obd_socket_req = create_socket(rxid=OBD_ECU_ADDRESS, txid=OBD_TARGET_ADDRESS)
obd_socket_res = create_socket(rxid=OBD_TARGET_ADDRESS, txid=OBD_ECU_ADDRESS)
log_file_path = create_file_path()

View File

@ -1,19 +1,13 @@
import isotp
import ecu_config
from obd import services
from addresses import OBD_BROADCAST_ADDRESS, OBD_ECU_ADDRESS, OBD_TARGET_ADDRESS
CAN_INTERFACE = ecu_config.get_can_interface()
BROADCAST_ADDRESS = ecu_config.get_obd_broadcast_address()
ECU_ADDRESS = ecu_config.get_obd_ecu_address()
TARGET_ADDRESS = ECU_ADDRESS + 8
def start():
request_socket = create_isotp_socket(BROADCAST_ADDRESS, TARGET_ADDRESS)
response_socket = create_isotp_socket(ECU_ADDRESS, TARGET_ADDRESS)
request_socket = create_isotp_socket(OBD_BROADCAST_ADDRESS, OBD_TARGET_ADDRESS)
response_socket = create_isotp_socket(OBD_ECU_ADDRESS, OBD_TARGET_ADDRESS)
while True:
request = request_socket.recv()
requested_pid, requested_sid = get_sid_and_pid(request)

View File

@ -1,16 +1,13 @@
import isotp
import ecu_config
from uds import services
from addresses import UDS_ECU_ADDRESS, UDS_TARGET_ADDRESS
CAN_INTERFACE = ecu_config.get_can_interface()
ECU_ADDRESS = ecu_config.get_uds_ecu_address()
TARGET_ADDRESS = ECU_ADDRESS + 8
def start():
isotp_socket = create_isotp_socket(ECU_ADDRESS, TARGET_ADDRESS)
isotp_socket = create_isotp_socket(UDS_ECU_ADDRESS, UDS_TARGET_ADDRESS)
while True:
request = isotp_socket.recv()
if request is not None: