Mina网络通信框架(续)

作者: Louis_陆 | 来源:发表于2016-07-13 11:09 被阅读777次

在看到本文之前,如果读者没看过笔者的前文Mina网络通信框架 ,请先翻阅。

在本节中,我将用 Mina 框架,同时实现服务器和客户端,实现客户端与客户端之间的 IM 通信,使用 Java JDBC 接口连入 MySQL 数据库,实现一个具有登录功能的简易 IM 聊天工具。

首先,我们来看看效果图:

使用 Navicat 可视化操作工具创建并连接数据库


Paste_Image.png

启动 Mina 服务器


Paste_Image.png

启动登录界面


Paste_Image.png

注册失败演示


Paste_Image.png

注册成功演示


Paste_Image.png Paste_Image.png

观察数据库,发现添加了新用户


Paste_Image.png

登录后的聊天窗口


Paste_Image.png

再登录一个用户


Paste_Image.png

发送消息演示

Paste_Image.png

下面,笔者贴出几个核心代码部分:

数据库登录处理逻辑

package database;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.mina.core.session.IoSession;

import util.MessageUtil;
import util.SessionUtil;
import common.Common;

/**
 * 管理客户端的数据库
 */
public class ClientJDBC {

    public ClientJDBC() {
        try {
            // 加载JDBC Driver
            Class.forName("com.mysql.jdbc.Driver");

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * <pre>
     * 1、查询是否已存在此用户
     * 2、注册,添加新数据
     */
    public void addNewData(String name, String password, IoSession loginSession) {
        try {
            // 建立连接
            Connection connection = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/clientdb", "root", "");

            // 创建Statement
            Statement statement = connection.createStatement();
            // 查询是否已存在此用户
            String selectSQL = "select password from client where name=" + "'"
                    + name + "'";
            ResultSet set = statement.executeQuery(selectSQL);
            // 如果不存在此用户
            if (!set.next()) {
                // 添加数据
                PreparedStatement preparedStatement = connection
                        .prepareStatement("insert into client values(null,?,?)");
                preparedStatement.setString(1, name);
                preparedStatement.setString(2, password);
                preparedStatement.executeUpdate();
                // 提示用户注册成功
                loginSession.write(Common.CANREG);
            } else {
                // 提示用户已注册
                loginSession.write(Common.CANTRE);
            }
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * <pre>
     * 1、查询帐号密码是否匹配
     * 2、查询是否用户已登录
     * 3、登录
     */
    public boolean login(String name, String password, IoSession userSession) {
        try {
            // 建立连接
            Connection connection = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/clientdb", "root", "");

            // 创建Statement
            Statement statement = connection.createStatement();

            // 查询是否已存在此用户
            String selectSQL = "select password from client where name=" + "'"
                    + name + "'";
            ResultSet set = statement.executeQuery(selectSQL);
            // 如果存在此用户
            if (set.next()) {
                // 密码匹配成功
                if (set.getString("password").equals(password)) {
                    // 如果用户没登录
                    if (SessionUtil.getSessionFromName(name) == null) {
                        // 存储用户名
                        userSession.setAttribute("name", name);
                        // 登录成功
                        String s = name;
                        String message = MessageUtil.createSendMessage(Common.LOGIN_SUCCESS, s);
                        userSession.write(message);
                        // 发送在线列表给所有客户端
                        SessionUtil.sendClientListToAll();
                    } else {
                        // 提示用户已登录
                        userSession.write(Common.IS_LOGIN);
                    }
                } else {
                    // 提示帐号和密码不匹配
                    userSession.write(Common.ISNT_MATCH);
                }
            } else {
                // 提示帐号和密码不匹配
                userSession.write(Common.ISNT_MATCH);
            }
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return true;
    }

}

服务器端Handler处理逻辑
注意,此处继承的是StreamIoHandler,为了后续对传输文件进行操作

package server;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.stream.StreamIoHandler;

import thread.IoStreamThreadWork;
import util.SessionUtil;
import common.Common;

/**
 * Mina Handler
 */
public class ServerHandler extends StreamIoHandler {

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) {
        System.out.println(session.getRemoteAddress() + "   "
                + "exceptionCaught");
        System.out.println(cause);
    }

    /**
     * mina只能接收以换行符结束的信息
     */
    @Override
    public void messageReceived(IoSession session, Object message) {
        System.out.println(session.getRemoteAddress() + "   "
                + "messageReceived");

        // 解析信息
        String result = (String) message;

        // 注册信息
        if (result.startsWith(Common.REGISTER) && result.length() > 10) {
            String suffix = result.substring(10);
            String[] array = suffix.split(",");
            MinaServer.clientJDBC.addNewData(array[0], array[1], session);
        }
        // 登录信息
        else if (result.startsWith(Common.LOGIN) && result.length() > 10) {
            String suffix = result.substring(10);
            String[] array = suffix.split(",");
            MinaServer.clientJDBC.login(array[0], array[1], session);
        }
        // 登出信息
        else if (result.equals(Common.LOGOUT) && result.length() == 6) {
            // 关闭当前session
            session.close();
            // 发送在线列表给所有客户端
            SessionUtil.sendClientListToAll();
        }
        // 用户之间发送消息
        else if (result.startsWith(Common.CLIENT) && result.length() > 10) {
            String suffix = result.substring(10);
            // 分隔选中用户名和要发送的消息
            String[] array = suffix.split(Common.SEPARATOR);
            // 向指定用户发送消息
            if (session.getAttribute("name") != null)
                SessionUtil.sendToSelectedClient(
                        (String) session.getAttribute("name"), array[0],
                        array[1]);
        }
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        System.out.println(session.getRemoteAddress() + "   " + "messageSent");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        System.out
                .println(session.getRemoteAddress() + "   " + "sessionClosed");
    }

    /**
     * 用户创建的时候保留会话的ip和端口
     */
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        System.out.println(session.getRemoteAddress() + "   "
                + "sessionCreated");
        // 提取客户端ip和端口
        String s = session.getRemoteAddress() + "";
        String port = s.substring(s.indexOf(":") + 1);
        String ip = s.substring((s.indexOf("/") + 1),
                s.length() - 1 - port.length());
        // 存储ip和端口
        if (session.getAttribute("ip") == null)
            session.setAttribute("ip", ip);
        if (session.getAttribute("port") == null)
            session.setAttribute("port", port);
    }

    /**
     * 会话空闲状态
     */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) {
        System.out.println(session.getRemoteAddress() + "   " + "sessionIdle");
    }

    @Override
    public void sessionOpened(IoSession session) {
        System.out
                .println(session.getRemoteAddress() + "   " + "sessionOpened");
    }

    @Override
    protected void processStreamIo(IoSession session, InputStream input,
            OutputStream output) {
        System.out.println(session.getRemoteAddress() + "   "
                + "processStreamIo");
        // 设定一个线程池
        // 参数说明:最少数量3,最大数量6 空闲时间 3秒
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 6, 3,
                TimeUnit.SECONDS,
                // 缓冲队列为3
                new ArrayBlockingQueue<Runnable>(3),
                // 抛弃旧的任务
                new ThreadPoolExecutor.DiscardOldestPolicy());
        FileOutputStream fos = null;
        // 此处路径如何动态设定。
        File receiveFile = new File("F:\\111.jks");

        try {
            fos = new FileOutputStream(receiveFile);
        } catch (FileNotFoundException e1) {
            e1.printStackTrace();
        } finally {
            try {
                fos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        // 将线程放入线程池 当连接很多时候可以通过线程池处理
        threadPool.execute(new IoStreamThreadWork(input, fos));
    }
}

客户端Handler处理逻辑

package client;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.OutputStream;

import javax.swing.DefaultListModel;
import javax.swing.JOptionPane;
import javax.swing.JTextArea;

import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.stream.StreamIoHandler;

import thread.IoStreamThreadWork;

import common.Common;

import frame.ChatFrame;
import frame.LoginFrame;

/**
 * Mina Handler
 */
// IoHandlerAdapter
public class ClientHandler extends StreamIoHandler {

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) {
        System.out.println(session.getLocalAddress() + "   "
                + "exceptionCaught");
        System.out.println(cause);
    }

    /**
     * mina只能接收以换行符结束的信息
     */
    @Override
    public void messageReceived(IoSession session, Object message) {
        System.out.println(session.getLocalAddress() + "   "
                + "messageReceived");

        // 解析信息
        String result = (String) message;

        // 注册成功
        if (result.equals(Common.CANREG) && result.length() == 6) {
            JOptionPane.showMessageDialog(null, "注册成功!");
        }
        // 用户已存在注册失败
        else if (result.equals(Common.CANTRE) && result.length() == 6) {
            JOptionPane.showMessageDialog(null, "用户已存在!");
        }
        // 用户已登录,登录失败
        else if (result.equals(Common.IS_LOGIN) && result.length() == 6) {
            JOptionPane.showMessageDialog(null, "用户已登录!");
            session.close();// 关闭这个Session
        }
        // 帐号和密码不匹配,登录失败
        else if (result.equals(Common.ISNT_MATCH) && result.length() == 6) {
            JOptionPane.showMessageDialog(null, "帐号和密码不匹配!");
            session.close();// 关闭这个Session
        }
        // 登录成功
        else if (result.startsWith(Common.LOGIN_SUCCESS)
                && result.length() > 10) {
            String suffix = result.substring(10);
            // 从Attribute中取出登录窗口
            LoginFrame frame = (LoginFrame) session.getAttribute("loginFrame");
            // 关闭登录窗口
            frame.dispose();
            // 打开聊天窗口
            new ChatFrame(session).setTitle("用户" + suffix + "的聊天窗口");
        }
        // 服务器发送在线列表消息
        else if (result.startsWith(Common.SERVER) && result.length() > 10) {
            String suffix = result.substring(10);
            String[] array = suffix.split(",");
            // 判断空,因为有loginSession
            if (session.getAttribute("clientListModel") != null) {
                // 取出clientListModel
                DefaultListModel<String> clientListModel = (DefaultListModel<String>) (session
                        .getAttribute("clientListModel"));
                // 清空原视图
                clientListModel.removeAllElements();
                // 重新加入在线用户列表
                for (String clientName : array)
                    clientListModel.addElement(clientName);
            }
        }
        // 客户发送过来的消息
        else if (result.startsWith(Common.CLIENT) && result.length() > 10) {
            String suffix = result.substring(10);
            // if (session.getAttribute("receivedMessage") != null)
            JTextArea jta = (JTextArea) session.getAttribute("receivedMessage");
            jta.append(suffix + "\n");
            // 滚动到底端
            jta.setCaretPosition(jta.getText().length());
        }
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        System.out.println(session.getLocalAddress() + "   " + "messageSent");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        System.out.println(session.getLocalAddress() + "   " + "sessionClosed");
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        System.out
                .println(session.getLocalAddress() + "   " + "sessionCreated");
    }

    /**
     * 会话空闲状态
     */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) {
        System.out.println(session.getLocalAddress() + "   " + "sessionIdle");
    }

    @Override
    public void sessionOpened(IoSession session) {
        System.out.println(session.getLocalAddress() + "   " + "sessionOpened");
    }

    @Override
    protected void processStreamIo(IoSession session, InputStream input,
            OutputStream output) {
        System.out.println(session.getLocalAddress() + "   "
                + "processStreamIo");
        // 客户端发送文件
        File sendFile = new File("e:\\MyKey.jks");
        FileInputStream fis = null;
        try {
            fis = new FileInputStream(sendFile);

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        // 放入线程让其执行
        // 客户端一般都用一个线程实现即可 不用线程池
        new IoStreamThreadWork(fis, output).start();
        return;

    }
}

在 Mina 的使用过程中,应注意以下问题:

【1】客户端IoSession.setAttribute后,服务器IoSession.getAttribute为null,这两个session虽然getId一样,但不是同一个session

【2】使用TextLineCodecFactory,messageReceived方法默认只能接收以换行符结束的信息

【3】使用getAttribute时应注意判断空指针:if (session.getAttribute("name") != null && session.getAttribute("name").equals(name))
否则不会报错,但下面的程序段会终止执行。可在exceptionCaught方法中catch到异常

【4】每个客户端对应一个ConnectFuture,每个ConnectFuture对应一个IoSession

【5】可以巧用session.setAttribute("loginFrame", LoginFrame.this);来进行窗口的传递,注意:如果在监听中直接传this,要注意是不是LoginFrame.this

【6】mina不能动态修改acceptor的ProtocolCodecFilter和Handler

【7】传输文件可通过StreamIoHandler和MessageEncoder的方式

整个案例的源码及相关 jar 包在百度云中给出链接 Mina,读者如要运行项目,还需自行安装 MySQL 创建数据库。

谢谢您的关注和支持!

相关文章

网友评论

  • a7839c598e4e:【1】客户端IoSession.setAttribute后,服务器IoSession.getAttribute为null,这两个session虽然getId一样,但不是同一个session 麻烦请教一下这个问题是怎么解决的啊
  • 香沙小熊:下载在链接失效
  • 随想先感:android客户端可以接受服务器端返回的byte字节数组吗??测试的可以接受字符串,但是如果服务器端返回的是字节数组,就不会执行messageReceived这个方法
    Louis_陆:@smile007 可以的,自己写编码器和解码器就好
  • e76edf6f881d:請問import util.MessageUtil;是使用哪個jar?
  • acdf53ad6702:试运行了一下你的项目,运行时缺少一个包slf4j-nop-1.6.4.jar,我想把这个集成到一个APP的后台服务器中去,但是发现运行中有一些小BUG。。
    acdf53ad6702:@陆嘉杰 主要是对你这个项目不是很理解。。我想改版一下丢到服务器中去,给客户端APP调用。。但是发现理解不了。。。
    Louis_陆:@G蜗牛为梦想而生 数据库也要自行准备好
  • 客观开发者:能做成WAP端吗
    Louis_陆: @抬头笑对过往 我没有尝试过,理论上可行

本文标题:Mina网络通信框架(续)

本文链接:https://www.haomeiwen.com/subject/keesjttx.html