美文网首页
mina整合spring ,服务端反向发送消息到客户端 完整实例

mina整合spring ,服务端反向发送消息到客户端 完整实例

作者: 安易学车 | 来源:发表于2018-03-13 09:49 被阅读0次

之前的项目需要用到mina,实现的功能主要是:服务端主动发送消息到客户端,这个的服务端为外网的tomcat,客户端为内网的tomcat,由于无法知道内网tomcat 的地址,也就不能直接通过http的方式发送信息回来,最后想来想去用mina实现了这个功能。

当然,我这里的服务端是整合的了spring 的,也可以直接把服务端独立出来,不整合spring,这个都一样,区别不大。

代码和配置如下:

---------------------------

1,jar包,我这里使用的是spring4.0.5,mina2.0.7

maven部分文件如下,这个包会自动也依赖进来mina-filter-ssl-1.1.7.jar

[java] view plain copy

[java] view plain copy

                  

            org.springframework  

            spring-context-support  

            ${springframework-version}  

          

          

            org.springframework  

            spring-jdbc  

            ${springframework-version}  

          

          

            org.springframework  

            spring-orm  

            ${springframework-version}  

          

          

            org.springframework  

            spring-aop  

            ${springframework-version}  

          

          

          

            org.springframework  

            spring-web  

            ${springframework-version}  

              

                  

                    commons-logging  

                    commons-logging  

          

            org.springframework  

            spring-webmvc  

            ${springframework-version}  

          

            org.aspectj  

            aspectjrt  

            ${aspectj-version}  

          

            org.aspectj  

            aspectjweaver  

            ${aspectj-version}  

          

            org.apache.mina  

            mina-core  

2.0.7  

          

            org.apache.mina  

            mina-integration-spring  

1.1.7  

          

            org.apache.mina  

            mina-integration-beans  

2.0.8  

2,spring-ztc_app-mina.xml (spring与mina 的配置文件)

spring与mina 的配置文件,需要导入到spring 的总配置文件中,或者加入到web.xml的spring监听扫描中

[java] view plain copy

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  

xsi:schemaLocation="http://www.springframework.org/schema/beans   

http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">  

  

  

              

  

  

                -->  

value="org.apache.mina.integration.beans.InetSocketAddressEditor">  

  

init-method="bind" destroy-method="unbind">  

  

  

  

  

          

                /> -->  

  

  

class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder">  

  

              

  

  

3,mina服务端业务处理类

[java] view plain copy

package cn.hydom.ztc.ztc_app.controller.mina;  

import org.apache.commons.logging.Log;  

import org.apache.commons.logging.LogFactory;  

import org.apache.mina.core.service.IoHandlerAdapter;  

import org.apache.mina.core.session.IdleStatus;  

import org.apache.mina.core.session.IoSession;  

/**

 * @Description: mina服务端业务处理类

 * @author whl

 * @date 2014-9-30 下午12:36:28

 *

 */  

public class ServerHandler extends IoHandlerAdapter {  

private final static Log log = LogFactory.getLog(ServerHandler.class);  

public ServerHandler() {  

// TODO Auto-generated constructor stub  

     }  

@Override  

public void exceptionCaught(IoSession session, Throwable cause)  

throws Exception {  

     }  

@Override  

public void messageReceived(IoSession session, Object message)  

throws Exception {  

log.debug("服务端收到信息-------------");  

//获取客户端发过来的key  

          String key = message.toString();  

System.out.println("message :"+message.toString());  

String carPark_id = key.substring(key.indexOf("=") + 1);  

System.out.println("carPark_id :"+carPark_id);  

//保存客户端的会话session  

          SessionMap sessionMap = SessionMap.newInstance();  

          sessionMap.addSession(carPark_id, session);  

     }  

@Override  

public void messageSent(IoSession session, Object message) throws Exception {  

log.debug("------------服务端发消息到客户端---");  

     }  

@Override  

public void sessionClosed(IoSession session) throws Exception {  

// TODO Auto-generated method stub  

log.debug("远程session关闭了一个..." + session.getRemoteAddress().toString());  

     }  

@Override  

public void sessionCreated(IoSession session) throws Exception {  

log.debug(session.getRemoteAddress().toString() +"----------------------create");  

     }  

@Override  

public void sessionIdle(IoSession session, IdleStatus status)  

throws Exception {  

log.debug(session.getServiceAddress() +"IDS");  

     }  

@Override  

public void sessionOpened(IoSession session) throws Exception {  

log.debug("连接打开:"+session.getLocalAddress());  

     }  

    }  

