diff --git a/src/main/java/com/galaxy/tsg/Toptask.java b/src/main/java/com/galaxy/tsg/Toptask.java index 7329789..405895b 100644 --- a/src/main/java/com/galaxy/tsg/Toptask.java +++ b/src/main/java/com/galaxy/tsg/Toptask.java @@ -13,13 +13,11 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.streaming.api.datastream.AllWindowedStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,6 +114,9 @@ public class Toptask { .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); windoweddStream.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3); + + + //serverip聚合TOP SingleOutputStreamOperator serveripdStream = inputForSession.filter(new FilterFunction() { @@ -384,7 +385,131 @@ public class Toptask { break; + case 3: + SingleOutputStreamOperator clientipdStream3 = inputForSession.filter(new FilterFunction() { + @Override + public boolean filter(Entity value) throws Exception { + return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol()); + } + }).assignTimestampsAndWatermarks(strategyForSession); + + SingleOutputStreamOperator windowedStream3 = clientipdStream3.keyBy(new groupBySelector("common_client_ip")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .aggregate(new metricsAggregation(TOP_LIMIT), new metricsCalculateTest(TOP_LIMIT, "common_client_ip")).setParallelism(TASK_PARALLELISM); + DataStream windoweddStream3 = windowedStream3.keyBy(new oneKeySelector()) + .process(new TopNHotItems(TOP_LIMIT)).setParallelism(1); + windoweddStream3.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(1); + + + + SingleOutputStreamOperator serveripdStream3 = inputForSession.filter(new FilterFunction() { + @Override + public boolean filter(Entity value) throws Exception { + return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol()); + } + }).assignTimestampsAndWatermarks(strategyForSession); + + SingleOutputStreamOperator windowedStreamForServerIp3 = serveripdStream3.keyBy(new groupBySelector("common_server_ip")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .aggregate(new metricsAggregation(TOP_LIMIT), new metricsCalculateTest(TOP_LIMIT, "common_server_ip")).setParallelism(TASK_PARALLELISM); + DataStream windoweddStreamForServerIp3 = windowedStreamForServerIp3.keyBy(new oneKeySelector()) + .process(new TopNHotItems(TOP_LIMIT)).setParallelism(1); + windoweddStreamForServerIp3.addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(1); + + + //common_internal_ip聚合TOP + SingleOutputStreamOperator internalStream3 = inputForSession.filter(new FilterFunction() { + @Override + public boolean filter(Entity value) throws Exception { + return StringUtil.isNotEmpty(value.getCommon_internal_ip()); + } + }).assignTimestampsAndWatermarks(strategyForSession); + + SingleOutputStreamOperator windowedStreamForInternal3 = internalStream3.keyBy(new groupBySelector("common_internal_ip")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .aggregate(new metricsAggregation(TOP_LIMIT), new metricsCalculateTest(TOP_LIMIT, "common_internal_ip")).setParallelism(TASK_PARALLELISM); + DataStream WindoweddStreamForInternal3 = windowedStreamForInternal3.keyBy(new oneKeySelector()) + .process(new TopNHotItems(TOP_LIMIT)).setParallelism(1); + WindoweddStreamForInternal3.addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(1); + + //common_external_ip聚合TOP + + SingleOutputStreamOperator externalStream3 = inputForSession.filter(new FilterFunction() { + @Override + public boolean filter(Entity value) throws Exception { + return StringUtil.isNotEmpty(value.getCommon_external_ip()); + } + }).assignTimestampsAndWatermarks(strategyForSession); + + SingleOutputStreamOperator windowedStreamForExternal3= externalStream3.keyBy(new groupBySelector("common_external_ip")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .aggregate(new metricsAggregation(TOP_LIMIT), new metricsCalculateTest(TOP_LIMIT, "common_external_ip")).setParallelism(TASK_PARALLELISM); + DataStream WindoweddStreamForExternal3 = windowedStreamForExternal3.keyBy(new oneKeySelector()) + .process(new TopNHotItems(TOP_LIMIT)).setParallelism(1); + WindoweddStreamForExternal3.addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(1); + + //http_domain聚合TOP + + SingleOutputStreamOperator domainStream3 = inputForSession.filter(new FilterFunction() { + @Override + public boolean filter(Entity value) throws Exception { + return StringUtil.isNotEmpty(value.getHttp_domain()); + } + }).assignTimestampsAndWatermarks(strategyForSession); + + SingleOutputStreamOperator windowedStreamForDomain3 = domainStream3.keyBy(new groupBySelector("http_domain")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .aggregate(new metricsAggregation(TOP_LIMIT), new metricsCalculateTest(TOP_LIMIT, "http_domain")).setParallelism(TASK_PARALLELISM); + DataStream WindoweddStreamForDomain3 = windowedStreamForDomain3.keyBy(new oneKeySelector()) + .process(new TopNHotItems(TOP_LIMIT)).setParallelism(1); + WindoweddStreamForDomain3.addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(1); + + SingleOutputStreamOperator userStream3 = inputForSession.filter(new FilterFunction() { + @Override + public boolean filter(Entity value) throws Exception { + return StringUtil.isNotEmpty(value.getCommon_subscriber_id()); + } + }).assignTimestampsAndWatermarks(strategyForSession); + + //common_subscriber_id聚合TOP + SingleOutputStreamOperator windowedStreamForUser3 = userStream3.keyBy(new groupBySelector("common_subscriber_id")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .aggregate(new metricsAggregation(TOP_LIMIT), new metricsCalculateTest(TOP_LIMIT, "common_subscriber_id")).setParallelism(TASK_PARALLELISM); + DataStream WindoweddStreamForUser3 = windowedStreamForUser3.keyBy(new oneKeySelector()) + .process(new TopNHotItems(TOP_LIMIT)).setParallelism(1); + WindoweddStreamForUser3.addSink(getKafkaSink("TOP-USER")).setParallelism(1); + + SingleOutputStreamOperator appNameStream3 = inputForSession.filter(new FilterFunction() { + @Override + public boolean filter(Entity value) throws Exception { + return StringUtil.isNotEmpty(value.getCommon_app_label()); + } + }).assignTimestampsAndWatermarks(strategyForSession); + + + //common_app_label聚合求全量 + appNameStream3.keyBy(new groupBySelector("common_app_label")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .reduce(new metricsAggregationReduce(), new metricsCalculateForApp()).addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM); + + + SingleOutputStreamOperator UrlStream3 = inputForUrl.filter(new FilterFunction() { + @Override + public boolean filter(UrlEntity value) throws Exception { + return StringUtil.isNotEmpty(value.getHttp_url()); + } + }).assignTimestampsAndWatermarks(strategyForSecurity); + + //url聚合session求top + SingleOutputStreamOperator windowedStreamForUrl3 = UrlStream3.keyBy(new twoKeySelector()) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .reduce(new UrlAggregationReduce(), new metricsCalculateForUrl(URL_TOP_LIMIT)).setParallelism(TASK_PARALLELISM); + DataStream WindoweddStreamForUrl3 = windowedStreamForUrl3.keyBy(new oneKeySelector()) + .process(new TopNHotItemsForUrl(URL_TOP_LIMIT)).setParallelism(1); + WindoweddStreamForUrl3.addSink(getKafkaSink("TOP-URLS")).setParallelism(1); + + break; diff --git a/src/main/java/com/galaxy/tsg/function/TopNHotItems.java b/src/main/java/com/galaxy/tsg/function/TopNHotItems.java index 8a28b7e..9186636 100644 --- a/src/main/java/com/galaxy/tsg/function/TopNHotItems.java +++ b/src/main/java/com/galaxy/tsg/function/TopNHotItems.java @@ -87,6 +87,17 @@ public class TopNHotItems extends KeyedProcessFunction, ResultEn } break; default: + if (byteOrderEntity.size() < topSize) { + byteOrderEntity.add(objectEntity.getByteResultEntity()); + } else { + if (byteOrderEntity.peek() != null) { + ByteResultEntity res=byteOrderEntity.peek(); + if ((res.getC2s_byte_num()+res.getS2c_byte_num()) <= (objectEntity.getByteResultEntity().getS2c_byte_num()+objectEntity.getByteResultEntity().getC2s_byte_num())) { + byteOrderEntity.poll(); + byteOrderEntity.add(objectEntity.getByteResultEntity()); + } + } + } } //注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收集好了所有 windowEnd的商品数据 diff --git a/src/main/java/com/galaxy/tsg/pojo/Entity.java b/src/main/java/com/galaxy/tsg/pojo/Entity.java index be2b658..2d0b71f 100644 --- a/src/main/java/com/galaxy/tsg/pojo/Entity.java +++ b/src/main/java/com/galaxy/tsg/pojo/Entity.java @@ -27,9 +27,19 @@ public class Entity implements Serializable { public long common_s2c_byte_num ; public String key_by; + public int count; + public Entity() { } + public int getCount() { + return count; + } + + public void setCount(int count) { + this.count = count; + } + public String getKey_by() { return key_by; } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 13cda90..b27ad22 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -56,4 +56,4 @@ kafka.producer.compression.type=none kafka_producer_broker=192.168.44.12:9092 -tmp.test.type=2 \ No newline at end of file +tmp.test.type=3 \ No newline at end of file