314 lines
13 KiB
Python
314 lines
13 KiB
Python
import matplotlib.pyplot as plt
|
|
from matplotlib.pyplot import savefig
|
|
from numpy import where, isin
|
|
from os import chdir, mkdir, path, remove
|
|
from shutil import rmtree
|
|
from PipelineTimer import PipelineTimer
|
|
from scipy.cluster.hierarchy import dendrogram
|
|
|
|
|
|
figure_format: str = "png" # png works well for quickly previewing results and draft results write ups.
|
|
# figure_format: str = "eps" # Journals/Conferences generally prefer EPS file format for camera-ready copies.
|
|
figure_dpi: int = 300
|
|
figure_transp: bool = False
|
|
|
|
arb_id_folder: str = 'figures'
|
|
cluster_folder: str = 'clusters'
|
|
j1979_folder: str = 'j1979'
|
|
threshold_folder: str = 'threshold_heatmaps'
|
|
|
|
|
|
def plot_signals_by_arb_id(a_timer: PipelineTimer, arb_id_dict: dict, signal_dict: dict, vehicle_number: str,
|
|
force: bool=False):
|
|
if path.exists(arb_id_folder):
|
|
if force:
|
|
rmtree(arb_id_folder)
|
|
else:
|
|
print("\nArbID plotting appears to have already been done and forcing is turned off. Skipping...")
|
|
return
|
|
|
|
a_timer.start_function_time()
|
|
|
|
for k_id, signals in signal_dict.items():
|
|
arb_id = arb_id_dict[k_id]
|
|
if not arb_id.static and not arb_id.short:
|
|
print("Plotting Arb ID " + str(k_id) + " (" + str(hex(k_id)) + ") for Vehicle " + vehicle_number)
|
|
a_timer.start_iteration_time()
|
|
|
|
signals_to_plot = []
|
|
# Don't plot the static signals
|
|
for k_signal, signal in signals.items():
|
|
if not signal.static:
|
|
signals_to_plot.append(signal)
|
|
# There's a corner case where the Arb ID only has static signals. This conditional accounts for this.
|
|
# TODO: This corner case should probably be reflected by arb_id.static.
|
|
if len(signals_to_plot) < 1:
|
|
continue
|
|
# One row per signal plus one for the TANG. Squeeze is used to force axes to be an array to avoid errors.
|
|
fig, axes = plt.subplots(nrows=1 + len(signals_to_plot), ncols=1)
|
|
plt.suptitle("Time Series and TANG for Arbitration ID " + hex(k_id) + " from Vehicle " + vehicle_number,
|
|
weight='bold',
|
|
position=(0.5, 1))
|
|
fig.set_size_inches(8, (1 + len(signals_to_plot) + 1) * 1.3)
|
|
# The min() statement provides whitespace for the title depending on the number of subplots.
|
|
size_adjust = len(signals_to_plot) / 100
|
|
plt.tight_layout(h_pad=1, rect=(0, 0, 1, min(0.985, 0.93 + size_adjust)))
|
|
# This adjusts whitespace padding on the left and right of the subplots
|
|
fig.subplots_adjust(left=0.07, right=0.98)
|
|
for i, signal in enumerate(signals_to_plot):
|
|
ax = axes[i]
|
|
ax.set_title(signal.plot_title,
|
|
style='italic',
|
|
size='medium')
|
|
ax.set_xlim([signal.time_series.first_valid_index(), signal.time_series.last_valid_index()])
|
|
ax.plot(signal.time_series, color='black')
|
|
# Add a 25% opacity dashed black line to the entropy gradient plot at one boundary of each sub-flow
|
|
axes[-1].axvline(x=signal.start_index, alpha=0.25, c='black', linestyle='dashed')
|
|
|
|
# Plot the entropy gradient at the bottom of the overall output
|
|
ax = axes[-1]
|
|
ax.set_title("Min-Max Normalized Transition Aggregation N-Gram (TANG)",
|
|
style='italic',
|
|
size='medium')
|
|
tang_bit_width = arb_id.tang.shape[0]
|
|
ax.set_xlim([-0.01 * tang_bit_width, 1.005 * tang_bit_width])
|
|
y = arb_id.tang[:]
|
|
# Differentiate bit positions with non-zero and zero entropy using black points and grey x respectively.
|
|
ix = isin(y, 0)
|
|
pad_bit = where(ix)
|
|
non_pad_bit = where(~ix)
|
|
ax.scatter(non_pad_bit, y[non_pad_bit], color='black', marker='o', s=10)
|
|
ax.scatter(pad_bit, y[pad_bit], color='grey', marker='^', s=10)
|
|
|
|
if not path.exists(arb_id_folder):
|
|
mkdir(arb_id_folder)
|
|
chdir(arb_id_folder)
|
|
|
|
# If you want transparent backgrounds, a different file format, etc. then change these settings accordingly.
|
|
savefig(hex(arb_id.id) + "." + figure_format,
|
|
bbox_iches='tight',
|
|
pad_inches=0.0,
|
|
dpi=figure_dpi,
|
|
format=figure_format,
|
|
transparent=figure_transp)
|
|
|
|
chdir("..")
|
|
|
|
plt.close(fig)
|
|
|
|
a_timer.set_plot_save_arb_id()
|
|
print("\tComplete...")
|
|
|
|
a_timer.set_plot_save_arb_id_dict()
|
|
|
|
|
|
def plot_signals_by_cluster(a_timer: PipelineTimer,
|
|
cluster_dict: dict,
|
|
signal_dict: dict,
|
|
use_j1979_tags: bool,
|
|
vehicle_number: str,
|
|
force: bool=False):
|
|
if path.exists(cluster_folder):
|
|
if force:
|
|
rmtree(cluster_folder)
|
|
else:
|
|
print("\nCluster plotting appears to have already been done and forcing is turned off. Skipping...")
|
|
return
|
|
|
|
a_timer.start_function_time()
|
|
|
|
print("\n")
|
|
for cluster_number, list_of_signals in cluster_dict.items():
|
|
print("Plotting cluster", cluster_number, "with " + str(len(list_of_signals)) + " signals.")
|
|
a_timer.start_iteration_time()
|
|
|
|
# Setup the plot
|
|
fig, axes = plt.subplots(nrows=len(list_of_signals), ncols=1, squeeze=False)
|
|
plt.suptitle("Signal Cluster " + str(cluster_number) + " from Vehicle " + vehicle_number,
|
|
weight='bold',
|
|
position=(0.5, 1))
|
|
fig.set_size_inches(8, (1 + len(list_of_signals)+1) * 1.3)
|
|
|
|
size_adjust = len(list_of_signals) / 100
|
|
# The min() statement provides whitespace for the suptitle depending on the number of subplots.
|
|
plt.tight_layout(h_pad=1, rect=(0, 0, 1, min(0.985, 0.93 + size_adjust)))
|
|
# This adjusts whitespace padding on the left and right of the subplots
|
|
fig.subplots_adjust(left=0.07, right=0.98)
|
|
|
|
# Plot the time series of each signal in the cluster
|
|
for i, signal_key in enumerate(list_of_signals):
|
|
signal = signal_dict[signal_key[0]][signal_key]
|
|
ax = axes[i, 0]
|
|
if signal.j1979_title and use_j1979_tags:
|
|
this_title = signal.plot_title + " [" + signal.j1979_title + \
|
|
" (PCC:" + str(round(signal.j1979_pcc, 2)) + ")]"
|
|
else:
|
|
this_title = signal.plot_title
|
|
ax.set_title(this_title,
|
|
style='italic',
|
|
size='medium')
|
|
ax.set_xlim([signal.time_series.first_valid_index(), signal.time_series.last_valid_index()])
|
|
ax.plot(signal.time_series, color='black')
|
|
|
|
if not path.exists(cluster_folder):
|
|
mkdir(cluster_folder)
|
|
chdir(cluster_folder)
|
|
|
|
# If you want transparent backgrounds, a different file format, etc. then change these settings accordingly.
|
|
savefig("cluster_" + str(cluster_number) + "." + figure_format,
|
|
bbox_iches='tight',
|
|
pad_inches=0.0,
|
|
dpi=figure_dpi,
|
|
format=figure_format,
|
|
transparent=figure_transp)
|
|
|
|
chdir("..")
|
|
|
|
plt.close(fig)
|
|
|
|
a_timer.set_plot_save_cluster()
|
|
print("\tComplete...")
|
|
|
|
a_timer.set_plot_save_cluster_dict()
|
|
|
|
|
|
def plot_j1979(a_timer: PipelineTimer, j1979_dict: dict, vehicle_number: str, force: bool=False):
|
|
if path.exists(j1979_folder):
|
|
if force:
|
|
rmtree(j1979_folder)
|
|
else:
|
|
print("\nJ1979 plotting appears to have already been done and forcing is turned off. Skipping...")
|
|
return
|
|
|
|
a_timer.start_function_time()
|
|
|
|
print("Plotting J1979 response data")
|
|
plot_length = len(j1979_dict.keys())
|
|
|
|
# Setup the plot
|
|
fig, axes = plt.subplots(nrows=plot_length, ncols=1, squeeze=False)
|
|
plt.suptitle("J1979 Data Collected from Vehicle " + vehicle_number,
|
|
weight='bold',
|
|
position=(0.5, 1))
|
|
fig.set_size_inches(8, (1 + plot_length) * 1.3)
|
|
|
|
size_adjust = plot_length / 100
|
|
# The min() statement provides whitespace for the suptitle depending on the number of subplots.
|
|
plt.tight_layout(h_pad=1, rect=(0, 0, 1, min(0.985, 0.93 + size_adjust)))
|
|
# This adjusts whitespace padding on the left and right of the subplots
|
|
fig.subplots_adjust(left=0.07, right=0.98)
|
|
|
|
# Plot the time series of each signal in the cluster
|
|
for i, (pid, data) in enumerate(j1979_dict.items()):
|
|
a_timer.start_iteration_time()
|
|
ax = axes[i, 0]
|
|
ax.set_title("PID " + str(hex(pid)) + ": " + data.title,
|
|
style='italic',
|
|
size='medium')
|
|
ax.set_xlim([data.data.first_valid_index(), data.data.last_valid_index()])
|
|
ax.plot(data.data, color='black')
|
|
a_timer.set_plot_save_j1979_pid()
|
|
|
|
if not path.exists(j1979_folder):
|
|
mkdir(j1979_folder)
|
|
chdir(j1979_folder)
|
|
|
|
# If you want transparent backgrounds, a different file format, etc. then change these settings accordingly.
|
|
savefig("j1979." + figure_format,
|
|
bbox_iches='tight',
|
|
pad_inches=0.0,
|
|
dpi=figure_dpi,
|
|
format=figure_format,
|
|
transparent=figure_transp)
|
|
|
|
chdir("..")
|
|
plt.close(fig)
|
|
|
|
a_timer.set_plot_save_j1979_dict()
|
|
print("\tComplete...")
|
|
|
|
|
|
def plot_sample_threshold_heatmap(sample):
|
|
this_figure_name = "alignment_scores_" + sample.output_vehicle_dir + "." + figure_format
|
|
|
|
if path.exists(threshold_folder):
|
|
chdir(threshold_folder)
|
|
if path.isfile(this_figure_name):
|
|
if sample.force_threshold_plot:
|
|
remove(this_figure_name)
|
|
else:
|
|
print("\nThreshold heatmap plotting for " + sample.output_vehicle_dir + " already complete.")
|
|
print("\tHeatmap plot forcing is turned off. Skipping...")
|
|
chdir("..")
|
|
return
|
|
chdir("..")
|
|
|
|
print("\tPlotting threshold parameter-Alignment Score heatmap for " + sample.output_vehicle_dir)
|
|
if not path.exists(threshold_folder):
|
|
mkdir(threshold_folder)
|
|
chdir(threshold_folder)
|
|
|
|
fig, ax = plt.subplots()
|
|
halfway_mark = int(round(sample.avg_score_matrix.shape[0]/2, 0))
|
|
sample.avg_score_matrix = sample.avg_score_matrix[0:halfway_mark, 0:halfway_mark].astype(float)
|
|
sample.validator.set_lex_threshold_parameters(sample)
|
|
im = ax.imshow(sample.avg_score_matrix, cmap='inferno', interpolation='none', vmin=0.0, vmax=1.0)
|
|
# im = ax.imshow(sample.avg_score_matrix, cmap='Greys', interpolation='nearest')
|
|
# ax.set_xticks(arange(sample.avg_score_matrix.shape[1]))
|
|
# ax.set_yticks(arange(sample.avg_score_matrix.shape[0]))
|
|
cbar = ax.figure.colorbar(im, ax=ax)
|
|
cbar.ax.set_ylabel("Alignment Score", rotation=-90, va="bottom")
|
|
ax.set_title("Average Alignment Score as a Function of \n Lexical Analysis Threshold Parameters for " +
|
|
sample.year + " " + sample.make + " " + sample.model + "\nInversion: " +
|
|
str(round(sample.optimal_bit_dist, 2)) + " Merge: " + str(round(sample.optimal_merge_dist, 2)) +
|
|
" Score: " +
|
|
str(round(sample.avg_score_matrix[sample.optimal_bit_dist, sample.optimal_merge_dist], 2)))
|
|
fig.tight_layout()
|
|
|
|
# If you want transparent backgrounds, a different file format, etc. then change these settings accordingly.
|
|
savefig(this_figure_name,
|
|
bbox_iches='tight',
|
|
pad_inches=0.0,
|
|
dpi=figure_dpi,
|
|
format=figure_format,
|
|
transparent=figure_transp)
|
|
|
|
plt.close()
|
|
chdir("..")
|
|
print("\t\tComplete...")
|
|
|
|
|
|
def plot_dendrogram(a_timer: PipelineTimer,
|
|
linkage_matrix,
|
|
threshold: float,
|
|
vehicle_number: str,
|
|
force: bool = False):
|
|
dendrogram_filename = "dendrogram_" + vehicle_number + "." + figure_format
|
|
if path.isfile(dendrogram_filename):
|
|
if force:
|
|
remove(dendrogram_filename)
|
|
else:
|
|
print("Dendrogram already plotted. Skipping...")
|
|
return
|
|
plt.figure(figsize=(7, 7), dpi=600)
|
|
R: dict = dendrogram(Z=linkage_matrix, orientation='top', distance_sort='ascending', no_labels=True)
|
|
plt.title("Dendrogram of Agglomerative Clustering for Vehicle " + vehicle_number)
|
|
plt.xlabel("Signals Observed")
|
|
plt.ylabel("Single Linkage Cluster Merge Distance")
|
|
xmin, xmax = plt.xlim()
|
|
# Add a 25% opacity dashed black line to the entropy gradient plot at one boundary of each sub-flow
|
|
plt.hlines(y=threshold, xmin=xmin, xmax=xmax, alpha=0.25, colors='black', linestyle='dashed',
|
|
label='cluster threshold')
|
|
plt.legend(loc='upper right')
|
|
|
|
print("\tPlotting dendrogram and saving to " + dendrogram_filename)
|
|
|
|
savefig(dendrogram_filename,
|
|
bbox_iches='tight',
|
|
pad_inches=0.0,
|
|
dpi=600,
|
|
format=figure_format,
|
|
transparent=figure_transp)
|
|
plt.close()
|
|
print("\t\tComplete...")
|