topn_metrics基础功能实现

This commit is contained in:
kingwide
2023-02-28 15:26:17 +08:00
parent 6b401318a4
commit 4965ac0231
22 changed files with 2340 additions and 0 deletions

View File

@@ -0,0 +1,297 @@
package com.galaxy.tsg;
import com.alibaba.fastjson.JSON;
import com.galaxy.tsg.function.*;
import com.galaxy.tsg.pojo.Entity;
import com.galaxy.tsg.pojo.ResultEntity;
import com.galaxy.tsg.pojo.UrlEntity;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import static com.galaxy.tsg.config.commonConfig.*;
import static com.galaxy.tsg.util.KafkaUtils.*;
public class Toptask {
private static final Logger LOG = LoggerFactory.getLogger(Toptask.class);
public static void main(String[] args) throws Exception {
//1.创建执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//指定使用事件时间
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> sourceForSession = env.addSource(getKafkaConsumer("SESSION-RECORD-COMPLETED")).setParallelism(KAFKA_CONSUMER_PARALLELISM);
WatermarkStrategy<Entity> strategyForSession = WatermarkStrategy
.<Entity>forBoundedOutOfOrderness(Duration.ofSeconds(WATERMARK_TIME))
.withTimestampAssigner((Entity, timestamp) -> Entity.getCommon_recv_time() * 1000);
List<String> topics = new LinkedList<>();
topics.add("SECURITY-EVENT-COMPLETED");
topics.add("PROXY-EVENT-COMPLETED");
DataStream<String> sourceForUrl = env.addSource(getKafkaConsumerLists(topics)).setParallelism(KAFKA_CONSUMER_TOPURL_PARALLELISM);
WatermarkStrategy<UrlEntity> strategyForSecurity = WatermarkStrategy
.<UrlEntity>forBoundedOutOfOrderness(Duration.ofSeconds(WATERMARK_TIME))
.withTimestampAssigner((UrlEntity, timestamp) -> UrlEntity.getCommon_recv_time() * 1000);
SingleOutputStreamOperator<Entity> inputForSession = sourceForSession.map(new MapFunction<String, Entity>() {
@Override
public Entity map(String message) {
Entity entity = new Entity();
try {
entity = JSON.parseObject(message, Entity.class);
} catch (Exception e) {
LOG.error("Entity Parsing ERROR");
entity.setIfError(1);
}
return entity;
}
}).filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity entity) throws Exception {
return entity.ifError != 1;
}
});
SingleOutputStreamOperator<UrlEntity> inputForUrl = sourceForUrl.map(new MapFunction<String, UrlEntity>() {
@Override
public UrlEntity map(String message) {
UrlEntity entity = new UrlEntity();
try {
entity = JSON.parseObject(message, UrlEntity.class);
} catch (Exception e) {
LOG.error("Entity Parsing ERROR");
entity.setIfError(1);
}
return entity;
}
}).filter(new FilterFunction<UrlEntity>() {
@Override
public boolean filter(UrlEntity entity) throws Exception {
return entity.ifError != 1;
}
});
switch (TMP_TEST_TYPE) {
case 1:
//clientip聚合TOP
SingleOutputStreamOperator<Entity> clientipdStream = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStream = clientipdStream.keyBy(new groupBySelector("common_client_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "common_client_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> windoweddStream = windowedStream.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
windoweddStream.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
//serverip聚合TOP
SingleOutputStreamOperator<Entity> serveripdStream = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStreamForServerIp = serveripdStream.keyBy(new groupBySelector("common_server_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "common_server_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> windoweddStreamForServerIp = windowedStreamForServerIp.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
windoweddStreamForServerIp.addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3);
//common_internal_ip聚合TOP
SingleOutputStreamOperator<Entity> internalStream = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
return StringUtil.isNotEmpty(value.getCommon_internal_ip());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStreamForInternal = internalStream.keyBy(new groupBySelector("common_internal_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "common_internal_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForInternal = windowedStreamForInternal.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
WindoweddStreamForInternal.addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3);
//common_external_ip聚合TOP
SingleOutputStreamOperator<Entity> externalStream = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
return StringUtil.isNotEmpty(value.getCommon_external_ip());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStreamForExternal = externalStream.keyBy(new groupBySelector("common_external_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "common_external_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForExternal = windowedStreamForExternal.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
WindoweddStreamForExternal.addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3);
//http_domain聚合TOP
SingleOutputStreamOperator<Entity> domainStream = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
return StringUtil.isNotEmpty(value.getHttp_domain());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStreamForDomain = domainStream.keyBy(new groupBySelector("http_domain"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "http_domain")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForDomain = windowedStreamForDomain.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
WindoweddStreamForDomain.addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3);
SingleOutputStreamOperator<Entity> userStream = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
return StringUtil.isNotEmpty(value.getCommon_subscriber_id());
}
}).assignTimestampsAndWatermarks(strategyForSession);
//common_subscriber_id聚合TOP
SingleOutputStreamOperator<ResultEntity> windowedStreamForUser = userStream.keyBy(new groupBySelector("common_subscriber_id"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "common_subscriber_id")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForUser = windowedStreamForUser.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
WindoweddStreamForUser.addSink(getKafkaSink("TOP-USER")).setParallelism(3);
SingleOutputStreamOperator<Entity> appNameStream = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
return StringUtil.isNotEmpty(value.getCommon_app_label());
}
}).assignTimestampsAndWatermarks(strategyForSession);
//common_app_label聚合求全量
appNameStream.keyBy(new groupBySelector("common_app_label"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculateForApp()).addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM);
SingleOutputStreamOperator<UrlEntity> UrlStream = inputForUrl.filter(new FilterFunction<UrlEntity>() {
@Override
public boolean filter(UrlEntity value) throws Exception {
return StringUtil.isNotEmpty(value.getHttp_url());
}
}).assignTimestampsAndWatermarks(strategyForSecurity);
//url聚合session求top
SingleOutputStreamOperator<ResultEntity> windowedStreamForUrl = UrlStream.keyBy(new twoKeySelector())
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new UrlAggregationReduce(), new metricsCalculateForUrl(URL_TOP_LIMIT)).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForUrl = windowedStreamForUrl.keyBy(new oneKeySelector())
.process(new TopNHotItemsForUrl(URL_TOP_LIMIT)).setParallelism(1);
WindoweddStreamForUrl.addSink(getKafkaSink("TOP-URLS")).setParallelism(3);
break;
case 2:
//datasketch
break;
}
env.execute("TOP-task");
}
public static class groupBySelector implements KeySelector<Entity, Tuple4<String, Long, String, String>> {
public String key;
public groupBySelector(String key) {
this.key = key;
}
@Override
public Tuple4<String, Long, String, String> getKey(Entity entity) throws Exception {
Tuple4<String, Long, String, String> tuple = null;
switch (key) {
case "common_client_ip":
tuple = new Tuple4<>(entity.getCommon_client_ip(), entity.getCommon_vsys_id(), entity.getCommon_device_group(), entity.getCommon_data_center());
break;
case "common_server_ip":
tuple = new Tuple4<>(entity.getCommon_server_ip(), entity.getCommon_vsys_id(), entity.getCommon_device_group(), entity.getCommon_data_center());
break;
case "common_internal_ip":
tuple = new Tuple4<>(entity.getCommon_internal_ip(), entity.getCommon_vsys_id(), entity.getCommon_device_group(), entity.getCommon_data_center());
break;
case "common_external_ip":
tuple = new Tuple4<>(entity.getCommon_external_ip(), entity.getCommon_vsys_id(), entity.getCommon_device_group(), entity.getCommon_data_center());
break;
case "http_domain":
tuple = new Tuple4<>(entity.getHttp_domain(), entity.getCommon_vsys_id(), entity.getCommon_device_group(), entity.getCommon_data_center());
break;
case "common_subscriber_id":
tuple = new Tuple4<>(entity.getCommon_subscriber_id(), entity.getCommon_vsys_id(), entity.getCommon_device_group(), entity.getCommon_data_center());
break;
case "common_app_label":
tuple = new Tuple4<>(entity.getCommon_app_label(), entity.getCommon_vsys_id(), entity.getCommon_device_group(), entity.getCommon_data_center());
break;
default:
}
return tuple;
}
}
public static class oneKeySelector implements KeySelector<ResultEntity, Tuple1<String>> {
@Override
public Tuple1<String> getKey(ResultEntity entity) throws Exception {
return new Tuple1<>(entity.getOrder_by());
}
}
public static class twoKeySelector implements KeySelector<UrlEntity, Tuple2<String, Long>> {
@Override
public Tuple2<String, Long> getKey(UrlEntity entity) throws Exception {
return new Tuple2<>(entity.getHttp_url(), entity.getCommon_vsys_id());
}
}
}

