diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java index 028a9b4..7e16d72 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java @@ -25,11 +25,6 @@ import java.util.regex.Pattern; class TransFunction { private static final Log logger = LogFactory.get(); - /** - * 校验数字正则 - */ - private static final Pattern PATTERN = Pattern.compile("[0-9]*"); - /** * IP定位库工具类 */ @@ -130,7 +125,7 @@ class TransFunction { Long teid = null; String[] exprs = param.split(FlowWriteConfig.FORMAT_SPLITTER); for (String expr : exprs) { - Long value = JsonPathUtil.getLongValue(logValue, expr); + Long value = JsonPathUtil.getTeidValue(logValue, expr); if (value != null) { teid = value; break; diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java index 0ee7d3b..106fdb9 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -9,7 +9,6 @@ import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.Listener; import com.alibaba.nacos.api.exception.NacosException; -import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.utils.StringUtil; @@ -32,10 +31,15 @@ public class JsonParseUtil { private static ArrayList dropList = new ArrayList<>(); /** - * 在内存中加载反射类用的map + * 获取schema指定的有效字段及类型 */ private static HashMap jsonFieldsMap; + /** + * 获取包含默认值的字段 + */ + private static HashMap defaultFieldsMap = new HashMap<>(16); + /** * 获取任务列表 * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如: @@ -162,7 +166,8 @@ public class JsonParseUtil { */ public static Map typeTransform(Map jsonMap) throws RuntimeException { JsonParseUtil.dropJsonField(jsonMap); - HashMap tmpMap = new HashMap<>(192); + JsonParseUtil.setFieldDefault(jsonMap); + HashMap tmpMap = new HashMap<>(256); for (String key : jsonMap.keySet()) { if (jsonFieldsMap.containsKey(key)) { String simpleName = jsonFieldsMap.get(key).getSimpleName(); @@ -197,16 +202,36 @@ public class JsonParseUtil { return jobList; } + /** + * 删除schema内指定的无效字段(jackson) + * + * @param jsonMap 原始日志 + */ + public static void dropJsonField(Map jsonMap) { + for (String field : dropList) { + jsonMap.remove(field); + } + } /** - * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 - *

- * // * @param http 网关schema地址 + * 根据schema内指定的默认值,给数据赋值。 + * + * @param jsonMap 原始日志 + */ + private static void setFieldDefault(Map jsonMap) { + for (String key : defaultFieldsMap.keySet()) { + jsonMap.put(key, defaultFieldsMap.get(key)); + } + } + + + /** + * 通过schema来获取所需的字段及字段类型。 * * @return 用于反射生成schema类型的对象的一个map集合 */ private static HashMap getFieldsFromSchema(String schema) { - HashMap map = new HashMap<>(16); + HashMap map = new HashMap<>(256); //获取fields,并转化为数组,数组的每个元素都是一个name doc type JSONObject schemaJson = new JSONObject(schema, false, true); @@ -215,10 +240,13 @@ public class JsonParseUtil { for (Object field : fields) { String filedStr = field.toString(); if (checkKeepField(filedStr)) { - String name = JsonPath.read(filedStr, "$.name").toString(); - String type = JsonPath.read(filedStr, "$.type").toString(); - if (type.contains("{")) { - type = JsonPath.read(filedStr, "$.type.type").toString(); + JSONObject fieldJson = new JSONObject(filedStr, false, true); + String name = fieldJson.getStr("name"); + String type = fieldJson.getStr("type"); + if (fieldJson.containsKey("default")) { + System.out.println(fieldJson.toString()); + System.out.println(fieldJson.get("default")); + defaultFieldsMap.put(name, fieldJson.get("default")); } //组合用来生成实体类的map map.put(name, getClassName(type)); @@ -240,9 +268,9 @@ public class JsonParseUtil { JSONObject fieldJson = new JSONObject(message, false, true); boolean hasDoc = fieldJson.containsKey("doc"); if (hasDoc) { - boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility"); - if (isHiveVi) { - String visibility = JsonPath.read(message, "$.doc.visibility").toString(); + JSONObject doc = new JSONObject(fieldJson.getStr("doc"), false, true); + if (doc.containsKey("visibility")) { + String visibility = doc.getStr("visibility"); if (FlowWriteConfig.VISIBILITY.equals(visibility)) { isKeepField = false; } @@ -251,17 +279,6 @@ public class JsonParseUtil { return isKeepField; } - /** - * 删除schema内指定的无效字段(jackson) - * - * @param jsonMap - */ - public static void dropJsonField(Map jsonMap) { - for (String field : dropList) { - jsonMap.remove(field); - } - } - /** * 解析schema,解析之后返回一个任务列表 (useList toList funcList paramlist) * @@ -315,6 +332,7 @@ public class JsonParseUtil { jobList.clear(); jsonFieldsMap.clear(); dropList.clear(); + defaultFieldsMap.clear(); } } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java index a3fb6a6..70b4b19 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java @@ -17,62 +17,14 @@ import java.util.ArrayList; public class JsonPathUtil { private static final Log logger = LogFactory.get(); - /** - * 通过 josnPath 解析,返回String类型数据 + * 通过 josnPath 解析,返回TEID数据 * * @param message json数据 * @param expr 解析表达式 * @return 返回值 */ - public static String getStringValue(String message, String expr) { - String result = null; - try { - if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) { - ArrayList read = JsonPath.parse(message).read(expr); - if (read.size() >= 1) { - result = read.get(0).toString(); - } - } - } catch (RuntimeException e) { - logger.error("JSONPath parsing json returns String data exception" + e); - } - - return result; - } - - - /** - * 通过 josnPath 解析,返回Long类型数据 - * - * @param message json数据 - * @param expr 解析表达式 - * @return 返回值 - */ - public static Integer getIntegerValue(String message, String expr) { - Integer result = null; - try { - if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) { - ArrayList read = JsonPath.parse(message).read(expr); - if (read.size() >= 1) { - result = Integer.parseInt(read.get(0).toString()); - } - } - } catch (RuntimeException e) { - logger.error("JSONPath parsing json returns Long data exception" + e); - } - - return result; - } - - /** - * 通过 josnPath 解析,返回Long类型数据 - * - * @param message json数据 - * @param expr 解析表达式 - * @return 返回值 - */ - public static Long getLongValue(String message, String expr) { + public static Long getTeidValue(String message, String expr) { Long result = null; try { if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) { diff --git a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java index 920ffab..e38f9e6 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java +++ b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java @@ -41,6 +41,7 @@ public class TimestampDeserializationSchema implements KafkaDeserializationSchem return json; } catch (RuntimeException e) { logger.error("KafkaConsumer Deserialize failed,The exception is : " + e.getMessage()); + e.printStackTrace(); } } return null;