SpringBoot使用WebSocket实现服务端推送—单机实现(1)

[![star](https://gitee.com/xxssyyyyssxx/websocket-springboot-starter/badge/star.svg?theme=dark)](https://gitee.com/xxssyyyyssxx/websocket-springboot-starter/stargazers)

最近开发中需要实现服务端的推送,经过一段时间的资料查询最终锁定使用websocket来实现。JavaEE本身就支持WebSocket。我们只需要开发一个EndPoint来处理连接、消息等即可。但是WebSocket的session管理是开发中的重中之重和难点,因为你需要知道推送给谁,就需要保存代表其连接的Session。

1.首先设计管理WebSocket的session的接口WebSocketManager。

/**
 * 管理websocket的session,可以使用Map
 * @author xiongshiyan at 2018/10/10 , contact me with email yanshixiong@126.com or phone 15208384257
 */
public interface WebSocketManager {
    /**
     * 在容器中的名字
     */
    String WEBSOCKET_MANAGER_NAME  = "webSocketManager";
    /**
     * 根据标识获取websocket session
     * @param identifier 标识
     * @return WebSocket
     */
    WebSocket get(String identifier);

    /**
     * 放入一个 websocket session
     * @param identifier 标识
     * @param webSocket websocket
     */
    void put(String identifier , WebSocket webSocket);

    /**
     * 删除
     * @param identifier 标识
     */
    void remove(String identifier);

    /**
     * 获取当前机器上的保存的WebSocket
     * @return WebSocket Map
     */
    Map localWebSocketMap();

    /**
     * 统计所有在线人数
     * @return 所有在线人数
     */
    default int size(){
        return localWebSocketMap().size();
    }

    /**
     * 给某人发送消息
     * @param identifier 标识
     * @param message 消息
     */
    void sendMessage(String identifier, String message);

    /**
     * 广播
     * @param message 消息
     */
    void broadcast(String message);

    /**
     * 修改当前的状态
     * @param identifier 标识
     * @param status 状态
     */
    void changeStatus(String identifier , int status);
}

其中,identifier是一个人的标识,发送消息就以此为根据。本类定义了本地管理WebSocket、发送消息的一些方法,WebSocket是一个实体类,保存有标识和用于会话的session。

/**
 * @author xiongshiyan at 2018/10/10 , contact me with email yanshixiong@126.com or phone 15208384257
 */
public class WebSocket implements Serializable{
    public static final int STATUS_AVAILABLE       = 0;
    public static final int STATUS_UNAVAILABLE     = 1;
    private String identifier;
    private Session session;
    private int status;
    ...
}

如果是单机管理websocket,可以使用一个Map来管理session。实现类如下:

/**
 * @author xiongshiyan at 2018/10/10 , contact me with email yanshixiong@126.com or phone 15208384257
 */
public class MemWebSocketManager implements WebSocketManager {
    /**
     * 因为全局只有一个 WebSocketManager ,所以才敢定义为非static
     */
    private final Map connections = new ConcurrentHashMap<>(100);

    @Override
    public WebSocket get(String identifier) {
        return connections.get(identifier);
    }

    @Override
    public void put(String identifier, WebSocket webSocket) {
        connections.put(identifier , webSocket);
    }

    @Override
    public void remove(String identifier) {
        connections.remove(identifier);
    }


    @Override
    public Map localWebSocketMap() {
        return connections;
    }

    @Override
    public void sendMessage(String identifier, String message) {
        WebSocket webSocket = get(identifier);
        if(null == webSocket){throw new RuntimeException("identifier 不存在");}

        if(WebSocket.STATUS_AVAILABLE != webSocket.getStatus()){
            return;
        }

        WebSocketUtil.sendMessage(webSocket.getSession() , message);
    }

    @Override
    public void broadcast(String message) {
        localWebSocketMap().values().forEach(
                webSocket -> WebSocketUtil.sendMessage(
                        webSocket.getSession() , message));
    }

    /**
     * 修改当前的状态
     * @param identifier 标识
     * @param status 状态
     */
    @Override
    public void changeStatus(String identifier , int status) {
        WebSocket socket = get(identifier);
        if(null == socket){return;}

        socket.setStatus(status);
    }
}

将此类注入到容器中。

@Bean(WebSocketManager.WEBSOCKET_MANAGER_NAME)
public WebSocketManager webSocketManager(){
    return new MemWebSocketManager();
}

然后开发一个Endpoint,实现websocket的连接。

/**
 * NOTE: Nginx反向代理要支持WebSocket,需要配置几个header,否则连接的时候就报404
       proxy_http_version 1.1;
       proxy_set_header Upgrade $http_upgrade;
       proxy_set_header Connection "upgrade";
       proxy_read_timeout 3600s; //这个时间不长的话就容易断开连接
 * @author xiongshiyan at 2018/10/10 , contact me with email yanshixiong@126.com or phone 15208384257
 */
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@ServerEndpoint(value ="/websocket/connect/{identifier}")
public class WebSocketEndpoint {
    private static final int CODE_BEATHEART  = 8;
    /**
     * 路径标识:目前使用token来代表
     */
    private static final String IDENTIFIER = "identifier";
    private static final Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);

    /// 无法通过这种方式注入组件
    /*@Autowired
    private WebSocketManager websocketManager;*/

    public WebSocketEndpoint() {
    }

    @OnOpen
    public void onOpen(Session session, @PathParam(IDENTIFIER) String identifier) {
        try {
            logger.info("*** WebSocket opened from sessionId " + session.getId() + " , identifier = " + identifier);
            if(StrUtil.isBlank(identifier)){
                return;
            }
            WebSocket socket = new WebSocket();
            socket.setIdentifier(identifier);
            socket.setSession(session);
            socket.setStatus(WebSocket.STATUS_AVAILABLE);
            //像刷新这种,id一样,session不一样,后面的覆盖前面的

            WebSocketManager websocketManager = getWebSocketManager();

            websocketManager.put(identifier , socket);

        } catch (Exception e) {
            logger.error(e.getMessage() , e);
        }
    }

    @OnClose
    public void onClose(Session session , @PathParam(IDENTIFIER) String identifier) {
        logger.info("*** WebSocket closed from sessionId " + session.getId() + " , identifier = " + identifier);
        getWebSocketManager().remove(identifier);
    }

    @OnMessage
    public void onMessage(String message, Session session , @PathParam(IDENTIFIER) String identifier) {
        logger.info("接收到的数据为:" + message + " from sessionId " + session.getId() + " , identifier = " + identifier);
        //说明不是json,处理不了
        if (!JsonUtil.isJsonObject(message)) {return;}

        JSONObject object = new JSONObject(message);
        boolean containsKey = object.containsKey("code");

        if(!containsKey){
            return;
        }

        Integer code = object.getInteger("code");
        if(CODE_BEATHEART == code){
            Map map = new HashMap<>(1);
            map.put("timestamp" , System.currentTimeMillis());
            Message msg = new Message(CODE_BEATHEART , map);
            WebSocketUtil.sendMessage(session , JsonUtil.serializeJavaBean(msg));
        }

    }

    @OnError
    public void onError(Throwable t , @PathParam(IDENTIFIER) String identifier){
        logger.info("发生异常:, identifier = " + identifier);
        logger.error(t.getMessage() , t);
        getWebSocketManager().remove(identifier);
    }

    private WebSocketManager getWebSocketManager() {
        return SpringContextHolder.getBean(WebSocketManager.WEBSOCKET_MANAGER_NAME , WebSocketManager.class);
    }
}

NOTE:

0. 网页端使用连接 ws://xxxxxxx/websocket/connect/{token}即可。

1.如果是Nginx反向代理的,需要在Nginx代理中设置几个header。如果是SLB,需要业务保障型。

2.如果是独立的Tomcat应用,它就会自动扫描标注了@ServerEndPoint的类,如果是内置容器,就不会扫描,需要注入一个ServerEndpointExporter。目前还不知道为什么,所以针对这两种情况,我们需要不同的配置,可以使用 ConditionalOnProperty 轻松应对。

/**
 * webSocket配置
 * @author xiongshiyan
 */
@Configuration
public class WebSocketConfig {
    /**
     * @see https://www.cnblogs.com/betterboyz/p/8669879.html
     * 首先要注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。
     * 要注意,如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,
     * 因为它将由容器自己提供和管理, 否则就会报重复的endpoint错误。
     */
    @ConditionalOnProperty(prefix = "server.websocket.exporter" ,
                                name = "enable" ,havingValue = "true")
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3.ServerEndpoint不会成为Spring的一个组件,所以使用@Autowired是不会生效的,所以需要一个全局获取容器的方法。利用ApplicationContextAware可以注入容器,使用一个静态变量保存起来。提供get方法。

/**
 * 容器全局存留
 * @author xiongshiyan at 2018/8/14 , contact me with email yanshixiong@126.com or phone 15208384257
 */
public class SpringContextHolder implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    /**
     * 实现ApplicationContextAware接口的context注入函数, 将其存入静态变量.
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        SpringContextHolder.applicationContext = applicationContext;
    }

    /**
     * 取得存储在静态变量中的ApplicationContext.
     */
    public static ApplicationContext getApplicationContext() {
        checkApplicationContext();
        return applicationContext;
    }

    @SuppressWarnings("unchecked")
    public static Object getBean(String beanName) {
        checkApplicationContext();
        return applicationContext.getBean(beanName);
    }

    /**
     * 从静态变量ApplicationContext中取得Bean, 自动转型为所赋值对象的类型.
     */
    public static  T getBean(String beanName, Class clazz) {
        checkApplicationContext();
        return applicationContext.getBean(beanName, clazz);
    }

    /**
     * 清除applicationContext静态变量.
     */
    public static void cleanApplicationContext() {
        applicationContext = null;
    }

    private static void checkApplicationContext() {
        if (applicationContext == null) {
            throw new IllegalStateException(
                    "applicationContext未注入,请配置SpringContextHolder");
        }
    }
}

4.WebSocketUtil是用于发送消息的工具类,提供以阻塞 / 异步方式发送不同类型消息的能力。

/**
 * @author xiongshiyan at 2018/10/11 , contact me with email yanshixiong@126.com or phone 15208384257
 */
public class WebSocketUtil {
    /**
     * 发送消息
     */
    public static boolean sendMessage(Session session, String message) {
        try {
            session.getBasicRemote().sendText(message);
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 异步发送消息
     */
    public static boolean sendMessageAsync(Session session, String message) {
        Future voidFuture = session.getAsyncRemote().sendText(message);
        return voidFuture.isDone();
    }

    /**
     * 发送字节消息
     */
    public static boolean sendBytes(Session session, byte[] bytes) {
        try {
            session.getBasicRemote().sendBinary(ByteBuffer.wrap(bytes));
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 异步发送字节
     */
    public static boolean sendBytesAsync(Session session, byte[] bytes) {
        Future voidFuture = session.getAsyncRemote().sendBinary(ByteBuffer.wrap(bytes));
        return voidFuture.isDone();
    }
    /**
     * 发送对象消息
     */
    public static boolean sendObject(Session session, Object o) {
        try {
            session.getBasicRemote().sendObject(o);
            return true;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 异步发送对象
     */
    public static boolean sendObjectAsync(Session session, Object o) {
        Future voidFuture = session.getAsyncRemote().sendObject(o);
        return voidFuture.isDone();
    }
}

5.WebSocket开发需要保持一个心跳,可以通过PING/PONG消息的方式。

至此,单机就可以实现WebSocket的管理和消息传送了。

一些测试方法:

/**
 * @author xiongshiyan at 2018/10/10 , contact me with email yanshixiong@126.com or phone 15208384257
 */
@RestController
@RequestMapping("/websocket/test")
public class WebsocketTestController {
    @Autowired
    private WebSocketManager webSocketManager;

    @GetMapping("/get/send")
    public String sendGet(@RequestParam("token") String token ,
                       @RequestParam("message") String message) throws Exception{
        webSocketManager.sendMessage(token , message);
        return ("发送成功");
    }
    @PostMapping("post/send")
    public String sendPost(@RequestParam("token") String token , @RequestBody String body) throws Exception{
        webSocketManager.sendMessage(token , body);
        return ("发送成功");
    }
    @PostMapping("broadcast")
    public String broadcast(@RequestBody String body) throws Exception{
        webSocketManager.broadcast(body);
        return ("广播成功");
    }

    @GetMapping("clients")
    public String getClientsNum(){
        return ("目前在线:" + webSocketManager.size());
    }
}

测试的网页,可以自动重连:




    token1



Welcome token1

给出几个测试websocket的链接地址:

http://www.blue-zero.com/WebSocket/

http://coolaf.com/tool/chattest