View File

@@ -0,0 +1,62 @@
package com.galaxy.tsg.config;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* @author Administrator
*/
public final class CommonConfigurations {
private static Properties propService = new Properties();
public static Map<String,String> getHashTableProperty(String key) {
Map<String,String> map = new HashMap<>();
String[] keyarray = propService.getProperty(key).split(",");
for(String k :keyarray){
if(k!=null && !"".equals(k.trim())){
map.put(k,"");
}
}
return map;
}
public static String getStringProperty(String key) {
return propService.getProperty(key);
}
public static Integer getIntProperty( String key) {
return Integer.parseInt(propService.getProperty(key));
}
public static Long getLongProperty(String key) {
return Long.parseLong(propService.getProperty(key));
}
public static Boolean getBooleanProperty(String key) {
return "true".equals(propService.getProperty(key).toLowerCase().trim());
}
static {
try {
propService.load(CommonConfigurations.class.getClassLoader().getResourceAsStream("common.properties"));
} catch (Exception e) {
propService = null;
}
}
}

View File

@@ -0,0 +1,49 @@
package com.galaxy.tsg.config;
/**
* Created by wk on 2021/1/6.
*/
public class commonConfig {
public static final String KAFKA_CONSUMER_BROKER = CommonConfigurations.getStringProperty("kafka.consumer.broker");
public static final String KAFKA_CONSUMER_GROUP_ID = CommonConfigurations.getStringProperty("kafka.consumer.group.id");
public static final String KAFKA_CONSUMER_TOPIC = CommonConfigurations.getStringProperty("kafka.consumer.topic");
public static final int KAFKA_CONSUMER_PARALLELISM = CommonConfigurations.getIntProperty("kafka.consumer.parallelism");
public static final String KAFKA_CONSUMER_SESSION_TIMEOUT_MS=CommonConfigurations.getStringProperty("kafka.consumer.session.timeout.ms");
public static final String KAFKA_CONSUMER_MAX_POLL_RECORD=CommonConfigurations.getStringProperty("kafka.consumer.max.poll.records");
public static final String KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES=CommonConfigurations.getStringProperty("kafka.consumer.max.partition.fetch.bytes");
public static final int KAFKA_CONSUMER_TOPURL_PARALLELISM=CommonConfigurations.getIntProperty("kafka.consumer.topurl.parallelism");
public static final String KAFKA_PRODUCER_RETRIES = CommonConfigurations.getStringProperty("kafka.producer.retries");
public static final String KAFKA_PRODUCER_LINGER_MS = CommonConfigurations.getStringProperty("kafka.producer.linger.ms");
public static final String KAFKA_PRODUCER_REQUEST_TIMEOUT_MS = CommonConfigurations.getStringProperty("kafka.producer.request.timeout.ms");
public static final String KAFKA_PRODUCER_BATCH_SIZE = CommonConfigurations.getStringProperty("kafka.producer.batch.size");
public static final String KAFKA_PRODUCER_BUFFER_MEMORY = CommonConfigurations.getStringProperty("kafka.producer.buffer.memory");
public static final String KAFKA_PRODUCER_MAX_REQUEST_SIZE = CommonConfigurations.getStringProperty("kafka.producer.max.request.size");
public static final String KAFKA_PRODUCER_COMPRESSION_TYPE = CommonConfigurations.getStringProperty("kafka.producer.compression.type");
public static final int TASK_PARALLELISM = CommonConfigurations.getIntProperty("task.parallelism");
public static final int WATERMARK_TIME = CommonConfigurations.getIntProperty("watermark.time");
public static final int WINDOW_TIME_MINUTE = CommonConfigurations.getIntProperty("window.time.minute");
public static final int TOP_LIMIT = CommonConfigurations.getIntProperty("top.limit");
public static final int URL_TOP_LIMIT = CommonConfigurations.getIntProperty("url.top.limit");
public static final String KAFKA_USER = CommonConfigurations.getStringProperty("kafka.user");
public static final String KAFKA_PIN = CommonConfigurations.getStringProperty("kafka.pin");
public static final int KAFKA_SECURITY = CommonConfigurations.getIntProperty("kafka.security");
public static final String TOOLS_LIBRARY = CommonConfigurations.getStringProperty("tools.library");
public static final String KAFKA_PRODUCER_BROKER = CommonConfigurations.getStringProperty("kafka_producer_broker");
public static final int TMP_TEST_TYPE = CommonConfigurations.getIntProperty("tmp.test.type");
}

View File

