1:新增日志字段类型弱校验功能,根据Schema定义的字段类型对原始日志不匹配的字段进行类型转换。

2:增加complete.check.type配置对此功能进行开关。
This commit is contained in:
qidaijie
2021-07-14 13:57:40 +08:00
parent 163b724a12
commit 3a60e16e71
7 changed files with 785 additions and 0 deletions

View File

@@ -0,0 +1,48 @@
package com.zdjizhi.bolt;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.general.TransFormTypeMap;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* @author qidaijie
*/
public class CompletionMapBolt extends BaseBasicBolt {
private static final long serialVersionUID = 9006119186526123734L;
private static final Log logger = LogFactory.get();
@Override
public void prepare(Map stormConf, TopologyContext context) {
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
basicOutputCollector.emit(new Values(TransFormTypeMap.dealCommonMessage(message)));
}
} catch (RuntimeException e) {
logger.error("处理原始日志下发过程异常,异常信息:" + e);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("tsgLog"));
}
}

View File

@@ -0,0 +1,48 @@
package com.zdjizhi.bolt;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.general.TransFormTypeMap;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* @author qidaijie
*/
public class CompletionTypeMapBolt extends BaseBasicBolt {
private static final long serialVersionUID = 9006119186526123734L;
private static final Log logger = LogFactory.get();
@Override
public void prepare(Map stormConf, TopologyContext context) {
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
basicOutputCollector.emit(new Values(TransFormTypeMap.dealCommonMessage(message)));
}
} catch (RuntimeException e) {
logger.error("处理原始日志下发过程异常,异常信息:" + e);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("tsgLog"));
}
}

View File

@@ -0,0 +1,18 @@
package com.zdjizhi.utils.exception;
/**
* @author qidaijie
* @Package com.zdjizhi.storm.utils.execption
* @Description:
* @date 2021/3/259:42
*/
public class FlowWriteException extends RuntimeException {
public FlowWriteException() {
}
public FlowWriteException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,141 @@
package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.json.JsonParseUtil;
import com.zdjizhi.utils.json.JsonTypeUtils;
import java.util.ArrayList;
import java.util.Map;
/**
* 描述:转换或补全工具类
*
* @author qidaijie
*/
public class TransFormMap {
private static final Log logger = LogFactory.get();
/**
* 获取任务列表
* list的每个元素是一个四元字符串数组 (有format标识的字段补全的字段用到的功能函数用到的参数),例如:
* (mail_subject mail_subject decode_of_base64 mail_subject_charset)
*/
private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
/**
* 解析日志,并补全
*
* @param message kafka Topic原始日志
* @return 补全后的日志
*/
@SuppressWarnings("unchecked")
public static String dealCommonMessage(String message) {
try {
Map<String, Object> jsonMap = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
for (String[] strings : jobList) {
//用到的参数的值
Object name = JsonParseUtil.getValue(jsonMap, strings[0]);
//需要补全的字段的key
String appendToKeyName = strings[1];
//需要补全的字段的值
Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
//匹配操作函数的字段
String function = strings[2];
//额外的参数的值
String param = strings[3];
functionSet(function, jsonMap, appendToKeyName, appendTo, name, param);
}
return JSONObject.toJSONString(jsonMap);
} catch (RuntimeException e) {
logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
return "";
}
}
/**
* 根据schema描述对应字段进行操作的 函数集合
*
* @param function 匹配操作函数的字段
* @param jsonMap 原始日志解析map
* @param appendToKeyName 需要补全的字段的key
* @param appendTo 需要补全的字段的值
* @param name 用到的参数的值
* @param param 额外的参数的值
*/
private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendTo, Object name, String param) {
switch (function) {
case "current_timestamp":
if (!(appendTo instanceof Long)) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
}
break;
case "snowflake_id":
JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
break;
case "geo_ip_detail":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(name.toString()));
}
break;
case "geo_asn":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(name.toString()));
}
break;
case "geo_ip_country":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(name.toString()));
}
break;
case "set_value":
if (name != null && param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
}
break;
case "get_value":
if (name != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, name);
}
break;
case "if":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
}
break;
case "sub_domain":
if (appendTo == null && name != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(name.toString()));
}
break;
case "radius_match":
if (name != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(name.toString()));
}
break;
case "app_match":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(name.toString()));
}
break;
case "decode_of_base64":
if (name != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(jsonMap, param)));
}
break;
case "flattenSpec":
if (name != null && param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(name.toString(), param));
}
break;
default:
}
}
}

View File

