整个TimeServer的代码较多,不过逻辑应该是比较清楚的,还加了注释。
public class NIOTimeServer {//默认端口号private static final int PORT = 8585;public static void main(String[] args) throws IOException {new Thread(new TimeServerTask(PORT)).start();}private static class TimeServerTask implements Runnable{private Selector selector;private ServerSocketChannel serverSocketChannel;//停止channel 标识private volatile boolean stop = false;/*** 初始化channel 监听端口 初始化selector*/public TimeServerTask(int port) {try {//初始化通道和选择器selector = Selector.open();serverSocketChannel = ServerSocketChannel.open();//设置为非阻塞模式serverSocketChannel.configureBlocking(false);//绑定端口serverSocketChannel.bind(new InetSocketAddress(port));//将通道注册到选择器serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("The TimeServer started at port: " + port);} catch (IOException e) {e.printStackTrace();}}/*** 停止*/public void stop(){this.stop = true;}@Overridepublic void run() {while(!stop){try{//这里使用带有时间限制的阻塞select方法selector.select(1000);//获取就绪通道Set<SelectionKey> selectionKeys = selector.selectedKeys();//遍历就绪通道Iterator<SelectionKey> iterator = selectionKeys.iterator();SelectionKey selectionKey;while(iterator.hasNext()){selectionKey = iterator.next();iterator.remove();try{handleInput(selectionKey);}catch (IOException e){selectionKey.cancel();if(null != selectionKey.channel()){selectionKey.channel().close();}}}}catch (Throwable t){t.printStackTrace();}}//停止 就关闭selector selector关闭后 所有注册到它上面的channel也会被关闭if(null != selector){try {selector.close();} catch (IOException e) {e.printStackTrace();}}}
/*** 处理输入*/private void handleInput(SelectionKey selectionKey) throws IOException {//先使用isValid方法判断是否可用if(selectionKey.isValid()){//判断是否可接收连接if(selectionKey.isAcceptable()){//从SelectionKey中拿到与之对应的ServerChannelServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();//建立连接SocketChannel socketChannel = channel.accept();//非阻塞模式socketChannel.configureBlocking(false);//将socketChannel也注册到selectorsocketChannel.register(selector,SelectionKey.OP_READ);}if(selectionKey.isReadable()){//读取数据SocketChannel channel = (SocketChannel) selectionKey.channel();ByteBuffer buffer = ByteBuffer.allocate(1025-1);//因为channel是非阻塞的 所以read方法可能返回-1 0 以及正常读取到的字节数量int read = channel.read(buffer);if(read>0){//回绕缓冲区 便于获取有效元素buffer.flip();byte[] elements = new byte[buffer.remaining()];//将buffer中的数据写入数组buffer.get(elements);String content = new String(elements, StandardCharsets.UTF_8);System.out.println("The TimeServer received order: " + content );String response = "QUERY TIME ORDER ".equalsIgnoreCase(content)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";//写回响应doWrite(response,channel);}else if(read<0){//对端链路关闭selectionKey.cancel();channel.close();}else {//读到0字节 忽略}}}}private void doWrite(String response,SocketChannel socketChannel) throws IOException {if(response!=null && response.trim().length()>0){socketChannel.write(ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8)));}}}}
这样我们启动main线程,就会启动这个TimeServer。
