美文网首页
spring boot + disruptor 入门使用

spring boot + disruptor 入门使用

作者: cifer_pan | 来源:发表于2024-08-13 21:22 被阅读0次

1. jar 导入

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.4</version>
        </dependency>

2.实现代码

2.1 消息体


import lombok.Data;

@Data
public class Message {
    private String data;
}

2.1 生产者


import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;


@Component
@RequiredArgsConstructor
public class Producer {

    private final Disruptor disruptor;

    /**
     * 发送数据
     *
     * @param data 数据
     */
    public void send(String data) {
        RingBuffer<Message> ringBuffer = disruptor.getRingBuffer();
        // 获取可以生成的位置
        long next = ringBuffer.next();
        try {
            Message msg = ringBuffer.get(next);
            msg.setData(data);
        } finally {
            ringBuffer.publish(next);
        }
    }
}

2.2 继承WorkHandler 消费者

import com.lmax.disruptor.WorkHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.disruptor.Message;

/**
 * WorkHandler模式消费者
 */
@Slf4j
@RequiredArgsConstructor
public class WorkConsumer implements WorkHandler<Message> {
    /**
     * WorkHandler编号
     */
    private final Integer number;

    /**
     * WorkHandler消费:每个生产者生产的数据只能被一个消费者消费
     *
     * @param message 消息
     */
    @Override
    public void onEvent(Message message) {
        log.info("work 接收到了消息 编号 : {}, message: {}", number, message);
    }
}

2.2 继承EventHandler 消费者

import com.lmax.disruptor.EventHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.disruptor.Message;

/**
 * Event模式消费者
 */
@Slf4j
@RequiredArgsConstructor
public class EventConsumer implements EventHandler<Message> {

    /**
     * Event编号
     */
    private final Integer number;

    /**
     * Event消费:每个消费者重复消费生产者生产的数据
     *
     * @param message    消息
     * @param sequence   当前序列号
     * @param endOfBatch 批次结束标识(常用于将多个消费着的数据依次组合到最后一个消费者统一处理)
     */
    @Override
    public void onEvent(Message message, long sequence, boolean endOfBatch) {
        log.info("Repeat 接收到了消息 编号 : {}, message: {}, curr sequence: {}, is end: {}",
                number, message, sequence, endOfBatch);
    }

}

2.3 Disruptor 容器初始化

import com.lmax.disruptor.dsl.Disruptor;
import org.example.disruptor.Message;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Executors;

@Configuration
public class ConsumerConfig {

    @Bean
    public Disruptor<Message> disruptor() {
        int bufferSize = 1024 * 1024; // 设置缓冲区大小
        Disruptor<Message> disruptor = new Disruptor(Message::new, bufferSize, Executors.defaultThreadFactory());
        // Work消费
//        WorkConsumer w1 = new WorkConsumer(1);
//        WorkConsumer w2 = new WorkConsumer(2);
//        WorkConsumer w3 = new WorkConsumer(3);
//        WorkConsumer w4 = new WorkConsumer(4);
//        disruptor.handleEventsWithWorkerPool(w1, w2, w3, w4);

        //  Event消费
//        EventConsumer a = new EventConsumer(1);
//        EventConsumer b = new EventConsumer(2);
//        disruptor.handleEventsWith(a, b);



        // 链路式 消费 a -> b -> c
        EventConsumer a = new EventConsumer(1);
        EventConsumer b = new EventConsumer(2);
        EventConsumer c = new EventConsumer(3);
        disruptor.handleEventsWith(a).then(b).then(c);




        disruptor.start();
        return disruptor;
    }

}

2.4 Controller

import org.example.disruptor.Producer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class DisruptorController {


    @Resource
    private Producer producer;
 

    @GetMapping("/send")
    public String add(String message) throws Exception {
        for (int i = 0; i < 2; i++) {
            producer.send(message + i);
        }
        return "success";
    }

}

相关文章

网友评论

      本文标题:spring boot + disruptor 入门使用

      本文链接:https://www.haomeiwen.com/subject/uxvjkjtx.html