美文网首页
C# MQTT 工具封装

C# MQTT 工具封装

作者: Messix_1102 | 来源:发表于2023-12-26 16:32 被阅读0次

需要安装MQTTnet nugut 包,二话不说,贴代码

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System.Text;

namespace Utility
{
    /// <summary>
    /// MQTT 服务代理
    /// </summary>
    public class MQTTProxy
    {
        /// <summary>
        /// MQTT 客户端
        /// </summary>
        private IMqttClient _client { get; set; }
        /// <summary>
        /// MQTT 连接选项
        /// </summary>
        private MqttClientOptions _mqttClientOptions { get; set; }
        /// <summary>
        /// 订阅主题
        /// </summary>
        private HashSet<string> _subscribeTopics { get; set; } = new HashSet<string>();
        /// <summary>
        /// 消息处理程序
        /// </summary>
        private Func<MqttApplicationMessageReceivedEventArgs, Task> _messageHandler { get; set; }
        /// <summary>
        /// 重新连接次数
        /// </summary>
        private int reconnectedTimes = 1;

        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="host"></param>
        /// <param name="port"></param>
        /// <param name="username"></param>
        /// <param name="password"></param>
        /// <param name="clientId"></param>
        public MQTTProxy(
            string host,
            int port,
            string username,
            string password,
            string clientId)
        {
            try
            {
                // 初始化连接参数
                _mqttClientOptions = new MqttClientOptionsBuilder()
                    .WithTcpServer(host, port)
                    .WithCredentials(username, password)
                    .WithClientId(clientId)
                    // 连接断开, 会话不删除
                    // 意味着会话队列里的数据还在其他节点上, 重新连接能接着消费
                    // 如果数据丢失无所谓就设置为true
                    .WithCleanSession(false) 
                    .Build();
                // 创建连接
                MqttFactory factory = new MqttFactory();
                _client = factory.CreateMqttClient();
                _client.ConnectAsync(_mqttClientOptions).Wait();
                // 断线自动重连
                _client.DisconnectedAsync += Reconnect;
            }
            catch
            {
                throw new Exception("MQTT 连接失败, 请检查连接信息");
            }
        }

        /// <summary>
        /// 发布消息
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="messageBody"></param>
        /// <returns></returns>
        public async Task<string> Publish(string topic, string messageBody)
        {
            try
            {
                MqttApplicationMessage message = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    // AtLeastOnce 至少发成功一次
                    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                    .WithRetainFlag(false)
                    .WithPayload(messageBody)
                    .Build();
                await _client.PublishAsync(message);
                return string.Empty;
            }
            catch 
            {
                return "消息发送失败";
            }
        }

        /// <summary>
        /// 订阅主题
        /// </summary>
        /// <param name="topic"></param>
        /// <returns></returns>
        public string Subscribe(string topic, Func<MqttApplicationMessageReceivedEventArgs, Task> callback)
        {
            try
            {
                _subscribeTopics.Add(topic);
                _messageHandler = callback;

                // 添加消息处理函数
                if (callback != null)
                {
                    _client.ApplicationMessageReceivedAsync += callback;
                }
                else
                {
                    _client.ApplicationMessageReceivedAsync += MessageHandler;
                }
                // AtLeastOnce 直到订阅成功一次
                _client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce).GetAwaiter().GetResult();
                return string.Empty;
            }
            catch
            {
                return "订阅失败";
            }
        }

