diff --git a/pom.xml b/pom.xml
index b0aecc5..6cf404e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
     <groupId>com.zdjizhi</groupId>
    <artifactId>log-completion-schema</artifactId>
-    <version>220727-GTPC</version>
+    <version>220819-DOUBLE-TEID</version>
 
     <name>log-completion-schema</name>
     <url>http://www.example.com</url>
@@ -297,19 +297,6 @@
             <version>1.9.3</version>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro</artifactId>
-            <version>1.11.0</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-avro</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index b111181..910ccb3 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,4 +1,5 @@
 #--------------------------------地址配置------------------------------#
+
 #管理kafka地址
 source.kafka.servers=192.168.44.12:9094
@@ -13,29 +14,34 @@ hbase.zookeeper.servers=192.168.44.12:2181
 
 #--------------------------------HTTP/定位库------------------------------#
 #定位库地址
-tools.library=/opt/dat/
+tools.library=D:\\workerspace\\dat\\
 
 #--------------------------------nacos配置------------------------------#
 #nacos 地址
-nacos.server=192.168.44.12:8848
+nacos.server=192.168.44.67:8848
 
 #nacos namespace
-nacos.schema.namespace=prod
+nacos.schema.namespace=f507879a-8b1b-4330-913e-83d4fcdc14bb
 
 #nacos data id
-nacos.data.id=gtpc_record.json
+nacos.data.id=session_record.json
 
 #--------------------------------Kafka消费组信息------------------------------#
+
 #kafka 接收数据topic
-source.kafka.topic=test
+source.kafka.topic=SESSION-RECORD
 
 #补全数据 输出 topic
 sink.kafka.topic=test-result
 
 #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
-group.id=gtpc-record-log-2022-1
+group.id=type-test-20220810-1
+
+#生产者压缩模式 none or snappy
+producer.kafka.compression.type=none
 
 #--------------------------------topology配置------------------------------#
+
 #consumer 并行度
 source.parallelism=1
@@ -46,14 +52,12 @@ transform.parallelism=1
 
 sink.parallelism=1
 
 #数据中心,取值范围(0-63)
-data.center.id.num=2
+data.center.id.num=16
 
 #hbase 更新时间,如填写0则不更新缓存
 hbase.tick.tuple.freq.secs=180
 
 #--------------------------------默认值配置------------------------------#
+
 #0不需要补全原样输出日志,1需要补全
 log.need.complete=1
-
-#生产者压缩模式 none or snappy
-producer.kafka.compression.type=none
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 8b4a641..1a9238b 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -37,6 +37,10 @@ public class FlowWriteConfig {
      */
     public static final String ENCODING = "UTF8";
 
+    public static final String GTPC_FAMILY_NAME = "gtp";
+    public static final String RADIUS_FAMILY_NAME = "radius";
+
+
     /**
      * Nacos
      */
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
index 4e002c5..028a9b4 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -15,6 +15,7 @@ import com.zdjizhi.utils.json.JsonParseUtil;
 import com.zdjizhi.utils.json.JsonPathUtil;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Pattern;
@@ -115,7 +116,9 @@
     }
 
     /**
-     * 借助HBase补齐GTP-C信息
+     * 借助HBase补齐GTP-C信息,解析tunnels信息,优先使用gtp_uplink_teid,其次使用gtp_downlink_teid
+     *
+     * "common_tunnels":[{"tunnels_schema_type":"GTP","gtp_uplink_teid":235261261,"gtp_downlink_teid":665547833,"gtp_sgw_ip":"192.56.5.2","gtp_pgw_ip":"192.56.10.20","gtp_sgw_port":2152,"gtp_pgw_port":2152}]
      *
      * @param jsonMap  原始日志json
     * @param logValue 上行TEID
@@ -136,11 +139,10 @@
             if (teid != null) {
                 String[] appendToKeys = appendToKey.split(FlowWriteConfig.FORMAT_SPLITTER);
-                String userData = HBaseUtils.getGtpData(teid);
+                HashMap<String, Object> userData = HBaseUtils.getGtpData(teid);
                 if (userData != null) {
-                    JSONObject schemaJson = new JSONObject(userData, false, true);
                     for (String key : appendToKeys) {
-                        JsonParseUtil.setValue(jsonMap, key, schemaJson.getObj(key));
+                        JsonParseUtil.setValue(jsonMap, key, userData.get(key).toString());
                     }
                 } else {
                     logger.warn("Description The user whose TEID is " + teid + " was not matched!");
@@ -206,7 +208,7 @@
                 }
             }
         } catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) {
-            logger.error("The device label resolution exception or [expr] analytic expression error" + e);
+            logger.error("The label resolution exception or [expr] analytic expression error" + e);
         }
         return flattenResult;
     }
diff --git a/src/main/java/com/zdjizhi/utils/hbase/GtpCRelation.java b/src/main/java/com/zdjizhi/utils/hbase/GtpCRelation.java
index f1e5b30..9b125ba 100644
--- a/src/main/java/com/zdjizhi/utils/hbase/GtpCRelation.java
+++ b/src/main/java/com/zdjizhi/utils/hbase/GtpCRelation.java
@@ -1,6 +1,5 @@
 package com.zdjizhi.utils.hbase;
 
-import cn.hutool.json.JSONObject;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.zdjizhi.common.FlowWriteConfig;
@@ -22,9 +21,9 @@ class GtpCRelation {
     private static final Log logger = LogFactory.get();
 
     /**
-     * 获取全量的Radius数据
+     * 获取全量的GTP-C数据
      */
