Configuración de un cliente MQTT Java

Texto copiado
Introducción

Esta documentación le guía a través del proceso de conectar un MQTT cliente a nuestroMQTT broker(CrysqlMQ) o cualquier corredor de tu elección usando la biblioteca de clientes de Eclipse Paho Java. It cubre la configuración de conexiones a través de TCP, puertos seguros y websockets, configuraciónautenticación MQTT, utilizar características avanzadas y realizar operaciones básicas como publicar, suscribirse, darse de baja y desconectar clientes.

Pre-requisitos

Asegúrese de que tiene:

  • JDK instalado (Java Development Kit)
  • Eclipse IDE (o cualquier IDE Java de su elección)
  • Cualquier instancia de corretaje MQTT que funcione o sea accesible

Instalación de dependencia

Creación del entorno de desarrollo

Configure su Medio ambiente Java para incluir el Eclipse Paho MQTT biblioteca cliente. Esta biblioteca facilitaMQTTcomunicación y puede ser integradas su proyecto Java a través de Maven o por la inclusión manual de la biblioteca.

Descargar JARS:

org.eclipse.paho.client.mqttv3-1.2.5.jar para Mqttv3 Cliente
org.eclipse.paho.mqttv5.client-1.2.5.jar para Mqttv5 Cliente

Conexión con MQTT Broker

Esta sección tiene fragmentos de código de varias maneras de conectarse a MQTT Broker. Asegúrese de que el MQTT Broker admite el tipo de conexión que le gustaría utilizar. Además, obtenga los parámetros de conexión correspondientes del MQTT Broker (Address, Port, Nombre de usuario/Password, certificado CA)

MQTT Over TCP

Utilice el siguiente código para conectar al cliente sobre TCP.

Define el ADDRESS Macro usando la conexión de MQTT Broker parámetros.

MQTT 3.1.1

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

TLS booleano = falso; // Habilitar TLS
UT = verdadero;
int port = TLS? 8883 : 1883;
MqttClient client;
Corredor de cuerdas Url = String.format("%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = nuevo MemoryPersistence();

cliente = nuevo MqtClient(broker) Url, client_id, persistencia);

si
connOpts.setUserName(client_username);
connOpts.setPasword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

// Conectarse con el corredor MQTT
System.out.println("Connecting to broker: " + brokerUrl);
cliente.connect(connOpts);
System.out.println("Connected");

import org.eclipse.paho.mqttv5.client.*
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;

UT = verdadero;
int port = 1883;
MqttClient client;
Corredor de cuerdas Url = String.format("%s:%d", "tcp", hostip, port);
MemoryPersistence persistence = nuevo MemoryPersistence();

cliente = nuevo MqtClient(broker) Url, client_id, persistencia);

si
connOpts.setUserName(client_username);
connOpts.setPasword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

// Conectarse con el corredor MQTT
System.out.println("Connecting to broker: " + brokerUrl);
cliente.connect(connOpts);
System.out.println("Connected");

MQTT sobre TLS / SSL

Utilice el siguiente código para conectarse de forma segura a MQTT Broker TLS.

Define el ADDRESS Macro usando la conexión de MQTT Broker parámetros.

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

TLS booleano = verdadero;
UT = verdadero;
int port = TLS? 8883 : 1883;
MqttClient client;
Corredor de cuerdas Url = String.format("%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = nuevo MemoryPersistence();

cliente = nuevo MqtClient(broker) Url, client_id, persistencia);

MqttConnectOptions connOpts = nuevo MqttConnectOptions();
si (TLS) {
Inténtalo.
connOpts.setSocketFactory(getSocketFactory("root.crt")); // Reemplazamiento tu camino de certadura raíz
} catch (Exception e) {
System.out.println("Error setting up SSL socket factory: " + e.getMessage());
e.printStackTrace();
}
}
si
connOpts.setUserName(client_username);
connOpts.setPasword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

// Conectarse con el corredor MQTT
System.out.println("Connecting to broker: " + brokerUrl);
cliente.connect(connOpts);
System.out.println("Connected");

Establecer parámetros TLS antes de llamar al MQTTClient_connect conectar el cliente al mQTT Broker de forma segura sobre TLS.

SiMQTT Brokerestá alojado en un servidor de confianza y el servidor No se requiere verificación, el siguiente código se puede utilizar para configurar Opciones TLS:

TLS booleano = verdadero; // Use TLS
int port = TLS ? 8883 : 1883;
MqttClient client;
Corredor de cuerdas Url = String.format("%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = nuevo MemoryPersistence();

Inténtalo.
cliente = nuevo MqtClient(broker) Url, client_id, persistencia);

MqttConnectOptions connOpts = nuevo MqttConnectOptions();

