GTPC初始版本

This commit is contained in:
LAPTOP-CUUVN8AS\wk
2022-08-17 17:48:33 +08:00
parent 6f0b2cb82b
commit bea6a964df
16 changed files with 1344 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
src/main/java/com/zdjizhi/utils/kafka/test.java

287
pom.xml Normal file
View File

@@ -0,0 +1,287 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zdjizhi</groupId>
<artifactId>relationship-gtpc-user</artifactId>
<version>22-08-15</version>
<name>relationship-gtpc-user</name>
<url>http://www.example.com</url>
<repositories>
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
<url>http://192.168.40.125:8099/content/groups/public</url>
</repository>
<repository>
<id>maven-ali</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<!--<enabled>true</enabled>-->
</releases>
<snapshots>
<!--<enabled>true</enabled>-->
<checksumPolicy>fail</checksumPolicy>
</snapshots>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.1</flink.version>
<hadoop.version>2.7.1</hadoop.version>
<kafka.version>1.0.0</kafka.version>
<hbase.version>2.2.3</hbase.version>
<scope.type>provided</scope.type>
<!--<scope.type>compile</scope.type>-->
</properties>
<dependencies>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.0.6</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.70</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>commons-io</artifactId>
<groupId>commons-io</groupId>
</exclusion>
<exclusion>
<artifactId>commons-lang3</artifactId>
<groupId>org.apache.commons</groupId>
</exclusion>
<exclusion>
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-all</artifactId>
<groupId>io.netty</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-nodep</artifactId>
<version>3.2.4</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.3.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!-- The semantics of this option are reversed, see MCOMPILER-209. -->
<useIncrementalCompilation>false</useIncrementalCompilation>
<compilerArgs>
<!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 -->
<arg>-Xpkginfo:always</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>relationship-gtpc-user-22-08-15</finalName>
<transformers combine.children="append">
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.zdjizhi.topology.GtpRelation</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<resources>
<resource>
<directory>properties</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src\main\java</directory>
<includes>
<include>log4j.properties</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
</project>

View File

@@ -0,0 +1,16 @@
#====================Kafka KafkaConsumer====================#
#kafka source connection timeout
session.timeout.ms=60000
#kafka source poll
max.poll.records=3000
#kafka source poll bytes
max.partition.fetch.bytes=31457280
#====================kafka default====================#
#kafka SASL<53><4C>֤<EFBFBD>û<EFBFBD><C3BB><EFBFBD>
kafka.user=admin
#kafka SASL<53><4C>SSL<53><4C>֤<EFBFBD><D6A4><EFBFBD><EFBFBD>
kafka.pin=galaxy2019

View File

@@ -0,0 +1,32 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
#input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
input.kafka.servers=192.168.44.12:9094
#hbase zookeeper地址 用于连接HBase
#hbase.zookeeper.servers=192.168.44.12
hbase.zookeeper.servers=192.168.44.12:2181
hbase.scan.limit=0
cache.expire.seconds=86400
cache.max.size=10000000
cache.update.seconds=3600
#--------------------------------Kafka消费组信息------------------------------#
#kafka 接收数据topic
input.kafka.topic=GTPC-RECORD-COMPLETED
#读取topic,存储该spout id的消费offset信息可通过该拓扑命名;具体存储offset的位置确定下次读取不重复的数据
group.id=test3
#--------------------------------topology配置------------------------------#
#ip-account对应关系表
relation.user.teid.table.name=tsg_galaxy:relation_user_teid
#定位库地址
tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
#account-ip对应关系表
gtpc.knowledge.base.table.name=tsg_galaxy:gtpc_knowledge_base

View File

