使用 RabbitMQ：首先在 pom.xml 中添加以下依赖：
<!-- Spring Boot's AMQP starter. No <version> is given here, so it is presumably
     managed by a Spring Boot parent POM/BOM; confirm against the full pom.xml. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
创建队列:
package com.secondkill.messagequeue.rabbitmq;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the RabbitMQ queues used by the sender, listener and tests in this package.
 *
 * <p>Each {@link Queue} bean defined here is a declaration that Spring AMQP's admin
 * component is expected to create on the broker when a connection is opened
 * (standard Spring Boot auto-configuration; verify if the admin is customised).
 *
 * <p>Created by zhangkai12 on 2018/7/9.
 */
@Configuration
public class RabbitMqConfig {

    /**
     * Declares the {@code hello_topic} queue.
     *
     * @return the queue definition for {@code hello_topic}
     */
    @Bean
    public Queue createQueue() {
        return new Queue("hello_topic");
    }

    /**
     * Declares the {@code someQueue} queue.
     *
     * <p>{@code RabbitMqReceiver} listens on this queue and the tests publish to it,
     * so it must exist on the broker; without a declaration the listener has nothing
     * to attach to on a fresh broker and published messages are unroutable.
     *
     * @return the queue definition for {@code someQueue}
     */
    @Bean
    public Queue someQueue() {
        return new Queue("someQueue");
    }
}
发送数据
package com.secondkill.messagequeue.rabbitmq;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
/**
 * Service that publishes text messages through Spring AMQP.
 *
 * <p>Created by zhangkai12 on 2018/7/6.
 */
@Service
public class RabbitMqSender {

    /** Injected through the constructor; no method visible in this class uses it. */
    private final AmqpAdmin amqpAdmin;

    /** Template that every outgoing message is handed to. */
    private final AmqpTemplate amqpTemplate;

    /**
     * Creates the sender with its Spring AMQP collaborators.
     *
     * @param amqpAdmin    broker administration handle (stored, currently unused here)
     * @param amqpTemplate template used to publish messages
     */
    @Autowired
    public RabbitMqSender(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
        this.amqpAdmin = amqpAdmin;
    }

    /**
     * Converts {@code msg} and sends it via the template.
     *
     * @param topic passed as the first argument of {@code convertAndSend}; per the
     *              {@code AmqpTemplate} API this is the routing key, which callers in
     *              this package set to a queue name
     * @param msg   message payload
     */
    public void sendMsg(String topic, String msg) {
        final String routingKey = topic;
        final String payload = msg;
        this.amqpTemplate.convertAndSend(routingKey, payload);
    }
}
接收数据
package com.secondkill.messagequeue.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
/**
 * Listener bound to the {@code someQueue} queue that echoes each received
 * text message to standard output.
 *
 * <p>Created by zhangkai12 on 2018/7/6.
 */
@Service
@RabbitListener(queues = "someQueue")
public class RabbitMqReceiver {

    /**
     * Handles one message whose payload is a {@link String}.
     *
     * @param content message payload as delivered to this handler
     */
    @RabbitHandler
    public void processMessage(String content) {
        final String line = "consumer content " + content;
        System.out.println(line);
    }
}
测试类：
package com.secondkill.messagequeue.rabbitmq;
import com.secondkill.SecondkillApplicationTests;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import static org.junit.Assert.*;
/**
 * Integration-style tests that publish numbered messages through
 * {@link RabbitMqSender}. They only exercise the send path; nothing is asserted
 * about delivery, so results have to be checked from the consumer's output.
 *
 * <p>Created by zhangkai12 on 2018/7/6.
 */
public class RabbitMqSenderTest extends SecondkillApplicationTests {

    /** Number of messages each test publishes. */
    private static final int MESSAGE_COUNT = 100;

    @Autowired
    RabbitMqSender rabbitMqSender;

    /**
     * Publishes {@value #MESSAGE_COUNT} messages to {@code someQueue}, pausing 100 ms
     * between sends.
     *
     * @throws Exception if the thread is interrupted or sending fails
     */
    @Test
    public void sendMsg() throws Exception {
        sendNumberedMessages("someQueue", 100);
    }

    /**
     * Publishes {@value #MESSAGE_COUNT} messages to {@code hello_topic}, pausing 10 ms
     * between sends.
     */
    @Test
    public void helloTopicTest() {
        try {
            sendNumberedMessages("hello_topic", 10);
        } catch (InterruptedException e) {
            // Restore the interrupt status for the test runner instead of swallowing
            // it, and stop: continuing to send after an interrupt hides the problem.
            Thread.currentThread().interrupt();
            fail("interrupted while sending to hello_topic: " + e);
        }
    }

    /**
     * Sends messages {@code " 0"} through {@code " 99"} to the given queue.
     *
     * @param queue       queue name handed to {@link RabbitMqSender#sendMsg}
     * @param pauseMillis sleep between consecutive sends, in milliseconds
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void sendNumberedMessages(String queue, long pauseMillis) throws InterruptedException {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            System.out.println("send to server " + i);
            rabbitMqSender.sendMsg(queue, " " + i);
            Thread.sleep(pauseMillis);
        }
    }
}
结果:
image.png
网友评论