diff --git a/src/main/java/com/realtime/protection/configuration/kafka/KafkaConsumerConfig.java b/src/main/java/com/realtime/protection/configuration/kafka/KafkaConsumerConfig.java new file mode 100644 index 0000000..eeb95c5 --- /dev/null +++ b/src/main/java/com/realtime/protection/configuration/kafka/KafkaConsumerConfig.java @@ -0,0 +1,54 @@ +package com.realtime.protection.configuration.kafka; + + +import com.realtime.protection.configuration.entity.alert.AlertMessage; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +import java.util.HashMap; +import java.util.Map; + +//@Configuration +//@EnableKafka +public class KafkaConsumerConfig { +// @Value("${spring.kafka.bootstrap-servers}") +// private String bootstrapServers; +// +// @Value("${spring.kafka.consumer.group-id}") +// private String groupId; +// +// @Autowired +// private KafkaProperties kafkaProperties; +// +// @Bean +// public Map consumerConfigs() { +// Map props = new HashMap<>(); +// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); +// props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); +// return props; +// } +// +// @Bean +// public ConsumerFactory consumerFactory() { +// return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), +// new JsonDeserializer<>(AlertMessage.class)); +// } +// +// @Bean +// public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { +// ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); +// factory.setConsumerFactory(consumerFactory()); +// return factory; +// } +} + diff --git a/src/main/java/com/realtime/protection/configuration/kafka/KafkaMessage.java b/src/main/java/com/realtime/protection/configuration/kafka/KafkaMessage.java new file mode 100644 index 0000000..2eb75cd --- /dev/null +++ b/src/main/java/com/realtime/protection/configuration/kafka/KafkaMessage.java @@ -0,0 +1,4 @@ +package com.realtime.protection.configuration.kafka; + +public interface KafkaMessage { +} diff --git a/src/main/java/com/realtime/protection/configuration/kafka/KafkaProducerConfig.java b/src/main/java/com/realtime/protection/configuration/kafka/KafkaProducerConfig.java new file mode 100644 index 0000000..949301b --- /dev/null +++ b/src/main/java/com/realtime/protection/configuration/kafka/KafkaProducerConfig.java @@ -0,0 +1,42 @@ +package com.realtime.protection.configuration.kafka; + +import com.realtime.protection.configuration.entity.alert.AlertMessage; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.HashMap; +import java.util.Map; + +//@Configuration +public class KafkaProducerConfig { +// @Value("${spring.kafka.bootstrap-servers}") +// private String bootstrapServers; +// +// @Bean +// public Map producerConfigs() { +// Map props = new HashMap<>(); +// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); +// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); +// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); +// return props; +// } +// +// @Bean +// public ProducerFactory producerFactory() { +// return new DefaultKafkaProducerFactory<>(producerConfigs()); +// } +// +// @Bean +// public KafkaTemplate kafkaTemplate() { +// return new KafkaTemplate<>(producerFactory()); +// } + + +} diff --git a/src/main/java/com/realtime/protection/configuration/kafka/KafkaTopicConfig.java b/src/main/java/com/realtime/protection/configuration/kafka/KafkaTopicConfig.java new file mode 100644 index 0000000..c8fddd6 --- /dev/null +++ b/src/main/java/com/realtime/protection/configuration/kafka/KafkaTopicConfig.java @@ -0,0 +1,13 @@ +package com.realtime.protection.configuration.kafka; + +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class KafkaTopicConfig { + @Bean + public NewTopic batchTopic() { + return new NewTopic("testTopic", 4, (short) 1); + } +} diff --git a/src/main/java/com/realtime/protection/server/alertmessage/kafkaConsumer/KafkaProducerController.java b/src/main/java/com/realtime/protection/server/alertmessage/kafkaConsumer/KafkaProducerController.java new file mode 100644 index 0000000..bb24d19 --- /dev/null +++ b/src/main/java/com/realtime/protection/server/alertmessage/kafkaConsumer/KafkaProducerController.java @@ -0,0 +1,25 @@ +package com.realtime.protection.server.alertmessage.kafkaConsumer; + +import com.realtime.protection.configuration.entity.alert.AlertMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +@Slf4j +@RestController +public class KafkaProducerController { + KafkaTemplate kafkaTemplate; + public KafkaProducerController(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + @PostMapping("/kafkasend") + public void sendMessage(@RequestBody AlertMessage alerm) { + kafkaTemplate.send("topic-test", alerm); + + } + + +} diff --git a/src/main/java/com/realtime/protection/server/alertmessage/kafkaProducer/KafkaConsumerService.java b/src/main/java/com/realtime/protection/server/alertmessage/kafkaProducer/KafkaConsumerService.java new file mode 100644 index 0000000..d3f4acf --- /dev/null +++ b/src/main/java/com/realtime/protection/server/alertmessage/kafkaProducer/KafkaConsumerService.java @@ -0,0 +1,24 @@ +package com.realtime.protection.server.alertmessage.kafkaProducer; + +import com.realtime.protection.configuration.entity.alert.AlertMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class KafkaConsumerService { + @KafkaListener(topics = "${spring.kafka.consumer.topic-name}") + public void consume(AlertMessage alerm, Acknowledgment ack) { + try { + log.info("消费者监听到数据:{}", alerm); + // 手动提交 + ack.acknowledge(); + } catch (Exception e) { + throw new RuntimeException("消费失败,数据: " + alerm, e); + } + } + + +} diff --git a/src/main/resources/config/application-kafkatest.yml b/src/main/resources/config/application-kafkatest.yml index f01ffa2..1ee4330 100644 --- a/src/main/resources/config/application-kafkatest.yml +++ b/src/main/resources/config/application-kafkatest.yml @@ -41,23 +41,42 @@ spring: topic-name: topic-test # 消费者组 group-id: TestObjectGroup - # 是否自动提交 + # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: false - # 消费偏移配置 - # none:如果没有为消费者找到先前的offset的值,即没有自动维护偏移量,也没有手动维护偏移量,则抛出异常 - # earliest:在各分区下有提交的offset时:从offset处开始消费;在各分区下无提交的offset时:从头开始消费 - # latest:在各分区下有提交的offset时:从offset处开始消费;在各分区下无提交的offset时:从最新的数据开始消费 - auto-offset-reset: earliest - # 指定消息key和消息体的解码方式:字符串反序列化 - key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - # 值反序列化:使用Json + # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: + # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录 + # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录) + # none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常 + auto-offset-reset: latest + # 键的反序列化方式 + key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + # 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类) value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer - # 信任的包 + # 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要 properties: - spring: - json: - trusted: - packages: com.realtime.protection.configuration.entity.alert + spring.json.trusted.packages: com.realtime.protection.configuration.entity.* + # 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。 + # 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息, + # 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance, + # 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。 + # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数 + # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况 + #max-poll-records: 3 + +# properties: +# # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance +# max: +# poll: +# interval: +# ms: 600000 +# # 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s +# session: +# timeout: +# ms: 10000 + + + + producer: # 重试次数,设置大于0的值,则客户端会将发送失败的记录重新发送 retries: 3 @@ -67,7 +86,7 @@ spring: buffer-memory: 33554432 acks: 1 # 指定消息key和消息体的编码方式:字符串序列化 - key-serializer: org.apache.kafka.common.serialization.StringSerializer + key-serializer: org.springframework.kafka.support.serializer.JsonSerializer #值序列化:使用Json value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 监听 @@ -80,6 +99,14 @@ spring: # manual:当每一批poll()的数据被ListenerConsumer处理之后, 手动调用Acknowledgment.acknowledge()后提交 # manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种 ack-mode: manual_immediate +# # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数 +# concurrency: 4 +# # 自动提交关闭,需要设置手动消息确认 +# ack-mode: manual_immediate +# # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误 +# missing-topics-fatal: false +# # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance +# poll-timeout: 600000