Fix buffer corruption caused by uneven grouping (GAL-121)
pom.xml
@@ -6,7 +6,7 @@
     <groupId>com.zdjizhi</groupId>
     <artifactId>log-olap-analysis-schema</artifactId>
-    <version>211120-hash</version>
+    <version>20220113-balance</version>

     <name>log-olap-analysis-schema</name>
     <url>http://www.example.com</url>
@@ -42,4 +42,10 @@ kafka.pin=galaxy2019
 #====================Topology Default====================#

 #Maximum time between two outputs (in milliseconds)
 buffer.timeout=100
+
+#Range of the first-pass random grouping salt
+random.range.num=40
+
+#app_id refresh interval; if 0, the cache is not refreshed
+app.tick.tuple.freq.secs=0
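
Note: random.range.num is the fan-out of the salting step. Each logical key gets a random suffix in [0, 40), so no single first-stage group has to buffer every record of a hot key. A minimal sketch of the salting, assuming the "<key>@<salt>" format that ParseMapFunction emits further down (key and class name here are hypothetical):

    import java.util.concurrent.ThreadLocalRandom;

    public class SaltSketch {
        public static void main(String[] args) {
            int randomRangeNum = 40;    // random.range.num=40
            String logicalKey = "HTTP"; // hypothetical protocol key
            // Append a salt so one hot key fans out across up to 40 first-stage groups.
            String salted = logicalKey + "@" + ThreadLocalRandom.current().nextInt(randomRangeNum);
            System.out.println(salted); // e.g. HTTP@17
        }
    }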
@@ -1,27 +1,25 @@
 #--------------------------------Address configuration------------------------------#

 #Management kafka address
-#source.kafka.servers=10.224.11.14:9094,10.224.11.15:9094,10.224.11.16:9094,10.224.11.17:9094,10.224.11.18:9094,10.224.11.19:9094,10.224.11.20:9094,10.224.11.21:9094,10.224.11.22:9094,10.224.11.23:9094
-source.kafka.servers=10.221.12.4:9094
+source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094

 #Management output kafka address
-sink.kafka.servers=10.224.11.14:9094,10.224.11.15:9094,10.224.11.16:9094,10.224.11.17:9094,10.224.11.18:9094,10.224.11.19:9094,10.224.11.20:9094,10.224.11.21:9094,10.224.11.22:9094,10.224.11.23:9094
+sink.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094

 #--------------------------------HTTP------------------------------#
 #kafka certificate path
 tools.library=D:\\workerspace\\dat\\

 #Gateway schema location
-schema.http=http://10.224.11.244:9999/metadata/schema/v1/fields/liveChart_session
+schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/liveChart_session

 #Gateway APP_ID fetch API
-app.id.http=http://10.224.11.244:9999/open-api/appDicList
+app.id.http=http://192.168.44.67:9999/open-api/appDicList

 #--------------------------------Kafka consumer group info------------------------------#

 #kafka input data topic
-source.kafka.topic=SESSION-RECORD
-#source.kafka.topic=test
+source.kafka.topic=test

 #Enriched data output topic
 sink.kafka.topic=test-result
@@ -52,8 +50,8 @@ second.window.parallelism=2
 #producer parallelism
 sink.parallelism=1

-#app_id refresh interval; if 0, the cache is not refreshed
-app.tick.tuple.freq.secs=0
+#First-pass random pre-aggregation window time
+first.count.window.time=5

-#Aggregation window time
-count.window.time=15
+#Second-pass aggregation window time
+second.count.window.time=15
@@ -18,11 +18,13 @@ public class StreamAggregateConfig {
     public static final Integer PARSE_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "parse.parallelism");
     public static final Integer FIRST_WINDOW_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "first.window.parallelism");
     public static final Integer SECOND_WINDOW_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "second.window.parallelism");
-    public static final Integer APP_TICK_TUPLE_FREQ_SECS = StreamAggregateConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
-    public static final Integer COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "count.window.time");
+    public static final Integer APP_TICK_TUPLE_FREQ_SECS = StreamAggregateConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs");
+    public static final Integer FIRST_COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "first.count.window.time");
+    public static final Integer SECOND_COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "second.count.window.time");
     public static final String TOOLS_LIBRARY = StreamAggregateConfigurations.getStringProperty(0, "tools.library");
     public static final Integer BUFFER_TIMEOUT = StreamAggregateConfigurations.getIntProperty(1, "buffer.timeout");
     public static final Integer SINK_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "sink.parallelism");
+    public static final Integer RANDOM_RANGE_NUM = StreamAggregateConfigurations.getIntProperty(1, "random.range.num");


     /**
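
The first argument of getIntProperty/getStringProperty is not shown in this diff; judging by buffer.timeout and random.range.num both living in the topology properties file, it plausibly selects which loaded properties file a key is read from. A hypothetical sketch of that overload, purely as a reading aid and not the project's actual implementation:

    import java.util.List;
    import java.util.Properties;

    class ConfigLookupSketch {
        // Assumption: index 0 = main config file, index 1 = topology config file.
        private static final List<Properties> FILES = List.of(new Properties(), new Properties());

        static Integer getIntProperty(int fileIndex, String key) {
            // Look the key up in the selected file, defaulting to 0 if absent.
            return Integer.valueOf(FILES.get(fileIndex).getProperty(key, "0"));
        }
    }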
@@ -7,7 +7,7 @@ import com.zdjizhi.utils.functions.*;
 import com.zdjizhi.utils.kafka.Consumer;
 import com.zdjizhi.utils.kafka.Producer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
@@ -38,19 +38,19 @@ public class StreamAggregateTopology {
         DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
                 .setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM);

-        SingleOutputStreamOperator<Tuple4<String, String, String, String>> parseDataMap = streamSource.map(new ParseMapFunction())
+        SingleOutputStreamOperator<Tuple3<String, String, String>> parseDataMap = streamSource.map(new ParseMapFunction())
                 .name("ParseDataMap")
                 .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);

-        WindowedStream<Tuple4<String, String, String, String>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction())
-                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
+        WindowedStream<Tuple3<String, String, String>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction())
+                .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.FIRST_COUNT_WINDOW_TIME)));

         SingleOutputStreamOperator<Tuple2<String, String>> metricCountWindow = firstWindow.process(new FirstCountWindowFunction())
                 .name("FirstCountWindow")
                 .setParallelism(StreamAggregateConfig.FIRST_WINDOW_PARALLELISM);

         WindowedStream<Tuple2<String, String>, String, TimeWindow> secondWindow = metricCountWindow.keyBy(new SecondKeyByFunction())
-                .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.COUNT_WINDOW_TIME)));
+                .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.SECOND_COUNT_WINDOW_TIME)));

         SingleOutputStreamOperator<String> secondCountWindow = secondWindow.process(new SecondCountWindowFunction())
                 .name("SecondCountWindow").setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM);
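
The rewired topology is a standard two-phase (salted) aggregation: the first window pre-aggregates per salted key over FIRST_COUNT_WINDOW_TIME seconds, then the second window re-keys on the real dimensions and merges the partial results over SECOND_COUNT_WINDOW_TIME. A plain-Java sketch of the idea, with counts standing in for the real schema-driven metrics:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ThreadLocalRandom;

    public class TwoPhaseDemo {
        public static void main(String[] args) {
            String[] logs = {"HTTP", "HTTP", "HTTP", "DNS"}; // hypothetical input keys
            int randomRangeNum = 40;

            // Phase 1: pre-aggregate per salted key, spreading hot keys across groups.
            Map<String, Long> first = new HashMap<>();
            for (String proto : logs) {
                String salted = proto + "@" + ThreadLocalRandom.current().nextInt(randomRangeNum);
                first.merge(salted, 1L, Long::sum);
            }

            // Phase 2: strip the salt and merge the partial aggregates.
            Map<String, Long> second = new HashMap<>();
            first.forEach((k, v) -> second.merge(k.substring(0, k.indexOf('@')), v, Long::sum));

            System.out.println(second); // e.g. {DNS=1, HTTP=3}
        }
    }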
@@ -6,7 +6,6 @@ import com.zdjizhi.utils.general.MetricFunctions;
 import com.zdjizhi.utils.json.JsonParseUtil;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
@@ -15,7 +14,6 @@ import org.slf4j.LoggerFactory;

 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;

 /**
  * @author qidaijie
@@ -23,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * @Description:
  * @date 2021/7/2113:55
  */
-public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<String, String, String, String>, Tuple2<String, String>, String, TimeWindow> {
+public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, String>, Tuple2<String, String>, String, TimeWindow> {
     private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);

     private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
@@ -32,14 +30,15 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<Strin
     @Override
     @SuppressWarnings("unchecked")
-    public void process(String key, Context context, Iterable<Tuple4<String, String, String, String>> input, Collector<Tuple2<String, String>> output) {
+    public void process(String key, Context context, Iterable<Tuple3<String, String, String>> input, Collector<Tuple2<String, String>> output) {
         try {
-            for (Tuple4<String, String, String, String> tuple : input) {
-                String label = tuple.f1;
-                //All functions of a protocol in action; fall back to the default if absent
-                String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default"));
-                String dimensions = tuple.f2;
-                String message = tuple.f3;
+            for (Tuple3<String, String, String> tuple : input) {
+                String label = tuple.f0;
+                String dimensions = tuple.f1;
+                String message = tuple.f2;
+                String l7_Protocol = label.substring(0, label.indexOf("@"));
+                //All functions of a protocol in action; fall back to the default if absent
+                String[] metricNames = actionMap.getOrDefault(l7_Protocol, actionMap.get("Default"));
                 if (StringUtil.isNotBlank(message)) {
                     Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
                     Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
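
Here the salt added in ParseMapFunction is stripped again: the grouping key arrives as "<protocol>@<salt>", and the substring up to the "@" recovers the protocol, so the actionMap lookup keys on the real protocol rather than the salted variant. A one-screen check of that invariant, under the assumed key format:

    public class SaltStripSketch {
        public static void main(String[] args) {
            String label = "HTTP@17"; // hypothetical salted key from ParseMapFunction
            // Everything before "@" is the logical protocol key.
            String l7Protocol = label.substring(0, label.indexOf("@"));
            System.out.println(l7Protocol); // HTTP
        }
    }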
@@ -57,11 +56,9 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<Strin
                 }

                 if (!cacheMap.isEmpty()) {
-                    Long endTime = context.window().getEnd() / 1000;
-
-                    for (String countKey : cacheMap.keySet()) {
-                        Map<String, Object> resultMap = cacheMap.get(countKey);
-                        output.collect(new Tuple2<>(countKey, JsonMapper.toJsonString(resultMap)));
+                    for (String dimensions : cacheMap.keySet()) {
+                        Map<String, Object> resultMap = cacheMap.get(dimensions);
+                        output.collect(new Tuple2<>(dimensions, JsonMapper.toJsonString(resultMap)));
                     }
                 }
@@ -12,10 +12,10 @@ import org.apache.flink.api.java.tuple.Tuple4;
  * @Description:
  * @date 2021/7/2112:13
  */
-public class FirstKeyByFunction implements KeySelector<Tuple4<String, String, String, String>, String> {
+public class FirstKeyByFunction implements KeySelector<Tuple3<String, String, String>, String> {

     @Override
-    public String getKey(Tuple4<String, String, String, String> value) throws Exception {
+    public String getKey(Tuple3<String, String, String> value) throws Exception {
         //Group by the key concatenated from the map
         return value.f0;
     }
@@ -10,7 +10,7 @@ import com.zdjizhi.utils.general.ParseFunctions;
 import com.zdjizhi.utils.json.JsonParseUtil;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;

 /**
  * @author qidaijie
@@ -25,7 +26,7 @@ import java.util.Map;
  * @Description:
  * @date 2021/5/2715:01
  */
-public class ParseMapFunction implements MapFunction<String,Tuple4<String,String, String, String>> {
+public class ParseMapFunction implements MapFunction<String, Tuple3<String, String, String>> {
     private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);

     private static ArrayList<String[]> jobList = JsonParseUtil.getTransformsList();
@@ -34,11 +35,11 @@ public class ParseMapFunction implements MapFunction<String,Tuple4<String,String
     @Override
     @SuppressWarnings("unchecked")
-    public Tuple4<String,String, String, String> map(String message) {
+    public Tuple3<String, String, String> map(String message) {
         try {
             if (StringUtil.isNotBlank(message)) {
                 Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
-                String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id");
+                // String streamTraceId = JsonMapperParseUtil.getString(object, "common_stream_trace_id");
                 Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object);
                 if (ParseFunctions.filterLogs(object)) {
                     for (String[] strings : jobList) {
@@ -76,7 +77,8 @@ public class ParseMapFunction implements MapFunction<String,Tuple4<String,String
                         }
                         break;
                     case "hierarchy":
-                        return new Tuple4<>(streamTraceId,JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object));
+                        String key = JsonParseUtil.getString(object, logsKeyName) + "@" + ThreadLocalRandom.current().nextInt(StreamAggregateConfig.RANDOM_RANGE_NUM);
+                        return new Tuple3<>(key, JsonMapper.toJsonString(dimensionsObj), ParseFunctions.getMetricsLog(object));
                     default:
                         break;
                 }
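
This is the core of the fix: instead of keying on a trace id, which concentrated traffic on a few groups and let their window buffers grow until they were effectively corrupted (GAL-121), the key is now the logsKeyName value plus a random salt, and the payload is cut down to dimensions plus metrics-only JSON. A hedged sketch of the new record shape, with hypothetical field values:

    import java.util.concurrent.ThreadLocalRandom;

    // Illustration of the Tuple3 contract emitted here:
    // f0 = "<logsKeyName value>@<salt>", f1 = dimensions JSON, f2 = metrics-only JSON.
    public class RecordShapeSketch {
        public static void main(String[] args) {
            int salt = ThreadLocalRandom.current().nextInt(40);  // RANDOM_RANGE_NUM
            String f0 = "HTTP@" + salt;                          // grouping key
            String f1 = "{\"server_ip\":\"192.168.44.11\"}";     // hypothetical dimensions
            String f2 = "{\"bytes\":123}";                       // hypothetical metrics
            System.out.println(f0 + " | " + f1 + " | " + f2);
        }
    }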
@@ -84,10 +86,10 @@ public class ParseMapFunction implements MapFunction<String,Tuple4<String,String
             }
         }
     } catch (RuntimeException e) {
-        logger.error("Map Parse error,message:" + e);
-        return new Tuple4<>("","", "", "");
+        logger.error("An error occurred in the original log parsing reorganization,error message is:" + e);
+        return new Tuple3<>("", "", "");
     }
-    return new Tuple4<>("","", "", "");
+    return new Tuple3<>("", "", "");
 }

 /**
@@ -38,7 +38,6 @@ public class ResultFlatMapFunction implements FlatMapFunction<String, String> {
                     out.collect(JsonMapper.toJsonString(jsonObject));
                 }
             }
-            // out.collect(value);
         }
     }
 }
@@ -26,7 +26,6 @@ public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<Stri
     private static final Logger logger = LoggerFactory.getLogger(SecondCountWindowFunction.class);

     private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
-    private static HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
     private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(320);
     private static String resultTimeKey = JsonParseUtil.getTimeKey();
@@ -40,17 +39,15 @@ public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<Stri
             if (StringUtil.isNotBlank(message)) {
                 Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
                 Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
-                String label = JsonParseUtil.getString(object, "protocol_id");
-                //All functions of a protocol in action; fall back to the default if absent
-                String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default"));

                 Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj);
-                for (String name : metricNames) {
+                for (String name : metricsMap.keySet()) {
                     String[] metrics = metricsMap.get(name);
                     String function = metrics[0];
                     functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, name));
-
                 }

                 cacheMap.put(dimensions, cacheMessage);
             }
         }
@@ -3,9 +3,11 @@ package com.zdjizhi.utils.general;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.JsonMapper;
 import com.zdjizhi.utils.StringUtil;
 import com.zdjizhi.utils.json.JsonParseUtil;

+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -21,6 +23,7 @@ public class ParseFunctions {
      */
     private static HashMap<String, String> filtersMap = JsonParseUtil.getFiltersMap();

+    private static ArrayList<String> metricsList = JsonParseUtil.getLogMetrics();

     /**
      * Parse the dimensions field set
@@ -62,36 +65,15 @@ public class ParseFunctions {
         return available;
     }

-    // /**
-    //  * Update the mapping relations held in the cache
-    //  *
-    //  * @param hashMap the currently cached relation map
-    //  */
-    // public static void updateAppRelation(HashMap<Integer, String> hashMap) {
-    //     try {
-    //         Long begin = System.currentTimeMillis();
-    //         String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP);
-    //         if (StringUtil.isNotBlank(schema)) {
-    //             String data = JSONObject.parseObject(schema).getString("data");
-    //             JSONArray objects = JSONArray.parseArray(data);
-    //             for (Object object : objects) {
-    //                 JSONArray jsonArray = JSONArray.parseArray(object.toString());
-    //                 int key = jsonArray.getInteger(0);
-    //                 String value = jsonArray.getString(1);
-    //                 if (hashMap.containsKey(key)) {
-    //                     if (!value.equals(hashMap.get(key))) {
-    //                         hashMap.put(key, value);
-    //                     }
-    //                 } else {
-    //                     hashMap.put(key, value);
-    //                 }
-    //             }
-    //             logger.warn("Time spent refreshing cached relations: " + (begin - System.currentTimeMillis()));
-    //             logger.warn("Refreshed cached APP relations; fetched data length: [" + objects.size());
-    //         }
-    //     } catch (RuntimeException e) {
-    //         logger.error("Failed to refresh cached APP-ID, exception: " + e);
-    //     }
-    // }
+    public static String getMetricsLog(Map<String, Object> object) {
+
+        Map<String, Object> json = new HashMap<>(16);
+
+        for (String fileName : metricsList) {
+            json.put(fileName, object.get(fileName));
+        }
+
+        return JsonMapper.toJsonString(json);
+    }

 }
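
getMetricsLog projects the raw log down to just the schema's metric fields before it is serialized into the tuple, so the payload that gets shuffled and buffered in the windows shrinks from the full log to a handful of numbers. The same projection in isolation, with hypothetical field names and toString standing in for JsonMapper:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ProjectDemo {
        public static void main(String[] args) {
            List<String> metricsList = List.of("bytes", "packets"); // hypothetical schema metrics
            Map<String, Object> log = Map.of("bytes", 123, "packets", 4, "payload", "...big...");

            // Keep only the metric fields; everything else is dropped before the shuffle.
            Map<String, Object> json = new HashMap<>(16);
            for (String fieldName : metricsList) {
                json.put(fieldName, log.get(fieldName));
            }
            System.out.println(json); // {bytes=123, packets=4} -- payload dropped
        }
    }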
@@ -231,6 +231,24 @@ public class JsonParseUtil {
         return map;
     }

+    /**
+     * Get the metric fields from Metrics, used to filter raw logs
+     *
+     * @return original names of the metric columns
+     */
+    public static ArrayList<String> getLogMetrics() {
+        ArrayList<String> list = new ArrayList<>();
+        String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
+        DocumentContext parse = JsonPath.parse(schema);
+
+        List<Object> metrics = parse.read("$.data.doc.metrics[*]");
+
+        for (Object metric : metrics) {
+            list.add(JsonPath.read(metric, "$.fieldName"));
+        }
+        return list;
+    }
+
     /**
      * Fetch the gateway schema (as a String) via its URL to build a map, used to generate an Object-typed instance
      *
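
getLogMetrics pulls the metric field names straight from the gateway schema with JsonPath. A self-contained run of the same two queries against a stub schema document, assuming the data.doc.metrics nesting shown above:

    import com.jayway.jsonpath.DocumentContext;
    import com.jayway.jsonpath.JsonPath;
    import java.util.ArrayList;
    import java.util.List;

    public class SchemaPathDemo {
        public static void main(String[] args) {
            // Stub of the gateway schema response (hypothetical field names).
            String schema = "{\"data\":{\"doc\":{\"metrics\":"
                    + "[{\"fieldName\":\"bytes\"},{\"fieldName\":\"packets\"}]}}}";
            DocumentContext parse = JsonPath.parse(schema);

            // Same queries as getLogMetrics: list the metric nodes, then read each name.
            List<Object> metrics = parse.read("$.data.doc.metrics[*]");
            ArrayList<String> list = new ArrayList<>();
            for (Object metric : metrics) {
                list.add(JsonPath.read(metric, "$.fieldName"));
            }
            System.out.println(list); // [bytes, packets]
        }
    }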
@@ -39,14 +39,6 @@ public class TypeUtils {
             return ((Number) value).longValue();
         }

-        // if (value instanceof Map) {
-        //     return (Map) value;
-        // }
-        //
-        // if (value instanceof List) {
-        //     return Collections.singletonList(value.toString());
-        // }
-
         if (value instanceof Boolean) {
             return (Boolean) value ? 1 : 0;
         }