85 Commits

Author SHA1 Message Date
梁超
45891bc734 Merge branch 'feature/internal-ip-config' into 'main'
feat: add config 'DETERMINE_INTRANET_IP_BE_ABNORMAL'

See merge request galaxy/tsg_olap/sip-rtp-correlation!13
2023-10-13 06:43:38 +00:00
chaoc
19e0bce58f chore: update version 2023-10-13 14:43:08 +08:00
chaoc
10ce6cfa07 feat: add config 'DETERMINE_INTRANET_IP_BE_ABNORMAL' 2023-10-13 14:38:23 +08:00
梁超
9d5d99974b Merge branch 'release/1.0' into 'main'
release: 1.0

See merge request galaxy/tsg_olap/sip-rtp-correlation!12
2023-10-12 06:52:09 +00:00
梁超
970977ba3c Merge branch 'hotfix/output-sip' into 'release/1.0'
hotfix: add sip record output

See merge request galaxy/tsg_olap/sip-rtp-correlation!11
2023-10-12 02:44:32 +00:00
chaoc
732d9f5aa9 style: update version 2023-10-12 10:42:48 +08:00
chaoc
96fa19aea1 fix: use event time timer 2023-10-12 10:36:44 +08:00
chaoc
1b7c33d078 fix: output sip record 2023-10-12 10:36:29 +08:00
liang chao
b9463f07ac Merge branch 'release/1.0' into 'main'
merge: 1.0-rc3

See merge request galaxy/tsg_olap/sip-rtp-correlation!10
2023-08-28 03:30:06 +00:00
liang chao
35e2807a91 Merge branch 'hotfix/rename' into 'release/1.0'
style: rename job name

See merge request galaxy/tsg_olap/sip-rtp-correlation!9
2023-08-28 03:29:10 +00:00
chaoc
2275f349d1 Merge remote-tracking branch 'origin/release/1.0' into hotfix/rename 2023-08-28 11:27:23 +08:00
chaoc
1fedfbe4b8 style: add plugin reproducible 2023-08-28 11:26:25 +08:00
chaoc
b2f15b3919 style: modify job name 2023-08-28 11:11:02 +08:00
liang chao
8fc8cc7c2d Merge branch 'hotfix/null-point-err' into 'release/1.0'
style: update versions

See merge request galaxy/tsg_olap/sip-rtp-correlation!8
2023-08-21 09:09:25 +00:00
chaoc
98bb843159 style: update versions 2023-08-21 17:08:51 +08:00
liang chao
edb044596e Merge branch 'hotfix/null-point-err' into 'release/1.0'
docs: add hotfix changelog

See merge request galaxy/tsg_olap/sip-rtp-correlation!7
2023-08-21 09:06:49 +00:00
chaoc
557156af79 docs: add hotfix changelog 2023-08-21 17:06:22 +08:00
liang chao
32a811fb1c Merge branch 'hotfix/null-point-err' into 'release/1.0'
fix(utils): fix null point err

See merge request galaxy/tsg_olap/sip-rtp-correlation!6
2023-08-21 08:59:11 +00:00
chaoc
36cbaebf0c fix(utils): fix null point err 2023-08-21 16:57:13 +08:00
liang chao
da572f4bd0 Merge branch 'release/1.0' into 'main'
merge: 1.0.rc1

See merge request galaxy/tsg_olap/sip-rtp-correlation!5
2023-08-16 03:05:22 +00:00
chaoc
68e91c9ae5 style: update version 2023-08-16 10:45:46 +08:00
chaoc
69004f5c02 test: add data template 2023-08-15 09:53:50 +08:00
chaoc
9804b0cc52 doc: update readme for run cmd 2023-08-14 14:46:04 +08:00
chaoc
fa619d2595 doc: update readme 2023-08-14 11:42:59 +08:00
chaoc
a44d434952 doc: update readme for add config desc 2023-08-14 11:12:45 +08:00
chaoc
736bc40202 style: modify rtp clear interval default value 2023-08-14 11:12:01 +08:00
chaoc
6a2e5bacb9 perf: code optimization 2023-08-14 10:51:42 +08:00
chaoc
da69688270 style: update default parallelism 2023-08-14 10:50:41 +08:00
chaoc
a4e92c03e7 style: use enum type 2023-08-14 10:50:02 +08:00
chaoc
b207f8fb8e pref: support multi-parallel source 2023-08-14 10:49:35 +08:00
chaoc
5f16a98402 style: remove unused code 2023-08-14 10:48:15 +08:00
chaoc
43282c7163 style: use enum type 2023-08-14 10:47:51 +08:00
liang chao
77cdd73f02 Merge branch 'hotfix/no-collect-expire-data' into 'main'
fix: cannot collect data due to expiration

See merge request galaxy/tsg_olap/sip-rtp-correlation!4
2023-08-11 06:18:46 +00:00
chaoc
b38f128f32 style: optimize imports 2023-08-11 14:16:11 +08:00
chaoc
2c64391737 perf(functions): fix the issue of being unable to collect expired data 2023-08-11 14:14:58 +08:00
liang chao
5481a7b9ee Merge branch 'feature/address-keyby-impl' into 'main'
feature: develop job using java

See merge request galaxy/tsg_olap/sip-rtp-correlation!3
2023-08-10 09:39:51 +00:00
chaoc
7e5a524eb7 style: modify project name 2023-08-10 17:39:22 +08:00
chaoc
255d6d9fcb perf: change the timing of timer register 2023-08-10 15:33:20 +08:00
chaoc
4c90c27921 style: delete unused code 2023-08-10 15:32:23 +08:00
chaoc
f4b8ba69d0 fix: error ttl conf 2023-08-10 15:31:45 +08:00
chaoc
0f6fdda7f5 pref: adjust the timer interval 2023-08-10 15:30:30 +08:00
chaoc
44eb3ed402 style: add influxdb report conf 2023-08-10 15:29:53 +08:00
chaoc
ebada97c22 test: add do nothing sink 2023-08-10 15:29:19 +08:00
chaoc
7221f2a52f feat: impl using the Address as the keyBy parameter 2023-08-09 16:38:34 +08:00
chaoc
6a444d38ba style: add conf example file 2023-08-09 16:28:27 +08:00
chaoc
7d0d185c0e refactor: extract keyedFunction to simple class 2023-08-09 16:27:56 +08:00
chaoc
c9c26f0a9c refactor: extract keyedFunction to simple class 2023-08-09 16:27:46 +08:00
chaoc
b20c33e2bd chore: add plugins 2023-08-09 16:26:21 +08:00
chaoc
c10b8930e3 style(performance): organize performance testing code 2023-08-09 15:58:06 +08:00
chaoc
21ef68da95 test: add conf props 2023-08-07 17:15:46 +08:00
chaoc
33b9d61711 test(performance): add test case 2023-08-07 17:15:27 +08:00
chaoc
43dbc50a19 fix: fix some errors 2023-08-07 17:13:23 +08:00
chaoc
5bc835ec3f test(utils): add RTP data generator 2023-08-04 16:10:38 +08:00
chaoc
673d077e32 test(utils): optimize data generator 2023-08-04 16:10:24 +08:00
chaoc
b2af6cdad8 perf(error): optimize cond 2023-08-03 17:48:35 +08:00
chaoc
bfd28178b0 perf(error): add cond 2023-08-03 17:29:47 +08:00
chaoc
cefc581964 test(functions): add test for SIPPairingFunction 2023-08-03 17:21:38 +08:00
chaoc
c1404584c0 test(functions): add test for Address 2023-08-03 17:21:15 +08:00
chaoc
ef851cb187 test(records): add test for Record 2023-08-03 17:20:54 +08:00
chaoc
402db73b53 test(conf): add test for FusionConfiguration 2023-08-03 17:20:31 +08:00
chaoc
cbc7c91abe test(utils): add test data generator 2023-08-03 17:18:00 +08:00
chaoc
14d569edfd feat(error): add handler which process exception records 2023-08-03 17:17:24 +08:00
chaoc
4d3b53cbb7 fix: collect expire rtp data record 2023-08-03 17:16:09 +08:00
chaoc
112708f056 fix: fix time type error 2023-08-03 17:15:38 +08:00
chaoc
b40469ea8d pref: modify param type 2023-08-03 16:11:10 +08:00
chaoc
b7cc87b4bc feat(app): job main 2023-08-03 16:04:13 +08:00
chaoc
9ed0da345b feat(format): add json serializer 2023-08-03 16:03:54 +08:00
chaoc
559c7b287c fix(record): update field type 2023-08-03 16:03:30 +08:00
chaoc
bcd6826c56 perf(record): update return values 2023-08-03 16:03:09 +08:00
chaoc
7296c36d80 feat(functions): add function that fusion SIP data and RTP data to VoIP 2023-08-03 16:02:21 +08:00
chaoc
a8a6519a88 feat(functions): add function that splits record based on field 'schema_type' 2023-08-03 16:01:49 +08:00
chaoc
7d2b0bae29 pref(functions): format code 2023-08-03 16:00:39 +08:00
chaoc
098a196cc0 pref(record): add field setter 2023-08-03 15:59:26 +08:00
chaoc
cfb56877c3 feat(conf): add Enhanced-Configuration utils 2023-08-03 15:58:42 +08:00
chaoc
d4bc2787d4 chore: add test deps and some plugins 2023-08-03 15:57:20 +08:00
chaoc
53eca34191 feat(functions): impl keyedProcessFunction that pairs SIP one-way direction to double direction 2023-08-03 09:56:19 +08:00
chaoc
a949a98a37 feat(functions): add function helper 2023-08-03 09:54:23 +08:00
chaoc
fe274d4da6 feat(functions): add some function pojo types 2023-08-03 09:53:41 +08:00
chaoc
6d90d720fd feat(conf): add some configuration options 2023-08-03 09:52:11 +08:00
chaoc
43f10e38e7 chore: add lombok 2023-08-03 09:51:47 +08:00
chaoc
f52376d63d feat(record): add some enum types 2023-08-03 09:44:56 +08:00
chaoc
1046784f3e feat(record): add json data parser utils 2023-08-03 09:39:39 +08:00
chaoc
3ff568c823 chore: init maven frame 2023-08-02 17:05:56 +08:00
chaoc
906b6e6ca4 chore: add git ignore file 2023-08-02 17:04:01 +08:00
chaoc
8d5d1d9523 docs: update readme 2023-08-02 17:02:54 +08:00
36 changed files with 2605 additions and 78 deletions

