更多优质内容
请关注公众号

从IO模型到协程(五) python中的协程(coroutine)-阿沛IT博客

正文内容

从IO模型到协程(五) python中的协程(coroutine)

栏目:Python 系列:从IO模型到协程系列 发布时间:2020-11-14 13:18 浏览量:4306

在观看本节之前,需要大家理解什么是生成器以及生成器是如何工作的,不了解生成器的朋友可以查看本博客有关python迭代器和生成器的文章。

 

一、什么是协程(Coroutine)

python中的协程本质是一个生成器(早期是使用生成器作为协程),但是和普通的生成器不同。

普通的生成器会在生成器函数中出现yield但是不会对yield代码进行赋值;而协程的生成器函数中的yield一般会进行赋值。例如:
 

# 普通的生成器函数
def gen_comm():
	yield 1
	
# 协程的生成器函数
def gen_coroutine():
	a = yield 1
	print(a)

我们知道 next(generator) 会切换到生成器函数的环境中开始(或者继续)执行生成器函数中的代码。而 yield 会暂停生成器函数中的代码执行并切换到调用方环境(即全局环境)执行代码。

上述是我们已知的关于生成器的知识。

 

接下来我们把生成器函数内的代码程序统称为子程序(而实际上函数内的代码我们都可以叫做子程序,而函数外的全局环境下的代码我们可以叫做主程序)。

接下来将介绍几个协程的方法来帮助大家理解协程。

实例1:send()方法向yield表达式发送数据

# coding=utf-8

# 创建一个协程函数(协程的生成器函数,我把它叫做协程函数,但它不是协程)
def simple_coroutine():
    print("开始协程")
    x = yield
    print("接收到数据x为: " + str(x))

coro = simple_coroutine()       # 实例化一个协程(其实就是个生成器)
output = next(coro)             # 调用next()开始子程序代码的运行,并将yield产出的值赋给output
print("协程产出了", output)       # None
coro.send(100)                  # 主程序向子程序发送数据

运行结果如下:
开始协程
协程产出了 None
接收到数据x为: 100

最后抛出StopIteration异常

 

代码分析:
代码的关键在于 x = yield 这行。实际上,我们可以将这1句代码拆解为两句:
x = yield  拆解为 yield None && x = recv(data)        # 这两句代码是我写的伪代码,但是有利于协程运行过程的理解

正式开始代码的运行流程:

