Commit the version used in production
pom.xml | 174 lines | Normal file
@@ -0,0 +1,174 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>radius-account-knowledge</groupId>
    <artifactId>radius-account-knowledge</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>radius-account-knowledge</name>
    <url>http://maven.apache.org</url>

    <repositories>
        <repository>
            <id>nexus</id>
            <name>Team Nexus Repository</name>
            <url>http://192.168.40.125:8099/content/groups/public</url>
        </repository>

        <repository>
            <id>maven-ali</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>fail</checksumPolicy>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <plugins>
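            <!-- Shade builds the fat jar submitted to Storm: the manifest transformer
                 sets the topology main class, and the appending transformers merge
                 Spring handler/schema metadata from all shaded dependencies. -->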
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.ac.iie.topology.RadiusLogClearTopology</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>properties</directory>
                <includes>
                    <include>**/*.properties</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>log4j.properties</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kafka.version>1.0.0</kafka.version>
        <storm.version>1.0.2</storm.version>
        <zdjizhi.version>1.0.3</zdjizhi.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>

        <dependency>
            <groupId>com.zdjizhi</groupId>
            <artifactId>galaxy</artifactId>
            <version>${zdjizhi.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</project>
properties/knowledge_config.properties | 36 lines | Normal file
@@ -0,0 +1,36 @@
# Management Kafka address (bootstrap servers)
bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092

# Topic name on the Kafka brokers
kafka.topic=RADIUS-RECORD-LOG

# Kafka consumer group id
group.id=RADIUS-AAA

# Output Kafka servers
results.output.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092

# Output topic
results.output.topic=RADIUS-ONFF-LOG

# Where to start reading in Kafka: earliest/latest
auto.offset.reset=latest

# Storm topology workers
topology.workers=1

# Storm spout parallelism
spout.parallelism=3

# Storm bolt parallelism
format.bolt.parallelism=3

# Tick tuple frequency (seconds)
topology.tick.tuple.freq.secs=5

topology.config.max.spout.pending=500000

topology.num.acks=0

# Kafka batch size (records per flush)
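# Note: the bolt flushes a batch either when it reaches batch.insert.num
# or on the next tick (topology.tick.tuple.freq.secs), whichever comes first.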
batch.insert.num=2000
src/main/java/cn/ac/iie/bean/Knowledge.java | 61 lines | Normal file
@@ -0,0 +1,61 @@
package cn.ac.iie.bean;

/**
 * @author qidaijie
 */
public class Knowledge {
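    // Field names deliberately use snake_case: fastjson derives JSON keys from
    // the getters, so the serialized output matches the downstream log schema
    // (framed_ip, acct_session_id, ...).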
    private String framed_ip;
    private String account;
    private String acct_session_id;
    private int acct_status_type;
    private int acct_session_time;
    private long event_timestamp;

    public String getFramed_ip() {
        return framed_ip;
    }

    public void setFramed_ip(String framed_ip) {
        this.framed_ip = framed_ip;
    }

    public String getAccount() {
        return account;
    }

    public void setAccount(String account) {
        this.account = account;
    }

    public int getAcct_status_type() {
        return acct_status_type;
    }

    public void setAcct_status_type(int acct_status_type) {
        this.acct_status_type = acct_status_type;
    }

    public long getEvent_timestamp() {
        return event_timestamp;
    }

    public void setEvent_timestamp(long event_timestamp) {
        this.event_timestamp = event_timestamp;
    }

    public String getAcct_session_id() {
        return acct_session_id;
    }

    public void setAcct_session_id(String acct_session_id) {
        this.acct_session_id = acct_session_id;
    }

    public int getAcct_session_time() {
        return acct_session_time;
    }

    public void setAcct_session_time(int acct_session_time) {
        this.acct_session_time = acct_session_time;
    }
}
src/main/java/cn/ac/iie/bolt/RadiusCleanBolt.java | 108 lines | Normal file
@@ -0,0 +1,108 @@
package cn.ac.iie.bolt;

import cn.ac.iie.bean.Knowledge;
import cn.ac.iie.common.KnowledgeConfig;
import cn.ac.iie.utils.LogToKafka;
import cn.ac.iie.utils.TupleUtils;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * @author ZDJZ
 */
public class RadiusCleanBolt extends BaseBasicBolt {
    private static final long serialVersionUID = -7099293750085572832L;
    private List<String> list;
    private LogToKafka logToKafka;
    private static Logger logger = Logger.getLogger(RadiusCleanBolt.class);

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        list = new LinkedList<>();
        logToKafka = LogToKafka.getInstance();
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        try {
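            // A tick tuple arrives every topology.tick.tuple.freq.secs seconds
            // (see getComponentConfiguration below); use it to flush the buffer
            // so records are not held back under low traffic.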
            if (TupleUtils.isTick(tuple)) {
                logToKafka.sendMessage(list);
                list.clear();
            } else {
                String message = tuple.getString(0);
                if (StringUtil.isNotBlank(message)) {
                    JSONObject jsonObject = JSONObject.parseObject(message);
                    if (jsonObject.containsKey(KnowledgeConfig.RADIUS_PACKET_TYPE)) {
                        int packetType = jsonObject.getInteger(KnowledgeConfig.RADIUS_PACKET_TYPE);
                        if (KnowledgeConfig.ACCOUNTING_REQUEST == packetType) {
                            int statusType = jsonObject.getInteger("radius_acct_status_type");
                            if (KnowledgeConfig.START_BILLING == statusType || KnowledgeConfig.STOP_BILLING == statusType) {
                                Knowledge knowledge = new Knowledge();
                                knowledge.setFramed_ip(jsonObject.getString("radius_framed_ip"));
                                knowledge.setAccount(jsonObject.getString("radius_account"));
                                knowledge.setAcct_status_type(statusType);
                                /*
                                 * Use the event timestamp from the log when present;
                                 * otherwise fall back to the current time.
                                 */
                                if (jsonObject.containsKey(KnowledgeConfig.RADIUS_EVENT_TIMESTAMP)) {
                                    knowledge.setEvent_timestamp(jsonObject.getInteger("radius_event_timestamp"));
                                } else {
                                    knowledge.setEvent_timestamp(System.currentTimeMillis() / 1000);
                                }
                                /*
                                 * Identify a single connection:
                                 * 1. if the record carries acct_multi_session_id, use it;
                                 * 2. otherwise fall back to acct_session_id.
                                 */
                                if (jsonObject.containsKey(KnowledgeConfig.RADIUS_MULTI_SESSION_ID)) {
                                    knowledge.setAcct_session_id(jsonObject.getString("radius_acct_multi_session_id"));
                                } else {
                                    knowledge.setAcct_session_id(jsonObject.getString("radius_acct_session_id"));
                                }
                                /*
                                 * The user's online duration in seconds; records without
                                 * this attribute default to 0.
                                 */
                                if (jsonObject.containsKey(KnowledgeConfig.RADIUS_SESSION_TIME)) {
                                    knowledge.setAcct_session_time(jsonObject.getInteger("radius_acct_session_time"));
                                } else {
                                    knowledge.setAcct_session_time(0);
                                }
                                list.add(JSONObject.toJSONString(knowledge));
                            }
                        }
                    }
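                    // Size-based flush: send as soon as the buffer reaches
                    // batch.insert.num instead of waiting for the next tick.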
                    if (list.size() == KnowledgeConfig.BATCH_INSERT_NUM) {
                        logToKafka.sendMessage(list);
                        list.clear();
                    }
                }
            }
        } catch (Exception e) {
            logger.error("Failed to parse a RADIUS online/offline log record", e);
        }
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<>(16);
        conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, KnowledgeConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
        return conf;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
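        // Terminal bolt: results go straight to Kafka; nothing is emitted downstream.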
    }
}
src/main/java/cn/ac/iie/common/KnowledgeConfig.java | 65 lines | Normal file
@@ -0,0 +1,65 @@
package cn.ac.iie.common;

import cn.ac.iie.utils.KnowledgeConfigurations;

import java.io.Serializable;

/**
 * @author qidaijie
 */
public class KnowledgeConfig implements Serializable {
    private static final long serialVersionUID = -8326385159484059324L;

    public static final String SEGMENTATION = ",";
    /**
     * 4 - Accounting-Request
     */
    public static final int ACCOUNTING_REQUEST = 4;
    /**
     * 1 - Start accounting
     */
    public static final int START_BILLING = 1;
    /**
     * 2 - Stop accounting
     */
    public static final int STOP_BILLING = 2;

    /**
     * Packet type field
     */
    public static final String RADIUS_PACKET_TYPE = "radius_packet_type";

    /**
     * Timestamp at which the accounting request packet was sent
     */
    public static final String RADIUS_EVENT_TIMESTAMP = "radius_event_timestamp";

    /**
     * Attribute correlating multiple accounting session ids of one user
     */
    public static final String RADIUS_MULTI_SESSION_ID = "radius_acct_multi_session_id";

    /**
     * Online duration of the user, in seconds
     */
    public static final String RADIUS_SESSION_TIME = "radius_acct_session_time";

    /***
     * Kafka and system settings
     */
    public static final String BOOTSTRAP_SERVERS = KnowledgeConfigurations.getStringProperty(0, "bootstrap.servers");
    public static final Integer SPOUT_PARALLELISM = KnowledgeConfigurations.getIntProperty(0, "spout.parallelism");
    public static final String GROUP_ID = KnowledgeConfigurations.getStringProperty(0, "group.id");
    public static final String KAFKA_TOPIC = KnowledgeConfigurations.getStringProperty(0, "kafka.topic");
    public static final String AUTO_OFFSET_RESET = KnowledgeConfigurations.getStringProperty(0, "auto.offset.reset");
    public static final String RESULTS_OUTPUT_SERVERS = KnowledgeConfigurations.getStringProperty(0, "results.output.servers");
    public static final String RESULTS_OUTPUT_TOPIC = KnowledgeConfigurations.getStringProperty(0, "results.output.topic");
    public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = KnowledgeConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
    public static final Integer TOPOLOGY_NUM_ACKS = KnowledgeConfigurations.getIntProperty(0, "topology.num.acks");
    public static final Integer TOPOLOGY_WORKERS = KnowledgeConfigurations.getIntProperty(0, "topology.workers");
    public static final Integer FORMAT_BOLT_PARALLELISM = KnowledgeConfigurations.getIntProperty(0, "format.bolt.parallelism");
    public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = KnowledgeConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
    public static final Integer BATCH_INSERT_NUM = KnowledgeConfigurations.getIntProperty(0, "batch.insert.num");
}
src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java | 78 lines | Normal file
@@ -0,0 +1,78 @@
package cn.ac.iie.spout;

import cn.ac.iie.common.KnowledgeConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * @author qidaijie
 */
public class CustomizedKafkaSpout extends BaseRichSpout {
    private static final long serialVersionUID = 2934528972182398950L;
    private KafkaConsumer<String, String> consumer;
    private SpoutOutputCollector collector = null;
    private TopologyContext context = null;
    private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class);

    private static Properties createConsumerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KnowledgeConfig.BOOTSTRAP_SERVERS);
        props.put("group.id", KnowledgeConfig.GROUP_ID);
        props.put("auto.offset.reset", KnowledgeConfig.AUTO_OFFSET_RESET);
        props.put("session.timeout.ms", "60000");
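        // Cap each poll (3000 records, 30 MB per partition) so a single
        // nextTuple() call cannot flood the downstream bolt.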
props.put("max.poll.records", 3000);
|
||||
props.put("max.partition.fetch.bytes", 31457280);
|
||||
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
return props;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
|
||||
// TODO Auto-generated method stub
|
||||
this.collector = collector;
|
||||
this.context = context;
|
||||
Properties prop = createConsumerConfig();
|
||||
this.consumer = new KafkaConsumer<>(prop);
|
||||
this.consumer.subscribe(Arrays.asList(KnowledgeConfig.KAFKA_TOPIC));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
consumer.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nextTuple() {
|
||||
try {
|
||||
// TODO Auto-generated method stub
|
||||
ConsumerRecords<String, String> records = consumer.poll(10000L);
|
||||
Thread.sleep(300);
|
||||
for (ConsumerRecord<String, String> record : records) {
|
||||
this.collector.emit(new Values(record.value()));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("kfaka-spout 发送数据出现异常" + e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void declareOutputFields(OutputFieldsDeclarer declarer) {
|
||||
// TODO Auto-generated method stub
|
||||
declarer.declare(new Fields("source"));
|
||||
}
|
||||
}
|
||||
src/main/java/cn/ac/iie/topology/RadiusLogClearTopology.java | 81 lines | Normal file
@@ -0,0 +1,81 @@
package cn.ac.iie.topology;

import cn.ac.iie.bolt.RadiusCleanBolt;
import cn.ac.iie.common.KnowledgeConfig;
import cn.ac.iie.spout.CustomizedKafkaSpout;
import org.apache.storm.Config;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author qidaijie
 */
public class RadiusLogClearTopology {
    private static final Logger logger = LoggerFactory.getLogger(RadiusLogClearTopology.class);
    private final String topologyName;
    private final Config topologyConfig;
    private TopologyBuilder builder;

    public RadiusLogClearTopology() {
        this(RadiusLogClearTopology.class.getSimpleName());
    }

    public RadiusLogClearTopology(String topologyName) {
        this.topologyName = topologyName;
        topologyConfig = createTopologyConfig();
    }

    private Config createTopologyConfig() {
        Config conf = new Config();
        conf.setDebug(false);
        conf.setMessageTimeoutSecs(120);
        conf.setTopologyWorkerMaxHeapSize(500);
        conf.setMaxSpoutPending(KnowledgeConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING);
        if (KnowledgeConfig.TOPOLOGY_NUM_ACKS == 0) {
            conf.setNumAckers(0);
        }
        return conf;
    }

    public void runLocally() throws InterruptedException {
        topologyConfig.setMaxTaskParallelism(1);
        StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
    }

    public void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        topologyConfig.setNumWorkers(KnowledgeConfig.TOPOLOGY_WORKERS);
        // Setting this too high causes problems such as heartbeat-thread
        // starvation and a sharp drop in throughput.
        topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
        StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
    }

    private void buildTopology() {
        builder = new TopologyBuilder();
        builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), KnowledgeConfig.SPOUT_PARALLELISM);
        builder.setBolt("RadiusCleanBolt", new RadiusCleanBolt(), KnowledgeConfig.FORMAT_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout");
    }

    public static void main(String[] args) throws Exception {
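        // Typical submission: storm jar <shaded-jar> cn.ac.iie.topology.RadiusLogClearTopology <topologyName> remote
        // Without the "remote" argument the topology runs in a local in-process cluster.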
        RadiusLogClearTopology csst = null;
        boolean runLocally = true;
        if (args.length >= 2 && "remote".equalsIgnoreCase(args[1])) {
            runLocally = false;
            csst = new RadiusLogClearTopology(args[0]);
        } else {
            csst = new RadiusLogClearTopology();
        }
        csst.buildTopology();

        if (runLocally) {
            logger.info("Running in local mode...");
            csst.runLocally();
        } else {
            logger.info("Running in remote deployment mode...");
            csst.runRemotely();
        }
    }
}
src/main/java/cn/ac/iie/topology/StormRunner.java | 32 lines | Normal file
@@ -0,0 +1,32 @@
package cn.ac.iie.topology;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;

public final class StormRunner {
    private static final int MILLIS_IN_SEC = 1000;

    private StormRunner() {}

    public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException {
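        // Submit to an in-process cluster, let it run for runtimeInSeconds, then shut down.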
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(topologyName, conf, builder.createTopology());
        Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
        localCluster.shutdown();
    }

    public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
    }
}
src/main/java/cn/ac/iie/utils/KnowledgeConfigurations.java | 53 lines | Normal file
@@ -0,0 +1,53 @@
package cn.ac.iie.utils;

import java.util.Properties;

/**
 * @author qidaijie
 */
public final class KnowledgeConfigurations {

    private static Properties propCommon = new Properties();

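    /**
     * type 0 reads from the common properties file (knowledge_config.properties);
     * other values are reserved and currently return null.
     */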
    public static String getStringProperty(Integer type, String key) {
        if (type == 0) {
            return propCommon.getProperty(key);
        } else {
            return null;
        }
    }

    public static Integer getIntProperty(Integer type, String key) {
        if (type == 0) {
            return Integer.parseInt(propCommon.getProperty(key));
        } else {
            return null;
        }
    }

    public static Long getLongProperty(Integer type, String key) {
        if (type == 0) {
            return Long.parseLong(propCommon.getProperty(key));
        } else {
            return null;
        }
    }

    public static Boolean getBooleanProperty(Integer type, String key) {
        if (type == 0) {
            return "true".equals(propCommon.getProperty(key).toLowerCase().trim());
        } else {
            return null;
        }
    }

    static {
        try {
            propCommon.load(KnowledgeConfigurations.class.getClassLoader().getResourceAsStream("knowledge_config.properties"));
        } catch (Exception e) {
            propCommon = null;
            System.err.println("Failed to load knowledge_config.properties");
        }
    }
}
src/main/java/cn/ac/iie/utils/LogToKafka.java | 77 lines | Normal file
@@ -0,0 +1,77 @@
package cn.ac.iie.utils;

import cn.ac.iie.common.KnowledgeConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.log4j.Logger;

import java.util.List;
import java.util.Properties;

/**
 * Writes cleaned logs to the data-center Kafka cluster.
 *
 * @author Administrator
 * @create 2018-08-13 15:11
 */
public class LogToKafka {
    private static Logger logger = Logger.getLogger(LogToKafka.class);

    /**
     * Kafka producer used to send messages to Kafka
     */
    private static Producer<String, String> kafkaProducer;

    /**
     * Singleton adapter that proxies the Kafka producer
     */
    private static LogToKafka logToKafka;

    private LogToKafka() {
        initKafkaProducer();
    }

    public static synchronized LogToKafka getInstance() {
        if (logToKafka == null) {
            logToKafka = new LogToKafka();
        }
        return logToKafka;
    }

    public void sendMessage(List<String> list) {
        final int[] errorSum = {0};
        for (String value : list) {
            kafkaProducer.send(new ProducerRecord<>(KnowledgeConfig.RESULTS_OUTPUT_TOPIC, value), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        logger.error("Failed to write to " + KnowledgeConfig.RESULTS_OUTPUT_TOPIC, exception);
                        errorSum[0]++;
                    }
                }
            });
        }
        kafkaProducer.flush();
        if (errorSum[0] > 0) {
            logger.warn(errorSum[0] + " of " + list.size() + " records failed to send");
        }
        logger.debug("Log batch sent to the National Center successfully");
    }

    /**
     * Initialize the Kafka producer from the producer settings; runs only once.
     */
    private void initKafkaProducer() {
        Properties properties = new Properties();
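        // acks=1 trades durability for latency: only the partition leader must
        // acknowledge a write. linger.ms and batch.size favour batched throughput.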
properties.put("bootstrap.servers", KnowledgeConfig.RESULTS_OUTPUT_SERVERS);
|
||||
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||
properties.put("acks", "1");
|
||||
properties.put("linger.ms", "2");
|
||||
properties.put("request.timeout.ms", 30000);
|
||||
properties.put("batch.size", 262144);
|
||||
properties.put("buffer.memory", 33554432);
|
||||
// properties.put("compression.type", "snappy");
|
||||
kafkaProducer = new KafkaProducer<>(properties);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
src/main/java/cn/ac/iie/utils/TupleUtils.java | 21 lines | Normal file
@@ -0,0 +1,21 @@
package cn.ac.iie.utils;

import org.apache.storm.Constants;
import org.apache.storm.tuple.Tuple;

/**
 * @author Administrator
 */
public final class TupleUtils {
    /**
     * Check whether the tuple is a system-generated tick tuple.
     *
     * @param tuple the tuple to inspect
     * @return true if the tuple comes from the system tick stream
     */
    public static boolean isTick(Tuple tuple) {
        return tuple != null
                && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }
}
src/main/java/log4j.properties | 25 lines | Normal file
@@ -0,0 +1,25 @@
# Log4j
log4j.rootLogger=info,console,file

# Console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=error
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n

# File appender
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold=error
log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true
# Use a relative path and verify that output lands under the application directory.
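# ${nis.root} is resolved from a JVM system property, e.g. -Dnis.root=/opt/app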
log4j.appender.file.file=${nis.root}/log/galaxy-name.log
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
# MyBatis: com.nis.web.dao is the package holding the MyBatis mapper interfaces
log4j.logger.com.nis.web.dao=debug
# BoneCP data source
log4j.category.com.jolbox=debug,console
src/test/java/cn/ac/iie/test/JsonTest.java | 11 lines | Normal file
@@ -0,0 +1,11 @@
package cn.ac.iie.test;

import org.junit.Test;

public class JsonTest {

    @Test
    public void huToolTest() {
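        // Placeholder: no assertions yet.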
    }
}