@@ -0,0 +1,45 @@
package com.zdjizhi.common;
import com.zdjizhi.utils.system.GtpConfigurations;
/**
* @author Administrator
*/
public class GtpConfig {
public static final int HBASE_SCAN_LIMIT = GtpConfigurations.getIntProperty(0, "hbase.scan.limit");
/**
* System
*/
public static final String RELATION_USER_TEID_TABLE_NAME = GtpConfigurations.getStringProperty(0, "relation.user.teid.table.name");
public static final String GTPC_KNOWLEDGE_BASE_TABLE_NAME = GtpConfigurations.getStringProperty(0, "gtpc.knowledge.base.table.name");
/**
* kafka
*/
public static final String INPUT_KAFKA_SERVERS = GtpConfigurations.getStringProperty(0, "input.kafka.servers");
public static final String HBASE_ZOOKEEPER_SERVERS = GtpConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
public static final String SESSION_TIMEOUT_MS = GtpConfigurations.getStringProperty(1, "session.timeout.ms");
public static final String MAX_POLL_RECORDS = GtpConfigurations.getStringProperty(1, "max.poll.records");
public static final String MAX_PARTITION_FETCH_BYTES = GtpConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
public static final String GROUP_ID = GtpConfigurations.getStringProperty(0, "group.id");
public static final String INPUT_KAFKA_TOPIC = GtpConfigurations.getStringProperty(0, "input.kafka.topic");
public static final String TOOLS_LIBRARY = GtpConfigurations.getStringProperty(0, "tools.library");
public static final String KAFKA_USER = GtpConfigurations.getStringProperty(1, "kafka.user");
public static final String KAFKA_PIN = GtpConfigurations.getStringProperty(1, "kafka.pin");
public static final int CACHE_EXPIRE_SECONDS = GtpConfigurations.getIntProperty(0, "cache.expire.seconds");
public static final int CACHE_MAX_SIZE = GtpConfigurations.getIntProperty(0, "cache.max.size");
public static final int CACHE_UPDATE_SECONDS = GtpConfigurations.getIntProperty(0, "cache.update.seconds");
}

View File

@@ -0,0 +1,107 @@
package com.zdjizhi.pojo;
public class Entity {
private String Hashkey;
private int ifError;
private String gtp_apn;
private String gtp_imei;
private String gtp_imsi;
private String gtp_phone_number;
private Long gtp_uplink_teid;
private Long gtp_downlink_teid ;
private String gtp_msg_type;
private Long common_recv_time;
private Long gtp_teid;
public String getHashkey() {
return Hashkey;
}
public void setHashkey(String hashkey) {
Hashkey = hashkey;
}
public int getIfError() {
return ifError;
}
public void setIfError(int ifError) {
this.ifError = ifError;
}
public String getGtp_apn() {
return gtp_apn;
}
public void setGtp_apn(String gtp_apn) {
this.gtp_apn = gtp_apn;
}
public String getGtp_imei() {
return gtp_imei;
}
public void setGtp_imei(String gtp_imei) {
this.gtp_imei = gtp_imei;
}
public String getGtp_imsi() {
return gtp_imsi;
}
public void setGtp_imsi(String gtp_imsi) {
this.gtp_imsi = gtp_imsi;
}
public String getGtp_phone_number() {
return gtp_phone_number;
}
public void setGtp_phone_number(String gtp_phone_number) {
this.gtp_phone_number = gtp_phone_number;
}
public Long getGtp_uplink_teid() {
return gtp_uplink_teid;
}
public void setGtp_uplink_teid(Long gtp_uplink_teid) {
this.gtp_uplink_teid = gtp_uplink_teid;
}
public Long getGtp_downlink_teid() {
return gtp_downlink_teid;
}
public void setGtp_downlink_teid(Long gtp_downlink_teid) {
this.gtp_downlink_teid = gtp_downlink_teid;
}
public String getGtp_msg_type() {
return gtp_msg_type;
}
public void setGtp_msg_type(String gtp_msg_type) {
this.gtp_msg_type = gtp_msg_type;
}
public Long getCommon_recv_time() {
return common_recv_time;
}
public void setCommon_recv_time(Long common_recv_time) {
this.common_recv_time = common_recv_time;
}
public Long getGtp_teid() {
return gtp_teid;
}
public void setGtp_teid(Long gtp_teid) {
this.gtp_teid = gtp_teid;
}
}

View File

