新增基于schema填充默认值功能(TSG-11886)
This commit is contained in:
@@ -25,11 +25,6 @@ import java.util.regex.Pattern;
|
||||
class TransFunction {
|
||||
private static final Log logger = LogFactory.get();
|
||||
|
||||
/**
|
||||
* 校验数字正则
|
||||
*/
|
||||
private static final Pattern PATTERN = Pattern.compile("[0-9]*");
|
||||
|
||||
/**
|
||||
* IP定位库工具类
|
||||
*/
|
||||
@@ -130,7 +125,7 @@ class TransFunction {
|
||||
Long teid = null;
|
||||
String[] exprs = param.split(FlowWriteConfig.FORMAT_SPLITTER);
|
||||
for (String expr : exprs) {
|
||||
Long value = JsonPathUtil.getLongValue(logValue, expr);
|
||||
Long value = JsonPathUtil.getTeidValue(logValue, expr);
|
||||
if (value != null) {
|
||||
teid = value;
|
||||
break;
|
||||
|
||||
@@ -9,7 +9,6 @@ import com.alibaba.nacos.api.PropertyKeyConst;
|
||||
import com.alibaba.nacos.api.config.ConfigService;
|
||||
import com.alibaba.nacos.api.config.listener.Listener;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.zdjizhi.common.FlowWriteConfig;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
|
||||
@@ -32,10 +31,15 @@ public class JsonParseUtil {
|
||||
private static ArrayList<String> dropList = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* 在内存中加载反射类用的map
|
||||
* 获取schema指定的有效字段及类型
|
||||
*/
|
||||
private static HashMap<String, Class> jsonFieldsMap;
|
||||
|
||||
/**
|
||||
* 获取包含默认值的字段
|
||||
*/
|
||||
private static HashMap<String, Object> defaultFieldsMap = new HashMap<>(16);
|
||||
|
||||
/**
|
||||
* 获取任务列表
|
||||
* list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如:
|
||||
@@ -162,7 +166,8 @@ public class JsonParseUtil {
|
||||
*/
|
||||
public static Map<String, Object> typeTransform(Map<String, Object> jsonMap) throws RuntimeException {
|
||||
JsonParseUtil.dropJsonField(jsonMap);
|
||||
HashMap<String, Object> tmpMap = new HashMap<>(192);
|
||||
JsonParseUtil.setFieldDefault(jsonMap);
|
||||
HashMap<String, Object> tmpMap = new HashMap<>(256);
|
||||
for (String key : jsonMap.keySet()) {
|
||||
if (jsonFieldsMap.containsKey(key)) {
|
||||
String simpleName = jsonFieldsMap.get(key).getSimpleName();
|
||||
@@ -197,16 +202,36 @@ public class JsonParseUtil {
|
||||
return jobList;
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除schema内指定的无效字段(jackson)
|
||||
*
|
||||
* @param jsonMap 原始日志
|
||||
*/
|
||||
public static void dropJsonField(Map<String, Object> jsonMap) {
|
||||
for (String field : dropList) {
|
||||
jsonMap.remove(field);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象
|
||||
* <p>
|
||||
* // * @param http 网关schema地址
|
||||
* 根据schema内指定的默认值,给数据赋值。
|
||||
*
|
||||
* @param jsonMap 原始日志
|
||||
*/
|
||||
private static void setFieldDefault(Map<String, Object> jsonMap) {
|
||||
for (String key : defaultFieldsMap.keySet()) {
|
||||
jsonMap.put(key, defaultFieldsMap.get(key));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 通过schema来获取所需的字段及字段类型。
|
||||
*
|
||||
* @return 用于反射生成schema类型的对象的一个map集合
|
||||
*/
|
||||
private static HashMap<String, Class> getFieldsFromSchema(String schema) {
|
||||
HashMap<String, Class> map = new HashMap<>(16);
|
||||
HashMap<String, Class> map = new HashMap<>(256);
|
||||
|
||||
//获取fields,并转化为数组,数组的每个元素都是一个name doc type
|
||||
JSONObject schemaJson = new JSONObject(schema, false, true);
|
||||
@@ -215,10 +240,13 @@ public class JsonParseUtil {
|
||||
for (Object field : fields) {
|
||||
String filedStr = field.toString();
|
||||
if (checkKeepField(filedStr)) {
|
||||
String name = JsonPath.read(filedStr, "$.name").toString();
|
||||
String type = JsonPath.read(filedStr, "$.type").toString();
|
||||
if (type.contains("{")) {
|
||||
type = JsonPath.read(filedStr, "$.type.type").toString();
|
||||
JSONObject fieldJson = new JSONObject(filedStr, false, true);
|
||||
String name = fieldJson.getStr("name");
|
||||
String type = fieldJson.getStr("type");
|
||||
if (fieldJson.containsKey("default")) {
|
||||
System.out.println(fieldJson.toString());
|
||||
System.out.println(fieldJson.get("default"));
|
||||
defaultFieldsMap.put(name, fieldJson.get("default"));
|
||||
}
|
||||
//组合用来生成实体类的map
|
||||
map.put(name, getClassName(type));
|
||||
@@ -240,9 +268,9 @@ public class JsonParseUtil {
|
||||
JSONObject fieldJson = new JSONObject(message, false, true);
|
||||
boolean hasDoc = fieldJson.containsKey("doc");
|
||||
if (hasDoc) {
|
||||
boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility");
|
||||
if (isHiveVi) {
|
||||
String visibility = JsonPath.read(message, "$.doc.visibility").toString();
|
||||
JSONObject doc = new JSONObject(fieldJson.getStr("doc"), false, true);
|
||||
if (doc.containsKey("visibility")) {
|
||||
String visibility = doc.getStr("visibility");
|
||||
if (FlowWriteConfig.VISIBILITY.equals(visibility)) {
|
||||
isKeepField = false;
|
||||
}
|
||||
@@ -251,17 +279,6 @@ public class JsonParseUtil {
|
||||
return isKeepField;
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除schema内指定的无效字段(jackson)
|
||||
*
|
||||
* @param jsonMap
|
||||
*/
|
||||
public static void dropJsonField(Map<String, Object> jsonMap) {
|
||||
for (String field : dropList) {
|
||||
jsonMap.remove(field);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析schema,解析之后返回一个任务列表 (useList toList funcList paramlist)
|
||||
*
|
||||
@@ -315,6 +332,7 @@ public class JsonParseUtil {
|
||||
jobList.clear();
|
||||
jsonFieldsMap.clear();
|
||||
dropList.clear();
|
||||
defaultFieldsMap.clear();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -17,62 +17,14 @@ import java.util.ArrayList;
|
||||
public class JsonPathUtil {
|
||||
private static final Log logger = LogFactory.get();
|
||||
|
||||
|
||||
/**
|
||||
* 通过 josnPath 解析,返回String类型数据
|
||||
* 通过 josnPath 解析,返回TEID数据
|
||||
*
|
||||
* @param message json数据
|
||||
* @param expr 解析表达式
|
||||
* @return 返回值
|
||||
*/
|
||||
public static String getStringValue(String message, String expr) {
|
||||
String result = null;
|
||||
try {
|
||||
if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
|
||||
ArrayList<Object> read = JsonPath.parse(message).read(expr);
|
||||
if (read.size() >= 1) {
|
||||
result = read.get(0).toString();
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("JSONPath parsing json returns String data exception" + e);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 通过 josnPath 解析,返回Long类型数据
|
||||
*
|
||||
* @param message json数据
|
||||
* @param expr 解析表达式
|
||||
* @return 返回值
|
||||
*/
|
||||
public static Integer getIntegerValue(String message, String expr) {
|
||||
Integer result = null;
|
||||
try {
|
||||
if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
|
||||
ArrayList<Object> read = JsonPath.parse(message).read(expr);
|
||||
if (read.size() >= 1) {
|
||||
result = Integer.parseInt(read.get(0).toString());
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("JSONPath parsing json returns Long data exception" + e);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过 josnPath 解析,返回Long类型数据
|
||||
*
|
||||
* @param message json数据
|
||||
* @param expr 解析表达式
|
||||
* @return 返回值
|
||||
*/
|
||||
public static Long getLongValue(String message, String expr) {
|
||||
public static Long getTeidValue(String message, String expr) {
|
||||
Long result = null;
|
||||
try {
|
||||
if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
|
||||
|
||||
@@ -41,6 +41,7 @@ public class TimestampDeserializationSchema implements KafkaDeserializationSchem
|
||||
return json;
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("KafkaConsumer Deserialize failed,The exception is : " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
||||
Reference in New Issue
Block a user