引言
多路复用要解决的问题
并发多客户端连接,在多路复用之前最简单和典型的方案:同步阻塞网络IO模型。这种模式的特点就是用一个进程来处理一个网络连接(一个用户请求),比如一段典型的示例代码如下。直接调用 recv 函数从一个 socket 上读取数据。
int main()
{
...
recv(sock, ...) //从用户角度来看非常简单,一个recv一用,要接收的数据就到我们手里了。
}
优点:这种方式非常容易让人理解,写起代码来非常的自然,符合人的直线型思维。
缺点:性能差,每个用户请求到来都得占用一个进程来处理,来一个请求就要分配一个进程跟进处理,类似一个学生配一个老师,一位患者配一个医生,可能吗?进程是一个很笨重的东西。一台服务器上创建不了多少个进程。
结论
进程在 Linux 上是一个开销不小的家伙,先不说创建,光是上下文切换一次就得几个微秒。所以为了高效地对海量用户提供服务,必须要让一个进程能同时处理很多个 tcp 连接才行。现在假设一个进程保持了 10000 条连接,那么如何发现哪条连接上有数据可读了、哪条连接可写了 ?我们当然可以采用循环遍历的方式来发现 IO 事件,但这种方式太低级了。我们希望有一种更高效的机制,在很多连接中的某条上有 IO 事件发生的时候直接快速把它找出来。其实这个事情 Linux 操作系统已经替我们都做好了,它就是我们所熟知的 IO 多路复用机制。这里的复用指的就是对进程的复用。
I/O 多路复用模型
概述
-
I/O
网络 I/O
-
多路
多个客户端连接(连接就是套接字描述,即 socket 或者 channel),指的是多条 TCP 连接
-
复用
用一个进程来处理多条的连接,使用单进程就能够实现同事处理多个客户端的连接
-
一句话
-
实现了用一个进程来处理大量的用户连接
-
IO 多路复用类似一个规范和接口,落地实现
可以分 select → poll → epoll 三个阶段来描述
-
动画演示
-
Redis 单线程如何处理那么多并发客户端连接,为什么单线程,为什么快
Redis 的 IO 多路复用
Redis 利用 epoll 来实现 IO 多路复用,将连接信息和时间放在队列中,一次放到文件时间分派器,时间分派器将事件分发给事件处理器。
Redis 是跑在单线程中的,所有的操作都是按照顺序线性执行的,但是由于读写操作等待用户输入或输出都是阻塞的,所以 I/O 操作在一般情况下往往不能直接返回,这会导致某一文件的 I/O 阻塞导致整个进程无法对其它客户提供服务,而 I/O 多路复用就是为了解决这个问题而出现所谓 I/O 多路复用机制,就是说通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或写就绪),能够通知程序进行相应的读写操作。这种机制的使用需要 select 、 poll 、 epoll 来配合。多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象上等待,无需阻塞等待所有连接。当某条连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
Redis 服务采用 Reactor 的方式来实现文件事件处理器(每一个网络连接其实都对应一个文件描述符)Redis基于Reactor模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO 多路复用程序、文件事件分派器和事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。
参考《Redis 设计与实现》
Redis 基于 Reactor 模式开发了自己的网络事件处理器:整个处理器被称为文件事件处理器(file event handler):
- 文件事件处理器使用 I/O 多路复用(multiplexing)程序来同时监听多个套接字,并根据套接字目前执行的任务来为套接字关联不同的事件处理器。
- 当被监听的套接字准备好执行连接应答(accept)、读取(read)、写入(write)、关闭(close)等操作时,与操作相对应的文件事件就会产生,这时文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件。
虽然文件事件处理器以单线程方式运行,但通过使用 I/O 多路复用程序来监听多个套接字,文件事件处理器既实现了高性能的网络通信模型又可以很好地与 Redis 服务器中其他同样以单线程方式运行地模块进行对接,这保持了 Redis 内部单线程设计的简单性。
结论:从 Redis6 开始,将网络数据读写、请求协议解析通过多个 IO 线程的来处理,对于真正的命令执行来说,仍然使用单线程操作,一举两得。
读读 read
上午开会,错过了公司食堂的饭点, 中午就和公司的首席架构师一起去楼下的米线店去吃米线。我们到了一看,果然很多人在排队。
架构师马上发话了:嚯,请求排队啊!你看这位收银点菜的,像不像 nginx 的反向代理?只收请求,不处理,把请求都发给后厨去处理 我们交了钱,拿着号离开了点餐收银台,找了个座位坐下等餐。 架构师:你看,这就是异步处理,我们下了单就可以离开等待,米线做好了会通过小喇叭“回调”我们去取餐; 如果同步处理,我们就得在收银台站着等餐,后面的请求无法处理,客户等不及肯定会离开了。
接下里架构师盯着手中的纸质号牌。
架构师:你看,这个纸质号牌在后厨“服务器”那里也有,这不就是表示会话的ID吗?
有了它就可以把大家给区分开,就不会把我的排骨米线送给别人了。过了一会, 排队的人越来越多,已经有人表示不满了,可是收银员已经满头大汗,忙到极致了。
架构师:你看他这个系统缺乏弹性扩容, 现在这么多人,应该增加收银台,可以没有其他收银设备,老板再着急也没用。
老板看到在收银这里帮不了忙,后厨的订单也累积得越来越多, 赶紧跑到后厨亲自去做米线去了。
架构师又发话了:幸亏这个系统的后台有并行处理能力,可以随意地增加资源来处理请求(做米线)。 我说:他就这点儿资源了,除了老板没人再会做米线了。 不知不觉,我们等了20分钟, 但是米线还没上来。 架构师:你看,系统的处理能力达到极限,超时了吧。 这时候收银台前排队的人已经不多了,但是还有很多人在等米线。
老板跑过来让这个打扫卫生的去收银,让收银小妹也到后厨帮忙。打扫卫生的做收银也磕磕绊绊的,没有原来的小妹灵活。
架构师:这就叫服务降级,为了保证米线的服务,把别的服务都给关闭了。 又过了20分钟,后厨的厨师叫道:237号, 您点的排骨米线没有排骨了,能换成番茄的吗? 架构师低声对我说:瞧瞧, 人太多, 系统异常了。然后他站了起来:不行,系统得进行补偿操作:退费。
说完,他拉着我,饿着肚子,头也不回地走了。
- 同步:调用者要一直等待调用结果的通知后才能进行后续的执行,现在就要,我可以等,等出结果位置。
- 异步
- 指被调用方先返回应答让调用者先回去,然后再计算调用结果,结算完最终结果后再通知并返回给调用方
- 异步调用要想获得结果一般通过回调
- 同步与异步的理解
- 同步、异步的讨论对象是被调用者(服务提供者),重点在于获得调用结果的消息通知方式上
- 阻塞:调用方一直在等待而且别的事情什么都不做,当前进/线程会被挂起,啥也不干
- 非阻塞:调用在发出后,调用方先去忙别的事情,不会阻塞当前进/线程,而会立即返回
- 阻塞与非阻塞的理解:阻塞、非阻塞的讨论对象是调用者(服务请求者),重点在于等消息时候的行为,调用者是否能干其他事
总结(4 种组合方式)
- 同步阻塞
- 同步非阻塞
- 异步阻塞
- 异步非阻塞
Unix 网络编程中的五种 IO 模型
- Blocking IO - 阻塞 IO
- NoneBlocking IO - 非阻塞 IO
- IO multiplexing - IO 多路复用
- signal driven IO - 信号驱动 IO
- asynchronous IO - 异步 IO
Java 验证
背景
一个 redisServer + 2 个 Client
BIO
概述
当用户进程调用了 recvfrom 这个系统调用,kernel 就开始了 IO 的第一个阶段:准备数据(对于网络 IO 来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的 UDP 包。这个时候 kernel 就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当 kernel 一直等到数据准备好了,它就会将数据从 kernel 中拷贝到用户内存,然后 kernel 返回结果,用户进程才解除 block 的状态,重新运行起来。所以,BIO 的特点就是在 IO 执行的两个阶段都被 block 了。
先演示 accept
-
accept 监听
-
code 案例
package dev.matrixlab.study.iomultiplex.one; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; public class RedisServer { public static void main(String[] args) throws IOException { byte[] bytes = new byte[1024]; ServerSocket serverSocket = new ServerSocket(6379); while(true) { System.out.println("-----111 等待连接"); Socket socket = serverSocket.accept(); System.out.println("-----222 成功连接"); } } }
RedisClient01.javapackage dev.matrixlab.study.iomultiplex.one; import java.io.IOException; import java.net.Socket; import java.util.Scanner; public class RedisClient01 { public static void main(String[] args) throws IOException { System.out.println("------RedisClient01 start"); Socket socket = new Socket("127.0.0.1", 6379); } }
RedisClient02.javapackage dev.matrixlab.study.iomultiplex.one; import java.io.IOException; import java.net.Socket; public class RedisClient02 { public static void main(String[] args) throws IOException { System.out.println("------RedisClient02 start"); Socket socket = new Socket("127.0.0.1", 6379); } }
再演示 read
-
read 读取
-
code 案例一
-
先启动 RedisServerBIO,再启动 RedisClient01 验证后再启动 2 号客户端
RedisServerBIO.javapackage dev.matrixlab.study.iomultiplex.bio; import java.io.IOException; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; public class RedisServerBIO { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(6379); while(true) { System.out.println("-----111 等待连接"); Socket socket = serverSocket.accept();//阻塞1 ,等待客户端连接 System.out.println("-----222 成功连接"); InputStream inputStream = socket.getInputStream(); int length = -1; byte[] bytes = new byte[1024]; System.out.println("-----333 等待读取"); while((length = inputStream.read(bytes)) != -1)//阻塞2 ,等待客户端发送数据 { System.out.println("-----444 成功读取"+new String(bytes,0,length)); System.out.println("===================="); System.out.println(); } inputStream.close(); socket.close(); } } }
RedisClient01.javapackage dev.matrixlab.study.iomultiplex.bio; import java.io.IOException; import java.io.OutputStream; import java.net.Socket; import java.util.Scanner; public class RedisClient01 { public static void main(String[] args) throws IOException { Socket socket = new Socket("127.0.0.1",6379); OutputStream outputStream = socket.getOutputStream(); //socket.getOutputStream().write("RedisClient01".getBytes()); while(true) { Scanner scanner = new Scanner(System.in); String string = scanner.next(); if (string.equalsIgnoreCase("quit")) { break; } socket.getOutputStream().write(string.getBytes()); System.out.println("------input quit keyword to finish......"); } outputStream.close(); socket.close(); } }
RedisClient02.javapackage dev.matrixlab.study.iomultiplex.bio; import java.io.IOException; import java.io.OutputStream; import java.net.Socket; import java.util.Scanner; public class RedisClient02 { public static void main(String[] args) throws IOException { Socket socket = new Socket("127.0.0.1",6379); OutputStream outputStream = socket.getOutputStream(); //socket.getOutputStream().write("RedisClient01".getBytes()); while(true) { Scanner scanner = new Scanner(System.in); String string = scanner.next(); if (string.equalsIgnoreCase("quit")) { break; } socket.getOutputStream().write(string.getBytes()); System.out.println("------input quit keyword to finish......"); } outputStream.close(); socket.close(); } }
-
存在的问题
上面的模型存在很大的问题,如果客户端与服务端建立了连接,如果这个连接的客户端迟迟不发数据,程序就会一直堵塞在
read()
方法上,这样其他客户端也不能进行连接,也就是一次只能处理一个客户端,对客户很不友好。直到问题所在了,请问如何解决?
-
-
code 案例2
-
多线程模式
利用多线程,只要连接了一个 socket,操作系统分配一个线程来处理,这样
read()
方法堵塞在每个具体线程上而不堵塞主线程,就能操作多个 socket 了,哪个线程中的 socket 有数据,就读哪个 socket,各取所需,灵活统一。程序服务端只负责监听是否有客户端连接,使用
accept()
阻塞。客户端 1 连接服务端,就开辟一个线程(thread1)来执行read()
方法,程序服务端继续监听;客户端 2 连接服务端,也开辟一个线程(thread2)来执行read()
方法,程序服务端继续监听;客户端 3 连接服务端,也开辟一个线程(thread3)来执行read()
方法,程序服务端继续监听……任何一个线程上的 socket 有数据发送过来,
read()
就能立马读到,cpu 就能进行处理。RedisServerBIOMultiThread.javapackage dev.matrixlab.study.iomultiplex.bio; import java.io.IOException; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; public class RedisServerBIOMultiThread { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(6379); while(true) { //System.out.println("-----111 等待连接"); Socket socket = serverSocket.accept();//阻塞1 ,等待客户端连接 //System.out.println("-----222 成功连接"); new Thread(() -> { try { InputStream inputStream = socket.getInputStream(); int length = -1; byte[] bytes = new byte[1024]; System.out.println("-----333 等待读取"); while((length = inputStream.read(bytes)) != -1)//阻塞2 ,等待客户端发送数据 { System.out.println("-----444 成功读取"+new String(bytes,0,length)); System.out.println("===================="); System.out.println(); } inputStream.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); } },Thread.currentThread().getName()).start(); System.out.println(Thread.currentThread().getName()); } } }
package dev.matrixlab.study.iomultiplex.bio; import java.io.IOException; import java.io.OutputStream; import java.net.Socket; import java.util.Scanner; public class RedisClient01 { public static void main(String[] args) throws IOException { Socket socket = new Socket("127.0.0.1",6379); OutputStream outputStream = socket.getOutputStream(); //socket.getOutputStream().write("RedisClient01".getBytes()); while(true) { Scanner scanner = new Scanner(System.in); String string = scanner.next(); if (string.equalsIgnoreCase("quit")) { break; } socket.getOutputStream().write(string.getBytes()); System.out.println("------input quit keyword to finish......"); } outputStream.close(); socket.close(); } }
package dev.matrixlab.study.iomultiplex.bio; import java.io.IOException; import java.io.OutputStream; import java.net.Socket; import java.util.Scanner; public class RedisClient02 { public static void main(String[] args) throws IOException { Socket socket = new Socket("127.0.0.1",6379); OutputStream outputStream = socket.getOutputStream(); //socket.getOutputStream().write("RedisClient01".getBytes()); while(true) { Scanner scanner = new Scanner(System.in); String string = scanner.next(); if (string.equalsIgnoreCase("quit")) { break; } socket.getOutputStream().write(string.getBytes()); System.out.println("------input quit keyword to finish......"); } outputStream.close(); socket.close(); } }
-
存在的问题
多线程模型,每来一个客户端,就要开辟一个线程,如果来 1 万个客户端,那就要开辟 1 万个线程。在操作系统中用户态不能直接开辟线程,需要调用内核来创建的一个线程,这其中还涉及到用户状态的切换(上下文的切换),十分耗资源。知道问题所在了,请问如何解决?
-
解决方法
-
使用线程池
这个在客户端连接少的情况下可以使用,但是用户量大的情况下,你不知道线程池要多大,太大了内存可能不够,也不可行。
-
NIO(非阻塞式 IO)方法
因为
read()
方法堵塞了,所有要开辟多个线程,如果什么方法能使read()
方法不堵塞,这样就不用开辟多个线程了,这就用到了另一个 IO 模型,NIO(非阻塞式 IO)。
-
-
-
总结
Tomcat7 之前就是用 BIO 多线程来解决多连接
痛点
-
两个痛点
- accept
- read
- 在阻塞式 I/O 模型种,应用程序在调用 recvfrom 开始到它返回有数据报准备好这段时间是阻塞的,recvfrom 返回成功后,应用进程才能开始处理数据报。
-
阻塞式 IO 小总结
-
思考
每个线程分配一个连接,必然会产生多个,既然是多个 socket 连接必然需要放入进容器,纳入统一管理。
NIO
概述
当用户进程发出 read 操作时,如果 kernel 中的数据还没有准备好,那么它并不会 block 用户进程,而是立刻返回一个 error。从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call,那么它马上就将数据拷贝到了用户内存,然后返回。所以,NIO 特点是用户进程需要不断的主动询问内核数据准备好了吗?一句话,用轮询替代阻塞!
面试题
在 NIO 模式种,一切都是非阻塞的:accept()
方法是非阻塞的,如果没有客户端连接,就返回无连接标识;read()
方法是非阻塞的,如果 read()
方法读取不到数据就返回空闲中标识,如果读取到数据时只阻塞 read()
方法读数据的时间。在 NIO 模式中,只有一个线程:当一个客户端与服务端进行连接,这个 socket 就会加入到一个数组中,隔一段时间遍历一次,看这个 socket 的 read()
方法能否读到数据,这样一个线程就能处理多个客户端的连接和读取了。
code 案例
上述以前的 socket 的阻塞的,另外开发一套 API ServerSocketChannel
package dev.matrixlab.study.iomultiplex.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
public class RedisServerNIO {
static ArrayList<SocketChannel> socketList = new ArrayList<>();
static ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
public static void main(String[] args) throws IOException {
System.out.println("---------RedisServerNIO 启动等待中......");
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress("127.0.0.1",6379));
serverSocket.configureBlocking(false);//设置为非阻塞模式
while (true) {
for (SocketChannel element : socketList) {
int read = element.read(byteBuffer);
if(read > 0) {
System.out.println("-----读取数据: "+read);
byteBuffer.flip();
byte[] bytes = new byte[read];
byteBuffer.get(bytes);
System.out.println(new String(bytes));
byteBuffer.clear();
}
}
SocketChannel socketChannel = serverSocket.accept();
if(socketChannel != null)
{
System.out.println("-----成功连接: ");
socketChannel.configureBlocking(false);//设置为非阻塞模式
socketList.add(socketChannel);
System.out.println("-----socketList size: "+socketList.size());
}
}
}
}
package dev.matrixlab.study.iomultiplex.nio;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Scanner;
public class RedisClient01 {
public static void main(String[] args) throws IOException {
System.out.println("------RedisClient01 start");
Socket socket = new Socket("127.0.0.1",6379);
OutputStream outputStream = socket.getOutputStream();
while(true) {
Scanner scanner = new Scanner(System.in);
String string = scanner.next();
if (string.equalsIgnoreCase("quit")) {
break;
}
socket.getOutputStream().write(string.getBytes());
System.out.println("------input quit keyword to finish......");
}
outputStream.close();
socket.close();
}
}
package dev.matrixlab.study.iomultiplex.nio;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Scanner;
public class RedisClient02 {
public static void main(String[] args) throws IOException {
System.out.println("------RedisClient02 start");
Socket socket = new Socket("127.0.0.1",6379);
OutputStream outputStream = socket.getOutputStream();
while(true) {
Scanner scanner = new Scanner(System.in);
String string = scanner.next();
if (string.equalsIgnoreCase("quit")) {
break;
}
socket.getOutputStream().write(string.getBytes());
System.out.println("------input quit keyword to finish......");
}
outputStream.close();
socket.close();
}
}
存在的问题和优缺点
NIO 成功的解决了 BIO 需要开启多线程的问题,NIO 中一个线程就能解决多个 socket,但是还存在 2 个问题。
-
问题
-
这个模型在客户端少的时候十分好用,但是客户端如果很多,比如有 1 万个客户端进行连接,那么每次循环就要遍历 1 万个 socket,如果一万个 socket 中只有 10 个 socket 有数据,也会遍历一万个 socket,就会做很多无用功,每次遍历遇到 read 返回 -1 时仍然是一次浪费资源的系统调用。
-
而且这个遍历过程是在用户态进行的,用户态判断socket是否有数据还是调用内核的read()方法实现的,这就涉及到用户态和内核态的切换,每遍历一个就要切换一次,开销很大因为这些问题的存在。
-
-
优点
不会阻塞在内核的等待数据过程,每次发起的 I/O 请求可以立即返回,不用阻塞等待,实时性较好。
-
缺点
轮询将会不断地询问内核,这将占用大量的 CPU 时间,系统资源利用率较低,所以一般 Web 服务器不使用这种 I/O 模型。
结论:让 Linux 内核搞定上述需求,我们将一批文件描述符通过一次系统调用传给内核由内核层去遍历,才能真正解决这个问题。IO 多路复用应运而生,也即将上述工作直接放进 Linux 内核,不再两态转换而是直接从内核获得结果,因为内核是非阻塞的。
问题升级:如何用单线程处理大量的链接?
非阻塞式 IO 小总结
IO Multiplexing(IO 多路复用)
概述
词牌
多路复用(Multiplexing,又称“多任务”)是一个通信和计算机网络领域的专业术语,在没有歧义的情况下,“多路复用”也可被称为“复用”。 多路复用通常表示在一个信道上传输多路信号或数据流的过程和技术。 因为多路复用能够将多个低速信道集成到一个高速信道进行传输,从而有效地利用了高速信道。
模型
I/O 多路复用在英文种其实叫 I/O multiplexing
多个 Socket 复用一根网线这个功能是在内核+驱动层实现的I/O multiplexing 这里面的 multiplexing 指的其实是在单个线程通过记录跟踪每一个 Sock(I/O 流)的状态来同时管理多个 I/O 流. 目的是尽量多的提高服务器的吞吐能力。
大家都用过 nginx,nginx 使用 epoll 接收请求,ngnix 会有很多链接进来, epoll 会把他们都监视起来,然后像拨开关一样,谁有数据就拨向谁,然后调用相应的代码处理。redis 类似同理。
FileDescriptor
文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于 UNIX、Linux 这样的操作系统。
public final class FileDescriptor {
private int fd;
private long handle;
private Closeable parent;
private List<Closeable> otherParents;
private boolean closed;
}
IO 多路复用
IO multiplexing 就是我们说的 select,poll,epoll,有些技术书籍也称这种 IO 方式为 event driven IO 事件驱动 IO。就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。可以基于一个阻塞对象并同时在多个描述符上等待就绪,而不是使用多个线程(每个文件描述符一个线程,每次 new 一个线程),这样可以大大节省系统资源。所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select,poll,epoll 等函数就可以返回。
简述
模拟一个 tcp 服务器处理 30 个客户 socket,一个监考老师监考多个学生,谁举手就应答谁。
假设你是一个监考老师,让 30 个学生解答一道竞赛考题,然后负责验收学生答卷,你有下面几个选择:
第一种选择:按顺序逐个验收,先验收 A,然后是 B,之后是 C、D……这中间如果有一个学生卡住,全班都会被耽误,你用循环挨个处理 socket,根本不具有并发能力。
第二种选择:你创建 30 个分身线程,每个分身线程检查一个学生的答案是否正确。 这种类似于为每一个用户创建一个进程或者线程处理连接。
第三种选择,你站在讲台上等,谁解答完谁举手。这时 C、D 举手,表示他们解答问题完毕,你下去依次检查 C、D 的答案,然后继续回到讲台上等。此时 E、A 又举手,然后去处理 E 和 A……这种就是 IO 复用模型。Linux 下的 select、poll 和 epoll 就是干这个的。
将用户 socket 对应的 fd 注册进 epoll,然后 epoll 帮你监听哪些 socket 上有消息到达,这样就避免了大量的无用操作。此时的 socket 应该采用非阻塞模式。这样,整个过程只在调用 select、poll、epoll 这些调用的时候才会阻塞,收发客户消息是不会阻塞的,整个进程或者线程就被充分利用起来,这就是事件驱动,所谓的 reactor 反应模式。
作用
Redis 单线程模型如何处理那么多并发客户端连接,为什么单线程,为什么快
Redis 的 IO 多路复用,Redis 利用 epoll 来实现 IO 多路复用,将连接信息和事件放到队列中,依次放到事件分派器,事件分派器将事件分发给事件处理器。
Redis 服务采用 Reactor 的方式来实现文件事件处理器(每一个网络连接其实都对应一个文件描述符) 。所谓 I/O 多路复用机制,就是说通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或写就绪),能够通知程序进行相应的读写操作。这种机制的使用需要 select 、 poll 、 epoll 来配合。多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象上等待,无需阻塞等待所有连接。当某条连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
所谓 I/O 多路复用机制,就是说通过一种考试监考机制,一个老师可以监视多个考生,一旦某个考生举手想要交卷了,能够通知监考老师进行相应的收卷子或批改检查操作。所以这种机制需要调用班主任(select/poll/epoll)来配合。多个考生被同一个班主任监考,收完一个考试的卷子再处理其它人,无需等待所有考生,谁先举手就先响应谁,当又有考生举手要交卷,监考老师看到后从讲台走到考生位置,开始进行收卷处理。Reactor 设计模式
-
是什么
基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象上等待,无需阻塞等待所有连接。当某条连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
Reactor 模式,是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫 Dispatcher 模式。即 I/O 多了复用统一监听事件,收到事件后分发(Dispatch 给某进程),是编写高性能网络服务器的必备技术。
Reactor 模式中有 2 个关键组成:
-
Reactor:Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应。 它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人;
-
Handlers:处理程序执行 I/O 事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际办理人。Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作。
-
-
每个网络连接其实都对应一个文件描述符
Redis 服务采用 Reactor 的方式来实现文件事件处理器(每一个网络连接其实都对应一个文件描述符
Redis基于Reactor模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分
- 多个套接字
- IO 多路复用程序
- 文件事件分派器
- 事件处理器
select,poll,epoll 都是 I/O 多路复用的具体实现
select 方法
-
Linux 官网或者 man
int select(int ndfs, // nfds:监控的文件描述字符集里最大文件描述符加 1 fd_set *readfds, // readfds:监控有读数据到达文件描述符集合,传入传出参数 fe_set *writefds, // writefds:监控写数据到达文件描述符集合,传入传出参数 fd_set *exceptfds, // exceptfds:监控异常发现达文件描述符集合,传入传出参数 struct timeval *timeout); // timeout:定时阻塞监控事件,三种情况 // 1.NULL,永远等待下去 // 2.设置 timeval,等待固定时间 // 3.设置 timeval 里时间均为 0,检查描述字后立即返回,轮询
select 函数监视的文件描述符分3类,分别是 readfds、writefds 和 exceptfds,将用户传入的数组拷贝到内核空间;调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有 except)或超时(timeout 指定等待时间,如果立即返回设为 null 即可),函数返回;当 select 函数返回后,可以通过遍历 fdset,来找到就绪的描述符。
https://man7.org/linux/man-pages/man2/select.2.html (opens in a new tab)
select 是第一个实现(1983 年左右在 BSD 里面实现)
-
用户态我们自己写的 java 代码思想
/** * select 其实就是把 NIO 中用户态要遍历的 fd 数组(我们的每一个 socket 连接,安装进 ArrayList 里面的那个)拷贝到了内核态,让内核态来遍历,因为用户态判断 socket 是否有数据还是要调用内核态的,所以拷贝到内核态后,这样的遍历判断的时候就不用一直用户态和内核态频繁切换了 */ while (true) { for (SocketChannel element : socketList) { int read = element.read(byteBuffer); if (read > 0) { System.out.println("---- 读取数据:" + read); byteBuffer.flip(); byte[] bytes = new byte[read]; byteBuffer.get(bytes); System.out.println(new String(bytes)); byteBuffer.clear(); } } SocketChannel socketChannel = serverSocket.accept(); if (socketChannel != null) { System.out.println("---- 成功连接"); socketChannel.configureBlocking(false); // 设置为非阻塞模式 socketList.add(socketChannel); System.out.println("---- socketList size:" + socketList.size()); } }
-
C 语言代码
分析 select 函数的执行流程:
- select 是一个阻塞函数,当没有数据时,会一直阻塞在 select 那一行;
- 当有数据时会将 rset 中对应的那一位置为 1;
- select 函数返回,不在阻塞;
- 遍历文件描述符数组,判断哪个 fd 被位置了;
- 读取数据,然后处理。
sockfd = socket(AF_INET, SOCK_STREAM, 0); memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(2000); addr.sin_addr.s_addr = INADDR_ANY; bind(sockfd, (struct sockaddr*)&addr, sizeof(addr)); listen(sockfd, 5); for (i = 0; i < 5; i++) { // 模拟 5 个客户端连接 memset(&client, 0, sizeof(client)); addrlen = sizeof(client); fds[i] = accept(sockfd, (struct sockaddr*)&client, &addrlen); if (fds[i] > max) max = fds[i]; // 找出一个最大的文件描述符 }
while (1) { FD_ZERO(&rset); for (i = 0; i < 5; i++) { FD_SET(fds[i], &rset); // &rset 是个 bitmap,如何 5 个文件描述符分别是 0,1,2,4,5,7,那么这个 bitmap 为 01101101 } puts("round again"); select(max+1, &rset, NULL, NULL, NULL); // select() 是一个系统调用,它会堵塞直到有数据发送到 socket,select 会把 &rset 相应的位置置位,但并不会返回哪个 socket 有数据 for (i = 0; i < 5; i++) { if (FD_ISSET(fds[i], &rset)) { // 用户态只要遍历 &rset,看哪一位被置位了,不需要每次调用系统调用来判断了,效率有很大提升,遍历到被置位的文件描述符就进行读取 memset(buffer, 0, MAXBUF); read(fds[i], buffer, MAXBUF); puts(buffer); } } }
-
优点
select 其实就是把 NIO 中用户态要遍历的 fd 数组(我们的每一个 socket 链接,安装进 ArrayList 里面的那个)拷贝到了内核态,让内核态来遍历,因为用户态判断 socket 是否有数据还是要调用内核态的,所有拷贝到内核态后,这样遍历判断的时候就不用一直用户态和内核态频繁切换了。
从代码中可以看出,select 系统调用后,返回了一个置位后的 &rset,这样用户态只需进行很简单的二进制比较,就能很快知道哪些 socket 需要 read 数据,有效提高了效率
while (1) { FD_ZERO(&rset); for (i = 0; i < 5; i++) { FD_SET(fds[i], &rset); // &rset 是个 bitmap,如何 5 个文件描述符分别是 0,1,2,4,5,7,那么这个 bitmap 为 01101101 } puts("round again"); select(max+1, &rset, NULL, NULL, NULL); // select() 是一个系统调用,它会堵塞直到有数据发送到 socket,select 会把 &rset 相应的位置置位,但并不会返回哪个 socket 有数据 for (i = 0; i < 5; i++) { if (FD_ISSET(fds[i], &rset)) { // 用户态只要遍历 &rset,看哪一位被置位了,不需要每次调用系统调用来判断了,效率有很大提升,遍历到被置位的文件描述符就进行读取 memset(buffer, 0, MAXBUF); read(fds[i], buffer, MAXBUF); puts(buffer); } } }
-
缺点
- bitmap最大 1024 位,一个进程最多只能处理 1024 个客户端
- &rset 不可重用,每次 socket 有数据就相应的位会被置位
- 文件描述符数组拷贝到了内核态(只不过无系统调用切换上下文的开销。(内核层可优化为异步事件通知)),仍然有开销。select 调用需要传入 fd 数组,需要拷贝一份到内核,高并发场景下这样的拷贝消耗的资源是惊人的。(可优化为不复制)
- select 并没有通知用户态哪一个 socket 有数据,仍然需要 O(n) 的遍历。select 仅仅返回可读文件描述符的个数,具体哪个可读还是要用户自己遍历。(可优化为只返回给用户就绪的文件描述符,无需用户做无效的遍历)
我们自己模拟写的是,RedisServerNIO.java,只不过将它内核化了。
-
select 结论
select 方式,既做到了一个线程处理多个客户端连接(文件描述符),又减少了系统调用的开销(多个文件描述符只有一次 select 的系统调用 + N 次就绪状态的文件描述符的 read 系统调用
poll 方法
-
Linux 官网或者 man
int poll(struct pollfd *fds, nfds_t nfds, int timeout); struct pollfd { int fd; /* file descriptor */ short events; /* requested events */ short revents; /* returned events */ }
https://man7.org/linux/man-pages/man2/poll.2.html (opens in a new tab)
1997 年实现了 poll
-
C 语言代码
struct pollfd { int fd; // 文件描述符 short events; // 关系的事件,比如读事件或写事件 short revents; // 如果该文件描述符有事件发生置1 } for (i = 0; i < 5; i++) { // 模拟 5 个客户端连接 memset(&client, 0, sizeof(client)); addrlen = sizeof(client); pollfds[i].fd = accept(sockfd, (struct sockaddr*)&client, &addrlen); pollfds[i].events = POLLIN; // 这 5 个 socket 只关系读事件 } sleep(1); while(1) { puts("round again"); poll(pollfds, 5, 50000); // poll 中传入 pollfds 数组,交给内核判断是否有事件发生,如果哪个发生事件则 revents 置 1 for (i = 0; i < 5; ++i) { // 遍历数组,找到哪个 pollfd 有事件发生 if (pollfds[i].revents & POLLIN) { pollfds[i].revents = 0; // 找到后 revents 置 0 memset(buffer, 0, MAXBUF); read(pollfds[i].fd, buffer, MAXBUF); // 读取数据 puts(buffer); } } }
- poll 的执行流程
- 将五个 fd 从用户态拷贝到内核态
- poll 为阻塞方法,执行 poll 方法,如果有数据会将 fd 对应的 revents 置为 POLLIN
- poll 方法返回
- 循环遍历,查找哪个 fd 被置位为 POLLIN 了
- 将 revents 重置为 0 便于复用
- 对置位的 fd 进行读取和处理
- 解决了问题
- 解决了 bitmap 大小限制
- 解决了 rset 不可重用的情况后面由于二者原理相同,所以没能解决
- poll 的执行流程
-
优点
- poll 使用 pollfd 数组来代替 select 中的 bitmap,数组没有 1024 的限制,可以一次管理更多的 client。它和 select 的主要区别就是,去掉了 select 只能监听 1024 个文件描述符的限制。
- 当 pollfds 数组中有事件发生,相应的 revents 置位为 1,遍历的时候又置位回零,实现了 pollfd 数组的重用
-
问题
poll 解决了 select 缺点中的前两条,其本质原理还是 select 的方法,还存在 select 中原来的问题
- pollfds 数组拷贝到了内核态,仍然有开销
- poll 并没有通知用户态哪一个 socket 有数据,仍然需要 O(n) 的遍历
epoll 方法
-
Linux 官网或者 man
int epoll_create(int size); int epoll_ctl(int epfd // 是 epoll_create() 的返回值 int op, // 表示 op 操作,用三个宏来表示:添加 POLL_CTL_ADD,删除 EPOLL_CTL_DEL, 修改 EPOLL_CTL_MOD。三操作对 fd 的监听 int fd, // 是需要监听的 fd(文件描述符) struct epoll_event *event); // epoll_event 告诉内核要监听什么事 int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; // events 可以是以下几个宏的集合 EPOLLIN:表示对应的文件描述符可以读(包括对端 SOCKET 正常关闭); EPOLLOUT:表示对应的文件描述符可以写;
函数 含义 int epoll_create(int size) 参数 size 并不是限制了 epoll 所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) 见上图 int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout) 等待 epfd 上的 io 事件,最多返回 maxevents 个事件。参数 events 用来从内核得到事件的集合,maxevents 告之内核这个 events 有多大。 -
三步调用
-
epoll_create
创建一个 epoll 句柄
int epoll_create(int size);
-
epoll_ctl
向内核添加、修改或删除要监控的文件描述符
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
-
epoll_wait
类似发起了 select() 调用
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
-
-
C 语言代码
struct epoll_event events[5]; int epfd = epoll_create(10); // epoll_create() 在内核开辟一块内存空间,用来存放 epoll 中 fd 的数据结构 for (i = 0; i < 5; i++) { // 模拟5个客户端连接 static struct epoll_event ev; // epoll 中 fd 的数据结构和 poll 中的差不多,只是没有了 revents memset(&client, 0, sizeof(client)); addrlen = sizeof(client); ev.data.fd = accept(sockfd, (struct sockaddr*)&client, &addrlen); ev.events = EPOLLIN; epoll_ctl(epfd, EPOLL_CTL_AADD, ev.data.fd, &ev); //epoll_ctl 把每个 socket 的 fd 数据结构放到 epoll_create() 创建的内存空间中 } while(1) { puts("round again"); nfds = epoll_wait(epfd, events, 5, 10000); // epoll_wait() 阻塞,只有当 epoll_create() 中创建的内存空间中的 fd 有事件发生,才会把这些 fd 放入就绪链表中,返回就绪 fd 的个数 for (i = 0; i < nfds; i++) { // 遍历就绪链表,读取数据 memset(buffer, 0, MAXBUF); read(events[i].data.fd, buffer, MAXBUF); puts(buffer); } }
epoll 是非阻塞的 是非阻塞的!!epoll 的执行流程
- 当有数据的时候,会把相应的文件描述符“置位”,但是 epool 没有 revent 标志位,所以并不是真正的置位。这时候会把有数据的文件描述符放到队首。
- epoll 会返回有数据的文件描述符的个数
- 根据返回的个数 读取前 N 个文件描述符即可
- 读取、处理
-
结论
多路复用快的原因在于,操作系统提供了这样的系统调用,使得原来的 while 循环里多次系统调用,变成了一次系统调用 + 内核层遍历这些文件描述符。
epoll 是现在最先进的 IO 多路复用器,Redis、Nginx,linux中的 Java NIO 都使用的是 epoll。这里“多路”指的是多个网络连接,“复用”指的是复用同一个线程。
-
一个 socket 的生命周期中只有一次从用户态拷贝到内核态的过程,开销小
-
使用 event 事件通知机制,每次 socket 中有数据会主动通知内核,并加入到就绪链表中,不需要遍历所有的 socket
在多路复用 IO 模型中,会有一个内核线程不断地去轮询多个 socket 的状态,只有当真正读写事件发送时,才真正调用实际的 IO 读写操作。因为在多路复用 IO 模型中,只需要使用一个线程就可以管理多个socket,系统不需要建立新的进程或者线程,也不必维护这些线程和进程,并且只有真正有读写事件进行时,才会使用 IO 资源,所以它大大减少来资源占用。多路I/O复用模型是利用 select、poll、epoll 可以同时监察多个流的 I/O 事件的能力,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有 I/O 事件时,就从阻塞态中唤醒,于是程序就会轮询一遍所有的流(epoll 是只轮询那些真正发出了事件的流),并且只依次顺序的处理就绪的流,这种做法就避免了大量的无用操作。 采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络 IO 的时间消耗),且 Redis 在内存中操作数据的速度非常快,也就是说内存内的操作不会成为影响Redis性能的瓶颈
-
三个方法对比
select | poll | epoll | |
---|---|---|---|
操作方式 | 遍历 | 遍历 | 回调 |
数据结构 | bitmap | 数组 | 红黑树 |
最大连接数 | 1024(x86)或者 2048(x64) | 无上限 | 无上限 |
最大支持文件描述符 | 一般有最大值限制 | 65535 | 65535 |
fd 拷贝 | 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态 | 每次调用 poll,都需要把 fd 集合从用户态拷贝到内核态 | fd 首次调用 epoll_ctl 拷贝,每次调用 epoll_wait 不拷贝 |
工作效率 | 每次调用都进行线性遍历,时间复杂度为 O(n) | 每次调用都进行线性遍历,时间复杂度为 O(n) | 事件通知方式,每当 fd 就绪,系统注册的回调函数就会被调用,将就绪 fd 放到 readyList 里面,时间复杂度 O(1) |
5 种 I/O 模型总结
多路复用快的原因在于,操作系统提供了这样的系统调用,使得原来的 while 循环里多次系统调用,变成了一次系统调用 + 内核层遍历这些文件描述符。
所谓 I/O 多路复用机制,就是说通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或写就绪),能够通知程序进行相应的读写操作。这种机制的使用需要 select 、 poll 、 epoll 来配合。多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象上等待,无需阻塞等待所有连接。当某条连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理.