Submit the initial version of the refactored Live Traffic Chart code. (TSG-14799)

This commit is contained in:
qidaijie
2023-05-06 15:08:21 +08:00
parent dbb6481635
commit ef57dda773
27 changed files with 2158 additions and 88 deletions

README.md

@@ -1,92 +1,22 @@
# app-protocol-stat-traffic-merge
The Live Traffic Chart statistics job splits traffic along the protocol stack, aggregates multiple flows, and stores the results in the protocol and application statistics tables; it uses incremental windows with a 15-second computation cycle.
## Data sources
Whichever computation produced the data, the topic is NETWORK-TRAFFIC-METRICS:
### 1. Application and Protocol Metrics computed by the app-protocol-stat-traffic-agent job from closed/transition session logs, aggregated at 1-second granularity.
### 2. Application and Protocol Metrics produced by statistics on the device side, aggregated at 1-second granularity.
## Statistics operations
### 1. Filter records whose name is traffic_application_protocol_stat.
### 2. Group and aggregate by the contents of Tags.
### 3. Split the protocol_stack_id protocol tree into multiple nodes (see the sketch after this list).
#### For example, ETHERNET.IPv4.TCP.https.kingsoft.wps_office yields one node ID per level:
##### ETHERNET
##### ETHERNET.IPv4
##### ETHERNET.IPv4.TCP
##### ETHERNET.IPv4.TCP.https
##### ETHERNET.IPv4.TCP.https.kingsoft
##### ETHERNET.IPv4.TCP.https.kingsoft.wps_office
### 4. app_name is emitted only on the terminal node.
### 5. When results are emitted, Measurement Name = application_protocol_stat.
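As a rough illustration of step 3, here is a minimal standalone sketch (the class and method names are made up for this example and are not part of the job) that expands a protocol_stack_id into its per-level node IDs:

```
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, only to illustrate the node expansion described in step 3.
public class ProtocolTreeExample {
    static List<String> expand(String protocolStackId) {
        List<String> nodes = new ArrayList<>();
        StringBuilder prefix = new StringBuilder();
        for (String level : protocolStackId.split("\\.")) {
            if (prefix.length() > 0) {
                prefix.append('.');
            }
            prefix.append(level);
            nodes.add(prefix.toString()); // ETHERNET, ETHERNET.IPv4, ...
        }
        return nodes;
    }

    public static void main(String[] args) {
        expand("ETHERNET.IPv4.TCP.https.kingsoft.wps_office").forEach(System.out::println);
    }
}
```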

pom.xml Normal file

@@ -0,0 +1,244 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zdjizhi</groupId>
<artifactId>app-protocol-stat-traffic-merge</artifactId>
<version>230506</version>
<name>app-protocol-stat-traffic-merge</name>
<url>http://www.example.com</url>
<repositories>
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
<url>http://192.168.40.153:8099/content/groups/public</url>
</repository>
<repository>
<id>maven-ali</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<!--<enabled>true</enabled>-->
</releases>
<snapshots>
<!--<enabled>true</enabled>-->
<checksumPolicy>fail</checksumPolicy>
</snapshots>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.1</flink.version>
<hadoop.version>2.7.1</hadoop.version>
<kafka.version>1.0.0</kafka.version>
<hbase.version>2.2.3</hbase.version>
<nacos.version>1.2.0</nacos.version>
<zdjz.tools.version>1.0.8</zdjz.tools.version>
<fastjson.version>2.0.26</fastjson.version>
<scope.type>provided</scope.type>
<!--<scope.type>compile</scope.type>-->
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.zdjizhi.topology.ApplicationProtocolTopology</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.github.zlika</groupId>
<artifactId>reproducible-build-maven-plugin</artifactId>
<version>0.2</version>
<executions>
<execution>
<goals>
<goal>strip-jar</goal>
</goals>
<phase>package</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<directory>properties</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src\main\java</directory>
<includes>
<include>log4j.properties</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
<dependencies>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>${zdjz.tools.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
<!--<scope>${scope.type}</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-nodep</artifactId>
<version>3.2.4</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.3.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jasypt/jasypt -->
<dependency>
<groupId>org.jasypt</groupId>
<artifactId>jasypt</artifactId>
<version>1.9.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.nacos/nacos-client -->
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>${nacos.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-java</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
</dependencies>
</project>

properties/default_config.properties Normal file

@@ -0,0 +1,51 @@
#====================Kafka KafkaConsumer====================#
#Kafka source connection timeout
session.timeout.ms=60000
#Kafka source max records per poll
max.poll.records=5000
#Kafka source max bytes fetched per partition
max.partition.fetch.bytes=31457280
#====================Kafka KafkaProducer====================#
#Number of producer retries
retries=0
#Maximum time a batch may wait before being sent, whether or not it is full
linger.ms=10
#If no response is received before the timeout, the client resends the request if necessary
request.timeout.ms=30000
#Producers send records in batches; batch size in bytes (Kafka default: 16384)
batch.size=262144
#Size of the buffer the producer uses to cache messages
#128M
buffer.memory=134217728
#Maximum size of a single request sent to the Kafka brokers (default: 1048576)
#10M
max.request.size=10485760
#Producer compression mode: none or snappy
producer.kafka.compression.type=none
#Producer acks
producer.ack=1
#====================Kafka default====================#
#Kafka SASL username (encrypted)
kafka.user=nsyGpHKGFA4KW0zro9MDdw==
#Kafka SASL/SSL password (encrypted)
kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
#====================Topology Default====================#
#Maximum time between two outputs (milliseconds)
buffer.timeout=100
#Range of the first-stage random grouping
random.range.num=20

properties/service_flow_config.properties Normal file

@@ -0,0 +1,38 @@
#--------------------------------Address configuration------------------------------#
#Source Kafka brokers
source.kafka.servers=192.168.44.12:9094
#Sink Kafka brokers
sink.kafka.servers=192.168.44.12:9094
#--------------------------------HTTP------------------------------#
#Directory holding the Kafka certificates
tools.library=D:\\workerspace\\dat
#--------------------------------Kafka consumer group------------------------------#
#Kafka topic to consume
source.kafka.topic=test
#Kafka topic for the enriched output
sink.kafka.topic=test-result
#Consumer group id; offsets are committed per group (the topology name can be used here), which determines where the next run resumes so data is not re-read
group.id=livecharts-test-20230423-1
#--------------------------------Topology configuration------------------------------#
#Consumer parallelism
source.parallelism=3
#Map function parallelism
parse.parallelism=3
#Parallelism of the first window computation
window.parallelism=3
#Producer parallelism
sink.parallelism=3
#Window size (seconds) of the initial random pre-aggregation window
count.window.time=15

src/main/java/com/zdjizhi/common/config/GlobalConfig.java Normal file

@@ -0,0 +1,71 @@
package com.zdjizhi.common.config;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
* @author Administrator
*/
public class GlobalConfig {
private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
static {
encryptor.setPassword("galaxy");
}
/**
     * Protocol separator; escaped because it is used as a regex
*/
public static final String PROTOCOL_SPLITTER = "\\.";
/**
* System
*/
public static final Integer SOURCE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "source.parallelism");
public static final Integer PARSE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "parse.parallelism");
public static final Integer WINDOW_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "window.parallelism");
public static final Integer COUNT_WINDOW_TIME = GlobalConfigLoad.getIntProperty(0, "count.window.time");
public static final String TOOLS_LIBRARY = GlobalConfigLoad.getStringProperty(0, "tools.library");
public static final Integer SINK_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "sink.parallelism");
/**
* Kafka common
*/
public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.user"));
public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.pin"));
/**
* kafka sink config
*/
public static final String SINK_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "sink.kafka.servers");
public static final String SINK_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "sink.kafka.topic");
public static final String PRODUCER_ACK = GlobalConfigLoad.getStringProperty(1, "producer.ack");
public static final String RETRIES = GlobalConfigLoad.getStringProperty(1, "retries");
public static final String LINGER_MS = GlobalConfigLoad.getStringProperty(1, "linger.ms");
public static final Integer REQUEST_TIMEOUT_MS = GlobalConfigLoad.getIntProperty(1, "request.timeout.ms");
public static final Integer BATCH_SIZE = GlobalConfigLoad.getIntProperty(1, "batch.size");
public static final Integer BUFFER_MEMORY = GlobalConfigLoad.getIntProperty(1, "buffer.memory");
public static final Integer MAX_REQUEST_SIZE = GlobalConfigLoad.getIntProperty(1, "max.request.size");
/**
* kafka source config
*/
public static final String SOURCE_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "source.kafka.servers");
public static final String SOURCE_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "source.kafka.topic");
public static final String GROUP_ID = GlobalConfigLoad.getStringProperty(0, "group.id");
public static final String SESSION_TIMEOUT_MS = GlobalConfigLoad.getStringProperty(1, "session.timeout.ms");
public static final String MAX_POLL_RECORDS = GlobalConfigLoad.getStringProperty(1, "max.poll.records");
public static final String MAX_PARTITION_FETCH_BYTES = GlobalConfigLoad.getStringProperty(1, "max.partition.fetch.bytes");
/**
     * Kafka throttling (compression) config - 20201117
*/
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = GlobalConfigLoad.getStringProperty(1, "producer.kafka.compression.type");
}
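The kafka.user and kafka.pin values are stored encrypted and decrypted here with jasypt. A minimal sketch, assuming the same PBE password ("galaxy") used above, of how such a value could be produced and verified (the plaintext "my-user" is a placeholder, not a real credential):

```
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;

public class EncryptConfigValueSketch {
    public static void main(String[] args) {
        StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
        encryptor.setPassword("galaxy");                  // same password GlobalConfig uses
        String cipherText = encryptor.encrypt("my-user"); // placeholder plaintext
        // The cipher text is what would be stored in default_config.properties;
        // decrypt() reverses it at startup, exactly as GlobalConfig does above.
        System.out.println(cipherText + " -> " + encryptor.decrypt(cipherText));
    }
}
```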

src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java Normal file

@@ -0,0 +1,70 @@
package com.zdjizhi.common.config;
import com.zdjizhi.utils.StringUtil;
import java.io.IOException;
import java.util.Locale;
import java.util.Properties;
/**
* @author Administrator
*/
public final class GlobalConfigLoad {
private static Properties propKafka = new Properties();
private static Properties propService = new Properties();
public static String getStringProperty(Integer type, String key) {
if (type == 0) {
return propService.getProperty(key);
} else if (type == 1) {
return propKafka.getProperty(key);
} else {
return null;
}
}
public static Integer getIntProperty(Integer type, String key) {
if (type == 0) {
return Integer.parseInt(propService.getProperty(key));
} else if (type == 1) {
return Integer.parseInt(propKafka.getProperty(key));
} else {
return null;
}
}
public static Long getLongProperty(Integer type, String key) {
if (type == 0) {
return Long.parseLong(propService.getProperty(key));
} else if (type == 1) {
return Long.parseLong(propKafka.getProperty(key));
} else {
return null;
}
}
    public static Boolean getBooleanProperty(Integer type, String key) {
        if (type == 0) {
            return StringUtil.equals(propService.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
        } else if (type == 1) {
            return StringUtil.equals(propKafka.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
        } else {
            return null;
        }
    }
static {
try {
propService.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
propKafka.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("default_config.properties"));
} catch (IOException | RuntimeException e) {
propKafka = null;
propService = null;
}
}
}

src/main/java/com/zdjizhi/common/pojo/AppProtocol.java Normal file

@@ -0,0 +1,257 @@
package com.zdjizhi.common.pojo;
import com.alibaba.fastjson.annotation.JSONField;
/**
* @author qidaijie
* @Package com.zdjizhi.common.pojo
* @Description:
* @date 2023/4/2116:06
*/
public class AppProtocol {
private Long timestamp;
private int vsys_id;
private String device_id;
private String device_group;
private String data_center;
private String protocol_stack_id;
private String app_name;
private Long sessions;
private Long in_bytes;
private Long out_bytes;
private Long in_pkts;
private Long out_pkts;
private Long c2s_pkts;
private Long s2c_pkts;
private Long c2s_bytes;
private Long s2c_bytes;
private Long c2s_fragments;
private Long s2c_fragments;
private Long c2s_tcp_lost_bytes;
private Long s2c_tcp_lost_bytes;
private Long c2s_tcp_ooorder_pkts;
private Long s2c_tcp_ooorder_pkts;
private Long c2s_tcp_retransmitted_pkts;
private Long s2c_tcp_retransmitted_pkts;
private Long c2s_tcp_retransmitted_bytes;
private Long s2c_tcp_retransmitted_bytes;
private String client_ip_sketch;
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public int getVsys_id() {
return vsys_id;
}
public void setVsys_id(int vsys_id) {
this.vsys_id = vsys_id;
}
public String getDevice_id() {
return device_id;
}
public void setDevice_id(String device_id) {
this.device_id = device_id;
}
public String getDevice_group() {
return device_group;
}
public void setDevice_group(String device_group) {
this.device_group = device_group;
}
public String getData_center() {
return data_center;
}
public void setData_center(String data_center) {
this.data_center = data_center;
}
public String getProtocol_stack_id() {
return protocol_stack_id;
}
@JSONField(name = "protocol_label")
public void setProtocol_stack_id(String protocol_stack_id) {
this.protocol_stack_id = protocol_stack_id;
}
public String getApp_name() {
return app_name;
}
@JSONField(name = "app_full_path")
public void setApp_name(String app_name) {
this.app_name = app_name;
}
public Long getSessions() {
return sessions;
}
public void setSessions(Long sessions) {
this.sessions = sessions;
}
public Long getIn_bytes() {
return in_bytes;
}
public void setIn_bytes(Long in_bytes) {
this.in_bytes = in_bytes;
}
public Long getOut_bytes() {
return out_bytes;
}
public void setOut_bytes(Long out_bytes) {
this.out_bytes = out_bytes;
}
public Long getIn_pkts() {
return in_pkts;
}
public void setIn_pkts(Long in_pkts) {
this.in_pkts = in_pkts;
}
public Long getOut_pkts() {
return out_pkts;
}
public void setOut_pkts(Long out_pkts) {
this.out_pkts = out_pkts;
}
public Long getC2s_pkts() {
return c2s_pkts;
}
public void setC2s_pkts(Long c2s_pkts) {
this.c2s_pkts = c2s_pkts;
}
public Long getS2c_pkts() {
return s2c_pkts;
}
public void setS2c_pkts(Long s2c_pkts) {
this.s2c_pkts = s2c_pkts;
}
public Long getC2s_bytes() {
return c2s_bytes;
}
public void setC2s_bytes(Long c2s_bytes) {
this.c2s_bytes = c2s_bytes;
}
public Long getS2c_bytes() {
return s2c_bytes;
}
public void setS2c_bytes(Long s2c_bytes) {
this.s2c_bytes = s2c_bytes;
}
public Long getC2s_fragments() {
return c2s_fragments;
}
public void setC2s_fragments(Long c2s_fragments) {
this.c2s_fragments = c2s_fragments;
}
public Long getS2c_fragments() {
return s2c_fragments;
}
public void setS2c_fragments(Long s2c_fragments) {
this.s2c_fragments = s2c_fragments;
}
public Long getC2s_tcp_lost_bytes() {
return c2s_tcp_lost_bytes;
}
public void setC2s_tcp_lost_bytes(Long c2s_tcp_lost_bytes) {
this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes;
}
public Long getS2c_tcp_lost_bytes() {
return s2c_tcp_lost_bytes;
}
public void setS2c_tcp_lost_bytes(Long s2c_tcp_lost_bytes) {
this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes;
}
public Long getC2s_tcp_ooorder_pkts() {
return c2s_tcp_ooorder_pkts;
}
public void setC2s_tcp_ooorder_pkts(Long c2s_tcp_ooorder_pkts) {
this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts;
}
public Long getS2c_tcp_ooorder_pkts() {
return s2c_tcp_ooorder_pkts;
}
public void setS2c_tcp_ooorder_pkts(Long s2c_tcp_ooorder_pkts) {
this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts;
}
public Long getC2s_tcp_retransmitted_pkts() {
return c2s_tcp_retransmitted_pkts;
}
public void setC2s_tcp_retransmitted_pkts(Long c2s_tcp_retransmitted_pkts) {
this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts;
}
public Long getS2c_tcp_retransmitted_pkts() {
return s2c_tcp_retransmitted_pkts;
}
public void setS2c_tcp_retransmitted_pkts(Long s2c_tcp_retransmitted_pkts) {
this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts;
}
public Long getC2s_tcp_retransmitted_bytes() {
return c2s_tcp_retransmitted_bytes;
}
public void setC2s_tcp_retransmitted_bytes(Long c2s_tcp_retransmitted_bytes) {
this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes;
}
public Long getS2c_tcp_retransmitted_bytes() {
return s2c_tcp_retransmitted_bytes;
}
public void setS2c_tcp_retransmitted_bytes(Long s2c_tcp_retransmitted_bytes) {
this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
}
public String getClient_ip_sketch() {
return client_ip_sketch;
}
public void setClient_ip_sketch(String client_ip_sketch) {
this.client_ip_sketch = client_ip_sketch;
}
}

src/main/java/com/zdjizhi/common/pojo/Fields.java Normal file

@@ -0,0 +1,213 @@
package com.zdjizhi.common.pojo;
/**
* @author qidaijie
* @Package com.zdjizhi.common.pojo
* @Description:
* @date 2023/4/2311:47
*/
public class Fields {
private Long sessions;
private Long in_bytes;
private Long out_bytes;
private Long in_pkts;
private Long out_pkts;
private Long c2s_pkts;
private Long s2c_pkts;
private Long c2s_bytes;
private Long s2c_bytes;
private Long c2s_fragments;
private Long s2c_fragments;
private Long c2s_tcp_lost_bytes;
private Long s2c_tcp_lost_bytes;
private Long c2s_tcp_ooorder_pkts;
private Long s2c_tcp_ooorder_pkts;
private Long c2s_tcp_retransmitted_pkts;
private Long s2c_tcp_retransmitted_pkts;
private Long c2s_tcp_retransmitted_bytes;
private Long s2c_tcp_retransmitted_bytes;
private byte[] client_ip_sketch;
public Fields(Long sessions, Long in_bytes, Long out_bytes, Long in_pkts, Long out_pkts, Long c2s_pkts, Long s2c_pkts, Long c2s_bytes, Long s2c_bytes, Long c2s_fragments, Long s2c_fragments, Long c2s_tcp_lost_bytes, Long s2c_tcp_lost_bytes, Long c2s_tcp_ooorder_pkts, Long s2c_tcp_ooorder_pkts, Long c2s_tcp_retransmitted_pkts, Long s2c_tcp_retransmitted_pkts, Long c2s_tcp_retransmitted_bytes, Long s2c_tcp_retransmitted_bytes, byte[] client_ip_sketch) {
this.sessions = sessions;
this.in_bytes = in_bytes;
this.out_bytes = out_bytes;
this.in_pkts = in_pkts;
this.out_pkts = out_pkts;
this.c2s_pkts = c2s_pkts;
this.s2c_pkts = s2c_pkts;
this.c2s_bytes = c2s_bytes;
this.s2c_bytes = s2c_bytes;
this.c2s_fragments = c2s_fragments;
this.s2c_fragments = s2c_fragments;
this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes;
this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes;
this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts;
this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts;
this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts;
this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts;
this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes;
this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
this.client_ip_sketch = client_ip_sketch;
}
public Long getSessions() {
return sessions;
}
public void setSessions(Long sessions) {
this.sessions = sessions;
}
public Long getIn_bytes() {
return in_bytes;
}
public void setIn_bytes(Long in_bytes) {
this.in_bytes = in_bytes;
}
public Long getOut_bytes() {
return out_bytes;
}
public void setOut_bytes(Long out_bytes) {
this.out_bytes = out_bytes;
}
public Long getIn_pkts() {
return in_pkts;
}
public void setIn_pkts(Long in_pkts) {
this.in_pkts = in_pkts;
}
public Long getOut_pkts() {
return out_pkts;
}
public void setOut_pkts(Long out_pkts) {
this.out_pkts = out_pkts;
}
public Long getC2s_pkts() {
return c2s_pkts;
}
public void setC2s_pkts(Long c2s_pkts) {
this.c2s_pkts = c2s_pkts;
}
public Long getS2c_pkts() {
return s2c_pkts;
}
public void setS2c_pkts(Long s2c_pkts) {
this.s2c_pkts = s2c_pkts;
}
public Long getC2s_bytes() {
return c2s_bytes;
}
public void setC2s_bytes(Long c2s_bytes) {
this.c2s_bytes = c2s_bytes;
}
public Long getS2c_bytes() {
return s2c_bytes;
}
public void setS2c_bytes(Long s2c_bytes) {
this.s2c_bytes = s2c_bytes;
}
public Long getC2s_fragments() {
return c2s_fragments;
}
public void setC2s_fragments(Long c2s_fragments) {
this.c2s_fragments = c2s_fragments;
}
public Long getS2c_fragments() {
return s2c_fragments;
}
public void setS2c_fragments(Long s2c_fragments) {
this.s2c_fragments = s2c_fragments;
}
public Long getC2s_tcp_lost_bytes() {
return c2s_tcp_lost_bytes;
}
public void setC2s_tcp_lost_bytes(Long c2s_tcp_lost_bytes) {
this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes;
}
public Long getS2c_tcp_lost_bytes() {
return s2c_tcp_lost_bytes;
}
public void setS2c_tcp_lost_bytes(Long s2c_tcp_lost_bytes) {
this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes;
}
public Long getC2s_tcp_ooorder_pkts() {
return c2s_tcp_ooorder_pkts;
}
public void setC2s_tcp_ooorder_pkts(Long c2s_tcp_ooorder_pkts) {
this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts;
}
public Long getS2c_tcp_ooorder_pkts() {
return s2c_tcp_ooorder_pkts;
}
public void setS2c_tcp_ooorder_pkts(Long s2c_tcp_ooorder_pkts) {
this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts;
}
public Long getC2s_tcp_retransmitted_pkts() {
return c2s_tcp_retransmitted_pkts;
}
public void setC2s_tcp_retransmitted_pkts(Long c2s_tcp_retransmitted_pkts) {
this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts;
}
public Long getS2c_tcp_retransmitted_pkts() {
return s2c_tcp_retransmitted_pkts;
}
public void setS2c_tcp_retransmitted_pkts(Long s2c_tcp_retransmitted_pkts) {
this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts;
}
public Long getC2s_tcp_retransmitted_bytes() {
return c2s_tcp_retransmitted_bytes;
}
public void setC2s_tcp_retransmitted_bytes(Long c2s_tcp_retransmitted_bytes) {
this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes;
}
public Long getS2c_tcp_retransmitted_bytes() {
return s2c_tcp_retransmitted_bytes;
}
public void setS2c_tcp_retransmitted_bytes(Long s2c_tcp_retransmitted_bytes) {
this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
}
public byte[] getClient_ip_sketch() {
return client_ip_sketch;
}
public void setClient_ip_sketch(byte[] client_ip_sketch) {
this.client_ip_sketch = client_ip_sketch;
}
}

src/main/java/com/zdjizhi/common/pojo/Tags.java Normal file

@@ -0,0 +1,73 @@
package com.zdjizhi.common.pojo;
/**
* @author qidaijie
* @Package com.zdjizhi.common.pojo
* @Description:
* @date 2023/4/2311:48
*/
public class Tags {
private int vsys_id;
private String device_id;
private String device_group;
private String data_center;
private String protocol_label;
private String app_full_path;
public Tags(int vsys_id, String device_id, String device_group, String data_center, String protocol_label, String app_full_path) {
this.vsys_id = vsys_id;
this.device_id = device_id;
this.device_group = device_group;
this.data_center = data_center;
this.protocol_label = protocol_label;
this.app_full_path = app_full_path;
}
public int getVsys_id() {
return vsys_id;
}
public void setVsys_id(int vsys_id) {
this.vsys_id = vsys_id;
}
public String getDevice_id() {
return device_id;
}
public void setDevice_id(String device_id) {
this.device_id = device_id;
}
public String getDevice_group() {
return device_group;
}
public void setDevice_group(String device_group) {
this.device_group = device_group;
}
public String getData_center() {
return data_center;
}
public void setData_center(String data_center) {
this.data_center = data_center;
}
public String getProtocol_label() {
return protocol_label;
}
public void setProtocol_label(String protocol_label) {
this.protocol_label = protocol_label;
}
public String getApp_full_path() {
return app_full_path;
}
public void setApp_full_path(String app_full_path) {
this.app_full_path = app_full_path;
}
}

src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java Normal file

@@ -0,0 +1,67 @@
package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.common.pojo.AppProtocol;
import com.zdjizhi.utils.functions.filter.DataTypeFilter;
import com.zdjizhi.utils.functions.keyby.DimensionKeyBy;
import com.zdjizhi.utils.functions.map.MetricsParseMap;
import com.zdjizhi.utils.functions.map.ResultFlatMap;
import com.zdjizhi.utils.functions.statistics.DispersionCountWindow;
import com.zdjizhi.utils.functions.statistics.MergeCountWindow;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* @author qidaijie
* @Package com.zdjizhi.topology
* @Description:
* @date 2021/5/2016:42
*/
public class ApplicationProtocolTopology {
private static final Log logger = LogFactory.get();
public static void main(String[] args) {
try {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            //Parse the raw logs
DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer())
.setParallelism(GlobalConfig.SOURCE_PARALLELISM).name(GlobalConfig.SOURCE_KAFKA_TOPIC);
SingleOutputStreamOperator<String> appProtocolFilter = streamSource.filter(new DataTypeFilter())
.name("appProtocolFilter").setParallelism(GlobalConfig.SOURCE_PARALLELISM);
SingleOutputStreamOperator<Tuple2<String, AppProtocol>> parseDataMap = appProtocolFilter.map(new MetricsParseMap())
.name("ParseDataMap").setParallelism(GlobalConfig.PARSE_PARALLELISM);
SingleOutputStreamOperator<AppProtocol> dispersionCountWindow = parseDataMap.keyBy(new DimensionKeyBy())
.window(TumblingProcessingTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME)))
.reduce(new DispersionCountWindow(), new MergeCountWindow())
.name("DispersionCountWindow")
.setParallelism(GlobalConfig.WINDOW_PARALLELISM);
SingleOutputStreamOperator<String> resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap())
.name("ResultFlatMap").setParallelism(GlobalConfig.SINK_PARALLELISM);
resultFlatMap.addSink(KafkaProducer.getKafkaProducer())
.setParallelism(GlobalConfig.SINK_PARALLELISM).name(GlobalConfig.SINK_KAFKA_TOPIC);
environment.execute(args[0]);
} catch (Exception e) {
logger.error("This Flink task start ERROR! Exception information is :" + e);
}
}
}
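Note that the Kafka consumer (see KafkaConsumer below) commits offsets on checkpoints, which only takes effect when checkpointing is enabled on the execution environment; the topology above does not enable it. A minimal sketch of what enabling it could look like (the interval is an arbitrary example value, not taken from this commit):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hypothetical addition: without enabled checkpointing,
        // FlinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true) has no effect.
        environment.enableCheckpointing(30_000); // checkpoint every 30 seconds (example value)
    }
}
```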

src/main/java/com/zdjizhi/utils/exception/AnalysisException.java Normal file

@@ -0,0 +1,18 @@
package com.zdjizhi.utils.exception;
/**
* @author qidaijie
* @Package com.zdjizhi.storm.utils.execption
* @Description:
* @date 2021/3/259:42
*/
public class AnalysisException extends RuntimeException {
public AnalysisException() {
}
public AnalysisException(String message) {
super(message);
}
}

src/main/java/com/zdjizhi/utils/functions/filter/DataTypeFilter.java Normal file

@@ -0,0 +1,36 @@
package com.zdjizhi.utils.functions.filter;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONPath;
import com.alibaba.fastjson2.JSONReader;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.FilterFunction;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions.filter
* @Description:
* @date 2023/4/1919:02
*/
public class DataTypeFilter implements FilterFunction<String> {
private static final Log logger = LogFactory.get();
private static final String dataTypeExpr = "[?(@.name = 'traffic_application_protocol_stat')]";
@Override
public boolean filter(String message) throws Exception {
boolean protocolData = false;
try {
if (StringUtil.isNotBlank(message)) {
Object name = JSONPath.eval(message, dataTypeExpr);
if (name != null) {
protocolData = true;
}
}
} catch (RuntimeException e) {
logger.error("Parsing metric data is abnormal! The exception message is:" + e.getMessage());
}
return protocolData;
}
}

src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java Normal file

@@ -0,0 +1,26 @@
package com.zdjizhi.utils.functions.keyby;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.pojo.AppProtocol;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Tags;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/7/2112:13
*/
public class DimensionKeyBy implements KeySelector<Tuple2<String, AppProtocol>, String> {
@Override
public String getKey(Tuple2<String, AppProtocol> value) throws Exception {
        //Group by the key assembled from the tags map
return value.f0;
}
}

src/main/java/com/zdjizhi/utils/functions/map/MetricsParseMap.java Normal file

@@ -0,0 +1,49 @@
package com.zdjizhi.utils.functions.map;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.pojo.AppProtocol;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/5/2715:01
*/
public class MetricsParseMap implements MapFunction<String, Tuple2<String, AppProtocol>> {
private static final Log logger = LogFactory.get();
@Override
@SuppressWarnings("unchecked")
public Tuple2<String, AppProtocol> map(String message) {
try {
JSONObject originalLog = JSON.parseObject(message);
JSONObject fieldsObject = JSONObject.parseObject(originalLog.getString("fields"));
JSONObject tagsObject = JSONObject.parseObject(originalLog.getString("tags"));
fieldsObject.putAll(tagsObject);
AppProtocol appProtocol = JSON.to(AppProtocol.class, fieldsObject);
String appFullPath = appProtocol.getApp_name();
if (StringUtil.isNotBlank(appFullPath)) {
String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1);
String protocolLabel = appProtocol.getProtocol_stack_id();
appProtocol.setApp_name(appName);
appProtocol.setProtocol_stack_id(protocolLabel.concat(".").concat(appFullPath));
}
return new Tuple2<>(tagsObject.toJSONString(), appProtocol);
} catch (RuntimeException e) {
logger.error("An error occurred in the original log parsing reorganization,error message is:" + e);
return new Tuple2<>(null, null);
}
}
}
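A small standalone sketch of the rewrite performed above, using hypothetical input tag values:

```
public class ParseRewriteExample {
    public static void main(String[] args) {
        // Hypothetical incoming tag values (app_full_path / protocol_label).
        String appFullPath = "kingsoft.wps_office";
        String protocolLabel = "ETHERNET.IPv4.TCP.https";

        // Same rewrite as MetricsParseMap: the terminal segment becomes app_name,
        // and the full app path is appended to the protocol stack.
        String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1);
        String protocolStackId = protocolLabel.concat(".").concat(appFullPath);

        System.out.println(appName);         // wps_office
        System.out.println(protocolStackId); // ETHERNET.IPv4.TCP.https.kingsoft.wps_office
    }
}
```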

src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java Normal file

@@ -0,0 +1,55 @@
package com.zdjizhi.utils.functions.map;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONWriter;
import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.common.pojo.AppProtocol;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.general.FormatConverterUtil;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/7/2114:52
*/
public class ResultFlatMap implements FlatMapFunction<AppProtocol, String> {
private static final Log logger = LogFactory.get();
@Override
@SuppressWarnings("unchecked")
public void flatMap(AppProtocol appProtocol, Collector<String> out) throws Exception {
try {
JSONObject tags = FormatConverterUtil.getTags(appProtocol);
JSONObject conversion = FormatConverterUtil.structureConversion(appProtocol);
String protocolStackId = tags.getString("protocol_stack_id");
out.collect(FormatConverterUtil.updateTagsData(conversion, tags));
tags.remove("app_name");
StringBuilder stringBuilder = new StringBuilder();
String[] protocolIds = protocolStackId.split(GlobalConfig.PROTOCOL_SPLITTER);
int protocolIdsNum = protocolIds.length;
for (int i = 0; i < protocolIdsNum - 1; i++) {
if (StringUtil.isBlank(stringBuilder.toString())) {
stringBuilder.append(protocolIds[i]);
tags.put("protocol_stack_id", stringBuilder.toString());
out.collect(FormatConverterUtil.updateTagsData(conversion, tags));
} else {
stringBuilder.append(".").append(protocolIds[i]);
tags.put("protocol_stack_id", stringBuilder.toString());
out.collect(FormatConverterUtil.updateTagsData(conversion, tags));
}
}
} catch (RuntimeException e) {
logger.error("An exception occurred during parsing the result data,error message is:" + e);
e.printStackTrace();
}
}
}

src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java Normal file

@@ -0,0 +1,33 @@
package com.zdjizhi.utils.functions.statistics;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.pojo.AppProtocol;
import com.zdjizhi.utils.general.MetricUtil;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions.statistics
* @Description:
* @date 2023/4/2314:02
*/
public class DispersionCountWindow implements ReduceFunction<Tuple2<String, AppProtocol>> {
private static final Log logger = LogFactory.get();
@Override
public Tuple2<String, AppProtocol> reduce(Tuple2<String, AppProtocol> value1, Tuple2<String, AppProtocol> value2) throws Exception {
try {
AppProtocol cacheData = value1.f1;
AppProtocol newData = value2.f1;
MetricUtil.statisticsMetrics(cacheData, newData);
return new Tuple2<>(value1.f0, cacheData);
} catch (RuntimeException e) {
logger.error("An exception occurred during incremental aggregation! The message is:" + e.getMessage());
return value1;
}
}
}

src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java Normal file

@@ -0,0 +1,34 @@
package com.zdjizhi.utils.functions.statistics;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.pojo.AppProtocol;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions.statistics
* @Description:
* @date 2023/4/2314:43
*/
public class MergeCountWindow extends ProcessWindowFunction<Tuple2<String, AppProtocol>, AppProtocol, String, TimeWindow> {
private static final Log logger = LogFactory.get();
@Override
public void process(String windowKey, Context context, Iterable<Tuple2<String, AppProtocol>> input, Collector<AppProtocol> output) throws Exception {
try {
Long endTime = context.window().getEnd() / 1000;
for (Tuple2<String, AppProtocol> tuple : input) {
AppProtocol data = tuple.f1;
data.setTimestamp(endTime);
output.collect(data);
}
} catch (RuntimeException e) {
logger.error("An exception occurred in the process of full data aggregation! The message is:" + e.getMessage());
}
}
}

src/main/java/com/zdjizhi/utils/general/FormatConverterUtil.java Normal file

@@ -0,0 +1,91 @@
package com.zdjizhi.utils.general;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONWriter;
import com.zdjizhi.common.pojo.AppProtocol;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.general
* @Description:
* @date 2023/5/519:04
*/
public class FormatConverterUtil {
/**
     * Build the tags object for a result record.
     *
     * @param appProtocol aggregated result
     * @return tags object
*/
public static JSONObject getTags(AppProtocol appProtocol) {
JSONObject tags = new JSONObject();
tags.fluentPut("vsys_id", appProtocol.getVsys_id())
.fluentPut("device_id", appProtocol.getDevice_id())
.fluentPut("device_group", appProtocol.getDevice_group())
.fluentPut("data_center", appProtocol.getData_center())
.fluentPut("protocol_stack_id", appProtocol.getProtocol_stack_id())
.fluentPut("app_name", appProtocol.getApp_name());
return tags;
}
/**
     * Convert the aggregated data into the final output structure.
     *
     * @param appProtocol aggregated result
     * @return result record
*/
public static JSONObject structureConversion(AppProtocol appProtocol) {
JSONObject metrics = new JSONObject();
JSONObject fields = new JSONObject();
fields.fluentPut("sessions", appProtocol.getSessions())
.fluentPut("in_bytes", appProtocol.getIn_bytes())
.fluentPut("out_bytes", appProtocol.getOut_bytes())
.fluentPut("in_pkts", appProtocol.getIn_pkts())
.fluentPut("out_pkts", appProtocol.getOut_pkts())
.fluentPut("c2s_bytes", appProtocol.getC2s_bytes())
.fluentPut("s2c_bytes", appProtocol.getS2c_bytes())
.fluentPut("c2s_pkts", appProtocol.getC2s_pkts())
.fluentPut("s2c_pkts", appProtocol.getS2c_pkts())
.fluentPut("c2s_fragments", appProtocol.getC2s_fragments())
.fluentPut("s2c_fragments", appProtocol.getS2c_fragments())
.fluentPut("c2s_tcp_lost_bytes", appProtocol.getC2s_tcp_lost_bytes())
.fluentPut("s2c_tcp_lost_bytes", appProtocol.getS2c_tcp_lost_bytes())
.fluentPut("c2s_tcp_ooorder_pkts", appProtocol.getC2s_tcp_ooorder_pkts())
.fluentPut("s2c_tcp_ooorder_pkts", appProtocol.getS2c_tcp_ooorder_pkts())
.fluentPut("c2s_tcp_retransmitted_pkts", appProtocol.getC2s_tcp_retransmitted_bytes())
.fluentPut("s2c_tcp_retransmitted_pkts", appProtocol.getS2c_tcp_retransmitted_bytes())
.fluentPut("c2s_tcp_retransmitted_bytes", appProtocol.getC2s_tcp_retransmitted_pkts())
.fluentPut("s2c_tcp_retransmitted_bytes", appProtocol.getS2c_tcp_retransmitted_pkts())
.fluentPut("client_ip_sketch", appProtocol.getClient_ip_sketch());
metrics.put("timestamp", appProtocol.getTimestamp());
metrics.put("name", "application_protocol_stat");
metrics.fluentPut("timestamp", appProtocol.getTimestamp())
.fluentPut("name", "application_protocol_stat")
.fluentPut("fields", fields);
return metrics;
}
/**
     * Update the tags of the result record (one protocol level at a time) and serialize it to JSON.
     *
     * @param conversion result record
     * @param tags       tags object
     * @return result JSON string
*/
public static String updateTagsData(JSONObject conversion, JSONObject tags) {
conversion.put("tags", tags);
return JSONObject.toJSONString(conversion
, JSONWriter.Feature.WriteNullStringAsEmpty
, JSONWriter.Feature.WriteNullNumberAsZero);
}
}

src/main/java/com/zdjizhi/utils/general/MetricUtil.java Normal file

@@ -0,0 +1,111 @@
package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.pojo.AppProtocol;
import com.zdjizhi.utils.StringUtil;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;
import java.util.Base64;
/**
* @author qidaijie
* @Package com.zdjizhi.storm.utils.general
* @Description:
* @date 2021/7/2015:31
*/
public class MetricUtil {
private static final Log logger = LogFactory.get();
/**
     * Accumulate the business metrics of a newly arrived record into the cached record.
     *
     * @param cacheData cached (accumulated) record
     * @param newData   newly arrived record
*/
public static void statisticsMetrics(AppProtocol cacheData, AppProtocol newData) {
cacheData.setSessions(MetricUtil.longSum(cacheData.getSessions(), newData.getSessions()));
cacheData.setIn_bytes(MetricUtil.longSum(cacheData.getIn_bytes(), newData.getIn_bytes()));
        cacheData.setOut_bytes(MetricUtil.longSum(cacheData.getOut_bytes(), newData.getOut_bytes()));
cacheData.setIn_pkts(MetricUtil.longSum(cacheData.getIn_pkts(), newData.getIn_pkts()));
cacheData.setOut_pkts(MetricUtil.longSum(cacheData.getOut_pkts(), newData.getOut_pkts()));
cacheData.setC2s_bytes(MetricUtil.longSum(cacheData.getC2s_bytes(), newData.getC2s_bytes()));
cacheData.setS2c_bytes(MetricUtil.longSum(cacheData.getS2c_bytes(), newData.getS2c_bytes()));
cacheData.setC2s_pkts(MetricUtil.longSum(cacheData.getC2s_pkts(), newData.getC2s_pkts()));
cacheData.setS2c_pkts(MetricUtil.longSum(cacheData.getS2c_pkts(), newData.getS2c_pkts()));
cacheData.setC2s_fragments(MetricUtil.longSum(cacheData.getC2s_fragments(), newData.getC2s_fragments()));
cacheData.setS2c_fragments(MetricUtil.longSum(cacheData.getS2c_fragments(), newData.getS2c_fragments()));
cacheData.setC2s_tcp_lost_bytes(MetricUtil.longSum(cacheData.getC2s_tcp_lost_bytes(), newData.getC2s_tcp_lost_bytes()));
cacheData.setS2c_tcp_lost_bytes(MetricUtil.longSum(cacheData.getS2c_tcp_lost_bytes(), newData.getS2c_tcp_lost_bytes()));
cacheData.setC2s_tcp_ooorder_pkts(MetricUtil.longSum(cacheData.getC2s_tcp_ooorder_pkts(), newData.getC2s_tcp_ooorder_pkts()));
cacheData.setS2c_tcp_ooorder_pkts(MetricUtil.longSum(cacheData.getS2c_tcp_ooorder_pkts(), newData.getS2c_tcp_ooorder_pkts()));
cacheData.setC2s_tcp_retransmitted_bytes(MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes()));
cacheData.setS2c_tcp_retransmitted_bytes(MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes()));
cacheData.setC2s_tcp_retransmitted_pkts(MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_pkts(), newData.getC2s_tcp_retransmitted_pkts()));
cacheData.setS2c_tcp_retransmitted_pkts(MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_pkts(), newData.getS2c_tcp_retransmitted_pkts()));
cacheData.setClient_ip_sketch(MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch()));
}
/**
     * Sum two Long values.
     *
     * @param value1 first value
     * @param value2 second value
* @return value1 + value2
*/
private static Long longSum(Long value1, Long value2) {
Long result = 0L;
try {
if (value1 > 0 && value2 > 0) {
result = value1 + value2;
} else {
result = value1;
}
} catch (RuntimeException e) {
logger.error("Abnormal sending of traffic indicator statistics! The message is:" + e.getMessage());
result = value1;
}
return result;
}
/**
     * @param cacheHll cached sketch (Base64 encoded)
     * @param newHll   newly arrived sketch (Base64 encoded)
     * @return merged sketch (Base64 encoded)
*/
private static String hllSketchUnion(String cacheHll, String newHll) {
Union union = new Union(12);
try {
if (StringUtil.isNotBlank(cacheHll)) {
byte[] cacheHllBytes = Base64.getDecoder().decode(cacheHll);
HllSketch cacheSketch = HllSketch.heapify(cacheHllBytes);
union.update(cacheSketch);
}
if (StringUtil.isNotBlank(newHll)) {
byte[] newHllBytes = Base64.getDecoder().decode(newHll);
HllSketch newSketch = HllSketch.heapify(newHllBytes);
union.update(newSketch);
}
return Base64.getEncoder().encodeToString(union.getResult().toUpdatableByteArray());
} catch (RuntimeException e) {
logger.error("Merge hllSketch results abnormal! The message is:" + e.getMessage());
return null;
}
}
}
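A minimal standalone sketch of the Base64/HLL round trip that hllSketchUnion performs (the IP values are illustrative; lgK = 12 matches the code above):

```
import java.util.Base64;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;

public class HllUnionSketch {
    public static void main(String[] args) {
        HllSketch a = new HllSketch(12);
        HllSketch b = new HllSketch(12);
        for (int i = 0; i < 50; i++) {
            a.update("192.168.1." + i);
            b.update("192.168.2." + i);
        }
        // Serialize the way the job ships sketches around: updatable bytes, Base64-encoded strings.
        String cacheHll = Base64.getEncoder().encodeToString(a.toUpdatableByteArray());
        String newHll = Base64.getEncoder().encodeToString(b.toUpdatableByteArray());

        // Same merge as MetricUtil.hllSketchUnion.
        Union union = new Union(12);
        union.update(HllSketch.heapify(Base64.getDecoder().decode(cacheHll)));
        union.update(HllSketch.heapify(Base64.getDecoder().decode(newHll)));
        System.out.println(union.getResult().getEstimate()); // roughly 100 distinct client IPs
    }
}
```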

src/main/java/com/zdjizhi/utils/kafka/CertUtils.java Normal file

@@ -0,0 +1,48 @@
package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.config.GlobalConfig;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.kafka
* @Description:
* @date 2021/9/610:37
*/
class CertUtils {
/**
     * Kafka SASL authentication port
*/
private static final String SASL_PORT = "9094";
/**
     * Kafka SSL authentication port
*/
private static final String SSL_PORT = "9095";
/**
     * Choose the authentication mode based on the port in the connection string.
     *
     * @param servers    Kafka connection string
     * @param properties Kafka connection properties
*/
static void chooseCert(String servers, Properties properties) {
if (servers.contains(SASL_PORT)) {
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ GlobalConfig.KAFKA_SASL_JAAS_USER + " password=" + GlobalConfig.KAFKA_SASL_JAAS_PIN + ";");
} else if (servers.contains(SSL_PORT)) {
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put("ssl.keystore.location", GlobalConfig.TOOLS_LIBRARY + "keystore.jks");
properties.put("ssl.keystore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
properties.put("ssl.truststore.location", GlobalConfig.TOOLS_LIBRARY + "truststore.jks");
properties.put("ssl.truststore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
properties.put("ssl.key.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
}
}
}

src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java Normal file

@@ -0,0 +1,47 @@
package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.config.GlobalConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.kafka
* @Description:
* @date 2021/6/813:54
*/
public class KafkaConsumer {
private static Properties createConsumerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", GlobalConfig.SOURCE_KAFKA_SERVERS);
properties.put("group.id", GlobalConfig.GROUP_ID);
properties.put("session.timeout.ms", GlobalConfig.SESSION_TIMEOUT_MS);
properties.put("max.poll.records", GlobalConfig.MAX_POLL_RECORDS);
properties.put("max.partition.fetch.bytes", GlobalConfig.MAX_PARTITION_FETCH_BYTES);
properties.put("partition.discovery.interval.ms", "10000");
CertUtils.chooseCert(GlobalConfig.SOURCE_KAFKA_SERVERS, properties);
return properties;
}
/**
     * Consume Kafka records using the standard string deserialization schema.
*
* @return kafka logs
*/
public static FlinkKafkaConsumer<String> getKafkaConsumer() {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(GlobalConfig.SOURCE_KAFKA_TOPIC,
new SimpleStringSchema(), createConsumerConfig());
        //Commit offsets back to Kafka as checkpoints complete
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        //Start consuming from the consumer group's current offsets
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
}
}

src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java Normal file

@@ -0,0 +1,48 @@
package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.config.GlobalConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Optional;
import java.util.Properties;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.kafka
* @Description:
* @date 2021/6/814:04
*/
public class KafkaProducer {
private static Properties createProducerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", GlobalConfig.SINK_KAFKA_SERVERS);
properties.put("acks", GlobalConfig.PRODUCER_ACK);
properties.put("retries", GlobalConfig.RETRIES);
properties.put("linger.ms", GlobalConfig.LINGER_MS);
properties.put("request.timeout.ms", GlobalConfig.REQUEST_TIMEOUT_MS);
properties.put("batch.size", GlobalConfig.BATCH_SIZE);
properties.put("buffer.memory", GlobalConfig.BUFFER_MEMORY);
properties.put("max.request.size", GlobalConfig.MAX_REQUEST_SIZE);
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, GlobalConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
CertUtils.chooseCert(GlobalConfig.SINK_KAFKA_SERVERS, properties);
return properties;
}
public static FlinkKafkaProducer<String> getKafkaProducer() {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
GlobalConfig.SINK_KAFKA_TOPIC,
new SimpleStringSchema(),
createProducerConfig(), Optional.empty());
        //With this option enabled, the producer only logs failures instead of catching and rethrowing them
kafkaProducer.setLogFailuresOnly(true);
return kafkaProducer;
}
}

src/main/java/log4j.properties Normal file

@@ -0,0 +1,25 @@
#Log4j
log4j.rootLogger=error,console,file
# Console appender settings
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=error
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
# File appender settings
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold=error
log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true
#Use a relative path and verify that the log ends up under the application directory
log4j.appender.file.file=${nis.root}/log/galaxy-name.log
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
#MyBatis configuration; com.nis.web.dao is the package containing the MyBatis mapper interfaces
log4j.logger.com.nis.web.dao=error
#BoneCP data source logging
log4j.category.com.jolbox=error,console

src/test/java/com/zdjizhi/ConventionalTest.java Normal file

@@ -0,0 +1,76 @@
package com.zdjizhi;
import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.utils.StringUtil;
import org.junit.Test;
import java.util.Arrays;
/**
* @author qidaijie
* @Package com.zdjizhi
* @Description:
* @date 2023/1/617:54
*/
public class ConventionalTest {
@Test
public void protocolTreeTest() {
String groupKey = "ETHERNET.IPv4.TCP.UNCATEGORIZED.qq_r2@4";
String protocol = groupKey.substring(0, groupKey.indexOf("@"));
System.out.println(protocol);
StringBuffer stringBuffer = new StringBuffer();
String appName = "qq_r2";
String[] protocolIds = protocol.split(GlobalConfig.PROTOCOL_SPLITTER);
for (String proto : protocolIds) {
if (StringUtil.isBlank(stringBuffer.toString())) {
stringBuffer.append(proto);
System.out.println(stringBuffer.toString());
} else {
stringBuffer.append(".").append(proto);
if (proto.equals(appName)) {
System.out.println(stringBuffer.toString() + "---" + appName);
} else {
System.out.println(stringBuffer.toString());
}
}
}
}
@Test
public void SplitTest() {
String str = "[.]";
String protocol = "ETHERNET.IPv4.TCP.http.test";
System.out.println(Arrays.toString(protocol.split(str)));
String str2 = "\\.";
System.out.println(Arrays.toString(protocol.split(str2)));
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < protocol.split(str).length - 1; i++) {
String value = protocol.split(str)[i];
if (StringUtil.isBlank(stringBuilder.toString())) {
stringBuilder.append(value);
System.out.println(stringBuilder.toString());
} else {
stringBuilder.append(".").append(value);
System.out.println(stringBuilder.toString());
}
}
System.out.println("\n\n\n");
protocol = "ETHERNET.IPv4.TCP";
String app = "http.test";
System.out.println(Arrays.toString(app.split(str2)));
System.out.println(app.substring(app.lastIndexOf(".") + 1));
System.out.println(protocol.concat(".").concat(app));
System.out.println("\n\n\n");
app = "test";
System.out.println(Arrays.toString(app.split(str2)));
System.out.println(app.substring(app.lastIndexOf(".") + 1));
System.out.println(protocol.concat(".").concat(app));
}
}

View File

@@ -0,0 +1,248 @@
package com.zdjizhi;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson2.*;
import com.zdjizhi.utils.JsonMapper;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
import java.util.*;
/**
* @author qidaijie
* @Package com.zdjizhi
* @Description:
* @date 2023/3/2 17:17
*/
public class DatasketchesTest {
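//Compares HllSketch cardinality estimates against the exact HashSet size for 50 sequential and 50 random IPv4 addresses.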
@Test
public void HllSketchTest() {
HashSet<String> strings = new HashSet<>();
HllSketch sketch = new HllSketch(12);
for (int i = 0; i < 50; i++) {
String ip = "192.168.1." + i;
sketch.update(ip);
strings.add(ip);
}
System.out.println(sketch.getEstimate() + "--" + strings.size());
HashSet<String> randomStrings = new HashSet<>();
HllSketch randomSketch = new HllSketch(12);
for (int i = 0; i < 50; i++) {
String ip = makeIPv4Random();
randomSketch.update(ip);
randomStrings.add(ip);
}
System.out.println(randomSketch.getEstimate() + "--" + randomStrings.size());
}
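//Merges two sketches through a Union and checks that the merged estimate tracks the combined exact count.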
@Test
public void HllSketchUnionTest() {
HashSet<String> strings = new HashSet<>();
HllSketch sketch = new HllSketch(12);
for (int i = 0; i < 50; i++) {
String ip = "192.168.1." + i;
sketch.update(ip);
strings.add(ip);
}
HllSketch sketch2 = new HllSketch(12);
for (int i = 0; i < 10; i++) {
String ip = "192.168.2." + i;
sketch2.update(ip);
strings.add(ip);
}
Union union = new Union(12);
union.update(sketch);
union.update(sketch2);
HllSketch sketch_result = HllSketch.heapify(union.getResult().toCompactByteArray());
System.out.println(sketch.getEstimate() + "--" + strings.size());
System.out.println(sketch2.getEstimate() + "--" + strings.size());
System.out.println(sketch_result.getEstimate() + "--" + strings.size());
}
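//Chains two Union merges, then compares how Jackson, FastJson2, and Hutool serialize a map containing the merged sketch bytes (raw and Base64-encoded).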
@Test
public void HllSketchDruidTest() {
HashMap<String, Object> dataMap = new HashMap<>();
HashSet<String> strings = new HashSet<>();
HllSketch sketch = new HllSketch(12);
for (int i = 0; i < 50; i++) {
String ip = "192.168.1." + i;
sketch.update(ip);
strings.add(ip);
}
HllSketch sketch2 = new HllSketch(12);
for (int i = 0; i < 10; i++) {
String ip = "192.168.2." + i;
sketch2.update(ip);
strings.add(ip);
}
Union union = new Union(12);
union.update(sketch);
union.update(sketch2);
HllSketch sketch_result1 = HllSketch.heapify(union.getResult().toCompactByteArray());
HllSketch sketch3 = new HllSketch(12);
for (int i = 0; i < 10; i++) {
String ip = "192.168.3." + i;
sketch3.update(ip);
strings.add(ip);
}
Union union2 = new Union(12);
union2.update(sketch_result1);
union2.update(sketch3);
HllSketch sketch_result2 = HllSketch.heapify(union2.getResult().toCompactByteArray());
System.out.println(sketch.getEstimate() + "--" + strings.size());
System.out.println(sketch2.getEstimate() + "--" + strings.size());
System.out.println(sketch3.getEstimate() + "--" + strings.size());
System.out.println(sketch_result1.getEstimate() + "--" + strings.size());
System.out.println(sketch_result2.getEstimate() + "--" + strings.size());
Result result = new Result();
result.setC2s_pkt_num(10);
result.setS2c_pkt_num(10);
result.setC2s_byte_num(10);
result.setS2c_byte_num(10);
result.setStat_time(1679970031);
result.setSchema_type("HLLSketchMergeTest");
//CompactByte
result.setIp_object(sketch_result2.toCompactByteArray());
// System.out.println(result.toString());
//sendMessage(JsonMapper.toJsonString(result));
//UpdatableByte
result.setIp_object(sketch_result2.toUpdatableByteArray());
// System.out.println(result.toString());
//sendMessage(JsonMapper.toJsonString(result));
//Hashmap
dataMap.put("app_name", "TEST");
dataMap.put("protocol_stack_id", "HTTP");
dataMap.put("vsys_id", 1);
dataMap.put("stat_time", 1681370100);
dataMap.put("client_ip_sketch", sketch_result2.toUpdatableByteArray());
System.out.println("Jackson:" + JsonMapper.toJsonString(dataMap));
System.out.println("FastJson2:" + JSONObject.toJSONString(dataMap));
System.out.println("Hutool:" + JSONUtil.toJsonStr(dataMap) + "\n\n");
dataMap.put("client_ip_sketch", Base64.getEncoder().encode(sketch_result2.toUpdatableByteArray()));
System.out.println("FastJson2 Byte(Base64):" + JSONObject.toJSONString(dataMap));
System.out.println("Hutool Byte(Base64):" + JSONObject.toJSONString(dataMap));
System.out.println(JSONUtil.toJsonStr(dataMap));
// sendMessage(JSONObject.toJSONString(dataMap));
}
//Generate a random IPv4 address
private static String makeIPv4Random() {
int v4_1 = new Random().nextInt(255) + 1;
int v4_2 = new Random().nextInt(255);
int v4_3 = new Random().nextInt(255);
int v4_4 = new Random().nextInt(255);
return v4_1 + "." + v4_2 + "." + v4_3 + "." + v4_4;
}
private static void sendMessage(Object message) {
Properties props = new Properties();
//Kafka broker address
props.put("bootstrap.servers", "192.168.44.12:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("linger.ms", 1);
props.put("buffer.memory", 67108864);
// props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<String, Object> kafkaProducer = new KafkaProducer<String, Object>(props);
kafkaProducer.send(new ProducerRecord<String, Object>("TRAFFIC-PROTOCOL-TEST", message));
kafkaProducer.close();
}
}
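//Minimal test POJO: traffic counters plus the serialized HLL sketch bytes, used only by the serialization checks above.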
class Result {
private String schema_type;
private long c2s_byte_num;
private long c2s_pkt_num;
private long s2c_byte_num;
private long s2c_pkt_num;
private long stat_time;
private byte[] ip_object;
public void setSchema_type(String schema_type) {
this.schema_type = schema_type;
}
public void setC2s_byte_num(long c2s_byte_num) {
this.c2s_byte_num = c2s_byte_num;
}
public void setC2s_pkt_num(long c2s_pkt_num) {
this.c2s_pkt_num = c2s_pkt_num;
}
public void setS2c_byte_num(long s2c_byte_num) {
this.s2c_byte_num = s2c_byte_num;
}
public void setS2c_pkt_num(long s2c_pkt_num) {
this.s2c_pkt_num = s2c_pkt_num;
}
public void setStat_time(long stat_time) {
this.stat_time = stat_time;
}
public void setIp_object(byte[] ip_object) {
this.ip_object = ip_object;
}
@Override
public String toString() {
return "Result{" +
"schema_type='" + schema_type + '\'' +
", c2s_byte_num=" + c2s_byte_num +
", c2s_pkt_num=" + c2s_pkt_num +
", s2c_byte_num=" + s2c_byte_num +
", s2c_pkt_num=" + s2c_pkt_num +
", stat_time=" + stat_time +
", ip_object=" + Arrays.toString(ip_object) +
'}';
}
}

View File

@@ -0,0 +1,63 @@
package com.zdjizhi;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONPath;
import com.alibaba.fastjson2.JSONReader;
import com.zdjizhi.common.pojo.AppProtocol;
import com.zdjizhi.utils.StringUtil;
import org.junit.Test;
import java.util.ArrayList;
/**
* @author qidaijie
* @Package com.zdjizhi
* @Description:
* @date 2023/4/21 16:20
*/
public class FastJsonTest {
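//Matches a record whose name is traffic_application_protocol_stat with a FastJson2 JSONPath filter, exercised via extract, eval, and contains.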
@Test
public void jsonPathTest() {
String dataTypeExpr = "[?(@.name = 'traffic_application_protocol_stat')]";
String value = "{\"fields\":{\"c2s_bytes\":120953742,\"c2s_fragments\":0,\"c2s_pkts\":513665,\"c2s_tcp_lost_bytes\":13000,\"c2s_tcp_ooorder_pkts\":7,\"c2s_tcp_retransmitted_bytes\":89555044,\"c2s_tcp_retransmitted_pkts\":240585,\"in_bytes\":64959358,\"in_pkts\":396214,\"out_bytes\":166012,\"out_pkts\":166012,\"s2c_bytes\":28703159,\"s2c_fragments\":0,\"s2c_pkts\":48561,\"s2c_tcp_lost_bytes\":0,\"s2c_tcp_ooorder_pkts\":377,\"s2c_tcp_retransmitted_bytes\":72122,\"s2c_tcp_retransmitted_pkts\":166,\"sessions\":32148},\"name\":\"traffic_application_protocol_stat\",\"tags\":{\"app_full_path\":\"dns\",\"device_id\":\"9800165603247024\",\"device_group\":\"group-xxg-tsgx\",\"vsys_id\":23,\"data_center\":\"center-xxg-tsgx\",\"protocol_label\":\"ETHERNET.IPv4.UDP\"},\"timestamp\":1682046260}";
JSONPath dataTypePath = JSONPath.of(dataTypeExpr);
JSONReader parser = JSONReader.of(value);
Object result = dataTypePath.extract(parser);
if (result != null) {
System.out.println(result.toString());
}
Object eval = JSONPath.eval(value, dataTypeExpr);
if (eval != null) {
System.out.println(eval.toString());
}
System.out.println(JSONPath.contains(value, dataTypeExpr));
}
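//Merges the "fields" object into the "tags" object and maps the combined result onto the AppProtocol POJO.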
@Test
public void jsonTest() {
String message = "{\"fields\":{\"c2s_bytes\":120953742,\"c2s_fragments\":0,\"c2s_pkts\":513665,\"c2s_tcp_lost_bytes\":13000,\"c2s_tcp_ooorder_pkts\":7,\"c2s_tcp_retransmitted_bytes\":89555044,\"c2s_tcp_retransmitted_pkts\":240585,\"in_bytes\":64959358,\"in_pkts\":396214,\"out_bytes\":166012,\"out_pkts\":166012,\"s2c_bytes\":28703159,\"s2c_fragments\":0,\"s2c_pkts\":48561,\"s2c_tcp_lost_bytes\":0,\"s2c_tcp_ooorder_pkts\":377,\"s2c_tcp_retransmitted_bytes\":72122,\"s2c_tcp_retransmitted_pkts\":166,\"sessions\":32148},\"name\":\"traffic_application_protocol_stat\",\"tags\":{\"app_full_path\":\"dns\",\"device_id\":\"9800165603247024\",\"device_group\":\"group-xxg-tsgx\",\"vsys_id\":23,\"data_center\":\"center-xxg-tsgx\",\"protocol_label\":\"ETHERNET.IPv4.UDP\"},\"timestamp\":1682046260}";
JSONObject originalLog = JSON.parseObject(message);
JSONObject fieldsObject = JSONObject.parseObject(originalLog.getString("fields"));
JSONObject tagsObject = JSONObject.parseObject(originalLog.getString("tags"));
tagsObject.putAll(fieldsObject);
AppProtocol appProtocol = JSON.to(AppProtocol.class, tagsObject);
System.out.println(JSONObject.toJSONString(appProtocol));
System.out.println(appProtocol.getApp_name());
System.out.println(appProtocol.getProtocol_stack_id());
appProtocol.setApp_name("123");
appProtocol.setProtocol_stack_id("abc");
System.out.println(appProtocol.getApp_name());
System.out.println(appProtocol.getProtocol_stack_id());
}
}

View File

@@ -0,0 +1,48 @@
package com.zdjizhi;
import org.junit.Test;
/**
* @author qidaijie
* @Package com.zdjizhi
* @Description:
* @date 2023/4/18 10:22
*/
public class FlagsTest {
/*
* Reference: https://juejin.cn/post/6879226834597691405
*
* Session flags (stored as a 64-bit unsigned integer; a 32-bit field that identifies the
* session's network behavior). Logged values are queried and decoded by combining them
* with the following constants via bitwise AND (&):
* 0x00000001 - (1) Asymmetric
* 0x00000002 - (2) Bulky
* 0x00000004 - (4) CBR Streaming
* 0x00000008 - (8) Client is Local
* 0x00000010 - (16) Server is Local
* 0x00000020 - (32) Download
* 0x00000040 - (64) Interactive
* 0x00000080 - (128) Inbound
* 0x00000100 - (256) Outbound
* 0x00000200 - (512) Pseudo Unidirectional
* 0x00000400 - (1024) Streaming
* 0x00000800 - (2048) Unidirectional
* 0x00001000 - (4096) Random looking
* 0x00002000 - (8192) C2S
* 0x00004000 - (16384) S2C
*/
@Test
public void bitwiseAND() {
Long common_flags = 8200L;
Long clientIsLocal = 8L;
Long serverIsLocal = 16L;
System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal));
System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal)+"\n\n");
common_flags = 16400L;
System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal));
System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal));
}
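/*
 * A minimal decoding sketch (not part of the production code): it walks the flag table
 * above bit by bit and prints every name set in a sample value. The name array and its
 * ordering are assumptions taken from the comment block, not from an existing API.
 */
@Test
public void decodeFlags() {
String[] names = {
"Asymmetric", "Bulky", "CBR Streaming", "Client is Local", "Server is Local",
"Download", "Interactive", "Inbound", "Outbound", "Pseudo Unidirectional",
"Streaming", "Unidirectional", "Random looking", "C2S", "S2C"};
long commonFlags = 8200L; // 8192 (C2S) + 8 (Client is Local)
for (int bit = 0; bit < names.length; bit++) {
if ((commonFlags & (1L << bit)) != 0) {
System.out.println(names[bit]);
}
}
}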
}