si (TLS) {
(SocketFactory(SocketFactory.getDefault()); // Usar socket predeterminado fábrica
}

connOpts.setUserName(client_username);
connOpts.setPasword(client_password.toCharArray());
connOpts.setCleanSession(true);

// Conectarse con el corredor MQTT
System.out.println("Connecting to broker: " + brokerUrl);
cliente.connect(connOpts);
System.out.println("Connected");

Si el MQTT Broker tiene el certificado de servidor emitido por un CA con confianza, entonces el Certificado de Servidor se puede verificar utilizando:

TLS booleano = verdadero; // Use TLS
int port = TLS ? 8883 : 1883;
MqttClient client;
Corredor de cuerdas Url = String.format("%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = nuevo MemoryPersistence();

Inténtalo.
cliente = nuevo MqtClient(broker) Url, client_id, persistencia);

MqttConnectOptions connOpts = nuevo MqttConnectOptions();

si (TLS) {
SSLSocketEnchufe facialFactory = createSSLSocketFactory();
si (socketFactory!= null) {
connOpts.setSocketFactory(socketFactory);
. ♫ ... {
System.out.println("Error creación de la fábrica de tomas SSL.");
retorno;
}
}

connOpts.setUserName(client_username);
connOpts.setPasword(client_password.toCharArray());
connOpts.setCleanSession(true);

// Conectarse con el corredor MQTT
System.out.println("Connecting to broker: " + brokerUrl);
cliente.connect(connOpts);
System.out.println("Connected");

// Continuar con las operaciones de MQTT...

} catch (MqtException e) {}
System.out.println("Error connecting to MQTT broker: " + e.getMessage());
e.printStackTrace();
}
SSLSocketFactory createSSLSocketFactory() {}
Inténtalo.
// Cargar sus certificados de CA de confianza
KeyStore reliableStore = KeyStore.getInstance(KeyStore.getDefaultType());
// Cargar sus certificados de CA de confianza aquí
// trustStore.load(new FileInputStream("path_to_trusted_ca_cert"), "password" aCharArray());

// Crear e inicializar SSLContext
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, null, null); // Utilice administradores de confianza predeterminados

volver sslContext.getSocketFactory();
} catch (Exception e) {
System.out.println("Error creación de la fábrica de tomas SSL: " + e.getMessage());
e.printStackTrace();
}
Retorno nulo;
}

Si el MQTT Broker tiene un certificado de servidor auto-firmado entonces el Certificado de Servidor se puede verificar utilizando el Certificado de Root obtenido del MQTT Broker:

// Cargar el certificado raíz
var rootCert = nuevo X509Certificate2("path_of _the_root_file");

// Configurar las opciones del cliente MQTT
var options = nuevo MqtClientOptionsBuilder()
. ConClientId(GenerateClientId("crystalmq_",10))
. ConTcpServer("public-mqt-broker.bevywise.com", 8883)
.ConCredentials("highlysecure", "N4xnpPTru43T8Lmk") // Agregar nombre de usuario y contraseña
.ConTls(nuevo MqttClientOptionsBuilderTlsParameters
{}
UseTls = verdadero,
Certificados = nueva lista efectuadaX509Certificado { rootCert},
AllowUntrustedCertificates = true, // Cadena de certificados deshabilitables validación
IgnoreCertificate ChainErrors = verdadero, // Ignorar errores de cadena de certificados
IgnoreCertificateRevocation Errores = verdadero // Ignorar la revocación del certificado errores
})
.Build();

// Crear cliente MQTT
var factory = nuevo MqttFactory();
var mqttClient = factory.CreateMqtClient();
mqttClient.UseConnectedHandler(async e =
{}
Consola.WriteLine("Connected successfully with MQTT broker.");

});
// Conectarse con el corredor MQTT
await mqttClient.ConnectAsync(options);

MQTT sobre WebSocket

Define la dirección MQTT Broker como esta para conectar al cliente WebSocket.

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
//For Version-5
import org.eclipse.paho.mqttv5.client.*
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;

int port;
String broker_Url= "";
TLS booleano = falso; //set true to Enable TLS
UT = verdadero;
boolean web_socket = true;
si (web_socket)
port= TLS? 11433: 10433;
más
port = TLS? 8883 : 1883;
MqttClient client;
si (web_socket)
brokerUrl= String.format("%s:%d", TLS ? "ws" : "ws", hostip, port);
más
broker Url = String.format("%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = nuevo MemoryPersistence();

cliente = nuevo MqtClient(broker) Url, client_id, persistencia);

MqttConnectOptions connOpts = nuevo MqttConnectOptions();
si (TLS) {
Inténtalo.
connOpts.setSocketFactory(getSocketFactory("root.crt")); // Reemplazamiento
tu camino de certadura raíz
} catch (Exception e) {
System.out.println("Error setting up SSL socket factory: " +
e.getMessage());
e.printStackTrace();
}
}
si
connOpts.setUserName(client_username);
connOpts.setPasword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

