Commit the version used in production
pom.xml (64 lines changed)
@@ -2,20 +2,35 @@
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
-    <groupId>log-address-hbase</groupId>
-    <artifactId>log-address-hbase</artifactId>
+    <groupId>log-subscriber-hbase</groupId>
+    <artifactId>log-subscriber-hbase</artifactId>
     <version>0.0.1-SNAPSHOT</version>
     <packaging>jar</packaging>
 
-    <name>log-address-hbase</name>
+    <name>log-subscriber-hbase</name>
     <url>http://maven.apache.org</url>
 
     <repositories>
         <repository>
             <id>nexus</id>
             <name>Team Nexus Repository</name>
-            <url>http://192.168.10.125:8099/content/groups/public</url>
+            <url>http://192.168.40.125:8099/content/groups/public</url>
         </repository>
 
+        <repository>
+            <id>maven-ali</id>
+            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+                <updatePolicy>always</updatePolicy>
+                <checksumPolicy>fail</checksumPolicy>
+            </snapshots>
+        </repository>
+
     </repositories>
 
     <build>
@@ -35,7 +50,7 @@
                         <transformers>
                             <transformer
                                 implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                <mainClass>cn.ac.iie.topology.LogAddressRedisTopology</mainClass>
+                                <mainClass>cn.ac.iie.topology.LogSubscriberTopology</mainClass>
                             </transformer>
                             <transformer
                                 implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
@@ -85,7 +100,7 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <kafka.version>1.0.0</kafka.version>
         <storm.version>1.0.2</storm.version>
-        <hbase.version>1.4.9</hbase.version>
+        <hbase.version>2.2.3</hbase.version>
         <hadoop.version>2.7.1</hadoop.version>
     </properties>
 
@@ -108,6 +123,16 @@
             <artifactId>storm-core</artifactId>
             <version>${storm.version}</version>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -126,34 +151,11 @@
             </exclusions>
         </dependency>
 
-        <dependency>
-            <groupId>com.nis</groupId>
-            <artifactId>nis-core</artifactId>
-            <version>1.0</version>
-        </dependency>
-
-        <dependency>
-            <groupId>info.monitorenter</groupId>
-            <artifactId>cpdetector</artifactId>
-            <version>1.0.7</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>18.0</version>
-        </dependency>
-
-        <dependency>
-            <groupId>redis.clients</groupId>
-            <artifactId>jedis</artifactId>
-            <version>2.8.1</version>
-        </dependency>
-
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>3.8.1</version>
+            <version>4.12</version>
             <scope>test</scope>
         </dependency>
 
@@ -161,7 +163,7 @@
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
-            <version>1.2.47</version>
+            <version>1.2.59</version>
         </dependency>
 
         <dependency>
properties/subscriber-config.properties (new file, 32 lines)
@@ -0,0 +1,32 @@
+# Kafka bootstrap servers
+bootstrap.servers=192.168.40.186:9092
+# Where to start reading from Kafka: earliest/latest
+auto.offset.reset=latest
+
+# HBase ZooKeeper address
+hbase.zookeeper.servers=192.168.40.186:2181
+
+# HBase table name
+hbase.table.name=subscriber_info
+
+# Tick tuple frequency (seconds)
+topology.tick.tuple.freq.secs=30
+
+topology.config.max.spout.pending=500000
+
+topology.num.acks=0
+
+# Kafka topic name on the brokers
+kafka.topic=RADIUS-RECORD-LOG
+
+# Kafka consumer group id
+group.id=account-to-hbase-a
+
+# Storm topology workers
+topology.workers=1
+
+# Storm spout parallelism
+spout.parallelism=1
+
+# Storm bolt parallelism
+format.bolt.parallelism=1
src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java (new file, 169 lines)
@@ -0,0 +1,169 @@
+package cn.ac.iie.bolt;
+
+import cn.ac.iie.common.SubscriberConfig;
+import cn.ac.iie.utils.TupleUtils;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * @author qidaijie
+ */
+public class SubscriberIdBolt extends BaseBasicBolt {
+    private static Logger logger = Logger.getLogger(SubscriberIdBolt.class);
+    private static Map<String, String> subIdMap;
+    private List<Put> putList;
+    private static Connection connection;
+
+    static {
+        // HBase configuration
+        Configuration configuration = HBaseConfiguration.create();
+        // Set the ZooKeeper quorum
+        configuration.set("hbase.zookeeper.quorum", SubscriberConfig.HBASE_ZOOKEEPER_SERVERS);
+        configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        configuration.set("hbase.client.retries.number", "3");
+        configuration.set("hbase.bulkload.retries.number", "3");
+        configuration.set("zookeeper.recovery.retry", "3");
+        try {
+            connection = ConnectionFactory.createConnection(configuration);
+        } catch (IOException e) {
+            logger.error("Subscriber-to-HBase writer failed to connect to HBase");
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context) {
+        subIdMap = new HashMap<>(3333334);
+        putList = new ArrayList<>();
+        getAll();
+    }
+
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+        try {
+            if (TupleUtils.isTick(tuple)) {
+                insertData(putList);
+            } else {
+                String message = tuple.getString(0);
+                if (StringUtil.isNotBlank(message)) {
+                    JSONObject jsonObject = JSONObject.parseObject(message);
+                    if (jsonObject.containsKey(SubscriberConfig.PACKET_TYPE) && jsonObject.containsKey(SubscriberConfig.STATUS_TYPE)) {
+                        if (SubscriberConfig.ACCOUNTING_REQUEST == jsonObject.getInteger(SubscriberConfig.PACKET_TYPE)
+                                && SubscriberConfig.START_BILLING == jsonObject.getInteger(SubscriberConfig.STATUS_TYPE)) {
+                            String framedIp = jsonObject.getString("radius_framed_ip");
+                            String account = jsonObject.getString("radius_account");
+                            dataValidation(framedIp, account, putList);
+                        }
+                        if (putList.size() == SubscriberConfig.LIST_SIZE_MAX) {
+                            insertData(putList);
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Failed to parse the Radius log / update HBase!");
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Map<String, Object> conf = new HashMap<String, Object>(16);
+        conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
+                SubscriberConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+        return conf;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+
+    /**
+     * Load every existing key/value pair from HBase into memory.
+     */
+    private static void getAll() {
+        try {
+            Table table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME));
+            Scan scan = new Scan();
+            ResultScanner scanner = table.getScanner(scan);
+            for (Result result : scanner) {
+                Cell[] cells = result.rawCells();
+                for (Cell cell : cells) {
+                    subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
+                }
+            }
+            logger.warn("In-memory data initialized, map size: (" + subIdMap.size() + ")");
+            scanner.close();
+        } catch (IOException e) {
+            logger.error("Failed to initialize in-memory data");
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Write the batched data to HBase.
+     *
+     * @param putList puts list
+     */
+    private static void insertData(List<Put> putList) {
+        Table table = null;
+        try {
+            table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME));
+            table.put(putList);
+            logger.warn("HBase updated successfully, rows written: " + putList.size());
+            putList.clear();
+        } catch (IOException e) {
+            logger.error("Failed to write the updates to HBase");
+            e.printStackTrace();
+        } finally {
+            try {
+                if (table != null) {
+                    table.close();
+                }
+            } catch (IOException e) {
+                logger.error("Failed to close the HBase table");
+                e.printStackTrace();
+            }
+        }
+
+    }
+
+    /**
+     * Validate the data and compare it against the in-memory cache.
+     *
+     * @param ip      framed_ip
+     * @param account account
+     */
+    private static void dataValidation(String ip, String account, List<Put> putList) {
+        if (subIdMap.containsKey(ip)) {
+            if (!subIdMap.get(ip).equals(account)) {
+                Put put = new Put(ip.getBytes());
+                put.addColumn("subscriber_id".getBytes(), "account".getBytes(), account.getBytes());
+                putList.add(put);
+                subIdMap.put(ip, account);
+            }
+        } else {
+            Put put = new Put(ip.getBytes());
+            put.addColumn("subscriber_id".getBytes(), "account".getBytes(), account.getBytes());
+            putList.add(put);
+            subIdMap.put(ip, account);
+        }
+    }
+}
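Note on the HBase side: SubscriberIdBolt reads and writes the table "sub:" + hbase.table.name (here sub:subscriber_info) and stores the account under the column family subscriber_id with the qualifier account, so the namespace, table and column family must already exist before the topology starts. A minimal setup sketch with the HBase 2.x Admin API (matching the hbase.version bump to 2.2.3 above); the class name and the hard-coded quorum are illustrative only, taken from subscriber-config.properties, and are not part of this commit:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.NamespaceDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

    public class SubscriberTableSetup {
        public static void main(String[] args) throws IOException {
            Configuration conf = HBaseConfiguration.create();
            // Same ZooKeeper quorum the bolt uses (hbase.zookeeper.servers in subscriber-config.properties)
            conf.set("hbase.zookeeper.quorum", "192.168.40.186:2181");
            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Admin admin = connection.getAdmin()) {
                // Namespace "sub", column family "subscriber_id": inferred from the Puts in SubscriberIdBolt
                admin.createNamespace(NamespaceDescriptor.create("sub").build());
                admin.createTable(TableDescriptorBuilder
                        .newBuilder(TableName.valueOf("sub:subscriber_info"))
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("subscriber_id"))
                        .build());
            }
        }
    }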
src/main/java/cn/ac/iie/common/SubscriberConfig.java (new file, 52 lines)
@@ -0,0 +1,52 @@
+package cn.ac.iie.common;
+
+import cn.ac.iie.utils.SubscriberConfigurations;
+
+import java.io.Serializable;
+
+/**
+ * @author qidaijie
+ */
+public class SubscriberConfig implements Serializable {
+    private static final long serialVersionUID = -8326385159484059324L;
+
+    public static final String SEGMENTATION = ",";
+    public static final int LIST_SIZE_MAX = 5000;
+    /**
+     * 4 - Accounting-Request (accounting authorization)
+     */
+    public static final int ACCOUNTING_REQUEST = 4;
+    /**
+     * radius_packet_type
+     */
+    public static final String PACKET_TYPE = "radius_packet_type";
+    /**
+     * 1 - start billing
+     */
+    public static final int START_BILLING = 1;
+    /**
+     * radius_acct_status_type
+     */
+    public static final String STATUS_TYPE = "radius_acct_status_type";
+
+
+    /**
+     * kafka and system
+     */
+    public static final String BOOTSTRAP_SERVERS = SubscriberConfigurations.getStringProperty(0, "bootstrap.servers");
+    public static final Integer SPOUT_PARALLELISM = SubscriberConfigurations.getIntProperty(0, "spout.parallelism");
+    public static final Integer FORMAT_BOLT_PARALLELISM = SubscriberConfigurations.getIntProperty(0, "format.bolt.parallelism");
+    public static final String GROUP_ID = SubscriberConfigurations.getStringProperty(0, "group.id");
+    public static final String KAFKA_TOPIC = SubscriberConfigurations.getStringProperty(0, "kafka.topic");
+    public static final String AUTO_OFFSET_RESET = SubscriberConfigurations.getStringProperty(0, "auto.offset.reset");
+    public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = SubscriberConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
+    public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = SubscriberConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
+    public static final Integer TOPOLOGY_NUM_ACKS = SubscriberConfigurations.getIntProperty(0, "topology.num.acks");
+
+    public static final Integer TOPOLOGY_WORKERS = SubscriberConfigurations.getIntProperty(0, "topology.workers");
+
+    public static final String CHECK_IP_SCOPE = SubscriberConfigurations.getStringProperty(0, "check.ip.scope");
+
+    public static final String HBASE_ZOOKEEPER_SERVERS = SubscriberConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
+    public static final String HBASE_TABLE_NAME = SubscriberConfigurations.getStringProperty(0, "hbase.table.name");
+}
src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
@@ -1,6 +1,6 @@
 package cn.ac.iie.spout;
 
-import cn.ac.iie.common.AddressConfig;
+import cn.ac.iie.common.SubscriberConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -29,9 +29,9 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
 
     private static Properties createConsumerConfig() {
         Properties props = new Properties();
-        props.put("bootstrap.servers", AddressConfig.BOOTSTRAP_SERVERS);
-        props.put("group.id", AddressConfig.GROUP_ID);
-        props.put("auto.offset.reset", AddressConfig.AUTO_OFFSET_RESET);
+        props.put("bootstrap.servers", SubscriberConfig.BOOTSTRAP_SERVERS);
+        props.put("group.id", SubscriberConfig.GROUP_ID);
+        props.put("auto.offset.reset", SubscriberConfig.AUTO_OFFSET_RESET);
         props.put("session.timeout.ms", "60000");
         props.put("max.poll.records", 3000);
         props.put("max.partition.fetch.bytes", 31457280);
@@ -47,7 +47,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
         this.context = context;
         Properties prop = createConsumerConfig();
         this.consumer = new KafkaConsumer<>(prop);
-        this.consumer.subscribe(Arrays.asList(AddressConfig.KAFKA_TOPIC));
+        this.consumer.subscribe(Arrays.asList(SubscriberConfig.KAFKA_TOPIC));
     }
 
     @Override
src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java (new file, 81 lines)
@@ -0,0 +1,81 @@
+package cn.ac.iie.topology;
+
+import cn.ac.iie.bolt.SubscriberIdBolt;
+import cn.ac.iie.common.SubscriberConfig;
+import cn.ac.iie.spout.CustomizedKafkaSpout;
+import org.apache.storm.Config;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author qidaijie
+ */
+public class LogSubscriberTopology {
+    private static final Logger logger = LoggerFactory.getLogger(LogSubscriberTopology.class);
+    private final String topologyName;
+    private final Config topologyConfig;
+    private TopologyBuilder builder;
+
+    public LogSubscriberTopology() {
+        this(LogSubscriberTopology.class.getSimpleName());
+    }
+
+    public LogSubscriberTopology(String topologyName) {
+        this.topologyName = topologyName;
+        topologyConfig = createTopologConfig();
+    }
+
+    private Config createTopologConfig() {
+        Config conf = new Config();
+        conf.setDebug(false);
+        conf.setMessageTimeoutSecs(120);
+        conf.setTopologyWorkerMaxHeapSize(500);
+        conf.setMaxSpoutPending(SubscriberConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING);
+        if (SubscriberConfig.TOPOLOGY_NUM_ACKS == 0) {
+            conf.setNumAckers(0);
+        }
+        return conf;
+    }
+
+    public void runLocally() throws InterruptedException {
+        topologyConfig.setMaxTaskParallelism(1);
+        StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
+    }
+
+    public void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+        topologyConfig.setNumWorkers(SubscriberConfig.TOPOLOGY_WORKERS);
+        // Setting this too high causes problems such as heartbeat-thread starvation and a sharp drop in throughput
+        topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
+        StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
+    }
+
+    private void buildTopology() {
+        builder = new TopologyBuilder();
+        builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), SubscriberConfig.SPOUT_PARALLELISM);
+        builder.setBolt("SubscriberIdBolt", new SubscriberIdBolt(), SubscriberConfig.FORMAT_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout");
+    }
+
+    public static void main(String[] args) throws Exception {
+        LogSubscriberTopology csst = null;
+        boolean runLocally = true;
+        if (args.length >= 2 && "remote".equalsIgnoreCase(args[1])) {
+            runLocally = false;
+            csst = new LogSubscriberTopology(args[0]);
+        } else {
+            csst = new LogSubscriberTopology();
+        }
+        csst.buildTopology();
+
+        if (runLocally) {
+            logger.info("Running in local mode...");
+            csst.runLocally();
+        } else {
+            logger.info("Running in remote deployment mode...");
+            csst.runRemotely();
+        }
+    }
+}
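Usage note, grounded in main() above: the first program argument is the topology name and a second argument equal to "remote" selects cluster submission; otherwise the topology runs locally for 600 seconds. Assuming the shaded jar is named log-subscriber-hbase-0.0.1-SNAPSHOT.jar (the usual artifactId-version output of the shade plugin; the exact name is not stated in this commit), a cluster submission would look like:

    storm jar log-subscriber-hbase-0.0.1-SNAPSHOT.jar cn.ac.iie.topology.LogSubscriberTopology log-subscriber-hbase remote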
src/main/java/cn/ac/iie/topology/StormRunner.java
@@ -9,6 +9,9 @@ import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.InvalidTopologyException;
 import org.apache.storm.topology.TopologyBuilder;
 
+/**
+ * @author qidaijie
+ */
 public final class StormRunner{
     private static final int MILLS_IN_SEC = 1000;
 
src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java (new file, 62 lines)
@@ -0,0 +1,62 @@
+package cn.ac.iie.utils;
+
+import java.util.Properties;
+
+
+
+public final class SubscriberConfigurations {
+
+    private static Properties propCommon = new Properties();
+//    private static Properties propService = new Properties();
+
+
+    public static String getStringProperty(Integer type, String key) {
+        if (type == 0) {
+            return propCommon.getProperty(key);
+//        } else if (type == 1) {
+//            return propService.getProperty(key);
+        } else {
+            return null;
+        }
+    }
+
+
+    public static Integer getIntProperty(Integer type, String key) {
+        if (type == 0) {
+            return Integer.parseInt(propCommon.getProperty(key));
+//        } else if (type == 1) {
+//            return Integer.parseInt(propService.getProperty(key));
+        } else {
+            return null;
+        }
+    }
+
+    public static Long getLongProperty(Integer type, String key) {
+        if (type == 0) {
+            return Long.parseLong(propCommon.getProperty(key));
+//        } else if (type == 1) {
+//            return Long.parseLong(propService.getProperty(key));
+        } else {
+            return null;
+        }
+    }
+
+    public static Boolean getBooleanProperty(Integer type, String key) {
+        if (type == 0) {
+            return "true".equals(propCommon.getProperty(key).toLowerCase().trim());
+//        } else if (type == 1) {
+//            return propService.getProperty(key).toLowerCase().trim().equals("true");
+        } else {
+            return null;
+        }
+    }
+
+    static {
+        try {
+            propCommon.load(SubscriberConfigurations.class.getClassLoader().getResourceAsStream("subscriber-config.properties"));
+        } catch (Exception e) {
+            propCommon = null;
+            System.err.println("Failed to load the configuration file");
+        }
+    }
+}
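Usage note: the type argument selects the properties source; only 0 (the classpath file subscriber-config.properties loaded in the static block) is wired up, and any other value returns null. SubscriberConfig above reads all of its values this way, e.g. SubscriberConfigurations.getIntProperty(0, "topology.workers") for the worker count.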
src/test/java/cn/ac/iie/test/LoggerTest.java (new file, 21 lines)
@@ -0,0 +1,21 @@
+package cn.ac.iie.test;
+
+import cn.ac.iie.common.SubscriberConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LoggerTest {
+    private static final Logger logger = LoggerFactory.getLogger(LoggerTest.class);
+
+    public static void main(String[] args) {
+        try{
+            int a = 1;
+            throw new Exception();
+        }catch (Exception e){
+            int b = 20;
+            logger.error("id is {} aaaaaaaaaa {}.",SubscriberConfig.KAFKA_TOPIC,b,e);
+        }
+
+
+    }
+}
src/test/java/cn/ac/iie/test/SubcribeIdBolt.java
@@ -1,6 +1,6 @@
 package cn.ac.iie.test;
 
-import cn.ac.iie.common.AddressConfig;
+import cn.ac.iie.common.SubscriberConfig;
 import cn.ac.iie.utils.TupleUtils;
 import com.alibaba.fastjson.JSONObject;
 import com.zdjizhi.utils.StringUtil;
@@ -37,7 +37,7 @@ public class SubcribeIdBolt extends BaseBasicBolt {
         // HBase configuration
         Configuration configuration = HBaseConfiguration.create();
         // Set the ZooKeeper quorum
-        configuration.set("hbase.zookeeper.quorum", AddressConfig.HBASE_ZOOKEEPER_SERVERS);
+        configuration.set("hbase.zookeeper.quorum", SubscriberConfig.HBASE_ZOOKEEPER_SERVERS);
         try {
             connection = ConnectionFactory.createConnection(configuration);
         } catch (IOException e) {
@@ -64,7 +64,7 @@ public class SubcribeIdBolt extends BaseBasicBolt {
                     String ip = jsonObject.getString("framed_ip");
                     String account = jsonObject.getString("account");
                     dataValidation(ip, account, putList);
-                    if (putList.size() == AddressConfig.LIST_SIZE_MAX) {
+                    if (putList.size() == SubscriberConfig.LIST_SIZE_MAX) {
                         insertData(putList);
                     }
                 }
@@ -79,7 +79,7 @@ public class SubcribeIdBolt extends BaseBasicBolt {
     public Map<String, Object> getComponentConfiguration() {
         Map<String, Object> conf = new HashMap<String, Object>(16);
         conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
-                AddressConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+                SubscriberConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
         return conf;
     }
 
@@ -93,7 +93,7 @@ public class SubcribeIdBolt extends BaseBasicBolt {
      */
     private static void getAll() {
         try {
-            Table table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME));
+            Table table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME));
             Scan scan2 = new Scan();
             ResultScanner scanner = table.getScanner(scan2);
             for (Result result : scanner) {
@@ -114,7 +114,7 @@ public class SubcribeIdBolt extends BaseBasicBolt {
     private static void insertData(List<Put> putList) {
         Table table = null;
         try {
-            table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME));
+            table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME));
             table.put(putList);
             logger.error("Rows written to HBase: " + putList.size());
             putList.clear();
@@ -141,7 +141,7 @@ public class SubcribeIdBolt extends BaseBasicBolt {
     private static void dataValidation(String ip, String account, List<Put> putList) {
         if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) {
//            String s = ip.split("\\.")[0];
-//            if (!AddressConfig.CHECK_IP_SCOPE.contains(s)) {
+//            if (!SubscriberConfig.CHECK_IP_SCOPE.contains(s)) {
             if (subIdMap.containsKey(ip)) {
                 if (!subIdMap.get(ip).equals(account)) {
                     Put put = new Put(ip.getBytes());
src/test/java/cn/ac/iie/test/SubcribeIdBoltone.java
@@ -1,6 +1,6 @@
 package cn.ac.iie.test;
 
-import cn.ac.iie.common.AddressConfig;
+import cn.ac.iie.common.SubscriberConfig;
 import com.alibaba.fastjson.JSONObject;
 import com.zdjizhi.utils.StringUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -35,7 +35,7 @@ public class SubcribeIdBoltone extends BaseBasicBolt {
         // HBase configuration
         Configuration configuration = HBaseConfiguration.create();
         // Set the ZooKeeper quorum
-        configuration.set("hbase.zookeeper.quorum", AddressConfig.HBASE_ZOOKEEPER_SERVERS);
+        configuration.set("hbase.zookeeper.quorum", SubscriberConfig.HBASE_ZOOKEEPER_SERVERS);
         try {
             connection = ConnectionFactory.createConnection(configuration);
         } catch (IOException e) {
@@ -76,7 +76,7 @@ public class SubcribeIdBoltone extends BaseBasicBolt {
      */
     private static void getAll() {
         try {
-            Table table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME));
+            Table table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME));
             Scan scan2 = new Scan();
             ResultScanner scanner = table.getScanner(scan2);
             for (Result result : scanner) {
@@ -101,7 +101,7 @@ public class SubcribeIdBoltone extends BaseBasicBolt {
     private static void dataValidation(String ip, String account, BasicOutputCollector collector) {
         if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) {
//            String s = ip.split("\\.")[0];
-//            if (!AddressConfig.CHECK_IP_SCOPE.contains(s)) {
+//            if (!SubscriberConfig.CHECK_IP_SCOPE.contains(s)) {
             if (subIdMap.containsKey(ip)) {
                 if (!subIdMap.get(ip).equals(account)) {
                     subIdMap.put(ip, account);
src/test/java/cn/ac/iie/test/subTest.java
@@ -1,6 +1,6 @@
 package cn.ac.iie.test;
 
-import cn.ac.iie.common.AddressConfig;
+import cn.ac.iie.common.SubscriberConfig;
 import com.zdjizhi.utils.StringUtil;
 
 public class subTest {
@@ -24,7 +24,7 @@ public class subTest {
     private static boolean dataValidation(String ip, String account) {
         if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) {
             String s = ip.split("\\.")[0];
-            return !AddressConfig.CHECK_IP_SCOPE.contains(s);
+            return !SubscriberConfig.CHECK_IP_SCOPE.contains(s);
         }
         return false;
     }