@@ -0,0 +1,128 @@
package com.galaxy.tsg.function;
import com.alibaba.fastjson.JSONObject;
import com.galaxy.tsg.pojo.ByteResultEntity;
import com.galaxy.tsg.pojo.PacketResultEntity;
import com.galaxy.tsg.pojo.ResultEntity;
import com.galaxy.tsg.pojo.SessionResultEntity;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.PriorityQueue;
public class TopNHotItems extends KeyedProcessFunction<Tuple1<String>, ResultEntity, String> {
private final int topSize;
// Set<ObjectEntity> allSet = new TreeSet<ObjectEntity>() ;
private PriorityQueue<SessionResultEntity> sessionOrderEntity ;
private PriorityQueue<PacketResultEntity> packetOrderEntity ;
private PriorityQueue<ByteResultEntity> byteOrderEntity ;
public TopNHotItems(int i) {
this.topSize = i;
}
@Override
public void open(Configuration parameters) {
this.sessionOrderEntity=new PriorityQueue<>();
this.packetOrderEntity=new PriorityQueue<>();
this.byteOrderEntity=new PriorityQueue<>();
}
@Override
public void processElement(ResultEntity objectEntity, Context context, Collector<String> collector) {
switch(objectEntity.getOrder_by()) {
case "sessions":
if (sessionOrderEntity.size() < topSize) {
sessionOrderEntity.add(objectEntity.getSessionResultEntity());
} else {
if (sessionOrderEntity.peek() != null) {
SessionResultEntity res=sessionOrderEntity.peek();
if (res.getSession_num() <= objectEntity.getSessionResultEntity().getSession_num()) {
sessionOrderEntity.poll();
sessionOrderEntity.add(objectEntity.getSessionResultEntity());
}
}
}
break;
case "packets":
if (packetOrderEntity.size() < topSize) {
packetOrderEntity.add(objectEntity.getPacketResultEntity());
} else {
if (packetOrderEntity.peek() != null) {
PacketResultEntity res=packetOrderEntity.peek();
if ((res.getS2c_pkt_num()+res.getC2s_pkt_num()) <= (objectEntity.getPacketResultEntity().getC2s_pkt_num()+objectEntity.getPacketResultEntity().getS2c_pkt_num())) {
packetOrderEntity.poll();
packetOrderEntity.add(objectEntity.getPacketResultEntity());
}
}
}
break;
case "bytes":
if (byteOrderEntity.size() < topSize) {
byteOrderEntity.add(objectEntity.getByteResultEntity());
} else {
if (byteOrderEntity.peek() != null) {
ByteResultEntity res=byteOrderEntity.peek();
if ((res.getC2s_byte_num()+res.getS2c_byte_num()) <= (objectEntity.getByteResultEntity().getS2c_byte_num()+objectEntity.getByteResultEntity().getC2s_byte_num())) {
byteOrderEntity.poll();
byteOrderEntity.add(objectEntity.getByteResultEntity());
}
}
}
break;
default:
}
//注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收集好了所有 windowEnd的商品数据
context.timerService().registerEventTimeTimer(objectEntity.getStat_time() + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
for(SessionResultEntity en : sessionOrderEntity){
String jsonStr = JSONObject.toJSONString(en);
out.collect(jsonStr);
}
for(PacketResultEntity en : packetOrderEntity){
String jsonStr = JSONObject.toJSONString(en);
out.collect(jsonStr);
}
for(ByteResultEntity en : byteOrderEntity){
String jsonStr = JSONObject.toJSONString(en);
out.collect(jsonStr);
}
sessionOrderEntity.clear();
packetOrderEntity.clear();
byteOrderEntity.clear();
}
}

View File

@@ -0,0 +1,68 @@
package com.galaxy.tsg.function;
import com.alibaba.fastjson.JSONObject;
import com.galaxy.tsg.pojo.ResultEntity;
import com.galaxy.tsg.pojo.TopUrlEntity;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.PriorityQueue;
public class TopNHotItemsForUrl extends KeyedProcessFunction<Tuple1<String>, ResultEntity, String> {
private final int topSize;
// Set<ObjectEntity> allSet = new TreeSet<ObjectEntity>() ;
private PriorityQueue<TopUrlEntity> sessionOrderEntity ;
public TopNHotItemsForUrl(int i) {
this.topSize = i;
}
@Override
public void open(Configuration parameters) {
this.sessionOrderEntity=new PriorityQueue<>();
}
@Override
public void processElement(ResultEntity objectEntity, Context context, Collector<String> collector) {
if (sessionOrderEntity.size() < topSize) {
sessionOrderEntity.add(objectEntity.getTopUrlEntity());
} else {
if (sessionOrderEntity.peek() != null) {
TopUrlEntity res=sessionOrderEntity.peek();
if (res.getSession_num() <= objectEntity.getTopUrlEntity().getSession_num()) {
sessionOrderEntity.poll();
sessionOrderEntity.add(objectEntity.getTopUrlEntity());
}
}
}
//注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收集好了所有 windowEnd的商品数据
context.timerService().registerEventTimeTimer(objectEntity.getStat_time() + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
for(TopUrlEntity en : sessionOrderEntity){
String jsonStr = JSONObject.toJSONString(en);
out.collect(jsonStr);
}
sessionOrderEntity.clear();
}
}

View File

@@ -0,0 +1,13 @@
package com.galaxy.tsg.function;
import com.galaxy.tsg.pojo.UrlEntity;
import org.apache.flink.api.common.functions.ReduceFunction;
public class UrlAggregationReduce implements ReduceFunction<UrlEntity> {
@Override
public UrlEntity reduce(UrlEntity value1, UrlEntity value2) throws Exception {
value1.setCommon_sessions(value1.getCommon_sessions()+value2.getCommon_sessions());
return value1;
}
}

View File

@@ -0,0 +1,18 @@
package com.galaxy.tsg.function;
import com.galaxy.tsg.pojo.Entity;
import org.apache.flink.api.common.functions.ReduceFunction;
public class metricsAggregationReduce implements ReduceFunction<Entity> {
@Override
public Entity reduce(Entity value1, Entity value2) throws Exception {
value1.setCommon_c2s_pkt_num(value1.getCommon_c2s_pkt_num() + value2.getCommon_c2s_pkt_num());
value1.setCommon_s2c_pkt_num(value1.getCommon_s2c_pkt_num() + value2.getCommon_s2c_pkt_num());
value1.setCommon_c2s_byte_num(value1.getCommon_c2s_byte_num() + value2.getCommon_c2s_byte_num());
value1.setCommon_s2c_byte_num(value1.getCommon_s2c_byte_num() + value2.getCommon_s2c_byte_num());
value1.setCommon_sessions(value1.getCommon_sessions()+value2.getCommon_sessions());
return value1;
}
}

View File

@@ -0,0 +1,243 @@
package com.galaxy.tsg.function;
import com.galaxy.tsg.pojo.*;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.PriorityQueue;
public class metricsCalculate extends ProcessWindowFunction<
Entity, // 输入类型
ResultEntity, // 输出类型
Tuple4<String,Long,String,String>, // 键类型
TimeWindow> { // 窗口类型
private final int topSize;
private final String key;
// Set<ObjectEntity> allSet = new TreeSet<ObjectEntity>() ;
private PriorityQueue<SessionResultEntity> sessionOrderEntity ;
private PriorityQueue<PacketResultEntity> packetOrderEntity ;
private PriorityQueue<ByteResultEntity> byteOrderEntity ;
public metricsCalculate(int i,String key) {
this.key = key;
this.topSize = i;
this.sessionOrderEntity=new PriorityQueue<>();
this.packetOrderEntity=new PriorityQueue<>();
this.byteOrderEntity=new PriorityQueue<>();
}
@Override
public void process(Tuple4<String,Long,String,String> s,
Context context,
Iterable<Entity> elements, Collector<ResultEntity> out) throws Exception {
for(Entity objectEntity : elements) {
if (sessionOrderEntity.size() < topSize) {
sessionOrderEntity.add(enrichessionResult(context.window().getEnd() / 1000, objectEntity));
} else {
if (sessionOrderEntity.peek() != null) {
SessionResultEntity res = sessionOrderEntity.peek();
if (res.getSession_num() <= objectEntity.getCommon_sessions()) {
sessionOrderEntity.poll();
sessionOrderEntity.add(enrichessionResult(context.window().getEnd() / 1000, objectEntity));
}
}
}
if (packetOrderEntity.size() < topSize) {
packetOrderEntity.add(enrichPacketResult(context.window().getEnd() / 1000, objectEntity));
} else {
if (packetOrderEntity.peek() != null) {
PacketResultEntity res = packetOrderEntity.peek();
if ((res.getS2c_pkt_num() + res.getC2s_pkt_num()) <= (objectEntity.getCommon_s2c_pkt_num() + objectEntity.getCommon_c2s_pkt_num())) {
packetOrderEntity.poll();
packetOrderEntity.add(enrichPacketResult(context.window().getEnd() / 1000, objectEntity));
}
}
}
if (byteOrderEntity.size() < topSize) {
byteOrderEntity.add(enrichByteResult(context.window().getEnd() / 1000, objectEntity));
} else {
if (byteOrderEntity.peek() != null) {
ByteResultEntity res = byteOrderEntity.peek();
if ((res.getS2c_byte_num() + res.getC2s_byte_num()) <= (objectEntity.getCommon_s2c_byte_num() + objectEntity.getCommon_c2s_byte_num())) {
byteOrderEntity.poll();
byteOrderEntity.add(enrichByteResult(context.window().getEnd() / 1000, objectEntity));
}
}
}
}
while (sessionOrderEntity.size() > 0) {
SessionResultEntity obj = sessionOrderEntity.peek();
//String jsonStr = JSONObject.toJSONString(en);
ResultEntity en = new ResultEntity();
en.setOrder_by("sessions");
en.setStat_time(context.window().getEnd() / 1000);
en.setSessionResultEntity(obj);
out.collect(en);
sessionOrderEntity.remove();
}
while (packetOrderEntity.size() > 0) {
PacketResultEntity obj = packetOrderEntity.peek();
ResultEntity en = new ResultEntity();
en.setOrder_by("packets");
en.setStat_time(context.window().getEnd() / 1000);
en.setPacketResultEntity(obj);
out.collect(en);
packetOrderEntity.remove();
}
while (byteOrderEntity.size() > 0) {
ByteResultEntity obj = byteOrderEntity.peek();
ResultEntity en = new ResultEntity();
en.setOrder_by("bytes");
en.setStat_time(context.window().getEnd() / 1000);
en.setByteResultEntity(obj);
out.collect(en);
byteOrderEntity.remove();
}
}
public ByteResultEntity enrichByteResult(Long time,Entity objectEntity) {
ByteResultEntity en = new ByteResultEntity();
en.setVsys_id(objectEntity.getCommon_vsys_id());
en.setStat_time(time);
en.setSource(objectEntity.getCommon_client_ip());
en.setSession_num(objectEntity.getCommon_sessions());
en.setOrder_by("bytes");
en.setDevice_group(objectEntity.getCommon_device_group());
en.setData_center(objectEntity.getCommon_data_center());
en.setS2c_pkt_num(objectEntity.getCommon_s2c_pkt_num());
en.setC2s_pkt_num(objectEntity.getCommon_c2s_pkt_num());
en.setS2c_byte_num(objectEntity.getCommon_s2c_byte_num());
en.setC2s_byte_num(objectEntity.getCommon_c2s_byte_num());
switch (key) {
case "common_client_ip":
en.setSource(objectEntity.getCommon_client_ip());
break;
case "common_server_ip":
en.setDestination(objectEntity.getCommon_server_ip());
break;
case "common_internal_ip":
en.setSource(objectEntity.getCommon_internal_ip());
break;
case "common_external_ip":
en.setDestination(objectEntity.getCommon_external_ip());
break;
case "http_domain":
en.setDomain(objectEntity.getHttp_domain());
break;
case "common_subscriber_id":
en.setSubscriber_id(objectEntity.getCommon_subscriber_id());
break;
case "common_app_label":
en.setApp_name(objectEntity.getCommon_app_label());
break;
default:
}
return en;
}
public SessionResultEntity enrichessionResult(Long time,Entity objectEntity){
SessionResultEntity en =new SessionResultEntity();
en.setVsys_id(objectEntity.getCommon_vsys_id());
en.setStat_time(time);
en.setSession_num(objectEntity.getCommon_sessions());
en.setOrder_by("sessions");
en.setDevice_group(objectEntity.getCommon_device_group());
en.setData_center(objectEntity.getCommon_data_center());
en.setS2c_pkt_num(objectEntity.getCommon_s2c_pkt_num());
en.setC2s_pkt_num(objectEntity.getCommon_c2s_pkt_num());
en.setS2c_byte_num(objectEntity.getCommon_s2c_byte_num());
en.setC2s_byte_num(objectEntity.getCommon_c2s_byte_num());
switch(key) {
case "common_client_ip":
en.setSource(objectEntity.getCommon_client_ip());
break;
case "common_server_ip":
en.setDestination(objectEntity.getCommon_server_ip());
break;
case "common_internal_ip":
en.setSource(objectEntity.getCommon_internal_ip());
break;
case "common_external_ip":
en.setDestination(objectEntity.getCommon_external_ip());
break;
case "http_domain":
en.setDomain(objectEntity.getHttp_domain());
break;
case "common_subscriber_id":
en.setSubscriber_id(objectEntity.getCommon_subscriber_id());
break;
case "common_app_label":
en.setApp_name(objectEntity.getCommon_app_label());
break;
default:
}
return en;
}
public PacketResultEntity enrichPacketResult(Long time,Entity objectEntity){
PacketResultEntity en =new PacketResultEntity();
en.setVsys_id(objectEntity.getCommon_vsys_id());
en.setStat_time(time);
en.setSource(objectEntity.getCommon_client_ip());
en.setSession_num(objectEntity.getCommon_sessions());
en.setOrder_by("packets");
en.setDevice_group(objectEntity.getCommon_device_group());
en.setData_center(objectEntity.getCommon_data_center());
en.setS2c_pkt_num(objectEntity.getCommon_s2c_pkt_num());
en.setC2s_pkt_num(objectEntity.getCommon_c2s_pkt_num());
en.setS2c_byte_num(objectEntity.getCommon_s2c_byte_num());
en.setC2s_byte_num(objectEntity.getCommon_c2s_byte_num());
switch(key) {
case "common_client_ip":
en.setSource(objectEntity.getCommon_client_ip());
break;
case "common_server_ip":
en.setDestination(objectEntity.getCommon_server_ip());
break;
case "common_internal_ip":
en.setSource(objectEntity.getCommon_internal_ip());
break;
case "common_external_ip":
en.setDestination(objectEntity.getCommon_external_ip());
break;
case "common_subscriber_id":
en.setSubscriber_id(objectEntity.getCommon_subscriber_id());
break;
case "common_app_label":
en.setApp_name(objectEntity.getCommon_app_label());
break;
default:
}
return en;
}
}

View File

@@ -0,0 +1,43 @@
package com.galaxy.tsg.function;
import com.alibaba.fastjson.JSONObject;
import com.galaxy.tsg.pojo.AppEntity;
import com.galaxy.tsg.pojo.Entity;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class metricsCalculateForApp extends ProcessWindowFunction<
Entity, // 输入类型
String, // 输出类型
Tuple4<String,Long,String,String>, // 键类型
TimeWindow> { // 窗口类型
@Override
public void process(Tuple4<String,Long,String,String> s,
Context context,
Iterable<Entity> elements, Collector<String> out) throws Exception {
for (Entity objectEntity: elements) {
AppEntity en =new AppEntity();
en.setVsys_id(objectEntity.getCommon_vsys_id());
en.setStat_time(context.window().getEnd() / 1000);
en.setSession_num(objectEntity.getCommon_sessions());
en.setApp_name(objectEntity.getCommon_app_label());
en.setDevice_group(objectEntity.getCommon_device_group());
en.setData_center(objectEntity.getCommon_data_center());
en.setS2c_pkt_num(objectEntity.getCommon_s2c_pkt_num());
en.setC2s_pkt_num(objectEntity.getCommon_c2s_pkt_num());
en.setS2c_byte_num(objectEntity.getCommon_s2c_byte_num());
en.setC2s_byte_num(objectEntity.getCommon_c2s_byte_num());
String jsonStr = JSONObject.toJSONString(en);
out.collect(jsonStr);
}
}
}

View File

@@ -0,0 +1,70 @@
package com.galaxy.tsg.function;
import com.galaxy.tsg.pojo.ResultEntity;
import com.galaxy.tsg.pojo.TopUrlEntity;
import com.galaxy.tsg.pojo.UrlEntity;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.PriorityQueue;
public class metricsCalculateForUrl extends ProcessWindowFunction<
UrlEntity, // 输入类型
ResultEntity, // 输出类型
Tuple2<String,Long>, // 键类型
TimeWindow> { // 窗口类型
private final int topSize;
// Set<ObjectEntity> allSet = new TreeSet<ObjectEntity>() ;
private PriorityQueue<TopUrlEntity> sessionOrderEntity;
public metricsCalculateForUrl(int i) {
this.topSize = i;
this.sessionOrderEntity = new PriorityQueue<>();
}
@Override
public void process(Tuple2 < String, Long > s,
Context context,
Iterable<UrlEntity> elements, Collector<ResultEntity> out) throws Exception {
for (UrlEntity objectEntity: elements) {
if (sessionOrderEntity.size() < topSize) {
TopUrlEntity en = new TopUrlEntity();
en.setSession_num(objectEntity.getCommon_sessions());
en.setStat_time(context.window().getEnd() / 1000);
en.setUrl(objectEntity.getHttp_url());
en.setVsys_id(objectEntity.getCommon_vsys_id());
sessionOrderEntity.add(en);
} else {
if (sessionOrderEntity.peek() != null) {
TopUrlEntity res = sessionOrderEntity.peek();
if (res.getSession_num() <= objectEntity.getCommon_sessions()) {
sessionOrderEntity.poll();
TopUrlEntity en = new TopUrlEntity();
en.setSession_num(objectEntity.getCommon_sessions());
en.setStat_time(context.window().getEnd() / 1000);
en.setUrl(objectEntity.getHttp_url());
en.setVsys_id(objectEntity.getCommon_vsys_id());
sessionOrderEntity.add(en);
}
}
}
}
while (sessionOrderEntity.size() > 0) {
TopUrlEntity obj = sessionOrderEntity.peek();
ResultEntity resultEntity = new ResultEntity();
resultEntity.setOrder_by("sessions");
resultEntity.setStat_time(context.window().getEnd() / 1000);
resultEntity.setTopUrlEntity(obj);
out.collect(resultEntity);
sessionOrderEntity.remove();
}
}
}

View File

@@ -0,0 +1,95 @@
package com.galaxy.tsg.pojo;
public class AppEntity {
private Long vsys_id ;
private Long session_num ;
private Long c2s_pkt_num ;
private Long s2c_pkt_num;
private Long c2s_byte_num ;
private Long s2c_byte_num ;
private String device_group;
private String data_center;
private Long stat_time;
private String app_name;
public Long getVsys_id() {
return vsys_id;
}
public void setVsys_id(Long vsys_id) {
this.vsys_id = vsys_id;
}
public Long getSession_num() {
return session_num;
}
public void setSession_num(Long session_num) {
this.session_num = session_num;
}
public Long getC2s_pkt_num() {
return c2s_pkt_num;
}
public void setC2s_pkt_num(Long c2s_pkt_num) {
this.c2s_pkt_num = c2s_pkt_num;
}
public Long getS2c_pkt_num() {
return s2c_pkt_num;
}
public void setS2c_pkt_num(Long s2c_pkt_num) {
this.s2c_pkt_num = s2c_pkt_num;
}
public Long getC2s_byte_num() {
return c2s_byte_num;
}
public void setC2s_byte_num(Long c2s_byte_num) {
this.c2s_byte_num = c2s_byte_num;
}
public Long getS2c_byte_num() {
return s2c_byte_num;
}
public void setS2c_byte_num(Long s2c_byte_num) {
this.s2c_byte_num = s2c_byte_num;
}
public String getDevice_group() {
return device_group;
}
public void setDevice_group(String device_group) {
this.device_group = device_group;
}
public String getData_center() {
return data_center;
}
public void setData_center(String data_center) {
this.data_center = data_center;
}
public Long getStat_time() {
return stat_time;
}
public void setStat_time(Long stat_time) {
this.stat_time = stat_time;
}
public String getApp_name() {
return app_name;
}
public void setApp_name(String app_name) {
this.app_name = app_name;
}
}

View File

@@ -0,0 +1,152 @@
package com.galaxy.tsg.pojo;
public class ByteResultEntity implements Comparable<ByteResultEntity> {
private String source ;
private Long vsys_id ;
private Long session_num ;
private Long c2s_pkt_num ;
private Long s2c_pkt_num;
private Long c2s_byte_num ;
private Long s2c_byte_num ;
private String order_by;
private String device_group;
private String data_center;
private Long stat_time;
private String destination ;
private String domain;
private String subscriber_id;
private String app_name;
public String getApp_name() {
return app_name;
}
public void setApp_name(String app_name) {
this.app_name = app_name;
}
public String getSubscriber_id() {
return subscriber_id;
}
public void setSubscriber_id(String subscriber_id) {
this.subscriber_id = subscriber_id;
}
public String getDomain() {
return domain;
}
public void setDomain(String domain) {
this.domain = domain;
}
public String getDestination() {
return destination;
}
public void setDestination(String destination) {
this.destination = destination;
}
public String getSource() {
return source;
}
public void setSource(String source) {
this.source = source;
}
public Long getVsys_id() {
return vsys_id;
}
public void setVsys_id(Long vsys_id) {
this.vsys_id = vsys_id;
}
public Long getSession_num() {
return session_num;
}
public void setSession_num(Long session_num) {
this.session_num = session_num;
}
public Long getC2s_pkt_num() {
return c2s_pkt_num;
}
public void setC2s_pkt_num(Long c2s_pkt_num) {
this.c2s_pkt_num = c2s_pkt_num;
}
public Long getS2c_pkt_num() {
return s2c_pkt_num;
}
public void setS2c_pkt_num(Long s2c_pkt_num) {
this.s2c_pkt_num = s2c_pkt_num;
}
public Long getC2s_byte_num() {
return c2s_byte_num;
}
public void setC2s_byte_num(Long c2s_byte_num) {
this.c2s_byte_num = c2s_byte_num;
}
public Long getS2c_byte_num() {
return s2c_byte_num;
}
public void setS2c_byte_num(Long s2c_byte_num) {
this.s2c_byte_num = s2c_byte_num;
}
public String getOrder_by() {
return order_by;
}
public void setOrder_by(String order_by) {
this.order_by = order_by;
}
public String getDevice_group() {
return device_group;
}
public void setDevice_group(String device_group) {
this.device_group = device_group;
}
public String getData_center() {
return data_center;
}
public void setData_center(String data_center) {
this.data_center = data_center;
}
public Long getStat_time() {
return stat_time;
}
public void setStat_time(Long stat_time) {
this.stat_time = stat_time;
}
@Override
public int compareTo(ByteResultEntity per) {
if(this.session_num>=per.session_num){
return 1 ;
}else{
return -1 ;
}
}
}

View File

@@ -0,0 +1,201 @@
package com.galaxy.tsg.pojo;
import java.io.Serializable;
public class Entity implements Serializable {
public int ifError;
public String common_client_ip ;
public String common_app_label ;
public long common_recv_time ;
public String common_schema_type ;
public String common_server_ip ;
public String http_host ;
public String http_domain ;
public long common_vsys_id ;
public String common_device_group ;
public String common_data_center;
public String common_l4_protocol;
public String common_internal_ip;
public String common_external_ip;
public String common_subscriber_id;
public long common_sessions;
public long common_c2s_pkt_num;
public long common_s2c_pkt_num;
public long common_c2s_byte_num ;
public long common_s2c_byte_num ;
public String key_by;
public Entity() {
}
public String getKey_by() {
return key_by;
}
public void setKey_by(String key_by) {
this.key_by = key_by;
}
public int getIfError() {
return ifError;
}
public void setIfError(int ifError) {
this.ifError = ifError;
}
public String getCommon_client_ip() {
return common_client_ip;
}
public void setCommon_client_ip(String common_client_ip) {
this.common_client_ip = common_client_ip;
}
public String getCommon_app_label() {
return common_app_label;
}
public void setCommon_app_label(String common_app_label) {
this.common_app_label = common_app_label;
}
public long getCommon_recv_time() {
return common_recv_time;
}
public void setCommon_recv_time(long common_recv_time) {
this.common_recv_time = common_recv_time;
}
public String getCommon_schema_type() {
return common_schema_type;
}
public void setCommon_schema_type(String common_schema_type) {
this.common_schema_type = common_schema_type;
}
public String getCommon_server_ip() {
return common_server_ip;
}
public void setCommon_server_ip(String common_server_ip) {
this.common_server_ip = common_server_ip;
}
public String getHttp_host() {
return http_host;
}
public void setHttp_host(String http_host) {
this.http_host = http_host;
}
public String getHttp_domain() {
return http_domain;
}
public void setHttp_domain(String http_domain) {
this.http_domain = http_domain;
}
public long getCommon_vsys_id() {
return common_vsys_id;
}
public void setCommon_vsys_id(long common_vsys_id) {
this.common_vsys_id = common_vsys_id;
}
public String getCommon_device_group() {
return common_device_group;
}
public void setCommon_device_group(String common_device_group) {
this.common_device_group = common_device_group;
}
public String getCommon_data_center() {
return common_data_center;
}
public void setCommon_data_center(String common_data_center) {
this.common_data_center = common_data_center;
}
public String getCommon_l4_protocol() {
return common_l4_protocol;
}
public void setCommon_l4_protocol(String common_l4_protocol) {
this.common_l4_protocol = common_l4_protocol;
}
public String getCommon_internal_ip() {
return common_internal_ip;
}
public void setCommon_internal_ip(String common_internal_ip) {
this.common_internal_ip = common_internal_ip;
}
public String getCommon_external_ip() {
return common_external_ip;
}
public void setCommon_external_ip(String common_external_ip) {
this.common_external_ip = common_external_ip;
}
public String getCommon_subscriber_id() {
return common_subscriber_id;
}
public void setCommon_subscriber_id(String common_subscriber_id) {
this.common_subscriber_id = common_subscriber_id;
}
public long getCommon_sessions() {
return common_sessions;
}
public void setCommon_sessions(long common_sessions) {
this.common_sessions = common_sessions;
}
public long getCommon_c2s_pkt_num() {
return common_c2s_pkt_num;
}
public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) {
this.common_c2s_pkt_num = common_c2s_pkt_num;
}
public long getCommon_s2c_pkt_num() {
return common_s2c_pkt_num;
}
public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) {
this.common_s2c_pkt_num = common_s2c_pkt_num;
}
public long getCommon_c2s_byte_num() {
return common_c2s_byte_num;
}
public void setCommon_c2s_byte_num(long common_c2s_byte_num) {
this.common_c2s_byte_num = common_c2s_byte_num;
}
public long getCommon_s2c_byte_num() {
return common_s2c_byte_num;
}
public void setCommon_s2c_byte_num(long common_s2c_byte_num) {
this.common_s2c_byte_num = common_s2c_byte_num;
}
}

