Added uds_listener

This commit is contained in:
lbenthins 2020-01-06 12:53:32 +01:00
parent 672119d885
commit 1d8c8be05a
4 changed files with 53 additions and 8 deletions

View File

@ -21,11 +21,15 @@
},
"obd_broadcast_address": {
"value": "0x7DF",
"description": "11-Bit broadcast address that the ECU listens on to receive OBD requests (functional address). The target address is obd_ecu_address + 0x8"
"description": "11-Bit broadcast address the ECU uses to receive OBD requests (functional addressing). The target address is: obd_ecu_address + 0x8"
},
"obd_ecu_address": {
"value": "0x7E0",
"description": "11-Bit ECU address used to response to an OBD request (physical address). The target address is obd_ecu_address + 0x8"
"description": "11-Bit physical address the ECU uses to response to an OBD request. The target address is: obd_ecu_address + 0x8"
},
"uds_ecu_address": {
"value": "0x7E1",
"description": "11-Bit physical address the ECU uses to receive and response to an UDS request (physical addressing). The target address is: uds_ecu_address + 0x8. The UDS module does not use functional addressing"
},
"can_interface": {
"value": "vcan0",

View File

@ -27,16 +27,20 @@ def get_dtcs():
def get_obd_broadcast_address():
try:
return int(CONFIG["obd_broadcast_address"].get("value"), 16)
except ValueError as error:
print(error)
exit(1)
return create_address(CONFIG["obd_broadcast_address"].get("value"))
def get_obd_ecu_address():
return create_address(CONFIG["obd_ecu_address"].get("value"))
def get_uds_ecu_address():
return create_address(CONFIG["uds_ecu_address"].get("value"))
def create_address(address):
try:
return int(CONFIG["obd_ecu_address"].get("value"), 16)
return int(address, 16)
except ValueError as error:
print(error)
exit(1)
@ -56,3 +60,6 @@ def get_can_bitrate():
def get_isotp_ko_file_path():
return CONFIG["isotp_ko_file_path"].get("value")

View File

@ -2,6 +2,7 @@ import sys
from threading import Thread
import os
import obd_listener
import uds_listener
import ecu_config_reader as ecu_config
VCAN_SETUP_FILE = "vcan_setup.sh"
@ -12,6 +13,7 @@ CAN_SETUP_FILE = "can_setup.sh"
def main():
set_up_can_interface()
start_obd_listener_thread()
start_uds_listener_thread()
def set_up_can_interface():
@ -29,5 +31,9 @@ def start_obd_listener_thread():
Thread(target=obd_listener.start).start()
def start_uds_listener_thread():
Thread(target=uds_listener.start).start()
if __name__ == '__main__':
sys.exit(main())

28
uds_listener.py Normal file
View File

@ -0,0 +1,28 @@
import isotp
import ecu_config_reader as ecu_config
from uds import services
CAN_INTERFACE = ecu_config.get_can_interface()
ECU_ADDRESS = ecu_config.get_uds_ecu_address()
TARGET_ADDRESS = ECU_ADDRESS + 8
def start():
isotp_socket = create_isotp_socket(ECU_ADDRESS, TARGET_ADDRESS)
while True:
request = isotp_socket.recv()
if request is not None:
if len(request) >= 1:
response = services.process_service_request(request)
if response is not None:
print("Response: " + response.hex())
isotp_socket.send(response)
def create_isotp_socket(receiver_address, target_address):
socket = isotp.socket()
socket.bind(CAN_INTERFACE, isotp.Address(rxid=receiver_address, txid=target_address))
return socket