/**
 * Minimal non-blocking (NIO) time client.
 *
 * <p>Binds a socket to {@code LOCAL_PORT}, connects to a time server on
 * {@code 127.0.0.1:REMOTE_PORT}, sends one {@code "QUERY TIME ORDER "} request, prints the first
 * reply it receives and then shuts down.
 */
public class NIOTimeClient {
    /** Local port the client socket is bound to before connecting. */
    private static final int LOCAL_PORT = 3738;
    /** Default port of the time server. */
    private static final int REMOTE_PORT = 8585;

    public static void main(String[] args) {
        new Thread(new TimeClientHandler("127.0.0.1", REMOTE_PORT)).start();
    }

    /**
     * Selector loop that drives a single client connection: connect, send the query, read one
     * reply, stop.
     */
    private static class TimeClientHandler implements Runnable {
        /** Capacity of the buffer used for a single read from the server. */
        private static final int READ_BUFFER_SIZE = 1024;

        private final String host;
        private final int port;
        private final Selector selector;
        private final SocketChannel socketChannel;
        /** Set once the reply was received or the connection is no longer usable. */
        private volatile boolean stop = false;

        /**
         * Opens the selector and a non-blocking channel bound to {@code LOCAL_PORT}.
         *
         * @param host server host name or address
         * @param port server port
         * @throws IllegalStateException if the selector or the channel cannot be set up; anything
         *     opened so far is closed first, so no half-initialised handler is ever run
         */
        public TimeClientHandler(String host, int port) {
            this.host = host;
            this.port = port;
            Selector openedSelector = null;
            SocketChannel openedChannel = null;
            try {
                openedSelector = Selector.open();
                openedChannel = SocketChannel.open();
                // Connect from the fixed local port.
                openedChannel.bind(new InetSocketAddress(LOCAL_PORT));
                openedChannel.configureBlocking(false);
            } catch (IOException e) {
                closeQuietly(openedChannel);
                closeQuietly(openedSelector);
                throw new IllegalStateException(
                        "Failed to initialise time client on local port " + LOCAL_PORT, e);
            }
            this.selector = openedSelector;
            this.socketChannel = openedChannel;
            System.out.println("TimeClient started at port " + LOCAL_PORT);
        }

        /**
         * Starts the connect, then dispatches ready keys until {@code stop} is set. The channel
         * and the selector are always released when the loop ends.
         */
        @Override
        public void run() {
            try {
                doConnect();
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
            try {
                while (!stop) {
                    try {
                        // Wait up to one second for ready channels.
                        selector.select(1000);
                        Set<SelectionKey> selectionKeys = selector.selectedKeys();
                        Iterator<SelectionKey> it = selectionKeys.iterator();
                        while (it.hasNext()) {
                            SelectionKey selectionKey = it.next();
                            it.remove();
                            try {
                                handleInput(selectionKey);
                            } catch (Exception e) {
                                // Report the failure instead of swallowing it, drop the
                                // connection, and end the loop: this client has only one
                                // channel, so there is nothing left to wait for.
                                e.printStackTrace();
                                stop = true;
                                selectionKey.cancel();
                                selectionKey.channel().close();
                            }
                        }
                    } catch (IOException e) {
                        // A failing select()/close() would fail again immediately; stop rather
                        // than spin.
                        e.printStackTrace();
                        stop = true;
                    }
                }
            } finally {
                // Closing a selector only deregisters its channels, it does not close them,
                // so the channel has to be closed explicitly.
                closeQuietly(socketChannel);
                System.out.println("关闭selector");
                closeQuietly(selector);
            }
        }

        /**
         * Handles one ready key: completes a pending connect (then sends the query) and/or reads
         * the server's reply.
         *
         * @param selectionKey key taken from the selector's selected-key set
         * @throws IOException if finishing the connect, writing or reading fails
         */
        private void handleInput(SelectionKey selectionKey) throws IOException {
            if (!selectionKey.isValid()) {
                return;
            }
            SocketChannel channel = (SocketChannel) selectionKey.channel();

            // Connection establishment comes first.
            if (selectionKey.isConnectable()) {
                if (channel.finishConnect()) {
                    channel.register(selector, SelectionKey.OP_READ);
                    doWrite(channel);
                } else {
                    System.exit(1);
                }
            }

            if (selectionKey.isReadable()) {
                ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
                int read = channel.read(buffer);
                if (read > 0) {
                    // Flip so the bytes just read can be drained.
                    buffer.flip();
                    byte[] elements = new byte[buffer.remaining()];
                    buffer.get(elements);
                    String content = new String(elements, StandardCharsets.UTF_8);
                    System.out.println("The time client received time : " + content);
                    stop = true;
                } else if (read < 0) {
                    // Server closed the connection: no reply will ever arrive, so end the loop
                    // instead of polling an empty selector forever.
                    selectionKey.cancel();
                    channel.close();
                    stop = true;
                }
                // read == 0: nothing available yet, ignore.
            }
        }

        /**
         * Initiates the connection. If it completes immediately the channel is registered for
         * reads and the query is sent; otherwise the channel is registered for
         * {@code OP_CONNECT} and the connect is finished in {@link #handleInput}.
         *
         * @throws IOException if connecting, registering or writing fails
         */
        private void doConnect() throws IOException {
            if (socketChannel.connect(new InetSocketAddress(host, port))) {
                socketChannel.register(selector, SelectionKey.OP_READ);
                doWrite(socketChannel);
            } else {
                // Not connected yet: wait for the connect-ready event.
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            }
        }

        /**
         * Sends the time query to the server.
         *
         * @param socketChannel connected channel to write to
         * @throws IOException if the write fails
         */
        private void doWrite(SocketChannel socketChannel) throws IOException {
            byte[] request = "QUERY TIME ORDER ".getBytes(StandardCharsets.UTF_8);
            ByteBuffer byteBuffer = ByteBuffer.wrap(request);
            // A non-blocking write may accept only part of the buffer; keep writing so the
            // request is never silently truncated. The request is tiny, so this loop is
            // expected to finish on the first pass.
            while (byteBuffer.hasRemaining()) {
                socketChannel.write(byteBuffer);
            }
            System.out.println("Send order to server successfully.");
        }

        /** Closes {@code resource} if it is non-null, reporting (not propagating) any failure. */
        private static void closeQuietly(AutoCloseable resource) {
            if (resource == null) {
                return;
            }
            try {
                resource.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}