美文网首页
react-native-rabbitmq 即时通讯demo

react-native-rabbitmq 即时通讯demo

作者: 物联白菜 | 来源:发表于2022-08-23 09:38 被阅读0次

复制以下代码运行即可,运行后会绑定交换机、绑定队列,若是rabbitmq后台没有这个交换机或队列,它会自动创建交换机或队列。
运行后,在后台就可以看到

可以自己在app里面往这个队列里发消息、也可以在后台发送,其他人往这个交换机对应的队列发消息的时候,只要是绑定这个交换机和队列的人都可以收到。

需注意两点:
1、当队列里面有消息的时候,你要回应一个ack确认你收到了这条消息,相当于你收到了并确认处理这消息。否则你不管你发多少都是这条消息,不会显示下一条消息,直到这条消息被确认处理了。
2、queue.bind 第二个参数 route key 要和 exchange.publish 的 route key 要一致,不然会没有反应,因为发的消息 route key 和 接收的 route key 队列,确定你要监听接收消息的队列
3、也是自己遇到的坑,官网也没有明确说明,就是在建立连接后,一定要像官网:
创建队列-->创建交换机-->队列绑定交换机,这样的顺序来,不然你会发现android可以用,iOS不行,之前自己就是先创建交换机然后创建队列,最后队列绑定交换机,按照这样的顺序在iOS死活不能用。


'use strict';
import React, {Component} from 'react';
import {View, Text, StyleSheet, Image, TouchableOpacity, NativeModules, TextInput} from 'react-native'

import {Connection, Exchange, Queue} from 'react-native-rabbitmq';


class RabbitConsumer extends Component {
    constructor(props) {
        super(props);
        this.state = {
            inputVal: 'default send something!',
        }
    }

    componentDidMount() {
        const config = {
            host: '192.168.xxx.xxx',
            port: xxxx,
            username: '******',
            password: '******',
            virtualhost: '/',
            ttl: 10000, // Message time to live,
            // ssl: true // Enable ssl connection, make sure the port is 5671 or an other ssl port
        }

        let connection = new Connection(config);

        console.log('connection===>>>', connection)

        connection.connect()

        connection.on('error', (event) => {
            console.log('连接失败====>>>', event)
        });

        connection.on('connected', (event) => {

            console.log('连接了吗', event)

            /**
             * 创建队列
             * */
            let queue = new Queue(connection, {
                name: 'queue_player_test',
                passive: false,
                durable: true,
                exclusive: false,
                consumer_arguments: {'x-priority': 1}
            }, {
                // queueDeclare args here like x-message-ttl
            });

            /**
             * 创建交换机
             * */
            this.exchange = new Exchange(connection, {
                name: 'room_user_player_test',
                type: 'direct',
                durable: true,
                autoDelete: false,
                internal: false
            });

            /**
             * 队列绑定交换机
             * */
            queue.bind(this.exchange, 'queue_player_test_route_key');


            /**
             * 接收消息
             * */
            // Receive one message when it arrives
            queue.on('message', (data) => {
                console.log('队列消息====>>>', data)
                queue.basicAck(data.delivery_tag)

            });

            /**
             * 接收所有消息
             * */
            // Receive all messages send with in a second
            queue.on('messages', (data) => {
                console.log('所有队列消息====>>>', data)

            });


        });

    }


    sendMsg() {
        let message = this.state.inputVal;
        let routing_key = 'queue_player_test_route_key';
        let properties = {
            expiration: 10000
        }
        this.exchange.publish(message, routing_key, properties)
    }


    render() {
        return (
            <View style={{flex: 1}}>
                <TextInput style={{backgroundColor: 'gray'}} placeholder={'sss'} onChangeText={(txt) => {
                    this.setState({
                        inputVal: txt,
                    })
                }}/>
                <TouchableOpacity
                    style={{
                        paddingHorizontal: 40,
                        paddingVertical: 20,
                        borderRadius: 999,
                        backgroundColor: 'blue',
                        alignItems: 'center'
                    }}
                    onPress={() => this.sendMsg()}
                >
                    <Text style={{color: '#fff', fontSize: 20}}>
                        发送
                    </Text>
                </TouchableOpacity>
                <Text>
                    新页面
                </Text>
            </View>
        );
    }
}

