我遇到的问题:
```
2024-08-29 07:13:10 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_5 , error = pingresp not received, disconnecting
2024-08-29 07:13:14 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_88 , error = pingresp not received, disconnecting
2024-08-29 07:13:15 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_43 , error = pingresp not received, disconnecting
2024-08-29 07:13:15 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_72 , error = pingresp not received, disconnecting
2024-08-29 07:13:15 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_1 , error = pingresp not received, disconnecting
2024-08-29 07:13:17 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_37 , error = pingresp not received, disconnecting
2024-08-29 07:13:18 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_10 , error = pingresp not received, disconnecting
2024-08-29 07:14:13 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_52 , error = pingresp not received, disconnecting
2024-08-29 07:14:18 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_59 , error = pingresp not received, disconnecting
2024-08-29 07:14:19 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_84 , error = pingresp not received, disconnecting
2024-08-29 07:14:19 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_54 , error = pingresp not received, disconnecting
2024-08-29 07:14:21 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_22 , error = pingresp not received, disconnecting
2024-08-29 07:14:22 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_12 , error = pingresp not received, disconnecting
2024-08-29 07:14:23 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_25 , error = pingresp not received, disconnecting
2024-08-29 07:14:24 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_97 , error = pingresp not received, disconnecting
2024-08-29 07:14:26 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_36 , error = pingresp not received, disconnecting
2024-08-29 07:15:08 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_63 , error = pingresp not received, disconnecting
2024-08-29 07:15:16 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_23 , error = pingresp not received, disconnecting
2024-08-29 07:15:19 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_96 , error = pingresp not received, disconnecting
2024-08-29 07:15:20 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_50 , error = pingresp not received, disconnecting
2024-08-29 07:15:25 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_75 , error = pingresp not received, disconnecting
2024-08-29 07:15:30 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_78 , error = pingresp not received, disconnecting
2024-08-29 07:15:36 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_7 , error = pingresp not received, disconnecting
2024-08-29 07:15:39 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_97 , error = pingresp not received, disconnecting
2024-08-29 07:16:17 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_79 , error = pingresp not received, disconnecting
```
这是我正在使用的程序代码
```
package main
import (
"encoding/json"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"go.uber.org/zap"
"sync"
"time"
)
// MqttInterface bundles a paho MQTT client with the identifier, configuration
// and buffered channel used alongside it. Despite its name it is a concrete
// struct, not a Go interface.
type MqttInterface struct {
	// client is the underlying paho client; Connect assigns it after a
	// successful connect.
	client mqtt.Client
	// Id is used as the MQTT client ID and in log messages.
	Id string
	// Chan carries JSON-encoded messages from the subscription callback to
	// HandlerMsg.
	Chan chan []byte
	// Config is passed to StopMqttClient when the connection is lost.
	Config MqttConfig
	// wg counts subscription callbacks that are currently running.
	wg sync.WaitGroup
}
// NewMqttClient builds an MqttInterface for the given client id and config.
// The message channel is created with a buffer of 1000 entries; no connection
// is opened until Connect is called.
func NewMqttClient(id string, config MqttConfig) *MqttInterface {
	m := new(MqttInterface)
	m.Id = id
	m.Config = config
	m.Chan = make(chan []byte, 1000)
	return m
}
// Connect opens a connection to the broker at tcp://host:port, authenticating
// with username and password and using m.Id as the MQTT client ID.
//
// Automatic reconnection is switched off, ordered delivery is disabled and the
// keep-alive interval is 60 seconds. When the connection is lost the error is
// logged and StopMqttClient is called with this client's id and config.
//
// It returns the connect token's error, if any; the client is stored on m
// only after a successful connect.
func (m *MqttInterface) Connect(host, username, password string, port int) error {
	brokerURL := fmt.Sprintf("tcp://%s:%d", host, port)

	opts := mqtt.NewClientOptions().
		AddBroker(brokerURL).
		SetClientID(m.Id).
		SetUsername(username).
		SetPassword(password).
		SetAutoReconnect(false).
		SetOrderMatters(false).
		SetKeepAlive(60 * time.Second)
	//opts.SetDefaultPublishHandler(m.messageHandler)
	opts.OnConnectionLost = func(_ mqtt.Client, lostErr error) {
		zap.S().Errorf("mqtt connection lost id = %s , error = %+v", m.Id, lostErr)
		StopMqttClient(m.Id, m.Config)
	}

	// Create the client and wait for the connect attempt to finish.
	conn := mqtt.NewClient(opts)
	token := conn.Connect()
	if token.Wait() {
		if err := token.Error(); err != nil {
			return err
		}
	}

	m.client = conn
	return nil
}
// messageHandler has the signature of a paho message callback but currently
// does nothing with the message it is given.
func (m *MqttInterface) messageHandler(_ mqtt.Client, _ mqtt.Message) {}
// Subscribe registers a QoS 0 subscription for the topic filter in topics and
// waits for the subscribe token to complete.
//
// Every message received is wrapped in an MQTTMessage together with m.Id,
// JSON-encoded and sent on m.Chan. The send blocks while the channel is full,
// so a consumer such as HandlerMsg must be draining m.Chan. A message that
// cannot be encoded is logged and dropped instead of being forwarded as a nil
// payload.
//
// It returns the subscribe token's error, which is also logged.
func (m *MqttInterface) Subscribe(topics string) error {
	token := m.client.Subscribe(topics, 0, func(_ mqtt.Client, msg mqtt.Message) {
		m.wg.Add(1)
		defer m.wg.Done()

		jsonData, err := json.Marshal(MQTTMessage{
			MQTTClientID: m.Id,
			Message:      string(msg.Payload()),
		})
		if err != nil {
			zap.S().Errorf("mqtt marshal message id = %s , topic = %s , error = %v", m.Id, msg.Topic(), err)
			return
		}
		m.Chan <- jsonData
	})
	if token.Wait() && token.Error() != nil {
		// The error text is data, not a format string: pass it through an
		// explicit verb so any '%' in it is printed verbatim.
		zap.S().Errorf("%s", token.Error().Error())
		return token.Error()
	}
	return nil
}
// Publish sends payload to topic at QoS 0 with the retained flag unset and
// waits for the publish token to complete.
//
// The method has no return value, so a failed publish is reported through the
// log rather than being silently discarded.
func (m *MqttInterface) Publish(topic string, payload interface{}) {
	token := m.client.Publish(topic, 0, false, payload)
	if token.Wait() && token.Error() != nil {
		zap.S().Errorf("mqtt publish id = %s , topic = %s , error = %v", m.Id, topic, token.Error())
	}
}
// Disconnect ends the connection to the MQTT broker, passing a quiesce value
// of 250 to the underlying client (milliseconds, per the paho client
// documentation).
func (m *MqttInterface) Disconnect() {
	const quiesceMillis = 250
	m.client.Disconnect(quiesceMillis)
}
// HandlerMsg forwards every payload received on m.Chan to the "pre_handler"
// queue via PushToQueue. It blocks for as long as m.Chan is open, so it is
// meant to run in its own goroutine; it returns once m.Chan has been closed
// and drained.
func (m *MqttInterface) HandlerMsg() {
	// Ranging over the channel (instead of a bare receive inside an endless
	// loop) lets the goroutine exit on close rather than spinning forever and
	// pushing nil payloads.
	for payload := range m.Chan {
		PushToQueue("pre_handler", payload)
	}
}
```
创建 MQTT 客户端和开启订阅
```
// Build a client for clientId and connect it; on a connect error, log it and
// return false from the enclosing function.
client := NewMqttClient(clientId,config)
err := client.Connect(broker, username, password, port)
if err != nil {
zap.S().Errorf("mqtt connect err = %v", err)
return false
}
// Start the subscription and the channel consumer, each in its own goroutine.
// NOTE(review): because Subscribe runs via a go statement, the error it
// returns is discarded here — confirm this is intended.
go client.Subscribe(subTopic)
go client.HandlerMsg()
```
请问这个问题应该如何解决？
## 我的尝试
1. 我在 paho.mqtt.golang 仓库提交了一个 Issue（https://github.com/eclipse/paho.mqtt.golang/issues/686 ）。按我的理解，得到的建议是在接收到消息后进行异步处理。
2. 修改程序如下
```
// Revised callback: the wrapping, JSON encoding and channel send now run in a
// goroutine started for each message, so the callback itself returns at once.
// NOTE(review): nothing here bounds the number of these goroutines while
// m.Chan is full, and the json.Marshal error is still ignored — confirm this
// is acceptable.
var token = m.client.Subscribe(topics, 0, func(client mqtt.Client, msg mqtt.Message) {
go func() {
mqttMsg := MQTTMessage{
MQTTClientID: m.Id,
Message: string(msg.Payload()),
}
jsonData, _ := json.Marshal(mqttMsg)
// Blocks this goroutine (not the callback) until m.Chan has room.
m.Chan <- jsonData
}()
})
```
上述两种尝试均未能解决问题。请问应当如何解决这个问题？