[GAL-602] refactor: migrate to the Easy Stream framework

chaochaoc
2024-06-26 18:21:46 +08:00
parent 53c6c267e8
commit 1ff8c985c7
45 changed files with 1034 additions and 2492 deletions


@@ -1,8 +1,5 @@
# Changelog
### Hotfix
- [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) Fixed a NullPointerException caused by IPUtil not null-checking its input when validating IPv6 addresses
### Feature
- Output SIP records
- [GAL-419](https://jira.geedge.net/browse/GAL-419) Added the configuration option `include.intranet.ip`, controlling whether records whose SIP-negotiated caller or callee IP is an intranet address are correlated; records excluded from correlation are written to the error topic
- [GAL-602](https://jira.geedge.net/browse/GAL-602) Configuration-driven refactor onto the Easy Stream framework


@@ -17,27 +17,9 @@ mvn clean package
Run the Flink job with the following command:
```shell
flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<version>.jar application.properties
flink run -c com.geedgenetworks.flink.easy.core.Runner path/to/sip-rtp-correlation-<version>.jar job.yml
```
## Configuration Options
| Option | Type | Required | Default | Description |
|----------------------------------|---------------------|----------|-----------------------------|----------------------------------------------------------------------------------|
| source.kafka.topic | STRING | Y | | Kafka topic to read from; carries the raw SIP and RTP records |
| source.kafka.props.* | MAP<STRING, STRING> | Y | | Properties for the source Kafka consumer |
| sink.kafka.topic | STRING | Y | | Kafka topic receiving fused VoIP records and RTP records that failed to correlate |
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | Properties for the sink Kafka producer |
| error.records.output.enable | BOOLEAN | N | False | Whether to emit error records (records whose IP or port is missing) |
| include.intranet.ip | BOOLEAN | N | True | Whether to correlate records whose SIP-negotiated caller or callee IP is an intranet address |
| error.sink.kafka.topic | STRING | N | | Kafka topic receiving error records |
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | Properties for the error-sink Kafka producer |
| sip.state.clear.interval.minutes | INT | N | 1 | Window size for pairing one-way SIP streams (minutes) |
| rtp.state.clear.interval.minutes | INT | N | 6 | Window size for correlating SIP with RTP (minutes) |
| job.name | STRING | N | correlation_sip_rtp_session | Job name |
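For reference, a minimal `application.properties` for the properties-based launcher above, using only keys from this table (topic names and the broker address are illustrative placeholders, not defaults):

```properties
# Topic names and broker address below are placeholders.
source.kafka.topic=voip-raw
source.kafka.props.bootstrap.servers=kafka-1:9092
source.kafka.props.group.id=sip-rtp-correlation
sink.kafka.topic=voip-fused
sink.kafka.props.bootstrap.servers=kafka-1:9092
include.intranet.ip=true
sip.state.clear.interval.minutes=1
rtp.state.clear.interval.minutes=6
```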
## Contributing
If you find any issues or have ideas for improving the project, you are welcome to submit an Issue or Pull Request.

pom.xml

@@ -7,23 +7,38 @@
<groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>1.2.2</version>
<version>2.0</version>
<name>Flink : SIP-RTP : Correlation</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<java.version>11</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.13.6</flink.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<flink.version>1.13.6</flink.version>
<easy.stream.version>1.3-SNAPSHOT</easy.stream.version>
<slf4j.version>1.7.32</slf4j.version>
<log4j.version>2.17.1</log4j.version>
<jackson.version>2.13.2.20220328</jackson.version>
<junit.version>5.8.0</junit.version>
</properties>
<repositories>
<repository>
<id>central</id>
<url>http://192.168.40.153:8099/content/groups/public</url>
</repository>
<repository>
<id>snapshots</id>
<url>http://192.168.40.153:8099/content/groups/public</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<distributionManagement>
<repository>
<id>platform-releases</id>
@@ -38,93 +53,116 @@
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-influxdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
</dependency>
<!-- Easy Stream -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-common</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-core</artifactId>
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<scope>test</scope>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-grouped-exec-pipeline</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-filter-pipeline</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-console-pipeline</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-split-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-kafka-connector</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-json-format</artifactId>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-flink-shim</artifactId>
</dependency>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<artifactId>flink-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>runtime</scope>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<scope>runtime</scope>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>runtime</scope>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<!-- DEV -->
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
<version>4.4.2</version>
</dependency>
<!-- LOG -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -134,30 +172,43 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<!-- API bridge between log4j 1 and 2 -->
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<!-- Test -->
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- Common -->
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
@@ -174,167 +225,229 @@
</exclusions>
</dependency>
<!-- Easy Stream-->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.8.0</version>
<scope>test</scope>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-common</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-core</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-grouped-exec-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-filter-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-console-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-kafka-connector</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-json-format</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-flink-shim</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.2</version>
<configuration>
<suppressionsLocation>${basedir}/tools/maven/suppressions.xml</suppressionsLocation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<configLocation>${basedir}/tools/maven/checkstyle.xml</configLocation>
<logViolationsToConsole>true</logViolationsToConsole>
<failOnViolation>true</failOnViolation>
</configuration>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>8.40</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>test-sources</id>
<phase>generate-test-sources</phase>
<id>java-style-check</id>
<phase>compile</phase>
<goals>
<goal>add-test-source</goal>
<goal>check</goal>
</goals>
<configuration>
<sources>
<source>src/it/java</source>
</sources>
<sourceDirectories>src/main/java</sourceDirectories>
</configuration>
</execution>
<execution>
<id>test-resources</id>
<phase>generate-test-resources</phase>
<id>java-test-style-check</id>
<phase>test-compile</phase>
<goals>
<goal>add-test-resource</goal>
<goal>check</goal>
</goals>
<configuration>
<resources>
<resource>
<directory>src/it/resources</directory>
</resource>
</resources>
<testSourceDirectories>src/test/java</testSourceDirectories>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-maven-plugin</artifactId>
<version>4.4.2.2</version>
<configuration>
<xmlOutput>true</xmlOutput>
<!-- Low, Medium, High ('Low' is strictest) -->
<threshold>Low</threshold>
<effort>default</effort>
<spotbugsXmlOutputDirectory>${project.build.directory}/spotbugs</spotbugsXmlOutputDirectory>
<excludeFilterFile>${basedir}/tools/maven/spotbugs-exclude.xml</excludeFilterFile>
<failOnError>true</failOnError>
</configuration>
<executions>
<execution>
<id>findbugs-main</id>
<phase>compile</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
<execution>
<id>findbugs-test</id>
<phase>test-compile</phase>
<goals>
<goal>check</goal>
</goals>
<configuration>
<includeTests>true</includeTests>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<version>3.8.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-tests-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.outputDirectory}/tests-lib</outputDirectory>
<excludeScope>system</excludeScope>
<excludeTransitive>false</excludeTransitive>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<executions>
<execution>
<id>pre-unit-test</id>
<goals>
<goal>prepare-agent</goal>
</goals>
<configuration>
<destFile>${project.build.directory}/jacoco.exec</destFile>
</configuration>
</execution>
<execution>
<id>test-report</id>
<phase>verify</phase>
<goals>
<goal>report</goal>
</goals>
<configuration>
<dataFile>${project.build.directory}/jacoco.exec</dataFile>
<outputDirectory>${project.reporting.outputDirectory}/jacoco</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.github.zlika</groupId>
<artifactId>reproducible-build-maven-plugin</artifactId>
<version>0.2</version>
<executions>
<execution>
<goals>
<goal>strip-jar</goal>
</goals>
<phase>package</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<version>3.5.1</version>
<executions>
<execution>
<id>default-shade</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<finalName>${project.artifactId}-${project.version}</finalName>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
<exclude>org.mockito:mockito-core</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
@@ -348,45 +461,6 @@
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.7</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.3.0</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.6.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>


@@ -1,95 +0,0 @@
package com.zdjizhi.flink.voip;
import com.zdjizhi.flink.voip.functions.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Objects;
// Integration test main class
public class CorrelateTest {
public static void main(String[] args) throws Exception {
final Configuration envConfig = new Configuration();
envConfig.setInteger("rest.port", 18081);
envConfig.setBoolean("rest.flamegraph.enabled", true);
envConfig.setString("metrics.reporter.influxdb.factory.class", "org.apache.flink.metrics.influxdb.InfluxdbReporterFactory");
envConfig.setString("metrics.reporter.influxdb.scheme", "http");
envConfig.setString("metrics.reporter.influxdb.host", "127.0.0.1");
envConfig.setInteger("metrics.reporter.influxdb.port", 8086);
envConfig.setString("metrics.reporter.influxdb.db", "flinks");
envConfig.setString("metrics.reporter.influxdb.interval", "5 SECONDS");
final StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironmentWithWebUI(envConfig);
final ParameterTool tool = ParameterTool.fromPropertiesFile(
args.length > 0 ? args[0] :
Objects.requireNonNull(
CorrelateTest.class.getResource("/application-test.properties")).getPath()
);
final CheckpointConfig checkpointConfig = env.getCheckpointConfig();
env.enableCheckpointing(Time.minutes(1)
.toMilliseconds(), CheckpointingMode.AT_LEAST_ONCE);
checkpointConfig.setCheckpointTimeout(Time.minutes(2).toMilliseconds());
checkpointConfig.setCheckpointStorage("file://" + System.getProperty("java.io.tmpdir"));
/* checkpointConfig.setCheckpointStorage(
Objects.requireNonNull(
CorrelateTest.class.getResource("/")).toString());*/
final Configuration config = tool.getConfiguration();
env.getConfig().setGlobalJobParameters(config);
env.setParallelism(8);
final SingleOutputStreamOperator<ObjectNode> sourceStream = env.addSource(new DataGenSource())
.name("DataGenSource")
.setParallelism(4);
// Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data
final SingleOutputStreamOperator<ObjectNode> splitsRecordsOperator =
sourceStream
.process(new TypeSplitFunction())
.name("SplitsRecordsBasedSchemaType")
.uid("splits-records-based-schema-type");
// Get the DataStreams for SIP and RTP data from the side outputs.
final DataStream<ObjectNode> sipDataStream = splitsRecordsOperator
.getSideOutput(TypeSplitFunction.SIP_OUTPUT_TAG);
final DataStream<ObjectNode> rtpDataStream = splitsRecordsOperator
.getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG);
// Process SIP data to create a double directional stream using SIPPairingFunction.
final SingleOutputStreamOperator<ObjectNode> sipDoubleDirOperator = sipDataStream
.keyBy(new SIPKeySelector())
.process(new SIPPairingFunction())
.name("PairingOneWayToDoubleStream")
.uid("pairing-one-way-to-double");
final KeySelector<ObjectNode, Tuple2<Integer, Address>> vSysSelector = new VSysIDKeySelector();
// Fusion SIP data and RTP data to VoIP data.
final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream
.keyBy(vSysSelector)
.connect(sipDoubleDirOperator.keyBy(vSysSelector))
.process(new VoIPFusionFunction())
.name("VoIPFusion")
.uid("voip-fusion");
voIpOperator.addSink(new DoNothingSink())
.name("DoNothingSink")
.setParallelism(1);
env.execute("VoIP Fusion Job");
}
}


@@ -1,22 +0,0 @@
package com.zdjizhi.flink.voip.conf;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
// Test Configs.
public class TestConfigs {
// Ratio of valuable data in the generated dataset
public static final ConfigOption<Integer> VALUABLE_DATA_PROPORTION =
ConfigOptions.key("valuable.data.ratio")
.intType()
.defaultValue(40)
.withDescription("Ratio of valuable data in the generated dataset.");
// QPS of generated data records.
public static final ConfigOption<Long> DATA_GENERATE_RATE =
ConfigOptions.key("data.generate.rate")
.longType()
.defaultValue(1000L)
.withDescription("QPS of generated data records.");
}


@@ -1,46 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.TestConfigs;
import com.zdjizhi.flink.voip.data.RTPGenerator;
import com.zdjizhi.flink.voip.data.SIPGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
// Data generator source.
public class DataGenSource extends RichParallelSourceFunction<ObjectNode> implements FunctionHelper {
private transient SIPGenerator sipGenerator;
private transient RTPGenerator rtpGenerator;
private transient RateLimiter rateLimiter;
private volatile boolean running;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Integer ratio = getGlobalConfiguration()
.get(TestConfigs.VALUABLE_DATA_PROPORTION);
this.sipGenerator = new SIPGenerator(ratio);
this.rtpGenerator = new RTPGenerator(ratio, sipGenerator);
final Long rate = getGlobalConfiguration()
.get(TestConfigs.DATA_GENERATE_RATE);
this.rateLimiter = RateLimiter.create(
(int) (rate / getRuntimeContext().getNumberOfParallelSubtasks()));
this.running = true;
}
@Override
public void run(SourceContext<ObjectNode> ctx) throws Exception {
while (running) {
rateLimiter.acquire();
ctx.collect(sipGenerator.next());
ctx.collect(rtpGenerator.next());
}
}
@Override
public void cancel() {
this.running = false;
}
}


@@ -1,43 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.records.Record;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
// It does nothing with the incoming data and simply collects metrics for the number of
// RTP and VoIP records processed per second.
public class DoNothingSink extends RichSinkFunction<ObjectNode> {
private transient MeterView numRTPRecordsPreSecond;
private transient MeterView numVoIPRecordsPreSecond;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
RuntimeContext runtimeContext = getRuntimeContext();
MetricGroup metricGroup = runtimeContext.getMetricGroup();
numRTPRecordsPreSecond = metricGroup
.meter("numRTPRecordsPreSecond", new MeterView(1));
numVoIPRecordsPreSecond = metricGroup
.meter("numVoIPRecordsPreSecond", new MeterView(1));
}
@Override
public void invoke(ObjectNode obj, Context context) throws Exception {
Record record = new Record(obj);
switch (record.getSchemaType()) {
case RTP:
numRTPRecordsPreSecond.markEvent();
break;
case VOIP:
numVoIPRecordsPreSecond.markEvent();
break;
default:
}
}
}


@@ -1,4 +0,0 @@
sip.state.clear.interval.minutes=1
rtp.state.clear.interval.minutes=10
valuable.data.ratio=20
data.generate.rate=1000


@@ -1,25 +0,0 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
rootLogger.level = INFO
rootLogger.appenderRef.test.ref = TestLogger
appender.test.name = TestLogger
appender.test.type = CONSOLE
appender.test.layout.type = PatternLayout
appender.test.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n


@@ -0,0 +1,9 @@
import com.geedgenetworks.flink.easy.core.Runners;
public class Application {
public static void main(String[] args) throws Exception {
// Local debug entry point; the absolute path below is developer-specific.
Runners.run("E:\\java-workspace\\sip-rtp-correlation\\feature\\easy-refactor\\src\\main\\resources\\job.yml");
}
}
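A more portable variant would take the job file from the command line; a sketch assuming only the `Runners.run(String)` call used above (the fallback path is illustrative):

```java
import com.geedgenetworks.flink.easy.core.Runners;

public class Application {
    public static void main(String[] args) throws Exception {
        // Prefer a CLI-supplied path; fall back to a repo-relative default (illustrative).
        Runners.run(args.length > 0 ? args[0] : "src/main/resources/job.yml");
    }
}
```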


@@ -0,0 +1,25 @@
package com.geedgenetworks.flink.easy.application.voip;
import com.geedgenetworks.flink.easy.application.voip.udf.IsInternalIPAddress;
import com.geedgenetworks.flink.easy.application.voip.udf.IsIpAddress;
import com.geedgenetworks.flink.easy.application.voip.udf.StreamDir;
import com.geedgenetworks.flink.easy.common.api.UDFFactory;
import org.apache.flink.table.functions.UserDefinedFunction;
import java.util.HashMap;
import java.util.Map;
public class VoipUDFFactory implements UDFFactory {
private static final Map<String, UserDefinedFunction> R =
new HashMap<>() {{
put("IS_IP_ADDRESS", new IsIpAddress());
put("IS_INTERNAL_IP_ADDRESS", new IsInternalIPAddress());
put("STREAM_DIR", new StreamDir());
}};
@Override
public Map<String, UserDefinedFunction> register() {
return R;
}
}
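Each registered entry is a plain Flink `ScalarFunction`, so the mapping can be sanity-checked outside the Easy Stream runtime; a sketch:

```java
// Resolve a UDF by its registered name and invoke it directly.
VoipUDFFactory factory = new VoipUDFFactory();
IsIpAddress isIp = (IsIpAddress) factory.register().get("IS_IP_ADDRESS");
System.out.println(isIp.eval("10.1.2.3")); // true for a well-formed IPv4 address
System.out.println(isIp.eval(null));       // false: the UDF null-checks its input
```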


@@ -0,0 +1,17 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import com.zdjizhi.utils.IPUtil;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import static com.zdjizhi.utils.IPUtil.isIPAddress;
public class IsInternalIPAddress extends ScalarFunction {
public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
if (!isIPAddress(ipaddr)) {
return false;
}
return IPUtil.internalIp(ipaddr);
}
}


@@ -0,0 +1,15 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import com.zdjizhi.utils.IPUtil;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
public class IsIpAddress extends ScalarFunction {
public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
if (null == ipaddr) {
return false;
}
return IPUtil.isIPAddress(ipaddr);
}
}


@@ -0,0 +1,18 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
public class StreamDir extends ScalarFunction {
public @DataTypeHint("INT") Integer eval(Long flags) {
// Guard against a null flags field, matching the null handling of the other UDFs.
if (null == flags) {
return 0;
}
int v = 0;
// Bit 13 (8192 = 1 << 13): add 1 for one stream direction.
if ((flags & 8192) == 8192) {
v += 1;
}
// Bit 14 (16384 = 1 << 14): add 2 for the opposite direction; both bits yield 3.
if ((flags & 16384) == 16384) {
v += 2;
}
return v;
}
}
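A worked example of the flag arithmetic: 8192 is `1 << 13` and 16384 is `1 << 14`, so the UDF packs the two direction bits into an integer where 3 presumably corresponds to a double-direction stream (matching the `StreamDir.DOUBLE` case in the legacy code further down). A quick check:

```java
StreamDir streamDir = new StreamDir();
System.out.println(streamDir.eval(8192L));           // 1: only bit 13 set
System.out.println(streamDir.eval(16384L));          // 2: only bit 14 set
System.out.println(streamDir.eval(8192L | 16384L));  // 3: both bits set (double direction)
System.out.println(streamDir.eval(0L));              // 0: neither bit set
```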


@@ -1,108 +0,0 @@
package com.zdjizhi.flink.voip;
import com.zdjizhi.flink.voip.conf.FusionConfiguration;
import com.zdjizhi.flink.voip.error.ErrorHandler;
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
import com.zdjizhi.flink.voip.functions.*;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.time.Duration;
import static com.zdjizhi.flink.voip.conf.FusionConfigs.*;
/**
* The main class for running the SIP RTP Correlation application.
*
* @author chaoc
* @since 1.0
*/
public class CorrelateApp {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// param check
if (args.length < 1) {
throw new IllegalArgumentException("Error: properties file path not found. " +
"\nUsage: flink run -c xxx xxx.jar app.properties.");
}
final ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
final Configuration config = tool.getConfiguration();
env.getConfig().setGlobalJobParameters(config);
final FusionConfiguration fusionConfiguration = new FusionConfiguration(config);
final FlinkKafkaConsumer<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer<>(
config.get(SOURCE_KAFKA_TOPIC),
new JsonNodeDeserializationSchema(),
fusionConfiguration
.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
final DataStream<ObjectNode> sourceStream = env.addSource(kafkaConsumer)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
(element, recordTimestamp) ->
element.get("start_timestamp_ms").asLong()));
final ErrorHandler errorHandler = new ErrorHandler(config);
// Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data
final SingleOutputStreamOperator<ObjectNode> splitsRecordsOperator =
errorHandler.filterError(sourceStream)
.process(new TypeSplitFunction())
.name("SplitsRecordsBasedSchemaType")
.uid("splits-records-based-schema-type");
// Get the DataStreams for SIP and RTP data from the side outputs.
final DataStream<ObjectNode> sipDataStream = splitsRecordsOperator
.getSideOutput(TypeSplitFunction.SIP_OUTPUT_TAG);
final DataStream<ObjectNode> rtpDataStream = splitsRecordsOperator
.getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG);
// Process SIP data to create a double directional stream using SIPPairingFunction.
final SingleOutputStreamOperator<ObjectNode> sipDoubleDirOperator = sipDataStream
.keyBy(new SIPKeySelector())
.process(new SIPPairingFunction())
.name("PairingOneWayToDoubleStream")
.uid("pairing-one-way-to-double");
final KeySelector<ObjectNode, Tuple2<Integer, Address>> vSysSelector = new VSysIDKeySelector();
// Fusion SIP data and RTP data to VoIP data.
final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream
.keyBy(vSysSelector)
.connect(sipDoubleDirOperator.keyBy(vSysSelector))
.process(new VoIPFusionFunction())
.name("VoIPFusion")
.uid("voip-fusion");
final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>(
config.get(SINK_KAFKA_TOPIC),
new JsonNodeSerializationSchema(),
fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
voIpOperator
.union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
.addSink(producer);
env.execute(config.get(JOB_NAME));
}
}


@@ -1,107 +0,0 @@
package com.zdjizhi.flink.voip.conf;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
/**
* Containing configuration options for the Fusion application.
*
* @author chaoc
* @since 1.0
*/
public class FusionConfigs {
/**
* The prefix for Kafka properties used in the source.
*/
public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";
/**
* The prefix for Kafka properties used in the sink.
*/
public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";
/**
* Configuration prefix for the properties of the Kafka sink where the error data will be output.
*/
public static final String ERROR_SINK_KAFKA_PROPERTIES_PREFIX = "error.sink.kafka.props.";
/**
* Configuration option for the Kafka topic used in the source.
*/
public static final ConfigOption<String> SOURCE_KAFKA_TOPIC =
ConfigOptions.key("source.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("The Kafka topic used in the source.");
/**
* Configuration option for the Kafka topic used in the sink.
*/
public static final ConfigOption<String> SINK_KAFKA_TOPIC =
ConfigOptions.key("sink.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("The Kafka topic used in the sink.");
/**
* Configuration option to enable or disable the output of error records.
* If set to true, the error records will be sent to the specified Kafka topic.
* Default value is false.
*/
public static final ConfigOption<Boolean> ERROR_RECORDS_OUTPUT_ENABLE =
ConfigOptions.key("error.records.output.enable")
.booleanType()
.defaultValue(false)
.withDescription("Enable or disable the output of error records. " +
"If set to true, the error records will be sent to the specified Kafka topic.");
/**
* Configuration option to determine whether to perform data correlation for intranet addresses.
*/
public static final ConfigOption<Boolean> INCLUDE_INTRANET_IP =
ConfigOptions.key("include.intranet.ip")
.booleanType()
.defaultValue(true)
.withDescription("Whether to perform data correlate for intranet addresses");
/**
* Configuration option for specifying the Kafka topic name where the error data will be sent.
* This configuration option is used when the output of error records is enabled.
*/
public static final ConfigOption<String> ERROR_SINK_KAFKA_TOPIC =
ConfigOptions.key("error.sink.kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("The Kafka topic name where the error records will be sent.");
/**
* The configuration option for the interval at which SIP state data
* should be cleared.
*/
public static final ConfigOption<Integer> SIP_STATE_CLEAR_INTERVAL =
ConfigOptions.key("sip.state.clear.interval.minutes")
.intType()
.defaultValue(1)
.withDescription("The interval at which SIP state data should be cleared.");
/**
* The configuration option for the interval at which RTP state data
* should be cleared.
*/
public static final ConfigOption<Integer> RTP_STATE_CLEAR_INTERVAL =
ConfigOptions.key("rtp.state.clear.interval.minutes")
.intType()
.defaultValue(6)
.withDescription("The interval at which RTP state data should be cleared.");
/**
* Configuration option for specifying the name of a job.
*/
public static final ConfigOption<String> JOB_NAME =
ConfigOptions.key("job.name")
.stringType()
.defaultValue("correlation_sip_rtp_session")
.withDescription("The name of current job.");
}


@@ -1,44 +0,0 @@
package com.zdjizhi.flink.voip.conf;
import org.apache.flink.configuration.Configuration;
import java.util.Properties;
/**
* A wrapper class that extends the Flink `Configuration` to provide utility methods for handling
* properties with a specific prefix. This class allows retrieving properties that start with the
* given `prefix` and converts them into a `java.util.Properties` object.
*
* @author chaoc
* @since 1.0
*/
public class FusionConfiguration {
private final Configuration config;
public FusionConfiguration(final Configuration config) {
this.config = config;
}
/**
* Retrieves properties from the underlying `Configuration` instance that start with the specified
* `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
*
* @param prefix The prefix to filter properties.
* @return A `java.util.Properties` object containing the properties with the specified prefix.
*/
public Properties getProperties(final String prefix) {
if (prefix == null) {
final Properties props = new Properties();
props.putAll(config.toMap());
return props;
}
return config.toMap()
.entrySet()
.stream()
.filter(entry -> entry.getKey().startsWith(prefix))
.collect(Properties::new, (props, e) ->
props.setProperty(e.getKey().substring(prefix.length()), e.getValue()),
Properties::putAll);
}
}
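An illustrative use of `getProperties`, showing how the prefix is matched and stripped (the broker address is a placeholder):

```java
Configuration conf = new Configuration();
conf.setString("source.kafka.props.bootstrap.servers", "kafka-1:9092"); // placeholder broker
conf.setString("sink.kafka.topic", "voip-fused");                       // not matched by the prefix

Properties props = new FusionConfiguration(conf).getProperties("source.kafka.props.");
System.out.println(props.getProperty("bootstrap.servers")); // kafka-1:9092
System.out.println(props.containsKey("sink.kafka.topic"));  // false: filtered out
```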


@@ -1,177 +0,0 @@
package com.zdjizhi.flink.voip.error;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.conf.FusionConfiguration;
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
import com.zdjizhi.flink.voip.functions.FunctionHelper;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import com.zdjizhi.utils.IPUtil;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Function;
/**
* The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
* It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
*
* @author chaoc
* @since 1.0
*/
public class ErrorHandler {
private static final Logger LOG = LoggerFactory.getLogger(ErrorHandler.class);
/**
* The OutputTag for invalid records.
*/
public static final OutputTag<ObjectNode> INVALID_OUTPUT_TAG =
new OutputTag<>("invalid-records", TypeInformation.of(ObjectNode.class));
private final Configuration config;
/**
* Creates a new ErrorHandler instance with the given configuration.
*
* @param config The configuration containing settings for error handling.
*/
public ErrorHandler(final Configuration config) {
this.config = config;
}
/**
* Filters out error records from the input data stream and outputs them to a separate stream if enabled.
*
* @param dataStream The input data stream of ObjectNode records.
* @return A new DataStream containing valid records without errors.
*/
public DataStream<ObjectNode> filterError(final DataStream<ObjectNode> dataStream) {
// Process the data stream to identify meaningless addresses and ports
final SingleOutputStreamOperator<ObjectNode> operator = dataStream
.process(new MeaninglessAddressProcessFunction())
.name("MeaninglessRecords")
.uid("meaningless-records");
// If enabled, output the error records to the specified Kafka topic
if (config.get(FusionConfigs.ERROR_RECORDS_OUTPUT_ENABLE)) {
final String topic = config.get(FusionConfigs.ERROR_SINK_KAFKA_TOPIC);
LOG.info("Meaningless data output is enabled, data will be sent to: Topic [{}]", topic);
final DataStream<ObjectNode> errorStream = operator
.getSideOutput(INVALID_OUTPUT_TAG);
final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>(
topic,
new JsonNodeSerializationSchema(),
new FusionConfiguration(config)
.getProperties(FusionConfigs.ERROR_SINK_KAFKA_PROPERTIES_PREFIX)
);
errorStream.addSink(producer);
}
return operator;
}
}
/**
* The MeaninglessAddressProcessFunction is a custom ProcessFunction used to check for records
* with invalid or meaningless addresses and ports. It separates them into the invalid output tag if necessary.
*/
class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> implements FunctionHelper {
private static final Logger LOG = LoggerFactory.getLogger(MeaninglessAddressProcessFunction.class);
private transient boolean includeIntranetIp;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
final Configuration config = getGlobalConfiguration();
includeIntranetIp = config.get(FusionConfigs.INCLUDE_INTRANET_IP);
}
@Override
public void processElement(ObjectNode obj,
ProcessFunction<ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(obj);
// Check for invalid or meaningless addresses and ports
boolean cond1 = isIPAddress(record.getClientIp()) &&
isIPAddress(record.getServerIp()) &&
record.getClientPort() > 0 &&
record.getServerPort() > 0;
boolean cond8 = null != executeSafely(Record::getStreamDir, record);
final SIPRecord sipRecord = new SIPRecord(obj);
boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
|| (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp()));
boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) &&
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond7 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) &&
isIPAddress(sipRecord.getResponderSdpConnectIp()) &&
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
// Both client and server addresses in the data are valid.
if (cond1 && cond8 && (!cond5 || cond7) && (
// The address in the SIP one-way stream is valid and not an internal network address.
cond2 && cond3 && cond4 && cond5
// The coordinating addresses in the SIP double directional stream are valid
// and not internal network addresses.
|| cond5 && cond6
|| !cond5)) {
out.collect(obj);
} else {
// Output invalid records to the invalid output tag
ctx.output(ErrorHandler.INVALID_OUTPUT_TAG, obj);
if (LOG.isDebugEnabled()) {
LOG.debug("Meaningless record: {}", obj);
}
}
}
// ======================================================================================
// ----------------------------------- private helper -----------------------------------
public static <T, R> R executeSafely(Function<T, R> function, T v) {
try {
return function.apply(v);
} catch (Exception e) {
return null;
}
}
private static boolean isIPAddress(final String ipaddr) {
if (null == ipaddr) {
return false;
}
return IPUtil.isIPAddress(ipaddr);
}
private static boolean isInternalIp(final String ipaddr) {
if (!isIPAddress(ipaddr)) {
return false;
}
return IPUtil.internalIp(ipaddr);
}
}
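The acceptance test in `processElement` is dense; restated as a paraphrase of the same boolean expression (no behavioral claim beyond the code above):

```java
// keep = cond1 && cond8                        // valid client/server endpoints and readable stream_dir
//        && (!cond5 || cond7)                  // SIP additionally needs valid SDP IPs and media ports
//        && ( (cond5 && cond2 && cond3 && cond4)  // one-way SIP: a valid SDP IP, intranet policy honored
//          || (cond5 && cond6)                    // double-direction SIP: both SDP IPs pass the policy
//          || !cond5 );                           // non-SIP records need no SDP checks
```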


@@ -1,20 +0,0 @@
package com.zdjizhi.flink.voip.formats;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
public class JsonNodeSerializationSchema implements SerializationSchema<ObjectNode> {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public byte[] serialize(ObjectNode jsonNodes) {
try {
return mapper.writeValueAsBytes(jsonNodes);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}


@@ -1,54 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.google.common.collect.Lists;
import com.zdjizhi.utils.IPUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.List;
/**
* A POJO representing an address composed of two (IP, port) endpoint pairs.
*
* @author chaoc
* @since 1.0
*/
@Data
@AllArgsConstructor
public class Address {
// The first IP address.
private final String ip1;
// The first port number.
private final int port1;
// The second IP address.
private final String ip2;
// The second port number.
private final int port2;
/**
* Creates an Address instance based on two tuples containing (String, Int) representing address information.
* The method sorts the addresses based on the port number, and if the ports are equal, it sorts them based on
* the numeric value of the IP address.
*
* @param a1 The first address information as a tuple (IP address, port).
* @param a2 The second address information as a tuple (IP address, port).
* @return An Address instance with addresses sorted and reordered.
*/
public static Address of(Tuple2<String, Integer> a1, Tuple2<String, Integer> a2) {
List<Tuple2<String, Integer>> list = Lists.newArrayList(a1, a2);
list.sort((a, b) -> {
if (a.f1.equals(b.f1)) {
return Long.compare(IPUtil.getIpDesimal(a.f0), IPUtil.getIpDesimal(b.f0));
} else {
return a.f1.compareTo(b.f1);
}
});
return new Address(list.get(0).f0, list.get(0).f1, list.get(1).f0, list.get(1).f1);
}
}
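Since `Address.of` sorts its endpoints (by port, then by numeric IP), both directions of a flow normalize to the same key; a quick illustration (Lombok's `@Data` generates `equals`):

```java
Address forward = Address.of(Tuple2.of("10.0.0.1", 5060), Tuple2.of("10.0.0.2", 5060));
Address reverse = Address.of(Tuple2.of("10.0.0.2", 5060), Tuple2.of("10.0.0.1", 5060));
System.out.println(forward.equals(reverse)); // true: same normalized endpoint order
```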


@@ -1,32 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
/**
* An interface that provides utility functions for Flink functions.
*
* @author chaoc
* @since 1.0
*/
public interface FunctionHelper extends RichFunction {
/**
* Get the global configuration for the current Flink job.
*
* @return The global configuration as a Configuration object.
*/
default Configuration getGlobalConfiguration() {
final ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()
.getExecutionConfig().getGlobalJobParameters();
return (Configuration) globalJobParameters;
}
default void registerNextFireTimestamp(TimerService timerService, long interval) {
long current = timerService.currentWatermark();
timerService.registerEventTimeTimer(current + interval);
}
}


@@ -1,28 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Setter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* A holder class pairing an ObjectNode with the number of times it has been paired.
*
* @author chaoc
* @since 1.0
*/
@Data
@AllArgsConstructor
class ObjectNodeInfo {
// The ObjectNode containing data.
private ObjectNode obj;
// The number of times the object has been paired.
@Setter
private int times;
public void incTimes() {
this.times = this.times + 1;
}
}


@@ -1,31 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.records.SIPRecord;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* A KeySelector implementation for extracting a composite key from an ObjectNode representing a SIP record.
*
* @author chaoc
* @since 1.0
*/
public class SIPKeySelector implements KeySelector<ObjectNode, Tuple3<Integer, String, Address>> {
/**
* Extracts the composite key (VSysID, CallID, Address) from the given ObjectNode.
*
* @param obj The ObjectNode representing a SIP record.
* @return A Tuple3 containing the extracted key (VSysID, CallID, Address).
* @throws Exception Thrown if an error occurs during key extraction.
*/
@Override
public Tuple3<Integer, String, Address> getKey(ObjectNode obj) throws Exception {
final SIPRecord record = new SIPRecord(obj);
final Address address = Address.of(Tuple2.of(record.getClientIp(), record.getClientPort()),
Tuple2.of(record.getServerIp(), record.getServerPort()));
return Tuple3.of(record.getVSysID(), record.getCallID(), address);
}
}


@@ -1,91 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
* SIP records are paired when they have the same addresses but opposite stream directions.
*
* @author chaoc
* @since 1.0
*/
public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>
implements FunctionHelper {
public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
new OutputTag<>("unmatched-sip", TypeInformation.of(ObjectNode.class));
private transient Time fireInterval;
private transient ValueState<ObjectNode> valueState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
int minutes = getGlobalConfiguration().get(FusionConfigs.SIP_STATE_CLEAR_INTERVAL);
fireInterval = Time.minutes(minutes);
final ValueStateDescriptor<ObjectNode> descriptor =
new ValueStateDescriptor<>("sip-state", ObjectNode.class);
final StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(fireInterval)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.useProcessingTime()
.cleanupFullSnapshot()
.build();
descriptor.enableTimeToLive(ttlConfig);
valueState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(ObjectNode value,
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(value);
// When SIP is a one-way stream.
if (StreamDir.DOUBLE != record.getStreamDir()) {
// If the address is already stored in the mapState and has the opposite stream direction,
// merge the SIP records, change the stream direction to DOUBLE, and output the merged record.
final ObjectNode obj = valueState.value();
if (null != obj && new Record(obj).getStreamDir() != record.getStreamDir()) {
record.merge(obj)
.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.DOUBLE.getValue());
out.collect(value);
valueState.clear();
} else {
// If the address is not yet in the valueState.
valueState.update(value);
}
} else {
// If SIP is a double stream, pairing isn't required, directly output the record.
out.collect(value);
}
registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
}
@Override
public void onTimer(long timestamp,
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception {
final ObjectNode value = valueState.value();
if (value != null) {
ctx.output(SIP_OUTPUT_TAG, value);
}
valueState.clear();
}
}
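The per-key pairing lifecycle, summarized from `processElement` and `onTimer` above:

```java
// Per key (vsys, call-id, normalized address):
// 1. Double-direction record arrives          -> emit unchanged, nothing buffered.
// 2. One-way record, buffered record has the
//    opposite direction                       -> merge, mark DOUBLE, emit, clear state.
// 3. One-way record otherwise (empty buffer
//    or same direction)                       -> (over)write the buffer.
// 4. Timer fires with a record still buffered -> emit to SIP_OUTPUT_TAG as unmatched.
```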


@@ -1,45 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.records.Record;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* A ProcessFunction that splits ObjectNode records based on their 'schemaType' field.
* It outputs SIP records to SIP_OUTPUT_TAG and RTP records to RTP_OUTPUT_TAG.
*
* @author chaoc
* @since 1.0
*/
public class TypeSplitFunction extends ProcessFunction<ObjectNode, ObjectNode> {
/**
* OutputTag for SIP records.
*/
public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
new OutputTag<>("schema-type-sip", TypeInformation.of(ObjectNode.class));
/**
* OutputTag for RTP records.
*/
public static final OutputTag<ObjectNode> RTP_OUTPUT_TAG =
new OutputTag<>("schema-type-rtp", TypeInformation.of(ObjectNode.class));
@Override
public void processElement(ObjectNode obj,
ProcessFunction<ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(obj);
switch (record.getSchemaType()) {
case RTP:
ctx.output(RTP_OUTPUT_TAG, obj);
break;
case SIP:
ctx.output(SIP_OUTPUT_TAG, obj);
break;
default:
}
}
}


@@ -1,40 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* A KeySelector implementation that extracts the composite key (VSysID, Address) from an ObjectNode.
*
* @author chaoc
* @since 1.0
*/
public class VSysIDKeySelector implements KeySelector<ObjectNode, Tuple2<Integer, Address>> {
/**
* Extracts the composite key (VSysID, Address) from the given ObjectNode.
*
* @param obj The ObjectNode representing a SIP or RTP record.
* @return A Tuple2 containing the extracted key (VSysID, Address).
* @throws Exception Thrown if an error occurs during key extraction.
*/
@Override
public Tuple2<Integer, Address> getKey(ObjectNode obj) throws Exception {
final Record record = new Record(obj);
final Address address;
if (record.getSchemaType() == SchemaType.SIP) {
final SIPRecord sipRecord = new SIPRecord(obj);
address = Address.of(
Tuple2.of(sipRecord.getOriginatorSdpConnectIp(), sipRecord.getOriginatorSdpMediaPort()),
Tuple2.of(sipRecord.getResponderSdpConnectIp(), sipRecord.getResponderSdpMediaPort()));
} else {
address = Address.of(Tuple2.of(record.getServerIp(), record.getServerPort()),
Tuple2.of(record.getClientIp(), record.getClientPort()));
}
return Tuple2.of(record.getVSysID(), address);
}
}
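
The two branches exist so that a SIP record and the RTP streams it negotiated reduce to the same key; a hedged illustration (`sipObj` and `rtpObj` are assumed sample records):

```java
VSysIDKeySelector selector = new VSysIDKeySelector();
// SIP records are keyed by their SDP-negotiated media endpoints...
Tuple2<Integer, Address> sipKey = selector.getKey(sipObj);
// ...while RTP records are keyed by their own server/client endpoints.
Tuple2<Integer, Address> rtpKey = selector.getKey(rtpObj);
// When the RTP stream actually uses the negotiated endpoints,
// sipKey.equals(rtpKey) holds and both records land in the same keyed partition.
```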


@@ -1,180 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.records.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.Map;
/**
* The `VoIPFusionFunction` is a key-based co-process function that performs VoIP fusion logic
* for SIP and RTP records. It combines SIP and RTP records belonging to the same session
* and emits fused VoIP records. The function utilizes keyed state to store and manage SIP and
* RTP records, and it uses timers to trigger regular clearing of the state.
*
* @author chaoc
* @since 1.0
*/
public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>
implements FunctionHelper {
private static final int MAX_RTP_LINES = 2;
private transient Time fireInterval;
private transient ValueState<ObjectNodeInfo> sipState;
private transient MapState<StreamDir, ObjectNode> rtpState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
final int minutes = getGlobalConfiguration().get(FusionConfigs.RTP_STATE_CLEAR_INTERVAL);
fireInterval = Time.minutes(minutes);
final RuntimeContext context = getRuntimeContext();
final ValueStateDescriptor<ObjectNodeInfo> sipDescriptor =
new ValueStateDescriptor<>("sip-state", ObjectNodeInfo.class);
final StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(fireInterval)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.useProcessingTime()
.cleanupFullSnapshot()
.build();
sipDescriptor.enableTimeToLive(ttlConfig);
sipState = context.getState(sipDescriptor);
final MapStateDescriptor<StreamDir, ObjectNode> rtpDescriptor =
new MapStateDescriptor<>("rtp-state", StreamDir.class, ObjectNode.class);
final StateTtlConfig rtpTtlConfig = StateTtlConfig
.newBuilder(Time.minutes(minutes + 1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.useProcessingTime()
.cleanupFullSnapshot()
.build();
rtpDescriptor.enableTimeToLive(rtpTtlConfig);
rtpState = context.getMapState(rtpDescriptor);
}
// SIP
@Override
public void processElement2(ObjectNode sipObj,
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Iterator<Map.Entry<StreamDir, ObjectNode>> iterator = rtpState.iterator();
if (rtpState.isEmpty()) {
sipState.update(new ObjectNodeInfo(sipObj, 0));
}
while (iterator.hasNext()) {
final Map.Entry<StreamDir, ObjectNode> entry = iterator.next();
final ObjectNode rtpObj = entry.getValue();
final Record rtpRecord = new Record(rtpObj);
completeOriginatorField(rtpRecord, new SIPRecord(sipObj));
rtpRecord.merge(sipObj)
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj);
iterator.remove();
switch (entry.getKey()) {
case S2C:
case C2S:
ObjectNodeInfo info = sipState.value();
if (info != null) {
info.incTimes();
if (info.getTimes() >= MAX_RTP_LINES) {
sipState.clear();
} else {
sipState.update(new ObjectNodeInfo(sipObj, info.getTimes()));
}
} else {
sipState.update(new ObjectNodeInfo(sipObj, 1));
}
break;
default:
// Bidirectional stream:
// in VoIP fusion a session carries at most one bidirectional RTP stream, so clear the SIP state.
sipState.clear();
}
}
registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
}
// RTP
@Override
public void processElement1(ObjectNode rtpObj,
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Record rtpRecord = new Record(rtpObj);
final ObjectNodeInfo info = sipState.value();
final StreamDir streamDir = rtpRecord.getStreamDir();
if (null != info) {
completeOriginatorField(rtpRecord, new SIPRecord(info.getObj()));
rtpRecord.merge(info.getObj())
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj);
switch (streamDir) {
case C2S:
case S2C:
info.incTimes();
if (info.getTimes() >= MAX_RTP_LINES) {
sipState.clear();
}
break;
default:
// Bidirectional RTP stream: the session is complete, so clear the SIP state.
sipState.clear();
}
} else {
rtpState.put(streamDir, rtpObj);
}
registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
}
@Override
public void onTimer(long timestamp,
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception {
for (ObjectNode obj : rtpState.values()) {
final Record rtpRecord = new Record(obj);
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
out.collect(obj);
}
rtpState.clear();
sipState.clear();
}
// ======================================================================
// PRIVATE HELPER
// ======================================================================
private void completeOriginatorField(final Record rtpRecord, final SIPRecord sipRecord) {
if (StringUtils.isNoneBlank(rtpRecord.getClientIp())) {
if (StringUtils.equals(sipRecord.getOriginatorSdpConnectIp(), rtpRecord.getClientIp())) {
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.C2S.getCode());
return;
} else if (StringUtils.equals(sipRecord.getResponderSdpConnectIp(), rtpRecord.getClientIp())) {
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.S2C.getCode());
return;
}
}
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
}
}
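
Per the inline comments, RTP is the first connected input (processElement1) and SIP the second (processElement2); a wiring sketch under that assumption, reusing VSysIDKeySelector on both sides (`rtpRecords`/`sipRecords` are assumed streams):

```java
DataStream<ObjectNode> voipRecords = rtpRecords
        .connect(sipRecords)
        .keyBy(new VSysIDKeySelector(), new VSysIDKeySelector())
        .process(new VoIPFusionFunction());
```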


@@ -1,27 +0,0 @@
package com.zdjizhi.flink.voip.records;
import lombok.Getter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
public class RTPRecord extends Record {
public static final String F_ORIGINATOR_DIR = "rtp_originator_dir";
public RTPRecord(ObjectNode obj) {
super(obj);
}
@Getter
public enum OriginatorDir {
UNKNOWN(0),
C2S(1),
S2C(2);
private final int code;
OriginatorDir(int code) {
this.code = code;
}
}
}


@@ -1,224 +0,0 @@
package com.zdjizhi.flink.voip.records;
import lombok.AllArgsConstructor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.IntNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
/**
* Record class represents a data record with various fields.
* <p>
* It provides getter and setter methods for accessing the fields of the data record.
*
* @author chaoc
* @since 1.0
*/
@AllArgsConstructor
public class Record {
/**
 * Field name: the vsys the data record belongs to.
*/
public static final String F_COMMON_VSYS_ID = "vsys_id";
/**
 * Field name: the schema type of the data record.
*/
public static final String F_COMMON_SCHEMA_TYPE = "decoded_as";
/**
 * Field name: the stream direction of the data record.
*/
public static final String F_COMMON_STREAM_DIR = "common_stream_dir";
/**
 * Field name: the flags from which the stream direction is derived.
*/
public static final String F_FLAGS = "flags";
/**
 * Field name: the server-side IP address in the data record.
*/
public static final String F_COMMON_SERVER_IP = "server_ip";
/**
 * Field name: the server-side port in the data record.
*/
public static final String F_COMMON_SERVER_PORT = "server_port";
/**
 * Field name: the client-side IP address in the data record.
*/
public static final String F_COMMON_CLIENT_IP = "client_ip";
/**
 * Field name: the client-side port in the data record.
*/
public static final String F_COMMON_CLIENT_PORT = "client_port";
/**
* ObjectNode data.
*/
protected final ObjectNode obj;
/**
* Get the VSys ID from the data record.
*
* @return The VSys ID as an integer.
*/
public int getVSysID() {
int v = Record.getInt(obj, F_COMMON_VSYS_ID);
return v == 0 ? 1 : v;
}
/**
* Get the schema type from the data record.
*
* @return The schema type.
*/
public final SchemaType getSchemaType() {
return SchemaType.of(Record.getString(obj, F_COMMON_SCHEMA_TYPE));
}
/**
* Get the stream direction from the data record.
*
* @return The stream direction.
*/
public final StreamDir getStreamDir() {
return StreamDir.ofFlags(Record.getLong(obj, F_FLAGS));
}
/**
* Get the server IP address from the data record.
*
* @return The server IP address as a string.
*/
public final String getServerIp() {
return Record.getString(obj, F_COMMON_SERVER_IP);
}
/**
* Get the server port from the data record.
*
* @return The server port as an integer.
*/
public final int getServerPort() {
return Record.getInt(obj, F_COMMON_SERVER_PORT);
}
/**
* Get the client IP address from the data record.
*
* @return The client IP address as a string.
*/
public final String getClientIp() {
return Record.getString(obj, F_COMMON_CLIENT_IP);
}
/**
* Get the client port from the data record.
*
* @return The client port as an integer.
*/
public final int getClientPort() {
return Record.getInt(obj, F_COMMON_CLIENT_PORT);
}
/**
* Set an integer value to the specified field in the data record.
*
* @param name The name of the field.
* @param value The integer value to set.
*/
public final void setInt(final String name, final int value) {
obj.set(name, IntNode.valueOf(value));
}
/**
* Set a string value to the specified field in the data record.
*
* @param name The name of the field.
* @param value The string value to set.
*/
public final void setString(final String name, final String value) {
obj.set(name, TextNode.valueOf(value));
}
/**
* Merge the fields of another ObjectNode into the current data record.
*
* @param other The ObjectNode containing the fields to be merged.
* @return This record.
*/
public final Record merge(final ObjectNode other) {
other.fields().forEachRemaining(entry -> obj.set(entry.getKey(), entry.getValue()));
return this;
}
/**
* Get an integer value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @param defaultValue The default value to return if the field is not found or is not an integer.
* @return The integer value from the field or the default value if the field is not found or is not an integer.
*/
public static int getInt(final ObjectNode obj, final String field, final int defaultValue) {
final JsonNode node = obj.get(field);
return node != null && node.isInt() ? node.asInt(defaultValue) : defaultValue;
}
/**
* Get an integer value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @return The integer value from the field or 0 if the field is not found or is not an integer.
*/
public static int getInt(final ObjectNode obj, final String field) {
return getInt(obj, field, 0);
}
/**
* Gets a long value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @param defaultValue The default value to return if the field is not found or is not a long.
* @return The long value from the field or the default value if the field is not found or is not a long.
*/
public static long getLong(final ObjectNode obj, final String field, final long defaultValue) {
final JsonNode node = obj.get(field);
return node != null && node.isNumber() ? node.asLong() : defaultValue;
}
/**
* Gets a long value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @return The long value from the field or 0 if the field is not found or is not a long.
*/
private static long getLong(final ObjectNode obj, final String field) {
return getLong(obj, field, 0L);
}
/**
* Get a string value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @param defaultValue The default value to return if the field is not found or is not a string.
* @return The string value from the field or the default value if the field is not found or is not a string.
*/
public static String getString(final ObjectNode obj, final String field, final String defaultValue) {
final JsonNode node = obj.get(field);
return node != null && node.isTextual() ? node.asText(defaultValue) : defaultValue;
}
/**
* Get a string value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @return The string value from the field or null if the field is not found or is not a string.
*/
public static String getString(final ObjectNode obj, final String field) {
return getString(obj, field, null);
}
}
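
merge() copies every field of the other node into this record, overwriting duplicates; a small self-contained illustration (values invented, using the flink-shaded Jackson types):

```java
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

public class MergeDemo {
    public static void main(String[] args) {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode base = mapper.createObjectNode().put("a", 1).put("b", 2);
        ObjectNode other = mapper.createObjectNode().put("b", 9).put("c", 3);
        new Record(base).merge(other);
        // base is now {"a":1,"b":9,"c":3}: fields from `other` win on conflict.
        System.out.println(base);
    }
}
```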


@@ -1,57 +0,0 @@
package com.zdjizhi.flink.voip.records;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
 * SIP (Session Initiation Protocol) data record class, used to parse and access SIP data records.
*
* @author chaoc
* @since 1.0
*/
public class SIPRecord extends Record {
/**
 * Field Name: the Call-ID of the SIP session.
*/
public static final String F_CALL_ID = "sip_call_id";
/**
 * Field Name: the SDP-negotiated caller (originator) media IP of the SIP call.
*/
public static final String F_ORIGINATOR_SDP_CONNECT_IP = "sip_originator_sdp_connect_ip";
/**
 * Field Name: the SDP-negotiated caller (originator) media port of the SIP call.
*/
public static final String F_ORIGINATOR_SDP_MEDIA_PORT = "sip_originator_sdp_media_port";
/**
 * Field Name: the SDP-negotiated callee (responder) media IP of the SIP call.
*/
public static final String F_RESPONDER_SDP_CONNECT_IP = "sip_responder_sdp_connect_ip";
/**
 * Field Name: the SDP-negotiated callee (responder) media port of the SIP call.
*/
public static final String F_RESPONDER_SDP_MEDIA_PORT = "sip_responder_sdp_media_port";
public SIPRecord(final ObjectNode obj) {
super(obj);
}
public String getCallID() {
return Record.getString(obj, F_CALL_ID);
}
public String getOriginatorSdpConnectIp() {
return Record.getString(obj, F_ORIGINATOR_SDP_CONNECT_IP);
}
public int getOriginatorSdpMediaPort() {
return Record.getInt(obj, F_ORIGINATOR_SDP_MEDIA_PORT);
}
public String getResponderSdpConnectIp() {
return Record.getString(obj, F_RESPONDER_SDP_CONNECT_IP);
}
public int getResponderSdpMediaPort() {
return Record.getInt(obj, F_RESPONDER_SDP_MEDIA_PORT);
}
}


@@ -1,51 +0,0 @@
package com.zdjizhi.flink.voip.records;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* The SchemaType enum represents different types of data schemas.
*
* @author chaoc
* @since 1.0
*/
@AllArgsConstructor
@Getter
public enum SchemaType {
/**
* Represents the SIP schema type.
*/
SIP("SIP"),
/**
* Represents the RTP schema type.
*/
RTP("RTP"),
/**
* Represents the VoIP schema type.
*/
VOIP("VoIP");
/**
* The string value of the SchemaType.
*/
private final String value;
/**
* Get the SchemaType enum based on the provided string value.
*
* @param value The string value of the SchemaType to retrieve.
* @return The corresponding SchemaType enum.
* @throws IllegalArgumentException if the provided value does not match any known SchemaType.
*/
public static SchemaType of(final String value) {
for (SchemaType schemaType : values()) {
if (schemaType.value.equalsIgnoreCase(value)) {
return schemaType;
}
}
throw new IllegalArgumentException("Unknown SchemaType value '" + value + "'.");
}
}
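
Lookup is case-insensitive, for instance:

```java
SchemaType.of("voip"); // -> VOIP
SchemaType.of("SIP");  // -> SIP
SchemaType.of("mp3");  // throws IllegalArgumentException
```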


@@ -1,69 +0,0 @@
package com.zdjizhi.flink.voip.records;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* The StreamDir enum represents different types of data stream directions.
*
* @author chaoc
* @since 1.0
*/
@AllArgsConstructor
@Getter
public enum StreamDir {
/**
* Represents the Client-to-Server (C2S) stream direction.
*/
C2S(1),
/**
* Represents the Server-to-Client (S2C) stream direction.
*/
S2C(2),
/**
* Represents the bidirectional (double) stream direction.
*/
DOUBLE(3);
/**
* The integer value of the StreamDir.
*/
private final int value;
/**
* Get the StreamDir enum based on the provided integer value.
*
* @param value The integer value of the StreamDir to retrieve.
* @return The corresponding StreamDir enum.
* @throws IllegalArgumentException if the provided value does not match any known StreamDir.
*/
public static StreamDir of(int value) {
for (StreamDir streamDir : values()) {
if (value == streamDir.value) {
return streamDir;
}
}
throw new IllegalArgumentException("Unknown StreamDir value '" + value + "'.");
}
/**
* Get the StreamDir enum based on the provided flags value.
*
* @param flags The flags.
* @return The corresponding StreamDir enum.
* @throws IllegalArgumentException if the provided value does not match any known StreamDir.
*/
public static StreamDir ofFlags(long flags) {
int v = 0;
if ((flags & 8192) == 8192) {
v += 1;
}
if ((flags & 16384) == 16384) {
v += 2;
}
return of(v);
}
}
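
ofFlags extracts two direction bits from the session flags; judging from the sample records elsewhere in this change, 8192 marks C2S traffic and 16384 marks S2C. A worked example:

```java
StreamDir.ofFlags(8201L);          // only the 8192 bit is set  -> C2S
StreamDir.ofFlags(16401L);         // only the 16384 bit is set -> S2C
StreamDir.ofFlags(8192L | 16384L); // both bits set             -> DOUBLE
// With neither bit set, of(0) is reached and throws IllegalArgumentException.
```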


@@ -0,0 +1 @@
com.geedgenetworks.flink.easy.application.voip.VoipUDFFactory

src/main/resources/job.yml Normal file

@@ -0,0 +1,232 @@
job:
name: A Stream Example
parallelism: 1
active-pipeline:
- console
source:
- name: session-records
type: kafka
option:
topic: SESSION-RECORD
properties:
bootstrap.servers: 192.168.44.12:9092
group.id: easy-stream-tester9
client.id: easy-stream-tester9
format: json
schema:
- name: session_id
data-type: BIGINT NOT NULL
- name: start_timestamp_ms
data-type: BIGINT NOT NULL
# row-time:
- name: start_timestamp
for: TO_TIMESTAMP_LTZ(start_timestamp_ms, 3)
watermark: start_timestamp - INTERVAL '5' MINUTE
- name: end_timestamp_ms
data-type: BIGINT NOT NULL
- name: decoded_as
data-type: STRING NOT NULL
- name: duration_ms
data-type: BIGINT NOT NULL
- name: tcp_handshake_latency_ms
data-type: BIGINT
- name: device_id
data-type: STRING NOT NULL
- name: out_link_id
data-type: BIGINT
- name: in_link_id
data-type: BIGINT
- name: device_tag
data-type: STRING
- name: data_center
data-type: STRING
- name: device_group
data-type: STRING
- name: sled_ip
data-type: STRING
- name: address_type
data-type: INT
- name: direction
data-type: INT
- name: t_vsys_id
data-type: BIGINT
- name: flags
data-type: BIGINT
- name: flags_identify_info
data-type: STRING
## Treatment
- name: security_rule_list
data-type: ARRAY<BIGINT>
- name: security_action
data-type: STRING
- name: monitor_rule_list
data-type: ARRAY<BIGINT>
- name: shaping_rule_list
data-type: ARRAY<BIGINT>
- name: sc_rule_list
data-type: ARRAY<BIGINT>
- name: statistics_rule_list
data-type: ARRAY<BIGINT>
- name: sc_rsp_raw
data-type: ARRAY<BIGINT>
- name: sc_rsp_decrypted
data-type: ARRAY<BIGINT>
- name: proxy_rule_list
data-type: ARRAY<BIGINT>
- name: proxy_action
data-type: STRING
- name: proxy_pinning_status
data-type: INT
- name: proxy_intercept_status
data-type: INT
- name: proxy_passthrough_reason
data-type: STRING
- name: proxy_client_side_latency_ms
data-type: BIGINT
- name: proxy_server_side_latency_ms
data-type: BIGINT
- name: proxy_client_side_version
data-type: STRING
- name: proxy_server_side_version
data-type: STRING
- name: proxy_cert_verify
data-type: INT
- name: proxy_intercept_error
data-type: STRING
- name: monitor_mirrored_pkts
data-type: INT
- name: monitor_mirrored_bytes
data-type: INT
## Source
- name: client_ip
data-type: STRING
- name: client_port
data-type: INT
- name: client_os_desc
data-type: STRING
- name: client_geolocation
data-type: STRING
- name: client_country
data-type: STRING
- name: client_super_administrative_area
data-type: STRING
- name: client_administrative_area
data-type: STRING
- name: client_sub_administrative_area
data-type: STRING
- name: client_asn
data-type: BIGINT
- name: subscriber_id
data-type: STRING
- name: imei
data-type: STRING
- name: imsi
data-type: STRING
- name: phone_number
data-type: STRING
- name: apn
data-type: STRING
## Destination
- name: server_ip
data-type: STRING
- name: server_port
data-type: INT
- name: server_os_desc
data-type: STRING
- name: server_geolocation
data-type: STRING
- name: server_country
data-type: STRING
- name: server_super_administrative_area
data-type: STRING
- name: server_administrative_area
data-type: STRING
- name: server_sub_administrative_area
data-type: STRING
- name: server_asn
data-type: BIGINT
- name: server_fqdn
data-type: STRING
- name: server_domain
data-type: STRING
- name: fqdn_category_list
data-type: ARRAY<BIGINT>
## Application
- name: app_transition
data-type: STRING
- name: app
data-type: STRING
- name: app_debug_info
data-type: STRING
- name: app_content
data-type: STRING
- name: app_extra_info
data-type: STRING
## Protocol
- name: ip_protocol
data-type: STRING
- name: decoded_path
data-type: STRING
## SIP
- name: sip_call_id
data-type: STRING
- name: sip_originator_description
data-type: STRING
- name: sip_responder_description
data-type: STRING
- name: sip_user_agent
data-type: STRING
- name: sip_server
data-type: STRING
- name: sip_originator_sdp_connect_ip
data-type: STRING
- name: sip_originator_sdp_media_port
data-type: INT
- name: sip_originator_sdp_media_type
data-type: STRING
- name: sip_originator_sdp_content
data-type: STRING
- name: sip_responder_sdp_connect_ip
data-type: STRING
- name: sip_responder_sdp_media_port
data-type: INT
- name: sip_responder_sdp_media_type
data-type: STRING
- name: sip_responder_sdp_content
data-type: STRING
- name: sip_duration_s
data-type: INT
- name: sip_bye
data-type: STRING
## RTP
- name: rtp_payload_type_c2s
data-type: INT
- name: rtp_payload_type_s2c
data-type: INT
- name: rtp_pcap_path
data-type: STRING
- name: rtp_originator_dir
data-type: INT
pipeline:
- name: console
category: PRINT
on: split-for-error
- name: split-for-error
category: SPLIT
on: session-records
splits:
# Invalid stream dir
- name: error1-records
where: STREAM_DIR(flags) != 1 && STREAM_DIR(flags) != 2 && STREAM_DIR(flags) != 3
# Invalid ip or port
- name: error2-records
where: NOT(IS_IP_ADDRESS(client_ip)) || NOT(IS_IP_ADDRESS(server_ip)) || client_port <= 0 || server_port <= 0
# - name: split-by-protocol
# category: SPLIT
# splits:
# - name: rtp-records
# where: "decoded_as == 'RTP'"
# - name: sip-records
# where: "decoded_as == 'SIP'"


@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
rootLogger.level = WARN
rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender


@@ -1,67 +0,0 @@
package com.zdjizhi.flink.voip.conf;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class FusionConfigurationTest {
private FusionConfiguration fusionConfiguration;
@BeforeEach
public void setUp() {
final Configuration config;
config = new Configuration();
config.setString("prefix_key1", "value1");
config.setString("prefix_key2", "value2");
config.setString("other_key", "other_value");
fusionConfiguration = new FusionConfiguration(config);
}
@Test
public void testGetPropertiesWithValidPrefix() {
String prefix = "prefix_";
Properties properties = fusionConfiguration.getProperties(prefix);
assertEquals(2, properties.size());
assertEquals("value1", properties.getProperty("key1"));
assertEquals("value2", properties.getProperty("key2"));
}
@Test
public void testGetPropertiesWithInvalidPrefix() {
String prefix = "invalid_";
Properties properties = fusionConfiguration.getProperties(prefix);
assertTrue(properties.isEmpty());
}
@Test
public void testGetPropertiesWithEmptyPrefix() {
String prefix = "";
Properties properties = fusionConfiguration.getProperties(prefix);
assertEquals(3, properties.size());
assertEquals("value1", properties.getProperty("prefix_key1"));
assertEquals("value2", properties.getProperty("prefix_key2"));
assertEquals("other_value", properties.getProperty("other_key"));
}
@Test
public void testGetPropertiesWithNullPrefix() {
// Null prefix should be treated as an empty prefix
String prefix = null;
Properties properties = fusionConfiguration.getProperties(prefix);
assertEquals(3, properties.size());
assertEquals("value1", properties.getProperty("prefix_key1"));
assertEquals("value2", properties.getProperty("prefix_key2"));
assertEquals("other_value", properties.getProperty("other_key"));
}
}


@@ -1,98 +0,0 @@
package com.zdjizhi.flink.voip.data;
import com.github.javafaker.Faker;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
/**
* The abstract class Generator<T> serves as the base class for implementing record generators.
* It provides common functionalities for generating random data and controlling the generation ratio.
*
* @param <T> The type of records to be generated.
* @author chaoc
* @since 1.0
*/
public abstract class Generator<T> {
protected final ThreadLocalRandom random;
/**
 * The pairing ratio as a percentage (0-100): the probability that next() derives a record
 * from the previously generated state instead of generating a fresh one.
*/
protected final int ratio;
private final Faker faker;
protected final AtomicReference<T> state;
protected final ObjectMapper mapper;
/**
* Creates a new Generator with the given ratio.
*
 * @param ratio The pairing ratio as a percentage (0-100): the probability that next() reuses the previous state instead of generating a fresh record.
*/
public Generator(final int ratio) {
this.ratio = ratio;
this.faker = new Faker();
this.random = ThreadLocalRandom.current();
this.state = new AtomicReference<>();
this.mapper = new ObjectMapper();
}
/**
* Generates the next record based on the specified ratio and the current state.
* It randomly selects whether to generate a new record or return the last generated record.
*
* @return The next generated record of type T.
*/
public abstract T next();
/**
* Generates a new record of type T.
*
* @return The newly generated record of type T.
*/
protected abstract T generate();
/**
* Performs post-processing on the generated record of type T.
* Subclasses can override this method to modify the generated record before returning it.
*
* @param v The generated record of type T.
* @return The post-processed record of type T.
*/
protected abstract T afterState(T v);
/**
 * Generates a random IP address (either IPv4 or IPv6).
*
* @return A randomly generated IP address as a string.
*/
public final String nextIp() {
if (random.nextBoolean()) {
return faker.internet().ipV4Address();
}
return faker.internet().ipV6Address();
}
/**
* Generates a random ID number.
*
* @return A randomly generated ID number as a string.
*/
public final String nextId() {
return faker.idNumber().valid();
}
/**
 * Generates a random port number within the range of 1 to 65535 (inclusive).
*
* @return A randomly generated port number as an integer.
*/
public final int nextPort() {
return random.nextInt(65535) + 1;
}
}


@@ -1,111 +0,0 @@
package com.zdjizhi.flink.voip.data;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* RTPGenerator extends Generator<ObjectNode> and is responsible for generating RTP records.
* It generates random RTP records with specific properties.
*
* @author chaoc
* @since 1.0
*/
public class RTPGenerator extends Generator<ObjectNode> {
private final SIPGenerator sipGenerator;
public RTPGenerator(int ratio, SIPGenerator sipGenerator) {
super(ratio);
this.sipGenerator = sipGenerator;
}
@Override
public ObjectNode next() {
int i = random.nextInt(100);
if (i < ratio) {
final ObjectNode node = sipGenerator.state.get();
if (null != node) {
final ObjectNode obj = generate();
obj.set(Record.F_COMMON_CLIENT_IP, node.get(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP));
obj.set(Record.F_COMMON_CLIENT_PORT, node.get(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT));
obj.set(Record.F_COMMON_SERVER_IP, node.get(SIPRecord.F_RESPONDER_SDP_CONNECT_IP));
obj.set(Record.F_COMMON_SERVER_PORT, node.get(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT));
return obj;
}
}
return generate();
}
@Override
protected ObjectNode generate() {
final String json = "{\n" +
" \"common_address_list\": \"42924-36682-172.17.201.16-172.17.200.50\",\n" +
" \"common_address_type\": 4,\n" +
" \"common_app_full_path\": \"rtp\",\n" +
" \"common_app_label\": \"rtp\",\n" +
" \"common_c2s_byte_num\": 0,\n" +
" \"common_c2s_pkt_num\": 0,\n" +
" \"common_client_ip\": \"172.17.201.16\",\n" +
" \"common_client_port\": 42924,\n" +
" \"common_con_duration_ms\": 1086,\n" +
" \"common_device_id\": \"unknown\",\n" +
" \"common_device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgxSfcOs\\\"}]}\",\n" +
" \"common_direction\": 73,\n" +
" \"common_end_time\": 1689295970,\n" +
" \"common_flags\": 16401,\n" +
" \"common_flags_identify_info\": \"{\\\"Asymmetric\\\":5,\\\"Server is Local\\\":1,\\\"S2C\\\":1}\",\n" +
" \"common_l4_protocol\": \"IPv4_UDP\",\n" +
" \"common_out_dest_mac\": \"02:fc:08:dc:92:d7\",\n" +
" \"common_out_src_mac\": \"02:fc:08:dc:91:c3\",\n" +
" \"common_protocol_label\": \"ETHERNET.IPv4.UDP\",\n" +
" \"common_s2c_byte_num\": 7570,\n" +
" \"common_s2c_pkt_num\": 5,\n" +
" \"common_schema_type\": \"RTP\",\n" +
" \"common_server_ip\": \"172.17.200.50\",\n" +
" \"common_server_port\": 36682,\n" +
" \"common_sessions\": 1,\n" +
" \"common_sled_ip\": \"192.168.42.54\",\n" +
" \"common_start_time\": 1689294629,\n" +
" \"common_stream_dir\": 3,\n" +
" \"common_stream_trace_id\": \"290484792956466709\",\n" +
" \"common_t_vsys_id\": 24,\n" +
" \"common_vsys_id\": 24,\n" +
" \"raw_log_status\": \"CLOSE\",\n" +
" \"rtp_payload_type_c2s\": 0,\n" +
" \"rtp_payload_type_s2c\": 0,\n" +
" \"rtp_pcap_path\": \"http://192.168.44.67:9098/hos/rtp_hos_bucket/rtp_172.17.200.50_172.17.201.16_27988_55806_1689294629.pcap\"\n" +
"}";
ObjectNode obj;
try {
obj = mapper.readValue(json, ObjectNode.class);
} catch (Exception e) {
obj = mapper.createObjectNode();
}
final SIPRecord record = new SIPRecord(obj);
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
record.setString(SIPRecord.F_CALL_ID, nextId());
record.setInt(SIPRecord.F_COMMON_STREAM_DIR, random.nextInt(3) + 1);
record.setString(Record.F_COMMON_SERVER_IP, nextIp());
record.setInt(Record.F_COMMON_SERVER_PORT, nextPort());
record.setString(Record.F_COMMON_CLIENT_IP, nextIp());
record.setInt(Record.F_COMMON_CLIENT_PORT, nextPort());
return obj;
}
@Override
protected ObjectNode afterState(ObjectNode v) {
final Record record = new Record(v);
switch (record.getStreamDir()) {
case DOUBLE:
record.setInt(Record.F_COMMON_STREAM_DIR, random.nextInt(2) + 1);
break;
case S2C:
default:
}
return v;
}
}


@@ -1,138 +0,0 @@
package com.zdjizhi.flink.voip.data;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
* SIPGenerator extends Generator<ObjectNode> and is responsible for generating SIP (Session Initiation Protocol) records.
* It generates random SIP records with specific properties such as call ID, stream dir, server IP, server port,
* client IP, client port... information for both originator and responder.
*
* @author chaoc
* @since 1.0
*/
public class SIPGenerator extends Generator<ObjectNode> {
public SIPGenerator(final int ratio) {
super(ratio);
}
/**
* Creates a new SIPGenerator with the default ratio of 40.
*/
public SIPGenerator() {
this(40);
}
@Override
public final ObjectNode next() {
int i = random.nextInt(100);
if (i < ratio && state.get() != null) {
ObjectNode t = afterState(state.get());
state.set(null);
return t;
} else {
return state.updateAndGet(t -> generate());
}
}
@Override
protected ObjectNode generate() {
final String json = "{\n" +
" \"common_schema_type\": \"SIP\",\n" +
" \"common_sessions\": 1,\n" +
" \"sip_call_id\": \"3257b5c0f3d0266@spirent.com\",\n" +
" \"sip_originator_description\": \"<sip:caller@spirent.com>\",\n" +
" \"sip_responder_description\": \"sip:callee@spirent.com\",\n" +
" \"sip_originator_sdp_content\": \"v=0\\r\\no=SIP-UA 3395000 3397200 IN IP4 172.17.200.50\\r\\ns=SIP Call\\r\\nc=IN IP4 172.17.200.50\\r\\nt=0 0\\r\\nm=audio 36682 RTP/AVP 0\\r\\na=rtpmap:0 PCMU/8000\\r\\n\",\n" +
" \"sip_originator_sdp_connect_ip\": \"172.17.200.50\",\n" +
" \"sip_originator_sdp_media_port\": 36682,\n" +
" \"sip_originator_sdp_media_type\": \"0 PCMU/8000\",\n" +
" \"common_first_ttl\": 128,\n" +
" \"common_c2s_ipfrag_num\": 0,\n" +
" \"common_s2c_ipfrag_num\": 0,\n" +
" \"common_c2s_tcp_unorder_num\": 0,\n" +
" \"common_s2c_tcp_unorder_num\": 0,\n" +
" \"common_c2s_tcp_lostlen\": 0,\n" +
" \"common_s2c_tcp_lostlen\": 0,\n" +
" \"common_c2s_pkt_retrans\": 2,\n" +
" \"common_s2c_pkt_retrans\": 0,\n" +
" \"common_c2s_byte_retrans\": 1100,\n" +
" \"common_s2c_byte_retrans\": 0,\n" +
" \"common_direction\": 69,\n" +
" \"common_app_full_path\": \"sip\",\n" +
" \"common_app_label\": \"sip\",\n" +
" \"common_tcp_client_isn\": 3004427198,\n" +
" \"common_server_ip\": \"172.17.201.16\",\n" +
" \"common_client_ip\": \"172.17.200.49\",\n" +
" \"common_server_port\": 5060,\n" +
" \"common_client_port\": 6948,\n" +
" \"common_stream_dir\": 1,\n" +
" \"common_address_type\": 4,\n" +
" \"common_address_list\": \"6948-5060-172.17.200.50-172.17.201.16\",\n" +
" \"common_start_time\": 1689295655,\n" +
" \"common_end_time\": 1689295670,\n" +
" \"common_con_duration_ms\": 41467,\n" +
" \"common_s2c_pkt_num\": 0,\n" +
" \"common_s2c_byte_num\": 0,\n" +
" \"common_c2s_pkt_num\": 6,\n" +
" \"common_c2s_byte_num\": 1834,\n" +
" \"common_establish_latency_ms\": 249,\n" +
" \"common_out_src_mac\": \"02:fc:08:dc:91:c3\",\n" +
" \"common_out_dest_mac\": \"02:fc:08:dc:92:d7\",\n" +
" \"common_flags\": 8201,\n" +
" \"common_flags_identify_info\": \"{\\\"Asymmetric\\\":6,\\\"Client is Local\\\":1,\\\"C2S\\\":1}\",\n" +
" \"common_protocol_label\": \"ETHERNET.IPv4.TCP\",\n" +
" \"common_stream_trace_id\": \"290502385134123507\",\n" +
" \"common_l4_protocol\": \"IPv4_TCP\",\n" +
" \"common_sled_ip\": \"192.168.42.54\",\n" +
" \"common_device_id\": \"unknown\",\n" +
" \"common_device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgxSfcOs\\\"}]}\",\n" +
" \"common_t_vsys_id\": 24,\n" +
" \"common_vsys_id\": 24\n" +
"}";
ObjectNode obj;
try {
obj = mapper.readValue(json, ObjectNode.class);
} catch (Exception e) {
obj = mapper.createObjectNode();
}
final SIPRecord record = new SIPRecord(obj);
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.SIP.getValue());
record.setString(SIPRecord.F_CALL_ID, nextId() + "@spirent.com");
record.setInt(SIPRecord.F_COMMON_STREAM_DIR, (random.nextBoolean() ? StreamDir.S2C : StreamDir.C2S).getValue());
record.setString(Record.F_COMMON_SERVER_IP, nextIp());
record.setInt(Record.F_COMMON_SERVER_PORT, nextPort());
record.setString(Record.F_COMMON_CLIENT_IP, nextIp());
record.setInt(Record.F_COMMON_CLIENT_PORT, nextPort());
record.setString(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP, nextIp());
record.setInt(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT, nextPort());
record.setString(SIPRecord.F_RESPONDER_SDP_CONNECT_IP, nextIp());
record.setInt(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT, nextPort());
return obj;
}
@Override
protected ObjectNode afterState(ObjectNode v) {
final Record record = new Record(v);
switch (record.getStreamDir()) {
case C2S:
record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.S2C.getValue());
break;
case S2C:
record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.C2S.getValue());
break;
default:
}
return v;
}
}


@@ -1,48 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class AddressTest {
@Test
public void testAddressOfWithDifferentPortNumbers() {
Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.10", 8080);
Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.20", 9090);
Address address = Address.of(a1, a2);
assertEquals("192.168.1.10", address.getIp1());
assertEquals(8080, address.getPort1());
assertEquals("192.168.1.20", address.getIp2());
assertEquals(9090, address.getPort2());
}
@Test
public void testAddressOfWithSamePortNumber() {
Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.10", 8080);
Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.20", 8080);
Address address = Address.of(a1, a2);
assertEquals("192.168.1.10", address.getIp1());
assertEquals(8080, address.getPort1());
assertEquals("192.168.1.20", address.getIp2());
assertEquals(8080, address.getPort2());
}
@Test
public void testAddressOfWithInvertedOrder() {
Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.20", 8080);
Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.10", 9090);
Address address = Address.of(a1, a2);
assertEquals("192.168.1.20", address.getIp1());
assertEquals(8080, address.getPort1());
assertEquals("192.168.1.10", address.getIp2());
assertEquals(9090, address.getPort2());
}
}


@@ -1,104 +0,0 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.data.SIPGenerator;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.*;
public class SIPPairingFunctionTest {
private static final int INTERVAL_MINUTES = 5;
private static KeyedOneInputStreamOperatorTestHarness<Integer, ObjectNode, ObjectNode> testHarness;
@BeforeAll
static void setUp() throws Exception {
final SIPPairingFunction func = new SIPPairingFunction();
final KeyedProcessOperator<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode> operator =
new KeyedProcessOperator<>(func);
final TypeInformation<Integer> type = TypeInformation.of(Integer.class);
final KeySelector<ObjectNode, Integer> keySelector = jsonNodes -> 0;
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, type);
final Configuration configuration = new Configuration();
configuration.set(FusionConfigs.SIP_STATE_CLEAR_INTERVAL, INTERVAL_MINUTES);
testHarness.getExecutionConfig().setGlobalJobParameters(configuration);
testHarness.open();
}
@Test
public void testProperlyPairSIPRecords() throws Exception {
final SIPGenerator sipGenerator = new SIPGenerator();
long current = System.currentTimeMillis();
final ObjectNode obj = sipGenerator.next();
final Record record = new Record(obj);
record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.DOUBLE.getValue());
final StreamRecord<ObjectNode> streamRecord = new StreamRecord<>(obj, current);
testHarness.processElement(streamRecord);
assertTrue(testHarness.getOutput().contains(streamRecord));
long interval = Time.minutes(INTERVAL_MINUTES).toMilliseconds();
long nextFireTime = (testHarness.getProcessingTime() / interval + 1) * interval;
assertTrue(testHarness.getProcessingTimeService().getActiveTimerTimestamps().contains(nextFireTime));
final ObjectNode obj1 = obj.deepCopy();
final Record record1 = new Record(obj1);
record1.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.S2C.getValue());
final ObjectNode obj2 = obj.deepCopy();
final Record record2 = new Record(obj2);
record2.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.C2S.getValue());
final MapStateDescriptor<Address, ObjectNodeInfo> descriptor =
new MapStateDescriptor<>(
"sip-state",
TypeInformation.of(Address.class),
TypeInformation.of(ObjectNodeInfo.class)
);
testHarness.processElement(obj1, current);
final MapState<Address, ObjectNodeInfo> mapState = testHarness.getOperator()
.getKeyedStateStore().getMapState(descriptor);
final Set<ObjectNode> objectNodes1 = Lists.newArrayList(mapState.values()).stream()
.map(ObjectNodeInfo::getObj)
.collect(Collectors.toSet());
assertTrue(objectNodes1.contains(obj1));
testHarness.processElement(obj2, current);
final Set<ObjectNode> objectNodes2 = Lists.newArrayList(mapState.values()).stream()
.map(ObjectNodeInfo::getObj)
.collect(Collectors.toSet());
assertFalse(objectNodes2.contains(obj2));
assertTrue(testHarness.getOutput().contains(new StreamRecord<>(obj2, current)));
assertEquals(2, testHarness.getOutput().size());
}
@AfterAll
static void close() throws Exception {
testHarness.close();
}
}


@@ -1,70 +0,0 @@
package com.zdjizhi.flink.voip.records;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.IntNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class RecordTest {
private static ObjectMapper mapper;
@BeforeAll
static void setUp() {
mapper = new ObjectMapper();
}
@Test
void testGetVSysID() {
final ObjectNode obj = mapper.createObjectNode();
final Record record = new Record(obj);
record.setInt(Record.F_COMMON_VSYS_ID, 42);
assertEquals(42, record.getVSysID());
obj.set(Record.F_COMMON_VSYS_ID, IntNode.valueOf(40));
assertEquals(40, record.getVSysID());
}
@Test
void testGetSchemaType() {
final ObjectNode obj = mapper.createObjectNode();
final Record record = new Record(obj);
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
assertEquals(SchemaType.RTP, record.getSchemaType());
obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue()));
assertEquals(SchemaType.VOIP, record.getSchemaType());
}
@Test
void testSetInt() {
final ObjectNode obj = mapper.createObjectNode();
final Record record = new Record(obj);
record.setInt("intField", 123);
assertEquals(123, obj.get("intField").intValue());
}
@Test
void testSetString() {
final ObjectNode obj = mapper.createObjectNode();
final Record record = new Record(obj);
record.setString("stringField", "testValue");
assertEquals("testValue", obj.get("stringField").textValue());
}
@Test
void testMerge() {
final ObjectNode obj = mapper.createObjectNode();
obj.set("field1", IntNode.valueOf(1));
ObjectNode other = mapper.createObjectNode();
other.set("field2", TextNode.valueOf("value2"));
Record record = new Record(obj);
record.merge(other);
assertEquals(1, obj.get("field1").intValue());
assertEquals("value2", obj.get("field2").textValue());
}
}

tools/maven/checkstyle.xml Normal file

@@ -0,0 +1,398 @@
<?xml version="1.0"?>
<!DOCTYPE module PUBLIC
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
<!--
This is a checkstyle configuration file. For descriptions of
what the following rules do, please see the checkstyle configuration
page at http://checkstyle.sourceforge.net/config.html.
-->
<module name="Checker">
<module name="RegexpSingleline">
<!-- Checks that TODOs don't have stuff in parenthesis, e.g., username. -->
<property name="format" value="((//.*)|(\*.*))TODO\("/>
<property name="message" value="TODO comments must not include usernames."/>
<property name="severity" value="error"/>
</module>
<module name="RegexpSingleline">
<property name="format" value="\s+$"/>
<property name="message" value="Trailing whitespace"/>
<property name="severity" value="error"/>
</module>
<module name="RegexpSingleline">
<property name="format" value="Throwables.propagate\("/>
<property name="message" value="Throwables.propagate is deprecated"/>
<property name="severity" value="error"/>
</module>
<!-- Prevent *Tests.java as tools may not pick them up -->
<module name="RegexpOnFilename">
<property name="fileNamePattern" value=".*Tests\.java$"/>
</module>
<module name="SuppressionFilter">
<property name="file" value="${checkstyle.suppressions.file}" default="suppressions.xml"/>
</module>
<module name="FileLength">
<property name="max" value="3000"/>
</module>
<!-- All Java AST specific tests live under TreeWalker module. -->
<module name="TreeWalker">
<!-- Allow use of comment to suppress javadocstyle -->
<module name="SuppressionCommentFilter">
<property name="offCommentFormat" value="CHECKSTYLE.OFF\: ([\w\|]+)"/>
<property name="onCommentFormat" value="CHECKSTYLE.ON\: ([\w\|]+)"/>
<property name="checkFormat" value="$1"/>
</module>
<!-- Prohibit T.getT() methods for standard boxed types -->
<module name="Regexp">
<property name="format" value="Boolean\.getBoolean"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use System.getProperties() to get system properties."/>
</module>
<module name="Regexp">
<property name="format" value="Integer\.getInteger"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use System.getProperties() to get system properties."/>
</module>
<module name="Regexp">
<property name="format" value="Long\.getLong"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use System.getProperties() to get system properties."/>
</module>
<!--
IllegalImport cannot blacklist classes so we have to fall back to Regexp.
-->
<!-- forbid use of commons lang validate -->
<module name="Regexp">
<property name="format" value="org\.apache\.commons\.lang3\.Validate"/>
<property name="illegalPattern" value="true"/>
<property name="message"
value="Use Guava Checks instead of Commons Validate. Please refer to the coding guidelines."/>
</module>
<module name="Regexp">
<property name="format" value="org\.apache\.commons\.lang\."/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use commons-lang3 instead of commons-lang."/>
</module>
<module name="Regexp">
<property name="format" value="org\.codehaus\.jettison"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use flink-shaded-jackson instead of jettison."/>
</module>
<module name="Regexp">
<property name="format" value="org\.testcontainers\.shaded"/>
<property name="illegalPattern" value="true"/>
<property name="message"
value="Use utilities from appropriate library instead of org.testcontainers."/>
</module>
<!-- Enforce Java-style array declarations -->
<module name="ArrayTypeStyle"/>
<module name="TodoComment">
<!-- Checks that disallowed strings are not used in comments. -->
<property name="format" value="(FIXME)|(XXX)"/>
</module>
<!--
IMPORT CHECKS
-->
<module name="RedundantImport">
<!-- Checks for redundant import statements. -->
<property name="severity" value="error"/>
<message key="import.redundancy"
value="Redundant import {0}."/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs"
value="autovalue.shaded, avro.shaded, com.google.api.client.repackaged, com.google.appengine.repackaged"/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value="com.fasterxml.jackson"/>
<message key="import.illegal" value="{0}; Use flink-shaded-jackson instead."/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value="org.codehaus.jackson"/>
<message key="import.illegal" value="{0}; Use flink-shaded-jackson instead."/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value="org.objectweb.asm"/>
<message key="import.illegal" value="{0}; Use flink-shaded-asm instead."/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value="io.netty"/>
<message key="import.illegal" value="{0}; Use flink-shaded-netty instead."/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value="com.google.common"/>
<message key="import.illegal" value="{0}; Use flink-shaded-guava instead."/>
</module>
<module name="RedundantModifier">
<!-- Checks for redundant modifiers on various symbol definitions.
See: http://checkstyle.sourceforge.net/config_modifier.html#RedundantModifier
We exclude METHOD_DEF to allow final methods in final classes to make them more future-proof.
-->
<property name="tokens"
value="VARIABLE_DEF, ANNOTATION_FIELD_DEF, INTERFACE_DEF, CLASS_DEF, ENUM_DEF"/>
</module>
<!--
IllegalImport cannot blacklist classes, and c.g.api.client.util is used for some shaded
code and some useful code. So we need to fall back to Regexp.
-->
<module name="RegexpSinglelineJava">
<property name="format" value="^import com.google.common.base.Preconditions;$"/>
<property name="message" value="Static import functions from Guava Preconditions"/>
</module>
<module name="UnusedImports">
<property name="severity" value="error"/>
<property name="processJavadoc" value="true"/>
<message key="import.unused"
value="Unused import: {0}."/>
</module>
<!--
NAMING CHECKS
-->
<!-- Item 38 - Adhere to generally accepted naming conventions -->
<module name="PackageName">
<!-- Validates identifiers for package names against the
supplied expression. -->
<!-- Here the default checkstyle rule restricts package name parts to
seven characters, this is not in line with common practice at Google.
-->
<property name="format" value="^[a-z]+(\.[a-z][a-z0-9]{1,})*$"/>
<property name="severity" value="error"/>
</module>
<module name="TypeNameCheck">
<!-- Validates static, final fields against the
expression "^[A-Z][a-zA-Z0-9]*$". -->
<metadata name="altname" value="TypeName"/>
<property name="severity" value="error"/>
</module>
<module name="ConstantNameCheck">
<!-- Validates non-private, static, final fields against the supplied
public/package final fields "^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$". -->
<metadata name="altname" value="ConstantName"/>
<property name="applyToPublic" value="true"/>
<property name="applyToProtected" value="true"/>
<property name="applyToPackage" value="true"/>
<property name="applyToPrivate" value="true"/>
<property name="format" value="^([A-Z][A-Z0-9]*(_[A-Z0-9]+)*|FLAG_.*)$"/>
<message key="name.invalidPattern"
value="Variable ''{0}'' should be in ALL_CAPS (if it is a constant) or be private (otherwise)."/>
<property name="severity" value="error"/>
</module>
<module name="StaticVariableNameCheck">
<!-- Validates static, non-final fields against the supplied
expression "^[a-z][a-zA-Z0-9]*_?$". -->
<metadata name="altname" value="StaticVariableName"/>
<property name="applyToPublic" value="true"/>
<property name="applyToProtected" value="true"/>
<property name="applyToPackage" value="true"/>
<property name="applyToPrivate" value="true"/>
<property name="format" value="^[a-z][a-zA-Z0-9]*_?$"/>
<property name="severity" value="error"/>
</module>
<module name="MemberNameCheck">
<!-- Validates non-static members against the supplied expression. -->
<metadata name="altname" value="MemberName"/>
<property name="applyToPublic" value="true"/>
<property name="applyToProtected" value="true"/>
<property name="applyToPackage" value="true"/>
<property name="applyToPrivate" value="true"/>
<property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
<property name="severity" value="error"/>
</module>
<module name="MethodNameCheck">
<!-- Validates identifiers for method names. -->
<metadata name="altname" value="MethodName"/>
<property name="format" value="^[a-z][a-zA-Z0-9]*(_[a-zA-Z0-9]+)*$"/>
<property name="severity" value="error"/>
</module>
<module name="ParameterName">
<!-- Validates identifiers for method parameters against the
expression "^[a-z][a-zA-Z0-9]*$". -->
<property name="severity" value="error"/>
</module>
<module name="LocalFinalVariableName">
<!-- Validates identifiers for local final variables against the
expression "^[a-z][a-zA-Z0-9]*$". -->
<property name="severity" value="error"/>
</module>
<module name="LocalVariableName">
<!-- Validates identifiers for local variables against the
expression "^[a-z][a-zA-Z0-9]*$". -->
<property name="severity" value="error"/>
</module>
<!--
LENGTH and CODING CHECKS
-->
<!-- Checks for braces around if and else blocks -->
<module name="NeedBraces">
<property name="severity" value="error"/>
<property name="tokens"
value="LITERAL_IF, LITERAL_ELSE, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO"/>
</module>
<module name="UpperEll">
<!-- Checks that long constants are defined with an upper ell.-->
<property name="severity" value="error"/>
</module>
<module name="FallThrough">
<!-- Warn about falling through to the next case statement. Similar to
javac -Xlint:fallthrough, but the check is suppressed if a single-line comment
on the last non-blank line preceding the fallen-into case contains 'fall through' (or
some other variants that we don't publicized to promote consistency).
-->
<property name="reliefPattern"
value="fall through|Fall through|fallthru|Fallthru|falls through|Falls through|fallthrough|Fallthrough|No break|NO break|no break|continue on"/>
<property name="severity" value="error"/>
</module>
<!-- Checks for over-complicated boolean expressions. -->
<module name="SimplifyBooleanExpression"/>
<!-- Detects empty statements (standalone ";" semicolon). -->
<module name="EmptyStatement"/>
<!-- Detect multiple consecutive semicolons (e.g. ";;"). -->
<module name="RegexpSinglelineJava">
<property name="format" value=";{2,}"/>
<property name="message" value="Use one semicolon"/>
<property name="ignoreComments" value="true"/>
</module>
<!--
MODIFIERS CHECKS
-->
<module name="ModifierOrder">
<!-- Warn if modifier order is inconsistent with JLS3 8.1.1, 8.3.1, and
8.4.3. The prescribed order is:
public, protected, private, abstract, static, final, transient, volatile,
synchronized, native, strictfp
-->
<property name="severity" value="error"/>
</module>
<!--
WHITESPACE CHECKS
-->
<module name="EmptyLineSeparator">
<!-- Checks for empty line separator between tokens. The only
excluded token is VARIABLE_DEF, allowing class fields to
be declared on consecutive lines.
-->
<property name="allowMultipleEmptyLines" value="false"/>
<property name="allowMultipleEmptyLinesInsideClassMembers" value="false"/>
<property name="tokens" value="PACKAGE_DEF, IMPORT, STATIC_IMPORT, CLASS_DEF,
INTERFACE_DEF, ENUM_DEF, STATIC_INIT, INSTANCE_INIT, METHOD_DEF,
CTOR_DEF"/>
</module>
<module name="SingleSpaceSeparator"/>
<module name="WhitespaceAround">
<!-- Checks that various tokens are surrounded by whitespace.
This includes most binary operators and keywords followed
by regular or curly braces.
-->
<property name="tokens" value="ASSIGN, BAND, BAND_ASSIGN, BOR,
BOR_ASSIGN, BSR, BSR_ASSIGN, BXOR, BXOR_ASSIGN, COLON, DIV, DIV_ASSIGN,
EQUAL, GE, GT, LAMBDA, LAND, LE, LITERAL_CATCH, LITERAL_DO, LITERAL_ELSE,
LITERAL_FINALLY, LITERAL_FOR, LITERAL_IF, LITERAL_RETURN,
LITERAL_SYNCHRONIZED, LITERAL_TRY, LITERAL_WHILE, LOR, LT, MINUS,
MINUS_ASSIGN, MOD, MOD_ASSIGN, NOT_EQUAL, PLUS, PLUS_ASSIGN, QUESTION,
SL, SL_ASSIGN, SR_ASSIGN, STAR, STAR_ASSIGN, TYPE_EXTENSION_AND"/>
<property name="severity" value="error"/>
</module>
<module name="WhitespaceAfter">
<!-- Checks that commas, semicolons and typecasts are followed by
whitespace.
-->
<property name="tokens" value="COMMA, SEMI, TYPECAST"/>
</module>
<module name="NoWhitespaceAfter">
<!-- Checks that there is no whitespace after various unary operators.
Linebreaks are allowed.
-->
<property name="tokens" value="BNOT, DEC, DOT, INC, LNOT, UNARY_MINUS,
UNARY_PLUS"/>
<property name="allowLineBreaks" value="true"/>
<property name="severity" value="error"/>
</module>
<module name="NoWhitespaceBefore">
<!-- Checks that there is no whitespace before various unary operators.
Linebreaks are allowed.
-->
<property name="tokens" value="SEMI, DOT, POST_DEC, POST_INC"/>
<property name="allowLineBreaks" value="true"/>
<property name="severity" value="error"/>
</module>
<module name="OperatorWrap">
<!-- Checks that assignment operators are at the end of the line. -->
<property name="option" value="eol"/>
<property name="tokens" value="ASSIGN"/>
</module>
<module name="ParenPad">
<!-- Checks that there is no whitespace before close parens or after
open parens.
-->
<property name="severity" value="error"/>
</module>
</module>
</module>


@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<FindBugsFilter>
<Match>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
<Match>
<Bug pattern="SE_NO_SERIALVERSIONID"/>
</Match>
<Match>
<Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
</Match>
<Match>
<Bug pattern="SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING"/>
</Match>
</FindBugsFilter>


@@ -0,0 +1,12 @@
<?xml version="1.0"?>
<!DOCTYPE suppressions PUBLIC
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
<!-- target directory is not relevant for checkstyle -->
<suppress
files="[\\/]target[\\/]"
checks=".*"/>
</suppressions>