1:增加GTPC补全功能。

2:修改HBase连接参数,增加Radius和GTPC获取数据大小限制。
3:删除废弃函数代码。
4:优化部分函数代码。
This commit is contained in:
qidaijie
2022-08-04 10:16:08 +08:00
parent 2e8c8a3dbd
commit c0707a79c3
16 changed files with 648 additions and 685 deletions

View File

@@ -1,14 +1,15 @@
# log-stream-completion-schema # log-stream-completion-schema
基于查询网关的动态日志预处理程序,接收原始日志根据对应schema定义进行数据清洗 基于Nacos的动态日志预处理程序,接收原始日志根据对应Schema定义进行数据清洗并将结果回写Kafka。
并将结果回写Kafka 当Nacos上Schame变更后可动态获取到最新版本的信息无需重启任务
## 函数功能列表 ## 函数功能列表
* current_timestamp * current_timestamp
> 获取当前时间戳,若追加字段已有时间戳,不予以覆盖 > 获取当前时间戳,若追加字段已有时间戳,不予以覆盖
* snowflake_id * snowflake_id
> 雪花ID函数返回一个一定条件内不重复的 long 类型数值 > 雪花ID函数返回一个一定条件内不重复的 long 类型数值
> https://git.mesalab.cn/bigdata/algorithm/snowflake
* geo_ip_detail * geo_ip_detail
> IP定位库获取对应IP的详细地理位置信息城市,州/省,国家 > IP定位库获取对应IP的详细地理位置信息城市,州/省,国家
* geo_asn * geo_asn
@@ -16,19 +17,21 @@
* geo_ip_country * geo_ip_country
> IP定位库获取对应IP的地理位置信息仅包含 国家 > IP定位库获取对应IP的地理位置信息仅包含 国家
* set_value * set_value
> 给予字段固定值 > 给予字段固定值
* get_value * get_value
> 获取字段值并追加到新的字段 > 获取字段值并追加到新的字段
* if * if
> IF函数实现解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。 > IF函数实现解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。
* sub_domain * sub_domain
> 获取顶级域名 > 获取顶级域名
* radius_match * radius_match
> 根据IP解析对应raidus用户借助于HBase存储数据。 > 根据IP获取对应的Raidus用户信息。
> 实际数据存储在HBase中依赖RADIUS-RELATIONSHIP-HBASE-V2程序使用时加载到内存中加速查询。
* app_match * app_match
> 根据APP_ID获取对应的APP名称 > 根据APP_ID获取对应的APP名称
* decode_of_base64 * decode_of_base64
> 根据编码解码base64若编码字段为空则根据默认编码解析(UTF-8) > 根据编码解码base64若编码字段为空则根据默认编码解析(UTF-8)
* flattenSpec * flattenSpec
> 根据表达式解析json > 根据表达式解析json,使用jsonPath工具类
> https://github.com/json-path/JsonPath

15
pom.xml
View File

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId> <groupId>com.zdjizhi</groupId>
<artifactId>log-completion-schema</artifactId> <artifactId>log-completion-schema</artifactId>
<version>220318-Nacos</version> <version>220727-GTPC</version>
<name>log-completion-schema</name> <name>log-completion-schema</name>
<url>http://www.example.com</url> <url>http://www.example.com</url>
@@ -297,6 +297,19 @@
<version>1.9.3</version> <version>1.9.3</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@@ -24,42 +24,46 @@ batch.size=262144
#128M #128M
buffer.memory=134217728 buffer.memory=134217728
#这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576 #这个参数决定了每次发送给Kafka服务器请求的最大大小
#10M #default: 10485760 = 10M
max.request.size=10485760 max.request.size=10485760
#====================kafka default====================# #====================kafka default====================#
#kafka SASL验证用户名-加密 #kafka SASL/SSL username (encryption)
kafka.user=nsyGpHKGFA4KW0zro9MDdw== kafka.user=nsyGpHKGFA4KW0zro9MDdw==
#kafka SASLSSL验证密码-加密 #kafka SASL/SSL pin (encryption)
kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
#生产者ack #producer ack
producer.ack=1 producer.ack=1
#====================nacos default====================# #====================nacos default====================#
#nacos username #nacos username (encryption)
nacos.username=nacos nacos.username=kANxu/Zi5rBnZVxa5zAjrQ==
#nacos password #nacos pin (encryption)
nacos.pin=nacos nacos.pin=YPIBDIXjJUtVBjjk2op0Dg==
#nacos group #nacos group
nacos.group=Galaxy nacos.group=Galaxy
#====================Topology Default====================# #====================Topology Default====================#
#hbase table name #hbase radius relation table name
hbase.table.name=tsg_galaxy:relation_framedip_account hbase.radius.table.name=tsg_galaxy:relation_framedip_account
#邮件默认编码 #hbase gtpc relation table name
hbase.gtpc.table.name=tsg_galaxy:relation_user_teid
#mail base64 use default charset
mail.default.charset=UTF-8 mail.default.charset=UTF-8
#0不做任何校验1弱类型校验 #0 no-operation parse JSON directly.
#1 Check fields type with schema,Do some type conversion.
log.transform.type=1 log.transform.type=1
#两个输出之间的最大时间(单位milliseconds) #Maximum time between two outputs(milliseconds)
buffer.timeout=5000 buffer.timeout=5000
#====================临时配置-待删除====================#
#网关APP_ID 获取接口
app.id.http=http://192.168.44.20:9999/open-api/appDicList
#app_id 更新时间如填写0则不更新缓存 #The gtpc data scan max rows,0 = no limit.
app.tick.tuple.freq.secs=0 hbase.gtpc.scan.max.rows=0
#The radius data scan max rows,0 = no limit.
hbase.radius.scan.max.rows=0

View File

