15 Commits

Author SHA1 Message Date
chaoc
e9616a2400 test(conf): add test config file 2023-08-02 16:57:30 +08:00
chaoc
7cb70bfd8b feat(app): add main config file 2023-08-02 16:57:08 +08:00
chaoc
82cfc3274a feat(app): add main app 2023-08-02 16:56:36 +08:00
chaoc
a4cecd6e82 feat(functions): add voip fusion functions 2023-08-02 16:56:24 +08:00
chaoc
ef5a8e57b2 feat(functions): add state types 2023-08-02 16:55:57 +08:00
chaoc
9af8db466e test(functions): add unit-case test 2023-08-02 15:49:40 +08:00
chaoc
8d4c1dbd18 test(utils): add utils for generate record data 2023-08-02 15:49:07 +08:00
chaoc
e573d88af3 feat(record): add type of record date 2023-08-02 15:48:23 +08:00
chaoc
f264303a6d feat(functions): impl sip pairing functions 2023-08-02 15:47:48 +08:00
chaoc
92cd8dc614 feat(conf): add some conf utils 2023-08-02 15:47:07 +08:00
chaoc
81b3135b44 chore(test): add deps for test 2023-08-02 15:46:44 +08:00
chaoc
4fad35afa8 feat(record): add data record visit utils 2023-08-01 10:27:50 +08:00
chaoc
f313313d9e chore: init maven deps 2023-08-01 09:59:57 +08:00
chaoc
da50e8fcdb chore: add gitignore 2023-08-01 09:59:42 +08:00
chaoc
cdc6862913 docs: update readme 2023-08-01 09:56:38 +08:00
53 changed files with 1059 additions and 72962 deletions

View File

@@ -1,40 +0,0 @@
image: 192.168.40.153:8082/common/maven:3.8.1-openjdk-11-slim
variables:
MAVEN_CLI_OPTS: "--batch-mode --errors --show-version"
stages:
- check
- test
- build
release-version-check:
stage: check
script:
- echo "$MAVEN_SETTINGS_XML" > /usr/share/maven/conf/settings.xml
- mvn $MAVEN_CLI_OPTS enforcer:enforce@release-version-check
- |-
if `mvn $MAVEN_CLI_OPTS dependency:get@release-deploy-check > /dev/null 2>&1`; then
echo "The current version has been deployed."
exit 1
else
echo "The current version has not been deployed."
fi
rules:
- if: $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "master" && $CI_PIPELINE_SOURCE == "merge_request_event"
test:
stage: test
script:
- echo "$MAVEN_SETTINGS_XML" > /usr/share/maven/conf/settings.xml
- mvn $MAVEN_CLI_OPTS clean test
only:
- merge_requests
build:
stage: build
script:
- echo "$MAVEN_SETTINGS_XML" > /usr/share/maven/conf/settings.xml
- mvn clean site deploy -DskipTests
only:
- master

View File

@@ -1,10 +1,10 @@
# SIP RTP Correlation
# flink-voip-fusion
## 简介
SIP RTP Correlation 项目是一个使用 Apache Flink 实现的实时数据处理项目,旨在从 Kafka 中读取 SIPSession Initiation Protocol和 RTPReal-time Transport Protocol数据将它们融合成完整的 VoIPVoice over Internet Protocol通话数据。
VoIP 数据融合项目是一个使用 Apache Flink 实现的实时数据处理项目,旨在从 Kafka 中读取 SIPSession Initiation Protocol和 RTPReal-time Transport Protocol数据将它们融合成完整的 VoIPVoice over Internet Protocol通话数据。
SIP RTP Correlation 项目可以用于实时监控和分析 VoIP 通话数据,提取关键指标,以及进行实时报警和诊断。
VoIP 数据融合项目可以用于实时监控和分析 VoIP 通话数据,提取关键指标,以及进行实时报警和诊断。
## 编译和打包
@@ -17,9 +17,13 @@ mvn clean package
使用以下命令运行Flink任务
```shell
flink run -c com.geedgenetworks.flink.easy.core.Runner path/to/sip-rtp-correlation-<version>.jar job.yml
flink run -c className path/to/flink-voip-fusion-xxx.jar
```
## 配置项说明
TODO 待补充
## 贡献
如果您发现任何问题或改进项目的想法,欢迎提交 Issue 或 Pull Request。

656
pom.xml
View File

@@ -5,193 +5,107 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.geedgenetworks.application</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>2.2.0</version>
<groupId>com.zdjizhi</groupId>
<artifactId>flink-voip-fusion</artifactId>
<version>1.0-SNAPSHOT</version>
<name>Flink : SIP-RTP : Correlation</name>
<name>Flink : VoIP : Fusion</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>11</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.13.6</flink.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<flink.version>1.13.6</flink.version>
<easy.stream.version>1.3-rc2</easy.stream.version>
<slf4j.version>1.7.32</slf4j.version>
<log4j.version>2.17.1</log4j.version>
<junit.version>5.8.0</junit.version>
<jackson.version>2.13.2.20220328</jackson.version>
<scalatest.version>3.2.15</scalatest.version>
</properties>
<distributionManagement>
<repository>
<id>platform-releases</id>
<url>http://192.168.40.153:8081/content/repositories/platform-release</url>
<uniqueVersion>true</uniqueVersion>
</repository>
<snapshotRepository>
<id>platform-snapshots</id>
<url>http://192.168.40.153:8081/content/repositories/platform-snapshot</url>
</snapshotRepository>
<site>
<id>platform-site</id>
<url>
dav:http://192.168.40.153:8081/content/sites/platform-site/platform/application/sip-rtp-correlate-${project.version}
</url>
</site>
</distributionManagement>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<dependency>
<groupId>xyz.downgoon</groupId>
<artifactId>snowflake</artifactId>
<version>1.0.0</version>
</dependency>
<!-- Easy Stream -->
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-common</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-core</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-grouped-exec-pipeline</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-filter-pipeline</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-console-pipeline</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-split-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-correlate-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-union-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-kafka-connector</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-text-connector</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-socket-connector</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-console-connector</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-json-format</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-flink-shim</artifactId>
</dependency>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<!-- DEV -->
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
<version>4.4.2</version>
</dependency>
<!-- LOG -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -201,272 +115,132 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<!-- API bridge between log4j 1 and 2 -->
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- Common -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.32</version>
</dependency>
<!-- Easy Stream-->
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-common</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-core</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-grouped-exec-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-filter-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-console-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-kafka-connector</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-json-format</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-flink-shim</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>${scalatest.version}</version>
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson</groupId>
<artifactId>jackson-bom</artifactId>
<type>pom</type>
<scope>import</scope>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.1.3</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.2</version>
<configuration>
<suppressionsLocation>${basedir}/tools/maven/suppressions.xml</suppressionsLocation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<configLocation>${basedir}/tools/maven/checkstyle.xml</configLocation>
<logViolationsToConsole>true</logViolationsToConsole>
<failOnViolation>true</failOnViolation>
</configuration>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>8.40</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>java-style-check</id>
<phase>compile</phase>
<goals>
<goal>check</goal>
</goals>
<configuration>
<sourceDirectories>src/main/java</sourceDirectories>
</configuration>
</execution>
<execution>
<id>java-test-style-check</id>
<phase>test-compile</phase>
<goals>
<goal>check</goal>
</goals>
<configuration>
<testSourceDirectories>src/test/java</testSourceDirectories>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-maven-plugin</artifactId>
<version>4.4.2.2</version>
<configuration>
<xmlOutput>true</xmlOutput>
<!-- Low, Medium, High ('Low' is strictest) -->
<threshold>Low</threshold>
<effort>default</effort>
<spotbugsXmlOutputDirectory>${project.build.directory}/spotbugs</spotbugsXmlOutputDirectory>
<excludeFilterFile>${basedir}/tools/maven/spotbugs-exclude.xml</excludeFilterFile>
<failOnError>true</failOnError>
</configuration>
<executions>
<execution>
<id>findbugs-main</id>
<phase>compile</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
<execution>
<id>findbugs-test</id>
<phase>test-compile</phase>
<goals>
<goal>check</goal>
</goals>
<configuration>
<includeTests>true</includeTests>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<version>3.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>scala-compile</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.1</version>
<version>3.1.1</version>
<executions>
<execution>
<id>default-shade</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<finalName>${project.artifactId}-${project.version}</finalName>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
<exclude>org.mockito:mockito-core</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
@@ -475,172 +249,10 @@
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>build-jobs</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-yml-${project.version}</finalName>
<appendAssemblyId>false</appendAssemblyId>
<descriptors>
<descriptor>tools/dist/target.xml</descriptor>
</descriptors>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>site-resources</id>
<phase>pre-site</phase>
<goals>
<goal>resources</goal>
</goals>
<configuration>
<resources>
<resource>
<directory>src/site</directory>
<filtering>true</filtering>
<includes>
<include>**</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<configuration>
<skip>false</skip>
</configuration>
<executions>
<execution>
<id>default-site</id>
<goals>
<goal>site</goal>
</goals>
<phase>site</phase>
<configuration>
<siteDirectory>${project.build.outputDirectory}</siteDirectory>
</configuration>
</execution>
<execution>
<id>site-deploy</id>
<goals>
<goal>stage-deploy</goal>
</goals>
<phase>deploy</phase>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- CI plugins -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.2</version>
<executions>
<execution>
<id>release-deploy-check</id>
<goals>
<goal>get</goal>
</goals>
<configuration>
<groupId>${project.groupId}</groupId>
<artifactId>easy-stream-common</artifactId>
<version>${project.version}</version>
<remoteRepositories>${project.distributionManagement.repository.url}
</remoteRepositories>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>3.0.0-M3</version>
<executions>
<execution>
<id>release-version-check</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireReleaseVersion>
<message>SNAPSHOT versions ${project.version} are not allowed.</message>
</requireReleaseVersion>
</rules>
</configuration>
</execution>
<execution>
<id>snapshot-version-check</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireSnapshotVersion>
<message>Non-SNAPSHOT versions ${project.version} are not allowed.</message>
</requireSnapshotVersion>
</rules>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<version>3.9.1</version>
<configuration>
<outputDirectory>${project.build.directory}/site</outputDirectory>
<relativizeDecorationLinks>false</relativizeDecorationLinks>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-webdav-jackrabbit</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.maven.doxia</groupId>
<artifactId>doxia-module-markdown</artifactId>
<version>1.9.1</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>3.1.1</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

