Evan 2019-12-03
In real-world deep learning applications, a common problem is that some classes have far more training instances than others. Such class-imbalanced datasets are common across domains such as healthcare, banking, and security. On these datasets, learning algorithms tend to be biased toward the majority classes, so minority-class instances are misclassified at a higher rate.
Different strategies have been proposed to address this problem, including oversampling, undersampling, two-phase training, and cost-sensitive learning. Methods that generate artificial data for the minority classes form a more general approach. This article is about using a deep convolutional generative adversarial network (DC-GAN) to reduce this kind of imbalance in a machine learning dataset and thereby improve classification performance.
In this article, we will discuss the following topics: how GANs generate images, tricks for stabilizing GAN training, and a step-by-step DC-GAN implementation that generates minority-class images.
Developing a GAN for image generation requires a discriminator convolutional neural network that classifies a given image as either real or generated, and a generator model that uses transposed convolutional layers to turn an input into a two-dimensional grid of pixel values, i.e., an image.
The generator and discriminator models compete in a zero-sum game: any improvement in one model comes at the expense of the other's performance. The result is a very unstable training process that often fails outright.
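As a reminder of what this game optimizes, here is the standard GAN objective from Goodfellow et al. (general background, not specific to this post's code): the discriminator D maximizes, and the generator G minimizes,

V(D, G) = E_{x~p_data}[log D(x)] + E_{z~p_z}[log(1 - D(G(z)))]

where z is a random latent vector. The opposing signs on the two players' updates are exactly what makes training unstable.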
A few tricks help stabilize training. The numbered "GAN training hacks" referenced throughout the code below include:
downsampling with strided convolutions instead of pooling
using LeakyReLU activations
training with the Adam optimizer
using a tanh activation in the generator's output layer
normalizing input images to [-1, 1]
using different learning rates for the generator and discriminator
updating the discriminator with separate batches of real and fake samples
monitoring the losses during training
We will use a DC-GAN to create artificial samples for class 4 of the "Diabetic Retinopathy Detection" (https://www.kaggle.com/c/diabetic-retinopathy-detection/overview) machine learning dataset, which has four classes; class 1 has 13,000 samples, while class 4 has only 600.
Import all the necessary Python libraries.
import os
import tensorflow as tf
from keras.utils import plot_model
import pydot
import graphviz
import numpy as np  # linear algebra
from sklearn.model_selection import train_test_split
import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)
from tqdm import tqdm
from numpy import expand_dims, zeros, ones, vstack
from numpy.random import randn, randint
from keras.optimizers import Adam, SGD
from keras.models import Sequential
from keras.layers import Dense, Reshape, Flatten, Conv2D, Conv2DTranspose, LeakyReLU, Dropout, BatchNormalization
from matplotlib import pyplot
from keras.preprocessing import image
from cv2 import cv2
from PIL import Image

# allow TensorFlow to grow GPU memory usage as needed (TF 1.x API)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
sess = tf.Session(config=config)
This machine learning dataset ships as several compressed files, which we need to extract into train/test folders containing the corresponding images. All the labels for the training images are provided in a separate csv file.
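If you are working in a notebook, the extraction can be done with a few lines of Python. Here is a minimal sketch, assuming the downloaded archives have already been combined into regular .zip files sitting directly under /storage/ (the paths are assumptions; adjust them to your setup):

import zipfile
import glob

# extract every archive under /storage/ into the same folder (paths assumed; adjust as needed)
for zpath in glob.glob('/storage/*.zip'):
    with zipfile.ZipFile(zpath) as zf:
        zf.extractall('/storage/')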
In the Python code below, we read a csv file containing the labels and image names. Before moving on, we need to perform a few sanity checks (appending the .jpeg extension and deleting every image with a size of 0 KB).
In this machine learning dataset, classes 3 and 4 are the minority classes. We will train the GAN to generate images for class 4.
# Read all the labels from the CSV file
df_csv = pd.read_csv('/storage/trainLabels.csv')
df_csv['image'] = df_csv['image'].astype(str) + '.jpeg'

## Delete all the images of size zero (0 KB); no need to repeat this step when rerunning the program.
## There are multiple reasons for zero-size files, such as issues while downloading the database,
## limited space on the VM, or corrupted data at the source.
cd /storage/train/
!find /storage/train/ -size 0 -print
!find /storage/train/ -size 0 -delete
!ls -1 >> /storage/name.txt  # list all the remaining/useful images into a txt file

## Remove extra entries (deleted images) from the df_csv dataframe
df_txt = pd.read_table('/storage/name.txt', header=None)
df_txt.columns = ['name']
df = df_csv[df_csv.image.isin(df_txt.name.values)]
len(df[df['level'] == 4])

# we will create a separate data frame for the desired class
df_4 = df[df['level'] == 4]
df_4.head()
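As a quick sanity check on the imbalance itself, the label distribution can be inspected directly from the cleaned dataframe. A minimal sketch, using the df built above:

# count how many images belong to each severity level
print(df['level'].value_counts().sort_index())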
The following Python code defines the discriminator and the generator. The discriminator downsamples the input image with convolutional layers using 2 x 2 strides (tricks #1 and #2). The output layer uses a sigmoid activation function to predict whether the input sample is real or fake. The model is trained with the Adam optimizer (trick #4) to minimize the binary cross-entropy loss function.
The generator is defined with Conv2DTranspose layers using 2 x 2 strides to upsample the image to 128 x 128. The output layer uses a tanh activation function to keep the output values in the desired range of [-1, 1] (trick #4). We deliberately use different learning rates for the discriminator and the generator (trick #6).
The define_gan() function takes the already-defined generator and discriminator models and creates a new logical model that combines them.
# define the standalone discriminator model using GAN training hacks
def define_discriminator(in_shape=(128,128,3)):
    model = Sequential()
    # input layer with image size of 128x128; since it is a colored image it has 3 channels
    model.add(Conv2D(16, (3,3), padding='same', input_shape=in_shape))
    model.add(LeakyReLU(alpha=0.2))
    # downsample to 64x64 using strides of 2,2 and LeakyReLU
    model.add(Conv2D(8, (3,3), strides=(2,2), padding='same'))
    model.add(LeakyReLU(alpha=0.2))
    # downsample to 32x32
    model.add(Conv2D(16, (3,3), strides=(2,2), padding='same'))
    model.add(LeakyReLU(alpha=0.2))
    # downsample to 16x16
    model.add(Conv2D(8, (3,3), strides=(2,2), padding='same'))
    model.add(LeakyReLU(alpha=0.2))
    # now the image size is down to 16x16
    # classifier
    model.add(Flatten())
    model.add(Dropout(0.2))
    model.add(Dense(1, activation='sigmoid'))
    # compile model with the Adam optimizer; the learning rate (2e-4) is higher than the generator's
    opt = Adam(lr=0.0002, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])
    return model

# define the standalone generator model using GAN training hacks
def define_generator(latent_dim):
    model = Sequential()
    # foundation for a 16x16 image
    n_nodes = 256 * 16 * 16
    model.add(Dense(n_nodes, input_dim=latent_dim))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Reshape((16, 16, 256)))
    # upsample to 32x32, using strides and LeakyReLU
    model.add(Conv2DTranspose(128, (4,4), strides=(2,2), padding='same'))
    model.add(LeakyReLU(alpha=0.2))
    # upsample to 64x64
    model.add(Conv2DTranspose(128, (4,4), strides=(2,2), padding='same'))
    model.add(LeakyReLU(alpha=0.2))
    # upsample to 128x128
    model.add(Conv2DTranspose(128, (4,4), strides=(2,2), padding='same'))
    model.add(LeakyReLU(alpha=0.2))
    # output layer, using tanh as per the hacks
    model.add(Conv2D(3, (3,3), activation='tanh', padding='same'))
    return model

# combined generator and discriminator model, for updating the generator;
# this is a logical GAN model built from the generator and discriminator defined above
def define_gan(g_model, d_model):
    # make weights in the discriminator not trainable
    d_model.trainable = False
    model = Sequential()
    # add the generator
    model.add(g_model)
    # add the discriminator
    model.add(d_model)
    # compile model with the Adam optimizer; the learning rate (2e-5) is lower than the discriminator's
    opt = Adam(lr=0.00002, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt)
    return model
The load_real_samples() function below reads the actual data from the machine learning dataset and normalizes the images (trick #5) before train() feeds them to generate_real_samples().
The generate_fake_samples() function uses the generator to produce images from random latent points, labeled with the fake label 0.
# read images from the df_4 dataframe into x_train
x_train = []
def load_real_samples():
    for f, breed in tqdm(df_4.values):
        try:
            img = image.load_img('/storage/train/{}'.format(f), target_size=(128, 128))
            # convert to float32
            arr1 = image.img_to_array(img, dtype='float32')
            # scale images from [0,255] to [-1,1]
            arr = (arr1 - 127.5) / 127.5
            x_train.append(arr)
        except:
            pass
    return x_train

# select real samples
# images are loaded into the dataset variable using the load_real_samples() function;
# the function below randomly selects images from the dataset and returns X and y:
# X contains images, y contains labels (the model is not trained with class labels, as all the images belong to the same class)
def generate_real_samples(dataset, n_samples):
    # choose random instances
    ix = randint(0, len(dataset), n_samples)
    # generate 'real' class labels (1)
    y = ones((n_samples, 1))
    # retrieve selected images
    X = []
    for f in range(len(ix)):
        X.append(dataset[ix[f]])
    return X, y

# use a Gaussian latent space
# generate points in latent space as input for the generator;
# the latent space defines the shape and distribution of the input to the generator model used to generate new images
def generate_latent_points(latent_dim, n_samples):
    # generate points in the latent space
    x_input = randn(latent_dim * n_samples)
    # reshape into a batch of inputs for the network
    x_input = x_input.reshape(n_samples, latent_dim)
    return x_input

# use the generator to generate n fake examples, with class labels
def generate_fake_samples(g_model, latent_dim, n_samples):
    # generate points in latent space
    x_input = generate_latent_points(latent_dim, n_samples)
    # predict outputs
    X = g_model.predict(x_input)
    # create 'fake' class labels (0)
    y = zeros((n_samples, 1))
    return X, y
Because of the adversarial setup, the generator's properties change after every epoch. Once an acceptable image quality has been reached, the generator may fail to improve further, and in many cases it even degrades over subsequent epochs.
Three measures help deal with this: periodically evaluating the discriminator's classification accuracy on real and fake images, periodically saving plots of the generated images, and periodically saving snapshots of the generator model.
All of these actions are performed by the summarize_performance() function, which evaluates the discriminator model. While training the GAN over many epochs, a snapshot of the generator model is saved every 10 epochs, and save_plot() keeps saving the generated images. This makes it possible to trace how the GAN's image generation evolves.
# create and save a plot of generated images
def save_plot(examples, epoch, n=7):
    # scale from [-1,1] to [0,1]
    examples = (examples + 1) / 2.0
    # plot images in an n x n grid
    for i in range(n * n):
        # define subplot
        pyplot.subplot(n, n, 1 + i)
        # turn off axis
        pyplot.axis('off')
        # plot raw pixel data
        pyplot.imshow(examples[i])
    # save plot to file
    filename = 'generated_plot_e%03d.png' % (epoch+1)
    pyplot.savefig(filename)
    pyplot.close()

# evaluate the discriminator, plot generated images, save the generator model
def summarize_performance(epoch, g_model, d_model, dataset, latent_dim, n_samples=49):
    print("################# Summarize ###################")
    # prepare real samples
    X_real, y_real = generate_real_samples(dataset, n_samples)
    # evaluate discriminator on real examples
    X_real_raw = np.array(X_real)
    y_real_raw = np.array(y_real)
    _, acc_real = d_model.evaluate(X_real_raw, y_real_raw, verbose=0)
    # prepare fake examples
    x_fake, y_fake = generate_fake_samples(g_model, latent_dim, n_samples)
    # evaluate discriminator on fake examples
    _, acc_fake = d_model.evaluate(x_fake, y_fake, verbose=0)
    # summarize discriminator performance
    print('>Accuracy real: %.0f%%, fake: %.0f%%' % (acc_real*100, acc_fake*100))
    # save plot
    save_plot(x_fake, epoch)
    # save the generator model to file
    filename = 'generator_model_%03d.h5' % (epoch+1)
    g_model.save(filename)
During training, the discriminator model is updated twice per batch (once with fake samples and once with real samples), while the generator is updated once per batch iteration (trick #7).
It is very important to watch the losses while training a GAN. A sudden drop in the discriminator loss indicates that the generator model has started producing bad samples that the discriminator can easily identify (trick #8).
# train the generator and discriminator
def train(g_model, d_model, gan_model, dataset, latent_dim, n_epochs=320, n_batch=64):
    bat_per_epo = int(len(dataset) / n_batch)
    half_batch = int(n_batch / 2)
    # manually enumerate epochs
    for i in range(n_epochs):
        # enumerate batches over the training set
        for j in range(bat_per_epo):
            # get randomly selected 'real' samples
            X_real, y_real = generate_real_samples(dataset, half_batch)
            X_real_raw = np.array(X_real)
            y_real_raw = np.array(y_real)
            # it is recommended to update the discriminator with separate batches of real and fake images
            # update discriminator model weights with real images
            d_loss1, _ = d_model.train_on_batch(X_real_raw, y_real_raw)
            # generate 'fake' examples
            X_fake, y_fake = generate_fake_samples(g_model, latent_dim, half_batch)
            # update discriminator model weights with fake images
            d_loss2, _ = d_model.train_on_batch(X_fake, y_fake)
            # prepare points in latent space as input for the generator
            X_gan = generate_latent_points(latent_dim, n_batch)
            # create inverted labels for the fake samples
            y_gan = ones((n_batch, 1))
            # update the generator via the discriminator's error
            g_loss = gan_model.train_on_batch(X_gan, y_gan)
            # summarize loss on this batch
            print('>%d, %d/%d, d1=%.3f, d2=%.3f g=%.3f' % (i+1, j+1, bat_per_epo, d_loss1, d_loss2, g_loss))
        # evaluate the model performance every 10 epochs
        if (i+1) % 10 == 0:
            summarize_performance(i, g_model, d_model, dataset, latent_dim)

# size of the latent space
latent_dim = 100
# create the discriminator
d_model = define_discriminator()
# create the generator
g_model = define_generator(latent_dim)
# create the gan
gan_model = define_gan(g_model, d_model)
# plot both models to PNG files
plot_model(d_model, to_file='Discriminator.png', show_shapes=True, show_layer_names=True, rankdir='TB')
plot_model(g_model, to_file='Generator.png', show_shapes=True, show_layer_names=True, rankdir='TB')
# load image data
dataset = load_real_samples()
To visualize the models, we can plot them with the plot_model() function.
We can also inspect each model's layout and the number of trainable parameters with the summary() function. Calling the train() function starts training the discriminator and the generator.
d_model.summary()
g_model.summary()

with tf.device('/device:GPU:0'):
    train(g_model, d_model, gan_model, dataset, latent_dim)
The discriminator model:
The generator model:
The training process looks like the example below:
After 320 epochs, here are some examples of the generated images. More complex generator and discriminator models could produce better-quality images.
save_plot() will generate a 7 by 7 matrix of images
These newly generated minority-class images can now be added to the original imbalanced machine learning dataset. This helps turn the imbalanced multi-class data into a balanced machine learning dataset, which will improve the model's classification performance.
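To actually produce those images, the generator snapshot saved by summarize_performance() can be reloaded and sampled. Here is a minimal sketch; the snapshot filename (from a 320-epoch run) and the output directory are assumptions, so adjust them to your run. It reuses the generate_latent_points() helper defined earlier in this post:

import os
from keras.models import load_model
from PIL import Image

# load the last generator snapshot saved during training (filename assumed)
model = load_model('generator_model_320.h5')

# draw 100 latent points and generate new class-4 images
latent_points = generate_latent_points(100, 100)
X = model.predict(latent_points)

# rescale from [-1,1] back to [0,255] and write each image to disk (output folder assumed)
X = ((X + 1) / 2.0 * 255).astype('uint8')
os.makedirs('/storage/generated', exist_ok=True)
for k in range(len(X)):
    Image.fromarray(X[k]).save('/storage/generated/gen_4_%03d.jpeg' % k)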
Automatic anime character creation with generative adversarial networks
CycleGAN
PixelDTGAN
StackGAN
DTN