修改版本到最21.04.23,增加app_match函数

This commit is contained in:
qidaijie
2021-04-23 17:25:47 +08:00
parent 24ff96be4c
commit c8f8d3c9cf
12 changed files with 514 additions and 244 deletions

View File

@@ -3,7 +3,7 @@
<parent>
<artifactId>dynamic_complement</artifactId>
<groupId>com.zdjizhi</groupId>
<version>1.0</version>
<version>v3.21.01.18</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>FlumeDynamicInterceptor</artifactId>
@@ -87,6 +87,15 @@
<name>www.ebi.ac.uk</name>
<url>http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/</url>
</repository>
<repository>
<releases />
<snapshots>
<updatePolicy>always</updatePolicy>
<checksumPolicy>fail</checksumPolicy>
</snapshots>
<id>maven-ali</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
<dependencies>
<dependency>
@@ -125,6 +134,18 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>hamcrest-core</artifactId>
<groupId>org.hamcrest</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<properties>
<hbase.version>2.2.1</hbase.version>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dynamic_complement</artifactId>
<groupId>com.zdjizhi</groupId>
<version>1.0</version>
<version>v3.21.04.23</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -18,20 +18,12 @@
<url>http://192.168.40.125:8099/content/groups/public</url>
</repository>
<repository>
<id>ebi</id>
<name>www.ebi.ac.uk</name>
<url>http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/</url>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flume.version>1.9.0</flume.version>
<hbase.version>2.2.1</hbase.version>
<!--<hadoop.version>2.8.5</hadoop.version>-->
<!--<hbase.version>1.4.9</hbase.version>-->
<!--<hadoop.version>2.7.1</hadoop.version>-->
<hbase.version>2.2.3</hbase.version>
</properties>
<build>
@@ -41,11 +33,11 @@
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<phase>install</phase>
<goals>
<goal>shade</goal>
</goals>
@@ -70,6 +62,21 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.github.zlika</groupId>
<artifactId>reproducible-build-maven-plugin</artifactId>
<version>0.2</version>
<executions>
<execution>
<goals>
<goal>strip-jar</goal>
</goals>
<phase>install</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
@@ -108,13 +115,6 @@
</includes>
<filtering>false</filtering>
</resource>
<!--<resource>-->
<!--<directory>src/main/java</directory>-->
<!--<includes>-->
<!--<include>log4j.properties</include>-->
<!--</includes>-->
<!--<filtering>false</filtering>-->
<!--</resource>-->
</resources>
</build>
@@ -129,7 +129,6 @@
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<!-- <version>1.0.2</version>-->
<version>1.0.3</version>
<exclusions>
<exclusion>
@@ -150,30 +149,16 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
<version>1.2.70</version>
</dependency>
<!--<dependency>-->
<!--<groupId>redis.clients</groupId>-->
<!--<artifactId>jedis</artifactId>-->
<!--<version>2.8.1</version>-->
<!--</dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.zookeeper</groupId>-->
<!-- <artifactId>zookeeper</artifactId>-->
<!-- <version>3.4.9</version>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <artifactId>slf4j-log4j12</artifactId>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- </exclusion>-->
<!-- <exclusion>-->
<!-- <artifactId>log4j-over-slf4j</artifactId>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cglib</groupId>
@@ -199,94 +184,34 @@
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.hadoop</groupId>-->
<!--<artifactId>hadoop-common</artifactId>-->
<!--<version>${hadoop.version}</version>-->
<!--<exclusions>-->
<!--<exclusion>-->
<!--<artifactId>slf4j-log4j12</artifactId>-->
<!--<groupId>org.slf4j</groupId>-->
<!--</exclusion>-->
<!--<exclusion>-->
<!--<artifactId>log4j-over-slf4j</artifactId>-->
<!--<groupId>org.slf4j</groupId>-->
<!--</exclusion>-->
<!--</exclusions>-->
<!--</dependency>-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.1</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.hadoop</groupId>-->
<!--<artifactId>hadoop-client</artifactId>-->
<!--<version>${hadoop.version}</version>-->
<!--<exclusions>-->
<!--<exclusion>-->
<!--<artifactId>slf4j-log4j12</artifactId>-->
<!--<groupId>org.slf4j</groupId>-->
<!--</exclusion>-->
<!--<exclusion>-->
<!--<artifactId>log4j-over-slf4j</artifactId>-->
<!--<groupId>org.slf4j</groupId>-->
<!--</exclusion>-->
<!--</exclusions>-->
<!--</dependency>-->
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.4.0</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.hadoop</groupId>-->
<!--<artifactId>hadoop-hdfs</artifactId>-->
<!--<version>${hadoop.version}</version>-->
<!--<exclusions>-->
<!--<exclusion>-->
<!--<artifactId>slf4j-log4j12</artifactId>-->
<!--<groupId>org.slf4j</groupId>-->
<!--</exclusion>-->
<!--<exclusion>-->
<!--<artifactId>log4j-over-slf4j</artifactId>-->
<!--<groupId>org.slf4j</groupId>-->
<!--</exclusion>-->
<!--</exclusions>-->
<!--</dependency>-->
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_pushgateway</artifactId>
<version>0.9.0</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.junit.jupiter</groupId>-->
<!-- <artifactId>junit-jupiter-api</artifactId>-->
<!-- <version>5.3.2</version>-->
<!-- <scope>compile</scope>-->
<!-- </dependency>-->
<!--<dependency>-->
<!--<groupId>com.google.guava</groupId>-->
<!--<artifactId>guava</artifactId>-->
<!--&lt;!&ndash;<version>18.0</version>&ndash;&gt;-->
<!--<version>11.0.2</version>-->
<!--</dependency>-->
<!--<dependency>-->
<!--<groupId>org.apache.httpcomponents</groupId>-->
<!--<artifactId>httpclient</artifactId>-->
<!--<version>4.5.2</version>-->
<!--</dependency>-->
<!--<dependency>-->
<!--<groupId>org.apache.httpcomponents</groupId>-->
<!--<artifactId>httpcore</artifactId>-->
<!--<version>4.4.1</version>-->
<!--</dependency>-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.2</version>
</dependency>
</dependencies>

