cloud stream 消费异常处理

ima
ima
2021-08-17 / 0 评论 / 65 阅读
温馨提示:
本文最后更新于2021-08-17,若内容或图片失效,请留言反馈。

消费异常即消息消费过程异常。

在消费过程中==抛出==异常默认会立即重新尝试3次。

然后才开始重试机制

每次重试 过程中抛出异常,都会重新尝试 3次。

需求:即要抛出异常又要使消息消费成功。

spring给我们提供了消息处理方法,原理会涉及到Spring-Integration ,关于Spring-Integration自行了解。

官方描述是这样

Handling Errors

By default, an error channel is configured for the pollable source; if the callback throws an exception, an ErrorMessage is sent to the error channel (<destination>.<group>.errors); this error channel is also bridged to the global Spring Integration errorChannel.

You can subscribe to either error channel with a @ServiceActivator to handle errors; without a subscription, the error will simply be logged and the message will be acknowledged as successful. If the error channel service activator throws an exception, the message will be rejected (by default) and won’t be redelivered. If the service activator throws a RequeueCurrentMessageException, the message will be requeued at the broker and will be again retrieved on a subsequent poll.

If the listener throws a RequeueCurrentMessageException directly, the message will be requeued, as discussed above, and will not be sent to the error channels.

实现起来是这样:

    @ServiceActivator(inputChannel = "ERBADAGANG-TOPIC-01.trek-consumer-group-TREK-TOPIC-01.errors") // destination.group+.errors
    public void handleError(ErrorMessage errorMessage) { // 符合条件进入这里,这里不抛出新的异常 消息消费成功,抛出异常 重试
        log.error("[handleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
        log.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
        log.error("[handleError][headers:{}]", errorMessage.getHeaders());
    }