@@ -0,0 +1,69 @@
package com.zdjizhi.pojo;
public class Gtp {
private String gtp_apn;
private String gtp_imei;
private String gtp_imsi;
private String gtp_phone_number;
private Long gtp_teid;
private Integer msg_type;
private Long last_update_time;
public Integer getMsg_type() {
return msg_type;
}
public void setMsg_type(Integer msg_type) {
this.msg_type = msg_type;
}
public String getGtp_apn() {
return gtp_apn;
}
public void setGtp_apn(String gtp_apn) {
this.gtp_apn = gtp_apn;
}
public String getGtp_imei() {
return gtp_imei;
}
public void setGtp_imei(String gtp_imei) {
this.gtp_imei = gtp_imei;
}
public String getGtp_imsi() {
return gtp_imsi;
}
public void setGtp_imsi(String gtp_imsi) {
this.gtp_imsi = gtp_imsi;
}
public String getGtp_phone_number() {
return gtp_phone_number;
}
public void setGtp_phone_number(String gtp_phone_number) {
this.gtp_phone_number = gtp_phone_number;
}
public Long getGtp_teid() {
return gtp_teid;
}
public void setGtp_teid(Long gtp_teid) {
this.gtp_teid = gtp_teid;
}
public Long getLast_update_time() {
return last_update_time;
}
public void setLast_update_time(Long last_update_time) {
this.last_update_time = last_update_time;
}
}

View File

@@ -0,0 +1,58 @@
package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.GtpConfig;
import com.zdjizhi.pojo.Entity;
import com.zdjizhi.utils.functions.FilterNullFunction;
import com.zdjizhi.utils.functions.ParseFunction;
import com.zdjizhi.utils.hbasepackage.HbaseSink;
import com.zdjizhi.utils.kafka.Consumer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author qidaijie
* @Package com.zdjizhi.topology
* @Description:
* @date 2021/5/2016:42
*/
public class GtpRelation {
private static final Log logger = LogFactory.get();
public static void main(String[] args) {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer());
DataStream<Entity> getObject = streamSource.map(new ParseFunction()).name("ParseJson");
DataStream<Entity> filterOriginalData = getObject.filter(new FilterNullFunction()).name("FilterOriginalData");
KeyedStream<Entity, Tuple1<String>> GtprelationTuple = filterOriginalData.keyBy(new oneKeySelector());
GtprelationTuple.addSink(new HbaseSink(GtpConfig.HBASE_ZOOKEEPER_SERVERS));
try {
environment.execute("RELATIONSHIP-GTPC-USER");
} catch (Exception e) {
logger.error("This Flink task start ERROR! Exception information is :" + e);
}
}
public static class oneKeySelector implements KeySelector<Entity, Tuple1<String>> {
@Override
public Tuple1<String> getKey(Entity entity) throws Exception {
return new Tuple1<>(entity.getHashkey());
}
}
}

View File

@@ -0,0 +1,14 @@
package com.zdjizhi.utils.functions;
import com.zdjizhi.pojo.Entity;
import org.apache.flink.api.common.functions.FilterFunction;
public class FilterNullFunction implements FilterFunction<Entity> {
@Override
public boolean filter(Entity entity) {
return entity.getIfError()!=1;
}
}

View File

@@ -0,0 +1,100 @@
package com.zdjizhi.utils.functions;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.zdjizhi.pojo.Entity;
import com.zdjizhi.utils.StringUtil;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.flink.api.common.functions.MapFunction;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/5/2715:01
*/
public class ParseFunction implements MapFunction<String, Entity> {
private static final Log logger = LogFactory.get();
@Override
public Entity map(String message) {
Entity entity = new Entity();
try {
if (StringUtil.isNotBlank(message)) {
entity = JSON.parseObject(message, Entity.class);
if(entity.getGtp_apn()==null){
entity.setGtp_apn("");
}
if(entity.getGtp_phone_number()==null){
entity.setGtp_phone_number("");
}
if(entity.getGtp_imei()==null){
entity.setGtp_imei("");
}
if(entity.getGtp_imsi()==null){
entity.setGtp_imsi("");
}
if(!"".equals(entity.getGtp_imei())|| !"".equals(entity.getGtp_imsi())|| !"".equals(entity.getGtp_phone_number())) {
String md5Str = DigestUtils.md5Hex(entity.getGtp_imei() + entity.getGtp_imsi() + entity.getGtp_phone_number());
entity.setHashkey(md5Str);
if(entity.getGtp_uplink_teid()==null || entity.getGtp_uplink_teid()==0){
if(entity.getGtp_downlink_teid()==null || entity.getGtp_downlink_teid()==0){
entity.setIfError(1);
logger.info("teid为空" + message);
}
else{
entity.setGtp_teid(entity.getGtp_downlink_teid());
}
}else{
entity.setGtp_teid(entity.getGtp_uplink_teid());
}
}
else {
entity.setHashkey("");
entity.setIfError(1);
logger.info("三元组为空" + message);
}
}else{
entity.setIfError(1);
logger.error("数据转换JSON格式异常,原始日志为:" + message);
}
} catch (JSONException jse) {
entity.setIfError(1);
logger.error("数据转换JSON格式异常,原始日志为:" + message);
} catch (RuntimeException re) {
entity.setIfError(1);
logger.error("GTP日志条件过滤异常,异常信息为:" + re);
}
return entity;
}
}

