高性能队列disruptor

作者: 划水者 | 来源:发表于2018-07-22 17:01 被阅读0次

disruptor是一个高性能的内存队列,之所以高性能,因为有以下几个特点:

1 整个disruptor的实现在并发处理中没有使用锁,而是使用的cas操作(disruptor被称为无锁队列的原因)

2 disruptor的内部实现采用循环数组,这样可以避免jvm频繁回收

3 解决了伪共享问题,加速了不同线程同时访问一个缓存行

disruptor的使用场景:

disruptor的内部的设计是生产者和消费者原理,目前log4j2的异步日志就是基于disruptor实现的,还有很多开源项目storm也会依赖disruptor,先来一波demo

package com.guoxiong.disruptor;

/**

* 2018/7/15 下午2:42

*

* @author Jungler

* @since

*/

public class MyData {

private int id;

  private Stringvalue;

  public MyData(int id, String value) {

this.id = id;

      this.value = value;

  }

public int getId() {

return id;

  }

public void setId(int id) {

this.id = id;

  }

public StringgetValue() {

return value;

  }

public void setValue(String value) {

this.value = value;

  }

@Override

  public StringtoString() {

return "MyData{" +

"id=" +id +

", value='" +value +'\'' +

'}';

  }

}


package com.guoxiong.disruptor;

/**

* 2018/7/15 下午2:42

*

* @author Jungler

* @since

*/

public class MyDataEvent {

public MyDataEvent(){

}

private MyDatadata;

  public MyDatagetData() {

return data;

  }

public void setData(MyData data) {

this.data = data;

  }

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.EventFactory;

/**

* 2018/7/15 下午2:43

*

* @author Jungler

* @since

*/

public class MyDataEventFactoryimplements EventFactory {

@Override

  public MyDataEventnewInstance() {

return new MyDataEvent();

  }

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.*;

/**

* 2018/7/22 下午3:18

*

* 消费者处理

*

* @author Jungler

* @since

*/

public class MsgBatchConsumerimplements EventHandler {

private Stringname;

  public MsgBatchConsumer(String name){

this.name = name;

  }

@Override

  public void onEvent(MyDataEvent myDataEvent, long l, boolean b)throws Exception {

System.out.println("name = " +name +" data = " + myDataEvent.getData().toString());

  }

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.WorkHandler;

/**

* 2018/7/22 下午3:42

*

* @author Jungler

* @since

*/

public class MsgWorkConsumerimplements WorkHandler {

private Stringname;

  public MsgWorkConsumer(String name){

this.name = name;

  }

@Override

  public void onEvent(MyDataEvent myDataEvent)throws Exception {

System.out.println("name = " +name +" data = " + myDataEvent.getData().toString());

  }

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.RingBuffer;

import com.lmax.disruptor.dsl.Disruptor;

import java.util.List;

/**

* 2018/7/22 下午3:22

*

* @author Jungler

* @since

*/

public class MsgProducer {

private Disruptordisruptor;

