Einen Java MQTT Client einrichten

Text geknüpft
Einleitung

Diese Dokumentation führt Sie durch den Prozess der Verbindung einer MQTT-Client zu unseremMQTT Broker(CrysqlMQ) oder Makler von Ihre Wahl mit der Eclipse Paho Java-Client-Bibliothek. Es deckt die Einrichtung von Verbindungen über TCP, sichere Ports und Websockets ab, KonfigurationMQTT-Authentifizierung, die Nutzung von erweiterten Funktionen und die Durchführung von grundlegenden Operationen wie veröffentlichen, abonnieren, abmelden und trennen von Clients.

Voraussetzungen

Stellen Sie sicher, dass Sie

  • JDK installiert (Java Development Kit)
  • Eclipse IDE (oder jede Java IDE Ihrer Wahl)
  • Jede MQTT-Broker-Instanz läuft oder zugänglich

Dependance Installation

Einrichtung der Entwicklungsumgebung

Konfigurieren Sie Ihre Java-Umgebung für die Eclipse Paho MQTT Kundenbibliothek. Diese Bibliothek erleichtertMQTKommunikation und kann integriert in Ihr Java-Projekt über Maven oder durch manuelle Bibliotheksinklusion.

JARS herunterladen:

org.eclipse.paho.client.mqtv31.05.jar für Mqtv3 Client
org.eclipse.paho.mqtv5.client1.05.jar für Mqtv5 Client

Anschluss an MQTT Broker

Dieser Abschnitt hat Code-Snippets von verschiedenen Möglichkeiten, um mit MQTT zu verbinden Broker. Stellen Sie sicher, dass der MQTT Broker den Verbindungstyp unterstützt, den Sie verwenden möchten. Erhalten Sie auch die entsprechenden Verbindungsparameter des MQTT Broker (Adresse, Port, Benutzername/Passwort, CA Certificate)

MQT Über TCP

Verwenden Sie den folgenden Code, um den Client über TCP zu verbinden.

Definieren Sie den Macro ADDRESS mit der Verbindung von MQTT Broker Parameter.

MQ3.1.1

Import org.eclipse.paho.client.mqtv3.*;
Import org.eclipse.paho.client.mqtv3.persist.MemoryPersistence;

boolean TLS = falsch; // TLS aktivieren
boolean AUTH = true;
int port = TLS? 8883 : 1883;
MqtClient Client;
String Broker Url = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", Hostip, Hafen;
MemoryPersistenz = neue MemoryPersistence();

Client = neuer MqtClient (Broker) Url, client_id, persistent);

wenn (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

// MQTT-Broker verbinden
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");

Import org.eclipse.paho.mqtv5.client.*
Import org.eclipse.paho.mqtv5.client.persist.MemoryPersistence;

boolean AUTH = true;
int port = 1883;
MqtClient Client;
String Broker Url = String.format("%s://%s:%d", "tcp", hostip, port);
MemoryPersistenz = neue MemoryPersistence();

Client = neuer MqtClient (Broker) Url, client_id, persistent);

wenn (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

// MQTT-Broker verbinden
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");

MQTT über TLS / SSL

Verwenden Sie den folgenden Code, um sicher mit MQTT Broker über zu verbinden TLS.

Definieren Sie den Macro ADDRESS mit der Verbindung von MQTT Broker Parameter.

Import org.eclipse.paho.client.mqtv3.*;
Import org.eclipse.paho.client.mqtv3.persist.MemoryPersistence;

boolean TLS = true;
boolean AUTH = true;
int port = TLS? 8883 : 1883;
MqtClient Client;
String Broker Url = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", Hostip, Hafen;
MemoryPersistenz = neue MemoryPersistence();

Client = neuer MqtClient (Broker) Url, client_id, persistent);

MqtConnectOptions connOpts = neue MqttConnectOptions();
wenn (TLS)
Versuch's!
connOpts.setSocketFactory(getSocketFactory("root.crt")); // Ersetzen mit Ihr Wurzelzert-Pfad
} Fang (Ausnahme e) {
System.out.println("Error Einrichten SSL-Sockelfabrik: " + e.getMessage()
e.printStackTrace();
}
}
wenn (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

// MQTT-Broker verbinden
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");

Setzen Sie TLS-Parameter vor dem Aufruf der MQTTClient_connect an den Client mit dem mQTT Broker sicher über TLS verbinden.

WennMQT Brokerwird in einem vertrauenswürdigen Server und dem Server gehostet Eine Überprüfung ist nicht erforderlich, der folgende Code kann verwendet werden, um TLS-Optionen einzustellen:

boolean TLS = true; // TLS verwenden
int port = TLS ? 8883 : 1883;
MqtClient Client;
String Broker Url = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", Hostip, Hafen;
MemoryPersistenz = neue MemoryPersistence();

Versuch's!
Client = neuer MqtClient (Broker) Url, client_id, persistent);

MqtConnectOptions connOpts = neue MqttConnectOptions();

wenn (TLS)
connOpts.setSocketFactory(SocketFactory.getDefault()); // Standard-Sockel verwenden Fabrik
}

connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
connOpts.setCleanSession(true);

// MQTT-Broker verbinden
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");

