diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f437b34
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,240 @@
+
+
+
+ 4.0.0
+
+ com.zdjizhi
+ log-olap-analysis-schema
+ 210908-security
+
+ log-olap-analysis-schema
+ http://www.example.com
+
+
+
+
+ nexus
+ Team Nexus Repository
+ http://192.168.40.125:8099/content/groups/public
+
+
+
+ maven-ali
+ http://maven.aliyun.com/nexus/content/groups/public/
+
+
+
+
+
+ fail
+
+
+
+
+
+ UTF-8
+ 1.13.1
+ 2.7.1
+ 1.0.0
+ 2.2.3
+ provided
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 2.4.2
+
+
+ package
+
+ shade
+
+
+
+
+ com.zdjizhi.topology.StreamAggregateTopology
+
+
+
+
+
+
+
+
+ io.github.zlika
+ reproducible-build-maven-plugin
+ 0.2
+
+
+
+ strip-jar
+
+ package
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 2.3.2
+
+ 1.8
+ 1.8
+
+
+
+
+
+ properties
+
+ **/*.properties
+ **/*.xml
+
+ false
+
+
+
+ src\main\java
+
+ log4j.properties
+
+ false
+
+
+
+
+
+
+
+ com.zdjizhi
+ galaxy
+ 1.0.6
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+ log4j-over-slf4j
+ org.slf4j
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.flink
+ flink-core
+ ${flink.version}
+ ${scope.type}
+
+
+
+
+
+ org.apache.flink
+ flink-streaming-java_2.12
+ ${flink.version}
+ ${scope.type}
+
+
+
+
+ org.apache.flink
+ flink-clients_2.12
+ ${flink.version}
+ ${scope.type}
+
+
+
+
+ org.apache.flink
+ flink-connector-kafka_2.12
+ ${flink.version}
+
+
+
+
+
+ org.apache.flink
+ flink-java
+ ${flink.version}
+ ${scope.type}
+
+
+
+ cglib
+ cglib-nodep
+ 3.2.4
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ 5.3.2
+ compile
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.2
+
+
+
+ com.jayway.jsonpath
+ json-path
+ 2.4.0
+
+
+
+ io.prometheus
+ simpleclient_pushgateway
+ 0.9.0
+
+
+
+ cn.hutool
+ hutool-all
+ 5.5.2
+
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.21
+
+
+
+ org.slf4j
+ slf4j-log4j12
+ 1.7.21
+
+
+
+ junit
+ junit
+ 4.12
+ test
+
+
+
+
+
diff --git a/properties/default_config.properties b/properties/default_config.properties
new file mode 100644
index 0000000..2b9bfb1
--- /dev/null
+++ b/properties/default_config.properties
@@ -0,0 +1,29 @@
+#producer重试的次数设置
+retries=0
+
+#他的含义就是说一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了
+linger.ms=5
+
+#如果在超时之前未收到响应,客户端将在必要时重新发送请求
+request.timeout.ms=30000
+
+#producer都是按照batch进行发送的,批次大小,默认:16384
+batch.size=262144
+
+#Producer端用于缓存消息的缓冲区大小
+buffer.memory=67108864
+
+#这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576
+max.request.size=5242880
+
+#kafka SASL验证用户名
+kafka.user=admin
+
+#kafka SASL及SSL验证密码
+kafka.pin=galaxy2019
+
+#kafka source protocol; SSL or SASL
+kafka.source.protocol=SASL
+
+#kafka sink protocol; SSL or SASL
+kafka.sink.protocol=SASL
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
new file mode 100644
index 0000000..91f018d
--- /dev/null
+++ b/properties/service_flow_config.properties
@@ -0,0 +1,51 @@
+#-------------------------------- Address configuration ------------------------------#
+
+# Input (source) Kafka bootstrap servers
+input.kafka.servers=192.168.44.12:9094
+
+# Output (sink) Kafka bootstrap servers
+output.kafka.servers=192.168.44.12:9094
+
+#-------------------------------- HTTP ------------------------------#
+# Kafka certificate / tools library path
+tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
+
+# Gateway schema endpoint
+#schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/liveChart_interim
+schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/liveChart_session
+
+# Gateway APP_ID dictionary endpoint
+app.id.http=http://192.168.44.67:9999/open-api/appDicList
+
+#-------------------------------- Kafka consumer group ------------------------------#
+
+# Kafka input topic (raw data)
+input.kafka.topic=test
+#input.kafka.topic=SESSION-RECORD
+#input.kafka.topic=INTERIM-SESSION-RECORD
+
+# Kafka output topic (enriched/aggregated data)
+output.kafka.topic=test-result
+
+# Consumer group id; stored offsets are keyed by this, so restarts resume
+# without re-reading already-consumed data
+group.id=liveCharts-session-test-20210811-1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=none
+
+# Producer ack level
+producer.ack=1
+
+#-------------------------------- Topology configuration ------------------------------#
+
+# Consumer parallelism
+consumer.parallelism=1
+
+# Map/parse function parallelism
+parse.parallelism=1
+
+# app_id cache refresh interval in seconds; 0 disables cache refresh
+app.tick.tuple.freq.secs=0
+
+# Aggregation window size in seconds
+count.window.time=15
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
new file mode 100644
index 0000000..3647c5a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
@@ -0,0 +1,56 @@
+package com.zdjizhi.common;
+
+
+import com.zdjizhi.utils.system.StreamAggregateConfigurations;
+
/**
 * Static configuration constants for the stream-aggregation job.
 *
 * Every value is resolved once at class-load time via
 * StreamAggregateConfigurations; the int argument selects which properties
 * source the key is read from (0 vs 1 -- presumably job config vs default
 * config; TODO confirm against StreamAggregateConfigurations).
 *
 * @author Administrator
 */
