美文网首页
并行模式与算法(二)

并行模式与算法(二)

作者: AaronSimon | 来源:发表于2018-09-18 22:09 被阅读0次

    1.矩阵算法

    在矩阵乘法中,第一个矩阵的列数和第二个矩阵的行数必须是相同的。如果需要进行并行计算,一种简单的策略是可以将A矩阵进行水平分割,得到子矩阵A1和A2,B矩阵进行垂直分割,得到子矩阵B1和B2。此时,我们只要分别计算这些子矩阵的乘积,将结果进行拼接,就能得到原始矩阵A和B的乘积。当然这个过程是可额予以反复进行的。为了计算A1*A2,我们可以进一步将A1和B1进行分解,直到我们认为子矩阵的大小已经在可接受的范围内。

    在这里我们使用FockJoin框架来实现这个并行矩阵相乘的想法。为了方便矩阵计算,我们使用jMatrces开源软件,作为矩阵计算的工具。其中,使用的主要API如下:

    • Matrix:代表一个矩阵
    • MatrixOperator.multiply(Matrix, Matrix):矩阵相乘
    • Matrix.row():获得矩阵的行数
    • Matrix.getSubMatrix():获得矩阵的子矩阵
    • MatrixOperator.horizontalConcatenation(Matrix, Matrix):将两个矩阵进行水平连接
    • MatrixOperator.verticalConcatenation(Matrix, Matrix):将两个矩阵进行垂直连接

    定义一个任务类计算矩阵乘法,如果输入的矩阵粒度太大,则会再次进行任务分解:

    public class MatrixMulTask extends RecursiveTask<Matrix> {
        Matrix m1;
        Matrix m2;
        String pos;
        /**
         * 构造函数
         * @parm m1 矩阵1
         * @parm m2 矩阵2
         * @parm pos 乘积结果在父矩阵相乘结果中所处的位置
         */
        public MatrixMulTask(Matrix m1,Matrix m2,String pos) {
            this.m1 = m1;
            this.m2 = m2;
            this.pos = pos;
        }
    
        @Override
        protected Matrix compute() {
            if(m1.rows() <= MatrixMulTask.granularity || m2.cols()<=MatrixMulTask.granularity) {
                Matrix mRe = MatrixOperator.multiply(m1, m2);
                return mRe;
            } else {
                //继续分割矩阵
                int rows;
                rows = m1.rows();
                //左乘矩阵横向分割
                Matrix m11 = m1.getSubMatrix(1, 1, rows/2, m1.cols());
                Matrix m12 = m1.getSubMatrix(rows/2+1, 1, m1.rows(), m1.cols());
                //右乘矩阵纵向分割
                Matrix m21 = m2.getSubMatrix(1, 1, m2.rows(), m12.cols()/2);
                Matrix m22 = m2.getSubMatrix(1, m2.cols()/2+1, m2.rows(), m2.cols());
    
                ArrayList<MatrixMulTask> subTasks = new ArrayList<MatrixMulTask>();
                MatrixMulTask tmp = null;
                tmp = new MatrixMulTask(m11, m21, "m1");
                subTasks.add(tmp);
                tmp = new MatrixMulTask(m11, m22, "m2");
                subTasks.add(tmp);
                tmp = new MatrixMulTask(m12, m21, "m3");
                subTasks.add(tmp);
                tmp = new MatrixMulTask(m12, m22, "m4");
                subTasks.add(tmp);
                for(MatrixMulTask t : subTasks) {
                    t.fork();
                }
                Map<String, Matrix> matrixMap = new HashMap<String,Matrix>();
                for(MatrixMulTask t :subTasks) {
                    matrixMap.put(t.pos, t.join());
                }
                Matrix tmp1 = MatrixOperator.horizontalConcatenation(matrixMap.get("m1"), matrixMap.get("m2"));
                Matrix tmp2 = MatrixOperator.horizontalConcatenation(matrixMap.get("m3"), matrixMap.get("m4"));
                Matrix reM = MatrixOperator.verticalConcatenation(tmp1, tmp2);
                return reM;
            }
        }
    }
    

    MatrixMulTask中的成员变量m1和m2表示要相乘的两个矩阵,pos表示这个乘积结果在父矩阵相乘结果中所处的位置,有m1,m2,m3,和m4等四种。先对矩阵进行分割,分割后得到m11、m12、m21和m22等四个任务,并将它们进行子任务的创建。然后计算这些子任务,最后将m1,m2,m3,和m4拼接成新的矩阵作为最终结果。

    主函数:

    public static final int granularity = 3;
    public static void main(String[] args) throws InterruptedException, ExecutionException {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            //创建两个300*300的随机矩阵
            Matrix m1 = MatrixFactory.getRandomMatrix(100, 100, null);
            Matrix m2 = MatrixFactory.getRandomMatrix(100, 100, null);
            MatrixMulTask task = new MatrixMulTask(m1, m2, null);
            ForkJoinTask<Matrix> result = forkJoinPool.submit(task);
            Matrix pr = result.get();
            System.out.println(pr);
        }
    

    二、网络NIO

    Java NIO是New IO的简称。其涉及到的基础内容有通道(Channel)和缓冲区(Buffer)、文件IO和网络IO。

    2.1 基于Socket的服务端的多线程模式

    这里,以Echo服务器为例。对于Echo服务器,它会读取客户端的一个输入,并将这个输入原封不动地返回给客户端。服务器会为每一个客户端连接启用一个线程,这个新的线程将全心全意为这个客户端服务。为了接受客户端连接,服务器还会额外使用一个派发线程。下面是服务端代码:

    public class MultiThreadEchoServer {
        // 使用线程池处理每个客户端连接
        private static ExecutorService tp = Executors.newCachedThreadPool();
        //定义了HandleMsg线程,它由一个客户端Socket构成,它的任务是读取这个Socket的内容并
        //将其进行返回,返回成功后,任务完成,客户端Socket就被正常关闭
        static class HandleMsg implements Runnable {
            Socket clientSocket;
            public HandleMsg(Socket clientSocket) {
                this.clientSocket = clientSocket;
            }
    
            @Override
            public void run() {
                BufferedReader is = null;
                PrintWriter os = null;
                try {
                    is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                    os = new PrintWriter(clientSocket.getOutputStream(),true);
                    String inputLine = null;
                    long b = System.currentTimeMillis();
                    while((inputLine = is.readLine()) != null) {
                        os.println(inputLine);
                    }
                    // 统计并输出了服务端线程处理一次客户端请求所花费的时间(包括读取数据和回写数据的时间)
                    long e = System.currentTimeMillis();
                    System.out.println("spend:" + (e-b)+"ms");
    
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        if(is!=null) is.close();
                        if(os!=null) os.close();
                        clientSocket.close();
                    } catch (Exception e2) {
                        e2.printStackTrace();
                    }
                }
            }
        }
    
        public static void main(String[] args) {
            ServerSocket echoServer = null;
            Socket clientSocket = null;
            try {
                echoServer = new ServerSocket(8000);
            } catch (Exception e) {
                System.out.println(e);
            }
    
            while(true) {
                try {
                    //客户端连接,创建HandleMsg线程进行处理
                    clientSocket = echoServer.accept();
                    System.out.println(clientSocket.getRemoteSocketAddress() + " connect!");
                    tp.execute(new HandleMsg(clientSocket));
                } catch (Exception e) {
                    System.out.println(e);
                }
            }
        }
    }
    

    定义一个重量级的客户端:

    
    public class HeavySocketClient {
        private static ExecutorService tp = Executors.newCachedThreadPool();
        private static final int sleep_time = 1000*1000*1000;
        public static class EchoClient implements Runnable {
            @Override
            public void run() {
                Socket client = null;
                PrintWriter writer = null;
                BufferedReader reader = null;
                try {
                    client = new Socket();
                    client.connect(new InetSocketAddress("localhost",8000));
                    writer = new PrintWriter(client.getOutputStream(),true);
                    writer.write("H");
                    LockSupport.parkNanos(sleep_time);
                    writer.write("e");
                    LockSupport.parkNanos(sleep_time);
                    writer.write("l");
                    LockSupport.parkNanos(sleep_time);
                    writer.write("l");
                    LockSupport.parkNanos(sleep_time);
                    writer.write("o");
                    LockSupport.parkNanos(sleep_time);
                    writer.write("!");
                    LockSupport.parkNanos(sleep_time);
                    writer.println();
                    writer.flush();
    
                    reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
                    System.out.println("from server:" + reader.readLine());
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        if(writer!=null)writer.close();
                        if(reader!=null)reader.close();
                        if(client!=null)client.close();
                    } catch (Exception e2) {
                    }
                }
            }
        }
        public static void main(String[] args) {
            EchoClient ec = new EchoClient();
            for(int i=0; i<10; i++) {
                tp.execute(ec);
            }
        }
    }
    

    上述代码定义了的客户端,它会进行10次请求。每一次请求都会访问8000端口。连接成功后,会向服务器输出“Hello!”字符串,但是在这一次交互中,客户端会延时进行输出,每次只输出一个字符,之后进行1秒的等待。因此,整个过程会持续6秒。

    对于服务端来说,每一个请求的处理时间都在6秒左右。这很容易理解,因为服务器要先读入客户端的输入,而客户端缓慢的处理速度(也可能是一个拥塞的网络环境)使得服务器花费了不少等待时间。在这个案例中,服务器处理请求之所以慢,并不是因为在服务器端有繁重的任务,而仅仅是因为服务线程在等待IO。

    2.2 使用NIO进行网络编程

    在NIO中的一个关键组件Channel(通道)。Channel有点类似于流,一个Channel可以和文件或者网络Socket对应。如果Channel对应着一个Socket,那么往这个Channel中写数据,就等同于向Socket中写入数据。

    和Channel一起使用的另一个重要组件就是Buffer。可以简单地把Buffer理解成一个内存区域或者byte数组。数据需要包装成Buffer的形式才能和Channel交互(写入或者读取)。

    另一个与Channel密切相关的是Selector(选择器)。在Channel的众多实现中,有一个SelectableChannel实现,表示可被选择的通道。任何一个SelectableChannel都可以将自己注册到一个Selector中。这样,这个Channel就能被Selector所管理。而一个Selector可以管理多个SelectableChannel。当SelectableChannel的数据准备好时,Selector就会接到通知,得到那些已经准备好的数据。而SocketChannel就是SelectableChannel的一种。

    一个Selector可以由一个线程进行管理,而一个SelectableChannel则可以表示一个客户端连接,因此这就构成由一个或者极少数线程,来处理大量客户端连接的结构。当与客户端连接的数据没有准备好时,Selector会处于等待状态,而一旦有任何一个SelectableChannel准备好了数据,Selector就能立即得到通知,获取数据进行处理。

    下面是NIO服务器的核心代码:

    //处理所有的网络连接
    private Selector selector;
    //线程池针对每个客户端进行相应处理
    private ExecutorService tp = Executors.newCachedThreadPool();
    //用于统计服务器线程在一个客户端上花费了多少时间
    public static Map<Socket, Long> time_stat = new HashMap<Socket, Long>(10240);
    // 启动NIO Server
    private void startServer() throws Exception {
        // 通过工厂方法获得一个Selector对象的实例
        selector = SelectorProvider.provider().openSelector();
        // 获得表示服务端的SocketChannel实例
        ServerSocketChannel ssc = ServerSocketChannel.open();
        // 将这个SocketChannel设置为非阻塞模式
        // 在这种模式下,我们才可以向Channel注册感兴趣的事件,并且在数据准备好时,得到必要的通知
        ssc.configureBlocking(false);
    
        InetSocketAddress isa = new InetSocketAddress("localhost", 8000);
        // 将这个CHannel绑定到8000端口
        ssc.socket().bind(isa);
        // 将这个ServerSocketChannel绑定到Selector上,并注册它感兴趣的时间为Accept
        // 当Selector发现ServerSocketChannel有新的客户端连接时,就会通知ServerSocketChannel进行处理。
        // 方法register()返回值是一个SelectionKey,SelectionKey表示一对Selector和Channel的关系。
        // 当Channel注册到Selector上时,就相当于确定了两者的服务关系,那么SelectionKey就是这个契约。
        // 当Selector或者Channel被关闭时,它们对应的SelectionKey就会失败
        SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
        //无穷循环,它的主要任务就是等待-分发网络消息
        for(;;) {
            // select()方法是一个阻塞方法。如果当前没有任何数据准备好,它就会等待。一旦有数据可读,
            // 它就会返回。它的返回值是已经准备就绪的SelectionKey的数量。这里简单地将其忽略。
            selector.select();
            // 获取那些准备好的SelectionKey。因为Selector同时为多个Channel服务,因此已经准备就绪的Channel就有可能是多个。
            Set readyKeys = selector.selectedKeys();
            Iterator i = readyKeys.iterator();
            long e = 0;
            // 使用迭代器遍历整个集合
            while(i.hasNext()) {
                // 根据迭代器获得一个集合内的SelectionKey实例
                SelectionKey sk = (SelectionKey)i.next();
                // 将这个元素移除!注意,这个非常重要,否则就会重复处理相同的SelectionKey。
                i.remove();
                //当前SelectionKey所代表的Channel是否在Acceptable状态
                if(sk.isAcceptable()) {
                    // 客户端的接收
                    doAccept(sk);
                }
                //判断Channel是否已经可以读了
                else if(sk.isValid() && sk.isReadable()) {
                    // 为了统计系统处理每一个连接的时间,记录在读取数据之前的一个时间戳。
                    if(!time_stat.containsKey(((SocketChannel)sk.channel()).socket()))
                        time_stat.put(((SocketChannel)sk.channel()).socket(), System.currentTimeMillis());
                        // 读取
                    doRead(sk);
                }
                // 判断通道是否准备好进行写
                else if(sk.isValid() && sk.isWritable()) {
                    // 写
                    doWrite(sk);
                    e = System.currentTimeMillis();
                    long b = time_stat.remove(((SocketChannel)sk.channel()).socket());
                    System.out.println("spend:" + (e-b) +"ms");
                }
            }
        }
    }
    

    doAccept()方法,它与客户端建立连接:

    private void doAccept(SelectionKey sk) {
        ServerSocketChannel server = (ServerSocketChannel)sk.channel();
        SocketChannel clientChannel;
        try {
            // 生成的clientChannel就表示和客户端通信的通道
            clientChannel = server.accept();
            // 将这个Channel配置为非阻塞模式,也就是要求系统在准备好IO后,再通知我们的线程来读取或者写入。
            clientChannel.configureBlocking(false);
            //将新生成的Channel注册到selector选择器上,并告诉Selector,我现在对读(OP_READ)操作感兴趣。这样,
            //当Selector发现这个Channel已经准备好读时,就能给线程一个通知。
            SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);
            //新建一个对象实例,一个EchoClient实例代表一个客户端
        EchoClient echoClient = new EchoClient();
            // 我们将这个客户端实例作为附件,附加到表示这个连接的SelectionKey上。这样在整个连接的处理过程中,
            // 我们都可以共享这个EchoClient实例
            clientKey.attach(echoClient);
    
            InetAddress clientAddress = clientChannel.socket().getInetAddress();
            System.out.println("Accepted connection from " + clientAddress.getHostAddress() + ".");
        } catch (Exception e) {
            System.out.println("False to accept new client.");
            e.printStackTrace();
        }
    }
    

    EchoClient的定义很简单,它封装了一个队列,保存在需要回复给这个客户端的所有信息,这样,再进行回复时,只要outq对象中弹出元素即可。

    public class EchoClient {
        private LinkedList<ByteBuffer> outq;
        EchoClient() {
            outq = new LinkedList<ByteBuffer>();
        }
    
        public LinkedList<ByteBuffer> getOutQueue() {
            return outq;
        }
    
        public void enqueue(ByteBuffer bb) {
            outq.addFirst(bb);
        }
    }
    

    当Channel可以读取时,doRead()方法就会被调用:

    private void doRead(SelectionKey sk) {
        // 得到当前的客户端Channel
        SocketChannel channel = (SocketChannel)sk.channel();
        // 准备8K的缓冲区读取数据
        ByteBuffer bb = ByteBuffer.allocate(8192);
        int len;
    
        try {
            // 所有读取的数据存放在变量bb中
            len = channel.read(bb);
            if(len < 0) {
                disconnect(sk);
                return;
            }
        } catch (Exception e) {
            System.out.println("Failed to read from client.");
            e.printStackTrace();
            disconnect(sk);
            return;
        }
        // 重置缓冲区,为数据处理做准备
        bb.flip();
        tp.execute(new HandleMsg(sk, bb, selector));
    }
    

    为了模拟复杂的场景,使用了线程池进行数据处理。这样,如果数据处理很复杂,就能在单独的线程中进行,而不用阻塞任务派发线程。HandleMsg的实现也很简单:

    public class HandleMsg implements Runnable {
        SelectionKey sk;
        ByteBuffer bb;
        Selector selector;
    
        public HandleMsg(SelectionKey sk, ByteBuffer bb, Selector selector) {
            this.sk = sk;
            this.bb = bb;
            this.selector = selector;
        }
    
        @Override
        public void run() {
            EchoClient echoClient = (EchoClient)sk.attachment();
            echoClient.enqueue(bb);
            sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            // 强迫selector立即返回
            selector.wakeup();
        }
    }
    

    上述代码中,简单地将接收到的数据压入EchoClient的队列。如果需要处理业务逻辑,就可以在这里进行处理。在数据处理完成后,就可以准备将结果写回到客户端,因此,重新注册感兴趣的消息事件,将写操作(OP_WRITE)也作为感兴趣的事件进行提交。这样在通道准备好写入时,就能通知线程。

    写入操作使用doWrite()函数实现:

    private void doWrite(SelectionKey sk) {
        // 这个SelectionKey与doRead()拿到的SelectionKey是同一个
        SocketChannel channel = (SocketChannel)sk.channel();
        // 获取共享EchoClient
        EchoClient echoClient = (EchoClient)sk.attachment();
        // 获取发送内容列表
        LinkedList<ByteBuffer> outq = echoClient.getOutQueue();
        // 获取列表顶部元素
        ByteBuffer bb = outq.getLast();
        try {
            // 进行写会操作
            int len = channel.write(bb);
            if(len == -1) {
                disconnect(sk);
                return;
            }
    
            if(bb.remaining() == 0) {
                // 全部发送完成,则移除这个缓冲对象
                outq.removeLast();
            }
        } catch (Exception e) {
            System.out.println("Failed to write to client");
            e.printStackTrace();
            disconnect(sk);
        }
    
        if(outq.size()==0) {
            // 如果不移除,每次Channel准备好写时,都会执行doWrite()方法(此时无数据可写,显得不合理)
            sk.interestOps(SelectionKey.OP_READ);
        }
    }
    

    使用NIO技术后,即使客户端迟钝或者网络延迟,也不会给服务器带来太大的问题。

    2.3 使用NIO实现客户端

    前面的我们使用NIO来实现服务端,使用Socket编程来构建我们的客户端。下面我们使用NIO实现客户端。与NIO服务端类似,核心元素也是Selector,Channel和SelectionKey。

    初始化Selector和Channel:

    private Selector selector;
    public void init(String ip, int port) throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        this.selector = SelectorProvider.provider().openSelector();
        // 由于当前Channel是非阻塞的,因此,connect()方法返回时,连接并不一定建立成功,
        // 在后续使用这个连接时,还需要使用finishConnect()再次确认
        channel.connect(new InetSocketAddress(ip, port));
        channel.register(selector, SelectionKey.OP_CONNECT);
    }
    

    客户端主要执行逻辑:

    public void working() throws Exception {
        while(true) {
            if(!selector.isOpen()) {
                break;
            }
            selector.select();
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while(ite.hasNext()) {
                SelectionKey key = ite.next();
                ite.remove();
                // 连接事件发生
                if(key.isConnectable()) {
                    connect(key);
                } else if(key.isReadable()) {
                    read(key);
                }
            }
        }
    }
    

    主要处理两个事件,首先是表示连接就绪的Connect事件(由connect()函数处理)以及表示通道可读的Read事件(由read()函数处理)。

    函数connect()的实现如下:

    
    public void connect(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel)key.channel();
        //如果正在连接,则完成连接
        if(channel.isConnectionPending()) {
            channel.finishConnect();
        }
        channel.configureBlocking(false);
        // 向Channel写入数据并同时注册读事件作为感兴趣的事件
        channel.write(ByteBuffer.wrap(new String("hello server!\r\n").getBytes()));
        channel.register(this.selector, SelectionKey.OP_READ);
    }
    

    当Channel可读时,会执行read()方法,进行数据读取:

    
    public void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel)key.channel();
        //创建读取的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(100);
        channel.read(buffer);
        byte[] data = buffer.array();
        String msg = new String(data).trim();
        System.out.println("客户端收到信息:" + msg);
        channel.close();
        key.selector().close();
    }
    

    上述read()函数首先创建了100字节的缓冲区(第4行),接着从Channel中读取数据,并将其打印到控制台上。最后,关闭Channel和Selector。

    三、AIO

    AIO是异步IO的缩写。虽然NIO在网络操作中,提供了非阻塞的方法,但是NIO的IO行为还是同步的。对于NIO来说,我们的业务线程是在IO操作准备好时,得到通知,接着就由这个线程自行进行IO操作,IO操作本身还是同步的。
    对于AIO,它不是在IO准备好时再通知线程,而是在IO操作已经完成后,再给线程发出通知。因此,AIO是完全不会阻塞的。此时,我们的业务逻辑将变成一个回调函数,等待IO操作完成后,由系统自动触发。下面通过AIO实现一个简单的EchoServer以及对应的客户端的。

    3.1 AIO EchoServer的实现

    异步IO需要使用异步通道(AsynchronousServerSocketChannel):

        public final static int PORT = 8000;
        // 使用AsynchronousServerSocketChannel异步Channel作为服务器,变量名为server
        private AsynchronousServerSocketChannel server;
        public AIOEchoServer() throws IOException {
            server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT));
    }
    

    使用这个server来进行客户端的接收和处理:

    public void start() throws InterruptedException, ExecutionException, TimeoutException {
        System.out.println("Server listen on " + PORT);
        // 注册事件和事件完成后的处理器   
        server.accept(null,new CompletionHandler<AsynchronousSocketChannel, Object>() {
            final ByteBuffer buffer = ByteBuffer.allocate(1024);
            @Override
            public void completed(AsynchronousSocketChannel result,
                    Object attachment) {
                System.out.println(Thread.currentThread().getName());
                Future<Integer> writeResult = null;
                try {
                    buffer.clear();
                    result.read(buffer).get(100,TimeUnit.SECONDS);
                    buffer.flip();
                    writeResult = result.write(buffer);
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        server.accept(null,this);
                        writeResult.get();
                        result.close();
                    } catch(Exception e) {
                        System.out.println(e.toString());
                    }
                }
            }
            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("Failed: " + exc);
            }
        });
    }
    

    AsynchronousServerSocketChannel.accept()方法会立即返回。它并不会真的去等待客户端的到来。在这里使用的accept()方法的签名为:

    public final <A> void accept(A attachment,
                             CompletionHandler<AsynchronousSocketChannel,? super A> handler)
    

    它的第一个参数是一个附件,可以是任意类型,作用是让当前线程和后续的回调方法可以共享信息,它会在后续调用中,传递给handler。它的第二个参数是CompletionHandler接口。这个接口有两个方法:

    //成功被回调
    void completed(V result, A attachment);
    //失败被回调
    void failed(Throwable exc, A attachment);
    

    AsynchronousServerSocketChannel.accept()实际上做了两件事,第一是发起accept请求,告诉系统可以开始监听端口了。第二,注册CompletionHandler实例,告诉系统,一旦有客户端前来连接,如果成功连接,就去执行CompletionHandler.completed()方法;如果连接失败,就去执行CompletionHandler.failed()方法。
    所以,server.accept()方法不会阻塞,它会立即返回。

    下面,来分析一下CompletionHandler.completed()的实现。当completed()被执行时,意味着已经有客户端成功连接了。使用read()方法读取客户的数据。AsynchronousServerSocketChannel.read()方法也是异步的,换句话说它不会等待读取完成了再返回,而是立即返回,返回的结果是一个Future,因此这里就是Future模式的典型应用。为了编程方便,这里直接调用Future.get()方法,进行等待,将这个异步方法变成了同步方法。因此,在其执行完成后,数据读取就已经完成了。

    之后,将数据回写给客户端。这里调用AsynchronousServerSocketChannel.write()方法。这个方法不会等待数据全部写完,也是立即返回的。同样,它返回的也是Future对象。

    再之后,服务器进行下一个客户端连接的准备。同时关闭当前正在处理的客户端连接。但在关闭之前,得先确保之前的write()操作已经完成,因此,使用Future.get()方法进行等待。

    接下来,只需要在主函数中调用这个start()方法就可以开启服务器了:

    public static void main(String[] args) throws Exception {
        new AIOEchoServer().start();
        while (true) {
            Thread.sleep(1000);
        }
    }
    

    上述代码第2行,调用start()方法开启服务器。但由于start()方法里使用的都是异步方法,因此它会马上返回,并不像阻塞方法那样会进行等待。因此,如果想让程序驻守执行,第4~6行的等待语句是必需的。否则,在start()方法后,不等客户端到来,程序已经运行完成,主线程序就将退出。

    3.2 AIO Echo客户端实现

    客户端全部使用异步回调来实现。

    public class AIOClient {
        public static void main(String[] args) throws Exception {
            // 打开AsynchronousSocketChannel通道
            final AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
            // 让客户端去连接指定的服务器,并注册了一系列事件
            client.connect(new InetSocketAddress("localhost", 8000),null,new CompletionHandler<Void, Object>() {
                @Override
                public void completed(Void result, Object attachment) {
                    // 进行数据写入,向服务端发送数据,这个过程也是异步的,会很快返回
                    client.write(ByteBuffer.wrap("Hello!".getBytes()), null, new CompletionHandler<Integer, Object>() {
                        @Override
                        public void completed(Integer result, Object attachment) {
                            try {
                                // 准备进行数据读取,从服务端读取回写的数据
                                ByteBuffer buffer = ByteBuffer.allocate(1024);
                                client.read(buffer,buffer,new CompletionHandler<Integer, ByteBuffer>() {
     
                                    @Override
                                    public void completed(Integer result,
                                            ByteBuffer buffer) {
                                        buffer.flip();
                                        // 打印接收到的数据
                                        System.out.println(new String(buffer.array()));
                                        try {
                                            client.close();
                                        } catch (Exception e) {
                                            e.printStackTrace();
                                        }
                                    }
                                    @Override
                                    public void failed(Throwable exc,
                                            ByteBuffer attachment) {
                                    }
                                });
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                        @Override
                        public void failed(Throwable exc, Object attachment) {
                        }
                    });
                }
                @Override
                public void failed(Throwable exc, Object attachment) {
                }
            });
            // 由于主线程马上结束,这里等待处理全部完成
            Thread.sleep(1000);
        }
    }
    

    相关文章

      网友评论

          本文标题:并行模式与算法(二)

          本文链接:https://www.haomeiwen.com/subject/uemynftx.html