haokele 2020-05-31
""" 线程通信的生产者与消费者 python的queue模块中提供了同步的线程安全的队列类,都具有原子性,实现线程间的同步 Queue (FIFO: fist in fist out) LifoQueue (LIFO: last in fist out) PriorityQueue (优先级队列) task_done(): 作用是在使用join()的时候,当queue中所有的项目都被取出,且每个项目取出后都使用了task_done(),那么就可以释放join()阻塞 系统解释如下: 用于消费者,每次get()以后,使用task_done() 是告诉队列正在处理的get任务完成 如果join()当前处于阻塞状态,那么当处理完所有项时它将继续运行(这意味着对于已经放入队列的每个项都接收到task_done()调用)。 如果调用的次数超过在队列中放置的项的次数,则引发ValueError错误。 如果不需要join()的时候也可以不使用task_done() """ import queue import threading import time import random q = queue.Queue(10) def produce(): i = 0 while i < 10: num = random.randint(1, 100) q.put("生产者生产出数据:%d" % num) print("生产者生产出数据:%d" % num) time.sleep(0.2) i += 1 print("生产结束") def consume(): while True: time.sleep(0.3) if q.empty(): break item = q.get() print("消费者取出:", item) q.task_done() print("消费者结束") if __name__ == ‘__main__‘: # 创建生产者 t1 = threading.Thread(target=produce, name="生产者") t1.start() time.sleep(0.1) # 创建消费者 t2 = threading.Thread(target=consume, name="消费者") t2.start() q.join() print("over") # from threading import Thread # import time # import random # from queue import Queue # from collections import deque # from datetime import datetime # # # 创建队列,设置队列最大数限制为3个 # queue = Queue(3) # # # # 生产者线程 # class Pro_Thread(Thread): # def run(self): # # 原材料准备,等待被生产,这里使用的是双向队列 # tasks = deque([1, 2, 3, 4, 5, 6, 7, 8]) # global queue # while True: # try: # # 从原材料左边开始生产,如果tasks中没有元素,调用popleft()则会抛出错误 # task = tasks.popleft() # queue.put(task) # print(datetime.now(), "生产", task, "现在队列数:", queue.qsize()) # # # 休眠随机时间 # time.sleep(0.5) # # 如果原材料被生产完,生产线程跳出循环 # except IndexError: # print("原材料已被生产完毕") # break # print("生产完毕") # # # # 消费者线程 # class Con_Thread(Thread): # def run(self): # global queue # while True: # if not queue.empty(): # # 通过get(),这里已经将队列减去了1 # task = queue.get() # time.sleep(2) # # 发出完成的信号,不发的话,join会永远阻塞,程序不会停止 # queue.task_done() # print(datetime.now(), "消费", task) # else: # break # print("消费完毕") # # # # r入口方法,主线程 # def main(): # Pro_1 = Pro_Thread() # # 启动线程 # Pro_1.start() # # 这里休眠一秒钟,等到队列有值,否则队列创建时是空的,主线程直接就结束了,实验失败,造成误导 # time.sleep(1) # for i in range(2): # Con_i = Con_Thread() # # 启动线程 # Con_i.start() # global queue # # 接收信号,主线程在这里等待队列被处理完毕后再做下一步 # queue.join() # # 给个标示,表示主线程已经结束 # print("主线程结束") # # # if __name__ == ‘__main__‘: # main()