@@ -0,0 +1,142 @@
package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.json.JsonParseUtil;
import com.zdjizhi.utils.json.JsonTypeUtils;
import java.util.ArrayList;
import java.util.Map;
/**
* 描述:转换或补全工具类
*
* @author qidaijie
*/
public class TransFormTypeMap {
private static final Log logger = LogFactory.get();
/**
* 获取任务列表
* list的每个元素是一个四元字符串数组 (有format标识的字段补全的字段用到的功能函数用到的参数),例如:
* (mail_subject mail_subject decode_of_base64 mail_subject_charset)
*/
private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
/**
* 解析日志,并补全
*
* @param message kafka Topic原始日志
* @return 补全后的日志
*/
@SuppressWarnings("unchecked")
public static String dealCommonMessage(String message) {
try {
Map<String, Object> jsonMap = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
// Map<String, Object> jsonMap = JsonTypeUtils.typeTransform((Map<String, Object>) JsonMapper.fromJsonString(message, Map.class));
for (String[] strings : jobList) {
//用到的参数的值
Object name = JsonParseUtil.getValue(jsonMap, strings[0]);
//需要补全的字段的key
String appendToKeyName = strings[1];
//需要补全的字段的值
Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
//匹配操作函数的字段
String function = strings[2];
//额外的参数的值
String param = strings[3];
functionSet(function, jsonMap, appendToKeyName, appendTo, name, param);
}
return JSONObject.toJSONString(JsonTypeUtils.typeTransform(jsonMap));
} catch (RuntimeException e) {
logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
return "";
}
}
/**
* 根据schema描述对应字段进行操作的 函数集合
*
* @param function 匹配操作函数的字段
* @param jsonMap 原始日志解析map
* @param appendToKeyName 需要补全的字段的key
* @param appendTo 需要补全的字段的值
* @param name 用到的参数的值
* @param param 额外的参数的值
*/
private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendTo, Object name, String param) {
switch (function) {
case "current_timestamp":
if (!(appendTo instanceof Long)) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
}
break;
case "snowflake_id":
JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
break;
case "geo_ip_detail":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(name.toString()));
}
break;
case "geo_asn":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(name.toString()));
}
break;
case "geo_ip_country":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(name.toString()));
}
break;
case "set_value":
if (name != null && param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
}
break;
case "get_value":
if (name != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, name);
}
break;
case "if":
if (param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
}
break;
case "sub_domain":
if (appendTo == null && name != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(name.toString()));
}
break;
case "radius_match":
if (name != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(name.toString()));
}
break;
case "app_match":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(name.toString()));
}
break;
case "decode_of_base64":
if (name != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(jsonMap, param)));
}
break;
case "flattenSpec":
if (name != null && param != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(name.toString(), param));
}
break;
default:
}
}
}

View File

@@ -0,0 +1,187 @@
package com.zdjizhi.utils.json;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.exception.FlowWriteException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author qidaijie
* @Package PACKAGE_NAME
* @Description:
* @date 2021/7/1217:34
*/
public class JsonTypeUtils {
private static final Log logger = LogFactory.get();
/**
* 在内存中加载反射类用的map
*/
private static HashMap<String, Class> map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP);
/**
* 类型转换
*
* @param jsonMap 原始日志map
*/
public static Map<String, Object> typeTransform(Map<String, Object> jsonMap) throws RuntimeException {
JsonParseUtil.dropJsonField(jsonMap);
HashMap<String, Object> tmpMap = new HashMap<>(192);
for (String key : jsonMap.keySet()) {
if (map.containsKey(key)) {
String simpleName = map.get(key).getSimpleName();
switch (simpleName) {
case "String":
tmpMap.put(key, checkString(jsonMap.get(key)));
break;
case "Integer":
tmpMap.put(key, getIntValue(jsonMap.get(key)));
break;
case "long":
tmpMap.put(key, checkLongValue(jsonMap.get(key)));
break;
case "List":
tmpMap.put(key, checkArray(jsonMap.get(key)));
break;
case "Map":
tmpMap.put(key, checkObject(jsonMap.get(key)));
break;
case "double":
tmpMap.put(key, checkDouble(jsonMap.get(key)));
break;
default:
tmpMap.put(key, checkString(jsonMap.get(key)));
}
}
}
return tmpMap;
}
/**
* String 类型检验转换方法
*
* @param value json value
* @return String value
*/
private static String checkString(Object value) {
if (value == null) {
return null;
}
if (value instanceof Map){
return JsonMapper.toJsonString(value);
}
if (value instanceof List){
return JsonMapper.toJsonString(value);
}
return value.toString();
}
/**
* array 类型检验转换方法
*
* @param value json value
* @return List value
*/
private static Map checkObject(Object value) {
if (value == null) {
return null;
}
if (value instanceof Map) {
return (Map) value;
}
throw new FlowWriteException("can not cast to map, value : " + value);
}
/**
* array 类型检验转换方法
*
* @param value json value
* @return List value
*/
private static List checkArray(Object value) {
if (value == null) {
return null;
}
if (value instanceof List) {
return (List) value;
}
throw new FlowWriteException("can not cast to List, value : " + value);
}
private static Long checkLong(Object value) {
if (value == null) {
return null;
}
return TypeUtils.castToLong(value);
}
/**
* long 类型检验转换方法,若为空返回基础值
*
* @param value json value
* @return Long value
*/
private static long checkLongValue(Object value) {
Long longVal = TypeUtils.castToLong(value);
if (longVal == null) {
return 0L;
}
// return longVal.longValue();
return longVal;
}
/**
* Double 类型校验转换方法
*
* @param value json value
* @return Double value
*/
private static Double checkDouble(Object value) {
if (value == null) {
return null;
}
return TypeUtils.castToDouble(value);
}
private static Integer checkInt(Object value) {
if (value == null) {
return null;
}
return TypeUtils.castToInt(value);
}
/**
* int 类型检验转换方法,若为空返回基础值
*
* @param value json value
* @return int value
*/
private static int getIntValue(Object value) {
Integer intVal = TypeUtils.castToInt(value);
if (intVal == null) {
return 0;
}
// return intVal.intValue();
return intVal;
}
}

