From 99bf45cdbc2186d37b9a3876bb3b4ab2b2620c3f Mon Sep 17 00:00:00 2001 From: qidaijie Date: Mon, 27 Sep 2021 11:13:25 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A42109=E7=89=88livecharts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 240 ++++++++++++ properties/default_config.properties | 29 ++ properties/service_flow_config.properties | 51 +++ .../zdjizhi/common/StreamAggregateConfig.java | 56 +++ .../topology/StreamAggregateTopology.java | 56 +++ .../utils/exception/AnalysisException.java | 18 + .../utils/functions/CountWindowFunction.java | 105 ++++++ .../utils/functions/FilterNullFunction.java | 17 + .../utils/functions/KeyByFunction.java | 19 + .../utils/functions/MapParseFunction.java | 122 ++++++ .../utils/functions/MyTimeAssigner.java | 22 ++ .../functions/ResultFlatMapFunction.java | 43 +++ .../utils/general/MetricFunctions.java | 37 ++ .../zdjizhi/utils/general/ParseFunctions.java | 97 +++++ .../zdjizhi/utils/http/HttpClientUtil.java | 77 ++++ .../com/zdjizhi/utils/json/JsonParseUtil.java | 357 ++++++++++++++++++ .../com/zdjizhi/utils/json/JsonTypeUtils.java | 142 +++++++ .../com/zdjizhi/utils/json/TypeUtils.java | 180 +++++++++ .../com/zdjizhi/utils/kafka/CertUtils.java | 36 ++ .../com/zdjizhi/utils/kafka/Consumer.java | 42 +++ .../com/zdjizhi/utils/kafka/Producer.java | 53 +++ .../system/StreamAggregateConfigurations.java | 70 ++++ src/main/java/log4j.properties | 25 ++ src/test/java/com/zdjizhi/FunctionsTest.java | 33 ++ 24 files changed, 1927 insertions(+) create mode 100644 pom.xml create mode 100644 properties/default_config.properties create mode 100644 properties/service_flow_config.properties create mode 100644 src/main/java/com/zdjizhi/common/StreamAggregateConfig.java create mode 100644 src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java create mode 100644 src/main/java/com/zdjizhi/utils/exception/AnalysisException.java create mode 100644 src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java create mode 100644 src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java create mode 100644 src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java create mode 100644 src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java create mode 100644 src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java create mode 100644 src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java create mode 100644 src/main/java/com/zdjizhi/utils/general/MetricFunctions.java create mode 100644 src/main/java/com/zdjizhi/utils/general/ParseFunctions.java create mode 100644 src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java create mode 100644 src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java create mode 100644 src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java create mode 100644 src/main/java/com/zdjizhi/utils/json/TypeUtils.java create mode 100644 src/main/java/com/zdjizhi/utils/kafka/CertUtils.java create mode 100644 src/main/java/com/zdjizhi/utils/kafka/Consumer.java create mode 100644 src/main/java/com/zdjizhi/utils/kafka/Producer.java create mode 100644 src/main/java/com/zdjizhi/utils/system/StreamAggregateConfigurations.java create mode 100644 src/main/java/log4j.properties create mode 100644 src/test/java/com/zdjizhi/FunctionsTest.java diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..f437b34 --- /dev/null +++ b/pom.xml @@ -0,0 +1,240 @@ + + + + 4.0.0 + + com.zdjizhi + log-olap-analysis-schema + 210908-security + + log-olap-analysis-schema 
+ http://www.example.com + + + + + nexus + Team Nexus Repository + http://192.168.40.125:8099/content/groups/public + + + + maven-ali + http://maven.aliyun.com/nexus/content/groups/public/ + + + + + + fail + + + + + + UTF-8 + 1.13.1 + 2.7.1 + 1.0.0 + 2.2.3 + provided + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.4.2 + + + package + + shade + + + + + com.zdjizhi.topology.StreamAggregateTopology + + + + + + + + + io.github.zlika + reproducible-build-maven-plugin + 0.2 + + + + strip-jar + + package + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.3.2 + + 1.8 + 1.8 + + + + + + properties + + **/*.properties + **/*.xml + + false + + + + src\main\java + + log4j.properties + + false + + + + + + + + com.zdjizhi + galaxy + 1.0.6 + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + + + + + + + + + + org.apache.flink + flink-core + ${flink.version} + ${scope.type} + + + + + + org.apache.flink + flink-streaming-java_2.12 + ${flink.version} + ${scope.type} + + + + + org.apache.flink + flink-clients_2.12 + ${flink.version} + ${scope.type} + + + + + org.apache.flink + flink-connector-kafka_2.12 + ${flink.version} + + + + + + org.apache.flink + flink-java + ${flink.version} + ${scope.type} + + + + cglib + cglib-nodep + 3.2.4 + + + + org.junit.jupiter + junit-jupiter-api + 5.3.2 + compile + + + + org.apache.httpcomponents + httpclient + 4.5.2 + + + + com.jayway.jsonpath + json-path + 2.4.0 + + + + io.prometheus + simpleclient_pushgateway + 0.9.0 + + + + cn.hutool + hutool-all + 5.5.2 + + + + org.slf4j + slf4j-api + 1.7.21 + + + + org.slf4j + slf4j-log4j12 + 1.7.21 + + + + junit + junit + 4.12 + test + + + + + diff --git a/properties/default_config.properties b/properties/default_config.properties new file mode 100644 index 0000000..2b9bfb1 --- /dev/null +++ b/properties/default_config.properties @@ -0,0 +1,29 @@ +#producer重试的次数设置 +retries=0 + +#他的含义就是说一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了 +linger.ms=5 + +#如果在超时之前未收到响应,客户端将在必要时重新发送请求 +request.timeout.ms=30000 + +#producer都是按照batch进行发送的,批次大小,默认:16384 +batch.size=262144 + +#Producer端用于缓存消息的缓冲区大小 +buffer.memory=67108864 + +#这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576 +max.request.size=5242880 + +#kafka SASL验证用户名 +kafka.user=admin + +#kafka SASL及SSL验证密码 +kafka.pin=galaxy2019 + +#kafka source protocol; SSL or SASL +kafka.source.protocol=SASL + +#kafka sink protocol; SSL or SASL +kafka.sink.protocol=SASL \ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties new file mode 100644 index 0000000..91f018d --- /dev/null +++ b/properties/service_flow_config.properties @@ -0,0 +1,51 @@ +#--------------------------------鍦板潃閰嶇疆------------------------------# + +#绠$悊kafka鍦板潃 +input.kafka.servers=192.168.44.12:9094 + +#绠$悊杈撳嚭kafka鍦板潃 +output.kafka.servers=192.168.44.12:9094 + +#--------------------------------HTTP------------------------------# +#kafka 璇佷功鍦板潃 +tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ + +#缃戝叧鐨剆chema浣嶇疆 +#schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/liveChart_interim +schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/liveChart_session + +#缃戝叧APP_ID 鑾峰彇鎺ュ彛 +app.id.http=http://192.168.44.67:9999/open-api/appDicList + +#--------------------------------Kafka娑堣垂缁勪俊鎭------------------------------# + +#kafka 鎺ユ敹鏁版嵁topic +input.kafka.topic=test +#input.kafka.topic=SESSION-RECORD +#input.kafka.topic=INTERIM-SESSION-RECORD + +#琛ュ叏鏁版嵁 杈撳嚭 topic +output.kafka.topic=test-result + 
+#Consumer group id; Kafka stores this group's committed offsets under it (it can be named
+#after the topology), which determines where the next run resumes so data is not re-read
+group.id=liveCharts-session-test-20210811-1
+
+#Producer compression type: none or snappy
+producer.kafka.compression.type=none
+
+#Producer acks
+producer.ack=1
+
+#--------------------------------Topology settings------------------------------#
+
+#Consumer parallelism
+consumer.parallelism=1
+
+#Map operator parallelism
+parse.parallelism=1
+
+#app_id cache refresh interval in seconds; 0 disables refreshing the cache
+app.tick.tuple.freq.secs=0
+
+#Aggregation window length in seconds
+count.window.time=15
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
new file mode 100644
index 0000000..3647c5a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
@@ -0,0 +1,56 @@
+package com.zdjizhi.common;
+
+
+import com.zdjizhi.utils.system.StreamAggregateConfigurations;
+
+/**
+ * @author Administrator
+ */
+public class StreamAggregateConfig {
+
+    public static final String FORMAT_SPLITTER = ",";
+    public static final String PROTOCOL_SPLITTER = "\\.";
+
+    /**
+     * System
+     */
+    public static final Integer CONSUMER_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "consumer.parallelism");
+    public static final Integer PARSE_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "parse.parallelism");
+    public static final Integer APP_TICK_TUPLE_FREQ_SECS = StreamAggregateConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
+    public static final Integer COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "count.window.time");
+    public static final String TOOLS_LIBRARY = StreamAggregateConfigurations.getStringProperty(0, "tools.library");
+
+    /**
+     * kafka source
+     */
+    public static final String INPUT_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "input.kafka.servers");
+    public static final String OUTPUT_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "output.kafka.servers");
+    public static final String GROUP_ID = StreamAggregateConfigurations.getStringProperty(0, "group.id");
+    public static final String OUTPUT_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "output.kafka.topic");
+    public static final String INPUT_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "input.kafka.topic");
+    public static final String PRODUCER_ACK = StreamAggregateConfigurations.getStringProperty(0, "producer.ack");
+    public static final String KAFKA_SOURCE_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.source.protocol");
+    public static final String KAFKA_SINK_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.sink.protocol");
+    public static final String KAFKA_USER = StreamAggregateConfigurations.getStringProperty(1, "kafka.user");
+    public static final String KAFKA_PIN = StreamAggregateConfigurations.getStringProperty(1, "kafka.pin");
+    public static final String RETRIES = StreamAggregateConfigurations.getStringProperty(1, "retries");
+    public static final String LINGER_MS = StreamAggregateConfigurations.getStringProperty(1, "linger.ms");
+    public static final Integer REQUEST_TIMEOUT_MS = StreamAggregateConfigurations.getIntProperty(1, "request.timeout.ms");
+    public static final Integer BATCH_SIZE = StreamAggregateConfigurations.getIntProperty(1, "batch.size");
+    public static final Integer BUFFER_MEMORY = StreamAggregateConfigurations.getIntProperty(1, "buffer.memory");
+    public static final Integer MAX_REQUEST_SIZE = StreamAggregateConfigurations.getIntProperty(1, "max.request.size");
+
+
+    /**
+     * Kafka rate-limiting settings (2020-11-17)
+     */
+    public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = StreamAggregateConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+
+    /**
+     * http
+     */
+    public static final String SCHEMA_HTTP = StreamAggregateConfigurations.getStringProperty(0, "schema.http");
+    public static final String APP_ID_HTTP = StreamAggregateConfigurations.getStringProperty(0, "app.id.http");
+
+
+}
\ No newline at end of file
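The integer passed as the first argument selects the backing file: 0 reads service_flow_config.properties (job settings), 1 reads default_config.properties (Kafka client defaults); see StreamAggregateConfigurations near the end of this patch. A minimal sketch of the convention; the class name ConfigLookupExample is illustrative only:

    import com.zdjizhi.utils.system.StreamAggregateConfigurations;

    public class ConfigLookupExample {
        public static void main(String[] args) {
            // 0 = service_flow_config.properties
            System.out.println(StreamAggregateConfigurations.getIntProperty(0, "count.window.time"));
            // 1 = default_config.properties
            System.out.println(StreamAggregateConfigurations.getIntProperty(1, "batch.size"));
        }
    }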
Exception information is :" + e); + } + + } + +} diff --git a/src/main/java/com/zdjizhi/utils/exception/AnalysisException.java b/src/main/java/com/zdjizhi/utils/exception/AnalysisException.java new file mode 100644 index 0000000..ad251a2 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/exception/AnalysisException.java @@ -0,0 +1,18 @@ +package com.zdjizhi.utils.exception; + +/** + * @author qidaijie + * @Package com.zdjizhi.storm.utils.execption + * @Description: + * @date 2021/3/259:42 + */ +public class AnalysisException extends RuntimeException { + + public AnalysisException() { + } + + public AnalysisException(String message) { + super(message); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java new file mode 100644 index 0000000..5f22a6b --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java @@ -0,0 +1,105 @@ +package com.zdjizhi.utils.functions; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.general.MetricFunctions; +import com.zdjizhi.utils.json.JsonParseUtil; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/7/2113:55 + */ +public class CountWindowFunction extends ProcessWindowFunction, String, String, TimeWindow> { + private static final Logger logger = LoggerFactory.getLogger(CountWindowFunction.class); + + private static HashMap metricsMap = JsonParseUtil.getMetricsMap(); + private static HashMap actionMap = JsonParseUtil.getActionMap(); + private HashMap> cacheMap = new HashMap<>(32); + private static String resultTimeKey = JsonParseUtil.getTimeKey(); + + @Override + @SuppressWarnings("unchecked") + public void process(String key, Context context, Iterable> input, Collector output) { + try { + for (Tuple3 tuple : input) { + String label = tuple.f0; + //action涓煇涓崗璁殑鎵鏈塮unction,濡傛灉娌℃湁灏遍粯璁 + String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default")); + String dimensions = tuple.f1; + String message = tuple.f2; + if (StringUtil.isNotBlank(message)){ + Map dimensionsObj = (Map) JsonMapper.fromJsonString(dimensions, Map.class); + Map object = (Map) JsonMapper.fromJsonString(message, Map.class); + + Map cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj); + for (String name : metricNames) { + String[] metrics = metricsMap.get(name); + String function = metrics[0]; + String fieldName = metrics[1]; + functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, fieldName)); + + } + cacheMap.put(dimensions, cacheMessage); + } + } + + if (!cacheMap.isEmpty()) { + Long endTime = context.window().getEnd() / 1000; + + for (String countKey : cacheMap.keySet()) { + Map resultMap = cacheMap.get(countKey); + JsonParseUtil.setValue(resultMap, resultTimeKey, endTime); + output.collect(JsonMapper.toJsonString(resultMap)); + } +// cacheMap.clear(); + } + + } catch (RuntimeException e) { + logger.error("windows count error,message:" + 
diff --git a/src/main/java/com/zdjizhi/utils/exception/AnalysisException.java b/src/main/java/com/zdjizhi/utils/exception/AnalysisException.java
new file mode 100644
index 0000000..ad251a2
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/exception/AnalysisException.java
@@ -0,0 +1,18 @@
+package com.zdjizhi.utils.exception;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.exception
+ * @Description:
+ * @date 2021/3/25 9:42
+ */
+public class AnalysisException extends RuntimeException {
+
+    public AnalysisException() {
+    }
+
+    public AnalysisException(String message) {
+        super(message);
+    }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java
new file mode 100644
index 0000000..5f22a6b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java
@@ -0,0 +1,105 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.general.MetricFunctions;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/21 13:55
+ */
+public class CountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, String>, String, String, TimeWindow> {
+    private static final Logger logger = LoggerFactory.getLogger(CountWindowFunction.class);
+
+    private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
+    private static HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
+    private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(32);
+    private static String resultTimeKey = JsonParseUtil.getTimeKey();
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void process(String key, Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) {
+        try {
+            for (Tuple3<String, String, String> tuple : input) {
+                String label = tuple.f0;
+                // all metric functions declared for this protocol label; fall back to the Default entry if absent
+                String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default"));
+                String dimensions = tuple.f1;
+                String message = tuple.f2;
+                if (StringUtil.isNotBlank(message)) {
+                    Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
+                    Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+
+                    Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj);
+                    for (String name : metricNames) {
+                        String[] metrics = metricsMap.get(name);
+                        String function = metrics[0];
+                        String fieldName = metrics[1];
+                        functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, fieldName));
+
+                    }
+                    cacheMap.put(dimensions, cacheMessage);
+                }
+            }
+
+            if (!cacheMap.isEmpty()) {
+                Long endTime = context.window().getEnd() / 1000;
+
+                for (String countKey : cacheMap.keySet()) {
+                    Map<String, Object> resultMap = cacheMap.get(countKey);
+                    JsonParseUtil.setValue(resultMap, resultTimeKey, endTime);
+                    output.collect(JsonMapper.toJsonString(resultMap));
+                }
+            }
+
+        } catch (RuntimeException e) {
+            logger.error("windows count error,message:" + e);
+        } finally {
+            cacheMap.clear();
+        }
+    }
+
+    /**
+     * Dispatches the aggregate function that the schema declares for a field.
+     *
+     * @param function       function name
+     * @param cacheMessage   accumulated result set
+     * @param resultName     result field name
+     * @param nameValue      current accumulated value
+     * @param fieldNameValue newly arrived value
+     */
+    private static void functionSet(String function, Map<String, Object> cacheMessage, String resultName, Object nameValue, Object fieldNameValue) {
+        switch (function) {
+            case "sum":
+                cacheMessage.put(resultName, MetricFunctions.longSum(nameValue, fieldNameValue));
+                break;
+            case "count":
+                cacheMessage.put(resultName, MetricFunctions.count(nameValue));
+                break;
+            case "unique_sip_num":
+                //TODO
+                break;
+            case "unique_cip_num":
+                //TODO
+                break;
+            default:
+                break;
+        }
+    }
+}
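The sum/count branches above reduce to the two helpers in MetricFunctions (shown later in this patch). A worked example of their null handling; the class name and values are illustrative:

    import com.zdjizhi.utils.general.MetricFunctions;

    public class MetricFunctionsExample {
        public static void main(String[] args) {
            // longSum treats a missing accumulator as 0 and coerces numeric strings
            Long total = MetricFunctions.longSum(null, 10L);   // 10
            total = MetricFunctions.longSum(total, "32");      // 42
            // count ignores the incoming value and just increments the accumulator
            Long hits = MetricFunctions.count(null);           // 1
            hits = MetricFunctions.count(hits);                // 2
            System.out.println(total + " / " + hits);
        }
    }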
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
new file mode 100644
index 0000000..de507ad
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
@@ -0,0 +1,17 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/27 15:01
+ */
+public class FilterNullFunction implements FilterFunction<String> {
+    @Override
+    public boolean filter(String message) {
+        return StringUtil.isNotBlank(message);
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
new file mode 100644
index 0000000..0b00b3c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
@@ -0,0 +1,19 @@
+package com.zdjizhi.utils.functions;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/21 12:13
+ */
+public class KeyByFunction implements KeySelector<Tuple3<String, String, String>, String> {
+
+    @Override
+    public String getKey(Tuple3<String, String, String> value) throws Exception {
+        // group by the concatenated dimensions key
+        return value.f1;
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java
new file mode 100644
index 0000000..41a6109
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java
@@ -0,0 +1,122 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.common.StreamAggregateConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.general.ParseFunctions;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/27 15:01
+ */
+public class MapParseFunction implements MapFunction<String, Tuple3<String, String, String>> {
+    private static final Logger logger = LoggerFactory.getLogger(MapParseFunction.class);
+
+    private static ArrayList<String[]> jobList = JsonParseUtil.getTransformsList();
+
+    private static HashMap<String, String> dimensionsMap = JsonParseUtil.getDimensionsMap();
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Tuple3<String, String, String> map(String message) {
+        try {
+            if (StringUtil.isNotBlank(message)) {
+                Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+                Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object);
+                if (ParseFunctions.filterLogs(object)) {
+                    for (String[] strings : jobList) {
+                        // function name
+                        String function = strings[0];
+                        // key of the field to fill in
+                        String resultKeyName = strings[1];
+                        // raw-log field key
+                        String logsKeyName = strings[2];
+                        // value of the raw-log field
+                        Object logsKeyValue = JsonParseUtil.getValue(object, strings[2]);
+                        // extra parameter value
+                        String parameters = strings[3];
+
+                        switch (function) {
+                            case "dismantling":
+                                if (StringUtil.isNotBlank(parameters)) {
+                                    if (logsKeyValue != null) {
+                                        JsonParseUtil.setValue(object, logsKeyName, dismantlingUtils(parameters, logsKeyValue));
+                                    }
+                                }
+                                break;
+                            case "combination":
+                                if (StringUtil.isNotBlank(parameters)) {
+                                    if (logsKeyValue != null) {
+                                        combinationUtils(dimensionsObj, object, parameters, resultKeyName, logsKeyName);
+                                    }
+                                }
+                                break;
+                            case "hierarchy":
+                                return new Tuple3<>(JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object));
+                            default:
+                                break;
+                        }
+                    }
+                }
+            }
+        } catch (RuntimeException e) {
+            logger.error("Map Parse error,message:" + e);
+            return new Tuple3<>("", "", "");
+        }
+        // no hierarchy transform matched or the record was filtered out: emit an
+        // empty tuple, which the window function skips
+        return new Tuple3<>("", "", "");
+    }
+
+    /**
+     * dismantling: ID replacement. Splits the raw value and keeps the part at the
+     * configured index, e.g. resolving the concrete name for the current AppId
+     * from the cached AppId mapping.
+     *
+     * @param parameters parameter list
+     * @param fieldName  raw-log column value
+     */
+    private static String dismantlingUtils(String parameters, Object fieldName) {
+        String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
+        int digits = Integer.parseInt(alignmentPars[0]);
+        return fieldName.toString().split(StreamAggregateConfig.PROTOCOL_SPLITTER)[digits];
+    }
+
+    /**
+     * combination: concatenation. The parameters field has the form
+     * "parameters": "abc,/" where abc is the field to append and / the separator.
+     *
+     * @param dimensions    result dimension set
+     * @param message       raw log
+     * @param parameters    parameter list
+     * @param resultKeyName dimension key to fill
+     * @param fieldName     raw-log column name
+     */
+    private static void combinationUtils(Map<String, Object> dimensions, Map<String, Object> message, String parameters, String resultKeyName, String fieldName) {
+        String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
+        Object combinationField = JsonParseUtil.getValue(message, combinationPars[0]);
+        if (combinationField != null) {
+            String separator = combinationPars[1];
+            Object fieldNameValue = JsonParseUtil.getValue(message, fieldName);
+            if (fieldNameValue != null) {
+                String combinationValue = fieldNameValue + separator + combinationField;
+                dimensions.put(resultKeyName, combinationValue);
+                JsonParseUtil.setValue(message, fieldName, combinationValue);
+            } else {
+                dimensions.put(resultKeyName, combinationField);
+                JsonParseUtil.setValue(message, fieldName, combinationField);
+
+            }
+        }
+    }
+
+}
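Concretely, a dismantling transform with parameters "1" keeps the second dot-separated component of a field, and a combination transform with parameters "server_port,:" appends that field behind a separator. A self-contained illustration; the field names are hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    public class TransformExamples {
        public static void main(String[] args) {
            // dismantling, parameters "1": keep index 1 of the dot-split value
            String dismantled = "eth.ip.tcp".split("\\.")[1];          // "ip"

            // combination, parameters "server_port,:" applied to field server_ip
            Map<String, Object> log = new HashMap<>();
            log.put("server_ip", "10.0.0.1");
            log.put("server_port", 443);
            String combined = log.get("server_ip") + ":" + log.get("server_port");

            System.out.println(dismantled + " | " + combined);         // ip | 10.0.0.1:443
        }
    }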
SerializableTimestampAssigner<String> {
+    @Override
+    @SuppressWarnings("unchecked")
+    public long extractTimestamp(String element, long recordTimestamp) {
+        Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(element, Map.class);
+
+        return JsonParseUtil.getLong(object, "common_end_time");
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
new file mode 100644
index 0000000..d458984
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
@@ -0,0 +1,43 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.common.StreamAggregateConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/21 14:52
+ */
+public class ResultFlatMapFunction implements FlatMapFunction<String, String> {
+    private static String[] jobList = JsonParseUtil.getHierarchy();
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void flatMap(String value, Collector<String> out) throws Exception {
+        StringBuilder stringBuilder = new StringBuilder();
+        String name = jobList[0];
+        Map<String, Object> jsonObject = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class);
+        String protocol = JsonParseUtil.getString(jsonObject, name);
+        if (StringUtil.isNotBlank(protocol)) {
+            String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER);
+            // emit one record per protocol prefix: "eth", then "eth.ip", then "eth.ip.tcp", ...
+            for (String proto : protocolIds) {
+                if (stringBuilder.length() == 0) {
+                    stringBuilder.append(proto);
+                } else {
+                    stringBuilder.append(jobList[1]).append(proto);
+                }
+                jsonObject.put(name, stringBuilder.toString());
+                out.collect(JsonMapper.toJsonString(jsonObject));
+            }
+        }
+    }
+}
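Each aggregated record is thus fanned out once per protocol layer, so every prefix gets its own row downstream. A standalone sketch of the expansion, assuming the separator parameter from the schema is ".":

    public class HierarchyExpansionExample {
        public static void main(String[] args) {
            String protocol = "eth.ip.tcp";
            StringBuilder prefix = new StringBuilder();
            for (String part : protocol.split("\\.")) {
                if (prefix.length() > 0) {
                    prefix.append('.');
                }
                prefix.append(part);
                System.out.println(prefix);   // eth, then eth.ip, then eth.ip.tcp
            }
        }
    }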
@ClassName ParseFunctions
+ * @Author lixkvip@126.com
+ * @Date 2020/6/23 14:04
+ * @Version V1.0
+ **/
+public class ParseFunctions {
+    /**
+     * Filter conditions declared by the schema
+     */
+    private static HashMap<String, String> filtersMap = JsonParseUtil.getFiltersMap();
+
+
+    /**
+     * Resolves the dimensions field set.
+     *
+     * @param dimensions dimension name -> raw-log field mapping
+     * @param message    raw log
+     * @return resolved dimension set
+     */
+    public static Map<String, Object> transDimensions(Map<String, String> dimensions, Map<String, Object> message) {
+        HashMap<String, Object> dimensionsObj = new HashMap<>(16);
+
+        for (String dimension : dimensions.keySet()) {
+            dimensionsObj.put(dimension, JsonParseUtil.getValue(message, dimensions.get(dimension)));
+        }
+
+        return dimensionsObj;
+    }
+
+    /**
+     * Filters logs according to the filter functions the schema specifies.
+     *
+     * @param object raw log
+     * @return true or false
+     */
+    public static boolean filterLogs(Map<String, Object> object) {
+        boolean available = false;
+
+        for (String key : filtersMap.keySet()) {
+            switch (key) {
+                case "notempty":
+                    Object value = JsonParseUtil.getValue(object, filtersMap.get(key));
+                    if (value != null && StringUtil.isNotBlank(value.toString())) {
+                        available = true;
+                    }
+                    break;
+                default:
+            }
+        }
+        return available;
+    }
+
+//    /**
+//     * Refreshes the cached app-id mapping.
+//     *
+//     * @param hashMap current cached mapping
+//     */
+//    public static void updateAppRelation(HashMap<Integer, String> hashMap) {
+//        try {
+//            Long begin = System.currentTimeMillis();
+//            String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP);
+//            if (StringUtil.isNotBlank(schema)) {
+//                String data = JSONObject.parseObject(schema).getString("data");
+//                JSONArray objects = JSONArray.parseArray(data);
+//                for (Object object : objects) {
+//                    JSONArray jsonArray = JSONArray.parseArray(object.toString());
+//                    int key = jsonArray.getInteger(0);
+//                    String value = jsonArray.getString(1);
+//                    if (hashMap.containsKey(key)) {
+//                        if (!value.equals(hashMap.get(key))) {
+//                            hashMap.put(key, value);
+//                        }
+//                    } else {
+//                        hashMap.put(key, value);
+//                    }
+//                }
+//                logger.warn("refreshing the cached app mapping took: " + (begin - System.currentTimeMillis()));
+//                logger.warn("refreshed cached APP mapping; fetched entries: [" + objects.size());
+//            }
+//        } catch (RuntimeException e) {
+//            logger.error("failed to refresh the cached APP-ID mapping: " + e);
+//        }
+//    }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
new file mode 100644
index 0000000..1adb1d1
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
@@ -0,0 +1,77 @@
+package com.zdjizhi.utils.http;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * Utility class for fetching the gateway schema.
+ *
+ * @author qidaijie
+ */
+public class HttpClientUtil {
+    private static final Log logger = LogFactory.get();
+
+    /**
+     * Requests the schema from the gateway.
+     *
+     * @param http gateway url
+     * @return schema
+     */
+    public static String requestByGetMethod(String http) {
+        CloseableHttpClient httpClient = HttpClients.createDefault();
+        StringBuilder entityStringBuilder;
+
+        HttpGet get = new HttpGet(http);
+        BufferedReader bufferedReader = null;
+        CloseableHttpResponse httpResponse = null;
+        try {
+            httpResponse = httpClient.execute(get);
+            HttpEntity entity = httpResponse.getEntity();
+            entityStringBuilder = new StringBuilder();
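+            // Read the body only up to the first '\n': the schema endpoint is
+            // expected to answer with single-line JSON, so the first line is the
+            // whole payload.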
if (null != entity) { + bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); + int intC; + while ((intC = bufferedReader.read()) != -1) { + char c = (char) intC; + if (c == '\n') { + break; + } + entityStringBuilder.append(c); + } + + return entityStringBuilder.toString(); + } + } catch (IOException e) { + logger.error("Get Schema from Query engine ERROR! Exception message is:" + e); + } finally { + if (httpClient != null) { + try { + httpClient.close(); + } catch (IOException e) { + logger.error("Close HTTP Client ERROR! Exception messgae is:" + e); + } + } + if (httpResponse != null) { + try { + httpResponse.close(); + } catch (IOException e) { + logger.error("Close httpResponse ERROR! Exception messgae is:" + e); + } + } + if (bufferedReader != null) { + IOUtils.closeQuietly(bufferedReader); + } + } + return ""; + } +} diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java new file mode 100644 index 0000000..4a6a01c --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -0,0 +1,357 @@ +package com.zdjizhi.utils.json; + + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.StreamAggregateConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.http.HttpClientUtil; +import net.sf.cglib.beans.BeanGenerator; +import net.sf.cglib.beans.BeanMap; + +import java.util.*; + +/** + * 浣跨敤FastJson瑙f瀽json鐨勫伐鍏风被 + * + * @author qidaijie + */ +public class JsonParseUtil { + + private static final Log logger = LogFactory.get(); + + /** + * 妯″紡鍖归厤锛岀粰瀹氫竴涓被鍨嬪瓧绗︿覆杩斿洖涓涓被绫诲瀷 + * + * @param type 绫诲瀷 + * @return 绫荤被鍨 + */ + + public static Class getClassName(String type) { + Class clazz; + + switch (type) { + case "int": + clazz = Integer.class; + break; + case "string": + clazz = String.class; + break; + case "long": + clazz = long.class; + break; + case "array": + clazz = List.class; + break; + case "double": + clazz = double.class; + break; + case "float": + clazz = float.class; + break; + case "char": + clazz = char.class; + break; + case "byte": + clazz = byte.class; + break; + case "boolean": + clazz = boolean.class; + break; + case "short": + clazz = short.class; + break; + default: + clazz = String.class; + } + return clazz; + } + + + /** + * 鑾峰彇灞炴у肩殑鏂规硶 + * + * @param obj 瀵硅薄 + * @param property key + * @return 灞炴х殑鍊 + */ + public static Object getValue(Object obj, String property) { + + try { + BeanMap beanMap = BeanMap.create(obj); + if (beanMap.containsKey(property)) { + return beanMap.get(property); + } else { + return null; + } + } catch (RuntimeException e) { + logger.error("鑾峰彇json-value寮傚父锛屽紓甯竗ey锛" + property + "寮傚父淇℃伅涓猴細" + e); + return null; + } + } + + /** + * 鑾峰彇灞炴у肩殑鏂规硶 + * + * @param jsonMap 鍘熷鏃ュ織 + * @param property key + * @return 灞炴х殑鍊 + */ + public static Object getValue(Map jsonMap, String property) { + try { + return jsonMap.getOrDefault(property, null); + } catch (RuntimeException e) { + logger.error("鑾峰彇json-value寮傚父锛屽紓甯竗ey锛" + property + "寮傚父淇℃伅涓猴細" + e); + return null; + } + } + + /** + * long 绫诲瀷妫楠岃浆鎹㈡柟娉,鑻ヤ负绌鸿繑鍥炲熀纭鍊 + * + * @return Long value + */ + public static Long getLong(Map jsonMap, String property) { + Object value = jsonMap.getOrDefault(property, null); + Long longVal = TypeUtils.castToLong(value); + + if (longVal == null) { + return 0L; + } + + return longVal; + } + + public 
static String getString(Map jsonMap, String property) { + Object value = jsonMap.getOrDefault(property, null); + if (value == null) { + return null; + } + + if (value instanceof Map) { + return JsonMapper.toJsonString(value); + } + + if (value instanceof List) { + return JsonMapper.toJsonString(value); + } + + return value.toString(); + } + + /** + * 鏇存柊灞炴у肩殑鏂规硶 + * + * @param jsonMap 鍘熷鏃ュ織json map + * @param property 鏇存柊鐨刱ey + * @param value 鏇存柊鐨勫 + */ + public static void setValue(Map jsonMap, String property, Object value) { + try { + jsonMap.put(property, value); + } catch (RuntimeException e) { + logger.error("璧嬩簣瀹炰綋绫婚敊璇被鍨嬫暟鎹", e); + } + } + + /** + * 鏇存柊灞炴у肩殑鏂规硶 + * + * @param obj 瀵硅薄 + * @param property 鏇存柊鐨刱ey + * @param value 鏇存柊鐨勫 + */ + public static void setValue(Object obj, String property, Object value) { + try { + BeanMap beanMap = BeanMap.create(obj); + beanMap.put(property, value); + } catch (ClassCastException e) { + logger.error("璧嬩簣瀹炰綋绫婚敊璇被鍨嬫暟鎹", e); + } + } + + /** + * 鏍规嵁鍙嶅皠鐢熸垚瀵硅薄鐨勬柟娉 + * + * @param properties 鍙嶅皠绫荤敤鐨刴ap + * @return 鐢熸垚鐨凮bject绫诲瀷鐨勫璞 + */ + public static Object generateObject(Map properties) { + BeanGenerator generator = new BeanGenerator(); + Set keySet = properties.keySet(); + for (Object aKeySet : keySet) { + String key = (String) aKeySet; + generator.addProperty(key, (Class) properties.get(key)); + } + return generator.create(); + } + + /** + * 閫氳繃鑾峰彇String绫诲瀷鐨勭綉鍏硈chema閾炬帴鏉ヨ幏鍙杕ap锛岀敤浜庣敓鎴愪竴涓狾bject绫诲瀷鐨勫璞 + * + * @return 鐢ㄤ簬鍙嶅皠鐢熸垚schema绫诲瀷鐨勫璞$殑涓涓猰ap闆嗗悎 + */ + public static HashMap getActionMap() { + HashMap map = new HashMap<>(16); + + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + DocumentContext parse = JsonPath.parse(schema); + + List actions = parse.read("$.data.doc.action[*]"); + + for (Object action : actions) { + map.put(JsonPath.read(action, "$.label"), + JsonPath.read(action, "$.metrics").toString().split(StreamAggregateConfig.FORMAT_SPLITTER)); +// System.out.println(JsonPath.read(action, "$.label")+JsonPath.read(action, "$.metrics").toString()); + } + + return map; + } + + /** + * 閫氳繃鑾峰彇String绫诲瀷鐨勭綉鍏硈chema閾炬帴鏉ヨ幏鍙杕ap锛岀敤浜庣敓鎴愪竴涓狾bject绫诲瀷鐨勫璞 + * + * @return 鐢ㄤ簬鍙嶅皠鐢熸垚schema绫诲瀷鐨勫璞$殑涓涓猰ap闆嗗悎 + */ + public static HashMap getMetricsMap() { + HashMap map = new HashMap<>(16); + + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + DocumentContext parse = JsonPath.parse(schema); + + List metrics = parse.read("$.data.doc.metrics[*]"); + + for (Object metric : metrics) { + map.put(JsonPath.read(metric, "$.name"), + new String[]{JsonPath.read(metric, "$.function"), JsonPath.read(metric, "$.fieldName")} + ); + } + return map; + } + + /** + * 閫氳繃鑾峰彇String绫诲瀷鐨勭綉鍏硈chema閾炬帴鏉ヨ幏鍙杕ap锛岀敤浜庣敓鎴愪竴涓狾bject绫诲瀷鐨勫璞 + * + * @return 鐢ㄤ簬鍙嶅皠鐢熸垚schema绫诲瀷鐨勫璞$殑涓涓猰ap闆嗗悎 + */ + public static String getTimeKey() { + + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + + return JsonPath.read(schema, "$.data.doc.timestamp.name"); + } + + + /** + * 閫氳繃鑾峰彇String绫诲瀷鐨勭綉鍏硈chema閾炬帴鏉ヨ幏鍙杕ap锛岀敤浜庣敓鎴愪竴涓狾bject绫诲瀷鐨勫璞 + * + * @return 鐢ㄤ簬鍙嶅皠鐢熸垚schema绫诲瀷鐨勫璞$殑涓涓猰ap闆嗗悎 + */ + public static HashMap getResultLogMap() { + HashMap map = new HashMap<>(16); + + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + DocumentContext parse = JsonPath.parse(schema); + + List dimensions = parse.read("$.data.doc.dimensions[*]"); + + for (Object dimension : dimensions) { + map.put(JsonPath.read(dimension, "$.name"), + 
JsonParseUtil.getClassName(JsonPath.read(dimension, "$.type"))); + } + + List metrics = parse.read("$.data.doc.metrics[*]"); + for (Object metric : metrics) { + map.put(JsonPath.read(metric, "$.name"), + JsonParseUtil.getClassName(JsonPath.read(metric, "$.type"))); + } + + return map; + } + + /** + * 閫氳繃鑾峰彇String绫诲瀷鐨勭綉鍏硈chema閾炬帴鏉ヨ幏鍙杕ap锛岀敤浜庣敓鎴愪竴涓狾bject绫诲瀷鐨勫璞 + * + * @return 鐢ㄤ簬鍙嶅皠鐢熸垚schema绫诲瀷鐨勫璞$殑涓涓猰ap闆嗗悎 + */ + public static HashMap getDimensionsMap() { + HashMap map = new HashMap<>(16); + + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + DocumentContext parse = JsonPath.parse(schema); + + List dimensions = parse.read("$.data.doc.dimensions[*]"); + + for (Object dimension : dimensions) { + map.put(JsonPath.read(dimension, "$.name"), + JsonPath.read(dimension, "$.fieldName")); + } + + return map; + } + + + /** + * 閫氳繃鑾峰彇String绫诲瀷鐨勭綉鍏硈chema閾炬帴鏉ヨ幏鍙杕ap锛岀敤浜庣敓鎴愪竴涓狾bject绫诲瀷鐨勫璞 + * + * @return 鐢ㄤ簬鍙嶅皠鐢熸垚schema绫诲瀷鐨勫璞$殑涓涓猰ap闆嗗悎 + */ + public static HashMap getFiltersMap() { + HashMap map = new HashMap<>(16); + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + DocumentContext parse = JsonPath.parse(schema); + + List filters = parse.read("$.data.doc.filters[*]"); + for (Object filter : filters) { + map.put(JsonPath.read(filter, "$.type"), JsonPath.read(filter, "$.fieldName")); + } + + return map; + } + + + /** + * 鏍规嵁http閾炬帴鑾峰彇schema锛岃В鏋愪箣鍚庤繑鍥炰竴涓换鍔″垪琛 (useList toList funcList paramlist) + * + * @return 浠诲姟鍒楄〃 + */ + public static ArrayList getTransformsList() { + ArrayList list = new ArrayList<>(); + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + DocumentContext parse = JsonPath.parse(schema); + + List transforms = parse.read("$.data.doc.transforms[*]"); + for (Object transform : transforms) { + String function = JsonPath.read(transform, "$.function").toString(); + String name = JsonPath.read(transform, "$.name").toString(); + String fieldName = JsonPath.read(transform, "$.fieldName").toString(); + String parameters = JsonPath.read(transform, "$.parameters").toString(); + list.add(new String[]{function, name, fieldName, parameters}); + } + + return list; + } + + /** + * 鏍规嵁http閾炬帴鑾峰彇schema锛岃В鏋愪箣鍚庤繑鍥炰竴涓换鍔″垪琛 (useList toList funcList paramlist) + * + * @return 浠诲姟鍒楄〃 + */ + public static String[] getHierarchy() { + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + DocumentContext parse = JsonPath.parse(schema); + List transforms = parse.read("$.data.doc.transforms[*]"); + for (Object transform : transforms) { + String function = JsonPath.read(transform, "$.function").toString(); + if ("hierarchy".equals(function)) { + String name = JsonPath.read(transform, "$.name").toString(); + String parameters = JsonPath.read(transform, "$.parameters").toString(); + return new String[]{name, parameters}; + } + } + return null; + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java new file mode 100644 index 0000000..034f76a --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java @@ -0,0 +1,142 @@ +package com.zdjizhi.utils.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.exception.AnalysisException; + +import java.util.List; +import java.util.Map; + +/** + * @author qidaijie + * @Package PACKAGE_NAME + * @Description: + * @date 2021/7/1217:34 + */ 
+public class JsonTypeUtils {
+    private static final Log logger = LogFactory.get();
+
+    /**
+     * String type check-and-convert method.
+     *
+     * @param value json value
+     * @return String value
+     */
+    public static String checkString(Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Map) {
+            return JsonMapper.toJsonString(value);
+        }
+
+        if (value instanceof List) {
+            return JsonMapper.toJsonString(value);
+        }
+
+        return value.toString();
+    }
+
+    /**
+     * Map type check-and-convert method.
+     *
+     * @param value json value
+     * @return Map value
+     */
+    @SuppressWarnings("unchecked")
+    private static Map<String, Object> checkObject(Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Map) {
+            return (Map<String, Object>) value;
+        }
+
+        throw new AnalysisException("can not cast to map, value : " + value);
+    }
+
+    /**
+     * Array type check-and-convert method.
+     *
+     * @param value json value
+     * @return List value
+     */
+    @SuppressWarnings("unchecked")
+    private static List<Object> checkArray(Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof List) {
+            return (List<Object>) value;
+        }
+
+        throw new AnalysisException("can not cast to List, value : " + value);
+    }
+
+    private static Long checkLong(Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        return TypeUtils.castToLong(value);
+    }
+
+    /**
+     * long type check-and-convert method; returns the base value 0 if null.
+     *
+     * @param value json value
+     * @return long value
+     */
+    public static long checkLongValue(Object value) {
+
+        Long longVal = TypeUtils.castToLong(value);
+
+        if (longVal == null) {
+            return 0L;
+        }
+
+        return longVal;
+    }
+
+    /**
+     * Double type check-and-convert method.
+     *
+     * @param value json value
+     * @return Double value
+     */
+    private static Double checkDouble(Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        return TypeUtils.castToDouble(value);
+    }
+
+
+    private static Integer checkInt(Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        return TypeUtils.castToInt(value);
+    }
+
+
+    /**
+     * int type check-and-convert method; returns the base value 0 if null.
+     *
+     * @param value json value
+     * @return int value
+     */
+    private static int getIntValue(Object value) {
+
+        Integer intVal = TypeUtils.castToInt(value);
+
+        if (intVal == null) {
+            return 0;
+        }
+        return intVal;
+    }
+
+}
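The coercion rules live in TypeUtils, which follows; the public entry points above are what the metric helpers call. A quick illustration of the null handling and the first-element rule for comma-joined values (class name illustrative):

    import com.zdjizhi.utils.json.JsonTypeUtils;

    public class TypeCoercionExample {
        public static void main(String[] args) {
            System.out.println(JsonTypeUtils.checkLongValue(null));      // 0  (null-safe default)
            System.out.println(JsonTypeUtils.checkLongValue("42"));      // 42 (numeric strings parsed)
            System.out.println(JsonTypeUtils.checkLongValue("10,20"));   // 10 (first element kept)
        }
    }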
diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
new file mode 100644
index 0000000..01e8540
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
@@ -0,0 +1,180 @@
+package com.zdjizhi.utils.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.StreamAggregateConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.exception.AnalysisException;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.json
+ * @Description:
+ * @date 2021/7/12 18:20
+ */
+public class TypeUtils {
+    private static final Log logger = LogFactory.get();
+
+    /**
+     * Coerces a json value to a comparable type for if-style checks.
+     *
+     * @param value json value
+     * @return String/Integer/Long/Boolean-derived value or null
+     */
+    public static Object castToIfFunction(Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return value.toString();
+        }
+
+        if (value instanceof Integer) {
+            return ((Number) value).intValue();
+        }
+
+        if (value instanceof Long) {
+            return ((Number) value).longValue();
+        }
+
+        if (value instanceof Boolean) {
+            return (Boolean) value ? 1 : 0;
+        }
+
+        throw new AnalysisException("can not cast to int, value : " + value);
+    }
+
+    /**
+     * Integer type check-and-convert method.
+     *
+     * @param value json value
+     * @return Integer value or null
+     */
+    static Integer castToInt(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Integer) {
+            return (Integer) value;
+        }
+
+        // the Number branch is disabled: out-of-range values would be silently
+        // truncated to the target type instead of throwing
+//        if (value instanceof Number) {
+//            return ((Number) value).intValue();
+//        }
+
+        if (value instanceof String) {
+            String strVal = (String) value;
+            if (StringUtil.isBlank(strVal)) {
+                return null;
+            }
+
+            // reduce values such as "10,20" to 10
+            if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) {
+                strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0];
+            }
+
+            try {
+                return Integer.parseInt(strVal);
+            } catch (NumberFormatException ex) {
+                logger.error("String change Integer Error,The error Str is:" + strVal);
+            }
+        }
+
+        if (value instanceof Boolean) {
+            return (Boolean) value ? 1 : 0;
+        }
+
+        throw new AnalysisException("can not cast to int, value : " + value);
+    }
+
+    /**
+     * Double type check-and-convert method.
+     *
+     * @param value json value
+     * @return double value or null
+     */
+    static Double castToDouble(Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Double) {
+            return (Double) value;
+        }
+
+        // the Number branch is disabled: out-of-range values would be silently
+        // truncated to the target type instead of throwing
+//        if (value instanceof Number) {
+//            return ((Number) value).doubleValue();
+//        }
+
+        if (value instanceof String) {
+            String strVal = (String) value;
+
+            if (StringUtil.isBlank(strVal)) {
+                return null;
+            }
+
+            // reduce values such as "10,20" to 10
+            if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) {
+                strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0];
+            }
+
+            try {
+                return Double.parseDouble(strVal);
+            } catch (NumberFormatException ex) {
+                logger.error("String change Double Error,The error Str is:" + strVal);
+            }
+        }
+
+        throw new AnalysisException("can not cast to double, value : " + value);
+    }
+
+    /**
+     * Long type check-and-convert method.
+     *
+     * @param value json value
+     * @return (Long)value or null
+     */
+    static Long castToLong(Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        // out-of-range values are truncated to long rather than throwing
+        if (value instanceof Number) {
+            return ((Number) value).longValue();
+        }
+
+        if (value instanceof String) {
+            String strVal = (String) value;
+
+            if (StringUtil.isBlank(strVal)) {
+                return null;
+            }
+
+            // reduce values such as "10,20" to 10
+            if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) {
+                strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0];
+            }
+
+            try {
+                return Long.parseLong(strVal);
+            } catch (NumberFormatException ex) {
+                logger.error("String change Long Error,The error Str is:" + strVal);
+            }
+        }
+
+        throw new AnalysisException("can not cast to long, value : " + value);
+    }
+
+}
properties.put("ssl.keystore.location", StreamAggregateConfig.TOOLS_LIBRARY + "keystore.jks"); + properties.put("ssl.keystore.password", StreamAggregateConfig.KAFKA_PIN); + properties.put("ssl.truststore.location", StreamAggregateConfig.TOOLS_LIBRARY + "truststore.jks"); + properties.put("ssl.truststore.password", StreamAggregateConfig.KAFKA_PIN); + properties.put("ssl.key.password", StreamAggregateConfig.KAFKA_PIN); + break; + case "SASL": + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + StreamAggregateConfig.KAFKA_USER + " password=" + StreamAggregateConfig.KAFKA_PIN + ";"); + break; + default: + } + + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java new file mode 100644 index 0000000..a24ab4e --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java @@ -0,0 +1,42 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.StreamAggregateConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.SslConfigs; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/813:54 + */ +public class Consumer { + private static Properties createConsumerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", StreamAggregateConfig.INPUT_KAFKA_SERVERS); + properties.put("group.id", StreamAggregateConfig.GROUP_ID); + properties.put("session.timeout.ms", "60000"); + properties.put("max.poll.records", 3000); + properties.put("max.partition.fetch.bytes", 31457280); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + CertUtils.chooseCert(StreamAggregateConfig.KAFKA_SOURCE_PROTOCOL,properties); + + return properties; + } + + public static FlinkKafkaConsumer getKafkaConsumer() { + FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(StreamAggregateConfig.INPUT_KAFKA_TOPIC, + new SimpleStringSchema(), createConsumerConfig()); + + kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + kafkaConsumer.setStartFromGroupOffsets(); + + return kafkaConsumer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java new file mode 100644 index 0000000..65330b5 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java @@ -0,0 +1,53 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.StreamAggregateConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SslConfigs; + +import java.util.Optional; +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/814:04 + */ +public class Producer { + + private static Properties createProducerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", 
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
new file mode 100644
index 0000000..65330b5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
@@ -0,0 +1,53 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.StreamAggregateConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/6/8 14:04
+ */
+public class Producer {
+
+    private static Properties createProducerConfig() {
+        Properties properties = new Properties();
+        properties.put("bootstrap.servers", StreamAggregateConfig.OUTPUT_KAFKA_SERVERS);
+        properties.put("acks", StreamAggregateConfig.PRODUCER_ACK);
+        properties.put("retries", StreamAggregateConfig.RETRIES);
+        properties.put("linger.ms", StreamAggregateConfig.LINGER_MS);
+        properties.put("request.timeout.ms", StreamAggregateConfig.REQUEST_TIMEOUT_MS);
+        properties.put("batch.size", StreamAggregateConfig.BATCH_SIZE);
+        properties.put("buffer.memory", StreamAggregateConfig.BUFFER_MEMORY);
+        properties.put("max.request.size", StreamAggregateConfig.MAX_REQUEST_SIZE);
+        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, StreamAggregateConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+
+        CertUtils.chooseCert(StreamAggregateConfig.KAFKA_SINK_PROTOCOL, properties);
+
+        return properties;
+    }
+
+
+    public static FlinkKafkaProducer<String> getKafkaProducer() {
+        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
+                StreamAggregateConfig.OUTPUT_KAFKA_TOPIC,
+                new SimpleStringSchema(),
+                createProducerConfig(), Optional.empty());
+
+        // when set to true the producer only logs write failures instead of
+        // catching and re-throwing them
+        kafkaProducer.setLogFailuresOnly(false);
+
+        // attach a timestamp to every message written to Kafka
+//        kafkaProducer.setWriteTimestampToKafka(true);
+
+
+        return kafkaProducer;
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/system/StreamAggregateConfigurations.java b/src/main/java/com/zdjizhi/utils/system/StreamAggregateConfigurations.java
new file mode 100644
index 0000000..3ea18a5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/system/StreamAggregateConfigurations.java
@@ -0,0 +1,70 @@
+package com.zdjizhi.utils.system;
+
+import com.zdjizhi.utils.StringUtil;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Properties;
+
+
+/**
+ * @author Administrator
+ */
+
+public final class StreamAggregateConfigurations {
+
+    private static Properties propKafka = new Properties();
+    private static Properties propService = new Properties();
+
+
+    public static String getStringProperty(Integer type, String key) {
+        if (type == 0) {
+            return propService.getProperty(key);
+        } else if (type == 1) {
+            return propKafka.getProperty(key);
+        } else {
+            return null;
+        }
+
+    }
+
+    public static Integer getIntProperty(Integer type, String key) {
+        if (type == 0) {
+            return Integer.parseInt(propService.getProperty(key));
+        } else if (type == 1) {
+            return Integer.parseInt(propKafka.getProperty(key));
+        } else {
+            return null;
+        }
+    }
+
+    public static Long getLongProperty(Integer type, String key) {
+        if (type == 0) {
+            return Long.parseLong(propService.getProperty(key));
+        } else if (type == 1) {
+            return Long.parseLong(propKafka.getProperty(key));
+        } else {
+            return null;
+        }
+    }
+
+    public static Boolean getBooleanProperty(Integer type, String key) {
+        if (type == 0) {
+            return StringUtil.equals(propService.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
+        } else if (type == 1) {
+            return StringUtil.equals(propKafka.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
+        } else {
+            return null;
+        }
+    }
+
+    static {
+        try {
+            propService.load(StreamAggregateConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
+            propKafka.load(StreamAggregateConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
+        } catch (IOException | RuntimeException e) {
+            propKafka = null;
+            propService = null;
+        }
+    }
+}
diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties
new file mode 100644
index 0000000..9d91936
--- /dev/null
+++ b/src/main/java/log4j.properties
@@ -0,0 +1,25 @@
+#Log4j
+log4j.rootLogger=info,console,file
+# Console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# File appender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+#Use a relative path so test output lands under the application directory
+log4j.appender.file.file=${nis.root}/log/galaxy-name.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
+#MyBatis: com.nis.web.dao is the package holding the MyBatis mapper interfaces
+log4j.logger.com.nis.web.dao=debug
+#bonecp data source
+log4j.category.com.jolbox=debug,console
+
+
diff --git a/src/test/java/com/zdjizhi/FunctionsTest.java b/src/test/java/com/zdjizhi/FunctionsTest.java
new file mode 100644
index 0000000..6e3a20b
--- /dev/null
+++ b/src/test/java/com/zdjizhi/FunctionsTest.java
@@ -0,0 +1,33 @@
+package com.zdjizhi;
+
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2021/9/17 14:22
+ */
+public class FunctionsTest {
+    private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
+
+    @Test
+    public void actionTest() {
+        HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
+        String[] metricNames = actionMap.getOrDefault("", actionMap.get("Default"));
+        System.out.println(actionMap.toString());
+        System.out.println(Arrays.toString(metricNames));
+    }
+
+}
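The test above needs a live schema endpoint, since JsonParseUtil fetches the schema in static initializers. A sketch of an endpoint-free unit test for the pure aggregation helpers, using the JUnit 4 dependency already declared in the pom; the class name is illustrative:

    import static org.junit.Assert.assertEquals;

    import com.zdjizhi.utils.general.MetricFunctions;
    import org.junit.Test;

    public class MetricFunctionsTest {

        @Test
        public void sumTreatsNullAsZero() {
            assertEquals(Long.valueOf(10L), MetricFunctions.longSum(null, 10L));
            assertEquals(Long.valueOf(42L), MetricFunctions.longSum(10L, 32L));
        }

        @Test
        public void countIncrements() {
            assertEquals(Long.valueOf(1L), MetricFunctions.count(null));
            assertEquals(Long.valueOf(3L), MetricFunctions.count(2L));
        }
    }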