Initial k8s version of storm

This commit is contained in:
lixikang
2020-05-25 16:48:37 +08:00
parent 0d1396f415
commit fc431fa3bd
53 changed files with 2660 additions and 0 deletions

74
storm/docker-compose.yml Normal file
View File

@@ -0,0 +1,74 @@
version: "3"
services:
#Service name in docker-compose (the default container name is <folder>_<service>_<index>); if container_name is set below, that value takes precedence
nimbus:
#Image to use
{% if offline_install %}
image: {{ image_name }}:{{ image_tag_name }}
{% else %}
image: {{ docker_registry_image_and_tag }}
{% endif %}
container_name: {{ nimbus_container_name }}
command: /opt/apache-storm-1.0.2/start_storm.sh storm nimbus
restart: always
ports:
- 6627:6627
environment:
ZK_IPARR: {{ machine_host }}
NIMBUS_IP: {{ machine_host }}
ZK_PORTS: {{ zookeeper_port }}
SPORTS: 30
WORKER_MEM: 2048
volumes:
- "{{ volume_path }}/tsg3.0-volumes/storm/nimbus/data:/opt/storm"
- "{{ volume_path }}/tsg3.0-volumes/storm/nimbus/logs:/opt/apache-storm-1.0.2/logs"
- "/etc/localtime:/etc/localtime:ro"
- "/etc/timezone:/etc/timezone:ro"
network_mode: "host"
supervisor:
#Image to use
{% if offline_install %}
image: {{ image_name }}:{{ image_tag_name }}
{% else %}
image: {{ docker_registry_image_and_tag }}
{% endif %}
container_name: {{ supervisor_container_name }}
command: /opt/apache-storm-1.0.2/start_storm.sh storm supervisor
environment:
ZK_IPARR: {{ machine_host }}
NIMBUS_IP: {{ machine_host }}
ZK_PORTS: {{ zookeeper_port }}
SPORTS: 30
WORKER_MEM: 2048
depends_on:
- nimbus
restart: always
volumes:
- "{{ volume_path }}/tsg3.0-volumes/storm/supervisor/data:/opt/storm"
- "{{ volume_path }}/tsg3.0-volumes/storm/supervisor/logs:/opt/apache-storm-1.0.2/logs"
- "{{ volume_path }}/tsg3.0-volumes/topologylogs:/opt/topologylogs"
- "/etc/localtime:/etc/localtime:ro"
- "/etc/timezone:/etc/timezone:ro"
network_mode: "host"
ui:
#Image to use
{% if offline_install %}
image: {{ image_name }}:{{ image_tag_name }}
{% else %}
image: {{ docker_registry_image_and_tag }}
{% endif %}
container_name: {{ ui_container_name }}
command: /opt/apache-storm-1.0.2/start_storm.sh storm ui
ports:
- 8080:8080
environment:
ZK_IPARR: {{ machine_host }}
NIMBUS_IP: {{ machine_host }}
ZK_PORTS: {{ zookeeper_port }}
SPORTS: 30
WORKER_MEM: 2048
depends_on:
- nimbus
network_mode: "host"
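As a reference for how this template is typically used: once the Jinja2 variables (image_name/image_tag_name or docker_registry_image_and_tag, the *_container_name values, machine_host, zookeeper_port, volume_path) have been rendered, the stack can be brought up with docker-compose. A minimal sketch, assuming the rendered file lives at /opt/storm/docker-compose.yml (the path is an assumption):
# bring up nimbus, supervisor and ui from the rendered compose file (assumed path)
docker-compose -f /opt/storm/docker-compose.yml up -d
# verify all three containers are running
docker-compose -f /opt/storm/docker-compose.yml ps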

14
storm/nimbus-svc.yaml Normal file
View File

@@ -0,0 +1,14 @@
apiVersion: v1
kind: Service
metadata:
name: nimbus-svc
labels:
app: nimbus
spec:
ports:
- port: 6627
targetPort: 6627
name: server
protocol: TCP
nodePort: 6627
  type: NodePort
  selector:
    app: nimbus

121
storm/nimbus.yaml Normal file
View File

@@ -0,0 +1,121 @@
apiVersion: v1
kind: Service
metadata:
name: nimbus-svc
labels:
app: nimbus
spec:
ports:
- port: 6627
targetPort: 6627
name: server
protocol: TCP
nodePort: 6627
type: NodePort
selector:
app: nimbus
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: nimbus
spec:
serviceName: nimbus-svc
replicas: 1
selector:
matchLabels:
app: nimbus
template:
metadata:
labels:
app: nimbus
spec:
hostAliases:
- ip: "192.168.40.127"
hostnames:
- "bigdata-127"
- ip: "192.168.40.151"
hostnames:
- "bigdata-151"
- ip: "192.168.40.152"
hostnames:
- "bigdata-152"
- ip: "192.168.40.203"
hostnames:
- "slave1"
#Avoid packing too many of these services onto one node, which can fail due to port conflicts or resource exhaustion
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- nimbus
topologyKey: "kubernetes.io/hostname"
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
podAffinityTerm:
labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- nimbus
topologyKey: "kubernetes.io/hostname"
terminationGracePeriodSeconds: 300
#Container configuration
containers:
- name: k8s-nimbus
#0 means the container runs as root
securityContext:
runAsUser: 0
imagePullPolicy: Always
image: 192.168.40.153:9080/tsg/storm:1.0.2
resources:
requests:
memory: "10Gi"
cpu: 500m
#Ports exposed by the container
ports:
- containerPort: 6627
name: server
#Command executed when the container starts
command:
- sh
- -c
- "/opt/apache-storm-1.0.2/start_storm.sh storm nimbus"
#Environment variables
env:
- name: ZK_PORTS
value: "2182"
- name: NIMBUS_IP
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: ZK_IPARR
value: "zk-0.zk-hs.default.svc.cluster.local"
- name: SPORTS
value: "30"
- name: WORKER_MEM
value: "2048"
#Mounted directories
volumeMounts:
- name: datadir
mountPath: /opt/test
securityContext:
runAsUser: 1000
fsGroup: 1000
#PVC declaration
volumeClaimTemplates:
- metadata:
name: datadir
spec:
accessModes: [ "ReadWriteMany" ]
storageClassName: nfs
resources:
requests:
storage: 5Gi
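Note that this Service (and the storm-ui, zk-cs and kafka Services later in the commit) pins nodePort values such as 6627, 8080, 2182 and 9093, which are below the default NodePort range of 30000-32767. A sketch of the apiserver setting this assumes; how the flag is passed depends on how the cluster was installed:
# assumed kube-apiserver flag that makes low nodePorts (6627, 8080, 2182, 9093) valid
kube-apiserver ... --service-node-port-range=1-32767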

89
storm/pv.yaml Normal file
View File

@@ -0,0 +1,89 @@
apiVersion: v1
kind: PersistentVolume
metadata:
name: nfspv7
spec:
capacity:
storage: 10Gi
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Retain
storageClassName: nfs
nfs:
path: /nfs/storage7
server: 192.168.40.127
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: nfspv8
spec:
capacity:
storage: 10Gi
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Retain
storageClassName: nfs
nfs:
path: /nfs/storage8
server: 192.168.40.127
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: nfspv9
spec:
capacity:
storage: 10Gi
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Retain
storageClassName: nfs
nfs:
path: /nfs/storage9
server: 192.168.40.127
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: nfspv10
spec:
capacity:
storage: 10Gi
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Retain
storageClassName: nfs
nfs:
path: /nfs/storage10
server: 192.168.40.127
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: nfspv11
spec:
capacity:
storage: 10Gi
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Retain
storageClassName: nfs
nfs:
path: /nfs/storage11
server: 192.168.40.127
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: nfspv12
spec:
capacity:
storage: 10Gi
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Retain
storageClassName: nfs
nfs:
path: /nfs/storage12
server: 192.168.40.127
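These PersistentVolumes assume the NFS server on 192.168.40.127 already exports /nfs/storage7 through /nfs/storage12. A minimal sketch of preparing those exports on that host; the export options are assumptions, only the paths and server address come from the manifests above:
# on 192.168.40.127 (export options are assumed)
mkdir -p /nfs/storage{7..12}
for d in /nfs/storage{7..12}; do echo "$d *(rw,sync,no_root_squash)" >> /etc/exports; done
exportfs -ra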

16
storm/storm-ui-svc.yaml Normal file
View File

@@ -0,0 +1,16 @@
apiVersion: v1
kind: Service
metadata:
name: storm-ui-svc
labels:
app: storm-ui
spec:
ports:
- port: 8080
targetPort: 8080
name: client
protocol: TCP
nodePort: 8080
type: NodePort
selector:
app: storm-ui

107
storm/storm-ui.yaml Normal file
View File

@@ -0,0 +1,107 @@
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: storm-ui
labels:
app: storm-ui
spec:
serviceName: storm-ui-svc
replicas: 1
selector:
matchLabels:
app: storm-ui
template:
metadata:
labels:
app: storm-ui
spec:
hostAliases:
- ip: "192.168.40.127"
hostnames:
- "bigdata-127"
- ip: "192.168.40.151"
hostnames:
- "bigdata-151"
- ip: "192.168.40.152"
hostnames:
- "bigdata-152"
#Avoid packing too many of these services onto one node, which can fail due to port conflicts or resource exhaustion
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- storm-ui
topologyKey: "kubernetes.io/hostname"
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
podAffinityTerm:
labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- storm-ui
topologyKey: "kubernetes.io/hostname"
terminationGracePeriodSeconds: 300
#Container configuration
containers:
- name: k8s-storm-ui
#0 means the container runs as root
securityContext:
runAsUser: 0
imagePullPolicy: Always
image: 192.168.40.153:9080/tsg/storm:1.0.2
resources:
requests:
memory: "10Gi"
cpu: 500m
#Ports exposed by the container
ports:
- containerPort: 8080
name: client
#Command executed when the container starts
command:
- sh
- -c
- "/opt/apache-storm-1.0.2/start_storm.sh storm ui"
#Environment variables
env:
- name: ZK_PORTS
value: "2182"
- name: NIMBUS_IP
value: "nimbus-0.nimbus-svc.default.svc.cluster.local"
# - name: NIMBUS_IP
# valueFrom:
# fieldRef:
# fieldPath: spec.nodeName
- name: ZK_IPARR
value: "zk-0.zk-hs.default.svc.cluster.local"
- name: SPORTS
value: "30"
- name: WORKER_MEM
value: "2048"
#Mounted directories
# volumeMounts:
# - name: datadir
# mountPath: /opt/apache-storm-1.0.2/logs
# securityContext:
# runAsUser: 1000
# fsGroup: 1000
#PVC declaration
# volumeClaimTemplates:
# - metadata:
# name: datadir
# spec:
# accessModes: [ "ReadWriteMany" ]
# storageClassName: nfs
# resources:
# requests:
# storage: 5Gi

13
storm/supervisor-svc.yaml Normal file
View File

@@ -0,0 +1,13 @@
apiVersion: v1
kind: Service
metadata:
name: supervisor-svc
labels:
app: supervisor
spec:
ports:
- port: 9097
name: server
clusterIP: None
selector:
app: supervisor

109
storm/supervisor.yaml Normal file
View File

@@ -0,0 +1,109 @@
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: supervisor
labels:
app: supervisor
spec:
serviceName: supervisor-svc
replicas: 3
selector:
matchLabels:
app: supervisor
template:
metadata:
labels:
app: supervisor
spec:
hostAliases:
- ip: "192.168.40.127"
hostnames:
- "bigdata-127"
- ip: "192.168.40.151"
hostnames:
- "bigdata-151"
- ip: "192.168.40.152"
hostnames:
- "bigdata-152"
- ip: "192.168.40.203"
hostnames:
- "slave1"
#Avoid packing too many of these services onto one node, which can fail due to port conflicts or resource exhaustion
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- supervisor
topologyKey: "kubernetes.io/hostname"
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
podAffinityTerm:
labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- supervisor
topologyKey: "kubernetes.io/hostname"
terminationGracePeriodSeconds: 300
#Container configuration
containers:
- name: k8s-supervisor
#0 means the container runs as root
securityContext:
runAsUser: 0
imagePullPolicy: Always
image: 192.168.40.153:9080/tsg/storm:1.0.2
resources:
requests:
memory: "10Gi"
cpu: 500m
#Ports exposed by the container
ports:
- containerPort: 9097
name: server
#Command executed when the container starts
command:
- sh
- -c
- "/opt/apache-storm-1.0.2/start_storm.sh storm supervisor"
#Environment variables
env:
- name: ZK_PORTS
value: "2182"
- name: NIMBUS_IP
value: "nimbus-0.nimbus-svc.default.svc.cluster.local"
# - name: NIMBUS_IP
# valueFrom:
# fieldRef:
# fieldPath: spec.nodeName
- name: ZK_IPARR
value: "zk-0.zk-hs.default.svc.cluster.local"
- name: SPORTS
value: "30"
- name: WORKER_MEM
value: "2048"
#Mounted directories
volumeMounts:
- name: datadir
mountPath: /opt/apache-storm-1.0.2/logs
securityContext:
runAsUser: 1000
fsGroup: 1000
#PVC declaration
volumeClaimTemplates:
- metadata:
name: datadir
spec:
accessModes: [ "ReadWriteMany" ]
storageClassName: nfs
resources:
requests:
storage: 5Gi

View File

@@ -0,0 +1,33 @@
#Kafka broker (bootstrap) address
bootstrap.servers=192.168.40.153:9092
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#HBase ZooKeeper address
hbase.zookeeper.servers=192.168.40.153:2182
#hbase table name
hbase.table.name=subscriber_info
#Tick tuple frequency (seconds)
topology.tick.tuple.freq.secs=50
topology.config.max.spout.pending=500000
topology.num.acks=0
#Kafka topic name
kafka.topic=RADIUS-RECORD-LOG
#Kafka consumer group id
group.id=account-to-hbase-a
#storm topology workers
topology.workers=1
#storm spout parallelism
spout.parallelism=3
#storm bolt parallelism
format.bolt.parallelism=3

View File

@@ -0,0 +1,33 @@
#Kafka broker (bootstrap) address
bootstrap.servers=ipaddress:9092
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#HBase ZooKeeper address
hbase.zookeeper.servers=ipaddress:2182
#hbase table name
hbase.table.name=subscriber_info
#Tick tuple frequency (seconds)
topology.tick.tuple.freq.secs=50
topology.config.max.spout.pending=500000
topology.num.acks=0
#Kafka topic name
kafka.topic=RADIUS-RECORD-LOG
#Kafka consumer group id
group.id=account-to-hbase-a
#storm topology workers
topology.workers=1
#storm spout parallelism
spout.parallelism=3
#storm bolt parallelism
format.bolt.parallelism=3

View File

@@ -0,0 +1,34 @@
#! /bin/bash
#Script to submit a Storm topology
#Directory containing the topology jar
BASE_DIR=$2
#jar name
JAR_NAME='log-address-hbase.jar'
#nimbus ip
LOCAL_IP=$3
cd $BASE_DIR
function read_dir(){
for file in `ls $1` #note: the backticks run a command and iterate over its output
do
if [ -d $1"/"$file ] #note: the spaces inside the brackets are required, otherwise this errors out
then
read_dir $1"/"$file
else
jar -xvf $BASE_DIR/$JAR_NAME address_routine.properties
cat $1"/"$file > $BASE_DIR/address_routine.properties
sed -i 's/ipaddress/'$LOCAL_IP'/' $BASE_DIR/address_routine.properties
jar -uvf $BASE_DIR/$JAR_NAME address_routine.properties
docker run -it --rm -v $BASE_DIR/$JAR_NAME:/$JAR_NAME --env ZK_IPARR=$LOCAL_IP --env NIMBUS_IP=$LOCAL_IP --env ZK_PORTS=2181 --net host storm:1.0.2 /opt/apache-storm-1.0.2/start_storm.sh storm jar /$JAR_NAME cn.ac.iie.topology.LogAddressRedisTopology ACCOUNT-HBASE remote
fi
done
}
#The first argument is the configuration directory
if [ $# != 3 ];then
echo "usage: ./start.sh [Configuration path] [Path of jar] [nimbus ip]"
exit 1
fi
read_dir $1

View File

@@ -0,0 +1,3 @@
#!/bin/bash
docker exec nimbus storm kill ACCOUNT-HBASE -w 1

View File

@@ -0,0 +1,75 @@
#Kafka broker (bootstrap) address
#bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
bootstrap.servers=ipaddress:9092
#ZooKeeper address
#zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
zookeeper.servers=ipaddress:2181
#HBase ZooKeeper address
#hbase.zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
hbase.zookeeper.servers=ipaddress:2182
#hbase tablename
hbase.table.name=subcriber_info
#latest/earliest
auto.offset.reset=latest
#Kafka topic name
kafka.topic=CONNECTION-RECORD-LOG
#Consumer group id for the input topic; offsets are stored under this id (one per topology), which determines where the next run resumes so records are not re-read
group.id=connection-log-191122
#Output topic
results.output.topic=CONNECTION-RECORD-COMPLETED-LOG
#storm topology workers
topology.workers=1
#Spout parallelism, recommended to match the number of Kafka partitions
spout.parallelism=1
#Parallelism of the enrichment (completion) bolt, a multiple of the worker count
datacenter.bolt.parallelism=1
#Parallelism of the Kafka writer bolt
kafka.bolt.parallelism=1
#Path to the IP geolocation library
ip.library=/dat/
#Kafka batch size (records)
batch.insert.num=2000
#Data center UID
data.center.id.num=12
#Tick tuple frequency (seconds)
topology.tick.tuple.freq.secs=5
#HBase refresh interval (seconds)
hbase.tick.tuple.freq.secs=60
#Caps spout intake when bolts are the bottleneck; in theory only effective when acking is enabled
topology.config.max.spout.pending=150000
#Ack setting: 1 enables acking, 0 disables it
topology.num.acks=0
#Spout receive sleep time
topology.spout.sleep.time=1
#IP prefixes used to filter/match subscriber usernames
check.ip.scope=10,100,192
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#InfluxDB address
influx.ip=http://192.168.40.151:8086
#InfluxDB username
influx.username=admin
#InfluxDB password
influx.password=admin

View File

@@ -0,0 +1,3 @@
#!/bin/bash
JAR_NAME='log-stream-completion.jar'
storm jar $JAR_NAME cn.ac.iie.topology.LogFlowWriteTopology $1 remote

View File

@@ -0,0 +1,34 @@
#! /bin/bash
#Script to submit a Storm topology
#Directory containing the topology jar
BASE_DIR=$2
#jar name
JAR_NAME='log-stream-completion.jar'
#nimbus ip
LOCAL_IP=$3
cd $BASE_DIR
function read_dir(){
for file in `ls $1` #note: the backticks run a command and iterate over its output
do
if [ -d $1"/"$file ] #note: the spaces inside the brackets are required, otherwise this errors out
then
read_dir $1"/"$file
else
jar -xvf $BASE_DIR/$JAR_NAME service_flow_config.properties
cat $1"/"$file > $BASE_DIR/service_flow_config.properties
sed -i 's/ipaddress/'$LOCAL_IP'/' $BASE_DIR/service_flow_config.properties
jar -uvf $BASE_DIR/$JAR_NAME service_flow_config.properties
kubectl exec -c k8s-nimbus nimbus-0 -n default -- storm jar /opt/test/topo/storm_topology/completion/alone/$JAR_NAME cn.ac.iie.topology.LogFlowWriteTopology TEST1 remote
fi
done
}
if [ $# != 3 ];then
echo "usage: ./startall.sh [Configuration path] [Path of jar] [nimbus ip]"
exit 1
fi
#The first argument is the configuration directory
read_dir $1

View File

@@ -0,0 +1,75 @@
#Kafka broker (bootstrap) address
#bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
bootstrap.servers=ipaddress:9092
#ZooKeeper address
#zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
zookeeper.servers=ipaddress:2181
#HBase ZooKeeper address
#hbase.zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
hbase.zookeeper.servers=ipaddress:2182
#hbase tablename
hbase.table.name=subcriber_info
#latest/earliest
auto.offset.reset=latest
#Kafka topic name
kafka.topic=CONNECTION-RECORD-LOG
#Consumer group id for the input topic; offsets are stored under this id (one per topology), which determines where the next run resumes so records are not re-read
group.id=connection-log-191122
#Output topic
results.output.topic=CONNECTION-RECORD-COMPLETED-LOG
#storm topology workers
topology.workers=1
#Spout parallelism, recommended to match the number of Kafka partitions
spout.parallelism=1
#Parallelism of the enrichment (completion) bolt, a multiple of the worker count
datacenter.bolt.parallelism=1
#Parallelism of the Kafka writer bolt
kafka.bolt.parallelism=1
#Path to the IP geolocation library
ip.library=/dat/
#Kafka batch size (records)
batch.insert.num=2000
#Data center UID
data.center.id.num=12
#Tick tuple frequency (seconds)
topology.tick.tuple.freq.secs=5
#HBase refresh interval (seconds)
hbase.tick.tuple.freq.secs=60
#Caps spout intake when bolts are the bottleneck; in theory only effective when acking is enabled
topology.config.max.spout.pending=150000
#Ack setting: 1 enables acking, 0 disables it
topology.num.acks=0
#Spout receive sleep time
topology.spout.sleep.time=1
#IP prefixes used to filter/match subscriber usernames
check.ip.scope=10,100,192
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#InfluxDB address
influx.ip=http://192.168.40.151:8086
#InfluxDB username
influx.username=admin
#InfluxDB password
influx.password=admin

View File

@@ -0,0 +1,75 @@
#Kafka broker (bootstrap) address
##bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
bootstrap.servers=ipaddress:9092
##ZooKeeper address
##zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
zookeeper.servers=ipaddress:2181
##HBase ZooKeeper address
##hbase.zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
hbase.zookeeper.servers=ipaddress:2182
#hbase tablename
hbase.table.name=subcriber_info
#latest/earliest
auto.offset.reset=latest
#Kafka topic name
kafka.topic=PROXY-EVENT-LOG
#Consumer group id for the input topic; offsets are stored under this id (one per topology), which determines where the next run resumes so records are not re-read
group.id=proxy-event-191122
#Output topic
results.output.topic=PROXY-EVENT-COMPLETED-LOG
#storm topology workers
topology.workers=1
#Spout parallelism, recommended to match the number of Kafka partitions
spout.parallelism=1
#Parallelism of the enrichment (completion) bolt, a multiple of the worker count
datacenter.bolt.parallelism=1
#Parallelism of the Kafka writer bolt
kafka.bolt.parallelism=1
#Path to the IP geolocation library
ip.library=/dat/
#Kafka batch size (records)
batch.insert.num=2000
#Data center UID
data.center.id.num=14
#Tick tuple frequency (seconds)
topology.tick.tuple.freq.secs=5
#HBase refresh interval (seconds)
hbase.tick.tuple.freq.secs=60
#Caps spout intake when bolts are the bottleneck; in theory only effective when acking is enabled
topology.config.max.spout.pending=150000
#Ack setting: 1 enables acking, 0 disables it
topology.num.acks=0
#Spout receive sleep time
topology.spout.sleep.time=1
#IP prefixes used to filter/match subscriber usernames
check.ip.scope=10,100,192
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#InfluxDB address
influx.ip=http://192.168.40.151:8086
#InfluxDB username
influx.username=admin
#InfluxDB password
influx.password=admin

View File

@@ -0,0 +1,75 @@
#Kafka broker (bootstrap) address
##bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
bootstrap.servers=ipaddress:9092
##ZooKeeper address
##zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
zookeeper.servers=ipaddress:2181
##HBase ZooKeeper address
##hbase.zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
hbase.zookeeper.servers=ipaddress:2182
#hbase tablename
hbase.table.name=subcriber_info
#latest/earliest
auto.offset.reset=latest
#Kafka topic name
kafka.topic=RADIUS-RECORD-LOG
#Consumer group id for the input topic; offsets are stored under this id (one per topology), which determines where the next run resumes so records are not re-read
group.id=radius-record-191122
#Output topic
results.output.topic=RADIUS-RECORD-COMPLETED-LOG
#storm topology workers
topology.workers=1
#Spout parallelism, recommended to match the number of Kafka partitions
spout.parallelism=1
#Parallelism of the enrichment (completion) bolt, a multiple of the worker count
datacenter.bolt.parallelism=1
#Parallelism of the Kafka writer bolt
kafka.bolt.parallelism=1
#Path to the IP geolocation library
ip.library=/dat/
#Kafka batch size (records)
batch.insert.num=2000
#Data center UID
data.center.id.num=13
#Tick tuple frequency (seconds)
topology.tick.tuple.freq.secs=5
#HBase refresh interval (seconds)
hbase.tick.tuple.freq.secs=60
#Caps spout intake when bolts are the bottleneck; in theory only effective when acking is enabled
topology.config.max.spout.pending=150000
#Ack setting: 1 enables acking, 0 disables it
topology.num.acks=0
#Spout receive sleep time
topology.spout.sleep.time=1
#IP prefixes used to filter/match subscriber usernames
check.ip.scope=10,100,192
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#InfluxDB address
influx.ip=http://192.168.40.151:8086
#InfluxDB username
influx.username=admin
#InfluxDB password
influx.password=admin

View File

@@ -0,0 +1,75 @@
#Kafka broker (bootstrap) address
##bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
bootstrap.servers=ipaddress:9092
##ZooKeeper address
##zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
zookeeper.servers=ipaddress:2181
##HBase ZooKeeper address
##hbase.zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
hbase.zookeeper.servers=ipaddress:2182
#hbase tablename
hbase.table.name=subcriber_info
#latest/earliest
auto.offset.reset=latest
#Kafka topic name
kafka.topic=SECURITY-EVENT-LOG
#Consumer group id for the input topic; offsets are stored under this id (one per topology), which determines where the next run resumes so records are not re-read
group.id=security-policy-191122
#Output topic
results.output.topic=SECURITY-EVENT-COMPLETED-LOG
#storm topology workers
topology.workers=1
#Spout parallelism, recommended to match the number of Kafka partitions
spout.parallelism=1
#Parallelism of the enrichment (completion) bolt, a multiple of the worker count
datacenter.bolt.parallelism=1
#Parallelism of the Kafka writer bolt
kafka.bolt.parallelism=1
#Path to the IP geolocation library
ip.library=/dat/
#Kafka batch size (records)
batch.insert.num=2000
#Data center UID
data.center.id.num=15
#Tick tuple frequency (seconds)
topology.tick.tuple.freq.secs=5
#HBase refresh interval (seconds)
hbase.tick.tuple.freq.secs=60
#Caps spout intake when bolts are the bottleneck; in theory only effective when acking is enabled
topology.config.max.spout.pending=150000
#Ack setting: 1 enables acking, 0 disables it
topology.num.acks=0
#Spout receive sleep time
topology.spout.sleep.time=1
#IP prefixes used to filter/match subscriber usernames
check.ip.scope=10,100,192
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#InfluxDB address
influx.ip=http://192.168.40.151:8086
#InfluxDB username
influx.username=admin
#InfluxDB password
influx.password=admin

View File

@@ -0,0 +1,34 @@
#! /bin/bash
#Script to submit a Storm topology
#Directory containing the topology jar
BASE_DIR=$2
#jar name
JAR_NAME='log-stream-completion.jar'
#nimbus ip
LOCAL_IP=$3
cd $BASE_DIR
function read_dir(){
for file in `ls $1` #note: the backticks run a command and iterate over its output
do
if [ -d $1"/"$file ] #note: the spaces inside the brackets are required, otherwise this errors out
then
read_dir $1"/"$file
else
jar -xvf $BASE_DIR/$JAR_NAME service_flow_config.properties
cat $1"/"$file > $BASE_DIR/service_flow_config.properties
sed -i 's/ipaddress/'$LOCAL_IP'/' $BASE_DIR/service_flow_config.properties
jar -uvf $BASE_DIR/$JAR_NAME service_flow_config.properties
docker run -it --rm -v $BASE_DIR/$JAR_NAME:/$JAR_NAME --env ZK_IPARR=$LOCAL_IP --env NIMBUS_IP=$LOCAL_IP --env ZK_PORTS=2182 --net host storm:1.0.2 /opt/apache-storm-1.0.2/start_storm.sh storm jar /$JAR_NAME cn.ac.iie.topology.LogFlowWriteTopology $file remote
fi
done
}
if [ $# != 3 ];then
echo "usage: ./startall.sh [Configuration path] [Path of jar] [nimbus ip]"
exit 1
fi
#The first argument is the configuration directory
read_dir $1

View File

@@ -0,0 +1,17 @@
#! /bin/bash
#Script to stop Storm topologies
function read_dir(){
for file in `ls $1` #note: the backticks run a command and iterate over its output
do
if [ -d $1"/"$file ] #note: the spaces inside the brackets are required, otherwise this errors out
then
read_dir $1"/"$file
else
docker exec nimbus storm kill $file -w 1
# /home/bigdata/apache-storm-1.0.2/bin/storm kill $file -w 1
echo $file #process the file here
fi
done
}
#The first argument is the configuration directory
read_dir $1

View File

@@ -0,0 +1,36 @@
#Kafka broker (bootstrap) address
bootstrap.servers=ipaddress:9092
#Kafka topic name
kafka.topic=RADIUS-RECORD-LOG
#Kafka consumer group id
group.id=radius-account-knowledge-191224
#Output Kafka servers
results.output.servers=ipaddress:9092
#Output topic
results.output.topic=RADIUS-ONFF-LOG
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#storm topology workers
topology.workers=1
#storm spout parallelism
spout.parallelism=3
#storm bolt parallelism
format.bolt.parallelism=3
#Tick tuple frequency (seconds)
topology.tick.tuple.freq.secs=5
topology.config.max.spout.pending=500000
topology.num.acks=0
#Kafka batch size (records)
batch.insert.num=2000

View File

@@ -0,0 +1,36 @@
#Kafka broker (bootstrap) address
bootstrap.servers=192.168.40.153:9092
#Kafka topic name
kafka.topic=RADIUS-RECORD-LOG
#Kafka consumer group id
group.id=radius-account-knowledge-191224
#Output Kafka servers
results.output.servers=192.168.40.153:9092
#Output topic
results.output.topic=RADIUS-ONFF-LOG
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#storm topology workers
topology.workers=1
#storm spout parallelism
spout.parallelism=3
#storm bolt parallelism
format.bolt.parallelism=3
#Tick tuple frequency (seconds)
topology.tick.tuple.freq.secs=5
topology.config.max.spout.pending=500000
topology.num.acks=0
#Kafka batch size (records)
batch.insert.num=2000

View File

@@ -0,0 +1,34 @@
#! /bin/bash
#Script to submit a Storm topology
#Directory containing the topology jar
BASE_DIR=$2
#jar name
JAR_NAME='radius-account-knowledge.jar'
#nimbus ip
LOCAL_IP=$3
cd $BASE_DIR
function read_dir(){
for file in `ls $1` #note: the backticks run a command and iterate over its output
do
if [ -d $1"/"$file ] #note: the spaces inside the brackets are required, otherwise this errors out
then
read_dir $1"/"$file
else
jar -xvf $BASE_DIR/$JAR_NAME knowledge_config.properties
cat $1"/"$file > $BASE_DIR/knowledge_config.properties
sed -i 's/ipaddress/'$LOCAL_IP'/' $BASE_DIR/knowledge_config.properties
jar -uvf $BASE_DIR/$JAR_NAME knowledge_config.properties
docker run -it --rm -v $BASE_DIR/$JAR_NAME:/$JAR_NAME --env ZK_IPARR=$LOCAL_IP --env NIMBUS_IP=$LOCAL_IP --env ZK_PORTS=2181 --net host storm:1.0.2 /opt/apache-storm-1.0.2/start_storm.sh storm jar /$JAR_NAME cn.ac.iie.topology.RadiusLogClearTopology $file remote
fi
done
}
#The first argument is the configuration directory
if [ $# != 3 ];then
echo "usage: ./start.sh [Configuration path] [Path of jar] [nimbus ip]"
exit 1
fi
read_dir $1

View File

@@ -0,0 +1,3 @@
#!/bin/bash
docker exec nimbus storm kill RADIUS-KNOWLEDGE -w 1

View File

@@ -0,0 +1,67 @@
#Kafka broker (bootstrap) address
bootstrap.servers=192.168.40.151:9092
#Kafka topic name
kafka.topic=SESSION-RECORD-COMPLETED-LOG
#Kafka consumer group id
group.id=website-top-program
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#auto.offset.reset=earliest
#Output Kafka servers
results.output.servers=192.168.40.151:9092
#Output topic
results.output.topics=TOP-WEBSITE-DOMAIN-LOG
#topology pending
topology.config.max.spout.pending=150000
#topology ack
topology.num.acks=1
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#Path to the IP geolocation library
ip.library=/dat/
#1: Internal hosts  2: External hosts
#3: Website domains  4: HTTP/HTTPS URLs  5: Active users
pattern.num=3
#storm topology workers
topology.workers=1
#storm topology rolling count window length in seconds
topology.top.window.length.secs=300
#storm topology rolling count emit Frequency in seconds
topology.top.emit.frequency.secs=60
#storm topology intermediate rank emit Frequency in seconds
topology.intermediate.emit.frequency.secs=70
#storm topology total rank emit Frequency in seconds
topology.taotal.emit.frequency.secs=300
#storm topology intermediate top N
topology.top.intermediate.n = 200
#storm topology total top N
topology.top.total.n = 20
#storm topology spout parallelism_hint, usually 1:1 with the number of Kafka partitions
topology.spout.parallelism=3
#storm bolt InternalBolt parallelism_hint
topology.bolt.check.parallelism=6
#storm bolt InternalCountBolt parallelism_hint
topology.bolt.count.parallelism=6
#storm bolt IntermediateRankingsBolt parallelism_hint
topology.bolt.interRanker.parallelism=6

Binary file not shown.

View File

@@ -0,0 +1,35 @@
#! /bin/bash
#Script to submit a Storm topology
#Directory containing the topology jar
BASE_DIR=$2
#jar name
JAR_NAME='log-stream-topn.jar'
#nimbus ip
LOCAL_IP=$3
cd $BASE_DIR
function read_dir(){
for file in `ls $1` #note: the backticks run a command and iterate over its output
do
if [ -d $1"/"$file ] #note: the spaces inside the brackets are required, otherwise this errors out
then
read_dir $1"/"$file
else
jar -xvf $BASE_DIR/$JAR_NAME kafka_topic.properties
cat $1"/"$file > $BASE_DIR/kafka_topic.properties
sed -i 's/ipaddress/'$LOCAL_IP'/' $BASE_DIR/kafka_topic.properties
jar -uvf $BASE_DIR/$JAR_NAME kafka_topic.properties
docker run -it --rm -v $BASE_DIR/$JAR_NAME:/$JAR_NAME --env ZK_IPARR=$LOCAL_IP --env NIMBUS_IP=$LOCAL_IP --env ZK_PORTS=2181 --net host storm:1.0.2 /opt/apache-storm-1.0.2/start_storm.sh storm jar /$JAR_NAME cn.ac.iie.topology.LogTopCountTopology $file remote
fi
done
}
if [ $# != 3 ];then
echo "usage: ./startall.sh [Configuration path] [Path of jar] [nimbus ip]"
exit 1
fi
#The first argument is the configuration directory
read_dir $1

View File

@@ -0,0 +1,16 @@
#! /bin/bash
#Script to stop Storm topologies
function read_dir(){
for file in `ls $1` #note: the backticks run a command and iterate over its output
do
if [ -d $1"/"$file ] #note: the spaces inside the brackets are required, otherwise this errors out
then
read_dir $1"/"$file
else
docker exec nimbus storm kill $file -w 1
echo $1"/"$file #process the file here
fi
done
}
#The first argument is the configuration directory
read_dir $1

View File

@@ -0,0 +1,72 @@
#Kafka broker (bootstrap) address
bootstrap.servers=ipaddress:9092
#Kafka topic name
kafka.topic=CONNECTION-RECORD-LOG
#Kafka consumer group id
group.id=external-bytes-top-191216
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#auto.offset.reset=earliest
#Output Kafka servers
results.output.servers=ipaddress:9092
#Output topic
results.output.topics=TOP-EXTERNAL-HOST-LOG
#topology pending
topology.config.max.spout.pending=150000
#topology ack
topology.num.acks=1
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#Path to the IP geolocation library
ip.library=/dat/
#1: Internal hosts  2: External hosts
#3: Website domains  4: Active users
pattern.num=2
# bytes,packets,sessions
dimension.type=bytes
#storm topology workers
topology.workers=1
#storm topology rolling count window length in seconds
topology.top.window.length.secs=300
#storm topology rolling count emit Frequency in seconds
topology.top.emit.frequency.secs=60
#storm topology intermediate rank emit Frequency in seconds
topology.intermediate.emit.frequency.secs=70
#storm topology total rank emit Frequency in seconds
topology.taotal.emit.frequency.secs=300
#storm topology intermediate top N
topology.top.intermediate.n = 200
#storm topology total top N
topology.top.total.n = 20
#storm topology spout parallelism_hint, usually 1:1 with the number of Kafka partitions
topology.spout.parallelism=1
#storm bolt InternalBolt parallelism_hint
topology.bolt.check.parallelism=1
#storm bolt InternalByteCountBolt parallelism_hint
topology.bolt.count.parallelism=1
#storm bolt IntermediateRankingsBolt parallelism_hint
topology.bolt.interRanker.parallelism=1

View File

@@ -0,0 +1,72 @@
#Kafka broker (bootstrap) address
bootstrap.servers=ipaddress:9092
#Kafka topic name
kafka.topic=CONNECTION-RECORD-LOG
#Kafka consumer group id
group.id=external-packets-top-191216
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#auto.offset.reset=earliest
#Output Kafka servers
results.output.servers=ipaddress:9092
#Output topic
results.output.topics=TOP-EXTERNAL-HOST-LOG
#topology pending
topology.config.max.spout.pending=150000
#topology ack
topology.num.acks=1
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#Path to the IP geolocation library
ip.library=/dat/
#1: Internal hosts  2: External hosts
#3: Website domains  4: Active users
pattern.num=2
# bytes,packets,sessions
dimension.type=packets
#storm topology workers
topology.workers=1
#storm topology rolling count window length in seconds
topology.top.window.length.secs=300
#storm topology rolling count emit Frequency in seconds
topology.top.emit.frequency.secs=60
#storm topology intermediate rank emit Frequency in seconds
topology.intermediate.emit.frequency.secs=70
#storm topology total rank emit Frequency in seconds
topology.taotal.emit.frequency.secs=300
#storm topology intermediate top N
topology.top.intermediate.n = 200
#storm topology total top N
topology.top.total.n = 20
#storm topology spout parallelism_hint, usually 1:1 with the number of Kafka partitions
topology.spout.parallelism=1
#storm bolt InternalBolt parallelism_hint
topology.bolt.check.parallelism=1
#storm bolt InternalByteCountBolt parallelism_hint
topology.bolt.count.parallelism=1
#storm bolt IntermediateRankingsBolt parallelism_hint
topology.bolt.interRanker.parallelism=1

View File

@@ -0,0 +1,72 @@
#Kafka broker (bootstrap) address
bootstrap.servers=ipaddress:9092
#Kafka topic name
kafka.topic=CONNECTION-RECORD-LOG
#Kafka consumer group id
group.id=external-sessions-top-191216
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#auto.offset.reset=earliest
#Output Kafka servers
results.output.servers=ipaddress:9092
#Output topic
results.output.topics=TOP-EXTERNAL-HOST-LOG
#topology pending
topology.config.max.spout.pending=150000
#topology ack
topology.num.acks=1
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#Path to the IP geolocation library
ip.library=/dat/
#1: Internal hosts  2: External hosts
#3: Website domains  4: Active users
pattern.num=2
# bytes,packets,sessions
dimension.type=sessions
#storm topology workers
topology.workers=1
#storm topology rolling count window length in seconds
topology.top.window.length.secs=300
#storm topology rolling count emit Frequency in seconds
topology.top.emit.frequency.secs=60
#storm topology intermediate rank emit Frequency in seconds
topology.intermediate.emit.frequency.secs=70
#storm topology total rank emit Frequency in seconds
topology.taotal.emit.frequency.secs=300
#storm topology intermediate top N
topology.top.intermediate.n = 200
#storm topology total top N
topology.top.total.n = 20
#storm topology spout parallelism_hint, usually 1:1 with the number of Kafka partitions
topology.spout.parallelism=1
#storm bolt InternalBolt parallelism_hint
topology.bolt.check.parallelism=1
#storm bolt InternalByteCountBolt parallelism_hint
topology.bolt.count.parallelism=1
#storm bolt IntermediateRankingsBolt parallelism_hint
topology.bolt.interRanker.parallelism=1

View File

@@ -0,0 +1,72 @@
#Kafka broker (bootstrap) address
bootstrap.servers=ipaddress:9092
#Kafka topic name
kafka.topic=CONNECTION-RECORD-LOG
#Kafka consumer group id
group.id=internal-bytes-top-191216
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#auto.offset.reset=earliest
#Output Kafka servers
results.output.servers=ipaddress:9092
#Output topic
results.output.topics=TOP-INTERNAL-HOST-LOG
#topology pending
topology.config.max.spout.pending=150000
#topology ack
topology.num.acks=1
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#Path to the IP geolocation library
ip.library=/dat/
#1: Internal hosts  2: External hosts
#3: Website domains  4: Active users
pattern.num=1
# bytes,packets,sessions
dimension.type=bytes
#storm topology workers
topology.workers=1
#storm topology rolling count window length in seconds
topology.top.window.length.secs=300
#storm topology rolling count emit Frequency in seconds
topology.top.emit.frequency.secs=60
#storm topology intermediate rank emit Frequency in seconds
topology.intermediate.emit.frequency.secs=70
#storm topology total rank emit Frequency in seconds
topology.taotal.emit.frequency.secs=300
#storm topology intermediate top N
topology.top.intermediate.n = 200
#storm topology total top N
topology.top.total.n = 20
#storm topology spout parallelism_hint, usually 1:1 with the number of Kafka partitions
topology.spout.parallelism=1
#storm bolt InternalBolt parallelism_hint
topology.bolt.check.parallelism=1
#storm bolt InternalByteCountBolt parallelism_hint
topology.bolt.count.parallelism=1
#storm bolt IntermediateRankingsBolt parallelism_hint
topology.bolt.interRanker.parallelism=1

View File

@@ -0,0 +1,72 @@
#Kafka broker (bootstrap) address
bootstrap.servers=ipaddress:9092
#Kafka topic name
kafka.topic=CONNECTION-RECORD-LOG
#Kafka consumer group id
group.id=internal-packets-top-191216
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#auto.offset.reset=earliest
#Output Kafka servers
results.output.servers=ipaddress:9092
#Output topic
results.output.topics=TOP-INTERNAL-HOST-LOG
#topology pending
topology.config.max.spout.pending=150000
#topology ack
topology.num.acks=1
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#Path to the IP geolocation library
ip.library=/dat/
#1: Internal hosts  2: External hosts
#3: Website domains  4: Active users
pattern.num=1
# bytes,packets,sessions
dimension.type=packets
#storm topology workers
topology.workers=1
#storm topology rolling count window length in seconds
topology.top.window.length.secs=300
#storm topology rolling count emit Frequency in seconds
topology.top.emit.frequency.secs=60
#storm topology intermediate rank emit Frequency in seconds
topology.intermediate.emit.frequency.secs=70
#storm topology total rank emit Frequency in seconds
topology.taotal.emit.frequency.secs=300
#storm topology intermediate top N
topology.top.intermediate.n = 200
#storm topology total top N
topology.top.total.n = 20
#storm topology spout parallelism_hint, usually 1:1 with the number of Kafka partitions
topology.spout.parallelism=1
#storm bolt InternalBolt parallelism_hint
topology.bolt.check.parallelism=1
#storm bolt InternalByteCountBolt parallelism_hint
topology.bolt.count.parallelism=1
#storm bolt IntermediateRankingsBolt parallelism_hint
topology.bolt.interRanker.parallelism=1

View File

@@ -0,0 +1,72 @@
#Kafka broker (bootstrap) address
bootstrap.servers=ipaddress:9092
#Kafka topic name
kafka.topic=CONNECTION-RECORD-LOG
#Kafka consumer group id
group.id=internal-sessions-top-191216
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#auto.offset.reset=earliest
#Output Kafka servers
results.output.servers=ipaddress:9092
#Output topic
results.output.topics=TOP-INTERNAL-HOST-LOG
#topology pending
topology.config.max.spout.pending=150000
#topology ack
topology.num.acks=1
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#Path to the IP geolocation library
ip.library=/dat/
#1: Internal hosts  2: External hosts
#3: Website domains  4: Active users
pattern.num=1
# bytes,packets,sessions
dimension.type=sessions
#storm topology workers
topology.workers=1
#storm topology rolling count window length in seconds
topology.top.window.length.secs=300
#storm topology rolling count emit Frequency in seconds
topology.top.emit.frequency.secs=60
#storm topology intermediate rank emit Frequency in seconds
topology.intermediate.emit.frequency.secs=70
#storm topology total rank emit Frequency in seconds
topology.taotal.emit.frequency.secs=300
#storm topology intermediate top N
topology.top.intermediate.n = 200
#storm topology total top N
topology.top.total.n = 20
#storm topology spout parallelism_hint, usually 1:1 with the number of Kafka partitions
topology.spout.parallelism=1
#storm bolt InternalBolt parallelism_hint
topology.bolt.check.parallelism=1
#storm bolt InternalByteCountBolt parallelism_hint
topology.bolt.count.parallelism=1
#storm bolt IntermediateRankingsBolt parallelism_hint
topology.bolt.interRanker.parallelism=1

View File

@@ -0,0 +1,72 @@
#Kafka broker (bootstrap) address
bootstrap.servers=ipaddress:9092
#Kafka topic name
kafka.topic=CONNECTION-RECORD-COMPLETED-LOG
#Kafka consumer group id
group.id=user-bytes-top-191216
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#auto.offset.reset=earliest
#Output Kafka servers
results.output.servers=ipaddress:9092
#Output topic
results.output.topics=TOP-USER-LOG
#topology pending
topology.config.max.spout.pending=150000
#topology ack
topology.num.acks=1
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#Path to the IP geolocation library
ip.library=/dat/
#1: Internal hosts  2: External hosts
#3: Website domains  4: Active users
pattern.num=4
# bytes,packets,sessions
dimension.type=bytes
#storm topology workers
topology.workers=1
#storm topology rolling count window length in seconds
topology.top.window.length.secs=300
#storm topology rolling count emit Frequency in seconds
topology.top.emit.frequency.secs=60
#storm topology intermediate rank emit Frequency in seconds
topology.intermediate.emit.frequency.secs=70
#storm topology total rank emit Frequency in seconds
topology.taotal.emit.frequency.secs=300
#storm topology intermediate top N
topology.top.intermediate.n = 200
#storm topology total top N
topology.top.total.n = 20
#storm topology spout parallelism_hint, usually 1:1 with the number of Kafka partitions
topology.spout.parallelism=1
#storm bolt InternalBolt parallelism_hint
topology.bolt.check.parallelism=1
#storm bolt InternalByteCountBolt parallelism_hint
topology.bolt.count.parallelism=1
#storm bolt IntermediateRankingsBolt parallelism_hint
topology.bolt.interRanker.parallelism=1

View File

@@ -0,0 +1,72 @@
#Kafka broker (bootstrap) address
bootstrap.servers=ipaddress:9092
#Kafka topic name
kafka.topic=CONNECTION-RECORD-COMPLETED-LOG
#Kafka consumer group id
group.id=user-packets-top-191216
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#auto.offset.reset=earliest
#Output Kafka servers
results.output.servers=ipaddress:9092
#Output topic
results.output.topics=TOP-USER-LOG
#topology pending
topology.config.max.spout.pending=150000
#topology ack
topology.num.acks=1
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#Path to the IP geolocation library
ip.library=/dat/
#1: Internal hosts  2: External hosts
#3: Website domains  4: Active users
pattern.num=4
# bytes,packets,sessions
dimension.type=packets
#storm topology workers
topology.workers=1
#storm topology rolling count window length in seconds
topology.top.window.length.secs=300
#storm topology rolling count emit Frequency in seconds
topology.top.emit.frequency.secs=60
#storm topology intermediate rank emit Frequency in seconds
topology.intermediate.emit.frequency.secs=70
#storm topology total rank emit Frequency in seconds
topology.taotal.emit.frequency.secs=300
#storm topology intermediate top N
topology.top.intermediate.n = 200
#storm topology total top N
topology.top.total.n = 20
#storm topology spout parallelism_hint, usually 1:1 with the number of Kafka partitions
topology.spout.parallelism=1
#storm bolt InternalBolt parallelism_hint
topology.bolt.check.parallelism=1
#storm bolt InternalByteCountBolt parallelism_hint
topology.bolt.count.parallelism=1
#storm bolt IntermediateRankingsBolt parallelism_hint
topology.bolt.interRanker.parallelism=1

View File

@@ -0,0 +1,72 @@
#Kafka broker (bootstrap) address
bootstrap.servers=ipaddress:9092
#Kafka topic name
kafka.topic=CONNECTION-RECORD-COMPLETED-LOG
#Kafka consumer group id
group.id=user-sessions-top-191216
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#auto.offset.reset=earliest
#Output Kafka servers
results.output.servers=ipaddress:9092
#Output topic
results.output.topics=TOP-USER-LOG
#topology pending
topology.config.max.spout.pending=150000
#topology ack
topology.num.acks=1
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#Path to the IP geolocation library
ip.library=/dat/
#1: Internal hosts  2: External hosts
#3: Website domains  4: Active users
pattern.num=4
# bytes,packets,sessions
dimension.type=sessions
#storm topology workers
topology.workers=1
#storm topology rolling count window length in seconds
topology.top.window.length.secs=300
#storm topology rolling count emit Frequency in seconds
topology.top.emit.frequency.secs=60
#storm topology intermediate rank emit Frequency in seconds
topology.intermediate.emit.frequency.secs=70
#storm topology total rank emit Frequency in seconds
topology.taotal.emit.frequency.secs=300
#storm topology intermediate top N
topology.top.intermediate.n = 200
#storm topology total top N
topology.top.total.n = 20
#storm topology spout parallelism_hint, usually 1:1 with the number of Kafka partitions
topology.spout.parallelism=1
#storm bolt InternalBolt parallelism_hint
topology.bolt.check.parallelism=1
#storm bolt InternalByteCountBolt parallelism_hint
topology.bolt.count.parallelism=1
#storm bolt IntermediateRankingsBolt parallelism_hint
topology.bolt.interRanker.parallelism=1

View File

@@ -0,0 +1,72 @@
#Kafka broker (bootstrap) address
bootstrap.servers=ipaddress:9092
#Kafka topic name
kafka.topic=CONNECTION-RECORD-COMPLETED-LOG
#Kafka consumer group id
group.id=website-bytes-top-191216
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#auto.offset.reset=earliest
#Output Kafka servers
results.output.servers=ipaddress:9092
#Output topic
results.output.topics=TOP-WEBSITE-DOMAIN-LOG
#topology pending
topology.config.max.spout.pending=150000
#topology ack
topology.num.acks=1
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#Path to the IP geolocation library
ip.library=/dat/
#1: Internal hosts  2: External hosts
#3: Website domains  4: Active users
pattern.num=3
# bytes,packets,sessions
dimension.type=bytes
#storm topology workers
topology.workers=1
#storm topology rolling count window length in seconds
topology.top.window.length.secs=300
#storm topology rolling count emit Frequency in seconds
topology.top.emit.frequency.secs=60
#storm topology intermediate rank emit Frequency in seconds
topology.intermediate.emit.frequency.secs=70
#storm topology total rank emit Frequency in seconds
topology.taotal.emit.frequency.secs=300
#storm topology intermediate top N
topology.top.intermediate.n = 200
#storm topology total top N
topology.top.total.n = 20
#storm topology spout parallelism_hint, usually 1:1 with the number of Kafka partitions
topology.spout.parallelism=1
#storm bolt InternalBolt parallelism_hint
topology.bolt.check.parallelism=1
#storm bolt InternalByteCountBolt parallelism_hint
topology.bolt.count.parallelism=1
#storm bolt IntermediateRankingsBolt parallelism_hint
topology.bolt.interRanker.parallelism=1

View File

@@ -0,0 +1,72 @@
#Kafka broker (bootstrap) address
bootstrap.servers=ipaddress:9092
#Kafka topic name
kafka.topic=CONNECTION-RECORD-COMPLETED-LOG
#Kafka consumer group id
group.id=website-packets-top-191216
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#auto.offset.reset=earliest
#Output Kafka servers
results.output.servers=ipaddress:9092
#Output topic
results.output.topics=TOP-WEBSITE-DOMAIN-LOG
#topology pending
topology.config.max.spout.pending=150000
#topology ack
topology.num.acks=1
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#Path to the IP geolocation library
ip.library=/dat/
#1: Internal hosts  2: External hosts
#3: Website domains  4: Active users
pattern.num=3
# bytes,packets,sessions
dimension.type=packets
#storm topology workers
topology.workers=1
#storm topology rolling count window length in seconds
topology.top.window.length.secs=300
#storm topology rolling count emit Frequency in seconds
topology.top.emit.frequency.secs=60
#storm topology intermediate rank emit Frequency in seconds
topology.intermediate.emit.frequency.secs=70
#storm topology total rank emit Frequency in seconds
topology.taotal.emit.frequency.secs=300
#storm topology intermediate top N
topology.top.intermediate.n = 200
#storm topology total top N
topology.top.total.n = 20
#storm topology spout parallelism_hint, usually 1:1 with the number of Kafka partitions
topology.spout.parallelism=1
#storm bolt InternalBolt parallelism_hint
topology.bolt.check.parallelism=1
#storm bolt InternalByteCountBolt parallelism_hint
topology.bolt.count.parallelism=1
#storm bolt IntermediateRankingsBolt parallelism_hint
topology.bolt.interRanker.parallelism=1

View File

@@ -0,0 +1,72 @@
#Kafka broker (bootstrap) address
bootstrap.servers=ipaddress:9092
#Kafka topic name
kafka.topic=CONNECTION-RECORD-COMPLETED-LOG
#Kafka consumer group id
group.id=website-sessions-top-191216
#Where to start reading from Kafka: earliest/latest
auto.offset.reset=latest
#auto.offset.reset=earliest
#Output Kafka servers
results.output.servers=ipaddress:9092
#Output topic
results.output.topics=TOP-WEBSITE-DOMAIN-LOG
#topology pending
topology.config.max.spout.pending=150000
#topology ack
topology.num.acks=1
#Maximum allowed failures when sending to Kafka
max.failure.num=20
#Path to the IP geolocation library
ip.library=/dat/
#1: Internal hosts  2: External hosts
#3: Website domains  4: Active users
pattern.num=3
# bytes,packets,sessions
dimension.type=sessions
#storm topology workers
topology.workers=1
#storm topology rolling count window length in seconds
topology.top.window.length.secs=300
#storm topology rolling count emit Frequency in seconds
topology.top.emit.frequency.secs=60
#storm topology intermediate rank emit Frequency in seconds
topology.intermediate.emit.frequency.secs=70
#storm topology total rank emit Frequency in seconds
topology.taotal.emit.frequency.secs=300
#storm topology intermediate top N
topology.top.intermediate.n = 200
#storm topology total top N
topology.top.total.n = 20
#storm topology spout parallelism_hint, usually 1:1 with the number of Kafka partitions
topology.spout.parallelism=1
#storm bolt InternalBolt parallelism_hint
topology.bolt.check.parallelism=1
#storm bolt InternalByteCountBolt parallelism_hint
topology.bolt.count.parallelism=1
#storm bolt IntermediateRankingsBolt parallelism_hint
topology.bolt.interRanker.parallelism=1

5
zk-kafka/check.sh Executable file
View File

@@ -0,0 +1,5 @@
#!/bin/bash
for i in 0 1 2 ;do
kubectl exec zk-$i -c kubernetes-zookeeper zkServer.sh status
done;

13
zk-kafka/host.sh Executable file
View File

@@ -0,0 +1,13 @@
#!/bin/bash
HOSTNAME=`hostname -s`
if [[ $HOSTNAME =~ (.*)-([0-9]+)$ ]]; then
ORD=${BASH_REMATCH[2]}
PORT=$((ORD + 9092))
#12.345.67.8 would be the LB's IP; here the node address is used instead
export KAFKA_CFG_ADVERTISED_LISTENERS="PLAINTEXT://192.168.40.127:$PORT"
else
echo "Failed to get index from hostname $HOSTNAME"
exit 1
fi
echo $KAFKA_CFG_ADVERTISED_LISTENERS
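For example, on the pod kafka-1 the regex captures ORD=1, so PORT=$((1 + 9092)) = 9093 and the script exports and prints:
PLAINTEXT://192.168.40.127:9093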

View File

@@ -0,0 +1,98 @@
---
apiVersion: v1
kind: Service
metadata:
name: kafka-svc
spec:
ports:
- port: 9093
targetPort: 9093
name: server
protocol: TCP
nodePort: 9093
  type: NodePort
  selector:
    app: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
spec:
serviceName: kafka-svc
replicas: 3
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
spec:
hostAliases:
- ip: "192.168.40.127"
hostnames:
- "bigdata-127"
- ip: "192.168.40.151"
hostnames:
- "bigdata-151"
- ip: "192.168.40.152"
hostnames:
- "bigdata-152"
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- kafka
topologyKey: "kubernetes.io/hostname"
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
podAffinityTerm:
labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- zk
topologyKey: "kubernetes.io/hostname"
terminationGracePeriodSeconds: 300
containers:
- name: k8skafka
securityContext:
runAsUser: 0
imagePullPolicy: Always
image: 192.168.40.153:9080/k8s/kafka:test3
resources:
requests:
memory: "1Gi"
cpu: 500m
ports:
- containerPort: 9093
hostPort: 9093
env:
- name: KA_PORT
value: "9093"
- name: HOST_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: ZK_DIR
value: "zk-0.zk-hs.default.svc.cluster.local:2182/kafka-test"
volumeMounts:
- name: datadir
mountPath: /opt/kafka-logs
securityContext:
runAsUser: 1000
fsGroup: 1000
volumeClaimTemplates:
- metadata:
name: datadir
spec:
accessModes: [ "ReadWriteMany" ]
storageClassName: nfs
resources:
requests:
storage: 5Gi

13
zk-kafka/kafka-svc.yaml Normal file
View File

@@ -0,0 +1,13 @@
apiVersion: v1
kind: Service
metadata:
name: kafka-svc
labels:
app: kafka
spec:
ports:
- port: 9093
name: server
clusterIP: None
selector:
app: kafka

85
zk-kafka/kafka.yaml Normal file
View File

@@ -0,0 +1,85 @@
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
spec:
serviceName: kafka-svc
replicas: 3
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
spec:
hostAliases:
- ip: "192.168.40.127"
hostnames:
- "bigdata-127"
- ip: "192.168.40.151"
hostnames:
- "bigdata-151"
- ip: "192.168.40.152"
hostnames:
- "bigdata-152"
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- kafka
topologyKey: "kubernetes.io/hostname"
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
podAffinityTerm:
labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- zk
topologyKey: "kubernetes.io/hostname"
terminationGracePeriodSeconds: 300
containers:
- name: k8skafka
securityContext:
runAsUser: 0
imagePullPolicy: Always
image: 192.168.40.153:9080/k8s/kafka:test3
resources:
requests:
memory: "10Gi"
cpu: 500m
ports:
- containerPort: 9093
hostPort: 9093
env:
- name: KA_PORT
value: "9093"
- name: HOST_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: ZK_DIR
value: "zk-0.zk-hs.default.svc.cluster.local:2182/kafka-test"
volumeMounts:
- name: datadir
mountPath: /opt/kafka-logs
securityContext:
runAsUser: 1000
fsGroup: 1000
volumeClaimTemplates:
- metadata:
name: datadir
spec:
accessModes: [ "ReadWriteMany" ]
storageClassName: nfs
resources:
requests:
storage: 5Gi

89
zk-kafka/pv.yaml Normal file
View File

@@ -0,0 +1,89 @@
apiVersion: v1
kind: PersistentVolume
metadata:
name: nfspv1
spec:
capacity:
storage: 10Gi
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Retain
storageClassName: nfs
nfs:
path: /nfs/storage1
server: 192.168.40.127
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: nfspv2
spec:
capacity:
storage: 10Gi
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Retain
storageClassName: nfs
nfs:
path: /nfs/storage2
server: 192.168.40.127
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: nfspv3
spec:
capacity:
storage: 10Gi
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Retain
storageClassName: nfs
nfs:
path: /nfs/storage3
server: 192.168.40.127
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: nfspv4
spec:
capacity:
storage: 10Gi
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Retain
storageClassName: nfs
nfs:
path: /nfs/storage4
server: 192.168.40.127
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: nfspv5
spec:
capacity:
storage: 10Gi
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Retain
storageClassName: nfs
nfs:
path: /nfs/storage5
server: 192.168.40.127
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: nfspv6
spec:
capacity:
storage: 10Gi
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Retain
storageClassName: nfs
nfs:
path: /nfs/storage6
server: 192.168.40.127

157
zk-kafka/zk.yaml Normal file
View File

@@ -0,0 +1,157 @@
apiVersion: v1
kind: Service
metadata:
name: zk-hs
labels:
app: zk
spec:
ports:
- port: 2888
name: server
- port: 3888
name: leader-election
clusterIP: None
selector:
app: zk
---
apiVersion: v1
kind: Service
metadata:
name: zk-cs
labels:
app: zk
spec:
type: NodePort
ports:
- port: 2182
name: client
targetPort: 2182
nodePort: 2182
selector:
app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
name: zk-pdb
spec:
selector:
matchLabels:
app: zk
maxUnavailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: zk
spec:
selector:
matchLabels:
app: zk
serviceName: zk-hs
  replicas: 3 #create three pods
updateStrategy:
type: RollingUpdate
podManagementPolicy: Parallel
template:
metadata:
labels:
app: zk
spec:
      nodeSelector: #schedule pods onto target nodes by label matching
travis.io/schedule-only: "kafka"
tolerations:
- key: "travis.io/schedule-only"
operator: "Equal"
value: "kafka"
effect: "NoSchedule"
- key: "travis.io/schedule-only"
operator: "Equal"
value: "kafka"
effect: "NoExecute"
tolerationSeconds: 3600
- key: "travis.io/schedule-only"
operator: "Equal"
value: "kafka"
effect: "PreferNoSchedule"
      affinity: #allow at most one of these pods per node
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- zk
topologyKey: "kubernetes.io/hostname"
      imagePullSecrets: #secret for the private image registry
- name: registry-key
containers:
- name: kubernetes-zookeeper
securityContext:
runAsUser: 0
imagePullPolicy: Always
image: 192.168.40.153:9080/k8s/k8s.gcr.io/kubernetes-zookeeper:1.0-3.4.10
resources:
requests:
memory: "20Mi"
cpu: "0.1"
ports:
- containerPort: 2182
name: client
- containerPort: 2888
name: server
- containerPort: 3888
name: leader-election
command:
- sh
- -c
- "start-zookeeper \
--servers=3 \
--data_dir=/var/lib/zookeeper/data \
--data_log_dir=/var/lib/zookeeper/data/log \
--conf_dir=/opt/zookeeper/conf \
--client_port=2182 \
--election_port=3888 \
--server_port=2888 \
--tick_time=2000 \
--init_limit=10 \
--sync_limit=5 \
--heap=512M \
--max_client_cnxns=60 \
--snap_retain_count=3 \
--purge_interval=12 \
--max_session_timeout=40000 \
--min_session_timeout=4000 \
--log_level=INFO"
        readinessProbe: #pod health check
exec:
command:
- sh
- -c
- "zookeeper-ready 2182"
initialDelaySeconds: 10
timeoutSeconds: 5
livenessProbe:
exec:
command:
- sh
- -c
- "zookeeper-ready 2182"
initialDelaySeconds: 10
timeoutSeconds: 5
volumeMounts:
- name: datadir
mountPath: /var/lib/zookeeper
securityContext:
runAsUser: 1000
fsGroup: 1000
  volumeClaimTemplates: #NFS-backed volume claim template
- metadata:
name: datadir
spec:
accessModes: [ "ReadWriteMany" ]
storageClassName: nfs
resources:
requests:
storage: 10Gi
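A minimal sketch of the order in which the zk-kafka manifests in this commit might be applied, and how to verify the ensemble with check.sh (default namespace assumed):
kubectl apply -f zk-kafka/pv.yaml
kubectl apply -f zk-kafka/zk.yaml && kubectl rollout status statefulset/zk
./zk-kafka/check.sh                     # each zk pod should report Mode: leader or follower
kubectl apply -f zk-kafka/kafka-svc.yaml -f zk-kafka/kafka.yaml
kubectl rollout status statefulset/kafka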