一、发送数据携带用户ID
携带的用户ID可以直接拿到给MessageMapping注解的函数注入,后端可以使用这个ID双向通信
需要定义一个实体实现Principal,实现getName()方法
@Getter@Setterpublic class User implements Principal {private String username;private String password;private String role;private List<Url> urls;@Overridepublic String getName() {return username;}}
定义用户拦截器做认证,并生成User,注入StompHeaderAccessor
/***用户拦截器**/public class UserInterceptor implements ChannelInterceptor {@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);if (StompCommand.CONNECT.equals(accessor.getCommand())) {Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);if (raw instanceof Map) {//这里就是tokenObject name = ((Map) raw).get(Constants.TOKEN_KEY);if (name instanceof LinkedList) {// 设置当前访问器的认证用户String token = ((LinkedList) name).get(0).toString();String username = null;try {Map<String, Claim> claimMap = JWTUtils.verifyToken(token);username = claimMap.get("username").asString();if(username == null){throw new RuntimeException("websocket认证失败");}} catch (UnsupportedEncodingException e) {e.printStackTrace();throw new RuntimeException("websocket认证失败", e);} catch (ValidTokenException e) {e.printStackTrace();throw new RuntimeException("websocket认证失败", e);}User user = new User();user.setUsername(username);accessor.setUser(user);// User user = new User();// user.setUsername("lalala");// accessor.setUser(user);}}}return message;}@Overridepublic void postSend(Message<?> message, MessageChannel channel, boolean sent) {}@Overridepublic void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {}@Overridepublic boolean preReceive(MessageChannel channel) {return false;}@Overridepublic Message<?> postReceive(Message<?> message, MessageChannel channel) {return null;}@Overridepublic void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {}}
/*将客户端渠道拦截器加入spring ioc容器*/@Beanpublic UserInterceptor createUserInterceptor() {return new UserInterceptor();}
服务端
/*** 接收用户信息* */@MessageMapping(value = "/principal")public void test(Principal principal) {log.info("当前在线人数:" + userRegistry.getUserCount());int i = 1;for (SimpUser user : userRegistry.getUsers()) {log.info("用户" + i++ + "---" + user);}//发送消息给指定用户messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");}
客户端:
/** 发送用户信息*/function send0() {stompClient.send("/app/principal", {},{});}
二、发送JSON数据体
服务端可以直接在函数中注入JavaBean或者Map,List或者String接收
服务端:
/*点对点通信*/@MessageMapping(value = "/P2P")public void templateTest(Principal principal, Map<String,String> data) {log.info("当前在线人数:" + userRegistry.getUserCount());int i = 1;for (SimpUser user : userRegistry.getUsers()) {log.info("用户" + i++ + "---" + user);}//发送消息给指定用户messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");}
客户端:
/*** 发送JSON数据体* */function send() {stompClient.send("/app/P2P", {},JSON.stringify({ 'name': 'test' }));}
三、将参数携带到发送请求的URL路径中
使用@DestinationVariable注解,类似SpringMVC的@PathVirable
服务端:
/*** 接收路径参数* */@MessageMapping(value = "/path/{name}/{company}")public void pathTest(Principal principal, @DestinationVariable String name, @DestinationVariable String company) {log.info("当前在线人数:" + userRegistry.getUserCount());int i = 1;for (SimpUser user : userRegistry.getUsers()) {log.info("用户" + i++ + "---" + user);}//发送消息给指定用户messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");}
客户端:
/** 发送路径参数*/function send2() {stompClient.send("/app/path/zhangsan/XXX公司", {},{});}
四、发送header
使用@Header注解
服务端:
/*** 接收header参数* */@MessageMapping(value = "/header")public void headerTest(Principal principal, @Header String one, @Header String two) {log.info("当前在线人数:" + userRegistry.getUserCount());int i = 1;for (SimpUser user : userRegistry.getUsers()) {log.info("用户" + i++ + "---" + user);}//发送消息给指定用户messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");}
客户端:
/*** 发送header参数* */function send3() {stompClient.send("/app/header", {"one":"lalala", "two":"中国"},{});}
五、发送Httpsession中的数据
这里有一点儿小问题,我理解的是只能发送握手连接时的HttpSession中的数据
注册HttpSessionHandshakeIntercepror
/*** 注册stomp的端点*/@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {// 允许使用socketJs方式访问,访问点为webSocketServer,允许跨域// 在网页上我们就可以通过这个链接// http://localhost:8080/webSocketServer// 来和服务器的WebSocket连接registry.addEndpoint("/webSocketServer").addInterceptors(new HttpSessionHandshakeInterceptor()).setAllowedOrigins("*").withSockJS();}
服务端:
/*** 接收HttpSession数据* */@MessageMapping(value = "/httpsession")public void httpsession( StompHeaderAccessor accessor) {String name = (String) accessor.getSessionAttributes().get("name");System.out.println(1111);}
客户端:
/*** 发送httpsession* */function send4() {stompClient.send("/app/httpsession", {},{});}
所有代码
前端JS
<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8"><title>Title</title><script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script><script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script><script>var socket = new SockJS("http://192.168.100.88:7601/demo/webSocketServer");var stompClient = Stomp.over(socket);window.onload = function () {connect();}//订阅消息function subscribe() {stompClient.subscribe('/user/queue/message', function (response) {console.log("/user/queue/message 你接收到的消息为:" + response);});}/*** 发送用户信息* */function send0() {stompClient.send("/app/principal", {},{});}/*** 发送JSON数据体* */function send() {stompClient.send("/app/P2P", {},JSON.stringify({ 'name': 'test' }));}/*** 发送路径参数* */function send2() {stompClient.send("/app/path/zhangsan/XXX公司", {},{});}/*** 发送header参数* */function send3() {stompClient.send("/app/header", {"one":"lalala", "two":"中国"},{});}/*** 发送httpsession* */function send4() {stompClient.send("/app/httpsession", {},{});}// /**// * 发送URL中?&参数// * */// function send5() {// stompClient.send("/app/param?name=张三", {},// {});// }function connect() {stompClient.connect({Authorization:"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjIxOTg1NjQxNjAsImlhdCI6MTUzMTg5NzUwMCwidXNlcm5hbWUiOiJ6cXcxMSJ9.VFR2EKUx5BTYLDkDogiLA9LfNVoPjOzQ3rTWoEy7He4"//这里可以改成token// name: 'admin' // 携带客户端信息},function connectCallback(frame) {// 连接成功时(服务器响应 CONNECTED 帧)的回调方法alert("success");subscribe();},function errorCallBack(error) {// 连接失败时(服务器响应 ERROR 帧)的回调方法alert("error");});}function disconnect() {if (stompClient != null) {stompClient.disconnect();}// setConnected(false);console.log("Disconnected");}</script></head><body><input type="text" id="info"/><button onclick="send5();">发送</button></body></html>
后端MessaeMapping处
package com.iscas.biz.test.controller;import com.iscas.templet.common.ResponseEntity;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.handler.annotation.*;import org.springframework.messaging.simp.SimpMessagingTemplate;import org.springframework.messaging.simp.stomp.StompHeaderAccessor;import org.springframework.messaging.simp.user.SimpUser;import org.springframework.messaging.simp.user.SimpUserRegistry;import org.springframework.web.bind.annotation.RestController;import java.security.Principal;import java.util.Map;/*** 如有要看例子,请打开注释***/@RestController@Slf4jpublic class WebSoketDemoController {//spring提供的发送消息模板@Autowiredprivate SimpMessagingTemplate messagingTemplate;@Autowiredprivate SimpUserRegistry userRegistry;/*** 接收用户信息* */@MessageMapping(value = "/principal")public void test(Principal principal) {log.info("当前在线人数:" + userRegistry.getUserCount());int i = 1;for (SimpUser user : userRegistry.getUsers()) {log.info("用户" + i++ + "---" + user);}//发送消息给指定用户messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");}/*** 接收数据体* */@MessageMapping(value = "/P2P")public void templateTest(Principal principal, Map<String,String> data) {log.info("当前在线人数:" + userRegistry.getUserCount());int i = 1;for (SimpUser user : userRegistry.getUsers()) {log.info("用户" + i++ + "---" + user);}//发送消息给指定用户messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");}/*** 接收路径参数* */@MessageMapping(value = "/path/{name}/{company}")public void pathTest(Principal principal, @DestinationVariable String name, @DestinationVariable String company) {log.info("当前在线人数:" + userRegistry.getUserCount());int i = 1;for (SimpUser user : userRegistry.getUsers()) {log.info("用户" + i++ + "---" + user);}//发送消息给指定用户messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");}/*** 接收header参数* */@MessageMapping(value = "/header")public void headerTest(Principal principal, @Header String one, @Header String two) {log.info("当前在线人数:" + userRegistry.getUserCount());int i = 1;for (SimpUser user : userRegistry.getUsers()) {log.info("用户" + i++ + "---" + user);}//发送消息给指定用户messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/message","服务器主动推的数据");}/*** 接收HttpSession数据* */@MessageMapping(value = "/httpsession")public void httpsession( StompHeaderAccessor accessor) {String name = (String) accessor.getSessionAttributes().get("name");System.out.println(1111);}// /**// * 接收param数据// * */// @MessageMapping(value = "/param")// public void param(String name) {// System.out.println(1111);// }/*广播*/@MessageMapping("/broadcast")@SendTo("/topic/getResponse")public ResponseEntity topic() throws Exception {return new ResponseEntity(200,"success");}}
Websocket配置类
package com.iscas.base.biz.config.stomp;import org.springframework.context.annotation.Bean;import org.springframework.messaging.simp.config.ChannelRegistration;import org.springframework.messaging.simp.config.MessageBrokerRegistry;import org.springframework.web.socket.config.annotation.*;import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;/*** webscoket配置** @auth zhuquanwen***///@Configuration@EnableWebSocketMessageBrokerpublic class WebSocketStompConfig /*extends AbstractWebSocketMessageBrokerConfigurer*/ implements WebSocketMessageBrokerConfigurer {/*** 注册stomp的端点*/@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {// 允许使用socketJs方式访问,访问点为webSocketServer,允许跨域// 在网页上我们就可以通过这个链接// http://localhost:8080/webSocketServer// 来和服务器的WebSocket连接registry.addEndpoint("/webSocketServer").addInterceptors(new HttpSessionHandshakeInterceptor()).setAllowedOrigins("*").withSockJS();}/*** 配置信息代理*/@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {// 订阅Broker名称registry.enableSimpleBroker("/queue", "/topic");// 全局使用的消息前缀(客户端订阅路径上会体现出来)registry.setApplicationDestinationPrefixes("/app");// 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/registry.setUserDestinationPrefix("/user/");}/*** 配置客户端入站通道拦截器*/@Overridepublic void configureClientInboundChannel(ChannelRegistration registration) {registration.interceptors(createUserInterceptor());}/*将客户端渠道拦截器加入spring ioc容器*/@Beanpublic UserInterceptor createUserInterceptor() {return new UserInterceptor();}@Overridepublic void configureWebSocketTransport(WebSocketTransportRegistration registration) {registration.setMessageSizeLimit(500 * 1024 * 1024);registration.setSendBufferSizeLimit(1024 * 1024 * 1024);registration.setSendTimeLimit(200000);}}
用户拦截器
package com.iscas.base.biz.config.stomp;import com.auth0.jwt.interfaces.Claim;import com.iscas.base.biz.config.Constants;import com.iscas.base.biz.util.SpringUtils;import com.iscas.templet.exception.ValidTokenException;import com.iscas.base.biz.model.auth.User;import com.iscas.base.biz.util.JWTUtils;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.simp.SimpMessageHeaderAccessor;import org.springframework.messaging.simp.stomp.StompCommand;import org.springframework.messaging.simp.stomp.StompHeaderAccessor;import org.springframework.messaging.support.ChannelInterceptor;import org.springframework.messaging.support.MessageHeaderAccessor;import javax.servlet.http.HttpSession;import java.io.UnsupportedEncodingException;import java.util.LinkedList;import java.util.Map;/***用户拦截器**/public class UserInterceptor implements ChannelInterceptor {@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);if (StompCommand.CONNECT.equals(accessor.getCommand())) {Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);if (raw instanceof Map) {//这里就是tokenObject name = ((Map) raw).get(Constants.TOKEN_KEY);if (name instanceof LinkedList) {// 设置当前访问器的认证用户// String token = ((LinkedList) name).get(0).toString();// String username = null;// try {// Map<String, Claim> claimMap = JWTUtils.verifyToken(token);// username = claimMap.get("username").asString();// if(username == null){// throw new RuntimeException("websocket认证失败");// }// } catch (UnsupportedEncodingException e) {// e.printStackTrace();// throw new RuntimeException("websocket认证失败", e);// } catch (ValidTokenException e) {// e.printStackTrace();// throw new RuntimeException("websocket认证失败", e);// }// User user = new User();// user.setUsername(username);// accessor.setUser(user);User user = new User();user.setUsername("lalala");accessor.setUser(user);}}} else if (StompCommand.SEND.equals(accessor.getCommand())) {//发送数据}return message;}@Overridepublic void postSend(Message<?> message, MessageChannel channel, boolean sent) {}@Overridepublic void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {}@Overridepublic boolean preReceive(MessageChannel channel) {return false;}@Overridepublic Message<?> postReceive(Message<?> message, MessageChannel channel) {return null;}@Overridepublic void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {}}
