Python开发MQTT协议订阅和发布EMQX

作者: admin | 创建时间: 2025-02-20 14:54:37 文章分类: MQTT协议

文章简介: EMQX 是一款开源的大规模分布式 MQTT 消息服务器,功能丰富,专为物联网和实时通信应用而设计。要实现一个基于 Flask 和 Bootstrap 的 Web 应用,用于对 EMQ X(现 EMQX)进行消息的发布、订阅和取消订阅操作,可以按照以下步骤进行:

1. 项目环境搭建

首先,确保你已经安装了必要的 Python 库,包括 Flask、paho-mqtt 用于与 EMQX 进行 MQTT 通信。可以使用以下命令进行安装:

pip install flask paho-mqtt

2. 创建 Flask 应用

创建一个 app.py 文件,编写 Flask 应用代码,实现与 EMQX 的 MQTT 连接、发布、订阅和取消订阅功能。

import paho.mqtt.client as mqtt
from flask import Flask, render_template, request, jsonify

app = Flask(__name__)

# EMQX 配置
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "flask_mqtt_client"
MQTT_USERNAME = "your_username"
MQTT_PASSWORD = "your_password"

# 创建 MQTT 客户端
client = mqtt.Client(client_id=MQTT_CLIENT_ID)
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

# 连接成功回调
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT Broker!")
    else:
        print(f"Failed to connect, return code {rc}")

client.on_connect = on_connect

# 连接到 MQTT 服务器
client.connect(MQTT_BROKER, MQTT_PORT)
client.loop_start()

# 首页路由
@app.route('/')
def index():
    return render_template('index.html')

# 发布消息路由
@app.route('/publish', methods=['POST'])
def publish():
    data = request.get_json()
    topic = data.get('topic')
    message = data.get('message')
    if topic and message:
        client.publish(topic, message)
        return jsonify({"status": "success", "message": "Message published successfully"})
    return jsonify({"status": "error", "message": "Missing topic or message"})

# 订阅主题路由
@app.route('/subscribe', methods=['POST'])
def subscribe():
    data = request.get_json()
    topic = data.get('topic')
    if topic:
        client.subscribe(topic)
        return jsonify({"status": "success", "message": f"Subscribed to {topic}"})
    return jsonify({"status": "error", "message": "Missing topic"})

# 取消订阅主题路由
@app.route('/unsubscribe', methods=['POST'])
def unsubscribe():
    data = request.get_json()
    topic = data.get('topic')
    if topic:
        client.unsubscribe(topic)
        return jsonify({"status": "success", "message": f"Unsubscribed from {topic}"})
    return jsonify({"status": "error", "message": "Missing topic"})

if __name__ == '__main__':
    app.run(debug=True)

3. 创建 HTML 模板

在项目根目录下创建一个 templates 文件夹,并在其中创建 index.html 文件,使用 Bootstrap 构建用户界面。

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>MQTT Publish/Subscribe</title>
    <!-- 引入 Bootstrap CSS -->
    <link rel="stylesheet" href="{{ url_for('static',filename='bootstrap.min.css') }}">
</head>

<body>
<div class="container mt-5">
    <h1>MQTT Publish/Subscribe</h1>
    <!-- 发布消息表单 -->
    <div class="mb-3">
        <h2>发布消息</h2>
        <label for="publish-topic" class="form-label">主题</label>
        <input type="text" class="form-control" id="publish-topic" placeholder="输入主题">
        <label for="publish-message" class="form-label">消息内容</label>
        <input type="text" class="form-control" id="publish-message" placeholder="输入消息内容">
        <button class="btn btn-primary mt-2" onclick="publishMessage()">发布</button>
    </div>
    <!-- 订阅主题表单 -->
    <div class="mb-3">
        <h2>订阅主题</h2>
        <label for="subscribe-topic" class="form-label">主题</label>
        <input type="text" class="form-control" id="subscribe-topic" placeholder="输入主题">
        <button class="btn btn-success mt-2" onclick="subscribeTopic()">订阅</button>
    </div>
    <!-- 取消订阅主题表单 -->
    <div class="mb-3">
        <h2>取消订阅主题</h2>
        <label for="unsubscribe-topic" class="form-label">主题</label>
        <input type="text" class="form-control" id="unsubscribe-topic" placeholder="输入主题">
        <button class="btn btn-danger mt-2" onclick="unsubscribeTopic()">取消订阅</button>
    </div>
    <!-- 显示接收到的消息 -->
    <div id="messages" class="mt-3">
        <h2>接收到的消息</h2>
        <ul id="message-list"></ul>
    </div>
    <div id="response" class="mt-3"></div>
</div>
<!-- 引入 Bootstrap JS 和 jQuery -->
<script src="{{ url_for('static',filename='jquery-3.7.1.min.js') }}"></script>
<script src="{{ url_for('static',filename='bootstrap.bundle.min.js') }}"></script>

<script>

    function publishMessage() {
        const topic = $('#publish-topic').val();
        const message = $('#publish-message').val();
        $.ajax({
            url: '/publish',
            method: 'POST',
            contentType: 'application/json',
            data: JSON.stringify({topic: topic, message: message}),
            success: function (response) {
                $('#response').html(`<div class="alert alert-${response.status === 'success' ? 'success' : 'danger'}">${response.message}</div>`);
            },
            error: function () {
                $('#response').html('<div class="alert alert-danger">请求出错</div>');
            }
        });
    }

    function subscribeTopic() {
        const topic = $('#subscribe-topic').val();
        $.ajax({
            url: '/subscribe',
            method: 'POST',
            contentType: 'application/json',
            data: JSON.stringify({topic: topic}),
            success: function (response) {
                $('#response').html(`<div class="alert alert-${response.status === 'success' ? 'success' : 'danger'}">${response.message}</div>`);
            },
            error: function () {
                $('#response').html('<div class="alert alert-danger">请求出错</div>');
            }
        });
    }

    function unsubscribeTopic() {
        const topic = $('#unsubscribe-topic').val();
        $.ajax({
            url: '/unsubscribe',
            method: 'POST',
            contentType: 'application/json',
            data: JSON.stringify({topic: topic}),
            success: function (response) {
                $('#response').html(`<div class="alert alert-${response.status === 'success' ? 'success' : 'danger'}">${response.message}</div>`);
            },
            error: function () {
                $('#response').html('<div class="alert alert-danger">请求出错</div>');
            }
        });
    }
</script>
</body>

</html>

4. 运行应用

在终端中运行以下命令启动 Flask 应用:

python app.py

5. 访问应用

打开浏览器,访问 http://127.0.0.1:5000 ,你将看到一个基于 Bootstrap 的用户界面,可以在其中进行消息的发布、订阅和取消订阅操作。

发布订阅界面

6.运行结果

MQTT使用python运行结果

源码地址

点击下载源码

注意事项

  • 请将 MQTT_BROKER、MQTT_PORT、MQTT_USERNAME 和 MQTT_PASSWORD 替换为你实际的 EMQX 服务器配置。
  • 确保 EMQX 服务器正在运行,并且允许客户端连接。
  • 在生产环境中,建议使用 HTTPS 协议来保证通信安全,并对代码进行适当的错误处理和性能优化。

评论

目录

    关闭