美文网首页
mina整合spring ,服务端反向发送消息到客户端 完整实例

mina整合spring ,服务端反向发送消息到客户端 完整实例

作者: 安易学车 | 来源:发表于2018-03-13 09:49 被阅读0次

    之前的项目需要用到mina,实现的功能主要是:服务端主动发送消息到客户端,这个的服务端为外网的tomcat,客户端为内网的tomcat,由于无法知道内网tomcat 的地址,也就不能直接通过http的方式发送信息回来,最后想来想去用mina实现了这个功能。

    当然,我这里的服务端是整合的了spring 的,也可以直接把服务端独立出来,不整合spring,这个都一样,区别不大。

    代码和配置如下:

    ---------------------------

    1,jar包,我这里使用的是spring4.0.5,mina2.0.7

    maven部分文件如下,这个包会自动也依赖进来mina-filter-ssl-1.1.7.jar

    [java] view plain copy

    [java] view plain copy

                      

                org.springframework  

                spring-context-support  

                ${springframework-version}  

              

              

                org.springframework  

                spring-jdbc  

                ${springframework-version}  

              

              

                org.springframework  

                spring-orm  

                ${springframework-version}  

              

              

                org.springframework  

                spring-aop  

                ${springframework-version}  

              

              

              

                org.springframework  

                spring-web  

                ${springframework-version}  

                  

                      

                        commons-logging  

                        commons-logging  

              

                org.springframework  

                spring-webmvc  

                ${springframework-version}  

              

                org.aspectj  

                aspectjrt  

                ${aspectj-version}  

              

                org.aspectj  

                aspectjweaver  

                ${aspectj-version}  

              

                org.apache.mina  

                mina-core  

    2.0.7  

              

                org.apache.mina  

                mina-integration-spring  

    1.1.7  

              

                org.apache.mina  

                mina-integration-beans  

    2.0.8  

    2,spring-ztc_app-mina.xml (spring与mina 的配置文件)

    spring与mina 的配置文件,需要导入到spring 的总配置文件中,或者加入到web.xml的spring监听扫描中

    [java] view plain copy

    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  

    xsi:schemaLocation="http://www.springframework.org/schema/beans   

    http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">  

      

      

                  

      

      

                    -->  

    value="org.apache.mina.integration.beans.InetSocketAddressEditor">  

      

    init-method="bind" destroy-method="unbind">  

      

      

      

      

              

                    /> -->  

      

      

    class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder">  

      

                  

      

      

    3,mina服务端业务处理类

    [java] view plain copy

    package cn.hydom.ztc.ztc_app.controller.mina;  

    import org.apache.commons.logging.Log;  

    import org.apache.commons.logging.LogFactory;  

    import org.apache.mina.core.service.IoHandlerAdapter;  

    import org.apache.mina.core.session.IdleStatus;  

    import org.apache.mina.core.session.IoSession;  

    /**

     * @Description: mina服务端业务处理类

     * @author whl

     * @date 2014-9-30 下午12:36:28

     *

     */  

    public class ServerHandler extends IoHandlerAdapter {  

    private final static Log log = LogFactory.getLog(ServerHandler.class);  

    public ServerHandler() {  

    // TODO Auto-generated constructor stub  

         }  

    @Override  

    public void exceptionCaught(IoSession session, Throwable cause)  

    throws Exception {  

         }  

    @Override  

    public void messageReceived(IoSession session, Object message)  

    throws Exception {  

    log.debug("服务端收到信息-------------");  

    //获取客户端发过来的key  

              String key = message.toString();  

    System.out.println("message :"+message.toString());  

    String carPark_id = key.substring(key.indexOf("=") + 1);  

    System.out.println("carPark_id :"+carPark_id);  

    //保存客户端的会话session  

              SessionMap sessionMap = SessionMap.newInstance();  

              sessionMap.addSession(carPark_id, session);  

         }  

    @Override  

    public void messageSent(IoSession session, Object message) throws Exception {  

    log.debug("------------服务端发消息到客户端---");  

         }  

    @Override  

    public void sessionClosed(IoSession session) throws Exception {  

    // TODO Auto-generated method stub  

    log.debug("远程session关闭了一个..." + session.getRemoteAddress().toString());  

         }  

    @Override  

    public void sessionCreated(IoSession session) throws Exception {  

    log.debug(session.getRemoteAddress().toString() +"----------------------create");  

         }  

    @Override  

    public void sessionIdle(IoSession session, IdleStatus status)  

    throws Exception {  

    log.debug(session.getServiceAddress() +"IDS");  

         }  

    @Override  

    public void sessionOpened(IoSession session) throws Exception {  

    log.debug("连接打开:"+session.getLocalAddress());  

         }  

        }  

    4,服务端缓存客户端socket连接的单例类

    [html] view plain copy

    package cn.hydom.ztc.ztc_app.controller.mina;  

    import java.util.HashMap;  

    import java.util.Map;  

    import org.apache.commons.logging.Log;  

    import org.apache.commons.logging.LogFactory;  

    import org.apache.mina.core.session.IoSession;  

    /**  

     * @Description: 单例工具类,保存所有mina客户端连接  

     * @author whl  

     * @date 2014-9-29 上午10:09:15  

     *  

     */  

    public class SessionMap {  

    private final static Loglog = LogFactory.getLog(SessionMap.class);  

    private static SessionMapsessionMap = null;  

    private Mapmap = new HashMap();  

        //构造私有化 单例  

        private SessionMap(){}  

        /**  

         * @Description: 获取唯一实例  

         * @author whl  

         * @date 2014-9-29 下午1:29:33  

         */  

        public static SessionMap newInstance(){  

            log.debug("SessionMap单例获取---");  

    if(sessionMap == null){  

    sessionMap = new SessionMap();  

            }  

            return sessionMap;  

        }  

        /**  

         * @Description: 保存session会话  

         * @author whl  

         * @date 2014-9-29 下午1:31:05  

         */  

        public void addSession(String key, IoSession session){  

    log.debug("保存会话到SessionMap单例---key=" + key);  

            this.map.put(key, session);  

        }  

        /**  

         * @Description: 根据key查找缓存的session  

         * @author whl  

         * @date 2014-9-29 下午1:31:55  

         */  

        public IoSession getSession(String key){  

    log.debug("获取会话从SessionMap单例---key=" + key);  

            return this.map.get(key);  

        }  

        /**  

         * @Description: 发送消息到客户端  

         * @author whl  

         * @date 2014-9-29 下午1:57:51  

         */  

        public void sendMessage(String[] keys, Object message){  

            for(String key : keys){  

    IoSessionsession = getSession(key);  

    log.debug("反向发送消息到客户端Session---key=" + key + "----------消息=" + message);  

    if(session == null){  

                    return;  

                }  

                session.write(message);  

            }  

        }  

    }  

    5,编码解码器,

    HCoderFactory.java

    [java] view plain copy

    package cn.hydom.ztc.ztc_app.controller.mina;  

    import java.nio.charset.Charset;  

    import org.apache.mina.core.session.IoSession;  

    import org.apache.mina.filter.codec.ProtocolCodecFactory;  

    import org.apache.mina.filter.codec.ProtocolDecoder;  

    import org.apache.mina.filter.codec.ProtocolEncoder;  

    /**

     * @Description: 编码和解码器工厂类.

     * @author whl

     * @date 2014-9-30 下午12:34:59

     *

     */  

    public class HCoderFactory implements ProtocolCodecFactory {  

    private final HEncoder encoder;  

    private final HDecoder decoder;  

    public HCoderFactory() {  

    //this(Charset.defaultCharset());  

    this(Charset.forName("UTF-8"));  

        }  

    public HCoderFactory(Charset charSet) {  

    this.encoder = new HEncoder(charSet);  

    this.decoder = new HDecoder(charSet);  

        }  

    @Override  

    public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {  

    return decoder;  

        }  

    @Override  

    public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {  

    return encoder;  

        }  

    }  

    HDecoder.java

    [java] view plain copy

    package cn.hydom.ztc.ztc_app.controller.mina;  

    import java.nio.charset.Charset;  

    import java.nio.charset.CharsetDecoder;  

    import org.apache.mina.core.buffer.IoBuffer;  

    import org.apache.mina.core.session.IoSession;  

    import org.apache.mina.filter.codec.CumulativeProtocolDecoder;  

    import org.apache.mina.filter.codec.ProtocolDecoderOutput;  

    /**

     * @Description: 解码工具类

     * @author whl

     * @date 2014-9-30 下午12:35:22

     *

     */  

    public class HDecoder extends CumulativeProtocolDecoder {  

    private final Charset charset;  

    public HDecoder(Charset charset) {  

    this.charset = charset;  

        }  

    public boolean doDecode(IoSession session, IoBuffer in,  

    ProtocolDecoderOutput out)throws Exception {  

    //System.out.println("-------doDecode----------");  

            CharsetDecoder cd = charset.newDecoder();  

            String mes = in.getString(cd);  

            out.write(mes);  

    return true;  

    /*

            if (in.remaining() > 4) {// 有数据时,读取字节判断消息长度

                in.mark();// 标记当前位置,以便reset

                int size = in.getInt();

                // 如果消息内容不够,则重置,相当于不读取size

                if (size > in.remaining()) {

                    in.reset();

                    return false;// 接收新数据,以拼凑成完整数据

                } else if (size != 0 && (size - 4 >= 0)) {

                    byte[] bytes = new byte[size - 4];

                    //int protocol = in.getInt();

                    // 拿到客户端发过来的数据组装成基础包写出去

                    in.get(bytes, 0, size - 4);

                    //in.get(bytes, size - 4, size);

                    PackageBeanFactory beanFactory = (PackageBeanFactory) session

                            .getAttribute(ServerHandler.BEAN_FACTORY);

                    //out.write(beanFactory.getPackage(protocol, size, bytes));

                    String mes = in.getString(cd);

                    out.write(mes);

                    // 如果读取内容后还粘了包,就让父类再给读取进行下次解析

                    if (in.remaining() > 0) {

                        return true;

                    }

                }

            }

            return false;// 处理成功,让父类进行接收下个包

    */    

        }  

    }  

    HEncoder.java

    [java] view plain copy

    package cn.hydom.ztc.ztc_app.controller.mina;  

    import java.nio.charset.Charset;  

    import java.nio.charset.CharsetDecoder;  

    import java.nio.charset.CharsetEncoder;  

    import org.apache.commons.logging.Log;  

    import org.apache.commons.logging.LogFactory;  

    import org.apache.mina.core.buffer.IoBuffer;  

    import org.apache.mina.core.session.IoSession;  

    import org.apache.mina.filter.codec.ProtocolEncoder;  

    import org.apache.mina.filter.codec.ProtocolEncoderOutput;  

    /**

     * @Description: 编码工具类

     * @author whl

     * @date 2014-9-30 下午12:35:35

     *

     */  

    public class HEncoder implements ProtocolEncoder {  

    private final static Log log = LogFactory.getLog(HEncoder.class);  

    private final Charset charset;  

    public HEncoder(Charset charset) {  

    this.charset = charset;  

        }  

    @Override  

    public void encode(IoSession session, Object message,  

    ProtocolEncoderOutput out)throws Exception {  

            CharsetEncoder ce = charset.newEncoder();  

            String mes = (String) message;  

    IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);  

            buffer.putString(mes,ce);  

            buffer.flip();  

            out.write(buffer);  

    /*System.out.println("---------encode-------------");

            String mes = (String) message;

            byte[] data = mes.getBytes("UTF-8");

            IoBuffer buffer = IoBuffer.allocate(data.length + 4);

            buffer.putInt(data.length);

            buffer.put(data);

            buffer.flip();

            out.write(buffer);

            out.flush();*/  

        }  

    @Override  

    public void dispose(IoSession session) throws Exception {  

    log.info("Dispose called,session is " + session);  

        }  

    }  

    6,客户端程序

    ClentMain.java

    [java] view plain copy

    package cn.hydom.ztc.ztc_app.controller.mina;  

    import java.net.InetSocketAddress;  

    import java.text.SimpleDateFormat;  

    import java.util.Date;  

    import org.apache.commons.logging.Log;  

    import org.apache.commons.logging.LogFactory;  

    import org.apache.mina.core.filterchain.IoFilterAdapter;  

    import org.apache.mina.core.future.ConnectFuture;  

    import org.apache.mina.core.session.IoSession;  

    import org.apache.mina.filter.codec.ProtocolCodecFilter;  

    import org.apache.mina.filter.logging.LoggingFilter;  

    import org.apache.mina.transport.socket.nio.NioSocketConnector;  

    /**

     * @Description: mina客户端,包含断线重连机制,空闲重连机制

     * @author whl

     * @date 2014-11-2

     */  

    public class ClentMain extends Thread{  

    private final static Log log = LogFactory.getLog(ClentMain.class);  

    @Override  

    public void run() {  

    //ip  

    String host ="192.168.0.38";  

    //端口  

    int port = 6007;  

    //停车场id  

    final String carPark_id = "1";  

    // 创建客户端连接器.  

    final NioSocketConnector connector = new NioSocketConnector();  

    //设置连接超时    

    connector.setConnectTimeoutMillis(30000);   

    // 设置默认访问地址    

    connector.setDefaultRemoteAddress(new InetSocketAddress(host, port));  

    //将IoSession的主键属性注入线程映射表MDC中  

    //connector.getFilterChain().addLast("mdc", new MdcInjectionFilter());    

    //日志过滤器  

    connector.getFilterChain().addLast("logger", new LoggingFilter());  

    // 设置编码过滤器  

    connector.getFilterChain().addLast("codec",new ProtocolCodecFilter(new HCoderFactory()));          

    //添加处理器    

    connector.setHandler(new ClintHandler());  

    // 设置接收缓冲区的大小    

    connector.getSessionConfig().setReceiveBufferSize(10240);  

    // 设置输出缓冲区的大小    

    connector.getSessionConfig().setSendBufferSize(10240);  

    /**

             * 空闲重连的机制,根据需要选择相应的配置

             */  

    // 读写都空闲时间:30秒    

    //connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);   

    // 读(接收通道)空闲时间:40秒   

    //connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 40);  

    // 写(发送通道)空闲时间:50秒   

    //connector.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE, 50);   

    //断线重连回调拦截器    

    connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter() {    

    @Override    

    public void sessionClosed(NextFilter nextFilter, IoSession ioSession) throws Exception {    

    for(;;){    

    try{    

    Thread.sleep(3000);   

                            ConnectFuture future = connector.connect();    

    future.awaitUninterruptibly();// 等待连接创建成功    

    IoSession session = future.getSession();// 获取会话    

    session.write("key="+carPark_id);  

    if(session.isConnected()){    

    log.info("断线重连["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");    

    //System.out.println("断线重连["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");  

    break;    

                            }    

    }catch(Exception ex){    

    log.info("重连服务器登录失败,3秒再连接一次:" + ex.getMessage());    

    //System.out.println("重连服务器登录失败,3秒再连接一次:" + ex.getMessage());    

                        }    

                    }    

                }    

            });  

    //开始连接  

    for (;;) {    

    try {    

                    ConnectFuture future = connector.connect();    

    // 等待连接创建成功    

                    future.awaitUninterruptibly();    

    // 获取会话    

                    IoSession session = future.getSession();    

    //发送消息  

    session.write("key=" + carPark_id);  

    log.error("连接服务端" + host + ":" + port + "[成功]" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));    

    break;    

    }catch (Exception e) {    

    //System.out.println("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage());    

    log.error("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage(), e);    

    // 连接失败后,重连10次,间隔30s    

    try {  

    Thread.sleep(5000);  

    }catch (InterruptedException e1) {  

                        e1.printStackTrace();  

    log.error("连接服务端失败后,睡眠5秒发生异常!");  

                    }  

                }    

            }  

    // cf.getSession().write("quit");//发送消息  

    //cf.getSession().close();  

    //cf.getSession().getCloseFuture().awaitUninterruptibly();// 等待连接断开  

    //connector.dispose();  

        }  

    }  

    ClintHandler.java

    [java] view plain copy

    package cn.hydom.ztc.ztc_app.controller.mina;  

    import org.apache.commons.logging.Log;  

    import org.apache.commons.logging.LogFactory;  

    import org.apache.mina.core.service.IoHandlerAdapter;  

    import org.apache.mina.core.session.IdleStatus;  

    import org.apache.mina.core.session.IoSession;  

    /**

     * @Description: 客户端业务处理类

     * @author whl

     * @date 2014-11-2

     */  

    public class ClintHandler extends IoHandlerAdapter {  

    private final static Log log = LogFactory.getLog(ClintHandler.class);  

    /**

         * 写处理服务端推送的信息的逻辑

         */  

    @Override  

    public void messageReceived(IoSession session, Object message)  

    throws Exception {  

    System.out.println("-----服务顿返回的json数据----");  

            String s = message.toString();  

    System.out.println("message :" + s);  

    System.out.println("message length:" + s.length());  

        }  

    @Override    

    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {    

    log.info("-客户端与服务端连接[空闲] - " + status.toString());    

    System.out.println("-客户端与服务端连接[空闲] - " + status.toString());  

    if(session != null){    

    session.close(true);    

            }    

        }    

    }  

    7,测试

    这里就不写实际的代码了,

    1,首先部署web工程启动tomcat,让服务端先运行起来,

    2,然后运行客户端

    [java] view plain copy

    public class ClintTest1 {  

    public static void main(String[] args) throws Exception {  

    ClentMain mina =new ClentMain();  

            mina.start();  

        }  

    }  

    3,想要服务端主动发回来信息,还得在服务端的web工程的action中写一个htpp访问的方法,方法中去sessionMap类中缓存的map中查找session,然后发送消息。

    代码如下,我用的是springmvc

    [java] view plain copy

    /**

         * @Description: 发送消息到客户端

         * @author whl

         * @date 2014-9-29 下午1:18:54

         */  

    @ResponseBody  

    @RequestMapping(value="/sendMessage")  

    public String sendMessage(HttpServletRequest request, String[] carPark_id){  

    try{  

    //  

    //获取链接的参数carPark_id  

    log.debug("carPark_id[].length--------- " + carPark_id.length);  

    for(String id : carPark_id){  

    log.debug("carPark_id --------- " + id);  

                }  

    //这里用的假数据                                                carPark_id = new String[]{"1"};  

    //发送的信息String jsonstr = "123";//反向发送信息log.debug("开始反向发送消息到客户端-------- ");SessionMap sessionMap = SessionMap.newInstance();sessionMap.sendMessage(carPark_id, jsonstr);//返回信息return "发送的信息为" + jsonstr;

    }catch (Exception e) {log.error(e);return "出错了。。";}}

    4,好了,现在重新发布工程,启动服务端和客户端,然后访问这个链接就可以了,当然springmvc需要自己配置,这也不是这里的重点。

    注意:客户端发过来的carPark_id必须与服务端查找的一致,不让就找不到相应的客户端session连接,消息无法发送成功。

    相关文章

      网友评论

          本文标题:mina整合spring ,服务端反向发送消息到客户端 完整实例

          本文链接:https://www.haomeiwen.com/subject/fkiafftx.html