This guide offers a comprehensive tutorial on connecting an MQTT client running on an ESP32 device to our CrystalMQ broker or any broker of your choice. It covers essential tasks such as establishing connections, subscribing to topics, unsubscribing, and exchanging messages. By following these steps, you can effectively integrate MQTT communication into your IoT projects.
Before you begin, ensure you have:
Install the Arduino IDE
Download and install the Arduino IDE from the official Arduino website.
Install the ESP32 Board Package
https://raw.githubusercontent.com/espressif/arduino-esp32/gh-pages/package_esp32_index.json
If there are already URLs in this field, you can separate them with commas.
Select the ESP32 Board
Install Required Libraries
Simply click on the desired library and then click the Install button.
Verify Installation
Define Constants
Replace the placeholder values of ssid, password, mqtt_server, and mqtt_port with your actual WiFi and MQTT broker details.
// WiFi
const char *ssid = "xxxxx"; // Enter your WiFi ssid
const char *password = "xxxxx"; // Enter WiFi password
// MQTT Broker
const char *mqtt_server = "public-mqtt-broker.bevywise.com";
const int mqtt_port = 1883;
const char *mqtt_username = "username"; // (optional)
const char *mqtt_password = "pwd"; // (optional)
This section has code snippets of various ways to connect to MQTT Broker. Ensure that the MQTT Broker supports the connection type that you would like to use. Also, obtain the corresponding connection parameters of the MQTT Broker (Address, Port, Username/Password, CA Certificate)
MQTT Over TCP
Use the following code to connect the client over TCP. Set mqtt_server and mqtt_port using the MQTT Broker's connection parameters.
For MQTT 3.1.1 :
// Start the WiFi connection and poll every 500 ms until it is established.
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED)
{
  delay(500);
  Serial.println("Connecting to WiFi..");
}
// Client identifier (plain ASCII quotes are required for a string literal).
const char *client_id = "esp_client";
// Point the MQTT client at the broker configured above.
client.setServer(mqtt_server, mqtt_port);
For MQTT 5 :
// Async MQTT client instance used by the callbacks and setup() of this snippet.
AsyncMqttClient mqttClient;

// Starts the WiFi connection and blocks until WiFi.status() reports
// WL_CONNECTED, printing one dot per 500 ms poll.
void connectToWifi() {
  Serial.println("Connecting to WiFi...");
  WiFi.begin(ssid, password);
  for (;;) {
    if (WiFi.status() == WL_CONNECTED) {
      break;
    }
    delay(500);
    Serial.print(".");
  }
  Serial.println("WiFi connected");
}
// Connect callback: logs that the broker connection is up.
// sessionPresent is received from the client library and not used here.
void onMqttConnect(bool sessionPresent) {
  (void)sessionPresent;
  static const char connectedBanner[] = "Connected to MQTT.";
  Serial.println(connectedBanner);
  // You can subscribe to topics here
  // mqttClient.subscribe("test/topic", 1);
}
// Arduino entry point: opens the serial console, joins WiFi, then configures
// the async MQTT client and starts the connection attempt.
void setup() {
  const unsigned long serialBaud = 115200;
  Serial.begin(serialBaud);

  connectToWifi();

  // Register the callback before connecting so the connect event is reported.
  mqttClient.onConnect(onMqttConnect);
  mqttClient.setServer(mqtt_server, mqtt_port);
  mqttClient.setClientId(client_id);

  // Set the protocol version to MQTT 5.0
  const int mqttProtocolVersion = 5;
  mqttClient.setProtocolVersion(mqttProtocolVersion);

  mqttClient.connect();
}
MQTT Over TLS / SSL
Use the following code to connect securely to the MQTT Broker over TLS. Set mqtt_server and mqtt_port using the MQTT Broker's connection parameters.
#include <WiFiClientSecure.h>
// Root CA certificate for the MQTT broker (PEM text; adjacent string literals
// are concatenated by the compiler, so no line-continuation is needed).
const char* root_ca =
    "-----BEGIN CERTIFICATE-----\n"
    "YOUR_ROOT_CA_CERTIFICATE\n"
    "-----END CERTIFICATE-----\n";

// TLS-capable WiFi client, wrapped by the MQTT client.
WiFiClientSecure espClient;
PubSubClient client(espClient);

// CA used to verify the broker.
espClient.setCACert(root_ca);
// Certificate and private key presented for client verification.
// NOTE(review): server_cert / server_key are declared elsewhere; despite the
// names they are passed as the client's credentials here — confirm.
espClient.setCertificate(server_cert);
espClient.setPrivateKey(server_key);
Set the TLS parameters before calling connect() so that the client connects to the MQTT Broker securely over TLS.
If the MQTT Broker is hosted in a trusted server and the server verification is not required, the following code can be used to set TLS Options:
// MQTT Broker settings
const char* mqtt_server = "your_mqtt_server";
const uint16_t mqtt_port = 8883; // Typically, port 8883 is used for MQTT over TLS
// Client identifier sent to the broker. A fixed literal is used because the
// previous initializer referenced undeclared names and could not compile;
// append a unique suffix per device if several clients share one broker.
const char* client_id = "crystal_esp_client";
// TLS-capable WiFi client and the MQTT client that runs on top of it.
WiFiClientSecure espClient;
PubSubClient mqttClient(espClient);

