在SpringBoot中集成MQTT

MQTT( Message Queuing Telemetry Transport)是一个物联网传输协议,它被设计用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。在实际的开发中,我们通常会用到Spring,这里简单描述一下在SpringBoot中如何集成MQTT。

在Spring的一系列文档中,已经有了对应的集成代码。见:

引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>

配置 mqttClientFactory

1
2
3
4
5
6
7
@Bean
public MqttPahoClientFactory mqttClientFactory() {
List<String> urls = mqttUrls().getUrls();
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs("tcp:\\localhost:1883");
return factory;
}

配置消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Bean
public IntegrationFlow mqttInFlow() {
return IntegrationFlows.from(mqttInbound())
.transform(p -> p)
.handle(mqttService.handler())
.get();
}
private MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("customer",
mqttClientFactory(), "test-topic");
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
return adapter;
}

配置生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Bean
public IntegrationFlow mqttOutFlow() {
return IntegrationFlows.from(outChannel())
.handle(mqttOutbound())
.get();
}
@Bean
public MessageChannel outChannel() {
return new DirectChannel();
}
@Bean
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publisher", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("test-topic");
return messageHandler;
}
1
2
3
4
@MessagingGateway(defaultRequestChannel = "outChannel")
public interface MessageWriter {
void write(String data);
}

生产者的使用可以为:

1
2
3
4
5
6
@Autowired
MessageWriter messageWriter
void publish(String data){
messageWriter.write(data)
}

遇到的坑以及未解决的问题

  • 生产者和消费者的clientId一定一定不能一样
  • 未解决:多个topic和data的动态生产