-    static void getAllGtpCRelation(Connection connection, Map<Long, String> gtpcMap) {
+    static void getAllGtpCRelation(Connection connection, Map<Long, HashMap<String, Object>> gtpcMap) {
         long begin = System.currentTimeMillis();
         ResultScanner scanner = null;
         try {
@@ -37,15 +36,17 @@
             for (Result result : scanner) {
                 int acctStatusType = GtpCRelation.getMsgType(result);
                 if (acctStatusType == 1) {
-                    Long teid = Bytes.toLong(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("teid")));
-                    String phoneNumber = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number"))).trim();
-                    String imsi = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))).trim();
-                    String imei = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))).trim();
-                    JSONObject jsonObject = new JSONObject();
-                    jsonObject.set("common_phone_number", phoneNumber);
-                    jsonObject.set("common_imsi", imsi);
-                    jsonObject.set("common_imei", imei);
-                    gtpcMap.put(teid, jsonObject.toJSONString(0));
+                    Long upLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "uplink_teid");
+                    Long downLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "downlink_teid");
+                    String phoneNumber = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim();
+                    String imsi = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim();
+                    String imei = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imei").trim();
+                    Long lastUpdateTime = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "last_update_time");
+
+                    HashMap<String, Object> buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime);
+
+                    updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime);
+                    updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime);
                 }
             }
             logger.warn("The obtain the number of GTP-C relationships : " + gtpcMap.size());
@@ -68,7 +69,7 @@
      * @param startTime  开始时间
      * @param endTime    结束时间
      */
-    static void upgradeGtpCRelation(Connection connection, Map<Long, String> gtpcMap, Long startTime, Long endTime) {
+    static void upgradeGtpCRelation(Connection connection, Map<Long, HashMap<String, Object>> gtpcMap, Long startTime, Long endTime) {
         Long begin = System.currentTimeMillis();
         Table table = null;
         ResultScanner scanner = null;
@@ -82,18 +83,21 @@
             scanner = table.getScanner(scan);
             for (Result result : scanner) {
                 int acctStatusType = GtpCRelation.getMsgType(result);
-                Long teid = Bytes.toLong(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("teid")));
+                Long upLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "uplink_teid");
+                Long downLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "downlink_teid");
                 if (acctStatusType == 1) {
-                    String phoneNumber = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number"))).trim();
-                    String imsi = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))).trim();
-                    String imei = Bytes.toString(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))).trim();
-                    JSONObject jsonObject = new JSONObject();
-                    jsonObject.set("common_phone_number", phoneNumber);
-                    jsonObject.set("common_imsi", imsi);
-                    jsonObject.set("common_imei", imei);
-                    gtpcMap.put(teid, jsonObject.toJSONString(0));
+                    String phoneNumber = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim();
+                    String imsi = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim();
+                    String imei = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imei").trim();
+                    Long lastUpdateTime = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "last_update_time");
+
+                    HashMap<String, Object> buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime);
+
+                    updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime);
+                    updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime);
                 } else if (acctStatusType == 2) {
-                    gtpcMap.remove(teid);
+                    removeCache(gtpcMap, upLinkTeid);
+                    removeCache(gtpcMap, downLinkTeid);
                 }
             }
             Long end = System.currentTimeMillis();
@@ -119,14 +123,64 @@
      * 获取当前用户上下线状态信息
      *
      * @param result HBase内获取的数据