// Starts the WiFi connection and blocks until WiFi.status() reports
// WL_CONNECTED, printing one dot per 500 ms poll.
void connectToWifi() {
  Serial.println("Connecting to WiFi...");
  WiFi.begin(ssid, password);
  for (;;) {
    if (WiFi.status() == WL_CONNECTED) {
      break;
    }
    delay(500);
    Serial.print(".");
  }
  Serial.println("WiFi connected");
}
// Configures the MQTT client and retries the broker connection every 2 s
// until mqttClient.connected() is true. Server certificate verification is
// switched off via setInsecure().
void connectToMqtt() {
  // Disable server certificate verification
  espClient.setInsecure();
  mqttClient.setServer(mqtt_server, mqtt_port);
  mqttClient.setCallback(mqttCallback);

  while (!mqttClient.connected()) {
    Serial.print("Connecting to MQTT... ");
    const bool accepted = mqttClient.connect(client_id);
    if (accepted) {
      Serial.println("connected");
      // Subscribe to topics or publish messages here
      // mqttClient.subscribe("your/topic");
      continue;
    }
    // Report the client state code, then wait before the next attempt.
    Serial.print("failed with state ");
    Serial.print(mqttClient.state());
    delay(2000);
  }
}
// Arduino entry point: opens the serial console, then brings up WiFi and the
// MQTT connection in that order.
void setup() {
  const unsigned long serialBaud = 115200;
  Serial.begin(serialBaud);
  connectToWifi();
  connectToMqtt();
}

// Main loop: reconnects to the broker if the connection dropped, then calls
// mqttClient.loop().
void loop() {
  // Ensure the MQTT client stays connected
  const bool linkUp = mqttClient.connected();
  if (!linkUp) {
    connectToMqtt();
  }
  mqttClient.loop();
}
If the MQTT Broker has Server Certificate issued from a Trusted CA, then the Server Certificate can be verified using:
// Root CA certificate for the MQTT broker (PEM text). Replace the placeholder
// line with the actual certificate body.
const char* root_ca =
    "-----BEGIN CERTIFICATE-----\n"
    "YOUR_ROOT_CA_CERTIFICATE\n"
    "-----END CERTIFICATE-----\n";
// TLS-capable WiFi client and the MQTT client that runs on top of it.
WiFiClientSecure espClient;
PubSubClient mqttClient(espClient);

// Begins the WiFi connection and waits, polling every 500 ms and printing a
// dot per poll, until the status is WL_CONNECTED.
void connectToWifi() {
  Serial.println("Connecting to WiFi...");
  WiFi.begin(ssid, password);
  bool joined = (WiFi.status() == WL_CONNECTED);
  while (!joined) {
    delay(500);
    Serial.print(".");
    joined = (WiFi.status() == WL_CONNECTED);
  }
  Serial.println("WiFi connected");
}
// Installs the root CA on the TLS client, configures the MQTT client and
// retries the broker connection every 2 s until it is connected.
void connectToMqtt() {
  // Set the root CA certificate
  espClient.setCACert(root_ca);
  mqttClient.setServer(mqtt_server, mqtt_port);
  mqttClient.setCallback(mqttCallback);

  while (!mqttClient.connected()) {
    Serial.print("Connecting to MQTT... ");
    const bool accepted = mqttClient.connect(client_id);
    if (accepted) {
      Serial.println("connected");
      // Subscribe to topics or publish messages here
      // mqttClient.subscribe("your/topic");
      continue;
    }
    // Report the client state code, then wait before the next attempt.
    Serial.print("failed with state ");
    Serial.print(mqttClient.state());
    delay(2000);
  }
}
// Arduino entry point: serial console first, then WiFi, then the MQTT broker.
void setup() {
  const unsigned long serialBaud = 115200;
  Serial.begin(serialBaud);
  connectToWifi();
  connectToMqtt();
}

// Main loop: re-runs connectToMqtt() whenever the client reports it is not
// connected, then calls mqttClient.loop().
void loop() {
  // Ensure the MQTT client stays connected
  if (mqttClient.connected() == false) {
    connectToMqtt();
  }
  mqttClient.loop();
}
If the MQTT Broker has a self-signed Server Certificate then the Server Certificate can be verified using the Root Certificate obtained from the MQTT Broker:
// Self-signed root certificate for the MQTT broker, as PEM text. The three
// adjacent literals are concatenated by the compiler into one string.
const char* root_ca =
    "-----BEGIN CERTIFICATE-----\n"
    "YOUR_SELF_SIGNED_ROOT_CERTIFICATE\n"
    "-----END CERTIFICATE-----\n";
// TLS-capable WiFi client and the MQTT client that runs on top of it.
WiFiClientSecure espClient;
PubSubClient mqttClient(espClient);

// Starts the WiFi connection and blocks until WiFi.status() reports
// WL_CONNECTED, printing one dot per 500 ms poll.
void connectToWifi() {
  Serial.println("Connecting to WiFi...");
  WiFi.begin(ssid, password);
  for (;;) {
    if (WiFi.status() == WL_CONNECTED) {
      break;
    }
    delay(500);
    Serial.print(".");
  }
  Serial.println("WiFi connected");
}
// Installs the self-signed root certificate on the TLS client, configures the
// MQTT client and retries the broker connection every 2 s until connected.
void connectToMqtt() {
  // Set the self-signed root CA certificate
  espClient.setCACert(root_ca);
  mqttClient.setServer(mqtt_server, mqtt_port);
  mqttClient.setCallback(mqttCallback);

  while (!mqttClient.connected()) {
    Serial.print("Connecting to MQTT... ");
    const bool accepted = mqttClient.connect(client_id);
    if (accepted) {
      Serial.println("connected");
      // Subscribe to topics or publish messages here
      // mqttClient.subscribe("your/topic");
      continue;
    }
    // Report the client state code, then wait before the next attempt.
    Serial.print("failed with state ");
    Serial.print(mqttClient.state());
    delay(2000);
  }
}
// Arduino entry point: opens the serial console, then brings up WiFi and the
// MQTT connection in that order.
void setup() {
  const unsigned long serialBaud = 115200;
  Serial.begin(serialBaud);
  connectToWifi();
  connectToMqtt();
}

