From b8550623410e3442e3b0e17d4186af085f8b71b1 Mon Sep 17 00:00:00 2001 From: qidaijie Date: Fri, 26 Aug 2022 11:49:52 +0800 Subject: [PATCH] =?UTF-8?q?1=EF=BC=9A=E5=A2=9E=E5=8A=A0VSYS=E7=BB=9F?= =?UTF-8?q?=E8=AE=A1=E7=BB=B4=E5=BA=A6=E3=80=82(TSG-11658)=202=EF=BC=9A?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 16 +- properties/default_config.properties | 2 +- properties/service_flow_config.properties | 31 ++-- .../topology/StreamAggregateTopology.java | 5 + .../utils/functions/ParseMapFunction.java | 154 ------------------ .../functions/ResultFlatMapFunction.java | 1 + .../{ => filter}/FilterNullFunction.java | 2 +- .../{ => keyby}/FirstKeyByFunction.java | 4 +- .../{ => keyby}/SecondKeyByFunction.java | 3 +- .../functions/parse/ParseMapFunction.java | 89 ++++++++++ .../FirstCountWindowFunction.java | 30 ++-- .../SecondCountWindowFunction.java | 7 +- .../zdjizhi/utils/general/ParseFunctions.java | 115 ++++++++++--- .../com/zdjizhi/utils/json/JsonParseUtil.java | 24 +-- 14 files changed, 228 insertions(+), 255 deletions(-) delete mode 100644 src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java rename src/main/java/com/zdjizhi/utils/functions/{ => filter}/FilterNullFunction.java (89%) rename src/main/java/com/zdjizhi/utils/functions/{ => keyby}/FirstKeyByFunction.java (88%) rename src/main/java/com/zdjizhi/utils/functions/{ => keyby}/SecondKeyByFunction.java (93%) create mode 100644 src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java rename src/main/java/com/zdjizhi/utils/functions/{ => statistics}/FirstCountWindowFunction.java (74%) rename src/main/java/com/zdjizhi/utils/functions/{ => statistics}/SecondCountWindowFunction.java (96%) diff --git a/pom.xml b/pom.xml index 3b7fdc1..e4f3384 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi log-olap-analysis-schema - 220323-nacos + 220822-VSYS log-olap-analysis-schema http://www.example.com @@ -194,7 +194,7 @@ cn.hutool hutool-all - 5.5.2 + 5.7.17 @@ -211,18 +211,6 @@ ${nacos.version} - - org.slf4j - slf4j-api - 1.7.21 - - - - org.slf4j - slf4j-log4j12 - 1.7.21 - - junit junit diff --git a/properties/default_config.properties b/properties/default_config.properties index 1637bb8..c0f8aef 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -58,4 +58,4 @@ nacos.group=Galaxy buffer.timeout=100 #第一次随机分组random范围 -random.range.num=40 \ No newline at end of file +random.range.num=20 \ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 9ab7852..97438d9 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,56 +1,55 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -source.kafka.servers=192.168.40.223:9094,192.168.40.151:9094,192.168.40.152:9094 +source.kafka.servers=192.168.44.12:9094 #管理输出kafka地址 -sink.kafka.servers=192.168.40.223:9094,192.168.40.151:9094,192.168.40.152:9094 +sink.kafka.servers=192.168.44.12:9094 + +#--------------------------------HTTP------------------------------# +#kafka 证书地址 +tools.library=D:\\workerspace\\dat #--------------------------------nacos配置------------------------------# #nacos 地址 nacos.server=192.168.44.12:8848 #nacos namespace -nacos.schema.namespace=flink +nacos.schema.namespace=prod #nacos data id nacos.data.id=liveChart_session.json -#--------------------------------HTTP------------------------------# -#kafka 证书地址 -tools.library=/home/tsg/olap/topology/dat/ - #--------------------------------Kafka消费组信息------------------------------# #kafka 接收数据topic source.kafka.topic=SESSION-RECORD #补全数据 输出 topic -sink.kafka.topic=TRAFFIC-PROTOCOL-STAT +sink.kafka.topic=test-result #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=liveCharts-session-20211105-1 +group.id=livecharts-test-20220816-1 #--------------------------------topology配置------------------------------# #consumer 并行度 -source.parallelism=9 +source.parallelism=1 #map函数并行度 -parse.parallelism=27 +parse.parallelism=1 #第一次窗口计算并行度 -first.window.parallelism=27 +first.window.parallelism=1 #第二次窗口计算并行度 -second.window.parallelism=27 +second.window.parallelism=1 #producer 并行度 -sink.parallelism=9 +sink.parallelism=1 -##初次随机预聚合窗口时间 +#初次随机预聚合窗口时间 first.count.window.time=5 #二次聚合窗口时间 second.count.window.time=15 - diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java index 3221164..c2d4f31 100644 --- a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java +++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java @@ -4,6 +4,11 @@ import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.StreamAggregateConfig; import com.zdjizhi.utils.functions.*; +import com.zdjizhi.utils.functions.keyby.FirstKeyByFunction; +import com.zdjizhi.utils.functions.keyby.SecondKeyByFunction; +import com.zdjizhi.utils.functions.parse.ParseMapFunction; +import com.zdjizhi.utils.functions.statistics.FirstCountWindowFunction; +import com.zdjizhi.utils.functions.statistics.SecondCountWindowFunction; import com.zdjizhi.utils.kafka.KafkaConsumer; import com.zdjizhi.utils.kafka.KafkaProducer; import org.apache.flink.api.java.tuple.Tuple2; diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java deleted file mode 100644 index c8c5aa8..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java +++ /dev/null @@ -1,154 +0,0 @@ -package com.zdjizhi.utils.functions; - -import cn.hutool.core.util.RandomUtil; -import com.jayway.jsonpath.InvalidPathException; -import com.jayway.jsonpath.JsonPath; -import com.zdjizhi.common.StreamAggregateConfig; -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.general.ParseFunctions; -import com.zdjizhi.utils.json.JsonParseUtil; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple3; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/5/2715:01 - */ -public class ParseMapFunction implements MapFunction> { - private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class); - - @Override - @SuppressWarnings("unchecked") - public Tuple3 map(String message) { - try { - ArrayList jobList = JsonParseUtil.getTransformsList(); - HashMap dimensionsMap = JsonParseUtil.getDimensionsMap(); - if (StringUtil.isNotBlank(message)) { - Map object = (Map) JsonMapper.fromJsonString(message, Map.class); - Map dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object); - if (ParseFunctions.filterLogs(object)) { - for (String[] strings : jobList) { - //函数名称 - String function = strings[0]; - //需要补全的字段的key - String resultKeyName = strings[1]; - //原始日志字段key - String logsKeyName = strings[2]; - //原始日志字段对应的值 - Object logsKeyValue = JsonParseUtil.getValue(object, strings[2]); - //额外的参数的值 - String parameters = strings[3]; - - switch (function) { - case "dismantling": - if (StringUtil.isNotBlank(parameters)) { - if (logsKeyValue != null) { - JsonParseUtil.setValue(message, logsKeyName, dismantlingUtils(parameters, logsKeyValue)); - } - } - break; - case "combination": - if (logsKeyValue != null) { - if (StringUtil.isNotBlank(parameters)) { - combinationUtils(dimensionsObj, object, parameters, resultKeyName, logsKeyName); - } - } - break; - case "flattenSpec": - if (logsKeyValue != null) { - if (StringUtil.isNotBlank(parameters)) { - flattenSpec(dimensionsObj, object, parameters, resultKeyName, logsKeyName); - } - } - break; - case "hierarchy": - String key = JsonParseUtil.getString(object, logsKeyName) + "@" + ThreadLocalRandom.current().nextInt(StreamAggregateConfig.RANDOM_RANGE_NUM); - return new Tuple3<>(key, JsonMapper.toJsonString(dimensionsObj), ParseFunctions.getMetricsLog(object)); - default: - break; - } - } - } - } - } catch (RuntimeException e) { - logger.error("An error occurred in the original log parsing reorganization,error message is:" + e); - return new Tuple3<>("", "", ""); - } - return new Tuple3<>("", "", ""); - } - - /** - * alignment ID替换操作 - * 根据缓存中的AppId对应信息,获取当前AppId对应的具体名称。 - * - * @param parameters 参数集 - * @param fieldName 原始日志列名 - */ - private static String dismantlingUtils(String parameters, Object fieldName) { - String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); - int digits = Integer.parseInt(alignmentPars[0]); - return fieldName.toString().split(StreamAggregateConfig.PROTOCOL_SPLITTER)[digits]; - } - - /** - * combination 拼接操作 - * 获取方法函数中 parameters 字段,结构 "parameters": "abc,/" ;abc为要拼接字段 /为拼接的分隔符 - * - * @param parameters 参数集 - * @param message 原始日志 - * @param fieldName 原始日志列名 - */ - private static void combinationUtils(Map dimensions, Map message, String parameters, String resultKeyName, String fieldName) { - String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); - Object combinationField = JsonParseUtil.getValue(message, combinationPars[0]); - if (combinationField != null) { - String separator = combinationPars[1]; - Object fieldNameValue = JsonParseUtil.getValue(message, fieldName); - if (fieldNameValue != null) { - String combinationValue = fieldNameValue + separator + combinationField; - dimensions.put(resultKeyName, combinationValue); - JsonParseUtil.setValue(message, fieldName, combinationValue); - } else { - dimensions.put(resultKeyName, combinationField); - JsonParseUtil.setValue(message, fieldName, combinationField); - } - } - } - - /** - * 根据表达式解析json - * - * @param message json - * @param expr 解析表达式 - * @return 解析结果 - */ - private static void flattenSpec(Map dimensions, Map message, String expr, String resultKeyName, String fieldName) { - try { - if (StringUtil.isNotBlank(expr)) { - String operateValue = JsonParseUtil.getString(message, fieldName); - ArrayList read = JsonPath.parse(operateValue).read(expr); - if (read.size() >= 1) { - String flattenResult = read.get(0); - dimensions.put(resultKeyName, flattenResult); - } - } - } catch (ClassCastException | InvalidPathException e) { - logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误" + e); - } catch (RuntimeException e) { - logger.error("json表达式解析异常,异常信息:" + e); - } - } - -} diff --git a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java index d458984..8ff8839 100644 --- a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java @@ -7,6 +7,7 @@ import com.zdjizhi.utils.json.JsonParseUtil; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; +import java.util.Arrays; import java.util.Map; /** diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/filter/FilterNullFunction.java similarity index 89% rename from src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java rename to src/main/java/com/zdjizhi/utils/functions/filter/FilterNullFunction.java index de507ad..6c83b38 100644 --- a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/filter/FilterNullFunction.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.functions; +package com.zdjizhi.utils.functions.filter; import com.zdjizhi.utils.StringUtil; import org.apache.flink.api.common.functions.FilterFunction; diff --git a/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java similarity index 88% rename from src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java rename to src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java index 831c90a..7783676 100644 --- a/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.functions; +package com.zdjizhi.utils.functions.keyby; import cn.hutool.core.util.RandomUtil; import com.zdjizhi.common.StreamAggregateConfig; @@ -16,7 +16,7 @@ public class FirstKeyByFunction implements KeySelector value) throws Exception { -// //以map拼接的key分组 + //以map拼接的key分组 return value.f0; } } diff --git a/src/main/java/com/zdjizhi/utils/functions/SecondKeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java similarity index 93% rename from src/main/java/com/zdjizhi/utils/functions/SecondKeyByFunction.java rename to src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java index c27bd04..fd81d6e 100644 --- a/src/main/java/com/zdjizhi/utils/functions/SecondKeyByFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.functions; +package com.zdjizhi.utils.functions.keyby; import cn.hutool.core.util.RandomUtil; import com.zdjizhi.common.StreamAggregateConfig; @@ -16,7 +16,6 @@ public class SecondKeyByFunction implements KeySelector, S @Override public String getKey(Tuple2 value) throws Exception { - //以map拼接的key分组 return value.f0; } diff --git a/src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java new file mode 100644 index 0000000..d1fc36d --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java @@ -0,0 +1,89 @@ +package com.zdjizhi.utils.functions.parse; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.StreamAggregateConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.general.ParseFunctions; +import com.zdjizhi.utils.json.JsonParseUtil; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple3; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class ParseMapFunction implements MapFunction> { + private static final Log logger = LogFactory.get(); + + @Override + @SuppressWarnings("unchecked") + public Tuple3 map(String message) { + try { + ArrayList jobList = JsonParseUtil.getTransformsList(); + HashMap dimensionsMap = JsonParseUtil.getDimensionsMap(); + if (StringUtil.isNotBlank(message)) { + Map originalLog = (Map) JsonMapper.fromJsonString(message, Map.class); + Map dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, originalLog); + if (ParseFunctions.filterLogs(originalLog)) { + for (String[] strings : jobList) { + //函数名称 + String function = strings[0]; + //结果集字段key + String resultKeyName = strings[1]; + //原始日志字段key + String logsKeyName = strings[2]; + //额外的参数的值 + String parameters = strings[3]; + + //原始日志字段对应的值 + Object logsValue = JsonParseUtil.getValue(originalLog, logsKeyName); + + switch (function) { + case "combination": + if (logsValue != null) { + if (StringUtil.isNotBlank(parameters)) { + ParseFunctions.combinationUtils(dimensionsObj, originalLog, parameters, resultKeyName, logsKeyName); + } + } + break; + case "flattenSpec": + if (logsValue != null) { + if (StringUtil.isNotBlank(parameters)) { + ParseFunctions.flattenSpec(dimensionsObj, parameters, resultKeyName, logsValue.toString()); + } + } + break; + case "hierarchy": + String key = JsonParseUtil.getString(dimensionsObj, resultKeyName) + "@" + ThreadLocalRandom.current().nextInt(StreamAggregateConfig.RANDOM_RANGE_NUM); + return new Tuple3<>(key, JsonMapper.toJsonString(dimensionsObj), ParseFunctions.getMetricsLog(originalLog)); +// case "dismantling": +// if (StringUtil.isNotBlank(parameters)) { +// if (logsValue != null) { +// JsonParseUtil.setValue(message, logsKeyName, ParseFunctions.dismantlingUtils(parameters, logsValue)); +// } +// } +// break; + default: + break; + } + } + } + } + } catch (RuntimeException e) { + logger.error("An error occurred in the original log parsing reorganization,error message is:" + e); + return new Tuple3<>("", "", ""); + } + return new Tuple3<>("", "", ""); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java similarity index 74% rename from src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java rename to src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java index e0e8a2e..92f6697 100644 --- a/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java @@ -1,8 +1,9 @@ -package com.zdjizhi.utils.functions; +package com.zdjizhi.utils.functions.statistics; import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.general.MetricFunctions; +import com.zdjizhi.utils.general.ParseFunctions; import com.zdjizhi.utils.json.JsonParseUtil; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -24,7 +25,7 @@ import java.util.Map; public class FirstCountWindowFunction extends ProcessWindowFunction, Tuple2, String, TimeWindow> { private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class); - private HashMap> cacheMap = new HashMap<>(320); + private HashMap> cacheMap = new HashMap<>(16); @Override @SuppressWarnings("unchecked") @@ -33,22 +34,22 @@ public class FirstCountWindowFunction extends ProcessWindowFunction metricsMap = JsonParseUtil.getMetricFunctionsMap(); HashMap actionMap = JsonParseUtil.getActionMap(); for (Tuple3 tuple : input) { - String label = tuple.f0; + String groupKey = tuple.f0; + String protocol = groupKey.substring(0, groupKey.indexOf("@")); String dimensions = tuple.f1; - String message = tuple.f2; - String l7Protocol = label.substring(0, label.indexOf("@")); - //action中某个协议的所有function,如果没有就默认 - String[] metricNames = actionMap.getOrDefault(l7Protocol, actionMap.get("Default")); - if (StringUtil.isNotBlank(message)) { + String metrics = tuple.f2; + //action中某个协议的所有action,如果没有就默认 + String[] protocolMetrics = actionMap.getOrDefault(protocol, actionMap.get("Default")); + if (StringUtil.isNotBlank(metrics)) { Map dimensionsObj = (Map) JsonMapper.fromJsonString(dimensions, Map.class); - Map object = (Map) JsonMapper.fromJsonString(message, Map.class); + Map metricsObj = (Map) JsonMapper.fromJsonString(metrics, Map.class); Map cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj); - for (String name : metricNames) { - String[] metrics = metricsMap.get(name); - String function = metrics[0]; - String fieldName = metrics[1]; - functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, fieldName)); + for (String protocolMetric : protocolMetrics) { + String[] functions = metricsMap.get(protocolMetric); + String function = functions[0]; + String fieldName = functions[1]; + functionSet(function, cacheMessage, protocolMetric, cacheMessage.get(protocolMetric), JsonParseUtil.getValue(metricsObj, fieldName)); } cacheMap.put(dimensions, cacheMessage); @@ -58,6 +59,7 @@ public class FirstCountWindowFunction extends ProcessWindowFunction resultMap = cacheMap.get(dimensions); + System.out.println("resultMap"+resultMap.toString()); output.collect(new Tuple2<>(dimensions, JsonMapper.toJsonString(resultMap))); } } diff --git a/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java similarity index 96% rename from src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java rename to src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java index ae8edb3..c0b2091 100644 --- a/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java @@ -1,11 +1,10 @@ -package com.zdjizhi.utils.functions; +package com.zdjizhi.utils.functions.statistics; import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.general.MetricFunctions; import com.zdjizhi.utils.json.JsonParseUtil; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; @@ -14,7 +13,6 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** * @author qidaijie @@ -25,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap; public class SecondCountWindowFunction extends ProcessWindowFunction, String, String, TimeWindow> { private static final Logger logger = LoggerFactory.getLogger(SecondCountWindowFunction.class); - private HashMap> cacheMap = new HashMap<>(320); + private HashMap> cacheMap = new HashMap<>(16); @Override @SuppressWarnings("unchecked") @@ -46,7 +44,6 @@ public class SecondCountWindowFunction extends ProcessWindowFunction transDimensions(Map dimensions, Map message) { - HashMap dimensionsObj = new HashMap<>(16); - - for (String dimension : dimensions.keySet()) { - dimensionsObj.put(dimension, JsonParseUtil.getValue(message, dimensions.get(dimension))); - } - - return dimensionsObj; - } + private static final Log logger = LogFactory.get(); /** * 构建filters过滤函数,根据Schema指定的函数对日志进行过滤 @@ -59,20 +47,95 @@ public class ParseFunctions { return available; } + /** + * 解析 dimensions 字段集 + * + * @param dimensions 维度集 + * @param originalLog 原始日志 + * @return 结果维度集 + */ + public static Map transDimensions(Map dimensions, Map originalLog) { + HashMap dimensionsObj = new HashMap<>(16); + + for (String dimension : dimensions.keySet()) { + dimensionsObj.put(dimension, JsonParseUtil.getValue(originalLog, dimensions.get(dimension))); + } + + return dimensionsObj; + } + /** * 根据原始日志字段,生成schema内指定的metrics指标json。 * - * @param object 原始日志json + * @param originalLog 原始日志json * @return 统计metrics json */ - public static String getMetricsLog(Map object) { + public static String getMetricsLog(Map originalLog) { Map json = new HashMap<>(16); - for (String fileName : JsonParseUtil.getMetricsFiledNameList()) { - json.put(fileName, object.get(fileName)); + for (String logsKeyName : JsonParseUtil.getMetricsFiledNameList()) { + json.put(logsKeyName, originalLog.get(logsKeyName)); } return JsonMapper.toJsonString(json); } + + /** + * alignment ID替换操作 + * 根据缓存中的AppId对应信息,获取当前AppId对应的具体名称。 + * + * @param parameters 参数集 + * @param fieldName 原始日志列名 + */ + public static String dismantlingUtils(String parameters, Object fieldName) { + String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); + int digits = Integer.parseInt(alignmentPars[0]); + return fieldName.toString().split(StreamAggregateConfig.PROTOCOL_SPLITTER)[digits]; + } + + /** + * combination 拼接操作 + * 获取方法函数中 parameters 字段,结构 "parameters": "abc,/" ;abc为要拼接字段 /为拼接的分隔符 + * + * @param parameters 参数集 + * @param message 原始日志 + * @param logsKeyName 原始日志列名 + */ + public static void combinationUtils(Map dimensions, Map message, String parameters, String resultKeyName, String logsKeyName) { + String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); + String combinationFieldKey = combinationPars[0]; + String separator = combinationPars[1]; + Object combinationFieldValue = JsonParseUtil.getValue(message, combinationFieldKey); + if (combinationFieldValue != null) { + Object logsFieldValue = JsonParseUtil.getValue(message, logsKeyName); + String combinationResult = logsFieldValue + separator + combinationFieldValue; + JsonParseUtil.setValue(dimensions, resultKeyName, combinationResult); + JsonParseUtil.setValue(message, logsKeyName, combinationResult); + } + } + + /** + * 根据表达式解析json + *

