人工智能AI培训_TensorFlow 队列与线程
深度学习的模型训练过程往往需要大量的数据,而将这些数据一次性的读入和预处理需要大量的时间开销,所以通常采用队列与多线程的思想解决这个问题,而且TensorFlow为我们提供了完善的函数。
本文介绍了TensorFlow的线程和队列。在使用TensorFlow进行异步计算时,队列是一种强大的机制。正如TensorFlow中的其他组件一样,队列就是TensorFlow图中的节点。这是一种有状态的节点,就像变量一样:其他节点可以修改它的内容。具体来说,其他节点可以把新元素插入到队列后端(rear),也可以把队列前端(front)的元素删除。
在Python中是没有提供直接实现队列的函数的,所以通常会使用列表模拟队列。而TensorFlow提供了整套实现队列的函数和方法,在TensorFlow中,队列和变量类似,都是计算图上有状态的节点。操作队列的函数主要有:
FIFOQueue():创建一个先入先出(FIFO)的队列
RandomShuffleQueue():创建一个随机出队的队列
enqueue_many():初始化队列中的元素
dequeue():出队 enqueue():入队
与队列Queue有关的有以下三个概念:
Queue是TF队列和缓存机制的实现
QueueRunner是TF中对操作Queue的线程的封装
Coordinator是TF中用来协调线程运行的工具
虽然它们经常同时出现,但这三样东西在TensorFlow里面是可以单独使用的,不妨先分开来看待。
1.Queue,队列
根据实现的方式不同,分成具体的几种类型,例如:
tf.FIFOQueue 按入列顺序出列的队列
tf.RandomShuffleQueue 随机顺序出列的队列
tf.PaddingFIFOQueue 以固定长度批量出列的队列
tf.PriorityQueue 带优先级出列的队列
...
这些类型的Queue除了自身的性质不太一样外,创建、使用的方法基本是相同的,以下介绍两个最常用的
1)tf.FIFOQueue(capacity, dtypes, name='fifo_queue') 创建一个以先进先出的顺序对元素进行排队的队列
参数:
capacity:整数。可能存储在此队列中的元素数量的上限
dtypes:DType对象列表。长度dtypes必须等于每个队列元 素中的张量数,dtype的类型形状,决定了后面进队列元素形状
方法:
q.dequeue()获取队列的数据
q.enqueue(值)将一个数据添加进队列
q.enqueue_many(列表或者元组)将多个数据添加进队列
q.size() 返回队列的大小
2)、tf.RandomShuffleQueue() 随机出的队列
Queue主要包含入列(enqueue)和出列(dequeue)两个操作。enqueue操作返回计算图中的一个Operation节点,dequeue操作返回一个Tensor值。Tensor在创建时同样只是一个定义(或称为“声明”),需要放在Session中运行才能获得真正的数值。下面是一个单独使用Queue的例子:
2. QueueRunner,队列管理器
tf.train.QueueRunner(queue, enqueue_ops=None)
参数:
queue:A Queue
enqueue_ops:添加线程的队列操作列表,[]*2,指定两个线程
create_threads(sess, coord=None,start=False) 创建线程来运行给定会话的入队操作
start:布尔值,如果True启动线程;如果为False调用者 必须调用start()启动线程
coord:线程协调器 用于线程的管理
Tensorflow的计算主要在使用CPU/GPU和内存,而数据读取涉及磁盘操作,速度远低于前者操作。因此通常会使用多个线程读取数据,然后使用一个线程消费数据。QueueRunner就是来管理这些读写队列的线程的。
QueueRunner需要与Queue一起使用(这名字已经注定了它和Queue脱不开干系),但并不一定必须使用Coordinator。看下面这个例子:
增加计数的进程会不停的后台运行,执行入队的进程会先执行10次(因为队列长度只有10),然后主线程开始消费数据,当一部分数据消费被后,入队的进程又会开始执行。最终主线程消费完20个数据后停止,但其他线程继续运行,程序不会结束。
3.Coordinator,线程协调器
tf.train.Coordinator() 线程协调员,实现一个简单的机制来协调一 组线程的终止
方法: 返回的是线程协调实例
request_stop() 请求停止
join(threads=None, stop_grace_period_secs=120) 等待线程终止
Coordinator是个用来保存线程组运行状态的协调器对象,它和TensorFlow的Queue没有必然关系,是可以单独和Python线程使用的。例如:
将这个程序运行起来,主线程会等待所有子线程都停止后结束,从而使整个程序结束。由此可见,只要有任何一个线程调用了Coordinator的request_stop方法,所有的线程都可以通过should_stop方法感知并停止当前线程。
将QueueRunner和Coordinator一起使用,实际上就是封装了这个判断操作,从而使任何一个现成出现异常时,能够正常结束整个程序,同时主线程也可以直接调用request_stop方法来停止所有子线程的执行。