WIP: added simulation scripts (GNURadio3.8) and modifying drunet training pre-processing

This commit is contained in:
Emilio Martínez 2023-03-20 23:00:33 -03:00
parent b81bfba5bb
commit 1629330448
3 changed files with 924 additions and 6 deletions

View File

@ -49,21 +49,33 @@ class DatasetFFDNet(data.Dataset):
# ---------------------------------
# randomly crop the patch
# ---------------------------------
rnd_h = random.randint(0, max(0, H - self.patch_size))
rnd_w = random.randint(0, max(0, W - self.patch_size))
patch_H = img_H[rnd_h:rnd_h + self.patch_size, rnd_w:rnd_w + self.patch_size, :]
# # Comment because using one only channel to train as ground-truth
# patch_H = img_H[rnd_h:rnd_h + self.patch_size, rnd_w:rnd_w + self.patch_size, :]
# Ground-truth as channels mean
patch_H = np.mean(img_H[rnd_h:rnd_h + self.patch_size, rnd_w:rnd_w + self.patch_size, :],axis=2)
# Tempest attack simulation
N_harmonic = random.randint(1, 9)
img_L = image_simulation(H_path, N_harmonic, noise_std=0, blanking=False)
patch_L = img_L[rnd_h:rnd_h + self.patch_size, rnd_w:rnd_w + self.patch_size, :]
# # Commented augmentation with rotating because of TMDS encoding
# ---------------------------------
# augmentation - flip, rotate
# ---------------------------------
mode = random.randint(0, 7)
patch_H = util.augment_img(patch_H, mode=mode)
# mode = random.randint(0, 7)
# patch_L = util.augment_img(patch_H, mode=mode)
# ---------------------------------
# HWC to CHW, numpy(uint) to tensor
# ---------------------------------
img_H = util.uint2tensor3(patch_H)
img_L = img_H.clone()
img_L = util.uint2tensor3(patch_L)
# ---------------------------------
# get noise level

465
KAIR/utils/DTutils.py Normal file
View File

