Compare commits
15 Commits
v2.0-rc2
...
feature/sc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9616a2400 | ||
|
|
7cb70bfd8b | ||
|
|
82cfc3274a | ||
|
|
a4cecd6e82 | ||
|
|
ef5a8e57b2 | ||
|
|
9af8db466e | ||
|
|
8d4c1dbd18 | ||
|
|
e573d88af3 | ||
|
|
f264303a6d | ||
|
|
92cd8dc614 | ||
|
|
81b3135b44 | ||
|
|
4fad35afa8 | ||
|
|
f313313d9e | ||
|
|
da50e8fcdb | ||
|
|
cdc6862913 |
@@ -1,38 +0,0 @@
|
||||
image: 192.168.40.153:9080/common/maven:3.8.1-openjdk-11-slim-with-git
|
||||
|
||||
variables:
|
||||
MAVEN_CLI_OPTS: "--batch-mode --errors --show-version"
|
||||
|
||||
stages:
|
||||
- check
|
||||
- test
|
||||
- build
|
||||
|
||||
release-version-check:
|
||||
stage: check
|
||||
script:
|
||||
- mvn $MAVEN_CLI_OPTS enforcer:enforce@release-version-check
|
||||
- |-
|
||||
if `mvn $MAVEN_CLI_OPTS dependency:get@release-deploy-check > /dev/null 2>&1`; then
|
||||
echo "The current version has been deployed."
|
||||
exit 1
|
||||
else
|
||||
echo "The current version has not been deployed."
|
||||
fi
|
||||
rules:
|
||||
- if: $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "master" && $CI_PIPELINE_SOURCE == "merge_request_event"
|
||||
|
||||
test:
|
||||
stage: test
|
||||
script:
|
||||
- mvn $MAVEN_CLI_OPTS clean test
|
||||
only:
|
||||
- merge_requests
|
||||
|
||||
build:
|
||||
stage: build
|
||||
script:
|
||||
- echo "$MAVEN_SETTINGS_XML" > /usr/share/maven/conf/settings.xml
|
||||
- mvn clean site deploy -DskipTests
|
||||
only:
|
||||
- master
|
||||
12
README.md
12
README.md
@@ -1,10 +1,10 @@
|
||||
# SIP RTP Correlation
|
||||
# flink-voip-fusion
|
||||
|
||||
## 简介
|
||||
|
||||
SIP RTP Correlation 项目是一个使用 Apache Flink 实现的实时数据处理项目,旨在从 Kafka 中读取 SIP(Session Initiation Protocol)和 RTP(Real-time Transport Protocol)数据,将它们融合成完整的 VoIP(Voice over Internet Protocol)通话数据。
|
||||
VoIP 数据融合项目是一个使用 Apache Flink 实现的实时数据处理项目,旨在从 Kafka 中读取 SIP(Session Initiation Protocol)和 RTP(Real-time Transport Protocol)数据,将它们融合成完整的 VoIP(Voice over Internet Protocol)通话数据。
|
||||
|
||||
SIP RTP Correlation 项目可以用于实时监控和分析 VoIP 通话数据,提取关键指标,以及进行实时报警和诊断。
|
||||
VoIP 数据融合项目可以用于实时监控和分析 VoIP 通话数据,提取关键指标,以及进行实时报警和诊断。
|
||||
|
||||
## 编译和打包
|
||||
|
||||
@@ -17,9 +17,13 @@ mvn clean package
|
||||
|
||||
使用以下命令运行Flink任务:
|
||||
```shell
|
||||
flink run -c com.geedgenetworks.flink.easy.core.Runner path/to/sip-rtp-correlation-<version>.jar job.yml
|
||||
flink run -c className path/to/flink-voip-fusion-xxx.jar
|
||||
```
|
||||
|
||||
## 配置项说明
|
||||
|
||||
TODO 待补充
|
||||
|
||||
## 贡献
|
||||
|
||||
如果您发现任何问题或改进项目的想法,欢迎提交 Issue 或 Pull Request。
|
||||
641
pom.xml
641
pom.xml
@@ -5,207 +5,107 @@
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.geedgenetworks.application</groupId>
|
||||
<artifactId>sip-rtp-correlation</artifactId>
|
||||
<version>2.0-rc2</version>
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>flink-voip-fusion</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
|
||||
<name>Flink : SIP-RTP : Correlation</name>
|
||||
<name>Flink : VoIP : Fusion</name>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<java.version>11</java.version>
|
||||
<maven.compiler.source>${java.version}</maven.compiler.source>
|
||||
<maven.compiler.target>${java.version}</maven.compiler.target>
|
||||
<flink.version>1.13.6</flink.version>
|
||||
<scala.version>2.12.10</scala.version>
|
||||
<scala.binary.version>2.12</scala.binary.version>
|
||||
<flink.version>1.13.6</flink.version>
|
||||
<easy.stream.version>1.3-rc1</easy.stream.version>
|
||||
<slf4j.version>1.7.32</slf4j.version>
|
||||
<log4j.version>2.17.1</log4j.version>
|
||||
<junit.version>5.8.0</junit.version>
|
||||
<jackson.version>2.13.2.20220328</jackson.version>
|
||||
<scalatest.version>3.2.15</scalatest.version>
|
||||
</properties>
|
||||
|
||||
<distributionManagement>
|
||||
<repository>
|
||||
<id>platform-releases</id>
|
||||
<url>http://192.168.40.153:8099/content/repositories/platform-release</url>
|
||||
<uniqueVersion>true</uniqueVersion>
|
||||
</repository>
|
||||
<snapshotRepository>
|
||||
<id>platform-snapshots</id>
|
||||
<url>http://192.168.40.153:8099/content/repositories/platform-snapshot</url>
|
||||
</snapshotRepository>
|
||||
<site>
|
||||
<id>platform-site</id>
|
||||
<url>
|
||||
dav:http://192.168.40.153:8099/content/sites/platform-site/platform/application/sip-rtp-correlate-${project.version}
|
||||
</url>
|
||||
</site>
|
||||
</distributionManagement>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>central</id>
|
||||
<url>http://192.168.40.153:8099/content/groups/public</url>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>snapshots</id>
|
||||
<url>http://192.168.40.153:8099/content/groups/public</url>
|
||||
<snapshots>
|
||||
<enabled>true</enabled>
|
||||
</snapshots>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-slf4j-impl</artifactId>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-clients_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-api</artifactId>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-core</artifactId>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-json</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>galaxy</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>xyz.downgoon</groupId>
|
||||
<artifactId>snowflake</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Easy Stream -->
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-common</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-core</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-grouped-exec-pipeline</artifactId>
|
||||
<groupId>org.scalatest</groupId>
|
||||
<artifactId>scalatest_${scala.binary.version}</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-filter-pipeline</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-console-pipeline</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-split-pipeline</artifactId>
|
||||
<version>${easy.stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-correlate-pipeline</artifactId>
|
||||
<version>${easy.stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-union-pipeline</artifactId>
|
||||
<version>${easy.stream.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-kafka-connector</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-text-connector</artifactId>
|
||||
<version>${easy.stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-socket-connector</artifactId>
|
||||
<version>${easy.stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-console-connector</artifactId>
|
||||
<version>${easy.stream.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-json-format</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-flink-shim</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Flink -->
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-clients_${scala.binary.version}</artifactId>
|
||||
<groupId>com.github.javafaker</groupId>
|
||||
<artifactId>javafaker</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
|
||||
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-table-common</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
|
||||
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>test</scope>
|
||||
<classifier>tests</classifier>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-api</artifactId>
|
||||
<scope>test</scope>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-slf4j-impl</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-api</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-core</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<dependencyManagement>
|
||||
|
||||
<dependencies>
|
||||
<!-- DEV -->
|
||||
<dependency>
|
||||
<groupId>com.github.spotbugs</groupId>
|
||||
<artifactId>spotbugs-annotations</artifactId>
|
||||
<version>4.4.2</version>
|
||||
</dependency>
|
||||
<!-- LOG -->
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
@@ -215,43 +115,57 @@
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-slf4j-impl</artifactId>
|
||||
<version>${log4j.version}</version>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-api</artifactId>
|
||||
<version>${log4j.version}</version>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-core</artifactId>
|
||||
<version>${log4j.version}</version>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<!-- API bridge between log4j 1 and 2 -->
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-1.2-api</artifactId>
|
||||
<version>${log4j.version}</version>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- Test -->
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-api</artifactId>
|
||||
<version>${junit.version}</version>
|
||||
<scope>test</scope>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-library</artifactId>
|
||||
<version>${scala.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter</artifactId>
|
||||
<version>${junit.version}</version>
|
||||
<scope>test</scope>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-reflect</artifactId>
|
||||
<version>${scala.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-compiler</artifactId>
|
||||
<version>${scala.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.scalatest</groupId>
|
||||
<artifactId>scalatest_${scala.binary.version}</artifactId>
|
||||
<version>${scalatest.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.javafaker</groupId>
|
||||
<artifactId>javafaker</artifactId>
|
||||
<version>1.0.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson</groupId>
|
||||
<artifactId>jackson-bom</artifactId>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
<version>${jackson.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Common -->
|
||||
<dependency>
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>galaxy</artifactId>
|
||||
@@ -261,240 +175,72 @@
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>log4j</groupId>
|
||||
<artifactId>log4j</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<!-- Easy Stream-->
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-common</artifactId>
|
||||
<version>${easy.stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-core</artifactId>
|
||||
<version>${easy.stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-grouped-exec-pipeline</artifactId>
|
||||
<version>${easy.stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-filter-pipeline</artifactId>
|
||||
<version>${easy.stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-console-pipeline</artifactId>
|
||||
<version>${easy.stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-kafka-connector</artifactId>
|
||||
<version>${easy.stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-json-format</artifactId>
|
||||
<version>${easy.stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.geedgenetworks.flink</groupId>
|
||||
<artifactId>easy-stream-flink-shim</artifactId>
|
||||
<version>${easy.stream.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Flink -->
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-core</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-clients_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-json</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-table-common</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<version>3.1.2</version>
|
||||
<configuration>
|
||||
<suppressionsLocation>${basedir}/tools/maven/suppressions.xml</suppressionsLocation>
|
||||
<includeTestSourceDirectory>true</includeTestSourceDirectory>
|
||||
<configLocation>${basedir}/tools/maven/checkstyle.xml</configLocation>
|
||||
<logViolationsToConsole>true</logViolationsToConsole>
|
||||
<failOnViolation>true</failOnViolation>
|
||||
</configuration>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.puppycrawl.tools</groupId>
|
||||
<artifactId>checkstyle</artifactId>
|
||||
<version>8.40</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>java-style-check</id>
|
||||
<phase>compile</phase>
|
||||
<goals>
|
||||
<goal>check</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<sourceDirectories>src/main/java</sourceDirectories>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>java-test-style-check</id>
|
||||
<phase>test-compile</phase>
|
||||
<goals>
|
||||
<goal>check</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<testSourceDirectories>src/test/java</testSourceDirectories>
|
||||
<includeTestSourceDirectory>true</includeTestSourceDirectory>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>com.github.spotbugs</groupId>
|
||||
<artifactId>spotbugs-maven-plugin</artifactId>
|
||||
<version>4.4.2.2</version>
|
||||
<configuration>
|
||||
<xmlOutput>true</xmlOutput>
|
||||
<!-- Low, Medium, High ('Low' is strictest) -->
|
||||
<threshold>Low</threshold>
|
||||
<effort>default</effort>
|
||||
<spotbugsXmlOutputDirectory>${project.build.directory}/spotbugs</spotbugsXmlOutputDirectory>
|
||||
<excludeFilterFile>${basedir}/tools/maven/spotbugs-exclude.xml</excludeFilterFile>
|
||||
<failOnError>true</failOnError>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>findbugs-main</id>
|
||||
<phase>compile</phase>
|
||||
<goals>
|
||||
<goal>check</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>findbugs-test</id>
|
||||
<phase>test-compile</phase>
|
||||
<goals>
|
||||
<goal>check</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<includeTests>true</includeTests>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.8.1</version>
|
||||
<version>3.1</version>
|
||||
<configuration>
|
||||
<source>${maven.compiler.source}</source>
|
||||
<target>${maven.compiler.target}</target>
|
||||
<source>${java.version}</source>
|
||||
<target>${java.version}</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>net.alchim31.maven</groupId>
|
||||
<artifactId>scala-maven-plugin</artifactId>
|
||||
<version>3.2.2</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>scala-compile</id>
|
||||
<phase>compile</phase>
|
||||
<goals>
|
||||
<goal>add-source</goal>
|
||||
<goal>compile</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>scala-test-compile</id>
|
||||
<goals>
|
||||
<goal>testCompile</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<scalaVersion>${scala.version}</scalaVersion>
|
||||
<scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>3.5.1</version>
|
||||
<version>3.1.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>default-shade</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<createDependencyReducedPom>false</createDependencyReducedPom>
|
||||
<finalName>${project.artifactId}-${project.version}</finalName>
|
||||
<artifactSet>
|
||||
<excludes>
|
||||
<exclude>org.apache.flink:force-shading</exclude>
|
||||
<exclude>com.google.code.findbugs:jsr305</exclude>
|
||||
<exclude>org.slf4j:*</exclude>
|
||||
<exclude>org.apache.logging.log4j:*</exclude>
|
||||
<exclude>org.mockito:mockito-core</exclude>
|
||||
</excludes>
|
||||
</artifactSet>
|
||||
<filters>
|
||||
<filter>
|
||||
<!-- Do not copy the signatures in the META-INF folder.
|
||||
Otherwise, this might cause SecurityExceptions when using the JAR. -->
|
||||
<artifact>*:*</artifact>
|
||||
<excludes>
|
||||
<exclude>META-INF/*.SF</exclude>
|
||||
@@ -503,171 +249,10 @@
|
||||
</excludes>
|
||||
</filter>
|
||||
</filters>
|
||||
<transformers>
|
||||
<transformer
|
||||
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
|
||||
</transformers>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<version>3.0.0</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>build-jobs</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<finalName>${project.artifactId}-yml-${project.version}</finalName>
|
||||
<appendAssemblyId>false</appendAssemblyId>
|
||||
<descriptors>
|
||||
<descriptor>tools/dist/target.xml</descriptor>
|
||||
</descriptors>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-resources-plugin</artifactId>
|
||||
<version>3.0.0</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>site-resources</id>
|
||||
<phase>pre-site</phase>
|
||||
<goals>
|
||||
<goal>resources</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<resources>
|
||||
<resource>
|
||||
<directory>src/site</directory>
|
||||
<filtering>true</filtering>
|
||||
<includes>
|
||||
<include>**</include>
|
||||
</includes>
|
||||
</resource>
|
||||
</resources>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-site-plugin</artifactId>
|
||||
<configuration>
|
||||
<skip>false</skip>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>default-site</id>
|
||||
<goals>
|
||||
<goal>site</goal>
|
||||
</goals>
|
||||
<phase>site</phase>
|
||||
<configuration>
|
||||
<siteDirectory>${project.build.outputDirectory}</siteDirectory>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>site-deploy</id>
|
||||
<goals>
|
||||
<goal>stage-deploy</goal>
|
||||
</goals>
|
||||
<phase>deploy</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
<!-- CI plugins -->
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<version>3.1.2</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>release-deploy-check</id>
|
||||
<goals>
|
||||
<goal>get</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>easy-stream-common</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<remoteRepositories>${project.distributionManagement.repository.url}</remoteRepositories>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
<version>3.0.0-M3</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>release-version-check</id>
|
||||
<goals>
|
||||
<goal>enforce</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<rules>
|
||||
<requireReleaseVersion>
|
||||
<message>SNAPSHOT versions ${project.version} are not allowed.</message>
|
||||
</requireReleaseVersion>
|
||||
</rules>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>snapshot-version-check</id>
|
||||
<goals>
|
||||
<goal>enforce</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<rules>
|
||||
<requireSnapshotVersion>
|
||||
<message>Non-SNAPSHOT versions ${project.version} are not allowed.</message>
|
||||
</requireSnapshotVersion>
|
||||
</rules>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-site-plugin</artifactId>
|
||||
<version>3.9.1</version>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/site</outputDirectory>
|
||||
<relativizeDecorationLinks>false</relativizeDecorationLinks>
|
||||
</configuration>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.maven.wagon</groupId>
|
||||
<artifactId>wagon-webdav-jackrabbit</artifactId>
|
||||
<version>2.8</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.maven.doxia</groupId>
|
||||
<artifactId>doxia-module-markdown</artifactId>
|
||||
<version>1.9.1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-deploy-plugin</artifactId>
|
||||
<version>3.1.1</version>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
@@ -1,33 +0,0 @@
|
||||
package com.geedgenetworks.flink.easy.application.voip;
|
||||
|
||||
import com.geedgenetworks.flink.easy.application.voip.udf.*;
|
||||
import com.geedgenetworks.flink.easy.common.api.UDFFactory;
|
||||
import org.apache.flink.table.functions.UserDefinedFunction;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class VoipUDFFactory implements UDFFactory {
|
||||
|
||||
private static final Map<String, UserDefinedFunction> R =
|
||||
new HashMap<>() {{
|
||||
put("IS_IP_ADDRESS", new IsIpAddress());
|
||||
|
||||
put("IS_INTERNAL_IP_ADDRESS", new IsInternalIpAddress());
|
||||
put("IS_EXTERNAL_IP_ADDRESS", new IsExternalIpAddress());
|
||||
|
||||
put("HAS_IP_ADDRESS", new HasIpAddress());
|
||||
put("HAS_EXTERNAL_IP_ADDRESS", new HasExternalIpAddress());
|
||||
|
||||
put("STREAM_DIR", new StreamDir());
|
||||
put("FIND_NOT_BLANK", new FindNotBlank());
|
||||
put("SORT_ADDRESS", new SortAddress());
|
||||
|
||||
put("SNOWFLAKE_ID", new SnowflakeID());
|
||||
}};
|
||||
|
||||
@Override
|
||||
public Map<String, UserDefinedFunction> register() {
|
||||
return R;
|
||||
}
|
||||
}
|
||||
@@ -1,15 +0,0 @@
|
||||
package com.geedgenetworks.flink.easy.application.voip.udf;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.flink.table.annotation.DataTypeHint;
|
||||
import org.apache.flink.table.functions.ScalarFunction;
|
||||
|
||||
public class FindNotBlank extends ScalarFunction {
|
||||
|
||||
public @DataTypeHint("STRING") String eval(String s1, String s2) {
|
||||
if (StringUtils.isBlank(s1) && StringUtils.isNotBlank(s2)) {
|
||||
return s2;
|
||||
}
|
||||
return s1;
|
||||
}
|
||||
}
|
||||
@@ -1,19 +0,0 @@
|
||||
package com.geedgenetworks.flink.easy.application.voip.udf;
|
||||
|
||||
import org.apache.flink.table.annotation.DataTypeHint;
|
||||
import org.apache.flink.table.functions.ScalarFunction;
|
||||
|
||||
public class HasExternalIpAddress extends ScalarFunction {
|
||||
|
||||
private final IsExternalIpAddress isExternalIpAddress = new IsExternalIpAddress();
|
||||
|
||||
public @DataTypeHint("BOOLEAN") Boolean eval(String... ipaddr) {
|
||||
if (null == ipaddr) {
|
||||
return false;
|
||||
}
|
||||
for (var ip : ipaddr) {
|
||||
return isExternalIpAddress.eval(ip);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -1,18 +0,0 @@
|
||||
package com.geedgenetworks.flink.easy.application.voip.udf;
|
||||
|
||||
import com.zdjizhi.utils.IPUtil;
|
||||
import org.apache.flink.table.annotation.DataTypeHint;
|
||||
import org.apache.flink.table.functions.ScalarFunction;
|
||||
|
||||
public class HasIpAddress extends ScalarFunction {
|
||||
|
||||
public @DataTypeHint("BOOLEAN") Boolean eval(String... ipaddr) {
|
||||
if (null == ipaddr) {
|
||||
return false;
|
||||
}
|
||||
for (var ip : ipaddr) {
|
||||
return ip != null && IPUtil.isIPAddress(ip);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -1,17 +0,0 @@
|
||||
package com.geedgenetworks.flink.easy.application.voip.udf;
|
||||
|
||||
import com.zdjizhi.utils.IPUtil;
|
||||
import org.apache.flink.table.annotation.DataTypeHint;
|
||||
import org.apache.flink.table.functions.ScalarFunction;
|
||||
|
||||
import static com.zdjizhi.utils.IPUtil.isIPAddress;
|
||||
|
||||
public class IsExternalIpAddress extends ScalarFunction {
|
||||
|
||||
public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
|
||||
if (ipaddr == null || !isIPAddress(ipaddr)) {
|
||||
return false;
|
||||
}
|
||||
return !IPUtil.internalIp(ipaddr);
|
||||
}
|
||||
}
|
||||
@@ -1,17 +0,0 @@
|
||||
package com.geedgenetworks.flink.easy.application.voip.udf;
|
||||
|
||||
import com.zdjizhi.utils.IPUtil;
|
||||
import org.apache.flink.table.annotation.DataTypeHint;
|
||||
import org.apache.flink.table.functions.ScalarFunction;
|
||||
|
||||
import static com.zdjizhi.utils.IPUtil.isIPAddress;
|
||||
|
||||
public class IsInternalIpAddress extends ScalarFunction {
|
||||
|
||||
public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
|
||||
if (!isIPAddress(ipaddr)) {
|
||||
return false;
|
||||
}
|
||||
return IPUtil.internalIp(ipaddr);
|
||||
}
|
||||
}
|
||||
@@ -1,15 +0,0 @@
|
||||
package com.geedgenetworks.flink.easy.application.voip.udf;
|
||||
|
||||
import com.zdjizhi.utils.IPUtil;
|
||||
import org.apache.flink.table.annotation.DataTypeHint;
|
||||
import org.apache.flink.table.functions.ScalarFunction;
|
||||
|
||||
public class IsIpAddress extends ScalarFunction {
|
||||
|
||||
public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
|
||||
if (null == ipaddr) {
|
||||
return false;
|
||||
}
|
||||
return IPUtil.isIPAddress(ipaddr);
|
||||
}
|
||||
}
|
||||
@@ -1,14 +0,0 @@
|
||||
package com.geedgenetworks.flink.easy.application.voip.udf;
|
||||
|
||||
import org.apache.flink.table.annotation.DataTypeHint;
|
||||
import org.apache.flink.table.functions.ScalarFunction;
|
||||
import xyz.downgoon.snowflake.Snowflake;
|
||||
|
||||
public class SnowflakeID extends ScalarFunction {
|
||||
|
||||
private static final Snowflake SNOWFLAKE = new Snowflake(1, 1);
|
||||
|
||||
public @DataTypeHint("BIGINT") Long eval() {
|
||||
return SNOWFLAKE.nextId();
|
||||
}
|
||||
}
|
||||
@@ -1,31 +0,0 @@
|
||||
package com.geedgenetworks.flink.easy.application.voip.udf;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.zdjizhi.utils.IPUtil;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.table.annotation.DataTypeHint;
|
||||
import org.apache.flink.table.functions.ScalarFunction;
|
||||
|
||||
public class SortAddress extends ScalarFunction {
|
||||
|
||||
public @DataTypeHint("STRING")
|
||||
String eval(
|
||||
String ip1, Integer port1, String ip2, Integer port2) {
|
||||
return of(Tuple2.of(ip1, port1), Tuple2.of(ip2, port2));
|
||||
}
|
||||
|
||||
public static String of(
|
||||
Tuple2<String, Integer> a1, Tuple2<String, Integer> a2) {
|
||||
var list = Lists.newArrayList(a1, a2);
|
||||
list.sort((a, b) -> {
|
||||
if (a.f1.equals(b.f1)) {
|
||||
return Long.compare(
|
||||
IPUtil.getIpDesimal(a.f0), IPUtil.getIpDesimal(b.f0));
|
||||
} else {
|
||||
return a.f1.compareTo(b.f1);
|
||||
}
|
||||
});
|
||||
return String.format("%s:%s,%s:%s",
|
||||
list.get(0).f0, list.get(0).f1, list.get(1).f0, list.get(1).f1);
|
||||
}
|
||||
}
|
||||
@@ -1,26 +0,0 @@
|
||||
package com.geedgenetworks.flink.easy.application.voip.udf;
|
||||
|
||||
import org.apache.flink.table.annotation.DataTypeHint;
|
||||
import org.apache.flink.table.functions.ScalarFunction;
|
||||
|
||||
public class StreamDir extends ScalarFunction {
|
||||
|
||||
public @DataTypeHint("INT") Integer eval(Long flags) {
|
||||
int v = 0;
|
||||
if (flags == null) {
|
||||
return v;
|
||||
}
|
||||
if ((flags & 8192) == 8192) {
|
||||
v += 1;
|
||||
}
|
||||
if ((flags & 16384) == 16384) {
|
||||
v += 2;
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
System.out.println(8192L + 16384L);
|
||||
System.out.println(new StreamDir().eval(8192L + 16384L));
|
||||
}
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
com.geedgenetworks.flink.easy.application.voip.VoipUDFFactory
|
||||
7
src/main/resources/app.properties
Normal file
7
src/main/resources/app.properties
Normal file
@@ -0,0 +1,7 @@
|
||||
source.kafka.topic=aaa
|
||||
source.kafka.props.bootstrap.servers=127.0.0.1:9292
|
||||
source.kafka.props.group.id=flink-voip-fusion
|
||||
source.kafka.props.auto.offset.reset=earliest
|
||||
|
||||
sink.kafka.topic=bbb
|
||||
sink.kafka.props.bootstrap.servers=127.0.0.1:9292
|
||||
File diff suppressed because it is too large
Load Diff
52
src/main/scala/com/zdjizhi/flink/voip/FusionApp.scala
Normal file
52
src/main/scala/com/zdjizhi/flink/voip/FusionApp.scala
Normal file
@@ -0,0 +1,52 @@
|
||||
package com.zdjizhi.flink.voip
|
||||
|
||||
import com.zdjizhi.flink.voip.conf._
|
||||
import com.zdjizhi.flink.voip.functions.TypeSplitFunction._
|
||||
import com.zdjizhi.flink.voip.functions.{SIPPairingFunction, TypeSplitFunction, VoIPFusionFunction}
|
||||
import com.zdjizhi.flink.voip.records.SIPRecord.Implicits._
|
||||
import org.apache.flink.api.java.utils.ParameterTool
|
||||
import org.apache.flink.formats.json.JsonNodeDeserializationSchema
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
|
||||
import org.apache.flink.streaming.api.scala._
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
|
||||
|
||||
/**
|
||||
* App Main.
|
||||
*
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
object FusionApp {
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
|
||||
val env = StreamExecutionEnvironment.getExecutionEnvironment
|
||||
|
||||
require(args.nonEmpty, "Error: Not found properties path. \nUsage: flink -c xxx xxx.jar app.properties.")
|
||||
|
||||
val tool = ParameterTool.fromPropertiesFile(args.head)
|
||||
|
||||
env.getConfig.setGlobalJobParameters(tool.getConfiguration)
|
||||
|
||||
val consumer = new FlinkKafkaConsumer[ObjectNode](
|
||||
tool.getConfiguration.get(FusionConfigs.SOURCE_KAFKA_TOPIC),
|
||||
new JsonNodeDeserializationSchema,
|
||||
tool.getProperties(FusionConfigs.SOURCE_KAFKA_PROPERTIES_PREFIX))
|
||||
|
||||
val schemaTypeSplitStream = env.addSource(consumer)
|
||||
.process(new TypeSplitFunction)
|
||||
|
||||
val sipStream = schemaTypeSplitStream.getSideOutput(sipSchemaTypeOutputTag)
|
||||
|
||||
val sipDoubleStream = sipStream.keyBy { i => (i.vSysID, i.callID) }
|
||||
.process(new SIPPairingFunction)
|
||||
|
||||
val rtpStream = schemaTypeSplitStream.getSideOutput(rtpSchemaTypeOutputTag)
|
||||
|
||||
rtpStream.keyBy(_.vSysID).connect(sipDoubleStream.keyBy(_.vSysID))
|
||||
.process(new VoIPFusionFunction)
|
||||
|
||||
env.execute("VoIP Fusion Job")
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package com.zdjizhi.flink.voip.conf
|
||||
|
||||
import org.apache.flink.api.common.time.Time
|
||||
import org.apache.flink.configuration.{ConfigOption, ConfigOptions}
|
||||
|
||||
import java.lang.{Long => JLong}
|
||||
|
||||
/**
|
||||
* An object containing configuration options for the Fusion application.
|
||||
*
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
object FusionConfigs {
|
||||
|
||||
val SOURCE_KAFKA_TOPIC: ConfigOption[String] =
|
||||
ConfigOptions.key("source.kafka.topic")
|
||||
.stringType()
|
||||
.noDefaultValue()
|
||||
.withDescription("")
|
||||
|
||||
val SINK_KAFKA_TOPIC: ConfigOption[String] =
|
||||
ConfigOptions.key("sink.kafka.topic")
|
||||
.stringType()
|
||||
.noDefaultValue()
|
||||
.withDescription("")
|
||||
|
||||
val SOURCE_KAFKA_PROPERTIES_PREFIX: String = "source.kafka.props."
|
||||
|
||||
val SINK_KAFKA_PROPERTIES_PREFIX: String = "sink.kafka.props."
|
||||
|
||||
/**
|
||||
* The configuration option for the interval at which SIP (Session Initiation Protocol) state data
|
||||
* should be cleared.
|
||||
*/
|
||||
val SIP_STATE_CLEAR_INTERVAL: ConfigOption[JLong] =
|
||||
ConfigOptions.key("")
|
||||
.longType()
|
||||
.defaultValue(Time.minutes(1).toMilliseconds)
|
||||
.withDescription("")
|
||||
|
||||
val RTP_STATE_CLEAR_INTERVAL: ConfigOption[JLong] =
|
||||
ConfigOptions.key("")
|
||||
.longType()
|
||||
.defaultValue(Time.minutes(3).toMilliseconds)
|
||||
.withDescription("")
|
||||
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package com.zdjizhi.flink.voip.conf
|
||||
|
||||
import org.apache.flink.api.java.utils.ParameterTool
|
||||
|
||||
import java.util.Properties
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
/**
|
||||
* A wrapper class that extends the Flink `ParameterTool` to provide utility methods for handling
|
||||
* properties with a specific prefix. This class allows retrieving properties that start with the
|
||||
* given `prefix` and converts them into a `java.util.Properties` object.
|
||||
*
|
||||
* @param tool The original Flink `ParameterTool` instance.
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
class FusionParameterTool(tool: ParameterTool) {
|
||||
|
||||
/**
|
||||
* Retrieves properties from the underlying `ParameterTool` instance that start with the specified
|
||||
* `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
|
||||
*
|
||||
* @param prefix The prefix to filter properties.
|
||||
* @return A `java.util.Properties` object containing the properties with the specified prefix.
|
||||
*/
|
||||
def getProperties(prefix: String): Properties = {
|
||||
val map = tool.toMap.asScala.filterKeys(_.startsWith(prefix))
|
||||
.map { case (key, value) => (key.stripPrefix(prefix), value) }
|
||||
ParameterTool.fromMap(map.asJava).getProperties
|
||||
}
|
||||
|
||||
}
|
||||
11
src/main/scala/com/zdjizhi/flink/voip/conf/package.scala
Normal file
11
src/main/scala/com/zdjizhi/flink/voip/conf/package.scala
Normal file
@@ -0,0 +1,11 @@
|
||||
package com.zdjizhi.flink.voip
|
||||
|
||||
import org.apache.flink.api.java.utils.ParameterTool
|
||||
|
||||
import scala.language.implicitConversions
|
||||
|
||||
package object conf {
|
||||
|
||||
implicit def asFusionParameterTool(tool: ParameterTool): FusionParameterTool = new FusionParameterTool(tool)
|
||||
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package com.zdjizhi.flink.voip.functions
|
||||
|
||||
import org.apache.flink.api.common.functions.RichFunction
|
||||
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
|
||||
import org.apache.flink.api.common.time.Time
|
||||
import org.apache.flink.configuration.Configuration
|
||||
import org.apache.flink.streaming.api.TimerService
|
||||
|
||||
|
||||
/**
|
||||
* A trait that provides utility functions for Flink functions.
|
||||
*
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
trait FunctionHelper extends RichFunction {
|
||||
|
||||
/**
|
||||
* Get the global configuration for the current Flink job.
|
||||
*
|
||||
* @return The global configuration as a Configuration object.
|
||||
*/
|
||||
def getGlobalConfiguration: Configuration = getRuntimeContext.getExecutionConfig
|
||||
.getGlobalJobParameters.asInstanceOf[Configuration]
|
||||
|
||||
/**
|
||||
* Get a MapState with the given name, key, and value types.
|
||||
*
|
||||
* @param name The name of the MapState.
|
||||
* @param key The class representing the type of keys in the MapState.
|
||||
* @param value The class representing the type of values in the MapState.
|
||||
* @tparam K The type of keys in the MapState.
|
||||
* @tparam V The type of values in the MapState.
|
||||
* @return The MapState with the given name, key, and value types.
|
||||
*/
|
||||
def getMapState[K, V](name: String, key: Class[K], value: Class[V]): MapState[K, V] = {
|
||||
val descriptor = new MapStateDescriptor[K, V](name, key, value)
|
||||
getRuntimeContext.getMapState(descriptor)
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the next fire timestamp for the given TimerService and fire interval.
|
||||
*
|
||||
* @param timeService The TimerService used to register the timer.
|
||||
* @param fireInterval The interval at which the timer should fire, represented as a Time object.
|
||||
*/
|
||||
def registerNextFireTimestamp(timeService: TimerService, fireInterval: Time): Unit = {
|
||||
val currentTime = timeService.currentProcessingTime()
|
||||
val nextFireTime = (currentTime / fireInterval.toMilliseconds + 1) * fireInterval.toMilliseconds
|
||||
timeService.registerProcessingTimeTimer(nextFireTime)
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
package com.zdjizhi.flink.voip.functions
|
||||
|
||||
import com.zdjizhi.flink.voip.conf.FusionConfigs.SIP_STATE_CLEAR_INTERVAL
|
||||
import com.zdjizhi.flink.voip.records.SIPRecord.Implicits._
|
||||
import com.zdjizhi.flink.voip.records.{Record, StreamDirs}
|
||||
import org.apache.flink.api.common.time.Time
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{IntNode, ObjectNode}
|
||||
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
|
||||
import org.apache.flink.util.Collector
|
||||
|
||||
/**
|
||||
* A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
|
||||
* SIP records are paired when they have the same addresses but opposite stream directions.
|
||||
*
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
class SIPPairingFunction extends KeyedProcessFunction[(Int, String), ObjectNode, ObjectNode]
|
||||
with FunctionHelper {
|
||||
|
||||
private lazy val fireInterval: Time = Time.milliseconds(getGlobalConfiguration.get(SIP_STATE_CLEAR_INTERVAL))
|
||||
// A MapState to store SIP records with their addresses and expiration time.
|
||||
private lazy val mapState = getMapState("sip-state", classOf[Address], classOf[ObjectNodeWithExpiration])
|
||||
|
||||
override def processElement(obj: ObjectNode,
|
||||
ctx: KeyedProcessFunction[(Int, String), ObjectNode, ObjectNode]#Context,
|
||||
out: Collector[ObjectNode]): Unit = {
|
||||
// Register a timer for the next clearing interval.
|
||||
registerNextFireTimestamp(ctx.timerService(), fireInterval)
|
||||
|
||||
// When SIP is a one-way stream.
|
||||
if (obj.streamDir != StreamDirs.DOUBLE) {
|
||||
// Create an Address instance based on server and client IPs and ports.
|
||||
val address = Address((obj.serverIp, obj.serverPort), (obj.clientIp, obj.clientPort))
|
||||
// If the address is already stored in the mapState and has the opposite stream direction,
|
||||
// merge the SIP records, change the stream direction to DOUBLE, and output the merged record.
|
||||
if (mapState.contains(address) && mapState.get(address).obj.streamDir != obj.streamDir /* TODO consider stream direction */ ) {
|
||||
obj.merge(mapState.get(address).obj)
|
||||
obj.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.DOUBLE))
|
||||
out.collect(obj)
|
||||
mapState.remove(address)
|
||||
} else {
|
||||
// If the address is not yet in the mapState, add it with its expiration time.
|
||||
val value = ObjectNodeWithExpiration(obj,
|
||||
ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds)
|
||||
mapState.put(address, value)
|
||||
}
|
||||
} else {
|
||||
// If SIP is a double stream, pairing isn't required, directly output the record.
|
||||
out.collect(obj)
|
||||
}
|
||||
}
|
||||
|
||||
override def onTimer(timestamp: Long,
|
||||
ctx: KeyedProcessFunction[(Int, String), ObjectNode, ObjectNode]#OnTimerContext,
|
||||
out: Collector[ObjectNode]): Unit = {
|
||||
|
||||
val iterator = mapState.entries().iterator()
|
||||
while (iterator.hasNext) {
|
||||
val entry = iterator.next()
|
||||
// Remove expired entries from the mapState based on their expiration time.
|
||||
if (entry.getValue.expireTime <= timestamp) {
|
||||
mapState.remove(entry.getKey)
|
||||
}
|
||||
}
|
||||
registerNextFireTimestamp(ctx.timerService(), fireInterval)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
package com.zdjizhi.flink.voip.functions
|
||||
|
||||
|
||||
import com.zdjizhi.flink.voip.functions.TypeSplitFunction._
|
||||
import com.zdjizhi.flink.voip.records.Record.Implicits._
|
||||
import com.zdjizhi.flink.voip.records.SchemaTypes._
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
|
||||
import org.apache.flink.streaming.api.functions.ProcessFunction
|
||||
import org.apache.flink.streaming.api.scala._
|
||||
import org.apache.flink.util.Collector
|
||||
|
||||
/**
|
||||
* A ProcessFunction that splits ObjectNode records based on their 'schemaType' field.
|
||||
* It outputs SIP records to the 'sipSchemaTypeOutputTag' and RTP records to the 'rtpSchemaTypeOutputTag'.
|
||||
*
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
class TypeSplitFunction extends ProcessFunction[ObjectNode, ObjectNode] {
|
||||
|
||||
override def processElement(obj: ObjectNode,
|
||||
ctx: ProcessFunction[ObjectNode, ObjectNode]#Context,
|
||||
out: Collector[ObjectNode]): Unit = {
|
||||
// Split record based on its 'schemaType' field.
|
||||
obj.schemaType match {
|
||||
case SIP => ctx.output(sipSchemaTypeOutputTag, obj)
|
||||
case RTP => ctx.output(rtpSchemaTypeOutputTag, obj)
|
||||
case _ =>
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Defining the OutputTags for SIP and RTP records.
|
||||
*/
|
||||
object TypeSplitFunction {
|
||||
|
||||
/**
|
||||
* OutputTag for SIP records.
|
||||
*/
|
||||
val sipSchemaTypeOutputTag: OutputTag[ObjectNode] =
|
||||
OutputTag[ObjectNode]("schema-type-sip")
|
||||
|
||||
/**
|
||||
* OutputTag for RTP records.
|
||||
*/
|
||||
val rtpSchemaTypeOutputTag: OutputTag[ObjectNode] =
|
||||
OutputTag[ObjectNode]("schema-type-rtp")
|
||||
}
|
||||
@@ -0,0 +1,101 @@
|
||||
package com.zdjizhi.flink.voip.functions
|
||||
|
||||
import com.zdjizhi.flink.voip.conf.FusionConfigs.RTP_STATE_CLEAR_INTERVAL
|
||||
import com.zdjizhi.flink.voip.records.{Record, SchemaTypes, StreamDirs}
|
||||
import org.apache.flink.api.common.time.Time
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{ObjectNode, TextNode}
|
||||
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
|
||||
import org.apache.flink.util.Collector
|
||||
|
||||
/**
|
||||
* The `VoIPFusionFunction` is a key-based co-process function that performs VoIP fusion logic
|
||||
* for SIP and RTP records. It combines SIP and RTP records belonging to the same session
|
||||
* and emits fused VoIP records. The function utilizes keyed state to store and manage SIP and
|
||||
* RTP records, and it uses timers to trigger regular clearing of the state.
|
||||
*
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
class VoIPFusionFunction extends KeyedCoProcessFunction[Int, ObjectNode, ObjectNode, ObjectNode] with FunctionHelper {
|
||||
|
||||
// The maximum number of RTP lines allowed per SIP for fusion.
|
||||
private val MAX_RTP_LINES: Int = 2
|
||||
private lazy val fireInterval: Time = Time.milliseconds(getGlobalConfiguration.get(RTP_STATE_CLEAR_INTERVAL))
|
||||
private lazy val sipDoubleState = getMapState("sip-state", classOf[Address], classOf[ObjectNodeWithInfo])
|
||||
private lazy val rtpState = getMapState("rtp-state", classOf[Address], classOf[ObjectNodeWithExpiration])
|
||||
|
||||
override def processElement1(obj: ObjectNode,
|
||||
ctx: KeyedCoProcessFunction[Int, ObjectNode, ObjectNode, ObjectNode]#Context,
|
||||
out: Collector[ObjectNode]): Unit = {
|
||||
import com.zdjizhi.flink.voip.records.SIPRecord.Implicits._
|
||||
val address = Address((obj.originatorSdpConnectIp, obj.originatorSdpMediaPort),
|
||||
(obj.responderSdpConnectIp, obj.responderSdpMediaPort))
|
||||
|
||||
sipDoubleState.put(address, ObjectNodeWithInfo(obj,
|
||||
ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds, 0))
|
||||
|
||||
registerNextFireTimestamp(ctx.timerService(), fireInterval)
|
||||
}
|
||||
|
||||
override def processElement2(obj: ObjectNode,
|
||||
ctx: KeyedCoProcessFunction[Int, ObjectNode, ObjectNode, ObjectNode]#Context,
|
||||
out: Collector[ObjectNode]): Unit = {
|
||||
import com.zdjizhi.flink.voip.records.Record.Implicits._
|
||||
val address = Address((obj.serverIp, obj.serverPort), (obj.clientIp, obj.clientPort))
|
||||
if (sipDoubleState.contains(address)) {
|
||||
val info = sipDoubleState.get(address)
|
||||
obj.merge(info.obj)
|
||||
obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaTypes.VoIP))
|
||||
out.collect(obj)
|
||||
|
||||
obj.streamDir match {
|
||||
case StreamDirs.DOUBLE =>
|
||||
// In the context of VoIP fusion, only one RTP double directional stream
|
||||
sipDoubleState.remove(address)
|
||||
case _ =>
|
||||
// Save the number of fused RTP unidirectional streams
|
||||
sipDoubleState.put(address, info.copy(times = info.times + 1))
|
||||
}
|
||||
} else {
|
||||
rtpState.put(address, ObjectNodeWithExpiration(obj,
|
||||
ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds))
|
||||
}
|
||||
registerNextFireTimestamp(ctx.timerService(), fireInterval)
|
||||
}
|
||||
|
||||
override def onTimer(timestamp: Long,
|
||||
ctx: KeyedCoProcessFunction[Int, ObjectNode, ObjectNode, ObjectNode]#OnTimerContext,
|
||||
out: Collector[ObjectNode]): Unit = {
|
||||
import com.zdjizhi.flink.voip.records.Record.Implicits._
|
||||
val iterator = rtpState.iterator()
|
||||
while (iterator.hasNext) {
|
||||
val entry = iterator.next()
|
||||
val obj = entry.getValue.obj
|
||||
val address = entry.getKey
|
||||
|
||||
if (sipDoubleState.contains(address)) {
|
||||
val info = sipDoubleState.get(address)
|
||||
obj.streamDir match {
|
||||
case StreamDirs.DOUBLE =>
|
||||
sipDoubleState.remove(address)
|
||||
case _ if info.times >= MAX_RTP_LINES - 1 =>
|
||||
// One RTP unidirectional stream has already been fused
|
||||
sipDoubleState.remove(address)
|
||||
}
|
||||
obj.merge(info.obj)
|
||||
.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaTypes.VoIP))
|
||||
out.collect(obj)
|
||||
}
|
||||
|
||||
if (entry.getValue.expireTime <= timestamp) {
|
||||
rtpState.remove(entry.getKey)
|
||||
}
|
||||
}
|
||||
sipDoubleState.iterator().forEachRemaining(entry => {
|
||||
if (entry.getValue.expireTime <= timestamp) {
|
||||
sipDoubleState.remove(entry.getKey)
|
||||
}
|
||||
})
|
||||
registerNextFireTimestamp(ctx.timerService(), fireInterval)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
package com.zdjizhi.flink.voip.functions
|
||||
|
||||
import com.zdjizhi.utils.IPUtil
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
|
||||
|
||||
/**
|
||||
* A case class representing an address with two IP and port pairs.
|
||||
*
|
||||
* @param ip1 The first IP address.
|
||||
* @param port1 The first port number.
|
||||
* @param ip2 The second IP address.
|
||||
* @param port2 The second port number.
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
case class Address(ip1: String, port1: Int, ip2: String, port2: Int)
|
||||
|
||||
|
||||
object Address {
|
||||
|
||||
/**
|
||||
* Creates an Address instance based on two tuples containing (String, Int) representing address information.
|
||||
* The method sorts the addresses based on the port number, and if the ports are equal, it sorts them based on
|
||||
* the numeric value of the IP address.
|
||||
*
|
||||
* @param address1 The first address information as a tuple (IP address, port).
|
||||
* @param address2 The second address information as a tuple (IP address, port).
|
||||
* @return An Address instance with addresses sorted and reordered.
|
||||
*/
|
||||
def apply(address1: (String, Int), address2: (String, Int)): Address = {
|
||||
val seq = (address1 :: address2 :: Nil).sortWith {
|
||||
case (a, b) =>
|
||||
if (a._2 == b._2) {
|
||||
IPUtil.getIpDesimal(a._1) < IPUtil.getIpDesimal(b._1)
|
||||
} else {
|
||||
a._2 < b._2
|
||||
}
|
||||
}
|
||||
// Create an Address instance with the first address having the smaller port number,
|
||||
// and if the ports are equal, the smaller IP address comes first.
|
||||
Address(seq.head._1, seq.head._2, seq.last._1, seq.last._2)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A case class representing an ObjectNode with an expiration time.
|
||||
*
|
||||
* @param obj The ObjectNode containing data.
|
||||
* @param expireTime The expiration time for the object.
|
||||
*/
|
||||
case class ObjectNodeWithExpiration(obj: ObjectNode, expireTime: Long)
|
||||
|
||||
/**
|
||||
* A case class representing an ObjectNode with an expiration time and a pair times.
|
||||
*
|
||||
* @param obj The ObjectNode containing data.
|
||||
* @param expireTime The expiration time for the object.
|
||||
* @param times The pair times for the object.
|
||||
*/
|
||||
case class ObjectNodeWithInfo(obj: ObjectNode, expireTime: Long, times: Int)
|
||||
137
src/main/scala/com/zdjizhi/flink/voip/records/Record.scala
Normal file
137
src/main/scala/com/zdjizhi/flink/voip/records/Record.scala
Normal file
@@ -0,0 +1,137 @@
|
||||
package com.zdjizhi.flink.voip.records
|
||||
|
||||
import com.zdjizhi.flink.voip.records.Record._
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
|
||||
|
||||
import scala.language.implicitConversions
|
||||
|
||||
/**
|
||||
* 用于解析和访问数据记录的类。
|
||||
*
|
||||
* @param obj 包含数据记录的 ObjectNode 对象
|
||||
* @constructor 创建一个 Record 对象,用于解析和访问数据记录
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
class Record(obj: ObjectNode) {
|
||||
protected implicit val o: ObjectNode = obj
|
||||
/**
|
||||
* 数据记录中的所属 vsys
|
||||
*/
|
||||
val vSysID: Int = getInt(F_COMMON_VSYS_ID)
|
||||
|
||||
/**
|
||||
* 数据记录中的字段类型
|
||||
*/
|
||||
val schemaType: String = getString(F_COMMON_SCHEMA_TYPE)
|
||||
|
||||
/**
|
||||
* 数据记录中的流类型
|
||||
*/
|
||||
val streamDir: Int = getInt(F_COMMON_STREAM_DIR)
|
||||
|
||||
/**
|
||||
* 数据记录中的服务端地址字段值
|
||||
*/
|
||||
val serverIp: String = getString(F_COMMON_SERVER_IP)
|
||||
|
||||
/**
|
||||
* 数据记录中的服务端端口
|
||||
*/
|
||||
val serverPort: Int = getInt(F_COMMON_SERVER_PORT)
|
||||
|
||||
/**
|
||||
* 数据记录中的客户端地址
|
||||
*/
|
||||
val clientIp: String = getString(F_COMMON_CLIENT_IP)
|
||||
|
||||
/**
|
||||
* 数据记录中的客户端端口
|
||||
*/
|
||||
val clientPort: Int = getInt(F_COMMON_CLIENT_PORT)
|
||||
|
||||
def merge(obj: ObjectNode): ObjectNode = {
|
||||
obj.fields().forEachRemaining(entry => {
|
||||
o.set(entry.getKey, entry.getValue)
|
||||
})
|
||||
o
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 用于解析和访问数据记录。
|
||||
*
|
||||
* @see Record
|
||||
*/
|
||||
object Record {
|
||||
|
||||
/**
|
||||
* 所属 vsys 字段名
|
||||
*/
|
||||
val F_COMMON_VSYS_ID = "common_vsys_id"
|
||||
|
||||
/**
|
||||
* 字段类型 字段名
|
||||
*/
|
||||
val F_COMMON_SCHEMA_TYPE = "common_schema_type"
|
||||
|
||||
/**
|
||||
* 流类型 字段名
|
||||
*/
|
||||
val F_COMMON_STREAM_DIR = "common_stream_dir"
|
||||
|
||||
/**
|
||||
* 服务端地址 字段名
|
||||
*/
|
||||
val F_COMMON_SERVER_IP = "common_server_ip"
|
||||
|
||||
/**
|
||||
* 服务端端口 字段名
|
||||
*/
|
||||
val F_COMMON_SERVER_PORT = "common_server_port"
|
||||
|
||||
/**
|
||||
* 客户端地址 字段名
|
||||
*/
|
||||
val F_COMMON_CLIENT_IP = "common_client_ip"
|
||||
|
||||
/**
|
||||
* 客户端端口 字段名
|
||||
*/
|
||||
val F_COMMON_CLIENT_PORT = "common_client_port"
|
||||
|
||||
/**
|
||||
* 从 ObjectNode 对象中获取整数类型字段值。
|
||||
*
|
||||
* @param field 字段名
|
||||
* @param default 默认值(可选,默认为 0)
|
||||
* @param o 包含字段的 ObjectNode 对象
|
||||
* @return 字段对应的整数值
|
||||
*/
|
||||
def getInt(field: String, default: Int = 0)(implicit o: ObjectNode): Int = {
|
||||
val node = o.get(field)
|
||||
if (node != null && node.isInt) {
|
||||
node.asInt(default)
|
||||
} else default
|
||||
}
|
||||
|
||||
/**
|
||||
* 从 ObjectNode 对象中获取字符串类型字段值。
|
||||
*
|
||||
* @param field 字段名
|
||||
* @param default 默认值(可选,默认为 null)
|
||||
* @param o 包含字段的 ObjectNode 对象
|
||||
* @return 字段对应的字符串值
|
||||
*/
|
||||
def getString(field: String, default: String = null)(implicit o: ObjectNode): String = {
|
||||
val node = o.get(field)
|
||||
if (node != null && node.isTextual) {
|
||||
node.asText(default)
|
||||
} else default
|
||||
}
|
||||
|
||||
object Implicits {
|
||||
implicit def asRecord(o: ObjectNode): Record = new Record(o)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
package com.zdjizhi.flink.voip.records
|
||||
|
||||
import com.zdjizhi.flink.voip.records.Record._
|
||||
import com.zdjizhi.flink.voip.records.SIPRecord._
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
|
||||
|
||||
import scala.language.implicitConversions
|
||||
|
||||
/**
|
||||
* SIP(Session Initiation Protocol)数据记录类,用于解析和访问SIP数据记录。
|
||||
*
|
||||
* @param obj 包含 SIP 数据记录的 ObjectNode 对象
|
||||
* @constructor 创建一个 SIPRecord 对象,用于解析和访问 SIP 数据记录
|
||||
* @see Record
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
class SIPRecord(obj: ObjectNode) extends Record(obj) {
|
||||
|
||||
/**
|
||||
* SIP 通话的会话 ID 字段值
|
||||
*/
|
||||
val callID: String = getString(F_CALL_ID)
|
||||
|
||||
/**
|
||||
* SIP 通话的协调的主叫语音传输 IP 字段值
|
||||
*/
|
||||
val originatorSdpConnectIp: String = getString(F_ORIGINATOR_SDP_CONNECT_IP)
|
||||
|
||||
/**
|
||||
* SIP 通话的协调的主叫语音传输端口字段值
|
||||
*/
|
||||
val originatorSdpMediaPort: Int = getInt(F_ORIGINATOR_SDP_MEDIA_PORT)
|
||||
|
||||
/**
|
||||
* SIP 通话的协调的被叫语音传输 IP 字段值
|
||||
*/
|
||||
val responderSdpConnectIp: String = getString(F_RESPONDER_SDP_CONNECT_IP)
|
||||
|
||||
/**
|
||||
* SIP 通话的协调的被叫语音传输端口字段值
|
||||
*/
|
||||
val responderSdpMediaPort: Int = getInt(F_RESPONDER_SDP_MEDIA_PORT)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 用于解析和访问 SIP 数据记录。
|
||||
*
|
||||
* @see SIPRecord
|
||||
*/
|
||||
object SIPRecord {
|
||||
|
||||
/**
|
||||
* 会话ID 字段名
|
||||
*/
|
||||
val F_CALL_ID = "sip_call_id"
|
||||
/**
|
||||
* 协调的主叫语音传输IP 字段名
|
||||
*/
|
||||
val F_ORIGINATOR_SDP_CONNECT_IP = "sip_originator_sdp_connect_ip"
|
||||
/**
|
||||
* 协调的主叫语音传输端口 字段名
|
||||
*/
|
||||
val F_ORIGINATOR_SDP_MEDIA_PORT = "sip_originator_sdp_media_port"
|
||||
/**
|
||||
* 协调的被叫语音传输IP 字段名
|
||||
*/
|
||||
val F_RESPONDER_SDP_CONNECT_IP = "sip_responder_sdp_connect_ip"
|
||||
/**
|
||||
* 协调的被叫语音传输端口 字段名
|
||||
*/
|
||||
val F_RESPONDER_SDP_MEDIA_PORT = "sip_responder_sdp_media_port"
|
||||
|
||||
private[voip] object Implicits {
|
||||
implicit def asSIPRecord(o: ObjectNode): SIPRecord = {
|
||||
new SIPRecord(o)
|
||||
}
|
||||
}
|
||||
}
|
||||
14
src/main/scala/com/zdjizhi/flink/voip/records/enums.scala
Normal file
14
src/main/scala/com/zdjizhi/flink/voip/records/enums.scala
Normal file
@@ -0,0 +1,14 @@
|
||||
package com.zdjizhi.flink.voip.records
|
||||
|
||||
object SchemaTypes {
|
||||
|
||||
val SIP = "SIP"
|
||||
val RTP = "RTP"
|
||||
val VoIP = "VoIP"
|
||||
}
|
||||
|
||||
object StreamDirs {
|
||||
val C2S = 1
|
||||
val S2C = 2
|
||||
val DOUBLE = 3
|
||||
}
|
||||
@@ -1,5 +0,0 @@
|
||||
## Changelog
|
||||
|
||||
### 2.0
|
||||
|
||||
- [GAL-602](https://jira.geedge.net/browse/GAL-602) 基于 Easy Stream 框架的配置化改造。
|
||||
@@ -1,11 +0,0 @@
|
||||
## Deploy
|
||||
|
||||
- 准备 JDK ${java.version} 的环境
|
||||
- 准备 Flink ${flink.version} 的环境
|
||||
- [下载](./download.html) 对应版本 UDF 依赖 Jar
|
||||
- [下载](./download.html) 对应版本 Job 配置 (一个 yml 文件)
|
||||
- 执行命令 `flink run -Dflink.rest.bind-port=8081 -c com.geedgenetworks.flink.easy.core.Runner path/to/sip-rtp-correlation-<version>.jar job.yml`
|
||||
- 您将在控制台看到启动日志,同时您可以在 `http://<you-host>:8081` 看到任务 UI。
|
||||
|
||||
> 注意:
|
||||
> SIP 和 RTP 融合作业强烈建议开启 Checkpoint 机制,否则将会由于丢失数据或重复数据导致业务数据关联错误。
|
||||
@@ -1,8 +0,0 @@
|
||||
## Download
|
||||
|
||||
### ${project.version}
|
||||
|
||||
| Easy Stream | UDF Jar | Job |
|
||||
|-------------| ------------------------------------------------------------ | ------------------------------------------------------------ |
|
||||
| ${easy.stream.version} | [JAR](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.jar) ( [MD5](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.jar.md5) [SHA1](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.jar.sha1) ) | [YML](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.tar.gz) ( [MD5](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.tar.gz.md5) [SHA1](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.tar.gz.sha1) ) |
|
||||
|
||||
@@ -1,10 +0,0 @@
|
||||
## SIP RTP Correlation
|
||||
|
||||
SIP RTP Correlation 项目是一个使用 Apache Flink 实现的实时数据处理项目,旨在从 Kafka 中读取 SIP(Session Initiation Protocol)和 RTP(Real-time Transport Protocol)数据,将它们融合成完整的 VoIP(Voice over Internet Protocol)通话数据。
|
||||
|
||||
SIP RTP Correlation 项目可以用于实时监控和分析 VoIP 通话数据,提取关键指标,以及进行实时报警和诊断。
|
||||
|
||||
<br/>
|
||||
|
||||
|
||||
You can download the latest release from [Downloads](./download.html). And you can changelog from [Changelogs](./changelogs.html).
|
||||
@@ -1,13 +0,0 @@
|
||||
#banner {
|
||||
height: 108px;
|
||||
background: none;
|
||||
}
|
||||
|
||||
#bannerLeft img {
|
||||
margin-left: 18px;
|
||||
margin-top: 10px;
|
||||
}
|
||||
|
||||
div.well {
|
||||
display: none;
|
||||
}
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 4.8 KiB |
@@ -1,56 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain maven-site.vm copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project name="SIP RTP Correlate">
|
||||
<bannerLeft>
|
||||
<name>Easy Stream</name>
|
||||
<src>images/logo.png</src>
|
||||
<href>#</href>
|
||||
</bannerLeft>
|
||||
|
||||
<publishDate position="right"/>
|
||||
<version position="right"/>
|
||||
|
||||
<skin>
|
||||
<groupId>org.apache.maven.skins</groupId>
|
||||
<artifactId>maven-fluido-skin</artifactId>
|
||||
<version>1.10.0</version>
|
||||
</skin>
|
||||
|
||||
<custom>
|
||||
<fluidoSkin>
|
||||
<sourceLineNumbersEnabled>true</sourceLineNumbersEnabled>
|
||||
</fluidoSkin>
|
||||
</custom>
|
||||
|
||||
<body>
|
||||
<breadcrumbs position="left">
|
||||
<item name="Galaxy" href="#"/>
|
||||
<item name="Platform" href="#"/>
|
||||
<item name="Easy Stream" href="#"/>
|
||||
<item name="Application" href="#"/>
|
||||
</breadcrumbs>
|
||||
|
||||
<menu name="OVERVIEW" inherit="top">
|
||||
<item name="Introduction" href="index.html"/>
|
||||
<item name="Deploy" href="deploy.html"/>
|
||||
<item name="Download" href="download.html"/>
|
||||
</menu>
|
||||
|
||||
<footer>
|
||||
<![CDATA[ Copyright ©2022 <a href="#">Galaxy Platform</a>. All rights reserved.]]>
|
||||
</footer>
|
||||
</body>
|
||||
</project>
|
||||
@@ -1,33 +0,0 @@
|
||||
package com.geedgenetworks.flink.easy.application;
|
||||
|
||||
import com.geedgenetworks.flink.easy.core.Runners;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class ApplicationTest {
|
||||
|
||||
static {
|
||||
System.setProperty("easy.execute.mode", "validate");
|
||||
System.setProperty("flink.rest.bind-port", "8081");
|
||||
// System.setProperty("flink.rest.flamegraph.enabled", "true");
|
||||
System.setProperty("flink.heartbeat.timeout", "1800000");
|
||||
}
|
||||
|
||||
public static String discoverConfiguration(final String name) throws Exception {
|
||||
var path = String.format("/jobs/%s.yml", name);
|
||||
var resource = ApplicationTest.class.getResource(path);
|
||||
if (resource == null) {
|
||||
// maven
|
||||
resource = ApplicationTest.class.getResource(String.format("../classes/%s", path));
|
||||
}
|
||||
if (resource == null) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("Not found job '%s' in path [%s].", name, path));
|
||||
}
|
||||
return resource.getPath();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJob() throws Exception {
|
||||
Runners.run(discoverConfiguration("job"));
|
||||
}
|
||||
}
|
||||
@@ -1,4 +0,0 @@
|
||||
{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240001,"decoded_as":"RTP","ip_protocol":"udp","address_type":4,"client_ip":"192.168.64.8","server_ip":"192.168.39.62","client_port":25524,"server_port":4580,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.rtp","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830004000,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":57620,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","rtp_pcap_path":"123e4567-e89b-12d3-a456-426614174005"}
|
||||
{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240002,"decoded_as":"SIP","ip_protocol":"udp","address_type":4,"client_ip":"10.0.0.1","server_ip":"192.0.2.1","client_port":1000,"server_port":60000,"t_vsys_id":0,"vsys_id":1025,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.sip","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830000100,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":8192,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","sip_call_id":"NGMxZWY3Y2NmMzNlNGE3NzJhODgyZDAwM2YyMzQ4NGI.","sip_originator_description":"\"lina\"<sip:lina@192.0.2.1>;tag=1837055d","sip_responder_description":"\"1075\"<sip:1075@192.0.2.1>","sip_originator_sdp_connect_ip":"192.168.64.85","sip_originator_sdp_media_port":25524,"sip_originator_sdp_media_type":"application/sdp","sip_server":"OpenSIPS (2.4.11 (x86_64/linux))","sip_responder_sdp_connect_ip":"192.168.39.62","sip_responder_sdp_media_port":4580,"sip_responder_sdp_media_type":"application/sdp","sip_duration_s":590,"sip_bye":"responder","sip_cseq":"2 BYE","sip_via":"SIP/2.0/UDP 192.0.2.1:5060;branch=z9hG4bKbe7c.392190f1.0","sip_user_agent":"eyeBeam release 1011d stamp 40820","sip_is_request":0}
|
||||
{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240003,"decoded_as":"RTP","ip_protocol":"udp","address_type":4,"client_ip":"192.168.164.18","server_ip":"192.168.39.162","client_port":65121,"server_port":4670,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.rtp","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830000000,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":57620,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","rtp_pcap_path":"123e4567-e89b-12d3-a456-426614174005"}
|
||||
{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240004,"decoded_as":"SIP","ip_protocol":"udp","address_type":4,"client_ip":"10.0.0.2","server_ip":"192.0.2.2","client_port":1000,"server_port":60000,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.sip","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830000000,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":24584,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","sip_call_id":"IUMxZWY3Y2NmMzNlNGE3NzJhODgyZDAwM2YyMzQ4NGI.","sip_originator_description":"\"lina\"<sip:lina@192.0.2.1>;tag=1837055d","sip_responder_description":"\"1075\"<sip:1075@192.0.2.1>","sip_originator_sdp_connect_ip":"192.168.64.8","sip_originator_sdp_media_port":25524,"sip_originator_sdp_media_type":"application/sdp","sip_server":"OpenSIPS (2.4.11 (x86_64/linux))","sip_responder_sdp_connect_ip":"192.168.39.62","sip_responder_sdp_media_port":4580,"sip_responder_sdp_media_type":"application/sdp","sip_duration_s":590,"sip_bye":"responder","sip_cseq":"2 BYE","sip_via":"SIP/2.0/UDP 192.0.2.1:5060;branch=z9hG4bKbe7c.392190f1.0","sip_user_agent":"eyeBeam release 1011d stamp 40820","sip_is_request":0}
|
||||
7
src/test/resources/test.properties
Normal file
7
src/test/resources/test.properties
Normal file
@@ -0,0 +1,7 @@
|
||||
source.kafka.topic=aaa
|
||||
source.kafka.props.bootstrap.servers=127.0.0.1:9292
|
||||
source.kafka.props.group.id=flink-voip-fusion
|
||||
source.kafka.props.auto.offset.reset=earliest
|
||||
|
||||
sink.kafka.topic=bbb
|
||||
sink.kafka.props.bootstrap.servers=127.0.0.1:9292
|
||||
54
src/test/scala/com/zdjizhi/flink/voip/data/Generator.scala
Normal file
54
src/test/scala/com/zdjizhi/flink/voip/data/Generator.scala
Normal file
@@ -0,0 +1,54 @@
|
||||
package com.zdjizhi.flink.voip.data
|
||||
|
||||
import com.github.javafaker.Faker
|
||||
import com.zdjizhi.flink.voip.data.Generator.random
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
// 生成数据,有效数据即可以双向匹配的数据占比 [ratio/100]
|
||||
abstract class Generator[T](ratio: Int) {
|
||||
|
||||
require(ratio >= 0 && ratio <= 100,
|
||||
"Param 'ratio' is limited to 0-100 integers only.")
|
||||
|
||||
private lazy val state = new AtomicReference[T]()
|
||||
|
||||
final def next: T = {
|
||||
val i = random.nextInt(100)
|
||||
val v = if (i < ratio && state.get() != null) {
|
||||
afterState(state.get())
|
||||
} else {
|
||||
state.updateAndGet(_ => this.generate)
|
||||
}
|
||||
state.set(null.asInstanceOf[T])
|
||||
afterAll(v)
|
||||
}
|
||||
|
||||
protected def generate: T
|
||||
|
||||
protected def afterState(v: T): T
|
||||
|
||||
protected def afterAll(t: T): T = t
|
||||
|
||||
}
|
||||
|
||||
object Generator {
|
||||
|
||||
val random: ThreadLocalRandom = ThreadLocalRandom.current()
|
||||
|
||||
private val faker = new Faker()
|
||||
|
||||
def nextIP: String = {
|
||||
if (random.nextBoolean()) {
|
||||
faker.internet().ipV4Address()
|
||||
} else {
|
||||
faker.internet().ipV6Address()
|
||||
}
|
||||
}
|
||||
|
||||
def nextID: String = faker.idNumber().valid()
|
||||
|
||||
def nextPort: Int = random.nextInt(65535)
|
||||
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
package com.zdjizhi.flink.voip.data
|
||||
|
||||
import com.zdjizhi.flink.voip.records.{Record, SIPRecord, StreamDirs}
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{IntNode, ObjectNode, TextNode}
|
||||
|
||||
// 生成 SIP 数据行
|
||||
class SIPGenerator(ratio: Int = 40) extends Generator[ObjectNode](ratio) {
|
||||
|
||||
private val mapper = new ObjectMapper()
|
||||
|
||||
override protected def generate: ObjectNode = {
|
||||
val obj = mapper.createObjectNode()
|
||||
|
||||
obj.set(SIPRecord.F_CALL_ID, TextNode.valueOf(Generator.nextID))
|
||||
|
||||
val dir = if (Generator.random.nextBoolean()) StreamDirs.S2C else StreamDirs.C2S
|
||||
obj.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(dir))
|
||||
|
||||
obj.set(Record.F_COMMON_SERVER_IP, TextNode.valueOf(Generator.nextIP))
|
||||
obj.set(Record.F_COMMON_SERVER_PORT, IntNode.valueOf(Generator.nextPort))
|
||||
obj.set(Record.F_COMMON_CLIENT_IP, TextNode.valueOf(Generator.nextIP))
|
||||
obj.set(Record.F_COMMON_CLIENT_PORT, IntNode.valueOf(Generator.nextPort))
|
||||
|
||||
obj.set(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP, TextNode.valueOf(Generator.nextIP))
|
||||
obj.set(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT, IntNode.valueOf(Generator.nextPort))
|
||||
obj.set(SIPRecord.F_RESPONDER_SDP_CONNECT_IP, TextNode.valueOf(Generator.nextIP))
|
||||
obj.set(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT, IntNode.valueOf(Generator.nextPort))
|
||||
|
||||
obj
|
||||
}
|
||||
|
||||
override def afterState(v: ObjectNode): ObjectNode = {
|
||||
import Record.Implicits._
|
||||
v.streamDir match {
|
||||
case StreamDirs.C2S =>
|
||||
v.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.S2C))
|
||||
case StreamDirs.S2C =>
|
||||
v.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.C2S))
|
||||
case _ =>
|
||||
}
|
||||
v
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.zdjizhi.flink.voip.functions
|
||||
|
||||
import org.scalatest.flatspec.AnyFlatSpec
|
||||
|
||||
class AddressTest extends AnyFlatSpec {
|
||||
|
||||
"Address.apply" should "return the Address with ports sorted and IPs reordered if necessary" in {
|
||||
val address1 = ("192.168.0.1", 80)
|
||||
val address2 = ("10.1.1.1", 8080)
|
||||
|
||||
val result1 = Address(address1, address2)
|
||||
assert(result1 == Address("192.168.0.1", 80, "10.1.1.1", 8080))
|
||||
|
||||
val result2 = Address(address2, address1)
|
||||
assert(result2 == Address("192.168.0.1", 80, "10.1.1.1", 8080))
|
||||
|
||||
val address3 = ("172.16.0.1", 80)
|
||||
val result3 = Address(address3, address1)
|
||||
assert(result3 == Address("172.16.0.1", 80, "192.168.0.1", 80))
|
||||
|
||||
val address4 = ("172.31.0.1", 443)
|
||||
val result4 = Address(address1, address4)
|
||||
assert(result4 == Address("192.168.0.1", 80, "172.31.0.1", 443))
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package com.zdjizhi.flink.voip.functions
|
||||
|
||||
import com.zdjizhi.flink.voip.conf.FusionConfigs
|
||||
import com.zdjizhi.flink.voip.data.SIPGenerator
|
||||
import com.zdjizhi.flink.voip.records.{Record, StreamDirs}
|
||||
import org.apache.flink.api.common.state.MapStateDescriptor
|
||||
import org.apache.flink.api.common.time.Time
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation
|
||||
import org.apache.flink.api.java.functions.KeySelector
|
||||
import org.apache.flink.configuration.Configuration
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{IntNode, ObjectNode}
|
||||
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
|
||||
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
|
||||
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import org.scalatest.flatspec.AnyFlatSpec
|
||||
|
||||
import java.lang.{Long => JLong}
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
class SIPPairingFunctionTest extends AnyFlatSpec with BeforeAndAfter {
|
||||
|
||||
private val keySelector: KeySelector[ObjectNode, Int] = (_: ObjectNode) => 0
|
||||
|
||||
private var testHarness: KeyedOneInputStreamOperatorTestHarness[Int, ObjectNode, ObjectNode] = _
|
||||
|
||||
private val interval: JLong = Time.minutes(5).toMilliseconds
|
||||
|
||||
before {
|
||||
val func = new SIPPairingFunction
|
||||
val operator = new KeyedProcessOperator(func)
|
||||
val tpe = TypeInformation.of(classOf[Int])
|
||||
testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, keySelector, tpe)
|
||||
|
||||
val config = new Configuration()
|
||||
config.set(FusionConfigs.SIP_STATE_CLEAR_INTERVAL, interval)
|
||||
testHarness.getExecutionConfig.setGlobalJobParameters(config)
|
||||
testHarness.open()
|
||||
}
|
||||
|
||||
it should "properly pair SIP records" in {
|
||||
val sipGenerator = new SIPGenerator()
|
||||
val current = System.currentTimeMillis()
|
||||
|
||||
val obj = sipGenerator.next
|
||||
obj.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.DOUBLE))
|
||||
val record = new StreamRecord(obj, current)
|
||||
|
||||
testHarness.processElement(record)
|
||||
assert(testHarness.getOutput.asScala.toSet.contains(record))
|
||||
|
||||
val nextFireTime = (testHarness.getProcessingTime / interval + 1) * interval
|
||||
assert(testHarness.getProcessingTimeService.getActiveTimerTimestamps.contains(nextFireTime))
|
||||
|
||||
val obj1 = obj.deepCopy()
|
||||
obj1.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.S2C))
|
||||
|
||||
val obj2 = obj.deepCopy()
|
||||
obj2.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.C2S))
|
||||
|
||||
val descriptor = new MapStateDescriptor("sip-state", classOf[Address], classOf[ObjectNodeWithExpiration])
|
||||
|
||||
testHarness.processElement(obj1, current)
|
||||
val mapState = testHarness.getOperator.getKeyedStateStore.getMapState(descriptor)
|
||||
assert(mapState.values().asScala.map(_.obj).toSet(obj1))
|
||||
|
||||
testHarness.processElement(obj2, current)
|
||||
assert(!mapState.values().asScala.map(_.obj).toSet(obj1))
|
||||
assert(testHarness.getOutput.asScala.toSet.contains(record))
|
||||
assert(testHarness.getOutput.asScala.size == 2)
|
||||
}
|
||||
|
||||
}
|
||||
44
tools/dist/target.xml
vendored
44
tools/dist/target.xml
vendored
@@ -1,44 +0,0 @@
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
<assembly
|
||||
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
|
||||
<id>bin</id>
|
||||
|
||||
<formats>
|
||||
<format>tar.gz</format>
|
||||
</formats>
|
||||
|
||||
<includeBaseDirectory>false</includeBaseDirectory>
|
||||
|
||||
<fileSets>
|
||||
<fileSet>
|
||||
<directory>src/main/resources/jobs</directory>
|
||||
<includes>
|
||||
<include>*.yml</include>
|
||||
</includes>
|
||||
<fileMode>0755</fileMode>
|
||||
<lineEnding>lf</lineEnding>
|
||||
<directoryMode>0644</directoryMode>
|
||||
<outputDirectory>./</outputDirectory>
|
||||
</fileSet>
|
||||
</fileSets>
|
||||
|
||||
</assembly>
|
||||
@@ -1,394 +0,0 @@
|
||||
<?xml version="1.0"?>
|
||||
<!DOCTYPE module PUBLIC
|
||||
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
|
||||
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
|
||||
|
||||
<!--
|
||||
This is a checkstyle configuration file. For descriptions of
|
||||
what the following rules do, please see the checkstyle configuration
|
||||
page at http://checkstyle.sourceforge.net/config.html.
|
||||
-->
|
||||
|
||||
<module name="Checker">
|
||||
|
||||
<module name="RegexpSingleline">
|
||||
<!-- Checks that TODOs don't have stuff in parenthesis, e.g., username. -->
|
||||
<property name="format" value="((//.*)|(\*.*))TODO\("/>
|
||||
<property name="message" value="TODO comments must not include usernames."/>
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<module name="RegexpSingleline">
|
||||
<property name="format" value="\s+$"/>
|
||||
<property name="message" value="Trailing whitespace"/>
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<module name="RegexpSingleline">
|
||||
<property name="format" value="Throwables.propagate\("/>
|
||||
<property name="message" value="Throwables.propagate is deprecated"/>
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<!-- Prevent *Tests.java as tools may not pick them up -->
|
||||
<module name="RegexpOnFilename">
|
||||
<property name="fileNamePattern" value=".*Tests\.java$"/>
|
||||
</module>
|
||||
|
||||
<module name="SuppressionFilter">
|
||||
<property name="file" value="${checkstyle.suppressions.file}" default="suppressions.xml"/>
|
||||
</module>
|
||||
|
||||
<module name="FileLength">
|
||||
<property name="max" value="3000"/>
|
||||
</module>
|
||||
|
||||
<!-- All Java AST specific tests live under TreeWalker module. -->
|
||||
<module name="TreeWalker">
|
||||
|
||||
<!-- Allow use of comment to suppress javadocstyle -->
|
||||
<module name="SuppressionCommentFilter">
|
||||
<property name="offCommentFormat" value="CHECKSTYLE.OFF\: ([\w\|]+)"/>
|
||||
<property name="onCommentFormat" value="CHECKSTYLE.ON\: ([\w\|]+)"/>
|
||||
<property name="checkFormat" value="$1"/>
|
||||
</module>
|
||||
|
||||
<!-- Prohibit T.getT() methods for standard boxed types -->
|
||||
<module name="Regexp">
|
||||
<property name="format" value="Boolean\.getBoolean"/>
|
||||
<property name="illegalPattern" value="true"/>
|
||||
<property name="message" value="Use System.getProperties() to get system properties."/>
|
||||
</module>
|
||||
|
||||
<module name="Regexp">
|
||||
<property name="format" value="Integer\.getInteger"/>
|
||||
<property name="illegalPattern" value="true"/>
|
||||
<property name="message" value="Use System.getProperties() to get system properties."/>
|
||||
</module>
|
||||
|
||||
<module name="Regexp">
|
||||
<property name="format" value="Long\.getLong"/>
|
||||
<property name="illegalPattern" value="true"/>
|
||||
<property name="message" value="Use System.getProperties() to get system properties."/>
|
||||
</module>
|
||||
|
||||
<!--
|
||||
|
||||
IllegalImport cannot blacklist classes so we have to fall back to Regexp.
|
||||
|
||||
-->
|
||||
|
||||
<!-- forbid use of commons lang validate -->
|
||||
<module name="Regexp">
|
||||
<property name="format" value="org\.apache\.commons\.lang3\.Validate"/>
|
||||
<property name="illegalPattern" value="true"/>
|
||||
<property name="message"
|
||||
value="Use Guava Checks instead of Commons Validate. Please refer to the coding guidelines."/>
|
||||
</module>
|
||||
<module name="Regexp">
|
||||
<property name="format" value="org\.apache\.commons\.lang\."/>
|
||||
<property name="illegalPattern" value="true"/>
|
||||
<property name="message" value="Use commons-lang3 instead of commons-lang."/>
|
||||
</module>
|
||||
<module name="Regexp">
|
||||
<property name="format" value="org\.codehaus\.jettison"/>
|
||||
<property name="illegalPattern" value="true"/>
|
||||
<property name="message" value="Use flink-shaded-jackson instead of jettison."/>
|
||||
</module>
|
||||
<module name="Regexp">
|
||||
<property name="format" value="org\.testcontainers\.shaded"/>
|
||||
<property name="illegalPattern" value="true"/>
|
||||
<property name="message"
|
||||
value="Use utilities from appropriate library instead of org.testcontainers."/>
|
||||
</module>
|
||||
|
||||
<!-- Enforce Java-style array declarations -->
|
||||
<module name="ArrayTypeStyle"/>
|
||||
|
||||
<module name="TodoComment">
|
||||
<!-- Checks that disallowed strings are not used in comments. -->
|
||||
<property name="format" value="(FIXME)|(XXX)"/>
|
||||
</module>
|
||||
|
||||
<!--
|
||||
|
||||
IMPORT CHECKS
|
||||
|
||||
-->
|
||||
|
||||
<module name="RedundantImport">
|
||||
<!-- Checks for redundant import statements. -->
|
||||
<property name="severity" value="error"/>
|
||||
<message key="import.redundancy"
|
||||
value="Redundant import {0}."/>
|
||||
</module>
|
||||
|
||||
<module name="IllegalImport">
|
||||
<property name="illegalPkgs"
|
||||
value="autovalue.shaded, avro.shaded, com.google.api.client.repackaged, com.google.appengine.repackaged"/>
|
||||
</module>
|
||||
<module name="IllegalImport">
|
||||
<property name="illegalPkgs" value="com.fasterxml.jackson"/>
|
||||
<message key="import.illegal" value="{0}; Use flink-shaded-jackson instead."/>
|
||||
</module>
|
||||
<module name="IllegalImport">
|
||||
<property name="illegalPkgs" value="org.codehaus.jackson"/>
|
||||
<message key="import.illegal" value="{0}; Use flink-shaded-jackson instead."/>
|
||||
</module>
|
||||
<module name="IllegalImport">
|
||||
<property name="illegalPkgs" value="org.objectweb.asm"/>
|
||||
<message key="import.illegal" value="{0}; Use flink-shaded-asm instead."/>
|
||||
</module>
|
||||
<module name="IllegalImport">
|
||||
<property name="illegalPkgs" value="io.netty"/>
|
||||
<message key="import.illegal" value="{0}; Use flink-shaded-netty instead."/>
|
||||
</module>
|
||||
|
||||
<module name="RedundantModifier">
|
||||
<!-- Checks for redundant modifiers on various symbol definitions.
|
||||
See: http://checkstyle.sourceforge.net/config_modifier.html#RedundantModifier
|
||||
|
||||
We exclude METHOD_DEF to allow final methods in final classes to make them more future-proof.
|
||||
-->
|
||||
<property name="tokens"
|
||||
value="VARIABLE_DEF, ANNOTATION_FIELD_DEF, INTERFACE_DEF, CLASS_DEF, ENUM_DEF"/>
|
||||
</module>
|
||||
|
||||
<!--
|
||||
IllegalImport cannot blacklist classes, and c.g.api.client.util is used for some shaded
|
||||
code and some useful code. So we need to fall back to Regexp.
|
||||
-->
|
||||
<module name="RegexpSinglelineJava">
|
||||
<property name="format" value="^import com.google.common.base.Preconditions;$"/>
|
||||
<property name="message" value="Static import functions from Guava Preconditions"/>
|
||||
</module>
|
||||
|
||||
<module name="UnusedImports">
|
||||
<property name="severity" value="error"/>
|
||||
<property name="processJavadoc" value="true"/>
|
||||
<message key="import.unused"
|
||||
value="Unused import: {0}."/>
|
||||
</module>
|
||||
|
||||
<!--
|
||||
|
||||
NAMING CHECKS
|
||||
|
||||
-->
|
||||
|
||||
<!-- Item 38 - Adhere to generally accepted naming conventions -->
|
||||
|
||||
<module name="PackageName">
|
||||
<!-- Validates identifiers for package names against the
|
||||
supplied expression. -->
|
||||
<!-- Here the default checkstyle rule restricts package name parts to
|
||||
seven characters, this is not in line with common practice at Google.
|
||||
-->
|
||||
<property name="format" value="^[a-z]+(\.[a-z][a-z0-9]{1,})*$"/>
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<module name="TypeNameCheck">
|
||||
<!-- Validates static, final fields against the
|
||||
expression "^[A-Z][a-zA-Z0-9]*$". -->
|
||||
<metadata name="altname" value="TypeName"/>
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<module name="ConstantNameCheck">
|
||||
<!-- Validates non-private, static, final fields against the supplied
|
||||
public/package final fields "^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$". -->
|
||||
<metadata name="altname" value="ConstantName"/>
|
||||
<property name="applyToPublic" value="true"/>
|
||||
<property name="applyToProtected" value="true"/>
|
||||
<property name="applyToPackage" value="true"/>
|
||||
<property name="applyToPrivate" value="true"/>
|
||||
<property name="format" value="^([A-Z][A-Z0-9]*(_[A-Z0-9]+)*|FLAG_.*)$"/>
|
||||
<message key="name.invalidPattern"
|
||||
value="Variable ''{0}'' should be in ALL_CAPS (if it is a constant) or be private (otherwise)."/>
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<module name="StaticVariableNameCheck">
|
||||
<!-- Validates static, non-final fields against the supplied
|
||||
expression "^[a-z][a-zA-Z0-9]*_?$". -->
|
||||
<metadata name="altname" value="StaticVariableName"/>
|
||||
<property name="applyToPublic" value="true"/>
|
||||
<property name="applyToProtected" value="true"/>
|
||||
<property name="applyToPackage" value="true"/>
|
||||
<property name="applyToPrivate" value="true"/>
|
||||
<property name="format" value="^[a-z][a-zA-Z0-9]*_?$"/>
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<module name="MemberNameCheck">
|
||||
<!-- Validates non-static members against the supplied expression. -->
|
||||
<metadata name="altname" value="MemberName"/>
|
||||
<property name="applyToPublic" value="true"/>
|
||||
<property name="applyToProtected" value="true"/>
|
||||
<property name="applyToPackage" value="true"/>
|
||||
<property name="applyToPrivate" value="true"/>
|
||||
<property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<module name="MethodNameCheck">
|
||||
<!-- Validates identifiers for method names. -->
|
||||
<metadata name="altname" value="MethodName"/>
|
||||
<property name="format" value="^[a-z][a-zA-Z0-9]*(_[a-zA-Z0-9]+)*$"/>
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<module name="ParameterName">
|
||||
<!-- Validates identifiers for method parameters against the
|
||||
expression "^[a-z][a-zA-Z0-9]*$". -->
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<module name="LocalFinalVariableName">
|
||||
<!-- Validates identifiers for local final variables against the
|
||||
expression "^[a-z][a-zA-Z0-9]*$". -->
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<module name="LocalVariableName">
|
||||
<!-- Validates identifiers for local variables against the
|
||||
expression "^[a-z][a-zA-Z0-9]*$". -->
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<!--
|
||||
|
||||
LENGTH and CODING CHECKS
|
||||
|
||||
-->
|
||||
|
||||
<!-- Checks for braces around if and else blocks -->
|
||||
<module name="NeedBraces">
|
||||
<property name="severity" value="error"/>
|
||||
<property name="tokens"
|
||||
value="LITERAL_IF, LITERAL_ELSE, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO"/>
|
||||
</module>
|
||||
|
||||
<module name="UpperEll">
|
||||
<!-- Checks that long constants are defined with an upper ell.-->
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<module name="FallThrough">
|
||||
<!-- Warn about falling through to the next case statement. Similar to
|
||||
javac -Xlint:fallthrough, but the check is suppressed if a single-line comment
|
||||
on the last non-blank line preceding the fallen-into case contains 'fall through' (or
|
||||
some other variants that we don't publicized to promote consistency).
|
||||
-->
|
||||
<property name="reliefPattern"
|
||||
value="fall through|Fall through|fallthru|Fallthru|falls through|Falls through|fallthrough|Fallthrough|No break|NO break|no break|continue on"/>
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<!-- Checks for over-complicated boolean expressions. -->
|
||||
<module name="SimplifyBooleanExpression"/>
|
||||
|
||||
<!-- Detects empty statements (standalone ";" semicolon). -->
|
||||
<module name="EmptyStatement"/>
|
||||
|
||||
<!-- Detect multiple consecutive semicolons (e.g. ";;"). -->
|
||||
<module name="RegexpSinglelineJava">
|
||||
<property name="format" value=";{2,}"/>
|
||||
<property name="message" value="Use one semicolon"/>
|
||||
<property name="ignoreComments" value="true"/>
|
||||
</module>
|
||||
|
||||
<!--
|
||||
|
||||
MODIFIERS CHECKS
|
||||
|
||||
-->
|
||||
|
||||
<module name="ModifierOrder">
|
||||
<!-- Warn if modifier order is inconsistent with JLS3 8.1.1, 8.3.1, and
|
||||
8.4.3. The prescribed order is:
|
||||
public, protected, private, abstract, static, final, transient, volatile,
|
||||
synchronized, native, strictfp
|
||||
-->
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
|
||||
<!--
|
||||
|
||||
WHITESPACE CHECKS
|
||||
|
||||
-->
|
||||
|
||||
<module name="EmptyLineSeparator">
|
||||
<!-- Checks for empty line separator between tokens. The only
|
||||
excluded token is VARIABLE_DEF, allowing class fields to
|
||||
be declared on consecutive lines.
|
||||
-->
|
||||
<property name="allowMultipleEmptyLines" value="false"/>
|
||||
<property name="allowMultipleEmptyLinesInsideClassMembers" value="false"/>
|
||||
<property name="tokens" value="PACKAGE_DEF, IMPORT, STATIC_IMPORT, CLASS_DEF,
|
||||
INTERFACE_DEF, ENUM_DEF, STATIC_INIT, INSTANCE_INIT, METHOD_DEF,
|
||||
CTOR_DEF"/>
|
||||
</module>
|
||||
|
||||
<module name="SingleSpaceSeparator"/>
|
||||
|
||||
<module name="WhitespaceAround">
|
||||
<!-- Checks that various tokens are surrounded by whitespace.
|
||||
This includes most binary operators and keywords followed
|
||||
by regular or curly braces.
|
||||
-->
|
||||
<property name="tokens" value="ASSIGN, BAND, BAND_ASSIGN, BOR,
|
||||
BOR_ASSIGN, BSR, BSR_ASSIGN, BXOR, BXOR_ASSIGN, COLON, DIV, DIV_ASSIGN,
|
||||
EQUAL, GE, GT, LAMBDA, LAND, LE, LITERAL_CATCH, LITERAL_DO, LITERAL_ELSE,
|
||||
LITERAL_FINALLY, LITERAL_FOR, LITERAL_IF, LITERAL_RETURN,
|
||||
LITERAL_SYNCHRONIZED, LITERAL_TRY, LITERAL_WHILE, LOR, LT, MINUS,
|
||||
MINUS_ASSIGN, MOD, MOD_ASSIGN, NOT_EQUAL, PLUS, PLUS_ASSIGN, QUESTION,
|
||||
SL, SL_ASSIGN, SR_ASSIGN, STAR, STAR_ASSIGN, TYPE_EXTENSION_AND"/>
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<module name="WhitespaceAfter">
|
||||
<!-- Checks that commas, semicolons and typecasts are followed by
|
||||
whitespace.
|
||||
-->
|
||||
<property name="tokens" value="COMMA, SEMI, TYPECAST"/>
|
||||
</module>
|
||||
|
||||
<module name="NoWhitespaceAfter">
|
||||
<!-- Checks that there is no whitespace after various unary operators.
|
||||
Linebreaks are allowed.
|
||||
-->
|
||||
<property name="tokens" value="BNOT, DEC, DOT, INC, LNOT, UNARY_MINUS,
|
||||
UNARY_PLUS"/>
|
||||
<property name="allowLineBreaks" value="true"/>
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<module name="NoWhitespaceBefore">
|
||||
<!-- Checks that there is no whitespace before various unary operators.
|
||||
Linebreaks are allowed.
|
||||
-->
|
||||
<property name="tokens" value="SEMI, DOT, POST_DEC, POST_INC"/>
|
||||
<property name="allowLineBreaks" value="true"/>
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
<module name="OperatorWrap">
|
||||
<!-- Checks that assignment operators are at the end of the line. -->
|
||||
<property name="option" value="eol"/>
|
||||
<property name="tokens" value="ASSIGN"/>
|
||||
</module>
|
||||
|
||||
<module name="ParenPad">
|
||||
<!-- Checks that there is no whitespace before close parens or after
|
||||
open parens.
|
||||
-->
|
||||
<property name="severity" value="error"/>
|
||||
</module>
|
||||
|
||||
</module>
|
||||
</module>
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<FindBugsFilter>
|
||||
<Match>
|
||||
<Bug pattern="EI_EXPOSE_REP"/>
|
||||
</Match>
|
||||
<Match>
|
||||
<Bug pattern="EI_EXPOSE_REP2"/>
|
||||
</Match>
|
||||
<Match>
|
||||
<Bug pattern="SE_NO_SERIALVERSIONID"/>
|
||||
</Match>
|
||||
<Match>
|
||||
<Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
|
||||
</Match>
|
||||
<Match>
|
||||
<Bug pattern="SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING"/>
|
||||
</Match>
|
||||
</FindBugsFilter>
|
||||
@@ -1,12 +0,0 @@
|
||||
<?xml version="1.0"?>
|
||||
<!DOCTYPE suppressions PUBLIC
|
||||
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
|
||||
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
|
||||
|
||||
<suppressions>
|
||||
<!-- target directory is not relevant for checkstyle -->
|
||||
<suppress
|
||||
files="[\\/]target[\\/]"
|
||||
checks=".*"/>
|
||||
|
||||
</suppressions>
|
||||
Reference in New Issue
Block a user