This documentation guides you through connecting an MQTT client to our CrystalMQ broker, or any broker of your choice, using the Eclipse Paho Java client library. It covers setting up connections over TCP, secure ports, and WebSockets, configuring MQTT authentication, using advanced features, and performing basic operations such as publishing, subscribing, unsubscribing, and disconnecting.
Ensure you have:
Setting up the development environment
Configure your Java environment to include the Eclipse Paho MQTT client library. This library facilitates MQTT communication and can be integrated into your Java project via Maven or by manual library inclusion.
Download JARS:
org.eclipse.paho.client.mqttv3-1.2.5.jar for Mqttv3 Client
org.eclipse.paho.mqttv5.client-1.2.5.jar for Mqttv5 Client
This section has code snippets showing the various ways to connect to an MQTT Broker. Ensure that the MQTT Broker supports the connection type that you would like to use. Also, obtain the corresponding connection parameters of the MQTT Broker (Address, Port, Username/Password, CA Certificate).
MQTT Over TCP
Use the following code to connect the client over TCP.
Set hostip and port using the MQTT Broker's connection parameters.
//paho mqttv3
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
// hostip, client_id, client_username and client_password hold your broker's connection parameters
boolean TLS = false; // set true to enable TLS
boolean AUTH = true;
int port = TLS ? 8883 : 1883;
MqttClient client;
String brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();
client = new MqttClient(brokerUrl, client_id, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
if (AUTH) {
    connOpts.setUserName(client_username);
    connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);
// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");
//paho mqttv5
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
boolean AUTH = true;
int port = 1883;
MqttClient client;
String brokerUrl = String.format("%s://%s:%d", "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();
client = new MqttClient(brokerUrl, client_id, persistence);
MqttConnectionOptions connOpts = new MqttConnectionOptions();
if (AUTH) {
    connOpts.setUserName(client_username);
    connOpts.setPassword(client_password.getBytes()); // MQTT 5 options take a byte array
}
connOpts.setCleanStart(true); // Clean start for MQTTv5
// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");
MQTT Over TLS / SSL
Use the following code to connect securely to MQTT Broker over TLS.
Set hostip and port using the MQTT Broker's connection parameters.
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
boolean TLS = true;
boolean AUTH = true;
int port = TLS ? 8883 : 1883;
MqttClient client;
String brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();
client = new MqttClient(brokerUrl, client_id, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
if (TLS) {
    try {
        // Replace with your root cert path.
        // getSocketFactory is defined in the complete listing at the end of this guide.
        connOpts.setSocketFactory(getSocketFactory("root.crt"));
    } catch (Exception e) {
        System.out.println("Error setting up SSL socket factory: " + e.getMessage());
        e.printStackTrace();
    }
}
if (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);
// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");
Set the TLS parameters on the connection options before calling client.connect(connOpts) so that the client connects to the MQTT Broker securely over TLS.
If the MQTT Broker is hosted on a trusted server and no custom trust configuration is required, the following code sets the TLS options with the JVM's default SSL socket factory:
boolean TLS = true; // Use TLS
int port = TLS ? 8883 : 1883;
MqttClient client;
String brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();
try {
    client = new MqttClient(brokerUrl, client_id, persistence);
    MqttConnectOptions connOpts = new MqttConnectOptions();
    if (TLS) {
        // Default SSL socket factory (javax.net.ssl.SSLSocketFactory). Note that it still
        // checks the server certificate against the JVM's default trust store.
        connOpts.setSocketFactory(SSLSocketFactory.getDefault());
    }
    connOpts.setUserName(client_username);
    connOpts.setPassword(client_password.toCharArray());
    connOpts.setCleanSession(true);
    // Connect to MQTT broker
    System.out.println("Connecting to broker: " + brokerUrl);
    client.connect(connOpts);
    System.out.println("Connected");
} catch (MqttException e) {
    System.out.println("Error connecting to MQTT broker: " + e.getMessage());
    e.printStackTrace();
}
If the MQTT Broker has Server Certificate issued from a Trusted CA, then the Server Certificate can be verified using:
boolean TLS = true; // Use TLS
int port = TLS ? 8883 : 1883;
MqttClient client;
String brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();
try {
client = new MqttClient(brokerUrl, client_id, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
if (TLS) {
SSLSocketFactory socketFactory = createSSLSocketFactory();
if (socketFactory != null) {
connOpts.setSocketFactory(socketFactory);
} else {
System.out.println("Error creating SSL socket factory.");
return;
}
}
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
connOpts.setCleanSession(true);
// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");
// Continue with MQTT operations...
} catch (MqttException e) {
System.out.println("Error connecting to MQTT broker: " + e.getMessage());
e.printStackTrace();
}
private static SSLSocketFactory createSSLSocketFactory() {
    try {
        // Key store for your trusted CA certificates
        KeyStore trustedStore = KeyStore.getInstance(KeyStore.getDefaultType());
        // Load your trusted CA certificates here
        // trustedStore.load(new FileInputStream("path_to_trusted_ca_cert"), "password".toCharArray());
        // Create and initialize SSLContext
        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(null, null, null); // Use default trust managers (JVM trust store)
        return sslContext.getSocketFactory();
    } catch (Exception e) {
        System.out.println("Error creating SSL socket factory: " + e.getMessage());
        e.printStackTrace();
    }
    return null;
}
If the MQTT Broker has a self-signed Server Certificate then the Server Certificate can be verified using the Root Certificate obtained from the MQTT Broker:
// Build an SSLSocketFactory that trusts the broker's root certificate
private static SSLSocketFactory getSocketFactory(String caCrtFile) throws Exception {
    // Load the root certificate
    CertificateFactory cf = CertificateFactory.getInstance("X.509");
    X509Certificate caCert;
    try (FileInputStream caInput = new FileInputStream(caCrtFile)) {
        caCert = (X509Certificate) cf.generateCertificate(caInput);
    }
    // Place it in an in-memory key store that serves as the trust store
    KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
    keyStore.load(null, null);
    keyStore.setCertificateEntry("ca-certificate", caCert);
    TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    tmf.init(keyStore);
    SSLContext context = SSLContext.getInstance("TLS");
    context.init(null, tmf.getTrustManagers(), null);
    return context.getSocketFactory();
}
// Configure MQTT connection options
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setSocketFactory(getSocketFactory("path_of_the_root_file"));
connOpts.setUserName(client_username); // Add username and password
connOpts.setPassword(client_password.toCharArray());
// Connect to the MQTT broker
client.connect(connOpts);
System.out.println("Connected successfully with MQTT broker.");
MQTT Over WebSocket
Define the MQTT Broker Address like this to connect the client over WebSocket.
//paho mqttv3
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
// For Version-5, import these instead:
// import org.eclipse.paho.mqttv5.client.*;
// import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
int port;
String brokerUrl = "";
boolean TLS = false; // set true to enable TLS
boolean AUTH = true;
boolean web_socket = true;
if (web_socket)
    port = TLS ? 11433 : 10433;
else
    port = TLS ? 8883 : 1883;
MqttClient client;
if (web_socket)
    brokerUrl = String.format("%s://%s:%d", TLS ? "wss" : "ws", hostip, port);
else
    brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();
client = new MqttClient(brokerUrl, client_id, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
if (TLS) {
    try {
        // Replace with your root cert path
        connOpts.setSocketFactory(getSocketFactory("root.crt"));
    } catch (Exception e) {
        System.out.println("Error setting up SSL socket factory: " + e.getMessage());
        e.printStackTrace();
    }
}
if (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);
// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");
MQTT Over Secure WebSocket
Use the following code to connect the client over Secure WebSocket.
Set TLS Options as given in MQTT Over TLS section.
Set hostip and port using the MQTT Broker's connection parameters.
//paho mqttv3
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
// For Version-5, import these instead:
// import org.eclipse.paho.mqttv5.client.*;
// import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
int port;
String brokerUrl = "";
boolean TLS = true; // set true to enable TLS
boolean AUTH = true;
boolean web_socket = true;
if (web_socket)
    port = TLS ? 11433 : 10433;
else
    port = TLS ? 8883 : 1883;
MqttClient client;
if (web_socket)
    brokerUrl = String.format("%s://%s:%d", TLS ? "wss" : "ws", hostip, port);
else
    brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();
client = new MqttClient(brokerUrl, client_id, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
if (TLS) {
    try {
        // Replace with your root cert path
        connOpts.setSocketFactory(getSocketFactory("root.crt"));
    } catch (Exception e) {
        System.out.println("Error setting up SSL socket factory: " + e.getMessage());
        e.printStackTrace();
    }
}
if (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);
// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");
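The scheme and port selection repeated in the connection snippets of this section can be collected into one helper. The following is a minimal sketch, assuming the port numbers used in this guide (1883, 8883, 10433, 11433); `BrokerUrl` and `build` are illustrative names and not part of the Paho library. Check the ports against your own broker's configuration.

```java
// Illustrative helper (not part of Paho): builds the broker URI from the
// transport choices used in this guide.
final class BrokerUrl {
    static String build(String host, boolean tls, boolean webSocket) {
        String scheme;
        int port;
        if (webSocket) {
            scheme = tls ? "wss" : "ws";
            port = tls ? 11433 : 10433; // WebSocket ports used in this guide
        } else {
            scheme = tls ? "ssl" : "tcp";
            port = tls ? 8883 : 1883;   // TCP ports used in this guide
        }
        return String.format("%s://%s:%d", scheme, host, port);
    }

    public static void main(String[] args) {
        // Secure WebSocket URI for the broker host used in this guide
        System.out.println(build("public-mqtt-broker.bevywise.com", true, true));
    }
}
```

The returned string can be passed as the first argument of the MqttClient constructor.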
Configuring MQTT Authentication
To connect to an MQTT Broker that requires an MQTT Username and MQTT Password for authentication, add the username and password to the connection options like this:
if (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);
Advanced Features
Setting Up Last Will & Testament
Configure the Last Will and Testament feature to specify a message that the broker will publish if the client unexpectedly disconnects. This helps inform other subscribers of the disconnected client's status.
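Use the following code to set Last Will in the Connection Options before connecting:
// Topic on which the broker publishes the will message
final String willTopic = "will_topic/device1";
// Will payload ("offline" is an example value), QoS 1, not retained
connOpts.setWill(willTopic, "offline".getBytes(), 1, false);
// A monitoring client subscribes to the will topic to receive the message
client.subscribe(willTopic);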
Adjusting Keep Alive
MQTT maintains client-broker connections with a keep-alive mechanism. Adjust the keep-alive interval to control how frequently the client sends PINGREQ messages to the broker.
Modify the code below to suit your requirements:
connOpts.setKeepAliveInterval(40); // keep-alive interval in seconds
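As a worked example of what the interval means on the broker side: the MQTT specification has the broker close the connection when it receives no packet from the client within one and a half times the keep-alive period. The sketch below only illustrates that arithmetic; `KeepAliveMath` is a hypothetical name, not a Paho class.

```java
// Illustrative arithmetic only (not part of Paho).
final class KeepAliveMath {
    // Seconds of silence after which the broker may drop the client (1.5 x keep-alive).
    // A keep-alive of 0 disables the mechanism, so only positive values are accepted here.
    static double brokerTimeoutSeconds(int keepAliveSeconds) {
        if (keepAliveSeconds <= 0) {
            throw new IllegalArgumentException("keep-alive must be positive");
        }
        return keepAliveSeconds * 1.5;
    }

    public static void main(String[] args) {
        System.out.println(brokerTimeoutSeconds(40)); // prints 60.0
    }
}
```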
Configuring Session Persistence
Session data of an MQTT Client include the Subscriptions made by the Client and any data that the Client would receive with QoS>0. The Client can get the MQTT Broker to store its session data across connections.
MQTT 3.1.1 Clients can set Clean Session = 0 to request the MQTT Broker to keep its session information stored across connections.
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
MemoryPersistence persistence = new MemoryPersistence();
try {
    client = new MqttClient(brokerUrl, client_id, persistence);
    MqttConnectOptions connOpts = new MqttConnectOptions();
    if (TLS) {
        connOpts.setSocketFactory(SSLSocketFactory.getDefault()); // Use default SSL socket factory
    }
    connOpts.setUserName(client_username);
    connOpts.setPassword(client_password.toCharArray());
    connOpts.setCleanSession(cleanSession); // Set Clean Session flag (false keeps the session)
    // Connect to MQTT broker
    System.out.println("Connecting to broker: " + brokerUrl);
    client.connect(connOpts);
    System.out.println("Connected");
    // Continue with MQTT operations...
} catch (MqttException e) {
    System.out.println("Error connecting to MQTT broker: " + e.getMessage());
    e.printStackTrace();
}
MQTT 5 Clients can set Clean Start = 0 and Session Expiry Interval = 'N' to request the MQTT Broker to keep its session information stored across connections for 'N' seconds.
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
MemoryPersistence persistence = new MemoryPersistence();
try {
    client = new MqttAsyncClient(brokerUrl, client_id, persistence);
    MqttConnectionOptions connOpts = new MqttConnectionOptions();
    if (TLS) {
        connOpts.setSocketFactory(SSLSocketFactory.getDefault()); // Use default SSL socket factory
    }
    connOpts.setUserName(client_username);
    connOpts.setPassword(client_password.getBytes()); // MQTT 5 options take a byte array
    connOpts.setCleanStart(cleanStart); // Set Clean Start flag
    connOpts.setSessionExpiryInterval(sessionExpiryInterval); // Set Session Expiry Interval (a Long, in seconds)
    // Connect to MQTT broker
    System.out.println("Connecting to broker: " + brokerUrl);
    IMqttToken connectToken = client.connect(connOpts);
    connectToken.waitForCompletion();
    System.out.println("Connected");
    // Continue with MQTT operations...
} catch (MqttException e) {
    System.out.println("Error connecting to MQTT broker: " + e.getMessage());
    e.printStackTrace();
}
Setting Maximum Packet Size
MQTT5 Client can request the MQTT Broker to only send data packets less than a specific size by setting it like this:
connOpts.setMaximumPacketSize(256L); // size in bytes; the setter takes a Long
Sending Data
Efficiently distribute data to multiple subscribers by publishing it to designated topics with the following code snippet:
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
String topic = "cmq/topic";
String message = "hello world";
int qos = 1;
boolean retain = false;
client.publish(topic, message.getBytes(), qos, retain);
//paho mqttv5
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttMessage;
// Publish message
String topic = "cmq/topic";
String message = "hello world";
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(1);
mqttMessage.setRetained(false);
// Publish message asynchronously (client is an MqttAsyncClient here)
IMqttToken publishToken = client.publish(topic, mqttMessage);
publishToken.waitForCompletion();
System.out.println("Message published");
Setting Retained Messages
Enable the retain flag when publishing a message to ensure the broker stores the last message for each topic. This guarantees that new subscribers receive the most recent message upon connecting.
To implement this, use the following code snippet:
boolean retain = true; // ask the broker to store this message as the topic's retained message
client.publish(topic, message.getBytes(), qos, retain);
Specifying QoS Levels
MQTT provides three levels of Quality of Service (QoS) for message delivery:
QoS 0: at most once
QoS 1: at least once
QoS 2: exactly once
Specify the required QoS level when publishing MQTT messages using this code:
int qos = 1;
client.publish(topic, message.getBytes(), qos, retain);
Message Expiry Interval
The 'Message expiry interval' property sets a message's life span in seconds; if undelivered within this time, the broker discards it. MQTT5 supports this feature. MQTT5 Clients can set this while publishing data.
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
String topic = "topic1";
String messageContent = "hello world";
int qos = 1;
boolean retain = false;
MqttMessage message = new MqttMessage(messageContent.getBytes());
message.setQos(qos);
message.setRetained(retain);
MqttProperties properties = new MqttProperties();
properties.setMessageExpiryInterval(60L); // expiry in seconds
message.setProperties(properties);
client.publish(topic, message);
Topic Alias
The 'Topic Alias' property allows clients to use a short alias instead of a full topic name, reducing message packet size and improving network efficiency.
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
MqttConnectionOptions options = new MqttConnectionOptions();
// Highest topic alias value this client accepts from the broker
options.setTopicAliasMaximum(10);
client.connect(options);
Properties associated with MQTT PUBLISH enhance message handling, providing context or instructions for brokers and clients. These properties, including message expiry intervals and topic aliases, optimize message delivery and network bandwidth.
Subscribing to Topic Filter
To receive data published by other clients, this client has to subscribe to a matching Topic Filter like this:
String topic = "cmq/topic";
int qos = 1;
client.subscribe(topic, qos);
This topic filter can match an exact topic, or it can contain wildcards: + matches exactly one topic level, and # matches any number of levels and must be the last level of the filter.
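To make the wildcard rules concrete, here is a minimal sketch of how a topic filter is compared with a topic name, level by level. The broker performs this matching itself; the code is only an illustration. `TopicFilter` is a hypothetical name, and the sketch leaves out the special handling of topics that begin with `$`.

```java
// Illustrative matcher (not part of Paho): applies the + and # wildcard rules.
final class TopicFilter {
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1); // -1 keeps empty trailing levels
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return i == f.length - 1;   // '#' is only valid as the last level
            }
            if (i >= t.length) {
                return false;               // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;               // literal level differs
            }
        }
        return f.length == t.length;        // no unmatched topic levels left over
    }

    public static void main(String[] args) {
        System.out.println(matches("cmq/+/temp", "cmq/device1/temp")); // prints true
        System.out.println(matches("cmq/+", "cmq/device1/temp"));      // prints false
    }
}
```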
Receiving Data
To receive data sent for the subscriptions, a callback function needs to be defined like this:
// Set callback
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
cause.printStackTrace();
// Handle reconnection or other actions
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Message received:");
System.out.println(" Topic: " + topic);
System.out.println(" Message: " + new String(message.getPayload()));
System.out.println(" QoS: " + message.getQos());
System.out.println(" Retained: " + message.isRetained());
// Process the received message
}
@Override
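public void deliveryComplete(IMqttDeliveryToken token) {
// token.getMessage() declares MqttException, so the message ID is printed instead
System.out.println("Delivery complete for message ID: " + token.getMessageId());
// Handle delivery confirmation
}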
});
Unsubscribing from Topics
To stop receiving updates from a topic, use the code provided to unsubscribe.
// Unsubscribe from the topic
String topic="cmq/topic";
client.unsubscribe(topic);
Ensure a proper termination of your client's connection with the broker to avoid issues and resource leaks on both sides, thereby maintaining system stability.
Use the following code to disconnect the client from the broker:
try {
client.disconnect();
System.out.println("Disconnected from broker.");
} catch (MqttException e) {
System.out.println("Error disconnecting from broker: " + e.getMessage());
e.printStackTrace();
}
You have the opportunity to develop and customize your own intricate business logic within this environment, tailoring it precisely to your specific needs and objectives.
Client Identification Management
Assign distinct client IDs to individual devices to ensure accurate identification. For private instances, allocate unique IDs to each client; in shared environments, append a random string to guarantee ID uniqueness.
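One way to append a random string to a client ID is sketched below. `ClientIds` and `withRandomSuffix` are illustrative names, and the prefix `crystalmq_` and the suffix length of 10 are example values chosen to keep the ID within 23 characters, the length every MQTT 3.1.1 broker is required to accept.

```java
import java.util.UUID;

// Illustrative helper (not part of Paho): prefix plus a random 10-character suffix.
final class ClientIds {
    static String withRandomSuffix(String prefix) {
        // A random UUID without dashes is 32 hex characters; keep the first 10
        String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 10);
        return prefix + suffix;
    }

    public static void main(String[] args) {
        System.out.println(withRandomSuffix("crystalmq_"));
    }
}
```

The result can be passed as the client ID argument of the MqttClient constructor.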
Data Structuring
Plan your data architecture proactively. Whether handling plain text, JSON-formatted data, or numerical values, ensure the structure aligns effectively with your application's specific requirements.
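As an illustration of a fixed payload structure, the sketch below builds a small JSON document with the temp, pressure, and status fields that the complete example in this guide publishes. `SensorPayload` is a hypothetical name; a real project would more likely use a JSON library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Illustrative payload builder (not part of Paho).
final class SensorPayload {
    // Builds a JSON object with a fixed field order
    static String toJson(int temp, int pressure, boolean on) {
        return String.format(Locale.ROOT,
                "{\"temp\":%d,\"pressure\":%d,\"status\":\"%s\"}",
                temp, pressure, on ? "ON" : "OFF");
    }

    // Encodes with an explicit charset so publisher and subscriber agree on it
    static byte[] toBytes(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(toJson(21, 1013, true));
    }
}
```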
Robust Error Handling
Implement strong error management to handle MQTT connection failures, subscription problems, and message publishing errors effectively.
Securing Credentials
Safeguard sensitive information like usernames, passwords, and client IDs by not hard-coding them in your source code. Use environment variables or secure configuration files instead.
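A minimal sketch of reading the credentials from environment variables follows. The variable names `MQTT_USERNAME` and `MQTT_PASSWORD` and the class `MqttCredentials` are assumptions made for this example, not names defined by Paho or CrystalMQ.

```java
import java.util.Map;

// Illustrative holder (not part of Paho) for credentials read from the environment.
final class MqttCredentials {
    final String username;
    final char[] password;

    private MqttCredentials(String username, char[] password) {
        this.username = username;
        this.password = password;
    }

    // Takes the environment as a map so that it can be tested without real variables
    static MqttCredentials fromEnv(Map<String, String> env) {
        String user = env.get("MQTT_USERNAME"); // assumed variable name
        String pass = env.get("MQTT_PASSWORD"); // assumed variable name
        if (user == null || pass == null) {
            throw new IllegalStateException("MQTT_USERNAME and MQTT_PASSWORD must be set");
        }
        return new MqttCredentials(user, pass.toCharArray());
    }

    public static void main(String[] args) {
        try {
            MqttCredentials c = fromEnv(System.getenv());
            System.out.println("Loaded credentials for user " + c.username);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With the MQTT 3.1.1 client, the fields can be passed to connOpts.setUserName(c.username) and connOpts.setPassword(c.password).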
Regular Testing & Monitoring
Continuously test MQTT communication and monitor client metrics such as connection status, message throughput, and error rates to quickly identify and fix issues.
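A simple starting point for such monitoring is a set of thread-safe counters that the publish code and the callback update. The sketch below is one possible shape; `ClientMetrics` and its method names are hypothetical, and the error rate is defined here as errors divided by all recorded events.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative counters (not part of Paho); safe to update from callback threads.
final class ClientMetrics {
    private final AtomicLong published = new AtomicLong();
    private final AtomicLong received = new AtomicLong();
    private final AtomicLong errors = new AtomicLong();

    void recordPublished() { published.incrementAndGet(); }
    void recordReceived() { received.incrementAndGet(); }
    void recordError() { errors.incrementAndGet(); }

    // Share of errors among all recorded events; 0.0 when nothing was recorded
    double errorRate() {
        long total = published.get() + received.get() + errors.get();
        return total == 0 ? 0.0 : (double) errors.get() / total;
    }

    String summary() {
        return "published=" + published.get()
                + " received=" + received.get()
                + " errors=" + errors.get();
    }

    public static void main(String[] args) {
        ClientMetrics m = new ClientMetrics();
        m.recordPublished();
        m.recordReceived();
        System.out.println(m.summary()); // prints published=1 received=1 errors=0
    }
}
```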
Optimizing Session Management
Choose between clean and persistent sessions (setCleanSession(true) or setCleanSession(false) for MQTT 3.1.1, setCleanStart for MQTT 5) based on your need to retain subscriptions and queued messages across client connections.
Reconnect on Disconnect
Add code to attempt reconnection to the MQTT Broker when there is an unexpected disconnection. Combined with a persistent session and QoS 1 or 2, this helps your client stay connected and reduces the risk of losing data.
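One common way to pace reconnection attempts is exponential backoff, sketched below without any Paho dependency. `Backoff`, `delayMillis`, and `retry` are illustrative names, and the delays are example values. Paho's connection options also offer setAutomaticReconnect(true), which may be sufficient on its own.

```java
import java.util.concurrent.Callable;

// Illustrative retry helper (not part of Paho).
final class Backoff {
    // Delay before the next attempt: base * 2^attempt, capped at maxMillis
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        int shift = Math.max(0, Math.min(attempt, 20)); // bound the shift to avoid overflow
        return Math.min(baseMillis << shift, maxMillis);
    }

    // Calls connect until it returns true or the attempts are used up
    static boolean retry(Callable<Boolean> connect, int maxAttempts, long baseMillis, long maxMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                if (connect.call()) {
                    return true;
                }
            } catch (Exception e) {
                System.out.println("Attempt " + (attempt + 1) + " failed: " + e.getMessage());
            }
            if (attempt < maxAttempts - 1) {
                try {
                    Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in for a connect call that succeeds on the third attempt
        boolean ok = retry(() -> ++calls[0] >= 3, 5, 10, 100);
        System.out.println("connected=" + ok + " after " + calls[0] + " attempts");
    }
}
```

With Paho, the callable could wrap the connect call from connectionLost, for example () -> { client.connect(connOpts); return client.isConnected(); }.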
Download the complete code for a client that uses the Java MQTT Client Library to connect with our CrystalMQ broker or any broker of your choice.
//paho mqttv3
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.SSLContext;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.security.KeyStore;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.cert.CertificateException;
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.TrustManagerFactory;
import java.security.KeyManagementException;
import java.util.Random;
import java.util.Scanner;
public class MqttClientv3 {
public static void main(String[] args) {
String client_id = "your_client_id";
String hostip = "public-mqtt-broker.bevywise.com";
boolean TLS = false; // set true to Enable TLS
boolean AUTH = true;
String client_username = "Access-Key";
String client_password = "Access-Token";
boolean websocket_connection=false; //set true to Enable websocket
String topic = "";
int qos = 0;
boolean retain = false;
String brokerUrl= "";
int port;
if(websocket_connection)
port= TLS ? 11433:10433;
else
port = TLS ? 8883 : 1883; // Use 8883 for SSL/TLS, 1883 for non-SSL
// Creating a new MQTT client instance
MqttClient client;
if(websocket_connection)
brokerUrl=String.format("%s://%s:%d", TLS ? "wss" : "ws", hostip, port);
else
brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();
try {
client = new MqttClient(brokerUrl, client_id, persistence);
// Setting callbacks
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
cause.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception
{
System.out.println("Received message on topic: " + topic);
System.out.println("Message: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Message delivered successfully.");
}
});
// Setting connection options
MqttConnectOptions connOpts = new MqttConnectOptions();
if (TLS) {
try {
// Replace with your root cert path
connOpts.setSocketFactory(getSocketFactory("/path/to/your/root.crt"));
} catch (Exception e) {
System.out.println("Error setting up SSL socket factory: " + e.getMessage());
e.printStackTrace();
}
}
if (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);
// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");
// Example topic to subscribe
final String subscribeTopic = "will_topic/device1";
// Subscribing to a topic
client.subscribe(subscribeTopic);
// Publishing messages or subscribing/unsubscribing based on user input
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.println("1. Publish\n2. Subscribe\n3. Disconnect\n4. Unsubscribe");
int choice = scanner.nextInt();
scanner.nextLine(); // consume newline
switch (choice) {
case 1:
System.out.print("Enter Topic QoS(0/1/2) Retain(0/1) Count: ");
String[] input = scanner.nextLine().split(" ");
topic = input[0];
qos = Integer.parseInt(input[1]);
retain = Integer.parseInt(input[2]) == 1;
int count = Integer.parseInt(input[3]);
for (int i = 0; i < count; i++) {
// Build a JSON payload with random sample values
String message = "{\"temp\":" + (new Random().nextInt(50) + 1)
+ ",\"pressure\":" + (new Random().nextInt(100) + 1)
+ ",\"status\":" + (new Random().nextInt(2)) + "}";
try {
client.publish(topic, message.getBytes(), qos, retain);
System.out.println("Published message: " + message);
} catch (MqttException e) {
System.out.println(" Error publishing message: " + e.getMessage());
e.printStackTrace();
}
Thread.sleep(6000); // 6 seconds
}
break;
case 2:
System.out.print(" Enter Topic QoS(0/1/2): ");
String[] subInput = scanner.nextLine().split(" ");
topic = subInput[0];
qos = Integer.parseInt(subInput[1]);
try {
client.subscribe(topic, qos);
System.out.println(" Subscribed to topic: " + topic);
} catch (MqttException e) {
System.out.println(" Error subscribing to topic: " + e.getMessage());
e.printStackTrace();
}
break;
case 3:
try {
client.disconnect();
System.out.println(" Disconnected from broker.");
} catch (MqttException e) {
System.out.println("Error disconnecting from broker: " + e.getMessage());
e.printStackTrace();
}
break;
case 4:
System.out.print(" Enter Topic to unsubscribe: ");
topic = scanner.nextLine();
try {
client.unsubscribe(topic);
System.out.println(" Unsubscribed from topic: " + topic);
} catch (MqttException e) {
System.out.println(" Error unsubscribing from topic: " + e.getMessage());
e.printStackTrace();
}
break;
default:
System.out.println(" Invalid choice.");
}
}
} catch (MqttException | InterruptedException e) {
System.out.println("An error occurred: " + e.getMessage());
e.printStackTrace();
}
}
// Method to create SSL socket factory (if needed)
private static SSLSocketFactory getSocketFactory(String caCrtFile) throws Exception {
// Load the CA certificate from file
CertificateFactory cf = CertificateFactory.getInstance("X.509");
FileInputStream caInput = new FileInputStream(caCrtFile);
X509Certificate caCert = (X509Certificate) cf.generateCertificate(caInput);
caInput.close();
// Put the certificate in an in-memory key store used as the trust store
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(null, null);
keyStore.setCertificateEntry("ca-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore);
// Create an SSL context that trusts only that certificate
SSLContext context = SSLContext.getInstance("TLS");
context.init(null, tmf.getTrustManagers(), null);
return context.getSocketFactory();
}
}
MQTT 5
//paho mqttv5
import org.eclipse.paho.mqttv5.client.IMqttClient;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.SSLContext;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.security.KeyStore;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.cert.CertificateException;
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.KeyManagerFactory;
import java.security.KeyManagementException;
import java.util.List;
import java.util.Random;
import java.util.Scanner;
public class MqttClientv5 {
public static void main(String[] args)throws Exception{
String client_id = "your_client_id";
String hostip = "public-mqtt-broker.bevywise.com";
boolean TLS = false; // set true to Enable TLS
boolean AUTH = true;
String client_username = "Access-Key";
String client_password = "Access-Token";
boolean websocket_connection=false; //set true to Enable websocket
String topic = "";
int qos = 0;
boolean retain = false;
String brokerUrl= "";
int port;
if(websocket_connection)
port = TLS ? 11433 : 10433;
else
port = TLS ? 8883 : 1883; // Use 8883 for SSL/TLS, 1883 for non-SSL
// Creating a new MQTT client instance
MqttClient client;
if(websocket_connection)
brokerUrl=String.format("%s://%s:%d", TLS ? "wss" : "ws", hostip, port);
else
brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();
try {
client = new MqttClient(brokerUrl, client_id, persistence);
// Setting callbacks
client.setCallback(new MqttCallback() {
@Override
public void disconnected(MqttDisconnectResponse disconnectResponse) {
System.out.println("Disconnected from broker.");
}
@Override
public void mqttErrorOccurred(MqttException exception) {
System.out.println("MQTT Error: " + exception.getMessage());
exception.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception
{
System.out.println("Received message on topic: " + topic);
System.out.println("Message: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttToken token) {
System.out.println("Message delivered successfully.");
}
@Override
public void authPacketArrived(int reasonCode, MqttProperties properties) {
System.out.println("Auth packet arrived with reason code: " + reasonCode);
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("Connected to broker: " + serverURI);
}
});
// Setting connection options
MqttConnectionOptions connOpts = new MqttConnectionOptions();
if (TLS) {
try {
// Replace with your root cert path
connOpts.setSocketFactory(getSocketFactory("/path/to/your/root.crt"));
} catch (Exception e) {
System.out.println("Error setting up SSL socket factory: " + e.getMessage());
e.printStackTrace();
}
}
if (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.getBytes()); // Convert to byte array
}
connOpts.setCleanStart(true); // Clean start for MQTTv5
connOpts.setMaxInflight(50);
// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");
// Example topic to subscribe
final String subscribeTopic = "will_topic/device1";
// Subscribing to a topic
client.subscribe(subscribeTopic, qos);
// Publishing messages or subscribing/unsubscribing based on user input
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.println("1. Publish\n2. Subscribe\n3. Disconnect\n4. Unsubscribe");
int choice = scanner.nextInt();
scanner.nextLine(); // consume newline
switch (choice) {
case 1:
System.out.print("Enter Topic QoS(0/1/2) Retain(0/1) Count: ");
String[] input = scanner.nextLine().split(" ");
topic = input[0];
qos = Integer.parseInt(input[1]);
retain = Integer.parseInt(input[2]) == 1;
int count = Integer.parseInt(input[3]);
for (int i = 0; i < count; i++) {
// Alternative JSON payload:
// String message = "{\"temp\":" + (new Random().nextInt(50) + 1) + ",\"pressure\":" + (new Random().nextInt(100) + 1) + ",\"status\":\"" + (new Random().nextInt(2) == 0 ? "ON" : "OFF") + "\"}";
String message = "hello world";
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(qos);
mqttMessage.setRetained(retain);
System.out.println("Message: " + message);
client.publish(topic, mqttMessage);
System.out.println(" Packet: " + i + " - sent");
}
break;
case 2:
System.out.print("Enter Topic QoS(0/1/2): ");
input = scanner.nextLine().split(" ");
topic = input[0];
qos = Integer.parseInt(input[1]);
client.subscribe(topic, qos);
break;
case 3:
client.disconnect();
System.out.println(" Disconnected from broker.");
return;
case 4:
System.out.print("Enter Topic: ");
topic = scanner.nextLine();
client.unsubscribe(topic);
break;
default:
System.out.println(" Invalid choice. Please try again.");
}
}
} catch (MqttException e) {
System.out.println("Error connecting to MQTT broker: " + e.getMessage());
e.printStackTrace();
}
}
private static SSLSocketFactory getSocketFactory(String rootCertPath) throws Exception {
// Load the root certificate from file
CertificateFactory cf = CertificateFactory.getInstance("X.509");
FileInputStream fis = new FileInputStream(rootCertPath);
X509Certificate caCert = (X509Certificate) cf.generateCertificate(fis);
fis.close();
// Put the certificate in an in-memory key store used as the trust store
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(null, null);
ks.setCertificateEntry("ca-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
tmf.init(ks);
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, tmf.getTrustManagers(), null);
return sslContext.getSocketFactory();
}
}
Steps for Connecting MqttClientv3:
Compile: javac -cp /path/to/org.eclipse.paho.client.mqttv3-1.2.5.jar MqttClientv3.java
Run: java -cp .:/path/to/org.eclipse.paho.client.mqttv3-1.2.5.jar MqttClientv3
Steps for Connecting MqttClientv5:
Compile: javac -cp /path/to/org.eclipse.paho.mqttv5.client-1.2.5.jar MqttClientv5.java
Run: java -cp .:/path/to/org.eclipse.paho.mqttv5.client-1.2.5.jar MqttClientv5
(or) Run with JAR (shown for MqttClientv5; MqttClientv3 follows the same steps with its own class and JAR names):
mkdir -p build/lib build/classes build/META-INF/
cp MqttClientv5*.class build/classes/
cp /path/of/org.eclipse.paho.mqttv5.client-1.2.5.jar build/lib/
Create a file named MANIFEST.MF with the following contents:
Manifest-Version: 1.0
Main-Class: MqttClientv5
Class-Path: lib/org.eclipse.paho.mqttv5.client-1.2.5.jar
Copy it into place:
cp MANIFEST.MF build/META-INF/
cd build
jar cvfm MqttClientv5.jar META-INF/MANIFEST.MF -C classes . -C lib .
java -jar MqttClientv5.jar
The build can be used anywhere as long as the lib directory stays next to MqttClientv5.jar, because the manifest Class-Path is resolved relative to the JAR's location.
Connect your client to our state-of-the-art CrystalMQ broker or any broker of your choice. This powerful combination will ensure optimal performance and reliability for all your messaging needs, paving the way for a robust and efficient system integration.