詳解如何使用Keras實現Wassertein GAN
在閱讀論文 Wassertein GAN 時,作者發現理解它最好的辦法就是用代碼來實現其內容。于是在本文中,作者將用自己的在 Keras 上的代碼來向大家簡要介紹一下WGAN。
何為 GAN?
GAN,亦稱為生成對抗網絡(Generative Adversarial Network),它是生成模型中的一類——即一種能夠通過觀察來自特定分布的訓練數據,進而嘗試對這個分布進行預測的模型。這個模型新獲取的樣本「看起來」會和最初的訓練樣本類似。有些生成模型只會去學習訓練數據分布的參數,有一些模型則只能從訓練數據分布中提取樣本,而有一些則可以二者兼顧。
目前,已經存在了很多種類的生成模型:全可見信念網絡(Fully Visible Belief Network)、變分自編碼器(Variational Autoencoder)、玻爾茲曼機(Boltzmann Machine),生成隨機網絡(Generative Stochastic Network),像素循環神經網絡(PixelRNN)等等。以上的模型都因其所表征或接近的訓練數據密度而有所區別。一些模型會去精細的表征訓練數據,另一些則會以某種方式去和訓練數據進行互動——比如說生成模型。GAN 就是這里所說的后者。大部分生成模型的學習原則都可被概括為「最大化相似度預測」——即讓模型的參數能夠盡可能地與訓練數據相似。
GAN 的工作方式可以看成一個由兩部分構成的游戲:生成器(Generator/G)和判別器(Discriminator/D)(一般而言,這兩者都由神經網絡構成)。生成器隨機將一個噪聲作為自己的輸入,然后嘗試去生成一個樣本,目的是讓判別器無法判斷這個樣本是來自訓練數據還是來自生成器的。在判別器這里,我們讓它以監督學習方式來工作,具體而言就是讓它觀察真實樣本和生成器生成的樣本,并且同時用標簽告訴它這些樣本分別來自哪里。在某種意義上,判別器可以代替固定的損失函數,并且嘗試學習與訓練數據分布相關的模式。
何為 Wasserstein GAN?
就其本質而言,任何生成模型的目標都是讓模型(習得地)的分布與真實數據之間的差異達到最小。然而,傳統 GAN 中的判別器 D 并不會當模型與真實的分布重疊度不夠時去提供足夠的信息來估計這個差異度——這導致生成器得不到一個強有力的反饋信息(特別是在訓練之初),此外生成器的穩定性也普遍不足。
Wasserstein GAN 在原來的基礎之上添加了一些新的方法,讓判別器 D 去擬合模型與真實分布之間的 Wasserstein 距離。Wassersterin 距離會大致估計出「調整一個分布去匹配另一個分布還需要多少工作」。此外,其定義的方式十分值得注意,它甚至可以適用于非重疊的分布。
為了讓判別器 D 可以有效地擬合 Wasserstein 距離:
- 其權重必須在緊致空間(compact space)之內。為了達到這個目的,其權重需要在每步訓練之后,被調整到-0.01 到+0.01 的閉區間上。然而,論文作者承認,雖然這對于裁剪間距的選擇并不是理想且高敏感的(highly sensitive),但是它在實踐中卻是有效的。更多信息可參見論文 6 到 7 頁。
- 由于判別器被訓練到了更好的狀態上,所以它可以為生成器提供一個有用的梯度。
- 判別器頂層需要有線性激活。
- 它需要一個本質上不會修改判別器輸出的價值函數。
- K.mean(y_true * y_pred)
以 keras 這段損失函數為例:
- 這里采用 mean 來適應不同的批大小以及乘積。
- 預測的值通過乘上 element(可使用的真值)來最大化輸出結果(優化器通常會將損失函數的值最小化)。
論文作者表示,與 vanlillaGAN 相比,WGAN 有一下優點:
- 有意義的損失指標。判別器 D 的損失可以與生成樣本(這些樣本使得可以更少地監控訓練過程)的質量很好地關聯起來。
- 穩定性得到改進。當判別器 D 的訓練達到了最佳,它便可以為生成器 G 的訓練提供一個有用的損失。這意味著,對判別器 D 和生成器 G 的訓練不必在樣本數量上保持平衡(相反,在 Vanilla GAN 方法中而這是平衡的)。此外,作者也表示,在實驗中,他們的 WGAN 模型沒有發生過一次崩潰的情況。
開始編程!
我們會在 Keras 上實現 ACGAN 的 Wasserstein variety。在 ACGAN 這種生成對抗網絡中,其判別器 D 不僅可以預測樣本的真實與否,同時還可以將其進行歸類。
下方代碼附有部分解釋。
[1] 導入庫文件:
- import os
- import matplotlib.pyplot as plt
- %matplotlib inline
- %config InlineBackend.figure_format = 'retina' # enable hi-res output
- import numpy as np
- import tensorflow as tf
- import keras.backend as K
- from keras.datasets import mnist
- from keras.layers import *
- from keras.models import *
- from keras.optimizers import *
- from keras.initializers import *
- from keras.callbacks import *
- from keras.utils.generic_utils import Progbar
[2].Runtime 配置
- # random seed
- RND = 777
- # output settings
- RUN = 'B'
- OUT_DIR = 'out/' + RUN
- TENSORBOARD_DIR = '/tensorboard/wgans/' + RUN
- SAVE_SAMPLE_IMAGES = False
- # GPU # to run on
- GPU = "0"
- BATCH_SIZE = 100
- ITERATIONS = 20000
- # size of the random vector used to initialize G
- Z_SIZE = 100
[3]生成器 G 每進行一次迭代,判別器 D 都需要進行 D_ITERS 次迭代。
- 由于在 WGAN 中讓判別器質量能夠優化這件事更加重要,所以判別器 D 與生成器 G 在訓練次數上呈非對稱比例。
- 在論文的 v2 版本中,判別器 D 在生成器 G 每 1000 次迭代的前 25 次都會訓練 100 次,此外,判別器也會當生成器每進行了 500 次迭代以后訓練 100 次。
- D_ITERS = 5
[4]其它準備:
- # create output dirif not os.path.isdir(OUT_DIR): os.makedirs(OUT_DIR)
- # make only specific GPU to be utilized
- os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"os.environ["CUDA_VISIBLE_DEVICES"] = GPU
- # seed random generator for repeatability
- np.random.seed(RND)
- # force Keras to use last dimension for image channels
- K.set_image_dim_ordering('tf')
[5]判別器的損失函數:
- 由于判別器 D 一方面力圖在訓練數據分布與生成器 G 生成的數據之間習得一個 Wasserstein 距離的擬合,另一方面判別器 D 又是線性激活的,所以這里我們不需要去修改它的輸出結果。
- 由于已經使用了損失函數 Mean,所以我們可以在不同的批大小之間比較輸出結果。
- 預測結果等于真值(true value)與元素的點乘(element-wise multiplication),為了讓判別器 D 的輸出能夠最大化(通常,優化器都力圖去讓損失函數的值達到最小),真值需要取-1。
- def d_loss(y_true, y_pred): return K.mean(y_true * y_pred)
[6].創建判別器 D
判別器將圖像作為輸入,然后給出兩個輸出:
- 用線性激活來評價生成圖像的「虛假度」(最大化以用于生成圖像)。
- 用 softmax 激活來對圖像種類進行預測。
- 由于權重是從標準差為 0.02 的正態分布中初始化出來的,所以最初的剪裁不會去掉所有的權重。
- def create_D():
- # weights are initlaized from normal distribution with below params
- weight_init = RandomNormal(mean=0., stddev=0.02)
- input_image = Input(shape=(28, 28, 1), name='input_image')
- x = Conv2D( 32, (3, 3),
- padding='same',
- name='conv_1',
- kernel_initializer=weight_init)(input_image)
- x = LeakyReLU()(x)
- x = MaxPool2D(pool_size=2)(x)
- x = Dropout(0.3)(x)
- x = Conv2D( 64, (3, 3),
- padding='same',
- name='conv_2',
- kernel_initializer=weight_init)(x)
- x = MaxPool2D(pool_size=1)(x)
- x = LeakyReLU()(x)
- x = Dropout(0.3)(x)
- x = Conv2D( 128, (3, 3),
- padding='same',
- name='conv_3',
- kernel_initializer=weight_init)(x)
- x = MaxPool2D(pool_size=2)(x)
- x = LeakyReLU()(x)
- x = Dropout(0.3)(x)
- x = Conv2D( 256, (3, 3),
- padding='same',
- name='coonv_4',
- kernel_initializer=weight_init)(x)
- x = MaxPool2D(pool_size=1)(x)
- x = LeakyReLU()(x)
- x = Dropout(0.3)(x)
- features = Flatten()(x)
- output_is_fake = Dense( 1, activation='linear', name='output_is_fake')(features)
- output_class = Dense( 10, activation='softmax', name='output_class')(features) return Model(
- inputs=[input_image], outputs=[output_is_fake,
[7].創建生成器
生成器有兩個輸入:
- 一個尺寸為Z_SIZE的潛在隨機變量。
- 我們希望生成的數字類型(integer o 到 9)。
為了加入這些輸入(input),integer 類型會在內部轉換成一個1 x DICT_LEN(在本例中DICT_LEN = 10)的稀疏向量,然后乘上嵌入的維度為 DICT_LEN x Z_SIZE的矩陣,結果得到一個維度為1 x Z_SIZE的密集向量。然后該向量乘上(點 乘)可能的輸入(input),經過多個上菜樣和卷積層,最后其維度就可以和訓練圖像的維度匹配了。
- def create_G(Z_SIZEZ_SIZE=Z_SIZE):
- DICT_LEN = 10
- EMBEDDING_LEN = Z_SIZE
- # weights are initialized from normal distribution with below params
- weight_init = RandomNormal(mean=0., stddev=0.02)
- # class# input_class = Input(shape=(1, ), dtype='int32', name='input_class')
- # encode class# to the same size as Z to use hadamard multiplication later on
- e = Embedding(
- DICT_LEN, EMBEDDING_LEN,
- embeddings_initializer='glorot_uniform')(input_class)
- embedded_class = Flatten(name='embedded_class')(e)
- # latent var
- input_z = Input(shape=(Z_SIZE, ), name='input_z')
- # hadamard product
- h = multiply([input_z, embedded_class], name='h')
- # cnn part
- x = Dense(1024)(h)
- x = LeakyReLU()(x)
- x = Dense(128 * 7 * 7)(x)
- x = LeakyReLU()(x)
- x = Reshape((7, 7, 128))(x)
- x = UpSampling2D(size=(2, 2))(x)
- x = Conv2D(256, (5, 5), padding='same', kernel_initializer=weight_init)(x)
- x = LeakyReLU()(x)
- x = UpSampling2D(size=(2, 2))(x)
- x = Conv2D(128, (5, 5), padding='same', kernel_initializer=weight_init)(x)
- x = LeakyReLU()(x)
- x = Conv2D( 1, (2, 2),
- padding='same',
- activation='tanh',
- name='output_generated_image',
- kernel_initializer=weight_init)(x) return Mode
[8].將判別器 D 和生成器 G 整合到一個模型中:
- D = create_D()
- D.compile(
- optimizer=RMSprop(lr=0.00005),
- loss=[d_loss, 'sparse_categorical_crossentropy'])
- input_z = Input(shape=(Z_SIZE, ), name='input_z_')
- input_class = Input(shape=(1, ),name='input_class_', dtype='int32')
- G = create_G()
- # create combined D(G) model
- output_is_fake, output_class = D(G(inputs=[input_z, input_class]))
- DG = Model(inputs=[input_z, input_class], outputs=[output_is_fake, output_class])
- DG.compile(
- optimizer=RMSprop(lr=0.00005),
- loss=[d_loss, 'sparse_categorical_crossentropy']
- )
[9].加載 MNIST 數據集:
- # load mnist data
- (X_train, y_train), (X_test, y_test) = mnist.load_data()
- # use all available 70k samples from both train and test sets
- X_train = np.concatenate((X_train, X_test))
- y_train = np.concatenate((y_train, y_test))
- # convert to -1..1 range, reshape to (sample_i, 28, 28, 1)
- X_train = (X_train.astype(np.float32) - 127.5) / 127.5X_train = np.expand_dims(X_train, axis=3)
[10].生成樣本以及將指標和圖像發送到 TensorBorad 的實用工具:
- # save 10x10 sample of generated images
- def generate_samples(n=0, save=True):
- zz = np.random.normal(0., 1., (100, Z_SIZE))
- generated_classes = np.array(list(range(0, 10)) * 10)
- generated_images = G.predict([zz, generated_classes.reshape(-1, 1)])
- rr = [] for c in range(10):
- rr.append(
- np.concatenate(generated_images[c * 10:(1 + c) * 10]).reshape( 280, 28))
- img = np.hstack(rr) if save:
- plt.imsave(OUT_DIR + '/samples_%07d.png' % n, img, cmap=plt.cm.gray) return img
- # write tensorboard summaries
- sw = tf.summary.FileWriter(TENSORBOARD_DIR)
- def update_tb_summary(step, write_sample_images=True):
- s = tf.Summary()
- # losses as is for names, vals in zip((('D_real_is_fake', 'D_real_class'),
- ('D_fake_is_fake', 'D_fake_class'), ('DG_is_fake', 'DG_class')),
- (D_true_losses, D_fake_losses, DG_losses)):
- v = s.value.add()
- v.simple_value = vals[-1][1]
- v.tag = names[0]
- v = s.value.add()
- v.simple_value = vals[-1][2]
- v.tag = names[1]
- # D loss: -1*D_true_is_fake - D_fake_is_fake
- v = s.value.add()
- v.simple_value = -D_true_losses[-1][1] - D_fake_losses[-1][1]
- v.tag = 'D loss (-1*D_real_is_fake - D_fake_is_fake)'
- # generated image if write_sample_images:
- img = generate_samples(step, save=True)
- s.MergeFromString(tf.Session().run(
- tf.summary.image('samples_%07d' % step,
- img.reshape([1, *img.shape, 1]))))
- sw.add_summary(s, step)
- sw.flush()
[11].訓練
訓練過程包含了以下步驟:
- 解除對判別器 D 權重的控制,讓它們變得可學習。
- 調整判別器的權重(調整到-0.01 到+0.01 閉區間上)。
- 向判別器 D 提供真實的樣本,通過在損失函數中將其乘上-1 來盡可能最大化它的輸出,最小化它的值。
- 向判別器 D 提供假的樣本試圖最小化其輸出。
- 按照上文講述的判別器迭代訓練方法重復步驟 3 和 4。
- 固定判別器 D 的權重。
- 訓練一對判別器和生成器,盡力去最小化其輸出。由于這種手段優化了生成器 G 的權重,所以前面已經訓練好了的權重固定的判別器才會將生成的假樣本判斷為真圖像。
- progress_bar = Progbar(target=ITERATIONS)
- DG_losses = []
- D_true_losses = []
- D_fake_losses = []for it in range(ITERATIONS): if len(D_true_losses) > 0:
- progress_bar.update(
- it,
- values=[ # avg of 5 most recent
- ('D_real_is_fake', np.mean(D_true_losses[-5:], axis=0)[1]),
- ('D_real_class', np.mean(D_true_losses[-5:], axis=0)[2]),
- ('D_fake_is_fake', np.mean(D_fake_losses[-5:], axis=0)[1]),
- ('D_fake_class', np.mean(D_fake_losses[-5:], axis=0)[2]),
- ('D(G)_is_fake', np.mean(DG_losses[-5:],axis=0)[1]),
- ('D(G)_class', np.mean(DG_losses[-5:],axis=0)[2])
- ]
- )
- else:
- progress_bar.update(it)
- # 1: train D on real+generated images if (it % 1000) < 25 or it % 500 == 0: # 25 times in 1000, every 500th
- d_iters = 100
- else:
- d_iters = D_ITERS for d_it in range(d_iters):
- # unfreeze D
- D.trainable = True for l in D.layers: l.trainable = True
- # clip D weights for l in D.layers:
- weights = l.get_weights()
- weights = [np.clip(w, -0.01, 0.01) for w in weights]
- l.set_weights(weights)
- # 1.1: maximize D output on reals === minimize -1*(D(real))
- # draw random samples from real images
- index = np.random.choice(len(X_train), BATCH_SIZE, replace=False)
- real_images = X_train[index]
- real_images_classes = y_train[index]
- DD_loss = D.train_on_batch(real_images, [-np.ones(BATCH_SIZE),
- real_images_classes])
- D_true_losses.append(D_loss)
- # 1.2: minimize D output on fakes
- zz = np.random.normal(0., 1., (BATCH_SIZE, Z_SIZE))
- generated_classes = np.random.randint(0, 10, BATCH_SIZE)
- generated_images = G.predict([zz, generated_classes.reshape(-1, 1)])
- DD_loss = D.train_on_batch(generated_images, [np.ones(BATCH_SIZE),
- generated_classes])
- D_fake_losses.append(D_loss)
- # 2: train D(G) (D is frozen)
- # minimize D output while supplying it with fakes,
- # telling it that they are reals (-1)
- # freeze D
- D.trainable = False for l in D.layers: l.trainable = False
- zz = np.random.normal(0., 1., (BATCH_SIZE, Z_SIZE))
- generated_classes = np.random.randint(0, 10, BATCH_SIZE)
- DGDG_loss = DG.train_on_batch(
- [zz, generated_classes.reshape((-1, 1))],
- [-np.ones(BATCH_SIZE), generated_classes])
- DG_losses.append(DG_loss) if it % 10 == 0:
- update_tb_summary(it, write_sample_images=(it
結論
視頻的每一秒都是 250 次訓練迭代。使用 Wasserstein GAN 的一個好處就是它有著損失與樣本質量之間的關系。
附論文地址:https://arxiv.org/pdf/1701.07875.pdf
參考文獻
1. Wasserstein GAN paper (https://arxiv.org/pdf/1701.07875.pdf) – Martin Arjovsky, Soumith Chintala, Léon Bottou
2. NIPS 2016 Tutorial: Generative Adversarial Networks (https://arxiv.org/pdf/1701.00160.pdf) – Ian Goodfellow
3. Original PyTorch code for the Wasserstein GAN paper (https://github.com/martinarjovsky/WassersteinGAN)
4. Conditional Image Synthesis with Auxiliary Classifier GANs (https://arxiv.org/pdf/1610.09585v3.pdf) – Augustus Odena, Christopher Olah, Jonathon Shlens
5. Keras ACGAN implementation (https://github.com/lukedeo/keras-acgan) – Luke de Oliveira
6. Code for the article (https://gist.github.com/myurasov/6ecf449b32eb263e7d9a7f6e9aed5dc2)
原文:https://myurasov.github.io/2017/09/24/wasserstein-gan-keras.html?r
【本文是51CTO專欄機構“機器之心”的原創譯文,微信公眾號“機器之心( id: almosthuman2014)”】