// Conectarse con el corredor MQTT
System.out.println("Connecting to broker: " + brokerUrl);
cliente.connect(connOpts);
System.out.println("Connected");

MQTT sobre Secure WebSocket

Utilice el siguiente código para conectar al cliente sobre Secure WebSocket.

Set TLS Opciones dadas en la sección MQTT Más TLS.

Define el ADDRESS Macro usando la conexión de MQTT Broker parámetros.

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
//For Version-5
import org.eclipse.paho.mqttv5.client.*
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;

int port;
String broker_Url= "";
TLS booleano = verdadero; //set verdadero para Habilitar TLS
UT = verdadero;
boolean web_socket = true;
si (web_socket)
port= TLS? 11433: 10433;
más
port = TLS? 8883 : 1883;
MqttClient client;
si (web_socket)
brokerUrl= String.format("%s:%d", TLS ? "ws" : "ws", hostip, port);
más

brokerUrl =
String.format("%s:%d", TLS ? "ssl" : "tcp", hostip, port); MemoryPersistence persistence = nuevo MemoryPersistence();

cliente = nuevo MqtClient(broker) Url, client_id, persistencia);

MqttConnectOptions connOpts = nuevo MqttConnectOptions();
si (TLS) {
Inténtalo.
connOpts.setSocketFactory(getSocketFactory("root.crt")); // Reemplazamiento
tu camino de certadura raíz
} catch (Exception e) {
System.out.println("Error setting up SSL socket factory: " +
e.getMessage());
e.printStackTrace();
}
}
si
connOpts.setUserName(client_username);
connOpts.setPasword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

// Conectarse con el corredor MQTT
System.out.println("Connecting to broker: " + brokerUrl);
cliente.connect(connOpts);
System.out.println("Connected");

Configuración de MQTT Autenticación

Para conectarse a MQTT Broker que requiere MQTT Usuario y MQTT Contraseña para autenticación, añadir a nombre de usuario y contraseña a las opciones de conexión como esto:

si
connOpts.setUserName(client_username);
connOpts.setPasword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

Características avanzadas

Configuración de la última voluntad

Configurar elÚltima Voluntad y Testamentofunción para especificar un mensaje que el corredor publicará si el cliente se desconecta inesperadamente. Esto ayuda a informar a otros suscriptores del estado del cliente desconectado.

Utilice el siguiente código para establecer Last Will en la conexión Opciones:

// Ejemplo de tema para suscribirse
Suscríbete final de StringTema = "will_topic/device1";

// Subscribir a un tema
cliente.subscribe(subscribeTema);

Ajuste Mantener Alive

MQTT mantiene conexiones cliente-broker con un mantener la calma mecanismo. Ajuste el intervalo de mantenimiento para controlar con qué frecuencia el cliente envía Mensajes PINGREQ al corredor.

Modifique el código de abajo para satisfacer sus requisitos:

connOpts.setKeepAlive(40);

Configuración de la Persistencia de Sesión

Datos de sesión de un cliente MQTT incluyen Suscripciones hecho por el Cliente y cualquier dato que el Cliente recibiría con QoS =0. El Cliente puede conseguir que el MQTT Broker almacene sus datos de sesión a través de las conexiones.

MQTT 3.1.1 Los clientes pueden establecer sesión limpia = 0 a solicitud the MQTT Broker to keep its session information stored across connections.

MQTT 3.1.1

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

MemoryPersistence persistence = nuevo MemoryPersistence();
Inténtalo.
cliente = nuevo MqtClient(broker) Url, client_id, persistencia);
MqttConnectOptions connOpts = nuevo MqttConnectOptions();
si (TLS) {
(SocketFactory(SocketFactory.getDefault()); // Uso por defecto fábrica de enchufe
}

connOpts.setUserName(client_username);
connOpts.setPasword(client_password.toCharArray());
connOpts.setCleanSession(limpieza) // Establecer sesión limpia bandera

// Conectarse con el corredor MQTT
System.out.println("Connecting to broker: " + brokerUrl);
cliente.connect(connOpts);
System.out.println("Connected");

// Continuar con las operaciones de MQTT...

} catch (MqtException e) {}
System.out.println("Error connecting to MQTT broker: " + e.getMessage());
e.printStackTrace();
}

MQTT 5 Los clientes pueden establecer el inicio limpio = 0 y el gasto de sesión Interval = 'N' para solicitar al MQTT Broker que mantenga su información de sesión almacenada a través de conexiones para 'N' segundos.

MQTT 3.1.1

import org.eclipse.paho.client.mqttv5.*;
import org.eclipse.paho.client.mqttv5.persist.MemoryPersistence;

MQTT 3.1.1

MemoryPersistence persistence = nuevo MemoryPersistence();

Inténtalo.
cliente = nuevo MqttAsyncClient(brokerUrl, client_id, persistencia);

