Upgraded simulations using numba
This commit is contained in:
parent
aba3257de6
commit
92c30cdf65
|
@ -6,7 +6,7 @@ Created on Tue Dec 20 2022
|
||||||
@author: Emilio Martínez <emilio.martinez@fing.edu.uy>
|
@author: Emilio Martínez <emilio.martinez@fing.edu.uy>
|
||||||
|
|
||||||
Script that reads all images in folder and simulates HDMI tempest capture
|
Script that reads all images in folder and simulates HDMI tempest capture
|
||||||
if there is no subfolder with it's name.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
#%%
|
#%%
|
||||||
|
@ -20,7 +20,7 @@ import numpy as np
|
||||||
from skimage.io import imread
|
from skimage.io import imread
|
||||||
from scipy import signal
|
from scipy import signal
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
from utils.DTutils import TMDS_encoding, TMDS_serial
|
from utils.DTutils import TMDS_encoding_original, TMDS_serial
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
#%%
|
#%%
|
||||||
|
@ -39,14 +39,14 @@ def get_subfolders_names_from_folder(folder):
|
||||||
def image_transmition_simulation(I, blanking=False):
|
def image_transmition_simulation(I, blanking=False):
|
||||||
|
|
||||||
# Encode image for TMDS
|
# Encode image for TMDS
|
||||||
I_TMDS = TMDS_encoding (I, blanking = blanking)
|
I_TMDS = TMDS_encoding_original (I, blanking = blanking)
|
||||||
|
|
||||||
# Serialize pixel bits and sum channel signals
|
# Serialize pixel bits and sum channel signals
|
||||||
I_TMDS_Tx = TMDS_serial(I_TMDS)
|
I_TMDS_Tx = TMDS_serial(I_TMDS)
|
||||||
|
|
||||||
return I_TMDS_Tx, I_TMDS.shape
|
return I_TMDS_Tx, I_TMDS.shape
|
||||||
|
|
||||||
def image_capture_simulation(I_Tx, h_total, v_total, N_harmonic, noise_std,
|
def image_capture_simulation(I_Tx, h_total, v_total, N_harmonic, noise_std=0,
|
||||||
fps=60):
|
fps=60):
|
||||||
|
|
||||||
# Compute pixelrate and bitrate
|
# Compute pixelrate and bitrate
|
||||||
|
@ -54,7 +54,7 @@ def image_capture_simulation(I_Tx, h_total, v_total, N_harmonic, noise_std,
|
||||||
bit_rate = 10*px_rate
|
bit_rate = 10*px_rate
|
||||||
|
|
||||||
# Continuous samples (interpolate)
|
# Continuous samples (interpolate)
|
||||||
interpolator = 1
|
interpolator = int(np.ceil(N_harmonic/5)) # Condition for sampling rate and
|
||||||
sample_rate = interpolator*bit_rate
|
sample_rate = interpolator*bit_rate
|
||||||
Nsamples = interpolator*(10*h_total*v_total)
|
Nsamples = interpolator*(10*h_total*v_total)
|
||||||
if interpolator > 1:
|
if interpolator > 1:
|
||||||
|
@ -115,67 +115,40 @@ def main():
|
||||||
foldername = sys.argv[-1]
|
foldername = sys.argv[-1]
|
||||||
|
|
||||||
# Get images and subfolders names
|
# Get images and subfolders names
|
||||||
images_tmp = get_images_names_from_folder(foldername)
|
images = get_images_names_from_folder(foldername)
|
||||||
old_subfolders = get_subfolders_names_from_folder(foldername)
|
|
||||||
|
|
||||||
# Keep images without dedicated folders only
|
simulations_folder = foldername+'/simulations/'
|
||||||
images = []
|
|
||||||
new_subfolders = []
|
|
||||||
for image in images_tmp:
|
|
||||||
image_name = image.split('.')[0]
|
|
||||||
if image_name not in old_subfolders:
|
|
||||||
images.append(image)
|
|
||||||
new_subfolders.append(image_name)
|
|
||||||
|
|
||||||
|
os.mkdir(simulations_folder)
|
||||||
|
|
||||||
for image, subfolder in zip(images,new_subfolders):
|
# timestamp for simulation starting
|
||||||
|
t1_image = time.time()
|
||||||
|
|
||||||
|
for image in images:
|
||||||
# Create new directory for simulations
|
|
||||||
subfolder_path = foldername+'/'+subfolder
|
|
||||||
os.mkdir(subfolder_path)
|
|
||||||
|
|
||||||
# timestamp for simulation starting
|
|
||||||
t1_image = time.time()
|
|
||||||
|
|
||||||
# Read image
|
# Read image
|
||||||
image_path = foldername+'/'+image
|
image_path = foldername+'/'+image
|
||||||
imagename = image.split('.')[0]
|
|
||||||
I = imread(image_path)
|
I = imread(image_path)
|
||||||
|
|
||||||
# TMDS coding and bit serialization
|
# TMDS coding and bit serialization
|
||||||
I_Tx, resolution = image_transmition_simulation(I)
|
I_Tx, resolution = image_transmition_simulation(I)
|
||||||
v_res, h_res, _ = resolution
|
v_res, h_res, _ = resolution
|
||||||
|
|
||||||
# Possible std dev noise simulation values
|
# Choose random pixelrate harmonic number
|
||||||
noise_stds = np.array([ 0, 5, 10, 15, 20, 25, 40, 50])
|
N_harmonic = np.random.randint(1,10)
|
||||||
|
|
||||||
|
I_capture = image_capture_simulation(I_Tx, h_res, v_res, N_harmonic)
|
||||||
|
|
||||||
# Make five simulations for each image
|
path = simulations_folder+image
|
||||||
for i in range(5):
|
|
||||||
|
|
||||||
# Choose random pixelrate harmonic number
|
save_simulation_image(I_capture,path)
|
||||||
N_harmonic = np.random.randint(1,10)
|
|
||||||
|
|
||||||
# Choose random SNR value (SNR=0 for no noise)
|
# timestamp for simulation ending
|
||||||
noise_std = np.random.choice(noise_stds)
|
t2_image = time.time()
|
||||||
|
|
||||||
I_capture = image_capture_simulation(I_Tx, h_res, v_res, N_harmonic, noise_std)
|
t_images = t2_images-t1_images
|
||||||
|
|
||||||
path = subfolder_path+'/'+imagename+'_'+str(N_harmonic)+'harm_'+str(noise_std)+"std.png"
|
print('\nTiempo total de las '+str(len(images))+' simulaciones:','{:.2f}'.format(t_images)+'s\n')
|
||||||
|
|
||||||
save_simulation_image(I_capture,path)
|
|
||||||
|
|
||||||
if i==0:
|
|
||||||
# timestamp for simulation ending
|
|
||||||
t2_image = time.time()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
t_image = t2_image-t1_image
|
|
||||||
|
|
||||||
print('Tiempo de la primer simulación de '+image+':','{:.2f}'.format(t_image)+'s\n')
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
|
@ -1,7 +1,6 @@
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from matplotlib import pyplot as plt
|
|
||||||
from scipy import signal
|
from scipy import signal
|
||||||
from scipy.fft import fft, ifft, fftshift
|
from numba import jit, uint8, int8, prange
|
||||||
|
|
||||||
def autocorr(x):
|
def autocorr(x):
|
||||||
"""Compute autocorrelation function of 1-D array
|
"""Compute autocorrelation function of 1-D array
|
||||||
|
@ -50,6 +49,108 @@ def binarray_to_uint(binarray):
|
||||||
|
|
||||||
return num
|
return num
|
||||||
|
|
||||||
|
def TMDS_pixel_rare (pix):
|
||||||
|
"""8bit pixel TMDS coding
|
||||||
|
|
||||||
|
Inputs:
|
||||||
|
- pix: 8-bit pixel
|
||||||
|
- cnt: 0's and 1's balance. Default in 0 (balanced)
|
||||||
|
|
||||||
|
Outputs:
|
||||||
|
- pix_out: TDMS coded 16-bit pixel (only 10 useful)
|
||||||
|
- cnt: 0's and 1's balance updated with new pixel coding
|
||||||
|
|
||||||
|
"""
|
||||||
|
# Convert 8-bit pixel to binary list D
|
||||||
|
D = uint8_to_binarray(pix)
|
||||||
|
|
||||||
|
# Initialize output q
|
||||||
|
qm = [D[0]]
|
||||||
|
|
||||||
|
# 1's unbalanced condition at current pixel
|
||||||
|
N1_D = np.sum(D)
|
||||||
|
|
||||||
|
if N1_D>4 or (N1_D==4 and not(D[0])):
|
||||||
|
|
||||||
|
# XNOR of consecutive bits
|
||||||
|
for k in range(1,8):
|
||||||
|
qm.append( not(qm[k-1] ^ D[k]) )
|
||||||
|
qm.append(0)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# XOR of consecutive bits
|
||||||
|
for k in range(1,8):
|
||||||
|
qm.append( qm[k-1] ^ D[k] )
|
||||||
|
qm.append(1)
|
||||||
|
|
||||||
|
qm.append(np.random.choice([0,1]))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# Return the TMDS coded pixel as uint and 0's y 1's balance
|
||||||
|
return binarray_to_uint(qm)
|
||||||
|
|
||||||
|
@jit(nopython=True)
|
||||||
|
def TMDS_pixel_numba(pix:uint8, cnt:int8)->tuple:
|
||||||
|
|
||||||
|
D = np.zeros(8, dtype=np.uint8)
|
||||||
|
for i in range(8):
|
||||||
|
D[i] = (pix >> i) & 1
|
||||||
|
|
||||||
|
qm = np.zeros(9, dtype=np.uint8)
|
||||||
|
qm[0] = D[0]
|
||||||
|
|
||||||
|
N1_D = np.sum(D)
|
||||||
|
|
||||||
|
if N1_D > 4 or (N1_D == 4 and not D[0]):
|
||||||
|
for k in range(1, 8):
|
||||||
|
qm[k] = not (qm[k-1] ^ D[k])
|
||||||
|
qm[8] = 0
|
||||||
|
else:
|
||||||
|
for k in range(1, 8):
|
||||||
|
qm[k] = qm[k-1] ^ D[k]
|
||||||
|
qm[8] = 1
|
||||||
|
|
||||||
|
qout = np.zeros(10, dtype=np.uint8)
|
||||||
|
N1_qm = np.sum(qm[:8])
|
||||||
|
N0_qm = 8 - N1_qm
|
||||||
|
|
||||||
|
if cnt == 0 or N1_qm == 4:
|
||||||
|
|
||||||
|
qout[9] = not(qm[8])
|
||||||
|
qout[8] = qm[8]
|
||||||
|
if qm[8]:
|
||||||
|
qout[:8] = qm[:8]
|
||||||
|
else:
|
||||||
|
qout[:8] = np.logical_not(qm[:8])
|
||||||
|
|
||||||
|
if not qm[8]:
|
||||||
|
cnt += N0_qm - N1_qm
|
||||||
|
else:
|
||||||
|
cnt += N1_qm - N0_qm
|
||||||
|
|
||||||
|
else:
|
||||||
|
|
||||||
|
if (cnt > 0 and N1_qm > 4) or (cnt < 0 and N1_qm < 4):
|
||||||
|
qout[9] = 1
|
||||||
|
qout[8] = qm[8]
|
||||||
|
qout[:8] = np.logical_not(qm[:8])
|
||||||
|
cnt += 2*qm[8] + N0_qm - N1_qm
|
||||||
|
else:
|
||||||
|
qout[9] = 0
|
||||||
|
qout[8] = qm[8]
|
||||||
|
qout[:8] = qm[:8]
|
||||||
|
cnt += -2*(not(qm[8])) + N1_qm - N0_qm
|
||||||
|
|
||||||
|
# Convert binary array to unsigned int
|
||||||
|
pix_tmds = 0
|
||||||
|
for bit in qout[::-1]:
|
||||||
|
pix_tmds = (pix_tmds << 1) | bit
|
||||||
|
|
||||||
|
# Return the TMDS coded pixel as uint and 0's y 1's balance
|
||||||
|
return pix_tmds, cnt
|
||||||
|
|
||||||
|
|
||||||
def TMDS_pixel (pix,cnt=0):
|
def TMDS_pixel (pix,cnt=0):
|
||||||
"""8bit pixel TMDS coding
|
"""8bit pixel TMDS coding
|
||||||
|
|
||||||
|
@ -118,6 +219,7 @@ def TMDS_pixel (pix,cnt=0):
|
||||||
# Return the TMDS coded pixel as uint and 0's y 1's balance
|
# Return the TMDS coded pixel as uint and 0's y 1's balance
|
||||||
return binarray_to_uint(qout), cnt
|
return binarray_to_uint(qout), cnt
|
||||||
|
|
||||||
|
@jit(parallel=True)
|
||||||
def TMDS_encoding_original (I, blanking = False):
|
def TMDS_encoding_original (I, blanking = False):
|
||||||
"""TMDS image coding
|
"""TMDS image coding
|
||||||
|
|
||||||
|
@ -132,9 +234,10 @@ def TMDS_encoding_original (I, blanking = False):
|
||||||
|
|
||||||
# Create "ghost dimension" if I is gray-scale image (not RGB)
|
# Create "ghost dimension" if I is gray-scale image (not RGB)
|
||||||
if len(I.shape)!= 3:
|
if len(I.shape)!= 3:
|
||||||
I = np.repeat(I[:, :, np.newaxis], 3, axis=2).astype('uint8')
|
I = I[:, :, np.newaxis]
|
||||||
|
chs = 1
|
||||||
chs = 3
|
else:
|
||||||
|
chs = 3
|
||||||
|
|
||||||
# Get image resolution
|
# Get image resolution
|
||||||
v_in, h_in = I.shape[:2]
|
v_in, h_in = I.shape[:2]
|
||||||
|
@ -158,13 +261,13 @@ def TMDS_encoding_original (I, blanking = False):
|
||||||
I_c = np.zeros((v_in,h_in,chs)).astype('uint16')
|
I_c = np.zeros((v_in,h_in,chs)).astype('uint16')
|
||||||
|
|
||||||
# Iterate over channels and pixels
|
# Iterate over channels and pixels
|
||||||
for c in range(chs):
|
for c in prange(chs):
|
||||||
for i in range(v_in):
|
for i in range(v_in):
|
||||||
cnt=[0,0,0]
|
cnt=[0,0,0]
|
||||||
for j in range(h_in):
|
for j in range(h_in):
|
||||||
# Get pixel and code it TMDS between blanking
|
# Get pixel and code it TMDS between blanking
|
||||||
pix = I[i,j,c]
|
pix = I[i,j,c]
|
||||||
I_c[i + v_diff//2 , j + h_diff//2, c], cnt[c] = TMDS_pixel (pix,cnt[c])
|
I_c[i + v_diff//2 , j + h_diff//2, c], cnt[c] = TMDS_pixel_numba (pix,cnt[c])
|
||||||
|
|
||||||
return I_c
|
return I_c
|
||||||
|
|
||||||
|
@ -242,6 +345,7 @@ def TMDS_pixel_cntdiff (pix,cnt=0):
|
||||||
byte_range = np.arange(256)
|
byte_range = np.arange(256)
|
||||||
# Initialize pixel coding and cnt-difference arrays
|
# Initialize pixel coding and cnt-difference arrays
|
||||||
TMDS_pix_table = np.zeros((256,3),dtype='uint16')
|
TMDS_pix_table = np.zeros((256,3),dtype='uint16')
|
||||||
|
TMDS_rare_pix_table = np.zeros((256),dtype='uint16')
|
||||||
TMDS_cntdiff_table = np.zeros((256,3),dtype='int8')
|
TMDS_cntdiff_table = np.zeros((256,3),dtype='int8')
|
||||||
|
|
||||||
for byte in byte_range:
|
for byte in byte_range:
|
||||||
|
@ -253,6 +357,8 @@ for byte in byte_range:
|
||||||
TMDS_cntdiff_table[byte,1] = p_null[1]
|
TMDS_cntdiff_table[byte,1] = p_null[1]
|
||||||
TMDS_cntdiff_table[byte,2] = p1[1]
|
TMDS_cntdiff_table[byte,2] = p1[1]
|
||||||
|
|
||||||
|
TMDS_rare_pix_table[byte] = TMDS_pixel_rare(byte)
|
||||||
|
|
||||||
def pixel_fastencoding(pix,cnt_prev=0):
|
def pixel_fastencoding(pix,cnt_prev=0):
|
||||||
"""8bit pixel TMDS fast coding
|
"""8bit pixel TMDS fast coding
|
||||||
|
|
||||||
|
@ -314,11 +420,9 @@ def TMDS_encoding (I, blanking = False):
|
||||||
|
|
||||||
# Create "ghost dimension" if I is gray-scale image (not RGB)
|
# Create "ghost dimension" if I is gray-scale image (not RGB)
|
||||||
if len(I.shape)!= 3:
|
if len(I.shape)!= 3:
|
||||||
# Gray-scale image
|
I = I[:, :, np.newaxis]
|
||||||
I = np.repeat(I[:, :, np.newaxis], 3, axis=2).astype('uint8')
|
|
||||||
chs = 1
|
chs = 1
|
||||||
else:
|
else:
|
||||||
# RGB image
|
|
||||||
chs = 3
|
chs = 3
|
||||||
|
|
||||||
# Get image resolution
|
# Get image resolution
|
||||||
|
@ -343,7 +447,8 @@ def TMDS_encoding (I, blanking = False):
|
||||||
# Assuming the blanking corresponds to 10bit number
|
# Assuming the blanking corresponds to 10bit number
|
||||||
# [0, 0, 1, 0, 1, 0, 1, 0, 1, 1] (LSB first) for channels R and G
|
# [0, 0, 1, 0, 1, 0, 1, 0, 1, 1] (LSB first) for channels R and G
|
||||||
I_c = 852*np.ones((v,h,chs)).astype('uint16')
|
I_c = 852*np.ones((v,h,chs)).astype('uint16')
|
||||||
I_c[:,:,2] = TMDS_blanking(h_total=h, v_total=v, h_active=h_in, v_active=v_in,
|
if chs==3:
|
||||||
|
I_c[:,:,2] = TMDS_blanking(h_total=h, v_total=v, h_active=h_in, v_active=v_in,
|
||||||
h_front_porch=h_front_porch, v_front_porch=v_front_porch, h_back_porch=h_back_porch, v_back_porch=v_back_porch)
|
h_front_porch=h_front_porch, v_front_porch=v_front_porch, h_back_porch=h_back_porch, v_back_porch=v_back_porch)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -362,6 +467,61 @@ def TMDS_encoding (I, blanking = False):
|
||||||
|
|
||||||
return I_c
|
return I_c
|
||||||
|
|
||||||
|
def TMDS_encoding_rare (I, blanking = False):
|
||||||
|
"""TMDS image coding
|
||||||
|
|
||||||
|
Inputs:
|
||||||
|
- I: 2-D image array
|
||||||
|
- blanking: Boolean that specifies if horizontal and vertical blanking is applied
|
||||||
|
|
||||||
|
Output:
|
||||||
|
- I_c: TDMS coded 16-bit image (only 10 useful)
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create "ghost dimension" if I is gray-scale image (not RGB)
|
||||||
|
if len(I.shape)!= 3:
|
||||||
|
I = I[:, :, np.newaxis]
|
||||||
|
chs = 1
|
||||||
|
else:
|
||||||
|
chs = 3
|
||||||
|
|
||||||
|
# Get image resolution
|
||||||
|
v_in, h_in = I.shape[:2]
|
||||||
|
|
||||||
|
# Get image resolution
|
||||||
|
v_in, h_in = I.shape[:2]
|
||||||
|
|
||||||
|
if blanking:
|
||||||
|
# Get blanking resolution for input image
|
||||||
|
|
||||||
|
v = (v_in==1080)*1125 + (v_in==720)*750 + (v_in==600)*628 + (v_in==480)*525
|
||||||
|
h = (h_in==1920)*2200 + (h_in==1280)*1650 + (h_in==800)*1056 + (h_in==640)*800
|
||||||
|
|
||||||
|
vdiff = v - v_in
|
||||||
|
hdiff = h - h_in
|
||||||
|
|
||||||
|
# Create image with blanking and change type to uint16
|
||||||
|
# Assuming the blanking corresponds to 10bit number [0, 0, 1, 0, 1, 0, 1, 0, 1, 1] (LSB first)
|
||||||
|
|
||||||
|
I_c = 852*np.ones((v,h,chs)).astype('uint16')
|
||||||
|
|
||||||
|
else:
|
||||||
|
v_diff = 0
|
||||||
|
h_diff = 0
|
||||||
|
I_c = I.copy()
|
||||||
|
I_c = I_c.astype('uint16')
|
||||||
|
|
||||||
|
# Iterate over channels and pixels
|
||||||
|
for c in range(chs):
|
||||||
|
for i in range(v_in):
|
||||||
|
for j in range(h_in):
|
||||||
|
# Get pixel and code it TMDS between blanking
|
||||||
|
pix = I[i,j,c]
|
||||||
|
I_c[i + v_diff//2 , j + h_diff//2, c] = TMDS_rare_pix_table[pix]
|
||||||
|
|
||||||
|
return I_c
|
||||||
|
|
||||||
def DecTMDS_pixel (pix):
|
def DecTMDS_pixel (pix):
|
||||||
"""10-bit pixel TMDS decoding
|
"""10-bit pixel TMDS decoding
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue