美文网首页SpringBoot
SpringBoot整合RabbitMQ

SpringBoot整合RabbitMQ

作者: WebGiser | 来源:发表于2023-01-04 19:07 被阅读0次

参考文章:https://blog.csdn.net/qq_35387940/article/details/100514134

Direct Exchange
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
当一条消息携带的路由键为 X,并通过生产者发送给交换机时,交换机就会根据路由键 X 去寻找绑定键也是 X 的队列。

Fanout Exchange
扇形交换机,这个交换机没有路由键概念,就算你绑了路由键也会被无视。这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

Topic Exchange
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
简单地介绍下规则:

*  (星号) 用来表示一个单词 (必须出现的)
#  (井号) 用来表示任意数量(零个或多个)单词
通配的绑定键是跟队列进行绑定的,举个小例子
队列Q1 绑定键为 *.TT.*          队列Q2绑定键为  TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;

主题交换机是非常强大的,为啥这么膨胀?
当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有了直连交换机的行为。
所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。

另外还有 Header Exchange 头交换机 ,Default Exchange 默认交换机,Dead Letter Exchange 死信交换机,这几个该篇暂不做讲述。

项目结构

image.png

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the demo: Spring Boot web + AMQP (RabbitMQ) + WebSocket. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- The parent manages the versions of all Spring Boot starters declared below. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.wzf</groupId>
    <artifactId>websocket</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>websocket</name>
    <description>websocket</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- REST endpoints used to trigger test messages -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- RabbitMQ client, RabbitTemplate and @RabbitListener support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- WebSocket support; no explicit version so it always follows the parent and
             cannot drift from the other starters when the parent is upgraded -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <!-- Lombok (compile-time only) provides the @Slf4j logger used by the WebSocket endpoint -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.properties

# Application name and the HTTP port the embedded server listens on
spring.application.name=websocket-test
server.port=8888

# RabbitMQ broker connection: host, AMQP port and the credentials used to log in
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

测试控制类RabbitController.java

package com.wzf.websocket.controller;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * REST endpoints under {@code /rabbit} that publish test messages to the direct,
 * topic and fanout exchanges. Every endpoint returns the plain string {@code "OK"}
 * once the message has been handed to the {@link RabbitTemplate}.
 */
@RestController
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes a message to {@code TestDirectExchange} with routing key {@code TestDirectRouting}.
     *
     * @return the literal {@code "OK"}
     */
    @GetMapping("/sendDirectMsg")
    public String sendDirectMsg(){
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", buildMessage("sendDirectMsg"));
        return "OK";
    }

    /**
     * Publishes a message to {@code TestTopicExchange} with routing key {@code topic.man}.
     *
     * @return the literal {@code "OK"}
     */
    @GetMapping("/sendTopicMsg")
    public String sendTopicMsg(){
        rabbitTemplate.convertAndSend("TestTopicExchange", "topic.man", buildMessage("sendTopicMsg: man"));
        return "OK";
    }

    /**
     * Publishes a message to {@code TestTopicExchange} with routing key {@code topic.woman}.
     *
     * @return the literal {@code "OK"}
     */
    @GetMapping("/sendTopicMsg2")
    public String sendTopicMsg2(){
        // Wildcards (* and #) are only interpreted in binding keys. A routing key must be a
        // concrete dotted name, so publish with "topic.woman" rather than the pattern "topic.#".
        rabbitTemplate.convertAndSend("TestTopicExchange", "topic.woman", buildMessage("sendTopicMsg: woman"));
        return "OK";
    }

    /**
     * Publishes a message to {@code TestFanoutExchange}.
     *
     * @return the literal {@code "OK"}
     */
    @GetMapping("/sendFanoutMsg")
    public String sendFanoutMsg(){
        // A fanout exchange ignores the routing key; pass an explicit empty key instead of null
        // so the call does not depend on the template's configured default routing key.
        rabbitTemplate.convertAndSend("TestFanoutExchange", "", buildMessage("sendFanoutMsg"));
        return "OK";
    }

    /**
     * Builds the message payload shared by all endpoints.
     *
     * @param data text stored under the {@code "data"} key
     * @return a new map with a random UUID under {@code "id"} and {@code data} under {@code "data"}
     */
    private static Map<String, Object> buildMessage(String data) {
        Map<String, Object> msg = new HashMap<>();
        msg.put("id", UUID.randomUUID().toString());
        msg.put("data", data);
        return msg;
    }
}

直连型交换机

DirectRabbitConfig.java
package com.wzf.websocket.config;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the direct-exchange topology: one durable queue, one durable exchange,
 * the binding between them, and an additional exchange that has no binding.
 */
@Configuration
public class DirectRabbitConfig {

    /**
     * Queue named "TestDirectQueue": durable, not exclusive, not auto-delete.
     */
    @Bean
    public Queue TestDirectQueue() {
        return new Queue("TestDirectQueue", true, false, false);
    }

    /**
     * Exchange named "TestDirectExchange": durable, not auto-delete.
     */
    @Bean
    public DirectExchange TestDirectExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange("TestDirectExchange", durable, autoDelete);
    }

    /**
     * Binds TestDirectQueue to TestDirectExchange with the key "TestDirectRouting".
     */
    @Bean
    public Binding bindingDirect() {
        Queue queue = TestDirectQueue();
        DirectExchange exchange = TestDirectExchange();
        return BindingBuilder.bind(queue).to(exchange).with("TestDirectRouting");
    }

    /**
     * Exchange named "lonelyDirectExchange"; this class declares no binding for it.
     */
    @Bean
    public DirectExchange lonelyDirectExchange() {
        // Same settings the single-argument constructor applies: durable, not auto-delete.
        return new DirectExchange("lonelyDirectExchange", true, false);
    }

}
DirectReceiver.java
package com.wzf.websocket.component;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Listener on the queue "TestDirectQueue".
 * Author's note: messages on this queue are consumed in a round-robin fashion
 * and no message is consumed twice.
 */
@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {

    /**
     * Prints a received map payload to standard output.
     *
     * @param msg the message payload
     */
    @RabbitHandler
    public void process(Map msg) {
        final String payloadText = msg.toString();
        System.out.println("DirectReceiver接收到的消息:" + payloadText);
    }
}

DirectReceiver2.java
package com.wzf.websocket.component;


import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Second listener on the queue "TestDirectQueue".
 * Author's note: messages on this queue are consumed in a round-robin fashion
 * and no message is consumed twice.
 */
@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver2 {

    /**
     * Prints a received map payload to standard output.
     *
     * @param msg the message payload
     */
    @RabbitHandler
    public void process(Map msg) {
        final String payloadText = msg.toString();
        System.out.println("DirectReceiver2接收到的消息:" + payloadText);
    }
}
效果
image.png

主题交换机

TopicRabbitConfig.java
package com.wzf.websocket.config;


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the topic-exchange topology: two durable queues, one durable topic
 * exchange, and one binding per queue.
 */
@Configuration
public class TopicRabbitConfig {

    /**
     * Queue named "topic.man": durable, not exclusive, not auto-delete.
     */
    @Bean
    public Queue manTopicQueue() {
        return new Queue("topic.man", true, false, false);
    }

    /**
     * Queue named "topic.woman": durable, not exclusive, not auto-delete.
     */
    @Bean
    public Queue womanTopicQueue() {
        return new Queue("topic.woman", true, false, false);
    }

    /**
     * Topic exchange named "TestTopicExchange": durable, not auto-delete.
     */
    @Bean
    public TopicExchange TestTopicExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange("TestTopicExchange", durable, autoDelete);
    }

    /**
     * Binds the "topic.man" queue with the exact binding key "topic.man".
     */
    @Bean
    public Binding bindingMan() {
        Queue queue = manTopicQueue();
        TopicExchange exchange = TestTopicExchange();
        return BindingBuilder.bind(queue).to(exchange).with("topic.man");
    }

    /**
     * Binds the "topic.woman" queue with the wildcard binding key "topic.#".
     */
    @Bean
    public Binding bindingTotal() {
        Queue queue = womanTopicQueue();
        TopicExchange exchange = TestTopicExchange();
        return BindingBuilder.bind(queue).to(exchange).with("topic.#");
    }
}
TopicManReceiver.java
package com.wzf.websocket.component;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Listener on the queue "topic.man" (topic exchange demo).
 * Handles both map and string payloads by printing them to standard output.
 */
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {

    /**
     * Handles a map payload.
     *
     * @param msg the message payload
     */
    @RabbitHandler
    public void process(Map msg) {
        final String payloadText = msg.toString();
        System.out.println("TopicManReceiver接收到的消息:" + payloadText);
    }

    /**
     * Handles a string payload.
     *
     * @param msg the message payload
     */
    @RabbitHandler
    public void process2(String msg) {
        final String line = "TopicManReceiver接收到的消息:" + msg;
        System.out.println(line);
    }
}
TopicTotalReceiver.java
package com.wzf.websocket.component;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Listener on the queue "topic.woman" (topic exchange demo).
 * Handles both map and string payloads by printing them to standard output.
 */
