增加flattenSpec函数,使用jsonpath解析json字段。

This commit is contained in:
qidaijie
2021-09-28 14:17:20 +08:00
parent 99bf45cdbc
commit e4e4fa2893
2 changed files with 34 additions and 4 deletions

View File

@@ -29,7 +29,7 @@ public class StreamAggregateTopology {
try { try {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.enableCheckpointing(5000); // environment.enableCheckpointing(5000);
DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer()) DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
.setParallelism(StreamAggregateConfig.CONSUMER_PARALLELISM); .setParallelism(StreamAggregateConfig.CONSUMER_PARALLELISM);

View File

@@ -1,5 +1,7 @@
package com.zdjizhi.utils.functions; package com.zdjizhi.utils.functions;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.StreamAggregateConfig; import com.zdjizhi.common.StreamAggregateConfig;
import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.StringUtil;
@@ -58,14 +60,20 @@ public class MapParseFunction implements MapFunction<String, Tuple3<String, Stri
} }
break; break;
case "combination": case "combination":
if (StringUtil.isNotBlank(parameters)) { if (logsKeyValue != null) {
if (logsKeyValue != null) { if (StringUtil.isNotBlank(parameters)) {
combinationUtils(dimensionsObj, object, parameters, resultKeyName, logsKeyName); combinationUtils(dimensionsObj, object, parameters, resultKeyName, logsKeyName);
} }
} }
break; break;
case "flattenSpec":
if (logsKeyValue != null) {
if (StringUtil.isNotBlank(parameters)) {
flattenSpec(dimensionsObj, object, parameters, resultKeyName, logsKeyName);
}
}
break;
case "hierarchy": case "hierarchy":
// collector.emit(new Values(JsonParseUtil.getValue(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object)));
return new Tuple3<>(JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object)); return new Tuple3<>(JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object));
default: default:
break; break;
@@ -119,4 +127,26 @@ public class MapParseFunction implements MapFunction<String, Tuple3<String, Stri
} }
} }
/**
* 根据表达式解析json
*
* @param message json
* @param expr 解析表达式
* @return 解析结果
*/
private static void flattenSpec(Map<String, Object> dimensions, Map<String, Object> message, String expr, String resultKeyName, String fieldName) {
try {
if (StringUtil.isNotBlank(expr)) {
String operateValue = JsonParseUtil.getString(message, fieldName);
ArrayList<String> read = JsonPath.parse(operateValue).read(expr);
String flattenResult = read.get(0);
dimensions.put(resultKeyName, flattenResult);
}
} catch (ClassCastException | InvalidPathException e) {
logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误" + e);
} catch (RuntimeException e) {
logger.error("json表达式解析异常异常信息:" + e);
}
}
} }