美文网首页
react-native-rabbitmq 即时通讯demo

react-native-rabbitmq 即时通讯demo

作者: 物联白菜 | 来源:发表于2022-08-23 09:38 被阅读0次

    复制以下代码运行即可,运行后会绑定交换机、绑定队列,若是rabbitmq后台没有这个交换机或队列,它会自动创建交换机或队列。
    运行后,在后台就可以看到

    可以自己在app里面往这个队列里发消息、也可以在后台发送,其他人往这个交换机对应的队列发消息的时候,只要是绑定这个交换机和队列的人都可以收到。

    需注意两点:
    1、当队列里面有消息的时候,你要回应一个ack确认你收到了这条消息,相当于你收到了并确认处理这消息。否则你不管你发多少都是这条消息,不会显示下一条消息,直到这条消息被确认处理了。
    2、queue.bind 第二个参数 route key 要和 exchange.publish 的 route key 要一致,不然会没有反应,因为发的消息 route key 和 接收的 route key 队列,确定你要监听接收消息的队列
    3、也是自己遇到的坑,官网也没有明确说明,就是在建立连接后,一定要像官网:
    创建队列-->创建交换机-->队列绑定交换机,这样的顺序来,不然你会发现android可以用,iOS不行,之前自己就是先创建交换机然后创建队列,最后队列绑定交换机,按照这样的顺序在iOS死活不能用。

    
    'use strict';
    import React, {Component} from 'react';
    import {View, Text, StyleSheet, Image, TouchableOpacity, NativeModules, TextInput} from 'react-native'
    
    import {Connection, Exchange, Queue} from 'react-native-rabbitmq';
    
    
    class RabbitConsumer extends Component {
        constructor(props) {
            super(props);
            this.state = {
                inputVal: 'default send something!',
            }
        }
    
        componentDidMount() {
            const config = {
                host: '192.168.xxx.xxx',
                port: xxxx,
                username: '******',
                password: '******',
                virtualhost: '/',
                ttl: 10000, // Message time to live,
                // ssl: true // Enable ssl connection, make sure the port is 5671 or an other ssl port
            }
    
            let connection = new Connection(config);
    
            console.log('connection===>>>', connection)
    
            connection.connect()
    
            connection.on('error', (event) => {
                console.log('连接失败====>>>', event)
            });
    
            connection.on('connected', (event) => {
    
                console.log('连接了吗', event)
    
                /**
                 * 创建队列
                 * */
                let queue = new Queue(connection, {
                    name: 'queue_player_test',
                    passive: false,
                    durable: true,
                    exclusive: false,
                    consumer_arguments: {'x-priority': 1}
                }, {
                    // queueDeclare args here like x-message-ttl
                });
    
                /**
                 * 创建交换机
                 * */
                this.exchange = new Exchange(connection, {
                    name: 'room_user_player_test',
                    type: 'direct',
                    durable: true,
                    autoDelete: false,
                    internal: false
                });
    
                /**
                 * 队列绑定交换机
                 * */
                queue.bind(this.exchange, 'queue_player_test_route_key');
    
    
                /**
                 * 接收消息
                 * */
                // Receive one message when it arrives
                queue.on('message', (data) => {
                    console.log('队列消息====>>>', data)
                    queue.basicAck(data.delivery_tag)
    
                });
    
                /**
                 * 接收所有消息
                 * */
                // Receive all messages send with in a second
                queue.on('messages', (data) => {
                    console.log('所有队列消息====>>>', data)
    
                });
    
    
            });
    
        }
    
    
        sendMsg() {
            let message = this.state.inputVal;
            let routing_key = 'queue_player_test_route_key';
            let properties = {
                expiration: 10000
            }
            this.exchange.publish(message, routing_key, properties)
        }
    
    
        render() {
            return (
                <View style={{flex: 1}}>
                    <TextInput style={{backgroundColor: 'gray'}} placeholder={'sss'} onChangeText={(txt) => {
                        this.setState({
                            inputVal: txt,
                        })
                    }}/>
                    <TouchableOpacity
                        style={{
                            paddingHorizontal: 40,
                            paddingVertical: 20,
                            borderRadius: 999,
                            backgroundColor: 'blue',
                            alignItems: 'center'
                        }}
                        onPress={() => this.sendMsg()}
                    >
                        <Text style={{color: '#fff', fontSize: 20}}>
                            发送
                        </Text>
                    </TouchableOpacity>
                    <Text>
                        新页面
                    </Text>
                </View>
            );
        }
    }
    
    export default RabbitConsumer;
    const styles = StyleSheet.create({})
    
    

    交换机


    图片.png

    队列


    图片.png

    发消息


    图片.png

    简单的封装与使用

    Consumer.js

    
    import {Connection, Exchange, Queue} from "react-native-rabbitmq";
    import Util from '../common/Util'
    import * as Common from "../../utils/Common";
    import {DeviceEventEmitter} from "react-native";
    import store from "../../store";
    import actionType from "../../local/actions/actionType";
    
    let connection = null
    let exchange = null
    let queue = null
    
    module.exports = {
    
    
        setup(cloudID) {
            const config = {
                host: '192.168.xxx.xxx',
                port: xxxx,
                username: '******',
                password: '******',
                virtualhost: '/',
                ttl: 10000, // Message time to live,
                // ssl: true // Enable ssl connection, make sure the port is 5671 or an other ssl port
            }
            connection = new Connection(config);
            connection.connect()
    
            this.onConnect(cloudID)
            this.onError()
    
        },
    
        onConnect(cloudID) {
            console.log('cloudID 2====>>>', cloudID)
            connection.on('connected', (event) => {
                let content = 'MQ connect success!!!'
                Common.writeLog(content)
                console.log(content, event)
                this.onCreateQueue(cloudID)
                this.onCreateExchange()
                this.onQueueBindExchange(cloudID)
                this.onMessage()
                this.onMessages()
                this.updateRoomInfo(cloudID)
                this.updateConnectState(true)
            })
        },
    
    
        onCreateExchange() {
            exchange = new Exchange(connection, {
                name: 'room_user_player',
                type: 'direct',
                durable: true,
                autoDelete: false,
                internal: false
            });
        },
    
    
        onCreateQueue(cloudID) {
            console.log('create queue name is ===>>', Util.queueName(cloudID))
            let content = `create queue name is:${Util.queueName(cloudID)}`
            Common.writeLog(content)
    
            queue = new Queue(connection, {
                name: Util.queueName(cloudID),
                passive: false,
                durable: true,
                exclusive: false,
                consumer_arguments: {'x-priority': 1}
            }, {
                // queueDeclare args here like x-message-ttl
            });
        },
    
    
        onQueueBindExchange(cloudID) {
            queue.bind(exchange, Util.routeKeyName(cloudID));
            // queue.bind(exchange, 'queue_player_test_route_key');
        },
    
    
        sendMessage(msg, routeKey = 'queue_player_test_route_key') {
            console.log('sending====>>>>>', msg)
            let message = msg;
            let routing_key = routeKey;
            let properties = {
                expiration: 10000
            }
            exchange.publish(message, routing_key, properties)
        },
    
    
        onMessage() {
            queue.on('message', (data) => {
                let message = JSON.parse(data.message)
                console.log('onMessage====>>>', message)
                let newData = JSON.parse(JSON.stringify(message))
                DeviceEventEmitter.emit('cloudDataListener', newData);
    
                queue.basicAck(data.delivery_tag)
            });
        },
    
    
        onMessages() {
            queue.on('messages', (data) => {
                // console.log('onMessages====>>>', data)
            });
        },
    
    
    
        onClose() {
            connection.close()
            connection.clear()
        },
    
    
        // onReconnect(cloudID) {
        //     this.setup(cloudID)
        // },
    
    
        onError() {
            connection.on('error', (event) => {
                console.log('connect error====>>>', event)
                this.updateConnectState(false)
            });
        },
    
    
        updateRoomInfo(info) {
            store.dispatch({
                // type: actionType.IP_INFO,
                type: actionType.ROOM_INFO,
                data: info,
            });
        },
    
        updateConnectState(state) {
            store.dispatch({
                type: actionType.NET_STATE,
                data: state,
            });
        }
        
        
    }
    
    
    

    相关文章

      网友评论

          本文标题:react-native-rabbitmq 即时通讯demo

          本文链接:https://www.haomeiwen.com/subject/dfawgrtx.html