diff --git a/pom.xml b/pom.xml index 3cae362..3b7fdc1 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi log-olap-analysis-schema - 220316-encryption + 220323-nacos log-olap-analysis-schema http://www.example.com @@ -38,6 +38,8 @@ 2.7.1 1.0.0 2.2.3 + 1.2.0 + 1.0.8 provided @@ -116,7 +118,7 @@ com.zdjizhi galaxy - 1.0.7 + ${zdjz.tools.version} slf4j-log4j12 @@ -183,30 +185,32 @@ compile - - org.apache.httpcomponents - httpclient - 4.5.2 - - com.jayway.jsonpath json-path 2.4.0 - - io.prometheus - simpleclient_pushgateway - 0.9.0 - - cn.hutool hutool-all 5.5.2 + + + org.jasypt + jasypt + 1.9.3 + + + + + com.alibaba.nacos + nacos-client + ${nacos.version} + + org.slf4j slf4j-api @@ -226,13 +230,6 @@ test - - - org.jasypt - jasypt - 1.9.3 - - diff --git a/properties/default_config.properties b/properties/default_config.properties index 9c083ad..1637bb8 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -7,6 +7,7 @@ max.poll.records=3000 #kafka source poll bytes max.partition.fetch.bytes=31457280 + #====================Kafka KafkaProducer====================# #producer重试的次数设置 retries=0 @@ -27,19 +28,34 @@ buffer.memory=134217728 #这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576 #10M max.request.size=10485760 + +#生产者压缩模式 none or snappy +producer.kafka.compression.type=none + +#生产者ack +producer.ack=1 + #====================kafka default====================# #kafka SASL验证用户名-加密 kafka.user=nsyGpHKGFA4KW0zro9MDdw== #kafka SASL及SSL验证密码-加密 kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ + +#====================nacos default====================# +#nacos username +nacos.username=nacos + +#nacos password +nacos.pin=nacos + +#nacos group +nacos.group=Galaxy + #====================Topology Default====================# #两个输出之间的最大时间(单位milliseconds) buffer.timeout=100 #第一次随机分组random范围 -random.range.num=40 - -#app_id 更新时间,如填写0则不更新缓存 -app.tick.tuple.freq.secs=0 \ No newline at end of file +random.range.num=40 \ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index c933cba..9ab7852 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,57 +1,56 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -source.kafka.servers=192.168.44.12:9094 +source.kafka.servers=192.168.40.223:9094,192.168.40.151:9094,192.168.40.152:9094 #管理输出kafka地址 -sink.kafka.servers=192.168.44.12:9094 +sink.kafka.servers=192.168.40.223:9094,192.168.40.151:9094,192.168.40.152:9094 + +#--------------------------------nacos配置------------------------------# +#nacos 地址 +nacos.server=192.168.44.12:8848 + +#nacos namespace +nacos.schema.namespace=flink + +#nacos data id +nacos.data.id=liveChart_session.json #--------------------------------HTTP------------------------------# #kafka 证书地址 -tools.library=D:\\workerspace\\dat\\ - -#网关的schema位置 -schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/liveChart_session - -#网关APP_ID 获取接口 -app.id.http=http://192.168.44.67:9999/open-api/appDicList +tools.library=/home/tsg/olap/topology/dat/ #--------------------------------Kafka消费组信息------------------------------# #kafka 接收数据topic -source.kafka.topic=test +source.kafka.topic=SESSION-RECORD #补全数据 输出 topic -sink.kafka.topic=test-result +sink.kafka.topic=TRAFFIC-PROTOCOL-STAT #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=mytest-211119-1 - -#生产者压缩模式 none or snappy -producer.kafka.compression.type=none - -#生产者ack -producer.ack=1 +group.id=liveCharts-session-20211105-1 #--------------------------------topology配置------------------------------# #consumer 并行度 -source.parallelism=1 +source.parallelism=9 #map函数并行度 -parse.parallelism=2 +parse.parallelism=27 -#first count 函数并行度 -first.window.parallelism=2 +#第一次窗口计算并行度 +first.window.parallelism=27 -#second count 函数并行度 -second.window.parallelism=2 +#第二次窗口计算并行度 +second.window.parallelism=27 #producer 并行度 -sink.parallelism=1 +sink.parallelism=9 -#初次随机预聚合窗口时间 +##初次随机预聚合窗口时间 first.count.window.time=5 #二次聚合窗口时间 second.count.window.time=15 + diff --git a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java index 14d79c6..0243f97 100644 --- a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java +++ b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java @@ -18,6 +18,16 @@ public class StreamAggregateConfig { public static final String FORMAT_SPLITTER = ","; public static final String PROTOCOL_SPLITTER = "\\."; + /** + * Nacos + */ + public static final String NACOS_SERVER = StreamAggregateConfigurations.getStringProperty(0, "nacos.server"); + public static final String NACOS_SCHEMA_NAMESPACE = StreamAggregateConfigurations.getStringProperty(0, "nacos.schema.namespace"); + public static final String NACOS_DATA_ID = StreamAggregateConfigurations.getStringProperty(0, "nacos.data.id"); + public static final String NACOS_PIN = StreamAggregateConfigurations.getStringProperty(1, "nacos.pin"); + public static final String NACOS_GROUP = StreamAggregateConfigurations.getStringProperty(1, "nacos.group"); + public static final String NACOS_USERNAME = StreamAggregateConfigurations.getStringProperty(1, "nacos.username"); + /** * System */ @@ -25,7 +35,6 @@ public class StreamAggregateConfig { public static final Integer PARSE_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "parse.parallelism"); public static final Integer FIRST_WINDOW_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "first.window.parallelism"); public static final Integer SECOND_WINDOW_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "second.window.parallelism"); - public static final Integer APP_TICK_TUPLE_FREQ_SECS = StreamAggregateConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs"); public static final Integer FIRST_COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "first.count.window.time"); public static final Integer SECOND_COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "second.count.window.time"); public static final String TOOLS_LIBRARY = StreamAggregateConfigurations.getStringProperty(0, "tools.library"); @@ -45,7 +54,7 @@ public class StreamAggregateConfig { */ public static final String SINK_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.servers"); public static final String SINK_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.topic"); - public static final String PRODUCER_ACK = StreamAggregateConfigurations.getStringProperty(0, "producer.ack"); + public static final String PRODUCER_ACK = StreamAggregateConfigurations.getStringProperty(1, "producer.ack"); public static final String RETRIES = StreamAggregateConfigurations.getStringProperty(1, "retries"); public static final String LINGER_MS = StreamAggregateConfigurations.getStringProperty(1, "linger.ms"); public static final Integer REQUEST_TIMEOUT_MS = StreamAggregateConfigurations.getIntProperty(1, "request.timeout.ms"); @@ -68,13 +77,7 @@ public class StreamAggregateConfig { /** * kafka限流配置-20201117 */ - public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = StreamAggregateConfigurations.getStringProperty(0, "producer.kafka.compression.type"); - - /** - * http - */ - public static final String SCHEMA_HTTP = StreamAggregateConfigurations.getStringProperty(0, "schema.http"); - public static final String APP_ID_HTTP = StreamAggregateConfigurations.getStringProperty(0, "app.id.http"); + public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = StreamAggregateConfigurations.getStringProperty(1, "producer.kafka.compression.type"); } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java index 07a1e87..3221164 100644 --- a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java +++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java @@ -30,13 +30,11 @@ public class StreamAggregateTopology { try { final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); -// environment.enableCheckpointing(5000); - //两个输出之间的最大时间 (单位milliseconds) environment.setBufferTimeout(StreamAggregateConfig.BUFFER_TIMEOUT); DataStream streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()) - .setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM); + .setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM).name(StreamAggregateConfig.SOURCE_KAFKA_TOPIC); SingleOutputStreamOperator> parseDataMap = streamSource.map(new ParseMapFunction()) .name("ParseDataMap") @@ -55,8 +53,11 @@ public class StreamAggregateTopology { SingleOutputStreamOperator secondCountWindow = secondWindow.process(new SecondCountWindowFunction()) .name("SecondCountWindow").setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM); - secondCountWindow.flatMap(new ResultFlatMapFunction()).name("ResultFlatMap").setParallelism(StreamAggregateConfig.SINK_PARALLELISM) - .addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka").setParallelism(StreamAggregateConfig.SINK_PARALLELISM); + SingleOutputStreamOperator resultFlatMap = secondCountWindow.flatMap(new ResultFlatMapFunction()) + .name("ResultFlatMap").setParallelism(StreamAggregateConfig.SINK_PARALLELISM); + + resultFlatMap.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka") + .setParallelism(StreamAggregateConfig.SINK_PARALLELISM).name(StreamAggregateConfig.SINK_KAFKA_TOPIC); environment.execute(args[0]); } catch (Exception e) { diff --git a/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java index 1aa32c7..e0e8a2e 100644 --- a/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java @@ -24,21 +24,21 @@ import java.util.Map; public class FirstCountWindowFunction extends ProcessWindowFunction, Tuple2, String, TimeWindow> { private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class); - private static HashMap metricsMap = JsonParseUtil.getMetricsMap(); - private static HashMap actionMap = JsonParseUtil.getActionMap(); private HashMap> cacheMap = new HashMap<>(320); @Override @SuppressWarnings("unchecked") public void process(String key, Context context, Iterable> input, Collector> output) { try { + HashMap metricsMap = JsonParseUtil.getMetricFunctionsMap(); + HashMap actionMap = JsonParseUtil.getActionMap(); for (Tuple3 tuple : input) { String label = tuple.f0; String dimensions = tuple.f1; String message = tuple.f2; - String l7_Protocol = label.substring(0, label.indexOf("@")); + String l7Protocol = label.substring(0, label.indexOf("@")); //action中某个协议的所有function,如果没有就默认 - String[] metricNames = actionMap.getOrDefault(l7_Protocol, actionMap.get("Default")); + String[] metricNames = actionMap.getOrDefault(l7Protocol, actionMap.get("Default")); if (StringUtil.isNotBlank(message)) { Map dimensionsObj = (Map) JsonMapper.fromJsonString(dimensions, Map.class); Map object = (Map) JsonMapper.fromJsonString(message, Map.class); diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java index bf67eb6..c8c5aa8 100644 --- a/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java @@ -29,17 +29,14 @@ import java.util.concurrent.ThreadLocalRandom; public class ParseMapFunction implements MapFunction> { private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class); - private static ArrayList jobList = JsonParseUtil.getTransformsList(); - - private static HashMap dimensionsMap = JsonParseUtil.getDimensionsMap(); - @Override @SuppressWarnings("unchecked") public Tuple3 map(String message) { try { + ArrayList jobList = JsonParseUtil.getTransformsList(); + HashMap dimensionsMap = JsonParseUtil.getDimensionsMap(); if (StringUtil.isNotBlank(message)) { Map object = (Map) JsonMapper.fromJsonString(message, Map.class); -// String streamTraceId = JsonMapperParseUtil.getString(object, "common_stream_trace_id"); Map dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object); if (ParseFunctions.filterLogs(object)) { for (String[] strings : jobList) { @@ -126,7 +123,6 @@ public class ParseMapFunction implements MapFunction, String, String, TimeWindow> { private static final Logger logger = LoggerFactory.getLogger(SecondCountWindowFunction.class); - private static HashMap metricsMap = JsonParseUtil.getMetricsMap(); private HashMap> cacheMap = new HashMap<>(320); - private static String resultTimeKey = JsonParseUtil.getTimeKey(); @Override @SuppressWarnings("unchecked") public void process(String key, Context context, Iterable> input, Collector output) { try { + HashMap metricsMap = JsonParseUtil.getMetricFunctionsMap(); for (Tuple2 tuple : input) { String dimensions = tuple.f0; String message = tuple.f1; @@ -57,7 +56,7 @@ public class SecondCountWindowFunction extends ProcessWindowFunction resultMap = cacheMap.get(countKey); - JsonParseUtil.setValue(resultMap, resultTimeKey, endTime); + JsonParseUtil.setValue(resultMap, JsonParseUtil.getResultTimeKey(), endTime); output.collect(JsonMapper.toJsonString(resultMap)); } } diff --git a/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java b/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java index 5417236..0672179 100644 --- a/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java +++ b/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java @@ -1,7 +1,7 @@ package com.zdjizhi.utils.general; -import com.zdjizhi.utils.json.JsonTypeUtils; +import com.zdjizhi.utils.json.JsonTypeUtil; /** * @author qidaijie @@ -18,8 +18,8 @@ public class MetricFunctions { * @return value1 + value2 */ public static Long longSum(Object value1, Object value2) { - Long res1 = JsonTypeUtils.checkLongValue(value1); - Long res2 = JsonTypeUtils.checkLongValue(value2); + Long res1 = JsonTypeUtil.checkLongValue(value1); + Long res2 = JsonTypeUtil.checkLongValue(value2); return res1 + res2; } @@ -32,6 +32,6 @@ public class MetricFunctions { */ public static Long count(Object count) { - return JsonTypeUtils.checkLongValue(count) + 1L; + return JsonTypeUtil.checkLongValue(count) + 1L; } } diff --git a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java index 6933e7c..2e28428 100644 --- a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java +++ b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java @@ -18,12 +18,6 @@ import java.util.Map; * @Version V1.0 **/ public class ParseFunctions { - /** - * 获取filters条件map - */ - private static HashMap filtersMap = JsonParseUtil.getFiltersMap(); - - private static ArrayList metricsList = JsonParseUtil.getLogMetrics(); /** * 解析 dimensions 字段集 @@ -50,7 +44,7 @@ public class ParseFunctions { */ public static boolean filterLogs(Map object) { boolean available = false; - + HashMap filtersMap = JsonParseUtil.getFiltersMap(); for (String key : filtersMap.keySet()) { switch (key) { case "notempty": @@ -65,11 +59,16 @@ public class ParseFunctions { return available; } + /** + * 根据原始日志字段,生成schema内指定的metrics指标json。 + * + * @param object 原始日志json + * @return 统计metrics json + */ public static String getMetricsLog(Map object) { - Map json = new HashMap<>(16); - for (String fileName : metricsList) { + for (String fileName : JsonParseUtil.getMetricsFiledNameList()) { json.put(fileName, object.get(fileName)); } diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java deleted file mode 100644 index 1adb1d1..0000000 --- a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.zdjizhi.utils.http; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import org.apache.commons.io.IOUtils; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; - -/** - * 获取网关schema的工具类 - * - * @author qidaijie - */ -public class HttpClientUtil { - private static final Log logger = LogFactory.get(); - - /** - * 请求网关获取schema - * - * @param http 网关url - * @return schema - */ - public static String requestByGetMethod(String http) { - CloseableHttpClient httpClient = HttpClients.createDefault(); - StringBuilder entityStringBuilder; - - HttpGet get = new HttpGet(http); - BufferedReader bufferedReader = null; - CloseableHttpResponse httpResponse = null; - try { - httpResponse = httpClient.execute(get); - HttpEntity entity = httpResponse.getEntity(); - entityStringBuilder = new StringBuilder(); - if (null != entity) { - bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); - int intC; - while ((intC = bufferedReader.read()) != -1) { - char c = (char) intC; - if (c == '\n') { - break; - } - entityStringBuilder.append(c); - } - - return entityStringBuilder.toString(); - } - } catch (IOException e) { - logger.error("Get Schema from Query engine ERROR! Exception message is:" + e); - } finally { - if (httpClient != null) { - try { - httpClient.close(); - } catch (IOException e) { - logger.error("Close HTTP Client ERROR! Exception messgae is:" + e); - } - } - if (httpResponse != null) { - try { - httpResponse.close(); - } catch (IOException e) { - logger.error("Close httpResponse ERROR! Exception messgae is:" + e); - } - } - if (bufferedReader != null) { - IOUtils.closeQuietly(bufferedReader); - } - } - return ""; - } -} diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java index 0ebe8e1..b133368 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -1,17 +1,21 @@ package com.zdjizhi.utils.json; - import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.StreamAggregateConfig; import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.http.HttpClientUtil; -import net.sf.cglib.beans.BeanGenerator; +import com.zdjizhi.utils.StringUtil; import net.sf.cglib.beans.BeanMap; import java.util.*; +import java.util.concurrent.Executor; /** * 使用FastJson解析json的工具类 @@ -21,74 +25,76 @@ import java.util.*; public class JsonParseUtil { private static final Log logger = LogFactory.get(); + private static Properties propNacos = new Properties(); /** - * 模式匹配,给定一个类型字符串返回一个类类型 - * - * @param type 类型 - * @return 类类型 + * 获取actions所有的计算函数 */ - - public static Class getClassName(String type) { - Class clazz; - - switch (type) { - case "int": - clazz = Integer.class; - break; - case "string": - clazz = String.class; - break; - case "long": - clazz = long.class; - break; - case "array": - clazz = List.class; - break; - case "double": - clazz = double.class; - break; - case "float": - clazz = float.class; - break; - case "char": - clazz = char.class; - break; - case "byte": - clazz = byte.class; - break; - case "boolean": - clazz = boolean.class; - break; - case "short": - clazz = short.class; - break; - default: - clazz = String.class; - } - return clazz; - } - + private static HashMap actionMap = new HashMap<>(16); /** - * 获取属性值的方法 - * - * @param obj 对象 - * @param property key - * @return 属性的值 + * 解析metrics指标字段信息 */ - public static Object getValue(Object obj, String property) { + private static HashMap metricFunctionsMap = new HashMap<>(16); + /** + * 解析dimensions维度字段信息 + */ + private static HashMap dimensionsMap = new HashMap<>(16); + + /** + * 解析filters过滤信息 + */ + private static HashMap filtersMap = new HashMap<>(16); + + /** + * 解析transforms转换函数信息 + */ + private static ArrayList transformsList = new ArrayList<>(); + + /** + * 解析metrics指标字段集 + */ + private static ArrayList metricsFiledNameList = new ArrayList<>(); + + /** + * 解析hierarchy函数,获取切分信息 + */ + private static String[] hierarchy; + + /** + * 解析时间戳字段名称 + */ + private static String resultTimeKey = "stat_time"; + + static { + propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, StreamAggregateConfig.NACOS_SERVER); + propNacos.setProperty(PropertyKeyConst.NAMESPACE, StreamAggregateConfig.NACOS_SCHEMA_NAMESPACE); + propNacos.setProperty(PropertyKeyConst.USERNAME, StreamAggregateConfig.NACOS_USERNAME); + propNacos.setProperty(PropertyKeyConst.PASSWORD, StreamAggregateConfig.NACOS_PIN); try { - BeanMap beanMap = BeanMap.create(obj); - if (beanMap.containsKey(property)) { - return beanMap.get(property); - } else { - return null; + ConfigService configService = NacosFactory.createConfigService(propNacos); + String dataId = StreamAggregateConfig.NACOS_DATA_ID; + String group = StreamAggregateConfig.NACOS_GROUP; + String schema = configService.getConfig(dataId, group, 5000); + if (StringUtil.isNotBlank(schema)) { + parseSchema(schema); } - } catch (RuntimeException e) { - logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); - return null; + configService.addListener(dataId, group, new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configMsg) { + if (StringUtil.isNotBlank(configMsg)) { + parseSchema(configMsg); + } + } + }); + } catch (NacosException e) { + logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage()); } } @@ -103,7 +109,7 @@ public class JsonParseUtil { try { return jsonMap.getOrDefault(property, null); } catch (RuntimeException e) { - logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); + logger.error("Get the JSON value is abnormal,The key is :" + property + "error message is :" + e); return null; } } @@ -152,7 +158,7 @@ public class JsonParseUtil { try { jsonMap.put(property, value); } catch (RuntimeException e) { - logger.error("赋予实体类错误类型数据", e); + logger.error("The JSON set value is abnormal,the error message is :", e); } } @@ -172,204 +178,130 @@ public class JsonParseUtil { } } - /** - * 根据反射生成对象的方法 - * - * @param properties 反射类用的map - * @return 生成的Object类型的对象 - */ - public static Object generateObject(Map properties) { - BeanGenerator generator = new BeanGenerator(); - Set keySet = properties.keySet(); - for (Object aKeySet : keySet) { - String key = (String) aKeySet; - generator.addProperty(key, (Class) properties.get(key)); - } - return generator.create(); - } - /** * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 - * - * @return 用于反射生成schema类型的对象的一个map集合 + * 用于反射生成schema类型的对象的一个map集合 */ - public static HashMap getActionMap() { - HashMap map = new HashMap<>(16); - - String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + private static void parseSchema(String schema) { + clearCacheMap(); DocumentContext parse = JsonPath.parse(schema); - List actions = parse.read("$.data.doc.action[*]"); - + List actions = parse.read("$.doc.action[*]"); for (Object action : actions) { - map.put(JsonPath.read(action, "$.label"), + actionMap.put(JsonPath.read(action, "$.label"), JsonPath.read(action, "$.metrics").toString().split(StreamAggregateConfig.FORMAT_SPLITTER)); -// System.out.println(JsonPath.read(action, "$.label")+JsonPath.read(action, "$.metrics").toString()); } - return map; - } - - /** - * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 - * - * @return 用于反射生成schema类型的对象的一个map集合 - */ - public static HashMap getMetricsMap() { - HashMap map = new HashMap<>(16); - - String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); - DocumentContext parse = JsonPath.parse(schema); - - List metrics = parse.read("$.data.doc.metrics[*]"); - - for (Object metric : metrics) { - map.put(JsonPath.read(metric, "$.name"), + List metricFunctions = parse.read("$.doc.metrics[*]"); + for (Object metric : metricFunctions) { + metricFunctionsMap.put(JsonPath.read(metric, "$.name"), new String[]{JsonPath.read(metric, "$.function"), JsonPath.read(metric, "$.fieldName")} ); } - return map; - } - - /** - * 获取Metrics内指标,用于过滤原始日志 - * - * @return 指标列原始名称 - */ - public static ArrayList getLogMetrics() { - ArrayList list = new ArrayList<>(); - String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); - DocumentContext parse = JsonPath.parse(schema); - - List metrics = parse.read("$.data.doc.metrics[*]"); + List metrics = parse.read("$.doc.metrics[*]"); for (Object metric : metrics) { - list.add(JsonPath.read(metric, "$.fieldName")); + metricsFiledNameList.add(JsonPath.read(metric, "$.fieldName")); } - return list; - } - - /** - * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 - * - * @return 用于反射生成schema类型的对象的一个map集合 - */ - public static String getTimeKey() { - - String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); - - return JsonPath.read(schema, "$.data.doc.timestamp.name"); - } - - - /** - * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 - * - * @return 用于反射生成schema类型的对象的一个map集合 - */ - public static HashMap getResultLogMap() { - HashMap map = new HashMap<>(16); - - String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); - DocumentContext parse = JsonPath.parse(schema); - - List dimensions = parse.read("$.data.doc.dimensions[*]"); + List dimensions = parse.read("$.doc.dimensions[*]"); for (Object dimension : dimensions) { - map.put(JsonPath.read(dimension, "$.name"), - JsonParseUtil.getClassName(JsonPath.read(dimension, "$.type"))); - } - - List metrics = parse.read("$.data.doc.metrics[*]"); - for (Object metric : metrics) { - map.put(JsonPath.read(metric, "$.name"), - JsonParseUtil.getClassName(JsonPath.read(metric, "$.type"))); - } - - return map; - } - - /** - * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 - * - * @return 用于反射生成schema类型的对象的一个map集合 - */ - public static HashMap getDimensionsMap() { - HashMap map = new HashMap<>(16); - - String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); - DocumentContext parse = JsonPath.parse(schema); - - List dimensions = parse.read("$.data.doc.dimensions[*]"); - - for (Object dimension : dimensions) { - map.put(JsonPath.read(dimension, "$.name"), + dimensionsMap.put(JsonPath.read(dimension, "$.name"), JsonPath.read(dimension, "$.fieldName")); } - return map; - } - - - /** - * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 - * - * @return 用于反射生成schema类型的对象的一个map集合 - */ - public static HashMap getFiltersMap() { - HashMap map = new HashMap<>(16); - String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); - DocumentContext parse = JsonPath.parse(schema); - - List filters = parse.read("$.data.doc.filters[*]"); + List filters = parse.read("$.doc.filters[*]"); for (Object filter : filters) { - map.put(JsonPath.read(filter, "$.type"), JsonPath.read(filter, "$.fieldName")); + filtersMap.put(JsonPath.read(filter, "$.type"), JsonPath.read(filter, "$.fieldName")); } - return map; - } - - - /** - * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) - * - * @return 任务列表 - */ - public static ArrayList getTransformsList() { - ArrayList list = new ArrayList<>(); - String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); - DocumentContext parse = JsonPath.parse(schema); - - List transforms = parse.read("$.data.doc.transforms[*]"); + List transforms = parse.read("$.doc.transforms[*]"); for (Object transform : transforms) { String function = JsonPath.read(transform, "$.function").toString(); String name = JsonPath.read(transform, "$.name").toString(); String fieldName = JsonPath.read(transform, "$.fieldName").toString(); String parameters = JsonPath.read(transform, "$.parameters").toString(); - list.add(new String[]{function, name, fieldName, parameters}); + transformsList.add(new String[]{function, name, fieldName, parameters}); } - return list; - } - - /** - * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) - * - * @return 任务列表 - */ - public static String[] getHierarchy() { - String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); - DocumentContext parse = JsonPath.parse(schema); - List transforms = parse.read("$.data.doc.transforms[*]"); - for (Object transform : transforms) { + List hierarchyList = parse.read("$.doc.transforms[*]"); + for (Object transform : hierarchyList) { String function = JsonPath.read(transform, "$.function").toString(); if ("hierarchy".equals(function)) { String name = JsonPath.read(transform, "$.name").toString(); String parameters = JsonPath.read(transform, "$.parameters").toString(); - return new String[]{name, parameters}; + hierarchy = new String[]{name, parameters}; } } - return null; + + resultTimeKey = JsonPath.read(schema, "$.doc.timestamp.name"); } + /** + * @return 解析schema获取的actions集合 + */ + public static HashMap getActionMap() { + return actionMap; + } + + /** + * @return 解析schema获取的指标统计方式集合 + */ + public static HashMap getMetricFunctionsMap() { + return metricFunctionsMap; + } + + /** + * @return 解析schema获取的维度集合 + */ + public static HashMap getDimensionsMap() { + return dimensionsMap; + } + + /** + * @return 解析schema获取的过滤规则集合 + */ + public static HashMap getFiltersMap() { + return filtersMap; + } + + /** + * @return 解析schema获取的操作集合 + */ + public static ArrayList getTransformsList() { + return transformsList; + } + + /** + * @return 解析schema获取的指标字段集合 + */ + public static ArrayList getMetricsFiledNameList() { + return metricsFiledNameList; + } + + /** + * @return 解析schema获取的拆解函数 + */ + public static String[] getHierarchy() { + return hierarchy; + } + + /** + * @return 解析schema获取的时间字段的key + */ + public static String getResultTimeKey() { + return resultTimeKey; + } + + /** + * 在配置变化时清空缓存,重新解析schema更新缓存 + */ + private static void clearCacheMap() { + actionMap.clear(); + metricFunctionsMap.clear(); + dimensionsMap.clear(); + filtersMap.clear(); + transformsList.clear(); + metricsFiledNameList.clear(); + } } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java similarity index 95% rename from src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java rename to src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java index 034f76a..8555b1f 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java @@ -1,7 +1,6 @@ package com.zdjizhi.utils.json; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; + import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.exception.AnalysisException; @@ -14,8 +13,7 @@ import java.util.Map; * @Description: * @date 2021/7/1217:34 */ -public class JsonTypeUtils { - private static final Log logger = LogFactory.get(); +public class JsonTypeUtil { /** * String 类型检验转换方法 * diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java index 85024e1..545a0e3 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java @@ -22,19 +22,25 @@ public class KafkaConsumer { properties.put("session.timeout.ms", StreamAggregateConfig.SESSION_TIMEOUT_MS); properties.put("max.poll.records", StreamAggregateConfig.MAX_POLL_RECORDS); properties.put("max.partition.fetch.bytes", StreamAggregateConfig.MAX_PARTITION_FETCH_BYTES); - properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); CertUtils.chooseCert(StreamAggregateConfig.SOURCE_KAFKA_SERVERS, properties); return properties; } + /** + * 官方序列化kafka数据 + * + * @return kafka logs + */ public static FlinkKafkaConsumer getKafkaConsumer() { FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(StreamAggregateConfig.SOURCE_KAFKA_TOPIC, new SimpleStringSchema(), createConsumerConfig()); + //随着checkpoint提交,将offset提交到kafka kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + + //从消费组当前的offset开始消费 kafkaConsumer.setStartFromGroupOffsets(); return kafkaConsumer; diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index c350439..7fd38ec 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -42,11 +42,7 @@ public class KafkaProducer { createProducerConfig(), Optional.empty()); //启用此选项将使生产者仅记录失败日志而不是捕获和重新抛出它们 - kafkaProducer.setLogFailuresOnly(false); - - //写入kafka的消息携带时间戳 -// kafkaProducer.setWriteTimestampToKafka(true); - + kafkaProducer.setLogFailuresOnly(true); return kafkaProducer; } diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties index 9d91936..5a6b542 100644 --- a/src/main/java/log4j.properties +++ b/src/main/java/log4j.properties @@ -1,14 +1,14 @@ #Log4j -log4j.rootLogger=info,console,file +log4j.rootLogger=error,console,file # 控制台日志设置 log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.Threshold=info +log4j.appender.console.Threshold=error log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n # 文件日志设置 log4j.appender.file=org.apache.log4j.DailyRollingFileAppender -log4j.appender.file.Threshold=info +log4j.appender.file.Threshold=error log4j.appender.file.encoding=UTF-8 log4j.appender.file.Append=true #路径请用相对路径,做好相关测试输出到应用目下 @@ -18,8 +18,8 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout #log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n #MyBatis 配置,com.nis.web.dao是mybatis接口所在包 -log4j.logger.com.nis.web.dao=debug +log4j.logger.com.nis.web.dao=error #bonecp数据源配置 -log4j.category.com.jolbox=debug,console +log4j.category.com.jolbox=error,console