简介
RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种客户端。它用于在分布式系统中存储和转发消息,在易用性、扩展性、高可用性等方面表现不俗。
rabbitMQ特性
安装
https://www.rabbitmq.com/download.html
各种环境下的安装方式在官方文档中都有详细说明。
案例
Hello World
本案例中,消息生产者把数据发送到 RabbitMQ,RabbitMQ 再把消息转发给消费者,代码如下:
package com.arlley.rabbitMQ.hello;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Minimal "Hello World" consumer: prints every message delivered from the
 * {@code Hello} queue until a key is pressed on standard input.
 */
public class HelloConsumer {
    /**
     * Connects to the local broker, subscribes to the {@code Hello} queue and
     * blocks on standard input; the channel and connection are closed on exit.
     *
     * @param args unused
     * @throws IOException if connecting, declaring, consuming or closing fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        factory.setPort(5672);

        // try-with-resources closes the channel first, then the connection,
        // even when an exception is thrown while consuming.
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            // Declared with the same arguments the producer uses, so the call is
            // idempotent and the consumer can be started before the producer.
            channel.queueDeclare("Hello", false, false, false, null);

            channel.basicConsume("Hello", false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body, "UTF-8"));
                    // Manual-ack mode: without this ack the broker keeps the message
                    // and redelivers it the next time a consumer connects.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });

            // Keep the process alive until the user presses a key.
            System.in.read();
        }
    }
}
package com.arlley.rabbitMQ.hello;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Minimal "Hello World" producer: publishes one text message to the
 * {@code Hello} queue through the default exchange.
 */
public class HelloProducer {
    /**
     * Declares the {@code Hello} queue and publishes a single message to it.
     *
     * @param args unused
     * @throws IOException if connecting, declaring, publishing or closing fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // try-with-resources guarantees the connection is closed on failure too;
        // an open connection would otherwise keep the JVM from exiting.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare("Hello", false, false, false, null);
            // Encode explicitly as UTF-8: the consumer decodes with UTF-8, while a
            // bare getBytes() would depend on the platform default charset.
            channel.basicPublish("", "Hello", null, "Hello World My Rabbit MQ".getBytes("UTF-8"));
            System.out.println("发送消息到MQ-------------");
        }
    }
}
Work Queues
Work Queue 是生产者发送消息, 由多个消费者共同消费。
package com.arlley.rabbitMQ.workQueue;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Work-queue consumer: each thread opens its own channel on a shared connection
 * and competes with the other threads for messages on the {@code workQueue} queue.
 */
public class WorkQueueConsumer implements Runnable{
    private static final ConnectionFactory factory = new ConnectionFactory();
    // One connection shared by all consumer threads; each thread creates its own channel.
    private static final Connection connection = openConnection();

    /**
     * Configures the factory and opens the shared connection.
     *
     * @return the open connection
     * @throws IllegalStateException if the broker cannot be reached; failing here is
     *         clearer than continuing with a null connection
     */
    private static Connection openConnection() {
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        try {
            return factory.newConnection();
        } catch (IOException | TimeoutException e) {
            throw new IllegalStateException("Unable to connect to RabbitMQ at localhost:5672", e);
        }
    }

