From 1b75e5b1c0abe217c19259fd493b90588df69158 Mon Sep 17 00:00:00 2001 From: qidaijie Date: Thu, 11 Nov 2021 09:31:14 +0300 Subject: [PATCH] =?UTF-8?q?toJSONString=E6=9B=BF=E6=8D=A2=E4=B8=BAfastjson?= =?UTF-8?q?=E5=B7=A5=E5=85=B7=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/zdjizhi/utils/fast/TransFormFast.java | 155 ++++++++++++ .../zdjizhi/utils/fast/TransFunctionFast.java | 239 ++++++++++++++++++ .../functions/ObjectCompletedFunction.java | 20 ++ .../functions/TypeMapCompletedFunction.java | 21 ++ .../com/zdjizhi/utils/json/JsonParseUtil.java | 45 ++++ 5 files changed, 480 insertions(+) create mode 100644 src/main/java/com/zdjizhi/utils/fast/TransFormFast.java create mode 100644 src/main/java/com/zdjizhi/utils/fast/TransFunctionFast.java create mode 100644 src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java create mode 100644 src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java diff --git a/src/main/java/com/zdjizhi/utils/fast/TransFormFast.java b/src/main/java/com/zdjizhi/utils/fast/TransFormFast.java new file mode 100644 index 0000000..c6ff46f --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/fast/TransFormFast.java @@ -0,0 +1,155 @@ +package com.zdjizhi.utils.fast; + + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.general.SnowflakeId; +import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.utils.json.JsonTypeUtils; + +import java.util.ArrayList; +import java.util.Map; + + +/** + * 描述:转换或补全工具类 + * + * @author qidaijie + */ +public class TransFormFast { + private static final Log logger = LogFactory.get(); + + /** + * 获取任务列表 + * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如: + * (mail_subject mail_subject decode_of_base64 mail_subject_charset) + */ + private static ArrayList jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP); + + /** + * 解析日志,并补全 + * + * @param message kafka Topic原始日志 + * @return 补全后的日志 + */ + @SuppressWarnings("unchecked") + public static String dealCommonMessage(String message) { + try { + if (StringUtil.isNotBlank(message)) { + JSONObject jsonMap = JSONObject.parseObject(message); + JsonParseUtil.dropJsonField(jsonMap); + for (String[] strings : jobList) { + //用到的参数的值 + Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]); + //需要补全的字段的key + String appendToKeyName = strings[1]; + //需要补全的字段的值 + Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName); + //匹配操作函数的字段 + String function = strings[2]; + //额外的参数的值 + String param = strings[3]; + functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param); + } + + return JSONObject.toJSONString(jsonMap, + SerializerFeature.DisableCircularReferenceDetect + , SerializerFeature.WriteNullStringAsEmpty + , SerializerFeature.WriteNullNumberAsZero); + + } else { + return null; + } + } catch (RuntimeException e) { + logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message); + return null; + } + } + + + /** + * 根据schema描述对应字段进行操作的 函数集合 + * + * @param function 匹配操作函数的字段 + * @param jsonMap 原始日志解析map + * @param appendToKeyName 需要补全的字段的key + * @param appendToKeyValue 需要补全的字段的值 + * @param logValue 用到的参数的值 + * @param param 额外的参数的值 + */ + private static void functionSet(String function, JSONObject jsonMap, String appendToKeyName, Object appendToKeyValue, Object logValue, String param) { + switch (function) { + case "current_timestamp": + if (!(appendToKeyValue instanceof Long)) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getCurrentTime()); + } + break; + case "snowflake_id": + JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); + break; + case "geo_ip_detail": + if (logValue != null && appendToKeyValue == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getGeoIpDetail(logValue.toString())); + } + break; + case "geo_asn": + if (logValue != null && appendToKeyValue == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getGeoAsn(logValue.toString())); + } + break; + case "geo_ip_country": + if (logValue != null && appendToKeyValue == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getGeoIpCountry(logValue.toString())); + } + break; + case "set_value": + if (param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.setValue(param)); + } + break; + case "get_value": + if (logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue); + } + break; + case "if": + if (param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.condition(jsonMap, param)); + } + break; + case "sub_domain": + if (appendToKeyValue == null && logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getTopDomain(logValue.toString())); + } + break; + case "radius_match": + if (logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.radiusMatch(logValue.toString())); + } + break; + case "app_match": + if (logValue != null && appendToKeyValue == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.appMatch(logValue.toString())); + } + break; + case "decode_of_base64": + if (logValue != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.decodeBase64(logValue.toString(), TransFunctionFast.isJsonValue(jsonMap, param))); + } + break; + case "flattenSpec": + if (logValue != null && param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.flattenSpec(logValue.toString(), param)); + } + break; + default: + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/fast/TransFunctionFast.java b/src/main/java/com/zdjizhi/utils/fast/TransFunctionFast.java new file mode 100644 index 0000000..eeb2aaa --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/fast/TransFunctionFast.java @@ -0,0 +1,239 @@ +package com.zdjizhi.utils.fast; + +import cn.hutool.core.codec.Base64; +import cn.hutool.core.text.StrSpliter; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSONObject; +import com.jayway.jsonpath.InvalidPathException; +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.FormatUtils; +import com.zdjizhi.utils.IpLookup; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.app.AppUtils; +import com.zdjizhi.utils.hbase.HBaseUtils; +import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.utils.json.TypeUtils; + +import java.util.ArrayList; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * @author qidaijie + */ +class TransFunctionFast { + + private static final Log logger = LogFactory.get(); + + private static final Pattern PATTERN = Pattern.compile("[0-9]*"); + + /** + * IP定位库工具类 + */ + private static IpLookup ipLookup = new IpLookup.Builder(false) + .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4.mmdb") + .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6.mmdb") + .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v4.mmdb") + .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v6.mmdb") + .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb") + .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb") + .build(); + + /** + * 生成当前时间戳的操作 + */ + static long getCurrentTime() { + + return System.currentTimeMillis() / 1000; + } + + /** + * 根据clientIp获取location信息 + * + * @param ip client IP + * @return ip地址详细信息 + */ + static String getGeoIpDetail(String ip) { + + return ipLookup.cityLookupDetail(ip); + + } + + /** + * 根据ip获取asn信息 + * + * @param ip client/server IP + * @return ASN + */ + static String getGeoAsn(String ip) { + + return ipLookup.asnLookup(ip); + } + + /** + * 根据ip获取country信息 + * + * @param ip server IP + * @return 国家 + */ + static String getGeoIpCountry(String ip) { + + return ipLookup.countryLookup(ip); + } + + + /** + * radius借助HBase补齐 + * + * @param ip client IP + * @return account + */ + static String radiusMatch(String ip) { + return HBaseUtils.getAccount(ip.trim()); + } + + /** + * appId与缓存中对应关系补全appName + * + * @param appIds app id 列表 + * @return appName + */ + static String appMatch(String appIds) { + try { + String appId = StrSpliter.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0); + return AppUtils.getAppName(Integer.parseInt(appId)); + } catch (NumberFormatException | ClassCastException exception) { + logger.error("APP ID列表分割转换异常,异常APP ID列表:" + appIds); + return ""; + } + } + + /** + * 解析顶级域名 + * + * @param domain 初始域名 + * @return 顶级域名 + */ + static String getTopDomain(String domain) { + try { + return FormatUtils.getTopPrivateDomain(domain); + } catch (StringIndexOutOfBoundsException outException) { + logger.error("解析顶级域名异常,异常域名:" + domain); + return ""; + } + } + + /** + * 根据编码解码base64 + * + * @param message base64 + * @param charset 编码 + * @return 解码字符串 + */ + static String decodeBase64(String message, Object charset) { + String result = ""; + try { + if (StringUtil.isNotBlank(message)) { + if (charset == null) { + result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET); + } else { + result = Base64.decodeStr(message, charset.toString()); + } + } + } catch (RuntimeException rune) { + logger.error("解析 Base64 异常,异常信息:" + rune); + } + return result; + } + + /** + * 根据表达式解析json + * + * @param message json + * @param expr 解析表达式 + * @return 解析结果 + */ + static String flattenSpec(String message, String expr) { + String flattenResult = ""; + try { + if (StringUtil.isNotBlank(expr)) { + ArrayList read = JsonPath.parse(message).read(expr); + if (read.size() >= 1) { + flattenResult = read.get(0); + } + } + } catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) { + logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误" + e); + } + return flattenResult; + } + + + /** + * 判断是否为日志字段,是则返回对应value,否则返回原始字符串 + * + * @param jsonMap 内存实体类 + * @param param 字段名/普通字符串 + * @return JSON.Value or String + */ + static Object isJsonValue(JSONObject jsonMap, String param) { + if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) { + return JsonParseUtil.getValue(jsonMap, param.substring(2)); + } else { + return param; + } + } + + + /** + * IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。 + * + * @param jsonMap 内存实体类 + * @param ifParam 字段名/普通字符串 + * @return resultA or resultB or null + */ + static Object condition(JSONObject jsonMap, String ifParam) { + Object result = null; + try { + String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER); + if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) { + String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER); + Object direction = isJsonValue(jsonMap, norms[0]); + Object resultA = isJsonValue(jsonMap, split[1]); + Object resultB = isJsonValue(jsonMap, split[2]); + if (direction instanceof Number) { + result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB; + } else if (direction instanceof String) { + result = direction.equals(norms[1]) ? resultA : resultB; + } + } + } catch (RuntimeException e) { + logger.error("IF 函数执行异常,异常信息:" + e); + } + return result; + } + + + /** + * 设置固定值函数 若为数字则转为long返回 + * + * @param param 默认值 + * @return 返回数字或字符串 + */ + static Object setValue(String param) { + try { + Matcher isNum = PATTERN.matcher(param); + if (isNum.matches()) { + return Long.parseLong(param); + } else { + return param; + } + } catch (RuntimeException e) { + logger.error("SetValue 函数异常,异常信息:" + e); + } + return null; + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java new file mode 100644 index 0000000..131d2f6 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java @@ -0,0 +1,20 @@ +package com.zdjizhi.utils.functions; + +import com.zdjizhi.utils.general.TransFormObject; +import org.apache.flink.api.common.functions.MapFunction; + + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class ObjectCompletedFunction implements MapFunction { + + @Override + @SuppressWarnings("unchecked") + public String map(String logs) { + return TransFormObject.dealCommonMessage(logs); + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java new file mode 100644 index 0000000..99c92e8 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java @@ -0,0 +1,21 @@ +package com.zdjizhi.utils.functions; + +import com.zdjizhi.utils.general.TransFormTypeMap; +import org.apache.flink.api.common.functions.MapFunction; + + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class TypeMapCompletedFunction implements MapFunction { + + @Override + @SuppressWarnings("unchecked") + public String map(String logs) { + + return TransFormTypeMap.dealCommonMessage(logs); + } +} diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java index 0aacc78..afa1bf3 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -105,6 +105,22 @@ public class JsonParseUtil { } } + /** + * 获取属性值的方法 + * + * @param jsonMap 原始日志 + * @param property key + * @return 属性的值 + */ + public static Object getValue(JSONObject jsonMap, String property) { + try { + return jsonMap.getOrDefault(property, null); + } catch (RuntimeException e) { + logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); + return null; + } + } + /** * 更新属性值的方法 * @@ -136,6 +152,21 @@ public class JsonParseUtil { } } + /** + * 更新属性值的方法 + * + * @param jsonMap 原始日志json map + * @param property 更新的key + * @param value 更新的值 + */ + public static void setValue(JSONObject jsonMap, String property, Object value) { + try { + jsonMap.put(property, value); + } catch (RuntimeException e) { + logger.error("赋予实体类错误类型数据", e); + } + } + /** * 根据反射生成对象的方法 * @@ -206,12 +237,26 @@ public class JsonParseUtil { return isKeepField; } + /** + * 删除schema内指定的无效字段(jackson) + * @param jsonMap + */ public static void dropJsonField(Map jsonMap) { for (String field : dropList) { jsonMap.remove(field); } } + /** + * 删除schema内指定的无效字段(fastjson) + * @param jsonMap + */ + public static void dropJsonField(JSONObject jsonMap) { + for (String field : dropList) { + jsonMap.remove(field); + } + } + /** * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) *