-     * @return 状态 1-上线 2-下线
+     * @return onff_type 状态 1-上线 2-下线
      */
     private static int getMsgType(Result result) {
-        boolean hasType = result.containsColumn(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type"));
+        boolean hasType = result.containsColumn(Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME), Bytes.toBytes("msg_type"));
         if (hasType) {
-            return Bytes.toInt(result.getValue(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type")));
+            return Bytes.toInt(result.getValue(Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME), Bytes.toBytes("msg_type")));
         } else {
-            return 1;
+            return 0;
+        }
+    }
+
+    /**
+     * 构建用户信息
+     *
+     * @param phoneNumber 手机号
+     * @param imsi        用户标识
+     * @param imei        设备标识
+     * @return 用户信息
+     */
+    private static HashMap<String, Object> buildUserData(String phoneNumber, String imsi, String imei, Long lastUpdateTime) {
+        HashMap<String, Object> tmpMap = new HashMap<>(4);
+        tmpMap.put("common_phone_number", phoneNumber);
+        tmpMap.put("common_imsi", imsi);
+        tmpMap.put("common_imei", imei);
+        tmpMap.put("last_update_time", lastUpdateTime);
+        return tmpMap;
+    }
+
+    /**
+     * 判断缓存与新获取的数据时间戳大小,若大于缓存内记录的时间戳;则更新缓存
+     *
+     * @param gtpcMap        缓存集合
+     * @param teid           上下行teid
+     * @param userData       获取HBase内的用户信息
+     * @param lastUpdateTime 该用户信息最后更新时间
+     */
+    private static void updateCache(Map<Long, HashMap<String, Object>> gtpcMap, Long teid, HashMap<String, Object> userData, Long lastUpdateTime) {
+        if (teid != 0L) {
+            if (gtpcMap.containsKey(teid)) {
+                Long oldUpdateTime = Long.parseLong(gtpcMap.get(teid).get("last_update_time").toString());
+                if (lastUpdateTime > oldUpdateTime) {
+                    gtpcMap.put(teid, userData);
+                }
+            } else {
+                gtpcMap.put(teid, userData);
+            }
+        }
+    }
+
+    /**
+     * 将过期用户从缓存中删除
+     *
+     * @param gtpcMap 缓存集合
+     * @param teid    上下行teid
+     */
+    private static void removeCache(Map<Long, HashMap<String, Object>> gtpcMap, Long teid) {
+        if (teid != 0L) {
+            gtpcMap.remove(teid);
         }
     }
 }
diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
index 461edc0..349d4f1 100644
--- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
+++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
@@ -3,9 +3,11 @@ package com.zdjizhi.utils.hbase;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -24,7 +26,7 @@ import java.util.concurrent.TimeUnit;
 public class HBaseUtils {
     private static final Log logger = LogFactory.get();
     private static Map<String, String> radiusMap = new ConcurrentHashMap<>(16);
-    private static Map<Long, String> gtpcMap = new ConcurrentHashMap<>(16);
+    private static Map<Long, HashMap<String, Object>> gtpcMap = new ConcurrentHashMap<>(16);
     private static Connection connection;
     private static Long time;
@@ -45,7 +47,8 @@
             RadiusRelation.getAllRadiusRelation(connection, radiusMap);
             GtpCRelation.getAllGtpCRelation(connection, gtpcMap);
             //定时更新
-            updateCache();
+            updateRadiusCache();
+            updateGtpcCache();
         }
 
 
@@ -68,6 +71,48 @@
         }
     }
 
+    /**
+     * 获取HBase内String类型的值
+     *
+     * @param result     结果集
+     * @param familyName 列族名称
+     * @param columnName 列名称
+     * @return 结果数据
+     */
+    static String getString(Result result, String familyName, String columnName) {
+        byte[] familyBytes = Bytes.toBytes(familyName);
+        byte[] columnBytes = Bytes.toBytes(columnName);
+        boolean contains = result.containsColumn(familyBytes, columnBytes);
+        if (contains) {
+            String data = Bytes.toString(result.getValue(familyBytes, columnBytes)).trim();
+            if (StringUtil.isNotBlank(data)) {
+                return data;
+            }
+        }
+
+        return "";
+    }
+
+    /**
+     * 获取HBase内Long类型的值
+     *
+     * @param result     结果集
+     * @param columnName 列名称
+     * @return 结果数据
+     */
+    static Long getLong(Result result, String familyName, String columnName) {
+        byte[] familyBytes = Bytes.toBytes(familyName);
+        byte[] columnBytes = Bytes.toBytes(columnName);
+        boolean contains = result.containsColumn(familyBytes, columnBytes);
+        if (contains) {
+            String data = Bytes.toString(result.getValue(familyBytes, columnBytes)).trim();
+            if (StringUtil.isNotBlank(data)) {
+                return Bytes.toLong(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes(columnName)));
+            }
+        }
+        return 0L;
+    }
+
     /**
      * 更新变量
      */
@@ -84,7 +129,7 @@
     /**
      * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
      */
