0%

Socket & Java NIO

bing

Socket

IO 模型

一个输入操作包括两个阶段:

  • 等待数据准备好:网络编程是等待数据从网络中到达,当数据到达时存储到内核的缓冲区中
  • 从内核向应用进程复制数据:将数据从内核缓冲区复制到应用进程缓冲区

IO 模型:

同步IO、异步IO、阻塞IO、非阻塞IO之间的联系与区别

  • 阻塞 IO

    • 应用进程被阻塞,等待数据准备好,直到数据从内核缓冲区复制到应用进程缓冲区

    • 当进程被阻塞时,其他进程可以执行,不会消耗 CPU 时间

      pic

  • 非阻塞 IO

    • 应用进程不会阻塞地等待数据准备好,如果数据没有准备好,内核会返回一个错误码,此时应用进程可以执行其他的操作,但需要通过轮询的方式进行系统调用直到数据准备好
    • 非阻塞 IO 会不断进行系统调用,CPU 利用率不高

    pic

  • IO 复用

    • 使用selectpoll选择器等待数据,如果没有准备好进程被阻塞,一旦有至少一个数据准备好,就执行系统调用,将数据从内核态复制到用户态
    • 使用 IO 复用可以让一个进程处理多个 IO 请求,如果不使用 IO 复用,每个 Socket 都要创建一个线程去处理, IO 复用避免了线程切换带来的开销

    pic

    selectpollepoll 都是多路 IO 复用的解决方案

    • select :通过轮询的方式遍历 Socket,全部遍历并且存在最大连接数限制
    • poll:通过轮询的方式遍历 Socket,全部遍历,不存在最大连接数限制
    • epoll:只遍历已经准备好的 Socket
  • 信号驱动 IO

    • 应用进程使用 sigaction 系统调用,内核立即返回,应用进程不会阻塞,可以继续执行;当数据准备好内核向应用进程发送信号,之后应用进程执行系统调用将数据从内核缓冲区复制到应用进程缓冲区

    • 信号驱动 I/O 的 CPU 利用率更高

    pic

  • 异步 IO

    • 应用进程执行 aio_read 系统调用会立即返回,应用进程可以继续执行,不会被阻塞,内核会在所有操作完成之后向应用进程发送信号

    pic

IO 模型对比

  • 同步 IO:将数据从内核缓冲区复制到应用进程缓冲区时,应用进程会阻塞
  • 异步 IO:将数据从内核缓冲区复制到应用进程缓冲区时,应用进程不会阻塞

对比:

  • 阻塞 IO:是同步 IO,在数据准备阶段会阻塞,不会占用 CPU 时间
  • 非阻塞 IO:是同步 IO,在数据准备阶段不会阻塞,轮询进行系统调用直到数据准备好,CPU 利用率低
  • IO 复用:是同步 IO,在数据准备阶段会阻塞,通过选择器轮询,可以避免线程切换
  • 信号驱动 IO:是同步 IO,在数据准备阶段不会阻塞,通过信号判断数据是否准备好,CPU 利用率高
  • 异步 IO:不是同步 IO,在数据准备阶段不会阻塞,直到完全复制完再通知应用进程

IO 复用

select

