basado en
Ejemplo de VAE con Tesorflow.Keras F. Chollet, Convolutional Variational AutoEncoder (VAE) trained on MNIST digits Creado 2020/05/03, última versión: 2020/05/03.
Ejemplo de Cápitulo 3 de D. Foster, Generative deep learning, OReilly, 2019
Adaptado a tensorflow.keras: Mariano Rivera, Agosto 2021. Ver 1.0
Recomendamos al lector que si busca una derivación de las VAE mas rigurosa, vea las notas anteriores. En esta nota pretendimos dar una enfoque complementario al que habíamos usado y ser mas intuitivos.
Los ejemplos anteriores de autocodificadores variacionales (VAE) presentados en estas notas presenta los detalles de la derivación de los mismos. Ahora, ya que hemos revisado la teoría, tomaremos un enfoque más pŕactico para motivar la versión convolucional de estos autocodificadores (Conv-VAE).
En este ejercicio revisaremos:
Uso de Tensorflow.datasets para leer un directorio que contienen las imágenes de rostros de la base de datos CelebA.
Derivación de la clase Model
para definir los modelos que componen nuestro VAE.
Definición del paso de entrenamiento en nuestro modelo VAE con el cálculo de la función de pérdida ELBO
Especificación de las métricas con que revisamos el avance del entrenamiento
Defición de una función callback
que permite salvar los pesos del modelo total cada vez que en el entrenamiento se tienen una mejora en la pérdida.
Recuparer un modelo a partir de su creación y cargar los pesos previamente.
Analizar el comportamiento de la VAE.
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import os
from glob import glob
import numpy as np
gpu_available = tf.config.list_physical_devices('GPU')
print(gpu_available)
[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
section = 'vae'
run_id = '0001'
data_name = 'faces'
RUN_FOLDER = '../run/{}/'.format(section)
RUN_FOLDER += '_'.join([run_id, data_name])
if not os.path.exists(RUN_FOLDER):
os.mkdir(RUN_FOLDER)
os.mkdir(os.path.join(RUN_FOLDER, 'viz'))
os.mkdir(os.path.join(RUN_FOLDER, 'images'))
os.mkdir(os.path.join(RUN_FOLDER, 'weights'))
mode = 'build' #'load' #
La base de datos (BD) CelebA fue recolectada por Lui et al. (2015) y consiste de mas de 200 mil imágenes de rostros de personas famosas (celebridades) a las cuales se le asocia un vector de 40 atributos binarios (sexo,pelo castaño, bigote, variación de pose, etc.) que presentan una gran variación de la pose y del fondo. Sin embargo dicha base de datos presenta un gran sesgo sobrerepresentando gente jóven, causásica y atractiva. Otras razas como los latinos estan sub-representadas, por ello han aparecido nuevas BD que tratan de mitigar este problema, como FairFace por Kärkkäinen and J. Joo (2021). A pesar del sesgo implícito en CelebA, ésta es una fuente inicial para probar algoritmos, sin embargo tengamos siempre en cuenta que lelvar un aloritmo entrabnado con una particular base de datos a producción conlleva un etapa de validación con datos del dominio real.
CelabA ha sido apmpliamente utilizada para desarrollar algoritmos de reconocimiento de atributos faciales, detección de rostros, localización de puntos de referencia (o partes faciales) y edición y síntesis de rostros. La BD contiene
202,599 imágenes de caras, de
10,177 personas distintas, con 4
40 atributos binarios anotados por imagen.
(Lui et al., 2015) Z. Lui et al. Deep Learning Face Attributes in the Wild, Proc ICCV 2015.
(Kärkkäinen and Joo, 2021) K. Kärkkäinen and J. Joo, “FairFace: Face Attribute Dataset for Balanced Race, Gender, and Age for Bias Measurement and Mitigation,” 2021 IEEE Winter Conference on Applications of Computer Vision (WACV), 2021, pp. 1547-1557, doi: 10.1109/WACV48630.2021.00159.
Hemos descargado CelebA a nuestra estación de trabajo en el directorio /home/mariano/Data/celebA/imgs_align/img_align_celeba_png
. Las imágenes de interés son las que corresponden a las imágenes aproximadamente alineadas respecto a los ojos de los rostros.
A continuación definimos variables globales para nuetro ejemplo.
# run params
DATA_FOLDER = '/home/mariano/Data/celebA/imgs_align/img_align_celeba_png'
INPUT_DIM = (128,128,3)
LATENT_DIM = 150
BATCH_SIZE = 384
R_LOSS_FACTOR = 100000 # 10000
EPOCHS = 400
INITIAL_EPOCH = 0
filenames = np.array(glob(os.path.join(DATA_FOLDER, '*.png')))
n_images = filenames.shape[0]
steps_per_epoch = n_images//BATCH_SIZE
print('num image files : ', n_images)
print('steps per epoch : ', steps_per_epoch )
num image files : 202599
steps per epoch : 527
Las imágenes las cargaremos en memoria, ajustando su tamaño a (128,128,3) y su rango dinámico (canales) a .
En Tensorflow 1.x era muy popular usar data_generators
. Éstos nos permitían conectar un lector iterativo a las imágenes y preprocesarlas al momento de su lectura. Ahora,
en Tensorflow 2.x los generadres han sido reemplazados por tensorflow.datasets
que permiten tener una funcionalidad similar a los generadores pero son más eficienes en cuanto a la parelización de tareas: es posible leer las imágenes y preprocesarlas en los distintos núcleos (cores) del CPU mientras el GPU procesa el lote previo de datos y realiza el paso de entrenamiento. Como planeamos entrenar por varias épocas, es necesario indicar que una vez que hayamos leido suficientes lotes para agotar la BD, la lectura deberá repetirse desde el inicio, para ello usamos el método repeat()
.
Con la variable tf.data.AUTOTUNE
dejaremos la responsabilidad de lanzar tantas tareas paralelas al Tensorflow en la parte de carga y proprocesamiento del lote de datos. Este es un avance significativo en la mejora de la eficiencia del entrenamiento, pues Python
no está diseñado para lanzar tareas concurrentes. El indicando con el método prefetch
nos permite usar los núcleos del CPU para leer y cargar los datos (lotes o batches) a la vez que el GPU esta realizando cálculos. Es recomendable realizar la carga de datos, pues ajustará las dimensiones de ls imágenes, antes de aplicar el preprocesamiento de cada dato en el lote mediante el map
de una función lambda. Este map es un método de Datasets que también se puede realizarse en paralelo.
En nuestro caso, obtuvimos una mejora de 10x al pasar de generators a datasets. Y no solo eso, nuestra pretención de usar hilos (threads) en el proceso de entrenamiento mediante fit_generator llegó a congelarse.
AUTOTUNE = tf.data.AUTOTUNE
dataset=tf.keras.utils.image_dataset_from_directory(directory = DATA_FOLDER,
labels = None,
batch_size = BATCH_SIZE,
image_size = INPUT_DIM[:2],
shuffle = True,).repeat()
dataset = dataset.prefetch(buffer_size=AUTOTUNE)
normalization_layer = tf.keras.layers.experimental.preprocessing.Rescaling(1./255)
dataset = dataset.map(lambda x: (normalization_layer(x)), num_parallel_calls=AUTOTUNE)
Found 202599 files belonging to 1 classes.
A continuación mostramos como leer un lote con la librería dataset. Note que se usa el método take
y se indica que lote que se carga. El lote es un tensor de dimensión (batch_size, heigth, width, n_channels). Para ilustrar los datos del lote cargado desplegamos las primeras las imágenes, previamente transformadas a formato numpy para poder emplear matplotlib.pyplot.imshow.
import matplotlib.pyplot as plt
plt.figure(figsize=(12,6), tight_layout=True)
for images in dataset.take(1):
for i in range(18):
ax = plt.subplot(3, 6, i + 1)
plt.imshow(images[i].numpy())
plt.axis('off')
plt.show()
images.shape
TensorShape([384, 128, 128, 3])
A continuación definimos los modelos que a su vez componen el modelo VAE.
Partimos de un autocodificador simple como el mostrado en la siguiente figura.
Podemos ver que consta de dos bloques, el codificador que matemáticamente se puede representar como:
(1)
donde y . Lo importante aqui es que , es decir el vector codifica y comprime la información del vector . Luedo de decodificador toma el vector codificado y recupera (o al menos aproxima) al vector , denotamos por la reconstrucción. El decodificador lo denotamos por
(2)
Los parámetros del autocodificador se calculan resolviendo
(3)
donde es una métrica del error de reconstrucción.
A continuación mostramos las implementaciónes de los modelos Encoder y Decoder que conforman un autoencoder. Las implementaciones consideran que los datos son las imágenes de la BD CelebA, el vector codificado es de dimensión LATENT_DIM
.
from tensorflow import keras
import tensorflow.keras.layers as layers
from tensorflow.keras.layers import Dense, Conv2D, Conv2DTranspose
from tensorflow.keras.layers import Flatten, Reshape, Dropout, BatchNormalization, Activation, LeakyReLU
from tensorflow.keras.models import Model
from tensorflow.keras.models import Sequential
class Encoder(keras.Model):
def __init__(self, input_dim, output_dim, encoder_conv_filters, encoder_conv_kernel_size, encoder_conv_strides, use_batch_norm=True, use_dropout=True, **kwargs):
'''
'''
super(Encoder, self).__init__(**kwargs)
self.input_dim = input_dim
self.output_dim = output_dim
self.encoder_conv_filters = encoder_conv_filters
self.encoder_conv_kernel_size = encoder_conv_kernel_size
self.encoder_conv_strides = encoder_conv_strides
self.n_layers_encoder = len(self.encoder_conv_filters)
self.use_batch_norm = use_batch_norm
self.use_dropout = use_dropout
self.model = self.encoder_model()
self.built = True
def get_config(self):
config = super(Encoder, self).get_config()
config.update({"units": self.units})
return config
def encoder_model(self):
'''
'''
encoder_input = layers.Input(shape=self.input_dim, name='encoder' )
x = encoder_input
for i in range(self.n_layers_encoder):
x = Conv2D(filters = self.encoder_conv_filters[i],
kernel_size = self.encoder_conv_kernel_size[i],
strides = self.encoder_conv_strides[i],
padding = 'same',
name = 'encoder_conv_' + str(i),)(x)
if self.use_batch_norm:
x = BatchNormalization()(x)
x = LeakyReLU()(x)
if self.use_dropout:
x = Dropout(rate = 0.25)(x)
self.last_conv_size = x.shape[1:]
x = Flatten()(x)
encoder_output = Dense(self.output_dim)(x)
model = keras.Model(encoder_input, encoder_output)
return model
def call(self, inputs):
'''
'''
return self.model(inputs)
class Decoder(keras.Model):
def __init__(self, input_dim, input_conv_dim,
decoder_conv_t_filters, decoder_conv_t_kernel_size, decoder_conv_t_strides,
use_batch_norm=True, use_dropout=True, **kwargs):
'''
'''
super(Decoder, self).__init__(**kwargs)
self.input_dim = input_dim
self.input_conv_dim = input_conv_dim
self.decoder_conv_t_filters = decoder_conv_t_filters
self.decoder_conv_t_kernel_size= decoder_conv_t_kernel_size
self.decoder_conv_t_strides = decoder_conv_t_strides
self.n_layers_decoder = len(self.decoder_conv_t_filters)
self.use_batch_norm = use_batch_norm
self.use_dropout = use_dropout
self.model = self.decoder_model()
self.built = True
def get_config(self):
config = super(Decoder, self).get_config()
config.update({"units": self.units})
return config
def decoder_model(self):
'''
'''
decoder_input = layers.Input(shape=self.input_dim, name='decoder' )
x = Dense(np.prod(self.input_conv_dim))(decoder_input)
x = Reshape(self.input_conv_dim)(x)
for i in range(self.n_layers_decoder):
x = Conv2DTranspose(filters = self.decoder_conv_t_filters[i],
kernel_size = self.decoder_conv_t_kernel_size[i],
strides = self.decoder_conv_t_strides[i],
padding = 'same',
name = 'decoder_conv_t_' + str(i))(x)
if i < self.n_layers_decoder - 1:
if self.use_batch_norm:
x = BatchNormalization()(x)
x = LeakyReLU()(x)
if self.use_dropout:
x = Dropout(rate = 0.25)(x)
else:
x = Activation('sigmoid')(x)
decoder_output = x
model = keras.Model(decoder_input, decoder_output)
return model
def call(self, inputs):
'''
'''
return self.model(inputs)
El propósito de tenemos es estimar la distribución de los datos para luego poderla muestrear, es decir generar muestras tal que se cumpla:
Las imágenes generadas sean similares a los de la BD de entrenamiento, rostros en CelebA.
El generador es paramétrizado por un vector de variables latentes (de menor dimensión que las imágenes) las cuelas tienen una distribución conocida y simple de muestrear.
El generador es capaz de generar datos que, aunque similares (rostros), no esten presentes en la BD de entrenamiento; no es una simple memoria.
El primer punto lo podriamos satisfacer con un autocodificador simple: asuminedo al vector de la figura anterior el vector de espacio latentes. Sin embargo, para el punto dos no es claro que distribución tiene . Por lo tanto, no podemos obtener nuevas muestras del espacio latente para generar nuevos datos. es decir no podemos hacer
(4)
para luego generar el nuevo dato con
(5)
El punto tres es aun mas complejo, ¿que pasa si utiliza una que caen entre las de la BD? ¿Estará en ese punto definida la ? ¿correponderá a algo como un rostro? Es decir, ¿Tendrá suficinte poder de generalización el podrá para procesar variables latentes no vistas dureate el entrenamiento?
Los puntos dos y tres están muy relacionados, si las variables latentes tienen una distribución simple monomodal será fácil de muestrear. Además debemos imponer regularidad en : imágenes similares provienen de variables latentes similares.
Aquí es donde entran en acción los autoencodificadores variacionales.
Primero, lo que haremos es dejar a con su distribución extraña que tiene y a partir de ella obtener una muestrade variable latente que se distribuye en forma simple: . Es tienen una distribución Gaussiana multivariada con media cero y varianza igual a uno en cada variable. Para desarrollar esta parte hagamos la siguiente suposición, teemos varias muestras tomadas de una misma persona, tal vez de una pequeña secuencia de un video, luego dichas muestras varian ligeramente entre ellas. Entonces es de esperarse que también sus variables latentes sean similares y caigan alrededor de una media (para la persona específica) y su variabilidad esté determinada por una varianza (igualmente para dicha persona). Ahora, si tenemos varios videos de rostros para distintas personas, cada uno de ellos tendra su propia media y varianza. A lo que vamos es que dada rostro tienen en realidad una pequeña distribución, como una nube alrededor de un punto (su media individual). Si muestreamos dicha nube debemos tener imágenes del rostro de una misma persona con ligeras variaciones: como de pose de posición de las cejas, de los labios, etc. Con ello lograríamos imponer regularidad a nuestra distribución.
Como solo tenemos rostros aislados y no videos, lo que haremos es que cada rostro genera una codificación en forma determinista, pero dicha codificación puede producir distintas variables latentes , con una distribución individual . Luego impondremos a que distribución de toda la población sea la normal multivariada . Con ello logramos que las se concentren y no se dispercen dejando huecos y sea fácil de generar nuevas que produzcan restros realistas.
Queremos algo como lo siguiente
(6)
Sin embargo el problma es que la función es estocástica y en principio no podriamos impelementar la retropropagación del gradiente desde el error (pérdida) hacia el codificador.
la forma de darle la vuelta a este problema es usar el llamado truco de la parametrización que consiste en estimar la media y la varianza de la distribución y luego calcular donde se distribuye normal . Eso lo podemos ver como
(7)
donde representa una transformación mediante capas densas y el truco de la parametrización:
(8)
A continuación implementamos el modelo Sampler
que realiza la tarea arriba descrita.
class Sampler(keras.Model):
"""Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""
def __init__(self, latent_dim, **kwargs):
super(Sampler, self).__init__(**kwargs)
self.latent_dim = latent_dim
self.model = self.sampler_model()
self.built = True
def get_config(self):
config = super(Sampler, self).get_config()
config.update({"units": self.units})
return config
def sampler_model(self):
'''
input_dim is a vector in the latent (codified) space
'''
input_data = layers.Input(shape=self.latent_dim)
z_mean = Dense(self.latent_dim, name="z_mean")(input_data)
z_log_var = Dense(self.latent_dim, name="z_log_var")(input_data)
self.batch = tf.shape(z_mean)[0]
self.dim = tf.shape(z_mean)[1]
epsilon = tf.keras.backend.random_normal(shape=(self.batch, self.dim))
z = z_mean + tf.exp(0.5 * z_log_var) * epsilon
model = keras.Model(input_data, [z, z_mean, z_log_var])
return model
def call(self, inputs):
'''
'''
return self.model(inputs)
Noten que el componente estocástico (generación de ) esta fuera del la ruta de cálculo desde los datos hasta el error y que dicho componente estocástico no tienen parámetros que requieran ser entrenados.
La siguiente figura ilustra al autoencodificador variacional.
Ahora implementamos el modelo del VAE. Es importante que la función de costo incluya los siguientes elementos:
Error de reconstucción. Medir la discrepancia entre la imágen de entrada y la imagen reconstruida .
Desviación de distribución de las variables latentes respecto ala normal multivariada
Para el primer punto usaremos la el Error Absoluto Promedio (MAE) y para el segundo la divergencia de Kullback-Leibler entre distribuciones, que por ser Gaussianas no correlacionadas y una de ellas la normal toma una forma muy simple. Usaremos el parámetro R_LOSS_FACTOR
para ponderar cada uno de los errores.
Adicionalmente, hemos preferido en esta nota implementar el método train_step
que corresponde a lo realizado dentro del lazo de entrenamiento. Con ello pretendemos mostrar como se implementa, se evalua una párdida (loss) particular y como se definen los valores a los que el proceso de entrenamiento da seguimiento: loss
, loss_reconstruction
y loss_kl
.
Usamos también una callback
que nos permite salvar los pesos del modelo cada vez que se logre una mejora en loss
.
class VAE(keras.Model):
def __init__(self, r_loss_factor=1, summary=False, **kwargs):
super(VAE, self).__init__(**kwargs)
self.r_loss_factor = r_loss_factor
# Architecture
self.input_dim = INPUT_DIM
self.latent_dim = LATENT_DIM
self.encoder_conv_filters = [64,64,64,64]
self.encoder_conv_kernel_size = [3,3,3,3]
self.encoder_conv_strides = [2,2,2,2]
self.n_layers_encoder = len(self.encoder_conv_filters)
self.decoder_conv_t_filters = [64,64,64,3]
self.decoder_conv_t_kernel_size= [3,3,3,3]
self.decoder_conv_t_strides = [2,2,2,2]
self.n_layers_decoder = len(self.decoder_conv_t_filters)
self.use_batch_norm = True
self.use_dropout = True
self.total_loss_tracker = tf.keras.metrics.Mean(name="total_loss")
self.reconstruction_loss_tracker = tf.keras.metrics.Mean(name="reconstruction_loss")
self.kl_loss_tracker = tf.keras.metrics.Mean(name="kl_loss")
self.mae = tf.keras.losses.MeanAbsoluteError()
# Encoder
self.encoder_model = Encoder(input_dim = self.input_dim,
output_dim = self.latent_dim,
encoder_conv_filters = self.encoder_conv_filters,
encoder_conv_kernel_size = self.encoder_conv_kernel_size,
encoder_conv_strides = self.encoder_conv_strides,
use_batch_norm = self.use_batch_norm,
use_dropout = self.use_dropout)
self.encoder_conv_size = self.encoder_model.last_conv_size
if summary:
self.encoder_model.summary()
# Sampler
self.sampler_model = Sampler(latent_dim = self.latent_dim)
if summary:
self.sampler_model.summary()
# Decoder
self.decoder_model = Decoder(input_dim = self.latent_dim,
input_conv_dim = self.encoder_conv_size,
decoder_conv_t_filters = self.decoder_conv_t_filters,
decoder_conv_t_kernel_size= self.decoder_conv_t_kernel_size,
decoder_conv_t_strides = self.decoder_conv_t_strides,
use_batch_norm = self.use_batch_norm,
use_dropout = self.use_dropout)
if summary: self.decoder_model.summary()
self.built = True
@property
def metrics(self):
return [self.total_loss_tracker,
self.reconstruction_loss_tracker,
self.kl_loss_tracker,]
@tf.function
def train_step(self, data):
'''
'''
with tf.GradientTape() as tape:
# predict
x = self.encoder_model(data)
z, z_mean, z_log_var = self.sampler_model(x)
pred = self.decoder_model(z)
# loss
r_loss = self.r_loss_factor * self.mae(data, pred)
kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
total_loss = r_loss + kl_loss
# gradient
grads = tape.gradient(total_loss, self.trainable_weights)
# train step
self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
# compute progress
self.total_loss_tracker.update_state(total_loss)
self.reconstruction_loss_tracker.update_state(r_loss)
self.kl_loss_tracker.update_state(kl_loss)
return {"loss": self.total_loss_tracker.result(),
"reconstruction_loss": self.reconstruction_loss_tracker.result(),
"kl_loss": self.kl_loss_tracker.result(),}
@tf.function
def generate(self, z_sample):
'''
We use the sample of the N(0,I) directly as
input of the deterministic generator.
'''
return self.decoder_model(z_sample)
@tf.function
def codify(self, images):
'''
For an input image we obtain its particular distribution:
its mean, its variance (unvertaintly) and a sample z of such distribution.
'''
x = self.encoder_model.predict(images)
z, z_mean, z_log_var= self.sampler_model(x)
return z, z_mean, z_log_var
# implement the call method
@tf.function
def call(self, inputs, training=False):
'''
'''
tmp1, tmp2 = self.encoder_model.use_Dropout,self.decoder_model.use_Dropout
if not training:
self.encoder_model.use_Dropout, self.decoder_model.use_Dropout = False,False
x = self.encoder_model(inputs)
z, z_mean, z_log_var = self.sampler_model(x)
pred = self.decoder_model(z)
self.encoder_model.use_Dropout, self.decoder_model.use_Dropout = tmp1, tmp2
return pred
vae = VAE(r_loss_factor=R_LOSS_FACTOR, summary=True)
Model: "encoder"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
model (Functional) (None, 150) 728150
=================================================================
Total params: 728,150
Trainable params: 727,638
Non-trainable params: 512
_________________________________________________________________
Model: "sampler"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
model_1 (Functional) [(None, 150), (None, 150) 45300
=================================================================
Total params: 45,300
Trainable params: 45,300
Non-trainable params: 0
_________________________________________________________________
Model: "decoder"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
model_2 (Functional) (None, 128, 128, 3) 731779
=================================================================
Total params: 731,779
Trainable params: 731,395
Non-trainable params: 384
_________________________________________________________________
vae.summary()
Model: "vae"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
encoder (Encoder) multiple 728150
_________________________________________________________________
sampler (Sampler) multiple 45300
_________________________________________________________________
decoder (Decoder) multiple 731779
=================================================================
Total params: 1,505,235
Trainable params: 1,504,333
Non-trainable params: 902
_________________________________________________________________
vae.compile(optimizer=keras.optimizers.Adam())
from tensorflow.keras.callbacks import ModelCheckpoint
filepath = 'best_weight_model.h5'
checkpoint = ModelCheckpoint(filepath=filepath,
monitor='loss',
verbose=1,
save_best_only=True,
save_weights_only=True,
mode='min')
callbacks = [checkpoint]
Entrenamos con el procedimiento normal de keras: invocando al método fit
. Sin embargo, internamente el lazo de entrenamiento estará llamando a nuestra implementación de train_step
.
vae.fit(dataset,
batch_size = BATCH_SIZE,
epochs = EPOCHS,
initial_epoch = INITIAL_EPOCH,
steps_per_epoch = steps_per_epoch,
callbacks = callbacks)
Epoch 1/400
527/527 [==============================] - 46s 86ms/step - loss: 7516.1080 - reconstruction_loss: 6913.9912 - kl_loss: 371.8324
Epoch 00001: loss improved from inf to 7285.82129, saving model to best_weight_model.h5
Epoch 2/400
527/527 [==============================] - 46s 87ms/step - loss: 6812.7973 - reconstruction_loss: 6329.5083 - kl_loss: 385.5406
Epoch 00002: loss improved from 7285.82129 to 6715.04883, saving model to best_weight_model.h5
Epoch 3/400
527/527 [==============================] - 46s 87ms/step - loss: 6504.1279 - reconstruction_loss: 6068.6631 - kl_loss: 390.3454
Epoch 00003: loss improved from 6715.04883 to 6459.00439, saving model to best_weight_model.h5
Epoch 4/400
527/527 [==============================] - 46s 87ms/step - loss: 6331.8780 - reconstruction_loss: 5902.8765 - kl_loss: 392.8752
. . .
Epoch 00398: loss improved from 5172.32568 to 5171.25830, saving model to best_weight_model.h5
Epoch 399/400
527/527 [==============================] - 47s 89ms/step - loss: 5171.2232 - reconstruction_loss: 4809.0151 - kl_loss: 363.3829
Epoch 00399: loss did not improve from 5171.25830
Epoch 400/400
527/527 [==============================] - 47s 89ms/step - loss: 5171.5463 - reconstruction_loss: 4809.2295 - kl_loss: 363.3069
Epoch 00400: loss did not improve from 5171.25830
Muy bien terminamos el entrenamiento y salvamos los pesos de nuestro modelo, los que resultaron de la última época de entrenamiento.
vae.save_weights("model_vae_faces_1e4.h5")
Sin embargo, sabemos que los mejores pesos no necesariamente correpondel a los calculados en la última época, como es el caso; sino a los que almacen nuestra callback
en el archivo "best_weight_model.h5"
.
Pero son solo los pesos, no el modelo completo. Para recuperar nuestro modelo debemos construirlo primero, lo llamamos vae2
para distinguirlo del que usamos en entrenamiento. Una vez construido el modelo, cargamos en él los pesos salvados.
vae2 = VAE(r_loss_factor=R_LOSS_FACTOR)
vae2.load_weights("best_weight_model.h5")
Como primer ejercicio generamos una variable en el espacio latente:
(9)
Luego seleccionamos dos entradas de dicho vector: y . Entonces dejando fijas todas las entradas de exepto y exploramos las imagenes que se generas variando dichas entradas.
Notamos que una variable (eje horizontal) parece que hace variar el rostro de tener características que van de masculinas a femeninas. La otra variable (eje vertical) parece asociarse con la sonrisa (alegría o tristeza del rostro).
Exploramos otras combinaciones de variables y vimos que algunas estan encontrar que estan asociadas con la orientacion del rostro, el grueso de las cejas, usar o no lentes, etc. Sin embargo notamos que lo que para un rostro es válido para otro no: la misma variable puedede star asociada a distintas características.
Podemos encontramos cierta analogía entre las variables latente y los cromosomas: así como gen puede asociarse a distintos fenotipos, o varios genes controlar un fenotipo; igual pasa con las variables latentes en nuestro generador. En nuestra analogia, el valor de una variable latente correponde a la expresión (real) de un gen (binario). Además la codificación “genética” en las variable latente cambia de entrenamiento a entrenamiento. Por lo que repetimos el entrenamiento, seguramente la figura generada abajo tenga distinta interpretación.
Notemos como el generador, aunque genera rostros pausibles, tienen dificultades para representar un fondo razonable, ello debido a la gran variabilidad de fondos en la BD CelebA.
import matplotlib.pyplot as plt
def plot_latent_space(vae, input_size=(28,28,1), n=30, figsize=15, scale=1., latents_start=[0,1]):
# display a n*n 2D manifold of digits
canvas = np.zeros((input_size[0]*n, input_size[1]*n, input_size[2]))
# linearly spaced coordinates corresponding to the 2D plot
# of digit classes in the latent space
grid_x = np.linspace(-scale, scale, n)
grid_y = np.linspace(-scale, scale, n)[::-1]
z_sample = np.random.normal(0,1,(1,vae.latent_dim))
for i, yi in enumerate(grid_y):
for j, xi in enumerate(grid_x):
z_sample[0][latents_start[0]], z_sample[0][latents_start[1]]=xi,yi
x_decoded = vae.generate(z_sample)
img = x_decoded[0].numpy().reshape(input_size)
canvas[i*input_size[0] : (i + 1)*input_size[0],
j*input_size[1] : (j + 1)*input_size[1],
: ] = img
plt.figure(figsize=(figsize, figsize))
start_range = input_size[0] // 2
end_range = n*input_size[0] + start_range
pixel_range = np.arange(start_range, end_range, input_size[0])
sample_range_x = np.round(grid_x, 1)
sample_range_y = np.round(grid_y, 1)
plt.xticks(pixel_range, sample_range_x)
plt.yticks(pixel_range, sample_range_y)
plt.xlabel("z[{}]".format(latents_start[0]))
plt.ylabel("z[{}]".format(latents_start[1]))
plt.imshow(canvas, cmap="Greys_r")
plt.show()
plot_latent_space(vae2, input_size=INPUT_DIM, n = 6, latents_start=[20,30], scale=3)
Ahora leemos un lote de imágenes de la BD. Las pasamos por el Codificador y Muestrador.
images = dataset.take(4)
import matplotlib.pyplot as plt
plt.figure(figsize=(12,6), tight_layout=True)
for images in dataset.take(1):
for i in range(18):
ax = plt.subplot(3, 6, i + 1)
plt.imshow(images[i].numpy())
plt.axis('off')
plt.show()
images.shape
TensorShape([384, 128, 128, 3])
x = vae2.encoder_model.predict(images)
z, z_mean, z_log_var= vae2.sampler_model(x)
Ya tenemos para cada imágen el vector codificado . Ahora, seleccionamos dos imágenes al azar, digamos la y la . Entonces usamos su codificación: y y calculamos la combinación convexa de dichos vectores:
(11)
y generamos cada rostro de la combinación (de hecho solo unos pasos).
(12)
y los mostramos las reconstrucciones. Los pasos intermedios muestran una transformación paulatina desde el rostro al rostro . Note que los pasos intermedios preservan la cualidad de aparentar ser un rostro y no son simple superposiciones de imágenes.
def plot_warping(z1, z2, n=(1,5)):
n_trans = np.prod(n)
f, axarr = plt.subplots(n[0],n[1], figsize=(10,6), tight_layout=True)
for i in range(n[0]):
for j in range(n[1]):
alpha = (i*n[1]+j)/(n_trans-1)
z_new = (1-alpha)*z1 + alpha*z2
z_new=tf.expand_dims(z_new, axis=0)
x_decoded = vae.generate(z_new)
img = x_decoded[0].numpy()
axarr[i,j].imshow(img)
axarr[i,j].set_title("{:0.3}".format(alpha))
axarr[i,j].axis('off')
plt.show()
plot_warping(z1=z_mean[46], z2=z_mean[60], n=(3,5))
Nos resta revisar que tan Gaussianas es distribución . Elegimos para un lote de imágenes elegimos dosentradas de los vectores devariables latentes y desplegamos los puntos. Notamos que se concentran al rededor de , no parecen estar correlacionadas y parace monomodal.
for i in range(5):
images = dataset.take(1)
x = vae2.encoder_model.predict(images)
_, z_mean,_= vae2.sampler_model(x)
if i:
Z = np.concatenate((Z, z_mean), axis=0)
else:
Z = z_mean
Z = np.array(Z)
Z.shape
(1920, 150)
def plot_label_clusters(xdata, ydata, limits=10):
# display a 2D plot of the digit classes in the latent space
plt.figure(figsize=(8,7))
plt.scatter(xdata,ydata, s=(2,),c='r')
plt.colorbar()
plt.xlabel("z[11]")
plt.ylabel("z[12]")
plt.xlim([-limits,limits])
plt.ylim([-limits,limits])
plt.show()
xdata = Z[:,0]
ydata = Z[:,3]
plot_label_clusters(xdata=xdata, ydata=ydata, limits=5)
Obtenemos los histogramas para las primeras entradas de las variabes latentes y se observamos cierta “Gaussianidad”; en algunas mas que otras. Aunque hemos preferido previlegiar buenas reconstrucciones que penalizar desviaciones de la Gaussiana, los histogramas lucen muy razonables.
nrows, ncols = 4,6
fig, axs = plt.subplots(nrows,ncols, figsize=(12,8), tight_layout=True)
n_bins = 20
for i in range(nrows):
for j in range(ncols):
idx = i*ncols+j
vals = z[:,idx].numpy()
axs[i,j].hist(vals, bins=n_bins)
axs[i,j].set_title("z[{}]".format(idx))
fig.tight_layout()
plt.show()