dimensions, JSONObject originalLog, String parameters, String resultKeyName, String logsKeyName) {
String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
String combinationFieldKey = combinationPars[0];
String separator = combinationPars[1];
- Object combinationFieldValue = JsonParseUtil.getValue(message, combinationFieldKey);
+ Object combinationFieldValue = originalLog.get(combinationFieldKey);
if (combinationFieldValue != null) {
- Object logsFieldValue = JsonParseUtil.getValue(message, logsKeyName);
+ Object logsFieldValue = originalLog.get(logsKeyName);
String combinationResult = logsFieldValue + separator + combinationFieldValue;
- JsonParseUtil.setValue(dimensions, resultKeyName, combinationResult);
- JsonParseUtil.setValue(message, logsKeyName, combinationResult);
+ dimensions.put(resultKeyName, combinationResult);
}
}
/**
* 根据表达式解析json
*
- * //* @param message json
+ * //* @param message meta
*
* @param expr 解析表达式
* @return 解析结果
@@ -137,4 +123,80 @@ public class ParseFunctions {
}
}
+ /**
+ * 判断是否为日志字段,是则返回对应value,否则返回原始字符串
+ *
+ * @param jsonMap 内存实体类
+ * @param param 字段名/普通字符串
+ * @return JSON.Value or String
+ */
+ @Deprecated
+ private static Object isJsonValue(Map jsonMap, String param) {
+ if (param.contains(StreamAggregateConfig.IS_JSON_KEY_TAG)) {
+ return jsonMap.get(param.substring(2));
+ } else {
+ return param;
+ }
+ }
+
+ /**
+ * IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。
+ *
+ * @param jsonMap 内存实体类
+ * @param ifParam 字段名/普通字符串
+ * @return resultA or resultB or null
+ */
+ @Deprecated
+ public static Object condition(Map jsonMap, String ifParam) {
+ Object result = null;
+ String separator = "!=";
+ try {
+ String[] split = ifParam.split(StreamAggregateConfig.FORMAT_SPLITTER);
+ if (split.length == 3) {
+ String expression = split[0];
+ Object resultA = isJsonValue(jsonMap, split[1]);
+ Object resultB = isJsonValue(jsonMap, split[2]);
+ if (expression.contains(separator)) {
+ String[] regexp = expression.split(separator);
+ Object direction = isJsonValue(jsonMap, regexp[0]);
+ if (direction instanceof Number) {
+ result = Integer.parseInt(direction.toString()) != Integer.parseInt(regexp[1]) ? resultA : resultB;
+ } else if (direction instanceof String) {
+ result = direction.equals(regexp[1]) ? resultA : resultB;
+ }
+ } else {
+ String[] regexp = expression.split(StreamAggregateConfig.IF_CONDITION_SPLITTER);
+ Object direction = isJsonValue(jsonMap, regexp[0]);
+ if (direction instanceof Number) {
+ result = Integer.parseInt(direction.toString()) == Integer.parseInt(regexp[1]) ? resultA : resultB;
+ } else if (direction instanceof String) {
+ result = direction.equals(regexp[1]) ? resultA : resultB;
+ }
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("IF function execution exception, exception information:" + e.getMessage());
+ }
+ return result;
+ }
+
+ /**
+ * 获取HLLSketch内容
+ *
+ * @param jsonMap 原始日志
+ * @param key meta key名称
+ * @return HLLSketch数据数组
+ */
+ public static String getHllSketch(JSONObject jsonMap, String key) {
+ try {
+ HllSketch hllSketchResult = (HllSketch) jsonMap.get(key);
+ if (hllSketchResult != null) {
+ return Base64.getEncoder().encodeToString(hllSketchResult.toUpdatableByteArray());
+ }
+ } catch (RuntimeException e) {
+ logger.error("HllSketch data conversion exception,data may be empty! exception:{}", e);
+ }
+ return null;
+ }
+
}
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
deleted file mode 100644
index 8555b1f..0000000
--- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
+++ /dev/null
@@ -1,140 +0,0 @@
-package com.zdjizhi.utils.json;
-
-
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.exception.AnalysisException;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author qidaijie
- * @Package PACKAGE_NAME
- * @Description:
- * @date 2021/7/1217:34
- */
-public class JsonTypeUtil {
- /**
- * String 类型检验转换方法
- *
- * @param value json value
- * @return String value
- */
- public static String checkString(Object value) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof Map) {
- return JsonMapper.toJsonString(value);
- }
-
- if (value instanceof List) {
- return JsonMapper.toJsonString(value);
- }
-
- return value.toString();
- }
-
- /**
- * array 类型检验转换方法
- *
- * @param value json value
- * @return List value
- */
- private static Map checkObject(Object value) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof Map) {
- return (Map) value;
- }
-
- throw new AnalysisException("can not cast to map, value : " + value);
- }
-
- /**
- * array 类型检验转换方法
- *
- * @param value json value
- * @return List value
- */
- private static List checkArray(Object value) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof List) {
- return (List) value;
- }
-
- throw new AnalysisException("can not cast to List, value : " + value);
- }
-
- private static Long checkLong(Object value) {
- if (value == null) {
- return null;
- }
-
- return TypeUtils.castToLong(value);
- }
-
- /**
- * long 类型检验转换方法,若为空返回基础值
- *
- * @param value json value
- * @return Long value
- */
- public static long checkLongValue(Object value) {
-
- Long longVal = TypeUtils.castToLong(value);
-
- if (longVal == null) {
- return 0L;
- }
-
- return longVal;
- }
-
- /**
- * Double 类型校验转换方法
- *
- * @param value json value
- * @return Double value
- */
- private static Double checkDouble(Object value) {
- if (value == null) {
- return null;
- }
-
- return TypeUtils.castToDouble(value);
- }
-
-
- private static Integer checkInt(Object value) {
- if (value == null) {
- return null;
- }
-
- return TypeUtils.castToInt(value);
- }
-
-
- /**
- * int 类型检验转换方法,若为空返回基础值
- *
- * @param value json value
- * @return int value
- */
- private static int getIntValue(Object value) {
-
- Integer intVal = TypeUtils.castToInt(value);
-
- if (intVal == null) {
- return 0;
- }
- return intVal;
- }
-
-}
diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
deleted file mode 100644
index 9cb0631..0000000
--- a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
+++ /dev/null
@@ -1,172 +0,0 @@
-package com.zdjizhi.utils.json;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.StreamAggregateConfig;
-import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.exception.AnalysisException;
-
-
-/**
- * @author qidaijie
- * @Package PACKAGE_NAME
- * @Description:
- * @date 2021/7/1218:20
- */
-public class TypeUtils {
- private static final Log logger = LogFactory.get();
-
- /**
- * Integer 类型判断方法
- *
- * @param value json value
- * @return Integer value or null
- */
- public static Object castToIfFunction(Object value) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof String) {
- return value.toString();
- }
-
- if (value instanceof Integer) {
- return ((Number) value).intValue();
- }
-
- if (value instanceof Long) {
- return ((Number) value).longValue();
- }
-
- if (value instanceof Boolean) {
- return (Boolean) value ? 1 : 0;
- }
-
- throw new AnalysisException("can not cast to int, value : " + value);
- }
-
- /**
- * Integer 类型判断方法
- *
- * @param value json value
- * @return Integer value or null
- */
- static Integer castToInt(Object value) {
-
- if (value == null) {
- return null;
- }
-
- if (value instanceof Integer) {
- return (Integer) value;
- }
-
- //此判断数值超范围不抛出异常,会截取成对应类型数值
-// if (value instanceof Number) {
-// return ((Number) value).intValue();
-// }
-
- if (value instanceof String) {
- String strVal = (String) value;
- if (StringUtil.isBlank(strVal)) {
- return null;
- }
-
- //将 10,20 类数据转换为10
- if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) {
- strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0];
- }
-
- try {
- return Integer.parseInt(strVal);
- } catch (NumberFormatException ex) {
- logger.error("String change Integer Error,The error Str is:" + strVal);
- }
- }
-
- if (value instanceof Boolean) {
- return (Boolean) value ? 1 : 0;
- }
-
- throw new AnalysisException("can not cast to int, value : " + value);
- }
-
- /**
- * Double类型判断方法
- *
- * @param value json value
- * @return double value or null
- */
- static Double castToDouble(Object value) {
-
- if (value instanceof Double) {
- return (Double) value;
- }
-
- //此判断数值超范围不抛出异常,会截取成对应类型数值
-// if (value instanceof Number) {
-// return ((Number) value).doubleValue();
-// }
-
- if (value instanceof String) {
- String strVal = (String) value;
-
- if (StringUtil.isBlank(strVal)) {
- return null;
- }
-
- //将 10,20 类数据转换为10
- if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) {
- strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0];
- }
-
- try {
- return Double.parseDouble(strVal);
- } catch (NumberFormatException ex) {
- logger.error("String change Double Error,The error Str is:" + strVal);
- }
- }
-
- throw new AnalysisException("can not cast to double, value : " + value);
- }
-
- /**
- * Long类型判断方法
- *
- * @param value json value
- * @return (Long)value or null
- */
- static Long castToLong(Object value) {
- if (value == null) {
- return null;
- }
-
-// 此判断数值超范围不抛出异常,会截取成对应类型数值
- if (value instanceof Number) {
- return ((Number) value).longValue();
- }
-
- if (value instanceof String) {
- String strVal = (String) value;
-
- if (StringUtil.isBlank(strVal)) {
- return null;
- }
-
- //将 10,20 类数据转换为10
- if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) {
- strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0];
- }
-
- try {
- return Long.parseLong(strVal);
- } catch (NumberFormatException ex) {
- logger.error("String change Long Error,The error Str is:" + strVal);
- }
- }
-
- throw new AnalysisException("can not cast to long, value : " + value);
- }
-
-}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index 545a0e3..d9f1b37 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -22,6 +22,7 @@ public class KafkaConsumer {
properties.put("session.timeout.ms", StreamAggregateConfig.SESSION_TIMEOUT_MS);
properties.put("max.poll.records", StreamAggregateConfig.MAX_POLL_RECORDS);
properties.put("max.partition.fetch.bytes", StreamAggregateConfig.MAX_PARTITION_FETCH_BYTES);
+ properties.put("partition.discovery.interval.ms", "10000");
CertUtils.chooseCert(StreamAggregateConfig.SOURCE_KAFKA_SERVERS, properties);
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/meta/MetaDataParse.java
similarity index 65%
rename from src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
rename to src/main/java/com/zdjizhi/utils/meta/MetaDataParse.java
index 4b3f75a..21073ac 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/meta/MetaDataParse.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.json;
+package com.zdjizhi.utils.meta;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
@@ -10,28 +10,22 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.StreamAggregateConfig;
-import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
-import net.sf.cglib.beans.BeanMap;
import java.util.*;
import java.util.concurrent.Executor;
+
/**
* 使用FastJson解析json的工具类
*
* @author qidaijie
*/
-public class JsonParseUtil {
+public class MetaDataParse {
private static final Log logger = LogFactory.get();
private static Properties propNacos = new Properties();
- /**
- * 获取actions所有的计算函数
- */
- private static HashMap actionMap = new HashMap<>(16);
-
/**
* 解析metrics指标字段信息
*/
@@ -57,11 +51,6 @@ public class JsonParseUtil {
*/
private static ArrayList metricsFiledNameList = new ArrayList<>();
- /**
- * 解析hierarchy函数,获取切分信息
- */
- private static String[] hierarchy;
-
/**
* 解析时间戳字段名称
*/
@@ -98,70 +87,6 @@ public class JsonParseUtil {
}
}
- /**
- * 获取属性值的方法
- *
- * @param jsonMap 原始日志
- * @param key josn key名称
- * @return 属性的值
- */
- public static Object getValue(Map jsonMap, String key) {
- try {
- return jsonMap.getOrDefault(key, null);
- } catch (RuntimeException e) {
- logger.error("Get the JSON value is abnormal,The key is :" + key + "error message is :" + e);
- return null;
- }
- }
-
- /**
- * long 类型检验转换方法,若为空返回基础值
- *
- * @return Long value
- */
- public static Long getLong(Map jsonMap, String property) {
- Object value = jsonMap.getOrDefault(property, null);
- Long longVal = TypeUtils.castToLong(value);
-
- if (longVal == null) {
- return 0L;
- }
-
- return longVal;
- }
-
- public static String getString(Map jsonMap, String property) {
- Object value = jsonMap.getOrDefault(property, null);
- if (value == null) {
- return null;
- }
-
- if (value instanceof Map) {
- return JsonMapper.toJsonString(value);
- }
-
- if (value instanceof List) {
- return JsonMapper.toJsonString(value);
- }
-
- return value.toString();
- }
-
- /**
- * 更新属性值的方法
- *
- * @param jsonMap 原始日志json map
- * @param property 更新的key
- * @param value 更新的值
- */
- public static void setValue(Map jsonMap, String property, Object value) {
- try {
- jsonMap.put(property, value);
- } catch (RuntimeException e) {
- logger.error("The JSON set value is abnormal,the error message is :", e);
- }
- }
-
/**
* 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象
* 用于反射生成schema类型的对象的一个map集合
@@ -170,12 +95,6 @@ public class JsonParseUtil {
clearCacheMap();
DocumentContext parse = JsonPath.parse(schema);
- List