@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {

    /**
     * Handles a map payload.
     *
     * @param msg the message payload
     */
    @RabbitHandler
    public void process(Map msg) {
        final String payloadText = msg.toString();
        System.out.println("TopicTotalReceiver接收到的消息:" + payloadText);
    }

    /**
     * Handles a string payload.
     *
     * @param msg the message payload
     */
    @RabbitHandler
    public void process2(String msg) {
        final String line = "TopicTotalReceiver接收到的消息:" + msg;
        System.out.println(line);
    }
}
效果
image.png

扇形交换机

FanoutRabbitConfig.java
package com.wzf.websocket.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the fanout-exchange topology: three durable queues, one durable
 * fanout exchange, and one binding per queue (no routing keys).
 */
@Configuration
public class FanoutRabbitConfig {

    /**
     * Queue named "fanout.A": durable, not exclusive, not auto-delete.
     */
    @Bean
    public Queue aFanoutQueue() {
        return new Queue("fanout.A", true, false, false);
    }

    /**
     * Queue named "fanout.B": durable, not exclusive, not auto-delete.
     */
    @Bean
    public Queue bFanoutQueue() {
        return new Queue("fanout.B", true, false, false);
    }

    /**
     * Queue named "fanout.websocket": durable, not exclusive, not auto-delete.
     */
    @Bean
    public Queue websocketFanoutQueue() {
        return new Queue("fanout.websocket", true, false, false);
    }

    /**
     * Fanout exchange named "TestFanoutExchange": durable, not auto-delete.
     */
    @Bean
    public FanoutExchange TestFanoutExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new FanoutExchange("TestFanoutExchange", durable, autoDelete);
    }

    /**
     * Binds the "fanout.A" queue to the fanout exchange.
     */
    @Bean
    public Binding bindingFanoutA() {
        Queue queue = aFanoutQueue();
        FanoutExchange exchange = TestFanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }

    /**
     * Binds the "fanout.B" queue to the fanout exchange.
     */
    @Bean
    public Binding bindingFanoutB() {
        Queue queue = bFanoutQueue();
        FanoutExchange exchange = TestFanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }

    /**
     * Binds the "fanout.websocket" queue to the fanout exchange.
     */
    @Bean
    public Binding bindingFanoutWebsocket() {
        Queue queue = websocketFanoutQueue();
        FanoutExchange exchange = TestFanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }
}
FanoutReceiverA.java
package com.wzf.websocket.component;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Listener on the queue "fanout.A" (fanout exchange demo).
 * Handles both map and string payloads by printing them to standard output.
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    /**
     * Handles a map payload.
     *
     * @param msg the message payload
     */
    @RabbitHandler
    public void process(Map msg) {
        final String payloadText = msg.toString();
        System.out.println("FanoutReceiverA接收到的消息:" + payloadText);
    }

    /**
     * Handles a string payload.
     *
     * @param msg the message payload
     */
    @RabbitHandler
    public void process2(String msg) {
        final String line = "FanoutReceiverA接收到的消息:" + msg;
        System.out.println(line);
    }
}
FanoutReceiverB.java
package com.wzf.websocket.component;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Listener on the queue "fanout.B" (fanout exchange demo).
 * Handles both map and string payloads by printing them to standard output.
 */
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    /**
     * Handles a map payload.
     *
     * @param msg the message payload
     */
    @RabbitHandler
    public void process(Map msg) {
        final String payloadText = msg.toString();
        System.out.println("FanoutReceiverB接收到的消息:" + payloadText);
    }

    /**
     * Handles a string payload.
     *
     * @param msg the message payload
     */
    @RabbitHandler
    public void process2(String msg) {
        final String line = "FanoutReceiverB接收到的消息:" + msg;
        System.out.println(line);
    }
}
效果
image.png

websocket

WebSocketConfig.java
package com.wzf.websocket.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * WebSocket configuration: exposes a {@link ServerEndpointExporter} bean.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean. Per the Spring documentation, it detects beans
     * annotated with {@code @ServerEndpoint} and registers them with the
     * WebSocket runtime.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
WebSocketServer.java
package com.wzf.websocket.component;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

@Slf4j
@Component
@ServerEndpoint("/websocket/{sid}")
public class WebSocketServer {

    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static int onlineCount = 0;

    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;

    //接收sid
    private String sid="";

