From 9e2d7350ea3516698510439eead1d5e7c047d73a Mon Sep 17 00:00:00 2001 From: qidaijie Date: Thu, 14 Dec 2023 10:55:03 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9Fields=E5=90=84=E5=AD=97?= =?UTF-8?q?=E6=AE=B5=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...nfig.properties => application.properties} | 2 +- .../zdjizhi/common/config/MergeConfigs.java | 1 + .../java/com/zdjizhi/common/pojo/Fields.java | 117 +++++++++--------- .../functions/filter/DataTypeFilter.java | 36 ------ .../com/zdjizhi/utils/general/MetricUtil.java | 20 +-- src/test/java/com/zdjizhi/FlagsTest.java | 24 ++-- 6 files changed, 82 insertions(+), 118 deletions(-) rename properties/{service_flow_config.properties => application.properties} (94%) delete mode 100644 src/main/java/com/zdjizhi/utils/functions/filter/DataTypeFilter.java diff --git a/properties/service_flow_config.properties b/properties/application.properties similarity index 94% rename from properties/service_flow_config.properties rename to properties/application.properties index 82a236c..0f66f4e 100644 --- a/properties/service_flow_config.properties +++ b/properties/application.properties @@ -1,5 +1,5 @@ #kafka 接收数据topic -source.kafka.topic=NETWORK-TRAFFIC-METRIC +source.kafka.topic=NETWORK-TRAFFIC-METRIC-TEST source.kafka.props.bootstrap.servers=192.168.44.12:9094 diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java index 738537c..8929f6c 100644 --- a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java +++ b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java @@ -69,4 +69,5 @@ public class MergeConfigs { .stringType() .defaultValue("application_protocol_stat") .withDescription("The data identification."); + } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/pojo/Fields.java b/src/main/java/com/zdjizhi/common/pojo/Fields.java index baa5b25..40b3995 100644 --- a/src/main/java/com/zdjizhi/common/pojo/Fields.java +++ b/src/main/java/com/zdjizhi/common/pojo/Fields.java @@ -7,28 +7,28 @@ package com.zdjizhi.common.pojo; * @date 2023/4/2311:47 */ public class Fields { - private Long sessions; - private Long in_bytes; - private Long out_bytes; - private Long in_pkts; - private Long out_pkts; - private Long c2s_pkts; - private Long s2c_pkts; - private Long c2s_bytes; - private Long s2c_bytes; - private Long c2s_fragments; - private Long s2c_fragments; - private Long c2s_tcp_lost_bytes; - private Long s2c_tcp_lost_bytes; - private Long c2s_tcp_ooorder_pkts; - private Long s2c_tcp_ooorder_pkts; - private Long c2s_tcp_retransmitted_pkts; - private Long s2c_tcp_retransmitted_pkts; - private Long c2s_tcp_retransmitted_bytes; - private Long s2c_tcp_retransmitted_bytes; + private long sessions; + private long in_bytes; + private long out_bytes; + private long in_pkts; + private long out_pkts; + private long c2s_pkts; + private long s2c_pkts; + private long c2s_bytes; + private long s2c_bytes; + private long c2s_fragments; + private long s2c_fragments; + private long c2s_tcp_lost_bytes; + private long s2c_tcp_lost_bytes; + private long c2s_tcp_ooorder_pkts; + private long s2c_tcp_ooorder_pkts; + private long c2s_tcp_retransmitted_pkts; + private long s2c_tcp_retransmitted_pkts; + private long c2s_tcp_retransmitted_bytes; + private long s2c_tcp_retransmitted_bytes; private String client_ip_sketch; - public Fields(Long sessions, Long in_bytes, Long out_bytes, Long in_pkts, Long out_pkts, Long c2s_pkts, Long s2c_pkts, Long c2s_bytes, Long s2c_bytes, Long c2s_fragments, Long s2c_fragments, Long c2s_tcp_lost_bytes, Long s2c_tcp_lost_bytes, Long c2s_tcp_ooorder_pkts, Long s2c_tcp_ooorder_pkts, Long c2s_tcp_retransmitted_pkts, Long s2c_tcp_retransmitted_pkts, Long c2s_tcp_retransmitted_bytes, Long s2c_tcp_retransmitted_bytes, String client_ip_sketch) { + public Fields(long sessions, long in_bytes, long out_bytes, long in_pkts, long out_pkts, long c2s_pkts, long s2c_pkts, long c2s_bytes, long s2c_bytes, long c2s_fragments, long s2c_fragments, long c2s_tcp_lost_bytes, long s2c_tcp_lost_bytes, long c2s_tcp_ooorder_pkts, long s2c_tcp_ooorder_pkts, long c2s_tcp_retransmitted_pkts, long s2c_tcp_retransmitted_pkts, long c2s_tcp_retransmitted_bytes, long s2c_tcp_retransmitted_bytes, String client_ip_sketch) { this.sessions = sessions; this.in_bytes = in_bytes; this.out_bytes = out_bytes; @@ -51,155 +51,155 @@ public class Fields { this.client_ip_sketch = client_ip_sketch; } - public Long getSessions() { + public long getSessions() { return sessions; } - public void setSessions(Long sessions) { + public void setSessions(long sessions) { this.sessions = sessions; } - public Long getIn_bytes() { + public long getIn_bytes() { return in_bytes; } - public void setIn_bytes(Long in_bytes) { + public void setIn_bytes(long in_bytes) { this.in_bytes = in_bytes; } - public Long getOut_bytes() { + public long getOut_bytes() { return out_bytes; } - public void setOut_bytes(Long out_bytes) { + public void setOut_bytes(long out_bytes) { this.out_bytes = out_bytes; } - public Long getIn_pkts() { + public long getIn_pkts() { return in_pkts; } - public void setIn_pkts(Long in_pkts) { + public void setIn_pkts(long in_pkts) { this.in_pkts = in_pkts; } - public Long getOut_pkts() { + public long getOut_pkts() { return out_pkts; } - public void setOut_pkts(Long out_pkts) { + public void setOut_pkts(long out_pkts) { this.out_pkts = out_pkts; } - public Long getC2s_pkts() { + public long getC2s_pkts() { return c2s_pkts; } - public void setC2s_pkts(Long c2s_pkts) { + public void setC2s_pkts(long c2s_pkts) { this.c2s_pkts = c2s_pkts; } - public Long getS2c_pkts() { + public long getS2c_pkts() { return s2c_pkts; } - public void setS2c_pkts(Long s2c_pkts) { + public void setS2c_pkts(long s2c_pkts) { this.s2c_pkts = s2c_pkts; } - public Long getC2s_bytes() { + public long getC2s_bytes() { return c2s_bytes; } - public void setC2s_bytes(Long c2s_bytes) { + public void setC2s_bytes(long c2s_bytes) { this.c2s_bytes = c2s_bytes; } - public Long getS2c_bytes() { + public long getS2c_bytes() { return s2c_bytes; } - public void setS2c_bytes(Long s2c_bytes) { + public void setS2c_bytes(long s2c_bytes) { this.s2c_bytes = s2c_bytes; } - public Long getC2s_fragments() { + public long getC2s_fragments() { return c2s_fragments; } - public void setC2s_fragments(Long c2s_fragments) { + public void setC2s_fragments(long c2s_fragments) { this.c2s_fragments = c2s_fragments; } - public Long getS2c_fragments() { + public long getS2c_fragments() { return s2c_fragments; } - public void setS2c_fragments(Long s2c_fragments) { + public void setS2c_fragments(long s2c_fragments) { this.s2c_fragments = s2c_fragments; } - public Long getC2s_tcp_lost_bytes() { + public long getC2s_tcp_lost_bytes() { return c2s_tcp_lost_bytes; } - public void setC2s_tcp_lost_bytes(Long c2s_tcp_lost_bytes) { + public void setC2s_tcp_lost_bytes(long c2s_tcp_lost_bytes) { this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes; } - public Long getS2c_tcp_lost_bytes() { + public long getS2c_tcp_lost_bytes() { return s2c_tcp_lost_bytes; } - public void setS2c_tcp_lost_bytes(Long s2c_tcp_lost_bytes) { + public void setS2c_tcp_lost_bytes(long s2c_tcp_lost_bytes) { this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes; } - public Long getC2s_tcp_ooorder_pkts() { + public long getC2s_tcp_ooorder_pkts() { return c2s_tcp_ooorder_pkts; } - public void setC2s_tcp_ooorder_pkts(Long c2s_tcp_ooorder_pkts) { + public void setC2s_tcp_ooorder_pkts(long c2s_tcp_ooorder_pkts) { this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts; } - public Long getS2c_tcp_ooorder_pkts() { + public long getS2c_tcp_ooorder_pkts() { return s2c_tcp_ooorder_pkts; } - public void setS2c_tcp_ooorder_pkts(Long s2c_tcp_ooorder_pkts) { + public void setS2c_tcp_ooorder_pkts(long s2c_tcp_ooorder_pkts) { this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts; } - public Long getC2s_tcp_retransmitted_pkts() { + public long getC2s_tcp_retransmitted_pkts() { return c2s_tcp_retransmitted_pkts; } - public void setC2s_tcp_retransmitted_pkts(Long c2s_tcp_retransmitted_pkts) { + public void setC2s_tcp_retransmitted_pkts(long c2s_tcp_retransmitted_pkts) { this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts; } - public Long getS2c_tcp_retransmitted_pkts() { + public long getS2c_tcp_retransmitted_pkts() { return s2c_tcp_retransmitted_pkts; } - public void setS2c_tcp_retransmitted_pkts(Long s2c_tcp_retransmitted_pkts) { + public void setS2c_tcp_retransmitted_pkts(long s2c_tcp_retransmitted_pkts) { this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts; } - public Long getC2s_tcp_retransmitted_bytes() { + public long getC2s_tcp_retransmitted_bytes() { return c2s_tcp_retransmitted_bytes; } - public void setC2s_tcp_retransmitted_bytes(Long c2s_tcp_retransmitted_bytes) { + public void setC2s_tcp_retransmitted_bytes(long c2s_tcp_retransmitted_bytes) { this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes; } - public Long getS2c_tcp_retransmitted_bytes() { + public long getS2c_tcp_retransmitted_bytes() { return s2c_tcp_retransmitted_bytes; } - public void setS2c_tcp_retransmitted_bytes(Long s2c_tcp_retransmitted_bytes) { + public void setS2c_tcp_retransmitted_bytes(long s2c_tcp_retransmitted_bytes) { this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes; } @@ -210,4 +210,5 @@ public class Fields { public void setClient_ip_sketch(String client_ip_sketch) { this.client_ip_sketch = client_ip_sketch; } + } diff --git a/src/main/java/com/zdjizhi/utils/functions/filter/DataTypeFilter.java b/src/main/java/com/zdjizhi/utils/functions/filter/DataTypeFilter.java deleted file mode 100644 index cbf9572..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/filter/DataTypeFilter.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.zdjizhi.utils.functions.filter; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.alibaba.fastjson2.JSONPath; -import com.alibaba.fastjson2.JSONReader; -import com.zdjizhi.utils.StringUtil; -import org.apache.flink.api.common.functions.FilterFunction; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions.filter - * @Description: - * @date 2023/4/1919:02 - */ -public class DataTypeFilter implements FilterFunction { - private static final Log logger = LogFactory.get(); - - private static final String dataTypeExpr = "[?(@.name = 'traffic_application_protocol_stat')]"; - - @Override - public boolean filter(String message) throws Exception { - boolean protocolData = false; - try { - if (StringUtil.isNotBlank(message)) { - Object name = JSONPath.eval(message, dataTypeExpr); - if (name != null) { - protocolData = true; - } - } - } catch (RuntimeException e) { - logger.error("Parsing metric data is abnormal! The exception message is:" + e.getMessage()); - } - return protocolData; - } -} diff --git a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java index cc8b32c..eeabd72 100644 --- a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java +++ b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java @@ -55,6 +55,7 @@ public class MetricUtil { Long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes()); Long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes()); + // String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch()); // return new Fields(sessions, // inBytes, outBytes, inPkts, outPkts, @@ -80,21 +81,22 @@ public class MetricUtil { /** * Long类型的数据求和 * - * @param value1 第一个值 - * @param value2 第二个值 - * @return value1 + value2 + * @param cacheData 缓存中的值 + * @param newData 新来数据的值 + * @return cacheData + newData */ - private static Long longSum(Long value1, Long value2) { + private static Long longSum(Long cacheData, Long newData) { + Long result; try { - if (value1 >= 0 && value2 >= 0) { - result = value1 + value2; + if (cacheData >= 0 && newData >= 0) { + result = cacheData + newData; } else { - result = value1; + result = cacheData; } } catch (RuntimeException e) { - logger.error("Abnormal sending of traffic indicator statistics! The message is:" + e.getMessage()); - result = value1; + logger.error("Abnormal sending of traffic indicator statistics! The message is:{}" , e); + result = cacheData; } return result; diff --git a/src/test/java/com/zdjizhi/FlagsTest.java b/src/test/java/com/zdjizhi/FlagsTest.java index ce59840..8449f52 100644 --- a/src/test/java/com/zdjizhi/FlagsTest.java +++ b/src/test/java/com/zdjizhi/FlagsTest.java @@ -32,29 +32,25 @@ public class FlagsTest { @Test public void bitwiseAND() { - Long common_flags = 8200L; + Long flags = 24712L; Long clientIsLocal = 8L; Long serverIsLocal = 16L; - System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal)); - System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal)+"\n\n"); + System.out.println("flags & clientIsLocal = " + (flags & clientIsLocal)); + System.out.println("flags & serverIsLocal = " + (flags & serverIsLocal)+"\n\n"); - common_flags = 16400L; + flags = 16400L; - System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal)); - System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal)+"\n\n"); + System.out.println("flags & clientIsLocal = " + (flags & clientIsLocal)); + System.out.println("flags & serverIsLocal = " + (flags & serverIsLocal)+"\n\n"); - common_flags = 1062135466L; + flags = 24712L; - System.out.println("common_flags & clientIsLocal = " + (common_flags & 128)); - System.out.println("common_flags & serverIsLocal = " + (common_flags & 256)+"\n\n"); + System.out.println("flags & c2s = " + (flags & 8192)); + System.out.println("flags & s2c = " + (flags & 16384)); + System.out.println("flags & Bidirectional = " + (flags & 32768)); - if ((0L & clientIsLocal) == 0L){ - System.out.println("yes"); - }else { - System.out.println("no"); - } } }