美文网首页写作与程序
java初入多线程17

java初入多线程17

作者: 胖琪的升级之路 | 来源:发表于2017-10-27 17:05 被阅读4次

使用Disruptor 实现消费者和生产者

<!--引入需要的jar包 -->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.7</version>
</dependency>
// 主方法
public class CusProMain {
    
    public static void main(String[] args) throws Exception {
        ExecutorService  exector = Executors.newCachedThreadPool() ;
        int bufferSize = 1024 ;
        PCDataFactory  eventFactory = new PCDataFactory() ;
        
        Disruptor<PCdataTwo> disruptor =  new Disruptor<>(eventFactory, 
                bufferSize,
                exector,
                ProducerType.MULTI,
                new BlockingWaitStrategy()
                );
        disruptor.handleEventsWithWorkerPool(
                new ConsumerTwo(),
                new ConsumerTwo(),
                new ConsumerTwo(),
                new ConsumerTwo()
                );

        disruptor.start(); 
        
        RingBuffer<PCdataTwo> ringBuffer = disruptor.getRingBuffer() ;
        ProducerTwo producer = new ProducerTwo(ringBuffer) ;
        ByteBuffer bb =ByteBuffer.allocate(8);
        for(long l = 0 ; true ; l++){
            bb.putLong(0 , l);
            producer.pushDate(bb);
            Thread.sleep(100);
            System.out.println("add Data "+ l);
        }
        
    }

}

// 消费者 
public class ConsumerTwo implements WorkHandler<PCdataTwo> {

    @Override
    public void onEvent(PCdataTwo event) throws Exception {
        System.out.println(Thread.currentThread().getId() + ":Event: --"
            +event.getValue() * event.getValue() + "--");
    }
}

//生产者
private final RingBuffer<PCdataTwo> ringBuffer ;
    
    public ProducerTwo(RingBuffer<PCdataTwo> ringBuffer){
        this.ringBuffer = ringBuffer ;
    }
    
    
    public void pushDate (ByteBuffer bb){
        long sequence = ringBuffer.next();  
        try {
            PCdataTwo event = ringBuffer.get(sequence);
            event.setValue(bb.getLong());
        } catch (Exception e) {
        }finally{       
            ringBuffer.publish(sequence);
        }
    }
  • 我们在主方法操作中将缓冲区设置成1024 , 在这里有四个消费者, 有一个生产者不断向缓冲区存入数据。结果如图所示
结果

提高消费者的相应时间:选择合适的策略

  • 在Disruptor 环形缓冲区内,提供了几种策略 。策略如下
    1. BlockingWaitStrategy : 这是默认的策略。 该策略是使用锁和条件进行数据的监控和线程的唤醒 ,因为涉及到线程的切换, 所以该策略是最节省CPU的,但是高并发性能上时最糟糕的。
    2. SleepingWaitStrategy :该策略 回来循环中不断的等待数据,先自旋等待,如果不成功,则使用yield () 让出cpu ,并且会进行休眠。 保证不占用太多的cpu资源。 这个策略适用于 延时 不是特别高的场合,好处是他对生产者的线程影响 最小, 适合场景是异步日志。
    3. YieldWaitStrategy : 适用于低延时的场合。 消费者线程不断的循环将空缓冲的变化。在循环内部,使用yield 让出cpu 给别的线程执行时间, 如果需要一个高性能的系统,并且对延时有较高的要求,那么适合该策略。
    4. BusySpinWaitStrategy : 死循环策略。 消费者线程区域会疯狂的监控缓冲区的变化。 并且 cpu数量必须大于消费者的线程数量。

相关文章

  • java初入多线程17

    使用Disruptor 实现消费者和生产者 我们在主方法操作中将缓冲区设置成1024 , 在这里有四个消费者, 有...

  • java初入多线程4

    线程中断 概念 :让目标线程停止执行,但是是高知目标线程希望线退出,具体退出由目标线程自己决定。 相关的方法,暂时...

  • java初入多线程11

    核心线程池的内部实现机制。 阿里巴巴 code检验推荐自己实现线程池的创建。不是使用Executors的创建方法。...

  • java初入多线程12

    自定义线程创建:ThreadFactory 我们原先用的线程池ThreadPoolExecutor 里面的线程都...

  • java初入多线程10

    线程阻塞工具类 :LockSupport LockSupport 是一个非常实用的线程阻塞工具, 可以在线程内任意...

  • java初入多线程7

    同步控制 synchronized 扩展:重入锁 重入锁来代替synchronized,在Jdk1.6以后 syn...

  • java初入多线程14

    接下来几章说的是锁的优化和注意事项问题。 减小锁持有的时间 对于在方法执行的过程中有的步骤不需要进行同步,那么就在...

  • java初入多线程15

    无锁的线程安全整数: AtomicInteger 方法介绍public final int get(); 取得当前...

  • java初入多线程13

    并发集合简介 ConcurrentHashMap : 线程安全的HashMap; CopyOnWriteArray...

  • java初入多线程16

    并行模式与算法 单例模式 : 保证在系统中只生产一个实例。下面是几种单例模式。 这个容易出现的问题就是单例什么时候...

网友评论

    本文标题:java初入多线程17

    本文链接:https://www.haomeiwen.com/subject/kttdpxtx.html