// Main loop: reconnects to the broker if the connection dropped, then calls
// mqttClient.loop().
void loop() {
  // Ensure the MQTT client stays connected
  const bool linkUp = mqttClient.connected();
  if (!linkUp) {
    connectToMqtt();
  }
  mqttClient.loop();
}
MQTT Over WebSocket
Define the MQTT Broker Address like this to connect the client over WebSocket.
#include <PubSubClient.h>
#include <WebSocketsClient.h>
// MQTT Broker settings
const char* mqtt_server = "public-mqtt-broker.bevywise.com";
const uint16_t mqtt_port = 10443; // WebSocket without TLS usually runs on port 80
// Client identifier sent to the broker. A fixed literal is used because the
// previous initializer referenced undeclared names and could not compile;
// append a unique suffix per device if several clients share one broker.
const char* client_id = "crystal_esp_client";
// Plain WiFi client and the MQTT client that runs on top of it.
// NOTE(review): no WebSocket framing is visible in this snippet; the MQTT
// client is bound directly to the WiFiClient transport — confirm.
WiFiClient espClient;
PubSubClient mqttClient(espClient);

// Begins the WiFi connection and waits, polling every 500 ms and printing a
// dot per poll, until the status is WL_CONNECTED.
void connectToWifi() {
  Serial.println("Connecting to WiFi...");
  WiFi.begin(ssid, password);
  bool joined = (WiFi.status() == WL_CONNECTED);
  while (!joined) {
    delay(500);
    Serial.print(".");
    joined = (WiFi.status() == WL_CONNECTED);
  }
  Serial.println("WiFi connected");
}
// Configures the MQTT client (server, message callback) and retries the
// broker connection every 2 s until mqttClient.connected() is true.
void connectToMqtt() {
  mqttClient.setServer(mqtt_server, mqtt_port);
  mqttClient.setCallback(mqttCallback);

  while (!mqttClient.connected()) {
    Serial.print("Connecting to MQTT over WebSocket... ");
    const bool accepted = mqttClient.connect(client_id);
    if (accepted) {
      Serial.println("connected");
      // Subscribe to topics or publish messages here
      // mqttClient.subscribe("your/topic");
      continue;
    }
    // Report the client state code, then wait before the next attempt.
    Serial.print("failed with state ");
    Serial.print(mqttClient.state());
    delay(2000);
  }
}
void mqttCallback(char* topic, byte* payload, unsigned int length) {
// Handle incoming messages
Serial.print("Message arrived on topic: ");
Serial.print(topic);
Serial.print(". Message: ");
for (unsigned int i = 0; i < length; i++) {
Serial.print((char)payload[i]);
}
Serial.println();
}
// Arduino entry point: serial console first, then WiFi, then the MQTT broker.
void setup() {
  const unsigned long serialBaud = 115200;
  Serial.begin(serialBaud);
  connectToWifi();
  connectToMqtt();
}

// Main loop: re-runs connectToMqtt() whenever the client reports it is not
// connected, then calls mqttClient.loop().
void loop() {
  // Ensure the MQTT client stays connected
  if (mqttClient.connected() == false) {
    connectToMqtt();
  }
  mqttClient.loop();
}
MQTT Over Secure WebSocket
Use the following code to connect the client over Secure WebSocket. Set the TLS options as described in the MQTT Over TLS section, and set mqtt_server and mqtt_port using the MQTT Broker's connection parameters.
// MQTT Broker settings
const char* mqtt_server = "public-mqtt-broker.bevywise.com";
const uint16_t mqtt_port = 11443; // WebSocket with TLS usually runs on port 443
const char* client_id = "esp_client";
// Root CA certificate for the MQTT broker (PEM text). Replace the placeholder
// line with the actual certificate body.
const char* root_ca =
    "-----BEGIN CERTIFICATE-----\n"
    "YOUR_ROOT_CA_CERTIFICATE\n"
    "-----END CERTIFICATE-----\n";
// TLS-capable WiFi client and the MQTT client that runs on top of it.
WiFiClientSecure espClient;
PubSubClient mqttClient(espClient);

// Starts the WiFi connection and blocks until WiFi.status() reports
// WL_CONNECTED, printing one dot per 500 ms poll.
void connectToWifi() {
  Serial.println("Connecting to WiFi...");
  WiFi.begin(ssid, password);
  for (;;) {
    if (WiFi.status() == WL_CONNECTED) {
      break;
    }
    delay(500);
    Serial.print(".");
  }
  Serial.println("WiFi connected");
}
// Installs the root CA on the TLS client, configures the MQTT client and
// retries the broker connection every 2 s until it is connected.
void connectToMqtt() {
  // Set the root CA certificate for TLS
  espClient.setCACert(root_ca);
  mqttClient.setServer(mqtt_server, mqtt_port);
  mqttClient.setCallback(mqttCallback);

  while (!mqttClient.connected()) {
    Serial.print("Connecting to MQTT over WebSocket with TLS... ");
    const bool accepted = mqttClient.connect(client_id);
    if (accepted) {
      Serial.println("connected");
      // Subscribe to topics or publish messages here
      // mqttClient.subscribe("your/topic");
      continue;
    }
    // Report the client state code, then wait before the next attempt.
    Serial.print("failed with state ");
    Serial.print(mqttClient.state());
    delay(2000);
  }
}
void mqttCallback(char* topic, byte* payload, unsigned int length) {
// Handle incoming messages
Serial.print("Message arrived on topic: ");
Serial.print(topic);
Serial.print(". Message: ");
for (unsigned int i = 0; i < length; i++) {
Serial.print((char)payload[i]);
}
Serial.println();
}
// Arduino entry point: opens the serial console, then brings up WiFi and the
// MQTT connection in that order.
void setup() {
  const unsigned long serialBaud = 115200;
  Serial.begin(serialBaud);
  connectToWifi();
  connectToMqtt();
}

// Main loop: reconnects to the broker if the connection dropped, then calls
// mqttClient.loop().
void loop() {
  // Ensure the MQTT client stays connected
  const bool linkUp = mqttClient.connected();
  if (!linkUp) {
    connectToMqtt();
  }
  mqttClient.loop();
}
Configuring MQTT Authentication
To connect to an MQTT Broker that requires an MQTT Username and MQTT Password for authentication, add the username and password to the connection options like this:
#include <WiFi.h>
#include <WiFiClient.h>
#include <PubSubClient.h>
// MQTT broker address and port (1883 is used here for non-TLS).
const char* mqtt_server = "public-mqtt-broker.bevywise.com";
const uint16_t mqtt_port = 1883; // Use port 1883 for non-TLS

