1:增加GTPC用户信息补全函数功能。(TSG-11152)

2:优化HBase更新策略,增加scan最大数量限制。
3:增加缓存存储GTPC上下行TEID,用于单向流补全处理。
4:增加时间判断,选用最新的TEID信息。
This commit is contained in:
qidaijie
2022-08-26 11:46:10 +08:00
parent c0707a79c3
commit 933b58ec18
8 changed files with 192 additions and 78 deletions

15
pom.xml
View File

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId> <groupId>com.zdjizhi</groupId>
<artifactId>log-completion-schema</artifactId> <artifactId>log-completion-schema</artifactId>
<version>220727-GTPC</version> <version>220819-DOUBLE-TEID</version>
<name>log-completion-schema</name> <name>log-completion-schema</name>
<url>http://www.example.com</url> <url>http://www.example.com</url>
@@ -297,19 +297,6 @@
<version>1.9.3</version> <version>1.9.3</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@@ -1,4 +1,5 @@
#--------------------------------地址配置------------------------------# #--------------------------------地址配置------------------------------#
#管理kafka地址 #管理kafka地址
source.kafka.servers=192.168.44.12:9094 source.kafka.servers=192.168.44.12:9094
@@ -13,29 +14,34 @@ hbase.zookeeper.servers=192.168.44.12:2181
#--------------------------------HTTP/定位库------------------------------# #--------------------------------HTTP/定位库------------------------------#
#定位库地址 #定位库地址
tools.library=/opt/dat/ tools.library=D:\\workerspace\\dat\\
#--------------------------------nacos配置------------------------------# #--------------------------------nacos配置------------------------------#
#nacos 地址 #nacos 地址
nacos.server=192.168.44.12:8848 nacos.server=192.168.44.67:8848
#nacos namespace #nacos namespace
nacos.schema.namespace=prod nacos.schema.namespace=f507879a-8b1b-4330-913e-83d4fcdc14bb
#nacos data id #nacos data id
nacos.data.id=gtpc_record.json nacos.data.id=session_record.json
#--------------------------------Kafka消费组信息------------------------------# #--------------------------------Kafka消费组信息------------------------------#
#kafka 接收数据topic #kafka 接收数据topic
source.kafka.topic=test source.kafka.topic=SESSION-RECORD
#补全数据 输出 topic #补全数据 输出 topic
sink.kafka.topic=test-result sink.kafka.topic=test-result
#读取topic,存储该spout id的消费offset信息可通过该拓扑命名;具体存储offset的位置确定下次读取不重复的数据 #读取topic,存储该spout id的消费offset信息可通过该拓扑命名;具体存储offset的位置确定下次读取不重复的数据
group.id=gtpc-record-log-2022-1 group.id=type-test-20220810-1
#生产者压缩模式 none or snappy
producer.kafka.compression.type=none
#--------------------------------topology配置------------------------------# #--------------------------------topology配置------------------------------#
#consumer 并行度 #consumer 并行度
source.parallelism=1 source.parallelism=1
@@ -46,14 +52,12 @@ transform.parallelism=1
sink.parallelism=1 sink.parallelism=1
#数据中心,取值范围(0-63) #数据中心,取值范围(0-63)
data.center.id.num=2 data.center.id.num=16
#hbase 更新时间如填写0则不更新缓存 #hbase 更新时间如填写0则不更新缓存
hbase.tick.tuple.freq.secs=180 hbase.tick.tuple.freq.secs=180
#--------------------------------默认值配置------------------------------# #--------------------------------默认值配置------------------------------#
#0不需要补全原样输出日志1需要补全 #0不需要补全原样输出日志1需要补全
log.need.complete=1 log.need.complete=1
#生产者压缩模式 none or snappy
producer.kafka.compression.type=none

View File

