Optimize configuration loading: load settings from an external file (GAL-435)

This commit is contained in:
qidaijie
2023-11-09 14:13:45 +08:00
parent f765650d9c
commit 0a116352d6
18 changed files with 312 additions and 377 deletions

View File

@@ -1,22 +1,65 @@
# app-protocol-stat-traffic-merge
Live Traffic Chart statistics job: splits multi-flow aggregates along the protocol stack and stores them in the protocol and application statistics tables, using incremental window computation with a 5-second statistics period and a 5-second watermark.
## Data Sources
Regardless of which computation produces the data, the Topic is NETWORK-TRAFFIC-METRICS.
### 1. Application and Protocol Metrics computed by the app-protocol-stat-traffic-agent job from closed/transition session logs, aggregated at 1-second granularity. (TSG 23.05 release)
### 2. Application and Protocol Metrics produced by statistics on the device side, aggregated at 1-second granularity. (≥ TSG 23.05 release)
## Processing
### 1. Filter records whose Measurement Name is traffic_application_protocol_stat.
### 2. Group and aggregate by the contents of Tags.
### 3. Split the protocol_stack_id protocol tree into multiple nodes (a sketch of this expansion follows the list).
#### For example, ETHERNET.IPv4.TCP.https.kingsoft.wps_office yields the following node IDs:
##### ETHERNET
##### ETHERNET.IPv4
##### ETHERNET.IPv4.TCP
##### ETHERNET.IPv4.TCP.https
##### ETHERNET.IPv4.TCP.https.kingsoft
##### ETHERNET.IPv4.TCP.https.kingsoft.wps_office
### 4. app_name is emitted only on the terminal (leaf) node.
### 5. When results are emitted, Measurement Name=application_protocol_stat.
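The expansion in step 3 can be illustrated with a small standalone Java sketch; `expandProtocolStack` is an illustrative name only (the job itself does this split inside `ResultFlatMap` with the same `split("\\.")` logic):

```java
import java.util.ArrayList;
import java.util.List;

public class ProtocolStackSplitExample {

    /** Expands "A.B.C" into ["A", "A.B", "A.B.C"]. */
    static List<String> expandProtocolStack(String protocolStackId) {
        List<String> nodes = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String proto : protocolStackId.split("\\.")) {
            if (current.length() > 0) {
                current.append('.');
            }
            current.append(proto);
            nodes.add(current.toString());
        }
        return nodes;
    }

    public static void main(String[] args) {
        // Prints the six node IDs listed above, one per line.
        expandProtocolStack("ETHERNET.IPv4.TCP.https.kingsoft.wps_office")
                .forEach(System.out::println);
    }
}
```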
<br/>
## Launch
Standalone:
`flink run [-p parallelism] -c com.zdjizhi.topology.ApplicationProtocolTopology app-protocol-stat-traffic-merge-[version].jar app.properties`
Yarn:
`flink run -t yarn-per-job -Djobmanager.memory.process.size=1024m -Dtaskmanager.memory.process.size=2048m -Dyarn.application.name=APP-PROTOCOL-STAT-TRAFFIC-MERGE -Dtaskmanager.numberOfTaskSlots=1 -d -p 3 -c com.zdjizhi.topology.ApplicationProtocolTopology app-protocol-stat-traffic-merge-[version].jar app.properties`
<br/>
## Configuration Options
All options are read from the properties file passed as the job's first argument (app.properties in the commands above); a sample file follows the table.
|Option|Type|Required|Default|Description|
|--|--|--|--|--|
|source.kafka.topic|STRING|Y||Kafka topic name of the data source|
|source.kafka.props.*|STRING|N||Kafka consumer connection properties for the source|
|startup.mode|STRING|N|group|Source consumption strategy: group starts from the consumer group's committed offsets, latest from the newest partition offsets, earliest from the oldest partition offsets|
|sink.kafka.topic|STRING|Y||Kafka topic name for the output|
|sink.kafka.props.*|STRING|N||Kafka producer connection properties for the sink|
|count.window.time|INT|N|5|Aggregation window size (seconds)|
|watermark.max.orderness|INT|N|5|Maximum lateness allowed for out-of-order data (seconds)|
|log.failures.only|BOOLEAN|N|false|Whether producer errors fail the job or are only logged|
|measurement.name|STRING|N|application_protocol_stat|Measurement name used when emitting results|
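A minimal `app.properties` sketch assembled from the table above; the broker address, group id, and topic names are placeholders, not values from this repository:

```properties
# Kafka source (required)
source.kafka.topic=NETWORK-TRAFFIC-METRICS
source.kafka.props.bootstrap.servers=127.0.0.1:9092
source.kafka.props.group.id=app-protocol-merge

# Kafka sink (required)
sink.kafka.topic=APPLICATION-PROTOCOL-STAT
sink.kafka.props.bootstrap.servers=127.0.0.1:9092

# Optional settings (defaults shown)
startup.mode=group
count.window.time=5
watermark.max.orderness=5
log.failures.only=false
measurement.name=application_protocol_stat
```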

View File

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>app-protocol-stat-traffic-merge</artifactId>
<version>2.0.0</version>
<name>app-protocol-stat-traffic-merge</name>
<url>http://www.example.com</url>

View File

@@ -1,46 +0,0 @@
#====================Kafka KafkaConsumer====================#
#kafka source connection timeout
session.timeout.ms=60000
#kafka source poll
max.poll.records=5000
#kafka source poll bytes
max.partition.fetch.bytes=31457280
#====================Kafka KafkaProducer====================#
#number of producer retries
retries=0
#maximum time a batch may wait after creation before it is sent, whether or not it is full
linger.ms=10
#if no response is received before the timeout, the client resends the request if necessary
request.timeout.ms=30000
#the producer sends records in batches; batch size, default: 16384
batch.size=262144
#size of the buffer the producer uses to cache records
#128M
buffer.memory=134217728
#maximum size of a single request sent to the Kafka server, default: 1048576
#10M
max.request.size=10485760
#producer compression mode: none or snappy
producer.kafka.compression.type=snappy
#producer acks
producer.ack=1
#====================kafka default====================#
#kafka SASL username (encrypted)
kafka.user=nsyGpHKGFA4KW0zro9MDdw==
#kafka SASL/SSL password (encrypted)
kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
#====================Topology Default====================#
measurement.name=application_protocol_stat

View File

@@ -1,42 +1,27 @@
#kafka source topic
source.kafka.topic=NETWORK-TRAFFIC-METRIC
source.kafka.props.bootstrap.servers=192.168.44.12:9094
source.kafka.props.group.id=appapp-protocol-merge-231109-1
source.kafka.props.security.protocol=SASL_PLAINTEXT
source.kafka.props.sasl.mechanism=PLAIN
source.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
#output topic for the result data
sink.kafka.topic=APP-PROTOCOL-TEST-RESULT
sink.kafka.props.bootstrap.servers=192.168.44.12:9094
sink.kafka.props.security.protocol=SASL_PLAINTEXT
sink.kafka.props.sasl.mechanism=PLAIN
sink.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
count.window.time=5
watermark.max.orderness=5

View File

@@ -1,74 +0,0 @@
package com.zdjizhi.common.config;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
* @author Administrator
*/
public class GlobalConfig {
private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
static {
encryptor.setPassword("galaxy");
}
/**
* Protocol separator; must be escaped for use as a regex
*/
public static final String PROTOCOL_SPLITTER = "\\.";
/**
* System
*/
public static final Integer SOURCE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "source.parallelism");
public static final String MEASUREMENT_NAME = GlobalConfigLoad.getStringProperty(1, "measurement.name");
public static final Integer PARSE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "parse.parallelism");
public static final Integer WINDOW_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "window.parallelism");
public static final Integer WARTERMARK_MAX_ORDERNESS = GlobalConfigLoad.getIntProperty(0, "watermark.max.orderness");
public static final Integer COUNT_WINDOW_TIME = GlobalConfigLoad.getIntProperty(0, "count.window.time");
public static final String TOOLS_LIBRARY = GlobalConfigLoad.getStringProperty(0, "tools.library");
public static final Integer SINK_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "sink.parallelism");
public static final String METICS_DATA_SOURCE = GlobalConfigLoad.getStringProperty(0, "metrics.data.source");
/**
* Kafka common
*/
public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.user"));
public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.pin"));
/**
* kafka sink config
*/
public static final String SINK_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "sink.kafka.servers");
public static final String SINK_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "sink.kafka.topic");
public static final String PRODUCER_ACK = GlobalConfigLoad.getStringProperty(1, "producer.ack");
public static final String RETRIES = GlobalConfigLoad.getStringProperty(1, "retries");
public static final String LINGER_MS = GlobalConfigLoad.getStringProperty(1, "linger.ms");
public static final Integer REQUEST_TIMEOUT_MS = GlobalConfigLoad.getIntProperty(1, "request.timeout.ms");
public static final Integer BATCH_SIZE = GlobalConfigLoad.getIntProperty(1, "batch.size");
public static final Integer BUFFER_MEMORY = GlobalConfigLoad.getIntProperty(1, "buffer.memory");
public static final Integer MAX_REQUEST_SIZE = GlobalConfigLoad.getIntProperty(1, "max.request.size");
/**
* kafka source config
*/
public static final String SOURCE_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "source.kafka.servers");
public static final String SOURCE_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "source.kafka.topic");
public static final String GROUP_ID = GlobalConfigLoad.getStringProperty(0, "group.id");
public static final String SESSION_TIMEOUT_MS = GlobalConfigLoad.getStringProperty(1, "session.timeout.ms");
public static final String MAX_POLL_RECORDS = GlobalConfigLoad.getStringProperty(1, "max.poll.records");
public static final String MAX_PARTITION_FETCH_BYTES = GlobalConfigLoad.getStringProperty(1, "max.partition.fetch.bytes");
/**
* Kafka throttling configuration - 20201117
*/
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = GlobalConfigLoad.getStringProperty(1, "producer.kafka.compression.type");
}

View File

@@ -1,70 +0,0 @@
package com.zdjizhi.common.config;
import com.zdjizhi.utils.StringUtil;
import java.io.IOException;
import java.util.Locale;
import java.util.Properties;
/**
* @author Administrator
*/
public final class GlobalConfigLoad {
private static Properties propDefault = new Properties();
private static Properties propService = new Properties();
static String getStringProperty(Integer type, String key) {
if (type == 0) {
return propService.getProperty(key);
} else if (type == 1) {
return propDefault.getProperty(key);
} else {
return null;
}
}
static Integer getIntProperty(Integer type, String key) {
if (type == 0) {
return Integer.parseInt(propService.getProperty(key));
} else if (type == 1) {
return Integer.parseInt(propDefault.getProperty(key));
} else {
return null;
}
}
public static Long getLongProperty(Integer type, String key) {
if (type == 0) {
return Long.parseLong(propService.getProperty(key));
} else if (type == 1) {
return Long.parseLong(propDefault.getProperty(key));
} else {
return null;
}
}
public static Boolean getBooleanProperty(Integer type, String key) {
if (type == 0) {
return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
} else if (type == 1) {
return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
} else {
return null;
}
}
static {
try {
propService.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
propDefault.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("default_config.properties"));
} catch (IOException | RuntimeException e) {
propDefault = null;
propService = null;
}
}
}

View File

@@ -0,0 +1,72 @@
package com.zdjizhi.common.config;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
/**
* Contains the configuration options for the application.
*
* @author chaoc
* @since 1.0
*/
public class MergeConfigs {
/**
* The prefix for Kafka properties used in the source.
*/
public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";
/**
* The prefix for Kafka properties used in the sink.
*/
public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";
public static final ConfigOption<String> SOURCE_KAFKA_TOPIC =
ConfigOptions.key("source.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("The Kafka topic used in the source.");
public static final ConfigOption<String> SINK_KAFKA_TOPIC =
ConfigOptions.key("sink.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("The Kafka topic used in the sink.");
public static final ConfigOption<Integer> COUNT_WINDOW_TIME =
ConfigOptions.key("count.window.time")
.intType()
.defaultValue(5)
.withDescription("The aggregate window time");
public static final ConfigOption<Integer> WARTERMARK_MAX_ORDERNESS =
ConfigOptions.key("watermark.max.orderness")
.intType()
.defaultValue(5)
.withDescription("The aggregate watermark max time");
public static final ConfigOption<String> STARTUP_MODE =
ConfigOptions.key("startup.mode")
.stringType()
.defaultValue("group")
.withDescription("The offset commit mode for the consumer.");
public static final ConfigOption<Boolean> LOG_FAILURES_ONLY =
ConfigOptions.key("log.failures.only")
.booleanType()
.defaultValue(false)
.withDescription("Defines whether the producer should fail on errors, or only log them.");
public static final ConfigOption<String> MEASUREMENT_NAME =
ConfigOptions.key("measurement.name")
.stringType()
.defaultValue("application_protocol_stat")
.withDescription("The data identification.");
}
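
A minimal sketch of how these options are intended to be read, mirroring what the topology's main method does further down in this commit: load the external properties file into a Flink Configuration and fetch typed values, falling back to the declared defaults. The hard-coded path app.properties is a placeholder.

```java
import com.zdjizhi.common.config.MergeConfigs;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;

public class MergeConfigsUsageExample {
    public static void main(String[] args) throws Exception {
        // Load the external properties file into a Flink Configuration.
        ParameterTool tool = ParameterTool.fromPropertiesFile("app.properties");
        Configuration config = tool.getConfiguration();

        // Typed reads fall back to the declared defaults when the key is absent.
        String sourceTopic = config.get(MergeConfigs.SOURCE_KAFKA_TOPIC);   // no default, may be null
        int windowSeconds = config.get(MergeConfigs.COUNT_WINDOW_TIME);     // defaults to 5
        String measurement = config.get(MergeConfigs.MEASUREMENT_NAME);     // defaults to application_protocol_stat

        System.out.println(sourceTopic + " / " + windowSeconds + " / " + measurement);
    }
}
```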

View File

@@ -0,0 +1,44 @@
package com.zdjizhi.common.config;
import org.apache.flink.configuration.Configuration;
import java.util.Properties;
/**
* A wrapper class that extends the Flink `Configuration` to provide utility methods for handling
* properties with a specific prefix. This class allows retrieving properties that start with the
* given `prefix` and converts them into a `java.util.Properties` object.
*
* @author chaoc
* @since 1.0
*/
public class MergeConfiguration {
private final Configuration config;
public MergeConfiguration(final Configuration config) {
this.config = config;
}
/**
* Retrieves properties from the underlying `Configuration` instance that start with the specified
* `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
*
* @param prefix The prefix to filter properties.
* @return A `java.util.Properties` object containing the properties with the specified prefix.
*/
public Properties getProperties(final String prefix) {
if (prefix == null) {
final Properties props = new Properties();
props.putAll(config.toMap());
return props;
}
return config.toMap()
.entrySet()
.stream()
.filter(entry -> entry.getKey().startsWith(prefix))
.collect(Properties::new, (props, e) ->
props.setProperty(e.getKey().substring(prefix.length()), e.getValue()),
Properties::putAll);
}
}
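
A small usage sketch of the prefix handling, assuming only the three keys set below (placeholder values): the source.kafka.props.* entries are returned with the prefix stripped, ready to be handed to the Kafka client, while unprefixed keys are filtered out.

```java
import com.zdjizhi.common.config.MergeConfiguration;
import org.apache.flink.configuration.Configuration;

import java.util.Properties;

public class MergeConfigurationExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.setString("source.kafka.topic", "NETWORK-TRAFFIC-METRICS");
        config.setString("source.kafka.props.bootstrap.servers", "127.0.0.1:9092");
        config.setString("source.kafka.props.group.id", "app-protocol-merge");

        // Only keys under "source.kafka.props." survive, with the prefix removed.
        Properties props = new MergeConfiguration(config)
                .getProperties("source.kafka.props.");
        System.out.println(props);   // {bootstrap.servers=127.0.0.1:9092, group.id=app-protocol-merge}
    }
}
```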

View File

@@ -2,7 +2,8 @@ package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.config.MergeConfigs;
import com.zdjizhi.common.config.MergeConfiguration;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
@@ -25,6 +26,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;

import static com.zdjizhi.common.config.MergeConfigs.*;

/**
 * @author qidaijie
 * @Package com.zdjizhi.topology
@@ -36,36 +39,51 @@ public class ApplicationProtocolTopology {
    public static void main(String[] args) {
        try {
            // param check
            if (args.length < 1) {
                throw new IllegalArgumentException("Error: Not found properties path. " +
                        "\nUsage: flink -c xxx xxx.jar app.properties.");
            }
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

            ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
            final Configuration config = tool.getConfiguration();
            environment.getConfig().setGlobalJobParameters(config);
            final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);

            // watermark
            WatermarkStrategy<Tuple3<Tags, Fields, Long>> strategyForSession = WatermarkStrategy
                    .<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
                    .withTimestampAssigner((element, timestamp) -> element.f2);
            // data source
            DataStream<String> streamSource = environment.addSource(
                    KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
                            config.get(SOURCE_KAFKA_TOPIC),
                            config.get(STARTUP_MODE)));
            // parse data
            SingleOutputStreamOperator<Tuple3<Tags, Fields, Long>> parseDataProcess = streamSource.process(new ParsingData())
                    .assignTimestampsAndWatermarks(strategyForSession)
                    .name("ParseDataProcess");
            // incremental aggregation window
            SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy())
                    .window(TumblingEventTimeWindows.of(Time.seconds(config.get(MergeConfigs.COUNT_WINDOW_TIME))))
                    .reduce(new DispersionCountWindow(), new MergeCountWindow())
                    .name("DispersionCountWindow");
            // split data
            SingleOutputStreamOperator<String> resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap())
                    .name("ResultFlatMap");
            // output
            resultFlatMap.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX),
                    config.get(SINK_KAFKA_TOPIC),
                    config.get(LOG_FAILURES_ONLY)));

            environment.execute("APP-PROTOCOL-STAT-TRAFFIC-MERGE");
        } catch (Exception e) {

View File

@@ -3,9 +3,6 @@ package com.zdjizhi.utils.functions.map;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
import com.zdjizhi.utils.StringUtil;
@@ -20,9 +17,14 @@ import org.apache.flink.util.Collector;
 */
public class ResultFlatMap implements FlatMapFunction<Metrics, String> {
    private static final Log logger = LogFactory.get();

    /**
     * Protocol separator; must be escaped for use as a regex
     */
    private static final String PROTOCOL_SPLITTER = "\\.";

    @Override
    public void flatMap(Metrics metrics, Collector<String> out) {
        try {
            Tags tags = metrics.getTags();
            String protocolStackId = tags.getProtocol_stack_id();
@@ -30,7 +32,7 @@ public class ResultFlatMap implements FlatMapFunction<Metrics, String> {
            tags.setApp_name(null);
            StringBuilder stringBuilder = new StringBuilder();
            String[] protocolIds = protocolStackId.split(PROTOCOL_SPLITTER);
            int protocolIdsNum = protocolIds.length;
            for (int i = 0; i < protocolIdsNum - 1; i++) {
                if (StringUtil.isBlank(stringBuilder.toString())) {

View File

@@ -19,8 +19,9 @@ import org.apache.flink.api.java.tuple.Tuple3;
public class DispersionCountWindow implements ReduceFunction<Tuple3<Tags, Fields, Long>> {
    private static final Log logger = LogFactory.get();

    @Override
    public Tuple3<Tags, Fields, Long> reduce(Tuple3<Tags, Fields, Long> value1, Tuple3<Tags, Fields, Long> value2) {
        try {
            Fields cacheData = value1.f1;
            Fields newData = value2.f1;

View File

@@ -2,12 +2,12 @@ package com.zdjizhi.utils.functions.statistics;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.config.MergeConfigs;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -18,17 +18,30 @@ import org.apache.flink.util.Collector;
 * @Description:
 * @date 2023/4/23 14:43
 */
public class MergeCountWindow extends ProcessWindowFunction<Tuple3<Tags, Fields, Long>, Metrics, String, TimeWindow> {
    private static final Log logger = LogFactory.get();
    private String NAME = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        final Configuration configuration = (Configuration) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        NAME = configuration.get(MergeConfigs.MEASUREMENT_NAME);
    }

    @Override
    public void process(String windowKey, Context context, Iterable<Tuple3<Tags, Fields, Long>> input, Collector<Metrics> output) {
        try {
            long timestamp_ms = context.window().getStart();
            for (Tuple3<Tags, Fields, Long> tuple : input) {
                Tags tags = tuple.f0;
                Fields fields = tuple.f1;
                Metrics metrics = new Metrics(NAME, tags, fields, timestamp_ms);
                output.collect(metrics);
            }

View File

@@ -2,7 +2,6 @@ package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.utils.StringUtil;
import org.apache.datasketches.hll.HllSketch;
@@ -19,8 +18,6 @@ import java.util.Base64;
 */
public class MetricUtil {
    private static final Log logger = LogFactory.get();

    /**
     * Aggregates the business metrics
@@ -58,28 +55,26 @@ public class MetricUtil {
        Long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes());
        Long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes());

//        String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch());
//        return new Fields(sessions,
//                inBytes, outBytes, inPkts, outPkts,
//                c2sPkts, s2cPkts, c2sBytes, s2cBytes,
//                c2sFragments, s2cFragments,
//                c2sTcpLostBytes, s2cTcpLostBytes,
//                c2sTcpooorderPkts, s2cTcpooorderPkts,
//                c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
//                c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
//                clientIpSketch);

        return new Fields(sessions,
                inBytes, outBytes, inPkts, outPkts,
                c2sPkts, s2cPkts, c2sBytes, s2cBytes,
                c2sFragments, s2cFragments,
                c2sTcpLostBytes, s2cTcpLostBytes,
                c2sTcpooorderPkts, s2cTcpooorderPkts,
                c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
                c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
                null);
    }

    /**

View File

@@ -1,48 +0,0 @@
package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.config.GlobalConfig;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.kafka
* @Description:
* @date 2021/9/610:37
*/
class CertUtils {
/**
* Kafka SASL authentication port
*/
private static final String SASL_PORT = "9094";
/**
* Kafka SSL authentication port
*/
private static final String SSL_PORT = "9095";
/**
* Chooses the authentication method based on the port in the connection string.
*
* @param servers kafka connection string
* @param properties kafka connection properties
*/
static void chooseCert(String servers, Properties properties) {
if (servers.contains(SASL_PORT)) {
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ GlobalConfig.KAFKA_SASL_JAAS_USER + " password=" + GlobalConfig.KAFKA_SASL_JAAS_PIN + ";");
} else if (servers.contains(SSL_PORT)) {
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put("ssl.keystore.location", GlobalConfig.TOOLS_LIBRARY + "keystore.jks");
properties.put("ssl.keystore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
properties.put("ssl.truststore.location", GlobalConfig.TOOLS_LIBRARY + "truststore.jks");
properties.put("ssl.truststore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
properties.put("ssl.key.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
}
}
}

View File

@@ -1,8 +1,8 @@
package com.zdjizhi.utils.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;
@@ -13,35 +13,40 @@ import java.util.Properties;
 * @date 2021/6/8 13:54
 */
public class KafkaConsumer {

    /**
     * Consume kafka data with the official string schema
     *
     * @return kafka logs
     */
    public static FlinkKafkaConsumer<String> getKafkaConsumer(Properties properties, String topic, String startupMode) {

        setDefaultConfig(properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
        setDefaultConfig(properties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
        setDefaultConfig(properties, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280);

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);

        switch (startupMode) {
            case "group":
                kafkaConsumer.setStartFromGroupOffsets();
                break;
            case "latest":
                kafkaConsumer.setStartFromLatest();
                break;
            case "earliest":
                kafkaConsumer.setStartFromEarliest();
                break;
            default:
                kafkaConsumer.setStartFromGroupOffsets();
        }
        return kafkaConsumer;
    }

    private static void setDefaultConfig(Properties properties, String key, Object value) {
        if (!properties.contains(key)) {
            properties.put(key, value);
        }
    }
}

View File

@@ -1,9 +1,7 @@
package com.zdjizhi.utils.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Optional;
import java.util.Properties;
@@ -16,33 +14,29 @@ import java.util.Properties;
 */
public class KafkaProducer {

    public static FlinkKafkaProducer<String> getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) {
        setDefaultConfig(properties, "ack", 1);
        setDefaultConfig(properties, "retries", 0);
        setDefaultConfig(properties, "linger.ms", 10);
        setDefaultConfig(properties, "request.timeout.ms", 30000);
        setDefaultConfig(properties, "batch.size", 262144);
        setDefaultConfig(properties, "buffer.memory", 134217728);
        setDefaultConfig(properties, "max.request.size", 10485760);
        setDefaultConfig(properties, "compression.type", "snappy");

        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                topic,
                new SimpleStringSchema(),
                properties, Optional.empty());
        kafkaProducer.setLogFailuresOnly(logFailuresOnly);
        return kafkaProducer;
    }

    private static void setDefaultConfig(Properties properties, String key, Object value) {
        if (!properties.contains(key)) {
            properties.put(key, value);
        }
    }
}

View File

@@ -1,11 +1,10 @@
package com.zdjizhi;

import com.zdjizhi.conf.FusionConfiguration;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -30,6 +29,7 @@ public class ConfigTest {
        System.out.println(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
        System.out.println(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));

        final FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                config.get(SOURCE_KAFKA_TOPIC),
                new SimpleStringSchema(),
@@ -41,12 +41,14 @@ public class ConfigTest {
        sourceStream.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) {
                out.collect(value);
            }
        }).print();

        environment.execute();
    } catch (Exception e) {

View File

@@ -1,6 +1,5 @@
package com.zdjizhi;

import com.zdjizhi.utils.StringUtil;
import org.junit.Test;
@@ -22,7 +21,7 @@ public class ConventionalTest {
        System.out.println(protocol);
        StringBuffer stringBuffer = new StringBuffer();
        String appName = "qq_r2";
        String[] protocolIds = protocol.split("\\.");
        for (String proto : protocolIds) {
            if (StringUtil.isBlank(stringBuffer.toString())) {
                stringBuffer.append(proto);