4,服务端缓存客户端socket连接的单例类

[html] view plain copy

package cn.hydom.ztc.ztc_app.controller.mina;  

import java.util.HashMap;  

import java.util.Map;  

import org.apache.commons.logging.Log;  

import org.apache.commons.logging.LogFactory;  

import org.apache.mina.core.session.IoSession;  

/**  

 * @Description: 单例工具类,保存所有mina客户端连接  

 * @author whl  

 * @date 2014-9-29 上午10:09:15  

 *  

 */  

public class SessionMap {  

private final static Loglog = LogFactory.getLog(SessionMap.class);  

private static SessionMapsessionMap = null;  

private Mapmap = new HashMap();  

    //构造私有化 单例  

    private SessionMap(){}  

    /**  

     * @Description: 获取唯一实例  

     * @author whl  

     * @date 2014-9-29 下午1:29:33  

     */  

    public static SessionMap newInstance(){  

        log.debug("SessionMap单例获取---");  

if(sessionMap == null){  

sessionMap = new SessionMap();  

        }  

        return sessionMap;  

    }  

    /**  

     * @Description: 保存session会话  

     * @author whl  

     * @date 2014-9-29 下午1:31:05  

     */  

    public void addSession(String key, IoSession session){  

log.debug("保存会话到SessionMap单例---key=" + key);  

        this.map.put(key, session);  

    }  

    /**  

     * @Description: 根据key查找缓存的session  

     * @author whl  

     * @date 2014-9-29 下午1:31:55  

     */  

    public IoSession getSession(String key){  

log.debug("获取会话从SessionMap单例---key=" + key);  

        return this.map.get(key);  

    }  

    /**  

     * @Description: 发送消息到客户端  

     * @author whl  

     * @date 2014-9-29 下午1:57:51  

     */  

    public void sendMessage(String[] keys, Object message){  

        for(String key : keys){  

IoSessionsession = getSession(key);  

log.debug("反向发送消息到客户端Session---key=" + key + "----------消息=" + message);  

if(session == null){  

                return;  

            }  

            session.write(message);  

        }  

    }  

}  

5,编码解码器,

HCoderFactory.java

[java] view plain copy

package cn.hydom.ztc.ztc_app.controller.mina;  

import java.nio.charset.Charset;  

import org.apache.mina.core.session.IoSession;  

import org.apache.mina.filter.codec.ProtocolCodecFactory;  

import org.apache.mina.filter.codec.ProtocolDecoder;  

import org.apache.mina.filter.codec.ProtocolEncoder;  

/**

 * @Description: 编码和解码器工厂类.

 * @author whl

 * @date 2014-9-30 下午12:34:59

 *

 */  

public class HCoderFactory implements ProtocolCodecFactory {  

private final HEncoder encoder;  

private final HDecoder decoder;  

public HCoderFactory() {  

//this(Charset.defaultCharset());  

this(Charset.forName("UTF-8"));  

    }  

public HCoderFactory(Charset charSet) {  

this.encoder = new HEncoder(charSet);  

this.decoder = new HDecoder(charSet);  

    }  

@Override  

public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {  

return decoder;  

    }  

@Override  

public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {  

return encoder;  

    }  

}  

HDecoder.java

[java] view plain copy

package cn.hydom.ztc.ztc_app.controller.mina;  

import java.nio.charset.Charset;  

import java.nio.charset.CharsetDecoder;  

import org.apache.mina.core.buffer.IoBuffer;  

import org.apache.mina.core.session.IoSession;  

import org.apache.mina.filter.codec.CumulativeProtocolDecoder;  

import org.apache.mina.filter.codec.ProtocolDecoderOutput;  

/**

 * @Description: 解码工具类

 * @author whl

 * @date 2014-9-30 下午12:35:22

 *

 */  