@@ -1,32 +1,31 @@
#--------------------------------地址配置------------------------------# #--------------------------------地址配置------------------------------#
#管理kafka地址 #管理kafka地址
source.kafka.servers=192.168.44.11:9094 source.kafka.servers=192.168.44.12:9094
#管理输出kafka地址 #管理输出kafka地址
sink.kafka.servers=192.168.44.11:9094 sink.kafka.servers=192.168.44.12:9094
#zookeeper 地址 用于配置log_id #zookeeper 地址 用于配置log_id
zookeeper.servers=192.168.44.11:2181 zookeeper.servers=192.168.44.12:2181
#hbase zookeeper地址 用于连接HBase #hbase zookeeper地址 用于连接HBase
hbase.zookeeper.servers=192.168.44.11:2181 hbase.zookeeper.servers=192.168.44.12:2181
#--------------------------------HTTP/定位库------------------------------# #--------------------------------HTTP/定位库------------------------------#
#定位库地址 #定位库地址
tools.library=D:\\workerspace\\dat\\ tools.library=/opt/dat/
#--------------------------------nacos配置------------------------------# #--------------------------------nacos配置------------------------------#
#nacos 地址 #nacos 地址
nacos.server=192.168.40.43:8848 nacos.server=192.168.44.12:8848
#nacos namespace #nacos namespace
nacos.schema.namespace=test nacos.schema.namespace=prod
#nacos data id #nacos data id
nacos.data.id=session_record.json nacos.data.id=gtpc_record.json
#--------------------------------Kafka消费/生产配置------------------------------#
#--------------------------------Kafka消费组信息------------------------------#
#kafka 接收数据topic #kafka 接收数据topic
source.kafka.topic=test source.kafka.topic=test
@@ -34,10 +33,9 @@ source.kafka.topic=test
sink.kafka.topic=test-result sink.kafka.topic=test-result
#读取topic,存储该spout id的消费offset信息可通过该拓扑命名;具体存储offset的位置确定下次读取不重复的数据 #读取topic,存储该spout id的消费offset信息可通过该拓扑命名;具体存储offset的位置确定下次读取不重复的数据
group.id=flinktest-1 group.id=gtpc-record-log-2022-1
#--------------------------------topology配置------------------------------# #--------------------------------topology配置------------------------------#
#consumer 并行度 #consumer 并行度
source.parallelism=1 source.parallelism=1
@@ -47,16 +45,15 @@ transform.parallelism=1
#kafka producer 并行度 #kafka producer 并行度
sink.parallelism=1 sink.parallelism=1
#数据中心,取值范围(0-31) #数据中心,取值范围(0-63)
data.center.id.num=0 data.center.id.num=2
#hbase 更新时间如填写0则不更新缓存 #hbase 更新时间如填写0则不更新缓存
hbase.tick.tuple.freq.secs=180 hbase.tick.tuple.freq.secs=180
#--------------------------------默认值配置------------------------------# #--------------------------------默认值配置------------------------------#
#0不需要补全原样输出日志1需要补全 #0不需要补全原样输出日志1需要补全
log.need.complete=1 log.need.complete=1
#生产者压缩模式 none or snappy #生产者压缩模式 none or snappy
producer.kafka.compression.type=none producer.kafka.compression.type=none

View File

@@ -42,11 +42,10 @@ public class FlowWriteConfig {
*/ */
public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server"); public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server");
public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace"); public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace");
public static final String NACOS_COMMON_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.common.namespace");
public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id"); public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id");
public static final String NACOS_PIN = FlowWriteConfigurations.getStringProperty(1, "nacos.pin");
public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group"); public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group");
public static final String NACOS_USERNAME = FlowWriteConfigurations.getStringProperty(1, "nacos.username"); public static final String NACOS_USERNAME = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "nacos.username"));
public static final String NACOS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "nacos.pin"));
/** /**
* System config * System config
@@ -64,7 +63,10 @@ public class FlowWriteConfig {
* HBase * HBase
*/ */
public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs"); public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name"); public static final Integer HBASE_GTPC_SCAN_MAX_ROWS = FlowWriteConfigurations.getIntProperty(1, "hbase.gtpc.scan.max.rows");
public static final Integer HBASE_RADIUS_SCAN_MAX_ROWS = FlowWriteConfigurations.getIntProperty(1, "hbase.radius.scan.max.rows");
public static final String HBASE_RADIUS_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.radius.table.name");
public static final String HBASE_GTPC_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.gtpc.table.name");
/** /**
* kafka common * kafka common
@@ -98,19 +100,13 @@ public class FlowWriteConfig {
public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory"); public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size"); public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
/**
* http
*/
public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(1, "app.id.http");
public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs");
/** /**
* common config * common config
*/ */
public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"source.kafka.servers"); public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.servers"); public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"zookeeper.servers"); public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0,"tools.library"); public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"hbase.zookeeper.servers"); public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
} }

View File

@@ -1,124 +0,0 @@
package com.zdjizhi.utils.app;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.http.HttpClientUtil;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* AppId 工具类
*
* @author qidaijie
*/
@Deprecated
public class AppUtils {
private static final Log logger = LogFactory.get();
private static Map<Integer, String> appIdMap = new ConcurrentHashMap<>(128);
private static AppUtils appUtils;
private static void getAppInstance() {
appUtils = new AppUtils();
}
/**
* 构造函数-新
*/
private AppUtils() {
//定时更新
updateAppIdCache();
}
/**
* 更新变量
*/
private static void change() {
if (appUtils == null) {
getAppInstance();
}
timestampsFilter();
}
/**
* 获取变更内容
*/
private static void timestampsFilter() {
try {
Long begin = System.currentTimeMillis();
String schema = HttpClientUtil.requestByGetMethod(FlowWriteConfig.APP_ID_HTTP);
if (StringUtil.isNotBlank(schema)) {
String data = JSONObject.parseObject(schema).getString("data");
JSONArray objects = JSONArray.parseArray(data);
for (Object object : objects) {
JSONArray jsonArray = JSONArray.parseArray(object.toString());
int key = jsonArray.getInteger(0);
String value = jsonArray.getString(1);
if (appIdMap.containsKey(key)) {
if (!value.equals(appIdMap.get(key))) {
appIdMap.put(key, value);
}
} else {
appIdMap.put(key, value);
}
}
logger.warn("Updating the correspondence takes time:" + (begin - System.currentTimeMillis()));
logger.warn("Pull the length of the interface data:[" + objects.size() + "]");
}
} catch (RuntimeException e) {
logger.error("Update cache app-id failed, exception" + e);
}
}
/**
* 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
*/
private void updateAppIdCache() {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS != 0) {
change();
}
} catch (RuntimeException e) {
logger.error("AppUtils update AppCache is error===>{" + e + "}<===");
}
}
}, 1, FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
}
/**
* 获取 appName
*
* @param appId app_id
* @return account
*/
public static String getAppName(int appId) {
if (appUtils == null) {
getAppInstance();
}
if (appIdMap.containsKey(appId)) {
return appIdMap.get(appId);
} else {
logger.warn("AppMap get appName is null, ID is :" + appId);
return "";
}
}
}

View File

