Merge branch 'feature/easy-refactor' into 'develop'

[GAL-602] refactor: refactor this module based on the Easy Stream framework.

See merge request galaxy/tsg_olap/sip-rtp-correlation!26
This commit is contained in:
梁超
2024-07-01 07:56:41 +00:00
62 changed files with 3120 additions and 2490 deletions

47
.gitlab-ci.yml Normal file
View File

@@ -0,0 +1,47 @@
image: 192.168.40.153:9080/common/maven:3.8.1-openjdk-11-slim-with-git
variables:
MAVEN_CLI_OPTS: "--batch-mode --errors --show-version"
stages:
- check
- test
- build
snapshot-version:
stage: check
script:
- mvn $MAVEN_CLI_OPTS enforcer:enforce@snapshot-version-check
rules:
- if: $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" && $CI_PIPELINE_SOURCE == "merge_request_event"
non-snapshot-version:
stage: check
script:
- mvn $MAVEN_CLI_OPTS enforcer:enforce@release-version-check
- |-
if `mvn $MAVEN_CLI_OPTS dependency:get@release-deploy-check > /dev/null 2>&1`; then
echo "The current version has been deployed."
exit 1
else
echo "The current version has not been deployed."
fi
rules:
- if: $CI_MERGE_REQUEST_TARGET_BRANCH_NAME =~ /(^master$|^release\/)/ && $CI_PIPELINE_SOURCE == "merge_request_event"
test:
stage: test
script:
- mvn $MAVEN_CLI_OPTS clean test
only:
- merge_requests
# Used for building snapshot versions on the develop branch.
build:
stage: build
script:
- echo "$MAVEN_SETTINGS_XML" > /usr/share/maven/conf/settings.xml
- mvn clean site deploy -DskipTests
only:
- master
- /^release\//

View File

@@ -1,8 +0,0 @@
# Changelog
### Hotfix
- [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常
### Feature
- 输出 SIP Record
- [GAL-419](https://jira.geedge.net/browse/GAL-419) 增加配置项 `include.intranet.ip`, 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联,不关联则输出到异常 Topic 中。

View File

@@ -17,27 +17,9 @@ mvn clean package
使用以下命令运行Flink任务
```shell
flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<version>.jar application.properties
flink run -c com.geedgenetworks.flink.easy.core.Runner path/to/sip-rtp-correlation-<version>.jar job.yml
```
## 配置项说明
| 配置项 | 类型 | 必需 | 默认值 | 描述 |
|----------------------------------| ------------------- | ---------- | ---------------------------------------------------------- |-------------------------------------------|
| source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 |
| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties |
| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 |
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties |
| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
| include.intranet.ip | BOOLEAN | N | True | 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联 |
| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 |
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties |
| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) |
| rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) |
| job.name | STRING | N | correlation_sip_rtp_session | Job 名 |
## 贡献
如果您发现任何问题或改进项目的想法,欢迎提交 Issue 或 Pull Request。

659
pom.xml
View File

@@ -5,23 +5,24 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.zdjizhi</groupId>
<groupId>com.geedgenetworks.application</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>1.2.2</version>
<version>2.0-SNAPSHOT</version>
<name>Flink : SIP-RTP : Correlation</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<java.version>11</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.13.6</flink.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<flink.version>1.13.6</flink.version>
<easy.stream.version>1.3-SNAPSHOT</easy.stream.version>
<slf4j.version>1.7.32</slf4j.version>
<log4j.version>2.17.1</log4j.version>
<jackson.version>2.13.2.20220328</jackson.version>
<junit.version>5.8.0</junit.version>
</properties>
<distributionManagement>
@@ -34,42 +35,44 @@
<id>platform-snapshots</id>
<url>http://192.168.40.153:8099/content/repositories/platform-snapshot</url>
</snapshotRepository>
<site>
<id>platform-site</id>
<url>
dav:http://192.168.40.153:8099/content/sites/platform-site/platform/application/sip-rtp-correlate-${project.version}
</url>
</site>
</distributionManagement>
<repositories>
<repository>
<id>central</id>
<url>http://192.168.40.153:8099/content/groups/public</url>
</repository>
<repository>
<id>snapshots</id>
<url>http://192.168.40.153:8099/content/groups/public</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-influxdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
@@ -77,54 +80,132 @@
<artifactId>galaxy</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
<groupId>xyz.downgoon</groupId>
<artifactId>snowflake</artifactId>
<version>1.0.0</version>
</dependency>
<!-- Easy Stream -->
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-common</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-core</artifactId>
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<scope>test</scope>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-grouped-exec-pipeline</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-filter-pipeline</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-console-pipeline</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-split-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-correlate-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-union-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-kafka-connector</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-text-connector</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-socket-connector</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-console-connector</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-json-format</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-flink-shim</artifactId>
</dependency>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<artifactId>flink-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>runtime</scope>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<!-- DEV -->
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
<version>4.4.2</version>
</dependency>
<!-- LOG -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -134,30 +215,43 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<!-- API bridge between log4j 1 and 2 -->
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<!-- Test -->
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- Common -->
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
@@ -171,170 +265,236 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Easy Stream-->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.8.0</version>
<scope>test</scope>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-common</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-core</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-grouped-exec-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-filter-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-console-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-kafka-connector</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-json-format</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-flink-shim</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.2</version>
<configuration>
<suppressionsLocation>${basedir}/tools/maven/suppressions.xml</suppressionsLocation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<configLocation>${basedir}/tools/maven/checkstyle.xml</configLocation>
<logViolationsToConsole>true</logViolationsToConsole>
<failOnViolation>true</failOnViolation>
</configuration>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>8.40</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>test-sources</id>
<phase>generate-test-sources</phase>
<id>java-style-check</id>
<phase>compile</phase>
<goals>
<goal>add-test-source</goal>
<goal>check</goal>
</goals>
<configuration>
<sources>
<source>src/it/java</source>
</sources>
<sourceDirectories>src/main/java</sourceDirectories>
</configuration>
</execution>
<execution>
<id>test-resources</id>
<phase>generate-test-resources</phase>
<id>java-test-style-check</id>
<phase>test-compile</phase>
<goals>
<goal>add-test-resource</goal>
<goal>check</goal>
</goals>
<configuration>
<resources>
<resource>
<directory>src/it/resources</directory>
</resource>
</resources>
<testSourceDirectories>src/test/java</testSourceDirectories>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-maven-plugin</artifactId>
<version>4.4.2.2</version>
<configuration>
<xmlOutput>true</xmlOutput>
<!-- Low, Medium, High ('Low' is strictest) -->
<threshold>Low</threshold>
<effort>default</effort>
<spotbugsXmlOutputDirectory>${project.build.directory}/spotbugs</spotbugsXmlOutputDirectory>
<excludeFilterFile>${basedir}/tools/maven/spotbugs-exclude.xml</excludeFilterFile>
<failOnError>true</failOnError>
</configuration>
<executions>
<execution>
<id>findbugs-main</id>
<phase>compile</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
<execution>
<id>findbugs-test</id>
<phase>test-compile</phase>
<goals>
<goal>check</goal>
</goals>
<configuration>
<includeTests>true</includeTests>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<version>3.8.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-tests-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.outputDirectory}/tests-lib</outputDirectory>
<excludeScope>system</excludeScope>
<excludeTransitive>false</excludeTransitive>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<executions>
<execution>
<id>pre-unit-test</id>
<goals>
<goal>prepare-agent</goal>
</goals>
<configuration>
<destFile>${project.build.directory}/jacoco.exec</destFile>
</configuration>
</execution>
<execution>
<id>test-report</id>
<phase>verify</phase>
<goals>
<goal>report</goal>
</goals>
<configuration>
<dataFile>${project.build.directory}/jacoco.exec</dataFile>
<outputDirectory>${project.reporting.outputDirectory}/jacoco</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.github.zlika</groupId>
<artifactId>reproducible-build-maven-plugin</artifactId>
<version>0.2</version>
<executions>
<execution>
<goals>
<goal>strip-jar</goal>
</goals>
<phase>package</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<version>3.5.1</version>
<executions>
<execution>
<id>default-shade</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<finalName>${project.artifactId}-${project.version}</finalName>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
<exclude>org.mockito:mockito-core</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
@@ -343,50 +503,151 @@
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>build-jobs</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-yml-${project.version}</finalName>
<appendAssemblyId>false</appendAssemblyId>
<descriptors>
<descriptor>tools/dist/target.xml</descriptor>
</descriptors>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>site-resources</id>
<phase>pre-site</phase>
<goals>
<goal>resources</goal>
</goals>
<configuration>
<resources>
<resource>
<directory>src/site</directory>
<filtering>true</filtering>
<includes>
<include>**</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<configuration>
<skip>false</skip>
</configuration>
<executions>
<execution>
<id>default-site</id>
<goals>
<goal>site</goal>
</goals>
<phase>site</phase>
<configuration>
<siteDirectory>${project.build.outputDirectory}</siteDirectory>
</configuration>
</execution>
<execution>
<id>site-deploy</id>
<goals>
<goal>stage-deploy</goal>
</goals>
<phase>deploy</phase>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.1</version>
<artifactId>maven-enforcer-plugin</artifactId>
<version>3.0.0-M3</version>
<executions>
<execution>
<id>release-version-check</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireReleaseVersion>
<message>SNAPSHOT versions ${project.version} are not allowed.</message>
</requireReleaseVersion>
</rules>
</configuration>
</execution>
<execution>
<id>snapshot-version-check</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireSnapshotVersion>
<message>Non-SNAPSHOT versions ${project.version} are not allowed.</message>
</requireSnapshotVersion>
</rules>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.7</version>
<artifactId>maven-site-plugin</artifactId>
<version>3.9.1</version>
<configuration>
<outputDirectory>${project.build.directory}/site</outputDirectory>
<relativizeDecorationLinks>false</relativizeDecorationLinks>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-webdav-jackrabbit</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.maven.doxia</groupId>
<artifactId>doxia-module-markdown</artifactId>
<version>1.9.1</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.3.0</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.6.0</version>
<artifactId>maven-deploy-plugin</artifactId>
<version>3.1.1</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

View File

@@ -1,95 +0,0 @@
package com.zdjizhi.flink.voip;
import com.zdjizhi.flink.voip.functions.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Objects;
// Integration test main class
public class CorrelateTest {
public static void main(String[] args) throws Exception {
final Configuration envConfig = new Configuration();
envConfig.setInteger("rest.port", 18081);
envConfig.setBoolean("rest.flamegraph.enabled", true);
envConfig.setString("metrics.reporter.influxdb.factory.class", "org.apache.flink.metrics.influxdb.InfluxdbReporterFactory");
envConfig.setString("metrics.reporter.influxdb.scheme", "http");
envConfig.setString("metrics.reporter.influxdb.host", "127.0.0.1");
envConfig.setInteger("metrics.reporter.influxdb.port", 8086);
envConfig.setString("metrics.reporter.influxdb.db", "flinks");
envConfig.setString("metrics.reporter.influxdb.interval", "5 SECONDS");
final StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironmentWithWebUI(envConfig);
final ParameterTool tool = ParameterTool.fromPropertiesFile(
args.length > 0 ? args[0] :
Objects.requireNonNull(
CorrelateTest.class.getResource("/application-test.properties")).getPath()
);
final CheckpointConfig checkpointConfig = env.getCheckpointConfig();
env.enableCheckpointing(Time.minutes(1)
.toMilliseconds(), CheckpointingMode.AT_LEAST_ONCE);
checkpointConfig.setCheckpointTimeout(Time.minutes(2).toMilliseconds());
checkpointConfig.setCheckpointStorage("file://" + System.getProperty("java.io.tmpdir"));
/* checkpointConfig.setCheckpointStorage(
Objects.requireNonNull(
CorrelateTest.class.getResource("/")).toString());*/
final Configuration config = tool.getConfiguration();
env.getConfig().setGlobalJobParameters(config);
env.setParallelism(8);
final SingleOutputStreamOperator<ObjectNode> sourceStream = env.addSource(new DataGenSource())
.name("DataGenSource")
.setParallelism(4);
// Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data
final SingleOutputStreamOperator<ObjectNode> splitsRecordsOperator =
sourceStream
.process(new TypeSplitFunction())
.name("SplitsRecordsBasedSchemaType")
.uid("splits-records-based-schema-type");
// Get the DataStreams for SIP and RTP data from the side outputs.
final DataStream<ObjectNode> sipDataStream = splitsRecordsOperator
.getSideOutput(TypeSplitFunction.SIP_OUTPUT_TAG);
final DataStream<ObjectNode> rtpDataStream = splitsRecordsOperator
.getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG);
// Process SIP data to create a double directional stream using SIPPairingFunction.
final SingleOutputStreamOperator<ObjectNode> sipDoubleDirOperator = sipDataStream
.keyBy(new SIPKeySelector())
.process(new SIPPairingFunction())
.name("PairingOneWayToDoubleStream")
.uid("pairing-one-way-to-double");
final KeySelector<ObjectNode, Tuple2<Integer, Address>> vSysSelector = new VSysIDKeySelector();
// Fusion SIP data and RTP data to VoIP data.
final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream
.keyBy(vSysSelector)
.connect(sipDoubleDirOperator.keyBy(vSysSelector))
.process(new VoIPFusionFunction())
.name("VoIPFusion")
.uid("voip-fusion");
voIpOperator.addSink(new DoNothingSink())
.name("DoNothingSink")
.setParallelism(1);
env.execute("VoIP Fusion Job");
}
}

