更多优质内容
请关注公众号

从IO模型到协程(四) 用python实现一个多路复用程序-阿沛IT博客

正文内容

从IO模型到协程(四) 用python实现一个多路复用程序

栏目:其他内容 系列:从IO模型到协程系列 发布时间:2020-11-10 23:27 浏览量:3688

我们将以socket编程模拟http请求和多路复用io模型逐步引入协程

首先,使用socket通信模拟http请求,下面是客户端的请求代码:

# coding=utf-8

import socket

# 使用socket请求我自己的博客的首页
host = "zbpblog.com"
port = 80

# 创建socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 建立tcp连接,三次握手,这个过程是阻塞的,因为连接到对端服务器需要时间
client.connect((host, port))

# 发送请求报文, 请求报文可以不用自己写,直接在chrome浏览器F12即可查看
# 报文内容只需要请求头(GET /xxx HTTP/1.1)和请求行中的Host和Connection字段即可,具体要看你请求的页面要求必须有哪些请求行字段。报文最后必须使用两个\r\n表示请求报文结束,否则运行到下面recv的时候会一直阻塞,因为服务端会认为你的报文没发完(所以服务端就不执行send,所以客户端recv就阻塞住了)

request_msg = "GET / HTTP/1.1\r\nHost: zbpblog.com\r\nConnection: close\r\n\r\n"    # Connection为close,表示使用短连接的方式
# request_msg = "GET / HTTP/1.1\r\nHost: zbpblog.com\r\nConnection: keep-alive\r\n\r\n"    # Connection为keep-alive,表示使用长连接的方式

client.send(request_msg.encode("utf-8"))     # 发送数据, 发送的数据得是字节流而不能是字符串,所以要对字符串编码一下转为byte类型

# 等待服务器返回响应,返回的数据可能大于1024字节,所以要调用多次recv直到接收完全部的数据
data = b""      # 接收到的字节流数据
while True:
    res = client.recv(1024)     # 该recv会阻塞直到服务端的响应到达客户端
	print(res)					
    if res:
        data += res
    else:
        break

print(data.decode("utf-8"))
client.close()

以短连接的方式请求的话,程序运行的结果是:当服务端将响应发送完之后,会再发送一个空消息以表示响应发送完毕(此时服务端会主动关闭连接,也就是在服务端执行client.close()),以此方式告诉客户端可以关闭连接了(此时在客户端执行client.close())。
以长连接的方式请求的话,程序运行的结果是:服务端接收到请求报文后会返回响应(可以通过while循环中的print查看到),但是发送完响应之后不会立刻发送空消息。所以客户端在接收完响应报文之后就又被client.recv()给阻塞住了。此时服务端在等客户端会发送新的请求,直到等待了超时时间的时长,服务端才发送空消息(服务端关闭连接),此时客户端才真正跳出循环关闭连接(客户端执行client.close())。


我们可以做个实验,把接收服务器响应的代码封装起来,我们使用短连接发送两次请求:

def getResponse():
    data = b""      # 接收到的字节流数据
    
    # 等待服务器返回响应,返回的数据可能大于1024字节,所以要调用多次recv直到接收完全部的数据
    while True:
        res = client.recv(1024)     # 该recv会阻塞直到服务端有响应返回

        if res:
            data += res
        else:
            break

    return data.decode("utf-8")

client.send(request_msg.encode("utf-8"))     # 发送第一次请求
print(getResponse())						 # 获取响应,由于是短连接,所以服务端返回响应后就主动关闭了客户端的套接字,所以下面的请求服务端就接收不到了

print("\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n")

client.send(request_msg.encode("utf-8"))
print(getResponse())						 # 第二次getResponse不会阻塞,马上会得到返回,因为服务端压根就没接收到请求


还有一点要注意,客户端连接到服务端(connect()方法)是需要时间的,所以如果要使用非阻塞io编程的话不要connect之后马上就send发送请求,否则会报错,报错的原因是客户端还没有建立与服务端的连接,所以肯定发不了请求啦。
如果是阻塞io编程的话,connect方法会阻塞,直到与对端建立完连接用户进程才被唤醒(所以此时socket.send不会报错)。我们平时没有感觉到connect阻塞是因为与对端建立连接所花的时间很短。