MqttConnectOptions connOpts = nuevo MqttConnectOptions();
si (TLS) {
(SocketFactory(SocketFactory.getDefault()); // Uso por defecto fábrica de enchufe
}
connOpts.setUserName(client_username);
connOpts.setPasword(client_password.toCharArray());
connOpts.setCleanStart(cleanStart); // Set Clean Start flag
connOpts.setSessionExpiryInterval(sessionExpiryInterval); // Set Período de sesiones Intervalo de carga

// Conectarse con el corredor MQTT
System.out.println("Connecting to broker: " + brokerUrl);
IMqttToken connectToken = cliente.connect(connOpts);
connectToken.waitForCompletion();
System.out.println("Connected");

// Continuar con las operaciones de MQTT...
} catch (MqtException e) {}
System.out.println("Error connecting to MQTT broker: " + e.getMessage());
e.printStackTrace();
}

Configuración Máxima Tamaño del paquete

MQTT5 Client puede solicitar el MQTT Broker para enviar solo datos paquetes de menos de un tamaño específico configurando así:

MQTT 3.1.1

connOpts.setMaximumPacketSize(256);

Publish

Envío de datos

Distribución eficiente de datos a múltiples suscriptores mediante publicación a temas designados con el siguiente fragmento de código:

MQTT 3.1.1

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

Tema de cuerda = "cmq/topic";
Mensaje de cuerda = "hola mundo"
int qos = 1;
retenimiento booleano = falso;
cliente.publish(topic, message.getBytes(), qos, keep);

MQTT 5

import org.eclipse.paho.client.mqttv5.*;
import org.eclipse.paho.client.mqttv5.persist.MemoryPersistence;

// Publicar mensaje
Tema de cuerda = "cmq/topic";
Mensaje de cuerda = "mundo de ayuda";
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(1);
mqttMessage.setRetained(false);

// Publicar mensaje asincrónicamente
IMqttDeliveryToken publishToken = client.publish(topic, mqttMesage);
publicarToken.waitForCompletion();
System.out.println("Message published");

Configuración de mensajes retenidos

Permitir la bandera de retención al publicar un mensaje para asegurar broker almacena el último mensaje para cada tema. Esto garantiza que los nuevos suscriptores recibir el mensaje más reciente al conectarse.

Para implementar esto, utilice el siguiente fragmento de código:

retenimiento booleano = falso;
cliente.publish(topic, message.getBytes(), qos, keep);

Especificación de los niveles de QoS

MQTT proporciona tres niveles de calidad de servicio (QoS) Entrega del mensaje:

  • QoS 0 (Al menos una vez)
  • QoS 1 (Al menos una vez)
  • QoS 2 (Exactamente una vez)

Especifique el nivel QoS requerido al publicar mensajes MQTT utilizando este código:

int qos = 1;
cliente.publish(topic, message.getBytes(), qos, keep);

Intervalo de viaje

La propiedad 'intervalo de caducidad del mensaje' establece la vida del mensaje en segundos; si no se entrega dentro de este tiempo, el corredor lo descarta. MQTT5 admite esta función. MQTT5 Los clientes pueden configurar esto mientras publican datos.

MQTT 5

import org.eclipse.paho.mqttv5.common.MqtProperties;
Tema de cuerda = "topic1";
Mensaje de cuerdaContent= "hola mundo";
int qos = 1;
retenimiento booleano = falso;
MqttMessagemessage=newMqtMessage(messageContent.getBytes());
message.setQos(qos);
message.setRetained(retain);
MqttPropertiesproperties=newMqtProperties(); propiedades.setMessageExpiryInterval(60L);
Mensaje.setPropiedades (propiedades);
client.publish(topic, message);

Tema Alias

La propiedad 'Topic Alias' permite a los clientes utilizar un breve alias en lugar de un nombre completo del tema, reduciendo el tamaño del paquete de mensajes y mejorando la red eficiencia.

import org.eclipse.paho.mqttv5.common.MqtProperties;
MqttConnectionOptions options=newMqttConnectionOptions();
MqttPropertiesproperties=newMqtProperties();
propiedades.setTopicAliasMaximum(10);
options.setMqtProperties(propiedades);
cliente.connect(options);

Propiedades asociadas con MQTT PUBLISH realzar mensaje manejar, proporcionar contexto o instrucciones para los corredores y clientes. Estas propiedades, incluyendo intervalos de expiración de mensajes y alias tópicos, optimizar la entrega de mensajes y ancho de banda de red.

Suscríbete

Suscribirse al filtro del tema

Para recibir datos publicados por otros clientes, este cliente tiene que Suscríbete a un Filtro de Tema igualado como este:

Tema de cuerda = "cmq/topic";
int qos = 1;
cliente.subscribe(topic, qos);

Este filtro de tema puede coincidir con un tema exacto o puede tener como # y +

Datos de recepción

Para recibir los datos enviados para las suscripciones, una función callback debe definirse así:

