From 60d12d3f8c7db5ba72207412bf2d0de9fb709604 Mon Sep 17 00:00:00 2001 From: qidaijie Date: Wed, 21 Sep 2022 18:15:03 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0GTP-C/RADIUS=E5=85=B3?= =?UTF-8?q?=E8=81=94=E5=A2=9E=E5=8A=A0VSYS=E5=BC=80=E5=85=B3=EF=BC=8C?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=90=91=E5=89=8D=E5=85=BC=E5=AE=B9=EF=BC=8C?= =?UTF-8?q?=E6=B2=A1=E6=9C=89vsysid=E7=9A=84=E6=95=B0=E6=8D=AE=E9=BB=98?= =?UTF-8?q?=E8=AE=A4=E4=B8=BA1=E3=80=82=EF=BC=88TSG-11939=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- properties/default_config.properties | 6 +- properties/service_flow_config.properties | 8 +- .../com/zdjizhi/common/FlowWriteConfig.java | 5 +- .../exception/FlowWriteException.java | 2 +- .../functions/FilterNullFunction.java | 2 +- .../functions/MapCompletedFunction.java | 4 +- .../functions/TypeMapCompletedFunction.java | 4 +- .../{utils => tools}/general/SnowflakeId.java | 6 +- .../general/TransFormMap.java | 6 +- .../general/TransFormTypeMap.java | 9 ++- .../general/TransFunction.java | 27 ++++--- .../{utils => tools}/hbase/GtpCRelation.java | 74 ++++++++++-------- .../{utils => tools}/hbase/HBaseUtils.java | 78 ++++++++++++------- .../hbase/RadiusRelation.java | 26 +++++-- .../{utils => tools}/json/JsonParseUtil.java | 11 ++- .../{utils => tools}/json/JsonPathUtil.java | 8 +- .../{utils => tools}/json/JsonTypeUtil.java | 4 +- .../{utils => tools}/json/TypeUtils.java | 4 +- .../{utils => tools}/kafka/CertUtils.java | 2 +- .../{utils => tools}/kafka/KafkaConsumer.java | 9 +-- .../{utils => tools}/kafka/KafkaProducer.java | 2 +- .../kafka/TimestampDeserializationSchema.java | 2 +- .../system/FlowWriteConfigurations.java | 8 +- .../zookeeper/DistributedLock.java | 2 +- .../zookeeper/ZookeeperUtils.java | 2 +- .../topology/LogFlowWriteTopology.java | 10 +-- .../zdjizhi/utils/http/HttpClientUtil.java | 77 ------------------ 28 files changed, 184 insertions(+), 216 deletions(-) rename src/main/java/com/zdjizhi/{utils => tools}/exception/FlowWriteException.java (88%) rename src/main/java/com/zdjizhi/{utils => tools}/functions/FilterNullFunction.java (91%) rename src/main/java/com/zdjizhi/{utils => tools}/functions/MapCompletedFunction.java (84%) rename src/main/java/com/zdjizhi/{utils => tools}/functions/TypeMapCompletedFunction.java (83%) rename src/main/java/com/zdjizhi/{utils => tools}/general/SnowflakeId.java (97%) rename src/main/java/com/zdjizhi/{utils => tools}/general/TransFormMap.java (97%) rename src/main/java/com/zdjizhi/{utils => tools}/general/TransFormTypeMap.java (95%) rename src/main/java/com/zdjizhi/{utils => tools}/general/TransFunction.java (90%) rename src/main/java/com/zdjizhi/{utils => tools}/hbase/GtpCRelation.java (68%) rename src/main/java/com/zdjizhi/{utils => tools}/hbase/HBaseUtils.java (77%) rename src/main/java/com/zdjizhi/{utils => tools}/hbase/RadiusRelation.java (81%) rename src/main/java/com/zdjizhi/{utils => tools}/json/JsonParseUtil.java (96%) rename src/main/java/com/zdjizhi/{utils => tools}/json/JsonPathUtil.java (83%) rename src/main/java/com/zdjizhi/{utils => tools}/json/JsonTypeUtil.java (96%) rename src/main/java/com/zdjizhi/{utils => tools}/json/TypeUtils.java (98%) rename src/main/java/com/zdjizhi/{utils => tools}/kafka/CertUtils.java (98%) rename src/main/java/com/zdjizhi/{utils => tools}/kafka/KafkaConsumer.java (80%) rename src/main/java/com/zdjizhi/{utils => tools}/kafka/KafkaProducer.java (98%) rename src/main/java/com/zdjizhi/{utils => tools}/kafka/TimestampDeserializationSchema.java (97%) rename src/main/java/com/zdjizhi/{utils => tools}/system/FlowWriteConfigurations.java (87%) rename src/main/java/com/zdjizhi/{utils => tools}/zookeeper/DistributedLock.java (99%) rename src/main/java/com/zdjizhi/{utils => tools}/zookeeper/ZookeeperUtils.java (99%) delete mode 100644 src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java diff --git a/pom.xml b/pom.xml index 6cf404e..3442b0e 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi log-completion-schema - 220819-DOUBLE-TEID + 20220921-VSYS log-completion-schema http://www.example.com diff --git a/properties/default_config.properties b/properties/default_config.properties index 23fb5f9..9af6157 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -66,4 +66,8 @@ buffer.timeout=5000 hbase.gtpc.scan.max.rows=0 #The radius data scan max rows,0 = no limit. -hbase.radius.scan.max.rows=0 \ No newline at end of file +hbase.radius.scan.max.rows=0 + +#Whether vsys_id is used as the relationship key between gtpc and radius. +#vsys or global +data.relationship.model=vsys \ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 910ccb3..f76f57d 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -18,10 +18,10 @@ tools.library=D:\\workerspace\\dat\\ #--------------------------------nacos配置------------------------------# #nacos 地址 -nacos.server=192.168.44.67:8848 +nacos.server=192.168.44.12:8848 #nacos namespace -nacos.schema.namespace=f507879a-8b1b-4330-913e-83d4fcdc14bb +nacos.schema.namespace=test #nacos data id nacos.data.id=session_record.json @@ -29,7 +29,7 @@ nacos.data.id=session_record.json #--------------------------------Kafka消费组信息------------------------------# #kafka 接收数据topic -source.kafka.topic=SESSION-RECORD +source.kafka.topic=test #补全数据 输出 topic sink.kafka.topic=test-result @@ -55,7 +55,7 @@ sink.parallelism=1 data.center.id.num=16 #hbase 更新时间,如填写0则不更新缓存 -hbase.tick.tuple.freq.secs=180 +hbase.tick.tuple.freq.secs=60 #--------------------------------默认值配置------------------------------# diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java index 1a9238b..7f92b02 100644 --- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -1,7 +1,7 @@ package com.zdjizhi.common; -import com.zdjizhi.utils.system.FlowWriteConfigurations; +import com.zdjizhi.tools.system.FlowWriteConfigurations; import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; /** @@ -39,6 +39,8 @@ public class FlowWriteConfig { public static final String GTPC_FAMILY_NAME = "gtp"; public static final String RADIUS_FAMILY_NAME = "radius"; + public static final String COMMON_FAMILY_NAME = "common"; + public static final String DEFAULT_RELATIONSHIP_MODULE = "vsys"; /** @@ -61,6 +63,7 @@ public class FlowWriteConfig { public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete"); public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(1, "mail.default.charset"); public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type"); + public static final String DATA_RELATIONSHIP_MODEL = FlowWriteConfigurations.getStringProperty(1, "data.relationship.model"); public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout"); /** diff --git a/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java b/src/main/java/com/zdjizhi/tools/exception/FlowWriteException.java similarity index 88% rename from src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java rename to src/main/java/com/zdjizhi/tools/exception/FlowWriteException.java index 67c88f0..ef14812 100644 --- a/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java +++ b/src/main/java/com/zdjizhi/tools/exception/FlowWriteException.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.exception; +package com.zdjizhi.tools.exception; /** * @author qidaijie diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/tools/functions/FilterNullFunction.java similarity index 91% rename from src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java rename to src/main/java/com/zdjizhi/tools/functions/FilterNullFunction.java index de507ad..e5f8526 100644 --- a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java +++ b/src/main/java/com/zdjizhi/tools/functions/FilterNullFunction.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.functions; +package com.zdjizhi.tools.functions; import com.zdjizhi.utils.StringUtil; import org.apache.flink.api.common.functions.FilterFunction; diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/tools/functions/MapCompletedFunction.java similarity index 84% rename from src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java rename to src/main/java/com/zdjizhi/tools/functions/MapCompletedFunction.java index 810e4c8..6b8df35 100644 --- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java +++ b/src/main/java/com/zdjizhi/tools/functions/MapCompletedFunction.java @@ -1,7 +1,7 @@ -package com.zdjizhi.utils.functions; +package com.zdjizhi.tools.functions; -import com.zdjizhi.utils.general.TransFormMap; +import com.zdjizhi.tools.general.TransFormMap; import org.apache.flink.api.common.functions.MapFunction; import java.util.Map; diff --git a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java b/src/main/java/com/zdjizhi/tools/functions/TypeMapCompletedFunction.java similarity index 83% rename from src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java rename to src/main/java/com/zdjizhi/tools/functions/TypeMapCompletedFunction.java index ccef850..1b8dd6a 100644 --- a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java +++ b/src/main/java/com/zdjizhi/tools/functions/TypeMapCompletedFunction.java @@ -1,6 +1,6 @@ -package com.zdjizhi.utils.functions; +package com.zdjizhi.tools.functions; -import com.zdjizhi.utils.general.TransFormTypeMap; +import com.zdjizhi.tools.general.TransFormTypeMap; import org.apache.flink.api.common.functions.MapFunction; import java.util.Map; diff --git a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/tools/general/SnowflakeId.java similarity index 97% rename from src/main/java/com/zdjizhi/utils/general/SnowflakeId.java rename to src/main/java/com/zdjizhi/tools/general/SnowflakeId.java index 7cb907e..8db3ec6 100644 --- a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java +++ b/src/main/java/com/zdjizhi/tools/general/SnowflakeId.java @@ -1,10 +1,10 @@ -package com.zdjizhi.utils.general; +package com.zdjizhi.tools.general; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.zookeeper.DistributedLock; -import com.zdjizhi.utils.zookeeper.ZookeeperUtils; +import com.zdjizhi.tools.zookeeper.DistributedLock; +import com.zdjizhi.tools.zookeeper.ZookeeperUtils; /** * 雪花算法 diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/tools/general/TransFormMap.java similarity index 97% rename from src/main/java/com/zdjizhi/utils/general/TransFormMap.java rename to src/main/java/com/zdjizhi/tools/general/TransFormMap.java index 37d3a00..19df653 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java +++ b/src/main/java/com/zdjizhi/tools/general/TransFormMap.java @@ -1,10 +1,10 @@ -package com.zdjizhi.utils.general; +package com.zdjizhi.tools.general; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.tools.json.JsonParseUtil; import java.util.Map; @@ -107,7 +107,7 @@ public class TransFormMap { break; case "radius_match": if (logValue != null) { - JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.radiusMatch(logValue.toString())); + JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.radiusMatch(jsonMap, logValue.toString())); } break; case "gtpc_match": diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/tools/general/TransFormTypeMap.java similarity index 95% rename from src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java rename to src/main/java/com/zdjizhi/tools/general/TransFormTypeMap.java index de5fd43..4bc2f36 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java +++ b/src/main/java/com/zdjizhi/tools/general/TransFormTypeMap.java @@ -1,10 +1,11 @@ -package com.zdjizhi.utils.general; +package com.zdjizhi.tools.general; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.tools.json.JsonParseUtil; import java.util.Map; @@ -108,12 +109,12 @@ public class TransFormTypeMap { break; case "radius_match": if (logValue != null) { - JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.radiusMatch(logValue.toString())); + JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.radiusMatch(jsonMap, logValue.toString())); } break; case "gtpc_match": if (logValue != null) { - TransFunction.gtpcMatch(jsonMap, logValue.toString(), appendToKey,param); + TransFunction.gtpcMatch(jsonMap, logValue.toString(), appendToKey, param); } break; case "set_value": diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/tools/general/TransFunction.java similarity index 90% rename from src/main/java/com/zdjizhi/utils/general/TransFunction.java rename to src/main/java/com/zdjizhi/tools/general/TransFunction.java index 7e16d72..0263239 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java +++ b/src/main/java/com/zdjizhi/tools/general/TransFunction.java @@ -1,23 +1,21 @@ -package com.zdjizhi.utils.general; +package com.zdjizhi.tools.general; import cn.hutool.core.codec.Base64; -import cn.hutool.json.JSONObject; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.jayway.jsonpath.InvalidPathException; import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.tools.hbase.HBaseUtils; import com.zdjizhi.utils.FormatUtils; import com.zdjizhi.utils.IpLookupV2; import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.hbase.HBaseUtils; -import com.zdjizhi.utils.json.JsonParseUtil; -import com.zdjizhi.utils.json.JsonPathUtil; +import com.zdjizhi.tools.json.JsonParseUtil; +import com.zdjizhi.tools.json.JsonPathUtil; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import java.util.regex.Pattern; /** * @author qidaijie @@ -106,8 +104,13 @@ class TransFunction { * @param ip client IP * @return account */ - static String radiusMatch(String ip) { - return HBaseUtils.getAccount(ip.trim()); + static String radiusMatch(Map jsonMap, String ip) { + if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { + String vsysId = jsonMap.getOrDefault("common_vsys_id", "1").toString(); + return HBaseUtils.getAccount(ip + vsysId); + } else { + return HBaseUtils.getAccount(ip); + } } /** @@ -122,10 +125,10 @@ class TransFunction { */ static void gtpcMatch(Map jsonMap, String logValue, String appendToKey, String param) { try { - Long teid = null; + String teid = null; String[] exprs = param.split(FlowWriteConfig.FORMAT_SPLITTER); for (String expr : exprs) { - Long value = JsonPathUtil.getTeidValue(logValue, expr); + String value = JsonPathUtil.getTeidValue(logValue, expr); if (value != null) { teid = value; break; @@ -133,6 +136,10 @@ class TransFunction { } if (teid != null) { + if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { + String vsysId = jsonMap.getOrDefault("common_vsys_id", "1").toString(); + teid = teid + vsysId; + } String[] appendToKeys = appendToKey.split(FlowWriteConfig.FORMAT_SPLITTER); HashMap userData = HBaseUtils.getGtpData(teid); if (userData != null) { diff --git a/src/main/java/com/zdjizhi/utils/hbase/GtpCRelation.java b/src/main/java/com/zdjizhi/tools/hbase/GtpCRelation.java similarity index 68% rename from src/main/java/com/zdjizhi/utils/hbase/GtpCRelation.java rename to src/main/java/com/zdjizhi/tools/hbase/GtpCRelation.java index 9b125ba..ad7b128 100644 --- a/src/main/java/com/zdjizhi/utils/hbase/GtpCRelation.java +++ b/src/main/java/com/zdjizhi/tools/hbase/GtpCRelation.java @@ -1,8 +1,9 @@ -package com.zdjizhi.utils.hbase; +package com.zdjizhi.tools.hbase; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; @@ -23,7 +24,7 @@ class GtpCRelation { /** * 获取全量的GTpc数据 */ - static void getAllGtpCRelation(Connection connection, Map> gtpcMap) { + static void getAllGtpCRelation(Connection connection, Map> gtpcMap) { long begin = System.currentTimeMillis(); ResultScanner scanner = null; try { @@ -36,8 +37,8 @@ class GtpCRelation { for (Result result : scanner) { int acctStatusType = GtpCRelation.getMsgType(result); if (acctStatusType == 1) { - Long upLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "uplink_teid"); - Long downLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "downlink_teid"); + String upLinkTeid = HBaseUtils.getTeid(result, "uplink_teid"); + String downLinkTeid = HBaseUtils.getTeid(result, "downlink_teid"); String phoneNumber = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim(); String imsi = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim(); String imei = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imei").trim(); @@ -45,10 +46,18 @@ class GtpCRelation { HashMap buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime); - updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime); - updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime); + if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { + String vsysId = HBaseUtils.getVsysId(result).trim(); + updateCache(gtpcMap, upLinkTeid+vsysId, buildUserData, lastUpdateTime); + updateCache(gtpcMap, downLinkTeid+vsysId, buildUserData, lastUpdateTime); + } else { + updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime); + updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime); + } } } + + System.out.println(gtpcMap.toString()); logger.warn("The obtain the number of GTP-C relationships : " + gtpcMap.size()); logger.warn("The time spent to obtain GTP-C relationships : " + (System.currentTimeMillis() - begin) + "ms"); } catch (IOException | RuntimeException e) { @@ -69,7 +78,7 @@ class GtpCRelation { * @param startTime 开始时间 * @param endTime 结束时间 */ - static void upgradeGtpCRelation(Connection connection, Map> gtpcMap, Long startTime, Long endTime) { + static void upgradeGtpCRelation(Connection connection, Map> gtpcMap, Long startTime, Long endTime) { Long begin = System.currentTimeMillis(); Table table = null; ResultScanner scanner = null; @@ -83,8 +92,8 @@ class GtpCRelation { scanner = table.getScanner(scan); for (Result result : scanner) { int acctStatusType = GtpCRelation.getMsgType(result); - Long upLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "uplink_teid"); - Long downLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "downlink_teid"); + String upLinkTeid = HBaseUtils.getTeid(result, "uplink_teid"); + String downLinkTeid = HBaseUtils.getTeid(result, "downlink_teid"); if (acctStatusType == 1) { String phoneNumber = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim(); String imsi = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim(); @@ -93,11 +102,24 @@ class GtpCRelation { HashMap buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime); - updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime); - updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime); + if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { + String vsysId = HBaseUtils.getVsysId(result).trim(); + updateCache(gtpcMap, upLinkTeid+vsysId, buildUserData, lastUpdateTime); + updateCache(gtpcMap, downLinkTeid+vsysId, buildUserData, lastUpdateTime); + } else { + updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime); + updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime); + } + } else if (acctStatusType == 2) { - removeCache(gtpcMap, upLinkTeid); - removeCache(gtpcMap, downLinkTeid); + if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { + String vsysId = HBaseUtils.getVsysId(result).trim(); + gtpcMap.remove(upLinkTeid+vsysId); + gtpcMap.remove(downLinkTeid+vsysId); + } else { + gtpcMap.remove(upLinkTeid); + gtpcMap.remove(downLinkTeid); + } } } Long end = System.currentTimeMillis(); @@ -155,32 +177,20 @@ class GtpCRelation { * 判断缓存与新获取的数据时间戳大小,若大于缓存内记录的时间戳;则更新缓存 * * @param gtpcMap 缓存集合 - * @param teid 上下行teid + * @param key 上下行teid * @param userData 获取HBase内的用户信息 * @param lastUpdateTime 该用户信息最后更新时间 */ - private static void updateCache(Map> gtpcMap, Long teid, HashMap userData, Long lastUpdateTime) { - if (teid != 0L) { - if (gtpcMap.containsKey(teid)) { - Long oldUpdateTime = Long.parseLong(gtpcMap.get(teid).get("last_update_time").toString()); + private static void updateCache(Map> gtpcMap, String key, HashMap userData, Long lastUpdateTime) { + if (StringUtil.isNotBlank(key)){ + if (gtpcMap.containsKey(key)) { + Long oldUpdateTime = Long.parseLong(gtpcMap.get(key).get("last_update_time").toString()); if (lastUpdateTime > oldUpdateTime) { - gtpcMap.put(teid, userData); + gtpcMap.put(key, userData); } } else { - gtpcMap.put(teid, userData); + gtpcMap.put(key, userData); } } } - - /** - * 将过期用户从缓存中删除 - * - * @param gtpcMap 缓存集合 - * @param teid 上下行teid - */ - private static void removeCache(Map> gtpcMap, Long teid) { - if (teid != 0L) { - gtpcMap.remove(teid); - } - } } diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/tools/hbase/HBaseUtils.java similarity index 77% rename from src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java rename to src/main/java/com/zdjizhi/tools/hbase/HBaseUtils.java index 349d4f1..dc66b21 100644 --- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java +++ b/src/main/java/com/zdjizhi/tools/hbase/HBaseUtils.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.hbase; +package com.zdjizhi.tools.hbase; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; @@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit; public class HBaseUtils { private static final Log logger = LogFactory.get(); private static Map radiusMap = new ConcurrentHashMap<>(16); - private static Map> gtpcMap = new ConcurrentHashMap<>(16); + private static Map> gtpcMap = new ConcurrentHashMap<>(16); private static Connection connection; private static Long time; @@ -47,8 +47,7 @@ public class HBaseUtils { RadiusRelation.getAllRadiusRelation(connection, radiusMap); GtpCRelation.getAllGtpCRelation(connection, gtpcMap); //定时更新 - updateRadiusCache(); - updateGtpcCache(); + updateCache(); } @@ -105,14 +104,51 @@ public class HBaseUtils { byte[] columnBytes = Bytes.toBytes(columnName); boolean contains = result.containsColumn(familyBytes, columnBytes); if (contains) { - String data = Bytes.toString(result.getValue(familyBytes, columnBytes)).trim(); - if (StringUtil.isNotBlank(data)) { - return Bytes.toLong(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes(columnName))); - } + return Bytes.toLong(result.getValue(familyBytes, columnBytes)); } return 0L; } + /** + * 获取HBase内String类型的值 + * + * @param result 结果集 + * @param columnName 列名称 + * @return 结果数据 + */ + static String getTeid(Result result, String columnName) { + byte[] familyBytes = Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME); + byte[] columnBytes = Bytes.toBytes(columnName); + boolean contains = result.containsColumn(familyBytes, columnBytes); + if (contains) { + String data = String.valueOf(Bytes.toLong(result.getValue(familyBytes, columnBytes))).trim(); + if (StringUtil.isNotBlank(data)) { + return data; + } + } + return "0"; + } + + + /** + * 获取HBase内String类型的值 + * + * @param result 结果集 + * @return 结果数据 + */ + static String getVsysId(Result result) { + byte[] familyBytes = Bytes.toBytes(FlowWriteConfig.COMMON_FAMILY_NAME); + byte[] columnBytes = Bytes.toBytes("vsys_id"); + boolean contains = result.containsColumn(familyBytes, columnBytes); + if (contains) { + String data = String.valueOf(Bytes.toInt(result.getValue(familyBytes, columnBytes))).trim(); + if (StringUtil.isNotBlank(data)) { + return data; + } + } + return "1"; + } + /** * 更新变量 */ @@ -123,33 +159,15 @@ public class HBaseUtils { long nowTime = System.currentTimeMillis(); RadiusRelation.upgradeRadiusRelation(connection, radiusMap, time - 1000, nowTime + 500); GtpCRelation.upgradeGtpCRelation(connection, gtpcMap, time - 1000, nowTime + 500); + System.out.println(gtpcMap); + System.out.println(radiusMap); time = nowTime; } /** * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie */ - private void updateRadiusCache() { - ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); - executorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) { - change(); - } - } catch (RuntimeException e) { - logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<==="); - } - } - }, 1, FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS); - } - - - /** - * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie - */ - private void updateGtpcCache() { + private void updateCache() { ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); executorService.scheduleAtFixedRate(new Runnable() { @Override @@ -188,7 +206,7 @@ public class HBaseUtils { * @param teid TEID * @return account */ - public static HashMap getGtpData(Long teid) { + public static HashMap getGtpData(String teid) { if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) { if (hBaseUtils == null) { getInstance(); diff --git a/src/main/java/com/zdjizhi/utils/hbase/RadiusRelation.java b/src/main/java/com/zdjizhi/tools/hbase/RadiusRelation.java similarity index 81% rename from src/main/java/com/zdjizhi/utils/hbase/RadiusRelation.java rename to src/main/java/com/zdjizhi/tools/hbase/RadiusRelation.java index c5e6fe4..7c5372f 100644 --- a/src/main/java/com/zdjizhi/utils/hbase/RadiusRelation.java +++ b/src/main/java/com/zdjizhi/tools/hbase/RadiusRelation.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.hbase; +package com.zdjizhi.tools.hbase; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; @@ -37,9 +37,16 @@ class RadiusRelation { String framedIp = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "framed_ip").trim(); String account = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "account").trim(); if (acctStatusType == 1) { - radiusMap.put(framedIp, account); + if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { + String vsysId = HBaseUtils.getVsysId(result).trim(); + radiusMap.put(framedIp + vsysId, account); + } else { + radiusMap.put(framedIp, account); + } } } + System.out.println(radiusMap.toString()); + logger.warn("The obtain the number of RADIUS relationships : " + radiusMap.size()); logger.warn("The time spent to obtain radius relationships : " + (System.currentTimeMillis() - begin) + "ms"); } catch (IOException | RuntimeException e) { @@ -76,16 +83,19 @@ class RadiusRelation { String framedIp = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "framed_ip").trim(); String account = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "account").trim(); if (acctStatusType == 1) { - if (radiusMap.containsKey(framedIp)) { - boolean same = account.equals(radiusMap.get(framedIp)); - if (!same) { - radiusMap.put(framedIp, account); - } + if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { + String vsysId = HBaseUtils.getVsysId(result).trim(); + radiusMap.put(framedIp + vsysId, account); } else { radiusMap.put(framedIp, account); } } else if (acctStatusType == 2) { - radiusMap.remove(framedIp); + if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { + String vsysId = HBaseUtils.getVsysId(result).trim(); + radiusMap.remove(framedIp+vsysId); + } else { + radiusMap.remove(framedIp); + } } } Long end = System.currentTimeMillis(); diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java similarity index 96% rename from src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java rename to src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java index ac543ec..3c522e1 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.json; +package com.zdjizhi.tools.json; import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; @@ -219,8 +219,13 @@ public class JsonParseUtil { * @param jsonMap 原始日志 */ private static void setFieldDefault(Map jsonMap) { - for (String key : defaultFieldsMap.keySet()) { - jsonMap.put(key, defaultFieldsMap.get(key)); + if (defaultFieldsMap.keySet().size() >= 1) { + for (String fieldName : defaultFieldsMap.keySet()) { + Object logValue = JsonParseUtil.getValue(jsonMap, fieldName); + if (logValue == null) { + jsonMap.put(fieldName, defaultFieldsMap.get(fieldName)); + } + } } } diff --git a/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonPathUtil.java similarity index 83% rename from src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java rename to src/main/java/com/zdjizhi/tools/json/JsonPathUtil.java index 70b4b19..13bfaca 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java +++ b/src/main/java/com/zdjizhi/tools/json/JsonPathUtil.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.json; +package com.zdjizhi.tools.json; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; @@ -24,13 +24,13 @@ public class JsonPathUtil { * @param expr 解析表达式 * @return 返回值 */ - public static Long getTeidValue(String message, String expr) { - Long result = null; + public static String getTeidValue(String message, String expr) { + String result = null; try { if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) { ArrayList read = JsonPath.parse(message).read(expr); if (read.size() >= 1) { - result = Long.parseLong(read.get(0).toString()); + result = read.get(0).toString(); } } } catch (RuntimeException e) { diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java similarity index 96% rename from src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java rename to src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java index 0cf16ff..2ef19f3 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java +++ b/src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java @@ -1,7 +1,7 @@ -package com.zdjizhi.utils.json; +package com.zdjizhi.tools.json; +import com.zdjizhi.tools.exception.FlowWriteException; import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.exception.FlowWriteException; import java.util.List; import java.util.Map; diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/tools/json/TypeUtils.java similarity index 98% rename from src/main/java/com/zdjizhi/utils/json/TypeUtils.java rename to src/main/java/com/zdjizhi/tools/json/TypeUtils.java index b13627f..c6acc17 100644 --- a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java +++ b/src/main/java/com/zdjizhi/tools/json/TypeUtils.java @@ -1,10 +1,10 @@ -package com.zdjizhi.utils.json; +package com.zdjizhi.tools.json; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.tools.exception.FlowWriteException; import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.exception.FlowWriteException; /** * @author qidaijie diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java similarity index 98% rename from src/main/java/com/zdjizhi/utils/kafka/CertUtils.java rename to src/main/java/com/zdjizhi/tools/kafka/CertUtils.java index ce059f8..ad93f29 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java +++ b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.kafka; +package com.zdjizhi.tools.kafka; import com.zdjizhi.common.FlowWriteConfig; import org.apache.kafka.common.config.SslConfigs; diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java similarity index 80% rename from src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java rename to src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java index f935689..ec75b8a 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java @@ -1,15 +1,8 @@ -package com.zdjizhi.utils.kafka; +package com.zdjizhi.tools.kafka; import com.zdjizhi.common.FlowWriteConfig; import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.connector.kafka.source.KafkaSource; -import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; -import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import java.util.Map; diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java similarity index 98% rename from src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java rename to src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java index 28ecff9..3c372af 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java +++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.kafka; +package com.zdjizhi.tools.kafka; import com.zdjizhi.common.FlowWriteConfig; import org.apache.flink.api.common.serialization.SimpleStringSchema; diff --git a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java b/src/main/java/com/zdjizhi/tools/kafka/TimestampDeserializationSchema.java similarity index 97% rename from src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java rename to src/main/java/com/zdjizhi/tools/kafka/TimestampDeserializationSchema.java index 920ffab..409ea94 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java +++ b/src/main/java/com/zdjizhi/tools/kafka/TimestampDeserializationSchema.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.kafka; +package com.zdjizhi.tools.kafka; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; diff --git a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/tools/system/FlowWriteConfigurations.java similarity index 87% rename from src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java rename to src/main/java/com/zdjizhi/tools/system/FlowWriteConfigurations.java index 3b40482..d429def 100644 --- a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java +++ b/src/main/java/com/zdjizhi/tools/system/FlowWriteConfigurations.java @@ -1,14 +1,8 @@ -package com.zdjizhi.utils.system; +package com.zdjizhi.tools.system; -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.exception.NacosException; -import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.utils.StringUtil; import java.io.IOException; -import java.io.StringReader; import java.util.Locale; import java.util.Properties; diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java b/src/main/java/com/zdjizhi/tools/zookeeper/DistributedLock.java similarity index 99% rename from src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java rename to src/main/java/com/zdjizhi/tools/zookeeper/DistributedLock.java index 2afab03..62b5464 100644 --- a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java +++ b/src/main/java/com/zdjizhi/tools/zookeeper/DistributedLock.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.zookeeper; +package com.zdjizhi.tools.zookeeper; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java b/src/main/java/com/zdjizhi/tools/zookeeper/ZookeeperUtils.java similarity index 99% rename from src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java rename to src/main/java/com/zdjizhi/tools/zookeeper/ZookeeperUtils.java index 9efbd46..f86f130 100644 --- a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java +++ b/src/main/java/com/zdjizhi/tools/zookeeper/ZookeeperUtils.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.zookeeper; +package com.zdjizhi.tools.zookeeper; import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index c98687b..5581137 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -3,11 +3,11 @@ package com.zdjizhi.topology; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.functions.FilterNullFunction; -import com.zdjizhi.utils.functions.MapCompletedFunction; -import com.zdjizhi.utils.functions.TypeMapCompletedFunction; -import com.zdjizhi.utils.kafka.KafkaConsumer; -import com.zdjizhi.utils.kafka.KafkaProducer; +import com.zdjizhi.tools.functions.FilterNullFunction; +import com.zdjizhi.tools.functions.MapCompletedFunction; +import com.zdjizhi.tools.functions.TypeMapCompletedFunction; +import com.zdjizhi.tools.kafka.KafkaConsumer; +import com.zdjizhi.tools.kafka.KafkaProducer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java deleted file mode 100644 index 1adb1d1..0000000 --- a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.zdjizhi.utils.http; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import org.apache.commons.io.IOUtils; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; - -/** - * 获取网关schema的工具类 - * - * @author qidaijie - */ -public class HttpClientUtil { - private static final Log logger = LogFactory.get(); - - /** - * 请求网关获取schema - * - * @param http 网关url - * @return schema - */ - public static String requestByGetMethod(String http) { - CloseableHttpClient httpClient = HttpClients.createDefault(); - StringBuilder entityStringBuilder; - - HttpGet get = new HttpGet(http); - BufferedReader bufferedReader = null; - CloseableHttpResponse httpResponse = null; - try { - httpResponse = httpClient.execute(get); - HttpEntity entity = httpResponse.getEntity(); - entityStringBuilder = new StringBuilder(); - if (null != entity) { - bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); - int intC; - while ((intC = bufferedReader.read()) != -1) { - char c = (char) intC; - if (c == '\n') { - break; - } - entityStringBuilder.append(c); - } - - return entityStringBuilder.toString(); - } - } catch (IOException e) { - logger.error("Get Schema from Query engine ERROR! Exception message is:" + e); - } finally { - if (httpClient != null) { - try { - httpClient.close(); - } catch (IOException e) { - logger.error("Close HTTP Client ERROR! Exception messgae is:" + e); - } - } - if (httpResponse != null) { - try { - httpResponse.close(); - } catch (IOException e) { - logger.error("Close httpResponse ERROR! Exception messgae is:" + e); - } - } - if (bufferedReader != null) { - IOUtils.closeQuietly(bufferedReader); - } - } - return ""; - } -}