修改统计时间为窗口开始时间。(GAL-371)

This commit is contained in:
qidaijie
2023-07-10 09:54:43 +08:00
parent 5445972400
commit 345b7fd601
8 changed files with 64 additions and 21 deletions

View File

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId> <groupId>com.zdjizhi</groupId>
<artifactId>app-protocol-stat-traffic-merge</artifactId> <artifactId>app-protocol-stat-traffic-merge</artifactId>
<version>230516</version> <version>230710-Time</version>
<name>app-protocol-stat-traffic-merge</name> <name>app-protocol-stat-traffic-merge</name>
<url>http://www.example.com</url> <url>http://www.example.com</url>

View File

@@ -12,10 +12,10 @@ tools.library=D:\\workerspace\\dat
#--------------------------------Kafka消费组信息------------------------------# #--------------------------------Kafka消费组信息------------------------------#
#kafka 接收数据topic #kafka 接收数据topic
source.kafka.topic=APP-PROTOCOL-STAT-TRAFFIC-AGENT source.kafka.topic=etl-test
#补全数据 输出 topic #补全数据 输出 topic
sink.kafka.topic=APP-PROTOCOL-STAT-TRAFFIC-MERGE-LOCAL sink.kafka.topic=etl-test-result
#读取topic,存储该spout id的消费offset信息可通过该拓扑命名;具体存储offset的位置确定下次读取不重复的数据 #读取topic,存储该spout id的消费offset信息可通过该拓扑命名;具体存储offset的位置确定下次读取不重复的数据
group.id=livecharts-test-20230423-1 group.id=livecharts-test-20230423-1
@@ -36,3 +36,6 @@ sink.parallelism=1
#初次随机预聚合窗口时间 #初次随机预聚合窗口时间
count.window.time=15 count.window.time=15
#数据源 firewall or agent
metrics.data.source=firewall

View File