View File

@@ -1,8 +1,13 @@
package com.zdjizhi.flume.interceptor;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Preconditions;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.flume.interceptor.common.FlowWriteConfig;
import com.zdjizhi.flume.interceptor.utils.app.AppUtils;
import com.zdjizhi.flume.interceptor.utils.hbase.HBaseUtils;
import com.zdjizhi.flume.interceptor.utils.json.JsonParseUtil;
import com.zdjizhi.utils.Encodes;
@@ -20,17 +25,21 @@ import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @author qidaijie
*/
public class FlumeDynamicApp implements Interceptor {
private static Logger logger = Logger.getLogger(FlumeDynamicApp.class);
private static final Log logger = LogFactory.get();
private static final Pattern PATTERN = Pattern.compile("[0-9]*");
private static IpLookup ipLookup;
private static FormatUtils formatUtils;
private String schemaHttpUrl;
private String appIdHttpUrl;
private String uidZookeeperIp;
private long dataCenterIdNum;
private String ipDatPath;
@@ -50,17 +59,6 @@ public class FlumeDynamicApp implements Interceptor {
formatUtils = new FormatUtils.Builder(false).build();
//载入定位库
ipLookup = new IpLookup.Builder(false)
/**
* v1.0.2-com.zdjizhi.galaxy
*/
// .loadDataFileV4(ipDatPath + "Kazakhstan.mmdb")
// .loadDataFileV6(ipDatPath + "Kazakhstan.mmdb")
// .loadAsnDataFileV4(ipDatPath + "asn_v4.mmdb")
// .loadAsnDataFileV6(ipDatPath + "asn_v6.mmdb")
/**
* v1.0.3-com.zdjizhi.galaxy
*/
.loadDataFileV4(ipDatPath + "ip_v4.mmdb")
.loadDataFileV6(ipDatPath + "ip_v6.mmdb")
.loadDataFilePrivateV4(ipDatPath + "ip_private_v4.mmdb")
@@ -70,8 +68,9 @@ public class FlumeDynamicApp implements Interceptor {
.build();
}
public FlumeDynamicApp(String schemaHttpUrl, String zookeeperIp, long dataCenterIdNum, String ipDatPath, String hbaseZookeeperIp, String hbaseTableName) {
public FlumeDynamicApp(String schemaHttpUrl, String appIdHttpUrl, String zookeeperIp, long dataCenterIdNum, String ipDatPath, String hbaseZookeeperIp, String hbaseTableName) {
this.schemaHttpUrl = schemaHttpUrl;
this.appIdHttpUrl = appIdHttpUrl;
this.uidZookeeperIp = zookeeperIp;
this.dataCenterIdNum = dataCenterIdNum;
this.ipDatPath = ipDatPath;
@@ -133,18 +132,29 @@ public class FlumeDynamicApp implements Interceptor {
Object name = JsonParseUtil.getValue(object, strings[0]);
String appendToKeyName = strings[1];
String functionName = strings[2];
Object param = null;
if (strings[3] != null) {
param = JsonParseUtil.getValue(object, strings[3]);
}
Object appendTo = JsonParseUtil.getValue(object, appendToKeyName);
String param = strings[3];
switch (functionName) {
case "current_timestamp":
JsonParseUtil.setValue(object, appendToKeyName, getCurrentTime());
if ((long) appendTo == 0L) {
JsonParseUtil.setValue(object, appendToKeyName, getCurrentTime());
}
break;
case "snowflake_id":
JsonParseUtil.setValue(object, appendToKeyName, formatUtils.getSnowflakeId(uidZookeeperIp, dataCenterIdNum));
break;
case "set_value":
if (name != null && param != null) {
JsonParseUtil.setValue(object, appendToKeyName, setValue(param));
}
break;
case "flattenSpec":
if (name != null && StringUtil.isNotBlank(param)) {
JsonParseUtil.setValue(object, appendToKeyName, flattenSpec(name.toString(), isJsonValue(object, param)));
}
break;
case "geo_ip_detail":
if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) {
JsonParseUtil.setValue(object, appendToKeyName, getGeoIpDetail(name.toString()));
@@ -155,9 +165,14 @@ public class FlumeDynamicApp implements Interceptor {
JsonParseUtil.setValue(object, appendToKeyName, getGeoAsn(name.toString()));
}
break;
case "if":
if (param != null) {
JsonParseUtil.setValue(object, appendToKeyName, condition(object, param));
}
break;
case "get_value":
if (name != null) {
JsonParseUtil.setValue(object, appendToKeyName, name.toString());
JsonParseUtil.setValue(object, appendToKeyName, name);
}
break;
case "radius_match":
@@ -165,6 +180,11 @@ public class FlumeDynamicApp implements Interceptor {
JsonParseUtil.setValue(object, appendToKeyName, radiusMatch(name.toString(), hbaseZookeeperIp, hbaseTableName));
}
break;
case "app_match":
if ((int) name != 0 && appendTo == null) {
JsonParseUtil.setValue(object, appendToKeyName, appMatch(appIdHttpUrl, Integer.parseInt(name.toString())));
}
break;
case "geo_ip_country":
if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) {
JsonParseUtil.setValue(object, appendToKeyName, getGeoIpCountry(name.toString()));
@@ -172,19 +192,12 @@ public class FlumeDynamicApp implements Interceptor {
break;
case "decode_of_base64":
if (name != null) {
if (param != null) {
JsonParseUtil.setValue(object, appendToKeyName, Encodes.decodeBase64String(name.toString(), param.toString()));
} else {
JsonParseUtil.setValue(object, appendToKeyName, Encodes.decodeBase64String(name.toString(), FlowWriteConfig.MAIL_DEFAULT_CHARSET));
}
JsonParseUtil.setValue(object, appendToKeyName, decodeBase64(name.toString(), isJsonValue(object, param)));
}
break;
case "sub_domain":
if (name != null) {
Object appendTo = JsonParseUtil.getValue(object, appendToKeyName);
if (appendTo == null || StringUtil.isBlank(appendTo.toString())) {
JsonParseUtil.setValue(object, appendToKeyName, replaceGetTopDomain(FormatUtils.getTopPrivateDomain(name.toString())));
}
if (appendTo == null && name != null) {
JsonParseUtil.setValue(object, appendToKeyName, replaceGetTopDomain(name.toString()));
}
break;
default:
@@ -194,10 +207,9 @@ public class FlumeDynamicApp implements Interceptor {
return JSONObject.toJSONString(object);
} catch (Exception e) {
logger.error("FlumeDynamicApp dealCommonMessage is error===>{" + e + "}<===");
e.printStackTrace();
// return "";
return message;//返回原数据
logger.error("FlumeDynamicApp dealCommonMessage is error===>", e);
//返回原数据
return message;
}
}
@@ -208,15 +220,20 @@ public class FlumeDynamicApp implements Interceptor {
* @param url
* @return 顶级域名
*/
private String replaceGetTopDomain(String url) {
return url.replaceAll("InternetDomainName\\{name=", "").replaceAll("\\}", "");
private static String replaceGetTopDomain(String url) {
try {
return FormatUtils.getTopPrivateDomain(url).replaceAll("InternetDomainName\\{name=", "").replaceAll("\\}", "");
} catch (StringIndexOutOfBoundsException outException) {
logger.error("解析顶级域名异常,异常域名:" + url, outException);
return "";
}
}
/**
* 生成当前时间戳的操作
*/
private int getCurrentTime() {
return (int) (System.currentTimeMillis() / 1000);
private static long getCurrentTime() {
return (System.currentTimeMillis() / 1000);
}
/**
@@ -225,7 +242,7 @@ public class FlumeDynamicApp implements Interceptor {
* @param ip
* @return
*/
private String getGeoIpDetail(String ip) {
private static String getGeoIpDetail(String ip) {
return ipLookup.cityLookupDetail(ip);
}
@@ -235,7 +252,7 @@ public class FlumeDynamicApp implements Interceptor {
* @param ip
* @return
*/
private String getGeoAsn(String ip) {
private static String getGeoAsn(String ip) {
// return ipLookup.asnLookup(ip, true);//v1.0.2-com.zdjizhi.galaxy
return ipLookup.asnLookup(ip);//v1.0.3-com.zdjizhi.galaxy
}
@@ -246,7 +263,7 @@ public class FlumeDynamicApp implements Interceptor {
* @param ip
* @return
*/
private String getGeoIpCountry(String ip) {
private static String getGeoIpCountry(String ip) {
return ipLookup.countryLookup(ip);
}
@@ -260,8 +277,137 @@ public class FlumeDynamicApp implements Interceptor {
return HBaseUtils.getAccount(ip, hbaseZookeeper, hbaseTable);
}
/**
* appId与缓存中对应关系补全appName
*
* @param appId id
* @return appName
*/
private static String appMatch(String appIdHttpUrl, int appId) {
String appName = AppUtils.getAppName(appIdHttpUrl, appId);
if (StringUtil.isBlank(appName)) {
logger.warn("AppMap get appName is null, ID is :{}", appId);
}
return appName;
}
/**
* 根据编码解码base64
*
* @param message base64
* @param charset 编码
* @return 解码字符串
*/
private static String decodeBase64(String message, Object charset) {
String result = "";
try {
if (StringUtil.isNotBlank(message)) {
if (charset != null) {
result = Encodes.decodeBase64String(message, charset.toString());
} else {
result = Encodes.decodeBase64String(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
}
}
} catch (Exception e) {
logger.error("解析 Base64 异常,异常信息:" + e);
}
return result;
}
/**
* 根据表达式解析json
*
* @param message json
* @param expr 解析表达式
* @return 解析结果
*/
private static String flattenSpec(String message, String expr) {
String flattenResult = "";
try {
ArrayList<String> read = JsonPath.parse(message).read(expr);
flattenResult = read.get(0);
} catch (ClassCastException | InvalidPathException e) {
logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误", e);
}
return flattenResult;
}
/**
* 判断是否为日志字段,是则返回对应value否则返回原始字符串
*
* @param object 内存实体类
* @param param 字段名/普通字符串
* @return JSON.Value or String
*/
private static String isJsonValue(Object object, String param) {
if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
Object value = JsonParseUtil.getValue(object, param.substring(2));
if (value != null) {
return value.toString();
} else {
return "";
}
} else {
return param;
}
}
/**
* IF函数实现解析日志构建三目运算
*
* @param object 内存实体类
* @param ifParam 字段名/普通字符串
* @return resultA or resultB or ""
*/
private static Object condition(Object object, String ifParam) {
try {
String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
String direction = isJsonValue(object, norms[0]);
if (StringUtil.isNotBlank(direction)) {
if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
String resultA = isJsonValue(object, split[1]);
String resultB = isJsonValue(object, split[2]);
String result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB;
Matcher isNum = PATTERN.matcher(result);
if (isNum.matches()) {
return Long.parseLong(result);
} else {
return result;
}
}
}
} catch (Exception e) {
logger.error("IF 函数执行异常,异常信息:" + e);
e.printStackTrace();
}
return null;
}
/**
* 设置固定值函数 若为数字则转为long返回
*
* @param param 默认值
* @return 返回数字或字符串
*/
static Object setValue(String param) {
try {
Matcher isNum = PATTERN.matcher(param);
if (isNum.matches()) {
return Long.parseLong(param);
} else {
return param;
}
} catch (Exception e) {
logger.error("SetValue 函数异常,异常信息:" + e);
e.printStackTrace();
}
return null;
}
public static class FlumeDynamicAppBuilder implements Interceptor.Builder {
private String schemaHttpUrl;
private String appIdHttpUrl;
private String uidZookeeperIp;
private long dataCenterIdNum;
private String ipDatPath;
@@ -272,6 +418,7 @@ public class FlumeDynamicApp implements Interceptor {
@Override
public Interceptor build() {
return new FlumeDynamicApp(this.schemaHttpUrl,
this.appIdHttpUrl,
this.uidZookeeperIp, this.dataCenterIdNum,
this.ipDatPath,
this.hbaseZookeeperIp, this.hbaseTableName);
@@ -289,6 +436,16 @@ public class FlumeDynamicApp implements Interceptor {
logger.error("FlumeDynamicApp Get schemaHttpUrl is error : " + e);
}
try {
this.appIdHttpUrl = context.getString("appIdHttpUrl", "");
Preconditions.checkNotNull("".equals(appIdHttpUrl), "appIdHttpUrl must be set!!");
logger.info("FlumeDynamicApp Read appIdHttpUrl from configuration : " + appIdHttpUrl);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("FlumeDynamicApp appIdHttpUrl invalid", e);
} catch (Exception e) {
logger.error("FlumeDynamicApp Get appIdHttpUrl is error : " + e);
}
try {
this.uidZookeeperIp = context.getString("uidZookeeperIp", "");
Preconditions.checkNotNull("".equals(uidZookeeperIp), "uidZookeeperIp must be set!!");

View File

@@ -7,6 +7,11 @@ import com.zdjizhi.flume.interceptor.utils.system.FlowWriteConfigurations;
* @author Administrator
*/
public class FlowWriteConfig {
public static final String VISIBILITY = "disabled";
public static final int IF_PARAM_LENGTH = 3;
public static final String FORMAT_SPLITTER = ",";
public static final String IS_JSON_KEY_TAG = "$.";
public static final String IF_CONDITION_SPLITTER = "=";
// public static final String SEGMENTATION = ",";

View File

@@ -0,0 +1,116 @@
package com.zdjizhi.flume.interceptor.utils.app;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.flume.interceptor.utils.http.HttpClientUtil;
import com.zdjizhi.utils.StringUtil;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* AppId 工具类
*
* @author qidaijie
*/
public class AppUtils {
private static final Log logger = LogFactory.get();
private static Map<Integer, String> appIdMap = new ConcurrentHashMap<>(128);
private static AppUtils appUtils;
private static String appIdHttpUrl;
private static void getAppInstance(String url) {
appUtils = new AppUtils(url);
}
/**
* 构造函数-新
*/
private AppUtils(String url) {
appIdHttpUrl = url;
timestampsFilter();
//定时更新
updateAppIdCache();
}
/**
* 更新变量
*/
private static void change() {
timestampsFilter();
}
/**
* 获取变更内容
*/
private static void timestampsFilter() {
try {
Long begin = System.currentTimeMillis();
String schema = HttpClientUtil.requestByGetMethod(appIdHttpUrl);
if (StringUtil.isNotBlank(schema)) {
String data = JSONObject.parseObject(schema).getString("data");
JSONArray objects = JSONArray.parseArray(data);
for (Object object : objects) {
JSONArray jsonArray = JSONArray.parseArray(object.toString());
int key = jsonArray.getInteger(0);
String value = jsonArray.getString(1);
if (appIdMap.containsKey(key)) {
if (!value.equals(appIdMap.get(key))) {
appIdMap.put(key, value);
}
} else {
appIdMap.put(key, value);
}
}
logger.warn("Updating the correspondence takes time:" + (System.currentTimeMillis() - begin));
logger.warn("Pull the length of the interface data:[" + objects.size() + "]");
}
} catch (RuntimeException e) {
logger.error("Update cache app-id failed, exception" + e);
}
}
/**
* 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
*/
private void updateAppIdCache() {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
timestampsFilter();
} catch (RuntimeException e) {
logger.error("AppUtils update AppCache is error===>{" + e + "}<===");
}
}
}, 1, 300, TimeUnit.SECONDS);
}
/**
* 获取 appName
*
* @param appId app_id
* @return account
*/
public static String getAppName(String url, int appId) {
if (appUtils == null) {
getAppInstance(url);
}
return appIdMap.get(appId);
}
}

View File

@@ -1,6 +1,7 @@
package com.zdjizhi.flume.interceptor.utils.http;
import com.alibaba.fastjson.JSON;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@@ -13,50 +14,62 @@ import java.io.InputStreamReader;
/**
* 获取网关schema工具类
* @author qidaijie
*/
public class HttpClientUtil {
public static String requestByGetMethod(String s) {
private static final Log logger = LogFactory.get();
/**
* 请求网关获取schema
*
* @param http 网关url
* @return schema
*/
public static String requestByGetMethod(String http) {
CloseableHttpClient httpClient = HttpClients.createDefault();
StringBuilder entityStringBuilder = null;
StringBuilder entityStringBuilder;
HttpGet get = new HttpGet(http);
BufferedReader bufferedReader = null;
CloseableHttpResponse httpResponse = null;
try {
HttpGet get = new HttpGet(s);
CloseableHttpResponse httpResponse = null;
httpResponse = httpClient.execute(get);
try {
HttpEntity entity = httpResponse.getEntity();
entityStringBuilder = new StringBuilder();
if (null != entity) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
String line = null;
while ((line = bufferedReader.readLine()) != null) {
entityStringBuilder.append(line);
HttpEntity entity = httpResponse.getEntity();
entityStringBuilder = new StringBuilder();
if (null != entity) {
bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
int intC;
while ((intC = bufferedReader.read()) != -1) {
char c = (char) intC;
if (c == '\n') {
break;
}
entityStringBuilder.append(c);
}
} finally {
httpResponse.close();
return entityStringBuilder.toString();
}
} catch (Exception e) {
e.printStackTrace();
} catch (IOException e) {
logger.error("Get Schema from Query engine ERROR! Exception message is:" + e);
} finally {
try {
if (httpClient != null) {
if (httpClient != null) {
try {
httpClient.close();
} catch (IOException e) {
logger.error("Close HTTP Client ERROR! Exception messgae is:" + e);
}
} catch (IOException e) {
e.printStackTrace();
}
if (httpResponse != null) {
try {
httpResponse.close();
} catch (IOException e) {
logger.error("Close httpResponse ERROR! Exception messgae is:" + e);
}
}
if (bufferedReader != null) {
org.apache.commons.io.IOUtils.closeQuietly(bufferedReader);
}
}
return entityStringBuilder.toString();
return "";
}
// public static void main(String[] args) {
//// String s = HttpClientUtil.requestByGetMethod("http://192.168.40.224:9999/metadata/schema/v1/fields/security_event_log");
//// System.out.println(s);
//// String schemaHttpRes = HttpClientUtil.requestByGetMethod("http://192.168.40.224:9999/metadata/schema/v1/fields/security_event_log");
//// String schemaHttpRes = HttpClientUtil.requestByGetMethod("http://192.168.40.151:9999/metadata/schema/v1/fields/security_event_log");
// String schemaHttpRes = HttpClientUtil.requestByGetMethod("http://192.168.40.224:9999/metadata/schema/v1/fields/connection_record_log");
// String data = JSON.parseObject(schemaHttpRes).get("data").toString();
// System.out.println(data);
// }
}

View File

@@ -3,6 +3,8 @@ package com.zdjizhi.flume.interceptor.utils.json;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.flume.interceptor.common.FlowWriteConfig;
import com.zdjizhi.flume.interceptor.utils.http.HttpClientUtil;
import net.sf.cglib.beans.BeanGenerator;
import net.sf.cglib.beans.BeanMap;
@@ -118,17 +120,39 @@ public class JsonParseUtil {
JSONArray fields = (JSONArray) schemaJson.get("fields");
for (Object field : fields) {
String name = JSON.parseObject(field.toString()).get("name").toString();
String type = JSON.parseObject(field.toString()).get("type").toString();
map.put(name, getClassName(type));
String filedStr = field.toString();
if (checkKeepField(filedStr)) {
String name = JsonPath.read(filedStr, "$.name").toString();
String type = JsonPath.read(filedStr, "$.type").toString();
//组合用来生成实体类的map
map.put(name, getClassName(type));
}
}
return map;
}
/**
* 判断字段是否需要保留
*
* @param message 单个field-json
* @return true or false
*/
private static boolean checkKeepField(String message) {
boolean isKeepField = true;
boolean isHiveDoc = JSON.parseObject(message).containsKey("doc");
if (isHiveDoc) {
boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility");
if (isHiveVi) {
String visibility = JsonPath.read(message, "$.doc.visibility").toString();
if (FlowWriteConfig.VISIBILITY.equals(visibility)) {
isKeepField = false;
}
}
}
return isKeepField;
}
/**
* 根据http链接获取schema解析之后返回一个任务列表 (useList toList funcList)
*

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>dynamic_complement</artifactId>
<groupId>com.zdjizhi</groupId>
<version>1.0</version>
<version>v3.21.04.23</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -17,12 +17,13 @@
<name>Team Nexus Repository</name>
<url>http://192.168.40.125:8099/content/groups/public</url>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flume.version>1.9.0</flume.version>
<hbase.version>2.2.1</hbase.version>
<hbase.version>2.2.3</hbase.version>
</properties>
<build>
@@ -32,11 +33,11 @@
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<phase>install</phase>
<goals>
<goal>shade</goal>
</goals>
@@ -61,6 +62,21 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.github.zlika</groupId>
<artifactId>reproducible-build-maven-plugin</artifactId>
<version>0.2</version>
<executions>
<execution>
<goals>
<goal>strip-jar</goal>
</goals>
<phase>install</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
@@ -114,7 +130,7 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
<version>1.2.70</version>
</dependency>
<dependency>
@@ -126,7 +142,7 @@
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.0.2</version>
<version>1.0.3</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>

View File

@@ -25,6 +25,7 @@ public class FlumeOnOffApp implements Interceptor {
@Override
public void initialize() {
}
@Override

View File

@@ -18,15 +18,16 @@ public class OnOffConfig {
*/
public static final int STOP_BILLING = 2;
/**
* 计费请求报文类型
*/
public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type";
/**
* 报文类型
*/
public static final String RADIUS_PACKET_TYPE = "radius_packet_type";
/**
* 计费请求报文类型
*/
public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type";
/**
* 发送计费请求报文时间戳
*/

View File

@@ -5,25 +5,16 @@
<parent>
<artifactId>dynamic_complement</artifactId>
<groupId>com.zdjizhi</groupId>
<version>1.0</version>
<version>v3.21.04.23</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>FlumeSubscriberInterceptor</artifactId>
<repositories>
<repository>
<id>ebi</id>
<name>www.ebi.ac.uk</name>
<url>http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/</url>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flume.version>1.9.0</flume.version>
<hbase.version>2.2.1</hbase.version>
<hbase.version>2.2.3</hbase.version>
</properties>
<build>
@@ -33,11 +24,11 @@
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<phase>install</phase>
<goals>
<goal>shade</goal>
</goals>
@@ -62,6 +53,21 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.github.zlika</groupId>
<artifactId>reproducible-build-maven-plugin</artifactId>
<version>0.2</version>
<executions>
<execution>
<goals>
<goal>strip-jar</goal>
</goals>
<phase>install</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
@@ -115,7 +121,7 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
<version>1.2.70</version>
</dependency>
<dependency>
@@ -141,21 +147,6 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

View File

@@ -7,7 +7,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>dynamic_complement</artifactId>
<packaging>pom</packaging>
<version>1.0</version>
<version>v3.21.04.23</version>
<modules>
<module>FlumeDynamicInterceptor</module>
<module>FlumeSubscriberInterceptor</module>