// Credentials passed to the broker on connect.
const char* mqtt_user = "your_mqtt_username";
const char* mqtt_password = "your_mqtt_password";

// Prefix that generateClientId() extends with random characters.
const char* client_id_prefix = "esp_client_";

// Plain WiFi client and the MQTT client that runs on top of it.
WiFiClient espClient;
PubSubClient mqttClient(espClient);

// Begins the WiFi connection and waits, polling every 500 ms and printing a
// dot per poll, until the status is WL_CONNECTED.
void connectToWifi() {
  Serial.println("Connecting to WiFi...");
  WiFi.begin(ssid, password);
  bool joined = (WiFi.status() == WL_CONNECTED);
  while (!joined) {
    delay(500);
    Serial.print(".");
    joined = (WiFi.status() == WL_CONNECTED);
  }
  Serial.println("WiFi connected");
}
// Builds a client id by appending `length` random alphanumeric characters to
// `prefix`.
//   prefix - text placed at the start of the id
//   length - number of random characters to append
// Returns the resulting String.
// NOTE(review): the generator is re-seeded from analogRead(0) on every call;
// confirm that pin yields a varying reading on the target board.
String generateClientId(String prefix, int length) {
  const char alphabet[] =
      "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
  // Exclude the terminating NUL from the selectable range.
  const int alphabetSize = sizeof(alphabet) - 1;

  String result = prefix;

  // Initialize random seed
  randomSeed(analogRead(0));

  int appended = 0;
  while (appended < length) {
    const long pick = random(0, alphabetSize);
    result += alphabet[pick];
    ++appended;
  }
  return result;
}
// Configures the MQTT client, generates a client id from client_id_prefix
// plus 10 random characters, and retries an authenticated connection every
// 2 s until the client is connected. The same generated id is reused for
// every retry within one call.
void connectToMqtt() {
  mqttClient.setServer(mqtt_server, mqtt_port);
  mqttClient.setCallback(mqttCallback);

  String client_id = generateClientId(client_id_prefix, 10);

  while (!mqttClient.connected()) {
    Serial.print("Connecting to MQTT... ");
    const bool accepted =
        mqttClient.connect(client_id.c_str(), mqtt_user, mqtt_password);
    if (accepted) {
      Serial.println("connected");
      // Subscribe to topics or publish messages here
      // mqttClient.subscribe("your/topic");
      continue;
    }
    // Report the client state code, then wait before the next attempt.
    Serial.print("failed with state ");
    Serial.print(mqttClient.state());
    delay(2000);
  }
}
void mqttCallback(char* topic, byte* payload, unsigned int length)
{
// Handle incoming messages
Serial.print("Message arrived on topic: ");
Serial.print(topic);
Serial.print(". Message: ");
for (unsigned int i = 0; i < length; i++) {
Serial.print((char)payload[i]);
}
Serial.println();
}
// Arduino entry point: serial console first, then WiFi, then the MQTT broker.
void setup() {
  const unsigned long serialBaud = 115200;
  Serial.begin(serialBaud);
  connectToWifi();
  connectToMqtt();
}

// Main loop: re-runs connectToMqtt() whenever the client reports it is not
// connected, then calls mqttClient.loop().
void loop() {
  // Ensure the MQTT client stays connected
  if (mqttClient.connected() == false) {
    connectToMqtt();
  }
  mqttClient.loop();
}
Setting Up Last Will & Testament
Configure the Last Will and Testament feature to specify a message that the broker will publish if the client unexpectedly disconnects. This helps inform other subscribers of the disconnected client's status.
Use the following code to set Last Will in the Connection Options:
// LWT settings
const char* lwt_topic = "test/lwt";
const char* lwt_message = "ESP32 disconnected";
const int lwt_qos = 1;
const bool lwt_retain = true;
//Function to reconnect MQTTBroker
void reconnect()
{
while (!mqttClient.connected())
{
Serial.print("Attempting MQTT connection...");
// Attempt to connect with the MQTT username, password, and LWT
if(mqttClient.connect("ESP32Client",mqtt_user,mqtt_password,
lwt_topic,lwt_qos, lwt_retain, lwt_message)) {
Serial.println("connected"); // Subscribe to a topic
mqttClient.subscribe("test/topic");
}
else
{
Serial.print("failed, rc=");
Serial.print(mqttClient.state());
Serial.println(" try again in 5 seconds"); // Wait 5 seconds before
retrying
delay(5000); } } }
Adjusting Keep Alive
MQTT maintains client-broker connections with a keep-alive mechanism. Adjust the keep-alive interval to control how frequently the client sends PINGREQ messages to the broker.
Modify the code below to suit your requirements:
// Keep-alive interval handed to the MQTT client, in seconds. Adjust as needed.
const int mqtt_keep_alive = 60;

// Applies the keep-alive interval to the MQTT client.
void setup()
{
  const int keepAliveSeconds = mqtt_keep_alive;
  mqttClient.setKeepAlive(keepAliveSeconds);
}
Configuring Session Persistence
Session data of an MQTT Client include the Subscriptions made by the Client and any data that the Client would receive with QoS>0.
The Client can get the MQTT Broker to store its session data across connections.
MQTT 3.1.1 Clients can set Clean Session = 0 to request the MQTT Broker to keep its session information stored across connections.
For MQTT 3.1.1 :
if (mqttClient.connect("ESP32ClientClean", mqtt_user, mqtt_password,
nullptr, 0, false, nullptr, true))
{
Serial.println("connected");
}
if (mqttClient.connect("ESP32ClientPersistent", mqtt_user,
mqtt_password, nullptr, 0, false, nullptr, false))
{
Serial.println("connected");
}
MQTT 5 Clients can set Clean Start = 0 and Session Expiry Interval = 'N' to request the MQTT Broker to keep its session information stored across connections for 'N' seconds.
For MQTT 5 :
// MQTT client settings
const char* client_id = "ESP32Client";
// Passed to setCleanSession(); false asks the broker to keep session state.
const bool clean_start = false;
const uint32_t session_expiry_interval = 3600; // Session expiry interval in seconds
// Async MQTT client instance used by the callbacks and setup() of this snippet.
AsyncMqttClient mqttClient;

