需要安装 MQTTnet NuGet 包，二话不说，贴代码
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System.Text;
namespace Utility
{
/// <summary>
/// Thin proxy around an MQTTnet client. It connects when constructed, publishes and
/// subscribes with QoS 1 (at least once), and tries to reconnect and re-subscribe
/// whenever the connection drops. Dispose the instance to disconnect.
/// </summary>
public class MQTTProxy : IDisposable
{
    /// <summary>
    /// Pause applied before retrying after a failed connection attempt, so an
    /// unreachable broker does not cause a tight reconnect loop.
    /// </summary>
    private static readonly TimeSpan ReconnectRetryDelay = TimeSpan.FromSeconds(5);

    /// <summary>
    /// Guards <see cref="_subscribeTopics"/> and <see cref="_registeredHandlers"/>;
    /// the reconnect handler is invoked by MQTTnet and may run concurrently with callers.
    /// </summary>
    private readonly object _sync = new object();

    /// <summary>
    /// MQTT client.
    /// </summary>
    private readonly IMqttClient _client;

    /// <summary>
    /// MQTT connection options, reused for every reconnect attempt.
    /// </summary>
    private readonly MqttClientOptions _mqttClientOptions;

    /// <summary>
    /// Topics that were subscribed successfully; replayed after a reconnect.
    /// </summary>
    private readonly HashSet<string> _subscribeTopics = new HashSet<string>();

    /// <summary>
    /// Message handlers already attached to the client. Used to attach each distinct
    /// handler only once, otherwise every extra subscription would deliver each
    /// message to the same handler one more time.
    /// </summary>
    private readonly HashSet<Func<MqttApplicationMessageReceivedEventArgs, Task>> _registeredHandlers =
        new HashSet<Func<MqttApplicationMessageReceivedEventArgs, Task>>();

    /// <summary>
    /// Number of the current reconnect attempt (starts at 1, reset to 1 on success).
    /// </summary>
    private int _reconnectedTimes = 1;

    /// <summary>
    /// Set once <see cref="Dispose()"/> has run; stops further reconnect attempts.
    /// </summary>
    private volatile bool _disposed;

    /// <summary>
    /// Creates the client and connects to the broker, blocking until the connection
    /// attempt has finished.
    /// </summary>
    /// <param name="host">Broker host name or IP address.</param>
    /// <param name="port">Broker TCP port.</param>
    /// <param name="username">User name sent to the broker.</param>
    /// <param name="password">Password sent to the broker.</param>
    /// <param name="clientId">Client identifier sent to the broker.</param>
    /// <exception cref="Exception">
    /// The options could not be built or the connection failed; the original error is
    /// available through <see cref="Exception.InnerException"/>.
    /// </exception>
    public MQTTProxy(
        string host,
        int port,
        string username,
        string password,
        string clientId)
    {
        // Created outside the try block so the catch below can always release it.
        _client = new MqttFactory().CreateMqttClient();
        try
        {
            _mqttClientOptions = new MqttClientOptionsBuilder()
                .WithTcpServer(host, port)
                .WithCredentials(username, password)
                .WithClientId(clientId)
                // Keep the session when the connection drops so that messages queued
                // on the broker can still be consumed after reconnecting.
                // Use true instead when losing those messages is acceptable.
                .WithCleanSession(false)
                .Build();

            // GetAwaiter().GetResult() surfaces the real exception instead of an AggregateException.
            _client.ConnectAsync(_mqttClientOptions).GetAwaiter().GetResult();

            // Attached only after the first successful connect: a failed initial
            // connection is reported to the caller instead of being retried.
            _client.DisconnectedAsync += Reconnect;
        }
        catch (Exception ex)
        {
            _client.Dispose();
            throw new Exception("MQTT 连接失败, 请检查连接信息", ex);
        }
    }