@@ -1,180 +0,0 @@
package com.zdjizhi.utils.general;
/**
* CityHash64算法对logid进行散列计算
* 版本规划暂不实现-TSG22.01
*
* @author qidaijie
*/
@Deprecated
public class CityHash {
private static final long k0 = 0xc3a5c85c97cb3127L;
private static final long k1 = 0xb492b66fbe98f273L;
private static final long k2 = 0x9ae16a3b2f90404fL;
private static final long k3 = 0xc949d7c7509e6557L;
private static final long k5 = 0x9ddfea08eb382d69L;
private CityHash() {}
public static long CityHash64(byte[] s, int index, int len) {
if (len <= 16 ) {
return HashLen0to16(s, index, len);
} else if (len > 16 && len <= 32) {
return HashLen17to32(s, index, len);
} else if (len > 32 && len <= 64) {
return HashLen33to64(s, index, len);
} else {
long x = Fetch64(s, index);
long y = Fetch64(s, index + len - 16) ^ k1;
long z = Fetch64(s, index + len - 56) ^ k0;
long[] v = WeakHashLen32WithSeeds(s, len - 64, len, y);
long[] w = WeakHashLen32WithSeeds(s, len - 32, len * k1, k0);
z += ShiftMix(v[1]) * k1;
x = Rotate(z + x, 39) * k1;
y = Rotate(y, 33) * k1;
len = (len - 1) & ~63;
do {
x = Rotate(x + y + v[0] + Fetch64(s, index + 16), 37) * k1;
y = Rotate(y + v[1] + Fetch64(s, index + 48), 42) * k1;
x ^= w[1];
y ^= v[0];
z = Rotate(z ^ w[0], 33);
v = WeakHashLen32WithSeeds(s, index, v[1] * k1, x + w[0]);
w = WeakHashLen32WithSeeds(s, index + 32, z + w[1], y);
long t = z;
z = x;
x = t;
index += 64;
len -= 64;
} while (len != 0);
return HashLen16(HashLen16(v[0], w[0]) + ShiftMix(y) * k1 + z,
HashLen16(v[1], w[1]) + x);
}
}
private static long HashLen0to16(byte[] s, int index, int len) {
if (len > 8) {
long a = Fetch64(s, index);
long b = Fetch64(s, index + len - 8);
return HashLen16(a, RotateByAtLeastOne(b + len, len)) ^ b;
}
if (len >= 4) {
long a = Fetch32(s, index);
return HashLen16(len + (a << 3), Fetch32(s, index + len - 4));
}
if (len > 0) {
byte a = s[index];
byte b = s[index + len >>> 1];
byte c = s[index + len - 1];
int y = (a) + (b << 8);
int z = len + (c << 2);
return ShiftMix(y * k2 ^ z * k3) * k2;
}
return k2;
}
private static long HashLen17to32(byte[] s, int index, int len) {
long a = Fetch64(s, index) * k1;
long b = Fetch64(s, index + 8);
long c = Fetch64(s, index + len - 8) * k2;
long d = Fetch64(s, index + len - 16) * k0;
return HashLen16(Rotate(a - b, 43) + Rotate(c, 30) + d,
a + Rotate(b ^ k3, 20) - c + len);
}
private static long HashLen33to64(byte[] s, int index, int len) {
long z = Fetch64(s, index + 24);
long a = Fetch64(s, index) + (len + Fetch64(s, index + len - 16)) * k0;
long b = Rotate(a + z, 52);
long c = Rotate(a, 37);
a += Fetch64(s, index + 8);
c += Rotate(a, 7);
a += Fetch64(s, index + 16);
long vf = a + z;
long vs = b + Rotate(a, 31) + c;
a = Fetch64(s, index + 16) + Fetch64(s, index + len - 32);
z = Fetch64(s, index + len - 8);
b = Rotate(a + z, 52);
c = Rotate(a, 37);
a += Fetch64(s, index + len - 24);
c += Rotate(a, 7);
a += Fetch64(s, index + len - 16);
long wf = a + z;
long ws = b + Rotate(a, 31) + c;
long r = ShiftMix((vf + ws) * k2 + (wf + vs) * k0);
return ShiftMix(r * k0 + vs) * k2;
}
private static long Fetch64(byte[] p, int index) {
return toLongLE(p,index);
}
private static long Fetch32(byte[] p, int index) {
return toIntLE(p,index);
}
private static long[] WeakHashLen32WithSeeds(
long w, long x, long y, long z, long a, long b) {
a += w;
b = Rotate(b + a + z, 21);
long c = a;
a += x;
a += y;
b += Rotate(a, 44);
return new long[]{a + z, b + c};
}
private static long[] WeakHashLen32WithSeeds(byte[] s, int index, long a, long b) {
return WeakHashLen32WithSeeds(Fetch64(s, index),
Fetch64(s, index + 8),
Fetch64(s, index + 16),
Fetch64(s, index + 24),
a,
b);
}
private static long toLongLE(byte[] b, int i) {
return 0xffffffffffffffffL & (((long) b[i + 7] << 56) + ((long) (b[i + 6] & 255) << 48) + ((long) (b[i + 5] & 255) << 40) + ((long) (b[i + 4] & 255) << 32) + ((long) (b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0));
}
private static long toIntLE(byte[] b, int i) {
return 0xffffffffL & (((b[i + 3] & 255) << 24) + ((b[i + 2] & 255) << 16) + ((b[i + 1] & 255) << 8) + ((b[i + 0] & 255) << 0));
}
private static long RotateByAtLeastOne(long val, int shift) {
return (val >>> shift) | (val << (64 - shift));
}
private static long ShiftMix(long val) {
return val ^ (val >>> 47);
}
private static long Uint128Low64(long[] x) {
return x[0];
}
private static long Rotate(long val, int shift) {
return shift == 0 ? val : (val >>> shift) | (val << (64 - shift));
}
private static long Uint128High64(long[] x) {
return x[1];
}
private static long Hash128to64(long[] x) {
long a = (Uint128Low64(x) ^ Uint128High64(x)) * k5;
a ^= (a >>> 47);
long b = (Uint128High64(x) ^ a) * k5;
b ^= (b >>> 47);
b *= k5;
return b;
}
private static long HashLen16(long u, long v) {
return Hash128to64(new long[]{u,v});
}
}

View File

@@ -28,17 +28,19 @@ public class TransFormMap {
try { try {
JsonParseUtil.dropJsonField(jsonMap); JsonParseUtil.dropJsonField(jsonMap);
for (String[] strings : JsonParseUtil.getJobList()) { for (String[] strings : JsonParseUtil.getJobList()) {
//用到的参数的值 //该日志字段的值
Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]); Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
//需要补全的字段key //结果值映射到的日志字段key
String appendToKeyName = strings[1]; String appendToKey = strings[1];
//需要补全的字段的值
Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
//匹配操作函数的字段 //匹配操作函数的字段
String function = strings[2]; String function = strings[2];
//额外的参数的值 //额外的参数的值
String param = strings[3]; String param = strings[3];
functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param);
//结果值映射到的日志字段原始value
Object appendToValue = JsonParseUtil.getValue(jsonMap, appendToKey);
functionSet(function, jsonMap, appendToKey, appendToValue, logValue, param);
} }
return JsonMapper.toJsonString(jsonMap); return JsonMapper.toJsonString(jsonMap);
} catch (RuntimeException e) { } catch (RuntimeException e) {
@@ -51,76 +53,76 @@ public class TransFormMap {
/** /**
* 根据schema描述对应字段进行操作的 函数集合 * 根据schema描述对应字段进行操作的 函数集合
* *
* @param function 匹配操作函数的字段 * @param function 匹配操作函数的字段
* @param jsonMap 原始日志解析map * @param jsonMap 原始日志解析map
* @param appendToKeyName 需要补全的字段的key * @param appendToKey 需要补全的字段的key
* @param appendTo 需要补全的字段的值 * @param appendToValue 需要补全的字段的值
* @param logValue 用到的参数的值 * @param logValue 用到的参数的值
* @param param 额外的参数的值 * @param param 额外的参数的值
*/ */
private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendTo, Object logValue, String param) { private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKey, Object appendToValue, Object logValue, String param) {
switch (function) { switch (function) {
case "current_timestamp": case "current_timestamp":
if (!(appendTo instanceof Long)) { if (!(appendToValue instanceof Long)) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime()); JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getCurrentTime());
} }
break; break;
case "snowflake_id": case "snowflake_id":
JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); JsonParseUtil.setValue(jsonMap, appendToKey, SnowflakeId.generateId());
break; break;
case "geo_ip_detail": case "geo_ip_detail":
if (logValue != null && appendTo == null) { if (logValue != null && appendToValue == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString())); JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpDetail(logValue.toString()));
} }
break; break;
case "geo_asn": case "geo_asn":
if (logValue != null && appendTo == null) { if (logValue != null && appendToValue == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString())); JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoAsn(logValue.toString()));
} }
break; break;
case "geo_ip_country": case "geo_ip_country":
if (logValue != null && appendTo == null) { if (logValue != null && appendToValue == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString())); JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpCountry(logValue.toString()));
}
break;
case "set_value":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
}
break;
case "get_value":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
}
break;
case "if":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
}
break;
case "sub_domain":
if (appendTo == null && logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
}
break;
case "radius_match":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString()));
}
break;
case "decode_of_base64":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
} }
break; break;
case "flattenSpec": case "flattenSpec":
if (logValue != null && param != null) { if (logValue != null && param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param)); JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.flattenSpec(logValue.toString(), param));
} }
break; break;
case "app_match": case "if":
if (logValue != null && appendTo == null) { if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString())); JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.condition(jsonMap, param));
}
break;
case "decode_of_base64":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
}
break;
case "sub_domain":
if (appendToValue == null && logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getTopDomain(logValue.toString()));
}
break;
case "radius_match":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.radiusMatch(logValue.toString()));
}
break;
case "gtpc_match":
if (logValue != null) {
TransFunction.gtpcMatch(jsonMap, logValue.toString(), appendToKey, param);
}
break;
case "set_value":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKey, param);
}
break;
case "get_value":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKey, logValue);
} }
break; break;
default: default:

