美文网首页
android 进程间通信 rabbitmq

android 进程间通信 rabbitmq

作者: 一个冬季 | 来源:发表于2020-09-10 23:15 被阅读0次
    参考网站

    https://github.com/Harry-III/RabbitMQ-Android

    上手了RabbitMQ?再来看看它的交换机(Exchange)吧
    RabbitMQ的Java应用(1) -- Rabbit Java Client使用
    RabbitMQ(三)入门 —— RabbitMQ的五种模式和四种交换机

    简单说明

    本例子是改编自上面的github链接

    rabbitmq解决了我什么问题

    1、android端不采用轮询的方式请求服务器,有点类似推送的感觉,能即时收到服务器的信息

    我修改了哪些地方

    1、将rabbitmq放到单独的进程中
    2、重新定义一些方法

    总结

    1、在多进程中通过 message.replyTo 方法将通信方式传递给 Service端

    ...省略
     override fun onServiceConnected(name: ComponentName?, iBinder: IBinder?) {
                try {
                    ...省略
                        将客户端的Msssenger对象传递给Service,用于相互通信使用
                    message.replyTo = mClientMessenger;
                  ...省略
                    mServiceMessenger?.send(message)
                } catch (e: RemoteException) {
                    e?.printStackTrace();
                }
            }
    

    2、rabbitmq的管道创建是要在线程里面,否则会报错
    3、如果有2个用户都采用一个管道(管道名 A),当服务器将信息都输送到A管道后,哪个用户处理消息快,哪个用户得到的信息就多,所以最好就是每个用户一个管道

    发送信息到管道.jpg
    4、如果采用Messger传递信息,传递数据不能超过1M大小的,否则会导致崩溃,因为当前进程共享该大小
    5、路由的意思,类似门票,只有持有该门票的人才可以通过
    6、该库的5.x版本系列 需要JDK 8 进行编译和运行。在Android上,这意味着仅支持Android 7.0或更高版本。4.x发布系列支持7.0之前的[JDK 6]和Android版本

    本项目github

    封装的rabbitMq代码

    RabbitMQClient .java

    public class RabbitMQClient {
        private final String TAG = "RabbitMQ";
        private final String FLAG_SEND = "send";
        private final String FLAG_RECEIVE = "receive";
    
        private final ConnectionFactory factory;
        private Connection connection;
        private Map<String, Channel> channelMap = new HashMap<>();
    
        public static final String EXCHANGETYPE_FANOUT = "fanout";   //不用匹配路由,发送给所有绑定转换器的队列
        public static final String EXCHANGETYPE_DIRECT = "direct";  //匹配路由一致,才发送给绑定转换器队列
        public static final String EXCHANGETYPE_TOPIC = "topic";  // 通配符* 和 # 匹配路由一致,才发送给绑定转换器队列
    
    
        public RabbitMQClient(String hostIp, int port, String username, String password) {
            factory = new ConnectionFactory();
            factory.setUsername(username);
            factory.setPassword(password);
            factory.setHost(hostIp);
            factory.setPort(port);
            factory.setVirtualHost("/");//类似数据库的意思
            factory.setConnectionTimeout(15 * 1000);         //连接时间设置为10秒
            factory.setAutomaticRecoveryEnabled(true);   //恢复连接,通道
            factory.setTopologyRecoveryEnabled(true);    //恢复通道中 转换器,队列,绑定关系等
            factory.setNetworkRecoveryInterval(5 * 1000);    //恢复连接间隔,默认5秒
        }
    
    
        /**
         * @param message   需要发送的消息
         * @param queueName 管道名称
         * @date 创建时间:2020/9/8 0008
         * @auther gaoxiaoxiong
         * @Descriptiion
         **/
        public void sendQueueMessage(String message, String queueName) throws IOException, TimeoutException, AlreadyClosedException {
            if (connection == null || !connection.isOpen()) {
                connection = factory.newConnection();
            }
            if (!channelMap.containsKey(FLAG_SEND + queueName)) {
                Channel channel = connection.createChannel();
                channel.queueDeclare(queueName, false, false, false, null);
                channelMap.put(FLAG_SEND + queueName, channel);
            }
            //空名字的交换机,需要设置routingKey,此时会将routingKey 作为 队列名使用
            channelMap.get(FLAG_SEND + queueName).basicPublish("", queueName, null, message.getBytes());
        }
    
    
        /**
         * @param exchangeName 交换机名称
         * @param message      需要发送的消息
         * @param queueName    队列名称
         * @param routingKey   路由规则
         * @date 创建时间:2020/9/8 0008
         * @auther gaoxiaoxiong
         * @Descriptiion 发送 exchangeType direct 类型的信息
         **/
        public void sendDirectTypeMessage(String exchangeName, String message, String queueName, String routingKey) throws IOException, TimeoutException, AlreadyClosedException {
            if (connection == null || !connection.isOpen()) {
                connection = factory.newConnection();
            }
            if (!channelMap.containsKey(FLAG_SEND + exchangeName + EXCHANGETYPE_DIRECT + queueName)) {
                Channel channel = connection.createChannel();
                channel.queueDeclare(queueName, false, false, false, null);
                channel.exchangeDeclare(exchangeName, EXCHANGETYPE_DIRECT);
                channelMap.put(FLAG_SEND + exchangeName + EXCHANGETYPE_DIRECT + queueName, channel);
            }
            channelMap.get(FLAG_SEND + exchangeName + EXCHANGETYPE_DIRECT + queueName).basicPublish(exchangeName, routingKey, null, message.getBytes());
        }
    
        /**
         * @param exchangeName 交换机名称
         * @param queueName    队列名称
         * @param message      发送的消息
         * @date 创建时间:2020/9/8 0008
         * @auther gaoxiaoxiong
         * @Descriptiion 发送 exchangeType fanout 类型的信息
         **/
        public void sendFanoutTypeMessage(String exchangeName, String queueName, String message) throws IOException, TimeoutException, AlreadyClosedException {
            if (connection == null || !connection.isOpen()) {
                connection = factory.newConnection();
            }
            if (!channelMap.containsKey(FLAG_SEND + exchangeName + EXCHANGETYPE_FANOUT + queueName)) {
                Channel channel = connection.createChannel();
                channel.queueDeclare(queueName, false, false, false, null);
                channel.exchangeDeclare(exchangeName, EXCHANGETYPE_FANOUT);
                channelMap.put(FLAG_SEND + exchangeName + EXCHANGETYPE_FANOUT + queueName, channel);
            }
            channelMap.get(FLAG_SEND + exchangeName + EXCHANGETYPE_FANOUT + queueName).basicPublish(exchangeName, "", null, message.getBytes());
        }
    
        /**
         * @param exchangeName 交换机名称
         * @param exchangeType 模式
         * @param queueName    队列名称
         * @param message      需要发送的消息
         * @param routingKey   路由规则
         * @date 创建时间:2020/9/8 0008
         * @auther gaoxiaoxiong
         * @Descriptiion
         **/
        public void sendExchangeNameQueueMessage(String exchangeName, String exchangeType, String message, String queueName, String routingKey) throws IOException, TimeoutException, AlreadyClosedException {
            if (connection == null || !connection.isOpen()) {
                connection = factory.newConnection();
            }
            if (!channelMap.containsKey(FLAG_SEND + exchangeName + exchangeType + queueName)) {
                Channel channel = connection.createChannel();
                channel.queueDeclare(queueName, false, false, false, null);
                channel.exchangeDeclare(exchangeName, exchangeType);
                channelMap.put(FLAG_SEND + exchangeName + exchangeType + queueName, channel);
            }
            if (exchangeType.equals(EXCHANGETYPE_FANOUT)) {
                channelMap.get(FLAG_SEND + exchangeName + exchangeType + queueName).basicPublish(exchangeName, "", null, message.getBytes());
            } else if (exchangeType.equals(EXCHANGETYPE_DIRECT)) {
                channelMap.get(FLAG_SEND + exchangeName + exchangeType + queueName).basicPublish(exchangeName, routingKey, null, message.getBytes());
            } else if (exchangeType.equals(EXCHANGETYPE_TOPIC)) {
                channelMap.get(FLAG_SEND + exchangeName + exchangeType + queueName).basicPublish(exchangeName, routingKey, null, message.getBytes());
            }
        }
    
    
        /**
         * @param queueName 队列名称
         * @date 创建时间:2020/9/8 0008
         * @auther gaoxiaoxiong
         * @Descriptiion
         **/
        public void receiveQueueMessage(final String queueName, final ResponseListener listener)
                throws IOException, TimeoutException, AlreadyClosedException {
            receiveQueueRoutingKeyMessage(queueName, "", "", "", listener);
        }
    
    
        /**
         * @param queueName    队列名称
         * @param routingKey   路由规则
         * @param exchangeName 交换机名称
         * @param exchangeType 交换机类型
         * @date 创建时间:2020/9/8 0008
         * @auther gaoxiaoxiong
         * @Descriptiion
         **/
        public void receiveQueueRoutingKeyMessage(String queueName, final String routingKey, String exchangeName, String exchangeType, final ResponseListener listener)
                throws IOException, TimeoutException, AlreadyClosedException {
    
            if (exchangeType.equals(EXCHANGETYPE_DIRECT) || exchangeType.equals(EXCHANGETYPE_TOPIC)) {
                if (TextUtils.isEmpty(routingKey)) {
                    throw new NullPointerException("路由规则不能为空");
                }
            }
    
            if (!TextUtils.isEmpty(routingKey)) {
                if (TextUtils.isEmpty(exchangeName)) {
                    throw new NullPointerException("交换机名称不能为空");
                }
            }
    
            if (!channelMap.containsKey(FLAG_RECEIVE + routingKey + queueName)) {
                if (connection == null || !connection.isOpen()) {
                    connection = factory.newConnection();
                }
    
                final Channel channel = connection.createChannel();
                channel.queueDeclare(queueName, false, false, false, null);
                //绑定转换器,使用路由筛选消息
                if (!TextUtils.isEmpty(routingKey)) {
                    channel.exchangeDeclare(exchangeName, exchangeType);
                    channel.queueBind(queueName, exchangeName, routingKey);  //设置绑定
                }
                //监听队列
                channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body)
                            throws IOException {
                        String message = new String(body, "UTF-8");
                        if (listener != null) {
                            listener.receive(message);
                        }
                        channel.basicAck(envelope.getDeliveryTag(), false);  //消息应答
                    }
                });
                channelMap.put(FLAG_RECEIVE + routingKey + queueName, channel);
                Log.e(TAG,"已经连接上了,队列名称:" + queueName);
            }
        }
    
    
        /**
         * 关闭所有资源
         */
        public void close() {
            for (Channel next : channelMap.values()) {
                if (next != null && next.isOpen()) {
                    try {
                        next.close();
                    } catch (IOException | TimeoutException e) {
                        e.printStackTrace();
                    }
                }
            }
            channelMap.clear();
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        public interface ResponseListener {
            void receive(String message);
        }
    }
    
    

    RabbitMQUtil .java

    public class RabbitMQUtil {
        private boolean isRunning = true;
        private RabbitMQClient rabbitMQ;
        private ExecutorService executor;
    
    
        public RabbitMQUtil(String hostIp, int port, String username, String password) {
            rabbitMQ = new RabbitMQClient(hostIp, port, username, password);
            executor = Executors.newSingleThreadExecutor();  //根据项目需要设置常用线程个数
        }
    
        /**
         * @param message   发送的消息
         * @param queueName 队列名称
         * @date 创建时间:2020/9/8 0008
         * @auther gaoxiaoxiong
         * @Descriptiion
         **/
        public void sendMessage(final String message, final String queueName, final SendMessageListener sendMessageListener,final ErrorMessageListener errorMessageListener) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        rabbitMQ.sendQueueMessage(message, queueName);
                        if (sendMessageListener != null) sendMessageListener.sendMessage(true);
                    } catch (IOException | TimeoutException | AlreadyClosedException e) {
                        e.printStackTrace();
                        if (errorMessageListener!=null){
                            errorMessageListener.errorMessage(e);
                        }
                        if (sendMessageListener != null) sendMessageListener.sendMessage(false);
                    }
                }
            });
        }
    
        /**
         * @param message      发送的消息
         * @param exchangeName 交换机名称
         * @param queueName    队列名称
         * @date 创建时间:2020/9/8 0008
         * @auther gaoxiaoxiong
         * @Descriptiion
         **/
        public void sendMessage(final String message, final String exchangeName, final String exchangeType, final String queueName, final String routingKey, final SendMessageListener sendMessageListener,final ErrorMessageListener errorMessageListener) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        rabbitMQ.sendExchangeNameQueueMessage(exchangeName, exchangeType, message, queueName, routingKey);
                        if (sendMessageListener != null) sendMessageListener.sendMessage(true);
                    } catch (IOException | TimeoutException | AlreadyClosedException e) {
                        e.printStackTrace();
                        if (errorMessageListener!=null){
                            errorMessageListener.errorMessage(e);
                        }
                        if (sendMessageListener != null) sendMessageListener.sendMessage(false);
                    }
                }
            });
        }
    
        /**
         * @param exchangeName 交换机名称
         * @param queueName    队列名称
         * @param message      需要发送的消息
         * @date 创建时间:2020/9/8 0008
         * @auther gaoxiaoxiong
         * @Descriptiion
         **/
        public void sendFanoutTypeMessage(final String exchangeName, final String message, final String queueName, final SendMessageListener sendMessageListener,final ErrorMessageListener errorMessageListener) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        rabbitMQ.sendFanoutTypeMessage(exchangeName, queueName, message);
                        if (sendMessageListener != null) sendMessageListener.sendMessage(true);
                    } catch (IOException | TimeoutException | AlreadyClosedException e) {
                        e.printStackTrace();
                        if (errorMessageListener!=null){
                            errorMessageListener.errorMessage(e);
                        }
                        if (sendMessageListener != null) sendMessageListener.sendMessage(false);
                    }
                }
            });
        }
    
        /**
         * @param exchangeName 交换机名称
         * @param message      需要发送的消息
         * @param queueName    队列名称
         * @param routingKey   路由规则
         * @date 创建时间:2020/9/8 0008
         * @auther gaoxiaoxiong
         * @Descriptiion 发送 exchangeType direct 类型的信息
         **/
        public void sendDirectTypeMessage(final String exchangeName, final String queueName, final String message, final String routingKey, final SendMessageListener sendMessageListener,final ErrorMessageListener errorMessageListener) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        rabbitMQ.sendDirectTypeMessage(exchangeName, queueName, message, routingKey);
                        if (sendMessageListener != null) sendMessageListener.sendMessage(true);
                    } catch (IOException | TimeoutException | AlreadyClosedException e) {
                        e.printStackTrace();
                        if (errorMessageListener!=null){
                            errorMessageListener.errorMessage(e);
                        }
                        if (sendMessageListener != null) sendMessageListener.sendMessage(false);
                    }
                }
            });
        }
    
        /**
         * @param queueName 队列名称
         * @date 创建时间:2020/9/8 0008
         * @auther gaoxiaoxiong
         * @Descriptiion
         **/
        public void receiveQueueMessage(String queueName, final ReceiveMessageListener listener,final ErrorMessageListener errorMessageListener) {
            String newQueueName = null;
            if (TextUtils.isEmpty(queueName)){
                newQueueName = createDefaultQueueName(queueName);
            }else {
                newQueueName = queueName;
            }
            final String finalNewQueueName = newQueueName;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    while (isRunning) {
                        try {
                            rabbitMQ.receiveQueueMessage(finalNewQueueName, new RabbitMQClient.ResponseListener() {
                                @Override
                                public void receive(String message) {
                                    if (listener != null) listener.receiveMessage(message);
                                }
                            });
                        } catch (IOException | TimeoutException | AlreadyClosedException e) {
                            if (errorMessageListener!=null){
                                errorMessageListener.errorMessage(e);
                            }
                            e.printStackTrace();
                            SystemClock.sleep(5000);
                        }
                    }
                }
            });
        }
    
        public void receiveQueueRoutingKeyMessage(String queueName, final String routingKey, final String exchangeName, final String exchangeType, final ReceiveMessageListener listener,final ErrorMessageListener errorMessageListener) {
            String newQueueName = null;
            if (TextUtils.isEmpty(queueName)){
                newQueueName = createDefaultQueueName(queueName);
            }else {
                newQueueName = queueName;
            }
            final String finalNewQueueName = newQueueName;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    while (isRunning) {
                        try {
                            rabbitMQ.receiveQueueRoutingKeyMessage(finalNewQueueName, routingKey, exchangeName, exchangeType, new RabbitMQClient.ResponseListener() {
                                @Override
                                public void receive(String message) {
                                    if (listener != null) listener.receiveMessage(message);
                                }
    
                            });
                        } catch (IOException | TimeoutException | AlreadyClosedException e) {
                            if (errorMessageListener!=null){
                                errorMessageListener.errorMessage(e);
                            }
                            e.printStackTrace();
                            SystemClock.sleep(5000);  //等待五秒
                        }
                    }
                }
            });
        }
    
        public String createDefaultQueueName(String routingKey) {
            if (TextUtils.isEmpty(routingKey)){
                routingKey = "";
            }
            return routingKey + "@" + UUID.randomUUID();
        }
    
        /**
         * 建议:
         * 在application中关闭或者在结束工作时关闭
         */
        public void close() {
            isRunning = false;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    rabbitMQ.close();
                    executor.shutdownNow();
                }
            });
        }
    
    
        public interface ReceiveMessageListener {
            void receiveMessage(String message);
        }
    
        public interface SendMessageListener {
            void sendMessage(boolean isSuccess);
        }
    
        public interface ErrorMessageListener{
            void errorMessage(Exception e);
        }
    }
    
    

    相关文章

      网友评论

          本文标题:android 进程间通信 rabbitmq

          本文链接:https://www.haomeiwen.com/subject/fpbyektx.html