// Retroceder
client.setCallback(new MqttCallback() {
@Override
conexión de vacío público Perdido(Causa potencial) {
System.out.println("Connection lost: " + cause.getMessage());
cause.printStackTrace();
// Reconexión manual u otras acciones
}
@Override
mensaje de vacío público Llegada (Tema de cuerda, mensaje de MqttMessage) lanza Excepción
System.out.println("Message received:");
System.out.println(" Tema: " + tema);
System.out.println(" Mensaje: " + nuevo String(message.getPayload()));
System.out.println(" QoS: " + message.getQos());
System.out.println(" Retenido: " + message.isRetained());
// Procesar el mensaje recibido
}
@Override
públicos Complete(IMqttDeliveryToken token) {
System.out.println("Delivery complete: " + token.getMessage());
// Confirmación de entrega de mano
}
});

Suscripción de Temas

Para dejar de recibir actualizaciones de un tema, utilice el código proporcionado a Suscribe.

// Unsubscribe del topicawait
Tema de cuerda="cmq/topic";
cliente.unsubscribe(topic);

Desconectar al cliente

Asegurar una terminación adecuada de la conexión de su cliente con la broker to avoid issues and resource leaks on both sides, thereby maintaining system estabilidad.

Utilice el siguiente código para desconectar al cliente del broker:

Inténtalo.
cliente.disconnect();
System.out.println("Disconnected from broker.");
} catch (MqtException e) {}
System.out.println("Error desconectado del corredor: " + e.getMessage());
e.printStackTrace();
}

Construcción de su lógica de negocio

Usted tiene la oportunidad de desarrollar y personalizar su propio lógica de negocio intrincada dentro de este entorno, adaptándolo precisamente a su necesidades y objetivos.

Aplicación de las mejores prácticas

Identificación del cliente Gestión

Assign distinct client IDs to individual devices to ensure accurate identification. Para particulares, asignar IDs únicas a cada cliente; en entornos compartidos, anexar una cadena aleatoria a garantía de identificación única.

Data Structuring

Planifique su arquitectura de datos proactivamente. Ya sea manejando texto plano, formateado por JSON datos, o valores numéricos, asegúrese de que la estructura se alinea con eficacia con la aplicación requisitos específicos.

Manejo de error robusto

Implementar una gestión de errores sólida para manejar la conexión MQTT fallas, problemas de suscripción y errores de publicación de mensajes eficazmente.

Verificación de credenciales

Guardar información confidencial como nombres de usuario, contraseñas y IDs de cliente por no codificación dura en su código fuente. Use variables ambientales o archivos de configuración seguros en su lugar.

Pruebas periódicas " Monitoreo

Prueba continuamente la comunicación del MQTT y monitorea las métricas del cliente como estado de conexión, transmisión de mensajes y tasas de error para identificar y corregir rápidamente cuestiones.

Optimización de la gestión de las sesiones

Elija entre sesiones limpias y persistentes (`limpia: verdadera` o `claran: false`) basado en su necesidad de retener suscripciones y mensajes apagados a través de conexiones cliente.

Reconnect on Disconnect

Agregar código para intentar la reconexión al MQTT Broker cuando hay un desconexión inesperada. Esto asegurará que su cliente permanezca conectado y no perder datos.

Descargar Código

Descargar el código completo para el cliente que utiliza Java MQTT Client Biblioteca para conectar con nuestraMQTT brokero cualquier agente de su elección.

MQTT 3.1.1