View File

@@ -1,35 +0,0 @@
package com.geedgenetworks.flink.easy.application.voip;
import com.geedgenetworks.flink.easy.application.voip.udf.*;
import com.geedgenetworks.flink.easy.common.api.UDFFactory;
import org.apache.flink.table.functions.UserDefinedFunction;
import java.util.HashMap;
import java.util.Map;
public class VoipUDFFactory implements UDFFactory {
private static final Map<String, UserDefinedFunction> R =
new HashMap<>() {{
put("IS_IP_ADDRESS", new IsIpAddress());
put("IS_INTERNAL_IP_ADDRESS", new IsInternalIpAddress());
put("IS_EXTERNAL_IP_ADDRESS", new IsExternalIpAddress());
put("HAS_IP_ADDRESS", new HasIpAddress());
put("HAS_EXTERNAL_IP_ADDRESS", new HasExternalIpAddress());
put("STREAM_DIR", new StreamDir());
put("STREAM_DIR_SET", new StreamDirSet());
put("FIND_NOT_BLANK", new FindNotBlank());
put("DISTINCT_CONCAT", new DistinctConcat());
put("SORT_ADDRESS", new SortAddress());
put("SNOWFLAKE_ID", new SnowflakeID());
}};
@Override
public Map<String, UserDefinedFunction> register() {
return R;
}
}

View File

@@ -1,18 +0,0 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class DistinctConcat extends ScalarFunction {
public @DataTypeHint("STRING") String eval(String s1, String s2) {
return Stream.of(s1, s2).filter(StringUtils::isNotBlank)
.map(StringUtils::trim)
.distinct()
.collect(Collectors.joining(","));
}
}

View File

@@ -1,15 +0,0 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
public class FindNotBlank extends ScalarFunction {
public @DataTypeHint("STRING") String eval(String s1, String s2) {
if (StringUtils.isBlank(s1) && StringUtils.isNotBlank(s2)) {
return s2;
}
return s1;
}
}

View File

@@ -1,19 +0,0 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
public class HasExternalIpAddress extends ScalarFunction {
private final IsExternalIpAddress isExternalIpAddress = new IsExternalIpAddress();
public @DataTypeHint("BOOLEAN") Boolean eval(String... ipaddr) {
if (null == ipaddr) {
return false;
}
for (var ip : ipaddr) {
return isExternalIpAddress.eval(ip);
}
return false;
}
}

View File

@@ -1,18 +0,0 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.flink.table.annotation.DataTypeHint;
public class HasIpAddress extends IpAddressScalarFunction {
public @DataTypeHint("BOOLEAN") Boolean eval(String... ipaddr) {
if (null == ipaddr) {
return false;
}
for (var ip : ipaddr) {
if (ip != null && isIpAddress(ip)) {
return true;
}
}
return false;
}
}

View File

@@ -1,25 +0,0 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import cn.hutool.core.lang.Validator;
import cn.hutool.core.net.NetUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.functions.ScalarFunction;
public abstract class IpAddressScalarFunction extends ScalarFunction {
protected static boolean isIpAddress(String ipaddr) {
return isIpv4(ipaddr) || isIpv6(ipaddr);
}
protected static boolean isIpv4(String ipaddr) {
return Validator.isIpv4(StringUtils.trim(ipaddr));
}
protected static boolean isIpv6(String ipaddr) {
return Validator.isIpv6(StringUtils.trim(ipaddr));
}
protected static boolean isInternalIpAddress(String ipaddr) {
return NetUtil.isInnerIP(StringUtils.trim(ipaddr));
}
}

View File

@@ -1,13 +0,0 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.flink.table.annotation.DataTypeHint;
public class IsExternalIpAddress extends IpAddressScalarFunction {
public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
if (ipaddr == null || !isIpAddress(ipaddr)) {
return false;
}
return !isInternalIpAddress(ipaddr);
}
}

View File

@@ -1,13 +0,0 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.flink.table.annotation.DataTypeHint;
public class IsInternalIpAddress extends IpAddressScalarFunction {
public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
if (!isIpAddress(ipaddr)) {
return false;
}
return isInternalIpAddress(ipaddr);
}
}

View File

@@ -1,13 +0,0 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.flink.table.annotation.DataTypeHint;
public class IsIpAddress extends IpAddressScalarFunction {
public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
if (null == ipaddr) {
return false;
}
return isIpAddress(ipaddr);
}
}

View File

@@ -1,14 +0,0 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import xyz.downgoon.snowflake.Snowflake;
public class SnowflakeID extends ScalarFunction {
private static final Snowflake SNOWFLAKE = new Snowflake(1, 1);
public @DataTypeHint("BIGINT") Long eval() {
return SNOWFLAKE.nextId();
}
}

View File

@@ -1,48 +0,0 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import cn.hutool.core.net.NetUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.annotation.DataTypeHint;
import java.util.ArrayList;
import java.util.List;
public class SortAddress extends IpAddressScalarFunction {
public @DataTypeHint("STRING")
String eval(
String ip1, Integer port1, String ip2, Integer port2) {
return of(Tuple2.of(ip1, port1), Tuple2.of(ip2, port2));
}
public static String of(
Tuple2<String, Integer> a1, Tuple2<String, Integer> a2) {
var list = new ArrayList<>(List.of(a1, a2));
if (a1.f1 == null || a2.f1 == null || StringUtils.isAnyEmpty(a1.f0, a2.f0)
|| !isIpAddress(a1.f0) || !isIpAddress(a2.f0)) {
return a1.f0 + ":" + a1.f1 + "," + a2.f0 + ":" + a2.f1;
}
list.sort((a, b) -> {
if (a.f1.equals(b.f1)) {
return compareAddress(a.f0, b.f0);
} else {
return a.f1.compareTo(b.f1);
}
});
return String.format("%s:%s,%s:%s",
list.get(0).f0, list.get(0).f1, list.get(1).f0, list.get(1).f1);
}
private static int compareAddress(String ip1, String ip2) {
try {
var v1 = isIpv4(ip1) ? NetUtil.ipv4ToLong(ip1) :
NetUtil.ipv6ToBigInteger(ip1).longValue();
var v2 = isIpv4(ip2) ? NetUtil.ipv4ToLong(ip2) :
NetUtil.ipv6ToBigInteger(ip2).longValue();
return Long.compare(v1, v2);
} catch (Exception e) {
return 0;
}
}
}

View File

@@ -1,21 +0,0 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
public class StreamDir extends ScalarFunction {
public @DataTypeHint("INT") Integer eval(Long flags) {
int v = 0;
if (flags == null) {
return v;
}
if ((flags & 8192) == 8192) {
v += 1;
}
if ((flags & 16384) == 16384) {
v += 2;
}
return v;
}
}

View File

@@ -1,22 +0,0 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
public class StreamDirSet extends ScalarFunction {
public @DataTypeHint("BIGINT") Long eval(Long flags) {
if (flags == null) {
return 8192 + 16384L;
}
long r = 0;
if ((flags & 8192) == 0) {
r += 8192;
}
if ((flags & 16384) == 0) {
r += 16384;
}
return r;
}
}

View File

@@ -1 +0,0 @@
com.geedgenetworks.flink.easy.application.voip.VoipUDFFactory

View File

@@ -0,0 +1,7 @@
source.kafka.topic=aaa
source.kafka.props.bootstrap.servers=127.0.0.1:9292
source.kafka.props.group.id=flink-voip-fusion
source.kafka.props.auto.offset.reset=earliest
sink.kafka.topic=bbb
sink.kafka.props.bootstrap.servers=127.0.0.1:9292

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,52 @@
package com.zdjizhi.flink.voip
import com.zdjizhi.flink.voip.conf._
import com.zdjizhi.flink.voip.functions.TypeSplitFunction._
import com.zdjizhi.flink.voip.functions.{SIPPairingFunction, TypeSplitFunction, VoIPFusionFunction}
import com.zdjizhi.flink.voip.records.SIPRecord.Implicits._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.formats.json.JsonNodeDeserializationSchema
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
/**
* App Main.
*
* @author chaoc
* @since 1.0
*/
object FusionApp {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
require(args.nonEmpty, "Error: Not found properties path. \nUsage: flink -c xxx xxx.jar app.properties.")
val tool = ParameterTool.fromPropertiesFile(args.head)
env.getConfig.setGlobalJobParameters(tool.getConfiguration)
val consumer = new FlinkKafkaConsumer[ObjectNode](
tool.getConfiguration.get(FusionConfigs.SOURCE_KAFKA_TOPIC),
new JsonNodeDeserializationSchema,
tool.getProperties(FusionConfigs.SOURCE_KAFKA_PROPERTIES_PREFIX))
val schemaTypeSplitStream = env.addSource(consumer)
.process(new TypeSplitFunction)
val sipStream = schemaTypeSplitStream.getSideOutput(sipSchemaTypeOutputTag)
val sipDoubleStream = sipStream.keyBy { i => (i.vSysID, i.callID) }
.process(new SIPPairingFunction)
val rtpStream = schemaTypeSplitStream.getSideOutput(rtpSchemaTypeOutputTag)
rtpStream.keyBy(_.vSysID).connect(sipDoubleStream.keyBy(_.vSysID))
.process(new VoIPFusionFunction)
env.execute("VoIP Fusion Job")
}
}

View File

