基于循环神经网络(RNN)的古诗生成器

sjzhahalala 2018-03-26

基于循环神经网络(RNN)的古诗生成器,具体内容如下

之前在手机百度上看到有个“为你写诗”功能,能够随机生成古诗,当时感觉很酷炫= =

在学习了深度学习后,了解了一下原理,打算自己做个实现练练手,于是,就有了这个项目。文中如有瑕疵纰漏之处,还请路过的诸位大佬不吝赐教,万分感谢!

使用循环神经网络实现的古诗生成器,能够完成古体诗的自动生成。我简单地训练了一下,格式是对上了,至于意境么。。。emmm,呵呵

举一下模型测试结果例子:

1.生成古体诗

示例1:

树阴飞尽水三依,谩自为能厚景奇。
莫怪仙舟欲西望,楚人今此惜春风。

示例2:

岩外前苗点有泉,紫崖烟霭碧芊芊。
似僧月明秋更好,一踪颜事欲犹伤?

2.生成藏头诗(以“神策”为例)

示例1:

神照隆祭测馨尘,策紫珑氲羽团娟。

示例2:

神辇莺满花台潭,策穷渐见仙君地。

下面记录项目实现过程(由于都是文本处理方面,跟前一个项目存在很多类似的内容,对于这部分内容,我就只简单提一下,不展开了,新的东西再具体说):

1.数据预处理

数据集使用四万首的唐诗训练集,可以点击这里进行下载。

数据预处理的过程与前一个项目TensorFlow练手项目一:使用循环神经网络(RNN)实现影评情感分类大同小异,可以参考前一个项目,这里就不多说了,直接上代码。

# -*- coding: utf-8 -*-
# @Time : 18-3-13 上午11:04
# @Author : AaronJny
# @Email : [email protected]
import sys

reload(sys)
sys.setdefaultencoding('utf8')
import collections

ORIGIN_DATA = 'origin_data/poetry.txt' # 源数据路径

OUTPUT_DATA = 'processed_data/poetry.txt' # 输出向量路径

VOCAB_DATA = 'vocab/poetry.vocab'


def word_to_id(word, id_dict):
 if word in id_dict:
  return id_dict[word]
 else:
  return id_dict['<unknow>']


poetry_list = [] # 存放唐诗的数组

# 从文件中读取唐诗
with open(ORIGIN_DATA, 'r') as f:
 f_lines = f.readlines()
 print '唐诗总数 : {}'.format(len(f_lines))
 # 逐行进行处理
 for line in f_lines:
  # 去除前后空白符,转码
  strip_line = line.strip().decode('utf8')
  try:
   # 将唐诗分为标题和内容
   title, content = strip_line.split(':')
  except:
   # 出现多个':'的将被舍弃
   continue
  # 去除内容中的空格
  content = content.strip().replace(' ', '')
  # 舍弃含有非法字符的唐诗
  if '(' in content or '(' in content or '<' in content or '《' in content or '_' in content or '[' in content:
   continue
  # 舍弃过短或过长的唐诗
  lenth = len(content)
  if lenth < 20 or lenth > 100:
   continue
  # 加入列表
  poetry_list.append('s' + content + 'e')

print '用于训练的唐诗数 : {}'.format(len(poetry_list))

poetry_list=sorted(poetry_list,key=lambda x:len(x))

words_list = []
# 获取唐诗中所有的字符
for poetry in poetry_list:
 words_list.extend([word for word in poetry])
# 统计其出现的次数
counter = collections.Counter(words_list)
# 排序
sorted_words = sorted(counter.items(), key=lambda x: x[1], reverse=True)
# 获得出现次数降序排列的字符列表
words_list = ['<unknow>'] + [x[0] for x in sorted_words]
# 这里选择保留高频词的数目,词只有不到七千个,所以我全部保留
words_list = words_list[:len(words_list)]

print '词汇表大小 : {}'.format(words_list)

with open(VOCAB_DATA, 'w') as f:
 for word in words_list:
  f.write(word + '\n')

# 生成单词到id的映射
word_id_dict = dict(zip(words_list, range(len(words_list))))
# 将poetry_list转换成向量形式
id_list=[]
for poetry in poetry_list:
 id_list.append([str(word_to_id(word,word_id_dict)) for word in poetry])

# 将向量写入文件
with open(OUTPUT_DATA, 'w') as f:
 for id_l in id_list:
  f.write(' '.join(id_l) + '\n')

2.模型编写

