diff --git a/pom.xml b/pom.xml index e4f3384..fe9656f 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi log-olap-analysis-schema - 220822-VSYS + 230317-DataSketches log-olap-analysis-schema http://www.example.com @@ -16,7 +16,7 @@ nexus Team Nexus Repository - http://192.168.40.125:8099/content/groups/public + http://192.168.40.153:8099/content/groups/public @@ -188,7 +188,7 @@ com.jayway.jsonpath json-path - 2.4.0 + 2.7.0 @@ -209,6 +209,20 @@ com.alibaba.nacos nacos-client ${nacos.version} + + + com.google.guava + guava + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + @@ -218,6 +232,12 @@ test + + org.apache.datasketches + datasketches-java + 3.2.0 + + diff --git a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java index 0243f97..9ea9df5 100644 --- a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java +++ b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java @@ -15,8 +15,24 @@ public class StreamAggregateConfig { encryptor.setPassword("galaxy"); } + /** + * 默认的切分符号 + */ public static final String FORMAT_SPLITTER = ","; + /** + * 协议分隔符,需要转义 + */ public static final String PROTOCOL_SPLITTER = "\\."; + /** + * 标识字段为日志字段还是schema指定字段 + */ + public static final String IS_JSON_KEY_TAG = "$."; + + /** + * if函数连接分隔符 + */ + public static final String IF_CONDITION_SPLITTER = "="; + /** * Nacos @@ -27,7 +43,7 @@ public class StreamAggregateConfig { public static final String NACOS_PIN = StreamAggregateConfigurations.getStringProperty(1, "nacos.pin"); public static final String NACOS_GROUP = StreamAggregateConfigurations.getStringProperty(1, "nacos.group"); public static final String NACOS_USERNAME = StreamAggregateConfigurations.getStringProperty(1, "nacos.username"); - + /** * System */ diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java index c2d4f31..1813a89 100644 --- a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java +++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java @@ -3,10 +3,10 @@ package com.zdjizhi.topology; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.StreamAggregateConfig; -import com.zdjizhi.utils.functions.*; import com.zdjizhi.utils.functions.keyby.FirstKeyByFunction; import com.zdjizhi.utils.functions.keyby.SecondKeyByFunction; import com.zdjizhi.utils.functions.parse.ParseMapFunction; +import com.zdjizhi.utils.functions.result.ResultFlatMapFunction; import com.zdjizhi.utils.functions.statistics.FirstCountWindowFunction; import com.zdjizhi.utils.functions.statistics.SecondCountWindowFunction; import com.zdjizhi.utils.kafka.KafkaConsumer; @@ -21,6 +21,8 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTime import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import java.util.Map; + /** * @author qidaijie @@ -38,29 +40,37 @@ public class StreamAggregateTopology { //两个输出之间的最大时间 (单位milliseconds) environment.setBufferTimeout(StreamAggregateConfig.BUFFER_TIMEOUT); + //解析原始日志 DataStream streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()) .setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM).name(StreamAggregateConfig.SOURCE_KAFKA_TOPIC); - SingleOutputStreamOperator> parseDataMap = streamSource.map(new ParseMapFunction()) + //解析原始日志初步聚合计算,增加自定义key 缓解数据倾斜 + SingleOutputStreamOperator>> parseDataMap = streamSource.map(new ParseMapFunction()) .name("ParseDataMap") .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM); - WindowedStream, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction()) + //初步聚合计算,增加自定义key 缓解数据倾斜 + WindowedStream>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction()) .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.FIRST_COUNT_WINDOW_TIME))); - SingleOutputStreamOperator> metricCountWindow = firstWindow.process(new FirstCountWindowFunction()) + //初次聚合计算窗口 + SingleOutputStreamOperator>> metricCountWindow = firstWindow.process(new FirstCountWindowFunction()) .name("FirstCountWindow") .setParallelism(StreamAggregateConfig.FIRST_WINDOW_PARALLELISM); - WindowedStream, String, TimeWindow> secondWindow = metricCountWindow.keyBy(new SecondKeyByFunction()) + //二次聚合计算,使用业务的key 进行数据汇总 + WindowedStream>, String, TimeWindow> secondWindow = metricCountWindow.keyBy(new SecondKeyByFunction()) .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.SECOND_COUNT_WINDOW_TIME))); - SingleOutputStreamOperator secondCountWindow = secondWindow.process(new SecondCountWindowFunction()) + //二次聚合计算窗口 + SingleOutputStreamOperator> secondCountWindow = secondWindow.process(new SecondCountWindowFunction()) .name("SecondCountWindow").setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM); + //拆解结果数据按protocol id循环输出 SingleOutputStreamOperator resultFlatMap = secondCountWindow.flatMap(new ResultFlatMapFunction()) .name("ResultFlatMap").setParallelism(StreamAggregateConfig.SINK_PARALLELISM); + //输出到kafka resultFlatMap.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka") .setParallelism(StreamAggregateConfig.SINK_PARALLELISM).name(StreamAggregateConfig.SINK_KAFKA_TOPIC); diff --git a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java deleted file mode 100644 index 57e8a2c..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.zdjizhi.utils.functions; - -import com.zdjizhi.common.StreamAggregateConfig; -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.json.JsonParseUtil; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.util.Collector; - -import java.util.Arrays; -import java.util.Map; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/7/2114:52 - */ -public class ResultFlatMapFunction implements FlatMapFunction { - private static String[] jobList = JsonParseUtil.getHierarchy(); - private static final String APP_NAME = "app_name"; - - @Override - @SuppressWarnings("unchecked") - public void flatMap(String value, Collector out) throws Exception { - StringBuffer stringBuffer = new StringBuffer(); - String name = jobList[0]; - Map jsonObject = (Map) JsonMapper.fromJsonString(value, Map.class); - String protocol = JsonParseUtil.getString(jsonObject, name); - String appName = JsonParseUtil.getString(jsonObject, APP_NAME); - jsonObject.remove(APP_NAME); - if (StringUtil.isNotBlank(protocol)) { - String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER); - for (String proto : protocolIds) { - if (StringUtil.isBlank(stringBuffer.toString())) { - stringBuffer.append(proto); - jsonObject.put(name, stringBuffer.toString()); - out.collect(JsonMapper.toJsonString(jsonObject)); - } else { - stringBuffer.append(jobList[1]).append(proto); - if (proto.equals(appName)) { - jsonObject.put(APP_NAME, appName); - } - jsonObject.put(name, stringBuffer.toString()); - out.collect(JsonMapper.toJsonString(jsonObject)); - } - } - } - } -} diff --git a/src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java index 7783676..6512f0f 100644 --- a/src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java @@ -6,16 +6,18 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; +import java.util.Map; + /** * @author qidaijie * @Package com.zdjizhi.utils.functions * @Description: * @date 2021/7/2112:13 */ -public class FirstKeyByFunction implements KeySelector, String> { +public class FirstKeyByFunction implements KeySelector>, String> { @Override - public String getKey(Tuple3 value) throws Exception { + public String getKey(Tuple3> value) throws Exception { //以map拼接的key分组 return value.f0; } diff --git a/src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java index fd81d6e..37bf83a 100644 --- a/src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java @@ -6,16 +6,18 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; +import java.util.Map; + /** * @author qidaijie * @Package com.zdjizhi.utils.functions * @Description: * @date 2021/7/2112:13 */ -public class SecondKeyByFunction implements KeySelector, String> { +public class SecondKeyByFunction implements KeySelector>, String> { @Override - public String getKey(Tuple2 value) throws Exception { + public String getKey(Tuple2> value) throws Exception { //以map拼接的key分组 return value.f0; } diff --git a/src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java index 4d69c82..c102788 100644 --- a/src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java @@ -22,12 +22,12 @@ import java.util.concurrent.ThreadLocalRandom; * @Description: * @date 2021/5/2715:01 */ -public class ParseMapFunction implements MapFunction> { +public class ParseMapFunction implements MapFunction>> { private static final Log logger = LogFactory.get(); @Override @SuppressWarnings("unchecked") - public Tuple3 map(String message) { + public Tuple3> map(String message) { try { ArrayList jobList = JsonParseUtil.getTransformsList(); HashMap dimensionsMap = JsonParseUtil.getDimensionsMap(); @@ -35,6 +35,7 @@ public class ParseMapFunction implements MapFunction originalLog = (Map) JsonMapper.fromJsonString(message, Map.class); Map dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, originalLog); if (ParseFunctions.filterLogs(originalLog)) { + Map metricsLog = ParseFunctions.getMetricsLog(originalLog); for (String[] strings : jobList) { //函数名称 String function = strings[0]; @@ -44,7 +45,6 @@ public class ParseMapFunction implements MapFunction(key, JsonMapper.toJsonString(dimensionsObj), ParseFunctions.getMetricsLog(originalLog)); + return new Tuple3<>(key, JsonMapper.toJsonString(dimensionsObj), metricsLog); default: break; } @@ -74,9 +74,9 @@ public class ParseMapFunction implements MapFunction("", "", ""); + return new Tuple3<>("", "", null); } - return new Tuple3<>("", "", ""); + return new Tuple3<>("", "", null); } } diff --git a/src/main/java/com/zdjizhi/utils/functions/result/ResultFlatMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/result/ResultFlatMapFunction.java new file mode 100644 index 0000000..1691ed8 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/result/ResultFlatMapFunction.java @@ -0,0 +1,51 @@ +package com.zdjizhi.utils.functions.result; + +import com.zdjizhi.common.StreamAggregateConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.json.JsonParseUtil; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.util.Collector; + +import java.util.Map; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/7/2114:52 + */ +public class ResultFlatMapFunction implements FlatMapFunction, String> { + private static final String PROTOCOL_ID_KEY = "protocol_stack_id"; + private static final String APP_NAME_KEY = "app_name"; + private static final String HLL_SKETCH_KEY = "client_ip_sketch"; + + + @Override + @SuppressWarnings("unchecked") + public void flatMap(Map jsonObject, Collector out) throws Exception { + String protocol = JsonParseUtil.getString(jsonObject, PROTOCOL_ID_KEY); + if (jsonObject.containsKey(HLL_SKETCH_KEY)){ + JsonParseUtil.setValue(jsonObject, HLL_SKETCH_KEY, JsonParseUtil.getHllSketch(jsonObject, HLL_SKETCH_KEY)); + } + out.collect(JsonMapper.toJsonString(jsonObject)); + jsonObject.remove(APP_NAME_KEY); + + StringBuilder stringBuilder = new StringBuilder(); + if (StringUtil.isNotBlank(protocol)) { + String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER); + int protocolIdsNum = protocolIds.length; + for (int i = 0; i < protocolIdsNum - 1; i++) { + if (StringUtil.isBlank(stringBuilder.toString())) { + stringBuilder.append(protocolIds[i]); + jsonObject.put(PROTOCOL_ID_KEY, stringBuilder.toString()); + out.collect(JsonMapper.toJsonString(jsonObject)); + } else { + stringBuilder.append(".").append(protocolIds[i]); + jsonObject.put(PROTOCOL_ID_KEY, stringBuilder.toString()); + out.collect(JsonMapper.toJsonString(jsonObject)); + } + } + } + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java index 93844be..4bca146 100644 --- a/src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java @@ -5,6 +5,7 @@ import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.general.MetricFunctions; import com.zdjizhi.utils.general.ParseFunctions; import com.zdjizhi.utils.json.JsonParseUtil; +import org.apache.datasketches.hll.HllSketch; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; @@ -22,35 +23,27 @@ import java.util.Map; * @Description: * @date 2021/7/2113:55 */ -public class FirstCountWindowFunction extends ProcessWindowFunction, Tuple2, String, TimeWindow> { +public class FirstCountWindowFunction extends ProcessWindowFunction>, Tuple2>, String, TimeWindow> { private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class); - private HashMap> cacheMap = new HashMap<>(16); + private HashMap> cacheMap = new HashMap<>(32); @Override @SuppressWarnings("unchecked") - public void process(String key, Context context, Iterable> input, Collector> output) { + public void process(String key, Context context, Iterable>> input, Collector>> output) { try { HashMap metricsMap = JsonParseUtil.getMetricFunctionsMap(); - HashMap actionMap = JsonParseUtil.getActionMap(); - for (Tuple3 tuple : input) { - String groupKey = tuple.f0; - String protocol = groupKey.substring(0, groupKey.indexOf("@")); + for (Tuple3> tuple : input) { String dimensions = tuple.f1; - String metrics = tuple.f2; - //action中某个协议的所有action,如果没有就默认 - String[] protocolMetrics = actionMap.getOrDefault(protocol, actionMap.get("Default")); - if (StringUtil.isNotBlank(metrics)) { - Map dimensionsObj = (Map) JsonMapper.fromJsonString(dimensions, Map.class); - Map metricsObj = (Map) JsonMapper.fromJsonString(metrics, Map.class); + Map metrics = tuple.f2; + if (metrics.size() != 0) { + Map cacheMessage = cacheMap.getOrDefault(dimensions, (Map) JsonMapper.fromJsonString(dimensions, Map.class)); - Map cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj); - for (String protocolMetric : protocolMetrics) { - String[] functions = metricsMap.get(protocolMetric); + for (String resultKeyName : metricsMap.keySet()) { + String[] functions = metricsMap.get(resultKeyName); String function = functions[0]; String fieldName = functions[1]; - functionSet(function, cacheMessage, protocolMetric, cacheMessage.get(protocolMetric), JsonParseUtil.getValue(metricsObj, fieldName)); - + functionSet(function, cacheMessage, resultKeyName, JsonParseUtil.getValue(metrics, fieldName)); } cacheMap.put(dimensions, cacheMessage); } @@ -58,8 +51,7 @@ public class FirstCountWindowFunction extends ProcessWindowFunction resultMap = cacheMap.get(dimensions); - output.collect(new Tuple2<>(dimensions, JsonMapper.toJsonString(resultMap))); + output.collect(new Tuple2<>(dimensions, cacheMap.get(dimensions))); } } @@ -71,27 +63,25 @@ public class FirstCountWindowFunction extends ProcessWindowFunction cacheMessage, String resultName, Object nameValue, Object fieldNameValue) { + private static void functionSet(String function, Map cacheMessage, String resultKeyName, Object fieldNameValue) { switch (function) { case "sum": - cacheMessage.put(resultName, MetricFunctions.longSum(nameValue, fieldNameValue)); + cacheMessage.put(resultKeyName, MetricFunctions.longSum(cacheMessage.get(resultKeyName), fieldNameValue)); break; case "count": - cacheMessage.put(resultName, MetricFunctions.count(nameValue)); + cacheMessage.put(resultKeyName, MetricFunctions.count(cacheMessage.get(resultKeyName))); break; - case "unique_sip_num": - //TODO - break; - case "unique_cip_num": - //TODO + case "HLLSketchBuild": + cacheMessage.put(resultKeyName, MetricFunctions.uniqueHllSketch((HllSketch) cacheMessage.get(resultKeyName), fieldNameValue.toString())); break; default: break; diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java index c0b2091..c09e2f6 100644 --- a/src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java @@ -4,6 +4,7 @@ import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.general.MetricFunctions; import com.zdjizhi.utils.json.JsonParseUtil; +import org.apache.datasketches.hll.HllSketch; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -20,28 +21,27 @@ import java.util.Map; * @Description: * @date 2021/7/2113:55 */ -public class SecondCountWindowFunction extends ProcessWindowFunction, String, String, TimeWindow> { +public class SecondCountWindowFunction extends ProcessWindowFunction>, Map, String, TimeWindow> { private static final Logger logger = LoggerFactory.getLogger(SecondCountWindowFunction.class); - private HashMap> cacheMap = new HashMap<>(16); + private HashMap> cacheMap = new HashMap<>(32); @Override @SuppressWarnings("unchecked") - public void process(String key, Context context, Iterable> input, Collector output) { + public void process(String key, Context context, Iterable>> input, Collector> output) { try { HashMap metricsMap = JsonParseUtil.getMetricFunctionsMap(); - for (Tuple2 tuple : input) { + for (Tuple2> tuple : input) { String dimensions = tuple.f0; - String message = tuple.f1; - if (StringUtil.isNotBlank(message)) { + Map message = tuple.f1; + if (message.size() != 0) { Map dimensionsObj = (Map) JsonMapper.fromJsonString(dimensions, Map.class); - Map object = (Map) JsonMapper.fromJsonString(message, Map.class); Map cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj); - for (String name : metricsMap.keySet()) { - String[] metrics = metricsMap.get(name); + for (String resultName : metricsMap.keySet()) { + String[] metrics = metricsMap.get(resultName); String function = metrics[0]; - functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, name)); + functionSet(function, cacheMessage, resultName, JsonParseUtil.getValue(message, resultName)); } cacheMap.put(dimensions, cacheMessage); @@ -54,7 +54,7 @@ public class SecondCountWindowFunction extends ProcessWindowFunction resultMap = cacheMap.get(countKey); JsonParseUtil.setValue(resultMap, JsonParseUtil.getResultTimeKey(), endTime); - output.collect(JsonMapper.toJsonString(resultMap)); + output.collect(resultMap); } } @@ -71,22 +71,19 @@ public class SecondCountWindowFunction extends ProcessWindowFunction cacheMessage, String resultName, Object nameValue, Object fieldNameValue) { + private static void functionSet(String function, Map cacheMessage, String resultKeyName, Object fieldNameValue) { switch (function) { case "sum": - cacheMessage.put(resultName, MetricFunctions.longSum(nameValue, fieldNameValue)); + cacheMessage.put(resultKeyName, MetricFunctions.longSum(cacheMessage.get(resultKeyName), fieldNameValue)); break; case "count": - cacheMessage.put(resultName, MetricFunctions.count(nameValue)); + cacheMessage.put(resultKeyName, MetricFunctions.count(cacheMessage.get(resultKeyName))); break; - case "unique_sip_num": - //TODO - break; - case "unique_cip_num": - //TODO + case "HLLSketchBuild": + cacheMessage.put(resultKeyName, MetricFunctions.hllSketchUnion((HllSketch) cacheMessage.get(resultKeyName), (HllSketch) fieldNameValue)); break; default: break; diff --git a/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java b/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java index 0672179..c248826 100644 --- a/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java +++ b/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java @@ -1,7 +1,11 @@ package com.zdjizhi.utils.general; +import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.json.JsonTypeUtil; +import org.apache.datasketches.hll.HllSketch; +import org.apache.datasketches.hll.Union; + /** * @author qidaijie @@ -10,6 +14,7 @@ import com.zdjizhi.utils.json.JsonTypeUtil; * @date 2021/7/2015:31 */ public class MetricFunctions { + /** * Long类型的数据求和 * @@ -31,7 +36,42 @@ public class MetricFunctions { * @return count+1 */ public static Long count(Object count) { - return JsonTypeUtil.checkLongValue(count) + 1L; } + + /** + * 更新HllSketch内容 + * + * @param sketch 当前sketch + * @param str ip地址 + * @return 更新后sketch + */ + public static HllSketch uniqueHllSketch(HllSketch sketch, String str) { + if (StringUtil.isNotBlank(str)) { + if (sketch != null) { + sketch.update(str); + } else { + sketch = new HllSketch(12); + sketch.update(str); + } + } + + return sketch; + } + + /** + * @param cacheSketch 缓存的sketch + * @param newSketch 聚合后的sketch + * @return 合并后的sketch + */ + public static HllSketch hllSketchUnion(HllSketch cacheSketch, HllSketch newSketch) { + Union union = new Union(12); + if (cacheSketch != null) { + union.update(cacheSketch); + } + if (newSketch != null) { + union.update(newSketch); + } + return HllSketch.heapify(union.getResult().toUpdatableByteArray()); + } } diff --git a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java index 0c8d7df..d6577eb 100644 --- a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java +++ b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java @@ -3,7 +3,6 @@ package com.zdjizhi.utils.general; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.jayway.jsonpath.InvalidPathException; import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.StreamAggregateConfig; import com.zdjizhi.utils.JsonMapper; @@ -50,8 +49,8 @@ public class ParseFunctions { /** * 解析 dimensions 字段集 * - * @param dimensions 维度集 - * @param originalLog 原始日志 + * @param dimensions 维度集 + * @param originalLog 原始日志 * @return 结果维度集 */ public static Map transDimensions(Map dimensions, Map originalLog) { @@ -70,30 +69,19 @@ public class ParseFunctions { * @param originalLog 原始日志json * @return 统计metrics json */ - public static String getMetricsLog(Map originalLog) { - Map json = new HashMap<>(16); + public static Map getMetricsLog(Map originalLog) { + Map metricsMap = new HashMap<>(16); for (String logsKeyName : JsonParseUtil.getMetricsFiledNameList()) { - json.put(logsKeyName, originalLog.get(logsKeyName)); + if (originalLog.containsKey(logsKeyName)) { + metricsMap.put(logsKeyName, originalLog.get(logsKeyName)); + } } - return JsonMapper.toJsonString(json); + return metricsMap; } - /** - * alignment ID替换操作 - * 根据缓存中的AppId对应信息,获取当前AppId对应的具体名称。 - * - * @param parameters 参数集 - * @param fieldName 原始日志列名 - */ - public static String dismantlingUtils(String parameters, Object fieldName) { - String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); - int digits = Integer.parseInt(alignmentPars[0]); - return fieldName.toString().split(StreamAggregateConfig.PROTOCOL_SPLITTER)[digits]; - } - /** * combination 拼接操作 * 获取方法函数中 parameters 字段,结构 "parameters": "abc,/" ;abc为要拼接字段 /为拼接的分隔符 @@ -137,4 +125,61 @@ public class ParseFunctions { } } + /** + * 判断是否为日志字段,是则返回对应value,否则返回原始字符串 + * + * @param jsonMap 内存实体类 + * @param param 字段名/普通字符串 + * @return JSON.Value or String + */ + @Deprecated + private static Object isJsonValue(Map jsonMap, String param) { + if (param.contains(StreamAggregateConfig.IS_JSON_KEY_TAG)) { + return JsonParseUtil.getValue(jsonMap, param.substring(2)); + } else { + return param; + } + } + + /** + * IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。 + * + * @param jsonMap 内存实体类 + * @param ifParam 字段名/普通字符串 + * @return resultA or resultB or null + */ + @Deprecated + public static Object condition(Map jsonMap, String ifParam) { + Object result = null; + String separator = "!="; + try { + String[] split = ifParam.split(StreamAggregateConfig.FORMAT_SPLITTER); + if (split.length == 3) { + String expression = split[0]; + Object resultA = isJsonValue(jsonMap, split[1]); + Object resultB = isJsonValue(jsonMap, split[2]); + if (expression.contains(separator)) { + String[] regexp = expression.split(separator); + Object direction = isJsonValue(jsonMap, regexp[0]); + if (direction instanceof Number) { + result = Integer.parseInt(direction.toString()) != Integer.parseInt(regexp[1]) ? resultA : resultB; + } else if (direction instanceof String) { + result = direction.equals(regexp[1]) ? resultA : resultB; + } + } else { + String[] regexp = expression.split(StreamAggregateConfig.IF_CONDITION_SPLITTER); + Object direction = isJsonValue(jsonMap, regexp[0]); + if (direction instanceof Number) { + result = Integer.parseInt(direction.toString()) == Integer.parseInt(regexp[1]) ? resultA : resultB; + } else if (direction instanceof String) { + result = direction.equals(regexp[1]) ? resultA : resultB; + } + } + } + } catch (RuntimeException e) { + logger.error("IF function execution exception, exception information:" + e.getMessage()); + } + return result; + } + } diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java index 4b3f75a..13d2270 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -12,11 +12,13 @@ import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.StreamAggregateConfig; import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; -import net.sf.cglib.beans.BeanMap; import java.util.*; import java.util.concurrent.Executor; +import org.apache.datasketches.hll.HllSketch; + + /** * 使用FastJson解析json的工具类 * @@ -27,11 +29,6 @@ public class JsonParseUtil { private static final Log logger = LogFactory.get(); private static Properties propNacos = new Properties(); - /** - * 获取actions所有的计算函数 - */ - private static HashMap actionMap = new HashMap<>(16); - /** * 解析metrics指标字段信息 */ @@ -57,11 +54,6 @@ public class JsonParseUtil { */ private static ArrayList metricsFiledNameList = new ArrayList<>(); - /** - * 解析hierarchy函数,获取切分信息 - */ - private static String[] hierarchy; - /** * 解析时间戳字段名称 */ @@ -101,8 +93,8 @@ public class JsonParseUtil { /** * 获取属性值的方法 * - * @param jsonMap 原始日志 - * @param key josn key名称 + * @param jsonMap 原始日志 + * @param key josn key名称 * @return 属性的值 */ public static Object getValue(Map jsonMap, String key) { @@ -114,6 +106,25 @@ public class JsonParseUtil { } } + /** + * 获取HLLSketch内容 + * + * @param jsonMap 原始日志 + * @param key json key名称 + * @return HLLSketch数据数组 + */ + public static byte[] getHllSketch(Map jsonMap, String key) { + try { + HllSketch hllSketchResult = (HllSketch) jsonMap.getOrDefault(key, null); + if (hllSketchResult != null) { + return hllSketchResult.toUpdatableByteArray(); + } + } catch (RuntimeException e) { + logger.error("HllSketch data conversion exception,data may be empty! exception:{}", e); + } + return null; + } + /** * long 类型检验转换方法,若为空返回基础值 * @@ -170,12 +181,6 @@ public class JsonParseUtil { clearCacheMap(); DocumentContext parse = JsonPath.parse(schema); - List actions = parse.read("$.doc.action[*]"); - for (Object action : actions) { - actionMap.put(JsonPath.read(action, "$.label"), - JsonPath.read(action, "$.metrics").toString().split(StreamAggregateConfig.FORMAT_SPLITTER)); - } - List metricFunctions = parse.read("$.doc.metrics[*]"); for (Object metric : metricFunctions) { metricFunctionsMap.put(JsonPath.read(metric, "$.name"), @@ -208,26 +213,9 @@ public class JsonParseUtil { transformsList.add(new String[]{function, name, fieldName, parameters}); } - List hierarchyList = parse.read("$.doc.transforms[*]"); - for (Object transform : hierarchyList) { - String function = JsonPath.read(transform, "$.function").toString(); - if ("hierarchy".equals(function)) { - String name = JsonPath.read(transform, "$.name").toString(); - String parameters = JsonPath.read(transform, "$.parameters").toString(); - hierarchy = new String[]{name, parameters}; - } - } - resultTimeKey = JsonPath.read(schema, "$.doc.timestamp.name"); } - /** - * @return 解析schema获取的actions集合 - */ - public static HashMap getActionMap() { - return actionMap; - } - /** * @return 解析schema获取的指标统计方式集合 */ @@ -263,13 +251,6 @@ public class JsonParseUtil { return metricsFiledNameList; } - /** - * @return 解析schema获取的拆解函数 - */ - public static String[] getHierarchy() { - return hierarchy; - } - /** * @return 解析schema获取的时间字段的key */ @@ -281,7 +262,6 @@ public class JsonParseUtil { * 在配置变化时清空缓存,重新解析schema更新缓存 */ private static void clearCacheMap() { - actionMap.clear(); metricFunctionsMap.clear(); dimensionsMap.clear(); filtersMap.clear(); diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java index 8555b1f..3ec3908 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java @@ -94,6 +94,10 @@ public class JsonTypeUtil { return 0L; } + if (longVal < 0L) { + return 0L; + } + return longVal; } diff --git a/src/test/java/com/zdjizhi/DatasketchesTest.java b/src/test/java/com/zdjizhi/DatasketchesTest.java new file mode 100644 index 0000000..755eedb --- /dev/null +++ b/src/test/java/com/zdjizhi/DatasketchesTest.java @@ -0,0 +1,228 @@ +package com.zdjizhi; + +import com.alibaba.fastjson.JSONObject; +import org.apache.datasketches.hll.HllSketch; +import org.apache.datasketches.hll.Union; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Properties; +import java.util.Random; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2023/3/217:17 + */ +public class DatasketchesTest { + + @Test + public void HllSketchTest() { + HashSet strings = new HashSet<>(); + + HllSketch sketch = new HllSketch(12); + + for (int i = 0; i < 50; i++) { + String ip = "192.168.1." + i; + sketch.update(ip); + strings.add(ip); + } + + System.out.println(sketch.getEstimate() + "--" + strings.size()); + + HashSet randomStrings = new HashSet<>(); + + HllSketch randomSketch = new HllSketch(12); + + for (int i = 0; i < 50; i++) { + String ip = makeIPv4Random(); + randomSketch.update(ip); + randomStrings.add(ip); + } + + System.out.println(randomSketch.getEstimate() + "--" + randomStrings.size()); + } + + @Test + public void HllSketchUnionTest() { + HashSet strings = new HashSet<>(); + + HllSketch sketch = new HllSketch(12); + + for (int i = 0; i < 50; i++) { + String ip = "192.168.1." + i; + sketch.update(ip); + strings.add(ip); + } + + HllSketch sketch2 = new HllSketch(12); + + for (int i = 0; i < 10; i++) { + String ip = "192.168.2." + i; + sketch2.update(ip); + strings.add(ip); + } + + Union union = new Union(12); + + union.update(sketch); + union.update(sketch2); + HllSketch sketch_result = HllSketch.heapify(union.getResult().toCompactByteArray()); + + System.out.println(sketch.getEstimate() + "--" + strings.size()); + System.out.println(sketch2.getEstimate() + "--" + strings.size()); + System.out.println(sketch_result.getEstimate() + "--" + strings.size()); + } + + @Test + public void HllSketchDruidTest() { + + HashSet strings = new HashSet<>(); + + HllSketch sketch = new HllSketch(12); + + for (int i = 0; i < 50; i++) { + String ip = "192.168.1." + i; + sketch.update(ip); + strings.add(ip); + } + + HllSketch sketch2 = new HllSketch(12); + + for (int i = 0; i < 10; i++) { + String ip = "192.168.2." + i; + sketch2.update(ip); + strings.add(ip); + } + + Union union = new Union(12); + + union.update(sketch); + union.update(sketch2); + HllSketch sketch_result1 = HllSketch.heapify(union.getResult().toCompactByteArray()); + + HllSketch sketch3 = new HllSketch(12); + + for (int i = 0; i < 10; i++) { + String ip = "192.168.3." + i; + sketch3.update(ip); + strings.add(ip); + } + + Union union2 = new Union(12); + + union2.update(sketch_result1); + union2.update(sketch3); + HllSketch sketch_result2 = HllSketch.heapify(union2.getResult().toCompactByteArray()); + + System.out.println(sketch.getEstimate() + "--" + strings.size()); + System.out.println(sketch2.getEstimate() + "--" + strings.size()); + System.out.println(sketch3.getEstimate() + "--" + strings.size()); + System.out.println(sketch_result1.getEstimate() + "--" + strings.size()); + System.out.println(sketch_result2.getEstimate() + "--" + strings.size()); + + Result result = new Result(); + result.setC2s_pkt_num(10); + result.setS2c_pkt_num(10); + result.setC2s_byte_num(10); + result.setS2c_byte_num(10); + result.setStat_time(1679970031); + result.setSchema_type("HLLSketchMergeTest"); + + //CompactByte + result.setIp_object(sketch_result2.toCompactByteArray()); + System.out.println(result.toString()); + sendMessage(result); + + + //UpdatableByte + result.setIp_object(sketch_result2.toUpdatableByteArray()); + System.out.println(result.toString()); + sendMessage(result); + + } + + + //随机生成ip + private static String makeIPv4Random() { + Random random = new Random(); + int v4_1 = new Random().nextInt(255) + 1; + int v4_2 = new Random().nextInt(255); + int v4_3 = new Random().nextInt(255); + int v4_4 = new Random().nextInt(255); + return v4_1 + "." + v4_2 + "." + v4_3 + "." + v4_4; + } + + private static void sendMessage(Result result) { + Properties props = new Properties(); + //kafka地址 + props.put("bootstrap.servers", "192.168.44.12:9092"); + props.put("acks", "all"); + props.put("retries", 0); + props.put("linger.ms", 1); + props.put("buffer.memory", 67108864); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + KafkaProducer kafkaProducer = new KafkaProducer(props); + + kafkaProducer.send(new ProducerRecord("TRAFFIC-PROTOCOL-TEST", JSONObject.toJSONString(result))); + + kafkaProducer.close(); + } +} + +class Result { + + private String schema_type; + private long c2s_byte_num; + private long c2s_pkt_num; + private long s2c_byte_num; + private long s2c_pkt_num; + private long stat_time; + private byte[] ip_object; + + public void setSchema_type(String schema_type) { + this.schema_type = schema_type; + } + + public void setC2s_byte_num(long c2s_byte_num) { + this.c2s_byte_num = c2s_byte_num; + } + + public void setC2s_pkt_num(long c2s_pkt_num) { + this.c2s_pkt_num = c2s_pkt_num; + } + + public void setS2c_byte_num(long s2c_byte_num) { + this.s2c_byte_num = s2c_byte_num; + } + + public void setS2c_pkt_num(long s2c_pkt_num) { + this.s2c_pkt_num = s2c_pkt_num; + } + + public void setStat_time(long stat_time) { + this.stat_time = stat_time; + } + + public void setIp_object(byte[] ip_object) { + this.ip_object = ip_object; + } + + @Override + public String toString() { + return "Result{" + + "schema_type='" + schema_type + '\'' + + ", c2s_byte_num=" + c2s_byte_num + + ", c2s_pkt_num=" + c2s_pkt_num + + ", s2c_byte_num=" + s2c_byte_num + + ", s2c_pkt_num=" + s2c_pkt_num + + ", stat_time=" + stat_time + + ", ip_object=" + Arrays.toString(ip_object) + + '}'; + } +} \ No newline at end of file diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java new file mode 100644 index 0000000..9fa8872 --- /dev/null +++ b/src/test/java/com/zdjizhi/FunctionTest.java @@ -0,0 +1,79 @@ +package com.zdjizhi; + +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.StreamAggregateConfig; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.json.JsonTypeUtil; +import org.junit.Test; + +import java.util.Arrays; + + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2023/1/617:54 + */ +public class FunctionTest { + public static void main(String[] args) { + String groupKey = "ETHERNET.IPv4.TCP.UNCATEGORIZED.qq_r2@4"; + String protocol = groupKey.substring(0, groupKey.indexOf("@")); + System.out.println(protocol); + StringBuffer stringBuffer = new StringBuffer(); + String appName = "qq_r2"; + String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER); + for (String proto : protocolIds) { + if (StringUtil.isBlank(stringBuffer.toString())) { + stringBuffer.append(proto); + System.out.println(stringBuffer.toString()); + } else { + stringBuffer.append(".").append(proto); + if (proto.equals(appName)) { + System.out.println(stringBuffer.toString() + "---" + appName); + } else { + System.out.println(stringBuffer.toString()); + } + } + } + } + + @Test + public void JsonPathTest() { + String json = "{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-7400\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-7400\"}]}"; + String expr = "$.tags[?(@.tag=='data_center')].value"; + Object read = JsonPath.parse(json).read(expr).toString(); + System.out.println(read); + } + + @Test + public void SplitTest() { + String str = "[.]"; + String protocol = "ETHERNET.IPv4.TCP.http.test"; + + System.out.println(Arrays.toString(protocol.split(str))); + + String str2 = "\\."; + System.out.println(Arrays.toString(protocol.split(str2))); + + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < protocol.split(str).length - 1; i++) { + String value = protocol.split(str)[i]; + if (StringUtil.isBlank(stringBuilder.toString())) { + stringBuilder.append(value); + System.out.println(stringBuilder.toString()); + }else { + stringBuilder.append(".").append(value); + System.out.println(stringBuilder.toString()); + } + } + } + + @Test + public void longSumTest() { + Long res1 = JsonTypeUtil.checkLongValue(123); + Long res2 = JsonTypeUtil.checkLongValue("123"); + + System.out.println(res1 + res2); + } +} diff --git a/src/test/java/com/zdjizhi/FunctionsTest.java b/src/test/java/com/zdjizhi/FunctionsTest.java deleted file mode 100644 index 6e3a20b..0000000 --- a/src/test/java/com/zdjizhi/FunctionsTest.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.zdjizhi; - -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.json.JsonParseUtil; -import org.junit.Test; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -/** - * @author qidaijie - * @Package com.zdjizhi - * @Description: - * @date 2021/9/1714:22 - */ -public class FunctionsTest { - private static HashMap metricsMap = JsonParseUtil.getMetricsMap(); - - @Test - public void actionTest() { - HashMap actionMap = JsonParseUtil.getActionMap(); - String[] metricNames = actionMap.getOrDefault("", actionMap.get("Default")); - System.out.println(actionMap.toString()); - System.out.println(Arrays.toString(metricNames)); - - - - } - - - -}