Python MQTT Integration - Developer Guide
Introduction
The Internet of Things generates a large amount of data that is exchanged between the broker and its clients. We have built the Python MQTT interfaces so that application developers can hook into the data passing through the MQTT Broker. These hooks help you build a highly unified application and, at the same time, reduce the required bandwidth by half.
This documentation is a complete guide for developers who want to use the Python MQTT hooks: connecting to the broker, sending messages to an MQTT Broker, and an in-depth look at the broker configuration. Refer to the sample IoT application that is built using the Python MQTT hooks available in MQTTRoute.
Python MQTT Interfaces
- Flexibility in Storage of the Data as required for the application
- Analysis of the data at periodic intervals
- Integration with authentication systems such as LDAP and other IAM applications for device authentication.
- Customizing the User Interface for visual data presentation.
Python MQTT Hook 1 : Custom Data Storage
Custom Data Storage lets you choose the database that best suits your application, so the data collected by the broker is stored where you need it. By default, MQTTRoute can store data in the following databases:
- MySQL
- SQLite
- PostgreSQL
- MSSQL
- ElasticSearch
You can enable these data storage options without writing a single line of code, in conf/data_store.conf.
To let the MQTT Broker work with any other kind of data storage, we have added the Custom Data Storage hook. You enable the custom storage hook in the same data_store.conf configuration file. The next section explains how to configure the data storage.
Steps to configure Custom Data Store
- Open the data_store.conf
- Set the CUSTOMSTORAGE option to ENABLED. When this option is enabled, the MQTT messages received from clients are sent to the custom_store.py file in addition to the default data storage.
- To store data in Elasticsearch, set DATASTORE to ELASTIC. The same can be done for the other databases. Then set the following options:
  HOSTNAME: the host name or IP address of the MQTT Broker. In this example it is 127.0.0.1.
  PORT: the port of the data store you are using. The example uses 9200 because that is the Elasticsearch port. If you use a different database, specify its port: MySQL uses 3306, MSSQL uses 1433 by default, and PostgreSQL uses 5432.
  INDEX_NAME: the name of the index under which the data is stored, similar to a database name.
- If you plan to use your own data storage, set DATASTORE to CUSTOM. With this setting, the broker passes the data received from clients to the handle_Received_Payload() method and the data sent from the broker to clients to the handle_Sent_Payload() method. Both methods live in the Python file specified in INTERCEPT_FILEPATH.
data_store.conf
#############################CUSTOM STORAGE###########################
# def handle_Received_Payload(data)
[CUSTOM STORAGE]
CUSTOMSTORAGE = DISABLED
# ENABLED || DISABLED
DATASTORE = CUSTOM
# ELASTIC || CUSTOM
[ELASTIC]
HOSTNAME = 127.0.0.1
PORT = 9200
INDEX_NAME = mqtt
BULK_INSERT_TIMING = 2
[CUSTOM]
INTERCEPT_FILEPATH = ./../extensions/custom_store.py
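As a quick way to check which storage target a given data_store.conf selects, the file can be read with Python's standard configparser module. This is a minimal sketch, not part of the broker: the function name describe_storage and the inline sample text are assumptions made for illustration, and the sample has CUSTOMSTORAGE switched to ENABLED so the custom branch is exercised.

```python
import configparser

# Sample text mirroring the data_store.conf listing; a real script would
# call parser.read() on the file instead (the sample itself is an assumption).
SAMPLE_CONF = """
[CUSTOM STORAGE]
CUSTOMSTORAGE = ENABLED
DATASTORE = CUSTOM

[ELASTIC]
HOSTNAME = 127.0.0.1
PORT = 9200
INDEX_NAME = mqtt
BULK_INSERT_TIMING = 2

[CUSTOM]
INTERCEPT_FILEPATH = ./../extensions/custom_store.py
"""


def describe_storage(conf_text):
    """Return a short description of the storage target the file selects."""
    parser = configparser.ConfigParser()
    parser.read_string(conf_text)
    storage = parser["CUSTOM STORAGE"]
    if storage["CUSTOMSTORAGE"].strip().upper() != "ENABLED":
        return "custom storage disabled"
    if storage["DATASTORE"].strip().upper() == "ELASTIC":
        elastic = parser["ELASTIC"]
        return "elastic at {}:{}".format(elastic["HOSTNAME"], elastic.getint("PORT"))
    return "custom hook in {}".format(parser["CUSTOM"]["INTERCEPT_FILEPATH"])


print(describe_storage(SAMPLE_CONF))
```

With the sample above the function reports the custom hook file; changing DATASTORE to ELASTIC makes it report the Elasticsearch host and port instead.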
Additional object references
The following Python object references are passed into the custom_store.py file for closer integration with the MQTT Broker.
- db_cursor – the DB reference for working with the database. You can read and write data through it. The reference is for the DB set in the conf file.
- elastic_search – the Elasticsearch cursor, if the elastic option is enabled.
- datasend – a Python object that can be used to send MQTT messages to the connected MQTT clients.
- web_socket – the UI WebSocket object for sending event notifications to the UI. This is helpful for a custom UI implementation.
Refer to the files below, which are also available in the <Broker Home>/extensions/ folder, for further help.
custom_store.py
import os, sys

# SQL cursor
global db_cursor
#
# elastic_search cursor
#
global elastic_search
global datasend
global Client_obj

sys.path.append(os.getcwd() + '/../extensions')

# Called on the initial call to set the SQL Connector
def setsqlconnector(conf):
    global db_cursor
    db_cursor = conf["sql"]

# Called on the initial call to set the Elastic Search Connector
def setelasticconnector(conf):
    global elastic_search
    elastic_search = conf["elastic"]

# Called on the initial call to set the UI websocket object
def setwebsocketport(conf):
    global web_socket
    web_socket = conf["websocket"]

# Called on the initial call to set the client object
def setclientobj(obj):
    global Client_obj
    Client_obj = obj['Client_obj']
custom_store.py
# Importing the custom client class into the handler
from customimpl import DataReceiver
datasend = DataReceiver()

def handle_Received_Payload(data):
    #
    # Write your code here. Use your connection object to
    # send data to your data store
    print(" print in the handle_received_payload ", data)
    result = datasend.receive_data(data)
    # if result is None then the write failed

def handle_Sent_Payload(data):
    #
    # Write your code here. Use your connection object to
    # send data to your data store
    print(" print in the handle_Sent_payload ", data)
    result = datasend.sent_data(data)
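The listing imports DataReceiver from customimpl but does not show it. As a rough idea of what such a class could look like, the sketch below stores each payload as a JSON string in SQLite. Everything here is an assumption made for illustration: the table layout, the use of sqlite3, and the choice to return True on success and None on failure (which only mirrors the "None means the write failed" comment in the listing). The actual class shipped with the broker may differ.

```python
import json
import sqlite3
import time


class DataReceiver:
    """Hypothetical stand-in for the class imported from customimpl."""

    def __init__(self, db_path=":memory:"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS payloads "
            "(direction TEXT, stored_at REAL, body TEXT)"
        )

    def _store(self, direction, data):
        try:
            # default=str keeps values that JSON cannot encode natively.
            body = json.dumps(data, default=str)
            with self.conn:  # commits on success, rolls back on error
                self.conn.execute(
                    "INSERT INTO payloads VALUES (?, ?, ?)",
                    (direction, time.time(), body),
                )
            return True
        except sqlite3.Error:
            return None  # the caller treats None as a failed write

    def receive_data(self, data):
        return self._store("received", data)

    def sent_data(self, data):
        return self._store("sent", data)


receiver = DataReceiver()
receiver.receive_data({"topic": "tank/level", "message": "72"})
```

The field names in the example payload are made up; inspect the data your handler actually receives before relying on any particular shape.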
Implementation Bonus TIP
Once the custom data storage is configured, the broker calls the handle_Received_Payload(data) method in your Python file for every message it receives.
Your implementation should receive the data, store it, and return promptly. Do any data analysis in a separate thread.
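One common way to follow this tip is to let the handler only enqueue the payload and have a background worker thread do the slow work. The sketch below uses the standard queue and threading modules; the analyse function and the analysed list are placeholders invented for the example.

```python
import queue
import threading

payload_queue = queue.Queue()
analysed = []  # stand-in for wherever analysis results end up


def analyse(data):
    # Placeholder for slow work such as aggregation or model scoring.
    analysed.append(data)


def worker():
    while True:
        data = payload_queue.get()
        try:
            analyse(data)
        finally:
            payload_queue.task_done()


# daemon=True so the worker does not keep the process alive on shutdown.
threading.Thread(target=worker, daemon=True).start()


def handle_Received_Payload(data):
    # Return immediately; the worker thread does the heavy lifting.
    payload_queue.put(data)
```

Because the handler only performs a queue put, it returns quickly even when the analysis step is slow.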
Python MQTT Hook 2 : Custom Scheduler
ML and AI perform best when the data that has already been acquired is well processed. The scheduling module assists in processing data over a period of time. The Custom Scheduler lets you create your own MQTTRoute schedule to aggregate your data by adding your own code on the server side.
For example, consider checking the water level in a home. The custom scheduler will help to set a reminder to check the status of the water level in the tank.
def schedule_conf():
    schedules = {
        'STATUS': 'DISABLE',
        'SCHEDULES': [
            {'OnceIn': 1, 'methodtocall': oneminschedule},
            {'OnceIn': 5, 'methodtocall': fiveminschedule}]}
    return schedules
- Enable or disable your schedule by setting 'STATUS' to ENABLE or DISABLE.
- Set the schedule interval, in minutes, in 'OnceIn'.
- Set the method to call on schedule in 'methodtocall'.
global elastic_search
# Web_socket
global web_socket

# Called on the initial call to set the SQL Connector
def setsqlconnector(conf):
    global db_cursor
    db_cursor = conf["sql"]

def setelasticconnector(conf):
    global elastic_search
    elastic_search = conf["elastic"]

def setwebsocketport(conf):
    global web_socket
    web_socket = conf["websocket"]

def setclientobj(obj):
    global Client_obj
    Client_obj = obj['Client_obj']

def oneminschedule():
    pass
    #Write your code here
    #print("extension print")

def fiveminschedule():
    pass
    #Write your code here
    #print("extension print")
Now add your own code in place of the '#Write your code here' comment.
Python MQTT Hook 3 : Custom UI Server
The custom UI server provides an option to customize the user interface. By default, MQTTRoute comes with a custom dashboard with rich widgets to visualize the data the way the user needs. For advanced, high-level IoT / IIoT implementations that need further customization, the Custom UI server is the right option. It lets you customize the UI of the MQTT server in a straightforward way by adding your own code on the server side. Edit the custom_ui_server.py file to customize it as you need.
custom_ui_server.py
#
# SQL Connector. It will be sqlite / mssql / mysql cursor based
# on your configuration in db.conf
# Please construct your queries accordingly.
#
import os, sys
global db_cursor
# 'username','client_send_api','topic_name','message',1,0,'10',0
# elastic_search cursor.
global elastic_search
from datetime import datetime
import json
import ast
import time
try:
    from elasticsearch import Elasticsearch
    elastic_search = Elasticsearch(host="localhost", port=9200)
except Exception as e:
    print(e)
#
# Client object. It is used to send/publish client messages to any active clients.
# Simply call the helper functions with parameters like User_name, Client_id, Topic_name, Message, QOS to subscribe to a topic / subscribe client
global Client_obj

# Called on the initial call to set the SQL Connector
def setsqlconnector(conf):
    global db_cursor
    db_cursor = conf["sql"]

# Called on the initial call to set the Elastic Search Connector
def setelasticconnector(conf):
    global elastic_search
    elastic_search = conf["elastic"]

def setclientobj(obj):
    global Client_obj
    Client_obj = obj['Client_obj']
The data connectors are provided as global variables: the SQL connector as a cursor for querying the database, and the Elasticsearch connector for querying Elasticsearch if you have enabled the custom storage option.
custom_ui_server.py
#
# Configure your additional URLs here.
# The default URLs are currently used for the UI.
# Please don't remove them, if you are building it over the same UI.
#
def custom_urls():
    urllist = {
        "AUTHENTICATION": 'DISABLE',
        "urls": [{"/extend/url1": method},
                 {"/extend/url2": method1},
                 {"/extend/url3": method2}]
    }
    return urllist

# write your url function codes in the following methods
def method():
    return ("BEVYWISE NETWORKS")

def method1():
    return ("BEVYWISE NETWORKS")

def method2():
    return ("BEVYWISE NETWORKS")
Add your new functionality by mapping a URL to the corresponding method. These URLs can be invoked from your user interface to manipulate data. Only the HTTP GET method is supported in this version.
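Since the sample methods return plain strings, a handler that needs to pass structured data to the UI could return a JSON string. The sketch below assumes exactly that and nothing more: the URL /extend/tanklevels, the tank_levels function, and the latest_levels data are all hypothetical, and how the broker sets response headers is not covered by this guide.

```python
import json

# Hypothetical data a handler might expose; a real handler would more likely
# query db_cursor or elastic_search.
latest_levels = {"tank1": 72, "tank2": 55}


def tank_levels():
    """Handler for a GET URL; returns the response body as a JSON string."""
    return json.dumps(latest_levels, sort_keys=True)


def custom_urls():
    return {
        "AUTHENTICATION": "DISABLE",
        "urls": [{"/extend/tanklevels": tank_levels}],
    }
```

The front-end code can then parse the response body as JSON after requesting the URL with GET.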
Python MQTT Hook 4 : Custom Authentication
The MQTT Broker has a custom authentication feature that lets you integrate it with any identity and access management (IAM) or single sign-on (SSO) system.
broker.conf
[CONFIG]
PORT_NO = 1883
WS_PORT_NO = 10443
TLS_ENABLED = FALSE
# TLS_PORT must be 88xx.
TLS_PORT_NO = 8883
WSS_PORT_NO = 11443
########################Device Authentication ######################
[AUTHENTICATION]
AUTHENTICATION_ENABLED = NO
# YES || NO
######################## User Interface Details ######################
[UI]
UI_Http_Port = 8080
LIST_API_CLIENTS = FALSE
[WEBSOCKET]
WEBSOCKET_PORT=8081
############# Prefix for Random Client id Generation #####################
[MQTT]
CLIENTID_PREFIX = Bevywise-
# The clean session to be handled by the broker. If the clear_session is marked as
# disabled, then every connection will be made fresh.
CLEAR_SESSION = DEFAULT
# DEFAULT || DISABLED
################ ######### WEB LOGIN ############################
# Securing the Web login XXXX Need to be removed XXX
[WEB_LOGIN_PAGE]
WEB_LOGIN = ENABLED
WEB_USERNAME = admin
WEB_PASSWORD = admin
# ENABLED || DISABLED
################ #### REMOTE AUTHENTICATION ##################
[REMOTEAUTH]
REMOTEAUTH_ENABLED = NO
# YES || NO
INTERCEPT_FILEPATH = ./../extensions/custom_auth.py
How to enable custom authentication option
- Open the Bevywise/MQTTRoute/conf folder.
- Open the broker.conf file. [ If you are a Windows user, you can use either Notepad or Sublime Text to open the file ]
- In broker.conf, enable the REMOTEAUTH_ENABLED field, which is NO by default:
- REMOTEAUTH_ENABLED = YES
- Save the file and start the MQTT broker.
Request Retries Count
Attempts to connect to the server can sometimes fail, for example because incorrect login credentials were entered. Allowing a limited number of retries helps in that case. The request retries count sets how many retry attempts are allowed.
extensions/custom_auth.py
# Request Retries Count
requests.adapters.DEFAULT_RETRIES = 3
# Request URL
url = "https://www.bevywise.com/auth"
# Request Timeout
request_timeout = 0.1
# Request Method
request_auth_method = "POST"
# POST | GET | PUT
- Open the Bevywise/MQTTRoute/extensions folder.
- Open the custom_auth.py file. [ If you are a Windows user, you can use either Notepad or Sublime Text to open the file ]
- In custom_auth.py, enter the number of request retries you need.
- requests.adapters.DEFAULT_RETRIES = 3 (the default value is 3)
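To make the idea of a bounded retry count concrete, the sketch below retries a callable a fixed number of times before giving up. It illustrates the general concept only and does not describe how the broker or the requests library applies the setting internally; call_with_retries and its send parameter are names invented for the example.

```python
def call_with_retries(send, retries=3):
    """Call send() once, then up to `retries` more times if it raises."""
    last_error = None
    for _attempt in range(retries + 1):
        try:
            return send()
        except ConnectionError as exc:
            # Remember the failure and try again until attempts run out.
            last_error = exc
    raise last_error
```

With retries=3 the callable is attempted at most four times: the initial call plus three retries.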
Setting the request URL
Enter the URL of your authentication landing page. This authenticates the user attempting to connect with their login credentials.
In the custom_auth.py file, provide the URL:
url = "https://www.bevywise.com/auth"
Request Timeout
The timeout is the length of time the application waits for a response before giving up. Timeout values are typically given in seconds or milliseconds.
To set the request timeout, open the custom_auth.py file in the extensions folder and enter the timeout value:
request_timeout = 0.1 (the default value is 0.1)
Selecting Request Method
You can select which HTTP request method is used for the authentication request.
GET – requests data from the specified resource.
POST – submits data to the specified resource.
PUT – replaces the existing data of the target resource.
Open the custom_auth.py file in the extensions folder and enter the request method:
request_auth_method = "POST"
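The practical difference between the methods is where the request data travels. The sketch below builds, without sending, a request for each method using the standard urllib module: GET carries the fields in the query string, while POST and PUT carry them in a form-encoded body. The field names username and password and the function build_auth_request are assumptions; the guide does not specify how the broker encodes the credentials it forwards.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_auth_request(url, method, credentials):
    """Build (but do not send) an HTTP request carrying the credentials."""
    method = method.upper()
    if method not in ("GET", "POST", "PUT"):
        raise ValueError("unsupported method: " + method)
    encoded = urlencode(credentials)
    if method == "GET":
        # GET has no body, so the fields travel in the query string.
        return Request(url + "?" + encoded, method="GET")
    # POST and PUT carry the fields in a form-encoded body.
    return Request(url, data=encoded.encode("ascii"), method=method)


req = build_auth_request(
    "https://www.bevywise.com/auth", "POST",
    {"username": "device1", "password": "secret"},
)
print(req.get_method(), req.full_url, req.data)
```

Query strings tend to end up in server logs, which is one reason POST is the usual choice for credentials.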
Set all your configurations, save the file & start running the broker.
MQTTRoute comes with a complete Internet of Things application framework, including user interface customization, data aggregation and analysis, and comparison of event data with processed data. This IoT application framework helps you build and manage industrial IoT applications faster and more easily within a single process.
Python MQTT Broker plugin
MongoDB Connector
MongoDB is one of the most widely used document storage engines for IoT data analysis. This plugin connects the Bevywise MQTT Broker with MongoDB to store the payload data received from connected clients. It helps you handle complex data easily and analyse it in depth. The documentation below explains how to configure and set up the MQTT Broker with MongoDB.
Configure and Set up MQTTRoute-MongoDB-connector
1. Open plugin.conf and configure the following:
- Update the hostname and port number of the MongoDB server in the MONGO section.
- If AUTHENTICATION is enabled in MQTTRoute, update the MongoDB credentials; otherwise set AUTHENTICATION_ENABLED = FALSE.
- Update the log file path to your own folder location. [default = Bevywise/MQTTRoute/extensions].
plugin.conf
[MONGO]
HOSTNAME = 127.0.0.1
PORT = 27017
DB_NAME = bevywise
COLLECTION = mqttroute
[AUTHENTICATION]
AUTHENTICATION_ENABLED = FALSE
# TRUE || FALSE
USERNAME = root
PASSWORD = root
[LOG]
LOG_FILE_PATH = ../extensions
2. Copy the folder mongo and paste it into Bevywise/MQTTRoute/extensions.
3. Copy the file plugin.conf and paste it into Bevywise/MQTTRoute/extensions.
4. Replace Bevywise/MQTTRoute/extensions/custom_store.py with the custom_store.py from the plugin.
5. Open Bevywise/MQTTRoute/conf/data_store.conf.
- update CUSTOMSTORAGE = ENABLED
- update DATASTORE = CUSTOM
6. Start MQTTRoute and it will start storing all the payloads in the MongoDB server.
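The values in plugin.conf map naturally onto a standard MongoDB connection URI. The sketch below shows that mapping under stated assumptions: the function mongo_uri is hypothetical, and whether the plugin itself connects through a URI or through separate host and port arguments is not documented here.

```python
from urllib.parse import quote_plus


def mongo_uri(host, port, auth_enabled=False, username="", password=""):
    """Build a MongoDB connection URI from plugin.conf-style values."""
    if auth_enabled:
        # Credentials must be percent-encoded inside a connection URI.
        creds = "{}:{}@".format(quote_plus(username), quote_plus(password))
    else:
        creds = ""
    return "mongodb://{}{}:{}/".format(creds, host, port)


print(mongo_uri("127.0.0.1", 27017))
print(mongo_uri("127.0.0.1", 27017, True, "root", "p@ss"))
```

The second call shows why the encoding step matters: the "@" in the password becomes "%40" so it cannot be confused with the separator before the host name.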
Redis Connector
The Redis connector supports both clustered and non-clustered connections to the database. Only password-based authentication is supported by the Redis connector. This Python MQTT plugin connects MQTTRoute with the Redis server and stores all payloads there for further processing.
Configure and Set up MQTTRoute-Redis-connector
1. Replace Bevywise/MQTTRoute/lib/custom_store.py with the custom_store.py from the plugin.
2. In custom_store.py, change the host name and port of the Redis server if you are running Redis on a different server or port.
custom_store.py
redishost='localhost'
redisport=6379
3. Open Bevywise/MQTTRoute/conf/data_store.conf.
- update CUSTOMSTORAGE = ENABLED
- update DATASTORE = CUSTOM
4. Start MQTTRoute and it will start storing all the payloads in the Redis server with clientId_unixtime as the key.
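When reading the stored payloads back, it helps to be able to reproduce the key format. The helper below builds a key of the form clientId_unixtime; it assumes the timestamp is a whole number of seconds, which the guide does not state, and redis_key is a name invented for the example.

```python
import time


def redis_key(client_id, unixtime=None):
    """Return a key of the form clientId_unixtime."""
    if unixtime is None:
        unixtime = int(time.time())  # assumption: whole seconds
    return "{}_{}".format(client_id, unixtime)


print(redis_key("Bevywise-1234", 1700000000))
```

Check a few real keys in your Redis instance to confirm the timestamp resolution before building queries on this format.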
Elastic Connector
The MQTT Broker stores data in Elasticsearch through a custom implementation for better data visualization. Only the published payload is pushed to Elasticsearch, which lets you hook into it and send it on to your data visualization tool.
Configure and Set up MQTTRoute-Elasticsearch-connector
1. Open plugin.conf and configure the following:
- Update the hostname and port number of the Elasticsearch server.
- Update the log file path to your own folder location. [default = Bevywise/MQTTRoute/extensions].
plugin.conf
[ELASTIC]
HOSTNAME = 127.0.0.1
PORT = 9200
INDEX_NAME = mqttroute
[LOG]
LOG_FILE_PATH = ../extensions
2. Copy the file plugin.conf and paste it into Bevywise/MQTTRoute/extensions.
3. Copy the folder Elastic and paste it into Bevywise/MQTTRoute/extensions.
4. Replace Bevywise/MQTTRoute/extensions/custom_store.py with the custom_store.py from the plugin.
5. Open Bevywise/MQTTRoute/conf/data_store.conf.
- update CUSTOMSTORAGE = ENABLED
- update DATASTORE = ELASTIC
6. Start MQTTRoute and it will start storing all the payloads in the Elasticsearch server.
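The connector's own insert logic is not shown in this guide. As an illustration of how several payloads can be sent to Elasticsearch in one request, the sketch below builds the newline-delimited JSON body that the Elasticsearch _bulk API expects: one action line followed by one document line per payload. The function name bulk_body and the payload fields are assumptions.

```python
import json


def bulk_body(index_name, payloads):
    """Build the newline-delimited JSON body for the Elasticsearch _bulk API."""
    lines = []
    for payload in payloads:
        # Action line: index the next document into index_name.
        lines.append(json.dumps({"index": {"_index": index_name}}))
        # Source line: the document itself.
        lines.append(json.dumps(payload))
    if not lines:
        return ""
    # The bulk format requires a newline after the last line as well.
    return "\n".join(lines) + "\n"


body = bulk_body("mqttroute", [
    {"topic": "tank/level", "message": "72"},
    {"topic": "tank/level", "message": "70"},
])
print(body)
```

The resulting string can be sent as the body of a POST to the _bulk endpoint with the Content-Type header set to application/x-ndjson.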