From 3ad9d0cbd4d3605ba0cff67930c31eb9f5317e49 Mon Sep 17 00:00:00 2001 From: qidaijie Date: Fri, 12 Jun 2020 19:48:41 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=20=E7=94=A8=E6=88=B7?= =?UTF-8?q?=E5=90=8D=E5=86=99=E5=85=A5Hbase=E7=A8=8B=E5=BA=8F=E5=92=8C?= =?UTF-8?q?=E4=B8=8A=E4=B8=8B=E7=BA=BF=E6=97=A5=E5=BF=97=E7=A8=8B=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- FlumeRadiusOnOffInterceptor/pom.xml | 145 ++++++++++ .../properties/service_flow_config.properties | 16 ++ .../flume/interceptor/FlumeOnOffApp.java | 145 ++++++++++ .../flume/interceptor/bean/Knowledge.java | 61 ++++ .../flume/interceptor/common/OnOffConfig.java | 50 ++++ .../common/OnOffConfigurations.java | 55 ++++ FlumeSubscriberInterceptor/pom.xml | 163 +++++++++++ .../properties/service_flow_config.properties | 16 ++ .../flume/interceptor/FlumeSubscriberApp.java | 261 ++++++++++++++++++ .../interceptor/common/SubscriberConfig.java | 34 +++ .../common/SubscriberConfigurations.java | 55 ++++ pom.xml | 2 + 12 files changed, 1003 insertions(+) create mode 100644 FlumeRadiusOnOffInterceptor/pom.xml create mode 100644 FlumeRadiusOnOffInterceptor/properties/service_flow_config.properties create mode 100644 FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java create mode 100644 FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java create mode 100644 FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java create mode 100644 FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java create mode 100644 FlumeSubscriberInterceptor/pom.xml create mode 100644 FlumeSubscriberInterceptor/properties/service_flow_config.properties create mode 100644 FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java create mode 100644 FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfig.java create mode 100644 FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfigurations.java diff --git a/FlumeRadiusOnOffInterceptor/pom.xml b/FlumeRadiusOnOffInterceptor/pom.xml new file mode 100644 index 0000000..4517ae3 --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/pom.xml @@ -0,0 +1,145 @@ + + + + dynamic_complement + com.zdjizhi + 1.0 + + 4.0.0 + + FlumeRadiusOnOffInterceptor + + + + nexus + Team Nexus Repository + http://192.168.40.125:8099/content/groups/public + + + + + UTF-8 + 1.9.0 + 2.2.1 + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.4.1 + + true + + + + package + + shade + + + + + + com.zdjizhi.flume.interceptor.FlumeOnOffApp + + + META-INF/spring.handlers + + + META-INF/spring.schemas + + + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.2.1 + + + + exec + + + + + java + true + false + compile + com.zdjizhi.flume.interceptor.FlumeOnOffApp + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.3.2 + + 1.8 + 1.8 + + + + + + properties + + **/*.properties + **/*.xml + + false + + + + + + + org.apache.flume + flume-ng-core + ${flume.version} + provided + + + + + com.alibaba + fastjson + 1.2.47 + + + + cglib + cglib-nodep + 3.2.4 + + + + com.zdjizhi + galaxy + 1.0.2 + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + + \ No newline at end of file diff --git a/FlumeRadiusOnOffInterceptor/properties/service_flow_config.properties b/FlumeRadiusOnOffInterceptor/properties/service_flow_config.properties new file mode 100644 index 0000000..bf2e470 --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/properties/service_flow_config.properties @@ -0,0 +1,16 @@ +#kafka broker下的topic名称 +#kafka.topic=SESSION-TEST-LOG + +#数据中心(UID) +#data.center.id.num=15 + +#zookeeper.servers=192.168.40.207:2181 + +#用于过滤对准用户名 +#check.ip.scope=10,100,192 + +#hbase-zookeeper地址 +hbase.zookeeper.servers=192.168.40.224:2181 + +#hbase表名 +hbase.table.name=subscriber_info \ No newline at end of file diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java new file mode 100644 index 0000000..3eeafc7 --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java @@ -0,0 +1,145 @@ +package com.zdjizhi.flume.interceptor; + +import com.alibaba.fastjson.JSONObject; +import com.google.common.base.Preconditions; +import com.zdjizhi.flume.interceptor.bean.Knowledge; +import com.zdjizhi.flume.interceptor.common.OnOffConfig; +import com.zdjizhi.utils.StringUtil; +import org.apache.commons.lang.StringUtils; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.interceptor.Interceptor; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.*; + + +/** + * @author qidaijie + */ +public class FlumeOnOffApp implements Interceptor { + private static Logger logger = Logger.getLogger(FlumeOnOffApp.class); + + @Override + public void initialize() { + + } + + @Override + public Event intercept(Event event) { + String message = null; + try { + message = new String(event.getBody(), "utf-8"); + } catch (UnsupportedEncodingException e) { + message = new String(event.getBody()); + } + try { + message = parsingMessage(message); + if (StringUtils.isNotBlank(message)) { + event.setBody(message.getBytes()); + return event; + } else { + return null; + } + } catch (Exception e) { + logger.error("FlumeOnOffApp intercept(Event event) method is error===>{" + e + "}<==="); + e.printStackTrace(); + } + return null; + } + + @Override + public List intercept(List list) { + List resultList = new ArrayList(); + for (Event event : list) { + Event r = intercept(event); + if (r != null) { + resultList.add(r); + } + } + return resultList; + } + + @Override + public void close() { + logger.warn("FlumeOnOffApp is closed."); + } + + /** + * 解析日志,并补全 + * 补domain,补subscriber_id + * + * @param message Security原始日志 + * @return 补全后的日志 + *