@@ -0,0 +1,48 @@
package com.zdjizhi.flink.voip.conf
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.{ConfigOption, ConfigOptions}
import java.lang.{Long => JLong}
/**
* An object containing configuration options for the Fusion application.
*
* @author chaoc
* @since 1.0
*/
object FusionConfigs {
val SOURCE_KAFKA_TOPIC: ConfigOption[String] =
ConfigOptions.key("source.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("")
val SINK_KAFKA_TOPIC: ConfigOption[String] =
ConfigOptions.key("sink.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("")
val SOURCE_KAFKA_PROPERTIES_PREFIX: String = "source.kafka.props."
val SINK_KAFKA_PROPERTIES_PREFIX: String = "sink.kafka.props."
/**
* The configuration option for the interval at which SIP (Session Initiation Protocol) state data
* should be cleared.
*/
val SIP_STATE_CLEAR_INTERVAL: ConfigOption[JLong] =
ConfigOptions.key("")
.longType()
.defaultValue(Time.minutes(1).toMilliseconds)
.withDescription("")
val RTP_STATE_CLEAR_INTERVAL: ConfigOption[JLong] =
ConfigOptions.key("")
.longType()
.defaultValue(Time.minutes(3).toMilliseconds)
.withDescription("")
}

View File

@@ -0,0 +1,32 @@
package com.zdjizhi.flink.voip.conf
import org.apache.flink.api.java.utils.ParameterTool
import java.util.Properties
import scala.collection.JavaConverters._
/**
* A wrapper class that extends the Flink `ParameterTool` to provide utility methods for handling
* properties with a specific prefix. This class allows retrieving properties that start with the
* given `prefix` and converts them into a `java.util.Properties` object.
*
* @param tool The original Flink `ParameterTool` instance.
* @author chaoc
* @since 1.0
*/
class FusionParameterTool(tool: ParameterTool) {
/**
* Retrieves properties from the underlying `ParameterTool` instance that start with the specified
* `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
*
* @param prefix The prefix to filter properties.
* @return A `java.util.Properties` object containing the properties with the specified prefix.
*/
def getProperties(prefix: String): Properties = {
val map = tool.toMap.asScala.filterKeys(_.startsWith(prefix))
.map { case (key, value) => (key.stripPrefix(prefix), value) }
ParameterTool.fromMap(map.asJava).getProperties
}
}

View File

@@ -0,0 +1,11 @@
package com.zdjizhi.flink.voip
import org.apache.flink.api.java.utils.ParameterTool
import scala.language.implicitConversions
package object conf {
implicit def asFusionParameterTool(tool: ParameterTool): FusionParameterTool = new FusionParameterTool(tool)
}

View File

@@ -0,0 +1,53 @@
package com.zdjizhi.flink.voip.functions
import org.apache.flink.api.common.functions.RichFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimerService
/**
* A trait that provides utility functions for Flink functions.
*
* @author chaoc
* @since 1.0
*/
trait FunctionHelper extends RichFunction {
/**
* Get the global configuration for the current Flink job.
*
* @return The global configuration as a Configuration object.
*/
def getGlobalConfiguration: Configuration = getRuntimeContext.getExecutionConfig
.getGlobalJobParameters.asInstanceOf[Configuration]
/**
* Get a MapState with the given name, key, and value types.
*
* @param name The name of the MapState.
* @param key The class representing the type of keys in the MapState.
* @param value The class representing the type of values in the MapState.
* @tparam K The type of keys in the MapState.
* @tparam V The type of values in the MapState.
* @return The MapState with the given name, key, and value types.
*/
def getMapState[K, V](name: String, key: Class[K], value: Class[V]): MapState[K, V] = {
val descriptor = new MapStateDescriptor[K, V](name, key, value)
getRuntimeContext.getMapState(descriptor)
}
/**
* Register the next fire timestamp for the given TimerService and fire interval.
*
* @param timeService The TimerService used to register the timer.
* @param fireInterval The interval at which the timer should fire, represented as a Time object.
*/
def registerNextFireTimestamp(timeService: TimerService, fireInterval: Time): Unit = {
val currentTime = timeService.currentProcessingTime()
val nextFireTime = (currentTime / fireInterval.toMilliseconds + 1) * fireInterval.toMilliseconds
timeService.registerProcessingTimeTimer(nextFireTime)
}
}

View File

@@ -0,0 +1,68 @@
package com.zdjizhi.flink.voip.functions
import com.zdjizhi.flink.voip.conf.FusionConfigs.SIP_STATE_CLEAR_INTERVAL
import com.zdjizhi.flink.voip.records.SIPRecord.Implicits._
import com.zdjizhi.flink.voip.records.{Record, StreamDirs}
import org.apache.flink.api.common.time.Time
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{IntNode, ObjectNode}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
/**
* A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
* SIP records are paired when they have the same addresses but opposite stream directions.
*
* @author chaoc
* @since 1.0
*/
class SIPPairingFunction extends KeyedProcessFunction[(Int, String), ObjectNode, ObjectNode]
with FunctionHelper {
private lazy val fireInterval: Time = Time.milliseconds(getGlobalConfiguration.get(SIP_STATE_CLEAR_INTERVAL))
// A MapState to store SIP records with their addresses and expiration time.
private lazy val mapState = getMapState("sip-state", classOf[Address], classOf[ObjectNodeWithExpiration])
override def processElement(obj: ObjectNode,
ctx: KeyedProcessFunction[(Int, String), ObjectNode, ObjectNode]#Context,
out: Collector[ObjectNode]): Unit = {
// Register a timer for the next clearing interval.
registerNextFireTimestamp(ctx.timerService(), fireInterval)
// When SIP is a one-way stream.
if (obj.streamDir != StreamDirs.DOUBLE) {
// Create an Address instance based on server and client IPs and ports.
val address = Address((obj.serverIp, obj.serverPort), (obj.clientIp, obj.clientPort))
// If the address is already stored in the mapState and has the opposite stream direction,
// merge the SIP records, change the stream direction to DOUBLE, and output the merged record.
if (mapState.contains(address) && mapState.get(address).obj.streamDir != obj.streamDir /* TODO consider stream direction */ ) {
obj.merge(mapState.get(address).obj)
obj.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.DOUBLE))
out.collect(obj)
mapState.remove(address)
} else {
// If the address is not yet in the mapState, add it with its expiration time.
val value = ObjectNodeWithExpiration(obj,
ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds)
mapState.put(address, value)
}
} else {
// If SIP is a double stream, pairing isn't required, directly output the record.
out.collect(obj)
}
}
override def onTimer(timestamp: Long,
ctx: KeyedProcessFunction[(Int, String), ObjectNode, ObjectNode]#OnTimerContext,
out: Collector[ObjectNode]): Unit = {
val iterator = mapState.entries().iterator()
while (iterator.hasNext) {
val entry = iterator.next()
// Remove expired entries from the mapState based on their expiration time.
if (entry.getValue.expireTime <= timestamp) {
mapState.remove(entry.getKey)
}
}
registerNextFireTimestamp(ctx.timerService(), fireInterval)
}
}

View File

@@ -0,0 +1,49 @@
package com.zdjizhi.flink.voip.functions
import com.zdjizhi.flink.voip.functions.TypeSplitFunction._
import com.zdjizhi.flink.voip.records.Record.Implicits._
import com.zdjizhi.flink.voip.records.SchemaTypes._
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
* A ProcessFunction that splits ObjectNode records based on their 'schemaType' field.
* It outputs SIP records to the 'sipSchemaTypeOutputTag' and RTP records to the 'rtpSchemaTypeOutputTag'.
*
* @author chaoc
* @since 1.0
*/
class TypeSplitFunction extends ProcessFunction[ObjectNode, ObjectNode] {
override def processElement(obj: ObjectNode,
ctx: ProcessFunction[ObjectNode, ObjectNode]#Context,
out: Collector[ObjectNode]): Unit = {
// Split record based on its 'schemaType' field.
obj.schemaType match {
case SIP => ctx.output(sipSchemaTypeOutputTag, obj)
case RTP => ctx.output(rtpSchemaTypeOutputTag, obj)
case _ =>
}
}
}
/**
* Defining the OutputTags for SIP and RTP records.
*/
object TypeSplitFunction {
/**
* OutputTag for SIP records.
*/
val sipSchemaTypeOutputTag: OutputTag[ObjectNode] =
OutputTag[ObjectNode]("schema-type-sip")
/**
* OutputTag for RTP records.
*/
val rtpSchemaTypeOutputTag: OutputTag[ObjectNode] =
OutputTag[ObjectNode]("schema-type-rtp")
}

View File

@@ -0,0 +1,101 @@
package com.zdjizhi.flink.voip.functions
import com.zdjizhi.flink.voip.conf.FusionConfigs.RTP_STATE_CLEAR_INTERVAL
import com.zdjizhi.flink.voip.records.{Record, SchemaTypes, StreamDirs}
import org.apache.flink.api.common.time.Time
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{ObjectNode, TextNode}
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector
/**
* The `VoIPFusionFunction` is a key-based co-process function that performs VoIP fusion logic
* for SIP and RTP records. It combines SIP and RTP records belonging to the same session
* and emits fused VoIP records. The function utilizes keyed state to store and manage SIP and
* RTP records, and it uses timers to trigger regular clearing of the state.
*
* @author chaoc
* @since 1.0
*/
class VoIPFusionFunction extends KeyedCoProcessFunction[Int, ObjectNode, ObjectNode, ObjectNode] with FunctionHelper {
// The maximum number of RTP lines allowed per SIP for fusion.
private val MAX_RTP_LINES: Int = 2
private lazy val fireInterval: Time = Time.milliseconds(getGlobalConfiguration.get(RTP_STATE_CLEAR_INTERVAL))
private lazy val sipDoubleState = getMapState("sip-state", classOf[Address], classOf[ObjectNodeWithInfo])
private lazy val rtpState = getMapState("rtp-state", classOf[Address], classOf[ObjectNodeWithExpiration])
override def processElement1(obj: ObjectNode,
ctx: KeyedCoProcessFunction[Int, ObjectNode, ObjectNode, ObjectNode]#Context,
out: Collector[ObjectNode]): Unit = {
import com.zdjizhi.flink.voip.records.SIPRecord.Implicits._
val address = Address((obj.originatorSdpConnectIp, obj.originatorSdpMediaPort),
(obj.responderSdpConnectIp, obj.responderSdpMediaPort))
sipDoubleState.put(address, ObjectNodeWithInfo(obj,
ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds, 0))
registerNextFireTimestamp(ctx.timerService(), fireInterval)
}
override def processElement2(obj: ObjectNode,
ctx: KeyedCoProcessFunction[Int, ObjectNode, ObjectNode, ObjectNode]#Context,
out: Collector[ObjectNode]): Unit = {
import com.zdjizhi.flink.voip.records.Record.Implicits._
val address = Address((obj.serverIp, obj.serverPort), (obj.clientIp, obj.clientPort))
if (sipDoubleState.contains(address)) {
val info = sipDoubleState.get(address)
obj.merge(info.obj)
obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaTypes.VoIP))
out.collect(obj)
obj.streamDir match {
case StreamDirs.DOUBLE =>
// In the context of VoIP fusion, only one RTP double directional stream
sipDoubleState.remove(address)
case _ =>
// Save the number of fused RTP unidirectional streams
sipDoubleState.put(address, info.copy(times = info.times + 1))
}
} else {
rtpState.put(address, ObjectNodeWithExpiration(obj,
ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds))
}
registerNextFireTimestamp(ctx.timerService(), fireInterval)
}
override def onTimer(timestamp: Long,
ctx: KeyedCoProcessFunction[Int, ObjectNode, ObjectNode, ObjectNode]#OnTimerContext,
out: Collector[ObjectNode]): Unit = {
import com.zdjizhi.flink.voip.records.Record.Implicits._
val iterator = rtpState.iterator()
while (iterator.hasNext) {
val entry = iterator.next()
val obj = entry.getValue.obj
val address = entry.getKey
if (sipDoubleState.contains(address)) {
val info = sipDoubleState.get(address)
obj.streamDir match {
case StreamDirs.DOUBLE =>
sipDoubleState.remove(address)
case _ if info.times >= MAX_RTP_LINES - 1 =>
// One RTP unidirectional stream has already been fused
sipDoubleState.remove(address)
}
obj.merge(info.obj)
.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaTypes.VoIP))
out.collect(obj)
}
if (entry.getValue.expireTime <= timestamp) {
rtpState.remove(entry.getKey)
}
}
sipDoubleState.iterator().forEachRemaining(entry => {
if (entry.getValue.expireTime <= timestamp) {
sipDoubleState.remove(entry.getKey)
}
})
registerNextFireTimestamp(ctx.timerService(), fireInterval)
}
}

