Added UDS service 0x10

This commit is contained in:
Anubis 2020-02-07 16:12:43 +01:00
parent 8b18407431
commit bc32248a4f
2 changed files with 105 additions and 44 deletions

View File

@ -3,17 +3,30 @@ from uds import services
import ecu_config as ecu_config
import dtc_utils
ECU_RESET = 0x11
DIAGNOSTIC_SESSION_CONTROL_SID = 0x10
DIAGNOSTIC_SESSION_TYPES = [0x01, 0x02, 0x03, 0x04]
DIAGNOSTIC_SESSION_INVALID_TYPE = 0x05
DIAGNOSTIC_SESSION_PARAMETER_RECORD = [0x00, 0x1E, 0x0B, 0xB8]
ECU_RESET_SID = 0x11
ECU_RESET_HARD = 0x01
ECU_RESET_ENABLE_RAPID_POWER_DOWN = 0x04
READ_DTC_INFO = 0x19
ECU_RESET_POWER_DOWN_TIME = 0x0f
READ_DTC_INFO_SID = 0x19
READ_DTC_INFO_BY_STATUS_MASK = 0x02
DTC_STATUS_AVAILABILITY_MASK = 0xFF
READ_DTC_STATUS_AVAILABILITY_MASK = 0xFF
POSITIVE_RESPONSE_MASK = 0x40
NEGATIVE_RESPONSE_ID = 0x7F
@ -21,79 +34,105 @@ NRC_SUB_FUNCTION_NOT_SUPPORTED = 0x12
NRC_INCORRECT_MESSAGE_LENGTH_OR_INVALID_FORMAT = 0x13
POSITIVE_RESPONSE_MASK = 0x40
def get_positive_response_sid(sid):
def get_response_sid(sid):
return bytes([POSITIVE_RESPONSE_MASK + sid])
class TestUdsServices(unittest.TestCase):
def test_process_service_0x11_with_hard_reset(self):
request = bytes([ECU_RESET]) + bytes([ECU_RESET_HARD])
def test_process_service_0x10(self):
request = bytes([DIAGNOSTIC_SESSION_CONTROL_SID]) + bytes([DIAGNOSTIC_SESSION_TYPES[0]])
response = services.process_service_request(request)
expected_response = get_positive_response_sid(ECU_RESET) + bytes([ECU_RESET_HARD])
expected_response = get_response_sid(DIAGNOSTIC_SESSION_CONTROL_SID) + bytes(
[DIAGNOSTIC_SESSION_TYPES[0]]) + bytes(DIAGNOSTIC_SESSION_PARAMETER_RECORD)
self.assertIsNotNone(response)
self.assertEqual(6, len(response))
self.assertEqual(expected_response.hex(), response.hex())
def test_process_service_0x10_with_unsupported_session_type_returns_negative_response(self):
request = bytes([DIAGNOSTIC_SESSION_CONTROL_SID]) + bytes([DIAGNOSTIC_SESSION_INVALID_TYPE])
response = services.process_service_request(request)
expected_response = bytes([NEGATIVE_RESPONSE_ID]) + bytes([DIAGNOSTIC_SESSION_CONTROL_SID]) + bytes(
[NRC_SUB_FUNCTION_NOT_SUPPORTED])
self.assertIsNotNone(response)
self.assertEqual(3, len(response))
self.assertEqual(expected_response.hex(), response.hex())
def test_process_service_0x10_with_invalid_message_length_returns_negative_response(self):
request = bytes([DIAGNOSTIC_SESSION_CONTROL_SID])
response = services.process_service_request(request)
expected_response = bytes([NEGATIVE_RESPONSE_ID]) + bytes([DIAGNOSTIC_SESSION_CONTROL_SID]) + bytes(
[NRC_INCORRECT_MESSAGE_LENGTH_OR_INVALID_FORMAT])
self.assertIsNotNone(response)
self.assertEqual(3, len(response))
self.assertEqual(expected_response.hex(), response.hex())
def test_process_service_0x11_with_hard_reset(self):
request = bytes([ECU_RESET_SID]) + bytes([ECU_RESET_HARD])
response = services.process_service_request(request)
expected_response = get_response_sid(ECU_RESET_SID) + bytes([ECU_RESET_HARD])
self.assertIsNotNone(response)
self.assertEqual(2, len(response))
self.assertEqual(expected_response.hex(), response.hex())
def test_process_service_0x11_with_enable_power_shut_down(self):
request = bytes([ECU_RESET]) + bytes([ECU_RESET_ENABLE_RAPID_POWER_DOWN])
request = bytes([ECU_RESET_SID]) + bytes([ECU_RESET_ENABLE_RAPID_POWER_DOWN])
response = services.process_service_request(request)
expected_response = get_positive_response_sid(ECU_RESET) + bytes([ECU_RESET_ENABLE_RAPID_POWER_DOWN]) + bytes([services.ECU_RESET_POWER_DOWN_TIME])
expected_response = get_response_sid(ECU_RESET_SID) + bytes([ECU_RESET_ENABLE_RAPID_POWER_DOWN]) + bytes(
[ECU_RESET_POWER_DOWN_TIME])
self.assertIsNotNone(response)
self.assertEqual(3, len(response))
self.assertEqual(expected_response.hex(), response.hex())
def test_process_service_0x11_with_unsupported_reset_type_returns_negative_response(self):
request = bytes([ECU_RESET]) + bytes([0x06])
request = bytes([ECU_RESET_SID]) + bytes([0x06])
response = services.process_service_request(request)
expected_response = bytes([NEGATIVE_RESPONSE_ID]) + bytes([ECU_RESET]) + bytes(
expected_response = bytes([NEGATIVE_RESPONSE_ID]) + bytes([ECU_RESET_SID]) + bytes(
[NRC_SUB_FUNCTION_NOT_SUPPORTED])
self.assertIsNotNone(response)
self.assertEqual(3, len(response))
self.assertEqual(expected_response.hex(), response.hex())
def test_process_service_0x11_with_invalid_message_length_returns_negative_response(self):
request = bytes([ECU_RESET])
request = bytes([ECU_RESET_SID])
response = services.process_service_request(request)
expected_response = bytes([NEGATIVE_RESPONSE_ID]) + bytes([ECU_RESET]) + bytes(
expected_response = bytes([NEGATIVE_RESPONSE_ID]) + bytes([ECU_RESET_SID]) + bytes(
[NRC_INCORRECT_MESSAGE_LENGTH_OR_INVALID_FORMAT])
self.assertIsNotNone(response)
self.assertEqual(3, len(response))
self.assertEqual(expected_response.hex(), response.hex())
def test_process_service_0x19(self):
request = bytes([READ_DTC_INFO]) + bytes([READ_DTC_INFO_BY_STATUS_MASK])
request = bytes([READ_DTC_INFO_SID]) + bytes([READ_DTC_INFO_BY_STATUS_MASK])
response = services.process_service_request(request)
dtcs = dtc_utils.encode_uds_dtcs(ecu_config.get_dtcs())
expected_response = get_positive_response_sid(READ_DTC_INFO) + bytes([READ_DTC_INFO_BY_STATUS_MASK]) + bytes(
[DTC_STATUS_AVAILABILITY_MASK]) + dtcs
expected_response = get_response_sid(READ_DTC_INFO_SID) + bytes([READ_DTC_INFO_BY_STATUS_MASK]) + bytes(
[READ_DTC_STATUS_AVAILABILITY_MASK]) + dtcs
self.assertIsNotNone(response)
self.assertEqual(3 + len(dtcs), len(response))
self.assertEqual(expected_response.hex(), response.hex())
def test_process_service_0x19_with_unsupported_sub_function_returns_negative_response(self):
request = bytes([READ_DTC_INFO]) + bytes([0x03])
request = bytes([READ_DTC_INFO_SID]) + bytes([0x03])
response = services.process_service_request(request)
expected_response = bytes([NEGATIVE_RESPONSE_ID]) + bytes([READ_DTC_INFO]) + bytes(
expected_response = bytes([NEGATIVE_RESPONSE_ID]) + bytes([READ_DTC_INFO_SID]) + bytes(
[NRC_SUB_FUNCTION_NOT_SUPPORTED])
self.assertIsNotNone(response)
self.assertEqual(3, len(response))
self.assertEqual(expected_response.hex(), response.hex())
def test_process_service_0x19_with_invalid_message_length_returns_negative_response(self):
expected_response = bytes([NEGATIVE_RESPONSE_ID]) + bytes([READ_DTC_INFO]) + bytes(
expected_response = bytes([NEGATIVE_RESPONSE_ID]) + bytes([READ_DTC_INFO_SID]) + bytes(
[NRC_INCORRECT_MESSAGE_LENGTH_OR_INVALID_FORMAT])
request = bytes([READ_DTC_INFO])
request = bytes([READ_DTC_INFO_SID])
response = services.process_service_request(request)
self.assertIsNotNone(response)
self.assertEqual(3, len(response))
self.assertEqual(expected_response.hex(), response.hex())
request = bytes([READ_DTC_INFO]) + bytes([READ_DTC_INFO_BY_STATUS_MASK]) + bytes([0x01])
request = bytes([READ_DTC_INFO_SID]) + bytes([READ_DTC_INFO_BY_STATUS_MASK]) + bytes([0x01])
response = services.process_service_request(request)
self.assertIsNotNone(response)
self.assertEqual(3, len(response))

View File

@ -2,9 +2,11 @@ import dtc_utils
import ecu_config as ecu_config
from loggers.logger_app import logger
READ_DTC_INFO_BY_STATUS_MASK = 0x2
DIAGNOSTIC_SESSION_CONTROL_SID = 0x10
READ_DTC_INFO_SID = 0x19
DIAGNOSTIC_SESSION_TYPES = [0x01, 0x02, 0x03, 0x04]
DIAGNOSTIC_SESSION_PARAMETER_RECORD = [0x00, 0x1E, 0x0B, 0xB8]
ECU_RESET_SID = 0x11
@ -12,7 +14,15 @@ ECU_RESET_ENABLE_RAPID_POWER_SHUT_DOWN = 0x04
ECU_RESET_POWER_DOWN_TIME = 0x0F
DTC_STATUS_AVAILABILITY_MASK = 0xFF
READ_DTC_INFO_BY_STATUS_MASK = 0x2
READ_DTC_INFO_SID = 0x19
READ_DTC_STATUS_AVAILABILITY_MASK = 0xFF
DTCS = dtc_utils.encode_uds_dtcs(ecu_config.get_dtcs())
POSITIVE_RESPONSE_SID_MASK = 0x40
NEGATIVE_RESPONSE_SID = 0x7F
@ -20,13 +30,11 @@ NRC_SUB_FUNCTION_NOT_SUPPORTED = 0x12
NRC_INCORRECT_MESSAGE_LENGTH_OR_INVALID_FORMAT = 0x13
POSITIVE_RESPONSE_SID_MASK = 0x40
DTCS = dtc_utils.encode_uds_dtcs(ecu_config.get_dtcs())
SERVICES = [
{"id": ECU_RESET_SID, "description": "ECUReset", "response": lambda request: get_0x11_response(request)},
{"id": READ_DTC_INFO_SID, "description": "ReadDTCInformation", "response": lambda request: get_0x19_response(request)}
{"id": READ_DTC_INFO_SID, "description": "ReadDTCInformation", "response": lambda request: get_0x19_response(request)},
{"id": DIAGNOSTIC_SESSION_CONTROL_SID, "description": "DiagnosticSessionControl", "response": lambda request: get_0x10_response(request)}
]
@ -43,11 +51,21 @@ def process_service_request(request):
return None
def get_0x10_response(request):
if len(request) == 2:
session_type = request[1]
if session_type in DIAGNOSTIC_SESSION_TYPES:
return get_positive_response_sid(DIAGNOSTIC_SESSION_CONTROL_SID) + bytes([session_type]) \
+ bytes(DIAGNOSTIC_SESSION_PARAMETER_RECORD)
return get_negative_response(DIAGNOSTIC_SESSION_CONTROL_SID, NRC_SUB_FUNCTION_NOT_SUPPORTED)
return get_negative_response(DIAGNOSTIC_SESSION_CONTROL_SID, NRC_INCORRECT_MESSAGE_LENGTH_OR_INVALID_FORMAT)
def get_0x11_response(request):
if len(request) == 2:
reset_type = request[1]
if is_reset_type_supported(reset_type):
positive_response = get_positive_response(ECU_RESET_SID, reset_type)
positive_response = get_positive_response_sid(ECU_RESET_SID) + bytes([reset_type])
if reset_type == ECU_RESET_ENABLE_RAPID_POWER_SHUT_DOWN:
return positive_response + bytes([ECU_RESET_POWER_DOWN_TIME])
return positive_response
@ -55,25 +73,29 @@ def get_0x11_response(request):
return get_negative_response(ECU_RESET_SID, NRC_INCORRECT_MESSAGE_LENGTH_OR_INVALID_FORMAT)
def is_reset_type_supported(reset_type):
return 0x05 >= reset_type >= 0x01
def get_0x19_response(request):
if len(request) == 2:
sub_function = request[1]
if sub_function == READ_DTC_INFO_BY_STATUS_MASK:
positive_response = get_positive_response(READ_DTC_INFO_SID, READ_DTC_INFO_BY_STATUS_MASK) + bytes(
[DTC_STATUS_AVAILABILITY_MASK])
if len(DTCS) > 0:
return positive_response + DTCS
return positive_response
report_type = request[1]
if report_type == READ_DTC_INFO_BY_STATUS_MASK:
positive_response = get_positive_response_sid(READ_DTC_INFO_SID) + bytes([report_type]) \
+ bytes([READ_DTC_STATUS_AVAILABILITY_MASK])
return add_dtcs_to_response(positive_response)
return get_negative_response(READ_DTC_INFO_SID, NRC_SUB_FUNCTION_NOT_SUPPORTED)
return get_negative_response(READ_DTC_INFO_SID, NRC_INCORRECT_MESSAGE_LENGTH_OR_INVALID_FORMAT)
def get_positive_response(sid, sub_function):
return bytes([sid + POSITIVE_RESPONSE_SID_MASK]) + bytes([sub_function])
def is_reset_type_supported(reset_type):
return 0x05 >= reset_type >= 0x01
def add_dtcs_to_response(response):
if len(DTCS) > 0:
return response + DTCS
return response
def get_positive_response_sid(requested_sid):
return bytes([requested_sid + POSITIVE_RESPONSE_SID_MASK])
def get_negative_response(sid, nrc):