Implemented UDS service 0x19

This commit is contained in:
Anubis 2020-01-12 15:50:49 +01:00
parent a0db88f57d
commit 582077820f
3 changed files with 78 additions and 29 deletions

View File

@ -6,10 +6,10 @@ DTC_LENGTH = 5
BIG_ENDIAN = "big"
DEFAULT_DTC_STATUS = 0x2F
UDS_DTC_HIGH_BYTE = 0x01
UDS_DTC_DEFAULT_STATUS = 0x2F
def encode_obd_dtcs(dtcs):
dtcs_bytes = bytearray()
@ -24,7 +24,7 @@ def encode_uds_dtcs(dtcs):
for dtc in dtcs:
if is_dtc_valid(dtc):
dtcs_bytes += get_dtc_first_byte(dtc) + get_dtc_second_byte(dtc) + bytes([UDS_DTC_HIGH_BYTE]) + \
bytes([DEFAULT_DTC_STATUS])
bytes([UDS_DTC_DEFAULT_STATUS])
return dtcs_bytes

View File

@ -67,11 +67,36 @@ class TestUdsServices(unittest.TestCase):
def test_process_service_0x19(self):
request = bytes([READ_DTC_INFO]) + bytes([READ_DTC_INFO_BY_STATUS_MASK])
response = services.process_service_request(request)
dtcs = ecu_config.get_dtcs()
expected_response = get_positive_response_sid(READ_DTC_INFO) + bytes([READ_DTC_INFO_BY_STATUS_MASK]) + bytes([DTC_STATUS_AVAILABILITY_MASK]) + dtc_utils.encode_uds_dtcs(dtcs)
dtcs = dtc_utils.encode_uds_dtcs(ecu_config.get_dtcs())
expected_response = get_positive_response_sid(READ_DTC_INFO) + bytes([READ_DTC_INFO_BY_STATUS_MASK]) + bytes(
[DTC_STATUS_AVAILABILITY_MASK]) + dtcs
self.assertIsNotNone(response)
# self.assertEqual(2, len(response))
self.assertEqual(3 + len(dtcs), len(response))
self.assertEqual(expected_response.hex(), response.hex())
def test_process_service_0x19_with_unsupported_sub_function_returns_negative_response(self):
request = bytes([READ_DTC_INFO]) + bytes([0x03])
response = services.process_service_request(request)
expected_response = bytes([NEGATIVE_RESPONSE_ID]) + bytes([READ_DTC_INFO]) + bytes(
[NRC_SUB_FUNCTION_NOT_SUPPORTED])
self.assertIsNotNone(response)
self.assertEqual(3, len(response))
self.assertEqual(expected_response.hex(), response.hex())
def test_process_service_0x19_with_invalid_message_length_returns_negative_response(self):
expected_response = bytes([NEGATIVE_RESPONSE_ID]) + bytes([READ_DTC_INFO]) + bytes(
[NRC_INCORRECT_MESSAGE_LENGTH_OR_INVALID_FORMAT])
request = bytes([READ_DTC_INFO])
response = services.process_service_request(request)
self.assertIsNotNone(response)
self.assertEqual(3, len(response))
self.assertEqual(expected_response.hex(), response.hex())
request = bytes([READ_DTC_INFO]) + bytes([READ_DTC_INFO_BY_STATUS_MASK]) + bytes([0x01])
response = services.process_service_request(request)
self.assertIsNotNone(response)
self.assertEqual(3, len(response))
self.assertEqual(expected_response.hex(), response.hex())

View File

@ -1,5 +1,10 @@
import dtc_utils
import ecu_config_reader as ecu_config
READ_DTC_INFO_BY_STATUS_MASK = 0x2
READ_DTC_INFO_SID = 0x19
ECU_RESET_SID = 0x11
ECU_RESET_ENABLE_RAPID_POWER_SHUT_DOWN = 0x04
@ -16,31 +21,11 @@ NRC_INCORRECT_MESSAGE_LENGTH_OR_INVALID_FORMAT = 0x13
POSITIVE_RESPONSE_SID_MASK = 0x40
def get_0x19_response(request):
return bytes([POSITIVE_RESPONSE_SID_MASK + READ_DTC_INFO_SID]) + bytes([READ_DTC_INFO_BY_STATUS_MASK]) + bytes([DTC_STATUS_AVAILABILITY_MASK])
def is_reset_type_supported(reset_type):
return 0x05 >= reset_type >= 0x01
def get_0x11_response(request):
negative_response = bytes([NEGATIVE_RESPONSE_SID]) + bytes([ECU_RESET_SID])
if len(request) == 2:
reset_type = request[1]
if is_reset_type_supported(reset_type):
positive_response = bytes([POSITIVE_RESPONSE_SID_MASK + ECU_RESET_SID]) + bytes([reset_type])
if reset_type == ECU_RESET_ENABLE_RAPID_POWER_SHUT_DOWN:
return positive_response + bytes([ECU_RESET_POWER_DOWN_TIME])
return positive_response
return negative_response + bytes([NRC_SUB_FUNCTION_NOT_SUPPORTED])
return negative_response + bytes([NRC_INCORRECT_MESSAGE_LENGTH_OR_INVALID_FORMAT])
DTCS = dtc_utils.encode_uds_dtcs(ecu_config.get_dtcs())
SERVICES = [
{"id": ECU_RESET_SID, "description": "ECUReset", "response": lambda request: get_0x11_response(request)},
{"id": 0x19, "description": "ReadDTCInformation", "response": lambda request: get_0x19_response(request)}
{"id": READ_DTC_INFO_SID, "description": "ReadDTCInformation", "response": lambda request: get_0x19_response(request)}
]
@ -51,3 +36,42 @@ def process_service_request(request):
if service.get("id") == sid:
return service.get("response")(request)
return None
def get_0x11_response(request):
if len(request) == 2:
reset_type = request[1]
if is_reset_type_supported(reset_type):
positive_response = get_positive_response(ECU_RESET_SID, reset_type)
if reset_type == ECU_RESET_ENABLE_RAPID_POWER_SHUT_DOWN:
return positive_response + bytes([ECU_RESET_POWER_DOWN_TIME])
return positive_response
return get_negative_response(ECU_RESET_SID, NRC_SUB_FUNCTION_NOT_SUPPORTED)
return get_negative_response(ECU_RESET_SID, NRC_INCORRECT_MESSAGE_LENGTH_OR_INVALID_FORMAT)
def is_reset_type_supported(reset_type):
return 0x05 >= reset_type >= 0x01
def get_0x19_response(request):
if len(request) == 2:
sub_function = request[1]
if sub_function == READ_DTC_INFO_BY_STATUS_MASK:
positive_response = get_positive_response(READ_DTC_INFO_SID, READ_DTC_INFO_BY_STATUS_MASK) + bytes(
[DTC_STATUS_AVAILABILITY_MASK])
if len(DTCS) > 0:
return positive_response + DTCS
return positive_response
return get_negative_response(READ_DTC_INFO_SID, NRC_SUB_FUNCTION_NOT_SUPPORTED)
return get_negative_response(READ_DTC_INFO_SID, NRC_INCORRECT_MESSAGE_LENGTH_OR_INVALID_FORMAT)
def get_positive_response(sid, sub_function):
return bytes([sid + POSITIVE_RESPONSE_SID_MASK]) + bytes([sub_function])
def get_negative_response(sid, nrc):
return bytes([NEGATIVE_RESPONSE_SID]) + bytes([sid]) + bytes([nrc])