View File

@@ -28,21 +28,24 @@ public class TransFormTypeMap {
try { try {
Map<String, Object> jsonMap = JsonParseUtil.typeTransform(message); Map<String, Object> jsonMap = JsonParseUtil.typeTransform(message);
for (String[] strings : JsonParseUtil.getJobList()) { for (String[] strings : JsonParseUtil.getJobList()) {
//用到的参数的值 //该日志字段的值
Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]); Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
//需要补全的字段key //结果值映射到的日志字段key
String appendToKeyName = strings[1]; String appendToKey = strings[1];
//需要补全的字段的值
Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
//匹配操作函数的字段 //匹配操作函数的字段
String function = strings[2]; String function = strings[2];
//额外的参数的值 //额外的参数的值
String param = strings[3]; String param = strings[3];
functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
//结果值映射到的日志字段原始value
Object appendToValue = JsonParseUtil.getValue(jsonMap, appendToKey);
functionSet(function, jsonMap, appendToKey, appendToValue, logValue, param);
} }
return JsonMapper.toJsonString(jsonMap); return JsonMapper.toJsonString(jsonMap);
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.error("TransForm logs failed,The exception is :" + e); logger.error("TransForm logs failed,The exception is :" + e);
e.printStackTrace();
return null; return null;
} }
} }
@@ -51,78 +54,76 @@ public class TransFormTypeMap {
/** /**
* 根据schema描述对应字段进行操作的 函数集合 * 根据schema描述对应字段进行操作的 函数集合
* *
* @param function 匹配操作函数的字段 * @param function 匹配操作函数的字段
* @param jsonMap 原始日志解析map * @param jsonMap 原始日志解析map
* @param appendToKeyName 需要补全的字段的key * @param appendToKey 需要补全的字段的key
* @param appendToKeyValue 需要补全的字段的值 * @param appendToValue 需要补全的字段的值
* @param logValue 用到的参数的值 * @param logValue 用到的参数的值
* @param param 额外的参数的值 * @param param 额外的参数的值
*/ */
private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendToKeyValue, Object logValue, String param) { private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKey, Object appendToValue, Object logValue, String param) {
switch (function) { switch (function) {
case "current_timestamp": case "current_timestamp":
if (!(appendToKeyValue instanceof Long)) { if (!(appendToValue instanceof Long)) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime()); JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getCurrentTime());
} }
break; break;
case "snowflake_id": case "snowflake_id":
JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); JsonParseUtil.setValue(jsonMap, appendToKey, SnowflakeId.generateId());
//版本规划暂不实现TSG-22.01
// JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getDecimalHash(SnowflakeId.generateId()));
break; break;
case "geo_ip_detail": case "geo_ip_detail":
if (logValue != null && appendToKeyValue == null) { if (logValue != null && appendToValue == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString())); JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpDetail(logValue.toString()));
} }
break; break;
case "geo_asn": case "geo_asn":
if (logValue != null && appendToKeyValue == null) { if (logValue != null && appendToValue == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString())); JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoAsn(logValue.toString()));
} }
break; break;
case "geo_ip_country": case "geo_ip_country":
if (logValue != null && appendToKeyValue == null) { if (logValue != null && appendToValue == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString())); JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpCountry(logValue.toString()));
}
break;
case "set_value":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
}
break;
case "get_value":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
}
break;
case "if":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
}
break;
case "sub_domain":
if (appendToKeyValue == null && logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
}
break;
case "radius_match":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString()));
}
break;
case "decode_of_base64":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
} }
break; break;
case "flattenSpec": case "flattenSpec":
if (logValue != null && param != null) { if (logValue != null && param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param)); JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.flattenSpec(logValue.toString(), param));
} }
break; break;
case "app_match": case "if":
if (logValue != null && appendToKeyValue == null) { if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString())); JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.condition(jsonMap, param));
}
break;
case "decode_of_base64":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
}
break;
case "sub_domain":
if (appendToValue == null && logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getTopDomain(logValue.toString()));
}
break;
case "radius_match":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.radiusMatch(logValue.toString()));
}
break;
case "gtpc_match":
if (logValue != null) {
TransFunction.gtpcMatch(jsonMap, logValue.toString(), appendToKey,param);
}
break;
case "set_value":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKey, param);
}
break;
case "get_value":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKey, logValue);
} }
break; break;
default: default:

