package com.zdjizhi.source;

import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.CustomFile;
import com.zdjizhi.utils.FlinkEnvironmentUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Factory for the Flink source streams used by the DoS-sketch job:
 * a Kafka consumer for the raw input records plus two Nacos-backed
 * HTTP broadcast sources for configuration data.
 *
 * <p>All connection parameters are read from {@link CommonConfig}.
 *
 * @author wlh
 */
public final class DosSketchSource {

    /** Shared Flink execution environment provided by the project utility class. */
    private static final StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv;

    /** Utility class — not instantiable. */
    private DosSketchSource() {
    }

    /**
     * Builds the main Kafka input source.
     *
     * <p>When {@code CommonConfig.SASL_JAAS_CONFIG_FLAG == 1}, the consumer is
     * configured for SASL/PLAIN authentication over PLAINTEXT using the
     * credentials from {@link CommonConfig}.
     *
     * @return a String-typed source reading {@code CommonConfig.KAFKA_INPUT_TOPIC_NAME}
     *         with parallelism {@code CommonConfig.KAFKA_INPUT_PARALLELISM}
     */
    public static DataStreamSource<String> createDosSketchSource() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
        properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID);
        if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1) {
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.mechanism", "PLAIN");
            properties.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                            + CommonConfig.SASL_JAAS_CONFIG_USER
                            + "\" password=\"" + CommonConfig.SASL_JAAS_CONFIG_PASSWORD + "\";");
        }
        // SimpleStringSchema deserializes each record to a String, fixing the element type.
        return streamExeEnv.addSource(new FlinkKafkaConsumer<>(
                        CommonConfig.KAFKA_INPUT_TOPIC_NAME,
                        new SimpleStringSchema(),
                        properties))
                .setParallelism(CommonConfig.KAFKA_INPUT_PARALLELISM);
    }

    /**
     * Builds the Nacos-backed broadcast configuration source that also
     * persists its payload to a local store path.
     *
     * @param nacosProperties connection properties for the Nacos client
     * @param storePath       local path where the fetched configuration is stored
     * @return the broadcast source stream
     *         (element type assumed to be {@code Map<String, String>} based on the
     *         file's {@code Map} import — TODO confirm against HttpSource)
     */
    public static DataStreamSource<Map<String, String>> broadcastSource(Properties nacosProperties, String storePath) {
        return streamExeEnv.addSource(new HttpSource(nacosProperties,
                CommonConfig.NACOS_DATA_ID,
                CommonConfig.NACOS_GROUP,
                CommonConfig.NACOS_READ_TIMEOUT,
                storePath));
    }

    /**
     * Builds the Nacos-backed broadcast configuration source without local persistence.
     *
     * @param nacosProperties connection properties for the Nacos client
     * @return the broadcast source stream
     *         (element type assumed to be {@code Map<String, String>} based on the
     *         file's {@code Map} import — TODO confirm against SingleHttpSource)
     */
    public static DataStreamSource<Map<String, String>> singleBroadcastSource(Properties nacosProperties) {
        return streamExeEnv.addSource(new SingleHttpSource(nacosProperties,
                CommonConfig.NACOS_DATA_ID,
                CommonConfig.NACOS_GROUP,
                CommonConfig.NACOS_READ_TIMEOUT));
    }
}