View File

@@ -1,22 +0,0 @@
package com.zdjizhi.flink.voip.conf;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
// Test Configs.
public class TestConfigs {
// Ratio of valuable data in the generated dataset
public static final ConfigOption<Integer> VALUABLE_DATA_PROPORTION =
ConfigOptions.key("valuable.data.ratio")
.intType()
.defaultValue(40)
.withDescription("Ratio of valuable data in the generated dataset.");
// QPS of generate date record
public static final ConfigOption<Long> DATA_GENERATE_RATE =
ConfigOptions.key("data.generate.rate")
.longType()
.defaultValue(1000L)
.withDescription("QPS of generate date record.");
}

View File

@@ -1,46 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.TestConfigs;
import com.zdjizhi.flink.voip.data.RTPGenerator;
import com.zdjizhi.flink.voip.data.SIPGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
// Date Generate Source.
public class DataGenSource extends RichParallelSourceFunction<ObjectNode> implements FunctionHelper {
private transient SIPGenerator sipGenerator;
private transient RTPGenerator rtpGenerator;
private transient RateLimiter rateLimiter;
private volatile boolean running;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Integer ratio = getGlobalConfiguration()
.get(TestConfigs.VALUABLE_DATA_PROPORTION);
this.sipGenerator = new SIPGenerator(ratio);
this.rtpGenerator = new RTPGenerator(ratio, sipGenerator);
final Long rate = getGlobalConfiguration()
.get(TestConfigs.DATA_GENERATE_RATE);
this.rateLimiter = RateLimiter.create(
(int) (rate / getRuntimeContext().getNumberOfParallelSubtasks()));
this.running = true;
}
@Override
public void run(SourceContext<ObjectNode> ctx) throws Exception {
while (running) {
rateLimiter.acquire();
ctx.collect(sipGenerator.next());
ctx.collect(rtpGenerator.next());
}
}
@Override
public void cancel() {
this.running = false;
}
}

View File

@@ -1,43 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.records.Record;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
// It dose nothing with the incoming data and simply collects metrics for the number of
// RTP and VoIP records processed per second.
public class DoNothingSink extends RichSinkFunction<ObjectNode> {
private transient MeterView numRTPRecordsPreSecond;
private transient MeterView numVoIPRecordsPreSecond;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
RuntimeContext runtimeContext = getRuntimeContext();
MetricGroup metricGroup = runtimeContext.getMetricGroup();
numRTPRecordsPreSecond = metricGroup
.meter("numRTPRecordsPreSecond", new MeterView(1));
numVoIPRecordsPreSecond = metricGroup
.meter("numVoIPRecordsPreSecond", new MeterView(1));
}
@Override
public void invoke(ObjectNode obj, Context context) throws Exception {
Record record = new Record(obj);
switch (record.getSchemaType()) {
case RTP:
numRTPRecordsPreSecond.markEvent();
break;
case VOIP:
numVoIPRecordsPreSecond.markEvent();
break;
default:
}
}
}

View File

@@ -1,4 +0,0 @@
sip.state.clear.interval.minutes=1
rtp.state.clear.interval.minutes=10
valuable.data.ratio=20
data.generate.rate=1000

View File

@@ -1,25 +0,0 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
rootLogger.level = INFO
rootLogger.appenderRef.test.ref = TestLogger
appender.test.name = TestLogger
appender.test.type = CONSOLE
appender.test.layout.type = PatternLayout
appender.test.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

View File

@@ -0,0 +1,33 @@
package com.geedgenetworks.flink.easy.application.voip;
import com.geedgenetworks.flink.easy.application.voip.udf.*;
import com.geedgenetworks.flink.easy.common.api.UDFFactory;
import org.apache.flink.table.functions.UserDefinedFunction;
import java.util.HashMap;
import java.util.Map;
public class VoipUDFFactory implements UDFFactory {
private static final Map<String, UserDefinedFunction> R =
new HashMap<>() {{
put("IS_IP_ADDRESS", new IsIpAddress());
put("IS_INTERNAL_IP_ADDRESS", new IsInternalIpAddress());
put("IS_EXTERNAL_IP_ADDRESS", new IsExternalIpAddress());
put("HAS_IP_ADDRESS", new HasIpAddress());
put("HAS_EXTERNAL_IP_ADDRESS", new HasExternalIpAddress());
put("STREAM_DIR", new StreamDir());
put("FIND_NOT_BLANK", new FindNotBlank());
put("SORT_ADDRESS", new SortAddress());
put("SNOWFLAKE_ID", new SnowflakeID());
}};
@Override
public Map<String, UserDefinedFunction> register() {
return R;
}
}

View File

@@ -0,0 +1,15 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
public class FindNotBlank extends ScalarFunction {
public @DataTypeHint("STRING") String eval(String s1, String s2) {
if (StringUtils.isBlank(s1) && StringUtils.isNotBlank(s2)) {
return s2;
}
return s1;
}
}

View File

@@ -0,0 +1,19 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
public class HasExternalIpAddress extends ScalarFunction {
private final IsExternalIpAddress isExternalIpAddress = new IsExternalIpAddress();
public @DataTypeHint("BOOLEAN") Boolean eval(String... ipaddr) {
if (null == ipaddr) {
return false;
}
for (var ip : ipaddr) {
return isExternalIpAddress.eval(ip);
}
return false;
}
}

View File

@@ -0,0 +1,18 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import com.zdjizhi.utils.IPUtil;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
public class HasIpAddress extends ScalarFunction {
public @DataTypeHint("BOOLEAN") Boolean eval(String... ipaddr) {
if (null == ipaddr) {
return false;
}
for (var ip : ipaddr) {
return ip != null && IPUtil.isIPAddress(ip);
}
return false;
}
}

View File

@@ -0,0 +1,17 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import com.zdjizhi.utils.IPUtil;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import static com.zdjizhi.utils.IPUtil.isIPAddress;
public class IsExternalIpAddress extends ScalarFunction {
public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
if (ipaddr == null || !isIPAddress(ipaddr)) {
return false;
}
return !IPUtil.internalIp(ipaddr);
}
}

View File

@@ -0,0 +1,17 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import com.zdjizhi.utils.IPUtil;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import static com.zdjizhi.utils.IPUtil.isIPAddress;
public class IsInternalIpAddress extends ScalarFunction {
public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
if (!isIPAddress(ipaddr)) {
return false;
}
return IPUtil.internalIp(ipaddr);
}
}

View File

@@ -0,0 +1,15 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import com.zdjizhi.utils.IPUtil;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
public class IsIpAddress extends ScalarFunction {
public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
if (null == ipaddr) {
return false;
}
return IPUtil.isIPAddress(ipaddr);
}
}

View File

@@ -0,0 +1,14 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import xyz.downgoon.snowflake.Snowflake;
public class SnowflakeID extends ScalarFunction {
private static final Snowflake SNOWFLAKE = new Snowflake(1, 1);
public @DataTypeHint("BIGINT") Long eval() {
return SNOWFLAKE.nextId();
}
}

View File

@@ -0,0 +1,31 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import com.google.common.collect.Lists;
import com.zdjizhi.utils.IPUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
public class SortAddress extends ScalarFunction {
public @DataTypeHint("STRING")
String eval(
String ip1, Integer port1, String ip2, Integer port2) {
return of(Tuple2.of(ip1, port1), Tuple2.of(ip2, port2));
}
public static String of(
Tuple2<String, Integer> a1, Tuple2<String, Integer> a2) {
var list = Lists.newArrayList(a1, a2);
list.sort((a, b) -> {
if (a.f1.equals(b.f1)) {
return Long.compare(
IPUtil.getIpDesimal(a.f0), IPUtil.getIpDesimal(b.f0));
} else {
return a.f1.compareTo(b.f1);
}
});
return String.format("%s:%s,%s:%s",
list.get(0).f0, list.get(0).f1, list.get(1).f0, list.get(1).f1);
}
}

View File

@@ -0,0 +1,26 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
public class StreamDir extends ScalarFunction {
public @DataTypeHint("INT") Integer eval(Long flags) {
int v = 0;
if (flags == null) {
return v;
}
if ((flags & 8192) == 8192) {
v += 1;
}
if ((flags & 16384) == 16384) {
v += 2;
}
return v;
}
public static void main(String[] args) {
System.out.println(8192L + 16384L);
System.out.println(new StreamDir().eval(8192L + 16384L));
}
}

View File

@@ -1,108 +0,0 @@
package com.zdjizhi.flink.voip;
import com.zdjizhi.flink.voip.conf.FusionConfiguration;
import com.zdjizhi.flink.voip.error.ErrorHandler;
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
import com.zdjizhi.flink.voip.functions.*;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.time.Duration;
import static com.zdjizhi.flink.voip.conf.FusionConfigs.*;
/**
* The main class for running the SIP RTP Correlation application.
*
* @author chaoc
* @since 1.0
*/
public class CorrelateApp {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// param check
if (args.length < 1) {
throw new IllegalArgumentException("Error: Not found properties path. " +
"\nUsage: flink -c xxx xxx.jar app.properties.");
}
final ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
final Configuration config = tool.getConfiguration();
env.getConfig().setGlobalJobParameters(config);
final FusionConfiguration fusionConfiguration = new FusionConfiguration(config);
final FlinkKafkaConsumer<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer<>(
config.get(SOURCE_KAFKA_TOPIC),
new JsonNodeDeserializationSchema(),
fusionConfiguration
.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
final DataStream<ObjectNode> sourceStream = env.addSource(kafkaConsumer)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
(element, recordTimestamp) ->
element.get("start_timestamp_ms").asLong()));
final ErrorHandler errorHandler = new ErrorHandler(config);
// Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data
final SingleOutputStreamOperator<ObjectNode> splitsRecordsOperator =
errorHandler.filterError(sourceStream)
.process(new TypeSplitFunction())
.name("SplitsRecordsBasedSchemaType")
.uid("splits-records-based-schema-type");
// Get the DataStreams for SIP and RTP data from the side outputs.
final DataStream<ObjectNode> sipDataStream = splitsRecordsOperator
.getSideOutput(TypeSplitFunction.SIP_OUTPUT_TAG);
final DataStream<ObjectNode> rtpDataStream = splitsRecordsOperator
.getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG);
// Process SIP data to create a double directional stream using SIPPairingFunction.
final SingleOutputStreamOperator<ObjectNode> sipDoubleDirOperator = sipDataStream
.keyBy(new SIPKeySelector())
.process(new SIPPairingFunction())
.name("PairingOneWayToDoubleStream")
.uid("pairing-one-way-to-double");
final KeySelector<ObjectNode, Tuple2<Integer, Address>> vSysSelector = new VSysIDKeySelector();
// Fusion SIP data and RTP data to VoIP data.
final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream
.keyBy(vSysSelector)
.connect(sipDoubleDirOperator.keyBy(vSysSelector))
.process(new VoIPFusionFunction())
.name("VoIPFusion")
.uid("voip-fusion");
final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>(
config.get(SINK_KAFKA_TOPIC),
new JsonNodeSerializationSchema(),
fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
voIpOperator
.union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
.addSink(producer);
env.execute(config.get(JOB_NAME));
}
}