-    private void updateCache() {
+    private void updateRadiusCache() {
         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
         executorService.scheduleAtFixedRate(new Runnable() {
             @Override
@@ -102,7 +147,26 @@ public class HBaseUtils {
 
     /**
-     * 获取 account
+     * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
+     */
+    private void updateGtpcCache() {
+        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
+        executorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
+                        change();
+                    }
+                } catch (RuntimeException e) {
+                    logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<===");
+                }
+            }
+        }, 1, FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
+    }
+
+    /**
+     * 获取Radius account
      *
      * @param clientIp client_ip
     * @return account
     */
@@ -119,12 +183,12 @@
 
     /**
-     * 获取 account
+     * 获取GTPC用户信息
      *
-     * @param teid 上行TEID
+     * @param teid TEID
      * @return account
      */
-    public static String getGtpData(Long teid) {
+    public static HashMap<String, Object> getGtpData(Long teid) {
         if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
             if (hBaseUtils == null) {
                 getInstance();
diff --git a/src/main/java/com/zdjizhi/utils/hbase/RadiusRelation.java b/src/main/java/com/zdjizhi/utils/hbase/RadiusRelation.java
index f5b17de..c5e6fe4 100644
--- a/src/main/java/com/zdjizhi/utils/hbase/RadiusRelation.java
+++ b/src/main/java/com/zdjizhi/utils/hbase/RadiusRelation.java
@@ -34,8 +34,8 @@
             scanner = table.getScanner(scan);
             for (Result result : scanner) {
                 int acctStatusType = RadiusRelation.getAcctStatusType(result);
-                String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")));
-                String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account")));
+                String framedIp = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "framed_ip").trim();
+                String account = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "account").trim();
                 if (acctStatusType == 1) {
                     radiusMap.put(framedIp, account);
                 }
@@ -73,8 +73,8 @@
             scanner = table.getScanner(scan);
             for (Result result : scanner) {
                 int acctStatusType = RadiusRelation.getAcctStatusType(result);
-                String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))).trim();
-                String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))).trim();
+                String framedIp = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "framed_ip").trim();
+                String account = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "account").trim();
                 if (acctStatusType == 1) {
                     if (radiusMap.containsKey(framedIp)) {
                         boolean same = account.equals(radiusMap.get(framedIp));
@@ -89,7 +89,7 @@
                 }
             }
             Long end = System.currentTimeMillis();
-            logger.warn("The current number of Radius relationships is:: " + radiusMap.keySet().size());
+            logger.warn("The current number of Radius relationships is: " + radiusMap.keySet().size());
             logger.warn("The time used to update the Radius relationship is: " + (end - begin) + "ms");
         } catch (IOException | RuntimeException e) {
             logger.error("Radius relationship update exception, the content is:" + e);
@@ -114,9 +114,9 @@
      * @return 状态 1-上线 2-下线
      */
     private static int getAcctStatusType(Result result) {
-        boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"));
+        boolean hasType = result.containsColumn(Bytes.toBytes(FlowWriteConfig.RADIUS_FAMILY_NAME), Bytes.toBytes("acct_status_type"));
         if (hasType) {
-            return Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")));
+            return Bytes.toInt(result.getValue(Bytes.toBytes(FlowWriteConfig.RADIUS_FAMILY_NAME), Bytes.toBytes("acct_status_type")));
         } else {
             return 1;
         }
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java
index 130cd90..a3fb6a6 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java
@@ -29,9 +29,9 @@ public class JsonPathUtil {
         String result = null;
         try {
             if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
-                ArrayList<String> read = JsonPath.parse(message).read(expr);
+                ArrayList<Object> read = JsonPath.parse(message).read(expr);
                 if (read.size() >= 1) {
-                    result = read.get(0);
+                    result = read.get(0).toString();
                 }
             }
         } catch (RuntimeException e) {
@@ -53,9 +53,9 @@
         Integer result = null;
         try {
             if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
-                ArrayList<Integer> read = JsonPath.parse(message).read(expr);
+                ArrayList<Object> read = JsonPath.parse(message).read(expr);
                 if (read.size() >= 1) {
-                    result = read.get(0);
+                    result = Integer.parseInt(read.get(0).toString());
                 }
             }
         } catch (RuntimeException e) {
@@ -76,14 +76,13 @@
         Long result = null;
         try {
             if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
-                System.out.println(message);
-                ArrayList<Long> read = JsonPath.parse(message).read(expr);
+                ArrayList<Object> read = JsonPath.parse(message).read(expr);
                 if (read.size() >= 1) {
-                    result = read.get(0);
+                    result = Long.parseLong(read.get(0).toString());
                 }
             }
         } catch (RuntimeException e) {
-            logger.error("JSONPath parsing json returns Long data exception" + e);
+            logger.error("JSONPath parsing json returns Long data exception: " + e);
         }
         return result;