这里要编写两个模型,一个用于训练,一个用于验证(生成古体诗)。两个模型大体上一致,因为用途不同,所以有些细节有出入。当进行验证时,验证模型读取训练模型的参数进行覆盖。

注释比较细,就不多说了,看代码。对于两个模型不同的一些关键细节,我也用注释进行了说明。

# -*- coding: utf-8 -*-
# @Time : 18-3-13 下午2:06
# @Author : AaronJny
# @Email : [email protected]
import tensorflow as tf
import functools
import setting

HIDDEN_SIZE = 128 # LSTM隐藏节点个数
NUM_LAYERS = 2 # RNN深度


def doublewrap(function):
 @functools.wraps(function)
 def decorator(*args, **kwargs):
  if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
   return function(args[0])
  else:
   return lambda wrapee: function(wrapee, *args, **kwargs)

 return decorator


@doublewrap
def define_scope(function, scope=None, *args, **kwargs):
 attribute = '_cache_' + function.__name__
 name = scope or function.__name__

 @property
 @functools.wraps(function)
 def decorator(self):
  if not hasattr(self, attribute):
   with tf.variable_scope(name, *args, **kwargs):
    setattr(self, attribute, function(self))
  return getattr(self, attribute)

 return decorator


class TrainModel(object):
 """
 训练模型
 """

 def __init__(self, data, labels, emb_keep, rnn_keep):
  self.data = data # 数据
  self.labels = labels # 标签
  self.emb_keep = emb_keep # embedding层dropout保留率
  self.rnn_keep = rnn_keep # lstm层dropout保留率
  self.global_step
  self.cell
  self.predict
  self.loss
  self.optimize

 @define_scope
 def cell(self):
  """
  rnn网络结构
  :return:
  """
  lstm_cell = [
   tf.nn.rnn_cell.DropoutWrapper(tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE), output_keep_prob=self.rnn_keep) for
   _ in range(NUM_LAYERS)]
  cell = tf.nn.rnn_cell.MultiRNNCell(lstm_cell)
  return cell

 @define_scope
 def predict(self):
  """
  定义前向传播
  :return:
  """
  # 创建词嵌入矩阵权重
  embedding = tf.get_variable('embedding', shape=[setting.VOCAB_SIZE, HIDDEN_SIZE])
  # 创建softmax层参数
  if setting.SHARE_EMD_WITH_SOFTMAX:
   softmax_weights = tf.transpose(embedding)
  else:
   softmax_weights = tf.get_variable('softmaweights', shape=[HIDDEN_SIZE, setting.VOCAB_SIZE])
  softmax_bais = tf.get_variable('softmax_bais', shape=[setting.VOCAB_SIZE])
  # 进行词嵌入
  emb = tf.nn.embedding_lookup(embedding, self.data)
  # dropout
  emb_dropout = tf.nn.dropout(emb, self.emb_keep)
  # 计算循环神经网络的输出
  self.init_state = self.cell.zero_state(setting.BATCH_SIZE, dtype=tf.float32)
  outputs, last_state = tf.nn.dynamic_rnn(self.cell, emb_dropout, scope='d_rnn', dtype=tf.float32,
            initial_state=self.init_state)
  outputs = tf.reshape(outputs, [-1, HIDDEN_SIZE])
  # 计算logits
  logits = tf.matmul(outputs, softmax_weights) + softmax_bais
  return logits

 @define_scope
 def loss(self):
  """
  定义损失函数
  :return:
  """
  # 计算交叉熵
  outputs_target = tf.reshape(self.labels, [-1])
  loss = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=self.predict, labels=outputs_target, )
  # 平均
  cost = tf.reduce_mean(loss)
  return cost

 @define_scope
 def global_step(self):
  """
  global_step
  :return:
  """
  global_step = tf.Variable(0, trainable=False)
  return global_step

 @define_scope
 def optimize(self):
  """
  定义反向传播过程
  :return:
  """
  # 学习率衰减
  learn_rate = tf.train.exponential_decay(setting.LEARN_RATE, self.global_step, setting.LR_DECAY_STEP,
            setting.LR_DECAY)
  # 计算梯度,并防止梯度爆炸
  trainable_variables = tf.trainable_variables()
  grads, _ = tf.clip_by_global_norm(tf.gradients(self.loss, trainable_variables), setting.MAX_GRAD)
  # 创建优化器,进行反向传播
  optimizer = tf.train.AdamOptimizer(learn_rate)
  train_op = optimizer.apply_gradients(zip(grads, trainable_variables), self.global_step)
  return train_op


