Implementing a Python Client with Paho MQTT

Introduction

Python is one of the most widely used programming languages today, supported across all major operating systems. For constrained environments, MicroPython offers a lightweight subset designed for low-end processors.

This guide walks you through the step-by-step process of building an MQTT client using the Eclipse Paho MQTT Python Library.

The Paho library is an open-source, freely available MQTT client implementation maintained by the Eclipse Foundation.

Pre-requisites

Before getting started, ensure you have the following:

  • Python - Make sure Python 3.7 or later is installed on your system (paho-mqtt 2.x does not support older versions), or download a suitable version from the official Python website.
  • Paho MQTT Library - Install it with pip:

pip install paho-mqtt==2.1.0

MQTT Connect

This section covers various ways to connect to an MQTT broker using Python. Before proceeding, ensure your broker supports the desired connection method (TCP, TLS, WebSocket, etc.) and you have the correct connection parameters:

  • Broker Address
  • Port
  • Username/Password (if required)
  • CA Certificate (for TLS connections)

Basic TCP Connection

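import paho.mqtt.client as mqtt

BROKER_HOST = "broker-url"  # Replace with your broker's hostname or IP
BROKER_PORT = 1883          # Default port for unencrypted MQTT over TCP
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="testclient", protocol=mqtt.MQTTv311)
client.connect(BROKER_HOST, BROKER_PORT)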

Connect with Authentication

To connect to an MQTT broker that requires authentication, set the username and password before calling connect().

client.username_pw_set("username", "password")


Connect over TLS/SSL

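To connect securely using TLS/SSL, configure TLS before calling connect() and use the broker's TLS port (commonly 8883):

import ssl

# Add ca_certs=<path to CA file> if the broker's certificate is signed by a private CA
client.tls_set(cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS)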
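
If you need finer control over certificate checks than tls_set() offers, one option is to build an ssl.SSLContext yourself. The sketch below uses only the standard library; the helper name make_tls_context and its optional ca_file argument are assumptions made for illustration.

```python
import ssl


def make_tls_context(ca_file=None):
    """Build a client-side TLS context that verifies the broker certificate.

    ca_file is a hypothetical path to a CA bundle; when it is None, the
    system's default trust store is used.
    """
    context = ssl.create_default_context(cafile=ca_file)
    # Refuse anything older than TLS 1.2.
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


tls_context = make_tls_context()
```

The resulting context can be handed to Paho with client.tls_set_context(tls_context) before calling client.connect(). Use it instead of tls_set(), not in addition to it.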


Connect via WebSocket

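To connect over WebSocket instead of TCP, pass transport="websockets" when creating the client (CLIENTID and PROTOCOL are the constants defined in the sample clients below):

client = mqtt.Client(
    mqtt.CallbackAPIVersion.VERSION2,
    client_id=CLIENTID,
    protocol=PROTOCOL,
    transport="websockets",
)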

MQTT Publish

To send data to a topic, serialize the payload and publish it with the desired QoS level:

import json

payload = json.dumps({"temperature": 50.5, "humidity": 34})
client.publish(topic="test/topic", payload=payload, qos=1, retain=False)

MQTT Subscribe

To receive messages, subscribe to a topic filter. The broker will forward matching messages to your client.

client.subscribe(topic="test/+", qos=1)
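
To make the wildcard behaviour concrete, here is a small standalone matcher for the + (single-level) and # (multi-level) wildcards. It is an illustrative sketch rather than Paho's implementation: the function name matches_filter is made up for this example, and it ignores special cases such as topics that start with $.

```python
def matches_filter(topic_filter, topic):
    """Return True if `topic` matches the MQTT `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything
            # below it, so "test/#" matches both "test" and "test/a/b".
            return True
        if index >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            # A literal level must match exactly; "+" matches any one level.
            return False

    # Every filter level matched; the topic must not have extra levels.
    return len(filter_levels) == len(topic_levels)


single_level = matches_filter("test/+", "test/topic")
too_deep = matches_filter("test/+", "test/room1/temperature")
```

In a real client you would normally rely on the broker for matching, or on Paho's own helper paho.mqtt.client.topic_matches_sub, instead of maintaining a matcher like this one.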


To stop receiving messages, the client can unsubscribe using:

client.unsubscribe(topic="test/+")

MQTT Disconnect

Always disconnect cleanly to avoid resource leaks and ensure proper session handling on the broker:

client.disconnect()
client.loop_stop()

Best Practices

1. Client Identification Strategy

Assign unique client IDs to each device for accurate identification. In private setups, allocate distinct IDs to individual clients; in shared environments, append a random string to each client ID to maintain uniqueness.
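
One way to append a random string is sketched below. The helper name make_client_id and the default prefix are assumptions for illustration. The suffix is kept short and alphanumeric because MQTT 3.1.1 only guarantees that brokers accept client IDs of up to 23 alphanumeric characters.

```python
import uuid


def make_client_id(prefix="testclient"):
    """Return `prefix` followed by a random 8-character hex suffix."""
    suffix = uuid.uuid4().hex[:8]
    return f"{prefix}{suffix}"


client_id = make_client_id()
```

The generated value can then be passed as client_id when creating the client. Keep in mind that a client ID that changes on every start cannot resume a persistent session, so store the ID once generated if you rely on one.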

2. Data Architecture Design

Strategically plan your data structure in advance. Whether handling plain text, JSON formats, or numerical data, ensure that the design is tailored to meet the specific needs of your application.
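
As a sketch of what a planned payload format can look like, the helpers below encode a sensor reading as JSON and decode it defensively on the receiving side. The function names are hypothetical, and the SimpleNamespace object merely stands in for the message Paho passes to on_message (only its topic and payload attributes are used).

```python
import json
from types import SimpleNamespace


def encode_reading(temperature, humidity):
    """Serialize a sensor reading to a UTF-8 encoded JSON payload."""
    body = {"temperature": temperature, "humidity": humidity}
    return json.dumps(body).encode("utf-8")


def decode_reading(msg):
    """Parse a received message; return None if the payload is not valid JSON."""
    try:
        return json.loads(msg.payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


# Stand-in for a received message so the sketch runs without a broker.
fake_msg = SimpleNamespace(topic="test/topic", payload=encode_reading(50.5, 34))
reading = decode_reading(fake_msg)
```

Returning None for malformed payloads keeps one bad publisher from crashing the on_message callback; whether to drop or log such messages is an application decision.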

3. Robust Error Handling

Implement strong error management to handle MQTT connection failures, subscription problems, and message publishing errors effectively.
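
A minimal sketch of such handling follows. It assumes a client object with Paho-style connect() and publish() methods; the helper names try_connect and try_publish are made up for this example, and OfflineClient is only a stand-in so the code runs without a broker.

```python
import logging
from types import SimpleNamespace

logger = logging.getLogger("mqtt-example")


def try_connect(client, host, port):
    """Attempt to connect; return True on success, False on a network error."""
    try:
        client.connect(host, port)
    except OSError as exc:
        # Covers DNS failures, refused connections, and timeouts.
        logger.error("Could not connect to %s:%s: %s", host, port, exc)
        return False
    return True


def try_publish(client, topic, payload, qos=1):
    """Publish and report whether the client accepted the message."""
    info = client.publish(topic, payload, qos=qos)
    # Paho's publish() returns an object whose rc is 0 (MQTT_ERR_SUCCESS)
    # when the message was handed to the network layer without error.
    if info.rc != 0:
        logger.error("Publish to %s failed with rc=%s", topic, info.rc)
        return False
    return True


class OfflineClient:
    """Stand-in that behaves like a client whose broker is unreachable."""

    def connect(self, host, port):
        raise ConnectionRefusedError("connection refused")

    def publish(self, topic, payload, qos=0):
        return SimpleNamespace(rc=4)  # 4 is MQTT_ERR_NO_CONN in Paho


offline = OfflineClient()
connect_ok = try_connect(offline, "broker-url", 1883)
publish_ok = try_publish(offline, "test/topic", "{}")
```

With a real Paho client, the same helpers can wrap client.connect() and client.publish() directly.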

4. Securing Credentials

Safeguard sensitive information like usernames, passwords, and client IDs by not hard-coding them in your source code. Use environment variables or secure configuration files instead.
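
Reading credentials from the environment can be sketched as follows. The variable names MQTT_USERNAME and MQTT_PASSWORD and the helper load_credentials are assumptions for illustration; use whatever names fit your deployment.

```python
import os


def load_credentials(environ=os.environ):
    """Read broker credentials from environment variables.

    MQTT_USERNAME and MQTT_PASSWORD are hypothetical variable names.
    """
    username = environ.get("MQTT_USERNAME")
    password = environ.get("MQTT_PASSWORD")
    if not username or not password:
        raise RuntimeError("Set MQTT_USERNAME and MQTT_PASSWORD before starting the client")
    return username, password


# A plain dict stands in for the real environment in this demonstration.
username, password = load_credentials(
    {"MQTT_USERNAME": "demo-user", "MQTT_PASSWORD": "demo-pass"}
)
```

In the client itself, this would typically be used as client.username_pw_set(*load_credentials()).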

5. Regular Testing & Monitoring

Continuously test MQTT communication and monitor client metrics such as connection status, message throughput, and error rates to quickly identify and fix issues.
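
One lightweight way to collect such metrics is to update counters from the client callbacks. The sketch below is an assumption about how this could be structured, not a Paho feature; the ClientMetrics class is hypothetical, and the callbacks are invoked by hand at the end so the example runs without a broker.

```python
from dataclasses import dataclass


@dataclass
class ClientMetrics:
    connected: bool = False
    messages_received: int = 0
    disconnects: int = 0


metrics = ClientMetrics()


def on_connect(client, userdata, flags, reason_code, properties=None):
    # Paho's ReasonCode exposes is_failure; treat a missing attribute as success.
    metrics.connected = not getattr(reason_code, "is_failure", False)


def on_message(client, userdata, msg):
    metrics.messages_received += 1


def on_disconnect(client, userdata, disconnect_flags, reason_code, properties=None):
    metrics.connected = False
    metrics.disconnects += 1


# Simulate one connection, two incoming messages, and a disconnect.
on_connect(None, None, None, None)
on_message(None, None, None)
on_message(None, None, None)
on_disconnect(None, None, None, None)
```

The counters can then be logged periodically or exported to whatever monitoring system you already use.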

6. Optimizing Session Management

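Choose between clean and persistent sessions based on whether subscriptions and queued messages must survive reconnects. In Paho, pass `clean_session=True` or `clean_session=False` to `mqtt.Client()` for MQTT 3.1.1; for MQTT v5, pass `clean_start` to `connect()` and set a session expiry interval.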

7. Reconnect on Disconnect

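Make sure the client attempts to reconnect to the MQTT broker after an unexpected disconnection. Paho's network loop (loop_start() or loop_forever()) retries automatically as long as it keeps running, and reconnect_delay_set() controls the delay between attempts. Reconnecting alone does not guarantee that no data is lost; combine it with QoS 1 or 2 and a persistent session to reduce that risk.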
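
If you manage reconnection yourself, an exponential backoff keeps the client from hammering a broker that is down. The following is a minimal sketch under that assumption: backoff_delays and reconnect_with_backoff are hypothetical helpers, the client only needs a Paho-style reconnect() method, and FlakyClient is a stand-in that fails twice before succeeding.

```python
import time


def backoff_delays(first=1.0, maximum=60.0):
    """Yield reconnect delays that double each time, capped at `maximum`."""
    delay = first
    while True:
        yield delay
        delay = min(delay * 2, maximum)


def reconnect_with_backoff(client, max_attempts=5, sleep=time.sleep):
    """Call client.reconnect() until it succeeds or the attempts run out."""
    for _attempt, delay in zip(range(max_attempts), backoff_delays()):
        try:
            client.reconnect()
            return True
        except OSError:
            # Broker still unreachable; wait before the next attempt.
            sleep(delay)
    return False


class FlakyClient:
    """Stand-in whose reconnect() fails twice and then succeeds."""

    def __init__(self):
        self.calls = 0

    def reconnect(self):
        self.calls += 1
        if self.calls < 3:
            raise ConnectionRefusedError("broker not ready")


waits = []
flaky = FlakyClient()
# Record the delays instead of actually sleeping.
recovered = reconnect_with_backoff(flaky, sleep=waits.append)
```

Injecting the sleep function makes the retry logic easy to test; in production code the default time.sleep is used.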

Sample MQTTv311 Client


import ssl
import time
import json
import logging
import paho.mqtt.client as mqtt
# Enable debug-level logs for MQTT activity (connection, publish, etc.)
logging.basicConfig(level=logging.DEBUG)
# --- MQTT Broker Configuration ---
BROKER_HOST = "mqtt-broker-URL"   # Replace with actual broker host/IP
BROKER_PORT = 1883                # Default MQTT port (use 8883 if TLS is enabled)
TRANSPORT = "tcp"                 # 'tcp' or 'websockets'
Authentication = True             # Enable username/password auth
USERNAME = "Username"
PASSWORD = "Password"
Encryption = False                # Toggle TLS encryption
CLIENTID = "testclient4"          # Unique identifier for this MQTT client
PROTOCOL = mqtt.MQTTv311          # Use MQTT version 3.1.1
# --- Callback: On successful connection to broker ---
def on_connect(client, userdata, flags, reason_code, properties=None):
    client.subscribe("test/topic")  # Subscribe to topic on connect
# --- Callback: When a message is received ---
def on_message(client, userdata, msg):
    pass
# --- Callback: When the client disconnects from broker ---
def on_disconnect(client, userdata, disconnect_flags, reason_code, properties=None):
    client.loop_stop()  # Stop the background thread loop
# --- Create MQTT Client with proper protocol and API version ---
client = mqtt.Client(
    mqtt.CallbackAPIVersion.VERSION2,
    client_id=CLIENTID,
    protocol=PROTOCOL,
    transport=TRANSPORT
)
client.enable_logger()  # Optional: Log MQTT events to console
# --- Authentication Setup ---
if Authentication:
    client.username_pw_set(USERNAME, PASSWORD)
# --- TLS Encryption Setup (only if enabled) ---
if Encryption:
    client.tls_set(
        cert_reqs=ssl.CERT_REQUIRED,
        tls_version=ssl.PROTOCOL_TLS
    )
# --- Assign Callback Handlers ---
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# --- Connect to the MQTT Broker ---
client.connect(BROKER_HOST, BROKER_PORT)
client.loop_start()  # Start a background thread to handle network loop
# --- Wait a bit to ensure connection and subscription complete ---
time.sleep(2)
# --- Publish a test message to the topic ---
TOPIC = "test/topic"
payload = json.dumps({"temperature": 50.5, "humidity": 34})
client.publish(topic=TOPIC, payload=payload, qos=1, retain=False)
# --- Give time for publish and any incoming messages ---
time.sleep(3)
# --- Disconnect from the broker ---
client.disconnect()
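
The fixed time.sleep(2) in the sample above is a guess at how long the connection takes. A possible alternative, sketched here with the standard library only, is to let on_connect signal a threading.Event and have the main thread wait on it. The names connected_event and wait_until_connected are assumptions for illustration, and the timer below merely simulates Paho's network thread invoking the callback.

```python
import threading

connected_event = threading.Event()


def on_connect(client, userdata, flags, reason_code, properties=None):
    # Signal the main thread that the broker acknowledged the connection.
    connected_event.set()


def wait_until_connected(timeout=5.0):
    """Block until on_connect has fired; return False on timeout."""
    return connected_event.wait(timeout)


# Simulate the network thread calling on_connect shortly after startup.
threading.Timer(0.1, on_connect, args=(None, None, None, None)).start()
ready = wait_until_connected(timeout=2.0)
```

In the sample, this would replace the first time.sleep(2): assign on_connect to the client, call loop_start(), and publish only once wait_until_connected() returns True.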
        

Sample MQTTv5 Client


import ssl
import time
import json
import logging
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
# Enable debug-level logging for detailed MQTT activity
logging.basicConfig(level=logging.DEBUG)
# --- MQTT Broker Configuration ---
BROKER_HOST = "mqtt-broker-url"       # Replace with actual broker address
BROKER_PORT = 1883                    # Default port (use 8883 for TLS)
TRANSPORT = "tcp"                     # Options: 'tcp' or 'websockets'
Authentication = True                 # Enable username/password authentication
USERNAME = "Username"
PASSWORD = "Password"
Encryption = False                    # Set to True to use TLS
CLIENTID = "testclient5"              # Unique client ID
PROTOCOL = mqtt.MQTTv5                # Use MQTT v5 protocol
# --- Callback: On Connect ---
def on_connect(client, userdata, flags, reason_code, properties=None):
    # Subscribe to a topic upon successful connection
    client.subscribe("test/topic")
# --- Callback: On Message ---
def on_message(client, userdata, msg):
    # Handle incoming messages here
    pass
# --- Callback: On Disconnect ---
def on_disconnect(client, userdata, disconnect_flags, reason_code, properties=None):
    # Stop the loop when disconnected
    client.loop_stop()
# --- Create MQTT Client ---
client = mqtt.Client(
    mqtt.CallbackAPIVersion.VERSION2,
    client_id=CLIENTID,
    protocol=PROTOCOL,
    transport=TRANSPORT
)
client.enable_logger()  # Enable internal logging (helps debug)
# --- Authentication ---
if Authentication:
    client.username_pw_set(USERNAME, PASSWORD)
# --- TLS/SSL (if enabled) ---
if Encryption:
    client.tls_set(
        cert_reqs=ssl.CERT_REQUIRED,
        tls_version=ssl.PROTOCOL_TLS  
    )
# --- Assign Callbacks ---
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# --- MQTT v5 Connect Properties ---
connect_props = Properties(PacketTypes.CONNECT)
connect_props.SessionExpiryInterval = 60  # Keep session for 60s after disconnect
# --- Connect to the broker ---
client.connect(
    BROKER_HOST,
    BROKER_PORT,
    clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY,
    properties=connect_props
)
client.loop_start()  # Start the network loop in a separate thread
# --- Wait to ensure subscription completes ---
time.sleep(2)
# --- MQTT v5 Publish Properties ---
publish_props = Properties(PacketTypes.PUBLISH)
publish_props.MessageExpiryInterval = 30  # Expire message if not delivered in 30s
# --- Publish a test message ---
payload = json.dumps({"temperature": 50.5, "humidity": 34})
client.publish(
    topic="test/topic",
    payload=payload,
    qos=1,
    retain=False,
    properties=publish_props
)
# --- Wait to ensure message is sent/received ---
time.sleep(3)
# --- Disconnect from broker ---
client.disconnect()
        

Troubleshooting

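  • If the MQTT broker URL is wrong or the broker is not reachable, client.connect() raises an exception, typically an OSError subclass such as socket.gaierror (host name cannot be resolved) or ConnectionRefusedError.
  • If you're using TLS, set Encryption = True and BROKER_PORT = 8883 in the sample clients (8883 is the conventional MQTT-over-TLS port; confirm the port your broker actually uses).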
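
To tell a network problem apart from an MQTT-level problem, it can help to check whether the broker's port accepts TCP connections at all. The helper below is a standard-library sketch; the name broker_reachable and the default timeout are assumptions for illustration.

```python
import socket


def broker_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Includes DNS errors, refused connections, and timeouts.
        return False
```

For example, broker_reachable(BROKER_HOST, BROKER_PORT) returning False points to a wrong address, a closed port, or a firewall rather than to bad credentials.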
