期望达成目标:
1.消息稳定可靠
2.支持点对点消息
3.支持一对多消息
4.支持消息广播
5.支持节点扩容
6.支持服务注册发现
针对目标的思考:
1.消息稳定可靠方面:
采用netty为网络框架,实现websocket协议(长连接),如需要持久化消息,可将消息写入数据库,接收端进行消息确认。
2.点对点消息的支持
通过给 channel 绑定身份标识,消息体指定消息类型
3.支持一对多消息
通过拿到用户所属的组 ,将channel 加入ChannelGroup
4.支持广播消息
单机模式,通过 channel 的 map,获取所有的 channel 进行广播
多节点模式,先将消息发送到支持发布订阅的消息中间件,每个服务节点收到中间件的广播后,再将消息发送到当前节点的所有channel
5.节点扩容
采用消息中间件的发布订阅模式,将收到的消息先投递到消息中间件,服务节点通过消费中间件消息,进行当前节点消息的转发
6.支持服务注册发现
采用 springcloud 进行服务的注册与发现(即通过springcloud 服务注册相关的实现,如 Nacos,Eureka 等)
具体实现:
1.基于消息code进行业务处理的事件机制,服务启动时会从spring上下文中拿到 CmdProcess 实现,通过 code 进行 事件的分发
/**
* Handler for messages that carry one particular command code.
*/
public interface CmdProcess {
/**
* Handles a received message.
* @param message message body
* @param channel channel associated with the message (the original comment calls it the "context")
* @return response message body, or null when there is no response
*/
Message handler(Message message, Channel channel);
/**
* Returns the command code handled by this processor (it must match the code carried by the message packet).
* @return command code
*/
Byte getCmdCode();
}
image.gif
2.连接的安全认证机制(第一次握手时执行的事件),框架提供了简单的默认实现,也可以根据具体业务定义自己的认证实现
/**
* Authentication extension point; subclasses supply the actual login check.
*/
public abstract class AuthProcess {
/**
* Login event.
* @param username user name
* @param password user password
* @return IM user (session) object
*/
public abstract ImSession login(String username, String password);
}
image.gif
3.channel 连接生命周期中执行的事件(客户端连接成功建立,客户端连接断开)
/**
* Callbacks for binding and cleaning up the context of a channel.
*/
public interface LifeCycleEvent {
/**
* Binds the channel context to a session.
* @param login session to bind (presumably the one produced by authentication; confirm against the caller)
* @param ctx channel handler context of the connection
*/
void bindContext(ImSession login, ChannelHandlerContext ctx);
/**
* Unbinds (cleans up) the context of a channel.
* @param channel channel whose context is removed
*/
void cleanContext(Channel channel);
}
image.gif
4.多节点部署消息分发接口的预留(通过参数设置,是否采用多节点模式)
/**
* Publish/subscribe abstraction used to distribute messages between nodes.
*/
public interface ImClusterTopic {
/**
* Publishes a message.
* @param message cluster message to publish
*/
void publish(ClusterMessage message);
/**
* Subscribes to messages (broadcast mode).
*/
void consumer();
}
image.gif
5.消息分发的工具类
package com.awy.common.ws.netty.toolkit;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.json.JSONUtil;
import com.awy.common.message.api.packets.Message;
import com.awy.common.ws.netty.cluster.ImClusterTopic;
import com.awy.common.ws.netty.context.GlobalContent;
import com.awy.common.ws.netty.context.SessionContext;
import com.awy.common.ws.netty.config.ImConfig;
import com.awy.common.ws.netty.model.ClusterMessage;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
 * Static helpers for delivering a {@link Message} to users, groups, or every connection.
 *
 * <p>The public {@code send*} entry points publish to the cluster topic when cluster mode is
 * enabled and otherwise deliver to the channels registered on the current node. The
 * {@code sendCurrentNode*} methods always deliver locally.
 *
 * <p>Local delivery of a {@code null} message is logged and skipped instead of handing
 * {@code null} to {@code writeAndFlush}.
 */
@Slf4j
public class ImSendUtil {

    /**
     * Sends a message to one user.
     *
     * @param userId  user id
     * @param message message body
     */
    public static void sendUser(String userId, Message message) {
        if (isCluster()) {
            getImClusterTopic().publish(ClusterMessage.chatMessage(userId, message));
        } else {
            sendCurrentNodeUser(userId, message);
        }
    }

