diff --git a/src/main/java/com/realtime/protection/configuration/kafka/MyKafkaListenerErrorHandler.java b/src/main/java/com/realtime/protection/configuration/kafka/MyKafkaListenerErrorHandler.java new file mode 100644 index 0000000..4abaf8c --- /dev/null +++ b/src/main/java/com/realtime/protection/configuration/kafka/MyKafkaListenerErrorHandler.java @@ -0,0 +1,33 @@ +package com.realtime.protection.configuration.kafka; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; + +import org.springframework.kafka.listener.ListenerExecutionFailedException; +import org.springframework.lang.NonNull; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler { + + @Override + @NonNull + public Object handleError(@NonNull Message message, @NonNull ListenerExecutionFailedException exception) { + return new Object(); + } + + @Override + @NonNull + public Object handleError(@NonNull Message message, @NonNull ListenerExecutionFailedException exception, Consumer consumer) { + + log.error("消息详情:" + message); + log.error("异常信息::" + exception.getCause()); + log.error("消费者详情::" + consumer.groupMetadata()); + log.error("监听主题::" + consumer.listTopics()); + return KafkaListenerErrorHandler.super.handleError(message, exception, consumer); + } + + + } diff --git a/src/main/java/com/realtime/protection/server/alertmessage/AlertMessageController.java b/src/main/java/com/realtime/protection/server/alertmessage/AlertMessageController.java index 69a29aa..0e78a9e 100644 --- a/src/main/java/com/realtime/protection/server/alertmessage/AlertMessageController.java +++ b/src/main/java/com/realtime/protection/server/alertmessage/AlertMessageController.java @@ -12,7 +12,7 @@ import org.springframework.web.bind.annotation.*; public class AlertMessageController { private final AlertMessageService alertMessageService; - public AlertMessageController(final AlertMessageService alertMessageService) { + public AlertMessageController(AlertMessageService alertMessageService) { this.alertMessageService = alertMessageService; } diff --git a/src/main/java/com/realtime/protection/server/alertmessage/kafkaProducer/KafkaConsumerService.java b/src/main/java/com/realtime/protection/server/alertmessage/kafkaProducer/KafkaConsumerService.java index d3f4acf..7e13272 100644 --- a/src/main/java/com/realtime/protection/server/alertmessage/kafkaProducer/KafkaConsumerService.java +++ b/src/main/java/com/realtime/protection/server/alertmessage/kafkaProducer/KafkaConsumerService.java @@ -1,6 +1,7 @@ package com.realtime.protection.server.alertmessage.kafkaProducer; import com.realtime.protection.configuration.entity.alert.AlertMessage; +import com.realtime.protection.server.alertmessage.AlertMessageService; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; @@ -9,15 +10,21 @@ import org.springframework.stereotype.Service; @Slf4j @Service public class KafkaConsumerService { - @KafkaListener(topics = "${spring.kafka.consumer.topic-name}") - public void consume(AlertMessage alerm, Acknowledgment ack) { - try { - log.info("消费者监听到数据:{}", alerm); - // 手动提交 - ack.acknowledge(); - } catch (Exception e) { - throw new RuntimeException("消费失败,数据: " + alerm, e); - } + private final AlertMessageService alertMessageService; + public KafkaConsumerService(AlertMessageService alertMessageService){ + this.alertMessageService = alertMessageService; + } + + @KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${spring.kafka.consumer.topic-name}", + errorHandler = "myKafkaListenerErrorHandler", + properties = {"spring.json.value.default.type=com.realtime.protection.configuration.entity.alert.AlertMessage"}) + public void consume(AlertMessage alert, Acknowledgment ack) { + log.info("消费者监听到数据:{}", alert); + + alertMessageService.processAlertMessage(alert); + ack.acknowledge(); + // 手动提交 + } diff --git a/src/main/resources/config/application-kafkatest.yml b/src/main/resources/config/application-kafkatest.yml index 1ee4330..7ebee3d 100644 --- a/src/main/resources/config/application-kafkatest.yml +++ b/src/main/resources/config/application-kafkatest.yml @@ -49,9 +49,9 @@ spring: # none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常 auto-offset-reset: latest # 键的反序列化方式 - key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer # 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类) - value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer # 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要 properties: spring.json.trusted.packages: com.realtime.protection.configuration.entity.* @@ -62,8 +62,11 @@ spring: # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数 # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况 #max-poll-records: 3 + spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer + spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer -# properties: + + # properties: # # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance # max: # poll: @@ -74,9 +77,6 @@ spring: # timeout: # ms: 10000 - - - producer: # 重试次数,设置大于0的值,则客户端会将发送失败的记录重新发送 retries: 3