View File

@@ -0,0 +1,397 @@
package com.zdjizhi.utils.hbasepackage;
import com.google.common.cache.CacheBuilder;
import com.zdjizhi.common.GtpConfig;
import com.zdjizhi.pojo.Entity;
import com.zdjizhi.pojo.Gtp;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, SinkFunction<Entity> {
private Logger log;
private String hbase_zookeeper_host;
// public Map<String, Gtp> gtpConcurrentHashMap = new ConcurrentHashMap<>(80000);
//public Cache<String, Gtp> gtpConcurrentHashMap = CacheUtil.newLRUCache(4, DateUnit.SECOND.getMillis() * 60);
public com.google.common.cache.Cache<String, Gtp> gtpConcurrentHashMap ;
private Connection connection;
private Admin admin;
public HbaseSink(String hbase_zookeeper_host) {
this.hbase_zookeeper_host = hbase_zookeeper_host;
gtpConcurrentHashMap=CacheBuilder.newBuilder().expireAfterWrite(GtpConfig.CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).initialCapacity(100000).maximumSize(GtpConfig.CACHE_MAX_SIZE).build();
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
log = Logger.getLogger(HbaseSink.class);
org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host);
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
try {
Table table = connection.getTable(TableName.valueOf(GtpConfig.RELATION_USER_TEID_TABLE_NAME));
Scan scan = new Scan();
scan.addColumn("gtp".getBytes(), "teid".getBytes());
scan.addColumn("gtp".getBytes(), "apn".getBytes());
scan.addColumn("gtp".getBytes(), "phone_number".getBytes());
scan.addColumn("gtp".getBytes(), "imsi".getBytes());
scan.addColumn("gtp".getBytes(), "imei".getBytes());
scan.addColumn("gtp".getBytes(), "last_update_time".getBytes());
scan.addColumn("gtp".getBytes(), "msg_type".getBytes());
if (GtpConfig.HBASE_SCAN_LIMIT != 0) {
scan.setLimit(GtpConfig.HBASE_SCAN_LIMIT);
}
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
if (result.containsColumn("gtp".getBytes(), "teid".getBytes()) && result.containsColumn("gtp".getBytes(), "msg_type".getBytes()) && result.containsColumn("gtp".getBytes(), "apn".getBytes()) && result.containsColumn("gtp".getBytes(), "last_update_time".getBytes()) && result.containsColumn("gtp".getBytes(), "imei".getBytes()) && result.containsColumn("gtp".getBytes(), "phone_number".getBytes()) && result.containsColumn("gtp".getBytes(), "imsi".getBytes())) {
Gtp gtp = new Gtp();
String key = Bytes.toString(result.getRow());
Long teid = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("teid"))));
int msg_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type"))));
String apn = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("apn"))));
String phone_number = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number"))));
String imsi = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))));
String imei = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))));
Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("last_update_time"))));
gtp.setLast_update_time(last_update_time);
gtp.setGtp_teid(teid);
gtp.setGtp_apn(apn);
gtp.setGtp_phone_number(phone_number);
gtp.setGtp_imsi(imsi);
gtp.setGtp_imei(imei);
gtp.setMsg_type(msg_type);
gtpConcurrentHashMap.put(key, gtp);
}
}
scanner.close();
} catch (IOException ioe) {
log.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
} catch (RuntimeException e) {
log.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
}
}
public void invoke(Entity entity, Context context) throws Exception {
// 按 project:table 归纳
Gtp gtp = gtpConcurrentHashMap.getIfPresent(entity.getHashkey());
if (gtp!=null) {
//Gtp gtp = gtpConcurrentHashMap.getIfPresent(entity.getHashkey());
if (gtp.getLast_update_time() <= entity.getCommon_recv_time()) {
if("delete".equals(entity.getGtp_msg_type())){
if(gtp.getGtp_teid().equals(entity.getGtp_teid())){
gtp.setMsg_type(2);
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_teid(entity.getGtp_teid());
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
gtp.setGtp_imei(entity.getGtp_imei());
ArrayList<Row> updaterows = getupdateRows(gtp);
rows.addAll(updaterows);
updateKnowledgeMessage(rows);
updateRelationMessage(entity.getHashkey(), gtp);
}
else{
gtp.setMsg_type(2);
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_teid(entity.getGtp_teid());
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
gtp.setGtp_imei(entity.getGtp_imei());
ArrayList<Row> updaterows = getupdateRows(gtp);
// rows.addAll(delrows);
rows.addAll(updaterows);
updateKnowledgeMessage(rows);
// updateRelationMessage(entity.getHashkey(), gtp);
}
}
else{
if(!gtp.getGtp_teid().equals(entity.getGtp_teid())){
gtp.setMsg_type(1);
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_teid(entity.getGtp_teid());
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
gtp.setGtp_imei(entity.getGtp_imei());
ArrayList<Row> updaterows = getupdateRows(gtp);
// rows.addAll(delrows);
rows.addAll(updaterows);
updateKnowledgeMessage(rows);
updateRelationMessage(entity.getHashkey(), gtp);
}
else {
if(entity.getCommon_recv_time()-gtp.getLast_update_time()>GtpConfig.CACHE_UPDATE_SECONDS){
gtp.setMsg_type(1);
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_teid(entity.getGtp_teid());
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
gtp.setGtp_imei(entity.getGtp_imei());
ArrayList<Row> updaterows = getupdateRows(gtp);
// rows.addAll(delrows);
rows.addAll(updaterows);
updateKnowledgeMessage(rows);
updateRelationMessage(entity.getHashkey(), gtp);
}
}
}
}else{
if ("delete".equals(entity.getGtp_msg_type())) {
gtp.setMsg_type(2);
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
gtp.setGtp_teid(entity.getGtp_teid());
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
gtp.setGtp_imei(entity.getGtp_imei());
ArrayList<Row> updaterows = getupdateRows(gtp);
// rows.addAll(delrows);
rows.addAll(updaterows);
updateKnowledgeMessage(rows);
}
}
} else {
Gtp gtpobj = new Gtp();
gtpobj.setLast_update_time(entity.getCommon_recv_time());
gtpobj.setGtp_teid(entity.getGtp_teid());
gtpobj.setGtp_apn(entity.getGtp_apn());
gtpobj.setGtp_phone_number(entity.getGtp_phone_number());
gtpobj.setGtp_imsi(entity.getGtp_imsi());
gtpobj.setGtp_imei(entity.getGtp_imei());
if(!"delete".equals(entity.getGtp_msg_type())) {
gtpobj.setMsg_type(1);
}
else{
gtpobj.setMsg_type(2);
}
ArrayList<Row> rows = new ArrayList<>();
ArrayList<Row> updaterows = getupdateRows(gtpobj);
rows.addAll(updaterows);
updateRelationMessage(entity.getHashkey(), gtpobj);
updateKnowledgeMessage(rows);
}
}
@Override
public void close() throws Exception {
super.close();
}
public void updateRelationMessage(String key, Gtp gtp) throws IOException {
Table table = null;
try {
table = connection.getTable(TableName.valueOf(GtpConfig.RELATION_USER_TEID_TABLE_NAME));
Put put = new Put(key.getBytes());
put.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
put.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
put.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
put.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
put.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
put.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
put.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
table.put(put);
gtpConcurrentHashMap.put(key, gtp);
} catch (Exception e) {
log.error(e.toString());
} finally {
table.close();
}
}
public void updateKnowledgeMessage(ArrayList<Row> rows) throws IOException {
Table tableR = null;
try {
tableR = connection.getTable(TableName.valueOf(GtpConfig.GTPC_KNOWLEDGE_BASE_TABLE_NAME));
Object[] results = new Object[rows.size()];
tableR.batch(rows, results);
} catch (Exception e) {
log.error(e.toString());
} finally {
tableR.close();
}
}
public ArrayList<Row> getDelRows(Gtp entity) {
ArrayList<Row> delrows = new ArrayList<>();
if (!"".equals(entity.getGtp_apn())) {
String oldapnkey = "3" + new StringBuffer(entity.getGtp_apn()).reverse().toString() + "|" + entity.getGtp_teid();
Delete del_apnkey = new Delete(Bytes.toBytes(oldapnkey));
delrows.add(del_apnkey);
}
if (!"".equals(entity.getGtp_phone_number())) {
String oldpnkey = "2" + new StringBuffer(entity.getGtp_phone_number()).reverse().toString() + "|" + entity.getGtp_teid();
Delete del_pnkey = new Delete(Bytes.toBytes(oldpnkey));
delrows.add(del_pnkey);
}
if (!"".equals(entity.getGtp_imsi())) {
String oldimsikey = "1" + entity.getGtp_imsi() + "|" + entity.getGtp_teid();
Delete del_imsikey = new Delete(Bytes.toBytes(oldimsikey));
delrows.add(del_imsikey);
}
if (!"".equals(entity.getGtp_imei())) {
String oldimeikey = "0" + entity.getGtp_imei() + "|" + entity.getGtp_teid();
Delete del_imeikey = new Delete(Bytes.toBytes(oldimeikey));
delrows.add(del_imeikey);
}
return delrows;
}
public ArrayList<Row> getupdateRows(Gtp gtp) {
ArrayList<Row> updaterows = new ArrayList<>();
if (!"".equals(gtp.getGtp_apn())) {
String apnkey = "3" + new StringBuffer(gtp.getGtp_apn()).reverse().toString() + "|" + gtp.getGtp_teid();
Put putApn = new Put(apnkey.getBytes());
putApn.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
putApn.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
putApn.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
putApn.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
putApn.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
putApn.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
putApn.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
updaterows.add(putApn);
}
if (!"".equals(gtp.getGtp_phone_number())) {
String pnkey = "2" + new StringBuffer(gtp.getGtp_phone_number()).reverse().toString() + "|" + gtp.getGtp_teid();
Put putPn = new Put(pnkey.getBytes());
putPn.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
putPn.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
putPn.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
putPn.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
putPn.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
putPn.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
putPn.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
updaterows.add(putPn);
}
if (!"".equals(gtp.getGtp_imsi())) {
String imsikey = "1" + gtp.getGtp_imsi() + "|" + gtp.getGtp_teid();
Put putImsi = new Put(imsikey.getBytes());
putImsi.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
putImsi.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
putImsi.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
putImsi.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
putImsi.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
putImsi.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
putImsi.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
updaterows.add(putImsi);
}
if (!"".equals(gtp.getGtp_imei())) {
String imeikey = "0" + gtp.getGtp_imei() + "|" + gtp.getGtp_teid();
Put putImei = new Put(imeikey.getBytes());
putImei.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
putImei.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
putImei.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
putImei.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
putImei.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei()));
putImei.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time()));
putImei.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type()));
updaterows.add(putImei);
}
return updaterows;
}
}

