1:增加VSYS统计维度。(TSG-11658)
2:优化处理逻辑。
This commit is contained in:
16
pom.xml
16
pom.xml
@@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>log-olap-analysis-schema</artifactId>
|
||||
<version>220323-nacos</version>
|
||||
<version>220822-VSYS</version>
|
||||
|
||||
<name>log-olap-analysis-schema</name>
|
||||
<url>http://www.example.com</url>
|
||||
@@ -194,7 +194,7 @@
|
||||
<dependency>
|
||||
<groupId>cn.hutool</groupId>
|
||||
<artifactId>hutool-all</artifactId>
|
||||
<version>5.5.2</version>
|
||||
<version>5.7.17</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.jasypt/jasypt -->
|
||||
@@ -211,18 +211,6 @@
|
||||
<version>${nacos.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.21</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<version>1.7.21</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
|
||||
@@ -58,4 +58,4 @@ nacos.group=Galaxy
|
||||
buffer.timeout=100
|
||||
|
||||
#第一次随机分组random范围
|
||||
random.range.num=40
|
||||
random.range.num=20
|
||||
@@ -1,56 +1,55 @@
|
||||
#--------------------------------地址配置------------------------------#
|
||||
|
||||
#管理kafka地址
|
||||
source.kafka.servers=192.168.40.223:9094,192.168.40.151:9094,192.168.40.152:9094
|
||||
source.kafka.servers=192.168.44.12:9094
|
||||
|
||||
#管理输出kafka地址
|
||||
sink.kafka.servers=192.168.40.223:9094,192.168.40.151:9094,192.168.40.152:9094
|
||||
sink.kafka.servers=192.168.44.12:9094
|
||||
|
||||
#--------------------------------HTTP------------------------------#
|
||||
#kafka 证书地址
|
||||
tools.library=D:\\workerspace\\dat
|
||||
|
||||
#--------------------------------nacos配置------------------------------#
|
||||
#nacos 地址
|
||||
nacos.server=192.168.44.12:8848
|
||||
|
||||
#nacos namespace
|
||||
nacos.schema.namespace=flink
|
||||
nacos.schema.namespace=prod
|
||||
|
||||
#nacos data id
|
||||
nacos.data.id=liveChart_session.json
|
||||
|
||||
#--------------------------------HTTP------------------------------#
|
||||
#kafka 证书地址
|
||||
tools.library=/home/tsg/olap/topology/dat/
|
||||
|
||||
#--------------------------------Kafka消费组信息------------------------------#
|
||||
|
||||
#kafka 接收数据topic
|
||||
source.kafka.topic=SESSION-RECORD
|
||||
|
||||
#补全数据 输出 topic
|
||||
sink.kafka.topic=TRAFFIC-PROTOCOL-STAT
|
||||
sink.kafka.topic=test-result
|
||||
|
||||
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
|
||||
group.id=liveCharts-session-20211105-1
|
||||
group.id=livecharts-test-20220816-1
|
||||
|
||||
#--------------------------------topology配置------------------------------#
|
||||
|
||||
#consumer 并行度
|
||||
source.parallelism=9
|
||||
source.parallelism=1
|
||||
|
||||
#map函数并行度
|
||||
parse.parallelism=27
|
||||
parse.parallelism=1
|
||||
|
||||
#第一次窗口计算并行度
|
||||
first.window.parallelism=27
|
||||
first.window.parallelism=1
|
||||
|
||||
#第二次窗口计算并行度
|
||||
second.window.parallelism=27
|
||||
second.window.parallelism=1
|
||||
|
||||
#producer 并行度
|
||||
sink.parallelism=9
|
||||
sink.parallelism=1
|
||||
|
||||
##初次随机预聚合窗口时间
|
||||
#初次随机预聚合窗口时间
|
||||
first.count.window.time=5
|
||||
|
||||
#二次聚合窗口时间
|
||||
second.count.window.time=15
|
||||
|
||||
|
||||
@@ -4,6 +4,11 @@ import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.zdjizhi.common.StreamAggregateConfig;
|
||||
import com.zdjizhi.utils.functions.*;
|
||||
import com.zdjizhi.utils.functions.keyby.FirstKeyByFunction;
|
||||
import com.zdjizhi.utils.functions.keyby.SecondKeyByFunction;
|
||||
import com.zdjizhi.utils.functions.parse.ParseMapFunction;
|
||||
import com.zdjizhi.utils.functions.statistics.FirstCountWindowFunction;
|
||||
import com.zdjizhi.utils.functions.statistics.SecondCountWindowFunction;
|
||||
import com.zdjizhi.utils.kafka.KafkaConsumer;
|
||||
import com.zdjizhi.utils.kafka.KafkaProducer;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
|
||||
@@ -1,154 +0,0 @@
|
||||
package com.zdjizhi.utils.functions;
|
||||
|
||||
import cn.hutool.core.util.RandomUtil;
|
||||
import com.jayway.jsonpath.InvalidPathException;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.zdjizhi.common.StreamAggregateConfig;
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import com.zdjizhi.utils.general.ParseFunctions;
|
||||
import com.zdjizhi.utils.json.JsonParseUtil;
|
||||
import org.apache.flink.api.common.functions.MapFunction;
|
||||
import org.apache.flink.api.java.tuple.Tuple3;
|
||||
import org.apache.flink.api.java.tuple.Tuple3;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
* @Package com.zdjizhi.utils.functions
|
||||
* @Description:
|
||||
* @date 2021/5/2715:01
|
||||
*/
|
||||
public class ParseMapFunction implements MapFunction<String, Tuple3<String, String, String>> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public Tuple3<String, String, String> map(String message) {
|
||||
try {
|
||||
ArrayList<String[]> jobList = JsonParseUtil.getTransformsList();
|
||||
HashMap<String, String> dimensionsMap = JsonParseUtil.getDimensionsMap();
|
||||
if (StringUtil.isNotBlank(message)) {
|
||||
Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
|
||||
Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object);
|
||||
if (ParseFunctions.filterLogs(object)) {
|
||||
for (String[] strings : jobList) {
|
||||
//函数名称
|
||||
String function = strings[0];
|
||||
//需要补全的字段的key
|
||||
String resultKeyName = strings[1];
|
||||
//原始日志字段key
|
||||
String logsKeyName = strings[2];
|
||||
//原始日志字段对应的值
|
||||
Object logsKeyValue = JsonParseUtil.getValue(object, strings[2]);
|
||||
//额外的参数的值
|
||||
String parameters = strings[3];
|
||||
|
||||
switch (function) {
|
||||
case "dismantling":
|
||||
if (StringUtil.isNotBlank(parameters)) {
|
||||
if (logsKeyValue != null) {
|
||||
JsonParseUtil.setValue(message, logsKeyName, dismantlingUtils(parameters, logsKeyValue));
|
||||
}
|
||||
}
|
||||
break;
|
||||
case "combination":
|
||||
if (logsKeyValue != null) {
|
||||
if (StringUtil.isNotBlank(parameters)) {
|
||||
combinationUtils(dimensionsObj, object, parameters, resultKeyName, logsKeyName);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case "flattenSpec":
|
||||
if (logsKeyValue != null) {
|
||||
if (StringUtil.isNotBlank(parameters)) {
|
||||
flattenSpec(dimensionsObj, object, parameters, resultKeyName, logsKeyName);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case "hierarchy":
|
||||
String key = JsonParseUtil.getString(object, logsKeyName) + "@" + ThreadLocalRandom.current().nextInt(StreamAggregateConfig.RANDOM_RANGE_NUM);
|
||||
return new Tuple3<>(key, JsonMapper.toJsonString(dimensionsObj), ParseFunctions.getMetricsLog(object));
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("An error occurred in the original log parsing reorganization,error message is:" + e);
|
||||
return new Tuple3<>("", "", "");
|
||||
}
|
||||
return new Tuple3<>("", "", "");
|
||||
}
|
||||
|
||||
/**
|
||||
* alignment ID替换操作
|
||||
* 根据缓存中的AppId对应信息,获取当前AppId对应的具体名称。
|
||||
*
|
||||
* @param parameters 参数集
|
||||
* @param fieldName 原始日志列名
|
||||
*/
|
||||
private static String dismantlingUtils(String parameters, Object fieldName) {
|
||||
String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
|
||||
int digits = Integer.parseInt(alignmentPars[0]);
|
||||
return fieldName.toString().split(StreamAggregateConfig.PROTOCOL_SPLITTER)[digits];
|
||||
}
|
||||
|
||||
/**
|
||||
* combination 拼接操作
|
||||
* 获取方法函数中 parameters 字段,结构 "parameters": "abc,/" ;abc为要拼接字段 /为拼接的分隔符
|
||||
*
|
||||
* @param parameters 参数集
|
||||
* @param message 原始日志
|
||||
* @param fieldName 原始日志列名
|
||||
*/
|
||||
private static void combinationUtils(Map<String, Object> dimensions, Map<String, Object> message, String parameters, String resultKeyName, String fieldName) {
|
||||
String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
|
||||
Object combinationField = JsonParseUtil.getValue(message, combinationPars[0]);
|
||||
if (combinationField != null) {
|
||||
String separator = combinationPars[1];
|
||||
Object fieldNameValue = JsonParseUtil.getValue(message, fieldName);
|
||||
if (fieldNameValue != null) {
|
||||
String combinationValue = fieldNameValue + separator + combinationField;
|
||||
dimensions.put(resultKeyName, combinationValue);
|
||||
JsonParseUtil.setValue(message, fieldName, combinationValue);
|
||||
} else {
|
||||
dimensions.put(resultKeyName, combinationField);
|
||||
JsonParseUtil.setValue(message, fieldName, combinationField);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据表达式解析json
|
||||
*
|
||||
* @param message json
|
||||
* @param expr 解析表达式
|
||||
* @return 解析结果
|
||||
*/
|
||||
private static void flattenSpec(Map<String, Object> dimensions, Map<String, Object> message, String expr, String resultKeyName, String fieldName) {
|
||||
try {
|
||||
if (StringUtil.isNotBlank(expr)) {
|
||||
String operateValue = JsonParseUtil.getString(message, fieldName);
|
||||
ArrayList<String> read = JsonPath.parse(operateValue).read(expr);
|
||||
if (read.size() >= 1) {
|
||||
String flattenResult = read.get(0);
|
||||
dimensions.put(resultKeyName, flattenResult);
|
||||
}
|
||||
}
|
||||
} catch (ClassCastException | InvalidPathException e) {
|
||||
logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误" + e);
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("json表达式解析异常,异常信息:" + e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import com.zdjizhi.utils.json.JsonParseUtil;
|
||||
import org.apache.flink.api.common.functions.FlatMapFunction;
|
||||
import org.apache.flink.util.Collector;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package com.zdjizhi.utils.functions;
|
||||
package com.zdjizhi.utils.functions.filter;
|
||||
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import org.apache.flink.api.common.functions.FilterFunction;
|
||||
@@ -1,4 +1,4 @@
|
||||
package com.zdjizhi.utils.functions;
|
||||
package com.zdjizhi.utils.functions.keyby;
|
||||
|
||||
import cn.hutool.core.util.RandomUtil;
|
||||
import com.zdjizhi.common.StreamAggregateConfig;
|
||||
@@ -16,7 +16,7 @@ public class FirstKeyByFunction implements KeySelector<Tuple3< String, String, S
|
||||
|
||||
@Override
|
||||
public String getKey(Tuple3<String, String, String> value) throws Exception {
|
||||
// //以map拼接的key分组
|
||||
//以map拼接的key分组
|
||||
return value.f0;
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package com.zdjizhi.utils.functions;
|
||||
package com.zdjizhi.utils.functions.keyby;
|
||||
|
||||
import cn.hutool.core.util.RandomUtil;
|
||||
import com.zdjizhi.common.StreamAggregateConfig;
|
||||
@@ -16,7 +16,6 @@ public class SecondKeyByFunction implements KeySelector<Tuple2<String,String>, S
|
||||
|
||||
@Override
|
||||
public String getKey(Tuple2<String, String> value) throws Exception {
|
||||
|
||||
//以map拼接的key分组
|
||||
return value.f0;
|
||||
}
|
||||
@@ -0,0 +1,89 @@
|
||||
package com.zdjizhi.utils.functions.parse;
|
||||
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.zdjizhi.common.StreamAggregateConfig;
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import com.zdjizhi.utils.general.ParseFunctions;
|
||||
import com.zdjizhi.utils.json.JsonParseUtil;
|
||||
import org.apache.flink.api.common.functions.MapFunction;
|
||||
import org.apache.flink.api.java.tuple.Tuple3;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
* @Package com.zdjizhi.utils.functions
|
||||
* @Description:
|
||||
* @date 2021/5/2715:01
|
||||
*/
|
||||
public class ParseMapFunction implements MapFunction<String, Tuple3<String, String, String>> {
|
||||
private static final Log logger = LogFactory.get();
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public Tuple3<String, String, String> map(String message) {
|
||||
try {
|
||||
ArrayList<String[]> jobList = JsonParseUtil.getTransformsList();
|
||||
HashMap<String, String> dimensionsMap = JsonParseUtil.getDimensionsMap();
|
||||
if (StringUtil.isNotBlank(message)) {
|
||||
Map<String, Object> originalLog = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
|
||||
Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, originalLog);
|
||||
if (ParseFunctions.filterLogs(originalLog)) {
|
||||
for (String[] strings : jobList) {
|
||||
//函数名称
|
||||
String function = strings[0];
|
||||
//结果集字段key
|
||||
String resultKeyName = strings[1];
|
||||
//原始日志字段key
|
||||
String logsKeyName = strings[2];
|
||||
//额外的参数的值
|
||||
String parameters = strings[3];
|
||||
|
||||
//原始日志字段对应的值
|
||||
Object logsValue = JsonParseUtil.getValue(originalLog, logsKeyName);
|
||||
|
||||
switch (function) {
|
||||
case "combination":
|
||||
if (logsValue != null) {
|
||||
if (StringUtil.isNotBlank(parameters)) {
|
||||
ParseFunctions.combinationUtils(dimensionsObj, originalLog, parameters, resultKeyName, logsKeyName);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case "flattenSpec":
|
||||
if (logsValue != null) {
|
||||
if (StringUtil.isNotBlank(parameters)) {
|
||||
ParseFunctions.flattenSpec(dimensionsObj, parameters, resultKeyName, logsValue.toString());
|
||||
}
|
||||
}
|
||||
break;
|
||||
case "hierarchy":
|
||||
String key = JsonParseUtil.getString(dimensionsObj, resultKeyName) + "@" + ThreadLocalRandom.current().nextInt(StreamAggregateConfig.RANDOM_RANGE_NUM);
|
||||
return new Tuple3<>(key, JsonMapper.toJsonString(dimensionsObj), ParseFunctions.getMetricsLog(originalLog));
|
||||
// case "dismantling":
|
||||
// if (StringUtil.isNotBlank(parameters)) {
|
||||
// if (logsValue != null) {
|
||||
// JsonParseUtil.setValue(message, logsKeyName, ParseFunctions.dismantlingUtils(parameters, logsValue));
|
||||
// }
|
||||
// }
|
||||
// break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("An error occurred in the original log parsing reorganization,error message is:" + e);
|
||||
return new Tuple3<>("", "", "");
|
||||
}
|
||||
return new Tuple3<>("", "", "");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,8 +1,9 @@
|
||||
package com.zdjizhi.utils.functions;
|
||||
package com.zdjizhi.utils.functions.statistics;
|
||||
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import com.zdjizhi.utils.general.MetricFunctions;
|
||||
import com.zdjizhi.utils.general.ParseFunctions;
|
||||
import com.zdjizhi.utils.json.JsonParseUtil;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.api.java.tuple.Tuple3;
|
||||
@@ -24,7 +25,7 @@ import java.util.Map;
|
||||
public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, String>, Tuple2<String, String>, String, TimeWindow> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);
|
||||
|
||||
private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(320);
|
||||
private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(16);
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -33,22 +34,22 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<Strin
|
||||
HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricFunctionsMap();
|
||||
HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
|
||||
for (Tuple3<String, String, String> tuple : input) {
|
||||
String label = tuple.f0;
|
||||
String groupKey = tuple.f0;
|
||||
String protocol = groupKey.substring(0, groupKey.indexOf("@"));
|
||||
String dimensions = tuple.f1;
|
||||
String message = tuple.f2;
|
||||
String l7Protocol = label.substring(0, label.indexOf("@"));
|
||||
//action中某个协议的所有function,如果没有就默认
|
||||
String[] metricNames = actionMap.getOrDefault(l7Protocol, actionMap.get("Default"));
|
||||
if (StringUtil.isNotBlank(message)) {
|
||||
String metrics = tuple.f2;
|
||||
//action中某个协议的所有action,如果没有就默认
|
||||
String[] protocolMetrics = actionMap.getOrDefault(protocol, actionMap.get("Default"));
|
||||
if (StringUtil.isNotBlank(metrics)) {
|
||||
Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
|
||||
Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
|
||||
Map<String, Object> metricsObj = (Map<String, Object>) JsonMapper.fromJsonString(metrics, Map.class);
|
||||
|
||||
Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj);
|
||||
for (String name : metricNames) {
|
||||
String[] metrics = metricsMap.get(name);
|
||||
String function = metrics[0];
|
||||
String fieldName = metrics[1];
|
||||
functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, fieldName));
|
||||
for (String protocolMetric : protocolMetrics) {
|
||||
String[] functions = metricsMap.get(protocolMetric);
|
||||
String function = functions[0];
|
||||
String fieldName = functions[1];
|
||||
functionSet(function, cacheMessage, protocolMetric, cacheMessage.get(protocolMetric), JsonParseUtil.getValue(metricsObj, fieldName));
|
||||
|
||||
}
|
||||
cacheMap.put(dimensions, cacheMessage);
|
||||
@@ -58,6 +59,7 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<Strin
|
||||
if (!cacheMap.isEmpty()) {
|
||||
for (String dimensions : cacheMap.keySet()) {
|
||||
Map<String, Object> resultMap = cacheMap.get(dimensions);
|
||||
System.out.println("resultMap"+resultMap.toString());
|
||||
output.collect(new Tuple2<>(dimensions, JsonMapper.toJsonString(resultMap)));
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,10 @@
|
||||
package com.zdjizhi.utils.functions;
|
||||
package com.zdjizhi.utils.functions.statistics;
|
||||
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import com.zdjizhi.utils.general.MetricFunctions;
|
||||
import com.zdjizhi.utils.json.JsonParseUtil;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.api.java.tuple.Tuple4;
|
||||
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
||||
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
|
||||
import org.apache.flink.util.Collector;
|
||||
@@ -14,7 +13,6 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
@@ -25,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(SecondCountWindowFunction.class);
|
||||
|
||||
private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(320);
|
||||
private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(16);
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -46,7 +44,6 @@ public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<Stri
|
||||
functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, name));
|
||||
|
||||
}
|
||||
|
||||
cacheMap.put(dimensions, cacheMessage);
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,9 @@ package com.zdjizhi.utils.general;
|
||||
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.jayway.jsonpath.InvalidPathException;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.zdjizhi.common.StreamAggregateConfig;
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import com.zdjizhi.utils.json.JsonParseUtil;
|
||||
@@ -11,30 +14,15 @@ import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
/**
|
||||
* @ClassNameAggregateUtils
|
||||
* @Author lixkvip@126.com
|
||||
* @Date2020/6/23 14:04
|
||||
* @Version V1.0
|
||||
**/
|
||||
* @author qidaijie
|
||||
* @Package com.zdjizhi.utils.functions
|
||||
* @Description:
|
||||
* @date 2021/7/2113:55
|
||||
*/
|
||||
public class ParseFunctions {
|
||||
|
||||
/**
|
||||
* 解析 dimensions 字段集
|
||||
*
|
||||
* @param dimensions 维度集
|
||||
* @param message 原始日志
|
||||
* @return 结果维度集
|
||||
*/
|
||||
public static Map<String, Object> transDimensions(Map<String, String> dimensions, Map<String, Object> message) {
|
||||
HashMap<String, Object> dimensionsObj = new HashMap<>(16);
|
||||
|
||||
for (String dimension : dimensions.keySet()) {
|
||||
dimensionsObj.put(dimension, JsonParseUtil.getValue(message, dimensions.get(dimension)));
|
||||
}
|
||||
|
||||
return dimensionsObj;
|
||||
}
|
||||
private static final Log logger = LogFactory.get();
|
||||
|
||||
/**
|
||||
* 构建filters过滤函数,根据Schema指定的函数对日志进行过滤
|
||||
@@ -59,20 +47,95 @@ public class ParseFunctions {
|
||||
return available;
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析 dimensions 字段集
|
||||
*
|
||||
* @param dimensions 维度集
|
||||
* @param originalLog 原始日志
|
||||
* @return 结果维度集
|
||||
*/
|
||||
public static Map<String, Object> transDimensions(Map<String, String> dimensions, Map<String, Object> originalLog) {
|
||||
HashMap<String, Object> dimensionsObj = new HashMap<>(16);
|
||||
|
||||
for (String dimension : dimensions.keySet()) {
|
||||
dimensionsObj.put(dimension, JsonParseUtil.getValue(originalLog, dimensions.get(dimension)));
|
||||
}
|
||||
|
||||
return dimensionsObj;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据原始日志字段,生成schema内指定的metrics指标json。
|
||||
*
|
||||
* @param object 原始日志json
|
||||
* @param originalLog 原始日志json
|
||||
* @return 统计metrics json
|
||||
*/
|
||||
public static String getMetricsLog(Map<String, Object> object) {
|
||||
public static String getMetricsLog(Map<String, Object> originalLog) {
|
||||
Map<String, Object> json = new HashMap<>(16);
|
||||
|
||||
for (String fileName : JsonParseUtil.getMetricsFiledNameList()) {
|
||||
json.put(fileName, object.get(fileName));
|
||||
for (String logsKeyName : JsonParseUtil.getMetricsFiledNameList()) {
|
||||
json.put(logsKeyName, originalLog.get(logsKeyName));
|
||||
}
|
||||
|
||||
return JsonMapper.toJsonString(json);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* alignment ID替换操作
|
||||
* 根据缓存中的AppId对应信息,获取当前AppId对应的具体名称。
|
||||
*
|
||||
* @param parameters 参数集
|
||||
* @param fieldName 原始日志列名
|
||||
*/
|
||||
public static String dismantlingUtils(String parameters, Object fieldName) {
|
||||
String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
|
||||
int digits = Integer.parseInt(alignmentPars[0]);
|
||||
return fieldName.toString().split(StreamAggregateConfig.PROTOCOL_SPLITTER)[digits];
|
||||
}
|
||||
|
||||
/**
|
||||
* combination 拼接操作
|
||||
* 获取方法函数中 parameters 字段,结构 "parameters": "abc,/" ;abc为要拼接字段 /为拼接的分隔符
|
||||
*
|
||||
* @param parameters 参数集
|
||||
* @param message 原始日志
|
||||
* @param logsKeyName 原始日志列名
|
||||
*/
|
||||
public static void combinationUtils(Map<String, Object> dimensions, Map<String, Object> message, String parameters, String resultKeyName, String logsKeyName) {
|
||||
String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
|
||||
String combinationFieldKey = combinationPars[0];
|
||||
String separator = combinationPars[1];
|
||||
Object combinationFieldValue = JsonParseUtil.getValue(message, combinationFieldKey);
|
||||
if (combinationFieldValue != null) {
|
||||
Object logsFieldValue = JsonParseUtil.getValue(message, logsKeyName);
|
||||
String combinationResult = logsFieldValue + separator + combinationFieldValue;
|
||||
JsonParseUtil.setValue(dimensions, resultKeyName, combinationResult);
|
||||
JsonParseUtil.setValue(message, logsKeyName, combinationResult);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据表达式解析json
|
||||
* <p>
|
||||
* // * @param message json
|
||||
*
|
||||
* @param expr 解析表达式
|
||||
* @return 解析结果
|
||||
*/
|
||||
public static void flattenSpec(Map<String, Object> dimensions, String expr, String resultKeyName, String logsKeyValue) {
|
||||
|
||||
try {
|
||||
if (StringUtil.isNotBlank(expr)) {
|
||||
ArrayList<Object> read = JsonPath.parse(logsKeyValue).read(expr);
|
||||
if (read.size() >= 1) {
|
||||
String flattenResult = read.get(0).toString();
|
||||
dimensions.put(resultKeyName, flattenResult);
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("The label resolution exception or [expr] analytic expression error,info:" + e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -102,14 +102,14 @@ public class JsonParseUtil {
|
||||
* 获取属性值的方法
|
||||
*
|
||||
* @param jsonMap 原始日志
|
||||
* @param property key
|
||||
* @param key josn key名称
|
||||
* @return 属性的值
|
||||
*/
|
||||
public static Object getValue(Map<String, Object> jsonMap, String property) {
|
||||
public static Object getValue(Map<String, Object> jsonMap, String key) {
|
||||
try {
|
||||
return jsonMap.getOrDefault(property, null);
|
||||
return jsonMap.getOrDefault(key, null);
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("Get the JSON value is abnormal,The key is :" + property + "error message is :" + e);
|
||||
logger.error("Get the JSON value is abnormal,The key is :" + key + "error message is :" + e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -162,22 +162,6 @@ public class JsonParseUtil {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新属性值的方法
|
||||
*
|
||||
* @param obj 对象
|
||||
* @param property 更新的key
|
||||
* @param value 更新的值
|
||||
*/
|
||||
public static void setValue(Object obj, String property, Object value) {
|
||||
try {
|
||||
BeanMap beanMap = BeanMap.create(obj);
|
||||
beanMap.put(property, value);
|
||||
} catch (ClassCastException e) {
|
||||
logger.error("赋予实体类错误类型数据", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象
|
||||
* 用于反射生成schema类型的对象的一个map集合
|
||||
|
||||
Reference in New Issue
Block a user