View File

@@ -0,0 +1,163 @@
package com.galaxy.tsg.pojo;
public class PacketResultEntity implements Comparable<PacketResultEntity>, Cloneable {
private String source ;
private Long vsys_id ;
private Long session_num ;
private Long c2s_pkt_num ;
private Long s2c_pkt_num;
private Long c2s_byte_num ;
private Long s2c_byte_num ;
private String order_by;
private String device_group;
private String data_center;
private Long stat_time;
private String destination ;
private String domain;
private String subscriber_id;
private String app_name;
public String getApp_name() {
return app_name;
}
public void setApp_name(String app_name) {
this.app_name = app_name;
}
public String getSubscriber_id() {
return subscriber_id;
}
public void setSubscriber_id(String subscriber_id) {
this.subscriber_id = subscriber_id;
}
public String getDomain() {
return domain;
}
public void setDomain(String domain) {
this.domain = domain;
}
public String getDestination() {
return destination;
}
public void setDestination(String destination) {
this.destination = destination;
}
public String getSource() {
return source;
}
public void setSource(String source) {
this.source = source;
}
public Long getVsys_id() {
return vsys_id;
}
public void setVsys_id(Long vsys_id) {
this.vsys_id = vsys_id;
}
public Long getSession_num() {
return session_num;
}
public void setSession_num(Long session_num) {
this.session_num = session_num;
}
public Long getC2s_pkt_num() {
return c2s_pkt_num;
}
public void setC2s_pkt_num(Long c2s_pkt_num) {
this.c2s_pkt_num = c2s_pkt_num;
}
public Long getS2c_pkt_num() {
return s2c_pkt_num;
}
public void setS2c_pkt_num(Long s2c_pkt_num) {
this.s2c_pkt_num = s2c_pkt_num;
}
public Long getC2s_byte_num() {
return c2s_byte_num;
}
public void setC2s_byte_num(Long c2s_byte_num) {
this.c2s_byte_num = c2s_byte_num;
}
public Long getS2c_byte_num() {
return s2c_byte_num;
}
public void setS2c_byte_num(Long s2c_byte_num) {
this.s2c_byte_num = s2c_byte_num;
}
public String getOrder_by() {
return order_by;
}
public void setOrder_by(String order_by) {
this.order_by = order_by;
}
public String getDevice_group() {
return device_group;
}
public void setDevice_group(String device_group) {
this.device_group = device_group;
}
public String getData_center() {
return data_center;
}
public void setData_center(String data_center) {
this.data_center = data_center;
}
public Long getStat_time() {
return stat_time;
}
public void setStat_time(Long stat_time) {
this.stat_time = stat_time;
}
@Override
public int compareTo(PacketResultEntity per) {
if(this.session_num>=per.session_num){
return 1 ;
}else{
return -1 ;
}
}
/* @Override public Object clone() {
PacketResultEntity obj;
try {
obj = (PacketResultEntity) super.clone();
} catch (CloneNotSupportedException e) {
obj = new PacketResultEntity(this.source,this.vsys_id,this.session_num,this.c2s_pkt_num,this.s2c_pkt_num,this.c2s_byte_num,
this.s2c_byte_num,this.order_by,this.device_group,this.data_center,this.stat_time);
}
return obj;
}
*/
}