View File

@@ -0,0 +1,61 @@
package com.zdjizhi.flink.voip.functions
import com.zdjizhi.utils.IPUtil
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
/**
* A case class representing an address with two IP and port pairs.
*
* @param ip1 The first IP address.
* @param port1 The first port number.
* @param ip2 The second IP address.
* @param port2 The second port number.
* @author chaoc
* @since 1.0
*/
case class Address(ip1: String, port1: Int, ip2: String, port2: Int)
object Address {
/**
* Creates an Address instance based on two tuples containing (String, Int) representing address information.
* The method sorts the addresses based on the port number, and if the ports are equal, it sorts them based on
* the numeric value of the IP address.
*
* @param address1 The first address information as a tuple (IP address, port).
* @param address2 The second address information as a tuple (IP address, port).
* @return An Address instance with addresses sorted and reordered.
*/
def apply(address1: (String, Int), address2: (String, Int)): Address = {
val seq = (address1 :: address2 :: Nil).sortWith {
case (a, b) =>
if (a._2 == b._2) {
IPUtil.getIpDesimal(a._1) < IPUtil.getIpDesimal(b._1)
} else {
a._2 < b._2
}
}
// Create an Address instance with the first address having the smaller port number,
// and if the ports are equal, the smaller IP address comes first.
Address(seq.head._1, seq.head._2, seq.last._1, seq.last._2)
}
}
/**
* A case class representing an ObjectNode with an expiration time.
*
* @param obj The ObjectNode containing data.
* @param expireTime The expiration time for the object.
*/
case class ObjectNodeWithExpiration(obj: ObjectNode, expireTime: Long)
/**
* A case class representing an ObjectNode with an expiration time and a pair times.
*
* @param obj The ObjectNode containing data.
* @param expireTime The expiration time for the object.
* @param times The pair times for the object.
*/
case class ObjectNodeWithInfo(obj: ObjectNode, expireTime: Long, times: Int)

View File

@@ -0,0 +1,137 @@
package com.zdjizhi.flink.voip.records
import com.zdjizhi.flink.voip.records.Record._
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import scala.language.implicitConversions
/**
* 用于解析和访问数据记录的类。
*
* @param obj 包含数据记录的 ObjectNode 对象
* @constructor 创建一个 Record 对象,用于解析和访问数据记录
* @author chaoc
* @since 1.0
*/
class Record(obj: ObjectNode) {
protected implicit val o: ObjectNode = obj
/**
* 数据记录中的所属 vsys
*/
val vSysID: Int = getInt(F_COMMON_VSYS_ID)
/**
* 数据记录中的字段类型
*/
val schemaType: String = getString(F_COMMON_SCHEMA_TYPE)
/**
* 数据记录中的流类型
*/
val streamDir: Int = getInt(F_COMMON_STREAM_DIR)
/**
* 数据记录中的服务端地址字段值
*/
val serverIp: String = getString(F_COMMON_SERVER_IP)
/**
* 数据记录中的服务端端口
*/
val serverPort: Int = getInt(F_COMMON_SERVER_PORT)
/**
* 数据记录中的客户端地址
*/
val clientIp: String = getString(F_COMMON_CLIENT_IP)
/**
* 数据记录中的客户端端口
*/
val clientPort: Int = getInt(F_COMMON_CLIENT_PORT)
def merge(obj: ObjectNode): ObjectNode = {
obj.fields().forEachRemaining(entry => {
o.set(entry.getKey, entry.getValue)
})
o
}
}
/**
* 用于解析和访问数据记录。
*
* @see Record
*/
object Record {
/**
* 所属 vsys 字段名
*/
val F_COMMON_VSYS_ID = "common_vsys_id"
/**
* 字段类型 字段名
*/
val F_COMMON_SCHEMA_TYPE = "common_schema_type"
/**
* 流类型 字段名
*/
val F_COMMON_STREAM_DIR = "common_stream_dir"
/**
* 服务端地址 字段名
*/
val F_COMMON_SERVER_IP = "common_server_ip"
/**
* 服务端端口 字段名
*/
val F_COMMON_SERVER_PORT = "common_server_port"
/**
* 客户端地址 字段名
*/
val F_COMMON_CLIENT_IP = "common_client_ip"
/**
* 客户端端口 字段名
*/
val F_COMMON_CLIENT_PORT = "common_client_port"
/**
* 从 ObjectNode 对象中获取整数类型字段值。
*
* @param field 字段名
* @param default 默认值(可选,默认为 0
* @param o 包含字段的 ObjectNode 对象
* @return 字段对应的整数值
*/
def getInt(field: String, default: Int = 0)(implicit o: ObjectNode): Int = {
val node = o.get(field)
if (node != null && node.isInt) {
node.asInt(default)
} else default
}
/**
* 从 ObjectNode 对象中获取字符串类型字段值。
*
* @param field 字段名
* @param default 默认值(可选,默认为 null
* @param o 包含字段的 ObjectNode 对象
* @return 字段对应的字符串值
*/
def getString(field: String, default: String = null)(implicit o: ObjectNode): String = {
val node = o.get(field)
if (node != null && node.isTextual) {
node.asText(default)
} else default
}
object Implicits {
implicit def asRecord(o: ObjectNode): Record = new Record(o)
}
}

View File

@@ -0,0 +1,80 @@
package com.zdjizhi.flink.voip.records
import com.zdjizhi.flink.voip.records.Record._
import com.zdjizhi.flink.voip.records.SIPRecord._
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import scala.language.implicitConversions
/**
* SIPSession Initiation Protocol数据记录类用于解析和访问SIP数据记录。
*
* @param obj 包含 SIP 数据记录的 ObjectNode 对象
* @constructor 创建一个 SIPRecord 对象,用于解析和访问 SIP 数据记录
* @see Record
* @author chaoc
* @since 1.0
*/
class SIPRecord(obj: ObjectNode) extends Record(obj) {
/**
* SIP 通话的会话 ID 字段值
*/
val callID: String = getString(F_CALL_ID)
/**
* SIP 通话的协调的主叫语音传输 IP 字段值
*/
val originatorSdpConnectIp: String = getString(F_ORIGINATOR_SDP_CONNECT_IP)
/**
* SIP 通话的协调的主叫语音传输端口字段值
*/
val originatorSdpMediaPort: Int = getInt(F_ORIGINATOR_SDP_MEDIA_PORT)
/**
* SIP 通话的协调的被叫语音传输 IP 字段值
*/
val responderSdpConnectIp: String = getString(F_RESPONDER_SDP_CONNECT_IP)
/**
* SIP 通话的协调的被叫语音传输端口字段值
*/
val responderSdpMediaPort: Int = getInt(F_RESPONDER_SDP_MEDIA_PORT)
}
/**
* 用于解析和访问 SIP 数据记录。
*
* @see SIPRecord
*/
object SIPRecord {
/**
* 会话ID 字段名
*/
val F_CALL_ID = "sip_call_id"
/**
* 协调的主叫语音传输IP 字段名
*/
val F_ORIGINATOR_SDP_CONNECT_IP = "sip_originator_sdp_connect_ip"
/**
* 协调的主叫语音传输端口 字段名
*/
val F_ORIGINATOR_SDP_MEDIA_PORT = "sip_originator_sdp_media_port"
/**
* 协调的被叫语音传输IP 字段名
*/
val F_RESPONDER_SDP_CONNECT_IP = "sip_responder_sdp_connect_ip"
/**
* 协调的被叫语音传输端口 字段名
*/
val F_RESPONDER_SDP_MEDIA_PORT = "sip_responder_sdp_media_port"
private[voip] object Implicits {
implicit def asSIPRecord(o: ObjectNode): SIPRecord = {
new SIPRecord(o)
}
}
}

View File

@@ -0,0 +1,14 @@
package com.zdjizhi.flink.voip.records
object SchemaTypes {
val SIP = "SIP"
val RTP = "RTP"
val VoIP = "VoIP"
}
object StreamDirs {
val C2S = 1
val S2C = 2
val DOUBLE = 3
}

View File

