更多优质内容
请关注公众号

从IO模型到协程(三) 多路复用之select、poll和epoll-阿沛IT博客

正文内容

从IO模型到协程(三) 多路复用之select、poll和epoll

栏目:其他内容 系列:从IO模型到协程系列 发布时间:2020-11-05 11:35 浏览量:4440

什么是IO多路复用

IO多路复用本质是操作系统的一系列系统调用(select(), poll() 和 epoll()),它可以监视多个套接字,一旦某个套接字就绪(一般是读就绪或者写就绪),内核就会将就绪的描述符返回给用户进程,通知用户进程进行相应的读写操作。

Linux内核提供的多路复用器有 select(), poll() 和 epoll()

多路复用器只会通知用户进程哪些文件描述符已经事件就绪和事件的类型,但不会自动将内核缓冲区的数据拷贝到用户进程的内存。用户进程需要自己调recv才能获取内核缓存区的客户端消息。因此多路复用还是一种同步IO模式。

在使用select/poll/epoll多路复用的时候,必须要将IO操作(recv/connect/accept)设置为非阻塞的才行。

如果是监控到客户端状态的同时还把客户端数据返回给用户程序,无需用户程序进行系统调用,那就是异步IO模型。

目前Linux内核支持的都是同步IO模型,而只有window的iocp支持这种异步io模型。

 

在正式介绍多路复用之前,我们需要对之前服务端网络通信的流程做更详细的介绍:

已知服务器开启通信的流程为 创建套接字(socket())、套接字绑定某个ip和端口并监听端口(bind() 和 listen())、接收连接(accept())以及接收客户端数据(recv())。

 

创建套接字本质是让内核创建一个socket套接字对象(其实不止一个,但我们可以抽象为一个)。这个socket对象中包含多个成员,我们主要关注3个:输入缓冲区、输出缓冲区和等待队列(其他成员是类似于锁、控制信息和文件描述符等)。

 

套接字绑定ip和端口本质是把ip和端口号信息写入到套接字对象的控制信息中(控制信息包括该套接字绑定的本机ip和端口、以及通信对端的ip和端口、控制位、连接状态和窗口大小等信息)。

接收连接(accept())会引起进程阻塞。

在学习进程的时候我们知道,操作系统在调度进程的时候,众多进程会放到就绪队列(工作队列)和阻塞队列(等待队列)中,工作列队中的进程会轮流持有CPU进行运算,因此工作队列中的进程我们可以视作为是正在并发运行的进程。而阻塞的本质是内核将一个进程从工作队列转移到阻塞队列。

实际上在执行accept()的时候,用户程序把用于接收连接的套接字(我们称之为连接套接字)的描述符作为参数传入accept()中,内核会将进程从工作队列中转移到其绑定的连接套接字的等待队列里。(队列的节点只是引用了进程而非真的保存了进程的实际空间和内容)。

 

当网卡接收到对端发送过来的连接数据包,数据包的帧数据会被网卡保存到内存中(网卡的输入缓冲区),之后网卡就会发出中断信号,中断信号通过总线传输到CPU。CPU会停止当先运行的进程,转而运行网卡的中断程序,中断程序作了以下事情,先从网卡的缓冲区取出数据包的信息,根据数据包的接收方端口找到对应的连接套接字(即图中的socket对象)。此时CPU会做两件事,一是将TCP数据写入到socket的输入缓冲区(由于这是个连接包,所以TCP数据为空,所以没有数据可以写入到输入缓冲区),二是将socket等待队列中所有进程转移回工作队列(也就是唤醒了和该socket相关的所有进程)。

之后连接套接字socket会创建一个用于接收数据的副本socket对象,我们称之为副本套接字。accept()方法返回的就是副本套接字的描述符。

 

接收客户端数据(recv())是由副本套接字负责的,也会阻塞进程

数据从通信对端到达服务端网卡后,也会发生上述的中断,唤醒进程的过程,并且内核会将数据包的数据从socket的输入缓冲区拷贝到用户程序的内存中。

 

有了上面的铺垫,我们接下来可以介绍多路复用的内容了。

 

多路复用器之select

内核提供了select系统调用:

int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
                  
