This documentation guides you through connecting MQTT clients to our CrystalMQ broker or any broker of your choice using the Eclipse Paho C library, enabling efficient communication between devices.
The examples use the Eclipse Paho C client (paho.mqtt.c) and its asynchronous MQTTAsync API, which targets desktop and server operating systems such as Linux, macOS, and Windows. The separate Eclipse Paho Embedded C library is primarily designed for embedded environments such as mbed, Arduino, and FreeRTOS.
Before connecting MQTT clients to the MQTT broker, ensure the following prerequisites are met:
Eclipse Paho MQTT C Library
This library must be included in the Client code as:
#include "MQTTAsync.h"
This library can be built from source.
Linux/macOS
git clone https://github.com/eclipse/paho.mqtt.c.git
cd paho.mqtt.c
make
sudo make install
Windows
mkdir build.paho
cd build.paho
call "C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\vcvarsall.bat" x64
cmake -G "NMake Makefiles" -DPAHO_WITH_SSL=TRUE -DPAHO_BUILD_DOCUMENTATION=FALSE -DPAHO_BUILD_SAMPLES=TRUE -DCMAKE_BUILD_TYPE=Release -DCMAKE_VERBOSE_MAKEFILE=TRUE ..
nmake
This section has code snippets showing the various ways to connect to an MQTT Broker. Ensure that the MQTT Broker supports the connection type that you would like to use. Also, obtain the corresponding connection parameters of the MQTT Broker (Address, Port, Username/Password, CA Certificate).
MQTT Over TCP
Use the following code to connect the client over TCP. Define the Macro ADDRESS using MQTT Broker's connection parameters.
#define ADDRESS "tcp://public-mqtt-broker.bevywise.com:1883"
#define CLIENTID "crystamq_testclient"
MQTTAsync client;
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE,
NULL);
MQTTAsync_connectOptions conn_opts =
MQTTAsync_connectOptions_initializer;
int rc;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
// Create Client (MQTT 5)
MQTTAsync client;
MQTTAsync_createOptions create_opts =
MQTTAsync_createOptions_initializer;
create_opts.MQTTVersion = MQTTVERSION_5;
int rc = MQTTAsync_createWithOptions(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
if (rc != MQTTASYNC_SUCCESS)
{
fprintf(stderr, "Failed to create client, return code: %s\n",
MQTTAsync_strerror(rc));
exit(EXIT_FAILURE);
}
// Connect Client
MQTTAsync_connectOptions conn_opts =
MQTTAsync_connectOptions_initializer5;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
MQTT Over TLS / SSL
Use the following code to connect securely to MQTT Broker over TLS. Define the Macro ADDRESS using MQTT Broker's connection parameters.
#define ADDRESS "ssl://freemqttbroker.sfodo.mqttserver.com:8883"
Set the TLS parameters before calling MQTTAsync_connect to connect the client to the MQTT Broker securely over TLS.
If the MQTT Broker is hosted in a trusted server and the server verification is not required, the following code can be used to set TLS Options:
MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
ssl_opts.sslVersion = MQTT_SSL_VERSION_TLS_1_2;
ssl_opts.enableServerCertAuth = 0;
ssl_opts.verify = 0;
conn_opts.ssl = &ssl_opts;
If the MQTT Broker has Server Certificate issued from a Trusted CA, then the Server Certificate can be verified using:
MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
ssl_opts.sslVersion = MQTT_SSL_VERSION_TLS_1_2;
ssl_opts.enableServerCertAuth = 1;
ssl_opts.verify = 1;
conn_opts.ssl = &ssl_opts;
If the MQTT Broker has a self-signed Server Certificate then the Server Certificate can be verified using the Root Certificate obtained from the MQTT Broker:
MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
ssl_opts.sslVersion = MQTT_SSL_VERSION_TLS_1_2;
ssl_opts.enableServerCertAuth = 1;
ssl_opts.verify = 1;
ssl_opts.trustStore = "./root.crt"; // PEM file; CApath expects a directory of CA certificates
conn_opts.ssl = &ssl_opts;
MQTT Over WebSocket
Define the MQTT Broker Address like this to connect the client over WebSocket.
#define ADDRESS "ws://freemqttbroker.sfodo.mqttserver.com:10443"
MQTT Over Secure WebSocket
Use the following code to connect the client over Secure WebSocket. Set TLS Options as given in MQTT Over TLS section. Define the Macro ADDRESS using MQTT Broker's connection parameters.
#define ADDRESS "wss://public-mqtt-broker.bevywise.com:11443"
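The four ADDRESS forms above differ only in scheme, host, and port. As a minimal sketch, the string can be assembled at run time instead of being hard-coded. The helper name build_broker_uri is hypothetical and not part of Paho; it only accepts the four schemes used in this guide.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not part of Paho): writes "<scheme>://<host>:<port>"
 * into buf. Returns 0 on success, or -1 if the scheme is not one of the four
 * used in this guide, the port is out of range, or buf is too small. */
int build_broker_uri(char *buf, size_t buflen,
                     const char *scheme, const char *host, int port)
{
    static const char *const schemes[] = { "tcp", "ssl", "ws", "wss" };
    int known = 0;

    if (buf == NULL || buflen == 0 || scheme == NULL || host == NULL)
        return -1;

    for (size_t i = 0; i < sizeof schemes / sizeof schemes[0]; ++i) {
        if (strcmp(scheme, schemes[i]) == 0)
            known = 1;
    }
    if (!known || port < 1 || port > 65535)
        return -1;

    /* snprintf reports the length it wanted, so truncation is detectable */
    int n = snprintf(buf, buflen, "%s://%s:%d", scheme, host, port);
    return (n > 0 && (size_t)n < buflen) ? 0 : -1;
}
```

The resulting buffer can be passed to MQTTAsync_create in place of the ADDRESS macro.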
Configuring MQTT Authentication
To connect to an MQTT Broker that requires an MQTT Username and MQTT Password for authentication, add the username and password to the connection options like this:
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE,
NULL);
conn_opts.username = "your-mqtt-username";
conn_opts.password = "your-mqtt-password";
Advanced Features
Setting Up Last Will & Testament
Configure the Last Will and Testament feature to specify a message that the broker will publish if the client unexpectedly disconnects. This helps inform other subscribers of the disconnected client's status.
Use the following code to set Last Will in the Connection Options:
MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
will_opts.topicName = "MQTT/Will"; //Change this to will topic of your choice
will_opts.qos = 0; //Change this to QoS of your choice
will_opts.message = "Unexpectedly Disconnected";
conn_opts.will = &will_opts;
Adjusting Keep Alive
Adjust the keep-alive interval to control how frequently the client sends PINGREQ messages to the broker.
Modify the code below to suit your requirements:
MQTTAsync_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
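When choosing a keep-alive value, it helps to know how long a silent client can go unnoticed. The MQTT specification lets a broker close the connection once it has heard nothing from the client for one and a half times the keep-alive interval, and a value of 0 turns the mechanism off. The sketch below only does that arithmetic; broker_timeout_ms is a hypothetical name, not a Paho function.

```c
/* Hypothetical helper: milliseconds of silence after which a broker may
 * drop the client, following the 1.5 x keep-alive rule of the MQTT
 * specification. Returns 0 when keep-alive is disabled (0 or negative). */
long broker_timeout_ms(int keep_alive_seconds)
{
    if (keep_alive_seconds <= 0)
        return 0;
    return (long)keep_alive_seconds * 1500L;
}
```

With the value of 20 used above, a broker following the specification would wait up to 30 seconds before treating the client as gone and publishing its Last Will.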
Configuring Session Persistence
Session data of an MQTT Client include the Subscriptions made by the Client and any data that the Client would receive with QoS>0. The Client can get the MQTT Broker to store its session data across connections.
MQTT 3.1.1 Clients can set Clean Session = 0 to request the MQTT Broker to keep its session information stored across connections.
conn_opts.cleansession = 0;
MQTT 5 Clients can set Clean Start = 0 and Session Expiry Interval = 'N' to request the MQTT Broker to keep its session information stored across connections for 'N' seconds.
MQTTAsync_connectOptions conn_opts =
MQTTAsync_connectOptions_initializer5;
conn_opts.cleanstart = 0;
MQTTProperties props = MQTTProperties_initializer;
MQTTProperty property1;
property1.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
property1.value.integer4 = 90; // Session Expiry Interval is a four-byte integer
MQTTProperties_add(&props, &property1);
conn_opts.connectProperties = &props; // Send these properties with CONNECT
Setting Maximum Packet Size
An MQTT 5 Client can ask the MQTT Broker not to send it packets larger than a given size by setting the Maximum Packet Size property like this:
MQTTProperty property2;
property2.identifier = MQTTPROPERTY_CODE_MAXIMUM_PACKET_SIZE;
property2.value.integer4 = 4094;
MQTTProperties_add(&props, &property2);
Sending Data
Efficiently distribute data to multiple subscribers by publishing it to designated topics with the following code snippet:
#define TOPIC "device/status"
#define PAYLOAD "ON"
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = 1;
pubmsg.retained = 0;
if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, NULL)) !=
MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
}
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &pub_opts)) !=
MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
}
Setting Retained Messages
Enable the retain flag when publishing a message to ensure the broker stores the last message for each topic. This guarantees that new subscribers receive the most recent message upon connecting.
To implement this, use the following code snippet:
pubmsg.retained = 1;
Specifying QoS Levels
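MQTT provides three levels of Quality of Service (QoS) for message delivery:
QoS 0: At most once. The message is sent once with no acknowledgement.
QoS 1: At least once. The message is resent until it is acknowledged, so duplicates are possible.
QoS 2: Exactly once. A four-step handshake ensures the message is delivered once and only once.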
Specify the required QoS level when publishing MQTT messages using this code:
pubmsg.qos = 1; // Set to 0 or 1 or 2
Message Expiry Interval
The 'Message expiry interval' property sets a message's life span in seconds; if undelivered within this time, the broker discards it. MQTT5 supports this feature. MQTT5 Clients can set this while publishing data.
MQTTAsync_responseOptions pub_opts =
MQTTAsync_responseOptions_initializer;
pub_opts.context = client;
MQTTProperties pub_props = MQTTProperties_initializer;
MQTTProperty property3;
property3.identifier = MQTTPROPERTY_CODE_MESSAGE_EXPIRY_INTERVAL;
property3.value.integer4 = 90;
MQTTProperties_add(&pub_props, &property3);
pub_opts.properties = pub_props;
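To make the effect of the expiry interval concrete: under MQTT 5, a broker that forwards a queued message is expected to reduce the interval by the time the message has already waited, and to discard the message once the interval has run out. The following is a minimal sketch of that bookkeeping only; remaining_expiry is a hypothetical name and nothing here calls into Paho.

```c
#include <stdint.h>

/* Hypothetical helper: given the expiry interval set by the publisher and
 * the seconds a message has waited, returns 1 and stores the remaining
 * lifetime in *remaining_s if the message is still deliverable, or
 * returns 0 if it has expired and should be discarded. */
int remaining_expiry(uint32_t expiry_s, uint32_t waited_s,
                     uint32_t *remaining_s)
{
    if (waited_s >= expiry_s)
        return 0;
    *remaining_s = expiry_s - waited_s;
    return 1;
}
```

With the 90-second interval used above, a subscriber that reconnects after 30 seconds would receive the message with 60 seconds left, while one that reconnects after two minutes would not receive it at all.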
Topic Alias
The 'Topic Alias' property allows clients to use a short alias instead of a full topic name, reducing message packet size and improving network efficiency.
MQTTAsync_responseOptions pub_opts =
MQTTAsync_responseOptions_initializer;
pub_opts.context = client;
MQTTProperties pub_props = MQTTProperties_initializer;
MQTTProperty property3;
property3.identifier = MQTTPROPERTY_CODE_TOPIC_ALIAS;
property3.value.integer2 = 10;
MQTTProperties_add(&pub_props, &property3);
pub_opts.properties = pub_props;
Properties associated with MQTT PUBLISH enhance message handling, providing context or instructions for brokers and clients. These properties, including message expiry intervals and topic aliases, optimize message delivery and network bandwidth.
Subscribing to Topic Filter
To receive data published by other clients, this client has to subscribe to a matching Topic Filter like this:
#define FILTER "your-topic-filter"
#define QOS 1
if ((rc = MQTTAsync_subscribe(client, FILTER, QOS, NULL)) !=
MQTTASYNC_SUCCESS)
{
printf("Failed to subscribe, return code %d\n", rc);
exit(EXIT_FAILURE);
}
This topic filter can match an exact topic, or it can contain the wildcards + (exactly one topic level) and # (all remaining levels).
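To illustrate how wildcard matching behaves, here is a minimal sketch of a matcher written in plain C. The broker performs this matching itself, so the function is for illustration or local testing only; topic_matches is a hypothetical name, and the sketch assumes both strings are already valid (for example, # appears only as the last level).

```c
#include <string.h>

/* Returns 1 if topic matches filter, 0 otherwise.
 * '+' matches exactly one level, '#' matches all remaining levels. */
int topic_matches(const char *filter, const char *topic)
{
    /* A wildcard in the first level never matches topics starting with '$' */
    if (topic[0] == '$' && (filter[0] == '+' || filter[0] == '#'))
        return 0;

    while (*filter != '\0') {
        if (*filter == '#') {
            return 1;                   /* multi-level: matches the rest */
        } else if (*filter == '+') {
            while (*topic != '\0' && *topic != '/')
                ++topic;                /* consume one whole topic level */
            ++filter;
        } else if (*filter == *topic) {
            ++filter;
            ++topic;
        } else if (*topic == '\0' && strcmp(filter, "/#") == 0) {
            return 1;                   /* "a/#" also matches the parent "a" */
        } else {
            return 0;
        }
    }
    return *topic == '\0';
}
```

For example, the filter sensor/+/temp matches sensor/kitchen/temp, and sensor/# matches both sensor and sensor/a/b, whereas sensor/+ does not match sensor/a/b.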
Receiving Data
To receive data sent for the subscriptions, a callback function needs to be defined like this:
/* This callback is triggered when this client has received a message */
int msgarrvd(void *context, char *topicName, int topicLen,
MQTTAsync_message *message)
{
int i;
char* payloadptr;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
payloadptr = message->payload;
/* The payload is not NUL-terminated, so print exactly payloadlen bytes */
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
MQTTAsync_setCallbacks(client, NULL, NULL, msgarrvd, NULL);
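An MQTT payload is a byte array with an explicit length and is not guaranteed to end in a NUL byte, which is why the callback prints it one character at a time. If the payload is known to be text, one option is to copy it into a terminated string first. This is a minimal sketch; payload_to_cstring is a hypothetical helper, not a Paho function.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: returns a newly allocated, NUL-terminated copy of
 * the first payloadlen bytes of payload, or NULL on bad input or if the
 * allocation fails. The caller must free() the result. */
char *payload_to_cstring(const void *payload, int payloadlen)
{
    if (payload == NULL || payloadlen < 0)
        return NULL;

    char *text = malloc((size_t)payloadlen + 1);
    if (text == NULL)
        return NULL;

    memcpy(text, payload, (size_t)payloadlen);
    text[payloadlen] = '\0';
    return text;
}
```

Inside the callback this would be called as payload_to_cstring(message->payload, message->payloadlen) before the message is freed, and the copy released with free() once it is no longer needed.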
Unsubscribing from Topics
To stop receiving updates from a topic, use the code provided to unsubscribe.
#define FILTER "your-topic-filter"
if ((rc = MQTTAsync_unsubscribe(client, FILTER, NULL)) !=
MQTTASYNC_SUCCESS)
{
printf("Failed to unsubscribe, return code %d\n", rc);
}
Ensure a proper termination of your client's connection with the broker to avoid issues and resource leaks on both sides, thereby maintaining system stability.
Use the following code to disconnect the client from the broker:
MQTTAsync_disconnect(client, NULL);
MQTTAsync_destroy(&client);
You have the opportunity to develop and customize your own intricate business logic within this environment, tailoring it precisely to your specific needs and objectives.
Client ID Assignment
Assign a specific client ID to every device to maintain accurate identification. In private instances, allocate unique IDs to individual clients, and in shared environments, attach a random string to each client ID for uniqueness assurance.
Designing data
Organize your data structure with careful planning. Whether managing plain text, JSON formatting, or numeric data, ensure the design is customized to suit the specific requirements of your application.
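As one possible shape for a JSON payload, the sketch below formats a single sensor reading into a caller-supplied buffer. The field names sensor and value and the helper name format_reading_json are assumptions made for illustration; the helper performs no JSON escaping and simply rejects names containing a quote or backslash.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: writes {"sensor":"<name>","value":<v>} into buf with
 * one decimal place. Returns the payload length, or -1 if the name needs
 * escaping or the buffer is too small. */
int format_reading_json(char *buf, size_t buflen,
                        const char *sensor, double value)
{
    if (buf == NULL || sensor == NULL || strpbrk(sensor, "\"\\") != NULL)
        return -1;

    int n = snprintf(buf, buflen, "{\"sensor\":\"%s\",\"value\":%.1f}",
                     sensor, value);
    return (n > 0 && (size_t)n < buflen) ? n : -1;
}
```

The returned length can be assigned directly to pubmsg.payloadlen, with buf as pubmsg.payload.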
Robust Error Handling
Implement strong error management to handle MQTT connection failures, subscription problems, and message publishing errors effectively.
Securing Credentials
Safeguard sensitive information like usernames, passwords, and client IDs by not hard-coding them in your source code. Use environment variables or secure configuration files instead.
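A minimal sketch of reading credentials from the environment follows. The variable names MQTT_USERNAME and MQTT_PASSWORD and the helper env_or_default are assumptions chosen for illustration; any naming scheme works as long as the values stay out of the source code.

```c
#include <stdlib.h>

/* Hypothetical helper: returns the value of the environment variable
 * `name`, or `fallback` when the variable is unset or empty. The returned
 * pointer is owned by the environment and must not be freed. */
const char *env_or_default(const char *name, const char *fallback)
{
    const char *value = getenv(name);
    return (value != NULL && value[0] != '\0') ? value : fallback;
}
```

The connection options could then be filled in with conn_opts.username = env_or_default("MQTT_USERNAME", NULL); and likewise for the password, so that an unset variable leaves the field as NULL.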
Regular Testing & Monitoring
Continuously test MQTT communication and monitor client metrics such as connection status, message throughput, and error rates to quickly identify and fix issues.
Optimizing Session Management
Choose between clean and persistent sessions (`cleansession = 1` or `cleansession = 0` in MQTT 3.1.1; `cleanstart` together with a Session Expiry Interval in MQTT 5) based on your need to retain subscriptions and queued messages across client connections.
Reconnect on Disconnect
Add code to attempt reconnection to the MQTT Broker when there is an unexpected disconnection. This helps your client regain its connection quickly and, combined with a persistent session and QoS 1 or 2, reduces the risk of losing data.
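Reconnection attempts are usually spaced out so that a broker that is down is not flooded with requests. One common approach is exponential backoff with an upper limit, sketched below. The helper backoff_delay_ms is hypothetical; Paho's MQTTAsync_connectOptions also has automaticReconnect, minRetryInterval, and maxRetryInterval fields that may cover this case, so check the library documentation for your version before writing your own loop.

```c
/* Hypothetical helper: delay before reconnect attempt number `attempt`
 * (0-based). The delay starts at base_ms, doubles on every attempt, and
 * never exceeds max_ms. */
unsigned long backoff_delay_ms(unsigned attempt,
                               unsigned long base_ms, unsigned long max_ms)
{
    unsigned long delay = base_ms;

    while (attempt-- > 0 && delay < max_ms) {
        if (delay > max_ms / 2) {   /* doubling would pass the cap */
            delay = max_ms;
            break;
        }
        delay *= 2;
    }
    return delay < max_ms ? delay : max_ms;
}
```

With a base of 1000 ms and a cap of 60000 ms, successive attempts wait 1, 2, 4, 8, 16, and 32 seconds, then 60 seconds for every attempt after that.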
Download the complete code for a client that uses the Paho C MQTT Client Library to connect with our CrystalMQ broker or any broker of your choice.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "MQTTAsync.h"
#define ADDRESS "tcp://localhost:1883"
#define TOPIC "sensor/temperature"
#define PAYLOAD "180"
#define QOS 1
#define TIMEOUT 10000L
int finished = 0;
void generate_clientid(char *str, int str_length)
{
// Writes PREFIX followed by random alphanumerics; str must hold str_length + 1 bytes
const char *alphanum =
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
const char *PREFIX = "crystalmq_";
int prefix_length = (int)strlen(PREFIX);
srand((unsigned int)time(NULL));
memcpy(str, PREFIX, prefix_length);
for (int i = prefix_length; i < str_length; ++i) {
str[i] = alphanum[rand() % 62]; // 62 is the length of the alphanum string
}
str[str_length] = '\0';
}
void onDisconnect(void* context, MQTTAsync_successData* response)
{
printf("Successful disconnection\n");
finished = 1;
}
void onSubscribe(void* context, MQTTAsync_successData* response)
{
printf("Subscribe succeeded\n");
}
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
printf("Connect failed, rc %d\n", response ? response->code : 0);
finished = 1;
}
void onConnect(void* context, MQTTAsync_successData* response)
{
printf("Successful connection\n");
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message
*message)
{
int i;
char* payloadptr;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
payloadptr = message->payload;
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
int main(int argc, char* argv[])
{
// Create Client
char clientid[21];
generate_clientid(clientid, 20);
MQTTAsync client;
MQTTAsync_create(&client, ADDRESS, clientid, MQTTCLIENT_PERSISTENCE_NONE,
NULL);
MQTTAsync_setCallbacks(client, NULL, NULL, msgarrvd, NULL);
MQTTAsync_connectOptions conn_opts =
MQTTAsync_connectOptions_initializer;
conn_opts.keepAliveInterval = 60;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
int rc;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
sleep(1);
// Subscribe
MQTTAsync_responseOptions sub_opts =
MQTTAsync_responseOptions_initializer;
sub_opts.onSuccess = onSubscribe;
sub_opts.context = client;
if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &sub_opts)) !=
MQTTASYNC_SUCCESS)
{
printf("Failed to subscribe, return code %d\n", rc);
exit(EXIT_FAILURE);
}
// Publish
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, NULL)) !=
MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
}
while (!finished)
#if defined(WIN32) || defined(WIN64)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&client);
return rc;
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "MQTTAsync.h"
#define ADDRESS "tcp://localhost:1883"
#define TOPIC "sensor/humidity"
#define PAYLOAD "20.3"
#define QOS 1
#define TIMEOUT 10000L
int finished = 0;
void generate_clientid(char *str, int str_length)
{
// Writes PREFIX followed by random alphanumerics; str must hold str_length + 1 bytes
const char *alphanum =
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
const char *PREFIX = "crystalmq_";
int prefix_length = (int)strlen(PREFIX);
srand((unsigned int)time(NULL));
memcpy(str, PREFIX, prefix_length);
for (int i = prefix_length; i < str_length; ++i) {
str[i] = alphanum[rand() % 62]; // 62 is the length of the alphanum string
}
str[str_length] = '\0';
}
void onDisconnect(void* context, MQTTAsync_successData* response)
{
printf("Successful disconnection\n");
finished = 1;
}
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
printf("Connect failed, rc %d\n", response ? response->code : 0);
finished = 1;
}
void onConnect(void* context, MQTTAsync_successData* response)
{
printf("Successful connection\n");
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message
*message)
{
int i;
char* payloadptr;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
payloadptr = message->payload;
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
int main(int argc, char* argv[])
{
// Create Client
char clientid[21];
generate_clientid(clientid, 20);
MQTTAsync client;
MQTTAsync_createOptions create_opts =
MQTTAsync_createOptions_initializer;
create_opts.MQTTVersion = MQTTVERSION_5;
int rc = MQTTAsync_createWithOptions(&client, ADDRESS, clientid,
MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
MQTTAsync_setCallbacks(client, NULL, NULL, msgarrvd, NULL);
// Set CONNECT Properties
MQTTAsync_connectOptions conn_opts =
MQTTAsync_connectOptions_initializer5;
conn_opts.keepAliveInterval = 20;
conn_opts.cleanstart = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
MQTTProperties props = MQTTProperties_initializer;
// Session Expiry Interval (a four-byte integer property)
MQTTProperty property1;
property1.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
property1.value.integer4 = 90;
MQTTProperties_add(&props, &property1);
// Maximum Packet Size
MQTTProperty property2;
property2.identifier = MQTTPROPERTY_CODE_MAXIMUM_PACKET_SIZE;
property2.value.integer4 = 4094;
MQTTProperties_add(&props, &property2);
conn_opts.connectProperties = &props;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
sleep(1);
// Subscribe
if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, NULL)) !=
MQTTASYNC_SUCCESS)
{
printf("Failed to subscribe, return code %d\n", rc);
exit(EXIT_FAILURE);
}
sleep(1);
// Publish
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
// Set Publish Properties
MQTTAsync_responseOptions pub_opts =
MQTTAsync_responseOptions_initializer;
pub_opts.context = client;
MQTTProperties pub_props = MQTTProperties_initializer;
MQTTProperty property3;
property3.identifier = MQTTPROPERTY_CODE_TOPIC_ALIAS;
property3.value.integer2 = 90; // Topic Alias is a two-byte integer property
MQTTProperties_add(&pub_props, &property3);
pub_opts.properties = pub_props;
if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &pub_opts)) !=
MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
}
sleep(1);
if ((rc = MQTTAsync_unsubscribe(client, TOPIC, NULL)) !=
MQTTASYNC_SUCCESS)
{
printf("Failed to unsubscribe, return code %d\n", rc);
exit(EXIT_FAILURE);
}
while (!finished)
#if defined(WIN32) || defined(WIN64)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&client);
return rc;
}
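Here is how this C client can be compiled to an executable with gcc. Link against paho-mqtt3a for unencrypted connections, or against paho-mqtt3as instead when the client uses TLS (ssl:// or wss:// addresses).
$ gcc -o mqttclient mqttclient.c -lpaho-mqtt3a
$ ./mqttclient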
Connect your client to our CrystalMQ broker or any broker of your choice. Together with the practices described above, this gives you a reliable and efficient foundation for integrating messaging into your system.