详解python中的线程与线程池

数据猿 2019-05-10

线程

进程和线程

什么是进程?

进程就是正在运行的程序, 一个任务就是一个进程, 进程的主要工作是管理资源, 而不是实现功能

什么是线程?

线程的主要工作是去实现功能, 比如执行计算.

线程和进程的关系就像员工与老板的关系,

老板(进程) 提供资源 和 工作空间,

员工(线程) 负责去完成相应的任务

特点

一个进程至少由一个线程, 这一个必须存在的线程被称为主线程, 同时一个进程也可以有多个线程, 即多线程

当我们我们遇到一些需要重复执行的代码时, 就可以使用多线程分担一些任务, 进而加快运行速度

线程的实现

线程模块

Python通过两个标准库_thread和threading, 提供对线程的支持 , threading对_thread进行了封装。
threading模块中提供了Thread , Lock , RLock , Condition等组件。

因此在实际的使用中我们一般都是使用threading来实现多线程

线程包括子线程和主线程:

主线程 : 当一个程序启动时 , 就有一个线程开始运行 , 该线程通常叫做程序的主线程

子线程 : 因为程序是开始时就执行的 , 如果你需要再创建线程 , 那么创建的线程就是这个主线程的子线程

主线程的重要性体现在两方面 :

  1. 是产生其他子线程的线程
  2. 通常它必须最后完成执行, 比如执行各种关闭操作

Thread类

常用参数说明

参数 说明
target 表示调用的对象, 即子线程要执行的任务, 可以是某个内置方法, 或是你自己写的函数
name 子线程的名称
args 传入target函数中的位置参数, 是一个元组, 参数后必须加逗号

常用实例方法

方法 作用
Thread.run(self) 线程启动时运行的方法, 由该方法调用 target参数所指定的函数
Thread.start(self) 启动进程, start方法就是区帮你调用run方法
Thread.terminate(self) 强制终止线程
Thread.join(self, timeout=None) 阻塞调用, 主线程进行等待
Thread.setDaemon(self, daemonic) 将子线程设置为守护线程, 随主线程结束而结束
Thread.getName(self, name) 获取线程名
Thread.setName(self, name) 设置线程名

创建线程

在python中创建线程有两种方式, 实例Thread类和继承重写Thread类

实例Thread类

import threading
import time

def run(name, s): # 线程要执行的任务
 time.sleep(s) # 停两秒
 print('I am %s' % name)

# 实例化线程类, 并传入函数及其参数, 
t1 = threading.Thread(target=run, name='one', args=('One', 5))
t2 = threading.Thread(target=run, name='two', args=('Two', 2))

# 开始执行, 这两个线程会同步执行
t1.start()
t2.start()
print(t1.getName())		# 获取线程名
print(t2.getName())

# Result:
one
two
I am Two	# 运行2s后
I am One	# 运行5s后

继承Thread类

class MyThread(threading.Thread): # 继承threading中的Thread类
 # 线程所需的参数
 def __init__(self, name, second):
 super().__init__()
 self.name = name
 self.second = second

 # 重写run方法,表示线程所执行的任务,必须有
 def run(self):
 time.sleep(self.second)
 print('I am %s' % self.name)
# 创建线程实例
t1 = MyThread('One', 5)
t2 = MyThread('Two', 2)
# 启动线程,实际上是调用了类中的run方法
t1.start()
t2.start()
t1.join()
print(t1.getName())
print(t2.getName())

# Result:
I am Two	# 运行后2s
I am One	# 运行后5s
One
Two

常用方法

join()

阻塞调用程序 , 直到调用join () 方法的线程执行结束, 才会继续往下执行

# 开始执行, 这两个线程会同步执行
t1.start()
t2.start()
t1.join()	# 等待t1线程执行完毕,再继续执行剩余的代码
print(t1.getName())
print(t2.getName())

# Result:	
I am Two
I am One
one
two

setDemon()

使用给线程设置守护模式: 子线程跟随主线程的结束而结束, 不管这个子线程任务是否完成. 而非守护模式的子线程只有在执行完成后, 主线程才会执行完成

setDaemon() 与 join() 基本上是相对的 , join会等子线程执行完毕 ; 而setDaemon则不会等

