feat:adapt to percent log structure
This commit is contained in:
@@ -0,0 +1,41 @@
|
||||
package com.zdjizhi.operator.percent;
|
||||
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.zdjizhi.tools.general.ConfigurationsUtils;
|
||||
import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.streaming.api.functions.ProcessFunction;
|
||||
import org.apache.flink.util.Collector;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
|
||||
public class PercentSessionProcess extends ProcessFunction<JSONObject, String> {
|
||||
private static final Log logger = LogFactory.get();
|
||||
Properties prop = new Properties();
|
||||
private ConvertRecordToPERCENT convertRecordToPERCENT;
|
||||
|
||||
@Override
|
||||
public void open(Configuration parameters) throws Exception {
|
||||
super.open(parameters);
|
||||
try {
|
||||
prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("percent_session_record.properties"));
|
||||
convertRecordToPERCENT = new ConvertRecordToPERCENT(prop);
|
||||
logger.info("percent_session_record.properties日志加载成功");
|
||||
} catch (Exception e) {
|
||||
logger.error("percent_session_record.properties日志加载失败,失败原因为:" + e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processElement(JSONObject value, Context ctx, Collector<String> out) {
|
||||
try {
|
||||
out.collect(JSONObject.toJSONString(convertRecordToPERCENT.removeFields(value)));
|
||||
} catch (Exception e) {
|
||||
logger.error("删减percent_session日志字段失败,失败原因为:{},数据为:{}", e, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user