Moved write_to_file to logger_utils

This commit is contained in:
Anubis 2020-02-03 20:18:32 +01:00
parent f7f0963ff3
commit ad0b62601b
4 changed files with 44 additions and 39 deletions

View File

@ -2,17 +2,19 @@ import logging
from logging import handlers
from loggers.logger_utils import MAX_LOG_FILE_SIZE
LOGGER_NAME = 'ecu_simulator'
LOGGER_NAME = "ecu_simulator"
LOGGER_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
logger = logging.getLogger(LOGGER_NAME)
LOGGER_FORMAT = "%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s - %(message)s"
LOG_FILE_NAME = LOGGER_NAME + ".log"
logger = logging.getLogger(LOGGER_NAME)
def configure():
formatter = logging.Formatter(LOGGER_FORMAT)
formatter = logging.Formatter(LOGGER_FORMAT, datefmt=DATE_FORMAT)
__add_file_handler(formatter)
__add_console_handler(formatter)
logger.setLevel(logging.DEBUG)

View File

@ -11,12 +11,11 @@ CAN_MASK = 0x7FF
def start():
bus = create_can_bus()
log_file = logger_utils.create_file_path(LOG_TYPE)
file_path = logger_utils.create_file_path(LOG_TYPE)
while True:
log_file = logger_utils.create_new_file_path_if_size_exceeded(log_file, LOG_TYPE)
logger = can.Logger(log_file, append=True)
logger.on_message_received(bus.recv())
logger.stop()
file_path = logger_utils.create_new_file_path_if_size_exceeded(file_path, LOG_TYPE)
message = bus.recv()
logger_utils.write_to_file(file_path, message.timestamp, message.arbitration_id, message.data)
def create_can_bus():
@ -35,4 +34,3 @@ def get_can_ids():
can_ids.extend(ECU_ADDRESSES)
can_ids.extend(TARGET_ADDRESSES)
return can_ids

View File

@ -1,5 +1,4 @@
import isotp
import time
from loggers import logger_utils
from loggers.logger_utils import CAN_INTERFACE
from addresses import UDS_ECU_ADDRESS, UDS_TARGET_ADDRESS
@ -23,14 +22,15 @@ def start():
obd_request = obd_socket_req.recv()
obd_response = obd_socket_res.recv()
file_path = logger_utils.create_new_file_path_if_size_exceeded(file_path, LOG_TYPE)
if uds_request is not None:
write_to_log(file_path, uds_request, UDS_ECU_ADDRESS)
logger_utils.write_to_file(file_path, None, UDS_ECU_ADDRESS, uds_request)
if uds_response is not None:
write_to_log(file_path, uds_response, UDS_TARGET_ADDRESS)
logger_utils.write_to_file(file_path, None, UDS_TARGET_ADDRESS, uds_response)
if obd_request is not None:
write_to_log(file_path, obd_request, OBD_BROADCAST_ADDRESS)
logger_utils.write_to_file(file_path, None, OBD_BROADCAST_ADDRESS, obd_request)
if obd_response is not None:
write_to_log(file_path, obd_response, OBD_TARGET_ADDRESS)
logger_utils.write_to_file(file_path, None, OBD_TARGET_ADDRESS, obd_response)
def create_socket(rxid, txid):
@ -39,25 +39,3 @@ def create_socket(rxid, txid):
socket.bind(CAN_INTERFACE, isotp.Address(rxid=rxid, txid=txid))
return socket
def write_to_log(file_path, message, address):
file_path = logger_utils.create_new_file_path_if_size_exceeded(file_path, LOG_TYPE)
log_file = open(file_path, "a")
log_file.write(create_log(address, message))
log_file.close()
def create_log(address, message):
return get_time() + " " + CAN_INTERFACE + " " + format_address(address) + "#" + format_msg(message) + "\n"
def get_time():
return "(" + "{0:.6f}".format(time.time()) + ")"
def format_address(address):
return hex(address).lstrip("0x").upper()
def format_msg(message):
return message.hex().upper()

View File

@ -9,6 +9,10 @@ MAX_LOG_FILE_SIZE = 1500000 # bytes
LOG_FILE_NAME_FORMAT = "_%y%m%d%H%M%S.log"
def create_file_path(log_type):
return os.path.join(os.path.dirname("ecu_simulator"), datetime.datetime.now().strftime(log_type + LOG_FILE_NAME_FORMAT))
def create_new_file_path_if_size_exceeded(file_path, log_type):
if os.path.exists(file_path):
if os.path.getsize(file_path) > MAX_LOG_FILE_SIZE:
@ -16,5 +20,28 @@ def create_new_file_path_if_size_exceeded(file_path, log_type):
return file_path
def create_file_path(log_type):
return os.path.join(os.path.dirname("ecu_simulator"), datetime.datetime.now().strftime(log_type + LOG_FILE_NAME_FORMAT))
def write_to_file(file_path, timestamp, address, data):
log_file = open(file_path, "a")
formatted_log = format_log(get_timestamp(timestamp), address, data)
log_file.write(formatted_log)
log_file.close()
def get_timestamp(timestamp):
if timestamp is None:
return create_timestamp()
return to_iso8601(timestamp)
def create_timestamp():
return str(datetime.datetime.now().isoformat(timespec="milliseconds"))
def to_iso8601(timestamp):
return str(datetime.datetime.fromtimestamp(timestamp).isoformat(timespec="milliseconds"))
def format_log(timestamp, address, data):
return timestamp + " " + CAN_INTERFACE + " " + hex(address) + " " + "0x" + data.hex() + "\n"