Java MQTTクライアントの設定

テキストコピッド
導入事例

このドキュメントは、接続のプロセスを通してあなたを導きます MQTTクライアントMQTTブローカー(CrysqlMQ) またはブローカー Eclipse Paho Java クライアントライブラリを使用して選択します。 それは、 TCP、セキュアポート、Websockets、設定による接続の設定MQTT認証, 高度な機能を活用し、以下のような基本的な操作を実行 クライアントの公開、購読、退会、および切断。

前提条件

あなたが持っていることを確認してください:

  • JDKインストール(Java開発キット)
  • Eclipse IDE (または任意のJava IDE)
  • MQTTブローカーインスタンスの実行またはアクセス可能

依存性インストール

開発環境の設定

設定する Eclipse Paho MQTT を含む Java 環境 クライアントライブラリ。 このライブラリは、MQTTの特長コミュニケーションはでき、 一体化 Maven または手動ライブラリのインクルードによる Java プロジェクト。

JARSをダウンロード:

Mqttv3の顧客のためのorg.eclipse.paho.client.mqttv3-1.2.5.jar
Mqttv5クライアントのためのorg.eclipse.paho.mqttv5.client-1.2.5.jar

MQTTブローカーに接続する

このセクションには、MQTT に接続するさまざまな方法のコードスニペットがあります ブローカー。 MQTT ブローカーが使用する接続タイプをサポートしていることを確認してください。 また、MQTTブローカー(アドレス、ポート、)の対応する接続パラメータを取得 ユーザー名/パスワード、CA証明書

MQTTの特長 TCP で

クライアントを TCP に接続するために、次のコードを使用します。

MQTTブローカーの接続を使用してマクロアドレスを定義する パラメータ。

MQTT 3.1.1の特長

インポートorg.eclipse.paho.client.mqttv3.*;
インポートorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

boolean TLS = false; // TLS を有効にする
boolean AUTH = true;
int ポート = TLS? 8883 : 1883;
MqttClientクライアント
ストリングブローカー Url = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, ポート;
MemoryPersistence の永続性 = new MemoryPersistence();

クライアント = 新しいMqttClient(ブローカー) Url、client_id、永続性;

お問い合わせ
connOpts.setUserName(client_username) );
connOpts.setPassword(client_password.toCharArray())) );
お問い合わせ
connOpts.setCleanSession(true);

// MQTTブローカーに接続する
System.out.println(「ブローカーに接続する:「+ブローカーUrl」);
client.connect(connOpts);
System.out.println(「コネクテッド」);

インポートorg.eclipse.paho.mqttv5.client.*
インポートorg.eclipse.paho.mqttv5.client.persist.MemoryPersistence;

boolean AUTH = true;
int ポート = 1883;
MqttClientクライアント
ストリングブローカー Url = 文字列.format("%s://%s:%d", "tcp", hostip, port); で指定します。
MemoryPersistence の永続性 = new MemoryPersistence();

クライアント = 新しいMqttClient(ブローカー) Url、client_id、永続性;

お問い合わせ
connOpts.setUserName(client_username) );
connOpts.setPassword(client_password.toCharArray())) );
お問い合わせ
connOpts.setCleanSession(true);

// MQTTブローカーに接続する
System.out.println(「ブローカーに接続する:「+ブローカーUrl」);
client.connect(connOpts);
System.out.println(「コネクテッド」);

TLS/SSL上のMQTT

以下のコードを使用して、MQTT ブローカーに接続します。

MQTTブローカーの接続を使用してマクロアドレスを定義する パラメータ。

インポートorg.eclipse.paho.client.mqttv3.*;
インポートorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

boolean TLS = true;
boolean AUTH = true;
int ポート = TLS? 8883 : 1883;
MqttClientクライアント
ストリングブローカー Url = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, ポート;
MemoryPersistence の永続性 = new MemoryPersistence();

クライアント = 新しいMqttClient(ブローカー) Url、client_id、永続性;

MqttConnectOptions connOpts = 新しいMqttConnectOptions();
お問い合わせ
お問い合わせ
connOpts.setSocketFactory(getSocketFactory("root.crt")); // 置換 ルート証明書パス
お問い合わせ
System.out.println(「SSLソケット工場の設定エラー」+ e.getMessage() );
e.printStackTrace() ;
お問い合わせ
お問い合わせ
お問い合わせ
connOpts.setUserName(client_username) );
connOpts.setPassword(client_password.toCharArray())) );
お問い合わせ
connOpts.setCleanSession(true);

// MQTTブローカーに接続する
System.out.println(「ブローカーに接続する:「+ブローカーUrl」);
client.connect(connOpts);
System.out.println(「コネクテッド」);

MQTTClient_connect を呼び出す前に TLS パラメータを設定する クライアントを TLS 上で安全に mQTT ブローカーに接続します。

もし、MQTTの特長 ブローカー信頼できるサーバーおよびサーバーでホストされます 認証は必要ありません。以下のコードは TLS オプションを設定するために使用できます。

boolean TLS = true; // TLS を使う
int ポート = TLS ? 8883 : 1883;
MqttClientクライアント
ストリングブローカー Url = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, ポート;
MemoryPersistence の永続性 = new MemoryPersistence();

お問い合わせ
クライアント = 新しいMqttClient(ブローカー) Url、client_id、永続性;

MqttConnectOptions connOpts = 新しいMqttConnectOptions();

お問い合わせ
connOpts.setSocketFactory(ソケットファクトリー)。 // // // // デフォルトソケットを使用する 本社工場
お問い合わせ

connOpts.setUserName(client_username) );
connOpts.setPassword(client_password.toCharArray())) );
connOpts.setCleanSession(true);

// MQTTブローカーに接続する
System.out.println(「ブローカーに接続する:「+ブローカーUrl」);
client.connect(connOpts);
System.out.println(「コネクテッド」);

MQTT ブローカーに Trusted CA から発行された Server 証明書がある場合、 その後、サーバー証明書は、次の方法で検証できます。

boolean TLS = true; // TLS を使う
int ポート = TLS ? 8883 : 1883;
MqttClientクライアント
ストリングブローカー Url = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, ポート;
MemoryPersistence の永続性 = new MemoryPersistence();

お問い合わせ
クライアント = 新しいMqttClient(ブローカー) Url、client_id、永続性;

MqttConnectOptions connOpts = 新しいMqttConnectOptions();

お問い合わせ
SSLSocketFactory ソケットファクトリー = CreateSSLSocketFactory();
もし (socketFactory != null) {
connOpts.setSocketFactory(ソケットファクトリー)
お問い合わせ
System.out.println(「SSLソケット工場作成エラー」)
リターン;
お問い合わせ
お問い合わせ

connOpts.setUserName(client_username) );
connOpts.setPassword(client_password.toCharArray())) );
connOpts.setCleanSession(true);

// MQTTブローカーに接続する
System.out.println(「ブローカーに接続する:「+ブローカーUrl」);
client.connect(connOpts);
System.out.println(「コネクテッド」);

// MQTT 操作で続けて...

お問い合わせ お問い合わせ
System.out.println("MQTTブローカーに接続するエラー:" + e.getMessage() );
e.printStackTrace() ;
お問い合わせ
プライベート静的SSLSocketFactory作成SSLSocketFactory() お問い合わせ
お問い合わせ
// 信頼できるCA証明書をロードする
キーストアは、KeyStore.getInstance(KeyStore.getDefaultType())) を信頼しています。
// 信頼できるCA証明書をここに読み込む
// 信頼できるStore.load(新しいFileInputStream("path_to_trusted_ca_cert")、 "password".toCharArray() );

// SSLContext の作成と初期化
SSLContext sslContext = SSLContext.getInstance(「TLS」) );
sslContext.init(null、null、null)。 // // // // デフォルトの信頼管理者を使用する

sslContext.getSocketFactory(); を返す
お問い合わせ
System.out.println(「SSLソケット工場作成のエラー」) e.getMessage() );
e.printStackTrace() ;
お問い合わせ
null を返す。
お問い合わせ

MQTTブローカーに自己署名されたサーバー証明書がある場合 サーバー証明書は、MQTTから入手したルート証明書を使用して検証することができます ブローカー:

// ルート証明書をロードする
var rootCert = 新しい X509Certificate2("path_of _the_root_file");

// MQTT クライアントオプションの設定
var オプション = 新しい MqttClientOptionsBuilder()
. . withClientId(GenerateClientId("crystalmq_",10) )
. . © 2019 www.securehotelsreservations.com 著作権所有
.WithCredentials("highlysecure", "N4xnpPTru43T8Lmk") // ユーザー名を追加 パスワード
.WithTls (新しいMqttClientOptionsBuilderTlsParameters)
お問い合わせ
useTls = true,
証明書 = 新しいリスト<X509証明書> お問い合わせ
AllowUntrustedCertificates = true, // ディレクティブ 無効な証明書チェーン 検証
無視証明書 ChainErrors = true, // エラー 証明書チェーンエラーを無視する
IgnoreCertificateの取消 エラー = true // 証明書の取消 エラー
お問い合わせ
.ビルド();

// MQTT クライアントを作成する
var 工場 = 新しい MqttFactory();
var mqttClient = factory.CreateMqttClient() ;
mqttClient.UseConnectedHandler(非同期e =>)
お問い合わせ
Console.WriteLine("MQTTブローカーと正常に接続)。

お問い合わせ
// MQTT ブローカーに接続する
mqttClient.ConnectAsync(options) を待ちます。

WebSocket 経由で MQTT

このようなMQTTブローカーアドレスを定義して、クライアントを経由して接続します ウェブソケット。

インポートorg.eclipse.paho.client.mqttv3.*;
インポートorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
//Version-5 の場合
インポートorg.eclipse.paho.mqttv5.client.*
インポートorg.eclipse.paho.mqttv5.client.persist.MemoryPersistence;

intの港;
ストリングブローカー_Url= "";
boolean TLS = false; //true を TLS を有効にします。
boolean AUTH = true;
boolean web_socket = true;
もし(web_socket)
port= TLS? 11433:10433;
その他
ポート = TLS? 8883 : 1883;
MqttClientクライアント
もし(web_socket)
BrokerUrl= String.format("%s://%s:%d", TLS ? "ws" : "ws", hostip, ポート;
その他
ブローカー Url = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, ポート;
MemoryPersistence の永続性 = new MemoryPersistence();

クライアント = 新しいMqttClient(ブローカー) Url、client_id、永続性;

MqttConnectOptions connOpts = 新しいMqttConnectOptions();
お問い合わせ
お問い合わせ
connOpts.setSocketFactory(getSocketFactory("root.crt")); // 置換
ルート証明書パス
お問い合わせ
System.out.println(「SSLソケット工場の設定エラー」+
e.getMessage() );
e.printStackTrace() ;
お問い合わせ
お問い合わせ
お問い合わせ
connOpts.setUserName(client_username) );
connOpts.setPassword(client_password.toCharArray())) );
お問い合わせ
connOpts.setCleanSession(true);

// MQTTブローカーに接続する
System.out.println(「ブローカーに接続する:「+ブローカーUrl」);
client.connect(connOpts);
System.out.println(「コネクテッド」);

セキュアなWebSocket上MQTT

以下のコードを使用して、クライアントをセキュアに接続します。 ウェブソケット。

TLSの設定 TLS セクションで MQTT で指定されたオプション。

MQTTブローカーの接続を使用してマクロアドレスを定義する パラメータ。

インポートorg.eclipse.paho.client.mqttv3.*;
インポートorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
//Version-5 の場合
インポートorg.eclipse.paho.mqttv5.client.*
インポートorg.eclipse.paho.mqttv5.client.persist.MemoryPersistence;

intの港;
ストリングブローカー_Url= "";
boolean TLS = true; //true は TLS を有効にします
boolean AUTH = true;
boolean web_socket = true;
もし(web_socket)
port= TLS? 11433:10433;
その他
ポート = TLS? 8883 : 1883;
MqttClientクライアント
もし(web_socket)
BrokerUrl= String.format("%s://%s:%d", TLS ? "ws" : "ws", hostip, ポート;
その他

ブローカーUrl =
文字列.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, ポート; MemoryPersistence の永続性 = new MemoryPersistence();

クライアント = 新しいMqttClient(ブローカー) Url、client_id、永続性;

MqttConnectOptions connOpts = 新しいMqttConnectOptions();
お問い合わせ
お問い合わせ
connOpts.setSocketFactory(getSocketFactory("root.crt")); // 置換
ルート証明書パス
お問い合わせ
System.out.println(「SSLソケット工場の設定エラー」+
e.getMessage() );
e.printStackTrace() ;
お問い合わせ
お問い合わせ
お問い合わせ
connOpts.setUserName(client_username) );
connOpts.setPassword(client_password.toCharArray())) );
お問い合わせ
connOpts.setCleanSession(true);

// MQTTブローカーに接続する
System.out.println(「ブローカーに接続する:「+ブローカーUrl」);
client.connect(connOpts);
System.out.println(「コネクテッド」);

MQTT の設定 認証

MQTT ユーザー名と MQTT を必要とする MQTT ブローカーに接続するには 認証用のパスワード、ユーザー名、パスワードなどの接続オプションに追加 これ:

お問い合わせ
connOpts.setUserName(client_username) );
connOpts.setPassword(client_password.toCharArray())) );
お問い合わせ
connOpts.setCleanSession(true);

高度な機能

最終ウィル&テストの設定

設定する最終ウィルとテスト機能を指定する ブローカーは、クライアントが予期しない接続を解除した場合、ブローカーが公開されるメッセージ。 お問い合わせ 接続されていないクライアントのステータスの他の加入者に通知するのに役立ちます。

接続にLast Willを設定するには、次のコードを使用します。 オプション:

// 購読するトピックの例
最後の文字列の購読 = "will_topic/device1";

// トピックへのサブスクライブ
client.subscribe(subscribeTopic)。

調節は生きている保ちます

MQTT はクライアント・ブローカー・コネクションを維持します。 常に生き続ける メカニズム。 クライアントの頻度を制御するために、一貫した間隔を調整 メッセージ ブローカーへのPINGREQメッセージ。

あなたの要件に合ったコードを変更します。

connOpts.setKeepAlive(40);

セッションの持続性の設定

MQTTクライアントのセッションデータには、 サブスクリプション クライアントとクライアントがQoS>0で受け取るデータによって作られる。 クライアントは、MQTT ブローカーがセッションデータを接続間で保存することができます。

MQTT 3.1.1 クライアントは、クリーンセッション = 0 を 0 にセットできます リクエスト MQTT ブローカーは、セッション情報をコネクション全体に保持します。

MQTT 3.1.1の特長

インポートorg.eclipse.paho.client.mqttv3.*;
インポートorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

MemoryPersistence の永続性 = new MemoryPersistence();
お問い合わせ
クライアント = 新しいMqttClient(ブローカー) Url、client_id、永続性;
MqttConnectOptions connOpts = 新しいMqttConnectOptions();
お問い合わせ
connOpts.setSocketFactory(ソケットファクトリー)。 // // // // 使用条件 デフォルト ソケット工場
お問い合わせ

connOpts.setUserName(client_username) );
connOpts.setPassword(client_password.toCharArray())) );
connOpts.setCleanSession(クリーニングセッション) // クリーンセッションの設定 ログイン

// MQTTブローカーに接続する
System.out.println(「ブローカーに接続する:「+ブローカーUrl」);
client.connect(connOpts);
System.out.println(「コネクテッド」);

// MQTT 操作で続けて...

お問い合わせ お問い合わせ
System.out.println("MQTTブローカーに接続するエラー:" + e.getMessage() );
e.printStackTrace() ;
お問い合わせ

MQTTの特長 5 クライアントは、クリーンスタート = 0 とセッションの有効期限を設定できます Interval = 'N' が MQTT ブローカーに要求され、セッション情報を保持する 'N' 秒間の接続

MQTT 3.1.1の特長

インポートorg.eclipse.paho.client.mqttv5.*;
インポートorg.eclipse.paho.client.mqttv5.persist.MemoryPersistence;

MQTT 3.1.1の特長

MemoryPersistence の永続性 = new MemoryPersistence();

お問い合わせ
クライアント = 新しいMqttAsyncClient(brokerUrl、client_id、永続性);

MqttConnectOptions connOpts = 新しいMqttConnectOptions();
お問い合わせ
connOpts.setSocketFactory(ソケットファクトリー)。 // // // // 使用条件 デフォルト ソケット工場
お問い合わせ
connOpts.setUserName(client_username) );
connOpts.setPassword(client_password.toCharArray())) );
connOpts.setCleanStart(cleanStart) // クリーンスタートフラグの設定
connOpts.setSessionExpiryInterval(sessionExpiryInterval); // セット セッション 有効期限インターバル

// MQTTブローカーに接続する
System.out.println(「ブローカーに接続する:「+ブローカーUrl」);
IMqttToken の connectToken = client.connect(connOpts)
connectToken.waitForCompletion() は、
System.out.println(「コネクテッド」);

// MQTT 操作で続けて...
お問い合わせ お問い合わせ
System.out.println("MQTTブローカーに接続するエラー:" + e.getMessage() );
e.printStackTrace() ;
お問い合わせ

設定最大 パケット サイズ

MQTT5 クライアントは、MQTT ブローカーにデータを送信するだけを要求することができます このように設定することで、特定のサイズ未満のパケット:

MQTT 3.1.1の特長

connOpts.setMaximumPacketSize(256);

出版情報

データの送信

公開することにより、複数の加入者にデータを効率的に配信 次のコードスニペットで指定されたトピックへ:

MQTT 3.1.1の特長

インポートorg.eclipse.paho.client.mqttv3.*;
インポートorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

文字列トピック = "cmq/topic";
文字列 メッセージ = "hello world";
int qos = 1;
boolean は = false を保持します。
client.publish(topic, message.getBytes(), qos, 保持);

MQTT 5の特長

インポートorg.eclipse.paho.client.mqttv5.*;
インポートorg.eclipse.paho.client.mqttv5.persist.MemoryPersistence;

// メッセージを公開する
文字列トピック = "cmq/topic";
文字列 メッセージ = "hello world";
MqttMessage mqttMessage = 新しいMqttMessage(message.getBytes());
mqttMessage.setQos(1);
mqttMessage.setRetained(false);

// メッセージを非同期的に公開する
IMqttDeliveryToken 公開トークン = client.publish(トピック、mqttMessage)
publishToken.waitForCompletion() は、
System.out.println(「メッセージ公開」);

保持メッセージの設定

メッセージの公開時に保持フラグを有効にして、 ブローカーは、各トピックの最後のメッセージを格納します。 これは、新しい加入者を保証する 接続時に最新のメッセージを受信します。

これを実行するには、次のコードスニペットを使用します。

boolean は = false を保持します。
client.publish(topic, message.getBytes(), qos, 保持);

QoSレベルを指定する

MQTTは、サービスの品質(QoS)の3つのレベルを提供します メッセージ配達:

  • QoS 0 (一度に)
  • QoS 1 (少なくとも一度)
  • QoS 2 (一度だけ)

MQTT メッセージを公開する際に必要な QoS レベルを指定します。 このコード:

int qos = 1;
client.publish(topic, message.getBytes(), qos, 保持);

メッセージの有効期限インターバル

「メッセージの有効期限間隔」プロパティは、メッセージの人生を設定します 秒単位でスパン;この時間内に未配信の場合、ブローカーはそれを破棄します。 MQTT5は、この機能をサポートしています。 MQTT5 クライアントは、データを公開する際にこれを設定できます。

MQTT 5の特長

インポートorg.eclipse.paho.mqttv5.common.MqttProperties;
文字列トピック ="topic1";
文字列 メッセージ内容= "hello world";
int qos = 1;
boolean は = false を保持します。
MqttMessagemessage=newMqttMessage(messageContent.getBytes()); ;
メッセージ.setQos(qos)。
message.setRetained(保持);
MqttPropertiesproperties=newMqttProperties(); ; プロパティ.setMessageExpiryInterval(60L);
message.setProperties(プロパティ)
client.publish(トピック、メッセージ)

トピック 別名で

'Topic Alias' プロパティは、クライアントが短いエイリアスを使用することを可能にします トピック名の代わりに、メッセージパケットサイズを減らし、ネットワークを改善 効率。

インポートorg.eclipse.paho.mqttv5.common.MqttProperties;
MqttConnectionOptions オプション=newMqttConnectionOptions();
MqttPropertiesproperties=newMqttProperties(); ;
プロパティ.setTopicAliasMaximum(10);
options.setMqttProperties(プロパティ)
client.connect(オプション)

MQTT PUBLISHに関連付けられている特性はメッセージを高めます ブローカーやクライアントのコンテキストや指示を提供、処理。 これらのプロパティ, メッセージの余分間隔およびトピックのエイリアスを含んで、メッセージ配達を最大限に活用して下さい ネットワーク帯域幅。

会員登録

トピックフィルタへのサブスクライブ

他のクライアントが公開したデータを受信するには、このクライアントは マッチングトピックフィルタを購読する:

文字列トピック = "cmq/topic";
int qos = 1;
client.subscribe(トピック、qos);

このトピックフィルタは、正確なトピックと一致させることができます。 # や + のようなワイルドカード

データの受信

サブスクリプションに送信されたデータを受信するには、コールバック関数 次のように定義する必要があります。

// // // // コールバックの設定
client.setCallback(新しいMqttCallback() {
@オーバーライド
パブリック void 接続 失われた(恐ろしい原因) {
System.out.println("Connection Lost:" + 原因.getMessage()); );
原因。printStackTrace();
// 再接続またはその他のアクションの処理
お問い合わせ
@オーバーライド
パブリック void メッセージ 到着(文字列トピック、MqttMessageメッセージ) スロー 例外 {
System.out.println(「メッセージ受信:」);
System.out.println("トピック:" +トピック);
System.out.println(「システムアウト」) メッセージ:「+新しい文字列(message.getPayload()」);
System.out.println(" QoS:" + message.getQos()); ;
System.out.println(「システムアウト」) 保持: "+ message.isRetained());
// 受信したメッセージを処理する
お問い合わせ
@オーバーライド
公共 void 配達 完全な(IMqttDeliveryTokenトークン) {
System.out.println("Delivery complete:" + token.getMessage()); ); と
// ハンドルの配達確認
お問い合わせ
お問い合わせ

トピックからの購読解除

トピックから更新を受信を停止するには、提供されたコードを使用します 購読解除

// トピックから退会する
文字列トピック="cmq/topic";
client.unsubscribe(トピック);

クライアントの切断

クライアントの接続を適切に終了させる 双方の問題および資源の漏出を避けるブローカー、それによって維持システム 安定性。

以下のコードを使用して、ブローカーからクライアントを切断します。

お問い合わせ
クライアント.disconnect();
System.out.println(「ブローカーから切断」);
お問い合わせ お問い合わせ
System.out.println(「ブローカーから切断するエラー:「+ e.getMessage()」);」);
e.printStackTrace() ;
お問い合わせ

ビジネスロジックの構築

自分で開発してカスタマイズする機会があります この環境内でビジネスロジックを複雑にし、具体的に具体的に ニーズと目的。

ベストプラクティスの実践

クライアントの識別 マネジメント

特定のクライアント ID を個々のデバイスに割り当て、正確な識別を確実にします。 のために プライベートインスタンス, 各クライアントに固有のIDを割り当てます。共有環境では、ランダムな文字列を付加して、 IDの独自性を保証する。

データ構造

データアーキテクチャを積極的に計画します。 プレーンテキストを扱うかどうか、JSON のフォーマット データ、または 数値値、構造がアプリケーションと効果的に整列することを確認します 特定の条件。

強力なエラー処理

MQTT接続を処理する強力なエラー管理を実施 失敗、サブスクリプションの問題、およびメッセージを効果的に公開するエラー。

機密保持

ユーザー名、パスワードなどの機密情報を保護 ソースコードにハードコーディングしないクライアントID。 環境変数または 代わりに設定ファイルを固定します。

定期的なテストと監視

MQTT通信を継続的にテストし、クライアントのメトリックを監視 接続状況、メッセージスループット、エラーレートなど、迅速に識別および修正 問題。

セッションマネジメントの最適化

クリーンで永続的なセッション(`clean: true` または `clean: false`) は、サブスクリプションとキューイングされたメッセージを全体に保持する必要がある クライアントの接続。

接続解除の解除

MQTTブローカーに再接続しようとするコードを追加します。 予期しない切断。 これにより、クライアントが接続され、接続されていないことを確実にします。 データを失います。

コードをダウンロード

Java MQTT クライアントを使用するクライアントのための完全なコードをダウンロード 図書館がつながるMQTTブローカーまたはあなたの選択のブローカー。

MQTT 3.1.1の特長

//パホ mqttv3
インポートorg.eclipse.paho.client.mqttv3.*;
インポートorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
javax.net.sl.SSLSocketFactoryをインポートします。
javax.net.ssl.SSLContext をインポートします。
java.security.cert.Certificate をインポート 工場;
java.security.cert.X509Certificate をインポートします。
java.security.KeyStore をインポートします。
java.io をインポートします。 ファイル入力ストリーム;
java.io.IOException のインポート
java.security.cert.CertificateException のインポート
java.security.NoSuchAlgorithmException をインポートします。
インポート javax.net.sl.TrustManager 工場;
java.security.KeyManagementException のインポート
java.util をインポートします。 ランダム;
java.util をインポートします。 スキャナ;
パブリッククラス MqttClientv3 お問い合わせ
パブリック static void main(String[] args) {
文字列 client_id = "your_client_id";
文字列 hostip = "public-mqtt-broker.bevywise.com";
boolean TLS = false; // TLS を有効にするには true を設定
boolean AUTH = true;
文字列 client_username = "Access-Key";
文字列 client_password = "アクセストークン";
boolean websocket_connection=false; //websocketを有効にするためにtrueを設定
文字列のトピック = "";
int qos = 0;
boolean は = false を保持します。
ストリングブローカーUrl= "";
intの港;
もし(websocket_connection)
ポート= TLS ? 11433:10433;
その他
ポート = TLS ? 8883 : 1883; // SSL/TLS で 8883 を使用

// 新しいMQTTクライアントインスタンスを作成する
MqttClientクライアント
もし(websocket_connection)
BrokerUrl=String.format("%s://%s:%d", TLS ? "ws" : "ws", hostip, port);
その他
ブローカー Url = 文字列.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);

MemoryPersistence の永続性 = new MemoryPersistence();
お問い合わせ
クライアント = 新しいMqttClient(ブローカー) Url、client_id、永続性;
// コールバックの設定
client.setCallback(新しいMqttCallback() {
@オーバーライド
パブリック void 接続 失われた(恐ろしい原因) {
System.out.println("Connection Lost:" + 原因.getMessage()); );
原因。printStackTrace();
お問い合わせ
@オーバーライド
パブリック void メッセージ 到着(文字列トピック、MqttMessageメッセージ)は例外を投げます お問い合わせ
System.out.println(「トピックに関するメッセージを受け取る」+トピック);
System.out.println("メッセージ:" + new String(message.getPayload()));
お問い合わせ
@オーバーライド
公共 void 配達 完全な(IMqttDeliveryTokenトークン) {
System.out.println(「メッセージが正常に配信されました」)。
お問い合わせ
お問い合わせ
// 接続オプションの設定
MqttConnectOptions connOpts = 新しいMqttConnectOptions();
お問い合わせ
お問い合わせ
connOpts.setSocketFactory(getSocketFactory("/path/to/your/root.crt")); // // // //
ルートの証明書パスに置き換える
お問い合わせ
System.out.println(「SSLソケット工場の設定エラー」+ e.getMessage() );
e.printStackTrace() ;
お問い合わせ
お問い合わせ
お問い合わせ
connOpts.setUserName(client_username) );
connOpts.setPassword(client_password.toCharArray())) );
お問い合わせ
connOpts.setCleanSession(true);
// MQTTブローカーに接続する
System.out.println(「ブローカーに接続する:「+ブローカーUrl」);
client.connect(connOpts);
System.out.println(「コネクテッド」);
// 購読するトピックの例
最後の文字列の購読 = "will_topic/device1";
// トピックへのサブスクライブ
client.subscribe(subscribeTopic)。
// ユーザの入力に基づいてメッセージやサブスクライブ/サブスクライブを投稿
スキャナスキャナー = 新しいスキャナ(System.in)。
時 (true) {
System.out.println("1. Publish\n2.subscribe\n3. Disconnect\n4. 購読解除
int 選択 = スキャナ.nextInt();
Scanner.nextLine(); // 新規ラインを消費する
スイッチ (choice) {
ケース1:
System.out.print("Enter Topic QoS(0/1/2) 保持(0/1) カウント: ");
文字列[] 入力 = スキャナ.nextLine().split(");
トピック = 入力[0];
qos = Integer.parseInt(input[1]); ;
保持 = Integer.parseInt(input[2]) === 1;
int カウント = Integer.parseInt(input[3]);;
(int i = 0; i < count; i++) { 文字列 message="{\"temp\":" + (new)
Random().nextInt(50) + 1) + ",\" 圧力\":" + (新しいRandom().nextInt(100) + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
1 + ",\" ステータス: + (new Random().nextInt(2)) + "}" ; ; ; ; ; ; ; ; ; ; ; ; ;
client.publish(topic, message.getBytes(), qos, 保持);
System.out.println("公開メッセージ:" + メッセージ);
お問い合わせ お問い合わせ
System.out.println(「システムアウト」) エラー公開メッセージ: " + e.getMessage());
e.printStackTrace() ;
お問い合わせ
Thread.sleep(6000); // 6秒
お問い合わせ
ブレイク;
ケース2:
System.out.print(" トピックQoS(0/1/2): ");
文字列[] subInput = Scanner.nextLine().split("););
トピック = subInput[0];
qos = Integer.parseInt(subInput[1]); ;
お問い合わせ
client.subscribe(トピック、qos);
System.out.println(「システムアウト」) トピック: " + トピック;
お問い合わせ お問い合わせ
System.out.println(「システムアウト」) トピックにサブスクライブエラー: " + e.getMessage());
e.printStackTrace() ;
お問い合わせ
ブレイク;
場合3:
お問い合わせ
クライアント.disconnect();
System.out.println(「システムアウト」) ブローカーから切断」。 お問い合わせ { System.out.println("エラー切断")
ブローカーから: " + e.getMessage());
e.printStackTrace() ;
お問い合わせ
ブレイク;
場合4:
System.out.print(「システムアウト」) 退会するトピックを入力してください。
トピック = Scanner.nextLine();
お問い合わせ
client.unsubscribe(トピック);
System.out.println(「システムアウト」) トピックから退会: " + トピック;
お問い合わせ お問い合わせ
System.out.println(「システムアウト」) トピックから購読しないエラー: " + e.getMessage() );
e.printStackTrace() ;
お問い合わせ
ブレイク;
デフォルト:
お問い合わせ _ Invalid Choice お問い合わせ System.out.println("An"An") は、
エラーが発生しました: " + e.getMessage());
e.printStackTrace() ;
お問い合わせ
お問い合わせ
// // // // SSLソケット工場(必要に応じて)を作成する方法
プライベート静的SSLSocket 工場getSocket 工場(String caCrtFile) スロー 例外 {
証明書工場 cf = CertificateFactory.getInstance(" X.509"); ファイル入力ストリーム caInput=new
FileInputStream(caCrtFile)、X509Certificate caCert=(X509Certificate)
cf.generateCertificate(caInput); caInput.close(); キーストア
keyStore=KeyStore.getInstance(KeyStore.getDefaultType()); );
キーストア.load(null、null); キーストア.setCertificateEntry("ca-certificate")
証明書; TrustManager 本社工場
tmf=TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()));
tmf.init(keystore)、SSLContext=SSLContext.getInstance(「TLS」)。
context.init(null, tmf.getTrustManager(), null); を返します。
お問い合わせ _ socketFactory

MQTT 5の特長

//パホmqttv5
インポートorg.eclipse.paho.mqttv5.client.IMqttClient;
org.eclipse.paho.mqttv5.clientを輸入して下さい。 MqttClient;
インポートorg.eclipse.paho.mqttv5.client.MqttConnectionOptions;
インポートorg.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
インポートorg.eclipse.paho.mqttv5.common.MqttException;
インポートorg.eclipse.paho.mqttv5.common.MqttMessage;
インポートorg.eclipse.paho.mqttv5.common.packet.MqttProperties;
インポートorg.eclipse.paho.mqttv5.client.MqttCallback;
インポートorg.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
インポートorg.eclipse.paho.mqttv5.client.IMqttToken;
javax.net.sl.SSLSocketFactoryをインポートします。
javax.net.ssl.SSLContext をインポートします。
java.security.cert.Certificate をインポート 工場;
java.security.cert.X509Certificate をインポートします。
java.security.KeyStore をインポートします。
java.io をインポートします。 ファイル入力ストリーム;
java.io.IOException のインポート
java.security.cert.CertificateException のインポート
java.security.NoSuchAlgorithmException をインポートします。
インポート javax.net.sl.TrustManager 工場;
javax.net.sl.KeyManagerFactoryをインポートします。
java.security.KeyManagementException のインポート
java.util をインポートします。 リスト;
java.util をインポートします。 ランダム;
java.util をインポートします。 スキャナ;
パブリッククラス MqttClientv5 お問い合わせ
パブリック static void main(String[] args) のスロー Exception{
文字列 client_id = "your_client_id";
文字列 hostip = "public-mqtt-broker.bevywise.com";
boolean AUTH = true;
文字列 client_username = "Access-Key";
文字列 client_password = "アクセストークン";
boolean websocket_connection=false; //websocketを有効にするためにtrueを設定
文字列のトピック = "";
int qos = 0;
boolean は = false を保持します。
ストリングブローカーUrl= "";
intの港;
もし(websocket_connection)
port= 10433;
その他
ポート = 1883; // SSL/TLS で 8883 を使用

// 新しいMQTTクライアントインスタンスを作成する
MqttClientクライアント
もし(websocket_connection)
BrokerUrl=String.format("%s://%s:%d"、"ws"、hostip、ポート);
その他
ブローカー Url = 文字列.format("%s://%s:%d", "tcp", hostip, port); で指定します。

MemoryPersistence の永続性 = new MemoryPersistence();
お問い合わせ
クライアント = 新しいMqttClient(ブローカー) Url、client_id、永続性;
// コールバックの設定
client.setCallback(新しいMqttCallback() {
@オーバーライド
未接続(MqttDisconnectResponse DisconnectResponse) {
System.out.println(「ブローカーから切断」);
お問い合わせ
@オーバーライド
パブリック void mqtt エラー発生(MqttException例外) {
System.out.println("MQTT Error:" +except.getMessage()); );
例外。printStackTrace();
お問い合わせ
@オーバーライド
パブリック void メッセージ 到着(文字列トピック、MqttMessageメッセージ)は例外を投げます お問い合わせ
System.out.println(「トピックに関するメッセージを受け取る」+トピック);
System.out.println("メッセージ:" + new String(message.getPayload()));
お問い合わせ
@オーバーライド
公共 void 配達 完全な(IMqttTokenトークン) {
System.out.println(「メッセージが正常に配信されました」)。
お問い合わせ
@オーバーライド
パブリック void authPacketArrived(int resultCode, MqttProperties プロパティ) お問い合わせ
System.out.println(「Auth パケット」は、理由コード「+ 理由コード」で到着しました)。
お問い合わせ
@オーバーライド
パブリック void connectComplete(boolean reconnect, String serverURI) {
System.out.println("ブローカーに接続:" + serverURI);
お問い合わせ
// 接続オプションの設定
MqttConnectionOptions connOpts = 新しいMqttConnectionOptions();
お問い合わせ
お問い合わせ
connOpts.setSocketFactory(getSocketFactory("/path/to/your/root.crt")); // // // //
ルートの証明書パスに置き換える
お問い合わせ
System.out.println(「SSLソケット工場の設定エラー」+ e.getMessage() );
e.printStackTrace() ;
お問い合わせ
お問い合わせ
お問い合わせ
connOpts.setUserName(client_username) );
connOpts.setPassword(client_password.getBytes())) ); // // // // バイト配列に変換する
お問い合わせ
connOpts.setCleanStart(true); // MQTTv5のクリーンスタート
connOpts.setMaxInflight(50);
// MQTTブローカーに接続する
System.out.println(「ブローカーに接続する:「+ブローカーUrl」);
client.connect(connOpts);
System.out.println(「コネクテッド」);
// 購読するトピックの例
最後の文字列の購読 = "will_topic/device1";
// トピックへのサブスクライブ
client.subscribe(サブスクライブトピック、qos);
// ユーザの入力に基づいてメッセージやサブスクライブ/サブスクライブを投稿
スキャナスキャナー = 新しいスキャナ(System.in)。
時 (true) {
System.out.println("1. Publish\n2.subscribe\n3. Disconnect\n4. 購読解除
int 選択 = スキャナ.nextInt();
Scanner.nextLine(); // 新規ラインを消費する
スイッチ (choice) {
ケース1:
System.out.print("Enter Topic QoS(0/1/2) 保持(0/1) カウント: ");
文字列[] 入力 = スキャナ.nextLine().split(");
トピック = 入力[0];
qos = Integer.parseInt(input[1]); ;
保持 = Integer.parseInt(input[2]) === 1;
int カウント = Integer.parseInt(input[3]);;
(int i = 0; i < count; i++) { //String message="{\"temp\":" + (new)
Random().nextInt(50) + 1) + ",\" 圧力\":" + (新しいRandom().nextInt(100) + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
1 + ",\" ステータス: + (new Random().nextInt(2)==0 ? "ON" : "OFF" ) + "}" ;;;
文字列 message="hello world"; MqttMessage mqttMessage=new
MqttMessage(message.getBytes()); mqttMessage.setQos(qos);
mqttMessage.setRetained(保持); System.out.println("メッセージ: 「+メッセージ」
client.publish(トピック、mqttMessage)
System.out.println(「パケット:+i+」) System.out.print(「入力トピックQoS(0/1/2):」);
入力 = スキャナ.nextLine().split(");
トピック = 入力[0];
qos = Integer.parseInt(input[1]); ;
client.subscribe(トピック、qos);
ブレイク;
場合3:
クライアント.disconnect();
System.out.println("ブローカーから切断")); 戻り値 4: System.out.print(「トピックを入力: ");
トピック = Scanner.nextLine();
client.unsubscribe(トピック);
ブレイク;
デフォルト:
System.out.println(「無効な選択」) お問い合わせ (MqttException e) { System.out.println("エラー")
MQTTブローカーに接続する: " + e.getMessage());
e.printStackTrace() ;
お問い合わせ
お問い合わせ
プライベート静的SSLSocket 工場getSocket 工場(String rootCertPath) スロー 例外 {
証明書工場 cf = CertificateFactory.getInstance(" X.509"); ファイル入力ストリーム fis=new
FileInputStream(rootCertPath)、X509Certificate caCert=(X509Certificate)
cf.generateCertificate(fis); fis.close(); キーストア
ks=KeyStore.getInstance("JKS"); ks.load(null、null);
ks.setCertificateEntry("ca-certificate"、caCert); TrustManager 本社工場
tmf=TrustManagerFactory.getInstance("SunX509"); tmf.init(ks); SSLContext
sslContext=SSLContext.getInstance("TLSv1.2"); sslContext.init(null,
tmf.getTrustManager(), null;sslContext.getSocketFactory() を返します。

実行可能なバンドルを作成する

MqttClientv3 を接続するための手順:

コンパイル:javac -cpの /path/to/org.eclipse.paho.client.mqttv3-1.2.5.jar MqttClientv3.javaの特長

実行 :java -cpの .:/path/to/org.eclipse.paho.client.mqttv3-1.2.5.jar MqttClientv3の特長

(または) JAR で実行する:

  • mkdir -p build/lib build/classes build/META-INF/
  • cp -r MqttClientv3.class ビルド/クラス/
  • cp -r /path/of/org.eclipse.paho.client.mqttv3-1.2.5.jarビルド/lib/
  • ファイル名 MANIFEST を作成します。 MF:
  • マニフェストバージョン: 1.0
  • メインクラス: MqttClientExample
  • クラスパス: lib/org.eclipse.paho.client.mqttv3-1.2.5.jar
  • 上記の内容を、cp -r MANIFEST にコピーします。 MFビルド/メタルインフ/
  • cdビルド jar cvfm MqttClientv3.jar META-INF/MANIFEST.MF -C クラス . -C lib .
  • java -jar MqttClientv3.jar -- java -jar MqttClientv3.jar

MqttClientv5 を接続するための手順:

コンパイル:javac -cp /path/to/org.eclipse.paho.mqttv5.client-1.2.5.jar MqttClientv5.javaの特長

実行:java -cpの .:/path/to/org.eclipse.paho.mqttv5.client-1.2.5.jar MqttClientv5の特長 (または) JAR で実行する:

mkdir -p build/lib build/classes build/META-INF/

cp -r MqttClientv3.class ビルド/クラス/

cp -r /path/of/org.eclipse.paho.mqttv5.client-1.2.5.jar ビルド/lib/

ファイル名 MANIFEST を作成します。 MF:

マニフェストバージョン: 1.0 メインクラス: MqttClientExample クラスパス: lib/org.eclipse.paho.mqttv5.client-1.2.5.jar

上記内容をコピーする cp -r MANIFEST.MF ビルド/メタイン/

cdビルド jar cvfm MqttClientv5.jar META-INF/MANIFEST.MF -C クラス . -C lib .

java - jar MqttClientv5.jar を --ビルドをどこでも使用して下さい。

クライアントを最先端のアートに接続MQTTブローカーまたは あなたの選択のブローカー。 この強力な組み合わせにより、最適なパフォーマンスが保証され、 すべてのあなたのメッセージングのニーズのための信頼性、堅牢で効率的な方法舗装 システム統合。