diff --git a/pom.xml b/pom.xml index 8ca46cd..b8e161b 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi app-protocol-stat-traffic-merge - 230516 + 230710-Time app-protocol-stat-traffic-merge http://www.example.com diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index e588696..c979817 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -12,10 +12,10 @@ tools.library=D:\\workerspace\\dat #--------------------------------Kafka消费组信息------------------------------# #kafka 接收数据topic -source.kafka.topic=APP-PROTOCOL-STAT-TRAFFIC-AGENT +source.kafka.topic=etl-test #补全数据 输出 topic -sink.kafka.topic=APP-PROTOCOL-STAT-TRAFFIC-MERGE-LOCAL +sink.kafka.topic=etl-test-result #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; group.id=livecharts-test-20230423-1 @@ -36,3 +36,6 @@ sink.parallelism=1 #初次随机预聚合窗口时间 count.window.time=15 +#数据源 firewall or agent +metrics.data.source=firewall + diff --git a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java b/src/main/java/com/zdjizhi/common/config/GlobalConfig.java index 8795a7e..84008df 100644 --- a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java +++ b/src/main/java/com/zdjizhi/common/config/GlobalConfig.java @@ -30,6 +30,7 @@ public class GlobalConfig { public static final Integer COUNT_WINDOW_TIME = GlobalConfigLoad.getIntProperty(0, "count.window.time"); public static final String TOOLS_LIBRARY = GlobalConfigLoad.getStringProperty(0, "tools.library"); public static final Integer SINK_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "sink.parallelism"); + public static final String METICS_DATA_SOURCE = GlobalConfigLoad.getStringProperty(0, "metrics.data.source"); /** * Kafka common diff --git a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java index 47b668c..22f2f51 100644 --- a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java +++ b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java @@ -5,6 +5,7 @@ import cn.hutool.log.LogFactory; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONWriter; import com.zdjizhi.common.config.GlobalConfig; +import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.common.pojo.Metrics; import com.zdjizhi.common.pojo.Tags; import com.zdjizhi.utils.StringUtil; @@ -51,8 +52,6 @@ public class ResultFlatMap implements FlatMapFunction { } private static String getResultJson(Metrics metrics) { - return JSONObject.toJSONString(metrics - , JSONWriter.Feature.WriteNullStringAsEmpty - , JSONWriter.Feature.WriteNullNumberAsZero); + return JSONObject.toJSONString(metrics); } } diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java index b0d50fb..e3179a7 100644 --- a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java +++ b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java @@ -23,7 +23,7 @@ public class MergeCountWindow extends ProcessWindowFunction @Override public void process(String windowKey, Context context, Iterable> input, Collector output) throws Exception { try { - Long endTime = context.window().getEnd() / 1000; + Long endTime = context.window().getStart() / 1000; for (Tuple2 tuple : input) { Tags tags = tuple.f0; Fields fields = tuple.f1; diff --git a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java index 57a388f..6a48bcf 100644 --- a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java +++ b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java @@ -2,6 +2,7 @@ package com.zdjizhi.utils.general; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.zdjizhi.common.config.GlobalConfig; import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.utils.StringUtil; import org.apache.datasketches.hll.HllSketch; @@ -18,6 +19,7 @@ import java.util.Base64; */ public class MetricUtil { private static final Log logger = LogFactory.get(); + private static final String METRICS_DEFAULT_TYPE = "agent"; /** @@ -56,17 +58,28 @@ public class MetricUtil { Long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes()); Long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes()); - String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch()); - - return new Fields(sessions, - inBytes, outBytes, inPkts, outPkts, - c2sPkts, s2cPkts, c2sBytes, s2cBytes, - c2sFragments, s2cFragments, - c2sTcpLostBytes, s2cTcpLostBytes, - c2sTcpooorderPkts, s2cTcpooorderPkts, - c2sTcpretransmittedPkts, s2cTcpretransmittedPkts, - c2sTcpretransmittedBytes, s2cTcpretransmittedBytes, - clientIpSketch); + if (METRICS_DEFAULT_TYPE.equals(GlobalConfig.METICS_DATA_SOURCE)) { + String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch()); + return new Fields(sessions, + inBytes, outBytes, inPkts, outPkts, + c2sPkts, s2cPkts, c2sBytes, s2cBytes, + c2sFragments, s2cFragments, + c2sTcpLostBytes, s2cTcpLostBytes, + c2sTcpooorderPkts, s2cTcpooorderPkts, + c2sTcpretransmittedPkts, s2cTcpretransmittedPkts, + c2sTcpretransmittedBytes, s2cTcpretransmittedBytes, + clientIpSketch); + } else { + return new Fields(sessions, + inBytes, outBytes, inPkts, outPkts, + c2sPkts, s2cPkts, c2sBytes, s2cBytes, + c2sFragments, s2cFragments, + c2sTcpLostBytes, s2cTcpLostBytes, + c2sTcpooorderPkts, s2cTcpooorderPkts, + c2sTcpretransmittedPkts, s2cTcpretransmittedPkts, + c2sTcpretransmittedBytes, s2cTcpretransmittedBytes, + null); + } } /** diff --git a/src/test/java/com/zdjizhi/FastJsonTest.java b/src/test/java/com/zdjizhi/FastJsonTest.java index 7d50fc1..5fd854a 100644 --- a/src/test/java/com/zdjizhi/FastJsonTest.java +++ b/src/test/java/com/zdjizhi/FastJsonTest.java @@ -1,6 +1,8 @@ package com.zdjizhi; import com.alibaba.fastjson2.*; +import com.zdjizhi.common.pojo.Fields; +import com.zdjizhi.common.pojo.Tags; import org.junit.Test; /** @@ -33,10 +35,10 @@ public class FastJsonTest { String message = "{\"common_schema_type\":\"HTTP\",\"common_sessions\":1,\"http_request_line\":\"GET sampleFile.html HTTP/1.1\",\"http_host\":\"www.texaslotto.com\",\"http_url\":\"www.texaslotto.com/sampleFile.html\",\"http_user_agent\":\"xPTS/2.0\",\"http_response_line\":\"HTTP/1.1 200 OK\",\"http_isn\":1953597368,\"http_proxy_flag\":0,\"http_version\":\"http1\",\"http_response_latency_ms\":1,\"http_session_duration_ms\":2,\"http_response_content_type\":\"text/html\",\"http_sequence\":80,\"common_protocol_label\":\"ETHERNET.IPv4.UDP.GTP.IPv4.TCP\",\"common_c2s_byte_diff\":17200,\"common_c2s_pkt_diff\":120,\"common_s2c_byte_diff\":16490,\"common_s2c_pkt_diff\":81,\"common_c2s_ipfrag_num\":0,\"common_s2c_ipfrag_num\":0,\"common_first_ttl\":64,\"common_c2s_tcp_unorder_num\":0,\"common_s2c_tcp_unorder_num\":0,\"common_c2s_tcp_lostlen\":0,\"common_s2c_tcp_lostlen\":0,\"common_c2s_pkt_retrans\":0,\"common_s2c_pkt_retrans\":0,\"common_c2s_byte_retrans\":0,\"common_s2c_byte_retrans\":0,\"common_flags\":24720,\"common_flags_identify_info\":\"{\\\"Server is Local\\\":1,\\\"Inbound\\\":201,\\\"C2S\\\":1,\\\"S2C\\\":2}\",\"common_direction\":73,\"common_app_full_path\":\"http\",\"common_app_label\":\"http\",\"common_tcp_client_isn\":1953597368,\"common_tcp_server_isn\":1950649408,\"common_server_ip\":\"192.50.199.25\",\"common_client_ip\":\"192.50.146.197\",\"common_server_port\":80,\"common_client_port\":22533,\"common_stream_dir\":3,\"common_address_type\":4,\"common_address_list\":\"IPv4_TCP<22533-80-192.50.146.197-192.50.199.25>|GTP<111001144-851056526>|IPv4_UDP<2152-2152-192.50.235.220-192.50.135.83>|MAC<000c299b2fa4-000c2915b4f4>\",\"common_start_time\":1680475247,\"common_end_time\":1680475247,\"common_con_duration_ms\":23,\"common_s2c_pkt_num\":81,\"common_s2c_byte_num\":16490,\"common_c2s_pkt_num\":120,\"common_c2s_byte_num\":17200,\"common_establish_latency_ms\":2,\"common_client_location\":\"日本.Unknown.Unknown\",\"common_server_location\":\"日本.Unknown.Unknown\",\"common_service_category\":[6223,6219,5093,5089],\"common_apn\":\"cmiott.owflr.mcto60g.com\",\"common_imsi\":\"460045157091460\",\"common_imei\":\"8626070583005833\",\"common_phone_number\":\"861440152028973\",\"common_tunnel_endpoint_a_desc\":\"test_50_gtp\",\"common_tunnel_endpoint_b_desc\":\"test_50_gtp\",\"common_tunnels\":[{\"tunnels_schema_type\":\"GTP\",\"gtp_a2b_teid\":111001144,\"gtp_b2a_teid\":851056526,\"gtp_endpoint_a_ip\":\"192.50.235.220\",\"gtp_endpoint_b_ip\":\"192.50.135.83\",\"gtp_endpoint_a_port\":2152,\"gtp_endpoint_b_port\":2152},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"00:0c:29:9b:2f:a4\",\"c2s_destination_mac\":\"00:0c:29:15:b4:f4\",\"s2c_source_mac\":\"00:0c:29:15:b4:f4\",\"s2c_destination_mac\":\"00:0c:29:9b:2f:a4\"}],\"common_stream_trace_id\":\"578829229323951427\",\"common_l4_protocol\":\"IPv4_TCP\",\"common_sled_ip\":\"192.168.40.161\",\"common_device_id\":\"unknown\",\"common_device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-7400\\\"},{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-7400\\\"}]}\",\"common_t_vsys_id\":1,\"common_policy_id\":0,\"common_service\":2,\"common_action\":0,\"common_vsys_id\":1}"; JSONObject originalLog = JSON.parseObject(message); + String common_device_tag = originalLog.getString("common_device_tag"); - String value1 = originalLog.getString("common_device_tag"); - System.out.println(JSONPath.eval(value1, "$.tags[?(@.tag=='data_center')][0].value")); - System.out.println(JSONPath.of("$.tags[?(@.tag=='data_center')].value").extract(JSONReader.of(value1))); + System.out.println(JSONPath.eval(common_device_tag, "$.tags[?(@.tag=='data_center')][0].value")); + System.out.println(JSONPath.of("$.tags[?(@.tag=='data_center')].value").extract(JSONReader.of(common_device_tag))); JSONArray jsonArray = originalLog.getJSONObject("common_device_tag").getJSONArray("tags"); for (Object array : jsonArray) { @@ -46,6 +48,10 @@ public class FastJsonTest { } } + JSONPath jsonPath = JSONPath.of("$.tags[?(@.tag=='data_center')][0].value"); + JSONReader jsonReader = JSONReader.of(common_device_tag); + + System.out.println("a"); } @Test @@ -70,4 +76,16 @@ public class FastJsonTest { System.out.println(appProtocol.getProtocol_stack_id()); } + + + @Test + public void errorJsonTest(){ + String message = "{\"fields\":{\"c2s_bytes\":2292,\"c2s_fragments\":0,\"c2s_pkts\":13,\"c2s_tcp_lost_bytes\":0,\"c2s_tcp_ooorder_pkts\":0,\"c2s_tcp_retransmitted_bytes\":0,\"c2s_tcp_retransmitted_pkts\":0,\"ytes\":2292,\"out_pkts\":13,\"s2c_bytes\":4695,\"s2c_fragments\":0,\"s2c_pkts\":12,\"s2c_tcp_lost_bytes\":0,\"s2c_tcp_ooorder_pkts\":0,\"s2c_tcp_retransmitted_bytes\":0,\"s2c_tcp_retransmitraffic_application_protocol_stat\",\"tags\":{\"app_full_path\":\"ssl.https.qq_web.wecom\",\"app_name\":\"app_metric\",\"data_center\":\"center-xxg-7400\",\"device_group\":\"group-xxg-7400\",dc-161\",\"protocol_label\":\"ETHERNET.IPv4.TCP\",\"table_name\":\"traffic_application_protocol_stat\",\"vsys_id\":\"1\"},\"timestamp\":1688708725000}"; + JSONObject originalLog = JSON.parseObject(message); + Fields fields = JSONObject.parseObject(originalLog.getString("fields"), Fields.class); + Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class); + + System.out.println(fields.toString()); + System.out.println(tags.toString()); + } } diff --git a/src/test/java/com/zdjizhi/FlagsTest.java b/src/test/java/com/zdjizhi/FlagsTest.java index 8310ee2..15ee4aa 100644 --- a/src/test/java/com/zdjizhi/FlagsTest.java +++ b/src/test/java/com/zdjizhi/FlagsTest.java @@ -36,6 +36,7 @@ public class FlagsTest { Long clientIsLocal = 8L; Long serverIsLocal = 16L; + System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal)); System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal)+"\n\n"); @@ -44,5 +45,13 @@ public class FlagsTest { System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal)); System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal)); + + + if ((0L & clientIsLocal) == 0L){ + System.out.println("yes"); + }else { + System.out.println("no"); + } } + }