radius日志补全代码更新
This commit is contained in:
@@ -1,9 +1,13 @@
|
||||
package cn.ac.iie.topology;
|
||||
|
||||
|
||||
import cn.ac.iie.bolt.ConnCompletionBolt;
|
||||
import cn.ac.iie.bolt.NtcLogSendBolt;
|
||||
import cn.ac.iie.bolt.collectProtocol.CollectCompletedBolt;
|
||||
import cn.ac.iie.bolt.radius.RadiusCompletionBolt;
|
||||
import cn.ac.iie.bolt.security.SecurityCompletionBolt;
|
||||
|
||||
import cn.ac.iie.bolt.proxy.ProxyCompletionBolt;
|
||||
|
||||
import cn.ac.iie.common.FlowWriteConfig;
|
||||
import cn.ac.iie.spout.CustomizedKafkaSpout;
|
||||
import org.apache.log4j.Logger;
|
||||
@@ -58,15 +62,45 @@ public class LogFlowWriteTopology {
|
||||
private void buildTopology() {
|
||||
builder = new TopologyBuilder();
|
||||
builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);
|
||||
if ("RADIUS-LOG".equals(FlowWriteConfig.KAFKA_TOPIC)) {
|
||||
builder.setBolt("RadiusCompletionBolt", new RadiusCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
|
||||
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("RadiusCompletionBolt");
|
||||
} else {
|
||||
builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
|
||||
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt");
|
||||
}
|
||||
|
||||
String topic_name = FlowWriteConfig.KAFKA_TOPIC;
|
||||
|
||||
switch(topic_name){
|
||||
case "PROXY-POLICY-LOG":
|
||||
builder.setBolt("ProxyCompletionBolt", new ProxyCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
|
||||
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ProxyCompletionBolt");
|
||||
break;
|
||||
|
||||
case "COLLECT-RADIUS-RECORD-LOG":
|
||||
builder.setBolt("RadiusCompletionBolt", new RadiusCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
|
||||
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("RadiusCompletionBolt");
|
||||
break;
|
||||
|
||||
case "COLLECT-CONNECTION-RECORD-LOG":
|
||||
builder.setBolt("CollectCompletedBolt", new CollectCompletedBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
|
||||
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("CollectCompletedBolt");
|
||||
break;
|
||||
|
||||
case "SECURITY-POLICY-LOG":
|
||||
builder.setBolt("SecurityCompletionBolt", new SecurityCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
|
||||
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("SecurityCompletionBolt");
|
||||
break;
|
||||
|
||||
|
||||
}
|
||||
|
||||
/*if ("PROXY_POLICY_LOG".equals(FlowWriteConfig.KAFKA_TOPIC) || "COLLECT_HTTP_META_LOG".equals(FlowWriteConfig.KAFKA_TOPIC)) {
|
||||
builder.setBolt("HttpCompletionBolt", new HttpCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
|
||||
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("HttpCompletionBolt");
|
||||
} else if ("COLLECT_RADIUS_RECORD_LOG".equals(FlowWriteConfig.KAFKA_TOPIC)) {
|
||||
builder.setBolt("RadiusCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
|
||||
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("RadiusCompletionBolt");
|
||||
} else {
|
||||
builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
|
||||
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt");
|
||||
}*/
|
||||
// builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt");
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
LogFlowWriteTopology csst = null;
|
||||
|
||||
Reference in New Issue
Block a user