Compare commits
15 Commits
master
...
feature/sc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9616a2400 | ||
|
|
7cb70bfd8b | ||
|
|
82cfc3274a | ||
|
|
a4cecd6e82 | ||
|
|
ef5a8e57b2 | ||
|
|
9af8db466e | ||
|
|
8d4c1dbd18 | ||
|
|
e573d88af3 | ||
|
|
f264303a6d | ||
|
|
92cd8dc614 | ||
|
|
81b3135b44 | ||
|
|
4fad35afa8 | ||
|
|
f313313d9e | ||
|
|
da50e8fcdb | ||
|
|
cdc6862913 |
35
.gitignore
vendored
Normal file
35
.gitignore
vendored
Normal file
@@ -0,0 +1,35 @@
|
||||
target/
|
||||
!.mvn/wrapper/maven-wrapper.jar
|
||||
!**/src/main/**/target/
|
||||
!**/src/test/**/target/
|
||||
|
||||
### IntelliJ IDEA ###
|
||||
.idea/
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
||||
### Eclipse ###
|
||||
.apt_generated
|
||||
.classpath
|
||||
.factorypath
|
||||
.project
|
||||
.settings
|
||||
.springBeans
|
||||
.sts4-cache
|
||||
|
||||
### NetBeans ###
|
||||
/nbproject/private/
|
||||
/nbbuild/
|
||||
/dist/
|
||||
/nbdist/
|
||||
/.nb-gradle/
|
||||
build/
|
||||
!**/src/main/**/build/
|
||||
!**/src/test/**/build/
|
||||
|
||||
### VS Code ###
|
||||
.vscode/
|
||||
|
||||
### Mac OS ###
|
||||
.DS_Store
|
||||
95
README.md
95
README.md
@@ -1,92 +1,29 @@
|
||||
# flink-voip-fusion
|
||||
|
||||
## 简介
|
||||
|
||||
VoIP 数据融合项目是一个使用 Apache Flink 实现的实时数据处理项目,旨在从 Kafka 中读取 SIP(Session Initiation Protocol)和 RTP(Real-time Transport Protocol)数据,将它们融合成完整的 VoIP(Voice over Internet Protocol)通话数据。
|
||||
|
||||
## Getting started
|
||||
VoIP 数据融合项目可以用于实时监控和分析 VoIP 通话数据,提取关键指标,以及进行实时报警和诊断。
|
||||
|
||||
To make it easy for you to get started with GitLab, here's a list of recommended next steps.
|
||||
## 编译和打包
|
||||
|
||||
Already a pro? Just edit this README.md and make it your own. Want to make it easy? [Use the template at the bottom](#editing-this-readme)!
|
||||
|
||||
## Add your files
|
||||
|
||||
- [ ] [Create](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#create-a-file) or [upload](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#upload-a-file) files
|
||||
- [ ] [Add files using the command line](https://docs.gitlab.com/ee/gitlab-basics/add-file.html#add-a-file-using-the-command-line) or push an existing Git repository with the following command:
|
||||
|
||||
```
|
||||
cd existing_repo
|
||||
git remote add origin https://git.mesalab.cn/galaxy/tsg_olap/flink-voip-fusion.git
|
||||
git branch -M main
|
||||
git push -uf origin main
|
||||
使用Maven工具对项目进行编译和打包:
|
||||
```shell
|
||||
mvn clean package
|
||||
```
|
||||
|
||||
## Integrate with your tools
|
||||
## 运行Flink任务
|
||||
|
||||
- [ ] [Set up project integrations](https://git.mesalab.cn/galaxy/tsg_olap/flink-voip-fusion/-/settings/integrations)
|
||||
使用以下命令运行Flink任务:
|
||||
```shell
|
||||
flink run -c className path/to/flink-voip-fusion-xxx.jar
|
||||
```
|
||||
|
||||
## Collaborate with your team
|
||||
## 配置项说明
|
||||
|
||||
- [ ] [Invite team members and collaborators](https://docs.gitlab.com/ee/user/project/members/)
|
||||
- [ ] [Create a new merge request](https://docs.gitlab.com/ee/user/project/merge_requests/creating_merge_requests.html)
|
||||
- [ ] [Automatically close issues from merge requests](https://docs.gitlab.com/ee/user/project/issues/managing_issues.html#closing-issues-automatically)
|
||||
- [ ] [Enable merge request approvals](https://docs.gitlab.com/ee/user/project/merge_requests/approvals/)
|
||||
- [ ] [Automatically merge when pipeline succeeds](https://docs.gitlab.com/ee/user/project/merge_requests/merge_when_pipeline_succeeds.html)
|
||||
TODO 待补充
|
||||
|
||||
## Test and Deploy
|
||||
## 贡献
|
||||
|
||||
Use the built-in continuous integration in GitLab.
|
||||
|
||||
- [ ] [Get started with GitLab CI/CD](https://docs.gitlab.com/ee/ci/quick_start/index.html)
|
||||
- [ ] [Analyze your code for known vulnerabilities with Static Application Security Testing(SAST)](https://docs.gitlab.com/ee/user/application_security/sast/)
|
||||
- [ ] [Deploy to Kubernetes, Amazon EC2, or Amazon ECS using Auto Deploy](https://docs.gitlab.com/ee/topics/autodevops/requirements.html)
|
||||
- [ ] [Use pull-based deployments for improved Kubernetes management](https://docs.gitlab.com/ee/user/clusters/agent/)
|
||||
- [ ] [Set up protected environments](https://docs.gitlab.com/ee/ci/environments/protected_environments.html)
|
||||
|
||||
***
|
||||
|
||||
# Editing this README
|
||||
|
||||
When you're ready to make this README your own, just edit this file and use the handy template below (or feel free to structure it however you want - this is just a starting point!). Thank you to [makeareadme.com](https://www.makeareadme.com/) for this template.
|
||||
|
||||
## Suggestions for a good README
|
||||
Every project is different, so consider which of these sections apply to yours. The sections used in the template are suggestions for most open source projects. Also keep in mind that while a README can be too long and detailed, too long is better than too short. If you think your README is too long, consider utilizing another form of documentation rather than cutting out information.
|
||||
|
||||
## Name
|
||||
Choose a self-explaining name for your project.
|
||||
|
||||
## Description
|
||||
Let people know what your project can do specifically. Provide context and add a link to any reference visitors might be unfamiliar with. A list of Features or a Background subsection can also be added here. If there are alternatives to your project, this is a good place to list differentiating factors.
|
||||
|
||||
## Badges
|
||||
On some READMEs, you may see small images that convey metadata, such as whether or not all the tests are passing for the project. You can use Shields to add some to your README. Many services also have instructions for adding a badge.
|
||||
|
||||
## Visuals
|
||||
Depending on what you are making, it can be a good idea to include screenshots or even a video (you'll frequently see GIFs rather than actual videos). Tools like ttygif can help, but check out Asciinema for a more sophisticated method.
|
||||
|
||||
## Installation
|
||||
Within a particular ecosystem, there may be a common way of installing things, such as using Yarn, NuGet, or Homebrew. However, consider the possibility that whoever is reading your README is a novice and would like more guidance. Listing specific steps helps remove ambiguity and gets people to using your project as quickly as possible. If it only runs in a specific context like a particular programming language version or operating system or has dependencies that have to be installed manually, also add a Requirements subsection.
|
||||
|
||||
## Usage
|
||||
Use examples liberally, and show the expected output if you can. It's helpful to have inline the smallest example of usage that you can demonstrate, while providing links to more sophisticated examples if they are too long to reasonably include in the README.
|
||||
|
||||
## Support
|
||||
Tell people where they can go to for help. It can be any combination of an issue tracker, a chat room, an email address, etc.
|
||||
|
||||
## Roadmap
|
||||
If you have ideas for releases in the future, it is a good idea to list them in the README.
|
||||
|
||||
## Contributing
|
||||
State if you are open to contributions and what your requirements are for accepting them.
|
||||
|
||||
For people who want to make changes to your project, it's helpful to have some documentation on how to get started. Perhaps there is a script that they should run or some environment variables that they need to set. Make these steps explicit. These instructions could also be useful to your future self.
|
||||
|
||||
You can also document commands to lint the code or run tests. These steps help to ensure high code quality and reduce the likelihood that the changes inadvertently break something. Having instructions for running tests is especially helpful if it requires external setup, such as starting a Selenium server for testing in a browser.
|
||||
|
||||
## Authors and acknowledgment
|
||||
Show your appreciation to those who have contributed to the project.
|
||||
|
||||
## License
|
||||
For open source projects, say how it is licensed.
|
||||
|
||||
## Project status
|
||||
If you have run out of energy or time for your project, put a note at the top of the README saying that development has slowed down or stopped completely. Someone may choose to fork your project or volunteer to step in as a maintainer or owner, allowing your project to keep going. You can also make an explicit request for maintainers.
|
||||
如果您发现任何问题或改进项目的想法,欢迎提交 Issue 或 Pull Request。
|
||||
258
pom.xml
Normal file
258
pom.xml
Normal file
@@ -0,0 +1,258 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>flink-voip-fusion</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
|
||||
<name>Flink : VoIP : Fusion</name>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<java.version>11</java.version>
|
||||
<maven.compiler.source>${java.version}</maven.compiler.source>
|
||||
<maven.compiler.target>${java.version}</maven.compiler.target>
|
||||
<flink.version>1.13.6</flink.version>
|
||||
<scala.version>2.12.10</scala.version>
|
||||
<scala.binary.version>2.12</scala.binary.version>
|
||||
<slf4j.version>1.7.32</slf4j.version>
|
||||
<log4j.version>2.17.1</log4j.version>
|
||||
<jackson.version>2.13.2.20220328</jackson.version>
|
||||
<scalatest.version>3.2.15</scalatest.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-clients_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-json</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>galaxy</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.scalatest</groupId>
|
||||
<artifactId>scalatest_${scala.binary.version}</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.javafaker</groupId>
|
||||
<artifactId>javafaker</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>test</scope>
|
||||
<classifier>tests</classifier>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-slf4j-impl</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-api</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-core</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<dependencyManagement>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>${slf4j.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-slf4j-impl</artifactId>
|
||||
<version>${log4j.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-api</artifactId>
|
||||
<version>${log4j.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-core</artifactId>
|
||||
<version>${log4j.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<!-- API bridge between log4j 1 and 2 -->
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-1.2-api</artifactId>
|
||||
<version>${log4j.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-library</artifactId>
|
||||
<version>${scala.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-reflect</artifactId>
|
||||
<version>${scala.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-compiler</artifactId>
|
||||
<version>${scala.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.scalatest</groupId>
|
||||
<artifactId>scalatest_${scala.binary.version}</artifactId>
|
||||
<version>${scalatest.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.javafaker</groupId>
|
||||
<artifactId>javafaker</artifactId>
|
||||
<version>1.0.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson</groupId>
|
||||
<artifactId>jackson-bom</artifactId>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
<version>${jackson.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>galaxy</artifactId>
|
||||
<version>1.1.3</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.1</version>
|
||||
<configuration>
|
||||
<source>${java.version}</source>
|
||||
<target>${java.version}</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>net.alchim31.maven</groupId>
|
||||
<artifactId>scala-maven-plugin</artifactId>
|
||||
<version>3.2.2</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>scala-compile</id>
|
||||
<phase>compile</phase>
|
||||
<goals>
|
||||
<goal>add-source</goal>
|
||||
<goal>compile</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>scala-test-compile</id>
|
||||
<goals>
|
||||
<goal>testCompile</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<scalaVersion>${scala.version}</scalaVersion>
|
||||
<scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>3.1.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<artifactSet>
|
||||
<excludes>
|
||||
<exclude>org.apache.flink:force-shading</exclude>
|
||||
<exclude>com.google.code.findbugs:jsr305</exclude>
|
||||
<exclude>org.slf4j:*</exclude>
|
||||
<exclude>org.apache.logging.log4j:*</exclude>
|
||||
</excludes>
|
||||
</artifactSet>
|
||||
<filters>
|
||||
<filter>
|
||||
<!-- Do not copy the signatures in the META-INF folder.
|
||||
Otherwise, this might cause SecurityExceptions when using the JAR. -->
|
||||
<artifact>*:*</artifact>
|
||||
<excludes>
|
||||
<exclude>META-INF/*.SF</exclude>
|
||||
<exclude>META-INF/*.DSA</exclude>
|
||||
<exclude>META-INF/*.RSA</exclude>
|
||||
</excludes>
|
||||
</filter>
|
||||
</filters>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
7
src/main/resources/app.properties
Normal file
7
src/main/resources/app.properties
Normal file
@@ -0,0 +1,7 @@
|
||||
source.kafka.topic=aaa
|
||||
source.kafka.props.bootstrap.servers=127.0.0.1:9292
|
||||
source.kafka.props.group.id=flink-voip-fusion
|
||||
source.kafka.props.auto.offset.reset=earliest
|
||||
|
||||
sink.kafka.topic=bbb
|
||||
sink.kafka.props.bootstrap.servers=127.0.0.1:9292
|
||||
52
src/main/scala/com/zdjizhi/flink/voip/FusionApp.scala
Normal file
52
src/main/scala/com/zdjizhi/flink/voip/FusionApp.scala
Normal file
@@ -0,0 +1,52 @@
|
||||
package com.zdjizhi.flink.voip
|
||||
|
||||
import com.zdjizhi.flink.voip.conf._
|
||||
import com.zdjizhi.flink.voip.functions.TypeSplitFunction._
|
||||
import com.zdjizhi.flink.voip.functions.{SIPPairingFunction, TypeSplitFunction, VoIPFusionFunction}
|
||||
import com.zdjizhi.flink.voip.records.SIPRecord.Implicits._
|
||||
import org.apache.flink.api.java.utils.ParameterTool
|
||||
import org.apache.flink.formats.json.JsonNodeDeserializationSchema
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
|
||||
import org.apache.flink.streaming.api.scala._
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
|
||||
|
||||
/**
|
||||
* App Main.
|
||||
*
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
object FusionApp {
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
|
||||
val env = StreamExecutionEnvironment.getExecutionEnvironment
|
||||
|
||||
require(args.nonEmpty, "Error: Not found properties path. \nUsage: flink -c xxx xxx.jar app.properties.")
|
||||
|
||||
val tool = ParameterTool.fromPropertiesFile(args.head)
|
||||
|
||||
env.getConfig.setGlobalJobParameters(tool.getConfiguration)
|
||||
|
||||
val consumer = new FlinkKafkaConsumer[ObjectNode](
|
||||
tool.getConfiguration.get(FusionConfigs.SOURCE_KAFKA_TOPIC),
|
||||
new JsonNodeDeserializationSchema,
|
||||
tool.getProperties(FusionConfigs.SOURCE_KAFKA_PROPERTIES_PREFIX))
|
||||
|
||||
val schemaTypeSplitStream = env.addSource(consumer)
|
||||
.process(new TypeSplitFunction)
|
||||
|
||||
val sipStream = schemaTypeSplitStream.getSideOutput(sipSchemaTypeOutputTag)
|
||||
|
||||
val sipDoubleStream = sipStream.keyBy { i => (i.vSysID, i.callID) }
|
||||
.process(new SIPPairingFunction)
|
||||
|
||||
val rtpStream = schemaTypeSplitStream.getSideOutput(rtpSchemaTypeOutputTag)
|
||||
|
||||
rtpStream.keyBy(_.vSysID).connect(sipDoubleStream.keyBy(_.vSysID))
|
||||
.process(new VoIPFusionFunction)
|
||||
|
||||
env.execute("VoIP Fusion Job")
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package com.zdjizhi.flink.voip.conf
|
||||
|
||||
import org.apache.flink.api.common.time.Time
|
||||
import org.apache.flink.configuration.{ConfigOption, ConfigOptions}
|
||||
|
||||
import java.lang.{Long => JLong}
|
||||
|
||||
/**
|
||||
* An object containing configuration options for the Fusion application.
|
||||
*
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
object FusionConfigs {
|
||||
|
||||
val SOURCE_KAFKA_TOPIC: ConfigOption[String] =
|
||||
ConfigOptions.key("source.kafka.topic")
|
||||
.stringType()
|
||||
.noDefaultValue()
|
||||
.withDescription("")
|
||||
|
||||
val SINK_KAFKA_TOPIC: ConfigOption[String] =
|
||||
ConfigOptions.key("sink.kafka.topic")
|
||||
.stringType()
|
||||
.noDefaultValue()
|
||||
.withDescription("")
|
||||
|
||||
val SOURCE_KAFKA_PROPERTIES_PREFIX: String = "source.kafka.props."
|
||||
|
||||
val SINK_KAFKA_PROPERTIES_PREFIX: String = "sink.kafka.props."
|
||||
|
||||
/**
|
||||
* The configuration option for the interval at which SIP (Session Initiation Protocol) state data
|
||||
* should be cleared.
|
||||
*/
|
||||
val SIP_STATE_CLEAR_INTERVAL: ConfigOption[JLong] =
|
||||
ConfigOptions.key("")
|
||||
.longType()
|
||||
.defaultValue(Time.minutes(1).toMilliseconds)
|
||||
.withDescription("")
|
||||
|
||||
val RTP_STATE_CLEAR_INTERVAL: ConfigOption[JLong] =
|
||||
ConfigOptions.key("")
|
||||
.longType()
|
||||
.defaultValue(Time.minutes(3).toMilliseconds)
|
||||
.withDescription("")
|
||||
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package com.zdjizhi.flink.voip.conf
|
||||
|
||||
import org.apache.flink.api.java.utils.ParameterTool
|
||||
|
||||
import java.util.Properties
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
/**
|
||||
* A wrapper class that extends the Flink `ParameterTool` to provide utility methods for handling
|
||||
* properties with a specific prefix. This class allows retrieving properties that start with the
|
||||
* given `prefix` and converts them into a `java.util.Properties` object.
|
||||
*
|
||||
* @param tool The original Flink `ParameterTool` instance.
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
class FusionParameterTool(tool: ParameterTool) {
|
||||
|
||||
/**
|
||||
* Retrieves properties from the underlying `ParameterTool` instance that start with the specified
|
||||
* `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
|
||||
*
|
||||
* @param prefix The prefix to filter properties.
|
||||
* @return A `java.util.Properties` object containing the properties with the specified prefix.
|
||||
*/
|
||||
def getProperties(prefix: String): Properties = {
|
||||
val map = tool.toMap.asScala.filterKeys(_.startsWith(prefix))
|
||||
.map { case (key, value) => (key.stripPrefix(prefix), value) }
|
||||
ParameterTool.fromMap(map.asJava).getProperties
|
||||
}
|
||||
|
||||
}
|
||||
11
src/main/scala/com/zdjizhi/flink/voip/conf/package.scala
Normal file
11
src/main/scala/com/zdjizhi/flink/voip/conf/package.scala
Normal file
@@ -0,0 +1,11 @@
|
||||
package com.zdjizhi.flink.voip
|
||||
|
||||
import org.apache.flink.api.java.utils.ParameterTool
|
||||
|
||||
import scala.language.implicitConversions
|
||||
|
||||
package object conf {
|
||||
|
||||
implicit def asFusionParameterTool(tool: ParameterTool): FusionParameterTool = new FusionParameterTool(tool)
|
||||
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package com.zdjizhi.flink.voip.functions
|
||||
|
||||
import org.apache.flink.api.common.functions.RichFunction
|
||||
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
|
||||
import org.apache.flink.api.common.time.Time
|
||||
import org.apache.flink.configuration.Configuration
|
||||
import org.apache.flink.streaming.api.TimerService
|
||||
|
||||
|
||||
/**
|
||||
* A trait that provides utility functions for Flink functions.
|
||||
*
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
trait FunctionHelper extends RichFunction {
|
||||
|
||||
/**
|
||||
* Get the global configuration for the current Flink job.
|
||||
*
|
||||
* @return The global configuration as a Configuration object.
|
||||
*/
|
||||
def getGlobalConfiguration: Configuration = getRuntimeContext.getExecutionConfig
|
||||
.getGlobalJobParameters.asInstanceOf[Configuration]
|
||||
|
||||
/**
|
||||
* Get a MapState with the given name, key, and value types.
|
||||
*
|
||||
* @param name The name of the MapState.
|
||||
* @param key The class representing the type of keys in the MapState.
|
||||
* @param value The class representing the type of values in the MapState.
|
||||
* @tparam K The type of keys in the MapState.
|
||||
* @tparam V The type of values in the MapState.
|
||||
* @return The MapState with the given name, key, and value types.
|
||||
*/
|
||||
def getMapState[K, V](name: String, key: Class[K], value: Class[V]): MapState[K, V] = {
|
||||
val descriptor = new MapStateDescriptor[K, V](name, key, value)
|
||||
getRuntimeContext.getMapState(descriptor)
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the next fire timestamp for the given TimerService and fire interval.
|
||||
*
|
||||
* @param timeService The TimerService used to register the timer.
|
||||
* @param fireInterval The interval at which the timer should fire, represented as a Time object.
|
||||
*/
|
||||
def registerNextFireTimestamp(timeService: TimerService, fireInterval: Time): Unit = {
|
||||
val currentTime = timeService.currentProcessingTime()
|
||||
val nextFireTime = (currentTime / fireInterval.toMilliseconds + 1) * fireInterval.toMilliseconds
|
||||
timeService.registerProcessingTimeTimer(nextFireTime)
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
package com.zdjizhi.flink.voip.functions
|
||||
|
||||
import com.zdjizhi.flink.voip.conf.FusionConfigs.SIP_STATE_CLEAR_INTERVAL
|
||||
import com.zdjizhi.flink.voip.records.SIPRecord.Implicits._
|
||||
import com.zdjizhi.flink.voip.records.{Record, StreamDirs}
|
||||
import org.apache.flink.api.common.time.Time
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{IntNode, ObjectNode}
|
||||
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
|
||||
import org.apache.flink.util.Collector
|
||||
|
||||
/**
|
||||
* A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
|
||||
* SIP records are paired when they have the same addresses but opposite stream directions.
|
||||
*
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
class SIPPairingFunction extends KeyedProcessFunction[(Int, String), ObjectNode, ObjectNode]
|
||||
with FunctionHelper {
|
||||
|
||||
private lazy val fireInterval: Time = Time.milliseconds(getGlobalConfiguration.get(SIP_STATE_CLEAR_INTERVAL))
|
||||
// A MapState to store SIP records with their addresses and expiration time.
|
||||
private lazy val mapState = getMapState("sip-state", classOf[Address], classOf[ObjectNodeWithExpiration])
|
||||
|
||||
override def processElement(obj: ObjectNode,
|
||||
ctx: KeyedProcessFunction[(Int, String), ObjectNode, ObjectNode]#Context,
|
||||
out: Collector[ObjectNode]): Unit = {
|
||||
// Register a timer for the next clearing interval.
|
||||
registerNextFireTimestamp(ctx.timerService(), fireInterval)
|
||||
|
||||
// When SIP is a one-way stream.
|
||||
if (obj.streamDir != StreamDirs.DOUBLE) {
|
||||
// Create an Address instance based on server and client IPs and ports.
|
||||
val address = Address((obj.serverIp, obj.serverPort), (obj.clientIp, obj.clientPort))
|
||||
// If the address is already stored in the mapState and has the opposite stream direction,
|
||||
// merge the SIP records, change the stream direction to DOUBLE, and output the merged record.
|
||||
if (mapState.contains(address) && mapState.get(address).obj.streamDir != obj.streamDir /* TODO consider stream direction */ ) {
|
||||
obj.merge(mapState.get(address).obj)
|
||||
obj.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.DOUBLE))
|
||||
out.collect(obj)
|
||||
mapState.remove(address)
|
||||
} else {
|
||||
// If the address is not yet in the mapState, add it with its expiration time.
|
||||
val value = ObjectNodeWithExpiration(obj,
|
||||
ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds)
|
||||
mapState.put(address, value)
|
||||
}
|
||||
} else {
|
||||
// If SIP is a double stream, pairing isn't required, directly output the record.
|
||||
out.collect(obj)
|
||||
}
|
||||
}
|
||||
|
||||
override def onTimer(timestamp: Long,
|
||||
ctx: KeyedProcessFunction[(Int, String), ObjectNode, ObjectNode]#OnTimerContext,
|
||||
out: Collector[ObjectNode]): Unit = {
|
||||
|
||||
val iterator = mapState.entries().iterator()
|
||||
while (iterator.hasNext) {
|
||||
val entry = iterator.next()
|
||||
// Remove expired entries from the mapState based on their expiration time.
|
||||
if (entry.getValue.expireTime <= timestamp) {
|
||||
mapState.remove(entry.getKey)
|
||||
}
|
||||
}
|
||||
registerNextFireTimestamp(ctx.timerService(), fireInterval)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
package com.zdjizhi.flink.voip.functions
|
||||
|
||||
|
||||
import com.zdjizhi.flink.voip.functions.TypeSplitFunction._
|
||||
import com.zdjizhi.flink.voip.records.Record.Implicits._
|
||||
import com.zdjizhi.flink.voip.records.SchemaTypes._
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
|
||||
import org.apache.flink.streaming.api.functions.ProcessFunction
|
||||
import org.apache.flink.streaming.api.scala._
|
||||
import org.apache.flink.util.Collector
|
||||
|
||||
/**
|
||||
* A ProcessFunction that splits ObjectNode records based on their 'schemaType' field.
|
||||
* It outputs SIP records to the 'sipSchemaTypeOutputTag' and RTP records to the 'rtpSchemaTypeOutputTag'.
|
||||
*
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
class TypeSplitFunction extends ProcessFunction[ObjectNode, ObjectNode] {
|
||||
|
||||
override def processElement(obj: ObjectNode,
|
||||
ctx: ProcessFunction[ObjectNode, ObjectNode]#Context,
|
||||
out: Collector[ObjectNode]): Unit = {
|
||||
// Split record based on its 'schemaType' field.
|
||||
obj.schemaType match {
|
||||
case SIP => ctx.output(sipSchemaTypeOutputTag, obj)
|
||||
case RTP => ctx.output(rtpSchemaTypeOutputTag, obj)
|
||||
case _ =>
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Defining the OutputTags for SIP and RTP records.
|
||||
*/
|
||||
object TypeSplitFunction {
|
||||
|
||||
/**
|
||||
* OutputTag for SIP records.
|
||||
*/
|
||||
val sipSchemaTypeOutputTag: OutputTag[ObjectNode] =
|
||||
OutputTag[ObjectNode]("schema-type-sip")
|
||||
|
||||
/**
|
||||
* OutputTag for RTP records.
|
||||
*/
|
||||
val rtpSchemaTypeOutputTag: OutputTag[ObjectNode] =
|
||||
OutputTag[ObjectNode]("schema-type-rtp")
|
||||
}
|
||||
@@ -0,0 +1,101 @@
|
||||
package com.zdjizhi.flink.voip.functions
|
||||
|
||||
import com.zdjizhi.flink.voip.conf.FusionConfigs.RTP_STATE_CLEAR_INTERVAL
|
||||
import com.zdjizhi.flink.voip.records.{Record, SchemaTypes, StreamDirs}
|
||||
import org.apache.flink.api.common.time.Time
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{ObjectNode, TextNode}
|
||||
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
|
||||
import org.apache.flink.util.Collector
|
||||
|
||||
/**
|
||||
* The `VoIPFusionFunction` is a key-based co-process function that performs VoIP fusion logic
|
||||
* for SIP and RTP records. It combines SIP and RTP records belonging to the same session
|
||||
* and emits fused VoIP records. The function utilizes keyed state to store and manage SIP and
|
||||
* RTP records, and it uses timers to trigger regular clearing of the state.
|
||||
*
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
class VoIPFusionFunction extends KeyedCoProcessFunction[Int, ObjectNode, ObjectNode, ObjectNode] with FunctionHelper {
|
||||
|
||||
// The maximum number of RTP lines allowed per SIP for fusion.
|
||||
private val MAX_RTP_LINES: Int = 2
|
||||
private lazy val fireInterval: Time = Time.milliseconds(getGlobalConfiguration.get(RTP_STATE_CLEAR_INTERVAL))
|
||||
private lazy val sipDoubleState = getMapState("sip-state", classOf[Address], classOf[ObjectNodeWithInfo])
|
||||
private lazy val rtpState = getMapState("rtp-state", classOf[Address], classOf[ObjectNodeWithExpiration])
|
||||
|
||||
override def processElement1(obj: ObjectNode,
|
||||
ctx: KeyedCoProcessFunction[Int, ObjectNode, ObjectNode, ObjectNode]#Context,
|
||||
out: Collector[ObjectNode]): Unit = {
|
||||
import com.zdjizhi.flink.voip.records.SIPRecord.Implicits._
|
||||
val address = Address((obj.originatorSdpConnectIp, obj.originatorSdpMediaPort),
|
||||
(obj.responderSdpConnectIp, obj.responderSdpMediaPort))
|
||||
|
||||
sipDoubleState.put(address, ObjectNodeWithInfo(obj,
|
||||
ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds, 0))
|
||||
|
||||
registerNextFireTimestamp(ctx.timerService(), fireInterval)
|
||||
}
|
||||
|
||||
override def processElement2(obj: ObjectNode,
|
||||
ctx: KeyedCoProcessFunction[Int, ObjectNode, ObjectNode, ObjectNode]#Context,
|
||||
out: Collector[ObjectNode]): Unit = {
|
||||
import com.zdjizhi.flink.voip.records.Record.Implicits._
|
||||
val address = Address((obj.serverIp, obj.serverPort), (obj.clientIp, obj.clientPort))
|
||||
if (sipDoubleState.contains(address)) {
|
||||
val info = sipDoubleState.get(address)
|
||||
obj.merge(info.obj)
|
||||
obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaTypes.VoIP))
|
||||
out.collect(obj)
|
||||
|
||||
obj.streamDir match {
|
||||
case StreamDirs.DOUBLE =>
|
||||
// In the context of VoIP fusion, only one RTP double directional stream
|
||||
sipDoubleState.remove(address)
|
||||
case _ =>
|
||||
// Save the number of fused RTP unidirectional streams
|
||||
sipDoubleState.put(address, info.copy(times = info.times + 1))
|
||||
}
|
||||
} else {
|
||||
rtpState.put(address, ObjectNodeWithExpiration(obj,
|
||||
ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds))
|
||||
}
|
||||
registerNextFireTimestamp(ctx.timerService(), fireInterval)
|
||||
}
|
||||
|
||||
override def onTimer(timestamp: Long,
|
||||
ctx: KeyedCoProcessFunction[Int, ObjectNode, ObjectNode, ObjectNode]#OnTimerContext,
|
||||
out: Collector[ObjectNode]): Unit = {
|
||||
import com.zdjizhi.flink.voip.records.Record.Implicits._
|
||||
val iterator = rtpState.iterator()
|
||||
while (iterator.hasNext) {
|
||||
val entry = iterator.next()
|
||||
val obj = entry.getValue.obj
|
||||
val address = entry.getKey
|
||||
|
||||
if (sipDoubleState.contains(address)) {
|
||||
val info = sipDoubleState.get(address)
|
||||
obj.streamDir match {
|
||||
case StreamDirs.DOUBLE =>
|
||||
sipDoubleState.remove(address)
|
||||
case _ if info.times >= MAX_RTP_LINES - 1 =>
|
||||
// One RTP unidirectional stream has already been fused
|
||||
sipDoubleState.remove(address)
|
||||
}
|
||||
obj.merge(info.obj)
|
||||
.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaTypes.VoIP))
|
||||
out.collect(obj)
|
||||
}
|
||||
|
||||
if (entry.getValue.expireTime <= timestamp) {
|
||||
rtpState.remove(entry.getKey)
|
||||
}
|
||||
}
|
||||
sipDoubleState.iterator().forEachRemaining(entry => {
|
||||
if (entry.getValue.expireTime <= timestamp) {
|
||||
sipDoubleState.remove(entry.getKey)
|
||||
}
|
||||
})
|
||||
registerNextFireTimestamp(ctx.timerService(), fireInterval)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
package com.zdjizhi.flink.voip.functions
|
||||
|
||||
import com.zdjizhi.utils.IPUtil
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
|
||||
|
||||
/**
|
||||
* A case class representing an address with two IP and port pairs.
|
||||
*
|
||||
* @param ip1 The first IP address.
|
||||
* @param port1 The first port number.
|
||||
* @param ip2 The second IP address.
|
||||
* @param port2 The second port number.
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
case class Address(ip1: String, port1: Int, ip2: String, port2: Int)
|
||||
|
||||
|
||||
object Address {
|
||||
|
||||
/**
|
||||
* Creates an Address instance based on two tuples containing (String, Int) representing address information.
|
||||
* The method sorts the addresses based on the port number, and if the ports are equal, it sorts them based on
|
||||
* the numeric value of the IP address.
|
||||
*
|
||||
* @param address1 The first address information as a tuple (IP address, port).
|
||||
* @param address2 The second address information as a tuple (IP address, port).
|
||||
* @return An Address instance with addresses sorted and reordered.
|
||||
*/
|
||||
def apply(address1: (String, Int), address2: (String, Int)): Address = {
|
||||
val seq = (address1 :: address2 :: Nil).sortWith {
|
||||
case (a, b) =>
|
||||
if (a._2 == b._2) {
|
||||
IPUtil.getIpDesimal(a._1) < IPUtil.getIpDesimal(b._1)
|
||||
} else {
|
||||
a._2 < b._2
|
||||
}
|
||||
}
|
||||
// Create an Address instance with the first address having the smaller port number,
|
||||
// and if the ports are equal, the smaller IP address comes first.
|
||||
Address(seq.head._1, seq.head._2, seq.last._1, seq.last._2)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A case class representing an ObjectNode with an expiration time.
|
||||
*
|
||||
* @param obj The ObjectNode containing data.
|
||||
* @param expireTime The expiration time for the object.
|
||||
*/
|
||||
case class ObjectNodeWithExpiration(obj: ObjectNode, expireTime: Long)
|
||||
|
||||
/**
|
||||
* A case class representing an ObjectNode with an expiration time and a pair times.
|
||||
*
|
||||
* @param obj The ObjectNode containing data.
|
||||
* @param expireTime The expiration time for the object.
|
||||
* @param times The pair times for the object.
|
||||
*/
|
||||
case class ObjectNodeWithInfo(obj: ObjectNode, expireTime: Long, times: Int)
|
||||
137
src/main/scala/com/zdjizhi/flink/voip/records/Record.scala
Normal file
137
src/main/scala/com/zdjizhi/flink/voip/records/Record.scala
Normal file
@@ -0,0 +1,137 @@
|
||||
package com.zdjizhi.flink.voip.records
|
||||
|
||||
import com.zdjizhi.flink.voip.records.Record._
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
|
||||
|
||||
import scala.language.implicitConversions
|
||||
|
||||
/**
|
||||
* 用于解析和访问数据记录的类。
|
||||
*
|
||||
* @param obj 包含数据记录的 ObjectNode 对象
|
||||
* @constructor 创建一个 Record 对象,用于解析和访问数据记录
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
class Record(obj: ObjectNode) {
|
||||
protected implicit val o: ObjectNode = obj
|
||||
/**
|
||||
* 数据记录中的所属 vsys
|
||||
*/
|
||||
val vSysID: Int = getInt(F_COMMON_VSYS_ID)
|
||||
|
||||
/**
|
||||
* 数据记录中的字段类型
|
||||
*/
|
||||
val schemaType: String = getString(F_COMMON_SCHEMA_TYPE)
|
||||
|
||||
/**
|
||||
* 数据记录中的流类型
|
||||
*/
|
||||
val streamDir: Int = getInt(F_COMMON_STREAM_DIR)
|
||||
|
||||
/**
|
||||
* 数据记录中的服务端地址字段值
|
||||
*/
|
||||
val serverIp: String = getString(F_COMMON_SERVER_IP)
|
||||
|
||||
/**
|
||||
* 数据记录中的服务端端口
|
||||
*/
|
||||
val serverPort: Int = getInt(F_COMMON_SERVER_PORT)
|
||||
|
||||
/**
|
||||
* 数据记录中的客户端地址
|
||||
*/
|
||||
val clientIp: String = getString(F_COMMON_CLIENT_IP)
|
||||
|
||||
/**
|
||||
* 数据记录中的客户端端口
|
||||
*/
|
||||
val clientPort: Int = getInt(F_COMMON_CLIENT_PORT)
|
||||
|
||||
def merge(obj: ObjectNode): ObjectNode = {
|
||||
obj.fields().forEachRemaining(entry => {
|
||||
o.set(entry.getKey, entry.getValue)
|
||||
})
|
||||
o
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 用于解析和访问数据记录。
|
||||
*
|
||||
* @see Record
|
||||
*/
|
||||
object Record {
|
||||
|
||||
/**
|
||||
* 所属 vsys 字段名
|
||||
*/
|
||||
val F_COMMON_VSYS_ID = "common_vsys_id"
|
||||
|
||||
/**
|
||||
* 字段类型 字段名
|
||||
*/
|
||||
val F_COMMON_SCHEMA_TYPE = "common_schema_type"
|
||||
|
||||
/**
|
||||
* 流类型 字段名
|
||||
*/
|
||||
val F_COMMON_STREAM_DIR = "common_stream_dir"
|
||||
|
||||
/**
|
||||
* 服务端地址 字段名
|
||||
*/
|
||||
val F_COMMON_SERVER_IP = "common_server_ip"
|
||||
|
||||
/**
|
||||
* 服务端端口 字段名
|
||||
*/
|
||||
val F_COMMON_SERVER_PORT = "common_server_port"
|
||||
|
||||
/**
|
||||
* 客户端地址 字段名
|
||||
*/
|
||||
val F_COMMON_CLIENT_IP = "common_client_ip"
|
||||
|
||||
/**
|
||||
* 客户端端口 字段名
|
||||
*/
|
||||
val F_COMMON_CLIENT_PORT = "common_client_port"
|
||||
|
||||
/**
|
||||
* 从 ObjectNode 对象中获取整数类型字段值。
|
||||
*
|
||||
* @param field 字段名
|
||||
* @param default 默认值(可选,默认为 0)
|
||||
* @param o 包含字段的 ObjectNode 对象
|
||||
* @return 字段对应的整数值
|
||||
*/
|
||||
def getInt(field: String, default: Int = 0)(implicit o: ObjectNode): Int = {
|
||||
val node = o.get(field)
|
||||
if (node != null && node.isInt) {
|
||||
node.asInt(default)
|
||||
} else default
|
||||
}
|
||||
|
||||
/**
|
||||
* 从 ObjectNode 对象中获取字符串类型字段值。
|
||||
*
|
||||
* @param field 字段名
|
||||
* @param default 默认值(可选,默认为 null)
|
||||
* @param o 包含字段的 ObjectNode 对象
|
||||
* @return 字段对应的字符串值
|
||||
*/
|
||||
def getString(field: String, default: String = null)(implicit o: ObjectNode): String = {
|
||||
val node = o.get(field)
|
||||
if (node != null && node.isTextual) {
|
||||
node.asText(default)
|
||||
} else default
|
||||
}
|
||||
|
||||
object Implicits {
|
||||
implicit def asRecord(o: ObjectNode): Record = new Record(o)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
package com.zdjizhi.flink.voip.records
|
||||
|
||||
import com.zdjizhi.flink.voip.records.Record._
|
||||
import com.zdjizhi.flink.voip.records.SIPRecord._
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
|
||||
|
||||
import scala.language.implicitConversions
|
||||
|
||||
/**
|
||||
* SIP(Session Initiation Protocol)数据记录类,用于解析和访问SIP数据记录。
|
||||
*
|
||||
* @param obj 包含 SIP 数据记录的 ObjectNode 对象
|
||||
* @constructor 创建一个 SIPRecord 对象,用于解析和访问 SIP 数据记录
|
||||
* @see Record
|
||||
* @author chaoc
|
||||
* @since 1.0
|
||||
*/
|
||||
class SIPRecord(obj: ObjectNode) extends Record(obj) {
|
||||
|
||||
/**
|
||||
* SIP 通话的会话 ID 字段值
|
||||
*/
|
||||
val callID: String = getString(F_CALL_ID)
|
||||
|
||||
/**
|
||||
* SIP 通话的协调的主叫语音传输 IP 字段值
|
||||
*/
|
||||
val originatorSdpConnectIp: String = getString(F_ORIGINATOR_SDP_CONNECT_IP)
|
||||
|
||||
/**
|
||||
* SIP 通话的协调的主叫语音传输端口字段值
|
||||
*/
|
||||
val originatorSdpMediaPort: Int = getInt(F_ORIGINATOR_SDP_MEDIA_PORT)
|
||||
|
||||
/**
|
||||
* SIP 通话的协调的被叫语音传输 IP 字段值
|
||||
*/
|
||||
val responderSdpConnectIp: String = getString(F_RESPONDER_SDP_CONNECT_IP)
|
||||
|
||||
/**
|
||||
* SIP 通话的协调的被叫语音传输端口字段值
|
||||
*/
|
||||
val responderSdpMediaPort: Int = getInt(F_RESPONDER_SDP_MEDIA_PORT)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 用于解析和访问 SIP 数据记录。
|
||||
*
|
||||
* @see SIPRecord
|
||||
*/
|
||||
object SIPRecord {
|
||||
|
||||
/**
|
||||
* 会话ID 字段名
|
||||
*/
|
||||
val F_CALL_ID = "sip_call_id"
|
||||
/**
|
||||
* 协调的主叫语音传输IP 字段名
|
||||
*/
|
||||
val F_ORIGINATOR_SDP_CONNECT_IP = "sip_originator_sdp_connect_ip"
|
||||
/**
|
||||
* 协调的主叫语音传输端口 字段名
|
||||
*/
|
||||
val F_ORIGINATOR_SDP_MEDIA_PORT = "sip_originator_sdp_media_port"
|
||||
/**
|
||||
* 协调的被叫语音传输IP 字段名
|
||||
*/
|
||||
val F_RESPONDER_SDP_CONNECT_IP = "sip_responder_sdp_connect_ip"
|
||||
/**
|
||||
* 协调的被叫语音传输端口 字段名
|
||||
*/
|
||||
val F_RESPONDER_SDP_MEDIA_PORT = "sip_responder_sdp_media_port"
|
||||
|
||||
private[voip] object Implicits {
|
||||
implicit def asSIPRecord(o: ObjectNode): SIPRecord = {
|
||||
new SIPRecord(o)
|
||||
}
|
||||
}
|
||||
}
|
||||
14
src/main/scala/com/zdjizhi/flink/voip/records/enums.scala
Normal file
14
src/main/scala/com/zdjizhi/flink/voip/records/enums.scala
Normal file
@@ -0,0 +1,14 @@
|
||||
package com.zdjizhi.flink.voip.records
|
||||
|
||||
object SchemaTypes {
|
||||
|
||||
val SIP = "SIP"
|
||||
val RTP = "RTP"
|
||||
val VoIP = "VoIP"
|
||||
}
|
||||
|
||||
object StreamDirs {
|
||||
val C2S = 1
|
||||
val S2C = 2
|
||||
val DOUBLE = 3
|
||||
}
|
||||
25
src/test/resources/log4j2.properties
Normal file
25
src/test/resources/log4j2.properties
Normal file
@@ -0,0 +1,25 @@
|
||||
################################################################################
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
################################################################################
|
||||
|
||||
rootLogger.level = INFO
|
||||
rootLogger.appenderRef.console.ref = ConsoleAppender
|
||||
|
||||
appender.console.name = ConsoleAppender
|
||||
appender.console.type = CONSOLE
|
||||
appender.console.layout.type = PatternLayout
|
||||
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
|
||||
7
src/test/resources/test.properties
Normal file
7
src/test/resources/test.properties
Normal file
@@ -0,0 +1,7 @@
|
||||
source.kafka.topic=aaa
|
||||
source.kafka.props.bootstrap.servers=127.0.0.1:9292
|
||||
source.kafka.props.group.id=flink-voip-fusion
|
||||
source.kafka.props.auto.offset.reset=earliest
|
||||
|
||||
sink.kafka.topic=bbb
|
||||
sink.kafka.props.bootstrap.servers=127.0.0.1:9292
|
||||
54
src/test/scala/com/zdjizhi/flink/voip/data/Generator.scala
Normal file
54
src/test/scala/com/zdjizhi/flink/voip/data/Generator.scala
Normal file
@@ -0,0 +1,54 @@
|
||||
package com.zdjizhi.flink.voip.data
|
||||
|
||||
import com.github.javafaker.Faker
|
||||
import com.zdjizhi.flink.voip.data.Generator.random
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
// 生成数据,有效数据即可以双向匹配的数据占比 [ratio/100]
|
||||
abstract class Generator[T](ratio: Int) {
|
||||
|
||||
require(ratio >= 0 && ratio <= 100,
|
||||
"Param 'ratio' is limited to 0-100 integers only.")
|
||||
|
||||
private lazy val state = new AtomicReference[T]()
|
||||
|
||||
final def next: T = {
|
||||
val i = random.nextInt(100)
|
||||
val v = if (i < ratio && state.get() != null) {
|
||||
afterState(state.get())
|
||||
} else {
|
||||
state.updateAndGet(_ => this.generate)
|
||||
}
|
||||
state.set(null.asInstanceOf[T])
|
||||
afterAll(v)
|
||||
}
|
||||
|
||||
protected def generate: T
|
||||
|
||||
protected def afterState(v: T): T
|
||||
|
||||
protected def afterAll(t: T): T = t
|
||||
|
||||
}
|
||||
|
||||
object Generator {
|
||||
|
||||
val random: ThreadLocalRandom = ThreadLocalRandom.current()
|
||||
|
||||
private val faker = new Faker()
|
||||
|
||||
def nextIP: String = {
|
||||
if (random.nextBoolean()) {
|
||||
faker.internet().ipV4Address()
|
||||
} else {
|
||||
faker.internet().ipV6Address()
|
||||
}
|
||||
}
|
||||
|
||||
def nextID: String = faker.idNumber().valid()
|
||||
|
||||
def nextPort: Int = random.nextInt(65535)
|
||||
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
package com.zdjizhi.flink.voip.data
|
||||
|
||||
import com.zdjizhi.flink.voip.records.{Record, SIPRecord, StreamDirs}
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{IntNode, ObjectNode, TextNode}
|
||||
|
||||
// 生成 SIP 数据行
|
||||
class SIPGenerator(ratio: Int = 40) extends Generator[ObjectNode](ratio) {
|
||||
|
||||
private val mapper = new ObjectMapper()
|
||||
|
||||
override protected def generate: ObjectNode = {
|
||||
val obj = mapper.createObjectNode()
|
||||
|
||||
obj.set(SIPRecord.F_CALL_ID, TextNode.valueOf(Generator.nextID))
|
||||
|
||||
val dir = if (Generator.random.nextBoolean()) StreamDirs.S2C else StreamDirs.C2S
|
||||
obj.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(dir))
|
||||
|
||||
obj.set(Record.F_COMMON_SERVER_IP, TextNode.valueOf(Generator.nextIP))
|
||||
obj.set(Record.F_COMMON_SERVER_PORT, IntNode.valueOf(Generator.nextPort))
|
||||
obj.set(Record.F_COMMON_CLIENT_IP, TextNode.valueOf(Generator.nextIP))
|
||||
obj.set(Record.F_COMMON_CLIENT_PORT, IntNode.valueOf(Generator.nextPort))
|
||||
|
||||
obj.set(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP, TextNode.valueOf(Generator.nextIP))
|
||||
obj.set(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT, IntNode.valueOf(Generator.nextPort))
|
||||
obj.set(SIPRecord.F_RESPONDER_SDP_CONNECT_IP, TextNode.valueOf(Generator.nextIP))
|
||||
obj.set(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT, IntNode.valueOf(Generator.nextPort))
|
||||
|
||||
obj
|
||||
}
|
||||
|
||||
override def afterState(v: ObjectNode): ObjectNode = {
|
||||
import Record.Implicits._
|
||||
v.streamDir match {
|
||||
case StreamDirs.C2S =>
|
||||
v.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.S2C))
|
||||
case StreamDirs.S2C =>
|
||||
v.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.C2S))
|
||||
case _ =>
|
||||
}
|
||||
v
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.zdjizhi.flink.voip.functions
|
||||
|
||||
import org.scalatest.flatspec.AnyFlatSpec
|
||||
|
||||
class AddressTest extends AnyFlatSpec {
|
||||
|
||||
"Address.apply" should "return the Address with ports sorted and IPs reordered if necessary" in {
|
||||
val address1 = ("192.168.0.1", 80)
|
||||
val address2 = ("10.1.1.1", 8080)
|
||||
|
||||
val result1 = Address(address1, address2)
|
||||
assert(result1 == Address("192.168.0.1", 80, "10.1.1.1", 8080))
|
||||
|
||||
val result2 = Address(address2, address1)
|
||||
assert(result2 == Address("192.168.0.1", 80, "10.1.1.1", 8080))
|
||||
|
||||
val address3 = ("172.16.0.1", 80)
|
||||
val result3 = Address(address3, address1)
|
||||
assert(result3 == Address("172.16.0.1", 80, "192.168.0.1", 80))
|
||||
|
||||
val address4 = ("172.31.0.1", 443)
|
||||
val result4 = Address(address1, address4)
|
||||
assert(result4 == Address("192.168.0.1", 80, "172.31.0.1", 443))
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package com.zdjizhi.flink.voip.functions
|
||||
|
||||
import com.zdjizhi.flink.voip.conf.FusionConfigs
|
||||
import com.zdjizhi.flink.voip.data.SIPGenerator
|
||||
import com.zdjizhi.flink.voip.records.{Record, StreamDirs}
|
||||
import org.apache.flink.api.common.state.MapStateDescriptor
|
||||
import org.apache.flink.api.common.time.Time
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation
|
||||
import org.apache.flink.api.java.functions.KeySelector
|
||||
import org.apache.flink.configuration.Configuration
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{IntNode, ObjectNode}
|
||||
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
|
||||
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
|
||||
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import org.scalatest.flatspec.AnyFlatSpec
|
||||
|
||||
import java.lang.{Long => JLong}
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
class SIPPairingFunctionTest extends AnyFlatSpec with BeforeAndAfter {
|
||||
|
||||
private val keySelector: KeySelector[ObjectNode, Int] = (_: ObjectNode) => 0
|
||||
|
||||
private var testHarness: KeyedOneInputStreamOperatorTestHarness[Int, ObjectNode, ObjectNode] = _
|
||||
|
||||
private val interval: JLong = Time.minutes(5).toMilliseconds
|
||||
|
||||
before {
|
||||
val func = new SIPPairingFunction
|
||||
val operator = new KeyedProcessOperator(func)
|
||||
val tpe = TypeInformation.of(classOf[Int])
|
||||
testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, keySelector, tpe)
|
||||
|
||||
val config = new Configuration()
|
||||
config.set(FusionConfigs.SIP_STATE_CLEAR_INTERVAL, interval)
|
||||
testHarness.getExecutionConfig.setGlobalJobParameters(config)
|
||||
testHarness.open()
|
||||
}
|
||||
|
||||
it should "properly pair SIP records" in {
|
||||
val sipGenerator = new SIPGenerator()
|
||||
val current = System.currentTimeMillis()
|
||||
|
||||
val obj = sipGenerator.next
|
||||
obj.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.DOUBLE))
|
||||
val record = new StreamRecord(obj, current)
|
||||
|
||||
testHarness.processElement(record)
|
||||
assert(testHarness.getOutput.asScala.toSet.contains(record))
|
||||
|
||||
val nextFireTime = (testHarness.getProcessingTime / interval + 1) * interval
|
||||
assert(testHarness.getProcessingTimeService.getActiveTimerTimestamps.contains(nextFireTime))
|
||||
|
||||
val obj1 = obj.deepCopy()
|
||||
obj1.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.S2C))
|
||||
|
||||
val obj2 = obj.deepCopy()
|
||||
obj2.set(Record.F_COMMON_STREAM_DIR, IntNode.valueOf(StreamDirs.C2S))
|
||||
|
||||
val descriptor = new MapStateDescriptor("sip-state", classOf[Address], classOf[ObjectNodeWithExpiration])
|
||||
|
||||
testHarness.processElement(obj1, current)
|
||||
val mapState = testHarness.getOperator.getKeyedStateStore.getMapState(descriptor)
|
||||
assert(mapState.values().asScala.map(_.obj).toSet(obj1))
|
||||
|
||||
testHarness.processElement(obj2, current)
|
||||
assert(!mapState.values().asScala.map(_.obj).toSet(obj1))
|
||||
assert(testHarness.getOutput.asScala.toSet.contains(record))
|
||||
assert(testHarness.getOutput.asScala.size == 2)
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user