//paho mqttv3
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
importar javax.net.ssl.SSLSocketFactory;
importar javax.net.ssl.SSLContext;
importar java.security.cert.Certificate Fábrica;
importar java.security.cert.X509Certificate;
importar java.security.KeyStore;
importa java.io. FileInputStream;
importar java.io.IOExcepcion;
importar java.security.cert.CertificateException;
importar java.security.NoSuch AlgorithmException;
importar javax.net.ssl.TrustManager Fábrica;
importar java.security.KeyManagementException;
importa java.util. Random;
importa java.util. Scanner;
clase pública MqttClientv3 {}
public static void main(String[] args) {
String client_id = "tu_client_id";
Hostip de cuerda = "public-mqtt-broker.bevywise.com";
TLS booleano = falso; / / / se hace realidad para Habilitar TLS
UT = verdadero;
String client_username = "Access-Key";
Criar cliente_password = "Access-Token";
boolean websocket_connection=false; //set true to Enable websocket
Tema de cuerda = ";
int qos = 0;
retenimiento booleano = falso;
Corredor de cuerdasUrl= "";
int port;
si (websocket_connection)
port= TLS ? 11433:10433;
más
port = TLS ? 8883 : 1883; // Use 8883 for SSL/TLS, 1883 for non-SSL

// Creación de una nueva instancia cliente MQTT
MqttClient client;
si (websocket_connection)
brokerUrl=String.format("%s:%d", TLS ? "ws" : "ws", hostip, port);
más
broker Url = String.format("%s:%d", TLS ? "ssl" : "tcp", hostip, port);

MemoryPersistence persistence = nuevo MemoryPersistence();
Inténtalo.
cliente = nuevo MqtClient(broker) Url, client_id, persistencia);
// Ajuste de los callbacks
client.setCallback(new MqttCallback() {
@Override
conexión de vacío público Perdido(Causa potencial) {
System.out.println("Connection lost: " + cause.getMessage());
cause.printStackTrace();
}
@Override
mensaje de vacío público Llegada (Tema de cuerda, mensaje de MqttMessage) lanza Excepción {}
System.out.println("Recibido mensaje sobre el tema: " + tema);
System.out.println("Message: " + nuevo String(message.getPayload()));
}
@Override
públicos Complete(IMqttDeliveryToken token) {
System.out.println("Message delivered successfully.");
}
});
// Opciones de conexión de configuración
MqttConnectOptions connOpts = nuevo MqttConnectOptions();
si (TLS) {
Inténtalo.
connOpts.setSocketFactory(getSocketFactory("/path/to/your/root.crt")); //
Reemplazar con su camino de certadura raíz
} catch (Exception e) {
System.out.println("Error setting up SSL socket factory: " + e.getMessage());
e.printStackTrace();
}
}
si
connOpts.setUserName(client_username);
connOpts.setPasword(client_password.toCharArray());
}
connOpts.setCleanSession(true);
// Conectarse con el corredor MQTT
System.out.println("Connecting to broker: " + brokerUrl);
cliente.connect(connOpts);
System.out.println("Connected");
// Ejemplo de tema para suscribirse
Suscríbete final de StringTema = "will_topic/device1";
// Subscribir a un tema
cliente.subscribe(subscribeTema);
// Publicar mensajes o subscribir/unsubscribir basado en la entrada del usuario
Scanner Scanner = nuevo Scanner (System.in);
mientras (verdad) {
System.out.println("1. Publish\n2. Subscribe\n3. Disconnect\n4. Unsubscribe");
int choice = scanner.nextInt();
scanner.nextLine(); // consume newline
interruptor (elección) {
Caso 1:
System.out.print("Enter Topic QoS(0/1/2) Retain(0/1) Count: ");
String[] entrada = escáner.nextLine().split(" ");
tópico = entrada[0];
qos = Integer.parseInt(input[1]);
retener = Integer.parseInt(input[2]) == 1;
int count = Integer.parseInt(input[3]);
para (int i = 0; i < count; i++) { String message="{\" temp\":" + (nuevo)
Random().nextInt(50) + 1) + ",\" presión\":" + (nuevo Random().nextInt(100) +
1) + ",\" estado\":" + (nuevo Random().nextInt(2)) + "}"
cliente.publish(topic, message.getBytes(), qos, keep);
System.out.println("Mensaje publicado: " + mensaje);
} catch (MqtException e) {}
System.out.println(" Mensaje de publicación de errores: " + e.getMessage());
e.printStackTrace();
}
Thread.sleep(6000); // 6 segundos
}
ruptura;
Caso 2:
System.out.print(" Enter Topic QoS(0/1/2): ");
String[] subInput = scanner.nextLine().split(");
tópico = subinput[0];
qos = Integer.parseInt(subInput[1]);
Inténtalo.
cliente.subscribe(topic, qos);
System.out.println(" Suscrito al tema: " + tema);
} catch (MqtException e) {}
System.out.println(" Error que se suscribe al tema: " + e.getMessage());
e.printStackTrace();
}
ruptura;
Caso 3:
Inténtalo.
cliente.disconnect();
System.out.println(" Desconectado del corredor". } catch (MqtException e) { System.out.println()Error desconexión
de corredor: " + e.getMessage());
e.printStackTrace();
}
ruptura;
Caso 4:
System.out.print() Introduzca el tema para darse de baja: ");
tópico = escáner.nextLine();
Inténtalo.
cliente.unsubscribe(topic);
System.out.println(" Describido del tema: " + tema);
} catch (MqtException e) {}
System.out.println(" Error que suscribe del tema: " + e.getMessage());
e.printStackTrace();
}
ruptura;
default:
System.out.println(" Elección inválida."); } } captura (MqttException Silencio InterruptedException e) { System.out.println("An
error ocurrido: " + e.getMessage());
e.printStackTrace();
}
}
// Método para crear fábrica de tomas SSL (si es necesario)
SSLSocket privado Factory getSocket Fábrica (CrtFile de cuerda) lanza Excepción
CertificadoFactory cf = CertificadoFactory.getInstance(" X.509"); FileInputStream caInput=new
FileInputStream(caCrtFile); X509Certificate caCert=(X509Certificate)
cf.generateCertificate(caInput); caInput.close(); KeyStore
keyStore=KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(null, null); keyStore.setCertificateEntry("ca-certificate",
caCert); TrustManager Factory
tmf=TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore); SSLContext context=SSLContext.getInstance("TLS");
context.init(null, tmf.getTrustManagers(), null); return
context.getSocketFactory(); }

