美文网首页
CyclicBarrier 栅栏的使用

CyclicBarrier 栅栏的使用

作者: SinX竟然被占用了 | 来源:发表于2017-09-27 19:56 被阅读0次

文章来源:《Java 7 并发编程实战》


CyclicBarrier 允许两个或者多个线程在某个点上进行同步。

一、CyclicBarrier 简介

CyclicBarrier 类使用一个整型数进行初始化,这个数代表需要在某个点(栅栏)上进行同步的线程数。

当一个线程到达同步点之后,他将调用await()方法等待其他线程。当线程调用await()方法之后,CyclicBarrier对象会阻塞这个线程并使之休眠直到所有其他县城到达。当最后一个线程调用CyclicBarrier对象的await()方法之后,CyclicBarrier对象将唤醒所有在等待的线程,然后这些线程将继续执行。

CyclicBarrier类还可以传入一个Runnable对象作为初始化参数。当所有的线程都到达同步点之后,CyclicBarrier对象将这个Runnable对象作为线程执行。这个特性使得CyclicBarrier在并行任务上可以不媲美分治编程技术。


二、案例

在这个案例中,我们将在数字矩阵中想寻找一个数字出现的次数(使用分治编程技术)。这个矩阵会被分成几个子集,然后每个线程在一个子集中查找。一旦所有的线程都完成了查找,最终的任务将统一这些结果。

MatrixMock类:

import java.util.Random;

/**
 * 此矩阵类负责随机生成一个在1~10之间的数字矩阵.
 */
public class MatrixMock {

    private int[][] data;

    /**
     * 构造函数
     * @param row    行数
     * @param col    列数
     * @param number    查找的数字
     */
    public MatrixMock(int row, int col, int number) {

        int counter = 0;
        this.data = new int[row][col];
        Random random = new Random();

        //用随机数字填充矩阵
        for(int i = 0; i < row; i ++) {
            for(int j = 0; j < col; j++) {
                data[i][j] = random.nextInt(10);
                if(data[i][j] == number) {
                    counter++;
                }
            }
        }

        //将在矩阵中查找到的次数打印到控制台
        System.out.printf("Mock: There are %d ocurrences of %d in " +
                "generated data.\n", counter, number);
    }

    /**
     * 返回矩阵某行的数据
     * @param row    行号
     * @return    如果该行存在, 返回该行的数据; 否则返回nul
     */
    public int[] getRow(int row) {

        if((row >= 0) && (row < data.length)) {
            return data[row];
        }
        return null;
    }

}

Results类:

/**
 * 保存矩阵中每行找到的指定数字的次数
 */
public class Results {

    private int[] data;

    /**
     * 构造函数
     * @param size
     */
    public Results(int size) {
        this.data = new int[size];
    }

    public void setData(int position, int value) {
        data[position] = value;
    }

    public int[] getData() {
        return data;
    }
}

Searcher类:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * 查找类。每个查找类单独分配一个线程,是一个子任务。
 */
public class Searcher implements Runnable {

    //查找的起始行号
    private int firstRow;
    //查找的终止行号
    private int lastRow;
    //查找的矩阵
    private MatrixMock matrixMock;
    //结果集合
    private Results results;
    //待查找的数字
    private int number;

    private final CyclicBarrier barrier;

    /**
     * 构造函数
     * @param firstRow      起始行号
     * @param lastRow       结束行号
     * @param matrixMock    矩阵
     * @param results       结果集
     * @param number        查找的数字
     * @param barrier       栅栏
     */
    public Searcher(int firstRow, int lastRow, MatrixMock matrixMock,
                    Results results, int number, CyclicBarrier barrier) {

        this.firstRow = firstRow;
        this.lastRow = lastRow;
        this.matrixMock = matrixMock;
        this.results = results;
        this.number = number;
        this.barrier = barrier;
    }

