我在 github 搜索到一个有点年头了 但是可能有关的 issue , 这个 issue 已经关闭了,本来还想留言来着
https://github.com/eclipse/paho.mqtt.java/issues/323  
我使用 Spring-Integration-mqtt 模块发送和接收 MQTT 消息,发生以下异常:
```html
2024-10-22 10:56:45.161 [][] ERROR o.s.i.handler.LoggingHandler:250 - org.springframework.messaging.MessageHandlingException: Failed to publish to MQTT in the [bean 'mqttOutboundHandler' for component 'mqttOutboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [com/rms/config/mqtt/MqttConfig.class]'; from source: 'com.rms.config.mqtt.MqttConfig.mqttOutboundHandler(org.eclipse.paho.mqttv5.client.MqttConnectionOptions)'], failedMessage=GenericMessage [payload={"value":[],"unitId":741,"fieldName":"landingCall-down-front-1-32","isRunningData":false,"isError":false}, headers={replyChannel=nullChannel, errorChannel=, mqtt_qos=0, id=1237ba75-e408-10ff-e322-5f692dd8970e, mqtt_topic=status/741/landingCall-down-front-1-32, timestamp=1729565799271}]
    at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.publish(Mqttv5PahoMessageHandler.java:283)
    at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.handleMessageInternal(Mqttv5PahoMessageHandler.java:222)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:129)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:56)
    at java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314)
    at java.base/java.lang.VirtualThread.run(VirtualThread.java:311)
Caused by: Internal error, caused by no new message IDs being available (32001)
    at org.eclipse.paho.mqttv5.client.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:32)
    at org.eclipse.paho.mqttv5.client.internal.ClientState.getNextMessageId(ClientState.java:1454)
    at org.eclipse.paho.mqttv5.client.internal.ClientState.send(ClientState.java:511)
    at org.eclipse.paho.mqttv5.client.internal.ClientComms.internalSend(ClientComms.java:155)
    at org.eclipse.paho.mqttv5.client.internal.ClientComms.sendNoWait(ClientComms.java:218)
    at org.eclipse.paho.mqttv5.client.MqttAsyncClient.publish(MqttAsyncClient.java:1530)
    at org.eclipse.paho.mqttv5.client.MqttAsyncClient.publish(MqttAsyncClient.java:1499)
    at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.publish(Mqttv5PahoMessageHandler.java:271)
    ... 10 more
```
环境:
- SpringBoot 3.2.5
- java21
- PahoMQTT1.2.5
下面是我的核心逻辑。我使用 Spring-Integration 集成流程配置了两个 Mqtt 客户端。一个用于接收信息,另一个用于发送消息。发送消息的频率约为每秒 5000 条消息。
```yaml
mqtt:
  client-id-inbound: rms-inbound
  client-id-outbound: rms-outbound
  url: tcp://127.0.0.1:1883
  username: rms
  password: 123456
```
```
import java.util.concurrent.Executors;


@Configuration
@IntegrationComponentScan("com.rms.config")
@Slf4j
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttConfig {
    private String clientIdInbound;
    private String clientIdOutbound;
    private String url;
    private String password;
    private String username;
    @Bean
    public MqttConnectionOptions mqttConnectOptions(){
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setServerURIs(new String[] { url});
        options.setUserName(username);
        options.setPassword(password.getBytes());
        options.setAutomaticReconnect(true);
        return options;
    }
    @Bean
    public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager(MqttConnectionOptions options) {
        Mqttv5ClientManager clientManager = new Mqttv5ClientManager(options, clientIdInbound);
        clientManager.setPersistence(new MqttDefaultFilePersistence());
        return clientManager;
    }

    @Bean
    public SimpleMessageConverter simpleMessageConverter(){
        return new SimpleMessageConverter();
    }

    @Bean
    public MessageHandler mqttOutboundHandler(MqttConnectionOptions connectionOptions) {
        Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(connectionOptions,clientIdOutbound);
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("defaultTopic");
        messageHandler.setDefaultQos(MqttQoS.AT_MOST_ONCE.value());
        messageHandler.setConverter(simpleMessageConverter());
        return messageHandler;
    }


    @Bean
    public IntegrationFlow mqttOutboundFlow(MessageHandler mqttOutboundHandler){
        return IntegrationFlow.from("mqttOutboundChannel")
                .channel(MessageChannels.executor(Executors.newVirtualThreadPerTaskExecutor()))
                .handle(mqttOutboundHandler)
                .get();
    }

    @Bean
    public IntegrationFlow statusInboundFlow(ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManage){
        Mqttv5PahoMessageDrivenChannelAdapter messageProducer  =
                new Mqttv5PahoMessageDrivenChannelAdapter(clientManage, "status/+/#");
        return IntegrationFlow.from(messageProducer)
                .channel(MessageChannels.executor(Executors.newVirtualThreadPerTaskExecutor()))
                .transform(Transformers.objectToString())
                .transform(Transformers.fromJson(Prop.class))
                .route(Prop.class,prop->{
                    if (Boolean.TRUE.equals(prop.getIsError())){
                        return "errorDataChannel";
                    }
                    else if (Boolean.TRUE.equals(prop.getIsRunningData())){
                        return "runningDataChannel";
                    }
                    else {
                        return "discardChannel";
                    }
                })
                .get();
    }
    @Bean
    public CanProtocolLoader canProtocolLoader(){
        return new CanProtocolLoader();
    }
    @Bean
    public UnitErrorHandler errorLogHandler(LogService logService ){
        return new UnitErrorHandler(logService);
    }
    @Bean
    public UnitRunningDataHandler unitRunningDataHandler(LogService logService){
        return new UnitRunningDataHandler(logService);
    }
    @Bean
    public IntegrationFlow errorLogChannelFlow(UnitErrorHandler unitErrorHandler){
        return IntegrationFlow.from("errorDataChannel")
                .channel(MessageChannels.executor(Executors.newVirtualThreadPerTaskExecutor()))
                .handle(unitErrorHandler)
                .get();
    }
    @Bean
    public IntegrationFlow runningDataChannelFlow(UnitRunningDataHandler runningDataHandler){
        return IntegrationFlow.from("runningDataChannel")
                .channel(MessageChannels.executor(Executors.newVirtualThreadPerTaskExecutor()))
                .handle(runningDataHandler)
                .get();
    }
  
    @Bean
    public IntegrationFlow discardChannelFlow(){
        return IntegrationFlow.from("discardChannel")
                .channel(MessageChannels.executor(Executors.newVirtualThreadPerTaskExecutor()))
                .handle(message -> {
                })
                .get();
    }
}
```
```java
@Slf4j
@RequiredArgsConstructor
public class UnitErrorHandler implements MessageHandler {
    private final LogService logService;

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        Prop<?> prop = (Prop<?>) message.getPayload();
        logService.saveErrorLog(prop);
    }

}
```
我使用 MessagingGateway 注释将消息定向到我的 mqttOutboundChannel ,以便我可以使用 MqttGateway 发送 mqtt 消息
```java

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    void sendToMqtt(String data);

    void sendToTopic(String payload, @Header(MqttHeaders.TOPIC)String topic);

    void sendToTopic(String payload, @Header(MqttHeaders.TOPIC)String topic,@Header(MqttHeaders.QOS ) int qos);
    void sendWithResp(String payload, @Header(MqttHeaders.TOPIC)String topic,@Header(MqttHeaders.RESPONSE_TOPIC) String responseTopic,@Header(MqttHeaders.QOS ) int qos);
}

```


当消息投递率很低,一般每秒不到 200 条消息时,我一开始提到的错误就不会出现,但是当我将消息投递率提高到 5000 条时,按照上面的配置就开始出现报错,另外如果我去掉了 mqtt 消息的入站部分,也就是上面的代码中引入 的 statusInboundFlow 集成 Flow 后  发送消息又不报错了
举报· 69 次点击
登录 注册 站外分享
4 条回复  
maokg 初学 2024-10-22 13:17:47
Caused by: Internal error, caused by no new message IDs being available (32001),好像是 id 的问题
ZGame 初学 2024-10-22 13:23:53
@129duckflew  我猜是背压问题  我感觉可能是你代码写的有问题
返回顶部