package com.zdjizhi.sink; import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.common.DosMetricsLog; import com.zdjizhi.common.DosSketchLog; import com.zdjizhi.utils.KafkaUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import static com.zdjizhi.sink.OutputStreamSink.outputTag; class TrafficServerIpMetricsSink { static void sideOutputMetricsSink(SingleOutputStreamOperator outputStream){ DataStream sideOutput = outputStream.getSideOutput(outputTag); // sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME)) sideOutput.map(JSONObject::toJSONString).addSink(KafkaUtils.getKafkaSink(FlowWriteConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME)) .setParallelism(FlowWriteConfig.KAFKA_OUTPUT_METRIC_PARALLELISM); } }