View File

@@ -1,107 +0,0 @@
package com.zdjizhi.flink.voip.conf;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
/**
* Containing configuration options for the Fusion application.
*
* @author chaoc
* @since 1.0
*/
public class FusionConfigs {
/**
* The prefix for Kafka properties used in the source.
*/
public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";
/**
* The prefix for Kafka properties used in the sink.
*/
public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";
/**
* Configuration prefix for the properties of the Kafka sink where the error data will be output.
*/
public static final String ERROR_SINK_KAFKA_PROPERTIES_PREFIX = "error.sink.kafka.props.";
/**
* Configuration option for the Kafka topic used in the source.
*/
public static final ConfigOption<String> SOURCE_KAFKA_TOPIC =
ConfigOptions.key("source.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("The Kafka topic used in the source.");
/**
* Configuration option for the Kafka topic used in the sink.
*/
public static final ConfigOption<String> SINK_KAFKA_TOPIC =
ConfigOptions.key("sink.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("The Kafka topic used in the sink.");
/**
* Configuration option to enable or disable the output of error records.
* If set to true, the error records will be sent to the specified Kafka topic.
* Default value is false.
*/
public static final ConfigOption<Boolean> ERROR_RECORDS_OUTPUT_ENABLE =
ConfigOptions.key("error.records.output.enable")
.booleanType()
.defaultValue(false)
.withDescription("Enable or disable the output of error records. " +
"If set to true, the error records will be sent to the specified Kafka topic.");
/**
* Configuration option to determine whether to perform data correlate for intranet addresses.
*/
public static final ConfigOption<Boolean> INCLUDE_INTRANET_IP =
ConfigOptions.key("include.intranet.ip")
.booleanType()
.defaultValue(true)
.withDescription("Whether to perform data correlate for intranet addresses");
/**
* Configuration option for specifying the Kafka topic name where the error data will be sent.
* This configuration option is used when the output of error records is enabled.
*/
public static final ConfigOption<String> ERROR_SINK_KAFKA_TOPIC =
ConfigOptions.key("error.sink.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("The Kafka topic name where the error records will be sent.");
/**
* The configuration option for the interval at which SIP state data
* should be cleared.
*/
public static final ConfigOption<Integer> SIP_STATE_CLEAR_INTERVAL =
ConfigOptions.key("sip.state.clear.interval.minutes")
.intType()
.defaultValue(1)
.withDescription("The interval at which SIP state data should be cleared.");
/**
* The configuration option for the interval at which STP state data
* should be cleared.
*/
public static final ConfigOption<Integer> RTP_STATE_CLEAR_INTERVAL =
ConfigOptions.key("rtp.state.clear.interval.minutes")
.intType()
.defaultValue(6)
.withDescription("The interval at which RTP state data should be cleared.");
/**
* Configuration option for specifying the name of a job.
*/
public static final ConfigOption<String> JOB_NAME =
ConfigOptions.key("job.name")
.stringType()
.defaultValue("correlation_sip_rtp_session")
.withDescription("The name of current job.");
}

View File

@@ -1,44 +0,0 @@
package com.zdjizhi.flink.voip.conf;
import org.apache.flink.configuration.Configuration;
import java.util.Properties;
/**
* A wrapper class that extends the Flink `Configuration` to provide utility methods for handling
* properties with a specific prefix. This class allows retrieving properties that start with the
* given `prefix` and converts them into a `java.util.Properties` object.
*
* @author chaoc
* @since 1.0
*/
public class FusionConfiguration {
private final Configuration config;
public FusionConfiguration(final Configuration config) {
this.config = config;
}
/**
* Retrieves properties from the underlying `Configuration` instance that start with the specified
* `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
*
* @param prefix The prefix to filter properties.
* @return A `java.util.Properties` object containing the properties with the specified prefix.
*/
public Properties getProperties(final String prefix) {
if (prefix == null) {
final Properties props = new Properties();
props.putAll(config.toMap());
return props;
}
return config.toMap()
.entrySet()
.stream()
.filter(entry -> entry.getKey().startsWith(prefix))
.collect(Properties::new, (props, e) ->
props.setProperty(e.getKey().substring(prefix.length()), e.getValue()),
Properties::putAll);
}
}

View File

@@ -1,177 +0,0 @@
package com.zdjizhi.flink.voip.error;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.conf.FusionConfiguration;
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
import com.zdjizhi.flink.voip.functions.FunctionHelper;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import com.zdjizhi.utils.IPUtil;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Function;
/**
* The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
* It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
*
* @author chaoc
* @since 1.0
*/
public class ErrorHandler {
private static final Logger LOG = LoggerFactory.getLogger(ErrorHandler.class);
/**
* The OutputTag for invalid records.
*/
public static final OutputTag<ObjectNode> INVALID_OUTPUT_TAG =
new OutputTag<>("invalid-records", TypeInformation.of(ObjectNode.class));
private final Configuration config;
/**
* Creates a new ErrorHandler instance with the given configuration.
*
* @param config The configuration containing settings for error handling.
*/
public ErrorHandler(final Configuration config) {
this.config = config;
}
/**
* Filters out error records from the input data stream and outputs them to a separate stream if enabled.
*
* @param dataStream The input data stream of ObjectNode records.
* @return A new DataStream containing valid records without errors.
*/
public DataStream<ObjectNode> filterError(final DataStream<ObjectNode> dataStream) {
// Process the data stream to identify meaningless addresses and ports
final SingleOutputStreamOperator<ObjectNode> operator = dataStream
.process(new MeaninglessAddressProcessFunction())
.name("MeaninglessRecords")
.uid("meaningless-records");
// If enabled, output the error records to the specified Kafka topic
if (config.get(FusionConfigs.ERROR_RECORDS_OUTPUT_ENABLE)) {
final String topic = config.get(FusionConfigs.ERROR_SINK_KAFKA_TOPIC);
LOG.info("Meaningless data output is enabled, data will be sent to: Topic [{}]", topic);
final DataStream<ObjectNode> errorStream = operator
.getSideOutput(INVALID_OUTPUT_TAG);
final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>(
topic,
new JsonNodeSerializationSchema(),
new FusionConfiguration(config)
.getProperties(FusionConfigs.ERROR_SINK_KAFKA_PROPERTIES_PREFIX)
);
errorStream.addSink(producer);
}
return operator;
}
}
/**
* The MeaninglessAddressProcessFunction is a custom ProcessFunction used to check for records
* with invalid or meaningless addresses and ports. It separates them into the invalid output tag if necessary.
*/
class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> implements FunctionHelper {
private static final Logger LOG = LoggerFactory.getLogger(MeaninglessAddressProcessFunction.class);
private transient boolean includeIntranetIp;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
final Configuration config = getGlobalConfiguration();
includeIntranetIp = config.get(FusionConfigs.INCLUDE_INTRANET_IP);
}
@Override
public void processElement(ObjectNode obj,
ProcessFunction<ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(obj);
// Check for invalid or meaningless addresses and ports
boolean cond1 = isIPAddress(record.getClientIp()) &&
isIPAddress(record.getServerIp()) &&
record.getClientPort() > 0 &&
record.getServerPort() > 0;
boolean cond8 = null != executeSafely(Record::getStreamDir, record);
final SIPRecord sipRecord = new SIPRecord(obj);
boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
|| (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp()));
boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) &&
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond7 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) &&
isIPAddress(sipRecord.getResponderSdpConnectIp()) &&
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
// Both client and server addresses in the data are valid.
if (cond1 && cond8 && (!cond5 || cond7) && (
// The address in the SIP one-way stream is valid and not an internal network address.
cond2 && cond3 && cond4 && cond5
// The coordinating addresses in the SIP double directional stream are valid
// and not internal network addresses.
|| cond5 && cond6
|| !cond5)) {
out.collect(obj);
} else {
// Output invalid records to the invalid output tag
ctx.output(ErrorHandler.INVALID_OUTPUT_TAG, obj);
if (LOG.isDebugEnabled()) {
LOG.debug("Meaningless record: {}", obj);
}
}
}
// ======================================================================================
// ----------------------------------- private helper -----------------------------------
public static <T, R> R executeSafely(Function<T, R> function, T v) {
try {
return function.apply(v);
} catch (Exception e) {
return null;
}
}
private static boolean isIPAddress(final String ipaddr) {
if (null == ipaddr) {
return false;
}
return IPUtil.isIPAddress(ipaddr);
}
private static boolean isInternalIp(final String ipaddr) {
if (!isIPAddress(ipaddr)) {
return false;
}
return IPUtil.internalIp(ipaddr);
}
}

View File