socket.setBlocking(False)
socket.connect((host, ip))		
socket.send(msg)		# 铁定报错,因为连接未建立好


正确做法应该是:

socket.setBlocking(False)
socket.connect((host, ip))		
while True:		# 需要不断尝试发送请求
	try:
		socket.send(msg)	
	except:
		break


接下来我们使用多路复用器来实现一下上面的http请求。
这里作者使用了selector这个库来实现多路复用。selector封装了epoll,poll和select这几个多路复用器,selector.DefaultSelector会根据当前的系统来选择合适的多路复用器。
如果是在windows系统,就会选择select(windows中没有epoll);如果在linux系统就会选择epoll。

除此之外,selector还可以在注册事件的时候传入回调函数,当某个socket的某个事件就绪的时候,selector就会通知我们去调用对应的这个回调函数(注意,seletor不会自动调用回调函数,而是要我们手动调用)。

使用selector库的DefaultSelector本质上和旧版python使用select库的select/epoll是一样的。只不过selector会帮你自动选择合适的多路复用器,以及增加了使用注册回调函数的功能。


代码如下:

# coding=utf-8

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from urllib.parse import urlparse
import socket

# 使用多路复用 + 事件循环 + 回调的方式实现http请求

# 定义一个爬虫类, 这个爬虫类很简单,传入一个url,爬虫负责将这个url的内容获取到即可
class Crawler:
    select = DefaultSelector()        # 定义一个selector对象存在类变量中, 目前在windows环境,所以自动选择select多路复用器
    finished = False                      # 爬取是否结束,该变量用于控制停止事件循环监听,如果爬取完所有url则停止loopEvents()的循环(停止监听事件)

    # 开始批量爬取
    @classmethod
    def start(cls, urls):
        cls.urls = urls

        for url in cls.urls:
            crawler = cls(url)
            crawler.getUrl()

    def __init__(self, url):
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)  # 设置为非阻塞状态
        self.url = url

    # 爬取单个页面
    def getUrl(self):
        # 解析url,构建请求报文
        self.__parseUrl()

        # # 连接服务器(非阻塞,用户进程委托内核发起connect系统调用,并让内核等待连接完成,用户进程不等待连接完成而是直接往下运行其他逻辑),这里必须用try
        try:
            self.client.connect((self.host, 80))
        except:
            pass

        # 注册监听写事件,对于客户端而言,与服务器建立好连接后可视为写就绪,写就绪后就可以发送请求报文
        self.select.register(self.client, EVENT_WRITE, self.__sendReq)  # 设置写就绪后的回调方法是去发起请求

    # 根据url解析主机和爬取的路径,已经构建请求报文
    def __parseUrl(self):
        url_component = urlparse(self.url)
        self.host = url_component.netloc  # url的主机名
        self.path = '/' if url_component.path == '' else url_component.path
        self.send_msg = "GET %s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n" % (self.path, self.host)

    # 连接建立后,发起请求
    def __sendReq(self, key):   # key是我们待会要手动往回调函数传入的SelectorKey对象,该对象包含事件就绪的socket,可以对这个socket做出相关读写操作
        client = key.fileobj
        client.send(self.send_msg.encode('utf-8'))      # 发送请求报文

       # 发送请求报文后,就要接收响应,不过要等到读事件就绪(就是说等到服务端的响应到达了客户端,client所在机器的内核缓冲区有数据可读的时候)
        # modify更改监听写事件为监听读事件(修改epoll红黑树中对应的fd的事件类型)
        self.select.modify(client, EVENT_READ, self.__recvResponse)

    # 服务器的响应到达客户端,客户端读就绪,开始接收响应
    def __recvResponse(self, key):
        client = key.fileobj

        # 一个页面的响应内容有点多,一次接收不完,所以要循环接收
        response = b''
        while True:
             # 由于是非阻塞io,所以多次连续recv可能会报错,例如一次请求服务器发送过来的内容有5kb,每次只返回1K的数据,所以要recv5次才能把这次response的内容接受完,第一次recv肯定不会报错,因为多路复用器通知你该socke已经读就绪,但是第2~4次recv的时候就可能还未读就绪,所以第2~4次recv可能报错
            try:
                data = client.recv(1024)
            except:
                continue
            if data:
                response += data
            else:   # 数据接收完毕
                print("客户端: %s" % str(key.fd))
                self.select.unregister(client)     # # 将fd从内核空间的epoll红黑树中移除
                client.close()                     # 关闭连接
                break                               # 记得跳出循环,不然下一次client.recv会报错的,因为client已经关闭连接了

        # 获取完url的内容后,删除self.urls中的该url
        self.__class__.urls.remove(self.url)

        self.__class__.finished = False if len(self.urls) else True

        print(response.decode("utf-8"))

    # 循环监听事件(阻塞),在这个类方法中,多路复用器会开始监听所有客户端socket的事件状态;事件就绪后回调函数也是在这个方法里面调用的
    @classmethod
    def loopEvents(cls):
        while not cls.finished:
            events = cls.select.select()     # 监听所有socket的事件,该过程是阻塞的;返回一个包含多个元组的列表
            for key, mask in events:    # key是selectorKey对象,mask是事件的类型,是一个整型
                print("事件就绪,类型为 %s " % str(mask))
                callback = key.data     # 返回之前调用register时传入的回调函数,一个register对应一个callback
                callback(key)