    @Override
    public void run() {

        int counter;

        System.out.printf("%s: Processing lines " +
                "from %d to %d.\n",Thread.currentThread().getName(), firstRow, lastRow);

        for(int i = firstRow; i < lastRow; i++) {
            int row[] = matrixMock.getRow(i);   //第i行
            counter = 0;
            for(int j = 0; j < row.length; j++) {
                if(row[j] == number) {
                    counter++;
                }
            }
            results.setData(i, counter);
        }

        System.out.printf("%s: Lines processed.\n", Thread.currentThread().getName());

        try {
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }

    }
}

Grouper类:

/**
 * 汇总类。
 */
public class Grouper implements Runnable {

    /**
     * 结果集
     */
    private Results results;

    public Grouper(Results results) {
        this.results = results;
    }

    @Override
    public void run() {

        int finalResult = 0;
        System.out.printf("Grouper: Processing results...\n");
        int data[] = results.getData();
        for(int count : data) {
            finalResult += count;
        }
        System.out.printf("Grouper: Total result: %d.\n",finalResult);
    }
}

Main类:

import java.util.concurrent.CyclicBarrier;

public class Main {

    public static void main(String[] args) {

        final int ROW = 10000;   //行
        final int COL = 1000;   //列
        final int TARGET_NUMBER = 5;    //需要搜索的数字
        final int PARTICIPANTS = 5;     //子任务的个数
        final int LINES_PARTICIPANT = 2000;

        //创建矩阵
        MatrixMock matrixMock = new MatrixMock(ROW, COL, TARGET_NUMBER);
        //创建结果集
        Results results = new Results(ROW);
        Grouper grouper = new Grouper(results);
        CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper);

        //创建查找类
        Searcher[] searchers = new Searcher[PARTICIPANTS];
        for(int i = 0; i < PARTICIPANTS; i++) {
            searchers[i] = new Searcher(i*LINES_PARTICIPANT,
                    i*LINES_PARTICIPANT + LINES_PARTICIPANT, matrixMock,
                    results, TARGET_NUMBER, barrier);

            Thread thread = new Thread(searchers[i]);
            thread.start();
        }

        System.out.printf("Main: The main thread has finished.\n");
    }
}

控制台结果:

image.png

三、扩展内容

CyclicBarrier 类还提供了另一种 await() 方法:

await(long time, TimeUnit unit):这个方法被调用后,线程将会一直休眠到被中断,或者CyclicBarrier的内部计数器到达0, 或者指定的时间已经过期。TimeUnit类有多种常量: DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, 和 SECONDS.

CyclicBarrier 类也提供了 getNumberWaiting() 方法和
getParties() 方法,前者将返回在 await() 上阻塞的线程数,后者返回被 CyclicBarrier 对象同步的任务数。

重置 CyclicBarrier 对象

CyclicBarrier 类与 CountDownLatch 有很多共性,但是也有一些不同。最主要的不同点是,CyclicBarrier 对象可以重置到它的初始状态,重新分配新的值给内部计数器,即使它已经被初始过了。

可以使用 CyclicBarrier 的 reset() 方法来进行重置操作。当这个方法被调用后,全部的正在 await() 方法里等待的线程接收到一个 BrokenBarrierException 异常。此异常在例子中已经用打印 stack trace 处理了,但是在一个更复制的应用,它可以执行一些其他操作,例如重新开始执行或者在中断点恢复操作。

破坏 CyclicBarrier 对象

CyclicBarrier 对象可能处于一个特殊的状态,称为 broken。当多个线程正在 await() 方法中等待时,其中一个被中断了,此线程会收到 InterruptedException 异常,但是其他正在等待的线程将收到 BrokenBarrierException 异常,并且 CyclicBarrier 会被置于broken 状态中。

CyclicBarrier 类提供了 isBroken() 方法,如果对象在 broken 状态,返回true,否则返回false。

相关文章

网友评论

      本文标题:CyclicBarrier 栅栏的使用

      本文链接:https://www.haomeiwen.com/subject/abfpextx.html