View File

@@ -0,0 +1,201 @@
package com.zdjizhi.utils.json;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.exception.FlowWriteException;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* @author qidaijie
* @Package PACKAGE_NAME
* @Description:
* @date 2021/7/1218:20
*/
public class TypeUtils {
private static final Log logger = LogFactory.get();
/**
* Integer 类型判断方法
*
* @param value json value
* @return Integer value or null
*/
public static Object castToIfFunction(Object value) {
if (value == null) {
return null;
}
if (value instanceof String) {
return value.toString();
}
if (value instanceof Integer) {
return ((Number) value).intValue();
}
if (value instanceof Long) {
return ((Number) value).longValue();
}
// if (value instanceof Map) {
// return (Map) value;
// }
//
// if (value instanceof List) {
// return Collections.singletonList(value.toString());
// }
if (value instanceof Boolean) {
return (Boolean) value ? 1 : 0;
}
throw new FlowWriteException("can not cast to int, value : " + value);
}
/**
* Integer 类型判断方法
*
* @param value json value
* @return Integer value or null
*/
static Integer castToInt(Object value) {
if (value == null) {
return null;
}
if (value instanceof Integer) {
return (Integer) value;
}
//此判断数值超范围不抛出异常,会截取成对应类型数值
// if (value instanceof Number) {
// return ((Number) value).intValue();
// }
if (value instanceof String) {
String strVal = (String) value;
if (StringUtil.isBlank(strVal)) {
return null;
}
//将 10,20 类数据转换为10
if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) {
strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
}
try {
return Integer.parseInt(strVal);
} catch (NumberFormatException ex) {
logger.error("String change Integer Error,The error Str is:" + strVal);
}
}
if (value instanceof Boolean) {
return (Boolean) value ? 1 : 0;
}
throw new FlowWriteException("can not cast to int, value : " + value);
}
/**
* Double类型判断方法
*
* @param value json value
* @return double value or null
*/
static Double castToDouble(Object value) {
if (value instanceof Double) {
return (Double) value;
}
//此判断数值超范围不抛出异常,会截取成对应类型数值
// if (value instanceof Number) {
// return ((Number) value).doubleValue();
// }
if (value instanceof String) {
String strVal = (String) value;
if (StringUtil.isBlank(strVal)) {
return null;
}
//将 10,20 类数据转换为10
if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) {
strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
}
try {
return Double.parseDouble(strVal);
} catch (NumberFormatException ex) {
logger.error("String change Double Error,The error Str is:" + strVal);
}
}
throw new FlowWriteException("can not cast to double, value : " + value);
}
/**
* Long类型判断方法
*
* @param value json value
* @return (Long)value or null
*/
static Long castToLong(Object value) {
if (value == null) {
return null;
}
if(value instanceof BigDecimal){
return longValue((BigDecimal) value);
}
// 此判断数值超范围不抛出异常,会截取成对应类型数值
if (value instanceof Number) {
return ((Number) value).longValue();
}
if (value instanceof String) {
String strVal = (String) value;
if (StringUtil.isBlank(strVal)) {
return null;
}
//将 10,20 类数据转换为10
if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) {
strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
}
try {
return Long.parseLong(strVal);
} catch (NumberFormatException ex) {
logger.error("String change Long Error,The error Str is:" + strVal);
}
}
throw new FlowWriteException("can not cast to long, value : " + value);
}
public static long longValue(BigDecimal decimal) {
if (decimal == null) {
return 0;
}
int scale = decimal.scale();
if (scale >= -100 && scale <= 100) {
return decimal.longValue();
}
return decimal.longValueExact();
}
}