# 定义要爬取的url
urls = [
    "http://zbpblog.com/blog-192.html",
    "http://zbpblog.com/blog-191.html",
    "http://zbpblog.com/blog-190.html",
    "http://zbpblog.com/",
    "http://zbpblog.com/cate-php",
    "http://zbpblog.com/cate-python",
]

Crawler.start(urls)     # 开始建立连接(但是start中开没开始发请求,发请求和接收响应是发生在下面的loopEvents中)
Crawler.loopEvents()    # 开始监听事件


PS:上面的代码还有可以改善的地方,在__recvResponse里面,我想通过循环recv立刻读取完所有服务器返回的数据,但是实际上多路复用器只通知了1次读就绪,因此recv可能产生报错异常。就算用了try防止了报错,也会有循环空转的情况(cpu做了无用功),还不如用这个空转的时间去处理其他socket的事件。于是有了以下改进,思路是,每通知1次读就绪就只执行1次recv,每次recv到的数据都保存起来(多路复用器通知了多次读就绪才把响应接收完,而未改进的代码是只通知了一次就想把响应接收完)。

改进的代码(改了3个方法,其他没变):

def __init__(self, url):
    self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.client.setblocking(False)  
    self.url = url
    self.response = b''         # 用于累积保存本次请求接收到的所有数据

def __recvResponse(self, key):		# 不用while循环
    client = key.fileobj
    data = client.recv(1024)
    if data:
        self.response += data		# self.response累积存储data
    else:   
        self.select.unregister(client)     # 解除监听
        client.close()                     # 关闭连接

        # 获取完url的内容后,删除self.urls中的该url
        self.__class__.urls.remove(self.url)

        self.__class__.finished = False if len(self.urls) else True

        self.__saveHtml()


# 保存页面到文件
def __saveHtml(self):
    try:
        dir_path = './crawled_page/'
        fname = 'index.html' if self.path =='/' else self.path.strip('/').strip('.html') + '.html'
        content_arr = self.response.decode('utf-8').split("\r\n\r\n")        # 第一个元素是响应头,应该去掉,只留响应体的内容
        content_arr[0] = ''
        content = ''.join(content_arr)

        if not os.path.isdir(dir_path):
            os.mkdir(dir_path)

        with open(dir_path + fname, 'w', encoding='utf-8') as f:
            f.write(content)
        print("%s 爬取成功" % str(fname))
    except BaseException as e:
        print(e)