35
.gitignore vendored Normal file
View File

@@ -0,0 +1,35 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

7
CHANGELOG.md Normal file
View File

@@ -0,0 +1,7 @@
# Changelog
### Hotfix
- [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常
### Other
- 输出 SIP Record

106
README.md
View File

@@ -1,92 +1,42 @@
# flink-voip-fusion # SIP RTP Correlation
## 简介
SIP RTP Correlation 项目是一个使用 Apache Flink 实现的实时数据处理项目,旨在从 Kafka 中读取 SIPSession Initiation Protocol和 RTPReal-time Transport Protocol数据将它们融合成完整的 VoIPVoice over Internet Protocol通话数据。
## Getting started SIP RTP Correlation 项目可以用于实时监控和分析 VoIP 通话数据,提取关键指标,以及进行实时报警和诊断。
To make it easy for you to get started with GitLab, here's a list of recommended next steps. ## 编译和打包
Already a pro? Just edit this README.md and make it your own. Want to make it easy? [Use the template at the bottom](#editing-this-readme)! 使用Maven工具对项目进行编译和打包
```shell
## Add your files mvn clean package
- [ ] [Create](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#create-a-file) or [upload](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#upload-a-file) files
- [ ] [Add files using the command line](https://docs.gitlab.com/ee/gitlab-basics/add-file.html#add-a-file-using-the-command-line) or push an existing Git repository with the following command:
```
cd existing_repo
git remote add origin https://git.mesalab.cn/galaxy/tsg_olap/flink-voip-fusion.git
git branch -M main
git push -uf origin main
``` ```
## Integrate with your tools ## 运行Flink任务
- [ ] [Set up project integrations](https://git.mesalab.cn/galaxy/tsg_olap/flink-voip-fusion/-/settings/integrations) 使用以下命令运行Flink任务
```shell
flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<version>.jar application.properties
```
## Collaborate with your team ## 配置项说明
- [ ] [Invite team members and collaborators](https://docs.gitlab.com/ee/user/project/members/) | 配置项 | 类型 | 必需 | 默认值 | 描述 |
- [ ] [Create a new merge request](https://docs.gitlab.com/ee/user/project/merge_requests/creating_merge_requests.html) | --------------------------- | ------------------- | ---------- | ----------------------------------------------------------- | ----------------------------------------------------------- |
- [ ] [Automatically close issues from merge requests](https://docs.gitlab.com/ee/user/project/issues/managing_issues.html#closing-issues-automatically) | source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 |
- [ ] [Enable merge request approvals](https://docs.gitlab.com/ee/user/project/merge_requests/approvals/) | source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties |
- [ ] [Automatically merge when pipeline succeeds](https://docs.gitlab.com/ee/user/project/merge_requests/merge_when_pipeline_succeeds.html) | sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 |
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties |
| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
| determine.intranet.ip.be.abnormal | BOOLEAN | N | True | SIP 中协商四元组中存在内网 IP 地址时,是否将其判定为异常数据 |
| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 |
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties |
| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) |
| rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) |
## Test and Deploy
Use the built-in continuous integration in GitLab.
- [ ] [Get started with GitLab CI/CD](https://docs.gitlab.com/ee/ci/quick_start/index.html) ## 贡献
- [ ] [Analyze your code for known vulnerabilities with Static Application Security Testing(SAST)](https://docs.gitlab.com/ee/user/application_security/sast/)
- [ ] [Deploy to Kubernetes, Amazon EC2, or Amazon ECS using Auto Deploy](https://docs.gitlab.com/ee/topics/autodevops/requirements.html)
- [ ] [Use pull-based deployments for improved Kubernetes management](https://docs.gitlab.com/ee/user/clusters/agent/)
- [ ] [Set up protected environments](https://docs.gitlab.com/ee/ci/environments/protected_environments.html)
*** 如果您发现任何问题或改进项目的想法,欢迎提交 Issue 或 Pull Request。
# Editing this README
When you're ready to make this README your own, just edit this file and use the handy template below (or feel free to structure it however you want - this is just a starting point!). Thank you to [makeareadme.com](https://www.makeareadme.com/) for this template.
## Suggestions for a good README
Every project is different, so consider which of these sections apply to yours. The sections used in the template are suggestions for most open source projects. Also keep in mind that while a README can be too long and detailed, too long is better than too short. If you think your README is too long, consider utilizing another form of documentation rather than cutting out information.
## Name
Choose a self-explaining name for your project.
## Description
Let people know what your project can do specifically. Provide context and add a link to any reference visitors might be unfamiliar with. A list of Features or a Background subsection can also be added here. If there are alternatives to your project, this is a good place to list differentiating factors.
## Badges
On some READMEs, you may see small images that convey metadata, such as whether or not all the tests are passing for the project. You can use Shields to add some to your README. Many services also have instructions for adding a badge.
## Visuals
Depending on what you are making, it can be a good idea to include screenshots or even a video (you'll frequently see GIFs rather than actual videos). Tools like ttygif can help, but check out Asciinema for a more sophisticated method.
## Installation
Within a particular ecosystem, there may be a common way of installing things, such as using Yarn, NuGet, or Homebrew. However, consider the possibility that whoever is reading your README is a novice and would like more guidance. Listing specific steps helps remove ambiguity and gets people to using your project as quickly as possible. If it only runs in a specific context like a particular programming language version or operating system or has dependencies that have to be installed manually, also add a Requirements subsection.
## Usage
Use examples liberally, and show the expected output if you can. It's helpful to have inline the smallest example of usage that you can demonstrate, while providing links to more sophisticated examples if they are too long to reasonably include in the README.
## Support
Tell people where they can go to for help. It can be any combination of an issue tracker, a chat room, an email address, etc.
## Roadmap
If you have ideas for releases in the future, it is a good idea to list them in the README.
## Contributing
State if you are open to contributions and what your requirements are for accepting them.
For people who want to make changes to your project, it's helpful to have some documentation on how to get started. Perhaps there is a script that they should run or some environment variables that they need to set. Make these steps explicit. These instructions could also be useful to your future self.
You can also document commands to lint the code or run tests. These steps help to ensure high code quality and reduce the likelihood that the changes inadvertently break something. Having instructions for running tests is especially helpful if it requires external setup, such as starting a Selenium server for testing in a browser.
## Authors and acknowledgment
Show your appreciation to those who have contributed to the project.
## License
For open source projects, say how it is licensed.
## Project status
If you have run out of energy or time for your project, put a note at the top of the README saying that development has slowed down or stopped completely. Someone may choose to fork your project or volunteer to step in as a maintainer or owner, allowing your project to keep going. You can also make an explicit request for maintainers.

380
pom.xml Normal file
View File

@@ -0,0 +1,380 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>1.1-rc1</version>
<name>Flink : SIP-RTP : Correlation</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.13.6</flink.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.32</slf4j.version>
<log4j.version>2.17.1</log4j.version>
<jackson.version>2.13.2.20220328</jackson.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-influxdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<!-- API bridge between log4j 1 and 2 -->
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.1.3</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.8.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/it/java</source>
</sources>
</configuration>
</execution>
<execution>
<id>test-resources</id>
<phase>generate-test-resources</phase>
<goals>
<goal>add-test-resource</goal>
</goals>
<configuration>
<resources>
<resource>
<directory>src/it/resources</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-tests-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.outputDirectory}/tests-lib</outputDirectory>
<excludeScope>system</excludeScope>
<excludeTransitive>false</excludeTransitive>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<executions>
<execution>
<id>pre-unit-test</id>
<goals>
<goal>prepare-agent</goal>
</goals>
<configuration>
<destFile>${project.build.directory}/jacoco.exec</destFile>
</configuration>
</execution>
<execution>
<id>test-report</id>
<phase>verify</phase>
<goals>
<goal>report</goal>
</goals>
<configuration>
<dataFile>${project.build.directory}/jacoco.exec</dataFile>
<outputDirectory>${project.reporting.outputDirectory}/jacoco</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.github.zlika</groupId>
<artifactId>reproducible-build-maven-plugin</artifactId>
<version>0.2</version>
<executions>
<execution>
<goals>
<goal>strip-jar</goal>
</goals>
<phase>package</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.7</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.3.0</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.6.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

View File

@@ -0,0 +1,95 @@
package com.zdjizhi.flink.voip;
import com.zdjizhi.flink.voip.functions.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Objects;
// Integration test main class
public class CorrelateTest {
public static void main(String[] args) throws Exception {
final Configuration envConfig = new Configuration();
envConfig.setInteger("rest.port", 18081);
envConfig.setBoolean("rest.flamegraph.enabled", true);
envConfig.setString("metrics.reporter.influxdb.factory.class", "org.apache.flink.metrics.influxdb.InfluxdbReporterFactory");
envConfig.setString("metrics.reporter.influxdb.scheme", "http");
envConfig.setString("metrics.reporter.influxdb.host", "127.0.0.1");
envConfig.setInteger("metrics.reporter.influxdb.port", 8086);
envConfig.setString("metrics.reporter.influxdb.db", "flinks");
envConfig.setString("metrics.reporter.influxdb.interval", "5 SECONDS");
final StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironmentWithWebUI(envConfig);
final ParameterTool tool = ParameterTool.fromPropertiesFile(
args.length > 0 ? args[0] :
Objects.requireNonNull(
CorrelateTest.class.getResource("/application-test.properties")).getPath()
);
final CheckpointConfig checkpointConfig = env.getCheckpointConfig();
env.enableCheckpointing(Time.minutes(1)
.toMilliseconds(), CheckpointingMode.AT_LEAST_ONCE);
checkpointConfig.setCheckpointTimeout(Time.minutes(2).toMilliseconds());
checkpointConfig.setCheckpointStorage("file://" + System.getProperty("java.io.tmpdir"));
/* checkpointConfig.setCheckpointStorage(
Objects.requireNonNull(
CorrelateTest.class.getResource("/")).toString());*/
final Configuration config = tool.getConfiguration();
env.getConfig().setGlobalJobParameters(config);
env.setParallelism(8);
final SingleOutputStreamOperator<ObjectNode> sourceStream = env.addSource(new DataGenSource())
.name("DataGenSource")
.setParallelism(4);
// Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data
final SingleOutputStreamOperator<ObjectNode> splitsRecordsOperator =
sourceStream
.process(new TypeSplitFunction())
.name("SplitsRecordsBasedSchemaType")
.uid("splits-records-based-schema-type");
// Get the DataStreams for SIP and RTP data from the side outputs.
final DataStream<ObjectNode> sipDataStream = splitsRecordsOperator
.getSideOutput(TypeSplitFunction.SIP_OUTPUT_TAG);
final DataStream<ObjectNode> rtpDataStream = splitsRecordsOperator
.getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG);
// Process SIP data to create a double directional stream using SIPPairingFunction.
final SingleOutputStreamOperator<ObjectNode> sipDoubleDirOperator = sipDataStream
.keyBy(new SIPKeySelector())
.process(new SIPPairingFunction())
.name("PairingOneWayToDoubleStream")
.uid("pairing-one-way-to-double");
final KeySelector<ObjectNode, Tuple2<Integer, Address>> vSysSelector = new VSysIDKeySelector();
// Fusion SIP data and RTP data to VoIP data.
final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream
.keyBy(vSysSelector)
.connect(sipDoubleDirOperator.keyBy(vSysSelector))
.process(new VoIPFusionFunction())
.name("VoIPFusion")
.uid("voip-fusion");
voIpOperator.addSink(new DoNothingSink())
.name("DoNothingSink")
.setParallelism(1);
env.execute("VoIP Fusion Job");
}
}

View File

@@ -0,0 +1,22 @@
package com.zdjizhi.flink.voip.conf;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
// Test Configs.
public class TestConfigs {
// Ratio of valuable data in the generated dataset
public static final ConfigOption<Integer> VALUABLE_DATA_PROPORTION =
ConfigOptions.key("valuable.data.ratio")
.intType()
.defaultValue(40)
.withDescription("Ratio of valuable data in the generated dataset.");
// QPS of generate date record
public static final ConfigOption<Long> DATA_GENERATE_RATE =
ConfigOptions.key("data.generate.rate")
.longType()
.defaultValue(1000L)
.withDescription("QPS of generate date record.");
}

View File

@@ -0,0 +1,46 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.TestConfigs;
import com.zdjizhi.flink.voip.data.RTPGenerator;
import com.zdjizhi.flink.voip.data.SIPGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
// Date Generate Source.
public class DataGenSource extends RichParallelSourceFunction<ObjectNode> implements FunctionHelper {
private transient SIPGenerator sipGenerator;
private transient RTPGenerator rtpGenerator;
private transient RateLimiter rateLimiter;
private volatile boolean running;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Integer ratio = getGlobalConfiguration()
.get(TestConfigs.VALUABLE_DATA_PROPORTION);
this.sipGenerator = new SIPGenerator(ratio);
this.rtpGenerator = new RTPGenerator(ratio, sipGenerator);
final Long rate = getGlobalConfiguration()
.get(TestConfigs.DATA_GENERATE_RATE);
this.rateLimiter = RateLimiter.create(
(int) (rate / getRuntimeContext().getNumberOfParallelSubtasks()));
this.running = true;
}
@Override
public void run(SourceContext<ObjectNode> ctx) throws Exception {
while (running) {
rateLimiter.acquire();
ctx.collect(sipGenerator.next());
ctx.collect(rtpGenerator.next());
}
}
@Override
public void cancel() {
this.running = false;
}
}

View File

@@ -0,0 +1,43 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.records.Record;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
// It dose nothing with the incoming data and simply collects metrics for the number of
// RTP and VoIP records processed per second.
public class DoNothingSink extends RichSinkFunction<ObjectNode> {
private transient MeterView numRTPRecordsPreSecond;
private transient MeterView numVoIPRecordsPreSecond;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
RuntimeContext runtimeContext = getRuntimeContext();
MetricGroup metricGroup = runtimeContext.getMetricGroup();
numRTPRecordsPreSecond = metricGroup
.meter("numRTPRecordsPreSecond", new MeterView(1));
numVoIPRecordsPreSecond = metricGroup
.meter("numVoIPRecordsPreSecond", new MeterView(1));
}
@Override
public void invoke(ObjectNode obj, Context context) throws Exception {
Record record = new Record(obj);
switch (record.getSchemaType()) {
case RTP:
numRTPRecordsPreSecond.markEvent();
break;
case VOIP:
numVoIPRecordsPreSecond.markEvent();
break;
default:
}
}
}

View File

@@ -0,0 +1,4 @@
sip.state.clear.interval.minutes=1
rtp.state.clear.interval.minutes=10
valuable.data.ratio=20
data.generate.rate=1000

View File

@@ -0,0 +1,25 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
rootLogger.level = INFO
rootLogger.appenderRef.test.ref = TestLogger
appender.test.name = TestLogger
appender.test.type = CONSOLE
appender.test.layout.type = PatternLayout
appender.test.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

View File

@@ -0,0 +1,106 @@
package com.zdjizhi.flink.voip;
import com.zdjizhi.flink.voip.conf.FusionConfiguration;
import com.zdjizhi.flink.voip.error.ErrorHandler;
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
import com.zdjizhi.flink.voip.functions.*;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.time.Duration;
import static com.zdjizhi.flink.voip.conf.FusionConfigs.*;
/**
* The main class for running the SIP RTP Correlation application.
*
* @author chaoc
* @since 1.0
*/
public class CorrelateApp {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// param check
if (args.length < 1) {
throw new IllegalArgumentException("Error: Not found properties path. " +
"\nUsage: flink -c xxx xxx.jar app.properties.");
}
final ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
final Configuration config = tool.getConfiguration();
env.getConfig().setGlobalJobParameters(config);
final FusionConfiguration fusionConfiguration = new FusionConfiguration(config);
final FlinkKafkaConsumer<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer<>(
config.get(SOURCE_KAFKA_TOPIC),
new JsonNodeDeserializationSchema(),
fusionConfiguration
.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
final DataStream<ObjectNode> sourceStream = env.addSource(kafkaConsumer)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
(element, recordTimestamp) ->
element.get("common_start_timestamp_ms").asLong()));
final ErrorHandler errorHandler = new ErrorHandler(config);
// Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data
final SingleOutputStreamOperator<ObjectNode> splitsRecordsOperator =
errorHandler.filterError(sourceStream)
.process(new TypeSplitFunction())
.name("SplitsRecordsBasedSchemaType")
.uid("splits-records-based-schema-type");
// Get the DataStreams for SIP and RTP data from the side outputs.
final DataStream<ObjectNode> sipDataStream = splitsRecordsOperator
.getSideOutput(TypeSplitFunction.SIP_OUTPUT_TAG);
final DataStream<ObjectNode> rtpDataStream = splitsRecordsOperator
.getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG);
// Process SIP data to create a double directional stream using SIPPairingFunction.
final SingleOutputStreamOperator<ObjectNode> sipDoubleDirOperator = sipDataStream
.keyBy(new SIPKeySelector())
.process(new SIPPairingFunction())
.name("PairingOneWayToDoubleStream")
.uid("pairing-one-way-to-double");
final KeySelector<ObjectNode, Tuple2<Integer, Address>> vSysSelector = new VSysIDKeySelector();
// Fusion SIP data and RTP data to VoIP data.
final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream
.keyBy(vSysSelector)
.connect(sipDoubleDirOperator.keyBy(vSysSelector))
.process(new VoIPFusionFunction())
.name("VoIPFusion")
.uid("voip-fusion");
final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>(
config.get(SINK_KAFKA_TOPIC),
new JsonNodeSerializationSchema(),
fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
voIpOperator.union(sipDoubleDirOperator).addSink(producer);
env.execute("SIP-RTP-CORRELATION");
}
}

View File

@@ -0,0 +1,97 @@
package com.zdjizhi.flink.voip.conf;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
/**
* Containing configuration options for the Fusion application.
*
* @author chaoc
* @since 1.0
*/
public class FusionConfigs {
/**
* The prefix for Kafka properties used in the source.
*/
public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";
/**
* The prefix for Kafka properties used in the sink.
*/
public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";
/**
* Configuration prefix for the properties of the Kafka sink where the error data will be output.
*/
public static final String ERROR_SINK_KAFKA_PROPERTIES_PREFIX = "error.sink.kafka.props.";
/**
* Configuration option for the Kafka topic used in the source.
*/
public static final ConfigOption<String> SOURCE_KAFKA_TOPIC =
ConfigOptions.key("source.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("The Kafka topic used in the source.");
/**
* Configuration option for the Kafka topic used in the sink.
*/
public static final ConfigOption<String> SINK_KAFKA_TOPIC =
ConfigOptions.key("sink.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("The Kafka topic used in the sink.");
/**
* Configuration option to enable or disable the output of error records.
* If set to true, the error records will be sent to the specified Kafka topic.
* Default value is false.
*/
public static final ConfigOption<Boolean> ERROR_RECORDS_OUTPUT_ENABLE =
ConfigOptions.key("error.records.output.enable")
.booleanType()
.defaultValue(false)
.withDescription("Enable or disable the output of error records. " +
"If set to true, the error records will be sent to the specified Kafka topic.");
/**
* Configuration option to determine whether intranet IP addresses should be considered abnormal.
*/
public static final ConfigOption<Boolean> DETERMINE_INTRANET_IP_BE_ABNORMAL =
ConfigOptions.key("determine.intranet.ip.be.abnormal")
.booleanType()
.defaultValue(true)
.withDescription("Specifies whether intranet IP addresses should be treated as abnormal.");
/**
* Configuration option for specifying the Kafka topic name where the error data will be sent.
* This configuration option is used when the output of error records is enabled.
*/
public static final ConfigOption<String> ERROR_SINK_KAFKA_TOPIC =
ConfigOptions.key("error.sink.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("The Kafka topic name where the error records will be sent.");
/**
* The configuration option for the interval at which SIP state data
* should be cleared.
*/
public static final ConfigOption<Integer> SIP_STATE_CLEAR_INTERVAL =
ConfigOptions.key("sip.state.clear.interval.minutes")
.intType()
.defaultValue(1)
.withDescription("The interval at which SIP state data should be cleared.");
/**
* The configuration option for the interval at which STP state data
* should be cleared.
*/
public static final ConfigOption<Integer> RTP_STATE_CLEAR_INTERVAL =
ConfigOptions.key("rtp.state.clear.interval.minutes")
.intType()
.defaultValue(6)
.withDescription("The interval at which RTP state data should be cleared.");
}

View File

@@ -0,0 +1,44 @@
package com.zdjizhi.flink.voip.conf;
import org.apache.flink.configuration.Configuration;
import java.util.Properties;
/**
* A wrapper class that extends the Flink `Configuration` to provide utility methods for handling
* properties with a specific prefix. This class allows retrieving properties that start with the
* given `prefix` and converts them into a `java.util.Properties` object.
*
* @author chaoc
* @since 1.0
*/
public class FusionConfiguration {
private final Configuration config;
public FusionConfiguration(final Configuration config) {
this.config = config;
}
/**
* Retrieves properties from the underlying `Configuration` instance that start with the specified
* `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
*
* @param prefix The prefix to filter properties.
* @return A `java.util.Properties` object containing the properties with the specified prefix.
*/
public Properties getProperties(final String prefix) {
if (prefix == null) {
final Properties props = new Properties();
props.putAll(config.toMap());
return props;
}
return config.toMap()
.entrySet()
.stream()
.filter(entry -> entry.getKey().startsWith(prefix))
.collect(Properties::new, (props, e) ->
props.setProperty(e.getKey().substring(prefix.length()), e.getValue()),
Properties::putAll);
}
}

View File

@@ -0,0 +1,162 @@
package com.zdjizhi.flink.voip.error;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.conf.FusionConfiguration;
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
import com.zdjizhi.flink.voip.functions.FunctionHelper;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import com.zdjizhi.utils.IPUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
* It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
*
* @author chaoc
* @since 1.0
*/
public class ErrorHandler {
private static final Logger LOG = LoggerFactory.getLogger(ErrorHandler.class);
/**
* The OutputTag for invalid records.
*/
public static final OutputTag<ObjectNode> INVALID_OUTPUT_TAG =
new OutputTag<>("invalid-records", TypeInformation.of(ObjectNode.class));
private final Configuration config;
/**
* Creates a new ErrorHandler instance with the given configuration.
*
* @param config The configuration containing settings for error handling.
*/
public ErrorHandler(final Configuration config) {
this.config = config;
}
/**
* Filters out error records from the input data stream and outputs them to a separate stream if enabled.
*
* @param dataStream The input data stream of ObjectNode records.
* @return A new DataStream containing valid records without errors.
*/
public DataStream<ObjectNode> filterError(final DataStream<ObjectNode> dataStream) {
// Process the data stream to identify meaningless addresses and ports
final SingleOutputStreamOperator<ObjectNode> operator = dataStream
.process(new MeaninglessAddressProcessFunction())
.name("MeaninglessRecords")
.uid("meaningless-records");
// If enabled, output the error records to the specified Kafka topic
if (config.get(FusionConfigs.ERROR_RECORDS_OUTPUT_ENABLE)) {
final String topic = config.get(FusionConfigs.ERROR_SINK_KAFKA_TOPIC);
LOG.info("Meaningless data output is enabled, data will be sent to: Topic [{}]", topic);
final DataStream<ObjectNode> errorStream = operator
.getSideOutput(INVALID_OUTPUT_TAG);
final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>(
topic,
new JsonNodeSerializationSchema(),
new FusionConfiguration(config)
.getProperties(FusionConfigs.ERROR_SINK_KAFKA_PROPERTIES_PREFIX)
);
errorStream.addSink(producer);
}
return operator;
}
}
/**
* The MeaninglessAddressProcessFunction is a custom ProcessFunction used to check for records
* with invalid or meaningless addresses and ports. It separates them into the invalid output tag if necessary.
*/
class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> implements FunctionHelper {
private static final Logger LOG = LoggerFactory.getLogger(MeaninglessAddressProcessFunction.class);
private transient boolean determineIntranetIpBeAbnormal;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
final Configuration config = getGlobalConfiguration();
determineIntranetIpBeAbnormal = config.get(FusionConfigs.DETERMINE_INTRANET_IP_BE_ABNORMAL);
}
@Override
public void processElement(ObjectNode obj,
ProcessFunction<ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(obj);
// Check for invalid or meaningless addresses and ports
boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) &&
StringUtils.isNotBlank(record.getServerIp()) &&
record.getClientPort() > 0 &&
record.getServerPort() > 0;
final SIPRecord sipRecord = new SIPRecord(obj);
boolean cond2 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
|| (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getResponderSdpConnectIp()));
boolean cond4 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
(!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
// Both client and server addresses in the data are valid.
if (cond1 && (
// The address in the SIP one-way stream is valid and not an internal network address.
cond2 && cond3 && cond4 && cond5
// The coordinating addresses in the SIP double directional stream are valid
// and not internal network addresses.
|| cond5 && cond6
|| !cond5)) {
out.collect(obj);
} else {
// Output invalid records to the invalid output tag
ctx.output(ErrorHandler.INVALID_OUTPUT_TAG, obj);
if (LOG.isDebugEnabled()) {
LOG.debug("Meaningless record: {}", obj);
}
}
}
// ======================================================================================
// ----------------------------------- private helper -----------------------------------
private static boolean isIPAddress(final String ipaddr) {
if (null == ipaddr) {
return false;
}
return IPUtil.isIPAddress(ipaddr);
}
private static boolean isInternalIp(final String ipaddr) {
if (!isIPAddress(ipaddr)) {
return false;
}
return IPUtil.internalIp(ipaddr);
}
}

View File

@@ -0,0 +1,20 @@
package com.zdjizhi.flink.voip.formats;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
public class JsonNodeSerializationSchema implements SerializationSchema<ObjectNode> {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public byte[] serialize(ObjectNode jsonNodes) {
try {
return mapper.writeValueAsBytes(jsonNodes);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,54 @@
package com.zdjizhi.flink.voip.functions;
import com.google.common.collect.Lists;
import com.zdjizhi.utils.IPUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.List;
/**
* A pojo class representing an address with two IP and port pairs.
*
* @author chaoc
* @since 1.0
*/
@Data
@AllArgsConstructor
public class Address {
// The first IP address.
private final String ip1;
//The first port number.
private final int port1;
//The second IP address.
private final String ip2;
//The second port number.
private final int port2;
/**
* Creates an Address instance based on two tuples containing (String, Int) representing address information.
* The method sorts the addresses based on the port number, and if the ports are equal, it sorts them based on
* the numeric value of the IP address.
*
* @param a1 The first address information as a tuple (IP address, port).
* @param a2 The second address information as a tuple (IP address, port).
* @return An Address instance with addresses sorted and reordered.
*/
public static Address of(Tuple2<String, Integer> a1, Tuple2<String, Integer> a2) {
List<Tuple2<String, Integer>> list = Lists.newArrayList(a1, a2);
list.sort((a, b) -> {
if (a.f1.equals(b.f1)) {
return Long.compare(IPUtil.getIpDesimal(a.f0), IPUtil.getIpDesimal(b.f0));
} else {
return a.f1.compareTo(b.f1);
}
});
return new Address(list.get(0).f0, list.get(0).f1, list.get(1).f0, list.get(1).f1);
}
}

View File

@@ -0,0 +1,32 @@
package com.zdjizhi.flink.voip.functions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
/**
* An interface that provides utility functions for Flink functions.
*
* @author chaoc
* @since 1.0
*/
public interface FunctionHelper extends RichFunction {
/**
* Get the global configuration for the current Flink job.
*
* @return The global configuration as a Configuration object.
*/
default Configuration getGlobalConfiguration() {
final ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()
.getExecutionConfig().getGlobalJobParameters();
return (Configuration) globalJobParameters;
}
default void registerNextFireTimestamp(TimerService timerService, long interval) {
long current = timerService.currentWatermark();
timerService.registerEventTimeTimer(current + interval);
}
}

View File

@@ -0,0 +1,28 @@
package com.zdjizhi.flink.voip.functions;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Setter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* A case class representing an ObjectNode with an expiration time and a pair times.
*
* @author chaoc
* @since 1.0
*/
@Data
@AllArgsConstructor
class ObjectNodeInfo {
// The ObjectNode containing data.
private ObjectNode obj;
// The pair times for the object.
@Setter
private int times;
public void incTimes() {
this.times = this.times + 1;
}
}

View File

@@ -0,0 +1,31 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.records.SIPRecord;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* A KeySelector implementation for extracting a composite key from an ObjectNode representing a SIP record.
*
* @author chaoc
* @since 1.0
*/
public class SIPKeySelector implements KeySelector<ObjectNode, Tuple3<Integer, String, Address>> {
/**
* Extracts the composite key (VSysID, CallID, Address) from the given ObjectNode.
*
* @param obj The ObjectNode representing a SIP record.
* @return A Tuple3 containing the extracted key (VSysID, CallID, Address).
* @throws Exception Thrown if an error occurs during key extraction.
*/
@Override
public Tuple3<Integer, String, Address> getKey(ObjectNode obj) throws Exception {
final SIPRecord record = new SIPRecord(obj);
final Address address = Address.of(Tuple2.of(record.getClientIp(), record.getClientPort()),
Tuple2.of(record.getServerIp(), record.getServerPort()));
return Tuple3.of(record.getVSysID(), record.getCallID(), address);
}
}

View File

@@ -0,0 +1,82 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
* SIP records are paired when they have the same addresses but opposite stream directions.
*
* @author chaoc
* @since 1.0
*/
public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>
implements FunctionHelper {
private transient Time fireInterval;
private transient ValueState<ObjectNode> valueState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
int minutes = getGlobalConfiguration().get(FusionConfigs.SIP_STATE_CLEAR_INTERVAL);
fireInterval = Time.minutes(minutes);
final ValueStateDescriptor<ObjectNode> descriptor =
new ValueStateDescriptor<>("sip-state", ObjectNode.class);
final StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(fireInterval)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.useProcessingTime()
.cleanupFullSnapshot()
.build();
descriptor.enableTimeToLive(ttlConfig);
valueState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(ObjectNode value,
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(value);
// When SIP is a one-way stream.
if (StreamDir.DOUBLE != record.getStreamDir()) {
// If the address is already stored in the mapState and has the opposite stream direction,
// merge the SIP records, change the stream direction to DOUBLE, and output the merged record.
final ObjectNode obj = valueState.value();
if (null != obj && new Record(obj).getStreamDir() != record.getStreamDir()) {
record.merge(obj)
.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.DOUBLE.getValue());
out.collect(value);
valueState.clear();
} else {
// If the address is not yet in the mapState.
valueState.update(value);
}
} else {
// If SIP is a double stream, pairing isn't required, directly output the record.
out.collect(value);
}
registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
}
@Override
public void onTimer(long timestamp,
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception {
valueState.clear();
}
}

View File

@@ -0,0 +1,45 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.records.Record;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* A ProcessFunction that splits ObjectNode records based on their 'schemaType' field.
* It outputs SIP records to the 'sipSchemaTypeOutputTag' and RTP records to the 'rtpSchemaTypeOutputTag'.
*
* @author chaoc
* @since 1.0
*/
public class TypeSplitFunction extends ProcessFunction<ObjectNode, ObjectNode> {
/**
* OutputTag for SIP records.
*/
public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
new OutputTag<>("schema-type-sip", TypeInformation.of(ObjectNode.class));
/**
* OutputTag for RTP records.
*/
public static final OutputTag<ObjectNode> RTP_OUTPUT_TAG =
new OutputTag<>("schema-type-rtp", TypeInformation.of(ObjectNode.class));
@Override
public void processElement(ObjectNode obj,
ProcessFunction<ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(obj);
switch (record.getSchemaType()) {
case RTP:
ctx.output(RTP_OUTPUT_TAG, obj);
break;
case SIP:
ctx.output(SIP_OUTPUT_TAG, obj);
break;
default:
}
}
}

View File

@@ -0,0 +1,40 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* A KeySelector implementation that extracts the key(VSysID) from an ObjectNode.
*
* @author chaoc
* @since 1.0
*/
public class VSysIDKeySelector implements KeySelector<ObjectNode, Tuple2<Integer, Address>> {
/**
* Extracts the composite key (VSysID, Address) from the given ObjectNode.
*
* @param obj The ObjectNode representing a SIP record.
* @return A Tuple2 containing the extracted key (VSysID, Address).
* @throws Exception Thrown if an error occurs during key extraction.
*/
@Override
public Tuple2<Integer, Address> getKey(ObjectNode obj) throws Exception {
final Record record = new Record(obj);
final Address address;
if (record.getSchemaType() == SchemaType.SIP) {
final SIPRecord sipRecord = new SIPRecord(obj);
address = Address.of(
Tuple2.of(sipRecord.getOriginatorSdpConnectIp(), sipRecord.getOriginatorSdpMediaPort()),
Tuple2.of(sipRecord.getResponderSdpConnectIp(), sipRecord.getResponderSdpMediaPort()));
} else {
address = Address.of(Tuple2.of(record.getServerIp(), record.getServerPort()),
Tuple2.of(record.getClientIp(), record.getClientPort()));
}
return Tuple2.of(record.getVSysID(), address);
}
}

View File

@@ -0,0 +1,157 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.Map;
/**
* The `VoIPFusionFunction` is a key-based co-process function that performs VoIP fusion logic
* for SIP and RTP records. It combines SIP and RTP records belonging to the same session
* and emits fused VoIP records. The function utilizes keyed state to store and manage SIP and
* RTP records, and it uses timers to trigger regular clearing of the state.
*
* @author chaoc
* @since 1.0
*/
public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>
implements FunctionHelper {
private static final int MAX_RTP_LINES = 2;
private transient Time fireInterval;
private transient ValueState<ObjectNodeInfo> sipState;
private transient MapState<StreamDir, ObjectNode> rtpState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
final int minutes = getGlobalConfiguration().get(FusionConfigs.RTP_STATE_CLEAR_INTERVAL);
fireInterval = Time.minutes(minutes);
final RuntimeContext context = getRuntimeContext();
final ValueStateDescriptor<ObjectNodeInfo> sipDescriptor =
new ValueStateDescriptor<>("sip-state", ObjectNodeInfo.class);
final StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(fireInterval)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.useProcessingTime()
.cleanupFullSnapshot()
.build();
sipDescriptor.enableTimeToLive(ttlConfig);
sipState = context.getState(sipDescriptor);
final MapStateDescriptor<StreamDir, ObjectNode> rtpDescriptor =
new MapStateDescriptor<>("rtp-state", StreamDir.class, ObjectNode.class);
final StateTtlConfig rtpTtlConfig = StateTtlConfig
.newBuilder(Time.minutes(minutes + 1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.useProcessingTime()
.cleanupFullSnapshot()
.build();
rtpDescriptor.enableTimeToLive(rtpTtlConfig);
rtpState = context.getMapState(rtpDescriptor);
}
// SIP
@Override
public void processElement2(ObjectNode sipObj,
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Iterator<Map.Entry<StreamDir, ObjectNode>> iterator = rtpState.iterator();
if (rtpState.isEmpty()) {
sipState.update(new ObjectNodeInfo(sipObj, 0));
}
while (iterator.hasNext()) {
final Map.Entry<StreamDir, ObjectNode> entry = iterator.next();
final ObjectNode rtpObj = entry.getValue();
final Record rtpRecord = new Record(rtpObj);
rtpRecord.merge(sipObj)
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj);
switch (entry.getKey()) {
case S2C:
case C2S:
ObjectNodeInfo info = sipState.value();
if (info != null) {
info.incTimes();
if (info.getTimes() >= MAX_RTP_LINES) {
sipState.clear();
} else {
sipState.update(new ObjectNodeInfo(sipObj, info.getTimes()));
}
} else {
sipState.update(new ObjectNodeInfo(sipObj, 1));
}
break;
default:
// Double directional:
// In the context of VoIP fusion, only one RTP double directional stream
sipState.clear();
}
}
registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
}
// RTP
@Override
public void processElement1(ObjectNode rtpObj,
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Record rtpRecord = new Record(rtpObj);
final ObjectNodeInfo info = sipState.value();
final StreamDir streamDir = rtpRecord.getStreamDir();
if (null != info) {
rtpRecord.merge(info.getObj())
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj);
switch (streamDir) {
case C2S:
case S2C:
info.incTimes();
if (info.getTimes() >= MAX_RTP_LINES) {
sipState.clear();
}
break;
default:
// Double
sipState.clear();
}
} else {
rtpState.put(streamDir, rtpObj);
}
registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
}
@Override
public void onTimer(long timestamp,
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception {
for (ObjectNode obj : rtpState.values()) {
out.collect(obj);
}
rtpState.clear();
sipState.clear();
}
}

View File

@@ -0,0 +1,195 @@
package com.zdjizhi.flink.voip.records;
import lombok.AllArgsConstructor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.IntNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
/**
* Record class represents a data record with various fields.
* <p>
* It provides getter and setter methods for accessing the fields of the data record.
*
* @author chaoc
* @since 1.0
*/
@AllArgsConstructor
public class Record {
/**
* 字段名:数据记录中的所属 vsys
*/
public static final String F_COMMON_VSYS_ID = "common_vsys_id";
/**
* 字段名:数据记录中的字段类型
*/
public static final String F_COMMON_SCHEMA_TYPE = "common_schema_type";
/**
* 字段名:数据记录中的流类型
*/
public static final String F_COMMON_STREAM_DIR = "common_stream_dir";
/**
* 字段名:数据记录中的服务端地址
*/
public static final String F_COMMON_SERVER_IP = "common_server_ip";
/**
* 字段名:数据记录中的服务端端口
*/
public static final String F_COMMON_SERVER_PORT = "common_server_port";
/**
* 字段名:数据记录中的客户端地址
*/
public static final String F_COMMON_CLIENT_IP = "common_client_ip";
/**
* 字段名:数据记录中的客户端端口
*/
public static final String F_COMMON_CLIENT_PORT = "common_client_port";
/**
* ObjectNode data.
*/
protected final ObjectNode obj;
/**
* Get the VSys ID from the data record.
*
* @return The VSys ID as an integer.
*/
public int getVSysID() {
return Record.getInt(obj, F_COMMON_VSYS_ID);
}
/**
* Get the schema type from the data record.
*
* @return The schema type.
*/
public final SchemaType getSchemaType() {
return SchemaType.of(Record.getString(obj, F_COMMON_SCHEMA_TYPE));
}
/**
* Get the stream direction from the data record.
*
* @return The stream direction.
*/
public final StreamDir getStreamDir() {
return StreamDir.of(Record.getInt(obj, F_COMMON_STREAM_DIR));
}
/**
* Get the server IP address from the data record.
*
* @return The server IP address as a string.
*/
public final String getServerIp() {
return Record.getString(obj, F_COMMON_SERVER_IP);
}
/**
* Get the server port from the data record.
*
* @return The server port as an integer.
*/
public final int getServerPort() {
return Record.getInt(obj, F_COMMON_SERVER_PORT);
}
/**
* Get the client IP address from the data record.
*
* @return The client IP address as a string.
*/
public final String getClientIp() {
return Record.getString(obj, F_COMMON_CLIENT_IP);
}
/**
* Get the client port from the data record.
*
* @return The client port as an integer.
*/
public final int getClientPort() {
return Record.getInt(obj, F_COMMON_CLIENT_PORT);
}
/**
* Set an integer value to the specified field in the data record.
*
* @param name The name of the field.
* @param value The integer value to set.
*/
public final void setInt(final String name, final int value) {
obj.set(name, IntNode.valueOf(value));
}
/**
* Set a string value to the specified field in the data record.
*
* @param name The name of the field.
* @param value The string value to set.
*/
public final void setString(final String name, final String value) {
obj.set(name, TextNode.valueOf(value));
}
/**
* Merge the fields of another ObjectNode into the current data record.
*
* @param other The ObjectNode containing the fields to be merged.
* @return This record.
*/
public final Record merge(final ObjectNode other) {
other.fields().forEachRemaining(entry -> obj.set(entry.getKey(), entry.getValue()));
return this;
}
/**
* Get an integer value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @param defaultValue The default value to return if the field is not found or is not an integer.
* @return The integer value from the field or the default value if the field is not found or is not an integer.
*/
public static int getInt(final ObjectNode obj, final String field, final int defaultValue) {
final JsonNode node = obj.get(field);
return node != null && node.isInt() ? node.asInt(defaultValue) : defaultValue;
}
/**
* Get an integer value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @return The integer value from the field or 0 if the field is not found or is not an integer.
*/
public static int getInt(final ObjectNode obj, final String field) {
return getInt(obj, field, 0);
}
/**
* Get a string value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @param defaultValue The default value to return if the field is not found or is not a string.
* @return The string value from the field or the default value if the field is not found or is not a string.
*/
public static String getString(final ObjectNode obj, final String field, final String defaultValue) {
final JsonNode node = obj.get(field);
return node != null && node.isTextual() ? node.asText(defaultValue) : defaultValue;
}
/**
* Get a string value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @return The string value from the field or null if the field is not found or is not a string.
*/
public static String getString(final ObjectNode obj, final String field) {
return getString(obj, field, null);
}
}

View File

@@ -0,0 +1,57 @@
package com.zdjizhi.flink.voip.records;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* SIPSession Initiation Protocoldata record class, used to parse and access SIP data records.
*
* @author chaoc
* @since 1.0
*/
public class SIPRecord extends Record {
/**
* Field Name: SIP 通话的会话 ID
*/
public static final String F_CALL_ID = "sip_call_id";
/**
* Field Name: SIP 通话的协调的主叫语音传输 IP
*/
public static final String F_ORIGINATOR_SDP_CONNECT_IP = "sip_originator_sdp_connect_ip";
/**
* Field Name: SIP 通话的协调的主叫语音传输端口
*/
public static final String F_ORIGINATOR_SDP_MEDIA_PORT = "sip_originator_sdp_media_port";
/**
* Field Name: SIP 通话的协调的被叫语音传输 IP
*/
public static final String F_RESPONDER_SDP_CONNECT_IP = "sip_responder_sdp_connect_ip";
/**
* Field Name: SIP 通话的协调的被叫语音传输端口
*/
public static final String F_RESPONDER_SDP_MEDIA_PORT = "sip_responder_sdp_media_port";
public SIPRecord(final ObjectNode obj) {
super(obj);
}
public String getCallID() {
return Record.getString(obj, F_CALL_ID);
}
public String getOriginatorSdpConnectIp() {
return Record.getString(obj, F_ORIGINATOR_SDP_CONNECT_IP);
}
public int getOriginatorSdpMediaPort() {
return Record.getInt(obj, F_ORIGINATOR_SDP_MEDIA_PORT);
}
public String getResponderSdpConnectIp() {
return Record.getString(obj, F_RESPONDER_SDP_CONNECT_IP);
}
public int getResponderSdpMediaPort() {
return Record.getInt(obj, F_RESPONDER_SDP_MEDIA_PORT);
}
}

View File

@@ -0,0 +1,51 @@
package com.zdjizhi.flink.voip.records;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* The SchemaType enum represents different types of data schemas.
*
* @author chaoc
* @since 1.0
*/
@AllArgsConstructor
@Getter
public enum SchemaType {
/**
* Represents the SIP schema type.
*/
SIP("SIP"),
/**
* Represents the RTP schema type.
*/
RTP("RTP"),
/**
* Represents the VoIP schema type.
*/
VOIP("VoIP");
/**
* The string value of the SchemaType.
*/
private final String value;
/**
* Get the SchemaType enum based on the provided string value.
*
* @param value The string value of the SchemaType to retrieve.
* @return The corresponding SchemaType enum.
* @throws IllegalArgumentException if the provided value does not match any known SchemaType.
*/
public static SchemaType of(final String value) {
for (SchemaType schemaType : values()) {
if (schemaType.value.equalsIgnoreCase(value)) {
return schemaType;
}
}
throw new IllegalArgumentException("Unknown SchemaType value '" + value + "'.");
}
}

View File

@@ -0,0 +1,51 @@
package com.zdjizhi.flink.voip.records;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* The StreamDir enum represents different types of data stream directions.
*
* @author chaoc
* @since 1.0
*/
@AllArgsConstructor
@Getter
public enum StreamDir {
/**
* Represents the Client-to-Server (C2S) stream direction.
*/
C2S(1),
/**
* Represents the Server-to-Client (S2C) stream direction.
*/
S2C(2),
/**
* Represents the bidirectional (double) stream direction.
*/
DOUBLE(3);
/**
* The integer value of the StreamDir.
*/
private final int value;
/**
* Get the StreamDir enum based on the provided integer value.
*
* @param value The integer value of the StreamDir to retrieve.
* @return The corresponding StreamDir enum.
* @throws IllegalArgumentException if the provided value does not match any known StreamDir.
*/
public static StreamDir of(int value) {
for (StreamDir streamDir : values()) {
if (value == streamDir.value) {
return streamDir;
}
}
throw new IllegalArgumentException("Unknown StreamDir value '" + value + "'.");
}
}

View File

@@ -0,0 +1,7 @@
sink.kafka.topic=VOIP-CONVERSATION-RECORD
sink.kafka.props.bootstrap.servers=localhost:9292
source.kafka.topic=VOIP-RECORD
source.kafka.props.bootstrap.servers=localhost:9292
source.kafka.props.group.id=flink-voip-fusion

View File

@@ -0,0 +1,25 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
rootLogger.level = WARN
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

View File

@@ -0,0 +1,67 @@
package com.zdjizhi.flink.voip.conf;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class FusionConfigurationTest {
private FusionConfiguration fusionConfiguration;
@BeforeEach
public void setUp() {
final Configuration config;
config = new Configuration();
config.setString("prefix_key1", "value1");
config.setString("prefix_key2", "value2");
config.setString("other_key", "other_value");
fusionConfiguration = new FusionConfiguration(config);
}
@Test
public void testGetPropertiesWithValidPrefix() {
String prefix = "prefix_";
Properties properties = fusionConfiguration.getProperties(prefix);
assertEquals(2, properties.size());
assertEquals("value1", properties.getProperty("key1"));
assertEquals("value2", properties.getProperty("key2"));
}
@Test
public void testGetPropertiesWithInvalidPrefix() {
String prefix = "invalid_";
Properties properties = fusionConfiguration.getProperties(prefix);
assertTrue(properties.isEmpty());
}
@Test
public void testGetPropertiesWithEmptyPrefix() {
String prefix = "";
Properties properties = fusionConfiguration.getProperties(prefix);
assertEquals(3, properties.size());
assertEquals("value1", properties.getProperty("prefix_key1"));
assertEquals("value2", properties.getProperty("prefix_key2"));
assertEquals("other_value", properties.getProperty("other_key"));
}
@Test
public void testGetPropertiesWithNullPrefix() {
// Null prefix should be treated as an empty prefix
String prefix = null;
Properties properties = fusionConfiguration.getProperties(prefix);
assertEquals(3, properties.size());
assertEquals("value1", properties.getProperty("prefix_key1"));
assertEquals("value2", properties.getProperty("prefix_key2"));
assertEquals("other_value", properties.getProperty("other_key"));
}
}

View File

@@ -0,0 +1,98 @@
package com.zdjizhi.flink.voip.data;
import com.github.javafaker.Faker;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
/**
* The abstract class Generator<T> serves as the base class for implementing record generators.
* It provides common functionalities for generating random data and controlling the generation ratio.
*
* @param <T> The type of records to be generated.
* @author chaoc
* @since 1.0
*/
public abstract class Generator<T> {
protected final ThreadLocalRandom random;
/**
* The ratio of generated records. The total number of records generated will be 100 / ratio.
*/
protected final int ratio;
private final Faker faker;
protected final AtomicReference<T> state;
protected final ObjectMapper mapper;
/**
* Creates a new Generator with the given ratio.
*
* @param ratio The ratio of generated records. The total number of records generated will be 100 / ratio.
*/
public Generator(final int ratio) {
this.ratio = ratio;
this.faker = new Faker();
this.random = ThreadLocalRandom.current();
this.state = new AtomicReference<>();
this.mapper = new ObjectMapper();
}
/**
* Generates the next record based on the specified ratio and the current state.
* It randomly selects whether to generate a new record or return the last generated record.
*
* @return The next generated record of type T.
*/
public abstract T next();
/**
* Generates a new record of type T.
*
* @return The newly generated record of type T.
*/
protected abstract T generate();
/**
* Performs post-processing on the generated record of type T.
* Subclasses can override this method to modify the generated record before returning it.
*
* @param v The generated record of type T.
* @return The post-processed record of type T.
*/
protected abstract T afterState(T v);
/**
* Generates a random IP address (either IPv4 or IPv6) .
*
* @return A randomly generated IP address as a string.
*/
public final String nextIp() {
if (random.nextBoolean()) {
return faker.internet().ipV4Address();
}
return faker.internet().ipV6Address();
}
/**
* Generates a random ID number.
*
* @return A randomly generated ID number as a string.
*/
public final String nextId() {
return faker.idNumber().valid();
}
/**
* Generates a random port number within the range of 0 to 65535 (inclusive).
*
* @return A randomly generated port number as an integer.
*/
public final int nextPort() {
return random.nextInt(65535);
}
}

View File

@@ -0,0 +1,111 @@
package com.zdjizhi.flink.voip.data;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* RTPGenerator extends Generator<ObjectNode> and is responsible for generating RTP records.
* It generates random RTP records with specific properties.
*
* @author chaoc
* @since 1.0
*/
public class RTPGenerator extends Generator<ObjectNode> {
private final SIPGenerator sipGenerator;
public RTPGenerator(int ratio, SIPGenerator sipGenerator) {
super(ratio);
this.sipGenerator = sipGenerator;
}
@Override
public ObjectNode next() {
int i = random.nextInt(100);
if (i < ratio) {
final ObjectNode node = sipGenerator.state.get();
if (null != node) {
final ObjectNode obj = generate();
obj.set(Record.F_COMMON_CLIENT_IP, node.get(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP));
obj.set(Record.F_COMMON_CLIENT_PORT, node.get(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT));
obj.set(Record.F_COMMON_SERVER_IP, node.get(SIPRecord.F_RESPONDER_SDP_CONNECT_IP));
obj.set(Record.F_COMMON_SERVER_PORT, node.get(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT));
return obj;
}
}
return generate();
}
@Override
protected ObjectNode generate() {
final String json = "{\n" +
" \"common_address_list\": \"42924-36682-172.17.201.16-172.17.200.50\",\n" +
" \"common_address_type\": 4,\n" +
" \"common_app_full_path\": \"rtp\",\n" +
" \"common_app_label\": \"rtp\",\n" +
" \"common_c2s_byte_num\": 0,\n" +
" \"common_c2s_pkt_num\": 0,\n" +
" \"common_client_ip\": \"172.17.201.16\",\n" +
" \"common_client_port\": 42924,\n" +
" \"common_con_duration_ms\": 1086,\n" +
" \"common_device_id\": \"unknown\",\n" +
" \"common_device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgxSfcOs\\\"}]}\",\n" +
" \"common_direction\": 73,\n" +
" \"common_end_time\": 1689295970,\n" +
" \"common_flags\": 16401,\n" +
" \"common_flags_identify_info\": \"{\\\"Asymmetric\\\":5,\\\"Server is Local\\\":1,\\\"S2C\\\":1}\",\n" +
" \"common_l4_protocol\": \"IPv4_UDP\",\n" +
" \"common_out_dest_mac\": \"02:fc:08:dc:92:d7\",\n" +
" \"common_out_src_mac\": \"02:fc:08:dc:91:c3\",\n" +
" \"common_protocol_label\": \"ETHERNET.IPv4.UDP\",\n" +
" \"common_s2c_byte_num\": 7570,\n" +
" \"common_s2c_pkt_num\": 5,\n" +
" \"common_schema_type\": \"RTP\",\n" +
" \"common_server_ip\": \"172.17.200.50\",\n" +
" \"common_server_port\": 36682,\n" +
" \"common_sessions\": 1,\n" +
" \"common_sled_ip\": \"192.168.42.54\",\n" +
" \"common_start_time\": 1689294629,\n" +
" \"common_stream_dir\": 3,\n" +
" \"common_stream_trace_id\": \"290484792956466709\",\n" +
" \"common_t_vsys_id\": 24,\n" +
" \"common_vsys_id\": 24,\n" +
" \"raw_log_status\": \"CLOSE\",\n" +
" \"rtp_payload_type_c2s\": 0,\n" +
" \"rtp_payload_type_s2c\": 0,\n" +
" \"rtp_pcap_path\": \"http://192.168.44.67:9098/hos/rtp_hos_bucket/rtp_172.17.200.50_172.17.201.16_27988_55806_1689294629.pcap\"\n" +
"}";
ObjectNode obj;
try {
obj = mapper.readValue(json, ObjectNode.class);
} catch (Exception e){
obj = mapper.createObjectNode();
}
final SIPRecord record = new SIPRecord(obj);
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
record.setString(SIPRecord.F_CALL_ID, nextId());
record.setInt(SIPRecord.F_COMMON_STREAM_DIR, random.nextInt(3) + 1);
record.setString(Record.F_COMMON_SERVER_IP, nextIp());
record.setInt(Record.F_COMMON_SERVER_PORT, nextPort());
record.setString(Record.F_COMMON_CLIENT_IP, nextIp());
record.setInt(Record.F_COMMON_CLIENT_PORT, nextPort());
return obj;
}
@Override
protected ObjectNode afterState(ObjectNode v) {
final Record record = new Record(v);
switch (record.getStreamDir()) {
case DOUBLE:
record.setInt(Record.F_COMMON_STREAM_DIR, random.nextInt(2) + 1);
break;
case S2C:
default:
}
return v;
}
}

View File

@@ -0,0 +1,138 @@
package com.zdjizhi.flink.voip.data;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* SIPGenerator extends Generator<ObjectNode> and is responsible for generating SIP (Session Initiation Protocol) records.
* It generates random SIP records with specific properties such as call ID, stream dir, server IP, server port,
* client IP, client port... information for both originator and responder.
*
* @author chaoc
* @since 1.0
*/
public class SIPGenerator extends Generator<ObjectNode> {
public SIPGenerator(final int ratio) {
super(ratio);
}
/**
* Creates a new SIPGenerator with the default ratio of 40.
*/
public SIPGenerator() {
this(40);
}
@Override
public final ObjectNode next() {
int i = random.nextInt(100);
if (i < ratio && state.get() != null) {
ObjectNode t = afterState(state.get());
state.set(null);
return t;
} else {
return state.updateAndGet(t -> generate());
}
}
@Override
protected ObjectNode generate() {
final String json = "{\n" +
" \"common_schema_type\": \"SIP\",\n" +
" \"common_sessions\": 1,\n" +
" \"sip_call_id\": \"3257b5c0f3d0266@spirent.com\",\n" +
" \"sip_originator_description\": \"<sip:caller@spirent.com>\",\n" +
" \"sip_responder_description\": \"sip:callee@spirent.com\",\n" +
" \"sip_originator_sdp_content\": \"v=0\\r\\no=SIP-UA 3395000 3397200 IN IP4 172.17.200.50\\r\\ns=SIP Call\\r\\nc=IN IP4 172.17.200.50\\r\\nt=0 0\\r\\nm=audio 36682 RTP/AVP 0\\r\\na=rtpmap:0 PCMU/8000\\r\\n\",\n" +
" \"sip_originator_sdp_connect_ip\": \"172.17.200.50\",\n" +
" \"sip_originator_sdp_media_port\": 36682,\n" +
" \"sip_originator_sdp_media_type\": \"0 PCMU/8000\",\n" +
" \"common_first_ttl\": 128,\n" +
" \"common_c2s_ipfrag_num\": 0,\n" +
" \"common_s2c_ipfrag_num\": 0,\n" +
" \"common_c2s_tcp_unorder_num\": 0,\n" +
" \"common_s2c_tcp_unorder_num\": 0,\n" +
" \"common_c2s_tcp_lostlen\": 0,\n" +
" \"common_s2c_tcp_lostlen\": 0,\n" +
" \"common_c2s_pkt_retrans\": 2,\n" +
" \"common_s2c_pkt_retrans\": 0,\n" +
" \"common_c2s_byte_retrans\": 1100,\n" +
" \"common_s2c_byte_retrans\": 0,\n" +
" \"common_direction\": 69,\n" +
" \"common_app_full_path\": \"sip\",\n" +
" \"common_app_label\": \"sip\",\n" +
" \"common_tcp_client_isn\": 3004427198,\n" +
" \"common_server_ip\": \"172.17.201.16\",\n" +
" \"common_client_ip\": \"172.17.200.49\",\n" +
" \"common_server_port\": 5060,\n" +
" \"common_client_port\": 6948,\n" +
" \"common_stream_dir\": 1,\n" +
" \"common_address_type\": 4,\n" +
" \"common_address_list\": \"6948-5060-172.17.200.50-172.17.201.16\",\n" +
" \"common_start_time\": 1689295655,\n" +
" \"common_end_time\": 1689295670,\n" +
" \"common_con_duration_ms\": 41467,\n" +
" \"common_s2c_pkt_num\": 0,\n" +
" \"common_s2c_byte_num\": 0,\n" +
" \"common_c2s_pkt_num\": 6,\n" +
" \"common_c2s_byte_num\": 1834,\n" +
" \"common_establish_latency_ms\": 249,\n" +
" \"common_out_src_mac\": \"02:fc:08:dc:91:c3\",\n" +
" \"common_out_dest_mac\": \"02:fc:08:dc:92:d7\",\n" +
" \"common_flags\": 8201,\n" +
" \"common_flags_identify_info\": \"{\\\"Asymmetric\\\":6,\\\"Client is Local\\\":1,\\\"C2S\\\":1}\",\n" +
" \"common_protocol_label\": \"ETHERNET.IPv4.TCP\",\n" +
" \"common_stream_trace_id\": \"290502385134123507\",\n" +
" \"common_l4_protocol\": \"IPv4_TCP\",\n" +
" \"common_sled_ip\": \"192.168.42.54\",\n" +
" \"common_device_id\": \"unknown\",\n" +
" \"common_device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgxSfcOs\\\"}]}\",\n" +
" \"common_t_vsys_id\": 24,\n" +
" \"common_vsys_id\": 24\n" +
"}";
ObjectNode obj;
try {
obj = mapper.readValue(json, ObjectNode.class);
} catch (Exception e) {
obj = mapper.createObjectNode();
}
final SIPRecord record = new SIPRecord(obj);
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.SIP.getValue());
record.setString(SIPRecord.F_CALL_ID, nextId() + "@spirent.com");
record.setInt(SIPRecord.F_COMMON_STREAM_DIR, (random.nextBoolean() ? StreamDir.S2C : StreamDir.C2S).getValue());
record.setString(Record.F_COMMON_SERVER_IP, nextIp());
record.setInt(Record.F_COMMON_SERVER_PORT, nextPort());
record.setString(Record.F_COMMON_CLIENT_IP, nextIp());
record.setInt(Record.F_COMMON_CLIENT_PORT, nextPort());
record.setString(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP, nextIp());
record.setInt(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT, nextPort());
record.setString(SIPRecord.F_RESPONDER_SDP_CONNECT_IP, nextIp());
record.setInt(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT, nextPort());
return obj;
}
@Override
protected ObjectNode afterState(ObjectNode v) {
final Record record = new Record(v);
switch (record.getStreamDir()) {
case C2S:
record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.S2C.getValue());
break;
case S2C:
record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.C2S.getValue());
break;
default:
}
return v;
}
}

View File

@@ -0,0 +1,48 @@
package com.zdjizhi.flink.voip.functions;
import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class AddressTest {
@Test
public void testAddressOfWithDifferentPortNumbers() {
Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.10", 8080);
Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.20", 9090);
Address address = Address.of(a1, a2);
assertEquals("192.168.1.10", address.getIp1());
assertEquals(8080, address.getPort1());
assertEquals("192.168.1.20", address.getIp2());
assertEquals(9090, address.getPort2());
}
@Test
public void testAddressOfWithSamePortNumber() {
Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.10", 8080);
Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.20", 8080);
Address address = Address.of(a1, a2);
assertEquals("192.168.1.10", address.getIp1());
assertEquals(8080, address.getPort1());
assertEquals("192.168.1.20", address.getIp2());
assertEquals(8080, address.getPort2());
}
@Test
public void testAddressOfWithInvertedOrder() {
Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.20", 8080);
Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.10", 9090);
Address address = Address.of(a1, a2);
assertEquals("192.168.1.20", address.getIp1());
assertEquals(8080, address.getPort1());
assertEquals("192.168.1.10", address.getIp2());
assertEquals(9090, address.getPort2());
}
}

View File

@@ -0,0 +1,104 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.data.SIPGenerator;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.*;
public class SIPPairingFunctionTest {
private static final int INTERVAL_MINUTES = 5;
private static KeyedOneInputStreamOperatorTestHarness<Integer, ObjectNode, ObjectNode> testHarness;
@BeforeAll
static void setUp() throws Exception {
final SIPPairingFunction func = new SIPPairingFunction();
final KeyedProcessOperator<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode> operator =
new KeyedProcessOperator<>(func);
final TypeInformation<Integer> type = TypeInformation.of(Integer.class);
final KeySelector<ObjectNode, Integer> keySelector = jsonNodes -> 0;
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, type);
final Configuration configuration = new Configuration();
configuration.set(FusionConfigs.SIP_STATE_CLEAR_INTERVAL, INTERVAL_MINUTES);
testHarness.getExecutionConfig().setGlobalJobParameters(configuration);
testHarness.open();
}
@Test
public void testProperlyPairSIPRecords() throws Exception {
final SIPGenerator sipGenerator = new SIPGenerator();
long current = System.currentTimeMillis();
final ObjectNode obj = sipGenerator.next();
final Record record = new Record(obj);
record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.DOUBLE.getValue());
final StreamRecord<ObjectNode> streamRecord = new StreamRecord<>(obj, current);
testHarness.processElement(streamRecord);
assertTrue(testHarness.getOutput().contains(streamRecord));
long interval = Time.minutes(INTERVAL_MINUTES).toMilliseconds();
long nextFireTime = (testHarness.getProcessingTime() / interval + 1) * interval;
assertTrue(testHarness.getProcessingTimeService().getActiveTimerTimestamps().contains(nextFireTime));
final ObjectNode obj1 = obj.deepCopy();
final Record record1 = new Record(obj1);
record1.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.S2C.getValue());
final ObjectNode obj2 = obj.deepCopy();
final Record record2 = new Record(obj2);
record2.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.C2S.getValue());
final MapStateDescriptor<Address, ObjectNodeInfo> descriptor =
new MapStateDescriptor<>(
"sip-state",
TypeInformation.of(Address.class),
TypeInformation.of(ObjectNodeInfo.class)
);
testHarness.processElement(obj1, current);
final MapState<Address, ObjectNodeInfo> mapState = testHarness.getOperator()
.getKeyedStateStore().getMapState(descriptor);
final Set<ObjectNode> objectNodes1 = Lists.newArrayList(mapState.values()).stream()
.map(ObjectNodeInfo::getObj)
.collect(Collectors.toSet());
assertTrue(objectNodes1.contains(obj1));
testHarness.processElement(obj2, current);
final Set<ObjectNode> objectNodes2 = Lists.newArrayList(mapState.values()).stream()
.map(ObjectNodeInfo::getObj)
.collect(Collectors.toSet());
assertFalse(objectNodes2.contains(obj2));
assertTrue(testHarness.getOutput().contains(new StreamRecord<>(obj2, current)));
assertEquals(2, testHarness.getOutput().size());
}
@AfterAll
static void close() throws Exception {
testHarness.close();
}
}

View File

@@ -0,0 +1,70 @@
package com.zdjizhi.flink.voip.records;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.IntNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class RecordTest {
private static ObjectMapper mapper;
@BeforeAll
static void setUp() {
mapper = new ObjectMapper();
}
@Test
void testGetVSysID() {
final ObjectNode obj = mapper.createObjectNode();
final Record record = new Record(obj);
record.setInt(Record.F_COMMON_VSYS_ID, 42);
assertEquals(42, record.getVSysID());
obj.set(Record.F_COMMON_VSYS_ID, IntNode.valueOf(40));
assertEquals(40, record.getVSysID());
}
@Test
void testGetSchemaType() {
final ObjectNode obj = mapper.createObjectNode();
final Record record = new Record(obj);
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
assertEquals(SchemaType.RTP.getValue(), record.getSchemaType());
obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue()));
assertEquals(SchemaType.VOIP.getValue(), record.getSchemaType());
}
@Test
void testSetInt() {
final ObjectNode obj = mapper.createObjectNode();
final Record record = new Record(obj);
record.setInt("intField", 123);
assertEquals(123, obj.get("intField").intValue());
}
@Test
void testSetString() {
final ObjectNode obj = mapper.createObjectNode();
final Record record = new Record(obj);
record.setString("stringField", "testValue");
assertEquals("testValue", obj.get("stringField").textValue());
}
@Test
void testMerge() {
final ObjectNode obj = mapper.createObjectNode();
obj.set("field1", IntNode.valueOf(1));
ObjectNode other = mapper.createObjectNode();
other.set("field2", TextNode.valueOf("value2"));
Record record = new Record(obj);
record.merge(other);
assertEquals(1, obj.get("field1").intValue());
assertEquals("value2", obj.get("field2").textValue());
}
}