// Starts the WiFi connection and blocks until WiFi.status() reports
// WL_CONNECTED, printing one dot per 500 ms poll.
void connectToWifi() {
  Serial.println("Connecting to WiFi...");
  WiFi.begin(ssid, password);
  for (;;) {
    if (WiFi.status() == WL_CONNECTED) {
      break;
    }
    delay(500);
    Serial.print(".");
  }
  Serial.println("WiFi connected");
}
// Connect callback: logs the connection and the session-present flag that the
// client library passes in.
void onMqttConnect(bool sessionPresent) {
  static const char connectedBanner[] = "Connected to MQTT.";
  static const char sessionLabel[] = "Session present: ";

  Serial.println(connectedBanner);
  Serial.print(sessionLabel);
  Serial.println(sessionPresent);
  // Subscribe to topics or publish messages here
  // mqttClient.subscribe("your/topic", 0);
}
// Arduino entry point: opens the serial console, joins WiFi, registers the
// MQTT callbacks, applies broker/credential/session settings and starts the
// connection attempt.
void setup() {
  Serial.begin(115200);
  connectToWifi();
  mqttClient.onConnect(onMqttConnect);
  mqttClient.onDisconnect(onMqttDisconnect);
  mqttClient.setServer(mqtt_server, mqtt_port);
  mqttClient.setCredentials(mqtt_user, mqtt_password);
  // The client id is configured through its setter and connect() is called
  // without arguments, matching the other AsyncMqttClient snippets in this guide.
  mqttClient.setClientId(client_id);
  // Configure MQTT 5 features
  mqttClient.setCleanSession(clean_start);
  mqttClient.setSessionExpiry(session_expiry_interval);
  mqttClient.connect();
}

// Intentionally empty: no per-iteration MQTT call is made in this snippet.
void loop() {
  // Nothing to do here, AsyncMqttClient handles everything
}
Setting Maximum Packet Size
MQTT5 Client can request the MQTT Broker to only send data packets less than a specific size by setting it like this:
For MQTT 5 :
// Splits `data` into pieces of at most chunkSize bytes and publishes each
// piece as its own message on "large_data_topic".
const char* data = "largest data to send";
const int chunkSize = 1024;
int dataLength = strlen(data);
// Calculate number of chunks needed (ceiling division)
int numChunks = (dataLength + chunkSize - 1) / chunkSize;
for (int i = 0; i < numChunks; i++)
{
  char chunk[chunkSize + 1]; // Buffer for chunk plus terminating NUL
  // Copy chunk of data into buffer; strncpy zero-fills the remainder when the
  // final piece is shorter than chunkSize.
  strncpy(chunk, data + i * chunkSize, chunkSize);
  chunk[chunkSize] = '\0'; // Null-terminate the string
  // Publish chunk as MQTT message; the last argument is the retained flag
  // (false = not retained).
  mqttClient.publish("large_data_topic", chunk, false);
  Serial.print("Published chunk ");
  Serial.println(i + 1); // 1-based chunk number, one line per chunk
}
Sending Data
Efficiently distribute data to multiple subscribers by publishing it to designated topics with the following code snippet:
For MQTT 3.1.1
mqttclient.publish("topic",message.c_str());
For MQTT 5
// Example publishing: sends `message` on "topic" with QoS 0 and retain off.
String message = "Hello, MQTT!";
mqttClient.publish("topic", 0, false, message.c_str());
Setting Retained Messages
Enable the retain flag when publishing a message to ensure the broker stores the last message for each topic. This guarantees that new subscribers receive the most recent message upon connecting.
To implement this, use the following code snippet:
// Example: publish a retained message once more than 10 seconds have elapsed
// since the previous publish. lastPublish keeps its value between runs
// because it is declared static.
static unsigned long lastPublish = 0;
if (millis() - lastPublish > 10000)
{
  lastPublish = millis();
  // Third argument `true` sets the retained flag.
  const bool published = mqttClient.publish("topic", "Device Status = 1", true);
  Serial.println(published ? "Retained message published successfully"
                           : "Retained message publish failed");
}
Specifying QoS Levels
MQTT provides three levels of Quality of Service (QoS) for message delivery: QoS 0 (at most once), QoS 1 (at least once), and QoS 2 (exactly once).
// NOTE(review): these calls pass (topic, payload, retain flag, QoS level);
// confirm the MQTT client library in use provides a publish() overload with
// this argument order.

// Publish a message with QoS 0
if (mqttClient.publish("topic", "Device status with QoS 0", false, 0))
{
  Serial.println("Message with QoS 0 published successfully");
}
else
{
  Serial.println("Message with QoS 0 publish failed");
}
// Publish a message with QoS 1
if (mqttClient.publish("topic", "Device status with QoS 1", false, 1))
{
  Serial.println("Message with QoS 1 published successfully");
}
else
{
  Serial.println("Message with QoS 1 publish failed");
}
// Publish a message with QoS 2
if (mqttClient.publish("topic", "Device status with QoS 2", false, 2))
{
  Serial.println("Message with QoS 2 published successfully");
}
else
{
  Serial.println("Message with QoS 2 publish failed");
}
Message Expiry Interval
The 'Message expiry interval' property sets a message's life span in seconds; if undelivered within this time, the broker discards it. MQTT5 supports this feature. MQTT5 Clients can set this while publishing data.
For MQTT 5
// Publishes `message` on `topic` with an expiry value attached.
// `topic` is expected to be declared by the surrounding sketch.
int expiryInterval = 60;
const char* message = "device status message";
// NOTE(review): the expiry is passed as expiryInterval * 1000 (milliseconds);
// confirm the unit expected by the client library.
mqttClient.beginPublish(topic, strlen(message), false,
                        expiryInterval * 1000); // Expiry interval in milliseconds