1
int select(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
  • 描述符类型使用数组实现,有描述符数量的限制

  • select 有三种类型的描述符 / 事件类型,分别为读、写、异常类型的描述符

  • 调用select会一直阻塞,直到有描述符的事件到达或者超时

  • 每次系统调用都需要将全部描述符从应用进程缓冲区复制到内核缓冲区

  • 系统调用的返回结果中没有声明哪些描述符已经准备好,需要应用进程全部轮询判断哪些描述符准备好

  • 几乎所有系统都支持select

poll

1
int poll(struct pollfd *fds, unsigned int nfds, int timeout);
  • 描述符类型使用链表实现,没有描述符数量的限制
  • 提供了更多的事件类型
  • 每次调用都需要将全部描述符从应用进程缓冲区复制到内核缓冲区
  • 系统调用的返回结果中没有声明哪些描述符已经准备好,需要应用进程全部轮询判断哪些描述符准备好
  • 比较新的系统支持poll

epoll

1
2
3
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
  • epoll 只需要将描述符从进程缓冲区向内核缓冲区拷贝一次
  • 没有描述符数量的限制
  • 系统调用的返回结果是准备好的事件
  • 只有 Linux 支持epoll

触发模式:

  • LT(level trigger):当 epoll_wait() 检测到描述符事件到达时,将此事件通知进程,进程可以不立即处理该事件,下次调用 epoll_wait() 会再次通知进程。是默认的一种模式,并且同时支持 Blocking 和 No-Blocking
  • ET(edge trigger):通知之后进程必须立即处理事件,下次再调用 epoll_wait() 时不会再得到事件到达的通知

应用场景

  • select:可移植性更高;超时时间支持ns,更适用于实时性要求比较高的场景
  • poll:没有最大描述符数量的限制,如果平台支持并且对实时性要求不高,应该使用 poll 而不是 select
  • epoll:只需要运行在 Linux 平台上,有大量的描述符需要同时轮询,并且这些连接最好是长连接

java NIO

NIO & IO

  • I/O 以流的方式处理数据,而 NIO 以块(缓冲)的方式处理数据
  • IO 是阻塞 IO,NIO 是非阻塞 IO

通道

  • FileChannel:不能配置非阻塞
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

缓冲区

缓冲区类型

  • ByteBuffer
  • CharBuffer
  • ShotBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

缓冲区状态变量

  • capacity:最大容量
  • position:当前已经读写的字节数
  • limit-positon:还可以读写的字节数

文件 IO 实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void fastCopy(String src, String desc) throws IOException{
FileInputStream in = new FileInputStream(src);
FileChannel fcin = in.getChannel();

FileOutputStream out = new FileOutputStream(desc);
FileChannel fcout = out.getChannel();

ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

while(true) {
int cur = fcin.read(buffer);
if(cur == -1)
break;
buffer.flip();
fcout.write(buffer);
buffer.clear();
}
in.close();
out.close();
}

Socket NIO 实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// 服务端
package com.apathy;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class SocketNIOService {

public static void main(String[] args) throws IOException{
Selector sel = Selector.open(); // 创建选择器

// 创建ServerSocketChannel,监听新进来的TCP连接
ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.configureBlocking(false); // 设置非阻塞 IO
/* 将通道注册到选择器上
SelectionKey.OP_CONNECT:连接就绪,一个Channel成功连接到另一个服务器
SelectionKey.OP_ACCEPT:接收就绪,一个ServerSocketChannel准备好接收新进入的连接
SelectionKey.OP_READ:读就绪,一个有数据可读的通道
SelectionKey.OP_WRITE:写就绪,一个等待写数据的通道
*/
socketChannel.register(sel, SelectionKey.OP_ACCEPT);

// 将 ServerSocketChannel 绑定到特定端口
ServerSocket serverSocket = socketChannel.socket(); // 创建套接字
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
serverSocket.bind(address); // 绑到特定端口

while(true) {
sel.select(); // 监听事件,没有事件会阻塞
Set<SelectionKey> set = sel.selectedKeys(); // 获取到达的事件
Iterator<SelectionKey> iterator = set.iterator(); // 使用迭代器遍历
while(iterator.hasNext()) { // 轮询
SelectionKey key = iterator.next();
if(key.isAcceptable()) { // 连接就绪
// 如果是连接请求,将连接通道注册到 selector 上
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(sel, SelectionKey.OP_READ);
} else if(key.isReadable()) { // 读就绪
SocketChannel sc = (SocketChannel) key.channel();
System.out.println(readDataFromSocketChannel(sc));
sc.close();
}
iterator.remove();
}
}
}

private static String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {

ByteBuffer buffer = ByteBuffer.allocate(1024);
StringBuilder data = new StringBuilder();

while (true) {
buffer.clear();
int n = sChannel.read(buffer);
if (n == -1) {
break;
}
buffer.flip();
int limit = buffer.limit();
char[] dst = new char[limit];
for (int i = 0; i < limit; i++) {
dst[i] = (char) buffer.get(i);
}
data.append(dst);
buffer.clear();
}
return data.toString();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 客户端
package com.apathy;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

public class SocketNIOClient {

public static void main(String[] args) throws IOException{
Socket socket = new Socket("127.0.0.1", 8888);
OutputStream ops = socket.getOutputStream();
String str = "hello, apathy";
ops.write(str.getBytes());
ops.close();
socket.close();
}
}