View File

@@ -0,0 +1,63 @@
package com.galaxy.tsg.pojo;
public class ResultEntity {
private String order_by;
private Long stat_time;
private SessionResultEntity sessionResultEntity;
private PacketResultEntity packetResultEntity;
private ByteResultEntity byteResultEntity;
private TopUrlEntity topUrlEntity;
public String getOrder_by() {
return order_by;
}
public TopUrlEntity getTopUrlEntity() {
return topUrlEntity;
}
public void setTopUrlEntity(TopUrlEntity topUrlEntity) {
this.topUrlEntity = topUrlEntity;
}
public void setOrder_by(String order_by) {
this.order_by = order_by;
}
public Long getStat_time() {
return stat_time;
}
public void setStat_time(Long stat_time) {
this.stat_time = stat_time;
}
public SessionResultEntity getSessionResultEntity() {
return sessionResultEntity;
}
public void setSessionResultEntity(SessionResultEntity sessionResultEntity) {
this.sessionResultEntity = sessionResultEntity;
}
public PacketResultEntity getPacketResultEntity() {
return packetResultEntity;
}
public void setPacketResultEntity(PacketResultEntity packetResultEntity) {
this.packetResultEntity = packetResultEntity;
}
public ByteResultEntity getByteResultEntity() {
return byteResultEntity;
}
public void setByteResultEntity(ByteResultEntity byteResultEntity) {
this.byteResultEntity = byteResultEntity;
}
}

