louishao 2019-11-08
来源:https://github.com/jiangxinyang227/NLP-Project/text_classifier
import tensorflow as tf from .base import BaseModel class BiLstmAttenModel(BaseModel): def __init__(self, config, vocab_size, word_vectors): super(BiLstmAttenModel, self).__init__(config=config, vocab_size=vocab_size, word_vectors=word_vectors) # 构建模型 self.build_model() # 初始化保存模型的saver对象 self.init_saver() def build_model(self): # 词嵌入层 with tf.name_scope("embedding"): # 利用预训练的词向量初始化词嵌入矩阵 if self.word_vectors is not None: embedding_w = tf.Variable(tf.cast(self.word_vectors, dtype=tf.float32, name="word2vec"), name="embedding_w") else: embedding_w = tf.get_variable("embedding_w", shape=[self.vocab_size, self.config["embedding_size"]],initializer=tf.contrib.layers.xavier_initializer()) # 利用词嵌入矩阵将输入的数据中的词转换成词向量,维度[batch_size, sequence_length, embedding_size] embedded_words = tf.nn.embedding_lookup(embedding_w, self.inputs) # 定义两层双向LSTM的模型结构 with tf.name_scope("Bi-LSTM"): for idx, hidden_size in enumerate(self.config["hidden_sizes"]): with tf.name_scope("Bi-LSTM" + str(idx)): # 定义前向LSTM结构 lstm_fw_cell = tf.nn.rnn_cell.DropoutWrapper( tf.nn.rnn_cell.LSTMCell(num_units=hidden_size, state_is_tuple=True), output_keep_prob=self.keep_prob) # 定义反向LSTM结构 lstm_bw_cell = tf.nn.rnn_cell.DropoutWrapper( tf.nn.rnn_cell.LSTMCell(num_units=hidden_size, state_is_tuple=True), output_keep_prob=self.keep_prob) # 采用动态rnn,可以动态的输入序列的长度,若没有输入,则取序列的全长 # outputs是一个元祖(output_fw, output_bw),其中两个元素的维度都是[batch_size, max_time, hidden_size], # fw和bw的hidden_size一样 # self.current_state 是最终的状态,二元组(state_fw, state_bw),state_fw=[batch_size, s],s是一个元祖(h, c) outputs, current_state = tf.nn.bidirectional_dynamic_rnn(lstm_fw_cell, lstm_bw_cell, embedded_words, dtype=tf.float32, scope="bi-lstm" + str(idx)) # 对outputs中的fw和bw的结果拼接 [batch_size, time_step, hidden_size * 2] embedded_words = tf.concat(outputs, 2) # 将最后一层Bi-LSTM输出的结果分割成前向和后向的输出 outputs = tf.split(embedded_words, 2, -1) # 在Bi-LSTM+Attention的论文中,将前向和后向的输出相加 with tf.name_scope("Attention"): H = outputs[0] + outputs[1] # 得到Attention的输出 output = self._attention(H) output_size = self.config["hidden_sizes"][-1] # 全连接层的输出 with tf.name_scope("output"): output_w = tf.get_variable( "output_w", shape=[output_size, self.config["num_classes"]], initializer=tf.contrib.layers.xavier_initializer()) output_b = tf.Variable(tf.constant(0.1, shape=[self.config["num_classes"]]), name="output_b") self.l2_loss += tf.nn.l2_loss(output_w) self.l2_loss += tf.nn.l2_loss(output_b) self.logits = tf.nn.xw_plus_b(output, output_w, output_b, name="logits") self.predictions = self.get_predictions() self.loss = self.cal_loss() self.train_op, self.summary_op = self.get_train_op() def _attention(self, H): """ 利用Attention机制得到句子的向量表示 """ # 获得最后一层LSTM的神经元数量 hidden_size = self.config["hidden_sizes"][-1] # 初始化一个权重向量,是可训练的参数 W = tf.Variable(tf.random_normal([hidden_size], stddev=0.1)) # 对Bi-LSTM的输出用激活函数做非线性转换 M = tf.tanh(H) # 对W和M做矩阵运算,M=[batch_size, time_step, hidden_size],计算前做维度转换成[batch_size * time_step, hidden_size] # newM = [batch_size, time_step, 1],每一个时间步的输出由向量转换成一个数字 newM = tf.matmul(tf.reshape(M, [-1, hidden_size]), tf.reshape(W, [-1, 1])) # 对newM做维度转换成[batch_size, time_step] restoreM = tf.reshape(newM, [-1, self.config["sequence_length"]]) # 用softmax做归一化处理[batch_size, time_step] self.alpha = tf.nn.softmax(restoreM) # 利用求得的alpha的值对H进行加权求和,用矩阵运算直接操作 r = tf.matmul(tf.transpose(H, [0, 2, 1]), tf.reshape(self.alpha, [-1, self.config["sequence_length"], 1])) # 将三维压缩成二维sequeezeR=[batch_size, hidden_size] sequeezeR = tf.squeeze(r) sentenceRepren = tf.tanh(sequeezeR) # 对Attention的输出可以做dropout处理 output = tf.nn.dropout(sentenceRepren, self.keep_prob) return output
base.py
import tensorflow as tf import numpy as np class BaseModel(object): def __init__(self, config, vocab_size=None, word_vectors=None): """ 文本分类的基类,提供了各种属性和训练,验证,测试的方法 :param config: 模型的配置参数 :param vocab_size: 当不提供词向量的时候需要vocab_size来初始化词向量 :param word_vectors:预训练的词向量,word_vectors 和 vocab_size必须有一个不为None """ self.config = config self.vocab_size = vocab_size self.word_vectors = word_vectors self.inputs = tf.placeholder(tf.int32, [None, None], name="inputs") # 数据输入 self.labels = tf.placeholder(tf.float32, [None], name="labels") # 标签 self.keep_prob = tf.placeholder(tf.float32, name="keep_prob") # dropout self.l2_loss = tf.constant(0.0) # 定义l2损失 self.loss = 0.0 # 损失 self.train_op = None # 训练入口 self.summary_op = None self.logits = None # 模型最后一层的输出 self.predictions = None # 预测结果 self.saver = None # 保存为ckpt模型的对象 def cal_loss(self): """ 计算损失,支持二分类和多分类 :return: """ with tf.name_scope("loss"): losses = 0.0 if self.config["num_classes"] == 1: losses = tf.nn.sigmoid_cross_entropy_with_logits(logits=self.logits, labels=tf.reshape(self.labels, [-1, 1])) elif self.config["num_classes"] > 1: self.labels = tf.cast(self.labels, dtype=tf.int32) losses = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=self.logits, labels=self.labels) loss = tf.reduce_mean(losses) return loss def get_optimizer(self): """ 获得优化器 :return: """ optimizer = None if self.config["optimization"] == "adam": optimizer = tf.train.AdamOptimizer(self.config["learning_rate"]) if self.config["optimization"] == "rmsprop": optimizer = tf.train.RMSPropOptimizer(self.config["learning_rate"]) if self.config["optimization"] == "sgd": optimizer = tf.train.GradientDescentOptimizer(self.config["learning_rate"]) return optimizer def get_train_op(self): """ 获得训练的入口 :return: """ # 定义优化器 optimizer = self.get_optimizer() trainable_params = tf.trainable_variables() gradients = tf.gradients(self.loss, trainable_params) # 对梯度进行梯度截断 clip_gradients, _ = tf.clip_by_global_norm(gradients, self.config["max_grad_norm"]) train_op = optimizer.apply_gradients(zip(clip_gradients, trainable_params)) tf.summary.scalar("loss", self.loss) summary_op = tf.summary.merge_all() return train_op, summary_op def get_predictions(self): """ 得到预测结果 :return: """ predictions = None if self.config["num_classes"] == 1: predictions = tf.cast(tf.greater_equal(self.logits, 0.0), tf.int32, name="predictions") elif self.config["num_classes"] > 1: predictions = tf.argmax(self.logits, axis=-1, name="predictions") return predictions def build_model(self): """ 创建模型 :return: """ raise NotImplementedError def init_saver(self): """ 初始化saver对象 :return: """ self.saver = tf.train.Saver(tf.global_variables()) def train(self, sess, batch, dropout_prob): """ 训练模型 :param sess: tf的会话对象 :param batch: batch数据 :param dropout_prob: dropout比例 :return: 损失和预测结果 """ feed_dict = {self.inputs: batch["x"], self.labels: batch["y"], self.keep_prob: dropout_prob} # 训练模型 _, summary, loss, predictions = sess.run([self.train_op, self.summary_op, self.loss, self.predictions], feed_dict=feed_dict) return summary, loss, predictions def eval(self, sess, batch): """ 验证模型 :param sess: tf中的会话对象 :param batch: batch数据 :return: 损失和预测结果 """ feed_dict = {self.inputs: batch["x"], self.labels: batch["y"], self.keep_prob: 1.0} summary, loss, predictions = sess.run([self.summary_op, self.loss, self.predictions], feed_dict=feed_dict) return summary, loss, predictions def infer(self, sess, inputs): """ 预测新数据 :param sess: tf中的会话对象 :param inputs: batch数据 :return: 预测结果 """ feed_dict = {self.inputs: np.array([inputs]), self.keep_prob: 1.0} predict = sess.run(self.predictions, feed_dict=feed_dict) return predict