例子
/**
 * Minimal single-threaded NIO server that illustrates I/O multiplexing with a {@link Selector}.
 *
 * <p>Per-connection flow: accept the client and wait for {@code OP_READ}; read the request and
 * queue an ACK; wait for {@code OP_WRITE}; flush the queued ACK; go back to {@code OP_READ}.
 * When the peer closes the connection the channel is closed and its queued replies are dropped.
 */
public class TestIO {

    /** Replies queued per client channel until that channel's socket becomes writable. */
    private static final Map<SelectableChannel, Queue<ByteBuffer>> channelToPendingWrites =
            new ConcurrentHashMap<>();

    /** Payload sent back to the client after every successful read. */
    private static final byte[] ACK =
            "Data logged successfully\n".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    /**
     * Opens the listening socket on localhost:9998 and runs the selector loop forever.
     *
     * @param args unused
     * @throws IOException if the server socket or the selector cannot be set up, or if
     *     {@code select} itself fails
     */
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // Conceptually: int kqfd = kqueue()
        final Selector selector = Selector.open();
        serverSocketChannel.socket().setReuseAddress(true);
        // Conceptually: bind(fd)
        serverSocketChannel.socket().bind(new InetSocketAddress("localhost", 9998));
        // Non-blocking mode (passing true would keep the channel blocking).
        // Conceptually: fcntl(fd, F_SETFL, flags | O_NONBLOCK)
        serverSocketChannel.configureBlocking(false);
        // The third argument is attached to the SelectionKey and read back in handleAccept.
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
        System.out.println("-- server ready");
        while (true) {
            // select(Consumer, long) is only available since JDK 11.
            selector.select(TestIO::handleKey, 10000L);
        }
    }

    /** Dispatches one selected key; a failing client connection is closed instead of retried. */
    private static void handleKey(SelectionKey key) {
        if (!key.isValid()) {
            return;
        }
        try {
            if (key.isAcceptable()) {
                handleAccept(key);
            } else if (key.isReadable()) {
                handleRead(key);
            } else if (key.isWritable()) {
                handleWrite(key);
            }
        } catch (IOException | RuntimeException e) {
            e.printStackTrace();
            // Leaving a broken client registered would make the selector report it again and again.
            if (key.channel() instanceof SocketChannel) {
                closeClient(key);
            }
        }
    }

    /** Accepts a pending connection and starts watching it for readability. */
    private static void handleAccept(SelectionKey key) throws IOException {
        // The attachment is the ServerSocketChannel passed to register() in main.
        final ServerSocketChannel server = (ServerSocketChannel) key.attachment();
        // Equivalent of the socket returned by ServerSocket.accept().
        SocketChannel client = server.accept();
        if (client == null) { // accept() may return null in non-blocking mode
            return;
        }
        System.out.printf("accepted connection from %s\n", client.getRemoteAddress());
        // The client channel must be non-blocking as well before it can be registered.
        client.configureBlocking(false);
        client.register(key.selector(), SelectionKey.OP_READ);
    }

    /** Reads one request, queues an ACK for it and switches the key to write interest. */
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = socketChannel.read(buffer);
        if (read == -1) {
            // Peer closed the connection: release the channel, its key and its queue.
            closeClient(key);
            return;
        }
        if (read == 0) {
            return; // nothing available yet; stay interested in OP_READ
        }
        // Decode only the bytes actually received, not the whole backing array.
        String result = new String(
                buffer.array(), 0, read, java.nio.charset.StandardCharsets.UTF_8).trim();
        System.out.println("receive:" + result);

        channelToPendingWrites
                .computeIfAbsent(socketChannel, ch -> new ConcurrentLinkedQueue<>())
                .add(ByteBuffer.wrap(ACK));
        // Only ask for writability once there is something to send.
        key.interestOps(SelectionKey.OP_WRITE);
    }

    /** Flushes the next queued reply and returns to read interest once the queue is drained. */
    private static void handleWrite(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        final Queue<ByteBuffer> pending = channelToPendingWrites.get(socketChannel);
        if (pending == null || pending.isEmpty()) {
            // wrap() yields a buffer that is already positioned for reading, so no flip() is needed.
            socketChannel.write(ByteBuffer.wrap(
                    "hello,world!".getBytes(java.nio.charset.StandardCharsets.UTF_8)));
        } else {
            final ByteBuffer head = pending.peek();
            socketChannel.write(head);
            if (head.hasRemaining()) {
                return; // partial write: keep OP_WRITE and finish this buffer next time
            }
            pending.poll();
            if (!pending.isEmpty()) {
                return; // more replies queued: keep OP_WRITE
            }
        }
        key.interestOps(SelectionKey.OP_READ);
    }

    /** Drops the queued replies of a client, cancels its key and closes its channel. */
    private static void closeClient(SelectionKey key) {
        channelToPendingWrites.remove(key.channel());
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException ignored) {
            // Best effort: the connection is being discarded anyway.
        }
    }
}
讨论什么?
- 理解IO多路复用,Java中的概念分别代表了什么.
- 零拷贝? 不再需要拷贝了么?
Java NIO中的register、Acceptable、Readable、Writable分别代表什么意思?
IO模型I — 阻塞
- 系统调用