  public MsgProducer(Disruptor disruptor){

this.disruptor = disruptor;

  }

public void send(MyData data){

RingBuffer ringBuffer =this.disruptor.getRingBuffer();

      long next = ringBuffer.next();

      try{

MyDataEvent event = ringBuffer.get(next);

        event.setData(data);

      }finally {

if(next ==5){

return;

        }

ringBuffer.publish(next);

      }

}

public void send(List dataList){

for(MyData data : dataList){

this.send(data);

      }

}

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.dsl.Disruptor;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.Executors;

/**

* 2018/7/22 下午3:25

*

* 单生产者,多消费者,每一个消费者消费全部的数据

*

* @author Jungler

* @since

*/

public class DisruptorDemo1 {

public static void main(String[] args) {

MyDataEventFactory myDataEventFactory =new MyDataEventFactory();

      Disruptor disruptor =new Disruptor(myDataEventFactory, 1024, Executors.defaultThreadFactory());

      //定义消费者

      MsgBatchConsumer msg1 =new MsgBatchConsumer("1");

      MsgBatchConsumer msg2 =new MsgBatchConsumer("2");

      MsgBatchConsumer msg3 =new MsgBatchConsumer("3");

      disruptor.handleEventsWith(msg1, msg2, msg3);

      disruptor.start();

      // 定义要发送的数据

      MsgProducer msgProducer =new MsgProducer(disruptor);

      List myDataList =new ArrayList();

      myDataList.add(new MyData(2,"2222"));

      myDataList.add(new MyData(3,"3333"));

      myDataList.add(new MyData(1,"1111"));

      myDataList.add(new MyData(4,"4444"));

      myDataList.add(new MyData(5,"5555"));

      msgProducer.send(myDataList);

  }

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.dsl.Disruptor;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.Executors;

/**

* 2018/7/22 下午3:45

*

* 单生产者,多消费者,分组消费(每一个分组合并消费全部数据)

*

* @author Jungler

* @since

*/

public class DisruptorDemo2 {

public static void main(String[] args) {

MyDataEventFactory myDataEventFactory =new MyDataEventFactory();

      Disruptor disruptor =new Disruptor(myDataEventFactory, 16, Executors.defaultThreadFactory());

      MsgWorkConsumer consumer1 =new MsgWorkConsumer("aa");

      //MsgWorkConsumer consumer2 = new MsgWorkConsumer("bb");

//MsgWorkConsumer consumer3 = new MsgWorkConsumer("cc");

//MsgWorkConsumer consumer4 = new MsgWorkConsumer("dd");

      disruptor.handleEventsWithWorkerPool(consumer1);

      //disruptor.handleEventsWithWorkerPool(consumer3,consumer4);

      disruptor.start();

      MsgProducer msgProducer1 =new MsgProducer(disruptor);

      MsgProducer msgProducer2 =new MsgProducer(disruptor);

      List myDataList1 =new ArrayList();

      List myDataList2 =new ArrayList();

      for(int i =1; i <6; i++){

myDataList1.add(new MyData(i,"data" + i));

      }

for(int i =6; i <11; i++){

myDataList2.add(new MyData(i,"data" + i));

      }

msgProducer1.send(myDataList1);

      msgProducer2.send(myDataList2);

      System.out.println(disruptor.getRingBuffer());

      try {

Thread.sleep(5000);

      }catch (Exception e){

}

disruptor.getRingBuffer().publish(5);

      System.out.println(disruptor.getRingBuffer());

  }

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.dsl.Disruptor;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.Executors;

/**

* 2018/7/22 下午4:05

*

* 多个消费者顺序消费

*

* @author Jungler

* @since

*/

public class DisruptorDemo3 {

public static void main(String[] args) {

MyDataEventFactory myDataEventFactory =new MyDataEventFactory();

      Disruptor disruptor =new Disruptor(myDataEventFactory, 1024, Executors.defaultThreadFactory());

      //定义消费者

      MsgBatchConsumer msg1 =new MsgBatchConsumer("1");

      MsgBatchConsumer msg2 =new MsgBatchConsumer("2");

      MsgBatchConsumer msg3 =new MsgBatchConsumer("3");

      disruptor.handleEventsWith(msg1, msg3).then(msg2);

      disruptor.start();

      // 定义要发送的数据

      MsgProducer msgProducer =new MsgProducer(disruptor);

      List myDataList =new ArrayList();

      myDataList.add(new MyData(2,"2222"));

      myDataList.add(new MyData(3,"3333"));

      myDataList.add(new MyData(1,"1111"));

      myDataList.add(new MyData(4,"4444"));

      myDataList.add(new MyData(5,"5555"));

      msgProducer.send(myDataList);

  }

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.dsl.Disruptor;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.Executors;

/**

* 2018/7/22 下午4:13

*

* @author Jungler

* @since

*/

public class DisruptorDemo4 {

public static void main(String[] args) {

MyDataEventFactory myDataEventFactory =new MyDataEventFactory();

      Disruptor disruptor =new Disruptor(myDataEventFactory, 1024, Executors.defaultThreadFactory());

      //定义消费者

      MsgBatchConsumer msg1 =new MsgBatchConsumer("1");

      MsgBatchConsumer msg2 =new MsgBatchConsumer("2");

      MsgBatchConsumer msg3 =new MsgBatchConsumer("3");

      MsgBatchConsumer msg4 =new MsgBatchConsumer("4");

      MsgBatchConsumer msg5 =new MsgBatchConsumer("5");

      disruptor.handleEventsWith(msg1, msg3);

      disruptor.handleEventsWith(msg2, msg4);

      disruptor.after(msg3,msg4).handleEventsWith(msg5);

      disruptor.start();

      // 定义要发送的数据

      MsgProducer msgProducer =new MsgProducer(disruptor);

      List myDataList =new ArrayList();

      myDataList.add(new MyData(2,"2222"));

      myDataList.add(new MyData(3,"3333"));

      myDataList.add(new MyData(1,"1111"));

      myDataList.add(new MyData(4,"4444"));

      myDataList.add(new MyData(5,"5555"));

      msgProducer.send(myDataList);

  }

}

相关文章

  • 并发相关

    并发相关 JAVA高性能内存队列-disruptor JAVA内置队列 高性能内存队列-disruptor dis...

  • 高性能队列Disruptor

    高性能队列Disruptor 有界队列:ArrayBlockingQueue 和 LinkedBlockingQu...

  • Storm、Log4j2高性能之—Disruptor队列

    Storm、Log4j2高性能之—Disruptor队列 1. Disruptor简介 Disruptor(htt...

  • 高性能队列disruptor

    disruptor是一个高性能的内存队列,之所以高性能,因为有以下几个特点: 1 整个disruptor的实现在并...

  • Disruptor入门

    Disruptor使用 Disruptor是LMAX公司开源的一款高性能的多线程通信库。Java的队列在高并发场景...

  • 高性能队列——Disruptor

    转载自https://tech.meituan.com/disruptor.html Disruptor是英国外汇...

  • Disruptor简介

    Disruptor是什么 Disruptor是一个由英国外汇交易公司LMAX开源的Java高性能队列,它能够以很低...

  • 算法实战3 - 高性能队列的实现思路

    本章关键词 高性能队列、并发、线程安全 Disruptor 是一个高性能的内存消息队列,用于不同线程之间的通信。为...

  • 【转】高性能队列Disruptor

    Java内置队列 介绍Disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。Java的内置队列...

  • 高性能队列——Disruptor总论

    1 背景 这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列Disruptor特性限于3.3.4...

网友评论

    本文标题:高性能队列disruptor

    本文链接:https://www.haomeiwen.com/subject/ilgppftx.html