View File

@@ -1,7 +1,7 @@
package com.zdjizhi.utils.general; package com.zdjizhi.utils.general;
import cn.hutool.core.codec.Base64; import cn.hutool.core.codec.Base64;
import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONObject;
import cn.hutool.log.Log; import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.jayway.jsonpath.InvalidPathException; import com.jayway.jsonpath.InvalidPathException;
@@ -10,15 +10,12 @@ import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.FormatUtils; import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookupV2; import com.zdjizhi.utils.IpLookupV2;
import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.app.AppUtils;
import com.zdjizhi.utils.hbase.HBaseUtils; import com.zdjizhi.utils.hbase.HBaseUtils;
import com.zdjizhi.utils.json.JsonParseUtil; import com.zdjizhi.utils.json.JsonParseUtil;
import com.zdjizhi.utils.json.TypeUtils; import com.zdjizhi.utils.json.JsonPathUtil;
import java.math.BigInteger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Map; import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
/** /**
@@ -52,21 +49,6 @@ class TransFunction {
return System.currentTimeMillis() / 1000; return System.currentTimeMillis() / 1000;
} }
/**
* CityHash64算法
* 版本规划暂不实现-TSG22.01
*
* @param data 原始数据
* @return 散列结果
*/
@Deprecated
static BigInteger getDecimalHash(long data) {
byte[] dataBytes = String.valueOf(data).getBytes();
long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length);
String decimalValue = Long.toUnsignedString(hashValue, 10);
return new BigInteger(decimalValue);
}
/** /**
* 根据clientIp获取location信息 * 根据clientIp获取location信息
* *
@@ -74,15 +56,15 @@ class TransFunction {
* @return ip地址详细信息 * @return ip地址详细信息
*/ */
static String getGeoIpDetail(String ip) { static String getGeoIpDetail(String ip) {
String detail = "";
try { try {
return ipLookup.cityLookupDetail(ip); detail = ipLookup.cityLookupDetail(ip);
} catch (NullPointerException npe) { } catch (NullPointerException npe) {
logger.error("The MMDB file is not loaded or IP is null! " + npe); logger.error("The MMDB file is not loaded or IP is null! " + npe);
return "";
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.error("Get clientIP location error! " + e); logger.error("Get clientIP location error! " + e);
return "";
} }
return detail;
} }
/** /**
@@ -92,15 +74,15 @@ class TransFunction {
* @return ASN * @return ASN
*/ */
static String getGeoAsn(String ip) { static String getGeoAsn(String ip) {
String asn = "";
try { try {
return ipLookup.asnLookup(ip); asn = ipLookup.asnLookup(ip);
} catch (NullPointerException npe) { } catch (NullPointerException npe) {
logger.error("The MMDB file is not loaded or IP is null! " + npe); logger.error("The MMDB file is not loaded or IP is null! " + npe);
return "";
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.error("Get IP ASN error! " + e); logger.error("Get IP ASN error! " + e);
return "";
} }
return asn;
} }
/** /**
@@ -110,15 +92,15 @@ class TransFunction {
* @return 国家 * @return 国家
*/ */
static String getGeoIpCountry(String ip) { static String getGeoIpCountry(String ip) {
String country = "";
try { try {
return ipLookup.countryLookup(ip); country = ipLookup.countryLookup(ip);
} catch (NullPointerException npe) { } catch (NullPointerException npe) {
logger.error("The MMDB file is not loaded or IP is null! " + npe); logger.error("The MMDB file is not loaded or IP is null! " + npe);
return "";
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.error("Get ServerIP location error! " + e); logger.error("Get ServerIP location error! " + e);
return "";
} }
return country;
} }
@@ -133,19 +115,39 @@ class TransFunction {
} }
/** /**
* appId与缓存中对应关系补全appName * 借助HBase补齐GTP-C信息
* *
* @param appIds app id 列表 * @param jsonMap 原始日志json
* @return appName * @param logValue 上行TEID
* @param appendToKey 结果值映射到的日志字段key
* @param param 用于解析jsonarray直接定位到GTP信息所在的位置
*/ */
@Deprecated static void gtpcMatch(Map<String, Object> jsonMap, String logValue, String appendToKey, String param) {
static String appMatch(String appIds) {
try { try {
String appId = StrUtil.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0); Long teid = null;
return AppUtils.getAppName(Integer.parseInt(appId)); String[] exprs = param.split(FlowWriteConfig.FORMAT_SPLITTER);
} catch (NumberFormatException | ClassCastException exception) { for (String expr : exprs) {
logger.error("APP ID列表分割转换异常异常APP ID列表:" + appIds); Long value = JsonPathUtil.getLongValue(logValue, expr);
return ""; if (value != null) {
teid = value;
break;
}
}
if (teid != null) {
String[] appendToKeys = appendToKey.split(FlowWriteConfig.FORMAT_SPLITTER);
String userData = HBaseUtils.getGtpData(teid);
if (userData != null) {
JSONObject schemaJson = new JSONObject(userData, false, true);
for (String key : appendToKeys) {
JsonParseUtil.setValue(jsonMap, key, schemaJson.getObj(key));
}
} else {
logger.warn("Description The user whose TEID is " + teid + " was not matched!");
}
}
} catch (RuntimeException re) {
logger.error("An exception occurred in teid type conversion or parsing of user information!" + re);
} }
} }

View File