@@ -1,5 +0,0 @@
## Changelog
### 2.0
- [GAL-602](https://jira.geedge.net/browse/GAL-602) 基于 Easy Stream 框架的配置化改造。

View File

@@ -1,11 +0,0 @@
## Deploy
- 准备 JDK ${java.version} 的环境
- 准备 Flink ${flink.version} 的环境
- [下载](./download.html) 对应版本 UDF 依赖 Jar
- [下载](./download.html) 对应版本 Job 配置 (一个 yml 文件)
- 执行命令 `flink run -Dflink.rest.bind-port=8081 -c com.geedgenetworks.flink.easy.core.Runner path/to/sip-rtp-correlation-<version>.jar job.yml`
- 您将在控制台看到启动日志,同时您可以在 `http://<you-host>:8081` 看到任务 UI。
> 注意:
> SIP 和 RTP 融合作业强烈建议开启 Checkpoint 机制,否则将会由于丢失数据或重复数据导致业务数据关联错误。

View File

@@ -1,102 +0,0 @@
## Download
### ${project.version}
- [TSG-23852](https://jira.geedge.net/browse/TSG-23852) 适配 VoIP Records 以及 Session Records 中 Client/Server 字段重命名。
| Easy Stream | UDF Jar | Job |
|-------------| ------------------------------------------------------------ | ------------------------------------------------------------ |
| ${easy.stream.version} | [JAR](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.jar) ( [MD5](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.jar.md5) [SHA1](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.jar.sha1) ) | [YML](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.tar.gz) ( [MD5](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.tar.gz.md5) [SHA1](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.tar.gz.sha1) ) |
### 2.1.1
- 当 VoIP 融合成功时RTP 将不再输出;同时如果 SIP 关联成功过 RTP 也将不再输出。
- VpIP 中的字节数,包数使用 RTP 侧的数据指标,不再累加计算。
- 将 SIP 五元组RTP 四元组中所有带有空值的数据判定为异常数据不再参与关联过程。
| Easy Stream | UDF Jar | Job |
|-------------| ------------------------------------------------------------ | ------------------------------------------------------------ |
| 1.3-rc2 | [JAR](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.1.1/sip-rtp-correlation-2.1.1.jar) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.1.1/sip-rtp-correlation-2.1.1.jar.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.1.1/sip-rtp-correlation-2.1.1.jar.sha1) ) | [YML](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.1.1/sip-rtp-correlation-2.1.1.tar.gz) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.1.1/sip-rtp-correlation-2.1.1.tar.gz.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.1.1/sip-rtp-correlation-2.1.1.tar.gz.sha1) ) |
### 2.1
- [TSG-23174](https://jira.geedge.net/browse/TSG-23174) 测试完成,升级为稳定版本。
- 重复数据不再进行去重操作,未关联的全部输出。
| Easy Stream | UDF Jar | Job |
|-------------| ------------------------------------------------------------ | ------------------------------------------------------------ |
| 1.3-rc2 | [JAR](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.1/sip-rtp-correlation-2.1.jar) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.1/sip-rtp-correlation-2.1.jar.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.1/sip-rtp-correlation-2.1.jar.sha1) ) | [YML](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.1/sip-rtp-correlation-2.1.tar.gz) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.1/sip-rtp-correlation-2.1.tar.gz.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.1/sip-rtp-correlation-2.1.tar.gz.sha1) ) |
### 2.0-rc9
- 修复 sip 双向关联模块的数据丢失问题。
- [GAL-684](https://jira.geedge.net/browse/GAL-684) 调整融合后的字段映射,增加更加详细的监控指标。
| Easy Stream | UDF Jar | Job |
|-------------| ------------------------------------------------------------ | ------------------------------------------------------------ |
| 1.3-rc1 | [JAR](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc9/${project.artifactId}-2.0-rc9.jar) ( [MD5](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc9/${project.artifactId}-2.0-rc9.jar.md5) [SHA1](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc9/${project.artifactId}-2.0-rc9.jar.sha1) ) | [YML](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc9/${project.artifactId}-2.0-rc9.tar.gz) ( [MD5](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc9/${project.artifactId}-2.0-rc9.tar.gz.md5) [SHA1](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc9/${project.artifactId}-2.0-rc9.tar.gz.sha1) ) |
### 2.0-rc8
- 修复 Extract Key 错误,由于 IpAddress 类型数据校验导致的空指针异常。
| Easy Stream | UDF Jar | Job |
|-------------| ------------------------------------------------------------ | ------------------------------------------------------------ |
| 1.3-rc1 | [JAR](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc8/sip-rtp-correlation-2.0-rc8.jar) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc8/sip-rtp-correlation-2.0-rc8.jar.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc8/sip-rtp-correlation-2.0-rc8.jar.sha1) ) | [YML](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc8/sip-rtp-correlation-2.0-rc8.tar.gz) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc8/sip-rtp-correlation-2.0-rc8.tar.gz.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc8/sip-rtp-correlation-2.0-rc8.tar.gz.sha1) ) |
### 2.0-rc7
- VoIP Record 增加字段: SIP `sip_bye_reason` 相关字段。
| Easy Stream | UDF Jar | Job |
|-------------| ------------------------------------------------------------ | ------------------------------------------------------------ |
| 1.3-rc1 | [JAR](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc7/sip-rtp-correlation-2.0-rc7.jar) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc7/sip-rtp-correlation-2.0-rc7.jar.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc7/sip-rtp-correlation-2.0-rc7.jar.sha1) ) | [YML](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc7/sip-rtp-correlation-2.0-rc7.tar.gz) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc7/sip-rtp-correlation-2.0-rc7.tar.gz.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc7/sip-rtp-correlation-2.0-rc7.tar.gz.sha1) ) |
### 2.0-rc6
- VoIP Record 增加字段: SIP RTP `protocol` 相关字段。
| Easy Stream | UDF Jar | Job |
|-------------| ------------------------------------------------------------ | ------------------------------------------------------------ |
| 1.3-rc1 | [JAR](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc6/sip-rtp-correlation-2.0-rc6.jar) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc6/sip-rtp-correlation-2.0-rc6.jar.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc6/sip-rtp-correlation-2.0-rc6.jar.sha1) ) | [YML](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc6/sip-rtp-correlation-2.0-rc6.tar.gz) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc6/sip-rtp-correlation-2.0-rc6.tar.gz.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc6/sip-rtp-correlation-2.0-rc6.tar.gz.sha1) ) |
### 2.0-rc5
- 修复 VoIP 字段错误,使用 `decoded_as` 表示 VoIP 日志。
| Easy Stream | UDF Jar | Job |
|-------------| ------------------------------------------------------------ | ------------------------------------------------------------ |
| 1.3-rc1 | [JAR](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc5/sip-rtp-correlation-2.0-rc5.jar) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc5/sip-rtp-correlation-2.0-rc5.jar.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc5/sip-rtp-correlation-2.0-rc5.jar.sha1) ) | [YML](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc5/sip-rtp-correlation-2.0-rc5.tar.gz) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc5/sip-rtp-correlation-2.0-rc5.tar.gz.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc5/sip-rtp-correlation-2.0-rc5.tar.gz.sha1) ) |
### 2.0-rc4
- 移除对 `log_id`, `recv_time` 字段的强制非空校验, 该字段在后续步骤中处理。
| Easy Stream | UDF Jar | Job |
|-------------| ------------------------------------------------------------ | ------------------------------------------------------------ |
| 1.3-rc1 | [JAR](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc4/sip-rtp-correlation-2.0-rc4.jar) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc4/sip-rtp-correlation-2.0-rc4.jar.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc4/sip-rtp-correlation-2.0-rc4.jar.sha1) ) | [YML](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc4/sip-rtp-correlation-2.0-rc4.tar.gz) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc4/sip-rtp-correlation-2.0-rc4.tar.gz.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc4/sip-rtp-correlation-2.0-rc4.tar.gz.sha1) ) |
### 2.0-rc3
- 修复由于 State 过期策略早于 Timer 触发而导致的未关联成功的 RTP 数据未正常输出的问题。
| Easy Stream | UDF Jar | Job |
|-------------| ------------------------------------------------------------ | ------------------------------------------------------------ |
| 1.3-rc1 | [JAR](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc3/sip-rtp-correlation-2.0-rc3.jar) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc3/sip-rtp-correlation-2.0-rc3.jar.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc3/sip-rtp-correlation-2.0-rc3.jar.sha1) ) | [YML](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc3/sip-rtp-correlation-2.0-rc3.tar.gz) ( [MD5](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc3/sip-rtp-correlation-2.0-rc3.tar.gz.md5) [SHA1](http://192.168.40.153:8081/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc3/sip-rtp-correlation-2.0-rc3.tar.gz.sha1) ) |
### 2.0-rc2
- 修复由于 Flink 1.13.6 和 Flink 1.13.1 之间某些 Runtime 不兼容的问题导致的系统无法启动。
| Easy Stream | UDF Jar | Job |
|-------------| ------------------------------------------------------------ | ------------------------------------------------------------ |
| 1.3-rc1 | [JAR](http://192.168.40.153:8099/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc2/sip-rtp-correlation-2.0-rc2.jar) ( [MD5](http://192.168.40.153:8099/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc2/sip-rtp-correlation-2.0-rc2.jar.md5) [SHA1](http://192.168.40.153:8099/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc2/sip-rtp-correlation-2.0-rc2.jar.sha1) ) | [YML](http://192.168.40.153:8099/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc2/sip-rtp-correlation-2.0-rc2.tar.gz) ( [MD5](http://192.168.40.153:8099/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc2/sip-rtp-correlation-2.0-rc2.tar.gz.md5) [SHA1](http://192.168.40.153:8099/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc2/sip-rtp-correlation-2.0-rc2.tar.gz.sha1) ) |
### 2.0-rc1
| Easy Stream | UDF Jar | Job |
|-------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.3-rc1 | [JAR](http://192.168.40.153:8099/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc1/sip-rtp-correlation-2.0-rc1.jar) ( [MD5](http://192.168.40.153:8099/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc1/sip-rtp-correlation-2.0-rc1.jar.md5) [SHA1](http://192.168.40.153:8099/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc1/sip-rtp-correlation-2.0-rc1.jar.sha1) ) | [YML](http://192.168.40.153:8099/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc1/sip-rtp-correlation-2.0-rc1.tar.gz) ( [MD5](http://192.168.40.153:8099/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc1/sip-rtp-correlation-2.0-rc1.tar.gz.md5) [SHA1](http://192.168.40.153:8099/content/repositories/platform-release/com/geedgenetworks/application/sip-rtp-correlation/2.0-rc1/sip-rtp-correlation-2.0-rc1.tar.gz.sha1) ) |

View File

@@ -1,10 +0,0 @@
## SIP RTP Correlation
SIP RTP Correlation 项目是一个使用 Apache Flink 实现的实时数据处理项目,旨在从 Kafka 中读取 SIPSession Initiation Protocol和 RTPReal-time Transport Protocol数据将它们融合成完整的 VoIPVoice over Internet Protocol通话数据。
SIP RTP Correlation 项目可以用于实时监控和分析 VoIP 通话数据,提取关键指标,以及进行实时报警和诊断。
<br/>
You can download the latest release from [Downloads](./download.html). And you can changelog from [Changelogs](./changelogs.html).

View File

@@ -1,13 +0,0 @@
#banner {
height: 108px;
background: none;
}
#bannerLeft img {
margin-left: 18px;
margin-top: 10px;
}
div.well {
display: none;
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4.8 KiB

View File

@@ -1,56 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain maven-site.vm copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project name="SIP RTP Correlate">
<bannerLeft>
<name>Easy Stream</name>
<src>images/logo.png</src>
<href>#</href>
</bannerLeft>
<publishDate position="right"/>
<version position="right"/>
<skin>
<groupId>org.apache.maven.skins</groupId>
<artifactId>maven-fluido-skin</artifactId>
<version>1.10.0</version>
</skin>
<custom>
<fluidoSkin>
<sourceLineNumbersEnabled>true</sourceLineNumbersEnabled>
</fluidoSkin>
</custom>
<body>
<breadcrumbs position="left">
<item name="Galaxy" href="#"/>
<item name="Platform" href="#"/>
<item name="Easy Stream" href="#"/>
<item name="Application" href="#"/>
</breadcrumbs>
<menu name="OVERVIEW" inherit="top">
<item name="Introduction" href="index.html"/>
<item name="Deploy" href="deploy.html"/>
<item name="Download" href="download.html"/>
</menu>
<footer>
<![CDATA[ Copyright ©2022 <a href="#">Galaxy Platform</a>. All rights reserved.]]>
</footer>
</body>
</project>

View File

@@ -1,3 +0,0 @@
#!/bin/bash
flink run -t yarn-per-job -Djobmanager.memory.process.size=1024m -Dtaskmanager.memory.process.size=4G -Dtaskmanager.numberOfTaskSlots=1 -Dtaskmanager.memory.framework.off-heap.size=256m -Dtaskmanager.memory.jvm-metaspace.size=256m -Dtaskmanager.memory.network.max=256m -Dyarn.application.name=voip-fusion -Drest.flamegraph.enabled=true -p 1 -d -c com.geedgenetworks.flink.easy.core.Runner /home/tsg/olap/flink/topology/sip-rtp-correlation/sip-rtp-correlation-2.1.jar /home/tsg/olap/flink/topology/sip-rtp-correlation/job.yml

View File

@@ -1,33 +0,0 @@
package com.geedgenetworks.flink.easy.application;
import com.geedgenetworks.flink.easy.core.Runners;
import org.junit.jupiter.api.Test;
public class ApplicationTest {
static {
System.setProperty("easy.execute.mode", "validate");
System.setProperty("flink.rest.bind-port", "8081");
// System.setProperty("flink.rest.flamegraph.enabled", "true");
System.setProperty("flink.heartbeat.timeout", "1800000");
}
public static String discoverConfiguration(final String name) throws Exception {
var path = String.format("/jobs/%s.yml", name);
var resource = ApplicationTest.class.getResource(path);
if (resource == null) {
// maven
resource = ApplicationTest.class.getResource(String.format("../classes/%s", path));
}
if (resource == null) {
throw new IllegalArgumentException(
String.format("Not found job '%s' in path [%s].", name, path));
}
return resource.getPath();
}
@Test
public void testJob() throws Exception {
Runners.run(discoverConfiguration("job"));
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +0,0 @@
{"__timestamp":946681200,"__inputid":"tsg_olap","session_id":10240001,"decoded_as":"RTP","ip_protocol":"udp","address_type":4,"client_ip":"192.168.64.8","server_ip":"192.168.39.62","client_port":25524,"server_port":4580,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.rtp","server_fqdn":"","start_timestamp_ms":1721639438014,"end_timestamp_ms":1721639438014,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"flags":24576,"flags_identify_info":[1,1],"fqdn_category_list":[0],"client_os_desc":"Windows","server_os_desc":"Linux","rtp_pcap_path":"123e4567-e89b-12d3-a456-426614174005"}
{"__timestamp":946681200,"__inputid":"tsg_olap","session_id":10240002,"decoded_as":"SIP","ip_protocol":"udp","address_type":4,"client_ip":"10.0.0.1","server_ip":"192.0.2.1","client_port":1000,"server_port":60000,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.sip","server_fqdn":"","start_timestamp_ms":1721639438014,"end_timestamp_ms":1721639438014,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"flags":24584,"flags_identify_info":[1,1],"fqdn_category_list":[0],"client_os_desc":"Windows","server_os_desc":"Linux","sip_call_id":"NGMxZWY3Y2NmMzNlNGE3NzJhODgyZDAwM2YyMzQ4NGI.","sip_originator_description":"\"lina\"<sip:lina@192.0.2.1>;tag=1837055d","sip_responder_description":"\"1075\"<sip:1075@192.0.2.1>","sip_originator_sdp_connect_ip":"192.168.64.8","sip_originator_sdp_media_port":25524,"sip_originator_sdp_media_type":"application/sdp","sip_server":"OpenSIPS (2.4.11 (x86_64/linux))","sip_responder_sdp_connect_ip":"192.168.39.62","sip_responder_sdp_media_port":4580,"sip_responder_sdp_media_type":"application/sdp","sip_duration_s":590,"sip_bye":"responder","sip_cseq":"2 BYE","sip_via":"SIP/2.0/UDP 192.0.2.1:5060;branch=z9hG4bKbe7c.392190f1.0","sip_user_agent":"eyeBeam release 1011d stamp 40820","sip_is_request":0}
{"__timestamp":946681200,"__inputid":"tsg_olap","session_id":10240003,"decoded_as":"RTP","ip_protocol":"udp","address_type":4,"client_ip":"192.168.164.18","server_ip":"192.168.39.162","client_port":65121,"server_port":4670,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.rtp","server_fqdn":"","start_timestamp_ms":1721639438014,"end_timestamp_ms":1721639438014,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"flags":24584,"flags_identify_info":[1,1],"fqdn_category_list":[0],"client_os_desc":"Windows","server_os_desc":"Linux","rtp_pcap_path":"123e4567-e89b-12d3-a456-426614174001"}
{"__timestamp":946681200,"__inputid":"tsg_olap","session_id":10240004,"decoded_as":"SIP","ip_protocol":"udp","address_type":4,"client_ip":"10.0.0.2","server_ip":"192.0.2.2","client_port":1000,"server_port":60000,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.sip","server_fqdn":"","start_timestamp_ms":1721639438014,"end_timestamp_ms":1721639438014,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"flags":24584,"flags_identify_info":[1,1],"fqdn_category_list":[0],"client_os_desc":"Windows","server_os_desc":"Linux","sip_call_id":"CUMxZWY3Y2NmMzNlNGE3NzJhODgyZDAwM2YyMzQ4NGI.","sip_originator_description":"\"lina\"<sip:lina@192.0.2.1>;tag=1837055d","sip_responder_description":"\"1075\"<sip:1075@192.0.2.1>","sip_originator_sdp_connect_ip":"192.68.64.8","sip_originator_sdp_media_port":25524,"sip_originator_sdp_media_type":"application/sdp","sip_server":"OpenSIPS (2.4.11 (x86_64/linux))","sip_responder_sdp_connect_ip":"192.18.39.62","sip_responder_sdp_media_port":4580,"sip_responder_sdp_media_type":"application/sdp","sip_duration_s":590,"sip_bye":"responder","sip_cseq":"2 BYE","sip_via":"SIP/2.0/UDP 192.0.2.1:5060;branch=z9hG4bKbe7c.392190f1.0","sip_user_agent":"eyeBeam release 1011d stamp 40820","sip_is_request":0}

View File

@@ -1,4 +0,0 @@
{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240001,"decoded_as":"RTP","ip_protocol":"udp","address_type":4,"client_ip":"192.168.64.8","server_ip":"192.168.39.62","client_port":25524,"server_port":4580,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.rtp","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830004000,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":57620,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","rtp_pcap_path":"123e4567-e89b-12d3-a456-426614174005"}
{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240002,"decoded_as":"SIP","ip_protocol":"udp","address_type":4,"client_ip":"10.0.0.1","server_ip":"192.0.2.1","client_port":1000,"server_port":60000,"t_vsys_id":0,"vsys_id":1025,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.sip","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830000100,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":8192,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","sip_call_id":"NGMxZWY3Y2NmMzNlNGE3NzJhODgyZDAwM2YyMzQ4NGI.","sip_originator_description":"\"lina\"<sip:lina@192.0.2.1>;tag=1837055d","sip_responder_description":"\"1075\"<sip:1075@192.0.2.1>","sip_originator_sdp_connect_ip":"192.168.64.85","sip_originator_sdp_media_port":25524,"sip_originator_sdp_media_type":"application/sdp","sip_server":"OpenSIPS (2.4.11 (x86_64/linux))","sip_responder_sdp_connect_ip":"192.168.39.62","sip_responder_sdp_media_port":4580,"sip_responder_sdp_media_type":"application/sdp","sip_duration_s":590,"sip_bye":"responder","sip_cseq":"2 BYE","sip_via":"SIP/2.0/UDP 192.0.2.1:5060;branch=z9hG4bKbe7c.392190f1.0","sip_user_agent":"eyeBeam release 1011d stamp 40820","sip_is_request":0}
{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240003,"decoded_as":"RTP","ip_protocol":"udp","address_type":4,"client_ip":"192.168.164.18","server_ip":"192.168.39.162","client_port":65121,"server_port":4670,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.rtp","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830000000,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":57620,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","rtp_pcap_path":"123e4567-e89b-12d3-a456-426614174005"}
{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240004,"decoded_as":"SIP","ip_protocol":"udp","address_type":4,"client_ip":"10.0.0.2","server_ip":"192.0.2.2","client_port":1000,"server_port":60000,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.sip","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830000000,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":24584,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","sip_call_id":"IUMxZWY3Y2NmMzNlNGE3NzJhODgyZDAwM2YyMzQ4NGI.","sip_originator_description":"\"lina\"<sip:lina@192.0.2.1>;tag=1837055d","sip_responder_description":"\"1075\"<sip:1075@192.0.2.1>","sip_originator_sdp_connect_ip":"192.168.64.8","sip_originator_sdp_media_port":25524,"sip_originator_sdp_media_type":"application/sdp","sip_server":"OpenSIPS (2.4.11 (x86_64/linux))","sip_responder_sdp_connect_ip":"192.168.39.62","sip_responder_sdp_media_port":4580,"sip_responder_sdp_media_type":"application/sdp","sip_duration_s":590,"sip_bye":"responder","sip_cseq":"2 BYE","sip_via":"SIP/2.0/UDP 192.0.2.1:5060;branch=z9hG4bKbe7c.392190f1.0","sip_user_agent":"eyeBeam release 1011d stamp 40820","sip_is_request":0}

View File

@@ -0,0 +1,7 @@
source.kafka.topic=aaa
source.kafka.props.bootstrap.servers=127.0.0.1:9292
source.kafka.props.group.id=flink-voip-fusion
source.kafka.props.auto.offset.reset=earliest
sink.kafka.topic=bbb
sink.kafka.props.bootstrap.servers=127.0.0.1:9292

View File

@@ -0,0 +1,54 @@
package com.zdjizhi.flink.voip.data
import com.github.javafaker.Faker
import com.zdjizhi.flink.voip.data.Generator.random
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.atomic.AtomicReference
// 生成数据,有效数据即可以双向匹配的数据占比 [ratio/100]
abstract class Generator[T](ratio: Int) {
require(ratio >= 0 && ratio <= 100,
"Param 'ratio' is limited to 0-100 integers only.")
private lazy val state = new AtomicReference[T]()
final def next: T = {
val i = random.nextInt(100)
val v = if (i < ratio && state.get() != null) {
afterState(state.get())
} else {
state.updateAndGet(_ => this.generate)
}
state.set(null.asInstanceOf[T])
afterAll(v)
}
protected def generate: T
protected def afterState(v: T): T
protected def afterAll(t: T): T = t
}
object Generator {
val random: ThreadLocalRandom = ThreadLocalRandom.current()
private val faker = new Faker()
def nextIP: String = {
if (random.nextBoolean()) {
faker.internet().ipV4Address()
} else {
faker.internet().ipV6Address()
}
}
def nextID: String = faker.idNumber().valid()
def nextPort: Int = random.nextInt(65535)
}

View File

@@ -0,0 +1,44 @@
package com.zdjizhi.flink.voip.data
import com.zdjizhi.flink.voip.records.{Record, SIPRecord, StreamDirs}
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{IntNode, ObjectNode, TextNode}
// 生成 SIP 数据行
class SIPGenerator(ratio: Int = 40) extends Generator[ObjectNode](ratio) {
private val mapper = new ObjectMapper()
override protected def generate: ObjectNode = {
val obj = mapper.createObjectNode()
obj.set(SIPRecord.F_CALL_ID, TextNode.valueOf(Generator.nextID))
val dir = if (Generator.random.nextBoolean()) StreamDirs.S2C else StreamDirs.C2S
obj.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(dir))
obj.set(Record.F_COMMON_SERVER_IP, TextNode.valueOf(Generator.nextIP))
obj.set(Record.F_COMMON_SERVER_PORT, IntNode.valueOf(Generator.nextPort))
obj.set(Record.F_COMMON_CLIENT_IP, TextNode.valueOf(Generator.nextIP))
obj.set(Record.F_COMMON_CLIENT_PORT, IntNode.valueOf(Generator.nextPort))
obj.set(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP, TextNode.valueOf(Generator.nextIP))
obj.set(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT, IntNode.valueOf(Generator.nextPort))
obj.set(SIPRecord.F_RESPONDER_SDP_CONNECT_IP, TextNode.valueOf(Generator.nextIP))
obj.set(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT, IntNode.valueOf(Generator.nextPort))
obj
}
override def afterState(v: ObjectNode): ObjectNode = {
import Record.Implicits._
v.streamDir match {
case StreamDirs.C2S =>
v.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.S2C))
case StreamDirs.S2C =>
v.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.C2S))
case _ =>
}
v
}
}

View File

@@ -0,0 +1,26 @@
package com.zdjizhi.flink.voip.functions
import org.scalatest.flatspec.AnyFlatSpec
class AddressTest extends AnyFlatSpec {
"Address.apply" should "return the Address with ports sorted and IPs reordered if necessary" in {
val address1 = ("192.168.0.1", 80)
val address2 = ("10.1.1.1", 8080)
val result1 = Address(address1, address2)
assert(result1 == Address("192.168.0.1", 80, "10.1.1.1", 8080))
val result2 = Address(address2, address1)
assert(result2 == Address("192.168.0.1", 80, "10.1.1.1", 8080))
val address3 = ("172.16.0.1", 80)
val result3 = Address(address3, address1)
assert(result3 == Address("172.16.0.1", 80, "192.168.0.1", 80))
val address4 = ("172.31.0.1", 443)
val result4 = Address(address1, address4)
assert(result4 == Address("192.168.0.1", 80, "172.31.0.1", 443))
}
}

View File

@@ -0,0 +1,73 @@
package com.zdjizhi.flink.voip.functions
import com.zdjizhi.flink.voip.conf.FusionConfigs
import com.zdjizhi.flink.voip.data.SIPGenerator
import com.zdjizhi.flink.voip.records.{Record, StreamDirs}
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{IntNode, ObjectNode}
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import java.lang.{Long => JLong}
import scala.collection.JavaConverters._
class SIPPairingFunctionTest extends AnyFlatSpec with BeforeAndAfter {
private val keySelector: KeySelector[ObjectNode, Int] = (_: ObjectNode) => 0
private var testHarness: KeyedOneInputStreamOperatorTestHarness[Int, ObjectNode, ObjectNode] = _
private val interval: JLong = Time.minutes(5).toMilliseconds
before {
val func = new SIPPairingFunction
val operator = new KeyedProcessOperator(func)
val tpe = TypeInformation.of(classOf[Int])
testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, keySelector, tpe)
val config = new Configuration()
config.set(FusionConfigs.SIP_STATE_CLEAR_INTERVAL, interval)
testHarness.getExecutionConfig.setGlobalJobParameters(config)
testHarness.open()
}
it should "properly pair SIP records" in {
val sipGenerator = new SIPGenerator()
val current = System.currentTimeMillis()
val obj = sipGenerator.next
obj.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.DOUBLE))
val record = new StreamRecord(obj, current)
testHarness.processElement(record)
assert(testHarness.getOutput.asScala.toSet.contains(record))
val nextFireTime = (testHarness.getProcessingTime / interval + 1) * interval
assert(testHarness.getProcessingTimeService.getActiveTimerTimestamps.contains(nextFireTime))
val obj1 = obj.deepCopy()
obj1.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.S2C))
val obj2 = obj.deepCopy()
obj2.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.C2S))
val descriptor = new MapStateDescriptor("sip-state", classOf[Address], classOf[ObjectNodeWithExpiration])
testHarness.processElement(obj1, current)
val mapState = testHarness.getOperator.getKeyedStateStore.getMapState(descriptor)
assert(mapState.values().asScala.map(_.obj).toSet(obj1))
testHarness.processElement(obj2, current)
assert(!mapState.values().asScala.map(_.obj).toSet(obj1))
assert(testHarness.getOutput.asScala.toSet.contains(record))
assert(testHarness.getOutput.asScala.size == 2)
}
}

