diff --git a/pom.xml b/pom.xml index 315e8dd..8ca46cd 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi app-protocol-stat-traffic-merge - 230506 + 230516 app-protocol-stat-traffic-merge http://www.example.com @@ -35,10 +35,11 @@ UTF-8 1.13.1 - 2.7.1 1.0.0 - 2.2.3 1.2.0 + 5.7.17 + 3.2.0 + 1.9.3 1.0.8 2.0.26 provided @@ -115,6 +116,18 @@ + + cglib + cglib-nodep + 3.2.4 + + + + org.junit.jupiter + junit-jupiter-api + 5.3.2 + compile + com.zdjizhi @@ -173,30 +186,17 @@ ${scope.type} - - cglib - cglib-nodep - 3.2.4 - - - - org.junit.jupiter - junit-jupiter-api - 5.3.2 - compile - - cn.hutool hutool-all - 5.7.17 + ${hutool.version} org.jasypt jasypt - 1.9.3 + ${jasypt.version} @@ -230,7 +230,7 @@ org.apache.datasketches datasketches-java - 3.2.0 + ${datasketches.version} diff --git a/properties/default_config.properties b/properties/default_config.properties index b17a8cc..efe85b3 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -43,9 +43,4 @@ kafka.user=nsyGpHKGFA4KW0zro9MDdw== kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ #====================Topology Default====================# - -#两个输出之间的最大时间(单位milliseconds) -buffer.timeout=100 - -#第一次随机分组random范围 -random.range.num=20 \ No newline at end of file +measurement.name=application_protocol_stat \ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 123423a..e588696 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -12,26 +12,26 @@ tools.library=D:\\workerspace\\dat #--------------------------------Kafka消费组信息------------------------------# #kafka 接收数据topic -source.kafka.topic=test +source.kafka.topic=APP-PROTOCOL-STAT-TRAFFIC-AGENT #补全数据 输出 topic -sink.kafka.topic=test-result +sink.kafka.topic=APP-PROTOCOL-STAT-TRAFFIC-MERGE-LOCAL #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; group.id=livecharts-test-20230423-1 #--------------------------------topology配置------------------------------# #consumer 并行度 -source.parallelism=3 +source.parallelism=1 #map函数并行度 -parse.parallelism=3 +parse.parallelism=1 #第一次窗口计算并行度 -window.parallelism=3 +window.parallelism=1 #producer 并行度 -sink.parallelism=3 +sink.parallelism=1 #初次随机预聚合窗口时间 count.window.time=15 diff --git a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java b/src/main/java/com/zdjizhi/common/config/GlobalConfig.java index 6f5798d..8795a7e 100644 --- a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java +++ b/src/main/java/com/zdjizhi/common/config/GlobalConfig.java @@ -24,6 +24,7 @@ public class GlobalConfig { * System */ public static final Integer SOURCE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "source.parallelism"); + public static final String MEASUREMENT_NAME = GlobalConfigLoad.getStringProperty(1, "measurement.name"); public static final Integer PARSE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "parse.parallelism"); public static final Integer WINDOW_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "window.parallelism"); public static final Integer COUNT_WINDOW_TIME = GlobalConfigLoad.getIntProperty(0, "count.window.time"); diff --git a/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java b/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java index 7b04295..0ae91e5 100644 --- a/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java +++ b/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java @@ -13,26 +13,26 @@ import java.util.Properties; public final class GlobalConfigLoad { - private static Properties propKafka = new Properties(); + private static Properties propDefault = new Properties(); private static Properties propService = new Properties(); - public static String getStringProperty(Integer type, String key) { + static String getStringProperty(Integer type, String key) { if (type == 0) { return propService.getProperty(key); } else if (type == 1) { - return propKafka.getProperty(key); + return propDefault.getProperty(key); } else { return null; } } - public static Integer getIntProperty(Integer type, String key) { + static Integer getIntProperty(Integer type, String key) { if (type == 0) { return Integer.parseInt(propService.getProperty(key)); } else if (type == 1) { - return Integer.parseInt(propKafka.getProperty(key)); + return Integer.parseInt(propDefault.getProperty(key)); } else { return null; } @@ -42,7 +42,7 @@ public final class GlobalConfigLoad { if (type == 0) { return Long.parseLong(propService.getProperty(key)); } else if (type == 1) { - return Long.parseLong(propKafka.getProperty(key)); + return Long.parseLong(propDefault.getProperty(key)); } else { return null; } @@ -52,7 +52,7 @@ public final class GlobalConfigLoad { if (type == 0) { return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); } else if (type == 1) { - return StringUtil.equals(propKafka.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); } else { return null; } @@ -61,9 +61,9 @@ public final class GlobalConfigLoad { static { try { propService.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); - propKafka.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("default_config.properties")); + propDefault.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("default_config.properties")); } catch (IOException | RuntimeException e) { - propKafka = null; + propDefault = null; propService = null; } } diff --git a/src/main/java/com/zdjizhi/common/pojo/Fields.java b/src/main/java/com/zdjizhi/common/pojo/Fields.java index dc075b2..baa5b25 100644 --- a/src/main/java/com/zdjizhi/common/pojo/Fields.java +++ b/src/main/java/com/zdjizhi/common/pojo/Fields.java @@ -26,9 +26,9 @@ public class Fields { private Long s2c_tcp_retransmitted_pkts; private Long c2s_tcp_retransmitted_bytes; private Long s2c_tcp_retransmitted_bytes; - private byte[] client_ip_sketch; + private String client_ip_sketch; - public Fields(Long sessions, Long in_bytes, Long out_bytes, Long in_pkts, Long out_pkts, Long c2s_pkts, Long s2c_pkts, Long c2s_bytes, Long s2c_bytes, Long c2s_fragments, Long s2c_fragments, Long c2s_tcp_lost_bytes, Long s2c_tcp_lost_bytes, Long c2s_tcp_ooorder_pkts, Long s2c_tcp_ooorder_pkts, Long c2s_tcp_retransmitted_pkts, Long s2c_tcp_retransmitted_pkts, Long c2s_tcp_retransmitted_bytes, Long s2c_tcp_retransmitted_bytes, byte[] client_ip_sketch) { + public Fields(Long sessions, Long in_bytes, Long out_bytes, Long in_pkts, Long out_pkts, Long c2s_pkts, Long s2c_pkts, Long c2s_bytes, Long s2c_bytes, Long c2s_fragments, Long s2c_fragments, Long c2s_tcp_lost_bytes, Long s2c_tcp_lost_bytes, Long c2s_tcp_ooorder_pkts, Long s2c_tcp_ooorder_pkts, Long c2s_tcp_retransmitted_pkts, Long s2c_tcp_retransmitted_pkts, Long c2s_tcp_retransmitted_bytes, Long s2c_tcp_retransmitted_bytes, String client_ip_sketch) { this.sessions = sessions; this.in_bytes = in_bytes; this.out_bytes = out_bytes; @@ -203,11 +203,11 @@ public class Fields { this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes; } - public byte[] getClient_ip_sketch() { + public String getClient_ip_sketch() { return client_ip_sketch; } - public void setClient_ip_sketch(byte[] client_ip_sketch) { + public void setClient_ip_sketch(String client_ip_sketch) { this.client_ip_sketch = client_ip_sketch; } } diff --git a/src/main/java/com/zdjizhi/common/pojo/Metrics.java b/src/main/java/com/zdjizhi/common/pojo/Metrics.java new file mode 100644 index 0000000..58ecbfb --- /dev/null +++ b/src/main/java/com/zdjizhi/common/pojo/Metrics.java @@ -0,0 +1,54 @@ +package com.zdjizhi.common.pojo; + +/** + * @author qidaijie + * @Package com.zdjizhi.common.pojo + * @Description: + * @date 2023/5/911:42 + */ +public class Metrics { + private String name; + private Tags tags; + private Fields fields; + private long timestamp; + + + public Metrics(String name, Tags tags, Fields fields, long timestamp) { + this.name = name; + this.tags = tags; + this.fields = fields; + this.timestamp = timestamp; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Tags getTags() { + return tags; + } + + public void setTags(Tags tags) { + this.tags = tags; + } + + public Fields getFields() { + return fields; + } + + public void setFields(Fields fields) { + this.fields = fields; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } +} diff --git a/src/main/java/com/zdjizhi/common/pojo/Tags.java b/src/main/java/com/zdjizhi/common/pojo/Tags.java index 8f35b37..bad2520 100644 --- a/src/main/java/com/zdjizhi/common/pojo/Tags.java +++ b/src/main/java/com/zdjizhi/common/pojo/Tags.java @@ -1,5 +1,7 @@ package com.zdjizhi.common.pojo; +import com.alibaba.fastjson.annotation.JSONField; + /** * @author qidaijie * @Package com.zdjizhi.common.pojo @@ -11,16 +13,16 @@ public class Tags { private String device_id; private String device_group; private String data_center; - private String protocol_label; - private String app_full_path; + private String protocol_stack_id; + private String app_name; - public Tags(int vsys_id, String device_id, String device_group, String data_center, String protocol_label, String app_full_path) { + public Tags(int vsys_id, String device_id, String device_group, String data_center, String protocol_stack_id, String app_name) { this.vsys_id = vsys_id; this.device_id = device_id; this.device_group = device_group; this.data_center = data_center; - this.protocol_label = protocol_label; - this.app_full_path = app_full_path; + this.protocol_stack_id = protocol_stack_id; + this.app_name = app_name; } public int getVsys_id() { @@ -55,19 +57,26 @@ public class Tags { this.data_center = data_center; } - public String getProtocol_label() { - return protocol_label; + public String getProtocol_stack_id() { + return protocol_stack_id; } - public void setProtocol_label(String protocol_label) { - this.protocol_label = protocol_label; + @JSONField(name = "protocol_label") + public void setProtocol_stack_id(String protocol_stack_id) { + this.protocol_stack_id = protocol_stack_id; } - public String getApp_full_path() { - return app_full_path; + public String getApp_name() { + return app_name; } - public void setApp_full_path(String app_full_path) { - this.app_full_path = app_full_path; + @JSONField(name = "app_full_path") + public void setApp_name(String app_name) { + this.app_name = app_name; + } + + @Override + public String toString() { + return vsys_id + device_id + device_group + data_center + protocol_stack_id + app_name; } } diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java index 59cd208..0f34770 100644 --- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java +++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java @@ -3,7 +3,9 @@ package com.zdjizhi.topology; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.config.GlobalConfig; -import com.zdjizhi.common.pojo.AppProtocol; +import com.zdjizhi.common.pojo.Fields; +import com.zdjizhi.common.pojo.Metrics; +import com.zdjizhi.common.pojo.Tags; import com.zdjizhi.utils.functions.filter.DataTypeFilter; import com.zdjizhi.utils.functions.keyby.DimensionKeyBy; import com.zdjizhi.utils.functions.map.MetricsParseMap; @@ -41,10 +43,10 @@ public class ApplicationProtocolTopology { .name("appProtocolFilter").setParallelism(GlobalConfig.SOURCE_PARALLELISM); - SingleOutputStreamOperator> parseDataMap = appProtocolFilter.map(new MetricsParseMap()) + SingleOutputStreamOperator> parseDataMap = appProtocolFilter.map(new MetricsParseMap()) .name("ParseDataMap").setParallelism(GlobalConfig.PARSE_PARALLELISM); - SingleOutputStreamOperator dispersionCountWindow = parseDataMap.keyBy(new DimensionKeyBy()) + SingleOutputStreamOperator dispersionCountWindow = parseDataMap.keyBy(new DimensionKeyBy()) .window(TumblingProcessingTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME))) .reduce(new DispersionCountWindow(), new MergeCountWindow()) .name("DispersionCountWindow") diff --git a/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java b/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java index a97c367..4393729 100644 --- a/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java +++ b/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java @@ -1,14 +1,9 @@ package com.zdjizhi.utils.functions.keyby; -import com.alibaba.fastjson2.JSONObject; -import com.zdjizhi.common.pojo.AppProtocol; import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.common.pojo.Tags; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; - -import java.util.Map; /** * @author qidaijie @@ -16,11 +11,11 @@ import java.util.Map; * @Description: * @date 2021/7/2112:13 */ -public class DimensionKeyBy implements KeySelector, String> { +public class DimensionKeyBy implements KeySelector, String> { @Override - public String getKey(Tuple2 value) throws Exception { + public String getKey(Tuple2 value) throws Exception { //以map拼接的key分组 - return value.f0; + return value.f0.toString(); } } diff --git a/src/main/java/com/zdjizhi/utils/functions/map/MetricsParseMap.java b/src/main/java/com/zdjizhi/utils/functions/map/MetricsParseMap.java index 66b840f..48d8757 100644 --- a/src/main/java/com/zdjizhi/utils/functions/map/MetricsParseMap.java +++ b/src/main/java/com/zdjizhi/utils/functions/map/MetricsParseMap.java @@ -4,7 +4,8 @@ import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; -import com.zdjizhi.common.pojo.AppProtocol; +import com.zdjizhi.common.pojo.Fields; +import com.zdjizhi.common.pojo.Tags; import com.zdjizhi.utils.StringUtil; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; @@ -16,30 +17,26 @@ import org.apache.flink.api.java.tuple.Tuple2; * @Description: * @date 2021/5/2715:01 */ -public class MetricsParseMap implements MapFunction> { +public class MetricsParseMap implements MapFunction> { private static final Log logger = LogFactory.get(); @Override - @SuppressWarnings("unchecked") - public Tuple2 map(String message) { + public Tuple2 map(String message) { try { JSONObject originalLog = JSON.parseObject(message); - JSONObject fieldsObject = JSONObject.parseObject(originalLog.getString("fields")); - JSONObject tagsObject = JSONObject.parseObject(originalLog.getString("tags")); - fieldsObject.putAll(tagsObject); + Fields fields = JSONObject.parseObject(originalLog.getString("fields"), Fields.class); + Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class); - AppProtocol appProtocol = JSON.to(AppProtocol.class, fieldsObject); - - String appFullPath = appProtocol.getApp_name(); + String appFullPath = tags.getApp_name(); if (StringUtil.isNotBlank(appFullPath)) { String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1); - String protocolLabel = appProtocol.getProtocol_stack_id(); + String protocolLabel = tags.getProtocol_stack_id(); - appProtocol.setApp_name(appName); - appProtocol.setProtocol_stack_id(protocolLabel.concat(".").concat(appFullPath)); + tags.setApp_name(appName); + tags.setProtocol_stack_id(protocolLabel.concat(".").concat(appFullPath)); } - return new Tuple2<>(tagsObject.toJSONString(), appProtocol); + return new Tuple2<>(tags, fields); } catch (RuntimeException e) { logger.error("An error occurred in the original log parsing reorganization,error message is:" + e); return new Tuple2<>(null, null); diff --git a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java index 8fcf560..47b668c 100644 --- a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java +++ b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java @@ -5,9 +5,9 @@ import cn.hutool.log.LogFactory; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONWriter; import com.zdjizhi.common.config.GlobalConfig; -import com.zdjizhi.common.pojo.AppProtocol; +import com.zdjizhi.common.pojo.Metrics; +import com.zdjizhi.common.pojo.Tags; import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.general.FormatConverterUtil; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; @@ -17,20 +17,16 @@ import org.apache.flink.util.Collector; * @Description: * @date 2021/7/2114:52 */ -public class ResultFlatMap implements FlatMapFunction { +public class ResultFlatMap implements FlatMapFunction { private static final Log logger = LogFactory.get(); @Override - @SuppressWarnings("unchecked") - public void flatMap(AppProtocol appProtocol, Collector out) throws Exception { + public void flatMap(Metrics metrics, Collector out) throws Exception { try { - JSONObject tags = FormatConverterUtil.getTags(appProtocol); - JSONObject conversion = FormatConverterUtil.structureConversion(appProtocol); - - String protocolStackId = tags.getString("protocol_stack_id"); - - out.collect(FormatConverterUtil.updateTagsData(conversion, tags)); - tags.remove("app_name"); + Tags tags = metrics.getTags(); + String protocolStackId = tags.getProtocol_stack_id(); + out.collect(getResultJson(metrics)); + tags.setApp_name(null); StringBuilder stringBuilder = new StringBuilder(); String[] protocolIds = protocolStackId.split(GlobalConfig.PROTOCOL_SPLITTER); @@ -38,13 +34,14 @@ public class ResultFlatMap implements FlatMapFunction { for (int i = 0; i < protocolIdsNum - 1; i++) { if (StringUtil.isBlank(stringBuilder.toString())) { stringBuilder.append(protocolIds[i]); - tags.put("protocol_stack_id", stringBuilder.toString()); - out.collect(FormatConverterUtil.updateTagsData(conversion, tags)); + tags.setProtocol_stack_id(stringBuilder.toString()); + metrics.setTags(tags); + out.collect(getResultJson(metrics)); } else { stringBuilder.append(".").append(protocolIds[i]); - tags.put("protocol_stack_id", stringBuilder.toString()); - conversion.put("tags", tags); - out.collect(FormatConverterUtil.updateTagsData(conversion, tags)); + tags.setProtocol_stack_id(stringBuilder.toString()); + metrics.setTags(tags); + out.collect(getResultJson(metrics)); } } } catch (RuntimeException e) { @@ -52,4 +49,10 @@ public class ResultFlatMap implements FlatMapFunction { e.printStackTrace(); } } + + private static String getResultJson(Metrics metrics) { + return JSONObject.toJSONString(metrics + , JSONWriter.Feature.WriteNullStringAsEmpty + , JSONWriter.Feature.WriteNullNumberAsZero); + } } diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java index 0176ba2..57ebde1 100644 --- a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java +++ b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java @@ -2,7 +2,8 @@ package com.zdjizhi.utils.functions.statistics; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.common.pojo.AppProtocol; +import com.zdjizhi.common.pojo.Fields; +import com.zdjizhi.common.pojo.Tags; import com.zdjizhi.utils.general.MetricUtil; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; @@ -13,18 +14,18 @@ import org.apache.flink.api.java.tuple.Tuple2; * @Description: * @date 2023/4/2314:02 */ -public class DispersionCountWindow implements ReduceFunction> { +public class DispersionCountWindow implements ReduceFunction> { private static final Log logger = LogFactory.get(); @Override - public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception { + public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception { try { - AppProtocol cacheData = value1.f1; - AppProtocol newData = value2.f1; + Fields cacheData = value1.f1; + Fields newData = value2.f1; - MetricUtil.statisticsMetrics(cacheData, newData); + Fields metricsResult = MetricUtil.statisticsMetrics(cacheData, newData); - return new Tuple2<>(value1.f0, cacheData); + return new Tuple2<>(value1.f0, metricsResult); } catch (RuntimeException e) { logger.error("An exception occurred during incremental aggregation! The message is:" + e.getMessage()); return value1; diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java index 7a2866a..b0d50fb 100644 --- a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java +++ b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java @@ -2,7 +2,10 @@ package com.zdjizhi.utils.functions.statistics; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.common.pojo.AppProtocol; +import com.zdjizhi.common.config.GlobalConfig; +import com.zdjizhi.common.pojo.Fields; +import com.zdjizhi.common.pojo.Metrics; +import com.zdjizhi.common.pojo.Tags; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -14,17 +17,18 @@ import org.apache.flink.util.Collector; * @Description: * @date 2023/4/2314:43 */ -public class MergeCountWindow extends ProcessWindowFunction, AppProtocol, String, TimeWindow> { +public class MergeCountWindow extends ProcessWindowFunction, Metrics, String, TimeWindow> { private static final Log logger = LogFactory.get(); @Override - public void process(String windowKey, Context context, Iterable> input, Collector output) throws Exception { + public void process(String windowKey, Context context, Iterable> input, Collector output) throws Exception { try { Long endTime = context.window().getEnd() / 1000; - for (Tuple2 tuple : input) { - AppProtocol data = tuple.f1; - data.setTimestamp(endTime); - output.collect(data); + for (Tuple2 tuple : input) { + Tags tags = tuple.f0; + Fields fields = tuple.f1; + Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, endTime); + output.collect(metrics); } } catch (RuntimeException e) { logger.error("An exception occurred in the process of full data aggregation! The message is:" + e.getMessage()); diff --git a/src/main/java/com/zdjizhi/utils/general/FormatConverterUtil.java b/src/main/java/com/zdjizhi/utils/general/FormatConverterUtil.java deleted file mode 100644 index 452698a..0000000 --- a/src/main/java/com/zdjizhi/utils/general/FormatConverterUtil.java +++ /dev/null @@ -1,91 +0,0 @@ -package com.zdjizhi.utils.general; - -import com.alibaba.fastjson2.JSONObject; -import com.alibaba.fastjson2.JSONWriter; -import com.zdjizhi.common.pojo.AppProtocol; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.general - * @Description: - * @date 2023/5/519:04 - */ -public class FormatConverterUtil { - - /** - * 生成tags类型数据 - * - * @param appProtocol 结果集 - * @return tags结果 - */ - public static JSONObject getTags(AppProtocol appProtocol) { - JSONObject tags = new JSONObject(); - tags.fluentPut("vsys_id", appProtocol.getVsys_id()) - .fluentPut("device_id", appProtocol.getDevice_id()) - .fluentPut("device_group", appProtocol.getDevice_group()) - .fluentPut("data_center", appProtocol.getData_center()) - .fluentPut("protocol_stack_id", appProtocol.getProtocol_stack_id()) - .fluentPut("app_name", appProtocol.getApp_name()); - - return tags; - - } - - /** - * 将数据结构转换为最终的结构 - * - * @param appProtocol 结果集 - * @return 结果数据 - */ - public static JSONObject structureConversion(AppProtocol appProtocol) { - JSONObject metrics = new JSONObject(); - JSONObject fields = new JSONObject(); - - fields.fluentPut("sessions", appProtocol.getSessions()) - .fluentPut("in_bytes", appProtocol.getIn_bytes()) - .fluentPut("out_bytes", appProtocol.getOut_bytes()) - .fluentPut("in_pkts", appProtocol.getIn_pkts()) - .fluentPut("out_pkts", appProtocol.getOut_pkts()) - .fluentPut("c2s_bytes", appProtocol.getC2s_bytes()) - .fluentPut("s2c_bytes", appProtocol.getS2c_bytes()) - .fluentPut("c2s_pkts", appProtocol.getC2s_pkts()) - .fluentPut("s2c_pkts", appProtocol.getS2c_pkts()) - .fluentPut("c2s_fragments", appProtocol.getC2s_fragments()) - .fluentPut("s2c_fragments", appProtocol.getS2c_fragments()) - .fluentPut("c2s_tcp_lost_bytes", appProtocol.getC2s_tcp_lost_bytes()) - .fluentPut("s2c_tcp_lost_bytes", appProtocol.getS2c_tcp_lost_bytes()) - .fluentPut("c2s_tcp_ooorder_pkts", appProtocol.getC2s_tcp_ooorder_pkts()) - .fluentPut("s2c_tcp_ooorder_pkts", appProtocol.getS2c_tcp_ooorder_pkts()) - .fluentPut("c2s_tcp_retransmitted_pkts", appProtocol.getC2s_tcp_retransmitted_bytes()) - .fluentPut("s2c_tcp_retransmitted_pkts", appProtocol.getS2c_tcp_retransmitted_bytes()) - .fluentPut("c2s_tcp_retransmitted_bytes", appProtocol.getC2s_tcp_retransmitted_pkts()) - .fluentPut("s2c_tcp_retransmitted_bytes", appProtocol.getS2c_tcp_retransmitted_pkts()) - .fluentPut("client_ip_sketch", appProtocol.getClient_ip_sketch()); - - metrics.put("timestamp", appProtocol.getTimestamp()); - metrics.put("name", "application_protocol_stat"); - - metrics.fluentPut("timestamp", appProtocol.getTimestamp()) - .fluentPut("name", "application_protocol_stat") - .fluentPut("fields", fields); - - return metrics; - } - - - /** - * 更新结果集tags数据(不同协议层级),并输出json - * - * @param conversion 结果集 - * @param tags tags结果 - * @return 结果json - */ - public static String updateTagsData(JSONObject conversion, JSONObject tags) { - conversion.put("tags", tags); - - return JSONObject.toJSONString(conversion - , JSONWriter.Feature.WriteNullStringAsEmpty - , JSONWriter.Feature.WriteNullNumberAsZero); - } - -} diff --git a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java index 3fa9c09..57a388f 100644 --- a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java +++ b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java @@ -1,9 +1,8 @@ package com.zdjizhi.utils.general; - import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.common.pojo.AppProtocol; +import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.utils.StringUtil; import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.Union; @@ -27,35 +26,47 @@ public class MetricUtil { * @param cacheData 缓存中数据 * @param newData 新数据 */ - public static void statisticsMetrics(AppProtocol cacheData, AppProtocol newData) { - cacheData.setSessions(MetricUtil.longSum(cacheData.getSessions(), newData.getSessions())); + public static Fields statisticsMetrics(Fields cacheData, Fields newData) { - cacheData.setIn_bytes(MetricUtil.longSum(cacheData.getIn_bytes(), newData.getIn_bytes())); - cacheData.setOut_pkts(MetricUtil.longSum(cacheData.getOut_bytes(), newData.getOut_bytes())); - cacheData.setIn_pkts(MetricUtil.longSum(cacheData.getIn_pkts(), newData.getIn_pkts())); - cacheData.setOut_pkts(MetricUtil.longSum(cacheData.getOut_pkts(), newData.getOut_pkts())); + Long sessions = MetricUtil.longSum(cacheData.getSessions(), newData.getSessions()); - cacheData.setC2s_bytes(MetricUtil.longSum(cacheData.getC2s_bytes(), newData.getC2s_bytes())); - cacheData.setS2c_bytes(MetricUtil.longSum(cacheData.getS2c_bytes(), newData.getS2c_bytes())); - cacheData.setC2s_pkts(MetricUtil.longSum(cacheData.getC2s_pkts(), newData.getC2s_pkts())); - cacheData.setS2c_pkts(MetricUtil.longSum(cacheData.getS2c_pkts(), newData.getS2c_pkts())); + Long inBytes = MetricUtil.longSum(cacheData.getIn_bytes(), newData.getIn_bytes()); + Long outBytes = MetricUtil.longSum(cacheData.getOut_bytes(), newData.getOut_bytes()); + Long inPkts = MetricUtil.longSum(cacheData.getIn_pkts(), newData.getIn_pkts()); + Long outPkts = MetricUtil.longSum(cacheData.getOut_pkts(), newData.getOut_pkts()); - cacheData.setC2s_fragments(MetricUtil.longSum(cacheData.getC2s_fragments(), newData.getC2s_fragments())); - cacheData.setS2c_fragments(MetricUtil.longSum(cacheData.getS2c_fragments(), newData.getS2c_fragments())); - cacheData.setC2s_tcp_lost_bytes(MetricUtil.longSum(cacheData.getC2s_tcp_lost_bytes(), newData.getC2s_tcp_lost_bytes())); - cacheData.setS2c_tcp_lost_bytes(MetricUtil.longSum(cacheData.getS2c_tcp_lost_bytes(), newData.getS2c_tcp_lost_bytes())); + Long c2sBytes = MetricUtil.longSum(cacheData.getC2s_bytes(), newData.getC2s_bytes()); + Long s2cBytes = MetricUtil.longSum(cacheData.getS2c_bytes(), newData.getS2c_bytes()); + Long c2sPkts = MetricUtil.longSum(cacheData.getC2s_pkts(), newData.getC2s_pkts()); + Long s2cPkts = MetricUtil.longSum(cacheData.getS2c_pkts(), newData.getS2c_pkts()); - cacheData.setC2s_tcp_ooorder_pkts(MetricUtil.longSum(cacheData.getC2s_tcp_ooorder_pkts(), newData.getC2s_tcp_ooorder_pkts())); - cacheData.setS2c_tcp_ooorder_pkts(MetricUtil.longSum(cacheData.getS2c_tcp_ooorder_pkts(), newData.getS2c_tcp_ooorder_pkts())); + Long c2sFragments = MetricUtil.longSum(cacheData.getC2s_fragments(), newData.getC2s_fragments()); + Long s2cFragments = MetricUtil.longSum(cacheData.getS2c_fragments(), newData.getS2c_fragments()); - cacheData.setC2s_tcp_retransmitted_bytes(MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes())); - cacheData.setS2c_tcp_retransmitted_bytes(MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes())); + Long c2sTcpLostBytes = MetricUtil.longSum(cacheData.getC2s_tcp_lost_bytes(), newData.getC2s_tcp_lost_bytes()); + Long s2cTcpLostBytes = MetricUtil.longSum(cacheData.getS2c_tcp_lost_bytes(), newData.getS2c_tcp_lost_bytes()); - cacheData.setC2s_tcp_retransmitted_pkts(MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_pkts(), newData.getC2s_tcp_retransmitted_pkts())); - cacheData.setS2c_tcp_retransmitted_pkts(MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_pkts(), newData.getS2c_tcp_retransmitted_pkts())); + Long c2sTcpooorderPkts = MetricUtil.longSum(cacheData.getC2s_tcp_ooorder_pkts(), newData.getC2s_tcp_ooorder_pkts()); + Long s2cTcpooorderPkts = MetricUtil.longSum(cacheData.getS2c_tcp_ooorder_pkts(), newData.getS2c_tcp_ooorder_pkts()); - cacheData.setClient_ip_sketch(MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch())); + Long c2sTcpretransmittedPkts = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_pkts(), newData.getC2s_tcp_retransmitted_pkts()); + Long s2cTcpretransmittedPkts = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_pkts(), newData.getS2c_tcp_retransmitted_pkts()); + + Long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes()); + Long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes()); + + String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch()); + + return new Fields(sessions, + inBytes, outBytes, inPkts, outPkts, + c2sPkts, s2cPkts, c2sBytes, s2cBytes, + c2sFragments, s2cFragments, + c2sTcpLostBytes, s2cTcpLostBytes, + c2sTcpooorderPkts, s2cTcpooorderPkts, + c2sTcpretransmittedPkts, s2cTcpretransmittedPkts, + c2sTcpretransmittedBytes, s2cTcpretransmittedBytes, + clientIpSketch); } /** @@ -66,9 +77,9 @@ public class MetricUtil { * @return value1 + value2 */ private static Long longSum(Long value1, Long value2) { - Long result = 0L; + Long result; try { - if (value1 > 0 && value2 > 0) { + if (value1 >= 0 && value2 >= 0) { result = value1 + value2; } else { result = value1; diff --git a/src/main/java/com/zdjizhi/common/pojo/AppProtocol.java b/src/test/java/com/zdjizhi/AppProtocol.java similarity index 76% rename from src/main/java/com/zdjizhi/common/pojo/AppProtocol.java rename to src/test/java/com/zdjizhi/AppProtocol.java index 977e821..a665b85 100644 --- a/src/main/java/com/zdjizhi/common/pojo/AppProtocol.java +++ b/src/test/java/com/zdjizhi/AppProtocol.java @@ -1,4 +1,4 @@ -package com.zdjizhi.common.pojo; +package com.zdjizhi; import com.alibaba.fastjson.annotation.JSONField; @@ -37,6 +37,36 @@ public class AppProtocol { private Long s2c_tcp_retransmitted_bytes; private String client_ip_sketch; + public AppProtocol(Long timestamp, int vsys_id, String device_id, String device_group, String data_center, String protocol_stack_id, String app_name, Long sessions, Long in_bytes, Long out_bytes, Long in_pkts, Long out_pkts, Long c2s_pkts, Long s2c_pkts, Long c2s_bytes, Long s2c_bytes, Long c2s_fragments, Long s2c_fragments, Long c2s_tcp_lost_bytes, Long s2c_tcp_lost_bytes, Long c2s_tcp_ooorder_pkts, Long s2c_tcp_ooorder_pkts, Long c2s_tcp_retransmitted_pkts, Long s2c_tcp_retransmitted_pkts, Long c2s_tcp_retransmitted_bytes, Long s2c_tcp_retransmitted_bytes, String client_ip_sketch) { + this.timestamp = timestamp; + this.vsys_id = vsys_id; + this.device_id = device_id; + this.device_group = device_group; + this.data_center = data_center; + this.protocol_stack_id = protocol_stack_id; + this.app_name = app_name; + this.sessions = sessions; + this.in_bytes = in_bytes; + this.out_bytes = out_bytes; + this.in_pkts = in_pkts; + this.out_pkts = out_pkts; + this.c2s_pkts = c2s_pkts; + this.s2c_pkts = s2c_pkts; + this.c2s_bytes = c2s_bytes; + this.s2c_bytes = s2c_bytes; + this.c2s_fragments = c2s_fragments; + this.s2c_fragments = s2c_fragments; + this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes; + this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes; + this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts; + this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts; + this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts; + this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts; + this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes; + this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes; + this.client_ip_sketch = client_ip_sketch; + } + public Long getTimestamp() { return timestamp; } diff --git a/src/test/java/com/zdjizhi/DatasketchesTest.java b/src/test/java/com/zdjizhi/DatasketchesTest.java index 24585fd..09cb291 100644 --- a/src/test/java/com/zdjizhi/DatasketchesTest.java +++ b/src/test/java/com/zdjizhi/DatasketchesTest.java @@ -4,11 +4,13 @@ import cn.hutool.json.JSONUtil; import com.alibaba.fastjson2.*; import com.zdjizhi.utils.JsonMapper; import org.apache.datasketches.hll.HllSketch; +import org.apache.datasketches.hll.TgtHllType; import org.apache.datasketches.hll.Union; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.Test; +import java.lang.instrument.Instrumentation; import java.util.*; /** @@ -161,16 +163,47 @@ public class DatasketchesTest { System.out.println("Hutool Byte(Base64):" + JSONObject.toJSONString(dataMap)); System.out.println(JSONUtil.toJsonStr(dataMap)); - // sendMessage(JSONObject.toJSONString(dataMap)); } + @Test + public void HllSketchStorageTest() { + TgtHllType hllType = TgtHllType.HLL_4; +// TgtHllType hllType = TgtHllType.HLL_6; +// TgtHllType hllType = TgtHllType.HLL_8; + + HllSketch sketch4 = new HllSketch(4,hllType); + HllSketch sketch8 = new HllSketch(8,hllType); + HllSketch sketch12 = new HllSketch(12,hllType); + HllSketch sketch16 = new HllSketch(16,hllType); + HllSketch sketch21 = new HllSketch(21,hllType); + + HashSet IPSet = new HashSet<>(); + + for (int i = 0; i < 500000; i++) { + String ip = makeIPv4Random(); + IPSet.add(ip); + sketch4.update(ip); + sketch8.update(ip); + sketch12.update(ip); + sketch16.update(ip); + sketch21.update(ip); + } + System.out.println(IPSet.size()); + System.out.println(sketch4.toString()); + System.out.println(sketch8.toString()); + System.out.println(sketch12.toString()); + System.out.println(sketch16.toString()); + System.out.println(sketch21.toString()); + + } + //随机生成ip private static String makeIPv4Random() { int v4_1 = new Random().nextInt(255) + 1; - int v4_2 = new Random().nextInt(255); - int v4_3 = new Random().nextInt(255); + int v4_2 = new Random().nextInt(100); + int v4_3 = new Random().nextInt(100); int v4_4 = new Random().nextInt(255); return v4_1 + "." + v4_2 + "." + v4_3 + "." + v4_4; } diff --git a/src/test/java/com/zdjizhi/FastJsonTest.java b/src/test/java/com/zdjizhi/FastJsonTest.java index 125a7cd..7d50fc1 100644 --- a/src/test/java/com/zdjizhi/FastJsonTest.java +++ b/src/test/java/com/zdjizhi/FastJsonTest.java @@ -1,15 +1,8 @@ package com.zdjizhi; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; -import com.alibaba.fastjson2.JSONPath; -import com.alibaba.fastjson2.JSONReader; -import com.zdjizhi.common.pojo.AppProtocol; -import com.zdjizhi.utils.StringUtil; +import com.alibaba.fastjson2.*; import org.junit.Test; -import java.util.ArrayList; - /** * @author qidaijie * @Package com.zdjizhi @@ -37,6 +30,22 @@ public class FastJsonTest { } System.out.println(JSONPath.contains(value, dataTypeExpr)); + + String message = "{\"common_schema_type\":\"HTTP\",\"common_sessions\":1,\"http_request_line\":\"GET sampleFile.html HTTP/1.1\",\"http_host\":\"www.texaslotto.com\",\"http_url\":\"www.texaslotto.com/sampleFile.html\",\"http_user_agent\":\"xPTS/2.0\",\"http_response_line\":\"HTTP/1.1 200 OK\",\"http_isn\":1953597368,\"http_proxy_flag\":0,\"http_version\":\"http1\",\"http_response_latency_ms\":1,\"http_session_duration_ms\":2,\"http_response_content_type\":\"text/html\",\"http_sequence\":80,\"common_protocol_label\":\"ETHERNET.IPv4.UDP.GTP.IPv4.TCP\",\"common_c2s_byte_diff\":17200,\"common_c2s_pkt_diff\":120,\"common_s2c_byte_diff\":16490,\"common_s2c_pkt_diff\":81,\"common_c2s_ipfrag_num\":0,\"common_s2c_ipfrag_num\":0,\"common_first_ttl\":64,\"common_c2s_tcp_unorder_num\":0,\"common_s2c_tcp_unorder_num\":0,\"common_c2s_tcp_lostlen\":0,\"common_s2c_tcp_lostlen\":0,\"common_c2s_pkt_retrans\":0,\"common_s2c_pkt_retrans\":0,\"common_c2s_byte_retrans\":0,\"common_s2c_byte_retrans\":0,\"common_flags\":24720,\"common_flags_identify_info\":\"{\\\"Server is Local\\\":1,\\\"Inbound\\\":201,\\\"C2S\\\":1,\\\"S2C\\\":2}\",\"common_direction\":73,\"common_app_full_path\":\"http\",\"common_app_label\":\"http\",\"common_tcp_client_isn\":1953597368,\"common_tcp_server_isn\":1950649408,\"common_server_ip\":\"192.50.199.25\",\"common_client_ip\":\"192.50.146.197\",\"common_server_port\":80,\"common_client_port\":22533,\"common_stream_dir\":3,\"common_address_type\":4,\"common_address_list\":\"IPv4_TCP<22533-80-192.50.146.197-192.50.199.25>|GTP<111001144-851056526>|IPv4_UDP<2152-2152-192.50.235.220-192.50.135.83>|MAC<000c299b2fa4-000c2915b4f4>\",\"common_start_time\":1680475247,\"common_end_time\":1680475247,\"common_con_duration_ms\":23,\"common_s2c_pkt_num\":81,\"common_s2c_byte_num\":16490,\"common_c2s_pkt_num\":120,\"common_c2s_byte_num\":17200,\"common_establish_latency_ms\":2,\"common_client_location\":\"日本.Unknown.Unknown\",\"common_server_location\":\"日本.Unknown.Unknown\",\"common_service_category\":[6223,6219,5093,5089],\"common_apn\":\"cmiott.owflr.mcto60g.com\",\"common_imsi\":\"460045157091460\",\"common_imei\":\"8626070583005833\",\"common_phone_number\":\"861440152028973\",\"common_tunnel_endpoint_a_desc\":\"test_50_gtp\",\"common_tunnel_endpoint_b_desc\":\"test_50_gtp\",\"common_tunnels\":[{\"tunnels_schema_type\":\"GTP\",\"gtp_a2b_teid\":111001144,\"gtp_b2a_teid\":851056526,\"gtp_endpoint_a_ip\":\"192.50.235.220\",\"gtp_endpoint_b_ip\":\"192.50.135.83\",\"gtp_endpoint_a_port\":2152,\"gtp_endpoint_b_port\":2152},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"00:0c:29:9b:2f:a4\",\"c2s_destination_mac\":\"00:0c:29:15:b4:f4\",\"s2c_source_mac\":\"00:0c:29:15:b4:f4\",\"s2c_destination_mac\":\"00:0c:29:9b:2f:a4\"}],\"common_stream_trace_id\":\"578829229323951427\",\"common_l4_protocol\":\"IPv4_TCP\",\"common_sled_ip\":\"192.168.40.161\",\"common_device_id\":\"unknown\",\"common_device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-7400\\\"},{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-7400\\\"}]}\",\"common_t_vsys_id\":1,\"common_policy_id\":0,\"common_service\":2,\"common_action\":0,\"common_vsys_id\":1}"; + JSONObject originalLog = JSON.parseObject(message); + + String value1 = originalLog.getString("common_device_tag"); + System.out.println(JSONPath.eval(value1, "$.tags[?(@.tag=='data_center')][0].value")); + System.out.println(JSONPath.of("$.tags[?(@.tag=='data_center')].value").extract(JSONReader.of(value1))); + + JSONArray jsonArray = originalLog.getJSONObject("common_device_tag").getJSONArray("tags"); + for (Object array : jsonArray) { + JSONObject object = JSON.parseObject(array.toString()); + if ("device_group".equals(object.getString("tag"))) { + System.out.println(object.getString("value")); + } + } + } @Test @@ -60,4 +69,5 @@ public class FastJsonTest { System.out.println(appProtocol.getApp_name()); System.out.println(appProtocol.getProtocol_stack_id()); } + }