@@ -0,0 +1,132 @@
package com.zdjizhi.utils.hbase;
import cn.hutool.json.JSONObject;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.hbase
* @Description:
* @date 2022/7/1510:12
*/
class GtpCRelation {
private static final Log logger = LogFactory.get();
/**
* 获取全量的Radius数据
*/
static void getAllGtpCRelation(Connection connection, Map<Long, String> gtpcMap) {
long begin = System.currentTimeMillis();
ResultScanner scanner = null;
try {
Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_GTPC_TABLE_NAME));
Scan scan = new Scan();
if (FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS > 0) {
scan.setLimit(FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS);
}
scanner = table.getScanner(scan);
for (Result result : scanner) {
int acctStatusType = GtpCRelation.getMsgType(result);
if (acctStatusType == 1) {
Long teid = Bytes.toLong(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("teid")));
String phoneNumber = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number"))).trim();
String imsi = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))).trim();
String imei = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))).trim();
JSONObject jsonObject = new JSONObject();
jsonObject.set("common_phone_number", phoneNumber);
jsonObject.set("common_imsi", imsi);
jsonObject.set("common_imei", imei);
gtpcMap.put(teid, jsonObject.toJSONString(0));
}
}
logger.warn("The obtain the number of GTP-C relationships : " + gtpcMap.size());
logger.warn("The time spent to obtain GTP-C relationships : " + (System.currentTimeMillis() - begin) + "ms");
} catch (IOException | RuntimeException e) {
logger.error("The relationship between USER and TEID obtained from HBase is abnormal! message is :" + e);
} finally {
if (scanner != null) {
scanner.close();
}
}
}
/**
* 增量更新GTP-C关系
*
* @param connection HBase连接
* @param gtpcMap gtp-c关系缓存
* @param startTime 开始时间
* @param endTime 结束时间
*/
static void upgradeGtpCRelation(Connection connection, Map<Long, String> gtpcMap, Long startTime, Long endTime) {
Long begin = System.currentTimeMillis();
Table table = null;
ResultScanner scanner = null;
Scan scan = new Scan();
try {
table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_GTPC_TABLE_NAME));
scan.setTimeRange(startTime, endTime);
if (FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS > 0) {
scan.setLimit(FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS);
}
scanner = table.getScanner(scan);
for (Result result : scanner) {
int acctStatusType = GtpCRelation.getMsgType(result);
Long teid = Bytes.toLong(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("teid")));
if (acctStatusType == 1) {
String phoneNumber = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number"))).trim();
String imsi = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))).trim();
String imei = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))).trim();
JSONObject jsonObject = new JSONObject();
jsonObject.set("common_phone_number", phoneNumber);
jsonObject.set("common_imsi", imsi);
jsonObject.set("common_imei", imei);
gtpcMap.put(teid, jsonObject.toJSONString(0));
} else if (acctStatusType == 2) {
gtpcMap.remove(teid);
}
}
Long end = System.currentTimeMillis();
logger.warn("The current number of GTPC relationships is: " + gtpcMap.keySet().size());
logger.warn("The time used to update the GTPC relationship is: " + (end - begin) + "ms");
} catch (IOException | RuntimeException e) {
logger.error("GTPC relationship update exception, the content is:" + e);
} finally {
if (scanner != null) {
scanner.close();
}
if (table != null) {
try {
table.close();
} catch (IOException e) {
logger.error("HBase Table Close ERROR! Exception message is:" + e);
}
}
}
}
/**
* 获取当前用户上下线状态信息
*
* @param result HBase内获取的数据
* @return 状态 1-上线 2-下线
*/
private static int getMsgType(Result result) {
boolean hasType = result.containsColumn(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type"));
if (hasType) {
return Bytes.toInt(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type")));
} else {
return 1;
}
}
}

View File

@@ -5,11 +5,10 @@ import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.common.FlowWriteConfig;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@@ -24,7 +23,8 @@ import java.util.concurrent.TimeUnit;
public class HBaseUtils { public class HBaseUtils {
private static final Log logger = LogFactory.get(); private static final Log logger = LogFactory.get();
private static Map<String, String> subIdMap = new ConcurrentHashMap<>(16); private static Map<String, String> radiusMap = new ConcurrentHashMap<>(16);
private static Map<Long, String> gtpcMap = new ConcurrentHashMap<>(16);
private static Connection connection; private static Connection connection;
private static Long time; private static Long time;
@@ -42,7 +42,8 @@ public class HBaseUtils {
//获取连接 //获取连接
getConnection(); getConnection();
//拉取所有 //拉取所有
getAll(); RadiusRelation.getAllRadiusRelation(connection, radiusMap);
GtpCRelation.getAllGtpCRelation(connection, gtpcMap);
//定时更新 //定时更新
updateCache(); updateCache();
@@ -50,13 +51,13 @@ public class HBaseUtils {
private static void getConnection() { private static void getConnection() {
try { try {
// 管理Hbase的配置信息
Configuration configuration = HBaseConfiguration.create(); Configuration configuration = HBaseConfiguration.create();
// 设置zookeeper节点
configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS); configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS);
configuration.set("hbase.client.retries.number", "3"); configuration.set("hbase.client.retries.number", "1");
configuration.set("hbase.bulkload.retries.number", "3"); configuration.set("hbase.client.pause", "50");
configuration.set("zookeeper.recovery.retry", "3"); configuration.set("hbase.rpc.timeout", "3000");
configuration.set("zookeeper.recovery.retry", "1");
configuration.set("zookeeper.recovery.retry.intervalmill", "200");
connection = ConnectionFactory.createConnection(configuration); connection = ConnectionFactory.createConnection(configuration);
time = System.currentTimeMillis(); time = System.currentTimeMillis();
logger.warn("HBaseUtils get HBase connection,now to getAll()."); logger.warn("HBaseUtils get HBase connection,now to getAll().");
@@ -75,97 +76,15 @@ public class HBaseUtils {
getInstance(); getInstance();
} }
long nowTime = System.currentTimeMillis(); long nowTime = System.currentTimeMillis();
timestampsFilter(time - 1000, nowTime + 500); RadiusRelation.upgradeRadiusRelation(connection, radiusMap, time - 1000, nowTime + 500);
} GtpCRelation.upgradeGtpCRelation(connection, gtpcMap, time - 1000, nowTime + 500);
time = nowTime;
/**
* 获取变更内容
*
* @param startTime 开始时间
* @param endTime 结束时间
*/
private static void timestampsFilter(Long startTime, Long endTime) {
Long begin = System.currentTimeMillis();
Table table = null;
ResultScanner scanner = null;
Scan scan2 = new Scan();
try {
table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME));
scan2.setTimeRange(startTime, endTime);
scanner = table.getScanner(scan2);
for (Result result : scanner) {
int acctStatusType = getAcctStatusType(result);
String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))).trim();
String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))).trim();
if (acctStatusType == 1) {
if (subIdMap.containsKey(framedIp)) {
boolean same = account.equals(subIdMap.get(framedIp));
if (!same) {
subIdMap.put(framedIp, account);
}
} else {
subIdMap.put(framedIp, account);
}
} else if (acctStatusType == 2) {
subIdMap.remove(framedIp);
}
}
Long end = System.currentTimeMillis();
logger.warn("HBaseUtils Now subIdMap.keySet().size() is: " + subIdMap.keySet().size());
logger.warn("HBaseUtils Update cache timeConsuming is: " + (end - begin) + ",BeginTime: " + startTime + ",EndTime: " + endTime);
time = endTime;
} catch (IOException ioe) {
logger.error("HBaseUtils timestampsFilter is IOException===>{" + ioe + "}<===");
} catch (RuntimeException e) {
logger.error("HBaseUtils timestampsFilter is Exception===>{" + e + "}<===");
} finally {
if (scanner != null) {
scanner.close();
}
if (table != null) {
try {
table.close();
} catch (IOException e) {
logger.error("HBase Table Close ERROR! Exception message is:" + e);
}
}
}
}
/**
* 获取所有的 key value
*/
private static void getAll() {
long begin = System.currentTimeMillis();
try {
Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME));
Scan scan2 = new Scan();
ResultScanner scanner = table.getScanner(scan2);
for (Result result : scanner) {
int acctStatusType = getAcctStatusType(result);
String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")));
String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account")));
if (acctStatusType == 1) {
subIdMap.put(framedIp, account);
}
}
logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size());
logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin));
scanner.close();
} catch (IOException ioe) {
logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
} catch (RuntimeException e) {
logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
}
} }
/** /**
* 验证定时器,每隔一段时间验证一次-验证获取新的Cookie * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
*/ */
private void updateCache() { private void updateCache() {
// ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
// new BasicThreadFactory.Builder().namingPattern("hbase-change-pool-%d").daemon(true).build());
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
executorService.scheduleAtFixedRate(new Runnable() { executorService.scheduleAtFixedRate(new Runnable() {
@Override @Override
@@ -193,24 +112,25 @@ public class HBaseUtils {
if (hBaseUtils == null) { if (hBaseUtils == null) {
getInstance(); getInstance();
} }
return subIdMap.get(clientIp); return radiusMap.get(clientIp);
} }
return ""; return "";
} }
/**
* 获取当前用户上下线状态信息
*
* @param result HBase内获取的数据
* @return 状态 1-上线 2-下线
*/
private static int getAcctStatusType(Result result) {
boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"));
if (hasType) {
return Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")));
} else {
return 1;
}
}
/**
* 获取 account
*
* @param teid 上行TEID
* @return account
*/
public static String getGtpData(Long teid) {
if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
if (hBaseUtils == null) {
getInstance();
}
return gtpcMap.get(teid);
}
return null;
}
} }