Wenn der MQTT Broker ein Serverzertifikat aus einer Trusted CA ausgestellt hat, dann kann das Server-Zertifikat mit:

boolean TLS = true; // TLS verwenden
int port = TLS ? 8883 : 1883;
MqtClient Client;
String Broker Url = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", Hostip, Hafen;
MemoryPersistenz = neue MemoryPersistence();

Versuch's!
Client = neuer MqtClient (Broker) Url, client_id, persistent);

MqtConnectOptions connOpts = neue MqttConnectOptions();

wenn (TLS)
SSLSocketFactory-SockelFactory = createSSLSocketFactory();
wenn (socketFactory != null) {
connOpts.setSocketFactory(socketFactory);
} auch
System.out.println("Error erstellen SSL-Sockelfabrik.");
zurück;
}
}

connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
connOpts.setCleanSession(true);

// MQTT-Broker verbinden
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");

// Weiter mit MQTT Operationen...

} Fang (MqtException e) {\cHFFFF}
System.out.println("Fehler mit MQTT Broker verbinden: " + e.getMessage()
e.printStackTrace();
}
private statische SSLSocketFactory createSSLSocketFactory() {\cHFFFF}
Versuch's!
// Laden Sie Ihre vertrauenswürdigen CA-Zertifikate
KeyStore TrustedStore = KeyStore.getInstance(KeyStore.getDefaultType());
// Laden Sie Ihre vertrauenswürdigen CA-Zertifikate hier
// TrustedStore.load(new FileInputStream("path_to_trusted_ca_cert"), "Passwort" zuCharArray());

// Erstellen und initialisieren SSLContext
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, null, null), // Standard Trust Manager verwenden

sslContext.getSocketFactory();
} Fang (Ausnahme e) {
System.out.println("Error erstellen SSL-Sockelfabrik: " + e.getMessage()
e.printStackTrace();
}
Rückgabe null
}

Hat der MQTT Broker ein selbstsigniertes Serverzertifikat, dann das Serverzertifikat kann mit dem Root-Zertifikat aus dem MQTT überprüft werden Broker:

// Root-Zertifikat laden
var rootCert = new X509Certificate2("path_of_the_root_file");

// MQTT Client Optionen konfigurieren
var Optionen = neue MqtClientOptionenBuilder()
. MitClientId(GenerateClientId("kristallmq_",10))
. MitTcpServer("public-mqtt-broker.bevywise.com", 8883)
.WithCredentials("highlysecure", "N4xnpPTru43T8Lmk") // Benutzername hinzufügen und Passwort vergessen?
.WithTls(neue MqtClientOptionenBuilderTlsParameter
{\cHFFFF}
UseTls = true,
Zertifikate = neue List<X509Zertifikat> { rootCert }
LassenUntrustedCertificates = true, // Deaktivierte Zertifikatskette Validierung
IgnorierenCertificate ChainErrors = true, // Ignorieren von Zertifikatskettenfehlern
IgnorierenCertificateRevokation Fehler = true // Ignorieren von Zertifikatsanruf Fehler
})
.Build();

// MQTT-Client erstellen
var factory = new MqttFactory();
var mqtClient = factory.CreateMqttClient();
mqtClient.UseConnectedHandler(async e =>
{\cHFFFF}
Console.WriteLine('Connected erfolgreich mit MQTT Broker.");

})
// Anschluss an den MQTT-Broker
warten mqtClient.ConnectAsync(Optionen);

MQTT über WebSocket

Definieren Sie die MQTT Broker Adresse wie diese, um den Client über WebSocket.

Import org.eclipse.paho.client.mqtv3.*;
Import org.eclipse.paho.client.mqtv3.persist.MemoryPersistence;
//für Version-5
Import org.eclipse.paho.mqtv5.client.*
Import org.eclipse.paho.mqtv5.client.persist.MemoryPersistence;

int port;
String broker_Url= ";
boolean TLS = false; //set true to Enable TLS
boolean AUTH = true;
boolean web_socket = true;
wenn(web_socket)
Port= TLS? 11433: 10433;
andere
Port = TLS? 8883 : 1883;
MqtClient Client;
wenn(web_socket)
BrokerUrl= String.format("%s://%s:%d", TLS ? "wss" : "ws", hostip, Hafen;
andere
Broker Url = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, Hafen;
MemoryPersistenz = neue MemoryPersistence();

Client = neuer MqtClient (Broker) Url, client_id, persistent);