public class StreamAggregateConfig {

    // Splitter for comma-separated "parameters" strings from the schema.
    public static final String FORMAT_SPLITTER = ",";
    // Regex (escaped dot) used to split dot-delimited protocol paths.
    public static final String PROTOCOL_SPLITTER = "\\.";

    /**
     * System / topology settings (parallelism, cache refresh, window size).
     */
    public static final Integer CONSUMER_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "consumer.parallelism");
    public static final Integer PARSE_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "parse.parallelism");
    public static final Integer APP_TICK_TUPLE_FREQ_SECS = StreamAggregateConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
    public static final Integer COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "count.window.time");
    public static final String TOOLS_LIBRARY = StreamAggregateConfigurations.getStringProperty(0, "tools.library");

    /**
     * Kafka source/sink connection and producer tuning settings.
     */
    public static final String INPUT_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "input.kafka.servers");
    public static final String OUTPUT_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "output.kafka.servers");
    public static final String GROUP_ID = StreamAggregateConfigurations.getStringProperty(0, "group.id");
    public static final String OUTPUT_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "output.kafka.topic");
    public static final String INPUT_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "input.kafka.topic");
    public static final String PRODUCER_ACK = StreamAggregateConfigurations.getStringProperty(0, "producer.ack");
    public static final String KAFKA_SOURCE_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.source.protocol");
    public static final String KAFKA_SINK_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.sink.protocol");
    public static final String KAFKA_USER = StreamAggregateConfigurations.getStringProperty(1, "kafka.user");
    public static final String KAFKA_PIN = StreamAggregateConfigurations.getStringProperty(1, "kafka.pin");
    public static final String RETRIES = StreamAggregateConfigurations.getStringProperty(1, "retries");
    public static final String LINGER_MS = StreamAggregateConfigurations.getStringProperty(1, "linger.ms");
    public static final Integer REQUEST_TIMEOUT_MS = StreamAggregateConfigurations.getIntProperty(1, "request.timeout.ms");
    public static final Integer BATCH_SIZE = StreamAggregateConfigurations.getIntProperty(1, "batch.size");
    public static final Integer BUFFER_MEMORY = StreamAggregateConfigurations.getIntProperty(1, "buffer.memory");
    public static final Integer MAX_REQUEST_SIZE = StreamAggregateConfigurations.getIntProperty(1, "max.request.size");


    /**
     * Kafka rate-limit / compression configuration - 20201117.
     */
    public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = StreamAggregateConfigurations.getStringProperty(0, "producer.kafka.compression.type");

    /**
     * HTTP endpoints: gateway schema and APP_ID dictionary.
     */
    public static final String SCHEMA_HTTP = StreamAggregateConfigurations.getStringProperty(0, "schema.http");
    public static final String APP_ID_HTTP = StreamAggregateConfigurations.getStringProperty(0, "app.id.http");


}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
new file mode 100644
index 0000000..301d862
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
@@ -0,0 +1,56 @@
+package com.zdjizhi.topology;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.StreamAggregateConfig;
+import com.zdjizhi.utils.functions.*;
+import com.zdjizhi.utils.kafka.Consumer;
+import com.zdjizhi.utils.kafka.Producer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.WindowedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.topology
+ * @Description:
+ * @date 2021/5/2016:42
+ */
+public class StreamAggregateTopology {
+ private static final Log logger = LogFactory.get();
+
+ public static void main(String[] args) {
+ try {
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ environment.enableCheckpointing(5000);
+
+ DataStream streamSource = environment.addSource(Consumer.getKafkaConsumer())
+ .setParallelism(StreamAggregateConfig.CONSUMER_PARALLELISM);
+
+ SingleOutputStreamOperator> parseDataMap = streamSource.map(new MapParseFunction()).name("ParseDataMap")
+ .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
+
+ WindowedStream, String, TimeWindow> window = parseDataMap.keyBy(new KeyByFunction())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.COUNT_WINDOW_TIME)));
+
+ SingleOutputStreamOperator metricCountWindow = window.process(new CountWindowFunction()).name("MetricCountWindow")
+ .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
+
+ metricCountWindow.flatMap(new ResultFlatMapFunction()).name("ResultFlatMap").setParallelism(StreamAggregateConfig.PARSE_PARALLELISM)
+ .addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
+
+ environment.execute(args[0]);
+ } catch (Exception e) {
+ logger.error("This Flink task start ERROR! Exception information is :" + e);
+ }
+
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/exception/AnalysisException.java b/src/main/java/com/zdjizhi/utils/exception/AnalysisException.java
new file mode 100644
index 0000000..ad251a2
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/exception/AnalysisException.java
@@ -0,0 +1,18 @@
+package com.zdjizhi.utils.exception;
+
/**
 * Unchecked exception raised when log analysis/parsing fails.
 *
 * @author qidaijie
 * @Package com.zdjizhi.storm.utils.execption
 * @Description:
 * @date 2021/3/259:42
 */
