From 4965ac02310f0a311754a34c855f49eed557e46d Mon Sep 17 00:00:00 2001
From: kingwide <572159764@qq.com>
Date: Tue, 28 Feb 2023 15:26:17 +0800
Subject: [PATCH] =?UTF-8?q?topn=5Fmetrics=E5=9F=BA=E7=A1=80=E5=8A=9F?=
=?UTF-8?q?=E8=83=BD=E5=AE=9E=E7=8E=B0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
pom.xml | 240 ++++++++++++++
src/main/java/com/galaxy/tsg/Toptask.java | 297 ++++++++++++++++++
.../tsg/config/CommonConfigurations.java | 62 ++++
.../com/galaxy/tsg/config/commonConfig.java | 49 +++
.../com/galaxy/tsg/function/TopNHotItems.java | 128 ++++++++
.../tsg/function/TopNHotItemsForUrl.java | 68 ++++
.../tsg/function/UrlAggregationReduce.java | 13 +
.../function/metricsAggregationReduce.java | 18 ++
.../galaxy/tsg/function/metricsCalculate.java | 243 ++++++++++++++
.../tsg/function/metricsCalculateForApp.java | 43 +++
.../tsg/function/metricsCalculateForUrl.java | 70 +++++
.../java/com/galaxy/tsg/pojo/AppEntity.java | 95 ++++++
.../com/galaxy/tsg/pojo/ByteResultEntity.java | 152 +++++++++
src/main/java/com/galaxy/tsg/pojo/Entity.java | 201 ++++++++++++
.../galaxy/tsg/pojo/PacketResultEntity.java | 163 ++++++++++
.../com/galaxy/tsg/pojo/ResultEntity.java | 63 ++++
.../galaxy/tsg/pojo/SessionResultEntity.java | 151 +++++++++
.../com/galaxy/tsg/pojo/TopUrlEntity.java | 54 ++++
.../java/com/galaxy/tsg/pojo/UrlEntity.java | 55 ++++
.../java/com/galaxy/tsg/util/KafkaUtils.java | 109 +++++++
src/main/resources/common.properties | 59 ++++
.../com/galaxy/tsg/catalog/CatalogTest.java | 7 +
22 files changed, 2340 insertions(+)
create mode 100644 pom.xml
create mode 100644 src/main/java/com/galaxy/tsg/Toptask.java
create mode 100644 src/main/java/com/galaxy/tsg/config/CommonConfigurations.java
create mode 100644 src/main/java/com/galaxy/tsg/config/commonConfig.java
create mode 100644 src/main/java/com/galaxy/tsg/function/TopNHotItems.java
create mode 100644 src/main/java/com/galaxy/tsg/function/TopNHotItemsForUrl.java
create mode 100644 src/main/java/com/galaxy/tsg/function/UrlAggregationReduce.java
create mode 100644 src/main/java/com/galaxy/tsg/function/metricsAggregationReduce.java
create mode 100644 src/main/java/com/galaxy/tsg/function/metricsCalculate.java
create mode 100644 src/main/java/com/galaxy/tsg/function/metricsCalculateForApp.java
create mode 100644 src/main/java/com/galaxy/tsg/function/metricsCalculateForUrl.java
create mode 100644 src/main/java/com/galaxy/tsg/pojo/AppEntity.java
create mode 100644 src/main/java/com/galaxy/tsg/pojo/ByteResultEntity.java
create mode 100644 src/main/java/com/galaxy/tsg/pojo/Entity.java
create mode 100644 src/main/java/com/galaxy/tsg/pojo/PacketResultEntity.java
create mode 100644 src/main/java/com/galaxy/tsg/pojo/ResultEntity.java
create mode 100644 src/main/java/com/galaxy/tsg/pojo/SessionResultEntity.java
create mode 100644 src/main/java/com/galaxy/tsg/pojo/TopUrlEntity.java
create mode 100644 src/main/java/com/galaxy/tsg/pojo/UrlEntity.java
create mode 100644 src/main/java/com/galaxy/tsg/util/KafkaUtils.java
create mode 100644 src/main/resources/common.properties
create mode 100644 src/test/java/com/galaxy/tsg/catalog/CatalogTest.java
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..7625ada
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,240 @@
+
+
+ 4.0.0
+
+ com.galaxy.tsg
+ flink-top-task
+ 22-02-22
+
+
+
+ nexus
+ Team Nexus Repository
+ http://192.168.40.125:8099/content/groups/public
+
+
+
+
+ 1.13.1
+ 2.7.1
+
+
+
+
+ com.zdjizhi
+ galaxy
+ 1.1.0
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+ log4j-over-slf4j
+ org.slf4j
+
+
+
+
+ cn.hutool
+ hutool-all
+ 5.5.2
+
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.21
+
+
+
+ org.slf4j
+ slf4j-log4j12
+ 1.7.21
+
+
+
+ org.apache.flink
+ flink-streaming-java_2.12
+ 1.13.1
+ provided
+
+
+ org.apache.flink
+ flink-java
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-core
+ ${flink.version}
+ provided
+
+
+
+
+ org.apache.flink
+ flink-connector-kafka_2.12
+ ${flink.version}
+
+
+ org.apache.zookeeper
+ zookeeper
+ 3.4.9
+
+
+ org.apache.hbase
+ hbase-client
+ 2.2.3
+
+
+ org.apache.hadoop
+ hadoop-client
+ 2.7.1
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+ log4j-over-slf4j
+ org.slf4j
+
+
+ servlet-api
+ javax.servlet
+
+
+
+
+
+
+
+
+ org.apache.flink
+ flink-json
+ ${flink.version}
+ provided
+
+
+ com.alibaba
+ fastjson
+ 1.2.70
+
+
+ org.apache.flink
+ flink-csv
+ ${flink.version}
+ provided
+
+
+ com.opencsv
+ opencsv
+ 3.3
+
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.flink
+ flink-clients_2.11
+ ${flink.version}
+ provided
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.0
+
+ 1.8
+ 1.8
+
+ false
+
+
+ -Xpkginfo:always
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ flink-top-task
+ package
+
+ shade
+
+
+
+ flink-top-task-23-02-22
+
+
+
+ *:*
+
+ META-INF
+
+
+
+
+
+ com.galaxy.tsg.Toptask
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/galaxy/tsg/Toptask.java b/src/main/java/com/galaxy/tsg/Toptask.java
new file mode 100644
index 0000000..f0030bf
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/Toptask.java
@@ -0,0 +1,297 @@
+package com.galaxy.tsg;
+
+import com.alibaba.fastjson.JSON;
+import com.galaxy.tsg.function.*;
+import com.galaxy.tsg.pojo.Entity;
+import com.galaxy.tsg.pojo.ResultEntity;
+import com.galaxy.tsg.pojo.UrlEntity;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
+import static com.galaxy.tsg.config.commonConfig.*;
+import static com.galaxy.tsg.util.KafkaUtils.*;
+
+public class Toptask {
+ private static final Logger LOG = LoggerFactory.getLogger(Toptask.class);
+
+ public static void main(String[] args) throws Exception {
+
+ //1.创建执行环境
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ //指定使用事件时间
+ //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+
+ DataStream sourceForSession = env.addSource(getKafkaConsumer("SESSION-RECORD-COMPLETED")).setParallelism(KAFKA_CONSUMER_PARALLELISM);
+ WatermarkStrategy strategyForSession = WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(WATERMARK_TIME))
+ .withTimestampAssigner((Entity, timestamp) -> Entity.getCommon_recv_time() * 1000);
+
+ List topics = new LinkedList<>();
+ topics.add("SECURITY-EVENT-COMPLETED");
+ topics.add("PROXY-EVENT-COMPLETED");
+ DataStream sourceForUrl = env.addSource(getKafkaConsumerLists(topics)).setParallelism(KAFKA_CONSUMER_TOPURL_PARALLELISM);
+ WatermarkStrategy strategyForSecurity = WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(WATERMARK_TIME))
+ .withTimestampAssigner((UrlEntity, timestamp) -> UrlEntity.getCommon_recv_time() * 1000);
+
+
+ SingleOutputStreamOperator inputForSession = sourceForSession.map(new MapFunction() {
+ @Override
+ public Entity map(String message) {
+ Entity entity = new Entity();
+ try {
+ entity = JSON.parseObject(message, Entity.class);
+
+ } catch (Exception e) {
+ LOG.error("Entity Parsing ERROR");
+ entity.setIfError(1);
+ }
+ return entity;
+ }
+ }).filter(new FilterFunction() {
+ @Override
+ public boolean filter(Entity entity) throws Exception {
+
+ return entity.ifError != 1;
+ }
+ });
+
+ SingleOutputStreamOperator inputForUrl = sourceForUrl.map(new MapFunction() {
+ @Override
+ public UrlEntity map(String message) {
+ UrlEntity entity = new UrlEntity();
+ try {
+ entity = JSON.parseObject(message, UrlEntity.class);
+
+ } catch (Exception e) {
+ LOG.error("Entity Parsing ERROR");
+ entity.setIfError(1);
+ }
+ return entity;
+ }
+ }).filter(new FilterFunction() {
+ @Override
+ public boolean filter(UrlEntity entity) throws Exception {
+
+ return entity.ifError != 1;
+ }
+ });
+
+ switch (TMP_TEST_TYPE) {
+ case 1:
+
+ //clientip聚合TOP
+
+ SingleOutputStreamOperator clientipdStream = inputForSession.filter(new FilterFunction() {
+ @Override
+ public boolean filter(Entity value) throws Exception {
+ return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSession);
+
+ SingleOutputStreamOperator windowedStream = clientipdStream.keyBy(new groupBySelector("common_client_ip"))
+ .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "common_client_ip")).setParallelism(TASK_PARALLELISM);
+ DataStream windoweddStream = windowedStream.keyBy(new oneKeySelector())
+ .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
+ windoweddStream.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
+
+ //serverip聚合TOP
+
+ SingleOutputStreamOperator serveripdStream = inputForSession.filter(new FilterFunction() {
+ @Override
+ public boolean filter(Entity value) throws Exception {
+ return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSession);
+
+ SingleOutputStreamOperator windowedStreamForServerIp = serveripdStream.keyBy(new groupBySelector("common_server_ip"))
+ .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "common_server_ip")).setParallelism(TASK_PARALLELISM);
+ DataStream windoweddStreamForServerIp = windowedStreamForServerIp.keyBy(new oneKeySelector())
+ .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
+ windoweddStreamForServerIp.addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3);
+
+
+ //common_internal_ip聚合TOP
+ SingleOutputStreamOperator internalStream = inputForSession.filter(new FilterFunction() {
+ @Override
+ public boolean filter(Entity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getCommon_internal_ip());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSession);
+
+ SingleOutputStreamOperator windowedStreamForInternal = internalStream.keyBy(new groupBySelector("common_internal_ip"))
+ .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "common_internal_ip")).setParallelism(TASK_PARALLELISM);
+ DataStream WindoweddStreamForInternal = windowedStreamForInternal.keyBy(new oneKeySelector())
+ .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
+ WindoweddStreamForInternal.addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3);
+
+ //common_external_ip聚合TOP
+
+ SingleOutputStreamOperator externalStream = inputForSession.filter(new FilterFunction() {
+ @Override
+ public boolean filter(Entity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getCommon_external_ip());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSession);
+
+ SingleOutputStreamOperator windowedStreamForExternal = externalStream.keyBy(new groupBySelector("common_external_ip"))
+ .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "common_external_ip")).setParallelism(TASK_PARALLELISM);
+ DataStream WindoweddStreamForExternal = windowedStreamForExternal.keyBy(new oneKeySelector())
+ .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
+ WindoweddStreamForExternal.addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3);
+
+ //http_domain聚合TOP
+
+ SingleOutputStreamOperator domainStream = inputForSession.filter(new FilterFunction() {
+ @Override
+ public boolean filter(Entity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getHttp_domain());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSession);
+
+ SingleOutputStreamOperator windowedStreamForDomain = domainStream.keyBy(new groupBySelector("http_domain"))
+ .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "http_domain")).setParallelism(TASK_PARALLELISM);
+ DataStream WindoweddStreamForDomain = windowedStreamForDomain.keyBy(new oneKeySelector())
+ .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
+ WindoweddStreamForDomain.addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3);
+
+ SingleOutputStreamOperator userStream = inputForSession.filter(new FilterFunction() {
+ @Override
+ public boolean filter(Entity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getCommon_subscriber_id());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSession);
+
+ //common_subscriber_id聚合TOP
+ SingleOutputStreamOperator windowedStreamForUser = userStream.keyBy(new groupBySelector("common_subscriber_id"))
+ .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "common_subscriber_id")).setParallelism(TASK_PARALLELISM);
+ DataStream WindoweddStreamForUser = windowedStreamForUser.keyBy(new oneKeySelector())
+ .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
+ WindoweddStreamForUser.addSink(getKafkaSink("TOP-USER")).setParallelism(3);
+
+ SingleOutputStreamOperator appNameStream = inputForSession.filter(new FilterFunction() {
+ @Override
+ public boolean filter(Entity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getCommon_app_label());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSession);
+
+
+ //common_app_label聚合求全量
+ appNameStream.keyBy(new groupBySelector("common_app_label"))
+ .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .reduce(new metricsAggregationReduce(), new metricsCalculateForApp()).addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM);
+
+
+ SingleOutputStreamOperator UrlStream = inputForUrl.filter(new FilterFunction() {
+ @Override
+ public boolean filter(UrlEntity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getHttp_url());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSecurity);
+
+ //url聚合session求top
+ SingleOutputStreamOperator windowedStreamForUrl = UrlStream.keyBy(new twoKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .reduce(new UrlAggregationReduce(), new metricsCalculateForUrl(URL_TOP_LIMIT)).setParallelism(TASK_PARALLELISM);
+ DataStream WindoweddStreamForUrl = windowedStreamForUrl.keyBy(new oneKeySelector())
+ .process(new TopNHotItemsForUrl(URL_TOP_LIMIT)).setParallelism(1);
+ WindoweddStreamForUrl.addSink(getKafkaSink("TOP-URLS")).setParallelism(3);
+
+ break;
+ case 2:
+ //datasketch
+ break;
+
+ }
+
+
+ env.execute("TOP-task");
+
+ }
+
+
+ public static class groupBySelector implements KeySelector> {
+
+ public String key;
+
+ public groupBySelector(String key) {
+ this.key = key;
+ }
+
+ @Override
+ public Tuple4 getKey(Entity entity) throws Exception {
+
+ Tuple4 tuple = null;
+ switch (key) {
+ case "common_client_ip":
+ tuple = new Tuple4<>(entity.getCommon_client_ip(), entity.getCommon_vsys_id(), entity.getCommon_device_group(), entity.getCommon_data_center());
+ break;
+ case "common_server_ip":
+ tuple = new Tuple4<>(entity.getCommon_server_ip(), entity.getCommon_vsys_id(), entity.getCommon_device_group(), entity.getCommon_data_center());
+ break;
+ case "common_internal_ip":
+ tuple = new Tuple4<>(entity.getCommon_internal_ip(), entity.getCommon_vsys_id(), entity.getCommon_device_group(), entity.getCommon_data_center());
+ break;
+ case "common_external_ip":
+ tuple = new Tuple4<>(entity.getCommon_external_ip(), entity.getCommon_vsys_id(), entity.getCommon_device_group(), entity.getCommon_data_center());
+ break;
+ case "http_domain":
+ tuple = new Tuple4<>(entity.getHttp_domain(), entity.getCommon_vsys_id(), entity.getCommon_device_group(), entity.getCommon_data_center());
+ break;
+
+ case "common_subscriber_id":
+ tuple = new Tuple4<>(entity.getCommon_subscriber_id(), entity.getCommon_vsys_id(), entity.getCommon_device_group(), entity.getCommon_data_center());
+ break;
+ case "common_app_label":
+ tuple = new Tuple4<>(entity.getCommon_app_label(), entity.getCommon_vsys_id(), entity.getCommon_device_group(), entity.getCommon_data_center());
+ break;
+
+ default:
+
+ }
+ return tuple;
+ }
+ }
+
+
+ public static class oneKeySelector implements KeySelector> {
+
+ @Override
+ public Tuple1 getKey(ResultEntity entity) throws Exception {
+ return new Tuple1<>(entity.getOrder_by());
+ }
+ }
+
+ public static class twoKeySelector implements KeySelector> {
+
+ @Override
+ public Tuple2 getKey(UrlEntity entity) throws Exception {
+ return new Tuple2<>(entity.getHttp_url(), entity.getCommon_vsys_id());
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/galaxy/tsg/config/CommonConfigurations.java b/src/main/java/com/galaxy/tsg/config/CommonConfigurations.java
new file mode 100644
index 0000000..447724f
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/config/CommonConfigurations.java
@@ -0,0 +1,62 @@
+package com.galaxy.tsg.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * @author Administrator
+ */
+
+public final class CommonConfigurations {
+
+ private static Properties propService = new Properties();
+
+ public static Map getHashTableProperty(String key) {
+
+ Map map = new HashMap<>();
+
+
+ String[] keyarray = propService.getProperty(key).split(",");
+ for(String k :keyarray){
+
+ if(k!=null && !"".equals(k.trim())){
+ map.put(k,"");
+ }
+
+ }
+
+ return map;
+ }
+
+ public static String getStringProperty(String key) {
+
+ return propService.getProperty(key);
+
+
+ }
+
+ public static Integer getIntProperty( String key) {
+
+ return Integer.parseInt(propService.getProperty(key));
+
+ }
+
+ public static Long getLongProperty(String key) {
+ return Long.parseLong(propService.getProperty(key));
+
+ }
+
+ public static Boolean getBooleanProperty(String key) {
+ return "true".equals(propService.getProperty(key).toLowerCase().trim());
+ }
+
+ static {
+ try {
+ propService.load(CommonConfigurations.class.getClassLoader().getResourceAsStream("common.properties"));
+ } catch (Exception e) {
+ propService = null;
+ }
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/config/commonConfig.java b/src/main/java/com/galaxy/tsg/config/commonConfig.java
new file mode 100644
index 0000000..845546f
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/config/commonConfig.java
@@ -0,0 +1,49 @@
+package com.galaxy.tsg.config;
+
+/**
+ * Created by wk on 2021/1/6.
+ */
+public class commonConfig {
+
+
+ public static final String KAFKA_CONSUMER_BROKER = CommonConfigurations.getStringProperty("kafka.consumer.broker");
+ public static final String KAFKA_CONSUMER_GROUP_ID = CommonConfigurations.getStringProperty("kafka.consumer.group.id");
+ public static final String KAFKA_CONSUMER_TOPIC = CommonConfigurations.getStringProperty("kafka.consumer.topic");
+ public static final int KAFKA_CONSUMER_PARALLELISM = CommonConfigurations.getIntProperty("kafka.consumer.parallelism");
+ public static final String KAFKA_CONSUMER_SESSION_TIMEOUT_MS=CommonConfigurations.getStringProperty("kafka.consumer.session.timeout.ms");
+ public static final String KAFKA_CONSUMER_MAX_POLL_RECORD=CommonConfigurations.getStringProperty("kafka.consumer.max.poll.records");
+ public static final String KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES=CommonConfigurations.getStringProperty("kafka.consumer.max.partition.fetch.bytes");
+ public static final int KAFKA_CONSUMER_TOPURL_PARALLELISM=CommonConfigurations.getIntProperty("kafka.consumer.topurl.parallelism");
+
+
+
+
+ public static final String KAFKA_PRODUCER_RETRIES = CommonConfigurations.getStringProperty("kafka.producer.retries");
+ public static final String KAFKA_PRODUCER_LINGER_MS = CommonConfigurations.getStringProperty("kafka.producer.linger.ms");
+ public static final String KAFKA_PRODUCER_REQUEST_TIMEOUT_MS = CommonConfigurations.getStringProperty("kafka.producer.request.timeout.ms");
+ public static final String KAFKA_PRODUCER_BATCH_SIZE = CommonConfigurations.getStringProperty("kafka.producer.batch.size");
+ public static final String KAFKA_PRODUCER_BUFFER_MEMORY = CommonConfigurations.getStringProperty("kafka.producer.buffer.memory");
+ public static final String KAFKA_PRODUCER_MAX_REQUEST_SIZE = CommonConfigurations.getStringProperty("kafka.producer.max.request.size");
+ public static final String KAFKA_PRODUCER_COMPRESSION_TYPE = CommonConfigurations.getStringProperty("kafka.producer.compression.type");
+
+
+
+ public static final int TASK_PARALLELISM = CommonConfigurations.getIntProperty("task.parallelism");
+
+ public static final int WATERMARK_TIME = CommonConfigurations.getIntProperty("watermark.time");
+ public static final int WINDOW_TIME_MINUTE = CommonConfigurations.getIntProperty("window.time.minute");
+ public static final int TOP_LIMIT = CommonConfigurations.getIntProperty("top.limit");
+ public static final int URL_TOP_LIMIT = CommonConfigurations.getIntProperty("url.top.limit");
+
+ public static final String KAFKA_USER = CommonConfigurations.getStringProperty("kafka.user");
+ public static final String KAFKA_PIN = CommonConfigurations.getStringProperty("kafka.pin");
+ public static final int KAFKA_SECURITY = CommonConfigurations.getIntProperty("kafka.security");
+ public static final String TOOLS_LIBRARY = CommonConfigurations.getStringProperty("tools.library");
+
+
+ public static final String KAFKA_PRODUCER_BROKER = CommonConfigurations.getStringProperty("kafka_producer_broker");
+
+ public static final int TMP_TEST_TYPE = CommonConfigurations.getIntProperty("tmp.test.type");
+
+
+}
diff --git a/src/main/java/com/galaxy/tsg/function/TopNHotItems.java b/src/main/java/com/galaxy/tsg/function/TopNHotItems.java
new file mode 100644
index 0000000..8a28b7e
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/TopNHotItems.java
@@ -0,0 +1,128 @@
+package com.galaxy.tsg.function;
+
+import com.alibaba.fastjson.JSONObject;
+import com.galaxy.tsg.pojo.ByteResultEntity;
+import com.galaxy.tsg.pojo.PacketResultEntity;
+import com.galaxy.tsg.pojo.ResultEntity;
+import com.galaxy.tsg.pojo.SessionResultEntity;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.PriorityQueue;
+
+public class TopNHotItems extends KeyedProcessFunction, ResultEntity, String> {
+ private final int topSize;
+ // Set allSet = new TreeSet() ;
+ private PriorityQueue sessionOrderEntity ;
+ private PriorityQueue packetOrderEntity ;
+ private PriorityQueue byteOrderEntity ;
+
+
+ public TopNHotItems(int i) {
+ this.topSize = i;
+
+
+
+ }
+ @Override
+ public void open(Configuration parameters) {
+
+ this.sessionOrderEntity=new PriorityQueue<>();
+ this.packetOrderEntity=new PriorityQueue<>();
+ this.byteOrderEntity=new PriorityQueue<>();
+
+
+ }
+
+
+ @Override
+ public void processElement(ResultEntity objectEntity, Context context, Collector collector) {
+
+
+
+
+ switch(objectEntity.getOrder_by()) {
+ case "sessions":
+
+
+ if (sessionOrderEntity.size() < topSize) {
+ sessionOrderEntity.add(objectEntity.getSessionResultEntity());
+ } else {
+ if (sessionOrderEntity.peek() != null) {
+ SessionResultEntity res=sessionOrderEntity.peek();
+ if (res.getSession_num() <= objectEntity.getSessionResultEntity().getSession_num()) {
+ sessionOrderEntity.poll();
+ sessionOrderEntity.add(objectEntity.getSessionResultEntity());
+ }
+ }
+ }
+
+ break;
+ case "packets":
+ if (packetOrderEntity.size() < topSize) {
+ packetOrderEntity.add(objectEntity.getPacketResultEntity());
+ } else {
+ if (packetOrderEntity.peek() != null) {
+ PacketResultEntity res=packetOrderEntity.peek();
+ if ((res.getS2c_pkt_num()+res.getC2s_pkt_num()) <= (objectEntity.getPacketResultEntity().getC2s_pkt_num()+objectEntity.getPacketResultEntity().getS2c_pkt_num())) {
+ packetOrderEntity.poll();
+ packetOrderEntity.add(objectEntity.getPacketResultEntity());
+ }
+ }
+ }
+ break;
+ case "bytes":
+ if (byteOrderEntity.size() < topSize) {
+ byteOrderEntity.add(objectEntity.getByteResultEntity());
+ } else {
+ if (byteOrderEntity.peek() != null) {
+ ByteResultEntity res=byteOrderEntity.peek();
+ if ((res.getC2s_byte_num()+res.getS2c_byte_num()) <= (objectEntity.getByteResultEntity().getS2c_byte_num()+objectEntity.getByteResultEntity().getC2s_byte_num())) {
+ byteOrderEntity.poll();
+ byteOrderEntity.add(objectEntity.getByteResultEntity());
+ }
+ }
+ }
+ break;
+ default:
+ }
+
+ //注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收集好了所有 windowEnd的商品数据
+ context.timerService().registerEventTimeTimer(objectEntity.getStat_time() + 1);
+ }
+
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx, Collector out) {
+
+ for(SessionResultEntity en : sessionOrderEntity){
+
+ String jsonStr = JSONObject.toJSONString(en);
+ out.collect(jsonStr);
+
+ }
+
+
+ for(PacketResultEntity en : packetOrderEntity){
+
+ String jsonStr = JSONObject.toJSONString(en);
+ out.collect(jsonStr);
+
+ }
+ for(ByteResultEntity en : byteOrderEntity){
+
+ String jsonStr = JSONObject.toJSONString(en);
+ out.collect(jsonStr);
+
+ }
+
+ sessionOrderEntity.clear();
+ packetOrderEntity.clear();
+ byteOrderEntity.clear();
+
+
+
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/galaxy/tsg/function/TopNHotItemsForUrl.java b/src/main/java/com/galaxy/tsg/function/TopNHotItemsForUrl.java
new file mode 100644
index 0000000..c4ad6c0
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/TopNHotItemsForUrl.java
@@ -0,0 +1,68 @@
+package com.galaxy.tsg.function;
+
+import com.alibaba.fastjson.JSONObject;
+import com.galaxy.tsg.pojo.ResultEntity;
+import com.galaxy.tsg.pojo.TopUrlEntity;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.PriorityQueue;
+
+public class TopNHotItemsForUrl extends KeyedProcessFunction, ResultEntity, String> {
+ private final int topSize;
+ // Set allSet = new TreeSet() ;
+ private PriorityQueue sessionOrderEntity ;
+
+
+ public TopNHotItemsForUrl(int i) {
+ this.topSize = i;
+
+
+
+ }
+ @Override
+ public void open(Configuration parameters) {
+
+ this.sessionOrderEntity=new PriorityQueue<>();
+
+ }
+
+
+ @Override
+ public void processElement(ResultEntity objectEntity, Context context, Collector collector) {
+
+ if (sessionOrderEntity.size() < topSize) {
+ sessionOrderEntity.add(objectEntity.getTopUrlEntity());
+ } else {
+ if (sessionOrderEntity.peek() != null) {
+ TopUrlEntity res=sessionOrderEntity.peek();
+ if (res.getSession_num() <= objectEntity.getTopUrlEntity().getSession_num()) {
+ sessionOrderEntity.poll();
+ sessionOrderEntity.add(objectEntity.getTopUrlEntity());
+ }
+ }
+ }
+
+
+ //注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收集好了所有 windowEnd的商品数据
+ context.timerService().registerEventTimeTimer(objectEntity.getStat_time() + 1);
+ }
+
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx, Collector out) {
+
+ for(TopUrlEntity en : sessionOrderEntity){
+
+ String jsonStr = JSONObject.toJSONString(en);
+ out.collect(jsonStr);
+
+ }
+
+
+ sessionOrderEntity.clear();
+
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/galaxy/tsg/function/UrlAggregationReduce.java b/src/main/java/com/galaxy/tsg/function/UrlAggregationReduce.java
new file mode 100644
index 0000000..f9e1771
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/UrlAggregationReduce.java
@@ -0,0 +1,13 @@
+package com.galaxy.tsg.function;
+
+import com.galaxy.tsg.pojo.UrlEntity;
+import org.apache.flink.api.common.functions.ReduceFunction;
+
+public class UrlAggregationReduce implements ReduceFunction {
+
+ @Override
+ public UrlEntity reduce(UrlEntity value1, UrlEntity value2) throws Exception {
+ value1.setCommon_sessions(value1.getCommon_sessions()+value2.getCommon_sessions());
+ return value1;
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/function/metricsAggregationReduce.java b/src/main/java/com/galaxy/tsg/function/metricsAggregationReduce.java
new file mode 100644
index 0000000..a15020a
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/metricsAggregationReduce.java
@@ -0,0 +1,18 @@
+package com.galaxy.tsg.function;
+
+import com.galaxy.tsg.pojo.Entity;
+import org.apache.flink.api.common.functions.ReduceFunction;
+
+public class metricsAggregationReduce implements ReduceFunction {
+
+ @Override
+ public Entity reduce(Entity value1, Entity value2) throws Exception {
+ value1.setCommon_c2s_pkt_num(value1.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
+ value1.setCommon_s2c_pkt_num(value1.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
+ value1.setCommon_c2s_byte_num(value1.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num());
+ value1.setCommon_s2c_byte_num(value1.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num());
+ value1.setCommon_sessions(value1.getCommon_sessions()+value2.getCommon_sessions());
+
+ return value1;
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/function/metricsCalculate.java b/src/main/java/com/galaxy/tsg/function/metricsCalculate.java
new file mode 100644
index 0000000..c5b1d49
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/metricsCalculate.java
@@ -0,0 +1,243 @@
+package com.galaxy.tsg.function;
+
+import com.galaxy.tsg.pojo.*;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.PriorityQueue;
+
+public class metricsCalculate extends ProcessWindowFunction<
+ Entity, // 输入类型
+ ResultEntity, // 输出类型
+ Tuple4, // 键类型
+ TimeWindow> { // 窗口类型
+ private final int topSize;
+ private final String key;
+ // Set allSet = new TreeSet() ;
+ private PriorityQueue sessionOrderEntity ;
+ private PriorityQueue packetOrderEntity ;
+ private PriorityQueue byteOrderEntity ;
+
+ public metricsCalculate(int i,String key) {
+ this.key = key;
+ this.topSize = i;
+ this.sessionOrderEntity=new PriorityQueue<>();
+ this.packetOrderEntity=new PriorityQueue<>();
+ this.byteOrderEntity=new PriorityQueue<>();
+ }
+
+ @Override
+ public void process(Tuple4 s,
+ Context context,
+ Iterable elements, Collector out) throws Exception {
+
+
+
+
+ for(Entity objectEntity : elements) {
+
+ if (sessionOrderEntity.size() < topSize) {
+ sessionOrderEntity.add(enrichessionResult(context.window().getEnd() / 1000, objectEntity));
+ } else {
+ if (sessionOrderEntity.peek() != null) {
+ SessionResultEntity res = sessionOrderEntity.peek();
+ if (res.getSession_num() <= objectEntity.getCommon_sessions()) {
+ sessionOrderEntity.poll();
+ sessionOrderEntity.add(enrichessionResult(context.window().getEnd() / 1000, objectEntity));
+ }
+ }
+ }
+
+
+ if (packetOrderEntity.size() < topSize) {
+ packetOrderEntity.add(enrichPacketResult(context.window().getEnd() / 1000, objectEntity));
+ } else {
+ if (packetOrderEntity.peek() != null) {
+ PacketResultEntity res = packetOrderEntity.peek();
+ if ((res.getS2c_pkt_num() + res.getC2s_pkt_num()) <= (objectEntity.getCommon_s2c_pkt_num() + objectEntity.getCommon_c2s_pkt_num())) {
+ packetOrderEntity.poll();
+ packetOrderEntity.add(enrichPacketResult(context.window().getEnd() / 1000, objectEntity));
+ }
+ }
+ }
+
+ if (byteOrderEntity.size() < topSize) {
+ byteOrderEntity.add(enrichByteResult(context.window().getEnd() / 1000, objectEntity));
+ } else {
+ if (byteOrderEntity.peek() != null) {
+ ByteResultEntity res = byteOrderEntity.peek();
+ if ((res.getS2c_byte_num() + res.getC2s_byte_num()) <= (objectEntity.getCommon_s2c_byte_num() + objectEntity.getCommon_c2s_byte_num())) {
+ byteOrderEntity.poll();
+ byteOrderEntity.add(enrichByteResult(context.window().getEnd() / 1000, objectEntity));
+ }
+ }
+ }
+
+ }
+
+ while (sessionOrderEntity.size() > 0) {
+ SessionResultEntity obj = sessionOrderEntity.peek();
+ //String jsonStr = JSONObject.toJSONString(en);
+ ResultEntity en = new ResultEntity();
+ en.setOrder_by("sessions");
+ en.setStat_time(context.window().getEnd() / 1000);
+ en.setSessionResultEntity(obj);
+ out.collect(en);
+ sessionOrderEntity.remove();
+ }
+
+
+ while (packetOrderEntity.size() > 0) {
+ PacketResultEntity obj = packetOrderEntity.peek();
+ ResultEntity en = new ResultEntity();
+ en.setOrder_by("packets");
+ en.setStat_time(context.window().getEnd() / 1000);
+ en.setPacketResultEntity(obj);
+ out.collect(en);
+ packetOrderEntity.remove();
+ }
+
+ while (byteOrderEntity.size() > 0) {
+ ByteResultEntity obj = byteOrderEntity.peek();
+ ResultEntity en = new ResultEntity();
+ en.setOrder_by("bytes");
+ en.setStat_time(context.window().getEnd() / 1000);
+ en.setByteResultEntity(obj);
+ out.collect(en);
+ byteOrderEntity.remove();
+ }
+
+ }
+
+ public ByteResultEntity enrichByteResult(Long time,Entity objectEntity) {
+ ByteResultEntity en = new ByteResultEntity();
+ en.setVsys_id(objectEntity.getCommon_vsys_id());
+ en.setStat_time(time);
+ en.setSource(objectEntity.getCommon_client_ip());
+ en.setSession_num(objectEntity.getCommon_sessions());
+ en.setOrder_by("bytes");
+ en.setDevice_group(objectEntity.getCommon_device_group());
+ en.setData_center(objectEntity.getCommon_data_center());
+ en.setS2c_pkt_num(objectEntity.getCommon_s2c_pkt_num());
+ en.setC2s_pkt_num(objectEntity.getCommon_c2s_pkt_num());
+ en.setS2c_byte_num(objectEntity.getCommon_s2c_byte_num());
+ en.setC2s_byte_num(objectEntity.getCommon_c2s_byte_num());
+ switch (key) {
+ case "common_client_ip":
+ en.setSource(objectEntity.getCommon_client_ip());
+ break;
+ case "common_server_ip":
+ en.setDestination(objectEntity.getCommon_server_ip());
+ break;
+ case "common_internal_ip":
+ en.setSource(objectEntity.getCommon_internal_ip());
+ break;
+ case "common_external_ip":
+ en.setDestination(objectEntity.getCommon_external_ip());
+ break;
+ case "http_domain":
+ en.setDomain(objectEntity.getHttp_domain());
+ break;
+
+ case "common_subscriber_id":
+ en.setSubscriber_id(objectEntity.getCommon_subscriber_id());
+ break;
+
+ case "common_app_label":
+ en.setApp_name(objectEntity.getCommon_app_label());
+ break;
+
+ default:
+
+ }
+ return en;
+
+ }
+ public SessionResultEntity enrichessionResult(Long time,Entity objectEntity){
+
+ SessionResultEntity en =new SessionResultEntity();
+ en.setVsys_id(objectEntity.getCommon_vsys_id());
+ en.setStat_time(time);
+ en.setSession_num(objectEntity.getCommon_sessions());
+ en.setOrder_by("sessions");
+ en.setDevice_group(objectEntity.getCommon_device_group());
+ en.setData_center(objectEntity.getCommon_data_center());
+ en.setS2c_pkt_num(objectEntity.getCommon_s2c_pkt_num());
+ en.setC2s_pkt_num(objectEntity.getCommon_c2s_pkt_num());
+ en.setS2c_byte_num(objectEntity.getCommon_s2c_byte_num());
+ en.setC2s_byte_num(objectEntity.getCommon_c2s_byte_num());
+ switch(key) {
+ case "common_client_ip":
+ en.setSource(objectEntity.getCommon_client_ip());
+ break;
+ case "common_server_ip":
+ en.setDestination(objectEntity.getCommon_server_ip());
+ break;
+ case "common_internal_ip":
+ en.setSource(objectEntity.getCommon_internal_ip());
+ break;
+ case "common_external_ip":
+ en.setDestination(objectEntity.getCommon_external_ip());
+ break;
+ case "http_domain":
+ en.setDomain(objectEntity.getHttp_domain());
+ break;
+
+ case "common_subscriber_id":
+ en.setSubscriber_id(objectEntity.getCommon_subscriber_id());
+ break;
+
+ case "common_app_label":
+ en.setApp_name(objectEntity.getCommon_app_label());
+ break;
+
+ default:
+
+
+
+ }
+ return en;
+ }
+ public PacketResultEntity enrichPacketResult(Long time,Entity objectEntity){
+ PacketResultEntity en =new PacketResultEntity();
+ en.setVsys_id(objectEntity.getCommon_vsys_id());
+ en.setStat_time(time);
+ en.setSource(objectEntity.getCommon_client_ip());
+ en.setSession_num(objectEntity.getCommon_sessions());
+ en.setOrder_by("packets");
+ en.setDevice_group(objectEntity.getCommon_device_group());
+ en.setData_center(objectEntity.getCommon_data_center());
+ en.setS2c_pkt_num(objectEntity.getCommon_s2c_pkt_num());
+ en.setC2s_pkt_num(objectEntity.getCommon_c2s_pkt_num());
+ en.setS2c_byte_num(objectEntity.getCommon_s2c_byte_num());
+ en.setC2s_byte_num(objectEntity.getCommon_c2s_byte_num());
+ switch(key) {
+ case "common_client_ip":
+ en.setSource(objectEntity.getCommon_client_ip());
+ break;
+ case "common_server_ip":
+ en.setDestination(objectEntity.getCommon_server_ip());
+ break;
+ case "common_internal_ip":
+ en.setSource(objectEntity.getCommon_internal_ip());
+ break;
+ case "common_external_ip":
+ en.setDestination(objectEntity.getCommon_external_ip());
+ break;
+ case "common_subscriber_id":
+ en.setSubscriber_id(objectEntity.getCommon_subscriber_id());
+ break;
+
+ case "common_app_label":
+ en.setApp_name(objectEntity.getCommon_app_label());
+ break;
+
+ default:
+
+
+ }
+ return en;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/galaxy/tsg/function/metricsCalculateForApp.java b/src/main/java/com/galaxy/tsg/function/metricsCalculateForApp.java
new file mode 100644
index 0000000..7ca043d
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/metricsCalculateForApp.java
@@ -0,0 +1,43 @@
+package com.galaxy.tsg.function;
+
+import com.alibaba.fastjson.JSONObject;
+import com.galaxy.tsg.pojo.AppEntity;
+import com.galaxy.tsg.pojo.Entity;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+public class metricsCalculateForApp extends ProcessWindowFunction<
+ Entity, // 输入类型
+ String, // 输出类型
+ Tuple4, // 键类型
+ TimeWindow> { // 窗口类型
+
+
+
+ @Override
+ public void process(Tuple4 s,
+ Context context,
+ Iterable elements, Collector out) throws Exception {
+
+
+ for (Entity objectEntity: elements) {
+ AppEntity en =new AppEntity();
+ en.setVsys_id(objectEntity.getCommon_vsys_id());
+ en.setStat_time(context.window().getEnd() / 1000);
+ en.setSession_num(objectEntity.getCommon_sessions());
+ en.setApp_name(objectEntity.getCommon_app_label());
+ en.setDevice_group(objectEntity.getCommon_device_group());
+ en.setData_center(objectEntity.getCommon_data_center());
+ en.setS2c_pkt_num(objectEntity.getCommon_s2c_pkt_num());
+ en.setC2s_pkt_num(objectEntity.getCommon_c2s_pkt_num());
+ en.setS2c_byte_num(objectEntity.getCommon_s2c_byte_num());
+ en.setC2s_byte_num(objectEntity.getCommon_c2s_byte_num());
+ String jsonStr = JSONObject.toJSONString(en);
+ out.collect(jsonStr);
+ }
+
+
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/galaxy/tsg/function/metricsCalculateForUrl.java b/src/main/java/com/galaxy/tsg/function/metricsCalculateForUrl.java
new file mode 100644
index 0000000..d2324a3
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/metricsCalculateForUrl.java
@@ -0,0 +1,70 @@
+package com.galaxy.tsg.function;
+
+import com.galaxy.tsg.pojo.ResultEntity;
+import com.galaxy.tsg.pojo.TopUrlEntity;
+import com.galaxy.tsg.pojo.UrlEntity;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.PriorityQueue;
+
+public class metricsCalculateForUrl extends ProcessWindowFunction<
+ UrlEntity, // 输入类型
+ ResultEntity, // 输出类型
+ Tuple2, // 键类型
+ TimeWindow> { // 窗口类型
+ private final int topSize;
+ // Set allSet = new TreeSet() ;
+ private PriorityQueue sessionOrderEntity;
+
+
+ public metricsCalculateForUrl(int i) {
+ this.topSize = i;
+ this.sessionOrderEntity = new PriorityQueue<>();
+
+ }
+
+ @Override
+ public void process(Tuple2 < String, Long > s,
+ Context context,
+ Iterable elements, Collector out) throws Exception {
+
+
+ for (UrlEntity objectEntity: elements) {
+ if (sessionOrderEntity.size() < topSize) {
+ TopUrlEntity en = new TopUrlEntity();
+ en.setSession_num(objectEntity.getCommon_sessions());
+ en.setStat_time(context.window().getEnd() / 1000);
+ en.setUrl(objectEntity.getHttp_url());
+ en.setVsys_id(objectEntity.getCommon_vsys_id());
+ sessionOrderEntity.add(en);
+ } else {
+ if (sessionOrderEntity.peek() != null) {
+ TopUrlEntity res = sessionOrderEntity.peek();
+ if (res.getSession_num() <= objectEntity.getCommon_sessions()) {
+ sessionOrderEntity.poll();
+ TopUrlEntity en = new TopUrlEntity();
+ en.setSession_num(objectEntity.getCommon_sessions());
+ en.setStat_time(context.window().getEnd() / 1000);
+ en.setUrl(objectEntity.getHttp_url());
+ en.setVsys_id(objectEntity.getCommon_vsys_id());
+ sessionOrderEntity.add(en);
+ }
+ }
+ }
+ }
+
+ while (sessionOrderEntity.size() > 0) {
+ TopUrlEntity obj = sessionOrderEntity.peek();
+ ResultEntity resultEntity = new ResultEntity();
+ resultEntity.setOrder_by("sessions");
+ resultEntity.setStat_time(context.window().getEnd() / 1000);
+ resultEntity.setTopUrlEntity(obj);
+ out.collect(resultEntity);
+ sessionOrderEntity.remove();
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/galaxy/tsg/pojo/AppEntity.java b/src/main/java/com/galaxy/tsg/pojo/AppEntity.java
new file mode 100644
index 0000000..b79c313
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/pojo/AppEntity.java
@@ -0,0 +1,95 @@
+package com.galaxy.tsg.pojo;
+
+public class AppEntity {
+
+ private Long vsys_id ;
+ private Long session_num ;
+ private Long c2s_pkt_num ;
+ private Long s2c_pkt_num;
+ private Long c2s_byte_num ;
+ private Long s2c_byte_num ;
+ private String device_group;
+ private String data_center;
+ private Long stat_time;
+ private String app_name;
+
+ public Long getVsys_id() {
+ return vsys_id;
+ }
+
+ public void setVsys_id(Long vsys_id) {
+ this.vsys_id = vsys_id;
+ }
+
+ public Long getSession_num() {
+ return session_num;
+ }
+
+ public void setSession_num(Long session_num) {
+ this.session_num = session_num;
+ }
+
+ public Long getC2s_pkt_num() {
+ return c2s_pkt_num;
+ }
+
+ public void setC2s_pkt_num(Long c2s_pkt_num) {
+ this.c2s_pkt_num = c2s_pkt_num;
+ }
+
+ public Long getS2c_pkt_num() {
+ return s2c_pkt_num;
+ }
+
+ public void setS2c_pkt_num(Long s2c_pkt_num) {
+ this.s2c_pkt_num = s2c_pkt_num;
+ }
+
+ public Long getC2s_byte_num() {
+ return c2s_byte_num;
+ }
+
+ public void setC2s_byte_num(Long c2s_byte_num) {
+ this.c2s_byte_num = c2s_byte_num;
+ }
+
+ public Long getS2c_byte_num() {
+ return s2c_byte_num;
+ }
+
+ public void setS2c_byte_num(Long s2c_byte_num) {
+ this.s2c_byte_num = s2c_byte_num;
+ }
+
+ public String getDevice_group() {
+ return device_group;
+ }
+
+ public void setDevice_group(String device_group) {
+ this.device_group = device_group;
+ }
+
+ public String getData_center() {
+ return data_center;
+ }
+
+ public void setData_center(String data_center) {
+ this.data_center = data_center;
+ }
+
+ public Long getStat_time() {
+ return stat_time;
+ }
+
+ public void setStat_time(Long stat_time) {
+ this.stat_time = stat_time;
+ }
+
+ public String getApp_name() {
+ return app_name;
+ }
+
+ public void setApp_name(String app_name) {
+ this.app_name = app_name;
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/pojo/ByteResultEntity.java b/src/main/java/com/galaxy/tsg/pojo/ByteResultEntity.java
new file mode 100644
index 0000000..5a3b7d2
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/pojo/ByteResultEntity.java
@@ -0,0 +1,152 @@
+package com.galaxy.tsg.pojo;
+
+public class ByteResultEntity implements Comparable {
+
+
+ private String source ;
+ private Long vsys_id ;
+ private Long session_num ;
+ private Long c2s_pkt_num ;
+ private Long s2c_pkt_num;
+ private Long c2s_byte_num ;
+ private Long s2c_byte_num ;
+ private String order_by;
+ private String device_group;
+ private String data_center;
+ private Long stat_time;
+ private String destination ;
+ private String domain;
+ private String subscriber_id;
+ private String app_name;
+
+ public String getApp_name() {
+ return app_name;
+ }
+
+ public void setApp_name(String app_name) {
+ this.app_name = app_name;
+ }
+ public String getSubscriber_id() {
+ return subscriber_id;
+ }
+
+ public void setSubscriber_id(String subscriber_id) {
+ this.subscriber_id = subscriber_id;
+ }
+
+ public String getDomain() {
+ return domain;
+ }
+
+ public void setDomain(String domain) {
+ this.domain = domain;
+ }
+
+ public String getDestination() {
+ return destination;
+ }
+
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public void setSource(String source) {
+ this.source = source;
+ }
+
+ public Long getVsys_id() {
+ return vsys_id;
+ }
+
+ public void setVsys_id(Long vsys_id) {
+ this.vsys_id = vsys_id;
+ }
+
+ public Long getSession_num() {
+ return session_num;
+ }
+
+ public void setSession_num(Long session_num) {
+ this.session_num = session_num;
+ }
+
+ public Long getC2s_pkt_num() {
+ return c2s_pkt_num;
+ }
+
+ public void setC2s_pkt_num(Long c2s_pkt_num) {
+ this.c2s_pkt_num = c2s_pkt_num;
+ }
+
+ public Long getS2c_pkt_num() {
+ return s2c_pkt_num;
+ }
+
+ public void setS2c_pkt_num(Long s2c_pkt_num) {
+ this.s2c_pkt_num = s2c_pkt_num;
+ }
+
+ public Long getC2s_byte_num() {
+ return c2s_byte_num;
+ }
+
+ public void setC2s_byte_num(Long c2s_byte_num) {
+ this.c2s_byte_num = c2s_byte_num;
+ }
+
+ public Long getS2c_byte_num() {
+ return s2c_byte_num;
+ }
+
+ public void setS2c_byte_num(Long s2c_byte_num) {
+ this.s2c_byte_num = s2c_byte_num;
+ }
+
+ public String getOrder_by() {
+ return order_by;
+ }
+
+ public void setOrder_by(String order_by) {
+ this.order_by = order_by;
+ }
+
+ public String getDevice_group() {
+ return device_group;
+ }
+
+ public void setDevice_group(String device_group) {
+ this.device_group = device_group;
+ }
+
+ public String getData_center() {
+ return data_center;
+ }
+
+ public void setData_center(String data_center) {
+ this.data_center = data_center;
+ }
+
+ public Long getStat_time() {
+ return stat_time;
+ }
+
+ public void setStat_time(Long stat_time) {
+ this.stat_time = stat_time;
+ }
+
+ @Override
+ public int compareTo(ByteResultEntity per) {
+ if(this.session_num>=per.session_num){
+ return 1 ;
+ }else{
+ return -1 ;
+ }
+ }
+
+
+
+}
diff --git a/src/main/java/com/galaxy/tsg/pojo/Entity.java b/src/main/java/com/galaxy/tsg/pojo/Entity.java
new file mode 100644
index 0000000..be2b658
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/pojo/Entity.java
@@ -0,0 +1,201 @@
+package com.galaxy.tsg.pojo;
+
+import java.io.Serializable;
+
+public class Entity implements Serializable {
+
+ public int ifError;
+ public String common_client_ip ;
+ public String common_app_label ;
+ public long common_recv_time ;
+
+ public String common_schema_type ;
+ public String common_server_ip ;
+ public String http_host ;
+ public String http_domain ;
+ public long common_vsys_id ;
+ public String common_device_group ;
+ public String common_data_center;
+ public String common_l4_protocol;
+ public String common_internal_ip;
+ public String common_external_ip;
+ public String common_subscriber_id;
+ public long common_sessions;
+ public long common_c2s_pkt_num;
+ public long common_s2c_pkt_num;
+ public long common_c2s_byte_num ;
+ public long common_s2c_byte_num ;
+ public String key_by;
+
+ public Entity() {
+ }
+
+ public String getKey_by() {
+ return key_by;
+ }
+
+ public void setKey_by(String key_by) {
+ this.key_by = key_by;
+ }
+
+ public int getIfError() {
+ return ifError;
+ }
+
+ public void setIfError(int ifError) {
+ this.ifError = ifError;
+ }
+
+ public String getCommon_client_ip() {
+ return common_client_ip;
+ }
+
+ public void setCommon_client_ip(String common_client_ip) {
+ this.common_client_ip = common_client_ip;
+ }
+
+ public String getCommon_app_label() {
+ return common_app_label;
+ }
+
+ public void setCommon_app_label(String common_app_label) {
+ this.common_app_label = common_app_label;
+ }
+
+ public long getCommon_recv_time() {
+ return common_recv_time;
+ }
+
+ public void setCommon_recv_time(long common_recv_time) {
+ this.common_recv_time = common_recv_time;
+ }
+
+ public String getCommon_schema_type() {
+ return common_schema_type;
+ }
+
+ public void setCommon_schema_type(String common_schema_type) {
+ this.common_schema_type = common_schema_type;
+ }
+
+ public String getCommon_server_ip() {
+ return common_server_ip;
+ }
+
+ public void setCommon_server_ip(String common_server_ip) {
+ this.common_server_ip = common_server_ip;
+ }
+
+ public String getHttp_host() {
+ return http_host;
+ }
+
+ public void setHttp_host(String http_host) {
+ this.http_host = http_host;
+ }
+
+ public String getHttp_domain() {
+ return http_domain;
+ }
+
+ public void setHttp_domain(String http_domain) {
+ this.http_domain = http_domain;
+ }
+
+ public long getCommon_vsys_id() {
+ return common_vsys_id;
+ }
+
+ public void setCommon_vsys_id(long common_vsys_id) {
+ this.common_vsys_id = common_vsys_id;
+ }
+
+ public String getCommon_device_group() {
+ return common_device_group;
+ }
+
+ public void setCommon_device_group(String common_device_group) {
+ this.common_device_group = common_device_group;
+ }
+
+
+ public String getCommon_data_center() {
+ return common_data_center;
+ }
+
+ public void setCommon_data_center(String common_data_center) {
+ this.common_data_center = common_data_center;
+ }
+
+ public String getCommon_l4_protocol() {
+ return common_l4_protocol;
+ }
+
+ public void setCommon_l4_protocol(String common_l4_protocol) {
+ this.common_l4_protocol = common_l4_protocol;
+ }
+
+ public String getCommon_internal_ip() {
+ return common_internal_ip;
+ }
+
+ public void setCommon_internal_ip(String common_internal_ip) {
+ this.common_internal_ip = common_internal_ip;
+ }
+
+ public String getCommon_external_ip() {
+ return common_external_ip;
+ }
+
+ public void setCommon_external_ip(String common_external_ip) {
+ this.common_external_ip = common_external_ip;
+ }
+
+ public String getCommon_subscriber_id() {
+ return common_subscriber_id;
+ }
+
+ public void setCommon_subscriber_id(String common_subscriber_id) {
+ this.common_subscriber_id = common_subscriber_id;
+ }
+
+ public long getCommon_sessions() {
+ return common_sessions;
+ }
+
+ public void setCommon_sessions(long common_sessions) {
+ this.common_sessions = common_sessions;
+ }
+
+ public long getCommon_c2s_pkt_num() {
+ return common_c2s_pkt_num;
+ }
+
+ public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) {
+ this.common_c2s_pkt_num = common_c2s_pkt_num;
+ }
+
+ public long getCommon_s2c_pkt_num() {
+ return common_s2c_pkt_num;
+ }
+
+ public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) {
+ this.common_s2c_pkt_num = common_s2c_pkt_num;
+ }
+
+ public long getCommon_c2s_byte_num() {
+ return common_c2s_byte_num;
+ }
+
+ public void setCommon_c2s_byte_num(long common_c2s_byte_num) {
+ this.common_c2s_byte_num = common_c2s_byte_num;
+ }
+
+ public long getCommon_s2c_byte_num() {
+ return common_s2c_byte_num;
+ }
+
+ public void setCommon_s2c_byte_num(long common_s2c_byte_num) {
+ this.common_s2c_byte_num = common_s2c_byte_num;
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/pojo/PacketResultEntity.java b/src/main/java/com/galaxy/tsg/pojo/PacketResultEntity.java
new file mode 100644
index 0000000..9d0ab39
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/pojo/PacketResultEntity.java
@@ -0,0 +1,163 @@
+package com.galaxy.tsg.pojo;
+
+public class PacketResultEntity implements Comparable, Cloneable {
+
+
+ private String source ;
+ private Long vsys_id ;
+ private Long session_num ;
+ private Long c2s_pkt_num ;
+ private Long s2c_pkt_num;
+ private Long c2s_byte_num ;
+ private Long s2c_byte_num ;
+ private String order_by;
+ private String device_group;
+ private String data_center;
+ private Long stat_time;
+
+ private String destination ;
+ private String domain;
+ private String subscriber_id;
+ private String app_name;
+
+ public String getApp_name() {
+ return app_name;
+ }
+
+ public void setApp_name(String app_name) {
+ this.app_name = app_name;
+ }
+ public String getSubscriber_id() {
+ return subscriber_id;
+ }
+
+ public void setSubscriber_id(String subscriber_id) {
+ this.subscriber_id = subscriber_id;
+ }
+ public String getDomain() {
+ return domain;
+ }
+
+ public void setDomain(String domain) {
+ this.domain = domain;
+ }
+ public String getDestination() {
+ return destination;
+ }
+
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+
+ public String getSource() {
+ return source;
+ }
+
+ public void setSource(String source) {
+ this.source = source;
+ }
+
+ public Long getVsys_id() {
+ return vsys_id;
+ }
+
+ public void setVsys_id(Long vsys_id) {
+ this.vsys_id = vsys_id;
+ }
+
+ public Long getSession_num() {
+ return session_num;
+ }
+
+ public void setSession_num(Long session_num) {
+ this.session_num = session_num;
+ }
+
+ public Long getC2s_pkt_num() {
+ return c2s_pkt_num;
+ }
+
+ public void setC2s_pkt_num(Long c2s_pkt_num) {
+ this.c2s_pkt_num = c2s_pkt_num;
+ }
+
+ public Long getS2c_pkt_num() {
+ return s2c_pkt_num;
+ }
+
+ public void setS2c_pkt_num(Long s2c_pkt_num) {
+ this.s2c_pkt_num = s2c_pkt_num;
+ }
+
+ public Long getC2s_byte_num() {
+ return c2s_byte_num;
+ }
+
+ public void setC2s_byte_num(Long c2s_byte_num) {
+ this.c2s_byte_num = c2s_byte_num;
+ }
+
+ public Long getS2c_byte_num() {
+ return s2c_byte_num;
+ }
+
+ public void setS2c_byte_num(Long s2c_byte_num) {
+ this.s2c_byte_num = s2c_byte_num;
+ }
+
+ public String getOrder_by() {
+ return order_by;
+ }
+
+ public void setOrder_by(String order_by) {
+ this.order_by = order_by;
+ }
+
+ public String getDevice_group() {
+ return device_group;
+ }
+
+ public void setDevice_group(String device_group) {
+ this.device_group = device_group;
+ }
+
+ public String getData_center() {
+ return data_center;
+ }
+
+ public void setData_center(String data_center) {
+ this.data_center = data_center;
+ }
+
+ public Long getStat_time() {
+ return stat_time;
+ }
+
+ public void setStat_time(Long stat_time) {
+ this.stat_time = stat_time;
+ }
+
+ @Override
+ public int compareTo(PacketResultEntity per) {
+ if(this.session_num>=per.session_num){
+ return 1 ;
+ }else{
+ return -1 ;
+ }
+ }
+
+
+ /* @Override public Object clone() {
+ PacketResultEntity obj;
+ try {
+ obj = (PacketResultEntity) super.clone();
+ } catch (CloneNotSupportedException e) {
+ obj = new PacketResultEntity(this.source,this.vsys_id,this.session_num,this.c2s_pkt_num,this.s2c_pkt_num,this.c2s_byte_num,
+ this.s2c_byte_num,this.order_by,this.device_group,this.data_center,this.stat_time);
+ }
+ return obj;
+ }
+*/
+
+}
diff --git a/src/main/java/com/galaxy/tsg/pojo/ResultEntity.java b/src/main/java/com/galaxy/tsg/pojo/ResultEntity.java
new file mode 100644
index 0000000..4abed44
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/pojo/ResultEntity.java
@@ -0,0 +1,63 @@
+package com.galaxy.tsg.pojo;
+
+public class ResultEntity {
+
+
+
+ private String order_by;
+ private Long stat_time;
+
+ private SessionResultEntity sessionResultEntity;
+ private PacketResultEntity packetResultEntity;
+ private ByteResultEntity byteResultEntity;
+ private TopUrlEntity topUrlEntity;
+
+
+ public String getOrder_by() {
+ return order_by;
+ }
+
+ public TopUrlEntity getTopUrlEntity() {
+ return topUrlEntity;
+ }
+
+ public void setTopUrlEntity(TopUrlEntity topUrlEntity) {
+ this.topUrlEntity = topUrlEntity;
+ }
+
+ public void setOrder_by(String order_by) {
+ this.order_by = order_by;
+ }
+
+ public Long getStat_time() {
+ return stat_time;
+ }
+
+ public void setStat_time(Long stat_time) {
+ this.stat_time = stat_time;
+ }
+
+ public SessionResultEntity getSessionResultEntity() {
+ return sessionResultEntity;
+ }
+
+ public void setSessionResultEntity(SessionResultEntity sessionResultEntity) {
+ this.sessionResultEntity = sessionResultEntity;
+ }
+
+ public PacketResultEntity getPacketResultEntity() {
+ return packetResultEntity;
+ }
+
+ public void setPacketResultEntity(PacketResultEntity packetResultEntity) {
+ this.packetResultEntity = packetResultEntity;
+ }
+
+ public ByteResultEntity getByteResultEntity() {
+ return byteResultEntity;
+ }
+
+ public void setByteResultEntity(ByteResultEntity byteResultEntity) {
+ this.byteResultEntity = byteResultEntity;
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/pojo/SessionResultEntity.java b/src/main/java/com/galaxy/tsg/pojo/SessionResultEntity.java
new file mode 100644
index 0000000..7928565
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/pojo/SessionResultEntity.java
@@ -0,0 +1,151 @@
+package com.galaxy.tsg.pojo;
+
+public class SessionResultEntity implements Comparable {
+
+
+ private String source ;
+ private Long vsys_id ;
+ private Long session_num ;
+ private Long c2s_pkt_num ;
+ private Long s2c_pkt_num;
+ private Long c2s_byte_num ;
+ private Long s2c_byte_num ;
+ private String order_by;
+ private String device_group;
+ private String data_center;
+ private Long stat_time;
+ private String destination ;
+ private String domain;
+ private String subscriber_id;
+ private String app_name;
+
+ public String getApp_name() {
+ return app_name;
+ }
+
+ public void setApp_name(String app_name) {
+ this.app_name = app_name;
+ }
+
+ public String getSubscriber_id() {
+ return subscriber_id;
+ }
+
+ public void setSubscriber_id(String subscriber_id) {
+ this.subscriber_id = subscriber_id;
+ }
+ public String getDomain() {
+ return domain;
+ }
+
+ public void setDomain(String domain) {
+ this.domain = domain;
+ }
+ public String getDestination() {
+ return destination;
+ }
+
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public void setSource(String source) {
+ this.source = source;
+ }
+
+ public Long getVsys_id() {
+ return vsys_id;
+ }
+
+ public void setVsys_id(Long vsys_id) {
+ this.vsys_id = vsys_id;
+ }
+
+ public Long getSession_num() {
+ return session_num;
+ }
+
+ public void setSession_num(Long session_num) {
+ this.session_num = session_num;
+ }
+
+ public Long getC2s_pkt_num() {
+ return c2s_pkt_num;
+ }
+
+ public void setC2s_pkt_num(Long c2s_pkt_num) {
+ this.c2s_pkt_num = c2s_pkt_num;
+ }
+
+ public Long getS2c_pkt_num() {
+ return s2c_pkt_num;
+ }
+
+ public void setS2c_pkt_num(Long s2c_pkt_num) {
+ this.s2c_pkt_num = s2c_pkt_num;
+ }
+
+ public Long getC2s_byte_num() {
+ return c2s_byte_num;
+ }
+
+ public void setC2s_byte_num(Long c2s_byte_num) {
+ this.c2s_byte_num = c2s_byte_num;
+ }
+
+ public Long getS2c_byte_num() {
+ return s2c_byte_num;
+ }
+
+ public void setS2c_byte_num(Long s2c_byte_num) {
+ this.s2c_byte_num = s2c_byte_num;
+ }
+
+ public String getOrder_by() {
+ return order_by;
+ }
+
+ public void setOrder_by(String order_by) {
+ this.order_by = order_by;
+ }
+
+ public String getDevice_group() {
+ return device_group;
+ }
+
+ public void setDevice_group(String device_group) {
+ this.device_group = device_group;
+ }
+
+ public String getData_center() {
+ return data_center;
+ }
+
+ public void setData_center(String data_center) {
+ this.data_center = data_center;
+ }
+
+ public Long getStat_time() {
+ return stat_time;
+ }
+
+ public void setStat_time(Long stat_time) {
+ this.stat_time = stat_time;
+ }
+
+ @Override
+ public int compareTo(SessionResultEntity per) {
+ if(this.session_num>=per.session_num){
+ return 1 ;
+ }else{
+ return -1 ;
+ }
+ }
+
+
+
+}
diff --git a/src/main/java/com/galaxy/tsg/pojo/TopUrlEntity.java b/src/main/java/com/galaxy/tsg/pojo/TopUrlEntity.java
new file mode 100644
index 0000000..0ce14cd
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/pojo/TopUrlEntity.java
@@ -0,0 +1,54 @@
+package com.galaxy.tsg.pojo;
+
+public class TopUrlEntity implements Comparable{
+
+
+ public long stat_time ;
+
+ public String url ;
+
+ public long vsys_id ;
+
+ public long session_num;
+
+ public long getStat_time() {
+ return stat_time;
+ }
+
+ public void setStat_time(long stat_time) {
+ this.stat_time = stat_time;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public long getVsys_id() {
+ return vsys_id;
+ }
+
+ public void setVsys_id(long vsys_id) {
+ this.vsys_id = vsys_id;
+ }
+
+ public long getSession_num() {
+ return session_num;
+ }
+
+ public void setSession_num(long session_num) {
+ this.session_num = session_num;
+ }
+
+ @Override
+ public int compareTo(TopUrlEntity per) {
+ if(this.session_num>=per.session_num){
+ return 1 ;
+ }else{
+ return -1 ;
+ }
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/pojo/UrlEntity.java b/src/main/java/com/galaxy/tsg/pojo/UrlEntity.java
new file mode 100644
index 0000000..e0aa5e3
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/pojo/UrlEntity.java
@@ -0,0 +1,55 @@
+package com.galaxy.tsg.pojo;
+
+public class UrlEntity {
+
+
+ public long common_recv_time ;
+
+ public String http_url ;
+
+ public long common_vsys_id ;
+
+ public long common_sessions;
+
+ public int ifError;
+
+ public int getIfError() {
+ return ifError;
+ }
+
+ public void setIfError(int ifError) {
+ this.ifError = ifError;
+ }
+
+ public long getCommon_recv_time() {
+ return common_recv_time;
+ }
+
+ public void setCommon_recv_time(long common_recv_time) {
+ this.common_recv_time = common_recv_time;
+ }
+
+ public String getHttp_url() {
+ return http_url;
+ }
+
+ public void setHttp_url(String http_url) {
+ this.http_url = http_url;
+ }
+
+ public long getCommon_vsys_id() {
+ return common_vsys_id;
+ }
+
+ public void setCommon_vsys_id(long common_vsys_id) {
+ this.common_vsys_id = common_vsys_id;
+ }
+
+ public long getCommon_sessions() {
+ return common_sessions;
+ }
+
+ public void setCommon_sessions(long common_sessions) {
+ this.common_sessions = common_sessions;
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/util/KafkaUtils.java b/src/main/java/com/galaxy/tsg/util/KafkaUtils.java
new file mode 100644
index 0000000..b374a2b
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/util/KafkaUtils.java
@@ -0,0 +1,109 @@
+package com.galaxy.tsg.util;
+
+import com.galaxy.tsg.config.commonConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+public class KafkaUtils {
+
+
+ public static Properties getKafkaSourceProperty() {
+ Properties properties = new Properties();
+ properties.setProperty("group.id", commonConfig.KAFKA_CONSUMER_GROUP_ID);
+ properties.setProperty("bootstrap.servers", commonConfig.KAFKA_CONSUMER_BROKER);
+ properties.setProperty("session.timeout.ms", commonConfig.KAFKA_CONSUMER_SESSION_TIMEOUT_MS);
+ properties.setProperty("max.poll.records", commonConfig.KAFKA_CONSUMER_MAX_POLL_RECORD);
+ properties.setProperty("max.partition.fetch.bytes", commonConfig.KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES);
+
+ switch (commonConfig.KAFKA_SECURITY) {
+ case 1:
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", commonConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", commonConfig.KAFKA_PIN);
+ properties.put("ssl.truststore.location", commonConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", commonConfig.KAFKA_PIN);
+ properties.put("ssl.key.password", commonConfig.KAFKA_PIN);
+ break;
+ case 2:
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + commonConfig.KAFKA_USER + " password=" + commonConfig.KAFKA_PIN + ";");
+ break;
+ default:
+ }
+
+ return properties;
+ }
+
+ private static Properties getKafkaSinkProperty() {
+ Properties properties = new Properties();
+ properties.setProperty("bootstrap.servers", commonConfig.KAFKA_PRODUCER_BROKER);
+
+ properties.put("acks", "1");
+ properties.put("retries", commonConfig.KAFKA_PRODUCER_RETRIES);
+ properties.put("linger.ms", commonConfig.KAFKA_PRODUCER_LINGER_MS);
+ properties.put("request.timeout.ms", commonConfig.KAFKA_PRODUCER_REQUEST_TIMEOUT_MS);
+ properties.put("batch.size", commonConfig.KAFKA_PRODUCER_BATCH_SIZE);
+ properties.put("buffer.memory", commonConfig.KAFKA_PRODUCER_BUFFER_MEMORY);
+ properties.put("max.request.size", commonConfig.KAFKA_PRODUCER_MAX_REQUEST_SIZE);
+ properties.put("compression.type", commonConfig.KAFKA_PRODUCER_COMPRESSION_TYPE);
+
+ switch (commonConfig.KAFKA_SECURITY) {
+ case 1:
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", commonConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", commonConfig.KAFKA_PIN);
+ properties.put("ssl.truststore.location", commonConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", commonConfig.KAFKA_PIN);
+ properties.put("ssl.key.password", commonConfig.KAFKA_PIN);
+ break;
+ case 2:
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + commonConfig.KAFKA_USER + " password=" + commonConfig.KAFKA_PIN + ";");
+ break;
+ default:
+ }
+ return properties;
+ }
+
+ public static FlinkKafkaConsumer getKafkaConsumer(String topic) {
+ FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(topic,
+ new SimpleStringSchema(), getKafkaSourceProperty());
+
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+ kafkaConsumer.setStartFromGroupOffsets();
+
+ return kafkaConsumer;
+ }
+ public static FlinkKafkaConsumer getKafkaConsumerLists(List topic) {
+ FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(topic,
+ new SimpleStringSchema(), getKafkaSourceProperty());
+
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+ kafkaConsumer.setStartFromGroupOffsets();
+
+ return kafkaConsumer;
+ }
+ public static SinkFunction getKafkaSink(String topic) {
+ return new FlinkKafkaProducer(
+ topic,
+ new SimpleStringSchema(),
+ getKafkaSinkProperty(),
+ Optional.empty()
+ );
+ }
+
+
+}
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
new file mode 100644
index 0000000..f6a9a9d
--- /dev/null
+++ b/src/main/resources/common.properties
@@ -0,0 +1,59 @@
+#kafkaĵַϢ
+kafka.consumer.broker=192.168.44.11:9092
+kafka.consumer.group.id =vpn-1206-27
+kafka.consumer.topic=SESSION-RECORD-COMPLETED
+kafka.consumer.parallelism=5
+kafka.consumer.max.poll.records=3000
+kafka.consumer.session.timeout.ms=60000
+kafka.consumer.max.partition.fetch.bytes=31457280
+kafka.consumer.topurl.parallelism=5
+
+#ж
+task.parallelism=5
+#ӳٵȴʱ䵥λ
+watermark.time=100
+#top
+top.limit=100
+
+url.top.limit=100
+#ʱ䵥λ
+window.time.minute=5
+#ÿʱ䵥λ
+#kafkaǷȫ֤ 0 1SSL 2 SASL
+kafka.security=0
+#kafka SASL֤û
+kafka.user=admin
+#kafka SASLSSL֤
+kafka.pin=galaxy2019
+#1SSLҪ
+tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
+
+
+
+#producerԵĴ
+kafka.producer.retries=0
+
+#ĺ˵һBatch֮ãBatchûд뷢ͳȥ
+kafka.producer.linger.ms=10
+
+#ڳʱ֮ǰδյӦͻ˽ڱҪʱ·
+kafka.producer.request.timeout.ms=30000
+
+#producerǰbatchз͵,δСĬ:16384
+kafka.producer.batch.size=262144
+
+#ProducerڻϢĻС
+#128M
+kafka.producer.buffer.memory=134217728
+
+#ÿηKafkaС,Ĭ1048576
+#10M
+kafka.producer.max.request.size=10485760
+
+#kafkaѹͣĬϲ
+kafka.producer.compression.type=none
+
+kafka_producer_broker=192.168.44.12:9092
+
+
+tmp.test.type=1
\ No newline at end of file
diff --git a/src/test/java/com/galaxy/tsg/catalog/CatalogTest.java b/src/test/java/com/galaxy/tsg/catalog/CatalogTest.java
new file mode 100644
index 0000000..bb7ef2c
--- /dev/null
+++ b/src/test/java/com/galaxy/tsg/catalog/CatalogTest.java
@@ -0,0 +1,7 @@
+package com.galaxy.tsg.catalog;
+
+public class CatalogTest {
+ public static void main(String[] args) {
+
+ }
+}