Merge branch 'fix/23.09' into 'master'

Remove the metric client_ip_sketch. (TSG-17949)

See merge request galaxy/tsg_olap/app-protocol-stat-traffic-merge!2
This commit is contained in:
戚岱杰
2024-01-18 05:43:59 +00:00
30 changed files with 631 additions and 939 deletions

View File

@@ -1,22 +1,65 @@
# app-protocol-stat-traffic-merge
The Live Traffic Chart statistics job splits multi-flow aggregates along the protocol stack and stores them in the protocol and application statistics tables. It uses incremental window computation with a 5-second statistics period and a 5-second watermark.
## Data Sources
Whichever computation the metrics come from, the topic is NETWORK-TRAFFIC-METRICS.
### 1. Application and Protocol Metrics computed by the app-protocol-stat-traffic-agent job from closed/transitional session logs, aggregated at 1-second granularity. (< TSG 23.05)
### 2. Application and Protocol Metrics computed on the appliance side, aggregated at 1-second granularity. (≥ TSG 23.05)
## Processing
### 1. Filter records whose Measurement Name is traffic_application_protocol_stat.
### 2. Group and aggregate by the Tags content.
### 3. Split the protocol_stack_id protocol tree into multiple nodes (one node per prefix; see the sketch below).
#### For example, ETHERNET.IPv4.TCP.https.kingsoft.wps_office produces the node IDs:
##### ETHERNET
##### ETHERNET.IPv4
##### ETHERNET.IPv4.TCP
##### ETHERNET.IPv4.TCP.https
##### ETHERNET.IPv4.TCP.https.kingsoft
##### ETHERNET.IPv4.TCP.https.kingsoft.wps_office
### 4. app_name is emitted only on the terminal node.
### 5. Results are emitted with Measurement Name=application_protocol_stat.
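A minimal sketch of step 3 (illustrative class and method names; this is not the project's ResultFlatMap code):

```java
import java.util.ArrayList;
import java.util.List;

public class ProtocolStackSplitSketch {
    /** Expands a protocol stack id into one id per tree node (cumulative prefixes). */
    static List<String> splitNodes(String protocolStackId) {
        List<String> nodes = new ArrayList<>();
        StringBuilder prefix = new StringBuilder();
        for (String segment : protocolStackId.split("\\.")) {
            if (prefix.length() > 0) {
                prefix.append('.');
            }
            prefix.append(segment);
            nodes.add(prefix.toString());
        }
        return nodes;
    }

    public static void main(String[] args) {
        // Prints ETHERNET, ETHERNET.IPv4, ..., ETHERNET.IPv4.TCP.https.kingsoft.wps_office
        splitNodes("ETHERNET.IPv4.TCP.https.kingsoft.wps_office").forEach(System.out::println);
    }
}
```

Each prefix becomes one output record; per step 4, only the final full-length node keeps app_name.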
<br/>
## Startup
Standalone:
`flink run [-p parallelism] -c com.zdjizhi.topology.ApplicationProtocolTopology app-protocol-stat-traffic-merge-[version].jar app.properties`
Yarn:
`flink run -t yarn-per-job -Djobmanager.memory.process.size=1024m -Dtaskmanager.memory.process.size=2048m -Dyarn.application.name=APP-PROTOCOL-STAT-TRAFFIC-MERGE -Dtaskmanager.numberOfTaskSlots=1 -d -p 3 -c com.zdjizhi.topology.ApplicationProtocolTopology app-protocol-stat-traffic-merge-[version].jar app.properties`
<br/>
## Configuration Options
|Key|Type|Required|Default|Description|
|--|--|--|--|--|
|source.kafka.topic|STRING|Y||Kafka topic to consume the source data from|
|source.kafka.props.*|STRING|N||Kafka consumer connection properties for the source|
|startup.mode|STRING|N|group|Source consumption strategy: group = start from the consumer group's committed offsets; latest = start from the newest partition offsets; earliest = start from the oldest partition offsets|
|sink.kafka.topic|STRING|Y||Kafka topic the results are written to|
|sink.kafka.props.*|STRING|N||Kafka producer connection properties for the sink|
|count.window.time|INT|N|5|Aggregation window size (seconds)|
|watermark.max.orderness|INT|N|5|Maximum allowed out-of-orderness for late data (seconds)|
|log.failures.only|BOOLEAN|N|false|true: only log producer errors; false: fail the job when the producer raises an error|
|measurement.name|STRING|N|application_protocol_stat|Measurement name attached to emitted records|

pom.xml
View File

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId> <groupId>com.zdjizhi</groupId>
<artifactId>app-protocol-stat-traffic-merge</artifactId> <artifactId>app-protocol-stat-traffic-merge</artifactId>
<version>230710-Time</version> <version>2.0.1</version>
<name>app-protocol-stat-traffic-merge</name> <name>app-protocol-stat-traffic-merge</name>
<url>http://www.example.com</url> <url>http://www.example.com</url>
@@ -199,27 +199,6 @@
<version>${jasypt.version}</version> <version>${jasypt.version}</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.nacos/nacos-client -->
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>${nacos.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
@@ -227,12 +206,6 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-java</artifactId>
<version>${datasketches.version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId> <artifactId>fastjson</artifactId>

View File

@@ -0,0 +1,27 @@
#Kafka topic for incoming data
source.kafka.topic=NETWORK-TRAFFIC-METRIC-TEST
source.kafka.props.bootstrap.servers=192.168.44.12:9094
source.kafka.props.group.id=appapp-protocol-merge-231109-1
source.kafka.props.security.protocol=SASL_PLAINTEXT
source.kafka.props.sasl.mechanism=PLAIN
source.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
#Kafka topic for the output data
sink.kafka.topic=APP-PROTOCOL-TEST-RESULT
sink.kafka.props.bootstrap.servers=192.168.44.12:9094
sink.kafka.props.security.protocol=SASL_PLAINTEXT
sink.kafka.props.sasl.mechanism=PLAIN
sink.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
count.window.time=5
watermark.max.orderness=5

View File

@@ -1,46 +0,0 @@
#====================Kafka KafkaConsumer====================#
#kafka source connection timeout
session.timeout.ms=60000
#kafka source poll
max.poll.records=5000
#kafka source poll bytes
max.partition.fetch.bytes=31457280
#====================Kafka KafkaProducer====================#
#number of producer retries
retries=0
#maximum time a batch may wait before it is sent, whether or not it is full
linger.ms=10
#if no response is received before the timeout, the client resends the request if necessary
request.timeout.ms=30000
#producers send records in batches; batch size, default: 16384
batch.size=262144
#size of the buffer the producer uses to cache records
#128M
buffer.memory=134217728
#maximum size of a single request sent to the Kafka server, default 1048576
#10M
max.request.size=10485760
#producer compression mode: none or snappy
producer.kafka.compression.type=none
#producer acks
producer.ack=1
#====================kafka default====================#
#kafka SASL username (encrypted)
kafka.user=nsyGpHKGFA4KW0zro9MDdw==
#kafka SASL/SSL password (encrypted)
kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
#====================Topology Default====================#
measurement.name=application_protocol_stat

View File

@@ -1,41 +0,0 @@
#--------------------------------Address configuration------------------------------#
#source Kafka servers
source.kafka.servers=192.168.44.12:9094
#output Kafka servers
sink.kafka.servers=192.168.44.12:9094
#--------------------------------HTTP------------------------------#
#Kafka certificate path
tools.library=D:\\workerspace\\dat
#--------------------------------Kafka consumer group------------------------------#
#Kafka topic for incoming data
source.kafka.topic=etl-test
#Kafka topic for the output data
sink.kafka.topic=etl-test-result
#consumer group for the source topic; stores this consumer's offsets (can be named after the topology) so the next read does not re-consume data
group.id=livecharts-test-20230423-1
#--------------------------------Topology configuration------------------------------#
#consumer parallelism
source.parallelism=1
#map function parallelism
parse.parallelism=1
#first window computation parallelism
window.parallelism=1
#producer parallelism
sink.parallelism=1
#window time for the initial pre-aggregation
count.window.time=15
#data source: firewall or agent
metrics.data.source=firewall

View File

@@ -1,73 +0,0 @@
package com.zdjizhi.common.config;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
* @author Administrator
*/
public class GlobalConfig {
private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
static {
encryptor.setPassword("galaxy");
}
/**
* Protocol separator; must be escaped for use in a regex
*/
public static final String PROTOCOL_SPLITTER = "\\.";
/**
* System
*/
public static final Integer SOURCE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "source.parallelism");
public static final String MEASUREMENT_NAME = GlobalConfigLoad.getStringProperty(1, "measurement.name");
public static final Integer PARSE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "parse.parallelism");
public static final Integer WINDOW_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "window.parallelism");
public static final Integer COUNT_WINDOW_TIME = GlobalConfigLoad.getIntProperty(0, "count.window.time");
public static final String TOOLS_LIBRARY = GlobalConfigLoad.getStringProperty(0, "tools.library");
public static final Integer SINK_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "sink.parallelism");
public static final String METICS_DATA_SOURCE = GlobalConfigLoad.getStringProperty(0, "metrics.data.source");
/**
* Kafka common
*/
public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.user"));
public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.pin"));
/**
* kafka sink config
*/
public static final String SINK_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "sink.kafka.servers");
public static final String SINK_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "sink.kafka.topic");
public static final String PRODUCER_ACK = GlobalConfigLoad.getStringProperty(1, "producer.ack");
public static final String RETRIES = GlobalConfigLoad.getStringProperty(1, "retries");
public static final String LINGER_MS = GlobalConfigLoad.getStringProperty(1, "linger.ms");
public static final Integer REQUEST_TIMEOUT_MS = GlobalConfigLoad.getIntProperty(1, "request.timeout.ms");
public static final Integer BATCH_SIZE = GlobalConfigLoad.getIntProperty(1, "batch.size");
public static final Integer BUFFER_MEMORY = GlobalConfigLoad.getIntProperty(1, "buffer.memory");
public static final Integer MAX_REQUEST_SIZE = GlobalConfigLoad.getIntProperty(1, "max.request.size");
/**
* kafka source config
*/
public static final String SOURCE_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "source.kafka.servers");
public static final String SOURCE_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "source.kafka.topic");
public static final String GROUP_ID = GlobalConfigLoad.getStringProperty(0, "group.id");
public static final String SESSION_TIMEOUT_MS = GlobalConfigLoad.getStringProperty(1, "session.timeout.ms");
public static final String MAX_POLL_RECORDS = GlobalConfigLoad.getStringProperty(1, "max.poll.records");
public static final String MAX_PARTITION_FETCH_BYTES = GlobalConfigLoad.getStringProperty(1, "max.partition.fetch.bytes");
/**
* Kafka throttling configuration - 20201117
*/
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = GlobalConfigLoad.getStringProperty(1, "producer.kafka.compression.type");
}

View File

@@ -1,70 +0,0 @@
package com.zdjizhi.common.config;
import com.zdjizhi.utils.StringUtil;
import java.io.IOException;
import java.util.Locale;
import java.util.Properties;
/**
* @author Administrator
*/
public final class GlobalConfigLoad {
private static Properties propDefault = new Properties();
private static Properties propService = new Properties();
static String getStringProperty(Integer type, String key) {
if (type == 0) {
return propService.getProperty(key);
} else if (type == 1) {
return propDefault.getProperty(key);
} else {
return null;
}
}
static Integer getIntProperty(Integer type, String key) {
if (type == 0) {
return Integer.parseInt(propService.getProperty(key));
} else if (type == 1) {
return Integer.parseInt(propDefault.getProperty(key));
} else {
return null;
}
}
public static Long getLongProperty(Integer type, String key) {
if (type == 0) {
return Long.parseLong(propService.getProperty(key));
} else if (type == 1) {
return Long.parseLong(propDefault.getProperty(key));
} else {
return null;
}
}
public static Boolean getBooleanProperty(Integer type, String key) {
if (type == 0) {
return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
} else if (type == 1) {
return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
} else {
return null;
}
}
static {
try {
propService.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
propDefault.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("default_config.properties"));
} catch (IOException | RuntimeException e) {
propDefault = null;
propService = null;
}
}
}

View File

@@ -0,0 +1,79 @@
package com.zdjizhi.common.config;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
/**
* Containing configuration options for the Fusion application.
*
* @author chaoc
* @since 1.0
*/
public class MergeConfigs {
/**
* The prefix for Kafka properties used in the source.
*/
public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";
/**
* The prefix for Kafka properties used in the sink.
*/
public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";
public static final ConfigOption<String> SOURCE_KAFKA_TOPIC =
ConfigOptions.key("source.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("The Kafka topic used in the source.");
public static final ConfigOption<String> SINK_KAFKA_TOPIC =
ConfigOptions.key("sink.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("The Kafka topic used in the sink.");
public static final ConfigOption<Integer> COUNT_WINDOW_TIME =
ConfigOptions.key("count.window.time")
.intType()
.defaultValue(5)
.withDescription("The aggregate window time");
public static final ConfigOption<Integer> WARTERMARK_MAX_ORDERNESS =
ConfigOptions.key("watermark.max.orderness")
.intType()
.defaultValue(5)
.withDescription("The aggregate watermark max time");
public static final ConfigOption<String> STARTUP_MODE =
ConfigOptions.key("startup.mode")
.stringType()
.defaultValue("group")
.withDescription("The offset commit mode for the consumer.");
public static final ConfigOption<Boolean> LOG_FAILURES_ONLY =
ConfigOptions.key("log.failures.only")
.booleanType()
.defaultValue(false)
.withDescription("Defines whether the producer should fail on errors, or only log them.");
public static final ConfigOption<String> MEASUREMENT_NAME =
ConfigOptions.key("measurement.name")
.stringType()
.defaultValue("application_protocol_stat")
.withDescription("The data identification.");
public static final ConfigOption<String> JOB_NAME =
ConfigOptions.key("job.name")
.stringType()
.defaultValue("agg_app_protocol_traffic")
.withDescription("The flink job name.");
}
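A usage sketch for these options (assumes an app.properties file and the same package as MergeConfigs; it mirrors how ApplicationProtocolTopology loads its configuration):

```java
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;

public class MergeConfigsUsageSketch {
    public static void main(String[] args) throws Exception {
        // Load the properties file, as the topology does with args[0].
        ParameterTool tool = ParameterTool.fromPropertiesFile("app.properties");
        Configuration config = tool.getConfiguration();

        // Options fall back to their declared defaults when missing from the file.
        int windowSeconds = config.get(MergeConfigs.COUNT_WINDOW_TIME);   // 5 unless overridden
        String startupMode = config.get(MergeConfigs.STARTUP_MODE);       // "group" unless overridden
        String sourceTopic = config.get(MergeConfigs.SOURCE_KAFKA_TOPIC); // no default; must be set
        System.out.printf("window=%ds startup=%s topic=%s%n", windowSeconds, startupMode, sourceTopic);
    }
}
```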

View File

@@ -0,0 +1,44 @@
package com.zdjizhi.common.config;
import org.apache.flink.configuration.Configuration;
import java.util.Properties;
/**
* A wrapper class that extends the Flink `Configuration` to provide utility methods for handling
* properties with a specific prefix. This class allows retrieving properties that start with the
* given `prefix` and converts them into a `java.util.Properties` object.
*
* @author chaoc
* @since 1.0
*/
public class MergeConfiguration {
private final Configuration config;
public MergeConfiguration(final Configuration config) {
this.config = config;
}
/**
* Retrieves properties from the underlying `Configuration` instance that start with the specified
* `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
*
* @param prefix The prefix to filter properties.
* @return A `java.util.Properties` object containing the properties with the specified prefix.
*/
public Properties getProperties(final String prefix) {
if (prefix == null) {
final Properties props = new Properties();
props.putAll(config.toMap());
return props;
}
return config.toMap()
.entrySet()
.stream()
.filter(entry -> entry.getKey().startsWith(prefix))
.collect(Properties::new, (props, e) ->
props.setProperty(e.getKey().substring(prefix.length()), e.getValue()),
Properties::putAll);
}
}
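A minimal sketch of the prefix-stripping behavior with invented values (assumes the same package as MergeConfiguration):

```java
import java.util.Properties;
import org.apache.flink.configuration.Configuration;

public class MergeConfigurationSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.setString("source.kafka.props.bootstrap.servers", "192.168.44.12:9094");
        config.setString("source.kafka.props.group.id", "app-protocol-merge");
        config.setString("sink.kafka.topic", "APP-PROTOCOL-TEST-RESULT");

        // Keeps only the source.kafka.props.* entries and strips the prefix,
        // yielding plain Kafka client keys: bootstrap.servers, group.id.
        Properties kafkaProps = new MergeConfiguration(config).getProperties("source.kafka.props.");
        System.out.println(kafkaProps);
    }
}
```

The topology hands the resulting Properties straight to the Kafka consumer and producer builders.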

View File

@@ -7,28 +7,27 @@ package com.zdjizhi.common.pojo;
* @date 2023/4/2311:47 * @date 2023/4/2311:47
*/ */
public class Fields { public class Fields {
private Long sessions; private long sessions;
private Long in_bytes; private long in_bytes;
private Long out_bytes; private long out_bytes;
private Long in_pkts; private long in_pkts;
private Long out_pkts; private long out_pkts;
private Long c2s_pkts; private long c2s_pkts;
private Long s2c_pkts; private long s2c_pkts;
private Long c2s_bytes; private long c2s_bytes;
private Long s2c_bytes; private long s2c_bytes;
private Long c2s_fragments; private long c2s_fragments;
private Long s2c_fragments; private long s2c_fragments;
private Long c2s_tcp_lost_bytes; private long c2s_tcp_lost_bytes;
private Long s2c_tcp_lost_bytes; private long s2c_tcp_lost_bytes;
private Long c2s_tcp_ooorder_pkts; private long c2s_tcp_ooorder_pkts;
private Long s2c_tcp_ooorder_pkts; private long s2c_tcp_ooorder_pkts;
private Long c2s_tcp_retransmitted_pkts; private long c2s_tcp_retransmitted_pkts;
private Long s2c_tcp_retransmitted_pkts; private long s2c_tcp_retransmitted_pkts;
private Long c2s_tcp_retransmitted_bytes; private long c2s_tcp_retransmitted_bytes;
private Long s2c_tcp_retransmitted_bytes; private long s2c_tcp_retransmitted_bytes;
private String client_ip_sketch;
public Fields(Long sessions, Long in_bytes, Long out_bytes, Long in_pkts, Long out_pkts, Long c2s_pkts, Long s2c_pkts, Long c2s_bytes, Long s2c_bytes, Long c2s_fragments, Long s2c_fragments, Long c2s_tcp_lost_bytes, Long s2c_tcp_lost_bytes, Long c2s_tcp_ooorder_pkts, Long s2c_tcp_ooorder_pkts, Long c2s_tcp_retransmitted_pkts, Long s2c_tcp_retransmitted_pkts, Long c2s_tcp_retransmitted_bytes, Long s2c_tcp_retransmitted_bytes, String client_ip_sketch) { public Fields(long sessions, long in_bytes, long out_bytes, long in_pkts, long out_pkts, long c2s_pkts, long s2c_pkts, long c2s_bytes, long s2c_bytes, long c2s_fragments, long s2c_fragments, long c2s_tcp_lost_bytes, long s2c_tcp_lost_bytes, long c2s_tcp_ooorder_pkts, long s2c_tcp_ooorder_pkts, long c2s_tcp_retransmitted_pkts, long s2c_tcp_retransmitted_pkts, long c2s_tcp_retransmitted_bytes, long s2c_tcp_retransmitted_bytes) {
this.sessions = sessions; this.sessions = sessions;
this.in_bytes = in_bytes; this.in_bytes = in_bytes;
this.out_bytes = out_bytes; this.out_bytes = out_bytes;
@@ -48,166 +47,158 @@ public class Fields {
this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts; this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts;
this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes; this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes;
this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes; this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
this.client_ip_sketch = client_ip_sketch;
} }
public Long getSessions() { public long getSessions() {
return sessions; return sessions;
} }
public void setSessions(Long sessions) { public void setSessions(long sessions) {
this.sessions = sessions; this.sessions = sessions;
} }
public Long getIn_bytes() { public long getIn_bytes() {
return in_bytes; return in_bytes;
} }
public void setIn_bytes(Long in_bytes) { public void setIn_bytes(long in_bytes) {
this.in_bytes = in_bytes; this.in_bytes = in_bytes;
} }
public Long getOut_bytes() { public long getOut_bytes() {
return out_bytes; return out_bytes;
} }
public void setOut_bytes(Long out_bytes) { public void setOut_bytes(long out_bytes) {
this.out_bytes = out_bytes; this.out_bytes = out_bytes;
} }
public Long getIn_pkts() { public long getIn_pkts() {
return in_pkts; return in_pkts;
} }
public void setIn_pkts(Long in_pkts) { public void setIn_pkts(long in_pkts) {
this.in_pkts = in_pkts; this.in_pkts = in_pkts;
} }
public Long getOut_pkts() { public long getOut_pkts() {
return out_pkts; return out_pkts;
} }
public void setOut_pkts(Long out_pkts) { public void setOut_pkts(long out_pkts) {
this.out_pkts = out_pkts; this.out_pkts = out_pkts;
} }
public Long getC2s_pkts() { public long getC2s_pkts() {
return c2s_pkts; return c2s_pkts;
} }
public void setC2s_pkts(Long c2s_pkts) { public void setC2s_pkts(long c2s_pkts) {
this.c2s_pkts = c2s_pkts; this.c2s_pkts = c2s_pkts;
} }
public Long getS2c_pkts() { public long getS2c_pkts() {
return s2c_pkts; return s2c_pkts;
} }
public void setS2c_pkts(Long s2c_pkts) { public void setS2c_pkts(long s2c_pkts) {
this.s2c_pkts = s2c_pkts; this.s2c_pkts = s2c_pkts;
} }
public Long getC2s_bytes() { public long getC2s_bytes() {
return c2s_bytes; return c2s_bytes;
} }
public void setC2s_bytes(Long c2s_bytes) { public void setC2s_bytes(long c2s_bytes) {
this.c2s_bytes = c2s_bytes; this.c2s_bytes = c2s_bytes;
} }
public Long getS2c_bytes() { public long getS2c_bytes() {
return s2c_bytes; return s2c_bytes;
} }
public void setS2c_bytes(Long s2c_bytes) { public void setS2c_bytes(long s2c_bytes) {
this.s2c_bytes = s2c_bytes; this.s2c_bytes = s2c_bytes;
} }
public Long getC2s_fragments() { public long getC2s_fragments() {
return c2s_fragments; return c2s_fragments;
} }
public void setC2s_fragments(Long c2s_fragments) { public void setC2s_fragments(long c2s_fragments) {
this.c2s_fragments = c2s_fragments; this.c2s_fragments = c2s_fragments;
} }
public Long getS2c_fragments() { public long getS2c_fragments() {
return s2c_fragments; return s2c_fragments;
} }
public void setS2c_fragments(Long s2c_fragments) { public void setS2c_fragments(long s2c_fragments) {
this.s2c_fragments = s2c_fragments; this.s2c_fragments = s2c_fragments;
} }
public Long getC2s_tcp_lost_bytes() { public long getC2s_tcp_lost_bytes() {
return c2s_tcp_lost_bytes; return c2s_tcp_lost_bytes;
} }
public void setC2s_tcp_lost_bytes(Long c2s_tcp_lost_bytes) { public void setC2s_tcp_lost_bytes(long c2s_tcp_lost_bytes) {
this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes; this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes;
} }
public Long getS2c_tcp_lost_bytes() { public long getS2c_tcp_lost_bytes() {
return s2c_tcp_lost_bytes; return s2c_tcp_lost_bytes;
} }
public void setS2c_tcp_lost_bytes(Long s2c_tcp_lost_bytes) { public void setS2c_tcp_lost_bytes(long s2c_tcp_lost_bytes) {
this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes; this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes;
} }
public Long getC2s_tcp_ooorder_pkts() { public long getC2s_tcp_ooorder_pkts() {
return c2s_tcp_ooorder_pkts; return c2s_tcp_ooorder_pkts;
} }
public void setC2s_tcp_ooorder_pkts(Long c2s_tcp_ooorder_pkts) { public void setC2s_tcp_ooorder_pkts(long c2s_tcp_ooorder_pkts) {
this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts; this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts;
} }
public Long getS2c_tcp_ooorder_pkts() { public long getS2c_tcp_ooorder_pkts() {
return s2c_tcp_ooorder_pkts; return s2c_tcp_ooorder_pkts;
} }
public void setS2c_tcp_ooorder_pkts(Long s2c_tcp_ooorder_pkts) { public void setS2c_tcp_ooorder_pkts(long s2c_tcp_ooorder_pkts) {
this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts; this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts;
} }
public Long getC2s_tcp_retransmitted_pkts() { public long getC2s_tcp_retransmitted_pkts() {
return c2s_tcp_retransmitted_pkts; return c2s_tcp_retransmitted_pkts;
} }
public void setC2s_tcp_retransmitted_pkts(Long c2s_tcp_retransmitted_pkts) { public void setC2s_tcp_retransmitted_pkts(long c2s_tcp_retransmitted_pkts) {
this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts; this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts;
} }
public Long getS2c_tcp_retransmitted_pkts() { public long getS2c_tcp_retransmitted_pkts() {
return s2c_tcp_retransmitted_pkts; return s2c_tcp_retransmitted_pkts;
} }
public void setS2c_tcp_retransmitted_pkts(Long s2c_tcp_retransmitted_pkts) { public void setS2c_tcp_retransmitted_pkts(long s2c_tcp_retransmitted_pkts) {
this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts; this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts;
} }
public Long getC2s_tcp_retransmitted_bytes() { public long getC2s_tcp_retransmitted_bytes() {
return c2s_tcp_retransmitted_bytes; return c2s_tcp_retransmitted_bytes;
} }
public void setC2s_tcp_retransmitted_bytes(Long c2s_tcp_retransmitted_bytes) { public void setC2s_tcp_retransmitted_bytes(long c2s_tcp_retransmitted_bytes) {
this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes; this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes;
} }
public Long getS2c_tcp_retransmitted_bytes() { public long getS2c_tcp_retransmitted_bytes() {
return s2c_tcp_retransmitted_bytes; return s2c_tcp_retransmitted_bytes;
} }
public void setS2c_tcp_retransmitted_bytes(Long s2c_tcp_retransmitted_bytes) { public void setS2c_tcp_retransmitted_bytes(long s2c_tcp_retransmitted_bytes) {
this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes; this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
} }
public String getClient_ip_sketch() {
return client_ip_sketch;
}
public void setClient_ip_sketch(String client_ip_sketch) {
this.client_ip_sketch = client_ip_sketch;
}
} }

View File

@@ -10,14 +10,14 @@ public class Metrics {
private String name; private String name;
private Tags tags; private Tags tags;
private Fields fields; private Fields fields;
private long timestamp; private long timestamp_ms;
public Metrics(String name, Tags tags, Fields fields, long timestamp) { public Metrics(String name, Tags tags, Fields fields, long timestamp_ms) {
this.name = name; this.name = name;
this.tags = tags; this.tags = tags;
this.fields = fields; this.fields = fields;
this.timestamp = timestamp; this.timestamp_ms = timestamp_ms;
} }
public String getName() { public String getName() {
@@ -44,11 +44,11 @@ public class Metrics {
this.fields = fields; this.fields = fields;
} }
public long getTimestamp() { public long getTimestamp_ms() {
return timestamp; return timestamp_ms;
} }
public void setTimestamp(long timestamp) { public void setTimestamp_ms(long timestamp_ms) {
this.timestamp = timestamp; this.timestamp_ms = timestamp_ms;
} }
} }

View File

@@ -2,25 +2,31 @@ package com.zdjizhi.topology;
import cn.hutool.log.Log; import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.zdjizhi.common.config.GlobalConfig; import com.zdjizhi.common.config.MergeConfigs;
import com.zdjizhi.common.config.MergeConfiguration;
import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics; import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags; import com.zdjizhi.common.pojo.Tags;
import com.zdjizhi.utils.functions.filter.DataTypeFilter;
import com.zdjizhi.utils.functions.keyby.DimensionKeyBy; import com.zdjizhi.utils.functions.keyby.DimensionKeyBy;
import com.zdjizhi.utils.functions.map.MetricsParseMap;
import com.zdjizhi.utils.functions.map.ResultFlatMap; import com.zdjizhi.utils.functions.map.ResultFlatMap;
import com.zdjizhi.utils.functions.process.ParsingData;
import com.zdjizhi.utils.functions.statistics.DispersionCountWindow; import com.zdjizhi.utils.functions.statistics.DispersionCountWindow;
import com.zdjizhi.utils.functions.statistics.MergeCountWindow; import com.zdjizhi.utils.functions.statistics.MergeCountWindow;
import com.zdjizhi.utils.kafka.KafkaConsumer; import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer; import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import static com.zdjizhi.common.config.MergeConfigs.*;
/** /**
* @author qidaijie * @author qidaijie
@@ -33,35 +39,56 @@ public class ApplicationProtocolTopology {
public static void main(String[] args) { public static void main(String[] args) {
try { try {
// param check
if (args.length < 1) {
throw new IllegalArgumentException("Error: Not found properties path. " +
"\nUsage: flink -c xxx xxx.jar app.properties.");
}
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//parse raw logs ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()) final Configuration config = tool.getConfiguration();
.setParallelism(GlobalConfig.SOURCE_PARALLELISM).name(GlobalConfig.SOURCE_KAFKA_TOPIC); environment.getConfig().setGlobalJobParameters(config);
final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
SingleOutputStreamOperator<String> appProtocolFilter = streamSource.filter(new DataTypeFilter())
.name("appProtocolFilter").setParallelism(GlobalConfig.SOURCE_PARALLELISM);
SingleOutputStreamOperator<Tuple2<Tags, Fields>> parseDataMap = appProtocolFilter.map(new MetricsParseMap()) //watermark
.name("ParseDataMap").setParallelism(GlobalConfig.PARSE_PARALLELISM); WatermarkStrategy<Tuple3<Tags, Fields, Long>> strategyForSession = WatermarkStrategy
.<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
.withTimestampAssigner((element, timestamp) -> element.f2);
SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataMap.keyBy(new DimensionKeyBy()) //data source
.window(TumblingProcessingTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME))) DataStream<String> streamSource = environment.addSource(
KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
config.get(SOURCE_KAFKA_TOPIC),
config.get(STARTUP_MODE)));
//parse data
SingleOutputStreamOperator<Tuple3<Tags, Fields, Long>> parseDataProcess = streamSource.process(new ParsingData())
.assignTimestampsAndWatermarks(strategyForSession)
.name("ParseDataProcess");
//incremental aggregation window
SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy())
.window(TumblingEventTimeWindows.of(Time.seconds(config.get(MergeConfigs.COUNT_WINDOW_TIME))))
.reduce(new DispersionCountWindow(), new MergeCountWindow()) .reduce(new DispersionCountWindow(), new MergeCountWindow())
.name("DispersionCountWindow") .name("DispersionCountWindow");
.setParallelism(GlobalConfig.WINDOW_PARALLELISM);
//split data
SingleOutputStreamOperator<String> resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap()) SingleOutputStreamOperator<String> resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap())
.name("ResultFlatMap").setParallelism(GlobalConfig.SINK_PARALLELISM); .name("ResultFlatMap");
//sink output
resultFlatMap.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX),
config.get(SINK_KAFKA_TOPIC),
config.get(LOG_FAILURES_ONLY)));
resultFlatMap.addSink(KafkaProducer.getKafkaProducer()) environment.execute(config.get(JOB_NAME));
.setParallelism(GlobalConfig.SINK_PARALLELISM).name(GlobalConfig.SINK_KAFKA_TOPIC);
environment.execute(args[0]);
} catch (Exception e) { } catch (Exception e) {
logger.error("This Flink task start ERROR! Exception information is :" + e); logger.error("This Flink task start ERROR! Exception information is :");
e.printStackTrace();
} }
} }

View File

@@ -1,36 +0,0 @@
package com.zdjizhi.utils.functions.filter;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONPath;
import com.alibaba.fastjson2.JSONReader;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.FilterFunction;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions.filter
* @Description:
* @date 2023/4/1919:02
*/
public class DataTypeFilter implements FilterFunction<String> {
private static final Log logger = LogFactory.get();
private static final String dataTypeExpr = "[?(@.name = 'traffic_application_protocol_stat')]";
@Override
public boolean filter(String message) throws Exception {
boolean protocolData = false;
try {
if (StringUtil.isNotBlank(message)) {
Object name = JSONPath.eval(message, dataTypeExpr);
if (name != null) {
protocolData = true;
}
}
} catch (RuntimeException e) {
logger.error("Parsing metric data is abnormal! The exception message is:" + e.getMessage());
}
return protocolData;
}
}

View File

@@ -4,6 +4,9 @@ import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Tags; import com.zdjizhi.common.pojo.Tags;
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.sql.Timestamp;
/** /**
* @author qidaijie * @author qidaijie
@@ -11,10 +14,10 @@ import org.apache.flink.api.java.tuple.Tuple2;
* @Description: * @Description:
* @date 2021/7/2112:13 * @date 2021/7/2112:13
*/ */
public class DimensionKeyBy implements KeySelector<Tuple2<Tags, Fields>, String> { public class DimensionKeyBy implements KeySelector<Tuple3<Tags, Fields, Long>, String> {
@Override @Override
public String getKey(Tuple2<Tags, Fields> value) throws Exception { public String getKey(Tuple3<Tags, Fields, Long> value) throws Exception {
//group by the key built from the tags map
return value.f0.toString(); return value.f0.toString();
} }

View File

@@ -1,46 +0,0 @@
package com.zdjizhi.utils.functions.map;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Tags;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/5/2715:01
*/
public class MetricsParseMap implements MapFunction<String, Tuple2<Tags, Fields>> {
private static final Log logger = LogFactory.get();
@Override
public Tuple2<Tags, Fields> map(String message) {
try {
JSONObject originalLog = JSON.parseObject(message);
Fields fields = JSONObject.parseObject(originalLog.getString("fields"), Fields.class);
Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class);
String appFullPath = tags.getApp_name();
if (StringUtil.isNotBlank(appFullPath)) {
String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1);
String protocolLabel = tags.getProtocol_stack_id();
tags.setApp_name(appName);
tags.setProtocol_stack_id(protocolLabel.concat(".").concat(appFullPath));
}
return new Tuple2<>(tags, fields);
} catch (RuntimeException e) {
logger.error("An error occurred in the original log parsing reorganization,error message is:" + e);
return new Tuple2<>(null, null);
}
}
}

View File

@@ -3,9 +3,6 @@ package com.zdjizhi.utils.functions.map;
import cn.hutool.log.Log; import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONWriter;
import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics; import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags; import com.zdjizhi.common.pojo.Tags;
import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.StringUtil;
@@ -20,9 +17,14 @@ import org.apache.flink.util.Collector;
*/ */
public class ResultFlatMap implements FlatMapFunction<Metrics, String> { public class ResultFlatMap implements FlatMapFunction<Metrics, String> {
private static final Log logger = LogFactory.get(); private static final Log logger = LogFactory.get();
/**
* Protocol separator; must be escaped for use in a regex
*/
private static final String PROTOCOL_SPLITTER = "\\.";
@Override @Override
public void flatMap(Metrics metrics, Collector<String> out) throws Exception { public void flatMap(Metrics metrics, Collector<String> out) {
try { try {
Tags tags = metrics.getTags(); Tags tags = metrics.getTags();
String protocolStackId = tags.getProtocol_stack_id(); String protocolStackId = tags.getProtocol_stack_id();
@@ -30,7 +32,7 @@ public class ResultFlatMap implements FlatMapFunction<Metrics, String> {
tags.setApp_name(null); tags.setApp_name(null);
StringBuilder stringBuilder = new StringBuilder(); StringBuilder stringBuilder = new StringBuilder();
String[] protocolIds = protocolStackId.split(GlobalConfig.PROTOCOL_SPLITTER); String[] protocolIds = protocolStackId.split(PROTOCOL_SPLITTER);
int protocolIdsNum = protocolIds.length; int protocolIdsNum = protocolIds.length;
for (int i = 0; i < protocolIdsNum - 1; i++) { for (int i = 0; i < protocolIdsNum - 1; i++) {
if (StringUtil.isBlank(stringBuilder.toString())) { if (StringUtil.isBlank(stringBuilder.toString())) {

View File

@@ -0,0 +1,47 @@
package com.zdjizhi.utils.functions.process;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONPath;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Tags;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Long>> {
private static final Log logger = LogFactory.get();
private static final String dataTypeExpr = "[?(@.name = 'traffic_application_protocol_stat')]";
@Override
public void processElement(String value, ProcessFunction<String, Tuple3<Tags, Fields, Long>>.Context ctx, Collector<Tuple3<Tags, Fields, Long>> out) {
try {
if (StringUtil.isNotBlank(value)) {
Object isProtocolData = JSONPath.eval(value, dataTypeExpr);
if (isProtocolData != null) {
JSONObject originalLog = JSON.parseObject(value);
Fields fields = JSONObject.parseObject(originalLog.getString("fields"), Fields.class);
Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class);
Long timestamp_ms = originalLog.getLong("timestamp_ms");
String appFullPath = tags.getApp_name();
if (StringUtil.isNotBlank(appFullPath)) {
String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1);
String protocolLabel = tags.getProtocol_stack_id();
tags.setApp_name(appName);
tags.setProtocol_stack_id(protocolLabel.concat(".").concat(appFullPath));
}
out.collect(new Tuple3<>(tags, fields, timestamp_ms));
}
}
} catch (RuntimeException e) {
logger.error("Parsing application_protocol_stat data is abnormal! The exception message is: {}", e.getMessage());
}
}
}
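For reference, a hypothetical input record that the JSONPath filter above would accept; the field names follow the Tags and Fields POJOs, and every value is invented:

```java
import com.alibaba.fastjson2.JSONPath;

public class ParsingDataInputSketch {
    public static void main(String[] args) {
        String sample = "{\"name\":\"traffic_application_protocol_stat\","
                + "\"timestamp_ms\":1705545600000,"
                + "\"tags\":{\"protocol_stack_id\":\"ETHERNET.IPv4.TCP.https\",\"app_name\":\"kingsoft.wps_office\"},"
                + "\"fields\":{\"sessions\":3,\"in_bytes\":1024,\"out_bytes\":2048}}";
        // Same filter expression as ParsingData; a non-null result means the record is processed.
        Object hit = JSONPath.eval(sample, "[?(@.name = 'traffic_application_protocol_stat')]");
        System.out.println(hit != null); // expected: true, mirroring the check above
    }
}
```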

View File

@@ -3,10 +3,12 @@ package com.zdjizhi.utils.functions.statistics;
import cn.hutool.log.Log; import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags; import com.zdjizhi.common.pojo.Tags;
import com.zdjizhi.utils.general.MetricUtil; import com.zdjizhi.utils.general.MetricUtil;
import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
/** /**
* @author qidaijie * @author qidaijie
@@ -14,21 +16,24 @@ import org.apache.flink.api.java.tuple.Tuple2;
* @Description: * @Description:
* @date 2023/4/2314:02 * @date 2023/4/2314:02
*/ */
public class DispersionCountWindow implements ReduceFunction<Tuple2<Tags, Fields>> { public class DispersionCountWindow implements ReduceFunction<Tuple3<Tags, Fields, Long>> {
private static final Log logger = LogFactory.get(); private static final Log logger = LogFactory.get();
@Override @Override
public Tuple2<Tags, Fields> reduce(Tuple2<Tags, Fields> value1, Tuple2<Tags, Fields> value2) throws Exception { public Tuple3<Tags, Fields, Long> reduce(Tuple3<Tags, Fields, Long> value1, Tuple3<Tags, Fields, Long> value2) {
try { try {
Fields cacheData = value1.f1; Fields cacheData = value1.f1;
Fields newData = value2.f1; Fields newData = value2.f1;
Fields metricsResult = MetricUtil.statisticsMetrics(cacheData, newData); Fields metricsResult = MetricUtil.statisticsMetrics(cacheData, newData);
return new Tuple2<>(value1.f0, metricsResult); return new Tuple3<>(value1.f0, metricsResult, value1.f2);
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.error("An exception occurred during incremental aggregation! The message is:" + e.getMessage()); logger.error("An exception occurred during incremental aggregation! The message is:" + e.getMessage());
return value1; return value1;
} }
} }
} }

View File

@@ -2,11 +2,12 @@ package com.zdjizhi.utils.functions.statistics;
import cn.hutool.log.Log; import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.zdjizhi.common.config.GlobalConfig; import com.zdjizhi.common.config.MergeConfigs;
import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics; import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags; import com.zdjizhi.common.pojo.Tags;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
@@ -17,19 +18,33 @@ import org.apache.flink.util.Collector;
* @Description: * @Description:
* @date 2023/4/2314:43 * @date 2023/4/2314:43
*/ */
public class MergeCountWindow extends ProcessWindowFunction<Tuple2<Tags, Fields>, Metrics, String, TimeWindow> { public class MergeCountWindow extends ProcessWindowFunction<Tuple3<Tags, Fields, Long>, Metrics, String, TimeWindow> {
private static final Log logger = LogFactory.get(); private static final Log logger = LogFactory.get();
private String NAME = null;
@Override @Override
public void process(String windowKey, Context context, Iterable<Tuple2<Tags, Fields>> input, Collector<Metrics> output) throws Exception { public void open(Configuration parameters) throws Exception {
super.open(parameters);
final Configuration configuration = (Configuration) getRuntimeContext()
.getExecutionConfig().getGlobalJobParameters();
NAME = configuration.get(MergeConfigs.MEASUREMENT_NAME);
}
@Override
public void process(String windowKey, Context context, Iterable<Tuple3<Tags, Fields, Long>> input, Collector<Metrics> output) {
try { try {
Long endTime = context.window().getStart() / 1000; long timestamp_ms = context.window().getStart();
for (Tuple2<Tags, Fields> tuple : input) { for (Tuple3<Tags, Fields, Long> tuple : input) {
Tags tags = tuple.f0; Tags tags = tuple.f0;
Fields fields = tuple.f1; Fields fields = tuple.f1;
Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, endTime); Metrics metrics = new Metrics(NAME, tags, fields, timestamp_ms);
output.collect(metrics); output.collect(metrics);
} }
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.error("An exception occurred in the process of full data aggregation! The message is:" + e.getMessage()); logger.error("An exception occurred in the process of full data aggregation! The message is:" + e.getMessage());
} }

View File

@@ -2,13 +2,7 @@ package com.zdjizhi.utils.general;
import cn.hutool.log.Log; import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.utils.StringUtil;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;
import java.util.Base64;
/** /**
@@ -19,8 +13,6 @@ import java.util.Base64;
*/ */
public class MetricUtil { public class MetricUtil {
private static final Log logger = LogFactory.get(); private static final Log logger = LogFactory.get();
private static final String METRICS_DEFAULT_TYPE = "agent";
/** /**
* Aggregates the business metric fields
@@ -30,106 +22,66 @@ public class MetricUtil {
*/ */
public static Fields statisticsMetrics(Fields cacheData, Fields newData) { public static Fields statisticsMetrics(Fields cacheData, Fields newData) {
Long sessions = MetricUtil.longSum(cacheData.getSessions(), newData.getSessions()); long sessions = MetricUtil.longSum(cacheData.getSessions(), newData.getSessions());
Long inBytes = MetricUtil.longSum(cacheData.getIn_bytes(), newData.getIn_bytes()); long inBytes = MetricUtil.longSum(cacheData.getIn_bytes(), newData.getIn_bytes());
Long outBytes = MetricUtil.longSum(cacheData.getOut_bytes(), newData.getOut_bytes()); long outBytes = MetricUtil.longSum(cacheData.getOut_bytes(), newData.getOut_bytes());
Long inPkts = MetricUtil.longSum(cacheData.getIn_pkts(), newData.getIn_pkts()); long inPkts = MetricUtil.longSum(cacheData.getIn_pkts(), newData.getIn_pkts());
Long outPkts = MetricUtil.longSum(cacheData.getOut_pkts(), newData.getOut_pkts()); long outPkts = MetricUtil.longSum(cacheData.getOut_pkts(), newData.getOut_pkts());
Long c2sBytes = MetricUtil.longSum(cacheData.getC2s_bytes(), newData.getC2s_bytes()); long c2sBytes = MetricUtil.longSum(cacheData.getC2s_bytes(), newData.getC2s_bytes());
Long s2cBytes = MetricUtil.longSum(cacheData.getS2c_bytes(), newData.getS2c_bytes()); long s2cBytes = MetricUtil.longSum(cacheData.getS2c_bytes(), newData.getS2c_bytes());
Long c2sPkts = MetricUtil.longSum(cacheData.getC2s_pkts(), newData.getC2s_pkts()); long c2sPkts = MetricUtil.longSum(cacheData.getC2s_pkts(), newData.getC2s_pkts());
Long s2cPkts = MetricUtil.longSum(cacheData.getS2c_pkts(), newData.getS2c_pkts()); long s2cPkts = MetricUtil.longSum(cacheData.getS2c_pkts(), newData.getS2c_pkts());
Long c2sFragments = MetricUtil.longSum(cacheData.getC2s_fragments(), newData.getC2s_fragments()); long c2sFragments = MetricUtil.longSum(cacheData.getC2s_fragments(), newData.getC2s_fragments());
Long s2cFragments = MetricUtil.longSum(cacheData.getS2c_fragments(), newData.getS2c_fragments()); long s2cFragments = MetricUtil.longSum(cacheData.getS2c_fragments(), newData.getS2c_fragments());
Long c2sTcpLostBytes = MetricUtil.longSum(cacheData.getC2s_tcp_lost_bytes(), newData.getC2s_tcp_lost_bytes()); long c2sTcpLostBytes = MetricUtil.longSum(cacheData.getC2s_tcp_lost_bytes(), newData.getC2s_tcp_lost_bytes());
Long s2cTcpLostBytes = MetricUtil.longSum(cacheData.getS2c_tcp_lost_bytes(), newData.getS2c_tcp_lost_bytes()); long s2cTcpLostBytes = MetricUtil.longSum(cacheData.getS2c_tcp_lost_bytes(), newData.getS2c_tcp_lost_bytes());
Long c2sTcpooorderPkts = MetricUtil.longSum(cacheData.getC2s_tcp_ooorder_pkts(), newData.getC2s_tcp_ooorder_pkts()); long c2sTcpooorderPkts = MetricUtil.longSum(cacheData.getC2s_tcp_ooorder_pkts(), newData.getC2s_tcp_ooorder_pkts());
Long s2cTcpooorderPkts = MetricUtil.longSum(cacheData.getS2c_tcp_ooorder_pkts(), newData.getS2c_tcp_ooorder_pkts()); long s2cTcpooorderPkts = MetricUtil.longSum(cacheData.getS2c_tcp_ooorder_pkts(), newData.getS2c_tcp_ooorder_pkts());
Long c2sTcpretransmittedPkts = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_pkts(), newData.getC2s_tcp_retransmitted_pkts()); long c2sTcpretransmittedPkts = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_pkts(), newData.getC2s_tcp_retransmitted_pkts());
Long s2cTcpretransmittedPkts = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_pkts(), newData.getS2c_tcp_retransmitted_pkts()); long s2cTcpretransmittedPkts = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_pkts(), newData.getS2c_tcp_retransmitted_pkts());
Long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes()); long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes());
Long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes()); long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes());
if (METRICS_DEFAULT_TYPE.equals(GlobalConfig.METICS_DATA_SOURCE)) { return new Fields(sessions,
String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch()); inBytes, outBytes, inPkts, outPkts,
return new Fields(sessions, c2sPkts, s2cPkts, c2sBytes, s2cBytes,
inBytes, outBytes, inPkts, outPkts, c2sFragments, s2cFragments,
c2sPkts, s2cPkts, c2sBytes, s2cBytes, c2sTcpLostBytes, s2cTcpLostBytes,
c2sFragments, s2cFragments, c2sTcpooorderPkts, s2cTcpooorderPkts,
c2sTcpLostBytes, s2cTcpLostBytes, c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
c2sTcpooorderPkts, s2cTcpooorderPkts, c2sTcpretransmittedBytes, s2cTcpretransmittedBytes);
c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
clientIpSketch);
} else {
return new Fields(sessions,
inBytes, outBytes, inPkts, outPkts,
c2sPkts, s2cPkts, c2sBytes, s2cBytes,
c2sFragments, s2cFragments,
c2sTcpLostBytes, s2cTcpLostBytes,
c2sTcpooorderPkts, s2cTcpooorderPkts,
c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
null);
}
} }
/** /**
* Sum of two long values *
* *
* @param value1 the first value * @param cacheData the cached value
* @param value2 the second value * @param newData the newly arrived value
* @return value1 + value2 * @return cacheData + newData
*/ */
private static Long longSum(Long value1, Long value2) { private static long longSum(long cacheData, long newData) {
Long result;
long result;
try { try {
if (value1 >= 0 && value2 >= 0) { if (cacheData >= 0 && newData >= 0) {
result = value1 + value2; result = cacheData + newData;
} else { } else {
result = value1; result = cacheData;
} }
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.error("Abnormal sending of traffic indicator statistics! The message is:" + e.getMessage()); logger.error("Abnormal sending of traffic indicator statistics! The message is:{}", e);
result = value1; result = cacheData;
} }
return result; return result;
} }
/**
* @param cacheHll the cached sketch
* @param newHll the newly aggregated sketch
* @return the merged sketch
*/
private static String hllSketchUnion(String cacheHll, String newHll) {
Union union = new Union(12);
try {
if (StringUtil.isNotBlank(cacheHll)) {
byte[] cacheHllBytes = Base64.getDecoder().decode(cacheHll);
HllSketch cacheSketch = HllSketch.heapify(cacheHllBytes);
union.update(cacheSketch);
}
if (StringUtil.isNotBlank(newHll)) {
byte[] newHllBytes = Base64.getDecoder().decode(newHll);
HllSketch newSketch = HllSketch.heapify(newHllBytes);
union.update(newSketch);
}
return Base64.getEncoder().encodeToString(union.getResult().toUpdatableByteArray());
} catch (RuntimeException e) {
logger.error("Merge hllSketch results abnormal! The message is:" + e.getMessage());
return null;
}
}
} }

View File

@@ -1,48 +0,0 @@
package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.config.GlobalConfig;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.kafka
* @Description:
* @date 2021/9/610:37
*/
class CertUtils {
/**
* Kafka SASL authentication port
*/
private static final String SASL_PORT = "9094";
/**
* Kafka SSL authentication port
*/
private static final String SSL_PORT = "9095";
/**
* Chooses the authentication mode based on the port in the connection string.
*
* @param servers kafka connection string
* @param properties kafka connection properties
*/
static void chooseCert(String servers, Properties properties) {
if (servers.contains(SASL_PORT)) {
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ GlobalConfig.KAFKA_SASL_JAAS_USER + " password=" + GlobalConfig.KAFKA_SASL_JAAS_PIN + ";");
} else if (servers.contains(SSL_PORT)) {
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put("ssl.keystore.location", GlobalConfig.TOOLS_LIBRARY + "keystore.jks");
properties.put("ssl.keystore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
properties.put("ssl.truststore.location", GlobalConfig.TOOLS_LIBRARY + "truststore.jks");
properties.put("ssl.truststore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
properties.put("ssl.key.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
}
}
}

View File

@@ -1,8 +1,8 @@
package com.zdjizhi.utils.kafka; package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.config.GlobalConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties; import java.util.Properties;
@@ -13,35 +13,40 @@ import java.util.Properties;
* @date 2021/6/813:54 * @date 2021/6/813:54
*/ */
public class KafkaConsumer { public class KafkaConsumer {
private static Properties createConsumerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", GlobalConfig.SOURCE_KAFKA_SERVERS);
properties.put("group.id", GlobalConfig.GROUP_ID);
properties.put("session.timeout.ms", GlobalConfig.SESSION_TIMEOUT_MS);
properties.put("max.poll.records", GlobalConfig.MAX_POLL_RECORDS);
properties.put("max.partition.fetch.bytes", GlobalConfig.MAX_PARTITION_FETCH_BYTES);
properties.put("partition.discovery.interval.ms", "10000");
CertUtils.chooseCert(GlobalConfig.SOURCE_KAFKA_SERVERS, properties);
return properties;
}
/** /**
* Deserializes Kafka records with the standard string schema
* *
* @return kafka logs * @return kafka logs
*/ */
public static FlinkKafkaConsumer<String> getKafkaConsumer() { public static FlinkKafkaConsumer<String> getKafkaConsumer(Properties properties, String topic, String startupMode) {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(GlobalConfig.SOURCE_KAFKA_TOPIC,
new SimpleStringSchema(), createConsumerConfig());
//commit offsets to Kafka as checkpoints complete setDefaultConfig(properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
kafkaConsumer.setCommitOffsetsOnCheckpoints(true); setDefaultConfig(properties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
setDefaultConfig(properties, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280);
//start consuming from the consumer group's current offsets FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
kafkaConsumer.setStartFromGroupOffsets();
switch (startupMode) {
case "group":
kafkaConsumer.setStartFromGroupOffsets();
break;
case "latest":
kafkaConsumer.setStartFromLatest();
break;
case "earliest":
kafkaConsumer.setStartFromEarliest();
break;
default:
kafkaConsumer.setStartFromGroupOffsets();
}
return kafkaConsumer; return kafkaConsumer;
} }
private static void setDefaultConfig(Properties properties, String key, Object value) {
if (!properties.containsKey(key)) {
properties.put(key, value);
}
}
} }

View File

@@ -1,9 +1,7 @@
package com.zdjizhi.utils.kafka; package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.config.GlobalConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Optional; import java.util.Optional;
import java.util.Properties; import java.util.Properties;
@@ -16,33 +14,29 @@ import java.util.Properties;
*/ */
public class KafkaProducer { public class KafkaProducer {
private static Properties createProducerConfig() { public static FlinkKafkaProducer<String> getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) {
Properties properties = new Properties(); setDefaultConfig(properties, "acks", "1");
properties.put("bootstrap.servers", GlobalConfig.SINK_KAFKA_SERVERS); setDefaultConfig(properties, "retries", 0);
properties.put("acks", GlobalConfig.PRODUCER_ACK); setDefaultConfig(properties, "linger.ms", 10);
properties.put("retries", GlobalConfig.RETRIES); setDefaultConfig(properties, "request.timeout.ms", 30000);
properties.put("linger.ms", GlobalConfig.LINGER_MS); setDefaultConfig(properties, "batch.size", 262144);
properties.put("request.timeout.ms", GlobalConfig.REQUEST_TIMEOUT_MS); setDefaultConfig(properties, "buffer.memory", 134217728);
properties.put("batch.size", GlobalConfig.BATCH_SIZE); setDefaultConfig(properties, "max.request.size", 10485760);
properties.put("buffer.memory", GlobalConfig.BUFFER_MEMORY); setDefaultConfig(properties, "compression.type", "snappy");
properties.put("max.request.size", GlobalConfig.MAX_REQUEST_SIZE);
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, GlobalConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
CertUtils.chooseCert(GlobalConfig.SINK_KAFKA_SERVERS, properties); FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
topic,
return properties;
}
public static FlinkKafkaProducer<String> getKafkaProducer() {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
GlobalConfig.SINK_KAFKA_TOPIC,
new SimpleStringSchema(), new SimpleStringSchema(),
createProducerConfig(), Optional.empty()); properties, Optional.empty());
//When enabled, the producer only logs failures instead of catching and rethrowing them kafkaProducer.setLogFailuresOnly(logFailuresOnly);
kafkaProducer.setLogFailuresOnly(true);
return kafkaProducer; return kafkaProducer;
} }
private static void setDefaultConfig(Properties properties, String key, Object value) {
if (!properties.containsKey(key)) {
properties.put(key, value);
}
}
} }
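
On the sink side, a corresponding sketch. The log.failures.only flag is not declared in FusionConfigs as merged, so the ConfigOption below is hypothetical, as is the package location of KafkaProducer being reused here.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import com.zdjizhi.conf.FusionConfiguration;
import com.zdjizhi.utils.kafka.KafkaProducer;

import static com.zdjizhi.conf.FusionConfigs.*;

final class SinkWiringSketch {
    // Hypothetical option: only the two topic options exist in FusionConfigs as merged.
    private static final ConfigOption<Boolean> LOG_FAILURES_ONLY =
            ConfigOptions.key("log.failures.only").booleanType().defaultValue(false);

    static FlinkKafkaProducer<String> buildSink(Configuration config) {
        return KafkaProducer.getKafkaProducer(
                new FusionConfiguration(config).getProperties(SINK_KAFKA_PROPERTIES_PREFIX),
                config.get(SINK_KAFKA_TOPIC),
                config.get(LOG_FAILURES_ONLY));
    }
}
```

A topology would then attach the sink with something like resultStream.addSink(SinkWiringSketch.buildSink(config)).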

View File

@@ -1,14 +1,14 @@
#Log4j #Log4j
log4j.rootLogger=error,console,file log4j.rootLogger=info,console,file
# Console appender settings # Console appender settings
log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=error log4j.appender.console.Threshold=info
log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
# File appender settings # File appender settings
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold=error log4j.appender.file.Threshold=info
log4j.appender.file.encoding=UTF-8 log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true log4j.appender.file.Append=true
# Use a relative path; verify that log output lands under the application directory # Use a relative path; verify that log output lands under the application directory
@@ -18,8 +18,8 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n #log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
# MyBatis configuration; com.nis.web.dao is the package containing the MyBatis mapper interfaces # MyBatis configuration; com.nis.web.dao is the package containing the MyBatis mapper interfaces
log4j.logger.com.nis.web.dao=error log4j.logger.com.nis.web.dao=info
# BoneCP data source configuration # BoneCP data source configuration
log4j.category.com.jolbox=error,console log4j.category.com.jolbox=info,console

View File

@@ -0,0 +1,58 @@
package com.zdjizhi;
import com.zdjizhi.conf.FusionConfiguration;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import static com.zdjizhi.conf.FusionConfigs.*;
public class ConfigTest {
public static void main(String[] args) {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
final ParameterTool tool;
try {
tool = ParameterTool.fromPropertiesFile("D:\\workerspace\\flink\\test.properties");
final Configuration config = tool.getConfiguration();
environment.getConfig().setGlobalJobParameters(config);
final FusionConfiguration fusionConfiguration = new FusionConfiguration(config);
System.out.println(config.get(SOURCE_KAFKA_TOPIC));
System.out.println(config.get(SINK_KAFKA_TOPIC));
System.out.println(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
System.out.println(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
final FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
config.get(SOURCE_KAFKA_TOPIC),
new SimpleStringSchema(),
fusionConfiguration
.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
final DataStreamSource<String> sourceStream = environment.addSource(kafkaConsumer);
sourceStream.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) {
out.collect(value);
}
}).print();
environment.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@@ -1,6 +1,5 @@
package com.zdjizhi; package com.zdjizhi;
import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.StringUtil;
import org.junit.Test; import org.junit.Test;
@@ -22,7 +21,7 @@ public class ConventionalTest {
System.out.println(protocol); System.out.println(protocol);
StringBuffer stringBuffer = new StringBuffer(); StringBuffer stringBuffer = new StringBuffer();
String appName = "qq_r2"; String appName = "qq_r2";
String[] protocolIds = protocol.split(GlobalConfig.PROTOCOL_SPLITTER); String[] protocolIds = protocol.split("\\.");
for (String proto : protocolIds) { for (String proto : protocolIds) {
if (StringUtil.isBlank(stringBuffer.toString())) { if (StringUtil.isBlank(stringBuffer.toString())) {
stringBuffer.append(proto); stringBuffer.append(proto);

View File

@@ -1,281 +0,0 @@
package com.zdjizhi;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson2.*;
import com.zdjizhi.utils.JsonMapper;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.hll.Union;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
import java.lang.instrument.Instrumentation;
import java.util.*;
/**
* @author qidaijie
* @Package com.zdjizhi
* @Description:
* @date 2023/3/217:17
*/
public class DatasketchesTest {
@Test
public void HllSketchTest() {
HashSet<String> strings = new HashSet<>();
HllSketch sketch = new HllSketch(12);
for (int i = 0; i < 50; i++) {
String ip = "192.168.1." + i;
sketch.update(ip);
strings.add(ip);
}
System.out.println(sketch.getEstimate() + "--" + strings.size());
HashSet<String> randomStrings = new HashSet<>();
HllSketch randomSketch = new HllSketch(12);
for (int i = 0; i < 50; i++) {
String ip = makeIPv4Random();
randomSketch.update(ip);
randomStrings.add(ip);
}
System.out.println(randomSketch.getEstimate() + "--" + randomStrings.size());
}
@Test
public void HllSketchUnionTest() {
HashSet<String> strings = new HashSet<>();
HllSketch sketch = new HllSketch(12);
for (int i = 0; i < 50; i++) {
String ip = "192.168.1." + i;
sketch.update(ip);
strings.add(ip);
}
HllSketch sketch2 = new HllSketch(12);
for (int i = 0; i < 10; i++) {
String ip = "192.168.2." + i;
sketch2.update(ip);
strings.add(ip);
}
Union union = new Union(12);
union.update(sketch);
union.update(sketch2);
HllSketch sketch_result = HllSketch.heapify(union.getResult().toCompactByteArray());
System.out.println(sketch.getEstimate() + "--" + strings.size());
System.out.println(sketch2.getEstimate() + "--" + strings.size());
System.out.println(sketch_result.getEstimate() + "--" + strings.size());
}
@Test
public void HllSketchDruidTest() {
HashMap<String, Object> dataMap = new HashMap<>();
HashSet<String> strings = new HashSet<>();
HllSketch sketch = new HllSketch(12);
for (int i = 0; i < 50; i++) {
String ip = "192.168.1." + i;
sketch.update(ip);
strings.add(ip);
}
HllSketch sketch2 = new HllSketch(12);
for (int i = 0; i < 10; i++) {
String ip = "192.168.2." + i;
sketch2.update(ip);
strings.add(ip);
}
Union union = new Union(12);
union.update(sketch);
union.update(sketch2);
HllSketch sketch_result1 = HllSketch.heapify(union.getResult().toCompactByteArray());
HllSketch sketch3 = new HllSketch(12);
for (int i = 0; i < 10; i++) {
String ip = "192.168.3." + i;
sketch3.update(ip);
strings.add(ip);
}
Union union2 = new Union(12);
union2.update(sketch_result1);
union2.update(sketch3);
HllSketch sketch_result2 = HllSketch.heapify(union2.getResult().toCompactByteArray());
System.out.println(sketch.getEstimate() + "--" + strings.size());
System.out.println(sketch2.getEstimate() + "--" + strings.size());
System.out.println(sketch3.getEstimate() + "--" + strings.size());
System.out.println(sketch_result1.getEstimate() + "--" + strings.size());
System.out.println(sketch_result2.getEstimate() + "--" + strings.size());
Result result = new Result();
result.setC2s_pkt_num(10);
result.setS2c_pkt_num(10);
result.setC2s_byte_num(10);
result.setS2c_byte_num(10);
result.setStat_time(1679970031);
result.setSchema_type("HLLSketchMergeTest");
//CompactByte
result.setIp_object(sketch_result2.toCompactByteArray());
// System.out.println(result.toString());
//sendMessage(JsonMapper.toJsonString(result);
//UpdatableByte
result.setIp_object(sketch_result2.toUpdatableByteArray());
// System.out.println(result.toString());
//sendMessage(JsonMapper.toJsonString(result);
//Hashmap
dataMap.put("app_name", "TEST");
dataMap.put("protocol_stack_id", "HTTP");
dataMap.put("vsys_id", 1);
dataMap.put("stat_time", 1681370100);
dataMap.put("client_ip_sketch", sketch_result2.toUpdatableByteArray());
System.out.println("Jackson:" + JsonMapper.toJsonString(dataMap));
System.out.println("FastJson2:" + JSONObject.toJSONString(dataMap));
System.out.println("Hutool:" + JSONUtil.toJsonStr(dataMap) + "\n\n");
dataMap.put("client_ip_sketch", Base64.getEncoder().encode(sketch_result2.toUpdatableByteArray()));
System.out.println("FastJson2 Byte(Base64):" + JSONObject.toJSONString(dataMap));
System.out.println("Hutool Byte(Base64):" + JSONObject.toJSONString(dataMap));
System.out.println(JSONUtil.toJsonStr(dataMap));
// sendMessage(JSONObject.toJSONString(dataMap));
}
@Test
public void HllSketchStorageTest() {
TgtHllType hllType = TgtHllType.HLL_4;
// TgtHllType hllType = TgtHllType.HLL_6;
// TgtHllType hllType = TgtHllType.HLL_8;
HllSketch sketch4 = new HllSketch(4,hllType);
HllSketch sketch8 = new HllSketch(8,hllType);
HllSketch sketch12 = new HllSketch(12,hllType);
HllSketch sketch16 = new HllSketch(16,hllType);
HllSketch sketch21 = new HllSketch(21,hllType);
HashSet<String> IPSet = new HashSet<>();
for (int i = 0; i < 500000; i++) {
String ip = makeIPv4Random();
IPSet.add(ip);
sketch4.update(ip);
sketch8.update(ip);
sketch12.update(ip);
sketch16.update(ip);
sketch21.update(ip);
}
System.out.println(IPSet.size());
System.out.println(sketch4.toString());
System.out.println(sketch8.toString());
System.out.println(sketch12.toString());
System.out.println(sketch16.toString());
System.out.println(sketch21.toString());
}
//Generate a random IPv4 address
private static String makeIPv4Random() {
int v4_1 = new Random().nextInt(255) + 1;
int v4_2 = new Random().nextInt(100);
int v4_3 = new Random().nextInt(100);
int v4_4 = new Random().nextInt(255);
return v4_1 + "." + v4_2 + "." + v4_3 + "." + v4_4;
}
private static void sendMessage(Object message) {
Properties props = new Properties();
//Kafka broker address
props.put("bootstrap.servers", "192.168.44.12:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("linger.ms", 1);
props.put("buffer.memory", 67108864);
// props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<String, Object> kafkaProducer = new KafkaProducer<String, Object>(props);
kafkaProducer.send(new ProducerRecord<String, Object>("TRAFFIC-PROTOCOL-TEST", message));
kafkaProducer.close();
}
}
class Result {
private String schema_type;
private long c2s_byte_num;
private long c2s_pkt_num;
private long s2c_byte_num;
private long s2c_pkt_num;
private long stat_time;
private byte[] ip_object;
public void setSchema_type(String schema_type) {
this.schema_type = schema_type;
}
public void setC2s_byte_num(long c2s_byte_num) {
this.c2s_byte_num = c2s_byte_num;
}
public void setC2s_pkt_num(long c2s_pkt_num) {
this.c2s_pkt_num = c2s_pkt_num;
}
public void setS2c_byte_num(long s2c_byte_num) {
this.s2c_byte_num = s2c_byte_num;
}
public void setS2c_pkt_num(long s2c_pkt_num) {
this.s2c_pkt_num = s2c_pkt_num;
}
public void setStat_time(long stat_time) {
this.stat_time = stat_time;
}
public void setIp_object(byte[] ip_object) {
this.ip_object = ip_object;
}
@Override
public String toString() {
return "Result{" +
"schema_type='" + schema_type + '\'' +
", c2s_byte_num=" + c2s_byte_num +
", c2s_pkt_num=" + c2s_pkt_num +
", s2c_byte_num=" + s2c_byte_num +
", s2c_pkt_num=" + s2c_pkt_num +
", stat_time=" + stat_time +
", ip_object=" + Arrays.toString(ip_object) +
'}';
}
}

View File

@@ -32,26 +32,25 @@ public class FlagsTest {
@Test @Test
public void bitwiseAND() { public void bitwiseAND() {
Long common_flags = 8200L; Long flags = 24712L;
Long clientIsLocal = 8L; Long clientIsLocal = 8L;
Long serverIsLocal = 16L; Long serverIsLocal = 16L;
System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal)); System.out.println("flags & clientIsLocal = " + (flags & clientIsLocal));
System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal)+"\n\n"); System.out.println("flags & serverIsLocal = " + (flags & serverIsLocal)+"\n\n");
common_flags = 16400L; flags = 16400L;
System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal)); System.out.println("flags & clientIsLocal = " + (flags & clientIsLocal));
System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal)); System.out.println("flags & serverIsLocal = " + (flags & serverIsLocal)+"\n\n");
flags = 24712L;
System.out.println("flags & c2s = " + (flags & 8192));
System.out.println("flags & s2c = " + (flags & 16384));
System.out.println("flags & Bidirectional = " + (flags & 32768));
if ((0L & clientIsLocal) == 0L){
System.out.println("yes");
}else {
System.out.println("no");
}
} }
} }
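
The constants exercised in this test imply a bit layout (8 = client is local, 16 = server is local, 8192 / 16384 / 32768 = c2s / s2c / bidirectional). A small decoding helper along those lines, offered purely as an illustration since the authoritative flag definitions live elsewhere:

```java
// Illustrative only: bit positions inferred from the values used in FlagsTest.
final class FlowFlags {
    static final long CLIENT_IS_LOCAL = 8L;
    static final long SERVER_IS_LOCAL = 16L;
    static final long C2S = 8_192L;
    static final long S2C = 16_384L;
    static final long BIDIRECTIONAL = 32_768L;

    static boolean has(long flags, long bit) {
        return (flags & bit) != 0L;
    }

    public static void main(String[] args) {
        long flags = 24712L;                              // sample value from the test
        System.out.println(has(flags, CLIENT_IS_LOCAL));  // true  (24712 & 8 = 8)
        System.out.println(has(flags, SERVER_IS_LOCAL));  // false (24712 & 16 = 0)
        System.out.println(has(flags, C2S));              // true
        System.out.println(has(flags, S2C));              // true
        System.out.println(has(flags, BIDIRECTIONAL));    // false
    }
}
```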

View File

@@ -0,0 +1,34 @@
package com.zdjizhi.conf;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
public class FusionConfigs {
/**
* The prefix for Kafka properties used in the source.
*/
public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";
/**
* The prefix for Kafka properties used in the sink.
*/
public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";
/**
* Configuration option for the Kafka topic used in the source.
*/
public static final ConfigOption<String> SOURCE_KAFKA_TOPIC =
ConfigOptions.key("source.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("The Kafka topic used in the source.");
/**
* Configuration option for the Kafka topic used in the sink.
*/
public static final ConfigOption<String> SINK_KAFKA_TOPIC =
ConfigOptions.key("sink.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("The Kafka topic used in the sink.");
}
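
Only the two topic options are defined in this class; the remaining keys such as count.window.time and watermark.max.orderness are evidently read elsewhere. If they were ever centralized here, they would presumably be declared the same way — a sketch, not code from this merge:

```java
// Sketch only: these options are not part of FusionConfigs as merged.
public static final ConfigOption<Integer> COUNT_WINDOW_TIME =
        ConfigOptions.key("count.window.time")
                .intType()
                .defaultValue(5)
                .withDescription("Aggregation window size in seconds.");

public static final ConfigOption<Integer> WATERMARK_MAX_ORDERNESS =
        ConfigOptions.key("watermark.max.orderness")
                .intType()
                .defaultValue(5)
                .withDescription("Maximum allowed out-of-orderness in seconds.");
```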

View File

@@ -0,0 +1,36 @@
package com.zdjizhi.conf;
import org.apache.flink.configuration.Configuration;
import java.util.Properties;
public class FusionConfiguration {
private final Configuration config;
public FusionConfiguration(final Configuration config) {
this.config = config;
}
/**
* Retrieves properties from the underlying `Configuration` instance that start with the specified
* `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
*
* @param prefix The prefix to filter properties.
* @return A `java.util.Properties` object containing the properties with the specified prefix.
*/
public Properties getProperties(final String prefix) {
if (prefix == null) {
final Properties props = new Properties();
props.putAll(config.toMap());
return props;
}
return config.toMap()
.entrySet()
.stream()
.filter(entry -> entry.getKey().startsWith(prefix))
.collect(Properties::new, (props, e) ->
props.setProperty(e.getKey().substring(prefix.length()), e.getValue()),
Properties::putAll);
}
}
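
To make the prefix handling concrete, a small self-contained example; the broker addresses and group id are made up.

```java
import org.apache.flink.configuration.Configuration;

import com.zdjizhi.conf.FusionConfiguration;

import java.util.Properties;

public class FusionConfigurationExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.setString("source.kafka.topic", "NETWORK-TRAFFIC-METRICS");
        config.setString("source.kafka.props.bootstrap.servers", "kafka-1:9092");
        config.setString("source.kafka.props.group.id", "app-protocol-stat");
        config.setString("sink.kafka.props.bootstrap.servers", "kafka-2:9092");

        Properties sourceProps =
                new FusionConfiguration(config).getProperties("source.kafka.props.");
        // Prints {group.id=app-protocol-stat, bootstrap.servers=kafka-1:9092} (order may vary):
        // the prefix is stripped, and keys without the prefix are filtered out.
        System.out.println(sourceProps);
    }
}
```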