阶段0:当主程序执行coro = simple_coroutine()的时候, simple_coroutine 函数中的代码(子程序)没有运行,而是返回一个协程(协程对象)。
阶段1:当代码运行到next(coro)时候,主程序的代码暂停运行,并切换到子程序中开始运行子程序的代码。当运行到 x = yield 时,yield 产出一个None给主程序(yield 后面不接表达式相当于 yield None),然后系统又切换回主程序把None赋给output(执行了yield None没有执行x=recv(data)

阶段2:主程序执行到了 coro.send(100), send的作用有两个:一是切换到子程序,二是把send()中的数据赋值给子程序的x,然后把子程序中下一个yield的值返回给主程序(本例中由于只有一个yield,所以后面直接抛StopIteration)

阶段3:子程序运行完 print("接收到数据x为: " + str(x))后,return空,抛出StopIteration异常。子程序结束,回到主程序,主程序也结束。


整个过程是一个单线程在子程序和主程序切换运行的过程,不存在阻塞。子程序或者主程序会暂停,但是整个线程是没有暂停过工作的。子程序暂停了就说明线程在运行主程序,主程序暂停了就说明线程在运行子程序。因此协程的含义就是协作的程序,子程序和主程序之间协作完成任务,由单线程来进行(多个)子程序和主程序的切换与调度。

协程中的数据通信
子程序通过yield把数据传递给主程序,主程序通过send把数据传给子程序。有点类似于通过socket建立连接传递数据,而多线程则通过消息队列传递数据。
相比于多线程的数据通信,协程的这种在子程序和主程序切换的时候交换数据,不用考虑线程安全和上锁,效率比多线程高了不少。

协程可以说在单线程的情况下把cpu用到了淋漓尽致,但是毕竟是单线程,无法使用到多核运行,这个我们后面再说。

 

协程的四个状态和send的注意点。

协程有四个状态:
GEN_CREATED(等待开始执行):主程序调用next(gen)前,子程序还没开始运行的状态
GEN_RUNNING(协程正在运行):主程序调用next(gen)后,子程序的代码正在执行的状态
GEN_SUSPENDED(在yield处暂停):执行到yield后切换到主程序的状态,此时子程序停止运行
GEN_CLOSE(执行结束):子程序执行到return后结束的状态

可以在主程序中调用inspect.getgeneratorstate(gen)方法查看协程状态。

注意:除了可以用next()对协程进行首次激活,让子程序的代码开始运行之外;用gen.send(None)也可以,但是只能传None,如果协程处于GEN_CREATED的状态下调用send()方法且传入参数不为None的话就会报错

 

实例2:产出两个值的协程

# coding=utf-8

from inspect import getgeneratorstate

# 创建一个协程函数
def simple_coro2(a):
    print("协程开始运行, 传入a:%s" % str(a))
    b = yield a
    print("协程接收到数据b:%s" % str(b))
    c = yield a + b
    print("协程接收到数据c:%s" % str(c))

coro2 = simple_coro2(10)    # 返回一个协程

print(getgeneratorstate(coro2))     # “GEN_CREATED”

print(next(coro2))      # “协程开始运行, 传入a:10” 和 “10”

print(getgeneratorstate(coro2))     #  “GEN_SUSPENDED” 此时一定是暂停状态而不是运行状态,因为能回到主程序执行代码说明子程序已经经历了暂停和切换。要时刻记得这是个单线程而不是多线程

print(coro2.send(20))   # “协程接收到数据b:20”和“30” ,30是send()的返回值,是yield a+b产出的值

coro2.send(100)     # “协程接收到数据c:100”

# 最后抛出 “StopIteration”异常

上面的每个注释是每一句代码的运行结果。

 

实例3:使用协程计算移动平均值

# coding=utf-8


# 计算移动平均值
def averager():
    total = 0    # 数据值总和
    count = 0    # 传进协程的数据个数
    average = None

    while True:
        number = yield average    # 用number接收主程序发送的数据
        count += 1
        total += number
        average = total/count

avg = averager()    # 返回一个协程
next(avg)           # 激活协程,返回None
print(avg.send(10))     # “10”
print(avg.send(20))     # "15"
print(avg.send(33))     # "21"

在协程函数中用了while无限循环,意味着只要主程序一直传值给协程,协程就会不断接收值并产出结果。
虽然协程会因为while True而无法结束,但是主程序运行完了之后整个程序的所有引用都被垃圾回收,也包括协程的引用,所以此时协程会随着主程序的结束而结束。

注意:第一个next(avg)是不会给number发送任何值的,因为第一个next没有让协程运行到yield 左边的赋值就已经暂停了。如果再调用第二个next(),此时number就会被赋值为None(相当于send(None))。


现在这个程序还存在两个问题:
1.无法在主程序中停止协程(因为协程中有个死循环,协程无法自己结束),我希望在主程序发送一个结束协程的通知给子程序
2.我希望能够把最终的平均值average返回给主程序,注意,我的意思不是通过yield产出average给主程序,而是我已经求完所有数的平均值了,想让子程序把最终的average给return出来。但是我们在学生成器的时候知道,生成器的return的内容是无法在主程序中获取到的。

 

在解决这两个问题之前,我有一个小需求,我希望能在调用协程函数的时候自动预先激活协程函数中的代码运行,而无需在主程序中手动调用next()进行激活。

可以使用装饰器解决,思路是用一个装饰器装饰 averager 这个协程函数,使得装饰器返回一个有自动激活功能的协程函数。

 

实例4:会自动激活的协程

def actived(coro_func):
	def autoActiveCoro(*args, **kwargs):
		corotinue = coro_func()		# 返回一个协程
		next(corotinue)
		return corotinue
		
	return autoActiveCoro   	# 返回改造后的协程函数
	
@actived
def averager():		# 在定义 averager 函数后自动隐式执行了 averager = actived(averager)
    total = 0    
    count = 0    
    average = None

    while True:
        number = yield average    
        count += 1
        total += number
        average = total/count
		
avg = averager()	# 创建协程avg时自动激活
print(avg.send(10))
print(avg.send(20))
print(avg.send(30))

 

 

二、协程的终止和异常处理

例子5:未处理的异常会导致协程终止

# 承接上面的程序
avg.send("hello")	# 由于字符串不能进行数值运算,会引起协程内代码异常(TypeError),此时协程会终止。
avg.send(100)	# 尝试激活已终止的协程会抛出 StopIteration 异常

这个例子告诉我们,可以通过让协程发生异常来终止协程。

协程提供了 throw和close 两个方法,显式的把异常发送给协程。


gen.throw(exc_type, exc_value)
throw方法会使生成器在暂停的yield处抛出指定异常。
如果协程中没有处理这个异常(没有进行try),则协程会把这个异常向上冒泡到主程序,协程终止。
如果协程处理了这个异常,则协程会往下一个yield运行,并且下一个yield的产出值会产出给thow()作返回值


gen.close()
throw方法会使生成器在暂停的yield处抛出GeneratorExit异常。
协程中无需对这个异常进行处理(无需try和except),此时协程会直接终止,不会再向下一个yield执行。
如果协程中处理了GeneratorExit异常(try捕获到了GeneratorExit异常),生成器一定不能运行到下一个yield,否则会抛出RuntimeError异常。

 

例子6:终止协程

# coding=utf-8

from sys import maxsize

# 为做实验而自定义的异常
class DemoException(Exception):
    pass

# 定义自动激活装饰器
def actived(func):
    def autoActivedCoro(*args, **kwargs):
        coro = func(*args, **kwargs)
        res = next(coro)        # res是0,由协程yiled产出
        return coro

    return autoActivedCoro

# 传入n限定产出的次数
@actived
def coro_exec_demo(n = None):
    n = n if n else maxsize

    for i in range(n):
        try:
            recv_data = yield i
        except:
            print("yield处发生异常")
        else:
            print("接收到数据 %s " % str(recv_data))

    print("协程结束")


coro = coro_exec_demo(3)    # 创建协程并自动激活,限定yield 3次,自动激活已经yield 1次了
print(coro.send(100))       # yield 产出 1
print(coro.throw(DemoException))     # 向协程发送一个普通异常,throw直接接收到yield 2
print(coro.send(200))       # 协程最终跳出循环,抛出 StopIteration

结果:
接收到数据 100 
1
yield处发生异常
2
接收到数据 200 
协程结束
Traceback (most recent call last):
  File "D:/2编程学习笔记/python笔记/python入门2/协程/example5.py", line 34, in <module>
    print(coro.send(200))       # 协程最终跳出循环,抛出 StopIteration
StopIteration

 

# 如果是使用close()
coro = coro_exec_demo(3)    
print(coro.send(100))       
print(coro.close())
print(coro.send(200)) 

结果:
接收到数据 100 
1
yield处发生异常
yield处发生异常
协程结束
Traceback (most recent call last):
  File "D:/2编程学习笔记/python笔记/python入门2/协程/example5.py", line 34, in <module>
    print(coro.close())
RuntimeError: generator ignored GeneratorExit


发生RuntimeError异常是因为 GeneratorExit异常被协程捕获到了。正常的做法应该是不允许协程捕获GeneratorExit异常的。

for i in range(n):
	try:
		recv_data = yield i
	except DemoException as e:		# 改进:只捕获DemoException异常,避免捕获到GeneratorExit
		print("yield处发生异常")
	else:
		print("接收到数据 %s " % str(recv_data))
		
# ------------------------------------

coro = coro_exec_demo(3)    # 创建协程并自动激活,限定yield 3次,自动激活已经yield 1次了
print(coro.send(100))       # yield 产出 1
print(coro.throw(ZeroDivisionError))     # 向协程发送一个ZeroDivisionError异常,协程捕获不到,报错并直接终止
print(coro.send(200))       # 这句已经执行不到了

 

 

三、让协程返回值

我们知道协程函数的return是不会把返回值返回给主程序的。当协程运行到return就会抛出一个 StopIteration

但是实际上return的内容被保存在了 StopIteration 异常的value值中。我们可以通过在主程序使用try捕获StopIteration异常从而拿到协程的返回值。

下面我们改写一下 averager() 函数让他在程序结束的时候返回最终的平均值

 

实例7:让协程返回值

def actived(coro_func):
    def autoActiveCoro(*args, **kwargs):
        corotinue = coro_func()
        next(corotinue)
        return corotinue

    return autoActiveCoro


@actived
def averager():
    total = 0
    count = 0
    average = None

    while True:
        number = yield      # 不产出任何值给主程序
        if number == None:  # 当主程序发送None给协程时,跳出循环
            break
        count += 1
        total += number
        average = total / count

    return average


avg = averager()  # 创建协程avg时自动激活
avg.send(10)
avg.send(20)
avg.send(30)
try:
    avg.send(None)  # 会抛出StopIteration异常
except StopIteration as e:
    print("平均值结果为:%s" % e.value)

但是这种通过捕获 StopIteration 异常来获取协程返回值的方法太low了。

为了能够让开发者能更优雅的获取到协程的返回值,python提供了 yield from 结构。
 

四、yield from 结构 

yield from 语法是一种非常难理解的语法结构。yield from后面必须接一个可迭代对象。

由于生成器和协程本身也是一个可迭代对象,所以,yield from 后面也可以接一个协程对象。

现在看两个例子:

例子8:使用 yield from 代替 for 循环遍历可迭代对象

#coding=utf-8

# chain函数可以传入多个可迭代对象,并将这些可迭代对象中的元素统一产出
def chain(*iterables):
    for iterable in iterables:
        for item in iterable:
            yield item

# 使用yield from改进chain函数
def improved_chain(*iterables):
    for iterable in iterables:
        yield from iterable

a_list = [1,2,3]
a_dict = {"a":"A", "b":"B"}
a_tuple = (8,9,0)

for i in chain(a_list, a_dict, a_tuple):
    print(i)

for i in improved_chain(a_list, a_dict, a_tuple):
    print(i)
	

 

上面的代码中 chain 和 improved_chain 的效果是一模一样的。这两个函数都是生成器函数(但不是协程函数)

例子8介绍了 yield from 后接普通的可迭代对象的用法。

 

下面这个例子9则介绍 yield from 后接协程的用法,该例子来自于《流畅的Python》一书中的实例。为了能更好的介绍 yield from 接协程的用法,作者提出了几个概念:

1. 委派生成器(委派协程):
包含 yield from <corotinue> 语法的生成器函数(其实也是个协程);这个函数相当于是一个中间通道,为子生成器和调用方传递信息。

2.子生成器(子协程):
真正为调用方提供服务的协程

3.调用方(主程序):
驱动协程工作的程序

场景和目标:
现在有一个字典,这个字典记录了一个班男女生的身高和体重,现在我要使用前面的averager() + yield from创建协程计算男女生的身高体重的平均值。

例子9(重要):使用yield from让调用方(主程序)和协程(子生成器)进行通信

# coding=utf-8

# 子生成器函数(协程函数)
def averager():
    count = 0
    total = 0
    average = None

    while True:
        number = yield      # 只接收数据,不产出数据
        if number == None:  # 如果发送过来的数据为None则跳出循环终止协程
            break
        total += number
        count += 1
        average = total / count

    return count, average

# 委派生成器函数, 需要传入一个全局的results保存求得的平均值结果
def grouper(key):
    while True:
        results[key] = yield from averager()        # while每循环一次,都会创建一个新的averager子生成器(averager协程)


# 调用方,用于驱动协程的运行
def main():
    for key, d in data.items():
        print("创建计算 %s 的协程" % key)
        group = grouper(key)        # 返回一个委派生成器,其实是一个协程
        next(group)                 # 激活group协程, grouper内部会在运行到yield from averager()时创建一个averager协程然后暂停
        for item in d:
            group.send(item)        # 向group协程发送数据,发送的item数据不会赋值给grouper的results[key]。grouper内部会把item数据转发给averager协程的number = yield。

        # 发送完一组数据的所有项时,通知average协程终止。
		# PS:send(None)后,averager协程会终止,并且将返回值return给grouper的results[key]。group协程会继续下一个while,运行到yield from会生成一个新的average协程并暂停切换到调用方。
        group.send(None)
        print("用于计算 %s 的协程终止" % key)

    print("所有计算结束,计算结果如下:", results)



# 学生身高体重数据
data = {
	"boy_weight": [130,125,135,160,120,170,149],
	"boy_height": [166,170,190,185,179,168,172],
	"girl_weight": [100,97,88,120,140,102,90,89,112],
	"girl_height": [160,165,155,170,174,167,157,149,170]
}

# results用于记录结果
results = {}

if __name__ == "__main__":
    main()
	

 

上面的程序中,对data中的每一行数据创建了一个委派生成器(group协程),每个group生成器内部创建了一个子生成器(averager协程)用于计算平均值。所以一共创建了4个group协程和4个averager协程。
group协程相当于一个代理,负责将调用方send的数据转发给averager协程,并由averager协程进行工作运算,group协程则不负责工作。
当averager运行到return的时候,return的值会传给grouper中的results[key],这样grouper才完成1次while循环(其实yield from内部也用了 while 无限循环)。
上例中,每一个group协程其实也就完成了1次while循环就被销毁然后创建了新的group协程用来处理下一行数据。

当然,看到这里大家可能还是云里雾里的,这是因为我们还不知道 yield from 的内部到底做了什么。

 

result = yield from I 这句话到底做了什么事情?

在《流畅的Python》一书中,作者用一段伪代码阐述了 result = yield from I (I表示一个可迭代对象,可能是协程也可能不是协程)的内部实现:

# 下面假设I是一个协程
_i = iter(I)	# 调用I的__iter__魔术方法,返回一个可迭代对象,如果I是协程,则_i就是I协程本身
try:
	_y = next(_i)	# 预激活协程,获取第一个产出值_y
except StopIteration as _e:		# 如果第一次next协程就结束则把协程I中return的值返回给_r
	_r = _e.value
else:	# 如果协程没结束,则开始无限循环send数据给协程I
	while 1:
		try:
			_s = yield _y	# 把协程I的产出值返回给调用方,并用_s接收调用方send()发送的数据
		except GeneratorExit as _e:	# 如果调用方调用了close()方法
			try:
				_m = _i.close	# 就尝试获取_i的close方法(用try包住_m=_i.close是防止_i不是一个协程,而只是一个普通的迭代器,此时_i是没有close方法的,就会报错)
			except AttributeError:	# 如果_i不是一个协程则忽略调用方的close方法
				pass
			else:
				_m() 	# 如果_i是一个协程,则调用close方法通知子生成器(协程I)结束
			
			raise _e	# 抛出GeneratorExit异常结束委派生成器这个协程,raise _e这个异常是不会让调用方报错的,因为这是个GeneratorExit异常,协程会自动处理它。
		
		except BaseException as _e:	# 如果调用方调用了throw()方法向委派生成器这个协程发送异常
			_x = sys.exc_info()		# 获取具体是什么异常
			try:	# 这个try也是为了防止_i不是一个协程没有throw方法。
				_m = _i.throw
			except AttributeError:	# 如果_i不存在throw方法就直接抛出_x异常
				raise _e
			else:	# 如果_i有throw方法则向子生成器I发送_x异常
				try:
					_y = _m(*_x)	# 接收I的产出给_y
				except StopIteration as _e:		# 如果协程I结束,则接收协程I的返回值
					_r = _e.value	
					break
		else:	# 如果调用方调用send()让变量s接收到了,委派生成器会把数据s原封不动的转发给协程I
			try: 
				if _s is None:
					_y = next(_i)	# 相当于 _i.send(None)
				else:
					_y = _i.send(_s)
			except StopIteration as _e:     # 如果协程I执行到return结束会将StopIteration往上抛到_y = _i.send(_s)并被try捕获到
				_r = _e.value
				break
				
result = _r		# 将子协程I的返回值返回给调用方

 

这段代码揭示了 yield from 的本质,yield from的存在使得委派生成器本身也是一个协程(因为yield from内部使用了yield),yield from充当了调用方和子生成器(子协程)的中间通道,它会把调用方send()的数据转发给子生成器,把子生成器yield的产出值再yield给调用方。

为什么需要用yield from呢,我不要这个中间通道,直接让调用方和协程averager通信不行吗?
因为 yield from 里面包含了很多的异常捕获的机制,让开发者专注于逻辑代码的开发而不用花精力去处理一堆的异常。

注意:yield from中包含了yield语法,所以包含了yield from语法的委派生成器函数(例子9中的grouper)也是一个协程函数,也需要用next()激活,结束时也会抛出StopIteration。 所以如果调用方不用next()激活group,它是不会开始运行上面那段伪代码的。还有如果不在grouper中用while True 无限循环的话,当调用方调用 send(None)时,grouper会因运行到return结束委派生成器的协程而抛出StopIteration给调用方(grouper中while True无限循环的作用就是为了让grouper协程不结束,这样grouper就不会抛出StopIteration)。

由于yield from 内部会对子协程进行next()激活,所以无需对子生成器使用装饰器进行自动激活,不然就相当于激活了两次了。

 

在python 3.5以前,python都是通过生成器(yield)来实现协程的。而在之后,python推出了async 和 await这两个关键字来定义协程函数。这样的协程就不是生成器。

async function async_function():
    yield "hello"
    
async function await_function():
    data = await async_function()
    return data

print(await_function())

PS:await后面必须接 Awaitable 对象,也就是可等待对象

 

以上内容描述了协程的创建和使用,但是大家会问,协程虽然很复杂很高级,但是从上面这么多的例子看好像没有什么用呀。
没错,如果单纯使用协程,几乎没有什么能应用到实际的场景。这是因为协程必须搭配事件循环使用,否则只有协程是没有什么用处的。

是否还记得上一节多路复用中并发请求网页的例子(https://zbpblog.com/blog-212.html)。那个例子就是使用 事件循环 + 回调函数调用 的方式完成的并发请求url。
我们学了协程之后就可以把这个例子改写为 事件循环 + 协程,把回调形式的代码改为同步形式的代码。

 

相信大家看到这里已经挺不耐烦的了,不过下面这个例子一定要看完,因为这是本节的精华,前面所有的例子都是为下面这个例子做准备。

 

例子10:协程 + 事件循环请求url

# coding=utf-8

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from urllib.parse import urlparse
import socket
import os

# 使用多路复用 + 事件循环 + 协程的方式实现并发http请求

# 定义一个Future类,一个Future对象代表一个尚未完成的事件A(读事件或者写事件)
# Future对象的作用是存储一个未完成事件的回调函数,并在事件就绪时执行回调
class Future:
    def __init__(self):
        self.callbacks = []      # 存放着事件A对应的回调函数,在事件循环的时候监控到事件A就绪时就会调用这些回调函数(在本例中self.callbacks其实只会存一个回调函数Task.step)

    def runCallback(self):
        for cb in self.callbacks:
            cb()

# 定义一个Task任务类,一次url请求就是一个任务,就会调用一次getUrl方法(就会创建一个协程),也会创建一个任务类
# Task任务类的作用是驱动协程,让协程执行某些需要等待但不会阻塞的操作时暂停(通过yield),在这个操作完成后(事件就绪后)恢复协程运行(通过next)
class Task:
    def __init__(self, coroutine):  # 需要传入一个协程对象
        self.coroutine = coroutine
        self.step()     # 预激活协程

    # step方法的作用:驱动协程恢复运行。只有当事件就绪时,该方法才会被调用,所以准确的说是事件循环驱动step的调用从而驱动协程的恢复。
    def step(self):
        try:
            f = next(self.coroutine)     # 恢复协程,协程恢复后肯定会注册一个新的事件,需要协程把这个新事件(Future对象)产出并交给Task类的f变量接收
        except StopIteration as e:      # 协程运行结束
            return
        f.callbacks.append(self.step)   # 把下一次驱动协程恢复的方法放在等待事件f的回调中,当事件就绪时f.callbacks中的方法step就会被调用,协程就会恢复运行

# 定义一个爬虫类, 这个爬虫类很简单,传入一个url,爬虫负责将这个url的内容获取到即可
class Crawler:
    select = DefaultSelector()        # 定义一个selector对象存在类变量中, 目前在windows环境,所以自动选择select多路复用器
    finished = False                      # 爬取是否结束,该变量用于控制停止事件循环监听,如果爬取完所有url则停止loopEvents()的循环(停止监听事件)

    # urls是要爬取的url列表
    def __init__(self, url):
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)  # 设置为非阻塞状态
        self.url = url
        self.response = b''         # 用于累积保存本次请求接收到的所有数据
        global urls

    # 爬取单个页面, 这是个协程方法
    def getUrl(self):
        # 解析url,构建请求报文
        self.__parseUrl()

        # 连接服务器,这里必须用try
        try:
            self.client.connect((self.host, 80))
        except:
            pass

        # 连接是一个未完成的事件,所以要创建一个Future对象
        f = Future()

        # 注册监听写事件,对于客户端而言,与服务器建立好连接后可视为写就绪,写就绪后就可以发送请求报文
        # self.select.register(self.client, EVENT_WRITE, data=self.__sendReq)  # 这是回调版本的注册事件代码,下面是协程版本的注册事件代码
        self.select.register(self.client, EVENT_WRITE, data=f)  # 这里我把事件的回调设置为f对象,函数引用是对象,f也是一个对象,所以这里是可以这样做的

        # 暂停协程,直到事件就绪,f对象中的Task对象的step方法被调用才会恢复协程
        # 必须先register注册事件,才能yield f暂停协程。如果先暂停就会永远触发不了事件的回调,因为事件都没注册怎么可能监控得到事件呢,监控不到事件怎么去触发它的回调step呢,这样的结果就是协程一直处于暂停状态
        yield f     # 产出Future给Task,Task会存放一个step回调给f对象

        self.select.unregister(self.client)     # 这里有一个注意点,select.unregister解除监听写事件必须在client.send之前,否则会在下面的while循环中报错说socket连接关闭(原因是写事件就绪协程恢复运行去recv接收响应,但是其实读事件没有就绪,所以recv到了空字节,所以执行了后面的client.close()),这里花了我一点时间排雷

        # 运行到这里,说明协程被恢复,写事件就绪,可以发送请求给服务器了
        self.client.send(self.send_msg.encode('utf-8'))  # 发送请求报文

        # 发送请求又是一个还未完成的事件(读事件未就绪),此时又需要创建一个新的Future对象,一个Future对象对应一个事件嘛
        # f = Future()    # 这里的创建f对象包含在了下面的while循环中,所以就把这条冗余代码注释掉

        # 发送请求报文后,就要接收响应,不过要等到读事件就绪(就是说等到服务端把响应发过来,client的内核缓冲区有数据可读的时候)
        # 所以我们可以监听读事件是否就绪(用modify更改监听写事件为监听读事件)
        # self.select.modify(self.client, EVENT_READ, data=f)

        while True:     # 循环读取,响应报文内容可能很多,一次recv无法读取完,每次recv前都要执行一次yield暂停协程等待读事件就绪
            f = Future()
            self.select.register(self.client, EVENT_READ, data=f)       # 为什么每循环一次都要重新注册一次读事件?因为每循环一次都需要往register中传入一个新的Future对象
            yield f  # 暂停协程

            # 协程被恢复,说明客户端读就绪,可以解除上一次监听的读事件并开始接收响应
            self.select.unregister(self.client)
            data = self.client.recv(1024)
            if data:
                self.response += data
            else:  # 数据接收完毕
                self.client.close()  # 关闭连接

                # 获取完url的内容后,删除self.urls中的该url
                urls.remove(self.url)

                self.__class__.finished = False if len(urls) else True

                self.__saveHtml()

                break   # 跳出循环,终止协程

    # 根据url解析主机和爬取的路径,已经构建请求报文
    def __parseUrl(self):
        url_component = urlparse(self.url)
        self.host = url_component.netloc  # url的主机名
        self.path = '/' if url_component.path == '' else url_component.path
        self.send_msg = "GET %s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n" % (self.path, self.host)

    # 保存页面到文件
    def __saveHtml(self):
        try:
            dir_path = './crawled_page/'
            fname = 'index.html' if self.path =='/' else self.path.strip('/').strip('.html') + '.html'
            content_arr = self.response.decode('utf-8').split("\r\n\r\n")        # 第一个元素是响应头,应该去掉,只留响应体的内容
            content_arr[0] = ''
            content = ''.join(content_arr)

            if not os.path.isdir(dir_path):
                os.mkdir(dir_path)

            with open(dir_path + fname, 'w', encoding='utf-8') as f:
                f.write(content)
            print("%s 爬取成功" % str(fname))
        except BaseException as e:
            print(e)

        finally:    # 最后无论如何都要销毁自身这个 Crawler 类,不然每请求一个url就创建一个crawler类又不销毁,浪费内存
            del self


    # 循环监听事件(阻塞),在这个类方法中,多路复用器会开始监听所有客户端socket的事件状态;事件就绪后回调函数f.runCallback也是在这个方法里面调用的
    # 回调函数step被调用会恢复协程的运行
    @classmethod
    def loopEvents(cls):
        while not cls.finished:
            events = cls.select.select()     # 监听所有socket的事件,该过程是阻塞的;返回一个包含多个元组的列表
            for key, mask in events:    # key是selectorKey对象,包含该事件的回调函数(data属性,在这里data属性放着的是Future对象),mask是事件的类型,是一个整型
                f = key.data     # 调用register监听client事件时传入的Future对象,一个register的事件对应一个Future
                f.runCallback()     # callback做的事情是恢复协程的运行

if __name__ == "__main__":
    # 定义要爬取的url
    urls = [
        "http://zbpblog.com/blog-192.html",
        "http://zbpblog.com/blog-191.html",
        "http://zbpblog.com/blog-190.html",
        "http://zbpblog.com/",
        "http://zbpblog.com/cate-php",
        "http://zbpblog.com/cate-python",
    ]

    for url in urls:    # 每爬一个url就要创建一个crawler类,创建一个coro协程,创建一个任务对象和多个Future对象(Future对象是在协程中创建的)
        crawler = Crawler(url)
        coro = crawler.getUrl()     # 创建一个协程
        Task(coro)              # 创建一个Task类用于驱动coro协程, Task的__init__已经激活了协程,协程开始运行

    Crawler.loopEvents()    # 开始循环监听事件(事件循环)

# 上面使用了协程之后,连接、发请求、接收响应就都按顺序写在一个代码块里面的,实现了同步的代码编写形式。
# 这个例子由于时IO密集型操作,所以时间基本上都花在了cls.select.select()的阻塞等待,cpu资源其实消耗的很少。像这种io密集型操作很适合用多线程和协程来完成。而协程又比多线程的消耗少,切换速度快(协程切换是函数的切换,而多线程是线程间切换)
# Future和Task这两个概念很重要,之后要学习的asyncio包中也是通过这两个类对象完成驱动协程和事件回调调用的。

这段代码用一张图来描述就是这样:

(从某个协程来看)

(从多个协程来看)

 

上面的程序将连接,发请求和接受响应的详细逻辑都写在了getUrl()中,但是我想将连接,发请求和接收响应这三个动作单独封装起来,使得getUrl中的逻辑一目了然。此时可以使用yield from来优化代码:

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from urllib.parse import urlparse
import socket
import os

class Future:
    def __init__(self):
        self.callbacks = []      

    def runCallback(self):
        for cb in self.callbacks:
            cb()

class Task:
    def __init__(self, coroutine):  
        self.coroutine = coroutine
        self.step()     

    def step(self):
        try:
            f = next(self.coroutine)     
        except StopIteration as e:      
            return
        f.callbacks.append(self.step)   

class Crawler:
    select = DefaultSelector()        
    finished = False                      

    def __init__(self, url):
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)  
        self.url = url
        self.response = b''         
        global urls

    # getUrl 这个委派生成器中创建的了3个yield from 通道用于调用方和3个子生成器的通信。
    # 但是这3个通道不是同时建立的,而是先建立一个完了再建立下一个,同一时刻只有一个通道存在。也就是说同一时刻调用方只和1个子生成器通信(同一时刻调用方只运行1个协程)
    def getUrl(self):
        self.__parseUrl()

        # 先暂停执行连接
        yield from self.__connect()

        # 连接完成后暂停执行发请求
        yield from self.__sendReq()

        # 发送请求后执行接收响应
        yield from self.__recvResponse()

    def __connect(self):
        try:
            self.client.connect((self.host, 80))
        except:
            pass

        f = Future()

        self.select.register(self.client, EVENT_WRITE, data=f)

        yield f

    def __sendReq(self):
        self.select.unregister(self.client)
        
        self.client.send(self.send_msg.encode('utf-8'))

        f = Future()

        self.select.register(self.client, EVENT_READ, data=f)

        yield f

    def __recvResponse(self):
        while True:
            self.select.unregister(self.client)
            data = self.client.recv(1024)


            if data:
                self.response += data
                f = Future()
                self.select.register(self.client, EVENT_READ, data=f)
                yield f
            else:
                self.client.close()

                urls.remove(self.url)

                self.__class__.finished = False if len(urls) else True

                self.__saveHtml()

                break



    def __parseUrl(self):
        url_component = urlparse(self.url)
        self.host = url_component.netloc  
        self.path = '/' if url_component.path == '' else url_component.path
        self.send_msg = "GET %s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n" % (self.path, self.host)

    def __saveHtml(self):
        try:
            dir_path = './crawled_page2/'
            fname = 'index.html' if self.path =='/' else self.path.strip('/').strip('.html') + '.html'
            content_arr = self.response.decode('utf-8').split("\r\n\r\n")        
            content_arr[0] = ''
            content = ''.join(content_arr)

            if not os.path.isdir(dir_path):
                os.mkdir(dir_path)

            with open(dir_path + fname, 'w', encoding='utf-8') as f:
                f.write(content)
            print("%s 爬取成功" % str(fname))
        except BaseException as e:
            print(e)

        finally:    
            del self

    @classmethod
    def loopEvents(cls):
        while not cls.finished:
            events = cls.select.select()     
            for key, mask in events:    
                f = key.data     
                f.runCallback()     

