Merge branch 'feature/2.3' into 'develop'
Feature/2.3 See merge request galaxy/tsg_olap/app-protocol-stat-traffic-merge!10
This commit is contained in:
2
pom.xml
2
pom.xml
@@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
<groupId>com.zdjizhi</groupId>
|
<groupId>com.zdjizhi</groupId>
|
||||||
<artifactId>app-protocol-stat-traffic-merge</artifactId>
|
<artifactId>app-protocol-stat-traffic-merge</artifactId>
|
||||||
<version>2.2.2</version>
|
<version>2.3.0</version>
|
||||||
|
|
||||||
<name>app-protocol-stat-traffic-merge</name>
|
<name>app-protocol-stat-traffic-merge</name>
|
||||||
<url>http://www.example.com</url>
|
<url>http://www.example.com</url>
|
||||||
|
|||||||
251
src/main/java/com/zdjizhi/common/pojo/Data.java
Normal file
251
src/main/java/com/zdjizhi/common/pojo/Data.java
Normal file
@@ -0,0 +1,251 @@
|
|||||||
|
package com.zdjizhi.common.pojo;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class Data implements Serializable {
|
||||||
|
public long timestamp_ms;
|
||||||
|
public String name;
|
||||||
|
|
||||||
|
public int vsys_id;
|
||||||
|
public String device_id;
|
||||||
|
public String device_group;
|
||||||
|
public String data_center;
|
||||||
|
public String decoded_path;
|
||||||
|
public String app;
|
||||||
|
|
||||||
|
public long sessions;
|
||||||
|
public long in_bytes;
|
||||||
|
public long out_bytes;
|
||||||
|
public long in_pkts;
|
||||||
|
public long out_pkts;
|
||||||
|
public long c2s_pkts;
|
||||||
|
public long s2c_pkts;
|
||||||
|
public long c2s_bytes;
|
||||||
|
public long s2c_bytes;
|
||||||
|
public long c2s_fragments;
|
||||||
|
public long s2c_fragments;
|
||||||
|
public long c2s_tcp_lost_bytes;
|
||||||
|
public long s2c_tcp_lost_bytes;
|
||||||
|
public long c2s_tcp_ooorder_pkts;
|
||||||
|
public long s2c_tcp_ooorder_pkts;
|
||||||
|
public long c2s_tcp_retransmitted_pkts;
|
||||||
|
public long s2c_tcp_retransmitted_pkts;
|
||||||
|
public long c2s_tcp_retransmitted_bytes;
|
||||||
|
public long s2c_tcp_retransmitted_bytes;
|
||||||
|
|
||||||
|
public long getTimestamp_ms() {
|
||||||
|
return timestamp_ms;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTimestamp_ms(long timestamp_ms) {
|
||||||
|
this.timestamp_ms = timestamp_ms;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setName(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getVsys_id() {
|
||||||
|
return vsys_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setVsys_id(int vsys_id) {
|
||||||
|
this.vsys_id = vsys_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDevice_id() {
|
||||||
|
return device_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDevice_id(String device_id) {
|
||||||
|
this.device_id = device_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDevice_group() {
|
||||||
|
return device_group;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDevice_group(String device_group) {
|
||||||
|
this.device_group = device_group;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getData_center() {
|
||||||
|
return data_center;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setData_center(String data_center) {
|
||||||
|
this.data_center = data_center;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDecoded_path() {
|
||||||
|
return decoded_path;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDecoded_path(String decoded_path) {
|
||||||
|
this.decoded_path = decoded_path;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getApp() {
|
||||||
|
return app;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setApp(String app) {
|
||||||
|
this.app = app;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getSessions() {
|
||||||
|
return sessions;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSessions(long sessions) {
|
||||||
|
this.sessions = sessions;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getIn_bytes() {
|
||||||
|
return in_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIn_bytes(long in_bytes) {
|
||||||
|
this.in_bytes = in_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getOut_bytes() {
|
||||||
|
return out_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOut_bytes(long out_bytes) {
|
||||||
|
this.out_bytes = out_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getIn_pkts() {
|
||||||
|
return in_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIn_pkts(long in_pkts) {
|
||||||
|
this.in_pkts = in_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getOut_pkts() {
|
||||||
|
return out_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOut_pkts(long out_pkts) {
|
||||||
|
this.out_pkts = out_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getC2s_pkts() {
|
||||||
|
return c2s_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setC2s_pkts(long c2s_pkts) {
|
||||||
|
this.c2s_pkts = c2s_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getS2c_pkts() {
|
||||||
|
return s2c_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setS2c_pkts(long s2c_pkts) {
|
||||||
|
this.s2c_pkts = s2c_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getC2s_bytes() {
|
||||||
|
return c2s_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setC2s_bytes(long c2s_bytes) {
|
||||||
|
this.c2s_bytes = c2s_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getS2c_bytes() {
|
||||||
|
return s2c_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setS2c_bytes(long s2c_bytes) {
|
||||||
|
this.s2c_bytes = s2c_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getC2s_fragments() {
|
||||||
|
return c2s_fragments;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setC2s_fragments(long c2s_fragments) {
|
||||||
|
this.c2s_fragments = c2s_fragments;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getS2c_fragments() {
|
||||||
|
return s2c_fragments;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setS2c_fragments(long s2c_fragments) {
|
||||||
|
this.s2c_fragments = s2c_fragments;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getC2s_tcp_lost_bytes() {
|
||||||
|
return c2s_tcp_lost_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setC2s_tcp_lost_bytes(long c2s_tcp_lost_bytes) {
|
||||||
|
this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getS2c_tcp_lost_bytes() {
|
||||||
|
return s2c_tcp_lost_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setS2c_tcp_lost_bytes(long s2c_tcp_lost_bytes) {
|
||||||
|
this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getC2s_tcp_ooorder_pkts() {
|
||||||
|
return c2s_tcp_ooorder_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setC2s_tcp_ooorder_pkts(long c2s_tcp_ooorder_pkts) {
|
||||||
|
this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getS2c_tcp_ooorder_pkts() {
|
||||||
|
return s2c_tcp_ooorder_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setS2c_tcp_ooorder_pkts(long s2c_tcp_ooorder_pkts) {
|
||||||
|
this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getC2s_tcp_retransmitted_pkts() {
|
||||||
|
return c2s_tcp_retransmitted_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setC2s_tcp_retransmitted_pkts(long c2s_tcp_retransmitted_pkts) {
|
||||||
|
this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getS2c_tcp_retransmitted_pkts() {
|
||||||
|
return s2c_tcp_retransmitted_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setS2c_tcp_retransmitted_pkts(long s2c_tcp_retransmitted_pkts) {
|
||||||
|
this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getC2s_tcp_retransmitted_bytes() {
|
||||||
|
return c2s_tcp_retransmitted_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setC2s_tcp_retransmitted_bytes(long c2s_tcp_retransmitted_bytes) {
|
||||||
|
this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getS2c_tcp_retransmitted_bytes() {
|
||||||
|
return s2c_tcp_retransmitted_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setS2c_tcp_retransmitted_bytes(long s2c_tcp_retransmitted_bytes) {
|
||||||
|
this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
251
src/main/java/com/zdjizhi/common/pojo/ResultData.java
Normal file
251
src/main/java/com/zdjizhi/common/pojo/ResultData.java
Normal file
@@ -0,0 +1,251 @@
|
|||||||
|
package com.zdjizhi.common.pojo;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class ResultData implements Serializable {
|
||||||
|
public long timestamp_ms;
|
||||||
|
public String name;
|
||||||
|
|
||||||
|
public int vsys_id;
|
||||||
|
public String device_id;
|
||||||
|
public String device_group;
|
||||||
|
public String data_center;
|
||||||
|
public String protocol_stack_id;
|
||||||
|
public String app_name;
|
||||||
|
|
||||||
|
public long sessions;
|
||||||
|
public long in_bytes;
|
||||||
|
public long out_bytes;
|
||||||
|
public long in_pkts;
|
||||||
|
public long out_pkts;
|
||||||
|
public long c2s_pkts;
|
||||||
|
public long s2c_pkts;
|
||||||
|
public long c2s_bytes;
|
||||||
|
public long s2c_bytes;
|
||||||
|
public long c2s_fragments;
|
||||||
|
public long s2c_fragments;
|
||||||
|
public long c2s_tcp_lost_bytes;
|
||||||
|
public long s2c_tcp_lost_bytes;
|
||||||
|
public long c2s_tcp_ooorder_pkts;
|
||||||
|
public long s2c_tcp_ooorder_pkts;
|
||||||
|
public long c2s_tcp_retransmitted_pkts;
|
||||||
|
public long s2c_tcp_retransmitted_pkts;
|
||||||
|
public long c2s_tcp_retransmitted_bytes;
|
||||||
|
public long s2c_tcp_retransmitted_bytes;
|
||||||
|
|
||||||
|
public long getTimestamp_ms() {
|
||||||
|
return timestamp_ms;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTimestamp_ms(long timestamp_ms) {
|
||||||
|
this.timestamp_ms = timestamp_ms;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setName(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getVsys_id() {
|
||||||
|
return vsys_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setVsys_id(int vsys_id) {
|
||||||
|
this.vsys_id = vsys_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDevice_id() {
|
||||||
|
return device_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDevice_id(String device_id) {
|
||||||
|
this.device_id = device_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDevice_group() {
|
||||||
|
return device_group;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDevice_group(String device_group) {
|
||||||
|
this.device_group = device_group;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getData_center() {
|
||||||
|
return data_center;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setData_center(String data_center) {
|
||||||
|
this.data_center = data_center;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getProtocol_stack_id() {
|
||||||
|
return protocol_stack_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setProtocol_stack_id(String protocol_stack_id) {
|
||||||
|
this.protocol_stack_id = protocol_stack_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getApp_name() {
|
||||||
|
return app_name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setApp_name(String app_name) {
|
||||||
|
this.app_name = app_name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getSessions() {
|
||||||
|
return sessions;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSessions(long sessions) {
|
||||||
|
this.sessions = sessions;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getIn_bytes() {
|
||||||
|
return in_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIn_bytes(long in_bytes) {
|
||||||
|
this.in_bytes = in_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getOut_bytes() {
|
||||||
|
return out_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOut_bytes(long out_bytes) {
|
||||||
|
this.out_bytes = out_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getIn_pkts() {
|
||||||
|
return in_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIn_pkts(long in_pkts) {
|
||||||
|
this.in_pkts = in_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getOut_pkts() {
|
||||||
|
return out_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOut_pkts(long out_pkts) {
|
||||||
|
this.out_pkts = out_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getC2s_pkts() {
|
||||||
|
return c2s_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setC2s_pkts(long c2s_pkts) {
|
||||||
|
this.c2s_pkts = c2s_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getS2c_pkts() {
|
||||||
|
return s2c_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setS2c_pkts(long s2c_pkts) {
|
||||||
|
this.s2c_pkts = s2c_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getC2s_bytes() {
|
||||||
|
return c2s_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setC2s_bytes(long c2s_bytes) {
|
||||||
|
this.c2s_bytes = c2s_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getS2c_bytes() {
|
||||||
|
return s2c_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setS2c_bytes(long s2c_bytes) {
|
||||||
|
this.s2c_bytes = s2c_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getC2s_fragments() {
|
||||||
|
return c2s_fragments;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setC2s_fragments(long c2s_fragments) {
|
||||||
|
this.c2s_fragments = c2s_fragments;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getS2c_fragments() {
|
||||||
|
return s2c_fragments;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setS2c_fragments(long s2c_fragments) {
|
||||||
|
this.s2c_fragments = s2c_fragments;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getC2s_tcp_lost_bytes() {
|
||||||
|
return c2s_tcp_lost_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setC2s_tcp_lost_bytes(long c2s_tcp_lost_bytes) {
|
||||||
|
this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getS2c_tcp_lost_bytes() {
|
||||||
|
return s2c_tcp_lost_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setS2c_tcp_lost_bytes(long s2c_tcp_lost_bytes) {
|
||||||
|
this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getC2s_tcp_ooorder_pkts() {
|
||||||
|
return c2s_tcp_ooorder_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setC2s_tcp_ooorder_pkts(long c2s_tcp_ooorder_pkts) {
|
||||||
|
this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getS2c_tcp_ooorder_pkts() {
|
||||||
|
return s2c_tcp_ooorder_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setS2c_tcp_ooorder_pkts(long s2c_tcp_ooorder_pkts) {
|
||||||
|
this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getC2s_tcp_retransmitted_pkts() {
|
||||||
|
return c2s_tcp_retransmitted_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setC2s_tcp_retransmitted_pkts(long c2s_tcp_retransmitted_pkts) {
|
||||||
|
this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getS2c_tcp_retransmitted_pkts() {
|
||||||
|
return s2c_tcp_retransmitted_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setS2c_tcp_retransmitted_pkts(long s2c_tcp_retransmitted_pkts) {
|
||||||
|
this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getC2s_tcp_retransmitted_bytes() {
|
||||||
|
return c2s_tcp_retransmitted_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setC2s_tcp_retransmitted_bytes(long c2s_tcp_retransmitted_bytes) {
|
||||||
|
this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getS2c_tcp_retransmitted_bytes() {
|
||||||
|
return s2c_tcp_retransmitted_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setS2c_tcp_retransmitted_bytes(long s2c_tcp_retransmitted_bytes) {
|
||||||
|
this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,95 @@
|
|||||||
|
package com.zdjizhi.topology;
|
||||||
|
|
||||||
|
import cn.hutool.log.Log;
|
||||||
|
import cn.hutool.log.LogFactory;
|
||||||
|
import com.zdjizhi.common.config.MergeConfigs;
|
||||||
|
import com.zdjizhi.common.config.MergeConfiguration;
|
||||||
|
import com.zdjizhi.common.pojo.Fields;
|
||||||
|
import com.zdjizhi.common.pojo.Metrics;
|
||||||
|
import com.zdjizhi.common.pojo.Tags;
|
||||||
|
import com.zdjizhi.utils.functions.keyby.DimensionKeyBy;
|
||||||
|
import com.zdjizhi.utils.functions.map.ResultFlatMap;
|
||||||
|
import com.zdjizhi.utils.functions.process.ParsingData;
|
||||||
|
import com.zdjizhi.utils.functions.statistics.DispersionCountWindow;
|
||||||
|
import com.zdjizhi.utils.functions.statistics.MergeCountWindow;
|
||||||
|
import com.zdjizhi.utils.kafka.KafkaConsumer;
|
||||||
|
import com.zdjizhi.utils.kafka.KafkaProducer;
|
||||||
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
|
||||||
|
import org.apache.flink.api.java.tuple.Tuple3;
|
||||||
|
import org.apache.flink.api.java.utils.ParameterTool;
|
||||||
|
import org.apache.flink.configuration.Configuration;
|
||||||
|
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||||
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
||||||
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||||
|
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
|
||||||
|
import org.apache.flink.streaming.api.windowing.time.Time;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
|
||||||
|
import static com.zdjizhi.common.config.MergeConfigs.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author qidaijie
|
||||||
|
* @Package com.zdjizhi.topology
|
||||||
|
* @Description:
|
||||||
|
* @date 2021/5/2016:42
|
||||||
|
*/
|
||||||
|
public class ApplicationProtocolLegacyTopology {
|
||||||
|
private static final Log logger = LogFactory.get();
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
try {
|
||||||
|
|
||||||
|
// param check
|
||||||
|
if (args.length < 1) {
|
||||||
|
throw new IllegalArgumentException("Error: Not found properties path. " +
|
||||||
|
"\nUsage: flink -c xxx xxx.jar app.properties.");
|
||||||
|
}
|
||||||
|
|
||||||
|
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||||
|
|
||||||
|
ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
|
||||||
|
final Configuration config = tool.getConfiguration();
|
||||||
|
environment.getConfig().setGlobalJobParameters(config);
|
||||||
|
final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
|
||||||
|
|
||||||
|
//水印
|
||||||
|
WatermarkStrategy<Tuple3<Tags, Fields, Long>> strategyForSession = WatermarkStrategy
|
||||||
|
.<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
|
||||||
|
.withTimestampAssigner((element, timestamp) -> element.f2);
|
||||||
|
|
||||||
|
//数据源
|
||||||
|
DataStream<String> streamSource = environment.addSource(
|
||||||
|
KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
|
||||||
|
config.get(SOURCE_KAFKA_TOPIC),
|
||||||
|
config.get(STARTUP_MODE)));
|
||||||
|
|
||||||
|
//解析数据
|
||||||
|
SingleOutputStreamOperator<Tuple3<Tags, Fields, Long>> parseDataProcess = streamSource.process(new ParsingData())
|
||||||
|
.assignTimestampsAndWatermarks(strategyForSession)
|
||||||
|
.name("ParseDataProcess");
|
||||||
|
|
||||||
|
//增量聚合窗口
|
||||||
|
SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy())
|
||||||
|
.window(TumblingEventTimeWindows.of(Time.seconds(config.get(MergeConfigs.COUNT_WINDOW_TIME))))
|
||||||
|
.reduce(new DispersionCountWindow(), new MergeCountWindow())
|
||||||
|
.name("DispersionCountWindow");
|
||||||
|
|
||||||
|
//拆分数据
|
||||||
|
SingleOutputStreamOperator<String> resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap())
|
||||||
|
.name("ResultFlatMap");
|
||||||
|
|
||||||
|
//输出
|
||||||
|
resultFlatMap.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX),
|
||||||
|
config.get(SINK_KAFKA_TOPIC),
|
||||||
|
config.get(LOG_FAILURES_ONLY)));
|
||||||
|
|
||||||
|
environment.execute(config.get(JOB_NAME));
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("This Flink task start ERROR! Exception information is :");
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -1,96 +1,267 @@
|
|||||||
package com.zdjizhi.topology;
|
package com.zdjizhi.topology;
|
||||||
|
|
||||||
import cn.hutool.log.Log;
|
import com.alibaba.fastjson2.JSON;
|
||||||
import cn.hutool.log.LogFactory;
|
import com.alibaba.fastjson2.JSONPath;
|
||||||
|
import com.alibaba.fastjson2.JSONReader;
|
||||||
import com.zdjizhi.common.config.MergeConfigs;
|
import com.zdjizhi.common.config.MergeConfigs;
|
||||||
import com.zdjizhi.common.config.MergeConfiguration;
|
import com.zdjizhi.common.config.MergeConfiguration;
|
||||||
import com.zdjizhi.common.pojo.Fields;
|
import com.zdjizhi.common.pojo.*;
|
||||||
import com.zdjizhi.common.pojo.Metrics;
|
|
||||||
import com.zdjizhi.common.pojo.Tags;
|
|
||||||
import com.zdjizhi.utils.functions.keyby.DimensionKeyBy;
|
|
||||||
import com.zdjizhi.utils.functions.map.ResultFlatMap;
|
|
||||||
import com.zdjizhi.utils.functions.process.ParsingData;
|
|
||||||
import com.zdjizhi.utils.functions.statistics.DispersionCountWindow;
|
|
||||||
import com.zdjizhi.utils.functions.statistics.MergeCountWindow;
|
|
||||||
import com.zdjizhi.utils.kafka.KafkaConsumer;
|
import com.zdjizhi.utils.kafka.KafkaConsumer;
|
||||||
import com.zdjizhi.utils.kafka.KafkaProducer;
|
import com.zdjizhi.utils.kafka.KafkaProducer;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.flink.api.common.eventtime.*;
|
import org.apache.flink.api.common.eventtime.*;
|
||||||
import org.apache.flink.api.java.tuple.Tuple3;
|
import org.apache.flink.api.common.functions.AggregateFunction;
|
||||||
|
import org.apache.flink.api.common.functions.FlatMapFunction;
|
||||||
|
import org.apache.flink.api.common.functions.RichFlatMapFunction;
|
||||||
|
import org.apache.flink.api.java.functions.KeySelector;
|
||||||
|
import org.apache.flink.api.java.tuple.Tuple6;
|
||||||
import org.apache.flink.api.java.utils.ParameterTool;
|
import org.apache.flink.api.java.utils.ParameterTool;
|
||||||
import org.apache.flink.configuration.Configuration;
|
import org.apache.flink.configuration.Configuration;
|
||||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||||
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
||||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||||
|
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
||||||
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
|
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
|
||||||
import org.apache.flink.streaming.api.windowing.time.Time;
|
import org.apache.flink.streaming.api.windowing.time.Time;
|
||||||
|
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
|
||||||
|
import org.apache.flink.util.Collector;
|
||||||
|
import org.apache.flink.util.Preconditions;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
import static com.zdjizhi.common.config.MergeConfigs.*;
|
import static com.zdjizhi.common.config.MergeConfigs.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author qidaijie
|
* @author lifengchao
|
||||||
* @Package com.zdjizhi.topology
|
* @Package com.zdjizhi.topology
|
||||||
* @Description:
|
* @Description:
|
||||||
* @date 2021/5/2016:42
|
* @date 2024/7/23 11:20
|
||||||
*/
|
*/
|
||||||
public class ApplicationProtocolTopology {
|
public class ApplicationProtocolTopology {
|
||||||
private static final Log logger = LogFactory.get();
|
static final Logger LOG = LoggerFactory.getLogger(ApplicationProtocolTopology.class);
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) throws Exception{
|
||||||
try {
|
// param check
|
||||||
|
if (args.length < 1) {
|
||||||
// param check
|
throw new IllegalArgumentException("Error: Not found properties path. " +
|
||||||
if (args.length < 1) {
|
"\nUsage: flink -c xxx xxx.jar app.properties.");
|
||||||
throw new IllegalArgumentException("Error: Not found properties path. " +
|
|
||||||
"\nUsage: flink -c xxx xxx.jar app.properties.");
|
|
||||||
}
|
|
||||||
|
|
||||||
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
|
|
||||||
|
|
||||||
ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
|
|
||||||
final Configuration config = tool.getConfiguration();
|
|
||||||
environment.getConfig().setGlobalJobParameters(config);
|
|
||||||
final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
|
|
||||||
|
|
||||||
|
|
||||||
//水印
|
|
||||||
WatermarkStrategy<Tuple3<Tags, Fields, Long>> strategyForSession = WatermarkStrategy
|
|
||||||
.<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
|
|
||||||
.withTimestampAssigner((element, timestamp) -> element.f2);
|
|
||||||
|
|
||||||
//数据源
|
|
||||||
DataStream<String> streamSource = environment.addSource(
|
|
||||||
KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
|
|
||||||
config.get(SOURCE_KAFKA_TOPIC),
|
|
||||||
config.get(STARTUP_MODE)));
|
|
||||||
|
|
||||||
//解析数据
|
|
||||||
SingleOutputStreamOperator<Tuple3<Tags, Fields, Long>> parseDataProcess = streamSource.process(new ParsingData())
|
|
||||||
.assignTimestampsAndWatermarks(strategyForSession)
|
|
||||||
.name("ParseDataProcess");
|
|
||||||
|
|
||||||
//增量聚合窗口
|
|
||||||
SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy())
|
|
||||||
.window(TumblingEventTimeWindows.of(Time.seconds(config.get(MergeConfigs.COUNT_WINDOW_TIME))))
|
|
||||||
.reduce(new DispersionCountWindow(), new MergeCountWindow())
|
|
||||||
.name("DispersionCountWindow");
|
|
||||||
|
|
||||||
//拆分数据
|
|
||||||
SingleOutputStreamOperator<String> resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap())
|
|
||||||
.name("ResultFlatMap");
|
|
||||||
|
|
||||||
//输出
|
|
||||||
resultFlatMap.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX),
|
|
||||||
config.get(SINK_KAFKA_TOPIC),
|
|
||||||
config.get(LOG_FAILURES_ONLY)));
|
|
||||||
|
|
||||||
environment.execute(config.get(JOB_NAME));
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("This Flink task start ERROR! Exception information is :");
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||||
|
environment.setParallelism(1);
|
||||||
|
|
||||||
|
ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
|
||||||
|
final Configuration config = tool.getConfiguration();
|
||||||
|
environment.getConfig().setGlobalJobParameters(config);
|
||||||
|
final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
|
||||||
|
|
||||||
|
//水印
|
||||||
|
WatermarkStrategy<Data> strategyForSession = WatermarkStrategy
|
||||||
|
.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
|
||||||
|
.withTimestampAssigner((element, timestamp) -> element.getTimestamp_ms());
|
||||||
|
|
||||||
|
//数据源
|
||||||
|
DataStream<String> streamSource = environment.addSource(
|
||||||
|
KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
|
||||||
|
config.get(SOURCE_KAFKA_TOPIC),
|
||||||
|
config.get(STARTUP_MODE)));
|
||||||
|
|
||||||
|
//解析数据
|
||||||
|
SingleOutputStreamOperator<Data> dataStream = streamSource.flatMap(parseFlatMapFunction()).assignTimestampsAndWatermarks(strategyForSession);
|
||||||
|
|
||||||
|
//聚合 + 拆分
|
||||||
|
SingleOutputStreamOperator<String> rstStream = dataStream.keyBy(keySelector())
|
||||||
|
.window(TumblingEventTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
|
||||||
|
.aggregate(aggregateFunction(), processWindowFunction());
|
||||||
|
|
||||||
|
//输出
|
||||||
|
rstStream.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX), config.get(SINK_KAFKA_TOPIC), config.get(LOG_FAILURES_ONLY)));
|
||||||
|
|
||||||
|
environment.execute(config.get(JOB_NAME));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static KeySelector<Data, Tuple6<Integer, String, String, String, String, String>> keySelector(){
|
||||||
|
return new KeySelector<Data, Tuple6<Integer, String, String, String, String, String>>() {
|
||||||
|
@Override
|
||||||
|
public Tuple6<Integer, String, String, String, String, String> getKey(Data data) throws Exception {
|
||||||
|
return new Tuple6<>(data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static FlatMapFunction<String, Data> parseFlatMapFunction(){
|
||||||
|
return new RichFlatMapFunction<String, Data>() {
|
||||||
|
private JSONPath namePath;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void open(Configuration parameters) throws Exception {
|
||||||
|
super.open(parameters);
|
||||||
|
namePath = JSONPath.of("$.name");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flatMap(String value, Collector<Data> out) throws Exception {
|
||||||
|
JSONReader parser = JSONReader.of(value);
|
||||||
|
try {
|
||||||
|
Object name = namePath.extract(parser);
|
||||||
|
if(!"traffic_application_protocol_stat".equals(name)){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Data data = JSON.parseObject(value, Data.class);
|
||||||
|
String decodedPath = data.getDecoded_path();
|
||||||
|
if(StringUtils.isBlank(decodedPath)){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
String appFullPath = data.getApp();
|
||||||
|
if(StringUtils.isBlank(appFullPath)){
|
||||||
|
out.collect(data);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// decoded_path和app进行拼接,格式化
|
||||||
|
String[] appSplits = appFullPath.split("\\.");
|
||||||
|
data.setApp(appSplits[appSplits.length -1]);
|
||||||
|
String firstAppProtocol = appSplits[0];
|
||||||
|
String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1);
|
||||||
|
if (endProtocol.equals(firstAppProtocol)) {
|
||||||
|
if(appSplits.length > 1){
|
||||||
|
decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf("."));
|
||||||
|
data.setDecoded_path(decodedPath);
|
||||||
|
}
|
||||||
|
}else{
|
||||||
|
decodedPath = decodedPath + "." + appFullPath;
|
||||||
|
data.setDecoded_path(decodedPath);
|
||||||
|
}
|
||||||
|
out.collect(data);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("parse error for value:" + value, e);
|
||||||
|
} finally {
|
||||||
|
parser.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static AggregateFunction<Data, ResultData, ResultData> aggregateFunction(){
|
||||||
|
return new AggregateFunction<Data, ResultData, ResultData>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResultData createAccumulator() {
|
||||||
|
return new ResultData();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResultData add(Data value, ResultData acc) {
|
||||||
|
acc.sessions = acc.sessions + value.sessions;
|
||||||
|
acc.in_bytes = acc.in_bytes + value.in_bytes;
|
||||||
|
acc.out_bytes = acc.out_bytes + value.out_bytes;
|
||||||
|
acc.in_pkts = acc.in_pkts + value.in_pkts;
|
||||||
|
acc.out_pkts = acc.out_pkts + value.out_pkts;
|
||||||
|
acc.c2s_pkts = acc.c2s_pkts + value.c2s_pkts;
|
||||||
|
acc.s2c_pkts = acc.s2c_pkts + value.s2c_pkts;
|
||||||
|
acc.c2s_bytes = acc.c2s_bytes + value.c2s_bytes;
|
||||||
|
acc.s2c_bytes = acc.s2c_bytes + value.s2c_bytes;
|
||||||
|
acc.c2s_fragments = acc.c2s_fragments + value.c2s_fragments;
|
||||||
|
acc.s2c_fragments = acc.s2c_fragments + value.s2c_fragments;
|
||||||
|
acc.c2s_tcp_lost_bytes = acc.c2s_tcp_lost_bytes + value.c2s_tcp_lost_bytes;
|
||||||
|
acc.s2c_tcp_lost_bytes = acc.s2c_tcp_lost_bytes + value.s2c_tcp_lost_bytes;
|
||||||
|
acc.c2s_tcp_ooorder_pkts = acc.c2s_tcp_ooorder_pkts + value.c2s_tcp_ooorder_pkts;
|
||||||
|
acc.s2c_tcp_ooorder_pkts = acc.s2c_tcp_ooorder_pkts + value.s2c_tcp_ooorder_pkts;
|
||||||
|
acc.c2s_tcp_retransmitted_pkts = acc.c2s_tcp_retransmitted_pkts + value.c2s_tcp_retransmitted_pkts;
|
||||||
|
acc.s2c_tcp_retransmitted_pkts = acc.s2c_tcp_retransmitted_pkts + value.s2c_tcp_retransmitted_pkts;
|
||||||
|
acc.c2s_tcp_retransmitted_bytes = acc.c2s_tcp_retransmitted_bytes + value.c2s_tcp_retransmitted_bytes;
|
||||||
|
acc.s2c_tcp_retransmitted_bytes = acc.s2c_tcp_retransmitted_bytes + value.s2c_tcp_retransmitted_bytes;
|
||||||
|
return acc;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResultData getResult(ResultData acc) {
|
||||||
|
return acc;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResultData merge(ResultData a, ResultData b) {
|
||||||
|
a.sessions = a.sessions + b.sessions;
|
||||||
|
a.in_bytes = a.in_bytes + b.in_bytes;
|
||||||
|
a.out_bytes = a.out_bytes + b.out_bytes;
|
||||||
|
a.in_pkts = a.in_pkts + b.in_pkts;
|
||||||
|
a.out_pkts = a.out_pkts + b.out_pkts;
|
||||||
|
a.c2s_pkts = a.c2s_pkts + b.c2s_pkts;
|
||||||
|
a.s2c_pkts = a.s2c_pkts + b.s2c_pkts;
|
||||||
|
a.c2s_bytes = a.c2s_bytes + b.c2s_bytes;
|
||||||
|
a.s2c_bytes = a.s2c_bytes + b.s2c_bytes;
|
||||||
|
a.c2s_fragments = a.c2s_fragments + b.c2s_fragments;
|
||||||
|
a.s2c_fragments = a.s2c_fragments + b.s2c_fragments;
|
||||||
|
a.c2s_tcp_lost_bytes = a.c2s_tcp_lost_bytes + b.c2s_tcp_lost_bytes;
|
||||||
|
a.s2c_tcp_lost_bytes = a.s2c_tcp_lost_bytes + b.s2c_tcp_lost_bytes;
|
||||||
|
a.c2s_tcp_ooorder_pkts = a.c2s_tcp_ooorder_pkts + b.c2s_tcp_ooorder_pkts;
|
||||||
|
a.s2c_tcp_ooorder_pkts = a.s2c_tcp_ooorder_pkts + b.s2c_tcp_ooorder_pkts;
|
||||||
|
a.c2s_tcp_retransmitted_pkts = a.c2s_tcp_retransmitted_pkts + b.c2s_tcp_retransmitted_pkts;
|
||||||
|
a.s2c_tcp_retransmitted_pkts = a.s2c_tcp_retransmitted_pkts + b.s2c_tcp_retransmitted_pkts;
|
||||||
|
a.c2s_tcp_retransmitted_bytes = a.c2s_tcp_retransmitted_bytes + b.c2s_tcp_retransmitted_bytes;
|
||||||
|
a.s2c_tcp_retransmitted_bytes = a.s2c_tcp_retransmitted_bytes + b.s2c_tcp_retransmitted_bytes;
|
||||||
|
return a;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow> processWindowFunction(){
|
||||||
|
return new ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow>() {
|
||||||
|
private String NAME = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void open(Configuration parameters) throws Exception {
|
||||||
|
super.open(parameters);
|
||||||
|
final Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
|
||||||
|
NAME = configuration.get(MergeConfigs.MEASUREMENT_NAME);
|
||||||
|
Preconditions.checkArgument(StringUtils.isNotBlank(NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(Tuple6<Integer, String, String, String, String, String> key, ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow>.Context context, Iterable<ResultData> elements, Collector<String> out) throws Exception {
|
||||||
|
long timestamp_ms = context.window().getStart();
|
||||||
|
for (ResultData ele : elements) {
|
||||||
|
ele.timestamp_ms = timestamp_ms;
|
||||||
|
ele.name = NAME;
|
||||||
|
ele.vsys_id = key.f0;
|
||||||
|
ele.device_id = key.f1;
|
||||||
|
ele.device_group = key.f2;
|
||||||
|
ele.data_center = key.f3;
|
||||||
|
ele.app_name = null;
|
||||||
|
String decodedPath = key.f4;
|
||||||
|
String app = key.f5;
|
||||||
|
|
||||||
|
// 拆分
|
||||||
|
int index = decodedPath.indexOf('.');
|
||||||
|
String subDecodedPath;
|
||||||
|
while (index > 0) {
|
||||||
|
subDecodedPath = decodedPath.substring(0, index);
|
||||||
|
ele.protocol_stack_id = subDecodedPath;
|
||||||
|
out.collect(JSON.toJSONString(ele));
|
||||||
|
index = decodedPath.indexOf('.', index + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
ele.app_name = app;
|
||||||
|
ele.protocol_stack_id = decodedPath;
|
||||||
|
out.collect(JSON.toJSONString(ele));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static DataStream<String> testSource(StreamExecutionEnvironment environment){
|
||||||
|
return environment.fromCollection(Arrays.asList(
|
||||||
|
"{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssh\",\"app\":\"ssh\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":720,\"out_bytes\":1200,\"in_pkts\":8,\"out_pkts\":16,\"c2s_pkts\":16,\"s2c_pkts\":8,\"c2s_bytes\":1200,\"s2c_bytes\":720,\"timestamp_ms\":1719990000033}",
|
||||||
|
"{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"Gary-ApplicationTest\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":2536,\"out_bytes\":2237,\"in_pkts\":8,\"out_pkts\":7,\"c2s_pkts\":7,\"s2c_pkts\":8,\"c2s_bytes\":2237,\"s2c_bytes\":2536,\"c2s_tcp_retransmitted_pkts\":2,\"c2s_tcp_retransmitted_bytes\":120,\"timestamp_ms\":1719990000033}",
|
||||||
|
"{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"ms_edge\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":326282,\"out_bytes\":8322,\"in_pkts\":241,\"out_pkts\":125,\"c2s_pkts\":125,\"s2c_pkts\":241,\"c2s_bytes\":8322,\"s2c_bytes\":326282,\"s2c_tcp_ooorder_pkts\":1,\"timestamp_ms\":1719990000033}",
|
||||||
|
"{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"in_pkts\":1,\"s2c_pkts\":1,\"s2c_bytes\":66,\"timestamp_ms\":1719990000033}",
|
||||||
|
"{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990000033}",
|
||||||
|
"{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"ssl.port_444\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990000033}",
|
||||||
|
"{}"
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user