Autocodificador Variacional Convolucional: Conv-VAE

– Variational AutoEncoder –

basado en

Ejemplo de VAE con Tesorflow.Keras F. Chollet, Convolutional Variational AutoEncoder (VAE) trained on MNIST digits Creado 2020/05/03, última versión: 2020/05/03.

Ejemplo de Cápitulo 3 de D. Foster, Generative deep learning, OReilly, 2019

Adaptado a tensorflow.keras: Mariano Rivera, Agosto 2021. Ver 1.0


Recomendamos al lector que si busca una derivación de las VAE mas rigurosa, vea las notas anteriores. En esta nota pretendimos dar una enfoque complementario al que habíamos usado y ser mas intuitivos.

Los ejemplos anteriores de autocodificadores variacionales (VAE) presentados en estas notas presenta los detalles de la derivación de los mismos. Ahora, ya que hemos revisado la teoría, tomaremos un enfoque más pŕactico para motivar la versión convolucional de estos autocodificadores (Conv-VAE).

En este ejercicio revisaremos:

  1. Uso de Tensorflow.datasets para leer un directorio que contienen las imágenes de rostros de la base de datos CelebA.

  2. Derivación de la clase Model para definir los modelos que componen nuestro VAE.

  3. Definición del paso de entrenamiento en nuestro modelo VAE con el cálculo de la función de pérdida ELBO

  4. Especificación de las métricas con que revisamos el avance del entrenamiento

  5. Defición de una función callback que permite salvar los pesos del modelo total cada vez que en el entrenamiento se tienen una mejora en la pérdida.

  6. Recuparer un modelo a partir de su creación y cargar los pesos previamente.

  7. Analizar el comportamiento de la VAE.

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

import os
from glob import glob
import numpy as np