public class HDecoder extends CumulativeProtocolDecoder {  

private final Charset charset;  

public HDecoder(Charset charset) {  

this.charset = charset;  

    }  

public boolean doDecode(IoSession session, IoBuffer in,  

ProtocolDecoderOutput out)throws Exception {  

//System.out.println("-------doDecode----------");  

        CharsetDecoder cd = charset.newDecoder();  

        String mes = in.getString(cd);  

        out.write(mes);  

return true;  

/*

        if (in.remaining() > 4) {// 有数据时,读取字节判断消息长度

            in.mark();// 标记当前位置,以便reset

            int size = in.getInt();

            // 如果消息内容不够,则重置,相当于不读取size

            if (size > in.remaining()) {

                in.reset();

                return false;// 接收新数据,以拼凑成完整数据

            } else if (size != 0 && (size - 4 >= 0)) {

                byte[] bytes = new byte[size - 4];

                //int protocol = in.getInt();

                // 拿到客户端发过来的数据组装成基础包写出去

                in.get(bytes, 0, size - 4);

                //in.get(bytes, size - 4, size);

                PackageBeanFactory beanFactory = (PackageBeanFactory) session

                        .getAttribute(ServerHandler.BEAN_FACTORY);

                //out.write(beanFactory.getPackage(protocol, size, bytes));

                String mes = in.getString(cd);

                out.write(mes);

                // 如果读取内容后还粘了包,就让父类再给读取进行下次解析

                if (in.remaining() > 0) {

                    return true;

                }

            }

        }

        return false;// 处理成功,让父类进行接收下个包

*/    

    }  

}  

HEncoder.java

[java] view plain copy

package cn.hydom.ztc.ztc_app.controller.mina;  

import java.nio.charset.Charset;  

import java.nio.charset.CharsetDecoder;  

import java.nio.charset.CharsetEncoder;  

import org.apache.commons.logging.Log;  

import org.apache.commons.logging.LogFactory;  

import org.apache.mina.core.buffer.IoBuffer;  

import org.apache.mina.core.session.IoSession;  

import org.apache.mina.filter.codec.ProtocolEncoder;  

import org.apache.mina.filter.codec.ProtocolEncoderOutput;  

/**

 * @Description: 编码工具类

 * @author whl

 * @date 2014-9-30 下午12:35:35

 *

 */  

public class HEncoder implements ProtocolEncoder {  

private final static Log log = LogFactory.getLog(HEncoder.class);  

private final Charset charset;  

public HEncoder(Charset charset) {  

this.charset = charset;  

    }  

@Override  

public void encode(IoSession session, Object message,  

ProtocolEncoderOutput out)throws Exception {  

        CharsetEncoder ce = charset.newEncoder();  

        String mes = (String) message;  

IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);  

        buffer.putString(mes,ce);  

        buffer.flip();  

        out.write(buffer);  

/*System.out.println("---------encode-------------");

        String mes = (String) message;

        byte[] data = mes.getBytes("UTF-8");

        IoBuffer buffer = IoBuffer.allocate(data.length + 4);

        buffer.putInt(data.length);

        buffer.put(data);

        buffer.flip();

        out.write(buffer);

        out.flush();*/  

    }  

@Override  

public void dispose(IoSession session) throws Exception {  

log.info("Dispose called,session is " + session);  

    }  

}  

6,客户端程序

ClentMain.java

[java] view plain copy

package cn.hydom.ztc.ztc_app.controller.mina;  

import java.net.InetSocketAddress;  

import java.text.SimpleDateFormat;  

import java.util.Date;  

import org.apache.commons.logging.Log;  

import org.apache.commons.logging.LogFactory;  

import org.apache.mina.core.filterchain.IoFilterAdapter;  

import org.apache.mina.core.future.ConnectFuture;  

import org.apache.mina.core.session.IoSession;  

import org.apache.mina.filter.codec.ProtocolCodecFilter;  

import org.apache.mina.filter.logging.LoggingFilter;  

import org.apache.mina.transport.socket.nio.NioSocketConnector;  

/**

 * @Description: mina客户端,包含断线重连机制,空闲重连机制

 * @author whl

 * @date 2014-11-2

 */  