export default RabbitConsumer;
const styles = StyleSheet.create({})

交换机


图片.png

队列


图片.png

发消息


图片.png

简单的封装与使用

Consumer.js


import {Connection, Exchange, Queue} from "react-native-rabbitmq";
import Util from '../common/Util'
import * as Common from "../../utils/Common";
import {DeviceEventEmitter} from "react-native";
import store from "../../store";
import actionType from "../../local/actions/actionType";

let connection = null
let exchange = null
let queue = null

module.exports = {


    setup(cloudID) {
        const config = {
            host: '192.168.xxx.xxx',
            port: xxxx,
            username: '******',
            password: '******',
            virtualhost: '/',
            ttl: 10000, // Message time to live,
            // ssl: true // Enable ssl connection, make sure the port is 5671 or an other ssl port
        }
        connection = new Connection(config);
        connection.connect()

        this.onConnect(cloudID)
        this.onError()

    },

    onConnect(cloudID) {
        console.log('cloudID 2====>>>', cloudID)
        connection.on('connected', (event) => {
            let content = 'MQ connect success!!!'
            Common.writeLog(content)
            console.log(content, event)
            this.onCreateQueue(cloudID)
            this.onCreateExchange()
            this.onQueueBindExchange(cloudID)
            this.onMessage()
            this.onMessages()
            this.updateRoomInfo(cloudID)
            this.updateConnectState(true)
        })
    },


    onCreateExchange() {
        exchange = new Exchange(connection, {
            name: 'room_user_player',
            type: 'direct',
            durable: true,
            autoDelete: false,
            internal: false
        });
    },


    onCreateQueue(cloudID) {
        console.log('create queue name is ===>>', Util.queueName(cloudID))
        let content = `create queue name is:${Util.queueName(cloudID)}`
        Common.writeLog(content)

        queue = new Queue(connection, {
            name: Util.queueName(cloudID),
            passive: false,
            durable: true,
            exclusive: false,
            consumer_arguments: {'x-priority': 1}
        }, {
            // queueDeclare args here like x-message-ttl
        });
    },


    onQueueBindExchange(cloudID) {
        queue.bind(exchange, Util.routeKeyName(cloudID));
        // queue.bind(exchange, 'queue_player_test_route_key');
    },


    sendMessage(msg, routeKey = 'queue_player_test_route_key') {
        console.log('sending====>>>>>', msg)
        let message = msg;
        let routing_key = routeKey;
        let properties = {
            expiration: 10000
        }
        exchange.publish(message, routing_key, properties)
    },


    onMessage() {
        queue.on('message', (data) => {
            let message = JSON.parse(data.message)
            console.log('onMessage====>>>', message)
            let newData = JSON.parse(JSON.stringify(message))
            DeviceEventEmitter.emit('cloudDataListener', newData);

            queue.basicAck(data.delivery_tag)
        });
    },


    onMessages() {
        queue.on('messages', (data) => {
            // console.log('onMessages====>>>', data)
        });
    },



    onClose() {
        connection.close()
        connection.clear()
    },


    // onReconnect(cloudID) {
    //     this.setup(cloudID)
    // },


    onError() {
        connection.on('error', (event) => {
            console.log('connect error====>>>', event)
            this.updateConnectState(false)
        });
    },


    updateRoomInfo(info) {
        store.dispatch({
            // type: actionType.IP_INFO,
            type: actionType.ROOM_INFO,
            data: info,
        });
    },

    updateConnectState(state) {
        store.dispatch({
            type: actionType.NET_STATE,
            data: state,
        });
    }
    
    
}


相关文章

网友评论

      本文标题:react-native-rabbitmq 即时通讯demo

      本文链接:https://www.haomeiwen.com/subject/dfawgrtx.html