class EvalModel(object):
 """
 验证模型
 """

 def __init__(self, data, emb_keep, rnn_keep):
  self.data = data # 输入
  self.emb_keep = emb_keep # embedding层dropout保留率
  self.rnn_keep = rnn_keep # lstm层dropout保留率
  self.cell
  self.predict
  self.prob

 @define_scope
 def cell(self):
  """
  rnn网络结构
  :return:
  """
  lstm_cell = [
   tf.nn.rnn_cell.DropoutWrapper(tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE), output_keep_prob=self.rnn_keep) for
   _ in range(NUM_LAYERS)]
  cell = tf.nn.rnn_cell.MultiRNNCell(lstm_cell)
  return cell

 @define_scope
 def predict(self):
  """
  定义前向传播过程
  :return:
  """
  embedding = tf.get_variable('embedding', shape=[setting.VOCAB_SIZE, HIDDEN_SIZE])

  if setting.SHARE_EMD_WITH_SOFTMAX:
   softmax_weights = tf.transpose(embedding)
  else:
   softmax_weights = tf.get_variable('softmaweights', shape=[HIDDEN_SIZE, setting.VOCAB_SIZE])
  softmax_bais = tf.get_variable('softmax_bais', shape=[setting.VOCAB_SIZE])

  emb = tf.nn.embedding_lookup(embedding, self.data)
  emb_dropout = tf.nn.dropout(emb, self.emb_keep)
  # 与训练模型不同,这里只要生成一首古体诗,所以batch_size=1
  self.init_state = self.cell.zero_state(1, dtype=tf.float32)
  outputs, last_state = tf.nn.dynamic_rnn(self.cell, emb_dropout, scope='d_rnn', dtype=tf.float32,
            initial_state=self.init_state)
  outputs = tf.reshape(outputs, [-1, HIDDEN_SIZE])

  logits = tf.matmul(outputs, softmax_weights) + softmax_bais
  # 与训练模型不同,这里要记录最后的状态,以此来循环生成字,直到完成一首诗
  self.last_state = last_state
  return logits

 @define_scope
 def prob(self):
  """
  softmax计算概率
  :return:
  """
  probs = tf.nn.softmax(self.predict)
  return probs

3.组织数据集

编写一个类用于组织数据,方便训练使用。代码很简单,应该不存在什么问题。

# -*- coding: utf-8 -*-
# @Time : 18-3-13 上午11:59
# @Author : AaronJny
# @Email : [email protected]
import numpy as np

BATCH_SIZE = 64
DATA_PATH = 'processed_data/poetry.txt'


class Dataset(object):
 def __init__(self, batch_size):
  self.batch_size = batch_size
  self.data, self.target = self.read_data()
  self.start = 0
  self.lenth = len(self.data)

 def read_data(self):
  """
  从文件中读取数据,构建数据集
  :return: 训练数据,训练标签
  """
  # 从文件中读取唐诗向量
  id_list = []
  with open(DATA_PATH, 'r') as f:
   f_lines = f.readlines()
   for line in f_lines:
    id_list.append([int(num) for num in line.strip().split()])
  # 计算可以生成多少个batch
  num_batchs = len(id_list) // self.batch_size
  # data和target
  x_data = []
  y_data = []
  # 生成batch
  for i in range(num_batchs):
   # 截取一个batch的数据
   start = i * self.batch_size
   end = start + self.batch_size
   batch = id_list[start:end]
   # 计算最大长度
   max_lenth = max(map(len, batch))
   # 填充
   tmp_x = np.full((self.batch_size, max_lenth), 0, dtype=np.int32)
   # 数据覆盖
   for row in range(self.batch_size):
    tmp_x[row, :len(batch[row])] = batch[row]
   tmp_y = np.copy(tmp_x)
   tmp_y[:, :-1] = tmp_y[:, 1:]
   x_data.append(tmp_x)
   y_data.append(tmp_y)
  return x_data, y_data

 def next_batch(self):
  """
  获取下一个batch
  :return:
  """
  start = self.start
  self.start += 1
  if self.start >= self.lenth:
   self.start = 0
  return self.data[start], self.target[start]


if __name__ == '__main__':
 dataset = Dataset(BATCH_SIZE)
 dataset.read_data()

4.训练模型

万事俱备,开始训练。

没有按照epoch进行训练,这里只是循环训练指定个mini_batch。

训练过程中,会定期显示当前训练步数以及loss值。会定期保存当前模型及对应checkpoint。

训练代码:

