zoukankan      html  css  js  c++  java
  • mqttnet3.0用法

    .NET 常用的 MQTT 类库有两个:M2Mqtt 和 MQTTnet。

    当然了,这两个库的教程网上一搜一大把

    但能搜到的 MQTTnet 教程全是 2.7 及以下版本的,而 3.0 版的语法与旧版不再兼容,直接升级版本会导致很多问题。今天成功完成了升级,现将过程记录下来。

    参考文档地址:https://github.com/chkr1011/MQTTnet/wiki/Client

    上代码:

      1 ///开源库地址:https://github.com/chkr1011/MQTTnet
      2 ///对应文档:https://github.com/chkr1011/MQTTnet/wiki/Client
      3 
      4 using MQTTnet;
      5 using MQTTnet.Client;
      6 using MQTTnet.Client.Options;
      7 using System;
      8 using System.Text;
      9 using System.Threading;
     10 using System.Threading.Tasks;
     11 using System.Windows.Forms;
     12 
     namespace MqttServerTest
     {
         /// <summary>
         /// WinForms test tool for an MQTT broker, built on the MQTTnet 3.x client API.
         /// Supports log-in/log-out, publishing, subscribing and displaying received
         /// messages in <c>txtReceiveMessage</c>.
         /// </summary>
         public partial class mqtt测试工具 : Form
         {
             // Pause between automatic reconnect attempts after an established connection drops.
             private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

             private IMqttClient mqttClient = null;

             // Connection options captured on the UI thread at log-in. The reconnect path
             // reuses them so that no control is ever read from an MQTTnet callback thread.
             private IMqttClientOptions connectOptions = null;

             // Written by the UI thread (log-in/log-out/close), read by MQTTnet callback threads.
             private volatile bool isReconnect = true;

             // True between a successful connect and the following disconnect. Lets the
             // disconnected handler tell a dropped connection from a failed connect attempt.
             private volatile bool connectionEstablished = false;

             public mqtt测试工具()
             {
                 InitializeComponent();
             }

             private void Form1_Load(object sender, EventArgs e)
             {
                 // Intentionally empty; kept because the designer may reference it.
             }

             /// <summary>Stops reconnecting and releases the MQTT client when the form closes.</summary>
             protected override void OnFormClosed(FormClosedEventArgs e)
             {
                 isReconnect = false;
                 if (mqttClient != null)
                 {
                     mqttClient.Dispose();
                 }

                 base.OnFormClosed(e);
             }

             private async void BtnPublish_Click(object sender, EventArgs e)
             {
                 await Publish();
             }

             private async void BtnSubscribe_ClickAsync(object sender, EventArgs e)
             {
                 await Subscribe();
             }

             /// <summary>Log-in button: enables auto-reconnect and connects to the broker.</summary>
             private async void btnLogIn_Click(object sender, EventArgs e)
             {
                 isReconnect = true;

                 // Awaited directly on the UI thread (no Task.Run): the connect itself is
                 // asynchronous I/O, and the method has to read the input controls.
                 await ConnectMqttServerAsync();
             }

             /// <summary>Log-out button: disables auto-reconnect and disconnects from the broker.</summary>
             private async void btnLogout_Click(object sender, EventArgs e)
             {
                 isReconnect = false;

                 if (mqttClient == null)
                 {
                     // Never logged in, nothing to disconnect.
                     return;
                 }

                 try
                 {
                     await mqttClient.DisconnectAsync();
                 }
                 catch (Exception ex)
                 {
                     AppendLog("断开MQTT连接失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
                 }
             }

             /// <summary>
             /// Publishes the text of <c>txtSendMessage</c> to the topic in <c>txtPubTopic</c>
             /// with QoS 2 and the retain flag set. Failures are written to the log box.
             /// </summary>
             private async Task Publish()
             {
                 string topic = txtPubTopic.Text.Trim();

                 if (string.IsNullOrEmpty(topic))
                 {
                     MessageBox.Show("发布主题不能为空!");
                     return;
                 }

                 if (!IsClientConnected())
                 {
                     MessageBox.Show("MQTT客户端尚未连接!");
                     return;
                 }

                 string inputString = txtSendMessage.Text.Trim();
                 try
                 {
                     var message = new MqttApplicationMessageBuilder()
                         .WithTopic(topic)
                         .WithPayload(inputString)
                         .WithExactlyOnceQoS()
                         .WithRetainFlag()
                         .Build();

                     await mqttClient.PublishAsync(message);
                 }
                 catch (Exception ex)
                 {
                     AppendLog("发布主题失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
                 }
             }

             /// <summary>
             /// Subscribes to the topic in <c>txtSubTopic</c> with QoS 0.
             /// Failures are written to the log box instead of escaping the click handler.
             /// </summary>
             private async Task Subscribe()
             {
                 string topic = txtSubTopic.Text.Trim();

                 if (string.IsNullOrEmpty(topic))
                 {
                     MessageBox.Show("订阅主题不能为空!");
                     return;
                 }

                 if (!IsClientConnected())
                 {
                     MessageBox.Show("MQTT客户端尚未连接!");
                     return;
                 }

                 try
                 {
                     await mqttClient.SubscribeAsync(new TopicFilterBuilder()
                         .WithTopic(topic)
                         .WithAtMostOnceQoS()
                         .Build());

                     AppendLog($"已订阅[{topic}]主题{Environment.NewLine}");
                 }
                 catch (Exception ex)
                 {
                     AppendLog("订阅主题失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
                 }
             }

             /// <summary>
             /// Connects to the broker described by the input controls. Must be called on
             /// the UI thread. The client is created once and reused, so logging in again
             /// after a log-out or a failed attempt works. Does nothing when already connected.
             /// </summary>
             private async Task ConnectMqttServerAsync()
             {
                 if (IsClientConnected())
                 {
                     return;
                 }

                 string host = txtIp.Text.Trim();
                 int port;
                 if (!int.TryParse(txtPort.Text.Trim(), out port) || port < 1 || port > 65535)
                 {
                     MessageBox.Show("端口号无效!");
                     return;
                 }

                 try
                 {
                     // Handlers are attached when the client is created, i.e. before the
                     // first connect, so nothing delivered right after connecting is missed.
                     EnsureClientCreated();

                     connectOptions = new MqttClientOptionsBuilder()
                         .WithClientId(txtClientId.Text)
                         .WithTcpServer(host, port)
                         .WithCredentials(txtUsername.Text, txtPsw.Text)
                         //.WithTls()
                         .WithCleanSession()
                         .Build();

                     await mqttClient.ConnectAsync(connectOptions, CancellationToken.None);
                     AppendLog("连接到MQTT服务器成功!" + host + Environment.NewLine);
                 }
                 catch (Exception ex)
                 {
                     AppendLog("连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
                 }
             }

             /// <summary>
             /// Creates the MQTT client on first use and registers the connected,
             /// disconnected and message-received handlers through the MQTTnet 3.x
             /// <c>Use...Handler</c> API (the 2.x events no longer exist).
             /// </summary>
             private void EnsureClientCreated()
             {
                 if (mqttClient != null)
                 {
                     return;
                 }

                 var client = new MqttFactory().CreateMqttClient();
                 client.UseConnectedHandler(args => { MqttClient_Connected(client, EventArgs.Empty); });
                 client.UseDisconnectedHandler(args => { MqttClient_Disconnected(client, EventArgs.Empty); });
                 client.UseApplicationMessageReceivedHandler(args => { MqttClient_ApplicationMessageReceived(client, args); });
                 mqttClient = client;
             }

             /// <summary>Returns true when the client exists and reports an open connection.</summary>
             private bool IsClientConnected()
             {
                 return mqttClient != null && mqttClient.IsConnected;
             }

             /// <summary>Called by the client once a connection (initial or reconnect) is established.</summary>
             private void MqttClient_Connected(object sender, EventArgs e)
             {
                 connectionEstablished = true;

                 RunOnUiThread(() =>
                 {
                     txtReceiveMessage.Clear();
                     txtReceiveMessage.AppendText("已连接到MQTT服务器!" + Environment.NewLine);
                 });
             }

             /// <summary>
             /// Called by the client when the connection ends. Logs the event and, when
             /// auto-reconnect is enabled, starts the background reconnect loop.
             /// </summary>
             private void MqttClient_Disconnected(object sender, EventArgs e)
             {
                 if (!connectionEstablished)
                 {
                     // No connection existed, so this is a failed connect attempt; the code
                     // that started the attempt reports the failure itself.
                     return;
                 }

                 connectionEstablished = false;

                 bool reconnect = isReconnect;
                 string timestamp = DateTime.UtcNow.ToLongTimeString();

                 RunOnUiThread(() =>
                 {
                     txtReceiveMessage.Clear();
                     txtReceiveMessage.AppendText($">> [{timestamp}]");
                     txtReceiveMessage.AppendText("已断开MQTT连接!" + Environment.NewLine);
                     txtReceiveMessage.AppendText((reconnect ? "正在尝试重新连接" : "已下线!") + Environment.NewLine);
                 });

                 if (reconnect)
                 {
                     // Deliberately not awaited: the loop catches its own exceptions and
                     // must not block the client's callback.
                     ReconnectLoopAsync();
                 }
             }

             /// <summary>
             /// Retries the connection every <see cref="ReconnectDelay"/> using the options
             /// captured at log-in, until it succeeds or auto-reconnect is switched off.
             /// </summary>
             private async Task ReconnectLoopAsync()
             {
                 while (isReconnect && !IsClientConnected())
                 {
                     await Task.Delay(ReconnectDelay);

                     // The user may have logged out, logged in manually or closed the form
                     // while we were waiting.
                     if (!isReconnect || IsClientConnected())
                     {
                         return;
                     }

                     try
                     {
                         await mqttClient.ConnectAsync(connectOptions, CancellationToken.None);
                     }
                     catch (Exception ex)
                     {
                         AppendLog("### RECONNECTING FAILED ###" + Environment.NewLine + ex.Message + Environment.NewLine);
                     }
                 }
             }

             /// <summary>Writes topic, payload, QoS and retain flag of a received message to the log box.</summary>
             private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
             {
                 var message = e.ApplicationMessage;

                 // A message may carry no payload at all; GetString(null) would throw.
                 byte[] payload = message.Payload;
                 string payloadText = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

                 string newLine = Environment.NewLine;
                 AppendLog(
                     $">> ### RECEIVED APPLICATION MESSAGE ###{newLine}" +
                     $">> Topic = {message.Topic}{newLine}" +
                     $">> Payload = {payloadText}{newLine}" +
                     $">> QoS = {message.QualityOfServiceLevel}{newLine}" +
                     $">> Retain = {message.Retain}{newLine}");
             }

             /// <summary>Appends <paramref name="text"/> to the log box from any thread.</summary>
             private void AppendLog(string text)
             {
                 RunOnUiThread(() => txtReceiveMessage.AppendText(text));
             }

             /// <summary>
             /// Runs <paramref name="action"/> on the UI thread. Silently does nothing when
             /// the form has no handle or is disposed, so late MQTT callbacks cannot crash
             /// a closing application.
             /// </summary>
             private void RunOnUiThread(Action action)
             {
                 if (IsDisposed || !IsHandleCreated)
                 {
                     return;
                 }

                 if (!InvokeRequired)
                 {
                     action();
                     return;
                 }

                 try
                 {
                     // Posted rather than sent, so client callback threads never block on the UI.
                     BeginInvoke(action);
                 }
                 catch (InvalidOperationException)
                 {
                     // The form was closed between the check above and this call
                     // (ObjectDisposedException derives from InvalidOperationException).
                 }
             }
         }
     }
  • 相关阅读:
    sprint2(第九天)
    sprint2 (第八天)
    sprint2(第七天)
    sprint2(第六天)
    sprint2(第四天)
    sprint2(第三天)
    sprint2(第二天)
    sprint 2(第一天)
    0621 第三次冲刺及课程设计
    0617 操作系统实验4 主存空间的分配和回收
  • 原文地址:https://www.cnblogs.com/bjjjunjie/p/mqtt.html
Copyright © 2011-2022 走看看