SpringBoot中使用MQTT实现消息的订阅和发布
背景 java框架SpringBoot通过mQTT通信 控制物联网设备
还是直接上代码
第一步依赖:
<!--mqtt相关依赖--> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.5.14</version> </dependency>
第二步配置文件
#mqtt mqtt: mqttUrl: tcp://127.0.0.1 mqttPort: 1883 mqttUsername: admin mqttPassword: public mqttClientId: aaa # MQTT回调类型 按一个MQTT服务区分 # 如果MQTT服务端换了 回调处理的是新的业务需求 就把这个换了 # 然后在MQTT配置文件中扩展新的回调类 mqttTypeCallback: breakerCallback
第三步 config类
package com.xxx.iotjava.mqtt.config; import com.xxx.iotjava.mqtt.callback.BreakerCallback; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; /** * User:Json * Date: 2024/6/17 **/ @Configuration @Slf4j public class MqttConfig { @Value("${mqtt.mqttUsername}") private String mqttUsername; @Value("${mqtt.mqttPassword}") private String mqttPassword; @Value("${mqtt.mqttUrl}") private String mqttUrl; @Value("${mqtt.mqttPort}") private Integer mqttPort; @Value("${mqtt.mqttClientId}") private String mqttClientId; @Value("${mqttTypeCallback}") private String mqttTypeCallback; private static String breakerCallback = "breakerCallback"; /** * 客户端对象 */ private MqttClient client; /** * 客户端连接服务端 * 目前只支持一个 MQTT服务端 如果后续一个项目有多个MQTT服务端那就设计成工厂模式 */ public boolean connect() { if (isMqtt()){ return false; } try { //new MemoryPersistence() 使用内存持久化 // 优点:不会在文件系统中创建任何文件(如 .lck 文件),适合对会话持久性没有要求的场景。 // 缺点: 客户端断开连接或重启后,会话数据会丢失,无法保留订阅信息和未发送的消息 // String persistenceDirectory = "/path/to/your/mqtt/persistence"; //new MqttDefaultFilePersistence(persistenceDirectory) 使用文件持久化 //如果persistenceDirectory 不写 他默认创建 根目录 linux要给权限 // 优点: 客户端断开连接或重启后,能够保留订阅信息和未发送的消息。这对于需要保持会话状态的应用非常重要 // 缺点 会在指定的目录中创建文件(如 .lck 文件),需要确保指定的目录是有效的,并且应用有权限访问该目录 //创建MQTT客户端对象 client = new MqttClient(mqttUrl + ":" + mqttPort, mqttClientId,new MemoryPersistence()); //连接设置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 //设置为true表示每次连接服务器都是以新的身份 //如果他为true 会出现一个问题 //当网络断开后,客户端会进行重连,但是重连之前订阅的主题就失效了,不再接受之前订阅主题的消息。 //因为配置里将cleanSession 设为 true ,当客户端掉线时 , //服务器端会清除 客户端 session 。 重连后 客户端会有一个新的session。 // 所以如果大家把他为true 重新连接mqtt后,要注意需要手动再订阅一下主题 // 推荐文档:https://www.cnblogs.com/A-yes/p/9894144.html options.setCleanSession(true); //设置连接用户名 options.setUserName(mqttUsername); //设置连接密码 options.setPassword(mqttPassword.toCharArray()); options.setAutomaticReconnect(true); // 启用自动重连 //设置超时时间,单位为秒 如果在指定的时间内未能建立连接,客户端会放弃连接尝试并抛出异常。 options.setConnectionTimeout(100); //设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线 options.setKeepAliveInterval(20); //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 // options.setWill("willTopic",(mqttClientId + ":与服务器断开连接").getBytes(),0,false); if (StringUtils.isEmpty(mqttTypeCallback)) { log.error("MQTT回调类型为空,请去java_config配置文件配置!"); } //设置回调 if (breakerCallback.equals(mqttTypeCallback)) { //断路器回调 client.setCallback(new BreakerCallback()); } client.connect(options); return true; } catch (MqttException e) { log.error("MQTT启动报错:" + e.getMessage()); e.printStackTrace(); return false; } } /** * qos * 0 最多一次传递【适用于对消息丢失不敏感的场景,如传感器数据频繁发送,可以接受偶尔的数据丢失】 * 1 至少一次传递 【消息至少传递一次,但可能会重复(即重复消息)】 * 2 仅一次传递 【消息确保仅传递一次,既不会丢失也不会重复。】 * retained * 保留消息:如果 retained 参数设置为 true,消息会被代理保留。代理将记住这个消息,并在新客户端订阅该主题时立即发送这个消息。 * 非保留消息:如果 retained 参数设置为 false,消息不会被保留,只会发送给当前在线并订阅该主题的客户端。 * topic 主题 * message 内容 */ public void publish(int qos, boolean retained, String topic, String message) { log.info("topic为:【"+topic+"】,qos为:【"+qos+"】 mqtt 发布数据为:"+message); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); //代理将记住这个消息,并在新客户端订阅该主题时立即发送这个消息。 mqttMessage.setPayload(message.getBytes()); //主题的目的地,用于发布信息 MqttTopic mqttTopic = client.getTopic(topic); MqttDeliveryToken token; try { //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态 token = mqttTopic.publish(mqttMessage); //token.waitForCompletion(); // 等待完成 会堵塞 } catch (MqttException e) { log.warn("ClientId【" + mqttClientId + "】发布失败!主题【" + topic + "】,发布数据为:" + message); e.printStackTrace(); } } /** * 断开连接 */ public void disConnect() { try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } /*** * 手动连接 * 可用于断线后 手动重连 * ***/ public boolean againConnect() { try { if (client != null && !client.isConnected()) { client.connect(); } return true; } catch (MqttException e) { e.printStackTrace(); return false; } } //验证是否启动mqtt连接 private boolean isMqtt(){ if (StringUtils.isEmpty(mqttUrl) || StringUtils.isEmpty(mqttPort) || StringUtils.isEmpty(mqttUsername) || StringUtils.isEmpty(mqttPassword) || StringUtils.isEmpty(mqttClientId) ) { log.info("==========mqtt 参数不全,无需启动MQTT连接=================="); return true; } return false; } /** * 订阅指定主题 * @param topic 订阅的主题 * @param qos 订阅的服务质量 */ public boolean subscribe(String topic, int qos) { if (isMqtt()){ return false; } try { if (client != null && client.isConnected()) { client.subscribe(topic, qos); log.info("订阅主题 {} 成功!", topic); } else { log.error("MQTT客户端尚未连接,无法订阅主题 {}!", topic); } return true; } catch (MqttException e) { log.error("订阅主题 {} 失败:{}", topic, e.getMessage()); e.printStackTrace(); return false; } } /** * 批量订阅主题 * 消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息 * @param topic 订阅的主题集合 * @param qos 订阅的服务质量集合 */ public boolean subscribe(String[] topic, int[] qos) { if (isMqtt()){ return false; } try { if (client != null && client.isConnected()) { client.subscribe(topic, qos); log.info("订阅主题 {} 成功!", topic); } else { log.error("MQTT客户端尚未连接,无法订阅主题 {}!", topic); } return true; } catch (MqttException e) { log.error("订阅主题 {} 失败:{}", topic, e.getMessage()); e.printStackTrace(); return false; } } }
第四步 回调类
package com.xxx.iotjava.mqtt.callback; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.xxx.iotjava.entities.BreakerData; import com.xxx.iotjava.enums.breaker.BreakerKeywordsEnum; import com.xxx.iotjava.enums.breaker.BreakerKeywordsValueEnum; import com.xxx.iotjava.enums.breaker.BreakerOperationEnum; import com.xxx.iotjava.service.inteface.IBreakerDataService; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import com.alibaba.fastjson.JSONArray; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; /** * User:Json * Date: 2024/6/17 **/ @Component @Slf4j public class BreakerCallback implements MqttCallback { @Autowired IBreakerDataService iBreakerDataService; /** * 与服务器断开的回调 * 这里可以做手动连接 但是配置config类 配置了 自动检测异常 true 这里可以也不做 * options.setAutomaticReconnect(true); // 启用自动重连 */ @Override public void connectionLost(Throwable throwable) { log.error("MQTT连接有异常:" + throwable.getMessage()); } /** * 订阅的回调 * 消息到达的回调 * 注意 如果这个回调方法 如果有异常 报错 ,mqtt会重新连接 * 因为配置文件 设置了 options.setAutomaticReconnect(true); // 启用自动重连 * 如果自动重连了 如果是开启新的会话 以前的订阅会消失 具体操作 再上面的配置文件类说明过了 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { // System.out.println("上报时间:"+ LocalDateTime.now()); // System.out.println(String.format("接收消息主题 : %s",topic)); // System.out.println(String.format("接收消息Qos : %d",mqttMessage.getQos())); // System.out.println(String.format("接收消息内容 : %s",new String(mqttMessage.getPayload()))); // System.out.println(String.format("接收消息retained : %b",mqttMessage.isRetained())); } /** * 发布的回调 * 消息发布成功的回调 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { IMqttAsyncClient client = token.getClient(); log.info(client.getClientId() + "发布消息成功!"); } }
第五步 mqtt工具类
package com.xxx.iotjava.utils; import com.xxx.init.utils.AppContextUtil; import com.xxx.iotjava.enums.breaker.BreakerOperationTopicEnum; import com.xxx.iotjava.mqtt.config.MqttConfig; import lombok.extern.slf4j.Slf4j; /** * User:Json * Date: 2024/6/17 **/ @Slf4j public class MqttUtils { private static MqttConfig mqttConfig; public static MqttConfig getMqttConfig() { if (mqttConfig == null) mqttConfig = AppContextUtil.getBean(MqttConfig.class); return mqttConfig; } //初始化 订阅 public static boolean subscribeInit(){ return getMqttConfig().subscribe(BreakerOperationTopicEnum.REPORTING_API.getTopic(), 0); } /** * 发送消息 * qos 0 最多一次传递 1 至少一次传递 2 仅一次传递 * retained true 保留消息 false 非保留消息 * topic 主题 * message 内容 */ public static boolean sendMqttMsg(int qos, boolean retained, String topic, String message) { try { getMqttConfig().publish(qos, retained, topic, message); return true; } catch (Exception e) { e.printStackTrace(); log.error("MQtt发送消息报错:" + e.getMessage()); return false; } } /* * topic 主题 * message 内容 * */ public static boolean sendMqttMsg(String topic, String message) { return sendMqttMsg(1, false, topic, message); } }
第六步 调用测试
发布
MqttUtils.sendMqttMsg(topic, data)
//订阅 我做的是启动的时候 初始化订阅 所以 直接根据定义的 topic 常量进行初始化订阅
//BreakerOperationTopicEnum.REPORTING_API.getTopic() 我 定义的topic 枚举类 常量
// 这里就不分享了
MqttUtils.subscribeInit();
完成