复制以下代码运行即可,运行后会绑定交换机、绑定队列,若是rabbitmq后台没有这个交换机或队列,它会自动创建交换机或队列。
运行后,在后台就可以看到
可以自己在app里面往这个队列里发消息、也可以在后台发送,其他人往这个交换机对应的队列发消息的时候,只要是绑定这个交换机和队列的人都可以收到。
需注意两点:
1、当队列里面有消息的时候,你要回应一个ack确认你收到了这条消息,相当于你收到了并确认处理这消息。否则你不管你发多少都是这条消息,不会显示下一条消息,直到这条消息被确认处理了。
2、queue.bind 第二个参数 route key 要和 exchange.publish 的 route key 要一致,不然会没有反应,因为发的消息 route key 和 接收的 route key 队列,确定你要监听接收消息的队列
3、也是自己遇到的坑,官网也没有明确说明,就是在建立连接后,一定要像官网:
创建队列-->创建交换机-->队列绑定交换机,这样的顺序来,不然你会发现android可以用,iOS不行,之前自己就是先创建交换机然后创建队列,最后队列绑定交换机,按照这样的顺序在iOS死活不能用。
'use strict';
import React, {Component} from 'react';
import {View, Text, StyleSheet, Image, TouchableOpacity, NativeModules, TextInput} from 'react-native'
import {Connection, Exchange, Queue} from 'react-native-rabbitmq';
class RabbitConsumer extends Component {
constructor(props) {
super(props);
this.state = {
inputVal: 'default send something!',
}
}
componentDidMount() {
const config = {
host: '192.168.xxx.xxx',
port: xxxx,
username: '******',
password: '******',
virtualhost: '/',
ttl: 10000, // Message time to live,
// ssl: true // Enable ssl connection, make sure the port is 5671 or an other ssl port
}
let connection = new Connection(config);
console.log('connection===>>>', connection)
connection.connect()
connection.on('error', (event) => {
console.log('连接失败====>>>', event)
});
connection.on('connected', (event) => {
console.log('连接了吗', event)
/**
* 创建队列
* */
let queue = new Queue(connection, {
name: 'queue_player_test',
passive: false,
durable: true,
exclusive: false,
consumer_arguments: {'x-priority': 1}
}, {
// queueDeclare args here like x-message-ttl
});
/**
* 创建交换机
* */
this.exchange = new Exchange(connection, {
name: 'room_user_player_test',
type: 'direct',
durable: true,
autoDelete: false,
internal: false
});
/**
* 队列绑定交换机
* */
queue.bind(this.exchange, 'queue_player_test_route_key');
/**
* 接收消息
* */
// Receive one message when it arrives
queue.on('message', (data) => {
console.log('队列消息====>>>', data)
queue.basicAck(data.delivery_tag)
});
/**
* 接收所有消息
* */
// Receive all messages send with in a second
queue.on('messages', (data) => {
console.log('所有队列消息====>>>', data)
});
});
}
sendMsg() {
let message = this.state.inputVal;
let routing_key = 'queue_player_test_route_key';
let properties = {
expiration: 10000
}
this.exchange.publish(message, routing_key, properties)
}
render() {
return (
<View style={{flex: 1}}>
<TextInput style={{backgroundColor: 'gray'}} placeholder={'sss'} onChangeText={(txt) => {
this.setState({
inputVal: txt,
})
}}/>
<TouchableOpacity
style={{
paddingHorizontal: 40,
paddingVertical: 20,
borderRadius: 999,
backgroundColor: 'blue',
alignItems: 'center'
}}
onPress={() => this.sendMsg()}
>
<Text style={{color: '#fff', fontSize: 20}}>
发送
</Text>
</TouchableOpacity>
<Text>
新页面
</Text>
</View>
);
}
}
export default RabbitConsumer;
const styles = StyleSheet.create({})
交换机
图片.png
队列
图片.png
发消息
图片.png
简单的封装与使用
Consumer.js
import {Connection, Exchange, Queue} from "react-native-rabbitmq";
import Util from '../common/Util'
import * as Common from "../../utils/Common";
import {DeviceEventEmitter} from "react-native";
import store from "../../store";
import actionType from "../../local/actions/actionType";
let connection = null
let exchange = null
let queue = null
module.exports = {
setup(cloudID) {
const config = {
host: '192.168.xxx.xxx',
port: xxxx,
username: '******',
password: '******',
virtualhost: '/',
ttl: 10000, // Message time to live,
// ssl: true // Enable ssl connection, make sure the port is 5671 or an other ssl port
}
connection = new Connection(config);
connection.connect()
this.onConnect(cloudID)
this.onError()
},
onConnect(cloudID) {
console.log('cloudID 2====>>>', cloudID)
connection.on('connected', (event) => {
let content = 'MQ connect success!!!'
Common.writeLog(content)
console.log(content, event)
this.onCreateQueue(cloudID)
this.onCreateExchange()
this.onQueueBindExchange(cloudID)
this.onMessage()
this.onMessages()
this.updateRoomInfo(cloudID)
this.updateConnectState(true)
})
},
onCreateExchange() {
exchange = new Exchange(connection, {
name: 'room_user_player',
type: 'direct',
durable: true,
autoDelete: false,
internal: false
});
},
onCreateQueue(cloudID) {
console.log('create queue name is ===>>', Util.queueName(cloudID))
let content = `create queue name is:${Util.queueName(cloudID)}`
Common.writeLog(content)
queue = new Queue(connection, {
name: Util.queueName(cloudID),
passive: false,
durable: true,
exclusive: false,
consumer_arguments: {'x-priority': 1}
}, {
// queueDeclare args here like x-message-ttl
});
},
onQueueBindExchange(cloudID) {
queue.bind(exchange, Util.routeKeyName(cloudID));
// queue.bind(exchange, 'queue_player_test_route_key');
},
sendMessage(msg, routeKey = 'queue_player_test_route_key') {
console.log('sending====>>>>>', msg)
let message = msg;
let routing_key = routeKey;
let properties = {
expiration: 10000
}
exchange.publish(message, routing_key, properties)
},
onMessage() {
queue.on('message', (data) => {
let message = JSON.parse(data.message)
console.log('onMessage====>>>', message)
let newData = JSON.parse(JSON.stringify(message))
DeviceEventEmitter.emit('cloudDataListener', newData);
queue.basicAck(data.delivery_tag)
});
},
onMessages() {
queue.on('messages', (data) => {
// console.log('onMessages====>>>', data)
});
},
onClose() {
connection.close()
connection.clear()
},
// onReconnect(cloudID) {
// this.setup(cloudID)
// },
onError() {
connection.on('error', (event) => {
console.log('connect error====>>>', event)
this.updateConnectState(false)
});
},
updateRoomInfo(info) {
store.dispatch({
// type: actionType.IP_INFO,
type: actionType.ROOM_INFO,
data: info,
});
},
updateConnectState(state) {
store.dispatch({
type: actionType.NET_STATE,
data: state,
});
}
}
网友评论