美文网首页从零学Java笔录凯哥Java-工作总结
RabbitMQ学习系列教程三:快速入门

RabbitMQ学习系列教程三:快速入门

作者: 凯哥Java | 来源:发表于2019-07-12 09:15 被阅读3次

快速入门:消息的生产者和消费者

生产者的代码步骤:

1:获取到连接的工厂 ConnectionFactory

2:从工厂中获取到一个连接:connection

3:重建一个数据通信的通道,可以发送和接收消息对象:channel

4:通过channel发送消息

5:关闭流

代码编写:

public class Procuder {

public static void main(String[] args) throws IOException, TimeoutException {

//1:创建一个connectioFactory工厂对象,并进行配置

ConnectionFactory connectionFactory = new ConnectionFactory();

//设置ip 端口 vhost等

connectionFactory.setHost("192.168.31.128");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("/");

//2:通过工厂对象获取到connection对象

Connection connection = connectionFactory.newConnection();

//3:通过connection对象获取到一个消息通信的通道 channel

Channel channel = connection.createChannel();

// 4:通过channel发送数据

/**

* 参数说明:

* exchange: 数据路由 routingKey: props: 消息描述body:消息体。字节数组

*/

for(int i = 0;i<5;i++){

String mst = "hi Rabbit mq!"+i;

channel.basicPublish("","mytest001",null,mst.getBytes());

}

System.out.println("===>>>生产者发送消息完成。。。");

//5:关闭流

channel.close();

connection.close();

}

}

消费者的代码步骤:

前三步是一样的。

4:声明一个队列

5:创建一个消费者

6:设置channel

7:获取消息

代码如下:

public class Consumer {

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

//1:创建一个connectioFactory工厂对象,并进行配置

ConnectionFactory connectionFactory = new ConnectionFactory();

//设置ip 端口 vhost等

connectionFactory.setHost("192.168.31.128");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("/");

//2:通过工厂对象获取到connection对象

Connection connection = connectionFactory.newConnection();

//3:公共connection对象获取到一个消息通信的通道 channel

Channel channel = connection.createChannel();

//4:声明(创建)一个队列

String queueName = "mytest001"; //这里可以使用routingKey

/**

*  Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,

Map arguments) throws IOException;

*/

channel.queueDeclare(queueName,true,false,false,null);

//5:创建消费者

QueueingConsumer consumer = new QueueingConsumer(channel);

//6:设置channel

//    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

/*

* 参数说明:  queue:队列名称 autoAck:是否自动签收 consumer:消费者对象

*/

channel.basicConsume(queueName,true,consumer);

System.out.println("===>>消费者开始处理消息。");

while(true){

//7 获取消息

QueueingConsumer.Delivery delivery =  consumer.nextDelivery();;

String msg = new String(delivery.getBody());

System.err.println("消费端: " + msg);

}

}

}

运行测试:

生产者:

启动消费者:

接着我们通过浏览器查看管理页面:

查看channel:

在queues中:

我们发现多了个channel和多了个queues

下节预告:下节我们将讲解重要对象之:exchange 交换机

本文来源:http://kaigejava.com/article/detail/499

凯哥个人博客:www.kaigejava.com

凯哥公众号:凯哥Java(kaigejava)

相关文章

网友评论

    本文标题:RabbitMQ学习系列教程三:快速入门

    本文链接:https://www.haomeiwen.com/subject/xpgjkctx.html