gpu_available = tf.config.list_physical_devices('GPU')
print(gpu_available)
[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
section    = 'vae'
run_id     = '0001'
data_name  = 'faces'
RUN_FOLDER = '../run/{}/'.format(section)
RUN_FOLDER += '_'.join([run_id, data_name])

if not os.path.exists(RUN_FOLDER):
    os.mkdir(RUN_FOLDER)
    os.mkdir(os.path.join(RUN_FOLDER, 'viz'))
    os.mkdir(os.path.join(RUN_FOLDER, 'images'))
    os.mkdir(os.path.join(RUN_FOLDER, 'weights'))

mode =  'build' #'load' #

CelebA Dataset

La base de datos (BD) CelebA fue recolectada por Lui et al. (2015) y consiste de mas de 200 mil imágenes de rostros de personas famosas (celebridades) a las cuales se le asocia un vector de 40 atributos binarios (sexo,pelo castaño, bigote, variación de pose, etc.) que presentan una gran variación de la pose y del fondo. Sin embargo dicha base de datos presenta un gran sesgo sobrerepresentando gente jóven, causásica y atractiva. Otras razas como los latinos estan sub-representadas, por ello han aparecido nuevas BD que tratan de mitigar este problema, como FairFace por Kärkkäinen and J. Joo (2021). A pesar del sesgo implícito en CelebA, ésta es una fuente inicial para probar algoritmos, sin embargo tengamos siempre en cuenta que lelvar un aloritmo entrabnado con una particular base de datos a producción conlleva un etapa de validación con datos del dominio real.

CelabA ha sido apmpliamente utilizada para desarrollar algoritmos de reconocimiento de atributos faciales, detección de rostros, localización de puntos de referencia (o partes faciales) y edición y síntesis de rostros. La BD contiene

(Lui et al., 2015) Z. Lui et al. Deep Learning Face Attributes in the Wild, Proc ICCV 2015.

(Kärkkäinen and Joo, 2021) K. Kärkkäinen and J. Joo, “FairFace: Face Attribute Dataset for Balanced Race, Gender, and Age for Bias Measurement and Mitigation,” 2021 IEEE Winter Conference on Applications of Computer Vision (WACV), 2021, pp. 1547-1557, doi: 10.1109/WACV48630.2021.00159.

Hemos descargado CelebA a nuestra estación de trabajo en el directorio /home/mariano/Data/celebA/imgs_align/img_align_celeba_png. Las imágenes de interés son las que corresponden a las imágenes aproximadamente alineadas respecto a los ojos de los rostros.

A continuación definimos variables globales para nuetro ejemplo.

# run params
DATA_FOLDER   = '/home/mariano/Data/celebA/imgs_align/img_align_celeba_png'
INPUT_DIM     = (128,128,3)
LATENT_DIM    = 150
BATCH_SIZE    = 384
R_LOSS_FACTOR = 100000  # 10000
EPOCHS        = 400
INITIAL_EPOCH = 0

filenames  = np.array(glob(os.path.join(DATA_FOLDER, '*.png')))
n_images        = filenames.shape[0]
steps_per_epoch = n_images//BATCH_SIZE

print('num image files : ', n_images)
print('steps per epoch : ', steps_per_epoch )
num image files :  202599
steps per epoch :  527

Las imágenes las cargaremos en memoria, ajustando su tamaño a (128,128,3) y su rango dinámico (canales) a [0,1][0,1].

En Tensorflow 1.x era muy popular usar data_generators. Éstos nos permitían conectar un lector iterativo a las imágenes y preprocesarlas al momento de su lectura. Ahora,
en Tensorflow 2.x los generadres han sido reemplazados por tensorflow.datasets que permiten tener una funcionalidad similar a los generadores pero son más eficienes en cuanto a la parelización de tareas: es posible leer las imágenes y preprocesarlas en los distintos núcleos (cores) del CPU mientras el GPU procesa el lote previo de datos y realiza el paso de entrenamiento. Como planeamos entrenar por varias épocas, es necesario indicar que una vez que hayamos leido suficientes lotes para agotar la BD, la lectura deberá repetirse desde el inicio, para ello usamos el método repeat().

Con la variable tf.data.AUTOTUNE dejaremos la responsabilidad de lanzar tantas tareas paralelas al Tensorflow en la parte de carga y proprocesamiento del lote de datos. Este es un avance significativo en la mejora de la eficiencia del entrenamiento, pues Python no está diseñado para lanzar tareas concurrentes. El indicando con el método prefetch nos permite usar los núcleos del CPU para leer y cargar los datos (lotes o batches) a la vez que el GPU esta realizando cálculos. Es recomendable realizar la carga de datos, pues ajustará las dimensiones de ls imágenes, antes de aplicar el preprocesamiento de cada dato en el lote mediante el map de una función lambda. Este map es un método de Datasets que también se puede realizarse en paralelo.

En nuestro caso, obtuvimos una mejora de 10x al pasar de generators a datasets. Y no solo eso, nuestra pretención de usar hilos (threads) en el proceso de entrenamiento mediante fit_generator llegó a congelarse.

AUTOTUNE = tf.data.AUTOTUNE
dataset=tf.keras.utils.image_dataset_from_directory(directory  = DATA_FOLDER, 
                                                    labels     = None, 
                                                    batch_size = BATCH_SIZE, 
                                                    image_size = INPUT_DIM[:2],
                                                    shuffle    = True,).repeat()

dataset = dataset.prefetch(buffer_size=AUTOTUNE)

normalization_layer = tf.keras.layers.experimental.preprocessing.Rescaling(1./255)
dataset = dataset.map(lambda x: (normalization_layer(x)), num_parallel_calls=AUTOTUNE)

Found 202599 files belonging to 1 classes.

A continuación mostramos como leer un lote con la librería dataset. Note que se usa el método take y se indica que lote que se carga. El lote es un tensor de dimensión (batch_size, heigth, width, n_channels). Para ilustrar los datos del lote cargado desplegamos las primeras las imágenes, previamente transformadas a formato numpy para poder emplear matplotlib.pyplot.imshow.

import matplotlib.pyplot as plt

plt.figure(figsize=(12,6), tight_layout=True)

for images in dataset.take(1):
    for i in range(18):
        ax = plt.subplot(3, 6, i + 1)
        plt.imshow(images[i].numpy())
        plt.axis('off')

plt.show()
        
images.shape 

png

TensorShape([384, 128, 128, 3])

A continuación definimos los modelos que a su vez componen el modelo VAE.

Partimos de un autocodificador simple como el mostrado en la siguiente figura.

autoencoder

Podemos ver que consta de dos bloques, el codificador que matemáticamente se puede representar como:

(1)
y=C(xr;θc) y = C(x_r; \theta_c)
donde xrRmx_r \in \mathbb{R}^m y yRny \in \mathbb{R}^n. Lo importante aqui es que n>mn>m, es decir el vector yy codifica y comprime la información del vector xx. Luedo de decodificador toma el vector codificado yy y recupera (o al menos aproxima) al vector xrx_{r}, denotamos por xpx_{p} la reconstrucción. El decodificador lo denotamos por

(2)
xp=D(y;θd) x_p = D(y;\theta_d)

Los parámetros θ=[θc,θd]\theta=[\theta_c,\theta_d] del autocodificador se calculan resolviendo

(3)
θ=argminθ  L(xr,xp) \theta^\ast = \underset{\theta}{\arg\min} \; L(x_r, x_p)
donde LL es una métrica del error de reconstrucción.

A continuación mostramos las implementaciónes de los modelos Encoder y Decoder que conforman un autoencoder. Las implementaciones consideran que los datos son las imágenes de la BD CelebA, el vector codificado es de dimensión LATENT_DIM.

from tensorflow import keras
import tensorflow.keras.layers as layers
from tensorflow.keras.layers import Dense, Conv2D, Conv2DTranspose
from tensorflow.keras.layers import Flatten, Reshape, Dropout, BatchNormalization, Activation, LeakyReLU
from tensorflow.keras.models import Model
from tensorflow.keras.models import Sequential

Encoder

class Encoder(keras.Model):
    def __init__(self, input_dim, output_dim, encoder_conv_filters, encoder_conv_kernel_size, encoder_conv_strides, use_batch_norm=True, use_dropout=True, **kwargs):
        '''
        '''
        super(Encoder, self).__init__(**kwargs)

        self.input_dim                 = input_dim
        self.output_dim                = output_dim
        self.encoder_conv_filters      = encoder_conv_filters
        self.encoder_conv_kernel_size  = encoder_conv_kernel_size
        self.encoder_conv_strides      = encoder_conv_strides
        self.n_layers_encoder          = len(self.encoder_conv_filters)
        self.use_batch_norm            = use_batch_norm
        self.use_dropout               = use_dropout

        self.model = self.encoder_model()
        self.built = True
             
    def get_config(self):
        config = super(Encoder, self).get_config()
        config.update({"units": self.units})
        return config
    
    def encoder_model(self):
        '''
        '''
        encoder_input = layers.Input(shape=self.input_dim, name='encoder' )
        x = encoder_input
        
        for i in range(self.n_layers_encoder):            
            x = Conv2D(filters     = self.encoder_conv_filters[i],
                       kernel_size = self.encoder_conv_kernel_size[i],
                       strides     = self.encoder_conv_strides[i],
                       padding     = 'same',
                       name        = 'encoder_conv_' + str(i),)(x)
            if self.use_batch_norm: 
                x = BatchNormalization()(x)
            x = LeakyReLU()(x)
            if self.use_dropout:    
                x = Dropout(rate = 0.25)(x)
                
        self.last_conv_size = x.shape[1:]
        x = Flatten()(x)
        encoder_output = Dense(self.output_dim)(x)
        model = keras.Model(encoder_input, encoder_output)
        return model
        
        
    def call(self, inputs):
        '''
        '''
        return self.model(inputs) 

Decoder

class Decoder(keras.Model):
    def __init__(self, input_dim, input_conv_dim, 
                 decoder_conv_t_filters, decoder_conv_t_kernel_size, decoder_conv_t_strides, 
                 use_batch_norm=True, use_dropout=True, **kwargs):

        '''
        '''
        super(Decoder, self).__init__(**kwargs)

        self.input_dim                 = input_dim
        self.input_conv_dim            = input_conv_dim
                
        self.decoder_conv_t_filters    = decoder_conv_t_filters 
        self.decoder_conv_t_kernel_size= decoder_conv_t_kernel_size
        self.decoder_conv_t_strides    = decoder_conv_t_strides
        self.n_layers_decoder          = len(self.decoder_conv_t_filters)
        
        self.use_batch_norm            = use_batch_norm
        self.use_dropout               = use_dropout

        self.model = self.decoder_model()
        self.built = True
             
    def get_config(self):
        config = super(Decoder, self).get_config()
        config.update({"units": self.units})
        return config
    
    def decoder_model(self):
        '''
        '''    
        decoder_input = layers.Input(shape=self.input_dim, name='decoder' )
        x = Dense(np.prod(self.input_conv_dim))(decoder_input)
        x = Reshape(self.input_conv_dim)(x)

        for i in range(self.n_layers_decoder):
            x = Conv2DTranspose(filters     = self.decoder_conv_t_filters[i],
                                kernel_size = self.decoder_conv_t_kernel_size[i],
                                strides     = self.decoder_conv_t_strides[i],
                                padding     = 'same',
                                name        = 'decoder_conv_t_' + str(i))(x)
            if i < self.n_layers_decoder - 1:

                if self.use_batch_norm:   
                    x = BatchNormalization()(x)
                x = LeakyReLU()(x)
                if self.use_dropout:      
                    x = Dropout(rate = 0.25)(x)
            else:
                x = Activation('sigmoid')(x)
        decoder_output = x
        model = keras.Model(decoder_input, decoder_output)
        return model
        
    def call(self, inputs):
        '''
        '''
        return self.model(inputs) 

Generador de datos a partir de un Autocodificador

El propósito de tenemos es estimar la distribución P(X)P(X) de los datos para luego poderla muestrear, es decir generar muestras xP(X)x \sim P(X) tal que se cumpla:

  1. Las imágenes generadas sean similares a los de la BD de entrenamiento, rostros en CelebA.

  2. El generador es paramétrizado por un vector de variables latentes (de menor dimensión que las imágenes) las cuelas tienen una distribución conocida y simple de muestrear.

  3. El generador es capaz de generar datos que, aunque similares (rostros), no esten presentes en la BD de entrenamiento; no es una simple memoria.

El primer punto lo podriamos satisfacer con un autocodificador simple: asuminedo al vector yy de la figura anterior el vector de espacio latentes. Sin embargo, para el punto dos no es claro que distribución P(Y)P(Y) tiene yy. Por lo tanto, no podemos obtener nuevas muestras del espacio latente yP(Y)y^\prime \sim P(Y) para generar nuevos datos. es decir no podemos hacer

(4)
yP(Y) y^\prime \sim P(Y)
para luego generar el nuevo dato con

(5)
x=D(y) x^\prime = D(y^\prime)

El punto tres es aun mas complejo, ¿que pasa si utiliza una yy^\prime que caen entre las YY de la BD? ¿Estará en ese punto definida la P(y)P(y^\prime)? ¿correponderá a algo como un rostro? Es decir, ¿Tendrá suficinte poder de generalización el podrá para procesar variables latentes no vistas dureate el entrenamiento?

Los puntos dos y tres están muy relacionados, si las variables latentes tienen una distribución simple monomodal será fácil de muestrear. Además debemos imponer regularidad en P(Y)P(Y): imágenes similares provienen de variables latentes similares.

Aquí es donde entran en acción los autoencodificadores variacionales.

Primero, lo que haremos es dejar a yy con su distribución extraña que tiene y a partir de ella obtener una muestrade variable latente zz que se distribuye en forma simple: zN(0,I)z \sim \mathcal{N}(0,I). Es tienen una distribución Gaussiana multivariada con media cero y varianza igual a uno en cada variable. Para desarrollar esta parte hagamos la siguiente suposición, teemos varias muestras tomadas de una misma persona, tal vez de una pequeña secuencia de un video, luego dichas muestras varian ligeramente entre ellas. Entonces es de esperarse que también sus variables latentes sean similares y caigan alrededor de una media (para la persona específica) y su variabilidad esté determinada por una varianza (igualmente para dicha persona). Ahora, si tenemos varios videos de rostros para distintas personas, cada uno de ellos tendra su propia media y varianza. A lo que vamos es que dada rostro tienen en realidad una pequeña distribución, como una nube alrededor de un punto (su media individual). Si muestreamos dicha nube debemos tener imágenes del rostro de una misma persona con ligeras variaciones: como de pose de posición de las cejas, de los labios, etc. Con ello lograríamos imponer regularidad a nuestra distribución.

Como solo tenemos rostros aislados y no videos, lo que haremos es que cada rostro xix_i genera una codificación yiy_i en forma determinista, pero dicha codificación puede producir distintas variables latentes {zi}t\{z_i\}_t, con una distribución individual P(zi)P(z_i). Luego impondremos a que distribución de toda la población P(Z)=iP(zi)P(Z) = \prod_i P(z_i) sea la normal multivariada N(0,I))\mathcal{N}(0,I)). Con ello logramos que las P(zi)P(z_i) se concentren y no se dispercen dejando huecos y sea fácil de generar nuevas zz^\prime que produzcan restros realistas.

