Python is one of the most widely used programming languages today, supported across all major
operating systems. For constrained environments, MicroPython offers a lightweight subset designed
for low-end processors.
This guide walks you through building an MQTT client with the Eclipse Paho MQTT Python library,
an open-source MQTT client implementation maintained by the Eclipse Foundation.
Before getting started, ensure you have the following:
pip install paho-mqtt==2.1.0
This section covers various ways to connect to an MQTT broker using Python. Before proceeding, ensure your broker supports the desired connection method (TCP, TLS, WebSocket, etc.) and you have the correct connection parameters:
Basic TCP Connection
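import paho.mqtt.client as mqtt

BROKER_HOST = "broker-url"  # Replace with your broker's hostname or IP
BROKER_PORT = 1883          # Default port for unencrypted MQTT over TCP

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="testclient", protocol=mqtt.MQTTv311)
client.connect(BROKER_HOST, BROKER_PORT)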
Connect with Authentication
To connect to an MQTT broker that requires authentication, set the username and password before calling connect():
client.username_pw_set("username", "password")
Connect over TLS/SSL
To connect securely using TLS/SSL:
import ssl

client.tls_set(cert_reqs=ssl.CERT_REQUIRED,
               tls_version=ssl.PROTOCOL_TLS)
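As an alternative to passing individual options to `tls_set()`, you can hand the client a pre-built `ssl.SSLContext` through Paho's `tls_set_context()`. The sketch below uses only the standard library to build a context that verifies the broker's certificate and hostname. The helper name `build_tls_context`, its `ca_file` parameter, and the TLS 1.2 minimum are assumptions made for illustration, not part of the original setup.

```python
import ssl
from typing import Optional


def build_tls_context(ca_file: Optional[str] = None) -> ssl.SSLContext:
    """Create a client-side TLS context that verifies the broker certificate.

    ca_file is an optional path to a CA bundle (hypothetical parameter);
    when omitted, the system's default trust store is used.
    """
    context = ssl.create_default_context(cafile=ca_file)
    # Assumption for this sketch: refuse anything older than TLS 1.2.
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


context = build_tls_context()
# With a Paho client, the context would be applied before connecting:
# client.tls_set_context(context)
```

A context created this way requires a valid certificate and checks the hostname by default, so the broker must be reached under the name that appears in its certificate.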
Connect via WebSocket
To connect over WebSocket instead of TCP:
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
                     client_id=CLIENTID, protocol=PROTOCOL,
                     transport="websockets")
To send data to a topic:
client.publish(topic="test/topic",
               payload=json.dumps({"temperature": 50.5, "humidity": 34}),
               qos=1, retain=False)
To receive messages, subscribe to a topic filter. The broker will forward matching messages to your client.
client.subscribe(topic="test/+", qos=1)
To stop receiving messages, the client can unsubscribe using:
client.unsubscribe(topic="test/+")
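To make the wildcard behaviour of a filter such as `test/+` concrete, here is a minimal pure-Python sketch of how MQTT topic filters are matched. The function name `topic_matches` is hypothetical, and the sketch deliberately ignores the special handling of topics that start with `$` (such as `$SYS`). In a real client you would more likely rely on the broker, or on Paho's own `topic_matches_sub()` helper.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT `topic_filter`.

    '+' matches exactly one level; '#' matches all remaining levels
    (including the parent level) and must be the last filter level.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the final filter level.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    # Without '#', filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("test/+", "test/topic"))  # True
print(topic_matches("test/+", "test/a/b"))    # False
print(topic_matches("test/#", "test/a/b"))    # True
```

The second call shows why `test/+` does not receive messages from deeper topics: `+` covers a single level only, while `#` covers everything below the prefix.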
Always disconnect cleanly to avoid resource leaks and ensure proper session handling on the broker:
client.disconnect()
client.loop_stop()
1. Client Identification Strategy
Assign a unique client ID to each device. If two clients connect with the same ID, the broker disconnects the one that was connected first. In private setups, allocate a distinct ID to each client; in shared environments, append a random string to each client ID to maintain uniqueness.
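For shared environments, the random suffix can be generated with the standard library. This is only a sketch: the helper name `make_client_id` and the 4-byte suffix length are assumptions, and you may prefer a stable device identifier if you rely on persistent sessions.

```python
import secrets


def make_client_id(prefix: str, suffix_bytes: int = 4) -> str:
    """Return `prefix` followed by a random hex suffix, e.g. 'testclient-1a2b3c4d'."""
    return f"{prefix}-{secrets.token_hex(suffix_bytes)}"


client_id = make_client_id("testclient")
# The result would then be passed to the client constructor:
# client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
```

Note that a client ID that changes on every start cannot resume a persistent session, so a random suffix fits best with clean sessions.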
2. Data Architecture Design
Strategically plan your data structure in advance. Whether handling plain text, JSON formats, or numerical data, ensure that the design is tailored to meet the specific needs of your application.
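One way to pin down a payload format is to define it once in code and use the same definition for publishing and for parsing. The sketch below does this for the temperature and humidity message used in this guide; the class name `SensorReading` and its methods are hypothetical.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class SensorReading:
    """Hypothetical payload schema for one sensor sample."""
    temperature: float
    humidity: float

    def to_payload(self) -> str:
        """Serialize the reading to a JSON string suitable for publishing."""
        return json.dumps(asdict(self))

    @classmethod
    def from_payload(cls, payload: bytes) -> "SensorReading":
        """Parse a received payload (bytes) back into a reading."""
        data = json.loads(payload)
        return cls(temperature=float(data["temperature"]),
                   humidity=float(data["humidity"]))


reading = SensorReading(temperature=50.5, humidity=34)
payload = reading.to_payload()
decoded = SensorReading.from_payload(payload.encode("utf-8"))
```

Keeping serialization and parsing in one place means that a change to the format only has to be made once.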
3. Robust Error Handling
Handle MQTT connection failures, rejected subscriptions, and publish errors explicitly. Check the reason codes passed to your callbacks and the return codes of publish calls, and log failures instead of ignoring them.
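As a rough illustration, the two wrappers below turn connection and publish failures into logged, boolean results. They are a sketch under a few assumptions: the names `safe_connect` and `safe_publish` are hypothetical, and the functions accept any object that offers `connect(host, port)` and `publish(topic, payload, qos=...)` the way a Paho client does. The constant mirrors Paho's success code, which is 0.

```python
import logging

logger = logging.getLogger("mqtt-example")

MQTT_ERR_SUCCESS = 0  # Numeric value Paho uses for a successful call


def safe_connect(client, host: str, port: int) -> bool:
    """Try to connect; return False instead of raising on network errors."""
    try:
        client.connect(host, port)
    except OSError as exc:  # Refused connections, DNS failures, timeouts
        logger.error("Could not connect to %s:%s: %s", host, port, exc)
        return False
    return True


def safe_publish(client, topic: str, payload: str, qos: int = 1) -> bool:
    """Publish and report whether the client accepted the message."""
    info = client.publish(topic, payload, qos=qos)
    if info.rc != MQTT_ERR_SUCCESS:
        logger.error("Publish to %s failed with rc=%s", topic, info.rc)
        return False
    return True
```

A `True` result from `safe_publish` only means the client accepted the message for sending; confirming delivery for QoS 1 or 2 needs an additional check, such as the `on_publish` callback.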
4. Securing Credentials
Safeguard sensitive information like usernames, passwords, and client IDs by not hard-coding them in your source code. Use environment variables or secure configuration files instead.
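Reading credentials from environment variables could look like the sketch below. The variable names `MQTT_USERNAME` and `MQTT_PASSWORD` and the helper `load_credentials` are assumptions for illustration; use whatever names fit your deployment.

```python
import os
from typing import Mapping, Tuple


def load_credentials(env: Mapping[str, str] = os.environ) -> Tuple[str, str]:
    """Read broker credentials from the environment.

    MQTT_USERNAME and MQTT_PASSWORD are hypothetical variable names.
    """
    try:
        return env["MQTT_USERNAME"], env["MQTT_PASSWORD"]
    except KeyError as exc:
        # Fail early with a clear message instead of connecting anonymously.
        raise RuntimeError(f"Missing environment variable: {exc.args[0]}") from exc


# Typical use with a Paho client:
# username, password = load_credentials()
# client.username_pw_set(username, password)
```

Accepting the environment as a parameter keeps the function easy to test with a plain dictionary.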
5. Regular Testing & Monitoring
Continuously test MQTT communication and monitor client metrics such as connection status, message throughput, and error rates to quickly identify and fix issues.
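A very small way to start monitoring is to keep counters that your callbacks update. The class below is a sketch, not a full metrics solution: `ClientMetrics` and its fields are hypothetical names, and in production you would probably export these values to a monitoring system.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ClientMetrics:
    """Hypothetical in-memory counters updated from the MQTT callbacks."""
    connected: bool = False
    messages_received: int = 0
    errors: int = 0
    started_at: float = field(default_factory=time.monotonic)

    def record_message(self) -> None:
        self.messages_received += 1

    def record_error(self) -> None:
        self.errors += 1

    def throughput(self, now: Optional[float] = None) -> float:
        """Average number of messages received per second since start."""
        elapsed = (time.monotonic() if now is None else now) - self.started_at
        return self.messages_received / elapsed if elapsed > 0 else 0.0


metrics = ClientMetrics()
# In the callbacks you might call, for example:
# on_connect    -> metrics.connected = True
# on_message    -> metrics.record_message()
# on_disconnect -> metrics.connected = False
```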
6. Optimizing Session Management
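Choose between a clean and a persistent session based on whether subscriptions and queued messages must survive reconnects. In Paho, this is controlled by the `clean_session` argument of the client constructor for MQTT v3.1.1, and by the `clean_start` argument of `connect()` together with a session expiry interval for MQTT v5.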
7. Reconnect on Disconnect
Add code that attempts to reconnect to the MQTT broker after an unexpected disconnection. This helps your client stay connected and, combined with a persistent session and QoS 1 or 2, reduces the risk of losing data.
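When the Paho network loop is running (`loop_start()` or `loop_forever()`), the library retries the connection on its own, and `reconnect_delay_set(min_delay, max_delay)` adjusts the wait between attempts. If you drive reconnection yourself, a capped exponential backoff is a common choice. The generator below is a sketch of that idea only; the name `backoff_delays` and its default values are assumptions.

```python
from typing import Iterator


def backoff_delays(base: float = 1.0, cap: float = 30.0,
                   attempts: int = 6) -> Iterator[float]:
    """Yield exponentially growing delays in seconds, capped at `cap`."""
    for attempt in range(attempts):
        yield min(cap, base * (2 ** attempt))


print(list(backoff_delays()))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]

# A manual reconnect loop could use it roughly like this:
# for delay in backoff_delays():
#     try:
#         client.reconnect()
#         break
#     except OSError:
#         time.sleep(delay)
```

Capping the delay keeps the client from waiting arbitrarily long after an extended outage, while the exponential growth avoids hammering a broker that is still starting up.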
import ssl
import time
import json
import logging
import paho.mqtt.client as mqtt
# Enable debug-level logs for MQTT activity (connection, publish, etc.)
logging.basicConfig(level=logging.DEBUG)
# --- MQTT Broker Configuration ---
BROKER_HOST = "mqtt-broker-URL" # Replace with actual broker host/IP
BROKER_PORT = 1883 # Default MQTT port (use 8883 if TLS is enabled)
TRANSPORT = "tcp" # 'tcp' or 'websockets'
Authentication = True # Enable username/password auth
USERNAME = "Username"
PASSWORD = "Password"
Encryption = False # Toggle TLS encryption
CLIENTID = "testclient4" # Unique identifier for this MQTT client
PROTOCOL = mqtt.MQTTv311 # Use MQTT version 3.1.1
# --- Callback: On successful connection to broker ---
def on_connect(client, userdata, flags, reason_code, properties=None):
    client.subscribe("test/topic")  # Subscribe to topic on connect

# --- Callback: When a message is received ---
def on_message(client, userdata, msg):
    pass

# --- Callback: When the client disconnects from broker ---
# With CallbackAPIVersion.VERSION2, disconnect_flags is passed before reason_code
def on_disconnect(client, userdata, disconnect_flags, reason_code, properties=None):
    client.loop_stop()  # Stop the background thread loop
# --- Create MQTT Client with proper protocol and API version ---
client = mqtt.Client(
    mqtt.CallbackAPIVersion.VERSION2,
    client_id=CLIENTID,
    protocol=PROTOCOL,
    transport=TRANSPORT
)
client.enable_logger() # Optional: Log MQTT events to console
# --- Authentication Setup ---
if Authentication:
    client.username_pw_set(USERNAME, PASSWORD)
# --- TLS Encryption Setup (only if enabled) ---
if Encryption:
    client.tls_set(
        cert_reqs=ssl.CERT_REQUIRED,
        tls_version=ssl.PROTOCOL_TLS
    )
# --- Assign Callback Handlers ---
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# --- Connect to the MQTT Broker ---
client.connect(BROKER_HOST, BROKER_PORT)
client.loop_start() # Start a background thread to handle network loop
# --- Wait a bit to ensure connection and subscription complete ---
time.sleep(2)
# --- Publish a test message to the topic ---
TOPIC = "test/topic"
payload = json.dumps({"temperature": 50.5, "humidity": 34})
client.publish(topic=TOPIC, payload=payload, qos=1, retain=False)
# --- Give time for publish and any incoming messages ---
time.sleep(3)
# --- Disconnect from the broker ---
client.disconnect()
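The `on_message` callback in the example above is left as a stub. A minimal sketch of a handler for the JSON payload published in this guide might look like the following. The helper `decode_message` is a hypothetical name, and the handler only relies on the `topic` and `payload` attributes that Paho provides on received messages.

```python
import json


def decode_message(msg):
    """Return (topic, data) for a JSON message, or (topic, None) if invalid."""
    try:
        # msg.payload arrives as bytes; json.loads accepts bytes directly.
        return msg.topic, json.loads(msg.payload)
    except ValueError:  # Covers malformed JSON and undecodable bytes
        return msg.topic, None


def on_message(client, userdata, msg):
    """Print decoded messages and skip payloads that are not valid JSON."""
    topic, data = decode_message(msg)
    if data is None:
        print(f"Ignoring non-JSON payload on {topic}")
    else:
        print(f"{topic}: {data}")
```

Separating the decoding from the callback makes it possible to test the parsing logic without a broker connection.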
import ssl
import time
import json
import logging
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
# Enable debug-level logging for detailed MQTT activity
logging.basicConfig(level=logging.DEBUG)
# --- MQTT Broker Configuration ---
BROKER_HOST = "mqtt-broker-url" # Replace with actual broker address
BROKER_PORT = 1883 # Default port (use 8883 for TLS)
TRANSPORT = "tcp" # Options: 'tcp' or 'websockets'
Authentication = True # Enable username/password authentication
USERNAME = "Username"
PASSWORD = "Password"
Encryption = False # Set to True to use TLS
CLIENTID = "testclient5" # Unique client ID
PROTOCOL = mqtt.MQTTv5 # Use MQTT v5 protocol
# --- Callback: On Connect ---
def on_connect(client, userdata, flags, reason_code, properties=None):
    # Subscribe to a topic upon successful connection
    client.subscribe("test/topic")

# --- Callback: On Message ---
def on_message(client, userdata, msg):
    # Handle incoming messages here
    pass

# --- Callback: On Disconnect ---
# With CallbackAPIVersion.VERSION2, disconnect_flags is passed before reason_code
def on_disconnect(client, userdata, disconnect_flags, reason_code, properties=None):
    # Stop the loop when disconnected
    client.loop_stop()
# --- Create MQTT Client ---
client = mqtt.Client(
    mqtt.CallbackAPIVersion.VERSION2,
    client_id=CLIENTID,
    protocol=PROTOCOL,
    transport=TRANSPORT
)
client.enable_logger() # Enable internal logging (helps debug)
# --- Authentication ---
if Authentication:
    client.username_pw_set(USERNAME, PASSWORD)
# --- TLS/SSL (if enabled) ---
if Encryption:
    client.tls_set(
        cert_reqs=ssl.CERT_REQUIRED,
        tls_version=ssl.PROTOCOL_TLS
    )
# --- Assign Callbacks ---
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# --- MQTT v5 Connect Properties ---
connect_props = Properties(PacketTypes.CONNECT)
connect_props.SessionExpiryInterval = 60 # Keep session for 60s after disconnect
# --- Connect to the broker ---
client.connect(
    BROKER_HOST,
    BROKER_PORT,
    clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY,
    properties=connect_props
)
client.loop_start() # Start the network loop in a separate thread
# --- Wait to ensure subscription completes ---
time.sleep(2)
# --- MQTT v5 Publish Properties ---
publish_props = Properties(PacketTypes.PUBLISH)
publish_props.MessageExpiryInterval = 30 # Expire message if not delivered in 30s
# --- Publish a test message ---
payload = json.dumps({"temperature": 50.5, "humidity": 34})
client.publish(
    topic="test/topic",
    payload=payload,
    qos=1,
    retain=False,
    properties=publish_props
)
# --- Wait to ensure message is sent/received ---
time.sleep(3)
# --- Disconnect from broker ---
client.disconnect()