Compare commits: feature/sc...v1.0-rc2 (69 commits)
Commits (SHA1 only):

8fc8cc7c2d, 98bb843159, edb044596e, 557156af79, 32a811fb1c, 36cbaebf0c, 68e91c9ae5, 69004f5c02, 9804b0cc52, fa619d2595, a44d434952, 736bc40202, 6a2e5bacb9, da69688270, a4e92c03e7, b207f8fb8e, 5f16a98402, 43282c7163, b38f128f32, 2c64391737, 7e5a524eb7, 255d6d9fcb, 4c90c27921, f4b8ba69d0, 0f6fdda7f5, 44eb3ed402, ebada97c22, 7221f2a52f, 6a444d38ba, 7d0d185c0e, c9c26f0a9c, b20c33e2bd, c10b8930e3, 21ef68da95, 33b9d61711, 43dbc50a19, 5bc835ec3f, 673d077e32, b2af6cdad8, bfd28178b0, cefc581964, c1404584c0, ef851cb187, 402db73b53, cbc7c91abe, 14d569edfd, 4d3b53cbb7, 112708f056, b40469ea8d, b7cc87b4bc, 9ed0da345b, 559c7b287c, bcd6826c56, 7296c36d80, a8a6519a88, 7d2b0bae29, 098a196cc0, cfb56877c3, d4bc2787d4, 53eca34191, a949a98a37, fe274d4da6, 6d90d720fd, 43f10e38e7, f52376d63d, 1046784f3e, 3ff568c823, 906b6e6ca4, 8d5d1d9523
.gitignore (vendored, new file, 35 lines):

```
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/

### IntelliJ IDEA ###
.idea/
*.iws
*.iml
*.ipr

### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/

### VS Code ###
.vscode/

### Mac OS ###
.DS_Store
```
CHANGELOG.md (new file, 4 lines):

# Changelog

### Hotfix

- [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) Fixed a NullPointerException caused by IPUtil not null-checking the address when determining whether it is IPv6
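The #5 hotfix adds a null check before IPUtil's IPv6 test. IPUtil's actual code is not shown here, so the sketch below is a hypothetical reconstruction of the guard: return early for a null or empty address instead of letting the parse throw a NullPointerException.

```java
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical sketch of the guarded IPv6 check described in hotfix #5.
public class IpGuard {

    /** Returns true only for a non-null, parseable IPv6 literal. */
    public static boolean isIpv6(String address) {
        if (address == null || address.isEmpty()) {
            return false; // the fix: null/empty no longer reaches the parser
        }
        try {
            // For address literals this parses without a DNS lookup.
            return InetAddress.getByName(address) instanceof Inet6Address;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isIpv6(null));       // false, instead of an NPE
        System.out.println(isIpv6("::1"));      // true
        System.out.println(isIpv6("10.0.0.1")); // false
    }
}
```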
README.md (rewritten, -92/+41 lines: the default GitLab flink-voip-fusion template replaced with project documentation). New content:

# SIP RTP Correlation

## Introduction

SIP RTP Correlation is a real-time data-processing project built on Apache Flink. It reads SIP (Session Initiation Protocol) and RTP (Real-time Transport Protocol) records from Kafka and fuses them into complete VoIP (Voice over Internet Protocol) call records.

It can be used for real-time monitoring and analysis of VoIP call data, extracting key metrics, and real-time alerting and diagnostics.

## Build and Package

Use Maven to compile and package the project:

```shell
mvn clean package
```

## Run the Flink Job

Run the Flink job with the following command:

```shell
flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<version>.jar application.properties
```

## Configuration Options

| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source.kafka.topic | STRING | Y | | Kafka topic to read, containing the raw SIP and RTP records |
| source.kafka.props.* | MAP<STRING, STRING> | Y | | Properties of the source Kafka |
| sink.kafka.topic | STRING | Y | | Kafka topic to which fused VoIP records and unmatched RTP records are written |
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | Properties of the sink Kafka |
| error.records.output.enable | STRING | N | False | Whether to output abnormal records (IP or port missing) |
| error.sink.kafka.topic | STRING | N | | Kafka topic to which abnormal records are written |
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | Properties of the Kafka for abnormal records |
| sip.state.clear.interval.minutes | INT | N | 1 | Window size for pairing one-way SIP flows (minutes) |
| rtp.state.clear.interval.minutes | INT | N | 6 | Window size for SIP-RTP correlation (minutes) |

## Contributing

If you find a problem or have an idea for improving the project, feel free to open an Issue or Pull Request.
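The configuration options documented in the README can be gathered into the `application.properties` file passed to `flink run`. A minimal hypothetical example, with topic names and broker addresses as placeholders:

```properties
# Placeholder topics and brokers; only the option keys come from the README.
source.kafka.topic=voip-raw
source.kafka.props.bootstrap.servers=kafka-1:9092,kafka-2:9092
source.kafka.props.group.id=sip-rtp-correlation
sink.kafka.topic=voip-fused
sink.kafka.props.bootstrap.servers=kafka-1:9092,kafka-2:9092
error.records.output.enable=False
sip.state.clear.interval.minutes=1
rtp.state.clear.interval.minutes=6
```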
pom.xml (new file, 366 lines):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zdjizhi</groupId>
    <artifactId>sip-rtp-correlation</artifactId>
    <version>1.0-rc2</version>

    <name>Flink : SIP-RTP : Correlation</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.13.6</flink.version>
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.32</slf4j.version>
        <log4j.version>2.17.1</log4j.version>
        <jackson.version>2.13.2.20220328</jackson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-influxdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.zdjizhi</groupId>
            <artifactId>galaxy</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>

        <dependency>
            <groupId>com.github.javafaker</groupId>
            <artifactId>javafaker</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <classifier>tests</classifier>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>${log4j.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>${log4j.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>${log4j.version}</version>
            </dependency>
            <dependency>
                <!-- API bridge between log4j 1 and 2 -->
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-1.2-api</artifactId>
                <version>${log4j.version}</version>
            </dependency>

            <dependency>
                <groupId>com.github.javafaker</groupId>
                <artifactId>javafaker</artifactId>
                <version>1.0.2</version>
            </dependency>

            <dependency>
                <groupId>com.zdjizhi</groupId>
                <artifactId>galaxy</artifactId>
                <version>1.1.3</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>

            <dependency>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter</artifactId>
                <version>5.8.0</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>test-sources</id>
                        <phase>generate-test-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/it/java</source>
                            </sources>
                        </configuration>
                    </execution>
                    <execution>
                        <id>test-resources</id>
                        <phase>generate-test-resources</phase>
                        <goals>
                            <goal>add-test-resource</goal>
                        </goals>
                        <configuration>
                            <resources>
                                <resource>
                                    <directory>src/it/resources</directory>
                                </resource>
                            </resources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-tests-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.outputDirectory}/tests-lib</outputDirectory>
                            <excludeScope>system</excludeScope>
                            <excludeTransitive>false</excludeTransitive>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.jacoco</groupId>
                <artifactId>jacoco-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>pre-unit-test</id>
                        <goals>
                            <goal>prepare-agent</goal>
                        </goals>
                        <configuration>
                            <destFile>${project.build.directory}/jacoco.exec</destFile>
                        </configuration>
                    </execution>
                    <execution>
                        <id>test-report</id>
                        <phase>verify</phase>
                        <goals>
                            <goal>report</goal>
                        </goals>
                        <configuration>
                            <dataFile>${project.build.directory}/jacoco.exec</dataFile>
                            <outputDirectory>${project.reporting.outputDirectory}/jacoco</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-javadoc-plugin</artifactId>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>test-jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-source-plugin</artifactId>
                    <version>3.2.1</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>3.0.0-M5</version>
                </plugin>
                <plugin>
                    <groupId>org.jacoco</groupId>
                    <artifactId>jacoco-maven-plugin</artifactId>
                    <version>0.8.7</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-javadoc-plugin</artifactId>
                    <version>3.3.0</version>
                </plugin>
                <plugin>
                    <groupId>org.codehaus.mojo</groupId>
                    <artifactId>build-helper-maven-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-dependency-plugin</artifactId>
                    <version>3.6.0</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
```
src/it/java/com/zdjizhi/flink/voip/CorrelateTest.java (new file, 95 lines):

```java
package com.zdjizhi.flink.voip;

import com.zdjizhi.flink.voip.functions.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Objects;

// Integration test main class
public class CorrelateTest {

    public static void main(String[] args) throws Exception {
        final Configuration envConfig = new Configuration();
        envConfig.setInteger("rest.port", 18081);
        envConfig.setBoolean("rest.flamegraph.enabled", true);
        envConfig.setString("metrics.reporter.influxdb.factory.class", "org.apache.flink.metrics.influxdb.InfluxdbReporterFactory");
        envConfig.setString("metrics.reporter.influxdb.scheme", "http");
        envConfig.setString("metrics.reporter.influxdb.host", "127.0.0.1");
        envConfig.setInteger("metrics.reporter.influxdb.port", 8086);
        envConfig.setString("metrics.reporter.influxdb.db", "flinks");
        envConfig.setString("metrics.reporter.influxdb.interval", "5 SECONDS");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .createLocalEnvironmentWithWebUI(envConfig);

        final ParameterTool tool = ParameterTool.fromPropertiesFile(
                args.length > 0 ? args[0] :
                        Objects.requireNonNull(
                                CorrelateTest.class.getResource("/application-test.properties")).getPath()
        );

        final CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        env.enableCheckpointing(Time.minutes(1)
                .toMilliseconds(), CheckpointingMode.AT_LEAST_ONCE);
        checkpointConfig.setCheckpointTimeout(Time.minutes(2).toMilliseconds());
        checkpointConfig.setCheckpointStorage("file://" + System.getProperty("java.io.tmpdir"));
        /* checkpointConfig.setCheckpointStorage(
                Objects.requireNonNull(
                        CorrelateTest.class.getResource("/")).toString());*/

        final Configuration config = tool.getConfiguration();
        env.getConfig().setGlobalJobParameters(config);
        env.setParallelism(8);

        final SingleOutputStreamOperator<ObjectNode> sourceStream = env.addSource(new DataGenSource())
                .name("DataGenSource")
                .setParallelism(4);

        // Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data.
        final SingleOutputStreamOperator<ObjectNode> splitsRecordsOperator =
                sourceStream
                        .process(new TypeSplitFunction())
                        .name("SplitsRecordsBasedSchemaType")
                        .uid("splits-records-based-schema-type");

        // Get the DataStreams for SIP and RTP data from the side outputs.
        final DataStream<ObjectNode> sipDataStream = splitsRecordsOperator
                .getSideOutput(TypeSplitFunction.SIP_OUTPUT_TAG);

        final DataStream<ObjectNode> rtpDataStream = splitsRecordsOperator
                .getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG);

        // Process SIP data to create a double-directional stream using SIPPairingFunction.
        final SingleOutputStreamOperator<ObjectNode> sipDoubleDirOperator = sipDataStream
                .keyBy(new SIPKeySelector())
                .process(new SIPPairingFunction())
                .name("PairingOneWayToDoubleStream")
                .uid("pairing-one-way-to-double");

        final KeySelector<ObjectNode, Tuple2<Integer, Address>> vSysSelector = new VSysIDKeySelector();

        // Fuse SIP data and RTP data into VoIP data.
        final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream
                .keyBy(vSysSelector)
                .connect(sipDoubleDirOperator.keyBy(vSysSelector))
                .process(new VoIPFusionFunction())
                .name("VoIPFusion")
                .uid("voip-fusion");

        voIpOperator.addSink(new DoNothingSink())
                .name("DoNothingSink")
                .setParallelism(1);

        env.execute("VoIP Fusion Job");
    }
}
```
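The VoIPFusion step above keys both the RTP stream and the double-directional SIP stream by the same selector and joins them in keyed state. A Flink-free sketch of that join logic using plain maps may help make the shape clear; all names and the string-concatenation "fusion" here are illustrative, the real logic lives in VoIPFusionFunction and its keyed state with the clear intervals from the README.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: SIP records are buffered per correlation key; each incoming RTP record
// either joins its buffered SIP call or is emitted unmatched, mirroring the
// "fused VoIP records and unmatched RTP records" output described in the README.
public class SimpleCorrelator {

    private final Map<String, String> sipByKey = new HashMap<>();
    private final List<String> fused = new ArrayList<>();
    private final List<String> unmatchedRtp = new ArrayList<>();

    // Buffer a SIP record under its key (vsysId + address in the real job).
    public void onSip(String key, String sipRecord) {
        sipByKey.put(key, sipRecord);
    }

    // Join an RTP record against buffered SIP state, or forward it unmatched.
    public void onRtp(String key, String rtpRecord) {
        String sip = sipByKey.get(key);
        if (sip != null) {
            fused.add(sip + "+" + rtpRecord);
        } else {
            unmatchedRtp.add(rtpRecord);
        }
    }

    public List<String> fused() { return fused; }
    public List<String> unmatchedRtp() { return unmatchedRtp; }

    public static void main(String[] args) {
        SimpleCorrelator c = new SimpleCorrelator();
        c.onSip("10|1.2.3.4:5060", "INVITE");
        c.onRtp("10|1.2.3.4:5060", "rtp-1"); // matches the buffered SIP call
        c.onRtp("10|9.9.9.9:4000", "rtp-2"); // no SIP state: forwarded unmatched
        System.out.println(c.fused());        // [INVITE+rtp-1]
        System.out.println(c.unmatchedRtp()); // [rtp-2]
    }
}
```

In the real job the buffered SIP state is additionally cleared after `rtp.state.clear.interval.minutes`, which this sketch omits.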
src/it/java/com/zdjizhi/flink/voip/conf/TestConfigs.java (new file, 22 lines):

```java
package com.zdjizhi.flink.voip.conf;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Test configs.
public class TestConfigs {

    // Ratio of valuable data in the generated dataset.
    public static final ConfigOption<Integer> VALUABLE_DATA_PROPORTION =
            ConfigOptions.key("valuable.data.ratio")
                    .intType()
                    .defaultValue(40)
                    .withDescription("Ratio of valuable data in the generated dataset.");

    // QPS of generated data records.
    public static final ConfigOption<Long> DATA_GENERATE_RATE =
            ConfigOptions.key("data.generate.rate")
                    .longType()
                    .defaultValue(1000L)
                    .withDescription("QPS of generated data records.");
}
```
@@ -0,0 +1,46 @@
package com.zdjizhi.flink.voip.functions;

import com.zdjizhi.flink.voip.conf.TestConfigs;
import com.zdjizhi.flink.voip.data.RTPGenerator;
import com.zdjizhi.flink.voip.data.SIPGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Data generator source.
public class DataGenSource extends RichParallelSourceFunction<ObjectNode> implements FunctionHelper {

    private transient SIPGenerator sipGenerator;
    private transient RTPGenerator rtpGenerator;
    private transient RateLimiter rateLimiter;
    private volatile boolean running;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Integer ratio = getGlobalConfiguration()
                .get(TestConfigs.VALUABLE_DATA_PROPORTION);
        this.sipGenerator = new SIPGenerator(ratio);
        this.rtpGenerator = new RTPGenerator(ratio, sipGenerator);
        final Long rate = getGlobalConfiguration()
                .get(TestConfigs.DATA_GENERATE_RATE);
        this.rateLimiter = RateLimiter.create(
                (int) (rate / getRuntimeContext().getNumberOfParallelSubtasks()));
        this.running = true;
    }

    @Override
    public void run(SourceContext<ObjectNode> ctx) throws Exception {
        while (running) {
            rateLimiter.acquire();
            ctx.collect(sipGenerator.next());
            ctx.collect(rtpGenerator.next());
        }
    }

    @Override
    public void cancel() {
        this.running = false;
    }
}
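The source divides the configured global QPS evenly across parallel subtasks before handing the quota to the rate limiter. A minimal stdlib-only sketch of that division (the `PerSubtaskRate` class name is hypothetical; the real job relies on Flink's runtime context and Guava's `RateLimiter`):

```java
public class PerSubtaskRate {

    /**
     * Splits a global records-per-second budget across parallel subtasks,
     * mirroring the (int) (rate / parallelism) division in DataGenSource.
     */
    static int perSubtaskRate(long globalRate, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        // Integer division: any remainder is dropped, so the effective
        // cluster-wide rate can be slightly below the configured value.
        return (int) (globalRate / parallelism);
    }

    public static void main(String[] args) {
        System.out.println(perSubtaskRate(1000L, 4)); // each of 4 subtasks gets 250
    }
}
```

Note the rounding behavior: with `data.generate.rate=1000` and parallelism 3, each subtask is limited to 333 records/s, i.e. 999 cluster-wide.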
@@ -0,0 +1,43 @@
package com.zdjizhi.flink.voip.functions;

import com.zdjizhi.flink.voip.records.Record;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// It does nothing with the incoming data and simply collects metrics for the number of
// RTP and VoIP records processed per second.
public class DoNothingSink extends RichSinkFunction<ObjectNode> {

    private transient MeterView numRTPRecordsPerSecond;
    private transient MeterView numVoIPRecordsPerSecond;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext runtimeContext = getRuntimeContext();
        MetricGroup metricGroup = runtimeContext.getMetricGroup();
        numRTPRecordsPerSecond = metricGroup
                .meter("numRTPRecordsPerSecond", new MeterView(1));
        numVoIPRecordsPerSecond = metricGroup
                .meter("numVoIPRecordsPerSecond", new MeterView(1));
    }

    @Override
    public void invoke(ObjectNode obj, Context context) throws Exception {
        Record record = new Record(obj);
        switch (record.getSchemaType()) {
            case RTP:
                numRTPRecordsPerSecond.markEvent();
                break;
            case VOIP:
                numVoIPRecordsPerSecond.markEvent();
                break;
            default:
        }
    }
}
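The sink's meters follow the mark-and-read pattern of Flink's `MeterView`: hot-path code only increments a counter, and the rate is derived from the counter delta over elapsed time. A stdlib-only sketch with an injected clock (the `SimpleMeter` class is hypothetical, not the Flink implementation):

```java
import java.util.concurrent.atomic.AtomicLong;

/** Tiny throughput meter in the spirit of Flink's MeterView: mark events, read a rate. */
public class SimpleMeter {

    private final AtomicLong count = new AtomicLong();
    private long windowStartMillis;
    private long windowStartCount;

    public SimpleMeter(long nowMillis) {
        this.windowStartMillis = nowMillis;
    }

    /** Called on the hot path; only a counter increment. */
    public void markEvent() {
        count.incrementAndGet();
    }

    /** Events per second since the previous read; resets the window. */
    public double ratePerSecond(long nowMillis) {
        long events = count.get() - windowStartCount;
        long elapsed = Math.max(1, nowMillis - windowStartMillis);
        windowStartCount = count.get();
        windowStartMillis = nowMillis;
        return events * 1000.0 / elapsed;
    }
}
```

The design choice matters for a sink measuring throughput: computing the rate lazily on read keeps `invoke()` nearly free, which is exactly why the real sink only calls `markEvent()`.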
4
src/it/resources/application-test.properties
Normal file
@@ -0,0 +1,4 @@
sip.state.clear.interval.minutes=1
rtp.state.clear.interval.minutes=10
valuable.data.ratio=20
data.generate.rate=1000
25
src/it/resources/log4j2-test.properties
Normal file
@@ -0,0 +1,25 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

rootLogger.level = INFO
rootLogger.appenderRef.test.ref = TestLogger

appender.test.name = TestLogger
appender.test.type = CONSOLE
appender.test.layout.type = PatternLayout
appender.test.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
99
src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java
Normal file
@@ -0,0 +1,99 @@
package com.zdjizhi.flink.voip;

import com.zdjizhi.flink.voip.conf.FusionConfiguration;
import com.zdjizhi.flink.voip.error.ErrorHandler;
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
import com.zdjizhi.flink.voip.functions.*;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import static com.zdjizhi.flink.voip.conf.FusionConfigs.*;

/**
 * The main class for running the SIP RTP Correlation application.
 *
 * @author chaoc
 * @since 1.0
 */
public class CorrelateApp {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parameter check.
        if (args.length < 1) {
            throw new IllegalArgumentException("Error: properties file path not found. " +
                    "\nUsage: flink -c xxx xxx.jar app.properties.");
        }

        final ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);

        final Configuration config = tool.getConfiguration();
        env.getConfig().setGlobalJobParameters(config);

        final FusionConfiguration fusionConfiguration = new FusionConfiguration(config);

        final FlinkKafkaConsumer<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer<>(
                config.get(SOURCE_KAFKA_TOPIC),
                new JsonNodeDeserializationSchema(),
                fusionConfiguration
                        .getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));

        final DataStreamSource<ObjectNode> sourceStream = env.addSource(kafkaConsumer);

        final ErrorHandler errorHandler = new ErrorHandler(config);

        // Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data.
        final SingleOutputStreamOperator<ObjectNode> splitsRecordsOperator =
                errorHandler.filterError(sourceStream)
                        .process(new TypeSplitFunction())
                        .name("SplitsRecordsBasedSchemaType")
                        .uid("splits-records-based-schema-type");

        // Get the DataStreams for SIP and RTP data from the side outputs.
        final DataStream<ObjectNode> sipDataStream = splitsRecordsOperator
                .getSideOutput(TypeSplitFunction.SIP_OUTPUT_TAG);

        final DataStream<ObjectNode> rtpDataStream = splitsRecordsOperator
                .getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG);

        // Process SIP data to create a double directional stream using SIPPairingFunction.
        final SingleOutputStreamOperator<ObjectNode> sipDoubleDirOperator = sipDataStream
                .keyBy(new SIPKeySelector())
                .process(new SIPPairingFunction())
                .name("PairingOneWayToDoubleStream")
                .uid("pairing-one-way-to-double");

        final KeySelector<ObjectNode, Tuple2<Integer, Address>> vSysSelector = new VSysIDKeySelector();

        // Fuse SIP data and RTP data into VoIP data.
        final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream
                .keyBy(vSysSelector)
                .connect(sipDoubleDirOperator.keyBy(vSysSelector))
                .process(new VoIPFusionFunction())
                .name("VoIPFusion")
                .uid("voip-fusion");

        final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>(
                config.get(SINK_KAFKA_TOPIC),
                new JsonNodeSerializationSchema(),
                fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));

        voIpOperator.addSink(producer);

        env.execute("VoIP Fusion Job");
    }
}
88
src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java
Normal file
@@ -0,0 +1,88 @@
package com.zdjizhi.flink.voip.conf;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/**
 * Configuration options for the Fusion application.
 *
 * @author chaoc
 * @since 1.0
 */
public class FusionConfigs {

    /**
     * The prefix for Kafka properties used in the source.
     */
    public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";

    /**
     * The prefix for Kafka properties used in the sink.
     */
    public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";

    /**
     * Configuration prefix for the properties of the Kafka sink where the error data will be output.
     */
    public static final String ERROR_SINK_KAFKA_PROPERTIES_PREFIX = "error.sink.kafka.props.";

    /**
     * Configuration option for the Kafka topic used in the source.
     */
    public static final ConfigOption<String> SOURCE_KAFKA_TOPIC =
            ConfigOptions.key("source.kafka.topic")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The Kafka topic used in the source.");

    /**
     * Configuration option for the Kafka topic used in the sink.
     */
    public static final ConfigOption<String> SINK_KAFKA_TOPIC =
            ConfigOptions.key("sink.kafka.topic")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The Kafka topic used in the sink.");

    /**
     * Configuration option to enable or disable the output of error records.
     * If set to true, the error records will be sent to the specified Kafka topic.
     * Default value is false.
     */
    public static final ConfigOption<Boolean> ERROR_RECORDS_OUTPUT_ENABLE =
            ConfigOptions.key("error.records.output.enable")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("Enable or disable the output of error records. " +
                            "If set to true, the error records will be sent to the specified Kafka topic.");

    /**
     * Configuration option for specifying the Kafka topic name where the error data will be sent.
     * This configuration option is used when the output of error records is enabled.
     */
    public static final ConfigOption<String> ERROR_SINK_KAFKA_TOPIC =
            ConfigOptions.key("error.sink.kafka.topic")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The Kafka topic name where the error records will be sent.");

    /**
     * The configuration option for the interval at which SIP state data
     * should be cleared.
     */
    public static final ConfigOption<Integer> SIP_STATE_CLEAR_INTERVAL =
            ConfigOptions.key("sip.state.clear.interval.minutes")
                    .intType()
                    .defaultValue(1)
                    .withDescription("The interval at which SIP state data should be cleared.");

    /**
     * The configuration option for the interval at which RTP state data
     * should be cleared.
     */
    public static final ConfigOption<Integer> RTP_STATE_CLEAR_INTERVAL =
            ConfigOptions.key("rtp.state.clear.interval.minutes")
                    .intType()
                    .defaultValue(6)
                    .withDescription("The interval at which RTP state data should be cleared.");
}
@@ -0,0 +1,44 @@
package com.zdjizhi.flink.voip.conf;

import org.apache.flink.configuration.Configuration;

import java.util.Properties;

/**
 * A wrapper class around the Flink `Configuration` that provides utility methods for handling
 * properties with a specific prefix. This class allows retrieving properties that start with the
 * given `prefix` and converts them into a `java.util.Properties` object.
 *
 * @author chaoc
 * @since 1.0
 */
public class FusionConfiguration {

    private final Configuration config;

    public FusionConfiguration(final Configuration config) {
        this.config = config;
    }

    /**
     * Retrieves properties from the underlying `Configuration` instance that start with the specified
     * `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
     *
     * @param prefix The prefix to filter properties.
     * @return A `java.util.Properties` object containing the properties with the specified prefix.
     */
    public Properties getProperties(final String prefix) {
        if (prefix == null) {
            final Properties props = new Properties();
            props.putAll(config.toMap());
            return props;
        }
        return config.toMap()
                .entrySet()
                .stream()
                .filter(entry -> entry.getKey().startsWith(prefix))
                .collect(Properties::new, (props, e) ->
                                props.setProperty(e.getKey().substring(prefix.length()), e.getValue()),
                        Properties::putAll);
    }
}
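The prefix handling above both filters and renames keys: `source.kafka.props.bootstrap.servers` becomes plain `bootstrap.servers`, ready to hand to a Kafka client. A stdlib-only sketch of the same technique, with a plain `Map` standing in for Flink's `Configuration` (the `PrefixProps` class name is hypothetical):

```java
import java.util.Map;
import java.util.Properties;

/** Sketch of the prefix filtering performed by FusionConfiguration#getProperties. */
public class PrefixProps {

    static Properties filterByPrefix(Map<String, String> conf, String prefix) {
        final Properties props = new Properties();
        if (prefix == null) {
            // No prefix: pass everything through unchanged.
            props.putAll(conf);
            return props;
        }
        conf.forEach((key, value) -> {
            if (key.startsWith(prefix)) {
                // Strip the prefix so the result can be passed straight to a client library.
                props.setProperty(key.substring(prefix.length()), value);
            }
        });
        return props;
    }
}
```

This is why the source, sink, and error-sink can each carry independent Kafka settings in one flat properties file: the prefix acts as a namespace.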
152
src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java
Normal file
@@ -0,0 +1,152 @@
package com.zdjizhi.flink.voip.error;

import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.conf.FusionConfiguration;
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import com.zdjizhi.utils.IPUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
 * It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
 *
 * @author chaoc
 * @since 1.0
 */
public class ErrorHandler {

    private static final Logger LOG = LoggerFactory.getLogger(ErrorHandler.class);

    /**
     * The OutputTag for invalid records.
     */
    public static final OutputTag<ObjectNode> INVALID_OUTPUT_TAG =
            new OutputTag<>("invalid-records", TypeInformation.of(ObjectNode.class));

    private final Configuration config;

    /**
     * Creates a new ErrorHandler instance with the given configuration.
     *
     * @param config The configuration containing settings for error handling.
     */
    public ErrorHandler(final Configuration config) {
        this.config = config;
    }

    /**
     * Filters out error records from the input data stream and outputs them to a separate stream if enabled.
     *
     * @param dataStream The input data stream of ObjectNode records.
     * @return A new DataStream containing valid records without errors.
     */
    public DataStream<ObjectNode> filterError(final DataStream<ObjectNode> dataStream) {
        // Process the data stream to identify meaningless addresses and ports.
        final SingleOutputStreamOperator<ObjectNode> operator = dataStream
                .process(new MeaninglessAddressProcessFunction())
                .name("MeaninglessRecords")
                .uid("meaningless-records");

        // If enabled, output the error records to the specified Kafka topic.
        if (config.get(FusionConfigs.ERROR_RECORDS_OUTPUT_ENABLE)) {

            final String topic = config.get(FusionConfigs.ERROR_SINK_KAFKA_TOPIC);

            LOG.info("Meaningless data output is enabled, data will be sent to: Topic [{}]", topic);

            final DataStream<ObjectNode> errorStream = operator
                    .getSideOutput(INVALID_OUTPUT_TAG);

            final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>(
                    topic,
                    new JsonNodeSerializationSchema(),
                    new FusionConfiguration(config)
                            .getProperties(FusionConfigs.ERROR_SINK_KAFKA_PROPERTIES_PREFIX)
            );

            errorStream.addSink(producer);
        }
        return operator;
    }
}

/**
 * The MeaninglessAddressProcessFunction is a custom ProcessFunction used to check for records
 * with invalid or meaningless addresses and ports. It separates them into the invalid output tag if necessary.
 */
class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> {

    private static final Logger LOG = LoggerFactory.getLogger(MeaninglessAddressProcessFunction.class);

    @Override
    public void processElement(ObjectNode obj,
                               ProcessFunction<ObjectNode, ObjectNode>.Context ctx,
                               Collector<ObjectNode> out) throws Exception {
        final Record record = new Record(obj);
        // Check for invalid or meaningless addresses and ports.
        boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) &&
                StringUtils.isNotBlank(record.getServerIp()) &&
                record.getClientPort() >= 0 &&
                record.getServerPort() >= 0;

        final SIPRecord sipRecord = new SIPRecord(obj);
        boolean cond2 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
                || isInternalIp(sipRecord.getOriginatorSdpConnectIp());
        boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
                || isInternalIp(sipRecord.getResponderSdpConnectIp());
        boolean cond4 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
                || isIPAddress(sipRecord.getResponderSdpConnectIp());
        boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
        boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
                isInternalIp(sipRecord.getResponderSdpConnectIp()) &&
                isInternalIp(sipRecord.getOriginatorSdpConnectIp());

        // Both client and server addresses in the data are valid.
        if (cond1 && (
                // The address in the SIP one-way stream is valid and not an internal network address.
                cond2 && cond3 && cond4 && cond5
                        // The coordinating addresses in the SIP double directional stream are valid
                        // and not internal network addresses.
                        || cond5 && cond6
                        || !cond5)) {
            out.collect(obj);
        } else {
            // Output invalid records to the invalid output tag.
            ctx.output(ErrorHandler.INVALID_OUTPUT_TAG, obj);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Meaningless record: {}", obj);
            }
        }
    }

    // ======================================================================================
    // ----------------------------------- private helper -----------------------------------

    private static boolean isIPAddress(final String ipaddr) {
        if (null == ipaddr) {
            return false;
        }
        return IPUtil.isIPAddress(ipaddr);
    }

    private static boolean isInternalIp(final String ipaddr) {
        if (!isIPAddress(ipaddr)) {
            return false;
        }
        return IPUtil.internalIp(ipaddr);
    }
}
@@ -0,0 +1,20 @@
package com.zdjizhi.flink.voip.formats;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

public class JsonNodeSerializationSchema implements SerializationSchema<ObjectNode> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(ObjectNode jsonNodes) {
        try {
            return mapper.writeValueAsBytes(jsonNodes);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}
54
src/main/java/com/zdjizhi/flink/voip/functions/Address.java
Normal file
@@ -0,0 +1,54 @@
package com.zdjizhi.flink.voip.functions;

import com.google.common.collect.Lists;
import com.zdjizhi.utils.IPUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.List;

/**
 * A POJO class representing an address with two IP and port pairs.
 *
 * @author chaoc
 * @since 1.0
 */
@Data
@AllArgsConstructor
public class Address {

    // The first IP address.
    private final String ip1;

    // The first port number.
    private final int port1;

    // The second IP address.
    private final String ip2;

    // The second port number.
    private final int port2;

    /**
     * Creates an Address instance based on two tuples containing (String, Int) representing address information.
     * The method sorts the addresses based on the port number, and if the ports are equal, it sorts them based on
     * the numeric value of the IP address.
     *
     * @param a1 The first address information as a tuple (IP address, port).
     * @param a2 The second address information as a tuple (IP address, port).
     * @return An Address instance with addresses sorted and reordered.
     */
    public static Address of(Tuple2<String, Integer> a1, Tuple2<String, Integer> a2) {
        List<Tuple2<String, Integer>> list = Lists.newArrayList(a1, a2);
        list.sort((a, b) -> {
            if (a.f1.equals(b.f1)) {
                return Long.compare(IPUtil.getIpDesimal(a.f0), IPUtil.getIpDesimal(b.f0));
            } else {
                return a.f1.compareTo(b.f1);
            }
        });
        return new Address(list.get(0).f0, list.get(0).f1, list.get(1).f0, list.get(1).f1);
    }
}
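The point of `Address.of` is that both directions of the same flow produce an identical key: the two endpoints are sorted by port, with ties broken on the numeric IP value. A stdlib-only sketch of that canonical ordering (the `EndpointOrder` class and `ipv4ToLong` helper are hypothetical stand-ins for the project's `IPUtil.getIpDesimal`; IPv4 only):

```java
/**
 * Sketch of Address.of's canonical endpoint ordering: sort the two (ip, port)
 * pairs by port, breaking ties on the numeric IPv4 value, so that both
 * directions of a flow map to the same key.
 */
public class EndpointOrder {

    /** Converts a dotted-quad IPv4 string to its numeric value. */
    static long ipv4ToLong(String ip) {
        long value = 0;
        for (String octet : ip.split("\\.")) {
            value = (value << 8) | Integer.parseInt(octet);
        }
        return value;
    }

    /** Returns the two endpoints as "ip:port|ip:port" in canonical order. */
    static String canonicalKey(String ip1, int port1, String ip2, int port2) {
        boolean swap = port1 > port2
                || (port1 == port2 && ipv4ToLong(ip1) > ipv4ToLong(ip2));
        return swap
                ? ip2 + ":" + port2 + "|" + ip1 + ":" + port1
                : ip1 + ":" + port1 + "|" + ip2 + ":" + port2;
    }
}
```

Without this normalization, a request keyed as (client, server) and its reply keyed as (server, client) would land on different keys and never pair.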
@@ -0,0 +1,32 @@
package com.zdjizhi.flink.voip.functions;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;

/**
 * An interface that provides utility functions for Flink functions.
 *
 * @author chaoc
 * @since 1.0
 */
public interface FunctionHelper extends RichFunction {

    /**
     * Gets the global configuration for the current Flink job.
     *
     * @return The global configuration as a Configuration object.
     */
    default Configuration getGlobalConfiguration() {
        final ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        return (Configuration) globalJobParameters;
    }

    default void registerNextFireTimestamp(TimerService timerService, long interval) {
        long current = timerService.currentProcessingTime();
        timerService.registerProcessingTimeTimer(current + interval);
    }
}
@@ -0,0 +1,28 @@
package com.zdjizhi.flink.voip.functions;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Setter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * A POJO pairing an ObjectNode with the number of times it has been paired.
 *
 * @author chaoc
 * @since 1.0
 */
@Data
@AllArgsConstructor
class ObjectNodeInfo {

    // The ObjectNode containing the data.
    private ObjectNode obj;

    // The number of times the object has been paired.
    @Setter
    private int times;

    public void incTimes() {
        this.times = this.times + 1;
    }
}
@@ -0,0 +1,31 @@
package com.zdjizhi.flink.voip.functions;

import com.zdjizhi.flink.voip.records.SIPRecord;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * A KeySelector implementation for extracting a composite key from an ObjectNode representing a SIP record.
 *
 * @author chaoc
 * @since 1.0
 */
public class SIPKeySelector implements KeySelector<ObjectNode, Tuple3<Integer, String, Address>> {

    /**
     * Extracts the composite key (VSysID, CallID, Address) from the given ObjectNode.
     *
     * @param obj The ObjectNode representing a SIP record.
     * @return A Tuple3 containing the extracted key (VSysID, CallID, Address).
     * @throws Exception Thrown if an error occurs during key extraction.
     */
    @Override
    public Tuple3<Integer, String, Address> getKey(ObjectNode obj) throws Exception {
        final SIPRecord record = new SIPRecord(obj);
        final Address address = Address.of(Tuple2.of(record.getClientIp(), record.getClientPort()),
                Tuple2.of(record.getServerIp(), record.getServerPort()));
        return Tuple3.of(record.getVSysID(), record.getCallID(), address);
    }
}
@@ -0,0 +1,82 @@
|
|||||||
|
package com.zdjizhi.flink.voip.functions;
|
||||||
|
|
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
 * SIP records are paired when they share the same addresses but have opposite stream directions.
 *
 * @author chaoc
 * @since 1.0
 */
public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>
        implements FunctionHelper {

    private transient Time fireInterval;

    private transient ValueState<ObjectNode> valueState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        final int minutes = getGlobalConfiguration().get(FusionConfigs.SIP_STATE_CLEAR_INTERVAL);
        fireInterval = Time.minutes(minutes);
        final ValueStateDescriptor<ObjectNode> descriptor =
                new ValueStateDescriptor<>("sip-state", ObjectNode.class);

        final StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(fireInterval)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .useProcessingTime()
                .cleanupFullSnapshot()
                .build();
        descriptor.enableTimeToLive(ttlConfig);
        valueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(ObjectNode value,
                               KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.Context ctx,
                               Collector<ObjectNode> out) throws Exception {

        final Record record = new Record(value);
        // When SIP is a one-way stream.
        if (StreamDir.DOUBLE != record.getStreamDir()) {
            // If a record for this address is already buffered in state and has the opposite
            // stream direction, merge the two SIP records, change the stream direction to
            // DOUBLE, and output the merged record.
            final ObjectNode obj = valueState.value();
            if (null != obj && new Record(obj).getStreamDir() != record.getStreamDir()) {
                record.merge(obj)
                        .setInt(Record.F_COMMON_STREAM_DIR, StreamDir.DOUBLE.getValue());
                out.collect(value);
                valueState.clear();
            } else {
                // Otherwise this address has not been seen yet; buffer the record in state.
                valueState.update(value);
            }
        } else {
            // If SIP is already a double stream, pairing isn't required; output the record directly.
            out.collect(value);
        }
        registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
    }

    @Override
    public void onTimer(long timestamp,
                        KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
                        Collector<ObjectNode> out) throws Exception {
        valueState.clear();
    }
}
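The pairing rule above — buffer a one-way SIP record until its opposite-direction twin arrives for the same address, then emit a merged DOUBLE record and clear the state — can be sketched outside Flink with a plain map standing in for the per-key `ValueState`. All names here (`PairingSketch`, `Sip`, `Dir`) are illustrative, not from the project:

```java
import java.util.HashMap;
import java.util.Map;

public class PairingSketch {
    // Stream directions, mirroring the diff's C2S(1) / S2C(2) / DOUBLE(3).
    enum Dir { C2S, S2C, DOUBLE }

    record Sip(String address, Dir dir) { }

    // One buffered record per address, standing in for Flink's keyed ValueState.
    private final Map<String, Sip> state = new HashMap<>();

    /** Returns a merged DOUBLE record when an opposite-direction twin is buffered, else null. */
    Sip process(Sip in) {
        if (in.dir() == Dir.DOUBLE) {
            return in;                       // Already bidirectional: emit directly.
        }
        Sip buffered = state.get(in.address());
        if (buffered != null && buffered.dir() != in.dir()) {
            state.remove(in.address());      // Pair found: clear state, emit merged record.
            return new Sip(in.address(), Dir.DOUBLE);
        }
        state.put(in.address(), in);         // First half of the pair: buffer, emit nothing.
        return null;
    }

    public static void main(String[] args) {
        PairingSketch f = new PairingSketch();
        System.out.println(f.process(new Sip("10.0.0.1:5060", Dir.C2S))); // null: buffered
        System.out.println(f.process(new Sip("10.0.0.1:5060", Dir.S2C))); // merged DOUBLE record
    }
}
```

The real operator also attaches a TTL and a cleanup timer to the buffered state, which the sketch omits.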
@@ -0,0 +1,45 @@
package com.zdjizhi.flink.voip.functions;

import com.zdjizhi.flink.voip.records.Record;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * A ProcessFunction that splits ObjectNode records based on their schema type field.
 * It routes SIP records to {@link #SIP_OUTPUT_TAG} and RTP records to {@link #RTP_OUTPUT_TAG}.
 *
 * @author chaoc
 * @since 1.0
 */
public class TypeSplitFunction extends ProcessFunction<ObjectNode, ObjectNode> {

    /**
     * OutputTag for SIP records.
     */
    public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
            new OutputTag<>("schema-type-sip", TypeInformation.of(ObjectNode.class));
    /**
     * OutputTag for RTP records.
     */
    public static final OutputTag<ObjectNode> RTP_OUTPUT_TAG =
            new OutputTag<>("schema-type-rtp", TypeInformation.of(ObjectNode.class));

    @Override
    public void processElement(ObjectNode obj,
                               ProcessFunction<ObjectNode, ObjectNode>.Context ctx,
                               Collector<ObjectNode> out) throws Exception {
        final Record record = new Record(obj);
        switch (record.getSchemaType()) {
            case RTP:
                ctx.output(RTP_OUTPUT_TAG, obj);
                break;
            case SIP:
                ctx.output(SIP_OUTPUT_TAG, obj);
                break;
            default:
                // Other schema types are dropped.
        }
    }
}
@@ -0,0 +1,40 @@
package com.zdjizhi.flink.voip.functions;

import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * A KeySelector implementation that extracts the composite key (VSysID, Address) from an ObjectNode.
 *
 * @author chaoc
 * @since 1.0
 */
public class VSysIDKeySelector implements KeySelector<ObjectNode, Tuple2<Integer, Address>> {

    /**
     * Extracts the composite key (VSysID, Address) from the given ObjectNode.
     * For SIP records the address is built from the SDP-negotiated media endpoints;
     * for other records it is built from the server and client endpoints.
     *
     * @param obj The ObjectNode representing a SIP or RTP record.
     * @return A Tuple2 containing the extracted key (VSysID, Address).
     * @throws Exception Thrown if an error occurs during key extraction.
     */
    @Override
    public Tuple2<Integer, Address> getKey(ObjectNode obj) throws Exception {
        final Record record = new Record(obj);
        final Address address;
        if (record.getSchemaType() == SchemaType.SIP) {
            final SIPRecord sipRecord = new SIPRecord(obj);
            address = Address.of(
                    Tuple2.of(sipRecord.getOriginatorSdpConnectIp(), sipRecord.getOriginatorSdpMediaPort()),
                    Tuple2.of(sipRecord.getResponderSdpConnectIp(), sipRecord.getResponderSdpMediaPort()));
        } else {
            address = Address.of(Tuple2.of(record.getServerIp(), record.getServerPort()),
                    Tuple2.of(record.getClientIp(), record.getClientPort()));
        }
        return Tuple2.of(record.getVSysID(), address);
    }
}
@@ -0,0 +1,157 @@
package com.zdjizhi.flink.voip.functions;

import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Iterator;
import java.util.Map;

/**
 * The `VoIPFusionFunction` is a key-based co-process function that performs VoIP fusion logic
 * for SIP and RTP records. It combines SIP and RTP records belonging to the same session
 * and emits fused VoIP records. The function uses keyed state to store and manage SIP and
 * RTP records, and timers to trigger regular clearing of that state.
 *
 * @author chaoc
 * @since 1.0
 */
public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>
        implements FunctionHelper {

    private static final int MAX_RTP_LINES = 2;
    private transient Time fireInterval;
    private transient ValueState<ObjectNodeInfo> sipState;
    private transient MapState<StreamDir, ObjectNode> rtpState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        final int minutes = getGlobalConfiguration().get(FusionConfigs.RTP_STATE_CLEAR_INTERVAL);
        fireInterval = Time.minutes(minutes);
        final RuntimeContext context = getRuntimeContext();
        final ValueStateDescriptor<ObjectNodeInfo> sipDescriptor =
                new ValueStateDescriptor<>("sip-state", ObjectNodeInfo.class);

        final StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(fireInterval)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .useProcessingTime()
                .cleanupFullSnapshot()
                .build();
        sipDescriptor.enableTimeToLive(ttlConfig);
        sipState = context.getState(sipDescriptor);

        final MapStateDescriptor<StreamDir, ObjectNode> rtpDescriptor =
                new MapStateDescriptor<>("rtp-state", StreamDir.class, ObjectNode.class);

        final StateTtlConfig rtpTtlConfig = StateTtlConfig
                .newBuilder(Time.minutes(minutes + 1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .useProcessingTime()
                .cleanupFullSnapshot()
                .build();
        rtpDescriptor.enableTimeToLive(rtpTtlConfig);
        rtpState = context.getMapState(rtpDescriptor);
    }

    // SIP
    @Override
    public void processElement2(ObjectNode sipObj,
                                KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
                                Collector<ObjectNode> out) throws Exception {
        final Iterator<Map.Entry<StreamDir, ObjectNode>> iterator = rtpState.iterator();
        if (rtpState.isEmpty()) {
            sipState.update(new ObjectNodeInfo(sipObj, 0));
        }
        while (iterator.hasNext()) {
            final Map.Entry<StreamDir, ObjectNode> entry = iterator.next();
            final ObjectNode rtpObj = entry.getValue();
            final Record rtpRecord = new Record(rtpObj);

            rtpRecord.merge(sipObj)
                    .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
            out.collect(rtpObj);

            switch (entry.getKey()) {
                case S2C:
                case C2S:
                    ObjectNodeInfo info = sipState.value();
                    if (info != null) {
                        info.incTimes();
                        if (info.getTimes() >= MAX_RTP_LINES) {
                            sipState.clear();
                        } else {
                            sipState.update(new ObjectNodeInfo(sipObj, info.getTimes()));
                        }
                    } else {
                        sipState.update(new ObjectNodeInfo(sipObj, 1));
                    }
                    break;
                default:
                    // DOUBLE: only one bidirectional RTP stream is expected per session,
                    // so the buffered SIP record is fully consumed and can be cleared.
                    sipState.clear();
            }
        }

        registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
    }

    // RTP
    @Override
    public void processElement1(ObjectNode rtpObj,
                                KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
                                Collector<ObjectNode> out) throws Exception {
        final Record rtpRecord = new Record(rtpObj);
        final ObjectNodeInfo info = sipState.value();

        final StreamDir streamDir = rtpRecord.getStreamDir();
        if (null != info) {

            rtpRecord.merge(info.getObj())
                    .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
            out.collect(rtpObj);

            switch (streamDir) {
                case C2S:
                case S2C:
                    info.incTimes();
                    if (info.getTimes() >= MAX_RTP_LINES) {
                        sipState.clear();
                    }
                    break;
                default:
                    // DOUBLE: a bidirectional RTP stream consumes the SIP record on its own.
                    sipState.clear();
            }

        } else {
            rtpState.put(streamDir, rtpObj);
        }

        registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
    }

    @Override
    public void onTimer(long timestamp,
                        KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
                        Collector<ObjectNode> out) throws Exception {
        for (ObjectNode obj : rtpState.values()) {
            out.collect(obj);
        }
        rtpState.clear();
        sipState.clear();
    }
}
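The counting rule in the fusion operator — a buffered SIP record serves at most MAX_RTP_LINES one-way RTP streams, or exactly one bidirectional stream, before its state is cleared — can be sketched with plain maps in place of Flink state. Names (`FusionCountSketch`, `SipInfo`, `onRtp`) are illustrative, not from the project:

```java
import java.util.HashMap;
import java.util.Map;

public class FusionCountSketch {
    static final int MAX_RTP_LINES = 2;   // mirrors the constant in the diff

    // Per-address buffered SIP payload plus how many one-way RTP streams it has served.
    record SipInfo(String payload, int times) { }

    private final Map<String, SipInfo> sipState = new HashMap<>();

    void bufferSip(String address, String payload) {
        sipState.put(address, new SipInfo(payload, 0));
    }

    /** Fuses an RTP stream with the buffered SIP record; returns the fused payload or null. */
    String onRtp(String address, boolean bidirectional) {
        SipInfo info = sipState.get(address);
        if (info == null) {
            return null;                              // No SIP yet; the real code buffers the RTP.
        }
        String fused = info.payload() + "+RTP";
        if (bidirectional || info.times() + 1 >= MAX_RTP_LINES) {
            sipState.remove(address);                 // SIP record fully consumed.
        } else {
            sipState.put(address, new SipInfo(info.payload(), info.times() + 1));
        }
        return fused;
    }

    public static void main(String[] args) {
        FusionCountSketch f = new FusionCountSketch();
        f.bufferSip("a", "SIP");
        System.out.println(f.onRtp("a", false)); // first one-way leg fuses
        System.out.println(f.onRtp("a", false)); // second leg fuses and clears the SIP state
        System.out.println(f.onRtp("a", false)); // null: state already cleared
    }
}
```

The real operator additionally buffers unmatched RTP streams per direction and flushes them on a timer, which the sketch leaves out.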
195 src/main/java/com/zdjizhi/flink/voip/records/Record.java Normal file
@@ -0,0 +1,195 @@
package com.zdjizhi.flink.voip.records;

import lombok.AllArgsConstructor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.IntNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;

/**
 * Record class represents a data record with various fields.
 * <p>
 * It provides getter and setter methods for accessing the fields of the data record.
 *
 * @author chaoc
 * @since 1.0
 */
@AllArgsConstructor
public class Record {

    /**
     * Field name: the vsys the data record belongs to.
     */
    public static final String F_COMMON_VSYS_ID = "common_vsys_id";
    /**
     * Field name: the schema type of the data record.
     */
    public static final String F_COMMON_SCHEMA_TYPE = "common_schema_type";
    /**
     * Field name: the stream direction of the data record.
     */
    public static final String F_COMMON_STREAM_DIR = "common_stream_dir";
    /**
     * Field name: the server IP address of the data record.
     */
    public static final String F_COMMON_SERVER_IP = "common_server_ip";
    /**
     * Field name: the server port of the data record.
     */
    public static final String F_COMMON_SERVER_PORT = "common_server_port";
    /**
     * Field name: the client IP address of the data record.
     */
    public static final String F_COMMON_CLIENT_IP = "common_client_ip";
    /**
     * Field name: the client port of the data record.
     */
    public static final String F_COMMON_CLIENT_PORT = "common_client_port";

    /**
     * ObjectNode data.
     */
    protected final ObjectNode obj;

    /**
     * Get the VSys ID from the data record.
     *
     * @return The VSys ID as an integer.
     */
    public int getVSysID() {
        return Record.getInt(obj, F_COMMON_VSYS_ID);
    }

    /**
     * Get the schema type from the data record.
     *
     * @return The schema type.
     */
    public final SchemaType getSchemaType() {
        return SchemaType.of(Record.getString(obj, F_COMMON_SCHEMA_TYPE));
    }

    /**
     * Get the stream direction from the data record.
     *
     * @return The stream direction.
     */
    public final StreamDir getStreamDir() {
        return StreamDir.of(Record.getInt(obj, F_COMMON_STREAM_DIR));
    }

    /**
     * Get the server IP address from the data record.
     *
     * @return The server IP address as a string.
     */
    public final String getServerIp() {
        return Record.getString(obj, F_COMMON_SERVER_IP);
    }

    /**
     * Get the server port from the data record.
     *
     * @return The server port as an integer.
     */
    public final int getServerPort() {
        return Record.getInt(obj, F_COMMON_SERVER_PORT);
    }

    /**
     * Get the client IP address from the data record.
     *
     * @return The client IP address as a string.
     */
    public final String getClientIp() {
        return Record.getString(obj, F_COMMON_CLIENT_IP);
    }

    /**
     * Get the client port from the data record.
     *
     * @return The client port as an integer.
     */
    public final int getClientPort() {
        return Record.getInt(obj, F_COMMON_CLIENT_PORT);
    }

    /**
     * Set an integer value to the specified field in the data record.
     *
     * @param name  The name of the field.
     * @param value The integer value to set.
     */
    public final void setInt(final String name, final int value) {
        obj.set(name, IntNode.valueOf(value));
    }

    /**
     * Set a string value to the specified field in the data record.
     *
     * @param name  The name of the field.
     * @param value The string value to set.
     */
    public final void setString(final String name, final String value) {
        obj.set(name, TextNode.valueOf(value));
    }

    /**
     * Merge the fields of another ObjectNode into the current data record.
     *
     * @param other The ObjectNode containing the fields to be merged.
     * @return This record.
     */
    public final Record merge(final ObjectNode other) {
        other.fields().forEachRemaining(entry -> obj.set(entry.getKey(), entry.getValue()));
        return this;
    }

    /**
     * Get an integer value from the specified field in the ObjectNode.
     *
     * @param obj          The ObjectNode to get the value from.
     * @param field        The name of the field.
     * @param defaultValue The default value to return if the field is not found or is not an integer.
     * @return The integer value from the field, or the default value if the field is not found or is not an integer.
     */
    public static int getInt(final ObjectNode obj, final String field, final int defaultValue) {
        final JsonNode node = obj.get(field);
        return node != null && node.isInt() ? node.asInt(defaultValue) : defaultValue;
    }

    /**
     * Get an integer value from the specified field in the ObjectNode.
     *
     * @param obj   The ObjectNode to get the value from.
     * @param field The name of the field.
     * @return The integer value from the field, or 0 if the field is not found or is not an integer.
     */
    public static int getInt(final ObjectNode obj, final String field) {
        return getInt(obj, field, 0);
    }

    /**
     * Get a string value from the specified field in the ObjectNode.
     *
     * @param obj          The ObjectNode to get the value from.
     * @param field        The name of the field.
     * @param defaultValue The default value to return if the field is not found or is not a string.
     * @return The string value from the field, or the default value if the field is not found or is not a string.
     */
    public static String getString(final ObjectNode obj, final String field, final String defaultValue) {
        final JsonNode node = obj.get(field);
        return node != null && node.isTextual() ? node.asText(defaultValue) : defaultValue;
    }

    /**
     * Get a string value from the specified field in the ObjectNode.
     *
     * @param obj   The ObjectNode to get the value from.
     * @param field The name of the field.
     * @return The string value from the field, or null if the field is not found or is not a string.
     */
    public static String getString(final ObjectNode obj, final String field) {
        return getString(obj, field, null);
    }
}
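`Record.getInt` / `Record.getString` return a caller-supplied default whenever the field is absent or carries the wrong JSON type, which keeps downstream code null-safe. The same defaulting logic can be sketched with a plain `Map` standing in for the Jackson `ObjectNode` (class and method names here are illustrative):

```java
import java.util.Map;

public class FieldAccessSketch {
    /** Returns the field as int, or defaultValue when the field is absent or not an Integer. */
    static int getInt(Map<String, Object> obj, String field, int defaultValue) {
        Object node = obj.get(field);
        return (node instanceof Integer i) ? i : defaultValue;
    }

    /** Returns the field as String, or defaultValue when the field is absent or not a String. */
    static String getString(Map<String, Object> obj, String field, String defaultValue) {
        Object node = obj.get(field);
        return (node instanceof String s) ? s : defaultValue;
    }

    public static void main(String[] args) {
        Map<String, Object> rec = Map.of("common_server_port", 5060, "common_server_ip", "10.0.0.1");
        System.out.println(getInt(rec, "common_server_port", 0));      // 5060
        System.out.println(getInt(rec, "common_client_port", 0));      // 0: field missing
        System.out.println(getString(rec, "common_server_port", "?")); // ?: wrong type
    }
}
```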
57 src/main/java/com/zdjizhi/flink/voip/records/SIPRecord.java Normal file
@@ -0,0 +1,57 @@
package com.zdjizhi.flink.voip.records;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * SIP (Session Initiation Protocol) data record class, used to parse and access SIP data records.
 *
 * @author chaoc
 * @since 1.0
 */
public class SIPRecord extends Record {

    /**
     * Field name: the session Call-ID of the SIP call.
     */
    public static final String F_CALL_ID = "sip_call_id";
    /**
     * Field name: the SDP-negotiated originator (caller) media IP of the SIP call.
     */
    public static final String F_ORIGINATOR_SDP_CONNECT_IP = "sip_originator_sdp_connect_ip";
    /**
     * Field name: the SDP-negotiated originator (caller) media port of the SIP call.
     */
    public static final String F_ORIGINATOR_SDP_MEDIA_PORT = "sip_originator_sdp_media_port";
    /**
     * Field name: the SDP-negotiated responder (callee) media IP of the SIP call.
     */
    public static final String F_RESPONDER_SDP_CONNECT_IP = "sip_responder_sdp_connect_ip";
    /**
     * Field name: the SDP-negotiated responder (callee) media port of the SIP call.
     */
    public static final String F_RESPONDER_SDP_MEDIA_PORT = "sip_responder_sdp_media_port";

    public SIPRecord(final ObjectNode obj) {
        super(obj);
    }

    public String getCallID() {
        return Record.getString(obj, F_CALL_ID);
    }

    public String getOriginatorSdpConnectIp() {
        return Record.getString(obj, F_ORIGINATOR_SDP_CONNECT_IP);
    }

    public int getOriginatorSdpMediaPort() {
        return Record.getInt(obj, F_ORIGINATOR_SDP_MEDIA_PORT);
    }

    public String getResponderSdpConnectIp() {
        return Record.getString(obj, F_RESPONDER_SDP_CONNECT_IP);
    }

    public int getResponderSdpMediaPort() {
        return Record.getInt(obj, F_RESPONDER_SDP_MEDIA_PORT);
    }
}
51 src/main/java/com/zdjizhi/flink/voip/records/SchemaType.java Normal file
@@ -0,0 +1,51 @@
package com.zdjizhi.flink.voip.records;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * The SchemaType enum represents different types of data schemas.
 *
 * @author chaoc
 * @since 1.0
 */
@AllArgsConstructor
@Getter
public enum SchemaType {

    /**
     * Represents the SIP schema type.
     */
    SIP("SIP"),

    /**
     * Represents the RTP schema type.
     */
    RTP("RTP"),

    /**
     * Represents the VoIP schema type.
     */
    VOIP("VoIP");

    /**
     * The string value of the SchemaType.
     */
    private final String value;

    /**
     * Get the SchemaType enum based on the provided string value.
     *
     * @param value The string value of the SchemaType to retrieve.
     * @return The corresponding SchemaType enum.
     * @throws IllegalArgumentException if the provided value does not match any known SchemaType.
     */
    public static SchemaType of(final String value) {
        for (SchemaType schemaType : values()) {
            if (schemaType.value.equalsIgnoreCase(value)) {
                return schemaType;
            }
        }
        throw new IllegalArgumentException("Unknown SchemaType value '" + value + "'.");
    }
}
51 src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java Normal file
@@ -0,0 +1,51 @@
package com.zdjizhi.flink.voip.records;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * The StreamDir enum represents different types of data stream directions.
 *
 * @author chaoc
 * @since 1.0
 */
@AllArgsConstructor
@Getter
public enum StreamDir {

    /**
     * Represents the Client-to-Server (C2S) stream direction.
     */
    C2S(1),

    /**
     * Represents the Server-to-Client (S2C) stream direction.
     */
    S2C(2),

    /**
     * Represents the bidirectional (double) stream direction.
     */
    DOUBLE(3);

    /**
     * The integer value of the StreamDir.
     */
    private final int value;

    /**
     * Get the StreamDir enum based on the provided integer value.
     *
     * @param value The integer value of the StreamDir to retrieve.
     * @return The corresponding StreamDir enum.
     * @throws IllegalArgumentException if the provided value does not match any known StreamDir.
     */
    public static StreamDir of(int value) {
        for (StreamDir streamDir : values()) {
            if (value == streamDir.value) {
                return streamDir;
            }
        }
        throw new IllegalArgumentException("Unknown StreamDir value '" + value + "'.");
    }
}
7 src/main/resources/application.properties Normal file
@@ -0,0 +1,7 @@
sink.kafka.topic=VOIP-CONVERSATION-RECORD
sink.kafka.props.bootstrap.servers=localhost:9292


source.kafka.topic=VOIP-RECORD
source.kafka.props.bootstrap.servers=localhost:9292
source.kafka.props.group.id=flink-voip-fusion
25 src/main/resources/log4j2.properties Normal file
@@ -0,0 +1,25 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

rootLogger.level = WARN
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
@@ -0,0 +1,67 @@
package com.zdjizhi.flink.voip.conf;

import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class FusionConfigurationTest {

    private FusionConfiguration fusionConfiguration;

    @BeforeEach
    public void setUp() {
        final Configuration config = new Configuration();
        config.setString("prefix_key1", "value1");
        config.setString("prefix_key2", "value2");
        config.setString("other_key", "other_value");

        fusionConfiguration = new FusionConfiguration(config);
    }

    @Test
    public void testGetPropertiesWithValidPrefix() {
        String prefix = "prefix_";
        Properties properties = fusionConfiguration.getProperties(prefix);

        assertEquals(2, properties.size());
        assertEquals("value1", properties.getProperty("key1"));
        assertEquals("value2", properties.getProperty("key2"));
    }

    @Test
    public void testGetPropertiesWithInvalidPrefix() {
        String prefix = "invalid_";
        Properties properties = fusionConfiguration.getProperties(prefix);

        assertTrue(properties.isEmpty());
    }

    @Test
    public void testGetPropertiesWithEmptyPrefix() {
        String prefix = "";
        Properties properties = fusionConfiguration.getProperties(prefix);

        assertEquals(3, properties.size());
        assertEquals("value1", properties.getProperty("prefix_key1"));
        assertEquals("value2", properties.getProperty("prefix_key2"));
        assertEquals("other_value", properties.getProperty("other_key"));
    }

    @Test
    public void testGetPropertiesWithNullPrefix() {
        // Null prefix should be treated as an empty prefix
        String prefix = null;
        Properties properties = fusionConfiguration.getProperties(prefix);

        assertEquals(3, properties.size());
        assertEquals("value1", properties.getProperty("prefix_key1"));
        assertEquals("value2", properties.getProperty("prefix_key2"));
        assertEquals("other_value", properties.getProperty("other_key"));
    }
}
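These tests pin down the contract of `getProperties`: a non-empty prefix selects matching keys and strips the prefix, while an empty or null prefix returns every key unchanged. Since `FusionConfiguration` itself is not part of this chunk of the diff, here is a minimal sketch of logic that would satisfy the tests (names are illustrative):

```java
import java.util.Map;
import java.util.Properties;

public class PrefixProperties {
    /** Copies entries whose key starts with prefix, stripping the prefix; null behaves like "". */
    static Properties byPrefix(Map<String, String> all, String prefix) {
        final String p = (prefix == null) ? "" : prefix;
        final Properties out = new Properties();
        all.forEach((k, v) -> {
            if (k.startsWith(p)) {
                out.setProperty(k.substring(p.length()), v);
            }
        });
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> cfg = Map.of(
                "prefix_key1", "value1",
                "prefix_key2", "value2",
                "other_key", "other_value");
        System.out.println(byPrefix(cfg, "prefix_").size()); // 2: key1 and key2, prefix stripped
        System.out.println(byPrefix(cfg, null).size());      // 3: null behaves like ""
    }
}
```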
98
src/test/java/com/zdjizhi/flink/voip/data/Generator.java
Normal file
98
src/test/java/com/zdjizhi/flink/voip/data/Generator.java
Normal file
@@ -0,0 +1,98 @@
package com.zdjizhi.flink.voip.data;

import com.github.javafaker.Faker;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;

/**
 * The abstract class Generator&lt;T&gt; serves as the base class for implementing record generators.
 * It provides common functionality for generating random data and controlling the generation ratio.
 *
 * @param <T> The type of records to be generated.
 * @author chaoc
 * @since 1.0
 */
public abstract class Generator<T> {

    protected final ThreadLocalRandom random;

    /**
     * The generation ratio: on average, {@code ratio} out of every 100 generated records
     * are correlated with the previously generated record.
     */
    protected final int ratio;

    private final Faker faker;

    protected final AtomicReference<T> state;

    protected final ObjectMapper mapper;

    /**
     * Creates a new Generator with the given ratio.
     *
     * @param ratio The generation ratio: on average, {@code ratio} out of every 100 generated
     *              records are correlated with the previously generated record.
     */
    public Generator(final int ratio) {
        this.ratio = ratio;
        this.faker = new Faker();
        this.random = ThreadLocalRandom.current();
        this.state = new AtomicReference<>();
        this.mapper = new ObjectMapper();
    }

    /**
     * Generates the next record based on the specified ratio and the current state.
     * It randomly selects whether to generate a new record or return the last generated record.
     *
     * @return The next generated record of type T.
     */
    public abstract T next();

    /**
     * Generates a new record of type T.
     *
     * @return The newly generated record of type T.
     */
    protected abstract T generate();

    /**
     * Performs post-processing on the generated record of type T.
     * Subclasses can override this method to modify the generated record before returning it.
     *
     * @param v The generated record of type T.
     * @return The post-processed record of type T.
     */
    protected abstract T afterState(T v);

    /**
     * Generates a random IP address (either IPv4 or IPv6).
     *
     * @return A randomly generated IP address as a string.
     */
    public final String nextIp() {
        if (random.nextBoolean()) {
            return faker.internet().ipV4Address();
        }
        return faker.internet().ipV6Address();
    }

    /**
     * Generates a random ID number.
     *
     * @return A randomly generated ID number as a string.
     */
    public final String nextId() {
        return faker.idNumber().valid();
    }

    /**
     * Generates a random port number within the range of 0 to 65534 (inclusive),
     * matching {@code random.nextInt(65535)}.
     *
     * @return A randomly generated port number as an integer.
     */
    public final int nextPort() {
        return random.nextInt(65535);
    }
}
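The ratio-driven state handoff that `Generator` subclasses implement (see `SIPGenerator.next()` below) can be sketched without the Faker and Jackson dependencies. This is a minimal illustration of the pattern, not the project's code; the class and value types here are hypothetical.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of the ratio-based generation pattern: with probability
// ratio/100 (and if a previous value is stored), the stored value is consumed
// and returned; otherwise a fresh value is generated and remembered in `state`.
public class RatioSketch {
    static final AtomicReference<Integer> state = new AtomicReference<>();
    static final int ratio = 40; // roughly 40% of calls consume the stored value

    static int generate() {
        return ThreadLocalRandom.current().nextInt(1000);
    }

    static int next() {
        if (ThreadLocalRandom.current().nextInt(100) < ratio && state.get() != null) {
            Integer t = state.get();
            state.set(null); // consume the stored value; afterState() would post-process here
            return t;
        }
        // Generate a fresh value and keep it so a later call can pair with it.
        return state.updateAndGet(v -> generate());
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            System.out.println(next());
        }
    }
}
```

The `AtomicReference` lets a sibling generator (as `RTPGenerator` does with `sipGenerator.state`) peek at the last value without consuming it.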
111 src/test/java/com/zdjizhi/flink/voip/data/RTPGenerator.java Normal file
@@ -0,0 +1,111 @@
package com.zdjizhi.flink.voip.data;

import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * RTPGenerator extends Generator&lt;ObjectNode&gt; and is responsible for generating RTP records.
 * It generates random RTP records with specific properties.
 *
 * @author chaoc
 * @since 1.0
 */
public class RTPGenerator extends Generator<ObjectNode> {

    private final SIPGenerator sipGenerator;

    public RTPGenerator(int ratio, SIPGenerator sipGenerator) {
        super(ratio);
        this.sipGenerator = sipGenerator;
    }

    @Override
    public ObjectNode next() {
        int i = random.nextInt(100);
        if (i < ratio) {
            final ObjectNode node = sipGenerator.state.get();
            if (null != node) {
                final ObjectNode obj = generate();
                obj.set(Record.F_COMMON_CLIENT_IP, node.get(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP));
                obj.set(Record.F_COMMON_CLIENT_PORT, node.get(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT));
                obj.set(Record.F_COMMON_SERVER_IP, node.get(SIPRecord.F_RESPONDER_SDP_CONNECT_IP));
                obj.set(Record.F_COMMON_SERVER_PORT, node.get(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT));
                return obj;
            }
        }
        return generate();
    }

    @Override
    protected ObjectNode generate() {
        final String json = "{\n" +
                " \"common_address_list\": \"42924-36682-172.17.201.16-172.17.200.50\",\n" +
                " \"common_address_type\": 4,\n" +
                " \"common_app_full_path\": \"rtp\",\n" +
                " \"common_app_label\": \"rtp\",\n" +
                " \"common_c2s_byte_num\": 0,\n" +
                " \"common_c2s_pkt_num\": 0,\n" +
                " \"common_client_ip\": \"172.17.201.16\",\n" +
                " \"common_client_port\": 42924,\n" +
                " \"common_con_duration_ms\": 1086,\n" +
                " \"common_device_id\": \"unknown\",\n" +
                " \"common_device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgxSfcOs\\\"}]}\",\n" +
                " \"common_direction\": 73,\n" +
                " \"common_end_time\": 1689295970,\n" +
                " \"common_flags\": 16401,\n" +
                " \"common_flags_identify_info\": \"{\\\"Asymmetric\\\":5,\\\"Server is Local\\\":1,\\\"S2C\\\":1}\",\n" +
                " \"common_l4_protocol\": \"IPv4_UDP\",\n" +
                " \"common_out_dest_mac\": \"02:fc:08:dc:92:d7\",\n" +
                " \"common_out_src_mac\": \"02:fc:08:dc:91:c3\",\n" +
                " \"common_protocol_label\": \"ETHERNET.IPv4.UDP\",\n" +
                " \"common_s2c_byte_num\": 7570,\n" +
                " \"common_s2c_pkt_num\": 5,\n" +
                " \"common_schema_type\": \"RTP\",\n" +
                " \"common_server_ip\": \"172.17.200.50\",\n" +
                " \"common_server_port\": 36682,\n" +
                " \"common_sessions\": 1,\n" +
                " \"common_sled_ip\": \"192.168.42.54\",\n" +
                " \"common_start_time\": 1689294629,\n" +
                " \"common_stream_dir\": 3,\n" +
                " \"common_stream_trace_id\": \"290484792956466709\",\n" +
                " \"common_t_vsys_id\": 24,\n" +
                " \"common_vsys_id\": 24,\n" +
                " \"raw_log_status\": \"CLOSE\",\n" +
                " \"rtp_payload_type_c2s\": 0,\n" +
                " \"rtp_payload_type_s2c\": 0,\n" +
                " \"rtp_pcap_path\": \"http://192.168.44.67:9098/hos/rtp_hos_bucket/rtp_172.17.200.50_172.17.201.16_27988_55806_1689294629.pcap\"\n" +
                "}";

        ObjectNode obj;
        try {
            obj = mapper.readValue(json, ObjectNode.class);
        } catch (Exception e) {
            obj = mapper.createObjectNode();
        }
        final SIPRecord record = new SIPRecord(obj);

        record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
        record.setString(SIPRecord.F_CALL_ID, nextId());
        record.setInt(SIPRecord.F_COMMON_STREAM_DIR, random.nextInt(3) + 1);

        record.setString(Record.F_COMMON_SERVER_IP, nextIp());
        record.setInt(Record.F_COMMON_SERVER_PORT, nextPort());
        record.setString(Record.F_COMMON_CLIENT_IP, nextIp());
        record.setInt(Record.F_COMMON_CLIENT_PORT, nextPort());
        return obj;
    }

    @Override
    protected ObjectNode afterState(ObjectNode v) {
        final Record record = new Record(v);
        switch (record.getStreamDir()) {
            case DOUBLE:
                record.setInt(Record.F_COMMON_STREAM_DIR, random.nextInt(2) + 1);
                break;
            case S2C:
            default:
        }
        return v;
    }
}
138 src/test/java/com/zdjizhi/flink/voip/data/SIPGenerator.java Normal file
@@ -0,0 +1,138 @@
package com.zdjizhi.flink.voip.data;

import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * SIPGenerator extends Generator&lt;ObjectNode&gt; and is responsible for generating SIP (Session Initiation Protocol) records.
 * It generates random SIP records with specific properties such as call ID, stream direction, server IP, server port,
 * client IP, and client port, as well as SDP information for both originator and responder.
 *
 * @author chaoc
 * @since 1.0
 */
public class SIPGenerator extends Generator<ObjectNode> {

    public SIPGenerator(final int ratio) {
        super(ratio);
    }

    /**
     * Creates a new SIPGenerator with the default ratio of 40.
     */
    public SIPGenerator() {
        this(40);
    }

    @Override
    public final ObjectNode next() {
        int i = random.nextInt(100);
        if (i < ratio && state.get() != null) {
            ObjectNode t = afterState(state.get());
            state.set(null);
            return t;
        } else {
            return state.updateAndGet(t -> generate());
        }
    }

    @Override
    protected ObjectNode generate() {
        final String json = "{\n" +
                " \"common_schema_type\": \"SIP\",\n" +
                " \"common_sessions\": 1,\n" +
                " \"sip_call_id\": \"3257b5c0f3d0266@spirent.com\",\n" +
                " \"sip_originator_description\": \"<sip:caller@spirent.com>\",\n" +
                " \"sip_responder_description\": \"sip:callee@spirent.com\",\n" +
                " \"sip_originator_sdp_content\": \"v=0\\r\\no=SIP-UA 3395000 3397200 IN IP4 172.17.200.50\\r\\ns=SIP Call\\r\\nc=IN IP4 172.17.200.50\\r\\nt=0 0\\r\\nm=audio 36682 RTP/AVP 0\\r\\na=rtpmap:0 PCMU/8000\\r\\n\",\n" +
                " \"sip_originator_sdp_connect_ip\": \"172.17.200.50\",\n" +
                " \"sip_originator_sdp_media_port\": 36682,\n" +
                " \"sip_originator_sdp_media_type\": \"0 PCMU/8000\",\n" +
                " \"common_first_ttl\": 128,\n" +
                " \"common_c2s_ipfrag_num\": 0,\n" +
                " \"common_s2c_ipfrag_num\": 0,\n" +
                " \"common_c2s_tcp_unorder_num\": 0,\n" +
                " \"common_s2c_tcp_unorder_num\": 0,\n" +
                " \"common_c2s_tcp_lostlen\": 0,\n" +
                " \"common_s2c_tcp_lostlen\": 0,\n" +
                " \"common_c2s_pkt_retrans\": 2,\n" +
                " \"common_s2c_pkt_retrans\": 0,\n" +
                " \"common_c2s_byte_retrans\": 1100,\n" +
                " \"common_s2c_byte_retrans\": 0,\n" +
                " \"common_direction\": 69,\n" +
                " \"common_app_full_path\": \"sip\",\n" +
                " \"common_app_label\": \"sip\",\n" +
                " \"common_tcp_client_isn\": 3004427198,\n" +
                " \"common_server_ip\": \"172.17.201.16\",\n" +
                " \"common_client_ip\": \"172.17.200.49\",\n" +
                " \"common_server_port\": 5060,\n" +
                " \"common_client_port\": 6948,\n" +
                " \"common_stream_dir\": 1,\n" +
                " \"common_address_type\": 4,\n" +
                " \"common_address_list\": \"6948-5060-172.17.200.50-172.17.201.16\",\n" +
                " \"common_start_time\": 1689295655,\n" +
                " \"common_end_time\": 1689295670,\n" +
                " \"common_con_duration_ms\": 41467,\n" +
                " \"common_s2c_pkt_num\": 0,\n" +
                " \"common_s2c_byte_num\": 0,\n" +
                " \"common_c2s_pkt_num\": 6,\n" +
                " \"common_c2s_byte_num\": 1834,\n" +
                " \"common_establish_latency_ms\": 249,\n" +
                " \"common_out_src_mac\": \"02:fc:08:dc:91:c3\",\n" +
                " \"common_out_dest_mac\": \"02:fc:08:dc:92:d7\",\n" +
                " \"common_flags\": 8201,\n" +
                " \"common_flags_identify_info\": \"{\\\"Asymmetric\\\":6,\\\"Client is Local\\\":1,\\\"C2S\\\":1}\",\n" +
                " \"common_protocol_label\": \"ETHERNET.IPv4.TCP\",\n" +
                " \"common_stream_trace_id\": \"290502385134123507\",\n" +
                " \"common_l4_protocol\": \"IPv4_TCP\",\n" +
                " \"common_sled_ip\": \"192.168.42.54\",\n" +
                " \"common_device_id\": \"unknown\",\n" +
                " \"common_device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgxSfcOs\\\"}]}\",\n" +
                " \"common_t_vsys_id\": 24,\n" +
                " \"common_vsys_id\": 24\n" +
                "}";

        ObjectNode obj;
        try {
            obj = mapper.readValue(json, ObjectNode.class);
        } catch (Exception e) {
            obj = mapper.createObjectNode();
        }

        final SIPRecord record = new SIPRecord(obj);

        record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.SIP.getValue());

        record.setString(SIPRecord.F_CALL_ID, nextId() + "@spirent.com");
        record.setInt(SIPRecord.F_COMMON_STREAM_DIR, (random.nextBoolean() ? StreamDir.S2C : StreamDir.C2S).getValue());

        record.setString(Record.F_COMMON_SERVER_IP, nextIp());
        record.setInt(Record.F_COMMON_SERVER_PORT, nextPort());
        record.setString(Record.F_COMMON_CLIENT_IP, nextIp());
        record.setInt(Record.F_COMMON_CLIENT_PORT, nextPort());

        record.setString(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP, nextIp());
        record.setInt(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT, nextPort());
        record.setString(SIPRecord.F_RESPONDER_SDP_CONNECT_IP, nextIp());
        record.setInt(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT, nextPort());
        return obj;
    }

    @Override
    protected ObjectNode afterState(ObjectNode v) {
        final Record record = new Record(v);
        switch (record.getStreamDir()) {
            case C2S:
                record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.S2C.getValue());
                break;
            case S2C:
                record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.C2S.getValue());
                break;
            default:
        }
        return v;
    }
}
@@ -0,0 +1,48 @@
package com.zdjizhi.flink.voip.functions;

import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class AddressTest {

    @Test
    public void testAddressOfWithDifferentPortNumbers() {
        Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.10", 8080);
        Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.20", 9090);

        Address address = Address.of(a1, a2);

        assertEquals("192.168.1.10", address.getIp1());
        assertEquals(8080, address.getPort1());
        assertEquals("192.168.1.20", address.getIp2());
        assertEquals(9090, address.getPort2());
    }

    @Test
    public void testAddressOfWithSamePortNumber() {
        Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.10", 8080);
        Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.20", 8080);

        Address address = Address.of(a1, a2);

        assertEquals("192.168.1.10", address.getIp1());
        assertEquals(8080, address.getPort1());
        assertEquals("192.168.1.20", address.getIp2());
        assertEquals(8080, address.getPort2());
    }

    @Test
    public void testAddressOfWithInvertedOrder() {
        Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.20", 8080);
        Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.10", 9090);

        Address address = Address.of(a1, a2);

        assertEquals("192.168.1.20", address.getIp1());
        assertEquals(8080, address.getPort1());
        assertEquals("192.168.1.10", address.getIp2());
        assertEquals(9090, address.getPort2());
    }
}
@@ -0,0 +1,104 @@
package com.zdjizhi.flink.voip.functions;

import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.data.SIPGenerator;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.Set;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.*;

public class SIPPairingFunctionTest {

    private static final int INTERVAL_MINUTES = 5;
    private static KeyedOneInputStreamOperatorTestHarness<Integer, ObjectNode, ObjectNode> testHarness;

    @BeforeAll
    static void setUp() throws Exception {
        final SIPPairingFunction func = new SIPPairingFunction();
        final KeyedProcessOperator<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode> operator =
                new KeyedProcessOperator<>(func);
        final TypeInformation<Integer> type = TypeInformation.of(Integer.class);
        final KeySelector<ObjectNode, Integer> keySelector = jsonNodes -> 0;
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, type);

        final Configuration configuration = new Configuration();
        configuration.set(FusionConfigs.SIP_STATE_CLEAR_INTERVAL, INTERVAL_MINUTES);
        testHarness.getExecutionConfig().setGlobalJobParameters(configuration);
        testHarness.open();
    }

    @Test
    public void testProperlyPairSIPRecords() throws Exception {
        final SIPGenerator sipGenerator = new SIPGenerator();
        long current = System.currentTimeMillis();

        final ObjectNode obj = sipGenerator.next();
        final Record record = new Record(obj);
        record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.DOUBLE.getValue());

        final StreamRecord<ObjectNode> streamRecord = new StreamRecord<>(obj, current);

        testHarness.processElement(streamRecord);
        assertTrue(testHarness.getOutput().contains(streamRecord));

        long interval = Time.minutes(INTERVAL_MINUTES).toMilliseconds();
        long nextFireTime = (testHarness.getProcessingTime() / interval + 1) * interval;
        assertTrue(testHarness.getProcessingTimeService().getActiveTimerTimestamps().contains(nextFireTime));

        final ObjectNode obj1 = obj.deepCopy();
        final Record record1 = new Record(obj1);
        record1.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.S2C.getValue());

        final ObjectNode obj2 = obj.deepCopy();
        final Record record2 = new Record(obj2);
        record2.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.C2S.getValue());

        final MapStateDescriptor<Address, ObjectNodeInfo> descriptor =
                new MapStateDescriptor<>(
                        "sip-state",
                        TypeInformation.of(Address.class),
                        TypeInformation.of(ObjectNodeInfo.class)
                );

        testHarness.processElement(obj1, current);
        final MapState<Address, ObjectNodeInfo> mapState = testHarness.getOperator()
                .getKeyedStateStore().getMapState(descriptor);
        final Set<ObjectNode> objectNodes1 = Lists.newArrayList(mapState.values()).stream()
                .map(ObjectNodeInfo::getObj)
                .collect(Collectors.toSet());
        assertTrue(objectNodes1.contains(obj1));

        testHarness.processElement(obj2, current);
        final Set<ObjectNode> objectNodes2 = Lists.newArrayList(mapState.values()).stream()
                .map(ObjectNodeInfo::getObj)
                .collect(Collectors.toSet());
        assertFalse(objectNodes2.contains(obj2));
        assertTrue(testHarness.getOutput().contains(new StreamRecord<>(obj2, current)));
        assertEquals(2, testHarness.getOutput().size());
    }

    @AfterAll
    static void close() throws Exception {
        testHarness.close();
    }
}
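The test above asserts that the pairing function registers a processing-time timer aligned to the next interval boundary, computed as `(processingTime / interval + 1) * interval`. That rounding, sketched standalone below (names here are illustrative), yields the smallest multiple of `interval` strictly greater than the current time.

```java
// Sketch of the interval-aligned timer computation asserted in the test:
// integer division floors to the current interval index, +1 moves to the
// next boundary, and multiplying back gives the firing timestamp.
public class TimerAlign {
    public static long nextFireTime(long nowMillis, long intervalMillis) {
        return (nowMillis / intervalMillis + 1) * intervalMillis;
    }

    public static void main(String[] args) {
        long interval = 5 * 60 * 1000L; // 5 minutes, as in the test
        System.out.println(nextFireTime(700_000L, interval)); // prints "900000"
    }
}
```

Aligning all timers to shared boundaries this way bounds how many distinct timers the operator keeps, since records arriving within the same interval reuse one firing time.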
70 src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java Normal file
@@ -0,0 +1,70 @@
package com.zdjizhi.flink.voip.records;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.IntNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class RecordTest {

    private static ObjectMapper mapper;

    @BeforeAll
    static void setUp() {
        mapper = new ObjectMapper();
    }

    @Test
    void testGetVSysID() {
        final ObjectNode obj = mapper.createObjectNode();
        final Record record = new Record(obj);
        record.setInt(Record.F_COMMON_VSYS_ID, 42);
        assertEquals(42, record.getVSysID());

        obj.set(Record.F_COMMON_VSYS_ID, IntNode.valueOf(40));
        assertEquals(40, record.getVSysID());
    }

    @Test
    void testGetSchemaType() {
        final ObjectNode obj = mapper.createObjectNode();
        final Record record = new Record(obj);
        record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
        assertEquals(SchemaType.RTP.getValue(), record.getSchemaType());

        obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue()));
        assertEquals(SchemaType.VOIP.getValue(), record.getSchemaType());
    }

    @Test
    void testSetInt() {
        final ObjectNode obj = mapper.createObjectNode();
        final Record record = new Record(obj);
        record.setInt("intField", 123);
        assertEquals(123, obj.get("intField").intValue());
    }

    @Test
    void testSetString() {
        final ObjectNode obj = mapper.createObjectNode();
        final Record record = new Record(obj);
        record.setString("stringField", "testValue");
        assertEquals("testValue", obj.get("stringField").textValue());
    }

    @Test
    void testMerge() {
        final ObjectNode obj = mapper.createObjectNode();
        obj.set("field1", IntNode.valueOf(1));
        ObjectNode other = mapper.createObjectNode();
        other.set("field2", TextNode.valueOf("value2"));
        Record record = new Record(obj);
        record.merge(other);
        assertEquals(1, obj.get("field1").intValue());
        assertEquals("value2", obj.get("field2").textValue());
    }
}
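`testMerge()` only establishes that fields from the other record are copied into this one; it does not cover overlapping keys. A minimal sketch of that behavior with plain `java.util.Map` (the name `MergeSketch` is hypothetical, and the overwrite-on-conflict choice here is an assumption the test does not confirm):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the merge behavior exercised by testMerge(): every
// field of `other` is copied into `self`. Conflict handling is assumed
// to overwrite; the test only checks disjoint keys.
public class MergeSketch {
    public static Map<String, Object> merge(Map<String, Object> self, Map<String, Object> other) {
        self.putAll(other);
        return self;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("field1", 1);
        Map<String, Object> other = new HashMap<>();
        other.put("field2", "value2");
        merge(record, other);
        System.out.println(record.get("field1")); // prints "1"
        System.out.println(record.get("field2")); // prints "value2"
    }
}
```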