MqtConnectOptions connOpts = neue MqttConnectOptions();
wenn (TLS)
Versuch's!
connOpts.setSocketFactory(getSocketFactory("root.crt")); // Ersetzen mit
Ihr Wurzelzert-Pfad
} Fang (Ausnahme e) {
System.out.println("Error Einrichten SSL-Sockelfabrik: " +
e.getMessage()
e.printStackTrace();
}
}
wenn (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

// MQTT-Broker verbinden
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");

MQTT über Secure WebSocket

Verwenden Sie den folgenden Code, um den Client über Secure zu verbinden WebSocket.

Set TLS Optionen wie in MQTT über TLS Abschnitt angegeben.

Definieren Sie den Macro ADDRESS mit der Verbindung von MQTT Broker Parameter.

Import org.eclipse.paho.client.mqtv3.*;
Import org.eclipse.paho.client.mqtv3.persist.MemoryPersistence;
//für Version-5
Import org.eclipse.paho.mqtv5.client.*
Import org.eclipse.paho.mqtv5.client.persist.MemoryPersistence;

int port;
String broker_Url= ";
boolean TLS = true; //set true to Enable TLS
boolean AUTH = true;
boolean web_socket = true;
wenn(web_socket)
Port= TLS? 11433: 10433;
andere
Port = TLS? 8883 : 1883;
MqtClient Client;
wenn(web_socket)
BrokerUrl= String.format("%s://%s:%d", TLS ? "wss" : "ws", hostip, Hafen;
andere

BrokerUrl =
String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, Hafen; MemoryPersistenz = neue MemoryPersistence();

Client = neuer MqtClient (Broker) Url, client_id, persistent);

MqtConnectOptions connOpts = neue MqttConnectOptions();
wenn (TLS)
Versuch's!
connOpts.setSocketFactory(getSocketFactory("root.crt")); // Ersetzen mit
Ihr Wurzelzert-Pfad
} Fang (Ausnahme e) {
System.out.println("Error Einrichten SSL-Sockelfabrik: " +
e.getMessage()
e.printStackTrace();
}
}
wenn (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

// MQTT-Broker verbinden
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");

MQTT konfigurieren Authentifizierung

Verbindung mit MQTT Broker, die MQTT Username und MQTT benötigt Passwort für die Authentifizierung, fügen Sie Benutzername und Passwort zu den Verbindungsoptionen wie dies:

wenn (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

Erweiterte Funktionen

Das letzte Testament einrichten

Konfigurieren derLetzter Wille und TestamentFunktion, um eine Meldung, dass der Broker veröffentlichen wird, wenn der Client unerwartet trennt. Das hilft, andere Abonnenten über den Status des abgeschalteten Clients zu informieren.

Verwenden Sie den folgenden Code, um Last Will in der Verbindung zu setzen Optionen:

// Beispielthema abonnieren
Final String abonnierenTopic = "will_topic/device1";

// Anmeldung zu einem Thema
client.subscribe(subscribeTopic);

Adjusting Keep Alive

MQTT unterhält Client-Broker-Verbindungen mit einem halten-alive Mechanismus. Einstellen des Halteintervalls, um zu kontrollieren, wie häufig der Client Absender PINGREQ Nachrichten an den Broker.

Ändern Sie den untenstehenden Code entsprechend Ihren Anforderungen:

connOpts.setKeepAlive(40);

Sitzungspersistenz konfigurieren

Sitzungsdaten eines MQTT-Clients umfassen die Abonnements der Kunde und alle Daten, die der Kunde mit QoS>0 erhalten würde. Der Client kann den MQTT Broker dazu bringen, seine Sitzungsdaten über Verbindungen zu speichern.

MQTT 3.1.1 Clients können Clean Session = 0 bis Anfrage senden der MQTT Broker, um seine Sitzungsinformationen über Verbindungen zu speichern.

MQ3.1.1

Import org.eclipse.paho.client.mqtv3.*;
Import org.eclipse.paho.client.mqtv3.persist.MemoryPersistence;

MemoryPersistenz = neue MemoryPersistence();
Versuch's!
Client = neuer MqtClient (Broker) Url, client_id, persistent);
MqtConnectOptions connOpts = neue MqttConnectOptions();
wenn (TLS)
connOpts.setSocketFactory(SocketFactory.getDefault()); // Verwendung Standard Sockel Fabrik
}

connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
connOpts.setCleanSession(cleanSession); // Saubere Sitzung einstellen Flagge

// MQTT-Broker verbinden
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");

// Weiter mit MQTT Operationen...

} Fang (MqtException e) {\cHFFFF}
System.out.println("Fehler mit MQTT Broker verbinden: " + e.getMessage()
e.printStackTrace();
}

MQT 5 Clients können Clean Start = 0 und Session Expiry festlegen Interval = 'N', um den MQTT Broker zu bitten, seine Sitzungsinformationen gespeichert zu halten über Verbindungen für 'N' Sekunden.

MQ3.1.1

Import org.eclipse.paho.client.mqtv5.*;
Import org.eclipse.paho.client.mqtv5.persist.MemoryPersistence;

MQ3.1.1

MemoryPersistenz = neue MemoryPersistence();

Versuch's!
client = new MqttAsyncClient(brokerUrl, client_id, persistent);

MqtConnectOptions connOpts = neue MqttConnectOptions();
wenn (TLS)
connOpts.setSocketFactory(SocketFactory.getDefault()); // Verwendung Standard Sockel Fabrik
}
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
connOpts.setCleanStart(cleanStart); // Start-Flag Clean setzen
connOpts.setSessionExpiryInterval(sessionExpiryInterval); // Set Sitzung Ausführendes Intervall

// MQTT-Broker verbinden
System.out.println("Connecting to broker: " + brokerUrl);
IMqtToken connectToken = client.connect(connOpts);
verbindenToken.waitForCompletion();
System.out.println("Connected");

