diff --git a/src/main/java/com/zdjizhi/bolt/CompletionMapBolt.java b/src/main/java/com/zdjizhi/bolt/CompletionMapBolt.java new file mode 100644 index 0000000..8e39798 --- /dev/null +++ b/src/main/java/com/zdjizhi/bolt/CompletionMapBolt.java @@ -0,0 +1,48 @@ +package com.zdjizhi.bolt; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.general.TransFormTypeMap; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.Map; + +/** + * @author qidaijie + */ + +public class CompletionMapBolt extends BaseBasicBolt { + private static final long serialVersionUID = 9006119186526123734L; + private static final Log logger = LogFactory.get(); + + @Override + public void prepare(Map stormConf, TopologyContext context) { + + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + basicOutputCollector.emit(new Values(TransFormTypeMap.dealCommonMessage(message))); + } + } catch (RuntimeException e) { + logger.error("处理原始日志下发过程异常,异常信息:" + e); + } + } + + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("tsgLog")); + } + +} diff --git a/src/main/java/com/zdjizhi/bolt/CompletionTypeMapBolt.java b/src/main/java/com/zdjizhi/bolt/CompletionTypeMapBolt.java new file mode 100644 index 0000000..7223748 --- /dev/null +++ b/src/main/java/com/zdjizhi/bolt/CompletionTypeMapBolt.java @@ -0,0 +1,48 @@ +package com.zdjizhi.bolt; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.general.TransFormTypeMap; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.Map; + +/** + * @author qidaijie + */ + +public class CompletionTypeMapBolt extends BaseBasicBolt { + private static final long serialVersionUID = 9006119186526123734L; + private static final Log logger = LogFactory.get(); + + @Override + public void prepare(Map stormConf, TopologyContext context) { + + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + basicOutputCollector.emit(new Values(TransFormTypeMap.dealCommonMessage(message))); + } + } catch (RuntimeException e) { + logger.error("处理原始日志下发过程异常,异常信息:" + e); + } + } + + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("tsgLog")); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java new file mode 100644 index 0000000..67c88f0 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java @@ -0,0 +1,18 @@ +package com.zdjizhi.utils.exception; + +/** + * @author qidaijie + * @Package com.zdjizhi.storm.utils.execption + * @Description: + * @date 2021/3/259:42 + */ +public class FlowWriteException extends RuntimeException { + + public FlowWriteException() { + } + + public FlowWriteException(String message) { + super(message); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java new file mode 100644 index 0000000..8ffb351 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java @@ -0,0 +1,141 @@ +package com.zdjizhi.utils.general; + + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.utils.json.JsonTypeUtils; + +import java.util.ArrayList; +import java.util.Map; + + +/** + * 描述:转换或补全工具类 + * + * @author qidaijie + */ +public class TransFormMap { + private static final Log logger = LogFactory.get(); + + /** + * 获取任务列表 + * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如: + * (mail_subject mail_subject decode_of_base64 mail_subject_charset) + */ + private static ArrayList jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP); + + /** + * 解析日志,并补全 + * + * @param message kafka Topic原始日志 + * @return 补全后的日志 + */ + @SuppressWarnings("unchecked") + public static String dealCommonMessage(String message) { + try { + Map jsonMap = (Map) JsonMapper.fromJsonString(message, Map.class); + for (String[] strings : jobList) { + //用到的参数的值 + Object name = JsonParseUtil.getValue(jsonMap, strings[0]); + //需要补全的字段的key + String appendToKeyName = strings[1]; + //需要补全的字段的值 + Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName); + //匹配操作函数的字段 + String function = strings[2]; + //额外的参数的值 + String param = strings[3]; + functionSet(function, jsonMap, appendToKeyName, appendTo, name, param); + } + return JSONObject.toJSONString(jsonMap); + } catch (RuntimeException e) { + logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message); + return ""; + } + } + + + /** + * 根据schema描述对应字段进行操作的 函数集合 + * + * @param function 匹配操作函数的字段 + * @param jsonMap 原始日志解析map + * @param appendToKeyName 需要补全的字段的key + * @param appendTo 需要补全的字段的值 + * @param name 用到的参数的值 + * @param param 额外的参数的值 + */ + private static void functionSet(String function, Map jsonMap, String appendToKeyName, Object appendTo, Object name, String param) { + switch (function) { + case "current_timestamp": + if (!(appendTo instanceof Long)) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime()); + } + break; + case "snowflake_id": + JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); + break; + case "geo_ip_detail": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(name.toString())); + } + break; + case "geo_asn": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(name.toString())); + } + break; + case "geo_ip_country": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(name.toString())); + } + break; + case "set_value": + if (name != null && param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, param); + } + break; + case "get_value": + if (name != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, name); + } + break; + case "if": + if (param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param)); + } + break; + case "sub_domain": + if (appendTo == null && name != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(name.toString())); + } + break; + case "radius_match": + if (name != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(name.toString())); + } + break; + case "app_match": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(name.toString())); + } + break; + case "decode_of_base64": + if (name != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(jsonMap, param))); + } + break; + case "flattenSpec": + if (name != null && param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(name.toString(), param)); + } + break; + default: + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java new file mode 100644 index 0000000..0bfe490 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java @@ -0,0 +1,142 @@ +package com.zdjizhi.utils.general; + + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.utils.json.JsonTypeUtils; + +import java.util.ArrayList; +import java.util.Map; + + +/** + * 描述:转换或补全工具类 + * + * @author qidaijie + */ +public class TransFormTypeMap { + private static final Log logger = LogFactory.get(); + + /** + * 获取任务列表 + * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如: + * (mail_subject mail_subject decode_of_base64 mail_subject_charset) + */ + private static ArrayList jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP); + + /** + * 解析日志,并补全 + * + * @param message kafka Topic原始日志 + * @return 补全后的日志 + */ + @SuppressWarnings("unchecked") + public static String dealCommonMessage(String message) { + try { + Map jsonMap = (Map) JsonMapper.fromJsonString(message, Map.class); +// Map jsonMap = JsonTypeUtils.typeTransform((Map) JsonMapper.fromJsonString(message, Map.class)); + for (String[] strings : jobList) { + //用到的参数的值 + Object name = JsonParseUtil.getValue(jsonMap, strings[0]); + //需要补全的字段的key + String appendToKeyName = strings[1]; + //需要补全的字段的值 + Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName); + //匹配操作函数的字段 + String function = strings[2]; + //额外的参数的值 + String param = strings[3]; + functionSet(function, jsonMap, appendToKeyName, appendTo, name, param); + } + return JSONObject.toJSONString(JsonTypeUtils.typeTransform(jsonMap)); + } catch (RuntimeException e) { + logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message); + return ""; + } + } + + + /** + * 根据schema描述对应字段进行操作的 函数集合 + * + * @param function 匹配操作函数的字段 + * @param jsonMap 原始日志解析map + * @param appendToKeyName 需要补全的字段的key + * @param appendTo 需要补全的字段的值 + * @param name 用到的参数的值 + * @param param 额外的参数的值 + */ + private static void functionSet(String function, Map jsonMap, String appendToKeyName, Object appendTo, Object name, String param) { + switch (function) { + case "current_timestamp": + if (!(appendTo instanceof Long)) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime()); + } + break; + case "snowflake_id": + JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId()); + break; + case "geo_ip_detail": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(name.toString())); + } + break; + case "geo_asn": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(name.toString())); + } + break; + case "geo_ip_country": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(name.toString())); + } + break; + case "set_value": + if (name != null && param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, param); + } + break; + case "get_value": + if (name != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, name); + } + break; + case "if": + if (param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param)); + } + break; + case "sub_domain": + if (appendTo == null && name != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(name.toString())); + } + break; + case "radius_match": + if (name != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(name.toString())); + } + break; + case "app_match": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(name.toString())); + } + break; + case "decode_of_base64": + if (name != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(jsonMap, param))); + } + break; + case "flattenSpec": + if (name != null && param != null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(name.toString(), param)); + } + break; + default: + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java new file mode 100644 index 0000000..0b6bc1e --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java @@ -0,0 +1,187 @@ +package com.zdjizhi.utils.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.exception.FlowWriteException; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author qidaijie + * @Package PACKAGE_NAME + * @Description: + * @date 2021/7/1217:34 + */ +public class JsonTypeUtils { + private static final Log logger = LogFactory.get(); + /** + * 在内存中加载反射类用的map + */ + private static HashMap map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP); + + /** + * 类型转换 + * + * @param jsonMap 原始日志map + */ + public static Map typeTransform(Map jsonMap) throws RuntimeException { + JsonParseUtil.dropJsonField(jsonMap); + HashMap tmpMap = new HashMap<>(192); + for (String key : jsonMap.keySet()) { + if (map.containsKey(key)) { + String simpleName = map.get(key).getSimpleName(); + switch (simpleName) { + case "String": + tmpMap.put(key, checkString(jsonMap.get(key))); + break; + case "Integer": + tmpMap.put(key, getIntValue(jsonMap.get(key))); + break; + case "long": + tmpMap.put(key, checkLongValue(jsonMap.get(key))); + break; + case "List": + tmpMap.put(key, checkArray(jsonMap.get(key))); + break; + case "Map": + tmpMap.put(key, checkObject(jsonMap.get(key))); + break; + case "double": + tmpMap.put(key, checkDouble(jsonMap.get(key))); + break; + default: + tmpMap.put(key, checkString(jsonMap.get(key))); + } + } + } + return tmpMap; + } + + /** + * String 类型检验转换方法 + * + * @param value json value + * @return String value + */ + private static String checkString(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Map){ + return JsonMapper.toJsonString(value); + } + + if (value instanceof List){ + return JsonMapper.toJsonString(value); + } + + return value.toString(); + } + + /** + * array 类型检验转换方法 + * + * @param value json value + * @return List value + */ + private static Map checkObject(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Map) { + return (Map) value; + } + + throw new FlowWriteException("can not cast to map, value : " + value); + } + + /** + * array 类型检验转换方法 + * + * @param value json value + * @return List value + */ + private static List checkArray(Object value) { + if (value == null) { + return null; + } + + if (value instanceof List) { + return (List) value; + } + + throw new FlowWriteException("can not cast to List, value : " + value); + } + + private static Long checkLong(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToLong(value); + } + + /** + * long 类型检验转换方法,若为空返回基础值 + * + * @param value json value + * @return Long value + */ + private static long checkLongValue(Object value) { + Long longVal = TypeUtils.castToLong(value); + if (longVal == null) { + return 0L; + } + +// return longVal.longValue(); + return longVal; + } + + /** + * Double 类型校验转换方法 + * + * @param value json value + * @return Double value + */ + private static Double checkDouble(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToDouble(value); + } + + + private static Integer checkInt(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToInt(value); + } + + + /** + * int 类型检验转换方法,若为空返回基础值 + * + * @param value json value + * @return int value + */ + private static int getIntValue(Object value) { + + Integer intVal = TypeUtils.castToInt(value); + if (intVal == null) { + return 0; + } + +// return intVal.intValue(); + return intVal; + } + +} diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java new file mode 100644 index 0000000..9374b66 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java @@ -0,0 +1,201 @@ +package com.zdjizhi.utils.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.exception.FlowWriteException; + +import java.math.BigDecimal; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * @author qidaijie + * @Package PACKAGE_NAME + * @Description: + * @date 2021/7/1218:20 + */ +public class TypeUtils { + private static final Log logger = LogFactory.get(); + + /** + * Integer 类型判断方法 + * + * @param value json value + * @return Integer value or null + */ + public static Object castToIfFunction(Object value) { + if (value == null) { + return null; + } + + if (value instanceof String) { + return value.toString(); + } + + if (value instanceof Integer) { + return ((Number) value).intValue(); + } + + if (value instanceof Long) { + return ((Number) value).longValue(); + } + +// if (value instanceof Map) { +// return (Map) value; +// } +// +// if (value instanceof List) { +// return Collections.singletonList(value.toString()); +// } + + if (value instanceof Boolean) { + return (Boolean) value ? 1 : 0; + } + + throw new FlowWriteException("can not cast to int, value : " + value); + } + + /** + * Integer 类型判断方法 + * + * @param value json value + * @return Integer value or null + */ + static Integer castToInt(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof Integer) { + return (Integer) value; + } + + //此判断数值超范围不抛出异常,会截取成对应类型数值 +// if (value instanceof Number) { +// return ((Number) value).intValue(); +// } + + if (value instanceof String) { + String strVal = (String) value; + if (StringUtil.isBlank(strVal)) { + return null; + } + + //将 10,20 类数据转换为10 + if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Integer.parseInt(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Integer Error,The error Str is:" + strVal); + } + } + + if (value instanceof Boolean) { + return (Boolean) value ? 1 : 0; + } + + throw new FlowWriteException("can not cast to int, value : " + value); + } + + /** + * Double类型判断方法 + * + * @param value json value + * @return double value or null + */ + static Double castToDouble(Object value) { + + if (value instanceof Double) { + return (Double) value; + } + + //此判断数值超范围不抛出异常,会截取成对应类型数值 +// if (value instanceof Number) { +// return ((Number) value).doubleValue(); +// } + + if (value instanceof String) { + String strVal = (String) value; + + if (StringUtil.isBlank(strVal)) { + return null; + } + + //将 10,20 类数据转换为10 + if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Double.parseDouble(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Double Error,The error Str is:" + strVal); + } + } + + throw new FlowWriteException("can not cast to double, value : " + value); + } + + /** + * Long类型判断方法 + * + * @param value json value + * @return (Long)value or null + */ + static Long castToLong(Object value) { + if (value == null) { + return null; + } + + if(value instanceof BigDecimal){ + return longValue((BigDecimal) value); + } + +// 此判断数值超范围不抛出异常,会截取成对应类型数值 + if (value instanceof Number) { + return ((Number) value).longValue(); + } + + if (value instanceof String) { + String strVal = (String) value; + + if (StringUtil.isBlank(strVal)) { + return null; + } + + //将 10,20 类数据转换为10 + if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Long.parseLong(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Long Error,The error Str is:" + strVal); + } + } + + throw new FlowWriteException("can not cast to long, value : " + value); + } + + public static long longValue(BigDecimal decimal) { + if (decimal == null) { + return 0; + } + + int scale = decimal.scale(); + if (scale >= -100 && scale <= 100) { + return decimal.longValue(); + } + + return decimal.longValueExact(); + } + +}