背景: 团队中有同学实现长链接的时候,直接在主线程中进行发送消息和派发消息,导致消息延迟很严重
方案:应该在子线程中进行耗时操作
需求:
实现一个优雅的长链接功能,要求用系统自带的socket,要求有断线重连功能,要求有发送消息的函数,和注册监听消息的函数,要求发送消息的时候,先将消息放入一个队列中缓冲,然后有一个子线程事件循环,不断的检测发送队列是否有消息需要发送,有则发送,没有则休眠等待,直到消息队列有新的消息唤醒发送消息的子线程事件循环;要求收到消息后,将消息缓冲到一个派发队列进行缓冲,然后有一个派发消息的子线程事件循环,不断的检测派发队列是否有消息需要派发,有则取出消息,进行处理,处理完毕获取在主线程中获取注册的监听消息函数进行派发,没有消息则休眠,直到派发队列有消息开始唤醒派发线程。
Demo:
实现一个基于系统自带的 Socket 的优雅长连接功能,包括断线重连、发送消息、注册监听消息等功能,并通过队列和线程循环来实现异步消息的处理
import 'dart:io';
import 'dart:convert';
import 'dart:async';
void main() {
final client = MySocketClient();
client.start();
}
class MySocketClient {
static const String host = 'your_server_host';
static const int port = your_server_port;
late Socket _socket;
bool _isConnected = false;
bool _isConnecting = false;
bool _shouldReconnect = true;
Queue<String> _sendQueue = Queue();
Queue<String> _receiveQueue = Queue();
MySocketClient() {
_init();
}
Future<void> _init() async {
await _connect();
_listenForInput();
_listenForOutput();
}
Future<void> _connect() async {
if (_isConnected || _isConnecting) {
return;
}
_isConnecting = true;
try {
_socket = await Socket.connect(host, port);
_isConnected = true;
_isConnecting = false;
print('Connected to server');
// Start the output loop
_outputLoop();
} catch (e) {
print('Connection failed: $e');
_isConnected = false;
_isConnecting = false;
if (_shouldReconnect) {
await Future.delayed(Duration(seconds: 5));
await _connect();
}
}
}
void _listenForInput() {
_socket.listen(
(List<int> data) {
final message = utf8.decode(data);
print('Received: $message');
_receiveQueue.add(message);
},
onError: (error) {
print('Error: $error');
_socket.close();
_isConnected = false;
if (_shouldReconnect) {
_connect();
}
},
onDone: () {
print('Connection closed by server');
_socket.close();
_isConnected = false;
if (_shouldReconnect) {
_connect();
}
},
);
}
void _listenForOutput() {
// Start the input loop in a new Isolate
_startIsolate(_inputLoop);
// Start the output loop in a new Isolate
_startIsolate(_outputLoop);
}
void _startIsolate(void Function() loopFunction) {
Isolate.spawn(loopFunction, null);
}
void _inputLoop() {
if (_isConnected) {
if (_sendQueue.isNotEmpty) {
final message = _sendQueue.removeFirst();
_socket.write(message);
}
Future.delayed(Duration(milliseconds: 100), _inputLoop);
} else {
Future.delayed(Duration(seconds: 1), _inputLoop);
}
}
void _outputLoop() {
if (_isConnected) {
if (_receiveQueue.isNotEmpty) {
final message = _receiveQueue.removeFirst();
// Process the received message here
// Dispatch to registered listeners
// Simulating a delay for processing
Future.delayed(Duration(milliseconds: 500), () {
print('Processed: $message');
});
}
Future.delayed(Duration(milliseconds: 100), _outputLoop);
} else {
Future.delayed(Duration(seconds: 1), _outputLoop);
}
}
void sendMessage(String message) {
_sendQueue.add(message);
}
void stop() {
_shouldReconnect = false;
_socket.close();
}
void start() {
_shouldReconnect = true;
_connect();
}
}
网友评论