    /**
     * Starts two competing consumer threads and waits for a key press.
     *
     * @param args unused
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        Thread thread1 = new Thread(new WorkQueueConsumer());
        Thread thread2 = new Thread(new WorkQueueConsumer());
        thread1.setName("thread1");
        thread2.setName("thread2");
        thread1.start();
        thread2.start();
        System.in.read();
    }

    /**
     * Opens a channel, limits it to one unacknowledged message at a time and
     * subscribes to {@code workQueue} with manual acknowledgements.
     */
    @Override
    public void run() {
        try {
            final String name = Thread.currentThread().getName();
            final Channel channel = connection.createChannel();
            // Same arguments as the producer's declaration, so the consumer can start first.
            channel.queueDeclare("workQueue", false, false, false, null);
            // Prefetch of 1: the broker sends the next message only after the previous ack.
            channel.basicQos(1);

            DeliverCallback onDelivery = (consumerTag, message) -> {
                System.out.println("成功收到消息"+ name);
                System.out.println(new String(message.getBody(), "UTF-8"));
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            };
            // The cancel callback fires after the broker has already cancelled this
            // consumer, so there is nothing left to cancel; calling basicCancel here
            // would fail with an unknown consumer tag.
            CancelCallback onCancel = consumerTag -> {};

            channel.basicConsume("workQueue", false, onDelivery, onCancel);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.arlley.rabbitMQ.workQueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Work-queue producer: publishes 1000 numbered messages to the
 * {@code workQueue} queue through the default exchange.
 */
public class WorkQueueProducer {
    /**
     * Declares {@code workQueue} and publishes {@code workQueue0} .. {@code workQueue999}.
     *
     * @param args unused
     * @throws IOException if connecting, declaring, publishing or closing fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // Closing via try-with-resources also covers a failure half-way through the loop.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare("workQueue", false, false, false, null);
            for (int i = 0; i < 1000; i++) {
                String message = "workQueue" + i;
                System.out.println("发送消息:"+message+"------------");
                // Explicit UTF-8 to match the decoding done by the consumer.
                channel.basicPublish("", "workQueue", null, message.getBytes("UTF-8"));
            }
        }
    }
}
Publish/Subscribe
这是一种发布订阅模式,使用的是 fanout 类型的交换机。
package com.arlley.rabbitMQ.pubsub;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Publish/subscribe consumer: every thread binds its own server-named queue to
 * the fanout exchange {@code pub}, so each thread receives a copy of every message.
 */
public class PubSubConsumer implements Runnable{
    private static final ConnectionFactory factory = new ConnectionFactory();
    // One connection shared by all subscriber threads; each thread creates its own channel.
    private static final Connection connection = openConnection();

    /**
     * Configures the factory and opens the shared connection.
     *
     * @return the open connection
     * @throws IllegalStateException if the broker cannot be reached; failing here is
     *         clearer than continuing with a null connection
     */
    private static Connection openConnection() {
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        try {
            return factory.newConnection();
        } catch (IOException | TimeoutException e) {
            throw new IllegalStateException("Unable to connect to RabbitMQ at localhost:5672", e);
        }
    }

    /**
     * Declares the exchange, binds a fresh server-named queue to it and consumes
     * from that queue with manual acknowledgements.
     */
    @Override
    public void run() {
        try{
            final Channel channel = connection.createChannel();
            // Subscribers have to be running before the producer publishes (a fanout
            // exchange drops messages when no queue is bound), so the exchange is
            // declared here as well, with the same type the producer uses.
            channel.exchangeDeclare("pub", "fanout");
            String queueName = channel.queueDeclare().getQueue();
            final String threadName = Thread.currentThread().getName();
            // The routing key is ignored by fanout exchanges.
            channel.queueBind(queueName, "pub", "");

            DeliverCallback onDelivery = (consumerTag, message) -> {
                System.out.println(threadName+"成功收到消息----------");
                System.out.println(new String(message.getBody(), "UTF-8"));
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            };
            channel.basicConsume(queueName, false, onDelivery, consumerTag -> {});
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * Starts ten subscriber threads and waits for a key press.
     *
     * @param args unused
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        for (int i=0; i<10; i++){
            Thread thread = new Thread(new PubSubConsumer());
            thread.setName("t"+i);
            thread.start();
        }
        System.out.println("按任意键退出!");
        System.in.read();
    }
}
package com.arlley.rabbitMQ.pubsub;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Publish/subscribe producer: broadcasts 1000 messages through the fanout
 * exchange {@code pub}.
 */
public class PubSubProducer {
    /**
     * Declares the {@code pub} fanout exchange and publishes 1000 numbered messages to it.
     *
     * @param args unused
     * @throws IOException if connecting, declaring, publishing or closing fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        factory.setPort(5672);

        // Closing via try-with-resources also covers a failure half-way through the loop.
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            channel.exchangeDeclare("pub", "fanout");
            for (int i = 0; i < 1000; i++) {
                // The payload contains non-ASCII characters, so the charset must be
                // explicit: the subscribers decode with UTF-8.
                byte[] payload = ("pubSub模式"+i).getBytes("UTF-8");
                channel.basicPublish("pub", "", null, payload);
            }
        }
    }
}
Routing
这种模式可以把特定的消息发送到特定的队列,是广播(发布订阅)模式的增强,使用的是 direct 类型的交换机。
package com.arlley.rabbitMQ.route;
import com.arlley.rabbitMQ.pubsub.PubSubConsumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Routing consumer: binds a server-named queue to the direct exchange
 * {@code route} with one routing key and prints the messages published with that key.
 */
public class RouteConsumer implements Runnable{
    private static final ConnectionFactory factory = new ConnectionFactory();
    // One connection shared by all consumer threads; each thread creates its own channel.
    private static final Connection connection = openConnection();
    // Binding key this consumer subscribes with.
    private final String routeKey;

    /**
     * Configures the factory and opens the shared connection.
     *
     * @return the open connection
     * @throws IllegalStateException if the broker cannot be reached; failing here is
     *         clearer than continuing with a null connection
     */
    private static Connection openConnection() {
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        try {
            return factory.newConnection();
        } catch (IOException | TimeoutException e) {
            throw new IllegalStateException("Unable to connect to RabbitMQ at localhost:5672", e);
        }
    }

    /**
     * @param routeKey routing key used to bind this consumer's queue to the exchange
     */
    public RouteConsumer(String routeKey){
        this.routeKey = routeKey;
    }

    /**
     * Declares the exchange, binds a fresh server-named queue with {@code routeKey}
     * and consumes from it with manual acknowledgements.
     */
    @Override
    public void run() {
        try{
            final Channel channel = connection.createChannel();
            // Declared here too (same type as in the producer) so that binding does
            // not fail when the consumer is started before the producer.
            channel.exchangeDeclare("route", "direct");
            String queueName = channel.queueDeclare().getQueue();
            final String threadName = Thread.currentThread().getName();
            channel.queueBind(queueName, "route", routeKey);

            DeliverCallback onDelivery = (consumerTag, message) -> {
                System.out.println(threadName+"成功收到消息----------");
                System.out.println(new String(message.getBody(), "UTF-8"));
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            };
            channel.basicConsume(queueName, false, onDelivery, consumerTag -> {});
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * Starts five consumers bound with key {@code my} and five bound with key {@code notMy}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        for (int i=0; i<5; i++){
            Thread thread = new Thread(new RouteConsumer("my"));
            thread.setName("my"+i);
            thread.start();
        }
        for (int i=0; i<5; i++){
            Thread thread = new Thread(new RouteConsumer("notMy"));
            thread.setName("notMy"+i);
            thread.start();
        }
    }
}
package com.arlley.rabbitMQ.route;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Routing producer: publishes one message with routing key {@code my} and one
 * with routing key {@code notMy} to the direct exchange {@code route}.
 */
public class RouteProducer {
    /**
     * Declares the {@code route} direct exchange and publishes the two messages.
     *
     * @param args unused
     * @throws IOException if connecting, declaring, publishing or closing fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        factory.setPort(5672);

        // try-with-resources guarantees the connection is closed on failure too;
        // an open connection would otherwise keep the JVM from exiting.
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            channel.exchangeDeclare("route", "direct");
            // Explicit UTF-8 to match the decoding done by the consumers.
            channel.basicPublish("route", "my", null, "my".getBytes("UTF-8"));
            channel.basicPublish("route", "notMy", null, "notMy".getBytes("UTF-8"));
        }
    }
}
Topics
Topics 是 Routing 模式的增强,提供了基于通配符的路由(`*` 匹配一个单词,`#` 匹配零个或多个单词;注意这并不是正则表达式)。
package com.arlley.rabbitMQ.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Topic consumer: binds a server-named queue to the topic exchange
 * {@code topic} with a wildcard binding key and prints the matching messages.
 */
public class TopicConsumer implements Runnable{
    private static final ConnectionFactory factory = new ConnectionFactory();
    // One connection shared by all consumer threads; each thread creates its own channel.
    private static final Connection connection = openConnection();
    // Binding pattern; may contain the topic wildcards '*' and '#'.
    private final String routeKey;

    /**
     * Configures the factory and opens the shared connection.
     *
     * @return the open connection
     * @throws IllegalStateException if the broker cannot be reached; failing here is
     *         clearer than continuing with a null connection
     */
    private static Connection openConnection() {
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        try {
            return factory.newConnection();
        } catch (IOException | TimeoutException e) {
            throw new IllegalStateException("Unable to connect to RabbitMQ at localhost:5672", e);
        }
    }

    /**
     * @param routeKey binding pattern used to bind this consumer's queue to the exchange
     */
    public TopicConsumer(String routeKey){
        this.routeKey = routeKey;
    }

    /**
     * Declares the exchange, binds a fresh server-named queue with {@code routeKey}
     * and consumes from it with manual acknowledgements.
     */
    @Override
    public void run() {
        try{
            final Channel channel = connection.createChannel();
            // Declared here too (same type as in the producer) so that binding does
            // not fail when the consumer is started before the producer.
            channel.exchangeDeclare("topic", "topic");
            String queueName = channel.queueDeclare().getQueue();
            final String threadName = Thread.currentThread().getName();
            channel.queueBind(queueName, "topic", routeKey);

            DeliverCallback onDelivery = (consumerTag, message) -> {
                System.out.println(threadName+"成功收到消息----------");
                System.out.println(new String(message.getBody(), "UTF-8"));
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            };
            channel.basicConsume(queueName, false, onDelivery, consumerTag -> {});
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * Starts one consumer bound with {@code lazy.#} and one bound with {@code lazy.all.#}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Thread thread = new Thread(new TopicConsumer("lazy.#"));
        thread.setName("t1");
        thread.start();
        Thread thread1 = new Thread(new TopicConsumer("lazy.all.#"));
        thread1.setName("t2");
        thread1.start();
    }
}
package com.arlley.rabbitMQ.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Topic producer: publishes two messages with the routing keys
 * {@code lazy.all.3} and {@code lazy.eee} to the topic exchange {@code topic}.
 */
public class TopicProducer {
    /**
     * Declares the {@code topic} exchange and publishes the two messages.
     *
     * @param args unused
     * @throws IOException if connecting, declaring, publishing or closing fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        factory.setPort(5672);

        // try-with-resources guarantees the connection is closed on failure too;
        // an open connection would otherwise keep the JVM from exiting.
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            channel.exchangeDeclare("topic", "topic");
            // Explicit UTF-8 to match the decoding done by the consumers.
            channel.basicPublish("topic","lazy.all.3", null, "topicAll".getBytes("UTF-8"));
            channel.basicPublish("topic","lazy.eee", null, "topicE".getBytes("UTF-8"));
        }
    }
}
rpc
package com.arlley.rabbitMQ.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RpcCaller {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare("rpc", "direct");
String queueName = channel.queueDeclare().getQueue();
String correlationId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().replyTo("replyTo").correlationId(correlationId).build();
channel.basicPublish("rpc", "rpcCall", props, "1".getBytes());
BlockingQueue<String> reponse = new ArrayBlockingQueue<String>(1);
channel.basicConsume("replyTo", false, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到回调结果");
if(correlationId.equals(message.getProperties().getCorrelationId())){
String result = new String(message.getBody(), "UTF-8");
System.out.println(result);
reponse.offer(result);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}else{
//收到的消息不是当前客户端需要的
channel.basicReject(message.getEnvelope().getDeliveryTag(), true);
}
}
}, consumerTag -> {});
String result = reponse.take();
channel.close();
conn.close();
}
}
package com.arlley.rabbitMQ.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RpcConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "rpc", "rpcCall");
channel.basicConsume(queueName, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
String replyTo = properties.getReplyTo();
channel.queueDeclare(replyTo, false, false, true, null);
channel.queueBind(replyTo, "rpc", "reply");
channel.basicPublish("rpc", "reply", properties, "2".getBytes());
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
System.in.read();
channel.close();
conn.close();
}
}
网友评论