增加radius写入hbase 和 radius上线下功能代码
This commit is contained in:
145
dynamic_complement/FlumeRadiusOnOffInterceptor/pom.xml
Normal file
145
dynamic_complement/FlumeRadiusOnOffInterceptor/pom.xml
Normal file
@@ -0,0 +1,145 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>dynamic_complement</artifactId>
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<version>1.0</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>FlumeRadiusOnOffInterceptor</artifactId>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>nexus</id>
|
||||
<name>Team Nexus Repository</name>
|
||||
<url>http://192.168.40.125:8099/content/groups/public</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<flume.version>1.9.0</flume.version>
|
||||
<hbase.version>2.2.1</hbase.version>
|
||||
</properties>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>2.4.1</version>
|
||||
<configuration>
|
||||
<createDependencyReducedPom>true</createDependencyReducedPom>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<transformers>
|
||||
<transformer
|
||||
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
|
||||
<transformer
|
||||
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
|
||||
<mainClass>com.zdjizhi.flume.interceptor.FlumeOnOffApp</mainClass>
|
||||
</transformer>
|
||||
<transformer
|
||||
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
|
||||
<resource>META-INF/spring.handlers</resource>
|
||||
</transformer>
|
||||
<transformer
|
||||
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
|
||||
<resource>META-INF/spring.schemas</resource>
|
||||
</transformer>
|
||||
</transformers>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.codehaus.mojo</groupId>
|
||||
<artifactId>exec-maven-plugin</artifactId>
|
||||
<version>1.2.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>exec</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<executable>java</executable>
|
||||
<includeProjectDependencies>true</includeProjectDependencies>
|
||||
<includePluginDependencies>false</includePluginDependencies>
|
||||
<classpathScope>compile</classpathScope>
|
||||
<mainClass>com.zdjizhi.flume.interceptor.FlumeOnOffApp</mainClass>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>2.3.2</version>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<resources>
|
||||
<resource>
|
||||
<directory>properties</directory>
|
||||
<includes>
|
||||
<include>**/*.properties</include>
|
||||
<include>**/*.xml</include>
|
||||
</includes>
|
||||
<filtering>false</filtering>
|
||||
</resource>
|
||||
</resources>
|
||||
</build>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.flume</groupId>
|
||||
<artifactId>flume-ng-core</artifactId>
|
||||
<version>${flume.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>com.alibaba</groupId>
|
||||
<artifactId>fastjson</artifactId>
|
||||
<version>1.2.47</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cglib</groupId>
|
||||
<artifactId>cglib-nodep</artifactId>
|
||||
<version>3.2.4</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>galaxy</artifactId>
|
||||
<version>1.0.2</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<groupId>org.slf4j</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>log4j-over-slf4j</artifactId>
|
||||
<groupId>org.slf4j</groupId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,16 @@
|
||||
#kafka broker下的topic名称
|
||||
#kafka.topic=SESSION-TEST-LOG
|
||||
|
||||
#数据中心(UID)
|
||||
#data.center.id.num=15
|
||||
|
||||
#zookeeper.servers=192.168.40.207:2181
|
||||
|
||||
#用于过滤对准用户名
|
||||
#check.ip.scope=10,100,192
|
||||
|
||||
#hbase-zookeeper地址
|
||||
hbase.zookeeper.servers=192.168.40.224:2181
|
||||
|
||||
#hbase表名
|
||||
hbase.table.name=subscriber_info
|
||||
@@ -0,0 +1,145 @@
|
||||
package com.zdjizhi.flume.interceptor;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.zdjizhi.flume.interceptor.bean.Knowledge;
|
||||
import com.zdjizhi.flume.interceptor.common.OnOffConfig;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.flume.Context;
|
||||
import org.apache.flume.Event;
|
||||
import org.apache.flume.interceptor.Interceptor;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.*;
|
||||
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
*/
|
||||
public class FlumeOnOffApp implements Interceptor {
|
||||
private static Logger logger = Logger.getLogger(FlumeOnOffApp.class);
|
||||
|
||||
@Override
|
||||
public void initialize() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Event intercept(Event event) {
|
||||
String message = null;
|
||||
try {
|
||||
message = new String(event.getBody(), "utf-8");
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
message = new String(event.getBody());
|
||||
}
|
||||
try {
|
||||
message = parsingMessage(message);
|
||||
if (StringUtils.isNotBlank(message)) {
|
||||
event.setBody(message.getBytes());
|
||||
return event;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("FlumeOnOffApp intercept(Event event) method is error===>{" + e + "}<===");
|
||||
e.printStackTrace();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Event> intercept(List<Event> list) {
|
||||
List resultList = new ArrayList();
|
||||
for (Event event : list) {
|
||||
Event r = intercept(event);
|
||||
if (r != null) {
|
||||
resultList.add(r);
|
||||
}
|
||||
}
|
||||
return resultList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
logger.warn("FlumeOnOffApp is closed.");
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析日志,并补全
|
||||
* 补domain,补subscriber_id
|
||||
*
|
||||
* @param message Security原始日志
|
||||
* @return 补全后的日志
|
||||
* <p>
|
||||
*/
|
||||
private String parsingMessage(String message) {
|
||||
if (StringUtil.isNotBlank(message)) {
|
||||
JSONObject jsonObject = JSONObject.parseObject(message);
|
||||
//数据需包含 radius_packet_type and radius_acct_status_type 字段
|
||||
if (jsonObject.containsKey(OnOffConfig.RADIUS_PACKET_TYPE) && jsonObject.containsKey(OnOffConfig.RADIUS_ACCT_STATUS_TYPE)) {
|
||||
int packetType = jsonObject.getInteger(OnOffConfig.RADIUS_PACKET_TYPE);
|
||||
int statusType = jsonObject.getInteger(OnOffConfig.RADIUS_ACCT_STATUS_TYPE);
|
||||
//条件radius_packet_type = 4 and radius_acct_status_type = 1 or 2
|
||||
// boolean existed = OnOffConfig.ACCOUNTING_REQUEST == packetType && (OnOffConfig.START_BILLING == statusType || OnOffConfig.STOP_BILLING == statusType);
|
||||
if (OnOffConfig.ACCOUNTING_REQUEST == packetType && (OnOffConfig.START_BILLING == statusType || OnOffConfig.STOP_BILLING == statusType)) {
|
||||
|
||||
Knowledge knowledge = new Knowledge();
|
||||
knowledge.setFramed_ip(jsonObject.getString("radius_framed_ip"));
|
||||
knowledge.setAccount(jsonObject.getString("radius_account"));
|
||||
knowledge.setAcct_status_type(statusType);
|
||||
|
||||
/*
|
||||
*如果存在时间戳则选择此时间戳没有获取当前时间
|
||||
*/
|
||||
if (jsonObject.containsKey(OnOffConfig.RADIUS_EVENT_TIMESTAMP)) {
|
||||
knowledge.setEvent_timestamp(jsonObject.getInteger("radius_event_timestamp"));
|
||||
} else {
|
||||
knowledge.setEvent_timestamp((System.currentTimeMillis() / 1000));
|
||||
}
|
||||
|
||||
/*
|
||||
* 标识同一个连接:
|
||||
* 1.数据若存在acct_multi_session_id属性,取该属性
|
||||
* 2. 不存在取 acct_session_id
|
||||
*/
|
||||
if (jsonObject.containsKey(OnOffConfig.RADIUS_MULTI_SESSION_ID)) {
|
||||
knowledge.setAcct_session_id(jsonObject.getString("radius_acct_multi_session_id"));
|
||||
} else {
|
||||
knowledge.setAcct_session_id(jsonObject.getString("radius_acct_session_id"));
|
||||
}
|
||||
|
||||
/*
|
||||
*用户的在线时长,以秒为单位,下线用户无此属性,默认为0
|
||||
*/
|
||||
if (jsonObject.containsKey(OnOffConfig.RADIUS_SESSION_TIME)) {
|
||||
knowledge.setAcct_session_time(jsonObject.getInteger("radius_acct_session_time"));
|
||||
} else {
|
||||
knowledge.setAcct_session_time(0);
|
||||
}
|
||||
|
||||
return JSONObject.toJSONString(knowledge);
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static class FlumeDynamicAppBuilder implements Interceptor.Builder {
|
||||
|
||||
@Override
|
||||
public Interceptor build() {
|
||||
return new FlumeOnOffApp();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Context context) {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
package com.zdjizhi.flume.interceptor.bean;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
*/
|
||||
public class Knowledge {
|
||||
private String framed_ip;
|
||||
private String account;
|
||||
private String acct_session_id;
|
||||
private int acct_status_type;
|
||||
private int acct_session_time;
|
||||
private long event_timestamp;
|
||||
|
||||
public String getFramed_ip() {
|
||||
return framed_ip;
|
||||
}
|
||||
|
||||
public void setFramed_ip(String framed_ip) {
|
||||
this.framed_ip = framed_ip;
|
||||
}
|
||||
|
||||
public String getAccount() {
|
||||
return account;
|
||||
}
|
||||
|
||||
public void setAccount(String account) {
|
||||
this.account = account;
|
||||
}
|
||||
|
||||
public int getAcct_status_type() {
|
||||
return acct_status_type;
|
||||
}
|
||||
|
||||
public void setAcct_status_type(int acct_status_type) {
|
||||
this.acct_status_type = acct_status_type;
|
||||
}
|
||||
|
||||
public long getEvent_timestamp() {
|
||||
return event_timestamp;
|
||||
}
|
||||
|
||||
public void setEvent_timestamp(long event_timestamp) {
|
||||
this.event_timestamp = event_timestamp;
|
||||
}
|
||||
|
||||
public String getAcct_session_id() {
|
||||
return acct_session_id;
|
||||
}
|
||||
|
||||
public void setAcct_session_id(String acct_session_id) {
|
||||
this.acct_session_id = acct_session_id;
|
||||
}
|
||||
|
||||
public int getAcct_session_time() {
|
||||
return acct_session_time;
|
||||
}
|
||||
|
||||
public void setAcct_session_time(int acct_session_time) {
|
||||
this.acct_session_time = acct_session_time;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
package com.zdjizhi.flume.interceptor.common;
|
||||
|
||||
|
||||
/**
|
||||
* @author Administrator
|
||||
*/
|
||||
public class OnOffConfig {
|
||||
/**
|
||||
* 4- Accounting-Request(账户授权)
|
||||
*/
|
||||
public static final int ACCOUNTING_REQUEST = 4;
|
||||
/**
|
||||
* 1、开始计费
|
||||
*/
|
||||
public static final int START_BILLING = 1;
|
||||
/**
|
||||
* 2、停止计费
|
||||
*/
|
||||
public static final int STOP_BILLING = 2;
|
||||
|
||||
/**
|
||||
* 计费请求报文类型
|
||||
*/
|
||||
public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type";
|
||||
/**
|
||||
* 报文类型
|
||||
*/
|
||||
public static final String RADIUS_PACKET_TYPE = "radius_packet_type";
|
||||
|
||||
/**
|
||||
* 发送计费请求报文时间戳
|
||||
*/
|
||||
public static final String RADIUS_EVENT_TIMESTAMP = "radius_event_timestamp";
|
||||
|
||||
/**
|
||||
* 一个用户多个计费ID关联属性
|
||||
*/
|
||||
public static final String RADIUS_MULTI_SESSION_ID = "radius_acct_multi_session_id";
|
||||
|
||||
/**
|
||||
* 用户的在线时长,以秒为单位
|
||||
*/
|
||||
public static final String RADIUS_SESSION_TIME = "radius_acct_session_time";
|
||||
|
||||
/**
|
||||
* flume使用配置
|
||||
*/
|
||||
public static final String HBASE_ZOOKEEPER_SERVERS = OnOffConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
|
||||
public static final String HBASE_TABLE_NAME = OnOffConfigurations.getStringProperty(0, "hbase.table.name");
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
package com.zdjizhi.flume.interceptor.common;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
|
||||
/**
|
||||
* @author Administrator
|
||||
*/
|
||||
|
||||
public final class OnOffConfigurations {
|
||||
|
||||
private static Properties propService = new Properties();
|
||||
|
||||
|
||||
public static String getStringProperty(Integer type, String key) {
|
||||
if (type == 0) {
|
||||
return propService.getProperty(key);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static Integer getIntProperty(Integer type, String key) {
|
||||
if (type == 0) {
|
||||
return Integer.parseInt(propService.getProperty(key));
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public static Long getLongProperty(Integer type, String key) {
|
||||
if (type == 0) {
|
||||
return Long.parseLong(propService.getProperty(key));
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public static Boolean getBooleanProperty(Integer type, String key) {
|
||||
if (type == 0) {
|
||||
return "true".equals(propService.getProperty(key).toLowerCase().trim());
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
static {
|
||||
try {
|
||||
propService.load(OnOffConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
|
||||
} catch (Exception e) {
|
||||
propService = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user