21.12 update

zhanghongqing
2021-12-06 16:25:54 +08:00
parent 96164008dc
commit 351b5270f2
27 changed files with 17220 additions and 43 deletions

View File

@@ -1,42 +1 @@
#1. Base image; FROM must be the first instruction
#FROM alpine:latest
FROM alpine:3.15.0
#2. Image author and contact
MAINTAINER galaxy
#3. Working directory for all later instructions; it is created if it does not exist
WORKDIR /opt/jdk1.8.0_73
#4. Copy installation files into the image; syntax: ADD/COPY <src>... <dest>
## Difference between ADD and COPY: ADD copies and unpacks archives, COPY only copies
## Note: what gets uploaded here is the slimmed-down JRE
COPY jdk1.8.0_73 /opt/jdk1.8.0_73
## If downloading the glibc packages over the network is too slow, pre-download them and COPY them in instead
#COPY glibc-2.29-r0.apk /opt/jdk/
#COPY glibc-bin-2.29-r0.apk /opt/jdk/
#COPY glibc-i18n-2.29-r0.apk /opt/jdk/
#5. Switch the Alpine package mirror to Aliyun, because pulling from the default official mirror is too slow
#RUN echo http://mirrors.aliyun.com/alpine/v3.10/main/ > /etc/apk/repositories && \
#    echo http://mirrors.aliyun.com/alpine/v3.10/community/ >> /etc/apk/repositories
#RUN apk update && apk upgrade
#6. Run build commands
## Alpine Linux ships with very few packages to stay small; apk is the equivalent of Ubuntu's apt-get
## and installs common tools, e.g.: apk add bash wget curl git make vim docker
## wget is an ftp/http download tool; without it the build fails with "/bin/sh: wget: not found"
## ca-certificates is a prerequisite for installing glibc
## Since the COPY lines above are commented out, fetch the glibc apks here before installing them
RUN apk --no-cache add ca-certificates wget \
 && wget -q -O /etc/apk/keys/sgerrand.rsa.pub https://alpine-pkgs.sgerrand.com/sgerrand.rsa.pub \
 && wget -q https://github.com/sgerrand/alpine-pkg-glibc/releases/download/2.29-r0/glibc-2.29-r0.apk \
            https://github.com/sgerrand/alpine-pkg-glibc/releases/download/2.29-r0/glibc-bin-2.29-r0.apk \
            https://github.com/sgerrand/alpine-pkg-glibc/releases/download/2.29-r0/glibc-i18n-2.29-r0.apk \
 && apk add glibc-2.29-r0.apk glibc-bin-2.29-r0.apk glibc-i18n-2.29-r0.apk \
 && rm -rf /var/cache/apk/* glibc-2.29-r0.apk glibc-bin-2.29-r0.apk glibc-i18n-2.29-r0.apk
#7. Environment variables
## Note: there is no full JDK any more; JAVA_HOME points straight at the trimmed JRE
ENV JAVA_HOME=/opt/jdk1.8.0_73
ENV PATH=$JAVA_HOME/bin:$PATH
ENV CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
#Command executed when the container starts
#CMD ["java","-version"]
jdk1.8.0_202
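A quick way to sanity-check the image described above is a build-and-run pass; a minimal sketch, assuming the Dockerfile and the slimmed jdk1.8.0_73 directory sit in the build context (the image tag is illustrative):

# Build the image and confirm the bundled JRE is on PATH.
docker build -t alpine-jre8:3.15.0 .
docker run --rm alpine-jre8:3.15.0 java -version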

View File

@@ -52,7 +52,7 @@
"completionTimeout": "PT30M",
"earlyMessageRejectionPeriod": "PT6H",
"consumerProperties": {
"bootstrap.servers": "192.168.44.12:9094",
"bootstrap.servers": "kafkabootstrap",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_PLAINTEXT",
"sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"galaxy2019\";"

View File

@@ -0,0 +1,24 @@
#Kafka broker list
input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#input.kafka.servers=192.168.44.12:9094
#HBase ZooKeeper quorum, used to connect to HBase
hbase.zookeeper.servers=192.168.44.11,192.168.44.14,192.168.44.15
#hbase.zookeeper.servers=192.168.44.11:2181
#--------------------------------Kafka consumer group------------------------------#
#Kafka topic the job reads from
input.kafka.topic=RADIUS-RECORD
#Consumer group that stores this spout's offsets; name it per topology, since the stored offsets determine where the next run resumes without re-reading data
group.id=radius-flink-20211124
#--------------------------------Topology settings------------------------------#
#IP-to-account mapping table
hbase.framedip.table.name=tsg_galaxy:relation_framedip_account
#Geolocation library path
tools.library=/home/bigdata/topology/dat/
#Account-to-IP mapping table
hbase.account.table.name=tsg_galaxy:relation_account_framedip
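Before submitting the topology, it is worth checking that both mapping tables exist; a minimal sketch using the non-interactive HBase shell (assumes hbase is on PATH and reads the quorum from the cluster's hbase-site.xml):

# Both statements print true/false; shell -n exits non-zero on errors.
echo "exists 'tsg_galaxy:relation_framedip_account'
exists 'tsg_galaxy:relation_account_framedip'" | hbase shell -n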

View File

@@ -0,0 +1,24 @@
#Kafka broker list
input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#input.kafka.servers=192.168.44.12:9094
#HBase ZooKeeper quorum, used to connect to HBase
hbase.zookeeper.servers=192.168.44.11,192.168.44.14,192.168.44.15
#hbase.zookeeper.servers=192.168.44.11:2181
#--------------------------------Kafka consumer group------------------------------#
#Kafka topic the job reads from
input.kafka.topic=RADIUS-RECORD
#Consumer group that stores this spout's offsets; name it per topology, since the stored offsets determine where the next run resumes without re-reading data
group.id=radius-flink-20211124
#--------------------------------Topology settings------------------------------#
#IP-to-account mapping table
hbase.framedip.table.name=tsg_galaxy:relation_framedip_account
#Geolocation library path
tools.library=/home/bigdata/topology/dat/
#Account-to-IP mapping table
hbase.account.table.name=tsg_galaxy:relation_account_framedip

View File

@@ -0,0 +1,33 @@
#! /bin/bash
#Flink job start script
source /etc/profile
#Directory containing the job jar
BASE_DIR=`pwd`
#jar name
JAR_NAME='radius-relation-21-12-06.jar'
#cd $BASE_DIR
jar -xvf $BASE_DIR/$JAR_NAME service_flow_config.properties
function read_dir(){
for file in `ls $1` #note: the backticks run the command and iterate over its output
do
    if [ -d $1"/"$file ] #note: the spaces inside the brackets are required, otherwise the test fails
    then
        read_dir $1"/"$file
    else
        num=`flink list | grep "$file" | wc -l`
        if [ $num -eq "0" ];then
            cat $1"/"$file > $BASE_DIR/service_flow_config.properties
            jar -uvf $BASE_DIR/$JAR_NAME service_flow_config.properties
            flink run -d -p 1 $JAR_NAME
        fi
    fi
done
}
if [ $# != 1 ];then
    echo "usage: ./startall.sh [Configuration path]"
    exit 1
fi
#The first argument is the configuration file directory
read_dir $1
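Example invocation, assuming every file under the given directory is one job's service_flow_config.properties named after its Flink job (paths are illustrative):

# Repackages the jar with each config and submits any job that is not already running.
cd /home/bigdata/topology        # directory holding radius-relation-21-12-06.jar (assumed)
./startall.sh ./conf
flink list                       # confirm the submitted jobs show up as RUNNING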

View File

@@ -0,0 +1,19 @@
#!/bin/bash
#Flink job stop script
source /etc/profile
function read_dir(){
for file in `ls $1` #note: the backticks run the command and iterate over its output
do
    if [ -d $1"/"$file ] #note: the spaces inside the brackets are required, otherwise the test fails
    then
        read_dir $1"/"$file
    else
        jobid=`flink list | grep "$file" | awk '{print $4}'`
        flink cancel $jobid
    fi
done
}
#The first argument is the configuration file directory
read_dir $1

View File

@@ -0,0 +1,5 @@
# New settings for the DoS job
#baseline TTL
hbase.baseline.ttl=30

View File

@@ -0,0 +1,415 @@
--source
CREATE TABLE session_record_completed_log(
common_schema_type VARCHAR,
common_recv_time BIGINT,
common_client_ip VARCHAR,
common_server_ip VARCHAR,
http_host VARCHAR,
http_domain VARCHAR,
common_l4_protocol VARCHAR,
common_internal_ip VARCHAR,
common_external_ip VARCHAR,
common_subscriber_id VARCHAR,
common_app_label VARCHAR,
common_sessions BIGINT,
common_c2s_pkt_num BIGINT,
common_s2c_pkt_num BIGINT,
common_c2s_byte_num BIGINT,
common_s2c_byte_num BIGINT,
common_processing_time BIGINT,
stat_time as TO_TIMESTAMP(FROM_UNIXTIME(common_recv_time)),
WATERMARK FOR stat_time AS stat_time - INTERVAL '1' MINUTE)
WITH(
'connector' = 'kafka',
'properties.group.id' = 'kafka-indexing-service',
'topic' = 'SESSION-RECORD-COMPLETED',
'properties.bootstrap.servers' = '192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094',
'properties.security.protocol'='SASL_PLAINTEXT',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'= 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="galaxy2019";',
'scan.startup.mode' = 'latest-offset',
'sink.parallelism'='1',
--'sink.parallelism'='60',
'format' = 'json'
);
--client
CREATE TABLE top_client_ip_log(
source VARCHAR,
session_num BIGINT,
c2s_pkt_num BIGINT,
s2c_pkt_num BIGINT,
c2s_byte_num BIGINT,
s2c_byte_num BIGINT,
order_by VARCHAR,
stat_time BIGINT,
PRIMARY KEY (stat_time) NOT ENFORCED
)WITH(
'connector' = 'upsert-kafka',
'topic' = 'TOP-CLIENT-IP',
'properties.bootstrap.servers' = '192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094',
'properties.security.protocol'='SASL_PLAINTEXT',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'= 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="galaxy2019";',
--'sink.parallelism'='1',
'key.format' = 'json',
'value.format' = 'json'
);
CREATE VIEW top_client_ip_view as
SELECT common_client_ip as source,sum(common_sessions) as session_num,sum(common_c2s_pkt_num) as c2s_pkt_num,sum(common_s2c_pkt_num) as s2c_pkt_num,sum(common_c2s_byte_num) as c2s_byte_num,sum(common_s2c_byte_num) as s2c_byte_num,UNIX_TIMESTAMP(CAST(TUMBLE_END(stat_time,INTERVAL '5' MINUTE) as VARCHAR)) as stat_time
FROM session_record_completed_log
where common_l4_protocol = 'IPv6_TCP' or common_l4_protocol = 'IPv4_TCP'
group by common_client_ip,TUMBLE(stat_time,INTERVAL '5' MINUTE);
INSERT INTO top_client_ip_log
(SELECT `source`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`source`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'sessions' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY session_num DESC) as rownum
FROM
top_client_ip_view)
WHERE rownum <= 1000)
union all
(SELECT `source`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`source`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'packets' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY c2s_pkt_num+s2c_pkt_num DESC) as rownum
FROM
top_client_ip_view)
WHERE rownum <= 1000)
union all
(SELECT `source`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`source`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'bytes' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY c2s_byte_num+s2c_byte_num DESC) as rownum
FROM
top_client_ip_view)
WHERE rownum <= 1000);
--server:
CREATE TABLE top_server_ip_log(
destination VARCHAR,
session_num BIGINT,
c2s_pkt_num BIGINT,
s2c_pkt_num BIGINT,
c2s_byte_num BIGINT,
s2c_byte_num BIGINT,
order_by VARCHAR,
stat_time BIGINT,
PRIMARY KEY (stat_time) NOT ENFORCED
)WITH(
'connector' = 'upsert-kafka',
'topic' = 'TOP-SERVER-IP',
'properties.bootstrap.servers' = '192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094',
'properties.security.protocol'='SASL_PLAINTEXT',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'= 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="galaxy2019";',
--'sink.parallelism'='1',
'key.format' = 'json',
'value.format' = 'json'
);
CREATE VIEW top_server_ip_view as
SELECT common_server_ip as `destination`,sum(common_sessions) as session_num,sum(common_c2s_pkt_num) as c2s_pkt_num,sum(common_s2c_pkt_num) as s2c_pkt_num,sum(common_c2s_byte_num) as c2s_byte_num,sum(common_s2c_byte_num) as s2c_byte_num,UNIX_TIMESTAMP(CAST(TUMBLE_END(stat_time,INTERVAL '5' MINUTE) as VARCHAR)) as stat_time
FROM session_record_completed_log
where common_l4_protocol = 'IPv6_TCP' or common_l4_protocol = 'IPv4_TCP'
group by common_server_ip,TUMBLE(stat_time,INTERVAL '5' MINUTE);
INSERT INTO top_server_ip_log
(SELECT `destination`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`destination`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'sessions' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY session_num DESC) as rownum
FROM
top_server_ip_view)
WHERE rownum <= 1000)
union all
(SELECT `destination`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`destination`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'packets' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY c2s_pkt_num+s2c_pkt_num DESC) as rownum
FROM
top_server_ip_view)
WHERE rownum <= 1000)
union all
(SELECT destination, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
destination, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'bytes' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY c2s_byte_num+s2c_byte_num DESC) as rownum
FROM
top_server_ip_view)
WHERE rownum <= 1000);
--internal
CREATE TABLE top_internal_ip_log (
source VARCHAR,
session_num BIGINT,
c2s_pkt_num BIGINT,
s2c_pkt_num BIGINT,
c2s_byte_num BIGINT,
s2c_byte_num BIGINT,
order_by VARCHAR,
stat_time BIGINT,
PRIMARY KEY (stat_time) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'TOP-INTERNAL-HOST',
'properties.bootstrap.servers' = '192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094',
'properties.security.protocol'='SASL_PLAINTEXT',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'= 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="galaxy2019";',
--'sink.parallelism'='1',
'key.format' = 'json',
'value.format' = 'json'
);
CREATE VIEW top_common_internal_ip_view as
SELECT common_internal_ip as `source`,sum(common_sessions) as session_num,sum(common_c2s_pkt_num) as c2s_pkt_num,sum(common_s2c_pkt_num) as s2c_pkt_num,sum(common_c2s_byte_num) as c2s_byte_num,sum(common_s2c_byte_num) as s2c_byte_num,UNIX_TIMESTAMP(CAST(TUMBLE_END(stat_time,INTERVAL '5' MINUTE) as VARCHAR)) as stat_time
FROM session_record_completed_log
where common_internal_ip<>''
group by common_internal_ip,TUMBLE(stat_time,INTERVAL '5' MINUTE);
INSERT INTO top_internal_ip_log
(SELECT `source`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`source`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'sessions' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY session_num DESC) as rownum
FROM
top_common_internal_ip_view)
WHERE rownum <= 1000)
union all
(SELECT `source`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`source`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'packets' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY c2s_pkt_num+s2c_pkt_num DESC) as rownum
FROM
top_common_internal_ip_view)
WHERE rownum <= 1000)
union all
(SELECT `source`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`source`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'bytes' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY c2s_byte_num+s2c_byte_num DESC) as rownum
FROM
top_common_internal_ip_view)
WHERE rownum <= 1000);
--external:
CREATE TABLE top_external_ip_log (
destination VARCHAR,
session_num BIGINT,
c2s_pkt_num BIGINT,
s2c_pkt_num BIGINT,
c2s_byte_num BIGINT,
s2c_byte_num BIGINT,
order_by VARCHAR,
stat_time BIGINT,
PRIMARY KEY (stat_time) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'TOP-EXTERNAL-HOST',
'properties.bootstrap.servers' = '192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094',
'properties.security.protocol'='SASL_PLAINTEXT',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'= 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="galaxy2019";',
--'sink.parallelism'='1',
'key.format' = 'json',
'value.format' = 'json'
);
CREATE VIEW top_common_external_ip_view as
SELECT common_external_ip as `destination`,sum(common_sessions) as session_num,sum(common_c2s_pkt_num) as c2s_pkt_num,sum(common_s2c_pkt_num) as s2c_pkt_num,sum(common_c2s_byte_num) as c2s_byte_num,sum(common_s2c_byte_num) as s2c_byte_num,UNIX_TIMESTAMP(CAST(TUMBLE_END(stat_time,INTERVAL '5' MINUTE) as VARCHAR)) as stat_time
FROM session_record_completed_log
where common_external_ip<>''
group by common_external_ip,TUMBLE(stat_time,INTERVAL '5' MINUTE);
INSERT INTO top_external_ip_log
(SELECT `destination`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`destination`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'sessions' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY session_num DESC) as rownum
FROM
top_common_external_ip_view)
WHERE rownum <= 1000)
union all
(SELECT `destination`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`destination`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'packets' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY c2s_pkt_num+s2c_pkt_num DESC) as rownum
FROM
top_common_external_ip_view)
WHERE rownum <= 1000)
union all
(SELECT `destination`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`destination`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'bytes' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY c2s_byte_num+s2c_byte_num DESC) as rownum
FROM
top_common_external_ip_view)
WHERE rownum <= 1000);
--website_domain
CREATE TABLE top_website_domain_log (
domain VARCHAR,
session_num BIGINT,
c2s_pkt_num BIGINT,
s2c_pkt_num BIGINT,
c2s_byte_num BIGINT,
s2c_byte_num BIGINT,
order_by VARCHAR,
stat_time BIGINT,
PRIMARY KEY (stat_time) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'TOP-WEBSITE-DOMAIN',
'properties.bootstrap.servers' = '192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094',
'properties.security.protocol'='SASL_PLAINTEXT',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'= 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="galaxy2019";',
--'sink.parallelism'='1',
'key.format' = 'json',
'value.format' = 'json'
);
CREATE VIEW top_website_domain_view as
SELECT http_domain as `domain`,sum(common_sessions) as session_num,sum(common_c2s_pkt_num) as c2s_pkt_num,sum(common_s2c_pkt_num) as s2c_pkt_num,sum(common_c2s_byte_num) as c2s_byte_num,sum(common_s2c_byte_num) as s2c_byte_num,UNIX_TIMESTAMP(CAST(TUMBLE_END(stat_time,INTERVAL '5' MINUTE) as VARCHAR)) as stat_time
FROM session_record_completed_log
where http_domain<>''
group by http_domain,TUMBLE(stat_time,INTERVAL '5' MINUTE);
INSERT INTO top_website_domain_log
(SELECT `domain`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`domain`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'sessions' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY session_num DESC) as rownum
FROM
top_website_domain_view)
WHERE rownum <= 1000)
union all
(SELECT `domain`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`domain`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'packets' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY c2s_pkt_num+s2c_pkt_num DESC) as rownum
FROM
top_website_domain_view)
WHERE rownum <= 1000)
union all
(SELECT `domain`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`domain`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'bytes' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY c2s_byte_num+s2c_byte_num DESC) as rownum
FROM
top_website_domain_view)
WHERE rownum <= 1000);
--user:
CREATE TABLE top_user_log (
subscriber_id VARCHAR,
session_num BIGINT,
c2s_pkt_num BIGINT,
s2c_pkt_num BIGINT,
c2s_byte_num BIGINT,
s2c_byte_num BIGINT,
order_by VARCHAR,
stat_time BIGINT,
PRIMARY KEY (stat_time) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'TOP-USER',
'properties.bootstrap.servers' = '192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094',
'properties.security.protocol'='SASL_PLAINTEXT',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'= 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="galaxy2019";',
--'sink.parallelism'='1',
'key.format' = 'json',
'value.format' = 'json'
);
CREATE VIEW top_user_log_view as
SELECT common_subscriber_id as `subscriber_id`,sum(common_sessions) as session_num,sum(common_c2s_pkt_num) as c2s_pkt_num,sum(common_s2c_pkt_num) as s2c_pkt_num,sum(common_c2s_byte_num) as c2s_byte_num,sum(common_s2c_byte_num) as s2c_byte_num,UNIX_TIMESTAMP(CAST(TUMBLE_END(stat_time,INTERVAL '5' MINUTE) as VARCHAR)) as stat_time
FROM session_record_completed_log
where common_subscriber_id <>''
group by common_subscriber_id,TUMBLE(stat_time,INTERVAL '5' MINUTE);
INSERT INTO top_user_log
(SELECT `subscriber_id`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`subscriber_id`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'sessions' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY session_num DESC) as rownum
FROM
top_user_log_view)
WHERE rownum <= 1000)
union all
(SELECT `subscriber_id`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`subscriber_id`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'packets' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY c2s_pkt_num+s2c_pkt_num DESC) as rownum
FROM
top_user_log_view)
WHERE rownum <= 1000)
union all
(SELECT `subscriber_id`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,order_by,stat_time FROM
(SELECT
`subscriber_id`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,'bytes' as order_by,stat_time,
ROW_NUMBER() OVER (PARTITION BY stat_time ORDER BY c2s_byte_num+s2c_byte_num DESC) as rownum
FROM
top_user_log_view)
WHERE rownum <= 1000);
--app
CREATE TABLE top_app_log (
app_name VARCHAR,
session_num BIGINT,
c2s_pkt_num BIGINT,
s2c_pkt_num BIGINT,
c2s_byte_num BIGINT,
s2c_byte_num BIGINT,
stat_time BIGINT,
PRIMARY KEY (stat_time) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'TRAFFIC-APP-STAT',
'properties.bootstrap.servers' = '192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094',
--'properties.bootstrap.servers' = '10.111.136.193:9092,10.111.136.194:9092,10.111.136.195:9092,10.111.136.196:9092,10.111.136.197:9092,10.111.136.198:9092,10.111.136.199:9092,10.111.136.200:9092,10.111.136.201:9092,10.111.136.203:9092,10.111.136.204:9092,10.111.136.205:9092,10.111.136.206:9092,10.111.136.207:9092,10.111.136.202:9092',
'properties.security.protocol'='SASL_PLAINTEXT',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'= 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="galaxy2019";',
'key.format' = 'json',
'value.format' = 'json'
);
CREATE VIEW top_app_log_view as
SELECT common_app_label as `app_name`,sum(common_sessions) as session_num,sum(common_c2s_pkt_num) as c2s_pkt_num,sum(common_s2c_pkt_num) as s2c_pkt_num,sum(common_c2s_byte_num) as c2s_byte_num,sum(common_s2c_byte_num) as s2c_byte_num,UNIX_TIMESTAMP(CAST(TUMBLE_END(stat_time,INTERVAL '5' MINUTE) as VARCHAR)) as stat_time
FROM session_record_completed_log
where common_app_label<>''
group by common_app_label,TUMBLE(stat_time,INTERVAL '5' MINUTE);
INSERT INTO top_app_log
SELECT `app_name`, session_num, c2s_pkt_num,s2c_pkt_num,c2s_byte_num,s2c_byte_num,stat_time
FROM top_app_log_view;
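One way to submit the statements above, assuming they are saved as top_stats.sql and a Flink 1.13+ SQL Client is on PATH (the file name is illustrative):

# Executes the source table, the views, and the INSERT pipelines in order.
sql-client.sh -f top_stats.sql

Each INSERT launches its own Flink job; wrapping them in BEGIN STATEMENT SET; ... END; would submit them as a single job.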

View File

@@ -0,0 +1,38 @@
#Kafka broker address
source.kafka.broker=192.168.44.11:9094
source.kafka.group.id=vpn-1206-1
source.kafka.topic=SESSION-RECORD-COMPLETED
source.kafka.parallelism=1
max.poll.records=3000
session.timeout.ms=60000
max.partition.fetch.bytes=31457280
#ZooKeeper address for HBase
zk.host=192.168.44.11:2181
#Parallelism for writing to HBase
sink.hbase.parallelism=1
#HBase column family to write to
sink.hbase.fm=common
#HBase table to write to
sink.hbase.table=tsg_galaxy:recommendation_app_cip
#Job parallelism
task.parallelism=1
#Window watermark delay, in seconds
watermark.time=1
#Limit on top-N results
top.limit=10000
#Total sliding window length, in minutes
slidingwindow.time.minute=30
#Slide interval per pane, in minutes
slidingwindowslot.time.minute=1
#Whether Kafka security is enabled: 0 = off, 1 = SSL, 2 = SASL
kafka.security=2
#Username for Kafka SASL authentication
kafka.user=admin
#Password for Kafka SASL and SSL authentication
kafka.pin=galaxy2019
#Required for mode 1 (SSL)
tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
#Whether to accept all apps: false = no filtering, true = whitelist filtering
has.filter=false
#Only count common_app_label values hit by the filter, comma-separated (e.g. baidu.com,qq); may be left empty
app.white.list=
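To watch the job's consumer lag under the SASL settings above, a sketch reusing the same style of client config as the earlier console-consumer example (the file path is an assumption):

# Shows per-partition offsets and lag for the group configured in source.kafka.group.id.
kafka-consumer-groups.sh --bootstrap-server 192.168.44.11:9094 \
  --command-config /tmp/client-sasl.properties \
  --describe --group vpn-1206-1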

View File

@@ -0,0 +1,38 @@
#Kafka broker address
source.kafka.broker=192.168.44.11:9094
source.kafka.group.id=vpn-1206-1
source.kafka.topic=SESSION-RECORD-COMPLETED
source.kafka.parallelism=1
max.poll.records=3000
session.timeout.ms=60000
max.partition.fetch.bytes=31457280
#ZooKeeper address for HBase
zk.host=192.168.44.11:2181
#Parallelism for writing to HBase
sink.hbase.parallelism=1
#HBase column family to write to
sink.hbase.fm=common
#HBase table to write to
sink.hbase.table=tsg_galaxy:recommendation_app_cip
#Job parallelism
task.parallelism=1
#Window watermark delay, in seconds
watermark.time=1
#Limit on top-N results
top.limit=10000
#Total sliding window length, in minutes
slidingwindow.time.minute=30
#Slide interval per pane, in minutes
slidingwindowslot.time.minute=1
#Whether Kafka security is enabled: 0 = off, 1 = SSL, 2 = SASL
kafka.security=2
#Username for Kafka SASL authentication
kafka.user=admin
#Password for Kafka SASL and SSL authentication
kafka.pin=galaxy2019
#Required for mode 1 (SSL)
tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
#Whether to accept all apps: false = no filtering, true = whitelist filtering
has.filter=false
#Only count common_app_label values hit by the filter, comma-separated (e.g. baidu.com,qq); may be left empty
app.white.list=

View File

@@ -0,0 +1,33 @@
#! /bin/bash
#Flink job start script
source /etc/profile
#Directory containing the job jar
BASE_DIR=`pwd`
#jar name
JAR_NAME='flink-vpn-recommend-21-12-06.jar'
#cd $BASE_DIR
jar -xvf $BASE_DIR/$JAR_NAME common.properties
function read_dir(){
for file in `ls $1` #note: the backticks run the command and iterate over its output
do
    if [ -d $1"/"$file ] #note: the spaces inside the brackets are required, otherwise the test fails
    then
        read_dir $1"/"$file
    else
        num=`flink list | grep "$file" | wc -l`
        if [ $num -eq "0" ];then
            cat $1"/"$file > $BASE_DIR/common.properties
            jar -uvf $BASE_DIR/$JAR_NAME common.properties
            flink run -d -p 1 $JAR_NAME
        fi
    fi
done
}
if [ $# != 1 ];then
    echo "usage: ./startall.sh [Configuration path]"
    exit 1
fi
#The first argument is the configuration file directory
read_dir $1

View File

@@ -0,0 +1,19 @@
#!/bin/bash
#Flink job stop script
source /etc/profile
function read_dir(){
for file in `ls $1` #note: the backticks run the command and iterate over its output
do
    if [ -d $1"/"$file ] #note: the spaces inside the brackets are required, otherwise the test fails
    then
        read_dir $1"/"$file
    else
        jobid=`flink list | grep "$file" | awk '{print $4}'`
        flink cancel $jobid
    fi
done
}
#The first argument is the configuration file directory
read_dir $1

View File

@@ -0,0 +1,353 @@
{
"type": "record",
"name": "dos_event",
"namespace": "tsg_galaxy_v3",
"doc": {
"primary_key": "log_id",
"partition_key": "start_time",
"functions": {
"aggregation": [
{
"name": "COUNT",
"label": "COUNT",
"function": "count(expr)"
},
{
"name": "COUNT_DISTINCT",
"label": "COUNT_DISTINCT",
"function": "count(distinct expr)"
},
{
"name": "AVG",
"label": "AVG",
"function": "avg(expr)"
},
{
"name": "SUM",
"label": "SUM",
"function": "sum(expr)"
},
{
"name": "MAX",
"label": "MAX",
"function": "max(expr)"
},
{
"name": "MIN",
"label": "MIN",
"function": "min(expr)"
}
],
"operator": [
{
"name": "=",
"label": "=",
"function": "expr = value"
},
{
"name": "!=",
"label": "!=",
"function": "expr != value"
},
{
"name": ">",
"label": ">",
"function": "expr > value"
},
{
"name": "<",
"label": "<",
"function": "expr < value"
},
{
"name": ">=",
"label": ">=",
"function": "expr >= value"
},
{
"name": "<=",
"label": "<=",
"function": "expr <= value"
},
{
"name": "has",
"label": "HAS",
"function": "has(expr, value)"
},
{
"name": "in",
"label": "IN",
"function": "expr in (values)"
},
{
"name": "not in",
"label": "NOT IN",
"function": "expr not in (values)"
},
{
"name": "like",
"label": "LIKE",
"function": "expr like value"
},
{
"name": "not like",
"label": "NOT LIKE",
"function": "expr not like value"
},
{
"name": "notEmpty",
"label": "NOT EMPTY",
"function": "notEmpty(expr)"
},
{
"name": "empty",
"label": "EMPTY",
"function": "empty(expr)"
}
]
},
"schema_query": {
"references": {
"aggregation": [
{
"type": "int",
"functions": "COUNT,COUNT_DISTINCT,AVG,SUM,MAX,MIN"
},
{
"type": "long",
"functions": "COUNT,COUNT_DISTINCT,AVG,SUM,MAX,MIN"
},
{
"type": "float",
"functions": "COUNT,COUNT_DISTINCT,AVG,SUM,MAX,MIN"
},
{
"type": "double",
"functions": "COUNT,COUNT_DISTINCT,AVG,SUM,MAX,MIN"
},
{
"type": "string",
"functions": "COUNT,COUNT_DISTINCT"
},
{
"type": "date",
"functions": "COUNT,COUNT_DISTINCT,MAX,MIN"
},
{
"type": "timestamp",
"functions": "COUNT,COUNT_DISTINCT,MAX,MIN"
}
],
"operator": [
{
"type": "int",
"functions": "=,!=,>,<,>=,<=,in,not in"
},
{
"type": "long",
"functions": "=,!=,>,<,>=,<=,in,not in"
},
{
"type": "float",
"functions": "=,!=,>,<,>=,<="
},
{
"type": "double",
"functions": "=,!=,>,<,>=,<="
},
{
"type": "string",
"functions": "=,!=,in,not in,like,not like,notEmpty,empty"
},
{
"type": "date",
"functions": "=,!=,>,<,>=,<="
},
{
"type": "timestamp",
"functions": "=,!=,>,<,>=,<="
},
{
"type": "array",
"functions": "has"
}
]
}
},
"default_columns": [
"log_id",
"attack_type",
"source_ip_list",
"destination_ip",
"severity",
"start_time",
"end_time",
"packet_rate",
"bit_rate",
"session_rate"
],
"internal_columns": [
"common_recv_time",
"common_log_id",
"common_processing_time"
]
},
"fields": [
{
"name": "start_time",
"label": "Start Time",
"doc": {
"allow_query": "true",
"constraints": {
"type": "timestamp"
}
},
"type": "long"
},
{
"name": "end_time",
"label": "End Time",
"doc": {
"constraints": {
"type": "timestamp"
}
},
"type": "long"
},
{
"name": "log_id",
"label": "Log ID",
"doc": {
"allow_query": "true",
"format": {
"functions": "snowflake_id"
}
},
"type": "long"
},
{
"name": "attack_type",
"label": "Attack Type",
"doc": {
"allow_query": "true",
"constraints": {
"operator_functions": "=,!="
},
"data": [
{
"code": "TCP SYN Flood",
"value": "TCP SYN Flood"
},
{
"code": "UDP Flood",
"value": "UDP Flood"
},
{
"code": "ICMP Flood",
"value": "ICMP Flood"
},
{
"code": "DNS Flood",
"value": "DNS Flood"
},
{
"code": "DNS Amplification",
"value": "DNS Amplification"
}
]
},
"type": "string"
},
{
"name": "severity",
"label": "Severity",
"doc": {
"allow_query": "true",
"constraints": {
"operator_functions": "=,!="
},
"data": [
{
"code": "Critical",
"value": "Critical"
},
{
"code": "Severe",
"value": "Severe"
},
{
"code": "Major",
"value": "Major"
},
{
"code": "Warning",
"value": "Warning"
},
{
"code": "Minor",
"value": "Minor"
}
]
},
"type": "string"
},
{
"name": "conditions",
"label": "Conditions",
"type": "string"
},
{
"name": "destination_ip",
"label": "Destination IP",
"doc": {
"allow_query": "true"
},
"type": "string"
},
{
"name": "destination_country",
"label": "Destination Country",
"type": "string"
},
{
"name": "source_ip_list",
"label": "Source IPs",
"type": "string"
},
{
"name": "source_country_list",
"label": "Source Countries",
"type": "string"
},
{
"name": "session_rate",
"label": "Sessions/s",
"doc": {
"constraints": {
"type": "sessions/sec"
}
},
"type": "long"
},
{
"name": "packet_rate",
"label": "Packets/s",
"doc": {
"constraints": {
"type": "packets/sec"
}
},
"type": "long"
},
{
"name": "bit_rate",
"label": "Bits/s",
"doc": {
"constraints": {
"type": "bits/sec"
}
},
"type": "long"
}
]
}
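The doc block above is machine-readable, so a front end can derive which functions each column type supports; a jq sketch that lists the aggregation capabilities per type (the file name dos_event.json is an assumption):

# Prints one "type: functions" line per entry in schema_query.references.aggregation.
jq -r '.doc.schema_query.references.aggregation[] | "\(.type): \(.functions)"' dos_event.json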

View File

@@ -0,0 +1,116 @@
--Q01.CK DateTime
select toDateTime(common_recv_time) as common_recv_time from session_record where common_recv_time >= toDateTime(@start) and common_recv_time< toDateTime(@end) limit 20
--Q02.Standard DateTime
select FROM_UNIXTIME(common_recv_time) as common_recv_time from session_record where common_recv_time >= UNIX_TIMESTAMP(@start) and common_recv_time< UNIX_TIMESTAMP(@end) limit 20
--Q03.count(1)
select count(1) from session_record where common_recv_time >= toDateTime(@start) and common_recv_time< toDateTime(@end)
--Q04.count(*)
select count(*) from session_record where common_recv_time >= toDateTime(@start) and common_recv_time< toDateTime(@end)
--Q05.UDF APPROX_COUNT_DISTINCT_DS_HLL
SELECT policy_id, APPROX_COUNT_DISTINCT_DS_HLL(isp) as num FROM proxy_event_hits_log where __time >= @start and __time < @end and policy_id=0 group by policy_id
--Q06.UDF TIME_FLOOR_WITH_FILL
select TIME_FLOOR_WITH_FILL(common_recv_time,'PT5M','previous') as stat_time from session_record where common_recv_time > @start and common_recv_time < @end group by stat_time
--Q07.UDF GEO IP
select IP_TO_GEO(common_client_ip) as geo,IP_TO_CITY(common_server_ip) as city,IP_TO_COUNTRY(common_server_ip) as country from session_record limit 10
--Q08.Special characters
select * from session_record where (common_protocol_label ='/$' or common_client_ip like'%') limit 10
--Q09.Federation Query
select * from (select FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(common_recv_time,'PT5M','zero')) as stat_time from session_record where common_recv_time >= toDateTime(@start) and common_recv_time< toDateTime(@end) group by stat_time order by stat_time asc)
--Q10.Catalog Database
select * from tsg_galaxy_v3.session_record where common_recv_time >= toDateTime(@start) and common_recv_time< toDateTime(@end) limit 20
--Q11.Session Record Logs
select * from session_record where common_recv_time >= toDateTime(@start) and common_recv_time< toDateTime(@end) AND @common_filter order by common_recv_time desc limit 20
--Q12.Live Session Record Logs
select * from interim_session_record where common_recv_time >= toDateTime(@start) and common_recv_time< toDateTime(@end) AND @common_filter order by common_recv_time desc limit 20
--Q13.Transaction Record Logs
select * from transaction_record where common_recv_time >= toDateTime(@start) and common_recv_time< toDateTime(@end) order by common_recv_time desc limit 20
--Q14.Security Event Logs
select * from security_event where common_recv_time >= UNIX_TIMESTAMP(@start) and common_recv_time< UNIX_TIMESTAMP(@end) AND @common_filter order by common_recv_time desc limit 0,20
--Q15.Proxy Event Logs
select * from proxy_event where common_recv_time >= UNIX_TIMESTAMP(@start) and common_recv_time< UNIX_TIMESTAMP(@end) order by common_recv_time desc limit 0,20
--Q16.Radius Record Logs
select * from radius_record where common_recv_time >= UNIX_TIMESTAMP(@start) and common_recv_time< UNIX_TIMESTAMP(@end) order by common_recv_time desc limit 0,20
--Q17.GTPC Record Logs
select * from gtpc_record where common_recv_time >= UNIX_TIMESTAMP(@start) and common_recv_time< UNIX_TIMESTAMP(@end) order by common_recv_time desc limit 0,20
--Q18.Security Event Logs with fields
select FROM_UNIXTIME(common_recv_time) as common_recv_time,common_log_id,common_policy_id,common_subscriber_id,common_client_ip,common_client_port,common_l4_protocol,common_address_type,common_server_ip,common_server_port,common_action,common_direction,common_sled_ip,common_client_location,common_client_asn,common_server_location,common_server_asn,common_c2s_pkt_num,common_s2c_pkt_num,common_c2s_byte_num,common_s2c_byte_num,common_schema_type,common_sub_action,common_device_id, FROM_UNIXTIME(common_start_time) as common_start_time, FROM_UNIXTIME(common_end_time) as common_end_time,common_establish_latency_ms,common_con_duration_ms,common_stream_dir,common_stream_trace_id,http_url,http_host,http_domain,http_request_body,http_response_body,http_cookie,http_referer,http_user_agent,http_content_length,http_content_type,http_set_cookie,http_version,http_response_latency_ms,http_action_file_size,http_session_duration_ms,mail_protocol_type,mail_account,mail_from_cmd,mail_to_cmd,mail_from,mail_to,mail_cc,mail_bcc,mail_subject,mail_attachment_name,mail_eml_file,dns_message_id,dns_qr,dns_opcode,dns_aa,dns_tc,dns_rd,dns_ra,dns_rcode,dns_qdcount,dns_ancount,dns_nscount,dns_arcount,dns_qname,dns_qtype,dns_qclass,dns_cname,dns_sub,dns_rr,ssl_sni,ssl_san,ssl_cn,ssl_pinningst,ssl_intercept_state,ssl_server_side_latency,ssl_client_side_latency,ssl_server_side_version,ssl_client_side_version,ssl_cert_verify,ssl_error,quic_version,quic_sni,quic_user_agent,ftp_account,ftp_url,ftp_content from security_event where common_recv_time >= @start and common_recv_time < @end order by common_recv_time desc limit 10000
--Q19.Radius ON/OFF Logs For Frame IP
select framed_ip, arraySlice(groupUniqArray(concat(toString(event_timestamp),':', if(acct_status_type=1,'start','stop'))),1,100000) as timeseries from radius_onff_log where event_timestamp >=toDateTime(@start) and event_timestamp <toDateTime(@end) group by framed_ip limit 20
--Q20.Radius ON/OFF Logs For Account
select account, arraySlice(groupUniqArray(concat(toString(event_timestamp),':', if(acct_status_type=1,'start','stop'))),1,100000) as timeseries from radius_onff_log where event_timestamp >= @start and event_timestamp < @end group by account
--Q21.Radius ON/OFF Logs total Account number
select count(distinct(framed_ip)) as active_ip_num , sum(acct_session_time) as online_duration from (select any(framed_ip) as framed_ip ,max(acct_session_time) as acct_session_time from radius_onff_log where account='000jS' and event_timestamp >= @start and event_timestamp < @end group by acct_session_id)
--Q22.Radius ON/OFF Logs Account Access Detail
select max(if(acct_status_type=1,event_timestamp,0)) as start_time,max(if(acct_status_type=2,event_timestamp,0)) as end_time, any(framed_ip) as ip,max(acct_session_time) as online_duration from radius_onff_log where event_timestamp >= @start and event_timestamp < @end group by acct_session_id order by start_time desc limit 200
--Q23.Report for Client IP
select common_client_ip, count(*) as sessions from session_record where common_recv_time>= toStartOfDay(toDateTime(@start))-604800 and common_recv_time< toStartOfDay(toDateTime(@end)) group by common_client_ip order by sessions desc limit 0,100
--Q24.Report for Server IP
select common_server_ip, count(*) as sessions from session_record where common_recv_time>= toStartOfDay(toDateTime(@start))-604800 and common_recv_time< toStartOfDay(toDateTime(@start)) group by common_server_ip order by sessions desc limit 0,100
--Q25.Report for SSL SNI
select ssl_sni, count(*) as sessions from session_record where common_recv_time>= toStartOfDay(toDateTime(@start))-604800 and common_recv_time< toStartOfDay(toDateTime(@start)) group by ssl_sni order by sessions desc limit 0,100
--Q26.Report for SSL APP
select common_app_label as application, count(*) as sessions from session_record where common_recv_time>= toStartOfDay(toDateTime(@start))-604800 and common_recv_time< toStartOfDay(toDateTime(@start)) group by application order by sessions desc limit 0,100
--Q27.Report for Domains
select http_domain AS domain,SUM(coalesce(common_c2s_byte_num, 0)) AS sent_bytes,SUM(coalesce(common_s2c_byte_num, 0)) AS received_bytes,SUM(coalesce(common_c2s_byte_num, 0)+coalesce(common_s2c_byte_num, 0)) AS bytes FROM session_record WHERE common_recv_time >= toStartOfDay(toDateTime(@start))-86400 AND common_recv_time < toStartOfDay(toDateTime(@start)) and notEmpty(domain) GROUP BY domain ORDER BY bytes DESC LIMIT 100
--Q28.Report for Domains with unique Client IP
select toDateTime(intDiv(toUInt32(toDateTime(toDateTime(common_recv_time))), 300)*300) as stat_time, http_domain, uniq (common_client_ip) as nums from session_record where common_recv_time >= toStartOfDay(toDateTime(@start))-86400 AND common_recv_time < toStartOfDay(toDateTime(@start)) and http_domain in (select http_domain from session_record where common_recv_time >= toStartOfDay(toDateTime(@start))-86400 AND common_recv_time < toStartOfDay(toDateTime(@start)) and notEmpty(http_domain) group by http_domain order by SUM(coalesce(common_c2s_byte_num, 0)+coalesce(common_s2c_byte_num, 0)) desc limit 10 ) group by toDateTime(intDiv(toUInt32(toDateTime(toDateTime(common_recv_time))), 300)*300), http_domain order by stat_time asc limit 500
--Q29. Report for HTTP Host
SELECT http_host as host, SUM(coalesce(common_c2s_byte_num, 0)) AS sent_bytes,SUM(coalesce(common_s2c_byte_num, 0)) AS received_bytes,SUM(coalesce(common_c2s_byte_num, 0)+coalesce(common_s2c_byte_num, 0)) AS bytes FROM session_record WHERE common_recv_time>= toStartOfDay(toDateTime(@start))-604800 and common_recv_time< toStartOfDay(toDateTime(@start)) and notEmpty(http_host) GROUP BY host ORDER BY bytes DESC limit 100 union all SELECT 'totals' as host, SUM(coalesce(common_c2s_byte_num, 0)) AS sent_bytes, SUM(coalesce(common_s2c_byte_num, 0)) AS received_bytes, SUM(coalesce(common_c2s_byte_num, 0)+coalesce(common_s2c_byte_num, 0)) AS bytes from session_record where common_recv_time>= toStartOfDay(toDateTime(@start))-604800 and common_recv_time< toStartOfDay(toDateTime(@start)) and notEmpty(http_host)
--Q30.Report for HTTP/HTTPS URLS with Sessions
SELECT http_url AS url,count(*) AS sessions FROM proxy_event WHERE common_recv_time >= toStartOfDay(toDateTime(@start))-86400 AND common_recv_time < toStartOfDay(toDateTime(@start)) and notEmpty(http_url) GROUP BY url ORDER BY sessions DESC LIMIT 100
--Q31.Report for HTTP/HTTPS URLS with UNIQUE Client IP
select toDateTime(intDiv(toUInt32(toDateTime(toDateTime(common_recv_time))), 300)*300) as stat_time, http_url, count(distinct(common_client_ip)) as nums from proxy_event where common_recv_time >= toStartOfDay(toDateTime(@start))-86400 AND common_recv_time < toStartOfDay(toDateTime(@start)) and http_url IN (select http_url from proxy_event where common_recv_time >= toStartOfDay(toDateTime(@start))-86400 AND common_recv_time < toStartOfDay(toDateTime(@start)) and notEmpty(http_url) group by http_url order by count(*) desc limit 10 ) group by toDateTime(intDiv(toUInt32(toDateTime(toDateTime(common_recv_time))), 300)*300), http_url order by stat_time asc limit 500
--Q32.Report for Subscriber ID with Sessions
select common_subscriber_id as user, count(*) as sessions from session_record where common_recv_time>= toStartOfDay(toDateTime(@start))-604800 and common_recv_time< toStartOfDay(toDateTime(@start)) and notEmpty(user) group by common_subscriber_id order by sessions desc limit 0,100
--Q33.Report for Subscriber ID with Bandwidth
SELECT common_subscriber_id as user,SUM(coalesce(common_c2s_byte_num, 0)) AS sent_bytes,SUM(coalesce(common_s2c_byte_num, 0)) AS received_bytes,SUM(coalesce(common_c2s_byte_num, 0)+coalesce(common_s2c_byte_num, 0)) AS bytes FROM session_record WHERE common_recv_time>= toStartOfDay(toDateTime(@start))-604800 and common_recv_time< toStartOfDay(toDateTime(@start)) and notEmpty(user) GROUP BY user ORDER BY bytes DESC LIMIT 100
--Q34.Report Unique Endpoints
select uniq(common_client_ip) as "Client IP",uniq(common_server_ip) as "Server IP",uniq(common_internal_ip) as "Internal IP",uniq(common_external_ip) as "External IP",uniq(http_domain) as "Domain",uniq(ssl_sni) as "SNI" from session_record where common_recv_time>= toStartOfDay(toDateTime(@start))-604800 and common_recv_time< toStartOfDay(toDateTime(@start))
--Q35.TopN Optimizer
SELECT http_url AS url, SUM(common_sessions) AS sessions FROM session_record WHERE common_recv_time >= toDateTime(@start) AND common_recv_time < toDateTime(@end) AND notEmpty(http_url) GROUP BY http_url ORDER BY sessions DESC limit 10
--Q36.All Security Event Hits Trend by 5min B
select DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') as start_time, sum(hits) as hits from security_event_hits_log where __time >= @start and __time < @end group by DATE_FORMAT(FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300),'%Y-%m-%d %H:%i:%s') limit 10000
--Q37.Security Event Hit Time (first and last time) B
select policy_id, DATE_FORMAT(min(__time) ,'%Y-%m-%d %H:%i:%s') as first_used, DATE_FORMAT(max(__time) ,'%Y-%m-%d %H:%i:%s') as last_used from security_event_hits_log where policy_id in (0) group by policy_id
--Q38.All Proxy Event Hits Trend by 5min B
select FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300) as start_time, sum(hits) as hits from proxy_event_hits_log where __time >= @start and __time < @end group by FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(__time)/300)*300) limit 10000
--Q39.Proxy Event Hit Time (first and last time) B
select policy_id, DATE_FORMAT(min(__time) ,'%Y-%m-%d %H:%i:%s') as first_used, DATE_FORMAT(max(__time) ,'%Y-%m-%d %H:%i:%s') as last_used from proxy_event_hits_log where policy_id in (0) group by policy_id
--Q40.Traffic Composition Protocol Tree Trend
(SELECT TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, 'PT30S', 'zero')), 'yyyy-MM-dd HH:mm:ss') as stat_time, protocol_id as type, sum(c2s_byte_num + s2c_byte_num) as bytes from traffic_protocol_stat_log where __time >= TIMESTAMP @start AND __time < TIMESTAMP @end and protocol_id = 'ETHERNET' group by TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, 'PT30S', 'zero')), 'yyyy-MM-dd HH:mm:ss'), protocol_id order by stat_time asc) union all (SELECT TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, 'PT30S', 'zero')), 'yyyy-MM-dd HH:mm:ss') as stat_time, protocol_id as type, sum(c2s_byte_num + s2c_byte_num) as bytes from traffic_protocol_stat_log where __time >= TIMESTAMP @start AND __time < TIMESTAMP @end and protocol_id like CONCAT('ETHERNET','.%') and LENGTH(protocol_id) = LENGTH(REPLACE(protocol_id,'.','')) + 1 + 0 group by TIME_FORMAT(MILLIS_TO_TIMESTAMP( 1000 * TIME_FLOOR_WITH_FILL(TIMESTAMP_TO_MILLIS(__time)/1000, 'PT30S', 'zero')), 'yyyy-MM-dd HH:mm:ss'), protocol_id order by stat_time asc)
--Q41.Traffic Metrics Security Action Hits Trend
select FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(UNIX_TIMESTAMP(__time),'PT1800S','zero')) as statisticTime, sum(default_in_bytes + default_out_bytes) as default_bytes, sum(default_in_packets + default_out_packets) as default_packets, sum(default_conn_num) as default_sessions, sum(allow_in_bytes + allow_out_bytes) as allow_bytes, sum(allow_in_packets + allow_out_packets) as allow_packets, sum(allow_conn_num) as allow_sessions, sum(deny_in_bytes + deny_out_bytes) as deny_bytes, sum(deny_in_packets + deny_out_packets) as deny_packets, sum(deny_conn_num) as deny_sessions, sum(monitor_in_bytes + monitor_out_bytes) as monitor_bytes, sum(monitor_in_packets + monitor_out_packets) as monitor_packets, sum(monitor_conn_num) as monitor_sessions, sum(intercept_in_bytes + intercept_out_bytes) as intercept_bytes, sum(intercept_in_packets + intercept_out_packets) as intercept_packets, sum(intercept_conn_num) as intercept_sessions from traffic_metrics_log where __time >= @start and __time < @end group by FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(UNIX_TIMESTAMP(__time),'PT1800S','zero')) limit 100000
--Q42.Traffic Metrics Proxy Action Hits Trend
SELECT FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(UNIX_TIMESTAMP(__time),'PT1800S','zero')) AS statisticTime,SUM(intcp_allow_num) AS intercept_allow_conn_num,SUM(intcp_mon_num) AS intercept_monitor_conn_num,SUM(intcp_deny_num) AS intercept_deny_conn_num,SUM(intcp_rdirt_num) AS intercept_redirect_conn_num,SUM(intcp_repl_num) AS intercept_replace_conn_num,SUM(intcp_hijk_num) AS intercept_hijack_conn_num,SUM(intcp_ins_num) AS intercept_insert_conn_num FROM traffic_metrics_log WHERE __time >= @start AND __time < @end GROUP BY FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(UNIX_TIMESTAMP(__time), 'PT1800S', 'zero')) LIMIT 100000
--Q43.Traffic Statistics(Metrics02)
select FROM_UNIXTIME(stat_time) as max_active_date_by_sessions, total_live_sessions as max_live_sessions from ( select stat_time, sum(live_sessions) as total_live_sessions from ( select TIME_FLOOR_WITH_FILL(UNIX_TIMESTAMP(__time), 'P1D') as stat_time, device_id, avg(established_conn_num) as live_sessions from traffic_metrics_log where __time >= @start and __time<@end group by TIME_FLOOR_WITH_FILL(UNIX_TIMESTAMP(__time), 'P1D'), device_id) group by stat_time order by total_live_sessions desc limit 1 )
--Q44.Traffic Summary(Bandwidth Trend)
select * from ( select DATE_FORMAT(FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(UNIX_TIMESTAMP(__time),'PT1h','zero')),'%Y-%m-%d %H:%i:%s') as stat_time,'traffic_in_bytes' as type, sum(total_in_bytes) as bytes from traffic_metrics_log where __time >= @start and __time < @end group by DATE_FORMAT(FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(UNIX_TIMESTAMP(__time),'PT1h','zero')),'%Y-%m-%d %H:%i:%s'), 'traffic_in_bytes' union all select DATE_FORMAT(FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(UNIX_TIMESTAMP(__time),'PT1h','zero')),'%Y-%m-%d %H:%i:%s') as stat_time,'traffic_out_bytes' as type,sum(total_out_bytes) as bytes from traffic_metrics_log where __time >= @start and __time < @end group by DATE_FORMAT(FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(UNIX_TIMESTAMP(__time),'PT1h','zero')),'%Y-%m-%d %H:%i:%s'),'traffic_out_bytes' ) order by stat_time asc limit 100000
--Q45.Traffic Summary(Sessions Trend)
select DATE_FORMAT(FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(UNIX_TIMESTAMP(__time),'PT1h','zero')),'%Y-%m-%d %H:%i:%s') as stat_time, 'total_conn_num' as type, sum(new_conn_num) as sessions from traffic_metrics_log where __time >= @start and __time < @end group by DATE_FORMAT(FROM_UNIXTIME(TIME_FLOOR_WITH_FILL(UNIX_TIMESTAMP(__time),'PT1h','zero')),'%Y-%m-%d %H:%i:%s'), 'total_conn_num' order by stat_time asc limit 10000
--Q46.Domain Baidu.com Metrics
select FROM_UNIXTIME(min(common_recv_time)) as "First Seen" , FROM_UNIXTIME(max(common_recv_time)) as "Last Seen" , median(http_response_latency_ms) as "Server Processing Time Median(ms)", count(1) as Responses,any(common_server_location) as Location from session_record WHERE common_recv_time >= toDateTime(@start) AND common_recv_time < toDateTime(@end) AND http_domain='baidu.com'
--Q47.TIME_FLOOR_WITH_FILL 01
select "Device Group" as "Device Group" ,"Data Center" as "Data Center" ,FROM_UNIXTIME("End Time") as "End Time" , sum("counter") as "counter" from (select common_device_group as "Device Group" ,common_data_center as "Data Center" ,TIME_FLOOR_WITH_FILL (common_end_time,'PT1H','zero') as "End Time" ,count(common_log_id) as "counter" from session_record where common_recv_time >= toDateTime(@start) and common_recv_time< toDateTime(@end) group by "Device Group","Data Center","End Time") group by "Device Group" ,"Data Center" ,"End Time" order by "End Time" asc limit 5
--Q48.TIME_FLOOR_WITH_FILL 02
select FROM_UNIXTIME("End Time") as "End Time" , sum("counter") as "counter" from (select common_device_group as "Device Group" ,common_data_center as "Data Center" ,TIME_FLOOR_WITH_FILL (common_end_time,'PT1H','zero') as "End Time" ,count(common_log_id) as "counter" ,count(http_domain) as "HTTP.Domain" from security_event where ((common_recv_time >= toDateTime('2021-10-19 00:00:00') and common_recv_time < toDateTime('2021-10-20 00:00:00')) ) AND ( ( common_action = 2 ) ) group by "Device Group","Data Center","End Time") group by "End Time" order by "End Time" asc
--Q49.CONVERT_TZ (Druid) 01
SELECT CONVERT_TZ('2019-09-09 09:09:09','GMT','MET') as test_time from proxy_event_hits_log limit 1
--Q50.CONVERT_TZ (Druid) 02
SELECT CONVERT_TZ('2019-09-09 09:09:09','Europe/London','America/New_York') as test_time from proxy_event_hits_log limit 1
--Q51.CONVERT_TZ (Druid) 03
SELECT CONVERT_TZ(now(),'GMT','America/New_York') as test_time from proxy_event_hits_log limit 1
--Q53.CONVERT_TZ (clickhouse) 01
SELECT CONVERT_TZ('2019-09-09 09:09:09','GMT','MET') as test_time from session_record limit 1
--Q54.CONVERT_TZ (clickhouse) 02
SELECT CONVERT_TZ('2019-09-09 09:09:09','Europe/London','America/New_York') as test_time from session_record limit 1
--Q55.CONVERT_TZ (clickhouse) 03
SELECT CONVERT_TZ(now(),'GMT','America/New_York') as test_time from session_record limit 1
--Q57.CONVERT_TZ (hbase) 01
SELECT CONVERT_TZ('2019-09-09 09:09:09','GMT','MET') as test_time from report_result limit 1
--Q58.CONVERT_TZ (hbase) 02
SELECT CONVERT_TZ('2019-09-09 09:09:09','Europe/London','America/New_York') as test_time from report_result limit 1
--Q59.CONVERT_TZ (hbase) 03
SELECT CONVERT_TZ(now(),'GMT','America/New_York') as test_time from report_result limit 1
--Q61.CONVERT_TZ (elasticsearch)
SELECT CONVERT_TZ('2019-09-09 09:09:09','Europe/London','America/New_York')as time from report_result limit 1
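The @start/@end placeholders match the filter file that follows; a sketch that inlines them and pipes the script to clickhouse-client (the file name is an assumption; only the ClickHouse-dialect queries will parse there, and queries using @common_filter need a further substitution):

start="2021-10-19 10:00:00"; end="2021-10-20 11:00:00"
sed -e "s/@start/'$start'/g" -e "s/@end/'$end'/g" queries.sql \
  | clickhouse-client --multiline --multiquery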

File diff suppressed because it is too large

View File

@@ -0,0 +1,15 @@
{
"version": "1.0",
"name": "hbase-Raw",
"namespace": "tsg",
"filters": [
{
"name":"@start",
"value": "'2021-10-19 10:00:00'"
},
{
"name":"@end",
"value": "'2021-10-20 11:00:00'"
}
]
}

View File

@@ -0,0 +1,4 @@
--Q01.
SELECT last_update_time FROM relation_account_framedip WHERE last_update_time>=CAST(TO_TIMESTAMP (@start,'yyyy-MM-dd HH:mm:ss','Asia/Shanghai') AS UNSIGNED_LONG) AND last_update_time<CAST(TO_TIMESTAMP (@end,'yyyy-MM-dd HH:mm:ss','Asia/Shanghai') AS UNSIGNED_LONG) LIMIT 30
--Q02. KV lookup
select * from relation_account_framedip where ROWKEY = '0a771a381088e7d72ded13e998c06cbe' limit 1
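These are Phoenix-dialect statements against HBase; one way to run them, after substituting @start/@end as above, is Phoenix's sqlline.py with the ZooKeeper quorum from the earlier configs (the script name is illustrative):

# Connects through ZooKeeper and executes the script non-interactively.
sqlline.py 192.168.44.11:2181 relation_queries.sql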

File diff suppressed because it is too large

View File

@@ -0,0 +1,89 @@
{
"metadata": [
{
"namespace": "tsg_galaxy_v3",
"group": "CLICKHOUSE_GROUP",
"tables": [
"radius_onff_log",
"session_record",
"session_record_common_client_ip",
"session_record_common_server_ip",
"session_record_http_domain",
"interim_session_record",
"transaction_record",
"radius_record",
"voip_record",
"gtpc_record",
"security_event",
"proxy_event",
"dos_event",
"active_defence_event",
"sys_packet_capture_event"
]
},
{
"namespace": "elasticsearch",
"group": "ELASTICSEARCH_GROUP",
"tables": [
]
},
{
"namespace": "system",
"group": "CLICKHOUSE_GROUP",
"tables": [
"query_log_cluster",
"tables_cluster",
"columns_cluster",
"disks_cluster",
"parts_cluster",
"processes",
"query_log"
]
},
{
"namespace": "druid",
"group": "DRUID_GROUP",
"tables": [
"top_internal_host_log",
"top_website_domain_log",
"proxy_event_hits_log",
"sys_storage_log",
"security_event_hits_log",
"traffic_protocol_stat_log",
"top_server_ip_log",
"traffic_summary_log",
"traffic_metrics_log",
"top_user_log",
"top_urls_log",
"top_client_ip_log",
"top_external_host_log",
"traffic_app_stat_log",
"traffic_top_destination_ip_metrics_log"
]
},
{
"namespace": "etl",
"group": "ETL_GROUP",
"tables": [
"liveChart_interim",
"liveChart_session"
]
},
{
"namespace":"tsg",
"group":"HBASE_GROUP",
"tables":[
"report_result"
]
},
{
"namespace": "tsg_galaxy",
"group": "HBASE_GROUP",
"tables": [
"relation_account_framedip",
"job_result",
"recommendation_app_cip"
]
}
]
}

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -0,0 +1,7 @@
{
"type": "record",
"name": "recommendation_app_cip",
"namespace": "tsg_galaxy",
"fields": [
]
}

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large