diff --git a/pom.xml b/pom.xml
index c1119d2..c5a4e80 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.zdjizhi
log-olap-analysis-schema
- 211120-hash
+ 20220113-balance
log-olap-analysis-schema
http://www.example.com
diff --git a/properties/default_config.properties b/properties/default_config.properties
index 968c7fb..e5f6b91 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -42,4 +42,10 @@ kafka.pin=galaxy2019
#====================Topology Default====================#
#Maximum time between two outputs (in milliseconds)
-buffer.timeout=100
\ No newline at end of file
+buffer.timeout=100
+
+#Random range for the first-stage random grouping
+random.range.num=40
+
+#app_id refresh interval in seconds; 0 means the cache is never updated
+app.tick.tuple.freq.secs=0
\ No newline at end of file
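Note: the two new keys live in default_config.properties, so the service-specific file can still override them. A minimal sketch of that defaults-plus-override lookup using plain java.util.Properties — the project's actual StreamAggregateConfigurations loader is not shown in this diff, so treat the merging and file paths below as illustrative:

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class LayeredConfigSketch {
    // Load defaults first, then overlay the service file; a Properties constructed
    // with defaults falls back to them for any missing key.
    public static Properties load(String defaultsPath, String overridePath) throws IOException {
        Properties defaults = new Properties();
        try (FileInputStream in = new FileInputStream(defaultsPath)) {
            defaults.load(in);
        }
        Properties merged = new Properties(defaults);
        try (FileInputStream in = new FileInputStream(overridePath)) {
            merged.load(in);
        }
        return merged;
    }

    public static void main(String[] args) throws IOException {
        Properties cfg = load("properties/default_config.properties",
                "properties/service_flow_config.properties");
        // "40" is only the fallback if neither file defines the key.
        int randomRange = Integer.parseInt(cfg.getProperty("random.range.num", "40"));
        System.out.println("random.range.num = " + randomRange);
    }
}
```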
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 93384d2..9231c1b 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,27 +1,25 @@
#--------------------------------Address configuration------------------------------#
#Source (management) Kafka broker list
-#source.kafka.servers=10.224.11.14:9094,10.224.11.15:9094,10.224.11.16:9094,10.224.11.17:9094,10.224.11.18:9094,10.224.11.19:9094,10.224.11.20:9094,10.224.11.21:9094,10.224.11.22:9094,10.224.11.23:9094
-source.kafka.servers=10.221.12.4:9094
+source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#Sink (management) Kafka broker list
-sink.kafka.servers=10.224.11.14:9094,10.224.11.15:9094,10.224.11.16:9094,10.224.11.17:9094,10.224.11.18:9094,10.224.11.19:9094,10.224.11.20:9094,10.224.11.21:9094,10.224.11.22:9094,10.224.11.23:9094
+sink.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#--------------------------------HTTP------------------------------#
#Kafka certificate directory
tools.library=D:\\workerspace\\dat\\
#Gateway schema endpoint
-schema.http=http://10.224.11.244:9999/metadata/schema/v1/fields/liveChart_session
+schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/liveChart_session
#Gateway APP_ID lookup endpoint
-app.id.http=http://10.224.11.244:9999/open-api/appDicList
+app.id.http=http://192.168.44.67:9999/open-api/appDicList
#--------------------------------Kafka consumer group info------------------------------#
#Kafka source topic
-source.kafka.topic=SESSION-RECORD
-#source.kafka.topic=test
+source.kafka.topic=test
#Output topic for the enriched data
sink.kafka.topic=test-result
@@ -52,8 +50,8 @@ second.window.parallelism=2
#producer parallelism
sink.parallelism=1
-#app_id refresh interval in seconds; 0 means the cache is never updated
-app.tick.tuple.freq.secs=0
+#First-stage (random pre-aggregation) window time in seconds
+first.count.window.time=5
-#Aggregation window time in seconds
-count.window.time=15
+#Second-stage aggregation window time in seconds
+second.count.window.time=15
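Note: with first.count.window.time=5 and second.count.window.time=15, each second-stage window consumes roughly three pre-aggregated batches per key. The pattern only merges cleanly when the second window is a whole multiple of the first; if that invariant matters, a small startup guard like this sketch (not in the patch) would catch misconfiguration:

```java
public class WindowConfigCheck {
    // Fail fast when the second-stage window cannot evenly cover first-stage batches.
    static void checkWindows(int firstSecs, int secondSecs) {
        if (firstSecs <= 0 || secondSecs % firstSecs != 0) {
            throw new IllegalArgumentException(
                    "second.count.window.time must be a positive multiple of first.count.window.time");
        }
    }

    public static void main(String[] args) {
        checkWindows(5, 15); // the values configured above: OK
    }
}
```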
diff --git a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
index a95a508..325e04d 100644
--- a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
+++ b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
@@ -18,11 +18,13 @@ public class StreamAggregateConfig {
public static final Integer PARSE_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "parse.parallelism");
public static final Integer FIRST_WINDOW_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "first.window.parallelism");
public static final Integer SECOND_WINDOW_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "second.window.parallelism");
- public static final Integer APP_TICK_TUPLE_FREQ_SECS = StreamAggregateConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
- public static final Integer COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "count.window.time");
+ public static final Integer APP_TICK_TUPLE_FREQ_SECS = StreamAggregateConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs");
+ public static final Integer FIRST_COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "first.count.window.time");
+ public static final Integer SECOND_COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "second.count.window.time");
public static final String TOOLS_LIBRARY = StreamAggregateConfigurations.getStringProperty(0, "tools.library");
public static final Integer BUFFER_TIMEOUT = StreamAggregateConfigurations.getIntProperty(1, "buffer.timeout");
public static final Integer SINK_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "sink.parallelism");
+ public static final Integer RANDOM_RANGE_NUM = StreamAggregateConfigurations.getIntProperty(1, "random.range.num");
/**
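Note: APP_TICK_TUPLE_FREQ_SECS switches from index 0 to 1 in step with the property's move into default_config.properties, and the new RANDOM_RANGE_NUM also reads index 1. The diff never defines what the first argument of getIntProperty means; the sketch below is only a guess that it selects which properties file to consult:

```java
import java.util.Properties;

// Hypothetical stand-in for StreamAggregateConfigurations; the real class may differ.
class ConfigSourcesSketch {
    static final Properties SERVICE = new Properties();  // service_flow_config.properties
    static final Properties DEFAULTS = new Properties(); // default_config.properties

    static Integer getIntProperty(int index, String key) {
        Properties source = (index == 0) ? SERVICE : DEFAULTS;
        String value = source.getProperty(key);
        return value == null ? null : Integer.valueOf(value.trim());
    }
}
```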
diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
index b6299af..98f5c96 100644
--- a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
+++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
@@ -7,7 +7,7 @@ import com.zdjizhi.utils.functions.*;
import com.zdjizhi.utils.kafka.Consumer;
import com.zdjizhi.utils.kafka.Producer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
@@ -38,19 +38,19 @@ public class StreamAggregateTopology {
DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
.setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM);
- SingleOutputStreamOperator<Tuple4<String, String, String, String>> parseDataMap = streamSource.map(new ParseMapFunction())
+ SingleOutputStreamOperator<Tuple3<String, String, String>> parseDataMap = streamSource.map(new ParseMapFunction())
.name("ParseDataMap")
.setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
- WindowedStream<Tuple4<String, String, String, String>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction())
- .window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
+ WindowedStream<Tuple3<String, String, String>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.FIRST_COUNT_WINDOW_TIME)));
SingleOutputStreamOperator<Tuple2<String, String>> metricCountWindow = firstWindow.process(new FirstCountWindowFunction())
.name("FirstCountWindow")
.setParallelism(StreamAggregateConfig.FIRST_WINDOW_PARALLELISM);
WindowedStream<Tuple2<String, String>, String, TimeWindow> secondWindow = metricCountWindow.keyBy(new SecondKeyByFunction())
- .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.COUNT_WINDOW_TIME)));
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.SECOND_COUNT_WINDOW_TIME)));
SingleOutputStreamOperator<String> secondCountWindow = secondWindow.process(new SecondCountWindowFunction())
.name("SecondCountWindow").setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM);
diff --git a/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java
index be827c1..1aa32c7 100644
--- a/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java
@@ -6,7 +6,6 @@ import com.zdjizhi.utils.general.MetricFunctions;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -15,7 +14,6 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
/**
* @author qidaijie
@@ -23,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
* @Description:
* @date 2021/7/21 13:55
*/
-public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<String, String, String, String>, Tuple2<String, String>, String, TimeWindow> {
+public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, String>, Tuple2<String, String>, String, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);
private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
@@ -32,14 +30,15 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<String, String, String, String>, Tuple2<String, String>, String, TimeWindow> {
- public void process(String key, Context context, Iterable<Tuple4<String, String, String, String>> input, Collector<Tuple2<String, String>> output) {
+ public void process(String key, Context context, Iterable<Tuple3<String, String, String>> input, Collector<Tuple2<String, String>> output) {
try {
- for (Tuple4<String, String, String, String> tuple : input) {
- String label = tuple.f1;
+ for (Tuple3<String, String, String> tuple : input) {
+ String label = tuple.f0;
+ String dimensions = tuple.f1;
+ String message = tuple.f2;
+ String l7_Protocol = label.substring(0, label.indexOf("@"));
//All functions for this protocol from the action map; fall back to Default if absent
- String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default"));
- String dimensions = tuple.f2;
- String message = tuple.f3;
+ String[] metricNames = actionMap.getOrDefault(l7_Protocol, actionMap.get("Default"));
if (StringUtil.isNotBlank(message)) {
Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
@@ -57,11 +56,9 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<String, String, String, String>, Tuple2<String, String>, String, TimeWindow> {
- for (String countKey : cacheMap.keySet()) {
- Map<String, Object> resultMap = cacheMap.get(countKey);
- output.collect(new Tuple2<>(countKey, JsonMapper.toJsonString(resultMap)));
+ for (String dimensions : cacheMap.keySet()) {
+ Map<String, Object> resultMap = cacheMap.get(dimensions);
+ output.collect(new Tuple2<>(dimensions, JsonMapper.toJsonString(resultMap)));
}
}
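Note: label.substring(0, label.indexOf("@")) assumes every key carries a salt, but ParseMapFunction emits ("", "", "") tuples on parse failure; indexOf returns -1 for those, which makes substring throw. A defensive variant (sketch, not in the patch):

```java
// Salt-stripping with a guard for keys without '@'; falls back to the raw label.
static String protocolOf(String label) {
    int at = label.indexOf('@');
    return at >= 0 ? label.substring(0, at) : label;
}
```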
diff --git a/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java
index e02893d..831c90a 100644
--- a/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java
@@ -12,10 +12,10 @@ import org.apache.flink.api.java.tuple.Tuple4;
* @Description:
* @date 2021/7/21 12:13
*/
-public class FirstKeyByFunction implements KeySelector<Tuple4<String, String, String, String>, String> {
+public class FirstKeyByFunction implements KeySelector<Tuple3<String, String, String>, String> {
@Override
- public String getKey(Tuple4<String, String, String, String> value) throws Exception {
+ public String getKey(Tuple3<String, String, String> value) throws Exception {
// Group by the key assembled in ParseMapFunction (f0)
return value.f0;
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java
index aa59e6b..bf67eb6 100644
--- a/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java
@@ -10,7 +10,7 @@ import com.zdjizhi.utils.general.ParseFunctions;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
/**
* @author qidaijie
@@ -25,7 +26,7 @@ import java.util.Map;
* @Description:
* @date 2021/5/27 15:01
*/
-public class ParseMapFunction implements MapFunction<String, Tuple4<String, String, String, String>> {
+public class ParseMapFunction implements MapFunction<String, Tuple3<String, String, String>> {
private static final Logger logger = LoggerFactory.getLogger(ParseMapFunction.class);
private static ArrayList<String[]> jobList = JsonParseUtil.getTransformsList();
@@ -34,11 +35,11 @@ public class ParseMapFunction implements MapFunction<String, Tuple4<String, String, String, String>> {
- public Tuple4<String, String, String, String> map(String message) {
+ public Tuple3<String, String, String> map(String message) {
try {
if (StringUtil.isNotBlank(message)) {
Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
- String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id");
+// String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id");
Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object);
if (ParseFunctions.filterLogs(object)) {
for (String[] strings : jobList) {
@@ -76,7 +77,8 @@ public class ParseMapFunction implements MapFunction<String, Tuple4<String, String, String, String>> {
- return new Tuple4<>(streamTraceId, JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object));
+ String key = JsonParseUtil.getString(object, logsKeyName) + "@" + ThreadLocalRandom.current().nextInt(StreamAggregateConfig.RANDOM_RANGE_NUM);
+ return new Tuple3<>(key, JsonMapper.toJsonString(dimensionsObj), ParseFunctions.getMetricsLog(object));
default:
break;
}
@@ -84,10 +86,10 @@ public class ParseMapFunction implements MapFunction<String, Tuple4<String, String, String, String>> {
- return new Tuple4<>("","", "", "");
+ logger.error("An error occurred while parsing and reassembling the original log, error message is: " + e);
+ return new Tuple3<>("", "", "");
}
- return new Tuple4<>("","", "", "");
+ return new Tuple3<>("", "", "");
}
/**
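Note: the grouping key now has the shape "<logsKey>@<n>" with n drawn from [0, random.range.num). A tiny round-trip demo of building the salted key and recovering the protocol part, as FirstCountWindowFunction does (the key value is invented):

```java
import java.util.concurrent.ThreadLocalRandom;

public class SaltedKeyDemo {
    public static void main(String[] args) {
        int randomRange = 40;                                   // random.range.num
        String logsKey = "HTTP";                                // hypothetical logsKeyName value
        String salted = logsKey + "@" + ThreadLocalRandom.current().nextInt(randomRange);
        String protocol = salted.substring(0, salted.indexOf('@'));
        System.out.println(salted + " -> " + protocol);         // e.g. HTTP@17 -> HTTP
    }
}
```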
diff --git a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
index be78433..d458984 100644
--- a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
@@ -38,7 +38,6 @@ public class ResultFlatMapFunction implements FlatMapFunction<String, String> {
out.collect(JsonMapper.toJsonString(jsonObject));
}
}
-// out.collect(value);
}
}
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java
index 794b4ea..2d6b546 100644
--- a/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java
@@ -26,7 +26,6 @@ public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow> {
private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
- private static HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(320);
private static String resultTimeKey = JsonParseUtil.getTimeKey();
@@ -40,17 +39,15 @@ public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow> {
Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
- String label = JsonParseUtil.getString(object, "protocol_id");
- //All functions for this protocol from the action map; fall back to Default if absent
- String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default"));
Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj);
- for (String name : metricNames) {
+ for (String name : metricsMap.keySet()) {
String[] metrics = metricsMap.get(name);
String function = metrics[0];
functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, name));
}
+
cacheMap.put(dimensions, cacheMessage);
}
}
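Note: since the loop now visits every metricsMap entry anyway, iterating entrySet() saves the extra get() per key; a behavior-preserving drop-in for the patched body:

```java
// Equivalent to the patched loop, one map lookup per metric instead of two.
for (Map.Entry<String, String[]> entry : metricsMap.entrySet()) {
    String name = entry.getKey();
    String function = entry.getValue()[0];
    functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, name));
}
```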
diff --git a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
index 6bb3178..6933e7c 100644
--- a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
+++ b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
@@ -3,9 +3,11 @@ package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.json.JsonParseUtil;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -21,6 +23,7 @@ public class ParseFunctions {
*/
private static HashMap<String, String> filtersMap = JsonParseUtil.getFiltersMap();
+ private static ArrayList<String> metricsList = JsonParseUtil.getLogMetrics();
/**
* Parse the dimensions field set
@@ -62,36 +65,15 @@ public class ParseFunctions {
return available;
}
-// /**
-// * Update the cached app-relation map
-// *
-// * @param hashMap the currently cached relation map
-// */
-// public static void updateAppRelation(HashMap<Integer, String> hashMap) {
-// try {
-// Long begin = System.currentTimeMillis();
-// String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP);
-// if (StringUtil.isNotBlank(schema)) {
-// String data = JSONObject.parseObject(schema).getString("data");
-// JSONArray objects = JSONArray.parseArray(data);
-// for (Object object : objects) {
-// JSONArray jsonArray = JSONArray.parseArray(object.toString());
-// int key = jsonArray.getInteger(0);
-// String value = jsonArray.getString(1);
-// if (hashMap.containsKey(key)) {
-// if (!value.equals(hashMap.get(key))) {
-// hashMap.put(key, value);
-// }
-// } else {
-// hashMap.put(key, value);
-// }
-// }
-// logger.warn("Time spent updating the cached relation map: " + (begin - System.currentTimeMillis()));
-// logger.warn("Updated the cached APP relations, fetched data length: [" + objects.size());
-// }
-// } catch (RuntimeException e) {
-// logger.error("Failed to update the cached APP-ID, exception: " + e);
-// }
-// }
+ public static String getMetricsLog(Map<String, Object> object) {
+
+ Map<String, Object> json = new HashMap<>(16);
+
+ for (String fieldName : metricsList) {
+ json.put(fieldName, object.get(fieldName));
+ }
+
+ return JsonMapper.toJsonString(json);
+ }
}
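Note: getMetricsLog() projects a raw log down to just the schema's metric fields, which is what lets ParseMapFunction ship a slimmed record through the windows instead of the whole log. A usage sketch with invented field names and values:

```java
import java.util.HashMap;
import java.util.Map;

public class MetricsLogDemo {
    public static void main(String[] args) {
        Map<String, Object> raw = new HashMap<>();
        raw.put("bytes", 1024L);
        raw.put("packets", 3L);
        raw.put("common_stream_trace_id", "abc"); // not in metricsList: dropped
        // With metricsList = ["bytes", "packets"] this prints {"bytes":1024,"packets":3};
        // metrics missing from the log would come through as nulls.
        System.out.println(ParseFunctions.getMetricsLog(raw));
    }
}
```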
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
index 4a6a01c..0ebe8e1 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -231,6 +231,24 @@ public class JsonParseUtil {
return map;
}
+ /**
+ * Get the metric fields defined in Metrics, used to filter the raw logs
+ *
+ * @return the original names of the metric columns
+ */
+ public static ArrayList<String> getLogMetrics() {
+ ArrayList<String> list = new ArrayList<>();
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
+ DocumentContext parse = JsonPath.parse(schema);
+
+ List
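Note: the diff is truncated here, mid-declaration. For orientation only, a plausible shape for the rest of getLogMetrics — the JsonPath expression and the attribute names are guesses, since the schema layout never appears in this patch:

```java
// Hypothetical continuation; adjust the path and attribute names to the real schema.
List<Map<String, Object>> fields = parse.read("$.data[*]");
for (Map<String, Object> field : fields) {
    if ("metrics".equals(field.get("fieldType"))) {
        list.add((String) field.get("name"));
    }
}
return list;
```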