This documentation offers a comprehensive guide on connecting MQTT clients to our CrystalMQ broker or any broker of your choice using .NET. It provides step-by-step instructions, beginning with setting up the development environment and covers establishing connections via TCP, secure ports, and WebSocket protocols. Additionally, it includes guidelines for configuring MQTT authentication, ensuring a secure and reliable connection between your .NET applications and the broker.
Before getting started, ensure you have the following:
To begin, set up your development environment by installing the necessary.NET libraries. You will need the MQTT .NET library to facilitate MQTT communication in your .NET application.
dotnet add package MQTTnet --version 4.3.6.1152
Create a new .NET console application.
Windows:
dotnet new console -n CrystalMQ
This section has code snippets of various ways to connect to MQTT Broker. Ensure that the MQTT Broker supports the connection type that you would like to use. Also, obtain the corresponding connection parameters of the MQTT Broker (Address, Port, Username/Password, CA Certificate)
MQTT Over TCP
Use the following code to connect the client over TCP.
Define the Macro ADDRESS using MQTT Broker's connection parameters.
For MQTT 3.3.1 :
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_",10))
.WithTcpServer("public-mqtt-broker.bevywise.com", 1883)
.Build();
// Create MQTT client
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
mqttClient.UseConnectedHandler(async e =>
{
Console.WriteLine("Connected successfully with MQTT broker.");
});
// Connect to the MQTT broker
await mqttClient.ConnectAsync(options);
For MQTT 5 :
// Create Client
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_",10))
.WithTcpServer("public-mqtt-broker.bevywise.com", 1883)
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500) // Use
MQTT version 5.0
.Build();
// Create MQTT client
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
mqttClient.UseConnectedHandler(async e =>
{
Console.WriteLine("Connected successfully with MQTT broker.");
});
// Connect to the MQTT broker
await mqttClient.ConnectAsync(options);
MQTT Over TLS / SSL
Use the following code to connect securely to MQTT Broker over TLS. Define the Macro ADDRESS using MQTT Broker's connection parameters.
// Define the Macro ADDRESS
#define ADDRESS "public-mqtt-broker.bevywise.com:8883"
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_", 10))
.WithTcpServer(GetHostFromAddress(ADDRESS), GetPortFromAddress(ADDRESS))
.WithTls(new MqttClientOptionsBuilderTlsParameters
{
UseTls = true,
Certificates = new List
AllowUntrustedCertificates = true, // Set to false in production
IgnoreCertificateChainErrors = true, // Set to false in production
IgnoreCertificateRevocationErrors = true // Set to false in
production
})
.Build();
// Create MQTT client
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
mqttClient.UseConnectedHandler(async e =>
{
Console.WriteLine("Connected successfully with MQTT broker.");
});
// Connect to the MQTT broker
await mqttClient.ConnectAsync(options);
Set TLS parameters before calling the MQTTClient_connect to connect the client to the MQTT Broker securely over TLS.
If the MQTT Broker is hosted in a trusted server and the server verification is not required, the following code can be used to set TLS Options:
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_", 10))
.WithTcpServer(GetHostFromAddress(ADDRESS), GetPortFromAddress(ADDRESS))
.WithTls(new MqttClientOptionsBuilderTlsParameters
{
UseTls = true,
AllowUntrustedCertificates = true, // Allow untrusted certificates
IgnoreCertificateChainErrors = true, // Ignore certificate chain errors
IgnoreCertificateRevocationErrors = true // Ignore certificate revocation
errors
})
.Build();
// Create MQTT client
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
mqttClient.UseConnectedHandler(async e =>
{
Console.WriteLine("Connected successfully with MQTT broker.");
});
// Connect to the MQTT broker
await mqttClient.ConnectAsync(options);
If the MQTT Broker has Server Certificate issued from a Trusted CA, then the Server Certificate can be verified using:
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_", 10))
.WithTcpServer(GetHostFromAddress(ADDRESS), GetPortFromAddress(ADDRESS))
.WithTls(new MqttClientOptionsBuilderTlsParameters
{
UseTls = true,
Certificates = new List
CertificateValidationHandler = context =>
{
// Custom server certificate validation logic
return context.SslPolicyErrors ==
System.Net.Security.SslPolicyErrors.None;
},
AllowUntrustedCertificates = false, // Do not allow untrusted
certificates
IgnoreCertificateChainErrors = false, // Do not ignore certificate chain
errors
IgnoreCertificateRevocationErrors = false // Do not ignore certificate
revocation errors
})
.Build();
// Create MQTT client
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
mqttClient.UseConnectedHandler(async e =>
{
Console.WriteLine("Connected successfully with MQTT broker.");
});
// Connect to the MQTT broker
await mqttClient.ConnectAsync(options);
If the MQTT Broker has a self-signed Server Certificate then the Server Certificate can be verified using the Root Certificate obtained from the MQTT Broker:
// Load the root certificate
var rootCert = new X509Certificate2("path_of _the_root_file");
// Configure MQTT client options
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_",10))
.WithTcpServer("public-mqtt-broker.bevywise.com", 8883)
.WithCredentials("highlysecure", "N4xnpPTru43T8Lmk") // Add username and
password
.WithTls(new MqttClientOptionsBuilderTlsParameters
{
UseTls = true,
Certificates = new List
AllowUntrustedCertificates = true, // Disable certificate chain
validation
IgnoreCertificateChainErrors = true, // Ignore certificate chain
errors
IgnoreCertificateRevocationErrors = true // Ignore certificate
revocation errors
})
.Build();
// Create MQTT client
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
mqttClient.UseConnectedHandler(async e =>
{
Console.WriteLine("Connected successfully with MQTT broker.");
});
// Connect to the MQTT broker
await mqttClient.ConnectAsync(options);
MQTT Over WebSocket
Define the MQTT Broker Address like this to connect the client over WebSocket.
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_",10))
.WithWebSocketServer("ws://public-mqtt-broker.bevywise.com:10443/mqtt") //
Use WebSocket server URI
.Build();
MQTT Over Secure WebSocket
Use the following code to connect the client over Secure WebSocket. Set TLS Options as given in MQTT Over TLS section.
Define the Macro ADDRESS using MQTT Broker's connection parameters.
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_",10))
.WithWebSocketServer("ws://public-mqtt-broker.bevywise.com:11443/mqtt") //
Use WebSocket server URI
.Build();
Configuring MQTT Authentication
To connect to MQTT Broker that requires MQTT Username and MQTT Password for authentication, add to username and password to the connection options like this:
// Configure MQTT client options
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_",10))
.WithTcpServer("public-mqtt-broker.bevywise.com", 1883)
.WithCredentials("username", "password") // Add username and password
.Build();
// Create MQTT client
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
mqttClient.UseConnectedHandler(async e =>
{
Console.WriteLine("Connected successfully with MQTT broker.");
});
// Connect to the MQTT broker
await mqttClient.ConnectAsync(options);
Setting Up Last Will & Testament
Configure the Last Will and Testament feature to specify a message that the broker will publish if the client unexpectedly disconnects. This helps inform other subscribers of the disconnected client's status.
Use the following code to set Last Will in the Connection Options:
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_",10))
.WithTcpServer("public-mqtt-broker.bevywise.com", 1883) // Use your broker
address and port
.WithWillMessage(new MqttApplicationMessageBuilder()
.WithTopic("will/topic")
.WithPayload("Client1 has disconnected unexpectedly")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag(true)
.Build())
.Build();
Adjusting Keep Alive
MQTT maintains client-broker connections with a keep-alive mechanism. Adjust the keep-alive interval to control how frequently the client sends PINGREQ messages to the broker.
Modify the code below to suit your requirements:
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_",10))
.WithTcpServer("public-mqtt-broker.bevywise.com", 1883) // Use your
broker address and port
.WithWillMessage(new MqttApplicationMessageBuilder()
.WithTopic("will/topic")
.WithPayload("Client1 has disconnected unexpectedly")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag(true)
.Build())
.Build();
Configuring Session Persistence
Session data of an MQTT Client include the Subscriptions made by the Client and any data that the Client would receive with QoS>0. The Client can get the MQTT Broker to store its session data across connections.
MQTT 3.1.1 Clients can set Clean Session = 0 to request the MQTT Broker to keep its session information stored across connections.
For MQTT 3.1.1 :
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_", 10))
.WithTcpServer("public-mqtt-broker.bevywise.com", 1883) // Use your
broker address and port
.WithCleanSession(false) // Set CleanSession to false to keep session
information
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311) // Use
MQTT version 3.1.1
.Build();
MQTT 5 Clients can set Clean Start = 0 and Session Expiry Interval = 'N' to request the MQTT Broker to keep its session information stored across connections for 'N' seconds.
For MQTT 5 :
var sessionExpiryInterval = 3600; // Set session expiry interval to 3600
seconds (1 hour)
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_", 10))
.WithTcpServer("public-mqtt-broker.bevywise.com", 1883) // Use your
broker address and port
.WithCleanStart(false) // Set CleanStart to false
.WithSessionExpiryInterval((uint)sessionExpiryInterval) // Set Session
Expiry Interval
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500) // Use
MQTT version 5.0
.Build();
Setting Maximum Packet Size
MQTT5 Client can request the MQTT Broker to only send data packets less than a specific size by setting it like this:
For MQTT 5:
var maxPacketSize = 1024; // Example: Set maximum packet size to 1024
bytes
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_", 10))
.WithTcpServer("public-mqtt-broker.bevywise.com", 1883) // Use your
broker address and port
.WithReceiveMaximum(maxPacketSize) // Set ReceiveMaximum to limit packet
size
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500) // Use
MQTT version 5.0
.Build();
Sending Data
Efficiently distribute data to multiple subscribers by publishing it to designated topics with the following code snippet:
For MQTT 3.1.1 :
mqttClient.UseConnectedHandler(async e =>
{
Console.WriteLine("Connected successfully with MQTT Broker."); //
Publish a message
var message = new MqttApplicationMessageBuilder()
.WithTopic("cmq/topic")
.WithPayload("Hello CrystalMQ")
.WithExactlyOnceQoS()
.WithRetainFlag()
.Build();
await mqttClient.PublishAsync(message);
Console.WriteLine("Message published.");
});
For MQTT 5 :
mqttClient.UseConnectedHandler(async e =>
{
Console.WriteLine("Connected successfully with MQTT Broker.");
// Publish a message
var message = new MqttApplicationMessageBuilder()
.WithTopic("cmq/topic")
.WithPayload("Hello CrystalMQ")
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce)
.WithRetainFlag(true)
.Build();
var result = await mqttClient.PublishAsync(message);
Console.WriteLine($"Message published (Result:
{result.ReasonString}).");
});
Setting Retained Messages
Enable the retain flag when publishing a message to ensure the broker stores the last message for each topic. This guarantees that new subscribers receive the most recent message upon connecting.
To implement this, use the following code snippet:
var message = new MqttApplicationMessageBuilder()
.WithTopic("cmq/topic")
.WithPayload("Hello CrystalMQ")
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce)
.WithRetainFlag(true) // Enable retain flag
.Build();
Specifying QoS Levels
MQTT provides three levels of Quality of Service (QoS) for message delivery:
Specify the required QoS level when publishing MQTT messages using this code:
// Publish a message with QoS 0
var messageQoS0 = new MqttApplicationMessageBuilder()
.WithTopic("cmq/topic")
.WithPayload("QoS 0: Hello CrystalMQ")
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
.WithRetainFlag(true) // Retain flag can be set if needed
.Build();
// Publish a message with QoS 1
var messageQoS1 = new MqttApplicationMessageBuilder()
.WithTopic("cmq/topic")
.WithPayload("QoS 1: Hello CrystalMQ")
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag(true) // Retain flag can be set if needed
.Build();
// Publish a message with QoS 2
var messageQoS2 = new MqttApplicationMessageBuilder()
.WithTopic("cmq/topic")
.WithPayload("QoS 2: Hello CrystalMQ")
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce)
.WithRetainFlag(true) // Retain flag can be set if needed
.Build();
Message Expiry Interval
The 'Message expiry interval' property sets a message's life span in seconds; if undelivered within this time, the broker discards it. MQTT5 supports this feature. MQTT5 Clients can set this while publishing data.
For MQTT 5 :
var message = new MqttApplicationMessageBuilder()
.WithTopic("test/topic")
.WithPayload("Hello, MQTT!")
.WithMessageExpiryInterval(300) // 300 seconds = 5 minutes
.Build();
Topic Alias
The 'Topic Alias' property allows clients to use a short alias instead of a full topic name, reducing message packet size and improving network efficiency.
For MQTT 5 :
var message = new MqttApplicationMessageBuilder()
.WithTopic("cmq/topic")
.WithPayload("Hello, CrystalMQ")
.WithTopicAlias(1) // Alias for 'cmq/topic'
.Build();
Properties associated with MQTT PUBLISH enhance message handling, providing context or instructions for brokers and clients. These properties, including message expiry intervals and topic aliases, optimize message delivery and network bandwidth.
Subscribing to Topic Filter
To receive data published by other clients, this client has to subscribe to a matching Topic Filter like this:
mqttClient.UseConnectedHandler(async e =>
{
Console.WriteLine("Connected successfully with MQTT Broker.");
// Subscribe to a topic
await mqttClient.SubscribeAsync(new
MQTTnet.Client.Subscribing.MqttClientSubscribeOptionsBuilder()
.WithTopicFilter(f => f.WithTopic("cmq/topic")).Build());
Console.WriteLine("Subscribed to topic.");
});
This topic filter can match with an exact topic or it can have wildcards like # and +
Receiving Data
To receive data sent for the subscriptions, a callback function needs to be defined like this:
mqttClient.UseApplicationMessageReceivedHandler(e =>
{
Console.WriteLine($"Received message on topic
'{e.ApplicationMessage.Topic}':
{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
});
Unsubscribing from Topics
To stop receiving updates from a topic, use the code provided to unsubscribe.
// Unsubscribe from the topicawait
mqttClient.UnsubscribeAsync("cmq/topic");
Console.WriteLine("Unsubscribed from topic.");
Ensure a proper termination of your client's connection with the broker to avoid issues and resource leaks on both sides, thereby maintaining system stability.
Use the following code to disconnect the client from the broker:
mqttClient.UseDisconnectedHandler(async e =>
{
Console.WriteLine("Disconnected from MQTT broker.");
// Optionally, handle reconnection logic here
});
await mqttClient.DisconnectAsync();
You have the opportunity to develop and customize your own intricate business logic within this environment, tailoring it precisely to your specific needs and objectives.
Unique Client ID Management
Assign a distinct client ID to each device for proper identification. In private instances, allocate unique IDs to individual clients. In shared environments, attach a random string to each client ID to ensure uniqueness.
Data Design
Plan your data structure in advance. Whether dealing with plain text or JSON-formatted data or numbers, ensure effective design of data that suits your application/usecase/need.
Robust Error Handling
Implement strong error management to handle MQTT connection failures, subscription problems, and message publishing errors effectively.
Securing Credentials
Safeguard sensitive information like usernames, passwords, and client IDs by not hard-coding them in your source code. Use environment variables or secure configuration files instead.
Regular Testing & Monitoring
Continuously test MQTT communication and monitor client metrics such as connection status, message throughput, and error rates to quickly identify and fix issues.
Optimizing Session Management
Choose between clean and persistent sessions (`clean: true` or `clean: false`) based on your need to retain subscriptions and queued messages across client connections.
Reconnect on Disconnect
Add code to attempt reconnection to the MQTT Broker when there is an unexpected disconnection. This will ensure that your client stays connected and does not lose any data.
Download the complete code for client that uses .NET MQTT Client Library to connect with our CrystalMQ broker or any broker of your choice.
For MQTT 3.1.1 :
using System;
using System.Collections.Generic;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client
class Program
{
static async Task Main(string[] args)
{
// Load the root certificate
//var rootCert = new X509Certificate2("path_of_the_root_file");
// Configure MQTT client options
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_",10))
.WithTcpServer("public-mqtt-broker.bevywise.com", 1883) // Enable TLS change
port 8883
//.WithWebSocketServer("ws://public-mqtt-broker.bevywise.com:10443/mqtt") //
Use WebSocket server URI and Enable TLS change port 11443
// .WithCredentials("username", "password") // Add username and password
//
// .WithTls(new MqttClientOptionsBuilderTlsParameters
// {
// UseTls = true,
// Certificates = new List
// AllowUntrustedCertificates = true, // Disable certificate chain
validation
// IgnoreCertificateChainErrors = true, // Ignore certificate chain
errors
// IgnoreCertificateRevocationErrors = true // Ignore certificate
revocation errors
// })
.Build();
// Create MQTT client
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
// Handle client events
mqttClient.ConnectedAsync += async e =>
{
Console.WriteLine("Connected successfully with MQTT Broker.");
// Subscribe to a topic
await mqttClient.SubscribeAsync(new
MqttClientSubscribeOptionsBuilder().WithTopicFilter(f =>
f.WithTopic("cmq/topic")).Build());
Console.WriteLine("Subscribed to topic.");
// Publish a message
var message = new MqttApplicationMessageBuilder()
.WithTopic("cmq/topic")
.WithPayload("Hello CrystalMQ")
.Build();
await mqttClient.PublishAsync(message);
Console.WriteLine("Message published.");
};
mqttClient.DisconnectedAsync += async e =>
{
Console.WriteLine("Disconnected from MQTT Broker.");
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await mqttClient.ConnectAsync(options);
}
catch
{
Console.WriteLine("Reconnecting failed.");
}
};
mqttClient.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine($"Received message:
{System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
return Task.CompletedTask;
};
// Connect to the MQTT broker
await mqttClient.ConnectAsync(options);
// Keep the application running
Console.WriteLine("Press key to exit");
Console.ReadLine();
private static string GenerateClientId(string prefix, int length)
{
const string chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
var random = new Random();
var randomChars = new char[length];
for (int i = 0; i < length; i++)
{
randomChars[i] = chars[random.Next(chars.Length)];
}
return prefix + new string(randomChars);
}
}
For MQTT 5 :
using System;
using System.Collections.Generic;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
class Program
{
static async Task Main(string[] args)
{
var options = new MqttClientOptionsBuilder()
.WithClientId(GenerateClientId("crystalmq_",10))
.WithTcpServer("public-mqtt-broker.bevywise.com", 1883)
//.WithWebSocketServer("ws://public-mqtt-broker.bevywise.com:10443/mqtt")
// .WithCredentials("username", "password") // Add username and password
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500) //if you
use mqtt version 5. note: TLS enable is not support version 5
.Build();
// Create MQTT client
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
// Handle client events
mqttClient.ConnectedAsync += async e =>
{
Console.WriteLine("Connected successfully with MQTT Broker.");
// Subscribe to a topic
await mqttClient.SubscribeAsync(new
MqttClientSubscribeOptionsBuilder().WithTopicFilter(f =>
f.WithTopic("cmq/topic")).Build());
Console.WriteLine("Subscribed to topic.");
// Publish a message
var message = new MqttApplicationMessageBuilder()
.WithTopic("cmq/topic")
.WithPayload("Hello CMQ")
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce)
.WithRetainFlag(true)
.Build();
await mqttClient.PublishAsync(message);
Console.WriteLine("Message published.");
};
mqttClient.DisconnectedAsync += async e =>
{
Console.WriteLine("Disconnected from MQTT Broker.");
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await mqttClient.ConnectAsync(options);
}
catch
{
Console.WriteLine("Reconnecting failed.");
}
};
mqttClient.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine($"Received message:
{System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
return Task.CompletedTask;
};
// Connect to the MQTT broker
await mqttClient.ConnectAsync(options);
// Keep the application running
Console.WriteLine("Press key to exit");
Console.ReadLine();
private static string GenerateClientId(string prefix, int length)
{
const string chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
var random = new Random();
var randomChars = new char[length];
for (int i = 0; i < length; i++)
{
randomChars[i] = chars[random.Next(chars.Length)];
}
return prefix + new string(randomChars);
}
}
Here is how this C client can be compiled using gcc compiler to an executable.
Using .Net SDK :
To compile and run the program, use the following command:
dotnet run
This command will build and run your application, and you should see the output in the terminal.
Build the project (optional):
If you want to compile the project without running it, use:
dotnet build
The compiled binaries will be located in the bin directory within your project folder.
Using Visual Studio:
1. Compile and run the program:
Connect your client to our state-of-the-art CrystalMQ broker or any broker of your choice. This powerful combination will ensure optimal performance and reliability for all your messaging needs, paving the way for a robust and efficient system integration.