    /// <summary>
    /// Publishes a non-retained message with QoS 1 (at least once).
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="messageBody">Message payload as text.</param>
    /// <returns>An empty string on success, otherwise an error message.</returns>
    public async Task<string> Publish(string topic, string messageBody)
    {
        try
        {
            MqttApplicationMessage message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                .WithRetainFlag(false)
                .WithPayload(messageBody)
                .Build();
            await _client.PublishAsync(message).ConfigureAwait(false);
            return string.Empty;
        }
        catch (Exception)
        {
            // Failures are reported through the return value, not by throwing.
            return "消息发送失败";
        }
    }

    /// <summary>
    /// Subscribes to one topic with QoS 1, blocking until the broker has answered.
    /// </summary>
    /// <param name="topic">Topic (filter) to subscribe to.</param>
    /// <param name="callback">
    /// Handler for received messages; when null, a default handler that prints the
    /// topic and payload to the console is used. The handler is attached to the
    /// client as a whole, so it receives the messages of every subscribed topic.
    /// </param>
    /// <returns>An empty string on success, otherwise an error message.</returns>
    public string Subscribe(string topic, Func<MqttApplicationMessageReceivedEventArgs, Task> callback)
    {
        try
        {
            SubscribeCore(new[] { topic }, callback);
            return string.Empty;
        }
        catch (Exception)
        {
            return "订阅失败";
        }
    }

    /// <summary>
    /// Subscribes to several topics with QoS 1, one request per topic, blocking until
    /// the broker has answered each of them. Stops at the first topic that fails.
    /// </summary>
    /// <param name="topics">Topics (filters) to subscribe to.</param>
    /// <param name="callback">
    /// Handler for received messages; when null, a default handler that prints the
    /// topic and payload to the console is used. The handler is attached to the
    /// client as a whole, so it receives the messages of every subscribed topic.
    /// </param>
    /// <returns>An empty string on success, otherwise an error message.</returns>
    public string MultipSubscribe(List<string> topics, Func<MqttApplicationMessageReceivedEventArgs, Task> callback)
    {
        try
        {
            SubscribeCore(topics, callback);
            return string.Empty;
        }
        catch (Exception)
        {
            return "订阅失败";
        }
    }

    /// <summary>
    /// Attaches the message handler (once) and subscribes to every given topic.
    /// </summary>
    private void SubscribeCore(
        IEnumerable<string> topics,
        Func<MqttApplicationMessageReceivedEventArgs, Task> callback)
    {
        Func<MqttApplicationMessageReceivedEventArgs, Task> handler = callback;
        if (handler == null)
        {
            handler = MessageHandler;
        }

        // The handler goes first so that messages arriving right after the
        // subscription is acknowledged are not missed.
        EnsureHandlerRegistered(handler);

        foreach (string topic in topics)
        {
            _client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce).GetAwaiter().GetResult();

            // Recorded only after the call succeeded, so a failed subscription is
            // not replayed after a reconnect.
            lock (_sync)
            {
                _subscribeTopics.Add(topic);
            }
        }
    }

    /// <summary>
    /// Attaches <paramref name="handler"/> to the client unless an equal delegate has
    /// been attached before.
    /// </summary>
    private void EnsureHandlerRegistered(Func<MqttApplicationMessageReceivedEventArgs, Task> handler)
    {
        lock (_sync)
        {
            if (_registeredHandlers.Add(handler))
            {
                _client.ApplicationMessageReceivedAsync += handler;
            }
        }
    }

    /// <summary>
    /// Returns a copy of the recorded topics that is safe to enumerate across awaits.
    /// </summary>
    private string[] SnapshotTopics()
    {
        lock (_sync)
        {
            string[] snapshot = new string[_subscribeTopics.Count];
            _subscribeTopics.CopyTo(snapshot);
            return snapshot;
        }
    }