nfds:传入文件描述符
readfds:文件描述符的读事件
writefds:文件描述符的写事件
exceptfds:文件描述符的异常事件

 

下面是官方文档对select系统调用的描述

select() allows a program to monitor multiple file descriptors,waiting until one or more of the file descriptors become "ready" for some class of I/O operation
       
# 它允许程序去监控多个文件描述符,直到一个或多个文件描述符变为“就绪”状态以待进行IO操作。

可以在Linux中执行man 2 select 命令或者上网搜man 2 select查一下内核的select()方法

 

PS:

什么是文件描述符?简单的理解就是:Linux 中一切皆文件,比如 C++ 源文件、视频文件、Shell脚本、可执行文件等,就连键盘、显示器、鼠标等硬件设备也都是文件。一个 Linux 进程可以打开成百上千个文件,为了表示和区分已经打开的文件,Linux 会给每个文件分配一个编号(一个 ID),这个编号就是一个整数,被称为文件描述符(File Descriptor)。

这样说还远没有理解到文件描述符的本质,只是方便理解才这样说。

在本文的例子中,客户端和服务端socket对象就包含一个文件描述符(文件描述符是一个整型),这个文件描述符就代表这个socket对象。select监视的就是客户端和服务端的socket对象。

下面文件描述符(file description)用fd简称表示。

 

接下来用select多路复用器实现一下上一节的文章中客户端和服务端模型:

# coding=utf-8

import socket
import select
# from select import select

# 服务端代码

# 创建套接字
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 绑定ip和端口
ip = "127.0.0.1"
port = 8000
server.bind((ip, port))

server.listen(3)                # 只允许最多3个客户端连接
server.setblocking(False)       # 非阻塞socket

rfd = set()     # 存放要监控的文件描述符,rfd存放监控读状态的socket
wfd = set()     # 存放要监控的文件描述符,wfd存放监控写状态的socket
efd = set()     # 存放要监控的文件描述符,efd存放监控出现异常状态的socket
rfd.add(server)

client_no = 1
clients = dict()

while True:
    print("开始select监控")
    # 获取所有读就绪、写就绪和发生异常的socket
    r_sockets, w_sockets, e_sockets = select.select(rfd, wfd, efd)      # 还可以传第四参超时时间,如果不传第四参,则select方法是阻塞的,阻塞直到有socket的状态改变;如果设了第4参为1,则执行select会阻塞1秒,1秒内没有socket改变状态就会循环1次再执行select

    # 这里我只关注可读的socket
    for r_socket in r_sockets:
        if r_socket == server:   # 如果是服务器socket可读,说明有连接进来了,此时可以去直接接收连接
            client, addr = r_socket.accept()      # 不阻塞,而且肯定可以接收到连接
            rfd.add(client)     # 将连接的客户端添加到要监控的集合中

            clients[client] = client_no
            print("客户端 %s 连接成功" % str(client_no))

            client_no += 1

        else:   # 如果是客户端socket可读,说明客户端发送消息到服务端
            msg = r_socket.recv(1024)       # 不阻塞

            if not msg:
                print("客户端 %s 断开连接" % str(clients[r_socket]))
                rfd.remove(r_socket)
            else:
                print("客户端 %s 发送消息:%s" % (str(clients[r_socket]), msg.decode('utf-8')))
				

上面的代码中,select方法内部会监视服务端socket(只监视它是否可读,不监视它是否可写或异常),由于此时socket是不可读的,所以select会阻塞程序,此时程序会让出cpu进入等待队列。一旦socket的状态变为可读(即有客户端连接进来),此时select会唤醒程序,并返回服务端socket给 r_sockets变量。

用户程序需要主动accept接收一下客户端的连接,并将刚连接进来的客户端soket也添加到要监视的可读状态的文件描述符集合rfd中。也就是说,我现在监视的就是 服务端socket和客户端1号的socket。

之后,如果有新的客户端连进来,都会被添加到被监视的集合中让select进行监控。