    /**
     * 连接建立成功调用的方法*/
    @OnOpen
    public void onOpen(Session session,@PathParam("sid") String sid) throws InterruptedException {
        this.session = session;
        webSocketSet.add(this);     //加入set中
        addOnlineCount();           //在线数加1
        log.info("有新窗口开始监听:"+sid+",当前在线人数为" + getOnlineCount());
        this.sid=sid;
        try {
            //模拟服务器向客户端发消息
            sendMessage("服务器:恭喜你,连接成功");
        } catch (IOException e) {
            log.error("websocket IO异常");
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);  //从set中删除
        subOnlineCount();           //在线数减1
        log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("收到来自窗口"+sid+"的信息:"+message);
//        for (WebSocketServer item : webSocketSet) {
//            try {
//                item.sendMessage(message);
//            } catch (IOException e) {
//                e.printStackTrace();
//            }
//        }
    }

    /**
     *连接发生错误时
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
    }

    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) throws IOException {
        if(this.session != null){
            this.session.getBasicRemote().sendText(message);
        }
    }


    /**
     * 实现服务器主动向某一客户端推送消息
     * */
    public void sendInfo(String message,@PathParam("sid") String sid) throws IOException {
        log.info("推送消息到窗口"+sid+",推送内容:"+message);
        for (WebSocketServer item : webSocketSet) {
            try {
                if(sid == null){
                    item.sendMessage(message);
                }else if(item.sid.equals(sid)){
                    item.sendMessage(message);
                }
            } catch (IOException e) {
                continue;
            }
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}
WebsocketReceiver.java
package com.wzf.websocket.component;


import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * Listener on the queue "fanout.websocket" that forwards every received
 * message to WebSocket clients through {@link WebSocketServer#sendInfo}.
 */
@Component
@RabbitListener(queues = "fanout.websocket")
public class WebsocketReceiver {

    @Autowired
    private WebSocketServer webSocketServer;

    /**
     * Forwards a map payload. When the map has a non-null "sid" entry its string
     * form is passed as the target sid; otherwise {@code null} is passed.
     *
     * @param msg the message payload
     * @throws IOException propagated from {@code sendInfo}
     */
    @RabbitHandler
    public void process(Map msg) throws IOException {
        final Object target = msg.get("sid");
        final String targetSid = (target == null) ? null : target.toString();
        webSocketServer.sendInfo(msg.toString(), targetSid);
    }

    /**
     * Forwards a string payload with a {@code null} target sid.
     *
     * @param msg the message payload
     * @throws IOException propagated from {@code sendInfo}
     */
    @RabbitHandler
    public void process2(String msg) throws IOException {
        final String noTarget = null;
        webSocketServer.sendInfo(msg, noTarget);
    }
}
前端
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>websocket</title>
</head>
<body>
    <script src="./Stomp.js"></script>
        <script>
            // Option 1 (disabled): consume messages through the Java WebSocket endpoint
            // let url = "ws://127.0.0.1:8888/websocket/1";
            // let socket = new WebSocket(url);
            // socket.onopen = function (){
            //  console.log("连接成功!");
            //  socket.send("客户端发送的消息");
            // };
            // socket.onmessage = function (msg){
            //  console.log(msg.data);
            // };
            // socket.onerror = function (error){
            //  console.log(error);
            // };

            // Option 2: consume RabbitMQ messages directly over STOMP
            const url = "ws://127.0.0.1:15674/ws";
            const client = Stomp.client(url);

            // [destination, log label] pairs, subscribed in this order.
            //   /amq/queue/<queue name>                 - listen on an existing queue
            //   /exchange/<exchange name>/<routing key> - listen on an exchange (direct / topic)
            //   /exchange/<exchange name>               - listen on an exchange without a key (fanout)
            const subscriptions = [
                ["/amq/queue/TestDirectQueue", "TestDirectQueue接收到消息:"],
                ["/exchange/TestDirectExchange/TestDirectRouting", "TestDirectExchange接收到消息:"],
                ["/exchange/TestTopicExchange/topic.man", "TestTopicExchange接收到消息:"],
                ["/exchange/TestFanoutExchange", "TestFanoutExchange接收到消息:"]
            ];

            // Runs once the STOMP connection is established: logs and registers every subscription.
            const handleConnected = function () {
                console.log("连接成功!");
                subscriptions.forEach(function (entry) {
                    const destination = entry[0];
                    const label = entry[1];
                    client.subscribe(destination, function (message) {
                        console.log(label, message);
                    });
                });
            };

            // Logs any connection error reported by the STOMP client.
            const handleError = function (error) {
                console.log(error);
            };

            client.connect("admin", "123456", handleConnected, handleError);
        </script>
</body>
</html>
效果
image.png
image.png

相关文章

网友评论

    本文标题:SpringBoot整合RabbitMQ

    本文链接:https://www.haomeiwen.com/subject/yefdcdtx.html