@@ -1,20 +0,0 @@
package com.zdjizhi.flink.voip.formats;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
public class JsonNodeSerializationSchema implements SerializationSchema<ObjectNode> {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public byte[] serialize(ObjectNode jsonNodes) {
try {
return mapper.writeValueAsBytes(jsonNodes);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -1,54 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.google.common.collect.Lists;
import com.zdjizhi.utils.IPUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.List;
/**
* A pojo class representing an address with two IP and port pairs.
*
* @author chaoc
* @since 1.0
*/
@Data
@AllArgsConstructor
public class Address {
// The first IP address.
private final String ip1;
//The first port number.
private final int port1;
//The second IP address.
private final String ip2;
//The second port number.
private final int port2;
/**
* Creates an Address instance based on two tuples containing (String, Int) representing address information.
* The method sorts the addresses based on the port number, and if the ports are equal, it sorts them based on
* the numeric value of the IP address.
*
* @param a1 The first address information as a tuple (IP address, port).
* @param a2 The second address information as a tuple (IP address, port).
* @return An Address instance with addresses sorted and reordered.
*/
public static Address of(Tuple2<String, Integer> a1, Tuple2<String, Integer> a2) {
List<Tuple2<String, Integer>> list = Lists.newArrayList(a1, a2);
list.sort((a, b) -> {
if (a.f1.equals(b.f1)) {
return Long.compare(IPUtil.getIpDesimal(a.f0), IPUtil.getIpDesimal(b.f0));
} else {
return a.f1.compareTo(b.f1);
}
});
return new Address(list.get(0).f0, list.get(0).f1, list.get(1).f0, list.get(1).f1);
}
}

View File

@@ -1,32 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
/**
* An interface that provides utility functions for Flink functions.
*
* @author chaoc
* @since 1.0
*/
public interface FunctionHelper extends RichFunction {
/**
* Get the global configuration for the current Flink job.
*
* @return The global configuration as a Configuration object.
*/
default Configuration getGlobalConfiguration() {
final ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()
.getExecutionConfig().getGlobalJobParameters();
return (Configuration) globalJobParameters;
}
default void registerNextFireTimestamp(TimerService timerService, long interval) {
long current = timerService.currentWatermark();
timerService.registerEventTimeTimer(current + interval);
}
}

View File

@@ -1,28 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Setter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* A case class representing an ObjectNode with an expiration time and a pair times.
*
* @author chaoc
* @since 1.0
*/
@Data
@AllArgsConstructor
class ObjectNodeInfo {
// The ObjectNode containing data.
private ObjectNode obj;
// The pair times for the object.
@Setter
private int times;
public void incTimes() {
this.times = this.times + 1;
}
}

View File

@@ -1,31 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.records.SIPRecord;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* A KeySelector implementation for extracting a composite key from an ObjectNode representing a SIP record.
*
* @author chaoc
* @since 1.0
*/
public class SIPKeySelector implements KeySelector<ObjectNode, Tuple3<Integer, String, Address>> {
/**
* Extracts the composite key (VSysID, CallID, Address) from the given ObjectNode.
*
* @param obj The ObjectNode representing a SIP record.
* @return A Tuple3 containing the extracted key (VSysID, CallID, Address).
* @throws Exception Thrown if an error occurs during key extraction.
*/
@Override
public Tuple3<Integer, String, Address> getKey(ObjectNode obj) throws Exception {
final SIPRecord record = new SIPRecord(obj);
final Address address = Address.of(Tuple2.of(record.getClientIp(), record.getClientPort()),
Tuple2.of(record.getServerIp(), record.getServerPort()));
return Tuple3.of(record.getVSysID(), record.getCallID(), address);
}
}

View File

@@ -1,91 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
* SIP records are paired when they have the same addresses but opposite stream directions.
*
* @author chaoc
* @since 1.0
*/
public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>
implements FunctionHelper {
public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
new OutputTag<>("unmatched-sip", TypeInformation.of(ObjectNode.class));
private transient Time fireInterval;
private transient ValueState<ObjectNode> valueState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
int minutes = getGlobalConfiguration().get(FusionConfigs.SIP_STATE_CLEAR_INTERVAL);
fireInterval = Time.minutes(minutes);
final ValueStateDescriptor<ObjectNode> descriptor =
new ValueStateDescriptor<>("sip-state", ObjectNode.class);
final StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(fireInterval)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.useProcessingTime()
.cleanupFullSnapshot()
.build();
descriptor.enableTimeToLive(ttlConfig);
valueState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(ObjectNode value,
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(value);
// When SIP is a one-way stream.
if (StreamDir.DOUBLE != record.getStreamDir()) {
// If the address is already stored in the mapState and has the opposite stream direction,
// merge the SIP records, change the stream direction to DOUBLE, and output the merged record.
final ObjectNode obj = valueState.value();
if (null != obj && new Record(obj).getStreamDir() != record.getStreamDir()) {
record.merge(obj)
.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.DOUBLE.getValue());
out.collect(value);
valueState.clear();
} else {
// If the address is not yet in the valueState.
valueState.update(value);
}
} else {
// If SIP is a double stream, pairing isn't required, directly output the record.
out.collect(value);
}
registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
}
@Override
public void onTimer(long timestamp,
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception {
final ObjectNode value = valueState.value();
if (value != null) {
ctx.output(SIP_OUTPUT_TAG, value);
}
valueState.clear();
}
}

View File

@@ -1,45 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.records.Record;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* A ProcessFunction that splits ObjectNode records based on their 'schemaType' field.
* It outputs SIP records to the 'sipSchemaTypeOutputTag' and RTP records to the 'rtpSchemaTypeOutputTag'.
*
* @author chaoc
* @since 1.0
*/
public class TypeSplitFunction extends ProcessFunction<ObjectNode, ObjectNode> {
/**
* OutputTag for SIP records.
*/
public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
new OutputTag<>("schema-type-sip", TypeInformation.of(ObjectNode.class));
/**
* OutputTag for RTP records.
*/
public static final OutputTag<ObjectNode> RTP_OUTPUT_TAG =
new OutputTag<>("schema-type-rtp", TypeInformation.of(ObjectNode.class));
@Override
public void processElement(ObjectNode obj,
ProcessFunction<ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(obj);
switch (record.getSchemaType()) {
case RTP:
ctx.output(RTP_OUTPUT_TAG, obj);
break;
case SIP:
ctx.output(SIP_OUTPUT_TAG, obj);
break;
default:
}
}
}

View File

@@ -1,40 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* A KeySelector implementation that extracts the key(VSysID) from an ObjectNode.
*
* @author chaoc
* @since 1.0
*/
public class VSysIDKeySelector implements KeySelector<ObjectNode, Tuple2<Integer, Address>> {
/**
* Extracts the composite key (VSysID, Address) from the given ObjectNode.
*
* @param obj The ObjectNode representing a SIP record.
* @return A Tuple2 containing the extracted key (VSysID, Address).
* @throws Exception Thrown if an error occurs during key extraction.
*/
@Override
public Tuple2<Integer, Address> getKey(ObjectNode obj) throws Exception {
final Record record = new Record(obj);
final Address address;
if (record.getSchemaType() == SchemaType.SIP) {
final SIPRecord sipRecord = new SIPRecord(obj);
address = Address.of(
Tuple2.of(sipRecord.getOriginatorSdpConnectIp(), sipRecord.getOriginatorSdpMediaPort()),
Tuple2.of(sipRecord.getResponderSdpConnectIp(), sipRecord.getResponderSdpMediaPort()));
} else {
address = Address.of(Tuple2.of(record.getServerIp(), record.getServerPort()),
Tuple2.of(record.getClientIp(), record.getClientPort()));
}
return Tuple2.of(record.getVSysID(), address);
}
}

View File

@@ -1,180 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.records.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.Map;
/**
* The `VoIPFusionFunction` is a key-based co-process function that performs VoIP fusion logic
* for SIP and RTP records. It combines SIP and RTP records belonging to the same session
* and emits fused VoIP records. The function utilizes keyed state to store and manage SIP and
* RTP records, and it uses timers to trigger regular clearing of the state.
*
* @author chaoc
* @since 1.0
*/
public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>
implements FunctionHelper {
private static final int MAX_RTP_LINES = 2;
private transient Time fireInterval;
private transient ValueState<ObjectNodeInfo> sipState;
private transient MapState<StreamDir, ObjectNode> rtpState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
final int minutes = getGlobalConfiguration().get(FusionConfigs.RTP_STATE_CLEAR_INTERVAL);
fireInterval = Time.minutes(minutes);
final RuntimeContext context = getRuntimeContext();
final ValueStateDescriptor<ObjectNodeInfo> sipDescriptor =
new ValueStateDescriptor<>("sip-state", ObjectNodeInfo.class);
final StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(fireInterval)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.useProcessingTime()
.cleanupFullSnapshot()
.build();
sipDescriptor.enableTimeToLive(ttlConfig);
sipState = context.getState(sipDescriptor);
final MapStateDescriptor<StreamDir, ObjectNode> rtpDescriptor =
new MapStateDescriptor<>("rtp-state", StreamDir.class, ObjectNode.class);
final StateTtlConfig rtpTtlConfig = StateTtlConfig
.newBuilder(Time.minutes(minutes + 1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.useProcessingTime()
.cleanupFullSnapshot()
.build();
rtpDescriptor.enableTimeToLive(rtpTtlConfig);
rtpState = context.getMapState(rtpDescriptor);
}
// SIP
@Override
public void processElement2(ObjectNode sipObj,
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Iterator<Map.Entry<StreamDir, ObjectNode>> iterator = rtpState.iterator();
if (rtpState.isEmpty()) {
sipState.update(new ObjectNodeInfo(sipObj, 0));
}
while (iterator.hasNext()) {
final Map.Entry<StreamDir, ObjectNode> entry = iterator.next();
final ObjectNode rtpObj = entry.getValue();
final Record rtpRecord = new Record(rtpObj);
completeOriginatorField(rtpRecord, new SIPRecord(sipObj));
rtpRecord.merge(sipObj)
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj);
iterator.remove();
switch (entry.getKey()) {
case S2C:
case C2S:
ObjectNodeInfo info = sipState.value();
if (info != null) {
info.incTimes();
if (info.getTimes() >= MAX_RTP_LINES) {
sipState.clear();
} else {
sipState.update(new ObjectNodeInfo(sipObj, info.getTimes()));
}
} else {
sipState.update(new ObjectNodeInfo(sipObj, 1));
}
break;
default:
// Double directional:
// In the context of VoIP fusion, only one RTP double directional stream
sipState.clear();
}
}
registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
}
// RTP
@Override
public void processElement1(ObjectNode rtpObj,
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Record rtpRecord = new Record(rtpObj);
final ObjectNodeInfo info = sipState.value();
final StreamDir streamDir = rtpRecord.getStreamDir();
if (null != info) {
completeOriginatorField(rtpRecord, new SIPRecord(info.getObj()));
rtpRecord.merge(info.getObj())
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj);
switch (streamDir) {
case C2S:
case S2C:
info.incTimes();
if (info.getTimes() >= MAX_RTP_LINES) {
sipState.clear();
}
break;
default:
// Double
sipState.clear();
}
} else {
rtpState.put(streamDir, rtpObj);
}
registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
}
@Override
public void onTimer(long timestamp,
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception {
for (ObjectNode obj : rtpState.values()) {
final Record rtpRecord = new Record(obj);
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
out.collect(obj);
}
rtpState.clear();
sipState.clear();
}
// ======================================================================
// PRIVATE HELPER
// ======================================================================
private void completeOriginatorField(final Record rtpRecord, final SIPRecord sipRecord) {
if (StringUtils.isNoneBlank(rtpRecord.getClientIp())) {
if (StringUtils.equals(sipRecord.getOriginatorSdpConnectIp(), rtpRecord.getClientIp())) {
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.C2S.getCode());
return;
} else if (StringUtils.equals(sipRecord.getResponderSdpConnectIp(), rtpRecord.getClientIp())) {
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.S2C.getCode());
return;
}
}
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
}
}

View File

@@ -1,27 +0,0 @@
package com.zdjizhi.flink.voip.records;
import lombok.Getter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
public class RTPRecord extends Record {
public static final String F_ORIGINATOR_DIR = "rtp_originator_dir";
public RTPRecord(ObjectNode obj) {
super(obj);
}
@Getter
public enum OriginatorDir {
UNKNOWN(0),
C2S(1),
S2C(2);
private final int code;
OriginatorDir(int code) {
this.code = code;
}
}
}

View File

@@ -1,224 +0,0 @@
package com.zdjizhi.flink.voip.records;
import lombok.AllArgsConstructor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.IntNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
/**
* Record class represents a data record with various fields.
* <p>
* It provides getter and setter methods for accessing the fields of the data record.
*
* @author chaoc
* @since 1.0
*/
@AllArgsConstructor
public class Record {
/**
* 字段名:数据记录中的所属 vsys
*/
public static final String F_COMMON_VSYS_ID = "vsys_id";
/**
* 字段名:数据记录中的字段类型
*/
public static final String F_COMMON_SCHEMA_TYPE = "decoded_as";
/**
* 字段名:数据记录中的流类型
*/
public static final String F_COMMON_STREAM_DIR = "common_stream_dir";
/**
* 字段名:数据记录中的流类型的 Flags
*/
public static final String F_FLAGS = "flags";
/**
* 字段名:数据记录中的服务端地址
*/
public static final String F_COMMON_SERVER_IP = "server_ip";
/**
* 字段名:数据记录中的服务端端口
*/
public static final String F_COMMON_SERVER_PORT = "server_port";
/**
* 字段名:数据记录中的客户端地址
*/
public static final String F_COMMON_CLIENT_IP = "client_ip";
/**
* 字段名:数据记录中的客户端端口
*/
public static final String F_COMMON_CLIENT_PORT = "client_port";
/**
* ObjectNode data.
*/
protected final ObjectNode obj;
/**
* Get the VSys ID from the data record.
*
* @return The VSys ID as an integer.
*/
public int getVSysID() {
int v = Record.getInt(obj, F_COMMON_VSYS_ID);
return v == 0 ? 1 : v;
}
/**
* Get the schema type from the data record.
*
* @return The schema type.
*/
public final SchemaType getSchemaType() {
return SchemaType.of(Record.getString(obj, F_COMMON_SCHEMA_TYPE));
}
/**
* Get the stream direction from the data record.
*
* @return The stream direction.
*/
public final StreamDir getStreamDir() {
return StreamDir.ofFlags(Record.getLong(obj, F_FLAGS));
}
/**
* Get the server IP address from the data record.
*
* @return The server IP address as a string.
*/
public final String getServerIp() {
return Record.getString(obj, F_COMMON_SERVER_IP);
}
/**
* Get the server port from the data record.
*
* @return The server port as an integer.
*/
public final int getServerPort() {
return Record.getInt(obj, F_COMMON_SERVER_PORT);
}
/**
* Get the client IP address from the data record.
*
* @return The client IP address as a string.
*/
public final String getClientIp() {
return Record.getString(obj, F_COMMON_CLIENT_IP);
}
/**
* Get the client port from the data record.
*
* @return The client port as an integer.
*/
public final int getClientPort() {
return Record.getInt(obj, F_COMMON_CLIENT_PORT);
}
/**
* Set an integer value to the specified field in the data record.
*
* @param name The name of the field.
* @param value The integer value to set.
*/
public final void setInt(final String name, final int value) {
obj.set(name, IntNode.valueOf(value));
}
/**
* Set a string value to the specified field in the data record.
*
* @param name The name of the field.
* @param value The string value to set.
*/
public final void setString(final String name, final String value) {
obj.set(name, TextNode.valueOf(value));
}
/**
* Merge the fields of another ObjectNode into the current data record.
*
* @param other The ObjectNode containing the fields to be merged.
* @return This record.
*/
public final Record merge(final ObjectNode other) {
other.fields().forEachRemaining(entry -> obj.set(entry.getKey(), entry.getValue()));
return this;
}
/**
* Get an integer value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @param defaultValue The default value to return if the field is not found or is not an integer.
* @return The integer value from the field or the default value if the field is not found or is not an integer.
*/
public static int getInt(final ObjectNode obj, final String field, final int defaultValue) {
final JsonNode node = obj.get(field);
return node != null && node.isInt() ? node.asInt(defaultValue) : defaultValue;
}
/**
* Get an integer value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @return The integer value from the field or 0 if the field is not found or is not an integer.
*/
public static int getInt(final ObjectNode obj, final String field) {
return getInt(obj, field, 0);
}
/**
* Gets a long value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @param defaultValue The default value to return if the field is not found or is not a long.
* @return The long value from the field or the default value if the field is not found or is not a long.
*/
public static long getLong(final ObjectNode obj, final String field, final long defaultValue) {
final JsonNode node = obj.get(field);
return node != null && node.isNumber() ? node.asLong() : defaultValue;
}
/**
* Gets a long value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @return The long value from the field or 0 if the field is not found or is not a long.
*/
private static long getLong(final ObjectNode obj, final String field) {
return getLong(obj, field, 0L);
}
/**
* Get a string value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @param defaultValue The default value to return if the field is not found or is not a string.
* @return The string value from the field or the default value if the field is not found or is not a string.
*/
public static String getString(final ObjectNode obj, final String field, final String defaultValue) {
final JsonNode node = obj.get(field);
return node != null && node.isTextual() ? node.asText(defaultValue) : defaultValue;
}
/**
* Get a string value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @return The string value from the field or null if the field is not found or is not a string.
*/
public static String getString(final ObjectNode obj, final String field) {
return getString(obj, field, null);
}
}

View File

@@ -1,57 +0,0 @@
package com.zdjizhi.flink.voip.records;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* SIPSession Initiation Protocoldata record class, used to parse and access SIP data records.
*
* @author chaoc
* @since 1.0
*/
public class SIPRecord extends Record {
/**
* Field Name: SIP 通话的会话 ID
*/
public static final String F_CALL_ID = "sip_call_id";
/**
* Field Name: SIP 通话的协调的主叫语音传输 IP
*/
public static final String F_ORIGINATOR_SDP_CONNECT_IP = "sip_originator_sdp_connect_ip";
/**
* Field Name: SIP 通话的协调的主叫语音传输端口
*/
public static final String F_ORIGINATOR_SDP_MEDIA_PORT = "sip_originator_sdp_media_port";
/**
* Field Name: SIP 通话的协调的被叫语音传输 IP
*/
public static final String F_RESPONDER_SDP_CONNECT_IP = "sip_responder_sdp_connect_ip";
/**
* Field Name: SIP 通话的协调的被叫语音传输端口
*/
public static final String F_RESPONDER_SDP_MEDIA_PORT = "sip_responder_sdp_media_port";
public SIPRecord(final ObjectNode obj) {
super(obj);
}
public String getCallID() {
return Record.getString(obj, F_CALL_ID);
}
public String getOriginatorSdpConnectIp() {
return Record.getString(obj, F_ORIGINATOR_SDP_CONNECT_IP);
}
public int getOriginatorSdpMediaPort() {
return Record.getInt(obj, F_ORIGINATOR_SDP_MEDIA_PORT);
}
public String getResponderSdpConnectIp() {
return Record.getString(obj, F_RESPONDER_SDP_CONNECT_IP);
}
public int getResponderSdpMediaPort() {
return Record.getInt(obj, F_RESPONDER_SDP_MEDIA_PORT);
}
}

View File

@@ -1,51 +0,0 @@
package com.zdjizhi.flink.voip.records;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* The SchemaType enum represents different types of data schemas.
*
* @author chaoc
* @since 1.0
*/
@AllArgsConstructor
@Getter
public enum SchemaType {
/**
* Represents the SIP schema type.
*/
SIP("SIP"),
/**
* Represents the RTP schema type.
*/
RTP("RTP"),
/**
* Represents the VoIP schema type.
*/
VOIP("VoIP");
/**
* The string value of the SchemaType.
*/
private final String value;
/**
* Get the SchemaType enum based on the provided string value.
*
* @param value The string value of the SchemaType to retrieve.
* @return The corresponding SchemaType enum.
* @throws IllegalArgumentException if the provided value does not match any known SchemaType.
*/
public static SchemaType of(final String value) {
for (SchemaType schemaType : values()) {
if (schemaType.value.equalsIgnoreCase(value)) {
return schemaType;
}
}
throw new IllegalArgumentException("Unknown SchemaType value '" + value + "'.");
}
}

View File

@@ -1,69 +0,0 @@
package com.zdjizhi.flink.voip.records;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* The StreamDir enum represents different types of data stream directions.
*
* @author chaoc
* @since 1.0
*/
@AllArgsConstructor
@Getter
public enum StreamDir {
/**
* Represents the Client-to-Server (C2S) stream direction.
*/
C2S(1),
/**
* Represents the Server-to-Client (S2C) stream direction.
*/
S2C(2),
/**
* Represents the bidirectional (double) stream direction.
*/
DOUBLE(3);
/**
* The integer value of the StreamDir.
*/
private final int value;
/**
* Get the StreamDir enum based on the provided integer value.
*
* @param value The integer value of the StreamDir to retrieve.
* @return The corresponding StreamDir enum.
* @throws IllegalArgumentException if the provided value does not match any known StreamDir.
*/
public static StreamDir of(int value) {
for (StreamDir streamDir : values()) {
if (value == streamDir.value) {
return streamDir;
}
}
throw new IllegalArgumentException("Unknown StreamDir value '" + value + "'.");
}
/**
* Get the StreamDir enum based on the provided flags value.
*
* @param flags The flags.
* @return The corresponding StreamDir enum.
* @throws IllegalArgumentException if the provided value does not match any known StreamDir.
*/
public static StreamDir ofFlags(long flags) {
int v = 0;
if ((flags & 8192) == 8192) {
v += 1;
}
if ((flags & 16384) == 16384) {
v += 2;
}
return of(v);
}
}

View File

@@ -0,0 +1 @@
com.geedgenetworks.flink.easy.application.voip.VoipUDFFactory

View File

@@ -1,7 +0,0 @@
sink.kafka.topic=VOIP-CONVERSATION-RECORD
sink.kafka.props.bootstrap.servers=localhost:9292
source.kafka.topic=VOIP-RECORD
source.kafka.props.bootstrap.servers=localhost:9292
source.kafka.props.group.id=flink-voip-fusion

File diff suppressed because it is too large Load Diff

View File

@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
rootLogger.level = WARN
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender

View File

@@ -0,0 +1,5 @@
## Changelog
### 2.0
- [GAL-602](https://jira.geedge.net/browse/GAL-602) 基于 Easy Stream 框架的配置化改造。

View File

@@ -0,0 +1,13 @@
## Deploy
- 准备 JDK ${java.version} 的环境
- 准备 Flink ${flink.version} 的环境
- [下载](./download.html) 对应版本 UDF 依赖 Jar
- [下载](./download.html) 对应版本 Job 配置 (一个 yml 文件)
- 执行命令 `flink run -Dflink.rest.bind-port=8081 -c com.geedgenetworks.flink.easy.core.Runner path/to/sip-rtp-correlation-<version>.jar job.yml`
- 您将在控制台看到启动日志,同时您可以在 `http://<you-host>:8081` 看到任务 UI。

View File

@@ -0,0 +1,8 @@
## Download
### Easy Stream ${project.version}
| UDF Jar | Job |
|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| --------------- |
| [JAR](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.jar) ( [MD5](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.jar.md5) [SHA1](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.jar.sha1) ) | [YML](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.tar.gz) ( [MD5](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.tar.gz.md5) [SHA1](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.tar.gz.sha1) ) |

View File

@@ -0,0 +1,10 @@
## SIP RTP Correlation
SIP RTP Correlation 项目是一个使用 Apache Flink 实现的实时数据处理项目,旨在从 Kafka 中读取 SIPSession Initiation Protocol和 RTPReal-time Transport Protocol数据将它们融合成完整的 VoIPVoice over Internet Protocol通话数据。
SIP RTP Correlation 项目可以用于实时监控和分析 VoIP 通话数据,提取关键指标,以及进行实时报警和诊断。
<br/>
You can download the latest release from [Job Yml](./jobs/job.yml). And you can changelog from [CHANGELOG.md](./changelogs.html).

View File

@@ -0,0 +1,13 @@
#banner {
height: 108px;
background: none;
}
#bannerLeft img {
margin-left: 18px;
margin-top: 10px;
}
div.well {
display: none;
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.8 KiB

56
src/site/site.xml Normal file
View File

@@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain maven-site.vm copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project name="SIP RTP Correlate">
<bannerLeft>
<name>Easy Stream</name>
<src>images/logo.png</src>
<href>#</href>
</bannerLeft>
<publishDate position="right"/>
<version position="right"/>
<skin>
<groupId>org.apache.maven.skins</groupId>
<artifactId>maven-fluido-skin</artifactId>
<version>1.10.0</version>
</skin>
<custom>
<fluidoSkin>
<sourceLineNumbersEnabled>true</sourceLineNumbersEnabled>
</fluidoSkin>
</custom>
<body>
<breadcrumbs position="left">
<item name="Galaxy" href="#"/>
<item name="Platform" href="#"/>
<item name="Easy Stream" href="#"/>
<item name="Application" href="#"/>
</breadcrumbs>
<menu name="OVERVIEW" inherit="top">
<item name="Introduction" href="index.html"/>
<item name="Deploy" href="deploy.html"/>
<item name="Download" href="download.html"/>
</menu>
<footer>
<![CDATA[ Copyright ©2022 <a href="#">Galaxy Platform</a>. All rights reserved.]]>
</footer>
</body>
</project>

View File

@@ -0,0 +1,33 @@
package com.geedgenetworks.flink.easy.application;
import com.geedgenetworks.flink.easy.core.Runners;
import org.junit.jupiter.api.Test;
public class ApplicationTest {
static {
System.setProperty("easy.execute.mode", "validate");
System.setProperty("flink.rest.bind-port", "8081");
// System.setProperty("flink.rest.flamegraph.enabled", "true");
System.setProperty("flink.heartbeat.timeout", "1800000");
}
public static String discoverConfiguration(final String name) throws Exception {
var path = String.format("/jobs/%s.yml", name);
var resource = ApplicationTest.class.getResource(path);
if (resource == null) {
// maven
resource = ApplicationTest.class.getResource(String.format("../classes/%s", path));
}
if (resource == null) {
throw new IllegalArgumentException(
String.format("Not found job '%s' in path [%s].", name, path));
}
return resource.getPath();
}
@Test
public void testJob() throws Exception {
Runners.run(discoverConfiguration("job"));
}
}

View File

@@ -1,67 +0,0 @@
package com.zdjizhi.flink.voip.conf;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class FusionConfigurationTest {
private FusionConfiguration fusionConfiguration;
@BeforeEach
public void setUp() {
final Configuration config;
config = new Configuration();
config.setString("prefix_key1", "value1");
config.setString("prefix_key2", "value2");
config.setString("other_key", "other_value");
fusionConfiguration = new FusionConfiguration(config);
}
@Test
public void testGetPropertiesWithValidPrefix() {
String prefix = "prefix_";
Properties properties = fusionConfiguration.getProperties(prefix);
assertEquals(2, properties.size());
assertEquals("value1", properties.getProperty("key1"));
assertEquals("value2", properties.getProperty("key2"));
}
@Test
public void testGetPropertiesWithInvalidPrefix() {
String prefix = "invalid_";
Properties properties = fusionConfiguration.getProperties(prefix);
assertTrue(properties.isEmpty());
}
@Test
public void testGetPropertiesWithEmptyPrefix() {
String prefix = "";
Properties properties = fusionConfiguration.getProperties(prefix);
assertEquals(3, properties.size());
assertEquals("value1", properties.getProperty("prefix_key1"));
assertEquals("value2", properties.getProperty("prefix_key2"));
assertEquals("other_value", properties.getProperty("other_key"));
}
@Test
public void testGetPropertiesWithNullPrefix() {
// Null prefix should be treated as an empty prefix
String prefix = null;
Properties properties = fusionConfiguration.getProperties(prefix);
assertEquals(3, properties.size());
assertEquals("value1", properties.getProperty("prefix_key1"));
assertEquals("value2", properties.getProperty("prefix_key2"));
assertEquals("other_value", properties.getProperty("other_key"));
}
}

View File

@@ -1,98 +0,0 @@
package com.zdjizhi.flink.voip.data;
import com.github.javafaker.Faker;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
/**
* The abstract class Generator<T> serves as the base class for implementing record generators.
* It provides common functionalities for generating random data and controlling the generation ratio.
*
* @param <T> The type of records to be generated.
* @author chaoc
* @since 1.0
*/
public abstract class Generator<T> {
protected final ThreadLocalRandom random;
/**
* The ratio of generated records. The total number of records generated will be 100 / ratio.
*/
protected final int ratio;
private final Faker faker;
protected final AtomicReference<T> state;
protected final ObjectMapper mapper;
/**
* Creates a new Generator with the given ratio.
*
* @param ratio The ratio of generated records. The total number of records generated will be 100 / ratio.
*/
public Generator(final int ratio) {
this.ratio = ratio;
this.faker = new Faker();
this.random = ThreadLocalRandom.current();
this.state = new AtomicReference<>();
this.mapper = new ObjectMapper();
}
/**
* Generates the next record based on the specified ratio and the current state.
* It randomly selects whether to generate a new record or return the last generated record.
*
* @return The next generated record of type T.
*/
public abstract T next();
/**
* Generates a new record of type T.
*
* @return The newly generated record of type T.
*/
protected abstract T generate();
/**
* Performs post-processing on the generated record of type T.
* Subclasses can override this method to modify the generated record before returning it.
*
* @param v The generated record of type T.
* @return The post-processed record of type T.
*/
protected abstract T afterState(T v);
/**
* Generates a random IP address (either IPv4 or IPv6) .
*
* @return A randomly generated IP address as a string.
*/
public final String nextIp() {
if (random.nextBoolean()) {
return faker.internet().ipV4Address();
}
return faker.internet().ipV6Address();
}
/**
* Generates a random ID number.
*
* @return A randomly generated ID number as a string.
*/
public final String nextId() {
return faker.idNumber().valid();
}
/**
* Generates a random port number within the range of 0 to 65535 (inclusive).
*
* @return A randomly generated port number as an integer.
*/
public final int nextPort() {
return random.nextInt(65535);
}
}

View File

@@ -1,111 +0,0 @@
package com.zdjizhi.flink.voip.data;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* RTPGenerator extends Generator<ObjectNode> and is responsible for generating RTP records.
* It generates random RTP records with specific properties.
*
* @author chaoc
* @since 1.0
*/
public class RTPGenerator extends Generator<ObjectNode> {
private final SIPGenerator sipGenerator;
public RTPGenerator(int ratio, SIPGenerator sipGenerator) {
super(ratio);
this.sipGenerator = sipGenerator;
}
@Override
public ObjectNode next() {
int i = random.nextInt(100);
if (i < ratio) {
final ObjectNode node = sipGenerator.state.get();
if (null != node) {
final ObjectNode obj = generate();
obj.set(Record.F_COMMON_CLIENT_IP, node.get(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP));
obj.set(Record.F_COMMON_CLIENT_PORT, node.get(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT));
obj.set(Record.F_COMMON_SERVER_IP, node.get(SIPRecord.F_RESPONDER_SDP_CONNECT_IP));
obj.set(Record.F_COMMON_SERVER_PORT, node.get(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT));
return obj;
}
}
return generate();
}
@Override
protected ObjectNode generate() {
final String json = "{\n" +
" \"common_address_list\": \"42924-36682-172.17.201.16-172.17.200.50\",\n" +
" \"common_address_type\": 4,\n" +
" \"common_app_full_path\": \"rtp\",\n" +
" \"common_app_label\": \"rtp\",\n" +
" \"common_c2s_byte_num\": 0,\n" +
" \"common_c2s_pkt_num\": 0,\n" +
" \"common_client_ip\": \"172.17.201.16\",\n" +
" \"common_client_port\": 42924,\n" +
" \"common_con_duration_ms\": 1086,\n" +
" \"common_device_id\": \"unknown\",\n" +
" \"common_device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgxSfcOs\\\"}]}\",\n" +
" \"common_direction\": 73,\n" +
" \"common_end_time\": 1689295970,\n" +
" \"common_flags\": 16401,\n" +
" \"common_flags_identify_info\": \"{\\\"Asymmetric\\\":5,\\\"Server is Local\\\":1,\\\"S2C\\\":1}\",\n" +
" \"common_l4_protocol\": \"IPv4_UDP\",\n" +
" \"common_out_dest_mac\": \"02:fc:08:dc:92:d7\",\n" +
" \"common_out_src_mac\": \"02:fc:08:dc:91:c3\",\n" +
" \"common_protocol_label\": \"ETHERNET.IPv4.UDP\",\n" +
" \"common_s2c_byte_num\": 7570,\n" +
" \"common_s2c_pkt_num\": 5,\n" +
" \"common_schema_type\": \"RTP\",\n" +
" \"common_server_ip\": \"172.17.200.50\",\n" +
" \"common_server_port\": 36682,\n" +
" \"common_sessions\": 1,\n" +
" \"common_sled_ip\": \"192.168.42.54\",\n" +
" \"common_start_time\": 1689294629,\n" +
" \"common_stream_dir\": 3,\n" +
" \"common_stream_trace_id\": \"290484792956466709\",\n" +
" \"common_t_vsys_id\": 24,\n" +
" \"common_vsys_id\": 24,\n" +
" \"raw_log_status\": \"CLOSE\",\n" +
" \"rtp_payload_type_c2s\": 0,\n" +
" \"rtp_payload_type_s2c\": 0,\n" +
" \"rtp_pcap_path\": \"http://192.168.44.67:9098/hos/rtp_hos_bucket/rtp_172.17.200.50_172.17.201.16_27988_55806_1689294629.pcap\"\n" +
"}";
ObjectNode obj;
try {
obj = mapper.readValue(json, ObjectNode.class);
} catch (Exception e){
obj = mapper.createObjectNode();
}
final SIPRecord record = new SIPRecord(obj);
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
record.setString(SIPRecord.F_CALL_ID, nextId());
record.setInt(SIPRecord.F_COMMON_STREAM_DIR, random.nextInt(3) + 1);
record.setString(Record.F_COMMON_SERVER_IP, nextIp());
record.setInt(Record.F_COMMON_SERVER_PORT, nextPort());
record.setString(Record.F_COMMON_CLIENT_IP, nextIp());
record.setInt(Record.F_COMMON_CLIENT_PORT, nextPort());
return obj;
}
@Override
protected ObjectNode afterState(ObjectNode v) {
final Record record = new Record(v);
switch (record.getStreamDir()) {
case DOUBLE:
record.setInt(Record.F_COMMON_STREAM_DIR, random.nextInt(2) + 1);
break;
case S2C:
default:
}
return v;
}
}

View File

@@ -1,138 +0,0 @@
package com.zdjizhi.flink.voip.data;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* SIPGenerator extends Generator<ObjectNode> and is responsible for generating SIP (Session Initiation Protocol) records.
* It generates random SIP records with specific properties such as call ID, stream dir, server IP, server port,
* client IP, client port... information for both originator and responder.
*
* @author chaoc
* @since 1.0
*/
public class SIPGenerator extends Generator<ObjectNode> {
public SIPGenerator(final int ratio) {
super(ratio);
}
/**
* Creates a new SIPGenerator with the default ratio of 40.
*/
public SIPGenerator() {
this(40);
}
@Override
public final ObjectNode next() {
int i = random.nextInt(100);
if (i < ratio && state.get() != null) {
ObjectNode t = afterState(state.get());
state.set(null);
return t;
} else {
return state.updateAndGet(t -> generate());
}
}
@Override
protected ObjectNode generate() {
final String json = "{\n" +
" \"common_schema_type\": \"SIP\",\n" +
" \"common_sessions\": 1,\n" +
" \"sip_call_id\": \"3257b5c0f3d0266@spirent.com\",\n" +
" \"sip_originator_description\": \"<sip:caller@spirent.com>\",\n" +
" \"sip_responder_description\": \"sip:callee@spirent.com\",\n" +
" \"sip_originator_sdp_content\": \"v=0\\r\\no=SIP-UA 3395000 3397200 IN IP4 172.17.200.50\\r\\ns=SIP Call\\r\\nc=IN IP4 172.17.200.50\\r\\nt=0 0\\r\\nm=audio 36682 RTP/AVP 0\\r\\na=rtpmap:0 PCMU/8000\\r\\n\",\n" +
" \"sip_originator_sdp_connect_ip\": \"172.17.200.50\",\n" +
" \"sip_originator_sdp_media_port\": 36682,\n" +
" \"sip_originator_sdp_media_type\": \"0 PCMU/8000\",\n" +
" \"common_first_ttl\": 128,\n" +
" \"common_c2s_ipfrag_num\": 0,\n" +
" \"common_s2c_ipfrag_num\": 0,\n" +
" \"common_c2s_tcp_unorder_num\": 0,\n" +
" \"common_s2c_tcp_unorder_num\": 0,\n" +
" \"common_c2s_tcp_lostlen\": 0,\n" +
" \"common_s2c_tcp_lostlen\": 0,\n" +
" \"common_c2s_pkt_retrans\": 2,\n" +
" \"common_s2c_pkt_retrans\": 0,\n" +
" \"common_c2s_byte_retrans\": 1100,\n" +
" \"common_s2c_byte_retrans\": 0,\n" +
" \"common_direction\": 69,\n" +
" \"common_app_full_path\": \"sip\",\n" +
" \"common_app_label\": \"sip\",\n" +
" \"common_tcp_client_isn\": 3004427198,\n" +
" \"common_server_ip\": \"172.17.201.16\",\n" +
" \"common_client_ip\": \"172.17.200.49\",\n" +
" \"common_server_port\": 5060,\n" +
" \"common_client_port\": 6948,\n" +
" \"common_stream_dir\": 1,\n" +
" \"common_address_type\": 4,\n" +
" \"common_address_list\": \"6948-5060-172.17.200.50-172.17.201.16\",\n" +
" \"common_start_time\": 1689295655,\n" +
" \"common_end_time\": 1689295670,\n" +
" \"common_con_duration_ms\": 41467,\n" +
" \"common_s2c_pkt_num\": 0,\n" +
" \"common_s2c_byte_num\": 0,\n" +
" \"common_c2s_pkt_num\": 6,\n" +
" \"common_c2s_byte_num\": 1834,\n" +
" \"common_establish_latency_ms\": 249,\n" +
" \"common_out_src_mac\": \"02:fc:08:dc:91:c3\",\n" +
" \"common_out_dest_mac\": \"02:fc:08:dc:92:d7\",\n" +
" \"common_flags\": 8201,\n" +
" \"common_flags_identify_info\": \"{\\\"Asymmetric\\\":6,\\\"Client is Local\\\":1,\\\"C2S\\\":1}\",\n" +
" \"common_protocol_label\": \"ETHERNET.IPv4.TCP\",\n" +
" \"common_stream_trace_id\": \"290502385134123507\",\n" +
" \"common_l4_protocol\": \"IPv4_TCP\",\n" +
" \"common_sled_ip\": \"192.168.42.54\",\n" +
" \"common_device_id\": \"unknown\",\n" +
" \"common_device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgxSfcOs\\\"}]}\",\n" +
" \"common_t_vsys_id\": 24,\n" +
" \"common_vsys_id\": 24\n" +
"}";
ObjectNode obj;
try {
obj = mapper.readValue(json, ObjectNode.class);
} catch (Exception e) {
obj = mapper.createObjectNode();
}
final SIPRecord record = new SIPRecord(obj);
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.SIP.getValue());
record.setString(SIPRecord.F_CALL_ID, nextId() + "@spirent.com");
record.setInt(SIPRecord.F_COMMON_STREAM_DIR, (random.nextBoolean() ? StreamDir.S2C : StreamDir.C2S).getValue());
record.setString(Record.F_COMMON_SERVER_IP, nextIp());
record.setInt(Record.F_COMMON_SERVER_PORT, nextPort());
record.setString(Record.F_COMMON_CLIENT_IP, nextIp());
record.setInt(Record.F_COMMON_CLIENT_PORT, nextPort());
record.setString(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP, nextIp());
record.setInt(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT, nextPort());
record.setString(SIPRecord.F_RESPONDER_SDP_CONNECT_IP, nextIp());
record.setInt(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT, nextPort());
return obj;
}
@Override
protected ObjectNode afterState(ObjectNode v) {
final Record record = new Record(v);
switch (record.getStreamDir()) {
case C2S:
record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.S2C.getValue());
break;
case S2C:
record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.C2S.getValue());
break;
default:
}
return v;
}
}

