Merge branch 'diff-signaling' into main

This commit is contained in:
Emilio Martinez 2024-01-23 14:27:32 -03:00
commit 52717b28db
8 changed files with 331 additions and 30 deletions

1
.gitignore vendored
View File

@ -10,4 +10,5 @@ gr-tempest/examples/simulations/*.py
scripts/__pycache__ scripts/__pycache__
scripts/*.jpg scripts/*.jpg
scripts/*.png scripts/*.png
*.log

View File

@ -41,7 +41,7 @@ class DatasetDrunetFineTune(data.Dataset):
contains one or more L representations of the H image. contains one or more L representations of the H image.
""" """
assert os.path.isdir(opt['dataroot_H']), "No es dir" assert os.path.isdir(opt['dataroot_H']), f"{opt['dataroot_H']} is not a directory"
self.paths_H = [f for f in os.listdir(opt['dataroot_H']) if os.path.isfile(os.path.join(opt['dataroot_H'],f))] self.paths_H = [f for f in os.listdir(opt['dataroot_H']) if os.path.isfile(os.path.join(opt['dataroot_H'],f))]
#------------------------------------------------------------------------------------------------------ #------------------------------------------------------------------------------------------------------
# For the above step you can use util.get_image_paths(), but it goes recursevely throught the tree dirs # For the above step you can use util.get_image_paths(), but it goes recursevely throught the tree dirs

View File

@ -46,21 +46,30 @@ def image_transmition_simulation(I, blanking=False):
return I_TMDS_Tx, I_TMDS.shape return I_TMDS_Tx, I_TMDS.shape
def image_capture_simulation(I_Tx, h_total, v_total, N_harmonic, sdr_rate = 50e6, def image_capture_simulation(I_Tx, h_total, v_total, N_harmonic, sdr_rate = 50e6,
noise_std=0, fps=60, freq_error=0, phase_error=0): noise_std=0, fps=60, freq_error=0, phase_error=0,
interpolator=None, diff_signaling=False):
# Compute pixelrate and bitrate # Compute pixelrate and bitrate
px_rate = h_total*v_total*fps px_rate = h_total*v_total*fps
bit_rate = 10*px_rate bit_rate = 10*px_rate
# Continuous samples (interpolate) # Continuous samples (interpolate)
interpolator = int(np.ceil(N_harmonic/5)) # Condition for sampling rate and if interpolator:
sample_rate = interpolator*bit_rate sample_rate = interpolator*bit_rate
Nsamples = interpolator*(10*h_total*v_total) else:
interpolator = int(np.ceil(N_harmonic/5)) # Condition for sampling rate
if interpolator > 1: if interpolator > 1:
I_Tx_continuous = np.repeat(I_Tx,interpolator) I_Tx_continuous = np.repeat(I_Tx,interpolator)
else: else:
I_Tx_continuous = I_Tx I_Tx_continuous = I_Tx
# Differential signaling
if (diff_signaling) and (interpolator != 1):
I_Tx_continuous = np.diff(I_Tx_continuous)
Nsamples = len(I_Tx_continuous)
# Add Gaussian noise # Add Gaussian noise
if noise_std > 0: if noise_std > 0:
noise_sigma = noise_std/15.968719423 # sqrt(255)~15.968719423 noise_sigma = noise_std/15.968719423 # sqrt(255)~15.968719423
@ -142,6 +151,8 @@ def main(simulation_options_path = 'options/tempest_simulation.json'):
blanking = options['options']['blanking'] blanking = options['options']['blanking']
fps = options['options']['frames_per_second'] fps = options['options']['frames_per_second']
sdr_rate = options['options']['sdr_rate'] sdr_rate = options['options']['sdr_rate']
interpolator = options['options']['interpolator']
differential_signaling = options['options']['differential_signaling']
harmonics = options['options']['random']['harmonics'] harmonics = options['options']['random']['harmonics']
freq_error_range = options['options']['random']['freq_error'] freq_error_range = options['options']['random']['freq_error']
phase_error_range = options['options']['random']['phase_error'] phase_error_range = options['options']['random']['phase_error']
@ -155,11 +166,19 @@ def main(simulation_options_path = 'options/tempest_simulation.json'):
# Get images and subfolders names # Get images and subfolders names
images = get_images_names_from_folder(input_folder) images = get_images_names_from_folder(input_folder)
# Get images names from output folder
output_existing_images = get_images_names_from_folder(output_folder)
# Initialize processing time # Initialize processing time
t_all_images = 0 t_all_images = 0
for image in images: for image in images:
# Check if image already simulated
if image in output_existing_images:
output_existing_images.remove(image)
continue
# timestamp for simulation starting # timestamp for simulation starting
t1_image = time.time() t1_image = time.time()
@ -184,7 +203,8 @@ def main(simulation_options_path = 'options/tempest_simulation.json'):
v_res, h_res, _ = resolution v_res, h_res, _ = resolution
I_capture = image_capture_simulation(I_Tx, h_res, v_res, N_harmonic, sdr_rate, I_capture = image_capture_simulation(I_Tx, h_res, v_res, N_harmonic, sdr_rate,
sigma, fps, freq_error, phase_error) sigma, fps, freq_error, phase_error,
interpolator, differential_signaling)
path = os.path.join(output_folder,image) path = os.path.join(output_folder,image)

View File

@ -18,6 +18,27 @@ from utils.utils_dist import get_dist_info, init_dist
from data.select_dataset import define_Dataset from data.select_dataset import define_Dataset
from models.select_model import define_Model from models.select_model import define_Model
# OCR metrics
# First, must install Tesseract: https://tesseract-ocr.github.io/tessdoc/Installation.html
# Second, install CER/WER and tesseract python wrapper libraries
# pip install fastwer
# pip install pybind11
# pip install pytesseract
import pytesseract
import fastwer
def calculate_cer_wer(img_E, img_H):
# Transcribe ground-truth image to text
text_H = pytesseract.image_to_string(img_H).strip().replace('\n',' ')
# Transcribe estimated image to text
text_E = pytesseract.image_to_string(img_E).strip().replace('\n',' ')
cer = fastwer.score_sent(text_E, text_H, char_level=True)
wer = fastwer.score_sent(text_E, text_H)
return cer, wer
''' '''
# -------------------------------------------- # --------------------------------------------
@ -96,6 +117,7 @@ def main(json_path='options/test_drunet.json'):
# ---------------------------------------- # ----------------------------------------
""" """
L_paths = util.get_image_paths(opt['datasets']['test']['dataroot_L']) L_paths = util.get_image_paths(opt['datasets']['test']['dataroot_L'])
H_paths = util.get_image_paths(opt['datasets']['test']['dataroot_H'])
noise_sigma = opt['datasets']['test']['sigma_test'] noise_sigma = opt['datasets']['test']['sigma_test']
''' '''
@ -103,12 +125,15 @@ def main(json_path='options/test_drunet.json'):
# Step--4 (main test) # Step--4 (main test)
# ---------------------------------------- # ----------------------------------------
''' '''
# avg_psnr = 0.0 avg_psnr = 0.0
# avg_ssim = 0.0 avg_ssim = 0.0
# idx = 0 avg_edgeJaccard = 0.0
avg_cer = 0.0
avg_wer = 0.0
idx = 0
for L_path in L_paths: for L_path, H_path in zip(L_paths,H_paths):
# idx += 1 idx += 1
image_name_ext = os.path.basename(L_path) image_name_ext = os.path.basename(L_path)
img_name, ext = os.path.splitext(image_name_ext) img_name, ext = os.path.splitext(image_name_ext)
@ -118,7 +143,7 @@ def main(json_path='options/test_drunet.json'):
logger.info('Creating inference on test image...') logger.info('Creating inference on test image...')
# Load image # Load image
img_L_original = util.imread_uint(L_path, n_channels=3) img_L_original = util.imread_uint(L_path, n_channels=3)[50:-50,100:-100,:]
img_L = img_L_original[:,:,:2] img_L = img_L_original[:,:,:2]
img_L = util.uint2single(img_L) img_L = util.uint2single(img_L)
img_L = util.single2tensor4(img_L) img_L = util.single2tensor4(img_L)
@ -150,26 +175,36 @@ def main(json_path='options/test_drunet.json'):
logger.info(f'Inference of {img_name} completed. Saved at {img_dir}.') logger.info(f'Inference of {img_name} completed. Saved at {img_dir}.')
# ----------------------- # Load H image and compute metrics
# calculate PSNR img_H = util.imread_uint(H_path, n_channels=3)
# ----------------------- if img_H.ndim == 3:
# current_psnr = util.calculate_psnr(E_img, H_img) img_H = np.mean(img_H, axis=2)
img_H = img_H.astype('uint8')
# ----------------------- # ----------------------------------------
# calculate SSIM # compute PSNR, SSIM, edgeJaccard and CER
# ----------------------- # ----------------------------------------
# current_ssim = util.calculate_ssim(E_img, H_img) current_psnr = util.calculate_psnr(img_E, img_H)
current_ssim = util.calculate_ssim(img_E, img_H)
current_edgeJaccard = util.calculate_edge_jaccard(img_E, img_H)
current_cer, current_wer = calculate_cer_wer(img_E, img_H)
# logger.info('{:->4d}--> {:>10s} | PSNR = {:<4.2f}dB, SSIM = {:<4.2f}'.format(idx, image_name_ext, current_psnr, current_ssim)) logger.info('{:->4d}--> {:>10s} | PSNR = {:<4.2f}dB ; SSIM = {:.3f} ; edgeJaccard = {:.3f} ; CER = {:.3f}% ; WER = {:.3f}%'.format(idx, image_name_ext, current_psnr, current_ssim, current_edgeJaccard, current_cer, current_wer))
# avg_psnr += current_psnr avg_psnr += current_psnr
# avg_ssim += current_ssim avg_ssim += current_ssim
avg_edgeJaccard += current_edgeJaccard
avg_cer += current_cer
avg_wer += current_wer
# avg_psnr = avg_psnr / idx avg_psnr = avg_psnr / idx
# avg_ssim = avg_ssim / idx avg_ssim = avg_ssim / idx
avg_edgeJaccard = avg_edgeJaccard / idx
avg_cer = avg_cer / idx
avg_wer = avg_wer / idx
# testing log # Average log
# logger.info('Average PSNR : {:<.2f}dB, Average SSIM : {:<4.2f}\n'.format(avg_psnr, avg_ssim)) logger.info('[Average metrics] PSNR : {:<4.2f}dB, SSIM = {:.3f} : edgeJaccard = {:.3f} : CER = {:.3f}% : WER = {:.3f}%'.format(avg_psnr, avg_ssim, avg_edgeJaccard, avg_cer, avg_wer))
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -1,9 +1,9 @@
{ {
"paths": { "paths": {
"folder_original_images": "path/to/folder", "folder_original_images": "/home/ceibal-anii/emilio/pfc/deep-tempest/end-to-end/sinteticas/trainsets/ground-truth",
"__comment1__": "Insert input folder (original images)", "__comment1__": "Insert input folder (original images)",
"folder_simulated_images": "path/to/new_folder", "folder_simulated_images": "/home/ceibal-anii/emilio/pfc/deep-tempest/end-to-end/imgs_diff_signaling/trainsets",
"__comment2__": "Insert output folder (tempest degraded images)" "__comment2__": "Insert output folder (tempest degraded images)"
}, },
@ -17,6 +17,12 @@
"sdr_rate": 50e6, "sdr_rate": 50e6,
"__comment3__": "sampling rate of SDR", "__comment3__": "sampling rate of SDR",
"interpolator": 10,
"__comment4__": "Interpolation for analog pulse simulation",
"differential_signaling": true,
"__comment5__": "Use diferential signaling. Epsilon delay as one interpolation unit",
"random": { "random": {
"harmonics": [3], "harmonics": [3],
@ -25,7 +31,7 @@
"sigma": null, "sigma": null,
"__comment2__":"Gaussian noise with random sigma over specified range [sigma1, simga2]", "__comment2__":"Gaussian noise with random sigma over specified range [sigma1, simga2]",
"freq_error": [0,50], "freq_error": [0,15],
"__comment3__": "Random frequency error (Hz) over specified range [f1, f2]", "__comment3__": "Random frequency error (Hz) over specified range [f1, f2]",
"phase_error": [-1,1], "phase_error": [-1,1],

View File

@ -0,0 +1,150 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"from PIL import Image, ImageDraw, ImageFont\n",
"import random\n",
"import string\n",
"from matplotlib import font_manager\n",
"\n",
"def generar_imagen_con_palabras_rndfont(texto, img_shape, tamano_letra, color_texto, fondo_color, ruta_guardado):\n",
" # Create white plain image\n",
" imagen = Image.new(\"RGB\", img_shape, fondo_color)\n",
" dibujo = ImageDraw.Draw(imagen)\n",
"\n",
" # Compute amount of lines depending on image shape and number of characters\n",
" N_total = len(texto)\n",
" N_lines = N_total//img_shape[1]\n",
" N_horizontal = img_shape[0]//(tamano_letra)\n",
"\n",
" # Get system font types\n",
" system_fonts = font_manager.findSystemFonts()\n",
" # Filter out some non-readable fonts\n",
" ttf_fonts = [font for font in system_fonts if ((\".ttf\" in font) and (\"lohit\" not in font) and (\"kacst\" not in font)) and (\"Navilu\" not in font) and (\"telu\" not in font) and (\"lyx\" not in font) and (\"malayalam\" not in font) and (\"tlwg\" not in font) and (\"samyak\" not in font) and (\"droid\" not in font) and (\"kalapi\" not in font) and (\"openoffice\" not in font) and (\"orya\" not in font)]\n",
"\n",
" # Write over image one font per line\n",
" for iter in range(N_lines):\n",
" rnd_font_index = random.randint(0,len(ttf_fonts)-1)\n",
" random_font = ttf_fonts[rnd_font_index]\n",
" # print(f\"Font N {iter}: {random_font}\")\n",
"\n",
" # Load text font and set size\n",
" fuente = ImageFont.truetype(font=random_font, size=tamano_letra)\n",
" # Get line text\n",
" texto_linea = texto[iter * N_horizontal : (iter+1) * N_horizontal]\n",
"\n",
" # Adjust text position\n",
" posicion_texto = ((imagen.width - fuente.getsize(texto_linea)[0]) // 2, \n",
" int(1.5* iter * tamano_letra))\n",
"\n",
" # Write text\n",
" dibujo.text(posicion_texto, texto_linea, font=fuente, fill=color_texto)\n",
"\n",
" # Save image\n",
" imagen.save(ruta_guardado)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"147\n"
]
}
],
"source": [
"system_fonts = font_manager.findSystemFonts(fontpaths=None, fontext='ttf')\n",
"ttf_fonts = [font for font in system_fonts if ((\".ttf\" in font) and (\"lohit\" not in font) and (\"kacst\" not in font)) and (\"Navilu\" not in font) and (\"telu\" not in font) and (\"lyx\" not in font) and (\"malayalam\" not in font) and (\"tlwg\" not in font) and (\"samyak\" not in font) and (\"droid\" not in font) and (\"kalapi\" not in font) and (\"openoffice\" not in font) and (\"orya\" not in font)]\n",
"print(len(ttf_fonts))"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"33\n",
"7\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"/tmp/ipykernel_34832/450411713.py:33: DeprecationWarning: getsize is deprecated and will be removed in Pillow 10 (2023-07-01). Use getbbox or getlength instead.\n",
" posicion_texto = ((imagen.width - fuente.getsize(texto_linea)[0]) // 2,\n"
]
}
],
"source": [
"# initializing size of string\n",
"N_total = 30000\n",
"img_shape = (1600,900)\n",
"N_lines = N_total//img_shape[1]\n",
"tamano_letra = 22\n",
"N_horizontal = img_shape[0]//(10*tamano_letra)\n",
"\n",
"print(N_lines)\n",
"print(N_horizontal)\n",
"\n",
"# Generating random strings\n",
"texto = ''.join(random.choices(string.ascii_letters +\n",
" string.digits, k=N_total))\n",
"\n",
"color_texto = \"black\"\n",
"fondo_color = \"white\"\n",
"ruta_guardado = \"./imagen_con_palabras.png\"\n",
"\n",
"# Random font por cada línea\n",
"generar_imagen_con_palabras_rndfont(texto, img_shape, tamano_letra, color_texto, fondo_color, ruta_guardado)\n",
"\n",
"# Texto especificando un ttf\n",
"# tipo_letra = \"Playfair_Display/static/PlayfairDisplay-Black.ttf\" # Reemplaza con la ruta de tu tipo de letra\n",
"# generar_imagen_con_palabras(texto, img_shape, tipo_letra, tamano_letra, color_texto, fondo_color, ruta_guardado)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "base",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.5"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@ -0,0 +1,42 @@
import os
from datetime import date
import random
import string
from text_utils import generate_random_txt_img
NUM_IMAGES = 10
NUM_CHARACTERS = 29000
IMG_SHAPE = (1600,900)
TEXT_SIZE = 22
today = date.today()
# Month abbreviation, day and year
save_path = today.strftime("%b-%d-%Y")
if not os.path.exists(save_path):
os.mkdir(save_path)
else:
i = 2
save_path_tmp = save_path + str(i)
while os.path.exists(save_path_tmp):
i+=1
save_path_tmp = save_path + str(i)
images_name = "generated_text"
for i in range(NUM_IMAGES):
text = ''.join(random.choices(string.ascii_letters +
string.digits, k=NUM_CHARACTERS))
text_color = random.choices(["black","white"], weights=(70, 30), k=1)[0]
background_color = "black"*(text_color=="white") + "white"*(text_color=="black")
generate_random_txt_img(text,
IMG_SHAPE,
TEXT_SIZE,
text_color,
background_color,
os.path.join(save_path,images_name+str(i)+".png"))

View File

@ -0,0 +1,47 @@
from PIL import Image, ImageDraw, ImageFont
import random
import string
from matplotlib import font_manager
def generate_random_txt_img(text, img_shape, text_size, text_color, background_color, save_path):
# Create white plain image
imagen = Image.new("RGB", img_shape, background_color)
dibujo = ImageDraw.Draw(imagen)
# Compute amount of lines depending on image shape and number of characters
N_total = len(text)
N_lines = N_total//img_shape[1]
N_horizontal = int(1.6 * img_shape[0] // (text_size))
# Get system font types
system_fonts = font_manager.findSystemFonts()
# Filter out some non-readable fonts
ttf_fonts = [font for font in system_fonts if ((".ttf" in font) and ("lohit" not in font) and ("kacst" not in font)) and ("Navilu" not in font) and ("telu" not in font) and ("lyx" not in font) and ("malayalam" not in font) and ("tlwg" not in font) and ("samyak" not in font) and ("droid" not in font) and ("kalapi" not in font) and ("openoffice" not in font) and ("orya" not in font)]
# Write over image one font per line
for iter in range(N_lines):
rnd_font_index = random.randint(0,len(ttf_fonts)-1)
random_font = ttf_fonts[rnd_font_index]
# print(f"Font N {iter}: {random_font}")
# Load text font and set size
try:
fuente = ImageFont.truetype(font=random_font, size=text_size)
except:
# Load a fixed font when crashes
fuente = ImageFont.truetype("/usr/share/fonts/truetype/liberation2/LiberationSans-BoldItalic.ttf", size=text_size)
# Get line text
texto_linea = text[iter * N_horizontal : (iter+1) * N_horizontal]
# Adjust text position
posicion_texto = ((imagen.width - fuente.getsize(texto_linea)[0]) // 2,
int(1.25* iter * text_size)
)
# Write text
dibujo.text(posicion_texto, texto_linea, font=fuente, fill=text_color)
# Save image
imagen.save(save_path)