1. 项目环境搭建
首先,确保你已经安装了必要的 Python 库,包括 Flask、paho-mqtt 用于与 EMQX 进行 MQTT 通信。可以使用以下命令进行安装:
pip install flask paho-mqtt
2. 创建 Flask 应用
创建一个 app.py 文件,编写 Flask 应用代码,实现与 EMQX 的 MQTT 连接、发布、订阅和取消订阅功能。
import paho.mqtt.client as mqtt
from flask import Flask, render_template, request, jsonify
app = Flask(__name__)
# EMQX 配置
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "flask_mqtt_client"
MQTT_USERNAME = "your_username"
MQTT_PASSWORD = "your_password"
# 创建 MQTT 客户端
client = mqtt.Client(client_id=MQTT_CLIENT_ID)
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
# 连接成功回调
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print(f"Failed to connect, return code {rc}")
client.on_connect = on_connect
# 连接到 MQTT 服务器
client.connect(MQTT_BROKER, MQTT_PORT)
client.loop_start()
# 首页路由
@app.route('/')
def index():
return render_template('index.html')
# 发布消息路由
@app.route('/publish', methods=['POST'])
def publish():
data = request.get_json()
topic = data.get('topic')
message = data.get('message')
if topic and message:
client.publish(topic, message)
return jsonify({"status": "success", "message": "Message published successfully"})
return jsonify({"status": "error", "message": "Missing topic or message"})
# 订阅主题路由
@app.route('/subscribe', methods=['POST'])
def subscribe():
data = request.get_json()
topic = data.get('topic')
if topic:
client.subscribe(topic)
return jsonify({"status": "success", "message": f"Subscribed to {topic}"})
return jsonify({"status": "error", "message": "Missing topic"})
# 取消订阅主题路由
@app.route('/unsubscribe', methods=['POST'])
def unsubscribe():
data = request.get_json()
topic = data.get('topic')
if topic:
client.unsubscribe(topic)
return jsonify({"status": "success", "message": f"Unsubscribed from {topic}"})
return jsonify({"status": "error", "message": "Missing topic"})
if __name__ == '__main__':
app.run(debug=True)
3. 创建 HTML 模板
在项目根目录下创建一个 templates 文件夹,并在其中创建 index.html 文件,使用 Bootstrap 构建用户界面。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>MQTT Publish/Subscribe</title>
<!-- 引入 Bootstrap CSS -->
<link rel="stylesheet" href="{{ url_for('static',filename='bootstrap.min.css') }}">
</head>
<body>
<div class="container mt-5">
<h1>MQTT Publish/Subscribe</h1>
<!-- 发布消息表单 -->
<div class="mb-3">
<h2>发布消息</h2>
<label for="publish-topic" class="form-label">主题</label>
<input type="text" class="form-control" id="publish-topic" placeholder="输入主题">
<label for="publish-message" class="form-label">消息内容</label>
<input type="text" class="form-control" id="publish-message" placeholder="输入消息内容">
<button class="btn btn-primary mt-2" onclick="publishMessage()">发布</button>
</div>
<!-- 订阅主题表单 -->
<div class="mb-3">
<h2>订阅主题</h2>
<label for="subscribe-topic" class="form-label">主题</label>
<input type="text" class="form-control" id="subscribe-topic" placeholder="输入主题">
<button class="btn btn-success mt-2" onclick="subscribeTopic()">订阅</button>
</div>
<!-- 取消订阅主题表单 -->
<div class="mb-3">
<h2>取消订阅主题</h2>
<label for="unsubscribe-topic" class="form-label">主题</label>
<input type="text" class="form-control" id="unsubscribe-topic" placeholder="输入主题">
<button class="btn btn-danger mt-2" onclick="unsubscribeTopic()">取消订阅</button>
</div>
<!-- 显示接收到的消息 -->
<div id="messages" class="mt-3">
<h2>接收到的消息</h2>
<ul id="message-list"></ul>
</div>
<div id="response" class="mt-3"></div>
</div>
<!-- 引入 Bootstrap JS 和 jQuery -->
<script src="{{ url_for('static',filename='jquery-3.7.1.min.js') }}"></script>
<script src="{{ url_for('static',filename='bootstrap.bundle.min.js') }}"></script>
<script>
function publishMessage() {
const topic = $('#publish-topic').val();
const message = $('#publish-message').val();
$.ajax({
url: '/publish',
method: 'POST',
contentType: 'application/json',
data: JSON.stringify({topic: topic, message: message}),
success: function (response) {
$('#response').html(`<div class="alert alert-${response.status === 'success' ? 'success' : 'danger'}">${response.message}</div>`);
},
error: function () {
$('#response').html('<div class="alert alert-danger">请求出错</div>');
}
});
}
function subscribeTopic() {
const topic = $('#subscribe-topic').val();
$.ajax({
url: '/subscribe',
method: 'POST',
contentType: 'application/json',
data: JSON.stringify({topic: topic}),
success: function (response) {
$('#response').html(`<div class="alert alert-${response.status === 'success' ? 'success' : 'danger'}">${response.message}</div>`);
},
error: function () {
$('#response').html('<div class="alert alert-danger">请求出错</div>');
}
});
}
function unsubscribeTopic() {
const topic = $('#unsubscribe-topic').val();
$.ajax({
url: '/unsubscribe',
method: 'POST',
contentType: 'application/json',
data: JSON.stringify({topic: topic}),
success: function (response) {
$('#response').html(`<div class="alert alert-${response.status === 'success' ? 'success' : 'danger'}">${response.message}</div>`);
},
error: function () {
$('#response').html('<div class="alert alert-danger">请求出错</div>');
}
});
}
</script>
</body>
</html>
4. 运行应用
在终端中运行以下命令启动 Flask 应用:
python app.py
5. 访问应用
打开浏览器,访问 http://127.0.0.1:5000 ,你将看到一个基于 Bootstrap 的用户界面,可以在其中进行消息的发布、订阅和取消订阅操作。
6.运行结果
源码地址
注意事项
- 请将 MQTT_BROKER、MQTT_PORT、MQTT_USERNAME 和 MQTT_PASSWORD 替换为你实际的 EMQX 服务器配置。
- 确保 EMQX 服务器正在运行,并且允许客户端连接。
- 在生产环境中,建议使用 HTTPS 协议来保证通信安全,并对代码进行适当的错误处理和性能优化。
评论