1、Kafka consumer和producer的配置类注释掉不需要了

2、添加kafka consumer和producer,消费者还需要做异常处理和生成指令
3、添加KafkaTopicConfig配置类,设置topic的Partition分区数
This commit is contained in:
Hao Miao
2024-01-30 00:00:39 +08:00
parent 11fb439e4c
commit 85410b9e53
7 changed files with 204 additions and 15 deletions

View File

@@ -0,0 +1,54 @@
package com.realtime.protection.configuration.kafka;
import com.realtime.protection.configuration.entity.alert.AlertMessage;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
//@Configuration
//@EnableKafka
public class KafkaConsumerConfig {
// @Value("${spring.kafka.bootstrap-servers}")
// private String bootstrapServers;
//
// @Value("${spring.kafka.consumer.group-id}")
// private String groupId;
//
// @Autowired
// private KafkaProperties kafkaProperties;
//
// @Bean
// public Map<String, Object> consumerConfigs() {
// Map<String, Object> props = new HashMap<>();
// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// return props;
// }
//
// @Bean
// public ConsumerFactory<String, AlertMessage> consumerFactory() {
// return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
// new JsonDeserializer<>(AlertMessage.class));
// }
//
// @Bean
// public ConcurrentKafkaListenerContainerFactory<String, AlertMessage> kafkaListenerContainerFactory() {
// ConcurrentKafkaListenerContainerFactory<String, AlertMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
// factory.setConsumerFactory(consumerFactory());
// return factory;
// }
}

View File

@@ -0,0 +1,4 @@
package com.realtime.protection.configuration.kafka;
public interface KafkaMessage {
}

View File

@@ -0,0 +1,42 @@
package com.realtime.protection.configuration.kafka;
import com.realtime.protection.configuration.entity.alert.AlertMessage;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
//@Configuration
public class KafkaProducerConfig {
// @Value("${spring.kafka.bootstrap-servers}")
// private String bootstrapServers;
//
// @Bean
// public Map<String, Object> producerConfigs() {
// Map<String, Object> props = new HashMap<>();
// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// return props;
// }
//
// @Bean
// public ProducerFactory<String, AlertMessage> producerFactory() {
// return new DefaultKafkaProducerFactory<>(producerConfigs());
// }
//
// @Bean
// public KafkaTemplate<String, AlertMessage> kafkaTemplate() {
// return new KafkaTemplate<>(producerFactory());
// }
}

View File

@@ -0,0 +1,13 @@
package com.realtime.protection.configuration.kafka;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic batchTopic() {
return new NewTopic("testTopic", 4, (short) 1);
}
}

View File

@@ -0,0 +1,25 @@
package com.realtime.protection.server.alertmessage.kafkaConsumer;
import com.realtime.protection.configuration.entity.alert.AlertMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class KafkaProducerController {
KafkaTemplate<String, AlertMessage> kafkaTemplate;
public KafkaProducerController(KafkaTemplate<String, AlertMessage> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@PostMapping("/kafkasend")
public void sendMessage(@RequestBody AlertMessage alerm) {
kafkaTemplate.send("topic-test", alerm);
}
}

View File

@@ -0,0 +1,24 @@
package com.realtime.protection.server.alertmessage.kafkaProducer;
import com.realtime.protection.configuration.entity.alert.AlertMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "${spring.kafka.consumer.topic-name}")
public void consume(AlertMessage alerm, Acknowledgment ack) {
try {
log.info("消费者监听到数据:{}", alerm);
// 手动提交
ack.acknowledge();
} catch (Exception e) {
throw new RuntimeException("消费失败,数据: " + alerm, e);
}
}
}

View File

@@ -41,23 +41,42 @@ spring:
topic-name: topic-test
# 消费者组
group-id: TestObjectGroup
# 是否自动提交
# 是否自动提交偏移量默认值是true为了避免出现重复数据和数据丢失可以把它设置为false然后手动提交偏移量
enable-auto-commit: false
# 消费偏移配置
# none如果没有为消费者找到先前的offset的值,即没有自动维护偏移量,也没有手动维护偏移量,则抛出异常
# earliest各分区下有提交的offset时:从offset开始消费;在各分区下无提交的offset时:从头开始消费
# latest在各分区下有提交的offset时:从offset开始消费;在各分区下无提交的offset时从最新的数据开始消费
auto-offset-reset: earliest
# 指定消息key和消息体的解码方式字符串反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值反序列化使用Json
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# earliest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从头开始消费分区的记录
# latest各分区下有提交的offset时,从提交的offset开始消费无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
# none当各分区都存在已提交的offset时,从提交的offset开始消费只要有一个分区不存在已提交的offset则抛出异常
auto-offset-reset: latest
# 键的反序列化方式
key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# 值反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# 信任的包
# 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
properties:
spring:
json:
trusted:
packages: com.realtime.protection.configuration.entity.alert
spring.json.trusted.packages: com.realtime.protection.configuration.entity.*
# 这个参数定义了poll方法最多可以拉取多少条消息默认值为500。如果在拉取消息的时候新消息不足500条那有多少返回多少如果超过500条每次只返回500。
# 这个默认值在有些场景下太大有些场景很难保证能够在5min内处理完500条消息
# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
# 要避免出现上述问题提前评估好处理一条消息最长需要多少时间然后覆盖默认的max.poll.records参数
# 注需要开启BatchListener批量监听才会生效如果不开启BatchListener则不会出现reBalance情况
#max-poll-records: 3
# properties:
# # 两次poll之间的最大间隔默认值为5分钟。如果超过这个间隔会触发reBalance
# max:
# poll:
# interval:
# ms: 600000
# # 当broker多久没有收到consumer的心跳请求后就触发reBalance默认值是10s
# session:
# timeout:
# ms: 10000
producer:
# 重试次数设置大于0的值则客户端会将发送失败的记录重新发送
retries: 3
@@ -67,7 +86,7 @@ spring:
buffer-memory: 33554432
acks: 1
# 指定消息key和消息体的编码方式字符串序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
#值序列化使用Json
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# 监听
@@ -80,6 +99,14 @@ spring:
# manual当每一批poll()的数据被ListenerConsumer处理之后, 手动调用Acknowledgment.acknowledge()后提交
# manual_immediate手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种
ack-mode: manual_immediate
# # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
# concurrency: 4
# # 自动提交关闭,需要设置手动消息确认
# ack-mode: manual_immediate
# # 消费监听接口监听的主题不存在时默认会报错所以设置为false忽略错误
# missing-topics-fatal: false
# # 两次poll之间的最大间隔默认值为5分钟。如果超过这个间隔会触发reBalance
# poll-timeout: 600000