View File

@@ -0,0 +1,124 @@
package com.zdjizhi.utils.hbase;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.hbase
* @Description:
* @date 2022/7/1510:12
*/
class RadiusRelation {
private static final Log logger = LogFactory.get();
/**
* 获取全量的Radius数据
*/
static void getAllRadiusRelation(Connection connection, Map<String, String> radiusMap) {
long begin = System.currentTimeMillis();
ResultScanner scanner = null;
try {
Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_RADIUS_TABLE_NAME));
Scan scan = new Scan();
if (FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS > 0) {
scan.setLimit(FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS);
}
scanner = table.getScanner(scan);
for (Result result : scanner) {
int acctStatusType = RadiusRelation.getAcctStatusType(result);
String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")));
String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account")));
if (acctStatusType == 1) {
radiusMap.put(framedIp, account);
}
}
logger.warn("The obtain the number of RADIUS relationships : " + radiusMap.size());
logger.warn("The time spent to obtain radius relationships : " + (System.currentTimeMillis() - begin) + "ms");
} catch (IOException | RuntimeException e) {
logger.error("The relationship between framedIP and account obtained from HBase is abnormal! message is :" + e);
} finally {
if (scanner != null) {
scanner.close();
}
}
}
/**
* 增量更新Radius关系
*
* @param connection HBase连接
* @param radiusMap radius关系缓存
* @param startTime 开始时间
* @param endTime 结束时间
*/
static void upgradeRadiusRelation(Connection connection, Map<String, String> radiusMap, Long startTime, Long endTime) {
Long begin = System.currentTimeMillis();
Table table = null;
ResultScanner scanner = null;
Scan scan = new Scan();
try {
table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_RADIUS_TABLE_NAME));
scan.setTimeRange(startTime, endTime);
if (FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS > 0) {
scan.setLimit(FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS);
}
scanner = table.getScanner(scan);
for (Result result : scanner) {
int acctStatusType = RadiusRelation.getAcctStatusType(result);
String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))).trim();
String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))).trim();
if (acctStatusType == 1) {
if (radiusMap.containsKey(framedIp)) {
boolean same = account.equals(radiusMap.get(framedIp));
if (!same) {
radiusMap.put(framedIp, account);
}
} else {
radiusMap.put(framedIp, account);
}
} else if (acctStatusType == 2) {
radiusMap.remove(framedIp);
}
}
Long end = System.currentTimeMillis();
logger.warn("The current number of Radius relationships is:: " + radiusMap.keySet().size());
logger.warn("The time used to update the Radius relationship is: " + (end - begin) + "ms");
} catch (IOException | RuntimeException e) {
logger.error("Radius relationship update exception, the content is:" + e);
} finally {
if (scanner != null) {
scanner.close();
}
if (table != null) {
try {
table.close();
} catch (IOException e) {
logger.error("HBase Table Close ERROR! Exception message is:" + e);
}
}
}
}
/**
* 获取当前用户上下线状态信息
*
* @param result HBase内获取的数据
* @return 状态 1-上线 2-下线
*/
private static int getAcctStatusType(Result result) {
boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"));
if (hasType) {
return Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")));
} else {
return 1;
}
}
}

View File

