更新20.11-rc1

This commit is contained in:
qidaijie
2020-10-28 17:16:37 +08:00
parent 24ff96be4c
commit 10bf6b71e9
9 changed files with 534 additions and 167 deletions

View File

@@ -2,6 +2,8 @@ package com.zdjizhi.flume.interceptor;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Preconditions;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.flume.interceptor.common.FlowWriteConfig;
import com.zdjizhi.flume.interceptor.utils.hbase.HBaseUtils;
import com.zdjizhi.flume.interceptor.utils.json.JsonParseUtil;
@@ -133,18 +135,27 @@ public class FlumeDynamicApp implements Interceptor {
Object name = JsonParseUtil.getValue(object, strings[0]);
String appendToKeyName = strings[1];
String functionName = strings[2];
Object param = null;
if (strings[3] != null) {
param = JsonParseUtil.getValue(object, strings[3]);
}
// Object param = null;
// if (strings[3] != null) {
// param = JsonParseUtil.getValue(object, strings[3]);
// }
String param = strings[3];
switch (functionName) {
case "current_timestamp":
// if (Long.parseLong(JsonParseUtil.getValue(object, appendToKeyName)) == 0L) {
JsonParseUtil.setValue(object, appendToKeyName, getCurrentTime());
// }
break;
case "snowflake_id":
JsonParseUtil.setValue(object, appendToKeyName, formatUtils.getSnowflakeId(uidZookeeperIp, dataCenterIdNum));
break;
case "flattenSpec":
if (name != null && StringUtil.isNotBlank(param)) {
JsonParseUtil.setValue(object, appendToKeyName, flattenSpec(name.toString(), isJsonValue(object, param)));
}
break;
case "geo_ip_detail":
if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) {
JsonParseUtil.setValue(object, appendToKeyName, getGeoIpDetail(name.toString()));
@@ -155,6 +166,11 @@ public class FlumeDynamicApp implements Interceptor {
JsonParseUtil.setValue(object, appendToKeyName, getGeoAsn(name.toString()));
}
break;
case "if":
if (param != null) {
JsonParseUtil.setValue(object, appendToKeyName, condition(object, param));
}
break;
case "get_value":
if (name != null) {
JsonParseUtil.setValue(object, appendToKeyName, name.toString());
@@ -172,11 +188,7 @@ public class FlumeDynamicApp implements Interceptor {
break;
case "decode_of_base64":
if (name != null) {
if (param != null) {
JsonParseUtil.setValue(object, appendToKeyName, Encodes.decodeBase64String(name.toString(), param.toString()));
} else {
JsonParseUtil.setValue(object, appendToKeyName, Encodes.decodeBase64String(name.toString(), FlowWriteConfig.MAIL_DEFAULT_CHARSET));
}
JsonParseUtil.setValue(object, appendToKeyName, decodeBase64(name.toString(), isJsonValue(object, param)));
}
break;
case "sub_domain":
@@ -208,14 +220,20 @@ public class FlumeDynamicApp implements Interceptor {
* @param url
* @return 顶级域名
*/
private String replaceGetTopDomain(String url) {
return url.replaceAll("InternetDomainName\\{name=", "").replaceAll("\\}", "");
private static String replaceGetTopDomain(String url) {
// return url.replaceAll("InternetDomainName\\{name=", "").replaceAll("\\}", "");
try {
return FormatUtils.getTopPrivateDomain(url);
} catch (StringIndexOutOfBoundsException outException) {
logger.error("解析顶级域名异常,异常域名:" + url, outException);
return "";
}
}
/**
* 生成当前时间戳的操作
*/
private int getCurrentTime() {
private static int getCurrentTime() {
return (int) (System.currentTimeMillis() / 1000);
}
@@ -225,7 +243,7 @@ public class FlumeDynamicApp implements Interceptor {
* @param ip
* @return
*/
private String getGeoIpDetail(String ip) {
private static String getGeoIpDetail(String ip) {
return ipLookup.cityLookupDetail(ip);
}
@@ -235,7 +253,7 @@ public class FlumeDynamicApp implements Interceptor {
* @param ip
* @return
*/
private String getGeoAsn(String ip) {
private static String getGeoAsn(String ip) {
// return ipLookup.asnLookup(ip, true);//v1.0.2-com.zdjizhi.galaxy
return ipLookup.asnLookup(ip);//v1.0.3-com.zdjizhi.galaxy
}
@@ -246,7 +264,7 @@ public class FlumeDynamicApp implements Interceptor {
* @param ip
* @return
*/
private String getGeoIpCountry(String ip) {
private static String getGeoIpCountry(String ip) {
return ipLookup.countryLookup(ip);
}
@@ -260,6 +278,94 @@ public class FlumeDynamicApp implements Interceptor {
return HBaseUtils.getAccount(ip, hbaseZookeeper, hbaseTable);
}
/**
* 根据编码解码base64
*
* @param message base64
* @param charset 编码
* @return 解码字符串
*/
private static String decodeBase64(String message, Object charset) {
String result = "";
try {
if (StringUtil.isNotBlank(message)) {
if (charset != null) {
result = Encodes.decodeBase64String(message, charset.toString());
} else {
result = Encodes.decodeBase64String(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
}
}
} catch (Exception e) {
logger.error("解析 Base64 异常,异常信息:" + e);
}
return result;
}
/**
* 根据表达式解析json
*
* @param message json
* @param expr 解析表达式
* @return 解析结果
*/
private static String flattenSpec(String message, String expr) {
String flattenResult = "";
try {
ArrayList<String> read = JsonPath.parse(message).read(expr);
flattenResult = read.get(0);
} catch (ClassCastException | InvalidPathException e) {
logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误", e);
}
return flattenResult;
}
/**
* 判断是否为日志字段,是则返回对应value否则返回原始字符串
*
* @param object 内存实体类
* @param param 字段名/普通字符串
* @return JSON.Value or String
*/
private static String isJsonValue(Object object, String param) {
if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
Object value = JsonParseUtil.getValue(object, param.substring(2));
if (value != null) {
return value.toString();
} else {
return "";
}
} else {
return param;
}
}
/**
* IF函数实现解析日志构建三目运算
*
* @param object 内存实体类
* @param ifParam 字段名/普通字符串
* @return resultA or resultB or ""
*/
private static String condition(Object object, String ifParam) {
String result = "";
try {
String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
String direction = isJsonValue(object, norms[0]);
if (StringUtil.isNotBlank(direction)) {
if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
String resultA = isJsonValue(object, split[1]);
String resultB = isJsonValue(object, split[2]);
result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB;
}
}
} catch (Exception e) {
logger.error("IF 函数执行异常,异常信息:" + e);
e.printStackTrace();
}
return result;
}
public static class FlumeDynamicAppBuilder implements Interceptor.Builder {
private String schemaHttpUrl;
private String uidZookeeperIp;

View File

@@ -7,6 +7,11 @@ import com.zdjizhi.flume.interceptor.utils.system.FlowWriteConfigurations;
* @author Administrator
*/
public class FlowWriteConfig {
public static final String VISIBILITY = "disabled";
public static final int IF_PARAM_LENGTH = 3;
public static final String FORMAT_SPLITTER = ",";
public static final String IS_JSON_KEY_TAG = "$.";
public static final String IF_CONDITION_SPLITTER = "=";
// public static final String SEGMENTATION = ",";

View File

@@ -3,6 +3,8 @@ package com.zdjizhi.flume.interceptor.utils.json;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.flume.interceptor.common.FlowWriteConfig;
import com.zdjizhi.flume.interceptor.utils.http.HttpClientUtil;
import net.sf.cglib.beans.BeanGenerator;
import net.sf.cglib.beans.BeanMap;
@@ -118,17 +120,42 @@ public class JsonParseUtil {
JSONArray fields = (JSONArray) schemaJson.get("fields");
for (Object field : fields) {
String name = JSON.parseObject(field.toString()).get("name").toString();
String type = JSON.parseObject(field.toString()).get("type").toString();
map.put(name, getClassName(type));
String filedStr = field.toString();
if (checkKeepField(filedStr)) {
String name = JsonPath.read(filedStr, "$.name").toString();
String type = JsonPath.read(filedStr, "$.type").toString();
//组合用来生成实体类的map
map.put(name, getClassName(type));
}
}
return map;
}
public static void main(String[] args) {
System.out.println(getMapFromhttp("http://192.168.44.12:9999/metadata/schema/v1/fields/connection_record_log").toString());
}
/**
* 判断字段是否需要保留
*
* @param message 单个field-json
* @return true or false
*/
private static boolean checkKeepField(String message) {
boolean isKeepField = true;
boolean isHiveDoc = JSON.parseObject(message).containsKey("doc");
if (isHiveDoc) {
boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility");
if (isHiveVi) {
String visibility = JsonPath.read(message, "$.doc.visibility").toString();
if (FlowWriteConfig.VISIBILITY.equals(visibility)) {
isKeepField = false;
}
}
}
return isKeepField;
}
/**
* 根据http链接获取schema解析之后返回一个任务列表 (useList toList funcList)
*

View File

@@ -0,0 +1,317 @@
package com.zdjizhi.flume;
import com.alibaba.fastjson.JSONObject;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.flume.interceptor.FlumeDynamicApp;
import com.zdjizhi.flume.interceptor.common.FlowWriteConfig;
import com.zdjizhi.flume.interceptor.utils.hbase.HBaseUtils;
import com.zdjizhi.flume.interceptor.utils.json.JsonParseUtil;
import com.zdjizhi.utils.Encodes;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookup;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
/**
* @author qidaijie
* @Package com.zdjizhi.flume
* @Description:
* @date 2020/9/229:38
*/
public class JsonTest {
private static Logger logger = Logger.getLogger(JsonTest.class);
private static IpLookup ipLookup;
private static FormatUtils formatUtils;
private static String schemaHttpUrl = "http://192.168.44.12:9999/metadata/schema/v1/fields/connection_record_log";
private static String uidZookeeperIp = "192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181";
private static long dataCenterIdNum = 1L;
private static String ipDatPath = "D:\\workerSpace\\K18-Phase2\\tsgSpace\\dat\\";
private static String hbaseZookeeperIp = "192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181";
private static String hbaseTableName = "subscriber_info";
private static HashMap<String, Class> map;
private static Object mapObject;
private static ArrayList<String[]> jobList;
public static void initialize() {
map = JsonParseUtil.getMapFromhttp(schemaHttpUrl);
mapObject = JsonParseUtil.generateObject(map);
jobList = JsonParseUtil.getJobListFromHttp(schemaHttpUrl);
//载入工具类
formatUtils = new FormatUtils.Builder(false).build();
//载入定位库
ipLookup = new IpLookup.Builder(false)
/**
* v1.0.2-com.zdjizhi.galaxy
*/
// .loadDataFileV4(ipDatPath + "Kazakhstan.mmdb")
// .loadDataFileV6(ipDatPath + "Kazakhstan.mmdb")
// .loadAsnDataFileV4(ipDatPath + "asn_v4.mmdb")
// .loadAsnDataFileV6(ipDatPath + "asn_v6.mmdb")
/**
* v1.0.3-com.zdjizhi.galaxy
*/
.loadDataFileV4(ipDatPath + "ip_v4.mmdb")
.loadDataFileV6(ipDatPath + "ip_v6.mmdb")
.loadDataFilePrivateV4(ipDatPath + "ip_private_v4.mmdb")
.loadDataFilePrivateV6(ipDatPath + "ip_private_v6.mmdb")
.loadAsnDataFile(ipDatPath + "asn_v4.mmdb")
.loadAsnDataFileV6(ipDatPath + "asn_v6.mmdb")
.build();
}
public static void main(String[] args) {
initialize();
String json = "{\"common_client_ip\":\"192.168.40.2\",\"common_server_ip\":\"192.168.40.1\",\"common_direction\":0,\"mail_attachment_name\":\"dGVzdA==\",\"common_device_tag\":{\"tag_sets\": [{\"tag\": \"data_center\", \"value\": \"北京/朝阳/华严北里/甲22号\"}, {\"tag\": \"isp\", \"value\": \"电信\"}]}}";
System.out.println(dealCommonMessage(json));
}
private static String dealCommonMessage(String message) {
Object object = JSONObject.parseObject(message, mapObject.getClass());
try {
for (String[] strings : jobList) {
Object name = JsonParseUtil.getValue(object, strings[0]);
String appendToKeyName = strings[1];
String functionName = strings[2];
// Object param = null;
// if (strings[3] != null) {
// param = JsonParseUtil.getValue(object, strings[3]);
// }
String param = strings[3];
switch (functionName) {
case "current_timestamp":
// if (Long.parseLong(JsonParseUtil.getValue(object, appendToKeyName)) == 0L) {
JsonParseUtil.setValue(object, appendToKeyName, getCurrentTime());
// }
break;
case "snowflake_id":
JsonParseUtil.setValue(object, appendToKeyName, formatUtils.getSnowflakeId(uidZookeeperIp, dataCenterIdNum));
break;
case "flattenSpec":
if (name != null && StringUtil.isNotBlank(param)) {
JsonParseUtil.setValue(object, appendToKeyName, flattenSpec(name.toString(), isJsonValue(object, param)));
}
break;
case "geo_ip_detail":
if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) {
JsonParseUtil.setValue(object, appendToKeyName, getGeoIpDetail(name.toString()));
}
break;
case "geo_asn":
if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) {
JsonParseUtil.setValue(object, appendToKeyName, getGeoAsn(name.toString()));
}
break;
case "if":
if (param != null) {
JsonParseUtil.setValue(object, appendToKeyName, condition(object, param));
}
break;
case "get_value":
if (name != null) {
JsonParseUtil.setValue(object, appendToKeyName, name.toString());
}
break;
case "radius_match":
if (name != null) {
JsonParseUtil.setValue(object, appendToKeyName, radiusMatch(name.toString(), hbaseZookeeperIp, hbaseTableName));
}
break;
case "geo_ip_country":
if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) {
JsonParseUtil.setValue(object, appendToKeyName, getGeoIpCountry(name.toString()));
}
break;
case "decode_of_base64":
if (name != null) {
JsonParseUtil.setValue(object, appendToKeyName, decodeBase64(name.toString(), isJsonValue(object, param)));
}
break;
case "sub_domain":
if (name != null) {
Object appendTo = JsonParseUtil.getValue(object, appendToKeyName);
if (appendTo == null || StringUtil.isBlank(appendTo.toString())) {
JsonParseUtil.setValue(object, appendToKeyName, replaceGetTopDomain(FormatUtils.getTopPrivateDomain(name.toString())));
}
}
break;
default:
}
}
return JSONObject.toJSONString(object);
} catch (Exception e) {
logger.error("FlumeDynamicApp dealCommonMessage is error===>{" + e + "}<===");
e.printStackTrace();
// return "";
return message;//返回原数据
}
}
/**
* 有host根据host获取域名,有sni通过sni获取域名
* 这里是直接根据传入的获取域名
*
* @param url
* @return 顶级域名
*/
private static String replaceGetTopDomain(String url) {
// return url.replaceAll("InternetDomainName\\{name=", "").replaceAll("\\}", "");
try {
return FormatUtils.getTopPrivateDomain(url);
} catch (StringIndexOutOfBoundsException outException) {
logger.error("解析顶级域名异常,异常域名:" + url, outException);
return "";
}
}
/**
* 生成当前时间戳的操作
*/
private static int getCurrentTime() {
return (int) (System.currentTimeMillis() / 1000);
}
/**
* 根据clientIp获取location信息
*
* @param ip
* @return
*/
private static String getGeoIpDetail(String ip) {
return ipLookup.cityLookupDetail(ip);
}
/**
* 根据ip获取asn信息
*
* @param ip
* @return
*/
private static String getGeoAsn(String ip) {
// return ipLookup.asnLookup(ip, true);//v1.0.2-com.zdjizhi.galaxy
return ipLookup.asnLookup(ip);//v1.0.3-com.zdjizhi.galaxy
}
/**
* 根据ip获取country信息
*
* @param ip
* @return
*/
private static String getGeoIpCountry(String ip) {
return ipLookup.countryLookup(ip);
}
/**
* radius借助hbase补齐
*
* @param ip
* @return
*/
private static String radiusMatch(String ip, String hbaseZookeeper, String hbaseTable) {
return HBaseUtils.getAccount(ip, hbaseZookeeper, hbaseTable);
}
/**
* 根据编码解码base64
*
* @param message base64
* @param charset 编码
* @return 解码字符串
*/
private static String decodeBase64(String message, Object charset) {
String result = "";
try {
if (StringUtil.isNotBlank(message)) {
if (charset != null) {
result = Encodes.decodeBase64String(message, charset.toString());
} else {
result = Encodes.decodeBase64String(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
}
}
} catch (Exception e) {
logger.error("解析 Base64 异常,异常信息:" + e);
}
return result;
}
/**
* 根据表达式解析json
*
* @param message json
* @param expr 解析表达式
* @return 解析结果
*/
private static String flattenSpec(String message, String expr) {
String flattenResult = "";
try {
ArrayList<String> read = JsonPath.parse(message).read(expr);
flattenResult = read.get(0);
} catch (ClassCastException | InvalidPathException e) {
logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误", e);
}
return flattenResult;
}
/**
* 判断是否为日志字段,是则返回对应value否则返回原始字符串
*
* @param object 内存实体类
* @param param 字段名/普通字符串
* @return JSON.Value or String
*/
private static String isJsonValue(Object object, String param) {
if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
Object value = JsonParseUtil.getValue(object, param.substring(2));
if (value != null) {
return value.toString();
} else {
return "";
}
} else {
return param;
}
}
/**
* IF函数实现解析日志构建三目运算
*
* @param object 内存实体类
* @param ifParam 字段名/普通字符串
* @return resultA or resultB or ""
*/
private static String condition(Object object, String ifParam) {
String result = "";
try {
String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
String direction = isJsonValue(object, norms[0]);
if (StringUtil.isNotBlank(direction)) {
if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
String resultA = isJsonValue(object, split[1]);
String resultB = isJsonValue(object, split[2]);
result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB;
}
}
} catch (Exception e) {
logger.error("IF 函数执行异常,异常信息:" + e);
e.printStackTrace();
}
return result;
}
}