更新时间:2023年10月13日15时28分 来源:传智教育 浏览次数:
Paho Java客户端是用Java编写的MQTT客户端库,用于开发在JVM或其他Java兼容平台(例如Android)上运行的应用程序。
Paho不仅可以对接EMQ X Broker,还可以对接满足符合MQTT协议规范的消息代理服务端,目前Paho可以支持到MQTT5.0以下版本。MQTT3.3.1协议版本基本能满足百分之九十多的接入场景。
Paho Java客户端提供了两个API:
1:MqttAsyncClient提供了一个完全异步的API,其中活动的完成是通过注册的回调通知的。
2:MqttClient是MqttAsyncClient周围的同步包装器,在这里,功能似乎与应用程序同步。
(1)找到项目:emq-demo,添加坐标依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
(2)编写客户端封装类的代码:com.itheima.mqtt.client.EmqClient
/**
* Created by 传智播客*黑马程序员.
*/
@Component
public class EmqClient {
private Logger log = LoggerFactory.getLogger(EmqClient.class);
private IMqttClient mqttClient;
@Autowired
private MqttProperties mqttProperties;
@Autowired
private MqttCallback mqttCallback;
@PostConstruct
private void init(){
//MqttClientPersistence是接口 实现类有:MqttDefaultFilePersistence;MemoryPersistence
MqttClientPersistence memoryPersistence = new MemoryPersistence();
try {
mqttClient = new
MqttClient(mqttProperties.getBrokerUrl(),mqttProperties.getClientId(),memoryPersistence);
} catch (MqttException e) {
log.error("MqttClient初始化失败,brokerurl={},clientId=
{}",mqttProperties.getBrokerUrl(),mqttProperties.getClientId());
}
}
/**
* 连接broker
* @param username
* @param password
*/
public void connect(String username,String password){
//创建MQTT连接选项对象--可配置mqtt连接相关选项
MqttConnectOptions connectOptions = new MqttConnectOptions();
//自动重连
connectOptions.setAutomaticReconnect(true);
/**
* 设置为true后意味着:客户端断开连接后emq不保留会话保留会话,否则会产生订阅共享队列的存活
客户端收不到消息的情况
* 因为断开的连接还被保留的话,emq会将队列中的消息负载到断开但还保留的客户端,导致存活的客户
端收不到消息
* 解决该问题有两种方案:1.连接断开后不要保持;2.保证每个客户端有固定的clientId
*/
connectOptions.setCleanSession(true);
connectOptions.setUserName(username);
connectOptions.setPassword(password.toCharArray());
//设置mqtt消息回调
mqttClient.setCallback(mqttCallback);
//连接broker
try {
mqttClient.connect(connectOptions);
} catch (MqttException e) {
log.error("连接mqtt broker失败,失败原因:{}",e.getMessage());
}
}
/**
* 发布
* @param topic
* @param msg
*/
public void publish(String topic, String msg, QosEnum qos, boolean retain){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos.value());
mqttMessage.setRetained(retain);
mqttMessage.setPayload(msg.getBytes());
if(mqttClient.isConnected()){
try {
mqttClient.publish(topic,mqttMessage);
} catch (MqttException e) {
log.error("mqtt消息发布失败,topic={},msg={},qos={},retain={},errormsg=
{}",topic,msg,qos,retain,e.getMessage());
}
}
}
/**
* 订阅
* @param topicFilter
* @return
*/
public void subscribe(String topicFilter,QosEnum qos){
try {
mqttClient.subscribe(topicFilter,qos.value());
} catch (MqttException e) {
log.error("订阅失败,topicfilter={},qos={},errormsg=
{}",topicFilter,qos,e.getMessage());
}
}
/**
* 断开连接
*/
@PreDestroy
public void disConnect(){
try {
mqttClient.disconnect();
} catch (MqttException e) {
log.error("断开连接出现异常,errormsg={}",e.getMessage());
}
}
}
需要在application.yml中添加自定义的配置:
mqtt:
broker-url: tcp://192.168.200.129:1883
client-id: demo-client
username: user
password: 123456
同时需要创建属性配置类来加载该配置数据,创建:com.itheima.mqtt.properties.MqttProperties
/**
* Created by 传智播客*黑马程序员.
*/
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
private String brokerUrl;
private String clientId;
private String username;
private String password;
public String getBrokerUrl() {
return brokerUrl;
}
public void setBrokerUrl(String brokerUrl) {
this.brokerUrl = brokerUrl;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
@Override
public String toString() {
return "MqttProperties{" +
"brokerUrl='" + brokerUrl + '\'' +
", clientId='" + clientId + '\'' +
", username='" + username + '\'' +
", password='" + password + '\'' +
'}';
}
}
还需创建QoS服务之类枚举:com.itheima.mqtt.enums.QosEnum
/**
* Created by 传智播客*黑马程序员.
*/
public enum QosEnum {
QoS0(0),QoS1(1),QoS2(2);
QosEnum(int qos) {
this.value = qos;
}
private final int value;
public int value(){
return this.value;
}
}
(3)在连接接收到消息之后,我们需要将消息传入消息回调:com.itheima.mqtt.client.MessageCallback
/**
* Created by 传智播客*黑马程序员.
*/
@Component
public class MessageCallback implements MqttCallback {
private Logger log = LoggerFactory.getLogger(MessageCallback.class);
@Override
public void connectionLost(Throwable cause) {
//丢失对服务端的连接后触发该方法回调,此处可以做一些特殊处理,比如重连
log.info("丢失了对broker的连接");
}
/**
* 订阅到消息后的回调
* 该方法由mqtt客户端同步调用,在此方法未正确返回之前,不会发送ack确认消息到broker
* 一旦该方法向外抛出了异常客户端将异常关闭,当再次连接时;所有QoS1,QoS2且客户端未进行ack确认的
消息都将由
* broker服务器再次发送到客户端
* @param topic
* @param message
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("订阅到了消息;topic={},messageid={},qos={},msg={}", topic, message.getId(), message.getQos(), new String(message.getPayload()));
}
/**
* 消息发布完成且收到ack确认后的回调
* QoS0:消息被网络发出后触发一次
* QoS1:当收到broker的PUBACK消息后触发
* QoS2:当收到broer的PUBCOMP消息后触发
* @param token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
int messageId = token.getMessageId();
String[] topics = token.getTopics();
log.info("消息发送完成,messageId={},topics={}", messageId, topics);
}
}
(4)编写消息发布和订阅的测试,在启动类中添加如下代码。
@Autowired
private EmqClient emqClient;
@Autowired
private MqttProperties mqttProperties;
@PostConstruct
public void init(){
emqClient.connect(mqttProperties.getUsername(),mqttProperties.getPassword());
//订阅某一主题
emqClient.subscribe("testtopic/#", QosEnum.QoS2);
//开启一个新的线程向该主题发送消息
new Thread(()->{
while (true){
emqClient.publish("testtopic/123","mqtt msg:"+
LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME),QosEnum.QoS2,false);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
(5)测试:在Dashboard中开启使用username进行认证的组件,其他组件停止即可,然后启动项目,查看控制台输出即可。