Modify configuration file names

Rework the statistics logic into a two-layer window computation
This commit is contained in:
qidaijie
2021-11-20 11:30:08 +03:00
parent 49f78a2f49
commit 2a32156c9e
15 changed files with 275 additions and 126 deletions

View File

@@ -14,22 +14,23 @@ public class StreamAggregateConfig {
/**
* System
*/
public static final Integer CONSUMER_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "consumer.parallelism");
public static final Integer SOURCE_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "source.parallelism");
public static final Integer PARSE_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "parse.parallelism");
public static final Integer FIRST_WINDOW_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "first.window.parallelism");
public static final Integer SECOND_WINDOW_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "second.window.parallelism");
public static final Integer APP_TICK_TUPLE_FREQ_SECS = StreamAggregateConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
public static final Integer COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "count.window.time");
public static final String TOOLS_LIBRARY = StreamAggregateConfigurations.getStringProperty(0, "tools.library");
public static final Integer BUFFER_TIMEOUT = StreamAggregateConfigurations.getIntProperty(1, "buffer.timeout");
public static final Integer SINK_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "sink.parallelism");
/**
* kafka source
*/
public static final String INPUT_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "input.kafka.servers");
public static final String OUTPUT_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "output.kafka.servers");
public static final String GROUP_ID = StreamAggregateConfigurations.getStringProperty(0, "group.id");
public static final String OUTPUT_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "output.kafka.topic");
public static final String INPUT_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "input.kafka.topic");
public static final String SINK_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.servers");
public static final String SINK_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.topic");
public static final String PRODUCER_ACK = StreamAggregateConfigurations.getStringProperty(0, "producer.ack");
public static final String KAFKA_SOURCE_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.source.protocol");
public static final String KAFKA_SINK_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.sink.protocol");
public static final String KAFKA_USER = StreamAggregateConfigurations.getStringProperty(1, "kafka.user");
public static final String KAFKA_PIN = StreamAggregateConfigurations.getStringProperty(1, "kafka.pin");
@@ -41,6 +42,18 @@ public class StreamAggregateConfig {
public static final Integer MAX_REQUEST_SIZE = StreamAggregateConfigurations.getIntProperty(1, "max.request.size");
/**
* kafka source config
*/
public static final String SOURCE_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "source.kafka.servers");
public static final String SOURCE_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "source.kafka.topic");
public static final String GROUP_ID = StreamAggregateConfigurations.getStringProperty(0, "group.id");
public static final String KAFKA_SOURCE_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.source.protocol");
public static final String SESSION_TIMEOUT_MS = StreamAggregateConfigurations.getStringProperty(1, "session.timeout.ms");
public static final String MAX_POLL_RECORDS = StreamAggregateConfigurations.getStringProperty(1, "max.poll.records");
public static final String MAX_PARTITION_FETCH_BYTES = StreamAggregateConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
/**
* Kafka rate-limiting configuration - 20201117
*/
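For reference, a sketch of how the renamed source/sink entries might appear in the properties file these constants are read from. The file itself is not part of this commit; all values are placeholder assumptions except the three consumer settings, which reuse the values previously hard-coded in Consumer.java.

# Source (consumer) side
source.kafka.servers=kafka-1:9092,kafka-2:9092
source.kafka.topic=raw_logs
group.id=stream_aggregate
kafka.source.protocol=PLAINTEXT
session.timeout.ms=60000
max.poll.records=3000
max.partition.fetch.bytes=31457280

# Sink (producer) side
sink.kafka.servers=kafka-1:9092,kafka-2:9092
sink.kafka.topic=aggregated_metrics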

View File

@@ -6,7 +6,9 @@ import com.zdjizhi.common.StreamAggregateConfig;
import com.zdjizhi.utils.functions.*;
import com.zdjizhi.utils.kafka.Consumer;
import com.zdjizhi.utils.kafka.Producer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
@@ -31,20 +33,31 @@ public class StreamAggregateTopology {
// environment.enableCheckpointing(5000);
DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
.setParallelism(StreamAggregateConfig.CONSUMER_PARALLELISM);
//Maximum time between two output buffer flushes (in milliseconds)
environment.setBufferTimeout(StreamAggregateConfig.BUFFER_TIMEOUT);
SingleOutputStreamOperator<Tuple3<String, String, String>> parseDataMap = streamSource.map(new MapParseFunction()).name("ParseDataMap")
DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
.setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM);
SingleOutputStreamOperator<Tuple4<String, String, String, String>> parseDataMap = streamSource.map(new MapParseFunction())
.name("ParseDataMap")
.setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
WindowedStream<Tuple3<String, String, String>, String, TimeWindow> window = parseDataMap.keyBy(new KeyByFunction())
WindowedStream<Tuple4<String, String, String, String>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction())
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
SingleOutputStreamOperator<Tuple2<String, String>> metricCountWindow = firstWindow.process(new FirstCountWindowFunction())
.name("FirstCountWindow")
.setParallelism(StreamAggregateConfig.FIRST_WINDOW_PARALLELISM);
WindowedStream<Tuple2<String, String>, String, TimeWindow> secondWindow = metricCountWindow.keyBy(new SecondKeyByFunction())
.window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.COUNT_WINDOW_TIME)));
SingleOutputStreamOperator<String> metricCountWindow = window.process(new CountWindowFunction()).name("MetricCountWindow")
.setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
SingleOutputStreamOperator<String> secondCountWindow = secondWindow.process(new SecondCountWindowFunction())
.name("SecondCountWindow").setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM);
metricCountWindow.flatMap(new ResultFlatMapFunction()).name("ResultFlatMap").setParallelism(StreamAggregateConfig.PARSE_PARALLELISM)
.addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
secondCountWindow.flatMap(new ResultFlatMapFunction()).name("ResultFlatMap").setParallelism(StreamAggregateConfig.SINK_PARALLELISM)
.addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(StreamAggregateConfig.SINK_PARALLELISM);
environment.execute(args[0]);
} catch (Exception e) {
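Because removed and added lines run together in the hunk above, here is a condensed sketch of how the two-layer pipeline reads after this commit (the class name is hypothetical; checkpointing and the surrounding try/catch are omitted):

import com.zdjizhi.common.StreamAggregateConfig;
import com.zdjizhi.utils.functions.*;
import com.zdjizhi.utils.kafka.Consumer;
import com.zdjizhi.utils.kafka.Producer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwoLayerWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Maximum time between two output buffer flushes (milliseconds)
        environment.setBufferTimeout(StreamAggregateConfig.BUFFER_TIMEOUT);

        DataStream<String> streamSource = environment
                .addSource(Consumer.getKafkaConsumer())
                .setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM);

        // Parse raw JSON into (streamTraceId, label, dimensions, message)
        SingleOutputStreamOperator<Tuple4<String, String, String, String>> parseDataMap = streamSource
                .map(new MapParseFunction()).name("ParseDataMap")
                .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);

        // Layer 1: key by stream trace id, pre-aggregate in a fixed 5-second tumbling window
        SingleOutputStreamOperator<Tuple2<String, String>> firstCount = parseDataMap
                .keyBy(new FirstKeyByFunction())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new FirstCountWindowFunction()).name("FirstCountWindow")
                .setParallelism(StreamAggregateConfig.FIRST_WINDOW_PARALLELISM);

        // Layer 2: key the partial results by dimensions and merge them in the configured window
        SingleOutputStreamOperator<String> secondCount = firstCount
                .keyBy(new SecondKeyByFunction())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.COUNT_WINDOW_TIME)))
                .process(new SecondCountWindowFunction()).name("SecondCountWindow")
                .setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM);

        secondCount
                .flatMap(new ResultFlatMapFunction()).name("ResultFlatMap")
                .setParallelism(StreamAggregateConfig.SINK_PARALLELISM)
                .addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
                .setParallelism(StreamAggregateConfig.SINK_PARALLELISM);

        environment.execute(args[0]);
    }
}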

View File

@@ -1,12 +1,12 @@
package com.zdjizhi.utils.functions;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.general.MetricFunctions;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -15,7 +15,6 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author qidaijie
@@ -23,25 +22,25 @@ import java.util.concurrent.ConcurrentHashMap;
* @Description:
* @date 2021/7/21 13:55
*/
public class CountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, String>, String, String, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(CountWindowFunction.class);
public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<String, String, String, String>, Tuple2<String, String>, String, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);
private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
private static HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(32);
private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(320);
private static String resultTimeKey = JsonParseUtil.getTimeKey();
@Override
@SuppressWarnings("unchecked")
public void process(String key, Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) {
public void process(String key, Context context, Iterable<Tuple4<String, String, String, String>> input, Collector<Tuple2<String, String>> output) {
try {
for (Tuple3<String, String, String> tuple : input) {
String label = tuple.f0;
for (Tuple4<String, String, String, String> tuple : input) {
String label = tuple.f1;
//All functions configured for this protocol in action; fall back to the default set if absent
String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default"));
String dimensions = tuple.f1;
String message = tuple.f2;
if (StringUtil.isNotBlank(message)){
String dimensions = tuple.f2;
String message = tuple.f3;
if (StringUtil.isNotBlank(message)) {
Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
@@ -63,9 +62,8 @@ public class CountWindowFunction extends ProcessWindowFunction<Tuple3<String, St
for (String countKey : cacheMap.keySet()) {
Map<String, Object> resultMap = cacheMap.get(countKey);
JsonParseUtil.setValue(resultMap, resultTimeKey, endTime);
output.collect(JsonMapper.toJsonString(resultMap));
output.collect(new Tuple2<>(countKey, JsonMapper.toJsonString(resultMap)));
}
// cacheMap.clear();
}
} catch (RuntimeException e) {

View File

@@ -0,0 +1,22 @@
package com.zdjizhi.utils.functions;
import cn.hutool.core.util.RandomUtil;
import com.zdjizhi.common.StreamAggregateConfig;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/7/21 12:13
*/
public class FirstKeyByFunction implements KeySelector<Tuple4<String, String, String, String>, String> {
@Override
public String getKey(Tuple4<String, String, String, String> value) throws Exception {
// Group by the stream trace id (tuple field f0)
return value.f0;
}
}

View File

@@ -1,19 +0,0 @@
package com.zdjizhi.utils.functions;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/7/21 12:13
*/
public class KeyByFunction implements KeySelector<Tuple3<String, String, String>, String> {
@Override
public String getKey(Tuple3<String, String, String> value) throws Exception {
//Group by the key assembled from the dimensions map
return value.f1;
}
}

View File

@@ -1,5 +1,6 @@
package com.zdjizhi.utils.functions;
import cn.hutool.core.util.RandomUtil;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.StreamAggregateConfig;
@@ -8,8 +9,8 @@ import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.general.ParseFunctions;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,8 +25,8 @@ import java.util.Map;
* @Description:
* @date 2021/5/27 15:01
*/
public class MapParseFunction implements MapFunction<String, Tuple3<String, String, String>> {
private static final Logger logger = LoggerFactory.getLogger(CountWindowFunction.class);
public class MapParseFunction implements MapFunction<String, Tuple4<String, String, String, String>> {
private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);
private static ArrayList<String[]> jobList = JsonParseUtil.getTransformsList();
@@ -33,10 +34,11 @@ public class MapParseFunction implements MapFunction<String, Tuple3<String, Stri
@Override
@SuppressWarnings("unchecked")
public Tuple3<String, String, String> map(String message) {
public Tuple4<String, String, String, String> map(String message) {
try {
if (StringUtil.isNotBlank(message)) {
Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
// String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id");
Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object);
if (ParseFunctions.filterLogs(object)) {
for (String[] strings : jobList) {
@@ -74,7 +76,9 @@ public class MapParseFunction implements MapFunction<String, Tuple3<String, Stri
}
break;
case "hierarchy":
return new Tuple3<>(JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object));
String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id");
// RandomUtil.randomInt(0, StreamAggregateConfig.COUNT_PARALLELISM)
return new Tuple4<>(streamTraceId, JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object));
default:
break;
}
@@ -83,9 +87,9 @@ public class MapParseFunction implements MapFunction<String, Tuple3<String, Stri
}
} catch (RuntimeException e) {
logger.error("Map Parse error,message:" + e);
return new Tuple3<>("", "", "");
return new Tuple4<>("","", "", "");
}
return new Tuple3<>("", "", "");
return new Tuple4<>("","", "", "");
}
/**

View File

@@ -1,22 +0,0 @@
package com.zdjizhi.utils.functions;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/7/21 17:32
*/
public class MyTimeAssigner implements SerializableTimestampAssigner<String> {
@Override
public long extractTimestamp(String element, long recordTimestamp) {
Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(element, Map.class);
return JsonParseUtil.getLong(object, "common_end_time");
}
}

View File

@@ -38,6 +38,7 @@ public class ResultFlatMapFunction implements FlatMapFunction<String, String> {
out.collect(JsonMapper.toJsonString(jsonObject));
}
}
// out.collect(value);
}
}
}

View File

@@ -0,0 +1,101 @@
package com.zdjizhi.utils.functions;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.general.MetricFunctions;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/7/21 13:55
*/
public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(SecondCountWindowFunction.class);
private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
private static HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(320);
private static String resultTimeKey = JsonParseUtil.getTimeKey();
@Override
@SuppressWarnings("unchecked")
public void process(String key, Context context, Iterable<Tuple2<String, String>> input, Collector<String> output) {
try {
for (Tuple2<String, String> tuple : input) {
String dimensions = tuple.f0;
String message = tuple.f1;
if (StringUtil.isNotBlank(message)) {
Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
String label = JsonParseUtil.getString(object, "protocol_id");
//All functions configured for this protocol in action; fall back to the default set if absent
String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default"));
Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj);
for (String name : metricNames) {
String[] metrics = metricsMap.get(name);
String function = metrics[0];
functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, name));
}
cacheMap.put(dimensions, cacheMessage);
}
}
if (!cacheMap.isEmpty()) {
Long endTime = context.window().getEnd() / 1000;
for (String countKey : cacheMap.keySet()) {
Map<String, Object> resultMap = cacheMap.get(countKey);
JsonParseUtil.setValue(resultMap, resultTimeKey, endTime);
output.collect(JsonMapper.toJsonString(resultMap));
}
}
} catch (RuntimeException e) {
logger.error("windows count error,message:" + e);
e.printStackTrace();
} finally {
cacheMap.clear();
}
}
/**
* Set of functions that update the corresponding result field according to the schema description
*
* @param function       function name
* @param cacheMessage   accumulated result map
* @param resultName     name of the result field to update
* @param nameValue      currently cached value
* @param fieldNameValue incoming value to merge
*/
private static void functionSet(String function, Map<String, Object> cacheMessage, String resultName, Object nameValue, Object fieldNameValue) {
switch (function) {
case "sum":
cacheMessage.put(resultName, MetricFunctions.longSum(nameValue, fieldNameValue));
break;
case "count":
cacheMessage.put(resultName, MetricFunctions.count(nameValue));
break;
case "unique_sip_num":
//TODO
break;
case "unique_cip_num":
//TODO
break;
default:
break;
}
}
}
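MetricFunctions itself is not part of this diff. The sketch below shows one plausible shape for the longSum and count helpers that functionSet relies on, inferred only from the call sites above; it is an assumption, not the project's actual implementation.

package com.zdjizhi.utils.general;

public final class MetricFunctions {

    private MetricFunctions() {
    }

    // Sum metric: add the incoming field value to the value cached so far (assumed semantics).
    public static long longSum(Object cached, Object incoming) {
        return toLong(cached) + toLong(incoming);
    }

    // Count metric: one more record observed for this dimensions key (assumed semantics).
    public static long count(Object cached) {
        return toLong(cached) + 1L;
    }

    // Tolerant conversion, since cached values round-trip through JSON maps as Number or String.
    private static long toLong(Object value) {
        if (value == null) {
            return 0L;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return Long.parseLong(String.valueOf(value));
    }
}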

View File

@@ -0,0 +1,24 @@
package com.zdjizhi.utils.functions;
import cn.hutool.core.util.RandomUtil;
import com.zdjizhi.common.StreamAggregateConfig;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/7/21 12:13
*/
public class SecondKeyByFunction implements KeySelector<Tuple2<String, String>, String> {
@Override
public String getKey(Tuple2<String, String> value) throws Exception {
//Group by the key assembled from the dimensions map
return value.f0;
}
}

View File

@@ -17,11 +17,11 @@ import java.util.Properties;
public class Consumer {
private static Properties createConsumerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", StreamAggregateConfig.INPUT_KAFKA_SERVERS);
properties.put("bootstrap.servers", StreamAggregateConfig.SOURCE_KAFKA_SERVERS);
properties.put("group.id", StreamAggregateConfig.GROUP_ID);
properties.put("session.timeout.ms", "60000");
properties.put("max.poll.records", 3000);
properties.put("max.partition.fetch.bytes", 31457280);
properties.put("session.timeout.ms", StreamAggregateConfig.SESSION_TIMEOUT_MS);
properties.put("max.poll.records", StreamAggregateConfig.MAX_POLL_RECORDS);
properties.put("max.partition.fetch.bytes", StreamAggregateConfig.MAX_PARTITION_FETCH_BYTES);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
@@ -31,7 +31,7 @@ public class Consumer {
}
public static FlinkKafkaConsumer<String> getKafkaConsumer() {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(StreamAggregateConfig.INPUT_KAFKA_TOPIC,
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(StreamAggregateConfig.SOURCE_KAFKA_TOPIC,
new SimpleStringSchema(), createConsumerConfig());
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);

View File

@@ -19,7 +19,7 @@ public class Producer {
private static Properties createProducerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", StreamAggregateConfig.OUTPUT_KAFKA_SERVERS);
properties.put("bootstrap.servers", StreamAggregateConfig.SINK_KAFKA_SERVERS);
properties.put("acks", StreamAggregateConfig.PRODUCER_ACK);
properties.put("retries", StreamAggregateConfig.RETRIES);
properties.put("linger.ms", StreamAggregateConfig.LINGER_MS);
@@ -37,7 +37,7 @@ public class Producer {
public static FlinkKafkaProducer<String> getKafkaProducer() {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
StreamAggregateConfig.OUTPUT_KAFKA_TOPIC,
StreamAggregateConfig.SINK_KAFKA_TOPIC,
new SimpleStringSchema(),
createProducerConfig(), Optional.empty());