View File

@@ -0,0 +1,151 @@
package com.galaxy.tsg.pojo;
public class SessionResultEntity implements Comparable<SessionResultEntity> {
private String source ;
private Long vsys_id ;
private Long session_num ;
private Long c2s_pkt_num ;
private Long s2c_pkt_num;
private Long c2s_byte_num ;
private Long s2c_byte_num ;
private String order_by;
private String device_group;
private String data_center;
private Long stat_time;
private String destination ;
private String domain;
private String subscriber_id;
private String app_name;
public String getApp_name() {
return app_name;
}
public void setApp_name(String app_name) {
this.app_name = app_name;
}
public String getSubscriber_id() {
return subscriber_id;
}
public void setSubscriber_id(String subscriber_id) {
this.subscriber_id = subscriber_id;
}
public String getDomain() {
return domain;
}
public void setDomain(String domain) {
this.domain = domain;
}
public String getDestination() {
return destination;
}
public void setDestination(String destination) {
this.destination = destination;
}
public String getSource() {
return source;
}
public void setSource(String source) {
this.source = source;
}
public Long getVsys_id() {
return vsys_id;
}
public void setVsys_id(Long vsys_id) {
this.vsys_id = vsys_id;
}
public Long getSession_num() {
return session_num;
}
public void setSession_num(Long session_num) {
this.session_num = session_num;
}
public Long getC2s_pkt_num() {
return c2s_pkt_num;
}
public void setC2s_pkt_num(Long c2s_pkt_num) {
this.c2s_pkt_num = c2s_pkt_num;
}
public Long getS2c_pkt_num() {
return s2c_pkt_num;
}
public void setS2c_pkt_num(Long s2c_pkt_num) {
this.s2c_pkt_num = s2c_pkt_num;
}
public Long getC2s_byte_num() {
return c2s_byte_num;
}
public void setC2s_byte_num(Long c2s_byte_num) {
this.c2s_byte_num = c2s_byte_num;
}
public Long getS2c_byte_num() {
return s2c_byte_num;
}
public void setS2c_byte_num(Long s2c_byte_num) {
this.s2c_byte_num = s2c_byte_num;
}
public String getOrder_by() {
return order_by;
}
public void setOrder_by(String order_by) {
this.order_by = order_by;
}
public String getDevice_group() {
return device_group;
}
public void setDevice_group(String device_group) {
this.device_group = device_group;
}
public String getData_center() {
return data_center;
}
public void setData_center(String data_center) {
this.data_center = data_center;
}
public Long getStat_time() {
return stat_time;
}
public void setStat_time(Long stat_time) {
this.stat_time = stat_time;
}
@Override
public int compareTo(SessionResultEntity per) {
if(this.session_num>=per.session_num){
return 1 ;
}else{
return -1 ;
}
}
}

