美文网首页Java多线程系列
多线程并发框架使用一

多线程并发框架使用一

作者: 丹青水 | 来源:发表于2018-03-19 10:28 被阅读0次
Semaphore

[ˈseməfɔ:(r)] 计数信号量,是用来控制线程的并发数量,它可以协调各个线程,以保证合理是使用资源,常用于流量控制

ReentrantLock

一个可重入的互斥锁定 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大。ReentrantLock 将由最近成功获得锁定,并且还没有释放该锁定的线程所拥有。当锁定没有被另一个线程所拥有时,调用 lock 的线程将成功获取该锁定并返回。如果当前线程已经拥有该锁定,此方法将立即返回可使用

Condition

java中条件变量都实现了java.util.concurrent.locks.Condition接口,条件变量的实例化是通过一个Lock对象上调用newCondition()方法来获取的,条件就和一个锁对象绑定起来了。因此,Java中的条件变量只能和锁配合使用,来控制并发程序访问竞争资源的安全。条件变量的出现是为了更精细控制线程等待与唤醒,在Java5之前,线程的等待与唤醒依靠的是Object对象的wait()和notify()/notifyAll()方法,这样的处理不够精细。简单讲,就是消费者/生产者的场景中,在原来的基础上,增加了队列满时及时通知消费者,队列空时及时通知生产者的优化,通常是两个条件变量一起出现,一个控制值,但两个条件变量可以毫无关系,终归来说还是在Lock的范围内。所以,从本质上来说,是对Object监视器的场景性优化,而不是全新机制的引入。

Exchanger

它允许在并发任务之间交换数据。具体来说,Exchanger类允许在两个线程之间定义同步点。当两个线程都到达同步点时,他们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,第二个线程的数据结构进入到第一个线程中。

例子1(多进顺序执行)
public class SemaphoreDemo1 {
    private Semaphore semaphore=new Semaphore(10);//初始化信号量的大小,new Semaphore(10,isFair) isfair代表是否公平信号的标识,公平就是按照调用顺序,非公平系统随机
    private ReentrantLock reentrantLock=new ReentrantLock();//重用锁
    public void fly(){
        try {
            System.out.println("first:" + Thread.currentThread().getName() + "firstTime:" +System.currentTimeMillis());
            semaphore.acquire(3);//每个线程需要的信号量,假如有10个线程同时进来,每个需要3个通行证,则同时能进来3个,7个需要等待。
            long start= System.currentTimeMillis();
            System.out.println("threadName:" + Thread.currentThread().getName() + "beginTime:" +start);
            Thread.sleep(500);
            System.out.println("threadName:" + Thread.currentThread().getName() + "endTime:" + (System.currentTimeMillis()));
            System.out.println("avaliable permit x:"+semaphore.availablePermits()); //可使用的通行证数量
            System.out.println("qune thread : " + semaphore.getQueueLength());//获取等待许可的数量
            reentrantLock.lock();
            for(int i=0;i<5;i++){
                System.out.println("threadName:" + Thread.currentThread().getName() + "out:" + i);
            }
            reentrantLock.unlock();
        }catch (Exception e){
           System.err.println(e.getMessage());
        }finally {
            semaphore.release(3);
            System.out.println("avaliable permit xx:" + semaphore.availablePermits());
    }
    }
}
public class SemaphoreTest {
    public  static void main(String[] ags){
        SemaphoreDemo1 semaphoreDemo1=new SemaphoreDemo1();
        List<Thread> threadList= new ArrayList<>();
        for(int i=0;i<10;i++){
           threadList.add( new Thread(new DoWhat(semaphoreDemo1)));
        }
        for(Thread thread:threadList){
              thread.start();
        }
    }
};
class DoWhat implements Runnable {
    private SemaphoreDemo1 semaphoreDemo1;
    public DoWhat(SemaphoreDemo1 semaphoreDemo1) {
        this.semaphoreDemo1 = semaphoreDemo1;
    }

    public void run() {
        semaphoreDemo1.fly();
    }
}
例子2(模拟线程池)
public class PoolConnectDemo {
    private int maxSize=10;
    private int semaphoreLimit=2;
    private List<String> connections;
    private Semaphore limit=new Semaphore(semaphoreLimit);
    private ReentrantLock lock=new ReentrantLock();
    private Condition condition=lock.newCondition();//条件
    public PoolConnectDemo() {
        super();
        for(int i=0;i<maxSize;i++){
            this.connections.add("pool:"+i);
        }
    }
    public String get(){
        String  returnStr=null;
        try {
            limit.acquire();//限制并发线程
            lock.lock();//加锁
            while (connections.size()==0){//如果么有
                condition.await();//线程等待其他线程释放,挂起
            }
            returnStr=connections.remove(0);
            lock.unlock();//释放锁
        }catch (Exception e){

        }
        return  returnStr;

    }
    public void put(String connect){
        lock.lock();
        connections.add(connect);
        condition.signalAll();//通知其他等待线程继续执行,相当于notify和notifyAll
        lock.unlock();
        limit.release();
    }
}
例子3(两个线程之间数据交换)
public class ExchangerDemo {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final Exchanger<String> exchanger = new Exchanger<String>();
        service.execute( new Runnable() {
            public void run() {
                try {
                    String food = "food" ;
                    System.out.println("threadName: " + Thread.currentThread().getName() + "exchange " + food + " in");
                    Thread.sleep(( long ) (Math.random() * 10000 ));
                    String money = (String) exchanger.exchange(food);
                    System.out.println( "threadName: " + Thread.currentThread().getName()
                            + "exchange data out: " + money);
                } catch (Exception e) {

                }
            }
        });
        service.execute( new Runnable() {
            public void run() {
                try {
                    String money = "money" ;
                    System.out.println("threadName: " + Thread.currentThread().getName() + "exchange " + money + " in" );
                    Thread.sleep(( long ) (Math.random() * 10000 ));
                    String food = (String) exchanger.exchange(money);
                    System.out.println( "threadName: " + Thread.currentThread().getName() + "exchage data out:" + food);
                } catch (Exception e) {
                }
            }
        });
        service.shutdown();
    }
}

相关文章

网友评论

    本文标题:多线程并发框架使用一

    本文链接:https://www.haomeiwen.com/subject/xpulqftx.html