适配重构后的tsg日志,增加单元测试

This commit is contained in:
wangkuan
2024-02-06 15:38:42 +08:00
parent 6e683191c2
commit 6d68355b5e
26 changed files with 931 additions and 769 deletions

19
pom.xml
View File

@@ -6,7 +6,7 @@
<groupId>com.galaxy.tsg</groupId>
<artifactId>topn-metrics-job</artifactId>
<version>23-11-01</version>
<version>24-01-16</version>
<repositories>
<repository>
@@ -21,11 +21,13 @@
<hadoop.version>2.7.1</hadoop.version>
</properties>
<dependencies>
<dependencies>
<dependency>
<groupId>com.zdjizhi</groupId>
<groupId>com.geedgenetworks</groupId>
<artifactId>galaxy</artifactId>
<version>1.1.0</version>
<version>1.2.3</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@@ -37,6 +39,7 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
@@ -149,6 +152,12 @@
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.12</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
@@ -181,7 +190,7 @@
</goals>
<configuration>
<finalName>topn-metrics-job-23-11-01</finalName>
<finalName>topn-metrics-job-24-01-16</finalName>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.

View File

@@ -1,19 +1,18 @@
package com.galaxy.tsg;
import com.alibaba.fastjson2.JSON;
import com.galaxy.tsg.function.metricsAggregationReduce;
import com.galaxy.tsg.function.metricsCalculate;
import com.galaxy.tsg.function.topnHotItems;
import com.galaxy.tsg.pojo.resultEntity;
import com.galaxy.tsg.pojo.sessionEntity;
import com.galaxy.tsg.pojo.transformEntity;
import com.zdjizhi.utils.StringUtil;
import com.galaxy.tsg.config.CommonConfig;
import com.galaxy.tsg.function.*;
import com.galaxy.tsg.pojo.ResultEntity;
import com.galaxy.tsg.pojo.SessionEntity;
import com.galaxy.tsg.pojo.TransformEntity;
import com.galaxy.tsg.selector.GroupBySelector;
import com.galaxy.tsg.selector.OrderBySelector;
import com.geedgenetworks.utils.StringUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -23,249 +22,125 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Objects;
import static com.galaxy.tsg.config.commonConfig.*;
import static com.galaxy.tsg.util.kafkaUtils.getKafkaConsumer;
import static com.galaxy.tsg.util.kafkaUtils.getKafkaSink;
import static com.galaxy.tsg.config.CommonConfig.*;
import static com.galaxy.tsg.util.KafkaUtils.getKafkaConsumer;
import static com.galaxy.tsg.util.KafkaUtils.getKafkaSink;
public class Toptask {
private static final Logger LOG = LoggerFactory.getLogger(Toptask.class);
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool serviceConfig = ParameterTool.fromPropertiesFile(args[0]);
Configuration configurationService = serviceConfig.getConfiguration();
// global register
env.getConfig().setGlobalJobParameters(configurationService);
WatermarkStrategy<SessionEntity> strategyForSession = WatermarkStrategy
.<SessionEntity>forBoundedOutOfOrderness(Duration.ofSeconds(configurationService.get(CommonConfig.WATERMARK_TIME)))
.withTimestampAssigner((sessionEntity, timestamp) -> sessionEntity.getRecv_time());
DataStream<String> sourceForSession = env.addSource(getKafkaConsumer(KAFKA_CONSUMER_TOPIC)).setParallelism(KAFKA_CONSUMER_PARALLELISM);
WatermarkStrategy<transformEntity> strategyForSession = WatermarkStrategy
.<transformEntity>forBoundedOutOfOrderness(Duration.ofSeconds(WATERMARK_TIME))
.withTimestampAssigner((transformEntity, timestamp) -> transformEntity.getTimestamp()*1000);
SingleOutputStreamOperator<transformEntity> inputForSession = sourceForSession.map(new MapFunction<String, transformEntity>() {
DataStream<SessionEntity> sourceForSession = env.addSource(getKafkaConsumer(configurationService)).setParallelism(configurationService.get(CommonConfig.KAFKA_CONSUMER_PARALLELISM)).assignTimestampsAndWatermarks(strategyForSession);;
SingleOutputStreamOperator<TransformEntity> inputForSession = sourceForSession.flatMap(new FlatMapFunction()
).filter(new FilterFunction<TransformEntity>() {
@Override
public transformEntity map(String message) {
transformEntity transformEntity = new transformEntity();
try {
sessionEntity sessionEntity = JSON.parseObject(message, com.galaxy.tsg.pojo.sessionEntity.class);
transformEntity.setServer_ip(sessionEntity.getCommon_server_ip());
transformEntity.setClient_ip(sessionEntity.getCommon_client_ip());
public boolean filter(TransformEntity entity) throws Exception {
transformEntity.setSubscriber_id(sessionEntity.getCommon_subscriber_id());
transformEntity.setFqdn(sessionEntity.getCommon_server_fqdn());
transformEntity.setExternal_ip(sessionEntity.getCommon_external_ip());
transformEntity.setInternal_ip(sessionEntity.getCommon_internal_ip());
transformEntity.setDomain(sessionEntity.getHttp_domain());
transformEntity.setDevice_group(sessionEntity.getCommon_device_group());
transformEntity.setDevice_id(sessionEntity.getCommon_device_id());
transformEntity.setData_center(sessionEntity.getCommon_data_center());
transformEntity.setVsys_id(sessionEntity.getCommon_vsys_id());
transformEntity.setTimestamp(sessionEntity.getCommon_recv_time());
transformEntity.setSessions(sessionEntity.getCommon_sessions());
transformEntity.setL4_protocol(sessionEntity.getCommon_l4_protocol());
if ((8L & sessionEntity.getCommon_flags()) == 8L) {
transformEntity.setOut_bytes(sessionEntity.getCommon_c2s_byte_num());
transformEntity.setOut_pkts(sessionEntity.getCommon_c2s_pkt_num());
transformEntity.setIn_bytes(sessionEntity.getCommon_s2c_byte_num());
transformEntity.setIn_pkts(sessionEntity.getCommon_s2c_pkt_num());
} else {
transformEntity.setOut_bytes(sessionEntity.getCommon_s2c_byte_num());
transformEntity.setOut_pkts(sessionEntity.getCommon_s2c_pkt_num());
transformEntity.setIn_bytes(sessionEntity.getCommon_c2s_byte_num());
transformEntity.setIn_pkts(sessionEntity.getCommon_c2s_pkt_num());
}
} catch (Exception e) {
LOG.error("Entity Parsing ERROR");
transformEntity.setIfError(1);
}
return transformEntity;
}
}).filter(new FilterFunction<transformEntity>() {
@Override
public boolean filter(transformEntity entity) throws Exception {
return entity.ifError != 1;
return entity.getIfError() != 1;
}
});
//clientip聚合TOP
SingleOutputStreamOperator<transformEntity> clientipdStream = inputForSession.filter(new FilterFunction<transformEntity>() {
@Override
public boolean filter(transformEntity value) throws Exception {
return "IPv6_TCP".equals(value.getL4_protocol()) || "IPv4_TCP".equals(value.getL4_protocol());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<resultEntity> windowedStream = clientipdStream.keyBy(new groupBySelector("client_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("client_ip");;
DataStream<String> Stream = windowedStream.keyBy(new oneKeySelector())
.process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
Stream.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
//serverip聚合TOP
SingleOutputStreamOperator<transformEntity> serveripdStream = inputForSession.filter(new FilterFunction<transformEntity>() {
@Override
public boolean filter(transformEntity value) throws Exception {
return "IPv6_TCP".equals(value.getL4_protocol()) || "IPv4_TCP".equals(value.getL4_protocol());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<resultEntity> windowedStreamForServerIp = serveripdStream.keyBy(new groupBySelector("server_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("server_ip");;
DataStream<String> StreamForServerIp = windowedStreamForServerIp.keyBy(new oneKeySelector())
.process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
StreamForServerIp.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
//common_internal_ip聚合TOP
SingleOutputStreamOperator<transformEntity> internalStream = inputForSession.filter(new FilterFunction<transformEntity>() {
@Override
public boolean filter(transformEntity value) throws Exception {
return StringUtil.isNotEmpty(value.getInternal_ip());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<resultEntity> windowedStreamForInternal = internalStream.keyBy(new groupBySelector("internal_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("internal_ip");;
DataStream<String> StreamForInternal = windowedStreamForInternal.keyBy(new oneKeySelector())
.process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
StreamForInternal.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
//common_external_ip聚合TOP
SingleOutputStreamOperator<transformEntity> externalStream = inputForSession.filter(new FilterFunction<transformEntity>() {
@Override
public boolean filter(transformEntity value) throws Exception {
return StringUtil.isNotEmpty(value.getExternal_ip());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<resultEntity> windowedStreamForExternal = externalStream.keyBy(new groupBySelector("external_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("external_ip");;
DataStream<String> StreamForExternal = windowedStreamForExternal.keyBy(new oneKeySelector())
.process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
StreamForExternal.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
//http_domain聚合TOP
SingleOutputStreamOperator<transformEntity> domainStream = inputForSession.filter(new FilterFunction<transformEntity>() {
@Override
public boolean filter(transformEntity value) throws Exception {
return StringUtil.isNotEmpty(value.getDomain());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<resultEntity> windowedStreamForDomain = domainStream.keyBy(new groupBySelector("server_domain"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("server_domain");;
DataStream<String> StreamForDomain = windowedStreamForDomain.keyBy(new oneKeySelector())
.process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
StreamForDomain.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
SingleOutputStreamOperator<transformEntity> userStream = inputForSession.filter(new FilterFunction<transformEntity>() {
@Override
public boolean filter(transformEntity value) throws Exception {
return StringUtil.isNotEmpty(value.getSubscriber_id());
}
}).assignTimestampsAndWatermarks(strategyForSession);
//common_subscriber_id聚合TOP
SingleOutputStreamOperator<resultEntity> windowedStreamForUser = userStream.keyBy(new groupBySelector("subscriber_id"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("subscriber_id");;
DataStream<String> StreamForUser = windowedStreamForUser.keyBy(new oneKeySelector())
.process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
StreamForUser.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
SingleOutputStreamOperator<transformEntity> fqdnStream = inputForSession.filter(new FilterFunction<transformEntity>() {
@Override
public boolean filter(transformEntity value) throws Exception {
return StringUtil.isNotEmpty(value.getFqdn());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<resultEntity> windowedStreamForFqdn = fqdnStream.keyBy(new groupBySelector("server_fqdn"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("server_fqdn");
DataStream<String> StreamForFqdn = windowedStreamForFqdn.keyBy(new oneKeySelector())
.process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
StreamForFqdn.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
env.execute(JOB_NAME);
buildJobTuple(configurationService,inputForSession,"client_ip");
buildJobTuple(configurationService,inputForSession,"server_ip");
buildJobTuple(configurationService,inputForSession,"internal_ip");
buildJobTuple(configurationService,inputForSession,"external_ip");
buildJobTuple(configurationService,inputForSession,"server_domain");
buildJobTuple(configurationService,inputForSession,"subscriber_id");
buildJobTuple(configurationService,inputForSession,"server_fqdn");
env.execute(configurationService.getString(JOB_NAME));
}
public static class groupBySelector implements KeySelector<transformEntity, Tuple5<String, Long, String, String, String>> {
public String key;
private static void buildJobTuple(Configuration configurationService, SingleOutputStreamOperator<TransformEntity> inputForSession, String key) {
SingleOutputStreamOperator<TransformEntity> event = null;
switch (key) {
case "client_ip":
event= inputForSession.filter(new FilterFunction<TransformEntity>() {
@Override
public boolean filter(TransformEntity value) throws Exception {
return StringUtil.isNotEmpty(value.getClient_ip());
}
});
break;
case "server_ip":
event= inputForSession.filter(new FilterFunction<TransformEntity>() {
@Override
public boolean filter(TransformEntity value) throws Exception {
return StringUtil.isNotEmpty(value.getServer_ip());
}
});
break;
case "internal_ip":
event= inputForSession.filter(new FilterFunction<TransformEntity>() {
@Override
public boolean filter(TransformEntity value) throws Exception {
return StringUtil.isNotEmpty(value.getInternal_ip());
}
});
break;
case "external_ip":
event= inputForSession.filter(new FilterFunction<TransformEntity>() {
@Override
public boolean filter(TransformEntity value) throws Exception {
return StringUtil.isNotEmpty(value.getExternal_ip());
}
});
break;
case "server_domain":
event= inputForSession.filter(new FilterFunction<TransformEntity>() {
@Override
public boolean filter(TransformEntity value) throws Exception {
return StringUtil.isNotEmpty(value.getDomain());
}
});
break;
case "subscriber_id":
event= inputForSession.filter(new FilterFunction<TransformEntity>() {
@Override
public boolean filter(TransformEntity value) throws Exception {
return StringUtil.isNotEmpty(value.getSubscriber_id());
}
});
break;
case "server_fqdn":
event= inputForSession.filter(new FilterFunction<TransformEntity>() {
@Override
public boolean filter(TransformEntity value) throws Exception {
return StringUtil.isNotEmpty(value.getFqdn());
}
});
break;
default:
break;
}
SingleOutputStreamOperator<ResultEntity> windowedStreamForFqdn = Objects.requireNonNull(event).keyBy(new GroupBySelector(key))
.window(TumblingEventTimeWindows.of(Time.minutes(configurationService.getInteger(CommonConfig.WINDOW_TIME_MINUTE))))
.reduce(new MetricsAggregationReduce(), new MetricsCalculate()).setParallelism(configurationService.getInteger(CommonConfig.TASK_PARALLELISM)).name(key);
DataStream<String> StreamForKey = windowedStreamForFqdn.keyBy(new OrderBySelector())
.process(new TopnHotItems(configurationService.getInteger(CommonConfig.TOP_LIMIT))).setParallelism(configurationService.getInteger(CommonConfig.ORDERBY_PARALLELISM));
StreamForKey.addSink(getKafkaSink(configurationService)).setParallelism(configurationService.getInteger(CommonConfig.SINK_PARALLELISM));
public groupBySelector(String key) {
this.key = key;
}
@Override
public Tuple5<String, Long, String, String, String> getKey(transformEntity transformEntity) throws Exception {
Tuple5<String, Long, String, String, String> tuple = null;
transformEntity.setKey_by(key);
switch (key) {
case "client_ip":
tuple = new Tuple5<>(transformEntity.getClient_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
break;
case "server_ip":
tuple = new Tuple5<>(transformEntity.getServer_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
break;
case "internal_ip":
tuple = new Tuple5<>(transformEntity.getInternal_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
break;
case "external_ip":
tuple = new Tuple5<>(transformEntity.getExternal_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
break;
case "server_domain":
tuple = new Tuple5<>(transformEntity.getDomain(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
break;
case "subscriber_id":
tuple = new Tuple5<>(transformEntity.getSubscriber_id(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
break;
case "server_fqdn":
tuple = new Tuple5<>(transformEntity.getFqdn(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
break;
default:
}
return tuple;
}
}
public static class oneKeySelector implements KeySelector<resultEntity, Tuple1<String>> {
@Override
public Tuple1<String> getKey(resultEntity entity) throws Exception {
return new Tuple1<>(entity.getOrder_by());
}
}
}

View File

@@ -0,0 +1,53 @@
package com.galaxy.tsg.config;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
/**
* Created by wk on 2021/1/6.
*/
public class CommonConfig {
public static final ConfigOption<String> KAFKA_CONSUMER_BROKER = ConfigOptions.key("kafka.consumer.broker").stringType().defaultValue("");
public static final ConfigOption<String> KAFKA_CONSUMER_GROUP_ID = ConfigOptions.key("kafka.consumer.group.id").stringType().defaultValue("");
public static final ConfigOption<String> KAFKA_CONSUMER_TOPIC = ConfigOptions.key("kafka.consumer.topic").stringType().defaultValue("");
public static final ConfigOption<Integer> KAFKA_CONSUMER_PARALLELISM = ConfigOptions.key("kafka.consumer.parallelism").intType().defaultValue(0);
public static final ConfigOption<String> KAFKA_CONSUMER_SESSION_TIMEOUT_MS= ConfigOptions.key("kafka.consumer.session.timeout.ms").stringType().defaultValue("60000");
public static final ConfigOption<String> KAFKA_CONSUMER_MAX_POLL_RECORD= ConfigOptions.key("kafka.consumer.max.poll.records").stringType().defaultValue("3000");
public static final ConfigOption<String> KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES= ConfigOptions.key("kafka.consumer.max.partition.fetch.bytes").stringType().defaultValue("31457280");
public static final ConfigOption<String> KAFKA_PRODUCER_TOPIC = ConfigOptions.key("kafka.producer.topic").stringType().defaultValue("");
public static final ConfigOption<String> KAFKA_PRODUCER_RETRIES = ConfigOptions.key("kafka.producer.retries").stringType().defaultValue("1");
public static final ConfigOption<String> KAFKA_PRODUCER_LINGER_MS = ConfigOptions.key("kafka.producer.linger.ms").stringType().defaultValue("10");
public static final ConfigOption<String> KAFKA_PRODUCER_REQUEST_TIMEOUT_MS = ConfigOptions.key("kafka.producer.request.timeout.ms").stringType().defaultValue("30000");
public static final ConfigOption<String> KAFKA_PRODUCER_BATCH_SIZE = ConfigOptions.key("kafka.producer.batch.size").stringType().defaultValue("262144");
public static final ConfigOption<String> KAFKA_PRODUCER_BUFFER_MEMORY = ConfigOptions.key("kafka.producer.buffer.memory").stringType().defaultValue("134217728");
public static final ConfigOption<String> KAFKA_PRODUCER_MAX_REQUEST_SIZE = ConfigOptions.key("kafka.producer.max.request.size").stringType().defaultValue("10485760");
public static final ConfigOption<String> KAFKA_PRODUCER_COMPRESSION_TYPE = ConfigOptions.key("kafka.producer.compression.type").stringType().defaultValue("none");
public static final ConfigOption<String> KAFKA_PRODUCER_BROKER = ConfigOptions.key("kafka.producer.broker").stringType().defaultValue("");
public static final ConfigOption<String> JOB_NAME = ConfigOptions.key("job.name").stringType().defaultValue("");
public static final ConfigOption<Integer> TASK_PARALLELISM = ConfigOptions.key("task.parallelism").intType().defaultValue(0);
public static final ConfigOption<Integer> ORDERBY_PARALLELISM = ConfigOptions.key("orderby.parallelism").intType().defaultValue(0);
public static final ConfigOption<Integer> SINK_PARALLELISM = ConfigOptions.key("sink.parallelism").intType().defaultValue(0);
public static final ConfigOption<Integer> WATERMARK_TIME = ConfigOptions.key("watermark.time").intType().defaultValue(0);
public static final ConfigOption<Integer> WINDOW_TIME_MINUTE = ConfigOptions.key("window.time.minute").intType().defaultValue(0);
public static final ConfigOption<Integer> TOP_LIMIT = ConfigOptions.key("top.limit").intType().defaultValue(0);
public static final ConfigOption<String> KAFKA_CONSUMER_USER = ConfigOptions.key("kafka.consumer.user").stringType().defaultValue("");
public static final ConfigOption<String> KAFKA_CONSUMER_PIN = ConfigOptions.key("kafka.consumer.pin").stringType().defaultValue("");
public static final ConfigOption<Integer> KAFKA_CONSUMER_SECURITY = ConfigOptions.key("kafka.consumer.security").intType().defaultValue(0);
public static final ConfigOption<String> TOOLS_CONSUMER_LIBRARY = ConfigOptions.key("tools.consumer.library").stringType().defaultValue("");
public static final ConfigOption<String> KAFKA_PRODUCER_USER = ConfigOptions.key("kafka.producer.user").stringType().defaultValue("");
public static final ConfigOption<String> KAFKA_PRODUCER_PIN = ConfigOptions.key("kafka.producer.pin").stringType().defaultValue("");
public static final ConfigOption<Integer> KAFKA_PRODUCER_SECURITY = ConfigOptions.key("kafka.producer.security").intType().defaultValue(0);
public static final ConfigOption<String> TOOLS_PRODUCER_LIBRARY = ConfigOptions.key("tools.producer.library").stringType().defaultValue("");
}

View File

@@ -9,7 +9,7 @@ import java.util.Properties;
* @author Administrator
*/
public final class commonConfigurations {
public final class CommonConfigurations {
private static Properties propService = new Properties();
@@ -54,7 +54,7 @@ public final class commonConfigurations {
static {
try {
propService.load(commonConfigurations.class.getClassLoader().getResourceAsStream("common.properties"));
propService.load(CommonConfigurations.class.getClassLoader().getResourceAsStream("common.properties"));
} catch (Exception e) {
propService = null;
}

View File

@@ -1,52 +0,0 @@
package com.galaxy.tsg.config;
/**
* Created by wk on 2021/1/6.
*/
public class commonConfig {
public static final String KAFKA_CONSUMER_BROKER = commonConfigurations.getStringProperty("kafka.consumer.broker");
public static final String KAFKA_CONSUMER_GROUP_ID = commonConfigurations.getStringProperty("kafka.consumer.group.id");
public static final String KAFKA_CONSUMER_TOPIC = commonConfigurations.getStringProperty("kafka.consumer.topic");
public static final int KAFKA_CONSUMER_PARALLELISM = commonConfigurations.getIntProperty("kafka.consumer.parallelism");
public static final String KAFKA_CONSUMER_SESSION_TIMEOUT_MS= commonConfigurations.getStringProperty("kafka.consumer.session.timeout.ms");
public static final String KAFKA_CONSUMER_MAX_POLL_RECORD= commonConfigurations.getStringProperty("kafka.consumer.max.poll.records");
public static final String KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES= commonConfigurations.getStringProperty("kafka.consumer.max.partition.fetch.bytes");
public static final String KAFKA_PRODUCER_TOPIC = commonConfigurations.getStringProperty("kafka.producer.topic");
public static final String KAFKA_PRODUCER_RETRIES = commonConfigurations.getStringProperty("kafka.producer.retries");
public static final String KAFKA_PRODUCER_LINGER_MS = commonConfigurations.getStringProperty("kafka.producer.linger.ms");
public static final String KAFKA_PRODUCER_REQUEST_TIMEOUT_MS = commonConfigurations.getStringProperty("kafka.producer.request.timeout.ms");
public static final String KAFKA_PRODUCER_BATCH_SIZE = commonConfigurations.getStringProperty("kafka.producer.batch.size");
public static final String KAFKA_PRODUCER_BUFFER_MEMORY = commonConfigurations.getStringProperty("kafka.producer.buffer.memory");
public static final String KAFKA_PRODUCER_MAX_REQUEST_SIZE = commonConfigurations.getStringProperty("kafka.producer.max.request.size");
public static final String KAFKA_PRODUCER_COMPRESSION_TYPE = commonConfigurations.getStringProperty("kafka.producer.compression.type");
public static final String KAFKA_PRODUCER_BROKER = commonConfigurations.getStringProperty("kafka_producer_broker");
public static final String JOB_NAME = commonConfigurations.getStringProperty("job.name");
public static final int TASK_PARALLELISM = commonConfigurations.getIntProperty("task.parallelism");
public static final int ORDERBY_PARALLELISM = commonConfigurations.getIntProperty("orderby.parallelism");
public static final int SINK_PARALLELISM = commonConfigurations.getIntProperty("sink.parallelism");
public static final int WATERMARK_TIME = commonConfigurations.getIntProperty("watermark.time");
public static final int WINDOW_TIME_MINUTE = commonConfigurations.getIntProperty("window.time.minute");
public static final int TOP_LIMIT = commonConfigurations.getIntProperty("top.limit");
public static final String KAFKA_CONSUMER_USER = commonConfigurations.getStringProperty("kafka.consumer.user");
public static final String KAFKA_CONSUMER_PIN = commonConfigurations.getStringProperty("kafka.consumer.pin");
public static final int KAFKA_CONSUMER_SECURITY = commonConfigurations.getIntProperty("kafka.consumer.security");
public static final String TOOLS_CONSUMER_LIBRARY = commonConfigurations.getStringProperty("tools.consumer.library");
public static final String KAFKA_PRODUCER_USER = commonConfigurations.getStringProperty("kafka.producer.user");
public static final String KAFKA_PRODUCER_PIN = commonConfigurations.getStringProperty("kafka.producer.pin");
public static final int KAFKA_PRODUCER_SECURITY = commonConfigurations.getIntProperty("kafka.producer.security");
public static final String TOOLS_PRODUCER_LIBRARY = commonConfigurations.getStringProperty("tools.producer.library");
}

View File

@@ -0,0 +1,89 @@
package com.galaxy.tsg.function;
import com.galaxy.tsg.pojo.SessionEntity;
import com.galaxy.tsg.pojo.TransformEntity;
import com.geedgenetworks.utils.FormatUtils;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FlatMapFunction implements org.apache.flink.api.common.functions.FlatMapFunction<SessionEntity, TransformEntity> {
private static final Logger LOG = LoggerFactory.getLogger(FlatMapFunction.class);
@Override
public void flatMap(SessionEntity sessionEntity, Collector<TransformEntity> out) throws Exception {
TransformEntity transformEntity = new TransformEntity();
try {
transformEntity.setServer_ip(sessionEntity.getServer_ip());
transformEntity.setClient_ip(sessionEntity.getClient_ip());
transformEntity.setSubscriber_id(sessionEntity.getSubscriber_id());
transformEntity.setFqdn(sessionEntity.getServer_fqdn());
if(sessionEntity.getHttp_host()!=null && !sessionEntity.getHttp_host().isEmpty()){
transformEntity.setDomain(FormatUtils.getTopPrivateDomain(sessionEntity.getHttp_host()));
}
else if(sessionEntity.getSsl_sni()!=null && !sessionEntity.getSsl_sni().isEmpty()) {
transformEntity.setDomain(FormatUtils.getTopPrivateDomain(sessionEntity.getSsl_sni()));
}
else if(sessionEntity.getDtls_sni()!=null && !sessionEntity.getDtls_sni().isEmpty()){
transformEntity.setDomain(FormatUtils.getTopPrivateDomain(sessionEntity.getDtls_sni()));
}
if(sessionEntity.getQuic_sni()!=null && !sessionEntity.getQuic_sni().isEmpty()){
transformEntity.setDomain(FormatUtils.getTopPrivateDomain(sessionEntity.getQuic_sni()));
}
if(transformEntity.getDomain()!=null && transformEntity.getDomain().contains("InternetDomain")){
System.out.println(transformEntity.getDomain());
}
transformEntity.setDevice_group(sessionEntity.getDevice_group());
transformEntity.setDevice_id(sessionEntity.getDevice_id());
transformEntity.setData_center(sessionEntity.getData_center());
transformEntity.setVsys_id(sessionEntity.getVsys_id());
transformEntity.setTimestamp(sessionEntity.getRecv_time()/1000);
transformEntity.setSessions(1);
//
if ((8L & sessionEntity.getFlags()) == 8L) {
transformEntity.setOut_bytes(sessionEntity.getSent_bytes());
transformEntity.setOut_pkts(sessionEntity.getSent_pkts());
transformEntity.setIn_bytes(sessionEntity.getReceived_bytes());
transformEntity.setIn_pkts(sessionEntity.getReceived_pkts());
} else {
transformEntity.setOut_bytes(sessionEntity.getReceived_bytes());
transformEntity.setOut_pkts(sessionEntity.getReceived_pkts());
transformEntity.setIn_bytes(sessionEntity.getSent_bytes());
transformEntity.setIn_pkts(sessionEntity.getSent_pkts());
}
if(sessionEntity.getFlags()>0) {
if ((8L & sessionEntity.getFlags()) == 8L && (16L & sessionEntity.getFlags()) == 16L) {
transformEntity.setInternal_ip(sessionEntity.getServer_ip());
out.collect(transformEntity);
transformEntity.setInternal_ip(sessionEntity.getClient_ip());
} else if ((8L & sessionEntity.getFlags()) == 8L && (16L & sessionEntity.getFlags()) != 16L) {
transformEntity.setInternal_ip(sessionEntity.getClient_ip());
transformEntity.setExternal_ip(sessionEntity.getServer_ip());
} else if ((8L & sessionEntity.getFlags()) != 8L && (16L & sessionEntity.getFlags()) == 16L) {
transformEntity.setInternal_ip(sessionEntity.getServer_ip());
transformEntity.setExternal_ip(sessionEntity.getClient_ip());
} else if ((8L & sessionEntity.getFlags()) != 8L && (16L & sessionEntity.getFlags()) != 16L) {
transformEntity.setExternal_ip(sessionEntity.getServer_ip());
out.collect(transformEntity);
transformEntity.setExternal_ip(sessionEntity.getClient_ip());
}
}
} catch (Exception e) {
LOG.error("Entity Parsing ERROR");
transformEntity.setIfError(1);
}
out.collect(transformEntity);
}
}

View File

@@ -1,12 +1,12 @@
package com.galaxy.tsg.function;
import com.galaxy.tsg.pojo.transformEntity;
import com.galaxy.tsg.pojo.TransformEntity;
import org.apache.flink.api.common.functions.ReduceFunction;
public class metricsAggregationReduce implements ReduceFunction<transformEntity> {
public class MetricsAggregationReduce implements ReduceFunction<TransformEntity> {
@Override
public transformEntity reduce(transformEntity value1, transformEntity value2) throws Exception {
public TransformEntity reduce(TransformEntity value1, TransformEntity value2) throws Exception {
value1.setOut_pkts(value1.getOut_pkts() + value2.getOut_pkts());
value1.setOut_bytes(value1.getOut_bytes() + value2.getOut_bytes());
value1.setIn_bytes(value1.getIn_bytes() + value2.getIn_bytes());

View File

@@ -6,9 +6,9 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class metricsCalculate extends ProcessWindowFunction<
transformEntity, // 输入类型
resultEntity, // 输出类型
public class MetricsCalculate extends ProcessWindowFunction<
TransformEntity, // 输入类型
ResultEntity, // 输出类型
Tuple5<String, Long, String, String, String>, // 键类型
TimeWindow> { // 窗口类型
@@ -16,25 +16,25 @@ public class metricsCalculate extends ProcessWindowFunction<
@Override
public void process(Tuple5<String, Long, String, String, String> s,
Context context,
Iterable<transformEntity> elements, Collector<resultEntity> out) throws Exception {
Iterable<TransformEntity> elements, Collector<ResultEntity> out) throws Exception {
if (elements.iterator().hasNext()) {
transformEntity objectTransformEntity = elements.iterator().next();
resultEntity enSession = new resultEntity();
TransformEntity objectTransformEntity = elements.iterator().next();
ResultEntity enSession = new ResultEntity();
enSession.setOrder_by("sessions");
enSession.setStat_time(context.window().getStart());
enSession.setSessionResultEntity(enrichessionResult(context.window().getStart(), objectTransformEntity));
out.collect(enSession);
resultEntity enPacket = new resultEntity();
ResultEntity enPacket = new ResultEntity();
enPacket.setOrder_by("packets");
enPacket.setStat_time(context.window().getStart());
enPacket.setPacketResultEntity(enrichPacketResult(context.window().getStart() , objectTransformEntity));
out.collect(enPacket);
resultEntity enbyte = new resultEntity();
ResultEntity enbyte = new ResultEntity();
enbyte.setOrder_by("bytes");
enbyte.setStat_time(context.window().getStart());
enbyte.setByteResultEntity(enrichByteResult(context.window().getStart(), objectTransformEntity));
@@ -46,8 +46,8 @@ public class metricsCalculate extends ProcessWindowFunction<
}
public byteResultEntity enrichByteResult(Long time, transformEntity objectTransformEntity) {
byteResultEntity en = new byteResultEntity();
public ByteResultEntity enrichByteResult(Long time, TransformEntity objectTransformEntity) {
ByteResultEntity en = new ByteResultEntity();
en.setVsys_id(objectTransformEntity.getVsys_id());
en.setTimestamp_ms(time);
en.setSessions(objectTransformEntity.getSessions());
@@ -72,9 +72,9 @@ public class metricsCalculate extends ProcessWindowFunction<
}
public sessionResultEntity enrichessionResult(Long time, transformEntity objectTransformEntity) {
public SessionResultEntity enrichessionResult(Long time, TransformEntity objectTransformEntity) {
sessionResultEntity en = new sessionResultEntity();
SessionResultEntity en = new SessionResultEntity();
en.setVsys_id(objectTransformEntity.getVsys_id());
en.setTimestamp_ms(time);
en.setSessions(objectTransformEntity.getSessions());
@@ -96,8 +96,8 @@ public class metricsCalculate extends ProcessWindowFunction<
return en;
}
public packetResultEntity enrichPacketResult(Long time, transformEntity objectTransformEntity) {
packetResultEntity en = new packetResultEntity();
public PacketResultEntity enrichPacketResult(Long time, TransformEntity objectTransformEntity) {
PacketResultEntity en = new PacketResultEntity();
en.setVsys_id(objectTransformEntity.getVsys_id());
en.setTimestamp_ms(time);
en.setSessions(objectTransformEntity.getSessions());

View File

@@ -11,14 +11,14 @@ import org.apache.flink.util.Collector;
import java.util.Map;
import java.util.PriorityQueue;
public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEntity, String> {
public class TopnHotItems extends KeyedProcessFunction<Tuple1<String>, ResultEntity, String> {
private final int topSize;
private PriorityQueue<sessionResultEntity> sessionOrderEntity ;
private PriorityQueue<packetResultEntity> packetOrderEntity ;
private PriorityQueue<byteResultEntity> byteOrderEntity ;
private PriorityQueue<SessionResultEntity> sessionOrderEntity ;
private PriorityQueue<PacketResultEntity> packetOrderEntity ;
private PriorityQueue<ByteResultEntity> byteOrderEntity ;
public topnHotItems(int i) {
public TopnHotItems(int i) {
this.topSize = i;
@@ -36,7 +36,7 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn
@Override
public void processElement(resultEntity objectEntity, Context context, Collector<String> collector) {
public void processElement(ResultEntity objectEntity, Context context, Collector<String> collector) {
@@ -49,7 +49,7 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn
sessionOrderEntity.add(objectEntity.getSessionResultEntity());
} else {
if (sessionOrderEntity.peek() != null) {
sessionResultEntity res=sessionOrderEntity.peek();
SessionResultEntity res=sessionOrderEntity.peek();
if (res.getSessions() <= objectEntity.getSessionResultEntity().getSessions()) {
sessionOrderEntity.poll();
sessionOrderEntity.add(objectEntity.getSessionResultEntity());
@@ -63,7 +63,7 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn
packetOrderEntity.add(objectEntity.getPacketResultEntity());
} else {
if (packetOrderEntity.peek() != null) {
packetResultEntity res=packetOrderEntity.peek();
PacketResultEntity res=packetOrderEntity.peek();
if ((res.getIn_pkts()+res.getOut_pkts()) <= (objectEntity.getPacketResultEntity().getIn_pkts()+objectEntity.getPacketResultEntity().getOut_pkts())) {
packetOrderEntity.poll();
packetOrderEntity.add(objectEntity.getPacketResultEntity());
@@ -76,7 +76,7 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn
byteOrderEntity.add(objectEntity.getByteResultEntity());
} else {
if (byteOrderEntity.peek() != null) {
byteResultEntity res=byteOrderEntity.peek();
ByteResultEntity res=byteOrderEntity.peek();
if ((res.getIn_bytes()+res.getOut_bytes()) <= (objectEntity.getByteResultEntity().getIn_bytes()+objectEntity.getByteResultEntity().getOut_bytes())) {
byteOrderEntity.poll();
byteOrderEntity.add(objectEntity.getByteResultEntity());
@@ -96,9 +96,9 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
for(sessionResultEntity en : sessionOrderEntity) {
for(SessionResultEntity en : sessionOrderEntity) {
metricsEntity metricsEntity = new metricsEntity();
MetricsEntity metricsEntity = new MetricsEntity();
metricsEntity.setName("sessions_top_" + en.getKey_by());
Map tags = new LinkedMap();
tags.put("vsys_id", en.getVsys_id());
@@ -145,10 +145,10 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn
}
for(packetResultEntity en : packetOrderEntity){
for(PacketResultEntity en : packetOrderEntity){
metricsEntity metricsEntity = new metricsEntity();
MetricsEntity metricsEntity = new MetricsEntity();
metricsEntity.setName("packets_top_" + en.getKey_by());
Map tags = new LinkedMap();
tags.put("vsys_id", en.getVsys_id());
@@ -195,15 +195,9 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn
}
for(byteResultEntity en : byteOrderEntity){
for(ByteResultEntity en : byteOrderEntity){
metricsEntity metricsEntity = new metricsEntity();
MetricsEntity metricsEntity = new MetricsEntity();
metricsEntity.setName("bytes_top_" + en.getKey_by());
Map tags = new LinkedMap();
tags.put("vsys_id", en.getVsys_id());

View File

@@ -1,6 +1,6 @@
package com.galaxy.tsg.pojo;
public class byteResultEntity implements Comparable<byteResultEntity> {
public class ByteResultEntity implements Comparable<ByteResultEntity> {
@@ -191,7 +191,7 @@ public class byteResultEntity implements Comparable<byteResultEntity> {
}
@Override
public int compareTo(byteResultEntity per) {
public int compareTo(ByteResultEntity per) {
if((this.out_bytes+this.in_bytes)>=(per.out_bytes+per.in_bytes)){
return 1 ;
}else{

View File

@@ -2,7 +2,7 @@ package com.galaxy.tsg.pojo;
import java.util.Map;
public class metricsEntity {
public class MetricsEntity {
private String name;
private Map<String,String> tags;
private Map<String,Long> fields;

View File

@@ -1,6 +1,6 @@
package com.galaxy.tsg.pojo;
public class packetResultEntity implements Comparable<packetResultEntity>, Cloneable {
public class PacketResultEntity implements Comparable<PacketResultEntity>, Cloneable {
private long timestamp_ms;
@@ -189,7 +189,7 @@ public class packetResultEntity implements Comparable<packetResultEntity>, Clone
}
@Override
public int compareTo(packetResultEntity per) {
public int compareTo(PacketResultEntity per) {
if((this.out_pkts+this.in_pkts)>=(per.out_pkts+per.in_pkts)){
return 1 ;
}else{

View File

@@ -0,0 +1,56 @@
package com.galaxy.tsg.pojo;
public class ResultEntity {
private String order_by;
private Long stat_time;
private SessionResultEntity sessionResultEntity;
private PacketResultEntity packetResultEntity;
private ByteResultEntity byteResultEntity;
public String getOrder_by() {
return order_by;
}
public void setOrder_by(String order_by) {
this.order_by = order_by;
}
public Long getStat_time() {
return stat_time;
}
public void setStat_time(Long stat_time) {
this.stat_time = stat_time;
}
public SessionResultEntity getSessionResultEntity() {
return sessionResultEntity;
}
public void setSessionResultEntity(SessionResultEntity sessionResultEntity) {
this.sessionResultEntity = sessionResultEntity;
}
public PacketResultEntity getPacketResultEntity() {
return packetResultEntity;
}
public void setPacketResultEntity(PacketResultEntity packetResultEntity) {
this.packetResultEntity = packetResultEntity;
}
public ByteResultEntity getByteResultEntity() {
return byteResultEntity;
}
public void setByteResultEntity(ByteResultEntity byteResultEntity) {
this.byteResultEntity = byteResultEntity;
}
}

View File

@@ -0,0 +1,200 @@
package com.galaxy.tsg.pojo;
import java.io.Serializable;
public class SessionEntity implements Serializable{
public String client_ip ;
public String app ;
public long recv_time ;
public String server_fqdn;
public long flags;
public String decoded_as ;
public String server_ip ;
public String http_host ;
public String ssl_sni ;
public String dtls_sni ;
public String quic_sni ;
public long vsys_id ;
public String device_group ;
public String device_id ;
public String data_center;
public String subscriber_id;
public long sent_pkts;
public long received_pkts;
public long sent_bytes ;
public long received_bytes;
public SessionEntity() {
}
public String getDevice_id() {
return device_id;
}
public void setDevice_id(String device_id) {
this.device_id = device_id;
}
public String getClient_ip() {
return client_ip;
}
public void setClient_ip(String client_ip) {
this.client_ip = client_ip;
}
public String getApp() {
return app;
}
public void setApp(String common_app_label) {
this.app = app;
}
public long getRecv_time() {
return recv_time;
}
public void setRecv_time(long recv_time) {
this.recv_time = recv_time;
}
public String getDecoded_as() {
return decoded_as;
}
public void setDecoded_as(String decoded_as) {
this.decoded_as = decoded_as;
}
public String getServer_ip() {
return server_ip;
}
public void setServer_ip(String server_ip) {
this.server_ip = server_ip;
}
public String getHttp_host() {
return http_host;
}
public void setHttp_host(String http_host) {
this.http_host = http_host;
}
public String getSsl_sni() {
return ssl_sni;
}
public void setSsl_sni(String ssl_sni) {
this.ssl_sni = ssl_sni;
}
public String getDtls_sni() {
return dtls_sni;
}
public void setDtls_sni(String dtls_sni) {
this.dtls_sni = dtls_sni;
}
public String getQuic_sni() {
return quic_sni;
}
public void setQuic_sni(String quic_sni) {
this.quic_sni = quic_sni;
}
public long getVsys_id() {
return vsys_id;
}
public void setVsys_id(long common_vsys_id) {
this.vsys_id = vsys_id;
}
public String getDevice_group() {
return device_group;
}
public void setDevice_group(String device_group) {
this.device_group = device_group;
}
public String getData_center() {
return data_center;
}
public void setData_center(String data_center) {
this.data_center = data_center;
}
public String getSubscriber_id() {
return subscriber_id;
}
public void setSubscriber_id(String subscriber_id) {
this.subscriber_id = subscriber_id;
}
public long getSent_pkts() {
return sent_pkts;
}
public void setSent_pkts(long sent_pkts) {
this.sent_pkts = sent_pkts;
}
public long getReceived_pkts() {
return received_pkts;
}
public void setReceived_pkts(long received_pkts) {
this.received_pkts = received_pkts;
}
public long getSent_bytes() {
return sent_bytes;
}
public void setSent_bytes(long sent_bytes) {
this.sent_bytes = sent_bytes;
}
public long getReceived_bytes() {
return received_bytes;
}
public void setReceived_bytes(long received_bytes) {
this.received_bytes = received_bytes;
}
public String getServer_fqdn() {
return server_fqdn;
}
public void setServer_fqdn(String server_fqdn) {
this.server_fqdn = server_fqdn;
}
public long getFlags() {
return flags;
}
public void setFlags(long flags) {
this.flags = flags;
}
}

View File

@@ -1,6 +1,6 @@
package com.galaxy.tsg.pojo;
public class sessionResultEntity implements Comparable<sessionResultEntity> {
public class SessionResultEntity implements Comparable<SessionResultEntity> {
@@ -189,7 +189,7 @@ public class sessionResultEntity implements Comparable<sessionResultEntity> {
}
@Override
public int compareTo(sessionResultEntity per) {
public int compareTo(SessionResultEntity per) {
if(this.sessions>=per.sessions){
return 1 ;
}else{

View File

@@ -2,32 +2,32 @@ package com.galaxy.tsg.pojo;
import java.io.Serializable;
public class transformEntity implements Serializable {
public class TransformEntity implements Serializable,Cloneable {
public int ifError;
public long timestamp;
private int ifError;
private long timestamp;
public String fqdn;
public String server_ip ;
public String domain ;
public long vsys_id ;
public String device_group ;
public String client_ip ;
public String device_id;
public String data_center;
public String internal_ip;
public String external_ip;
public String subscriber_id;
public long sessions;
public long out_bytes;
public long in_bytes;
public long out_pkts ;
public long in_pkts ;
public String key_by;
public String l4_protocol;
private String fqdn;
private String server_ip ;
private String domain ;
private long vsys_id ;
private String device_group ;
private String client_ip ;
private String device_id;
private String data_center;
private String internal_ip;
private String external_ip;
private String subscriber_id;
private long sessions;
private long out_bytes;
private long in_bytes;
private long out_pkts ;
private long in_pkts ;
private String key_by;
private String l4_protocol;
public transformEntity() {
public TransformEntity() {
}
public String getL4_protocol() {
@@ -191,4 +191,9 @@ public class transformEntity implements Serializable {
public void setKey_by(String key_by) {
this.key_by = key_by;
}
@Override
public Object clone() throws CloneNotSupportedException {
return super.clone();
}
}

View File

@@ -1,56 +0,0 @@
package com.galaxy.tsg.pojo;
public class resultEntity {
private String order_by;
private Long stat_time;
private com.galaxy.tsg.pojo.sessionResultEntity sessionResultEntity;
private com.galaxy.tsg.pojo.packetResultEntity packetResultEntity;
private com.galaxy.tsg.pojo.byteResultEntity byteResultEntity;
public String getOrder_by() {
return order_by;
}
public void setOrder_by(String order_by) {
this.order_by = order_by;
}
public Long getStat_time() {
return stat_time;
}
public void setStat_time(Long stat_time) {
this.stat_time = stat_time;
}
public com.galaxy.tsg.pojo.sessionResultEntity getSessionResultEntity() {
return sessionResultEntity;
}
public void setSessionResultEntity(com.galaxy.tsg.pojo.sessionResultEntity sessionResultEntity) {
this.sessionResultEntity = sessionResultEntity;
}
public com.galaxy.tsg.pojo.packetResultEntity getPacketResultEntity() {
return packetResultEntity;
}
public void setPacketResultEntity(com.galaxy.tsg.pojo.packetResultEntity packetResultEntity) {
this.packetResultEntity = packetResultEntity;
}
public com.galaxy.tsg.pojo.byteResultEntity getByteResultEntity() {
return byteResultEntity;
}
public void setByteResultEntity(com.galaxy.tsg.pojo.byteResultEntity byteResultEntity) {
this.byteResultEntity = byteResultEntity;
}
}

View File

@@ -1,204 +0,0 @@
package com.galaxy.tsg.pojo;
import java.io.Serializable;
public class sessionEntity implements Serializable {
public String common_client_ip ;
public String common_app_label ;
public long common_recv_time ;
public String common_server_fqdn;
public long common_flags;
public String common_schema_type ;
public String common_server_ip ;
public String http_domain ;
public long common_vsys_id ;
public String common_device_group ;
public String common_device_id ;
public String common_data_center;
public String common_l4_protocol;
public String common_internal_ip;
public String common_external_ip;
public String common_subscriber_id;
public long common_sessions;
public long common_c2s_pkt_num;
public long common_s2c_pkt_num;
public long common_c2s_byte_num ;
public long common_s2c_byte_num ;
public sessionEntity() {
}
public String getCommon_device_id() {
return common_device_id;
}
public void setCommon_device_id(String common_device_id) {
this.common_device_id = common_device_id;
}
public String getCommon_client_ip() {
return common_client_ip;
}
public void setCommon_client_ip(String common_client_ip) {
this.common_client_ip = common_client_ip;
}
public String getCommon_app_label() {
return common_app_label;
}
public void setCommon_app_label(String common_app_label) {
this.common_app_label = common_app_label;
}
public long getCommon_recv_time() {
return common_recv_time;
}
public void setCommon_recv_time(long common_recv_time) {
this.common_recv_time = common_recv_time;
}
public String getCommon_schema_type() {
return common_schema_type;
}
public void setCommon_schema_type(String common_schema_type) {
this.common_schema_type = common_schema_type;
}
public String getCommon_server_ip() {
return common_server_ip;
}
public void setCommon_server_ip(String common_server_ip) {
this.common_server_ip = common_server_ip;
}
public String getHttp_domain() {
return http_domain;
}
public void setHttp_domain(String http_domain) {
this.http_domain = http_domain;
}
public long getCommon_vsys_id() {
return common_vsys_id;
}
public void setCommon_vsys_id(long common_vsys_id) {
this.common_vsys_id = common_vsys_id;
}
public String getCommon_device_group() {
return common_device_group;
}
public void setCommon_device_group(String common_device_group) {
this.common_device_group = common_device_group;
}
public String getCommon_data_center() {
return common_data_center;
}
public void setCommon_data_center(String common_data_center) {
this.common_data_center = common_data_center;
}
public String getCommon_l4_protocol() {
return common_l4_protocol;
}
public void setCommon_l4_protocol(String common_l4_protocol) {
this.common_l4_protocol = common_l4_protocol;
}
public String getCommon_internal_ip() {
return common_internal_ip;
}
public void setCommon_internal_ip(String common_internal_ip) {
this.common_internal_ip = common_internal_ip;
}
public String getCommon_external_ip() {
return common_external_ip;
}
public void setCommon_external_ip(String common_external_ip) {
this.common_external_ip = common_external_ip;
}
public String getCommon_subscriber_id() {
return common_subscriber_id;
}
public void setCommon_subscriber_id(String common_subscriber_id) {
this.common_subscriber_id = common_subscriber_id;
}
public long getCommon_sessions() {
return common_sessions;
}
public void setCommon_sessions(long common_sessions) {
this.common_sessions = common_sessions;
}
public long getCommon_c2s_pkt_num() {
return common_c2s_pkt_num;
}
public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) {
this.common_c2s_pkt_num = common_c2s_pkt_num;
}
public long getCommon_s2c_pkt_num() {
return common_s2c_pkt_num;
}
public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) {
this.common_s2c_pkt_num = common_s2c_pkt_num;
}
public long getCommon_c2s_byte_num() {
return common_c2s_byte_num;
}
public void setCommon_c2s_byte_num(long common_c2s_byte_num) {
this.common_c2s_byte_num = common_c2s_byte_num;
}
public long getCommon_s2c_byte_num() {
return common_s2c_byte_num;
}
public void setCommon_s2c_byte_num(long common_s2c_byte_num) {
this.common_s2c_byte_num = common_s2c_byte_num;
}
public String getCommon_server_fqdn() {
return common_server_fqdn;
}
public void setCommon_server_fqdn(String common_server_fqdn) {
this.common_server_fqdn = common_server_fqdn;
}
public long getCommon_flags() {
return common_flags;
}
public void setCommon_flags(long common_flags) {
this.common_flags = common_flags;
}
}

View File

@@ -0,0 +1,48 @@
package com.galaxy.tsg.selector;
import com.galaxy.tsg.pojo.TransformEntity;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple5;
public class GroupBySelector implements KeySelector<TransformEntity, Tuple5<String, Long, String, String, String>> {
public String key;
public GroupBySelector(String key) {
this.key = key;
}
@Override
public Tuple5<String, Long, String, String, String> getKey(TransformEntity transformEntity) throws Exception {
Tuple5<String, Long, String, String, String> tuple = null;
transformEntity.setKey_by(key);
switch (key) {
case "client_ip":
tuple = new Tuple5<>(transformEntity.getClient_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
break;
case "server_ip":
tuple = new Tuple5<>(transformEntity.getServer_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
break;
case "internal_ip":
tuple = new Tuple5<>(transformEntity.getInternal_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
break;
case "external_ip":
tuple = new Tuple5<>(transformEntity.getExternal_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
break;
case "server_domain":
tuple = new Tuple5<>(transformEntity.getDomain(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
break;
case "subscriber_id":
tuple = new Tuple5<>(transformEntity.getSubscriber_id(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
break;
case "server_fqdn":
tuple = new Tuple5<>(transformEntity.getFqdn(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
break;
default:
}
return tuple;
}
}

View File

@@ -0,0 +1,13 @@
package com.galaxy.tsg.selector;
import com.galaxy.tsg.pojo.ResultEntity;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
public class OrderBySelector implements KeySelector<ResultEntity, Tuple1<String>> {
@Override
public Tuple1<String> getKey(ResultEntity entity) throws Exception {
return new Tuple1<>(entity.getOrder_by());
}
}

View File

@@ -0,0 +1,114 @@
package com.galaxy.tsg.util;
import com.galaxy.tsg.config.CommonConfig;
import com.galaxy.tsg.pojo.SessionEntity;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Optional;
import java.util.Properties;
import static com.galaxy.tsg.config.CommonConfig.KAFKA_CONSUMER_TOPIC;
import static com.galaxy.tsg.config.CommonConfig.KAFKA_PRODUCER_TOPIC;
public class KafkaUtils {
public static Properties getKafkaSourceProperty(Configuration configuration) {
Properties properties = new Properties();
properties.setProperty("group.id", configuration.getString(CommonConfig.KAFKA_CONSUMER_GROUP_ID));
properties.setProperty("bootstrap.servers",configuration.getString(CommonConfig.KAFKA_CONSUMER_BROKER));
properties.setProperty("session.timeout.ms", configuration.getString(CommonConfig.KAFKA_CONSUMER_SESSION_TIMEOUT_MS));
properties.setProperty("max.poll.records", configuration.getString(CommonConfig.KAFKA_CONSUMER_MAX_POLL_RECORD));
properties.setProperty("max.partition.fetch.bytes", configuration.getString(CommonConfig.KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES));
switch (configuration.getInteger(CommonConfig.KAFKA_CONSUMER_SECURITY)) {
case 1:
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put("ssl.keystore.location", configuration.getString(CommonConfig.TOOLS_CONSUMER_LIBRARY) + "keystore.jks");
properties.put("ssl.keystore.password", configuration.getString(CommonConfig.KAFKA_CONSUMER_PIN));
properties.put("ssl.truststore.location", configuration.getString(CommonConfig.TOOLS_CONSUMER_LIBRARY) + "truststore.jks");
properties.put("ssl.truststore.password", configuration.getString(CommonConfig.KAFKA_CONSUMER_PIN));
properties.put("ssl.key.password", configuration.getString(CommonConfig.KAFKA_CONSUMER_PIN));
break;
case 2:
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ configuration.getString(CommonConfig.KAFKA_CONSUMER_USER) + " password=" + configuration.getString(CommonConfig.KAFKA_CONSUMER_PIN) + ";");
break;
default:
}
return properties;
}
private static Properties getKafkaSinkProperty(Configuration configuration) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers",configuration.getString(CommonConfig.KAFKA_PRODUCER_BROKER));
properties.put("acks", "1");
properties.put("retries", configuration.getString(CommonConfig.KAFKA_PRODUCER_RETRIES));
properties.put("linger.ms", configuration.getString(CommonConfig.KAFKA_PRODUCER_LINGER_MS));
properties.put("request.timeout.ms", configuration.getString(CommonConfig.KAFKA_PRODUCER_REQUEST_TIMEOUT_MS));
properties.put("batch.size", configuration.getString(CommonConfig.KAFKA_PRODUCER_BATCH_SIZE));
properties.put("buffer.memory", configuration.getString(CommonConfig.KAFKA_PRODUCER_BUFFER_MEMORY));
properties.put("max.request.size", configuration.getString(CommonConfig.KAFKA_PRODUCER_MAX_REQUEST_SIZE));
properties.put("compression.type", configuration.getString(CommonConfig.KAFKA_PRODUCER_COMPRESSION_TYPE));
switch (configuration.getInteger(CommonConfig.KAFKA_PRODUCER_SECURITY)) {
case 1:
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put("ssl.keystore.location", configuration.getString(CommonConfig.TOOLS_PRODUCER_LIBRARY) + "keystore.jks");
properties.put("ssl.keystore.password", configuration.getString(CommonConfig.KAFKA_PRODUCER_PIN));
properties.put("ssl.truststore.location", configuration.getString(CommonConfig.TOOLS_PRODUCER_LIBRARY) + "truststore.jks");
properties.put("ssl.truststore.password", configuration.getString(CommonConfig.KAFKA_PRODUCER_PIN));
properties.put("ssl.key.password", configuration.getString(CommonConfig.KAFKA_PRODUCER_PIN));
break;
case 2:
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ configuration.getString(CommonConfig.KAFKA_PRODUCER_USER) + " password=" + configuration.getString(CommonConfig.KAFKA_PRODUCER_PIN) + ";");
break;
default:
}
return properties;
}
public static FlinkKafkaConsumer<SessionEntity> getKafkaConsumer(Configuration configuration) {
FlinkKafkaConsumer<SessionEntity> kafkaConsumer = new FlinkKafkaConsumer<>(configuration.getString(KAFKA_CONSUMER_TOPIC),
new TimestampDeserializationSchema(), getKafkaSourceProperty(configuration));
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
}
/* public static FlinkKafkaConsumer<String> getKafkaConsumerLists(List<String> topic) {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
new SimpleStringSchema(), getKafkaSourceProperty(configuration));
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
}*/
public static SinkFunction<String> getKafkaSink(Configuration configuration) {
FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<String>(
configuration.getString(KAFKA_PRODUCER_TOPIC),
new SimpleStringSchema(),
getKafkaSinkProperty(configuration),
Optional.empty()
);
flinkKafkaProducer.setLogFailuresOnly(false);
return flinkKafkaProducer;
}
}

View File

@@ -0,0 +1,48 @@
package com.galaxy.tsg.util;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSON;
import com.galaxy.tsg.pojo.SessionEntity;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* @author qidaijie
* @version 2022/3/89:42
*/
public class TimestampDeserializationSchema implements KafkaDeserializationSchema {
private static final Log logger = LogFactory.get();
@Override
public TypeInformation getProducedType() {
return TypeInformation.of(new TypeHint<SessionEntity>() {});
}
@Override
public boolean isEndOfStream(Object nextElement) {
return false;
}
@Override
@SuppressWarnings("unchecked")
public SessionEntity deserialize(ConsumerRecord record) throws Exception {
if (record != null) {
try {
SessionEntity sessionEntity = JSON.parseObject((byte[]) record.value(), SessionEntity.class);
sessionEntity.setRecv_time(record.timestamp());
return sessionEntity;
} catch (RuntimeException e) {
logger.error(
"KafkaConsumer Deserialize failed,The exception is : " + e.getMessage());
e.printStackTrace();
}
}
return new SessionEntity();
}
}

View File

@@ -1,109 +0,0 @@
package com.galaxy.tsg.util;
import com.galaxy.tsg.config.commonConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.config.SslConfigs;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
public class kafkaUtils {
public static Properties getKafkaSourceProperty() {
Properties properties = new Properties();
properties.setProperty("group.id", commonConfig.KAFKA_CONSUMER_GROUP_ID);
properties.setProperty("bootstrap.servers", commonConfig.KAFKA_CONSUMER_BROKER);
properties.setProperty("session.timeout.ms", commonConfig.KAFKA_CONSUMER_SESSION_TIMEOUT_MS);
properties.setProperty("max.poll.records", commonConfig.KAFKA_CONSUMER_MAX_POLL_RECORD);
properties.setProperty("max.partition.fetch.bytes", commonConfig.KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES);
switch (commonConfig.KAFKA_CONSUMER_SECURITY) {
case 1:
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put("ssl.keystore.location", commonConfig.TOOLS_CONSUMER_LIBRARY + "keystore.jks");
properties.put("ssl.keystore.password", commonConfig.KAFKA_CONSUMER_PIN);
properties.put("ssl.truststore.location", commonConfig.TOOLS_CONSUMER_LIBRARY + "truststore.jks");
properties.put("ssl.truststore.password", commonConfig.KAFKA_CONSUMER_PIN);
properties.put("ssl.key.password", commonConfig.KAFKA_CONSUMER_PIN);
break;
case 2:
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ commonConfig.KAFKA_CONSUMER_USER + " password=" + commonConfig.KAFKA_CONSUMER_PIN + ";");
break;
default:
}
return properties;
}
private static Properties getKafkaSinkProperty() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", commonConfig.KAFKA_PRODUCER_BROKER);
properties.put("acks", "1");
properties.put("retries", commonConfig.KAFKA_PRODUCER_RETRIES);
properties.put("linger.ms", commonConfig.KAFKA_PRODUCER_LINGER_MS);
properties.put("request.timeout.ms", commonConfig.KAFKA_PRODUCER_REQUEST_TIMEOUT_MS);
properties.put("batch.size", commonConfig.KAFKA_PRODUCER_BATCH_SIZE);
properties.put("buffer.memory", commonConfig.KAFKA_PRODUCER_BUFFER_MEMORY);
properties.put("max.request.size", commonConfig.KAFKA_PRODUCER_MAX_REQUEST_SIZE);
properties.put("compression.type", commonConfig.KAFKA_PRODUCER_COMPRESSION_TYPE);
switch (commonConfig.KAFKA_PRODUCER_SECURITY) {
case 1:
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put("ssl.keystore.location", commonConfig.TOOLS_PRODUCER_LIBRARY + "keystore.jks");
properties.put("ssl.keystore.password", commonConfig.KAFKA_PRODUCER_PIN);
properties.put("ssl.truststore.location", commonConfig.TOOLS_PRODUCER_LIBRARY + "truststore.jks");
properties.put("ssl.truststore.password", commonConfig.KAFKA_PRODUCER_PIN);
properties.put("ssl.key.password", commonConfig.KAFKA_PRODUCER_PIN);
break;
case 2:
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ commonConfig.KAFKA_PRODUCER_USER + " password=" + commonConfig.KAFKA_PRODUCER_PIN + ";");
break;
default:
}
return properties;
}
public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic) {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
new SimpleStringSchema(), getKafkaSourceProperty());
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
}
public static FlinkKafkaConsumer<String> getKafkaConsumerLists(List<String> topic) {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
new SimpleStringSchema(), getKafkaSourceProperty());
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
}
public static SinkFunction<String> getKafkaSink(String topic) {
return new FlinkKafkaProducer<String>(
topic,
new SimpleStringSchema(),
getKafkaSinkProperty(),
Optional.empty()
);
}
}

View File

@@ -1,37 +1,37 @@
#--------------------------------Kafka<6B><61><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϣ------------------------------#
#kafka<6B>ĵ<EFBFBD>ַ<EFBFBD><D6B7>Ϣ
kafka.consumer.broker=192.168.44.11:9092
kafka.consumer.broker=192.168.54.241:9092
#kafka <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>topic
kafka.consumer.topic=SESSION-RECORD-COMPLETED
kafka.consumer.topic=SESSION-RECORD
#<23><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
kafka.consumer.group.id=topn-metrics-job-20231101-t1
kafka.consumer.group.id=topn-metrics-job-20231101-t1-t
#--------------------------------Kafka<6B><61><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϣ------------------------------#
#kafka<6B>ĵ<EFBFBD>ַ<EFBFBD><D6B7>Ϣ
kafka_producer_broker=192.168.44.12:9092
kafka.producer.broker=192.168.44.12:9094
kafka.producer.topic=TRAFFIC-TOP-METRIC
#--------------------------------topology<67><79><EFBFBD><EFBFBD>------------------------------#
#<23><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
job.name=TOPN-METRICS-JOB
job.name=agg_session_record_topn
#source<63><65><EFBFBD>ж<EFBFBD>
kafka.consumer.parallelism=1
kafka.consumer.parallelism=3
#<23><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ж<EFBFBD>
task.parallelism=1
task.parallelism=3
#<23><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ж<EFBFBD>
orderby.parallelism=1
orderby.parallelism=3
#<23><><EFBFBD><EFBFBD>жȣ<D0B6>ͨ<EFBFBD><CDA8><EFBFBD><EFBFBD><EFBFBD><EFBFBD>orderby.parallelism
sink.parallelism=1
sink.parallelism=3
#<23><><EFBFBD><EFBFBD><EFBFBD>ӳٵȴ<D9B5>ʱ<EFBFBD>䵥λ<E4B5A5><CEBB>
watermark.time=60
watermark.time=90
#top<6F><70><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
top.limit=10000
@@ -41,13 +41,7 @@ window.time.minute=5
#--------------------------------Kafka<6B><61><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>------------------------------#
#kafka source poll
kafka.consumer.max.poll.records=3000
#kafka source connection timeout
kafka.consumer.session.timeout.ms=60000
#kafka source poll bytes
kafka.consumer.max.partition.fetch.bytes=31457280
#kafka<6B><61><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ƿ<EFBFBD><C7B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ȫ<EFBFBD><C8AB>֤ 0<><30><EFBFBD><EFBFBD><EFBFBD><EFBFBD> 1SSL 2 SASL
kafka.consumer.security=0
@@ -62,7 +56,7 @@ kafka.consumer.pin=galaxy2019
tools.consumer.library=/home/bigdata/topology/dat/
#kafka<6B><61><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ƿ<EFBFBD><C7B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ȫ<EFBFBD><C8AB>֤ 0<><30><EFBFBD><EFBFBD><EFBFBD><EFBFBD> 1SSL 2 SASL
kafka.producer.security=0
kafka.producer.security=2
#kafka SASL<53><4C>֤<EFBFBD>û<EFBFBD><C3BB><EFBFBD>
kafka.producer.user=admin
@@ -73,23 +67,5 @@ kafka.producer.pin=galaxy2019
#1SSL<53><4C>Ҫ
tools.producer.library=/home/bigdata/topology/dat/
#producer<65><72><EFBFBD>ԵĴ<D4B5><C4B4><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
kafka.producer.retries=1
#<23><><EFBFBD>ĺ<EFBFBD><C4BA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>˵һ<CBB5><D2BB>Batch<63><68><EFBFBD><EFBFBD><EFBFBD><EFBFBD>֮<EFBFBD><D6AE><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ã<EFBFBD><C3A3><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Batch<63><68>û<EFBFBD><C3BB>д<EFBFBD><D0B4><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͳ<EFBFBD>ȥ<EFBFBD><C8A5>
kafka.producer.linger.ms=1
#<23><><EFBFBD><EFBFBD><EFBFBD>ڳ<EFBFBD>ʱ֮ǰδ<C7B0>յ<EFBFBD><D5B5><EFBFBD>Ӧ<EFBFBD><D3A6><EFBFBD>ͻ<EFBFBD><CDBB>˽<EFBFBD><CBBD>ڱ<EFBFBD>Ҫʱ<D2AA><CAB1><EFBFBD>·<EFBFBD><C2B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
kafka.producer.request.timeout.ms=30000
#producer<65><72><EFBFBD>ǰ<EFBFBD><C7B0><EFBFBD>batch<63><68><EFBFBD>з<EFBFBD><D0B7>͵<EFBFBD>,<2C><><EFBFBD>δ<EFBFBD>С<EFBFBD><D0A1>Ĭ<EFBFBD><C4AC>:16384
kafka.producer.batch.size=262144
#Producer<65><72><EFBFBD><EFBFBD><EFBFBD>ڻ<EFBFBD><DABB><EFBFBD><EFBFBD><EFBFBD>Ϣ<EFBFBD>Ļ<EFBFBD><C4BB><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>С
kafka.producer.buffer.memory=134217728
#<23><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ÿ<EFBFBD>η<EFBFBD><CEB7>͸<EFBFBD>Kafka<6B><61><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>С<><C4AC>1048576
kafka.producer.max.request.size=10485760
#<23><><EFBFBD><EFBFBD>kafkaѹ<61><D1B9><EFBFBD><EFBFBD><EFBFBD>ͣ<EFBFBD>Ĭ<EFBFBD>ϲ<EFBFBD><CFB2><EFBFBD><EFBFBD><EFBFBD>
kafka.producer.compression.type=none

View File

@@ -1,7 +0,0 @@
package com.galaxy.tsg.catalog;
public class CatalogTest {
public static void main(String[] args) {
}
}

View File

@@ -0,0 +1,110 @@
package com.galaxy.tsg.top;
import com.alibaba.fastjson2.JSON;
import com.galaxy.tsg.function.FlatMapFunction;
import com.galaxy.tsg.pojo.SessionEntity;
import com.galaxy.tsg.pojo.TransformEntity;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.*;
public class TopTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(1)
.setNumberTaskManagers(1)
.build());
@Test
public void testIncrementPipeline() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String jsonString1 = "{\"client_ip\":\"192.168.1.1\",\"app\":\"app1\",\"recv_time\":0,\"server_fqdn\":\"tuplogpublic.bangcdn.net\",\"flags\":24,\"decoded_as\":\"HTTP\",\"server_ip\":\"192.168.2.1\",\"http_host\":\"tuplogpublic.bangcdn.net\",\"vsys_id\":1,\"device_group\":\"group1\",\"device_id\":\"device_id1\",\"subscriber_id\":\"subscriber_id1\",\"data_center\":\"data_center1\",\"sent_pkts\":1,\"received_pkts\":1,\"sent_bytes\":1,\"received_bytes\":1 }";
String jsonString2 = "{\"client_ip\":\"192.168.1.2\",\"app\":\"app1\",\"recv_time\":0,\"server_fqdn\":\"tuplogpublic.bangcdn.net\",\"flags\":8,\"decoded_as\":\"HTTP\",\"server_ip\":\"192.168.2.2\",\"http_host\":\"tuplogpublic.bangcdn.net\",\"vsys_id\":1,\"device_group\":\"group1\",\"device_id\":\"device_id1\",\"subscriber_id\":\"subscriber_id1\",\"data_center\":\"data_center1\",\"sent_pkts\":2,\"received_pkts\":1,\"sent_bytes\":1,\"received_bytes\":1 }";
String jsonString3 = "{\"client_ip\":\"192.168.1.3\",\"app\":\"app1\",\"recv_time\":0,\"server_fqdn\":\"tuplogpublic.bangcdn.net\",\"flags\":16,\"decoded_as\":\"HTTP\",\"server_ip\":\"192.168.2.3\",\"http_host\":\"tuplogpublic.bangcdn.net\",\"vsys_id\":1,\"device_group\":\"group1\",\"device_id\":\"device_id1\",\"subscriber_id\":\"subscriber_id1\",\"data_center\":\"data_center1\",\"sent_pkts\":3,\"received_pkts\":1,\"sent_bytes\":1,\"received_bytes\":1 }";
String jsonString4 = "{\"client_ip\":\"192.168.1.4\",\"app\":\"app1\",\"recv_time\":0,\"server_fqdn\":\"tuplogpublic.bangcdn.net\",\"flags\":1,\"decoded_as\":\"HTTP\",\"server_ip\":\"192.168.2.4\",\"http_host\":\"tuplogpublic.bangcdn.net\",\"vsys_id\":1,\"device_group\":\"group1\",\"device_id\":\"device_id1\",\"subscriber_id\":\"subscriber_id1\",\"data_center\":\"data_center1\",\"sent_pkts\":4,\"received_pkts\":1,\"sent_bytes\":1,\"received_bytes\":1 }";
// configure your test environment
SessionEntity sessionEntity1 = JSON.parseObject(jsonString1, SessionEntity.class);
SessionEntity sessionEntity2 = JSON.parseObject(jsonString2, SessionEntity.class);
SessionEntity sessionEntity3 = JSON.parseObject(jsonString3, SessionEntity.class);
SessionEntity sessionEntity4 = JSON.parseObject(jsonString4, SessionEntity.class);
env.setParallelism(1);
// values are collected in a static variable
CollectSink.values.clear();
ParameterTool serviceConfig = ParameterTool.fromPropertiesFile("src\\main\\resources\\common.properties");
Configuration configurationService = serviceConfig.getConfiguration();
// global register
env.getConfig().setGlobalJobParameters(configurationService);
// create a stream of custom elements and apply transformations
env.fromElements(sessionEntity1,sessionEntity2,sessionEntity3,sessionEntity4)
.flatMap(new FlatMapFunction())
.addSink(new CollectSink());
// execute
env.execute();
// verify your results
assertEquals("192.168.2.1", CollectSink.values.get(0).getInternal_ip());
assertEquals("192.168.1.1", CollectSink.values.get(1).getInternal_ip());
assertNull(CollectSink.values.get(0).getExternal_ip());
assertNull(CollectSink.values.get(1).getExternal_ip());
assertEquals("192.168.1.2", CollectSink.values.get(2).getInternal_ip());
assertEquals("192.168.2.2", CollectSink.values.get(2).getExternal_ip());
assertEquals("192.168.2.3", CollectSink.values.get(3).getInternal_ip());
assertEquals("192.168.1.3", CollectSink.values.get(3).getExternal_ip());
assertEquals("192.168.2.4", CollectSink.values.get(4).getExternal_ip());
assertEquals("192.168.1.4", CollectSink.values.get(5).getExternal_ip());
assertNull(CollectSink.values.get(4).getInternal_ip());
assertNull(CollectSink.values.get(5).getInternal_ip());
assertEquals(6, CollectSink.values.size());
assertEquals("bangcdn.net", CollectSink.values.get(0).getDomain());
}
// create a testing sink
private static class CollectSink implements SinkFunction<TransformEntity> {
// must be static
public static final List<TransformEntity> values = Collections.synchronizedList(new ArrayList<>());
@Override
public void invoke(TransformEntity value, SinkFunction.Context context) throws Exception {
values.add(value);
}
}
}