今天周六,早晨出门吃饭,全身汗湿透。天气真的是太热了!我决定一天不出门,在屋子里面休息!
晚上,腾飞给我说了他暑假的计划,决定去长沙、成都去转一圈,并邀请我去,还顺便叫我晚上去吃饭。
最后我们就一起吃了一顿饭,不过我估计我休息的时间是下下周,因此可能不能和他一起去了。
今天总结一下本周学习到的知识:
周一
在进行神经网络序列输入的时候,发现了一个很好的文件代码用来数据预处理。
注意:后面使用“数据单元”代表数据的一个最小单元,比如训练英文数据就可以代表一个字符——’a’,训练中文数据就可以代表一个汉字——’王’etc
文件名字叫做read_utils.py
。该文件中实现了一个类TextConverter
和一个工具函数batch_generator
。
该文件是一个工具类,用于把一个输入文件根据编码输出对应的一批一批的数据用于RNN/LSTM之类的文本处理神经网络训练, 用法是先使用TextConverter类编码所有的内容为数据单元对应数字,然后使用batch_generator函数将编码好的数字分批返回 比如:
text = f.read() #f是open后得到的文件指针
converter = TextConverter(text)
arr = converter.text_to_arr(text)
g = batch_generator(arr,num_seqs,num_steps) #如果输入本来就是编码好的数据,则直接使用这个函数即可
下面让我们来一个一个学习一下。
TextConverter类
TextConverter类是用来将传入的文件中所有数据 首先,我们看看该类的构造函数。
class TextConverter(object):
def __init__(self, text=None, max_vocab=5000, filename=None):
if filename is not None:
with open(filename, 'rb') as f:
self.vocab = pickle.load(f)
else:
vocab = set(text) #存储读取文件中的数据单元所有类型的集合,比如英文文件会是:{'\n','A','b',...,'\r'}
print(len(vocab)) #打印数据单元的种类的数目
# max_vocab_process
vocab_count = {} #存储每一个数据单元在整个读入的文本中出现的次数的字典
for word in vocab:
vocab_count[word] = 0
for word in text:
vocab_count[word] += 1
vocab_count_list = [] #存储元组(数据单元,对应数量)组成的列表,然后按照数量的大小排序,比如[('a',100),('d',20),...,('x',3)]
for word in vocab_count:
vocab_count_list.append((word, vocab_count[word]))
vocab_count_list.sort(key=lambda x: x[1], reverse=True)
if len(vocab_count_list) > max_vocab: #根据传入的最大数量的数据单元数截断前max_vocab大的数据单元,基本上不可能,除非遇到汉字之类的文本
vocab_count_list = vocab_count_list[:max_vocab]
vocab = [x[0] for x in vocab_count_list]
self.vocab = vocab #vocab仅仅存储数据单元按照出现数量从大到小的列表,例如:['a','d',...,'x']
self.word_to_int_table = {c: i for i, c in enumerate(self.vocab)} # 数据单元到数字字典{' ':0,'e':1,...,'c':20,...}
self.int_to_word_table = dict(enumerate(self.vocab)) # 数字到数据单元字典{0:‘ ’,1:'e',...,20:'c',..}
可以看出,该构造函数的输入是(文本内容,最大词限制,文件名)。可以看出最后一个关键字参数filename是用来判断文件是否为空,从而直接读取 不用进入后面的处理环节,这个地方跟后面的保存模块对应的:
def save_to_file(self, filename): #仅仅存储数据单元按照出现数量从大到小的列表到指定文件filename处,例如:['a','d',...,'x']
with open(filename, 'wb') as f:
pickle.dump(self.vocab, f)
保存后,以后就可以直接使用这个词表了。 如果没有传入文件名,则说明需要进行后续的处理,我们仔细看一下后面的代码,发现实际上做的工作就是:
- 找到所有数据中“数据单元”
- 遍历文件记录每个“数据单元”出现的次数,根据次数大小对“数据单元”排序
- 根据传入参数max_vocab截断数据单元,只去前max_vocab个“数据单元”
- 将留下的数据单元一一映射到自然数0,1,2…上
注意传入的text
是一个列表或者列表生成器之类的数据结构,因为后面的代码把它这样子用了(比如去text的集合,用for迭代text等)。
在构造函数中已经实现了“数据单元”到自然数列的映射,因此互相转换的函数就显而易见了,如下所示:
def word_to_int(self, word): #返回数据单元对应的整数
if word in self.word_to_int_table:
return self.word_to_int_table[word]
else:
return len(self.vocab) #如果出现了没有出现的词,则变为<unk>对应的标记
def int_to_word(self, index): #返回整数对应的数据单元
if index == len(self.vocab):
return '<unk>' #没有出现的词被标记为unknown的缩写
elif index < len(self.vocab):
return self.int_to_word_table[index]
else:
raise Exception('Unknown index!')
由上面的函数可知,在映射的时候如果词没有出现在词表中,则标记为<unk>
返回,这个是非常重要的一个处理,因为在实际进行数据
输入的时候,由于截断引起的超出数据记录的词,或者在进行测试集的时候很有可能出现这种情况!
既然有了单个“数据单元”和自然数的映射,多个“数据单元”组成的列表当然也能相互转化:
def text_to_arr(self, text): #将输入的text根据word_to_int返回得到对应的编码数,并构成np.ndarray并返回,例如:输入' a\n',则返回类似array([ 0, 0, 4, 10])
arr = []
for word in text:
arr.append(self.word_to_int(word))
return np.array(arr)
def arr_to_text(self, arr): #输入列表类型的数据,返回对应的数据单元的组合
words = []
for index in arr:
words.append(self.int_to_word(index))
return "".join(words)
batch_generator
有了数据编码的类,下面就需要一个样本生成的函数了。 根据输入的数据(这个输入一般就是全部样本组成的文本,并且已经根据所有数据单元编码成为了数字列表), 返回对应的生成器,满足输入的序列个数和序列长度
def batch_generator(arr, n_seqs, n_steps): #根据输入的arr(这个输入一般就是全部样本组成的文本,并且已经根据所有数据单元编码成为了数字列表),返回对应的生成器,满足输入的序列个数和序列长度
arr = copy.copy(arr)
batch_size = n_seqs * n_steps #计算没次输入需要使用的数据单元
n_batches = int(len(arr) / batch_size) #一共可以得到多少组输入数据
arr = arr[:batch_size * n_batches] #直接忽略了后面不能构成一组输入的数据!
arr = arr.reshape((n_seqs, -1))
while True:
np.random.shuffle(arr) #将所有行打乱顺序
for n in range(0, arr.shape[1], n_steps):
x = arr[:, n:n + n_steps] #每次选择对应n_seqs行,n_steps列的数据
y = np.zeros_like(x) #返回跟x同形状的n维数组,数据全部都是0
y[:, :-1], y[:, -1] = x[:, 1:], x[:, 0]
yield x, y
可以看出,该函数根据输入的所有训练数据,和对应的序列一批的个数(n_seqs)和每个输入的序列的长度(n_steps),然后 通过生成器函数不断的迭代取出来数据用于训练。每一个输入和输出的序列刚错开一位,比如:
#如果输入的x是[[48 49 50]
# [ 0 1 2]]
# 则输出的y是[[49 50 48]
# [ 1 2 0]]
周二
文件名字叫做model.py
。该文件中实现了一个类CharRNN
和一个工具函数pick_top_n
。
CharRNN
下面介绍模型类,这个模型使用的是TensorFlow模块,然后进行网络的搭建,首先看构造函数:
class CharRNN:
def __init__(self, num_classes, num_seqs=64, num_steps=50,
lstm_size=128, num_layers=2, learning_rate=0.001,
grad_clip=5, sampling=False, train_keep_prob=0.5, use_embedding=False, embedding_size=128):
if sampling is True:
num_seqs, num_steps = 1, 1
else:
num_seqs, num_steps = num_seqs, num_steps
self.num_classes = num_classes
self.num_seqs = num_seqs #序列个数
self.num_steps = num_steps #序列长度
self.lstm_size = lstm_size
self.num_layers = num_layers
self.learning_rate = learning_rate
self.grad_clip = grad_clip
self.train_keep_prob = train_keep_prob
self.use_embedding = use_embedding
self.embedding_size = embedding_size
tf.reset_default_graph()
self.build_inputs() #构建输入层
self.build_lstm() #构建LSTM层
self.build_loss() #构建损失函数
self.build_optimizer() #构建优化器
self.saver = tf.train.Saver() #保存设置
#下面测试,增加总结
tf.summary.scalar('loss',self.loss)
for var in tf.trainable_variables():
tf.summary.histogram(var.op.name, var)
self.merge_summary = tf.summary.merge_all()
self.train_writer = tf.summary.FileWriter('./model')
self.train_writer.add_graph(tf.get_default_graph())
可以看出,该构造函数根据输入的参数,搭建了一个R输入-R输出的神经网络,隐状态用的是LSTM模型。 首先先保存各个输入的设定,然后分别构建各个层和优化保存相关的设置,我们一个一个看:
def build_inputs(self):
with tf.name_scope('inputs'):
self.inputs = tf.placeholder(tf.int32, shape=(
self.num_seqs, self.num_steps), name='inputs')
self.targets = tf.placeholder(tf.int32, shape=(
self.num_seqs, self.num_steps), name='targets')
self.keep_prob = tf.placeholder(tf.float32, name='keep_prob')
# 对于中文,需要使用embedding层
# 英文字母没有必要用embedding层
if self.use_embedding is False:
self.lstm_inputs = tf.one_hot(self.inputs, self.num_classes)
else:
with tf.device("/cpu:0"):
embedding = tf.get_variable('embedding', [self.num_classes, self.embedding_size])
self.lstm_inputs = tf.nn.embedding_lookup(embedding, self.inputs)
上面的函数就是输入层,可以看出,根据输入的参数embedding
来确定输入层是否增加一个嵌入层,显然,如果数据的词表
比较大,比如中文,就需要嵌入层降维,如果比较小,就可以不用嵌入层。
然后是LSTM层:
def build_lstm(self):
# 创建单个cell并堆叠多层
def get_a_cell(lstm_size, keep_prob):
lstm = tf.nn.rnn_cell.BasicLSTMCell(lstm_size)
drop = tf.nn.rnn_cell.DropoutWrapper(lstm, output_keep_prob=keep_prob)
return drop
with tf.name_scope('lstm'):
cell = tf.nn.rnn_cell.MultiRNNCell(
[get_a_cell(self.lstm_size, self.keep_prob) for _ in range(self.num_layers)]
)
self.initial_state = cell.zero_state(self.num_seqs, tf.float32)
# 通过dynamic_rnn对cell展开时间维度
self.lstm_outputs, self.final_state = tf.nn.dynamic_rnn(cell, self.lstm_inputs, initial_state=self.initial_state)
# 通过lstm_outputs得到概率
seq_output = tf.concat(self.lstm_outputs, 1)
x = tf.reshape(seq_output, [-1, self.lstm_size])
with tf.variable_scope('softmax'):
softmax_w = tf.Variable(tf.truncated_normal([self.lstm_size, self.num_classes], stddev=0.1))
softmax_b = tf.Variable(tf.zeros(self.num_classes))
self.logits = tf.matmul(x, softmax_w) + softmax_b
self.proba_prediction = tf.nn.softmax(self.logits, name='predictions')
可以看出,LSTM层使用的是多层,层数根据参数self.num_layers
确定LSTM的隐层的层数。然后得到输出使用的是softmax
激活函数,可以
得到输出的每一个类别的概率。
之后是损失和优化:
def build_loss(self):
with tf.name_scope('loss'):
y_one_hot = tf.one_hot(self.targets, self.num_classes)
y_reshaped = tf.reshape(y_one_hot, self.logits.get_shape())
loss = tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=y_reshaped)
self.loss = tf.reduce_mean(loss)
def build_optimizer(self):
# 使用clipping gradients
tvars = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(self.loss, tvars), self.grad_clip)
train_op = tf.train.AdamOptimizer(self.learning_rate)
self.optimizer = train_op.apply_gradients(zip(grads, tvars))
损失使用的就是一般常用的交叉熵损失,优化则使用的是比较著名的自适应优化器adam
之后就可以开始训练了:
def train(self, batch_generator, max_steps, save_path, save_every_n, log_every_n):
self.session = tf.Session()
with self.session as sess:
sess.run(tf.global_variables_initializer())
# Train network
step = 0
new_state = sess.run(self.initial_state)
for x, y in batch_generator:
step += 1
start = time.time()
feed = {self.inputs: x,
self.targets: y,
self.keep_prob: self.train_keep_prob,
self.initial_state: new_state}
batch_loss, new_state, _ , train_summary = sess.run([self.loss,
self.final_state,
self.optimizer,
self.merge_summary],
feed_dict=feed)
end = time.time()
# control the print lines
if step % log_every_n == 0:
print('step: {}/{}... '.format(step, max_steps),
'loss: {:.4f}... '.format(batch_loss),
'{:.4f} sec/batch'.format((end - start)))
self.train_writer.add_summary(train_summary, step)
if (step % save_every_n == 0):
self.saver.save(sess, os.path.join(save_path, 'model'), global_step=step)
if step >= max_steps:
break
self.saver.save(sess, os.path.join(save_path, 'model'), global_step=step)
可以看出,训练就是根据前面搭建的网络和生成的样本,往里面不断的喂数据。然后将结果不断保存。
训练好模型后,我们就可以读取保存好的模型:
def load(self, checkpoint):
self.session = tf.Session()
self.saver.restore(self.session, checkpoint)
print('Restored from: {}'.format(checkpoint))
读取了保存好模型中的各种参数后,就看一通过这个网络生成样本:
def sample(self, n_samples, prime, vocab_size): #n_samples:一共输出多少个基本单元;prime:开始的几个基本单元;vocab_size:一共有多少个类型的基本单元+1(未知数据编码)
samples = [c for c in prime]
sess = self.session
new_state = sess.run(self.initial_state)
preds = np.ones((vocab_size, )) # for prime=[]
for c in prime: #根据输入的“基本单元”的多少,不断更新状态,直到最后的输入为止!真好的实现!
x = np.zeros((1, 1))
# 输入单个字符
x[0, 0] = c
feed = {self.inputs: x,
self.keep_prob: 1.,
self.initial_state: new_state} #每次输入时更新状态即可达到连续的效果,对应LSTM状态是元组(c,h)
preds, new_state = sess.run([self.proba_prediction, self.final_state],
feed_dict=feed)
c = pick_top_n(preds, vocab_size)
# 添加字符到samples中
samples.append(c)
# 不断生成字符,直到达到指定数目
for i in range(n_samples):
x = np.zeros((1, 1))
x[0, 0] = c
feed = {self.inputs: x,
self.keep_prob: 1.,
self.initial_state: new_state}
preds, new_state = sess.run([self.proba_prediction, self.final_state],
feed_dict=feed)
c = pick_top_n(preds, vocab_size)
samples.append(c)
return np.array(samples)
注意这个函数是根据输入的前几个自然数序列(已经通过“基本单元”映射为自然数了),预测下一个输出的对应自然数。
其中第一个for循环出色的使用了权重共享的思想,使用sample这个函数的时候使得在构造函数时sample
这个参数为True
。
然后一个一个的将“基本单元”映射后的自然数输入,这样每次仅更新隐状态输出的状态参数。
之后第二个for循环依次生成后续的一个一个自然数。
pick_top_n
在上一小节的最后一个sample
函数中,用到了pick_top_n
函数,这个函数的内容如下:
def pick_top_n(preds, vocab_size, top_n=5):
p = np.squeeze(preds) #squeeze函数从数组的形状中删除单维度条目,即把shape中为1的维度去掉
# 将除了top_n个预测值的位置都置为0
p[np.argsort(p,kind = 'mergesort')[:-top_n]] = 0 #argsort函数可以按照给定方法排序
# 归一化概率
p = p / np.sum(p)
# 随机选取一个字符
c = np.random.choice(vocab_size, 1, p=p)[0]
return c
可以看出,该函数通过输入的各个序列的概率,然后根据n
取得概率前几个最大的概率,之后通过这些概率进行归一化,然后得到留下来
的数字序列对应的概率分布律,最后通过np.random.choice
按照各个字符的分布律来随机选择一个字符并返回。
周三
预测和精度
今天,通过前两天的代码的学习,我今天将我需要用到的数据序列通过read_utils.py
预处理,之后放到model.py
里面进行训练。
之后设置了20000步的训练,结果发现可以成功运行并根据输入生成一系列新的输出,但是我希望能够直接得到下一个字符的概率,因此
可以按照如下的方式进行实现:
也可以通过这个网络预测下一个出现的“数据单元”的概率:
def prediction_next_n(self,prime,vocab_size,next_n =3 , **k): #prime:开始的几个基本单元;vocab_size:一共有多少个类型的基本单元+1(未知数据编码)
# samples = [c for c in prime]
sess = self.session
new_state = sess.run(self.initial_state)
preds = np.ones((vocab_size,)) # for prime=[]
for c in prime: # 根据输入的“基本单元”的多少,不断更新状态,直到最后的输入为止!真好的实现!
x = np.zeros((1, 1))
# 输入单个字符
x[0, 0] = c
feed = {self.inputs: x,
self.keep_prob: 1.,
self.initial_state: new_state} # 每次输入时更新状态即可达到连续的效果,对应LSTM状态是元组(c,h)
preds, new_state = sess.run([self.proba_prediction, self.final_state],
feed_dict=feed)
p = np.squeeze(preds) #squeeze函数从数组的形状中删除单维度条目,即把shape中为1的维度去掉
# 将next_n个最大的概率的位置得到
next_n_num = np.argsort(p,kind = 'mergesort')[-next_n:] #argsort函数可以按照给定方法排序
#返回的应该是标号和对应的概率值
s_p_d = []
for i in next_n_num:
s_p_d.append((i,p[i]))
return s_p_d
返回的这个各个自然数的概率,就可以进行预测生成新的数据对应的结果了。
为了后续的测试,我需要得到精度,因此实现一个计算精度的函数:
def get_accuracy(self,dualList,vocab_size,next_n =3): #输入的序列满足有开始的标记,没有结尾的标记
success_num = 0
for one_session in dualList:
if one_session[-1] in self.prediction_next_n(one_session[:-1],vocab_size,next_n):
success_num = success_num + 1
print(success_num,len(dualList))
print('精度是:%.4f' % (success_num/len(dualList)) )
TensorBoard的使用
为了将所有数据都显示出来,我使用了TensorBoard进行显示。