MQTT Broker Python Integration Guide

Introduction

The MQTT Broker is the center of an MQTT implementation: it exchanges data between the MQTT clients deployed in the field, which may connect over Wi-Fi, GSM, or other networks. CrystalMQ MQTT Server is built in C for maximum performance. To meet varying business needs, it also provides Python-based interfaces so that implementers can extend the server to achieve their business goals. Because these interfaces run directly behind the broker, applications do not have to be built as subscribing clients, which helps the IoT implementation scale and saves bandwidth.

This documentation is a guide for developers who plan to build IoT applications using the Python MQTT hooks. With these interfaces, which cover storage, AI/ML integration, authentication, scheduled data analysis, and a user interface builder, you can build a complete business IoT application on top of the broker. Based on this document, you will be able to build an MQTT application in a day.

Python MQTT Interfaces

Bevywise MQTT Broker is a Python/C-based middleware with a publish-subscribe architecture that provides complete support for the MQTT protocol. Developers can use its Python hooks to extend the implementation and build complex IoT applications. CrystalMQ comes with default storage, a user interface, a rule engine, and more. For the convenience of IoT/IIoT implementations, extensions are also provided to customize and build around the MQTT Broker.

The extensibility options currently provided are:

  • Integrate your business dashboards using the Custom UI.
  • Customize storage and send data to relational or big data engines.
  • Schedule data analysis that runs at a predefined interval to analyze data and trigger actions.
  • Transform received IoT data with AI/ML integration.
  • Integrate third-party authentication, such as LDAP or an IAM application, for device security.

The following sections explain in detail how to customize each Python MQTT Broker hook to achieve your goal.

Python MQTT Hook 1 : Custom Data Storage

The MQTT Broker supports a few databases for storage by default. The data is stored in a predefined format in relational databases. The following databases are supported:

  • MySQL
  • SQLite
  • PostgreSQL
  • MSSQL

You can enable these IoT data storage options in conf/datastore.conf without writing a single line of code.

Custom Data Storage allows users to choose their own database for their application. In addition to choosing the database, you will be able to customize the data format as well in the datastore.conf file. The next section will explain how to configure the data storage.

Steps to configure Custom Data Store
  • Open the datastore.conf file.
  • Set the DATA_INTEGRATION option to TRUE. With this option enabled, messages received from clients are passed to the custom_store.py file. Default data storage remains in effect as well.
datastore.conf

[CUSTOM_HOOKS]
DATA_INTEGRATION = TRUE
# TRUE || FALSE

  • When custom payload storage is configured, MQTT data received from clients is passed to the on_message_received_hook(data) method, and data sent from the broker to clients is passed to the on_message_sent_hook(data) method. Both methods live in the Python file specified by INTERCEPT_FILEPATH.
custom_store.py

def on_message_received_hook(data):
    """
    This method is triggered every time the MQTT Broker receives a message from a Publishing Client.
    data: {
        'sender':<clientid>,
        'topic':<topic name>,
        'message':<payload>,
        'unixtime':<timestamp>,
        'datetime':<datetime>
    }
    """
    try:
        pass
    except Exception as e:
        logger_p.error(f"{e}")

def on_message_sent_hook(data):
    """
    This method is triggered every time the MQTT Broker sends a message to a Subscribing Client.
    data: {
        'sender':<clientid>,
        'topic':<topic name>,
        'sent_packetid':<packet id>,
        'retain':<0/1>,
        'qos':<0/1/2>,
        'message':<payload>,
        'unixtime':<timestamp>,
        'datetime':<datetime>
    }
    """
    try:
        pass
    except Exception as e:
        logger_p.error(f"{e}")

INTERCEPT_FILEPATH = ./CrystalMQ/extensions/custom_store.py

Additional object references

The custom_store.py file offers the following Python object reference, which allows closer integration with the MQTT Broker.

  • web_socket – UI WebSocket object for sending event notifications to the UI. This is helpful in a Custom UI implementation.
custom_store.py

def setwebsocketport(conf):
    global web_socket
    web_socket = conf["websocket"]

Implementation Bonus TIP

After you configure custom MQTT data storage, the broker delivers each message to the Python callback functions in your file, such as on_message_received_hook(data).

Your implementation should receive the data, store it, and return from the method promptly. Run any data analysis in a separate thread.

Python MQTT Hook 2 : Custom Scheduler

AI and ML perform best on data that has already been acquired and processed. The scheduling module helps you process data collected over a period of time. With the Custom Scheduler, you can create your own schedules inside the MQTT broker and aggregate your data by adding your own code on the server side.

For example, to monitor the water level in a home tank, you can use the custom scheduler to run a periodic job that checks the status of the water level.

custom_scheduler.py

def custom_schedules():
    enableCustomSchedules = False
    schedules = [
        {'time': schedule.every(2).seconds, 'job': job1},  # Every 2 seconds
        {'time': schedule.every(1).minutes, 'job': job2},  # Every 1 minute
        {'time': schedule.every(1).hours, 'job': job3},  # Every 1 hour
        {'time': schedule.every().day.at("14:30"), 'job': job4},  # Every day at 14:30 (2:30 PM)
    ]
    return enableCustomSchedules, schedules

  • Enable or disable your schedules by setting enableCustomSchedules to True or False.
  • Add schedules at intervals of seconds, minutes, hours, or days by adding entries to the schedules list.
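A job is a plain Python function referenced from the schedules list. As a sketch of the water-level example, the job below summarizes the readings collected since its last run. The names water_levels, LOW_LEVEL_PERCENT, and check_water_level are hypothetical, and the buffer is assumed to be filled elsewhere, for instance by a storage hook.

```python
from statistics import mean

# Hypothetical in-memory buffer of tank readings in percent,
# assumed to be filled elsewhere (for example by a message hook).
water_levels = []
LOW_LEVEL_PERCENT = 20  # hypothetical alert threshold


def check_water_level():
    """Summarize the readings collected since the last run, then reset."""
    if not water_levels:
        return None
    average = mean(water_levels)
    water_levels.clear()
    return {"average": average, "low": average < LOW_LEVEL_PERCENT}


# It could then be listed in custom_schedules() like the template jobs:
#   {'time': schedule.every(1).hours, 'job': check_water_level}
```

Clearing the buffer after each run keeps every summary limited to one scheduling interval.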

Python MQTT Hook 3 : Custom UI Server

The Custom UI server provides an option to customize the user interface. By default, the MQTT broker comes with a custom dashboard option with rich widgets that help visualize data the way users need. For advanced, high-level IoT/IIoT implementations that need a customized UI, the Custom UI server is the right option. It lets you add your own code on the server side, which makes customizing the MQTT server's UI straightforward. Edit the code in the custom_ui_server.py file as needed.

custom_ui_server.py

def custom_urls():
    urls_info = {
        "URL_REDIRECT": "/",
        "urls": [
            {"/new-page": new_page_method},
        ]
    }
    return urls_info

def new_page_method(data):
    try:
        return ("Your New Page!!")
    except Exception as e:
        logger_p.error(f"{e}")
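Since the template handler returns a string, a page handler could plausibly build its response as text. The sketch below returns a small HTML table. It assumes the returned string is used as the page body, which the template suggests but does not state. The names latest_readings and device_status_page are hypothetical, and the data argument is left unused because its structure is not documented here.

```python
import html

# Hypothetical snapshot of the latest value per device.
latest_readings = {"tank-1": "72%", "tank-2": "18%"}


def device_status_page(data):
    """Return an HTML table of the latest readings, one row per device."""
    rows = "".join(
        f"<tr><td>{html.escape(name)}</td><td>{html.escape(value)}</td></tr>"
        for name, value in sorted(latest_readings.items())
    )
    return f"<table><tr><th>Device</th><th>Level</th></tr>{rows}</table>"


# It could then be registered next to the template entry:
#   {"/device-status": device_status_page}
```

Escaping values with html.escape prevents device names or payloads from injecting markup into the page.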

Python MQTT Hook 4 : Custom Authentication

CrystalMQ supports multiple MQTT authentication options. One of them is the custom hook, which lets you choose your own provider. It can be any Identity and Access Management (IAM) system, SSO, LDAP, or anything else that can be called through the MQTT Python interface.

mqtt.conf

#################### PORT & SECURITY CONFIGURATION #########################

[DEVICE_AUTHENTICATION]

AUTHENTICATION = CUSTOM
# DISABLED || DEFAULT || CUSTOM

# DISABLED: MQTT devices must not send Username & Password while connecting.
# DEFAULT: MQTT devices must send Username & Password credentials. Credentials can be added via UI. Authentication is verified by the Broker.
# CUSTOM: MQTT devices must send Username & Password. Authentication is NOT verified by the Broker.

CUSTOM_AUTHENTICATION_FILEPATH = ./../extensions/custom_authentication.py

# Used only when AUTHENTICATION = CUSTOM
# Implement your custom authentication in a Python method named "custom_authenticate()" and set that file path here.


How to enable custom authentication option?


  • Open the Bevywise/CrystalMQ/conf folder.
  • In that folder, open the mqtt.conf file. [On Windows, you can use Notepad or Sublime Text to open the file.]
  • In the mqtt.conf file, set AUTHENTICATION to CUSTOM.

[DEVICE_AUTHENTICATION]

AUTHENTICATION = CUSTOM

  • Save the file and start the MQTT broker.

Requesting Retries Count

When a connection to the server is attempted, failures may happen, for example because incorrect login credentials were entered. In such cases, allowing a fixed number of retries is helpful. By setting the request retries count, you can raise or limit the number of retry attempts.

extensions/custom_authentication.py

# authentication_service = "https://<auth_url>"
# requests.adapters.DEFAULT_RETRIES = 3
# request_timeout = 0.1
# request_auth_method = "POST"

  • Open the Bevywise/CrystalMQ/extensions folder.
  • In that folder, open the custom_authentication.py file. [On Windows, you can use Notepad or Sublime Text to open the file.]
  • In custom_authentication.py, set the number of request retries as needed.
  • requests.adapters.DEFAULT_RETRIES = 3 (the default value is 3)

Setting the request URL

Enter the URL of your authentication service. The service authenticates the user attempting to connect with their login credentials.

In the custom_authentication.py file, provide the URL:

url = "https://www.bevywise.com/auth"

Request Timeout

The request timeout is how long the broker waits for a response from the authentication service before giving up. The value is most likely in seconds, since the file uses the Python requests library, which expresses timeouts in seconds.

To set the request timeout, open the custom_authentication.py file in the extensions folder and enter the timeout value in the space given:

request_timeout = 0.1 (the default value is 0.1)

Selecting Request Method

HTTP defines a set of request methods. You can select any one of them to indicate the action to be performed.

GET – Requests data from a specified resource.

POST – Submits data to the specified resource.

PUT – Replaces the existing data of the target resource.


Open the custom_authentication.py file in the extensions folder and enter the authentication method in the space given:


request_auth_method = "POST"


Set all your configurations, save the file & start running the broker.
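To show how the retry count, timeout, URL, and request method fit together, here is a minimal sketch of a custom_authenticate() function. It is an assumption-heavy illustration, not the shipped implementation. The (username, password) signature, the JSON request body, the "HTTP 200 means accepted" rule, the request_retries name, the post_credentials helper, the injectable send parameter, and the placeholder URL are all hypothetical. It uses the standard library's urllib instead of requests so that it runs without extra packages.

```python
import json
import urllib.error
import urllib.request

# Values mirroring the settings described above; the URL is a placeholder.
authentication_service = "https://auth.example.invalid/login"
request_retries = 3
request_timeout = 0.1          # seconds, as urllib expects
request_auth_method = "POST"


def post_credentials(username, password):
    """Send the credentials as JSON; return True on an HTTP 200 reply."""
    body = json.dumps({"username": username, "password": password}).encode("utf-8")
    request = urllib.request.Request(
        authentication_service,
        data=body,
        method=request_auth_method,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=request_timeout) as response:
        return response.status == 200


def custom_authenticate(username, password, send=post_credentials):
    """Try the auth service up to request_retries times; default to reject."""
    for _ in range(request_retries):
        try:
            return bool(send(username, password))
        except urllib.error.HTTPError:
            return False   # the service answered with an error status: reject
        except OSError:
            continue       # network error or timeout: try again
    return False
```

Rejecting by default when every attempt fails means an unreachable authentication service never grants access.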


The MQTT broker offers a complete Internet of Things application framework. It includes user interface customization, data aggregation, and analysis, and it enables comparing event data with processed data. This framework helps you build and manage industrial IoT applications faster, and it keeps things simpler because everything runs within a single process.

Pre built Python MQTT Broker Examples

The ready-to-use Python MQTT Broker integration examples help you connect the MQTT broker to Elasticsearch, MongoDB, and Redis. You can download the MQTT Broker for free and try deploying these examples to store data as needed. You can also download prebuilt MQTT clients from our library.

Learn more about the example Python interfaces in detail:


MQTT to MongoDB

MongoDB is one of the most widely used document storage engines for IoT data analysis. This plugin connects Bevywise MQTT Broker with MongoDB and stores the payload data received from connected clients in MongoDB. It helps you handle complex data easily and run powerful analysis. The following documentation explains how to configure and set up the MQTT Broker with MongoDB.

Configure and Set up MQTT Broker MongoDB Connector

Go to the Bevywise GitHub and open the Mqttroute-mongodb-connector folder. Then open the 'mongo' folder, where you will find the 'plugin.conf' file.

1. Open plugin.conf and configure the following:


  • Update the hostname and port number of the MongoDB server in the MONGO section.
  • If authentication is enabled, update the MongoDB credentials in the AUTHENTICATION section. Otherwise, set AUTHENTICATION_ENABLED = FALSE.
  • Update the log file path to your own folder location. [default = Bevywise/CrystalMQ/extensions].
plugin.conf

[MONGO]

HOSTNAME = 127.0.0.1
PORT = 27017
DB_NAME = bevywise
COLLECTION = mqttroute
[AUTHENTICATION]
AUTHENTICATION_ENABLED = FALSE
# TRUE || FALSE
USERNAME = root
PASSWORD = root

[LOG]
LOG_FILE_PATH = ../extensions/mongo.log


2. Copy the mongo folder and paste it into Bevywise/CrystalMQ/extensions.


3. Copy the plugin.conf file and paste it into Bevywise/CrystalMQ/extensions.


4. Replace Bevywise/CrystalMQ/extensions/custom_store.py with the connector's custom_store.py.


5. Open Bevywise/CrystalMQ/conf/datastore.conf and set DATA_INTEGRATION = TRUE.


6. Start the MQTT broker. It will start storing all payloads in the MongoDB server.
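To make the role of plugin.conf concrete, the sketch below parses the same sections with the standard library's configparser and builds a MongoDB connection URI from them. It is an illustration only: the connector's own loading code is not shown in this guide, and the names SAMPLE_CONF and mongo_uri_from_conf are hypothetical.

```python
import configparser
from urllib.parse import quote_plus

# The plugin.conf content from above, inlined so the example is self-contained.
SAMPLE_CONF = """
[MONGO]
HOSTNAME = 127.0.0.1
PORT = 27017
DB_NAME = bevywise
COLLECTION = mqttroute
[AUTHENTICATION]
AUTHENTICATION_ENABLED = FALSE
# TRUE || FALSE
USERNAME = root
PASSWORD = root
"""


def mongo_uri_from_conf(conf_text):
    """Build a mongodb:// URI from the MONGO and AUTHENTICATION sections."""
    parser = configparser.ConfigParser()
    parser.read_string(conf_text)
    mongo = parser["MONGO"]
    auth = parser["AUTHENTICATION"]
    host_port = f"{mongo['HOSTNAME']}:{mongo.getint('PORT')}"
    if auth.getboolean("AUTHENTICATION_ENABLED"):
        # Credentials must be percent-encoded inside a URI.
        user = quote_plus(auth["USERNAME"])
        password = quote_plus(auth["PASSWORD"])
        return f"mongodb://{user}:{password}@{host_port}/"
    return f"mongodb://{host_port}/"
```

With the sample values, authentication is disabled, so the URI contains only the host and port. Setting AUTHENTICATION_ENABLED = TRUE adds the credentials.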


MQTT to Redis Connector

The Redis connector supports both clustered and non-clustered connections, and it supports only password-based authentication. This Python MQTT plugin connects the MQTT broker with the Redis server and stores all payloads in Redis for further processing.

Configure and Set up MQTT Broker Redis connector

1. Replace Bevywise/CrystalMQ/lib/custom_store.py with the connector's custom_store.py.

2. In custom_store.py, change the server name and port if Redis is running on a different server or port.

custom_store.py

redishost='localhost'
redisport=6379

3. Open Bevywise/CrystalMQ/conf/datastore.conf and set DATA_INTEGRATION = TRUE.

4. Start the MQTT broker. It will start storing all payloads in the Redis server with clientId_unixtime as the key.
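The clientId_unixtime key format can be sketched as follows. This is an assumption about how such a key might be derived from the hook's data dictionary, not the connector's actual code. The names redis_key, store_payload, and FakeRedis are hypothetical. FakeRedis is an in-memory stand-in so the example runs without a Redis server. A real client such as redis-py's redis.Redis(host=redishost, port=redisport) offers a set(key, value) method with the same shape.

```python
def redis_key(data):
    """Build the clientId_unixtime key from a received-message dictionary."""
    return f"{data['sender']}_{data['unixtime']}"


def store_payload(client, data):
    """Store the payload under its key using any client with set(key, value)."""
    key = redis_key(data)
    client.set(key, data["message"])
    return key


class FakeRedis:
    """In-memory stand-in for a Redis client, used only for this example."""

    def __init__(self):
        self.values = {}

    def set(self, key, value):
        self.values[key] = value
```

Because the key combines the client ID and a timestamp, two messages from the same client with the same unixtime value would overwrite each other.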

Building Your IoT Application!

Looking for guidance on getting started? Get in touch with us.