+ * // * @param message json + * + * @param expr 解析表达式 + * @return 解析结果 + */ + public static void flattenSpec(Map dimensions, String expr, String resultKeyName, String logsKeyValue) { + + try { + if (StringUtil.isNotBlank(expr)) { + ArrayList read = JsonPath.parse(logsKeyValue).read(expr); + if (read.size() >= 1) { + String flattenResult = read.get(0).toString(); + dimensions.put(resultKeyName, flattenResult); + } + } + } catch (RuntimeException e) { + logger.error("The label resolution exception or [expr] analytic expression error,info:" + e); + } + } + } diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java index b133368..4b3f75a 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -102,14 +102,14 @@ public class JsonParseUtil { * 获取属性值的方法 * * @param jsonMap 原始日志 - * @param property key + * @param key josn key名称 * @return 属性的值 */ - public static Object getValue(Map jsonMap, String property) { + public static Object getValue(Map jsonMap, String key) { try { - return jsonMap.getOrDefault(property, null); + return jsonMap.getOrDefault(key, null); } catch (RuntimeException e) { - logger.error("Get the JSON value is abnormal,The key is :" + property + "error message is :" + e); + logger.error("Get the JSON value is abnormal,The key is :" + key + "error message is :" + e); return null; } } @@ -162,22 +162,6 @@ public class JsonParseUtil { } } - /** - * 更新属性值的方法 - * - * @param obj 对象 - * @param property 更新的key - * @param value 更新的值 - */ - public static void setValue(Object obj, String property, Object value) { - try { - BeanMap beanMap = BeanMap.create(obj); - beanMap.put(property, value); - } catch (ClassCastException e) { - logger.error("赋予实体类错误类型数据", e); - } - } - /** * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 * 用于反射生成schema类型的对象的一个map集合