View File

@@ -1,48 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class AddressTest {
@Test
public void testAddressOfWithDifferentPortNumbers() {
Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.10", 8080);
Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.20", 9090);
Address address = Address.of(a1, a2);
assertEquals("192.168.1.10", address.getIp1());
assertEquals(8080, address.getPort1());
assertEquals("192.168.1.20", address.getIp2());
assertEquals(9090, address.getPort2());
}
@Test
public void testAddressOfWithSamePortNumber() {
Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.10", 8080);
Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.20", 8080);
Address address = Address.of(a1, a2);
assertEquals("192.168.1.10", address.getIp1());
assertEquals(8080, address.getPort1());
assertEquals("192.168.1.20", address.getIp2());
assertEquals(8080, address.getPort2());
}
@Test
public void testAddressOfWithInvertedOrder() {
Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.20", 8080);
Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.10", 9090);
Address address = Address.of(a1, a2);
assertEquals("192.168.1.20", address.getIp1());
assertEquals(8080, address.getPort1());
assertEquals("192.168.1.10", address.getIp2());
assertEquals(9090, address.getPort2());
}
}

View File

@@ -1,104 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.data.SIPGenerator;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.*;
public class SIPPairingFunctionTest {
private static final int INTERVAL_MINUTES = 5;
private static KeyedOneInputStreamOperatorTestHarness<Integer, ObjectNode, ObjectNode> testHarness;
@BeforeAll
static void setUp() throws Exception {
final SIPPairingFunction func = new SIPPairingFunction();
final KeyedProcessOperator<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode> operator =
new KeyedProcessOperator<>(func);
final TypeInformation<Integer> type = TypeInformation.of(Integer.class);
final KeySelector<ObjectNode, Integer> keySelector = jsonNodes -> 0;
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, type);
final Configuration configuration = new Configuration();
configuration.set(FusionConfigs.SIP_STATE_CLEAR_INTERVAL, INTERVAL_MINUTES);
testHarness.getExecutionConfig().setGlobalJobParameters(configuration);
testHarness.open();
}
@Test
public void testProperlyPairSIPRecords() throws Exception {
final SIPGenerator sipGenerator = new SIPGenerator();
long current = System.currentTimeMillis();
final ObjectNode obj = sipGenerator.next();
final Record record = new Record(obj);
record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.DOUBLE.getValue());
final StreamRecord<ObjectNode> streamRecord = new StreamRecord<>(obj, current);
testHarness.processElement(streamRecord);
assertTrue(testHarness.getOutput().contains(streamRecord));
long interval = Time.minutes(INTERVAL_MINUTES).toMilliseconds();
long nextFireTime = (testHarness.getProcessingTime() / interval + 1) * interval;
assertTrue(testHarness.getProcessingTimeService().getActiveTimerTimestamps().contains(nextFireTime));
final ObjectNode obj1 = obj.deepCopy();
final Record record1 = new Record(obj1);
record1.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.S2C.getValue());
final ObjectNode obj2 = obj.deepCopy();
final Record record2 = new Record(obj2);
record2.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.C2S.getValue());
final MapStateDescriptor<Address, ObjectNodeInfo> descriptor =
new MapStateDescriptor<>(
"sip-state",
TypeInformation.of(Address.class),
TypeInformation.of(ObjectNodeInfo.class)
);
testHarness.processElement(obj1, current);
final MapState<Address, ObjectNodeInfo> mapState = testHarness.getOperator()
.getKeyedStateStore().getMapState(descriptor);
final Set<ObjectNode> objectNodes1 = Lists.newArrayList(mapState.values()).stream()
.map(ObjectNodeInfo::getObj)
.collect(Collectors.toSet());
assertTrue(objectNodes1.contains(obj1));
testHarness.processElement(obj2, current);
final Set<ObjectNode> objectNodes2 = Lists.newArrayList(mapState.values()).stream()
.map(ObjectNodeInfo::getObj)
.collect(Collectors.toSet());
assertFalse(objectNodes2.contains(obj2));
assertTrue(testHarness.getOutput().contains(new StreamRecord<>(obj2, current)));
assertEquals(2, testHarness.getOutput().size());
}
@AfterAll
static void close() throws Exception {
testHarness.close();
}
}