# -*- coding: utf-8 -*-

# @Time : 18-3-13 下午2:50
# @Author : AaronJny
# @Email : [email protected]
import tensorflow as tf
from rnn_models import TrainModel
import dataset
import setting

TRAIN_TIMES = 30000 # 迭代总次数(没有计算epoch)
SHOW_STEP = 1 # 显示loss频率
SAVE_STEP = 100 # 保存模型参数频率

x_data = tf.placeholder(tf.int32, [setting.BATCH_SIZE, None]) # 输入数据
y_data = tf.placeholder(tf.int32, [setting.BATCH_SIZE, None]) # 标签
emb_keep = tf.placeholder(tf.float32) # embedding层dropout保留率
rnn_keep = tf.placeholder(tf.float32) # lstm层dropout保留率

data = dataset.Dataset(setting.BATCH_SIZE) # 创建数据集

model = TrainModel(x_data, y_data, emb_keep, rnn_keep) # 创建训练模型

saver = tf.train.Saver()

with tf.Session() as sess:
 sess.run(tf.global_variables_initializer()) # 初始化
 for step in range(TRAIN_TIMES):
  # 获取训练batch
  x, y = data.next_batch()
  # 计算loss
  loss, _ = sess.run([model.loss, model.optimize],
       {model.data: x, model.labels: y, model.emb_keep: setting.EMB_KEEP,
       model.rnn_keep: setting.RNN_KEEP})
  if step % SHOW_STEP == 0:
   print 'step {}, loss is {}'.format(step, loss)
  # 保存模型
  if step % SAVE_STEP == 0:
   saver.save(sess, setting.CKPT_PATH, global_step=model.global_step)

5.验证模型

提供两种方法验证模型:

随机生成古体诗

生成藏头诗

随机生成的结果勉强可以接受,起码格式对了,看起来也像个样子。

生成藏头诗就五花八门了,效果不好,往往要多次才能生成一个差强人意的。emmm,其实也可以理解,毕竟我们指定的“藏头”在训练集中的分布是不能保证的。

这里简单说一下生成古体诗的过程:

1.首先,读取训练模型保存的参数,覆盖验证模型的参数

2.将开始符号's'作为输入,喂给模型,模型将输出下一个字符为此表中各词的概率,以及rnn传递的state。注意,验证模型时,dropout的保留率应设置为1.0

3.根据2中输出的概率,使用轮盘赌法,随机出下一个字

4.将随机出来的字作为输入,前一次输出的state作为本次输入的state,喂给模型,模型将输入下一个字符为此表中各词的概率,以及rnn传递的state

5.重复3,4步骤,直到随机出结束符'e',生成结束。过程中生成的所有字符,构成本次生成的古体诗('s'和'e'不算)

生成藏头诗的过程与生成古体诗是类似的,主要区别在于,在开始和每个标点符号被预测出来时,向模型喂给的是“藏头”中的一个字,就不多说了,详情可参考代码。

# -*- coding: utf-8 -*-
# @Time : 18-3-13 下午2:50
# @Author : AaronJny
# @Email : [email protected]
import sys

reload(sys)
sys.setdefaultencoding('utf8')
import tensorflow as tf
import numpy as np
from rnn_models import EvalModel
import utils
import os

# 指定验证时不使用cuda,这样可以在用gpu训练的同时,使用cpu进行验证
os.environ['CUDA_VISIBLE_DEVICES'] = ''

x_data = tf.placeholder(tf.int32, [1, None])

emb_keep = tf.placeholder(tf.float32)

rnn_keep = tf.placeholder(tf.float32)

# 验证用模型
model = EvalModel(x_data, emb_keep, rnn_keep)

saver = tf.train.Saver()
# 单词到id的映射
word2id_dict = utils.read_word_to_id_dict()
# id到单词的映射
id2word_dict = utils.read_id_to_word_dict()


def generate_word(prob):
 """
 选择概率最高的前100个词,并用轮盘赌法选取最终结果
 :param prob: 概率向量
 :return: 生成的词
 """
 prob = sorted(prob, reverse=True)[:100]
 index = np.searchsorted(np.cumsum(prob), np.random.rand(1) * np.sum(prob))
 return id2word_dict[int(index)]


# def generate_word(prob):
#  """
#  从所有词中,使用轮盘赌法选取最终结果
#  :param prob: 概率向量
#  :return: 生成的词
#  """
#  index = int(np.searchsorted(np.cumsum(prob), np.random.rand(1) * np.sum(prob)))
#  return id2word_dict[index]


