diff --git a/README.md b/README.md
index 25a4a3f..889c192 100644
--- a/README.md
+++ b/README.md
@@ -1,14 +1,15 @@
# log-stream-completion-schema
-A dynamic log preprocessing program based on the query gateway: it receives raw logs and cleans the data according to the corresponding schema definition,
-then writes the results back to Kafka.
+A dynamic log preprocessing program based on Nacos: it receives raw logs, cleans the data according to the corresponding Schema definition, and writes the results back to Kafka.
+When a Schema changes on Nacos, the latest version is picked up dynamically, with no need to restart the job.
## Function list
* current_timestamp
-> Returns the current timestamp; if the target field already has a timestamp, it is not overwritten
+> Returns the current timestamp; if the target field already has a timestamp, it is not overwritten.
* snowflake_id
-> Snowflake ID function; returns a long value that is unique within certain constraints
+> Snowflake ID function; returns a long value that is unique within certain constraints.
+> https://git.mesalab.cn/bigdata/algorithm/snowflake
* geo_ip_detail
> IP geolocation lookup; returns detailed location information for the given IP: city, state/province, country
* geo_asn
@@ -16,19 +17,21 @@
* geo_ip_country
> IP geolocation lookup; returns location information for the given IP, country only
* set_value
-> Assigns a fixed value to a field
+> Assigns a fixed value to a field.
* get_value
-> Reads a field's value and appends it to a new field
+> Reads a field's value and appends it to a new field.
* if
> Implements an IF function; parses the log to evaluate a ternary expression. Also checks whether the value is numeric and, if so, returns it converted to a long.
* sub_domain
> Extracts the top-level domain
* radius_match
-> Resolves the corresponding radius user by IP, backed by data stored in HBase.
+> Retrieves the corresponding Radius user information by IP.
+> The underlying data lives in HBase and depends on the RADIUS-RELATIONSHIP-HBASE-V2 program; at runtime it is loaded into memory to speed up lookups.
* app_match
> Returns the APP name for the given APP_ID
* decode_of_base64
> Decodes base64 using the specified charset; if the charset field is empty, decodes with the default charset (UTF-8)
* flattenSpec
-> Parses JSON according to an expression
+> Parses JSON according to an expression, using the JsonPath utility
+> https://github.com/json-path/JsonPath
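
> Note: the functions above are attached to log fields through `format` declarations in each field's `doc` block of the Nacos schema (parsed by `JsonParseUtil.getJobListFromHttp` later in this diff). A minimal sketch of one field entry, with hypothetical field names and paths:

```java
public class SchemaFormatExample {
    public static void main(String[] args) {
        // Hypothetical schema fragment: one field entry from a schema such as
        // gtpc_record.json. The parser reads fields[].doc.format[] and expects
        // "function", plus optional "appendTo" (defaults to the field's own
        // name) and "param" (null when absent).
        String field =
            "{\n"
            + "  \"name\": \"http_request\",\n"
            + "  \"type\": \"string\",\n"
            + "  \"doc\": {\n"
            + "    \"format\": [\n"
            + "      {\"function\": \"flattenSpec\",\n"
            + "       \"appendTo\": \"http_host\",\n"
            + "       \"param\": \"$.headers.Host\"}\n"
            + "    ]\n"
            + "  }\n"
            + "}";
        System.out.println(field);
    }
}
```

> Here `flattenSpec` would run the JsonPath expression `$.headers.Host` against the raw `http_request` value and write the result to `http_host`.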
diff --git a/pom.xml b/pom.xml
index f484859..b0aecc5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
   <groupId>com.zdjizhi</groupId>
   <artifactId>log-completion-schema</artifactId>
-  <version>220318-Nacos</version>
+  <version>220727-GTPC</version>
   <name>log-completion-schema</name>
   <url>http://www.example.com</url>
@@ -297,6 +297,19 @@
       <version>1.9.3</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.11.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-avro</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index aaeccfc..23fb5f9 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -24,42 +24,46 @@ batch.size=262144
#128M
buffer.memory=134217728
-#Maximum size of each request sent to the Kafka broker; default 1048576
-#10M
+#Maximum size of each request sent to the Kafka broker
+#default: 10485760 = 10M
max.request.size=10485760
#====================kafka default====================#
-#kafka SASL username (encrypted)
+#kafka SASL/SSL username (encrypted)
kafka.user=nsyGpHKGFA4KW0zro9MDdw==
-#kafka SASL/SSL password (encrypted)
+#kafka SASL/SSL pin (encrypted)
kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
-#producer ack
+#producer ack
producer.ack=1
#====================nacos default====================#
-#nacos username
-nacos.username=nacos
+#nacos username (encrypted)
+nacos.username=kANxu/Zi5rBnZVxa5zAjrQ==
-#nacos password
-nacos.pin=nacos
+#nacos pin (encrypted)
+nacos.pin=YPIBDIXjJUtVBjjk2op0Dg==
#nacos group
nacos.group=Galaxy
#====================Topology Default====================#
-#hbase table name
-hbase.table.name=tsg_galaxy:relation_framedip_account
+#hbase radius relation table name
+hbase.radius.table.name=tsg_galaxy:relation_framedip_account
-#default charset for mail
+#hbase gtpc relation table name
+hbase.gtpc.table.name=tsg_galaxy:relation_user_teid
+
+#default charset for mail base64 decoding
mail.default.charset=UTF-8
-#0 = no validation, 1 = weak type validation
+#0 = no validation; parse the JSON directly.
+#1 = check field types against the schema and perform type conversions where needed.
log.transform.type=1
-#Maximum time between two outputs (unit: milliseconds)
+#Maximum time between two outputs (milliseconds)
buffer.timeout=5000
-#====================Temporary config - to be removed====================#
-#Gateway APP_ID fetch endpoint
-app.id.http=http://192.168.44.20:9999/open-api/appDicList
-#app_id refresh interval; 0 disables cache updates
-app.tick.tuple.freq.secs=0
\ No newline at end of file
+#Maximum rows per GTP-C data scan; 0 = no limit.
+hbase.gtpc.scan.max.rows=0
+
+#Maximum rows per radius data scan; 0 = no limit.
+hbase.radius.scan.max.rows=0
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 09c78df..b111181 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,32 +1,31 @@
#--------------------------------Address config------------------------------#
#source kafka servers
-source.kafka.servers=192.168.44.11:9094
+source.kafka.servers=192.168.44.12:9094
#sink kafka servers
-sink.kafka.servers=192.168.44.11:9094
+sink.kafka.servers=192.168.44.12:9094
#zookeeper servers, used to configure log_id
-zookeeper.servers=192.168.44.11:2181
+zookeeper.servers=192.168.44.12:2181
#hbase zookeeper servers, used to connect to HBase
-hbase.zookeeper.servers=192.168.44.11:2181
+hbase.zookeeper.servers=192.168.44.12:2181
#--------------------------------HTTP/geo library------------------------------#
#geo library path
-tools.library=D:\\workerspace\\dat\\
+tools.library=/opt/dat/
#--------------------------------nacos config------------------------------#
#nacos server address
-nacos.server=192.168.40.43:8848
+nacos.server=192.168.44.12:8848
#nacos namespace
-nacos.schema.namespace=test
+nacos.schema.namespace=prod
#nacos data id
-nacos.data.id=session_record.json
-
-#--------------------------------Kafka consumer/producer config------------------------------#
+nacos.data.id=gtpc_record.json
+#--------------------------------Kafka consumer group config------------------------------#
#kafka source topic
source.kafka.topic=test
@@ -34,10 +33,9 @@ source.kafka.topic=test
sink.kafka.topic=test-result
#Consumer group id for the source topic; offsets are stored under this id (name it after the topology), which determines where the next read resumes so data is not consumed twice;
-group.id=flinktest-1
+group.id=gtpc-record-log-2022-1
#--------------------------------topology config------------------------------#
-
#consumer parallelism
source.parallelism=1
@@ -47,16 +45,15 @@ transform.parallelism=1
#kafka producer parallelism
sink.parallelism=1
-#data center id, valid range (0-31)
-data.center.id.num=0
+#data center id, valid range (0-63)
+data.center.id.num=2
#hbase refresh interval; 0 disables cache updates
hbase.tick.tuple.freq.secs=180
#--------------------------------Defaults------------------------------#
-
#0 = no completion, output logs as-is; 1 = completion required
log.need.complete=1
#producer compression mode: none or snappy
-producer.kafka.compression.type=none
\ No newline at end of file
+producer.kafka.compression.type=none
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 5fd92ed..8b4a641 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -42,11 +42,10 @@ public class FlowWriteConfig {
*/
public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server");
public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace");
- public static final String NACOS_COMMON_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.common.namespace");
public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id");
- public static final String NACOS_PIN = FlowWriteConfigurations.getStringProperty(1, "nacos.pin");
public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group");
- public static final String NACOS_USERNAME = FlowWriteConfigurations.getStringProperty(1, "nacos.username");
+ public static final String NACOS_USERNAME = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "nacos.username"));
+ public static final String NACOS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "nacos.pin"));
/**
* System config
@@ -64,7 +63,10 @@ public class FlowWriteConfig {
* HBase
*/
public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
- public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
+ public static final Integer HBASE_GTPC_SCAN_MAX_ROWS = FlowWriteConfigurations.getIntProperty(1, "hbase.gtpc.scan.max.rows");
+ public static final Integer HBASE_RADIUS_SCAN_MAX_ROWS = FlowWriteConfigurations.getIntProperty(1, "hbase.radius.scan.max.rows");
+ public static final String HBASE_RADIUS_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.radius.table.name");
+ public static final String HBASE_GTPC_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.gtpc.table.name");
/**
* kafka common
@@ -98,19 +100,13 @@ public class FlowWriteConfig {
public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
- /**
- * http
- */
- public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(1, "app.id.http");
- public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs");
-
/**
* common config
*/
- public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"source.kafka.servers");
- public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.servers");
- public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"zookeeper.servers");
- public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0,"tools.library");
- public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"hbase.zookeeper.servers");
+ public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
+ public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
+ public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
+ public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
+ public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
}
\ No newline at end of file
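
> Note: the hunk above does not show how `encryptor` is constructed; it only shows that `nacos.username` and `nacos.pin` are now read as ciphertext (matching the `(encrypted)` comments in `default_config.properties`) and decrypted once at class-load time. A minimal sketch of that round trip, assuming a Jasypt-style text encryptor and a placeholder key:

```java
import org.jasypt.util.text.BasicTextEncryptor;

public class ConfigDecryptExample {
    public static void main(String[] args) {
        // Placeholder key; whatever key the project actually uses must match
        // the one that produced the ciphertext stored in the properties file.
        BasicTextEncryptor encryptor = new BasicTextEncryptor();
        encryptor.setPassword("change-me");

        // The properties file stores the encrypted form; FlowWriteConfig calls
        // encryptor.decrypt(...) on the raw property value at class-load time.
        String cipherText = encryptor.encrypt("nacos");
        System.out.println(encryptor.decrypt(cipherText)); // prints "nacos"
    }
}
```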
diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
deleted file mode 100644
index 1425ce9..0000000
--- a/src/main/java/com/zdjizhi/utils/app/AppUtils.java
+++ /dev/null
@@ -1,124 +0,0 @@
-package com.zdjizhi.utils.app;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.http.HttpClientUtil;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * AppId utility class
- *
- * @author qidaijie
- */
-
-@Deprecated
-public class AppUtils {
- private static final Log logger = LogFactory.get();
-    private static Map<Integer, String> appIdMap = new ConcurrentHashMap<>(128);
- private static AppUtils appUtils;
-
- private static void getAppInstance() {
- appUtils = new AppUtils();
- }
-
-
- /**
-     * Constructor; starts the scheduled refresh
-     */
-    private AppUtils() {
-        //scheduled refresh
- updateAppIdCache();
- }
-
- /**
-     * Updates the cached variables
- */
- private static void change() {
- if (appUtils == null) {
- getAppInstance();
- }
- timestampsFilter();
- }
-
-
- /**
-     * Fetches the changed content
- */
- private static void timestampsFilter() {
- try {
- Long begin = System.currentTimeMillis();
- String schema = HttpClientUtil.requestByGetMethod(FlowWriteConfig.APP_ID_HTTP);
- if (StringUtil.isNotBlank(schema)) {
- String data = JSONObject.parseObject(schema).getString("data");
- JSONArray objects = JSONArray.parseArray(data);
- for (Object object : objects) {
- JSONArray jsonArray = JSONArray.parseArray(object.toString());
- int key = jsonArray.getInteger(0);
- String value = jsonArray.getString(1);
- if (appIdMap.containsKey(key)) {
- if (!value.equals(appIdMap.get(key))) {
- appIdMap.put(key, value);
- }
- } else {
- appIdMap.put(key, value);
- }
- }
- logger.warn("Updating the correspondence takes time:" + (begin - System.currentTimeMillis()));
- logger.warn("Pull the length of the interface data:[" + objects.size() + "]");
- }
- } catch (RuntimeException e) {
- logger.error("Update cache app-id failed, exception:" + e);
- }
- }
-
-
- /**
-     * Refresh timer; runs at a fixed interval to pull fresh data
- */
- private void updateAppIdCache() {
- ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
- executorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS != 0) {
- change();
- }
- } catch (RuntimeException e) {
- logger.error("AppUtils update AppCache is error===>{" + e + "}<===");
- }
- }
- }, 1, FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
- }
-
-
- /**
-     * Gets the appName
-     *
-     * @param appId app_id
-     * @return appName
- */
- public static String getAppName(int appId) {
-
- if (appUtils == null) {
- getAppInstance();
- }
-
- if (appIdMap.containsKey(appId)) {
- return appIdMap.get(appId);
- } else {
- logger.warn("AppMap get appName is null, ID is :" + appId);
- return "";
- }
- }
-
-}
diff --git a/src/main/java/com/zdjizhi/utils/general/CityHash.java b/src/main/java/com/zdjizhi/utils/general/CityHash.java
deleted file mode 100644
index 5de4785..0000000
--- a/src/main/java/com/zdjizhi/utils/general/CityHash.java
+++ /dev/null
@@ -1,180 +0,0 @@
-package com.zdjizhi.utils.general;
-
-
-
-
-/**
- * Hashes the logid with the CityHash64 algorithm
- * Not implemented in this release - TSG22.01
- *
- * @author qidaijie
- */
-@Deprecated
-public class CityHash {
-
- private static final long k0 = 0xc3a5c85c97cb3127L;
- private static final long k1 = 0xb492b66fbe98f273L;
- private static final long k2 = 0x9ae16a3b2f90404fL;
- private static final long k3 = 0xc949d7c7509e6557L;
- private static final long k5 = 0x9ddfea08eb382d69L;
-
- private CityHash() {}
-
- public static long CityHash64(byte[] s, int index, int len) {
- if (len <= 16 ) {
- return HashLen0to16(s, index, len);
- } else if (len > 16 && len <= 32) {
- return HashLen17to32(s, index, len);
- } else if (len > 32 && len <= 64) {
- return HashLen33to64(s, index, len);
- } else {
- long x = Fetch64(s, index);
- long y = Fetch64(s, index + len - 16) ^ k1;
- long z = Fetch64(s, index + len - 56) ^ k0;
- long[] v = WeakHashLen32WithSeeds(s, len - 64, len, y);
- long[] w = WeakHashLen32WithSeeds(s, len - 32, len * k1, k0);
- z += ShiftMix(v[1]) * k1;
- x = Rotate(z + x, 39) * k1;
- y = Rotate(y, 33) * k1;
-
- len = (len - 1) & ~63;
- do {
- x = Rotate(x + y + v[0] + Fetch64(s, index + 16), 37) * k1;
- y = Rotate(y + v[1] + Fetch64(s, index + 48), 42) * k1;
- x ^= w[1];
- y ^= v[0];
- z = Rotate(z ^ w[0], 33);
- v = WeakHashLen32WithSeeds(s, index, v[1] * k1, x + w[0]);
- w = WeakHashLen32WithSeeds(s, index + 32, z + w[1], y);
- long t = z;
- z = x;
- x = t;
- index += 64;
- len -= 64;
- } while (len != 0);
- return HashLen16(HashLen16(v[0], w[0]) + ShiftMix(y) * k1 + z,
- HashLen16(v[1], w[1]) + x);
- }
- }
-
- private static long HashLen0to16(byte[] s, int index, int len) {
- if (len > 8) {
- long a = Fetch64(s, index);
- long b = Fetch64(s, index + len - 8);
- return HashLen16(a, RotateByAtLeastOne(b + len, len)) ^ b;
- }
- if (len >= 4) {
- long a = Fetch32(s, index);
- return HashLen16(len + (a << 3), Fetch32(s, index + len - 4));
- }
- if (len > 0) {
- byte a = s[index];
- byte b = s[index + len >>> 1];
- byte c = s[index + len - 1];
- int y = (a) + (b << 8);
- int z = len + (c << 2);
- return ShiftMix(y * k2 ^ z * k3) * k2;
- }
- return k2;
- }
-
- private static long HashLen17to32(byte[] s, int index, int len) {
- long a = Fetch64(s, index) * k1;
- long b = Fetch64(s, index + 8);
- long c = Fetch64(s, index + len - 8) * k2;
- long d = Fetch64(s, index + len - 16) * k0;
- return HashLen16(Rotate(a - b, 43) + Rotate(c, 30) + d,
- a + Rotate(b ^ k3, 20) - c + len);
- }
-
- private static long HashLen33to64(byte[] s, int index, int len) {
- long z = Fetch64(s, index + 24);
- long a = Fetch64(s, index) + (len + Fetch64(s, index + len - 16)) * k0;
- long b = Rotate(a + z, 52);
- long c = Rotate(a, 37);
- a += Fetch64(s, index + 8);
- c += Rotate(a, 7);
- a += Fetch64(s, index + 16);
- long vf = a + z;
- long vs = b + Rotate(a, 31) + c;
- a = Fetch64(s, index + 16) + Fetch64(s, index + len - 32);
- z = Fetch64(s, index + len - 8);
- b = Rotate(a + z, 52);
- c = Rotate(a, 37);
- a += Fetch64(s, index + len - 24);
- c += Rotate(a, 7);
- a += Fetch64(s, index + len - 16);
- long wf = a + z;
- long ws = b + Rotate(a, 31) + c;
- long r = ShiftMix((vf + ws) * k2 + (wf + vs) * k0);
- return ShiftMix(r * k0 + vs) * k2;
- }
-
- private static long Fetch64(byte[] p, int index) {
- return toLongLE(p,index);
- }
-
- private static long Fetch32(byte[] p, int index) {
- return toIntLE(p,index);
- }
- private static long[] WeakHashLen32WithSeeds(
- long w, long x, long y, long z, long a, long b) {
- a += w;
- b = Rotate(b + a + z, 21);
- long c = a;
- a += x;
- a += y;
- b += Rotate(a, 44);
- return new long[]{a + z, b + c};
- }
-
- private static long[] WeakHashLen32WithSeeds(byte[] s, int index, long a, long b) {
- return WeakHashLen32WithSeeds(Fetch64(s, index),
- Fetch64(s, index + 8),
- Fetch64(s, index + 16),
- Fetch64(s, index + 24),
- a,
- b);
- }
-
- private static long toLongLE(byte[] b, int i) {
- return 0xffffffffffffffffL & (((long) b[i + 7] << 56) + ((long) (b[i + 6] & 255) << 48) + ((long) (b[i + 5] & 255) << 40) + ((long) (b[i + 4] & 255) << 32) + ((long) (b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0));
- }
-
- private static long toIntLE(byte[] b, int i) {
- return 0xffffffffL & (((b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0));
- }
- private static long RotateByAtLeastOne(long val, int shift) {
- return (val >>> shift) | (val << (64 - shift));
- }
-
- private static long ShiftMix(long val) {
- return val ^ (val >>> 47);
- }
-
- private static long Uint128Low64(long[] x) {
- return x[0];
- }
-
- private static long Rotate(long val, int shift) {
- return shift == 0 ? val : (val >>> shift) | (val << (64 - shift));
- }
-
- private static long Uint128High64(long[] x) {
- return x[1];
- }
-
- private static long Hash128to64(long[] x) {
- long a = (Uint128Low64(x) ^ Uint128High64(x)) * k5;
- a ^= (a >>> 47);
- long b = (Uint128High64(x) ^ a) * k5;
- b ^= (b >>> 47);
- b *= k5;
- return b;
- }
-
- private static long HashLen16(long u, long v) {
- return Hash128to64(new long[]{u,v});
- }
-
-}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
index 537d172..37d3a00 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
@@ -28,17 +28,19 @@ public class TransFormMap {
try {
JsonParseUtil.dropJsonField(jsonMap);
for (String[] strings : JsonParseUtil.getJobList()) {
-                //value of the referenced log field
+                //value of this log field
                Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
-                //key of the field to complete
-                String appendToKeyName = strings[1];
-                //value of the field to complete
-                Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
+                //key of the log field the result maps to
+                String appendToKey = strings[1];
                //field that selects the operation function
                String function = strings[2];
                //value of the extra parameter
                String param = strings[3];
-                functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param);
+
+                //original value of the log field the result maps to
+                Object appendToValue = JsonParseUtil.getValue(jsonMap, appendToKey);
+
+                functionSet(function, jsonMap, appendToKey, appendToValue, logValue, param);
}
return JsonMapper.toJsonString(jsonMap);
} catch (RuntimeException e) {
@@ -51,76 +53,76 @@ public class TransFormMap {
    /**
     * Set of functions that operate on the corresponding fields according to the schema description
     *
-     * @param function        field that selects the operation function
-     * @param jsonMap         parsed map of the raw log
-     * @param appendToKeyName key of the field to complete
-     * @param appendTo        value of the field to complete
-     * @param logValue        value of the referenced log field
-     * @param param           value of the extra parameter
+     * @param function      field that selects the operation function
+     * @param jsonMap       parsed map of the raw log
+     * @param appendToKey   key of the field to complete
+     * @param appendToValue value of the field to complete
+     * @param logValue      value of the referenced log field
+     * @param param         value of the extra parameter
     */
-    private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendTo, Object logValue, String param) {
+    private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKey, Object appendToValue, Object logValue, String param) {
switch (function) {
case "current_timestamp":
- if (!(appendTo instanceof Long)) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
+ if (!(appendToValue instanceof Long)) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getCurrentTime());
}
break;
case "snowflake_id":
- JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
+ JsonParseUtil.setValue(jsonMap, appendToKey, SnowflakeId.generateId());
break;
case "geo_ip_detail":
- if (logValue != null && appendTo == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));
+ if (logValue != null && appendToValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpDetail(logValue.toString()));
}
break;
case "geo_asn":
- if (logValue != null && appendTo == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString()));
+ if (logValue != null && appendToValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoAsn(logValue.toString()));
}
break;
case "geo_ip_country":
- if (logValue != null && appendTo == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString()));
- }
- break;
- case "set_value":
- if (param != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
- }
- break;
- case "get_value":
- if (logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
- }
- break;
- case "if":
- if (param != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
- }
- break;
- case "sub_domain":
- if (appendTo == null && logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
- }
- break;
- case "radius_match":
- if (logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString()));
- }
- break;
- case "decode_of_base64":
- if (logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
+ if (logValue != null && appendToValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpCountry(logValue.toString()));
}
break;
case "flattenSpec":
if (logValue != null && param != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.flattenSpec(logValue.toString(), param));
}
break;
- case "app_match":
- if (logValue != null && appendTo == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
+ case "if":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.condition(jsonMap, param));
+ }
+ break;
+ case "decode_of_base64":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
+ }
+ break;
+ case "sub_domain":
+ if (appendToValue == null && logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getTopDomain(logValue.toString()));
+ }
+ break;
+ case "radius_match":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.radiusMatch(logValue.toString()));
+ }
+ break;
+ case "gtpc_match":
+ if (logValue != null) {
+ TransFunction.gtpcMatch(jsonMap, logValue.toString(), appendToKey, param);
+ }
+ break;
+ case "set_value":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, param);
+ }
+ break;
+ case "get_value":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, logValue);
}
break;
default:
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
index d5ca29c..de5fd43 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
@@ -28,21 +28,24 @@ public class TransFormTypeMap {
try {
            Map<String, Object> jsonMap = JsonParseUtil.typeTransform(message);
            for (String[] strings : JsonParseUtil.getJobList()) {
-                //value of the referenced log field
+                //value of this log field
                Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
-                //key of the field to complete
-                String appendToKeyName = strings[1];
-                //value of the field to complete
-                Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
+                //key of the log field the result maps to
+                String appendToKey = strings[1];
                //field that selects the operation function
                String function = strings[2];
                //value of the extra parameter
                String param = strings[3];
-                functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
+
+                //original value of the log field the result maps to
+                Object appendToValue = JsonParseUtil.getValue(jsonMap, appendToKey);
+
+                functionSet(function, jsonMap, appendToKey, appendToValue, logValue, param);
}
return JsonMapper.toJsonString(jsonMap);
} catch (RuntimeException e) {
logger.error("TransForm logs failed,The exception is :" + e);
+ e.printStackTrace();
return null;
}
}
@@ -51,78 +54,76 @@ public class TransFormTypeMap {
    /**
     * Set of functions that operate on the corresponding fields according to the schema description
     *
-     * @param function         field that selects the operation function
-     * @param jsonMap          parsed map of the raw log
-     * @param appendToKeyName  key of the field to complete
-     * @param appendToKeyValue value of the field to complete
-     * @param logValue         value of the referenced log field
-     * @param param            value of the extra parameter
+     * @param function      field that selects the operation function
+     * @param jsonMap       parsed map of the raw log
+     * @param appendToKey   key of the field to complete
+     * @param appendToValue value of the field to complete
+     * @param logValue      value of the referenced log field
+     * @param param         value of the extra parameter
     */
-    private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendToKeyValue, Object logValue, String param) {
+    private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKey, Object appendToValue, Object logValue, String param) {
switch (function) {
case "current_timestamp":
- if (!(appendToKeyValue instanceof Long)) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
+ if (!(appendToValue instanceof Long)) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getCurrentTime());
}
break;
case "snowflake_id":
- JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
- //版本规划暂不实现TSG-22.01
-// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getDecimalHash(SnowflakeId.generateId()));
+ JsonParseUtil.setValue(jsonMap, appendToKey, SnowflakeId.generateId());
break;
case "geo_ip_detail":
- if (logValue != null && appendToKeyValue == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));
+ if (logValue != null && appendToValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpDetail(logValue.toString()));
}
break;
case "geo_asn":
- if (logValue != null && appendToKeyValue == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString()));
+ if (logValue != null && appendToValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoAsn(logValue.toString()));
}
break;
case "geo_ip_country":
- if (logValue != null && appendToKeyValue == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString()));
- }
- break;
- case "set_value":
- if (param != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
- }
- break;
- case "get_value":
- if (logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
- }
- break;
- case "if":
- if (param != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
- }
- break;
- case "sub_domain":
- if (appendToKeyValue == null && logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
- }
- break;
- case "radius_match":
- if (logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString()));
- }
- break;
- case "decode_of_base64":
- if (logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
+ if (logValue != null && appendToValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpCountry(logValue.toString()));
}
break;
case "flattenSpec":
if (logValue != null && param != null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.flattenSpec(logValue.toString(), param));
}
break;
- case "app_match":
- if (logValue != null && appendToKeyValue == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
+ case "if":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.condition(jsonMap, param));
+ }
+ break;
+ case "decode_of_base64":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
+ }
+ break;
+ case "sub_domain":
+ if (appendToValue == null && logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getTopDomain(logValue.toString()));
+ }
+ break;
+ case "radius_match":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.radiusMatch(logValue.toString()));
+ }
+ break;
+ case "gtpc_match":
+ if (logValue != null) {
+ TransFunction.gtpcMatch(jsonMap, logValue.toString(), appendToKey,param);
+ }
+ break;
+ case "set_value":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, param);
+ }
+ break;
+ case "get_value":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKey, logValue);
}
break;
default:
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
index 8ed4286..4e002c5 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -1,7 +1,7 @@
package com.zdjizhi.utils.general;
import cn.hutool.core.codec.Base64;
-import cn.hutool.core.util.StrUtil;
+import cn.hutool.json.JSONObject;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.jayway.jsonpath.InvalidPathException;
@@ -10,15 +10,12 @@ import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookupV2;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.app.AppUtils;
import com.zdjizhi.utils.hbase.HBaseUtils;
import com.zdjizhi.utils.json.JsonParseUtil;
-import com.zdjizhi.utils.json.TypeUtils;
+import com.zdjizhi.utils.json.JsonPathUtil;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Map;
-import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
@@ -52,21 +49,6 @@ class TransFunction {
return System.currentTimeMillis() / 1000;
}
- /**
-     * CityHash64 algorithm
-     * Not implemented in this release - TSG22.01
-     *
-     * @param data raw data
-     * @return hash result
- */
- @Deprecated
- static BigInteger getDecimalHash(long data) {
- byte[] dataBytes = String.valueOf(data).getBytes();
- long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length);
- String decimalValue = Long.toUnsignedString(hashValue, 10);
- return new BigInteger(decimalValue);
- }
-
/**
     * Looks up location information by clientIp
     *
@@ -74,15 +56,15 @@
     * @return detailed IP address information
*/
static String getGeoIpDetail(String ip) {
+ String detail = "";
try {
- return ipLookup.cityLookupDetail(ip);
+ detail = ipLookup.cityLookupDetail(ip);
} catch (NullPointerException npe) {
logger.error("The MMDB file is not loaded or IP is null! " + npe);
- return "";
} catch (RuntimeException e) {
logger.error("Get clientIP location error! " + e);
- return "";
}
+ return detail;
}
/**
@@ -92,15 +74,15 @@ class TransFunction {
* @return ASN
*/
static String getGeoAsn(String ip) {
+ String asn = "";
try {
- return ipLookup.asnLookup(ip);
+ asn = ipLookup.asnLookup(ip);
} catch (NullPointerException npe) {
logger.error("The MMDB file is not loaded or IP is null! " + npe);
- return "";
} catch (RuntimeException e) {
logger.error("Get IP ASN error! " + e);
- return "";
}
+ return asn;
}
/**
@@ -110,15 +92,15 @@ class TransFunction {
     * @return country
*/
static String getGeoIpCountry(String ip) {
+ String country = "";
try {
- return ipLookup.countryLookup(ip);
+ country = ipLookup.countryLookup(ip);
} catch (NullPointerException npe) {
logger.error("The MMDB file is not loaded or IP is null! " + npe);
- return "";
} catch (RuntimeException e) {
logger.error("Get ServerIP location error! " + e);
- return "";
}
+ return country;
}
@@ -133,19 +115,39 @@ class TransFunction {
}
/**
-     * Completes the appName from the cached appId-to-name mapping
+     * Completes GTP-C information with the help of HBase
     *
-     * @param appIds list of app ids
-     * @return appName
+     * @param jsonMap     raw log json
+     * @param logValue    uplink TEID
+     * @param appendToKey key(s) of the log field(s) the result maps to
+     * @param param       expression(s) used to parse the json array and locate the GTP information
*/
- @Deprecated
- static String appMatch(String appIds) {
+    static void gtpcMatch(Map<String, Object> jsonMap, String logValue, String appendToKey, String param) {
try {
- String appId = StrUtil.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0);
- return AppUtils.getAppName(Integer.parseInt(appId));
- } catch (NumberFormatException | ClassCastException exception) {
- logger.error("APP ID列表分割转换异常,异常APP ID列表:" + appIds);
- return "";
+ Long teid = null;
+ String[] exprs = param.split(FlowWriteConfig.FORMAT_SPLITTER);
+ for (String expr : exprs) {
+ Long value = JsonPathUtil.getLongValue(logValue, expr);
+ if (value != null) {
+ teid = value;
+ break;
+ }
+ }
+
+ if (teid != null) {
+ String[] appendToKeys = appendToKey.split(FlowWriteConfig.FORMAT_SPLITTER);
+ String userData = HBaseUtils.getGtpData(teid);
+ if (userData != null) {
+ JSONObject schemaJson = new JSONObject(userData, false, true);
+ for (String key : appendToKeys) {
+ JsonParseUtil.setValue(jsonMap, key, schemaJson.getObj(key));
+ }
+ } else {
+ logger.warn("Description The user whose TEID is " + teid + " was not matched!");
+ }
+ }
+ } catch (RuntimeException re) {
+ logger.error("An exception occurred in teid type conversion or parsing of user information!" + re);
}
}
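
> Note: `gtpc_match` probes each JsonPath expression in `param` in order until one yields a TEID, then fans the matched HBase user record out to every key listed in `appendToKey`. A standalone sketch of the probing step, assuming `FlowWriteConfig.FORMAT_SPLITTER` is a comma and using hypothetical tunnel data:

```java
import com.jayway.jsonpath.JsonPath;

import java.util.List;

public class TeidProbeExample {
    public static void main(String[] args) {
        // Hypothetical raw log value: a JSON array of GTP tunnels.
        String logValue = "[{\"dir\":\"down\",\"teid\":0},{\"dir\":\"up\",\"teid\":4294967295}]";

        // Candidate expressions tried in order, mirroring the loop in
        // TransFunction.gtpcMatch; the "," separator is an assumption.
        String param = "$[?(@.dir=='up')].teid,$[*].teid";

        Long teid = null;
        for (String expr : param.split(",")) {
            // Indefinite paths (filters, wildcards) return a list of matches.
            List<Number> hits = JsonPath.parse(logValue).read(expr);
            if (!hits.isEmpty() && hits.get(0) != null) {
                teid = hits.get(0).longValue();
                break;
            }
        }
        // The TEID is then the lookup key for HBaseUtils.getGtpData(teid).
        System.out.println(teid); // 4294967295
    }
}
```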
diff --git a/src/main/java/com/zdjizhi/utils/hbase/GtpCRelation.java b/src/main/java/com/zdjizhi/utils/hbase/GtpCRelation.java
new file mode 100644
index 0000000..f1e5b30
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/hbase/GtpCRelation.java
@@ -0,0 +1,132 @@
+package com.zdjizhi.utils.hbase;
+
+import cn.hutool.json.JSONObject;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.hbase
+ * @Description:
+ * @date 2022/7/15 10:12
+ */
+class GtpCRelation {
+ private static final Log logger = LogFactory.get();
+
+ /**
+     * Fetches the full set of GTP-C data
+     */
+    static void getAllGtpCRelation(Connection connection, Map<Long, String> gtpcMap) {
+ long begin = System.currentTimeMillis();
+ ResultScanner scanner = null;
+ try {
+ Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_GTPC_TABLE_NAME));
+ Scan scan = new Scan();
+ if (FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS > 0) {
+ scan.setLimit(FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS);
+ }
+ scanner = table.getScanner(scan);
+ for (Result result : scanner) {
+ int acctStatusType = GtpCRelation.getMsgType(result);
+ if (acctStatusType == 1) {
+ Long teid = Bytes.toLong(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("teid")));
+ String phoneNumber = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number"))).trim();
+ String imsi = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))).trim();
+ String imei = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))).trim();
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.set("common_phone_number", phoneNumber);
+ jsonObject.set("common_imsi", imsi);
+ jsonObject.set("common_imei", imei);
+ gtpcMap.put(teid, jsonObject.toJSONString(0));
+ }
+ }
+ logger.warn("The obtain the number of GTP-C relationships : " + gtpcMap.size());
+ logger.warn("The time spent to obtain GTP-C relationships : " + (System.currentTimeMillis() - begin) + "ms");
+ } catch (IOException | RuntimeException e) {
+ logger.error("The relationship between USER and TEID obtained from HBase is abnormal! message is :" + e);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ }
+ }
+
+
+ /**
+     * Incrementally updates the GTP-C relationships
+     *
+     * @param connection HBase connection
+     * @param gtpcMap    GTP-C relationship cache
+     * @param startTime  start time
+     * @param endTime    end time
+     */
+    static void upgradeGtpCRelation(Connection connection, Map<Long, String> gtpcMap, Long startTime, Long endTime) {
+ Long begin = System.currentTimeMillis();
+ Table table = null;
+ ResultScanner scanner = null;
+ Scan scan = new Scan();
+ try {
+ table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_GTPC_TABLE_NAME));
+ scan.setTimeRange(startTime, endTime);
+ if (FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS > 0) {
+ scan.setLimit(FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS);
+ }
+ scanner = table.getScanner(scan);
+ for (Result result : scanner) {
+ int acctStatusType = GtpCRelation.getMsgType(result);
+ Long teid = Bytes.toLong(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("teid")));
+ if (acctStatusType == 1) {
+ String phoneNumber = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number"))).trim();
+ String imsi = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))).trim();
+ String imei = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))).trim();
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.set("common_phone_number", phoneNumber);
+ jsonObject.set("common_imsi", imsi);
+ jsonObject.set("common_imei", imei);
+ gtpcMap.put(teid, jsonObject.toJSONString(0));
+ } else if (acctStatusType == 2) {
+ gtpcMap.remove(teid);
+ }
+ }
+ Long end = System.currentTimeMillis();
+ logger.warn("The current number of GTPC relationships is: " + gtpcMap.keySet().size());
+ logger.warn("The time used to update the GTPC relationship is: " + (end - begin) + "ms");
+ } catch (IOException | RuntimeException e) {
+ logger.error("GTPC relationship update exception, the content is:" + e);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ if (table != null) {
+ try {
+ table.close();
+ } catch (IOException e) {
+ logger.error("HBase Table Close ERROR! Exception message is:" + e);
+ }
+ }
+ }
+ }
+
+ /**
+     * Gets the current user's online/offline status
+     *
+     * @param result row fetched from HBase
+     * @return status: 1 = online, 2 = offline
+ */
+ private static int getMsgType(Result result) {
+ boolean hasType = result.containsColumn(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type"));
+ if (hasType) {
+ return Bytes.toInt(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type")));
+ } else {
+ return 1;
+ }
+ }
+}
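
> Note: `GtpCRelation` assumes a fixed row shape: column family `gtp` with `teid` stored as an 8-byte long, `msg_type` as a 4-byte int (1 = online, 2 = offline), and the identity fields as strings. A hedged writer-side sketch of that layout (the real rows come from an upstream job; the row-key design is not shown in this diff, so the key below is only an assumption):

```java
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class GtpcRowExample {
    /** Writes one GTP-C relation row in the shape the scanner above expects. */
    static void putRelation(Table table, long teid, String phone, String imsi, String imei) throws IOException {
        byte[] gtp = Bytes.toBytes("gtp");
        Put put = new Put(Bytes.toBytes(teid)); // row key assumed; not shown in the diff
        put.addColumn(gtp, Bytes.toBytes("teid"), Bytes.toBytes(teid));         // read back via Bytes.toLong
        put.addColumn(gtp, Bytes.toBytes("msg_type"), Bytes.toBytes(1));        // 1 = online, 2 = offline
        put.addColumn(gtp, Bytes.toBytes("phone_number"), Bytes.toBytes(phone));
        put.addColumn(gtp, Bytes.toBytes("imsi"), Bytes.toBytes(imsi));
        put.addColumn(gtp, Bytes.toBytes("imei"), Bytes.toBytes(imei));
        table.put(put);
    }
}
```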
diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
index adf2ef4..461edc0 100644
--- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
+++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
@@ -5,11 +5,10 @@ import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
@@ -24,7 +23,8 @@ import java.util.concurrent.TimeUnit;
public class HBaseUtils {
private static final Log logger = LogFactory.get();
-    private static Map<String, String> subIdMap = new ConcurrentHashMap<>(16);
+    private static Map<String, String> radiusMap = new ConcurrentHashMap<>(16);
+    private static Map<Long, String> gtpcMap = new ConcurrentHashMap<>(16);
private static Connection connection;
private static Long time;
@@ -42,7 +42,8 @@ public class HBaseUtils {
        //get the connection
        getConnection();
        //pull the full data set
-        getAll();
+        RadiusRelation.getAllRadiusRelation(connection, radiusMap);
+        GtpCRelation.getAllGtpCRelation(connection, gtpcMap);
        //scheduled refresh
updateCache();
@@ -50,13 +51,13 @@ public class HBaseUtils {
private static void getConnection() {
try {
-            // HBase configuration
            Configuration configuration = HBaseConfiguration.create();
-            // set the zookeeper quorum
configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS);
- configuration.set("hbase.client.retries.number", "3");
- configuration.set("hbase.bulkload.retries.number", "3");
- configuration.set("zookeeper.recovery.retry", "3");
+ configuration.set("hbase.client.retries.number", "1");
+ configuration.set("hbase.client.pause", "50");
+ configuration.set("hbase.rpc.timeout", "3000");
+ configuration.set("zookeeper.recovery.retry", "1");
+ configuration.set("zookeeper.recovery.retry.intervalmill", "200");
connection = ConnectionFactory.createConnection(configuration);
time = System.currentTimeMillis();
logger.warn("HBaseUtils get HBase connection,now to getAll().");
@@ -75,97 +76,15 @@ public class HBaseUtils {
getInstance();
}
long nowTime = System.currentTimeMillis();
- timestampsFilter(time - 1000, nowTime + 500);
- }
-
-
- /**
-     * Fetches the changed content
-     *
-     * @param startTime start time
-     * @param endTime   end time
- */
- private static void timestampsFilter(Long startTime, Long endTime) {
- Long begin = System.currentTimeMillis();
- Table table = null;
- ResultScanner scanner = null;
- Scan scan2 = new Scan();
- try {
- table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME));
- scan2.setTimeRange(startTime, endTime);
- scanner = table.getScanner(scan2);
- for (Result result : scanner) {
- int acctStatusType = getAcctStatusType(result);
- String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))).trim();
- String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))).trim();
- if (acctStatusType == 1) {
- if (subIdMap.containsKey(framedIp)) {
- boolean same = account.equals(subIdMap.get(framedIp));
- if (!same) {
- subIdMap.put(framedIp, account);
- }
- } else {
- subIdMap.put(framedIp, account);
- }
- } else if (acctStatusType == 2) {
- subIdMap.remove(framedIp);
- }
- }
- Long end = System.currentTimeMillis();
- logger.warn("HBaseUtils Now subIdMap.keySet().size() is: " + subIdMap.keySet().size());
- logger.warn("HBaseUtils Update cache timeConsuming is: " + (end - begin) + ",BeginTime: " + startTime + ",EndTime: " + endTime);
- time = endTime;
- } catch (IOException ioe) {
- logger.error("HBaseUtils timestampsFilter is IOException===>{" + ioe + "}<===");
- } catch (RuntimeException e) {
- logger.error("HBaseUtils timestampsFilter is Exception===>{" + e + "}<===");
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- if (table != null) {
- try {
- table.close();
- } catch (IOException e) {
- logger.error("HBase Table Close ERROR! Exception message is:" + e);
- }
- }
- }
- }
-
- /**
-     * Fetches all key-value pairs
- */
- private static void getAll() {
- long begin = System.currentTimeMillis();
- try {
- Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME));
- Scan scan2 = new Scan();
- ResultScanner scanner = table.getScanner(scan2);
- for (Result result : scanner) {
- int acctStatusType = getAcctStatusType(result);
- String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")));
- String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account")));
- if (acctStatusType == 1) {
- subIdMap.put(framedIp, account);
- }
- }
- logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size());
- logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin));
- scanner.close();
- } catch (IOException ioe) {
- logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
- } catch (RuntimeException e) {
- logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
- }
+ RadiusRelation.upgradeRadiusRelation(connection, radiusMap, time - 1000, nowTime + 500);
+ GtpCRelation.upgradeGtpCRelation(connection, gtpcMap, time - 1000, nowTime + 500);
+ time = nowTime;
}
/**
     * Refresh timer; runs at a fixed interval to pull fresh data
*/
private void updateCache() {
-// ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
-// new BasicThreadFactory.Builder().namingPattern("hbase-change-pool-%d").daemon(true).build());
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
@@ -193,24 +112,25 @@ public class HBaseUtils {
if (hBaseUtils == null) {
getInstance();
}
- return subIdMap.get(clientIp);
+ return radiusMap.get(clientIp);
}
return "";
}
- /**
- * 获取当前用户上下线状态信息
- *
- * @param result HBase内获取的数据
- * @return 状态 1-上线 2-下线
- */
- private static int getAcctStatusType(Result result) {
- boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"));
- if (hasType) {
- return Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")));
- } else {
- return 1;
- }
- }
+ /**
+     * Gets the cached GTP-C user data
+     *
+     * @param teid uplink TEID
+     * @return user data as a JSON string
+ */
+ public static String getGtpData(Long teid) {
+ if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
+ if (hBaseUtils == null) {
+ getInstance();
+ }
+ return gtpcMap.get(teid);
+ }
+ return null;
+ }
}
diff --git a/src/main/java/com/zdjizhi/utils/hbase/RadiusRelation.java b/src/main/java/com/zdjizhi/utils/hbase/RadiusRelation.java
new file mode 100644
index 0000000..f5b17de
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/hbase/RadiusRelation.java
@@ -0,0 +1,124 @@
+package com.zdjizhi.utils.hbase;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.hbase
+ * @Description:
+ * @date 2022/7/15 10:12
+ */
+class RadiusRelation {
+ private static final Log logger = LogFactory.get();
+
+ /**
+     * Fetches the full set of Radius data
+     */
+    static void getAllRadiusRelation(Connection connection, Map<String, String> radiusMap) {
+ long begin = System.currentTimeMillis();
+ ResultScanner scanner = null;
+ try {
+ Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_RADIUS_TABLE_NAME));
+ Scan scan = new Scan();
+ if (FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS > 0) {
+ scan.setLimit(FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS);
+ }
+ scanner = table.getScanner(scan);
+ for (Result result : scanner) {
+ int acctStatusType = RadiusRelation.getAcctStatusType(result);
+ String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")));
+ String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account")));
+ if (acctStatusType == 1) {
+ radiusMap.put(framedIp, account);
+ }
+ }
+ logger.warn("The obtain the number of RADIUS relationships : " + radiusMap.size());
+ logger.warn("The time spent to obtain radius relationships : " + (System.currentTimeMillis() - begin) + "ms");
+ } catch (IOException | RuntimeException e) {
+ logger.error("The relationship between framedIP and account obtained from HBase is abnormal! message is :" + e);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ }
+ }
+
+ /**
+     * Incrementally updates the Radius relationships
+     *
+     * @param connection HBase connection
+     * @param radiusMap  Radius relationship cache
+     * @param startTime  start time
+     * @param endTime    end time
+     */
+    static void upgradeRadiusRelation(Connection connection, Map<String, String> radiusMap, Long startTime, Long endTime) {
+ Long begin = System.currentTimeMillis();
+ Table table = null;
+ ResultScanner scanner = null;
+ Scan scan = new Scan();
+ try {
+ table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_RADIUS_TABLE_NAME));
+ scan.setTimeRange(startTime, endTime);
+ if (FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS > 0) {
+ scan.setLimit(FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS);
+ }
+ scanner = table.getScanner(scan);
+ for (Result result : scanner) {
+ int acctStatusType = RadiusRelation.getAcctStatusType(result);
+ String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))).trim();
+ String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))).trim();
+ if (acctStatusType == 1) {
+ if (radiusMap.containsKey(framedIp)) {
+ boolean same = account.equals(radiusMap.get(framedIp));
+ if (!same) {
+ radiusMap.put(framedIp, account);
+ }
+ } else {
+ radiusMap.put(framedIp, account);
+ }
+ } else if (acctStatusType == 2) {
+ radiusMap.remove(framedIp);
+ }
+ }
+ Long end = System.currentTimeMillis();
+ logger.warn("The current number of Radius relationships is:: " + radiusMap.keySet().size());
+ logger.warn("The time used to update the Radius relationship is: " + (end - begin) + "ms");
+ } catch (IOException | RuntimeException e) {
+ logger.error("Radius relationship update exception, the content is:" + e);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ if (table != null) {
+ try {
+ table.close();
+ } catch (IOException e) {
+ logger.error("HBase Table Close ERROR! Exception message is:" + e);
+ }
+ }
+ }
+ }
+
+ /**
+     * Gets the current user's online/offline status
+     *
+     * @param result row fetched from HBase
+     * @return status: 1 = online, 2 = offline
+ */
+ private static int getAcctStatusType(Result result) {
+ boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"));
+ if (hasType) {
+ return Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")));
+ } else {
+ return 1;
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
index ddb29ed..0ee7d3b 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -1,10 +1,9 @@
package com.zdjizhi.utils.json;
+import cn.hutool.json.JSONArray;
+import cn.hutool.json.JSONObject;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
@@ -210,8 +209,8 @@ public class JsonParseUtil {
HashMap map = new HashMap<>(16);
//获取fields,并转化为数组,数组的每个元素都是一个name doc type
- JSONObject schemaJson = JSON.parseObject(schema);
- JSONArray fields = (JSONArray) schemaJson.get("fields");
+ JSONObject schemaJson = new JSONObject(schema, false, true);
+ JSONArray fields = schemaJson.getJSONArray("fields");
for (Object field : fields) {
String filedStr = field.toString();
@@ -238,8 +237,9 @@ public class JsonParseUtil {
*/
private static boolean checkKeepField(String message) {
boolean isKeepField = true;
- boolean isHiveDoc = JSON.parseObject(message).containsKey("doc");
- if (isHiveDoc) {
+ JSONObject fieldJson = new JSONObject(message, false, true);
+ boolean hasDoc = fieldJson.containsKey("doc");
+ if (hasDoc) {
boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility");
if (isHiveVi) {
String visibility = JsonPath.read(message, "$.doc.visibility").toString();
@@ -271,58 +271,40 @@ public class JsonParseUtil {
    private static ArrayList<String[]> getJobListFromHttp(String schema) {
        ArrayList<String[]> list = new ArrayList<>();
-        //get "fields" and convert it to an array; each element is a name/doc/type object
- JSONObject schemaJson = JSON.parseObject(schema);
- JSONArray fields = (JSONArray) schemaJson.get("fields");
-
+ JSONObject schemaJson = new JSONObject(schema, false, true);
+ JSONArray fields = schemaJson.getJSONArray("fields");
for (Object field : fields) {
+ JSONObject fieldJson = new JSONObject(field, false, true);
+ boolean hasDoc = fieldJson.containsKey("doc");
+ if (hasDoc) {
+ JSONObject docJson = fieldJson.getJSONObject("doc");
+ boolean hasFormat = docJson.containsKey("format");
+ if (hasFormat) {
+ String name = fieldJson.getStr("name");
+ JSONArray formatList = docJson.getJSONArray("format");
+ for (Object format : formatList) {
+ JSONObject formatJson = new JSONObject(format, false, true);
+ String function = formatJson.getStr("function");
+ String appendTo;
+ String params = null;
- if (JSON.parseObject(field.toString()).containsKey("doc")) {
- Object doc = JSON.parseObject(field.toString()).get("doc");
-
- if (JSON.parseObject(doc.toString()).containsKey("format")) {
- String name = JSON.parseObject(field.toString()).get("name").toString();
- Object format = JSON.parseObject(doc.toString()).get("format");
- JSONObject formatObject = JSON.parseObject(format.toString());
-
- String functions = formatObject.get("functions").toString();
- String appendTo = null;
- String params = null;
-
- if (formatObject.containsKey("appendTo")) {
- appendTo = formatObject.get("appendTo").toString();
- }
-
- if (formatObject.containsKey("param")) {
- params = formatObject.get("param").toString();
- }
-
-
- if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) {
- String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
- String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
-
- for (int i = 0; i < functionArray.length; i++) {
- list.add(new String[]{name, appendToArray[i], functionArray[i], null});
+ if (formatJson.containsKey("appendTo")) {
+ appendTo = formatJson.getStr("appendTo");
+ } else {
+ appendTo = name;
}
- } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) {
- String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
- String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
- String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER);
-
- for (int i = 0; i < functionArray.length; i++) {
- list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]});
-
+ if (formatJson.containsKey("param")) {
+ params = formatJson.getStr("param");
}
- } else {
- list.add(new String[]{name, name, functions, params});
- }
+ list.add(new String[]{name, appendTo, function, params});
+
+ }
}
}
-
}
+
return list;
}
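
> Note: the rewrite above also changes the schema contract. The old code read a single `format` object whose `functions`, `appendTo`, and `param` values were split on `FORMAT_SPLITTER`; the new code expects `format` to be an array with one `function` per entry, `appendTo` defaulting to the field's own name, and `param` left null when absent. A sketch of the resulting job list for a hypothetical field:

```java
public class JobListExample {
    public static void main(String[] args) {
        // Hypothetical field:
        // {"name":"client_ip","doc":{"format":[
        //   {"function":"geo_ip_detail","appendTo":"client_location"},
        //   {"function":"geo_asn","appendTo":"client_asn"}]}}
        //
        // getJobListFromHttp would emit one {name, appendTo, function, param}
        // entry per format object:
        String[][] jobs = {
            {"client_ip", "client_location", "geo_ip_detail", null},
            {"client_ip", "client_asn", "geo_asn", null},
        };
        for (String[] job : jobs) {
            System.out.println(String.join(" -> ", job[0], job[2], job[1]));
        }
    }
}
```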
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java
new file mode 100644
index 0000000..130cd90
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java
@@ -0,0 +1,91 @@
+package com.zdjizhi.utils.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.utils.StringUtil;
+
+import java.util.ArrayList;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.json
+ * @Description:
+ * @date 2022/7/18 17:19
+ */
+public class JsonPathUtil {
+ private static final Log logger = LogFactory.get();
+
+
+ /**
+     * Parses via jsonPath and returns a String value
+     *
+     * @param message json data
+     * @param expr    parse expression
+     * @return the extracted value, or null
+ */
+ public static String getStringValue(String message, String expr) {
+ String result = null;
+ try {
+ if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
+                ArrayList<String> read = JsonPath.parse(message).read(expr);
+ if (read.size() >= 1) {
+ result = read.get(0);
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("JSONPath parsing json returns String data exception" + e);
+ }
+
+ return result;
+ }
+
+
+ /**
+     * Parses via jsonPath and returns an Integer value
+     *
+     * @param message json data
+     * @param expr    parse expression
+     * @return the extracted value, or null
+ */
+ public static Integer getIntegerValue(String message, String expr) {
+ Integer result = null;
+ try {
+ if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
+                ArrayList<Integer> read = JsonPath.parse(message).read(expr);
+ if (read.size() >= 1) {
+ result = read.get(0);
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("JSONPath parsing json returns Long data exception" + e);
+ }
+
+ return result;
+ }
+
+ /**
+     * Parses via jsonPath and returns a Long value
+     *
+     * @param message json data
+     * @param expr    parse expression
+     * @return the extracted value, or null
+ */
+ public static Long getLongValue(String message, String expr) {
+ Long result = null;
+ try {
+ if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
+                ArrayList<Long> read = JsonPath.parse(message).read(expr);
+ if (read.size() >= 1) {
+ result = read.get(0);
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("JSONPath parsing json returns Long data exception" + e);
+ }
+
+ return result;
+ }
+}
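
> Note on these helpers: `JsonPath.parse(message).read(expr)` only returns a list (which the cast to `ArrayList` relies on) when the expression is indefinite, i.e. contains a wildcard, filter, or recursive descent. A small usage sketch:

```java
import com.zdjizhi.utils.json.JsonPathUtil;

public class JsonPathUtilExample {
    public static void main(String[] args) {
        // The wildcard makes the path indefinite, so read(...) yields a list
        // and the helpers can take its first element.
        String json = "[{\"host\":\"a.example.com\",\"teid\":4294967295}]";
        String host = JsonPathUtil.getStringValue(json, "$[*].host");
        Long teid = JsonPathUtil.getLongValue(json, "$[*].teid");
        System.out.println(host + " / " + teid); // a.example.com / 4294967295
        // A definite path such as "$[0].host" would make read(...) return a
        // scalar instead; the helpers catch the resulting RuntimeException and
        // return null.
    }
}
```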
diff --git a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
index d793628..3b40482 100644
--- a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
+++ b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
@@ -19,15 +19,15 @@ import java.util.Properties;
public final class FlowWriteConfigurations {
- private static Properties propKafka = new Properties();
+ private static Properties propDefault = new Properties();
private static Properties propService = new Properties();
public static String getStringProperty(Integer type, String key) {
if (type == 0) {
- return propService.getProperty(key);
+ return propService.getProperty(key).trim();
} else if (type == 1) {
- return propKafka.getProperty(key);
+ return propDefault.getProperty(key).trim();
} else {
return null;
}
@@ -35,9 +35,9 @@ public final class FlowWriteConfigurations {
public static Integer getIntProperty(Integer type, String key) {
if (type == 0) {
- return Integer.parseInt(propService.getProperty(key));
+ return Integer.parseInt(propService.getProperty(key).trim());
} else if (type == 1) {
- return Integer.parseInt(propKafka.getProperty(key));
+ return Integer.parseInt(propDefault.getProperty(key).trim());
} else {
return null;
}
@@ -45,9 +45,9 @@ public final class FlowWriteConfigurations {
public static Long getLongProperty(Integer type, String key) {
if (type == 0) {
- return Long.parseLong(propService.getProperty(key));
+ return Long.parseLong(propService.getProperty(key).trim());
} else if (type == 1) {
- return Long.parseLong(propKafka.getProperty(key));
+ return Long.parseLong(propDefault.getProperty(key).trim());
} else {
return null;
}
@@ -57,7 +57,7 @@ public final class FlowWriteConfigurations {
if (type == 0) {
return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
} else if (type == 1) {
- return StringUtil.equals(propKafka.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
+ return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
} else {
return null;
}
@@ -66,9 +66,9 @@ public final class FlowWriteConfigurations {
static {
try {
propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
- propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
+ propDefault.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
} catch (IOException | RuntimeException e) {
- propKafka = null;
+ propDefault = null;
propService = null;
}
}