View File

@@ -1,70 +0,0 @@
package com.zdjizhi.flink.voip.records;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.IntNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class RecordTest {
private static ObjectMapper mapper;
@BeforeAll
static void setUp() {
mapper = new ObjectMapper();
}
@Test
void testGetVSysID() {
final ObjectNode obj = mapper.createObjectNode();
final Record record = new Record(obj);
record.setInt(Record.F_COMMON_VSYS_ID, 42);
assertEquals(42, record.getVSysID());
obj.set(Record.F_COMMON_VSYS_ID, IntNode.valueOf(40));
assertEquals(40, record.getVSysID());
}
@Test
void testGetSchemaType() {
final ObjectNode obj = mapper.createObjectNode();
final Record record = new Record(obj);
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
assertEquals(SchemaType.RTP, record.getSchemaType());
obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue()));
assertEquals(SchemaType.VOIP, record.getSchemaType());
}
@Test
void testSetInt() {
final ObjectNode obj = mapper.createObjectNode();
final Record record = new Record(obj);
record.setInt("intField", 123);
assertEquals(123, obj.get("intField").intValue());
}
@Test
void testSetString() {
final ObjectNode obj = mapper.createObjectNode();
final Record record = new Record(obj);
record.setString("stringField", "testValue");
assertEquals("testValue", obj.get("stringField").textValue());
}
@Test
void testMerge() {
final ObjectNode obj = mapper.createObjectNode();
obj.set("field1", IntNode.valueOf(1));
ObjectNode other = mapper.createObjectNode();
other.set("field2", TextNode.valueOf("value2"));
Record record = new Record(obj);
record.merge(other);
assertEquals(1, obj.get("field1").intValue());
assertEquals("value2", obj.get("field2").textValue());
}
}

