I have recently implemented the variational autoencoder proposed in Kingma and Welling (2014)1. This is a rather interesting unsupervised learning model. In contrast to most earlier work, Kingma and Welling (2014)1 optimize the variational lower bound directly using gradient ascent. What makes this possible is a trick they call the reparameterization trick. I am hoping to write another post later on what this trick is and how it helps in reducing the variance of the gradient estimates. Here I just wanted to share my implementation. This model is surprisingly easy to implement. Below is an implementation in theano with a replication of Figure4b from their paper.

I also implemented a sampling layer for lasagne which can be used to implement this model in lasagne. Code is here on github.

Fig4b from Kingma and Welling (2014)

"""
A Theano implementation of the variational autoencoder proposed in
    Kingma, D. P., & Welling, M. (2014). Auto-Encoding Variational Bayes.
    arXiv:1312.6114
28 May 2016
goker erdogan
https://github.com/gokererdogan
"""
import cPickle as pkl
import gzip

import numpy as np
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams

from DeepLearning.common import load_mnist

# functions used for initializing weights and biases
def init_weight(*shape):
    return 0.01 * np.random.randn(*shape)


def init_bias(*shape):
    return 0.01 * np.random.randn(*shape)

if __name__ == "__main__":
    # BUILD THE MODEL

    # random number generator used for sampling latent variables
    srng = RandomStreams(seed=123)

    # model parameters
    minibatch_size = 100
    input_dim = 784
    # number of hidden units in encoder (x -> z) network
    encoder_hidden_dim = 500
    # number of hidden units in decoder (z -> x) network
    decoder_hidden_dim = 500
    # number of latent variables
    latent_dim = 2  # pairs of mu and sigma

    # input to the network
    x = T.fmatrix(name='x')

    # Encoder network hidden layer
    w1 = theano.shared(init_weight(input_dim, encoder_hidden_dim), name='w1')
    b1 = theano.shared(init_bias(encoder_hidden_dim), name='b1')
    h1 = T.tanh(T.dot(x, w1) + b1)

    # Encoder network outputs: means and standard deviations of the Gaussian distribution 
    # p(z|mu, sigma)
    w_mu = theano.shared(init_weight(encoder_hidden_dim, latent_dim), name='w_mu')
    b_mu = theano.shared(init_bias(latent_dim), name='b_mu')
    mu = T.dot(h1, w_mu) + b_mu

    w_sigma = theano.shared(init_weight(encoder_hidden_dim, latent_dim), name='w_sigma')
    b_sigma = theano.shared(init_bias(latent_dim), name='b_sigma')
    sigma = T.dot(h1, w_sigma) + b_sigma

    # Latent variables
    # Sample z from Normal(z|mu, sigma) using the reparameterization trick.
    eps = srng.normal((minibatch_size, latent_dim))  # draw random normal values
    z = mu + (eps * T.exp(sigma))  # calculate latent variables

    # Decoder network hidden layer
    w2 = theano.shared(init_weight(latent_dim, decoder_hidden_dim), name='w2')
    b2 = theano.shared(init_bias(decoder_hidden_dim), name='b2')
    h2 = T.tanh(T.dot(z, w2) + b2)

    # Decoder network output
    w3 = theano.shared(init_weight(decoder_hidden_dim, input_dim), name='w3')
    b3 = theano.shared(init_bias(input_dim), name='b3')
    y = T.nnet.sigmoid(T.dot(h2, w3) + b3)

    # LOSS FUNCTION
    # first KL term in Eqn. 10 in the paper. Acts as a regularization on latent variables.
    ll_bound_kl_term = 0.5 * (1. + T.sum(2. * sigma) -
                              T.sum(T.square(mu)) -
                              T.sum(T.exp(2. * sigma))) / minibatch_size

    # second term in Eqn. 10, measures data fit. 
    # Note because the outputs are 0-1, we use binary crossentropy.
    ll_bound_fit_term = T.sum(-1. * T.nnet.binary_crossentropy(y, x)) / minibatch_size

    # this is the log likelihood lower bound that we want to maximize
    ll_bound = ll_bound_kl_term + ll_bound_fit_term

    # L2 regularization
    # We add L2 regularization on model parameters. This can be achieved by adding 
    # the Frobenius norm of each model parameter to loss function.

    # L2 regularization weight
    lamda = 0.01
    # cost = -log ll bound + (lambda * L2 regularizer)
    cost = -ll_bound + lamda * (T.sum(w1**2) + T.sum(b1**2) + T.sum(w_mu**2) + 
                                T.sum(b_mu**2) + T.sum(w_sigma**2) +
                                T.sum(b_sigma**2) + T.sum(w2**2) + 
                                T.sum(b2**2) + T.sum(w3**2) + 
                                T.sum(b3**2))

    # theano function that calculates the gradient of the cost wrt to model parameters
    dw1, db1, dw_mu, db_mu, dw_sigma, db_sigma, dw2, db2, dw3, db3 = T.grad(cost, 
                                                                            [w1, b1, w_mu, 
                                                                             b_mu, w_sigma, 
                                                                             b_sigma, w2, 
                                                                             b2, w3, b3])
    # theano function for getting the output of the model for a given input
    predict = theano.function([x], y)

    # TRAINING

    learning_rate = 0.001  # my experience shows that larger learning rates lead to divergence

    # number of training epochs, i.e., passes over training set.
    # there are 50.000 samples in training set. 200 epochs means training over 
    # 10.000.000 samples. In the paper, they keep training for much longer (around 100.000.000)
    epoch_count = 500

    # report log_likelihood bound on training after this many training samples
    report_interval = 20000

    # load the training data
    (tx, ty), (vx, vy), (_, _) = load_mnist(path='../datasets')
    train_n = tx.shape[0]
    val_n = vx.shape[0]
    # we load the data into shared variables, this is recommended if you do training on GPU.
    train_x = theano.shared(np.asarray(tx, dtype=np.float32))
    val_x = theano.shared(np.asarray(vx, dtype=np.float32))

    # index of the first sample in batch
    batch_start = T.lscalar()

    # Theano function for training
    # This function feeds the current batch to model and applies 
    # simple gradient descent updates on model parameters
    train_model = theano.function([batch_start], ll_bound,
                                  updates=((w1, w1 - learning_rate * dw1),
                                           (b1, b1 - learning_rate * db1),
                                           (w_mu, w_mu - learning_rate * dw_mu),
                                           (b_mu, b_mu - learning_rate * db_mu),
                                           (w_sigma, w_sigma - learning_rate * dw_sigma),
                                           (b_sigma, b_sigma - learning_rate * db_sigma),
                                           (w2, w2 - learning_rate * dw2),
                                           (b2, b2 - learning_rate * db2),
                                           (w3, w3 - learning_rate * dw3),
                                           (b3, b3 - learning_rate * db3)),
                                  givens={x: train_x[batch_start:(batch_start+minibatch_size)]})

    # Theano function for calculating log ll bound on validation set
    validate_model = theano.function([batch_start], ll_bound,
                                     givens={x: val_x[batch_start:(batch_start+minibatch_size)]})

    # arrays for keeping track of log ll bounds on training and validation sets
    train_ll = []
    val_ll = []

    # TRAINING LOOP
    for e in range(epoch_count):
        ll = 0.0
        for i in range(0, train_n, minibatch_size):
            cll = train_model(i)  # one step of gradient descent
            ll += cll
            # report log ll bound
            if (i + minibatch_size) % report_interval == 0:
                ll = (ll * minibatch_size) / report_interval
                print "|Epoch {0:d}|\tTrain ll: {1:f}".format(e, ll)
                train_ll.append(ll)
                ll = 0.0

        # epoch over, shuffle training data
        train_x = theano.shared(np.asarray(tx[np.random.permutation(train_n)], dtype=np.float32))

        # calculate log ll on validation and report it
        ll = 0.0
        for i in range(0, val_n, minibatch_size):
            cll = validate_model(i)
            ll += cll

        ll = (ll * minibatch_size) / val_n
        print "|Epoch {0:d}|\tVal ll: {1:f}".format(e, ll)
        val_ll.append(ll)

    print("Training over.")

    if latent_dim == 2:
        print("Generating latent space figure")
        img = np.zeros((28*20, 28*20))

        # function for calculating x given z
        generate_fn = theano.function([z], y)

        # calculate min and max latent variable values over validation set
        calc_latent = theano.function([batch_start], z, 
                        givens={x: val_x[batch_start:(batch_start+minibatch_size)]})
        minz = np.inf
        maxz = -np.inf
        for i in range(0, val_n, minibatch_size):
            val_z = calc_latent(i)
            if np.min(val_z) < minz:
                minz = np.min(val_z)
            if np.max(val_z) > maxz:
                maxz = np.max(val_z)

        zvals = np.linspace(minz, maxz, 20)
        for i, z1 in enumerate(zvals):
            for j, z2 in enumerate(zvals):
                xp = generate_fn([np.asarray([z1, z2], dtype=np.float32)])
                img[(i*28):((i+1)*28), (j*28):((j+1)*28)] = xp.reshape((28, 28))

        """
        import matplotlib.pyplot as plt
        plt.style.use('classic')
        plt.imshow(img, cmap='gray')
        """

        from scipy import misc
        misc.imsave('Fig4b.png', img)

Bibliography

  1. Kingma, D. P., & Welling, M. (2014). Auto-Encoding Variational Bayes. arXiv:1312.6114  2