如果没有新客户端连接进来(服务端socket不可读,未达到读就绪状态),客户端也没有发送消息给服务端(客户端socket也没达到可读状态),select就会阻塞,直到有socket达到可读状态才会唤醒程序。
当有某些客户端发送消息进来的时候,select就监控到这几个客户端可读,然后将这些客户端socket返回到 r_sockets集合中,服务端就会对这几个客户端进行recv()。

 

select多路复用的工作原理

假设进程A中创建了3个用于和客户端通信的副本socket对象,当调用select(rfd)时,用户程序会将这3个socket的描述符列表拷贝到内核,告诉内核要监听这些socket的读事件。

调用select时,内核会先遍历一遍socket(遍历一遍内核空间中的rfd列表,根据fd找到socket对象),如果存在某个socket的接收缓冲区是有数据的,那么select 就不会阻塞进程,直接返回。如果所有socket都没有数据,内核会把进程A从工作队列移出,把进程A添加到这3个socket对象的等待队列中,此时进程A阻塞。

当某个socket对象的数据到达服务端,内核根据数据包中接收方端口、发送方ip和端口找到这3个socket对象,将进程A从3个socket对象的等待队列移出到工作队列(进程A被唤醒),这里又涉及到1次对所有socket的遍历。

回到用户空间,进程A被唤醒后是不知道哪些socket对象接收了数据,他需要遍历一次这3个socket(即遍历代码中的rfd列表的socket),查看他们的状态才能知道是哪些socket对象接收到了数据,这里用户程序又需要遍历1次。

用户程序处理完接收到的网络数据后,需要重新调用select监控这3个socket,又要把rfd里的3个fd传递给内核。

 

由于需要做多次的O(n)复杂度的遍历,考虑到效率,linux规定select系统调用最多监视1024个socket。

 

select的几大缺点:
1. 每次系统调用select()时,都需要把fd集合从用户态拷贝到内核态
2. 进程被唤醒后每次都要遍历所有套接字是否就绪,内核态时也需要遍历所有套接字移除他们等待队列中的进程引用
3. select支持监视的文件描述符数量太小了,默认是1024

 

多路复用器之epoll
相比与select和poll而言,epoll没有描述符数量限制,而且无需多次的将fd从用户空间拷贝到内核空间。

相比于select只有1个系统调用,epoll提供了3个系统调用接口:

epoll_create(size):该函数在内核空间开辟一块新的空间(用来存放要监视的文件描述符),该函数返回一个epoll的文件描述符。size参数限制这个空间可以存多少个文件描述符。
epoll_ctl(int epfd,int op,int fd, struct epoll_event): 该函数是epoll的事件注册函数。这个函数作用是指定监控或者不再监控某个fd的某一个事件(这里说的事件是指 accept,recv,send这些指令)。

第一参:epfd是epoll_create返回的epoll文件描述符,指代在内核空间中开辟的epoll空间

第二参:op是具体的操作,包括:EPOLL_CTL_ADD 注册新的fd到epfd(把fd拷贝到epoll空间);EPOLL_CTL_DEL 从epfd删除一个fd;EPOLL_CTL_MOD 修改已注册的fd的监听事件。

第三参:fd, 需要监听的文件描述符,一般指socket_fd

第四参:event, 告诉内核要监听或者不监听该fd的哪个事件。
epoll_wait(epfd, epoll_event, maxevents, timeout):等待(监控)注册事件的发生。timeout决定这个方法是阻塞的还是非阻塞的。0表示非阻塞,大于0表示阻塞。比如:timeout设为1,调用epoll_wait时内核会监控所有fd是否触发了注册的事件,阻塞1秒,1秒内还是没有fd触发注册的事件,就会返回-1;如果timeout设为0,则马上返回-1;如果timeout为-1,则会一直阻塞,直到有fd的事件状态发生改变。


epoll_create创建的epoll空间又分为两块,一个是事件注册列表,用于记录epoll_ctl给哪些fd注册了哪些事件。一个是事件就绪列表,用于记录哪些fd的哪些事件已经就绪。

epoll的常用事件如下:
EPOLLIN:读就绪  常量值为 1
EPOLLOUT:写就绪   4
EPOLLERR:服务端出现异常   8
EPOLLHUP: 客户端读写关闭(可以理解为client关闭了连接,但是连接还没有断开) 16