MQTT 5

//paho mqttv5
import org.eclipse.paho.mqttv5.client.IMqtClient;
import org.eclipse.paho.mqttv5.client. MqttClient;
import org.eclipse.paho.mqttv5.client.MqtConnectionOptions;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqtException;
importa org.eclipse.paho.mqttv5.common.MqttMesage;
import org.eclipse.paho.mqttv5.common.packet.MqtProperties;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqtDisconnectResponse;
importar org.eclipse.paho.mqttv5.client.IMqttoToken;
importar javax.net.ssl.SSLSocketFactory;
importar javax.net.ssl.SSLContext;
importar java.security.cert.Certificate Fábrica;
importar java.security.cert.X509Certificate;
importar java.security.KeyStore;
importa java.io. FileInputStream;
importar java.io.IOExcepcion;
importar java.security.cert.CertificateException;
importar java.security.NoSuch AlgorithmException;
importar javax.net.ssl.TrustManager Fábrica;
importar javax.net.ssl.KeyManagerFactory;
importar java.security.KeyManagementException;
importa java.util. Lista;
importa java.util. Random;
importa java.util. Scanner;
clase pública MqttClientv5 {}
vacío estático público principal(String[] args)throws Excepción{
String client_id = "tu_client_id";
Hostip de cuerda = "public-mqtt-broker.bevywise.com";
UT = verdadero;
String client_username = "Access-Key";
Criar cliente_password = "Access-Token";
boolean websocket_connection=false; //set true to Enable websocket
Tema de cuerda = ";
int qos = 0;
retenimiento booleano = falso;
Corredor de cuerdasUrl= "";
int port;
si (websocket_connection)
port= 10433;
más
port = 1883; // Use 8883 for SSL/TLS, 1883 for non-SSL

// Creación de una nueva instancia cliente MQTT
MqttClient client;
si (websocket_connection)
brokerUrl=String.format("%s:%d", "ws", hostip, port);
más
broker Url = String.format("%s:%d", "tcp", hostip, port);

MemoryPersistence persistence = nuevo MemoryPersistence();
Inténtalo.
cliente = nuevo MqtClient(broker) Url, client_id, persistencia);
// Ajuste de los callbacks
client.setCallback(new MqttCallback() {
@Override
vacío público desconectado(MqttDisconnectResponse disconnectResponse) {
System.out.println("Disconnected from broker.");
}
@Override
vacío público mqtt ErrorOccurrido(Excepción MqtException) {
System.out.println("MQTT Error: " + excepción.getMessage());
excepción.printStackTrace();
}
@Override
mensaje de vacío público Llegada (Tema de cuerda, mensaje de MqttMessage) lanza Excepción {}
System.out.println("Recibido mensaje sobre el tema: " + tema);
System.out.println("Message: " + nuevo String(message.getPayload()));
}
@Override
públicos Complete(IMqttToken token) {
System.out.println("Message delivered successfully.");
}
@Override
public void authPacketArrived(int reasonCode, MqtProperties properties) {}
System.out.println("Auth packet llegó con el código de la razón: " + reasonCode);
}
@Override
conexión de vacío públicoCompleta(reconexión de boomlean, servidor de String) {
System.out.println("Connected to broker: " + serverURI);
}
// Opciones de conexión de configuración
MqttConnectionOptions connOpts = nuevo MqttConnectionOptions();
si (TLS) {
Inténtalo.
connOpts.setSocketFactory(getSocketFactory("/path/to/your/root.crt")); //
Reemplazar con su camino de certadura raíz
} catch (Exception e) {
System.out.println("Error setting up SSL socket factory: " + e.getMessage());
e.printStackTrace();
}
}
si
connOpts.setUserName(client_username);
connOpts.setPasword(client_password.getBytes()); // Convertir en byte array
}
connOpts.setCleanStart(verdad); // Inicio limpio para MQTTv5
connOpts.setMaxInflight(50);
// Conectarse con el corredor MQTT
System.out.println("Connecting to broker: " + brokerUrl);
cliente.connect(connOpts);
System.out.println("Connected");
// Ejemplo de tema para suscribirse
Suscríbete final de StringTema = "will_topic/device1";
// Subscribir a un tema
cliente.subscribe(subscribeTema, qos);
// Publicar mensajes o subscribir/unsubscribir basado en la entrada del usuario
Scanner Scanner = nuevo Scanner (System.in);
mientras (verdad) {
System.out.println("1. Publish\n2. Subscribe\n3. Disconnect\n4. Unsubscribe");
int choice = scanner.nextInt();
scanner.nextLine(); // consume newline
interruptor (elección) {
Caso 1:
System.out.print("Enter Topic QoS(0/1/2) Retain(0/1) Count: ");
String[] entrada = escáner.nextLine().split(" ");
tópico = entrada[0];
qos = Integer.parseInt(input[1]);
retener = Integer.parseInt(input[2]) == 1;
int count = Integer.parseInt(input[3]);
para (int i = 0; i < count; i++) { //String message="{\" temp\":" + (nueva)
Random().nextInt(50) + 1) + ",\" presión\":" + (nuevo Random().nextInt(100) +
1) + ",\" estado\":" + (nuevo Random().nextInt(2)=0 "ON" : "OFF" ) + "}" ;
Mensaje de String="mundohello" ; MqttMessage mqttMesage=new
MqttMessage(message.getBytes()); mqttMessage.setQos(qos);
mqttMessage.setRetained(retain); System.out.println("Mesage: "+mensaje);
cliente.publish(topic, mqttMessage);
System.out.println(" Packet: "+i+" - sent"); } break; case 2: System.out.print("Enter Topic QoS(0/1/2): ");
entrada = escáner.nextLine().split(");
tópico = entrada[0];
qos = Integer.parseInt(input[1]);
cliente.subscribe(topic, qos);
ruptura;
Caso 3:
cliente.disconnect();
System.out.println(" Desconectado del corredor."); retorno; caso 4: System.out.print("Enter Topic: ");
tópico = escáner.nextLine();
cliente.unsubscribe(topic);
ruptura;
default:
System.out.println(" Elección inválida. Por favor, inténtelo de nuevo. (MqttException e) { System.out.println("Error
conectar a MQTT broker: " + e.getMessage());
e.printStackTrace();
}
}
SSLSocket privado Factory getSocket Fábrica(Arrastre de cuerda) lanza Excepción
CertificadoFactory cf = CertificadoFactory.getInstance(" X.509"); FileInputStream fis=new
FileInputStream(rootCertPath); X509Certificate caCert=(X509Certificate)
cf.generateCertificate(fis); fis.close(); KeyStore
ks=KeyStore.getInstance("JKS"); ks.load(null, null);
ks.setCertificateEntry("ca-certificate", caCert); TrustManager Factory
tmf=TrustManagerFactory.getInstance("SunX509"); tmf.init(ks); SSLContext
sslContext=SSLContext.getInstance("TLSv1.2"); sslContext.init(null,
tmf.getTrustManagers(), null); volver sslContext.getSocketFactory(); } }

Crear un Bundle ejecutable

Pasos para conectar MqtClientv3:

Compile:javac -cp /path/to/org.eclipse.paho.client.mqttv3-1.2.5.jar MqttClientv3.java

Corre:java -cp .:/path/to/org.eclipse.paho.client.mqttv3-1.2.5.jar MqttClientv3

o) Corre con JAR:

  • mkdir -p build/lib build/classes build/META-INF/
  • cp -r MqtClientv3.class build/classes/classes/
  • cp -r /path/of/org.eclipse.paho.client.mqttv3-1.2.5.jar build/lib/
  • crear un nombre de archivo MANIFEST. MF:
  • Manifiesto-Versión: 1.0
  • Clase principal: MqtClientExample
  • Class-Path: lib/org.eclipse.paho.client.mqttv3-1.2.5.jar
  • copiar el contenido anterior en ese ,cp -r MANIFEST. MF build/META-INF/
  • cd build tar cvfm MqtClientv3.jar META-INF/MANIFEST.MF -C classes . -C lib .
  • java -jar MqtClientv3.jar - java -jar MqtClientv3.jar

Pasos para conectar MqtClientv5:

Compile:javac -cp /path/to/org.eclipse.paho.mqttv5.client-1.2.5.jar MqttClientv5.java

Corre:java -cp .:/path/to/org.eclipse.paho.mqttv5.client-1.2.5.jar MqttClientv5 (o) Run with JAR:

mkdir -p build/lib build/classes build/META-INF/

cp -r MqtClientv3.class build/classes/classes/

cp -r /path/of/org.eclipse.paho.mqttv5.client-1.2.5.jar build/lib/

crear un nombre de archivo MANIFEST. MF:

Manifiesto-Versión: 1.0 Clase principal: MqtClientExample Class-Path: lib/org.eclipse.paho.mqttv5.client-1.2.5.jar

copiar el contenido anterior en que, cp -r MANIFEST.MF build/META-INF/

cd build tar cvfm MqtClientv5.jar META-INF/MANIFEST.MF -C classes . -C lib .

java -jar MqtClientv5.jar Usa la construcción en cualquier lugar.

Conecta a tu cliente con nuestro estado de arteMQTT brokero cualquier corredor de su elección. Esta potente combinación garantizará un rendimiento óptimo y fiabilidad para todas sus necesidades de mensajería, allanando el camino para una robusta y eficiente integración del sistema.