44
tools/dist/target.xml vendored
View File

@@ -1,44 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id>bin</id>
<formats>
<format>tar.gz</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources/jobs</directory>
<includes>
<include>*.yml</include>
</includes>
<fileMode>0755</fileMode>
<lineEnding>lf</lineEnding>
<directoryMode>0644</directoryMode>
<outputDirectory>./</outputDirectory>
</fileSet>
</fileSets>
</assembly>

View File

@@ -1,394 +0,0 @@
<?xml version="1.0"?>
<!DOCTYPE module PUBLIC
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
<!--
This is a checkstyle configuration file. For descriptions of
what the following rules do, please see the checkstyle configuration
page at http://checkstyle.sourceforge.net/config.html.
-->
<module name="Checker">
<module name="RegexpSingleline">
<!-- Checks that TODOs don't have stuff in parenthesis, e.g., username. -->
<property name="format" value="((//.*)|(\*.*))TODO\("/>
<property name="message" value="TODO comments must not include usernames."/>
<property name="severity" value="error"/>
</module>
<module name="RegexpSingleline">
<property name="format" value="\s+$"/>
<property name="message" value="Trailing whitespace"/>
<property name="severity" value="error"/>
</module>
<module name="RegexpSingleline">
<property name="format" value="Throwables.propagate\("/>
<property name="message" value="Throwables.propagate is deprecated"/>
<property name="severity" value="error"/>
</module>
<!-- Prevent *Tests.java as tools may not pick them up -->
<module name="RegexpOnFilename">
<property name="fileNamePattern" value=".*Tests\.java$"/>
</module>
<module name="SuppressionFilter">
<property name="file" value="${checkstyle.suppressions.file}" default="suppressions.xml"/>
</module>
<module name="FileLength">
<property name="max" value="3000"/>
</module>
<!-- All Java AST specific tests live under TreeWalker module. -->
<module name="TreeWalker">
<!-- Allow use of comment to suppress javadocstyle -->
<module name="SuppressionCommentFilter">
<property name="offCommentFormat" value="CHECKSTYLE.OFF\: ([\w\|]+)"/>
<property name="onCommentFormat" value="CHECKSTYLE.ON\: ([\w\|]+)"/>
<property name="checkFormat" value="$1"/>
</module>
<!-- Prohibit T.getT() methods for standard boxed types -->
<module name="Regexp">
<property name="format" value="Boolean\.getBoolean"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use System.getProperties() to get system properties."/>
</module>
<module name="Regexp">
<property name="format" value="Integer\.getInteger"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use System.getProperties() to get system properties."/>
</module>
<module name="Regexp">
<property name="format" value="Long\.getLong"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use System.getProperties() to get system properties."/>
</module>
<!--
IllegalImport cannot blacklist classes so we have to fall back to Regexp.
-->
<!-- forbid use of commons lang validate -->
<module name="Regexp">
<property name="format" value="org\.apache\.commons\.lang3\.Validate"/>
<property name="illegalPattern" value="true"/>
<property name="message"
value="Use Guava Checks instead of Commons Validate. Please refer to the coding guidelines."/>
</module>
<module name="Regexp">
<property name="format" value="org\.apache\.commons\.lang\."/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use commons-lang3 instead of commons-lang."/>
</module>
<module name="Regexp">
<property name="format" value="org\.codehaus\.jettison"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use flink-shaded-jackson instead of jettison."/>
</module>
<module name="Regexp">
<property name="format" value="org\.testcontainers\.shaded"/>
<property name="illegalPattern" value="true"/>
<property name="message"
value="Use utilities from appropriate library instead of org.testcontainers."/>
</module>
<!-- Enforce Java-style array declarations -->
<module name="ArrayTypeStyle"/>
<module name="TodoComment">
<!-- Checks that disallowed strings are not used in comments. -->
<property name="format" value="(FIXME)|(XXX)"/>
</module>
<!--
IMPORT CHECKS
-->
<module name="RedundantImport">
<!-- Checks for redundant import statements. -->
<property name="severity" value="error"/>
<message key="import.redundancy"
value="Redundant import {0}."/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs"
value="autovalue.shaded, avro.shaded, com.google.api.client.repackaged, com.google.appengine.repackaged"/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value="com.fasterxml.jackson"/>
<message key="import.illegal" value="{0}; Use flink-shaded-jackson instead."/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value="org.codehaus.jackson"/>
<message key="import.illegal" value="{0}; Use flink-shaded-jackson instead."/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value="org.objectweb.asm"/>
<message key="import.illegal" value="{0}; Use flink-shaded-asm instead."/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value="io.netty"/>
<message key="import.illegal" value="{0}; Use flink-shaded-netty instead."/>
</module>
<module name="RedundantModifier">
<!-- Checks for redundant modifiers on various symbol definitions.
See: http://checkstyle.sourceforge.net/config_modifier.html#RedundantModifier
We exclude METHOD_DEF to allow final methods in final classes to make them more future-proof.
-->
<property name="tokens"
value="VARIABLE_DEF, ANNOTATION_FIELD_DEF, INTERFACE_DEF, CLASS_DEF, ENUM_DEF"/>
</module>
<!--
IllegalImport cannot blacklist classes, and c.g.api.client.util is used for some shaded
code and some useful code. So we need to fall back to Regexp.
-->
<module name="RegexpSinglelineJava">
<property name="format" value="^import com.google.common.base.Preconditions;$"/>
<property name="message" value="Static import functions from Guava Preconditions"/>
</module>
<module name="UnusedImports">
<property name="severity" value="error"/>
<property name="processJavadoc" value="true"/>
<message key="import.unused"
value="Unused import: {0}."/>
</module>
<!--
NAMING CHECKS
-->
<!-- Item 38 - Adhere to generally accepted naming conventions -->
<module name="PackageName">
<!-- Validates identifiers for package names against the
supplied expression. -->
<!-- Here the default checkstyle rule restricts package name parts to
seven characters, this is not in line with common practice at Google.
-->
<property name="format" value="^[a-z]+(\.[a-z][a-z0-9]{1,})*$"/>
<property name="severity" value="error"/>
</module>
<module name="TypeNameCheck">
<!-- Validates static, final fields against the
expression "^[A-Z][a-zA-Z0-9]*$". -->
<metadata name="altname" value="TypeName"/>
<property name="severity" value="error"/>
</module>
<module name="ConstantNameCheck">
<!-- Validates non-private, static, final fields against the supplied
public/package final fields "^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$". -->
<metadata name="altname" value="ConstantName"/>
<property name="applyToPublic" value="true"/>
<property name="applyToProtected" value="true"/>
<property name="applyToPackage" value="true"/>
<property name="applyToPrivate" value="true"/>
<property name="format" value="^([A-Z][A-Z0-9]*(_[A-Z0-9]+)*|FLAG_.*)$"/>
<message key="name.invalidPattern"
value="Variable ''{0}'' should be in ALL_CAPS (if it is a constant) or be private (otherwise)."/>
<property name="severity" value="error"/>
</module>
<module name="StaticVariableNameCheck">
<!-- Validates static, non-final fields against the supplied
expression "^[a-z][a-zA-Z0-9]*_?$". -->
<metadata name="altname" value="StaticVariableName"/>
<property name="applyToPublic" value="true"/>
<property name="applyToProtected" value="true"/>
<property name="applyToPackage" value="true"/>
<property name="applyToPrivate" value="true"/>
<property name="format" value="^[a-z][a-zA-Z0-9]*_?$"/>
<property name="severity" value="error"/>
</module>
<module name="MemberNameCheck">
<!-- Validates non-static members against the supplied expression. -->
<metadata name="altname" value="MemberName"/>
<property name="applyToPublic" value="true"/>
<property name="applyToProtected" value="true"/>
<property name="applyToPackage" value="true"/>
<property name="applyToPrivate" value="true"/>
<property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
<property name="severity" value="error"/>
</module>
<module name="MethodNameCheck">
<!-- Validates identifiers for method names. -->
<metadata name="altname" value="MethodName"/>
<property name="format" value="^[a-z][a-zA-Z0-9]*(_[a-zA-Z0-9]+)*$"/>
<property name="severity" value="error"/>
</module>
<module name="ParameterName">
<!-- Validates identifiers for method parameters against the
expression "^[a-z][a-zA-Z0-9]*$". -->
<property name="severity" value="error"/>
</module>
<module name="LocalFinalVariableName">
<!-- Validates identifiers for local final variables against the
expression "^[a-z][a-zA-Z0-9]*$". -->
<property name="severity" value="error"/>
</module>
<module name="LocalVariableName">
<!-- Validates identifiers for local variables against the
expression "^[a-z][a-zA-Z0-9]*$". -->
<property name="severity" value="error"/>
</module>
<!--
LENGTH and CODING CHECKS
-->
<!-- Checks for braces around if and else blocks -->
<module name="NeedBraces">
<property name="severity" value="error"/>
<property name="tokens"
value="LITERAL_IF, LITERAL_ELSE, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO"/>
</module>
<module name="UpperEll">
<!-- Checks that long constants are defined with an upper ell.-->
<property name="severity" value="error"/>
</module>
<module name="FallThrough">
<!-- Warn about falling through to the next case statement. Similar to
javac -Xlint:fallthrough, but the check is suppressed if a single-line comment
on the last non-blank line preceding the fallen-into case contains 'fall through' (or
some other variants that we don't publicized to promote consistency).
-->
<property name="reliefPattern"
value="fall through|Fall through|fallthru|Fallthru|falls through|Falls through|fallthrough|Fallthrough|No break|NO break|no break|continue on"/>
<property name="severity" value="error"/>
</module>
<!-- Checks for over-complicated boolean expressions. -->
<module name="SimplifyBooleanExpression"/>
<!-- Detects empty statements (standalone ";" semicolon). -->
<module name="EmptyStatement"/>
<!-- Detect multiple consecutive semicolons (e.g. ";;"). -->
<module name="RegexpSinglelineJava">
<property name="format" value=";{2,}"/>
<property name="message" value="Use one semicolon"/>
<property name="ignoreComments" value="true"/>
</module>
<!--
MODIFIERS CHECKS
-->
<module name="ModifierOrder">
<!-- Warn if modifier order is inconsistent with JLS3 8.1.1, 8.3.1, and
8.4.3. The prescribed order is:
public, protected, private, abstract, static, final, transient, volatile,
synchronized, native, strictfp
-->
<property name="severity" value="error"/>
</module>
<!--
WHITESPACE CHECKS
-->
<module name="EmptyLineSeparator">
<!-- Checks for empty line separator between tokens. The only
excluded token is VARIABLE_DEF, allowing class fields to
be declared on consecutive lines.
-->
<property name="allowMultipleEmptyLines" value="false"/>
<property name="allowMultipleEmptyLinesInsideClassMembers" value="false"/>
<property name="tokens" value="PACKAGE_DEF, IMPORT, STATIC_IMPORT, CLASS_DEF,
INTERFACE_DEF, ENUM_DEF, STATIC_INIT, INSTANCE_INIT, METHOD_DEF,
CTOR_DEF"/>
</module>
<module name="SingleSpaceSeparator"/>
<module name="WhitespaceAround">
<!-- Checks that various tokens are surrounded by whitespace.
This includes most binary operators and keywords followed
by regular or curly braces.
-->
<property name="tokens" value="ASSIGN, BAND, BAND_ASSIGN, BOR,
BOR_ASSIGN, BSR, BSR_ASSIGN, BXOR, BXOR_ASSIGN, COLON, DIV, DIV_ASSIGN,
EQUAL, GE, GT, LAMBDA, LAND, LE, LITERAL_CATCH, LITERAL_DO, LITERAL_ELSE,
LITERAL_FINALLY, LITERAL_FOR, LITERAL_IF, LITERAL_RETURN,
LITERAL_SYNCHRONIZED, LITERAL_TRY, LITERAL_WHILE, LOR, LT, MINUS,
MINUS_ASSIGN, MOD, MOD_ASSIGN, NOT_EQUAL, PLUS, PLUS_ASSIGN, QUESTION,
SL, SL_ASSIGN, SR_ASSIGN, STAR, STAR_ASSIGN, TYPE_EXTENSION_AND"/>
<property name="severity" value="error"/>
</module>
<module name="WhitespaceAfter">
<!-- Checks that commas, semicolons and typecasts are followed by
whitespace.
-->
<property name="tokens" value="COMMA, SEMI, TYPECAST"/>
</module>
<module name="NoWhitespaceAfter">
<!-- Checks that there is no whitespace after various unary operators.
Linebreaks are allowed.
-->
<property name="tokens" value="BNOT, DEC, DOT, INC, LNOT, UNARY_MINUS,
UNARY_PLUS"/>
<property name="allowLineBreaks" value="true"/>
<property name="severity" value="error"/>
</module>
<module name="NoWhitespaceBefore">
<!-- Checks that there is no whitespace before various unary operators.
Linebreaks are allowed.
-->
<property name="tokens" value="SEMI, DOT, POST_DEC, POST_INC"/>
<property name="allowLineBreaks" value="true"/>
<property name="severity" value="error"/>
</module>
<module name="OperatorWrap">
<!-- Checks that assignment operators are at the end of the line. -->
<property name="option" value="eol"/>
<property name="tokens" value="ASSIGN"/>
</module>
<module name="ParenPad">
<!-- Checks that there is no whitespace before close parens or after
open parens.
-->
<property name="severity" value="error"/>
</module>
</module>
</module>

View File

@@ -1,21 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<FindBugsFilter>
<Match>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
<Match>
<Bug pattern="REC_CATCH_EXCEPTION"/>
</Match>
<Match>
<Bug pattern="SE_NO_SERIALVERSIONID"/>
</Match>
<Match>
<Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
</Match>
<Match>
<Bug pattern="SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING"/>
</Match>
</FindBugsFilter>

View File

@@ -1,12 +0,0 @@
<?xml version="1.0"?>
<!DOCTYPE suppressions PUBLIC
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
<!-- target directory is not relevant for checkstyle -->
<suppress
files="[\\/]target[\\/]"
checks=".*"/>
</suppressions>