epoll有两种工作模式:LT(水平触发,默认) 和 ET(边缘模式)

LT:事件就绪后,用户可以选择处理或不处理,如果用户本次不处理,则下次调用epoll_wait仍会将未处理的事件打包给你

ET:事件就绪后,用必须处理,因为内核将就绪事件打包给你的时候就把对应的就绪事件从就绪列表中删除。

ET模式很大程度减少了epoll_wait被触发的次数(也减少了fd从内核到用户空间的拷贝次数),效率比LT高。

 

epoll的工作原理如下:
1.当执行epoll_create()时,内核会创建一个eventpoll的结构体,我们关注其中两个成员属性(rbr和rdlist),分别对应两种数据结构:红黑树和双向链表(就绪链表),其中红黑树用于保存和管理所有的要监控的fd,双向链表保存事件就绪的fd,fd文件描述符代表一个套接字。

此外该eventpoll中还有一个等待队列。

struct eventpoll{
	...
	// 红黑树的根节点,该树存储所有添加到epoll的需要监控的socket
	struct rb_root	rbr;
	// 双向链表存放已就绪的socket
	struct list_head rdlist;
}

2.当执行epoll_ctl()时,会把要监控的fd从用户空间拷贝到内核空间,然后再将这个fd作为一个节点添加到eventpoll的红黑树中。同时被监控的socket也会保存一份该eventpoll的引用。
3.当一个socket的读事件或者写事件就绪时,中断处理程序会通过端口和发送方IP找到该socket和对应eventpoll,将这个socket的fd添加到eventpoll对象的就绪链表中。
4.当调用 epoll_wait()时,会检测就绪链表的节点数是否为空,如果不为空说明有fd的事件已经就绪,此时就会将就绪链表中的fd拷贝回用户空间。让用户程序进行相应的读写操作。如果为空,epoll_wait()就会阻塞用户程序,直到有事件就绪,就绪队列不再为空。


epoll使用红黑树管理fd,使得eventpoll可以O(logn)复杂度高效增删查找要监控的fd。

 

epoll使用的代码实例(python实现):

# coding=utf-8

import select, socket

# 服务端代码,该代码不能再windows中运行,因为windows中没有epoll的系统调用

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("0.0.0.0", 8088))
server.listen()
server.setblocking(False)

print("服务器启动成功")

# 创建epoll对象,相当于执行了 epoll_create 在内核中开辟了一块空间用于记录fd事件
epoll = select.epoll()

# 注册要监听的fd事件,相当于调用 epoll_ctl 将fd的事件写入内核空间中。
epoll.register(server.fileno(), select.EPOLLIN)     # 监听server套接字的可读事件,第一参传的是服务端socket的文件描述符,文件描述符是一个整型数字

# 存储要监听的socket
monitored_socket = { server.fileno():server}

# 存储客户端发送过来的消息
client_msg = {}
timeout = 10

