[![star](https://gitee.com/xxssyyyyssxx/websocket-springboot-starter/badge/star.svg?theme=dark)](https://gitee.com/xxssyyyyssxx/websocket-springboot-starter/stargazers)
最近开发中需要实现服务端的推送,经过一段时间的资料查询最终锁定使用websocket来实现。JavaEE本身就支持WebSocket。我们只需要开发一个EndPoint来处理连接、消息等即可。但是WebSocket的session管理是开发中的重中之重和难点,因为你需要知道推送给谁,就需要保存代表其连接的Session。
1.首先设计管理WebSocket的session的接口WebSocketManager。
/**
* 管理websocket的session,可以使用Map
* @author xiongshiyan at 2018/10/10 , contact me with email yanshixiong@126.com or phone 15208384257
*/
public interface WebSocketManager {
/**
* 在容器中的名字
*/
String WEBSOCKET_MANAGER_NAME = "webSocketManager";
/**
* 根据标识获取websocket session
* @param identifier 标识
* @return WebSocket
*/
WebSocket get(String identifier);
/**
* 放入一个 websocket session
* @param identifier 标识
* @param webSocket websocket
*/
void put(String identifier , WebSocket webSocket);
/**
* 删除
* @param identifier 标识
*/
void remove(String identifier);
/**
* 获取当前机器上的保存的WebSocket
* @return WebSocket Map
*/
Map localWebSocketMap();
/**
* 统计所有在线人数
* @return 所有在线人数
*/
default int size(){
return localWebSocketMap().size();
}
/**
* 给某人发送消息
* @param identifier 标识
* @param message 消息
*/
void sendMessage(String identifier, String message);
/**
* 广播
* @param message 消息
*/
void broadcast(String message);
/**
* 修改当前的状态
* @param identifier 标识
* @param status 状态
*/
void changeStatus(String identifier , int status);
}
其中,identifier是一个人的标识,发送消息就以此为根据。本类定义了本地管理WebSocket、发送消息的一些方法,WebSocket是一个实体类,保存有标识和用于会话的session。
/**
* @author xiongshiyan at 2018/10/10 , contact me with email yanshixiong@126.com or phone 15208384257
*/
public class WebSocket implements Serializable{
public static final int STATUS_AVAILABLE = 0;
public static final int STATUS_UNAVAILABLE = 1;
private String identifier;
private Session session;
private int status;
...
}
如果是单机管理websocket,可以使用一个Map来管理session。实现类如下:
/**
* @author xiongshiyan at 2018/10/10 , contact me with email yanshixiong@126.com or phone 15208384257
*/
public class MemWebSocketManager implements WebSocketManager {
/**
* 因为全局只有一个 WebSocketManager ,所以才敢定义为非static
*/
private final Map connections = new ConcurrentHashMap<>(100);
@Override
public WebSocket get(String identifier) {
return connections.get(identifier);
}
@Override
public void put(String identifier, WebSocket webSocket) {
connections.put(identifier , webSocket);
}
@Override
public void remove(String identifier) {
connections.remove(identifier);
}
@Override
public Map localWebSocketMap() {
return connections;
}
@Override
public void sendMessage(String identifier, String message) {
WebSocket webSocket = get(identifier);
if(null == webSocket){throw new RuntimeException("identifier 不存在");}
if(WebSocket.STATUS_AVAILABLE != webSocket.getStatus()){
return;
}
WebSocketUtil.sendMessage(webSocket.getSession() , message);
}
@Override
public void broadcast(String message) {
localWebSocketMap().values().forEach(
webSocket -> WebSocketUtil.sendMessage(
webSocket.getSession() , message));
}
/**
* 修改当前的状态
* @param identifier 标识
* @param status 状态
*/
@Override
public void changeStatus(String identifier , int status) {
WebSocket socket = get(identifier);
if(null == socket){return;}
socket.setStatus(status);
}
}
将此类注入到容器中。
@Bean(WebSocketManager.WEBSOCKET_MANAGER_NAME)
public WebSocketManager webSocketManager(){
return new MemWebSocketManager();
}
然后开发一个Endpoint,实现websocket的连接。
/**
* NOTE: Nginx反向代理要支持WebSocket,需要配置几个header,否则连接的时候就报404
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_read_timeout 3600s; //这个时间不长的话就容易断开连接
* @author xiongshiyan at 2018/10/10 , contact me with email yanshixiong@126.com or phone 15208384257
*/
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@ServerEndpoint(value ="/websocket/connect/{identifier}")
public class WebSocketEndpoint {
private static final int CODE_BEATHEART = 8;
/**
* 路径标识:目前使用token来代表
*/
private static final String IDENTIFIER = "identifier";
private static final Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);
/// 无法通过这种方式注入组件
/*@Autowired
private WebSocketManager websocketManager;*/
public WebSocketEndpoint() {
}
@OnOpen
public void onOpen(Session session, @PathParam(IDENTIFIER) String identifier) {
try {
logger.info("*** WebSocket opened from sessionId " + session.getId() + " , identifier = " + identifier);
if(StrUtil.isBlank(identifier)){
return;
}
WebSocket socket = new WebSocket();
socket.setIdentifier(identifier);
socket.setSession(session);
socket.setStatus(WebSocket.STATUS_AVAILABLE);
//像刷新这种,id一样,session不一样,后面的覆盖前面的
WebSocketManager websocketManager = getWebSocketManager();
websocketManager.put(identifier , socket);
} catch (Exception e) {
logger.error(e.getMessage() , e);
}
}
@OnClose
public void onClose(Session session , @PathParam(IDENTIFIER) String identifier) {
logger.info("*** WebSocket closed from sessionId " + session.getId() + " , identifier = " + identifier);
getWebSocketManager().remove(identifier);
}
@OnMessage
public void onMessage(String message, Session session , @PathParam(IDENTIFIER) String identifier) {
logger.info("接收到的数据为:" + message + " from sessionId " + session.getId() + " , identifier = " + identifier);
//说明不是json,处理不了
if (!JsonUtil.isJsonObject(message)) {return;}
JSONObject object = new JSONObject(message);
boolean containsKey = object.containsKey("code");
if(!containsKey){
return;
}
Integer code = object.getInteger("code");
if(CODE_BEATHEART == code){
Map map = new HashMap<>(1);
map.put("timestamp" , System.currentTimeMillis());
Message msg = new Message(CODE_BEATHEART , map);
WebSocketUtil.sendMessage(session , JsonUtil.serializeJavaBean(msg));
}
}
@OnError
public void onError(Throwable t , @PathParam(IDENTIFIER) String identifier){
logger.info("发生异常:, identifier = " + identifier);
logger.error(t.getMessage() , t);
getWebSocketManager().remove(identifier);
}
private WebSocketManager getWebSocketManager() {
return SpringContextHolder.getBean(WebSocketManager.WEBSOCKET_MANAGER_NAME , WebSocketManager.class);
}
}
NOTE:
0. 网页端使用连接 ws://xxxxxxx/websocket/connect/{token}即可。
1.如果是Nginx反向代理的,需要在Nginx代理中设置几个header。如果是SLB,需要业务保障型。
2.如果是独立的Tomcat应用,它就会自动扫描标注了@ServerEndPoint的类,如果是内置容器,就不会扫描,需要注入一个ServerEndpointExporter。目前还不知道为什么,所以针对这两种情况,我们需要不同的配置,可以使用 ConditionalOnProperty 轻松应对。
/**
* webSocket配置
* @author xiongshiyan
*/
@Configuration
public class WebSocketConfig {
/**
* @see https://www.cnblogs.com/betterboyz/p/8669879.html
* 首先要注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。
* 要注意,如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,
* 因为它将由容器自己提供和管理, 否则就会报重复的endpoint错误。
*/
@ConditionalOnProperty(prefix = "server.websocket.exporter" ,
name = "enable" ,havingValue = "true")
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
3.ServerEndpoint不会成为Spring的一个组件,所以使用@Autowired是不会生效的,所以需要一个全局获取容器的方法。利用ApplicationContextAware可以注入容器,使用一个静态变量保存起来。提供get方法。
/**
* 容器全局存留
* @author xiongshiyan at 2018/8/14 , contact me with email yanshixiong@126.com or phone 15208384257
*/
public class SpringContextHolder implements ApplicationContextAware {
private static ApplicationContext applicationContext;
/**
* 实现ApplicationContextAware接口的context注入函数, 将其存入静态变量.
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
SpringContextHolder.applicationContext = applicationContext;
}
/**
* 取得存储在静态变量中的ApplicationContext.
*/
public static ApplicationContext getApplicationContext() {
checkApplicationContext();
return applicationContext;
}
@SuppressWarnings("unchecked")
public static Object getBean(String beanName) {
checkApplicationContext();
return applicationContext.getBean(beanName);
}
/**
* 从静态变量ApplicationContext中取得Bean, 自动转型为所赋值对象的类型.
*/
public static T getBean(String beanName, Class clazz) {
checkApplicationContext();
return applicationContext.getBean(beanName, clazz);
}
/**
* 清除applicationContext静态变量.
*/
public static void cleanApplicationContext() {
applicationContext = null;
}
private static void checkApplicationContext() {
if (applicationContext == null) {
throw new IllegalStateException(
"applicationContext未注入,请配置SpringContextHolder");
}
}
}
4.WebSocketUtil是用于发送消息的工具类,提供以阻塞 / 异步方式发送不同类型消息的能力。
/**
* @author xiongshiyan at 2018/10/11 , contact me with email yanshixiong@126.com or phone 15208384257
*/
public class WebSocketUtil {
/**
* 发送消息
*/
public static boolean sendMessage(Session session, String message) {
try {
session.getBasicRemote().sendText(message);
return true;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 异步发送消息
*/
public static boolean sendMessageAsync(Session session, String message) {
Future voidFuture = session.getAsyncRemote().sendText(message);
return voidFuture.isDone();
}
/**
* 发送字节消息
*/
public static boolean sendBytes(Session session, byte[] bytes) {
try {
session.getBasicRemote().sendBinary(ByteBuffer.wrap(bytes));
return true;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 异步发送字节
*/
public static boolean sendBytesAsync(Session session, byte[] bytes) {
Future voidFuture = session.getAsyncRemote().sendBinary(ByteBuffer.wrap(bytes));
return voidFuture.isDone();
}
/**
* 发送对象消息
*/
public static boolean sendObject(Session session, Object o) {
try {
session.getBasicRemote().sendObject(o);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 异步发送对象
*/
public static boolean sendObjectAsync(Session session, Object o) {
Future voidFuture = session.getAsyncRemote().sendObject(o);
return voidFuture.isDone();
}
}
5.WebSocket开发需要保持一个心跳,可以通过PING/PONG消息的方式。
至此,单机就可以实现WebSocket的管理和消息传送了。
一些测试方法:
/**
* @author xiongshiyan at 2018/10/10 , contact me with email yanshixiong@126.com or phone 15208384257
*/
@RestController
@RequestMapping("/websocket/test")
public class WebsocketTestController {
@Autowired
private WebSocketManager webSocketManager;
@GetMapping("/get/send")
public String sendGet(@RequestParam("token") String token ,
@RequestParam("message") String message) throws Exception{
webSocketManager.sendMessage(token , message);
return ("发送成功");
}
@PostMapping("post/send")
public String sendPost(@RequestParam("token") String token , @RequestBody String body) throws Exception{
webSocketManager.sendMessage(token , body);
return ("发送成功");
}
@PostMapping("broadcast")
public String broadcast(@RequestBody String body) throws Exception{
webSocketManager.broadcast(body);
return ("广播成功");
}
@GetMapping("clients")
public String getClientsNum(){
return ("目前在线:" + webSocketManager.size());
}
}
测试的网页,可以自动重连:
token1
Welcome token1
给出几个测试websocket的链接地址: