|
import org.eclipse.paho.client.mqttv3.*; |
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
|
|
|
import javax.net.ssl.*; |
|
import java.security.KeyStore; |
|
import java.security.KeyStoreException; |
|
import java.security.NoSuchAlgorithmException; |
|
import java.security.cert.CertificateException; |
|
import java.util.Properties; |
|
|
|
public class SecureMqttClient { |
|
|
|
public static void main(String[] args) { |
|
String brokerUrl = "ssl://mqtt.yourbroker.com:8883"; // 替换为你的MQTT代理地址和端口 |
|
String clientId = "JavaSecureMqttClient"; |
|
String topic = "test/topic"; |
|
String content = "Hello, MQTT with TLS/SSL!"; |
|
int qos = 2; |
|
|
|
try { |
|
// 配置MQTT客户端 |
|
MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence()); |
|
MqttConnectOptions connOpts = new MqttConnectOptions(); |
|
connOpts.setCleanSession(true); |
|
|
|
// 加载并配置SSL上下文 |
|
SSLContext sslContext = SSLContext.getInstance("TLS"); |
|
KeyStore keyStore = KeyStore.getInstance("JKS"); |
|
// 加载信任库,替换为你的信任库路径和密码 |
|
keyStore.load(SecureMqttClient.class.getResourceAsStream("/path/to/your/truststore.jks"), "yourTrustStorePassword".toCharArray()); |
|
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); |
|
trustManagerFactory.init(keyStore); |
|
|
|
// 初始化SSL上下文 |
|
sslContext.init(null, trustManagerFactory.getTrustManagers(), null); |
|
|
|
// 设置SSL套接字工厂 |
|
SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory(); |
|
connOpts.setSocketFactory(sslSocketFactory); |
|
|
|
// 设置其他连接选项 |
|
connOpts.setAutomaticReconnect(true); |
|
connOpts.setConnectionTimeout(10); |
|
connOpts.setKeepAliveInterval(60); |
|
|
|
// 注册回调以处理连接丢失、消息到达和消息完成事件 |
|
client.setCallback(new MqttCallback() { |
|
@Override |
|
public void connectionLost(Throwable cause) { |
|
System.out.println("Connection lost: " + cause.getMessage()); |
|
} |
|
|
|
@Override |
|
public void messageArrived(String topic, MqttMessage message) throws Exception { |
|
System.out.println("Message arrived on topic: " + topic); |
|
System.out.println("Message content: " + new String(message.getPayload())); |
|
} |
|
|
|
@Override |
|
public void deliveryComplete(IMqttDeliveryToken token) { |
|
System.out.println("Delivery complete for token: " + token.getToken()); |
|
} |
|
}); |
|
|
|
// 连接到MQTT代理 |
|
client.connect(connOpts); |
|
|
|
// 发布消息 |
|
MqttMessage mqttMessage = new MqttMessage(content.getBytes()); |
|
mqttMessage.setQos(qos); |
|
client.publish(topic, mqttMessage); |
|
|
|
// 订阅主题 |
|
client.subscribe(topic, qos); |
|
|
|
// 等待一段时间以接收消息 |
|
Thread.sleep(5000); |
|
|
|
// 断开连接并关闭客户端 |
|
client.disconnect(); |
|
client.close(); |
|
|
|
} catch (MqttException me) { |
|
System.out.println("MQTT Exception: " + me.getMessage()); |
|
me.printStackTrace(); |
|
} catch (NoSuchAlgorithmException | KeyStoreException | CertificateException | KeyManagementException | InterruptedException e) { |
|
System.out.println("Exception: " + e.getMessage()); |
|
e.printStackTrace(); |
|
} |
|
} |
|
} |