RabbitMQ

作者: 小蜗牛Aaron | 来源:发表于2020-03-12 02:55 被阅读0次

简介

RabbitMQ 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端。用于在分布式系 统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。


rabbitMQ特性

安装

https://www.rabbitmq.com/download.html
各种环境官方文档很详细

案例

Hello World

本案例是从消息生产者发送数据到rabbitMQ rabbitMQ把消息转发给消费者 代码如下

package com.arlley.rabbitMQ.hello;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class HelloConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        factory.setPort(5672);

        Connection conn = factory.newConnection();

        Channel channel = conn.createChannel();

        channel.basicConsume("Hello", new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body, "UTF-8"));
            }
        });
        System.in.read();
        channel.close();
        conn.close();
    }
}
package com.arlley.rabbitMQ.hello;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class HelloProducer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("Hello", false, false, false, null);

        channel.basicPublish("", "Hello", null, "Hello World My Rabbit MQ".getBytes());

        System.out.println("发送消息到MQ-------------");

        channel.close();
        connection.close();

    }
}
Work Queues

Work Queue 是生产者发送消息, 由多个消费者共同消费。

package com.arlley.rabbitMQ.workQueue;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class WorkQueueConsumer implements Runnable{

    private static ConnectionFactory factory = new ConnectionFactory();

    private static Connection connection;

    static {
        try {
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            connection = factory.newConnection();
            //channel = connection.createChannel();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) throws IOException {
        Thread thread1 = new Thread(new WorkQueueConsumer());
        Thread thread2 = new Thread(new WorkQueueConsumer());
        thread1.setName("thread1");
        thread2.setName("thread2");
        thread1.start();
        thread2.start();
        System.in.read();
    }

    @Override
    public void run() {
        try {
            final String name = Thread.currentThread().getName();
            final Channel channel = connection.createChannel();
            channel.basicQos(1);
            channel.basicConsume("workQueue", new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("成功收到消息"+ name);
                    System.out.println(new String(message.getBody(), "UTF-8"));
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            }, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                    channel.basicCancel(consumerTag);
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.arlley.rabbitMQ.workQueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class WorkQueueProducer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("workQueue", false, false, false, null);

        for(int i=0;i<1000;i++){
            String message = ("workQueue"+i);
            System.out.println("发送消息:"+message+"------------");
            channel.basicPublish("", "workQueue", null, message.getBytes());
        }

        channel.close();
        connection.close();
    }
}
Publish/Subscribe

这是一种发布订阅模式 使用的是faout的交换机

package com.arlley.rabbitMQ.pubsub;


import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PubSubConsumer implements Runnable{

    private static ConnectionFactory factory = new ConnectionFactory();
    private static Connection connection = null;

    static {
        try {
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            connection = factory.newConnection();
            //channel = connection.createChannel();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try{
            final Channel channel = connection.createChannel();
            String queueName = channel.queueDeclare().getQueue();
            final String threadName = Thread.currentThread().getName();
            channel.queueBind(queueName, "pub", "");
            channel.basicConsume(queueName, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println(threadName+"成功收到消息----------");
                    System.out.println(new String(message.getBody(), "UTF-8"));
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            }, consumerTag -> {});
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        for (int i=0; i<10; i++){
            Thread thread = new Thread(new PubSubConsumer());
            thread.setName("t"+i);
            thread.start();
        }
        System.out.println("按任意键退出!");
        System.in.read();
    }
}
package com.arlley.rabbitMQ.pubsub;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PubSubProducer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        factory.setPort(5672);

        Connection conn = factory.newConnection();

        Channel channel = conn.createChannel();

        channel.exchangeDeclare("pub", "fanout");
        for(int i=0;i<1000;i++) {
            channel.basicPublish("pub", "", null, ("pubSub模式"+i).getBytes());
        }
        channel.close();
        conn.close();
    }
}
Routing

这是一个可以将特定一些消息发送到特定的队列,是广播消息的增强。

package com.arlley.rabbitMQ.route;