    /**
     * Sends a message to a list of users.
     *
     * @param userIds user id list
     * @param message message body
     */
    public static void sendUsers(List<String> userIds, Message message) {
        if (isCluster()) {
            getImClusterTopic().publish(ClusterMessage.chatsMessage(userIds, message));
        } else {
            sendCurrentNodeUsers(userIds, message);
        }
    }

    /**
     * Sends a message to one group.
     *
     * @param groupId group id
     * @param message message body
     */
    public static void sendGroup(String groupId, Message message) {
        if (isCluster()) {
            getImClusterTopic().publish(ClusterMessage.groupMessage(groupId, message));
        } else {
            sendCurrentNodeGroup(groupId, message);
        }
    }

    /**
     * Sends a message to a list of groups.
     *
     * @param groupIds group id list
     * @param message  message body
     */
    public static void sendGroups(List<String> groupIds, Message message) {
        if (isCluster()) {
            getImClusterTopic().publish(ClusterMessage.groupsMessage(groupIds, message));
        } else {
            sendCurrentNodeGroups(groupIds, message);
        }
    }

    /**
     * Sends a message to all users.
     *
     * @param message message body
     */
    public static void sendAll(Message message) {
        if (isCluster()) {
            // cluster: publish to the topic
            getImClusterTopic().publish(ClusterMessage.noStateMessage(message));
        } else {
            // standalone: write to every channel of this node
            sendCurrentNodeAllChannel(message);
        }
    }

    /**
     * Sends a message to one user connected to the current node.
     * Nothing is written when the user has no channel here or the message is {@code null}.
     *
     * @param userId  user id
     * @param message message body
     */
    public static void sendCurrentNodeUser(String userId, Message message) {
        Channel channel = SessionContext.getChannel(userId);
        if (channel == null) {
            return;
        }
        TextWebSocketFrame frame = getMessage(message);
        if (frame != null) {
            channel.writeAndFlush(frame);
        }
    }

    /**
     * Sends a message to the listed users that are connected to the current node.
     *
     * @param userIds user id list
     * @param message message body
     */
    public static void sendCurrentNodeUsers(List<String> userIds, Message message) {
        if (CollUtil.isNotEmpty(userIds)) {
            // Serialize once; every channel still gets its own frame instance.
            String json = toJson(message);
            if (json == null) {
                return;
            }
            for (String userId : userIds) {
                Channel channel = SessionContext.getChannel(userId);
                if (channel != null) {
                    channel.writeAndFlush(new TextWebSocketFrame(json));
                }
            }
        }
    }

    /**
     * Sends a message to one group on the current node.
     * Nothing is written when the group is unknown here or the message is {@code null}.
     *
     * @param groupId group id
     * @param message message body
     */
    public static void sendCurrentNodeGroup(String groupId, Message message) {
        ChannelGroup channelGroup = SessionContext.getChannelGroup(groupId);
        if (channelGroup == null) {
            return;
        }
        TextWebSocketFrame frame = getMessage(message);
        if (frame != null) {
            channelGroup.writeAndFlush(frame);
        }
    }

    /**
     * Sends a message to the listed groups on the current node.
     *
     * @param groupIds group id list
     * @param message  message body
     */
    public static void sendCurrentNodeGroups(List<String> groupIds, Message message) {
        if (CollUtil.isNotEmpty(groupIds)) {
            // Serialize once; a fresh frame is built per group write.
            String json = toJson(message);
            if (json == null) {
                return;
            }
            for (String groupId : groupIds) {
                ChannelGroup channelGroup = SessionContext.getChannelGroup(groupId);
                if (channelGroup != null) {
                    channelGroup.writeAndFlush(new TextWebSocketFrame(json));
                }
            }
        }
    }

    /**
     * Sends a message to every channel registered on the current node.
     *
     * @param message message body
     */
    public static void sendCurrentNodeAllChannel(Message message) {
        // Serialize once instead of once per channel.
        String json = toJson(message);
        if (json == null) {
            return;
        }
        for (Map.Entry<String, Channel> entry : SessionContext.getAllChannel().entrySet()) {
            entry.getValue().writeAndFlush(new TextWebSocketFrame(json));
        }
    }

