Merge branch 'feature/2.1' into 'develop'
[TSG-18697] 拼接Protocol Statck ID时去除重复的基础协议 See merge request galaxy/tsg_olap/app-protocol-stat-traffic-merge!3
This commit is contained in:
25
README.md
25
README.md
@@ -16,21 +16,22 @@ Live Traffic Chart统计程序,基于协议栈拆分多流聚合,存储到
|
|||||||
|
|
||||||
### 2.基于Tags内容进行分组统计。
|
### 2.基于Tags内容进行分组统计。
|
||||||
|
|
||||||
### 3.拆分protocol_stack_id协议树为多个节点
|
### 3.拆分protocol_stack_id协议树为多个节点,例如:ETHERNET.IPv4.TCP.https.kingsoft.wps_office,每个节点ID为
|
||||||
|
|
||||||
#### 例如,ETHERNET.IPv4.TCP.https.kingsoft.wps_office,每个节点ID为:
|
1. ETHERNET
|
||||||
|
2. ETHERNET.IPv4
|
||||||
|
3. ETHERNET.IPv4.TCP
|
||||||
|
4. ETHERNET.IPv4.TCP.https
|
||||||
|
5. ETHERNET.IPv4.TCP.https.kingsoft
|
||||||
|
6. ETHERNET.IPv4.TCP.https.kingsoft.wps_office
|
||||||
|
|
||||||
##### ETHERNET
|
#### 为避免展示重复的协议,拆分应去除Decoded Path(最后一个元素)与 Application(第一个元素)重复的基础协议,例如:{"protocol_label": "ETHERNET.IPv4.TCP.dns","app_full_path": "dns"}
|
||||||
|
|
||||||
##### ETHERNET.IPv4
|
1. ETHERNET
|
||||||
|
2. ETHERNET.IPv4
|
||||||
##### ETHERNET.IPv4.TCP
|
3. ETHERNET.IPv4.TCP
|
||||||
|
4. ETHERNET.IPv4.TCP.dns
|
||||||
##### ETHERNET.IPv4.TCP.https
|
##### 将protocol_label内最后的一个基础协议去除
|
||||||
|
|
||||||
##### ETHERNET.IPv4.TCP.https.kingsoft
|
|
||||||
|
|
||||||
##### ETHERNET.IPv4.TCP.https.kingsoft.wps_office
|
|
||||||
|
|
||||||
### 4.app_name仅在终端节点输出。
|
### 4.app_name仅在终端节点输出。
|
||||||
|
|
||||||
|
|||||||
2
pom.xml
2
pom.xml
@@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
<groupId>com.zdjizhi</groupId>
|
<groupId>com.zdjizhi</groupId>
|
||||||
<artifactId>app-protocol-stat-traffic-merge</artifactId>
|
<artifactId>app-protocol-stat-traffic-merge</artifactId>
|
||||||
<version>2.0.1</version>
|
<version>2.1.0</version>
|
||||||
|
|
||||||
<name>app-protocol-stat-traffic-merge</name>
|
<name>app-protocol-stat-traffic-merge</name>
|
||||||
<url>http://www.example.com</url>
|
<url>http://www.example.com</url>
|
||||||
|
|||||||
@@ -28,14 +28,7 @@ public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Lo
|
|||||||
Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class);
|
Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class);
|
||||||
Long timestamp_ms = originalLog.getLong("timestamp_ms");
|
Long timestamp_ms = originalLog.getLong("timestamp_ms");
|
||||||
|
|
||||||
String appFullPath = tags.getApp_name();
|
joinProtocol(tags);
|
||||||
if (StringUtil.isNotBlank(appFullPath)) {
|
|
||||||
String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1);
|
|
||||||
String protocolLabel = tags.getProtocol_stack_id();
|
|
||||||
|
|
||||||
tags.setApp_name(appName);
|
|
||||||
tags.setProtocol_stack_id(protocolLabel.concat(".").concat(appFullPath));
|
|
||||||
}
|
|
||||||
|
|
||||||
out.collect(new Tuple3<>(tags, fields, timestamp_ms));
|
out.collect(new Tuple3<>(tags, fields, timestamp_ms));
|
||||||
}
|
}
|
||||||
@@ -44,4 +37,32 @@ public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Lo
|
|||||||
logger.error("Parsing application_protocol_stat data is abnormal! The exception message is: {}", e.getMessage());
|
logger.error("Parsing application_protocol_stat data is abnormal! The exception message is: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 避免计算重复的协议,去除Decoded Path(最后一个元素) 与 Application(第一个元素)重复的基础协议。
|
||||||
|
*
|
||||||
|
* @param tags
|
||||||
|
*/
|
||||||
|
private static void joinProtocol(Tags tags) {
|
||||||
|
String appFullPath = tags.getApp_name();
|
||||||
|
|
||||||
|
if (StringUtil.isNotBlank(appFullPath)) {
|
||||||
|
String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1);
|
||||||
|
tags.setApp_name(appName);
|
||||||
|
|
||||||
|
String protocolLabel = tags.getProtocol_stack_id();
|
||||||
|
|
||||||
|
String endProtocol = protocolLabel.substring(protocolLabel.lastIndexOf(".") + 1);
|
||||||
|
|
||||||
|
String[] appSplits = appFullPath.split("\\.");
|
||||||
|
|
||||||
|
String firstAppProtocol = appSplits[0];
|
||||||
|
|
||||||
|
if (endProtocol.equals(firstAppProtocol)) {
|
||||||
|
tags.setProtocol_stack_id(protocolLabel.substring(0, protocolLabel.lastIndexOf(".")).concat(".").concat(appFullPath));
|
||||||
|
} else {
|
||||||
|
tags.setProtocol_stack_id(protocolLabel.concat(".").concat(appFullPath));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ import java.util.Properties;
|
|||||||
public class KafkaProducer {
|
public class KafkaProducer {
|
||||||
|
|
||||||
public static FlinkKafkaProducer<String> getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) {
|
public static FlinkKafkaProducer<String> getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) {
|
||||||
setDefaultConfig(properties, "ack", 1);
|
setDefaultConfig(properties, "ack", "1");
|
||||||
setDefaultConfig(properties, "retries", 0);
|
setDefaultConfig(properties, "retries", 0);
|
||||||
setDefaultConfig(properties, "linger.ms", 10);
|
setDefaultConfig(properties, "linger.ms", 10);
|
||||||
setDefaultConfig(properties, "request.timeout.ms", 30000);
|
setDefaultConfig(properties, "request.timeout.ms", 30000);
|
||||||
|
|||||||
@@ -72,4 +72,26 @@ public class ConventionalTest {
|
|||||||
System.out.println(protocol.concat(".").concat(app));
|
System.out.println(protocol.concat(".").concat(app));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void removeDuplicateProtocolTest() {
|
||||||
|
String str = "[.]";
|
||||||
|
String protocol = "ETHERNET.IPv4.TCP.http";
|
||||||
|
String[] protocolSplits = protocol.split(str);
|
||||||
|
String endProtocol = protocolSplits[protocolSplits.length -1];
|
||||||
|
System.out.println(endProtocol);
|
||||||
|
|
||||||
|
String app = "http.test";
|
||||||
|
String[] appSplits = app.split(str);
|
||||||
|
String firstAppProtocol = appSplits[0];
|
||||||
|
System.out.println(firstAppProtocol);
|
||||||
|
|
||||||
|
|
||||||
|
System.out.println(app.substring(app.lastIndexOf(".") + 1));
|
||||||
|
System.out.println(app.lastIndexOf(".") + 1);
|
||||||
|
System.out.println(protocol.substring(0,protocol.lastIndexOf(".")));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user