求助: GoLang MQTT 客户端使用问题

huifer · 2024-8-29 15:23:00 · 37 次点击
我遇到的问题:多个 MQTT 客户端不断因为 "pingresp not received, disconnecting" 而断开连接,日志如下:

```
2024-08-29 07:13:10        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_5 , error = pingresp not received, disconnecting
2024-08-29 07:13:14        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_88 , error = pingresp not received, disconnecting
2024-08-29 07:13:15        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_43 , error = pingresp not received, disconnecting
2024-08-29 07:13:15        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_72 , error = pingresp not received, disconnecting
2024-08-29 07:13:15        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_1 , error = pingresp not received, disconnecting
2024-08-29 07:13:17        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_37 , error = pingresp not received, disconnecting
2024-08-29 07:13:18        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_10 , error = pingresp not received, disconnecting
2024-08-29 07:14:13        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_52 , error = pingresp not received, disconnecting
2024-08-29 07:14:18        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_59 , error = pingresp not received, disconnecting
2024-08-29 07:14:19        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_84 , error = pingresp not received, disconnecting
2024-08-29 07:14:19        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_54 , error = pingresp not received, disconnecting
2024-08-29 07:14:21        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_22 , error = pingresp not received, disconnecting
2024-08-29 07:14:22        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_12 , error = pingresp not received, disconnecting
2024-08-29 07:14:23        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_25 , error = pingresp not received, disconnecting
2024-08-29 07:14:24        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_97 , error = pingresp not received, disconnecting
2024-08-29 07:14:26        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_36 , error = pingresp not received, disconnecting
2024-08-29 07:15:08        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_63 , error = pingresp not received, disconnecting
2024-08-29 07:15:16        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_23 , error = pingresp not received, disconnecting
2024-08-29 07:15:19        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_96 , error = pingresp not received, disconnecting
2024-08-29 07:15:20        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_50 , error = pingresp not received, disconnecting
2024-08-29 07:15:25        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_75 , error = pingresp not received, disconnecting
2024-08-29 07:15:30        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_78 , error = pingresp not received, disconnecting
2024-08-29 07:15:36        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_7 , error = pingresp not received, disconnecting
2024-08-29 07:15:39        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_97 , error = pingresp not received, disconnecting
2024-08-29 07:16:17        error        go-iot/mqtt_service.go:40        mqtt connection lost id = TT_79 , error = pingresp not received, disconnecting
```


这是我正在使用的程序代码

```
package main

import (
        "encoding/json"
        "fmt"
        mqtt "github.com/eclipse/paho.mqtt.golang"
        "go.uber.org/zap"
        "sync"
        "time"
)

// MqttInterface bundles a paho MQTT client with the channel used to hand
// received messages over to the consumer loop.
type MqttInterface struct {
        // client is the underlying paho client; it is assigned by Connect on success.
        client mqtt.Client
        // Id is used as the MQTT client ID and is attached to every forwarded message.
        Id     string
        // Chan carries JSON-encoded messages from the subscription callback to HandlerMsg.
        Chan   chan []byte
        // Config is passed to StopMqttClient when the connection is lost.
        Config MqttConfig
        // wg counts subscription callbacks that are currently running.
        wg     sync.WaitGroup
}

// NewMqttClient 初始化并返回一个新的 MqttInterface 实例
func NewMqttClient(id string, config MqttConfig) *MqttInterface {
        return &MqttInterface{
                Id:     id,
                Chan:   make(chan []byte, 1000),
                Config: config,
        }
}

// Connect 连接到 MQTT 服务器
func (m *MqttInterface) Connect(host, username, password string, port int) error {
        opts := mqtt.NewClientOptions()
        opts.AddBroker(fmt.Sprintf("tcp://%s:%d", host, port))
        opts.SetUsername(username)
        opts.SetAutoReconnect(false)
        opts.SetPassword(password)
        opts.SetClientID(m.Id)
        //opts.SetDefaultPublishHandler(m.messageHandler)
        opts.OnConnectionLost = func(client mqtt.Client, err error) {
                zap.S().Errorf("mqtt connection lost id = %s , error = %+v", m.Id, err)
                StopMqttClient(m.Id, m.Config)
        }

        opts.SetOrderMatters(false)
        opts.SetKeepAlive(60 * time.Second)
        // 创建并启动客户端
        client := mqtt.NewClient(opts)
        if token := client.Connect(); token.Wait() && token.Error() != nil {
                return token.Error()
        }

        m.client = client
        return nil
}

// messageHandler is a message callback that currently does nothing. In the
// code shown here it is only mentioned in a commented-out
// SetDefaultPublishHandler call inside Connect.
func (m *MqttInterface) messageHandler(_ mqtt.Client, _ mqtt.Message) {}

// Subscribe 订阅一个或多个主题
func (m *MqttInterface) Subscribe(topics string) error {
        var token = m.client.Subscribe(topics, 0, func(client mqtt.Client, msg mqtt.Message) {
                m.wg.Add(1)
                defer func() {
                        m.wg.Done()
                        //zap.S().Errorf("mqtt subscribe id = %s , topic = %s", m.Id, msg.Topic())
                }()
                mqttMsg := MQTTMessage{
                        MQTTClientID: m.Id,
                        Message:      string(msg.Payload()),
                }
                jsonData, _ := json.Marshal(mqttMsg)

                m.Chan <- jsonData

        })

        if token.Wait() && token.Error() != nil {
                zap.S().Errorf(token.Error().Error())
                return token.Error()
        }
        return nil
}

// Publish 向一个主题发布消息
func (m *MqttInterface) Publish(topic string, payload interface{}) {
        token := m.client.Publish(topic, 0, false, payload)
        token.Wait()
}

// Disconnect 断开与 MQTT 服务器的连接
func (m *MqttInterface) Disconnect() {
        m.client.Disconnect(250)
}

func (m *MqttInterface) HandlerMsg() {
        for {
                c := <-m.Chan
                PushToQueue("pre_handler", c)

        }
}
```


创建 MQTT 客户端和开启订阅


```
        client := NewMqttClient(clientId,config)
        err := client.Connect(broker, username, password, port)
        if err != nil {
                zap.S().Errorf("mqtt connect err = %v", err)
        return false
        }
        go client.Subscribe(subTopic)
        go client.HandlerMsg()
   
```


请问这个问题应该如何解决?


## 我的尝试
1. 我提交了一个 Issue,我的理解是需要在收到消息后进行异步处理:https://github.com/eclipse/paho.mqtt.golang/issues/686

2. 将订阅回调修改为在 goroutine 中处理消息,修改后的程序如下:

```
        // Attempted fix: hand each incoming message to its own goroutine so that
        // the subscription callback returns immediately.
        var token = m.client.Subscribe(topics, 0, func(client mqtt.Client, msg mqtt.Message) {
                go func() {
                        mqttMsg := MQTTMessage{
                                MQTTClientID: m.Id,
                                Message:      string(msg.Payload()),
                        }
                        // NOTE(review): the Marshal error is discarded here.
                        jsonData, _ := json.Marshal(mqttMsg)

                        // This send blocks while m.Chan is full, so one goroutine per
                        // message stays parked whenever the consumer falls behind.
                        m.Chan <- jsonData
                }()

        })
```

上述两种尝试均未能解决问题。请问应当如何解决这个问题?
举报· 37 次点击
登录 注册 站外分享
1 条回复  
ForrestWang 小成 2024-8-29 16:49:19
不太理解你遇到了什么问题,可以描述一下吗
返回顶部