def run(name, s): # 线程要执行的函数
 time.sleep(s) # 停两秒
 print('I am %s' % name)

# 实例化线程类, 并传入函数及其参数
t1 = threading.Thread(target=run, name='one', args=('One', 5))
t2 = threading.Thread(target=run, name='two', args=('Two', 2))
# 给t1设置守护模式, 使其随着主线程的结束而结束
t1.setDaemon(True)
# 开始执行, 这两个线程会同步执行
t1.start()
t2.start()	# 主线程会等待未设置守护模式的线程t2执行完成

# Result:
I am Two	# 运行后2s

线程间的通信

互斥锁

在同一个进程的多线程中 , 其中的变量对于所有线程来说都是共享的 , 因此 , 如果多个线程之间同时修改一个变量 , 那就乱套了 , 共享的数据就会有很大的风险 , 所以我们需要互斥锁 , 来锁住数据 , 防止篡改。

来看一个错误的示范:

a = 0
def incr(n):
 global a
 for i in range(n):
 a += 1
# 这两个方法同时声明了变量a,并对其进行修改
def decr(n):
 global a
 for i in range(n):
 a -= 1

t_incr = threading.Thread(target=incr, args=(1000000,))
t_decr = threading.Thread(target=decr, args=(1000000,))
t_incr.start()
t_decr.start()
t_incr.join()
t_decr.join()
print(a)
# 期望结果应该是0, 但是因为这里没有设置互斥锁, 所以两个方法是同时对同一个变量进行修改, 得到的的结果值是随机的

下面我们改一下上面的代码 , 两个方法加上互斥锁:

a = 0
lock = threading.Lock()	# 实例化互斥锁对象, 方便之后的调用

def incr(n):
 global a
 for i in range(n):
 lock.acquire()	# 上锁的方法
 a += 1
 lock.release()	# 解锁的方法
# 要注意的是上锁的位置是, 出现修改操作的代码
def decr(n):
 global a
 for i in range(n):
 with lock:	# 也可以直接使用with, 自动解锁
  a -= 1

t_incr = threading.Thread(target=incr, args=(1000000,))
t_decr = threading.Thread(target=decr, args=(1000000,))
t_incr.start()
t_decr.start()
t_incr.join()
t_decr.join()
print(a)
# Result: 0

在容易出现抢夺资源的地方进行上锁 , 实现同一时间内 , 只有一个线程可以对对象进行操作

队列Queue

常用方法

关键字 解释
put(item) 入队 , 将item放入队列中 , 在队列为满时插入值会发生阻塞(1)
get() 出队 , 从队列中移除并返回一个数据 , 在队列为空时获取值会发生阻塞
task_done() 任务结束 , 意味着之前入队的一个任务已经完成。由队列的消费者线程调用
join() 等待完成 , 阻塞调用线程,直到队列中的所有任务被处理掉。
empty() 如果队列为空,返回True,反之返回False
full() 如果队列为满,返回True,反之返回False
qsize() 队列长度 , 返回当前队列的数据量

(1): 阻塞: 程序停在阻塞的位置 , 无法继续执行

导入和实例化

import queue
q = queue.Queue(4)	# 实例化队列对象, 并设置最大数据量

put() 和 get()

q.put('a')
q.put('b')
print(q.get()) # : a
print(q.get()) # : b
q.task_done() # get后必须要加task_done,确认get操作是否完成
q.put(1)  # 当前队列已满,再次put就会阻塞
print(q.full()) # 由于已经阻塞, 所以这段不会被执行
# put会在队列慢了点时候,在插入值会发生阻塞
# get会在队列里没有值的时候,会发生阻塞

empty()

print(q.empty()) # 判断队列是否为空: True
q.put('test')
print(q.empty()) # : False

qsize()

print(q.qsize()) # 当前队列里有多少人: 1

full()

q.put(1)
q.put(1)
q.put(1)
print(q.full()) # : True

join()

print('testetsetset')
q.join() # join会在队列非空时发生阻塞
print('done') # 由于已经阻塞, 所以这段不会被执行

线程池

池的概念

线程池中实现准备好了一些可以重复使用的线程 , 等待接受任务并执行