if __name__ == "__main__":
    urls = [
        "http://zbpblog.com/blog-192.html",
        "http://zbpblog.com/blog-191.html",
        "http://zbpblog.com/blog-190.html",
        "http://zbpblog.com/",
        "http://zbpblog.com/cate-php",
        "http://zbpblog.com/cate-python",
    ]

    for url in urls:    
        crawler = Crawler(url)
        coro = crawler.getUrl()     
        Task(coro)              

    Crawler.loopEvents()    



这段代码其实用yield from将getUrl变成一个委托生成器作为调用方和子协程(__connect,__sendReq,__recvResponse)的通信通道。

 

上面就使用了 单线程多协程 + 事件循环 + 异步非阻塞IO方法做到的并发爬取url。

下一节将讲述 asyncio 模块,这是一个用于解决异步io高并发编程的模块和集成方案,asyncio 的本质就是使用 协程 + 事件循环+ 异步IO 做到高并发的。在协程中,要求所有的操作都必须是异步非阻塞操作而不能是同步阻塞操作,否则就会阻塞整个单线程而无法做到高并发。(异步的意思是一个操作即使没有完成也可以马上返回值)

 

因为很重要所以这里我再说一次,要让单线程做到高并发,必须要满足3个条件:

事件循环(就是一个while死循环,每次循环都会检测一次是否有fd的事件就绪,没有则会阻塞线程)

多个协程(多个协程之间进行切换做到并发完成任务)

异步非阻塞方法(如果是同步阻塞方法,那么一个协程的阻塞会导致整个线程的阻塞)

 

对比协程和多线程:

1. 多线程是由内核进行线程的调度,而协程则是由我们程序员决定的如何对程序进行调度(协程是用户程序层面,而线程是系统和内核层面),所以协程编程会增加程序逻辑设计的复杂性(需要开发者去写调度协程的算法,比如事件循环等)。
2. 创建和运行一个协程的成本很低,我们可以轻易创建成千上万个协程,但是无法在一个进程中创建这么多个线程,因为创建和销毁一个线程消耗的资源远比创建一个协程高。

3. 线程切换花费的总时间和代价比协程慢比协程高,协程的切换的速度远比线程间切换快。

 




更多内容请关注微信公众号
zbpblog微信公众号

如果您需要转载,可以点击下方按钮可以进行复制粘贴;本站博客文章为原创,请转载时注明以下信息

张柏沛IT技术博客 > 从IO模型到协程(五) python中的协程(coroutine)

热门推荐
推荐新闻