From 9a2a5b3957a5af16473266cb60a959d6dfafd78d Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 23 Nov 2022 15:30:24 +0800 Subject: [PATCH] =?UTF-8?q?GAL-224=20DoS=E6=A3=80=E6=B5=8B=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E7=9F=A5=E8=AF=86=E5=BA=93=E5=8A=A8=E6=80=81=E5=8A=A0?= =?UTF-8?q?=E8=BD=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 16 +- .../java/com/zdjizhi/common/CommonConfig.java | 23 ++ .../java/com/zdjizhi/common/CustomFile.java | 23 +- .../java/com/zdjizhi/common/KnowledgeLog.java | 91 +++++ .../java/com/zdjizhi/etl/ParseSketchLog.java | 36 +- .../function/BroadcastProcessFunc.java | 76 ++++ .../com/zdjizhi/source/DosSketchSource.java | 12 + .../java/com/zdjizhi/source/HttpSource.java | 182 +++++++++ .../zdjizhi/source/HttpSourceFunction.java | 6 + .../source/RichHttpSourceFunction.java | 10 + .../com/zdjizhi/source/SingleHttpSource.java | 237 ++++++++++++ .../java/com/zdjizhi/utils/FileByteUtils.java | 24 ++ .../java/com/zdjizhi/utils/HdfsUtils.java | 68 ++-- .../java/com/zdjizhi/utils/HttpSource.java | 139 ------- src/main/java/com/zdjizhi/utils/IpUtils.java | 98 ++++- .../java/com/zdjizhi/utils/KnowledgeBase.java | 32 -- .../com/zdjizhi/utils/KnowledgeConstant.java | 24 -- .../com/zdjizhi/utils/KnowledgeUtils.java | 95 ----- .../java/com/zdjizhi/utils/LinkUtils.java | 75 ---- .../java/com/zdjizhi/utils/NacosUtils.java | 12 +- .../java/com/zdjizhi/utils/NacosUtils2.java | 352 ------------------ .../java/com/zdjizhi/utils/TestKnowledge.java | 160 -------- src/main/resources/common.properties | 71 +++- src/main/resources/core-site.xml | 58 +++ src/main/resources/hdfs-site.xml | 142 +++++++ src/main/resources/yarn-site.xml | 196 ++++++++++ 26 files changed, 1336 insertions(+), 922 deletions(-) create mode 100644 src/main/java/com/zdjizhi/common/KnowledgeLog.java create mode 100644 src/main/java/com/zdjizhi/function/BroadcastProcessFunc.java create mode 100644 src/main/java/com/zdjizhi/source/HttpSource.java create mode 100644 src/main/java/com/zdjizhi/source/HttpSourceFunction.java create mode 100644 src/main/java/com/zdjizhi/source/RichHttpSourceFunction.java create mode 100644 src/main/java/com/zdjizhi/source/SingleHttpSource.java create mode 100644 src/main/java/com/zdjizhi/utils/FileByteUtils.java delete mode 100644 src/main/java/com/zdjizhi/utils/HttpSource.java delete mode 100644 src/main/java/com/zdjizhi/utils/KnowledgeBase.java delete mode 100644 src/main/java/com/zdjizhi/utils/KnowledgeConstant.java delete mode 100644 src/main/java/com/zdjizhi/utils/KnowledgeUtils.java delete mode 100644 src/main/java/com/zdjizhi/utils/LinkUtils.java delete mode 100644 src/main/java/com/zdjizhi/utils/NacosUtils2.java delete mode 100644 src/main/java/com/zdjizhi/utils/TestKnowledge.java create mode 100644 src/main/resources/core-site.xml create mode 100644 src/main/resources/hdfs-site.xml create mode 100644 src/main/resources/yarn-site.xml diff --git a/pom.xml b/pom.xml index 2a0d6dc..e4cd910 100644 --- a/pom.xml +++ b/pom.xml @@ -13,6 +13,7 @@ 2.1.1 2.7.1 2.11 + 2.4.0 @@ -122,6 +123,12 @@ 1.7.21 + + com.jayway.jsonpath + json-path + ${jsonpath.version} + + org.apache.flink flink-connector-kafka_2.12 @@ -156,6 +163,13 @@ + + + org.apache.hadoop + hadoop-hdfs + ${hadoop.version} + + org.apache.hbase @@ -211,7 +225,7 @@ com.zdjizhi galaxy - 1.1.0 + 1.1.1 slf4j-log4j12 diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java index 61c310b..97e9a32 100644 --- a/src/main/java/com/zdjizhi/common/CommonConfig.java +++ b/src/main/java/com/zdjizhi/common/CommonConfig.java @@ -10,6 +10,11 @@ import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; */ public class CommonConfig { + /** + * 定位库默认分隔符 + */ + public static final String LOCATION_SEPARATOR = "."; + private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); static { @@ -80,6 +85,24 @@ public class CommonConfig { public static final int SASL_JAAS_CONFIG_FLAG = CommonConfigurations.getIntProperty("sasl.jaas.config.flag"); + public static final String NACOS_SERVER_ADDR = CommonConfigurations.getStringProperty("nacos.server.addr"); + public static final String NACOS_USERNAME = CommonConfigurations.getStringProperty("nacos.username"); + public static final String NACOS_PASSWORD = CommonConfigurations.getStringProperty("nacos.password"); + public static final String NACOS_DATA_ID = CommonConfigurations.getStringProperty("nacos.data.id"); + public static final String NACOS_GROUP = CommonConfigurations.getStringProperty("nacos.group"); + public static final int NACOS_READ_TIMEOUT = CommonConfigurations.getIntProperty("nacos.read.timeout"); + + public static final String HOS_TOKEN = CommonConfigurations.getStringProperty("hos.token"); + + public static final String CLUSTER_OR_SINGLE = CommonConfigurations.getStringProperty("cluster.or.single"); + + public static final String HDFS_URI_NS1 = CommonConfigurations.getStringProperty("hdfs.uri.nn1"); + public static final String HDFS_URI_NS2 = CommonConfigurations.getStringProperty("hdfs.uri.nn2"); + public static final String HDFS_PATH = CommonConfigurations.getStringProperty("hdfs.path"); + public static final String HDFS_USER = CommonConfigurations.getStringProperty("hdfs.user"); + + public static final String DOWNLOAD_PATH = CommonConfigurations.getStringProperty("download.path"); + public static void main(String[] args) { StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); // 配置加密解密的密码/salt值 diff --git a/src/main/java/com/zdjizhi/common/CustomFile.java b/src/main/java/com/zdjizhi/common/CustomFile.java index 0c22eb8..701024c 100644 --- a/src/main/java/com/zdjizhi/common/CustomFile.java +++ b/src/main/java/com/zdjizhi/common/CustomFile.java @@ -1,5 +1,26 @@ package com.zdjizhi.common; +import java.io.Serializable; -public class CustomFile { +public class CustomFile implements Serializable { + + String fileName; + + byte[] content; + + public String getFileName() { + return fileName; + } + + public void setFileName(String fileName) { + this.fileName = fileName; + } + + public byte[] getContent() { + return content; + } + + public void setContent(byte[] content) { + this.content = content; + } } diff --git a/src/main/java/com/zdjizhi/common/KnowledgeLog.java b/src/main/java/com/zdjizhi/common/KnowledgeLog.java new file mode 100644 index 0000000..d72f7df --- /dev/null +++ b/src/main/java/com/zdjizhi/common/KnowledgeLog.java @@ -0,0 +1,91 @@ +package com.zdjizhi.common; + +public class KnowledgeLog { + public String id; + public String name; + public String path; + public Long size; + public String format; + public String sha256; + public String version; + public String updateTime; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + public Long getSize() { + return size; + } + + public void setSize(Long size) { + this.size = size; + } + + public String getFormat() { + return format; + } + + public void setFormat(String format) { + this.format = format; + } + + public String getSha256() { + return sha256; + } + + public void setSha256(String sha256) { + this.sha256 = sha256; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public String getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(String updateTime) { + this.updateTime = updateTime; + } + + + @Override + public String toString() { + return "KnowledgeLog{" + + "id='" + id + '\'' + + ", name='" + name + '\'' + + ", path='" + path + '\'' + + ", size=" + size + + ", format='" + format + '\'' + + ", sha256='" + sha256 + '\'' + + ", version='" + version + '\'' + + ", updateTime='" + updateTime + '\'' + + '}'; + } +} diff --git a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java index d30d1f0..2ef2b1b 100644 --- a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java +++ b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java @@ -1,22 +1,31 @@ package com.zdjizhi.etl; +import com.alibaba.nacos.api.PropertyKeyConst; import com.fasterxml.jackson.databind.JavaType; import com.zdjizhi.common.CommonConfig; +import com.zdjizhi.common.CustomFile; import com.zdjizhi.common.DosSketchLog; +import com.zdjizhi.function.BroadcastProcessFunc; import com.zdjizhi.source.DosSketchSource; import com.zdjizhi.utils.FlinkEnvironmentUtils; import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; +import java.util.*; /** * @author wlh @@ -34,7 +43,28 @@ public class ParseSketchLog { } private static SingleOutputStreamOperator flatSketchSource(){ - return DosSketchSource.createDosSketchSource().flatMap(new FlatSketchLog()); + + DataStreamSource> broadcastSource=null; + Properties nacosProperties = new Properties(); + + nacosProperties.put(PropertyKeyConst.SERVER_ADDR,CommonConfig.NACOS_SERVER_ADDR); + nacosProperties.setProperty(PropertyKeyConst.USERNAME, CommonConfig.NACOS_USERNAME); + nacosProperties.setProperty(PropertyKeyConst.PASSWORD, CommonConfig.NACOS_PASSWORD); + + if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)){ + broadcastSource = DosSketchSource.broadcastSource(nacosProperties,CommonConfig.HDFS_PATH); + }else { + broadcastSource= DosSketchSource.singleBroadcastSource(nacosProperties); + } + + MapStateDescriptor descriptor = + new MapStateDescriptor<>("descriptorTest", Types.STRING, TypeInformation.of(Map.class)); + + BroadcastStream> broadcast = broadcastSource.broadcast(descriptor); +// BroadcastConnectedStream> connect = DosSketchSource.createDosSketchSource().connect(broadcast); + return DosSketchSource.createDosSketchSource() + .connect(broadcast).process(new BroadcastProcessFunc()); +// .flatMap(new FlatSketchLog()); } private static WatermarkStrategy createWatermarkStrategy(){ diff --git a/src/main/java/com/zdjizhi/function/BroadcastProcessFunc.java b/src/main/java/com/zdjizhi/function/BroadcastProcessFunc.java new file mode 100644 index 0000000..fcfb624 --- /dev/null +++ b/src/main/java/com/zdjizhi/function/BroadcastProcessFunc.java @@ -0,0 +1,76 @@ +package com.zdjizhi.function; + +import com.fasterxml.jackson.databind.JavaType; +import com.zdjizhi.common.CustomFile; +import com.zdjizhi.common.DosSketchLog; +import com.zdjizhi.etl.ParseSketchLog; +import com.zdjizhi.utils.IpUtils; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class BroadcastProcessFunc extends BroadcastProcessFunction, DosSketchLog> { + private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class); + private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); + private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class); + private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class); + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + System.out.println("begin init"); + IpUtils.loadIpLook(); + System.out.println("init over"); + } + + @Override + public void processElement(String value, ReadOnlyContext ctx, Collector out) throws Exception { + try { + if (StringUtil.isNotBlank(value)){ + HashMap sketchSource = jsonMapperInstance.fromJson(value, hashmapJsonType); + long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString()); + long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString()); + String attackType = sketchSource.get("attack_type").toString(); + ArrayList> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType); + for (HashMap obj : reportIpList) { + DosSketchLog dosSketchLog = new DosSketchLog(); + dosSketchLog.setSketch_start_time(sketchStartTime); + dosSketchLog.setSketch_duration(sketchDuration); + dosSketchLog.setAttack_type(attackType); + String sourceIp = obj.get("source_ip").toString(); + String destinationIp = obj.get("destination_ip").toString(); + long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString()); + long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString()); + long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString()); + dosSketchLog.setSource_ip(sourceIp); + dosSketchLog.setDestination_ip(destinationIp); + dosSketchLog.setSketch_sessions(sketchSessions); + dosSketchLog.setSketch_packets(sketchPackets); + dosSketchLog.setSketch_bytes(sketchBytes); + out.collect(dosSketchLog); + logger.debug("数据解析成功:{}",dosSketchLog.toString()); + } + } + } catch (Exception e) { + logger.error("数据解析错误:{} \n{}",value,e); + } + } + + @Override + public void processBroadcastElement(Map value, Context ctx, Collector out) throws Exception { + IpUtils.updateIpLook(value); + } + +} + + + diff --git a/src/main/java/com/zdjizhi/source/DosSketchSource.java b/src/main/java/com/zdjizhi/source/DosSketchSource.java index 6980062..deaed61 100644 --- a/src/main/java/com/zdjizhi/source/DosSketchSource.java +++ b/src/main/java/com/zdjizhi/source/DosSketchSource.java @@ -1,12 +1,15 @@ package com.zdjizhi.source; import com.zdjizhi.common.CommonConfig; +import com.zdjizhi.common.CustomFile; import com.zdjizhi.utils.FlinkEnvironmentUtils; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import java.util.List; +import java.util.Map; import java.util.Properties; /** @@ -31,4 +34,13 @@ public class DosSketchSource { new SimpleStringSchema(), properties)) .setParallelism(CommonConfig.KAFKA_INPUT_PARALLELISM); } + + + public static DataStreamSource> broadcastSource(Properties nacosProperties, String STORE_PATH){ + return streamExeEnv.addSource(new HttpSource(nacosProperties, CommonConfig.NACOS_DATA_ID, CommonConfig.NACOS_GROUP, CommonConfig.NACOS_READ_TIMEOUT,STORE_PATH)); + } + + public static DataStreamSource> singleBroadcastSource(Properties nacosProperties){ + return streamExeEnv.addSource(new SingleHttpSource(nacosProperties, CommonConfig.NACOS_DATA_ID, CommonConfig.NACOS_GROUP, CommonConfig.NACOS_READ_TIMEOUT)); + } } diff --git a/src/main/java/com/zdjizhi/source/HttpSource.java b/src/main/java/com/zdjizhi/source/HttpSource.java new file mode 100644 index 0000000..6451fc1 --- /dev/null +++ b/src/main/java/com/zdjizhi/source/HttpSource.java @@ -0,0 +1,182 @@ +package com.zdjizhi.source; + +import cn.hutool.core.io.FileUtil; +import cn.hutool.core.io.IoUtil; +import cn.hutool.json.JSONObject; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.fasterxml.jackson.databind.JavaType; +import com.google.common.base.Joiner; +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.CommonConfig; +import com.zdjizhi.common.CustomFile; +import com.zdjizhi.common.KnowledgeLog; +import com.zdjizhi.utils.*; +import org.apache.commons.io.IOUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.http.Header; +import org.apache.http.message.BasicHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.Executor; + + +public class HttpSource extends RichHttpSourceFunction> { + + private static final Logger logger = LoggerFactory.getLogger(HttpSource.class); + + private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']"; + + //连接nacos的配置 + private Properties nacosProperties; + + //nacos data id + private String NACOS_DATA_ID; + + //nacos group + private String NACOS_GROUP; + + //nacos 连接超时时间 + private long NACOS_READ_TIMEOUT; + + //上传到hdfs的路径 + private String STORE_PATH; + + private ConfigService configService; + +// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); +// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class); + private static Map updateMap = new HashMap<>(); + private static HashMap knowledgeFileCache; + private boolean isRunning = true; + + + public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT, String storePath) { + this.nacosProperties = nacosProperties; + this.NACOS_DATA_ID = NACOS_DATA_ID; + this.NACOS_GROUP = NACOS_GROUP; + this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT; + this.STORE_PATH = storePath; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + //初始化元数据缓存 + updateMap = new HashMap<>(16); + //初始化定位库缓存 + knowledgeFileCache = new HashMap<>(16); + logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR)); + configService = NacosFactory.createConfigService(nacosProperties); + } + @Override + public void run(SourceContext ctx) throws Exception { +// ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); + String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT); + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String format = formatter.format(new Date()); + logger.info(format + "receive config from nacos:" + config); + System.out.println(format + "receive config from nacos:" + config); + if (StringUtil.isNotBlank(config)) { + ArrayList metaList = JsonPath.parse(config).read(EXPR); + loadKnowledge(metaList); + } + + + configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configMsg) { + try { + logger.info("receive update config:" + configMsg); + if (StringUtil.isNotBlank(configMsg)) { + ArrayList metaList = JsonPath.parse(configMsg).read(EXPR); + if (metaList.size() >= 1) { + for (Object metadata : metaList) { + JSONObject knowledgeJson = new JSONObject(metadata, false, true); + String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), + knowledgeJson.getStr("format")); + String sha256 = knowledgeJson.getStr("sha256"); + String filePath = knowledgeJson.getStr("path"); + if (!sha256.equals(updateMap.get(fileName))) { + updateMap.put(fileName, sha256); + updateKnowledge(fileName, filePath); + } + + } + ctx.collect(knowledgeFileCache); + } + } + + } catch (Exception e) { + logger.error("监听nacos配置失败", e); + } + System.out.println(configMsg); + } + }); + + while (isRunning) { + Thread.sleep(10000); + } + + } + + private void loadKnowledge(ArrayList metaList) { + InputStream inputStream = null; + try { + if (metaList.size() >= 1) { + for (Object metadata : metaList) { + JSONObject knowledgeJson = new JSONObject(metadata, false, true); + String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), + knowledgeJson.getStr("format")); + String sha256 = knowledgeJson.getStr("sha256"); + String filePath = knowledgeJson.getStr("path"); + Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN); + HttpClientUtils2 httpClientUtils = new HttpClientUtils2(); + inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); + updateMap.put(fileName, sha256); + knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream)); + } + } + } catch (IOException ioException) { + ioException.printStackTrace(); + } finally { + IOUtils.closeQuietly(inputStream); + } + } + + + private void updateKnowledge(String fileName, String filePath) { + InputStream inputStream = null; + FileOutputStream outputStream = null; + try { + Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN); + HttpClientUtils2 httpClientUtils = new HttpClientUtils2(); + inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); + byte[] bytes = IOUtils.toByteArray(inputStream); + HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, bytes); + knowledgeFileCache.put(fileName, bytes); + } catch (IOException ioException) { + ioException.printStackTrace(); + } finally { + IOUtils.closeQuietly(inputStream); + IOUtils.closeQuietly(outputStream); + } + } + @Override + public void cancel() { + this.isRunning = false; + } + + +} diff --git a/src/main/java/com/zdjizhi/source/HttpSourceFunction.java b/src/main/java/com/zdjizhi/source/HttpSourceFunction.java new file mode 100644 index 0000000..8fd58a9 --- /dev/null +++ b/src/main/java/com/zdjizhi/source/HttpSourceFunction.java @@ -0,0 +1,6 @@ +package com.zdjizhi.source; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +public interface HttpSourceFunction extends SourceFunction { +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/source/RichHttpSourceFunction.java b/src/main/java/com/zdjizhi/source/RichHttpSourceFunction.java new file mode 100644 index 0000000..582aa13 --- /dev/null +++ b/src/main/java/com/zdjizhi/source/RichHttpSourceFunction.java @@ -0,0 +1,10 @@ +package com.zdjizhi.source; + +import org.apache.flink.api.common.functions.AbstractRichFunction; + +public abstract class RichHttpSourceFunction extends AbstractRichFunction implements HttpSourceFunction { + private static final long serialVersionUID = 1L; + + public RichHttpSourceFunction() { + } +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/source/SingleHttpSource.java b/src/main/java/com/zdjizhi/source/SingleHttpSource.java new file mode 100644 index 0000000..f0aa11b --- /dev/null +++ b/src/main/java/com/zdjizhi/source/SingleHttpSource.java @@ -0,0 +1,237 @@ +package com.zdjizhi.source; + +import cn.hutool.core.io.FileUtil; +import cn.hutool.core.io.IoUtil; +import cn.hutool.json.JSONObject; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.fasterxml.jackson.databind.JavaType; +import com.google.common.base.Joiner; +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.CommonConfig; +import com.zdjizhi.common.CustomFile; +import com.zdjizhi.common.KnowledgeLog; +import com.zdjizhi.utils.*; +import org.apache.commons.io.IOUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.http.Header; +import org.apache.http.message.BasicHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.*; +import java.util.concurrent.Executor; + +public class SingleHttpSource extends RichHttpSourceFunction> { + + private static final Logger logger = LoggerFactory.getLogger(HttpSource.class); + private static HashMap knowledgeFileCache; + + private Properties nacosProperties; + + private String NACOS_DATA_ID; + + private String NACOS_GROUP; + + private long NACOS_READ_TIMEOUT; + + private static String STORE_PATH; + + private ConfigService configService; + +// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); +// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class); + private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']"; + + + private static Map updateMap = new HashMap<>(); + + private boolean isRunning = true; + + + public SingleHttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) { + this.nacosProperties = nacosProperties; + this.NACOS_DATA_ID = NACOS_DATA_ID; + this.NACOS_GROUP = NACOS_GROUP; + this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT; + } + + + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR)); + configService = NacosFactory.createConfigService(nacosProperties); + //初始化元数据缓存 + updateMap = new HashMap<>(16); + //初始化定位库缓存 + knowledgeFileCache = new HashMap<>(16); + } + + @Override + public void run(SourceContext ctx) throws Exception { +// ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); + String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT); +// List customFiles = new ArrayList<>(); + if (StringUtil.isNotBlank(config)) { + ArrayList metaList = JsonPath.parse(config).read(EXPR); + loadKnowledge(metaList); + } +// if (StringUtil.isNotBlank(config)) { +// List knowledgeLogListList = jsonMapperInstance.fromJson(config, listType); +// if (knowledgeLogListList.size()>=1){ +// for (KnowledgeLog knowledgeLog : knowledgeLogListList) { +// String name = knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat()); +// String sha256 = knowledgeLog.getSha256(); +// updateMap.put(name,sha256); +// } +// } +// } + + configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configMsg) { + try { + logger.info("receive update config:" + configMsg); + if (StringUtil.isNotBlank(configMsg)) { + ArrayList metaList = JsonPath.parse(configMsg).read(EXPR); + if (metaList.size() >= 1) { + for (Object metadata : metaList) { + JSONObject knowledgeJson = new JSONObject(metadata, false, true); + String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), + knowledgeJson.getStr("format")); + String sha256 = knowledgeJson.getStr("sha256"); + String filePath = knowledgeJson.getStr("path"); + if (!sha256.equals(updateMap.get(fileName))) { + updateMap.put(fileName, sha256); + updateKnowledge(fileName, filePath); + } + + } + ctx.collect(knowledgeFileCache); + } + } +// ArrayList customFiles = new ArrayList<>(); +// List knowledgeInfo = jsonMapperInstance.fromJson(configMsg, listType); +// for (KnowledgeLog knowledgeLog : knowledgeInfo) { +// String fileName = knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat()); +// String sha256 = knowledgeLog.getSha256(); +// String filePath = knowledgeLog.getPath(); +// String version = knowledgeLog.getVersion(); +// if (updateMap.containsKey(fileName)){ +// if ((!sha256.equals(updateMap.get(fileName)))&& ("latest".equals(version))){ +// CustomFile customFile = loadKnowledge(fileName, filePath); +// customFiles.add(customFile); +// updateMap.put(fileName,sha256); +// } +// }else { +// updateMap.put(fileName, sha256); +// CustomFile customFile = loadKnowledge(fileName, filePath); +// customFiles.add(customFile); +// } +// } +//// customFiles=loadKnowledge(configMsg); +//// ctx.collectWithTimestamp(customFiles,System.currentTimeMillis()); +// ctx.collect(customFiles); + + } catch (Exception e) { + logger.error("监听nacos配置失败", e); + } + System.out.println(configMsg); + } + }); + + while (isRunning) { + Thread.sleep(10000); + } + + + + } + +// private CustomFile loadKnowledge(String fileName, String filePath) { +// InputStream inputStream = null; +// FileOutputStream outputStream = null; +// CustomFile customFile = new CustomFile(); +// try { +// customFile.setFileName(fileName); +// Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN); +// HttpClientUtils2 httpClientUtils = new HttpClientUtils2(); +// inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); +// FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH); +// File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName)); +// outputStream = new FileOutputStream(file); +// byte[] bytes = IOUtils.toByteArray(inputStream); +// customFile.setContent(bytes); +// inputStream = new ByteArrayInputStream(customFile.getContent()); +// IoUtil.copy(inputStream, outputStream); +// +// } catch (IOException ioException) { +// ioException.printStackTrace(); +// } finally { +// IOUtils.closeQuietly(inputStream); +// IOUtils.closeQuietly(outputStream); +// } +// return customFile; +// } +private void loadKnowledge(ArrayList metaList) { + InputStream inputStream = null; + try { + if (metaList.size() >= 1) { + for (Object metadata : metaList) { + JSONObject knowledgeJson = new JSONObject(metadata, false, true); + String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), + knowledgeJson.getStr("format")); + String sha256 = knowledgeJson.getStr("sha256"); + String filePath = knowledgeJson.getStr("path"); + Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN); + HttpClientUtils2 httpClientUtils = new HttpClientUtils2(); + inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); + updateMap.put(fileName, sha256); + knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream)); + } + } + } catch (IOException ioException) { + ioException.printStackTrace(); + } finally { + IOUtils.closeQuietly(inputStream); + } +} + + + private void updateKnowledge(String fileName, String filePath) { + InputStream inputStream = null; + FileOutputStream outputStream = null; + try { + Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN); + HttpClientUtils2 httpClientUtils = new HttpClientUtils2(); + inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); + FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH); + File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName)); + outputStream = new FileOutputStream(file); + byte[] bytes = IOUtils.toByteArray(inputStream); + knowledgeFileCache.put(fileName, bytes); + } catch (IOException ioException) { + ioException.printStackTrace(); + } finally { + IOUtils.closeQuietly(inputStream); + IOUtils.closeQuietly(outputStream); + } + } + + @Override + public void cancel() { + this.isRunning = false; + } +} + diff --git a/src/main/java/com/zdjizhi/utils/FileByteUtils.java b/src/main/java/com/zdjizhi/utils/FileByteUtils.java new file mode 100644 index 0000000..bb1f5aa --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/FileByteUtils.java @@ -0,0 +1,24 @@ +package com.zdjizhi.utils; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; + +public class FileByteUtils { + + public static byte[] getFileBytes (String filePath) throws IOException { + File file = new File(filePath); + FileInputStream fis = new FileInputStream(file); + ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); + byte[] b = new byte[1024]; + int n; + while ((n = fis.read(b)) != -1) { + bos.write(b, 0, n); + } + fis.close(); + byte[] data = bos.toByteArray(); + bos.close(); + return data; + } +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/HdfsUtils.java b/src/main/java/com/zdjizhi/utils/HdfsUtils.java index e9484e7..25f90bf 100644 --- a/src/main/java/com/zdjizhi/utils/HdfsUtils.java +++ b/src/main/java/com/zdjizhi/utils/HdfsUtils.java @@ -1,4 +1,7 @@ package com.zdjizhi.utils; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.CommonConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -8,48 +11,69 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; public class HdfsUtils { - - private static final Logger logger = LoggerFactory.getLogger(HdfsUtils.class); - + private static final Log logger = LogFactory.get(); private static FileSystem fileSystem; static { Configuration configuration = new Configuration(); try { + //指定用户 + //配置hdfs相关信息 +// configuration.set("fs.defaultFS","hdfs://ns1"); +// configuration.set("hadoop.proxyuser.root.hosts","*"); +// configuration.set("hadoop.proxyuser.root.groups","*"); +// configuration.set("ha.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM); +// configuration.set("dfs.nameservices","ns1"); +// configuration.set("dfs.ha.namenodes.ns1","nn1,nn2"); +// configuration.set("dfs.namenode.rpc-address.ns1.nn1",CommonConfig.HDFS_URI_NS1); +// configuration.set("dfs.namenode.rpc-address.ns1.nn2",CommonConfig.HDFS_URI_NS2); +// configuration.set("dfs.client.failover.proxy.provider.ns1","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); +// //创建fileSystem,用于连接hdfs +//// fileSystem = FileSystem.get(configuration); + System.setProperty("HADOOP_USER_NAME", CommonConfig.HDFS_USER); //创建fileSystem,用于连接hdfs - fileSystem = FileSystem.get(configuration); + fileSystem = FileSystem.get(new URI(CommonConfig.HDFS_URI_NS1),configuration); } catch (IOException e) { throw new RuntimeException(e); } + catch (URISyntaxException e) { + e.printStackTrace(); + } + } + + public static boolean isExists(String filePath) throws IOException { + return fileSystem.exists(new Path(filePath)); } public static byte[] getFileBytes(String filePath) throws IOException { - FSDataInputStream open = null; - try { - open = fileSystem.open(new Path(filePath)); + try (FSDataInputStream open = fileSystem.open(new Path(filePath))) { byte[] bytes = new byte[open.available()]; - open.read(0,bytes,0, open.available()); + open.read(0, bytes, 0, open.available()); return bytes; - } finally { - if (open != null) { - open.close(); - } + } catch (IOException e) { + logger.error("An I/O exception when files are download from HDFS. Message is :" + e.getMessage()); + } + return null; + } + + public static void uploadFileByBytes(String filePath,byte[] bytes) throws IOException { + try (FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(filePath), true)) { + fsDataOutputStream.write(bytes); + fsDataOutputStream.flush(); + } catch (RuntimeException e) { + logger.error("Uploading files to the HDFS is abnormal. Message is :" + e.getMessage()); + } catch (IOException e) { + logger.error("An I/O exception when files are uploaded to HDFS. Message is :" + e.getMessage()); } } - public static void uploadFileByBytes(String filePath, byte[] bytes) throws IOException { - FSDataOutputStream fsDataOutputStream = null; - try { - fsDataOutputStream = fileSystem.create(new Path(filePath), true); - fsDataOutputStream.write(bytes); - } finally { - if (fsDataOutputStream != null) { - fsDataOutputStream.close(); - } - } + public static void rename(String src, String dst) throws IOException { + fileSystem.rename(new Path(src),new Path(dst)); } diff --git a/src/main/java/com/zdjizhi/utils/HttpSource.java b/src/main/java/com/zdjizhi/utils/HttpSource.java deleted file mode 100644 index 8891645..0000000 --- a/src/main/java/com/zdjizhi/utils/HttpSource.java +++ /dev/null @@ -1,139 +0,0 @@ -package com.zdjizhi.utils; - -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; -import com.fasterxml.jackson.databind.JavaType; -import com.zdjizhi.common.CommonConfig; -import com.zdjizhi.common.CustomFile; -import com.zdjizhi.common.KnowledgeLog; -//import com.zdjizhi.function.source.RichHttpSourceFunction; -import org.apache.commons.io.IOUtils; -import org.apache.flink.configuration.Configuration; -import org.apache.http.Header; -import org.apache.http.message.BasicHeader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.Executor; - - -public class HttpSource extends RichHttpSourceFunction> { - - private static final Logger logger = LoggerFactory.getLogger(HttpSource.class); - - //连接nacos的配置 - private Properties nacosProperties; - - //nacos data id - private String NACOS_DATA_ID; - - //nacos group - private String NACOS_GROUP; - - //nacos 连接超时时间 - private long NACOS_READ_TIMEOUT; - - //上传到hdfs的路径 - private String STORE_PATH; - - private ConfigService configService; - - private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); - private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class); - - private boolean isRunning = true; - - - public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT, String storePath) { - this.nacosProperties = nacosProperties; - this.NACOS_DATA_ID = NACOS_DATA_ID; - this.NACOS_GROUP = NACOS_GROUP; - this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT; - this.STORE_PATH = storePath; - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR)); - configService = NacosFactory.createConfigService(nacosProperties); - } - - @Override - public void run(SourceContext ctx) throws Exception { - String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT); - logger.info("receive config from nacos:" + config); - System.out.println("receive config from nacos:" + config); - List customFiles; - //从nacos拿到配置后连接hos下载知识库到内存 - customFiles = loadKnowledge(config); - //将知识库传递到下一节点,即广播 - ctx.collectWithTimestamp(customFiles,System.currentTimeMillis()); - - configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() { - @Override - public Executor getExecutor() { - return null; - } - - @Override - public void receiveConfigInfo(String configMsg) { - try { - logger.info("receive update config:" + configMsg); - List customFiles = new ArrayList<>(); - customFiles = loadKnowledge(configMsg); - ctx.collectWithTimestamp(customFiles,System.currentTimeMillis()); - //将更新后的文件重新上传至hdfs - for (CustomFile customFile : customFiles) { - logger.info("begin upload to hdfs:" + STORE_PATH + customFile.getFileName()); - HdfsUtils.uploadFileByBytes(STORE_PATH + customFile.getFileName(),customFile.getContent()); - logger.info(STORE_PATH + customFile.getFileName() + " upload finished"); - } - } catch (Exception e) { - logger.error("监听nacos配置失败", e); - } - System.out.println(configMsg); - } - }); - } - - //下载知识库到内存 - public List loadKnowledge(String config) { - List customFiles = new ArrayList<>(); - List knowledges = jsonMapperInstance.fromJson(config, listType); - for (KnowledgeLog knowledge : knowledges) { - CustomFile customFile = new CustomFile(); - String fileName = knowledge.getName().concat(".").concat(knowledge.getFormat()); - customFile.setFileName(fileName); - InputStream inputStream = null; - try { - Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN); - HttpClientUtils httpClientUtils = new HttpClientUtils(); - inputStream = httpClientUtils.httpGetInputStream(knowledge.getPath(), 3000, header); - byte[] bytes = IOUtils.toByteArray(inputStream); - customFile.setContent(bytes); - } catch (IOException ioException) { - ioException.printStackTrace(); - } finally { - IOUtils.closeQuietly(inputStream); - } - customFiles.add(customFile); - } - return customFiles; - } - - @Override - public void cancel() { - this.isRunning = false; - } - - - -} diff --git a/src/main/java/com/zdjizhi/utils/IpUtils.java b/src/main/java/com/zdjizhi/utils/IpUtils.java index 43bfc11..adc1d57 100644 --- a/src/main/java/com/zdjizhi/utils/IpUtils.java +++ b/src/main/java/com/zdjizhi/utils/IpUtils.java @@ -1,18 +1,104 @@ package com.zdjizhi.utils; import com.zdjizhi.common.CommonConfig; +import com.zdjizhi.common.CustomFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.List; +import java.util.Map; public class IpUtils { + public static IpLookupV2 ipLookup ; + + private static Logger LOG = LoggerFactory.getLogger(IpUtils.class); + /** * IP定位库工具类 */ - public static IpLookupV2 ipLookup = new IpLookupV2.Builder(false) - .loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4_built_in.mmdb") - .loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6_built_in.mmdb") - .loadDataFilePrivateV4(CommonConfig.IP_MMDB_PATH + "ip_v4_user_defined.mmdb") - .loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_v6_user_defined.mmdb") - .build(); +// public static IpLookupV2 ipLookup = new IpLookupV2.Builder(false) +// .loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4_built_in.mmdb") +// .loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6_built_in.mmdb") +// .loadDataFilePrivateV4(CommonConfig.IP_MMDB_PATH + "ip_v4_user_defined.mmdb") +// .loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_v6_user_defined.mmdb") +// .build(); + + + public static void loadIpLook(){ + try { + IpLookupV2.Builder builder = new IpLookupV2.Builder(false); + if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)) { + byte[] ipv4BuiltBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v4_built_in.mmdb"); + if (ipv4BuiltBytes!=null){ + InputStream ipv4BuiltInputStream = new ByteArrayInputStream(ipv4BuiltBytes); + builder.loadDataFileV4(ipv4BuiltInputStream); + } + + byte[] ipv6BuiltBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v6_built_in.mmdb"); + if (ipv6BuiltBytes!=null){ + InputStream ipv6BuiltInputStream = new ByteArrayInputStream(ipv6BuiltBytes); + builder.loadDataFileV6(ipv6BuiltInputStream); + } + + byte[] ipv4UserBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v4_user_defined.mmdb"); + if (ipv4UserBytes!=null){ + InputStream ipv4UserInputStream = new ByteArrayInputStream(ipv4UserBytes); + builder.loadDataFilePrivateV4(ipv4UserInputStream); + } + + byte[] ipv6UserBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v6_user_defined.mmdb"); + if (ipv6UserBytes!=null){ + InputStream ipv6UserInputStream = new ByteArrayInputStream(ipv6UserBytes); + builder.loadDataFilePrivateV6(ipv6UserInputStream); + } + }else if ("SINGLE".equals(CommonConfig.CLUSTER_OR_SINGLE)){ + byte[] ipv4BuiltBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v4_built_in.mmdb"); + if (ipv4BuiltBytes!=null){ + InputStream ipv4BuiltInputStream = new ByteArrayInputStream(ipv4BuiltBytes); + builder.loadDataFileV4(ipv4BuiltInputStream); + } + + byte[] ipv6BuiltBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v6_built_in.mmdb"); + if (ipv6BuiltBytes!=null){ + InputStream ipv6BuiltInputStream = new ByteArrayInputStream(ipv6BuiltBytes); + builder.loadDataFileV6(ipv6BuiltInputStream); + } + + byte[] ipv4UserBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v4_user_defined.mmdb"); + if (ipv4UserBytes!=null){ + InputStream ipv4UserInputStream = new ByteArrayInputStream(ipv4UserBytes); + builder.loadDataFilePrivateV4(ipv4UserInputStream); + } + + byte[] ipv6UserBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v6_user_defined.mmdb"); + if (ipv6UserBytes!=null){ + InputStream ipv6UserInputStream = new ByteArrayInputStream(ipv6UserBytes); + builder.loadDataFilePrivateV6(ipv6UserInputStream); + } + } + ipLookup = builder.build(); + + }catch (Exception e){ + LOG.error("加载失败",e); + } + } + + public static void updateIpLook(Map knowledgeFileCache){ + try{ + IpLookupV2.Builder builder = new IpLookupV2.Builder(false); + ipLookup= builder.loadDataFileV4(new ByteArrayInputStream(knowledgeFileCache.get("ip_v4_built_in.mmdb"))) + .loadDataFileV6(new ByteArrayInputStream(knowledgeFileCache.get("ip_v6_built_in.mmdb"))) + .loadDataFilePrivateV4(new ByteArrayInputStream(knowledgeFileCache.get("ip_v4_user_defined.mmdb"))) + .loadDataFilePrivateV6(new ByteArrayInputStream(knowledgeFileCache.get("ip_v6_user_defined.mmdb"))) + .build(); + }catch (Exception e){ + LOG.error("加载失败",e); + } + + } public static void main(String[] args) { System.out.println(ipLookup.countryLookup("49.7.115.37")); diff --git a/src/main/java/com/zdjizhi/utils/KnowledgeBase.java b/src/main/java/com/zdjizhi/utils/KnowledgeBase.java deleted file mode 100644 index f4198f5..0000000 --- a/src/main/java/com/zdjizhi/utils/KnowledgeBase.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.zdjizhi.utils; -import lombok.Data; -/** - * @author fy - * @version 1.0 - * @date 2022/10/19 18:27 - */ - - -@Data -public class KnowledgeBase { - - private String id; - - private String name; - - private String type; - - private String path; - - private Long size; - - private String format; - - private String sha256; - - private String origin_url; - - private String version; - - private String updateTime; -} diff --git a/src/main/java/com/zdjizhi/utils/KnowledgeConstant.java b/src/main/java/com/zdjizhi/utils/KnowledgeConstant.java deleted file mode 100644 index c674bb6..0000000 --- a/src/main/java/com/zdjizhi/utils/KnowledgeConstant.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.zdjizhi.utils; - -/** - * @author fy - * @version 1.0 - * @date 2022/10/19 18:25 - * 提取了下知识库的名称 - */ -public class KnowledgeConstant { - - public static final String DAT = "dat"; - public static final String IP_USER_DEFINED_V4 = "ip_v4_user_defined"; - public static final String IP_USER_DEFINED_V6 = "ip_v6_user_defined"; - public static final String IP_BUILT_IN_V4 = "ip_v4_built_in"; - public static final String IP_BUILT_IN_V6 = "ip_v6_built_in"; - public static final String IP_USER_DEFINED_V4_MMDB = "ip_v4_user_defined.mmdb"; - public static final String IP_USER_DEFINED_V6_MMDB = "ip_v6_user_defined.mmdb"; - public static final String IP_BUILT_IN_V4_MMDB = "ip_v4_built_in.mmdb"; - public static final String IP_BUILT_IN_V6_MMDB = "ip_v6_built_in.mmdb"; - public static final String LATEST = "latest"; - public static final String TOKEN = "token"; - -} - diff --git a/src/main/java/com/zdjizhi/utils/KnowledgeUtils.java b/src/main/java/com/zdjizhi/utils/KnowledgeUtils.java deleted file mode 100644 index b9ec7e0..0000000 --- a/src/main/java/com/zdjizhi/utils/KnowledgeUtils.java +++ /dev/null @@ -1,95 +0,0 @@ -package com.zdjizhi.utils; - -import cn.hutool.core.io.IoUtil; -import cn.hutool.http.Header; -import com.fasterxml.jackson.databind.JavaType; -import org.apache.http.message.BasicHeader; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.kafka.common.requests.FetchMetadata.log; - -/** - * @author fy - * @version 1.0 - * @date 2022/10/19 14:07 - */ -public class KnowledgeUtils { - - private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); - private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeBase.class); - - - - - public Map queryLoadKnowledgeInfo(String knowledgeMetadata, IpLocationConfiguration ipLocationConfiguration) { - - log.info("update knowledge Base version:{},update knowledge Base content:{}", ipLocationConfiguration, knowledgeMetadata); - Map map = new HashMap<>(); - - if (StringUtil.isNotEmpty(ipLocationConfiguration) && StringUtil.isNotEmpty(knowledgeMetadata)) { -// List knowledgeBaseList = JsonMapper.fromJsonString(knowledgeMetadata, List.class, KnowledgeBase.class); - - List knowledgeBaseList = jsonMapperInstance.fromJson(knowledgeMetadata, listType);// TODO: 2022/10/21 - - String ipV4BuiltIn = ipLocationConfiguration.getIpV4BuiltIn(); - String ipV4UserDefined = ipLocationConfiguration.getIpV4UserDefined(); - String ipV6BuiltIn = ipLocationConfiguration.getIpV6BuiltIn(); - String ipV6UserDefined = ipLocationConfiguration.getIpV6UserDefined(); - - for (KnowledgeBase knowledgeBase : knowledgeBaseList) { - String name = knowledgeBase.getName(); - String version = knowledgeBase.getVersion(); - String concat = name.concat(":").concat(version); - - if (StringUtil.equals(concat, ipV4BuiltIn) - || (StringUtil.equals(name, ipV4BuiltIn) && StringUtil.equals(version, KnowledgeConstant.LATEST))) { - map.put(KnowledgeConstant.IP_BUILT_IN_V4, knowledgeBase); - } else if (StringUtil.equals(concat, ipV4UserDefined) - || (StringUtil.equals(name, ipV4UserDefined) && StringUtil.equals(version, KnowledgeConstant.LATEST))) { - map.put(KnowledgeConstant.IP_USER_DEFINED_V4, knowledgeBase); - } else if (StringUtil.equals(concat, ipV6BuiltIn) - || (StringUtil.equals(name, ipV6BuiltIn) && StringUtil.equals(version, KnowledgeConstant.LATEST))) { - map.put(KnowledgeConstant.IP_BUILT_IN_V6, knowledgeBase); - } else if (StringUtil.equals(concat, ipV6UserDefined) - || (StringUtil.equals(name, ipV6UserDefined) && StringUtil.equals(version, KnowledgeConstant.LATEST))) { - map.put(KnowledgeConstant.IP_USER_DEFINED_V6, knowledgeBase); - } - } - } - return map; - } - - - private void download(KnowledgeBase knowledgeBase) { - - String id = knowledgeBase.getId(); - String name = knowledgeBase.getName(); - String sha256 = knowledgeBase.getSha256(); - String format = knowledgeBase.getFormat(); - FileOutputStream outputStream = null; - InputStream inputStream = null; - try { - Header header = new BasicHeader(KnowledgeConstant.TOKEN, hosConfig.getToken()); - inputStream = httpClientService.httpGetInputStream(knowledgeBase.getPath(), httpConfig.getServerResponseTimeOut(), header); - File file = new File(KnowledgeConstant.DAT.concat(File.separator).concat(name).concat(".").concat(format)); - outputStream = new FileOutputStream(file); - IoUtil.copy(inputStream, outputStream); - log.info("knowledge download name :{},version:{}", name, knowledgeBase.getVersion()); - } catch (IOException e) { - log.error("file not fount,:{}", e); - } finally { - IoUtil.close(inputStream); - IoUtil.close(outputStream); - } - updateMap.put(id, sha256); - } - - -} diff --git a/src/main/java/com/zdjizhi/utils/LinkUtils.java b/src/main/java/com/zdjizhi/utils/LinkUtils.java deleted file mode 100644 index 01510be..0000000 --- a/src/main/java/com/zdjizhi/utils/LinkUtils.java +++ /dev/null @@ -1,75 +0,0 @@ -package com.zdjizhi.etl.utils; - -import com.opencsv.CSVParser; -import com.opencsv.CSVReader; -import com.zdjizhi.base.common.CnRecordLog; -import com.zdjizhi.base.common.CommonConfig; -import com.zdjizhi.etl.common.LinkAndIspInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.DataInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Set; - -/** - * @ Author pengfeixu - * @ Date 2022/3/30 - * @ Description - */ -public class LinkUtils { - private static Logger LOG = LoggerFactory.getLogger(LinkUtils.class); - public static HashMap linkmap = new HashMap<>(); -// private static NacosUtils nacosUtils = new NacosUtils(); - // private static HashMap linkmap = readLinkCsv(CommonConfig.LINK_PATH); - - public static void readLinkCsv(String path) { - HashMap linkDirectionmap = new HashMap<>(); - - try { - DataInputStream in = new DataInputStream(new FileInputStream(new File(path))); - CSVReader csvReader = new CSVReader(new InputStreamReader(in, StandardCharsets.UTF_8), CSVParser.DEFAULT_SEPARATOR, - CSVParser.DEFAULT_QUOTE_CHARACTER, CSVParser.DEFAULT_ESCAPE_CHARACTER, 1); - - for (String[] strs : csvReader) { - long key=Long.parseLong(strs[6]); - LinkAndIspInfo linkDirectionInfo = new LinkAndIspInfo(); - linkDirectionInfo.setEgress_link_direction(strs[3]); - linkDirectionInfo.setIngress_link_direction(strs[3]); - linkDirectionInfo.setLink_id(Integer.parseInt(strs[6])); - linkmap.put(key, linkDirectionInfo); - } - csvReader.close(); - } catch (Exception e) { - LOG.error(e.getMessage()); - } -// return linkDirectionmap; - } - - public static void setLinkMessage(CnRecordLog cnRecordLog) { - if (! linkmap.containsKey(cnRecordLog.getCommon_egress_link_id()) || ! linkmap.containsKey(cnRecordLog.getCommon_ingress_link_id())){ - cnRecordLog.setCommon_egress_link_id(0); - cnRecordLog.setCommon_ingress_link_id(0); - } - - if (linkmap.containsKey(cnRecordLog.getCommon_egress_link_id())){ - cnRecordLog.setEgress_link_direction(linkmap.get(cnRecordLog.getCommon_egress_link_id()).getEgress_link_direction()); - } - if (linkmap.containsKey(cnRecordLog.getCommon_ingress_link_id())){ - cnRecordLog.setIngress_link_direction(linkmap.get(cnRecordLog.getCommon_ingress_link_id()).getIngress_link_direction()); - } - } - - public static void main(String[] args) { -// HashMap map = readLinkCsv(CommonConfig.LINK_PATH); - Set keySet = linkmap.keySet(); - for (long key : keySet){ - System.out.println(linkmap.get(key).toString()); - } - System.out.println(linkmap.size()); - } -} diff --git a/src/main/java/com/zdjizhi/utils/NacosUtils.java b/src/main/java/com/zdjizhi/utils/NacosUtils.java index 6adfc23..fefbe27 100644 --- a/src/main/java/com/zdjizhi/utils/NacosUtils.java +++ b/src/main/java/com/zdjizhi/utils/NacosUtils.java @@ -19,11 +19,11 @@ public class NacosUtils { private static final String NACOS_SERVER_ADDR = CommonConfigurations.getStringProperty("nacos.server.addr"); - private static final String NACOS_NAMESPACE = CommonConfigurations.getStringProperty("nacos.namespace"); + private static final String NACOS_STATIC_NAMESPACE = CommonConfigurations.getStringProperty("nacos.static.namespace"); private static final String NACOS_USERNAME = CommonConfigurations.getStringProperty("nacos.username"); private static final String NACOS_PASSWORD = CommonConfigurations.getStringProperty("nacos.password"); - private static final String NACOS_DATA_ID = CommonConfigurations.getStringProperty("nacos.data.id"); - private static final String NACOS_GROUP = CommonConfigurations.getStringProperty("nacos.group"); + private static final String NACOS_STATIC_DATA_ID = CommonConfigurations.getStringProperty("nacos.static.data.id"); + private static final String NACOS_STATIC_GROUP = CommonConfigurations.getStringProperty("nacos.static.group"); private static final long NACOS_READ_TIMEOUT = CommonConfigurations.getLongProperty("nacos.read.timeout"); static { @@ -32,7 +32,7 @@ public class NacosUtils { private static void getProperties() { nacosProperties.setProperty(PropertyKeyConst.SERVER_ADDR, NACOS_SERVER_ADDR); - nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, NACOS_NAMESPACE); + nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, NACOS_STATIC_NAMESPACE); nacosProperties.setProperty(PropertyKeyConst.USERNAME, NACOS_USERNAME); nacosProperties.setProperty(PropertyKeyConst.PASSWORD, NACOS_PASSWORD); } @@ -41,11 +41,11 @@ public class NacosUtils { try { getProperties(); ConfigService configService = NacosFactory.createConfigService(nacosProperties); - String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT); + String config = configService.getConfig(NACOS_STATIC_DATA_ID, NACOS_STATIC_GROUP, NACOS_READ_TIMEOUT); commonProperties.load(new StringReader(config)); - configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() { + configService.addListener(NACOS_STATIC_DATA_ID, NACOS_STATIC_GROUP, new Listener() { @Override public Executor getExecutor() { return null; diff --git a/src/main/java/com/zdjizhi/utils/NacosUtils2.java b/src/main/java/com/zdjizhi/utils/NacosUtils2.java deleted file mode 100644 index 89add4b..0000000 --- a/src/main/java/com/zdjizhi/utils/NacosUtils2.java +++ /dev/null @@ -1,352 +0,0 @@ -package com.zdjizhi.utils; - -import cn.hutool.core.io.IoUtil; -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; -import com.fasterxml.jackson.databind.JavaType; -//import com.zdjizhi.base.common.CommonConfig; -//import com.zdjizhi.base.common.KnowledgeConstant; -//import com.zdjizhi.base.common.KnowledgeLog; -//import com.zdjizhi.base.utils.CommonConfigurations; -//import com.zdjizhi.base.utils.HttpClientUtils; -import com.zdjizhi.utils.*; -import org.apache.http.Header; -import org.apache.http.message.BasicHeader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; -import java.util.*; -import java.util.concurrent.Executor; - - -public class NacosUtils2 { - - - private static final Logger logger = LoggerFactory.getLogger(NacosUtils2.class); - private static CommonConfigurations configurations; - - - - - private static Properties nacosProperties = new Properties(); - private static Properties commonProperties = new Properties(); - - private static Map updateMap = new HashMap<>();//存文件路径(放在服务器了) - - private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); - private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class); - - private static final String NACOS_SERVER_ADDR = configurations.getStringProperty("nacos.server.addr"); - private static final String NACOS_NAMESPACE = configurations.getStringProperty("nacos.namespace"); - private static final String NACOS_USERNAME = configurations.getStringProperty("nacos.username"); - private static final String NACOS_PASSWORD = configurations.getStringProperty("nacos.password"); - private static final String NACOS_DATA_ID = configurations.getStringProperty("nacos.data.id"); - private static final String NACOS_GROUP = configurations.getStringProperty("nacos.group"); - private static final long NACOS_READ_TIMEOUT = configurations.getLongProperty("nacos.read.timeout"); - - static { - Properties propService; - try { - propService = new Properties(); - propService.load(CommonConfigurations.class.getClassLoader().getResourceAsStream("common.properties")); - configurations = new CommonConfigurations(propService); - } catch (Exception e) { - logger.error("加载common.properties配置文件失败"); - System.exit(1); - } - } - - - - static { - createConfigService(); - } - - private static void getProperties() { - nacosProperties.setProperty(PropertyKeyConst.SERVER_ADDR, NACOS_SERVER_ADDR); - nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, NACOS_NAMESPACE); - nacosProperties.setProperty(PropertyKeyConst.USERNAME, NACOS_USERNAME); - nacosProperties.setProperty(PropertyKeyConst.PASSWORD, NACOS_PASSWORD); - } - - public static void createConfigService() { - try { - getProperties(); - ConfigService configService = NacosFactory.createConfigService(nacosProperties); - String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT); -// commonProperties.load(new StringReader(config)); - -// Map pathMap = - loadKnowledge(config);; - - configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() { - @Override - public Executor getExecutor() { - return null; - } - - @Override - public void receiveConfigInfo(String configMsg) { - try { -// commonProperties.clear(); -// commonProperties.load(new StringReader(configMsg)); - loadKnowledge(configMsg); -// IdcRenterUtils.readServerCsv(configMsg); - } catch (Exception e) { - logger.error("监听nacos配置失败", e); - } - System.out.println(configMsg); - } - }); - } catch (Exception e) { - e.printStackTrace(); - logger.error("获取nacos配置失败", e); - } - - } - - - //加载nacos配置 - public static void loadKnowledge(String config){ - Map knowledgeLogMap = queryLoadKnowledgeInfo(config); - - - for (Map.Entry knowledgeLogEntry : knowledgeLogMap.entrySet()) { - KnowledgeLog knowledgeLog = knowledgeLogEntry.getValue(); - String id = knowledgeLog.getId(); - String sha256 = knowledgeLog.getSha256(); - if (!StringUtil.equals(sha256,updateMap.get(id))){ - System.out.println(knowledgeLogEntry.getValue()); - download( knowledgeLogEntry.getValue());//下载文件 - } - } - Map path = queryPath(knowledgeLogMap); - - //初始化ip定位库 - IpUtils.initIpLookup(path.get(KnowledgeConstant.IP_BUILT_IN_V4_MMDB),path.get(KnowledgeConstant.ASN_V4),path.get(KnowledgeConstant.ASN_V6)); - IdcRenterUtils.readServerCsv(path.get(KnowledgeConstant.ISP)); - LinkUtils.readLinkCsv(path.get(KnowledgeConstant.LINK)); - IspUtils.readServerCsv(path.get(KnowledgeConstant.ISP)); - FcUtils.readCsv(path.get(KnowledgeConstant.WEBSKT)); - FcUtils.readIcpCsv(path.get(KnowledgeConstant.ICP)); - WhoisUtils.readJson(path.get(KnowledgeConstant.WHOIS)); - DnsServerUtils.readServerCsv(path.get(KnowledgeConstant.DNSPATH)); - AppUtils.readJson(path.get(KnowledgeConstant.APPSKT)); - IPUtils.readInternalIpCsv(path.get(KnowledgeConstant.INTERNALIP)); - - } - - //query知识库中json信息 - public static Map queryLoadKnowledgeInfo(String content){ -// KnowledgeLog knowledgeLog = new KnowledgeLog(); - Map map = new HashMap<>(); - -// ArrayList> o = jsonMapperInstance.fromJson(content, listType); - List o = jsonMapperInstance.fromJson(content, listType); - for (KnowledgeLog knowledgeLog : o) { - -// } -// for (HashMap obj : o) { -// knowledgeLog.setId(obj.get("id").toString()); -// knowledgeLog.setName(obj.get("name").toString()); -// knowledgeLog.setPath(obj.get("path").toString()); -// knowledgeLog.setSize(Long.parseLong(obj.get("size").toString())); -// knowledgeLog.setFormat(obj.get("format").toString()); -// knowledgeLog.setSha256(obj.get("sha256").toString()); -// knowledgeLog.setVersion(obj.get("version").toString()); -// knowledgeLog.setUpdateTime(obj.get("updateTime").toString()); - - if ((KnowledgeConstant.IP_BUILT_IN_V4_MMDB.equals(knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat()))) && (KnowledgeConstant.LATEST.equals(knowledgeLog.getVersion()))){ - map.put(KnowledgeConstant.IP_BUILT_IN_V4_MMDB,knowledgeLog); - } - if (((KnowledgeConstant.ASN_V4.equals(knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat()))) )&& (KnowledgeConstant.LATEST.equals(knowledgeLog.getVersion()))){ - map.put(KnowledgeConstant.ASN_V4,knowledgeLog); - } - if ((KnowledgeConstant.ASN_V6.equals(knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat())))&& (KnowledgeConstant.LATEST.equals(knowledgeLog.getVersion())) ){ - map.put(KnowledgeConstant.ASN_V6,knowledgeLog); - } - if ((KnowledgeConstant.WEBSKT.equals(knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat()))&& (KnowledgeConstant.LATEST.equals(knowledgeLog.getVersion())) )){ - map.put(KnowledgeConstant.WEBSKT,knowledgeLog); - } - if ((KnowledgeConstant.DNSPATH.equals(knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat())))&& (KnowledgeConstant.LATEST.equals(knowledgeLog.getVersion()))){ - map.put(KnowledgeConstant.DNSPATH,knowledgeLog); - } - if ((KnowledgeConstant.APPSKT.equals(knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat())))&& (KnowledgeConstant.LATEST.equals(knowledgeLog.getVersion()))){ - map.put(KnowledgeConstant.APPSKT,knowledgeLog); - } - if ((KnowledgeConstant.WHOIS.equals(knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat())))&& (KnowledgeConstant.LATEST.equals(knowledgeLog.getVersion()))){ - map.put(KnowledgeConstant.WHOIS,knowledgeLog); - } - if( (KnowledgeConstant.ICP.equals(knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat())))&& (KnowledgeConstant.LATEST.equals(knowledgeLog.getVersion()))){ - map.put(KnowledgeConstant.ICP,knowledgeLog); - } - if ((KnowledgeConstant.IOC.equals(knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat())))&& (KnowledgeConstant.LATEST.equals(knowledgeLog.getVersion()))){ - map.put(KnowledgeConstant.IOC,knowledgeLog); - } - if ((KnowledgeConstant.LINK.equals(knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat())))&& (KnowledgeConstant.LATEST.equals(knowledgeLog.getVersion()))){ - map.put(KnowledgeConstant.LINK,knowledgeLog); - } - if (KnowledgeConstant.ISP.equals(knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat()))&& (KnowledgeConstant.LATEST.equals(knowledgeLog.getVersion()))){ - map.put(KnowledgeConstant.ISP,knowledgeLog); - } - if (KnowledgeConstant.IDCRENTER.equals(knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat()))&& (KnowledgeConstant.LATEST.equals(knowledgeLog.getVersion()))){ - map.put(KnowledgeConstant.IDCRENTER,knowledgeLog); - } - if (KnowledgeConstant.INTERNALIP.equals(knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat()))&& (KnowledgeConstant.LATEST.equals(knowledgeLog.getVersion()))){ - map.put(KnowledgeConstant.INTERNALIP,knowledgeLog); - } -// } -// download(knowledgeLog); - } - return map; - } - - - - - - //匹配路径,文件存放在服务器的路径 - private static Map queryPath(Map map){ - Map pathMap=new HashMap<>(); - KnowledgeLog ip_v4 = map.get(KnowledgeConstant.IP_BUILT_IN_V4_MMDB);//获取值 - KnowledgeLog asn_v4 = map.get(KnowledgeConstant.ASN_V4); - KnowledgeLog asn_v6 = map.get(KnowledgeConstant.ASN_V6); - KnowledgeLog webskt = map.get(KnowledgeConstant.WEBSKT); - KnowledgeLog dnsPath = map.get(KnowledgeConstant.DNSPATH); - KnowledgeLog appskt = map.get(KnowledgeConstant.APPSKT); - KnowledgeLog whois = map.get(KnowledgeConstant.WHOIS); - KnowledgeLog icp = map.get(KnowledgeConstant.ICP); - KnowledgeLog ioc = map.get(KnowledgeConstant.IOC); - KnowledgeLog link = map.get(KnowledgeConstant.LINK); - KnowledgeLog isp = map.get(KnowledgeConstant.ISP); - KnowledgeLog idcRenter = map.get(KnowledgeConstant.IDCRENTER); - KnowledgeLog internalIp = map.get(KnowledgeConstant.INTERNALIP); - - if (StringUtil.isEmpty(ip_v4)|| !new File(CommonConfig.IP_PATH.concat(ip_v4.getName().concat(".").concat(ip_v4.getFormat()))).exists()){ - pathMap.put(KnowledgeConstant.IP_BUILT_IN_V4_MMDB,CommonConfig.IP_PATH+File.separator+KnowledgeConstant.IP_BUILT_IN_V4_MMDB); - }else{ - pathMap.put(KnowledgeConstant.IP_BUILT_IN_V4_MMDB,CommonConfig.IP_PATH.concat(File.separator).concat(ip_v4.getName()).concat(".").concat(ip_v4.getFormat())); - } - if (StringUtil.isEmpty(asn_v4)|| !new File(CommonConfig.IP_PATH.concat(asn_v4.getName().concat(".").concat(asn_v4.getFormat()))).exists()){ - pathMap.put(KnowledgeConstant.ASN_V4,CommonConfig.IP_PATH+File.separator+KnowledgeConstant.ASN_V4); - }else{ - pathMap.put(KnowledgeConstant.ASN_V4,CommonConfig.IP_PATH.concat(File.separator).concat(asn_v4.getName()).concat(".").concat(asn_v4.getFormat())); - } - if (StringUtil.isEmpty(asn_v6)|| !new File(CommonConfig.IP_PATH.concat(asn_v6.getName().concat(".").concat(asn_v6.getFormat()))).exists()){ - pathMap.put(KnowledgeConstant.ASN_V6,CommonConfig.IP_PATH+File.separator+KnowledgeConstant.ASN_V6); - }else{ - pathMap.put(KnowledgeConstant.ASN_V6,CommonConfig.IP_PATH.concat(File.separator).concat(asn_v6.getName()).concat(".").concat(asn_v6.getFormat())); - } - if (StringUtil.isEmpty(webskt)|| !new File(CommonConfig.IP_PATH.concat(webskt.getName().concat(".").concat(webskt.getFormat()))).exists()){ - pathMap.put(KnowledgeConstant.WEBSKT,CommonConfig.IP_PATH+File.separator+KnowledgeConstant.WEBSKT); - }else{ - pathMap.put(KnowledgeConstant.WEBSKT,CommonConfig.IP_PATH.concat(File.separator).concat(webskt.getName()).concat(".").concat(webskt.getFormat())); - } - if (StringUtil.isEmpty(dnsPath)|| !new File(CommonConfig.IP_PATH.concat(dnsPath.getName().concat(".").concat(dnsPath.getFormat()))).exists()){ - pathMap.put(KnowledgeConstant.DNSPATH,CommonConfig.IP_PATH+File.separator+KnowledgeConstant.DNSPATH); - }else{ - pathMap.put(KnowledgeConstant.DNSPATH,CommonConfig.IP_PATH.concat(File.separator).concat(dnsPath.getName()).concat(".").concat(dnsPath.getFormat())); - } - if (StringUtil.isEmpty(appskt)|| !new File(CommonConfig.IP_PATH.concat(appskt.getName().concat(".").concat(appskt.getFormat()))).exists()){ - pathMap.put(KnowledgeConstant.APPSKT,CommonConfig.IP_PATH+File.separator+KnowledgeConstant.APPSKT); - }else{ - pathMap.put(KnowledgeConstant.APPSKT,CommonConfig.IP_PATH.concat(File.separator).concat(appskt.getName()).concat(".").concat(appskt.getFormat())); - } - if (StringUtil.isEmpty(whois)|| !new File(CommonConfig.IP_PATH.concat(whois.getName().concat(".").concat(whois.getFormat()))).exists()){ - pathMap.put(KnowledgeConstant.WHOIS,CommonConfig.IP_PATH+File.separator+KnowledgeConstant.WHOIS); - }else{ - pathMap.put(KnowledgeConstant.WHOIS,CommonConfig.IP_PATH.concat(File.separator).concat(whois.getName()).concat(".").concat(whois.getFormat())); - } - if (StringUtil.isEmpty(icp)|| !new File(CommonConfig.IP_PATH.concat(icp.getName().concat(".").concat(icp.getFormat()))).exists()){ - pathMap.put(KnowledgeConstant.ICP,CommonConfig.IP_PATH+File.separator+KnowledgeConstant.ICP); - }else{ - pathMap.put(KnowledgeConstant.ICP,CommonConfig.IP_PATH.concat(File.separator).concat(icp.getName()).concat(".").concat(icp.getFormat())); - } - if (StringUtil.isEmpty(ioc)|| !new File(CommonConfig.IP_PATH.concat(ioc.getName().concat(".").concat(ioc.getFormat()))).exists()){ - pathMap.put(KnowledgeConstant.IOC,CommonConfig.IP_PATH+File.separator+KnowledgeConstant.IOC); - }else{ - pathMap.put(KnowledgeConstant.IOC,CommonConfig.IP_PATH.concat(File.separator).concat(ioc.getName()).concat(".").concat(ioc.getFormat())); - } - if (StringUtil.isEmpty(link)|| !new File(CommonConfig.IP_PATH.concat(link.getName().concat(".").concat(link.getFormat()))).exists()){ - pathMap.put(KnowledgeConstant.LINK,CommonConfig.IP_PATH+File.separator+KnowledgeConstant.LINK); - }else{ - pathMap.put(KnowledgeConstant.LINK,CommonConfig.IP_PATH.concat(File.separator).concat(link.getName()).concat(".").concat(link.getFormat())); - } - if (StringUtil.isEmpty(isp)|| !new File(CommonConfig.IP_PATH.concat(isp.getName().concat(".").concat(isp.getFormat()))).exists()){ - pathMap.put(KnowledgeConstant.ISP,CommonConfig.IP_PATH+File.separator+KnowledgeConstant.ISP); - }else{ - pathMap.put(KnowledgeConstant.ISP,CommonConfig.IP_PATH.concat(File.separator).concat(isp.getName()).concat(".").concat(isp.getFormat())); - } - if (StringUtil.isEmpty(idcRenter)|| !new File(CommonConfig.IP_PATH.concat(idcRenter.getName().concat(".").concat(idcRenter.getFormat()))).exists()){ - pathMap.put(KnowledgeConstant.IDCRENTER,CommonConfig.IP_PATH+File.separator+KnowledgeConstant.IDCRENTER); - }else{ - pathMap.put(KnowledgeConstant.IDCRENTER,CommonConfig.IP_PATH.concat(File.separator).concat(idcRenter.getName()).concat(".").concat(idcRenter.getFormat())); - } - if (StringUtil.isEmpty(internalIp)|| !new File(CommonConfig.IP_PATH.concat(internalIp.getName().concat(".").concat(internalIp.getFormat()))).exists()){ - pathMap.put(KnowledgeConstant.INTERNALIP,CommonConfig.IP_PATH+File.separator+KnowledgeConstant.INTERNALIP); - }else{ - pathMap.put(KnowledgeConstant.INTERNALIP,CommonConfig.IP_PATH.concat(File.separator).concat(internalIp.getName()).concat(".").concat(internalIp.getFormat())); - } - return pathMap; - } - - - //下载文件 - public static void download(KnowledgeLog knowledgeLog){ - String id = knowledgeLog.getId(); - String name = knowledgeLog.getName(); - String sha256 = knowledgeLog.getSha256(); - String format = knowledgeLog.getFormat(); -// String path=null; -// Map map=new HashMap<>(); - InputStream inputStream = null; - FileOutputStream outputStream = null; - - try { - Header header = new BasicHeader("token", CommonConfig.TOKEN); - HttpClientUtils httpClientUtils = new HttpClientUtils(); - inputStream = httpClientUtils.httpGetInputStream(knowledgeLog.getPath(), 3000, header); - File file=new File(CommonConfig.IP_PATH.concat(File.separator).concat(knowledgeLog.getName()).concat(".").concat(knowledgeLog.getFormat())); - outputStream=new FileOutputStream(file); - IoUtil.copy(inputStream, outputStream); - } catch (FileNotFoundException e) { - e.printStackTrace(); - }finally { - IoUtil.close(inputStream); - IoUtil.close(outputStream); - } - updateMap.put(id,sha256); - } - - public static String getStringProperty(String key) { - return commonProperties.getProperty(key); - } - - public static Integer getIntProperty(String key) { - return Integer.parseInt(commonProperties.getProperty(key)); - } - - public static Double getDoubleProperty(String key) { - return Double.parseDouble(commonProperties.getProperty(key)); - } - - public static Long getLongProperty(String key) { - return Long.parseLong(commonProperties.getProperty(key)); - } - - public static Boolean getBooleanProperty(String key) { - return "true".equals(commonProperties.getProperty(key).toLowerCase().trim()); - } - - - - - - - - - -} diff --git a/src/main/java/com/zdjizhi/utils/TestKnowledge.java b/src/main/java/com/zdjizhi/utils/TestKnowledge.java deleted file mode 100644 index 0456be6..0000000 --- a/src/main/java/com/zdjizhi/utils/TestKnowledge.java +++ /dev/null @@ -1,160 +0,0 @@ -package com.zdjizhi.utils; - - -import com.alibaba.nacos.api.PropertyKeyConst; -import com.fasterxml.jackson.databind.JsonNode; -import com.maxmind.db.CHMCache; -import com.zdjizhi.common.CustomFile; -import org.apache.flink.api.common.state.BroadcastState; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream; -import org.apache.flink.streaming.api.datastream.BroadcastStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.util.Collector; -import org.junit.Test; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.io.Reader; -import java.net.InetAddress; -import java.util.*; - -public class TestKnowledge { - - @Test - public void jobTest() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - //连接nacos配置 - Properties nacosProperties = new Properties(); - //hdfs存储路径 - String STORE_PATH = "/test/"; - nacosProperties.put(PropertyKeyConst.SERVER_ADDR,"192.168.44.12"); - nacosProperties.setProperty(PropertyKeyConst.USERNAME, "nacos"); - nacosProperties.setProperty(PropertyKeyConst.PASSWORD, "nacos"); - //获取知识库的流 - DataStreamSource> customFileDataStreamSource = env.addSource(new HttpSource(nacosProperties, "knowledge_base.json", "DEFAULT_GROUP", 30000,STORE_PATH)); - //将该流设置并行度为1,多了会产生并发拉数据情况 - customFileDataStreamSource.setParallelism(1); - - //用于存储广播数据的state - MapStateDescriptor> descriptor = - new MapStateDescriptor>("descriptorTest", Types.STRING, Types.LIST(TypeInformation.of(CustomFile.class))); - //将该流广播 - BroadcastStream> broadcast = customFileDataStreamSource.broadcast(descriptor); - - //将数据流和广播流connect - BroadcastConnectedStream> connect = env.addSource(new MySource()).setParallelism(1).connect(broadcast); - - connect.process(new BroadcastProcessFunction, Tuple2>() { - Reader reader = null; - - //初始化,从hdfs拉知识库 - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - System.out.println("begin init"); - byte[] fileBytes = HdfsUtils.getFileBytes(STORE_PATH + "ip_v4.mmdb"); - InputStream inputStream = new ByteArrayInputStream(fileBytes); - this.reader = new Reader(inputStream,new CHMCache()); - System.out.println("init over"); - } - - //数据流处理逻辑 - @Override - public void processElement(String value, BroadcastProcessFunction, Tuple2>.ReadOnlyContext ctx, Collector> out) throws Exception { - InetAddress ipAddress = InetAddress.getByName(value); - String result = "ip not find"; - JsonNode jsonNode = reader.get(ipAddress); - if (jsonNode != null) { - result = jsonNode.toString(); - } - out.collect(Tuple2.of(value,result)); - } - - //广播流处理逻辑,即知识库更新逻辑 - @Override - public void processBroadcastElement(List value, BroadcastProcessFunction, Tuple2>.Context ctx, Collector> out) throws Exception { - BroadcastState> broadcastState = ctx.getBroadcastState(descriptor); - broadcastState.clear(); - broadcastState.put("test",value); - InputStream inputStream = null; - for (CustomFile customFile : value) { - if ("ip_v4_built_in.mmdb".equals(customFile.getFileName())) { - inputStream = new ByteArrayInputStream(customFile.getContent()); - } - } - this.reader = new Reader(inputStream, new CHMCache()); - } - }).setParallelism(1).addSink(new CollectSink1()); - - env.execute(); - - for (Tuple2 value : CollectSink1.values) { - System.out.println(value); - } - System.out.println(CollectSink1.values); - } - - //自定义sink,用于test - public static class CollectSink implements SinkFunction { - - public static final List values = Collections.synchronizedList(new ArrayList()); - - public void invoke(CustomFile value, Context context) throws Exception { - values.add(value); - } - } - - //自定义sink,用于test - public static class CollectSink1 implements SinkFunction> { - - public static final List> values = Collections.synchronizedList(new ArrayList>()); - - public void invoke(Tuple2 value, Context context) throws Exception { - //System.out.println(value.f0 + " | " + value.f1); - values.add(value); - } - } - - //自定义数据source,用于test - public static class MySource implements SourceFunction { - - private boolean isRunning = true; - - @Override - public void run(SourceContext ctx) throws Exception { - System.out.println("MySource begin"); - int count = 0; - while (isRunning) { - Random r = new Random(); - String randomIp = r.nextInt(255) + "." + r.nextInt(255) + "." + r.nextInt(255) + "." + r.nextInt(255); - ctx.collect(randomIp); - Thread.sleep(1000); - } - } - - @Override - public void cancel() { - isRunning = false; - } - } - - - - - - - - - - -} diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 7390f33..259ef0c 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -22,15 +22,15 @@ kafka.input.group.id=2112080949 kafka.output.metric.parallelism=1 #发送kafka metrics topic名 -kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS -#kafka.output.metric.topic.name=test +#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS +kafka.output.metric.topic.name=test #发送kafka event并行度大小 kafka.output.event.parallelism=1 #发送kafka event topic名 -kafka.output.event.topic.name=DOS-EVENT -#kafka.output.event.topic.name=storm-dos-test +#kafka.output.event.topic.name=DOS-EVENT +kafka.output.event.topic.name=abcd #kafka输出地址 kafka.output.bootstrap.servers=192.168.44.12:9094 @@ -38,6 +38,7 @@ kafka.output.bootstrap.servers=192.168.44.12:9094 #zookeeper地址 hbase.zookeeper.quorum=192.168.44.12:2181 +#hbase.zookeeper.quorum=192.168.40.151:2181,192.168.40.152:2181,192.168.40.203:2181 #hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181 #hbase客户端处理时间 @@ -129,10 +130,68 @@ sasl.jaas.config.password=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ sasl.jaas.config.flag=1 #nacos配置 +#nacos.server.addr=192.168.44.12:8848 +#nacos.namespace=public +#nacos.username=nacos +#nacos.password=nacos +#nacos.data.id=knowledge_base.json +#nacos.group=DEFAULT_GROUP +#nacos.read.timeout=5000 + + + +############################## Nacos 配置 ###################################### nacos.server.addr=192.168.44.12:8848 -nacos.namespace=public nacos.username=nacos nacos.password=nacos +nacos.read.timeout=5000 +############################## Nacos ---知识库配置 ###################################### +nacos.namespace=public nacos.data.id=knowledge_base.json nacos.group=DEFAULT_GROUP -nacos.read.timeout=5000 \ No newline at end of file + + +############################## Nacos ---静态阈值配置 ###################################### +nacos.static.namespace=test +nacos.static.data.id=dos_detection.properties +nacos.static.group=Galaxy + +############################## HTTP 配置 ###################################### +#http请求相关参数 +#最大连接数 +#http.pool.max.connection=400 +# +##单路由最大连接数 +#http.pool.max.per.route=80 +# +##向服务端请求超时时间设置(单位:毫秒) +#http.pool.request.timeout=60000 +# +##向服务端连接超时时间设置(单位:毫秒) +#http.pool.connect.timeout=60000 +# +##服务端响应超时时间设置(单位:毫秒) +#http.pool.response.timeout=60000 + + +#server.uri=http://192.168.44.12:9098 +#server.path=/hos/knowledge_base_hos_bucket + + + + +############################## hos Token 配置 ###################################### +hos.token=c21f969b5f03d33d43e04f8f136e7682 + +############################# 选择集群模式或者单机模式 配置 ###################################### +cluster.or.single=CLUSTER +#cluster.or.single=SINGLE + +############################## 集群模式配置文件路径 配置 ###################################### +hdfs.path=/test/TEST/ +hdfs.uri.nn1=hdfs://192.168.40.151:9000 +hdfs.uri.nn2=hdfs://192.168.40.152:9000 +hdfs.user=dos + +############################## 单机模式配置文件下载路径 配置 ###################################### +download.path=D:\\ttt\\ \ No newline at end of file diff --git a/src/main/resources/core-site.xml b/src/main/resources/core-site.xml new file mode 100644 index 0000000..c103340 --- /dev/null +++ b/src/main/resources/core-site.xml @@ -0,0 +1,58 @@ + + + + + + + + + fs.defaultFS + hdfs://ns1 + + + hadoop.tmp.dir + file:/home/tsg/olap/hadoop/tmp + + + io.file.buffer.size + 131702 + + + hadoop.proxyuser.root.hosts + * + + + hadoop.proxyuser.root.groups + * + + + hadoop.logfile.size + 10000000 + The max size of each log file + + + hadoop.logfile.count + 1 + The max number of log files + + + ha.zookeeper.quorum + 192.168.40.151:2181,192.168.40.152:2181,192.168.40.203:2181 + + + ipc.client.connect.timeout + 90000 + + diff --git a/src/main/resources/hdfs-site.xml b/src/main/resources/hdfs-site.xml new file mode 100644 index 0000000..e1408d2 --- /dev/null +++ b/src/main/resources/hdfs-site.xml @@ -0,0 +1,142 @@ + + + + + + + + + dfs.namenode.name.dir + file:/home/tsg/olap/hadoop/dfs/name + + + dfs.datanode.data.dir + file:/home/tsg/olap/hadoop/dfs/data + + + dfs.replication + 2 + + + dfs.webhdfs.enabled + true + + + dfs.permissions + false + + + dfs.permissions.enabled + false + + + dfs.nameservices + ns1 + + + dfs.blocksize + 134217728 + + + dfs.ha.namenodes.ns1 + nn1,nn2 + + + + dfs.namenode.rpc-address.ns1.nn1 + 192.168.40.151:9000 + + + + dfs.namenode.http-address.ns1.nn1 + 192.168.40.151:50070 + + + + dfs.namenode.rpc-address.ns1.nn2 + 192.168.40.152:9000 + + + + dfs.namenode.http-address.ns1.nn2 + 192.168.40.152:50070 + + + + dfs.namenode.shared.edits.dir + qjournal://192.168.40.151:8485;192.168.40.152:8485;192.168.40.203:8485/ns1 + + + + dfs.journalnode.edits.dir + /home/tsg/olap/hadoop/journal + + + + dfs.client.failover.proxy.provider.ns1 + org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider + + + + dfs.ha.fencing.methods + sshfence + shell(true) + + + + dfs.ha.fencing.ssh.private-key-files + /root/.ssh/id_rsa + + + + dfs.ha.fencing.ssh.connect-timeout + 30000 + + + + dfs.ha.automatic-failover.enabled + true + + + dfs.datanode.max.transfer.threads + 8192 + + + + dfs.namenode.handler.count + 30 + + + + dfs.datanode.handler.count + 40 + + + + dfs.balance.bandwidthPerSec + 104857600 + + + + dfs.datanode.du.reserved + 5368709120 + + + + heartbeat.recheck.interval + 100000 + + + diff --git a/src/main/resources/yarn-site.xml b/src/main/resources/yarn-site.xml new file mode 100644 index 0000000..8a4b2fa --- /dev/null +++ b/src/main/resources/yarn-site.xml @@ -0,0 +1,196 @@ + + + + + yarn.nodemanager.aux-services + mapreduce_shuffle + + + yarn.resourcemanager.ha.enabled + true + + + + yarn.resourcemanager.cluster-id + rmcluster + + + yarn.resourcemanager.ha.rm-ids + rsm1,rsm2 + + + + yarn.resourcemanager.hostname.rsm1 + 192.168.40.152 + + + + yarn.resourcemanager.address.rsm1 + 192.168.40.152:9916 + + + yarn.resourcemanager.scheduler.address.rsm1 + 192.168.40.152:9917 + + + yarn.resourcemanager.webapp.address.rsm1 + 192.168.40.152:9918 + + + yarn.resourcemanager.admin.address.rsm1 + 192.168.40.152:9919 + + + yarn.resourcemanager.resource-tracker.address.rsm1 + 192.168.40.152:9920 + + + yarn.resourcemanager.ha.admin.address.rsm1 + 192.168.40.152:23142 + + + + + yarn.resourcemanager.hostname.rsm2 + 192.168.40.203 + + + + yarn.resourcemanager.address.rsm2 + 192.168.40.203:9916 + + + yarn.resourcemanager.scheduler.address.rsm2 + 192.168.40.203:9917 + + + yarn.resourcemanager.webapp.address.rsm2 + 192.168.40.203:9918 + + + yarn.resourcemanager.admin.address.rsm2 + 192.168.40.203:9919 + + + yarn.resourcemanager.resource-tracker.address.rsm2 + 192.168.40.203:9920 + + + yarn.resourcemanager.ha.admin.address.rsm2 + 192.168.40.203:23142 + + + + yarn.resourcemanager.zk-address + 192.168.40.151:2181,192.168.40.152:2181,192.168.40.203:2181 + + + + yarn.resourcemanager.recovery.enabled + true + + + + yarn.nodemanager.recovery.enabled + true + + + + yarn.nodemanager.recovery.dir + /home/tsg/olap/hadoop-2.7.1/yarn + + + + yarn.nodemanager.recovery.supervised + true + + + + yarn.nodemanager.address + ${yarn.nodemanager.hostname}:9923 + + + yarn.resourcemanager.store.class + org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore + + + yarn.nodemanager.resource.memory-mb + 30720 + + + yarn.scheduler.minimum-allocation-mb + 1024 + + + + yarn.scheduler.maximum-allocation-mb + 30720 + + + + yarn.log-aggregation-enable + true + + + yarn.nodemanager.heartbeat-interval-ms + 3000 + + + + yarn.log-aggregation.retain-seconds + 604800 + + + yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds + 3600 + + + yarn.nodemanager.remote-app-log-dir + /tmp/logs + + + yarn.nodemanager.resource.cpu-vcores + 14 + + + yarn.scheduler.minimum-allocation-vcores + 1 + + + yarn.scheduler.maximum-allocation-vcores + 14 + + + yarn.nodemanager.vmem-check-enabled + false + + + yarn.nodemanager.pmem-check-enabled + false + + + yarn.nodemanager.disk-health-checker.enable + false + + + + yarn.resourcemanager.am.max-attempts + 10000 + + + yarn.log.server.url + http://bigdata-151:19888/jobhistory/logs + +