View File

@@ -0,0 +1,54 @@
package com.galaxy.tsg.pojo;
public class TopUrlEntity implements Comparable<TopUrlEntity>{
public long stat_time ;
public String url ;
public long vsys_id ;
public long session_num;
public long getStat_time() {
return stat_time;
}
public void setStat_time(long stat_time) {
this.stat_time = stat_time;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public long getVsys_id() {
return vsys_id;
}
public void setVsys_id(long vsys_id) {
this.vsys_id = vsys_id;
}
public long getSession_num() {
return session_num;
}
public void setSession_num(long session_num) {
this.session_num = session_num;
}
@Override
public int compareTo(TopUrlEntity per) {
if(this.session_num>=per.session_num){
return 1 ;
}else{
return -1 ;
}
}
}

View File

@@ -0,0 +1,55 @@
package com.galaxy.tsg.pojo;
public class UrlEntity {
public long common_recv_time ;
public String http_url ;
public long common_vsys_id ;
public long common_sessions;
public int ifError;
public int getIfError() {
return ifError;
}
public void setIfError(int ifError) {
this.ifError = ifError;
}
public long getCommon_recv_time() {
return common_recv_time;
}
public void setCommon_recv_time(long common_recv_time) {
this.common_recv_time = common_recv_time;
}
public String getHttp_url() {
return http_url;
}
public void setHttp_url(String http_url) {
this.http_url = http_url;
}
public long getCommon_vsys_id() {
return common_vsys_id;
}
public void setCommon_vsys_id(long common_vsys_id) {
this.common_vsys_id = common_vsys_id;
}
public long getCommon_sessions() {
return common_sessions;
}
public void setCommon_sessions(long common_sessions) {
this.common_sessions = common_sessions;
}
}

View File

@@ -0,0 +1,109 @@
package com.galaxy.tsg.util;
import com.galaxy.tsg.config.commonConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.config.SslConfigs;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
public class KafkaUtils {
public static Properties getKafkaSourceProperty() {
Properties properties = new Properties();
properties.setProperty("group.id", commonConfig.KAFKA_CONSUMER_GROUP_ID);
properties.setProperty("bootstrap.servers", commonConfig.KAFKA_CONSUMER_BROKER);
properties.setProperty("session.timeout.ms", commonConfig.KAFKA_CONSUMER_SESSION_TIMEOUT_MS);
properties.setProperty("max.poll.records", commonConfig.KAFKA_CONSUMER_MAX_POLL_RECORD);
properties.setProperty("max.partition.fetch.bytes", commonConfig.KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES);
switch (commonConfig.KAFKA_SECURITY) {
case 1:
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put("ssl.keystore.location", commonConfig.TOOLS_LIBRARY + "keystore.jks");
properties.put("ssl.keystore.password", commonConfig.KAFKA_PIN);
properties.put("ssl.truststore.location", commonConfig.TOOLS_LIBRARY + "truststore.jks");
properties.put("ssl.truststore.password", commonConfig.KAFKA_PIN);
properties.put("ssl.key.password", commonConfig.KAFKA_PIN);
break;
case 2:
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ commonConfig.KAFKA_USER + " password=" + commonConfig.KAFKA_PIN + ";");
break;
default:
}
return properties;
}
private static Properties getKafkaSinkProperty() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", commonConfig.KAFKA_PRODUCER_BROKER);
properties.put("acks", "1");
properties.put("retries", commonConfig.KAFKA_PRODUCER_RETRIES);
properties.put("linger.ms", commonConfig.KAFKA_PRODUCER_LINGER_MS);
properties.put("request.timeout.ms", commonConfig.KAFKA_PRODUCER_REQUEST_TIMEOUT_MS);
properties.put("batch.size", commonConfig.KAFKA_PRODUCER_BATCH_SIZE);
properties.put("buffer.memory", commonConfig.KAFKA_PRODUCER_BUFFER_MEMORY);
properties.put("max.request.size", commonConfig.KAFKA_PRODUCER_MAX_REQUEST_SIZE);
properties.put("compression.type", commonConfig.KAFKA_PRODUCER_COMPRESSION_TYPE);
switch (commonConfig.KAFKA_SECURITY) {
case 1:
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put("ssl.keystore.location", commonConfig.TOOLS_LIBRARY + "keystore.jks");
properties.put("ssl.keystore.password", commonConfig.KAFKA_PIN);
properties.put("ssl.truststore.location", commonConfig.TOOLS_LIBRARY + "truststore.jks");
properties.put("ssl.truststore.password", commonConfig.KAFKA_PIN);
properties.put("ssl.key.password", commonConfig.KAFKA_PIN);
break;
case 2:
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ commonConfig.KAFKA_USER + " password=" + commonConfig.KAFKA_PIN + ";");
break;
default:
}
return properties;
}
public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic) {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
new SimpleStringSchema(), getKafkaSourceProperty());
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
}
public static FlinkKafkaConsumer<String> getKafkaConsumerLists(List<String> topic) {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
new SimpleStringSchema(), getKafkaSourceProperty());
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
}
public static SinkFunction<String> getKafkaSink(String topic) {
return new FlinkKafkaProducer<String>(
topic,
new SimpleStringSchema(),
getKafkaSinkProperty(),
Optional.empty()
);
}
}