diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 2f62434..93384d2 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -43,9 +43,10 @@ source.parallelism=1 #map函数并行度 parse.parallelism=2 -#count 函数并行度 +#first count 函数并行度 first.window.parallelism=2 +#second count 函数并行度 second.window.parallelism=2 #producer 并行度 diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java index b726ca8..b6299af 100644 --- a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java +++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java @@ -7,7 +7,6 @@ import com.zdjizhi.utils.functions.*; import com.zdjizhi.utils.kafka.Consumer; import com.zdjizhi.utils.kafka.Producer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -39,7 +38,7 @@ public class StreamAggregateTopology { DataStream streamSource = environment.addSource(Consumer.getKafkaConsumer()) .setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM); - SingleOutputStreamOperator> parseDataMap = streamSource.map(new MapParseFunction()) + SingleOutputStreamOperator> parseDataMap = streamSource.map(new ParseMapFunction()) .name("ParseDataMap") .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM); diff --git a/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java index 1f821c1..be827c1 100644 --- a/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java @@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * @author qidaijie @@ -28,7 +29,6 @@ public class FirstCountWindowFunction extends ProcessWindowFunction metricsMap = JsonParseUtil.getMetricsMap(); private static HashMap actionMap = JsonParseUtil.getActionMap(); private HashMap> cacheMap = new HashMap<>(320); - private static String resultTimeKey = JsonParseUtil.getTimeKey(); @Override @SuppressWarnings("unchecked") @@ -61,7 +61,6 @@ public class FirstCountWindowFunction extends ProcessWindowFunction resultMap = cacheMap.get(countKey); - JsonParseUtil.setValue(resultMap, resultTimeKey, endTime); output.collect(new Tuple2<>(countKey, JsonMapper.toJsonString(resultMap))); } } diff --git a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java similarity index 94% rename from src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java rename to src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java index 90a21a1..aa59e6b 100644 --- a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java @@ -25,7 +25,7 @@ import java.util.Map; * @Description: * @date 2021/5/2715:01 */ -public class MapParseFunction implements MapFunction> { +public class ParseMapFunction implements MapFunction> { private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class); private static ArrayList jobList = JsonParseUtil.getTransformsList(); @@ -38,7 +38,7 @@ public class MapParseFunction implements MapFunction object = (Map) JsonMapper.fromJsonString(message, Map.class); -// String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id"); + String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id"); Map dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object); if (ParseFunctions.filterLogs(object)) { for (String[] strings : jobList) { @@ -76,8 +76,6 @@ public class MapParseFunction implements MapFunction(streamTraceId,JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object)); default: break; diff --git a/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java index 455414a..794b4ea 100644 --- a/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java @@ -14,6 +14,7 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * @author qidaijie diff --git a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java index 5ab46e6..6bb3178 100644 --- a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java +++ b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java @@ -51,8 +51,8 @@ public class ParseFunctions { for (String key : filtersMap.keySet()) { switch (key) { case "notempty": - Object value = JsonParseUtil.getValue(object, filtersMap.get(key)); - if (value != null && StringUtil.isNotBlank(value.toString())) { + String value = JsonParseUtil.getString(object, filtersMap.get(key)); + if (StringUtil.isNotBlank(value)) { available = true; } break;