+ */
+ private String parsingMessage(String message) {
+ if (StringUtil.isNotBlank(message)) {
+ JSONObject jsonObject = JSONObject.parseObject(message);
+ //数据需包含 radius_packet_type and radius_acct_status_type 字段
+ if (jsonObject.containsKey(OnOffConfig.RADIUS_PACKET_TYPE) && jsonObject.containsKey(OnOffConfig.RADIUS_ACCT_STATUS_TYPE)) {
+ int packetType = jsonObject.getInteger(OnOffConfig.RADIUS_PACKET_TYPE);
+ int statusType = jsonObject.getInteger(OnOffConfig.RADIUS_ACCT_STATUS_TYPE);
+ //条件radius_packet_type = 4 and radius_acct_status_type = 1 or 2
+// boolean existed = OnOffConfig.ACCOUNTING_REQUEST == packetType && (OnOffConfig.START_BILLING == statusType || OnOffConfig.STOP_BILLING == statusType);
+ if (OnOffConfig.ACCOUNTING_REQUEST == packetType && (OnOffConfig.START_BILLING == statusType || OnOffConfig.STOP_BILLING == statusType)) {
+
+ Knowledge knowledge = new Knowledge();
+ knowledge.setFramed_ip(jsonObject.getString("radius_framed_ip"));
+ knowledge.setAccount(jsonObject.getString("radius_account"));
+ knowledge.setAcct_status_type(statusType);
+
+ /*
+ *如果存在时间戳则选择此时间戳没有获取当前时间
+ */
+ if (jsonObject.containsKey(OnOffConfig.RADIUS_EVENT_TIMESTAMP)) {
+ knowledge.setEvent_timestamp(jsonObject.getInteger("radius_event_timestamp"));
+ } else {
+ knowledge.setEvent_timestamp((System.currentTimeMillis() / 1000));
+ }
+
+ /*
+ * 标识同一个连接:
+ * 1.数据若存在acct_multi_session_id属性,取该属性
+ * 2. 不存在取 acct_session_id
+ */
+ if (jsonObject.containsKey(OnOffConfig.RADIUS_MULTI_SESSION_ID)) {
+ knowledge.setAcct_session_id(jsonObject.getString("radius_acct_multi_session_id"));
+ } else {
+ knowledge.setAcct_session_id(jsonObject.getString("radius_acct_session_id"));
+ }
+
+ /*
+ *用户的在线时长,以秒为单位,下线用户无此属性,默认为0
+ */
+ if (jsonObject.containsKey(OnOffConfig.RADIUS_SESSION_TIME)) {
+ knowledge.setAcct_session_time(jsonObject.getInteger("radius_acct_session_time"));
+ } else {
+ knowledge.setAcct_session_time(0);
+ }
+
+ return JSONObject.toJSONString(knowledge);
+ }
+ }
+ }
+ return null;
+ }
+
+ public static class FlumeDynamicAppBuilder implements Interceptor.Builder {
+
+ @Override
+ public Interceptor build() {
+ return new FlumeOnOffApp();
+ }
+
+ @Override
+ public void configure(Context context) {
+
+ }
+
+ }
+
+}
+
diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java
new file mode 100644
index 0000000..9d054ed
--- /dev/null
+++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java
@@ -0,0 +1,61 @@
+package com.zdjizhi.flume.interceptor.bean;
+
+/**
+ * @author qidaijie
+ */
+public class Knowledge {
+ private String framed_ip;
+ private String account;
+ private String acct_session_id;
+ private int acct_status_type;
+ private int acct_session_time;
+ private long event_timestamp;
+
+ public String getFramed_ip() {
+ return framed_ip;
+ }
+
+ public void setFramed_ip(String framed_ip) {
+ this.framed_ip = framed_ip;
+ }
+
+ public String getAccount() {
+ return account;
+ }
+
+ public void setAccount(String account) {
+ this.account = account;
+ }
+
+ public int getAcct_status_type() {
+ return acct_status_type;
+ }
+
+ public void setAcct_status_type(int acct_status_type) {
+ this.acct_status_type = acct_status_type;
+ }
+
+ public long getEvent_timestamp() {
+ return event_timestamp;
+ }
+
+ public void setEvent_timestamp(long event_timestamp) {
+ this.event_timestamp = event_timestamp;
+ }
+
+ public String getAcct_session_id() {
+ return acct_session_id;
+ }
+
+ public void setAcct_session_id(String acct_session_id) {
+ this.acct_session_id = acct_session_id;
+ }
+
+ public int getAcct_session_time() {
+ return acct_session_time;
+ }
+
+ public void setAcct_session_time(int acct_session_time) {
+ this.acct_session_time = acct_session_time;
+ }
+}
diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java
new file mode 100644
index 0000000..7407290
--- /dev/null
+++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java
@@ -0,0 +1,50 @@
+package com.zdjizhi.flume.interceptor.common;
+
+
+/**
+ * @author Administrator
+ */
+public class OnOffConfig {
+ /**
+ * 4- Accounting-Request(账户授权)
+ */
+ public static final int ACCOUNTING_REQUEST = 4;
+ /**
+ * 1、开始计费
+ */
+ public static final int START_BILLING = 1;
+ /**
+ * 2、停止计费
+ */
+ public static final int STOP_BILLING = 2;
+
+ /**
+ * 计费请求报文类型
+ */
+ public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type";
+ /**
+ * 报文类型
+ */
+ public static final String RADIUS_PACKET_TYPE = "radius_packet_type";
+
+ /**
+ * 发送计费请求报文时间戳
+ */
+ public static final String RADIUS_EVENT_TIMESTAMP = "radius_event_timestamp";
+
+ /**
+ * 一个用户多个计费ID关联属性
+ */
+ public static final String RADIUS_MULTI_SESSION_ID = "radius_acct_multi_session_id";
+
+ /**
+ * 用户的在线时长,以秒为单位
+ */
+ public static final String RADIUS_SESSION_TIME = "radius_acct_session_time";
+
+ /**
+ * flume使用配置
+ */
+ public static final String HBASE_ZOOKEEPER_SERVERS = OnOffConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
+ public static final String HBASE_TABLE_NAME = OnOffConfigurations.getStringProperty(0, "hbase.table.name");
+}
\ No newline at end of file
diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java
new file mode 100644
index 0000000..caf8572
--- /dev/null
+++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java
@@ -0,0 +1,55 @@
+package com.zdjizhi.flume.interceptor.common;
+
+import java.util.Properties;
+
+
+/**
+ * @author Administrator
+ */
+
+public final class OnOffConfigurations {
+
+ private static Properties propService = new Properties();
+
+
+ public static String getStringProperty(Integer type, String key) {
+ if (type == 0) {
+ return propService.getProperty(key);
+ } else {
+ return null;
+ }
+
+ }
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if (type == 0) {
+ return Integer.parseInt(propService.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if (type == 0) {
+ return Long.parseLong(propService.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if (type == 0) {
+ return "true".equals(propService.getProperty(key).toLowerCase().trim());
+ } else {
+ return null;
+ }
+ }
+
+ static {
+ try {
+ propService.load(OnOffConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
+ } catch (Exception e) {
+ propService = null;
+ }
+ }
+}
diff --git a/FlumeSubscriberInterceptor/pom.xml b/FlumeSubscriberInterceptor/pom.xml
new file mode 100644
index 0000000..a359d60
--- /dev/null
+++ b/FlumeSubscriberInterceptor/pom.xml
@@ -0,0 +1,163 @@
+
+
+ */
+ private String dealCommonMessage(String message) {
+ JSONObject jsonObject = JSONObject.parseObject(message);
+ if (jsonObject.containsKey(SubscriberConfig.PACKET_TYPE) && jsonObject.containsKey(SubscriberConfig.STATUS_TYPE)) {
+ if (SubscriberConfig.ACCOUNTING_REQUEST == jsonObject.getInteger(SubscriberConfig.PACKET_TYPE)
+ && SubscriberConfig.START_BILLING == jsonObject.getInteger(SubscriberConfig.STATUS_TYPE)) {
+ String framedIp = jsonObject.getString("radius_framed_ip");
+ String account = jsonObject.getString("radius_account");
+ dataValidation(framedIp, account, putList);
+ }
+ if (putList.size() == SubscriberConfig.LIST_SIZE_MAX) {
+ insertData(putList, hbaseTableName);
+ }
+
+ }
+ return message;
+ }
+
+
+ /**
+ * 获取所有的 key value
+ */
+ private static void getAll(String tableNmae) {
+ try {
+ Table table = connection.getTable(TableName.valueOf("sub:" + tableNmae));
+ Scan scan = new Scan();
+ ResultScanner scanner = table.getScanner(scan);
+ for (Result result : scanner) {
+ Cell[] cells = result.rawCells();
+ for (Cell cell : cells) {
+ subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
+ }
+ }
+ scanner.close();
+ } catch (IOException e) {
+ logger.error("获取HBase所有row key出现异常");
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 写入数据到HBase
+ *
+ * @param putList puts list
+ */
+ private static void insertData(List