3. RocketMQ

Author: _少年不知愁 | Published 2020-09-24 20:20

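1. MQ

A message queue (MQ) is message-oriented middleware.

2. Features

Asynchronous processing

Traffic peak shaving and valley filling (smoothing bursts of load)

Decoupling microservices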
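
Asynchronous processing and peak shaving can be illustrated without a broker. The following is a minimal sketch, assuming an in-memory BlockingQueue as a stand-in for the message queue. PeakShavingDemo, run, and the order-N payloads are hypothetical names, and none of this is RocketMQ's API. The producer enqueues a burst immediately, while a slower consumer drains it at its own pace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// In-memory stand-in for a broker; illustrative only, not RocketMQ's API.
final class PeakShavingDemo {

    /** Enqueues `burst` messages at once, then waits for one slow consumer to drain them. */
    static List<String> run(int burst) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(burst);
        List<String> handled = new ArrayList<>();

        // Consumer: works at a steady pace, independent of how fast messages arrive.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < burst; i++) {
                    String msg = queue.take();
                    Thread.sleep(5); // simulated slow downstream work
                    handled.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            // Producer: the whole burst is accepted right away; the caller does not
            // wait for each message to be processed.
            for (int i = 0; i < burst; i++) {
                queue.put("order-" + i);
            }
            // Joining the consumer also makes its writes to `handled` visible here.
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the demo", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

The producer and the consumer share only the queue, which is also the sense in which a message queue decouples services: neither side calls the other directly.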

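3. Quick single-node setup

Download: https://rocketmq.apache.org/dowloading/releases/

1. Start the name server (the scripts are in the distribution's bin directory)
nohup mqnamesrv &
2. Check the name server log
logs/rocketmqlogs/namesrv.log
3. Start the broker
nohup mqbroker -n localhost:9876 &
4. Check the broker log
logs/rocketmqlogs/broker.log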
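
Besides reading the logs, a quick way to confirm that the name server is listening is to try a TCP connection to the port passed to the broker with -n. This is a minimal sketch using only the JDK. PortCheck and isReachable are hypothetical names, and the host, port, and timeout are assumptions to adjust for your environment.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // 9876 is the name server port used in the broker command (-n localhost:9876).
        System.out.println("namesrv reachable: " + isReachable("localhost", 9876, 1000));
    }
}
```

A successful connection only shows that something is listening on the port; the log files remain the place to confirm that startup actually succeeded.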

Setting up the console

1. Download
https://github.com/apache/rocketmq-externals
In the rocketmq-console module, edit application.properties:
rocketmq.config.namesrvAddr=localhost:9876
rocketmq.config.isVIPChannel=false
2. Build
mvn clean package -DskipTests
Find the jar under target, then run it directly:
java -jar console-rocketmq.jar --server.port=8818 &

Then open http://192.168.1.102:8818/ (the port must match the --server.port value above).

3.1 Code quick start

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

Writing the producer

Configuration

rocketmq:
  name-server: 192.168.0.109:9876
  producer:
    group: test-group

Code

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping
    public void test() {
        ShareDTO dto = new ShareDTO();
        dto.setAuditStatus("youyou");
        dto.setAuthor("summit");
        rocketMQTemplate.convertAndSend("my-topic", dto);
    }
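
The producer above sends a ShareDTO, but the post does not show that class. Below is a hypothetical reconstruction, assuming it is a plain bean with only the two properties used here (author and auditStatus). The real class may have more fields. It is declared package-private so that the snippet compiles as a single file; in a project it would normally be a public class in its own file.

```java
// Hypothetical reconstruction: only the two properties used in this post are included.
final class ShareDTO {

    private String author;
    private String auditStatus;

    // A no-argument constructor keeps the bean friendly to converters that
    // instantiate the payload type reflectively on the consumer side.
    public ShareDTO() {
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }

    public String getAuditStatus() {
        return auditStatus;
    }

    public void setAuditStatus(String auditStatus) {
        this.auditStatus = auditStatus;
    }

    @Override
    public String toString() {
        return "ShareDTO{author='" + author + "', auditStatus='" + auditStatus + "'}";
    }
}
```

The producer and the consumer need compatible definitions of this class, since the consumer's listener is typed as RocketMQListener<ShareDTO>.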

Writing the consumer

Configuration

rocketmq:
  name-server: 192.168.0.109:9876

Code

@Service
@RocketMQMessageListener(consumerGroup = "consumer-study", topic = "my-topic")
@Slf4j
public class RocketMqLister implements RocketMQListener<ShareDTO> {

    @Override
    public void onMessage(ShareDTO shareDTO) {

        log.info("message ======= {},{}", shareDTO.getAuthor(), shareDTO.getAuditStatus());
    }
}

4. Spring Cloud Stream

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>0.9.0.RELEASE</version>
        </dependency>

Writing the producer

Configuration (these keys sit under spring.cloud in application.yml)

    stream:
      rocketmq:
        binder:
          name-server: 192.168.0.109:9876
      bindings:
        output:
          destination: my-str-test-topic

Add the annotation

@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@EnableBinding(Source.class)
public class ContentApplication {

    public static void main(String[] args) {

        SpringApplication.run(ContentApplication.class, args);
    }
}

Sending a message

    @Autowired
    private Source source;

    @GetMapping("/send")
    public String send() {
        System.out.println("send =====================");
        source.output().send(MessageBuilder.withPayload("message ti").build());
        return "send ok";
    }

Writing the consumer

Configuration file (these keys sit under spring.cloud in application.yml)

    stream:
      rocketmq:
        binder:
          name-server: 192.168.0.109:9876
      bindings:
        input:
          destination: my-str-test-topic
          group: my-steam-test

Add the annotation

@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Sink.class)
public class UserApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
    }
}

Listener

@Service
public class TestStreamLister {

    @StreamListener(Sink.INPUT)
    public void recive(String message) {
        System.out.println("=========" + message);
    }
}

5. Custom producer and consumer

Producer

Configuration (under spring.cloud.stream.bindings)

my_output:
  destination: my-custom-topic

Binding interface

public interface CustomOutput {

    String MY_OUTPUT = "my_output";

    @Output(MY_OUTPUT)
    MessageChannel output();
}

Add the annotation to the application class

@EnableBinding({Source.class, CustomOutput.class})

Sending a message

@Autowired
private CustomOutput customOutput;

@GetMapping("/send2")
public Result<String> send2() {
    System.out.println("send =====================");
    customOutput.output().send(MessageBuilder.withPayload("message ti").build());
    return Result.success("send ok");
}

Consumer

Configuration (under spring.cloud.stream.bindings)

my-input:
  destination: my-custom-topic
  group: my-custom-group

Binding interface

public interface CustomInput {
    String MY_INPUT = "my-input";

    @Input(MY_INPUT)
    SubscribableChannel input();
}

Listener

@Service
@Slf4j
public class CustomRocketMqLister {

    // Listen on the custom input channel declared in CustomInput, so that the
    // my-input binding (and its consumer group) is actually used.
    @StreamListener(CustomInput.MY_INPUT)
    public void onMessage(String dto) {

        log.info("custom input message ======= {}", dto);
    }
}

Add the annotation to the application class

@EnableBinding({Sink.class, CustomInput.class})
