toJSONString替换为fastjson工具类

This commit is contained in:
qidaijie
2021-11-11 09:14:09 +03:00
parent 8bf733385f
commit 60e4bcfca0
10 changed files with 67 additions and 45 deletions

View File

@@ -28,8 +28,8 @@ public class LogFlowWriteTopology {
//开启Checkpointinterval用于指定checkpoint的触发间隔(单位milliseconds)
// environment.enableCheckpointing(5000);
//
environment.setBufferTimeout(5000);
//两个输出之间的最大时间 (单位milliseconds)
environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM);
@@ -41,26 +41,31 @@ public class LogFlowWriteTopology {
//对原始日志进行处理补全转换等,不对日志字段类型做校验。
cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
break;
case 1:
//对原始日志进行处理补全转换等强制要求日志字段类型与schema一致。
cleaningLog = streamSource.map(new ObjectCompletedFunction()).name("ObjectCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
break;
case 2:
//对原始日志进行处理补全转换等对日志字段类型做若校验可根据schema进行强转。
cleaningLog = streamSource.map(new TypeMapCompletedFunction()).name("TypeMapCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
break;
default:
//对原始日志进行处理补全转换等,不对日志字段类型做校验。
cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
}
//过滤空数据不发送到Kafka内
// //过滤空数据不发送到Kafka内
DataStream<String> result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
//发送数据到Kafka
result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
.setParallelism(FlowWriteConfig.SINK_PARALLELISM);