public class AnalysisException extends RuntimeException {

    private static final long serialVersionUID = 1L;

    /** Creates an exception without a detail message. */
    public AnalysisException() {
    }

    /**
     * Creates an exception with a detail message.
     *
     * @param message human-readable description of the failure
     */
    public AnalysisException(String message) {
        super(message);
    }

    /**
     * Creates an exception with a detail message and the underlying cause,
     * so the original stack trace is preserved when wrapping.
     *
     * @param message human-readable description of the failure
     * @param cause   the underlying exception being wrapped
     */
    public AnalysisException(String message, Throwable cause) {
        super(message, cause);
    }

}
diff --git a/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java
new file mode 100644
index 0000000..5f22a6b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java
@@ -0,0 +1,105 @@
+package com.zdjizhi.utils.functions;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.general.MetricFunctions;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2113:55
+ */
+public class CountWindowFunction extends ProcessWindowFunction, String, String, TimeWindow> {
+ private static final Logger logger = LoggerFactory.getLogger(CountWindowFunction.class);
+
+ private static HashMap metricsMap = JsonParseUtil.getMetricsMap();
+ private static HashMap actionMap = JsonParseUtil.getActionMap();
+ private HashMap> cacheMap = new HashMap<>(32);
+ private static String resultTimeKey = JsonParseUtil.getTimeKey();
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void process(String key, Context context, Iterable> input, Collector output) {
+ try {
+ for (Tuple3 tuple : input) {
+ String label = tuple.f0;
+ //action涓煇涓崗璁殑鎵鏈塮unction,濡傛灉娌℃湁灏遍粯璁
+ String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default"));
+ String dimensions = tuple.f1;
+ String message = tuple.f2;
+ if (StringUtil.isNotBlank(message)){
+ Map dimensionsObj = (Map) JsonMapper.fromJsonString(dimensions, Map.class);
+ Map object = (Map) JsonMapper.fromJsonString(message, Map.class);
+
+ Map cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj);
+ for (String name : metricNames) {
+ String[] metrics = metricsMap.get(name);
+ String function = metrics[0];
+ String fieldName = metrics[1];
+ functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, fieldName));
+
+ }
+ cacheMap.put(dimensions, cacheMessage);
+ }
+ }
+
+ if (!cacheMap.isEmpty()) {
+ Long endTime = context.window().getEnd() / 1000;
+
+ for (String countKey : cacheMap.keySet()) {
+ Map resultMap = cacheMap.get(countKey);
+ JsonParseUtil.setValue(resultMap, resultTimeKey, endTime);
+ output.collect(JsonMapper.toJsonString(resultMap));
+ }
+// cacheMap.clear();
+ }
+
+ } catch (RuntimeException e) {
+ logger.error("windows count error,message:" + e);
+ e.printStackTrace();
+ } finally {
+ cacheMap.clear();
+ }
+ }
+
+ /**
+ * 鏍规嵁schema鎻忚堪瀵瑰簲瀛楁杩涜鎿嶄綔鐨 鍑芥暟闆嗗悎
+ *
+ * @param function 鍑芥暟鍚嶇О
+ * @param cacheMessage 缁撴灉闆
+ * @param nameValue 褰撳墠鍊
+ * @param fieldNameValue 鏂板姞鍊
+ */
+ private static void functionSet(String function, Map cacheMessage, String resultName, Object nameValue, Object fieldNameValue) {
+ switch (function) {
+ case "sum":
+ cacheMessage.put(resultName, MetricFunctions.longSum(nameValue, fieldNameValue));
+ break;
+ case "count":
+ cacheMessage.put(resultName, MetricFunctions.count(nameValue));
+ break;
+ case "unique_sip_num":
+ //TODO
+ break;
+ case "unique_cip_num":
+ //TODO
+ break;
+ default:
+ break;
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
new file mode 100644
index 0000000..de507ad
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
@@ -0,0 +1,17 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/2715:01
+ */
+public class FilterNullFunction implements FilterFunction {
+ @Override
+ public boolean filter(String message) {
+ return StringUtil.isNotBlank(message);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
new file mode 100644
index 0000000..0b00b3c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
@@ -0,0 +1,19 @@
+package com.zdjizhi.utils.functions;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2112:13
+ */
+public class KeyByFunction implements KeySelector, String> {
+
+ @Override
+ public String getKey(Tuple3 value) throws Exception {
+ //浠ap鎷兼帴鐨刱ey鍒嗙粍
+ return value.f1;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java
new file mode 100644
index 0000000..41a6109
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java
@@ -0,0 +1,122 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.common.StreamAggregateConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.general.ParseFunctions;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/2715:01
+ */
+public class MapParseFunction implements MapFunction> {
+ private static final Logger logger = LoggerFactory.getLogger(CountWindowFunction.class);
+
+ private static ArrayList jobList = JsonParseUtil.getTransformsList();
+
+ private static HashMap dimensionsMap = JsonParseUtil.getDimensionsMap();
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Tuple3 map(String message) {
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ Map object = (Map) JsonMapper.fromJsonString(message, Map.class);
+ Map dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object);
+ if (ParseFunctions.filterLogs(object)) {
+ for (String[] strings : jobList) {
+ //鍑芥暟鍚嶇О
+ String function = strings[0];
+ //闇瑕佽ˉ鍏ㄧ殑瀛楁鐨刱ey
+ String resultKeyName = strings[1];
+ //鍘熷鏃ュ織瀛楁key
+ String logsKeyName = strings[2];
+ //鍘熷鏃ュ織瀛楁瀵瑰簲鐨勫
+ Object logsKeyValue = JsonParseUtil.getValue(object, strings[2]);
+ //棰濆鐨勫弬鏁扮殑鍊
+ String parameters = strings[3];
+
+ switch (function) {
+ case "dismantling":
+ if (StringUtil.isNotBlank(parameters)) {
+ if (logsKeyValue != null) {
+ JsonParseUtil.setValue(message, logsKeyName, dismantlingUtils(parameters, logsKeyValue));
+ }
+ }
+ break;
+ case "combination":
+ if (StringUtil.isNotBlank(parameters)) {
+ if (logsKeyValue != null) {
+ combinationUtils(dimensionsObj, object, parameters, resultKeyName, logsKeyName);
+ }
+ }
+ break;
+ case "hierarchy":
+// collector.emit(new Values(JsonParseUtil.getValue(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object)));
+ return new Tuple3<>(JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object));
+ default:
+ break;
+ }
+ }
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("Map Parse error,message:" + e);
+ return new Tuple3<>("", "", "");
+ }
+ return new Tuple3<>("", "", "");
+ }
+
+ /**
+ * alignment ID鏇挎崲鎿嶄綔
+ * 鏍规嵁缂撳瓨涓殑AppId瀵瑰簲淇℃伅锛岃幏鍙栧綋鍓岮ppId瀵瑰簲鐨勫叿浣撳悕绉般
+ *
+ * @param parameters 鍙傛暟闆
+ * @param fieldName 鍘熷鏃ュ織鍒楀悕
+ */
+ private static String dismantlingUtils(String parameters, Object fieldName) {
+ String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
+ int digits = Integer.parseInt(alignmentPars[0]);
+ return fieldName.toString().split(StreamAggregateConfig.PROTOCOL_SPLITTER)[digits];
+ }
+
+ /**
+ * combination 鎷兼帴鎿嶄綔
+ * 鑾峰彇鏂规硶鍑芥暟涓 parameters 瀛楁锛岀粨鏋 "parameters": "abc,/" ;abc涓鸿鎷兼帴瀛楁 /涓烘嫾鎺ョ殑鍒嗛殧绗
+ *
+ * @param parameters 鍙傛暟闆
+ * @param message 鍘熷鏃ュ織
+ * @param fieldName 鍘熷鏃ュ織鍒楀悕
+ */
+ private static void combinationUtils(Map dimensions, Map message, String parameters, String resultKeyName, String fieldName) {
+ String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
+ Object combinationField = JsonParseUtil.getValue(message, combinationPars[0]);
+ if (combinationField != null) {
+ String separator = combinationPars[1];
+ Object fieldNameValue = JsonParseUtil.getValue(message, fieldName);
+ if (fieldNameValue != null) {
+ String combinationValue = fieldNameValue + separator + combinationField;
+ dimensions.put(resultKeyName, combinationValue);
+ JsonParseUtil.setValue(message, fieldName, combinationValue);
+ } else {
+ dimensions.put(resultKeyName, combinationField);
+ JsonParseUtil.setValue(message, fieldName, combinationField);
+
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java b/src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java
new file mode 100644
index 0000000..4ba139f
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java
@@ -0,0 +1,22 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2117:32
+ */
+public class MyTimeAssigner implements SerializableTimestampAssigner {
+ @Override
+ public long extractTimestamp(String element, long recordTimestamp) {
+ Map object = (Map) JsonMapper.fromJsonString(element, Map.class);
+
+ return JsonParseUtil.getLong(object,"common_end_time");
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
new file mode 100644
index 0000000..d458984
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
@@ -0,0 +1,43 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.common.StreamAggregateConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2114:52
+ */
+public class ResultFlatMapFunction implements FlatMapFunction {
+ private static String[] jobList = JsonParseUtil.getHierarchy();
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void flatMap(String value, Collector out) throws Exception {
+ StringBuffer stringBuffer = new StringBuffer();
+ String name = jobList[0];
+ Map jsonObject = (Map) JsonMapper.fromJsonString(value, Map.class);
+ String protocol = JsonParseUtil.getString(jsonObject, name);
+ if (StringUtil.isNotBlank(protocol)) {
+ String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER);
+ for (String proto : protocolIds) {
+ if (StringUtil.isBlank(stringBuffer.toString())) {
+ stringBuffer.append(proto);
+ jsonObject.put(name, stringBuffer.toString());
+ out.collect(JsonMapper.toJsonString(jsonObject));
+ } else {
+ stringBuffer.append(jobList[1]).append(proto);
+ jsonObject.put(name, stringBuffer.toString());
+ out.collect(JsonMapper.toJsonString(jsonObject));
+ }
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java b/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java
new file mode 100644
index 0000000..5417236
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java
@@ -0,0 +1,37 @@
+package com.zdjizhi.utils.general;
+
+
+import com.zdjizhi.utils.json.JsonTypeUtils;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.storm.utils.general
+ * @Description:
+ * @date 2021/7/2015:31
+ */
+public class MetricFunctions {
+ /**
+ * Long绫诲瀷鐨勬暟鎹眰鍜
+ *
+ * @param value1 绗竴涓
+ * @param value2 绗簩涓
+ * @return value1 + value2
+ */
+ public static Long longSum(Object value1, Object value2) {
+ Long res1 = JsonTypeUtils.checkLongValue(value1);
+ Long res2 = JsonTypeUtils.checkLongValue(value2);
+
+ return res1 + res2;
+ }
+
+ /**
+ * 璁$畻Count
+ *
+ * @param count 褰撳墠count鍊
+ * @return count+1
+ */
+ public static Long count(Object count) {
+
+ return JsonTypeUtils.checkLongValue(count) + 1L;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
new file mode 100644
index 0000000..5ab46e6
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
@@ -0,0 +1,97 @@
+package com.zdjizhi.utils.general;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.json.JsonParseUtil;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @ClassNameAggregateUtils
+ * @Author lixkvip@126.com
+ * @Date2020/6/23 14:04
+ * @Version V1.0
+ **/
+public class ParseFunctions {
+ /**
+ * 鑾峰彇filters鏉′欢map
+ */
+ private static HashMap filtersMap = JsonParseUtil.getFiltersMap();
+
+
+ /**
+ * 瑙f瀽 dimensions 瀛楁闆
+ *
+ * @param dimensions 缁村害闆
+ * @param message 鍘熷鏃ュ織
+ * @return 缁撴灉缁村害闆
+ */
+ public static Map transDimensions(Map dimensions, Map message) {
+ HashMap dimensionsObj = new HashMap<>(16);
+
+ for (String dimension : dimensions.keySet()) {
+ dimensionsObj.put(dimension, JsonParseUtil.getValue(message, dimensions.get(dimension)));
+ }
+
+ return dimensionsObj;
+ }
+
+ /**
+ * 鏋勫缓filters杩囨护鍑芥暟锛屾牴鎹甋chema鎸囧畾鐨勫嚱鏁板鏃ュ織杩涜杩囨护
+ *
+ * @param object 鍘熷鏃ュ織
+ * @return true or false
+ */
+ public static boolean filterLogs(Map object) {
+ boolean available = false;
+
+ for (String key : filtersMap.keySet()) {
+ switch (key) {
+ case "notempty":
+ Object value = JsonParseUtil.getValue(object, filtersMap.get(key));
+ if (value != null && StringUtil.isNotBlank(value.toString())) {
+ available = true;
+ }
+ break;
+ default:
+ }
+ }
+ return available;
+ }
+
+// /**
+// * 鏇存柊缂撳瓨涓殑瀵瑰簲鍏崇郴map
+// *
+// * @param hashMap 褰撳墠缂撳瓨瀵瑰簲鍏崇郴map
+// */
+// public static void updateAppRelation(HashMap hashMap) {
+// try {
+// Long begin = System.currentTimeMillis();
+// String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP);
+// if (StringUtil.isNotBlank(schema)) {
+// String data = JSONObject.parseObject(schema).getString("data");
+// JSONArray objects = JSONArray.parseArray(data);
+// for (Object object : objects) {
+// JSONArray jsonArray = JSONArray.parseArray(object.toString());
+// int key = jsonArray.getInteger(0);
+// String value = jsonArray.getString(1);
+// if (hashMap.containsKey(key)) {
+// if (!value.equals(hashMap.get(key))) {
+// hashMap.put(key, value);
+// }
+// } else {
+// hashMap.put(key, value);
+// }
+// }
+// logger.warn("鏇存柊缂撳瓨瀵瑰簲鍏崇郴鐢ㄦ椂:" + (begin - System.currentTimeMillis()));
+// logger.warn("鏇存柊缂撳瓨涓殑瀵瑰簲鐨凙PP鍏崇郴,鎷夊彇鎺ュ彛鏁版嵁闀垮害:[" + objects.size());
+// }
+// } catch (RuntimeException e) {
+// logger.error("鏇存柊缂撳瓨APP-ID澶辫触,寮傚父锛" + e);
+// }
+// }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
new file mode 100644
index 0000000..1adb1d1
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
@@ -0,0 +1,77 @@
+package com.zdjizhi.utils.http;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * 鑾峰彇缃戝叧schema鐨勫伐鍏风被
+ *
+ * @author qidaijie
+ */
+public class HttpClientUtil {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * 璇锋眰缃戝叧鑾峰彇schema
+ *
+ * @param http 缃戝叧url
+ * @return schema
+ */
+ public static String requestByGetMethod(String http) {
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ StringBuilder entityStringBuilder;
+
+ HttpGet get = new HttpGet(http);
+ BufferedReader bufferedReader = null;
+ CloseableHttpResponse httpResponse = null;
+ try {
+ httpResponse = httpClient.execute(get);
+ HttpEntity entity = httpResponse.getEntity();
+ entityStringBuilder = new StringBuilder();
+ if (null != entity) {
+ bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
+ int intC;
+ while ((intC = bufferedReader.read()) != -1) {
+ char c = (char) intC;
+ if (c == '\n') {
+ break;
+ }
+ entityStringBuilder.append(c);
+ }
+
+ return entityStringBuilder.toString();
+ }
+ } catch (IOException e) {
+ logger.error("Get Schema from Query engine ERROR! Exception message is:" + e);
+ } finally {
+ if (httpClient != null) {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ logger.error("Close HTTP Client ERROR! Exception messgae is:" + e);
+ }
+ }
+ if (httpResponse != null) {
+ try {
+ httpResponse.close();
+ } catch (IOException e) {
+ logger.error("Close httpResponse ERROR! Exception messgae is:" + e);
+ }
+ }
+ if (bufferedReader != null) {
+ IOUtils.closeQuietly(bufferedReader);
+ }
+ }
+ return "";
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
new file mode 100644
index 0000000..4a6a01c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -0,0 +1,357 @@
+package com.zdjizhi.utils.json;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.StreamAggregateConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.http.HttpClientUtil;
+import net.sf.cglib.beans.BeanGenerator;
+import net.sf.cglib.beans.BeanMap;
+
+import java.util.*;
+
+/**
+ * 浣跨敤FastJson瑙f瀽json鐨勫伐鍏风被
+ *
+ * @author qidaijie
+ */
+public class JsonParseUtil {
+
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * 妯″紡鍖归厤锛岀粰瀹氫竴涓被鍨嬪瓧绗︿覆杩斿洖涓涓被绫诲瀷
+ *
+ * @param type 绫诲瀷
+ * @return 绫荤被鍨
+ */
+
+ public static Class getClassName(String type) {
+ Class clazz;
+
+ switch (type) {
+ case "int":
+ clazz = Integer.class;
+ break;
+ case "string":
+ clazz = String.class;
+ break;
+ case "long":
+ clazz = long.class;
+ break;
+ case "array":
+ clazz = List.class;
+ break;
+ case "double":
+ clazz = double.class;
+ break;
+ case "float":
+ clazz = float.class;
+ break;
+ case "char":
+ clazz = char.class;
+ break;
+ case "byte":
+ clazz = byte.class;
+ break;
+ case "boolean":
+ clazz = boolean.class;
+ break;
+ case "short":
+ clazz = short.class;
+ break;
+ default:
+ clazz = String.class;
+ }
+ return clazz;
+ }
+
+
    /**
     * Reads a property value from an arbitrary bean via a cglib BeanMap.
     *
     * @param obj      bean instance to read from
     * @param property property (key) name
     * @return the property value, or null when absent or on reflection failure
     */
    public static Object getValue(Object obj, String property) {

        try {
            BeanMap beanMap = BeanMap.create(obj);
            if (beanMap.containsKey(property)) {
                return beanMap.get(property);
            } else {
                return null;
            }
        } catch (RuntimeException e) {
            // Degrade to null instead of failing the whole record.
            logger.error("鑾峰彇json-value寮傚父锛屽紓甯竗ey锛" + property + "寮傚父淇℃伅涓猴細" + e);
            return null;
        }
    }
+
+ /**
+ * 鑾峰彇灞炴у肩殑鏂规硶
+ *
+ * @param jsonMap 鍘熷鏃ュ織
+ * @param property key
+ * @return 灞炴х殑鍊
+ */
+ public static Object getValue(Map jsonMap, String property) {
+ try {
+ return jsonMap.getOrDefault(property, null);
+ } catch (RuntimeException e) {
+ logger.error("鑾峰彇json-value寮傚父锛屽紓甯竗ey锛" + property + "寮傚父淇℃伅涓猴細" + e);
+ return null;
+ }
+ }
+
    /**
     * Reads a value and converts it to Long, falling back to 0L when the key
     * is missing or the value cannot be converted.
     * NOTE(review): conversion semantics depend on TypeUtils.castToLong --
     * confirm how non-numeric strings are handled.
     *
     * @param jsonMap  parsed JSON map
     * @param property key to look up
     * @return converted value, or 0L when null
     */
    public static Long getLong(Map jsonMap, String property) {
        Object value = jsonMap.getOrDefault(property, null);
        Long longVal = TypeUtils.castToLong(value);

        if (longVal == null) {
            return 0L;
        }

        return longVal;
    }
+
+ public static String getString(Map jsonMap, String property) {
+ Object value = jsonMap.getOrDefault(property, null);
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Map) {
+ return JsonMapper.toJsonString(value);
+ }
+
+ if (value instanceof List) {
+ return JsonMapper.toJsonString(value);
+ }
+
+ return value.toString();
+ }
+
    /**
     * Writes (or overwrites) a key in a parsed JSON map.
     *
     * @param jsonMap  parsed raw log map
     * @param property key to write
     * @param value    value to store
     */
    public static void setValue(Map jsonMap, String property, Object value) {
        try {
            jsonMap.put(property, value);
        } catch (RuntimeException e) {
            // e.g. an unmodifiable or type-restricted map implementation.
            logger.error("璧嬩簣瀹炰綋绫婚敊璇被鍨嬫暟鎹", e);
        }
    }
+
    /**
     * Writes a property on an arbitrary bean via a cglib BeanMap.
     *
     * @param obj      bean instance to modify
     * @param property property (key) name to write
     * @param value    value to store
     */
    public static void setValue(Object obj, String property, Object value) {
        try {
            BeanMap beanMap = BeanMap.create(obj);
            beanMap.put(property, value);
        } catch (ClassCastException e) {
            // Value type does not match the bean property's declared type.
            logger.error("璧嬩簣瀹炰綋绫婚敊璇被鍨嬫暟鎹", e);
        }
    }
+
    /**
     * Dynamically generates a bean instance whose properties are defined by
     * the given name -> Class map, using cglib's BeanGenerator.
     *
     * @param properties property name -> property Class map
     * @return a new instance of the generated bean class
     */
    public static Object generateObject(Map properties) {
        BeanGenerator generator = new BeanGenerator();
        Set keySet = properties.keySet();
        for (Object aKeySet : keySet) {
            String key = (String) aKeySet;
            generator.addProperty(key, (Class) properties.get(key));
        }
        return generator.create();
    }
+
+ /**
+ * 閫氳繃鑾峰彇String绫诲瀷鐨勭綉鍏硈chema閾炬帴鏉ヨ幏鍙杕ap锛岀敤浜庣敓鎴愪竴涓狾bject绫诲瀷鐨勫璞
+ *
+ * @return 鐢ㄤ簬鍙嶅皠鐢熸垚schema绫诲瀷鐨勫璞$殑涓涓猰ap闆嗗悎
+ */
+ public static HashMap getActionMap() {
+ HashMap map = new HashMap<>(16);
+
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
+ DocumentContext parse = JsonPath.parse(schema);
+
+ List