diff --git a/pom.xml b/pom.xml
index 7ee04ac..c55ca23 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,9 +6,14 @@
     <groupId>com.zdjizhi</groupId>
     <artifactId>flink-dos-detection</artifactId>
-    <version>1.0-SNAPSHOT</version>
+    <version>23.11</version>
+
+    <name>flink-dos-detection</name>
+    <url>http://www.example.com</url>
 
     <properties>
+        <galaxy.tools.version>1.2.1</galaxy.tools.version>
         <flink.version>1.13.1</flink.version>
         2.1.1
         2.7.1
@@ -152,7 +157,6 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-kafka_2.12</artifactId>
             <version>${flink.version}</version>
-
         </dependency>
 
@@ -160,7 +164,7 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-clients_2.12</artifactId>
             <version>${flink.version}</version>
-
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
@@ -203,7 +207,7 @@
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
             <version>2.2.3</version>
-
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <artifactId>slf4j-log4j12</artifactId>
@@ -258,26 +262,6 @@
             <version>5.3.3</version>
         </dependency>
 
-        <dependency>
-            <groupId>com.zdjizhi</groupId>
-            <artifactId>galaxy</artifactId>
-            <version>1.1.3</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>slf4j-log4j12</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>log4j-over-slf4j</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
         <dependency>
             <groupId>com.alibaba.fastjson2</groupId>
@@ -354,6 +338,12 @@
+        <dependency>
+            <groupId>com.geedgenetworks</groupId>
+            <artifactId>galaxy</artifactId>
+            <version>${galaxy.tools.version}</version>
+        </dependency>
+
diff --git a/src/main/java/com/zdjizhi/common/CustomFile.java b/src/main/java/com/zdjizhi/common/CustomFile.java
deleted file mode 100644
index 701024c..0000000
--- a/src/main/java/com/zdjizhi/common/CustomFile.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.zdjizhi.common;
-
-import java.io.Serializable;
-
-public class CustomFile implements Serializable {
-
-    String fileName;
-
-    byte[] content;
-
-    public String getFileName() {
-        return fileName;
-    }
-
-    public void setFileName(String fileName) {
-        this.fileName = fileName;
-    }
-
-    public byte[] getContent() {
-        return content;
-    }
-
-    public void setContent(byte[] content) {
-        this.content = content;
-    }
-}
diff --git a/src/main/java/com/zdjizhi/common/DosDetectionThreshold.java b/src/main/java/com/zdjizhi/common/DosDetectionThreshold.java
index 401673a..dc0825b 100644
--- a/src/main/java/com/zdjizhi/common/DosDetectionThreshold.java
+++ b/src/main/java/com/zdjizhi/common/DosDetectionThreshold.java
@@ -9,110 +9,110 @@ import java.util.Objects;
  * @author wlh
  */
 public class DosDetectionThreshold implements Serializable {
-    private long profileId;
-    private String attackType;
-    private ArrayList serverIpList;
-    private String serverIpAddr;
-    private long packetsPerSec;
-    private long bitsPerSec;
-    private long sessionsPerSec;
-    private int isValid;
-    private int vsysId;
-    private Integer[] superiorIds;
+    private long profile_id;
+    private String attack_type;
+    private ArrayList server_ip_list;
+    private String server_ip_addr;
+    private long packets_per_sec;
+    private long bits_per_sec;
+    private long sessions_per_sec;
+    private int is_valid;
+    private int vsys_id;
+    private Integer[] superior_ids;
 
-    public long getProfileId() {
-        return profileId;
+    public long getProfile_id() {
+        return profile_id;
     }
 
-    public void setProfileId(long profileId) {
-        this.profileId = profileId;
+    public void setProfile_id(long profile_id) {
+        this.profile_id = profile_id;
     }
 
-    public String getAttackType() {
-        return attackType;
+    public String getAttack_type() {
+        return attack_type;
     }
 
-    public void setAttackType(String attackType) {
-        this.attackType = attackType;
+    public void setAttack_type(String attack_type) {
+        this.attack_type = attack_type;
     }
 
-    public ArrayList getServerIpList() {
-        return serverIpList;
+    public ArrayList getServer_ip_list() {
+        return server_ip_list;
     }
 
-    public void setServerIpList(ArrayList serverIpList) {
-        this.serverIpList = serverIpList;
+    public void setServer_ip_list(ArrayList server_ip_list) {
+        this.server_ip_list = server_ip_list;
     }
 
-    public String getServerIpAddr() {
-        return serverIpAddr;
+    public String getServer_ip_addr() {
+        return server_ip_addr;
     }
 
-    public void setServerIpAddr(String serverIpAddr) {
-        this.serverIpAddr = serverIpAddr;
+    public void setServer_ip_addr(String server_ip_addr) {
+        this.server_ip_addr = server_ip_addr;
     }
 
-    public long getPacketsPerSec() {
-        return packetsPerSec;
+    public long getPackets_per_sec() {
+        return packets_per_sec;
     }
 
-    public void setPacketsPerSec(long packetsPerSec) {
-        this.packetsPerSec = packetsPerSec;
+    public void setPackets_per_sec(long packets_per_sec) {
+        this.packets_per_sec = packets_per_sec;
     }
 
-    public long getBitsPerSec() {
-        return bitsPerSec;
+    public long getBits_per_sec() {
+        return bits_per_sec;
     }
 
-    public void setBitsPerSec(long bitsPerSec) {
-        this.bitsPerSec = bitsPerSec;
+    public void setBits_per_sec(long bits_per_sec) {
+        this.bits_per_sec = bits_per_sec;
     }
 
-    public long getSessionsPerSec() {
-        return sessionsPerSec;
+    public long getSessions_per_sec() {
+        return sessions_per_sec;
     }
 
-    public void setSessionsPerSec(long sessionsPerSec) {
-        this.sessionsPerSec = sessionsPerSec;
+    public void setSessions_per_sec(long sessions_per_sec) {
+        this.sessions_per_sec = sessions_per_sec;
     }
 
-    public int getIsValid() {
-        return isValid;
+    public int getIs_valid() {
+        return is_valid;
     }
 
-    public void setIsValid(int isValid) {
-        this.isValid = isValid;
+    public void setIs_valid(int is_valid) {
+        this.is_valid = is_valid;
     }
 
-    public int getVsysId() {
-        return vsysId;
+    public int getVsys_id() {
+        return vsys_id;
     }
 
-    public void setVsysId(int vsysId) {
-        this.vsysId = vsysId;
+    public void setVsys_id(int vsys_id) {
+        this.vsys_id = vsys_id;
     }
 
-    public Integer[] getSuperiorIds() {
-        return superiorIds;
+    public Integer[] getSuperior_ids() {
+        return superior_ids;
     }
 
-    public void setSuperiorIds(Integer[] superiorIds) {
-        this.superiorIds = superiorIds;
+    public void setSuperior_ids(Integer[] superior_ids) {
+        this.superior_ids = superior_ids;
     }
 
     @Override
     public String toString() {
         return "DosDetectionThreshold{" +
-                "profileId='" + profileId + '\'' +
-                ", attackType='" + attackType + '\'' +
-                ", serverIpList=" + serverIpList +
-                ", serverIpAddr='" + serverIpAddr + '\'' +
-                ", packetsPerSec=" + packetsPerSec +
-                ", bitsPerSec=" + bitsPerSec +
-                ", sessionsPerSec=" + sessionsPerSec +
-                ", isValid=" + isValid +
-                ", vsysId=" + vsysId +
-                ", superiorIds=" + Arrays.toString(superiorIds) +
+                "profile_id=" + profile_id +
+                ", attack_type='" + attack_type + '\'' +
+                ", server_ip_list=" + server_ip_list +
+                ", server_ip_addr='" + server_ip_addr + '\'' +
+                ", packets_per_sec=" + packets_per_sec +
+                ", bits_per_sec=" + bits_per_sec +
+                ", sessions_per_sec=" + sessions_per_sec +
+                ", is_valid=" + is_valid +
+                ", vsys_id=" + vsys_id +
+                ", superior_ids=" + Arrays.toString(superior_ids) +
                 '}';
     }
}
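Reviewer note on the rename above: fastjson2 derives JSON property names from accessor names, so switching to `getProfile_id()`-style getters is what lets the snake_case keys of the bifang policy payload (`profile_id`, `attack_type`, `is_valid`, ...) bind directly onto this POJO. A minimal sketch under that assumption, with a hypothetical payload shaped like the API's `list` field:

```java
import com.alibaba.fastjson2.JSON;
import java.util.List;

// Hedged sketch, not part of the patch: with the renamed accessors,
// getProfile_id()/setProfile_id() map to the JSON property "profile_id",
// so the snake_case payload binds without any smart-match conversion.
public class SnakeCaseBindingSketch {
    public static void main(String[] args) {
        // hypothetical payload, shaped like the bifang "list" field
        String payload = "[{\"profile_id\":7,\"attack_type\":\"syn_flood\",\"is_valid\":1,\"vsys_id\":1}]";
        List<DosDetectionThreshold> thresholds =
                JSON.parseArray(payload, DosDetectionThreshold.class);
        System.out.println(thresholds.get(0).getProfile_id()); // 7
    }
}
```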
+ + ", superior_ids=" + Arrays.toString(superior_ids) + '}'; } } diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java similarity index 72% rename from src/main/java/com/zdjizhi/common/CommonConfig.java rename to src/main/java/com/zdjizhi/common/FlowWriteConfig.java index 9bb50c6..a93496f 100644 --- a/src/main/java/com/zdjizhi/common/CommonConfig.java +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -7,12 +7,11 @@ import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; * @author wlh * @date 2021/1/6 */ -public class CommonConfig { +public class FlowWriteConfig { /** * 定位库默认分隔符 */ - public static final String LOCATION_SEPARATOR = "."; private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); @@ -51,7 +50,6 @@ public class CommonConfig { public static final int DESTINATION_IP_PARTITION_NUM = CommonConfigurations.getIntProperty("destination.ip.partition.num"); public static final int DATA_CENTER_ID_NUM = CommonConfigurations.getIntProperty("data.center.id.num"); - public static final String IP_MMDB_PATH = CommonConfigurations.getStringProperty("ip.mmdb.path"); public static final String BIFANG_SERVER_URI = CommonConfigurations.getStringProperty("bifang.server.uri"); public static final String BIFANG_SERVER_ENCRYPTPWD_PATH = CommonConfigurations.getStringProperty("bifang.server.encryptpwd.path"); @@ -74,37 +72,20 @@ public class CommonConfig { public static final int SASL_JAAS_CONFIG_FLAG = CommonConfigurations.getIntProperty("sasl.jaas.config.flag"); - public static final String NACOS_SERVER_ADDR = CommonConfigurations.getStringProperty("nacos.server.addr"); + public static final String NACOS_SERVER = CommonConfigurations.getStringProperty("nacos.server.addr"); public static final String NACOS_USERNAME = CommonConfigurations.getStringProperty("nacos.username"); - public static final String NACOS_PASSWORD = CommonConfigurations.getStringProperty("nacos.password"); - public static final String NACOS_NAMESPACE = CommonConfigurations.getStringProperty("nacos.namespace"); - public static final String NACOS_DATA_ID = CommonConfigurations.getStringProperty("nacos.data.id"); - public static final String NACOS_GROUP = CommonConfigurations.getStringProperty("nacos.group"); - public static final int NACOS_READ_TIMEOUT = CommonConfigurations.getIntProperty("nacos.read.timeout"); + public static final String NACOS_PIN = CommonConfigurations.getStringProperty("nacos.password"); + public static final String NACOS_PUBLIC_NAMESPACE = CommonConfigurations.getStringProperty("nacos.namespace"); + public static final String NACOS_KNOWLEDGEBASE_DATA_ID = CommonConfigurations.getStringProperty("nacos.data.id"); + public static final String NACOS_PUBLIC_GROUP = CommonConfigurations.getStringProperty("nacos.group"); + public static final Integer NACOS_CONNECTION_TIMEOUT = CommonConfigurations.getIntProperty("nacos.connection.timeout"); - public static final String HOS_TOKEN = CommonConfigurations.getStringProperty("hos.token"); - public static final String CLUSTER_OR_SINGLE = CommonConfigurations.getStringProperty("cluster.or.single"); + public static final String NACOS_DOS_NAMESPACE = CommonConfigurations.getStringProperty("nacos.dos.namespace"); + public static final String NACOS_DOS_DATA_ID = CommonConfigurations.getStringProperty("nacos.dos.data.id"); + public static final String NACOS_DOS_GROUP = CommonConfigurations.getStringProperty("nacos.dos.group"); - public static final String HDFS_URI_NS1 = 
CommonConfigurations.getStringProperty("hdfs.uri.nn1"); - public static final String HDFS_URI_NS2 = CommonConfigurations.getStringProperty("hdfs.uri.nn2"); - public static final String HDFS_PATH = CommonConfigurations.getStringProperty("hdfs.path"); - public static final String HDFS_USER = CommonConfigurations.getStringProperty("hdfs.user"); + public static final Integer HTTP_SOCKET_TIMEOUT = CommonConfigurations.getIntProperty("http.socket.timeout"); - public static final String DOWNLOAD_PATH = CommonConfigurations.getStringProperty("download.path"); - - public static void main(String[] args) { - StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); - // 配置加密解密的密码/salt值 - encryptor.setPassword("galaxy"); - // 对"raw_password"进行加密:S5kR+Y7CI8k7MaecZpde25yK8NKUnd6p -// String password = "galaxy2019"; - String password = "nacos"; - String encPwd = encryptor.encrypt(password); - System.out.println(encPwd); - // 再进行解密:raw_password - String rawPwd = encryptor.decrypt(encPwd); - System.out.println(rawPwd); - } } diff --git a/src/main/java/com/zdjizhi/common/KnowledgeLog.java b/src/main/java/com/zdjizhi/common/pojo/KnowlegeBaseMeta.java similarity index 52% rename from src/main/java/com/zdjizhi/common/KnowledgeLog.java rename to src/main/java/com/zdjizhi/common/pojo/KnowlegeBaseMeta.java index d72f7df..1701367 100644 --- a/src/main/java/com/zdjizhi/common/KnowledgeLog.java +++ b/src/main/java/com/zdjizhi/common/pojo/KnowlegeBaseMeta.java @@ -1,14 +1,24 @@ -package com.zdjizhi.common; +package com.zdjizhi.common.pojo; -public class KnowledgeLog { - public String id; - public String name; - public String path; - public Long size; - public String format; - public String sha256; - public String version; - public String updateTime; +import java.io.Serializable; + +/** + * + */ +public class KnowlegeBaseMeta implements Serializable { + private String id; + private String name; + private String sha256; + private String format; + private String path; + + public KnowlegeBaseMeta(String id, String name, String sha256, String format, String path) { + this.id = id; + this.name = name; + this.sha256 = sha256; + this.format = format; + this.path = path; + } public String getId() { return id; @@ -26,20 +36,12 @@ public class KnowledgeLog { this.name = name; } - public String getPath() { - return path; + public String getSha256() { + return sha256; } - public void setPath(String path) { - this.path = path; - } - - public Long getSize() { - return size; - } - - public void setSize(Long size) { - this.size = size; + public void setSha256(String sha256) { + this.sha256 = sha256; } public String getFormat() { @@ -50,42 +52,23 @@ public class KnowledgeLog { this.format = format; } - public String getSha256() { - return sha256; + public String getPath() { + return path; } - public void setSha256(String sha256) { - this.sha256 = sha256; + public void setPath(String path) { + this.path = path; } - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - public String getUpdateTime() { - return updateTime; - } - - public void setUpdateTime(String updateTime) { - this.updateTime = updateTime; - } - - @Override public String toString() { - return "KnowledgeLog{" + + return "KnowlegeBaseMeta{" + "id='" + id + '\'' + ", name='" + name + '\'' + - ", path='" + path + '\'' + - ", size=" + size + - ", format='" + format + '\'' + ", sha256='" + sha256 + '\'' + - ", version='" + version + '\'' + - ", updateTime='" + updateTime + '\'' + + ", 
format='" + format + '\'' + + ", path='" + path + '\'' + '}'; } } + diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index c8d68d1..87cb76a 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -1,17 +1,19 @@ package com.zdjizhi.etl; -import cn.hutool.core.math.MathUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.geedgenetworks.utils.DateUtils; +import com.geedgenetworks.utils.StringUtil; import com.zdjizhi.common.*; import com.zdjizhi.utils.*; +import com.zdjizhi.utils.connections.nacos.NacosUtils; import inet.ipaddr.IPAddress; import inet.ipaddr.IPAddressString; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap; -import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import java.math.BigDecimal; @@ -24,12 +26,12 @@ import java.util.concurrent.TimeUnit; /** * @author wlh */ -public class DosDetection extends BroadcastProcessFunction, DosEventLog> { +public class DosDetection extends ProcessFunction { private static final Log logger = LogFactory.get(); private static Map> baselineMap = new HashMap<>(); private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance(); - private HashMap>> thresholdRangeMap; + private HashMap>> thresholdRangeMap; private final static int BASELINE_SIZE = 144; private final static int STATIC_CONDITION_TYPE = 1; @@ -42,22 +44,20 @@ public class DosDetection extends BroadcastProcessFunction thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(), 0, - CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES); + FlowWriteConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES); executorService.scheduleAtFixedRate(() -> baselineMap = ParseBaselineThreshold.readFromHbase(), 0, - CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS); + FlowWriteConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS); } catch (Exception e) { logger.error("定时器任务执行失败", e); } @@ -65,7 +65,7 @@ public class DosDetection extends BroadcastProcessFunction out) { + public void processElement(DosSketchLog value, Context ctx, Collector out) throws Exception { DosEventLog finalResult = null; try { String destinationIp = value.getDestination_ip(); @@ -75,13 +75,13 @@ public class DosDetection extends BroadcastProcessFunction value, Context ctx, Collector out) throws Exception { - if (!value.isEmpty()){ - IpUtils.updateIpLook(value); - } - } private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) { long sketchSessions = value.getSketch_sessions(); Integer staticSensitivityThreshold = NacosUtils.getIntProperty("static.sensitivity.threshold"); long diff = sketchSessions - staticSensitivityThreshold; - return getDosEventLog(value, staticSensitivityThreshold, diff,0, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG); + return getDosEventLog(value, staticSensitivityThreshold, diff, 0, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG); } - private DosEventLog getDosEventLogByBaseline(DosSketchLog value,String key) { + private DosEventLog getDosEventLogByBaseline(DosSketchLog value, String key) { String attackType = value.getAttack_type(); long sketchSessions = value.getSketch_sessions(); 
diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java
index c8d68d1..87cb76a 100644
--- a/src/main/java/com/zdjizhi/etl/DosDetection.java
+++ b/src/main/java/com/zdjizhi/etl/DosDetection.java
@@ -1,17 +1,19 @@
 package com.zdjizhi.etl;
 
-import cn.hutool.core.math.MathUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
+import com.geedgenetworks.utils.DateUtils;
+import com.geedgenetworks.utils.StringUtil;
 import com.zdjizhi.common.*;
 import com.zdjizhi.utils.*;
+import com.zdjizhi.utils.connections.nacos.NacosUtils;
 import inet.ipaddr.IPAddress;
 import inet.ipaddr.IPAddressString;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
-import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 
 import java.math.BigDecimal;
@@ -24,12 +26,12 @@ import java.util.concurrent.TimeUnit;
 /**
  * @author wlh
 */
-public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<String, String>, DosEventLog> {
+public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
     private static final Log logger = LogFactory.get();
     private static Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
     private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
-    private HashMap>> thresholdRangeMap;
+    private HashMap>> thresholdRangeMap;
 
     private final static int BASELINE_SIZE = 144;
     private final static int STATIC_CONDITION_TYPE = 1;
@@ -42,22 +44,20 @@ public class DosDetection
             executorService.scheduleAtFixedRate(() -> thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(), 0,
-                    CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES);
+                    FlowWriteConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES);
             executorService.scheduleAtFixedRate(() -> baselineMap = ParseBaselineThreshold.readFromHbase(), 0,
-                    CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);
+                    FlowWriteConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);
         } catch (Exception e) {
             logger.error("定时器任务执行失败", e);
         }
@@ -65,7 +65,7 @@ public class DosDetection
 
     @Override
-    public void processElement(DosSketchLog value, Context ctx, Collector<DosEventLog> out) {
+    public void processElement(DosSketchLog value, Context ctx, Collector<DosEventLog> out) throws Exception {
         DosEventLog finalResult = null;
         try {
             String destinationIp = value.getDestination_ip();
@@ -75,13 +75,13 @@ public class DosDetection
         }
     }
 
-    @Override
-    public void processBroadcastElement(Map<String, String> value, Context ctx, Collector<DosEventLog> out) throws Exception {
-        if (!value.isEmpty()){
-            IpUtils.updateIpLook(value);
-        }
-    }
 
     private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) {
         long sketchSessions = value.getSketch_sessions();
         Integer staticSensitivityThreshold = NacosUtils.getIntProperty("static.sensitivity.threshold");
         long diff = sketchSessions - staticSensitivityThreshold;
-        return getDosEventLog(value, staticSensitivityThreshold, diff,0, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
+        return getDosEventLog(value, staticSensitivityThreshold, diff, 0, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
     }
 
-    private DosEventLog getDosEventLogByBaseline(DosSketchLog value,String key) {
+    private DosEventLog getDosEventLogByBaseline(DosSketchLog value, String key) {
         String attackType = value.getAttack_type();
         long sketchSessions = value.getSketch_sessions();
         DosBaselineThreshold dosBaselineThreshold = baselineMap.get(key).get(attackType);
         Integer base = getBaseValue(dosBaselineThreshold, value);
         long diff = sketchSessions - base;
-        return getDosEventLog(value, base, diff, 0,BASELINE_CONDITION_TYPE, SESSIONS_TAG);
+        return getDosEventLog(value, base, diff, 0, BASELINE_CONDITION_TYPE, SESSIONS_TAG);
     }
 
     private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) throws CloneNotSupportedException {
-        long sessionBase = threshold.getSessionsPerSec();
-        long pktBase=threshold.getPacketsPerSec();
-        long bitBase=threshold.getBitsPerSec();
+        long sessionBase = threshold.getSessions_per_sec();
+        long pktBase = threshold.getPackets_per_sec();
+        long bitBase = threshold.getBits_per_sec();
 
         long diffSession = value.getSketch_sessions() - sessionBase;
         long diffPkt = value.getSketch_packets() - pktBase;
         long diffByte = value.getSketch_bytes() - bitBase;
 
-//        Double diffSessionPercent = getDiffPercent(diffSession, sessionBase)*100;
-//        Double diffPktPercent = getDiffPercent(diffPkt, pktBase)*100;
-//        Double diffBitPercent = getDiffPercent(diffByte, bitBase)*100;
-
-        double diffSessionPercent=0.0;
-        double diffPktPercent=0.0;
-        double diffBitPercent=0.0;
-        if (sessionBase != 0 && sessionBase > 0){
-            diffSessionPercent = getDiffPercent(diffSession, sessionBase)*100;
+        double diffSessionPercent = 0.0;
+        double diffPktPercent = 0.0;
+        double diffBitPercent = 0.0;
+        //todo 代码Review发现该部分存在bug,23.11版本做修复,需测试。
+        if (sessionBase > 0) {
+            diffSessionPercent = getDiffPercent(diffSession, sessionBase) * 100;
         }
-        else if (pktBase != 0 && pktBase > 0){
-            diffPktPercent = getDiffPercent(diffPkt, pktBase)*100;
+        if (pktBase > 0) {
+            diffPktPercent = getDiffPercent(diffPkt, pktBase) * 100;
         }
-        else if (bitBase != 0 && bitBase > 0){
-            diffBitPercent = getDiffPercent(diffByte, bitBase)*100;
+        if (bitBase > 0) {
+            diffBitPercent = getDiffPercent(diffByte, bitBase) * 100;
         }
 
         long profileId = 0;
-        DosEventLog result =null;
+        DosEventLog result = null;
 
-        if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent){
-            profileId = threshold.getProfileId();
-            result= getDosEventLog(value, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
-        }else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent){
-            profileId = threshold.getProfileId();
-            result = getDosEventLog(value, pktBase, diffPkt,profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
-        }else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent){
-            profileId = threshold.getProfileId();
+        if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent) {
+            profileId = threshold.getProfile_id();
+            result = getDosEventLog(value, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
+        } else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent) {
+            profileId = threshold.getProfile_id();
+            result = getDosEventLog(value, pktBase, diffPkt, profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
+        } else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent) {
+            profileId = threshold.getProfile_id();
             result = getDosEventLog(value, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
         }
         return result;
@@ -175,15 +166,15 @@ public class DosDetection
                 return tag + " > " + base + " " +
-                        tag + "/s" + "(>"+condition+"%)";
+                        tag + "/s" + "(>" + condition + "%)";
             case BASELINE_CONDITION_TYPE:
                 return tag + " > " + percent + " of baseline";
@@ -276,8 +267,8 @@ public class DosDetection
         HashSet<String> countrySet = new HashSet<>();
         for (String ip : ipArr) {
-            String country = IpUtils.ipLookup.countryLookup(ip);
-            if (StringUtil.isNotBlank(country)){
+            String country = IpLookupUtils.getCountryLookup(ip);
+            if (StringUtil.isNotBlank(country)) {
                 countrySet.add(country);
             }
         }
@@ -304,19 +295,13 @@ public class DosDetection
                 endTime = newSketchLog.getSketch_start_time() > endTime ? newSketchLog.getSketch_start_time() : endTime;
                 duration = endTime - startTime == 0 ? 5 : endTime - startTime;
             }else {
-                if (sourceIpSet.size() < CommonConfig.SOURCE_IP_LIST_LIMIT){
+                if (sourceIpSet.size() < FlowWriteConfig.SOURCE_IP_LIST_LIMIT){
                     sourceIpSet.add(sourceIp);
                 }
             }
         }
         String sourceIpList = StringUtils.join(sourceIpSet, ",");
-        return Tuple6.of(sessions/CommonConfig.FLINK_WINDOW_MAX_TIME,packets/CommonConfig.FLINK_WINDOW_MAX_TIME,
-                bytes*8/CommonConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration);
+        return Tuple6.of(sessions/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME,packets/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME,
+                bytes*8/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration);
     }catch (Exception e){
         logger.error("聚合中间结果集失败 {}",e);
     }
diff --git a/src/main/java/com/zdjizhi/etl/ParseBaselineThreshold.java b/src/main/java/com/zdjizhi/etl/ParseBaselineThreshold.java
index dc37bdf..8d5b9ca 100644
--- a/src/main/java/com/zdjizhi/etl/ParseBaselineThreshold.java
+++ b/src/main/java/com/zdjizhi/etl/ParseBaselineThreshold.java
@@ -2,9 +2,9 @@ package com.zdjizhi.etl;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.CommonConfig;
+import com.geedgenetworks.utils.DateUtils;
+import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.common.DosBaselineThreshold;
-import com.zdjizhi.utils.DateUtils;
 import com.zdjizhi.utils.HbaseUtils;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -17,7 +17,7 @@ import java.util.*;
 
 public class ParseBaselineThreshold {
 
-//    private static final Logger logger = LoggerFactory.getLogger(ParseBaselineThreshold.class);
+
     private static final Log logger = LogFactory.get();
 
     private static ArrayList floodTypeList = new ArrayList<>();
@@ -34,23 +34,23 @@ public class ParseBaselineThreshold {
 
     private static void prepareHbaseEnv() throws IOException {
         org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
-        config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
+        config.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM);
         config.set("hbase.client.retries.number", "3");
         config.set("hbase.bulkload.retries.number", "3");
         config.set("zookeeper.recovery.retry", "3");
         config.set("hbase.defaults.for.version", "2.2.3");
         config.set("hbase.defaults.for.version.skip", "true");
-        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
-        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
+        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, FlowWriteConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
+        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, FlowWriteConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
 
-        TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
+        TableName tableName = TableName.valueOf(FlowWriteConfig.HBASE_BASELINE_TABLE_NAME);
         Connection conn = ConnectionFactory.createConnection(config);
         table = conn.getTable(tableName);
 
         long currentTimeMillis = System.currentTimeMillis();
         scan = new Scan()
                 .setAllowPartialResults(true)
-                .setTimeRange(DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(CommonConfig.HBASE_BASELINE_TTL)).getTime(), currentTimeMillis)
-                .setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
+                .setTimeRange(DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(FlowWriteConfig.HBASE_BASELINE_TTL)).getTime(), currentTimeMillis)
+                .setLimit(FlowWriteConfig.HBASE_BASELINE_TOTAL_NUM);
 
         logger.info("连接hbase成功,正在读取baseline数据");
     }
 
@@ -84,29 +84,4 @@ public class ParseBaselineThreshold {
         }
         return baselineMap;
     }
-
-    public static void main(String[] args) {
-        long currentTimeMillis = System.currentTimeMillis();
-        long p200D = DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(CommonConfig.HBASE_BASELINE_TTL)).getTime();
-        System.out.println(p200D);
-        System.out.println(currentTimeMillis);
-        System.out.println(currentTimeMillis - p200D);
-
-
-        Map<String, Map<String, DosBaselineThreshold>> baselineMap = readFromHbase();
-        Set<String> keySet = baselineMap.keySet();
-        for (String key : keySet) {
-            Map<String, DosBaselineThreshold> stringTuple2Map = baselineMap.get(key);
-            Set<String> strings = stringTuple2Map.keySet();
-            for (String s:strings){
-                DosBaselineThreshold dosBaselineThreshold = stringTuple2Map.get(s);
-                System.out.println(key+"---"+s+"---"+dosBaselineThreshold);
-            }
-        }
-        System.out.println(baselineMap.size());
-    }
-
-
-
}
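For orientation, the `Scan` built in `prepareHbaseEnv` reads only cells written in the last `HBASE_BASELINE_TTL` days. A sketch of the equivalent time-range arithmetic, assuming `DateUtils.getSomeDate(date, n)` offsets a date by `n` days; the TTL value of 200 here is purely illustrative:

```java
import java.util.concurrent.TimeUnit;

// Hedged sketch of the scan window; ttlDays stands in for
// FlowWriteConfig.HBASE_BASELINE_TTL (the real value comes from config).
public class BaselineScanWindowSketch {
    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        int ttlDays = 200;                                // illustrative only
        long min = now - TimeUnit.DAYS.toMillis(ttlDays); // Math.negateExact(ttl) days back
        System.out.println("scan.setTimeRange(" + min + ", " + now + ")");
    }
}
```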
diff --git a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java
index 6731beb..5f31300 100644
--- a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java
+++ b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java
@@ -1,13 +1,11 @@
 package com.zdjizhi.etl;
 
 import com.alibaba.fastjson2.JSONObject;
-import com.fasterxml.jackson.databind.JavaType;
-import com.zdjizhi.common.CommonConfig;
+import com.geedgenetworks.utils.StringUtil;
+import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.common.DosSketchLog;
 import com.zdjizhi.source.DosSketchSource;
 import com.zdjizhi.utils.FlinkEnvironmentUtils;
-//import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.StringUtil;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -24,22 +22,19 @@ import java.util.*;
 public class ParseSketchLog {
 
     private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
-//    private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
-//    private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
-//    private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
 
-    public static SingleOutputStreamOperator getSketchSource(){
+    public static SingleOutputStreamOperator getSketchSource() {
         return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy());
     }
 
-    private static SingleOutputStreamOperator flatSketchSource(){
+    private static SingleOutputStreamOperator flatSketchSource() {
         return DosSketchSource.createDosSketchSource().flatMap(new FlatSketchLog());
     }
 
-    private static WatermarkStrategy createWatermarkStrategy(){
+    private static WatermarkStrategy createWatermarkStrategy() {
         return WatermarkStrategy
-                .forBoundedOutOfOrderness(Duration.ofSeconds(CommonConfig.FLINK_WATERMARK_MAX_ORDERNESS))
+                .forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
                 .withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000);
     }
 
@@ -47,17 +42,15 @@ public class ParseSketchLog {
         @Override
         public void flatMap(String s, Collector collector) {
             try {
-                if (StringUtil.isNotBlank(s)){
+                if (StringUtil.isNotBlank(s)) {
                     HashMap sketchSource = JSONObject.parseObject(s, HashMap.class);
-//                    HashMap sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
                     long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
                     long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
                     String attackType = sketchSource.get("attack_type").toString();
                     int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString());
                     String report_ip_list = JSONObject.toJSONString(sketchSource.get("report_ip_list"));
                     ArrayList<HashMap<String, Object>> reportIpList = JSONObject.parseObject(report_ip_list, ArrayList.class);
-//                    ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
-                for (HashMap obj : reportIpList) {
+                for (HashMap obj : reportIpList) {
                         DosSketchLog dosSketchLog = new DosSketchLog();
                         dosSketchLog.setSketch_start_time(sketchStartTime);
                         dosSketchLog.setSketch_duration(sketchDuration);
@@ -74,19 +67,12 @@ public class ParseSketchLog {
                         dosSketchLog.setSketch_packets(sketchPackets);
                         dosSketchLog.setSketch_bytes(sketchBytes);
                         collector.collect(dosSketchLog);
-                        logger.debug("数据解析成功:{}",dosSketchLog.toString());
+                        logger.debug("数据解析成功:{}", dosSketchLog.toString());
                     }
                 }
             } catch (Exception e) {
-                logger.error("数据解析错误:{} \n{}",s,e);
+                logger.error("数据解析错误:{} \n{}", s, e);
             }
         }
     }
-
-
-    public static void main(String[] args) throws Exception {
-        flatSketchSource().print();
-        FlinkEnvironmentUtils.streamExeEnv.execute();
-    }
-
}
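`createWatermarkStrategy` is what makes the event-time windows downstream fire: `sketch_start_time` arrives in epoch seconds, so it is scaled to the milliseconds Flink expects, and the bounded-out-of-orderness bound holds watermarks back so late sketches still land in their window. Restated with a literal bound in place of the configured `FLINK_WATERMARK_MAX_ORDERNESS`:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Hedged restatement of createWatermarkStrategy() above, with a literal
// 5-second bound standing in for the configured value.
public class SketchWatermarkSketch {
    static WatermarkStrategy<DosSketchLog> strategy() {
        return WatermarkStrategy
                .<DosSketchLog>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // sketch_start_time is epoch seconds; Flink expects milliseconds
                .withTimestampAssigner((event, previous) -> event.getSketch_start_time() * 1000);
    }
    // A tumbling event-time window [0s, 30s) downstream fires once the
    // watermark (max observed event time minus the 5s bound) passes 30s,
    // so sketches up to 5s out of order are still counted in their window.
}
```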
diff --git a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java
index 5fd4fa3..1edf2b4 100644
--- a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java
+++ b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java
@@ -5,16 +5,18 @@ import cn.hutool.log.LogFactory;
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONObject;
 //import com.fasterxml.jackson.databind.JavaType;
-import com.zdjizhi.common.CommonConfig;
+import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.common.DosDetectionThreshold;
 import com.zdjizhi.common.DosVsysId;
 import com.zdjizhi.utils.HttpClientUtils;
 //import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.NacosUtils;
+
+import com.zdjizhi.utils.connections.nacos.NacosUtils;
 import inet.ipaddr.IPAddress;
 import inet.ipaddr.IPAddressString;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
 import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
+
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.message.BasicHeader;
 
@@ -28,15 +30,9 @@ import java.util.Map;
  * @author wlh
 */
 public class ParseStaticThreshold {
-//    private static Logger logger = LoggerFactory.getLogger(ParseStaticThreshold.class);
     private static final Log logger = LogFactory.get();
 
     private static String encryptpwd;
-//    private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
-//    private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
-//    private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class);
-//    private static JavaType vsysIDType = jsonMapperInstance.createCollectionType(ArrayList.class, DosVsysId.class);
-
     static {
         //加载加密登录密码
         encryptpwd = getEncryptpwd();
@@ -48,20 +44,18 @@ public class ParseStaticThreshold {
     private static String getEncryptpwd() {
         String psw = HttpClientUtils.ERROR_MESSAGE;
         try {
-            URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
+            URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.BIFANG_SERVER_URI);
             HashMap parms = new HashMap<>();
             parms.put("password", "admin");
-            HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms);
+            HttpClientUtils.setUrlWithParams(uriBuilder, FlowWriteConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms);
             String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build());
             if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
                 HashMap resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
-//                HashMap resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
                 boolean success = (boolean) resposeMap.get("success");
                 String msg = resposeMap.get("msg").toString();
                 if (success) {
                     HashMap data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
-//                    HashMap data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
                     psw = data.get("encryptpwd").toString();
                 } else {
                     logger.error(msg);
@@ -75,40 +69,6 @@ public class ParseStaticThreshold {
         return psw;
     }
 
-    /**
-     * 登录bifang服务,获取token
-     *
-     * @return token
-     */
-    private static String loginBifangServer() {
-        String token = HttpClientUtils.ERROR_MESSAGE;
-        try {
-            if (!HttpClientUtils.ERROR_MESSAGE.equals(encryptpwd)) {
-                URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
-                HashMap parms = new HashMap<>();
-                parms.put("username", "admin");
-                parms.put("password", encryptpwd);
-                HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_LOGIN_PATH, parms);
-                String resposeJsonStr = HttpClientUtils.httpPost(uriBuilder.build(), null);
-                if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
-                    HashMap resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
-//                    HashMap resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
-                    boolean success = (boolean) resposeMap.get("success");
-                    String msg = resposeMap.get("msg").toString();
-                    if (success) {
-                        HashMap data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
-//                        HashMap data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
-                        token = data.get("token").toString();
-                    } else {
-                        logger.error(msg);
-                    }
-                }
-            }
-        } catch (Exception e) {
-            logger.error("登录失败,未获取到token ", e);
-        }
-        return token;
-    }
 
     /**
      * 获取vsysId配置列表
     *
     * @return vsysIdList
     */
@@ -118,12 +78,12 @@ public class ParseStaticThreshold {
     private static ArrayList getVsysId() {
         ArrayList vsysIdList = null;
         try {
-            URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
+            URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.BIFANG_SERVER_URI);
             HashMap parms = new HashMap<>();
-            parms.put("pageSize", -1);
+            parms.put("page_size", -1);
 //            parms.put("orderBy", "vsysId desc");
             parms.put("type", 1);
-            HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_VSYSID_PATH, parms);
+            HttpClientUtils.setUrlWithParams(uriBuilder, FlowWriteConfig.BIFANG_SERVER_POLICY_VSYSID_PATH, parms);
             String token = NacosUtils.getStringProperty("bifang.server.token");
             if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
                 BasicHeader authorization = new BasicHeader("Authorization", token);
@@ -131,21 +91,15 @@ public class ParseStaticThreshold {
                 String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
                 if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
                     HashMap resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
-//                    HashMap resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
                     boolean success = (boolean) resposeMap.get("success");
                     String msg = resposeMap.get("msg").toString();
                     if (success) {
                         HashMap data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
-//                        HashMap data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
-                        Object list = data.get("list");
+                        Object list = data.get("list");
                         if (list != null) {
-                            String s = JSONObject.toJSONString(list);
                             List dosVsysIds = JSON.parseArray(JSONObject.toJSONString(list), DosVsysId.class);
-//                            vsysIdList= JSONObject.parseObject(JSONObject.toJSONString(list), DosVsysId.class);
                             vsysIdList= (ArrayList)dosVsysIds;
-
-//                            vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType);
-                            logger.info("获取到vsysId {}条", vsysIdList.size());
+                            logger.info("获取到vsysId {}条", vsysIdList.size());
                         } else {
                             logger.warn("vsysIdList为空");
                         }
@@ -172,35 +126,37 @@ public class ParseStaticThreshold {
             if (vsysIds != null) {
                 for (DosVsysId dosVsysId : vsysIds) {
                     Integer vsysId = dosVsysId.getId() == null ? 1 : dosVsysId.getId();
-                    Integer[] superiorIds = dosVsysId.getSuperiorIds();
-                    URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
+                    Integer[] superiorIds = dosVsysId.getSuperior_ids();
+                    URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.BIFANG_SERVER_URI);
                     HashMap parms = new HashMap<>();
-                    parms.put("pageSize", -1);
-                    parms.put("orderBy", "profileId asc");
-                    parms.put("isValid", 1);
-                    parms.put("vsysId", vsysId);
-                    HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
+                    parms.put("page_size", -1);
+                    parms.put("order_by", "profileId asc");
+                    parms.put("is_valid", 1);
+                    parms.put("vsys_id", vsysId);
+                    HttpClientUtils.setUrlWithParams(uriBuilder, FlowWriteConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
                     String token = NacosUtils.getStringProperty("bifang.server.token");
                     if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
                         BasicHeader authorization = new BasicHeader("Authorization", token);
                         BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
                         String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
                         if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
-//                            HashMap resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
+
                             HashMap resposeMap = JSONObject.parseObject(resposeJsonStr,HashMap.class);
+
+
                             boolean success = (boolean) resposeMap.get("success");
                             String msg = resposeMap.get("msg").toString();
                             if (success) {
-//                                HashMap data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
-                                HashMap data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
+                                HashMap data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
                                 Object list = data.get("list");
+
                                 if (list != null) {
-//                                    ArrayList thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
-//                                    ArrayList thresholds = JSONObject.parseObject(JSONObject.toJSONString(list), ArrayList.class);
-                                    List dosDetectionThresholds = JSON.parseArray(JSONObject.toJSONString(list), DosDetectionThreshold.class);
+                                    System.out.println(list);
+                                    List dosDetectionThresholds = JSON.parseArray(JSONObject.toJSONString(list), DosDetectionThreshold.class);
+                                    System.out.println(dosDetectionThresholds);
                                     ArrayList thresholds = (ArrayList)dosDetectionThresholds;
                                     for (DosDetectionThreshold dosDetectionThreshold : thresholds) {
-                                        dosDetectionThreshold.setSuperiorIds(superiorIds);
+                                        dosDetectionThreshold.setSuperior_ids(superiorIds);
                                         vsysThresholds.add(dosDetectionThreshold);
                                     }
                                     logger.info("获取到vsys id是{}静态阈值配置{}条", vsysId, thresholds.size());
@@ -217,7 +173,6 @@ public class ParseStaticThreshold {
         } catch (Exception e) {
             logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e);
         }
-
         return vsysThresholds;
     }
 
@@ -230,14 +185,19 @@ public class ParseStaticThreshold {
         HashMap>> thresholdRangeMap = new HashMap<>(4);
         try {
             ArrayList dosDetectionThreshold = getDosDetectionThreshold();
+
             if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) {
                 for (DosDetectionThreshold threshold : dosDetectionThreshold) {
-                    String attackType = threshold.getAttackType();
-                    int vsysId = threshold.getVsysId();
+
+                    String attackType = threshold.getAttack_type();
+                    int vsysId = threshold.getVsys_id();
                     HashMap> rangeMap = thresholdRangeMap.getOrDefault(vsysId, new HashMap<>());
                     TreeRangeMap treeRangeMap = rangeMap.getOrDefault(attackType, TreeRangeMap.create());
-                    ArrayList serverIpList = threshold.getServerIpList();
+
+
+                    ArrayList serverIpList = threshold.getServer_ip_list();
+
                     for (String sip : serverIpList) {
                         IPAddressString ipAddressString = new IPAddressString(sip);
                         if (ipAddressString.isIPAddress()) {
@@ -279,33 +239,38 @@ public class ParseStaticThreshold {
         return thresholdRangeMap;
     }
 
-    public static void main(String[] args) {
-        /*
-        ArrayList dosDetectionThreshold = getDosDetectionThreshold();
-//        dosDetectionThreshold.forEach(System.out::println);
-        getVsysId().forEach(System.out::println);
-        System.out.println("------------------------");
-        */
-        HashMap<Integer, HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> staticThreshold = createStaticThreshold();
-        System.out.println("------------------------");
-        for (Integer integer : staticThreshold.keySet()) {
-            HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> stringTreeRangeMapHashMap = staticThreshold.get(integer);
-            for (String type : stringTreeRangeMapHashMap.keySet()) {
-                Map<Range<IPAddress>, DosDetectionThreshold> asMapOfRanges = stringTreeRangeMapHashMap.get(type).asMapOfRanges();
-                for (Range range : asMapOfRanges.keySet()) {
-                    DosDetectionThreshold threshold = asMapOfRanges.get(range);
-                    System.out.println(integer + "---" + type + "---" + range + "---" + threshold);
+    /**
+     * 登录bifang服务,获取token
+     *
+     * @return token
+     */
+    private static String loginBifangServer() {
+        String token = HttpClientUtils.ERROR_MESSAGE;
+        try {
+            if (!HttpClientUtils.ERROR_MESSAGE.equals(encryptpwd)) {
+                URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.BIFANG_SERVER_URI);
+                HashMap parms = new HashMap<>();
+                parms.put("username", "admin");
+                parms.put("password", encryptpwd);
+                HttpClientUtils.setUrlWithParams(uriBuilder, FlowWriteConfig.BIFANG_SERVER_LOGIN_PATH, parms);
+                String resposeJsonStr = HttpClientUtils.httpPost(uriBuilder.build(), null);
+                if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
+                    HashMap resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
+                    boolean success = (boolean) resposeMap.get("success");
+                    String msg = resposeMap.get("msg").toString();
+                    if (success) {
+                        HashMap data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
+                        token = data.get("token").toString();
+                    } else {
+                        logger.error(msg);
+                    }
                 }
-                System.out.println("------------------------");
             }
-
+        } catch (Exception e) {
+            logger.error("登录失败,未获取到token ", e);
        }
-//        String s = loginBifangServer();
-//        System.out.println(s);
-
+        return token;
    }
-
-
}
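The structure `createStaticThreshold` builds is worth spelling out: per vsys and attack type, the configured server-IP CIDRs are flattened into a Guava `TreeRangeMap` keyed by `IPAddress` ranges, so the hot-path lookup in `DosDetection` is a single `rangeMap.get(ip)` rather than a scan over every CIDR. A self-contained sketch of that pattern using the same shaded Guava and ipaddress types (the CIDR and probe address are illustrative, not configuration from this patch):

```java
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;

// Hedged sketch of the range-map lookup pattern used by the threshold cache.
public class ThresholdRangeLookupSketch {
    public static void main(String[] args) {
        TreeRangeMap<IPAddress, String> rangeMap = TreeRangeMap.create();

        IPAddress cidr = new IPAddressString("198.51.100.0/24").getAddress();
        // bound the range with the lowest and highest address the CIDR covers
        rangeMap.put(Range.closed(cidr.getLower(), cidr.getUpper()), "profile-7");

        IPAddress probe = new IPAddressString("198.51.100.42").getAddress();
        System.out.println(rangeMap.get(probe)); // profile-7
    }
}
```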
diff --git a/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java b/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java
index 8206cff..a4e5bdd 100644
--- a/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java
+++ b/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java
@@ -2,7 +2,7 @@ package com.zdjizhi.etl;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.CommonConfig;
+import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.common.DosMetricsLog;
 import com.zdjizhi.common.DosSketchLog;
 
@@ -26,16 +26,11 @@ class TrafficServerIpMetrics {
     }
 
     private static long timeFloor(long sketchStartTime){
-        return sketchStartTime / CommonConfig.FLINK_WINDOW_MAX_TIME * CommonConfig.FLINK_WINDOW_MAX_TIME;
+        return sketchStartTime / FlowWriteConfig.FLINK_WINDOW_MAX_TIME * FlowWriteConfig.FLINK_WINDOW_MAX_TIME;
     }
 
     private static int getPartitionNumByIp(String destinationIp){
-        return Math.abs(destinationIp.hashCode()) % CommonConfig.DESTINATION_IP_PARTITION_NUM;
-    }
-
-    public static void main(String[] args) {
-        System.out.println(getPartitionNumByIp("146.177.223.43"));
-        System.out.println("146.177.223.43".hashCode());
+        return Math.abs(destinationIp.hashCode()) % FlowWriteConfig.DESTINATION_IP_PARTITION_NUM;
     }
}
diff --git a/src/main/java/com/zdjizhi/sink/DosEventSink.java b/src/main/java/com/zdjizhi/sink/DosEventSink.java
index 18694ba..87795e6 100644
--- a/src/main/java/com/zdjizhi/sink/DosEventSink.java
+++ b/src/main/java/com/zdjizhi/sink/DosEventSink.java
@@ -1,7 +1,7 @@
 package com.zdjizhi.sink;
 
 import com.alibaba.fastjson2.JSONObject;
-import com.zdjizhi.common.CommonConfig;
+import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.common.DosEventLog;
 //import com.zdjizhi.utils.JsonMapper;
 import com.zdjizhi.utils.KafkaUtils;
@@ -16,8 +16,8 @@ class DosEventSink {
                 .filter(Objects::nonNull)
 //                .map(JsonMapper::toJsonString)
                 .map(JSONObject::toJSONString)
-                .addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME))
-                .setParallelism(CommonConfig.KAFKA_OUTPUT_EVENT_PARALLELISM);
+                .addSink(KafkaUtils.getKafkaSink(FlowWriteConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME))
+                .setParallelism(FlowWriteConfig.KAFKA_OUTPUT_EVENT_PARALLELISM);
     }
}
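`getPartitionNumByIp` in `TrafficServerIpMetrics` above keeps its `Math.abs(hashCode) % N` scheme. One property worth knowing when reasoning about it: `Math.abs(Integer.MIN_VALUE)` is still negative, so a string whose hash is exactly `Integer.MIN_VALUE` would yield a negative partition. A hedged sketch of the arithmetic; the partition count of 8 is illustrative, and the sample IP is the one from the removed `main`:

```java
// Hedged sketch of the partition arithmetic used by getPartitionNumByIp;
// the 8 stands in for FlowWriteConfig.DESTINATION_IP_PARTITION_NUM.
public class IpPartitionSketch {
    static int partitionOf(String destinationIp, int partitions) {
        return Math.abs(destinationIp.hashCode()) % partitions;
    }

    public static void main(String[] args) {
        System.out.println(partitionOf("146.177.223.43", 8));
        // Edge case: Math.abs(Integer.MIN_VALUE) == Integer.MIN_VALUE, so a
        // hash of exactly MIN_VALUE produces a negative result; guarding with
        // (hash & Integer.MAX_VALUE) % partitions would avoid it.
        System.out.println(Math.abs(Integer.MIN_VALUE)); // -2147483648
    }
}
```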
diff --git a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
index f6ebeb0..35db54f 100644
--- a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
+++ b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
@@ -3,18 +3,14 @@ package com.zdjizhi.sink;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.alibaba.nacos.api.PropertyKeyConst;
-import com.zdjizhi.common.CommonConfig;
+import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.common.DosEventLog;
 import com.zdjizhi.common.DosMetricsLog;
 import com.zdjizhi.common.DosSketchLog;
 import com.zdjizhi.etl.DosDetection;
 import com.zdjizhi.etl.EtlProcessFunction;
 import com.zdjizhi.etl.ParseSketchLog;
-import com.zdjizhi.source.DosSketchSource;
 import com.zdjizhi.utils.FlinkEnvironmentUtils;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.datastream.*;
@@ -22,7 +18,6 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.OutputTag;
 
-import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -39,44 +34,24 @@ public class OutputStreamSink {
             SingleOutputStreamOperator middleStream = getMiddleStream();
             DosEventSink.dosEventOutputSink(getEventSinkStream(middleStream));
             TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream);
-            FlinkEnvironmentUtils.streamExeEnv.execute(CommonConfig.STREAM_EXECUTION_JOB_NAME);
+            FlinkEnvironmentUtils.streamExeEnv.execute(FlowWriteConfig.STREAM_EXECUTION_JOB_NAME);
         } catch (Exception e) {
             logger.error("任务启动失败 {}",e);
         }
     }
 
     private static SingleOutputStreamOperator getEventSinkStream(SingleOutputStreamOperator middleStream){
-        DataStreamSource<Map<String, String>> broadcastSource=null;
-        Properties nacosProperties = new Properties();
-
-        nacosProperties.put(PropertyKeyConst.SERVER_ADDR,CommonConfig.NACOS_SERVER_ADDR);
-        nacosProperties.setProperty(PropertyKeyConst.USERNAME, CommonConfig.NACOS_USERNAME);
-        nacosProperties.setProperty(PropertyKeyConst.PASSWORD, CommonConfig.NACOS_PASSWORD);
-        nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, CommonConfig.NACOS_NAMESPACE);
-
-        if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)){
-            broadcastSource = DosSketchSource.broadcastSource(nacosProperties);
-        }else {
-            broadcastSource= DosSketchSource.singleBroadcastSource(nacosProperties);
-        }
-
-        MapStateDescriptor descriptor =
-                new MapStateDescriptor<>("descriptorTest", Types.STRING, TypeInformation.of(Map.class));
-
-        BroadcastStream<Map<String, String>> broadcast = broadcastSource.broadcast(descriptor);
-
         return middleStream
-                .connect(broadcast)
-                .process(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM);
+                .process(new DosDetection()).setParallelism(FlowWriteConfig.FLINK_DETECTION_MAP_PARALLELISM);
     }
 
     private static SingleOutputStreamOperator getMiddleStream(){
         return ParseSketchLog.getSketchSource()
                 .keyBy(new KeysSelector())
-                .window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME)))
+                .window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.FLINK_WINDOW_MAX_TIME)))
                 .process(new EtlProcessFunction())
-                .setParallelism(CommonConfig.FLINK_FIRST_AGG_PARALLELISM);
+                .setParallelism(FlowWriteConfig.FLINK_FIRST_AGG_PARALLELISM);
     }
 
     private static class KeysSelector implements KeySelector>{
diff --git a/src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java b/src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java
index 7c5faa4..0025544 100644
--- a/src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java
+++ b/src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java
@@ -1,10 +1,10 @@
 package com.zdjizhi.sink;
 
 import com.alibaba.fastjson2.JSONObject;
-import com.zdjizhi.common.CommonConfig;
+import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.common.DosMetricsLog;
 import com.zdjizhi.common.DosSketchLog;
-import com.zdjizhi.utils.JsonMapper;
+
 import com.zdjizhi.utils.KafkaUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -16,10 +16,8 @@ class TrafficServerIpMetricsSink {
     static void sideOutputMetricsSink(SingleOutputStreamOperator outputStream){
         DataStream sideOutput = outputStream.getSideOutput(outputTag);
 //        sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
-        sideOutput.map(JSONObject::toJSONString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
-                .setParallelism(CommonConfig.KAFKA_OUTPUT_METRIC_PARALLELISM);
-
-
+        sideOutput.map(JSONObject::toJSONString).addSink(KafkaUtils.getKafkaSink(FlowWriteConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
+                .setParallelism(FlowWriteConfig.KAFKA_OUTPUT_METRIC_PARALLELISM);
     }
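`TrafficServerIpMetricsSink` consumes the metrics side output that `EtlProcessFunction` emits alongside the main event stream; this patch does not change that wiring, but for readers new to Flink side outputs the pattern is roughly as follows (the tag id here is hypothetical, not necessarily the one declared in this repo):

```java
import org.apache.flink.util.OutputTag;

// Hedged sketch of the side-output wiring consumed above. The anonymous
// subclass is the standard way to give Flink a reifiable element type.
public class SideOutputSketch {
    static final OutputTag<DosMetricsLog> METRICS_TAG =
            new OutputTag<DosMetricsLog>("metrics-side-output") {};

    // inside EtlProcessFunction.process(...):
    //     ctx.output(METRICS_TAG, metricsLog);   // metrics go to the side output
    //     out.collect(aggregatedSketch);         // main stream continues to DosDetection
    // and downstream, as in TrafficServerIpMetricsSink:
    //     outputStream.getSideOutput(METRICS_TAG) // yields DataStream<DosMetricsLog>
}
```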
diff --git a/src/main/java/com/zdjizhi/source/DosSketchSource.java b/src/main/java/com/zdjizhi/source/DosSketchSource.java
index e1e73b3..238e350 100644
--- a/src/main/java/com/zdjizhi/source/DosSketchSource.java
+++ b/src/main/java/com/zdjizhi/source/DosSketchSource.java
@@ -1,15 +1,14 @@
 package com.zdjizhi.source;
 
-import com.zdjizhi.common.CommonConfig;
-import com.zdjizhi.common.CustomFile;
+
+
+import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.utils.FlinkEnvironmentUtils;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -21,26 +20,18 @@ public class DosSketchSource {
 
     public static DataStreamSource createDosSketchSource(){
         Properties properties = new Properties();
-        properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
-        properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID);
-        if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1){
+        properties.setProperty("bootstrap.servers", FlowWriteConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
+        properties.setProperty("group.id", FlowWriteConfig.KAFKA_GROUP_ID);
+        if (FlowWriteConfig.SASL_JAAS_CONFIG_FLAG == 1){
             properties.put("security.protocol", "SASL_PLAINTEXT");
             properties.put("sasl.mechanism", "PLAIN");
-            properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
+            properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+ FlowWriteConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+ FlowWriteConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
         }
         return streamExeEnv.addSource(new FlinkKafkaConsumer(
-                CommonConfig.KAFKA_INPUT_TOPIC_NAME,
+                FlowWriteConfig.KAFKA_INPUT_TOPIC_NAME,
                 new SimpleStringSchema(),
                 properties))
-                .setParallelism(CommonConfig.KAFKA_INPUT_PARALLELISM);
+                .setParallelism(FlowWriteConfig.KAFKA_INPUT_PARALLELISM);
     }
-
-    public static DataStreamSource<Map<String, String>> broadcastSource(Properties nacosProperties){
-        return streamExeEnv.addSource(new HttpSource(nacosProperties, CommonConfig.NACOS_DATA_ID, CommonConfig.NACOS_GROUP, CommonConfig.NACOS_READ_TIMEOUT));
-    }
-
-    public static DataStreamSource<Map<String, String>> singleBroadcastSource(Properties nacosProperties){
-        return streamExeEnv.addSource(new SingleHttpSource(nacosProperties, CommonConfig.NACOS_DATA_ID, CommonConfig.NACOS_GROUP, CommonConfig.NACOS_READ_TIMEOUT));
-    }
}
return null; - } - - @Override - public void receiveConfigInfo(String configMsg) { - try { - logger.info("receive update config:" + configMsg); - if (StringUtil.isNotBlank(configMsg)) { - ArrayList metaList = JsonPath.parse(configMsg).read(EXPR); - if (metaList.size() > 0) { - for (Object metadata : metaList) { - JSONObject knowledgeJson = new JSONObject(metadata, false, true); - String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), - knowledgeJson.getStr("format")); - String sha256 = knowledgeJson.getStr("sha256"); - String filePath = knowledgeJson.getStr("path"); - if (!sha256.equals(knowledgeMetaCache.get(fileName))) { - knowledgeMetaCache.put(fileName, sha256); - updateKnowledge(fileName, filePath,sha256); - } - } - if (!knowledgeUpdateCache.isEmpty()){ - ctx.collect(knowledgeUpdateCache); - knowledgeUpdateCache.clear(); - } - } - } - - } catch (Exception e) { - logger.error("监听nacos配置失败", e); - } - } - }); - - while (isRunning) { - try { - Thread.sleep(10000); - }catch (InterruptedException e) { - e.printStackTrace(); - } - } - - } - - private void initKnowledge(){ - String configMsg = ""; - try { - configMsg=configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT); - } catch (NacosException e) { - logger.error("从Nacos获取知识库元数据配置文件异常,异常信息为:{}", e.getMessage()); - } - - if (StringUtil.isNotBlank(configMsg)){ - ArrayList metaList = JsonPath.parse(configMsg).read(EXPR); - if (metaList.size() > 0) { - for (Object metadata : metaList) { - JSONObject knowledgeJson = new JSONObject(metadata, false, true); - String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), - knowledgeJson.getStr("format")); - String sha256 = knowledgeJson.getStr("sha256"); - String filePath = knowledgeJson.getStr("path"); - byte[] localFileByte = getLocalFile(fileName); - String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte); - if (sha256.equals(localFileSha256Hex)){ - logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256); - knowledgeMetaCache.put(fileName, sha256); - }else { - logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等,更新本地文件及缓存", fileName, localFileSha256Hex, sha256); - updateKnowledge(fileName,filePath,sha256); - } - - } - } - } - } - - private void updateKnowledge(String fileName, String filePath,String sha256) { - InputStream inputStream = null; - int retryNum = 0; - try { - while (retryNum < TRY_TIMES){ - inputStream = httpClientUtils.httpGetInputStream(filePath, 90000, header); - if (inputStream !=null){ - byte[] downloadBytes = IOUtils.toByteArray(inputStream); - String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes); - if (sha256.equals(downloadFileSha256Hex)&& downloadBytes.length > 0 ){ - logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256); - boolean updateStatus = updateLocalFile(fileName, downloadBytes); - if (updateStatus){ - knowledgeMetaCache.put(fileName,sha256); - knowledgeUpdateCache.put(fileName, sha256); - retryNum = TRY_TIMES; - }else { - retryNum++; - //避免频繁请求HOS - Thread.sleep(10000); - } - }else { - logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum); - retryNum++; - //避免频繁请求HOS - Thread.sleep(10000); - } - } - } - } catch (IOException ioException) { - ioException.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - IOUtils.closeQuietly(inputStream); - } - 
} - - - private boolean updateLocalFile(String fileName,byte[] downloadBytes) { - FileOutputStream outputStream = null; - boolean updateStatus = false; - try { - HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, downloadBytes); - updateStatus=true; - } catch (IOException ioe) { - logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage()); - ioe.printStackTrace(); - } catch (RuntimeException e) { - logger.error("更新本地文件{}时发生异常,异常信息为:", fileName, e.getMessage()); - e.printStackTrace(); - } finally { - IOUtils.closeQuietly(outputStream); - } - return updateStatus; - } - - private static byte[] getLocalFile(String name) { - byte[] fileBytes = null; - try { - fileBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + name) ; - } catch (RuntimeException | IOException e) { - logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage()); - e.printStackTrace(); - } - return fileBytes; - } - - @Override - public void cancel() { - this.isRunning = false; - } - - -} diff --git a/src/main/java/com/zdjizhi/source/HttpSourceFunction.java b/src/main/java/com/zdjizhi/source/HttpSourceFunction.java deleted file mode 100644 index 8fd58a9..0000000 --- a/src/main/java/com/zdjizhi/source/HttpSourceFunction.java +++ /dev/null @@ -1,6 +0,0 @@ -package com.zdjizhi.source; - -import org.apache.flink.streaming.api.functions.source.SourceFunction; - -public interface HttpSourceFunction extends SourceFunction { -} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/source/RichHttpSourceFunction.java b/src/main/java/com/zdjizhi/source/RichHttpSourceFunction.java deleted file mode 100644 index 582aa13..0000000 --- a/src/main/java/com/zdjizhi/source/RichHttpSourceFunction.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.zdjizhi.source; - -import org.apache.flink.api.common.functions.AbstractRichFunction; - -public abstract class RichHttpSourceFunction extends AbstractRichFunction implements HttpSourceFunction { - private static final long serialVersionUID = 1L; - - public RichHttpSourceFunction() { - } -} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/source/SingleHttpSource.java b/src/main/java/com/zdjizhi/source/SingleHttpSource.java deleted file mode 100644 index 4e4ce87..0000000 --- a/src/main/java/com/zdjizhi/source/SingleHttpSource.java +++ /dev/null @@ -1,270 +0,0 @@ -package com.zdjizhi.source; - -import cn.hutool.core.io.FileUtil; -import cn.hutool.core.io.IoUtil; -import cn.hutool.core.io.file.FileReader; -import cn.hutool.crypto.digest.DigestUtil; -import cn.hutool.json.JSONObject; -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; -import com.alibaba.nacos.api.exception.NacosException; -import com.fasterxml.jackson.databind.JavaType; -import com.google.common.base.Joiner; -import com.jayway.jsonpath.JsonPath; -import com.zdjizhi.common.CommonConfig; -import com.zdjizhi.common.CustomFile; -import com.zdjizhi.common.KnowledgeLog; -import com.zdjizhi.utils.*; -import org.apache.commons.io.IOUtils; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.http.Header; -import org.apache.http.message.BasicHeader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.concurrent.Executor; - -public class 
SingleHttpSource extends RichHttpSourceFunction> { - - private static final Logger logger = LoggerFactory.getLogger(SingleHttpSource.class); - - private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']"; - - - private static Map knowledgeMetaCache = new HashMap<>(); - - private static HashMap knowledgeUpdateCache; - - private static final int TRY_TIMES = 3; - - private Properties nacosProperties; - - private String NACOS_DATA_ID; - - private String NACOS_GROUP; - - private long NACOS_READ_TIMEOUT; - - private static HttpClientUtils2 httpClientUtils ; - - private ConfigService configService; - - private static Header header; - - private boolean isRunning = true; - - - - public SingleHttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) { - this.nacosProperties = nacosProperties; - this.NACOS_DATA_ID = NACOS_DATA_ID; - this.NACOS_GROUP = NACOS_GROUP; - this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT; - } - - - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - httpClientUtils = new HttpClientUtils2(); - //初始化元数据缓存 - knowledgeMetaCache = new HashMap<>(16); - //初始化定位库缓存 - knowledgeUpdateCache = new HashMap<>(16); - - header = new BasicHeader("token", CommonConfig.HOS_TOKEN); - - //连接nacos配置 - try { - configService = NacosFactory.createConfigService(nacosProperties); - }catch (NacosException e){ - logger.error("Get Schema config from Nacos error,The exception message is :{}", e.getMessage()); - } - - //初始化知识库 - initKnowledge(); - logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR)); - } - - @Override - public void run(SourceContext ctx) throws Exception { - if (!knowledgeUpdateCache.isEmpty()){ - ctx.collect(knowledgeUpdateCache); - knowledgeUpdateCache.clear(); - } - - configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() { - @Override - public Executor getExecutor() { - return null; - } - - @Override - public void receiveConfigInfo(String configMsg) { - try { - logger.info("receive update config:" + configMsg); - if (StringUtil.isNotBlank(configMsg)) { - ArrayList metaList = JsonPath.parse(configMsg).read(EXPR); - if (metaList.size() > 0) { - for (Object metadata : metaList) { - JSONObject knowledgeJson = new JSONObject(metadata, false, true); - String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), - knowledgeJson.getStr("format")); - String sha256 = knowledgeJson.getStr("sha256"); - String filePath = knowledgeJson.getStr("path"); - if (!sha256.equals(knowledgeMetaCache.get(fileName))) { - knowledgeMetaCache.put(fileName, sha256); - updateKnowledge(fileName, filePath,sha256); - } - } - if (!knowledgeUpdateCache.isEmpty()){ - ctx.collect(knowledgeUpdateCache); - knowledgeUpdateCache.clear(); - } - } - } - - } catch (Exception e) { - logger.error("监听nacos配置失败", e); - } - System.out.println(configMsg); - } - }); - - while (isRunning) { - try { - Thread.sleep(10000); - }catch (InterruptedException e){ - e.printStackTrace(); - } - - } - - - - } - - - - private void initKnowledge(){ - String configMsg = ""; - try { - configMsg=configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT); - } catch (NacosException e) { - logger.error("从Nacos获取知识库元数据配置文件异常,异常信息为:{}", e.getMessage()); - } - - if (StringUtil.isNotBlank(configMsg)){ - ArrayList metaList = 
JsonPath.parse(configMsg).read(EXPR); - if (metaList.size() > 0) { - for (Object metadata : metaList) { - JSONObject knowledgeJson = new JSONObject(metadata, false, true); - String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), - knowledgeJson.getStr("format")); - String sha256 = knowledgeJson.getStr("sha256"); - String filePath = knowledgeJson.getStr("path"); - byte[] localFileByte = getLocalFile(fileName); - String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte); - if (sha256.equals(localFileSha256Hex)){ - logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256); - knowledgeMetaCache.put(fileName, sha256); - }else { - logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等,更新本地文件及缓存", fileName, localFileSha256Hex, sha256); - updateKnowledge(fileName,filePath,sha256); - } - - } - } - } - } - - - - private void updateKnowledge(String fileName, String filePath,String sha256) { - InputStream inputStream = null; - int retryNum = 0; - try { - while (retryNum < TRY_TIMES){ - inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); - if (inputStream !=null){ - byte[] downloadBytes = IOUtils.toByteArray(inputStream); - String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes); - if (sha256.equals(downloadFileSha256Hex)&& downloadBytes.length > 0 ){ - logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256); - boolean updateStatus = updateLocalFile(fileName, downloadBytes); - if (updateStatus){ - knowledgeMetaCache.put(fileName,sha256); - knowledgeUpdateCache.put(fileName, sha256); - retryNum = TRY_TIMES; - }else { - retryNum++; - //避免频繁请求HOS - Thread.sleep(10000); - } -// isSending = true; - }else { - logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum); - retryNum++; - //避免频繁请求HOS - Thread.sleep(10000); - } - } - } - - } catch (IOException ioException) { - ioException.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - IOUtils.closeQuietly(inputStream); - } - } - - - - private boolean updateLocalFile(String fileName,byte[] downloadBytes) { - FileOutputStream outputStream = null; - boolean updateStatus = false; - try { - FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH); - File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName)); - outputStream = new FileOutputStream(file); - IoUtil.copy(new ByteArrayInputStream(downloadBytes), outputStream); - updateStatus=true; - } catch (IOException ioe) { - logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage()); - ioe.printStackTrace(); - } catch (RuntimeException e) { - logger.error("更新本地文件{}时发生异常,异常信息为:", fileName, e.getMessage()); - e.printStackTrace(); - } finally { - IOUtils.closeQuietly(outputStream); - } - return updateStatus; - } - - private static byte[] getLocalFile(String name) { - byte[] fileBytes = null; - try { - fileBytes=new FileReader(CommonConfig.DOWNLOAD_PATH + name).readBytes(); - } catch (RuntimeException e) { - logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage()); - e.printStackTrace(); - } - return fileBytes; - } - - - @Override - public void cancel() { - this.isRunning = false; - } -} - diff --git a/src/main/java/com/zdjizhi/utils/CollectionUtils.java b/src/main/java/com/zdjizhi/utils/CollectionUtils.java deleted file mode 100644 index 69d4592..0000000 --- 
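Both deleted sources follow the standard Flink SourceFunction lifecycle visible above: initialize clients and caches in open(), emit from run() inside a keep-alive loop, and flip a running flag in cancel(). Below is a bare-bones sketch of that contract against Flink 1.13; the class name and the emitted heartbeat value are illustrative only. Note the volatile modifier on the flag, which the deleted implementations omit even though cancel() is invoked from a different thread than run().

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class HeartbeatSource extends RichSourceFunction<Long> {

    private volatile boolean isRunning = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters); // create HTTP clients, config listeners and caches here
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            // emit under the checkpoint lock so records and checkpoints do not interleave
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(System.currentTimeMillis());
            }
            Thread.sleep(10_000L);
        }
    }

    @Override
    public void cancel() {
        isRunning = false; // run() observes the flag and returns
    }
}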
a/src/main/java/com/zdjizhi/utils/CollectionUtils.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.zdjizhi.utils; - -import java.util.Collection; -import java.util.HashSet; - -/** - * @author wlh - * 扩展集合处理工具 - */ -public class CollectionUtils { - - public static Collection takeUniqueLimit(Collection collection, int limit){ - int i =0; - Collection newSet = new HashSet<>(); - for (T t:collection){ - if (i < limit){ - newSet.add(t); - i += 1; - } - } - return newSet; - } -} diff --git a/src/main/java/com/zdjizhi/utils/FileByteUtils.java b/src/main/java/com/zdjizhi/utils/FileByteUtils.java deleted file mode 100644 index bb1f5aa..0000000 --- a/src/main/java/com/zdjizhi/utils/FileByteUtils.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.zdjizhi.utils; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; - -public class FileByteUtils { - - public static byte[] getFileBytes (String filePath) throws IOException { - File file = new File(filePath); - FileInputStream fis = new FileInputStream(file); - ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); - byte[] b = new byte[1024]; - int n; - while ((n = fis.read(b)) != -1) { - bos.write(b, 0, n); - } - fis.close(); - byte[] data = bos.toByteArray(); - bos.close(); - return data; - } -} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java index cd628c5..f3d6d11 100644 --- a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java +++ b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java @@ -1,8 +1,6 @@ package com.zdjizhi.utils; -import com.zdjizhi.common.CommonConfig; -import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.environment.CheckpointConfig; +import com.zdjizhi.common.FlowWriteConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -13,7 +11,7 @@ public class FlinkEnvironmentUtils { public static StreamExecutionEnvironment streamExeEnv = StreamExecutionEnvironment.getExecutionEnvironment(); static { - streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM); + streamExeEnv.setParallelism(FlowWriteConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM); /* // 每 1000ms 开始一次 checkpoint diff --git a/src/main/java/com/zdjizhi/utils/HdfsUtils.java b/src/main/java/com/zdjizhi/utils/HdfsUtils.java deleted file mode 100644 index 4cbc199..0000000 --- a/src/main/java/com/zdjizhi/utils/HdfsUtils.java +++ /dev/null @@ -1,75 +0,0 @@ -package com.zdjizhi.utils; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.common.CommonConfig; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; - -public class HdfsUtils { - - private static final Log logger = LogFactory.get(); - private static FileSystem fileSystem; - - static { - Configuration configuration = new Configuration(); - try { - //配置hdfs相关信息 - configuration.set("fs.defaultFS","hdfs://ns1"); - configuration.set("hadoop.proxyuser.root.hosts","*"); - configuration.set("hadoop.proxyuser.root.groups","*"); - configuration.set("ha.zookeeper.quorum", 
CommonConfig.HBASE_ZOOKEEPER_QUORUM); - configuration.set("dfs.nameservices","ns1"); - configuration.set("dfs.ha.namenodes.ns1","nn1,nn2"); - configuration.set("dfs.namenode.rpc-address.ns1.nn1",CommonConfig.HDFS_URI_NS1); - configuration.set("dfs.namenode.rpc-address.ns1.nn2",CommonConfig.HDFS_URI_NS2); - configuration.set("dfs.client.failover.proxy.provider.ns1","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); - //指定用户 - System.setProperty("HADOOP_USER_NAME", CommonConfig.HDFS_USER); - //创建fileSystem,用于连接hdfs - fileSystem = FileSystem.get(configuration); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static boolean isExists(String filePath) throws IOException { - return fileSystem.exists(new Path(filePath)); - } - - public static byte[] getFileBytes(String filePath) throws IOException { - try (FSDataInputStream open = fileSystem.open(new Path(filePath))) { - byte[] bytes = new byte[open.available()]; - open.read(0, bytes, 0, open.available()); - return bytes; - } catch (IOException e) { - logger.error("An I/O exception when files are download from HDFS. Message is :" + e.getMessage()); - } - return null; - } - - public static void uploadFileByBytes(String filePath,byte[] bytes) throws IOException { - try (FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(filePath), true)) { - fsDataOutputStream.write(bytes); -// fsDataOutputStream.flush(); - } catch (RuntimeException e) { - logger.error("Uploading files to the HDFS is abnormal. Message is :" + e.getMessage()); - } catch (IOException e) { - logger.error("An I/O exception when files are uploaded to HDFS. Message is :" + e.getMessage()); - } - } - - public static void rename(String src, String dst) throws IOException { - fileSystem.rename(new Path(src),new Path(dst)); - } - - -} diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtils.java b/src/main/java/com/zdjizhi/utils/HttpClientUtils.java index d4bf174..b69e38b 100644 --- a/src/main/java/com/zdjizhi/utils/HttpClientUtils.java +++ b/src/main/java/com/zdjizhi/utils/HttpClientUtils.java @@ -2,12 +2,16 @@ package com.zdjizhi.utils; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.common.CommonConfig; +import com.geedgenetworks.utils.StringUtil; + +import com.zdjizhi.common.FlowWriteConfig; import org.apache.http.*; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.*; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.client.utils.URIBuilder; import org.apache.http.conn.ConnectTimeoutException; @@ -48,9 +52,9 @@ public class HttpClientUtils { static { // 设置最大连接数 - CONN_MANAGER.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION); + CONN_MANAGER.setMaxTotal(FlowWriteConfig.HTTP_POOL_MAX_CONNECTION); // 设置每个连接的路由数 - CONN_MANAGER.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE); + CONN_MANAGER.setDefaultMaxPerRoute(FlowWriteConfig.HTTP_POOL_MAX_PER_ROUTE); } @@ -62,11 +66,11 @@ public class HttpClientUtils { // 创建Http请求配置参数 RequestConfig requestConfig = RequestConfig.custom() // 获取连接超时时间 - .setConnectionRequestTimeout(CommonConfig.HTTP_POOL_REQUEST_TIMEOUT) + .setConnectionRequestTimeout(FlowWriteConfig.HTTP_POOL_REQUEST_TIMEOUT) // 请求超时时间 
- .setConnectTimeout(CommonConfig.HTTP_POOL_CONNECT_TIMEOUT)
+ .setConnectTimeout(FlowWriteConfig.HTTP_POOL_CONNECT_TIMEOUT)
// 响应超时时间
- .setSocketTimeout(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT)
+ .setSocketTimeout(FlowWriteConfig.HTTP_POOL_RESPONSE_TIMEOUT)
.build(); /*
diff --git a/src/main/java/com/zdjizhi/utils/IpLocationConfiguration.java b/src/main/java/com/zdjizhi/utils/IpLocationConfiguration.java deleted file mode 100644 index 1141400..0000000 --- a/src/main/java/com/zdjizhi/utils/IpLocationConfiguration.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.zdjizhi.utils; -import lombok.Data; -/** - * @author fy - * @version 1.0 - * @date 2022/10/19 18:27 - */ - - -@Data -public class IpLocationConfiguration { - - private String ipV4UserDefined; - - private String ipV4BuiltIn; - - private String ipV6UserDefined; - - private String ipV6BuiltIn; - -}
diff --git a/src/main/java/com/zdjizhi/utils/IpLookupUtils.java b/src/main/java/com/zdjizhi/utils/IpLookupUtils.java new file mode 100644 index 0000000..8d84bb9 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/IpLookupUtils.java @@ -0,0 +1,160 @@
+package com.zdjizhi.utils;
+
+import cn.hutool.crypto.digest.DigestUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.*;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.config.listener.Listener;
+import com.alibaba.nacos.api.exception.NacosException;
+import com.geedgenetworks.utils.IpLookupV2;
+import com.geedgenetworks.utils.StringUtil;
+import com.google.common.base.Joiner;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.common.pojo.KnowlegeBaseMeta;
+import com.zdjizhi.utils.connections.http.HttpClientService;
+import com.zdjizhi.utils.connections.nacos.NacosConnection;
+
+import java.io.ByteArrayInputStream;
+import java.util.HashMap;
+import java.util.concurrent.Executor;
+
+/**
+ * @author wangchengcheng
+ * @date 2023/11/10 15:23
+ */
+public class IpLookupUtils {
+    private static final Log logger = LogFactory.get();
+    private static final String ipBuiltInName = "ip_builtin.mmdb";
+    private static final String ipUserDefinedName = "ip_user_defined.mmdb";
+
+    /**
+     * IP location database.
+     */
+    private static IpLookupV2 ipLookup;
+
+    /**
+     * Separator used to join a knowledge file's name and format.
+     */
+    private static final String LOCATION_SEPARATOR = ".";
+
+    /**
+     * Maximum number of download attempts.
+     */
+    private static final int TRY_TIMES = 5;
+
+    /**
+     * HTTP connections.
+     */
+    private static final HttpClientService httpClientService;
+
+    /**
+     * Knowledge base metadata cache.
+     */
+    private static final HashMap<String, KnowlegeBaseMeta> knowledgeMetaCache = new HashMap<>(16);
+
+    static {
+        JSONPath jsonPath = JSONPath.of(getFilterParameter());
+        httpClientService = new HttpClientService();
+
+        NacosConnection nacosConnection = new NacosConnection();
+        ConfigService schemaService = nacosConnection.getPublicService();
+        try {
+            String configInfo = schemaService.getConfigAndSignListener(FlowWriteConfig.NACOS_KNOWLEDGEBASE_DATA_ID, FlowWriteConfig.NACOS_PUBLIC_GROUP, FlowWriteConfig.NACOS_CONNECTION_TIMEOUT, new Listener() {
+                @Override
+                public Executor getExecutor() {
+                    return null;
+                }
+
+                @Override
+                public void receiveConfigInfo(String configInfo) {
+                    if (StringUtil.isNotBlank(configInfo)) {
+                        updateIpLookup(jsonPath, configInfo);
+                    }
+                }
+            });
+
+            if (StringUtil.isNotBlank(configInfo)) {
+                updateIpLookup(jsonPath, configInfo);
+            }
+        } catch (NacosException e) {
+            logger.error("Get schema config from Nacos error, the exception message is: {}", e.getMessage());
+        }
+    }
+
+    private static void updateIpLookup(JSONPath
jsonPath, String configInfo) {
+        String extract = jsonPath.extract(JSONReader.of(configInfo)).toString();
+        if (StringUtil.isNotBlank(extract)) {
+            JSONArray jsonArray = JSON.parseArray(extract);
+            if (jsonArray.size() > 0) {
+                for (int i = 0; i < jsonArray.size(); i++) {
+                    KnowlegeBaseMeta knowlegeBaseMeta = JSONObject.parseObject(jsonArray.getString(i), KnowlegeBaseMeta.class);
+                    String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(knowlegeBaseMeta.getName(), knowlegeBaseMeta.getFormat());
+                    knowledgeMetaCache.put(fileName, knowlegeBaseMeta);
+                }
+                reloadIpLookup();
+            }
+        }
+    }
+
+    /**
+     * Download the knowledge files from HOS and rebuild the IpLookup instance.
+     */
+    private static void reloadIpLookup() {
+        IpLookupV2.Builder builder = new IpLookupV2.Builder(false);
+        for (String fileName : knowledgeMetaCache.keySet()) {
+            int retryNum = 0;
+            KnowlegeBaseMeta knowlegeBaseMeta = knowledgeMetaCache.get(fileName);
+            String metaSha256 = knowlegeBaseMeta.getSha256();
+            while (retryNum < TRY_TIMES) {
+                logger.info("download file: {}, HOS path: {}", fileName, knowlegeBaseMeta.getPath());
+                long startTime = System.currentTimeMillis();
+                byte[] httpGetByte = httpClientService.httpGetByte(knowlegeBaseMeta.getPath(), FlowWriteConfig.HTTP_SOCKET_TIMEOUT);
+                if (httpGetByte != null && httpGetByte.length > 0) {
+                    String downloadFileSha256 = DigestUtil.sha256Hex(httpGetByte);
+                    if (metaSha256.equals(downloadFileSha256)) {
+                        ByteArrayInputStream inputStream = new ByteArrayInputStream(httpGetByte);
+                        switch (fileName) {
+                            case ipBuiltInName:
+                                builder.loadDataFile(inputStream);
+                                break;
+                            case ipUserDefinedName:
+                                builder.loadDataFilePrivate(inputStream);
+                                break;
+                            default:
+                        }
+                        logger.info("update {} finished, took {} ms", fileName, System.currentTimeMillis() - startTime);
+                        retryNum = TRY_TIMES;
+                    } else {
+                        logger.error("SHA-256 of {} downloaded from HOS is {}, but the Nacos record is {}; mismatch, starting retry {}", fileName, downloadFileSha256, metaSha256, retryNum);
+                        retryNum++;
+                    }
+                } else {
+                    logger.error("Downloaded content of {} from HOS is empty, starting retry {}", fileName, retryNum);
+                    retryNum++;
+                }
+            }
+        }
+        ipLookup = builder.build();
+    }
+
+    /**
+     * Build the JSONPath filter that selects which knowledge base metadata entries to load.
+     *
+     * @return the filter expression
+     */
+    private static String getFilterParameter() {
+        String expr = "[?(@.version=='latest')][?(@.name in ('ip_builtin','ip_user_defined'))]";
+        return expr;
+    }
+
+    public static String getCountryLookup(String ip) {
+        return ipLookup.countryLookup(ip);
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/IpUtils.java b/src/main/java/com/zdjizhi/utils/IpUtils.java deleted file mode 100644 index 4e02664..0000000 --- a/src/main/java/com/zdjizhi/utils/IpUtils.java +++ /dev/null @@ -1,224 +0,0 @@ -package com.zdjizhi.utils; - -import cn.hutool.core.io.file.FileReader; -import cn.hutool.crypto.digest.DigestUtil; -import com.zdjizhi.common.CommonConfig; -import com.zdjizhi.common.CustomFile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.List; -import java.util.Map; - -public class IpUtils { - public static IpLookupV2 ipLookup ; - - private static Logger LOG = LoggerFactory.getLogger(IpUtils.class); - - - /** - * IP定位库工具类 - */ -// public static IpLookupV2 ipLookup = new IpLookupV2.Builder(false) -// .loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4_built_in.mmdb") -// .loadDataFileV6(CommonConfig.IP_MMDB_PATH +
"ip_v6_built_in.mmdb") -// .loadDataFilePrivateV4(CommonConfig.IP_MMDB_PATH + "ip_v4_user_defined.mmdb") -// .loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_v6_user_defined.mmdb") -// .build(); - - - public static void loadIpLook(){ - try { - IpLookupV2.Builder builder = new IpLookupV2.Builder(false); - if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)) { - byte[] ipv4BuiltBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v4_built_in.mmdb"); - if (ipv4BuiltBytes!=null){ - InputStream ipv4BuiltInputStream = new ByteArrayInputStream(ipv4BuiltBytes); - builder.loadDataFileV4(ipv4BuiltInputStream); - } - - byte[] ipv6BuiltBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v6_built_in.mmdb"); - if (ipv6BuiltBytes!=null){ - InputStream ipv6BuiltInputStream = new ByteArrayInputStream(ipv6BuiltBytes); - builder.loadDataFileV6(ipv6BuiltInputStream); - } - - byte[] ipv4UserBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v4_user_defined.mmdb"); - if (ipv4UserBytes!=null){ - InputStream ipv4UserInputStream = new ByteArrayInputStream(ipv4UserBytes); - builder.loadDataFilePrivateV4(ipv4UserInputStream); - } - - byte[] ipv6UserBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v6_user_defined.mmdb"); - if (ipv6UserBytes!=null){ - InputStream ipv6UserInputStream = new ByteArrayInputStream(ipv6UserBytes); - builder.loadDataFilePrivateV6(ipv6UserInputStream); - } - }else if ("SINGLE".equals(CommonConfig.CLUSTER_OR_SINGLE)){ - byte[] ipv4BuiltBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v4_built_in.mmdb"); - if (ipv4BuiltBytes!=null){ - InputStream ipv4BuiltInputStream = new ByteArrayInputStream(ipv4BuiltBytes); - builder.loadDataFileV4(ipv4BuiltInputStream); - } - - byte[] ipv6BuiltBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v6_built_in.mmdb"); - if (ipv6BuiltBytes!=null){ - InputStream ipv6BuiltInputStream = new ByteArrayInputStream(ipv6BuiltBytes); - builder.loadDataFileV6(ipv6BuiltInputStream); - } - - byte[] ipv4UserBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v4_user_defined.mmdb"); - if (ipv4UserBytes!=null){ - InputStream ipv4UserInputStream = new ByteArrayInputStream(ipv4UserBytes); - builder.loadDataFilePrivateV4(ipv4UserInputStream); - } - - byte[] ipv6UserBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v6_user_defined.mmdb"); - if (ipv6UserBytes!=null){ - InputStream ipv6UserInputStream = new ByteArrayInputStream(ipv6UserBytes); - builder.loadDataFilePrivateV6(ipv6UserInputStream); - } - } - ipLookup = builder.build(); - - }catch (Exception e){ - LOG.error("加载失败",e); - } - } - - public static void updateIpLook(Map knowledgeFileCache){ - try{ - IpLookupV2.Builder builder = new IpLookupV2.Builder(false); - if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)) { - byte[] ipv4BuiltBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v4_built_in.mmdb"); - if (ipv4BuiltBytes!=null){ - if (knowledgeFileCache.containsKey("ip_v4_built_in.mmdb")){ - String sha256 = knowledgeFileCache.get("ip_v4_built_in.mmdb"); - byte[] localFileByte = getLocalFile("ip_v4_built_in.mmdb"); - String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte); - if (sha256.equals(localFileSha256Hex)){ - builder.loadDataFileV4(new ByteArrayInputStream(ipv4BuiltBytes)); - } - } - } - - byte[] ipv6BuiltBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v6_built_in.mmdb"); - if (ipv6BuiltBytes!=null){ - if (knowledgeFileCache.containsKey("ip_v6_built_in.mmdb")) { - String sha256 
= knowledgeFileCache.get("ip_v6_built_in.mmdb"); - byte[] localFileByte = getLocalFile("ip_v6_built_in.mmdb"); - String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte); - if (sha256.equals(localFileSha256Hex)) { - builder.loadDataFileV6(new ByteArrayInputStream(ipv6BuiltBytes)); - } - } - } - - byte[] ipv4UserBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v4_user_defined.mmdb"); - if (ipv4UserBytes!=null){ - if (knowledgeFileCache.containsKey("ip_v4_user_defined.mmdb")) { - String sha256 = knowledgeFileCache.get("ip_v4_user_defined.mmdb"); - byte[] localFileByte = getLocalFile("ip_v4_user_defined.mmdb"); - String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte); - if (sha256.equals(localFileSha256Hex)) { - builder.loadDataFilePrivateV4(new ByteArrayInputStream(ipv4UserBytes)); - } - } - } - - byte[] ipv6UserBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v6_user_defined.mmdb"); - if (ipv6UserBytes!=null){ - if (knowledgeFileCache.containsKey("ip_v6_user_defined.mmdb")) { - String sha256 = knowledgeFileCache.get("ip_v6_user_defined.mmdb"); - byte[] localFileByte = getLocalFile("ip_v6_user_defined.mmdb"); - String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte); - if (sha256.equals(localFileSha256Hex)) { - builder.loadDataFilePrivateV6(new ByteArrayInputStream(ipv6UserBytes)); - } - } - } - }else if ("SINGLE".equals(CommonConfig.CLUSTER_OR_SINGLE)){ - byte[] ipv4BuiltBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v4_built_in.mmdb"); - if (ipv4BuiltBytes!=null){ - if (knowledgeFileCache.containsKey("ip_v4_built_in.mmdb")){ - String sha256 = knowledgeFileCache.get("ip_v4_built_in.mmdb"); - byte[] localFileByte = getLocalFile("ip_v4_built_in.mmdb"); - String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte); - if (sha256.equals(localFileSha256Hex)){ - builder.loadDataFileV4(new ByteArrayInputStream(ipv4BuiltBytes)); - } - } - } - - byte[] ipv6BuiltBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v6_built_in.mmdb"); - if (ipv6BuiltBytes!=null){ - if (knowledgeFileCache.containsKey("ip_v6_built_in.mmdb")) { - String sha256 = knowledgeFileCache.get("ip_v6_built_in.mmdb"); - byte[] localFileByte = getLocalFile("ip_v6_built_in.mmdb"); - String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte); - if (sha256.equals(localFileSha256Hex)) { - builder.loadDataFileV6(new ByteArrayInputStream(ipv6BuiltBytes)); - } - } - } - - byte[] ipv4UserBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v4_user_defined.mmdb"); - if (ipv4UserBytes!=null){ - if (knowledgeFileCache.containsKey("ip_v4_user_defined.mmdb")) { - String sha256 = knowledgeFileCache.get("ip_v4_user_defined.mmdb"); - byte[] localFileByte = getLocalFile("ip_v4_user_defined.mmdb"); - String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte); - if (sha256.equals(localFileSha256Hex)) { - builder.loadDataFilePrivateV4(new ByteArrayInputStream(ipv4UserBytes)); - } - } - } - - byte[] ipv6UserBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v6_user_defined.mmdb"); - if (ipv6UserBytes!=null){ - if (knowledgeFileCache.containsKey("ip_v6_user_defined.mmdb")) { - String sha256 = knowledgeFileCache.get("ip_v6_user_defined.mmdb"); - byte[] localFileByte = getLocalFile("ip_v6_user_defined.mmdb"); - String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte); - if (sha256.equals(localFileSha256Hex)) { - builder.loadDataFilePrivateV6(new ByteArrayInputStream(ipv6UserBytes)); - } - } - } - } - ipLookup = 
builder.build(); - }catch (Exception e){ - LOG.error("加载失败",e); - } - - } - - - private static byte[] getLocalFile(String name) { - byte[] fileBytes = null; - try { - fileBytes = "CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE) ? - HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + name) : - new FileReader(CommonConfig.DOWNLOAD_PATH + name).readBytes(); - } catch (RuntimeException | IOException e) { - e.printStackTrace(); - } - return fileBytes; - } - - public static void main(String[] args) { - System.out.println(ipLookup.countryLookup("49.7.115.37")); - - String ips = "182.168.50.23,182.168.50.45,182.168.56.9,182.168.56.8,92.168.50.58,19.168.56.7,12.168.56.6,2.168.50.40,1.168.50.19,9.168.50.6,2.168.50.4,192.168.56.17,192.168.50.27,192.168.50.26,192.168.50.18,192.168.56.3,192.168.56.10"; - for (String ip:ips.split(",")){ - System.out.println(ip+"--"+ipLookup.countryLookup(ip)); - } - - } - - - -} diff --git a/src/main/java/com/zdjizhi/utils/KafkaUtils.java b/src/main/java/com/zdjizhi/utils/KafkaUtils.java index b0312a5..4298c40 100644 --- a/src/main/java/com/zdjizhi/utils/KafkaUtils.java +++ b/src/main/java/com/zdjizhi/utils/KafkaUtils.java @@ -1,6 +1,6 @@ package com.zdjizhi.utils; -import com.zdjizhi.common.CommonConfig; +import com.zdjizhi.common.FlowWriteConfig; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; @@ -11,11 +11,11 @@ public class KafkaUtils { private static Properties getKafkaSinkProperty(){ Properties properties = new Properties(); - properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS); - if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1){ + properties.setProperty("bootstrap.servers", FlowWriteConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS); + if (FlowWriteConfig.SASL_JAAS_CONFIG_FLAG == 1){ properties.put("security.protocol", "SASL_PLAINTEXT"); properties.put("sasl.mechanism", "PLAIN"); - properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+ FlowWriteConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+ FlowWriteConfig.SASL_JAAS_CONFIG_PASSWORD+"\";"); } return properties; diff --git a/src/main/java/com/zdjizhi/utils/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/SnowflakeId.java index 8b26f43..0af5582 100644 --- a/src/main/java/com/zdjizhi/utils/SnowflakeId.java +++ b/src/main/java/com/zdjizhi/utils/SnowflakeId.java @@ -2,7 +2,7 @@ package com.zdjizhi.utils; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.common.CommonConfig; +import com.zdjizhi.common.FlowWriteConfig; public class SnowflakeId { // private static final Logger logger = LoggerFactory.getLogger(SnowflakeId.class); @@ -99,7 +99,7 @@ public class SnowflakeId { private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); static { - idWorker = new SnowflakeId(CommonConfig.HBASE_ZOOKEEPER_QUORUM, CommonConfig.DATA_CENTER_ID_NUM); + idWorker = new SnowflakeId(FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM, FlowWriteConfig.DATA_CENTER_ID_NUM); } //==============================Constructors===================================== @@ -108,7 +108,7 @@ public class SnowflakeId { * 构造函数 */ private SnowflakeId(String zookeeperIp, long dataCenterIdNum) { - DistributedLock lock = new 
DistributedLock(CommonConfig.HBASE_ZOOKEEPER_QUORUM, "disLocks1"); + DistributedLock lock = new DistributedLock(FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM, "disLocks1"); try { lock.lock(); int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp); diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtils2.java b/src/main/java/com/zdjizhi/utils/connections/http/HttpClientService.java similarity index 71% rename from src/main/java/com/zdjizhi/utils/HttpClientUtils2.java rename to src/main/java/com/zdjizhi/utils/connections/http/HttpClientService.java index 1136e6d..280c0a2 100644 --- a/src/main/java/com/zdjizhi/utils/HttpClientUtils2.java +++ b/src/main/java/com/zdjizhi/utils/connections/http/HttpClientService.java @@ -1,6 +1,11 @@ -package com.zdjizhi.utils; +package com.zdjizhi.utils.connections.http; -import com.zdjizhi.common.CommonConfig; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; + +import com.geedgenetworks.utils.StringUtil; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.exception.FlowWriteException; import org.apache.commons.io.IOUtils; import org.apache.http.*; import org.apache.http.client.ClientProtocolException; @@ -8,63 +13,35 @@ import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; import org.apache.http.client.protocol.HttpClientContext; -import org.apache.http.client.utils.URIBuilder; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; import org.apache.http.conn.ConnectTimeoutException; import org.apache.http.conn.ConnectionKeepAliveStrategy; -import org.apache.http.conn.HttpHostConnectException; import org.apache.http.conn.socket.ConnectionSocketFactory; import org.apache.http.conn.socket.PlainConnectionSocketFactory; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; -import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.message.BasicHeaderElementIterator; import org.apache.http.protocol.HTTP; import org.apache.http.util.EntityUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.net.ssl.*; import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; -import java.net.URI; +import java.net.SocketTimeoutException; import java.net.UnknownHostException; -import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.cert.X509Certificate; -import java.util.Map; -import static org.apache.kafka.common.requests.FetchMetadata.log; +public class HttpClientService { -/** - * http client工具类 - */ -public class HttpClientUtils2 { - /** 全局连接池对象 */ - private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager(); - - private static Logger logger = LoggerFactory.getLogger(HttpClientUtils2.class); - public static final String ERROR_MESSAGE = "-1"; - - /* - * 静态代码块配置连接池信息 - */ - static { - - // 设置最大连接数 - CONN_MANAGER.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION); - // 设置每个连接的路由数 - 
CONN_MANAGER.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE); - - } + private static final Log log = LogFactory.get(); /** * 在调用SSL之前需要重写验证方法,取消检测SSL @@ -80,9 +57,11 @@ public class HttpClientUtils2 { public X509Certificate[] getAcceptedIssuers() { return null; } + @Override public void checkClientTrusted(X509Certificate[] xcs, String str) { } + @Override public void checkServerTrusted(X509Certificate[] xcs, String str) { } @@ -96,31 +75,33 @@ public class HttpClientUtils2 { // 创建ConnectionManager,添加Connection配置信息 PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry); // 设置最大连接数 - connManager.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION); + connManager.setMaxTotal(FlowWriteConfig.HTTP_POOL_MAX_CONNECTION); // 设置每个连接的路由数 - connManager.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE); + connManager.setDefaultMaxPerRoute(FlowWriteConfig.HTTP_POOL_MAX_PER_ROUTE); return connManager; } catch (KeyManagementException | NoSuchAlgorithmException e) { - throw new RuntimeException(e.getMessage()); + throw new FlowWriteException(e.getMessage()); } } /** * 获取Http客户端连接对象 + * + * @param socketTimeOut 响应超时时间 * @return Http客户端连接对象 */ - private CloseableHttpClient getHttpClient() { + private CloseableHttpClient getHttpClient(int socketTimeOut) { // 创建Http请求配置参数 RequestConfig requestConfig = RequestConfig.custom() // 获取连接超时时间 - .setConnectionRequestTimeout(CommonConfig.HTTP_POOL_REQUEST_TIMEOUT) + .setConnectionRequestTimeout(FlowWriteConfig.HTTP_POOL_REQUEST_TIMEOUT) // 请求超时时间 - .setConnectTimeout(CommonConfig.HTTP_POOL_CONNECT_TIMEOUT) + .setConnectTimeout(FlowWriteConfig.HTTP_POOL_CONNECT_TIMEOUT) // 响应超时时间 - .setSocketTimeout(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT) + .setSocketTimeout(socketTimeOut) .build(); - /* + /** * 测出超时重试机制为了防止超时不生效而设置 * 如果直接放回false,不重试 * 这里会根据情况进行判断是否重试 @@ -135,25 +116,32 @@ public class HttpClientUtils2 { if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 return false; } + if (exception instanceof SocketTimeoutException) { + if (exception.getMessage().contains("Read timed out")) { + return false; + } + } if (exception instanceof UnknownHostException) {// 目标服务器不可达 return false; } if (exception instanceof ConnectTimeoutException) {// 连接被拒绝 return false; } - if (exception instanceof HttpHostConnectException) {// 连接被拒绝 - return false; - } if (exception instanceof SSLException) {// ssl握手异常 return false; } if (exception instanceof InterruptedIOException) {// 超时 return true; } + + HttpClientContext clientContext = HttpClientContext.adapt(context); HttpRequest request = clientContext.getRequest(); // 如果请求是幂等的,就再次尝试 - return !(request instanceof HttpEntityEnclosingRequest); + if (!(request instanceof HttpEntityEnclosingRequest)) { + return true; + } + return false; }; @@ -164,7 +152,7 @@ public class HttpClientUtils2 { HeaderElement he = it.nextElement(); String param = he.getName(); String value = he.getValue(); - if (value != null && "timeout".equalsIgnoreCase(param)) { + if (value != null && param.equalsIgnoreCase("timeout")) { return Long.parseLong(value) * 1000; } } @@ -183,11 +171,10 @@ public class HttpClientUtils2 { .build(); } - // TODO: 2022/10/19 加载知识库 public InputStream httpGetInputStream(String url, int socketTimeout, Header... 
headers) { InputStream result = null; // 获取客户端连接对象 - CloseableHttpClient httpClient = getHttpClient();// TODO: 2022/10/19 去掉了 socketTimeout + CloseableHttpClient httpClient = getHttpClient(socketTimeout); // 创建GET请求对象 HttpGet httpGet = new HttpGet(url); if (StringUtil.isNotEmpty(headers)) { @@ -228,7 +215,47 @@ }
+    public byte[] httpGetByte(String url, int socketTimeout, Header... headers) {
+        byte[] result = null;
+        // obtain a pooled HTTP client
+        CloseableHttpClient httpClient = getHttpClient(socketTimeout);
+        // build the GET request
+        HttpGet httpGet = new HttpGet(url);
+        if (StringUtil.isNotEmpty(headers)) {
+            for (Header h : headers) {
+                httpGet.addHeader(h);
+            }
+        }
+        CloseableHttpResponse response = null;
+        try {
+            // execute the request
+            response = httpClient.execute(httpGet);
+            // read the whole response body
+            result = IOUtils.toByteArray(response.getEntity().getContent());
+            // make sure the entity is fully consumed
+            EntityUtils.consume(response.getEntity());
+        } catch (ClientProtocolException e) {
+            log.error("current file: {}, protocol error: {}", url, e.getMessage());
+        } catch (ParseException e) {
+            log.error("current file: {}, parser error: {}", url, e.getMessage());
+        } catch (IOException e) {
+            log.error("current file: {}, IO error: {}", url, e.getMessage());
+        } finally {
+            if (null != response) {
+                try {
+                    EntityUtils.consume(response.getEntity());
+                    response.close();
+                } catch (IOException e) {
+                    log.error("Release connection error: {}", e.getMessage());
+                }
+            }
+        }
+        return result;
+    }
}
diff --git a/src/main/java/com/zdjizhi/utils/connections/nacos/NacosConnection.java b/src/main/java/com/zdjizhi/utils/connections/nacos/NacosConnection.java new file mode 100644 index 0000000..8de0ae0 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/connections/nacos/NacosConnection.java @@ -0,0 +1,54 @@
+package com.zdjizhi.utils.connections.nacos;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.exception.NacosException;
+import com.zdjizhi.common.FlowWriteConfig;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.connections.nacos
+ * @date 2023/7/27 14:49
+ */
+public class NacosConnection {
+    private static final Log logger = LogFactory.get();
+
+    private ConfigService configService;
+
+    public ConfigService getDosService() {
+        Properties properties = new Properties();
+        properties.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
+        properties.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_DOS_NAMESPACE);
+        properties.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
+        properties.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
+        try {
+            configService = NacosFactory.createConfigService(properties);
+        } catch
(NacosException e) { + logger.error("NacosException:{}", e); + } + return configService; + } +} diff --git a/src/main/java/com/zdjizhi/utils/NacosUtils.java b/src/main/java/com/zdjizhi/utils/connections/nacos/NacosUtils.java similarity index 50% rename from src/main/java/com/zdjizhi/utils/NacosUtils.java rename to src/main/java/com/zdjizhi/utils/connections/nacos/NacosUtils.java index 25e6809..d60dd0f 100644 --- a/src/main/java/com/zdjizhi/utils/NacosUtils.java +++ b/src/main/java/com/zdjizhi/utils/connections/nacos/NacosUtils.java @@ -1,11 +1,10 @@ -package com.zdjizhi.utils; +package com.zdjizhi.utils.connections.nacos; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.Listener; +import com.zdjizhi.common.FlowWriteConfig; import java.io.IOException; import java.io.StringReader; @@ -13,40 +12,18 @@ import java.util.Properties; import java.util.concurrent.Executor; public class NacosUtils { -// private static final Logger logger = LoggerFactory.getLogger(NacosUtils.class); private static final Log logger = LogFactory.get(); - private static Properties nacosProperties = new Properties(); private static Properties commonProperties = new Properties(); - - private static final String NACOS_SERVER_ADDR = CommonConfigurations.getStringProperty("nacos.server.addr"); - private static final String NACOS_STATIC_NAMESPACE = CommonConfigurations.getStringProperty("nacos.static.namespace"); - private static final String NACOS_USERNAME = CommonConfigurations.getStringProperty("nacos.username"); - private static final String NACOS_PASSWORD = CommonConfigurations.getStringProperty("nacos.password"); - private static final String NACOS_STATIC_DATA_ID = CommonConfigurations.getStringProperty("nacos.static.data.id"); - private static final String NACOS_STATIC_GROUP = CommonConfigurations.getStringProperty("nacos.static.group"); - private static final long NACOS_READ_TIMEOUT = CommonConfigurations.getLongProperty("nacos.read.timeout"); - static { - createConfigService(); - } - - private static void getProperties() { - nacosProperties.setProperty(PropertyKeyConst.SERVER_ADDR, NACOS_SERVER_ADDR); - nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, NACOS_STATIC_NAMESPACE); - nacosProperties.setProperty(PropertyKeyConst.USERNAME, NACOS_USERNAME); - nacosProperties.setProperty(PropertyKeyConst.PASSWORD, NACOS_PASSWORD); - } - - private static void createConfigService() { + NacosConnection nacosConnection = new NacosConnection(); + ConfigService dosService = nacosConnection.getDosService(); try { - getProperties(); - ConfigService configService = NacosFactory.createConfigService(nacosProperties); - String config = configService.getConfig(NACOS_STATIC_DATA_ID, NACOS_STATIC_GROUP, NACOS_READ_TIMEOUT); + String config = dosService.getConfig(FlowWriteConfig.NACOS_DOS_DATA_ID, FlowWriteConfig.NACOS_DOS_GROUP, FlowWriteConfig.NACOS_CONNECTION_TIMEOUT); + commonProperties.load(new StringReader(config)); - - configService.addListener(NACOS_STATIC_DATA_ID, NACOS_STATIC_GROUP, new Listener() { + dosService.addListener(FlowWriteConfig.NACOS_DOS_DATA_ID, FlowWriteConfig.NACOS_DOS_GROUP, new Listener() { @Override public Executor getExecutor() { return null; diff --git a/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java new file mode 100644 index 
0000000..44fdef8 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java @@ -0,0 +1,13 @@ +package com.zdjizhi.utils.exception; + + +public class FlowWriteException extends RuntimeException { + + public FlowWriteException() { + } + + public FlowWriteException(String message) { + super(message); + } + +} diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 083dbba..b4b44ec 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -74,10 +74,6 @@ destination.ip.partition.num=10000 data.center.id.num=15 -#IP mmdb库路径 -ip.mmdb.path=D:\\data\\dat\\bak\\ -#ip.mmdb.path=/home/bigdata/topology/dat/ -#ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/ #bifang服务访问地址 bifang.server.uri=http://192.168.44.72:80 @@ -90,10 +86,10 @@ bifang.server.encryptpwd.path=/v1/user/encryptpwd bifang.server.login.path=/v1/user/login #获取vaysId路径信息 -bifang.server.policy.vaysid.path=/v1/system/vsys/ +bifang.server.policy.vaysid.path=/v1/admin/vsys #获取静态阈值路径信息 -bifang.server.policy.threshold.path=/v1/policy/profile/DoS/detection/threshold +bifang.server.policy.threshold.path=/v1/policy/profile/dos/detection/threshold #http请求相关参数 #最大连接数 @@ -129,30 +125,15 @@ sasl.jaas.config.flag=1 nacos.server.addr=192.168.44.12:8848 nacos.username=nacos nacos.password=nacos -nacos.read.timeout=5000 ############################## Nacos ---知识库配置 ###################################### nacos.namespace=public nacos.data.id=knowledge_base.json nacos.group=DEFAULT_GROUP - +nacos.connection.timeout=60000 ############################## Nacos ---静态阈值配置 ###################################### -nacos.static.namespace=test -nacos.static.data.id=dos_detection.properties -nacos.static.group=Galaxy +nacos.dos.namespace=test +nacos.dos.data.id=dos_detection.properties +nacos.dos.group=Galaxy -############################## hos Token 配置 ###################################### -hos.token=c21f969b5f03d33d43e04f8f136e7682 - -############################# 选择集群模式或者单机模式 配置 ###################################### -cluster.or.single=CLUSTER -#cluster.or.single=SINGLE - -############################## 集群模式配置文件路径 配置 ###################################### -hdfs.path=/test/TEST/ -hdfs.uri.nn1=192.168.40.151:9000 -hdfs.uri.nn2=192.168.40.152:9000 -hdfs.user=dos - -############################## 单机模式配置文件下载路径 配置 ###################################### -download.path=D:\\ttt\\ \ No newline at end of file +http.socket.timeout=90000 \ No newline at end of file diff --git a/src/main/resources/core-site.xml b/src/main/resources/core-site.xml deleted file mode 100644 index c103340..0000000 --- a/src/main/resources/core-site.xml +++ /dev/null @@ -1,58 +0,0 @@ - - - - - - - - - fs.defaultFS - hdfs://ns1 - - - hadoop.tmp.dir - file:/home/tsg/olap/hadoop/tmp - - - io.file.buffer.size - 131702 - - - hadoop.proxyuser.root.hosts - * - - - hadoop.proxyuser.root.groups - * - - - hadoop.logfile.size - 10000000 - The max size of each log file - - - hadoop.logfile.count - 1 - The max number of log files - - - ha.zookeeper.quorum - 192.168.40.151:2181,192.168.40.152:2181,192.168.40.203:2181 - - - ipc.client.connect.timeout - 90000 - - diff --git a/src/main/resources/hdfs-site.xml b/src/main/resources/hdfs-site.xml deleted file mode 100644 index e1408d2..0000000 --- a/src/main/resources/hdfs-site.xml +++ /dev/null @@ -1,142 +0,0 @@ - - - - - - - - - dfs.namenode.name.dir - file:/home/tsg/olap/hadoop/dfs/name - - - dfs.datanode.data.dir - file:/home/tsg/olap/hadoop/dfs/data - - 
- dfs.replication - 2 - - - dfs.webhdfs.enabled - true - - - dfs.permissions - false - - - dfs.permissions.enabled - false - - - dfs.nameservices - ns1 - - - dfs.blocksize - 134217728 - - - dfs.ha.namenodes.ns1 - nn1,nn2 - - - - dfs.namenode.rpc-address.ns1.nn1 - 192.168.40.151:9000 - - - - dfs.namenode.http-address.ns1.nn1 - 192.168.40.151:50070 - - - - dfs.namenode.rpc-address.ns1.nn2 - 192.168.40.152:9000 - - - - dfs.namenode.http-address.ns1.nn2 - 192.168.40.152:50070 - - - - dfs.namenode.shared.edits.dir - qjournal://192.168.40.151:8485;192.168.40.152:8485;192.168.40.203:8485/ns1 - - - - dfs.journalnode.edits.dir - /home/tsg/olap/hadoop/journal - - - - dfs.client.failover.proxy.provider.ns1 - org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider - - - - dfs.ha.fencing.methods - sshfence - shell(true) - - - - dfs.ha.fencing.ssh.private-key-files - /root/.ssh/id_rsa - - - - dfs.ha.fencing.ssh.connect-timeout - 30000 - - - - dfs.ha.automatic-failover.enabled - true - - - dfs.datanode.max.transfer.threads - 8192 - - - - dfs.namenode.handler.count - 30 - - - - dfs.datanode.handler.count - 40 - - - - dfs.balance.bandwidthPerSec - 104857600 - - - - dfs.datanode.du.reserved - 5368709120 - - - - heartbeat.recheck.interval - 100000 - - - diff --git a/src/main/resources/yarn-site.xml b/src/main/resources/yarn-site.xml deleted file mode 100644 index 8a4b2fa..0000000 --- a/src/main/resources/yarn-site.xml +++ /dev/null @@ -1,196 +0,0 @@ - - - - - yarn.nodemanager.aux-services - mapreduce_shuffle - - - yarn.resourcemanager.ha.enabled - true - - - - yarn.resourcemanager.cluster-id - rmcluster - - - yarn.resourcemanager.ha.rm-ids - rsm1,rsm2 - - - - yarn.resourcemanager.hostname.rsm1 - 192.168.40.152 - - - - yarn.resourcemanager.address.rsm1 - 192.168.40.152:9916 - - - yarn.resourcemanager.scheduler.address.rsm1 - 192.168.40.152:9917 - - - yarn.resourcemanager.webapp.address.rsm1 - 192.168.40.152:9918 - - - yarn.resourcemanager.admin.address.rsm1 - 192.168.40.152:9919 - - - yarn.resourcemanager.resource-tracker.address.rsm1 - 192.168.40.152:9920 - - - yarn.resourcemanager.ha.admin.address.rsm1 - 192.168.40.152:23142 - - - - - yarn.resourcemanager.hostname.rsm2 - 192.168.40.203 - - - - yarn.resourcemanager.address.rsm2 - 192.168.40.203:9916 - - - yarn.resourcemanager.scheduler.address.rsm2 - 192.168.40.203:9917 - - - yarn.resourcemanager.webapp.address.rsm2 - 192.168.40.203:9918 - - - yarn.resourcemanager.admin.address.rsm2 - 192.168.40.203:9919 - - - yarn.resourcemanager.resource-tracker.address.rsm2 - 192.168.40.203:9920 - - - yarn.resourcemanager.ha.admin.address.rsm2 - 192.168.40.203:23142 - - - - yarn.resourcemanager.zk-address - 192.168.40.151:2181,192.168.40.152:2181,192.168.40.203:2181 - - - - yarn.resourcemanager.recovery.enabled - true - - - - yarn.nodemanager.recovery.enabled - true - - - - yarn.nodemanager.recovery.dir - /home/tsg/olap/hadoop-2.7.1/yarn - - - - yarn.nodemanager.recovery.supervised - true - - - - yarn.nodemanager.address - ${yarn.nodemanager.hostname}:9923 - - - yarn.resourcemanager.store.class - org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore - - - yarn.nodemanager.resource.memory-mb - 30720 - - - yarn.scheduler.minimum-allocation-mb - 1024 - - - - yarn.scheduler.maximum-allocation-mb - 30720 - - - - yarn.log-aggregation-enable - true - - - yarn.nodemanager.heartbeat-interval-ms - 3000 - - - - yarn.log-aggregation.retain-seconds - 604800 - - - yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds - 3600 - - - 
yarn.nodemanager.remote-app-log-dir - /tmp/logs - - - yarn.nodemanager.resource.cpu-vcores - 14 - - - yarn.scheduler.minimum-allocation-vcores - 1 - - - yarn.scheduler.maximum-allocation-vcores - 14 - - - yarn.nodemanager.vmem-check-enabled - false - - - yarn.nodemanager.pmem-check-enabled - false - - - yarn.nodemanager.disk-health-checker.enable - false - - - - yarn.resourcemanager.am.max-attempts - 10000 - - - yarn.log.server.url - http://bigdata-151:19888/jobhistory/logs - - diff --git a/src/test/java/com/zdjizhi/common/HbaseTest.java b/src/test/java/com/zdjizhi/common/HbaseTest.java index f5e1961..3467cc3 100644 --- a/src/test/java/com/zdjizhi/common/HbaseTest.java +++ b/src/test/java/com/zdjizhi/common/HbaseTest.java @@ -19,12 +19,12 @@ public class HbaseTest { public static void main(String[] args) throws IOException { org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create(); - config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM); + config.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM); config.set("hbase.client.retries.number", "3"); config.set("hbase.bulkload.retries.number", "3"); config.set("zookeeper.recovery.retry", "3"); - config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT); - config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, FlowWriteConfig.HBASE_CLIENT_OPERATION_TIMEOUT); + config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, FlowWriteConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); TableName tableName = TableName.valueOf("dos_test"); Connection conn = ConnectionFactory.createConnection(config); diff --git a/src/test/java/com/zdjizhi/etl/DosDetectionTest.java b/src/test/java/com/zdjizhi/etl/DosDetectionTest.java index dcd3656..4428dc0 100644 --- a/src/test/java/com/zdjizhi/etl/DosDetectionTest.java +++ b/src/test/java/com/zdjizhi/etl/DosDetectionTest.java @@ -1,12 +1,12 @@ package com.zdjizhi.etl; +import com.geedgenetworks.utils.StringUtil; import com.zdjizhi.common.DosDetectionThreshold; import com.zdjizhi.common.DosEventLog; import com.zdjizhi.common.DosSketchLog; -import com.zdjizhi.utils.IpUtils; -import com.zdjizhi.utils.NacosUtils; + +import com.zdjizhi.utils.IpLookupUtils; import com.zdjizhi.utils.SnowflakeId; -import com.zdjizhi.utils.StringUtil; import org.apache.commons.lang3.StringUtils; import org.junit.Test; @@ -34,14 +34,14 @@ public class DosDetectionTest { serverIpList.add("192.168.50.1/24"); serverIpList.add("FC::12:0:0/54"); serverIpList.add("FC::12:0:0"); - dosDetectionThreshold.setProfileId(4437); - dosDetectionThreshold.setAttackType("DNS Flood"); - dosDetectionThreshold.setServerIpList(serverIpList); - dosDetectionThreshold.setSessionsPerSec(1); - dosDetectionThreshold.setPacketsPerSec(1); - dosDetectionThreshold.setBitsPerSec(100000); - dosDetectionThreshold.setIsValid(1); - dosDetectionThreshold.setSuperiorIds(new Integer[]{5,4,12,27}); + dosDetectionThreshold.setProfile_id(4437); + dosDetectionThreshold.setAttack_type("DNS Flood"); + dosDetectionThreshold.setServer_ip_list(serverIpList); + dosDetectionThreshold.setSessions_per_sec(1); + dosDetectionThreshold.setPackets_per_sec(1); + dosDetectionThreshold.setBits_per_sec(100000); + dosDetectionThreshold.setIs_valid(1); + dosDetectionThreshold.setSuperior_ids(new Integer[]{5,4,12,27}); DosSketchLog dosSketchLog = new DosSketchLog (); @@ -54,9 +54,9 @@ public class 
DosDetectionTest { dosSketchLog.setSource_ip("45.170.244.25"); dosSketchLog.setDestination_ip("24.152.57.56"); //静态阈值获取 - long sessionBase = dosDetectionThreshold.getSessionsPerSec(); - long pktBase=dosDetectionThreshold.getPacketsPerSec(); - long bitBase=dosDetectionThreshold.getBitsPerSec(); + long sessionBase = dosDetectionThreshold.getSessions_per_sec(); + long pktBase=dosDetectionThreshold.getPackets_per_sec(); + long bitBase=dosDetectionThreshold.getBits_per_sec(); //基于速率进行计算 long diffSession = dosSketchLog.getSketch_sessions() - sessionBase; long diffPkt = dosSketchLog.getSketch_packets() - pktBase; @@ -69,15 +69,15 @@ public class DosDetectionTest { long profileId = 0; DosEventLog result =null; if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent){ - profileId = dosDetectionThreshold.getProfileId(); + profileId = dosDetectionThreshold.getProfile_id(); result= getDosEventLog(dosSketchLog, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG); System.out.println(result); }else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent){ - profileId = dosDetectionThreshold.getProfileId(); + profileId = dosDetectionThreshold.getProfile_id(); result = getDosEventLog(dosSketchLog, pktBase, diffPkt,profileId, STATIC_CONDITION_TYPE, PACKETS_TAG); System.out.println(result); }else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent){ - profileId = dosDetectionThreshold.getProfileId(); + profileId = dosDetectionThreshold.getProfile_id(); result = getDosEventLog(dosSketchLog, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG); System.out.println(result); } @@ -174,7 +174,7 @@ public class DosDetectionTest { String[] ipArr = sourceIpList.split(","); HashSet countrySet = new HashSet<>(); for (String ip : ipArr) { - String country = IpUtils.ipLookup.countryLookup(ip); + String country = IpLookupUtils.getCountryLookup(ip); if (StringUtil.isNotBlank(country)){ countrySet.add(country); }
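The test above exercises the heart of the static-threshold detector: each observed rate is compared with its configured baseline, a relative excess is computed per dimension, and the dimension that overshoots its threshold the most determines which event is emitted. A compact sketch of that selection step follows, using the same tie-breaking order as the test's if/else chain; all names here are illustrative, and the percent computation is an assumption reconstructed from the diffSession/diffPkt/diffByte variables.

public class ThresholdCheck {

    /** Relative excess of an observed rate over its baseline, e.g. 2.0 means 200% above it (assumed formula). */
    private static double excess(long observed, long base) {
        return (double) (observed - base) / base;
    }

    /** Picks the dimension whose relative excess is largest. */
    public static String dominantDimension(long sessions, long packets, long bits,
                                           long sessionBase, long pktBase, long bitBase) {
        double s = excess(sessions, sessionBase);
        double p = excess(packets, pktBase);
        double b = excess(bits, bitBase);
        if (s >= p && s >= b) {
            return "sessions";
        } else if (p >= s && p >= b) {
            return "packets";
        }
        return "bits";
    }

    public static void main(String[] args) {
        // thresholds as in the test: 1 session/s, 1 packet/s, 100000 bits/s
        System.out.println(dominantDimension(50, 400, 80_000, 1, 1, 100_000)); // prints "packets"
    }
}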