diff --git a/FlumeDynamicInterceptor/dependency-reduced-pom.xml b/FlumeDynamicInterceptor/dependency-reduced-pom.xml
index ff88a66..1410d46 100644
--- a/FlumeDynamicInterceptor/dependency-reduced-pom.xml
+++ b/FlumeDynamicInterceptor/dependency-reduced-pom.xml
@@ -3,7 +3,7 @@
dynamic_complement
com.zdjizhi
- 1.0
+ v3.21.01.18
4.0.0
FlumeDynamicInterceptor
@@ -87,6 +87,15 @@
www.ebi.ac.uk
http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/
+
+
+
+ always
+ fail
+
+ maven-ali
+ http://maven.aliyun.com/nexus/content/groups/public/
+
@@ -125,6 +134,18 @@
+
+ junit
+ junit
+ 4.12
+ test
+
+
+ hamcrest-core
+ org.hamcrest
+
+
+
2.2.1
diff --git a/FlumeDynamicInterceptor/pom.xml b/FlumeDynamicInterceptor/pom.xml
index 5d6bae9..3183510 100644
--- a/FlumeDynamicInterceptor/pom.xml
+++ b/FlumeDynamicInterceptor/pom.xml
@@ -5,7 +5,7 @@
dynamic_complement
com.zdjizhi
- 1.0
+ v3.21.04.23
4.0.0
@@ -18,20 +18,12 @@
http://192.168.40.125:8099/content/groups/public
-
- ebi
- www.ebi.ac.uk
- http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/
-
UTF-8
1.9.0
- 2.2.1
-
-
-
+ 2.2.3
@@ -41,11 +33,11 @@
maven-shade-plugin
2.4.1
- true
+ false
- package
+ install
shade
@@ -70,6 +62,21 @@
+
+
+ io.github.zlika
+ reproducible-build-maven-plugin
+ 0.2
+
+
+
+ strip-jar
+
+ install
+
+
+
+
org.codehaus.mojo
exec-maven-plugin
@@ -108,13 +115,6 @@
false
-
-
-
-
-
-
-
@@ -129,7 +129,6 @@
com.zdjizhi
galaxy
-
1.0.3
@@ -150,30 +149,16 @@
com.alibaba
fastjson
- 1.2.47
+ 1.2.70
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+ junit
+ junit
+ 4.12
+ test
+
cglib
@@ -199,94 +184,34 @@
- org.apache.hbase
- hbase-server
- ${hbase.version}
-
-
- slf4j-log4j12
- org.slf4j
-
-
- log4j-over-slf4j
- org.slf4j
-
-
+ org.apache.httpcomponents
+ httpclient
+ 4.5.2
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+ org.apache.httpcomponents
+ httpcore
+ 4.4.1
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+ com.jayway.jsonpath
+ json-path
+ 2.4.0
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+ io.prometheus
+ simpleclient_pushgateway
+ 0.9.0
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+ cn.hutool
+ hutool-all
+ 5.5.2
+
diff --git a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeDynamicApp.java b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeDynamicApp.java
index 55423ec..e379612 100644
--- a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeDynamicApp.java
+++ b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeDynamicApp.java
@@ -1,8 +1,13 @@
package com.zdjizhi.flume.interceptor;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Preconditions;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.flume.interceptor.common.FlowWriteConfig;
+import com.zdjizhi.flume.interceptor.utils.app.AppUtils;
import com.zdjizhi.flume.interceptor.utils.hbase.HBaseUtils;
import com.zdjizhi.flume.interceptor.utils.json.JsonParseUtil;
import com.zdjizhi.utils.Encodes;
@@ -20,17 +25,21 @@ import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* @author qidaijie
*/
public class FlumeDynamicApp implements Interceptor {
- private static Logger logger = Logger.getLogger(FlumeDynamicApp.class);
+ private static final Log logger = LogFactory.get();
+ private static final Pattern PATTERN = Pattern.compile("[0-9]*");
private static IpLookup ipLookup;
private static FormatUtils formatUtils;
private String schemaHttpUrl;
+ private String appIdHttpUrl;
private String uidZookeeperIp;
private long dataCenterIdNum;
private String ipDatPath;
@@ -50,17 +59,6 @@ public class FlumeDynamicApp implements Interceptor {
formatUtils = new FormatUtils.Builder(false).build();
//载入定位库
ipLookup = new IpLookup.Builder(false)
- /**
- * v1.0.2-com.zdjizhi.galaxy
- */
-// .loadDataFileV4(ipDatPath + "Kazakhstan.mmdb")
-// .loadDataFileV6(ipDatPath + "Kazakhstan.mmdb")
-// .loadAsnDataFileV4(ipDatPath + "asn_v4.mmdb")
-// .loadAsnDataFileV6(ipDatPath + "asn_v6.mmdb")
-
- /**
- * v1.0.3-com.zdjizhi.galaxy
- */
.loadDataFileV4(ipDatPath + "ip_v4.mmdb")
.loadDataFileV6(ipDatPath + "ip_v6.mmdb")
.loadDataFilePrivateV4(ipDatPath + "ip_private_v4.mmdb")
@@ -70,8 +68,9 @@ public class FlumeDynamicApp implements Interceptor {
.build();
}
- public FlumeDynamicApp(String schemaHttpUrl, String zookeeperIp, long dataCenterIdNum, String ipDatPath, String hbaseZookeeperIp, String hbaseTableName) {
+ public FlumeDynamicApp(String schemaHttpUrl, String appIdHttpUrl, String zookeeperIp, long dataCenterIdNum, String ipDatPath, String hbaseZookeeperIp, String hbaseTableName) {
this.schemaHttpUrl = schemaHttpUrl;
+ this.appIdHttpUrl = appIdHttpUrl;
this.uidZookeeperIp = zookeeperIp;
this.dataCenterIdNum = dataCenterIdNum;
this.ipDatPath = ipDatPath;
@@ -133,18 +132,29 @@ public class FlumeDynamicApp implements Interceptor {
Object name = JsonParseUtil.getValue(object, strings[0]);
String appendToKeyName = strings[1];
String functionName = strings[2];
- Object param = null;
- if (strings[3] != null) {
- param = JsonParseUtil.getValue(object, strings[3]);
- }
+ Object appendTo = JsonParseUtil.getValue(object, appendToKeyName);
+
+ String param = strings[3];
switch (functionName) {
case "current_timestamp":
- JsonParseUtil.setValue(object, appendToKeyName, getCurrentTime());
+ if ((long) appendTo == 0L) {
+ JsonParseUtil.setValue(object, appendToKeyName, getCurrentTime());
+ }
break;
case "snowflake_id":
JsonParseUtil.setValue(object, appendToKeyName, formatUtils.getSnowflakeId(uidZookeeperIp, dataCenterIdNum));
break;
+ case "set_value":
+ if (name != null && param != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, setValue(param));
+ }
+ break;
+ case "flattenSpec":
+ if (name != null && StringUtil.isNotBlank(param)) {
+ JsonParseUtil.setValue(object, appendToKeyName, flattenSpec(name.toString(), isJsonValue(object, param)));
+ }
+ break;
case "geo_ip_detail":
if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) {
JsonParseUtil.setValue(object, appendToKeyName, getGeoIpDetail(name.toString()));
@@ -155,9 +165,14 @@ public class FlumeDynamicApp implements Interceptor {
JsonParseUtil.setValue(object, appendToKeyName, getGeoAsn(name.toString()));
}
break;
+ case "if":
+ if (param != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, condition(object, param));
+ }
+ break;
case "get_value":
if (name != null) {
- JsonParseUtil.setValue(object, appendToKeyName, name.toString());
+ JsonParseUtil.setValue(object, appendToKeyName, name);
}
break;
case "radius_match":
@@ -165,6 +180,11 @@ public class FlumeDynamicApp implements Interceptor {
JsonParseUtil.setValue(object, appendToKeyName, radiusMatch(name.toString(), hbaseZookeeperIp, hbaseTableName));
}
break;
+ case "app_match":
+ if ((int) name != 0 && appendTo == null) {
+ JsonParseUtil.setValue(object, appendToKeyName, appMatch(appIdHttpUrl, Integer.parseInt(name.toString())));
+ }
+ break;
case "geo_ip_country":
if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) {
JsonParseUtil.setValue(object, appendToKeyName, getGeoIpCountry(name.toString()));
@@ -172,19 +192,12 @@ public class FlumeDynamicApp implements Interceptor {
break;
case "decode_of_base64":
if (name != null) {
- if (param != null) {
- JsonParseUtil.setValue(object, appendToKeyName, Encodes.decodeBase64String(name.toString(), param.toString()));
- } else {
- JsonParseUtil.setValue(object, appendToKeyName, Encodes.decodeBase64String(name.toString(), FlowWriteConfig.MAIL_DEFAULT_CHARSET));
- }
+ JsonParseUtil.setValue(object, appendToKeyName, decodeBase64(name.toString(), isJsonValue(object, param)));
}
break;
case "sub_domain":
- if (name != null) {
- Object appendTo = JsonParseUtil.getValue(object, appendToKeyName);
- if (appendTo == null || StringUtil.isBlank(appendTo.toString())) {
- JsonParseUtil.setValue(object, appendToKeyName, replaceGetTopDomain(FormatUtils.getTopPrivateDomain(name.toString())));
- }
+ if (appendTo == null && name != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, replaceGetTopDomain(name.toString()));
}
break;
default:
@@ -194,10 +207,9 @@ public class FlumeDynamicApp implements Interceptor {
return JSONObject.toJSONString(object);
} catch (Exception e) {
- logger.error("FlumeDynamicApp dealCommonMessage is error===>{" + e + "}<===");
- e.printStackTrace();
-// return "";
- return message;//返回原数据
+ logger.error("FlumeDynamicApp dealCommonMessage is error===>", e);
+ //返回原数据
+ return message;
}
}
@@ -208,15 +220,20 @@ public class FlumeDynamicApp implements Interceptor {
* @param url
* @return 顶级域名
*/
- private String replaceGetTopDomain(String url) {
- return url.replaceAll("InternetDomainName\\{name=", "").replaceAll("\\}", "");
+ private static String replaceGetTopDomain(String url) {
+ try {
+ return FormatUtils.getTopPrivateDomain(url).replaceAll("InternetDomainName\\{name=", "").replaceAll("\\}", "");
+ } catch (StringIndexOutOfBoundsException outException) {
+ logger.error("解析顶级域名异常,异常域名:" + url, outException);
+ return "";
+ }
}
/**
* 生成当前时间戳的操作
*/
- private int getCurrentTime() {
- return (int) (System.currentTimeMillis() / 1000);
+ private static long getCurrentTime() {
+ return (System.currentTimeMillis() / 1000);
}
/**
@@ -225,7 +242,7 @@ public class FlumeDynamicApp implements Interceptor {
* @param ip
* @return
*/
- private String getGeoIpDetail(String ip) {
+ private static String getGeoIpDetail(String ip) {
return ipLookup.cityLookupDetail(ip);
}
@@ -235,7 +252,7 @@ public class FlumeDynamicApp implements Interceptor {
* @param ip
* @return
*/
- private String getGeoAsn(String ip) {
+ private static String getGeoAsn(String ip) {
// return ipLookup.asnLookup(ip, true);//v1.0.2-com.zdjizhi.galaxy
return ipLookup.asnLookup(ip);//v1.0.3-com.zdjizhi.galaxy
}
@@ -246,7 +263,7 @@ public class FlumeDynamicApp implements Interceptor {
* @param ip
* @return
*/
- private String getGeoIpCountry(String ip) {
+ private static String getGeoIpCountry(String ip) {
return ipLookup.countryLookup(ip);
}
@@ -260,8 +277,137 @@ public class FlumeDynamicApp implements Interceptor {
return HBaseUtils.getAccount(ip, hbaseZookeeper, hbaseTable);
}
+ /**
+ * appId与缓存中对应关系补全appName
+ *
+ * @param appId id
+ * @return appName
+ */
+ private static String appMatch(String appIdHttpUrl, int appId) {
+ String appName = AppUtils.getAppName(appIdHttpUrl, appId);
+ if (StringUtil.isBlank(appName)) {
+ logger.warn("AppMap get appName is null, ID is :{}", appId);
+ }
+ return appName;
+ }
+
+ /**
+ * 根据编码解码base64
+ *
+ * @param message base64
+ * @param charset 编码
+ * @return 解码字符串
+ */
+ private static String decodeBase64(String message, Object charset) {
+ String result = "";
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ if (charset != null) {
+ result = Encodes.decodeBase64String(message, charset.toString());
+ } else {
+ result = Encodes.decodeBase64String(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("解析 Base64 异常,异常信息:" + e);
+ }
+ return result;
+ }
+
+ /**
+ * 根据表达式解析json
+ *
+ * @param message json
+ * @param expr 解析表达式
+ * @return 解析结果
+ */
+ private static String flattenSpec(String message, String expr) {
+ String flattenResult = "";
+ try {
+ ArrayList read = JsonPath.parse(message).read(expr);
+ flattenResult = read.get(0);
+ } catch (ClassCastException | InvalidPathException e) {
+ logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误", e);
+ }
+ return flattenResult;
+ }
+
+ /**
+ * 判断是否为日志字段,是则返回对应value,否则返回原始字符串
+ *
+ * @param object 内存实体类
+ * @param param 字段名/普通字符串
+ * @return JSON.Value or String
+ */
+ private static String isJsonValue(Object object, String param) {
+ if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
+ Object value = JsonParseUtil.getValue(object, param.substring(2));
+ if (value != null) {
+ return value.toString();
+ } else {
+ return "";
+ }
+ } else {
+ return param;
+ }
+ }
+
+ /**
+ * IF函数实现,解析日志构建三目运算
+ *
+ * @param object 内存实体类
+ * @param ifParam 字段名/普通字符串
+ * @return resultA or resultB or ""
+ */
+ private static Object condition(Object object, String ifParam) {
+ try {
+ String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
+ String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
+ String direction = isJsonValue(object, norms[0]);
+ if (StringUtil.isNotBlank(direction)) {
+ if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
+ String resultA = isJsonValue(object, split[1]);
+ String resultB = isJsonValue(object, split[2]);
+ String result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB;
+ Matcher isNum = PATTERN.matcher(result);
+ if (isNum.matches()) {
+ return Long.parseLong(result);
+ } else {
+ return result;
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("IF 函数执行异常,异常信息:" + e);
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ /**
+ * 设置固定值函数 若为数字则转为long返回
+ *
+ * @param param 默认值
+ * @return 返回数字或字符串
+ */
+ static Object setValue(String param) {
+ try {
+ Matcher isNum = PATTERN.matcher(param);
+ if (isNum.matches()) {
+ return Long.parseLong(param);
+ } else {
+ return param;
+ }
+ } catch (Exception e) {
+ logger.error("SetValue 函数异常,异常信息:" + e);
+ e.printStackTrace();
+ }
+ return null;
+ }
+
public static class FlumeDynamicAppBuilder implements Interceptor.Builder {
private String schemaHttpUrl;
+ private String appIdHttpUrl;
private String uidZookeeperIp;
private long dataCenterIdNum;
private String ipDatPath;
@@ -272,6 +418,7 @@ public class FlumeDynamicApp implements Interceptor {
@Override
public Interceptor build() {
return new FlumeDynamicApp(this.schemaHttpUrl,
+ this.appIdHttpUrl,
this.uidZookeeperIp, this.dataCenterIdNum,
this.ipDatPath,
this.hbaseZookeeperIp, this.hbaseTableName);
@@ -289,6 +436,16 @@ public class FlumeDynamicApp implements Interceptor {
logger.error("FlumeDynamicApp Get schemaHttpUrl is error : " + e);
}
+ try {
+ this.appIdHttpUrl = context.getString("appIdHttpUrl", "");
+ Preconditions.checkNotNull("".equals(appIdHttpUrl), "appIdHttpUrl must be set!!");
+ logger.info("FlumeDynamicApp Read appIdHttpUrl from configuration : " + appIdHttpUrl);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("FlumeDynamicApp appIdHttpUrl invalid", e);
+ } catch (Exception e) {
+ logger.error("FlumeDynamicApp Get appIdHttpUrl is error : " + e);
+ }
+
try {
this.uidZookeeperIp = context.getString("uidZookeeperIp", "");
Preconditions.checkNotNull("".equals(uidZookeeperIp), "uidZookeeperIp must be set!!");
diff --git a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/FlowWriteConfig.java b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/FlowWriteConfig.java
index 958197a..c31ea2c 100644
--- a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/FlowWriteConfig.java
+++ b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/FlowWriteConfig.java
@@ -7,6 +7,11 @@ import com.zdjizhi.flume.interceptor.utils.system.FlowWriteConfigurations;
* @author Administrator
*/
public class FlowWriteConfig {
+ public static final String VISIBILITY = "disabled";
+ public static final int IF_PARAM_LENGTH = 3;
+ public static final String FORMAT_SPLITTER = ",";
+ public static final String IS_JSON_KEY_TAG = "$.";
+ public static final String IF_CONDITION_SPLITTER = "=";
// public static final String SEGMENTATION = ",";
diff --git a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/app/AppUtils.java b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/app/AppUtils.java
new file mode 100644
index 0000000..9035cf7
--- /dev/null
+++ b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/app/AppUtils.java
@@ -0,0 +1,116 @@
+package com.zdjizhi.flume.interceptor.utils.app;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.flume.interceptor.utils.http.HttpClientUtil;
+import com.zdjizhi.utils.StringUtil;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * AppId 工具类
+ *
+ * @author qidaijie
+ */
+
+public class AppUtils {
+ private static final Log logger = LogFactory.get();
+ private static Map appIdMap = new ConcurrentHashMap<>(128);
+ private static AppUtils appUtils;
+ private static String appIdHttpUrl;
+
+ private static void getAppInstance(String url) {
+ appUtils = new AppUtils(url);
+ }
+
+
+ /**
+ * 构造函数-新
+ */
+ private AppUtils(String url) {
+ appIdHttpUrl = url;
+ timestampsFilter();
+ //定时更新
+ updateAppIdCache();
+ }
+
+ /**
+ * 更新变量
+ */
+ private static void change() {
+ timestampsFilter();
+ }
+
+
+ /**
+ * 获取变更内容
+ */
+ private static void timestampsFilter() {
+ try {
+ Long begin = System.currentTimeMillis();
+ String schema = HttpClientUtil.requestByGetMethod(appIdHttpUrl);
+ if (StringUtil.isNotBlank(schema)) {
+ String data = JSONObject.parseObject(schema).getString("data");
+ JSONArray objects = JSONArray.parseArray(data);
+ for (Object object : objects) {
+ JSONArray jsonArray = JSONArray.parseArray(object.toString());
+ int key = jsonArray.getInteger(0);
+ String value = jsonArray.getString(1);
+ if (appIdMap.containsKey(key)) {
+ if (!value.equals(appIdMap.get(key))) {
+ appIdMap.put(key, value);
+ }
+ } else {
+ appIdMap.put(key, value);
+ }
+ }
+ logger.warn("Updating the correspondence takes time:" + (System.currentTimeMillis() - begin));
+ logger.warn("Pull the length of the interface data:[" + objects.size() + "]");
+ }
+ } catch (RuntimeException e) {
+ logger.error("Update cache app-id failed, exception:" + e);
+ }
+ }
+
+
+ /**
+ * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
+ */
+ private void updateAppIdCache() {
+ ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
+ executorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ timestampsFilter();
+ } catch (RuntimeException e) {
+ logger.error("AppUtils update AppCache is error===>{" + e + "}<===");
+ }
+ }
+ }, 1, 300, TimeUnit.SECONDS);
+ }
+
+
+ /**
+ * 获取 appName
+ *
+ * @param appId app_id
+ * @return account
+ */
+ public static String getAppName(String url, int appId) {
+
+ if (appUtils == null) {
+ getAppInstance(url);
+ }
+
+ return appIdMap.get(appId);
+
+ }
+
+}
diff --git a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/http/HttpClientUtil.java b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/http/HttpClientUtil.java
index d8d6fd9..f4d0d44 100644
--- a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/http/HttpClientUtil.java
+++ b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/http/HttpClientUtil.java
@@ -1,6 +1,7 @@
package com.zdjizhi.flume.interceptor.utils.http;
-import com.alibaba.fastjson.JSON;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@@ -13,50 +14,62 @@ import java.io.InputStreamReader;
/**
* 获取网关schema工具类
+ * @author qidaijie
*/
public class HttpClientUtil {
- public static String requestByGetMethod(String s) {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * 请求网关获取schema
+ *
+ * @param http 网关url
+ * @return schema
+ */
+ public static String requestByGetMethod(String http) {
CloseableHttpClient httpClient = HttpClients.createDefault();
- StringBuilder entityStringBuilder = null;
+ StringBuilder entityStringBuilder;
+
+ HttpGet get = new HttpGet(http);
+ BufferedReader bufferedReader = null;
+ CloseableHttpResponse httpResponse = null;
try {
- HttpGet get = new HttpGet(s);
- CloseableHttpResponse httpResponse = null;
httpResponse = httpClient.execute(get);
- try {
- HttpEntity entity = httpResponse.getEntity();
- entityStringBuilder = new StringBuilder();
- if (null != entity) {
- BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
- String line = null;
- while ((line = bufferedReader.readLine()) != null) {
- entityStringBuilder.append(line);
+ HttpEntity entity = httpResponse.getEntity();
+ entityStringBuilder = new StringBuilder();
+ if (null != entity) {
+ bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
+ int intC;
+ while ((intC = bufferedReader.read()) != -1) {
+ char c = (char) intC;
+ if (c == '\n') {
+ break;
}
+ entityStringBuilder.append(c);
}
- } finally {
- httpResponse.close();
+
+ return entityStringBuilder.toString();
}
- } catch (Exception e) {
- e.printStackTrace();
+ } catch (IOException e) {
+ logger.error("Get Schema from Query engine ERROR! Exception message is:" + e);
} finally {
- try {
- if (httpClient != null) {
+ if (httpClient != null) {
+ try {
httpClient.close();
+ } catch (IOException e) {
+ logger.error("Close HTTP Client ERROR! Exception messgae is:" + e);
}
- } catch (IOException e) {
- e.printStackTrace();
+ }
+ if (httpResponse != null) {
+ try {
+ httpResponse.close();
+ } catch (IOException e) {
+ logger.error("Close httpResponse ERROR! Exception messgae is:" + e);
+ }
+ }
+ if (bufferedReader != null) {
+ org.apache.commons.io.IOUtils.closeQuietly(bufferedReader);
}
}
- return entityStringBuilder.toString();
+ return "";
}
-
-
-// public static void main(String[] args) {
-//// String s = HttpClientUtil.requestByGetMethod("http://192.168.40.224:9999/metadata/schema/v1/fields/security_event_log");
-//// System.out.println(s);
-//// String schemaHttpRes = HttpClientUtil.requestByGetMethod("http://192.168.40.224:9999/metadata/schema/v1/fields/security_event_log");
-//// String schemaHttpRes = HttpClientUtil.requestByGetMethod("http://192.168.40.151:9999/metadata/schema/v1/fields/security_event_log");
-// String schemaHttpRes = HttpClientUtil.requestByGetMethod("http://192.168.40.224:9999/metadata/schema/v1/fields/connection_record_log");
-// String data = JSON.parseObject(schemaHttpRes).get("data").toString();
-// System.out.println(data);
-// }
}
diff --git a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/json/JsonParseUtil.java b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/json/JsonParseUtil.java
index b7e100b..3ee9f3b 100644
--- a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/json/JsonParseUtil.java
+++ b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/json/JsonParseUtil.java
@@ -3,6 +3,8 @@ package com.zdjizhi.flume.interceptor.utils.json;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.flume.interceptor.common.FlowWriteConfig;
import com.zdjizhi.flume.interceptor.utils.http.HttpClientUtil;
import net.sf.cglib.beans.BeanGenerator;
import net.sf.cglib.beans.BeanMap;
@@ -118,17 +120,39 @@ public class JsonParseUtil {
JSONArray fields = (JSONArray) schemaJson.get("fields");
for (Object field : fields) {
- String name = JSON.parseObject(field.toString()).get("name").toString();
- String type = JSON.parseObject(field.toString()).get("type").toString();
- map.put(name, getClassName(type));
-
+ String filedStr = field.toString();
+ if (checkKeepField(filedStr)) {
+ String name = JsonPath.read(filedStr, "$.name").toString();
+ String type = JsonPath.read(filedStr, "$.type").toString();
+ //组合用来生成实体类的map
+ map.put(name, getClassName(type));
+ }
}
-
-
return map;
}
+ /**
+ * 判断字段是否需要保留
+ *
+ * @param message 单个field-json
+ * @return true or false
+ */
+ private static boolean checkKeepField(String message) {
+ boolean isKeepField = true;
+ boolean isHiveDoc = JSON.parseObject(message).containsKey("doc");
+ if (isHiveDoc) {
+ boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility");
+ if (isHiveVi) {
+ String visibility = JsonPath.read(message, "$.doc.visibility").toString();
+ if (FlowWriteConfig.VISIBILITY.equals(visibility)) {
+ isKeepField = false;
+ }
+ }
+ }
+ return isKeepField;
+ }
+
/**
* 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList)
*
diff --git a/FlumeRadiusOnOffInterceptor/pom.xml b/FlumeRadiusOnOffInterceptor/pom.xml
index 4517ae3..63a761f 100644
--- a/FlumeRadiusOnOffInterceptor/pom.xml
+++ b/FlumeRadiusOnOffInterceptor/pom.xml
@@ -5,7 +5,7 @@
dynamic_complement
com.zdjizhi
- 1.0
+ v3.21.04.23
4.0.0
@@ -17,12 +17,13 @@
Team Nexus Repository
http://192.168.40.125:8099/content/groups/public
+
UTF-8
1.9.0
- 2.2.1
+ 2.2.3
@@ -32,11 +33,11 @@
maven-shade-plugin
2.4.1
- true
+ false
- package
+ install
shade
@@ -61,6 +62,21 @@
+
+
+ io.github.zlika
+ reproducible-build-maven-plugin
+ 0.2
+
+
+
+ strip-jar
+
+ install
+
+
+
+
org.codehaus.mojo
exec-maven-plugin
@@ -114,7 +130,7 @@
com.alibaba
fastjson
- 1.2.47
+ 1.2.70
@@ -126,7 +142,7 @@
com.zdjizhi
galaxy
- 1.0.2
+ 1.0.3
slf4j-log4j12
diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java
index 3eeafc7..13bc36d 100644
--- a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java
+++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java
@@ -25,6 +25,7 @@ public class FlumeOnOffApp implements Interceptor {
@Override
public void initialize() {
+
}
@Override
diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java
index 7407290..d2ce4a4 100644
--- a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java
+++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java
@@ -18,15 +18,16 @@ public class OnOffConfig {
*/
public static final int STOP_BILLING = 2;
- /**
- * 计费请求报文类型
- */
- public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type";
/**
* 报文类型
*/
public static final String RADIUS_PACKET_TYPE = "radius_packet_type";
+ /**
+ * 计费请求报文类型
+ */
+ public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type";
+
/**
* 发送计费请求报文时间戳
*/
diff --git a/FlumeSubscriberInterceptor/pom.xml b/FlumeSubscriberInterceptor/pom.xml
index a359d60..49d353a 100644
--- a/FlumeSubscriberInterceptor/pom.xml
+++ b/FlumeSubscriberInterceptor/pom.xml
@@ -5,25 +5,16 @@
dynamic_complement
com.zdjizhi
- 1.0
+ v3.21.04.23
4.0.0
FlumeSubscriberInterceptor
-
-
-
- ebi
- www.ebi.ac.uk
- http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/
-
-
-
UTF-8
1.9.0
- 2.2.1
+ 2.2.3
@@ -33,11 +24,11 @@
maven-shade-plugin
2.4.1
- true
+ false
- package
+ install
shade
@@ -62,6 +53,21 @@
+
+
+ io.github.zlika
+ reproducible-build-maven-plugin
+ 0.2
+
+
+
+ strip-jar
+
+ install
+
+
+
+
org.codehaus.mojo
exec-maven-plugin
@@ -115,7 +121,7 @@
com.alibaba
fastjson
- 1.2.47
+ 1.2.70
@@ -141,21 +147,6 @@
-
- org.apache.hbase
- hbase-server
- ${hbase.version}
-
-
- slf4j-log4j12
- org.slf4j
-
-
- log4j-over-slf4j
- org.slf4j
-
-
-
diff --git a/pom.xml b/pom.xml
index 023c16b..951e272 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
com.zdjizhi
dynamic_complement
pom
- 1.0
+ v3.21.04.23
FlumeDynamicInterceptor
FlumeSubscriberInterceptor