View File

@@ -0,0 +1,4 @@
{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240001,"decoded_as":"RTP","ip_protocol":"udp","address_type":4,"client_ip":"192.168.64.8","server_ip":"192.168.39.62","client_port":25524,"server_port":4580,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.rtp","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830004000,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":57620,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","rtp_pcap_path":"123e4567-e89b-12d3-a456-426614174005"}
{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240002,"decoded_as":"SIP","ip_protocol":"udp","address_type":4,"client_ip":"10.0.0.1","server_ip":"192.0.2.1","client_port":1000,"server_port":60000,"t_vsys_id":0,"vsys_id":1025,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.sip","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830000100,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":8192,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","sip_call_id":"NGMxZWY3Y2NmMzNlNGE3NzJhODgyZDAwM2YyMzQ4NGI.","sip_originator_description":"\"lina\"<sip:lina@192.0.2.1>;tag=1837055d","sip_responder_description":"\"1075\"<sip:1075@192.0.2.1>","sip_originator_sdp_connect_ip":"192.168.64.85","sip_originator_sdp_media_port":25524,"sip_originator_sdp_media_type":"application/sdp","sip_server":"OpenSIPS (2.4.11 (x86_64/linux))","sip_responder_sdp_connect_ip":"192.168.39.62","sip_responder_sdp_media_port":4580,"sip_responder_sdp_media_type":"application/sdp","sip_duration_s":590,"sip_bye":"responder","sip_cseq":"2 BYE","sip_via":"SIP/2.0/UDP 192.0.2.1:5060;branch=z9hG4bKbe7c.392190f1.0","sip_user_agent":"eyeBeam release 1011d stamp 40820","sip_is_request":0}
{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240003,"decoded_as":"RTP","ip_protocol":"udp","address_type":4,"client_ip":"192.168.164.18","server_ip":"192.168.39.162","client_port":65121,"server_port":4670,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.rtp","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830000000,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":57620,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","rtp_pcap_path":"123e4567-e89b-12d3-a456-426614174005"}
{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240004,"decoded_as":"SIP","ip_protocol":"udp","address_type":4,"client_ip":"10.0.0.2","server_ip":"192.0.2.2","client_port":1000,"server_port":60000,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.sip","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830000000,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":24584,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","sip_call_id":"IUMxZWY3Y2NmMzNlNGE3NzJhODgyZDAwM2YyMzQ4NGI.","sip_originator_description":"\"lina\"<sip:lina@192.0.2.1>;tag=1837055d","sip_responder_description":"\"1075\"<sip:1075@192.0.2.1>","sip_originator_sdp_connect_ip":"192.168.64.8","sip_originator_sdp_media_port":25524,"sip_originator_sdp_media_type":"application/sdp","sip_server":"OpenSIPS (2.4.11 (x86_64/linux))","sip_responder_sdp_connect_ip":"192.168.39.62","sip_responder_sdp_media_port":4580,"sip_responder_sdp_media_type":"application/sdp","sip_duration_s":590,"sip_bye":"responder","sip_cseq":"2 BYE","sip_via":"SIP/2.0/UDP 192.0.2.1:5060;branch=z9hG4bKbe7c.392190f1.0","sip_user_agent":"eyeBeam release 1011d stamp 40820","sip_is_request":0}

44
tools/dist/target.xml vendored Normal file
View File

@@ -0,0 +1,44 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id>bin</id>
<formats>
<format>tar.gz</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources/jobs</directory>
<includes>
<include>*.yml</include>
</includes>
<fileMode>0755</fileMode>
<lineEnding>lf</lineEnding>
<directoryMode>0644</directoryMode>
<outputDirectory>./</outputDirectory>
</fileSet>
</fileSets>
</assembly>

394
tools/maven/checkstyle.xml Normal file
View File

