diff --git a/pom.xml b/pom.xml
index dd0565f..c1119d2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
    <groupId>com.zdjizhi</groupId>
    <artifactId>log-olap-analysis-schema</artifactId>
-   <version>211105-flattenSpec</version>
+   <version>211120-hash</version>
    <name>log-olap-analysis-schema</name>
    <url>http://www.example.com</url>
@@ -116,7 +116,7 @@
        <groupId>com.zdjizhi</groupId>
        <artifactId>galaxy</artifactId>
-       <version>1.0.6</version>
+       <version>1.0.7</version>
            <artifactId>slf4j-log4j12</artifactId>
@@ -129,15 +129,6 @@
-
-
-
-
-
-
-
-
-
        <groupId>org.apache.flink</groupId>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index 2b9bfb1..968c7fb 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -1,29 +1,45 @@
-#number of producer retries
+#====================Kafka Consumer====================#
+#kafka source connection timeout
+session.timeout.ms=60000
+
+#kafka source poll
+max.poll.records=3000
+
+#kafka source poll bytes
+max.partition.fetch.bytes=31457280
+#====================Kafka Producer====================#
+#number of producer retries
retries=0
-#maximum time after a batch is created before it must be sent out, whether or not it is full
-linger.ms=5
+#maximum time after a batch is created before it must be sent out, whether or not it is full
+linger.ms=10
-#if no response is received before the timeout, the client will resend the request if necessary
+#if no response is received before the timeout, the client will resend the request if necessary
request.timeout.ms=30000
-#producers send in batches; batch size, default: 16384
+#producers send in batches; batch size, default: 16384
batch.size=262144
-#size of the buffer the producer uses to cache messages
-buffer.memory=67108864
-
-#maximum size of each request sent to the Kafka server, default 1048576
-max.request.size=5242880
-
-#kafka SASL username
-kafka.user=admin
-
-#kafka SASL and SSL password
-kafka.pin=galaxy2019
+#size of the buffer the producer uses to cache messages
+#128M
+buffer.memory=134217728
+#maximum size of each request sent to the Kafka server, default 1048576
+#10M
+max.request.size=10485760
+#====================kafka default====================#
#kafka source protocol; SSL or SASL
kafka.source.protocol=SASL
#kafka sink protocol; SSL or SASL
-kafka.sink.protocol=SASL
\ No newline at end of file
+kafka.sink.protocol=SASL
+
+#kafka SASL username
+kafka.user=admin
+
+#kafka SASL and SSL password
+kafka.pin=galaxy2019
+#====================Topology Default====================#
+
+#maximum time between two outputs (in milliseconds)
+buffer.timeout=100
\ No newline at end of file
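For reference, the new sizes are whole mebibytes: buffer.memory = 134217728 = 128 * 1024 * 1024 bytes (128 MiB), max.request.size = 10485760 = 10 * 1024 * 1024 bytes (10 MiB), and max.partition.fetch.bytes = 31457280 bytes (30 MiB), so a single partition fetch can hold three maximum-size requests.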
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 91f018d..2f62434 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,34 +1,33 @@
#--------------------------------Address configuration------------------------------#
#management Kafka address
-input.kafka.servers=192.168.44.12:9094
+#source.kafka.servers=10.224.11.14:9094,10.224.11.15:9094,10.224.11.16:9094,10.224.11.17:9094,10.224.11.18:9094,10.224.11.19:9094,10.224.11.20:9094,10.224.11.21:9094,10.224.11.22:9094,10.224.11.23:9094
+source.kafka.servers=10.221.12.4:9094
#management output Kafka address
-output.kafka.servers=192.168.44.12:9094
+sink.kafka.servers=10.224.11.14:9094,10.224.11.15:9094,10.224.11.16:9094,10.224.11.17:9094,10.224.11.18:9094,10.224.11.19:9094,10.224.11.20:9094,10.224.11.21:9094,10.224.11.22:9094,10.224.11.23:9094
#--------------------------------HTTP------------------------------#
#kafka certificate path
-tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
+tools.library=D:\\workerspace\\dat\\
#location of the gateway schema
-#schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/liveChart_interim
-schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/liveChart_session
+schema.http=http://10.224.11.244:9999/metadata/schema/v1/fields/liveChart_session
#gateway APP_ID fetch endpoint
-app.id.http=http://192.168.44.67:9999/open-api/appDicList
+app.id.http=http://10.224.11.244:9999/open-api/appDicList
#--------------------------------Kafka consumer group info------------------------------#
#kafka topic for incoming data
-input.kafka.topic=test
-#input.kafka.topic=SESSION-RECORD
-#input.kafka.topic=INTERIM-SESSION-RECORD
+source.kafka.topic=SESSION-RECORD
+#source.kafka.topic=test
#output topic for the enriched data
-output.kafka.topic=test-result
+sink.kafka.topic=test-result
#consumer group: stores the consumed-offset info for this topology (the group can be named after it); it fixes where offsets are kept so the next read does not re-read data
-group.id=liveCharts-session-test-20210811-1
+group.id=mytest-211119-1
#producer compression mode: none or snappy
producer.kafka.compression.type=none
@@ -39,13 +38,21 @@ producer.ack=1
#--------------------------------topology configuration------------------------------#
#consumer parallelism
-consumer.parallelism=1
+source.parallelism=1
#map function parallelism
-parse.parallelism=1
+parse.parallelism=2
+
+#count function parallelism
+first.window.parallelism=2
+
+second.window.parallelism=2
+
+#producer parallelism
+sink.parallelism=1
#app_id refresh interval; 0 disables cache refresh
app.tick.tuple.freq.secs=0
#aggregation window time
-count.window.time=15
\ No newline at end of file
+count.window.time=15
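For anyone migrating an existing deployment, the key renames in this file are:

    input.kafka.servers   -> source.kafka.servers
    output.kafka.servers  -> sink.kafka.servers
    input.kafka.topic     -> source.kafka.topic
    output.kafka.topic    -> sink.kafka.topic
    consumer.parallelism  -> source.parallelism

New keys introduced by this patch: first.window.parallelism, second.window.parallelism, and sink.parallelism here, plus session.timeout.ms, max.poll.records, max.partition.fetch.bytes, and buffer.timeout in default_config.properties.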
diff --git a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
index 3647c5a..a95a508 100644
--- a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
+++ b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
@@ -14,22 +14,23 @@ public class StreamAggregateConfig {
/**
* System
*/
- public static final Integer CONSUMER_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "consumer.parallelism");
+ public static final Integer SOURCE_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "source.parallelism");
public static final Integer PARSE_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "parse.parallelism");
+ public static final Integer FIRST_WINDOW_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "first.window.parallelism");
+ public static final Integer SECOND_WINDOW_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "second.window.parallelism");
public static final Integer APP_TICK_TUPLE_FREQ_SECS = StreamAggregateConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
public static final Integer COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "count.window.time");
public static final String TOOLS_LIBRARY = StreamAggregateConfigurations.getStringProperty(0, "tools.library");
+ public static final Integer BUFFER_TIMEOUT = StreamAggregateConfigurations.getIntProperty(1, "buffer.timeout");
+ public static final Integer SINK_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "sink.parallelism");
+
/**
* kafka source
*/
- public static final String INPUT_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "input.kafka.servers");
- public static final String OUTPUT_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "output.kafka.servers");
- public static final String GROUP_ID = StreamAggregateConfigurations.getStringProperty(0, "group.id");
- public static final String OUTPUT_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "output.kafka.topic");
- public static final String INPUT_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "input.kafka.topic");
+ public static final String SINK_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.servers");
+ public static final String SINK_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.topic");
public static final String PRODUCER_ACK = StreamAggregateConfigurations.getStringProperty(0, "producer.ack");
- public static final String KAFKA_SOURCE_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.source.protocol");
public static final String KAFKA_SINK_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.sink.protocol");
public static final String KAFKA_USER = StreamAggregateConfigurations.getStringProperty(1, "kafka.user");
public static final String KAFKA_PIN = StreamAggregateConfigurations.getStringProperty(1, "kafka.pin");
@@ -41,6 +42,18 @@ public class StreamAggregateConfig {
public static final Integer MAX_REQUEST_SIZE = StreamAggregateConfigurations.getIntProperty(1, "max.request.size");
+ /**
+ * kafka source config
+ */
+ public static final String SOURCE_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "source.kafka.servers");
+ public static final String SOURCE_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "source.kafka.topic");
+ public static final String GROUP_ID = StreamAggregateConfigurations.getStringProperty(0, "group.id");
+ public static final String KAFKA_SOURCE_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.source.protocol");
+ public static final String SESSION_TIMEOUT_MS = StreamAggregateConfigurations.getStringProperty(1, "session.timeout.ms");
+ public static final String MAX_POLL_RECORDS = StreamAggregateConfigurations.getStringProperty(1, "max.poll.records");
+ public static final String MAX_PARTITION_FETCH_BYTES = StreamAggregateConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
+
+
/**
* kafka rate-limiting config - 20201117
*/
diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
index 3d9eb3e..b726ca8 100644
--- a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
+++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
@@ -6,7 +6,9 @@ import com.zdjizhi.common.StreamAggregateConfig;
import com.zdjizhi.utils.functions.*;
import com.zdjizhi.utils.kafka.Consumer;
import com.zdjizhi.utils.kafka.Producer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
@@ -31,20 +33,31 @@ public class StreamAggregateTopology {
// environment.enableCheckpointing(5000);
-        DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
-                .setParallelism(StreamAggregateConfig.CONSUMER_PARALLELISM);
+        //maximum time between two outputs (in milliseconds)
+        environment.setBufferTimeout(StreamAggregateConfig.BUFFER_TIMEOUT);
-        SingleOutputStreamOperator<Tuple3<String, String, String>> parseDataMap = streamSource.map(new MapParseFunction()).name("ParseDataMap")
+        DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
+                .setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM);
+
+        SingleOutputStreamOperator<Tuple4<String, String, String, String>> parseDataMap = streamSource.map(new MapParseFunction())
+ .name("ParseDataMap")
.setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
-        WindowedStream<Tuple3<String, String, String>, String, TimeWindow> window = parseDataMap.keyBy(new KeyByFunction())
+        WindowedStream<Tuple4<String, String, String, String>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
+
+        SingleOutputStreamOperator<Tuple2<String, String>> metricCountWindow = firstWindow.process(new FirstCountWindowFunction())
+ .name("FirstCountWindow")
+ .setParallelism(StreamAggregateConfig.FIRST_WINDOW_PARALLELISM);
+
+        WindowedStream<Tuple2<String, String>, String, TimeWindow> secondWindow = metricCountWindow.keyBy(new SecondKeyByFunction())
.window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.COUNT_WINDOW_TIME)));
-        SingleOutputStreamOperator<String> metricCountWindow = window.process(new CountWindowFunction()).name("MetricCountWindow")
- .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
+        SingleOutputStreamOperator<String> secondCountWindow = secondWindow.process(new SecondCountWindowFunction())
+ .name("SecondCountWindow").setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM);
- metricCountWindow.flatMap(new ResultFlatMapFunction()).name("ResultFlatMap").setParallelism(StreamAggregateConfig.PARSE_PARALLELISM)
- .addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
+ secondCountWindow.flatMap(new ResultFlatMapFunction()).name("ResultFlatMap").setParallelism(StreamAggregateConfig.SINK_PARALLELISM)
+ .addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(StreamAggregateConfig.SINK_PARALLELISM);
environment.execute(args[0]);
} catch (Exception e) {
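The change above replaces the single keyed window with a two-stage aggregation; a condensed sketch of the resulting dataflow, with the generic types taken from the function signatures elsewhere in this patch:

    // Stage 0: Kafka source, parsed into
    // Tuple4<streamTraceId, label, dimensionsJson, messageJson>
    DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer());
    SingleOutputStreamOperator<Tuple4<String, String, String, String>> parsed =
            streamSource.map(new MapParseFunction());

    // Stage 1: short 5s pre-aggregation keyed by common_stream_trace_id (f0),
    // which spreads hot dimension keys across subtasks
    SingleOutputStreamOperator<Tuple2<String, String>> preAggregated = parsed
            .keyBy(new FirstKeyByFunction())
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .process(new FirstCountWindowFunction());

    // Stage 2: final aggregation keyed by the dimensions JSON (f0 of the Tuple2),
    // over the configured count.window.time
    SingleOutputStreamOperator<String> result = preAggregated
            .keyBy(new SecondKeyByFunction())
            .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.COUNT_WINDOW_TIME)))
            .process(new SecondCountWindowFunction());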
diff --git a/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java
similarity index 83%
rename from src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java
rename to src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java
index 5f22a6b..1f821c1 100644
--- a/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java
@@ -1,12 +1,12 @@
package com.zdjizhi.utils.functions;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.general.MetricFunctions;
import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -15,7 +15,6 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
/**
* @author qidaijie
@@ -23,25 +22,25 @@ import java.util.concurrent.ConcurrentHashMap;
* @Description:
* @date 2021/7/2113:55
*/
-public class CountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, String>, String, String, TimeWindow> {
- private static final Logger logger = LoggerFactory.getLogger(CountWindowFunction.class);
+public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<String, String, String, String>, Tuple2<String, String>, String, TimeWindow> {
+ private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);
    private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
    private static HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
-    private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(32);
+    private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(320);
private static String resultTimeKey = JsonParseUtil.getTimeKey();
@Override
@SuppressWarnings("unchecked")
-    public void process(String key, Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) {
+    public void process(String key, Context context, Iterable<Tuple4<String, String, String, String>> input, Collector<Tuple2<String, String>> output) {
try {
-            for (Tuple3<String, String, String> tuple : input) {
-                String label = tuple.f0;
+            for (Tuple4<String, String, String, String> tuple : input) {
+                String label = tuple.f1;
                //all functions for this protocol in the action map; use the Default entry if absent
String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default"));
- String dimensions = tuple.f1;
- String message = tuple.f2;
- if (StringUtil.isNotBlank(message)){
+ String dimensions = tuple.f2;
+ String message = tuple.f3;
+ if (StringUtil.isNotBlank(message)) {
                    Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
                    Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
@@ -63,9 +62,8 @@ public class CountWindowFunction extends ProcessWindowFunction
                    Map<String, Object> resultMap = cacheMap.get(countKey);
JsonParseUtil.setValue(resultMap, resultTimeKey, endTime);
- output.collect(JsonMapper.toJsonString(resultMap));
+ output.collect(new Tuple2<>(countKey, JsonMapper.toJsonString(resultMap)));
}
-// cacheMap.clear();
}
} catch (RuntimeException e) {
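Net effect of this hunk: the input tuple gains the stream trace id in f0, shifting label/dimensions/message from f0/f1/f2 to f1/f2/f3, and the window now emits Tuple2<countKey, resultJson> instead of a bare JSON string so the second stage can re-key the partial aggregates by dimensions.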
diff --git a/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java
new file mode 100644
index 0000000..e02893d
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java
@@ -0,0 +1,22 @@
+package com.zdjizhi.utils.functions;
+
+import cn.hutool.core.util.RandomUtil;
+import com.zdjizhi.common.StreamAggregateConfig;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2112:13
+ */
+public class FirstKeyByFunction implements KeySelector<Tuple4<String, String, String, String>, String> {
+
+ @Override
+    public String getKey(Tuple4<String, String, String, String> value) throws Exception {
+//        //group by the key concatenated from the map
+ return value.f0;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
deleted file mode 100644
index 0b00b3c..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/7/2112:13
- */
-public class KeyByFunction implements KeySelector<Tuple3<String, String, String>, String> {
-
- @Override
-    public String getKey(Tuple3<String, String, String> value) throws Exception {
-        //group by the key concatenated from the map
- return value.f1;
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java
index 7244b1d..90a21a1 100644
--- a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java
@@ -1,5 +1,6 @@
package com.zdjizhi.utils.functions;
+import cn.hutool.core.util.RandomUtil;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.StreamAggregateConfig;
@@ -8,8 +9,8 @@ import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.general.ParseFunctions;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,8 +25,8 @@ import java.util.Map;
* @Description:
* @date 2021/5/2715:01
*/
-public class MapParseFunction implements MapFunction<String, Tuple3<String, String, String>> {
- private static final Logger logger = LoggerFactory.getLogger(CountWindowFunction.class);
+public class MapParseFunction implements MapFunction<String, Tuple4<String, String, String, String>> {
+ private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);
    private static ArrayList<String[]> jobList = JsonParseUtil.getTransformsList();
@@ -33,10 +34,11 @@ public class MapParseFunction implements MapFunction
-    public Tuple3<String, String, String> map(String message) {
+    public Tuple4<String, String, String, String> map(String message) {
try {
if (StringUtil.isNotBlank(message)) {
                Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+// String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id");
Map dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object);
if (ParseFunctions.filterLogs(object)) {
for (String[] strings : jobList) {
@@ -74,7 +76,9 @@ public class MapParseFunction implements MapFunction
-                        return new Tuple3<>(JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object));
+                        String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id");
+//                        RandomUtil.randomInt(0, StreamAggregateConfig.COUNT_PARALLELISM)
+                        return new Tuple4<>(streamTraceId, JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object));
default:
break;
}
@@ -83,9 +87,9 @@ public class MapParseFunction implements MapFunction
-                return new Tuple3<>("", "", "");
+                return new Tuple4<>("", "", "", "");
            }
-            return new Tuple3<>("", "", "");
+            return new Tuple4<>("", "", "", "");
}
/**
diff --git a/src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java b/src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java
deleted file mode 100644
index 4ba139f..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.json.JsonParseUtil;
-import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
-
-import java.util.Map;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/7/2117:32
- */
-public class MyTimeAssigner implements SerializableTimestampAssigner<String> {
- @Override
- public long extractTimestamp(String element, long recordTimestamp) {
-        Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(element, Map.class);
-
- return JsonParseUtil.getLong(object,"common_end_time");
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
index d458984..be78433 100644
--- a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
@@ -38,6 +38,7 @@ public class ResultFlatMapFunction implements FlatMapFunction<String, String> {
out.collect(JsonMapper.toJsonString(jsonObject));
}
}
+// out.collect(value);
}
}
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java
new file mode 100644
index 0000000..455414a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java
@@ -0,0 +1,101 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.general.MetricFunctions;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2113:55
+ */
+public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow> {
+ private static final Logger logger = LoggerFactory.getLogger(SecondCountWindowFunction.class);
+
+    private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
+    private static HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
+    private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(320);
+ private static String resultTimeKey = JsonParseUtil.getTimeKey();
+
+ @Override
+ @SuppressWarnings("unchecked")
+    public void process(String key, Context context, Iterable<Tuple2<String, String>> input, Collector<String> output) {
+ try {
+            for (Tuple2<String, String> tuple : input) {
+ String dimensions = tuple.f0;
+ String message = tuple.f1;
+ if (StringUtil.isNotBlank(message)) {
+                    Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
+                    Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+ String label = JsonParseUtil.getString(object, "protocol_id");
+                    //all functions for this protocol in the action map; use the Default entry if absent
+ String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default"));
+
+                    Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj);
+ for (String name : metricNames) {
+ String[] metrics = metricsMap.get(name);
+ String function = metrics[0];
+ functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, name));
+
+ }
+ cacheMap.put(dimensions, cacheMessage);
+ }
+ }
+
+ if (!cacheMap.isEmpty()) {
+ Long endTime = context.window().getEnd() / 1000;
+
+ for (String countKey : cacheMap.keySet()) {
+                    Map<String, Object> resultMap = cacheMap.get(countKey);
+ JsonParseUtil.setValue(resultMap, resultTimeKey, endTime);
+ output.collect(JsonMapper.toJsonString(resultMap));
+ }
+ }
+
+ } catch (RuntimeException e) {
+ logger.error("windows count error,message:" + e);
+ e.printStackTrace();
+ } finally {
+ cacheMap.clear();
+ }
+ }
+
+ /**
+     * Set of functions that operate on the corresponding fields as described by the schema
+ *
+     * @param function       function name
+     * @param cacheMessage   result set (accumulator)
+     * @param resultName     result field name
+     * @param nameValue      current value
+     * @param fieldNameValue newly added value
+ */
+    private static void functionSet(String function, Map<String, Object> cacheMessage, String resultName, Object nameValue, Object fieldNameValue) {
+ switch (function) {
+ case "sum":
+ cacheMessage.put(resultName, MetricFunctions.longSum(nameValue, fieldNameValue));
+ break;
+ case "count":
+ cacheMessage.put(resultName, MetricFunctions.count(nameValue));
+ break;
+ case "unique_sip_num":
+ //TODO
+ break;
+ case "unique_cip_num":
+ //TODO
+ break;
+ default:
+ break;
+ }
+ }
+}
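For intuition, a self-contained sketch of the merge this window performs when two pre-aggregated records share the same dimensions key; longSum below is a stand-in with assumed null-safe-addition semantics, matching how MetricFunctions.longSum is used above:

    import java.util.HashMap;
    import java.util.Map;

    public class MergeSketch {
        // assumed semantics of MetricFunctions.longSum: null-safe long addition
        static Object longSum(Object current, Object added) {
            long a = current == null ? 0L : ((Number) current).longValue();
            long b = added == null ? 0L : ((Number) added).longValue();
            return a + b;
        }

        public static void main(String[] args) {
            Map<String, Map<String, Object>> cacheMap = new HashMap<>();
            String dimensions = "{\"protocol_id\":\"http\"}";
            // two 5-second partial aggregates for the same dimensions: sessions=3, then sessions=5
            for (long partial : new long[]{3L, 5L}) {
                Map<String, Object> cached = cacheMap.computeIfAbsent(dimensions, k -> new HashMap<>());
                cached.put("sessions", longSum(cached.get("sessions"), partial));
            }
            System.out.println(cacheMap.get(dimensions)); // {sessions=8}
        }
    }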
diff --git a/src/main/java/com/zdjizhi/utils/functions/SecondKeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/SecondKeyByFunction.java
new file mode 100644
index 0000000..c27bd04
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/SecondKeyByFunction.java
@@ -0,0 +1,24 @@
+package com.zdjizhi.utils.functions;
+
+import cn.hutool.core.util.RandomUtil;
+import com.zdjizhi.common.StreamAggregateConfig;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2112:13
+ */
+public class SecondKeyByFunction implements KeySelector<Tuple2<String, String>, String> {
+
+ @Override
+    public String getKey(Tuple2<String, String> value) throws Exception {
+
+        //group by the key concatenated from the map
+ return value.f0;
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
index a24ab4e..9379a1e 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
@@ -17,11 +17,11 @@ import java.util.Properties;
public class Consumer {
private static Properties createConsumerConfig() {
Properties properties = new Properties();
- properties.put("bootstrap.servers", StreamAggregateConfig.INPUT_KAFKA_SERVERS);
+ properties.put("bootstrap.servers", StreamAggregateConfig.SOURCE_KAFKA_SERVERS);
properties.put("group.id", StreamAggregateConfig.GROUP_ID);
- properties.put("session.timeout.ms", "60000");
- properties.put("max.poll.records", 3000);
- properties.put("max.partition.fetch.bytes", 31457280);
+ properties.put("session.timeout.ms", StreamAggregateConfig.SESSION_TIMEOUT_MS);
+ properties.put("max.poll.records", StreamAggregateConfig.MAX_POLL_RECORDS);
+ properties.put("max.partition.fetch.bytes", StreamAggregateConfig.MAX_PARTITION_FETCH_BYTES);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
@@ -31,7 +31,7 @@ public class Consumer {
}
    public static FlinkKafkaConsumer<String> getKafkaConsumer() {
-        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(StreamAggregateConfig.INPUT_KAFKA_TOPIC,
+        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(StreamAggregateConfig.SOURCE_KAFKA_TOPIC,
new SimpleStringSchema(), createConsumerConfig());
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
index 65330b5..dc407e7 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
@@ -19,7 +19,7 @@ public class Producer {
private static Properties createProducerConfig() {
Properties properties = new Properties();
- properties.put("bootstrap.servers", StreamAggregateConfig.OUTPUT_KAFKA_SERVERS);
+ properties.put("bootstrap.servers", StreamAggregateConfig.SINK_KAFKA_SERVERS);
properties.put("acks", StreamAggregateConfig.PRODUCER_ACK);
properties.put("retries", StreamAggregateConfig.RETRIES);
properties.put("linger.ms", StreamAggregateConfig.LINGER_MS);
@@ -37,7 +37,7 @@ public class Producer {
    public static FlinkKafkaProducer<String> getKafkaProducer() {
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
-                StreamAggregateConfig.OUTPUT_KAFKA_TOPIC,
+                StreamAggregateConfig.SINK_KAFKA_TOPIC,
new SimpleStringSchema(),
createProducerConfig(), Optional.empty());
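Taken together, the producer now starts from the defaults in default_config.properties: retries=0, linger.ms=10, request.timeout.ms=30000, batch.size=262144, buffer.memory=134217728 (128 MiB), and max.request.size=10485760 (10 MiB), with acks and compression still coming from service_flow_config.properties (producer.ack, producer.kafka.compression.type).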