@@ -30,6 +30,7 @@ public class GlobalConfig {
public static final Integer COUNT_WINDOW_TIME = GlobalConfigLoad.getIntProperty(0, "count.window.time"); public static final Integer COUNT_WINDOW_TIME = GlobalConfigLoad.getIntProperty(0, "count.window.time");
public static final String TOOLS_LIBRARY = GlobalConfigLoad.getStringProperty(0, "tools.library"); public static final String TOOLS_LIBRARY = GlobalConfigLoad.getStringProperty(0, "tools.library");
public static final Integer SINK_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "sink.parallelism"); public static final Integer SINK_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "sink.parallelism");
public static final String METICS_DATA_SOURCE = GlobalConfigLoad.getStringProperty(0, "metrics.data.source");
/** /**
* Kafka common * Kafka common

View File

@@ -5,6 +5,7 @@ import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONWriter; import com.alibaba.fastjson2.JSONWriter;
import com.zdjizhi.common.config.GlobalConfig; import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics; import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags; import com.zdjizhi.common.pojo.Tags;
import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.StringUtil;
@@ -51,8 +52,6 @@ public class ResultFlatMap implements FlatMapFunction<Metrics, String> {
} }
private static String getResultJson(Metrics metrics) { private static String getResultJson(Metrics metrics) {
return JSONObject.toJSONString(metrics return JSONObject.toJSONString(metrics);
, JSONWriter.Feature.WriteNullStringAsEmpty
, JSONWriter.Feature.WriteNullNumberAsZero);
} }
} }

View File

@@ -23,7 +23,7 @@ public class MergeCountWindow extends ProcessWindowFunction<Tuple2<Tags, Fields>
@Override @Override
public void process(String windowKey, Context context, Iterable<Tuple2<Tags, Fields>> input, Collector<Metrics> output) throws Exception { public void process(String windowKey, Context context, Iterable<Tuple2<Tags, Fields>> input, Collector<Metrics> output) throws Exception {
try { try {
Long endTime = context.window().getEnd() / 1000; Long endTime = context.window().getStart() / 1000;
for (Tuple2<Tags, Fields> tuple : input) { for (Tuple2<Tags, Fields> tuple : input) {
Tags tags = tuple.f0; Tags tags = tuple.f0;
Fields fields = tuple.f1; Fields fields = tuple.f1;

View File

@@ -2,6 +2,7 @@ package com.zdjizhi.utils.general;
import cn.hutool.log.Log; import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.StringUtil;
import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.HllSketch;
@@ -18,6 +19,7 @@ import java.util.Base64;
*/ */
public class MetricUtil { public class MetricUtil {
private static final Log logger = LogFactory.get(); private static final Log logger = LogFactory.get();
private static final String METRICS_DEFAULT_TYPE = "agent";
/** /**
@@ -56,17 +58,28 @@ public class MetricUtil {
Long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes()); Long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes());
Long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes()); Long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes());
String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch()); if (METRICS_DEFAULT_TYPE.equals(GlobalConfig.METICS_DATA_SOURCE)) {
String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch());
return new Fields(sessions, return new Fields(sessions,
inBytes, outBytes, inPkts, outPkts, inBytes, outBytes, inPkts, outPkts,
c2sPkts, s2cPkts, c2sBytes, s2cBytes, c2sPkts, s2cPkts, c2sBytes, s2cBytes,
c2sFragments, s2cFragments, c2sFragments, s2cFragments,
c2sTcpLostBytes, s2cTcpLostBytes, c2sTcpLostBytes, s2cTcpLostBytes,
c2sTcpooorderPkts, s2cTcpooorderPkts, c2sTcpooorderPkts, s2cTcpooorderPkts,
c2sTcpretransmittedPkts, s2cTcpretransmittedPkts, c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
c2sTcpretransmittedBytes, s2cTcpretransmittedBytes, c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
clientIpSketch); clientIpSketch);
} else {
return new Fields(sessions,
inBytes, outBytes, inPkts, outPkts,
c2sPkts, s2cPkts, c2sBytes, s2cBytes,
c2sFragments, s2cFragments,
c2sTcpLostBytes, s2cTcpLostBytes,
c2sTcpooorderPkts, s2cTcpooorderPkts,
c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
null);
}
} }
/** /**

View File

@@ -1,6 +1,8 @@
package com.zdjizhi; package com.zdjizhi;
import com.alibaba.fastjson2.*; import com.alibaba.fastjson2.*;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Tags;
import org.junit.Test; import org.junit.Test;
/** /**
@@ -33,10 +35,10 @@ public class FastJsonTest {
String message = "{\"common_schema_type\":\"HTTP\",\"common_sessions\":1,\"http_request_line\":\"GET sampleFile.html HTTP/1.1\",\"http_host\":\"www.texaslotto.com\",\"http_url\":\"www.texaslotto.com/sampleFile.html\",\"http_user_agent\":\"xPTS/2.0\",\"http_response_line\":\"HTTP/1.1 200 OK\",\"http_isn\":1953597368,\"http_proxy_flag\":0,\"http_version\":\"http1\",\"http_response_latency_ms\":1,\"http_session_duration_ms\":2,\"http_response_content_type\":\"text/html\",\"http_sequence\":80,\"common_protocol_label\":\"ETHERNET.IPv4.UDP.GTP.IPv4.TCP\",\"common_c2s_byte_diff\":17200,\"common_c2s_pkt_diff\":120,\"common_s2c_byte_diff\":16490,\"common_s2c_pkt_diff\":81,\"common_c2s_ipfrag_num\":0,\"common_s2c_ipfrag_num\":0,\"common_first_ttl\":64,\"common_c2s_tcp_unorder_num\":0,\"common_s2c_tcp_unorder_num\":0,\"common_c2s_tcp_lostlen\":0,\"common_s2c_tcp_lostlen\":0,\"common_c2s_pkt_retrans\":0,\"common_s2c_pkt_retrans\":0,\"common_c2s_byte_retrans\":0,\"common_s2c_byte_retrans\":0,\"common_flags\":24720,\"common_flags_identify_info\":\"{\\\"Server is Local\\\":1,\\\"Inbound\\\":201,\\\"C2S\\\":1,\\\"S2C\\\":2}\",\"common_direction\":73,\"common_app_full_path\":\"http\",\"common_app_label\":\"http\",\"common_tcp_client_isn\":1953597368,\"common_tcp_server_isn\":1950649408,\"common_server_ip\":\"192.50.199.25\",\"common_client_ip\":\"192.50.146.197\",\"common_server_port\":80,\"common_client_port\":22533,\"common_stream_dir\":3,\"common_address_type\":4,\"common_address_list\":\"IPv4_TCP<22533-80-192.50.146.197-192.50.199.25>|GTP<111001144-851056526>|IPv4_UDP<2152-2152-192.50.235.220-192.50.135.83>|MAC<000c299b2fa4-000c2915b4f4>\",\"common_start_time\":1680475247,\"common_end_time\":1680475247,\"common_con_duration_ms\":23,\"common_s2c_pkt_num\":81,\"common_s2c_byte_num\":16490,\"common_c2s_pkt_num\":120,\"common_c2s_byte_num\":17200,\"common_establish_latency_ms\":2,\"common_client_location\":\"日本.Unknown.Unknown\",\"common_server_location\":\"日本.Unknown.Unknown\",\"common_service_category\":[6223,6219,5093,5089],\"common_apn\":\"cmiott.owflr.mcto60g.com\",\"common_imsi\":\"460045157091460\",\"common_imei\":\"8626070583005833\",\"common_phone_number\":\"861440152028973\",\"common_tunnel_endpoint_a_desc\":\"test_50_gtp\",\"common_tunnel_endpoint_b_desc\":\"test_50_gtp\",\"common_tunnels\":[{\"tunnels_schema_type\":\"GTP\",\"gtp_a2b_teid\":111001144,\"gtp_b2a_teid\":851056526,\"gtp_endpoint_a_ip\":\"192.50.235.220\",\"gtp_endpoint_b_ip\":\"192.50.135.83\",\"gtp_endpoint_a_port\":2152,\"gtp_endpoint_b_port\":2152},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"00:0c:29:9b:2f:a4\",\"c2s_destination_mac\":\"00:0c:29:15:b4:f4\",\"s2c_source_mac\":\"00:0c:29:15:b4:f4\",\"s2c_destination_mac\":\"00:0c:29:9b:2f:a4\"}],\"common_stream_trace_id\":\"578829229323951427\",\"common_l4_protocol\":\"IPv4_TCP\",\"common_sled_ip\":\"192.168.40.161\",\"common_device_id\":\"unknown\",\"common_device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-7400\\\"},{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-7400\\\"}]}\",\"common_t_vsys_id\":1,\"common_policy_id\":0,\"common_service\":2,\"common_action\":0,\"common_vsys_id\":1}"; String message = "{\"common_schema_type\":\"HTTP\",\"common_sessions\":1,\"http_request_line\":\"GET sampleFile.html HTTP/1.1\",\"http_host\":\"www.texaslotto.com\",\"http_url\":\"www.texaslotto.com/sampleFile.html\",\"http_user_agent\":\"xPTS/2.0\",\"http_response_line\":\"HTTP/1.1 200 OK\",\"http_isn\":1953597368,\"http_proxy_flag\":0,\"http_version\":\"http1\",\"http_response_latency_ms\":1,\"http_session_duration_ms\":2,\"http_response_content_type\":\"text/html\",\"http_sequence\":80,\"common_protocol_label\":\"ETHERNET.IPv4.UDP.GTP.IPv4.TCP\",\"common_c2s_byte_diff\":17200,\"common_c2s_pkt_diff\":120,\"common_s2c_byte_diff\":16490,\"common_s2c_pkt_diff\":81,\"common_c2s_ipfrag_num\":0,\"common_s2c_ipfrag_num\":0,\"common_first_ttl\":64,\"common_c2s_tcp_unorder_num\":0,\"common_s2c_tcp_unorder_num\":0,\"common_c2s_tcp_lostlen\":0,\"common_s2c_tcp_lostlen\":0,\"common_c2s_pkt_retrans\":0,\"common_s2c_pkt_retrans\":0,\"common_c2s_byte_retrans\":0,\"common_s2c_byte_retrans\":0,\"common_flags\":24720,\"common_flags_identify_info\":\"{\\\"Server is Local\\\":1,\\\"Inbound\\\":201,\\\"C2S\\\":1,\\\"S2C\\\":2}\",\"common_direction\":73,\"common_app_full_path\":\"http\",\"common_app_label\":\"http\",\"common_tcp_client_isn\":1953597368,\"common_tcp_server_isn\":1950649408,\"common_server_ip\":\"192.50.199.25\",\"common_client_ip\":\"192.50.146.197\",\"common_server_port\":80,\"common_client_port\":22533,\"common_stream_dir\":3,\"common_address_type\":4,\"common_address_list\":\"IPv4_TCP<22533-80-192.50.146.197-192.50.199.25>|GTP<111001144-851056526>|IPv4_UDP<2152-2152-192.50.235.220-192.50.135.83>|MAC<000c299b2fa4-000c2915b4f4>\",\"common_start_time\":1680475247,\"common_end_time\":1680475247,\"common_con_duration_ms\":23,\"common_s2c_pkt_num\":81,\"common_s2c_byte_num\":16490,\"common_c2s_pkt_num\":120,\"common_c2s_byte_num\":17200,\"common_establish_latency_ms\":2,\"common_client_location\":\"日本.Unknown.Unknown\",\"common_server_location\":\"日本.Unknown.Unknown\",\"common_service_category\":[6223,6219,5093,5089],\"common_apn\":\"cmiott.owflr.mcto60g.com\",\"common_imsi\":\"460045157091460\",\"common_imei\":\"8626070583005833\",\"common_phone_number\":\"861440152028973\",\"common_tunnel_endpoint_a_desc\":\"test_50_gtp\",\"common_tunnel_endpoint_b_desc\":\"test_50_gtp\",\"common_tunnels\":[{\"tunnels_schema_type\":\"GTP\",\"gtp_a2b_teid\":111001144,\"gtp_b2a_teid\":851056526,\"gtp_endpoint_a_ip\":\"192.50.235.220\",\"gtp_endpoint_b_ip\":\"192.50.135.83\",\"gtp_endpoint_a_port\":2152,\"gtp_endpoint_b_port\":2152},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"00:0c:29:9b:2f:a4\",\"c2s_destination_mac\":\"00:0c:29:15:b4:f4\",\"s2c_source_mac\":\"00:0c:29:15:b4:f4\",\"s2c_destination_mac\":\"00:0c:29:9b:2f:a4\"}],\"common_stream_trace_id\":\"578829229323951427\",\"common_l4_protocol\":\"IPv4_TCP\",\"common_sled_ip\":\"192.168.40.161\",\"common_device_id\":\"unknown\",\"common_device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-7400\\\"},{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-7400\\\"}]}\",\"common_t_vsys_id\":1,\"common_policy_id\":0,\"common_service\":2,\"common_action\":0,\"common_vsys_id\":1}";
JSONObject originalLog = JSON.parseObject(message); JSONObject originalLog = JSON.parseObject(message);
String common_device_tag = originalLog.getString("common_device_tag");
String value1 = originalLog.getString("common_device_tag"); System.out.println(JSONPath.eval(common_device_tag, "$.tags[?(@.tag=='data_center')][0].value"));
System.out.println(JSONPath.eval(value1, "$.tags[?(@.tag=='data_center')][0].value")); System.out.println(JSONPath.of("$.tags[?(@.tag=='data_center')].value").extract(JSONReader.of(common_device_tag)));
System.out.println(JSONPath.of("$.tags[?(@.tag=='data_center')].value").extract(JSONReader.of(value1)));
JSONArray jsonArray = originalLog.getJSONObject("common_device_tag").getJSONArray("tags"); JSONArray jsonArray = originalLog.getJSONObject("common_device_tag").getJSONArray("tags");
for (Object array : jsonArray) { for (Object array : jsonArray) {
@@ -46,6 +48,10 @@ public class FastJsonTest {
} }
} }
JSONPath jsonPath = JSONPath.of("$.tags[?(@.tag=='data_center')][0].value");
JSONReader jsonReader = JSONReader.of(common_device_tag);
System.out.println("a");
} }
@Test @Test
@@ -70,4 +76,16 @@ public class FastJsonTest {
System.out.println(appProtocol.getProtocol_stack_id()); System.out.println(appProtocol.getProtocol_stack_id());
} }
@Test
public void errorJsonTest(){
String message = "{\"fields\":{\"c2s_bytes\":2292,\"c2s_fragments\":0,\"c2s_pkts\":13,\"c2s_tcp_lost_bytes\":0,\"c2s_tcp_ooorder_pkts\":0,\"c2s_tcp_retransmitted_bytes\":0,\"c2s_tcp_retransmitted_pkts\":0,\"ytes\":2292,\"out_pkts\":13,\"s2c_bytes\":4695,\"s2c_fragments\":0,\"s2c_pkts\":12,\"s2c_tcp_lost_bytes\":0,\"s2c_tcp_ooorder_pkts\":0,\"s2c_tcp_retransmitted_bytes\":0,\"s2c_tcp_retransmitraffic_application_protocol_stat\",\"tags\":{\"app_full_path\":\"ssl.https.qq_web.wecom\",\"app_name\":\"app_metric\",\"data_center\":\"center-xxg-7400\",\"device_group\":\"group-xxg-7400\",dc-161\",\"protocol_label\":\"ETHERNET.IPv4.TCP\",\"table_name\":\"traffic_application_protocol_stat\",\"vsys_id\":\"1\"},\"timestamp\":1688708725000}";
JSONObject originalLog = JSON.parseObject(message);
Fields fields = JSONObject.parseObject(originalLog.getString("fields"), Fields.class);
Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class);
System.out.println(fields.toString());
System.out.println(tags.toString());
}
} }

View File

@@ -36,6 +36,7 @@ public class FlagsTest {
Long clientIsLocal = 8L; Long clientIsLocal = 8L;
Long serverIsLocal = 16L; Long serverIsLocal = 16L;
System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal)); System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal));
System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal)+"\n\n"); System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal)+"\n\n");
@@ -44,5 +45,13 @@ public class FlagsTest {
System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal)); System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal));
System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal)); System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal));
if ((0L & clientIsLocal) == 0L){
System.out.println("yes");
}else {
System.out.println("no");
}
} }
} }