.NETCore Web API 框架结合 MQTTNet实现发布和订阅 EMQ

作者: admin | 创建时间: 2025-02-19 16:35:25 文章分类: MQTT协议

文章简介: 以下是一个使用 .NET Web API 框架结合 MQTTNet(版本 5.0.1.1416)实现发布和订阅 EMQ 的程序示例。我们将创建一个名为 MqttWebApi 的 .NET Web API 项目,该项目允许通过接口进行 MQTT 消息的发布和订阅操作。

一:创建项目

首先,打开命令行工具,执行以下命令创建一个新的 .NET Web API 项目:

dotnet new webapi -n MqttWebApi
cd MqttWebApi

二:安装 MQTTNet 库

使用 NuGet 包管理器安装 MQTTNet 库(版本 5.0.1.1416):

dotnet add package MQTTNet --version 5.0.1.1416

三:编写代码

1.创建一个service类MqttService.cs

创建一个名为 MqttService.cs 的类,用于处理 MQTT 客户端的连接、发布、订阅和取消订阅操作:

using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using MQTTnet;
using MQTTnet.Protocol;
using MqttWebApi.Dto;
using MqttWebApi.Utils;

namespace MqttWebApi.Services
{
    public class MqttService
    {
        private readonly IMqttClient _mqttClient;
        private readonly MqttClientOptions _mqttClientOptions;

        private readonly MqttClientFactory _mqttFactory;

        public MqttService(IOptions<MqttSettings> mqttSettings)
        {
            _mqttFactory = new MqttClientFactory();
            _mqttClient = _mqttFactory.CreateMqttClient();

            var settings = mqttSettings.Value;
            _mqttClientOptions = new MqttClientOptionsBuilder()
               .WithTcpServer(settings.ServerAddress, settings.Port)
               .WithClientId(settings.ClientId)
               .WithCredentials(settings.Username, settings.Password)
               .Build();

            _mqttClient.ConnectedAsync += async e =>
            {
                Console.WriteLine("Connected to MQTT broker.");
            };

            _mqttClient.DisconnectedAsync += async e =>
            {
                Console.WriteLine("Disconnected from MQTT broker.");
            };

            _mqttClient.ApplicationMessageReceivedAsync += e =>
            {
                var message = System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
                var topic = e.ApplicationMessage.Topic;
                Console.WriteLine($"Received message on topic '{topic}': {message}");
                return Task.CompletedTask;
            };
        }

        public async Task ConnectAsync()
        {
            if (!_mqttClient.IsConnected)
            {
                await _mqttClient.ConnectAsync(_mqttClientOptions);
            }
        }

        public async Task SubscribeAsync(string topic)
        {
            var subscribeOptions = _mqttFactory.CreateSubscribeOptionsBuilder()
               .WithTopicFilter(f => f.WithTopic(topic))
               .Build();

            await _mqttClient.SubscribeAsync(subscribeOptions);
            Console.WriteLine($"Subscribed to topic: {topic}");
        }

        public async Task PublishAsync(string topic, string message)
        {
            if (_mqttClient.IsConnected)
            {
                var applicationMessage = new MqttApplicationMessageBuilder()
                   .WithTopic(topic)
                   .WithPayload(message)
                   .Build();
                await _mqttClient.PublishAsync(applicationMessage);
            }
        }

        public async Task UnsubscribeAsync(string topic)
        {
            var unsubscribeOptions = _mqttFactory.CreateUnsubscribeOptionsBuilder()
               .WithTopicFilter(topic)
               .Build();

            await _mqttClient.UnsubscribeAsync(unsubscribeOptions);
            Console.WriteLine($"Unsubscribed from topic: {topic}");
        }

        public async Task DisconnectAsync()
        {
            if (_mqttClient.IsConnected)
            {
                await _mqttClient.DisconnectAsync();
            }
        }
    }
}

2.在appsettings.json中配置MQTT连接相关参数

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "MqttSettings": {
    "ServerAddress": "MQTT的服务器地址",
    "Port": 1883,
    "ClientId": "WebApiMqttClient",
    "Username": "admin",
    "Password": "public"
  }
}

3.编写一个配置参数的dto用来存储配置的参数

