美文网首页
SpringBoot集成websocket(Spring方式)

SpringBoot集成websocket(Spring方式)

作者: 架构师与哈苏 | 来源:发表于2020-07-16 11:12 被阅读0次

SpringWebSocketConfig配置

package com.meeno.chemical.socket.task.config;

import com.meeno.chemical.socket.task.handler.TaskProgressWebSocketHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

/**
 * @description: Spring configuration that enables WebSocket support and registers
 *               the task-progress handler endpoint.
 * @author: Wzq
 * @create: 2020-07-16 10:50
 */
@Configuration
@EnableWebSocket
public class SpringWebSocketConfig implements WebSocketConfigurer {

    /** URL path on which the task-progress handler is exposed. */
    private static final String TASK_PROGRESS_PATH = "/ws/taskProgress";

    /**
     * Registers the task-progress handler on {@code /ws/taskProgress} and
     * allows connections from any origin.
     *
     * @param registry registry that collects the handler-to-path mappings
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        TaskProgressWebSocketHandler progressHandler = new TaskProgressWebSocketHandler();
        registry.addHandler(progressHandler, TASK_PROGRESS_PATH)
                .setAllowedOrigins("*");
    }
}

TaskProgressWebSocketHandler

package com.meeno.chemical.socket.task.handler;

import org.springframework.http.HttpHeaders;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.adapter.standard.StandardWebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * @description: 任务进度WebSocketHandler
 * @author: Wzq
 * @create: 2020-07-16 10:51
 */
public class TaskProgressWebSocketHandler extends TextWebSocketHandler {

    /**
     *  concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象。
     */
    public final static ConcurrentHashMap<String, WebSocketSession> webSocketMap = new ConcurrentHashMap<String, WebSocketSession>();

    /**
     * 处理消息
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        //在这里自定义消息处理...
        TextMessage textMessage = new TextMessage("${message body}");
        session.sendMessage(textMessage);
    }

    /**
     * 连接建立后
     * @param session
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        StandardWebSocketSession standardWebSocketSession = (StandardWebSocketSession) session;
        List<String> taskNameList = standardWebSocketSession.getNativeSession().getRequestParameterMap().get("taskName");
        String taskName = taskNameList.get(0);
        //保存所有会话
        webSocketMap.put(taskName,session);
    }

    /**
     * 连接关闭后
     * @param session
     * @param status
     * @throws Exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {

        if(!webSocketMap.isEmpty()){
            // remove session after closed
            WebSocketSession webSocketSession = webSocketMap.get(session.getId());
            if(webSocketSession != null){
                webSocketMap.remove(session.getId());
            }
        }
    }


    /**
     * 发送消息给所有人
     * @param content
     */
    public static void sendMessageAll(String content){
        webSocketMap.forEach((taskNameKey,session) ->{
            TextMessage textMessage = new TextMessage(content);
            try {
                session.sendMessage(textMessage);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

    /**
     * 发送消息给指定某个任务
     * @param taskName
     * @param content
     */
    public static void sendMessage(String taskName,String content){
        webSocketMap.forEach((taskNameKey,session) ->{
            if(taskName.equals(taskNameKey)){
                TextMessage textMessage = new TextMessage(content);
                try {
                    session.sendMessage(textMessage);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }

}



问题

启用 WebSocket 时会与定时任务冲突,
需要额外配置一个 TaskScheduler:
ScheduledConfig.java

package com.meeno.chemical.common.scheduling.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
 * @description: Scheduling configuration that declares the application's
 *               {@link TaskScheduler} bean.
 * @author: Wzq
 * @create: 2020-07-16 11:05
 */
@Configuration
public class ScheduledConfig {

    /** Number of threads in the scheduler's pool. */
    private static final int SCHEDULER_POOL_SIZE = 10;

    /**
     * Builds and initializes a thread-pool backed scheduler.
     *
     * @return an initialized {@link ThreadPoolTaskScheduler} with a pool size of 10
     */
    @Bean
    public TaskScheduler taskScheduler(){
        final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(SCHEDULER_POOL_SIZE);
        scheduler.initialize();
        return scheduler;
    }

}


相关文章

网友评论

      本文标题:SpringBoot集成websocket(Spring方式)

      本文链接:https://www.haomeiwen.com/subject/boqphktx.html