Mina网络通信框架(续)

作者: Louis_陆 | 来源:发表于2016-07-13 11:09 被阅读777次

    在看到本文之前,如果读者没看过笔者的前文Mina网络通信框架 ,请先翻阅。

    在本节中,我将用 Mina 框架,同时实现服务器和客户端,实现客户端与客户端之间的 IM 通信,使用 Java JDBC 接口连入 MySQL 数据库,实现一个具有登录功能的简易 IM 聊天工具。

    首先,我们来看看效果图:

    使用 Navicat 可视化操作工具创建并连接数据库


    Paste_Image.png

    启动 Mina 服务器


    Paste_Image.png

    启动登录界面


    Paste_Image.png

    注册失败演示


    Paste_Image.png

    注册成功演示


    Paste_Image.png Paste_Image.png

    观察数据库,发现添加了新用户


    Paste_Image.png

    登录后的聊天窗口


    Paste_Image.png

    再登录一个用户


    Paste_Image.png

    发送消息演示

    Paste_Image.png

    下面,笔者贴出几个核心代码部分:

    数据库登录处理逻辑

    package database;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    
    import org.apache.mina.core.session.IoSession;
    
    import util.MessageUtil;
    import util.SessionUtil;
    import common.Common;
    
    /**
     * 管理客户端的数据库
     */
    public class ClientJDBC {
    
        public ClientJDBC() {
            try {
                // 加载JDBC Driver
                Class.forName("com.mysql.jdbc.Driver");
    
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * <pre>
         * 1、查询是否已存在此用户
         * 2、注册,添加新数据
         */
        public void addNewData(String name, String password, IoSession loginSession) {
            try {
                // 建立连接
                Connection connection = DriverManager.getConnection(
                        "jdbc:mysql://localhost:3306/clientdb", "root", "");
    
                // 创建Statement
                Statement statement = connection.createStatement();
                // 查询是否已存在此用户
                String selectSQL = "select password from client where name=" + "'"
                        + name + "'";
                ResultSet set = statement.executeQuery(selectSQL);
                // 如果不存在此用户
                if (!set.next()) {
                    // 添加数据
                    PreparedStatement preparedStatement = connection
                            .prepareStatement("insert into client values(null,?,?)");
                    preparedStatement.setString(1, name);
                    preparedStatement.setString(2, password);
                    preparedStatement.executeUpdate();
                    // 提示用户注册成功
                    loginSession.write(Common.CANREG);
                } else {
                    // 提示用户已注册
                    loginSession.write(Common.CANTRE);
                }
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * <pre>
         * 1、查询帐号密码是否匹配
         * 2、查询是否用户已登录
         * 3、登录
         */
        public boolean login(String name, String password, IoSession userSession) {
            try {
                // 建立连接
                Connection connection = DriverManager.getConnection(
                        "jdbc:mysql://localhost:3306/clientdb", "root", "");
    
                // 创建Statement
                Statement statement = connection.createStatement();
    
                // 查询是否已存在此用户
                String selectSQL = "select password from client where name=" + "'"
                        + name + "'";
                ResultSet set = statement.executeQuery(selectSQL);
                // 如果存在此用户
                if (set.next()) {
                    // 密码匹配成功
                    if (set.getString("password").equals(password)) {
                        // 如果用户没登录
                        if (SessionUtil.getSessionFromName(name) == null) {
                            // 存储用户名
                            userSession.setAttribute("name", name);
                            // 登录成功
                            String s = name;
                            String message = MessageUtil.createSendMessage(Common.LOGIN_SUCCESS, s);
                            userSession.write(message);
                            // 发送在线列表给所有客户端
                            SessionUtil.sendClientListToAll();
                        } else {
                            // 提示用户已登录
                            userSession.write(Common.IS_LOGIN);
                        }
                    } else {
                        // 提示帐号和密码不匹配
                        userSession.write(Common.ISNT_MATCH);
                    }
                } else {
                    // 提示帐号和密码不匹配
                    userSession.write(Common.ISNT_MATCH);
                }
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return true;
        }
    
    }
    
    

    服务器端Handler处理逻辑
    注意,此处继承的是StreamIoHandler,为了后续对传输文件进行操作

    package server;
    
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.handler.stream.StreamIoHandler;
    
    import thread.IoStreamThreadWork;
    import util.SessionUtil;
    import common.Common;
    
    /**
     * Mina Handler
     */
    public class ServerHandler extends StreamIoHandler {
    
        @Override
        public void exceptionCaught(IoSession session, Throwable cause) {
            System.out.println(session.getRemoteAddress() + "   "
                    + "exceptionCaught");
            System.out.println(cause);
        }
    
        /**
         * mina只能接收以换行符结束的信息
         */
        @Override
        public void messageReceived(IoSession session, Object message) {
            System.out.println(session.getRemoteAddress() + "   "
                    + "messageReceived");
    
            // 解析信息
            String result = (String) message;
    
            // 注册信息
            if (result.startsWith(Common.REGISTER) && result.length() > 10) {
                String suffix = result.substring(10);
                String[] array = suffix.split(",");
                MinaServer.clientJDBC.addNewData(array[0], array[1], session);
            }
            // 登录信息
            else if (result.startsWith(Common.LOGIN) && result.length() > 10) {
                String suffix = result.substring(10);
                String[] array = suffix.split(",");
                MinaServer.clientJDBC.login(array[0], array[1], session);
            }
            // 登出信息
            else if (result.equals(Common.LOGOUT) && result.length() == 6) {
                // 关闭当前session
                session.close();
                // 发送在线列表给所有客户端
                SessionUtil.sendClientListToAll();
            }
            // 用户之间发送消息
            else if (result.startsWith(Common.CLIENT) && result.length() > 10) {
                String suffix = result.substring(10);
                // 分隔选中用户名和要发送的消息
                String[] array = suffix.split(Common.SEPARATOR);
                // 向指定用户发送消息
                if (session.getAttribute("name") != null)
                    SessionUtil.sendToSelectedClient(
                            (String) session.getAttribute("name"), array[0],
                            array[1]);
            }
        }
    
        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
            System.out.println(session.getRemoteAddress() + "   " + "messageSent");
        }
    
        @Override
        public void sessionClosed(IoSession session) throws Exception {
            System.out
                    .println(session.getRemoteAddress() + "   " + "sessionClosed");
        }
    
        /**
         * 用户创建的时候保留会话的ip和端口
         */
        @Override
        public void sessionCreated(IoSession session) throws Exception {
            System.out.println(session.getRemoteAddress() + "   "
                    + "sessionCreated");
            // 提取客户端ip和端口
            String s = session.getRemoteAddress() + "";
            String port = s.substring(s.indexOf(":") + 1);
            String ip = s.substring((s.indexOf("/") + 1),
                    s.length() - 1 - port.length());
            // 存储ip和端口
            if (session.getAttribute("ip") == null)
                session.setAttribute("ip", ip);
            if (session.getAttribute("port") == null)
                session.setAttribute("port", port);
        }
    
        /**
         * 会话空闲状态
         */
        @Override
        public void sessionIdle(IoSession session, IdleStatus status) {
            System.out.println(session.getRemoteAddress() + "   " + "sessionIdle");
        }
    
        @Override
        public void sessionOpened(IoSession session) {
            System.out
                    .println(session.getRemoteAddress() + "   " + "sessionOpened");
        }
    
        @Override
        protected void processStreamIo(IoSession session, InputStream input,
                OutputStream output) {
            System.out.println(session.getRemoteAddress() + "   "
                    + "processStreamIo");
            // 设定一个线程池
            // 参数说明:最少数量3,最大数量6 空闲时间 3秒
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 6, 3,
                    TimeUnit.SECONDS,
                    // 缓冲队列为3
                    new ArrayBlockingQueue<Runnable>(3),
                    // 抛弃旧的任务
                    new ThreadPoolExecutor.DiscardOldestPolicy());
            FileOutputStream fos = null;
            // 此处路径如何动态设定。
            File receiveFile = new File("F:\\111.jks");
    
            try {
                fos = new FileOutputStream(receiveFile);
            } catch (FileNotFoundException e1) {
                e1.printStackTrace();
            } finally {
                try {
                    fos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            // 将线程放入线程池 当连接很多时候可以通过线程池处理
            threadPool.execute(new IoStreamThreadWork(input, fos));
        }
    }
    

    客户端Handler处理逻辑

    package client;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.InputStream;
    import java.io.OutputStream;
    
    import javax.swing.DefaultListModel;
    import javax.swing.JOptionPane;
    import javax.swing.JTextArea;
    
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.handler.stream.StreamIoHandler;
    
    import thread.IoStreamThreadWork;
    
    import common.Common;
    
    import frame.ChatFrame;
    import frame.LoginFrame;
    
    /**
     * Mina Handler
     */
    // IoHandlerAdapter
    public class ClientHandler extends StreamIoHandler {
    
        @Override
        public void exceptionCaught(IoSession session, Throwable cause) {
            System.out.println(session.getLocalAddress() + "   "
                    + "exceptionCaught");
            System.out.println(cause);
        }
    
        /**
         * mina只能接收以换行符结束的信息
         */
        @Override
        public void messageReceived(IoSession session, Object message) {
            System.out.println(session.getLocalAddress() + "   "
                    + "messageReceived");
    
            // 解析信息
            String result = (String) message;
    
            // 注册成功
            if (result.equals(Common.CANREG) && result.length() == 6) {
                JOptionPane.showMessageDialog(null, "注册成功!");
            }
            // 用户已存在注册失败
            else if (result.equals(Common.CANTRE) && result.length() == 6) {
                JOptionPane.showMessageDialog(null, "用户已存在!");
            }
            // 用户已登录,登录失败
            else if (result.equals(Common.IS_LOGIN) && result.length() == 6) {
                JOptionPane.showMessageDialog(null, "用户已登录!");
                session.close();// 关闭这个Session
            }
            // 帐号和密码不匹配,登录失败
            else if (result.equals(Common.ISNT_MATCH) && result.length() == 6) {
                JOptionPane.showMessageDialog(null, "帐号和密码不匹配!");
                session.close();// 关闭这个Session
            }
            // 登录成功
            else if (result.startsWith(Common.LOGIN_SUCCESS)
                    && result.length() > 10) {
                String suffix = result.substring(10);
                // 从Attribute中取出登录窗口
                LoginFrame frame = (LoginFrame) session.getAttribute("loginFrame");
                // 关闭登录窗口
                frame.dispose();
                // 打开聊天窗口
                new ChatFrame(session).setTitle("用户" + suffix + "的聊天窗口");
            }
            // 服务器发送在线列表消息
            else if (result.startsWith(Common.SERVER) && result.length() > 10) {
                String suffix = result.substring(10);
                String[] array = suffix.split(",");
                // 判断空,因为有loginSession
                if (session.getAttribute("clientListModel") != null) {
                    // 取出clientListModel
                    DefaultListModel<String> clientListModel = (DefaultListModel<String>) (session
                            .getAttribute("clientListModel"));
                    // 清空原视图
                    clientListModel.removeAllElements();
                    // 重新加入在线用户列表
                    for (String clientName : array)
                        clientListModel.addElement(clientName);
                }
            }
            // 客户发送过来的消息
            else if (result.startsWith(Common.CLIENT) && result.length() > 10) {
                String suffix = result.substring(10);
                // if (session.getAttribute("receivedMessage") != null)
                JTextArea jta = (JTextArea) session.getAttribute("receivedMessage");
                jta.append(suffix + "\n");
                // 滚动到底端
                jta.setCaretPosition(jta.getText().length());
            }
        }
    
        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
            System.out.println(session.getLocalAddress() + "   " + "messageSent");
        }
    
        @Override
        public void sessionClosed(IoSession session) throws Exception {
            System.out.println(session.getLocalAddress() + "   " + "sessionClosed");
        }
    
        @Override
        public void sessionCreated(IoSession session) throws Exception {
            System.out
                    .println(session.getLocalAddress() + "   " + "sessionCreated");
        }
    
        /**
         * 会话空闲状态
         */
        @Override
        public void sessionIdle(IoSession session, IdleStatus status) {
            System.out.println(session.getLocalAddress() + "   " + "sessionIdle");
        }
    
        @Override
        public void sessionOpened(IoSession session) {
            System.out.println(session.getLocalAddress() + "   " + "sessionOpened");
        }
    
        @Override
        protected void processStreamIo(IoSession session, InputStream input,
                OutputStream output) {
            System.out.println(session.getLocalAddress() + "   "
                    + "processStreamIo");
            // 客户端发送文件
            File sendFile = new File("e:\\MyKey.jks");
            FileInputStream fis = null;
            try {
                fis = new FileInputStream(sendFile);
    
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
            // 放入线程让其执行
            // 客户端一般都用一个线程实现即可 不用线程池
            new IoStreamThreadWork(fis, output).start();
            return;
    
        }
    }
    

    在 Mina 的使用过程中,应注意以下问题:

    【1】客户端IoSession.setAttribute后,服务器IoSession.getAttribute为null,这两个session虽然getId一样,但不是同一个session

    【2】使用TextLineCodecFactory,messageReceived方法默认只能接收以换行符结束的信息

    【3】使用getAttribute时应注意判断空指针:if (session.getAttribute("name") != null && session.getAttribute("name").equals(name))
    否则不会报错,但下面的程序段会终止执行。可在exceptionCaught方法中catch到异常

    【4】每个客户端对应一个ConnectFuture,每个ConnectFuture对应一个IoSession

    【5】可以巧用session.setAttribute("loginFrame", LoginFrame.this);来进行窗口的传递,注意:如果在监听中直接传this,要注意是不是LoginFrame.this

    【6】mina不能动态修改acceptor的ProtocolCodecFilter和Handler

    【7】传输文件可通过StreamIoHandler和MessageEncoder的方式

    整个案例的源码及相关 jar 包在百度云中给出链接 Mina,读者如要运行项目,还需自行安装 MySQL 创建数据库。

    谢谢您的关注和支持!

    相关文章

      网友评论

      • a7839c598e4e:【1】客户端IoSession.setAttribute后,服务器IoSession.getAttribute为null,这两个session虽然getId一样,但不是同一个session 麻烦请教一下这个问题是怎么解决的啊
      • 香沙小熊:下载在链接失效
      • 随想先感:android客户端可以接受服务器端返回的byte字节数组吗??测试的可以接受字符串,但是如果服务器端返回的是字节数组,就不会执行messageReceived这个方法
        Louis_陆:@smile007 可以的,自己写编码器和解码器就好
      • e76edf6f881d:請問import util.MessageUtil;是使用哪個jar?
      • acdf53ad6702:试运行了一下你的项目,运行时缺少一个包slf4j-nop-1.6.4.jar,我想把这个集成到一个APP的后台服务器中去,但是发现运行中有一些小BUG。。
        acdf53ad6702:@陆嘉杰 主要是对你这个项目不是很理解。。我想改版一下丢到服务器中去,给客户端APP调用。。但是发现理解不了。。。
        Louis_陆:@G蜗牛为梦想而生 数据库也要自行准备好
      • 客观开发者:能做成WAP端吗
        Louis_陆: @抬头笑对过往 我没有尝试过,理论上可行

      本文标题:Mina网络通信框架(续)

      本文链接:https://www.haomeiwen.com/subject/keesjttx.html