1: Rename configuration keys (consumer → source, producer → sink, etc.)

2: Add a switch for selecting between different log-processing modes
Author: qidaijie
Date: 2021-11-07 17:13:13 +03:00
parent 159d00cfb0
commit 8bf733385f
12 changed files with 93 additions and 106 deletions

View File

@@ -19,7 +19,8 @@ public class FlowWriteConfig {
/**
* System config
*/
- public static final Integer CONSUMER_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "consumer.parallelism");
+ public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism");
+ public static final Integer SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "sink.parallelism");
public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism");
public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
@@ -27,6 +28,7 @@ public class FlowWriteConfig {
public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete");
public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");
public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
+ public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type");
/**
* kafka source config
@@ -39,13 +41,13 @@ public class FlowWriteConfig {
/**
* kafka sink config
*/
- public static final String INPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "input.kafka.servers");
- public static final String OUTPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "output.kafka.servers");
+ public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
+ public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
- public static final String OUTPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "output.kafka.topic");
- public static final String INPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "input.kafka.topic");
+ public static final String SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic");
+ public static final String SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic");
public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(0, "producer.ack");
public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
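
The renamed keys above (source.parallelism, sink.parallelism, source.kafka.servers, sink.kafka.servers, source.kafka.topic, sink.kafka.topic, plus the new log.transform.type) are all resolved through FlowWriteConfigurations.getIntProperty/getStringProperty, whose first argument appears to select one of at least two property files (index 0 for the main flow-write settings, index 1 for schema/HBase-related settings). That class is not part of this diff; the sketch below is a hypothetical reading of its contract, with file names and loading strategy invented for illustration.

    // Hypothetical sketch of the FlowWriteConfigurations contract assumed by FlowWriteConfig.
    // File names, index meaning and loading strategy are illustrative only; the real class is not shown in this diff.
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    final class FlowWriteConfigurationsSketch {
        // Index 0: core job config; index 1: schema/HBase-related config (assumed mapping).
        private static final String[] FILES = {"flow_write.properties", "schema.properties"};
        private static final Properties[] PROPS = new Properties[FILES.length];

        static {
            for (int i = 0; i < FILES.length; i++) {
                PROPS[i] = new Properties();
                try (InputStream in = FlowWriteConfigurationsSketch.class
                        .getClassLoader().getResourceAsStream(FILES[i])) {
                    if (in != null) {
                        PROPS[i].load(in);
                    }
                } catch (IOException ignored) {
                    // fall back to an empty Properties object
                }
            }
        }

        static Integer getIntProperty(int file, String key) {
            String value = PROPS[file].getProperty(key);
            return value == null ? null : Integer.valueOf(value.trim());
        }

        static String getStringProperty(int file, String key) {
            return PROPS[file].getProperty(key);
        }
    }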

View File

@@ -5,9 +5,10 @@ import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.functions.FilterNullFunction;
import com.zdjizhi.utils.functions.MapCompletedFunction;
+ import com.zdjizhi.utils.functions.ObjectCompletedFunction;
+ import com.zdjizhi.utils.functions.TypeMapCompletedFunction;
import com.zdjizhi.utils.kafka.Consumer;
import com.zdjizhi.utils.kafka.Producer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -27,26 +28,49 @@ public class LogFlowWriteTopology {
// Enable checkpointing; the interval specifies how often a checkpoint is triggered (unit: milliseconds)
// environment.enableCheckpointing(5000);
//
environment.setBufferTimeout(5000);
DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
- .setParallelism(FlowWriteConfig.CONSUMER_PARALLELISM);
+ .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM);
if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
- // Process the raw logs (completion, transformation, etc.)
- DataStream<String> cleaningLog = streamSource.map(new MapCompletedFunction()).name("TransFormLogs")
- .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+ DataStream<String> cleaningLog;
+ switch (FlowWriteConfig.LOG_TRANSFORM_TYPE) {
+ case 0:
+ // Process the raw logs (completion, transformation, etc.); log field types are not validated.
+ cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+ break;
+ case 1:
+ // Process the raw logs (completion, transformation, etc.); log field types must match the schema exactly.
+ cleaningLog = streamSource.map(new ObjectCompletedFunction()).name("ObjectCompletedFunction")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+ break;
+ case 2:
+ // Process the raw logs (completion, transformation, etc.); field types are weakly validated and may be cast according to the schema.
+ cleaningLog = streamSource.map(new TypeMapCompletedFunction()).name("TypeMapCompletedFunction")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+ break;
+ default:
+ // Process the raw logs (completion, transformation, etc.); log field types are not validated.
+ cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+ }
// Filter out empty records so they are not sent to Kafka
DataStream<String> result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
// Send the data to Kafka
result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
- .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+ .setParallelism(FlowWriteConfig.SINK_PARALLELISM);
} else {
// Filter out empty records so they are not sent to Kafka
DataStream<String> result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
// Send the data to Kafka
result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
- .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+ .setParallelism(FlowWriteConfig.SINK_PARALLELISM);
}
try {
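
Taken together, the new switch keyed on log.transform.type selects among three completion strategies: 0 (and any unknown value) maps each record with MapCompletedFunction and skips field-type validation, 1 uses ObjectCompletedFunction and requires field types to match the schema, and 2 uses TypeMapCompletedFunction for weak validation with schema-driven casting. A hypothetical helper expressing the same selection (not part of this commit; it assumes all three classes implement MapFunction<String, String>, which the diff only shows explicitly for MapCompletedFunction):

    // Hypothetical factory summarizing the log.transform.type contract used by the topology above.
    import org.apache.flink.api.common.functions.MapFunction;
    import com.zdjizhi.utils.functions.MapCompletedFunction;
    import com.zdjizhi.utils.functions.ObjectCompletedFunction;
    import com.zdjizhi.utils.functions.TypeMapCompletedFunction;

    final class CompletedFunctionFactory {
        /**
         * 0 (default): complete/transform without field-type validation -> MapCompletedFunction
         * 1:           field types must match the schema                -> ObjectCompletedFunction
         * 2:           weak validation, cast fields to schema types     -> TypeMapCompletedFunction
         */
        static MapFunction<String, String> forTransformType(int logTransformType) {
            switch (logTransformType) {
                case 1:
                    return new ObjectCompletedFunction();
                case 2:
                    return new TypeMapCompletedFunction();
                case 0:
                default:
                    return new MapCompletedFunction();
            }
        }
    }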

View File

@@ -1,8 +1,7 @@
package com.zdjizhi.utils.functions;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
- import com.zdjizhi.utils.general.TransFormTypeMap;
+ import com.zdjizhi.utils.general.TransFormMap;
import org.apache.flink.api.common.functions.MapFunction;
@@ -13,16 +12,10 @@ import org.apache.flink.api.common.functions.MapFunction;
* @date 2021/5/27 15:01
*/
public class MapCompletedFunction implements MapFunction<String, String> {
- private static final Log logger = LogFactory.get();
@Override
@SuppressWarnings("unchecked")
public String map(String logs) {
- try {
- return TransFormTypeMap.dealCommonMessage(logs);
- } catch (RuntimeException e) {
- logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + logs);
- return "";
- }
+ return TransFormMap.dealCommonMessage(logs);
}
}
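
For comparison, the new TypeMapCompletedFunction referenced by the topology's case 2 is not shown in this diff. Given the naming and the import removed here, it presumably mirrors this class but delegates to TransFormTypeMap (the weak-validation/casting variant). A hypothetical sketch under that assumption:

    // Hypothetical sketch only; the real TypeMapCompletedFunction is not part of the hunks shown.
    package com.zdjizhi.utils.functions;

    import com.zdjizhi.utils.general.TransFormTypeMap;
    import org.apache.flink.api.common.functions.MapFunction;

    public class TypeMapCompletedFunctionSketch implements MapFunction<String, String> {
        @Override
        public String map(String logs) {
            // Weak type validation: fields are cast to the schema types where possible.
            return TransFormTypeMap.dealCommonMessage(logs);
        }
    }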

View File

@@ -38,6 +38,7 @@ public class TransFormMap {
try {
if (StringUtil.isNotBlank(message)) {
Map<String, Object> jsonMap = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+ JsonParseUtil.dropJsonField(jsonMap);
for (String[] strings : jobList) {
// Value of the field referenced by this rule
Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
@@ -122,11 +123,6 @@ public class TransFormMap {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString()));
}
break;
case "app_match":
if (logValue != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
}
break;
case "decode_of_base64":
if (logValue != null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
@@ -137,6 +133,11 @@ public class TransFormMap {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
}
break;
case "app_match":
if (logValue != null && appendTo == null) {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
}
break;
default:
}
}
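
The functional change in TransFormMap is the early JsonParseUtil.dropJsonField(jsonMap) call (made public in the JsonParseUtil hunk below), which strips configured drop-list fields before any transform rule runs; the app_match case is only moved after decode_of_base64/flatten, its body is unchanged. A minimal illustration of that ordering, with invented field names standing in for drop-list entries:

    // Illustration only: the drop happens before the transform rules ever see the map.
    import java.util.HashMap;
    import java.util.Map;

    public class DropThenTransformExample {
        public static void main(String[] args) {
            Map<String, Object> jsonMap = new HashMap<>();
            jsonMap.put("raw_payload", "...");       // pretend this field is on the configured drop list
            jsonMap.put("server_domain", "a.com");   // field later consumed by rules such as app_match
            // Step 1 (new in this commit): JsonParseUtil.dropJsonField(jsonMap);
            // simulated here so the example runs without the project on the classpath:
            jsonMap.remove("raw_payload");
            // Step 2: transform rules (radius_match, decode_of_base64, flatten, app_match, ...) run
            // against the already-trimmed map.
            System.out.println(jsonMap);   // {server_domain=a.com}
        }
    }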

View File

@@ -131,11 +131,6 @@ public class TransFormObject {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.radiusMatch(name.toString()));
}
break;
case "app_match":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString()));
}
break;
case "decode_of_base64":
if (name != null) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(object, param)));
@@ -146,6 +141,11 @@ public class TransFormObject {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.flattenSpec(name.toString(), param));
}
break;
case "app_match":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString()));
}
break;
default:
}
}

View File

@@ -218,11 +218,9 @@ class TransFunction {
Object resultA = isJsonValue(object, split[1]);
Object resultB = isJsonValue(object, split[2]);
if (direction instanceof Number) {
- // result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB;
result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB);
} else if (direction instanceof String) {
result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB);
- // result = direction.equals(norms[1]) ? resultA : resultB;
}
}
} catch (RuntimeException e) {
@@ -249,9 +247,7 @@ class TransFunction {
Object resultB = isJsonValue(jsonMap, split[2]);
if (direction instanceof Number) {
result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB;
- // result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB);
} else if (direction instanceof String) {
- // result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB);
result = direction.equals(norms[1]) ? resultA : resultB;
}
}
@@ -261,36 +257,6 @@ class TransFunction {
return result;
}
- // /**
- //  * IF-function implementation: parses the log and builds the ternary expression; if the chosen result is numeric it is converted to long before being returned.
- //  *
- //  * @param jsonMap raw log
- //  * @param ifParam field name / plain string
- //  * @return resultA or resultB or null
- //  */
- // static Object condition(Map<String, Object> jsonMap, String ifParam) {
- // try {
- // String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
- // String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
- // String direction = isJsonValue(jsonMap, norms[0]);
- // if (StringUtil.isNotBlank(direction)) {
- // if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
- // String resultA = isJsonValue(jsonMap, split[1]);
- // String resultB = isJsonValue(jsonMap, split[2]);
- // String result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB;
- // Matcher isNum = PATTERN.matcher(result);
- // if (isNum.matches()) {
- // return Long.parseLong(result);
- // } else {
- // return result;
- // }
- // }
- // }
- // } catch (RuntimeException e) {
- // logger.error("IF 函数执行异常,异常信息:" + e);
- // }
- // return null;
- // }
/**
* Set-fixed-value function; numeric values are converted to long before being returned
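
TypeUtils.castToIfFunction, now used on both branches of the object-path IF handling, is not shown in this diff. Judging from the removed commented-out condition() helper above, it presumably applies the same numeric-to-long coercion to whichever branch is chosen. A hypothetical sketch of that assumed contract:

    // Hypothetical reading of TypeUtils.castToIfFunction (not shown in this diff), inferred from the
    // removed condition() helper above: integer-looking results become Long, everything else passes through.
    final class CastToIfFunctionSketch {
        static Object castToIfFunction(Object result) {
            if (result == null) {
                return null;
            }
            String text = result.toString();
            if (text.matches("-?\\d+")) {   // same intent as the old PATTERN / isNum.matches() check
                return Long.parseLong(text);
            }
            return result;
        }
    }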

View File

@@ -206,7 +206,7 @@ public class JsonParseUtil {
return isKeepField;
}
- static void dropJsonField(Map<String, Object> jsonMap) {
+ public static void dropJsonField(Map<String, Object> jsonMap) {
for (String field : dropList) {
jsonMap.remove(field);
}

View File

@@ -1,5 +1,6 @@
package com.zdjizhi.utils.kafka;
+ import com.sun.tools.javac.comp.Flow;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
@@ -18,11 +19,11 @@ import java.util.Properties;
public class Consumer {
private static Properties createConsumerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", FlowWriteConfig.INPUT_KAFKA_SERVERS);
properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS);
properties.put("group.id", FlowWriteConfig.GROUP_ID);
properties.put("session.timeout.ms", "60000");
properties.put("max.poll.records", "3000");
properties.put("max.partition.fetch.bytes", "31457280");
properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS);
properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS);
properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
@@ -32,7 +33,7 @@ public class Consumer {
}
public static FlinkKafkaConsumer<String> getKafkaConsumer() {
- FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.INPUT_KAFKA_TOPIC,
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
new SimpleStringSchema(), createConsumerConfig());
kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
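
The SESSION_TIMEOUT_MS, MAX_POLL_RECORDS and MAX_PARTITION_FETCH_BYTES constants referenced above replace the previously hard-coded values, but their FlowWriteConfig declarations are not among the hunks shown. Presumably they follow the same pattern as the other keys; the sketch below is an assumption (key names, String type and the old literals as defaults are inferred, not confirmed by this excerpt):

    // Assumed FlowWriteConfig additions (not visible in this diff excerpt).
    public static final String SESSION_TIMEOUT_MS =
            FlowWriteConfigurations.getStringProperty(0, "session.timeout.ms");        // previously hard-coded "60000"
    public static final String MAX_POLL_RECORDS =
            FlowWriteConfigurations.getStringProperty(0, "max.poll.records");          // previously hard-coded "3000"
    public static final String MAX_PARTITION_FETCH_BYTES =
            FlowWriteConfigurations.getStringProperty(0, "max.partition.fetch.bytes"); // previously hard-coded "31457280"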

View File

@@ -19,7 +19,7 @@ public class Producer {
private static Properties createProducerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", FlowWriteConfig.OUTPUT_KAFKA_SERVERS);
properties.put("bootstrap.servers", FlowWriteConfig.SINK_KAFKA_SERVERS);
properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
properties.put("retries", FlowWriteConfig.RETRIES);
properties.put("linger.ms", FlowWriteConfig.LINGER_MS);
@@ -37,7 +37,7 @@ public class Producer {
public static FlinkKafkaProducer<String> getKafkaProducer() {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
- FlowWriteConfig.OUTPUT_KAFKA_TOPIC,
+ FlowWriteConfig.SINK_KAFKA_TOPIC,
new SimpleStringSchema(),
createProducerConfig(), Optional.empty());