pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<scope>compile</scope>
</dependency>
MqttSubConfig.java
package com.iut.com.config;
import com.iut.com.service.ExternalInterfaceService;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import java.util.HashMap;
import java.util.Map;
@IntegrationComponentScan
@Configuration
public class MqttSubConfig {
@Autowired
ExternalInterfaceService externalInterfaceService;
private static final String BROKER_URL = "tcp://brokerIp:port";
private static final String MQTT_CLIENT_ID = MqttAsyncClient.generateClientId();
private static final String TOPIC_FILTER = "topicName";
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inboundChannel() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(BROKER_URL, MQTT_CLIENT_ID, TOPIC_FILTER);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler inboundMessageHandler() {
return message -> {
String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
Map<String, Object> mqttData = new HashMap<String, Object>();
mqttData.put(topic , message.getPayload());
// do something
try {
externalInterfaceService.getMqttData(mqttData);
} catch (Exception e) {
e.printStackTrace();
}
};
}
}
MqttPubConfig.java
package com.iut.com.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@IntegrationComponentScan
@Configuration
public class MqttPubConfig {
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://brokerIp:port" });
options.setUserName("username");
options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("exoluse", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("topic");
return messageHandler;
}
}
MqttServiceImpl.java
package com.iut.com.service.impl;
import com.iut.com.service.MqttService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Map;
@Service
public class MqttServiceImpl implements MqttService {
@Autowired
MqttGateway mqttGateway;
@Transactional
public int sendToMqtt(Map<String, Object> paramMap) {
mqttGateway.sendToMqtt("AAAAAAAAsdfsdfsdfdsfAAaaaaaa", "topic");
return 0;
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}
}