chaigang 2020-06-13
死锁:只上锁不解锁容易造成死锁现象
互斥锁:加一把锁就对应解一把锁,形成互斥锁
递归锁:用于解决死锁,只是一种应急的处理方法
from threading import RLock
从语法上讲,锁可以互相嵌套,但不要使用
不要因为逻辑问题让上锁分成两次,导致死锁
from queue import Queue put 存 get 取 put_nowait 存,超出了队列长度,报错 get_nowait 取,没数据取不出来,报错 linux windows 线程中put_nowait,get_nowait都支持
(1) Queue
先进先出,后进后出
(2)LifoQueue
先进后出,后进先出(按照栈的特点设计)
from queue import LifoQueue lq = LifoQueue(3) lq.put(11) lq.put(22) lq.put(33) # print(lq.put_nowait(444)) print(lq.get()) print(lq.get()) print(lq.get())
(3)PriorityQueue
按照优先级顺序排序(默认从小到大排序)
from queue import PriorityQueue # 1.如果都是数字,默认从小到大排序 pq = PriorityQueue() pq.put(13) pq.put(3) pq.put(20) print(pq.get()) print(pq.get()) print(pq.get()) # 2.如果都是字符串 """如果是字符串,按照ascii编码排序""" pq1 = PriorityQueue() pq1.put("chinese") pq1.put("america") pq1.put("latinos") pq1.put("blackman") print(pq1.get()) print(pq1.get()) print(pq1.get()) print(pq1.get()) # 3.要么全是数字,要么全是字符串,不能混合 error """ pq2 = PriorityQueue() pq2.put(13) pq2.put("aaa") pq2.put("拟稿") """ pq3 = PriorityQueue() # 4.默认按照元组中的第一个元素排序 pq3.put( (20,"wangwen") ) pq3.put( (18,"wangzhen") ) pq3.put( (30,"weiyilin") ) pq3.put( (40,"xiechen") ) print(pq3.get()) print(pq3.get()) print(pq3.get()) print(pq3.get())
# 线程池 # 实例化线程池 ThreadPoolExcutor (推荐5*cpu_count) # 异步提交任务 submit / map # 阻塞直到任务完成 shutdown # 获取子线程的返回值 result # 使用回调函数 add_done_callback # 回调函数 就是一个参数,将这个函数作为参数传到另一个函数里面. 函数先执行,再执行当参数传递的这个函数,这个参数函数是回调函数 # 线程池 是由子线程实现的 # 进程池 是由主进程实现的
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,time def func(i): print("任务执行中... start" , os.getpid()) time.sleep(10) print("任务结束... end" , i) return i
(1) ProcessPoolExecutor 进程池基本使用
"""默认如果一个进程短时间内可以完成更多的任务,就不会创建额外的新的进程,以节省资源""" if __name__ == "__main__": lst = [] # print(os.cpu_count()) # 8 cpu逻辑核心数 # (1) 创建进程池对象 """进程池里面最多创建os.cpu_count()这么多个进程,所有任务全由这几个进程完成,不会额外创建进程""" p = ProcessPoolExecutor() # (2) 异步提交任务 for i in range(10): res = p.submit(func,i) lst.append(res) # (3) 获取当前进程池返回值 # for i in lst: # print(i.result()) # (4) 等待所有子进程执行结束 p.shutdown() # join print("主程序执行结束....")
(2) ThreadPoolExecutor 线程池的基本用法
"""默认如果一个线程短时间内可以完成更多的任务,就不会创建额外的新的线程,以节省资源""" from threading import current_thread as cthread def func(i): print("thread ... start" , cthread().ident,i) time.sleep(3) print("thread ... end" , i ) return cthread().ident if __name__ == "__main__": lst = [] setvar = set() # (1) 创建线程池对象 """限制线程池最多创建os.cpu_count() * 5 = 线程数,所有任务全由这几个线程完成,不会额外创建线程""" tp = ThreadPoolExecutor()# 我的电脑40个线程并发 # (2) 异步提交任务 for i in range(100): res = tp.submit(func,i) lst.append(res) # (3) 获取返回值 for i in lst: setvar.add(i.result()) # (4) 等待所有子线程执行结束 tp.shutdown() print(len(setvar) , setvar) print("主线程执行结束 ... ")
(3)线程池 map
from concurrent.futures import ThreadPoolExecutor from threading import current_thread as cthread from collections import Iterator def func(i): # print("thread start ... ",cthread().ident) # print("thread end ... ",i) time.sleep(0.5) return "*" * i if __name__ == "__main__": setvar = set() lst = [] tp = ThreadPoolExecutor(5) # map(自定义函数,可迭代性数据) 可迭代性数据(容器类型数据,迭代器,range对象) it = tp.map(func,range(20)) # 判定返回值是否是迭代器 print(isinstance(it,Iterator)) tp.shutdown() for i in it: print(i)
把函数当成参数传递给另外一个函数
在当前函数执行完毕之后,最后调用一下该参数(函数),这个函数就是回调函数
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import current_thread as cthread import os,time def func1(i): print("Process start ... ",os.getpid()) time.sleep(0.5) print("Process end ... ",i) return "*" * i def func2(i): print("thread start ... ",cthread().ident) time.sleep(0.5) print("thread end ... ",i) return "*" * i def call_back1(obj): print("<==回调函数callback进程号:===>",os.getpid()) print(obj.result()) def call_back2(obj): print("<==回调函数callback线程号:===>",cthread().ident) print(obj.result())
(1) 进程池的回调函数 : 由主进程执行调用完成
if __name__ == "__main__": p = ProcessPoolExecutor(5) for i in range(1,11): res = p.submit(func1,i) # 进程对象.add_done_callback(回调函数) ‘‘‘ add_done_callback 可以把res本对象和回调函数自动传递到函数里来 ‘‘‘ res.add_done_callback(call_back1) p.shutdown() print("主进程执行结束 ... " , os.getpid())
(2) 线程池的回调函数: 由当前子线程执行调用完成
if __name__ == "__main__": tp = ThreadPoolExecutor(5) for i in range(1,11): res = tp.submit(func2,i) # 进程对象.add_done_callback(回调函数) ‘‘‘ add_done_callback 可以把res本对象和回调函数自动传递到函数里来 ‘‘‘ res.add_done_callback(call_back2) tp.shutdown() print("主线程执行结束 ... " , cthread().ident)
(3) 回调函数原型
# add_done_callback 原型 class Ceshi(): def add_done_callback(self,func): print("执行操作1 ... ") print("执行操作2 ... ") func(self) def result(self): return 123456 def call_back3(obj): print(obj) print(obj.result()) obj = Ceshi() obj.add_done_callback(call_back3)
先安装 gevent模块
#协程也叫纤程: 协程是线程的一种实现. 指的是一条线程能够在多任务之间来回切换的一种实现. 对于CPU、操作系统来说,协程并不存在. 任务之间的切换会花费时间. 目前电脑配置一般线程开到200会阻塞卡顿. #协程的实现 协程帮助你记住哪个任务执行到哪个位置上了,并且实现安全的切换 一个任务一旦阻塞卡顿,立刻切换到另一个任务继续执行,保证线程总是忙碌的,更加充分的利用CPU,抢占更多的时间片 # 一个线程可以由多个协程来实现,协程之间不会产生数据安全问题 #协程模块 # greenlet gevent的底层,协程,切换的模块 # gevent 直接用的,gevent能提供更全面的功能
(1) 协程的具体实现
switch 遇到阻塞时,只能手动调用该函数进行函数切换,不能自动实现切换,来规避io阻塞;
from greenlet import greenlet import time def eat(): print("eat 1") g2.switch() time.sleep(3) print("eat 2") def play(): print("play one") time.sleep(3) print("play two") g1.switch() g1 = greenlet(eat) g2 = greenlet(play) g1.switch()
(3) gevent
gevent 可以自动切换,但是不能够自动识别time.sleep这样的阻塞
import gevent def eat(): print("eat 1") time.sleep(3) print("eat 2") def play(): print("play one") time.sleep(3) print("play two") # 利用gevent.spawn创建协程对象g1 g1 = gevent.spawn(eat) # 利用gevent.spawn创建协程对象g2 g2 = gevent.spawn(play) # 阻塞,必须g1协程执行完毕为止 g1.join() # 阻塞,必须gg协程执行完毕为止 g2.join() print("主线程执行完毕 ... ")
(4) gevent,time 添加阻塞,让他实现自动切换
def eat(): print("eat 1") gevent.sleep(3) print("eat 2") def play(): print("play one") gevent.sleep(3) print("play two") # 利用gevent.spawn创建协程对象g1 g1 = gevent.spawn(eat) # 利用gevent.spawn创建协程对象g2 g2 = gevent.spawn(play) # 阻塞,必须g1协程执行完毕为止 g1.join() # 阻塞,必须gg协程执行完毕为止 g2.join() print("主线程执行完毕 ... ")
(5) 终极大招 彻底解决不识别阻塞的问题
from gevent import monkey monkey.patch_all() # 把下面所有引入的模块中的阻塞识别一下 import time import gevent def eat(): print("eat 1") time.sleep(3) print("eat 2") def play(): print("play one") time.sleep(3) print("play two") # 利用gevent.spawn创建协程对象g1 g1 = gevent.spawn(eat) # 利用gevent.spawn创建协程对象g2 g2 = gevent.spawn(play) # 阻塞,必须g1协程执行完毕为止 g1.join() # 阻塞,必须gg协程执行完毕为止 g2.join() print("主线程执行完毕 ... ")
(1) spawn(函数,参数1,参数2,参数3 .... ) 启动协程 (2) join 阻塞,直到某个协程任务执行完毕之后,再放行 (3) joinall 等待所有协程任务都执行完毕之后,在放行 g1.join() g2.join() <=> gevent.joinall( [g1,g2] )(推荐:比较简洁) (4) value 获取协程任务中的返回值 g1.value g2.value 获取对应协程中的返回值
(1) 利用协程爬取数据
requests 抓取页面数据模块 HTTP 状态码 200 ok 404 not found 400 bad request
(2) 基本语法
from gevent import monkey ; monkey.patch_all() import time import gevent import requests """ response = requests.get("http://www.baidu.com") print(response) # 获取状态码 print( response.status_code ) # 获取网页中的字符编码 res = response.apparent_encoding print( res ) # 设置编码集,防止乱码 response.encoding = res # 获取网页当中的内容 res = response.text print(res)
def get_url(url): response = requests.get(url) if response.status_code == 200: # print(response.text) time.sleep(0.1)
(3) 正常爬取
starttime = time.time() for i in url_list: get_url(i) endtime = time.time() print("执行时间:" ,endtime - starttime ) import re strvar = ‘<img lz_src="http://i5.7k7kimg.cn/cms/cms10/20200609/113159_2868.jpg"‘ obj = re.search(r‘<img lz_src="(.*?)"‘,strvar) print(obj.groups()[0])
(4) 用协程的方式爬取数据
lst = [] starttime = time.time() for i in url_list: g = gevent.spawn(get_url,i) lst.append(g) gevent.joinall(lst) endtime = time.time() print("执行时间:" ,endtime - starttime ) # 执行时间: 2.3307271003723145
利用多进程,多线程,多携程可以让服务器运行速度更快
并且也可以抗住更多用户的访问