#include <stdio.h>

/* Writes the 13-byte greeting to stdout (no trailing newline) and exits with status 0. */
int main() {
    fputs("Hello, World!", stdout);
    return 0;
}
输出结果
[root@iZbp11om21c05wzu8e4tx0Z test]# vim hello.c
[root@iZbp11om21c05wzu8e4tx0Z test]# gcc hello.c -o hello
[root@iZbp11om21c05wzu8e4tx0Z test]# strace ./hello
execve("./hello", ["./hello"], [/* 22 vars */]) = 0
brk(NULL) = 0x6fe000
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f1fa6d87000
access("/etc/ld.so.preload", R_OK) = -1 ENOENT (No such file or directory)
open("/etc/ld.so.cache", O_RDONLY|O_CLOEXEC) = 3
fstat(3, {st_mode=S_IFREG|0644, st_size=30947, ...}) = 0
mmap(NULL, 30947, PROT_READ, MAP_PRIVATE, 3, 0) = 0x7f1fa6d7f000
close(3) = 0
open("/lib64/libc.so.6", O_RDONLY|O_CLOEXEC) = 3
read(3, "\177ELF\2\1\1\3\0\0\0\0\0\0\0\0\3\0>\0\1\0\0\0\20&\2\0\0\0\0\0"..., 832) = 832
fstat(3, {st_mode=S_IFREG|0755, st_size=2156160, ...}) = 0
mmap(NULL, 3985888, PROT_READ|PROT_EXEC, MAP_PRIVATE|MAP_DENYWRITE, 3, 0) = 0x7f1fa6799000
mprotect(0x7f1fa695c000, 2097152, PROT_NONE) = 0
mmap(0x7f1fa6b5c000, 24576, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_DENYWRITE, 3, 0x1c3000) = 0x7f1fa6b5c000
mmap(0x7f1fa6b62000, 16864, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_FIXED|MAP_ANONYMOUS, -1, 0) = 0x7f1fa6b62000
close(3) = 0
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f1fa6d7e000
mmap(NULL, 8192, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f1fa6d7c000
arch_prctl(ARCH_SET_FS, 0x7f1fa6d7c740) = 0
mprotect(0x7f1fa6b5c000, 16384, PROT_READ) = 0
mprotect(0x600000, 4096, PROT_READ) = 0
mprotect(0x7f1fa6d88000, 4096, PROT_READ) = 0
munmap(0x7f1fa6d7f000, 30947) = 0
fstat(1, {st_mode=S_IFCHR|0620, st_rdev=makedev(136, 0), ...}) = 0
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f1fa6d86000
write(1, "Hello, World!", 13Hello, World!) = 13
exit_group(0) = ?
+++ exited with 0 +++
- 缓冲区
- 概念
- 多路
- 复用
- 可读、可写事件
- 边缘触发、水平触发
- 数据结构 - O(n), O(logn)
- 关联Java中的NIO
- 零拷贝
- 什么是零拷贝? 完全没有拷贝么?
- Reactor 模型 (原文章已经不能下载了) 《Scalable IO in Java》
- Redis中的使用(强烈建议阅读)
Netty中的使用
-
实操
单线程
int main() {int serv_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); //AF_INT:ipv4, SOCK_STREAM:tcp协议//将套接字和IP、端口绑定struct sockaddr_in *serv_addr = malloc(sizeof(struct sockaddr_in));serv_addr->sin_family = AF_INET; //使用IPv4地址serv_addr->sin_addr.s_addr = inet_addr("127.0.0.1"); //具体的IP地址serv_addr->sin_port = htons(11234); //端口int on = 1;if (setsockopt(serv_sock, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) == -1) {printf("处理失败SO_REUSEPORT失败");exit(1);}bind(serv_sock, (struct sockaddr *) serv_addr, sizeof(struct sockaddr));listen(serv_sock, 20);for (;;) {int fd = accept(serv_sock, NULL, NULL);char *hello = "hello chenshun!";char cc[20];ssize_t rr = read(fd, cc, 10);if (rr == -1) {close(fd);}printf("i receive your message:%s\n", cc);write(fd, hello, strlen(hello));write(fd, "good bye!", strlen("good bye!"));close(fd);}return 0;}
多线程
```c void run(void ff) { int fd = (int ) (ff); char *hello = “hello chenshun!”; char cc[11]; ssize_t rr = read(fd, cc, 10); if (rr == -1) { close(fd); } printf(“i receive your message:%s\n”, cc); write(fd, hello, strlen(hello)); write(fd, “good bye!”, strlen(“good bye!”)); close(fd); return 0; }
-
int main() { int serv_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); //AF_INT:ipv4, SOCK_STREAM:tcp协议 //将套接字和IP、端口绑定 struct sockaddr_in serv_addr = malloc(sizeof(struct sockaddr_in)); serv_addr->sin_family = AF_INET; //使用IPv4地址 serv_addr->sin_addr.s_addr = inet_addr(“127.0.0.1”); //具体的IP地址 serv_addr->sin_port = htons(11234); //端口 int on = 1; if (setsockopt(serv_sock, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) == -1) { printf(“处理失败SO_REUSEPORT失败”); exit(1); } bind(serv_sock, (struct sockaddr ) serv_addr, sizeof(struct sockaddr)); listen(serv_sock, 20);
for (;;) {int fd = accept(serv_sock, NULL, NULL);pthread_attr_t attr;pthread_t thread;size_t stacksize;pthread_attr_init(&attr);pthread_attr_getstacksize(&attr, &stacksize);if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;pthread_attr_setstacksize(&attr, stacksize);int res = pthread_create(&thread, &attr, run, &fd);printf("创建线程执行:%d\n", res);if (res != 0) {exit(-1);}}return 0;
}
<a name="ga2zT"></a>
#### IO多路复用
//
// Created by chenshun on 2022/3/21.
//
// Single-threaded TCP server on 127.0.0.1:19999 built on kqueue. One thread
// waits on the listening socket and on every client socket at once; a client
// alternates between "waiting to be read" and "waiting to be written".
//
#include <sys/socket.h>
#include <sys/event.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

/* Maximum number of events fetched from the kernel per kevent() call. */
#define MAX_EVENTS 20

const size_t PAGE = 1024 * 16;
/* Bit flags selecting which filter(s) updateEvent()/delEvent() act on. */
const int kReadEvent = 1;
const int kWriteEvent = 2;

/* Applies action (EV_ADD or EV_DELETE) to the read and/or write filter of fd. */
static void changeEvent(int kqFd, int fd, int events, int action) {
    struct kevent ke;
    if (events & kReadEvent) {
        EV_SET(&ke, fd, EVFILT_READ, action, 0, 0, 0);
        kevent(kqFd, &ke, 1, NULL, 0, NULL);
    }
    if (events & kWriteEvent) {
        EV_SET(&ke, fd, EVFILT_WRITE, action, 0, 0, 0);
        kevent(kqFd, &ke, 1, NULL, 0, NULL);
    }
}

/* Registers fd with the kqueue kqFd for the filters selected in events. */
void updateEvent(int kqFd, int fd, int events) {
    changeEvent(kqFd, fd, events, EV_ADD);
}

/* Removes the filters selected in events for fd from the kqueue kqFd. */
void delEvent(int kqFd, int fd, int events) {
    changeEvent(kqFd, fd, events, EV_DELETE);
}

/* Accepts one pending connection, makes it non-blocking and watches it for readability. */
void handleAccept(int kq, int socket_fd) {
    struct sockaddr_storage sa;
    socklen_t salen = sizeof(sa);
    int client = accept(socket_fd, (struct sockaddr *) &sa, &salen);
    if (client < 0) {
        /* The listening socket is non-blocking, so "nothing to accept" is not an error. */
        if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
            perror("accept");
        }
        return;
    }
    int flags = fcntl(client, F_GETFL, 0);
    fcntl(client, F_SETFL, flags | O_NONBLOCK);
    updateEvent(kq, client, kReadEvent);
}

/*
 * Reads one message from fd. Closes the connection on EOF, on a read error or
 * when the client sends "exit\r\n"; otherwise switches fd from the read filter
 * to the write filter so that handleWrite() can answer.
 */
void handleRead(int kq, int fd) {
    char buf[PAGE + 1]; /* one spare byte for the terminating NUL */
    ssize_t rr = read(fd, buf, PAGE);
    if (rr < 0) {
        /* Transient conditions: keep the read filter and try again on the next event. */
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            return;
        }
        perror("read");
        delEvent(kq, fd, kReadEvent);
        close(fd);
        return;
    }
    if (rr == 0) {
        /* Peer closed the connection. Only the read filter is registered at this point. */
        delEvent(kq, fd, kReadEvent);
        close(fd);
        return;
    }
    /* read() does not NUL-terminate, but the "%s" below needs a C string. */
    buf[rr] = '\0';

    delEvent(kq, fd, kReadEvent);
    if (rr == 6 && memcmp("exit\r\n", buf, 6) == 0) {
        write(fd, "good bye!\n", strlen("good bye!\n"));
        close(fd);
        return;
    }
    printf("receive bytes: %zu, i have receive your message: %s\n", (size_t) rr, buf);
    updateEvent(kq, fd, kWriteEvent);
}

/* Sends the fixed reply and switches fd back from the write filter to the read filter. */
void handleWrite(int kq, int fd) {
    const char *reply = "i have receive your message, expect your next message\n";
    write(fd, reply, strlen(reply));
    delEvent(kq, fd, kWriteEvent);
    updateEvent(kq, fd, kReadEvent);
}

/*
 * Waits up to waitms milliseconds for events on kq and dispatches each one:
 * readable listening socket -> handleAccept, readable client -> handleRead,
 * writable client -> handleWrite.
 */
void loop_once(int kq, int socket_fd, int waitms) {
    struct timespec timeout;
    timeout.tv_sec = waitms / 1000;
    timeout.tv_nsec = (waitms % 1000) * 1000 * 1000;

    /* The event list must have room for as many entries as kevent() is told it may fill. */
    struct kevent events[MAX_EVENTS];
    int retval = kevent(kq, NULL, 0, events, MAX_EVENTS, &timeout);
    if (retval > 0) {
        printf("收到事件: %d 件\n", retval);
    }

    for (int i = 0; i < retval; i++) {
        int fd = (int) events[i].ident;
        if (events[i].filter == EVFILT_READ) {
            /* Readable event. */
            if (fd == socket_fd) {
                handleAccept(kq, socket_fd);
            } else {
                handleRead(kq, fd);
            }
        } else if (events[i].filter == EVFILT_WRITE) {
            /* Writable event. */
            handleWrite(kq, fd);
        }
    }
}

/* Sets up the non-blocking listening socket, registers it with a kqueue and loops forever. */
int main() {
    int port = 19999;
    int socket_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (socket_fd < 0) {
        perror("socket");
        exit(1);
    }

    struct sockaddr_in serv_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    serv_addr.sin_port = htons(port);

    int on = 1;
    setsockopt(socket_fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));
    int flags = fcntl(socket_fd, F_GETFL, 0);
    fcntl(socket_fd, F_SETFL, flags | O_NONBLOCK);

    if (bind(socket_fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
        printf("bind 失败!");
        exit(1);
    }
    if (listen(socket_fd, 20) < 0) {
        perror("listen");
        exit(1);
    }
    printf("使用telnet 127.0.0.1 19999 来进行测试\n");
    printf("输入exit来断开TCP链接\n");

    int queue = kqueue();
    if (queue < 0) {
        perror("kqueue");
        exit(1);
    }
    /* Register the listening socket: it becomes readable when a connection is pending. */
    updateEvent(queue, socket_fd, kReadEvent);
    for (;;) {
        loop_once(queue, socket_fd, 10000);
    }
    return 0;
}
多线程IO多路复用
无
