MQTT连接远程MQTT服务器

发布时间 2023-12-28 10:52:14作者: 戒爱学Java

MQTT连接远程MQTT服务器

本篇文章以Spring Boot项目为主进行介绍连接步骤。

1、首先需要给定远程MQTT服务器的地址

Broker Addr:远程地址(域名):1883

1883端口号在TCP/IP协议中被用于MQTT通信。

既然用到MQTT,我们一般都是用来将设备的数据进行上传到云平台进行展示的。因此我们还需要设备号clientId,用户名,密码,用户名一般要求和设备号保持一致,为了方便,不过也是要按服务端的规则来。而且密码的规则则也是需要根据MQTT服务器的要求来进行加密等等。

总的来说,连接MQTT服务器我们需要连接地址,设备号,设备用户名,设备密码。

2、创建Spring Boot项目

1)导入依赖

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

我们采用eclipse.paho包里的mqttv3进行连接。

2)注入地址、设备号、用户名、密码

public class MqttService {

    @Value("${spring.mqtt.url}")
    private String mqttUrl;

    @Value("${spring.mqtt.username}")
    private String mqttUsername;

    @Value("${spring.mqtt.password}")
    private String mqttPassword;


    @Value("${spring.mqtt.clientid}")
    private String clientId;

    private MqttClient client;

    

3)连接并进行发布

继续在MqttService里写,上述的私有client,是为了解决发布和订阅不是同一个客户端的问题,要不然发送出去数据,是无法同时订阅到其他的主题的。

public void connectAndSendMessage(String topic, String message) {
        try {

            MqttClientPersistence persistence = new MemoryPersistence();

            if (client == null  || !client.isConnected()){
                client = new MqttClient(mqttUrl, clientId, persistence);

                MqttConnectOptions connOpts = new MqttConnectOptions();
                connOpts.setCleanSession(true);
                connOpts.setUserName(mqttUsername);
                connOpts.setPassword(mqttPassword.toCharArray());

                client.connect(connOpts);
            }
            client.publish(topic, message.getBytes(), 0, false);
            System.out.println("是否连接"+client.isConnected());

        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

4)订阅主题

public void subscribeToTopic(String topic) {
        try {
            MqttClientPersistence persistence = new MemoryPersistence();
            if (client == null  || !client.isConnected()){
                client = new MqttClient(mqttUrl, clientId, persistence);

                MqttConnectOptions connOpts = new MqttConnectOptions();
                connOpts.setCleanSession(true);
                connOpts.setUserName(mqttUsername);
                connOpts.setPassword(mqttPassword.toCharArray());

                client.connect(connOpts);
            }
            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {}

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    // 处理接收到的消息
                    String receivedMessage = new String(message.getPayload());
                    System.out.println("接收到消息:" + receivedMessage);
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {}
            });

            client.subscribe(topic);

        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

client == null || !client.isConnected()这一步是为了判断client是否已经创建,如果已经存在就需要在重新创建新的客户端