在这个程序中,一次请求(getUrl)会生成一个客户端socket,每一个socket都会和服务端建立一次连接。每个socket的生命周期是:connect()建立连接 --- __sendReq()发送请求 --- __recvResponse()接收响应 --- 最后关闭连接。 也就是说一个socket只发送一个请求就关闭(因为这里是短连接),每请求1次就连接1次,这样频繁的建立和关闭连接会浪费建立连接的时间,效率比长连接低不少。

selectors模块的手册可以在这里查看:
https://docs.python.org/3/library/selectors.html
使用谷歌的翻译功能即可看到中文

也有直接的中文文档
https://www.dazhuanlan.com/2020/03/01/5e5a91ffa8c3d/?__cf_chl_jschl_tk__=6b83dc4dad612a631bcdfea8b7c25b3e6d2a6455-1602857112-0-ASrGptReaBYES2jJx6b65UzmH1JBwuqbjmaw5SJM212IPHRl_A1IJclgrEZL_jli_3OP2pLcFy1NT4YoyiubW4w7C8GVX8nzRyefjJlSh2Id_nYHtxBfEfNv7U1b0IdxmAmuJV2jZoJX9WDJQfcF_l0cIo4ARW4HIQLGPjsG8DWI3vL3uPl0QPwC3c9DfO15uz3dJl3m1wLjIdHaNAiLw-IHBvIrxUMNpuiSokPDOsyE2RyNIGdzKjERAtNcJj8uH-FjGLE5a14fPXrFBBUWW7BOnOoNhKzSJ5Og0KBKh2bImGNjr0a2VFegRSiINdm79g


多线程和多路复用相同点和区别:
相同点:两者都做到了高并发工作

区别:
多线程是在某个线程遇到阻塞的时候(例如io操作)通过切换线程,把cpu让给那些没有阻塞的线程占用的方式来做到并发,使得进程每时每刻都能用到cpu在工作,没有闲着。

多路复用虽然是单线程,但是使用了非阻塞IO+事件驱动(事件通知)+事件循环的模式做到高并发,由于非阻塞所以进程不会因为等待而让出cpu,事件通知保证了socket事件就绪后才进行发出系统调用请求,避免了不必要的while循环和系统调用而浪费cpu资源(相比于NIO模型的空转而浪费CPU而言)。

多路复用虽然是单线程,但是这个单线程一直都使用着cpu在进行运作。

多路复用比多线程好的地方:
1.避免了线程间切换(节省了上下文切换的时间+减少cpu损耗,切换线程是要消耗cpu的)
2.无需考虑线程安全和上锁(节省了上锁和解锁的时间)
3.线程的创建所需的资源和成本不小,所以多路复用更省资源。

所以多路复用在这种情况下是要比多线程的性能更高的。
(当然进行系统调用如recv,send,connect的时候cpu还是会从用户态切换到内核态,这个过程也相当于让出了cpu,但是这个情况放在多线程里面也会发生,只要是进行的io操作都会发生系统中断和用户态内核态切换)




上面的代码使用了传入回调函数的编程方式

在getUrl()中传入了回调函数__sendReq,在__sendReq中的register又传入了回调函数__recvResponse,可以说回调函数一层层的进行嵌套。
因为有多个函数,所以变量共用也是个问题,上面的代码使用面向对象编程所以可以通过定义成员变量来解决变量共用的问题。但是如果是普通的函数就不好解决这个问题。

而且回调的方式可读性很差。

为了解决这个问题这里就提出了协程。

协程要做到的事情:
1.使用单线程做到任务并发执行(这个多路复用也能做到)
2.采用同步的方式去编写异步的代码(这是python中Selector库的多路复用器做不到的)

协程的内容会放在下一节进行详细说明。




更多内容请关注微信公众号
zbpblog微信公众号

如果您需要转载,可以点击下方按钮可以进行复制粘贴;本站博客文章为原创,请转载时注明以下信息

张柏沛IT技术博客 > 从IO模型到协程(四) 用python实现一个多路复用程序

热门推荐
推荐新闻