最近需要用到websocket类似的长连接
使用这个插件:https://github.com/miguelgrinberg/python-socketio
有类似的监听功能
客户端安装:
pip install "python-socketio[client]"
服务端安装:
pip install python-socketio
客户端代码:
import socketio
def create_client():
sio = socketio.Client()
@sio.event
def connect():
print('connection established')
sio.emit('client', {'foo': 'bar'})
@sio.on('serve')
def on_message(data):
print('client received a message!',data)
# @sio.event
# def message(data):
# print('message received with ', data)
# sio.emit('client', {'response': 'my response'})
@sio.event
def connect_error():
print("The connection failed!")
sio.disconnect()
@sio.event
def disconnect():
print('disconnected from server')
sio.disconnect()
sio.connect('http://localhost:5000')
sio.wait()
create_client()
服务端代码:
import eventlet
import socketio
def create_serve():
sio = socketio.Server()
app = socketio.WSGIApp(sio, static_files={
'/': {'content_type': 'text/html', 'filename': 'index.html'}
})
@sio.event
def connect(sid, environ):
print('connect ', sid)
sio.emit('serve', {'response': 'connert success'})
# @sio.on('client')
# def on_message(sid, data):
# print('serve received a message!111', data)
@sio.on('client')
def another_event(sid, data):
print('serve received a message!', data)
# @sio.event
# def my_event(sid, data):
# print('message ', data)
# sio.emit('serve', {'response': 'connert success'})
@sio.event
def disconnect(sid):
print('disconnect ', sid)
if __name__ == '__main__':
eventlet.wsgi.server(eventlet.listen(('', 5000)), app)
create_serve()
详情看官方文档:
https://python-socketio.readthedocs.io/en/latest/
end
————————————————————————————————————————————
自用连接:
https://python-socketio.readthedocs.io/en/latest/client.html
https://python-socketio.readthedocs.io/en/latest/server.html
网友评论