Queremos algo como lo siguiente

(6)
xCySzDx^ x \overset{C}{\mapsto} y \overset{S}{\leadsto} z \overset{D} {\mapsto}\hat x

Sin embargo el problma es que la función SS es estocástica y en principio no podriamos impelementar la retropropagación del gradiente desde el error (pérdida) hacia el codificador.

la forma de darle la vuelta a este problema es usar el llamado truco de la parametrización que consiste en estimar la media μi\mu_i y la varianza σi2\sigma^2_i de la distribución y luego calcular zi=μi+σϵz_i = \mu_i + \sigma \epsilon donde ϵ\epsilon se distribuye normal N(0,1)\mathcal{N}(0,1). Eso lo podemos ver como

(7)
xCyf(μ,σ2)gundefinedSzDx^ x \overset{C}{\mapsto} y \overbrace{ \overset{f}{\mapsto} (\mu , \sigma^2) \overset{g}{\mapsto} }^{S} z \overset{D}{\mapsto} \hat x

donde ff representa una transformación mediante capas densas y gg el truco de la parametrización:

(8)
zi=μi+σϵ      con      ϵN(0,1). z_i = \mu_i + \sigma \epsilon \;\;\;\text{con}\;\;\; \epsilon \sim \mathcal{N}(0,1).