public class ClentMain extends Thread{  

private final static Log log = LogFactory.getLog(ClentMain.class);  

@Override  

public void run() {  

//ip  

String host ="192.168.0.38";  

//端口  

int port = 6007;  

//停车场id  

final String carPark_id = "1";  

// 创建客户端连接器.  

final NioSocketConnector connector = new NioSocketConnector();  

//设置连接超时    

connector.setConnectTimeoutMillis(30000);   

// 设置默认访问地址    

connector.setDefaultRemoteAddress(new InetSocketAddress(host, port));  

//将IoSession的主键属性注入线程映射表MDC中  

//connector.getFilterChain().addLast("mdc", new MdcInjectionFilter());    

//日志过滤器  

connector.getFilterChain().addLast("logger", new LoggingFilter());  

// 设置编码过滤器  

connector.getFilterChain().addLast("codec",new ProtocolCodecFilter(new HCoderFactory()));          

//添加处理器    

connector.setHandler(new ClintHandler());  

// 设置接收缓冲区的大小    

connector.getSessionConfig().setReceiveBufferSize(10240);  

// 设置输出缓冲区的大小    

connector.getSessionConfig().setSendBufferSize(10240);  

/**

         * 空闲重连的机制,根据需要选择相应的配置

         */  

// 读写都空闲时间:30秒    

//connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);   

// 读(接收通道)空闲时间:40秒   

//connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 40);  

// 写(发送通道)空闲时间:50秒   

//connector.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE, 50);   

//断线重连回调拦截器    

connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter() {    

@Override    

public void sessionClosed(NextFilter nextFilter, IoSession ioSession) throws Exception {    

for(;;){    

try{    

Thread.sleep(3000);   

                        ConnectFuture future = connector.connect();    

future.awaitUninterruptibly();// 等待连接创建成功    

IoSession session = future.getSession();// 获取会话    

session.write("key="+carPark_id);  

if(session.isConnected()){    

log.info("断线重连["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");    

//System.out.println("断线重连["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");  

break;    

                        }    

}catch(Exception ex){    

log.info("重连服务器登录失败,3秒再连接一次:" + ex.getMessage());    

//System.out.println("重连服务器登录失败,3秒再连接一次:" + ex.getMessage());    

                    }    

                }    

            }    

        });  

//开始连接  

for (;;) {    

try {    

                ConnectFuture future = connector.connect();    

// 等待连接创建成功    

                future.awaitUninterruptibly();    

// 获取会话    

                IoSession session = future.getSession();    

//发送消息  

session.write("key=" + carPark_id);  

log.error("连接服务端" + host + ":" + port + "[成功]" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));    

break;    

}catch (Exception e) {    

//System.out.println("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage());    

log.error("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage(), e);    

// 连接失败后,重连10次,间隔30s    

try {  

Thread.sleep(5000);  

}catch (InterruptedException e1) {  

                    e1.printStackTrace();  

log.error("连接服务端失败后,睡眠5秒发生异常!");  

                }  

            }    

        }  

// cf.getSession().write("quit");//发送消息  

//cf.getSession().close();  

//cf.getSession().getCloseFuture().awaitUninterruptibly();// 等待连接断开  

//connector.dispose();  

    }  

}  

ClintHandler.java

[java] view plain copy

package cn.hydom.ztc.ztc_app.controller.mina;  

import org.apache.commons.logging.Log;  

import org.apache.commons.logging.LogFactory;  

import org.apache.mina.core.service.IoHandlerAdapter;  

import org.apache.mina.core.session.IdleStatus;  

import org.apache.mina.core.session.IoSession;  

/**

 * @Description: 客户端业务处理类

 * @author whl

 * @date 2014-11-2

 */  

public class ClintHandler extends IoHandlerAdapter {  

private final static Log log = LogFactory.getLog(ClintHandler.class);  

/**

     * 写处理服务端推送的信息的逻辑

     */  

@Override  

public void messageReceived(IoSession session, Object message)  

throws Exception {  

System.out.println("-----服务顿返回的json数据----");  

        String s = message.toString();  

System.out.println("message :" + s);  

System.out.println("message length:" + s.length());  

    }  

@Override    

public void sessionIdle(IoSession session, IdleStatus status) throws Exception {    

log.info("-客户端与服务端连接[空闲] - " + status.toString());    

System.out.println("-客户端与服务端连接[空闲] - " + status.toString());  

if(session != null){    

session.close(true);    

        }    

    }    

}  