// Weiter mit MQTT Operationen...
} Fang (MqtException e) {\cHFFFF}
System.out.println("Fehler mit MQTT Broker verbinden: " + e.getMessage()
e.printStackTrace();
}

Maximal Packungsgröße

MQTT5 Client kann den MQTT Broker anfordern, nur Daten zu senden Pakete weniger als eine bestimmte Größe, indem sie so eingestellt werden:

MQ3.1.1

connOpts.setMaximumPacketSize(256);

Publizieren

Daten senden

Daten effizient an mehrere Teilnehmer verteilen, indem sie es zu bestimmten Themen mit dem folgenden Code-Snippet:

MQ3.1.1

Import org.eclipse.paho.client.mqtv3.*;
Import org.eclipse.paho.client.mqtv3.persist.MemoryPersistence;

Stringthema = "cmq/topic";
String message = "hello world";
int qos = 1;
boolean behalten = falsch;
client.publish(topic, message.getBytes(), qos, behalten);

MQTT 5

Import org.eclipse.paho.client.mqtv5.*;
Import org.eclipse.paho.client.mqtv5.persist.MemoryPersistence;

// Mitteilung veröffentlichen
Stringthema = "cmq/topic";
String message = "hello world";
MqttMessage mqtMessage = new MqttMessage(message.getBytes());
mqtMessage.setQos(1);
mqtMessage.setRetained(false);

// Nachricht asynchron veröffentlichen
IMqtDeliveryToken veröffentlichenToken = client.publish(topic, mqtMessage);
veröffentlichenToken.waitForCompletion();
System.out.println("Message veröffentlicht");

Festhaltene Nachrichten einstellen

Aktivieren Sie bei der Veröffentlichung einer Nachricht die Rückbeschriftung, um sicherzustellen, dass Broker speichert die letzte Nachricht für jedes Thema. Dies garantiert neue Abonnenten die jüngste Nachricht beim Verbinden erhalten.

Um dies zu implementieren, verwenden Sie den folgenden Code snippet:

boolean behalten = falsch;
client.publish(topic, message.getBytes(), qos, behalten);

QoS Levels angeben

MQTT bietet drei Ebenen der Qualität des Dienstes (QoS) für Nachrichtenaussendung:

  • QoS 0 (am meisten einmal)
  • QoS 1 (zumindest einmal)
  • QoS 2 (genau einmal)

Geben Sie beim Veröffentlichen von MQTT-Nachrichten den erforderlichen QoS-Level an dieser Code:

int qos = 1;
client.publish(topic, message.getBytes(), qos, behalten);

Nachricht abbrechen Intervall

Die Eigenschaft "Message Ablaufintervall" setzt das Leben einer Nachricht in Sekunden überspannen; wenn der Broker innerhalb dieser Zeit nicht ausgeliefert wird, wird er verworfen. MQTT5 unterstützt diese Funktion. MQTT5 Clients können dies während der Veröffentlichung von Daten festlegen.

MQTT 5

Import org.eclipse.paho.mqtv5.common.MqtProperties;
Stringthema = "topic1";
String messageContent= "hello world";
int qos = 1;
boolean behalten = falsch;
MqttMessagemessage=newMqtMessage(messageContent.getBytes());
message.setQos(qos)
message.setRetained(behalten);
MqtPropertiesproperties=newMqttProperties(); Eigenschaften.setMessageExpiryInterval(60L);
message.setProperties(Eigenschaften)
client.publish(topic, message);

Thema Alias

Die Eigenschaft 'Topic Alias' ermöglicht es Kunden, ein kurzes Alias zu verwenden anstatt eines vollständigen Themennamens, die Reduzierung der Nachrichtenpaketgröße und die Verbesserung des Netzwerks Effizienz.

Import org.eclipse.paho.mqtv5.common.MqtProperties;
MqtConnectionOptionen=newMqttConnectionOptions();
MqtPropertiesproperties=newMqttProperties();
Eigenschaften.TopicAliasMaximum(10);
Optionen.setMqtProperties(Eigenschaften);
client.connect(optionen);

Eigenschaften, die mit MQTT PUBLISH verbunden sind, verbessern die Nachricht Umgang, Bereitstellung von Kontext oder Anweisungen für Broker und Kunden. Diese Eigenschaften, einschließlich Nachrichten-Ablaufintervalle und Themen-Aliase, optimieren die Nachrichten-Lieferung und Netzwerkbandbreite.

Anmeldung

Anmeldung zum Thema Filter

Um von anderen Clients veröffentlichte Daten zu erhalten, muss dieser Client abonnieren Sie ein passendes Thema Filter wie folgt:

Stringthema = "cmq/topic";
int qos = 1;
client.subscribe(topic, qos);

Dieses Thema Filter kann mit einem genauen Thema übereinstimmen oder es kann Wildcards wie # und +

Datenerfassung

Um Daten für die Abonnements zu erhalten, eine Callback-Funktion muss so definiert werden:

// Zurück zur Übersicht
client.setCallback (neue MqttCallback() {
@Override
Öffentlichkeitsarbeit Verlust (Wachstum) {
System.out.println("Connection lost: " +cause.getMessage());
Ursache.printStackTrace();
// Schalten Sie Reconnection oder andere Aktionen
}
@Override
öffentlich-rechtliche Nachricht Arrived(String-Thema, MqtMessage-Nachricht) wirft Ausnahme
System.out.println("Nachricht erhalten:");
System.out.println(" Thema: " + Thema);
System.out.println(" Nachricht: " + new String(message.getPayload());
System.out.println(" QoS: " + message.getQos());
System.out.println(" Enthalten: " + message.isRetained());
// Die empfangene Nachricht bearbeiten
}
@Override
Lieferung der öffentlichen Leere Komplett(IMqtDeliveryToken token) {
System.out.println("Delivery komplett: " + token.getMessage());
// Lieferungsbestätigung handhaben
}
})

Anmeldung von Themen

Um den Empfang von Updates von einem Thema zu stoppen, verwenden Sie den Code, der bereitgestellt wird, um abbestellen.

// Abonnieren des Themas
String topic="cmq/topic";
client.unsubscribe(topic);

Den Client deaktivieren

Stellen Sie sicher, dass die Verbindung Ihres Kunden mit dem Broker, um Probleme und Ressourcenlecks auf beiden Seiten zu vermeiden, wodurch System beibehalten Stabilität.

Verwenden Sie den folgenden Code, um den Client vom Broker zu trennen:

Versuch's!
Client.disconnect();
System.out.println("Disconnected from broker.");
} Fang (MqtException e) {\cHFFFF}
System.out.println("Error Trennen von Broker: " + e.getMessage());
e.printStackTrace();
}

Bauen Sie Ihre Business Logic

Sie haben die Möglichkeit, sich selbst zu entwickeln und anzupassen komplizierte Business-Logik in dieser Umgebung, passgenau auf Ihre spezifischen Bedürfnisse und Ziele.

Implementierung von Best Practices

Identifizierung des Kunden Management

Den einzelnen Geräten verschiedene Client-IDs zuweisen, um eine genaue Identifizierung zu gewährleisten. Für private Instanzen, Zuordnung einzigartiger IDs zu jedem Client; in gemeinsamen Umgebungen, fügen Sie einen zufälligen String an Garantie ID Einzigartigkeit.

Datenstrukturierung

Planen Sie Ihre Datenarchitektur proaktiv. Ob der Umgang mit Klartext, JSON-formatiert Daten oder numerische Werte, stellen Sie sicher, dass die Struktur effektiv mit Ihrer Anwendung spezifische Anforderungen.

Robuste Fehlerbehandlung

Implementieren Sie eine starke Fehlerverwaltung mit MQTT-Verbindung Fehler, Abonnementprobleme und Nachrichtenveröffentlichungsfehler effektiv.

Sicherung von Anmeldeinformationen

Schutz sensibler Informationen wie Benutzernamen, Passwörter und Client-IDs, indem Sie sie nicht in Ihrem Quellcode festcodieren. Umweltvariablen verwenden oder sichere Konfigurationsdateien statt.

Regelmäßige Prüfung und Überwachung

Kontinuierlich testen Sie MQTT-Kommunikation und überwachen Sie Client Metriken wie als Verbindungsstatus, Nachrichtendurchsatz und Fehlerraten schnell identifizieren und beheben Probleme.

Optimierung des Sitzungsmanagements

Wählen Sie zwischen sauberen und persistenten Sitzungen (`clean: true` oder `clean: false`) basierend auf Ihrer Notwendigkeit, Abonnements und gelöschte Nachrichten über Kundenverbindungen.

Reconnect auf Disconnect

Code hinzufügen, um zu versuchen, die Verbindung zum MQTT Broker, wenn es eine unerwartete Trennung. Dies wird sicherstellen, dass Ihr Kunde verbunden bleibt und nicht alle Daten verlieren.

Code herunterladen

Den vollständigen Code für den Client herunterladen, der Java MQTT Client verwendet Bibliothek mit unsererMQTT Brokeroder einen Broker Ihrer Wahl.

MQ3.1.1

Englisch mqtv3
Import org.eclipse.paho.client.mqtv3.*;
Import org.eclipse.paho.client.mqtv3.persist.MemoryPersistence;
javax.net.ssl.SSLSocketFactory importieren;
javax.net.ssl.SSLContext importieren;
Import java.security.cert.Certificate Fabrik,
Import java.security.cert.X509Certificate;
Import java.security.KeyStore;
java.io importieren. FileInputStream;
Import java.io.IOException;
Import java.security.cert.CertificateException;
Import java.security.NoSuchAlgorithmException;
javax.net.ssl.TrustManager Fabrik,
Import java.security.KeyManagementException;
java.util importieren. Random;
java.util importieren. Scanner;
Öffentliche Klasse MqttClientv3 {\cHFFFF}
öffentliche statische Leerstelle (String[] args) {
String client_id = "your_client_id";
String hostip = "public-mqtt-broker.bevywise.com";
boolean TLS = false; // true to Enable TLS
boolean AUTH = true;
String client_username = "Access-Key";
String client_password = "Access-Token";
boolean websocket_connection=false; //set true to Enable websocket
String topic = "";
int qos = 0;
boolean behalten = falsch;
String BrokerUrl= "";
int port;
wenn(websocket_connection)
Port= TLS ? 11433:10433;
andere
port = TLS ? 8883 : 1883; // Verwenden Sie 8883 für SSL/TLS, 1883 für non-SSL

// Erstellen einer neuen MQTT-Client-Instanz
MqtClient Client;
wenn(websocket_connection)
brokerUrl=String.format("%s://%s:%d", TLS ? "wss" : "ws", hostip, port);
andere
Broker Url = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);