View File

@@ -0,0 +1,48 @@
package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.GtpConfig;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.kafka
* @Description:
* @date 2021/9/610:37
*/
class CertUtils {
/**
* Kafka SASL认证端口
*/
private static final String SASL_PORT = "9094";
/**
* Kafka SSL认证端口
*/
private static final String SSL_PORT = "9095";
/**
* 根据连接信息端口判断认证方式。
*
* @param servers kafka 连接信息
* @param properties kafka 连接配置信息
*/
static void chooseCert(String servers, Properties properties) {
if (servers.contains(SASL_PORT)) {
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ GtpConfig.KAFKA_USER + " password=" + GtpConfig.KAFKA_PIN + ";");
} else if (servers.contains(SSL_PORT)) {
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put("ssl.keystore.location", GtpConfig.TOOLS_LIBRARY + "keystore.jks");
properties.put("ssl.keystore.password", GtpConfig.KAFKA_PIN);
properties.put("ssl.truststore.location", GtpConfig.TOOLS_LIBRARY + "truststore.jks");
properties.put("ssl.truststore.password", GtpConfig.KAFKA_PIN);
properties.put("ssl.key.password", GtpConfig.KAFKA_PIN);
}
}
}

View File

@@ -0,0 +1,41 @@
package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.GtpConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.kafka
* @Description:
* @date 2021/6/813:54
*/
public class Consumer {
private static Properties createConsumerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", GtpConfig.INPUT_KAFKA_SERVERS);
properties.put("group.id", GtpConfig.GROUP_ID);
properties.put("session.timeout.ms", GtpConfig.SESSION_TIMEOUT_MS);
properties.put("max.poll.records", GtpConfig.MAX_POLL_RECORDS);
properties.put("max.partition.fetch.bytes", GtpConfig.MAX_PARTITION_FETCH_BYTES);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
CertUtils.chooseCert(GtpConfig.INPUT_KAFKA_SERVERS, properties);
return properties;
}
public static FlinkKafkaConsumer<String> getKafkaConsumer() {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(GtpConfig.INPUT_KAFKA_TOPIC,
new SimpleStringSchema(), createConsumerConfig());
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
}
}

