一:创建项目
首先,打开命令行工具,执行以下命令创建一个新的 .NET Web API 项目:
dotnet new webapi -n MqttWebApi
cd MqttWebApi
二:安装 MQTTNet 库
使用 NuGet 包管理器安装 MQTTNet 库(版本 5.0.1.1416):
dotnet add package MQTTNet --version 5.0.1.1416
三:编写代码
1.创建一个service类MqttService.cs
创建一个名为 MqttService.cs 的类,用于处理 MQTT 客户端的连接、发布、订阅和取消订阅操作:
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using MQTTnet;
using MQTTnet.Protocol;
using MqttWebApi.Dto;
using MqttWebApi.Utils;
namespace MqttWebApi.Services
{
    /// <summary>
    /// Wraps a single MQTTnet client and exposes connect, publish, subscribe,
    /// unsubscribe and disconnect operations against the broker described by
    /// <see cref="MqttSettings"/>.
    /// </summary>
    public class MqttService : IDisposable
    {
        private readonly IMqttClient _mqttClient;
        private readonly MqttClientOptions _mqttClientOptions;
        private readonly MqttClientFactory _mqttFactory;

        /// <summary>
        /// Creates the MQTT client, builds the connection options from the supplied
        /// settings and wires up console logging for connection and message events.
        /// No network connection is opened here; call <see cref="ConnectAsync"/> for that.
        /// </summary>
        /// <param name="mqttSettings">Broker address, port, client id and credentials.</param>
        public MqttService(IOptions<MqttSettings> mqttSettings)
        {
            _mqttFactory = new MqttClientFactory();
            _mqttClient = _mqttFactory.CreateMqttClient();

            var settings = mqttSettings.Value;
            _mqttClientOptions = new MqttClientOptionsBuilder()
                .WithTcpServer(settings.ServerAddress, settings.Port)
                .WithClientId(settings.ClientId)
                .WithCredentials(settings.Username, settings.Password)
                .Build();

            // The handlers do no asynchronous work, so they return a completed task
            // instead of being declared async without an await.
            _mqttClient.ConnectedAsync += e =>
            {
                Console.WriteLine("Connected to MQTT broker.");
                return Task.CompletedTask;
            };

            _mqttClient.DisconnectedAsync += e =>
            {
                Console.WriteLine("Disconnected from MQTT broker.");
                return Task.CompletedTask;
            };

            _mqttClient.ApplicationMessageReceivedAsync += e =>
            {
                var message = System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
                var topic = e.ApplicationMessage.Topic;
                Console.WriteLine($"Received message on topic '{topic}': {message}");
                return Task.CompletedTask;
            };
        }

        /// <summary>
        /// Connects to the broker if the client is not already connected.
        /// </summary>
        public async Task ConnectAsync()
        {
            if (!_mqttClient.IsConnected)
            {
                await _mqttClient.ConnectAsync(_mqttClientOptions);
            }
        }

        /// <summary>
        /// Subscribes the client to the given topic.
        /// </summary>
        /// <param name="topic">Topic filter to subscribe to.</param>
        public async Task SubscribeAsync(string topic)
        {
            var subscribeOptions = _mqttFactory.CreateSubscribeOptionsBuilder()
                .WithTopicFilter(f => f.WithTopic(topic))
                .Build();

            await _mqttClient.SubscribeAsync(subscribeOptions);
            Console.WriteLine($"Subscribed to topic: {topic}");
        }

        /// <summary>
        /// Publishes a text message to the given topic.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="message">Message payload.</param>
        /// <exception cref="InvalidOperationException">
        /// The client is not connected. Previously the message was dropped silently,
        /// which let callers report success for a message that was never sent.
        /// </exception>
        public async Task PublishAsync(string topic, string message)
        {
            if (!_mqttClient.IsConnected)
            {
                throw new InvalidOperationException(
                    "Cannot publish: the MQTT client is not connected to the broker.");
            }

            var applicationMessage = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(message)
                .Build();

            await _mqttClient.PublishAsync(applicationMessage);
        }

        /// <summary>
        /// Removes the client's subscription to the given topic.
        /// </summary>
        /// <param name="topic">Topic filter to unsubscribe from.</param>
        public async Task UnsubscribeAsync(string topic)
        {
            var unsubscribeOptions = _mqttFactory.CreateUnsubscribeOptionsBuilder()
                .WithTopicFilter(topic)
                .Build();

            await _mqttClient.UnsubscribeAsync(unsubscribeOptions);
            Console.WriteLine($"Unsubscribed from topic: {topic}");
        }

        /// <summary>
        /// Disconnects from the broker if the client is currently connected.
        /// </summary>
        public async Task DisconnectAsync()
        {
            if (_mqttClient.IsConnected)
            {
                await _mqttClient.DisconnectAsync();
            }
        }

        /// <summary>
        /// Releases the underlying MQTT client.
        /// </summary>
        public void Dispose()
        {
            _mqttClient.Dispose();
            GC.SuppressFinalize(this);
        }
    }
}
2.在appsettings.json中配置MQTT连接相关参数
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*",
"MqttSettings": {
"ServerAddress": "MQTT的服务器地址",
"Port": 1883,
"ClientId": "WebApiMqttClient",
"Username": "admin",
"Password": "public"
}
}
3.编写一个配置参数的dto用来存储配置的参数
namespace MqttWebApi.Dto
{
    /// <summary>
    /// Holds the values of the "MqttSettings" configuration section.
    /// </summary>
    public class MqttSettings
    {
        /// <summary>Address of the MQTT server.</summary>
        public string ServerAddress { get; set; }

        /// <summary>TCP port of the MQTT server.</summary>
        public int Port { get; set; }