+ */ + private String parsingMessage(String message) { + if (StringUtil.isNotBlank(message)) { + JSONObject jsonObject = JSONObject.parseObject(message); + //数据需包含 radius_packet_type and radius_acct_status_type 字段 + if (jsonObject.containsKey(OnOffConfig.RADIUS_PACKET_TYPE) && jsonObject.containsKey(OnOffConfig.RADIUS_ACCT_STATUS_TYPE)) { + int packetType = jsonObject.getInteger(OnOffConfig.RADIUS_PACKET_TYPE); + int statusType = jsonObject.getInteger(OnOffConfig.RADIUS_ACCT_STATUS_TYPE); + //条件radius_packet_type = 4 and radius_acct_status_type = 1 or 2 +// boolean existed = OnOffConfig.ACCOUNTING_REQUEST == packetType && (OnOffConfig.START_BILLING == statusType || OnOffConfig.STOP_BILLING == statusType); + if (OnOffConfig.ACCOUNTING_REQUEST == packetType && (OnOffConfig.START_BILLING == statusType || OnOffConfig.STOP_BILLING == statusType)) { + + Knowledge knowledge = new Knowledge(); + knowledge.setFramed_ip(jsonObject.getString("radius_framed_ip")); + knowledge.setAccount(jsonObject.getString("radius_account")); + knowledge.setAcct_status_type(statusType); + + /* + *如果存在时间戳则选择此时间戳没有获取当前时间 + */ + if (jsonObject.containsKey(OnOffConfig.RADIUS_EVENT_TIMESTAMP)) { + knowledge.setEvent_timestamp(jsonObject.getInteger("radius_event_timestamp")); + } else { + knowledge.setEvent_timestamp((System.currentTimeMillis() / 1000)); + } + + /* + * 标识同一个连接: + * 1.数据若存在acct_multi_session_id属性,取该属性 + * 2. 不存在取 acct_session_id + */ + if (jsonObject.containsKey(OnOffConfig.RADIUS_MULTI_SESSION_ID)) { + knowledge.setAcct_session_id(jsonObject.getString("radius_acct_multi_session_id")); + } else { + knowledge.setAcct_session_id(jsonObject.getString("radius_acct_session_id")); + } + + /* + *用户的在线时长,以秒为单位,下线用户无此属性,默认为0 + */ + if (jsonObject.containsKey(OnOffConfig.RADIUS_SESSION_TIME)) { + knowledge.setAcct_session_time(jsonObject.getInteger("radius_acct_session_time")); + } else { + knowledge.setAcct_session_time(0); + } + + return JSONObject.toJSONString(knowledge); + } + } + } + return null; + } + + public static class FlumeDynamicAppBuilder implements Interceptor.Builder { + + @Override + public Interceptor build() { + return new FlumeOnOffApp(); + } + + @Override + public void configure(Context context) { + + } + + } + +} + diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java new file mode 100644 index 0000000..9d054ed --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java @@ -0,0 +1,61 @@ +package com.zdjizhi.flume.interceptor.bean; + +/** + * @author qidaijie + */ +public class Knowledge { + private String framed_ip; + private String account; + private String acct_session_id; + private int acct_status_type; + private int acct_session_time; + private long event_timestamp; + + public String getFramed_ip() { + return framed_ip; + } + + public void setFramed_ip(String framed_ip) { + this.framed_ip = framed_ip; + } + + public String getAccount() { + return account; + } + + public void setAccount(String account) { + this.account = account; + } + + public int getAcct_status_type() { + return acct_status_type; + } + + public void setAcct_status_type(int acct_status_type) { + this.acct_status_type = acct_status_type; + } + + public long getEvent_timestamp() { + return event_timestamp; + } + + public void setEvent_timestamp(long event_timestamp) { + this.event_timestamp = event_timestamp; + } + + public String getAcct_session_id() { + return acct_session_id; + } + + public void setAcct_session_id(String acct_session_id) { + this.acct_session_id = acct_session_id; + } + + public int getAcct_session_time() { + return acct_session_time; + } + + public void setAcct_session_time(int acct_session_time) { + this.acct_session_time = acct_session_time; + } +} diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java new file mode 100644 index 0000000..7407290 --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java @@ -0,0 +1,50 @@ +package com.zdjizhi.flume.interceptor.common; + + +/** + * @author Administrator + */ +public class OnOffConfig { + /** + * 4- Accounting-Request(账户授权) + */ + public static final int ACCOUNTING_REQUEST = 4; + /** + * 1、开始计费 + */ + public static final int START_BILLING = 1; + /** + * 2、停止计费 + */ + public static final int STOP_BILLING = 2; + + /** + * 计费请求报文类型 + */ + public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type"; + /** + * 报文类型 + */ + public static final String RADIUS_PACKET_TYPE = "radius_packet_type"; + + /** + * 发送计费请求报文时间戳 + */ + public static final String RADIUS_EVENT_TIMESTAMP = "radius_event_timestamp"; + + /** + * 一个用户多个计费ID关联属性 + */ + public static final String RADIUS_MULTI_SESSION_ID = "radius_acct_multi_session_id"; + + /** + * 用户的在线时长,以秒为单位 + */ + public static final String RADIUS_SESSION_TIME = "radius_acct_session_time"; + + /** + * flume使用配置 + */ + public static final String HBASE_ZOOKEEPER_SERVERS = OnOffConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); + public static final String HBASE_TABLE_NAME = OnOffConfigurations.getStringProperty(0, "hbase.table.name"); +} \ No newline at end of file diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java new file mode 100644 index 0000000..caf8572 --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java @@ -0,0 +1,55 @@ +package com.zdjizhi.flume.interceptor.common; + +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class OnOffConfigurations { + + private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key); + } else { + return null; + } + + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return "true".equals(propService.getProperty(key).toLowerCase().trim()); + } else { + return null; + } + } + + static { + try { + propService.load(OnOffConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + } catch (Exception e) { + propService = null; + } + } +} diff --git a/FlumeSubscriberInterceptor/pom.xml b/FlumeSubscriberInterceptor/pom.xml new file mode 100644 index 0000000..a359d60 --- /dev/null +++ b/FlumeSubscriberInterceptor/pom.xml @@ -0,0 +1,163 @@ + + + + dynamic_complement + com.zdjizhi + 1.0 + + 4.0.0 + + FlumeSubscriberInterceptor + + + + + ebi + www.ebi.ac.uk + http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/ + + + + + UTF-8 + 1.9.0 + 2.2.1 + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.4.1 + + true + + + + package + + shade + + + + + + com.zdjizhi.flume.interceptor.FlumeSubscriberInterceptor + + + META-INF/spring.handlers + + + META-INF/spring.schemas + + + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.2.1 + + + + exec + + + + + java + true + false + compile + com.zdjizhi.flume.interceptor.FlumeSubscriberApp + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.3.2 + + 1.8 + 1.8 + + + + + + properties + + **/*.properties + **/*.xml + + false + + + + + + + org.apache.flume + flume-ng-core + ${flume.version} + provided + + + + + com.alibaba + fastjson + 1.2.47 + + + + cglib + cglib-nodep + 3.2.4 + + + + + org.apache.hbase + hbase-client + ${hbase.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + org.apache.hbase + hbase-server + ${hbase.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + + \ No newline at end of file diff --git a/FlumeSubscriberInterceptor/properties/service_flow_config.properties b/FlumeSubscriberInterceptor/properties/service_flow_config.properties new file mode 100644 index 0000000..bf2e470 --- /dev/null +++ b/FlumeSubscriberInterceptor/properties/service_flow_config.properties @@ -0,0 +1,16 @@ +#kafka broker下的topic名称 +#kafka.topic=SESSION-TEST-LOG + +#数据中心(UID) +#data.center.id.num=15 + +#zookeeper.servers=192.168.40.207:2181 + +#用于过滤对准用户名 +#check.ip.scope=10,100,192 + +#hbase-zookeeper地址 +hbase.zookeeper.servers=192.168.40.224:2181 + +#hbase表名 +hbase.table.name=subscriber_info \ No newline at end of file diff --git a/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java new file mode 100644 index 0000000..9800a86 --- /dev/null +++ b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java @@ -0,0 +1,261 @@ +package com.zdjizhi.flume.interceptor; + +import com.alibaba.fastjson.JSONObject; +import com.google.common.base.Preconditions; +import com.zdjizhi.flume.interceptor.common.SubscriberConfig; +import org.apache.commons.lang.StringUtils; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.interceptor.Interceptor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.*; + + +/** + * @author qidaijie + */ +public class FlumeSubscriberApp implements Interceptor { + private static Logger logger = Logger.getLogger(FlumeSubscriberApp.class); + + private static Map subIdMap; + private List putList; + private static Connection connection; + private String hbaseZookeeperIp; + private String hbaseTableName; + private int updateHBaseTime; + + public FlumeSubscriberApp(String hbaseZookeeperIp, String hbaseTableName, int updateHBaseTime) { + this.hbaseZookeeperIp = hbaseZookeeperIp; + this.hbaseTableName = hbaseTableName; + this.updateHBaseTime = updateHBaseTime; + } + + + @Override + public void initialize() { + subIdMap = new HashMap<>(256); + putList = new ArrayList<>(); + + // 管理HBase的配置信息 + Configuration configuration = HBaseConfiguration.create(); + // 设置zookeeper节点 + configuration.set("hbase.zookeeper.quorum", hbaseZookeeperIp); + configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); + configuration.set("hbase.client.retries.number", "3"); + configuration.set("hbase.bulkload.retries.number", "3"); + configuration.set("zookeeper.recovery.retry", "3"); + try { + connection = ConnectionFactory.createConnection(configuration); + } catch (IOException e) { + logger.error("用户名信息写入HBase程序,连接HBase异常"); + e.printStackTrace(); + } + + getAll(hbaseTableName); + + new Timer().schedule(new TimerTask() { + @Override + public void run() { + try { + insertData(putList, hbaseTableName); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, 0, updateHBaseTime * 1000); + } + + + @Override + public Event intercept(Event event) { + String message = null; + try { + message = new String(event.getBody(), "utf-8"); + } catch (UnsupportedEncodingException e) { + message = new String(event.getBody()); + } + try { + if (StringUtils.isNotBlank(message)) { + message = dealCommonMessage(message); + event.setBody(message.getBytes()); + return event; + } + } catch (Exception e) { + logger.error("FlumeSubscriberApp intercept(Event event) method is error===>{" + e + "}<==="); + e.printStackTrace(); + } + return event; + } + + @Override + public List intercept(List list) { + List resultList = new ArrayList(); + for (Event event : list) { + Event r = intercept(event); + if (r != null) { + resultList.add(r); + } + } + return resultList; + } + + @Override + public void close() { + logger.warn("FlumeSubscriberApp is closed."); + } + + /** + * 解析日志,并补全 + * 补domain,补subscriber_id + * + * @param message Security原始日志 + * @return 补全后的日志 + *

+ */ + private String dealCommonMessage(String message) { + JSONObject jsonObject = JSONObject.parseObject(message); + if (jsonObject.containsKey(SubscriberConfig.PACKET_TYPE) && jsonObject.containsKey(SubscriberConfig.STATUS_TYPE)) { + if (SubscriberConfig.ACCOUNTING_REQUEST == jsonObject.getInteger(SubscriberConfig.PACKET_TYPE) + && SubscriberConfig.START_BILLING == jsonObject.getInteger(SubscriberConfig.STATUS_TYPE)) { + String framedIp = jsonObject.getString("radius_framed_ip"); + String account = jsonObject.getString("radius_account"); + dataValidation(framedIp, account, putList); + } + if (putList.size() == SubscriberConfig.LIST_SIZE_MAX) { + insertData(putList, hbaseTableName); + } + + } + return message; + } + + + /** + * 获取所有的 key value + */ + private static void getAll(String tableNmae) { + try { + Table table = connection.getTable(TableName.valueOf("sub:" + tableNmae)); + Scan scan = new Scan(); + ResultScanner scanner = table.getScanner(scan); + for (Result result : scanner) { + Cell[] cells = result.rawCells(); + for (Cell cell : cells) { + subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + } + } + scanner.close(); + } catch (IOException e) { + logger.error("获取HBase所有row key出现异常"); + e.printStackTrace(); + } + } + + /** + * 写入数据到HBase + * + * @param putList puts list + */ + private static void insertData(List putList, String tableName) { + Table table = null; + try { + table = connection.getTable(TableName.valueOf("sub:" + tableName)); + table.put(putList); + putList.clear(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + if (table != null) { + table.close(); + } + } catch (IOException e) { + logger.error("更新数据写入HBase失败"); + e.printStackTrace(); + } + } + + } + + /** + * 验证数据并与内存中的对比 + * + * @param ip framed_ip + * @param account account + */ + private static void dataValidation(String ip, String account, List putList) { + if (subIdMap.containsKey(ip)) { + if (!subIdMap.get(ip).equals(account)) { + Put put = new Put(ip.getBytes()); + put.addColumn("subscriber_id".getBytes(), "account".getBytes(), account.getBytes()); + putList.add(put); + subIdMap.put(ip, account); + } + } else { + Put put = new Put(ip.getBytes()); + put.addColumn("subscriber_id".getBytes(), "account".getBytes(), account.getBytes()); + putList.add(put); + subIdMap.put(ip, account); + } + } + + + public static class FlumeDynamicAppBuilder implements Interceptor.Builder { + private String hbaseZookeeperIp; + private String hbaseTableName; + private int updateHBaseTime; + + + @Override + public Interceptor build() { + return new FlumeSubscriberApp(hbaseZookeeperIp, hbaseTableName, updateHBaseTime); + } + + @Override + public void configure(Context context) { + try { + this.hbaseZookeeperIp = context.getString("hbaseZookeeperIp", ""); + Preconditions.checkNotNull("".equals(hbaseZookeeperIp), "hbaseZookeeperIp must be set!!"); + logger.info("FlumeSubscriberApp Read hbaseZookeeperIp from configuration : " + hbaseZookeeperIp); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("FlumeSubscriberApp hbaseZookeeperIp invalid", e); + } catch (Exception e) { + logger.error("FlumeSubscriberApp Get hbaseZookeeperIp is error : " + e); + } + + try { + this.hbaseTableName = context.getString("hbaseTableName", ""); + Preconditions.checkNotNull("".equals(hbaseTableName), "hbaseTableName must be set!!"); + logger.info("FlumeSubscriberApp Read hbaseTableName from configuration : " + hbaseTableName); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("FlumeSubscriberApp hbaseTableName invalid", e); + } catch (Exception e) { + logger.error("FlumeSubscriberApp Get hbaseTableName is error : " + e); + } + + try { + this.updateHBaseTime = context.getInteger("updateHBaseTime", 30); + Preconditions.checkNotNull("".equals(updateHBaseTime), "updateHBaseTime must be set!!"); + logger.info("FlumeSubscriberApp Read updateHBaseTime from configuration : " + updateHBaseTime); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("FlumeSubscriberApp updateHBaseTime invalid", e); + } catch (Exception e) { + logger.error("FlumeSubscriberApp Get updateHBaseTime is error : " + e); + } + + } + + } + +} + diff --git a/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfig.java b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfig.java new file mode 100644 index 0000000..7bd14d4 --- /dev/null +++ b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfig.java @@ -0,0 +1,34 @@ +package com.zdjizhi.flume.interceptor.common; + + +/** + * @author Administrator + */ +public class SubscriberConfig { + /** + * 最多存在多少数据即写入hbase + */ + public static final int LIST_SIZE_MAX = 5000; + /** + * 4- Accounting-Request(账户授权) + */ + public static final int ACCOUNTING_REQUEST = 4; + /** + * radius_packet_type + */ + public static final String PACKET_TYPE = "radius_packet_type"; + /** + * 1、开始计费 + */ + public static final int START_BILLING = 1; + /** + * radius_acct_status_type + */ + public static final String STATUS_TYPE = "radius_acct_status_type"; + + /** + * flume使用配置 + */ + public static final String HBASE_ZOOKEEPER_SERVERS = SubscriberConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); + public static final String HBASE_TABLE_NAME = SubscriberConfigurations.getStringProperty(0, "hbase.table.name"); +} \ No newline at end of file diff --git a/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfigurations.java b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfigurations.java new file mode 100644 index 0000000..cbae69d --- /dev/null +++ b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfigurations.java @@ -0,0 +1,55 @@ +package com.zdjizhi.flume.interceptor.common; + +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class SubscriberConfigurations { + + private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key); + } else { + return null; + } + + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return "true".equals(propService.getProperty(key).toLowerCase().trim()); + } else { + return null; + } + } + + static { + try { + propService.load(SubscriberConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + } catch (Exception e) { + propService = null; + } + } +} diff --git a/pom.xml b/pom.xml index 1224ebf..023c16b 100644 --- a/pom.xml +++ b/pom.xml @@ -10,6 +10,8 @@ 1.0 FlumeDynamicInterceptor + FlumeSubscriberInterceptor + FlumeRadiusOnOffInterceptor