SpringBoot使用WebSocket实现即时消息

 环境:SpringBoot2.3.9.RELEASE

依赖

<dependency>         <groupId>org.springframework.boot</groupId>         <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency>         <groupId>org.springframework.boot</groupId>         <artifactId>spring-boot-starter-websocket</artifactId> </dependency> 

定义消息类型

抽象消息对象

public class AbstractMessage {      /**      *  消息类型      */     protected String type ;     /**      *  消息内容      */     protected String content ;     /**      *  消息日期      */     protected String date ; } 

消息对象子类

1、使用t实时消Ping检查消息

public class PingMessage extends AbstractMessage {      public PingMessage() { }     public PingMessage(String type) {          this.type = type ;     } } 

 2、现即息系统消息

public class SystemMessage extends AbstractMessage {      public SystemMessage() { }     public SystemMessage(String type,使用t实时消 String content) {          this.type = type ;         this.content = content ;     } } 

 3、点对点消息

public class PersonMessage extends AbstractMessage {      private String fromName ;     private String toName ; } 

 消息类型定义 

public enum MessageType {      /**      *  系统消息 0000;心跳检查消息 0001;点对点消息2001      */     SYSTEM("0000"),现即息 PING("0001"), PERSON("2001") ;     private String type ;     private MessageType(String type) {          this.type = type ;     }     public String getType() {          return type;     }     public void setType(String type) {          this.type = type;     } } 

 WebSocket服务端点

该类作用就是定义客户端连接的地址

@ServerEndpoint(value = "/message/{ username}",      encoders = { WsMessageEncoder.class},     decoders = { WsMessageDecoder.class},     subprotocols = { "gmsg"},     configurator = MessageConfigurator.class)   @Component   public class GMessageListener {        public static ConcurrentMap<String, UserSession> sessions = new ConcurrentHashMap<>();     private static Logger logger = LoggerFactory.getLogger(GMessageListener.class) ;     private String username ;     @OnOpen       public void onOpen(Session session, EndpointConfig config, @PathParam("username") String username){          UserSession userSession = new UserSession(session.getId(), username, session) ;         this.username = username ;         sessions.put(username, userSession) ;         logger.info("【{ }】用户进入, 当前连接数:{ }", username, sessions.size()) ;      }       @OnClose       public void onClose(Session session, CloseReason reason){            UserSession userSession = sessions.remove(this.username) ;         if (userSession != null) {              logger.info("用户【{ }】, 断开连接, 当前连接数:{ }", username, sessions.size()) ;         }     }     @OnMessage     public void pongMessage(Session session, PongMessage message) {          ByteBuffer buffer = message.getApplicationData() ;         logger.debug("接受到Pong帧【这是由浏览器发送】:" + buffer.toString());     }     @OnMessage     public void onMessage(Session session, AbstractMessage message) {          if (message instanceof PingMessage) {              logger.debug("这里是ping消息");             return ;         }         if (message instanceof PersonMessage) {              PersonMessage personMessage = (PersonMessage) message ;             if (this.username.equals(personMessage.getToName())) {                  logger.info("【{ }】收到消息:{ }", this.username, personMessage.getContent());             } else {                  UserSession userSession = sessions.get(personMessage.getToName()) ;                 if (userSession != null) {                      try {                          userSession.getSession().getAsyncRemote().sendText(new ObjectMapper().writeValueAsString(message)) ;                     } catch (JsonProcessingException e) {                          e.printStackTrace();                     }                 }             }             return ;         }         if (message instanceof SystemMessage) {              logger.info("接受到消息类型为【系统消息】") ;              return ;         }     }     @OnError     public void onError(Session session, Throwable error) {          logger.error(error.getMessage()) ;     } } 

 WsMessageEncoder.java类

该类的主要作用是,当发送的使用t实时消消息是对象时,该如何转换

public class WsMessageEncoder implements Encoder.Text<AbstractMessage> {      private static Logger logger = LoggerFactory.getLogger(WsMessageDecoder.class) ;     @Override     public void init(EndpointConfig endpointConfig) {      }     @Override     public void destroy() {      }     @Override     public String encode(AbstractMessage tm) throws EncodeException {          String message = null ;         try {              message = new ObjectMapper().writeValueAsString(tm);         } catch (JsonProcessingException e) {              logger.error("JSON处理错误:{ }",现即息 e) ;         }         return message;     } } 

 WsMessageDecoder.java类

该类的云服务器作用是,当接收到消息时如何转换成对象。使用t实时消