@@ -0,0 +1,394 @@
<?xml version="1.0"?>
<!DOCTYPE module PUBLIC
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
<!--
This is a checkstyle configuration file. For descriptions of
what the following rules do, please see the checkstyle configuration
page at http://checkstyle.sourceforge.net/config.html.
-->
<module name="Checker">
<module name="RegexpSingleline">
<!-- Checks that TODOs don't have stuff in parenthesis, e.g., username. -->
<property name="format" value="((//.*)|(\*.*))TODO\("/>
<property name="message" value="TODO comments must not include usernames."/>
<property name="severity" value="error"/>
</module>
<module name="RegexpSingleline">
<property name="format" value="\s+$"/>
<property name="message" value="Trailing whitespace"/>
<property name="severity" value="error"/>
</module>
<module name="RegexpSingleline">
<property name="format" value="Throwables.propagate\("/>
<property name="message" value="Throwables.propagate is deprecated"/>
<property name="severity" value="error"/>
</module>
<!-- Prevent *Tests.java as tools may not pick them up -->
<module name="RegexpOnFilename">
<property name="fileNamePattern" value=".*Tests\.java$"/>
</module>
<module name="SuppressionFilter">
<property name="file" value="${checkstyle.suppressions.file}" default="suppressions.xml"/>
</module>
<module name="FileLength">
<property name="max" value="3000"/>
</module>
<!-- All Java AST specific tests live under TreeWalker module. -->
<module name="TreeWalker">
<!-- Allow use of comment to suppress javadocstyle -->
<module name="SuppressionCommentFilter">
<property name="offCommentFormat" value="CHECKSTYLE.OFF\: ([\w\|]+)"/>
<property name="onCommentFormat" value="CHECKSTYLE.ON\: ([\w\|]+)"/>
<property name="checkFormat" value="$1"/>
</module>
<!-- Prohibit T.getT() methods for standard boxed types -->
<module name="Regexp">
<property name="format" value="Boolean\.getBoolean"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use System.getProperties() to get system properties."/>
</module>
<module name="Regexp">
<property name="format" value="Integer\.getInteger"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use System.getProperties() to get system properties."/>
</module>
<module name="Regexp">
<property name="format" value="Long\.getLong"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use System.getProperties() to get system properties."/>
</module>
<!--
IllegalImport cannot blacklist classes so we have to fall back to Regexp.
-->
<!-- forbid use of commons lang validate -->
<module name="Regexp">
<property name="format" value="org\.apache\.commons\.lang3\.Validate"/>
<property name="illegalPattern" value="true"/>
<property name="message"
value="Use Guava Checks instead of Commons Validate. Please refer to the coding guidelines."/>
</module>
<module name="Regexp">
<property name="format" value="org\.apache\.commons\.lang\."/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use commons-lang3 instead of commons-lang."/>
</module>
<module name="Regexp">
<property name="format" value="org\.codehaus\.jettison"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use flink-shaded-jackson instead of jettison."/>
</module>
<module name="Regexp">
<property name="format" value="org\.testcontainers\.shaded"/>
<property name="illegalPattern" value="true"/>
<property name="message"
value="Use utilities from appropriate library instead of org.testcontainers."/>
</module>
<!-- Enforce Java-style array declarations -->
<module name="ArrayTypeStyle"/>
<module name="TodoComment">
<!-- Checks that disallowed strings are not used in comments. -->
<property name="format" value="(FIXME)|(XXX)"/>
</module>
<!--
IMPORT CHECKS
-->
<module name="RedundantImport">
<!-- Checks for redundant import statements. -->
<property name="severity" value="error"/>
<message key="import.redundancy"
value="Redundant import {0}."/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs"
value="autovalue.shaded, avro.shaded, com.google.api.client.repackaged, com.google.appengine.repackaged"/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value="com.fasterxml.jackson"/>
<message key="import.illegal" value="{0}; Use flink-shaded-jackson instead."/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value="org.codehaus.jackson"/>
<message key="import.illegal" value="{0}; Use flink-shaded-jackson instead."/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value="org.objectweb.asm"/>
<message key="import.illegal" value="{0}; Use flink-shaded-asm instead."/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value="io.netty"/>
<message key="import.illegal" value="{0}; Use flink-shaded-netty instead."/>
</module>
<module name="RedundantModifier">
<!-- Checks for redundant modifiers on various symbol definitions.
See: http://checkstyle.sourceforge.net/config_modifier.html#RedundantModifier
We exclude METHOD_DEF to allow final methods in final classes to make them more future-proof.
-->
<property name="tokens"
value="VARIABLE_DEF, ANNOTATION_FIELD_DEF, INTERFACE_DEF, CLASS_DEF, ENUM_DEF"/>
</module>
<!--
IllegalImport cannot blacklist classes, and c.g.api.client.util is used for some shaded
code and some useful code. So we need to fall back to Regexp.
-->
<module name="RegexpSinglelineJava">
<property name="format" value="^import com.google.common.base.Preconditions;$"/>
<property name="message" value="Static import functions from Guava Preconditions"/>
</module>
<module name="UnusedImports">
<property name="severity" value="error"/>
<property name="processJavadoc" value="true"/>
<message key="import.unused"
value="Unused import: {0}."/>
</module>
<!--
NAMING CHECKS
-->
<!-- Item 38 - Adhere to generally accepted naming conventions -->
<module name="PackageName">
<!-- Validates identifiers for package names against the
supplied expression. -->
<!-- Here the default checkstyle rule restricts package name parts to
seven characters, this is not in line with common practice at Google.
-->
<property name="format" value="^[a-z]+(\.[a-z][a-z0-9]{1,})*$"/>
<property name="severity" value="error"/>
</module>
<module name="TypeNameCheck">
<!-- Validates static, final fields against the
expression "^[A-Z][a-zA-Z0-9]*$". -->
<metadata name="altname" value="TypeName"/>
<property name="severity" value="error"/>
</module>
<module name="ConstantNameCheck">
<!-- Validates non-private, static, final fields against the supplied
public/package final fields "^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$". -->
<metadata name="altname" value="ConstantName"/>
<property name="applyToPublic" value="true"/>
<property name="applyToProtected" value="true"/>
<property name="applyToPackage" value="true"/>
<property name="applyToPrivate" value="true"/>
<property name="format" value="^([A-Z][A-Z0-9]*(_[A-Z0-9]+)*|FLAG_.*)$"/>
<message key="name.invalidPattern"
value="Variable ''{0}'' should be in ALL_CAPS (if it is a constant) or be private (otherwise)."/>
<property name="severity" value="error"/>
</module>
<module name="StaticVariableNameCheck">
<!-- Validates static, non-final fields against the supplied
expression "^[a-z][a-zA-Z0-9]*_?$". -->
<metadata name="altname" value="StaticVariableName"/>
<property name="applyToPublic" value="true"/>
<property name="applyToProtected" value="true"/>
<property name="applyToPackage" value="true"/>
<property name="applyToPrivate" value="true"/>
<property name="format" value="^[a-z][a-zA-Z0-9]*_?$"/>
<property name="severity" value="error"/>
</module>
<module name="MemberNameCheck">
<!-- Validates non-static members against the supplied expression. -->
<metadata name="altname" value="MemberName"/>
<property name="applyToPublic" value="true"/>
<property name="applyToProtected" value="true"/>
<property name="applyToPackage" value="true"/>
<property name="applyToPrivate" value="true"/>
<property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
<property name="severity" value="error"/>
</module>
<module name="MethodNameCheck">
<!-- Validates identifiers for method names. -->
<metadata name="altname" value="MethodName"/>
<property name="format" value="^[a-z][a-zA-Z0-9]*(_[a-zA-Z0-9]+)*$"/>
<property name="severity" value="error"/>
</module>
<module name="ParameterName">
<!-- Validates identifiers for method parameters against the
expression "^[a-z][a-zA-Z0-9]*$". -->
<property name="severity" value="error"/>
</module>
<module name="LocalFinalVariableName">
<!-- Validates identifiers for local final variables against the
expression "^[a-z][a-zA-Z0-9]*$". -->
<property name="severity" value="error"/>
</module>
<module name="LocalVariableName">
<!-- Validates identifiers for local variables against the
expression "^[a-z][a-zA-Z0-9]*$". -->
<property name="severity" value="error"/>
</module>
<!--
LENGTH and CODING CHECKS
-->
<!-- Checks for braces around if and else blocks -->
<module name="NeedBraces">
<property name="severity" value="error"/>
<property name="tokens"
value="LITERAL_IF, LITERAL_ELSE, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO"/>
</module>
<module name="UpperEll">
<!-- Checks that long constants are defined with an upper ell.-->
<property name="severity" value="error"/>
</module>
<module name="FallThrough">
<!-- Warn about falling through to the next case statement. Similar to
javac -Xlint:fallthrough, but the check is suppressed if a single-line comment
on the last non-blank line preceding the fallen-into case contains 'fall through' (or
some other variants that we don't publicized to promote consistency).
-->
<property name="reliefPattern"
value="fall through|Fall through|fallthru|Fallthru|falls through|Falls through|fallthrough|Fallthrough|No break|NO break|no break|continue on"/>
<property name="severity" value="error"/>
</module>
<!-- Checks for over-complicated boolean expressions. -->
<module name="SimplifyBooleanExpression"/>
<!-- Detects empty statements (standalone ";" semicolon). -->
<module name="EmptyStatement"/>
<!-- Detect multiple consecutive semicolons (e.g. ";;"). -->
<module name="RegexpSinglelineJava">
<property name="format" value=";{2,}"/>
<property name="message" value="Use one semicolon"/>
<property name="ignoreComments" value="true"/>
</module>
<!--
MODIFIERS CHECKS
-->
<module name="ModifierOrder">
<!-- Warn if modifier order is inconsistent with JLS3 8.1.1, 8.3.1, and
8.4.3. The prescribed order is:
public, protected, private, abstract, static, final, transient, volatile,
synchronized, native, strictfp
-->
<property name="severity" value="error"/>
</module>
<!--
WHITESPACE CHECKS
-->
<module name="EmptyLineSeparator">
<!-- Checks for empty line separator between tokens. The only
excluded token is VARIABLE_DEF, allowing class fields to
be declared on consecutive lines.
-->
<property name="allowMultipleEmptyLines" value="false"/>
<property name="allowMultipleEmptyLinesInsideClassMembers" value="false"/>
<property name="tokens" value="PACKAGE_DEF, IMPORT, STATIC_IMPORT, CLASS_DEF,
INTERFACE_DEF, ENUM_DEF, STATIC_INIT, INSTANCE_INIT, METHOD_DEF,
CTOR_DEF"/>
</module>
<module name="SingleSpaceSeparator"/>
<module name="WhitespaceAround">
<!-- Checks that various tokens are surrounded by whitespace.
This includes most binary operators and keywords followed
by regular or curly braces.
-->
<property name="tokens" value="ASSIGN, BAND, BAND_ASSIGN, BOR,
BOR_ASSIGN, BSR, BSR_ASSIGN, BXOR, BXOR_ASSIGN, COLON, DIV, DIV_ASSIGN,
EQUAL, GE, GT, LAMBDA, LAND, LE, LITERAL_CATCH, LITERAL_DO, LITERAL_ELSE,
LITERAL_FINALLY, LITERAL_FOR, LITERAL_IF, LITERAL_RETURN,
LITERAL_SYNCHRONIZED, LITERAL_TRY, LITERAL_WHILE, LOR, LT, MINUS,
MINUS_ASSIGN, MOD, MOD_ASSIGN, NOT_EQUAL, PLUS, PLUS_ASSIGN, QUESTION,
SL, SL_ASSIGN, SR_ASSIGN, STAR, STAR_ASSIGN, TYPE_EXTENSION_AND"/>
<property name="severity" value="error"/>
</module>
<module name="WhitespaceAfter">
<!-- Checks that commas, semicolons and typecasts are followed by
whitespace.
-->
<property name="tokens" value="COMMA, SEMI, TYPECAST"/>
</module>
<module name="NoWhitespaceAfter">
<!-- Checks that there is no whitespace after various unary operators.
Linebreaks are allowed.
-->
<property name="tokens" value="BNOT, DEC, DOT, INC, LNOT, UNARY_MINUS,
UNARY_PLUS"/>
<property name="allowLineBreaks" value="true"/>
<property name="severity" value="error"/>
</module>
<module name="NoWhitespaceBefore">
<!-- Checks that there is no whitespace before various unary operators.
Linebreaks are allowed.
-->
<property name="tokens" value="SEMI, DOT, POST_DEC, POST_INC"/>
<property name="allowLineBreaks" value="true"/>
<property name="severity" value="error"/>
</module>
<module name="OperatorWrap">
<!-- Checks that assignment operators are at the end of the line. -->
<property name="option" value="eol"/>
<property name="tokens" value="ASSIGN"/>
</module>
<module name="ParenPad">
<!-- Checks that there is no whitespace before close parens or after
open parens.
-->
<property name="severity" value="error"/>
</module>
</module>
</module>

View File

@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<FindBugsFilter>
<Match>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
<Match>
<Bug pattern="SE_NO_SERIALVERSIONID"/>
</Match>
<Match>
<Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
</Match>
<Match>
<Bug pattern="SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING"/>
</Match>
</FindBugsFilter>

View File

@@ -0,0 +1,12 @@
<?xml version="1.0"?>
<!DOCTYPE suppressions PUBLIC
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
<!-- target directory is not relevant for checkstyle -->
<suppress
files="[\\/]target[\\/]"
checks=".*"/>
</suppressions>