    /**
     * Tells whether cluster (multi-node) mode is enabled in the IM configuration.
     *
     * @return {@code true} in cluster mode
     */
    private static boolean isCluster() {
        return ImConfig.getImConfig().getPropertiesConfig().isCluster();
    }

    /**
     * Returns the cluster topic held by the global context.
     *
     * @return cluster topic
     */
    private static ImClusterTopic getImClusterTopic() {
        return GlobalContent.getInstance().getImClusterTopic();
    }

    /**
     * Converts a message into a WebSocket text frame carrying its JSON form.
     *
     * @param message message body
     * @return WebSocket text frame, or {@code null} (after logging an error) when
     *         {@code message} is {@code null}
     */
    public static TextWebSocketFrame getMessage(Message message) {
        String json = toJson(message);
        return json == null ? null : new TextWebSocketFrame(json);
    }

    /**
     * Serializes a message to JSON.
     *
     * @param message message body
     * @return JSON text, or {@code null} (after logging an error) when {@code message} is {@code null}
     */
    private static String toJson(Message message) {
        if (message == null) {
            log.error(">>>>>>>>>>>> message can not be empty ");
            return null;
        }
        return JSONUtil.toJsonStr(message);
    }
}
image.gif
6.webSocket服务器的实现
/**
 * Netty-based WebSocket server: wires the global context, builds the bootstrap and
 * binds/unbinds the listening port.
 */
public class WebSocketServer {
    /**
     * Whether SSL is enabled.
     */
    private boolean ssl = false;
    // listening port
    private int port;
    // WebSocket path prefix
    private String websocketPath;
    private ServerBootstrap serverBootstrap;
    private NioEventLoopGroup boss;
    private NioEventLoopGroup work;

    private WebSocketServer() {}

    /**
     * Creates a server and registers its collaborators in the global context.
     *
     * @param port            listening port
     * @param websocketPath   WebSocket path prefix
     * @param ssl             whether to enable SSL (a self-signed certificate is generated)
     * @param authProcess     authentication handler; {@code SimpleAuthProcess} is used when {@code null}
     * @param lifeCycleEvent  life-cycle callbacks; {@code SimpleLifeCycleEvent} is used when {@code null}
     * @param imClusterTopic  cluster topic, stored as given (may be {@code null})
     * @throws IllegalStateException if SSL is enabled but the SSL context cannot be created
     */
    public WebSocketServer(int port, String websocketPath, boolean ssl, AuthProcess authProcess, LifeCycleEvent lifeCycleEvent, ImClusterTopic imClusterTopic) {
        this.ssl = ssl;
        this.port = port;
        this.websocketPath = websocketPath;
        // populate the global context
        if (authProcess == null) {
            authProcess = new SimpleAuthProcess();
        }
        if (lifeCycleEvent == null) {
            lifeCycleEvent = new SimpleLifeCycleEvent();
        }
        GlobalContent globalContent = GlobalContent.getInstance();
        globalContent.setAuthProcess(authProcess);
        globalContent.setImClusterTopic(imClusterTopic);
        globalContent.setLifeCycleEvent(lifeCycleEvent);
        // Build the SSL context before the event loop groups exist, so a failure leaves nothing to release.
        SslContext sslContext = getSslContext();
        // boss/worker (main/sub Reactor) model
        serverBootstrap = new ServerBootstrap();
        boss = new NioEventLoopGroup(1);
        work = new NioEventLoopGroup();
        serverBootstrap.group(boss, work)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WebSocketServerInitializer(sslContext, websocketPath));
    }

    /**
     * Builds the SSL context from a freshly generated self-signed certificate.
     *
     * @return the SSL context, or {@code null} when SSL is disabled
     * @throws IllegalStateException if SSL is enabled but the context cannot be created;
     *         falling back to {@code null} would silently serve plaintext
     */
    private SslContext getSslContext() {
        if (!this.ssl) {
            return null;
        }
        try {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            return SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } catch (Exception e) {
            throw new IllegalStateException("failed to create SSL context for the websocket server", e);
        }
    }

    /**
     * Binds the listening port and waits until the bind completes.
     *
     * @throws IllegalStateException if the bind fails or the thread is interrupted while
     *         waiting; the event loop groups are shut down before the exception is thrown
     */
    public void start() {
        try {
            serverBootstrap.bind(port).sync();
            log.info("Open your web browser and navigate to {}://127.0.0.1:{}{}",
                    ssl ? "https" : "http", port, websocketPath);
        } catch (InterruptedException e) {
            // restore the interrupt flag for the caller
            Thread.currentThread().interrupt();
            stop();
            throw new IllegalStateException("interrupted while binding websocket server port " + port, e);
        } catch (Exception e) {
            stop();
            throw new IllegalStateException("failed to bind websocket server port " + port, e);
        }
    }

    /**
     * Gracefully shuts down both event loop groups (no-op for groups that were never created).
     */
    public void stop() {
        if (boss != null) {
            boss.shutdownGracefully();
        }
        if (work != null) {
            work.shutdownGracefully();
        }
    }
}
image.gif
7.spring引导类
/**
 * Spring-managed bootstrapper: reads the IM configuration, registers command processors
 * and message packets, starts the {@link WebSocketServer} and optionally triggers
 * service registration.
 *
 * <p>Fatal start-up problems are reported with {@link IllegalStateException} rather than
 * {@code System.exit(0)}, so the failure is visible to the container and no success exit
 * status is reported.
 */
