Springboot整合mqtt最新教程,完整教程,最佳实践
前言:
关于整合mqtt网上的教程很多,但大部分都是cv来cv去,中间的监听代码也没有讲清楚。教程也是很久之前的。所以决定自己来写一个教程。废话不多说直接开始教程。
本文只有教程,没有其他废话,如果需要请留言,后续更新下一版(包括主题消息的订阅方式改变,其他断线重连方式,EMQX的API对接-监听设备更加方便)。
Springboot整合mqtt采用注解进行监听(第二篇)
第一步安装 EMQX:
MQTT服务用的是EMQX,安装方式请搜索EMQX,移步他们官网。
官网地址:https://www.emqx.io/zh
测试工具请下载MQTTX
安装完成后默认管理地址:ip:18083 账户:admin 密码:public
第二步加入依赖:
org.springframework.integration spring-integration-core org.springframework.boot spring-boot-starter-integration org.springframework.integration spring-integration-stream org.springframework.integration spring-integration-mqtt
第三步加入配置:
在yml文件中加入以下配置
#MQTT客户端 publish: mqtt: host: tcp://127.0.0.1:1883 clientId: mqtt_publish options: userName: GuoShun password: qq1101165230 # 这里表示会话不过期 cleanSession: false # 配置一个默认的主题,加载时不会用到,只能在需要时手动提取 defaultTopic: devops timeout: 1000 KeepAliveInterval: 10 #断线重连方式,自动重新连接与会话不过期配合使用会导致 #断线重新连接后会接收到断线期间的消息。需要更改设置请看password联系我 automaticReconnect: true connectionTimeout: 3000 # 最大链接数 maxInflight: 100
第四步创建一个MQTTConfigBuilder类,用来加载yml中的配置
/** * @author by Guoshun * @version 1.0.0 * @description mqtt配置类 * @date 2023/12/12 15:10 */ @Configuration @ConfigurationProperties(MQTTConfigBuilder.PREFIX) @Data public class MQTTConfigBuilder { //配置的名称 public static final String PREFIX = "publish.mqtt"; /** * 服务端地址 */ private String host; /** * 客户端id */ private String clientId; /** * 配置链接项 */ private MqttConnectOptions options; }
第五步创建一个MQTTClientUtils
@Slf4j @Configuration public class MQTTClientUtils { @Autowired private MQTTConfigBuilder mqttConfig; private MqttClient mqttClient; public MQTTClientUtils createDevOpsMQTTClient() { this.createMQTTClient(); return this; } private MQTTClientUtils connect() { try { this.mqttClient.connect(mqttConfig.getOptions()); log.info("MQTTClient连接成功!"); }catch (MqttException mqttException){ mqttException.printStackTrace(); log.error("MQTTClient连接失败!"); } return this; } private MqttClient createMQTTClient() { try{ this.mqttClient = new MqttClient( mqttConfig.getHost(), mqttConfig.getClientId()); log.info("MQTTClient创建成功!"); return this.mqttClient; }catch (MqttException exception){ exception.printStackTrace(); log.error("MQTTClient创建失败!"); return null; } } /** * 消息发送 * @param topicName * @param message * @return */ public boolean publish(String topicName, String message) { log.info("订阅主题名:{}, message:{}", topicName, message); MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8)); try { this.mqttClient.publish(topicName, mqttMessage); return true; }catch (MqttException exception){ exception.printStackTrace(); return false; } } /** * 消息发送 : retained 默认为 false * "retained message" 指的是 Broker 会保留的最后一条发布到某个主题的消息。 * 当新的订阅者连接到该主题时,Broker 会将这条保留消息立即发送给订阅者,即使在订阅者订阅时该消息并未被重新发布。 * 这对于一些需要初始状态或者最后一次已知状态的应用场景非常有用。 * @param topicName * @param message * @param qos * @return */ public boolean publish(String topicName, int qos, String message) { log.info("主题名:{}, qos:{}, message:{}", topicName, qos, message); MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8)); try { this.mqttClient.publish(topicName, mqttMessage.getPayload(), qos, false); return true; }catch (MqttException exception){ exception.printStackTrace(); return false; } } /** * 订阅某个主题 * * @param topicName * @param qos */ public void subscribe(String topicName, int qos) { log.info("订阅主题名:{}, qos:{}", topicName, qos); try { this.mqttClient.subscribe(topicName, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 订阅某个主题 * * @param topicName * @param qos */ public void subscribe(String topicName, int qos, IMqttMessageListener messageListener) { log.info("订阅主题名:{}, qos:{}, Listener类:{}", topicName, qos, messageListener.getClass()); try { this.mqttClient.subscribe(topicName, qos, messageListener); } catch (MqttException e) { e.printStackTrace(); } } /** * 取消订阅主题 * @param topicName 主题名称 */ public void cleanTopic(String topicName) { log.info("取消订阅主题名:{}", topicName); try { this.mqttClient.unsubscribe(topicName); } catch (MqttException e) { e.printStackTrace(); } } //这里是初始化方法 @PostConstruct public void initMqttClient(){ //创建连接 MQTTClientUtils mqttClientUtils = this.createDevOpsMQTTClient().connect(); //这里主要是项目启动时订阅一些主题。看个人需要使用 //mqttClientUtils.subscribe("test/#", 2, new HeartBeatListener()); //MessageCallbackListener订阅主题,接受到该主题消息后交给MessageCallbackListener去处理 mqttClientUtils.subscribe("message/call/back", 2, new MessageCallbackListener()); //需要注意的是new MessageCallbackListener()虽然会接收到消息,但这么做不对。 //举个简单列子:就是你有切面对MessageCallbackListener中重写的方法做一些其他操作, //那么接收到消息后该切面并不会生效,所以不建议这么做,以下是修改过后的。 //@Resource //private MessageCallbackListener messageCallbackListener; //上面两句放到外面好吧!!!评论的大哥想的有点多,还要写个上下文获取bean... //mqttClientUtils.subscribe("message/call/back", 2, messageCallbackListener); } }
使用方式创建一个MQTTService
可以MQTTClientUtils去调用方法,也可以将MQTTService 扩展开来,使用MQTTService 去调用方法
/** * @author by Guoshun * @version 1.0.0 * @description MQTT服务类,负责调用发送消息 * @date 2023/12/12 16:53 */ @Service public class MQTTService { @Resource private MQTTClientUtils mqttClientUtils; /** * 向主题发送消息 * @param topicName * @param message */ public void sendMessage(String topicName, String message){ mqttClientUtils.publish(topicName, message); } /** * 向主题发送消息 * @param topicName 主题名称 * @param qos qos * @param message 具体消息 */ public void sendMessage(String topicName,int qos, String message){ mqttClientUtils.publish(topicName, qos, message); } }
消息监听处理(简单实现)
/** * @author by Guoshun * @version 1.0.0 * @description 消息回调返回 * @date 2023/12/12 17:27 */ @Component public class MessageCallbackListener implements IMqttMessageListener { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String messageBody = new String(message.getPayload(), StandardCharsets.UTF_8); System.out.println("收到消息:"+topic+", 消息内容是:"+ messageBody); } }
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理!
部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理!
图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!