def generate_poem():
 """
 随机生成一首诗歌
 :return:
 """
 with tf.Session() as sess:
  # 加载最新的模型
  ckpt = tf.train.get_checkpoint_state('ckpt')
  saver.restore(sess, ckpt.model_checkpoint_path)
  # 预测第一个词
  rnn_state = sess.run(model.cell.zero_state(1, tf.float32))
  x = np.array([[word2id_dict['s']]], np.int32)
  prob, rnn_state = sess.run([model.prob, model.last_state],
         {model.data: x, model.init_state: rnn_state, model.emb_keep: 1.0,
         model.rnn_keep: 1.0})
  word = generate_word(prob)
  poem = ''
  # 循环操作,直到预测出结束符号‘e'
  while word != 'e':
   poem += word
   x = np.array([[word2id_dict[word]]])
   prob, rnn_state = sess.run([model.prob, model.last_state],
          {model.data: x, model.init_state: rnn_state, model.emb_keep: 1.0,
          model.rnn_keep: 1.0})
   word = generate_word(prob)
  # 打印生成的诗歌
  print poem


def generate_acrostic(head):
 """
 生成藏头诗
 :param head:每行的第一个字组成的字符串
 :return:
 """
 with tf.Session() as sess:
  # 加载最新的模型
  ckpt = tf.train.get_checkpoint_state('ckpt')
  saver.restore(sess, ckpt.model_checkpoint_path)
  # 进行预测
  rnn_state = sess.run(model.cell.zero_state(1, tf.float32))
  poem = ''
  cnt = 1
  # 一句句生成诗歌
  for x in head:
   word = x
   while word != ',' and word != '。':
    poem += word
    x = np.array([[word2id_dict[word]]])
    prob, rnn_state = sess.run([model.prob, model.last_state],
           {model.data: x, model.init_state: rnn_state, model.emb_keep: 1.0,
           model.rnn_keep: 1.0})
    word = generate_word(prob)
    if len(poem) > 25:
     print 'bad.'
     break
   # 根据单双句添加标点符号
   if cnt & 1:
    poem += ','
   else:
    poem += '。'
   cnt += 1
  # 打印生成的诗歌
  print poem
  return poem


if __name__ == '__main__':
 # generate_acrostic(u'神策')
 generate_poem()

6.一些提取出来的方法和配置

很简单,不多说。

utils.py

# -*- coding: utf-8 -*-
# @Time : 18-3-13 下午4:16
# @Author : AaronJny
# @Email : [email protected]
import setting

def read_word_list():
 """
 从文件读取词汇表
 :return: 词汇列表
 """
 with open(setting.VOCAB_PATH, 'r') as f:
  word_list = [word for word in f.read().decode('utf8').strip().split('\n')]
 return word_list

def read_word_to_id_dict():
 """
 生成单词到id的映射
 :return:
 """
 word_list=read_word_list()
 word2id=dict(zip(word_list,range(len(word_list))))
 return word2id

def read_id_to_word_dict():
 """
 生成id到单词的映射
 :return:
 """
 word_list=read_word_list()
 id2word=dict(zip(range(len(word_list)),word_list))
 return id2word


if __name__ == '__main__':
 read_id_to_word_dict()

setting.py

# -*- coding: utf-8 -*-
# @Time : 18-3-13 下午3:08
# @Author : AaronJny
# @Email : [email protected]


VOCAB_SIZE = 6272 # 词汇表大小

SHARE_EMD_WITH_SOFTMAX = True # 是否在embedding层和softmax层之间共享参数
MAX_GRAD = 5.0 # 最大梯度,防止梯度爆炸
LEARN_RATE = 0.0005 # 初始学习率
LR_DECAY = 0.92 # 学习率衰减
LR_DECAY_STEP = 600 # 衰减步数
BATCH_SIZE = 64 # batch大小
CKPT_PATH = 'ckpt/model_ckpt' # 模型保存路径
VOCAB_PATH = 'vocab/poetry.vocab' # 词表路径
EMB_KEEP = 0.5 # embedding层dropout保留率

RNN_KEEP = 0.5 # lstm层dropout保留率

7.完毕

编码到此结束,有兴趣的朋友可以自己跑一跑,玩一玩,我就不多做测试了。

项目GitHub地址:https://github.com/AaronJny/peotry_generate

博主也正在学习,能力浅薄,文中如有瑕疵纰漏之处,还请路过的诸位大佬不吝赐教,万分感谢!

相关推荐