MemoryPersistenz = neue MemoryPersistence();
Versuch's!
Client = neuer MqtClient (Broker) Url, client_id, persistent);
// Callbacks einrichten
client.setCallback (neue MqttCallback() {
@Override
Öffentlichkeitsarbeit Verlust (Wachstum) {
System.out.println("Connection lost: " +cause.getMessage());
Ursache.printStackTrace();
}
@Override
öffentlich-rechtliche Nachricht Arrived(String-Thema, MqtNachricht) wirft Ausnahme {\cHFFFF}
System.out.println("Empfehlung zum Thema: " + Thema);
System.out.println("Message: " + new String(message.getPayload()));
}
@Override
Lieferung der öffentlichen Leere Komplett(IMqtDeliveryToken token) {
System.out.println("Message erfolgreich geliefert.");
}
})
// Verbindungsoptionen einstellen
MqtConnectOptions connOpts = neue MqttConnectOptions();
wenn (TLS)
Versuch's!
connOpts.setSocketFactory(getSocketFactory("/path/to/your/root.crt")); //
Ersetzen Sie mit Ihrem Wurzelzertpfad
} Fang (Ausnahme e) {
System.out.println("Error Einrichten SSL-Sockelfabrik: " + e.getMessage()
e.printStackTrace();
}
}
wenn (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);
// MQTT-Broker verbinden
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");
// Beispielthema abonnieren
Final String abonnierenTopic = "will_topic/device1";
// Anmeldung zu einem Thema
client.subscribe(subscribeTopic);
// Veröffentlichung von Nachrichten oder Anmeldungen/Beschreibungen basierend auf Benutzereingabe
Scanner = neuer Scanner (System.in);
während (wahr) {
System.out.println("1. Publish\n2. Abonn3. Disconnect\n4. Abonnieren");
int choice = scanner.nextInt();
scanner.nextLine(); // neue Linie verbrauchen
Schalter (Auswahl) {
Fall 1:
System.out.print("Enter Topic QoS(0/1/2) Retain(0/1) Count: ");
String[] Eingabe = Scanner.nextLine().split(" ");
Thema = Eingabe[0];
qos = Integer.parseInt(Eingang[1]);
behalten = Integer.parseInt(Eingang[2]) == 1;
int count = Integer.parseInt(input[3]);
für (in i = 0; i < count; i++) { String message="{\" temp\":" + (neu)
Random().nextInt(50) + 1) + ",\" Druck\":" + (neuer Random().nextInt(100) +
1) + ",\" Status\":" + (neuer Random().nextInt(2)) + "}" ; versuchen Sie {
client.publish(topic, message.getBytes(), qos, behalten);
System.out.println("Veröffentlichte Nachricht: " + Nachricht);
} Fang (MqtException e) {\cHFFFF}
System.out.println(" Fehlermeldung: " + e.getMessage());
e.printStackTrace();
}
Thread.sleep(6000); // 6 Sekunden
}
Bruch;
Fall 2:
System.out.print(" Eingabe Thema QoS(0/1/2): ");
String[] subInput = Scanner.nextLine().split(" ");
Thema = SubInput[0];
qos = Integer.parseInt(subInput[1]);
Versuch's!
client.subscribe(topic, qos);
System.out.println(" Abonniert zum Thema: " + Thema);
} Fang (MqtException e) {\cHFFFF}
System.out.println(" Fehler zum Thema: " + e.getMessage());
e.printStackTrace();
}
Bruch;
Fall 3:
Versuch's!
Client.disconnect();
System.out.println(" vom Broker getrennt." } Fang (MqtException e) { System.out.println("Error Trennen
von Broker: " + e.getMessage());
e.printStackTrace();
}
Bruch;
Fall 4:
System.out.print(" Thema zum Abmelden eingeben: ");
Thema = Scanner.nextLine();
Versuch's!
client.unsubscribe(topic);
System.out.println(" Abonniert vom Thema: " + Thema);
} Fang (MqtException e) {\cHFFFF}
System.out.println(" Fehler bei der Anmeldung vom Thema: " + e.getMessage()
e.printStackTrace();
}
Bruch;
Standard:
System.out.println(" Invalide Wahl."); } } } fang (MqtException) | InterruptedException e) { System.out.println("An
Fehler: " + e.getMessage());
e.printStackTrace();
}
}
// Methode zur Erstellung von SSL-Steckdosenfabrik (falls erforderlich)
Private statische SSLSocket Fabrik getSocket Fabrik(String caCrtFile) wirft Ausnahme
CertificateFactory cf = CertificateFactory.getInstance(" X.509"); DateiInputStream caInput=new
FileInputStream(caCrtFile); X509Certificate caCertificate=(X509Certificate)
cf.GenerierenCertificate(caInput); caInput.close(); KeyStore
keyStore=KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(null, null) keyStore.setCertificateEntry("ca-certificate",
caCert); TrustManager Fabrik
tmf=TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore); SSLContext context=SSLContext.getInstance("TLS");
context.init(null, tmf.getTrustManager(), null); Rückgabe
kontext.getSocketFactory(); }

