借助于 RabbitMQ 的 Web STOMP 插件,实现浏览器与服务端的全双工通信。从本质上说,RabbitMQ 的 Web STOMP 插件也是利用 WebSocket 对 STOMP 协议进行了一次桥接,从而实现浏览器与服务端的双向通信。
安装 RabbitMQ 服务
mac安装rabbitmq
brew update
brew install rabbitmq
耐心等待,安装完成后需要将/usr/local/sbin添加到$PATH,可以将下面这两行加到~/.bash_profile:
# RabbitMQ Config
export PATH=$PATH:/usr/local/sbin
编辑完后:wq保存退出,使环境变量立即生效。
source ~/.bash_profile
启动rabbitmq
rabbitmq-server
启用相关插件
rabbitmq-plugins enable rabbitmq_management rabbitmq_web_stomp rabbitmq_stomp
重启 RabbitMQ 服务
service rabbitmq-server restart
验证是否安装成功
通过 Web 浏览器来查看 RabbitMQ 的运行状态,浏览器中输入 http://{server_ip}:15672,用 guest/guest 默认的用户和密码登录后即可查看 RabbitMQ 的运行状态。
基于 RabbitMQ 的实时消息推送
RabbitMQ 有很多第三方插件,可以在 AMQP 协议基础上做出许多扩展的应用。Web STOMP 插件就是基于 AMQP 之上的 STOMP 文本协议插件,利用 WebSocket 能够轻松实现浏览器和服务器之间的实时消息传递。
Paste_Image.png消息发送者
Java 作为 RabbitMQ 客户端消息发送者,Web 浏览器作为消息消费者。
package com.ibm.cdl.itaas.stomp;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Program {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("rabbitmq", "fanout");
String routingKey = "rabbitmq_routingkey";
String message = "{\"name\":\"推送测试数据!\"}";
channel.basicPublish("rabbitmq", routingKey,null, message.getBytes());
System.out.println("[x] Sent Message:"+message);
channel.close();
connection.close();
}
}
这里我们利用 RabbitMQ 官方提供的 Java Client Library 来实现消息的发送
Paste_Image.pngexchange 接收到消息后,会根据消息的 key 和已经设置的 binding 进行消息路由,最终投递到一个或多个队列里进行消息处理。RabbitMQ 预置了一些 exchange,如果客户端未声明 exchange 时,RabbitMQ 会根据 exchange 类型使用默认的 exchange。
预置exchange名称
Paste_Image.pngExchange 类型
1.Direct exchange
Direct exchange 完全根据 key 进行投递,只有 key 与绑定时的 routing key 完全一致的消息才会收到消息,参考官网提供的图 4 更直观地了解 Direct exchange。
2.Fanount exchange
Fanount 完全不关心 key,直接采取广播的方式进行消息投递,与该交换机绑定的所有队列都会收到消息
3.Topic exchange
Topic exchange 会根据 key 进行模式匹配然后进行投递,与设置的 routing key 匹配上的队列才能收到消息。
4.Headers exchange
Header exchange 使用消息头代替 routing key 作为关键字进行路由,不过在实际应用过程中这种类型的 exchange 使用较少。
消息持久化
RabbitMQ 支持消息的持久化,即将消息数据持久化到磁盘上,如果消息服务器中途断开,下次开启会将持久化的消息重新发送,消息队列持久化需要保证 exchange(指定 durable=1)、queue(指定 durable=1)和消息(delivery_mode=2)3 个部分都是持久化。出于数据安全考虑,一般消息都会进行持久化。
消息接收者
html代码
<!DOCTYPE html>
<html><head>
<script src="jquery/jquery-1.9.1.min.js"></script>
<script src="rabbitmq/sockjs-0.3.js"></script>
<script src="rabbitmq/stomp.js"></script>
<style>
.box {
width: 440px;
float: left;
margin: 0 20px 0 20px;
}
.box div, .box input {
border: 1px solid;
-moz-border-radius: 4px;
border-radius: 4px;
width: 100%;
padding: 5px;
margin: 3px 0 10px 0;
}
.box div {
border-color: grey;
height: 300px;
overflow: auto;
}
div code {
display: block;
}
#first div code {
-moz-border-radius: 2px;
border-radius: 2px;
border: 1px solid #eee;
margin-bottom: 5px;
}
#second div {
font-size: 0.8em;
}
</style>
<title>RabbitMQ Web STOMP Examples : Echo Server</title>
<link href="main.css" rel="stylesheet" type="text/css"/>
</head><body lang="en">
<h1><a href="index.html">RabbitMQ Web STOMP Examples</a> > Echo Server</h1>
<div id="first" class="box">
<h2>Received</h2>
<div></div>
<form><input autocomplete="off" value="Type here..."></input></form>
</div>
<div id="second" class="box">
<h2>Logs</h2>
<div></div>
</div>
<script>
var has_had_focus = false;
var pipe = function(el_name, send) {
var div = $(el_name + ' div');
var inp = $(el_name + ' input');
var form = $(el_name + ' form');
var print = function(m, p) {
p = (p === undefined) ? '' : JSON.stringify(p);
div.append($("<code>").text(m + ' ' + p));
div.scrollTop(div.scrollTop() + 10000);
};
if (send) {
form.submit(function() {
send(inp.val());
inp.val('');
return false;
});
}
return print;
};
var print_first = pipe('#first', function(data) {
// client.send('/topic/test', {"content-type":"text/plain"}, data);
});
// Stomp.js boilerplate
if (location.search == '?ws') {
var ws = new WebSocket('ws://127.0.0.1:15674/ws');
} else {
var ws = new SockJS('http://127.0.0.1:15674/stomp');
}
// Init Client
var client = Stomp.over(ws);
// SockJS does not support heart-beat: disable heart-beats
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
client.debug = pipe('#second');
// Declare on_connect
var on_connect = function(x) {
client.subscribe("/exchange/rabbitmq/rabbitmq_routingkey", function(d) {
print_first(d.body);
});
};
// Declare on_error
var on_error = function() {
console.log('error');
};
// Conect to RabbitMQ
client.connect('guest', 'guest', on_connect, on_error, '/');
$('#first input').focus(function() {
if (!has_had_focus) {
has_had_focus = true;
$(this).val("");
}
});
</script>
</body>
</html>
测试stomp插件是否好用
Paste_Image.png重要JavaScript代码
// Stomp.js boilerplate
if (location.search == '?ws') {
var ws = new WebSocket('ws://192.168.1.102:15674/ws');
} else {
var ws = new SockJS('http://192.168.1.102:15674/stomp');
}
// Init Client
var client = Stomp.over(ws);
// SockJS does not support heart-beat: disable heart-beats
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
// Declare on_connect
var on_connect = function(x) {
client.subscribe("/exchange/rabbitmq/rabbitmq_routingkey", function(d) {
print_first(d.body);
});
};
// Declare on_error
var on_error = function() {
console.log('error');
};
// Conect to RabbitMQ
client.connect('guest', 'guest', on_connect, on_error, '/');
RabbitMQ Web STOMP 插件可以理解为 HTML5 WebSocket 与 STOMP 协议间的桥接,目的也是为了让浏览器能够使用 RabbitMQ。当 RabbitMQ 消息服务器开启了 STOMP 和 Web STOMP 插件后,浏览器端就可以轻松地使用 WebSocket 或者 SockerJS 客户端实现与 RabbitMQ 服务器进行通信。
RabbitMQ Web STOMP 是对 STOMP 协议的桥接,因此其语法也完全遵循 STOMP 协议。STOMP 是基于 frame 的协议,与 HTTP 的 frame 相似。一个 Frame 包含一个 command,一系列可选的 headers 和一个 body。STOMP client 的用户代理可以充当两个角色,当然也可能同时充当:作为生产者,通过 SEND frame 发送消息到服务器;作为消费者,发送 SUBCRIBE frame 到目的地并且通过 MESSAGE frame 从服务器获取消息。
在 Web 页面中利用 WebSocket 使用 STOMP 协议只需要下载 stomp.js 即可,考虑到老版本的浏览器不支持 WebSocket,SockJS 则提供了 WebSocket 的模拟支持。Web 页面中使用 STOMP 协议详见下列代码清单如下
// 初始化 ws 对象
if (location.search == '?ws') {
var ws = new WebSocket('ws://192.168.1.102:15674/ws');
} else {
var ws = new SockJS('http://192.168.1.102:15674/stomp');
}
// 建立连接
var client = Stomp.over(ws);
// SockJS does not support heart-beat: disable heart-beats
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
// 定义连接成功回调函数
var on_connect = function(x) {
console.log('connect successfully');
// 发送消息
client.send(destination,head,body);
// 发送消息
client.subcribe(destination,callback);
// 默认主动 ACK,手动 ACK
client.subcribe(destination,function(message){
Message.ack();
},{ack:'client'});
// 事务支持
var tx = client.begin();
client.send(destination,head,body);
tx.commit();
};
// 定义连接失败回调函数
var on_error = function(error) {
console.log(error.headers.message);
};
// 连接消息服务器
client.connect(login, password, on_connect, on_error, '/');
destination 在 RabbitMQ Web STOM 中进行了相关的定义,根据使用场景的不同,主要有以下 4 种:
-
1./exchange/<exchangeName>
对于 SUBCRIBE frame,destination 一般为/exchange/<exchangeName>/[/pattern] 的形式。该 destination 会创建一个唯一的、自动删除的、名为<exchangeName>的 queue,并根据 pattern 将该 queue 绑定到所给的 exchange,实现对该队列的消息订阅。
对于 SEND frame,destination 一般为/exchange/<exchangeName>/[/routingKey] 的形式。这种情况下消息就会被发送到定义的 exchange 中,并且指定了 routingKey。 -
2./queue/<queueName>
对于 SUBCRIBE frame,destination 会定义<queueName>的共享 queue,并且实现对该队列的消息订阅。
对于 SEND frame,destination 只会在第一次发送消息的时候会定义<queueName>的共享 queue。该消息会被发送到默认的 exchange 中,routingKey 即为<queueName>。 -
3./amq/queue/<queueName>
这种情况下无论是 SUBCRIBE frame 还是 SEND frame 都不会产生 queue。但如果该 queue 不存在,SUBCRIBE frame 会报错。
对于 SUBCRIBE frame,destination 会实现对队列<queueName>的消息订阅。
对于 SEND frame,消息会通过默认的 exhcange 直接被发送到队列<queueName>中。 -
4./topic/<topicName>
对于 SUBCRIBE frame,destination 创建出自动删除的、非持久的 queue 并根据 routingkey 为<topicName>绑定到 amq.topic exchange 上,同时实现对该 queue 的订阅。
对于 SEND frame,消息会被发送到 amq.topic exchange 中,routingKey 为<topicName>。
运行java类
浏览器访问效果
Paste_Image.pngWebSocket 作为 HTML5 提供的新一代客户端-服务器异步通信方法,能够轻松完成前端与后台的双向通信。RabbitMQ 服务提供了一个 STOMP 插件,能够实现与 WebSocket 的桥接,这样既能够实现消息的主动推送,同时也能够实现消息的异步处理。在传统的 Web 开发中存在许多状态变更实时性的需求,比如资源被占用后需要广播它的实时状态,利用本文提出的解决方案,可以方便将其推送到所有监听的客户端。因此在新 J2EE 开发项目中,建议使用本文提出的方案替代原来 ajax 轮询方法刷新状态。
实现服务器端推送的几种方式
Web 应用都是基于 HTTP 协议的请求/响应模式,无法像 TCP 协议那样保持长连接,因此 Web 应用就很难像手机那样实现实时的消息推送。就目前来看,Web 应用的消息推送方式主要有以下几种
-
Ajax 短轮询
Ajax 轮询主要通过页面端的 JS 定时异步刷新任务来实现数据的加载,但这种方式实时效果较差,而且对服务端的压力也较大。 -
长轮询
长轮询主要也是通过 Ajax 机制,但区别于传统的 Ajax 应用,长轮询的服务器端会在没有数据时阻塞请求直到有新的数据产生或者请求超时才返回,之后客户端再重新建立连接获取数据,具体实现方式见图 1 所示。但长轮询服务端会长时间地占用资源,如果消息频繁发送的话会给服务端带来较大的压力。 -
WebSocket 双向通信
WebSocket 是 HTML5 中一种新的通信协议,能够实现浏览器与服务器之间全双工通信。如果浏览器和服务端都支持 WebSocket 协议的话,该方式实现的消息推送无疑是最高效、简洁的。并且最新版本的 IE、Firefox、Chrome 等浏览器都已经支持 WebSocket 协议,Apache Tomcat 7.0.27 以后的版本也开始支持 WebSocket。
网友评论
Whoops! Lost connection to undefined,我看了下代码,应该是stomp.js报的错 msg = "Whoops! Lost connection to " + _this.ws.url; 这句。是url取不到undefined。但是url传了,求解