diff --git a/pom.xml b/pom.xml
index f3e7a70..e76331f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.zdjizhi
app-protocol-stat-traffic-merge
- 2.0.0
+ 2.0.1
app-protocol-stat-traffic-merge
http://www.example.com
@@ -206,12 +206,6 @@
test
-
- org.apache.datasketches
- datasketches-java
- ${datasketches.version}
-
-
com.alibaba
fastjson
diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
index 8929f6c..8cf604a 100644
--- a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
+++ b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
@@ -70,4 +70,10 @@ public class MergeConfigs {
.defaultValue("application_protocol_stat")
.withDescription("The data identification.");
+
+ public static final ConfigOption JOB_NAME =
+ ConfigOptions.key("job.name")
+ .stringType()
+ .defaultValue("agg_app_protocol_traffic")
+ .withDescription("The flink job name.");
}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/pojo/Fields.java b/src/main/java/com/zdjizhi/common/pojo/Fields.java
index 40b3995..0e32a56 100644
--- a/src/main/java/com/zdjizhi/common/pojo/Fields.java
+++ b/src/main/java/com/zdjizhi/common/pojo/Fields.java
@@ -26,9 +26,8 @@ public class Fields {
private long s2c_tcp_retransmitted_pkts;
private long c2s_tcp_retransmitted_bytes;
private long s2c_tcp_retransmitted_bytes;
- private String client_ip_sketch;
- public Fields(long sessions, long in_bytes, long out_bytes, long in_pkts, long out_pkts, long c2s_pkts, long s2c_pkts, long c2s_bytes, long s2c_bytes, long c2s_fragments, long s2c_fragments, long c2s_tcp_lost_bytes, long s2c_tcp_lost_bytes, long c2s_tcp_ooorder_pkts, long s2c_tcp_ooorder_pkts, long c2s_tcp_retransmitted_pkts, long s2c_tcp_retransmitted_pkts, long c2s_tcp_retransmitted_bytes, long s2c_tcp_retransmitted_bytes, String client_ip_sketch) {
+ public Fields(long sessions, long in_bytes, long out_bytes, long in_pkts, long out_pkts, long c2s_pkts, long s2c_pkts, long c2s_bytes, long s2c_bytes, long c2s_fragments, long s2c_fragments, long c2s_tcp_lost_bytes, long s2c_tcp_lost_bytes, long c2s_tcp_ooorder_pkts, long s2c_tcp_ooorder_pkts, long c2s_tcp_retransmitted_pkts, long s2c_tcp_retransmitted_pkts, long c2s_tcp_retransmitted_bytes, long s2c_tcp_retransmitted_bytes) {
this.sessions = sessions;
this.in_bytes = in_bytes;
this.out_bytes = out_bytes;
@@ -48,7 +47,6 @@ public class Fields {
this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts;
this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes;
this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
- this.client_ip_sketch = client_ip_sketch;
}
public long getSessions() {
@@ -203,12 +201,4 @@ public class Fields {
this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
}
- public String getClient_ip_sketch() {
- return client_ip_sketch;
- }
-
- public void setClient_ip_sketch(String client_ip_sketch) {
- this.client_ip_sketch = client_ip_sketch;
- }
-
}
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
index 116c45a..d1a60c9 100644
--- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
@@ -85,7 +85,7 @@ public class ApplicationProtocolTopology {
config.get(SINK_KAFKA_TOPIC),
config.get(LOG_FAILURES_ONLY)));
- environment.execute("APP-PROTOCOL-STAT-TRAFFIC-MERGE");
+ environment.execute(config.get(JOB_NAME));
} catch (Exception e) {
logger.error("This Flink task start ERROR! Exception information is :");
e.printStackTrace();
diff --git a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
index eeabd72..dc3c0c6 100644
--- a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
+++ b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
@@ -3,11 +3,6 @@ package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.pojo.Fields;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.datasketches.hll.HllSketch;
-import org.apache.datasketches.hll.Union;
-
-import java.util.Base64;
/**
@@ -27,45 +22,33 @@ public class MetricUtil {
*/
public static Fields statisticsMetrics(Fields cacheData, Fields newData) {
- Long sessions = MetricUtil.longSum(cacheData.getSessions(), newData.getSessions());
+ long sessions = MetricUtil.longSum(cacheData.getSessions(), newData.getSessions());
- Long inBytes = MetricUtil.longSum(cacheData.getIn_bytes(), newData.getIn_bytes());
- Long outBytes = MetricUtil.longSum(cacheData.getOut_bytes(), newData.getOut_bytes());
- Long inPkts = MetricUtil.longSum(cacheData.getIn_pkts(), newData.getIn_pkts());
- Long outPkts = MetricUtil.longSum(cacheData.getOut_pkts(), newData.getOut_pkts());
+ long inBytes = MetricUtil.longSum(cacheData.getIn_bytes(), newData.getIn_bytes());
+ long outBytes = MetricUtil.longSum(cacheData.getOut_bytes(), newData.getOut_bytes());
+ long inPkts = MetricUtil.longSum(cacheData.getIn_pkts(), newData.getIn_pkts());
+ long outPkts = MetricUtil.longSum(cacheData.getOut_pkts(), newData.getOut_pkts());
- Long c2sBytes = MetricUtil.longSum(cacheData.getC2s_bytes(), newData.getC2s_bytes());
- Long s2cBytes = MetricUtil.longSum(cacheData.getS2c_bytes(), newData.getS2c_bytes());
- Long c2sPkts = MetricUtil.longSum(cacheData.getC2s_pkts(), newData.getC2s_pkts());
- Long s2cPkts = MetricUtil.longSum(cacheData.getS2c_pkts(), newData.getS2c_pkts());
+ long c2sBytes = MetricUtil.longSum(cacheData.getC2s_bytes(), newData.getC2s_bytes());
+ long s2cBytes = MetricUtil.longSum(cacheData.getS2c_bytes(), newData.getS2c_bytes());
+ long c2sPkts = MetricUtil.longSum(cacheData.getC2s_pkts(), newData.getC2s_pkts());
+ long s2cPkts = MetricUtil.longSum(cacheData.getS2c_pkts(), newData.getS2c_pkts());
- Long c2sFragments = MetricUtil.longSum(cacheData.getC2s_fragments(), newData.getC2s_fragments());
- Long s2cFragments = MetricUtil.longSum(cacheData.getS2c_fragments(), newData.getS2c_fragments());
+ long c2sFragments = MetricUtil.longSum(cacheData.getC2s_fragments(), newData.getC2s_fragments());
+ long s2cFragments = MetricUtil.longSum(cacheData.getS2c_fragments(), newData.getS2c_fragments());
- Long c2sTcpLostBytes = MetricUtil.longSum(cacheData.getC2s_tcp_lost_bytes(), newData.getC2s_tcp_lost_bytes());
- Long s2cTcpLostBytes = MetricUtil.longSum(cacheData.getS2c_tcp_lost_bytes(), newData.getS2c_tcp_lost_bytes());
+ long c2sTcpLostBytes = MetricUtil.longSum(cacheData.getC2s_tcp_lost_bytes(), newData.getC2s_tcp_lost_bytes());
+ long s2cTcpLostBytes = MetricUtil.longSum(cacheData.getS2c_tcp_lost_bytes(), newData.getS2c_tcp_lost_bytes());
- Long c2sTcpooorderPkts = MetricUtil.longSum(cacheData.getC2s_tcp_ooorder_pkts(), newData.getC2s_tcp_ooorder_pkts());
- Long s2cTcpooorderPkts = MetricUtil.longSum(cacheData.getS2c_tcp_ooorder_pkts(), newData.getS2c_tcp_ooorder_pkts());
+ long c2sTcpooorderPkts = MetricUtil.longSum(cacheData.getC2s_tcp_ooorder_pkts(), newData.getC2s_tcp_ooorder_pkts());
+ long s2cTcpooorderPkts = MetricUtil.longSum(cacheData.getS2c_tcp_ooorder_pkts(), newData.getS2c_tcp_ooorder_pkts());
- Long c2sTcpretransmittedPkts = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_pkts(), newData.getC2s_tcp_retransmitted_pkts());
- Long s2cTcpretransmittedPkts = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_pkts(), newData.getS2c_tcp_retransmitted_pkts());
+ long c2sTcpretransmittedPkts = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_pkts(), newData.getC2s_tcp_retransmitted_pkts());
+ long s2cTcpretransmittedPkts = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_pkts(), newData.getS2c_tcp_retransmitted_pkts());
- Long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes());
- Long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes());
-
-
-// String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch());
-// return new Fields(sessions,
-// inBytes, outBytes, inPkts, outPkts,
-// c2sPkts, s2cPkts, c2sBytes, s2cBytes,
-// c2sFragments, s2cFragments,
-// c2sTcpLostBytes, s2cTcpLostBytes,
-// c2sTcpooorderPkts, s2cTcpooorderPkts,
-// c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
-// c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
-// clientIpSketch);
+ long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes());
+ long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes());
return new Fields(sessions,
inBytes, outBytes, inPkts, outPkts,
@@ -74,20 +57,19 @@ public class MetricUtil {
c2sTcpLostBytes, s2cTcpLostBytes,
c2sTcpooorderPkts, s2cTcpooorderPkts,
c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
- c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
- null);
+ c2sTcpretransmittedBytes, s2cTcpretransmittedBytes);
}
/**
* Long类型的数据求和
*
* @param cacheData 缓存中的值
- * @param newData 新来数据的值
+ * @param newData 新来数据的值
* @return cacheData + newData
*/
- private static Long longSum(Long cacheData, Long newData) {
+ private static long longSum(long cacheData, long newData) {
- Long result;
+ long result;
try {
if (cacheData >= 0 && newData >= 0) {
result = cacheData + newData;
@@ -95,38 +77,11 @@ public class MetricUtil {
result = cacheData;
}
} catch (RuntimeException e) {
- logger.error("Abnormal sending of traffic indicator statistics! The message is:{}" , e);
+ logger.error("Abnormal sending of traffic indicator statistics! The message is:{}", e);
result = cacheData;
}
return result;
}
- /**
- * @param cacheHll 缓存的sketch
- * @param newHll 聚合后的sketch
- * @return 合并后的sketch
- */
- private static String hllSketchUnion(String cacheHll, String newHll) {
- Union union = new Union(12);
- try {
- if (StringUtil.isNotBlank(cacheHll)) {
- byte[] cacheHllBytes = Base64.getDecoder().decode(cacheHll);
- HllSketch cacheSketch = HllSketch.heapify(cacheHllBytes);
- union.update(cacheSketch);
- }
-
- if (StringUtil.isNotBlank(newHll)) {
- byte[] newHllBytes = Base64.getDecoder().decode(newHll);
- HllSketch newSketch = HllSketch.heapify(newHllBytes);
- union.update(newSketch);
- }
- return Base64.getEncoder().encodeToString(union.getResult().toUpdatableByteArray());
-
- } catch (RuntimeException e) {
- logger.error("Merge hllSketch results abnormal! The message is:" + e.getMessage());
- return null;
- }
- }
-
}
diff --git a/src/test/java/com/zdjizhi/DatasketchesTest.java b/src/test/java/com/zdjizhi/DatasketchesTest.java
deleted file mode 100644
index 09cb291..0000000
--- a/src/test/java/com/zdjizhi/DatasketchesTest.java
+++ /dev/null
@@ -1,281 +0,0 @@
-package com.zdjizhi;
-
-import cn.hutool.json.JSONUtil;
-import com.alibaba.fastjson2.*;
-import com.zdjizhi.utils.JsonMapper;
-import org.apache.datasketches.hll.HllSketch;
-import org.apache.datasketches.hll.TgtHllType;
-import org.apache.datasketches.hll.Union;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.Test;
-
-import java.lang.instrument.Instrumentation;
-import java.util.*;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi
- * @Description:
- * @date 2023/3/217:17
- */
-public class DatasketchesTest {
-
- @Test
- public void HllSketchTest() {
- HashSet strings = new HashSet<>();
-
- HllSketch sketch = new HllSketch(12);
-
- for (int i = 0; i < 50; i++) {
- String ip = "192.168.1." + i;
- sketch.update(ip);
- strings.add(ip);
- }
-
- System.out.println(sketch.getEstimate() + "--" + strings.size());
-
- HashSet randomStrings = new HashSet<>();
-
- HllSketch randomSketch = new HllSketch(12);
-
- for (int i = 0; i < 50; i++) {
- String ip = makeIPv4Random();
- randomSketch.update(ip);
- randomStrings.add(ip);
- }
-
- System.out.println(randomSketch.getEstimate() + "--" + randomStrings.size());
- }
-
- @Test
- public void HllSketchUnionTest() {
- HashSet strings = new HashSet<>();
-
- HllSketch sketch = new HllSketch(12);
-
- for (int i = 0; i < 50; i++) {
- String ip = "192.168.1." + i;
- sketch.update(ip);
- strings.add(ip);
- }
-
- HllSketch sketch2 = new HllSketch(12);
-
- for (int i = 0; i < 10; i++) {
- String ip = "192.168.2." + i;
- sketch2.update(ip);
- strings.add(ip);
- }
-
- Union union = new Union(12);
-
- union.update(sketch);
- union.update(sketch2);
- HllSketch sketch_result = HllSketch.heapify(union.getResult().toCompactByteArray());
-
- System.out.println(sketch.getEstimate() + "--" + strings.size());
- System.out.println(sketch2.getEstimate() + "--" + strings.size());
- System.out.println(sketch_result.getEstimate() + "--" + strings.size());
- }
-
- @Test
- public void HllSketchDruidTest() {
- HashMap dataMap = new HashMap<>();
-
- HashSet strings = new HashSet<>();
-
- HllSketch sketch = new HllSketch(12);
-
- for (int i = 0; i < 50; i++) {
- String ip = "192.168.1." + i;
- sketch.update(ip);
- strings.add(ip);
- }
-
- HllSketch sketch2 = new HllSketch(12);
-
- for (int i = 0; i < 10; i++) {
- String ip = "192.168.2." + i;
- sketch2.update(ip);
- strings.add(ip);
- }
-
- Union union = new Union(12);
-
- union.update(sketch);
- union.update(sketch2);
- HllSketch sketch_result1 = HllSketch.heapify(union.getResult().toCompactByteArray());
-
- HllSketch sketch3 = new HllSketch(12);
-
- for (int i = 0; i < 10; i++) {
- String ip = "192.168.3." + i;
- sketch3.update(ip);
- strings.add(ip);
- }
-
- Union union2 = new Union(12);
-
- union2.update(sketch_result1);
- union2.update(sketch3);
- HllSketch sketch_result2 = HllSketch.heapify(union2.getResult().toCompactByteArray());
-
- System.out.println(sketch.getEstimate() + "--" + strings.size());
- System.out.println(sketch2.getEstimate() + "--" + strings.size());
- System.out.println(sketch3.getEstimate() + "--" + strings.size());
- System.out.println(sketch_result1.getEstimate() + "--" + strings.size());
- System.out.println(sketch_result2.getEstimate() + "--" + strings.size());
-
- Result result = new Result();
- result.setC2s_pkt_num(10);
- result.setS2c_pkt_num(10);
- result.setC2s_byte_num(10);
- result.setS2c_byte_num(10);
- result.setStat_time(1679970031);
- result.setSchema_type("HLLSketchMergeTest");
-
- //CompactByte
- result.setIp_object(sketch_result2.toCompactByteArray());
-// System.out.println(result.toString());
- //sendMessage(JsonMapper.toJsonString(result);
-
-
- //UpdatableByte
- result.setIp_object(sketch_result2.toUpdatableByteArray());
-// System.out.println(result.toString());
- //sendMessage(JsonMapper.toJsonString(result);
-
- //Hashmap
- dataMap.put("app_name", "TEST");
- dataMap.put("protocol_stack_id", "HTTP");
- dataMap.put("vsys_id", 1);
- dataMap.put("stat_time", 1681370100);
- dataMap.put("client_ip_sketch", sketch_result2.toUpdatableByteArray());
-
- System.out.println("Jackson:" + JsonMapper.toJsonString(dataMap));
- System.out.println("FastJson2:" + JSONObject.toJSONString(dataMap));
- System.out.println("Hutool:" + JSONUtil.toJsonStr(dataMap) + "\n\n");
-
-
- dataMap.put("client_ip_sketch", Base64.getEncoder().encode(sketch_result2.toUpdatableByteArray()));
- System.out.println("FastJson2 Byte(Base64):" + JSONObject.toJSONString(dataMap));
- System.out.println("Hutool Byte(Base64):" + JSONObject.toJSONString(dataMap));
- System.out.println(JSONUtil.toJsonStr(dataMap));
-
-// sendMessage(JSONObject.toJSONString(dataMap));
- }
-
- @Test
- public void HllSketchStorageTest() {
- TgtHllType hllType = TgtHllType.HLL_4;
-// TgtHllType hllType = TgtHllType.HLL_6;
-// TgtHllType hllType = TgtHllType.HLL_8;
-
- HllSketch sketch4 = new HllSketch(4,hllType);
- HllSketch sketch8 = new HllSketch(8,hllType);
- HllSketch sketch12 = new HllSketch(12,hllType);
- HllSketch sketch16 = new HllSketch(16,hllType);
- HllSketch sketch21 = new HllSketch(21,hllType);
-
- HashSet IPSet = new HashSet<>();
-
- for (int i = 0; i < 500000; i++) {
- String ip = makeIPv4Random();
- IPSet.add(ip);
- sketch4.update(ip);
- sketch8.update(ip);
- sketch12.update(ip);
- sketch16.update(ip);
- sketch21.update(ip);
- }
- System.out.println(IPSet.size());
- System.out.println(sketch4.toString());
- System.out.println(sketch8.toString());
- System.out.println(sketch12.toString());
- System.out.println(sketch16.toString());
- System.out.println(sketch21.toString());
-
- }
-
-
- //随机生成ip
- private static String makeIPv4Random() {
- int v4_1 = new Random().nextInt(255) + 1;
- int v4_2 = new Random().nextInt(100);
- int v4_3 = new Random().nextInt(100);
- int v4_4 = new Random().nextInt(255);
- return v4_1 + "." + v4_2 + "." + v4_3 + "." + v4_4;
- }
-
- private static void sendMessage(Object message) {
- Properties props = new Properties();
- //kafka地址
- props.put("bootstrap.servers", "192.168.44.12:9092");
- props.put("acks", "all");
- props.put("retries", 0);
- props.put("linger.ms", 1);
- props.put("buffer.memory", 67108864);
-// props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-// props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- KafkaProducer kafkaProducer = new KafkaProducer(props);
-
- kafkaProducer.send(new ProducerRecord("TRAFFIC-PROTOCOL-TEST", message));
-
- kafkaProducer.close();
- }
-}
-
-class Result {
-
- private String schema_type;
- private long c2s_byte_num;
- private long c2s_pkt_num;
- private long s2c_byte_num;
- private long s2c_pkt_num;
- private long stat_time;
- private byte[] ip_object;
-
- public void setSchema_type(String schema_type) {
- this.schema_type = schema_type;
- }
-
- public void setC2s_byte_num(long c2s_byte_num) {
- this.c2s_byte_num = c2s_byte_num;
- }
-
- public void setC2s_pkt_num(long c2s_pkt_num) {
- this.c2s_pkt_num = c2s_pkt_num;
- }
-
- public void setS2c_byte_num(long s2c_byte_num) {
- this.s2c_byte_num = s2c_byte_num;
- }
-
- public void setS2c_pkt_num(long s2c_pkt_num) {
- this.s2c_pkt_num = s2c_pkt_num;
- }
-
- public void setStat_time(long stat_time) {
- this.stat_time = stat_time;
- }
-
- public void setIp_object(byte[] ip_object) {
- this.ip_object = ip_object;
- }
-
- @Override
- public String toString() {
- return "Result{" +
- "schema_type='" + schema_type + '\'' +
- ", c2s_byte_num=" + c2s_byte_num +
- ", c2s_pkt_num=" + c2s_pkt_num +
- ", s2c_byte_num=" + s2c_byte_num +
- ", s2c_pkt_num=" + s2c_pkt_num +
- ", stat_time=" + stat_time +
- ", ip_object=" + Arrays.toString(ip_object) +
- '}';
- }
-}
\ No newline at end of file