        /// <summary>
        /// 订阅主题
        /// </summary>
        /// <param name="topic"></param>
        /// <returns></returns>
        public string MultipSubscribe(List<string> topics, Func<MqttApplicationMessageReceivedEventArgs, Task> callback)
        {
            try
            {
                _messageHandler = callback;

                // 添加消息处理函数
                if (callback != null)
                {
                    _client.ApplicationMessageReceivedAsync += callback;
                }
                else
                {
                    _client.ApplicationMessageReceivedAsync += MessageHandler;
                }
                foreach(string topic in topics)
                {
                    _subscribeTopics.Add(topic);
                    _client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce).GetAwaiter().GetResult();
                }
                return string.Empty;
            }
            catch
            {
                return "订阅失败";
            }
        }

        /// <summary>
        /// 默认消息处理程序
        /// </summary>
        /// <param name="e"></param>
        /// <returns></returns>
        private async Task MessageHandler(MqttApplicationMessageReceivedEventArgs e)
        {
            Thread.Sleep(2000);
            await Task.Run(() =>
            {
                Console.WriteLine(e.ApplicationMessage.Topic);
                Console.WriteLine(Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment));
            });
        }

        /// <summary>
        /// 取消订阅
        /// </summary>
        /// <param name="topic"></param>
        public async Task UnSubscribe(string topic)
        {
            await _client.UnsubscribeAsync(topic);
            _subscribeTopics.Remove(topic);
        }

        /// <summary>
        /// 取消订阅
        /// </summary>
        /// <param name="topic"></param>
        public async Task UnSubscribeAll()
        {
            foreach(string topic in _subscribeTopics)
            {
                await _client.UnsubscribeAsync(topic);
            }
            _subscribeTopics.Clear();
        }

        /// <summary>
        /// 断线重连
        /// </summary>
        /// <param name="e"></param>
        /// <returns></returns>
        private async Task Reconnect(MqttClientDisconnectedEventArgs e)
        {
            Console.WriteLine("重新连接被调用");
            if (_mqttClientOptions == null || _client.IsConnected)
            {
                return;
            }
            await Task.Run(() =>
            {
                try
                {
                    _client.ConnectAsync(_mqttClientOptions).Wait();
                    if (_client.IsConnected)
                    {
                        // 连接成功重新订阅
                        if (_subscribeTopics.Count() > 0 && _messageHandler != null)
                        {
                            foreach (string topic in _subscribeTopics)
                            {
                                _client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce).GetAwaiter().GetResult();
                            }
                        }

                        reconnectedTimes = 1;
                        Console.WriteLine("重连成功");
                    }
                }
                catch 
                {
                    Console.WriteLine($"重连第{reconnectedTimes}次失败");
                    reconnectedTimes += 1;
                }
            });
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose() 
        {
            if (_client != null && _client.IsConnected)
            {
                _client.DisconnectedAsync -= Reconnect;
                _client.DisconnectAsync().Wait();
            }
            if (_client != null)
            {
                _client.Dispose();
            }
        }

        /// <summary>
        /// Finalize 函数释放资源
        /// 如果用户忘记手动调用Dispose方法, 那么在MQTTProxy被垃圾回收时会释放资源
        /// </summary>
        ~MQTTProxy()
        {
            Dispose();
        }
    }
}

相关文章

  • MQTT GUI 客户端 可视化管理工具推荐

    一款好用的 MQTT 客户端工具可以极大地提高开发者使用MQTT的效率。MQTT 客户端工具常用于建立与 MQTT...

  • mqtt.js使用

    工作中的尝试和积累 安装mqtt.js 封装类 mqtt_service.js 在页面中调用

  • MQTT、RxJava封装

    最近物联网的项目需要做推送,功能实现方式有以下几种: 轮训,我们客户端定时请求后端接口,并不是推送,缺点是耗电、耗...

  • MQTT、RxJava封装

    最近物联网的项目需要做推送,功能实现方式有以下几种: 轮训,我们客户端定时请求后端接口,并不是推送,缺点是耗电、耗...

  • 客户端与MQTT服务端通信 --- vue.js

    1.在index.html中引入mqttws31.js 2.封装mqtt通信组件 在需要通信的页面引入mqtt组件...

  • Unity 使用VisualStuido将C#脚本封装打包DLL

    为了方便开发,不破坏写好工具的封装性,我们经常讲C#脚本打包成DLL使用,操作流程如下 创建项目,注意选择Net ...

  • Mqtt X工具的Script功能使用

    介绍 MQTT X 是一款开源的MQTT 5.0 桌面测试客户端,相比其他mqtt的桌面客户端工具,比如paho、...

  • MQTT Illustrated - The QoS Level

    本系列文章以当前普遍使用的 MQTT 版本 3.1.1 为例,结合 wireshark 工具,详细解析 MQTT ...

  • MQTT协议解析系列文章之一: QoS Level

    本系列文章以当前普遍使用的 MQTT 版本 3.1.1 为例,结合 wireshark 工具,详细解析 MQTT ...

  • MQTT 测试

    MQTT 测试工具 mqtt.fx 最新版版本的收费。http://www.jensd.de/apps/mqttf...

网友评论

      本文标题:C# MQTT 工具封装

      本文链接:https://www.haomeiwen.com/subject/gzvlndtx.html