diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java index 301d862..3d9eb3e 100644 --- a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java +++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java @@ -29,7 +29,7 @@ public class StreamAggregateTopology { try { final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); - environment.enableCheckpointing(5000); +// environment.enableCheckpointing(5000); DataStream streamSource = environment.addSource(Consumer.getKafkaConsumer()) .setParallelism(StreamAggregateConfig.CONSUMER_PARALLELISM); diff --git a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java index 41a6109..5a5741e 100644 --- a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java @@ -1,5 +1,7 @@ package com.zdjizhi.utils.functions; +import com.jayway.jsonpath.InvalidPathException; +import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.StreamAggregateConfig; import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; @@ -58,14 +60,20 @@ public class MapParseFunction implements MapFunction(JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object)); default: break; @@ -119,4 +127,26 @@ public class MapParseFunction implements MapFunction dimensions, Map message, String expr, String resultKeyName, String fieldName) { + try { + if (StringUtil.isNotBlank(expr)) { + String operateValue = JsonParseUtil.getString(message, fieldName); + ArrayList read = JsonPath.parse(operateValue).read(expr); + String flattenResult = read.get(0); + dimensions.put(resultKeyName, flattenResult); + } + } catch (ClassCastException | InvalidPathException e) { + logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误" + e); + } catch (RuntimeException e) { + logger.error("json表达式解析异常,异常信息:" + e); + } + } + }