@@ -1,10 +1,9 @@
package com.zdjizhi.utils.json; package com.zdjizhi.utils.json;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.log.Log; import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.ConfigService;
@@ -210,8 +209,8 @@ public class JsonParseUtil {
HashMap<String, Class> map = new HashMap<>(16); HashMap<String, Class> map = new HashMap<>(16);
//获取fields并转化为数组数组的每个元素都是一个name doc type //获取fields并转化为数组数组的每个元素都是一个name doc type
JSONObject schemaJson = JSON.parseObject(schema); JSONObject schemaJson = new JSONObject(schema, false, true);
JSONArray fields = (JSONArray) schemaJson.get("fields"); JSONArray fields = schemaJson.getJSONArray("fields");
for (Object field : fields) { for (Object field : fields) {
String filedStr = field.toString(); String filedStr = field.toString();
@@ -238,8 +237,9 @@ public class JsonParseUtil {
*/ */
private static boolean checkKeepField(String message) { private static boolean checkKeepField(String message) {
boolean isKeepField = true; boolean isKeepField = true;
boolean isHiveDoc = JSON.parseObject(message).containsKey("doc"); JSONObject fieldJson = new JSONObject(message, false, true);
if (isHiveDoc) { boolean hasDoc = fieldJson.containsKey("doc");
if (hasDoc) {
boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility"); boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility");
if (isHiveVi) { if (isHiveVi) {
String visibility = JsonPath.read(message, "$.doc.visibility").toString(); String visibility = JsonPath.read(message, "$.doc.visibility").toString();
@@ -271,58 +271,40 @@ public class JsonParseUtil {
private static ArrayList<String[]> getJobListFromHttp(String schema) { private static ArrayList<String[]> getJobListFromHttp(String schema) {
ArrayList<String[]> list = new ArrayList<>(); ArrayList<String[]> list = new ArrayList<>();
//获取fields并转化为数组数组的每个元素都是一个name doc type JSONObject schemaJson = new JSONObject(schema, false, true);
JSONObject schemaJson = JSON.parseObject(schema); JSONArray fields = schemaJson.getJSONArray("fields");
JSONArray fields = (JSONArray) schemaJson.get("fields");
for (Object field : fields) { for (Object field : fields) {
JSONObject fieldJson = new JSONObject(field, false, true);
boolean hasDoc = fieldJson.containsKey("doc");
if (hasDoc) {
JSONObject docJson = fieldJson.getJSONObject("doc");
boolean hasFormat = docJson.containsKey("format");
if (hasFormat) {
String name = fieldJson.getStr("name");
JSONArray formatList = docJson.getJSONArray("format");
for (Object format : formatList) {
JSONObject formatJson = new JSONObject(format, false, true);
String function = formatJson.getStr("function");
String appendTo;
String params = null;
if (JSON.parseObject(field.toString()).containsKey("doc")) { if (formatJson.containsKey("appendTo")) {
Object doc = JSON.parseObject(field.toString()).get("doc"); appendTo = formatJson.getStr("appendTo");
} else {
if (JSON.parseObject(doc.toString()).containsKey("format")) { appendTo = name;
String name = JSON.parseObject(field.toString()).get("name").toString();
Object format = JSON.parseObject(doc.toString()).get("format");
JSONObject formatObject = JSON.parseObject(format.toString());
String functions = formatObject.get("functions").toString();
String appendTo = null;
String params = null;
if (formatObject.containsKey("appendTo")) {
appendTo = formatObject.get("appendTo").toString();
}
if (formatObject.containsKey("param")) {
params = formatObject.get("param").toString();
}
if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) {
String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
for (int i = 0; i < functionArray.length; i++) {
list.add(new String[]{name, appendToArray[i], functionArray[i], null});
} }
} else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) { if (formatJson.containsKey("param")) {
String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); params = formatJson.getStr("param");
String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER);
for (int i = 0; i < functionArray.length; i++) {
list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]});
} }
} else {
list.add(new String[]{name, name, functions, params});
}
list.add(new String[]{name, appendTo, function, params});
}
} }
} }
} }
return list; return list;
} }

View File

@@ -0,0 +1,91 @@
package com.zdjizhi.utils.json;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.utils.StringUtil;
import java.util.ArrayList;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.json
* @Description:
* @date 2022/7/1817:19
*/
public class JsonPathUtil {
private static final Log logger = LogFactory.get();
/**
* 通过 josnPath 解析返回String类型数据
*
* @param message json数据
* @param expr 解析表达式
* @return 返回值
*/
public static String getStringValue(String message, String expr) {
String result = null;
try {
if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
ArrayList<String> read = JsonPath.parse(message).read(expr);
if (read.size() >= 1) {
result = read.get(0);
}
}
} catch (RuntimeException e) {
logger.error("JSONPath parsing json returns String data exception" + e);
}
return result;
}
/**
* 通过 josnPath 解析返回Long类型数据
*
* @param message json数据
* @param expr 解析表达式
* @return 返回值
*/
public static Integer getIntegerValue(String message, String expr) {
Integer result = null;
try {
if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
ArrayList<Integer> read = JsonPath.parse(message).read(expr);
if (read.size() >= 1) {
result = read.get(0);
}
}
} catch (RuntimeException e) {
logger.error("JSONPath parsing json returns Long data exception" + e);
}
return result;
}
/**
* 通过 josnPath 解析返回Long类型数据
*
* @param message json数据
* @param expr 解析表达式
* @return 返回值
*/
public static Long getLongValue(String message, String expr) {
Long result = null;
try {
if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
System.out.println(message);
ArrayList<Long> read = JsonPath.parse(message).read(expr);
if (read.size() >= 1) {
result = read.get(0);
}
}
} catch (RuntimeException e) {
logger.error("JSONPath parsing json returns Long data exception" + e);
}
return result;
}
}

View File

@@ -19,15 +19,15 @@ import java.util.Properties;
public final class FlowWriteConfigurations { public final class FlowWriteConfigurations {
private static Properties propKafka = new Properties(); private static Properties propDefault = new Properties();
private static Properties propService = new Properties(); private static Properties propService = new Properties();
public static String getStringProperty(Integer type, String key) { public static String getStringProperty(Integer type, String key) {
if (type == 0) { if (type == 0) {
return propService.getProperty(key); return propService.getProperty(key).trim();
} else if (type == 1) { } else if (type == 1) {
return propKafka.getProperty(key); return propDefault.getProperty(key).trim();
} else { } else {
return null; return null;
} }
@@ -35,9 +35,9 @@ public final class FlowWriteConfigurations {
public static Integer getIntProperty(Integer type, String key) { public static Integer getIntProperty(Integer type, String key) {
if (type == 0) { if (type == 0) {
return Integer.parseInt(propService.getProperty(key)); return Integer.parseInt(propService.getProperty(key).trim());
} else if (type == 1) { } else if (type == 1) {
return Integer.parseInt(propKafka.getProperty(key)); return Integer.parseInt(propDefault.getProperty(key).trim());
} else { } else {
return null; return null;
} }
@@ -45,9 +45,9 @@ public final class FlowWriteConfigurations {
public static Long getLongProperty(Integer type, String key) { public static Long getLongProperty(Integer type, String key) {
if (type == 0) { if (type == 0) {
return Long.parseLong(propService.getProperty(key)); return Long.parseLong(propService.getProperty(key).trim());
} else if (type == 1) { } else if (type == 1) {
return Long.parseLong(propKafka.getProperty(key)); return Long.parseLong(propDefault.getProperty(key).trim());
} else { } else {
return null; return null;
} }
@@ -57,7 +57,7 @@ public final class FlowWriteConfigurations {
if (type == 0) { if (type == 0) {
return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
} else if (type == 1) { } else if (type == 1) {
return StringUtil.equals(propKafka.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
} else { } else {
return null; return null;
} }
@@ -66,9 +66,9 @@ public final class FlowWriteConfigurations {
static { static {
try { try {
propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties")); propDefault.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
} catch (IOException | RuntimeException e) { } catch (IOException | RuntimeException e) {
propKafka = null; propDefault = null;
propService = null; propService = null;
} }
} }