namespace MqttWebApi.Dto
{
    public class MqttSettings
    {
        public string ServerAddress { get; set; }
        public int Port { get; set; }
        public string ClientId { get; set; }
        public string Username { get; set; }
        public string Password { get; set; }

        /// <summary>
        /// 是否要把数据发送给另一个端口 0:否 1:是
        /// </summary>
        public string RequestFlage { get; set; }
        /// <summary>
        ///  请求地址
        /// </summary>
        public string RequestUrl { get; set; }
    }
}

4.在program.cs中注册 MqttService和MqttSettings等相关服务

using MqttWebApi.Dto;
using MqttWebApi.Services;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// 读取配置信息
var mqttSettings = builder.Configuration.GetSection("MqttSettings").Get<MqttSettings>();

// 注册服务
builder.Services.Configure<MqttSettings>(builder.Configuration.GetSection("MqttSettings"));
// 添加 MqttService 到依赖注入容器
builder.Services.AddSingleton<MqttService>();

// 添加控制器
builder.Services.AddControllers();

var app = builder.Build();

// 连接到 MQTT 服务器
var mqttService = app.Services.GetRequiredService<MqttService>();
await mqttService.ConnectAsync();

// 配置中间件
if (app.Environment.IsDevelopment())
{
    app.UseDeveloperExceptionPage();
}
app.UseRouting();



// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();

app.UseAuthorization();

app.MapControllers();

app.Run();

5.编写一个dot作为返回的JSON实体

namespace MqttWebApi.Dto
{
    public class ApiResponse
    {
        public int Code { get; set; }
        public string Message { get; set; }
        public object Data { get; set; }

        public ApiResponse(int code, string message, object data = null)
        {
            Code = code;
            Message = message;
            Data = data;
        }
    }
}

6.编写webapi调用的Control

创建一个名为 MqttController.cs 的控制器,用于处理发布和订阅的接口请求:

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using MqttWebApi.Dto;
using MqttWebApi.Services;

namespace MqttWebApi.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class MqttController : ControllerBase
    {
        private readonly MqttService _mqttService;

        public MqttController(MqttService mqttService)
        {
            _mqttService = mqttService;
        }

        [HttpPost("publish")]
        public async Task<IActionResult> Publish([FromQuery] string topic, [FromBody] string message)
        {
            try
            {
                await _mqttService.PublishAsync(topic, message);
                return Ok(new ApiResponse(200, "Message published successfully", null));
            }
            catch (Exception ex)
            {
                return BadRequest(new ApiResponse(400, $"Failed to publish message: {ex.Message}", null));
            }
        }

        [HttpPost("subscribe")]
        public async Task<IActionResult> Subscribe([FromQuery] string topic)
        {
            try
            {
                await _mqttService.SubscribeAsync(topic);
                return Ok(new ApiResponse(200, "Subscribed to topic successfully", null));
            }
            catch (Exception ex)
            {
                return BadRequest(new ApiResponse(400, $"Failed to subscribe to topic: {ex.Message}", null));
            }
        }


        [HttpPost("unsubscribe")]
        public async Task<IActionResult> Unsubscribe([FromQuery] string topic)
        {
            try
            {
                await _mqttService.UnsubscribeAsync(topic);
                return Ok(new ApiResponse(200, "Unsubscribed from topic successfully", null));
            }
            catch (Exception ex)
            {
                return BadRequest(new ApiResponse(400, $"Failed to unsubscribe from topic: {ex.Message}", null));
            }
        }
    }
}

四.测试接口

可以使用 Postman 或其他工具来测试接口比如swagger:http://localhost:5058/swagger/index.html

1.发布消息

发送一个 POST 请求到 http://localhost:5000/mqtt/publish?topic=demo 请求体为要发布的消息内容比如:“发布订阅”。

发布订阅

订阅主题

发送一个 POST 请求到 http://localhost:5000/mqtt/subscribe?topic=demo

订阅

运行结果

订阅运行结果

注意事项

  • 请确保 EMQ 服务器正在运行,并且地址和端口正确。
  • 本示例使用了同步等待(GetAwaiter().GetResult())来确保 MQTT 客户端在应用程序启动时连接成功,在实际生产环境中建议使用更优雅的异步初始化方式。

通过以上步骤,你就可以实现一个简单的 .NET Web API 框架来发布和订阅 EMQ 消息。

评论

目录

    关闭