View File

@@ -0,0 +1,70 @@
package com.zdjizhi.utils.system;
import com.zdjizhi.utils.StringUtil;
import java.io.IOException;
import java.util.Locale;
import java.util.Properties;
/**
* @author Administrator
*/
public final class GtpConfigurations {
private static Properties propDefault = new Properties();
private static Properties propService = new Properties();
public static String getStringProperty(Integer type, String key) {
if (type == 0) {
return propService.getProperty(key);
} else if (type == 1) {
return propDefault.getProperty(key);
} else {
return null;
}
}
public static Integer getIntProperty(Integer type, String key) {
if (type == 0) {
return Integer.parseInt(propService.getProperty(key));
} else if (type == 1) {
return Integer.parseInt(propDefault.getProperty(key));
} else {
return null;
}
}
public static Long getLongProperty(Integer type, String key) {
if (type == 0) {
return Long.parseLong(propService.getProperty(key));
} else if (type == 1) {
return Long.parseLong(propDefault.getProperty(key));
} else {
return null;
}
}
public static Boolean getBooleanProperty(Integer type, String key) {
if (type == 0) {
return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
} else if (type == 1) {
return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
} else {
return null;
}
}
static {
try {
propService.load(GtpConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
propDefault.load(GtpConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
} catch (IOException | RuntimeException e) {
propDefault = null;
propService = null;
}
}
}

25
src/main/log4j.properties Normal file
View File

@@ -0,0 +1,25 @@
#Log4j
log4j.rootLogger=console,file
# 控制台日志设置
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=ERROR
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
# 文件日志设置
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold=info
log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true
#路径请用相对路径,做好相关测试输出到应用目下
log4j.appender.file.file=${nis.root}/log/galaxy-name.log
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
##MyBatis 配置com.nis.web.dao是mybatis接口所在包
#log4j.logger.com.nis.web.dao=debug
##bonecp数据源配置
#log4j.category.com.jolbox=debug,console

View File

@@ -0,0 +1,34 @@
package com.zdjizhi;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi
* @Description:
* @date 2021/6/2314:32
*/
public class FunctionTest {
@Test
public void jsonTest() {
Map<String, Long> methodCount = new HashMap<>(16);
methodCount.put("A",20L);
methodCount.put("B",20L);
methodCount.put("C",20L);
String jsonString = JSON.toJSONString(methodCount);
System.out.println(jsonString);
JSONObject jsonObject = JSONObject.parseObject(jsonString);
Map<String, Object> hmCount = (Map<String, Object>) jsonObject;
System.out.println(hmCount.toString());
}
}