美文网首页
java之NIO处理UDP收发

java之NIO处理UDP收发

作者: PuHJ | 来源:发表于2019-03-07 16:34 被阅读0次

本文大纲如下:

  • 1、写作背景
  • 2、基本的UDP包收发用法
  • 3、采用NIO方式处理UDP

一、背景

本篇内容,主要来源是在对公司代码重构。公司一个项目是采用UDP方式通信,在UDP的不可靠基础上,封装成可靠的通信协议。其本质是UDP+协议的方式,因今天的重点是UDP通信,所以只讲解UDP模块。由于APP有N个的通信对象,之前的代码中,也就有了N个线程监听接收的消息,N个线程发送消息。这样就会使用大量的线程,而且监听的线程一直处于阻塞状态,效率低下。在这种情况下,也就有必要对此模块进行重构了。

二、基本的UDP包收发用法

这也是公司之前的用法,比较简单粗暴,好处是开发成本低,但后期业务增加的时候,性能会有所下降

对于收发UDP包,需要localIp + localPort + remoteIp + remotePort,属于端对端的通信

1)、UDP发送数据
   public static void Send(byte[] data, int offset, int length, int localPort, InetAddress remoteAddress, 
                                              int remotePort) throws Exception {
        if (remoteAddress == null || remotePort <= 0) {
            throw new Exception("Null remote address !!!");
        }

        if (data == null || offset < 0 || length <= 0) {
            throw new Exception("null send data !!!");
        }

        // 会分配一个可用的本地端口
        DatagramSocket socket = new DatagramSocket(null);
       // 多个UDP socket绑定相同的端口
        socket.setReuseAddress(true);
       // 绑定本地端口
        socket.bind(new InetSocketAddress(localPort));

        // 封装成Packet
        DatagramPacket packet = new DatagramPacket(data, offset, length, remoteAddress, remotePort);
        socket.send(packet);

        socket.close();
    }

发送UDP包流程:

  • 构建DatagramSocket
  • 绑定本地发送端口
  • 构建发送的UDP数据包
  • 发送
  • 关闭Socket
2)、UDP接收数据

DatagramSocket socket = new MulticastSocket(null);
socket.setReuseAddress(true);
socket.bind(new InetSocketAddress(listenPort));