import com.arlley.rabbitMQ.pubsub.PubSubConsumer;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RouteConsumer implements Runnable{

    private static ConnectionFactory factory = new ConnectionFactory();
    private static Connection connection = null;

    private String routeKey;

    static {
        try {
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            connection = factory.newConnection();
            //channel = connection.createChannel();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public RouteConsumer(String routeKey){
        this.routeKey = routeKey;
    }

    @Override
    public void run() {
        try{
            final Channel channel = connection.createChannel();
            String queueName = channel.queueDeclare().getQueue();
            final String threadName = Thread.currentThread().getName();
            channel.queueBind(queueName, "route", routeKey);
            channel.basicConsume(queueName, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println(threadName+"成功收到消息----------");
                    System.out.println(new String(message.getBody(), "UTF-8"));
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            }, consumerTag -> {});
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        for (int i=0; i<5; i++){
            Thread thread = new Thread(new RouteConsumer("my"));
            thread.setName("my"+i);
            thread.start();
        }

        for (int i=0; i<5; i++){
            Thread thread = new Thread(new RouteConsumer("notMy"));
            thread.setName("notMy"+i);
            thread.start();
        }
    }
}
package com.arlley.rabbitMQ.route;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RouteProducer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        factory.setPort(5672);

        Connection conn = factory.newConnection();

        Channel channel = conn.createChannel();

        channel.exchangeDeclare("route", "direct");

        channel.basicPublish("route", "my", null, "my".getBytes());

        channel.basicPublish("route", "notMy", null, "notMy".getBytes());

        channel.close();
        conn.close();
    }
}
Topics

是route的增强 提供了正则表达式的路由

package com.arlley.rabbitMQ.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicConsumer implements Runnable{
    private static ConnectionFactory factory = new ConnectionFactory();
    private static Connection connection = null;

    private String routeKey;

    static {
        try {
            factory.setHost("localhost");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            connection = factory.newConnection();
            //channel = connection.createChannel();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public TopicConsumer(String routeKey){
        this.routeKey = routeKey;
    }

    @Override
    public void run() {
        try{
            final Channel channel = connection.createChannel();
            String queueName = channel.queueDeclare().getQueue();
            final String threadName = Thread.currentThread().getName();
            channel.queueBind(queueName, "topic", routeKey);
            channel.basicConsume(queueName, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println(threadName+"成功收到消息----------");
                    System.out.println(new String(message.getBody(), "UTF-8"));
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            }, consumerTag -> {});
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Thread thread = new Thread(new TopicConsumer("lazy.#"));
        thread.setName("t1");
        thread.start();

        Thread thread1 = new Thread(new TopicConsumer("lazy.all.#"));
        thread1.setName("t2");
        thread1.start();
    }
}
package com.arlley.rabbitMQ.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicProducer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        factory.setPort(5672);

        Connection conn = factory.newConnection();

        Channel channel = conn.createChannel();

        channel.exchangeDeclare("topic", "topic");

        channel.basicPublish("topic","lazy.all.3", null, "topicAll".getBytes());

        channel.basicPublish("topic","lazy.eee", null, "topicE".getBytes());
        channel.close();
        conn.close();
    }
}
rpc
package com.arlley.rabbitMQ.rpc;


import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RpcCaller {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        factory.setPort(5672);

        Connection conn = factory.newConnection();

        Channel channel = conn.createChannel();
        channel.exchangeDeclare("rpc", "direct");
        String queueName = channel.queueDeclare().getQueue();

        String correlationId = UUID.randomUUID().toString();
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder().replyTo("replyTo").correlationId(correlationId).build();


        channel.basicPublish("rpc", "rpcCall", props, "1".getBytes());

        BlockingQueue<String> reponse = new ArrayBlockingQueue<String>(1);

        channel.basicConsume("replyTo", false, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                System.out.println("收到回调结果");
                if(correlationId.equals(message.getProperties().getCorrelationId())){
                    String result = new String(message.getBody(), "UTF-8");
                    System.out.println(result);
                    reponse.offer(result);
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }else{
                    //收到的消息不是当前客户端需要的
                    channel.basicReject(message.getEnvelope().getDeliveryTag(), true);
                }
            }
        }, consumerTag -> {});

        String result = reponse.take();
        channel.close();
        conn.close();
    }
}
package com.arlley.rabbitMQ.rpc;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RpcConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        factory.setPort(5672);

        Connection conn = factory.newConnection();

        Channel channel = conn.createChannel();
        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, "rpc", "rpcCall");

        channel.basicConsume(queueName, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body, "UTF-8"));
                String replyTo = properties.getReplyTo();
                channel.queueDeclare(replyTo, false, false, true, null);
                channel.queueBind(replyTo, "rpc", "reply");
                channel.basicPublish("rpc", "reply", properties, "2".getBytes());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
        System.in.read();
        channel.close();
        conn.close();
    }
}

相关文章

网友评论

    本文标题:RabbitMQ

    本文链接:https://www.haomeiwen.com/subject/yklfjhtx.html