服务器主动推送消息
server.py
import time
from flask import Flask, render_template, jsonify
from flask_sse import sse
from flask_cors import CORS
app = Flask(__name__)
app.config["REDIS_URL"] = "redis://localhost"
app.register_blueprint(sse, url_prefix="/stream")
CORS(app, resources={r"/stream/*": {"origins": "*"}})
@app.route('/')
def index():
return render_template("index.html")
@app.route('/test')
def test():
i=0
while i < 22:
sse.publish({"message": {"name": "dzg", "age": i}}, type="sseTest")
time.sleep(0.31)
i+=1
return jsonify({"code":200})
@app.route('/test1')
def test1():
i=0
while i < 22:
sse.publish({"message": {"name": "dzg", "age": i*i}}, type="sseTest1")
time.sleep(0.31)
i+=1
return jsonify({"code":200})
if __name__ == '__main__':
app.run(host="0.0.0.0")
index.html
<!DOCTYPE html>
<html lang="en">
<head>
<title>flask-sse test</title>
</head>
<body>
<h1>progress:<span id="a"></span></h1>
<h1>progress:<span id="b"></span></h1>
</body>
<script>
// 创建 EventSource 对象连接服务器
var source = new EventSource("/stream")
console.log(source)
// 服务器发送信息到客户端时,会触发
source.addEventListener('sseTest', function(event){
var res_data = JSON.parse(event.data);
console.log(res_data);
h = document.getElementById('b')
h.innerText= res_data.message.age
}, false);
source.addEventListener('sseTest1', function(event){
var res_data = JSON.parse(event.data);
console.log(res_data);
h = document.getElementById('a')
h.innerText= res_data.message.age
}, false);
// 连接异常时会触发 error 事件并自动重连
source.addEventListener('error', function(event) {
if (event.target.readyState === EventSource.CLOSED) {
console.log('Disconnected');
} else if (event.target.readyState === EventSource.CONNECTING) {
console.log('Connecting...');
}
}, false);
</script>
网友评论