@ -0,0 +1,465 @@
import numpy as np
from matplotlib import pyplot as plt
from scipy import signal
from scipy.fft import fft, ifft, fftshift
def autocorr(x):
"""Compute autocorrelation function of 1-D array
Input:
x: 1-D array
Output:
autocorr: autocorrelation function of x
"""
# Use FFT method, which has more computing efectiveness for 1-D numpy arrays
autocorr = signal.correlate(x,x,mode='full', method= 'fft')
# Fix some shifts due FFT
half_idx =int(autocorr.size/2)
max_ind = np.argmax(autocorr[half_idx:])+half_idx
autocorr = autocorr[max_ind:]
# Normalise output
return autocorr/autocorr[0]
def uint8_to_binarray(integer):
"""Convert integer into fixed-length 8-bit binary array. LSB in [0].
Extended and modified code from https://github.com/projf/display_controller/blob/master/model/tmds.py
"""
b_array = [int(i) for i in reversed(bin(integer)[2:])]
b_array += [0]*(8-len(b_array))
return b_array
def uint16_to_binarray(integer):
"""Convert integer into fixed-length 16-bit binary array. LSB in [0].
Extended and modified code from https://github.com/projf/display_controller/blob/master/model/tmds.py
"""
b_array = [int(i) for i in reversed(bin(integer)[2:])]
b_array += [0]*(16-len(b_array))
return b_array
def binarray_to_uint(binarray):
array = binarray[::-1]
num = array[0]
for n in range(1,len(binarray)):
num = (num << 1) + array[n]
return num
def TMDS_pixel (pix,cnt=0):
"""8bit pixel TMDS coding
Inputs:
- pix: 8-bit pixel
- cnt: 0's and 1's balance. Default in 0 (balanced)
Outputs:
- pix_out: TDMS coded 16-bit pixel (only 10 useful)
- cnt: 0's and 1's balance updated with new pixel coding
"""
# Convert 8-bit pixel to binary list D
D = uint8_to_binarray(pix)
# Initialize output q
qm = [D[0]]
# 1's unbalanced condition at current pixel
N1_D = np.sum(D)
if N1_D>4 or (N1_D==4 and not(D[0])):
# XNOR of consecutive bits
for k in range(1,8):
qm.append( not(qm[k-1] ^ D[k]) )
qm.append(0)
else:
# XOR of consecutive bits
for k in range(1,8):
qm.append( qm[k-1] ^ D[k] )
qm.append(1)
# Initialize output qout
qout = qm.copy()
# Unbalanced condition with previous and current pixels
N1_qm = np.sum(qm[:8])
N0_qm = 8 - N1_qm
if cnt==0 or N1_qm==4:
qout.append(not(qm[8]))
qout[8] = qm[8]
qout[:8]=qm[:8] if qm[8] else np.logical_not(qm[:8])
if not(qm[8]):
cnt += N0_qm - N1_qm
else:
cnt += N1_qm - N0_qm
else:
if (cnt>0 and N1_qm>4) or (cnt<0 and N1_qm<4):
qout.append(1)
qout[8] = qm[8]
qout[:8] = np.logical_not(qm[:8])
cnt += 2*qm[8] + N0_qm - N1_qm
else:
qout.append(0)
qout[8] = qm[8]
qout[:8] = qm[:8]
cnt += -2*(not(qm[8])) + N1_qm - N0_qm
# Return the TMDS coded pixel as uint and 0's y 1's balance
return binarray_to_uint(qout), cnt
def TMDS_encoding_original (I, blanking = False):
"""TMDS image coding
Inputs:
- I: 2-D image array
- blanking: Boolean that specifies if horizontal and vertical blanking is applied
Output:
- I_c: TDMS coded 16-bit image (only 10 useful)
"""
# Create "ghost dimension" if I is gray-scale image (not RGB)
if len(I.shape)!= 3:
I = np.repeat(I[:, :, np.newaxis], 3, axis=2).astype('uint8')
chs = 3
# Get image resolution
v_in, h_in = I.shape[:2]
if blanking:
# Get blanking resolution for input image
v = (v_in==1080)*1125 + (v_in==720)*750 + (v_in==600)*628 + (v_in==480)*525
h = (h_in==1920)*2200 + (h_in==1280)*1650 + (h_in==800)*1056 + (h_in==640)*800
vdiff = v - v_in
hdiff = h - h_in
# Create image with blanking and change type to uint16
# Assuming the blanking corresponds to 10bit number [0, 0, 1, 0, 1, 0, 1, 0, 1, 1] (LSB first)
I_c = 852*np.ones((v,h,chs)).astype('uint16')
else:
v_diff = 0
h_diff = 0
I_c = np.zeros((v_in,h_in,chs)).astype('uint16')
# Iterate over channels and pixels
for c in range(chs):
for i in range(v_in):
cnt=[0,0,0]
for j in range(h_in):
# Get pixel and code it TMDS between blanking
pix = I[i,j,c]
I_c[i + v_diff//2 , j + h_diff//2, c], cnt[c] = TMDS_pixel (pix,cnt[c])
return I_c
def TMDS_pixel_cntdiff (pix,cnt=0):
"""8bit pixel TMDS coding
Inputs:
- pix: 8-bit pixel
- cnt: 0's and 1's balance. Default in 0 (balanced)
Outputs:
- pix_out: TDMS coded 16-bit pixel (only 10 useful)
- cntdiff: balance difference given by the actual coded pixel
"""
# Convert 8-bit pixel to binary list D
D = uint8_to_binarray(pix)
# Initialize output q
qm = [D[0]]
# 1's unbalanced condition at current pixelo
N1_D = np.sum(D)
if N1_D>4 or (N1_D==4 and not(D[0])):
# XNOR of consecutive bits
for k in range(1,8):
qm.append( not(qm[k-1] ^ D[k]) )
qm.append(0)
else:
# XOR of consecutive bits
for k in range(1,8):
qm.append( qm[k-1] ^ D[k] )
qm.append(1)
# Initialize output qout
qout = qm.copy()
# Unbalanced condition with previous and current pixels
N1_qm = np.sum(qm[:8])
N0_qm = 8 - N1_qm
if cnt==0 or N1_qm==4:
qout.append(not(qm[8]))
qout[8]=qm[8]
qout[:8]=qm[:8] if qm[8] else [not(val) for val in qm[:8]]
if not(qm[8]):
cnt_diff = N0_qm - N1_qm
else:
cnt_diff = N1_qm - N0_qm
else:
if (cnt>0 and N1_qm>4) or (cnt<0 and N1_qm<4):
qout.append(1)
qout[8]=qm[8]
qout[:8] = [not(val) for val in qm[:8]]
cnt_diff = 2*qm[8] +N0_qm -N1_qm
else:
qout.append(0)
qout[8]=qm[8]
qout[:8] = qm[:8]
cnt_diff = -2*(not(qm[8])) + N1_qm - N0_qm
# Return the TMDS coded pixel as uint and 0's y 1's balance difference
uint_out = binarray_to_uint(qout)
return uint_out, cnt_diff
### Create TMDS LookUp Tables for fast encoding (3 times faster than the other implementation)
byte_range = np.arange(256)
# Initialize pixel coding and cnt-difference arrays
TMDS_pix_table = np.zeros((256,3),dtype='uint16')
TMDS_cntdiff_table = np.zeros((256,3),dtype='int8')
for byte in byte_range:
p0,p_null, p1 = TMDS_pixel_cntdiff(byte,-1),TMDS_pixel_cntdiff(byte,0),TMDS_pixel_cntdiff(byte,1) # 0's and 1's unbalance respect.
TMDS_pix_table[byte,0] = p0[0]
TMDS_pix_table[byte,1] = p_null[0]
TMDS_pix_table[byte,2] = p1[0]
TMDS_cntdiff_table[byte,0] = p0[1]
TMDS_cntdiff_table[byte,1] = p_null[1]
TMDS_cntdiff_table[byte,2] = p1[1]
def pixel_fastencoding(pix,cnt_prev=0):
"""8bit pixel TMDS fast coding
Inputs:
- pix: 8-bit pixel
- cnt: 0's and 1's balance. Default in 0 (balanced)
Outputs:
- pix_out: TDMS coded 16-bit pixel (only 10 useful)
- cnt: 0's and 1's balance updated with new pixel coding
"""
balance_idx = int(np.sign(cnt_prev))+1
pix_out = TMDS_pix_table[pix,balance_idx]
cnt = cnt_prev + TMDS_cntdiff_table[pix,balance_idx]
return pix_out, cnt
def TMDS_blanking (h_total, v_total, h_active, v_active, h_front_porch, v_front_porch, h_back_porch, v_back_porch):
# Initialize blanking image
img_blank = np.zeros((v_total,h_total))
# Get the total blanking on vertical an horizontal axis
h_blank = h_total - h_active
v_blank = v_total - v_active
# (C1,C0)=(0,0) region
img_blank[:v_front_porch,:h_front_porch] = 0b1101010100
img_blank[:v_front_porch,h_blank-h_back_porch:] = 0b1101010100
img_blank[v_blank-v_back_porch:v_blank,:h_front_porch] = 0b1101010100
img_blank[v_blank-v_back_porch:v_blank,h_blank-h_back_porch:] = 0b1101010100
img_blank[v_blank:,:h_blank] = 0b1101010100
# (C1,C0)=(0,1) region
img_blank[:v_front_porch,h_front_porch:h_blank-h_back_porch] = 0b0010101011
img_blank[v_blank-v_back_porch:,h_front_porch:h_blank-h_back_porch] = 0b0010101011
# (C1,C0)=(1,0) region
img_blank[v_front_porch:v_blank-v_back_porch,:h_front_porch] = 0b0101010100
img_blank[v_front_porch:v_blank-v_back_porch,h_blank-h_back_porch:] = 0b0101010100
# (C1,C0)=(1,1) region
img_blank[v_front_porch:v_blank-v_back_porch,v_front_porch:h_blank-h_back_porch] = 0b1010101011
return(img_blank)
def TMDS_encoding (I, blanking = False):
"""TMDS image coding
Inputs:
- I: 2D/3D image array (v_size, h_size, channels)
- blanking: Boolean that specifies if horizontal and vertical blanking is applied or not
Output:
- I_c: 3D TDMS coded 16-bit (only 10 useful) image array
"""
# Create "ghost dimension" if I is gray-scale image (not RGB)
if len(I.shape)!= 3:
# Gray-scale image
I = np.repeat(I[:, :, np.newaxis], 3, axis=2).astype('uint8')
chs = 1
else:
# RGB image
chs = 3
# Get image resolution
v_in, h_in = I.shape[:2]
if blanking:
# Get blanking resolution for input image
v = (v_in==1080)*1125 + (v_in==900)*1000 + (v_in==720)*750 + (v_in==600)*628 + (v_in==480)*525
h = (h_in==1920)*2200 + (h_in==1600)*1800 + (h_in==1280)*1650 + (h_in==800)*1056 + (h_in==640)*800
v_diff = v - v_in
h_diff = h - h_in
v_front_porch = (v_in==1080)*4 + (v_in==900)*1 + (v_in==720)*5 + (v_in==600)*1 + (v_in==480)*2
v_back_porch = (v_in==1080)*36 + (v_in==900)*96 + (v_in==720)*20 + (v_in==600)*23 + (v_in==480)*25
h_front_porch = (h_in==1920)*88 + (h_in==1600)*24 + (h_in==1280)*110 + (h_in==800)*40 + (h_in==640)*8
h_back_porch = (h_in==1920)*148 + (h_in==1600)*96 + (h_in==1280)*220 + (h_in==800)*88 + (h_in==640)*40
# Create image with blanking and change type to uint16
# Assuming the blanking corresponds to 10bit number
# [0, 0, 1, 0, 1, 0, 1, 0, 1, 1] (LSB first) for channels R and G
I_c = 852*np.ones((v,h,chs)).astype('uint16')
I_c[:,:,2] = TMDS_blanking(h_total=h, v_total=v, h_active=h_in, v_active=v_in,
h_front_porch=h_front_porch, v_front_porch=v_front_porch, h_back_porch=h_back_porch, v_back_porch=v_back_porch)
else:
v_diff = 0
h_diff = 0
I_c = np.zeros((v_in,h_in,chs)).astype('uint16')
# Iterate over channels and pixels
for c in range(chs):
for i in range(v_in):
cnt = [0,0,0]
for j in range(h_in):
# Get pixel and code it TMDS between blanking
pix = I[i,j,c]
I_c[i + v_diff, j + h_diff, c], cnt[c] = pixel_fastencoding (pix,cnt[c])
return I_c
def DecTMDS_pixel (pix):
"""10-bit pixel TMDS decoding
Inputs:
- pix: 16-bit pixel (only 10 first bits useful)
Output:
- pix_out: 8-bit TMDS decoded pixel
"""
D = uint16_to_binarray(pix)[:10]
if D[9]:
D[:8] = np.logical_not(D[:8])
Q = D.copy()[:8]
if D[8]:
for k in range(1,8):
Q[k] = D[k] ^ D[k-1]
else:
for k in range(1,8):
Q[k] = not(D[k] ^ D[k-1])
# Return pixel as uint
return binarray_to_uint(Q)
def TMDS_decoding (Ic):
"""Image TMDS decoding
Inputs:
- Ic: TMDS coded image
Output:
- Idec: 8-bit decoded image
"""
# Create "ghost dimension" if gray-scale image (not RGB)
if len(Ic.shape)!= 3:
Ic = Ic.reshape(Ic.shape[0],Ic.shape[1],1)
Idec = Ic.copy()
# Get image dimensions
Nx, Ny, Nz = Ic.shape
# Iterate over channels and pixels
for c in np.arange(Nz):
for i in np.arange(Nx):
for j in np.arange(Ny):
# Get pixel and use TMDS decoding
pix = Ic[i,j,c]
Idec[i,j,c] = DecTMDS_pixel (pix)
return Idec
def TMDS_serial(I):
'''
Serialize an image as an 1D binary array given a 10bit pixel value.
Inputs:
- I: TMDS image to serialize. Pixel values must be between 0 and 1023
Output:
- Iserials: 1D binary array per image channel which represents
the voltage value to be transmitted
'''
assert np.min(I)>=0 and np.max(I)<= 1023, "Pixel values must be between 0 and 1023"
# Initialize lists per channel
Iserials = []
n_rows,n_columns, n_channels = I.shape
# Iterate over pixel
for c in range(n_channels):
channel_list = []
for i in range(n_rows):
for j in range(n_columns):
# Get pixel value and cast it as binary string
binstring = bin(I[i,j,c])[2:]
# Fill string with 0's to get length 10
binstring = '0'*(10-len(binstring))+binstring
# Re-order string for LSB first
binstring = binstring[::-1]
binarray = list(binstring)
# Extend the bit stream
channel_list.extend(binarray)
Iserials.append(channel_list)
# Digital to analog value mapping: [0,1]-->[-A,A] (A=1)
Iserials = np.sum(2*np.array(Iserials,dtype='int32') - 1, axis=0)
del(channel_list)
return Iserials

View File

@ -0,0 +1,441 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: GPL-3.0
#
# GNU Radio Python Flow Graph
# Title: Manual Simulated Tempest TMDS Example
# GNU Radio version: 3.8.5.0
import os
import sys
sys.path.append(os.environ.get('GRC_HIER_PATH', os.path.expanduser('~/.grc_gnuradio')))
from binary_serializer import binary_serializer # grc-generated hier_block
from gnuradio import analog
from gnuradio import blocks
from gnuradio import filter
from gnuradio import gr
import signal
import tempest
import numpy as np
from PIL import Image
from scipy import signal as sci_signal
import time
# Currently supporting png, jpg, jpeg, tif and gif extentions only
def get_images_names_from_folder (folder):
images_list = [image for image in os.listdir(folder) \
if image.endswith('.png') or image.endswith('.jpg') or image.endswith('.jpeg') or \
image.endswith('.tif') or image.endswith('.tiff') or image.endswith('.gif')]
return images_list
def get_subfolders_names_from_folder(folder):
subfolders_list = [name for name in os.listdir(folder) if os.path.isdir(os.path.join(folder, name))]
return subfolders_list
def save_simulation_image(I,path_and_name):
v_total,h_total = I.shape
I_save = np.zeros((v_total,h_total,3))
I_real = np.real(I)
I_imag = np.imag(I)
realmax = I_real.max()
realmin = I_real.min()
imagmax = I_imag.max()
imagmin = I_imag.min()
# Stretch contrast on every channel
I_save[:,:,0] = 255*(I_real-realmin)/(realmax-realmin)
I_save[:,:,1] = 255*(I_imag-imagmin)/(imagmax-imagmin)
im = Image.fromarray(I_save.astype('uint8'))
im.save(path_and_name)
def signal_capture_downsampling(serial_data,h, v, samp_rate, usrp_rate, noise_std):
# Resolucion y fps, con blanking de la imagen
h_total, v_total = h, v
N_samples = serial_data.shape[0]
# Random uniform phase noise
# phase_noise = np.exp(1j*np.random.uniform(0,2*np.pi, len(serial_data)))
if noise_std > 0:
noise_sigma = noise_std/15.968719423 # sqrt(255)~15.968719423 because of stretching with 255 at saving
serial_data = serial_data + np.random.normal(0, noise_sigma,N_samples) + 1j*np.random.normal(0, noise_sigma,N_samples)
# Muestreo del SDR
image_seq = sci_signal.resample_poly(serial_data,up=usrp_rate, down=samp_rate)
# Muestreo a nivel de píxel
image_Rx = sci_signal.resample(image_seq, h_total*v_total).reshape(v_total,h_total)
return image_Rx
class NO_GUI_tempest_simulated_TMDS(gr.top_block):
def reset_all_blocks_but_image_source(self):
self.data_sink_0 = blocks.vector_sink_c(1)
self.interp_fir_filter_xxx_0 = filter.interp_fir_filter_fcc(self.inter, self.rectangular_pulse)
self.interp_fir_filter_xxx_0.declare_sample_delay(0)
self.blocks_multiply_xx_0 = blocks.multiply_vcc(1)
self.blocks_multiply_const_vxx_0 = blocks.multiply_const_ff(2)
self.blocks_add_xx_0 = blocks.add_vff(1)
self.blocks_add_const_vxx_0 = blocks.add_const_ff(3)
self.binary_serializer_0_0_0 = binary_serializer(
M=10,
N=16,
offset=0,
)
self.binary_serializer_0_0 = binary_serializer(
M=10,
N=16,
offset=0,
)
self.binary_serializer_0 = binary_serializer(
M=10,
N=16,
offset=0,
)
self.analog_sig_source_x_0 = analog.sig_source_c(self.samp_rate, analog.GR_COS_WAVE, self.px_rate*self.harmonic, 1, 0, 0)
##################################################
# Connections
##################################################
self.connect((self.analog_sig_source_x_0, 0), (self.blocks_multiply_xx_0, 1))
self.connect((self.binary_serializer_0, 0), (self.blocks_add_xx_0, 0))
self.connect((self.binary_serializer_0_0, 0), (self.blocks_add_xx_0, 1))
self.connect((self.binary_serializer_0_0_0, 0), (self.blocks_add_xx_0, 2))
self.connect((self.blocks_add_const_vxx_0, 0), (self.interp_fir_filter_xxx_0, 0))
self.connect((self.blocks_add_xx_0, 0), (self.blocks_multiply_const_vxx_0, 0))
self.connect((self.blocks_multiply_const_vxx_0, 0), (self.blocks_add_const_vxx_0, 0))
self.connect((self.blocks_multiply_xx_0, 0), (self.data_sink_0, 0))
self.connect((self.interp_fir_filter_xxx_0, 0), (self.blocks_multiply_xx_0, 0))
self.connect((self.tempest_TMDS_image_source_0, 0), (self.binary_serializer_0, 0))
self.connect((self.tempest_TMDS_image_source_0, 1), (self.binary_serializer_0_0, 0))
self.connect((self.tempest_TMDS_image_source_0, 2), (self.binary_serializer_0_0_0, 0))
def __init__(self, Vsize=None, Hsize=None, Vvisible=None, Hvisible=None, FILEPATH=None, blanking=False):
gr.top_block.__init__(self, "Manual Simulated Tempest TMDS Example")
##################################################
# Variables
##################################################
self.Vsize = Vsize # To set at main
self.Hsize = Hsize # To set at main
self.Vvisible = Vvisible # To set at main
self.Hvisible = Hvisible # To set at main
self.FILEPATH = FILEPATH # To set at main
self.blanking = blanking
# Init as 1
self.harmonic = harmonic = 1 # To set at main
self.refresh_rate = refresh_rate = 60
self.px_rate = px_rate = Hsize*Vsize*refresh_rate/1000
self.inter = inter = 1
self.usrp_rate = usrp_rate = int(50e6/1000)
self.samp_rate = samp_rate = int(10*px_rate*inter)
self.rectangular_pulse = rectangular_pulse = [1]*inter
##################################################
# Blocks
##################################################
self.data_sink_0 = blocks.vector_sink_c(1)
self.tempest_TMDS_image_source_0 = tempest.TMDS_image_source(FILEPATH, 3, blanking)
# self.rational_resampler_xxx_0 = filter.rational_resampler_ccc(
# interpolation=inter,
# decimation=10*inter,
# taps=None,
# fractional_bw=0.4)
self.interp_fir_filter_xxx_0 = filter.interp_fir_filter_fcc(inter, rectangular_pulse)
self.interp_fir_filter_xxx_0.declare_sample_delay(0)
self.blocks_multiply_xx_0 = blocks.multiply_vcc(1)
self.blocks_multiply_const_vxx_0 = blocks.multiply_const_ff(2)
self.blocks_add_xx_0 = blocks.add_vff(1)
self.blocks_add_const_vxx_0 = blocks.add_const_ff(3)
self.binary_serializer_0_0_0 = binary_serializer(
M=10,
N=16,
offset=0,
)
self.binary_serializer_0_0 = binary_serializer(
M=10,
N=16,
offset=0,
)
self.binary_serializer_0 = binary_serializer(
M=10,
N=16,
offset=0,
)
self.analog_sig_source_x_0 = analog.sig_source_c(samp_rate, analog.GR_COS_WAVE, px_rate*harmonic, 1, 0, 0)
##################################################
# Connections
##################################################
self.connect((self.analog_sig_source_x_0, 0), (self.blocks_multiply_xx_0, 1))
self.connect((self.binary_serializer_0, 0), (self.blocks_add_xx_0, 0))
self.connect((self.binary_serializer_0_0, 0), (self.blocks_add_xx_0, 1))
self.connect((self.binary_serializer_0_0_0, 0), (self.blocks_add_xx_0, 2))
self.connect((self.blocks_add_const_vxx_0, 0), (self.interp_fir_filter_xxx_0, 0))
self.connect((self.blocks_add_xx_0, 0), (self.blocks_multiply_const_vxx_0, 0))
self.connect((self.blocks_multiply_const_vxx_0, 0), (self.blocks_add_const_vxx_0, 0))
self.connect((self.blocks_multiply_xx_0, 0), (self.data_sink_0, 0))
self.connect((self.interp_fir_filter_xxx_0, 0), (self.blocks_multiply_xx_0, 0))
self.connect((self.tempest_TMDS_image_source_0, 0), (self.binary_serializer_0, 0))
self.connect((self.tempest_TMDS_image_source_0, 1), (self.binary_serializer_0_0, 0))
self.connect((self.tempest_TMDS_image_source_0, 2), (self.binary_serializer_0_0_0, 0))
def get_refresh_rate(self):
return self.refresh_rate
def set_refresh_rate(self, refresh_rate):
self.refresh_rate = refresh_rate
self.set_px_rate(self.Hsize*self.Vsize*self.refresh_rate/1000)
def get_Vsize(self):
return self.Vsize
def set_Vsize(self, Vsize):
self.Vsize = Vsize
self.set_px_rate(self.Hsize*self.Vsize*self.refresh_rate/1000)
def get_Hsize(self):
return self.Hsize
def set_Hsize(self, Hsize):
self.Hsize = Hsize
self.set_px_rate(self.Hsize*self.Vsize*self.refresh_rate/1000)
def get_px_rate(self):
return self.px_rate
def set_px_rate(self, px_rate):
self.px_rate = px_rate
self.set_samp_rate(int(10*self.px_rate*self.inter))
self.analog_sig_source_x_0.set_frequency(self.px_rate*self.harmonic)
def get_inter(self):
return self.inter
def set_inter(self, inter):
self.inter = inter
self.set_rectangular_pulse([1]*self.inter)
self.set_samp_rate(int(10*self.px_rate*self.inter))
def get_usrp_rate(self):
return self.usrp_rate
def set_usrp_rate(self, usrp_rate):
self.usrp_rate = usrp_rate
def get_samp_rate(self):
return self.samp_rate
def set_samp_rate(self, samp_rate):
self.samp_rate = samp_rate
self.analog_sig_source_x_0.set_sampling_freq(self.samp_rate)
def get_rectangular_pulse(self):
return self.rectangular_pulse
def set_rectangular_pulse(self, rectangular_pulse):
self.rectangular_pulse = rectangular_pulse
self.interp_fir_filter_xxx_0.set_taps(self.rectangular_pulse)
def get_harmonic(self):
return self.harmonic
def set_harmonic(self, harmonic):
self.harmonic = harmonic
self.analog_sig_source_x_0.set_frequency(self.px_rate*self.harmonic)
def get_Vvisible(self):
return self.Vvisible
def set_Vvisible(self, Vvisible):
self.Vvisible = Vvisible
def get_Hvisible(self):
return self.Hvisible
def set_Hvisible(self, Hvisible):
self.Hvisible = Hvisible
def set_top_block(top_block_cls=NO_GUI_tempest_simulated_TMDS, options_dict=dict()):
# Init the top block with parameters
tb = top_block_cls(Vsize=options_dict['Vsize'], Hsize=options_dict['Hsize'], Vvisible=options_dict['Vvisible'], Hvisible=options_dict['Hvisible'],
FILEPATH=options_dict['FILEPATH'],blanking=options_dict['blanking'])
return tb
def run_simulation_flowgraph(top_block, harmonic, noise_std):
top_block.set_harmonic(harmonic)
def sig_handler(sig=None, frame=None):
top_block.stop()
top_block.wait()
sys.exit(0)
signal.signal(signal.SIGINT, sig_handler)
signal.signal(signal.SIGTERM, sig_handler)
# Run flowgraph until end
top_block.start()
top_block.wait()
top_block.stop()
# Get top block's parameters
samp_rate = top_block.get_samp_rate()
usrp_rate = top_block.get_usrp_rate()
h = top_block.get_Hsize()
v = top_block.get_Vsize()
# Get output data from fg's last block
serial_data = np.array(top_block.data_sink_0.data())
# Resample and reshape to image size
I_capture = signal_capture_downsampling(serial_data,h=h, v=v, samp_rate=samp_rate, usrp_rate=usrp_rate, noise_std=noise_std)
# Start over flowgraph and the coded image transmition
top_block.reset_all_blocks_but_image_source()
top_block.tempest_TMDS_image_source_0.line_num = 0
return I_capture
def image_simulation(image_path, N_harmonic, noise_std, blanking=False):
# Set initial options
options_dict = {'Vsize': I.shape[0],
'Hsize': I.shape[1],
'Vvisible': I.shape[0],
'Hvisible': I.shape[1],
'FILEPATH': image_path,
'blanking': blanking,
}
# Initialize the top block with encoded image
top_block = set_top_block(top_block_cls=NO_GUI_tempest_simulated_TMDS, options_dict=options_dict)
# Run simulation flowgraph
I_capture = run_simulation_flowgraph(top_block, N_harmonic, noise_std)
# From complex image to 2 channel image
v_total, h_total = I_capture.shape
I_capture_2channels = np.zeros((v_total,h_total,2))
I_real = np.real(I)
I_imag = np.imag(I)
realmax, realmin = I_real.max(), I_real.min()
imagmax, imagmin = I_imag.max(), I_imag.min()
# Stretch contrast on every channel
I_capture_2channels[:,:,0] = 255*(I_real-realmin)/(realmax-realmin)
I_capture_2channels[:,:,1] = 255*(I_imag-imagmin)/(imagmax-imagmin)
I_capture_2channels = I_capture_2channels.astype('uint8')
return I_capture_2channels
def main():
# Get foldername argument
foldername = sys.argv[-1]
# Get images and subfolders names
images_tmp = get_images_names_from_folder(foldername)
old_subfolders = get_subfolders_names_from_folder(foldername)
# Keep images without dedicated folders only
images = []
new_subfolders = []
for image in images_tmp:
image_name = image.split('.')[0]
if image_name not in old_subfolders:
images.append(image)
new_subfolders.append(image_name)
# Possible std dev noise simulation values
noise_stds = np.array([ 0, 5, 10, 15, 20, 25, 40, 50])
for image, subfolder in zip(images,new_subfolders):
# Create new directory for simulations
subfolder_path = foldername+'/'+subfolder
os.mkdir(subfolder_path)
# timestamp for simulation starting
t1_image = time.time()
# Read image
image_path = foldername+'/'+image
imagename = image.split('.')[0]
I = np.array(Image.open(image_path))
# Set initial options
options_dict = {'Vsize': I.shape[0],
'Hsize': I.shape[1],
'Vvisible': I.shape[0],
'Hvisible': I.shape[1],
'FILEPATH': image_path,
'blanking': False,
}
# Initialize the top block with encoded image
top_block = set_top_block(top_block_cls=NO_GUI_tempest_simulated_TMDS, options_dict=options_dict)
for i in range(4):
# Choose random pixelrate harmonic number
N_harmonic = np.random.randint(1,10)
# Choose random SNR value (SNR=0 for no noise)
noise_std = np.random.choice(noise_stds)
path = subfolder_path+'/'+imagename+'_'+str(N_harmonic)+'harm_'+str(noise_std)+"std.png"
I_capture = run_simulation_flowgraph(top_block, N_harmonic, noise_std)
save_simulation_image(I_capture,path)
if i==0:
# timestamp for simulation ending
t2_image = time.time()
t_image = t2_image-t1_image
print('Tiempo de la primer simulación de '+image+':','{:.2f}'.format(t_image)+'s\n')
if __name__ == '__main__':
main()