From 0a116352d672d56cc82c28ed9f8331cc6a59e95d Mon Sep 17 00:00:00 2001
From: qidaijie
Date: Thu, 9 Nov 2023 14:13:45 +0800
Subject: [PATCH] Optimize configuration loading: load from an external
 properties file (GAL-435)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 README.md                                     | 55 ++++++++++++--
 pom.xml                                       |  2 +-
 properties/default_config.properties          | 46 ------------
 properties/service_flow_config.properties     | 47 ++++--------
 .../zdjizhi/common/config/GlobalConfig.java   | 74 ------------------
 .../common/config/GlobalConfigLoad.java       | 70 -----------------
 .../zdjizhi/common/config/MergeConfigs.java   | 72 ++++++++++++++++++
 .../common/config/MergeConfiguration.java     | 44 +++++++++++
 .../topology/ApplicationProtocolTopology.java | 44 +++++++----
 .../utils/functions/map/ResultFlatMap.java    | 12 +--
 .../statistics/DispersionCountWindow.java     |  3 +-
 .../statistics/MergeCountWindow.java          | 25 +++++-
 .../com/zdjizhi/utils/general/MetricUtil.java | 45 +++++------
 .../com/zdjizhi/utils/kafka/CertUtils.java    | 48 ------------
 .../zdjizhi/utils/kafka/KafkaConsumer.java    | 47 ++++++------
 .../zdjizhi/utils/kafka/KafkaProducer.java    | 44 +++++------
 src/test/java/com/zdjizhi/ConfigTest.java     |  8 +-
 .../java/com/zdjizhi/ConventionalTest.java    |  3 +-
 18 files changed, 312 insertions(+), 377 deletions(-)
 delete mode 100644 properties/default_config.properties
 delete mode 100644 src/main/java/com/zdjizhi/common/config/GlobalConfig.java
 delete mode 100644 src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java
 create mode 100644 src/main/java/com/zdjizhi/common/config/MergeConfigs.java
 create mode 100644 src/main/java/com/zdjizhi/common/config/MergeConfiguration.java
 delete mode 100644 src/main/java/com/zdjizhi/utils/kafka/CertUtils.java

diff --git a/README.md b/README.md
index b888ea8..8a1bc55 100644
--- a/README.md
+++ b/README.md
@@ -1,22 +1,65 @@
 # app-protocol-stat-traffic-merge
-Live Traffic Chart statistics job: splits and aggregates multi-flow data by protocol stack, stores it in the protocol and application statistics tables, and uses incremental window computation with a 15-second period.
+Live Traffic Chart statistics job: splits and aggregates multi-flow data by protocol stack, stores it in the protocol and application statistics tables, and uses incremental window computation with a 5-second statistics period and a 5-second watermark.
 
 ## Data Source
-Regardless of which computation is used, the topic is NETWORK-TRAFFIC-METRICS.
-### 1. The app-protocol-stat-traffic-agent job computes Application and Protocol Metrics from closed/transition session logs, at 1-second aggregation granularity.
-### 2. Application and Protocol Metrics produced by the appliance side, at 1-second aggregation granularity.
-## Statistics Operations
-### 1. Filter records whose name is traffic_application_protocol_stat.
+Regardless of which computation is used, the topic is NETWORK-TRAFFIC-METRICS.
+
+### 1. The app-protocol-stat-traffic-agent job computes Application and Protocol Metrics from closed/transition session logs, at 1-second aggregation granularity. (TSG 23.05)
+
+### 2. Application and Protocol Metrics produced by the appliance side, at 1-second aggregation granularity. (TSG ≥ 23.05)
+
+## Operations
+
+### 1. Filter records whose Measurement Name is traffic_application_protocol_stat.
+
 ### 2. Group and aggregate by the contents of Tags.
+
 ### 3. Split the protocol_stack_id protocol tree into multiple nodes (see the sketch after this list).
+
 #### For example, ETHERNET.IPv4.TCP.https.kingsoft.wps_office produces one node ID per prefix:
+
 ##### ETHERNET
+
 ##### ETHERNET.IPv4
+
 ##### ETHERNET.IPv4.TCP
+
 ##### ETHERNET.IPv4.TCP.https
+
 ##### ETHERNET.IPv4.TCP.https.kingsoft
+
 ##### ETHERNET.IPv4.TCP.https.kingsoft.wps_office
+
 ### 4. app_name is emitted only on the leaf node.
+
 ### 5. Output records use Measurement Name=application_protocol_stat.
+
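A minimal, self-contained sketch of the node-ID expansion described in steps 3 and 4 (the class name is illustrative; the real implementation lives in ResultFlatMap further down in this patch):

```java
import java.util.ArrayList;
import java.util.List;

public class ProtocolStackSplitExample {

    /**
     * Expands a protocol stack ID such as
     * "ETHERNET.IPv4.TCP.https.kingsoft.wps_office"
     * into one node ID per prefix, as in step 3.
     */
    static List<String> expand(String protocolStackId) {
        List<String> nodeIds = new ArrayList<>();
        StringBuilder prefix = new StringBuilder();
        for (String segment : protocolStackId.split("\\.")) {
            if (prefix.length() > 0) {
                prefix.append('.');
            }
            prefix.append(segment);
            nodeIds.add(prefix.toString());
        }
        return nodeIds;
    }

    public static void main(String[] args) {
        // Prints the six node IDs listed above, in order.
        // Per step 4, app_name would be attached only to the last (leaf) ID.
        expand("ETHERNET.IPv4.TCP.https.kingsoft.wps_office").forEach(System.out::println);
    }
}
```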
+## Startup
+
+Standalone:
+
+`flink run [-p parallelism] -c com.zdjizhi.topology.ApplicationProtocolTopology app-protocol-stat-traffic-merge-[version].jar app.properties`
+
+Yarn:
+
+`flink run -t yarn-per-job -Djobmanager.memory.process.size=1024m -Dtaskmanager.memory.process.size=2048m -Dyarn.application.name=APP-PROTOCOL-STAT-TRAFFIC-MERGE -Dtaskmanager.numberOfTaskSlots=1 -d -p 3 -c com.zdjizhi.topology.ApplicationProtocolTopology app-protocol-stat-traffic-merge-[version].jar app.properties`
+
+## Configuration Options
+
+|Option|Type|Required|Default|Description|
+|--|--|--|--|--|
+|source.kafka.topic|STRING|Y||Kafka topic name of the data source|
+|source.kafka.props.*|STRING|N||Kafka consumer connection properties for the source|
+|startup.mode|STRING|N|group|Source consumption strategy (group: start from the consumer group's committed offsets; latest: start from the latest partition offsets; earliest: start from the earliest partition offsets)|
+|sink.kafka.topic|STRING|Y||Kafka topic name for the output|
+|sink.kafka.props.*|STRING|N||Kafka producer connection properties for the sink|
+|count.window.time|INT|N|5|Aggregation window size (seconds)|
+|watermark.max.orderness|INT|N|5|Maximum delay tolerated for out-of-order data (seconds)|
+|log.failures.only|BOOLEAN|N|false|Whether producer errors only get logged, or fail the job|
+|measurement.name|STRING|N|application_protocol_stat|Measurement name attached to emitted records|
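For reference, an illustrative app.properties assembled from the table above (broker address, group ID, and topic names are placeholders; any key under source.kafka.props.* or sink.kafka.props.* is passed straight through to the Kafka client):

```properties
# source
source.kafka.topic=NETWORK-TRAFFIC-METRICS
source.kafka.props.bootstrap.servers=broker-1:9094
source.kafka.props.group.id=app-protocol-merge
startup.mode=group

# sink
sink.kafka.topic=APP-PROTOCOL-STAT-RESULT
sink.kafka.props.bootstrap.servers=broker-1:9094

# windowing (seconds)
count.window.time=5
watermark.max.orderness=5

# behavior
log.failures.only=false
measurement.name=application_protocol_stat
```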
diff --git a/pom.xml b/pom.xml
index 2568e90..f3e7a70 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
 
     <groupId>com.zdjizhi</groupId>
     <artifactId>app-protocol-stat-traffic-merge</artifactId>
-    <version>1.6</version>
+    <version>2.0.0</version>
 
     <name>app-protocol-stat-traffic-merge</name>
     <url>http://www.example.com</url>
diff --git a/properties/default_config.properties b/properties/default_config.properties
deleted file mode 100644
index 89d7987..0000000
--- a/properties/default_config.properties
+++ /dev/null
@@ -1,46 +0,0 @@
-#====================Kafka KafkaConsumer====================#
-#kafka source connection timeout
-session.timeout.ms=60000
-
-#kafka source poll
-max.poll.records=5000
-
-#kafka source poll bytes
-max.partition.fetch.bytes=31457280
-
-#====================Kafka KafkaProducer====================#
-#Number of producer retries
-retries=0
-
-#How long a batch may wait after creation before it must be sent, whether or not it is full
-linger.ms=10
-
-#If no response arrives before the timeout, the client resends the request when necessary
-request.timeout.ms=30000
-
-#Producers send in batches; batch size, default: 16384
-batch.size=262144
-
-#Buffer size the producer uses to cache messages
-#128M
-buffer.memory=134217728
-
-#Maximum size of a single request sent to the Kafka broker, default 1048576
-#10M
-max.request.size=10485760
-
-#Producer compression mode: none or snappy
-producer.kafka.compression.type=snappy
-
-#Producer acks
-producer.ack=1
-
-#====================kafka default====================#
-#Kafka SASL username (encrypted)
-kafka.user=nsyGpHKGFA4KW0zro9MDdw==
-
-#Kafka SASL and SSL password (encrypted)
-kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
-
-#====================Topology Default====================#
-measurement.name=application_protocol_stat
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index e401d29..82a236c 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,42 +1,27 @@
-#-------------------------------- Addresses ------------------------------#
-#Management Kafka address
-source.kafka.servers=192.168.44.12:9094
-
-#Management output Kafka address
-sink.kafka.servers=192.168.44.12:9094
-
-#--------------------------------HTTP------------------------------#
-#Kafka certificate path
-tools.library=D:\\workerspace\\dat
-
-#-------------------------------- Kafka consumer group ------------------------------#
 #Kafka source topic
-source.kafka.topic=APP-PROTOCOL-TEST
+source.kafka.topic=NETWORK-TRAFFIC-METRICS
+
+source.kafka.props.bootstrap.servers=192.168.44.12:9094
+
+source.kafka.props.group.id=app-protocol-merge-231109-1
+
+source.kafka.props.security.protocol=SASL_PLAINTEXT
+
+source.kafka.props.sasl.mechanism=PLAIN
+
+source.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
 
 #Output topic for the enriched data
 sink.kafka.topic=APP-PROTOCOL-TEST-RESULT
 
-#Consumer group storing this consumer's offsets; name it after the topology so the next run does not re-read already-consumed data
-group.id=livecharts-test-20230423-2
+sink.kafka.props.bootstrap.servers=192.168.44.12:9094
 
-#-------------------------------- Topology ------------------------------#
-#Consumer parallelism
-source.parallelism=1
+sink.kafka.props.security.protocol=SASL_PLAINTEXT
 
-#Map function parallelism
-parse.parallelism=1
+sink.kafka.props.sasl.mechanism=PLAIN
 
-#First window computation parallelism
-window.parallelism=1
+sink.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
 
-#Producer parallelism
-sink.parallelism=1
-
-#Pre-aggregation window time
 count.window.time=5
 
-#Watermark delay
-watermark.max.orderness=5
-
-#Data source: firewall or agent
-metrics.data.source=firewall
+watermark.max.orderness=5
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java b/src/main/java/com/zdjizhi/common/config/GlobalConfig.java
deleted file mode 100644
index 24702a5..0000000
--- a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package com.zdjizhi.common.config;
-
-
-import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
-
-/**
- * @author Administrator
- */
-public class GlobalConfig {
-
-    private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
-
-    static {
-        encryptor.setPassword("galaxy");
-    }
-
-    /**
-     * Protocol separator; must be escaped
-     */
-    public static final String PROTOCOL_SPLITTER = "\\.";
-
-
-    /**
-     * System
-     */
-    public static final Integer SOURCE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "source.parallelism");
-    public static final String MEASUREMENT_NAME = GlobalConfigLoad.getStringProperty(1, "measurement.name");
-    public static final Integer PARSE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "parse.parallelism");
-    public static final Integer WINDOW_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "window.parallelism");
-    public static final Integer WARTERMARK_MAX_ORDERNESS = GlobalConfigLoad.getIntProperty(0, "watermark.max.orderness");
-    public static final Integer COUNT_WINDOW_TIME = GlobalConfigLoad.getIntProperty(0, "count.window.time");
-    public static final String TOOLS_LIBRARY = GlobalConfigLoad.getStringProperty(0, "tools.library");
-    public static final Integer SINK_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "sink.parallelism");
-    public static final String METICS_DATA_SOURCE = GlobalConfigLoad.getStringProperty(0, "metrics.data.source");
-
-    /**
-     * Kafka common
-     */
-    public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.user"));
-    public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.pin"));
-
-
-    /**
-     * kafka sink config
-     */
-    public static final String SINK_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "sink.kafka.servers");
-    public static final String SINK_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "sink.kafka.topic");
-    public static final String PRODUCER_ACK = GlobalConfigLoad.getStringProperty(1, "producer.ack");
-    public static final String RETRIES = GlobalConfigLoad.getStringProperty(1, "retries");
-    public static final String LINGER_MS = GlobalConfigLoad.getStringProperty(1, "linger.ms");
-    public static final Integer REQUEST_TIMEOUT_MS = GlobalConfigLoad.getIntProperty(1, "request.timeout.ms");
-    public static final Integer BATCH_SIZE = GlobalConfigLoad.getIntProperty(1, "batch.size");
-    public static final Integer BUFFER_MEMORY = GlobalConfigLoad.getIntProperty(1, "buffer.memory");
-    public static final Integer MAX_REQUEST_SIZE = GlobalConfigLoad.getIntProperty(1, "max.request.size");
-
-
-    /**
-     * kafka source config
-     */
-    public static final String SOURCE_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "source.kafka.servers");
-    public static final String SOURCE_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "source.kafka.topic");
-    public static final String GROUP_ID = GlobalConfigLoad.getStringProperty(0, "group.id");
-    public static final String SESSION_TIMEOUT_MS = GlobalConfigLoad.getStringProperty(1, "session.timeout.ms");
-    public static final String MAX_POLL_RECORDS = GlobalConfigLoad.getStringProperty(1, "max.poll.records");
-    public static final String MAX_PARTITION_FETCH_BYTES = GlobalConfigLoad.getStringProperty(1, "max.partition.fetch.bytes");
-
-
-    /**
-     * Kafka throttling configuration - 20201117
-     */
-    public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = GlobalConfigLoad.getStringProperty(1, "producer.kafka.compression.type");
-
-
-}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java b/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java
deleted file mode 100644
index 0ae91e5..0000000
--- a/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.zdjizhi.common.config;
-
-import com.zdjizhi.utils.StringUtil;
-
-import java.io.IOException;
-import java.util.Locale;
-import java.util.Properties;
-
-
-/**
- * @author Administrator
- */
-
-public final class GlobalConfigLoad {
-
-    private static Properties propDefault = new Properties();
-    private static Properties propService = new Properties();
-
-
-    static String getStringProperty(Integer type, String key) {
-        if (type == 0) {
-            return propService.getProperty(key);
-        } else if (type == 1) {
-            return propDefault.getProperty(key);
-        } else {
-            return null;
-        }
-
-    }
-
-    static Integer getIntProperty(Integer type, String key) {
-        if (type == 0) {
-            return Integer.parseInt(propService.getProperty(key));
-        } else if (type == 1) {
-            return Integer.parseInt(propDefault.getProperty(key));
-        } else {
-            return null;
-        }
-    }
-
-    public static Long getLongProperty(Integer type, String key) {
-        if (type == 0) {
-            return Long.parseLong(propService.getProperty(key));
-        } else if (type == 1) {
-            return Long.parseLong(propDefault.getProperty(key));
-        } else {
-            return null;
-        }
-    }
-
-    public static Boolean getBooleanProperty(Integer type, String key) {
-        if (type == 0) {
-            return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
-        } else if (type == 1) {
-            return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
-        } else {
-            return null;
-        }
-    }
-
-    static {
-        try {
-            propService.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
-            propDefault.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("default_config.properties"));
-        } catch (IOException | RuntimeException e) {
-            propDefault = null;
-            propService = null;
-        }
-    }
-}
diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
new file mode 100644
index 0000000..738537c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
@@ -0,0 +1,72 @@
+package com.zdjizhi.common.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Configuration options for the merge application.
+ *
+ * @author chaoc
+ * @since 1.0
+ */
+public class MergeConfigs {
+
+    /**
+     * The prefix for Kafka properties used in the source.
+ */ + public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props."; + + /** + * The prefix for Kafka properties used in the sink. + */ + public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props."; + + + public static final ConfigOption SOURCE_KAFKA_TOPIC = + ConfigOptions.key("source.kafka.topic") + .stringType() + .noDefaultValue() + .withDescription("The Kafka topic used in the source."); + + + public static final ConfigOption SINK_KAFKA_TOPIC = + ConfigOptions.key("sink.kafka.topic") + .stringType() + .noDefaultValue() + .withDescription("The Kafka topic used in the sink."); + + + public static final ConfigOption COUNT_WINDOW_TIME = + ConfigOptions.key("count.window.time") + .intType() + .defaultValue(5) + .withDescription("The aggregate window time"); + + + public static final ConfigOption WARTERMARK_MAX_ORDERNESS = + ConfigOptions.key("watermark.max.orderness") + .intType() + .defaultValue(5) + .withDescription("The aggregate watermark max time"); + + + public static final ConfigOption STARTUP_MODE = + ConfigOptions.key("startup.mode") + .stringType() + .defaultValue("group") + .withDescription("The offset commit mode for the consumer."); + + + public static final ConfigOption LOG_FAILURES_ONLY = + ConfigOptions.key("log.failures.only") + .booleanType() + .defaultValue(false) + .withDescription("Defines whether the producer should fail on errors, or only log them."); + + + public static final ConfigOption MEASUREMENT_NAME = + ConfigOptions.key("measurement.name") + .stringType() + .defaultValue("application_protocol_stat") + .withDescription("The data identification."); +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfiguration.java b/src/main/java/com/zdjizhi/common/config/MergeConfiguration.java new file mode 100644 index 0000000..4da7616 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/config/MergeConfiguration.java @@ -0,0 +1,44 @@ +package com.zdjizhi.common.config; + +import org.apache.flink.configuration.Configuration; + +import java.util.Properties; + +/** + * A wrapper class that extends the Flink `Configuration` to provide utility methods for handling + * properties with a specific prefix. This class allows retrieving properties that start with the + * given `prefix` and converts them into a `java.util.Properties` object. + * + * @author chaoc + * @since 1.0 + */ + +public class MergeConfiguration { + private final Configuration config; + + public MergeConfiguration(final Configuration config) { + this.config = config; + } + + /** + * Retrieves properties from the underlying `Configuration` instance that start with the specified + * `prefix`. The properties are then converted into a `java.util.Properties` object and returned. + * + * @param prefix The prefix to filter properties. + * @return A `java.util.Properties` object containing the properties with the specified prefix. 
+ */ + public Properties getProperties(final String prefix) { + if (prefix == null) { + final Properties props = new Properties(); + props.putAll(config.toMap()); + return props; + } + return config.toMap() + .entrySet() + .stream() + .filter(entry -> entry.getKey().startsWith(prefix)) + .collect(Properties::new, (props, e) -> + props.setProperty(e.getKey().substring(prefix.length()), e.getValue()), + Properties::putAll); + } +} diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java index 5f046c9..116c45a 100644 --- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java +++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java @@ -2,7 +2,8 @@ package com.zdjizhi.topology; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.common.config.GlobalConfig; +import com.zdjizhi.common.config.MergeConfigs; +import com.zdjizhi.common.config.MergeConfiguration; import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.common.pojo.Metrics; import com.zdjizhi.common.pojo.Tags; @@ -25,6 +26,8 @@ import org.apache.flink.streaming.api.windowing.time.Time; import java.time.Duration; +import static com.zdjizhi.common.config.MergeConfigs.*; + /** * @author qidaijie * @Package com.zdjizhi.topology @@ -36,36 +39,51 @@ public class ApplicationProtocolTopology { public static void main(String[] args) { try { + + // param check + if (args.length < 1) { + throw new IllegalArgumentException("Error: Not found properties path. " + + "\nUsage: flink -c xxx xxx.jar app.properties."); + } + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]); + final Configuration config = tool.getConfiguration(); + environment.getConfig().setGlobalJobParameters(config); + final MergeConfiguration fusionConfiguration = new MergeConfiguration(config); + + + //水印 WatermarkStrategy> strategyForSession = WatermarkStrategy - .>forBoundedOutOfOrderness(Duration.ofSeconds(GlobalConfig.WARTERMARK_MAX_ORDERNESS)) - .withTimestampAssigner((element,timestamp) -> element.f2); + .>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS))) + .withTimestampAssigner((element, timestamp) -> element.f2); //数据源 - DataStream streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()) - .setParallelism(GlobalConfig.SOURCE_PARALLELISM).name(GlobalConfig.SOURCE_KAFKA_TOPIC); + DataStream streamSource = environment.addSource( + KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX), + config.get(SOURCE_KAFKA_TOPIC), + config.get(STARTUP_MODE))); //解析数据 SingleOutputStreamOperator> parseDataProcess = streamSource.process(new ParsingData()) .assignTimestampsAndWatermarks(strategyForSession) - .name("ParseDataProcess") - .setParallelism(GlobalConfig.PARSE_PARALLELISM); + .name("ParseDataProcess"); //增量聚合窗口 SingleOutputStreamOperator dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy()) - .window(TumblingEventTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME))) + .window(TumblingEventTimeWindows.of(Time.seconds(config.get(MergeConfigs.COUNT_WINDOW_TIME)))) .reduce(new DispersionCountWindow(), new MergeCountWindow()) - .name("DispersionCountWindow") - .setParallelism(GlobalConfig.WINDOW_PARALLELISM); + .name("DispersionCountWindow"); //拆分数据 SingleOutputStreamOperator resultFlatMap = 
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
index 5f046c9..116c45a 100644
--- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
@@ -2,7 +2,8 @@ package com.zdjizhi.topology;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.config.GlobalConfig;
+import com.zdjizhi.common.config.MergeConfigs;
+import com.zdjizhi.common.config.MergeConfiguration;
 import com.zdjizhi.common.pojo.Fields;
 import com.zdjizhi.common.pojo.Metrics;
 import com.zdjizhi.common.pojo.Tags;
@@ -25,6 +26,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 
 import java.time.Duration;
 
+import static com.zdjizhi.common.config.MergeConfigs.*;
+
 /**
  * @author qidaijie
  * @Package com.zdjizhi.topology
@@ -36,36 +39,51 @@ public class ApplicationProtocolTopology {
 
     public static void main(String[] args) {
         try {
+
+            // param check
+            if (args.length < 1) {
+                throw new IllegalArgumentException("Error: properties file path not found. "
+                        + "\nUsage: flink -c xxx xxx.jar app.properties.");
+            }
+
             final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+            ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
+            final Configuration config = tool.getConfiguration();
+            environment.getConfig().setGlobalJobParameters(config);
+            final MergeConfiguration mergeConfiguration = new MergeConfiguration(config);
+
+
+            //Watermark
             WatermarkStrategy<Tuple3<Tags, Fields, Long>> strategyForSession = WatermarkStrategy
-                    .<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(GlobalConfig.WARTERMARK_MAX_ORDERNESS))
-                    .withTimestampAssigner((element,timestamp) -> element.f2);
+                    .<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WATERMARK_MAX_ORDERNESS)))
+                    .withTimestampAssigner((element, timestamp) -> element.f2);
 
             //Data source
-            DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer())
-                    .setParallelism(GlobalConfig.SOURCE_PARALLELISM).name(GlobalConfig.SOURCE_KAFKA_TOPIC);
+            DataStream<String> streamSource = environment.addSource(
+                    KafkaConsumer.getKafkaConsumer(mergeConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
+                            config.get(SOURCE_KAFKA_TOPIC),
+                            config.get(STARTUP_MODE)));
 
             //Parse data
             SingleOutputStreamOperator<Tuple3<Tags, Fields, Long>> parseDataProcess = streamSource.process(new ParsingData())
                     .assignTimestampsAndWatermarks(strategyForSession)
-                    .name("ParseDataProcess")
-                    .setParallelism(GlobalConfig.PARSE_PARALLELISM);
+                    .name("ParseDataProcess");
 
             //Incremental aggregation window
             SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy())
-                    .window(TumblingEventTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME)))
+                    .window(TumblingEventTimeWindows.of(Time.seconds(config.get(MergeConfigs.COUNT_WINDOW_TIME))))
                     .reduce(new DispersionCountWindow(), new MergeCountWindow())
-                    .name("DispersionCountWindow")
-                    .setParallelism(GlobalConfig.WINDOW_PARALLELISM);
+                    .name("DispersionCountWindow");
 
             //Split data
             SingleOutputStreamOperator<String> resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap())
-                    .name("ResultFlatMap").setParallelism(GlobalConfig.SINK_PARALLELISM);
+                    .name("ResultFlatMap");
 
             //Output
-            resultFlatMap.addSink(KafkaProducer.getKafkaProducer())
-                    .setParallelism(GlobalConfig.SINK_PARALLELISM).name(GlobalConfig.SINK_KAFKA_TOPIC);
+            resultFlatMap.addSink(KafkaProducer.getKafkaProducer(mergeConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX),
+                    config.get(SINK_KAFKA_TOPIC),
+                    config.get(LOG_FAILURES_ONLY)));
 
             environment.execute("APP-PROTOCOL-STAT-TRAFFIC-MERGE");
         } catch (Exception e) {
diff --git a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
index 22f2f51..28c01f5 100644
--- a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
+++ b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
@@ -3,9 +3,6 @@ package com.zdjizhi.utils.functions.map;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.alibaba.fastjson2.JSONObject;
-import com.alibaba.fastjson2.JSONWriter;
-import com.zdjizhi.common.config.GlobalConfig;
-import com.zdjizhi.common.pojo.Fields;
 import com.zdjizhi.common.pojo.Metrics;
 import com.zdjizhi.common.pojo.Tags;
 import com.zdjizhi.utils.StringUtil;
@@ -20,9 +17,14 @@ import org.apache.flink.util.Collector;
  */
 public class ResultFlatMap implements FlatMapFunction<Metrics, String> {
     private static final Log logger = LogFactory.get();
+    /**
+     * Protocol separator; must be escaped
+     */
+    private static final String PROTOCOL_SPLITTER = "\\.";
+
 
     @Override
-    public void flatMap(Metrics metrics, Collector<String> out) throws Exception {
+    public void flatMap(Metrics metrics, Collector<String> out) {
         try {
             Tags tags = metrics.getTags();
             String protocolStackId = tags.getProtocol_stack_id();
@@ -30,7 +32,7 @@ public class ResultFlatMap implements FlatMapFunction<Metrics, String> {
             tags.setApp_name(null);
 
             StringBuilder stringBuilder = new StringBuilder();
-            String[] protocolIds = protocolStackId.split(GlobalConfig.PROTOCOL_SPLITTER);
+            String[] protocolIds = protocolStackId.split(PROTOCOL_SPLITTER);
             int protocolIdsNum = protocolIds.length;
             for (int i = 0; i < protocolIdsNum - 1; i++) {
                 if (StringUtil.isBlank(stringBuilder.toString())) {
diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java
index 8216320..44276e2 100644
--- a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java
+++ b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java
@@ -19,8 +19,9 @@ import org.apache.flink.api.java.tuple.Tuple3;
 public class DispersionCountWindow implements ReduceFunction<Tuple3<Tags, Fields, Long>> {
     private static final Log logger = LogFactory.get();
 
+
     @Override
-    public Tuple3<Tags, Fields, Long> reduce(Tuple3<Tags, Fields, Long> value1, Tuple3<Tags, Fields, Long> value2) throws Exception {
+    public Tuple3<Tags, Fields, Long> reduce(Tuple3<Tags, Fields, Long> value1, Tuple3<Tags, Fields, Long> value2) {
         try {
             Fields cacheData = value1.f1;
             Fields newData = value2.f1;
diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
index 2677855..4766000 100644
--- a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
+++ b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
@@ -2,12 +2,12 @@ package com.zdjizhi.utils.functions.statistics;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.config.GlobalConfig;
+import com.zdjizhi.common.config.MergeConfigs;
 import com.zdjizhi.common.pojo.Fields;
 import com.zdjizhi.common.pojo.Metrics;
 import com.zdjizhi.common.pojo.Tags;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
@@ -18,17 +18,30 @@ import org.apache.flink.util.Collector;
  * @Description:
  * @date 2023/4/23 14:43
  */
-public class MergeCountWindow extends ProcessWindowFunction<Tuple3<Tags,Fields,Long>, Metrics, String, TimeWindow> {
+public class MergeCountWindow extends ProcessWindowFunction<Tuple3<Tags, Fields, Long>, Metrics, String, TimeWindow> {
     private static final Log logger = LogFactory.get();
 
+    private String measurementName = null;
+
     @Override
-    public void process(String windowKey, Context context, Iterable<Tuple3<Tags,Fields,Long>> input, Collector<Metrics> output) throws Exception {
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+
+        final Configuration configuration = (Configuration) getRuntimeContext()
+                .getExecutionConfig().getGlobalJobParameters();
+
+        measurementName = configuration.get(MergeConfigs.MEASUREMENT_NAME);
+
+    }
+
+    @Override
+    public void process(String windowKey, Context context, Iterable<Tuple3<Tags, Fields, Long>> input, Collector<Metrics> output) {
         try {
             long timestamp_ms = context.window().getStart();
-            for (Tuple3<Tags,Fields,Long> tuple : input) {
+            for (Tuple3<Tags, Fields, Long> tuple : input) {
                 Tags tags = tuple.f0;
                 Fields fields = tuple.f1;
-                Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, timestamp_ms);
+                Metrics metrics = new Metrics(measurementName, tags, fields, timestamp_ms);
                 output.collect(metrics);
             }
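MergeCountWindow receives measurement.name through Flink's global job parameters rather than through its constructor: the topology registers the Configuration with setGlobalJobParameters, and open() reads it back. A condensed, standalone sketch of that round trip (class name illustrative; the cast works because Flink's Configuration is itself a GlobalJobParameters, which the patch relies on as well):

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GlobalParamsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Configuration config = new Configuration();
        config.setString("measurement.name", "application_protocol_stat");
        // Make the configuration visible to every task, as the topology does.
        env.getConfig().setGlobalJobParameters(config);

        env.fromElements("a", "b")
                .map(new RichMapFunction<String, String>() {
                    private String name;

                    @Override
                    public void open(Configuration parameters) {
                        // Same retrieval pattern as MergeCountWindow.open().
                        Configuration global = (Configuration) getRuntimeContext()
                                .getExecutionConfig().getGlobalJobParameters();
                        name = global.getString("measurement.name", "application_protocol_stat");
                    }

                    @Override
                    public String map(String value) {
                        return name + ":" + value;
                    }
                })
                .print();

        env.execute("global-params-example");
    }
}
```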
diff --git a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
index 6a48bcf..cc8b32c 100644
--- a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
+++ b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
@@ -2,7 +2,6 @@ package com.zdjizhi.utils.general;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.config.GlobalConfig;
 import com.zdjizhi.common.pojo.Fields;
 import com.zdjizhi.utils.StringUtil;
 import org.apache.datasketches.hll.HllSketch;
@@ -19,8 +18,6 @@
  */
 public class MetricUtil {
     private static final Log logger = LogFactory.get();
-    private static final String METRICS_DEFAULT_TYPE = "agent";
-
 
     /**
      * Aggregates the business metrics.
         Long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes());
         Long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes());
 
-        if (METRICS_DEFAULT_TYPE.equals(GlobalConfig.METICS_DATA_SOURCE)) {
-            String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch());
-            return new Fields(sessions,
-                    inBytes, outBytes, inPkts, outPkts,
-                    c2sPkts, s2cPkts, c2sBytes, s2cBytes,
-                    c2sFragments, s2cFragments,
-                    c2sTcpLostBytes, s2cTcpLostBytes,
-                    c2sTcpooorderPkts, s2cTcpooorderPkts,
-                    c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
-                    c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
-                    clientIpSketch);
-        } else {
-            return new Fields(sessions,
-                    inBytes, outBytes, inPkts, outPkts,
-                    c2sPkts, s2cPkts, c2sBytes, s2cBytes,
-                    c2sFragments, s2cFragments,
-                    c2sTcpLostBytes, s2cTcpLostBytes,
-                    c2sTcpooorderPkts, s2cTcpooorderPkts,
-                    c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
-                    c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
-                    null);
-        }
+//        String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch());
+//        return new Fields(sessions,
+//                inBytes, outBytes, inPkts, outPkts,
+//                c2sPkts, s2cPkts, c2sBytes, s2cBytes,
+//                c2sFragments, s2cFragments,
+//                c2sTcpLostBytes, s2cTcpLostBytes,
+//                c2sTcpooorderPkts, s2cTcpooorderPkts,
+//                c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
+//                c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
+//                clientIpSketch);
+
+        return new Fields(sessions,
+                inBytes, outBytes, inPkts, outPkts,
+                c2sPkts, s2cPkts, c2sBytes, s2cBytes,
+                c2sFragments, s2cFragments,
+                c2sTcpLostBytes, s2cTcpLostBytes,
+                c2sTcpooorderPkts, s2cTcpooorderPkts,
+                c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
+                c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
+                null);
     }
 
     /**
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
deleted file mode 100644
index 877b2e6..0000000
--- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package com.zdjizhi.utils.kafka;
-
-import com.zdjizhi.common.config.GlobalConfig;
-import org.apache.kafka.common.config.SslConfigs;
-
-import java.util.Properties;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.kafka
- * @Description:
- * @date 2021/9/6 10:37
- */
-class CertUtils {
-    /**
-     * Kafka SASL authentication port
-     */
-    private static final String SASL_PORT = "9094";
-
-    /**
-     * Kafka SSL authentication port
-     */
-    private static final String SSL_PORT = "9095";
-
-    /**
-     * Choose the authentication method based on the port in the connection info.
-     *
-     * @param servers    Kafka connection info
-     * @param properties Kafka connection properties
-     */
-    static void chooseCert(String servers, Properties properties) {
-        if (servers.contains(SASL_PORT)) {
-            properties.put("security.protocol", "SASL_PLAINTEXT");
-            properties.put("sasl.mechanism", "PLAIN");
-            properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
-                    + GlobalConfig.KAFKA_SASL_JAAS_USER + " password=" + GlobalConfig.KAFKA_SASL_JAAS_PIN + ";");
-        } else if (servers.contains(SSL_PORT)) {
-            properties.put("security.protocol", "SSL");
-            properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
-            properties.put("ssl.keystore.location", GlobalConfig.TOOLS_LIBRARY + "keystore.jks");
-            properties.put("ssl.keystore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
-            properties.put("ssl.truststore.location", GlobalConfig.TOOLS_LIBRARY + "truststore.jks");
-            properties.put("ssl.truststore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
-            properties.put("ssl.key.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
-        }
-
-    }
-}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index b0bc2fb..397814f 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -1,8 +1,8 @@
 package com.zdjizhi.utils.kafka;
 
-import com.zdjizhi.common.config.GlobalConfig;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 import java.util.Properties;
 
@@ -13,35 +13,40 @@ import java.util.Properties;
  * @date 2021/6/8 13:54
  */
 public class KafkaConsumer {
-    private static Properties createConsumerConfig() {
-        Properties properties = new Properties();
-        properties.put("bootstrap.servers", GlobalConfig.SOURCE_KAFKA_SERVERS);
-        properties.put("group.id", GlobalConfig.GROUP_ID);
-        properties.put("session.timeout.ms", GlobalConfig.SESSION_TIMEOUT_MS);
-        properties.put("max.poll.records", GlobalConfig.MAX_POLL_RECORDS);
-        properties.put("max.partition.fetch.bytes", GlobalConfig.MAX_PARTITION_FETCH_BYTES);
-        properties.put("partition.discovery.interval.ms", "10000");
-
-        CertUtils.chooseCert(GlobalConfig.SOURCE_KAFKA_SERVERS, properties);
-
-        return properties;
-    }
 
     /**
      * Deserialize Kafka records with the official string schema
      *
      * @return kafka logs
      */
-    public static FlinkKafkaConsumer<String> getKafkaConsumer() {
-        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(GlobalConfig.SOURCE_KAFKA_TOPIC,
-                new SimpleStringSchema(), createConsumerConfig());
+    public static FlinkKafkaConsumer<String> getKafkaConsumer(Properties properties, String topic, String startupMode) {
 
-        //Commit offsets to Kafka along with checkpoints
-        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+        setDefaultConfig(properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
+        setDefaultConfig(properties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
+        setDefaultConfig(properties, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280);
 
-        //Start consuming from the consumer group's current offsets
-        kafkaConsumer.setStartFromGroupOffsets();
+        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
+
+        switch (startupMode) {
+            case "group":
+                kafkaConsumer.setStartFromGroupOffsets();
+                break;
+            case "latest":
+                kafkaConsumer.setStartFromLatest();
+                break;
+            case "earliest":
+                kafkaConsumer.setStartFromEarliest();
+                break;
+            default:
+                kafkaConsumer.setStartFromGroupOffsets();
+        }
 
         return kafkaConsumer;
     }
+
+    private static void setDefaultConfig(Properties properties, String key, Object value) {
+        // containsKey, not contains: Properties.contains() checks values, not keys.
+        if (!properties.containsKey(key)) {
+            properties.put(key, value);
+        }
+    }
 }
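One property of setDefaultConfig worth making explicit: defaults are written only when a key is absent, so anything supplied through source.kafka.props.* always wins. A tiny standalone demonstration (the helper is copied here purely for illustration):

```java
import java.util.Properties;

public class DefaultConfigExample {
    // Mirrors KafkaConsumer.setDefaultConfig: apply the default only when the key is absent.
    static void setDefaultConfig(Properties properties, String key, Object value) {
        if (!properties.containsKey(key)) {
            properties.put(key, value);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("max.poll.records", 1000);  // user-supplied via source.kafka.props.*

        setDefaultConfig(props, "max.poll.records", 5000);
        setDefaultConfig(props, "session.timeout.ms", 60000);

        System.out.println(props.get("max.poll.records"));   // 1000: user value preserved
        System.out.println(props.get("session.timeout.ms")); // 60000: default applied
    }
}
```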
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index ca62061..c7cd3f2 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -1,9 +1,7 @@
 package com.zdjizhi.utils.kafka;
 
-import com.zdjizhi.common.config.GlobalConfig;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 
 import java.util.Optional;
 import java.util.Properties;
@@ -16,33 +14,29 @@
  */
 public class KafkaProducer {
 
-    private static Properties createProducerConfig() {
-        Properties properties = new Properties();
-        properties.put("bootstrap.servers", GlobalConfig.SINK_KAFKA_SERVERS);
-        properties.put("acks", GlobalConfig.PRODUCER_ACK);
-        properties.put("retries", GlobalConfig.RETRIES);
-        properties.put("linger.ms", GlobalConfig.LINGER_MS);
-        properties.put("request.timeout.ms", GlobalConfig.REQUEST_TIMEOUT_MS);
-        properties.put("batch.size", GlobalConfig.BATCH_SIZE);
-        properties.put("buffer.memory", GlobalConfig.BUFFER_MEMORY);
-        properties.put("max.request.size", GlobalConfig.MAX_REQUEST_SIZE);
-        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, GlobalConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+    public static FlinkKafkaProducer<String> getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) {
+        // "acks" (not "ack"), supplied as a string: Kafka validates this config as STRING type.
+        setDefaultConfig(properties, "acks", "1");
+        setDefaultConfig(properties, "retries", 0);
+        setDefaultConfig(properties, "linger.ms", 10);
+        setDefaultConfig(properties, "request.timeout.ms", 30000);
+        setDefaultConfig(properties, "batch.size", 262144);
+        setDefaultConfig(properties, "buffer.memory", 134217728);
+        setDefaultConfig(properties, "max.request.size", 10485760);
+        setDefaultConfig(properties, "compression.type", "snappy");
 
-        CertUtils.chooseCert(GlobalConfig.SINK_KAFKA_SERVERS, properties);
-
-        return properties;
-    }
-
-
-    public static FlinkKafkaProducer<String> getKafkaProducer() {
-        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
-                GlobalConfig.SINK_KAFKA_TOPIC,
+        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
+                topic,
                 new SimpleStringSchema(),
-                createProducerConfig(), Optional.empty());
+                properties, Optional.empty());
 
-        //Enabling this makes the producer only log failures instead of catching and rethrowing them
-        kafkaProducer.setLogFailuresOnly(true);
+        kafkaProducer.setLogFailuresOnly(logFailuresOnly);
 
         return kafkaProducer;
     }
+
+    private static void setDefaultConfig(Properties properties, String key, Object value) {
+        // containsKey, not contains: Properties.contains() checks values, not keys.
+        if (!properties.containsKey(key)) {
+            properties.put(key, value);
+        }
+    }
 }
diff --git a/src/test/java/com/zdjizhi/ConfigTest.java b/src/test/java/com/zdjizhi/ConfigTest.java
index 7b9580b..7a2b5d3 100644
--- a/src/test/java/com/zdjizhi/ConfigTest.java
+++ b/src/test/java/com/zdjizhi/ConfigTest.java
@@ -1,11 +1,10 @@
 package com.zdjizhi;
 
 import com.zdjizhi.conf.FusionConfiguration;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -30,6 +29,7 @@ public class ConfigTest {
             System.out.println(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
             System.out.println(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
 
+
             final FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                     config.get(SOURCE_KAFKA_TOPIC),
                     new SimpleStringSchema(),
@@ -41,12 +41,14 @@ public class ConfigTest {
 
             sourceStream.process(new ProcessFunction<String, String>() {
                 @Override
-                public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
+                public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) {
                     out.collect(value);
                 }
             }).print();
 
+
+
             environment.execute();
 
         } catch (Exception e) {
diff --git a/src/test/java/com/zdjizhi/ConventionalTest.java b/src/test/java/com/zdjizhi/ConventionalTest.java
index 7900d61..287b3fb 100644
--- a/src/test/java/com/zdjizhi/ConventionalTest.java
+++ b/src/test/java/com/zdjizhi/ConventionalTest.java
@@ -1,6 +1,5 @@
 package com.zdjizhi;
 
-import com.zdjizhi.common.config.GlobalConfig;
 import com.zdjizhi.utils.StringUtil;
 import org.junit.Test;
 
@@ -22,7 +21,7 @@ public class ConventionalTest {
         System.out.println(protocol);
         StringBuffer stringBuffer = new StringBuffer();
         String appName = "qq_r2";
-        String[] protocolIds = protocol.split(GlobalConfig.PROTOCOL_SPLITTER);
+        String[] protocolIds = protocol.split("\\.");
         for (String proto : protocolIds) {
             if (StringUtil.isBlank(stringBuffer.toString())) {
                 stringBuffer.append(proto);