删除fastjson测试类
This commit is contained in:
@@ -1,153 +0,0 @@
|
||||
package com.zdjizhi.utils.fast;
|
||||
|
||||
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.alibaba.fastjson.serializer.SerializerFeature;
|
||||
import com.zdjizhi.common.FlowWriteConfig;
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import com.zdjizhi.utils.general.SnowflakeId;
|
||||
import com.zdjizhi.utils.json.JsonParseUtil;
|
||||
import com.zdjizhi.utils.json.JsonTypeUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
/**
|
||||
* 描述:转换或补全工具类
|
||||
*
|
||||
* @author qidaijie
|
||||
*/
|
||||
public class TransFormFast {
|
||||
private static final Log logger = LogFactory.get();
|
||||
|
||||
/**
|
||||
* 获取任务列表
|
||||
* list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如:
|
||||
* (mail_subject mail_subject decode_of_base64 mail_subject_charset)
|
||||
*/
|
||||
private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
|
||||
|
||||
/**
|
||||
* 解析日志,并补全
|
||||
*
|
||||
* @param message kafka Topic原始日志
|
||||
* @return 补全后的日志
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static String dealCommonMessage(String message) {
|
||||
try {
|
||||
if (StringUtil.isNotBlank(message)) {
|
||||
JSONObject jsonMap = JSONObject.parseObject(message);
|
||||
JsonParseUtil.dropJsonField(jsonMap);
|
||||
for (String[] strings : jobList) {
|
||||
//用到的参数的值
|
||||
Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
|
||||
//需要补全的字段的key
|
||||
String appendToKeyName = strings[1];
|
||||
//需要补全的字段的值
|
||||
Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
|
||||
//匹配操作函数的字段
|
||||
String function = strings[2];
|
||||
//额外的参数的值
|
||||
String param = strings[3];
|
||||
functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
|
||||
}
|
||||
|
||||
return FastJsonTypeUtils.typeTransform(jsonMap);
|
||||
|
||||
} else {
|
||||
return "";
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("Logs TransForm Exception! Error message is:" + e);
|
||||
e.printStackTrace();
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 根据schema描述对应字段进行操作的 函数集合
|
||||
*
|
||||
* @param function 匹配操作函数的字段
|
||||
* @param jsonMap 原始日志解析map
|
||||
* @param appendToKeyName 需要补全的字段的key
|
||||
* @param appendToKeyValue 需要补全的字段的值
|
||||
* @param logValue 用到的参数的值
|
||||
* @param param 额外的参数的值
|
||||
*/
|
||||
private static void functionSet(String function, JSONObject jsonMap, String appendToKeyName, Object appendToKeyValue, Object logValue, String param) {
|
||||
switch (function) {
|
||||
case "current_timestamp":
|
||||
if (!(appendToKeyValue instanceof Long)) {
|
||||
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getCurrentTime());
|
||||
}
|
||||
break;
|
||||
case "snowflake_id":
|
||||
JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
|
||||
break;
|
||||
case "geo_ip_detail":
|
||||
if (logValue != null && appendToKeyValue == null) {
|
||||
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getGeoIpDetail(logValue.toString()));
|
||||
}
|
||||
break;
|
||||
case "geo_asn":
|
||||
if (logValue != null && appendToKeyValue == null) {
|
||||
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getGeoAsn(logValue.toString()));
|
||||
}
|
||||
break;
|
||||
case "geo_ip_country":
|
||||
if (logValue != null && appendToKeyValue == null) {
|
||||
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getGeoIpCountry(logValue.toString()));
|
||||
}
|
||||
break;
|
||||
case "set_value":
|
||||
if (param != null) {
|
||||
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.setValue(param));
|
||||
}
|
||||
break;
|
||||
case "get_value":
|
||||
if (logValue != null) {
|
||||
JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
|
||||
}
|
||||
break;
|
||||
case "if":
|
||||
if (param != null) {
|
||||
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.condition(jsonMap, param));
|
||||
}
|
||||
break;
|
||||
case "sub_domain":
|
||||
if (appendToKeyValue == null && logValue != null) {
|
||||
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getTopDomain(logValue.toString()));
|
||||
}
|
||||
break;
|
||||
case "radius_match":
|
||||
if (logValue != null) {
|
||||
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.radiusMatch(logValue.toString()));
|
||||
}
|
||||
break;
|
||||
case "app_match":
|
||||
if (logValue != null && appendToKeyValue == null) {
|
||||
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.appMatch(logValue.toString()));
|
||||
}
|
||||
break;
|
||||
case "decode_of_base64":
|
||||
if (logValue != null) {
|
||||
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.decodeBase64(logValue.toString(), TransFunctionFast.isJsonValue(jsonMap, param)));
|
||||
}
|
||||
break;
|
||||
case "flattenSpec":
|
||||
if (logValue != null && param != null) {
|
||||
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.flattenSpec(logValue.toString(), param));
|
||||
}
|
||||
break;
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,239 +0,0 @@
|
||||
package com.zdjizhi.utils.fast;
|
||||
|
||||
import cn.hutool.core.codec.Base64;
|
||||
import cn.hutool.core.text.StrSpliter;
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.jayway.jsonpath.InvalidPathException;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.zdjizhi.common.FlowWriteConfig;
|
||||
import com.zdjizhi.utils.FormatUtils;
|
||||
import com.zdjizhi.utils.IpLookup;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import com.zdjizhi.utils.app.AppUtils;
|
||||
import com.zdjizhi.utils.hbase.HBaseUtils;
|
||||
import com.zdjizhi.utils.json.JsonParseUtil;
|
||||
import com.zdjizhi.utils.json.TypeUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
*/
|
||||
class TransFunctionFast {
|
||||
|
||||
private static final Log logger = LogFactory.get();
|
||||
|
||||
private static final Pattern PATTERN = Pattern.compile("[0-9]*");
|
||||
|
||||
/**
|
||||
* IP定位库工具类
|
||||
*/
|
||||
private static IpLookup ipLookup = new IpLookup.Builder(false)
|
||||
.loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4.mmdb")
|
||||
.loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6.mmdb")
|
||||
.loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v4.mmdb")
|
||||
.loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v6.mmdb")
|
||||
.loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
|
||||
.loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
|
||||
.build();
|
||||
|
||||
/**
|
||||
* 生成当前时间戳的操作
|
||||
*/
|
||||
static long getCurrentTime() {
|
||||
|
||||
return System.currentTimeMillis() / 1000;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据clientIp获取location信息
|
||||
*
|
||||
* @param ip client IP
|
||||
* @return ip地址详细信息
|
||||
*/
|
||||
static String getGeoIpDetail(String ip) {
|
||||
|
||||
return ipLookup.cityLookupDetail(ip);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据ip获取asn信息
|
||||
*
|
||||
* @param ip client/server IP
|
||||
* @return ASN
|
||||
*/
|
||||
static String getGeoAsn(String ip) {
|
||||
|
||||
return ipLookup.asnLookup(ip);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据ip获取country信息
|
||||
*
|
||||
* @param ip server IP
|
||||
* @return 国家
|
||||
*/
|
||||
static String getGeoIpCountry(String ip) {
|
||||
|
||||
return ipLookup.countryLookup(ip);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* radius借助HBase补齐
|
||||
*
|
||||
* @param ip client IP
|
||||
* @return account
|
||||
*/
|
||||
static String radiusMatch(String ip) {
|
||||
return HBaseUtils.getAccount(ip.trim());
|
||||
}
|
||||
|
||||
/**
|
||||
* appId与缓存中对应关系补全appName
|
||||
*
|
||||
* @param appIds app id 列表
|
||||
* @return appName
|
||||
*/
|
||||
static String appMatch(String appIds) {
|
||||
try {
|
||||
String appId = StrSpliter.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0);
|
||||
return AppUtils.getAppName(Integer.parseInt(appId));
|
||||
} catch (NumberFormatException | ClassCastException exception) {
|
||||
logger.error("APP ID列表分割转换异常,异常APP ID列表:" + appIds);
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析顶级域名
|
||||
*
|
||||
* @param domain 初始域名
|
||||
* @return 顶级域名
|
||||
*/
|
||||
static String getTopDomain(String domain) {
|
||||
try {
|
||||
return FormatUtils.getTopPrivateDomain(domain);
|
||||
} catch (StringIndexOutOfBoundsException outException) {
|
||||
logger.error("解析顶级域名异常,异常域名:" + domain);
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据编码解码base64
|
||||
*
|
||||
* @param message base64
|
||||
* @param charset 编码
|
||||
* @return 解码字符串
|
||||
*/
|
||||
static String decodeBase64(String message, Object charset) {
|
||||
String result = "";
|
||||
try {
|
||||
if (StringUtil.isNotBlank(message)) {
|
||||
if (charset == null) {
|
||||
result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
|
||||
} else {
|
||||
result = Base64.decodeStr(message, charset.toString());
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException rune) {
|
||||
logger.error("解析 Base64 异常,异常信息:" + rune);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据表达式解析json
|
||||
*
|
||||
* @param message json
|
||||
* @param expr 解析表达式
|
||||
* @return 解析结果
|
||||
*/
|
||||
static String flattenSpec(String message, String expr) {
|
||||
String flattenResult = "";
|
||||
try {
|
||||
if (StringUtil.isNotBlank(expr)) {
|
||||
ArrayList<String> read = JsonPath.parse(message).read(expr);
|
||||
if (read.size() >= 1) {
|
||||
flattenResult = read.get(0);
|
||||
}
|
||||
}
|
||||
} catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) {
|
||||
logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误" + e);
|
||||
}
|
||||
return flattenResult;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 判断是否为日志字段,是则返回对应value,否则返回原始字符串
|
||||
*
|
||||
* @param jsonMap 内存实体类
|
||||
* @param param 字段名/普通字符串
|
||||
* @return JSON.Value or String
|
||||
*/
|
||||
static Object isJsonValue(JSONObject jsonMap, String param) {
|
||||
if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
|
||||
return JsonParseUtil.getValue(jsonMap, param.substring(2));
|
||||
} else {
|
||||
return param;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。
|
||||
*
|
||||
* @param jsonMap 内存实体类
|
||||
* @param ifParam 字段名/普通字符串
|
||||
* @return resultA or resultB or null
|
||||
*/
|
||||
static Object condition(JSONObject jsonMap, String ifParam) {
|
||||
Object result = null;
|
||||
try {
|
||||
String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
|
||||
if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
|
||||
String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
|
||||
Object direction = isJsonValue(jsonMap, norms[0]);
|
||||
Object resultA = isJsonValue(jsonMap, split[1]);
|
||||
Object resultB = isJsonValue(jsonMap, split[2]);
|
||||
if (direction instanceof Number) {
|
||||
result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB;
|
||||
} else if (direction instanceof String) {
|
||||
result = direction.equals(norms[1]) ? resultA : resultB;
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("IF 函数执行异常,异常信息:" + e);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 设置固定值函数 若为数字则转为long返回
|
||||
*
|
||||
* @param param 默认值
|
||||
* @return 返回数字或字符串
|
||||
*/
|
||||
static Object setValue(String param) {
|
||||
try {
|
||||
Matcher isNum = PATTERN.matcher(param);
|
||||
if (isNum.matches()) {
|
||||
return Long.parseLong(param);
|
||||
} else {
|
||||
return param;
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("SetValue 函数异常,异常信息:" + e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.zdjizhi.utils.functions;
|
||||
|
||||
import com.zdjizhi.utils.fast.TransFormFast;
|
||||
import com.zdjizhi.utils.general.TransFormTypeMap;
|
||||
import org.apache.flink.api.common.functions.MapFunction;
|
||||
|
||||
@@ -18,6 +17,5 @@ public class TypeMapCompletedFunction implements MapFunction<String, String> {
|
||||
public String map(String logs) {
|
||||
|
||||
return TransFormTypeMap.dealCommonMessage(logs);
|
||||
// return TransFormFast.dealCommonMessage(logs);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user