protected Runnable listenLoop = new Runnable() {
        @Override
        public void run() {
            byte[] receiveBuffer = new byte[1024];
            DatagramPacket packet = new DatagramPacket(receiveBuffer, receiveBuffer.length);

            while (listenRunning) {
                if (socket != null && !socket.isClosed()) {
                    try {
                        socket.receive(packet);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

接收UDP包流程:

  • 构建DatagramSocket
  • 绑定本地发送端口
  • 构建接收的UDP数据包
  • socket.receive(packet);
  • 关闭Socket

三、NIO重构UDP收发模块

1)、思路

NIO是同步非阻塞方式,将DatagramChannel向Selector选择器注册,使用一个Thread轮询Selector,当网卡准备数据时,就能告知用户开始处理发送或接收事件。总之,一切的数据发送和接收前,都得到Selector注册,得到了Selector的“允许”后,才能处理后续的工作。


接收和发送

2)、核心代码

// 发送接口
public interface Sender extends Closeable {

    // 触发异步的发送请求
    boolean postSendAsync() throws IOException;

    void send(String message,InetSocketAddress remoteAddress);
}
// 接收接口
public interface Receiver extends Closeable {

    // 触发异步的接收请求
    boolean postReceiveAsync() throws IOException;

    // 开始监听
    void start();
}
// 用于Channel向Selector注册
public interface IoProvider extends Closeable {

    boolean registerInput(DatagramChannel channel, HandleProviderCallback callback);

    boolean registerOutput(DatagramChannel channel, HandleProviderCallback callback);

    void unRegisterInput(DatagramChannel channel);

    void unRegisterOutput(DatagramChannel channel);


    abstract class HandleProviderCallback implements Runnable {

        @Override
        public final void run() {
            onProviderIo();
        }

        /**
         * 可以进行接收或者发送时的回调
         *
         */
        protected abstract void onProviderIo();

    }

}

// 实现了Sender和Receiver
class DatagramChannelAdapter implements Sender,Receiver,Closeable {

    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final AtomicBoolean isSending = new AtomicBoolean();
    private final DatagramChannel channel;
    private final IoProvider ioProvider;
    private final UdpDataDispatcher dispatcher;

    private final Queue<UDPSendSnapshot> queue = new ConcurrentLinkedQueue<>();
    private final ReceiveUdpListener receiverUdpListener;

    DatagramChannelAdapter(DatagramChannel channel, IoProvider ioProvider, ReceiveUdpListener receiverUdpListener) throws IOException {
        this.channel = channel;
        this.ioProvider = ioProvider;
        this.receiverUdpListener = receiverUdpListener;
        dispatcher = new UdpDataDispatcher(channel);

        // 非阻塞模式下操作
        channel.configureBlocking(false);
    }

    @Override
    public boolean postReceiveAsync() throws IOException {
        if (isClosed.get()) {
            throw new IOException("Current channel is closed!");
        }

        // 注册能不能输入
        return ioProvider.registerInput(channel, inputCallback);
    }

    @Override
    public void start() {
        try {
            postReceiveAsync();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public boolean postSendAsync() throws IOException {
        if (isClosed.get()) {
            throw new IOException("Current channel is closed!");
        }

        // 当前发送的数据附加到回调中
        return ioProvider.registerOutput(channel, outputCallback);
    }

    @Override
    public void send(String message,InetSocketAddress remoteAddress) {
        queue.offer(new UDPSendSnapshot(message,remoteAddress));
        requestSend();
    }

    private void requestSend() {
        if (isSending.compareAndSet(false,true) ) {
            if (queue.size() <= 0){
                isSending.set(false);
                return;
            }
            try {
                if (!postSendAsync()) {
                    isSending.set(false);
                }
            } catch (IOException e) {
                e.printStackTrace();
                CloseUtils.close(this);
            }
        }

    }

    @Override
    public void close() throws IOException {
        if (isClosed.compareAndSet(false, true)) {
            // 解除注册回调
            ioProvider.unRegisterInput(channel);
            ioProvider.unRegisterOutput(channel);
            // 关闭
            CloseUtils.close(channel);
        }
    }

    // 输入的数据操作
    private final IoProvider.HandleProviderCallback inputCallback = new IoProvider.HandleProviderCallback() {
        @Override
        protected void onProviderIo() {


            if (isClosed.get()) {
                return;
            }
            System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!inputCallback");
            ReceiveUdpData receiveUdp = dispatcher.receive();

            try {
                if (receiveUdp == null) {
                    throw new IOException();
                }
                postReceiveAsync();
                receiverUdpListener.onReceiveUdpListener(receiveUdp.getBytes(),receiveUdp.getTotal(),receiveUdp.getAddress(),receiveUdp.getPort());
            } catch (IOException e) {
                CloseUtils.close(DatagramChannelAdapter.this);
            }
        }
    };

    // 输出的数据操作
    private final IoProvider.HandleProviderCallback outputCallback = new IoProvider.HandleProviderCallback() {
        @Override
        protected void onProviderIo() {
            if (isClosed.get() || queue.size() == 0) {
                return;
            }
            System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!outputCallback");
            synchronized (isSending) {
                UDPSendSnapshot snapshot = queue.poll();
                dispatcher.sendMessage(snapshot.getMessage(),snapshot.getRemoteAddress());
                isSending.set(false);
            }
        }
    };


    /**
     * 收到监听UDP消息之后的回调
     */
    interface ReceiveUdpListener {
        void onReceiveUdpListener(byte[] data, int length, InetSocketAddress address, int port);
    }
}
public class IoSelectorProvider implements IoProvider {

    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    // 是否处于某个过程
    private final AtomicBoolean inRegInput = new AtomicBoolean(false);
    private final AtomicBoolean inRegOutput = new AtomicBoolean(false);

    // 读和写的数据选择器
    private final Selector readSelector;
    private final Selector writeSelector;
    private final ExecutorService dataHandlePool;

    private final HashMap<SelectionKey, Runnable> inputCallbackMap = new HashMap<>();
    private final HashMap<SelectionKey, Runnable> outputCallbackMap = new HashMap<>();


    public IoSelectorProvider() throws IOException {
        readSelector = Selector.open();
        writeSelector = Selector.open();
        dataHandlePool = Executors.newFixedThreadPool(4,
                new Factory.NameableThreadFactory("IoProvider-Thread-"));

        // 开始输出输入的监听
        startRead();
        startWrite();
    }

    private void startRead() {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                while (!isClosed.get()) {
                    try {
                        if (readSelector.select() == 0) {
                            waitSelection(inRegInput);
                            continue;
                        } else if (inRegInput.get()) {
                            waitSelection(inRegInput);
                        }

                        Set<SelectionKey> selectionKeys = readSelector.selectedKeys();
                        Iterator<SelectionKey> iterator = selectionKeys.iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            if (selectionKey.isValid()) {
                                // 对应着下面的两种形式 可读
                                System.out.println("可读的回调");
                                handleSelection(selectionKey,
                                        SelectionKey.OP_READ, inputCallbackMap, dataHandlePool, inRegInput);

                            }
                            iterator.remove();
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (ClosedSelectorException ignored) {
                        break;
                    }
                }
            }
        };

        // 启动线程
        new Thread(runnable)
                .start();
    }

    private void startWrite() {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                while (!isClosed.get()) {
                    try {
                        if (writeSelector.select() == 0) {
                            waitSelection(inRegOutput);
                            continue;
                        } else if (inRegOutput.get()) {
                            waitSelection(inRegOutput);
                        }

                        Set<SelectionKey> selectionKeys = writeSelector.selectedKeys();
                        Iterator<SelectionKey> iterator = selectionKeys.iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            if (selectionKey.isValid()) {

                                // 可写
                                if (selectionKey.isWritable()) {
                                    System.out.println("可写的回调");
                                    handleSelection(selectionKey,
                                            SelectionKey.OP_WRITE, outputCallbackMap, dataHandlePool, inRegOutput);
                                }

                            }
                            iterator.remove();
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (ClosedSelectorException ignored) {
                        break;
                    }
                }
            }
        };

        // 启动线程
        new Thread(runnable)
                .start();
    }


    private static void handleSelection(SelectionKey key, int keyOps,
                                        HashMap<SelectionKey, Runnable> map,
                                        ExecutorService pool, AtomicBoolean locker) {
        //noinspection SynchronizationOnLocalVariableOrMethodParameter
        synchronized (locker) {
            try {
                // 重点
                // 取消继续对keyOps的监听
                key.interestOps(key.readyOps() & ~keyOps);
            } catch (CancelledKeyException e) {
                return;
            }
        }

        Runnable runnable = null;
        try {
            runnable = map.get(key);
        } catch (Exception ignored) {

        }

        if (runnable != null && !pool.isShutdown()) {
            // 异步调度
            pool.execute(runnable);
        }
    }


    @Override
    public boolean registerInput(DatagramChannel channel, HandleProviderCallback callback) {
        return registerSelection(channel, readSelector, SelectionKey.OP_READ, inRegInput,
                inputCallbackMap, callback) != null;
    }

    @Override
    public boolean registerOutput(DatagramChannel channel, HandleProviderCallback callback) {
        return registerSelection(channel, writeSelector, SelectionKey.OP_WRITE, inRegOutput,
                outputCallbackMap, callback) != null;
    }

    @Override
    public void unRegisterInput(DatagramChannel channel) {
        unRegisterSelection(channel, readSelector, inputCallbackMap, inRegInput);
    }

    @Override
    public void unRegisterOutput(DatagramChannel channel) {
        unRegisterSelection(channel, writeSelector, outputCallbackMap, inRegOutput);
    }

    private static SelectionKey registerSelection(DatagramChannel channel, Selector selector,
                                                  int registerOps, AtomicBoolean locker,
                                                  HashMap<SelectionKey, Runnable> map,
                                                  Runnable runnable) {

        //noinspection SynchronizationOnLocalVariableOrMethodParameter
        synchronized (locker) {
            // 设置锁定状态
            locker.set(true);

            try {
                // 唤醒当前的selector,让selector不处于select()状态
                selector.wakeup();

                SelectionKey key = null;
                if (channel.isRegistered()) {
                    // 查询是否已经注册过
                    key = channel.keyFor(selector);

                }

                if (key != null) {
                    key.interestOps(key.readyOps() | registerOps);
                }

                if (key == null) {
                    // 注册selector得到Key
                    key = channel.register(selector, registerOps);
                    // 注册回调
                    map.put(key, runnable);
                }

                return key;
            } catch (ClosedChannelException
                    | CancelledKeyException
                    | ClosedSelectorException e) {
                e.printStackTrace();
                return null;
            } finally {
                // 解除锁定状态
                locker.set(false);
                try {
                    // 通知
                    locker.notify();
                } catch (Exception ignored) {
                }
            }
        }
    }

    private static void unRegisterSelection(DatagramChannel channel, Selector selector,
                                            Map<SelectionKey, Runnable> map,
                                            AtomicBoolean locker) {
        //noinspection SynchronizationOnLocalVariableOrMethodParameter
        synchronized (locker) {
            locker.set(true);
            selector.wakeup();
            try {
                if (channel.isRegistered()) {
                    SelectionKey key = channel.keyFor(selector);
                    if (key != null) {
                        // 取消监听的方法
                        key.cancel();
                        map.remove(key);
                    }
                }
            } finally {
                locker.set(false);
                try {
                    locker.notifyAll();
                } catch (Exception ignored) {
                }
            }
        }
    }

    private static void waitSelection(final AtomicBoolean locker) {
        //noinspection SynchronizationOnLocalVariableOrMethodParameter
        synchronized (locker) {
            if (locker.get()) {
                try {
                    locker.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void close() throws IOException {
        if (isClosed.compareAndSet(false, true)) {
            dataHandlePool.shutdown();

            inputCallbackMap.clear();
            outputCallbackMap.clear();

            CloseUtils.close(readSelector);
        }
    }
}

相关文章

网友评论

      本文标题:java之NIO处理UDP收发

      本文链接:https://www.haomeiwen.com/subject/rzpcpqtx.html