public class NettyWebSocketStarter {
    /** Port the free-port search starts from when none is configured. */
    private static final int DEFAULT_PORT = 8888;
    /** Highest valid TCP port; upper bound of the free-port search. */
    private static final int MAX_PORT = 65535;

    private Integer port;
    private String websocketPath;
    /**
     * Whether SSL is enabled.
     */
    private boolean ssl = false;
    private WebSocketServer server;
    /**
     * Authentication handler.
     */
    private AuthProcess authProcess;
    private LifeCycleEvent lifeCycleEvent;
    private ImClusterTopic imClusterTopic;

    private NettyWebSocketStarter() {}

    /**
     * Creates a starter with only an authentication handler.
     *
     * @param authProcess authentication handler
     */
    public NettyWebSocketStarter(AuthProcess authProcess) {
        this(authProcess, null, null);
    }

    /**
     * Creates a starter.
     *
     * @param authProcess    authentication handler
     * @param lifeCycleEvent life-cycle callbacks
     * @param imClusterTopic cluster topic; must be non-null when cluster mode is configured
     */
    public NettyWebSocketStarter(AuthProcess authProcess, LifeCycleEvent lifeCycleEvent, ImClusterTopic imClusterTopic) {
        this.authProcess = authProcess;
        this.lifeCycleEvent = lifeCycleEvent;
        this.imClusterTopic = imClusterTopic;
    }

    /**
     * Initializes and starts the server after dependency injection.
     *
     * @throws IllegalStateException if the configuration is invalid or a start-up step fails
     */
    @PostConstruct
    public void init() {
        ImPropertiesConfig propertiesConfig = ImConfig.getImConfig().getPropertiesConfig();
        setAttributes(propertiesConfig);
        initCmdProcess();
        server = new WebSocketServer(port, websocketPath, ssl, authProcess, lifeCycleEvent, imClusterTopic);
        server.start();
        registerDiscovery(propertiesConfig);
    }

    /**
     * Validates the configuration, copies it into this starter and loads the message packets.
     *
     * @param propertiesConfig IM configuration (prefix {@code im.ws})
     * @throws IllegalStateException if the configuration is missing or inconsistent
     */
    private void setAttributes(ImPropertiesConfig propertiesConfig) {
        if (propertiesConfig == null) {
            throw new IllegalStateException("im config attributes can not be empty (config prefix: im.ws)");
        }
        if (propertiesConfig.isCluster() && this.imClusterTopic == null) {
            throw new IllegalStateException("not allowed when Cluster model imClusterTopic is null");
        }
        this.port = propertiesConfig.getPort();
        if (this.port == null) {
            // no port configured: pick the first free one
            this.port = getPort();
        }
        this.websocketPath = propertiesConfig.getWebsocketPath();
        if (this.websocketPath == null || this.websocketPath.isEmpty()) {
            this.websocketPath = ImCommonConstant.DEFAULT_WEBSOCKET_PATH;
        }
        this.ssl = propertiesConfig.isSsl();
        String packetScanPath = propertiesConfig.getPacketScan();
        if (packetScanPath == null || packetScanPath.isEmpty()) {
            throw new IllegalStateException("im.ws.packetScan can not be empty");
        }
        initPacket(packetScanPath);
    }