7,测试

这里就不写实际的代码了,

1,首先部署web工程启动tomcat,让服务端先运行起来,

2,然后运行客户端

[java] view plain copy

public class ClintTest1 {  

public static void main(String[] args) throws Exception {  

ClentMain mina =new ClentMain();  

        mina.start();  

    }  

}  

3,想要服务端主动发回来信息,还得在服务端的web工程的action中写一个htpp访问的方法,方法中去sessionMap类中缓存的map中查找session,然后发送消息。

代码如下,我用的是springmvc

[java] view plain copy

/**

     * @Description: 发送消息到客户端

     * @author whl

     * @date 2014-9-29 下午1:18:54

     */  

@ResponseBody  

@RequestMapping(value="/sendMessage")  

public String sendMessage(HttpServletRequest request, String[] carPark_id){  

try{  

//  

//获取链接的参数carPark_id  

log.debug("carPark_id[].length--------- " + carPark_id.length);  

for(String id : carPark_id){  

log.debug("carPark_id --------- " + id);  

            }  

//这里用的假数据                                                carPark_id = new String[]{"1"};  

//发送的信息String jsonstr = "123";//反向发送信息log.debug("开始反向发送消息到客户端-------- ");SessionMap sessionMap = SessionMap.newInstance();sessionMap.sendMessage(carPark_id, jsonstr);//返回信息return "发送的信息为" + jsonstr;

}catch (Exception e) {log.error(e);return "出错了。。";}}

4,好了,现在重新发布工程,启动服务端和客户端,然后访问这个链接就可以了,当然springmvc需要自己配置,这也不是这里的重点。

注意:客户端发过来的carPark_id必须与服务端查找的一致,不让就找不到相应的客户端session连接,消息无法发送成功。

相关文章

  • mina整合spring ,服务端反向发送消息到客户端 完整实例

    之前的项目需要用到mina,实现的功能主要是:服务端主动发送消息到客户端,这个的服务端为外网的tomcat,客户端...

  • 基于socket的进程通信

    实现目标实现服务端与客户端的连接实现多个客户端向服务端发送消息,并由服务端将消息发送给每个客户端 涉及的Java类...

  • iOS字符串哈希

    应用场景 客户端向服务端发送消息 服务端收到后向客户端发送应答如果客户端超时时间内没有收到应答 则重发消息消息间需...

  • 初尝Netty(一):Echo通信

    EchoServer与EchoClient编写 客户端发送一个消息给服务端,服务端收到消息后返回一个消息 MySe...

  • python之socket编程

    实例一:最简单的socket例子这个程序,实现了客户端向服务端发送数据,服务端将小写字母变为大写并且返回 实例二:...

  • Netty实现心跳机制

    心跳检测逻辑:服务端启动后,等待客户端连接,客户端连接之后,向服务端发送消息。如果客户端在线服务端必定会收到数据,...

  • Kafka——消息的发送流程

    前言 本文将介绍kafka的一条消息的发送流程,从消息的发送到服务端的存储。上文说到kafak分为客户端与服务端,...

  • Netty笔记之三:Netty实现Socket编程

    netty实现Tcp Socket编程。 demo实现功能客户端向服务端发送消息,服务器接收到消息后向客户端响应。...

  • 服务发现:服务注册中心

    背景 服务的客户端采用客户端服务发现或者服务端服务发现来确定发送请求的实例地址。 问题 客户端服务发现 里的客户端...

  • 前端必备HTTP技能之请求头响应头格式以及请求方法简述

    请求头响应头格式 http协议中,客户端和服务端通过发送纯文本(ASCII)消息的方式进行通信,客户端发送请求(r...

网友评论

      本文标题:mina整合spring ,服务端反向发送消息到客户端 完整实例

      本文链接:https://www.haomeiwen.com/subject/fkiafftx.html