前边描述的文件IO和Socket IO,也都是阻塞的IO,在执行下一步之前必须等待上一步完成. 否则就得一直等待在原地.
- 可读事件: 如果socket缓冲区有数据可以读,对应的fd就是readable,应用可以从fd中把数据读取走,进行应用处理。
- 可写事件: 如果socket缓冲区有空间可以写,对应的fd就是writable,应用可以数据写入到fd缓冲区,交给操作系统写入到协议栈
非阻塞IO
顾名思义,和阻塞IO相对应,在文件IO中的阻塞操作可以通过设置在阻塞中变得非阻塞,完成丑小鸭到丑大鸭的蜕变.
示例
/*** @author chenshun00@gmail.com* @module parse* @since 2020/11/30 10:21 下午*/public class TestIO {public static void main(String[] args) throws IOException {//开启一个ServerSocketChannel,对应了Socket IO中的ServerSocketServerSocketChannel serverSocketChannel = ServerSocketChannel.open();final Selector selector = Selector.open();serverSocketChannel.socket().setReuseAddress(true);serverSocketChannel.socket().bind(new InetSocketAddress("localhost", 9998));//配置非阻塞,如果设置成true还是阻塞的配置serverSocketChannel.configureBlocking(false);//注意这里的第三个参数,会attach到SelectorKey上serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);System.out.println("-- server ready");while (true) {//jdk 11才有的方法selector.select(key -> {if (!key.isValid()) {return;}//获取到attach的keyfinal ServerSocketChannel ch = (ServerSocketChannel) key.attachment();try {if (key.isAcceptable()) {//获取endpoint,对应了serverSocket.accpet()返回的socketSocketChannel client_ch = ch.accept();if (client_ch != null) { // accept() may return null...System.out.printf("accepted connection from %s\n", client_ch.getRemoteAddress());//客户端也需要配置成非阻塞client_ch.configureBlocking(false);//监听可读时间client_ch.register(selector, SelectionKey.OP_READ, key.attachment());}} else if (key.isReadable()) {//整体的流程就是从socket中读取数据//然后写入到队列中,然后注册监听可写事件//当触发可写事件时,从队列中获取数据并写回SocketChannel socketChannel = (SocketChannel) key.channel();socketChannel.register(selector, SelectionKey.OP_WRITE);ByteBuffer buffer = ByteBuffer.allocate(1024);int read = socketChannel.read(buffer);buffer.flip();if (read == -1) {throw new IOException("Socket closed");}String result = new String(buffer.array()).trim();System.out.println("receive:" + result);ByteBuffer writeBuffer = ByteBuffer.wrap(ACK);Queue<Object> pendingWrites = channelToPendingWrites.get(key.channel());if (pendingWrites == null) {synchronized (channelToPendingWrites) {pendingWrites = channelToPendingWrites.get(key.channel());if (pendingWrites == null) {pendingWrites = new ConcurrentLinkedQueue<>();channelToPendingWrites.put(key.channel(), pendingWrites);}}}pendingWrites.add(writeBuffer);socketChannel.register(selector, SelectionKey.OP_WRITE);} else if (key.isWritable()) {//参考可读事件的注释SocketChannel socketChannel = (SocketChannel) key.channel();final Queue<Object> objects = channelToPendingWrites.get(socketChannel);if (objects == null || objects.size() == 0) {final ByteBuffer allocate = ByteBuffer.allocate(12);allocate.put("hello,world!".getBytes());socketChannel.write(allocate);} else {final Object poll = objects.poll();socketChannel.write((ByteBuffer) poll);}socketChannel.register(selector, SelectionKey.OP_READ);}} catch (Exception e) {e.printStackTrace();}}, 10000L);}}private static final Map<SelectableChannel, Queue<Object>> channelToPendingWrites = new ConcurrentHashMap<>();private static final byte[] ACK = "Data logged successfully\n".getBytes();}