Sampler

A continuación implementamos el modelo Sampler que realiza la tarea arriba descrita.

class Sampler(keras.Model):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""
    def __init__(self, latent_dim, **kwargs):
        super(Sampler, self).__init__(**kwargs)
        self.latent_dim = latent_dim
        self.model      = self.sampler_model()
        self.built      = True

    def get_config(self):
        config = super(Sampler, self).get_config()
        config.update({"units": self.units})
        return config
    
    def sampler_model(self):
        '''
        input_dim is a vector in the latent (codified) space
        '''
        input_data = layers.Input(shape=self.latent_dim)
        z_mean     = Dense(self.latent_dim, name="z_mean")(input_data)
        z_log_var  = Dense(self.latent_dim, name="z_log_var")(input_data)

        self.batch = tf.shape(z_mean)[0] 
        self.dim   = tf.shape(z_mean)[1]

        epsilon = tf.keras.backend.random_normal(shape=(self.batch, self.dim))
        z = z_mean + tf.exp(0.5 * z_log_var) * epsilon
        model = keras.Model(input_data, [z, z_mean, z_log_var])
        return model
        
    def call(self, inputs):
        '''
        '''
        return self.model(inputs)
    

Noten que el componente estocástico (generación de ϵ\epsilon) esta fuera del la ruta de cálculo desde los datos hasta el error y que dicho componente estocástico no tienen parámetros que requieran ser entrenados.