主线程提交任务给 线程池 , 线程池中的每个线程会一次一个的接收任务并执行 , 直到主线程执行结束

主线程: 相当于生产者,只管向线程池提交任务。
并不关心线程池是如何执行任务的。
因此,并不关心是哪一个线程执行的这个任务。

线程池: 相当于消费者,负责接收任务,
并将任务分配到一个空闲的线程中去执行。

自定义线程池

import queue
import threading
import time


class ThreadPool: # 自定义线程池

 def __init__(self, n): # 主线程做

 self.queue_obj = queue.Queue()
 for i in range(n):
  threading.Thread(target=self.worker, daemon=True).start() # 给子线程worker设置为守护模式

 def worker(self): # 子线程做,由于Debug调试的只是主线程的代码,所以在调试时看不到子线程执行的代码
 """线程对象,写while True 是为了能够一直执行任务。"""
 while True: # 让线程执行完一个任务之后不会死掉,主线程结束时,守护模式会让worker里的死循环停止
  func = self.queue_obj.get() # get已经入队的任务, 这里会接收到主线程分配的func
  # 由于设置了守护模式,当队列为空时,不会一直阻塞在get这里
  # 有了守护模式,worker会在主线程执行完毕后死掉
  func() # 将队列里的任务拿出来调用
  """
  这里func与task_done的顺序非常重要,如果func放在task_done后面的话会出现只执行两次就结束。
  """
  self.queue_obj.task_done() # task_done 会刷新计数器
  # 线程池里有一个类似计数器的机制,用来记录put的次数(+1),每一次task_done都会回拨一次记录的次数(-1)
  # 当回拨完计数器为0之后,就会执行join

 def apply_async(self, func): # 主线程做
 """向队列中传入需要执行的函数对象"""
 self.queue_obj.put(func) # 将接收到的func入队

 def join(self): # 主线程做
 """等待队列中的内容被取完"""
 self.queue_obj.join() # 队列里不为空就阻塞,为空就不阻塞

简单使用

def task1(): # 子线程做
 time.sleep(2)
 print('task1 over')

def task2(): # 子线程做
 time.sleep(3)
 print('task2 over')

P = ThreadPool(2) # 如果在start开启线程之后没有传入任务对象,worker里的get会直接阻塞
P.apply_async(task1)
P.apply_async(task2)

print('start')
P.join()
print('done')

# Result: 
start
task1 over
task2 over
done

如果get发生阻塞意味着队列为空,意味着join不阻塞,意味着print('done')会执行,
意味着主线程没有任务在做,意味着主线程结束,意味着不等待设置了守护的线程执行任务,
意味着子线程会随着主线程的死亡而死亡,这就是为什么会设置守护模式。

如果没有设置守护模式意味着get发生阻塞,意味着子线程任务执行不完,意味着主线程一直要等子线程完成,
意味着程序一直都结束不了,意味着程序有问题

python内置线程池

原理

  1. 创建线程池
  2. 将任务扔进去
  3. 关闭线程池
  4. 等待线程任务执行完毕

 '''手动实现线程池:
主要是配合队列来进行实现,我们定义好一个队列对象,然后将我们的任务对象put到我们的队列对象中,
然后使用多线程,让我们的线程去get队列种的对象,然后各自去执行自己get到的任务,
这样的话其实也就实现了线程池
'''

使用方法

from multiprocessing.pool import ThreadPool
import time

pool = ThreadPool(2) # 直接使用内置线程池, 设置最大线程数

def task1():
 time.sleep(2)
 print('task1 over')

def task2(*args, **kwargs):
 time.sleep(3)
 print('task2 over', args, kwargs)

pool.apply_async(task1)
pool.apply_async(task2, args=(1, 2), kwds={'a': 1, 'b': 2})
print('Task Submitted')
pool.close() # 要点: close必须要在join之前, 不允许再提交任务了
pool.join()
print('Mission Complete')

# Result:
Task Submitted
task1 over
task2 over (1, 2) {'a': 1, 'b': 2}
Mission Complete

其他操作

操作一: close - 关闭提交通道,不允许再提交任务

操作二: terminate - 中止进程池,中止所有任务

相关推荐

末点 / 0评论 2020-06-27