1: Add weak type validation for log fields: fields in the raw log whose values do not match the field type declared in the Schema are converted to the declared type.

2: Add the complete.check.type setting to switch this feature on and off.
qidaijie
2021-07-14 13:56:54 +08:00
parent 5764bb999e
commit 163b724a12
8 changed files with 218 additions and 79 deletions
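In weak-check mode the completion bolt walks the raw log as a Map and coerces each field whose runtime type does not match the type the schema declares for it (int/long/double/string, the same type strings handled by JsonParseUtil.getClassName). The conversion code itself lives in CompletionTypeMapBolt/TypeUtils, which this diff does not include, so the following is only a hedged sketch of the idea; weakCheck and schemaTypes are hypothetical names.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: the real logic is in CompletionTypeMapBolt/TypeUtils,
// which are not part of the hunks shown in this commit.
public class WeakTypeCheckSketch {

    // Coerce raw-log fields to the type the schema declares for them.
    // "Weak" check: a value that cannot be converted is kept as-is instead of
    // failing the whole record.
    static Map<String, Object> weakCheck(Map<String, Object> rawLog,
                                         Map<String, String> schemaTypes) {
        Map<String, Object> result = new HashMap<>(rawLog);
        for (Map.Entry<String, String> field : schemaTypes.entrySet()) {
            Object value = result.get(field.getKey());
            if (value == null) {
                continue;
            }
            try {
                switch (field.getValue()) {
                    case "int":
                        if (!(value instanceof Integer)) {
                            result.put(field.getKey(), Integer.parseInt(value.toString()));
                        }
                        break;
                    case "long":
                        if (!(value instanceof Long)) {
                            result.put(field.getKey(), Long.parseLong(value.toString()));
                        }
                        break;
                    case "double":
                        if (!(value instanceof Double)) {
                            result.put(field.getKey(), Double.parseDouble(value.toString()));
                        }
                        break;
                    case "string":
                        if (!(value instanceof String)) {
                            result.put(field.getKey(), value.toString());
                        }
                        break;
                    default:
                        // unknown schema type: leave the value untouched
                        break;
                }
            } catch (NumberFormatException e) {
                // weak check: conversion failed, keep the original value
            }
        }
        return result;
    }
}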

View File

@@ -4,7 +4,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-stream-completion-schema</artifactId>
<version>v3.21.06.28-jackson</version>
<version>v3.21.07.13-map</version>
<packaging>jar</packaging>
<name>log-stream-completion-schema</name>

View File

@@ -13,20 +13,22 @@ zookeeper.servers=192.168.44.12:2181
hbase.zookeeper.servers=192.168.44.12:2181
#--------------------------------HTTP/定位库------------------------------#
#定位库地址
ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\
#ip.library=/home/bigdata/topology/dat/
#网关的schema位置
schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/connection_record_log
#schema.http=http://192.168.40.203:9999/metadata/schema/v1/fields/connection_record_log
schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/connection_record_log
#网关APP_ID 获取接口
app.id.http=http://192.168.44.67:9999/open-api/appDicList
app.id.http=http://192.168.44.12:9999/open-api/appDicList
#--------------------------------Kafka消费组信息------------------------------#
#kafka 接收数据topic
kafka.topic=CONNECTION-RECORD-LOG
kafka.topic=test
#补全数据 输出 topic
results.output.topic=test-result
@@ -92,12 +94,12 @@ topology.tick.tuple.freq.secs=5
#spout接收睡眠时间
topology.spout.sleep.time=1
#允许发送kafka最大失败数
max.failure.num=20
#邮件默认编码
mail.default.charset=UTF-8
#需不要补全,不需要则原样日志输出
log.need.complete=yes
#补全校验类型 0 强制类型校验1 弱类型校验2 不校验
complete.check.type=1
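The three values select the completion bolt wired up in the LogFlowWriteTopology switch later in this commit; an annotated version of the setting, for reference:

# 0 = strict type check -> CompletionObjectBolt
# 1 = weak type check   -> CompletionTypeMapBolt
# 2 = no type check     -> CompletionMapBolt
complete.check.type=1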

View File

@@ -3,6 +3,7 @@ package com.zdjizhi.bolt;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.general.TransFormObject;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -13,13 +14,11 @@ import org.apache.storm.tuple.Values;
import java.util.Map;
import static com.zdjizhi.utils.general.TransFormUtils.dealCommonMessage;
/**
* @author qidaijie
*/
public class CompletionBolt extends BaseBasicBolt {
public class CompletionObjectBolt extends BaseBasicBolt {
private static final long serialVersionUID = 9006119186526123734L;
private static final Log logger = LogFactory.get();
@@ -35,7 +34,7 @@ public class CompletionBolt extends BaseBasicBolt {
try {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
basicOutputCollector.emit(new Values(dealCommonMessage(message)));
basicOutputCollector.emit(new Values(TransFormObject.dealCommonMessage(message)));
}
} catch (RuntimeException e) {
logger.error("处理原始日志下发过程异常,异常信息:" + e);

View File

@@ -30,9 +30,9 @@ public class FlowWriteConfig {
public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time");
public static final Integer BATCH_INSERT_NUM = FlowWriteConfigurations.getIntProperty(0, "batch.insert.num");
public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num");
public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");
public static final String LOG_NEED_COMPLETE = FlowWriteConfigurations.getStringProperty(0, "log.need.complete");
public static final Integer COMPLETE_CHECK_TYPE = FlowWriteConfigurations.getIntProperty(0, "complete.check.type");
/**
* kafka

View File

@@ -1,7 +1,9 @@
package com.zdjizhi.topology;
import com.zdjizhi.bolt.CompletionBolt;
import com.zdjizhi.bolt.CompletionMapBolt;
import com.zdjizhi.bolt.CompletionTypeMapBolt;
import com.zdjizhi.bolt.CompletionObjectBolt;
import com.zdjizhi.bolt.LogSendBolt;
import com.zdjizhi.common.DefaultProConfig;
import com.zdjizhi.common.FlowWriteConfig;
@@ -51,9 +53,9 @@ public class LogFlowWriteTopology {
private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
topologyConfig.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,DefaultProConfig.TRANSFER_BUFFER_SIZE);
topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,DefaultProConfig.EXECUTOR_RECEIVE_BUFFER_SIZE);
topologyConfig.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,DefaultProConfig.EXECUTOR_SEND_BUFFER_SIZE);
topologyConfig.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, DefaultProConfig.TRANSFER_BUFFER_SIZE);
topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, DefaultProConfig.EXECUTOR_RECEIVE_BUFFER_SIZE);
topologyConfig.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, DefaultProConfig.EXECUTOR_SEND_BUFFER_SIZE);
StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
}
@@ -62,12 +64,33 @@ public class LogFlowWriteTopology {
builder = new TopologyBuilder();
builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);
if (need.equals(FlowWriteConfig.LOG_NEED_COMPLETE)) {
builder.setBolt("LogCompletionBolt", new CompletionBolt(),
FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("LogSendBolt", new LogSendBolt(),
FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogCompletionBolt");
switch (FlowWriteConfig.COMPLETE_CHECK_TYPE) {
case 0:
builder.setBolt("LogObjectCompletionBolt", new CompletionObjectBolt(),
FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("SendKafkaBolt", new LogSendBolt(),
FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogObjectCompletionBolt");
break;
case 1:
builder.setBolt("LogTypeMapCompletionBolt", new CompletionTypeMapBolt(),
FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("SendKafkaBolt", new LogSendBolt(),
FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogTypeMapCompletionBolt");
break;
case 2:
builder.setBolt("LogMapCompletionBolt", new CompletionMapBolt(),
FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("SendKafkaBolt", new LogSendBolt(),
FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogMapCompletionBolt");
break;
default:
builder.setBolt("LogCompletionBolt", new CompletionObjectBolt(),
FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("SendKafkaBolt", new LogSendBolt(),
FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogCompletionBolt");
}
} else {
builder.setBolt("LogSendBolt", new LogSendBolt(),
builder.setBolt("SendKafkaBolt", new LogSendBolt(),
FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
}
}

View File

@@ -3,6 +3,8 @@ package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.json.JsonParseUtil;
@@ -16,7 +18,7 @@ import java.util.HashMap;
*
* @author qidaijie
*/
public class TransFormUtils {
public class TransFormObject {
private static final Log logger = LogFactory.get();
/**
@@ -44,7 +46,8 @@ public class TransFormUtils {
*/
public static String dealCommonMessage(String message) {
try {
Object object = JsonMapper.fromJsonString(message, mapObject.getClass());
// Object object = JsonMapper.fromJsonString(message, mapObject.getClass());
Object object = JSONObject.parseObject(message, mapObject.getClass());
for (String[] strings : jobList) {
//用到的参数的值
Object name = JsonParseUtil.getValue(object, strings[0]);
@@ -58,10 +61,10 @@ public class TransFormUtils {
String param = strings[3];
functionSet(function, object, appendToKeyName, appendTo, name, param);
}
return JsonMapper.toJsonString(object);
// return JsonMapper.toJsonString(object);
return JSONObject.toJSONString(object);
} catch (RuntimeException e) {
logger.error("解析补全日志信息过程异常,异常信息:" + e);
e.printStackTrace();
logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
return "";
}
}
@@ -80,7 +83,7 @@ public class TransFormUtils {
private static void functionSet(String function, Object object, String appendToKeyName, Object appendTo, Object name, String param) {
switch (function) {
case "current_timestamp":
if (! (appendTo instanceof Long)) {
if (!(appendTo instanceof Long)) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getCurrentTime());
}
break;
@@ -139,7 +142,7 @@ public class TransFormUtils {
break;
case "flattenSpec":
if (name != null && param != null) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.flattenSpec(name.toString(), TransFunction.isJsonValue(object, param)));
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.flattenSpec(name.toString(), param));
}
break;
default:

View File

@@ -15,8 +15,10 @@ import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.app.AppUtils;
import com.zdjizhi.utils.hbase.HBaseUtils;
import com.zdjizhi.utils.json.JsonParseUtil;
import com.zdjizhi.utils.json.TypeUtils;
import java.util.ArrayList;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -136,14 +138,14 @@ class TransFunction {
* @param charset 编码
* @return 解码字符串
*/
static String decodeBase64(String message, String charset) {
static String decodeBase64(String message, Object charset) {
String result = "";
try {
if (StringUtil.isNotBlank(message)) {
if (StringUtil.isNotBlank(charset)) {
result = Base64.decodeStr(message, charset);
} else {
if (charset == null) {
result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
} else {
result = Base64.decodeStr(message, charset.toString());
}
}
} catch (RuntimeException rune) {
@@ -180,14 +182,24 @@ class TransFunction {
* @param param 字段名/普通字符串
* @return JSON.Value or String
*/
static String isJsonValue(Object object, String param) {
static Object isJsonValue(Object object, String param) {
if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
Object value = JsonParseUtil.getValue(object, param.substring(2));
if (value != null) {
return value.toString();
} else {
return "";
}
return JsonParseUtil.getValue(object, param.substring(2));
} else {
return param;
}
}
/**
* 判断是否为日志字段,是则返回对应value否则返回原始字符串
*
* @param jsonMap 内存实体类
* @param param 字段名/普通字符串
* @return JSON.Value or String
*/
static Object isJsonValue(Map<String, Object> jsonMap, String param) {
if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
return JsonParseUtil.getValue(jsonMap, param.substring(2));
} else {
return param;
}
@@ -198,32 +210,92 @@ class TransFunction {
*
* @param object 内存实体类
* @param ifParam 字段名/普通字符串
* @return resultA or resultB or ""
* @return resultA or resultB or null
*/
static Object condition(Object object, String ifParam) {
Object result = null;
try {
String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
String direction = isJsonValue(object, norms[0]);
if (StringUtil.isNotBlank(direction)) {
if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
String resultA = isJsonValue(object, split[1]);
String resultB = isJsonValue(object, split[2]);
String result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB;
Matcher isNum = PATTERN.matcher(result);
if (isNum.matches()) {
return Long.parseLong(result);
} else {
return result;
}
if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
Object direction = isJsonValue(object, norms[0]);
Object resultA = isJsonValue(object, split[1]);
Object resultB = isJsonValue(object, split[2]);
if (direction instanceof Number) {
// result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB;
result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB);
} else if (direction instanceof String) {
result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB);
// result = direction.equals(norms[1]) ? resultA : resultB;
}
}
} catch (RuntimeException e) {
logger.error("IF 函数执行异常,异常信息:" + e);
}
return null;
return result;
}
/**
* IF函数实现解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。
*
* @param jsonMap 内存实体类
* @param ifParam 字段名/普通字符串
* @return resultA or resultB or null
*/
static Object condition(Map<String, Object> jsonMap, String ifParam) {
Object result = null;
try {
String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
Object direction = isJsonValue(jsonMap, norms[0]);
Object resultA = isJsonValue(jsonMap, split[1]);
Object resultB = isJsonValue(jsonMap, split[2]);
if (direction instanceof Number) {
result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB;
// result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB);
} else if (direction instanceof String) {
// result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB);
result = direction.equals(norms[1]) ? resultA : resultB;
}
}
} catch (RuntimeException e) {
logger.error("IF 函数执行异常,异常信息:" + e);
}
return result;
}
// /**
// * IF函数实现解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。
// *
// * @param jsonMap 原始日志
// * @param ifParam 字段名/普通字符串
// * @return resultA or resultB or null
// */
// static Object condition(Map<String, Object> jsonMap, String ifParam) {
// try {
// String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
// String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
// String direction = isJsonValue(jsonMap, norms[0]);
// if (StringUtil.isNotBlank(direction)) {
// if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
// String resultA = isJsonValue(jsonMap, split[1]);
// String resultB = isJsonValue(jsonMap, split[2]);
// String result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB;
// Matcher isNum = PATTERN.matcher(result);
// if (isNum.matches()) {
// return Long.parseLong(result);
// } else {
// return result;
// }
// }
// }
// } catch (RuntimeException e) {
// logger.error("IF 函数执行异常,异常信息:" + e);
// }
// return null;
// }
/**
* 设置固定值函数 若为数字则转为long返回
*

View File

@@ -21,9 +21,10 @@ import java.util.*;
* @author qidaijie
*/
public class JsonParseUtil {
private static final Log logger = LogFactory.get();
private static List<String> dropFieldList = new ArrayList<>();
private static ArrayList<String> dropList = new ArrayList<>();
/**
* 模式匹配,给定一个类型字符串返回一个类类型
@@ -39,17 +40,14 @@ public class JsonParseUtil {
case "int":
clazz = Integer.class;
break;
case "String":
case "string":
clazz = String.class;
break;
case "long":
clazz = long.class;
break;
case "array":
clazz = JSONArray.class;
break;
case "Integer":
clazz = Integer.class;
clazz = List.class;
break;
case "double":
clazz = double.class;
@@ -75,22 +73,6 @@ public class JsonParseUtil {
return clazz;
}
/**
* 根据反射生成对象的方法
*
* @param properties 反射类用的map
* @return 生成的Object类型的对象
*/
public static Object generateObject(Map properties) {
BeanGenerator generator = new BeanGenerator();
Set keySet = properties.keySet();
for (Object aKeySet : keySet) {
String key = (String) aKeySet;
generator.addProperty(key, (Class) properties.get(key));
}
return generator.create();
}
/**
* 获取属性值的方法
*
@@ -99,8 +81,44 @@ public class JsonParseUtil {
* @return 属性的值
*/
public static Object getValue(Object obj, String property) {
BeanMap beanMap = BeanMap.create(obj);
return beanMap.get(property);
try {
BeanMap beanMap = BeanMap.create(obj);
return beanMap.get(property);
} catch (RuntimeException e) {
logger.error("获取json-value异常异常key" + property + "异常信息为:" + e);
return null;
}
}
/**
* 获取属性值的方法
*
* @param jsonMap 原始日志
* @param property key
* @return 属性的值
*/
public static Object getValue(Map<String, Object> jsonMap, String property) {
try {
return jsonMap.getOrDefault(property, null);
} catch (RuntimeException e) {
logger.error("获取json-value异常异常key" + property + "异常信息为:" + e);
return null;
}
}
/**
* 更新属性值的方法
*
* @param jsonMap 原始日志json map
* @param property 更新的key
* @param value 更新的值
*/
public static void setValue(Map<String, Object> jsonMap, String property, Object value) {
try {
jsonMap.put(property, value);
} catch (RuntimeException e) {
logger.error("赋予实体类错误类型数据", e);
}
}
/**
@@ -119,6 +137,22 @@ public class JsonParseUtil {
}
}
/**
* 根据反射生成对象的方法
*
* @param properties 反射类用的map
* @return 生成的Object类型的对象
*/
public static Object generateObject(Map properties) {
BeanGenerator generator = new BeanGenerator();
Set keySet = properties.keySet();
for (Object aKeySet : keySet) {
String key = (String) aKeySet;
generator.addProperty(key, (Class) properties.get(key));
}
return generator.create();
}
/**
* 通过获取String类型的网关schema链接来获取map用于生成一个Object类型的对象
*
@@ -146,7 +180,7 @@ public class JsonParseUtil {
//组合用来生成实体类的map
map.put(name, getClassName(type));
} else {
dropFieldList.add(JsonPath.read(filedStr, "$.name").toString());
dropList.add(filedStr);
}
}
return map;
@@ -173,6 +207,12 @@ public class JsonParseUtil {
return isKeepField;
}
static void dropJsonField(Map<String, Object> jsonMap) {
for (String field : dropList) {
jsonMap.remove(field);
}
}
/**
* 根据http链接获取schema解析之后返回一个任务列表 (useList toList funcList paramlist)
*