一、案例1
要求
- 编写一个 NIO 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
- 实现多人群聊
- 服务器端:可以监测用户上线,离线,并实现消息转发功能
- 客户端:通过 Channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)
-
1、服务端
public class GroupChatServer {// 定义属性private Selector selector;private ServerSocketChannel listenChannel;private static final int PORT = 6667;// 构造器// 初始化工作public GroupChatServer() {try {// 得到选择器selector = Selector.open();// 获取 ServerSocketChannellistenChannel = ServerSocketChannel.open();// 绑定端口listenChannel.socket().bind(new InetSocketAddress(PORT));// 配置非阻塞模式listenChannel.configureBlocking(false);// 将该 listenChannel 注册到 selectorlistenChannel.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}// 监听public void listen() {try {while (true) {// int count = selector.select(2000);int count = selector.select();if (count > 0) { // 有事件处理// 遍历得到的 SelectionKey 集合Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {// 取出 selectionKeySelectionKey key = iterator.next();// 监听到 acceptif (key.isAcceptable()) {SocketChannel socketChannel = listenChannel.accept();socketChannel.configureBlocking(false);// 将该 socketChannel 注册到 SelectorsocketChannel.register(selector, SelectionKey.OP_READ);// 提示System.out.println(socketChannel.getRemoteAddress() + " 上线 ");}if (key.isReadable()) { //通道发送read事件,即通道可读// 处理读readData(key);}// 当前的key删除,防止重复处理iterator.remove();}} else {System.out.println("等待。。。。。");}}} catch (Exception e) {e.printStackTrace();} finally {}}// 读取客户端信息private void readData(SelectionKey key) {SocketChannel channel = null;try {// 取到关联的 channelchannel = (SocketChannel) key.channel();// 创建 bufferByteBuffer buffer = ByteBuffer.allocate(1024);int count = channel.read(buffer);// 根据count做处理if (count > 0) {String msg = new String(buffer.array());System.out.println("from 客户端:" + msg);// 向其他客户端转发消息(排除自己)sendInfoToOtherClients(msg, channel);}} catch (Exception e) {try {System.out.println(channel.getRemoteAddress() + "离线");// 取消注册key.cancel();// 关闭通道channel.close();} catch (IOException ioException) {ioException.printStackTrace();}}}// 转发消息给其他客户private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {System.out.println("服务器转发消息中...");// 遍历 所有注册到 selector 上的 SocketChannel,并删除 selffor (SelectionKey key : selector.keys()) {Channel targetChannel = key.channel();// 排除自己if (targetChannel instanceof SocketChannel && targetChannel != self) {// 转型SocketChannel dest = (SocketChannel) targetChannel;// 将 msg 存储到bufferByteBuffer buffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));// 将 buffer 数据写入到 通道System.out.println("转发给" + dest.getRemoteAddress().toString().substring(1) + ",内容:【" + msg + "]");dest.write(buffer);}}}public static void main(String[] args) {GroupChatServer groupChatServer = new GroupChatServer();groupChatServer.listen();}}
2、客户端
public class GroupChatClient {private final static String HOST = "127.0.0.1";private final static int PORT = 6667;private Selector selector;private SocketChannel socketChannel;private String username;// 构造器,完成初始化工作public GroupChatClient() throws IOException {selector = Selector.open();// 连接服务器socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));// 设置非阻塞socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);// 得到 username// username = socketChannel.getLocalAddress().toString().substring(1);username = socketChannel.getLocalAddress().toString().substring(1);System.out.println(username + " is ok...");}// 向服务器发送消息public void sendInfo(String info) {info = username + "说:" + info;try {socketChannel.write(ByteBuffer.wrap(info.getBytes(StandardCharsets.UTF_8)));} catch (IOException e) {e.printStackTrace();}}// 读取从服务器端回复的消息public void readInfo() {try {// int readChannels = selector.select(2000);int readChannels = selector.select();if (readChannels > 0) {Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();if (key.isReadable()) {// 得到相关的通道SocketChannel socketChannel = (SocketChannel) key.channel();// 得到一个bufferByteBuffer buffer = ByteBuffer.allocate(1024);// 读取socketChannel.read(buffer);String msg = new String(buffer.array());System.out.println(msg);}}// 删除当前 SelectionKey 防止重复操作iterator.remove();} else {// System.out.println("没有可用的通道");}} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) throws IOException {// 启动客户端GroupChatClient groupChatClient = new GroupChatClient();// 启动一个线程,每隔三秒,从服务器端读取数据new Thread(new Runnable() {@Overridepublic void run() {while (true) {groupChatClient.readInfo();try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}}).start();// 发送数据给服务器Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String s = scanner.nextLine();groupChatClient.sendInfo(s);}}}