while True:
    # 等待fd的事件就绪(此时内核会监控所有fd的事件状态),相当于执行epoll_wait(),这个过程是阻塞的,上面的setblocking(False)是控制套接字的操作不阻塞,但是不能控制epoll不阻塞
    # 这里可以传入阻塞的超时时间timeout, 不传则一直阻塞; 如果没有事件就绪且超过超时时间则返回null;如果有事件就绪,则返回一个列表,每个列表放着一个元祖,元组放着事件就绪的fd和具体事件类型
    events = epoll.poll(timeout)

    if not events:
        print("无事件就绪")
    else:
        print("有 %s 个事件就绪" % len(events))

    for fd, event in events:
        ready_socket = monitored_socket[fd]     # 根据fd获取就绪的socket对象
        print("event", event)

        if ready_socket == server:      # 如果就绪fd是server,说明server可读事件就绪,表示有客户端连接
            client, addr = server.accept()     # 非阻塞,而且一定能接收到连接

            client_fd = client.fileno()

            print("客户端 %s 建立连接成功" % str(client_fd))

            client.setblocking(False)

            # 先监听客户端的可读事件
            epoll.register(client_fd, select.EPOLLIN)
            monitored_socket[client_fd] = client
            client_msg[client_fd] = []      # 保存该客户端的所有发送过来的消息

        elif event & select.EPOLLHUP:      # 如果客户端关闭连接

            ready_socket.close()

            epoll.unregister(fd)    # 不再监听该客户端的事件

            del monitored_socket[fd]

            del client_msg[fd]

            print("客户端 %s 关闭连接" % str(fd))

        elif event & select.EPOLLIN:  # 如果客户端读事件就绪
            msg = ready_socket.recv(1024)
            
            if msg:
                print("接收到客户端 %s 的消息 %s" % (str(fd), msg.decode("utf-8")))

                client_msg[fd].append(msg)

                # 修改监听事件为写事件,因为客户端发送消息过来之后我想将消息马上原样发送回给客户端
                epoll.modify(fd, select.EPOLLOUT)
            else:   # 如果返回空字符说明客户端关闭socket断开连接
                ready_socket.close()

                epoll.unregister(fd)  # 不再监听该客户端的事件

                del monitored_socket[fd]

                del client_msg[fd]

                print("客户端 %s 关闭连接" % str(fd))

        elif event & select.EPOLLOUT: # 如果客户端写事件就绪,其实只要客户端连接了服务端且它的输入缓冲区没满应该就满足写事件就绪(所谓的客户端写就绪就是服务端可以执行send发送消息给客户端)
            try:
                msg = client_msg[fd].pop()
            except:
                print("客户端 %s 消息列表为空" % str(fd))
                epoll.modify(fd, select.EPOLLIN)  # 如果将客户端的所有消息都发回去了,就改回监听客户端读事件
            else:
                ready_socket.send(msg)

 

客户端代码:

# coding=utf-8

from threading import Thread
import socket


# 创建套接字
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 绑定ip和端口
ip = "127.0.0.1"
port = 8088

client.connect((ip, port))

def getResponse():
    while True:     # 开一个线程用于接收服务器返回的消息
        try:
            response = client.recv(1024)		# 阻塞
            print(response.decode("utf-8"))
        except:     # 如果主线程关闭客户端socket则recv会报错,此时break跳出循环,结束该线程即可
            print("子线程由于client断开连接而退出")
            break

thread_for_response = Thread(target=getResponse)
thread_for_response.start()

while True:
    msg = input()
    if msg:
        client.send(msg.encode("utf-8"))
        print("msg:" + msg)
    else:   # 如果直接输入换行则断开连接
        client.close()      # 关闭套接字断开连接
        print("client close")
        break

PS:client.shutdown()不会断开连接,只是把读写关闭了而已,如果调用client.shutdown(),在服务端的表现就是客户端发送一个空字符给服务端,会触发客户端socket的EPOLLIN。所以我在EPOLLIN里面也加了一段关闭client并且从epoll对象中删除客户端fd的逻辑。

这个代码我把服务端代码放到我的远程服务器运行,客户端放到我的本机运行,不过要将客户端代码的连接ip改为我的远程服务器真实ip才行。

 

对比select和epoll,epoll比较好的解决了select的3个缺点:
1.select有fd的数量限制默认是1024,epoll没有限制。
2.每次调用select()都要所有的fd重复的拷贝到内核空间,而epoll只需拷贝1次并保存到内核空间用红黑树管理。

3.socket的事件得到响应,回到用户程序后,用户程序需要遍历一次所监控的fd列表,检索出就绪的fd,复杂度为O(n);epoll会把就绪的fd直接返回给用户程序,用户程序无需遍历找出就绪fd。

 

但是并不是说epoll的性能就一定比select高,要看具体使用场景:

在连接并发数高,连接活跃度不高(连接了之后不怎么发消息)的情况下,epoll比select更合适
在连接并发数不高,连接活跃度高的情况下,select比epoll更合适

 

最后强调一点:epoll,poll和select都是同步IO,因为他们需要在socket的事件就绪后由用户程序进行读写(recv,send)。




更多内容请关注微信公众号
zbpblog微信公众号

如果您需要转载,可以点击下方按钮可以进行复制粘贴;本站博客文章为原创,请转载时注明以下信息

张柏沛IT技术博客 > 从IO模型到协程(三) 多路复用之select、poll和epoll

热门推荐
推荐新闻