// Write the same buffer whose length was announced to beginPublish().
mqttClient.print(message);
mqttClient.endPublish();
Topic Alias
The 'Topic Alias' property allows clients to use a short alias instead of a full topic name, reducing message packet size and improving network efficiency.
For MQTT 5
// Topic Alias maximum value (adjust based on MQTT broker capabilities)
const int MAX_TOPIC_ALIAS = 10;
const char* message = "Device status message";
// Publish with topic alias if supported by the broker.
// `topic` is expected to be declared by the surrounding sketch.
mqttClient.beginPublish(topic, strlen(message), false);
mqttClient.writeTopic(topic); // Publish full topic name
mqttClient.writeTopicAlias(1); // Set topic alias, e.g., 1
mqttClient.print(message);
mqttClient.endPublish();
Properties associated with MQTT PUBLISH enhance message handling, providing context or instructions for brokers and clients. These properties, including message expiry intervals and topic aliases, optimize message delivery and network bandwidth.
Subscribing to Topic Filter
To receive data published by other clients, this client has to subscribe to a matching Topic Filter like this:
// Subscribe to the topic filter "cmq/topic".
mqttClient.subscribe("cmq/topic");
This topic filter can match with an exact topic or it can have wildcards like # and +
Receiving Data
To receive data sent for the subscriptions, a callback function needs to be defined like this:
// Message callback: prints the topic and the payload of an incoming message.
//   topic      - topic name of the message
//   payload    - pointer to the payload bytes
//   properties - message properties supplied by the client library (unused)
//   len        - number of payload bytes available in this call
//   index      - supplied by the client library (unused here)
//   total      - supplied by the client library (unused here)
void onMqttMessage(char* topic, char* payload,
                   AsyncMqttClientMessageProperties properties, size_t len, size_t index,
                   size_t total) {
  Serial.println("Message received:");
  Serial.print(" Topic: ");
  Serial.println(topic);
  Serial.print(" Payload: ");
  // The payload arrives with an explicit length, so print exactly `len` bytes
  // instead of treating it as a C string, which would read past the buffer
  // if no terminating NUL is present.
  Serial.write(reinterpret_cast<const uint8_t*>(payload), len);
  Serial.println();
  // You can add more processing here based on your application needs
}
void setup() {
Serial.begin(115200);
connectToWifi();
mqttClient.onConnect(onMqttConnect);
mqttClient.onDisconnect(onMqttDisconnect);
mqttClient.onMessage(onMqttMessage); // Set the callback function for
incoming messages
mqttClient.setServer(mqtt_server, mqtt_port);
mqttClient.setCredentials(mqtt_user, mqtt_password);
mqttClient.connect(client_id);
}
Unsubscribing from Topics
To stop receiving updates from a topic, use the code provided to unsubscribe.
// Unsubscribe from the topic
mqttClient.unsubscribe("cmq/topic");
Ensure a proper termination of your client's connection with the broker to avoid issues and resource leaks on both sides, thereby maintaining system stability.
Use the following code to disconnect the client from the broker:
// Close the client's connection to the broker.
mqttClient.disconnect();
You have the opportunity to develop and customize your own intricate business logic within this environment, tailoring it precisely to your specific needs and objectives.
Unique Client ID Management
Assign a specific client ID to each device for accurate identification. In private settings, designate unique IDs for each client. In shared environments, append a random string to each client ID to maintain uniqueness.
Data Design
Strategically plan your data structure beforehand. Whether you are working with plain text, JSON-formatted data, or numerical data, ensure the design aligns with your application's needs and use case.
Robust Error Handling
Implement strong error management to handle MQTT connection failures, subscription problems, and message publishing errors effectively.
Securing Credentials
Safeguard sensitive information like usernames, passwords, and client IDs by not hard-coding them in your source code. Use environment variables or secure configuration files instead.
Regular Testing & Monitoring
Continuously test MQTT communication and monitor client metrics such as connection status, message throughput, and error rates to quickly identify and fix issues.
Optimizing Session Management
Choose between clean and persistent sessions (`clean: true` or `clean: false`) based on your need to retain subscriptions and queued messages across client connections.
Reconnect on Disconnect
Add code to attempt reconnection to the MQTT Broker when there is an unexpected disconnection. This will ensure that your client stays connected and does not lose any data.
Download the complete code for a client that uses the ESP32 MQTT Client Library to connect to our CrystalMQ broker or any broker of your choice.
For MQTT 3.1.1
// This code reads the availability status and productivity status of a machine
// and publishes them to the MQTT broker
//include necessary libraries
#include <Arduino.h>
#include <WiFi.h>
#include <PubSubClient.h>
#include <ArduinoJson.h>
#include "time.h"
// WiFiUDP ntpUDP;
// NTP server passed to configTime() in setup().
const char *ntpServer = "time.google.com";
// Network client and the MQTT client that runs on top of it.
WiFiClient ProxSen;
PubSubClient client(ProxSen);
// Timestamp attached to published messages; set from getTime() in setup()
// and incremented once per loop() pass.
unsigned long epochTime = 0;
// millis() value of the last machine-status publish.
unsigned long currentMillis1 = 0;
const char* ssid = "xxxxxxxxxx"; // replace xxx.. with your WIFI SSID
const char* password = "xxxxxxxxxx"; // replace with WIFI Password
const char* mqtt_server = "xxxxxxxxxxxxx"; // MQTT server name
const char* mqtt_user = "xxxxxxxxxxxxx"; // MQTT broker user name
const char* mqtt_password = "xxxxxxxxxx"; // MQTT broker password
const int ProxSenPin = 33; // availability status read
const int productivityPin = 26; // productivity status read
const unsigned long aggregationInterval = 60000; // 1 minute
unsigned int availabilityValues[60]; // Store availability and productivity values for 1 minute
// unsigned int changeoverValues[300];
// Next write position in availabilityValues.
unsigned int valueIndex = 0;
// NOTE(review): availability and produceFlag are written from the interrupt
// handlers and read in loop(); consider whether they should be volatile.
int availability = 0;
int productivity = 1;
bool produceFlag = false;
// Returns the current time as reported by time(), after waiting until
// getLocalTime() succeeds.
// Retries in a loop rather than by calling itself, so repeated failures do
// not grow the call stack. Blocks until the time is available.
unsigned long getTime()
{
  time_t now;
  Serial.println("inside gettime");
  struct tm timeinfo;
  while (!getLocalTime(&timeinfo))
  {
    Serial.println("Failed to obtain time");
  }
  time(&now);
  return now;
}
// To read the availability of the machine.
// Interrupt handler: stores 1 in `availability` when the sensor pin reads
// low and 0 when it reads high.
void IRAM_ATTR availIsr()
{
  availability = digitalRead(ProxSenPin) ? 0 : 1; // device works at negative logic
}
// To read the productivity of the machine.
// Interrupt handler: sets produceFlag when the productivity pin reads low and
// clears it when the pin reads high.
void IRAM_ATTR produceIsr()
{
  produceFlag = digitalRead(productivityPin) ? false : true;
}
// Connection with the MQTT broker.
// Retries client.connect() with the configured credentials every 5 seconds
// until client.connected() is true, logging each attempt and the client
// state code on failure.
void connectToMqtt()
{
  while (!client.connected())
  {
    Serial.println("Connecting to MQTT...");
    if (client.connect("ProxSen", mqtt_user, mqtt_password))
    {
      Serial.println("Connected to MQTT");
    }
    else
    {
      Serial.print("Failed to connect to MQTT, rc=");
      Serial.print(client.state());
      Serial.println(" Retrying in 5 seconds...");
      delay(5000);
    }
  }
}
// Arduino entry point. In order: serial console, WiFi (station mode),
// MQTT server address, NTP time, broker connection, then the two input pins
// with their CHANGE interrupts and an initial availability reading.
void setup()
{
  const unsigned long serialBaud = 115200;
  Serial.begin(serialBaud);

  // configure WIFI connection
  WiFi.mode(WIFI_STA);
  WiFi.begin(ssid, password);
  for (;;)
  {
    if (WiFi.status() == WL_CONNECTED)
    {
      break;
    }
    delay(5000);
    Serial.print(".");
  }
  Serial.println("WiFi connected");

  // configure MQTTBroker
  client.setServer(mqtt_server, 1883);

  // Request time from the NTP server with zero offsets and take the first reading.
  configTime(0, 0, ntpServer);
  epochTime = getTime();

  connectToMqtt();

  // Availability sensor: input pin with an interrupt on every edge.
  pinMode(ProxSenPin, INPUT);
  attachInterrupt(digitalPinToInterrupt(ProxSenPin), availIsr, CHANGE);

  // Productivity sensor: input pin with an interrupt on every edge.
  pinMode(productivityPin, INPUT);
  attachInterrupt(digitalPinToInterrupt(productivityPin), produceIsr, CHANGE);

  // Seed the availability value from the current pin level (low reads as 1).
  if (digitalRead(ProxSenPin))
  {
    availability = 0;
  }
  else
  {
    availability = 1;
  }
  delay(25);
}
// Main loop, executed about once per second (delay(1000) at the end):
//  - advances epochTime by one and records the current availability value,
//  - publishes a production record when produceFlag is set,
//  - publishes the machine status every 5 seconds,
//  - calls client.loop() once per pass.
void loop()
{
  // Number of slots in the availability history buffer.
  const unsigned int slotCount =
      sizeof(availabilityValues) / sizeof(availabilityValues[0]);

  epochTime = epochTime + 1;
  availabilityValues[valueIndex] = availability;
  // Wrap around so the index never runs past the end of the buffer.
  valueIndex = (valueIndex + 1) % slotCount;

  // publish the production data at the part end time
  if (produceFlag)
  {
    DynamicJsonDocument jsonDocument1(1000);
    String jsonProductivity;
    produceFlag = false;
    jsonDocument1["production_status"] = 1;
    jsonDocument1["timestamp"] = epochTime;
    serializeJson(jsonDocument1, jsonProductivity);
    if (!client.connected())
    {
      connectToMqtt();
    }
    client.publish("ProxSen/productivity", jsonProductivity.c_str());
  }

  // publish machine status every 5 seconds
  if (millis() - currentMillis1 >= 5000)
  {
    currentMillis1 = millis();
    DynamicJsonDocument jsonDocument2(1000);
    String jsonAvailStatus;
    jsonDocument2["machine_status"] = availability;
    jsonDocument2["timestamp"] = epochTime;
    serializeJson(jsonDocument2, jsonAvailStatus);
    if (!client.connected())
    {
      connectToMqtt();
    }
    client.publish("ProxSen/machine_status", jsonAvailStatus.c_str());
  }

  // Call the MQTT client's loop() every pass, as the other PubSubClient
  // examples in this guide do.
  client.loop();
  delay(1000);
}
For MQTT 5
#include <Arduino.h>
#include <WiFi.h>
#include <AsyncMqttClient.h>
#include <ArduinoJson.h>
#include "time.h"
// WiFi credentials
const char* ssid = "your_SSID"; // Replace with your WiFi SSID
const char* password = "your_PASSWORD"; // Replace with your WiFi password
// MQTT Broker settings
const char* mqtt_server = "public-mqtt-broker.bevywise.com"; // MQTT broker server name
const uint16_t mqtt_port = 1883; // MQTT broker port (use 1883 for non-TLS)
const char* mqtt_user = "your_mqtt_username"; // MQTT broker username
const char* mqtt_password = "your_mqtt_password"; // MQTT broker password
// MQTT client settings
const char* client_id = "ESP32Client";
AsyncMqttClient mqttClient;
WiFiClient wifiClient;
// Timestamp attached to published messages; incremented once per loop() pass.
unsigned long epochTime = 0;
// millis() value of the last machine-status publish.
unsigned long currentMillis1 = 0;
const char* ntpServer = "time.google.com";
const int ProxSenPin = 33; // Availability status pin
const int productivityPin = 26; // Productivity status pin
const unsigned long aggregationInterval = 60000; // 1 minute
unsigned int availabilityValues[60]; // Store availability values for 1 minute
// Next write position in availabilityValues.
unsigned int valueIndex = 0;
// NOTE(review): availability and produceFlag are written from the interrupt
// handlers and read in loop(); consider whether they should be volatile.
int availability = 0;
bool produceFlag = false;
// Interrupt handler for the availability pin: stores 1 in `availability`
// when the pin reads low and 0 when it reads high.
void IRAM_ATTR availIsr() {
  availability = digitalRead(ProxSenPin) ? 0 : 1; // Inverse logic (negative logic)
}
// Interrupt handler for the productivity pin: sets produceFlag when the pin
// reads low and clears it when the pin reads high.
void IRAM_ATTR produceIsr() {
  produceFlag = digitalRead(productivityPin) ? false : true;
}
// Logs the attempt and asks the async MQTT client to connect using the
// settings already applied to it.
void connectToMqtt() {
  static const char connectingBanner[] = "Connecting to MQTT...";
  Serial.println(connectingBanner);
  mqttClient.connect();
}
// Returns the current time as reported by time(), after waiting until
// getLocalTime() succeeds.
// Retries in a loop rather than by calling itself, so repeated failures do
// not grow the call stack. Blocks until the time is available.
unsigned long getTime() {
  time_t now;
  struct tm timeinfo;
  while (!getLocalTime(&timeinfo)) {
    Serial.println("Failed to obtain time");
  }
  time(&now);
  return now;
}
// Connect callback: logs the connection and the session-present flag,
// subscribes to "ProxSen/#" and publishes a greeting on "ProxSen/test".
void onMqttConnect(bool sessionPresent) {
  Serial.println("Connected to MQTT.");
  Serial.print("Session present: ");
  Serial.println(sessionPresent);
  // Subscribe to necessary topics if needed. The QoS level is passed
  // explicitly (0), as in the other AsyncMqttClient subscribe examples in
  // this guide.
  mqttClient.subscribe("ProxSen/#", 0);
  // Example publishing on connect (if needed)
  String message = "Hello, MQTT!";
  mqttClient.publish("ProxSen/test", 0, false, message.c_str());
}
// Disconnect callback: logs the event and, when WiFi is still connected,
// starts a new MQTT connection attempt. `reason` is not inspected.
void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
  Serial.println("Disconnected from MQTT.");
  if (!WiFi.isConnected()) {
    return;
  }
  connectToMqtt();
}
void setup() {
Serial.begin(115200);
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("WiFi connected");
mqttClient.setServer(mqtt_server, mqtt_port);
mqttClient.setCredentials(mqtt_user, mqtt_password);
mqttClient.setClientId(client_id);
mqttClient.onConnect(onMqttConnect);
mqttClient.onDisconnect(onMqttDisconnect);
// Configure MQTT 5 specific settings (optional)
mqttClient.setCleanSession(true); // Set clean session to true or false
based on your requirement
// Connect to MQTT broker
connectToMqtt();
// Setup interrupts for availability and productivity pins
pinMode(ProxSenPin, INPUT);
attachInterrupt(digitalPinToInterrupt(ProxSenPin), availIsr, CHANGE);
pinMode(productivityPin, INPUT);
attachInterrupt(digitalPinToInterrupt(productivityPin), produceIsr,
CHANGE);
availability = digitalRead(ProxSenPin) ? 0 : 1;
delay(25);
}
// Main loop, executed about once per second (delay(1000) at the end):
//  - advances epochTime by one and records the current availability value,
//  - publishes a production record when produceFlag is set,
//  - publishes the machine status every 5 seconds,
//  - services the MQTT client, or starts a reconnect when disconnected.
// Publishing is skipped while the client is not connected.
void loop() {
  // Number of slots in the availability history buffer.
  const unsigned int slotCount =
      sizeof(availabilityValues) / sizeof(availabilityValues[0]);

  epochTime = epochTime + 1;
  availabilityValues[valueIndex] = availability;
  // Wrap around so the index never runs past the end of the buffer.
  valueIndex = (valueIndex + 1) % slotCount;

  // Publish productivity data when produceFlag is set
  if (produceFlag) {
    DynamicJsonDocument jsonDocument1(1000);
    String jsonProductivity;
    produceFlag = false;
    jsonDocument1["production_status"] = 1;
    jsonDocument1["timestamp"] = epochTime;
    serializeJson(jsonDocument1, jsonProductivity);
    if (mqttClient.connected()) {
      mqttClient.publish("ProxSen/productivity", 0, false,
                         jsonProductivity.c_str());
    }
  }

  // Publish machine status every 5 seconds
  if (millis() - currentMillis1 >= 5000) {
    currentMillis1 = millis();
    DynamicJsonDocument jsonDocument2(1000);
    String jsonAvailStatus;
    jsonDocument2["machine_status"] = availability;
    jsonDocument2["timestamp"] = epochTime;
    serializeJson(jsonDocument2, jsonAvailStatus);
    if (mqttClient.connected()) {
      mqttClient.publish("ProxSen/machine_status", 0, false,
                         jsonAvailStatus.c_str());
    }
  }

  // Handle MQTT client library keep alive and message processing
  // NOTE(review): confirm the AsyncMqttClient version in use provides loop().
  if (mqttClient.connected()) {
    mqttClient.loop();
  } else {
    connectToMqtt();
  }
  delay(1000);
}
Steps to Flash the Code into ESP32
Following are the steps to upload the code into the ESP32 processor :
Connect your client to our state-of-the-art CrystalMQ broker or any broker of your choice. This powerful combination will ensure optimal performance and reliability for all your messaging needs, paving the way for a robust and efficient system integration.