    /**
     * Starts every auto service registration bean when registration is enabled.
     *
     * @param propertiesConfig IM configuration
     */
    private void registerDiscovery(ImPropertiesConfig propertiesConfig) {
        if (propertiesConfig.isRegister()) {
            Map<String, AbstractAutoServiceRegistration> serviceRegistrationMap = getApplicationContext().getBeansOfType(AbstractAutoServiceRegistration.class);
            for (Map.Entry<String, AbstractAutoServiceRegistration> registrationEntry : serviceRegistrationMap.entrySet()) {
                registrationEntry.getValue().start();
            }
        }
    }

    /**
     * Finds a free port, searching upwards from the default port.
     *
     * @return a port that could be bound at the time of the check
     */
    private int getPort() {
        return getPort(DEFAULT_PORT);
    }

    /**
     * Finds the first port at or above {@code port} on which a {@link ServerSocket} can be opened.
     * The probe socket is closed again, so the port is only known to be free at check time.
     *
     * @param port first port to try
     * @return the first port that could be bound
     * @throws IllegalStateException if no port up to 65535 can be bound
     */
    private int getPort(int port) {
        for (int candidate = port; candidate <= MAX_PORT; candidate++) {
            try (ServerSocket ignored = new ServerSocket(candidate)) {
                return candidate;
            } catch (IOException e) {
                log.debug("port {} is not available, trying the next one", candidate);
            }
        }
        throw new IllegalStateException("no free port found in range [" + port + ", " + MAX_PORT + "]");
    }

    /**
     * Collects all {@link CmdProcess} beans from the application context and registers them
     * with the process manager.
     *
     * @throws IllegalStateException if the lookup or registration fails
     */
    private void initCmdProcess() {
        try {
            // Type-based lookup: only CmdProcess beans are resolved, not every bean definition.
            List<CmdProcess> list = new ArrayList<>(getApplicationContext().getBeansOfType(CmdProcess.class).values());
            ProcessManager.getInstance().addCmdProcessList(list);
            log.info("init im process repository success ! count [" + list.size() + "]");
        } catch (Exception e) {
            throw new IllegalStateException("init im process repository error", e);
        }
    }

    private ApplicationContext getApplicationContext() {
        return ImConfig.getImConfig().getApplicationContext();
    }

    /**
     * Scans a package for concrete {@link Message} classes, instantiates each one and
     * registers the instances with the message manager.
     *
     * @param packetScanPath package to scan
     * @throws IllegalStateException if scanning, instantiation or registration fails
     */
    private void initPacket(String packetScanPath) {
        try {
            Set<Class<?>> classes = ClassUtil.scanPackage(packetScanPath);
            List<Message> list = new ArrayList<>();
            if (CollectionUtil.isNotEmpty(classes)) {
                for (Class<?> clazz : classes) {
                    // Check the type first: unrelated, abstract or interface types are never instantiated.
                    if (isConcreteMessage(clazz)) {
                        list.add((Message) ReflectUtil.newInstance(clazz));
                    }
                }
            }
            MessageManager.getInstance().addMessages(list);
            log.info("init IM packet repository success ! count [" + list.size() + "]");
        } catch (Exception e) {
            throw new IllegalStateException("init packet repository error", e);
        }
    }

    /**
     * Tells whether a class is an instantiable {@link Message} type.
     *
     * @param clazz class to inspect
     * @return {@code true} if the class is assignable to {@code Message} and is not abstract
     *         (interfaces count as abstract)
     */
    private static boolean isConcreteMessage(Class<?> clazz) {
        return Message.class.isAssignableFrom(clazz)
                && !java.lang.reflect.Modifier.isAbstract(clazz.getModifiers());
    }

    /**
     * Stops the server on bean destruction; does nothing if the server was never created.
     */
    @PreDestroy
    public void stop() {
        if (server != null) {
            server.stop();
        }
    }
}
image.gif
说明:当前实现需要依赖spring,有好的建议欢迎大家提出,指正,最后贴出代码地址
github地址: https://github.com/awyFamily/awy-common-all/tree/master/common-ws-netty
网友评论