public class WsMessageDecoder implements  Decoder.Text<AbstractMessage> {      private static Logger logger = LoggerFactory.getLogger(WsMessageDecoder.class) ;     private static Set<String> msgTypes = new HashSet<>() ;     static {          msgTypes.add(MessageType.PING.getType()) ;         msgTypes.add(MessageType.SYSTEM.getType()) ;         msgTypes.add(MessageType.PERSON.getType()) ;     }     @Override     @SuppressWarnings("unchecked")     public AbstractMessage decode(String s) throws DecodeException {          AbstractMessage message = null ;         try {              ObjectMapper mapper = new ObjectMapper() ;             Map<String,现即息String> map = mapper.readValue(s, Map.class) ;             String type = map.get("type") ;             switch(type) {                  case "0000":                     message = mapper.readValue(s, SystemMessage.class) ;                     break;                 case "0001":                     message = mapper.readValue(s, PingMessage.class) ;                     break;                 case "2001":                     message = mapper.readValue(s, PersonMessage.class) ;                     break;             }         } catch (JsonProcessingException e) {              logger.error("JSON处理错误:{ }", e) ;         }         return message ;     }     // 该方法判断消息是否可以被解码(转换)     @Override     @SuppressWarnings("unchecked")     public boolean willDecode(String s) {          Map<String, String> map = new HashMap<>() ;         try {              map = new ObjectMapper().readValue(s, Map.class);         } catch (JsonProcessingException e) {              e.printStackTrace();         }         logger.debug("检查消息:【" + s + "】是否可以解码") ;         String type = map.get("type") ;         if (StringUtils.isEmpty(type) || !msgTypes.contains(type)) {              return false ;         }         return true ;     }     @Override     public void init(EndpointConfig endpointConfig) {      }     @Override     public void destroy() {      } } 

 MessageConfigurator.java类

该类的作用是配置服务端点,比如配置握手信息

public class MessageConfigurator extends ServerEndpointConfig.Configurator {      private static Logger logger = LoggerFactory.getLogger(MessageConfigurator.class) ;     @Override     public void modifyHandshake(ServerEndpointConfig sec,使用t实时消 HandshakeRequest request, HandshakeResponse response) {          logger.debug("握手请求头信息:" + request.getHeaders());         logger.debug("握手响应头信息:" + response.getHeaders());         super.modifyHandshake(sec, request, response);     }    } 

 WebSocke配置类

@Configuration public class WebSocketConfig {      @Bean     public ServerEndpointExporter serverEndpointExporter (){            return new ServerEndpointExporter();       }   } 

 当以jar包形式运行时需要配置该bean,暴露我们配置的现即息@ServerEndpoint;当我们以war独立tomcat运行时不能配置该bean。

前端页面

<!doctype html> <html>  <head>   <meta charset="UTF-8">   <meta name="Author" content="">   <meta name="Keywords" content="">   <meta name="Description" content="">   <script src="g-messages.js?使用t实时消v=1"></script>   <title>WebSocket</title>   <style type="text/css">   </style>   <script>     let gm = null ;     let username = null ;     function ListenerMsg({ url, protocols = [gmsg], options = { }}) {          if (!url){               throw new Error("未知服务地址") ;         }         gm = new window.__GM({              url: url,             protocols: protocols         }) ;         gm.open(options) ;     }     ListenerMsg.init = (user) => {          if (!user) {              alert("未知的当前登录人") ;             return ;         }         let url = `ws://localhost:8080/message/${ user}` ;         let msg = document.querySelector("#msg")         ListenerMsg({ url, options: {              onmessage (e) {                  let data = JSON.parse(e.data) ;                 let li = document.createElement("li") ;                 li.innerHTML = "【" + data.fromName + "】对你说:" + data.content ;                 msg.appendChild(li) ;             }         }}) ;     }     function enter() {          username = document.querySelector("#nick").value ;         ListenerMsg.init(username) ;         document.querySelector("#chat").style.display = "block" ;         document.querySelector("#enter").style.display = "none" ;         document.querySelector("#cu").innerText = username ;     }     function send() {          let a = document.querySelector("#toname") ;         let b = document.querySelector("#content") ;         let toName = a.value ;         let content = b.value ;         gm.sendMessage({ type: "2001", content, fromName: username, toName}) ;         a.value =  ;         b.value =  ;     }   </script>  </head>  <body>     <div id="enter">         <input id="nick"/><button type="button" onclick="enter()">进入</button>     </div>     <hr/>     <div id="chat" style="display:none;">         当前用户:<b id="cu"></b><br/>         用户:<input id="toname" name="toname"/><br/><br/>         内容:<textarea id="content" rows="3" cols="22"></textarea><br/>         <button type="button" onclick="send()">发送</button>     </div>     <div>         <ul id="msg">         </ul>     </div>  </body> </html> 

到此所有的亿华云计算代码完毕,接下来测试

测试

打开两个标签页,现即息以不同的使用t实时消用户进入。

输入对方用户名发送消息

成功了,现即息简单的使用t实时消websocket。我们生产环境还就这么完的,8g内存跑了6w的用户。

完毕!!!

IT科技
上一篇:付款完成后,您只需耐心等待,如果您注册成功,系统会提示您。这里需要注意的是,域名是一个即时产品,只有在最终付款成功时才能预订,注册成功后不能更改。
下一篇:2、根据用户基础选择访问提供程序。由于互联问题的存在,接入商的选择也非常重要,如果用户群主要在联通,尽量选择联通接入较好的接入商,如果用户群主要在电信,那么选择电信接入较好的接入商。如果用户组位于国家/地区,则选择更好的访问提供程序进行交互。