        /// <summary>Client identifier used when connecting.</summary>
        public string ClientId { get; set; }

        /// <summary>User name used when connecting.</summary>
        public string Username { get; set; }

        /// <summary>Password used when connecting.</summary>
        public string Password { get; set; }

        /// <summary>
        /// Whether the data should be forwarded to another port: 0 = no, 1 = yes.
        /// </summary>
        public string RequestFlage { get; set; }

        /// <summary>
        /// Request URL.
        /// </summary>
        public string RequestUrl { get; set; }
    }
}
4.在 Program.cs 中注册 MqttService 和 MqttSettings 等相关服务
using MqttWebApi.Dto;
using MqttWebApi.Services;
var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// Bind the "MqttSettings" configuration section so it can be injected as IOptions<MqttSettings>.
builder.Services.Configure<MqttSettings>(builder.Configuration.GetSection("MqttSettings"));

// Register MqttService as a singleton so every request shares one instance.
builder.Services.AddSingleton<MqttService>();

var app = builder.Build();

// Connect to the MQTT server before the HTTP pipeline starts.
// An exception thrown here is not caught, so a failed connection aborts startup.
var mqttService = app.Services.GetRequiredService<MqttService>();
await mqttService.ConnectAsync();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseDeveloperExceptionPage();
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseRouting();
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();

app.Run();
5.编写一个 DTO 作为返回的 JSON 实体
namespace MqttWebApi.Dto
{
    /// <summary>
    /// Response envelope with a numeric code, a message and an optional data object.
    /// </summary>
    public class ApiResponse
    {
        /// <summary>
        /// Creates a response with the given code, message and optional data.
        /// </summary>
        /// <param name="code">Value stored in <see cref="Code"/>.</param>
        /// <param name="message">Value stored in <see cref="Message"/>.</param>
        /// <param name="data">Value stored in <see cref="Data"/>; defaults to null.</param>
        public ApiResponse(int code, string message, object data = null) =>
            (Code, Message, Data) = (code, message, data);

        /// <summary>Numeric result code.</summary>
        public int Code { get; set; }

        /// <summary>Message text.</summary>
        public string Message { get; set; }

        /// <summary>Optional data object; null when none was supplied.</summary>
        public object Data { get; set; }
    }
}
6.编写 Web API 调用的 Controller
创建一个名为 MqttController.cs 的控制器,用于处理发布和订阅的接口请求:
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using MqttWebApi.Dto;
using MqttWebApi.Services;
namespace MqttWebApi.Controllers
{
    /// <summary>
    /// HTTP endpoints that forward publish, subscribe and unsubscribe requests
    /// to <see cref="MqttService"/>.
    /// </summary>
    [ApiController]
    [Route("[controller]")]
    public class MqttController : ControllerBase
    {
        private readonly MqttService _mqttService;

        public MqttController(MqttService mqttService) => _mqttService = mqttService;

        /// <summary>
        /// Publishes the request body as a message on the topic given in the query string.
        /// </summary>
        [HttpPost("publish")]
        public async Task<IActionResult> Publish([FromQuery] string topic, [FromBody] string message) =>
            await RespondAsync(
                () => _mqttService.PublishAsync(topic, message),
                "Message published successfully",
                error => $"Failed to publish message: {error.Message}");

        /// <summary>
        /// Subscribes to the topic given in the query string.
        /// </summary>
        [HttpPost("subscribe")]
        public async Task<IActionResult> Subscribe([FromQuery] string topic) =>
            await RespondAsync(
                () => _mqttService.SubscribeAsync(topic),
                "Subscribed to topic successfully",
                error => $"Failed to subscribe to topic: {error.Message}");

        /// <summary>
        /// Unsubscribes from the topic given in the query string.
        /// </summary>
        [HttpPost("unsubscribe")]
        public async Task<IActionResult> Unsubscribe([FromQuery] string topic) =>
            await RespondAsync(
                () => _mqttService.UnsubscribeAsync(topic),
                "Unsubscribed from topic successfully",
                error => $"Failed to unsubscribe from topic: {error.Message}");

        // Runs the operation and maps the outcome to the response envelope:
        // 200 with the success text when it completes, 400 with the failure text when it throws.
        private async Task<IActionResult> RespondAsync(
            Func<Task> operation,
            string successMessage,
            Func<Exception, string> describeFailure)
        {
            try
            {
                await operation();
                return Ok(new ApiResponse(200, successMessage, null));
            }
            catch (Exception ex)
            {
                return BadRequest(new ApiResponse(400, describeFailure(ex), null));
            }
        }
    }
}
四.测试接口
可以使用 Postman 或 Swagger 等工具来测试接口,例如 Swagger 页面:http://localhost:5058/swagger/index.html(端口以项目实际启动时输出的端口为准,下文同)。
1.发布消息
发送一个 POST 请求到 http://localhost:5058/mqtt/publish?topic=demo ,请求体为 JSON 字符串形式的消息内容(Content-Type: application/json),比如:"发布订阅"。

2.订阅主题
发送一个 POST 请求到 http://localhost:5058/mqtt/subscribe?topic=demo

运行结果

注意事项
- 请确保 EMQ 服务器正在运行,并且地址和端口正确。
- 本示例在应用程序启动时通过 await mqttService.ConnectAsync() 连接 MQTT 服务器,连接失败抛出的异常不会被捕获,应用程序将无法启动;在实际生产环境中建议使用更优雅的异步初始化方式(例如通过 IHostedService 在后台连接),并加入断线重连机制。
通过以上步骤,你就可以实现一个简单的 .NET Web API 框架来发布和订阅 EMQ 消息。
评论