    /// <summary>
    /// Default message handler: prints the topic and the UTF-8 decoded payload to the console.
    /// </summary>
    /// <param name="e">The received message.</param>
    /// <returns>A task that completes once the message has been printed.</returns>
    private async Task MessageHandler(MqttApplicationMessageReceivedEventArgs e)
    {
        // NOTE(review): the 2-second pause is kept from the original implementation
        // (presumably simulated processing time); it no longer blocks a thread.
        await Task.Delay(2000).ConfigureAwait(false);
        Console.WriteLine(e.ApplicationMessage.Topic);
        Console.WriteLine(Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment));
    }

    /// <summary>
    /// Unsubscribes from one topic and forgets it, so it is not re-subscribed after a reconnect.
    /// </summary>
    /// <param name="topic">Topic (filter) to unsubscribe from.</param>
    public async Task UnSubscribe(string topic)
    {
        await _client.UnsubscribeAsync(topic).ConfigureAwait(false);
        lock (_sync)
        {
            _subscribeTopics.Remove(topic);
        }
    }

    /// <summary>
    /// Unsubscribes from every recorded topic. A topic is forgotten as soon as its
    /// own unsubscribe call has succeeded.
    /// </summary>
    public async Task UnSubscribeAll()
    {
        // Enumerate a snapshot: the live set may change while a call is awaited.
        foreach (string topic in SnapshotTopics())
        {
            await _client.UnsubscribeAsync(topic).ConfigureAwait(false);
            lock (_sync)
            {
                _subscribeTopics.Remove(topic);
            }
        }
    }

    /// <summary>
    /// Handler for the client's disconnected event: reconnects with the original
    /// options and re-subscribes to every recorded topic.
    /// </summary>
    /// <param name="e">Details of the disconnect.</param>
    /// <returns>A task that completes when the reconnect attempt has finished.</returns>
    private async Task Reconnect(MqttClientDisconnectedEventArgs e)
    {
        Console.WriteLine("重新连接被调用");
        if (_disposed || _client.IsConnected)
        {
            return;
        }

        try
        {
            // ClientWasConnected is false when this event follows a failed connection
            // attempt rather than the loss of an established connection. Wait before
            // retrying in that case; the first attempt after a real disconnect is immediate.
            if (!e.ClientWasConnected)
            {
                await Task.Delay(ReconnectRetryDelay).ConfigureAwait(false);
                if (_disposed)
                {
                    return;
                }
            }

            await _client.ConnectAsync(_mqttClientOptions).ConfigureAwait(false);
            if (!_client.IsConnected)
            {
                return;
            }

            // Restore every subscription, including those that use the default handler.
            foreach (string topic in SnapshotTopics())
            {
                await _client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce).ConfigureAwait(false);
            }

            Interlocked.Exchange(ref _reconnectedTimes, 1);
            Console.WriteLine("重连成功");
        }
        catch (Exception)
        {
            int failedAttempt = Interlocked.Increment(ref _reconnectedTimes) - 1;
            Console.WriteLine($"重连第{failedAttempt}次失败");
        }
    }

    /// <summary>
    /// Disconnects from the broker and releases the client. Safe to call more than
    /// once; never throws.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the resources held by this instance.
    /// </summary>
    /// <param name="disposing">
    /// True when called from <see cref="Dispose()"/>; false when called from a
    /// finalizer of a derived class, in which case managed objects are not touched.
    /// </param>
    /// <remarks>
    /// This class deliberately has no finalizer: it owns only managed objects, and
    /// disconnecting (a blocking network call on other managed objects) is not safe
    /// on the finalizer thread.
    /// </remarks>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        if (!disposing)
        {
            return;
        }

        // Always detach first so the deliberate disconnect below, or a disconnect
        // already in progress, cannot start another reconnect.
        _client.DisconnectedAsync -= Reconnect;
        try
        {
            if (_client.IsConnected)
            {
                _client.DisconnectAsync().GetAwaiter().GetResult();
            }
        }
        catch (Exception)
        {
            // Dispose must not throw; the client is released below regardless.
        }
        finally
        {
            _client.Dispose();
        }
    }
}
}
网友评论