La siguiente figura ilustra al autoencodificador variacional.

vae

VAE

Ahora implementamos el modelo del VAE. Es importante que la función de costo incluya los siguientes elementos:

  1. Error de reconstucción. Medir la discrepancia entre la imágen de entrada xix_i y la imagen reconstruida x^i\hat x_i.

  2. Desviación de distribución de las variables latentes P(Z)P(Z) respecto ala normal multivariada N(0,I)\mathcal{N}(0,I)

Para el primer punto usaremos la el Error Absoluto Promedio (MAE) y para el segundo la divergencia de Kullback-Leibler entre distribuciones, que por ser Gaussianas no correlacionadas y una de ellas la normal N(0,I)\mathcal{N}(0,I) toma una forma muy simple. Usaremos el parámetro R_LOSS_FACTOR para ponderar cada uno de los errores.

Adicionalmente, hemos preferido en esta nota implementar el método train_step que corresponde a lo realizado dentro del lazo de entrenamiento. Con ello pretendemos mostrar como se implementa, se evalua una párdida (loss) particular y como se definen los valores a los que el proceso de entrenamiento da seguimiento: loss, loss_reconstruction y loss_kl.

Usamos también una callback que nos permite salvar los pesos del modelo cada vez que se logre una mejora en loss.

class VAE(keras.Model):
    def __init__(self, r_loss_factor=1, summary=False, **kwargs):
        super(VAE, self).__init__(**kwargs)

        self.r_loss_factor               = r_loss_factor

        # Architecture
        self.input_dim                 = INPUT_DIM
        self.latent_dim                = LATENT_DIM
        self.encoder_conv_filters      = [64,64,64,64]
        self.encoder_conv_kernel_size  = [3,3,3,3]
        self.encoder_conv_strides      = [2,2,2,2]
        self.n_layers_encoder          = len(self.encoder_conv_filters)

        self.decoder_conv_t_filters    = [64,64,64,3]
        self.decoder_conv_t_kernel_size= [3,3,3,3]
        self.decoder_conv_t_strides    = [2,2,2,2]
        self.n_layers_decoder          = len(self.decoder_conv_t_filters)

        self.use_batch_norm            = True
        self.use_dropout               = True
    
        self.total_loss_tracker          = tf.keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = tf.keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker             = tf.keras.metrics.Mean(name="kl_loss")
        self.mae                         = tf.keras.losses.MeanAbsoluteError()

        # Encoder
        self.encoder_model = Encoder(input_dim                = self.input_dim,
                                     output_dim               = self.latent_dim,
                                     encoder_conv_filters     = self.encoder_conv_filters, 
                                     encoder_conv_kernel_size = self.encoder_conv_kernel_size, 
                                     encoder_conv_strides     = self.encoder_conv_strides, 
                                     use_batch_norm           = self.use_batch_norm, 
                                     use_dropout              = self.use_dropout)
        self.encoder_conv_size = self.encoder_model.last_conv_size
        if summary: 
            self.encoder_model.summary()
        
        # Sampler
        self.sampler_model = Sampler(latent_dim  = self.latent_dim)     
        if summary: 
            self.sampler_model.summary()
        
        # Decoder
        self.decoder_model = Decoder(input_dim                 = self.latent_dim, 
                                     input_conv_dim            = self.encoder_conv_size,
                                     decoder_conv_t_filters    = self.decoder_conv_t_filters,
                                     decoder_conv_t_kernel_size= self.decoder_conv_t_kernel_size, 
                                     decoder_conv_t_strides    = self.decoder_conv_t_strides, 
                                     use_batch_norm            = self.use_batch_norm, 
                                     use_dropout               = self.use_dropout)
        if summary: self.decoder_model.summary()
        
        self.built = True

    @property
    def metrics(self):
        return [self.total_loss_tracker,
                self.reconstruction_loss_tracker,
                self.kl_loss_tracker,]

    @tf.function
    def train_step(self, data):
        '''
        '''
        with tf.GradientTape() as tape:
            
            # predict
            x                    = self.encoder_model(data)
            z, z_mean, z_log_var = self.sampler_model(x) 
            pred                 = self.decoder_model(z)

            # loss
            r_loss     = self.r_loss_factor * self.mae(data, pred)
            kl_loss    = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss    = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            total_loss = r_loss + kl_loss
        
        # gradient
        grads = tape.gradient(total_loss, self.trainable_weights)
        # train step
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        
        # compute progress
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(r_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {"loss":                self.total_loss_tracker.result(),
                "reconstruction_loss": self.reconstruction_loss_tracker.result(),
                "kl_loss":             self.kl_loss_tracker.result(),}
    
    @tf.function
    def generate(self, z_sample):
        '''
        We use the sample of the N(0,I) directly as  
        input of the deterministic generator. 
        '''
        return self.decoder_model(z_sample)

    @tf.function
    def codify(self, images):
        '''
        For an input image we obtain its particular distribution:
        its mean, its variance (unvertaintly) and a sample z of such distribution.
        '''
        x = self.encoder_model.predict(images)
        z, z_mean, z_log_var= self.sampler_model(x)
        return z, z_mean, z_log_var
    
    # implement the call method
    @tf.function
    def call(self, inputs, training=False):
        '''
        '''
        tmp1, tmp2 = self.encoder_model.use_Dropout,self.decoder_model.use_Dropout
        if not training:
            self.encoder_model.use_Dropout, self.decoder_model.use_Dropout = False,False
            
        x                    = self.encoder_model(inputs)
        z, z_mean, z_log_var = self.sampler_model(x) 
        pred                 = self.decoder_model(z)
        
        self.encoder_model.use_Dropout, self.decoder_model.use_Dropout = tmp1, tmp2
        return pred

Train

Compile

vae = VAE(r_loss_factor=R_LOSS_FACTOR, summary=True)
Model: "encoder"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
model (Functional)           (None, 150)               728150    
=================================================================
Total params: 728,150
Trainable params: 727,638
Non-trainable params: 512
_________________________________________________________________
Model: "sampler"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
model_1 (Functional)         [(None, 150), (None, 150) 45300     
=================================================================
Total params: 45,300
Trainable params: 45,300
Non-trainable params: 0
_________________________________________________________________
Model: "decoder"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
model_2 (Functional)         (None, 128, 128, 3)       731779    
=================================================================
Total params: 731,779
Trainable params: 731,395
Non-trainable params: 384
_________________________________________________________________
vae.summary()
Model: "vae"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
encoder (Encoder)            multiple                  728150    
_________________________________________________________________
sampler (Sampler)            multiple                  45300     
_________________________________________________________________
decoder (Decoder)            multiple                  731779    
=================================================================
Total params: 1,505,235
Trainable params: 1,504,333
Non-trainable params: 902
_________________________________________________________________
vae.compile(optimizer=keras.optimizers.Adam())
from tensorflow.keras.callbacks import ModelCheckpoint
filepath = 'best_weight_model.h5'
checkpoint = ModelCheckpoint(filepath=filepath, 
                             monitor='loss',
                             verbose=1, 
                             save_best_only=True,
                             save_weights_only=True,
                             mode='min')
callbacks = [checkpoint]

Fit

Entrenamos con el procedimiento normal de keras: invocando al método fit. Sin embargo, internamente el lazo de entrenamiento estará llamando a nuestra implementación de train_step.

vae.fit(dataset,
        batch_size      = BATCH_SIZE,
        epochs          = EPOCHS,
        initial_epoch   = INITIAL_EPOCH,
        steps_per_epoch = steps_per_epoch,
        callbacks       = callbacks)
Epoch 1/400
527/527 [==============================] - 46s 86ms/step - loss: 7516.1080 - reconstruction_loss: 6913.9912 - kl_loss: 371.8324

Epoch 00001: loss improved from inf to 7285.82129, saving model to best_weight_model.h5
Epoch 2/400
527/527 [==============================] - 46s 87ms/step - loss: 6812.7973 - reconstruction_loss: 6329.5083 - kl_loss: 385.5406

Epoch 00002: loss improved from 7285.82129 to 6715.04883, saving model to best_weight_model.h5
Epoch 3/400
527/527 [==============================] - 46s 87ms/step - loss: 6504.1279 - reconstruction_loss: 6068.6631 - kl_loss: 390.3454

Epoch 00003: loss improved from 6715.04883 to 6459.00439, saving model to best_weight_model.h5
Epoch 4/400
527/527 [==============================] - 46s 87ms/step - loss: 6331.8780 - reconstruction_loss: 5902.8765 - kl_loss: 392.8752

. . .

Epoch 00398: loss improved from 5172.32568 to 5171.25830, saving model to best_weight_model.h5
Epoch 399/400
527/527 [==============================] - 47s 89ms/step - loss: 5171.2232 - reconstruction_loss: 4809.0151 - kl_loss: 363.3829

Epoch 00399: loss did not improve from 5171.25830
Epoch 400/400
527/527 [==============================] - 47s 89ms/step - loss: 5171.5463 - reconstruction_loss: 4809.2295 - kl_loss: 363.3069

Epoch 00400: loss did not improve from 5171.25830

Muy bien terminamos el entrenamiento y salvamos los pesos de nuestro modelo, los que resultaron de la última época de entrenamiento.

vae.save_weights("model_vae_faces_1e4.h5")

Sin embargo, sabemos que los mejores pesos no necesariamente correpondel a los calculados en la última época, como es el caso; sino a los que almacen nuestra callback en el archivo "best_weight_model.h5".

Pero son solo los pesos, no el modelo completo. Para recuperar nuestro modelo debemos construirlo primero, lo llamamos vae2 para distinguirlo del que usamos en entrenamiento. Una vez construido el modelo, cargamos en él los pesos salvados.

vae2 = VAE(r_loss_factor=R_LOSS_FACTOR)
vae2.load_weights("best_weight_model.h5")

Resultados

Como primer ejercicio generamos una variable en el espacio latente:

(9)
zN(0,I) z \sim \mathcal{N}(0,I)

Luego seleccionamos dos entradas de dicho vector: ziz_i y zjz_j. Entonces dejando fijas todas las entradas de zz exepto ziz_i y zjz_j exploramos las imagenes que se generas variando dichas entradas.

Notamos que una variable (eje horizontal) parece que hace variar el rostro de tener características que van de masculinas a femeninas. La otra variable (eje vertical) parece asociarse con la sonrisa (alegría o tristeza del rostro).

Exploramos otras combinaciones de variables y vimos que algunas estan encontrar que estan asociadas con la orientacion del rostro, el grueso de las cejas, usar o no lentes, etc. Sin embargo notamos que lo que para un rostro es válido para otro no: la misma variable puedede star asociada a distintas características.

Podemos encontramos cierta analogía entre las variables latente y los cromosomas: así como gen puede asociarse a distintos fenotipos, o varios genes controlar un fenotipo; igual pasa con las variables latentes en nuestro generador. En nuestra analogia, el valor de una variable latente correponde a la expresión (real) de un gen (binario). Además la codificación “genética” en las variable latente cambia de entrenamiento a entrenamiento. Por lo que repetimos el entrenamiento, seguramente la figura generada abajo tenga distinta interpretación.

Notemos como el generador, aunque genera rostros pausibles, tienen dificultades para representar un fondo razonable, ello debido a la gran variabilidad de fondos en la BD CelebA.

import matplotlib.pyplot as plt

def plot_latent_space(vae, input_size=(28,28,1), n=30, figsize=15,  scale=1., latents_start=[0,1]):
    # display a n*n 2D manifold of digits
    canvas = np.zeros((input_size[0]*n, input_size[1]*n, input_size[2]))
    # linearly spaced coordinates corresponding to the 2D plot
    # of digit classes in the latent space
    grid_x = np.linspace(-scale, scale, n)
    grid_y = np.linspace(-scale, scale, n)[::-1]

    z_sample = np.random.normal(0,1,(1,vae.latent_dim))
    for i, yi in enumerate(grid_y):
        for j, xi in enumerate(grid_x):
            z_sample[0][latents_start[0]], z_sample[0][latents_start[1]]=xi,yi
            x_decoded = vae.generate(z_sample)
            img = x_decoded[0].numpy().reshape(input_size)
            canvas[i*input_size[0] : (i + 1)*input_size[0],
                   j*input_size[1] : (j + 1)*input_size[1],
                   : ] = img

    plt.figure(figsize=(figsize, figsize))
    start_range    = input_size[0] // 2
    end_range      = n*input_size[0] + start_range
    pixel_range    = np.arange(start_range, end_range, input_size[0])
    sample_range_x = np.round(grid_x, 1)
    sample_range_y = np.round(grid_y, 1)
    plt.xticks(pixel_range, sample_range_x)
    plt.yticks(pixel_range, sample_range_y)
    plt.xlabel("z[{}]".format(latents_start[0]))
    plt.ylabel("z[{}]".format(latents_start[1]))
    plt.imshow(canvas, cmap="Greys_r")
    plt.show()


plot_latent_space(vae2, input_size=INPUT_DIM, n = 6, latents_start=[20,30], scale=3)

png

Ahora leemos un lote de imágenes de la BD. Las pasamos por el Codificador y Muestrador.

images = dataset.take(4)
import matplotlib.pyplot as plt

plt.figure(figsize=(12,6), tight_layout=True)

for images in dataset.take(1):
    for i in range(18):
        ax = plt.subplot(3, 6, i + 1)
        plt.imshow(images[i].numpy())
        plt.axis('off')

plt.show()
        
images.shape 

png

TensorShape([384, 128, 128, 3])
x = vae2.encoder_model.predict(images)
z, z_mean, z_log_var= vae2.sampler_model(x)

Ya tenemos para cada imágen xix_i el vector codificado (zi,μi,σi2)(z_i, \mu_i, \sigma^2_i). Ahora, seleccionamos dos imágenes al azar, digamos la xix_i y la xjx_j. Entonces usamos su codificación: μi\mu_i y μj\mu_j y calculamos la combinación convexa de dichos vectores:

(11)
μα=(1α)μi+αμj      para      α[0,1] \mu_{\alpha}= (1-\alpha) \mu_i + \alpha\mu_j \;\;\; \text{para} \;\;\; \alpha \in [0,1]

y generamos cada rostro de la combinación (de hecho solo unos pasos).

(12)
x^α=D(μα) \hat x_{\alpha} = D(\mu_{\alpha})

y los mostramos las reconstrucciones. Los pasos intermedios muestran una transformación paulatina desde el rostro xix_i al rostro xjx_j. Note que los pasos intermedios preservan la cualidad de aparentar ser un rostro y no son simple superposiciones de imágenes.

def plot_warping(z1, z2, n=(1,5)):
    
    n_trans = np.prod(n)

    f, axarr = plt.subplots(n[0],n[1], figsize=(10,6),  tight_layout=True)
    for i in range(n[0]):
        for j in range(n[1]):
            alpha = (i*n[1]+j)/(n_trans-1)
            z_new = (1-alpha)*z1 + alpha*z2
            z_new=tf.expand_dims(z_new, axis=0)
            x_decoded = vae.generate(z_new)
            img = x_decoded[0].numpy()
            axarr[i,j].imshow(img)
            axarr[i,j].set_title("{:0.3}".format(alpha))
            axarr[i,j].axis('off')
    
    plt.show()

plot_warping(z1=z_mean[46], z2=z_mean[60], n=(3,5))

png

Nos resta revisar que tan Gaussianas es distribución P(Z)P(Z). Elegimos para un lote de imágenes elegimos dosentradas de los vectores devariables latentes y desplegamos los puntos. Notamos que se concentran al rededor de (0,0)(0,0), no parecen estar correlacionadas y parace monomodal.

for i in range(5):
    images = dataset.take(1)
    x = vae2.encoder_model.predict(images)
    _, z_mean,_= vae2.sampler_model(x)
    if i:
        Z = np.concatenate((Z, z_mean), axis=0)
    else:
        Z = z_mean
        
Z = np.array(Z)
Z.shape        
(1920, 150)
def plot_label_clusters(xdata, ydata, limits=10):
    # display a 2D plot of the digit classes in the latent space
    plt.figure(figsize=(8,7))
    plt.scatter(xdata,ydata, s=(2,),c='r')
    plt.colorbar()
    plt.xlabel("z[11]")
    plt.ylabel("z[12]")
    plt.xlim([-limits,limits])
    plt.ylim([-limits,limits])
    plt.show()


xdata = Z[:,0]
ydata = Z[:,3]
plot_label_clusters(xdata=xdata, ydata=ydata, limits=5)

png

Obtenemos los histogramas para las primeras NN entradas de las variabes latentes y se observamos cierta “Gaussianidad”; en algunas mas que otras. Aunque hemos preferido previlegiar buenas reconstrucciones que penalizar desviaciones de la Gaussiana, los histogramas lucen muy razonables.

nrows, ncols = 4,6
fig, axs = plt.subplots(nrows,ncols, figsize=(12,8), tight_layout=True)
n_bins = 20

for i in range(nrows):
    for j in range(ncols):
        idx = i*ncols+j
        vals = z[:,idx].numpy() 
        axs[i,j].hist(vals, bins=n_bins)
        axs[i,j].set_title("z[{}]".format(idx))

fig.tight_layout()
plt.show()

png