MQTT 5

Englisch mqtv5
Import org.eclipse.paho.mqtv5.client.IMqtClient;
Import org.eclipse.paho.mqtv5.client. MqtClient;
Import org.eclipse.paho.mqtv5.client.MqtConnectionOptionen;
Import org.eclipse.paho.mqtv5.client.persist.MemoryPersistence;
Import org.eclipse.paho.mqtv5.common.MqtException;
Import org.eclipse.paho.mqtv5.common.MqtMessage;
Import org.eclipse.paho.mqtv5.common.packet.MqtProperties;
Import org.eclipse.paho.mqtv5.client.MqtCallback;
Import org.eclipse.paho.mqtv5.client.MqtDisconnectResponse;
Import org.eclipse.paho.mqtv5.client.IMqtToken;
javax.net.ssl.SSLSocketFactory importieren;
javax.net.ssl.SSLContext importieren;
Import java.security.cert.Certificate Fabrik,
Import java.security.cert.X509Certificate;
Import java.security.KeyStore;
java.io importieren. FileInputStream;
Import java.io.IOException;
Import java.security.cert.CertificateException;
Import java.security.NoSuchAlgorithmException;
javax.net.ssl.TrustManager Fabrik,
javax.net.ssl.KeyManagerFactory importieren;
Import java.security.KeyManagementException;
java.util importieren. Liste
java.util importieren. Random;
java.util importieren. Scanner;
Öffentliche Klasse MqttClientv5 {\cHFFFF}
öffentliche statische Leere Haupt(String[] args) stürzt Ausnahme{
String client_id = "your_client_id";
String hostip = "public-mqtt-broker.bevywise.com";
boolean AUTH = true;
String client_username = "Access-Key";
String client_password = "Access-Token";
boolean websocket_connection=false; //set true to Enable websocket
String topic = "";
int qos = 0;
boolean behalten = falsch;
String BrokerUrl= "";
int port;
wenn(websocket_connection)
Port = 10433;
andere
port = 1883; // Verwenden Sie 8883 für SSL/TLS, 1883 für non-SSL

// Erstellen einer neuen MQTT-Client-Instanz
MqtClient Client;
wenn(websocket_connection)
brokerUrl=String.format("%s://%s:%d", "ws", hostip, port);
andere
Broker Url = String.format("%s://%s:%d", "tcp", hostip, port);

MemoryPersistenz = neue MemoryPersistence();
Versuch's!
Client = neuer MqtClient (Broker) Url, client_id, persistent);
// Callbacks einrichten
client.setCallback (neue MqttCallback() {
@Override
öffentliches Leerzeichen getrennt(MqtDisconnectResponse trennenResponse) {
System.out.println("Disconnected from broker.");
}
@Override
öffentlichkeit mqtt ErrorOccurred(MqtException Ausnahme) {
System.out.println("MQTT Error: " + Except.getMessage());
Ausnahme.printStackTrace();
}
@Override
öffentlich-rechtliche Nachricht Arrived(String-Thema, MqtNachricht) wirft Ausnahme {\cHFFFF}
System.out.println("Empfehlung zum Thema: " + Thema);
System.out.println("Message: " + new String(message.getPayload()));
}
@Override
Lieferung der öffentlichen Leere Vollständig (IMqtToken token) {
System.out.println("Message erfolgreich geliefert.");
}
@Override
öffentliche leere authPacketArrived(innigen GrundCode, MqtProperties Eigenschaften) {\cHFFFF}
System.out.println("Auth Paket mit Grundcode eingetroffen: " + ReasonCode);
}
@Override
öffentliche LeerverbindungKomplette(boolean reconnect, String serverURI) {
System.out.println("Connected to broker: " + serverURI);
}
// Verbindungsoptionen einstellen
MqtConnectionOptionen connOpts = neue MqttConnectionOptionen();
wenn (TLS)
Versuch's!
connOpts.setSocketFactory(getSocketFactory("/path/to/your/root.crt")); //
Ersetzen Sie mit Ihrem Wurzelzertpfad
} Fang (Ausnahme e) {
System.out.println("Error Einrichten SSL-Sockelfabrik: " + e.getMessage()
e.printStackTrace();
}
}
wenn (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.getBytes()); // nach Byte-Array umrechnen
}
connOpts.setCleanStart(true); // Clean start für MQTTv5
connOpts.setMaxInflight(50);
// MQTT-Broker verbinden
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");
// Beispielthema abonnieren
Final String abonnierenTopic = "will_topic/device1";
// Anmeldung zu einem Thema
client.subscribe(subscribeTopic, qos);
// Veröffentlichung von Nachrichten oder Anmeldungen/Beschreibungen basierend auf Benutzereingabe
Scanner = neuer Scanner (System.in);
während (wahr) {
System.out.println("1. Publish\n2. Abonn3. Disconnect\n4. Abonnieren");
int choice = scanner.nextInt();
scanner.nextLine(); // neue Linie verbrauchen
Schalter (Auswahl) {
Fall 1:
System.out.print("Enter Topic QoS(0/1/2) Retain(0/1) Count: ");
String[] Eingabe = Scanner.nextLine().split(" ");
Thema = Eingabe[0];
qos = Integer.parseInt(Eingang[1]);
behalten = Integer.parseInt(Eingang[2]) == 1;
int count = Integer.parseInt(input[3]);
für (in i = 0; i < count; i++) { //String message="{\" temp\":" + (neu)
Random().nextInt(50) + 1) + ",\" Druck\":" + (neuer Random().nextInt(100) +
1) + ",\"-Status\":" + (neuer Random().nextInt(2)=================================== "ON": "OFF" ;
String message="hello world" ; MqtMessage mqtMessage=new
MqtMessage(message.getBytes()); mqtMessage.setQos(qos);
mqtMessage.setRetained(retain); System.out.println("Message: "+Nachricht;
client.publish(topic, mqtMessage);
System.out.println(" Paket: "+i+" - gesendet"); } Pause; Fall 2: System.out.print("Enter Topic QoS(0/1/2): ");
Eingabe = scanner.nextLine().split(" ");
Thema = Eingabe[0];
qos = Integer.parseInt(Eingang[1]);
client.subscribe(topic, qos);
Bruch;
Fall 3:
Client.disconnect();
System.out.println(" vom Broker getrennt."); Rückgabe; Fall 4: System.out.print("Enter Topic: ");
Thema = Scanner.nextLine();
client.unsubscribe(topic);
Bruch;
Standard:
System.out.println(" Invalide Wahl. Bitte versuchen Sie es wieder." (MqtException e) { System.out.println("Error
Anschluss an MQTT Broker: " + e.getMessage());
e.printStackTrace();
}
}
Private statische SSLSocket Fabrik getSocket Fabrik(String rootCertPath) wirft Ausnahme
CertificateFactory cf = CertificateFactory.getInstance(" X.509"); DateiInputStream fis=new
FileInputStream(rootCertPath); X509Certificate caCert=(X509Certificate)
cf.generierenCertificate(fis); fis.close(); KeyStore
ks=KeyStore.getInstance("JKS"); ks.load(null, null);
ks.setCertificateEntry("ca-certificate", caCert); TrustManager Fabrik
tmf=TrustManagerFactory.getInstance("SunX509"); tmf.init(ks); SSLContext
sslContext=SSLContext.getInstance("TLSv1.2"); sslContext.init(null,
tmf.getTrustManager(), null); sslContext.getSocketFactory(); }

Ausführender Bundle erstellen

Schritte zum Verbinden von MqttClientv3:

Compile:Javac - cp /path/to/org.eclipse.paho.client.mqtv31.35.jar MqttClientv3.java

Lauft!Java - cp .:/path/to/org.eclipse.paho.client.mqtv31.35.jar MqtClientv3

Lauf mit JAR:

  • mkdir -p build/lib build/classes build/META-INF/
  • cp -r MqttClientv3.class build/classes/
  • cp -r /path/of/org.eclipse.paho.client.mqtv31.35.jar build/lib/
  • einen Dateinamen MANIFEST erstellen. MF:
  • Manifest-Version: 1.0
  • Hauptklasse: MqttClientExample
  • Klasse-Path: lib/org.eclipse.paho.client.mqtv31.35.jar
  • die oben genannten Inhalte in diesem ,cp -r MANIFEST kopieren. MF-Bau/META-INF/
  • cd build jar cvfm MqttClientv3.jar META-INF/MANIFEST.MF -C Klassen . -C lib .
  • java -jar MqttClientv3.jar -- java -jar MqttClientv3.jar

Schritte zum Verbinden von MqttClientv5:

Compile:javac -cp /path/to/org.eclipse.paho.mqtv5.client1.35.jar MqttClientv5.java

Lauft!Java - cp .:/path/to/org.eclipse.paho.mqtv5.client-1.35.jar MqttClientv5 (oder) Lauf mit JAR:

mkdir -p build/lib build/classes build/META-INF/

cp -r MqttClientv3.class build/classes/

cp -r /path/of/org.eclipse.paho.mqtv5.client1.35.jar Bau/lib/

einen Dateinamen MANIFEST erstellen. MF:

Manifest-Version: 1.0 Hauptklasse: MqttClientExample Klasse-Path: lib/org.eclipse.paho.mqtv5.client1.35.jar

die oben genannten Inhalte darin kopieren, cp -r MANIFEST.MF baut/META-INF/

cd build jar cvfm MqttClientv5.jar META-INF/MANIFEST.MF -C Klassen . -C lib .

java -jar MqttClientv5.jar -- Verwenden Sie den Bau überall.

Verbinden Sie Ihren Kunden mit unserem hochmodernenMQTT Brokeroder jeder Broker Ihrer Wahl. Diese leistungsstarke Kombination sorgt für optimale Leistung und Zuverlässigkeit für alle Ihre Messaging-Anforderungen, den Weg für eine robuste und effiziente Systemintegration.