diff --git a/FlumeDynamicInterceptor/dependency-reduced-pom.xml b/FlumeDynamicInterceptor/dependency-reduced-pom.xml
deleted file mode 100644
index ff88a66..0000000
--- a/FlumeDynamicInterceptor/dependency-reduced-pom.xml
+++ /dev/null
@@ -1,135 +0,0 @@
-
-
-
- dynamic_complement
- com.zdjizhi
- 1.0
-
- 4.0.0
- FlumeDynamicInterceptor
-
-
-
- properties
-
- **/*.properties
- **/*.xml
-
-
-
-
-
- maven-shade-plugin
- 2.4.1
-
-
- package
-
- shade
-
-
-
-
-
- com.zdjizhi.flume.interceptor.FlumeDynamicApp
-
-
- META-INF/spring.handlers
-
-
- META-INF/spring.schemas
-
-
-
-
-
-
- true
-
-
-
- org.codehaus.mojo
- exec-maven-plugin
- 1.2.1
-
-
-
- exec
-
-
-
-
- java
- true
- false
- compile
- com.zdjizhi.flume.interceptor.FlumeDynamicApp
-
-
-
- maven-compiler-plugin
- 2.3.2
-
- 1.8
- 1.8
-
-
-
-
-
-
- nexus
- Team Nexus Repository
- http://192.168.40.125:8099/content/groups/public
-
-
- ebi
- www.ebi.ac.uk
- http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/
-
-
-
-
- org.apache.flume
- flume-ng-core
- 1.9.0
- provided
-
-
- flume-ng-sdk
- org.apache.flume
-
-
- flume-ng-configuration
- org.apache.flume
-
-
- flume-ng-auth
- org.apache.flume
-
-
- avro-ipc
- org.apache.avro
-
-
- jetty-jmx
- org.eclipse.jetty
-
-
- libthrift
- org.apache.thrift
-
-
- mina-core
- org.apache.mina
-
-
-
-
-
- 2.2.1
- 1.9.0
- UTF-8
-
-
-
diff --git a/FlumeDynamicInterceptor/pom.xml b/FlumeDynamicInterceptor/pom.xml
index 5d6bae9..cc26f7d 100644
--- a/FlumeDynamicInterceptor/pom.xml
+++ b/FlumeDynamicInterceptor/pom.xml
@@ -5,7 +5,7 @@
dynamic_complement
com.zdjizhi
- 1.0
+ v3.20.09.22
4.0.0
@@ -23,6 +23,20 @@
www.ebi.ac.uk
http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/
+
+
+ maven-ali
+ http://maven.aliyun.com/nexus/content/groups/public/
+
+ true
+
+
+ true
+ always
+ fail
+
+
+
@@ -150,7 +164,7 @@
com.alibaba
fastjson
- 1.2.47
+ 1.2.70
@@ -214,6 +228,12 @@
+
+ com.jayway.jsonpath
+ json-path
+ 2.4.0
+
+
diff --git a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeDynamicApp.java b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeDynamicApp.java
index 55423ec..b30faf2 100644
--- a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeDynamicApp.java
+++ b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeDynamicApp.java
@@ -2,6 +2,8 @@ package com.zdjizhi.flume.interceptor;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Preconditions;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.flume.interceptor.common.FlowWriteConfig;
import com.zdjizhi.flume.interceptor.utils.hbase.HBaseUtils;
import com.zdjizhi.flume.interceptor.utils.json.JsonParseUtil;
@@ -133,18 +135,27 @@ public class FlumeDynamicApp implements Interceptor {
Object name = JsonParseUtil.getValue(object, strings[0]);
String appendToKeyName = strings[1];
String functionName = strings[2];
- Object param = null;
- if (strings[3] != null) {
- param = JsonParseUtil.getValue(object, strings[3]);
- }
+// Object param = null;
+// if (strings[3] != null) {
+// param = JsonParseUtil.getValue(object, strings[3]);
+// }
+
+ String param = strings[3];
switch (functionName) {
case "current_timestamp":
+// if (Long.parseLong(JsonParseUtil.getValue(object, appendToKeyName)) == 0L) {
JsonParseUtil.setValue(object, appendToKeyName, getCurrentTime());
+// }
break;
case "snowflake_id":
JsonParseUtil.setValue(object, appendToKeyName, formatUtils.getSnowflakeId(uidZookeeperIp, dataCenterIdNum));
break;
+ case "flattenSpec":
+ if (name != null && StringUtil.isNotBlank(param)) {
+ JsonParseUtil.setValue(object, appendToKeyName, flattenSpec(name.toString(), isJsonValue(object, param)));
+ }
+ break;
case "geo_ip_detail":
if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) {
JsonParseUtil.setValue(object, appendToKeyName, getGeoIpDetail(name.toString()));
@@ -155,6 +166,11 @@ public class FlumeDynamicApp implements Interceptor {
JsonParseUtil.setValue(object, appendToKeyName, getGeoAsn(name.toString()));
}
break;
+ case "if":
+ if (param != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, condition(object, param));
+ }
+ break;
case "get_value":
if (name != null) {
JsonParseUtil.setValue(object, appendToKeyName, name.toString());
@@ -172,11 +188,7 @@ public class FlumeDynamicApp implements Interceptor {
break;
case "decode_of_base64":
if (name != null) {
- if (param != null) {
- JsonParseUtil.setValue(object, appendToKeyName, Encodes.decodeBase64String(name.toString(), param.toString()));
- } else {
- JsonParseUtil.setValue(object, appendToKeyName, Encodes.decodeBase64String(name.toString(), FlowWriteConfig.MAIL_DEFAULT_CHARSET));
- }
+ JsonParseUtil.setValue(object, appendToKeyName, decodeBase64(name.toString(), isJsonValue(object, param)));
}
break;
case "sub_domain":
@@ -208,14 +220,20 @@ public class FlumeDynamicApp implements Interceptor {
* @param url
* @return 顶级域名
*/
- private String replaceGetTopDomain(String url) {
- return url.replaceAll("InternetDomainName\\{name=", "").replaceAll("\\}", "");
+ private static String replaceGetTopDomain(String url) {
+// return url.replaceAll("InternetDomainName\\{name=", "").replaceAll("\\}", "");
+ try {
+ return FormatUtils.getTopPrivateDomain(url);
+ } catch (StringIndexOutOfBoundsException outException) {
+ logger.error("解析顶级域名异常,异常域名:" + url, outException);
+ return "";
+ }
}
/**
* 生成当前时间戳的操作
*/
- private int getCurrentTime() {
+ private static int getCurrentTime() {
return (int) (System.currentTimeMillis() / 1000);
}
@@ -225,7 +243,7 @@ public class FlumeDynamicApp implements Interceptor {
* @param ip
* @return
*/
- private String getGeoIpDetail(String ip) {
+ private static String getGeoIpDetail(String ip) {
return ipLookup.cityLookupDetail(ip);
}
@@ -235,7 +253,7 @@ public class FlumeDynamicApp implements Interceptor {
* @param ip
* @return
*/
- private String getGeoAsn(String ip) {
+ private static String getGeoAsn(String ip) {
// return ipLookup.asnLookup(ip, true);//v1.0.2-com.zdjizhi.galaxy
return ipLookup.asnLookup(ip);//v1.0.3-com.zdjizhi.galaxy
}
@@ -246,7 +264,7 @@ public class FlumeDynamicApp implements Interceptor {
* @param ip
* @return
*/
- private String getGeoIpCountry(String ip) {
+ private static String getGeoIpCountry(String ip) {
return ipLookup.countryLookup(ip);
}
@@ -260,6 +278,94 @@ public class FlumeDynamicApp implements Interceptor {
return HBaseUtils.getAccount(ip, hbaseZookeeper, hbaseTable);
}
+ /**
+ * 根据编码解码base64
+ *
+ * @param message base64
+ * @param charset 编码
+ * @return 解码字符串
+ */
+ private static String decodeBase64(String message, Object charset) {
+ String result = "";
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ if (charset != null) {
+ result = Encodes.decodeBase64String(message, charset.toString());
+ } else {
+ result = Encodes.decodeBase64String(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("解析 Base64 异常,异常信息:" + e);
+ }
+ return result;
+ }
+
+ /**
+ * 根据表达式解析json
+ *
+ * @param message json
+ * @param expr 解析表达式
+ * @return 解析结果
+ */
+ private static String flattenSpec(String message, String expr) {
+ String flattenResult = "";
+ try {
+ ArrayList read = JsonPath.parse(message).read(expr);
+ flattenResult = read.get(0);
+ } catch (ClassCastException | InvalidPathException e) {
+ logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误", e);
+ }
+ return flattenResult;
+ }
+
+ /**
+ * 判断是否为日志字段,是则返回对应value,否则返回原始字符串
+ *
+ * @param object 内存实体类
+ * @param param 字段名/普通字符串
+ * @return JSON.Value or String
+ */
+ private static String isJsonValue(Object object, String param) {
+ if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
+ Object value = JsonParseUtil.getValue(object, param.substring(2));
+ if (value != null) {
+ return value.toString();
+ } else {
+ return "";
+ }
+ } else {
+ return param;
+ }
+ }
+
+ /**
+ * IF函数实现,解析日志构建三目运算
+ *
+ * @param object 内存实体类
+ * @param ifParam 字段名/普通字符串
+ * @return resultA or resultB or ""
+ */
+ private static String condition(Object object, String ifParam) {
+ String result = "";
+ try {
+ String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
+ String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
+ String direction = isJsonValue(object, norms[0]);
+ if (StringUtil.isNotBlank(direction)) {
+ if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
+ String resultA = isJsonValue(object, split[1]);
+ String resultB = isJsonValue(object, split[2]);
+ result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB;
+ }
+ }
+ } catch (Exception e) {
+ logger.error("IF 函数执行异常,异常信息:" + e);
+ e.printStackTrace();
+ }
+ return result;
+ }
+
public static class FlumeDynamicAppBuilder implements Interceptor.Builder {
private String schemaHttpUrl;
private String uidZookeeperIp;
diff --git a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/FlowWriteConfig.java b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/FlowWriteConfig.java
index 958197a..c31ea2c 100644
--- a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/FlowWriteConfig.java
+++ b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/FlowWriteConfig.java
@@ -7,6 +7,11 @@ import com.zdjizhi.flume.interceptor.utils.system.FlowWriteConfigurations;
* @author Administrator
*/
public class FlowWriteConfig {
+ public static final String VISIBILITY = "disabled";
+ public static final int IF_PARAM_LENGTH = 3;
+ public static final String FORMAT_SPLITTER = ",";
+ public static final String IS_JSON_KEY_TAG = "$.";
+ public static final String IF_CONDITION_SPLITTER = "=";
// public static final String SEGMENTATION = ",";
diff --git a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/json/JsonParseUtil.java b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/json/JsonParseUtil.java
index b7e100b..8ccc1f2 100644
--- a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/json/JsonParseUtil.java
+++ b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/json/JsonParseUtil.java
@@ -3,6 +3,8 @@ package com.zdjizhi.flume.interceptor.utils.json;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.flume.interceptor.common.FlowWriteConfig;
import com.zdjizhi.flume.interceptor.utils.http.HttpClientUtil;
import net.sf.cglib.beans.BeanGenerator;
import net.sf.cglib.beans.BeanMap;
@@ -118,17 +120,42 @@ public class JsonParseUtil {
JSONArray fields = (JSONArray) schemaJson.get("fields");
for (Object field : fields) {
- String name = JSON.parseObject(field.toString()).get("name").toString();
- String type = JSON.parseObject(field.toString()).get("type").toString();
- map.put(name, getClassName(type));
-
+ String filedStr = field.toString();
+ if (checkKeepField(filedStr)) {
+ String name = JsonPath.read(filedStr, "$.name").toString();
+ String type = JsonPath.read(filedStr, "$.type").toString();
+ //组合用来生成实体类的map
+ map.put(name, getClassName(type));
+ }
}
-
-
return map;
}
+ public static void main(String[] args) {
+ System.out.println(getMapFromhttp("http://192.168.44.12:9999/metadata/schema/v1/fields/connection_record_log").toString());
+ }
+ /**
+ * 判断字段是否需要保留
+ *
+ * @param message 单个field-json
+ * @return true or false
+ */
+ private static boolean checkKeepField(String message) {
+ boolean isKeepField = true;
+ boolean isHiveDoc = JSON.parseObject(message).containsKey("doc");
+ if (isHiveDoc) {
+ boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility");
+ if (isHiveVi) {
+ String visibility = JsonPath.read(message, "$.doc.visibility").toString();
+ if (FlowWriteConfig.VISIBILITY.equals(visibility)) {
+ isKeepField = false;
+ }
+ }
+ }
+ return isKeepField;
+ }
+
/**
* 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList)
*
diff --git a/FlumeDynamicInterceptor/src/test/java/com/zdjizhi/flume/JsonTest.java b/FlumeDynamicInterceptor/src/test/java/com/zdjizhi/flume/JsonTest.java
new file mode 100644
index 0000000..360b317
--- /dev/null
+++ b/FlumeDynamicInterceptor/src/test/java/com/zdjizhi/flume/JsonTest.java
@@ -0,0 +1,317 @@
+package com.zdjizhi.flume;
+
+import com.alibaba.fastjson.JSONObject;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.flume.interceptor.FlumeDynamicApp;
+import com.zdjizhi.flume.interceptor.common.FlowWriteConfig;
+import com.zdjizhi.flume.interceptor.utils.hbase.HBaseUtils;
+import com.zdjizhi.flume.interceptor.utils.json.JsonParseUtil;
+import com.zdjizhi.utils.Encodes;
+import com.zdjizhi.utils.FormatUtils;
+import com.zdjizhi.utils.IpLookup;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.flume
+ * @Description:
+ * @date 2020/9/229:38
+ */
+public class JsonTest {
+ private static Logger logger = Logger.getLogger(JsonTest.class);
+
+ private static IpLookup ipLookup;
+ private static FormatUtils formatUtils;
+ private static String schemaHttpUrl = "http://192.168.44.12:9999/metadata/schema/v1/fields/connection_record_log";
+ private static String uidZookeeperIp = "192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181";
+ private static long dataCenterIdNum = 1L;
+ private static String ipDatPath = "D:\\workerSpace\\K18-Phase2\\tsgSpace\\dat\\";
+ private static String hbaseZookeeperIp = "192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181";
+ private static String hbaseTableName = "subscriber_info";
+
+ private static HashMap map;
+ private static Object mapObject;
+ private static ArrayList jobList;
+
+
+ public static void initialize() {
+ map = JsonParseUtil.getMapFromhttp(schemaHttpUrl);
+ mapObject = JsonParseUtil.generateObject(map);
+ jobList = JsonParseUtil.getJobListFromHttp(schemaHttpUrl);
+ //载入工具类
+ formatUtils = new FormatUtils.Builder(false).build();
+ //载入定位库
+ ipLookup = new IpLookup.Builder(false)
+ /**
+ * v1.0.2-com.zdjizhi.galaxy
+ */
+// .loadDataFileV4(ipDatPath + "Kazakhstan.mmdb")
+// .loadDataFileV6(ipDatPath + "Kazakhstan.mmdb")
+// .loadAsnDataFileV4(ipDatPath + "asn_v4.mmdb")
+// .loadAsnDataFileV6(ipDatPath + "asn_v6.mmdb")
+
+ /**
+ * v1.0.3-com.zdjizhi.galaxy
+ */
+ .loadDataFileV4(ipDatPath + "ip_v4.mmdb")
+ .loadDataFileV6(ipDatPath + "ip_v6.mmdb")
+ .loadDataFilePrivateV4(ipDatPath + "ip_private_v4.mmdb")
+ .loadDataFilePrivateV6(ipDatPath + "ip_private_v6.mmdb")
+ .loadAsnDataFile(ipDatPath + "asn_v4.mmdb")
+ .loadAsnDataFileV6(ipDatPath + "asn_v6.mmdb")
+ .build();
+ }
+
+
+
+ public static void main(String[] args) {
+ initialize();
+ String json = "{\"common_client_ip\":\"192.168.40.2\",\"common_server_ip\":\"192.168.40.1\",\"common_direction\":0,\"mail_attachment_name\":\"dGVzdA==\",\"common_device_tag\":{\"tag_sets\": [{\"tag\": \"data_center\", \"value\": \"北京/朝阳/华严北里/甲22号\"}, {\"tag\": \"isp\", \"value\": \"电信\"}]}}";
+ System.out.println(dealCommonMessage(json));
+ }
+
+ private static String dealCommonMessage(String message) {
+ Object object = JSONObject.parseObject(message, mapObject.getClass());
+
+ try {
+ for (String[] strings : jobList) {
+ Object name = JsonParseUtil.getValue(object, strings[0]);
+ String appendToKeyName = strings[1];
+ String functionName = strings[2];
+// Object param = null;
+// if (strings[3] != null) {
+// param = JsonParseUtil.getValue(object, strings[3]);
+// }
+
+ String param = strings[3];
+
+ switch (functionName) {
+ case "current_timestamp":
+// if (Long.parseLong(JsonParseUtil.getValue(object, appendToKeyName)) == 0L) {
+ JsonParseUtil.setValue(object, appendToKeyName, getCurrentTime());
+// }
+ break;
+ case "snowflake_id":
+ JsonParseUtil.setValue(object, appendToKeyName, formatUtils.getSnowflakeId(uidZookeeperIp, dataCenterIdNum));
+ break;
+ case "flattenSpec":
+ if (name != null && StringUtil.isNotBlank(param)) {
+ JsonParseUtil.setValue(object, appendToKeyName, flattenSpec(name.toString(), isJsonValue(object, param)));
+ }
+ break;
+ case "geo_ip_detail":
+ if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) {
+ JsonParseUtil.setValue(object, appendToKeyName, getGeoIpDetail(name.toString()));
+ }
+ break;
+ case "geo_asn":
+ if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) {
+ JsonParseUtil.setValue(object, appendToKeyName, getGeoAsn(name.toString()));
+ }
+ break;
+ case "if":
+ if (param != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, condition(object, param));
+ }
+ break;
+ case "get_value":
+ if (name != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, name.toString());
+ }
+ break;
+ case "radius_match":
+ if (name != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, radiusMatch(name.toString(), hbaseZookeeperIp, hbaseTableName));
+ }
+ break;
+ case "geo_ip_country":
+ if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) {
+ JsonParseUtil.setValue(object, appendToKeyName, getGeoIpCountry(name.toString()));
+ }
+ break;
+ case "decode_of_base64":
+ if (name != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, decodeBase64(name.toString(), isJsonValue(object, param)));
+ }
+ break;
+ case "sub_domain":
+ if (name != null) {
+ Object appendTo = JsonParseUtil.getValue(object, appendToKeyName);
+ if (appendTo == null || StringUtil.isBlank(appendTo.toString())) {
+ JsonParseUtil.setValue(object, appendToKeyName, replaceGetTopDomain(FormatUtils.getTopPrivateDomain(name.toString())));
+ }
+ }
+ break;
+ default:
+ }
+ }
+
+ return JSONObject.toJSONString(object);
+
+ } catch (Exception e) {
+ logger.error("FlumeDynamicApp dealCommonMessage is error===>{" + e + "}<===");
+ e.printStackTrace();
+// return "";
+ return message;//返回原数据
+ }
+ }
+
+ /**
+ * 有host根据host获取域名,有sni通过sni获取域名
+ * 这里是直接根据传入的获取域名
+ *
+ * @param url
+ * @return 顶级域名
+ */
+ private static String replaceGetTopDomain(String url) {
+// return url.replaceAll("InternetDomainName\\{name=", "").replaceAll("\\}", "");
+ try {
+ return FormatUtils.getTopPrivateDomain(url);
+ } catch (StringIndexOutOfBoundsException outException) {
+ logger.error("解析顶级域名异常,异常域名:" + url, outException);
+ return "";
+ }
+ }
+
+ /**
+ * 生成当前时间戳的操作
+ */
+ private static int getCurrentTime() {
+ return (int) (System.currentTimeMillis() / 1000);
+ }
+
+ /**
+ * 根据clientIp获取location信息
+ *
+ * @param ip
+ * @return
+ */
+ private static String getGeoIpDetail(String ip) {
+ return ipLookup.cityLookupDetail(ip);
+ }
+
+ /**
+ * 根据ip获取asn信息
+ *
+ * @param ip
+ * @return
+ */
+ private static String getGeoAsn(String ip) {
+// return ipLookup.asnLookup(ip, true);//v1.0.2-com.zdjizhi.galaxy
+ return ipLookup.asnLookup(ip);//v1.0.3-com.zdjizhi.galaxy
+ }
+
+ /**
+ * 根据ip获取country信息
+ *
+ * @param ip
+ * @return
+ */
+ private static String getGeoIpCountry(String ip) {
+ return ipLookup.countryLookup(ip);
+ }
+
+ /**
+ * radius借助hbase补齐
+ *
+ * @param ip
+ * @return
+ */
+ private static String radiusMatch(String ip, String hbaseZookeeper, String hbaseTable) {
+ return HBaseUtils.getAccount(ip, hbaseZookeeper, hbaseTable);
+ }
+
+ /**
+ * 根据编码解码base64
+ *
+ * @param message base64
+ * @param charset 编码
+ * @return 解码字符串
+ */
+ private static String decodeBase64(String message, Object charset) {
+ String result = "";
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ if (charset != null) {
+ result = Encodes.decodeBase64String(message, charset.toString());
+ } else {
+ result = Encodes.decodeBase64String(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("解析 Base64 异常,异常信息:" + e);
+ }
+ return result;
+ }
+
+ /**
+ * 根据表达式解析json
+ *
+ * @param message json
+ * @param expr 解析表达式
+ * @return 解析结果
+ */
+ private static String flattenSpec(String message, String expr) {
+ String flattenResult = "";
+ try {
+ ArrayList read = JsonPath.parse(message).read(expr);
+ flattenResult = read.get(0);
+ } catch (ClassCastException | InvalidPathException e) {
+ logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误", e);
+ }
+ return flattenResult;
+ }
+
+ /**
+ * 判断是否为日志字段,是则返回对应value,否则返回原始字符串
+ *
+ * @param object 内存实体类
+ * @param param 字段名/普通字符串
+ * @return JSON.Value or String
+ */
+ private static String isJsonValue(Object object, String param) {
+ if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
+ Object value = JsonParseUtil.getValue(object, param.substring(2));
+ if (value != null) {
+ return value.toString();
+ } else {
+ return "";
+ }
+ } else {
+ return param;
+ }
+ }
+
+ /**
+ * IF函数实现,解析日志构建三目运算
+ *
+ * @param object 内存实体类
+ * @param ifParam 字段名/普通字符串
+ * @return resultA or resultB or ""
+ */
+ private static String condition(Object object, String ifParam) {
+ String result = "";
+ try {
+ String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
+ String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
+ String direction = isJsonValue(object, norms[0]);
+ if (StringUtil.isNotBlank(direction)) {
+ if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
+ String resultA = isJsonValue(object, split[1]);
+ String resultB = isJsonValue(object, split[2]);
+ result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB;
+ }
+ }
+ } catch (Exception e) {
+ logger.error("IF 函数执行异常,异常信息:" + e);
+ e.printStackTrace();
+ }
+ return result;
+ }
+
+}
diff --git a/FlumeRadiusOnOffInterceptor/pom.xml b/FlumeRadiusOnOffInterceptor/pom.xml
index 4517ae3..f3f2523 100644
--- a/FlumeRadiusOnOffInterceptor/pom.xml
+++ b/FlumeRadiusOnOffInterceptor/pom.xml
@@ -5,7 +5,7 @@
dynamic_complement
com.zdjizhi
- 1.0
+ v3.20.09.22
4.0.0
@@ -17,6 +17,19 @@
Team Nexus Repository
http://192.168.40.125:8099/content/groups/public
+
+
+ maven-ali
+ http://maven.aliyun.com/nexus/content/groups/public/
+
+ true
+
+
+ true
+ always
+ fail
+
+
@@ -114,7 +127,7 @@
com.alibaba
fastjson
- 1.2.47
+ 1.2.70
@@ -126,7 +139,7 @@
com.zdjizhi
galaxy
- 1.0.2
+ 1.0.3
slf4j-log4j12
diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java
index 7407290..d2ce4a4 100644
--- a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java
+++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java
@@ -18,15 +18,16 @@ public class OnOffConfig {
*/
public static final int STOP_BILLING = 2;
- /**
- * 计费请求报文类型
- */
- public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type";
/**
* 报文类型
*/
public static final String RADIUS_PACKET_TYPE = "radius_packet_type";
+ /**
+ * 计费请求报文类型
+ */
+ public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type";
+
/**
* 发送计费请求报文时间戳
*/
diff --git a/FlumeSubscriberInterceptor/pom.xml b/FlumeSubscriberInterceptor/pom.xml
index a359d60..d7f2c77 100644
--- a/FlumeSubscriberInterceptor/pom.xml
+++ b/FlumeSubscriberInterceptor/pom.xml
@@ -5,7 +5,7 @@
dynamic_complement
com.zdjizhi
- 1.0
+ v3.20.09.22
4.0.0
@@ -18,6 +18,19 @@
www.ebi.ac.uk
http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/
+
+
+ maven-ali
+ http://maven.aliyun.com/nexus/content/groups/public/
+
+ true
+
+
+ true
+ always
+ fail
+
+
@@ -115,7 +128,7 @@
com.alibaba
fastjson
- 1.2.47
+ 1.2.70