diff --git a/src/main/java/com/zdjizhi/utils/functions/map/MetricsParseMap.java b/src/main/java/com/zdjizhi/utils/functions/map/MetricsParseMap.java deleted file mode 100644 index 48d8757..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/map/MetricsParseMap.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.zdjizhi.utils.functions.map; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; -import com.zdjizhi.common.pojo.Fields; -import com.zdjizhi.common.pojo.Tags; -import com.zdjizhi.utils.StringUtil; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple2; - - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/5/2715:01 - */ -public class MetricsParseMap implements MapFunction> { - private static final Log logger = LogFactory.get(); - - @Override - public Tuple2 map(String message) { - try { - JSONObject originalLog = JSON.parseObject(message); - Fields fields = JSONObject.parseObject(originalLog.getString("fields"), Fields.class); - Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class); - - String appFullPath = tags.getApp_name(); - if (StringUtil.isNotBlank(appFullPath)) { - String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1); - String protocolLabel = tags.getProtocol_stack_id(); - - tags.setApp_name(appName); - tags.setProtocol_stack_id(protocolLabel.concat(".").concat(appFullPath)); - } - - return new Tuple2<>(tags, fields); - } catch (RuntimeException e) { - logger.error("An error occurred in the original log parsing reorganization,error message is:" + e); - return new Tuple2<>(null, null); - } - } - -} diff --git a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java new file mode 100644 index 0000000..020fe77 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java @@ -0,0 +1,47 @@ +package com.zdjizhi.utils.functions.process; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.alibaba.fastjson2.JSONPath; +import com.zdjizhi.common.pojo.Fields; +import com.zdjizhi.common.pojo.Tags; +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +public class ParsingData extends ProcessFunction> { + private static final Log logger = LogFactory.get(); + + private static final String dataTypeExpr = "[?(@.name = 'traffic_application_protocol_stat')]"; + + @Override + public void processElement(String value, ProcessFunction>.Context ctx, Collector> out) { + try { + if (StringUtil.isNotBlank(value)) { + Object isProtocolData = JSONPath.eval(value, dataTypeExpr); + if (isProtocolData != null) { + JSONObject originalLog = JSON.parseObject(value); + Fields fields = JSONObject.parseObject(originalLog.getString("fields"), Fields.class); + Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class); + Long timestamp = originalLog.getLong("timestamp"); + + String appFullPath = tags.getApp_name(); + if (StringUtil.isNotBlank(appFullPath)) { + String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1); + String protocolLabel = tags.getProtocol_stack_id(); + + tags.setApp_name(appName); + tags.setProtocol_stack_id(protocolLabel.concat(".").concat(appFullPath)); + } + + out.collect(new Tuple3<>(tags, fields, timestamp)); + } + } + } catch (RuntimeException e) { + logger.error("Parsing application_protocol_stat data is abnormal! The exception message is: {}", e.getMessage()); + } + } +}