@@ -37,6 +37,10 @@ public class FlowWriteConfig {
*/ */
public static final String ENCODING = "UTF8"; public static final String ENCODING = "UTF8";
public static final String GTPC_FAMILY_NAME = "gtp";
public static final String RADIUS_FAMILY_NAME = "radius";
/** /**
* Nacos * Nacos
*/ */

View File

@@ -15,6 +15,7 @@ import com.zdjizhi.utils.json.JsonParseUtil;
import com.zdjizhi.utils.json.JsonPathUtil; import com.zdjizhi.utils.json.JsonPathUtil;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@@ -115,7 +116,9 @@ class TransFunction {
} }
/** /**
* 借助HBase补齐GTP-C信息 * 借助HBase补齐GTP-C信息,解析tunnels信息优先使用gtp_uplink_teid其次使用gtp_downlink_teid
* <p>
* "common_tunnels":[{"tunnels_schema_type":"GTP","gtp_uplink_teid":235261261,"gtp_downlink_teid":665547833,"gtp_sgw_ip":"192.56.5.2","gtp_pgw_ip":"192.56.10.20","gtp_sgw_port":2152,"gtp_pgw_port":2152}]
* *
* @param jsonMap 原始日志json * @param jsonMap 原始日志json
* @param logValue 上行TEID * @param logValue 上行TEID
@@ -136,11 +139,10 @@ class TransFunction {
if (teid != null) { if (teid != null) {
String[] appendToKeys = appendToKey.split(FlowWriteConfig.FORMAT_SPLITTER); String[] appendToKeys = appendToKey.split(FlowWriteConfig.FORMAT_SPLITTER);
String userData = HBaseUtils.getGtpData(teid); HashMap<String, Object> userData = HBaseUtils.getGtpData(teid);
if (userData != null) { if (userData != null) {
JSONObject schemaJson = new JSONObject(userData, false, true);
for (String key : appendToKeys) { for (String key : appendToKeys) {
JsonParseUtil.setValue(jsonMap, key, schemaJson.getObj(key)); JsonParseUtil.setValue(jsonMap, key, userData.get(key).toString());
} }
} else { } else {
logger.warn("Description The user whose TEID is " + teid + " was not matched!"); logger.warn("Description The user whose TEID is " + teid + " was not matched!");
@@ -206,7 +208,7 @@ class TransFunction {
} }
} }
} catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) { } catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) {
logger.error("The device label resolution exception or [expr] analytic expression error" + e); logger.error("The label resolution exception or [expr] analytic expression error" + e);
} }
return flattenResult; return flattenResult;
} }

View File

@@ -1,6 +1,5 @@
package com.zdjizhi.utils.hbase; package com.zdjizhi.utils.hbase;
import cn.hutool.json.JSONObject;
import cn.hutool.log.Log; import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.common.FlowWriteConfig;
@@ -22,9 +21,9 @@ class GtpCRelation {
private static final Log logger = LogFactory.get(); private static final Log logger = LogFactory.get();
/** /**
* 获取全量的Radius数据 * 获取全量的GTPC数据
*/ */
static void getAllGtpCRelation(Connection connection, Map<Long, String> gtpcMap) { static void getAllGtpCRelation(Connection connection, Map<Long, HashMap<String, Object>> gtpcMap) {
long begin = System.currentTimeMillis(); long begin = System.currentTimeMillis();
ResultScanner scanner = null; ResultScanner scanner = null;
try { try {
@@ -37,15 +36,17 @@ class GtpCRelation {
for (Result result : scanner) { for (Result result : scanner) {
int acctStatusType = GtpCRelation.getMsgType(result); int acctStatusType = GtpCRelation.getMsgType(result);
if (acctStatusType == 1) { if (acctStatusType == 1) {
Long teid = Bytes.toLong(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("teid"))); Long upLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "uplink_teid");
String phoneNumber = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number"))).trim(); Long downLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "downlink_teid");
String imsi = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))).trim(); String phoneNumber = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim();
String imei = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))).trim(); String imsi = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim();
JSONObject jsonObject = new JSONObject(); String imei = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imei").trim();
jsonObject.set("common_phone_number", phoneNumber); Long lastUpdateTime = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "last_update_time");
jsonObject.set("common_imsi", imsi);
jsonObject.set("common_imei", imei); HashMap<String, Object> buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime);
gtpcMap.put(teid, jsonObject.toJSONString(0));
updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime);
updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime);
} }
} }
logger.warn("The obtain the number of GTP-C relationships : " + gtpcMap.size()); logger.warn("The obtain the number of GTP-C relationships : " + gtpcMap.size());
@@ -68,7 +69,7 @@ class GtpCRelation {
* @param startTime 开始时间 * @param startTime 开始时间
* @param endTime 结束时间 * @param endTime 结束时间
*/ */
static void upgradeGtpCRelation(Connection connection, Map<Long, String> gtpcMap, Long startTime, Long endTime) { static void upgradeGtpCRelation(Connection connection, Map<Long, HashMap<String, Object>> gtpcMap, Long startTime, Long endTime) {
Long begin = System.currentTimeMillis(); Long begin = System.currentTimeMillis();
Table table = null; Table table = null;
ResultScanner scanner = null; ResultScanner scanner = null;
@@ -82,18 +83,21 @@ class GtpCRelation {
scanner = table.getScanner(scan); scanner = table.getScanner(scan);
for (Result result : scanner) { for (Result result : scanner) {
int acctStatusType = GtpCRelation.getMsgType(result); int acctStatusType = GtpCRelation.getMsgType(result);
Long teid = Bytes.toLong(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("teid"))); Long upLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "uplink_teid");
Long downLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "downlink_teid");
if (acctStatusType == 1) { if (acctStatusType == 1) {
String phoneNumber = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number"))).trim(); String phoneNumber = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim();
String imsi = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))).trim(); String imsi = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim();
String imei = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))).trim(); String imei = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imei").trim();
JSONObject jsonObject = new JSONObject(); Long lastUpdateTime = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "last_update_time");
jsonObject.set("common_phone_number", phoneNumber);
jsonObject.set("common_imsi", imsi); HashMap<String, Object> buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime);
jsonObject.set("common_imei", imei);
gtpcMap.put(teid, jsonObject.toJSONString(0)); updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime);
updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime);
} else if (acctStatusType == 2) { } else if (acctStatusType == 2) {
gtpcMap.remove(teid); removeCache(gtpcMap, upLinkTeid);
removeCache(gtpcMap, downLinkTeid);
} }
} }
Long end = System.currentTimeMillis(); Long end = System.currentTimeMillis();
@@ -119,14 +123,64 @@ class GtpCRelation {
* 获取当前用户上下线状态信息 * 获取当前用户上下线状态信息
* *
* @param result HBase内获取的数据 * @param result HBase内获取的数据
* @return 状态 1-上线 2-下线 * @return onff_type 状态 1-上线 2-下线
*/ */
private static int getMsgType(Result result) { private static int getMsgType(Result result) {
boolean hasType = result.containsColumn(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type")); boolean hasType = result.containsColumn(Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME), Bytes.toBytes("msg_type"));
if (hasType) { if (hasType) {
return Bytes.toInt(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type"))); return Bytes.toInt(result.getValue(Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME), Bytes.toBytes("msg_type")));
} else { } else {
return 1; return 0;
}
}
/**
* 构建用户信息
*
* @param phoneNumber 手机号
* @param imsi 用户标识
* @param imei 设备标识
* @return 用户信息
*/
private static HashMap<String, Object> buildUserData(String phoneNumber, String imsi, String imei, Long lastUpdateTime) {
HashMap<String, Object> tmpMap = new HashMap<>(4);
tmpMap.put("common_phone_number", phoneNumber);
tmpMap.put("common_imsi", imsi);
tmpMap.put("common_imei", imei);
tmpMap.put("last_update_time", lastUpdateTime);
return tmpMap;
}
/**
* 判断缓存与新获取的数据时间戳大小,若大于缓存内记录的时间戳;则更新缓存
*
* @param gtpcMap 缓存集合
* @param teid 上下行teid
* @param userData 获取HBase内的用户信息
* @param lastUpdateTime 该用户信息最后更新时间
*/
private static void updateCache(Map<Long, HashMap<String, Object>> gtpcMap, Long teid, HashMap<String, Object> userData, Long lastUpdateTime) {
if (teid != 0L) {
if (gtpcMap.containsKey(teid)) {
Long oldUpdateTime = Long.parseLong(gtpcMap.get(teid).get("last_update_time").toString());
if (lastUpdateTime > oldUpdateTime) {
gtpcMap.put(teid, userData);
}
} else {
gtpcMap.put(teid, userData);
}
}
}
/**
* 将过期用户从缓存中删除
*
* @param gtpcMap 缓存集合
* @param teid 上下行teid
*/
private static void removeCache(Map<Long, HashMap<String, Object>> gtpcMap, Long teid) {
if (teid != 0L) {
gtpcMap.remove(teid);
} }
} }
} }

View File

@@ -3,9 +3,11 @@ package com.zdjizhi.utils.hbase;
import cn.hutool.log.Log; import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
@@ -24,7 +26,7 @@ import java.util.concurrent.TimeUnit;
public class HBaseUtils { public class HBaseUtils {
private static final Log logger = LogFactory.get(); private static final Log logger = LogFactory.get();
private static Map<String, String> radiusMap = new ConcurrentHashMap<>(16); private static Map<String, String> radiusMap = new ConcurrentHashMap<>(16);
private static Map<Long, String> gtpcMap = new ConcurrentHashMap<>(16); private static Map<Long, HashMap<String, Object>> gtpcMap = new ConcurrentHashMap<>(16);
private static Connection connection; private static Connection connection;
private static Long time; private static Long time;
@@ -45,7 +47,8 @@ public class HBaseUtils {
RadiusRelation.getAllRadiusRelation(connection, radiusMap); RadiusRelation.getAllRadiusRelation(connection, radiusMap);
GtpCRelation.getAllGtpCRelation(connection, gtpcMap); GtpCRelation.getAllGtpCRelation(connection, gtpcMap);
//定时更新 //定时更新
updateCache(); updateRadiusCache();
updateGtpcCache();
} }
@@ -68,6 +71,48 @@ public class HBaseUtils {
} }
} }
/**
* 获取HBase内String类型的值
*
* @param result 结果集
* @param familyName 列族名称
* @param columnName 列名称
* @return 结果数据
*/
static String getString(Result result, String familyName, String columnName) {
byte[] familyBytes = Bytes.toBytes(familyName);
byte[] columnBytes = Bytes.toBytes(columnName);
boolean contains = result.containsColumn(familyBytes, columnBytes);
if (contains) {
String data = Bytes.toString(result.getValue(familyBytes, columnBytes)).trim();
if (StringUtil.isNotBlank(data)) {
return data;
}
}
return "";
}
/**
* 获取HBase内Long类型的值
*
* @param result 结果集
* @param familyName 列族名称
* @param columnName 列名称
* @return 结果数据
*/
static Long getLong(Result result, String familyName, String columnName) {
byte[] familyBytes = Bytes.toBytes(familyName);
byte[] columnBytes = Bytes.toBytes(columnName);
boolean contains = result.containsColumn(familyBytes, columnBytes);
if (contains) {
String data = Bytes.toString(result.getValue(familyBytes, columnBytes)).trim();
if (StringUtil.isNotBlank(data)) {
return Bytes.toLong(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes(columnName)));
}
}
return 0L;
}
/** /**
* 更新变量 * 更新变量
*/ */
@@ -84,7 +129,7 @@ public class HBaseUtils {
/** /**
* 验证定时器,每隔一段时间验证一次-验证获取新的Cookie * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
*/ */
private void updateCache() { private void updateRadiusCache() {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
executorService.scheduleAtFixedRate(new Runnable() { executorService.scheduleAtFixedRate(new Runnable() {
@Override @Override
@@ -102,7 +147,26 @@ public class HBaseUtils {
/** /**
* 获取 account * 更新定时器,每隔一段时间执行一次-刷新GTPC缓存
*/
private void updateGtpcCache() {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
change();
}
} catch (RuntimeException e) {
logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<===");
}
}
}, 1, FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
}
/**
* 获取Radius account
* *
* @param clientIp client_ip * @param clientIp client_ip
* @return account * @return account
@@ -119,12 +183,12 @@ public class HBaseUtils {
/** /**
* 获取 account * 获取GTPC用户信息
* *
* @param teid 上行TEID * @param teid TEID
* @return account * @return account
*/ */
public static String getGtpData(Long teid) { public static HashMap<String, Object> getGtpData(Long teid) {
if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) { if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
if (hBaseUtils == null) { if (hBaseUtils == null) {
getInstance(); getInstance();

View File

@@ -34,8 +34,8 @@ class RadiusRelation {
scanner = table.getScanner(scan); scanner = table.getScanner(scan);
for (Result result : scanner) { for (Result result : scanner) {
int acctStatusType = RadiusRelation.getAcctStatusType(result); int acctStatusType = RadiusRelation.getAcctStatusType(result);
String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))); String framedIp = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "framed_ip").trim();
String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))); String account = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "account").trim();
if (acctStatusType == 1) { if (acctStatusType == 1) {
radiusMap.put(framedIp, account); radiusMap.put(framedIp, account);
} }
@@ -73,8 +73,8 @@ class RadiusRelation {
scanner = table.getScanner(scan); scanner = table.getScanner(scan);
for (Result result : scanner) { for (Result result : scanner) {
int acctStatusType = RadiusRelation.getAcctStatusType(result); int acctStatusType = RadiusRelation.getAcctStatusType(result);
String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))).trim(); String framedIp = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "framed_ip").trim();
String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))).trim(); String account = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "account").trim();
if (acctStatusType == 1) { if (acctStatusType == 1) {
if (radiusMap.containsKey(framedIp)) { if (radiusMap.containsKey(framedIp)) {
boolean same = account.equals(radiusMap.get(framedIp)); boolean same = account.equals(radiusMap.get(framedIp));
@@ -89,7 +89,7 @@ class RadiusRelation {
} }
} }
Long end = System.currentTimeMillis(); Long end = System.currentTimeMillis();
logger.warn("The current number of Radius relationships is:: " + radiusMap.keySet().size()); logger.warn("The current number of Radius relationships is: " + radiusMap.keySet().size());
logger.warn("The time used to update the Radius relationship is: " + (end - begin) + "ms"); logger.warn("The time used to update the Radius relationship is: " + (end - begin) + "ms");
} catch (IOException | RuntimeException e) { } catch (IOException | RuntimeException e) {
logger.error("Radius relationship update exception, the content is:" + e); logger.error("Radius relationship update exception, the content is:" + e);
@@ -114,9 +114,9 @@ class RadiusRelation {
* @return 状态 1-上线 2-下线 * @return 状态 1-上线 2-下线
*/ */
private static int getAcctStatusType(Result result) { private static int getAcctStatusType(Result result) {
boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")); boolean hasType = result.containsColumn(Bytes.toBytes(FlowWriteConfig.RADIUS_FAMILY_NAME), Bytes.toBytes("acct_status_type"));
if (hasType) { if (hasType) {
return Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))); return Bytes.toInt(result.getValue(Bytes.toBytes(FlowWriteConfig.RADIUS_FAMILY_NAME), Bytes.toBytes("acct_status_type")));
} else { } else {
return 1; return 1;
} }

View File

@@ -29,9 +29,9 @@ public class JsonPathUtil {
String result = null; String result = null;
try { try {
if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) { if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
ArrayList<String> read = JsonPath.parse(message).read(expr); ArrayList<Object> read = JsonPath.parse(message).read(expr);
if (read.size() >= 1) { if (read.size() >= 1) {
result = read.get(0); result = read.get(0).toString();
} }
} }
} catch (RuntimeException e) { } catch (RuntimeException e) {
@@ -53,9 +53,9 @@ public class JsonPathUtil {
Integer result = null; Integer result = null;
try { try {
if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) { if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
ArrayList<Integer> read = JsonPath.parse(message).read(expr); ArrayList<Object> read = JsonPath.parse(message).read(expr);
if (read.size() >= 1) { if (read.size() >= 1) {
result = read.get(0); result = Integer.parseInt(read.get(0).toString());
} }
} }
} catch (RuntimeException e) { } catch (RuntimeException e) {
@@ -76,14 +76,13 @@ public class JsonPathUtil {
Long result = null; Long result = null;
try { try {
if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) { if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
System.out.println(message); ArrayList<Object> read = JsonPath.parse(message).read(expr);
ArrayList<Long> read = JsonPath.parse(message).read(expr);
if (read.size() >= 1) { if (read.size() >= 1) {
result = read.get(0); result = Long.parseLong(read.get(0).toString());
} }
} }
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.error("JSONPath parsing json returns Long data exception" + e); logger.error("JSONPath parsing json returns Long data exception: " + e);
} }
return result; return result;